]> git.saurik.com Git - redis.git/blame - redis.c
TODO updated
[redis.git] / redis.c
CommitLineData
ed9b544e 1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
1c0abf3c 30#define REDIS_VERSION "1.07"
23d4709d 31
32#include "fmacros.h"
fbf9bcdb 33#include "config.h"
ed9b544e 34
35#include <stdio.h>
36#include <stdlib.h>
37#include <string.h>
38#include <time.h>
39#include <unistd.h>
c9468bcf 40#define __USE_POSIX199309
ed9b544e 41#include <signal.h>
fbf9bcdb 42
43#ifdef HAVE_BACKTRACE
c9468bcf 44#include <execinfo.h>
45#include <ucontext.h>
fbf9bcdb 46#endif /* HAVE_BACKTRACE */
47
ed9b544e 48#include <sys/wait.h>
49#include <errno.h>
50#include <assert.h>
51#include <ctype.h>
52#include <stdarg.h>
53#include <inttypes.h>
54#include <arpa/inet.h>
55#include <sys/stat.h>
56#include <fcntl.h>
57#include <sys/time.h>
58#include <sys/resource.h>
2895e862 59#include <sys/uio.h>
f78fd11b 60#include <limits.h>
a7866db6 61#include <math.h>
0bc1b2f6 62
63#if defined(__sun)
5043dff3 64#include "solarisfixes.h"
65#endif
ed9b544e 66
c9468bcf 67#include "redis.h"
ed9b544e 68#include "ae.h" /* Event driven programming library */
69#include "sds.h" /* Dynamic safe strings */
70#include "anet.h" /* Networking the easy way */
71#include "dict.h" /* Hash tables */
72#include "adlist.h" /* Linked lists */
73#include "zmalloc.h" /* total memory usage aware version of malloc/free */
5f5b9840 74#include "lzf.h" /* LZF compression library */
75#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
ed9b544e 76
77/* Error codes */
78#define REDIS_OK 0
79#define REDIS_ERR -1
80
81/* Static server configuration */
82#define REDIS_SERVERPORT 6379 /* TCP port */
83#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
6208b3a7 84#define REDIS_IOBUF_LEN 1024
ed9b544e 85#define REDIS_LOADBUF_LEN 1024
93ea3759 86#define REDIS_STATIC_ARGS 4
ed9b544e 87#define REDIS_DEFAULT_DBNUM 16
88#define REDIS_CONFIGLINE_MAX 1024
89#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
94754ccc 91#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
6f376729 92#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
2895e862 93#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
95/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96#define REDIS_WRITEV_THRESHOLD 3
97/* Max number of iovecs used for each writev call */
98#define REDIS_WRITEV_IOVEC_COUNT 256
ed9b544e 99
100/* Hash table parameters */
101#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
ed9b544e 102
103/* Command flags */
3fd78bcd 104#define REDIS_CMD_BULK 1 /* Bulk write command */
105#define REDIS_CMD_INLINE 2 /* Inline command */
106/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110#define REDIS_CMD_DENYOOM 4
ed9b544e 111
112/* Object types */
113#define REDIS_STRING 0
114#define REDIS_LIST 1
115#define REDIS_SET 2
1812e024 116#define REDIS_ZSET 3
117#define REDIS_HASH 4
f78fd11b 118
942a3961 119/* Objects encoding */
120#define REDIS_ENCODING_RAW 0 /* Raw representation */
121#define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
f78fd11b 123/* Object types only used for dumping to disk */
bb32ede5 124#define REDIS_EXPIRETIME 253
ed9b544e 125#define REDIS_SELECTDB 254
126#define REDIS_EOF 255
127
f78fd11b 128/* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
131 *
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
a4d1ba9a 135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
f78fd11b 138 *
10c43610 139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
f78fd11b 141#define REDIS_RDB_6BITLEN 0
142#define REDIS_RDB_14BITLEN 1
143#define REDIS_RDB_32BITLEN 2
17be1a4a 144#define REDIS_RDB_ENCVAL 3
f78fd11b 145#define REDIS_RDB_LENERR UINT_MAX
146
a4d1ba9a 147/* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * accordingly to the following defines: */
150#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
774e3047 153#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
a4d1ba9a 154
ed9b544e 155/* Client flags */
156#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157#define REDIS_SLAVE 2 /* This client is a slave server */
158#define REDIS_MASTER 4 /* This client is a master server */
87eca727 159#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
ed9b544e 160
40d224a9 161/* Slave replication state - slave side */
ed9b544e 162#define REDIS_REPL_NONE 0 /* No active replication */
163#define REDIS_REPL_CONNECT 1 /* Must connect to master */
164#define REDIS_REPL_CONNECTED 2 /* Connected to master */
165
40d224a9 166/* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
174
ed9b544e 175/* List related stuff */
176#define REDIS_HEAD 0
177#define REDIS_TAIL 1
178
179/* Sort operations */
180#define REDIS_SORT_GET 0
443c6409 181#define REDIS_SORT_ASC 1
182#define REDIS_SORT_DESC 2
ed9b544e 183#define REDIS_SORTKEY_MAX 1024
184
185/* Log levels */
186#define REDIS_DEBUG 0
187#define REDIS_NOTICE 1
188#define REDIS_WARNING 2
189
190/* Anti-warning macro... */
191#define REDIS_NOTUSED(V) ((void) V)
192
6b47e12e 193#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
ed9b544e 195
48f0308a 196/* Append only defines */
197#define APPENDFSYNC_NO 0
198#define APPENDFSYNC_ALWAYS 1
199#define APPENDFSYNC_EVERYSEC 2
200
ed9b544e 201/*================================= Data types ============================== */
202
/* A redis object: a reference counted container able to hold a string,
 * a list or a set (see the "Object types" defines for the values stored
 * in 'type' and the "Objects encoding" defines for 'encoding'). */
typedef struct redisObject {
    void *ptr;                  /* the value held by the object */
    unsigned char type;         /* object type */
    unsigned char encoding;     /* representation used for 'ptr' */
    unsigned char notused[2];   /* currently unused bytes */
    int refcount;               /* reference count */
} robj;
211
3305306f 212typedef struct redisDb {
213 dict *dict;
214 dict *expires;
215 int id;
216} redisDb;
217
ed9b544e 218/* With multiplexing we need to take per-clinet state.
219 * Clients are taken in a liked list. */
220typedef struct redisClient {
221 int fd;
3305306f 222 redisDb *db;
ed9b544e 223 int dictid;
224 sds querybuf;
e8a74421 225 robj **argv, **mbargv;
226 int argc, mbargc;
40d224a9 227 int bulklen; /* bulk read len. -1 if not in bulk read mode */
e8a74421 228 int multibulk; /* multi bulk command format active */
ed9b544e 229 list *reply;
230 int sentlen;
231 time_t lastinteraction; /* time of the last interaction, used for timeout */
40d224a9 232 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
233 int slaveseldb; /* slave selected db, if this client is a slave */
234 int authenticated; /* when requirepass is non-NULL */
235 int replstate; /* replication state if this is a slave */
236 int repldbfd; /* replication DB file descriptor */
6208b3a7 237 long repldboff; /* replication DB file offset */
40d224a9 238 off_t repldbsize; /* replication DB file size */
ed9b544e 239} redisClient;
240
/* One save point: a number of seconds paired with a number of changes.
 * NOTE(review): presumably "save if at least 'changes' changes happened in
 * 'seconds' seconds" -- confirm against the code that reads saveparams. */
struct saveparam {
    time_t seconds;
    int changes;
};
245
246/* Global server state structure */
247struct redisServer {
248 int port;
249 int fd;
3305306f 250 redisDb *db;
10c43610 251 dict *sharingpool;
252 unsigned int sharingpoolsize;
ed9b544e 253 long long dirty; /* changes to DB from the last save */
254 list *clients;
87eca727 255 list *slaves, *monitors;
ed9b544e 256 char neterr[ANET_ERR_LEN];
257 aeEventLoop *el;
258 int cronloops; /* number of times the cron function run */
259 list *objfreelist; /* A list of freed objects to avoid malloc() */
260 time_t lastsave; /* Unix time of last save succeeede */
5fba9f71 261 size_t usedmemory; /* Used memory in megabytes */
ed9b544e 262 /* Fields used only for stats */
263 time_t stat_starttime; /* server start time */
264 long long stat_numcommands; /* number of processed commands */
265 long long stat_numconnections; /* number of connections received */
266 /* Configuration */
267 int verbosity;
268 int glueoutputbuf;
269 int maxidletime;
270 int dbnum;
271 int daemonize;
44b38ef4 272 int appendonly;
48f0308a 273 int appendfsync;
274 time_t lastfsync;
44b38ef4 275 int appendfd;
276 int appendseldb;
ed329fcf 277 char *pidfile;
9f3c422c 278 pid_t bgsavechildpid;
9d65a1bb 279 pid_t bgrewritechildpid;
280 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
ed9b544e 281 struct saveparam *saveparams;
282 int saveparamslen;
283 char *logfile;
284 char *bindaddr;
285 char *dbfilename;
44b38ef4 286 char *appendfilename;
abcb223e 287 char *requirepass;
10c43610 288 int shareobjects;
ed9b544e 289 /* Replication related */
290 int isslave;
d0ccebcf 291 char *masterauth;
ed9b544e 292 char *masterhost;
293 int masterport;
40d224a9 294 redisClient *master; /* client that is master for this slave */
ed9b544e 295 int replstate;
285add55 296 unsigned int maxclients;
d4465900 297 unsigned long maxmemory;
ed9b544e 298 /* Sort parameters - qsort_r() is only available under BSD so we
299 * have to take this state global, in order to pass it to sortCompare() */
300 int sort_desc;
301 int sort_alpha;
302 int sort_bypattern;
303};
304
305typedef void redisCommandProc(redisClient *c);
306struct redisCommand {
307 char *name;
308 redisCommandProc *proc;
309 int arity;
310 int flags;
311};
312
/* A symbol name paired with a pointer stored as an unsigned long. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
317
ed9b544e 318typedef struct _redisSortObject {
319 robj *obj;
320 union {
321 double score;
322 robj *cmpobj;
323 } u;
324} redisSortObject;
325
326typedef struct _redisSortOperation {
327 int type;
328 robj *pattern;
329} redisSortOperation;
330
6b47e12e 331/* ZSETs use a specialized version of Skiplists */
332
333typedef struct zskiplistNode {
334 struct zskiplistNode **forward;
e3870fab 335 struct zskiplistNode *backward;
6b47e12e 336 double score;
337 robj *obj;
338} zskiplistNode;
339
/* A skiplist: header and tail nodes, a length and a level. */
typedef struct zskiplist {
    struct zskiplistNode *header;
    struct zskiplistNode *tail;
    unsigned long length;
    int level;
} zskiplist;
345
1812e024 346typedef struct zset {
347 dict *dict;
6b47e12e 348 zskiplist *zsl;
1812e024 349} zset;
350
6b47e12e 351/* Our shared "common" objects */
352
ed9b544e 353struct sharedObjectsStruct {
c937aa89 354 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
7b45bfb2 355 *colon, *nullbulk, *nullmultibulk,
c937aa89 356 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
357 *outofrangeerr, *plus,
ed9b544e 358 *select0, *select1, *select2, *select3, *select4,
359 *select5, *select6, *select7, *select8, *select9;
360} shared;
361
a7866db6 362/* Global vars that are actually used as constants. The following double
363 * values are used for double on-disk serialization, and are initialized
364 * at runtime to avoid strange compiler optimizations. */
365
366static double R_Zero, R_PosInf, R_NegInf, R_Nan;
367
ed9b544e 368/*================================ Prototypes =============================== */
369
370static void freeStringObject(robj *o);
371static void freeListObject(robj *o);
372static void freeSetObject(robj *o);
373static void decrRefCount(void *o);
374static robj *createObject(int type, void *ptr);
375static void freeClient(redisClient *c);
f78fd11b 376static int rdbLoad(char *filename);
ed9b544e 377static void addReply(redisClient *c, robj *obj);
378static void addReplySds(redisClient *c, sds s);
379static void incrRefCount(robj *o);
f78fd11b 380static int rdbSaveBackground(char *filename);
ed9b544e 381static robj *createStringObject(char *ptr, size_t len);
87eca727 382static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
44b38ef4 383static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
ed9b544e 384static int syncWithMaster(void);
10c43610 385static robj *tryObjectSharing(robj *o);
942a3961 386static int tryObjectEncoding(robj *o);
9d65a1bb 387static robj *getDecodedObject(robj *o);
3305306f 388static int removeExpire(redisDb *db, robj *key);
389static int expireIfNeeded(redisDb *db, robj *key);
390static int deleteIfVolatile(redisDb *db, robj *key);
94754ccc 391static int deleteKey(redisDb *db, robj *key);
bb32ede5 392static time_t getExpire(redisDb *db, robj *key);
393static int setExpire(redisDb *db, robj *key, time_t when);
a3b21203 394static void updateSlavesWaitingBgsave(int bgsaveerr);
3fd78bcd 395static void freeMemoryIfNeeded(void);
de96dbfe 396static int processCommand(redisClient *c);
56906eef 397static void setupSigSegvAction(void);
a3b21203 398static void rdbRemoveTempFile(pid_t childpid);
9d65a1bb 399static void aofRemoveTempFile(pid_t childpid);
0ea663ea 400static size_t stringObjectLen(robj *o);
638e42ac 401static void processInputBuffer(redisClient *c);
6b47e12e 402static zskiplist *zslCreate(void);
fd8ccf44 403static void zslFree(zskiplist *zsl);
2b59cfdf 404static void zslInsert(zskiplist *zsl, double score, robj *obj);
2895e862 405static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
ed9b544e 406
abcb223e 407static void authCommand(redisClient *c);
ed9b544e 408static void pingCommand(redisClient *c);
409static void echoCommand(redisClient *c);
410static void setCommand(redisClient *c);
411static void setnxCommand(redisClient *c);
412static void getCommand(redisClient *c);
413static void delCommand(redisClient *c);
414static void existsCommand(redisClient *c);
415static void incrCommand(redisClient *c);
416static void decrCommand(redisClient *c);
417static void incrbyCommand(redisClient *c);
418static void decrbyCommand(redisClient *c);
419static void selectCommand(redisClient *c);
420static void randomkeyCommand(redisClient *c);
421static void keysCommand(redisClient *c);
422static void dbsizeCommand(redisClient *c);
423static void lastsaveCommand(redisClient *c);
424static void saveCommand(redisClient *c);
425static void bgsaveCommand(redisClient *c);
9d65a1bb 426static void bgrewriteaofCommand(redisClient *c);
ed9b544e 427static void shutdownCommand(redisClient *c);
428static void moveCommand(redisClient *c);
429static void renameCommand(redisClient *c);
430static void renamenxCommand(redisClient *c);
431static void lpushCommand(redisClient *c);
432static void rpushCommand(redisClient *c);
433static void lpopCommand(redisClient *c);
434static void rpopCommand(redisClient *c);
435static void llenCommand(redisClient *c);
436static void lindexCommand(redisClient *c);
437static void lrangeCommand(redisClient *c);
438static void ltrimCommand(redisClient *c);
439static void typeCommand(redisClient *c);
440static void lsetCommand(redisClient *c);
441static void saddCommand(redisClient *c);
442static void sremCommand(redisClient *c);
a4460ef4 443static void smoveCommand(redisClient *c);
ed9b544e 444static void sismemberCommand(redisClient *c);
445static void scardCommand(redisClient *c);
12fea928 446static void spopCommand(redisClient *c);
2abb95a9 447static void srandmemberCommand(redisClient *c);
ed9b544e 448static void sinterCommand(redisClient *c);
449static void sinterstoreCommand(redisClient *c);
40d224a9 450static void sunionCommand(redisClient *c);
451static void sunionstoreCommand(redisClient *c);
f4f56e1d 452static void sdiffCommand(redisClient *c);
453static void sdiffstoreCommand(redisClient *c);
ed9b544e 454static void syncCommand(redisClient *c);
455static void flushdbCommand(redisClient *c);
456static void flushallCommand(redisClient *c);
457static void sortCommand(redisClient *c);
458static void lremCommand(redisClient *c);
0f5f7e9a 459static void rpoplpushcommand(redisClient *c);
ed9b544e 460static void infoCommand(redisClient *c);
70003d28 461static void mgetCommand(redisClient *c);
87eca727 462static void monitorCommand(redisClient *c);
3305306f 463static void expireCommand(redisClient *c);
802e8373 464static void expireatCommand(redisClient *c);
f6b141c5 465static void getsetCommand(redisClient *c);
fd88489a 466static void ttlCommand(redisClient *c);
321b0e13 467static void slaveofCommand(redisClient *c);
7f957c92 468static void debugCommand(redisClient *c);
f6b141c5 469static void msetCommand(redisClient *c);
470static void msetnxCommand(redisClient *c);
fd8ccf44 471static void zaddCommand(redisClient *c);
7db723ad 472static void zincrbyCommand(redisClient *c);
cc812361 473static void zrangeCommand(redisClient *c);
50c55df5 474static void zrangebyscoreCommand(redisClient *c);
e3870fab 475static void zrevrangeCommand(redisClient *c);
3c41331e 476static void zcardCommand(redisClient *c);
1b7106e7 477static void zremCommand(redisClient *c);
6e333bbe 478static void zscoreCommand(redisClient *c);
1807985b 479static void zremrangebyscoreCommand(redisClient *c);
f6b141c5 480
ed9b544e 481/*================================= Globals ================================= */
482
483/* Global vars */
484static struct redisServer server; /* server global state */
485static struct redisCommand cmdTable[] = {
486 {"get",getCommand,2,REDIS_CMD_INLINE},
3fd78bcd 487 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
488 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
5109cdff 489 {"del",delCommand,-2,REDIS_CMD_INLINE},
ed9b544e 490 {"exists",existsCommand,2,REDIS_CMD_INLINE},
3fd78bcd 491 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
492 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
70003d28 493 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
3fd78bcd 494 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
495 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 496 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
497 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
498 {"llen",llenCommand,2,REDIS_CMD_INLINE},
499 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
3fd78bcd 500 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 501 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
502 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
503 {"lrem",lremCommand,4,REDIS_CMD_BULK},
0f5f7e9a 504 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
3fd78bcd 505 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 506 {"srem",sremCommand,3,REDIS_CMD_BULK},
a4460ef4 507 {"smove",smoveCommand,4,REDIS_CMD_BULK},
ed9b544e 508 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
509 {"scard",scardCommand,2,REDIS_CMD_INLINE},
12fea928 510 {"spop",spopCommand,2,REDIS_CMD_INLINE},
2abb95a9 511 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
3fd78bcd 512 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
513 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
514 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
515 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
516 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
517 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 518 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
fd8ccf44 519 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
7db723ad 520 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
1b7106e7 521 {"zrem",zremCommand,3,REDIS_CMD_BULK},
1807985b 522 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
cc812361 523 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
50c55df5 524 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
e3870fab 525 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
3c41331e 526 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
6e333bbe 527 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
3fd78bcd 528 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
f6b141c5 530 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
531 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
532 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 533 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
534 {"select",selectCommand,2,REDIS_CMD_INLINE},
535 {"move",moveCommand,3,REDIS_CMD_INLINE},
536 {"rename",renameCommand,3,REDIS_CMD_INLINE},
537 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
321b0e13 538 {"expire",expireCommand,3,REDIS_CMD_INLINE},
802e8373 539 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
ed9b544e 540 {"keys",keysCommand,2,REDIS_CMD_INLINE},
541 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
abcb223e 542 {"auth",authCommand,2,REDIS_CMD_INLINE},
ed9b544e 543 {"ping",pingCommand,1,REDIS_CMD_INLINE},
544 {"echo",echoCommand,2,REDIS_CMD_BULK},
545 {"save",saveCommand,1,REDIS_CMD_INLINE},
546 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
9d65a1bb 547 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
ed9b544e 548 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
549 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
550 {"type",typeCommand,2,REDIS_CMD_INLINE},
551 {"sync",syncCommand,1,REDIS_CMD_INLINE},
552 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
553 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
3fd78bcd 554 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 555 {"info",infoCommand,1,REDIS_CMD_INLINE},
87eca727 556 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
fd88489a 557 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
321b0e13 558 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
7f957c92 559 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
ed9b544e 560 {NULL,NULL,0,0}
561};
bcfc686d 562
ed9b544e 563/*============================ Utility functions ============================ */
564
/* Glob-style pattern matching over explicit-length buffers.
 *
 * Supported syntax:
 *   *        matches any sequence of characters (including none)
 *   ?        matches exactly one character
 *   [abc]    matches one character of the set, [^abc] negates the set,
 *            [a-z] is a range, and a backslash escapes the next character
 *   \x       matches the character x literally
 *
 * pattern/patternLen: the pattern and its length in bytes.
 * string/stringLen:   the subject and its length in bytes.
 * nocase:             when non-zero, literals and ranges are compared
 *                     case-insensitively via tolower().
 *
 * Returns 1 on match, 0 otherwise. Neither buffer needs to be NUL
 * terminated: no byte at or beyond the given lengths is ever read. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse a run of '*', looking only inside the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A bracket expression always consumes one character, so it
             * can never match an exhausted string. */
            if (stringLen <= 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the common
                     * advance after the switch lands on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Ranges are compared as unsigned bytes so that the
                     * result does not depend on the signedness of char
                     * and tolower() always gets a valid argument. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs a character to compare against. */
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Trailing '*' still inside the pattern match the empty rest. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
687
56906eef 688static void redisLog(int level, const char *fmt, ...) {
ed9b544e 689 va_list ap;
690 FILE *fp;
691
692 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
693 if (!fp) return;
694
695 va_start(ap, fmt);
696 if (level >= server.verbosity) {
697 char *c = ".-*";
1904ecc1 698 char buf[64];
699 time_t now;
700
701 now = time(NULL);
6c9385e0 702 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
1904ecc1 703 fprintf(fp,"%s %c ",buf,c[level]);
ed9b544e 704 vfprintf(fp, fmt, ap);
705 fprintf(fp,"\n");
706 fflush(fp);
707 }
708 va_end(ap);
709
710 if (server.logfile) fclose(fp);
711}
712
713/*====================== Hash table type implementation ==================== */
714
715/* This is a hash table type that uses the SDS dynamic strings library as
716 * keys and redis objects as values (objects can hold SDS strings,
717 * lists, sets). */
718
/* Dict destructor callback that releases 'val' with zfree().
 * 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    zfree(val);
}
724
ed9b544e 725static int sdsDictKeyCompare(void *privdata, const void *key1,
726 const void *key2)
727{
728 int l1,l2;
729 DICT_NOTUSED(privdata);
730
731 l1 = sdslen((sds)key1);
732 l2 = sdslen((sds)key2);
733 if (l1 != l2) return 0;
734 return memcmp(key1, key2, l1) == 0;
735}
736
/* Dict destructor callback for redis objects: drops one reference to
 * 'val' via decrRefCount(). 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
743
942a3961 744static int dictObjKeyCompare(void *privdata, const void *key1,
ed9b544e 745 const void *key2)
746{
747 const robj *o1 = key1, *o2 = key2;
748 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
749}
750
942a3961 751static unsigned int dictObjHash(const void *key) {
ed9b544e 752 const robj *o = key;
753 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
754}
755
942a3961 756static int dictEncObjKeyCompare(void *privdata, const void *key1,
757 const void *key2)
758{
9d65a1bb 759 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
760 int cmp;
942a3961 761
9d65a1bb 762 o1 = getDecodedObject(o1);
763 o2 = getDecodedObject(o2);
764 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
765 decrRefCount(o1);
766 decrRefCount(o2);
767 return cmp;
942a3961 768}
769
770static unsigned int dictEncObjHash(const void *key) {
9d65a1bb 771 robj *o = (robj*) key;
942a3961 772
9d65a1bb 773 o = getDecodedObject(o);
774 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
775 decrRefCount(o);
776 return hash;
942a3961 777}
778
ed9b544e 779static dictType setDictType = {
942a3961 780 dictEncObjHash, /* hash function */
ed9b544e 781 NULL, /* key dup */
782 NULL, /* val dup */
942a3961 783 dictEncObjKeyCompare, /* key compare */
ed9b544e 784 dictRedisObjectDestructor, /* key destructor */
785 NULL /* val destructor */
786};
787
1812e024 788static dictType zsetDictType = {
789 dictEncObjHash, /* hash function */
790 NULL, /* key dup */
791 NULL, /* val dup */
792 dictEncObjKeyCompare, /* key compare */
793 dictRedisObjectDestructor, /* key destructor */
794 dictVanillaFree /* val destructor */
795};
796
ed9b544e 797static dictType hashDictType = {
942a3961 798 dictObjHash, /* hash function */
ed9b544e 799 NULL, /* key dup */
800 NULL, /* val dup */
942a3961 801 dictObjKeyCompare, /* key compare */
ed9b544e 802 dictRedisObjectDestructor, /* key destructor */
803 dictRedisObjectDestructor /* val destructor */
804};
805
806/* ========================= Random utility functions ======================= */
807
/* Out of memory handler. Redis generally does not try to recover from
 * out of memory conditions when allocating objects or strings: it is not
 * clear that the condition could even be reported to the client, since
 * the networking layer itself uses heap allocated send buffers. So the
 * policy is simply: print "<msg>: Out of memory" on stderr, wait one
 * second, and abort. This function never returns. */
static void oom(const char *msg)
{
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);

    sleep(1);
    abort();
}
819
820/* ====================== Redis server networking stuff ===================== */
56906eef 821static void closeTimedoutClients(void) {
ed9b544e 822 redisClient *c;
ed9b544e 823 listNode *ln;
824 time_t now = time(NULL);
825
6208b3a7 826 listRewind(server.clients);
827 while ((ln = listYield(server.clients)) != NULL) {
ed9b544e 828 c = listNodeValue(ln);
829 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
c7cf2ec9 830 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
ed9b544e 831 (now - c->lastinteraction > server.maxidletime)) {
832 redisLog(REDIS_DEBUG,"Closing idle client");
833 freeClient(c);
834 }
835 }
ed9b544e 836}
837
12fea928 838static int htNeedsResize(dict *dict) {
839 long long size, used;
840
841 size = dictSlots(dict);
842 used = dictSize(dict);
843 return (size && used && size > DICT_HT_INITIAL_SIZE &&
844 (used*100/size < REDIS_HT_MINFILL));
845}
846
0bc03378 847/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
848 * we resize the hash table to save memory */
56906eef 849static void tryResizeHashTables(void) {
0bc03378 850 int j;
851
852 for (j = 0; j < server.dbnum; j++) {
12fea928 853 if (htNeedsResize(server.db[j].dict)) {
854 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
0bc03378 855 dictResize(server.db[j].dict);
12fea928 856 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
0bc03378 857 }
12fea928 858 if (htNeedsResize(server.db[j].expires))
859 dictResize(server.db[j].expires);
0bc03378 860 }
861}
862
9d65a1bb 863/* A background saving child (BGSAVE) terminated its work. Handle this. */
864void backgroundSaveDoneHandler(int statloc) {
865 int exitcode = WEXITSTATUS(statloc);
866 int bysignal = WIFSIGNALED(statloc);
867
868 if (!bysignal && exitcode == 0) {
869 redisLog(REDIS_NOTICE,
870 "Background saving terminated with success");
871 server.dirty = 0;
872 server.lastsave = time(NULL);
873 } else if (!bysignal && exitcode != 0) {
874 redisLog(REDIS_WARNING, "Background saving error");
875 } else {
876 redisLog(REDIS_WARNING,
877 "Background saving terminated by signal");
878 rdbRemoveTempFile(server.bgsavechildpid);
879 }
880 server.bgsavechildpid = -1;
881 /* Possibly there are slaves waiting for a BGSAVE in order to be served
882 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
883 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
884}
885
886/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
887 * Handle this. */
888void backgroundRewriteDoneHandler(int statloc) {
889 int exitcode = WEXITSTATUS(statloc);
890 int bysignal = WIFSIGNALED(statloc);
891
892 if (!bysignal && exitcode == 0) {
893 int fd;
894 char tmpfile[256];
895
896 redisLog(REDIS_NOTICE,
897 "Background append only file rewriting terminated with success");
898 /* Now it's time to flush the differences accumulated by the parent */
899 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
900 fd = open(tmpfile,O_WRONLY|O_APPEND);
901 if (fd == -1) {
902 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
903 goto cleanup;
904 }
905 /* Flush our data... */
906 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
907 (signed) sdslen(server.bgrewritebuf)) {
908 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
909 close(fd);
910 goto cleanup;
911 }
912 redisLog(REDIS_WARNING,"Parent diff flushed into the new append log file with success");
913 /* Now our work is to rename the temp file into the stable file. And
914 * switch the file descriptor used by the server for append only. */
915 if (rename(tmpfile,server.appendfilename) == -1) {
916 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
917 close(fd);
918 goto cleanup;
919 }
920 /* Mission completed... almost */
921 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
922 if (server.appendfd != -1) {
923 /* If append only is actually enabled... */
924 close(server.appendfd);
925 server.appendfd = fd;
926 fsync(fd);
85a83172 927 server.appendseldb = -1; /* Make sure it will issue SELECT */
9d65a1bb 928 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
929 } else {
930 /* If append only is disabled we just generate a dump in this
931 * format. Why not? */
932 close(fd);
933 }
934 } else if (!bysignal && exitcode != 0) {
935 redisLog(REDIS_WARNING, "Background append only file rewriting error");
936 } else {
937 redisLog(REDIS_WARNING,
938 "Background append only file rewriting terminated by signal");
939 }
940cleanup:
941 sdsfree(server.bgrewritebuf);
942 server.bgrewritebuf = sdsempty();
943 aofRemoveTempFile(server.bgrewritechildpid);
944 server.bgrewritechildpid = -1;
945}
946
56906eef 947static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
94754ccc 948 int j, loops = server.cronloops++;
ed9b544e 949 REDIS_NOTUSED(eventLoop);
950 REDIS_NOTUSED(id);
951 REDIS_NOTUSED(clientData);
952
953 /* Update the global state with the amount of used memory */
954 server.usedmemory = zmalloc_used_memory();
955
0bc03378 956 /* Show some info about non-empty databases */
ed9b544e 957 for (j = 0; j < server.dbnum; j++) {
dec423d9 958 long long size, used, vkeys;
94754ccc 959
3305306f 960 size = dictSlots(server.db[j].dict);
961 used = dictSize(server.db[j].dict);
94754ccc 962 vkeys = dictSize(server.db[j].expires);
c3cb078d 963 if (!(loops % 5) && (used || vkeys)) {
964 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
a4d1ba9a 965 /* dictPrintStats(server.dict); */
ed9b544e 966 }
ed9b544e 967 }
968
0bc03378 969 /* We don't want to resize the hash tables while a bacground saving
970 * is in progress: the saving child is created using fork() that is
971 * implemented with a copy-on-write semantic in most modern systems, so
972 * if we resize the HT while there is the saving child at work actually
973 * a lot of memory movements in the parent will cause a lot of pages
974 * copied. */
9d65a1bb 975 if (server.bgsavechildpid == -1) tryResizeHashTables();
0bc03378 976
ed9b544e 977 /* Show information about connected clients */
978 if (!(loops % 5)) {
21aecf4b 979 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
ed9b544e 980 listLength(server.clients)-listLength(server.slaves),
981 listLength(server.slaves),
10c43610 982 server.usedmemory,
3305306f 983 dictSize(server.sharingpool));
ed9b544e 984 }
985
986 /* Close connections of timedout clients */
0150db36 987 if (server.maxidletime && !(loops % 10))
ed9b544e 988 closeTimedoutClients();
989
9d65a1bb 990 /* Check if a background saving or AOF rewrite in progress terminated */
991 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
ed9b544e 992 int statloc;
9d65a1bb 993 pid_t pid;
994
995 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
996 if (pid == server.bgsavechildpid) {
997 backgroundSaveDoneHandler(statloc);
ed9b544e 998 } else {
9d65a1bb 999 backgroundRewriteDoneHandler(statloc);
ed9b544e 1000 }
ed9b544e 1001 }
1002 } else {
1003 /* If there is not a background saving in progress check if
1004 * we have to save now */
1005 time_t now = time(NULL);
1006 for (j = 0; j < server.saveparamslen; j++) {
1007 struct saveparam *sp = server.saveparams+j;
1008
1009 if (server.dirty >= sp->changes &&
1010 now-server.lastsave > sp->seconds) {
1011 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1012 sp->changes, sp->seconds);
f78fd11b 1013 rdbSaveBackground(server.dbfilename);
ed9b544e 1014 break;
1015 }
1016 }
1017 }
94754ccc 1018
f2324293 1019 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1020 * will use few CPU cycles if there are few expiring keys, otherwise
1021 * it will get more aggressive to avoid that too much memory is used by
1022 * keys that can be removed from the keyspace. */
94754ccc 1023 for (j = 0; j < server.dbnum; j++) {
f2324293 1024 int expired;
94754ccc 1025 redisDb *db = server.db+j;
94754ccc 1026
f2324293 1027 /* Continue to expire if at the end of the cycle more than 25%
1028 * of the keys were expired. */
1029 do {
1030 int num = dictSize(db->expires);
94754ccc 1031 time_t now = time(NULL);
1032
f2324293 1033 expired = 0;
94754ccc 1034 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1035 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1036 while (num--) {
1037 dictEntry *de;
1038 time_t t;
1039
1040 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1041 t = (time_t) dictGetEntryVal(de);
1042 if (now > t) {
1043 deleteKey(db,dictGetEntryKey(de));
f2324293 1044 expired++;
94754ccc 1045 }
1046 }
f2324293 1047 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
94754ccc 1048 }
1049
ed9b544e 1050 /* Check if we should connect to a MASTER */
1051 if (server.replstate == REDIS_REPL_CONNECT) {
1052 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1053 if (syncWithMaster() == REDIS_OK) {
1054 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1055 }
1056 }
1057 return 1000;
1058}
1059
1060static void createSharedObjects(void) {
1061 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1062 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1063 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
c937aa89 1064 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1065 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1066 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1067 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1068 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1069 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
ed9b544e 1070 /* no such key */
ed9b544e 1071 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1072 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1073 "-ERR Operation against a key holding the wrong kind of value\r\n"));
ed9b544e 1074 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1075 "-ERR no such key\r\n"));
ed9b544e 1076 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1077 "-ERR syntax error\r\n"));
c937aa89 1078 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1079 "-ERR source and destination objects are the same\r\n"));
1080 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1081 "-ERR index out of range\r\n"));
ed9b544e 1082 shared.space = createObject(REDIS_STRING,sdsnew(" "));
c937aa89 1083 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1084 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
ed9b544e 1085 shared.select0 = createStringObject("select 0\r\n",10);
1086 shared.select1 = createStringObject("select 1\r\n",10);
1087 shared.select2 = createStringObject("select 2\r\n",10);
1088 shared.select3 = createStringObject("select 3\r\n",10);
1089 shared.select4 = createStringObject("select 4\r\n",10);
1090 shared.select5 = createStringObject("select 5\r\n",10);
1091 shared.select6 = createStringObject("select 6\r\n",10);
1092 shared.select7 = createStringObject("select 7\r\n",10);
1093 shared.select8 = createStringObject("select 8\r\n",10);
1094 shared.select9 = createStringObject("select 9\r\n",10);
1095}
1096
1097static void appendServerSaveParams(time_t seconds, int changes) {
1098 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
ed9b544e 1099 server.saveparams[server.saveparamslen].seconds = seconds;
1100 server.saveparams[server.saveparamslen].changes = changes;
1101 server.saveparamslen++;
1102}
1103
bcfc686d 1104static void resetServerSaveParams() {
ed9b544e 1105 zfree(server.saveparams);
1106 server.saveparams = NULL;
1107 server.saveparamslen = 0;
1108}
1109
1110static void initServerConfig() {
1111 server.dbnum = REDIS_DEFAULT_DBNUM;
1112 server.port = REDIS_SERVERPORT;
1113 server.verbosity = REDIS_DEBUG;
1114 server.maxidletime = REDIS_MAXIDLETIME;
1115 server.saveparams = NULL;
1116 server.logfile = NULL; /* NULL = log on standard output */
1117 server.bindaddr = NULL;
1118 server.glueoutputbuf = 1;
1119 server.daemonize = 0;
44b38ef4 1120 server.appendonly = 0;
4e141d5a 1121 server.appendfsync = APPENDFSYNC_ALWAYS;
48f0308a 1122 server.lastfsync = time(NULL);
44b38ef4 1123 server.appendfd = -1;
1124 server.appendseldb = -1; /* Make sure the first time will not match */
ed329fcf 1125 server.pidfile = "/var/run/redis.pid";
ed9b544e 1126 server.dbfilename = "dump.rdb";
9d65a1bb 1127 server.appendfilename = "appendonly.aof";
abcb223e 1128 server.requirepass = NULL;
10c43610 1129 server.shareobjects = 0;
21aecf4b 1130 server.sharingpoolsize = 1024;
285add55 1131 server.maxclients = 0;
3fd78bcd 1132 server.maxmemory = 0;
bcfc686d 1133 resetServerSaveParams();
ed9b544e 1134
1135 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1136 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1137 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1138 /* Replication related */
1139 server.isslave = 0;
d0ccebcf 1140 server.masterauth = NULL;
ed9b544e 1141 server.masterhost = NULL;
1142 server.masterport = 6379;
1143 server.master = NULL;
1144 server.replstate = REDIS_REPL_NONE;
a7866db6 1145
1146 /* Double constants initialization */
1147 R_Zero = 0.0;
1148 R_PosInf = 1.0/R_Zero;
1149 R_NegInf = -1.0/R_Zero;
1150 R_Nan = R_Zero/R_Zero;
ed9b544e 1151}
1152
1153static void initServer() {
1154 int j;
1155
1156 signal(SIGHUP, SIG_IGN);
1157 signal(SIGPIPE, SIG_IGN);
fe3bbfbe 1158 setupSigSegvAction();
ed9b544e 1159
1160 server.clients = listCreate();
1161 server.slaves = listCreate();
87eca727 1162 server.monitors = listCreate();
ed9b544e 1163 server.objfreelist = listCreate();
1164 createSharedObjects();
1165 server.el = aeCreateEventLoop();
3305306f 1166 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
10c43610 1167 server.sharingpool = dictCreate(&setDictType,NULL);
ed9b544e 1168 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1169 if (server.fd == -1) {
1170 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1171 exit(1);
1172 }
3305306f 1173 for (j = 0; j < server.dbnum; j++) {
1174 server.db[j].dict = dictCreate(&hashDictType,NULL);
1175 server.db[j].expires = dictCreate(&setDictType,NULL);
1176 server.db[j].id = j;
1177 }
ed9b544e 1178 server.cronloops = 0;
9f3c422c 1179 server.bgsavechildpid = -1;
9d65a1bb 1180 server.bgrewritechildpid = -1;
1181 server.bgrewritebuf = sdsempty();
ed9b544e 1182 server.lastsave = time(NULL);
1183 server.dirty = 0;
1184 server.usedmemory = 0;
1185 server.stat_numcommands = 0;
1186 server.stat_numconnections = 0;
1187 server.stat_starttime = time(NULL);
d8f8b666 1188 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
44b38ef4 1189
1190 if (server.appendonly) {
71eba477 1191 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
44b38ef4 1192 if (server.appendfd == -1) {
1193 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1194 strerror(errno));
1195 exit(1);
1196 }
1197 }
ed9b544e 1198}
1199
1200/* Empty the whole database */
ca37e9cd 1201static long long emptyDb() {
ed9b544e 1202 int j;
ca37e9cd 1203 long long removed = 0;
ed9b544e 1204
3305306f 1205 for (j = 0; j < server.dbnum; j++) {
ca37e9cd 1206 removed += dictSize(server.db[j].dict);
3305306f 1207 dictEmpty(server.db[j].dict);
1208 dictEmpty(server.db[j].expires);
1209 }
ca37e9cd 1210 return removed;
ed9b544e 1211}
1212
/* Convert a "yes" / "no" string, compared ignoring the case, into 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1218
ed9b544e 1219/* I agree, this is a very rudimental way to load a configuration...
1220 will improve later if the config gets more complex */
1221static void loadServerConfig(char *filename) {
c9a111ac 1222 FILE *fp;
ed9b544e 1223 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1224 int linenum = 0;
1225 sds line = NULL;
c9a111ac 1226
1227 if (filename[0] == '-' && filename[1] == '\0')
1228 fp = stdin;
1229 else {
1230 if ((fp = fopen(filename,"r")) == NULL) {
1231 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1232 exit(1);
1233 }
ed9b544e 1234 }
c9a111ac 1235
ed9b544e 1236 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1237 sds *argv;
1238 int argc, j;
1239
1240 linenum++;
1241 line = sdsnew(buf);
1242 line = sdstrim(line," \t\r\n");
1243
1244 /* Skip comments and blank lines*/
1245 if (line[0] == '#' || line[0] == '\0') {
1246 sdsfree(line);
1247 continue;
1248 }
1249
1250 /* Split into arguments */
1251 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1252 sdstolower(argv[0]);
1253
1254 /* Execute config directives */
bb0b03a3 1255 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
ed9b544e 1256 server.maxidletime = atoi(argv[1]);
0150db36 1257 if (server.maxidletime < 0) {
ed9b544e 1258 err = "Invalid timeout value"; goto loaderr;
1259 }
bb0b03a3 1260 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
ed9b544e 1261 server.port = atoi(argv[1]);
1262 if (server.port < 1 || server.port > 65535) {
1263 err = "Invalid port"; goto loaderr;
1264 }
bb0b03a3 1265 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
ed9b544e 1266 server.bindaddr = zstrdup(argv[1]);
bb0b03a3 1267 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
ed9b544e 1268 int seconds = atoi(argv[1]);
1269 int changes = atoi(argv[2]);
1270 if (seconds < 1 || changes < 0) {
1271 err = "Invalid save parameters"; goto loaderr;
1272 }
1273 appendServerSaveParams(seconds,changes);
bb0b03a3 1274 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
ed9b544e 1275 if (chdir(argv[1]) == -1) {
1276 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1277 argv[1], strerror(errno));
1278 exit(1);
1279 }
bb0b03a3 1280 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1281 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1282 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1283 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
ed9b544e 1284 else {
1285 err = "Invalid log level. Must be one of debug, notice, warning";
1286 goto loaderr;
1287 }
bb0b03a3 1288 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
c9a111ac 1289 FILE *logfp;
ed9b544e 1290
1291 server.logfile = zstrdup(argv[1]);
bb0b03a3 1292 if (!strcasecmp(server.logfile,"stdout")) {
ed9b544e 1293 zfree(server.logfile);
1294 server.logfile = NULL;
1295 }
1296 if (server.logfile) {
1297 /* Test if we are able to open the file. The server will not
1298 * be able to abort just for this problem later... */
c9a111ac 1299 logfp = fopen(server.logfile,"a");
1300 if (logfp == NULL) {
ed9b544e 1301 err = sdscatprintf(sdsempty(),
1302 "Can't open the log file: %s", strerror(errno));
1303 goto loaderr;
1304 }
c9a111ac 1305 fclose(logfp);
ed9b544e 1306 }
bb0b03a3 1307 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
ed9b544e 1308 server.dbnum = atoi(argv[1]);
1309 if (server.dbnum < 1) {
1310 err = "Invalid number of databases"; goto loaderr;
1311 }
285add55 1312 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1313 server.maxclients = atoi(argv[1]);
3fd78bcd 1314 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
d4465900 1315 server.maxmemory = strtoll(argv[1], NULL, 10);
bb0b03a3 1316 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
ed9b544e 1317 server.masterhost = sdsnew(argv[1]);
1318 server.masterport = atoi(argv[2]);
1319 server.replstate = REDIS_REPL_CONNECT;
d0ccebcf 1320 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1321 server.masterauth = zstrdup(argv[1]);
bb0b03a3 1322 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
85dd2f3a 1323 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
ed9b544e 1324 err = "argument must be 'yes' or 'no'"; goto loaderr;
1325 }
bb0b03a3 1326 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
85dd2f3a 1327 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
10c43610 1328 err = "argument must be 'yes' or 'no'"; goto loaderr;
1329 }
e52c65b9 1330 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1331 server.sharingpoolsize = atoi(argv[1]);
1332 if (server.sharingpoolsize < 1) {
1333 err = "invalid object sharing pool size"; goto loaderr;
1334 }
bb0b03a3 1335 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
85dd2f3a 1336 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
ed9b544e 1337 err = "argument must be 'yes' or 'no'"; goto loaderr;
1338 }
44b38ef4 1339 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1340 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1341 err = "argument must be 'yes' or 'no'"; goto loaderr;
1342 }
48f0308a 1343 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1766c6da 1344 if (!strcasecmp(argv[1],"no")) {
48f0308a 1345 server.appendfsync = APPENDFSYNC_NO;
1766c6da 1346 } else if (!strcasecmp(argv[1],"always")) {
48f0308a 1347 server.appendfsync = APPENDFSYNC_ALWAYS;
1766c6da 1348 } else if (!strcasecmp(argv[1],"everysec")) {
48f0308a 1349 server.appendfsync = APPENDFSYNC_EVERYSEC;
1350 } else {
1351 err = "argument must be 'no', 'always' or 'everysec'";
1352 goto loaderr;
1353 }
bb0b03a3 1354 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
abcb223e 1355 server.requirepass = zstrdup(argv[1]);
bb0b03a3 1356 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
ed329fcf 1357 server.pidfile = zstrdup(argv[1]);
bb0b03a3 1358 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
b8b553c8 1359 server.dbfilename = zstrdup(argv[1]);
ed9b544e 1360 } else {
1361 err = "Bad directive or wrong number of arguments"; goto loaderr;
1362 }
1363 for (j = 0; j < argc; j++)
1364 sdsfree(argv[j]);
1365 zfree(argv);
1366 sdsfree(line);
1367 }
c9a111ac 1368 if (fp != stdin) fclose(fp);
ed9b544e 1369 return;
1370
1371loaderr:
1372 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1373 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1374 fprintf(stderr, ">>> '%s'\n", line);
1375 fprintf(stderr, "%s\n", err);
1376 exit(1);
1377}
1378
1379static void freeClientArgv(redisClient *c) {
1380 int j;
1381
1382 for (j = 0; j < c->argc; j++)
1383 decrRefCount(c->argv[j]);
e8a74421 1384 for (j = 0; j < c->mbargc; j++)
1385 decrRefCount(c->mbargv[j]);
ed9b544e 1386 c->argc = 0;
e8a74421 1387 c->mbargc = 0;
ed9b544e 1388}
1389
1390static void freeClient(redisClient *c) {
1391 listNode *ln;
1392
1393 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1394 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1395 sdsfree(c->querybuf);
1396 listRelease(c->reply);
1397 freeClientArgv(c);
1398 close(c->fd);
1399 ln = listSearchKey(server.clients,c);
1400 assert(ln != NULL);
1401 listDelNode(server.clients,ln);
1402 if (c->flags & REDIS_SLAVE) {
6208b3a7 1403 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1404 close(c->repldbfd);
87eca727 1405 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1406 ln = listSearchKey(l,c);
ed9b544e 1407 assert(ln != NULL);
87eca727 1408 listDelNode(l,ln);
ed9b544e 1409 }
1410 if (c->flags & REDIS_MASTER) {
1411 server.master = NULL;
1412 server.replstate = REDIS_REPL_CONNECT;
1413 }
93ea3759 1414 zfree(c->argv);
e8a74421 1415 zfree(c->mbargv);
ed9b544e 1416 zfree(c);
1417}
1418
cc30e368 1419#define GLUEREPLY_UP_TO (1024)
ed9b544e 1420static void glueReplyBuffersIfNeeded(redisClient *c) {
c28b42ac 1421 int copylen = 0;
1422 char buf[GLUEREPLY_UP_TO];
6208b3a7 1423 listNode *ln;
ed9b544e 1424 robj *o;
1425
6208b3a7 1426 listRewind(c->reply);
1427 while((ln = listYield(c->reply))) {
c28b42ac 1428 int objlen;
1429
ed9b544e 1430 o = ln->value;
c28b42ac 1431 objlen = sdslen(o->ptr);
1432 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1433 memcpy(buf+copylen,o->ptr,objlen);
1434 copylen += objlen;
ed9b544e 1435 listDelNode(c->reply,ln);
c28b42ac 1436 } else {
1437 if (copylen == 0) return;
1438 break;
ed9b544e 1439 }
ed9b544e 1440 }
c28b42ac 1441 /* Now the output buffer is empty, add the new single element */
1442 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1443 listAddNodeHead(c->reply,o);
ed9b544e 1444}
1445
1446static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1447 redisClient *c = privdata;
1448 int nwritten = 0, totwritten = 0, objlen;
1449 robj *o;
1450 REDIS_NOTUSED(el);
1451 REDIS_NOTUSED(mask);
1452
2895e862 1453 /* Use writev() if we have enough buffers to send */
7ea870c0 1454 if (!server.glueoutputbuf &&
1455 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1456 !(c->flags & REDIS_MASTER))
2895e862 1457 {
1458 sendReplyToClientWritev(el, fd, privdata, mask);
1459 return;
1460 }
2895e862 1461
ed9b544e 1462 while(listLength(c->reply)) {
c28b42ac 1463 if (server.glueoutputbuf && listLength(c->reply) > 1)
1464 glueReplyBuffersIfNeeded(c);
1465
ed9b544e 1466 o = listNodeValue(listFirst(c->reply));
1467 objlen = sdslen(o->ptr);
1468
1469 if (objlen == 0) {
1470 listDelNode(c->reply,listFirst(c->reply));
1471 continue;
1472 }
1473
1474 if (c->flags & REDIS_MASTER) {
6f376729 1475 /* Don't reply to a master */
ed9b544e 1476 nwritten = objlen - c->sentlen;
1477 } else {
a4d1ba9a 1478 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
ed9b544e 1479 if (nwritten <= 0) break;
1480 }
1481 c->sentlen += nwritten;
1482 totwritten += nwritten;
1483 /* If we fully sent the object on head go to the next one */
1484 if (c->sentlen == objlen) {
1485 listDelNode(c->reply,listFirst(c->reply));
1486 c->sentlen = 0;
1487 }
6f376729 1488 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
12f9d551 1489 * bytes, in a single threaded server it's a good idea to serve
6f376729 1490 * other clients as well, even if a very large request comes from
1491 * super fast link that is always able to accept data (in real world
12f9d551 1492 * scenario think about 'KEYS *' against the loopback interfae) */
6f376729 1493 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
ed9b544e 1494 }
1495 if (nwritten == -1) {
1496 if (errno == EAGAIN) {
1497 nwritten = 0;
1498 } else {
1499 redisLog(REDIS_DEBUG,
1500 "Error writing to client: %s", strerror(errno));
1501 freeClient(c);
1502 return;
1503 }
1504 }
1505 if (totwritten > 0) c->lastinteraction = time(NULL);
1506 if (listLength(c->reply) == 0) {
1507 c->sentlen = 0;
1508 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1509 }
1510}
1511
2895e862 1512static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1513{
1514 redisClient *c = privdata;
1515 int nwritten = 0, totwritten = 0, objlen, willwrite;
1516 robj *o;
1517 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1518 int offset, ion = 0;
1519 REDIS_NOTUSED(el);
1520 REDIS_NOTUSED(mask);
1521
1522 listNode *node;
1523 while (listLength(c->reply)) {
1524 offset = c->sentlen;
1525 ion = 0;
1526 willwrite = 0;
1527
1528 /* fill-in the iov[] array */
1529 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1530 o = listNodeValue(node);
1531 objlen = sdslen(o->ptr);
1532
1533 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1534 break;
1535
1536 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1537 break; /* no more iovecs */
1538
1539 iov[ion].iov_base = ((char*)o->ptr) + offset;
1540 iov[ion].iov_len = objlen - offset;
1541 willwrite += objlen - offset;
1542 offset = 0; /* just for the first item */
1543 ion++;
1544 }
1545
1546 if(willwrite == 0)
1547 break;
1548
1549 /* write all collected blocks at once */
1550 if((nwritten = writev(fd, iov, ion)) < 0) {
1551 if (errno != EAGAIN) {
1552 redisLog(REDIS_DEBUG,
1553 "Error writing to client: %s", strerror(errno));
1554 freeClient(c);
1555 return;
1556 }
1557 break;
1558 }
1559
1560 totwritten += nwritten;
1561 offset = c->sentlen;
1562
1563 /* remove written robjs from c->reply */
1564 while (nwritten && listLength(c->reply)) {
1565 o = listNodeValue(listFirst(c->reply));
1566 objlen = sdslen(o->ptr);
1567
1568 if(nwritten >= objlen - offset) {
1569 listDelNode(c->reply, listFirst(c->reply));
1570 nwritten -= objlen - offset;
1571 c->sentlen = 0;
1572 } else {
1573 /* partial write */
1574 c->sentlen += nwritten;
1575 break;
1576 }
1577 offset = 0;
1578 }
1579 }
1580
1581 if (totwritten > 0)
1582 c->lastinteraction = time(NULL);
1583
1584 if (listLength(c->reply) == 0) {
1585 c->sentlen = 0;
1586 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1587 }
1588}
1589
ed9b544e 1590static struct redisCommand *lookupCommand(char *name) {
1591 int j = 0;
1592 while(cmdTable[j].name != NULL) {
bb0b03a3 1593 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
ed9b544e 1594 j++;
1595 }
1596 return NULL;
1597}
1598
1599/* resetClient prepare the client to process the next command */
1600static void resetClient(redisClient *c) {
1601 freeClientArgv(c);
1602 c->bulklen = -1;
e8a74421 1603 c->multibulk = 0;
ed9b544e 1604}
1605
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1614static int processCommand(redisClient *c) {
1615 struct redisCommand *cmd;
1616 long long dirty;
1617
3fd78bcd 1618 /* Free some memory if needed (maxmemory setting) */
1619 if (server.maxmemory) freeMemoryIfNeeded();
1620
e8a74421 1621 /* Handle the multi bulk command type. This is an alternative protocol
1622 * supported by Redis in order to receive commands that are composed of
1623 * multiple binary-safe "bulk" arguments. The latency of processing is
1624 * a bit higher but this allows things like multi-sets, so if this
1625 * protocol is used only for MSET and similar commands this is a big win. */
1626 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1627 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1628 if (c->multibulk <= 0) {
1629 resetClient(c);
1630 return 1;
1631 } else {
1632 decrRefCount(c->argv[c->argc-1]);
1633 c->argc--;
1634 return 1;
1635 }
1636 } else if (c->multibulk) {
1637 if (c->bulklen == -1) {
1638 if (((char*)c->argv[0]->ptr)[0] != '$') {
1639 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1640 resetClient(c);
1641 return 1;
1642 } else {
1643 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1644 decrRefCount(c->argv[0]);
1645 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1646 c->argc--;
1647 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1648 resetClient(c);
1649 return 1;
1650 }
1651 c->argc--;
1652 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1653 return 1;
1654 }
1655 } else {
1656 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1657 c->mbargv[c->mbargc] = c->argv[0];
1658 c->mbargc++;
1659 c->argc--;
1660 c->multibulk--;
1661 if (c->multibulk == 0) {
1662 robj **auxargv;
1663 int auxargc;
1664
1665 /* Here we need to swap the multi-bulk argc/argv with the
1666 * normal argc/argv of the client structure. */
1667 auxargv = c->argv;
1668 c->argv = c->mbargv;
1669 c->mbargv = auxargv;
1670
1671 auxargc = c->argc;
1672 c->argc = c->mbargc;
1673 c->mbargc = auxargc;
1674
1675 /* We need to set bulklen to something different than -1
1676 * in order for the code below to process the command without
1677 * to try to read the last argument of a bulk command as
1678 * a special argument. */
1679 c->bulklen = 0;
1680 /* continue below and process the command */
1681 } else {
1682 c->bulklen = -1;
1683 return 1;
1684 }
1685 }
1686 }
1687 /* -- end of multi bulk commands processing -- */
1688
ed9b544e 1689 /* The QUIT command is handled as a special case. Normal command
1690 * procs are unable to close the client connection safely */
bb0b03a3 1691 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
ed9b544e 1692 freeClient(c);
1693 return 0;
1694 }
1695 cmd = lookupCommand(c->argv[0]->ptr);
1696 if (!cmd) {
1697 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1698 resetClient(c);
1699 return 1;
1700 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1701 (c->argc < -cmd->arity)) {
1702 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1703 resetClient(c);
1704 return 1;
3fd78bcd 1705 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1706 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1707 resetClient(c);
1708 return 1;
ed9b544e 1709 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1710 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1711
1712 decrRefCount(c->argv[c->argc-1]);
1713 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1714 c->argc--;
1715 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1716 resetClient(c);
1717 return 1;
1718 }
1719 c->argc--;
1720 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1721 /* It is possible that the bulk read is already in the
8d0490e7 1722 * buffer. Check this condition and handle it accordingly.
1723 * This is just a fast path, alternative to call processInputBuffer().
1724 * It's a good idea since the code is small and this condition
1725 * happens most of the times. */
ed9b544e 1726 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1727 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1728 c->argc++;
1729 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1730 } else {
1731 return 1;
1732 }
1733 }
10c43610 1734 /* Let's try to share objects on the command arguments vector */
1735 if (server.shareobjects) {
1736 int j;
1737 for(j = 1; j < c->argc; j++)
1738 c->argv[j] = tryObjectSharing(c->argv[j]);
1739 }
942a3961 1740 /* Let's try to encode the bulk object to save space. */
1741 if (cmd->flags & REDIS_CMD_BULK)
1742 tryObjectEncoding(c->argv[c->argc-1]);
1743
e63943a4 1744 /* Check if the user is authenticated */
1745 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1746 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1747 resetClient(c);
1748 return 1;
1749 }
1750
ed9b544e 1751 /* Exec the command */
1752 dirty = server.dirty;
1753 cmd->proc(c);
33ed1a42 1754 if (server.appendonly && server.dirty-dirty)
44b38ef4 1755 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
33ed1a42 1756 if (server.dirty-dirty && listLength(server.slaves))
3305306f 1757 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
87eca727 1758 if (listLength(server.monitors))
3305306f 1759 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
ed9b544e 1760 server.stat_numcommands++;
1761
1762 /* Prepare the client for the next command */
1763 if (c->flags & REDIS_CLOSE) {
1764 freeClient(c);
1765 return 0;
1766 }
1767 resetClient(c);
1768 return 1;
1769}
1770
87eca727 1771static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6208b3a7 1772 listNode *ln;
ed9b544e 1773 int outc = 0, j;
93ea3759 1774 robj **outv;
1775 /* (args*2)+1 is enough room for args, spaces, newlines */
1776 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1777
1778 if (argc <= REDIS_STATIC_ARGS) {
1779 outv = static_outv;
1780 } else {
1781 outv = zmalloc(sizeof(robj*)*(argc*2+1));
93ea3759 1782 }
ed9b544e 1783
1784 for (j = 0; j < argc; j++) {
1785 if (j != 0) outv[outc++] = shared.space;
1786 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1787 robj *lenobj;
1788
1789 lenobj = createObject(REDIS_STRING,
0ea663ea 1790 sdscatprintf(sdsempty(),"%d\r\n",
1791 stringObjectLen(argv[j])));
ed9b544e 1792 lenobj->refcount = 0;
1793 outv[outc++] = lenobj;
1794 }
1795 outv[outc++] = argv[j];
1796 }
1797 outv[outc++] = shared.crlf;
1798
40d224a9 1799 /* Increment all the refcounts at start and decrement at end in order to
1800 * be sure to free objects if there is no slave in a replication state
1801 * able to be feed with commands */
1802 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
6208b3a7 1803 listRewind(slaves);
1804 while((ln = listYield(slaves))) {
ed9b544e 1805 redisClient *slave = ln->value;
40d224a9 1806
1807 /* Don't feed slaves that are still waiting for BGSAVE to start */
6208b3a7 1808 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
40d224a9 1809
1810 /* Feed all the other slaves, MONITORs and so on */
ed9b544e 1811 if (slave->slaveseldb != dictid) {
1812 robj *selectcmd;
1813
1814 switch(dictid) {
1815 case 0: selectcmd = shared.select0; break;
1816 case 1: selectcmd = shared.select1; break;
1817 case 2: selectcmd = shared.select2; break;
1818 case 3: selectcmd = shared.select3; break;
1819 case 4: selectcmd = shared.select4; break;
1820 case 5: selectcmd = shared.select5; break;
1821 case 6: selectcmd = shared.select6; break;
1822 case 7: selectcmd = shared.select7; break;
1823 case 8: selectcmd = shared.select8; break;
1824 case 9: selectcmd = shared.select9; break;
1825 default:
1826 selectcmd = createObject(REDIS_STRING,
1827 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1828 selectcmd->refcount = 0;
1829 break;
1830 }
1831 addReply(slave,selectcmd);
1832 slave->slaveseldb = dictid;
1833 }
1834 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
ed9b544e 1835 }
40d224a9 1836 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
93ea3759 1837 if (outv != static_outv) zfree(outv);
ed9b544e 1838}
1839
638e42ac 1840static void processInputBuffer(redisClient *c) {
ed9b544e 1841again:
1842 if (c->bulklen == -1) {
1843 /* Read the first line of the query */
1844 char *p = strchr(c->querybuf,'\n');
1845 size_t querylen;
644fafa3 1846
ed9b544e 1847 if (p) {
1848 sds query, *argv;
1849 int argc, j;
1850
1851 query = c->querybuf;
1852 c->querybuf = sdsempty();
1853 querylen = 1+(p-(query));
1854 if (sdslen(query) > querylen) {
1855 /* leave data after the first line of the query in the buffer */
1856 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1857 }
1858 *p = '\0'; /* remove "\n" */
1859 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1860 sdsupdatelen(query);
1861
1862 /* Now we can split the query in arguments */
1863 if (sdslen(query) == 0) {
1864 /* Ignore empty query */
1865 sdsfree(query);
1866 return;
1867 }
1868 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
93ea3759 1869 sdsfree(query);
1870
1871 if (c->argv) zfree(c->argv);
1872 c->argv = zmalloc(sizeof(robj*)*argc);
93ea3759 1873
1874 for (j = 0; j < argc; j++) {
ed9b544e 1875 if (sdslen(argv[j])) {
1876 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1877 c->argc++;
1878 } else {
1879 sdsfree(argv[j]);
1880 }
1881 }
1882 zfree(argv);
1883 /* Execute the command. If the client is still valid
1884 * after processCommand() return and there is something
1885 * on the query buffer try to process the next command. */
af807d87 1886 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
ed9b544e 1887 return;
644fafa3 1888 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
ed9b544e 1889 redisLog(REDIS_DEBUG, "Client protocol error");
1890 freeClient(c);
1891 return;
1892 }
1893 } else {
1894 /* Bulk read handling. Note that if we are at this point
1895 the client already sent a command terminated with a newline,
1896 we are reading the bulk data that is actually the last
1897 argument of the command. */
1898 int qbl = sdslen(c->querybuf);
1899
1900 if (c->bulklen <= qbl) {
1901 /* Copy everything but the final CRLF as final argument */
1902 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1903 c->argc++;
1904 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
638e42ac 1905 /* Process the command. If the client is still valid after
1906 * the processing and there is more data in the buffer
1907 * try to parse it. */
1908 if (processCommand(c) && sdslen(c->querybuf)) goto again;
ed9b544e 1909 return;
1910 }
1911 }
1912}
1913
638e42ac 1914static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1915 redisClient *c = (redisClient*) privdata;
1916 char buf[REDIS_IOBUF_LEN];
1917 int nread;
1918 REDIS_NOTUSED(el);
1919 REDIS_NOTUSED(mask);
1920
1921 nread = read(fd, buf, REDIS_IOBUF_LEN);
1922 if (nread == -1) {
1923 if (errno == EAGAIN) {
1924 nread = 0;
1925 } else {
1926 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1927 freeClient(c);
1928 return;
1929 }
1930 } else if (nread == 0) {
1931 redisLog(REDIS_DEBUG, "Client closed connection");
1932 freeClient(c);
1933 return;
1934 }
1935 if (nread) {
1936 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1937 c->lastinteraction = time(NULL);
1938 } else {
1939 return;
1940 }
1941 processInputBuffer(c);
1942}
1943
ed9b544e 1944static int selectDb(redisClient *c, int id) {
1945 if (id < 0 || id >= server.dbnum)
1946 return REDIS_ERR;
3305306f 1947 c->db = &server.db[id];
ed9b544e 1948 return REDIS_OK;
1949}
1950
40d224a9 1951static void *dupClientReplyValue(void *o) {
1952 incrRefCount((robj*)o);
1953 return 0;
1954}
1955
ed9b544e 1956static redisClient *createClient(int fd) {
1957 redisClient *c = zmalloc(sizeof(*c));
1958
1959 anetNonBlock(NULL,fd);
1960 anetTcpNoDelay(NULL,fd);
1961 if (!c) return NULL;
1962 selectDb(c,0);
1963 c->fd = fd;
1964 c->querybuf = sdsempty();
1965 c->argc = 0;
93ea3759 1966 c->argv = NULL;
ed9b544e 1967 c->bulklen = -1;
e8a74421 1968 c->multibulk = 0;
1969 c->mbargc = 0;
1970 c->mbargv = NULL;
ed9b544e 1971 c->sentlen = 0;
1972 c->flags = 0;
1973 c->lastinteraction = time(NULL);
abcb223e 1974 c->authenticated = 0;
40d224a9 1975 c->replstate = REDIS_REPL_NONE;
6b47e12e 1976 c->reply = listCreate();
ed9b544e 1977 listSetFreeMethod(c->reply,decrRefCount);
40d224a9 1978 listSetDupMethod(c->reply,dupClientReplyValue);
ed9b544e 1979 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
266373b2 1980 readQueryFromClient, c) == AE_ERR) {
ed9b544e 1981 freeClient(c);
1982 return NULL;
1983 }
6b47e12e 1984 listAddNodeTail(server.clients,c);
ed9b544e 1985 return c;
1986}
1987
1988static void addReply(redisClient *c, robj *obj) {
1989 if (listLength(c->reply) == 0 &&
6208b3a7 1990 (c->replstate == REDIS_REPL_NONE ||
1991 c->replstate == REDIS_REPL_ONLINE) &&
ed9b544e 1992 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
266373b2 1993 sendReplyToClient, c) == AE_ERR) return;
9d65a1bb 1994 listAddNodeTail(c->reply,getDecodedObject(obj));
ed9b544e 1995}
1996
1997static void addReplySds(redisClient *c, sds s) {
1998 robj *o = createObject(REDIS_STRING,s);
1999 addReply(c,o);
2000 decrRefCount(o);
2001}
2002
e2665397 2003static void addReplyDouble(redisClient *c, double d) {
2004 char buf[128];
2005
2006 snprintf(buf,sizeof(buf),"%.17g",d);
2007 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
2008 strlen(buf),buf));
2009}
2010
942a3961 2011static void addReplyBulkLen(redisClient *c, robj *obj) {
2012 size_t len;
2013
2014 if (obj->encoding == REDIS_ENCODING_RAW) {
2015 len = sdslen(obj->ptr);
2016 } else {
2017 long n = (long)obj->ptr;
2018
2019 len = 1;
2020 if (n < 0) {
2021 len++;
2022 n = -n;
2023 }
2024 while((n = n/10) != 0) {
2025 len++;
2026 }
2027 }
2028 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
2029}
2030
ed9b544e 2031static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2032 int cport, cfd;
2033 char cip[128];
285add55 2034 redisClient *c;
ed9b544e 2035 REDIS_NOTUSED(el);
2036 REDIS_NOTUSED(mask);
2037 REDIS_NOTUSED(privdata);
2038
2039 cfd = anetAccept(server.neterr, fd, cip, &cport);
2040 if (cfd == AE_ERR) {
2041 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2042 return;
2043 }
2044 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
285add55 2045 if ((c = createClient(cfd)) == NULL) {
ed9b544e 2046 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2047 close(cfd); /* May be already closed, just ingore errors */
2048 return;
2049 }
285add55 2050 /* If maxclient directive is set and this is one client more... close the
2051 * connection. Note that we create the client instead to check before
2052 * for this condition, since now the socket is already set in nonblocking
2053 * mode and we can send an error for free using the Kernel I/O */
2054 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2055 char *err = "-ERR max number of clients reached\r\n";
2056
2057 /* That's a best effort error message, don't check write errors */
fee803ba 2058 if (write(c->fd,err,strlen(err)) == -1) {
2059 /* Nothing to do, Just to avoid the warning... */
2060 }
285add55 2061 freeClient(c);
2062 return;
2063 }
ed9b544e 2064 server.stat_numconnections++;
2065}
2066
2067/* ======================= Redis objects implementation ===================== */
2068
2069static robj *createObject(int type, void *ptr) {
2070 robj *o;
2071
2072 if (listLength(server.objfreelist)) {
2073 listNode *head = listFirst(server.objfreelist);
2074 o = listNodeValue(head);
2075 listDelNode(server.objfreelist,head);
2076 } else {
2077 o = zmalloc(sizeof(*o));
2078 }
ed9b544e 2079 o->type = type;
942a3961 2080 o->encoding = REDIS_ENCODING_RAW;
ed9b544e 2081 o->ptr = ptr;
2082 o->refcount = 1;
2083 return o;
2084}
2085
2086static robj *createStringObject(char *ptr, size_t len) {
2087 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2088}
2089
2090static robj *createListObject(void) {
2091 list *l = listCreate();
2092
ed9b544e 2093 listSetFreeMethod(l,decrRefCount);
2094 return createObject(REDIS_LIST,l);
2095}
2096
2097static robj *createSetObject(void) {
2098 dict *d = dictCreate(&setDictType,NULL);
ed9b544e 2099 return createObject(REDIS_SET,d);
2100}
2101
1812e024 2102static robj *createZsetObject(void) {
6b47e12e 2103 zset *zs = zmalloc(sizeof(*zs));
2104
2105 zs->dict = dictCreate(&zsetDictType,NULL);
2106 zs->zsl = zslCreate();
2107 return createObject(REDIS_ZSET,zs);
1812e024 2108}
2109
ed9b544e 2110static void freeStringObject(robj *o) {
942a3961 2111 if (o->encoding == REDIS_ENCODING_RAW) {
2112 sdsfree(o->ptr);
2113 }
ed9b544e 2114}
2115
2116static void freeListObject(robj *o) {
2117 listRelease((list*) o->ptr);
2118}
2119
2120static void freeSetObject(robj *o) {
2121 dictRelease((dict*) o->ptr);
2122}
2123
fd8ccf44 2124static void freeZsetObject(robj *o) {
2125 zset *zs = o->ptr;
2126
2127 dictRelease(zs->dict);
2128 zslFree(zs->zsl);
2129 zfree(zs);
2130}
2131
ed9b544e 2132static void freeHashObject(robj *o) {
2133 dictRelease((dict*) o->ptr);
2134}
2135
2136static void incrRefCount(robj *o) {
2137 o->refcount++;
94754ccc 2138#ifdef DEBUG_REFCOUNT
2139 if (o->type == REDIS_STRING)
2140 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2141#endif
ed9b544e 2142}
2143
2144static void decrRefCount(void *obj) {
2145 robj *o = obj;
94754ccc 2146
2147#ifdef DEBUG_REFCOUNT
2148 if (o->type == REDIS_STRING)
2149 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2150#endif
ed9b544e 2151 if (--(o->refcount) == 0) {
2152 switch(o->type) {
2153 case REDIS_STRING: freeStringObject(o); break;
2154 case REDIS_LIST: freeListObject(o); break;
2155 case REDIS_SET: freeSetObject(o); break;
fd8ccf44 2156 case REDIS_ZSET: freeZsetObject(o); break;
ed9b544e 2157 case REDIS_HASH: freeHashObject(o); break;
2158 default: assert(0 != 0); break;
2159 }
2160 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2161 !listAddNodeHead(server.objfreelist,o))
2162 zfree(o);
2163 }
2164}
2165
942a3961 2166static robj *lookupKey(redisDb *db, robj *key) {
2167 dictEntry *de = dictFind(db->dict,key);
2168 return de ? dictGetEntryVal(de) : NULL;
2169}
2170
2171static robj *lookupKeyRead(redisDb *db, robj *key) {
2172 expireIfNeeded(db,key);
2173 return lookupKey(db,key);
2174}
2175
2176static robj *lookupKeyWrite(redisDb *db, robj *key) {
2177 deleteIfVolatile(db,key);
2178 return lookupKey(db,key);
2179}
2180
2181static int deleteKey(redisDb *db, robj *key) {
2182 int retval;
2183
2184 /* We need to protect key from destruction: after the first dictDelete()
2185 * it may happen that 'key' is no longer valid if we don't increment
2186 * it's count. This may happen when we get the object reference directly
2187 * from the hash table with dictRandomKey() or dict iterators */
2188 incrRefCount(key);
2189 if (dictSize(db->expires)) dictDelete(db->expires,key);
2190 retval = dictDelete(db->dict,key);
2191 decrRefCount(key);
2192
2193 return retval == DICT_OK;
2194}
2195
10c43610 2196/* Try to share an object against the shared objects pool */
2197static robj *tryObjectSharing(robj *o) {
2198 struct dictEntry *de;
2199 unsigned long c;
2200
3305306f 2201 if (o == NULL || server.shareobjects == 0) return o;
10c43610 2202
2203 assert(o->type == REDIS_STRING);
2204 de = dictFind(server.sharingpool,o);
2205 if (de) {
2206 robj *shared = dictGetEntryKey(de);
2207
2208 c = ((unsigned long) dictGetEntryVal(de))+1;
2209 dictGetEntryVal(de) = (void*) c;
2210 incrRefCount(shared);
2211 decrRefCount(o);
2212 return shared;
2213 } else {
2214 /* Here we are using a stream algorihtm: Every time an object is
2215 * shared we increment its count, everytime there is a miss we
2216 * recrement the counter of a random object. If this object reaches
2217 * zero we remove the object and put the current object instead. */
3305306f 2218 if (dictSize(server.sharingpool) >=
10c43610 2219 server.sharingpoolsize) {
2220 de = dictGetRandomKey(server.sharingpool);
2221 assert(de != NULL);
2222 c = ((unsigned long) dictGetEntryVal(de))-1;
2223 dictGetEntryVal(de) = (void*) c;
2224 if (c == 0) {
2225 dictDelete(server.sharingpool,de->key);
2226 }
2227 } else {
2228 c = 0; /* If the pool is empty we want to add this object */
2229 }
2230 if (c == 0) {
2231 int retval;
2232
2233 retval = dictAdd(server.sharingpool,o,(void*)1);
2234 assert(retval == DICT_OK);
2235 incrRefCount(o);
2236 }
2237 return o;
2238 }
2239}
2240
724a51b1 2241/* Check if the nul-terminated string 's' can be represented by a long
2242 * (that is, is a number that fits into long without any other space or
2243 * character before or after the digits).
2244 *
2245 * If so, the function returns REDIS_OK and *longval is set to the value
2246 * of the number. Otherwise REDIS_ERR is returned */
f69f2cba 2247static int isStringRepresentableAsLong(sds s, long *longval) {
724a51b1 2248 char buf[32], *endptr;
2249 long value;
2250 int slen;
2251
2252 value = strtol(s, &endptr, 10);
2253 if (endptr[0] != '\0') return REDIS_ERR;
2254 slen = snprintf(buf,32,"%ld",value);
2255
2256 /* If the number converted back into a string is not identical
2257 * then it's not possible to encode the string as integer */
f69f2cba 2258 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
724a51b1 2259 if (longval) *longval = value;
2260 return REDIS_OK;
2261}
2262
942a3961 2263/* Try to encode a string object in order to save space */
2264static int tryObjectEncoding(robj *o) {
2265 long value;
942a3961 2266 sds s = o->ptr;
3305306f 2267
942a3961 2268 if (o->encoding != REDIS_ENCODING_RAW)
2269 return REDIS_ERR; /* Already encoded */
3305306f 2270
942a3961 2271 /* It's not save to encode shared objects: shared objects can be shared
2272 * everywhere in the "object space" of Redis. Encoded objects can only
2273 * appear as "values" (and not, for instance, as keys) */
2274 if (o->refcount > 1) return REDIS_ERR;
3305306f 2275
942a3961 2276 /* Currently we try to encode only strings */
2277 assert(o->type == REDIS_STRING);
94754ccc 2278
724a51b1 2279 /* Check if we can represent this string as a long integer */
2280 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
942a3961 2281
2282 /* Ok, this object can be encoded */
2283 o->encoding = REDIS_ENCODING_INT;
2284 sdsfree(o->ptr);
2285 o->ptr = (void*) value;
2286 return REDIS_OK;
2287}
2288
9d65a1bb 2289/* Get a decoded version of an encoded object (returned as a new object).
2290 * If the object is already raw-encoded just increment the ref count. */
2291static robj *getDecodedObject(robj *o) {
942a3961 2292 robj *dec;
2293
9d65a1bb 2294 if (o->encoding == REDIS_ENCODING_RAW) {
2295 incrRefCount(o);
2296 return o;
2297 }
942a3961 2298 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2299 char buf[32];
2300
2301 snprintf(buf,32,"%ld",(long)o->ptr);
2302 dec = createStringObject(buf,strlen(buf));
2303 return dec;
2304 } else {
2305 assert(1 != 1);
2306 }
3305306f 2307}
2308
d7f43c08 2309/* Compare two string objects via strcmp() or alike.
2310 * Note that the objects may be integer-encoded. In such a case we
2311 * use snprintf() to get a string representation of the numbers on the stack
1fd9bc8a 2312 * and compare the strings, it's much faster than calling getDecodedObject().
2313 *
2314 * Important note: if objects are not integer encoded, but binary-safe strings,
2315 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2316 * binary safe. */
724a51b1 2317static int compareStringObjects(robj *a, robj *b) {
2318 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
d7f43c08 2319 char bufa[128], bufb[128], *astr, *bstr;
2320 int bothsds = 1;
724a51b1 2321
e197b441 2322 if (a == b) return 0;
d7f43c08 2323 if (a->encoding != REDIS_ENCODING_RAW) {
2324 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2325 astr = bufa;
2326 bothsds = 0;
724a51b1 2327 } else {
d7f43c08 2328 astr = a->ptr;
724a51b1 2329 }
d7f43c08 2330 if (b->encoding != REDIS_ENCODING_RAW) {
2331 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2332 bstr = bufb;
2333 bothsds = 0;
2334 } else {
2335 bstr = b->ptr;
2336 }
2337 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
724a51b1 2338}
2339
0ea663ea 2340static size_t stringObjectLen(robj *o) {
2341 assert(o->type == REDIS_STRING);
2342 if (o->encoding == REDIS_ENCODING_RAW) {
2343 return sdslen(o->ptr);
2344 } else {
2345 char buf[32];
2346
2347 return snprintf(buf,32,"%ld",(long)o->ptr);
2348 }
2349}
2350
ed9b544e 2351/*============================ DB saving/loading ============================ */
2352
/* Write the one byte 'type' on 'fp'. Returns 0 on success, -1 on write
 * error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2357
/* Write 't' on 'fp' as a 32 bit signed integer in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t secs = (int32_t) t;

    return (fwrite(&secs,sizeof(secs),1,fp) == 0) ? -1 : 0;
}
2363
e3566d4b 2364/* check rdbLoadLen() comments for more info */
f78fd11b 2365static int rdbSaveLen(FILE *fp, uint32_t len) {
2366 unsigned char buf[2];
2367
2368 if (len < (1<<6)) {
2369 /* Save a 6 bit len */
10c43610 2370 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
f78fd11b 2371 if (fwrite(buf,1,1,fp) == 0) return -1;
2372 } else if (len < (1<<14)) {
2373 /* Save a 14 bit len */
10c43610 2374 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
f78fd11b 2375 buf[1] = len&0xFF;
17be1a4a 2376 if (fwrite(buf,2,1,fp) == 0) return -1;
f78fd11b 2377 } else {
2378 /* Save a 32 bit len */
10c43610 2379 buf[0] = (REDIS_RDB_32BITLEN<<6);
f78fd11b 2380 if (fwrite(buf,1,1,fp) == 0) return -1;
2381 len = htonl(len);
2382 if (fwrite(&len,4,1,fp) == 0) return -1;
2383 }
2384 return 0;
2385}
2386
e3566d4b 2387/* String objects in the form "2391" "-100" without any space and with a
2388 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2389 * encoded as integers to save space */
56906eef 2390static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
e3566d4b 2391 long long value;
2392 char *endptr, buf[32];
2393
2394 /* Check if it's possible to encode this value as a number */
2395 value = strtoll(s, &endptr, 10);
2396 if (endptr[0] != '\0') return 0;
2397 snprintf(buf,32,"%lld",value);
2398
2399 /* If the number converted back into a string is not identical
2400 * then it's not possible to encode the string as integer */
2401 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2402
2403 /* Finally check if it fits in our ranges */
2404 if (value >= -(1<<7) && value <= (1<<7)-1) {
2405 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2406 enc[1] = value&0xFF;
2407 return 2;
2408 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2409 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2410 enc[1] = value&0xFF;
2411 enc[2] = (value>>8)&0xFF;
2412 return 3;
2413 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2414 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2415 enc[1] = value&0xFF;
2416 enc[2] = (value>>8)&0xFF;
2417 enc[3] = (value>>16)&0xFF;
2418 enc[4] = (value>>24)&0xFF;
2419 return 5;
2420 } else {
2421 return 0;
2422 }
2423}
2424
774e3047 2425static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2426 unsigned int comprlen, outlen;
2427 unsigned char byte;
2428 void *out;
2429
2430 /* We require at least four bytes compression for this to be worth it */
2431 outlen = sdslen(obj->ptr)-4;
2432 if (outlen <= 0) return 0;
3a2694c4 2433 if ((out = zmalloc(outlen+1)) == NULL) return 0;
774e3047 2434 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2435 if (comprlen == 0) {
88e85998 2436 zfree(out);
774e3047 2437 return 0;
2438 }
2439 /* Data compressed! Let's save it on disk */
2440 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2441 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2442 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2443 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2444 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
88e85998 2445 zfree(out);
774e3047 2446 return comprlen;
2447
2448writeerr:
88e85998 2449 zfree(out);
774e3047 2450 return -1;
2451}
2452
e3566d4b 2453/* Save a string objet as [len][data] on disk. If the object is a string
2454 * representation of an integer value we try to safe it in a special form */
942a3961 2455static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2456 size_t len;
e3566d4b 2457 int enclen;
10c43610 2458
942a3961 2459 len = sdslen(obj->ptr);
2460
774e3047 2461 /* Try integer encoding */
e3566d4b 2462 if (len <= 11) {
2463 unsigned char buf[5];
2464 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2465 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2466 return 0;
2467 }
2468 }
774e3047 2469
2470 /* Try LZF compression - under 20 bytes it's unable to compress even
88e85998 2471 * aaaaaaaaaaaaaaaaaa so skip it */
942a3961 2472 if (len > 20) {
774e3047 2473 int retval;
2474
2475 retval = rdbSaveLzfStringObject(fp,obj);
2476 if (retval == -1) return -1;
2477 if (retval > 0) return 0;
2478 /* retval == 0 means data can't be compressed, save the old way */
2479 }
2480
2481 /* Store verbatim */
10c43610 2482 if (rdbSaveLen(fp,len) == -1) return -1;
2483 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2484 return 0;
2485}
2486
942a3961 2487/* Like rdbSaveStringObjectRaw() but handle encoded objects */
2488static int rdbSaveStringObject(FILE *fp, robj *obj) {
2489 int retval;
942a3961 2490
9d65a1bb 2491 obj = getDecodedObject(obj);
2492 retval = rdbSaveStringObjectRaw(fp,obj);
2493 decrRefCount(obj);
2494 return retval;
942a3961 2495}
2496
/* Save a double value. Doubles are saved as strings ("%.17g" format)
 * prefixed by an unsigned 8 bit integer specifying the length of the
 * representation. This 8 bit integer has special values in order to specify
 * the following conditions, in which case no string follows:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char out[128];
    int total;

    if (isnan(val)) {
        out[0] = 253;
        total = 1;
    } else if (isfinite(val)) {
        /* Length byte first, then the textual representation */
        snprintf((char*)out+1,sizeof(out)-1,"%.17g",val);
        out[0] = strlen((char*)out+1);
        total = out[0]+1;
    } else {
        out[0] = (val < 0) ? 255 : 254;
        total = 1;
    }
    return (fwrite(out,total,1,fp) == 0) ? -1 : 0;
}
2523
ed9b544e 2524/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
f78fd11b 2525static int rdbSave(char *filename) {
ed9b544e 2526 dictIterator *di = NULL;
2527 dictEntry *de;
ed9b544e 2528 FILE *fp;
2529 char tmpfile[256];
2530 int j;
bb32ede5 2531 time_t now = time(NULL);
ed9b544e 2532
a3b21203 2533 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
ed9b544e 2534 fp = fopen(tmpfile,"w");
2535 if (!fp) {
2536 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2537 return REDIS_ERR;
2538 }
f78fd11b 2539 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
ed9b544e 2540 for (j = 0; j < server.dbnum; j++) {
bb32ede5 2541 redisDb *db = server.db+j;
2542 dict *d = db->dict;
3305306f 2543 if (dictSize(d) == 0) continue;
ed9b544e 2544 di = dictGetIterator(d);
2545 if (!di) {
2546 fclose(fp);
2547 return REDIS_ERR;
2548 }
2549
2550 /* Write the SELECT DB opcode */
f78fd11b 2551 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2552 if (rdbSaveLen(fp,j) == -1) goto werr;
ed9b544e 2553
2554 /* Iterate this DB writing every entry */
2555 while((de = dictNext(di)) != NULL) {
2556 robj *key = dictGetEntryKey(de);
2557 robj *o = dictGetEntryVal(de);
bb32ede5 2558 time_t expiretime = getExpire(db,key);
2559
2560 /* Save the expire time */
2561 if (expiretime != -1) {
2562 /* If this key is already expired skip it */
2563 if (expiretime < now) continue;
2564 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2565 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2566 }
2567 /* Save the key and associated value */
f78fd11b 2568 if (rdbSaveType(fp,o->type) == -1) goto werr;
10c43610 2569 if (rdbSaveStringObject(fp,key) == -1) goto werr;
f78fd11b 2570 if (o->type == REDIS_STRING) {
ed9b544e 2571 /* Save a string value */
10c43610 2572 if (rdbSaveStringObject(fp,o) == -1) goto werr;
f78fd11b 2573 } else if (o->type == REDIS_LIST) {
ed9b544e 2574 /* Save a list value */
2575 list *list = o->ptr;
6208b3a7 2576 listNode *ln;
ed9b544e 2577
6208b3a7 2578 listRewind(list);
f78fd11b 2579 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
6208b3a7 2580 while((ln = listYield(list))) {
ed9b544e 2581 robj *eleobj = listNodeValue(ln);
f78fd11b 2582
10c43610 2583 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 2584 }
f78fd11b 2585 } else if (o->type == REDIS_SET) {
ed9b544e 2586 /* Save a set value */
2587 dict *set = o->ptr;
2588 dictIterator *di = dictGetIterator(set);
2589 dictEntry *de;
2590
3305306f 2591 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
ed9b544e 2592 while((de = dictNext(di)) != NULL) {
10c43610 2593 robj *eleobj = dictGetEntryKey(de);
ed9b544e 2594
10c43610 2595 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 2596 }
2597 dictReleaseIterator(di);
2b59cfdf 2598 } else if (o->type == REDIS_ZSET) {
2599 /* Save a set value */
2600 zset *zs = o->ptr;
2601 dictIterator *di = dictGetIterator(zs->dict);
2602 dictEntry *de;
2603
2604 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2605 while((de = dictNext(di)) != NULL) {
2606 robj *eleobj = dictGetEntryKey(de);
2607 double *score = dictGetEntryVal(de);
2608
2609 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2610 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2611 }
2612 dictReleaseIterator(di);
ed9b544e 2613 } else {
2614 assert(0 != 0);
2615 }
2616 }
2617 dictReleaseIterator(di);
2618 }
2619 /* EOF opcode */
f78fd11b 2620 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2621
2622 /* Make sure data will not remain on the OS's output buffers */
ed9b544e 2623 fflush(fp);
2624 fsync(fileno(fp));
2625 fclose(fp);
2626
2627 /* Use RENAME to make sure the DB file is changed atomically only
2628 * if the generate DB file is ok. */
2629 if (rename(tmpfile,filename) == -1) {
325d1eb4 2630 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
ed9b544e 2631 unlink(tmpfile);
2632 return REDIS_ERR;
2633 }
2634 redisLog(REDIS_NOTICE,"DB saved on disk");
2635 server.dirty = 0;
2636 server.lastsave = time(NULL);
2637 return REDIS_OK;
2638
2639werr:
2640 fclose(fp);
2641 unlink(tmpfile);
2642 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2643 if (di) dictReleaseIterator(di);
2644 return REDIS_ERR;
2645}
2646
f78fd11b 2647static int rdbSaveBackground(char *filename) {
ed9b544e 2648 pid_t childpid;
2649
9d65a1bb 2650 if (server.bgsavechildpid != -1) return REDIS_ERR;
ed9b544e 2651 if ((childpid = fork()) == 0) {
2652 /* Child */
2653 close(server.fd);
f78fd11b 2654 if (rdbSave(filename) == REDIS_OK) {
ed9b544e 2655 exit(0);
2656 } else {
2657 exit(1);
2658 }
2659 } else {
2660 /* Parent */
5a7c647e 2661 if (childpid == -1) {
2662 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2663 strerror(errno));
2664 return REDIS_ERR;
2665 }
ed9b544e 2666 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
9f3c422c 2667 server.bgsavechildpid = childpid;
ed9b544e 2668 return REDIS_OK;
2669 }
2670 return REDIS_OK; /* unreached */
2671}
2672
/* Unlink the file "temp-<childpid>.rdb" (relative path). The return value
 * of unlink() is ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int)childpid);
    unlink(path);
}
2679
/* Read a single byte from 'fp' and return it as a value in 0..255, or -1
 * when the byte cannot be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,sizeof(byte),1,fp) == 1) ? (int)byte : -1;
}
2685
/* Read a 4 byte signed integer from 'fp' (host byte order, as no byte
 * swapping is applied) and return it as a time_t. Returns -1 when the four
 * bytes cannot be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t)stamp;
}
2691
e3566d4b 2692/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2693 * of this file for a description of how this are stored on disk.
2694 *
2695 * isencoded is set to 1 if the readed length is not actually a length but
2696 * an "encoding type", check the above comments for more info */
2697static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
f78fd11b 2698 unsigned char buf[2];
2699 uint32_t len;
2700
e3566d4b 2701 if (isencoded) *isencoded = 0;
f78fd11b 2702 if (rdbver == 0) {
2703 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2704 return ntohl(len);
2705 } else {
17be1a4a 2706 int type;
2707
f78fd11b 2708 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
17be1a4a 2709 type = (buf[0]&0xC0)>>6;
2710 if (type == REDIS_RDB_6BITLEN) {
f78fd11b 2711 /* Read a 6 bit len */
e3566d4b 2712 return buf[0]&0x3F;
2713 } else if (type == REDIS_RDB_ENCVAL) {
2714 /* Read a 6 bit len encoding type */
2715 if (isencoded) *isencoded = 1;
2716 return buf[0]&0x3F;
17be1a4a 2717 } else if (type == REDIS_RDB_14BITLEN) {
f78fd11b 2718 /* Read a 14 bit len */
2719 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2720 return ((buf[0]&0x3F)<<8)|buf[1];
2721 } else {
2722 /* Read a 32 bit len */
2723 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2724 return ntohl(len);
2725 }
2726 }
f78fd11b 2727}
2728
e3566d4b 2729static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2730 unsigned char enc[4];
2731 long long val;
2732
2733 if (enctype == REDIS_RDB_ENC_INT8) {
2734 if (fread(enc,1,1,fp) == 0) return NULL;
2735 val = (signed char)enc[0];
2736 } else if (enctype == REDIS_RDB_ENC_INT16) {
2737 uint16_t v;
2738 if (fread(enc,2,1,fp) == 0) return NULL;
2739 v = enc[0]|(enc[1]<<8);
2740 val = (int16_t)v;
2741 } else if (enctype == REDIS_RDB_ENC_INT32) {
2742 uint32_t v;
2743 if (fread(enc,4,1,fp) == 0) return NULL;
2744 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2745 val = (int32_t)v;
2746 } else {
2747 val = 0; /* anti-warning */
2748 assert(0!=0);
2749 }
2750 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2751}
2752
88e85998 2753static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2754 unsigned int len, clen;
2755 unsigned char *c = NULL;
2756 sds val = NULL;
2757
2758 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2759 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2760 if ((c = zmalloc(clen)) == NULL) goto err;
2761 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2762 if (fread(c,clen,1,fp) == 0) goto err;
2763 if (lzf_decompress(c,clen,val,len) == 0) goto err;
5109cdff 2764 zfree(c);
88e85998 2765 return createObject(REDIS_STRING,val);
2766err:
2767 zfree(c);
2768 sdsfree(val);
2769 return NULL;
2770}
2771
e3566d4b 2772static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2773 int isencoded;
2774 uint32_t len;
f78fd11b 2775 sds val;
2776
e3566d4b 2777 len = rdbLoadLen(fp,rdbver,&isencoded);
2778 if (isencoded) {
2779 switch(len) {
2780 case REDIS_RDB_ENC_INT8:
2781 case REDIS_RDB_ENC_INT16:
2782 case REDIS_RDB_ENC_INT32:
3305306f 2783 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
88e85998 2784 case REDIS_RDB_ENC_LZF:
2785 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
e3566d4b 2786 default:
2787 assert(0!=0);
2788 }
2789 }
2790
f78fd11b 2791 if (len == REDIS_RDB_LENERR) return NULL;
2792 val = sdsnewlen(NULL,len);
2793 if (len && fread(val,len,1,fp) == 0) {
2794 sdsfree(val);
2795 return NULL;
2796 }
10c43610 2797 return tryObjectSharing(createObject(REDIS_STRING,val));
f78fd11b 2798}
2799
a7866db6 2800/* For information about double serialization check rdbSaveDoubleValue() */
2801static int rdbLoadDoubleValue(FILE *fp, double *val) {
2802 char buf[128];
2803 unsigned char len;
2804
2805 if (fread(&len,1,1,fp) == 0) return -1;
2806 switch(len) {
2807 case 255: *val = R_NegInf; return 0;
2808 case 254: *val = R_PosInf; return 0;
2809 case 253: *val = R_Nan; return 0;
2810 default:
2811 if (fread(buf,len,1,fp) == 0) return -1;
2812 sscanf(buf, "%lg", val);
2813 return 0;
2814 }
2815}
2816
f78fd11b 2817static int rdbLoad(char *filename) {
ed9b544e 2818 FILE *fp;
f78fd11b 2819 robj *keyobj = NULL;
2820 uint32_t dbid;
bb32ede5 2821 int type, retval, rdbver;
3305306f 2822 dict *d = server.db[0].dict;
bb32ede5 2823 redisDb *db = server.db+0;
f78fd11b 2824 char buf[1024];
bb32ede5 2825 time_t expiretime = -1, now = time(NULL);
2826
ed9b544e 2827 fp = fopen(filename,"r");
2828 if (!fp) return REDIS_ERR;
2829 if (fread(buf,9,1,fp) == 0) goto eoferr;
f78fd11b 2830 buf[9] = '\0';
2831 if (memcmp(buf,"REDIS",5) != 0) {
ed9b544e 2832 fclose(fp);
2833 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2834 return REDIS_ERR;
2835 }
f78fd11b 2836 rdbver = atoi(buf+5);
2837 if (rdbver > 1) {
2838 fclose(fp);
2839 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2840 return REDIS_ERR;
2841 }
ed9b544e 2842 while(1) {
2843 robj *o;
2844
2845 /* Read type. */
f78fd11b 2846 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
bb32ede5 2847 if (type == REDIS_EXPIRETIME) {
2848 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2849 /* We read the time so we need to read the object type again */
2850 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2851 }
ed9b544e 2852 if (type == REDIS_EOF) break;
2853 /* Handle SELECT DB opcode as a special case */
2854 if (type == REDIS_SELECTDB) {
e3566d4b 2855 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2856 goto eoferr;
ed9b544e 2857 if (dbid >= (unsigned)server.dbnum) {
f78fd11b 2858 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
ed9b544e 2859 exit(1);
2860 }
bb32ede5 2861 db = server.db+dbid;
2862 d = db->dict;
ed9b544e 2863 continue;
2864 }
2865 /* Read key */
f78fd11b 2866 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
ed9b544e 2867
2868 if (type == REDIS_STRING) {
2869 /* Read string value */
f78fd11b 2870 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
942a3961 2871 tryObjectEncoding(o);
ed9b544e 2872 } else if (type == REDIS_LIST || type == REDIS_SET) {
2873 /* Read list/set value */
2874 uint32_t listlen;
f78fd11b 2875
e3566d4b 2876 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
f78fd11b 2877 goto eoferr;
ed9b544e 2878 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2879 /* Load every single element of the list/set */
2880 while(listlen--) {
2881 robj *ele;
2882
f78fd11b 2883 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
942a3961 2884 tryObjectEncoding(ele);
ed9b544e 2885 if (type == REDIS_LIST) {
6b47e12e 2886 listAddNodeTail((list*)o->ptr,ele);
ed9b544e 2887 } else {
6b47e12e 2888 dictAdd((dict*)o->ptr,ele,NULL);
ed9b544e 2889 }
ed9b544e 2890 }
2b59cfdf 2891 } else if (type == REDIS_ZSET) {
2892 /* Read list/set value */
2893 uint32_t zsetlen;
2894 zset *zs;
2895
2896 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2897 goto eoferr;
2898 o = createZsetObject();
2899 zs = o->ptr;
2900 /* Load every single element of the list/set */
2901 while(zsetlen--) {
2902 robj *ele;
2903 double *score = zmalloc(sizeof(double));
2904
2905 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2906 tryObjectEncoding(ele);
2907 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2908 dictAdd(zs->dict,ele,score);
2909 zslInsert(zs->zsl,*score,ele);
2910 incrRefCount(ele); /* added to skiplist */
2911 }
ed9b544e 2912 } else {
2913 assert(0 != 0);
2914 }
2915 /* Add the new object in the hash table */
f78fd11b 2916 retval = dictAdd(d,keyobj,o);
ed9b544e 2917 if (retval == DICT_ERR) {
f78fd11b 2918 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
ed9b544e 2919 exit(1);
2920 }
bb32ede5 2921 /* Set the expire time if needed */
2922 if (expiretime != -1) {
2923 setExpire(db,keyobj,expiretime);
2924 /* Delete this key if already expired */
2925 if (expiretime < now) deleteKey(db,keyobj);
2926 expiretime = -1;
2927 }
f78fd11b 2928 keyobj = o = NULL;
ed9b544e 2929 }
2930 fclose(fp);
2931 return REDIS_OK;
2932
2933eoferr: /* unexpected end of file is handled here with a fatal exit */
e3566d4b 2934 if (keyobj) decrRefCount(keyobj);
f80dff62 2935 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
ed9b544e 2936 exit(1);
2937 return REDIS_ERR; /* Just to avoid warning */
2938}
2939
2940/*================================== Commands =============================== */
2941
abcb223e 2942static void authCommand(redisClient *c) {
2e77c2ee 2943 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
abcb223e
BH
2944 c->authenticated = 1;
2945 addReply(c,shared.ok);
2946 } else {
2947 c->authenticated = 0;
fa4c0aba 2948 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
abcb223e
BH
2949 }
2950}
2951
ed9b544e 2952static void pingCommand(redisClient *c) {
2953 addReply(c,shared.pong);
2954}
2955
2956static void echoCommand(redisClient *c) {
942a3961 2957 addReplyBulkLen(c,c->argv[1]);
ed9b544e 2958 addReply(c,c->argv[1]);
2959 addReply(c,shared.crlf);
2960}
2961
2962/*=================================== Strings =============================== */
2963
2964static void setGenericCommand(redisClient *c, int nx) {
2965 int retval;
2966
3305306f 2967 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 2968 if (retval == DICT_ERR) {
2969 if (!nx) {
3305306f 2970 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 2971 incrRefCount(c->argv[2]);
2972 } else {
c937aa89 2973 addReply(c,shared.czero);
ed9b544e 2974 return;
2975 }
2976 } else {
2977 incrRefCount(c->argv[1]);
2978 incrRefCount(c->argv[2]);
2979 }
2980 server.dirty++;
3305306f 2981 removeExpire(c->db,c->argv[1]);
c937aa89 2982 addReply(c, nx ? shared.cone : shared.ok);
ed9b544e 2983}
2984
2985static void setCommand(redisClient *c) {
a4d1ba9a 2986 setGenericCommand(c,0);
ed9b544e 2987}
2988
2989static void setnxCommand(redisClient *c) {
a4d1ba9a 2990 setGenericCommand(c,1);
ed9b544e 2991}
2992
2993static void getCommand(redisClient *c) {
3305306f 2994 robj *o = lookupKeyRead(c->db,c->argv[1]);
2995
2996 if (o == NULL) {
c937aa89 2997 addReply(c,shared.nullbulk);
ed9b544e 2998 } else {
ed9b544e 2999 if (o->type != REDIS_STRING) {
c937aa89 3000 addReply(c,shared.wrongtypeerr);
ed9b544e 3001 } else {
942a3961 3002 addReplyBulkLen(c,o);
ed9b544e 3003 addReply(c,o);
3004 addReply(c,shared.crlf);
3005 }
3006 }
3007}
3008
f6b141c5 3009static void getsetCommand(redisClient *c) {
a431eb74 3010 getCommand(c);
3011 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3012 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3013 } else {
3014 incrRefCount(c->argv[1]);
3015 }
3016 incrRefCount(c->argv[2]);
3017 server.dirty++;
3018 removeExpire(c->db,c->argv[1]);
3019}
3020
70003d28 3021static void mgetCommand(redisClient *c) {
70003d28 3022 int j;
3023
c937aa89 3024 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
70003d28 3025 for (j = 1; j < c->argc; j++) {
3305306f 3026 robj *o = lookupKeyRead(c->db,c->argv[j]);
3027 if (o == NULL) {
c937aa89 3028 addReply(c,shared.nullbulk);
70003d28 3029 } else {
70003d28 3030 if (o->type != REDIS_STRING) {
c937aa89 3031 addReply(c,shared.nullbulk);
70003d28 3032 } else {
942a3961 3033 addReplyBulkLen(c,o);
70003d28 3034 addReply(c,o);
3035 addReply(c,shared.crlf);
3036 }
3037 }
3038 }
3039}
3040
6c446631 3041static void msetGenericCommand(redisClient *c, int nx) {
3042 int j;
3043
3044 if ((c->argc % 2) == 0) {
3045 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
3046 return;
3047 }
3048 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3049 * set nothing at all if at least one already key exists. */
3050 if (nx) {
3051 for (j = 1; j < c->argc; j += 2) {
3052 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
3053 addReply(c, shared.czero);
3054 return;
3055 }
3056 }
3057 }
3058
3059 for (j = 1; j < c->argc; j += 2) {
3060 int retval;
3061
17511391 3062 tryObjectEncoding(c->argv[j+1]);
6c446631 3063 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3064 if (retval == DICT_ERR) {
3065 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3066 incrRefCount(c->argv[j+1]);
3067 } else {
3068 incrRefCount(c->argv[j]);
3069 incrRefCount(c->argv[j+1]);
3070 }
3071 removeExpire(c->db,c->argv[j]);
3072 }
3073 server.dirty += (c->argc-1)/2;
3074 addReply(c, nx ? shared.cone : shared.ok);
3075}
3076
3077static void msetCommand(redisClient *c) {
3078 msetGenericCommand(c,0);
3079}
3080
3081static void msetnxCommand(redisClient *c) {
3082 msetGenericCommand(c,1);
3083}
3084
d68ed120 3085static void incrDecrCommand(redisClient *c, long long incr) {
ed9b544e 3086 long long value;
3087 int retval;
3088 robj *o;
3089
3305306f 3090 o = lookupKeyWrite(c->db,c->argv[1]);
3091 if (o == NULL) {
ed9b544e 3092 value = 0;
3093 } else {
ed9b544e 3094 if (o->type != REDIS_STRING) {
3095 value = 0;
3096 } else {
3097 char *eptr;
3098
942a3961 3099 if (o->encoding == REDIS_ENCODING_RAW)
3100 value = strtoll(o->ptr, &eptr, 10);
3101 else if (o->encoding == REDIS_ENCODING_INT)
3102 value = (long)o->ptr;
3103 else
3104 assert(1 != 1);
ed9b544e 3105 }
3106 }
3107
3108 value += incr;
3109 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
942a3961 3110 tryObjectEncoding(o);
3305306f 3111 retval = dictAdd(c->db->dict,c->argv[1],o);
ed9b544e 3112 if (retval == DICT_ERR) {
3305306f 3113 dictReplace(c->db->dict,c->argv[1],o);
3114 removeExpire(c->db,c->argv[1]);
ed9b544e 3115 } else {
3116 incrRefCount(c->argv[1]);
3117 }
3118 server.dirty++;
c937aa89 3119 addReply(c,shared.colon);
ed9b544e 3120 addReply(c,o);
3121 addReply(c,shared.crlf);
3122}
3123
3124static void incrCommand(redisClient *c) {
a4d1ba9a 3125 incrDecrCommand(c,1);
ed9b544e 3126}
3127
3128static void decrCommand(redisClient *c) {
a4d1ba9a 3129 incrDecrCommand(c,-1);
ed9b544e 3130}
3131
3132static void incrbyCommand(redisClient *c) {
d68ed120 3133 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 3134 incrDecrCommand(c,incr);
ed9b544e 3135}
3136
3137static void decrbyCommand(redisClient *c) {
d68ed120 3138 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 3139 incrDecrCommand(c,-incr);
ed9b544e 3140}
3141
3142/* ========================= Type agnostic commands ========================= */
3143
3144static void delCommand(redisClient *c) {
5109cdff 3145 int deleted = 0, j;
3146
3147 for (j = 1; j < c->argc; j++) {
3148 if (deleteKey(c->db,c->argv[j])) {
3149 server.dirty++;
3150 deleted++;
3151 }
3152 }
3153 switch(deleted) {
3154 case 0:
c937aa89 3155 addReply(c,shared.czero);
5109cdff 3156 break;
3157 case 1:
3158 addReply(c,shared.cone);
3159 break;
3160 default:
3161 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3162 break;
ed9b544e 3163 }
3164}
3165
3166static void existsCommand(redisClient *c) {
3305306f 3167 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
ed9b544e 3168}
3169
3170static void selectCommand(redisClient *c) {
3171 int id = atoi(c->argv[1]->ptr);
3172
3173 if (selectDb(c,id) == REDIS_ERR) {
774e3047 3174 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
ed9b544e 3175 } else {
3176 addReply(c,shared.ok);
3177 }
3178}
3179
3180static void randomkeyCommand(redisClient *c) {
3181 dictEntry *de;
3305306f 3182
3183 while(1) {
3184 de = dictGetRandomKey(c->db->dict);
ce7bef07 3185 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3305306f 3186 }
ed9b544e 3187 if (de == NULL) {
ce7bef07 3188 addReply(c,shared.plus);
ed9b544e 3189 addReply(c,shared.crlf);
3190 } else {
c937aa89 3191 addReply(c,shared.plus);
ed9b544e 3192 addReply(c,dictGetEntryKey(de));
3193 addReply(c,shared.crlf);
3194 }
3195}
3196
3197static void keysCommand(redisClient *c) {
3198 dictIterator *di;
3199 dictEntry *de;
3200 sds pattern = c->argv[1]->ptr;
3201 int plen = sdslen(pattern);
3202 int numkeys = 0, keyslen = 0;
3203 robj *lenobj = createObject(REDIS_STRING,NULL);
3204
3305306f 3205 di = dictGetIterator(c->db->dict);
ed9b544e 3206 addReply(c,lenobj);
3207 decrRefCount(lenobj);
3208 while((de = dictNext(di)) != NULL) {
3209 robj *keyobj = dictGetEntryKey(de);
3305306f 3210
ed9b544e 3211 sds key = keyobj->ptr;
3212 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3213 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3305306f 3214 if (expireIfNeeded(c->db,keyobj) == 0) {
3215 if (numkeys != 0)
3216 addReply(c,shared.space);
3217 addReply(c,keyobj);
3218 numkeys++;
3219 keyslen += sdslen(key);
3220 }
ed9b544e 3221 }
3222 }
3223 dictReleaseIterator(di);
c937aa89 3224 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
ed9b544e 3225 addReply(c,shared.crlf);
3226}
3227
3228static void dbsizeCommand(redisClient *c) {
3229 addReplySds(c,
3305306f 3230 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
ed9b544e 3231}
3232
3233static void lastsaveCommand(redisClient *c) {
3234 addReplySds(c,
c937aa89 3235 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
ed9b544e 3236}
3237
3238static void typeCommand(redisClient *c) {
3305306f 3239 robj *o;
ed9b544e 3240 char *type;
3305306f 3241
3242 o = lookupKeyRead(c->db,c->argv[1]);
3243 if (o == NULL) {
c937aa89 3244 type = "+none";
ed9b544e 3245 } else {
ed9b544e 3246 switch(o->type) {
c937aa89 3247 case REDIS_STRING: type = "+string"; break;
3248 case REDIS_LIST: type = "+list"; break;
3249 case REDIS_SET: type = "+set"; break;
412a8bce 3250 case REDIS_ZSET: type = "+zset"; break;
ed9b544e 3251 default: type = "unknown"; break;
3252 }
3253 }
3254 addReplySds(c,sdsnew(type));
3255 addReply(c,shared.crlf);
3256}
3257
3258static void saveCommand(redisClient *c) {
9d65a1bb 3259 if (server.bgsavechildpid != -1) {
05557f6d 3260 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3261 return;
3262 }
f78fd11b 3263 if (rdbSave(server.dbfilename) == REDIS_OK) {
ed9b544e 3264 addReply(c,shared.ok);
3265 } else {
3266 addReply(c,shared.err);
3267 }
3268}
3269
3270static void bgsaveCommand(redisClient *c) {
9d65a1bb 3271 if (server.bgsavechildpid != -1) {
ed9b544e 3272 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3273 return;
3274 }
f78fd11b 3275 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
ed9b544e 3276 addReply(c,shared.ok);
3277 } else {
3278 addReply(c,shared.err);
3279 }
3280}
3281
3282static void shutdownCommand(redisClient *c) {
3283 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
a3b21203 3284 /* Kill the saving child if there is a background saving in progress.
3285 We want to avoid race conditions, for instance our saving child may
3286 overwrite the synchronous saving did by SHUTDOWN. */
9d65a1bb 3287 if (server.bgsavechildpid != -1) {
9f3c422c 3288 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3289 kill(server.bgsavechildpid,SIGKILL);
a3b21203 3290 rdbRemoveTempFile(server.bgsavechildpid);
9f3c422c 3291 }
a3b21203 3292 /* SYNC SAVE */
f78fd11b 3293 if (rdbSave(server.dbfilename) == REDIS_OK) {
9f3c422c 3294 if (server.daemonize)
b284af55 3295 unlink(server.pidfile);
b284af55 3296 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
ed9b544e 3297 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3298 exit(1);
3299 } else {
a3b21203 3300 /* Ooops.. error saving! The best we can do is to continue operating.
3301 * Note that if there was a background saving process, in the next
3302 * cron() Redis will be notified that the background saving aborted,
3303 * handling special stuff like slaves pending for synchronization... */
ed9b544e 3304 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3305 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3306 }
3307}
3308
3309static void renameGenericCommand(redisClient *c, int nx) {
ed9b544e 3310 robj *o;
3311
3312 /* To use the same key as src and dst is probably an error */
3313 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
c937aa89 3314 addReply(c,shared.sameobjecterr);
ed9b544e 3315 return;
3316 }
3317
3305306f 3318 o = lookupKeyWrite(c->db,c->argv[1]);
3319 if (o == NULL) {
c937aa89 3320 addReply(c,shared.nokeyerr);
ed9b544e 3321 return;
3322 }
ed9b544e 3323 incrRefCount(o);
3305306f 3324 deleteIfVolatile(c->db,c->argv[2]);
3325 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
ed9b544e 3326 if (nx) {
3327 decrRefCount(o);
c937aa89 3328 addReply(c,shared.czero);
ed9b544e 3329 return;
3330 }
3305306f 3331 dictReplace(c->db->dict,c->argv[2],o);
ed9b544e 3332 } else {
3333 incrRefCount(c->argv[2]);
3334 }
3305306f 3335 deleteKey(c->db,c->argv[1]);
ed9b544e 3336 server.dirty++;
c937aa89 3337 addReply(c,nx ? shared.cone : shared.ok);
ed9b544e 3338}
3339
3340static void renameCommand(redisClient *c) {
3341 renameGenericCommand(c,0);
3342}
3343
3344static void renamenxCommand(redisClient *c) {
3345 renameGenericCommand(c,1);
3346}
3347
3348static void moveCommand(redisClient *c) {
3305306f 3349 robj *o;
3350 redisDb *src, *dst;
ed9b544e 3351 int srcid;
3352
3353 /* Obtain source and target DB pointers */
3305306f 3354 src = c->db;
3355 srcid = c->db->id;
ed9b544e 3356 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
c937aa89 3357 addReply(c,shared.outofrangeerr);
ed9b544e 3358 return;
3359 }
3305306f 3360 dst = c->db;
3361 selectDb(c,srcid); /* Back to the source DB */
ed9b544e 3362
3363 /* If the user is moving using as target the same
3364 * DB as the source DB it is probably an error. */
3365 if (src == dst) {
c937aa89 3366 addReply(c,shared.sameobjecterr);
ed9b544e 3367 return;
3368 }
3369
3370 /* Check if the element exists and get a reference */
3305306f 3371 o = lookupKeyWrite(c->db,c->argv[1]);
3372 if (!o) {
c937aa89 3373 addReply(c,shared.czero);
ed9b544e 3374 return;
3375 }
3376
3377 /* Try to add the element to the target DB */
3305306f 3378 deleteIfVolatile(dst,c->argv[1]);
3379 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
c937aa89 3380 addReply(c,shared.czero);
ed9b544e 3381 return;
3382 }
3305306f 3383 incrRefCount(c->argv[1]);
ed9b544e 3384 incrRefCount(o);
3385
3386 /* OK! key moved, free the entry in the source DB */
3305306f 3387 deleteKey(src,c->argv[1]);
ed9b544e 3388 server.dirty++;
c937aa89 3389 addReply(c,shared.cone);
ed9b544e 3390}
3391
3392/* =================================== Lists ================================ */
3393static void pushGenericCommand(redisClient *c, int where) {
3394 robj *lobj;
ed9b544e 3395 list *list;
3305306f 3396
3397 lobj = lookupKeyWrite(c->db,c->argv[1]);
3398 if (lobj == NULL) {
ed9b544e 3399 lobj = createListObject();
3400 list = lobj->ptr;
3401 if (where == REDIS_HEAD) {
6b47e12e 3402 listAddNodeHead(list,c->argv[2]);
ed9b544e 3403 } else {
6b47e12e 3404 listAddNodeTail(list,c->argv[2]);
ed9b544e 3405 }
3305306f 3406 dictAdd(c->db->dict,c->argv[1],lobj);
ed9b544e 3407 incrRefCount(c->argv[1]);
3408 incrRefCount(c->argv[2]);
3409 } else {
ed9b544e 3410 if (lobj->type != REDIS_LIST) {
3411 addReply(c,shared.wrongtypeerr);
3412 return;
3413 }
3414 list = lobj->ptr;
3415 if (where == REDIS_HEAD) {
6b47e12e 3416 listAddNodeHead(list,c->argv[2]);
ed9b544e 3417 } else {
6b47e12e 3418 listAddNodeTail(list,c->argv[2]);
ed9b544e 3419 }
3420 incrRefCount(c->argv[2]);
3421 }
3422 server.dirty++;
3423 addReply(c,shared.ok);
3424}
3425
3426static void lpushCommand(redisClient *c) {
3427 pushGenericCommand(c,REDIS_HEAD);
3428}
3429
3430static void rpushCommand(redisClient *c) {
3431 pushGenericCommand(c,REDIS_TAIL);
3432}
3433
3434static void llenCommand(redisClient *c) {
3305306f 3435 robj *o;
ed9b544e 3436 list *l;
3437
3305306f 3438 o = lookupKeyRead(c->db,c->argv[1]);
3439 if (o == NULL) {
c937aa89 3440 addReply(c,shared.czero);
ed9b544e 3441 return;
3442 } else {
ed9b544e 3443 if (o->type != REDIS_LIST) {
c937aa89 3444 addReply(c,shared.wrongtypeerr);
ed9b544e 3445 } else {
3446 l = o->ptr;
c937aa89 3447 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
ed9b544e 3448 }
3449 }
3450}
3451
3452static void lindexCommand(redisClient *c) {
3305306f 3453 robj *o;
ed9b544e 3454 int index = atoi(c->argv[2]->ptr);
3455
3305306f 3456 o = lookupKeyRead(c->db,c->argv[1]);
3457 if (o == NULL) {
c937aa89 3458 addReply(c,shared.nullbulk);
ed9b544e 3459 } else {
ed9b544e 3460 if (o->type != REDIS_LIST) {
c937aa89 3461 addReply(c,shared.wrongtypeerr);
ed9b544e 3462 } else {
3463 list *list = o->ptr;
3464 listNode *ln;
3465
3466 ln = listIndex(list, index);
3467 if (ln == NULL) {
c937aa89 3468 addReply(c,shared.nullbulk);
ed9b544e 3469 } else {
3470 robj *ele = listNodeValue(ln);
942a3961 3471 addReplyBulkLen(c,ele);
ed9b544e 3472 addReply(c,ele);
3473 addReply(c,shared.crlf);
3474 }
3475 }
3476 }
3477}
3478
3479static void lsetCommand(redisClient *c) {
3305306f 3480 robj *o;
ed9b544e 3481 int index = atoi(c->argv[2]->ptr);
3482
3305306f 3483 o = lookupKeyWrite(c->db,c->argv[1]);
3484 if (o == NULL) {
ed9b544e 3485 addReply(c,shared.nokeyerr);
3486 } else {
ed9b544e 3487 if (o->type != REDIS_LIST) {
3488 addReply(c,shared.wrongtypeerr);
3489 } else {
3490 list *list = o->ptr;
3491 listNode *ln;
3492
3493 ln = listIndex(list, index);
3494 if (ln == NULL) {
c937aa89 3495 addReply(c,shared.outofrangeerr);
ed9b544e 3496 } else {
3497 robj *ele = listNodeValue(ln);
3498
3499 decrRefCount(ele);
3500 listNodeValue(ln) = c->argv[3];
3501 incrRefCount(c->argv[3]);
3502 addReply(c,shared.ok);
3503 server.dirty++;
3504 }
3505 }
3506 }
3507}
3508
3509static void popGenericCommand(redisClient *c, int where) {
3305306f 3510 robj *o;
3511
3512 o = lookupKeyWrite(c->db,c->argv[1]);
3513 if (o == NULL) {
c937aa89 3514 addReply(c,shared.nullbulk);
ed9b544e 3515 } else {
ed9b544e 3516 if (o->type != REDIS_LIST) {
c937aa89 3517 addReply(c,shared.wrongtypeerr);
ed9b544e 3518 } else {
3519 list *list = o->ptr;
3520 listNode *ln;
3521
3522 if (where == REDIS_HEAD)
3523 ln = listFirst(list);
3524 else
3525 ln = listLast(list);
3526
3527 if (ln == NULL) {
c937aa89 3528 addReply(c,shared.nullbulk);
ed9b544e 3529 } else {
3530 robj *ele = listNodeValue(ln);
942a3961 3531 addReplyBulkLen(c,ele);
ed9b544e 3532 addReply(c,ele);
3533 addReply(c,shared.crlf);
3534 listDelNode(list,ln);
3535 server.dirty++;
3536 }
3537 }
3538 }
3539}
3540
3541static void lpopCommand(redisClient *c) {
3542 popGenericCommand(c,REDIS_HEAD);
3543}
3544
3545static void rpopCommand(redisClient *c) {
3546 popGenericCommand(c,REDIS_TAIL);
3547}
3548
3549static void lrangeCommand(redisClient *c) {
3305306f 3550 robj *o;
ed9b544e 3551 int start = atoi(c->argv[2]->ptr);
3552 int end = atoi(c->argv[3]->ptr);
3305306f 3553
3554 o = lookupKeyRead(c->db,c->argv[1]);
3555 if (o == NULL) {
c937aa89 3556 addReply(c,shared.nullmultibulk);
ed9b544e 3557 } else {
ed9b544e 3558 if (o->type != REDIS_LIST) {
c937aa89 3559 addReply(c,shared.wrongtypeerr);
ed9b544e 3560 } else {
3561 list *list = o->ptr;
3562 listNode *ln;
3563 int llen = listLength(list);
3564 int rangelen, j;
3565 robj *ele;
3566
3567 /* convert negative indexes */
3568 if (start < 0) start = llen+start;
3569 if (end < 0) end = llen+end;
3570 if (start < 0) start = 0;
3571 if (end < 0) end = 0;
3572
3573 /* indexes sanity checks */
3574 if (start > end || start >= llen) {
3575 /* Out of range start or start > end result in empty list */
c937aa89 3576 addReply(c,shared.emptymultibulk);
ed9b544e 3577 return;
3578 }
3579 if (end >= llen) end = llen-1;
3580 rangelen = (end-start)+1;
3581
3582 /* Return the result in form of a multi-bulk reply */
3583 ln = listIndex(list, start);
c937aa89 3584 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
ed9b544e 3585 for (j = 0; j < rangelen; j++) {
3586 ele = listNodeValue(ln);
942a3961 3587 addReplyBulkLen(c,ele);
ed9b544e 3588 addReply(c,ele);
3589 addReply(c,shared.crlf);
3590 ln = ln->next;
3591 }
3592 }
3593 }
3594}
3595
3596static void ltrimCommand(redisClient *c) {
3305306f 3597 robj *o;
ed9b544e 3598 int start = atoi(c->argv[2]->ptr);
3599 int end = atoi(c->argv[3]->ptr);
3600
3305306f 3601 o = lookupKeyWrite(c->db,c->argv[1]);
3602 if (o == NULL) {
ed9b544e 3603 addReply(c,shared.nokeyerr);
3604 } else {
ed9b544e 3605 if (o->type != REDIS_LIST) {
3606 addReply(c,shared.wrongtypeerr);
3607 } else {
3608 list *list = o->ptr;
3609 listNode *ln;
3610 int llen = listLength(list);
3611 int j, ltrim, rtrim;
3612
3613 /* convert negative indexes */
3614 if (start < 0) start = llen+start;
3615 if (end < 0) end = llen+end;
3616 if (start < 0) start = 0;
3617 if (end < 0) end = 0;
3618
3619 /* indexes sanity checks */
3620 if (start > end || start >= llen) {
3621 /* Out of range start or start > end result in empty list */
3622 ltrim = llen;
3623 rtrim = 0;
3624 } else {
3625 if (end >= llen) end = llen-1;
3626 ltrim = start;
3627 rtrim = llen-end-1;
3628 }
3629
3630 /* Remove list elements to perform the trim */
3631 for (j = 0; j < ltrim; j++) {
3632 ln = listFirst(list);
3633 listDelNode(list,ln);
3634 }
3635 for (j = 0; j < rtrim; j++) {
3636 ln = listLast(list);
3637 listDelNode(list,ln);
3638 }
ed9b544e 3639 server.dirty++;
e59229a2 3640 addReply(c,shared.ok);
ed9b544e 3641 }
3642 }
3643}
3644
3645static void lremCommand(redisClient *c) {
3305306f 3646 robj *o;
ed9b544e 3647
3305306f 3648 o = lookupKeyWrite(c->db,c->argv[1]);
3649 if (o == NULL) {
33c08b39 3650 addReply(c,shared.czero);
ed9b544e 3651 } else {
ed9b544e 3652 if (o->type != REDIS_LIST) {
c937aa89 3653 addReply(c,shared.wrongtypeerr);
ed9b544e 3654 } else {
3655 list *list = o->ptr;
3656 listNode *ln, *next;
3657 int toremove = atoi(c->argv[2]->ptr);
3658 int removed = 0;
3659 int fromtail = 0;
3660
3661 if (toremove < 0) {
3662 toremove = -toremove;
3663 fromtail = 1;
3664 }
3665 ln = fromtail ? list->tail : list->head;
3666 while (ln) {
ed9b544e 3667 robj *ele = listNodeValue(ln);
a4d1ba9a 3668
3669 next = fromtail ? ln->prev : ln->next;
724a51b1 3670 if (compareStringObjects(ele,c->argv[3]) == 0) {
ed9b544e 3671 listDelNode(list,ln);
3672 server.dirty++;
3673 removed++;
3674 if (toremove && removed == toremove) break;
3675 }
3676 ln = next;
3677 }
c937aa89 3678 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
ed9b544e 3679 }
3680 }
3681}
3682
12f9d551 3683/* This is the semantic of this command:
0f5f7e9a 3684 * RPOPLPUSH srclist dstlist:
12f9d551 3685 * IF LLEN(srclist) > 0
3686 * element = RPOP srclist
3687 * LPUSH dstlist element
3688 * RETURN element
3689 * ELSE
3690 * RETURN nil
3691 * END
3692 * END
3693 *
3694 * The idea is to be able to get an element from a list in a reliable way
3695 * since the element is not just returned but pushed against another list
3696 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3697 */
0f5f7e9a 3698static void rpoplpushcommand(redisClient *c) {
12f9d551 3699 robj *sobj;
3700
3701 sobj = lookupKeyWrite(c->db,c->argv[1]);
3702 if (sobj == NULL) {
3703 addReply(c,shared.nullbulk);
3704 } else {
3705 if (sobj->type != REDIS_LIST) {
3706 addReply(c,shared.wrongtypeerr);
3707 } else {
3708 list *srclist = sobj->ptr;
3709 listNode *ln = listLast(srclist);
3710
3711 if (ln == NULL) {
3712 addReply(c,shared.nullbulk);
3713 } else {
3714 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3715 robj *ele = listNodeValue(ln);
3716 list *dstlist;
3717
3718 if (dobj == NULL) {
3719
3720 /* Create the list if the key does not exist */
3721 dobj = createListObject();
3722 dictAdd(c->db->dict,c->argv[2],dobj);
3723 incrRefCount(c->argv[2]);
3724 } else if (dobj->type != REDIS_LIST) {
3725 addReply(c,shared.wrongtypeerr);
3726 return;
3727 }
3728 /* Add the element to the target list */
3729 dstlist = dobj->ptr;
3730 listAddNodeHead(dstlist,ele);
3731 incrRefCount(ele);
3732
3733 /* Send the element to the client as reply as well */
3734 addReplyBulkLen(c,ele);
3735 addReply(c,ele);
3736 addReply(c,shared.crlf);
3737
3738 /* Finally remove the element from the source list */
3739 listDelNode(srclist,ln);
3740 server.dirty++;
3741 }
3742 }
3743 }
3744}
3745
3746
ed9b544e 3747/* ==================================== Sets ================================ */
3748
3749static void saddCommand(redisClient *c) {
ed9b544e 3750 robj *set;
3751
3305306f 3752 set = lookupKeyWrite(c->db,c->argv[1]);
3753 if (set == NULL) {
ed9b544e 3754 set = createSetObject();
3305306f 3755 dictAdd(c->db->dict,c->argv[1],set);
ed9b544e 3756 incrRefCount(c->argv[1]);
3757 } else {
ed9b544e 3758 if (set->type != REDIS_SET) {
c937aa89 3759 addReply(c,shared.wrongtypeerr);
ed9b544e 3760 return;
3761 }
3762 }
3763 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3764 incrRefCount(c->argv[2]);
3765 server.dirty++;
c937aa89 3766 addReply(c,shared.cone);
ed9b544e 3767 } else {
c937aa89 3768 addReply(c,shared.czero);
ed9b544e 3769 }
3770}
3771
3772static void sremCommand(redisClient *c) {
3305306f 3773 robj *set;
ed9b544e 3774
3305306f 3775 set = lookupKeyWrite(c->db,c->argv[1]);
3776 if (set == NULL) {
c937aa89 3777 addReply(c,shared.czero);
ed9b544e 3778 } else {
ed9b544e 3779 if (set->type != REDIS_SET) {
c937aa89 3780 addReply(c,shared.wrongtypeerr);
ed9b544e 3781 return;
3782 }
3783 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3784 server.dirty++;
12fea928 3785 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
c937aa89 3786 addReply(c,shared.cone);
ed9b544e 3787 } else {
c937aa89 3788 addReply(c,shared.czero);
ed9b544e 3789 }
3790 }
3791}
3792
a4460ef4 3793static void smoveCommand(redisClient *c) {
3794 robj *srcset, *dstset;
3795
3796 srcset = lookupKeyWrite(c->db,c->argv[1]);
3797 dstset = lookupKeyWrite(c->db,c->argv[2]);
3798
3799 /* If the source key does not exist return 0, if it's of the wrong type
3800 * raise an error */
3801 if (srcset == NULL || srcset->type != REDIS_SET) {
3802 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3803 return;
3804 }
3805 /* Error if the destination key is not a set as well */
3806 if (dstset && dstset->type != REDIS_SET) {
3807 addReply(c,shared.wrongtypeerr);
3808 return;
3809 }
3810 /* Remove the element from the source set */
3811 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3812 /* Key not found in the src set! return zero */
3813 addReply(c,shared.czero);
3814 return;
3815 }
3816 server.dirty++;
3817 /* Add the element to the destination set */
3818 if (!dstset) {
3819 dstset = createSetObject();
3820 dictAdd(c->db->dict,c->argv[2],dstset);
3821 incrRefCount(c->argv[2]);
3822 }
3823 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3824 incrRefCount(c->argv[3]);
3825 addReply(c,shared.cone);
3826}
3827
ed9b544e 3828static void sismemberCommand(redisClient *c) {
3305306f 3829 robj *set;
ed9b544e 3830
3305306f 3831 set = lookupKeyRead(c->db,c->argv[1]);
3832 if (set == NULL) {
c937aa89 3833 addReply(c,shared.czero);
ed9b544e 3834 } else {
ed9b544e 3835 if (set->type != REDIS_SET) {
c937aa89 3836 addReply(c,shared.wrongtypeerr);
ed9b544e 3837 return;
3838 }
3839 if (dictFind(set->ptr,c->argv[2]))
c937aa89 3840 addReply(c,shared.cone);
ed9b544e 3841 else
c937aa89 3842 addReply(c,shared.czero);
ed9b544e 3843 }
3844}
3845
3846static void scardCommand(redisClient *c) {
3305306f 3847 robj *o;
ed9b544e 3848 dict *s;
3849
3305306f 3850 o = lookupKeyRead(c->db,c->argv[1]);
3851 if (o == NULL) {
c937aa89 3852 addReply(c,shared.czero);
ed9b544e 3853 return;
3854 } else {
ed9b544e 3855 if (o->type != REDIS_SET) {
c937aa89 3856 addReply(c,shared.wrongtypeerr);
ed9b544e 3857 } else {
3858 s = o->ptr;
c937aa89 3859 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3305306f 3860 dictSize(s)));
ed9b544e 3861 }
3862 }
3863}
3864
12fea928 3865static void spopCommand(redisClient *c) {
3866 robj *set;
3867 dictEntry *de;
3868
3869 set = lookupKeyWrite(c->db,c->argv[1]);
3870 if (set == NULL) {
3871 addReply(c,shared.nullbulk);
3872 } else {
3873 if (set->type != REDIS_SET) {
3874 addReply(c,shared.wrongtypeerr);
3875 return;
3876 }
3877 de = dictGetRandomKey(set->ptr);
3878 if (de == NULL) {
3879 addReply(c,shared.nullbulk);
3880 } else {
3881 robj *ele = dictGetEntryKey(de);
3882
942a3961 3883 addReplyBulkLen(c,ele);
12fea928 3884 addReply(c,ele);
3885 addReply(c,shared.crlf);
3886 dictDelete(set->ptr,ele);
3887 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3888 server.dirty++;
3889 }
3890 }
3891}
3892
2abb95a9 3893static void srandmemberCommand(redisClient *c) {
3894 robj *set;
3895 dictEntry *de;
3896
3897 set = lookupKeyRead(c->db,c->argv[1]);
3898 if (set == NULL) {
3899 addReply(c,shared.nullbulk);
3900 } else {
3901 if (set->type != REDIS_SET) {
3902 addReply(c,shared.wrongtypeerr);
3903 return;
3904 }
3905 de = dictGetRandomKey(set->ptr);
3906 if (de == NULL) {
3907 addReply(c,shared.nullbulk);
3908 } else {
3909 robj *ele = dictGetEntryKey(de);
3910
3911 addReplyBulkLen(c,ele);
3912 addReply(c,ele);
3913 addReply(c,shared.crlf);
3914 }
3915 }
3916}
3917
ed9b544e 3918static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3919 dict **d1 = (void*) s1, **d2 = (void*) s2;
3920
3305306f 3921 return dictSize(*d1)-dictSize(*d2);
ed9b544e 3922}
3923
3924static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3925 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3926 dictIterator *di;
3927 dictEntry *de;
3928 robj *lenobj = NULL, *dstset = NULL;
3929 int j, cardinality = 0;
3930
ed9b544e 3931 for (j = 0; j < setsnum; j++) {
3932 robj *setobj;
3305306f 3933
3934 setobj = dstkey ?
3935 lookupKeyWrite(c->db,setskeys[j]) :
3936 lookupKeyRead(c->db,setskeys[j]);
3937 if (!setobj) {
ed9b544e 3938 zfree(dv);
5faa6025 3939 if (dstkey) {
3940 deleteKey(c->db,dstkey);
3941 addReply(c,shared.ok);
3942 } else {
3943 addReply(c,shared.nullmultibulk);
3944 }
ed9b544e 3945 return;
3946 }
ed9b544e 3947 if (setobj->type != REDIS_SET) {
3948 zfree(dv);
c937aa89 3949 addReply(c,shared.wrongtypeerr);
ed9b544e 3950 return;
3951 }
3952 dv[j] = setobj->ptr;
3953 }
3954 /* Sort sets from the smallest to largest, this will improve our
3955 * algorithm's performace */
3956 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3957
3958 /* The first thing we should output is the total number of elements...
3959 * since this is a multi-bulk write, but at this stage we don't know
3960 * the intersection set size, so we use a trick, append an empty object
3961 * to the output list and save the pointer to later modify it with the
3962 * right length */
3963 if (!dstkey) {
3964 lenobj = createObject(REDIS_STRING,NULL);
3965 addReply(c,lenobj);
3966 decrRefCount(lenobj);
3967 } else {
3968 /* If we have a target key where to store the resulting set
3969 * create this key with an empty set inside */
3970 dstset = createSetObject();
ed9b544e 3971 }
3972
3973 /* Iterate all the elements of the first (smallest) set, and test
3974 * the element against all the other sets, if at least one set does
3975 * not include the element it is discarded */
3976 di = dictGetIterator(dv[0]);
ed9b544e 3977
3978 while((de = dictNext(di)) != NULL) {
3979 robj *ele;
3980
3981 for (j = 1; j < setsnum; j++)
3982 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3983 if (j != setsnum)
3984 continue; /* at least one set does not contain the member */
3985 ele = dictGetEntryKey(de);
3986 if (!dstkey) {
942a3961 3987 addReplyBulkLen(c,ele);
ed9b544e 3988 addReply(c,ele);
3989 addReply(c,shared.crlf);
3990 cardinality++;
3991 } else {
3992 dictAdd(dstset->ptr,ele,NULL);
3993 incrRefCount(ele);
3994 }
3995 }
3996 dictReleaseIterator(di);
3997
83cdfe18
AG
3998 if (dstkey) {
3999 /* Store the resulting set into the target */
4000 deleteKey(c->db,dstkey);
4001 dictAdd(c->db->dict,dstkey,dstset);
4002 incrRefCount(dstkey);
4003 }
4004
40d224a9 4005 if (!dstkey) {
c937aa89 4006 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
40d224a9 4007 } else {
03fd01c7 4008 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
4009 dictSize((dict*)dstset->ptr)));
40d224a9 4010 server.dirty++;
4011 }
ed9b544e 4012 zfree(dv);
4013}
4014
4015static void sinterCommand(redisClient *c) {
4016 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4017}
4018
4019static void sinterstoreCommand(redisClient *c) {
4020 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4021}
4022
f4f56e1d 4023#define REDIS_OP_UNION 0
4024#define REDIS_OP_DIFF 1
4025
4026static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
40d224a9 4027 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4028 dictIterator *di;
4029 dictEntry *de;
f4f56e1d 4030 robj *dstset = NULL;
40d224a9 4031 int j, cardinality = 0;
4032
40d224a9 4033 for (j = 0; j < setsnum; j++) {
4034 robj *setobj;
4035
4036 setobj = dstkey ?
4037 lookupKeyWrite(c->db,setskeys[j]) :
4038 lookupKeyRead(c->db,setskeys[j]);
4039 if (!setobj) {
4040 dv[j] = NULL;
4041 continue;
4042 }
4043 if (setobj->type != REDIS_SET) {
4044 zfree(dv);
4045 addReply(c,shared.wrongtypeerr);
4046 return;
4047 }
4048 dv[j] = setobj->ptr;
4049 }
4050
4051 /* We need a temp set object to store our union. If the dstkey
4052 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4053 * this set object will be the resulting object to set into the target key*/
4054 dstset = createSetObject();
4055
40d224a9 4056 /* Iterate all the elements of all the sets, add every element a single
4057 * time to the result set */
4058 for (j = 0; j < setsnum; j++) {
51829ed3 4059 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
40d224a9 4060 if (!dv[j]) continue; /* non existing keys are like empty sets */
4061
4062 di = dictGetIterator(dv[j]);
40d224a9 4063
4064 while((de = dictNext(di)) != NULL) {
4065 robj *ele;
4066
4067 /* dictAdd will not add the same element multiple times */
4068 ele = dictGetEntryKey(de);
f4f56e1d 4069 if (op == REDIS_OP_UNION || j == 0) {
4070 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4071 incrRefCount(ele);
40d224a9 4072 cardinality++;
4073 }
f4f56e1d 4074 } else if (op == REDIS_OP_DIFF) {
4075 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4076 cardinality--;
4077 }
40d224a9 4078 }
4079 }
4080 dictReleaseIterator(di);
51829ed3
AG
4081
4082 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
40d224a9 4083 }
4084
f4f56e1d 4085 /* Output the content of the resulting set, if not in STORE mode */
4086 if (!dstkey) {
4087 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4088 di = dictGetIterator(dstset->ptr);
f4f56e1d 4089 while((de = dictNext(di)) != NULL) {
4090 robj *ele;
4091
4092 ele = dictGetEntryKey(de);
942a3961 4093 addReplyBulkLen(c,ele);
f4f56e1d 4094 addReply(c,ele);
4095 addReply(c,shared.crlf);
4096 }
4097 dictReleaseIterator(di);
83cdfe18
AG
4098 } else {
4099 /* If we have a target key where to store the resulting set
4100 * create this key with the result set inside */
4101 deleteKey(c->db,dstkey);
4102 dictAdd(c->db->dict,dstkey,dstset);
4103 incrRefCount(dstkey);
f4f56e1d 4104 }
4105
4106 /* Cleanup */
40d224a9 4107 if (!dstkey) {
40d224a9 4108 decrRefCount(dstset);
4109 } else {
03fd01c7 4110 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
4111 dictSize((dict*)dstset->ptr)));
40d224a9 4112 server.dirty++;
4113 }
4114 zfree(dv);
4115}
4116
4117static void sunionCommand(redisClient *c) {
f4f56e1d 4118 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
40d224a9 4119}
4120
4121static void sunionstoreCommand(redisClient *c) {
f4f56e1d 4122 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4123}
4124
4125static void sdiffCommand(redisClient *c) {
4126 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4127}
4128
4129static void sdiffstoreCommand(redisClient *c) {
4130 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
40d224a9 4131}
4132
6b47e12e 4133/* ==================================== ZSets =============================== */
4134
4135/* ZSETs are ordered sets using two data structures to hold the same elements
4136 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4137 * data structure.
4138 *
4139 * The elements are added to an hash table mapping Redis objects to scores.
4140 * At the same time the elements are added to a skip list mapping scores
4141 * to Redis objects (so objects are sorted by scores in this "view"). */
4142
4143/* This skiplist implementation is almost a C translation of the original
4144 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4145 * Alternative to Balanced Trees", modified in three ways:
4146 * a) this implementation allows for repeated values.
4147 * b) the comparison is not just by key (our 'score') but by satellite data.
4148 * c) there is a back pointer, so it's a doubly linked list with the back
4149 * pointers being only at "level 1". This allows to traverse the list
4150 * from tail to head, useful for ZREVRANGE. */
4151
4152static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4153 zskiplistNode *zn = zmalloc(sizeof(*zn));
4154
4155 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4156 zn->score = score;
4157 zn->obj = obj;
4158 return zn;
4159}
4160
4161static zskiplist *zslCreate(void) {
4162 int j;
4163 zskiplist *zsl;
4164
4165 zsl = zmalloc(sizeof(*zsl));
4166 zsl->level = 1;
cc812361 4167 zsl->length = 0;
6b47e12e 4168 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4169 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4170 zsl->header->forward[j] = NULL;
e3870fab 4171 zsl->header->backward = NULL;
4172 zsl->tail = NULL;
6b47e12e 4173 return zsl;
4174}
4175
fd8ccf44 4176static void zslFreeNode(zskiplistNode *node) {
4177 decrRefCount(node->obj);
ad807e6f 4178 zfree(node->forward);
fd8ccf44 4179 zfree(node);
4180}
4181
4182static void zslFree(zskiplist *zsl) {
ad807e6f 4183 zskiplistNode *node = zsl->header->forward[0], *next;
fd8ccf44 4184
ad807e6f 4185 zfree(zsl->header->forward);
4186 zfree(zsl->header);
fd8ccf44 4187 while(node) {
599379dd 4188 next = node->forward[0];
fd8ccf44 4189 zslFreeNode(node);
4190 node = next;
4191 }
ad807e6f 4192 zfree(zsl);
fd8ccf44 4193}
4194
6b47e12e 4195static int zslRandomLevel(void) {
4196 int level = 1;
4197 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4198 level += 1;
4199 return level;
4200}
4201
4202static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4203 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4204 int i, level;
4205
4206 x = zsl->header;
4207 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 4208 while (x->forward[i] &&
4209 (x->forward[i]->score < score ||
4210 (x->forward[i]->score == score &&
4211 compareStringObjects(x->forward[i]->obj,obj) < 0)))
6b47e12e 4212 x = x->forward[i];
4213 update[i] = x;
4214 }
6b47e12e 4215 /* we assume the key is not already inside, since we allow duplicated
4216 * scores, and the re-insertion of score and redis object should never
4217 * happpen since the caller of zslInsert() should test in the hash table
4218 * if the element is already inside or not. */
4219 level = zslRandomLevel();
4220 if (level > zsl->level) {
4221 for (i = zsl->level; i < level; i++)
4222 update[i] = zsl->header;
4223 zsl->level = level;
4224 }
4225 x = zslCreateNode(level,score,obj);
4226 for (i = 0; i < level; i++) {
4227 x->forward[i] = update[i]->forward[i];
4228 update[i]->forward[i] = x;
4229 }
bb975144 4230 x->backward = (update[0] == zsl->header) ? NULL : update[0];
e3870fab 4231 if (x->forward[0])
4232 x->forward[0]->backward = x;
4233 else
4234 zsl->tail = x;
cc812361 4235 zsl->length++;
6b47e12e 4236}
4237
50c55df5 4238/* Delete an element with matching score/object from the skiplist. */
fd8ccf44 4239static int zslDelete(zskiplist *zsl, double score, robj *obj) {
e197b441 4240 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4241 int i;
4242
4243 x = zsl->header;
4244 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 4245 while (x->forward[i] &&
4246 (x->forward[i]->score < score ||
4247 (x->forward[i]->score == score &&
4248 compareStringObjects(x->forward[i]->obj,obj) < 0)))
e197b441 4249 x = x->forward[i];
4250 update[i] = x;
4251 }
4252 /* We may have multiple elements with the same score, what we need
4253 * is to find the element with both the right score and object. */
4254 x = x->forward[0];
50c55df5 4255 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
9d60e6e4 4256 for (i = 0; i < zsl->level; i++) {
4257 if (update[i]->forward[i] != x) break;
4258 update[i]->forward[i] = x->forward[i];
4259 }
4260 if (x->forward[0]) {
4261 x->forward[0]->backward = (x->backward == zsl->header) ?
4262 NULL : x->backward;
e197b441 4263 } else {
9d60e6e4 4264 zsl->tail = x->backward;
e197b441 4265 }
9d60e6e4 4266 zslFreeNode(x);
4267 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4268 zsl->level--;
4269 zsl->length--;
4270 return 1;
4271 } else {
4272 return 0; /* not found */
e197b441 4273 }
4274 return 0; /* not found */
fd8ccf44 4275}
4276
1807985b 4277/* Delete all the elements with score between min and max from the skiplist.
4278 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4279 * Note that this function takes the reference to the hash table view of the
4280 * sorted set, in order to remove the elements from the hash table too. */
4281static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4282 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4283 unsigned long removed = 0;
4284 int i;
4285
4286 x = zsl->header;
4287 for (i = zsl->level-1; i >= 0; i--) {
4288 while (x->forward[i] && x->forward[i]->score < min)
4289 x = x->forward[i];
4290 update[i] = x;
4291 }
4292 /* We may have multiple elements with the same score, what we need
4293 * is to find the element with both the right score and object. */
4294 x = x->forward[0];
4295 while (x && x->score <= max) {
4296 zskiplistNode *next;
4297
4298 for (i = 0; i < zsl->level; i++) {
4299 if (update[i]->forward[i] != x) break;
4300 update[i]->forward[i] = x->forward[i];
4301 }
4302 if (x->forward[0]) {
4303 x->forward[0]->backward = (x->backward == zsl->header) ?
4304 NULL : x->backward;
4305 } else {
4306 zsl->tail = x->backward;
4307 }
4308 next = x->forward[0];
4309 dictDelete(dict,x->obj);
4310 zslFreeNode(x);
4311 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4312 zsl->level--;
4313 zsl->length--;
4314 removed++;
4315 x = next;
4316 }
4317 return removed; /* not found */
4318}
4319
50c55df5 4320/* Find the first node having a score equal or greater than the specified one.
4321 * Returns NULL if there is no match. */
4322static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4323 zskiplistNode *x;
4324 int i;
4325
4326 x = zsl->header;
4327 for (i = zsl->level-1; i >= 0; i--) {
4328 while (x->forward[i] && x->forward[i]->score < score)
4329 x = x->forward[i];
4330 }
4331 /* We may have multiple elements with the same score, what we need
4332 * is to find the element with both the right score and object. */
4333 return x->forward[0];
4334}
4335
fd8ccf44 4336/* The actual Z-commands implementations */
4337
7db723ad 4338/* This generic command implements both ZADD and ZINCRBY.
e2665397 4339 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
7db723ad 4340 * the increment if the operation is a ZINCRBY (doincrement == 1). */
e2665397 4341static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
fd8ccf44 4342 robj *zsetobj;
4343 zset *zs;
4344 double *score;
4345
e2665397 4346 zsetobj = lookupKeyWrite(c->db,key);
fd8ccf44 4347 if (zsetobj == NULL) {
4348 zsetobj = createZsetObject();
e2665397 4349 dictAdd(c->db->dict,key,zsetobj);
4350 incrRefCount(key);
fd8ccf44 4351 } else {
4352 if (zsetobj->type != REDIS_ZSET) {
4353 addReply(c,shared.wrongtypeerr);
4354 return;
4355 }
4356 }
fd8ccf44 4357 zs = zsetobj->ptr;
e2665397 4358
7db723ad 4359 /* Ok now since we implement both ZADD and ZINCRBY here the code
e2665397 4360 * needs to handle the two different conditions. It's all about setting
4361 * '*score', that is, the new score to set, to the right value. */
4362 score = zmalloc(sizeof(double));
4363 if (doincrement) {
4364 dictEntry *de;
4365
4366 /* Read the old score. If the element was not present starts from 0 */
4367 de = dictFind(zs->dict,ele);
4368 if (de) {
4369 double *oldscore = dictGetEntryVal(de);
4370 *score = *oldscore + scoreval;
4371 } else {
4372 *score = scoreval;
4373 }
4374 } else {
4375 *score = scoreval;
4376 }
4377
4378 /* What follows is a simple remove and re-insert operation that is common
7db723ad 4379 * to both ZADD and ZINCRBY... */
e2665397 4380 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
fd8ccf44 4381 /* case 1: New element */
e2665397 4382 incrRefCount(ele); /* added to hash */
4383 zslInsert(zs->zsl,*score,ele);
4384 incrRefCount(ele); /* added to skiplist */
fd8ccf44 4385 server.dirty++;
e2665397 4386 if (doincrement)
e2665397 4387 addReplyDouble(c,*score);
91d71bfc 4388 else
4389 addReply(c,shared.cone);
fd8ccf44 4390 } else {
4391 dictEntry *de;
4392 double *oldscore;
4393
4394 /* case 2: Score update operation */
e2665397 4395 de = dictFind(zs->dict,ele);
fd8ccf44 4396 assert(de != NULL);
4397 oldscore = dictGetEntryVal(de);
4398 if (*score != *oldscore) {
4399 int deleted;
4400
e2665397 4401 /* Remove and insert the element in the skip list with new score */
4402 deleted = zslDelete(zs->zsl,*oldscore,ele);
fd8ccf44 4403 assert(deleted != 0);
e2665397 4404 zslInsert(zs->zsl,*score,ele);
4405 incrRefCount(ele);
4406 /* Update the score in the hash table */
4407 dictReplace(zs->dict,ele,score);
fd8ccf44 4408 server.dirty++;
2161a965 4409 } else {
4410 zfree(score);
fd8ccf44 4411 }
e2665397 4412 if (doincrement)
4413 addReplyDouble(c,*score);
4414 else
4415 addReply(c,shared.czero);
fd8ccf44 4416 }
4417}
4418
e2665397 4419static void zaddCommand(redisClient *c) {
4420 double scoreval;
4421
4422 scoreval = strtod(c->argv[2]->ptr,NULL);
4423 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4424}
4425
7db723ad 4426static void zincrbyCommand(redisClient *c) {
e2665397 4427 double scoreval;
4428
4429 scoreval = strtod(c->argv[2]->ptr,NULL);
4430 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4431}
4432
1b7106e7 4433static void zremCommand(redisClient *c) {
4434 robj *zsetobj;
4435 zset *zs;
4436
4437 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4438 if (zsetobj == NULL) {
4439 addReply(c,shared.czero);
4440 } else {
4441 dictEntry *de;
4442 double *oldscore;
4443 int deleted;
4444
4445 if (zsetobj->type != REDIS_ZSET) {
4446 addReply(c,shared.wrongtypeerr);
4447 return;
4448 }
4449 zs = zsetobj->ptr;
4450 de = dictFind(zs->dict,c->argv[2]);
4451 if (de == NULL) {
4452 addReply(c,shared.czero);
4453 return;
4454 }
4455 /* Delete from the skiplist */
4456 oldscore = dictGetEntryVal(de);
4457 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4458 assert(deleted != 0);
4459
4460 /* Delete from the hash table */
4461 dictDelete(zs->dict,c->argv[2]);
4462 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4463 server.dirty++;
4464 addReply(c,shared.cone);
4465 }
4466}
4467
1807985b 4468static void zremrangebyscoreCommand(redisClient *c) {
4469 double min = strtod(c->argv[2]->ptr,NULL);
4470 double max = strtod(c->argv[3]->ptr,NULL);
4471 robj *zsetobj;
4472 zset *zs;
4473
4474 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4475 if (zsetobj == NULL) {
4476 addReply(c,shared.czero);
4477 } else {
4478 long deleted;
4479
4480 if (zsetobj->type != REDIS_ZSET) {
4481 addReply(c,shared.wrongtypeerr);
4482 return;
4483 }
4484 zs = zsetobj->ptr;
4485 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4486 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4487 server.dirty += deleted;
4488 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4489 }
4490}
4491
e3870fab 4492static void zrangeGenericCommand(redisClient *c, int reverse) {
cc812361 4493 robj *o;
4494 int start = atoi(c->argv[2]->ptr);
4495 int end = atoi(c->argv[3]->ptr);
4496
4497 o = lookupKeyRead(c->db,c->argv[1]);
4498 if (o == NULL) {
4499 addReply(c,shared.nullmultibulk);
4500 } else {
4501 if (o->type != REDIS_ZSET) {
4502 addReply(c,shared.wrongtypeerr);
4503 } else {
4504 zset *zsetobj = o->ptr;
4505 zskiplist *zsl = zsetobj->zsl;
4506 zskiplistNode *ln;
4507
4508 int llen = zsl->length;
4509 int rangelen, j;
4510 robj *ele;
4511
4512 /* convert negative indexes */
4513 if (start < 0) start = llen+start;
4514 if (end < 0) end = llen+end;
4515 if (start < 0) start = 0;
4516 if (end < 0) end = 0;
4517
4518 /* indexes sanity checks */
4519 if (start > end || start >= llen) {
4520 /* Out of range start or start > end result in empty list */
4521 addReply(c,shared.emptymultibulk);
4522 return;
4523 }
4524 if (end >= llen) end = llen-1;
4525 rangelen = (end-start)+1;
4526
4527 /* Return the result in form of a multi-bulk reply */
e3870fab 4528 if (reverse) {
4529 ln = zsl->tail;
4530 while (start--)
4531 ln = ln->backward;
4532 } else {
4533 ln = zsl->header->forward[0];
4534 while (start--)
4535 ln = ln->forward[0];
4536 }
cc812361 4537
4538 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4539 for (j = 0; j < rangelen; j++) {
0aad7a19 4540 ele = ln->obj;
cc812361 4541 addReplyBulkLen(c,ele);
4542 addReply(c,ele);
4543 addReply(c,shared.crlf);
e3870fab 4544 ln = reverse ? ln->backward : ln->forward[0];
cc812361 4545 }
4546 }
4547 }
4548}
4549
e3870fab 4550static void zrangeCommand(redisClient *c) {
4551 zrangeGenericCommand(c,0);
4552}
4553
4554static void zrevrangeCommand(redisClient *c) {
4555 zrangeGenericCommand(c,1);
4556}
4557
50c55df5 4558static void zrangebyscoreCommand(redisClient *c) {
4559 robj *o;
4560 double min = strtod(c->argv[2]->ptr,NULL);
4561 double max = strtod(c->argv[3]->ptr,NULL);
4562
4563 o = lookupKeyRead(c->db,c->argv[1]);
4564 if (o == NULL) {
4565 addReply(c,shared.nullmultibulk);
4566 } else {
4567 if (o->type != REDIS_ZSET) {
4568 addReply(c,shared.wrongtypeerr);
4569 } else {
4570 zset *zsetobj = o->ptr;
4571 zskiplist *zsl = zsetobj->zsl;
4572 zskiplistNode *ln;
4573 robj *ele, *lenobj;
4574 unsigned int rangelen = 0;
4575
4576 /* Get the first node with the score >= min */
4577 ln = zslFirstWithScore(zsl,min);
4578 if (ln == NULL) {
4579 /* No element matching the speciifed interval */
4580 addReply(c,shared.emptymultibulk);
4581 return;
4582 }
4583
4584 /* We don't know in advance how many matching elements there
4585 * are in the list, so we push this object that will represent
4586 * the multi-bulk length in the output buffer, and will "fix"
4587 * it later */
4588 lenobj = createObject(REDIS_STRING,NULL);
4589 addReply(c,lenobj);
c74e7c77 4590 decrRefCount(lenobj);
50c55df5 4591
dbbc7285 4592 while(ln && ln->score <= max) {
50c55df5 4593 ele = ln->obj;
4594 addReplyBulkLen(c,ele);
4595 addReply(c,ele);
4596 addReply(c,shared.crlf);
4597 ln = ln->forward[0];
4598 rangelen++;
4599 }
4600 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4601 }
4602 }
4603}
4604
3c41331e 4605static void zcardCommand(redisClient *c) {
e197b441 4606 robj *o;
4607 zset *zs;
4608
4609 o = lookupKeyRead(c->db,c->argv[1]);
4610 if (o == NULL) {
4611 addReply(c,shared.czero);
4612 return;
4613 } else {
4614 if (o->type != REDIS_ZSET) {
4615 addReply(c,shared.wrongtypeerr);
4616 } else {
4617 zs = o->ptr;
4618 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4619 }
4620 }
4621}
4622
6e333bbe 4623static void zscoreCommand(redisClient *c) {
4624 robj *o;
4625 zset *zs;
4626
4627 o = lookupKeyRead(c->db,c->argv[1]);
4628 if (o == NULL) {
96d8b4ee 4629 addReply(c,shared.nullbulk);
6e333bbe 4630 return;
4631 } else {
4632 if (o->type != REDIS_ZSET) {
4633 addReply(c,shared.wrongtypeerr);
4634 } else {
4635 dictEntry *de;
4636
4637 zs = o->ptr;
4638 de = dictFind(zs->dict,c->argv[2]);
4639 if (!de) {
4640 addReply(c,shared.nullbulk);
4641 } else {
6e333bbe 4642 double *score = dictGetEntryVal(de);
4643
e2665397 4644 addReplyDouble(c,*score);
6e333bbe 4645 }
4646 }
4647 }
4648}
4649
6b47e12e 4650/* ========================= Non type-specific commands ==================== */
4651
ed9b544e 4652static void flushdbCommand(redisClient *c) {
ca37e9cd 4653 server.dirty += dictSize(c->db->dict);
3305306f 4654 dictEmpty(c->db->dict);
4655 dictEmpty(c->db->expires);
ed9b544e 4656 addReply(c,shared.ok);
ed9b544e 4657}
4658
4659static void flushallCommand(redisClient *c) {
ca37e9cd 4660 server.dirty += emptyDb();
ed9b544e 4661 addReply(c,shared.ok);
f78fd11b 4662 rdbSave(server.dbfilename);
ca37e9cd 4663 server.dirty++;
ed9b544e 4664}
4665
56906eef 4666static redisSortOperation *createSortOperation(int type, robj *pattern) {
ed9b544e 4667 redisSortOperation *so = zmalloc(sizeof(*so));
ed9b544e 4668 so->type = type;
4669 so->pattern = pattern;
4670 return so;
4671}
4672
4673/* Return the value associated to the key with a name obtained
4674 * substituting the first occurence of '*' in 'pattern' with 'subst' */
56906eef 4675static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
ed9b544e 4676 char *p;
4677 sds spat, ssub;
4678 robj keyobj;
4679 int prefixlen, sublen, postfixlen;
ed9b544e 4680 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4681 struct {
f1017b3f 4682 long len;
4683 long free;
ed9b544e 4684 char buf[REDIS_SORTKEY_MAX+1];
4685 } keyname;
4686
28173a49 4687 /* If the pattern is "#" return the substitution object itself in order
4688 * to implement the "SORT ... GET #" feature. */
4689 spat = pattern->ptr;
4690 if (spat[0] == '#' && spat[1] == '\0') {
4691 return subst;
4692 }
4693
4694 /* The substitution object may be specially encoded. If so we create
9d65a1bb 4695 * a decoded object on the fly. Otherwise getDecodedObject will just
4696 * increment the ref count, that we'll decrement later. */
4697 subst = getDecodedObject(subst);
942a3961 4698
ed9b544e 4699 ssub = subst->ptr;
4700 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4701 p = strchr(spat,'*');
ed5a857a 4702 if (!p) {
4703 decrRefCount(subst);
4704 return NULL;
4705 }
ed9b544e 4706
4707 prefixlen = p-spat;
4708 sublen = sdslen(ssub);
4709 postfixlen = sdslen(spat)-(prefixlen+1);
4710 memcpy(keyname.buf,spat,prefixlen);
4711 memcpy(keyname.buf+prefixlen,ssub,sublen);
4712 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4713 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4714 keyname.len = prefixlen+sublen+postfixlen;
4715
4716 keyobj.refcount = 1;
4717 keyobj.type = REDIS_STRING;
4718 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4719
942a3961 4720 decrRefCount(subst);
4721
a4d1ba9a 4722 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3305306f 4723 return lookupKeyRead(db,&keyobj);
ed9b544e 4724}
4725
4726/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4727 * the additional parameter is not standard but a BSD-specific we have to
4728 * pass sorting parameters via the global 'server' structure */
4729static int sortCompare(const void *s1, const void *s2) {
4730 const redisSortObject *so1 = s1, *so2 = s2;
4731 int cmp;
4732
4733 if (!server.sort_alpha) {
4734 /* Numeric sorting. Here it's trivial as we precomputed scores */
4735 if (so1->u.score > so2->u.score) {
4736 cmp = 1;
4737 } else if (so1->u.score < so2->u.score) {
4738 cmp = -1;
4739 } else {
4740 cmp = 0;
4741 }
4742 } else {
4743 /* Alphanumeric sorting */
4744 if (server.sort_bypattern) {
4745 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4746 /* At least one compare object is NULL */
4747 if (so1->u.cmpobj == so2->u.cmpobj)
4748 cmp = 0;
4749 else if (so1->u.cmpobj == NULL)
4750 cmp = -1;
4751 else
4752 cmp = 1;
4753 } else {
4754 /* We have both the objects, use strcoll */
4755 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4756 }
4757 } else {
4758 /* Compare elements directly */
9d65a1bb 4759 robj *dec1, *dec2;
4760
4761 dec1 = getDecodedObject(so1->obj);
4762 dec2 = getDecodedObject(so2->obj);
4763 cmp = strcoll(dec1->ptr,dec2->ptr);
4764 decrRefCount(dec1);
4765 decrRefCount(dec2);
ed9b544e 4766 }
4767 }
4768 return server.sort_desc ? -cmp : cmp;
4769}
4770
4771/* The SORT command is the most complex command in Redis. Warning: this code
4772 * is optimized for speed and a bit less for readability */
4773static void sortCommand(redisClient *c) {
ed9b544e 4774 list *operations;
4775 int outputlen = 0;
4776 int desc = 0, alpha = 0;
4777 int limit_start = 0, limit_count = -1, start, end;
4778 int j, dontsort = 0, vectorlen;
4779 int getop = 0; /* GET operation counter */
443c6409 4780 robj *sortval, *sortby = NULL, *storekey = NULL;
ed9b544e 4781 redisSortObject *vector; /* Resulting vector to sort */
4782
4783 /* Lookup the key to sort. It must be of the right types */
3305306f 4784 sortval = lookupKeyRead(c->db,c->argv[1]);
4785 if (sortval == NULL) {
c937aa89 4786 addReply(c,shared.nokeyerr);
ed9b544e 4787 return;
4788 }
ed9b544e 4789 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
c937aa89 4790 addReply(c,shared.wrongtypeerr);
ed9b544e 4791 return;
4792 }
4793
4794 /* Create a list of operations to perform for every sorted element.
4795 * Operations can be GET/DEL/INCR/DECR */
4796 operations = listCreate();
092dac2a 4797 listSetFreeMethod(operations,zfree);
ed9b544e 4798 j = 2;
4799
4800 /* Now we need to protect sortval incrementing its count, in the future
4801 * SORT may have options able to overwrite/delete keys during the sorting
4802 * and the sorted key itself may get destroied */
4803 incrRefCount(sortval);
4804
4805 /* The SORT command has an SQL-alike syntax, parse it */
4806 while(j < c->argc) {
4807 int leftargs = c->argc-j-1;
4808 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4809 desc = 0;
4810 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4811 desc = 1;
4812 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4813 alpha = 1;
4814 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4815 limit_start = atoi(c->argv[j+1]->ptr);
4816 limit_count = atoi(c->argv[j+2]->ptr);
4817 j+=2;
443c6409 4818 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4819 storekey = c->argv[j+1];
4820 j++;
ed9b544e 4821 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4822 sortby = c->argv[j+1];
4823 /* If the BY pattern does not contain '*', i.e. it is constant,
4824 * we don't need to sort nor to lookup the weight keys. */
4825 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4826 j++;
4827 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4828 listAddNodeTail(operations,createSortOperation(
4829 REDIS_SORT_GET,c->argv[j+1]));
4830 getop++;
4831 j++;
ed9b544e 4832 } else {
4833 decrRefCount(sortval);
4834 listRelease(operations);
c937aa89 4835 addReply(c,shared.syntaxerr);
ed9b544e 4836 return;
4837 }
4838 j++;
4839 }
4840
4841 /* Load the sorting vector with all the objects to sort */
4842 vectorlen = (sortval->type == REDIS_LIST) ?
4843 listLength((list*)sortval->ptr) :
3305306f 4844 dictSize((dict*)sortval->ptr);
ed9b544e 4845 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
ed9b544e 4846 j = 0;
4847 if (sortval->type == REDIS_LIST) {
4848 list *list = sortval->ptr;
6208b3a7 4849 listNode *ln;
4850
4851 listRewind(list);
4852 while((ln = listYield(list))) {
ed9b544e 4853 robj *ele = ln->value;
4854 vector[j].obj = ele;
4855 vector[j].u.score = 0;
4856 vector[j].u.cmpobj = NULL;
ed9b544e 4857 j++;
4858 }
4859 } else {
4860 dict *set = sortval->ptr;
4861 dictIterator *di;
4862 dictEntry *setele;
4863
4864 di = dictGetIterator(set);
ed9b544e 4865 while((setele = dictNext(di)) != NULL) {
4866 vector[j].obj = dictGetEntryKey(setele);
4867 vector[j].u.score = 0;
4868 vector[j].u.cmpobj = NULL;
4869 j++;
4870 }
4871 dictReleaseIterator(di);
4872 }
4873 assert(j == vectorlen);
4874
4875 /* Now it's time to load the right scores in the sorting vector */
4876 if (dontsort == 0) {
4877 for (j = 0; j < vectorlen; j++) {
4878 if (sortby) {
4879 robj *byval;
4880
3305306f 4881 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
ed9b544e 4882 if (!byval || byval->type != REDIS_STRING) continue;
4883 if (alpha) {
9d65a1bb 4884 vector[j].u.cmpobj = getDecodedObject(byval);
ed9b544e 4885 } else {
942a3961 4886 if (byval->encoding == REDIS_ENCODING_RAW) {
4887 vector[j].u.score = strtod(byval->ptr,NULL);
4888 } else {
9d65a1bb 4889 /* Don't need to decode the object if it's
4890 * integer-encoded (the only encoding supported) so
4891 * far. We can just cast it */
f1017b3f 4892 if (byval->encoding == REDIS_ENCODING_INT) {
942a3961 4893 vector[j].u.score = (long)byval->ptr;
f1017b3f 4894 } else
942a3961 4895 assert(1 != 1);
4896 }
ed9b544e 4897 }
4898 } else {
942a3961 4899 if (!alpha) {
4900 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4901 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4902 else {
4903 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4904 vector[j].u.score = (long) vector[j].obj->ptr;
4905 else
4906 assert(1 != 1);
4907 }
4908 }
ed9b544e 4909 }
4910 }
4911 }
4912
4913 /* We are ready to sort the vector... perform a bit of sanity check
4914 * on the LIMIT option too. We'll use a partial version of quicksort. */
4915 start = (limit_start < 0) ? 0 : limit_start;
4916 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4917 if (start >= vectorlen) {
4918 start = vectorlen-1;
4919 end = vectorlen-2;
4920 }
4921 if (end >= vectorlen) end = vectorlen-1;
4922
4923 if (dontsort == 0) {
4924 server.sort_desc = desc;
4925 server.sort_alpha = alpha;
4926 server.sort_bypattern = sortby ? 1 : 0;
5f5b9840 4927 if (sortby && (start != 0 || end != vectorlen-1))
4928 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4929 else
4930 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
ed9b544e 4931 }
4932
4933 /* Send command output to the output buffer, performing the specified
4934 * GET/DEL/INCR/DECR operations if any. */
4935 outputlen = getop ? getop*(end-start+1) : end-start+1;
443c6409 4936 if (storekey == NULL) {
4937 /* STORE option not specified, sent the sorting result to client */
4938 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4939 for (j = start; j <= end; j++) {
4940 listNode *ln;
4941 if (!getop) {
4942 addReplyBulkLen(c,vector[j].obj);
4943 addReply(c,vector[j].obj);
4944 addReply(c,shared.crlf);
4945 }
4946 listRewind(operations);
4947 while((ln = listYield(operations))) {
4948 redisSortOperation *sop = ln->value;
4949 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4950 vector[j].obj);
4951
4952 if (sop->type == REDIS_SORT_GET) {
4953 if (!val || val->type != REDIS_STRING) {
4954 addReply(c,shared.nullbulk);
4955 } else {
4956 addReplyBulkLen(c,val);
4957 addReply(c,val);
4958 addReply(c,shared.crlf);
4959 }
4960 } else {
4961 assert(sop->type == REDIS_SORT_GET); /* always fails */
4962 }
4963 }
ed9b544e 4964 }
443c6409 4965 } else {
4966 robj *listObject = createListObject();
4967 list *listPtr = (list*) listObject->ptr;
4968
4969 /* STORE option specified, set the sorting result as a List object */
4970 for (j = start; j <= end; j++) {
4971 listNode *ln;
4972 if (!getop) {
4973 listAddNodeTail(listPtr,vector[j].obj);
4974 incrRefCount(vector[j].obj);
4975 }
4976 listRewind(operations);
4977 while((ln = listYield(operations))) {
4978 redisSortOperation *sop = ln->value;
4979 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4980 vector[j].obj);
4981
4982 if (sop->type == REDIS_SORT_GET) {
4983 if (!val || val->type != REDIS_STRING) {
4984 listAddNodeTail(listPtr,createStringObject("",0));
4985 } else {
4986 listAddNodeTail(listPtr,val);
4987 incrRefCount(val);
4988 }
ed9b544e 4989 } else {
443c6409 4990 assert(sop->type == REDIS_SORT_GET); /* always fails */
ed9b544e 4991 }
ed9b544e 4992 }
ed9b544e 4993 }
121796f7 4994 if (dictReplace(c->db->dict,storekey,listObject)) {
4995 incrRefCount(storekey);
4996 }
443c6409 4997 /* Note: we add 1 because the DB is dirty anyway since even if the
4998 * SORT result is empty a new key is set and maybe the old content
4999 * replaced. */
5000 server.dirty += 1+outputlen;
5001 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
ed9b544e 5002 }
5003
5004 /* Cleanup */
5005 decrRefCount(sortval);
5006 listRelease(operations);
5007 for (j = 0; j < vectorlen; j++) {
5008 if (sortby && alpha && vector[j].u.cmpobj)
5009 decrRefCount(vector[j].u.cmpobj);
5010 }
5011 zfree(vector);
5012}
5013
1c85b79f 5014/* Create the string returned by the INFO command. This is decoupled
5015 * by the INFO command itself as we need to report the same information
5016 * on memory corruption problems. */
5017static sds genRedisInfoString(void) {
ed9b544e 5018 sds info;
5019 time_t uptime = time(NULL)-server.stat_starttime;
c3cb078d 5020 int j;
ed9b544e 5021
5022 info = sdscatprintf(sdsempty(),
5023 "redis_version:%s\r\n"
f1017b3f 5024 "arch_bits:%s\r\n"
7a932b74 5025 "multiplexing_api:%s\r\n"
a0f643ea 5026 "uptime_in_seconds:%d\r\n"
5027 "uptime_in_days:%d\r\n"
ed9b544e 5028 "connected_clients:%d\r\n"
5029 "connected_slaves:%d\r\n"
5fba9f71 5030 "used_memory:%zu\r\n"
ed9b544e 5031 "changes_since_last_save:%lld\r\n"
be2bb6b0 5032 "bgsave_in_progress:%d\r\n"
ed9b544e 5033 "last_save_time:%d\r\n"
5034 "total_connections_received:%lld\r\n"
5035 "total_commands_processed:%lld\r\n"
a0f643ea 5036 "role:%s\r\n"
ed9b544e 5037 ,REDIS_VERSION,
f1017b3f 5038 (sizeof(long) == 8) ? "64" : "32",
7a932b74 5039 aeGetApiName(),
a0f643ea 5040 uptime,
5041 uptime/(3600*24),
ed9b544e 5042 listLength(server.clients)-listLength(server.slaves),
5043 listLength(server.slaves),
5044 server.usedmemory,
5045 server.dirty,
9d65a1bb 5046 server.bgsavechildpid != -1,
ed9b544e 5047 server.lastsave,
5048 server.stat_numconnections,
5049 server.stat_numcommands,
a0f643ea 5050 server.masterhost == NULL ? "master" : "slave"
ed9b544e 5051 );
a0f643ea 5052 if (server.masterhost) {
5053 info = sdscatprintf(info,
5054 "master_host:%s\r\n"
5055 "master_port:%d\r\n"
5056 "master_link_status:%s\r\n"
5057 "master_last_io_seconds_ago:%d\r\n"
5058 ,server.masterhost,
5059 server.masterport,
5060 (server.replstate == REDIS_REPL_CONNECTED) ?
5061 "up" : "down",
f72b934d 5062 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
a0f643ea 5063 );
5064 }
c3cb078d 5065 for (j = 0; j < server.dbnum; j++) {
5066 long long keys, vkeys;
5067
5068 keys = dictSize(server.db[j].dict);
5069 vkeys = dictSize(server.db[j].expires);
5070 if (keys || vkeys) {
9d65a1bb 5071 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
c3cb078d 5072 j, keys, vkeys);
5073 }
5074 }
1c85b79f 5075 return info;
5076}
5077
5078static void infoCommand(redisClient *c) {
5079 sds info = genRedisInfoString();
c937aa89 5080 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
ed9b544e 5081 addReplySds(c,info);
70003d28 5082 addReply(c,shared.crlf);
ed9b544e 5083}
5084
3305306f 5085static void monitorCommand(redisClient *c) {
5086 /* ignore MONITOR if aleady slave or in monitor mode */
5087 if (c->flags & REDIS_SLAVE) return;
5088
5089 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5090 c->slaveseldb = 0;
6b47e12e 5091 listAddNodeTail(server.monitors,c);
3305306f 5092 addReply(c,shared.ok);
5093}
5094
5095/* ================================= Expire ================================= */
5096static int removeExpire(redisDb *db, robj *key) {
5097 if (dictDelete(db->expires,key) == DICT_OK) {
5098 return 1;
5099 } else {
5100 return 0;
5101 }
5102}
5103
5104static int setExpire(redisDb *db, robj *key, time_t when) {
5105 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5106 return 0;
5107 } else {
5108 incrRefCount(key);
5109 return 1;
5110 }
5111}
5112
bb32ede5 5113/* Return the expire time of the specified key, or -1 if no expire
5114 * is associated with this key (i.e. the key is non volatile) */
5115static time_t getExpire(redisDb *db, robj *key) {
5116 dictEntry *de;
5117
5118 /* No expire? return ASAP */
5119 if (dictSize(db->expires) == 0 ||
5120 (de = dictFind(db->expires,key)) == NULL) return -1;
5121
5122 return (time_t) dictGetEntryVal(de);
5123}
5124
3305306f 5125static int expireIfNeeded(redisDb *db, robj *key) {
5126 time_t when;
5127 dictEntry *de;
5128
5129 /* No expire? return ASAP */
5130 if (dictSize(db->expires) == 0 ||
5131 (de = dictFind(db->expires,key)) == NULL) return 0;
5132
5133 /* Lookup the expire */
5134 when = (time_t) dictGetEntryVal(de);
5135 if (time(NULL) <= when) return 0;
5136
5137 /* Delete the key */
5138 dictDelete(db->expires,key);
5139 return dictDelete(db->dict,key) == DICT_OK;
5140}
5141
5142static int deleteIfVolatile(redisDb *db, robj *key) {
5143 dictEntry *de;
5144
5145 /* No expire? return ASAP */
5146 if (dictSize(db->expires) == 0 ||
5147 (de = dictFind(db->expires,key)) == NULL) return 0;
5148
5149 /* Delete the key */
0c66a471 5150 server.dirty++;
3305306f 5151 dictDelete(db->expires,key);
5152 return dictDelete(db->dict,key) == DICT_OK;
5153}
5154
802e8373 5155static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
3305306f 5156 dictEntry *de;
3305306f 5157
802e8373 5158 de = dictFind(c->db->dict,key);
3305306f 5159 if (de == NULL) {
5160 addReply(c,shared.czero);
5161 return;
5162 }
43e5ccdf 5163 if (seconds < 0) {
5164 if (deleteKey(c->db,key)) server.dirty++;
5165 addReply(c, shared.cone);
3305306f 5166 return;
5167 } else {
5168 time_t when = time(NULL)+seconds;
802e8373 5169 if (setExpire(c->db,key,when)) {
3305306f 5170 addReply(c,shared.cone);
77423026 5171 server.dirty++;
5172 } else {
3305306f 5173 addReply(c,shared.czero);
77423026 5174 }
3305306f 5175 return;
5176 }
5177}
5178
802e8373 5179static void expireCommand(redisClient *c) {
5180 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5181}
5182
5183static void expireatCommand(redisClient *c) {
5184 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5185}
5186
fd88489a 5187static void ttlCommand(redisClient *c) {
5188 time_t expire;
5189 int ttl = -1;
5190
5191 expire = getExpire(c->db,c->argv[1]);
5192 if (expire != -1) {
5193 ttl = (int) (expire-time(NULL));
5194 if (ttl < 0) ttl = -1;
5195 }
5196 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5197}
5198
ed9b544e 5199/* =============================== Replication ============================= */
5200
a4d1ba9a 5201static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 5202 ssize_t nwritten, ret = size;
5203 time_t start = time(NULL);
5204
5205 timeout++;
5206 while(size) {
5207 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5208 nwritten = write(fd,ptr,size);
5209 if (nwritten == -1) return -1;
5210 ptr += nwritten;
5211 size -= nwritten;
5212 }
5213 if ((time(NULL)-start) > timeout) {
5214 errno = ETIMEDOUT;
5215 return -1;
5216 }
5217 }
5218 return ret;
5219}
5220
a4d1ba9a 5221static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 5222 ssize_t nread, totread = 0;
5223 time_t start = time(NULL);
5224
5225 timeout++;
5226 while(size) {
5227 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5228 nread = read(fd,ptr,size);
5229 if (nread == -1) return -1;
5230 ptr += nread;
5231 size -= nread;
5232 totread += nread;
5233 }
5234 if ((time(NULL)-start) > timeout) {
5235 errno = ETIMEDOUT;
5236 return -1;
5237 }
5238 }
5239 return totread;
5240}
5241
/* Read a line, one byte at a time, from 'fd' into the buffer 'ptr' of
 * 'size' bytes. Reading stops at the first '\n', which is not stored; a
 * '\r' right before it is stripped as well. The buffer is always kept
 * null terminated.
 *
 * At most 'size'-1 characters are stored: if the buffer fills up before a
 * newline is seen the function returns what was read so far instead of
 * writing past the end of the buffer.
 *
 * Returns the number of characters stored before the newline was found
 * (a stripped '\r' is still counted), or -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* Reserve room for the null terminator. */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        size--; /* One less free byte in the caller's buffer. */
    }
    return nread;
}
5262
5263static void syncCommand(redisClient *c) {
40d224a9 5264 /* ignore SYNC if aleady slave or in monitor mode */
5265 if (c->flags & REDIS_SLAVE) return;
5266
5267 /* SYNC can't be issued when the server has pending data to send to
5268 * the client about already issued commands. We need a fresh reply
5269 * buffer registering the differences between the BGSAVE and the current
5270 * dataset, so that we can copy to other slaves if needed. */
5271 if (listLength(c->reply) != 0) {
5272 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5273 return;
5274 }
5275
5276 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5277 /* Here we need to check if there is a background saving operation
5278 * in progress, or if it is required to start one */
9d65a1bb 5279 if (server.bgsavechildpid != -1) {
40d224a9 5280 /* Ok a background save is in progress. Let's check if it is a good
5281 * one for replication, i.e. if there is another slave that is
5282 * registering differences since the server forked to save */
5283 redisClient *slave;
5284 listNode *ln;
5285
6208b3a7 5286 listRewind(server.slaves);
5287 while((ln = listYield(server.slaves))) {
40d224a9 5288 slave = ln->value;
5289 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
40d224a9 5290 }
5291 if (ln) {
5292 /* Perfect, the server is already registering differences for
5293 * another slave. Set the right state, and copy the buffer. */
5294 listRelease(c->reply);
5295 c->reply = listDup(slave->reply);
40d224a9 5296 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5297 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5298 } else {
5299 /* No way, we need to wait for the next BGSAVE in order to
5300 * register differences */
5301 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5302 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5303 }
5304 } else {
5305 /* Ok we don't have a BGSAVE in progress, let's start one */
5306 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5307 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5308 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5309 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5310 return;
5311 }
5312 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5313 }
6208b3a7 5314 c->repldbfd = -1;
40d224a9 5315 c->flags |= REDIS_SLAVE;
5316 c->slaveseldb = 0;
6b47e12e 5317 listAddNodeTail(server.slaves,c);
40d224a9 5318 return;
5319}
5320
6208b3a7 5321static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5322 redisClient *slave = privdata;
5323 REDIS_NOTUSED(el);
5324 REDIS_NOTUSED(mask);
5325 char buf[REDIS_IOBUF_LEN];
5326 ssize_t nwritten, buflen;
5327
5328 if (slave->repldboff == 0) {
5329 /* Write the bulk write count before to transfer the DB. In theory here
5330 * we don't know how much room there is in the output buffer of the
5331 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5332 * operations) will never be smaller than the few bytes we need. */
5333 sds bulkcount;
5334
5335 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5336 slave->repldbsize);
5337 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5338 {
5339 sdsfree(bulkcount);
5340 freeClient(slave);
5341 return;
5342 }
5343 sdsfree(bulkcount);
5344 }
5345 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5346 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5347 if (buflen <= 0) {
5348 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5349 (buflen == 0) ? "premature EOF" : strerror(errno));
5350 freeClient(slave);
5351 return;
5352 }
5353 if ((nwritten = write(fd,buf,buflen)) == -1) {
5354 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5355 strerror(errno));
5356 freeClient(slave);
5357 return;
5358 }
5359 slave->repldboff += nwritten;
5360 if (slave->repldboff == slave->repldbsize) {
5361 close(slave->repldbfd);
5362 slave->repldbfd = -1;
5363 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5364 slave->replstate = REDIS_REPL_ONLINE;
5365 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
266373b2 5366 sendReplyToClient, slave) == AE_ERR) {
6208b3a7 5367 freeClient(slave);
5368 return;
5369 }
5370 addReplySds(slave,sdsempty());
5371 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5372 }
5373}
ed9b544e 5374
a3b21203 5375/* This function is called at the end of every backgrond saving.
5376 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5377 * otherwise REDIS_ERR is passed to the function.
5378 *
5379 * The goal of this function is to handle slaves waiting for a successful
5380 * background saving in order to perform non-blocking synchronization. */
5381static void updateSlavesWaitingBgsave(int bgsaveerr) {
6208b3a7 5382 listNode *ln;
5383 int startbgsave = 0;
ed9b544e 5384
6208b3a7 5385 listRewind(server.slaves);
5386 while((ln = listYield(server.slaves))) {
5387 redisClient *slave = ln->value;
ed9b544e 5388
6208b3a7 5389 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5390 startbgsave = 1;
5391 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5392 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
dde65f3f 5393 struct redis_stat buf;
6208b3a7 5394
5395 if (bgsaveerr != REDIS_OK) {
5396 freeClient(slave);
5397 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5398 continue;
5399 }
5400 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
dde65f3f 5401 redis_fstat(slave->repldbfd,&buf) == -1) {
6208b3a7 5402 freeClient(slave);
5403 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5404 continue;
5405 }
5406 slave->repldboff = 0;
5407 slave->repldbsize = buf.st_size;
5408 slave->replstate = REDIS_REPL_SEND_BULK;
5409 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
266373b2 5410 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
6208b3a7 5411 freeClient(slave);
5412 continue;
5413 }
5414 }
ed9b544e 5415 }
6208b3a7 5416 if (startbgsave) {
5417 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5418 listRewind(server.slaves);
5419 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5420 while((ln = listYield(server.slaves))) {
5421 redisClient *slave = ln->value;
ed9b544e 5422
6208b3a7 5423 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5424 freeClient(slave);
5425 }
5426 }
5427 }
ed9b544e 5428}
5429
5430static int syncWithMaster(void) {
d0ccebcf 5431 char buf[1024], tmpfile[256], authcmd[1024];
ed9b544e 5432 int dumpsize;
5433 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5434 int dfd;
5435
5436 if (fd == -1) {
5437 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5438 strerror(errno));
5439 return REDIS_ERR;
5440 }
d0ccebcf 5441
5442 /* AUTH with the master if required. */
5443 if(server.masterauth) {
5444 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5445 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5446 close(fd);
5447 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5448 strerror(errno));
5449 return REDIS_ERR;
5450 }
5451 /* Read the AUTH result. */
5452 if (syncReadLine(fd,buf,1024,3600) == -1) {
5453 close(fd);
5454 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5455 strerror(errno));
5456 return REDIS_ERR;
5457 }
5458 if (buf[0] != '+') {
5459 close(fd);
5460 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5461 return REDIS_ERR;
5462 }
5463 }
5464
ed9b544e 5465 /* Issue the SYNC command */
5466 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5467 close(fd);
5468 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5469 strerror(errno));
5470 return REDIS_ERR;
5471 }
5472 /* Read the bulk write count */
8c4d91fc 5473 if (syncReadLine(fd,buf,1024,3600) == -1) {
ed9b544e 5474 close(fd);
5475 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5476 strerror(errno));
5477 return REDIS_ERR;
5478 }
4aa701c1 5479 if (buf[0] != '$') {
5480 close(fd);
5481 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5482 return REDIS_ERR;
5483 }
c937aa89 5484 dumpsize = atoi(buf+1);
ed9b544e 5485 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5486 /* Read the bulk write data on a temp file */
5487 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5488 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5489 if (dfd == -1) {
5490 close(fd);
5491 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5492 return REDIS_ERR;
5493 }
5494 while(dumpsize) {
5495 int nread, nwritten;
5496
5497 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5498 if (nread == -1) {
5499 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5500 strerror(errno));
5501 close(fd);
5502 close(dfd);
5503 return REDIS_ERR;
5504 }
5505 nwritten = write(dfd,buf,nread);
5506 if (nwritten == -1) {
5507 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5508 close(fd);
5509 close(dfd);
5510 return REDIS_ERR;
5511 }
5512 dumpsize -= nread;
5513 }
5514 close(dfd);
5515 if (rename(tmpfile,server.dbfilename) == -1) {
5516 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5517 unlink(tmpfile);
5518 close(fd);
5519 return REDIS_ERR;
5520 }
5521 emptyDb();
f78fd11b 5522 if (rdbLoad(server.dbfilename) != REDIS_OK) {
ed9b544e 5523 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5524 close(fd);
5525 return REDIS_ERR;
5526 }
5527 server.master = createClient(fd);
5528 server.master->flags |= REDIS_MASTER;
5529 server.replstate = REDIS_REPL_CONNECTED;
5530 return REDIS_OK;
5531}
5532
321b0e13 5533static void slaveofCommand(redisClient *c) {
5534 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5535 !strcasecmp(c->argv[2]->ptr,"one")) {
5536 if (server.masterhost) {
5537 sdsfree(server.masterhost);
5538 server.masterhost = NULL;
5539 if (server.master) freeClient(server.master);
5540 server.replstate = REDIS_REPL_NONE;
5541 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5542 }
5543 } else {
5544 sdsfree(server.masterhost);
5545 server.masterhost = sdsdup(c->argv[1]->ptr);
5546 server.masterport = atoi(c->argv[2]->ptr);
5547 if (server.master) freeClient(server.master);
5548 server.replstate = REDIS_REPL_CONNECT;
5549 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5550 server.masterhost, server.masterport);
5551 }
5552 addReply(c,shared.ok);
5553}
5554
3fd78bcd 5555/* ============================ Maxmemory directive ======================== */
5556
5557/* This function gets called when 'maxmemory' is set on the config file to limit
5558 * the max memory used by the server, and we are out of memory.
5559 * This function will try to, in order:
5560 *
5561 * - Free objects from the free list
5562 * - Try to remove keys with an EXPIRE set
5563 *
5564 * It is not possible to free enough memory to reach used-memory < maxmemory
5565 * the server will start refusing commands that will enlarge even more the
5566 * memory usage.
5567 */
5568static void freeMemoryIfNeeded(void) {
5569 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5570 if (listLength(server.objfreelist)) {
5571 robj *o;
5572
5573 listNode *head = listFirst(server.objfreelist);
5574 o = listNodeValue(head);
5575 listDelNode(server.objfreelist,head);
5576 zfree(o);
5577 } else {
5578 int j, k, freed = 0;
5579
5580 for (j = 0; j < server.dbnum; j++) {
5581 int minttl = -1;
5582 robj *minkey = NULL;
5583 struct dictEntry *de;
5584
5585 if (dictSize(server.db[j].expires)) {
5586 freed = 1;
5587 /* From a sample of three keys drop the one nearest to
5588 * the natural expire */
5589 for (k = 0; k < 3; k++) {
5590 time_t t;
5591
5592 de = dictGetRandomKey(server.db[j].expires);
5593 t = (time_t) dictGetEntryVal(de);
5594 if (minttl == -1 || t < minttl) {
5595 minkey = dictGetEntryKey(de);
5596 minttl = t;
5597 }
5598 }
5599 deleteKey(server.db+j,minkey);
5600 }
5601 }
5602 if (!freed) return; /* nothing to free... */
5603 }
5604 }
5605}
5606
f80dff62 5607/* ============================== Append Only file ========================== */
5608
5609static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5610 sds buf = sdsempty();
5611 int j;
5612 ssize_t nwritten;
5613 time_t now;
5614 robj *tmpargv[3];
5615
5616 /* The DB this command was targetting is not the same as the last command
5617 * we appendend. To issue a SELECT command is needed. */
5618 if (dictid != server.appendseldb) {
5619 char seldb[64];
5620
5621 snprintf(seldb,sizeof(seldb),"%d",dictid);
5622 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5623 strlen(seldb),seldb);
5624 server.appendseldb = dictid;
5625 }
5626
5627 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5628 * EXPIREs into EXPIREATs calls */
5629 if (cmd->proc == expireCommand) {
5630 long when;
5631
5632 tmpargv[0] = createStringObject("EXPIREAT",8);
5633 tmpargv[1] = argv[1];
5634 incrRefCount(argv[1]);
5635 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5636 tmpargv[2] = createObject(REDIS_STRING,
5637 sdscatprintf(sdsempty(),"%ld",when));
5638 argv = tmpargv;
5639 }
5640
5641 /* Append the actual command */
5642 buf = sdscatprintf(buf,"*%d\r\n",argc);
5643 for (j = 0; j < argc; j++) {
5644 robj *o = argv[j];
5645
9d65a1bb 5646 o = getDecodedObject(o);
f80dff62 5647 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5648 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5649 buf = sdscatlen(buf,"\r\n",2);
9d65a1bb 5650 decrRefCount(o);
f80dff62 5651 }
5652
5653 /* Free the objects from the modified argv for EXPIREAT */
5654 if (cmd->proc == expireCommand) {
5655 for (j = 0; j < 3; j++)
5656 decrRefCount(argv[j]);
5657 }
5658
5659 /* We want to perform a single write. This should be guaranteed atomic
5660 * at least if the filesystem we are writing is a real physical one.
5661 * While this will save us against the server being killed I don't think
5662 * there is much to do about the whole server stopping for power problems
5663 * or alike */
5664 nwritten = write(server.appendfd,buf,sdslen(buf));
5665 if (nwritten != (signed)sdslen(buf)) {
5666 /* Ooops, we are in troubles. The best thing to do for now is
5667 * to simply exit instead to give the illusion that everything is
5668 * working as expected. */
5669 if (nwritten == -1) {
5670 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5671 } else {
5672 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5673 }
5674 exit(1);
5675 }
85a83172 5676 /* If a background append only file rewriting is in progress we want to
5677 * accumulate the differences between the child DB and the current one
5678 * in a buffer, so that when the child process will do its work we
5679 * can append the differences to the new append only file. */
5680 if (server.bgrewritechildpid != -1)
5681 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5682
5683 sdsfree(buf);
f80dff62 5684 now = time(NULL);
5685 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5686 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5687 now-server.lastfsync > 1))
5688 {
5689 fsync(server.appendfd); /* Let's try to get this data on the disk */
5690 server.lastfsync = now;
5691 }
5692}
5693
5694/* In Redis commands are always executed in the context of a client, so in
5695 * order to load the append only file we need to create a fake client. */
5696static struct redisClient *createFakeClient(void) {
5697 struct redisClient *c = zmalloc(sizeof(*c));
5698
5699 selectDb(c,0);
5700 c->fd = -1;
5701 c->querybuf = sdsempty();
5702 c->argc = 0;
5703 c->argv = NULL;
5704 c->flags = 0;
9387d17d 5705 /* We set the fake client as a slave waiting for the synchronization
5706 * so that Redis will not try to send replies to this client. */
5707 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
f80dff62 5708 c->reply = listCreate();
5709 listSetFreeMethod(c->reply,decrRefCount);
5710 listSetDupMethod(c->reply,dupClientReplyValue);
5711 return c;
5712}
5713
5714static void freeFakeClient(struct redisClient *c) {
5715 sdsfree(c->querybuf);
5716 listRelease(c->reply);
5717 zfree(c);
5718}
5719
5720/* Replay the append log file. On error REDIS_OK is returned. On non fatal
5721 * error (the append only file is zero-length) REDIS_ERR is returned. On
5722 * fatal error an error message is logged and the program exists. */
5723int loadAppendOnlyFile(char *filename) {
5724 struct redisClient *fakeClient;
5725 FILE *fp = fopen(filename,"r");
5726 struct redis_stat sb;
5727
5728 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5729 return REDIS_ERR;
5730
5731 if (fp == NULL) {
5732 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5733 exit(1);
5734 }
5735
5736 fakeClient = createFakeClient();
5737 while(1) {
5738 int argc, j;
5739 unsigned long len;
5740 robj **argv;
5741 char buf[128];
5742 sds argsds;
5743 struct redisCommand *cmd;
5744
5745 if (fgets(buf,sizeof(buf),fp) == NULL) {
5746 if (feof(fp))
5747 break;
5748 else
5749 goto readerr;
5750 }
5751 if (buf[0] != '*') goto fmterr;
5752 argc = atoi(buf+1);
5753 argv = zmalloc(sizeof(robj*)*argc);
5754 for (j = 0; j < argc; j++) {
5755 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5756 if (buf[0] != '$') goto fmterr;
5757 len = strtol(buf+1,NULL,10);
5758 argsds = sdsnewlen(NULL,len);
0f151ef1 5759 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
f80dff62 5760 argv[j] = createObject(REDIS_STRING,argsds);
5761 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5762 }
5763
5764 /* Command lookup */
5765 cmd = lookupCommand(argv[0]->ptr);
5766 if (!cmd) {
5767 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5768 exit(1);
5769 }
5770 /* Try object sharing and encoding */
5771 if (server.shareobjects) {
5772 int j;
5773 for(j = 1; j < argc; j++)
5774 argv[j] = tryObjectSharing(argv[j]);
5775 }
5776 if (cmd->flags & REDIS_CMD_BULK)
5777 tryObjectEncoding(argv[argc-1]);
5778 /* Run the command in the context of a fake client */
5779 fakeClient->argc = argc;
5780 fakeClient->argv = argv;
5781 cmd->proc(fakeClient);
5782 /* Discard the reply objects list from the fake client */
5783 while(listLength(fakeClient->reply))
5784 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5785 /* Clean up, ready for the next command */
5786 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5787 zfree(argv);
5788 }
5789 fclose(fp);
5790 freeFakeClient(fakeClient);
5791 return REDIS_OK;
5792
5793readerr:
5794 if (feof(fp)) {
5795 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5796 } else {
5797 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5798 }
5799 exit(1);
5800fmterr:
5801 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5802 exit(1);
5803}
5804
9d65a1bb 5805/* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5806static int fwriteBulk(FILE *fp, robj *obj) {
5807 char buf[128];
5808 obj = getDecodedObject(obj);
5809 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5810 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5811 if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err;
5812 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5813 decrRefCount(obj);
5814 return 1;
5815err:
5816 decrRefCount(obj);
5817 return 0;
5818}
5819
/* Emit a double as a bulk item: $<count>\r\n<payload>\r\n, where the payload
 * is the "%.17g" representation of the value.
 * Returns 1 on success, 0 on write error. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t paylen;

    /* The payload is formatted together with its trailing CRLF, which is
     * not part of the advertised length. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    paylen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);

    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,paylen,1,fp) == 0)
    {
        return 0;
    }
    return 1;
}
5830
/* Emit a long as a bulk item: $<count>\r\n<payload>\r\n, where the payload
 * is the decimal representation of the value.
 * Returns 1 on success, 0 on write error. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], header[128];
    size_t paylen;

    /* The payload is formatted together with its trailing CRLF, which is
     * not part of the advertised length. */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    paylen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);

    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,paylen,1,fp) == 0)
    {
        return 0;
    }
    return 1;
}
5841
5842/* Write a sequence of commands able to fully rebuild the dataset into
5843 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5844static int rewriteAppendOnlyFile(char *filename) {
5845 dictIterator *di = NULL;
5846 dictEntry *de;
5847 FILE *fp;
5848 char tmpfile[256];
5849 int j;
5850 time_t now = time(NULL);
5851
5852 /* Note that we have to use a different temp name here compared to the
5853 * one used by rewriteAppendOnlyFileBackground() function. */
5854 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5855 fp = fopen(tmpfile,"w");
5856 if (!fp) {
5857 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5858 return REDIS_ERR;
5859 }
5860 for (j = 0; j < server.dbnum; j++) {
5861 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5862 redisDb *db = server.db+j;
5863 dict *d = db->dict;
5864 if (dictSize(d) == 0) continue;
5865 di = dictGetIterator(d);
5866 if (!di) {
5867 fclose(fp);
5868 return REDIS_ERR;
5869 }
5870
5871 /* SELECT the new DB */
5872 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
85a83172 5873 if (fwriteBulkLong(fp,j) == 0) goto werr;
9d65a1bb 5874
5875 /* Iterate this DB writing every entry */
5876 while((de = dictNext(di)) != NULL) {
5877 robj *key = dictGetEntryKey(de);
5878 robj *o = dictGetEntryVal(de);
5879 time_t expiretime = getExpire(db,key);
5880
5881 /* Save the key and associated value */
9d65a1bb 5882 if (o->type == REDIS_STRING) {
5883 /* Emit a SET command */
5884 char cmd[]="*3\r\n$3\r\nSET\r\n";
5885 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5886 /* Key and value */
5887 if (fwriteBulk(fp,key) == 0) goto werr;
5888 if (fwriteBulk(fp,o) == 0) goto werr;
5889 } else if (o->type == REDIS_LIST) {
5890 /* Emit the RPUSHes needed to rebuild the list */
5891 list *list = o->ptr;
5892 listNode *ln;
5893
5894 listRewind(list);
5895 while((ln = listYield(list))) {
5896 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5897 robj *eleobj = listNodeValue(ln);
5898
5899 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5900 if (fwriteBulk(fp,key) == 0) goto werr;
5901 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5902 }
5903 } else if (o->type == REDIS_SET) {
5904 /* Emit the SADDs needed to rebuild the set */
5905 dict *set = o->ptr;
5906 dictIterator *di = dictGetIterator(set);
5907 dictEntry *de;
5908
5909 while((de = dictNext(di)) != NULL) {
5910 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5911 robj *eleobj = dictGetEntryKey(de);
5912
5913 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5914 if (fwriteBulk(fp,key) == 0) goto werr;
5915 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5916 }
5917 dictReleaseIterator(di);
5918 } else if (o->type == REDIS_ZSET) {
5919 /* Emit the ZADDs needed to rebuild the sorted set */
5920 zset *zs = o->ptr;
5921 dictIterator *di = dictGetIterator(zs->dict);
5922 dictEntry *de;
5923
5924 while((de = dictNext(di)) != NULL) {
5925 char cmd[]="*4\r\n$4\r\nZADD\r\n";
5926 robj *eleobj = dictGetEntryKey(de);
5927 double *score = dictGetEntryVal(de);
5928
5929 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5930 if (fwriteBulk(fp,key) == 0) goto werr;
5931 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
5932 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5933 }
5934 dictReleaseIterator(di);
5935 } else {
5936 assert(0 != 0);
5937 }
5938 /* Save the expire time */
5939 if (expiretime != -1) {
5940 char cmd[]="*3\r\n$6\r\nEXPIRE\r\n";
5941 /* If this key is already expired skip it */
5942 if (expiretime < now) continue;
5943 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5944 if (fwriteBulk(fp,key) == 0) goto werr;
5945 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
5946 }
5947 }
5948 dictReleaseIterator(di);
5949 }
5950
5951 /* Make sure data will not remain on the OS's output buffers */
5952 fflush(fp);
5953 fsync(fileno(fp));
5954 fclose(fp);
5955
5956 /* Use RENAME to make sure the DB file is changed atomically only
5957 * if the generate DB file is ok. */
5958 if (rename(tmpfile,filename) == -1) {
5959 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
5960 unlink(tmpfile);
5961 return REDIS_ERR;
5962 }
5963 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
5964 return REDIS_OK;
5965
5966werr:
5967 fclose(fp);
5968 unlink(tmpfile);
5969 redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno));
5970 if (di) dictReleaseIterator(di);
5971 return REDIS_ERR;
5972}
5973
5974/* This is how rewriting of the append only file in background works:
5975 *
5976 * 1) The user calls BGREWRITEAOF
5977 * 2) Redis calls this function, that forks():
5978 * 2a) the child rewrite the append only file in a temp file.
5979 * 2b) the parent accumulates differences in server.bgrewritebuf.
5980 * 3) When the child finished '2a' exists.
5981 * 4) The parent will trap the exit code, if it's OK, will append the
5982 * data accumulated into server.bgrewritebuf into the temp file, and
5983 * finally will rename(2) the temp file in the actual file name.
5984 * The the new file is reopened as the new append only file. Profit!
5985 */
5986static int rewriteAppendOnlyFileBackground(void) {
5987 pid_t childpid;
5988
5989 if (server.bgrewritechildpid != -1) return REDIS_ERR;
5990 if ((childpid = fork()) == 0) {
5991 /* Child */
5992 char tmpfile[256];
5993 close(server.fd);
5994
5995 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
5996 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
5997 exit(0);
5998 } else {
5999 exit(1);
6000 }
6001 } else {
6002 /* Parent */
6003 if (childpid == -1) {
6004 redisLog(REDIS_WARNING,
6005 "Can't rewrite append only file in background: fork: %s",
6006 strerror(errno));
6007 return REDIS_ERR;
6008 }
6009 redisLog(REDIS_NOTICE,
6010 "Background append only file rewriting started by pid %d",childpid);
6011 server.bgrewritechildpid = childpid;
85a83172 6012 /* We set appendseldb to -1 in order to force the next call to the
6013 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6014 * accumulated by the parent into server.bgrewritebuf will start
6015 * with a SELECT statement and it will be safe to merge. */
6016 server.appendseldb = -1;
9d65a1bb 6017 return REDIS_OK;
6018 }
6019 return REDIS_OK; /* unreached */
6020}
6021
6022static void bgrewriteaofCommand(redisClient *c) {
6023 if (server.bgrewritechildpid != -1) {
6024 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6025 return;
6026 }
6027 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6028 addReply(c,shared.ok);
6029 } else {
6030 addReply(c,shared.err);
6031 }
6032}
6033
/* Remove the temp file written by the background rewrite child whose pid is
 * "childpid". The result of unlink() is ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
6040
7f957c92 6041/* ================================= Debugging ============================== */
6042
6043static void debugCommand(redisClient *c) {
6044 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6045 *((char*)-1) = 'x';
210e29f7 6046 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6047 if (rdbSave(server.dbfilename) != REDIS_OK) {
6048 addReply(c,shared.err);
6049 return;
6050 }
6051 emptyDb();
6052 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6053 addReply(c,shared.err);
6054 return;
6055 }
6056 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6057 addReply(c,shared.ok);
333298da 6058 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6059 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6060 robj *key, *val;
6061
6062 if (!de) {
6063 addReply(c,shared.nokeyerr);
6064 return;
6065 }
6066 key = dictGetEntryKey(de);
6067 val = dictGetEntryVal(de);
6068 addReplySds(c,sdscatprintf(sdsempty(),
942a3961 6069 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6070 key, key->refcount, val, val->refcount, val->encoding));
7f957c92 6071 } else {
333298da 6072 addReplySds(c,sdsnew(
210e29f7 6073 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
7f957c92 6074 }
6075}
56906eef 6076
bcfc686d 6077/* =================================== Main! ================================ */
56906eef 6078
bcfc686d 6079#ifdef __linux__
/* Return the first line of /proc/sys/vm/overcommit_memory converted with
 * atoi(), or -1 if the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,64,fp) != NULL) value = atoi(line);
    fclose(fp);
    return value;
}
6093
6094void linuxOvercommitMemoryWarning(void) {
6095 if (linuxOvercommitMemoryValue() == 0) {
6096 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6097 }
6098}
6099#endif /* __linux__ */
6100
6101static void daemonize(void) {
6102 int fd;
6103 FILE *fp;
6104
6105 if (fork() != 0) exit(0); /* parent exits */
6106 setsid(); /* create a new session */
6107
6108 /* Every output goes to /dev/null. If Redis is daemonized but
6109 * the 'logfile' is set to 'stdout' in the configuration file
6110 * it will not log at all. */
6111 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6112 dup2(fd, STDIN_FILENO);
6113 dup2(fd, STDOUT_FILENO);
6114 dup2(fd, STDERR_FILENO);
6115 if (fd > STDERR_FILENO) close(fd);
6116 }
6117 /* Try to write the pid file */
6118 fp = fopen(server.pidfile,"w");
6119 if (fp) {
6120 fprintf(fp,"%d\n",getpid());
6121 fclose(fp);
56906eef 6122 }
56906eef 6123}
6124
bcfc686d 6125int main(int argc, char **argv) {
6126 initServerConfig();
6127 if (argc == 2) {
6128 resetServerSaveParams();
6129 loadServerConfig(argv[1]);
6130 } else if (argc > 2) {
6131 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6132 exit(1);
6133 } else {
6134 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6135 }
6136 initServer();
6137 if (server.daemonize) daemonize();
6138 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6139#ifdef __linux__
6140 linuxOvercommitMemoryWarning();
6141#endif
6142 if (server.appendonly) {
6143 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6144 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6145 } else {
6146 if (rdbLoad(server.dbfilename) == REDIS_OK)
6147 redisLog(REDIS_NOTICE,"DB loaded from disk");
6148 }
6149 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
266373b2 6150 acceptHandler, NULL) == AE_ERR) oom("creating file event");
bcfc686d 6151 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6152 aeMain(server.el);
6153 aeDeleteEventLoop(server.el);
6154 return 0;
6155}
6156
6157/* ============================= Backtrace support ========================= */
6158
6159#ifdef HAVE_BACKTRACE
6160static char *findFuncName(void *pointer, unsigned long *offset);
6161
/* Return the instruction pointer stored in the signal context "uc", or NULL
 * on platforms for which no extraction code is provided here.
 *
 * NOTE(review): the Linux x86 branch tests __X86_64__ (upper case). Check
 * whether the toolchains in use define that spelling; if they only define
 * __x86_64__, 64 bit Linux builds end up in the NULL fallback. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__)
    /* Only when MAC_OS_X_VERSION_10_6 is defined, and the build is not a
     * 32 bit one, the 64 bit thread state is used. */
    #if defined(MAC_OS_X_VERSION_10_6) && \
        defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
    #else
    return (void*) uc->uc_mcontext->__ss.__eip;
    #endif
#elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
6183
6184static void segvHandler(int sig, siginfo_t *info, void *secret) {
6185 void *trace[100];
6186 char **messages = NULL;
6187 int i, trace_size = 0;
6188 unsigned long offset=0;
56906eef 6189 ucontext_t *uc = (ucontext_t*) secret;
1c85b79f 6190 sds infostring;
56906eef 6191 REDIS_NOTUSED(info);
6192
6193 redisLog(REDIS_WARNING,
6194 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
1c85b79f 6195 infostring = genRedisInfoString();
6196 redisLog(REDIS_WARNING, "%s",infostring);
6197 /* It's not safe to sdsfree() the returned string under memory
6198 * corruption conditions. Let it leak as we are going to abort */
56906eef 6199
6200 trace_size = backtrace(trace, 100);
de96dbfe 6201 /* overwrite sigaction with caller's address */
b91cf5ef 6202 if (getMcontextEip(uc) != NULL) {
6203 trace[1] = getMcontextEip(uc);
6204 }
56906eef 6205 messages = backtrace_symbols(trace, trace_size);
fe3bbfbe 6206
d76412d1 6207 for (i=1; i<trace_size; ++i) {
56906eef 6208 char *fn = findFuncName(trace[i], &offset), *p;
6209
6210 p = strchr(messages[i],'+');
6211 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6212 redisLog(REDIS_WARNING,"%s", messages[i]);
6213 } else {
6214 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6215 }
6216 }
1c85b79f 6217 // free(messages); Don't call free() with possibly corrupted memory.
56906eef 6218 exit(0);
fe3bbfbe 6219}
56906eef 6220
6221static void setupSigSegvAction(void) {
6222 struct sigaction act;
6223
6224 sigemptyset (&act.sa_mask);
6225 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6226 * is used. Otherwise, sa_handler is used */
6227 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6228 act.sa_sigaction = segvHandler;
6229 sigaction (SIGSEGV, &act, NULL);
6230 sigaction (SIGBUS, &act, NULL);
12fea928 6231 sigaction (SIGFPE, &act, NULL);
6232 sigaction (SIGILL, &act, NULL);
6233 sigaction (SIGBUS, &act, NULL);
e65fdc78 6234 return;
56906eef 6235}
e65fdc78 6236
bcfc686d 6237#include "staticsymbols.h"
6238/* This function try to convert a pointer into a function name. It's used in
6239 * oreder to provide a backtrace under segmentation fault that's able to
6240 * display functions declared as static (otherwise the backtrace is useless). */
6241static char *findFuncName(void *pointer, unsigned long *offset){
6242 int i, ret = -1;
6243 unsigned long off, minoff = 0;
ed9b544e 6244
bcfc686d 6245 /* Try to match against the Symbol with the smallest offset */
6246 for (i=0; symsTable[i].pointer; i++) {
6247 unsigned long lp = (unsigned long) pointer;
0bc03378 6248
bcfc686d 6249 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6250 off=lp-symsTable[i].pointer;
6251 if (ret < 0 || off < minoff) {
6252 minoff=off;
6253 ret=i;
6254 }
6255 }
0bc03378 6256 }
bcfc686d 6257 if (ret == -1) return NULL;
6258 *offset = minoff;
6259 return symsTable[ret].name;
0bc03378 6260}
bcfc686d 6261#else /* HAVE_BACKTRACE */
/* Without HAVE_BACKTRACE there is nothing to install: this is a no-op. */
static void setupSigSegvAction(void) {
    return;
}
bcfc686d 6264#endif /* HAVE_BACKTRACE */
0bc03378 6265
ed9b544e 6266
ed9b544e 6267
bcfc686d 6268/* The End */
6269
6270
ed9b544e 6271