]> git.saurik.com Git - redis.git/blame - redis.c
first fix for append only mode
[redis.git] / redis.c
CommitLineData
ed9b544e 1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
1812e024 30#define REDIS_VERSION "1.050"
23d4709d 31
32#include "fmacros.h"
fbf9bcdb 33#include "config.h"
ed9b544e 34
35#include <stdio.h>
36#include <stdlib.h>
37#include <string.h>
38#include <time.h>
39#include <unistd.h>
c9468bcf 40#define __USE_POSIX199309
ed9b544e 41#include <signal.h>
fbf9bcdb 42
43#ifdef HAVE_BACKTRACE
c9468bcf 44#include <execinfo.h>
45#include <ucontext.h>
fbf9bcdb 46#endif /* HAVE_BACKTRACE */
47
ed9b544e 48#include <sys/wait.h>
49#include <errno.h>
50#include <assert.h>
51#include <ctype.h>
52#include <stdarg.h>
53#include <inttypes.h>
54#include <arpa/inet.h>
55#include <sys/stat.h>
56#include <fcntl.h>
57#include <sys/time.h>
58#include <sys/resource.h>
f78fd11b 59#include <limits.h>
a7866db6 60#include <math.h>
0bc1b2f6 61
62#if defined(__sun)
5043dff3 63#include "solarisfixes.h"
64#endif
ed9b544e 65
c9468bcf 66#include "redis.h"
ed9b544e 67#include "ae.h" /* Event driven programming library */
68#include "sds.h" /* Dynamic safe strings */
69#include "anet.h" /* Networking the easy way */
70#include "dict.h" /* Hash tables */
71#include "adlist.h" /* Linked lists */
72#include "zmalloc.h" /* total memory usage aware version of malloc/free */
5f5b9840 73#include "lzf.h" /* LZF compression library */
74#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
ed9b544e 75
76/* Error codes */
77#define REDIS_OK 0
78#define REDIS_ERR -1
79
80/* Static server configuration */
81#define REDIS_SERVERPORT 6379 /* TCP port */
82#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
6208b3a7 83#define REDIS_IOBUF_LEN 1024
ed9b544e 84#define REDIS_LOADBUF_LEN 1024
93ea3759 85#define REDIS_STATIC_ARGS 4
ed9b544e 86#define REDIS_DEFAULT_DBNUM 16
87#define REDIS_CONFIGLINE_MAX 1024
88#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
89#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
94754ccc 90#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
6f376729 91#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
cd194638 92#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
ed9b544e 93
94/* Hash table parameters */
95#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
ed9b544e 96
97/* Command flags */
3fd78bcd 98#define REDIS_CMD_BULK 1 /* Bulk write command */
99#define REDIS_CMD_INLINE 2 /* Inline command */
100/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
101 this flags will return an error when the 'maxmemory' option is set in the
102 config file and the server is using more than maxmemory bytes of memory.
103 In short this commands are denied on low memory conditions. */
104#define REDIS_CMD_DENYOOM 4
ed9b544e 105
106/* Object types */
107#define REDIS_STRING 0
108#define REDIS_LIST 1
109#define REDIS_SET 2
1812e024 110#define REDIS_ZSET 3
111#define REDIS_HASH 4
f78fd11b 112
942a3961 113/* Objects encoding */
114#define REDIS_ENCODING_RAW 0 /* Raw representation */
115#define REDIS_ENCODING_INT 1 /* Encoded as integer */
116
f78fd11b 117/* Object types only used for dumping to disk */
bb32ede5 118#define REDIS_EXPIRETIME 253
ed9b544e 119#define REDIS_SELECTDB 254
120#define REDIS_EOF 255
121
f78fd11b 122/* Defines related to the dump file format. To store 32 bits lengths for short
123 * keys requires a lot of space, so we check the most significant 2 bits of
124 * the first byte to interpreter the length:
125 *
126 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
127 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
128 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
a4d1ba9a 129 * 11|000000 this means: specially encoded object will follow. The six bits
130 * number specify the kind of object that follows.
131 * See the REDIS_RDB_ENC_* defines.
f78fd11b 132 *
10c43610 133 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
134 * values, will fit inside. */
f78fd11b 135#define REDIS_RDB_6BITLEN 0
136#define REDIS_RDB_14BITLEN 1
137#define REDIS_RDB_32BITLEN 2
17be1a4a 138#define REDIS_RDB_ENCVAL 3
f78fd11b 139#define REDIS_RDB_LENERR UINT_MAX
140
a4d1ba9a 141/* When a length of a string object stored on disk has the first two bits
142 * set, the remaining two bits specify a special encoding for the object
143 * accordingly to the following defines: */
144#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
145#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
146#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
774e3047 147#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
a4d1ba9a 148
ed9b544e 149/* Client flags */
150#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
151#define REDIS_SLAVE 2 /* This client is a slave server */
152#define REDIS_MASTER 4 /* This client is a master server */
87eca727 153#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
ed9b544e 154
40d224a9 155/* Slave replication state - slave side */
ed9b544e 156#define REDIS_REPL_NONE 0 /* No active replication */
157#define REDIS_REPL_CONNECT 1 /* Must connect to master */
158#define REDIS_REPL_CONNECTED 2 /* Connected to master */
159
40d224a9 160/* Slave replication state - from the point of view of master
161 * Note that in SEND_BULK and ONLINE state the slave receives new updates
162 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
163 * to start the next background saving in order to send updates to it. */
164#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
165#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
166#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
167#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
168
ed9b544e 169/* List related stuff */
170#define REDIS_HEAD 0
171#define REDIS_TAIL 1
172
173/* Sort operations */
174#define REDIS_SORT_GET 0
175#define REDIS_SORT_DEL 1
176#define REDIS_SORT_INCR 2
177#define REDIS_SORT_DECR 3
178#define REDIS_SORT_ASC 4
179#define REDIS_SORT_DESC 5
180#define REDIS_SORTKEY_MAX 1024
181
182/* Log levels */
183#define REDIS_DEBUG 0
184#define REDIS_NOTICE 1
185#define REDIS_WARNING 2
186
187/* Anti-warning macro... */
188#define REDIS_NOTUSED(V) ((void) V)
189
6b47e12e 190#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
191#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
ed9b544e 192
193/*================================= Data types ============================== */
194
195/* A redis object, that is a type able to hold a string / list / set */
196typedef struct redisObject {
ed9b544e 197 void *ptr;
942a3961 198 unsigned char type;
199 unsigned char encoding;
200 unsigned char notused[2];
ed9b544e 201 int refcount;
202} robj;
203
3305306f 204typedef struct redisDb {
205 dict *dict;
206 dict *expires;
207 int id;
208} redisDb;
209
ed9b544e 210/* With multiplexing we need to take per-clinet state.
211 * Clients are taken in a liked list. */
212typedef struct redisClient {
213 int fd;
3305306f 214 redisDb *db;
ed9b544e 215 int dictid;
216 sds querybuf;
e8a74421 217 robj **argv, **mbargv;
218 int argc, mbargc;
40d224a9 219 int bulklen; /* bulk read len. -1 if not in bulk read mode */
e8a74421 220 int multibulk; /* multi bulk command format active */
ed9b544e 221 list *reply;
222 int sentlen;
223 time_t lastinteraction; /* time of the last interaction, used for timeout */
40d224a9 224 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
225 int slaveseldb; /* slave selected db, if this client is a slave */
226 int authenticated; /* when requirepass is non-NULL */
227 int replstate; /* replication state if this is a slave */
228 int repldbfd; /* replication DB file descriptor */
6208b3a7 229 long repldboff; /* replication DB file offset */
40d224a9 230 off_t repldbsize; /* replication DB file size */
ed9b544e 231} redisClient;
232
233struct saveparam {
234 time_t seconds;
235 int changes;
236};
237
238/* Global server state structure */
239struct redisServer {
240 int port;
241 int fd;
3305306f 242 redisDb *db;
10c43610 243 dict *sharingpool;
244 unsigned int sharingpoolsize;
ed9b544e 245 long long dirty; /* changes to DB from the last save */
246 list *clients;
87eca727 247 list *slaves, *monitors;
ed9b544e 248 char neterr[ANET_ERR_LEN];
249 aeEventLoop *el;
250 int cronloops; /* number of times the cron function run */
251 list *objfreelist; /* A list of freed objects to avoid malloc() */
252 time_t lastsave; /* Unix time of last save succeeede */
5fba9f71 253 size_t usedmemory; /* Used memory in megabytes */
ed9b544e 254 /* Fields used only for stats */
255 time_t stat_starttime; /* server start time */
256 long long stat_numcommands; /* number of processed commands */
257 long long stat_numconnections; /* number of connections received */
258 /* Configuration */
259 int verbosity;
260 int glueoutputbuf;
261 int maxidletime;
262 int dbnum;
263 int daemonize;
44b38ef4 264 int appendonly;
265 int appendfd;
266 int appendseldb;
ed329fcf 267 char *pidfile;
ed9b544e 268 int bgsaveinprogress;
9f3c422c 269 pid_t bgsavechildpid;
ed9b544e 270 struct saveparam *saveparams;
271 int saveparamslen;
272 char *logfile;
273 char *bindaddr;
274 char *dbfilename;
44b38ef4 275 char *appendfilename;
abcb223e 276 char *requirepass;
10c43610 277 int shareobjects;
ed9b544e 278 /* Replication related */
279 int isslave;
280 char *masterhost;
281 int masterport;
40d224a9 282 redisClient *master; /* client that is master for this slave */
ed9b544e 283 int replstate;
285add55 284 unsigned int maxclients;
d4465900 285 unsigned long maxmemory;
ed9b544e 286 /* Sort parameters - qsort_r() is only available under BSD so we
287 * have to take this state global, in order to pass it to sortCompare() */
288 int sort_desc;
289 int sort_alpha;
290 int sort_bypattern;
291};
292
293typedef void redisCommandProc(redisClient *c);
294struct redisCommand {
295 char *name;
296 redisCommandProc *proc;
297 int arity;
298 int flags;
299};
300
de96dbfe 301struct redisFunctionSym {
302 char *name;
56906eef 303 unsigned long pointer;
de96dbfe 304};
305
ed9b544e 306typedef struct _redisSortObject {
307 robj *obj;
308 union {
309 double score;
310 robj *cmpobj;
311 } u;
312} redisSortObject;
313
314typedef struct _redisSortOperation {
315 int type;
316 robj *pattern;
317} redisSortOperation;
318
6b47e12e 319/* ZSETs use a specialized version of Skiplists */
320
321typedef struct zskiplistNode {
322 struct zskiplistNode **forward;
e3870fab 323 struct zskiplistNode *backward;
6b47e12e 324 double score;
325 robj *obj;
326} zskiplistNode;
327
328typedef struct zskiplist {
e3870fab 329 struct zskiplistNode *header, *tail;
d13f767c 330 unsigned long length;
6b47e12e 331 int level;
332} zskiplist;
333
1812e024 334typedef struct zset {
335 dict *dict;
6b47e12e 336 zskiplist *zsl;
1812e024 337} zset;
338
6b47e12e 339/* Our shared "common" objects */
340
ed9b544e 341struct sharedObjectsStruct {
c937aa89 342 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
7b45bfb2 343 *colon, *nullbulk, *nullmultibulk,
c937aa89 344 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
345 *outofrangeerr, *plus,
ed9b544e 346 *select0, *select1, *select2, *select3, *select4,
347 *select5, *select6, *select7, *select8, *select9;
348} shared;
349
a7866db6 350/* Global vars that are actally used as constants. The following double
351 * values are used for double on-disk serialization, and are initialized
352 * at runtime to avoid strange compiler optimizations. */
353
354static double R_Zero, R_PosInf, R_NegInf, R_Nan;
355
ed9b544e 356/*================================ Prototypes =============================== */
357
358static void freeStringObject(robj *o);
359static void freeListObject(robj *o);
360static void freeSetObject(robj *o);
361static void decrRefCount(void *o);
362static robj *createObject(int type, void *ptr);
363static void freeClient(redisClient *c);
f78fd11b 364static int rdbLoad(char *filename);
ed9b544e 365static void addReply(redisClient *c, robj *obj);
366static void addReplySds(redisClient *c, sds s);
367static void incrRefCount(robj *o);
f78fd11b 368static int rdbSaveBackground(char *filename);
ed9b544e 369static robj *createStringObject(char *ptr, size_t len);
87eca727 370static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
44b38ef4 371static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
ed9b544e 372static int syncWithMaster(void);
10c43610 373static robj *tryObjectSharing(robj *o);
942a3961 374static int tryObjectEncoding(robj *o);
375static robj *getDecodedObject(const robj *o);
3305306f 376static int removeExpire(redisDb *db, robj *key);
377static int expireIfNeeded(redisDb *db, robj *key);
378static int deleteIfVolatile(redisDb *db, robj *key);
94754ccc 379static int deleteKey(redisDb *db, robj *key);
bb32ede5 380static time_t getExpire(redisDb *db, robj *key);
381static int setExpire(redisDb *db, robj *key, time_t when);
a3b21203 382static void updateSlavesWaitingBgsave(int bgsaveerr);
3fd78bcd 383static void freeMemoryIfNeeded(void);
de96dbfe 384static int processCommand(redisClient *c);
56906eef 385static void setupSigSegvAction(void);
a3b21203 386static void rdbRemoveTempFile(pid_t childpid);
0ea663ea 387static size_t stringObjectLen(robj *o);
638e42ac 388static void processInputBuffer(redisClient *c);
6b47e12e 389static zskiplist *zslCreate(void);
fd8ccf44 390static void zslFree(zskiplist *zsl);
2b59cfdf 391static void zslInsert(zskiplist *zsl, double score, robj *obj);
ed9b544e 392
abcb223e 393static void authCommand(redisClient *c);
ed9b544e 394static void pingCommand(redisClient *c);
395static void echoCommand(redisClient *c);
396static void setCommand(redisClient *c);
397static void setnxCommand(redisClient *c);
398static void getCommand(redisClient *c);
399static void delCommand(redisClient *c);
400static void existsCommand(redisClient *c);
401static void incrCommand(redisClient *c);
402static void decrCommand(redisClient *c);
403static void incrbyCommand(redisClient *c);
404static void decrbyCommand(redisClient *c);
405static void selectCommand(redisClient *c);
406static void randomkeyCommand(redisClient *c);
407static void keysCommand(redisClient *c);
408static void dbsizeCommand(redisClient *c);
409static void lastsaveCommand(redisClient *c);
410static void saveCommand(redisClient *c);
411static void bgsaveCommand(redisClient *c);
412static void shutdownCommand(redisClient *c);
413static void moveCommand(redisClient *c);
414static void renameCommand(redisClient *c);
415static void renamenxCommand(redisClient *c);
416static void lpushCommand(redisClient *c);
417static void rpushCommand(redisClient *c);
418static void lpopCommand(redisClient *c);
419static void rpopCommand(redisClient *c);
420static void llenCommand(redisClient *c);
421static void lindexCommand(redisClient *c);
422static void lrangeCommand(redisClient *c);
423static void ltrimCommand(redisClient *c);
424static void typeCommand(redisClient *c);
425static void lsetCommand(redisClient *c);
426static void saddCommand(redisClient *c);
427static void sremCommand(redisClient *c);
a4460ef4 428static void smoveCommand(redisClient *c);
ed9b544e 429static void sismemberCommand(redisClient *c);
430static void scardCommand(redisClient *c);
12fea928 431static void spopCommand(redisClient *c);
2abb95a9 432static void srandmemberCommand(redisClient *c);
ed9b544e 433static void sinterCommand(redisClient *c);
434static void sinterstoreCommand(redisClient *c);
40d224a9 435static void sunionCommand(redisClient *c);
436static void sunionstoreCommand(redisClient *c);
f4f56e1d 437static void sdiffCommand(redisClient *c);
438static void sdiffstoreCommand(redisClient *c);
ed9b544e 439static void syncCommand(redisClient *c);
440static void flushdbCommand(redisClient *c);
441static void flushallCommand(redisClient *c);
442static void sortCommand(redisClient *c);
443static void lremCommand(redisClient *c);
444static void infoCommand(redisClient *c);
70003d28 445static void mgetCommand(redisClient *c);
87eca727 446static void monitorCommand(redisClient *c);
3305306f 447static void expireCommand(redisClient *c);
802e8373 448static void expireatCommand(redisClient *c);
f6b141c5 449static void getsetCommand(redisClient *c);
fd88489a 450static void ttlCommand(redisClient *c);
321b0e13 451static void slaveofCommand(redisClient *c);
7f957c92 452static void debugCommand(redisClient *c);
f6b141c5 453static void msetCommand(redisClient *c);
454static void msetnxCommand(redisClient *c);
fd8ccf44 455static void zaddCommand(redisClient *c);
cc812361 456static void zrangeCommand(redisClient *c);
50c55df5 457static void zrangebyscoreCommand(redisClient *c);
e3870fab 458static void zrevrangeCommand(redisClient *c);
3c41331e 459static void zcardCommand(redisClient *c);
1b7106e7 460static void zremCommand(redisClient *c);
6e333bbe 461static void zscoreCommand(redisClient *c);
1807985b 462static void zremrangebyscoreCommand(redisClient *c);
f6b141c5 463
ed9b544e 464/*================================= Globals ================================= */
465
466/* Global vars */
467static struct redisServer server; /* server global state */
468static struct redisCommand cmdTable[] = {
469 {"get",getCommand,2,REDIS_CMD_INLINE},
3fd78bcd 470 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
471 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
5109cdff 472 {"del",delCommand,-2,REDIS_CMD_INLINE},
ed9b544e 473 {"exists",existsCommand,2,REDIS_CMD_INLINE},
3fd78bcd 474 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
475 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
70003d28 476 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
3fd78bcd 477 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
478 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 479 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
480 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
481 {"llen",llenCommand,2,REDIS_CMD_INLINE},
482 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
3fd78bcd 483 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 484 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
485 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
486 {"lrem",lremCommand,4,REDIS_CMD_BULK},
3fd78bcd 487 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 488 {"srem",sremCommand,3,REDIS_CMD_BULK},
a4460ef4 489 {"smove",smoveCommand,4,REDIS_CMD_BULK},
ed9b544e 490 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
491 {"scard",scardCommand,2,REDIS_CMD_INLINE},
12fea928 492 {"spop",spopCommand,2,REDIS_CMD_INLINE},
2abb95a9 493 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
3fd78bcd 494 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
495 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
496 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
497 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
498 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
499 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 500 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
fd8ccf44 501 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
1b7106e7 502 {"zrem",zremCommand,3,REDIS_CMD_BULK},
1807985b 503 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
cc812361 504 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
50c55df5 505 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
e3870fab 506 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
3c41331e 507 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
6e333bbe 508 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
3fd78bcd 509 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
510 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
f6b141c5 511 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
512 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
513 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 514 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
515 {"select",selectCommand,2,REDIS_CMD_INLINE},
516 {"move",moveCommand,3,REDIS_CMD_INLINE},
517 {"rename",renameCommand,3,REDIS_CMD_INLINE},
518 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
321b0e13 519 {"expire",expireCommand,3,REDIS_CMD_INLINE},
802e8373 520 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
ed9b544e 521 {"keys",keysCommand,2,REDIS_CMD_INLINE},
522 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
abcb223e 523 {"auth",authCommand,2,REDIS_CMD_INLINE},
ed9b544e 524 {"ping",pingCommand,1,REDIS_CMD_INLINE},
525 {"echo",echoCommand,2,REDIS_CMD_BULK},
526 {"save",saveCommand,1,REDIS_CMD_INLINE},
527 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
528 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
529 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
530 {"type",typeCommand,2,REDIS_CMD_INLINE},
531 {"sync",syncCommand,1,REDIS_CMD_INLINE},
532 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
533 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
3fd78bcd 534 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 535 {"info",infoCommand,1,REDIS_CMD_INLINE},
87eca727 536 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
fd88489a 537 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
321b0e13 538 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
7f957c92 539 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
ed9b544e 540 {NULL,NULL,0,0}
541};
ed9b544e 542/*============================ Utility functions ============================ */
543
544/* Glob-style pattern matching. */
545int stringmatchlen(const char *pattern, int patternLen,
546 const char *string, int stringLen, int nocase)
547{
548 while(patternLen) {
549 switch(pattern[0]) {
550 case '*':
551 while (pattern[1] == '*') {
552 pattern++;
553 patternLen--;
554 }
555 if (patternLen == 1)
556 return 1; /* match */
557 while(stringLen) {
558 if (stringmatchlen(pattern+1, patternLen-1,
559 string, stringLen, nocase))
560 return 1; /* match */
561 string++;
562 stringLen--;
563 }
564 return 0; /* no match */
565 break;
566 case '?':
567 if (stringLen == 0)
568 return 0; /* no match */
569 string++;
570 stringLen--;
571 break;
572 case '[':
573 {
574 int not, match;
575
576 pattern++;
577 patternLen--;
578 not = pattern[0] == '^';
579 if (not) {
580 pattern++;
581 patternLen--;
582 }
583 match = 0;
584 while(1) {
585 if (pattern[0] == '\\') {
586 pattern++;
587 patternLen--;
588 if (pattern[0] == string[0])
589 match = 1;
590 } else if (pattern[0] == ']') {
591 break;
592 } else if (patternLen == 0) {
593 pattern--;
594 patternLen++;
595 break;
596 } else if (pattern[1] == '-' && patternLen >= 3) {
597 int start = pattern[0];
598 int end = pattern[2];
599 int c = string[0];
600 if (start > end) {
601 int t = start;
602 start = end;
603 end = t;
604 }
605 if (nocase) {
606 start = tolower(start);
607 end = tolower(end);
608 c = tolower(c);
609 }
610 pattern += 2;
611 patternLen -= 2;
612 if (c >= start && c <= end)
613 match = 1;
614 } else {
615 if (!nocase) {
616 if (pattern[0] == string[0])
617 match = 1;
618 } else {
619 if (tolower((int)pattern[0]) == tolower((int)string[0]))
620 match = 1;
621 }
622 }
623 pattern++;
624 patternLen--;
625 }
626 if (not)
627 match = !match;
628 if (!match)
629 return 0; /* no match */
630 string++;
631 stringLen--;
632 break;
633 }
634 case '\\':
635 if (patternLen >= 2) {
636 pattern++;
637 patternLen--;
638 }
639 /* fall through */
640 default:
641 if (!nocase) {
642 if (pattern[0] != string[0])
643 return 0; /* no match */
644 } else {
645 if (tolower((int)pattern[0]) != tolower((int)string[0]))
646 return 0; /* no match */
647 }
648 string++;
649 stringLen--;
650 break;
651 }
652 pattern++;
653 patternLen--;
654 if (stringLen == 0) {
655 while(*pattern == '*') {
656 pattern++;
657 patternLen--;
658 }
659 break;
660 }
661 }
662 if (patternLen == 0 && stringLen == 0)
663 return 1;
664 return 0;
665}
666
56906eef 667static void redisLog(int level, const char *fmt, ...) {
ed9b544e 668 va_list ap;
669 FILE *fp;
670
671 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
672 if (!fp) return;
673
674 va_start(ap, fmt);
675 if (level >= server.verbosity) {
676 char *c = ".-*";
1904ecc1 677 char buf[64];
678 time_t now;
679
680 now = time(NULL);
6c9385e0 681 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
1904ecc1 682 fprintf(fp,"%s %c ",buf,c[level]);
ed9b544e 683 vfprintf(fp, fmt, ap);
684 fprintf(fp,"\n");
685 fflush(fp);
686 }
687 va_end(ap);
688
689 if (server.logfile) fclose(fp);
690}
691
692/*====================== Hash table type implementation ==================== */
693
694/* This is an hash table type that uses the SDS dynamic strings libary as
695 * keys and radis objects as values (objects can hold SDS strings,
696 * lists, sets). */
697
1812e024 698static void dictVanillaFree(void *privdata, void *val)
699{
700 DICT_NOTUSED(privdata);
701 zfree(val);
702}
703
ed9b544e 704static int sdsDictKeyCompare(void *privdata, const void *key1,
705 const void *key2)
706{
707 int l1,l2;
708 DICT_NOTUSED(privdata);
709
710 l1 = sdslen((sds)key1);
711 l2 = sdslen((sds)key2);
712 if (l1 != l2) return 0;
713 return memcmp(key1, key2, l1) == 0;
714}
715
716static void dictRedisObjectDestructor(void *privdata, void *val)
717{
718 DICT_NOTUSED(privdata);
719
720 decrRefCount(val);
721}
722
942a3961 723static int dictObjKeyCompare(void *privdata, const void *key1,
ed9b544e 724 const void *key2)
725{
726 const robj *o1 = key1, *o2 = key2;
727 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
728}
729
942a3961 730static unsigned int dictObjHash(const void *key) {
ed9b544e 731 const robj *o = key;
732 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
733}
734
942a3961 735static int dictEncObjKeyCompare(void *privdata, const void *key1,
736 const void *key2)
737{
738 const robj *o1 = key1, *o2 = key2;
739
740 if (o1->encoding == REDIS_ENCODING_RAW &&
741 o2->encoding == REDIS_ENCODING_RAW)
742 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
743 else {
744 robj *dec1, *dec2;
745 int cmp;
746
747 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
748 getDecodedObject(o1) : (robj*)o1;
749 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
750 getDecodedObject(o2) : (robj*)o2;
751 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
752 if (dec1 != o1) decrRefCount(dec1);
753 if (dec2 != o2) decrRefCount(dec2);
754 return cmp;
755 }
756}
757
758static unsigned int dictEncObjHash(const void *key) {
759 const robj *o = key;
760
761 if (o->encoding == REDIS_ENCODING_RAW)
762 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
763 else {
764 robj *dec = getDecodedObject(o);
765 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
766 decrRefCount(dec);
767 return hash;
768 }
769}
770
ed9b544e 771static dictType setDictType = {
942a3961 772 dictEncObjHash, /* hash function */
ed9b544e 773 NULL, /* key dup */
774 NULL, /* val dup */
942a3961 775 dictEncObjKeyCompare, /* key compare */
ed9b544e 776 dictRedisObjectDestructor, /* key destructor */
777 NULL /* val destructor */
778};
779
1812e024 780static dictType zsetDictType = {
781 dictEncObjHash, /* hash function */
782 NULL, /* key dup */
783 NULL, /* val dup */
784 dictEncObjKeyCompare, /* key compare */
785 dictRedisObjectDestructor, /* key destructor */
786 dictVanillaFree /* val destructor */
787};
788
ed9b544e 789static dictType hashDictType = {
942a3961 790 dictObjHash, /* hash function */
ed9b544e 791 NULL, /* key dup */
792 NULL, /* val dup */
942a3961 793 dictObjKeyCompare, /* key compare */
ed9b544e 794 dictRedisObjectDestructor, /* key destructor */
795 dictRedisObjectDestructor /* val destructor */
796};
797
798/* ========================= Random utility functions ======================= */
799
800/* Redis generally does not try to recover from out of memory conditions
801 * when allocating objects or strings, it is not clear if it will be possible
802 * to report this condition to the client since the networking layer itself
803 * is based on heap allocation for send buffers, so we simply abort.
804 * At least the code will be simpler to read... */
805static void oom(const char *msg) {
806 fprintf(stderr, "%s: Out of memory\n",msg);
807 fflush(stderr);
808 sleep(1);
809 abort();
810}
811
812/* ====================== Redis server networking stuff ===================== */
56906eef 813static void closeTimedoutClients(void) {
ed9b544e 814 redisClient *c;
ed9b544e 815 listNode *ln;
816 time_t now = time(NULL);
817
6208b3a7 818 listRewind(server.clients);
819 while ((ln = listYield(server.clients)) != NULL) {
ed9b544e 820 c = listNodeValue(ln);
821 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
c7cf2ec9 822 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
ed9b544e 823 (now - c->lastinteraction > server.maxidletime)) {
824 redisLog(REDIS_DEBUG,"Closing idle client");
825 freeClient(c);
826 }
827 }
ed9b544e 828}
829
12fea928 830static int htNeedsResize(dict *dict) {
831 long long size, used;
832
833 size = dictSlots(dict);
834 used = dictSize(dict);
835 return (size && used && size > DICT_HT_INITIAL_SIZE &&
836 (used*100/size < REDIS_HT_MINFILL));
837}
838
0bc03378 839/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
840 * we resize the hash table to save memory */
56906eef 841static void tryResizeHashTables(void) {
0bc03378 842 int j;
843
844 for (j = 0; j < server.dbnum; j++) {
12fea928 845 if (htNeedsResize(server.db[j].dict)) {
846 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
0bc03378 847 dictResize(server.db[j].dict);
12fea928 848 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
0bc03378 849 }
12fea928 850 if (htNeedsResize(server.db[j].expires))
851 dictResize(server.db[j].expires);
0bc03378 852 }
853}
854
56906eef 855static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
94754ccc 856 int j, loops = server.cronloops++;
ed9b544e 857 REDIS_NOTUSED(eventLoop);
858 REDIS_NOTUSED(id);
859 REDIS_NOTUSED(clientData);
860
861 /* Update the global state with the amount of used memory */
862 server.usedmemory = zmalloc_used_memory();
863
0bc03378 864 /* Show some info about non-empty databases */
ed9b544e 865 for (j = 0; j < server.dbnum; j++) {
dec423d9 866 long long size, used, vkeys;
94754ccc 867
3305306f 868 size = dictSlots(server.db[j].dict);
869 used = dictSize(server.db[j].dict);
94754ccc 870 vkeys = dictSize(server.db[j].expires);
c3cb078d 871 if (!(loops % 5) && (used || vkeys)) {
872 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
a4d1ba9a 873 /* dictPrintStats(server.dict); */
ed9b544e 874 }
ed9b544e 875 }
876
0bc03378 877 /* We don't want to resize the hash tables while a bacground saving
878 * is in progress: the saving child is created using fork() that is
879 * implemented with a copy-on-write semantic in most modern systems, so
880 * if we resize the HT while there is the saving child at work actually
881 * a lot of memory movements in the parent will cause a lot of pages
882 * copied. */
883 if (!server.bgsaveinprogress) tryResizeHashTables();
884
ed9b544e 885 /* Show information about connected clients */
886 if (!(loops % 5)) {
21aecf4b 887 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
ed9b544e 888 listLength(server.clients)-listLength(server.slaves),
889 listLength(server.slaves),
10c43610 890 server.usedmemory,
3305306f 891 dictSize(server.sharingpool));
ed9b544e 892 }
893
894 /* Close connections of timedout clients */
0150db36 895 if (server.maxidletime && !(loops % 10))
ed9b544e 896 closeTimedoutClients();
897
898 /* Check if a background saving in progress terminated */
899 if (server.bgsaveinprogress) {
900 int statloc;
901 if (wait4(-1,&statloc,WNOHANG,NULL)) {
902 int exitcode = WEXITSTATUS(statloc);
9f3c422c 903 int bysignal = WIFSIGNALED(statloc);
904
905 if (!bysignal && exitcode == 0) {
ed9b544e 906 redisLog(REDIS_NOTICE,
907 "Background saving terminated with success");
908 server.dirty = 0;
909 server.lastsave = time(NULL);
9f3c422c 910 } else if (!bysignal && exitcode != 0) {
911 redisLog(REDIS_WARNING, "Background saving error");
ed9b544e 912 } else {
913 redisLog(REDIS_WARNING,
9f3c422c 914 "Background saving terminated by signal");
a3b21203 915 rdbRemoveTempFile(server.bgsavechildpid);
ed9b544e 916 }
917 server.bgsaveinprogress = 0;
9f3c422c 918 server.bgsavechildpid = -1;
a3b21203 919 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
ed9b544e 920 }
921 } else {
922 /* If there is not a background saving in progress check if
923 * we have to save now */
924 time_t now = time(NULL);
925 for (j = 0; j < server.saveparamslen; j++) {
926 struct saveparam *sp = server.saveparams+j;
927
928 if (server.dirty >= sp->changes &&
929 now-server.lastsave > sp->seconds) {
930 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
931 sp->changes, sp->seconds);
f78fd11b 932 rdbSaveBackground(server.dbfilename);
ed9b544e 933 break;
934 }
935 }
936 }
94754ccc 937
938 /* Try to expire a few timed out keys */
939 for (j = 0; j < server.dbnum; j++) {
940 redisDb *db = server.db+j;
941 int num = dictSize(db->expires);
942
943 if (num) {
944 time_t now = time(NULL);
945
946 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
947 num = REDIS_EXPIRELOOKUPS_PER_CRON;
948 while (num--) {
949 dictEntry *de;
950 time_t t;
951
952 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
953 t = (time_t) dictGetEntryVal(de);
954 if (now > t) {
955 deleteKey(db,dictGetEntryKey(de));
956 }
957 }
958 }
959 }
960
ed9b544e 961 /* Check if we should connect to a MASTER */
962 if (server.replstate == REDIS_REPL_CONNECT) {
963 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
964 if (syncWithMaster() == REDIS_OK) {
965 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
966 }
967 }
968 return 1000;
969}
970
971static void createSharedObjects(void) {
972 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
973 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
974 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
c937aa89 975 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
976 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
977 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
978 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
979 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
980 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
ed9b544e 981 /* no such key */
ed9b544e 982 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
983 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
984 "-ERR Operation against a key holding the wrong kind of value\r\n"));
ed9b544e 985 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
986 "-ERR no such key\r\n"));
ed9b544e 987 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
988 "-ERR syntax error\r\n"));
c937aa89 989 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
990 "-ERR source and destination objects are the same\r\n"));
991 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
992 "-ERR index out of range\r\n"));
ed9b544e 993 shared.space = createObject(REDIS_STRING,sdsnew(" "));
c937aa89 994 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
995 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
ed9b544e 996 shared.select0 = createStringObject("select 0\r\n",10);
997 shared.select1 = createStringObject("select 1\r\n",10);
998 shared.select2 = createStringObject("select 2\r\n",10);
999 shared.select3 = createStringObject("select 3\r\n",10);
1000 shared.select4 = createStringObject("select 4\r\n",10);
1001 shared.select5 = createStringObject("select 5\r\n",10);
1002 shared.select6 = createStringObject("select 6\r\n",10);
1003 shared.select7 = createStringObject("select 7\r\n",10);
1004 shared.select8 = createStringObject("select 8\r\n",10);
1005 shared.select9 = createStringObject("select 9\r\n",10);
1006}
1007
1008static void appendServerSaveParams(time_t seconds, int changes) {
1009 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
ed9b544e 1010 server.saveparams[server.saveparamslen].seconds = seconds;
1011 server.saveparams[server.saveparamslen].changes = changes;
1012 server.saveparamslen++;
1013}
1014
1015static void ResetServerSaveParams() {
1016 zfree(server.saveparams);
1017 server.saveparams = NULL;
1018 server.saveparamslen = 0;
1019}
1020
1021static void initServerConfig() {
1022 server.dbnum = REDIS_DEFAULT_DBNUM;
1023 server.port = REDIS_SERVERPORT;
1024 server.verbosity = REDIS_DEBUG;
1025 server.maxidletime = REDIS_MAXIDLETIME;
1026 server.saveparams = NULL;
1027 server.logfile = NULL; /* NULL = log on standard output */
1028 server.bindaddr = NULL;
1029 server.glueoutputbuf = 1;
1030 server.daemonize = 0;
44b38ef4 1031 server.appendonly = 0;
1032 server.appendfd = -1;
1033 server.appendseldb = -1; /* Make sure the first time will not match */
ed329fcf 1034 server.pidfile = "/var/run/redis.pid";
ed9b544e 1035 server.dbfilename = "dump.rdb";
44b38ef4 1036 server.appendfilename = "appendonly.log";
abcb223e 1037 server.requirepass = NULL;
10c43610 1038 server.shareobjects = 0;
21aecf4b 1039 server.sharingpoolsize = 1024;
285add55 1040 server.maxclients = 0;
3fd78bcd 1041 server.maxmemory = 0;
ed9b544e 1042 ResetServerSaveParams();
1043
1044 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1045 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1046 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1047 /* Replication related */
1048 server.isslave = 0;
1049 server.masterhost = NULL;
1050 server.masterport = 6379;
1051 server.master = NULL;
1052 server.replstate = REDIS_REPL_NONE;
a7866db6 1053
1054 /* Double constants initialization */
1055 R_Zero = 0.0;
1056 R_PosInf = 1.0/R_Zero;
1057 R_NegInf = -1.0/R_Zero;
1058 R_Nan = R_Zero/R_Zero;
ed9b544e 1059}
1060
1061static void initServer() {
1062 int j;
1063
1064 signal(SIGHUP, SIG_IGN);
1065 signal(SIGPIPE, SIG_IGN);
fe3bbfbe 1066 setupSigSegvAction();
ed9b544e 1067
1068 server.clients = listCreate();
1069 server.slaves = listCreate();
87eca727 1070 server.monitors = listCreate();
ed9b544e 1071 server.objfreelist = listCreate();
1072 createSharedObjects();
1073 server.el = aeCreateEventLoop();
3305306f 1074 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
10c43610 1075 server.sharingpool = dictCreate(&setDictType,NULL);
ed9b544e 1076 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1077 if (server.fd == -1) {
1078 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1079 exit(1);
1080 }
3305306f 1081 for (j = 0; j < server.dbnum; j++) {
1082 server.db[j].dict = dictCreate(&hashDictType,NULL);
1083 server.db[j].expires = dictCreate(&setDictType,NULL);
1084 server.db[j].id = j;
1085 }
ed9b544e 1086 server.cronloops = 0;
1087 server.bgsaveinprogress = 0;
9f3c422c 1088 server.bgsavechildpid = -1;
ed9b544e 1089 server.lastsave = time(NULL);
1090 server.dirty = 0;
1091 server.usedmemory = 0;
1092 server.stat_numcommands = 0;
1093 server.stat_numconnections = 0;
1094 server.stat_starttime = time(NULL);
1095 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
44b38ef4 1096
1097 if (server.appendonly) {
1098 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT);
1099 if (server.appendfd == -1) {
1100 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1101 strerror(errno));
1102 exit(1);
1103 }
1104 }
ed9b544e 1105}
1106
1107/* Empty the whole database */
ca37e9cd 1108static long long emptyDb() {
ed9b544e 1109 int j;
ca37e9cd 1110 long long removed = 0;
ed9b544e 1111
3305306f 1112 for (j = 0; j < server.dbnum; j++) {
ca37e9cd 1113 removed += dictSize(server.db[j].dict);
3305306f 1114 dictEmpty(server.db[j].dict);
1115 dictEmpty(server.db[j].expires);
1116 }
ca37e9cd 1117 return removed;
ed9b544e 1118}
1119
85dd2f3a 1120static int yesnotoi(char *s) {
1121 if (!strcasecmp(s,"yes")) return 1;
1122 else if (!strcasecmp(s,"no")) return 0;
1123 else return -1;
1124}
1125
ed9b544e 1126/* I agree, this is a very rudimental way to load a configuration...
1127 will improve later if the config gets more complex */
1128static void loadServerConfig(char *filename) {
c9a111ac 1129 FILE *fp;
ed9b544e 1130 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1131 int linenum = 0;
1132 sds line = NULL;
c9a111ac 1133
1134 if (filename[0] == '-' && filename[1] == '\0')
1135 fp = stdin;
1136 else {
1137 if ((fp = fopen(filename,"r")) == NULL) {
1138 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1139 exit(1);
1140 }
ed9b544e 1141 }
c9a111ac 1142
ed9b544e 1143 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1144 sds *argv;
1145 int argc, j;
1146
1147 linenum++;
1148 line = sdsnew(buf);
1149 line = sdstrim(line," \t\r\n");
1150
1151 /* Skip comments and blank lines*/
1152 if (line[0] == '#' || line[0] == '\0') {
1153 sdsfree(line);
1154 continue;
1155 }
1156
1157 /* Split into arguments */
1158 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1159 sdstolower(argv[0]);
1160
1161 /* Execute config directives */
bb0b03a3 1162 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
ed9b544e 1163 server.maxidletime = atoi(argv[1]);
0150db36 1164 if (server.maxidletime < 0) {
ed9b544e 1165 err = "Invalid timeout value"; goto loaderr;
1166 }
bb0b03a3 1167 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
ed9b544e 1168 server.port = atoi(argv[1]);
1169 if (server.port < 1 || server.port > 65535) {
1170 err = "Invalid port"; goto loaderr;
1171 }
bb0b03a3 1172 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
ed9b544e 1173 server.bindaddr = zstrdup(argv[1]);
bb0b03a3 1174 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
ed9b544e 1175 int seconds = atoi(argv[1]);
1176 int changes = atoi(argv[2]);
1177 if (seconds < 1 || changes < 0) {
1178 err = "Invalid save parameters"; goto loaderr;
1179 }
1180 appendServerSaveParams(seconds,changes);
bb0b03a3 1181 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
ed9b544e 1182 if (chdir(argv[1]) == -1) {
1183 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1184 argv[1], strerror(errno));
1185 exit(1);
1186 }
bb0b03a3 1187 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1188 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1189 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1190 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
ed9b544e 1191 else {
1192 err = "Invalid log level. Must be one of debug, notice, warning";
1193 goto loaderr;
1194 }
bb0b03a3 1195 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
c9a111ac 1196 FILE *logfp;
ed9b544e 1197
1198 server.logfile = zstrdup(argv[1]);
bb0b03a3 1199 if (!strcasecmp(server.logfile,"stdout")) {
ed9b544e 1200 zfree(server.logfile);
1201 server.logfile = NULL;
1202 }
1203 if (server.logfile) {
1204 /* Test if we are able to open the file. The server will not
1205 * be able to abort just for this problem later... */
c9a111ac 1206 logfp = fopen(server.logfile,"a");
1207 if (logfp == NULL) {
ed9b544e 1208 err = sdscatprintf(sdsempty(),
1209 "Can't open the log file: %s", strerror(errno));
1210 goto loaderr;
1211 }
c9a111ac 1212 fclose(logfp);
ed9b544e 1213 }
bb0b03a3 1214 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
ed9b544e 1215 server.dbnum = atoi(argv[1]);
1216 if (server.dbnum < 1) {
1217 err = "Invalid number of databases"; goto loaderr;
1218 }
285add55 1219 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1220 server.maxclients = atoi(argv[1]);
3fd78bcd 1221 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
d4465900 1222 server.maxmemory = strtoll(argv[1], NULL, 10);
bb0b03a3 1223 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
ed9b544e 1224 server.masterhost = sdsnew(argv[1]);
1225 server.masterport = atoi(argv[2]);
1226 server.replstate = REDIS_REPL_CONNECT;
bb0b03a3 1227 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
85dd2f3a 1228 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
ed9b544e 1229 err = "argument must be 'yes' or 'no'"; goto loaderr;
1230 }
bb0b03a3 1231 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
85dd2f3a 1232 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
10c43610 1233 err = "argument must be 'yes' or 'no'"; goto loaderr;
1234 }
e52c65b9 1235 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1236 server.sharingpoolsize = atoi(argv[1]);
1237 if (server.sharingpoolsize < 1) {
1238 err = "invalid object sharing pool size"; goto loaderr;
1239 }
bb0b03a3 1240 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
85dd2f3a 1241 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
ed9b544e 1242 err = "argument must be 'yes' or 'no'"; goto loaderr;
1243 }
44b38ef4 1244 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1245 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1246 err = "argument must be 'yes' or 'no'"; goto loaderr;
1247 }
bb0b03a3 1248 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
abcb223e 1249 server.requirepass = zstrdup(argv[1]);
bb0b03a3 1250 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
ed329fcf 1251 server.pidfile = zstrdup(argv[1]);
bb0b03a3 1252 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
b8b553c8 1253 server.dbfilename = zstrdup(argv[1]);
ed9b544e 1254 } else {
1255 err = "Bad directive or wrong number of arguments"; goto loaderr;
1256 }
1257 for (j = 0; j < argc; j++)
1258 sdsfree(argv[j]);
1259 zfree(argv);
1260 sdsfree(line);
1261 }
c9a111ac 1262 if (fp != stdin) fclose(fp);
ed9b544e 1263 return;
1264
1265loaderr:
1266 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1267 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1268 fprintf(stderr, ">>> '%s'\n", line);
1269 fprintf(stderr, "%s\n", err);
1270 exit(1);
1271}
1272
1273static void freeClientArgv(redisClient *c) {
1274 int j;
1275
1276 for (j = 0; j < c->argc; j++)
1277 decrRefCount(c->argv[j]);
e8a74421 1278 for (j = 0; j < c->mbargc; j++)
1279 decrRefCount(c->mbargv[j]);
ed9b544e 1280 c->argc = 0;
e8a74421 1281 c->mbargc = 0;
ed9b544e 1282}
1283
1284static void freeClient(redisClient *c) {
1285 listNode *ln;
1286
1287 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1288 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1289 sdsfree(c->querybuf);
1290 listRelease(c->reply);
1291 freeClientArgv(c);
1292 close(c->fd);
1293 ln = listSearchKey(server.clients,c);
1294 assert(ln != NULL);
1295 listDelNode(server.clients,ln);
1296 if (c->flags & REDIS_SLAVE) {
6208b3a7 1297 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1298 close(c->repldbfd);
87eca727 1299 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1300 ln = listSearchKey(l,c);
ed9b544e 1301 assert(ln != NULL);
87eca727 1302 listDelNode(l,ln);
ed9b544e 1303 }
1304 if (c->flags & REDIS_MASTER) {
1305 server.master = NULL;
1306 server.replstate = REDIS_REPL_CONNECT;
1307 }
93ea3759 1308 zfree(c->argv);
e8a74421 1309 zfree(c->mbargv);
ed9b544e 1310 zfree(c);
1311}
1312
1313static void glueReplyBuffersIfNeeded(redisClient *c) {
1314 int totlen = 0;
6208b3a7 1315 listNode *ln;
ed9b544e 1316 robj *o;
1317
6208b3a7 1318 listRewind(c->reply);
1319 while((ln = listYield(c->reply))) {
ed9b544e 1320 o = ln->value;
1321 totlen += sdslen(o->ptr);
ed9b544e 1322 /* This optimization makes more sense if we don't have to copy
1323 * too much data */
1324 if (totlen > 1024) return;
1325 }
1326 if (totlen > 0) {
1327 char buf[1024];
1328 int copylen = 0;
1329
6208b3a7 1330 listRewind(c->reply);
1331 while((ln = listYield(c->reply))) {
ed9b544e 1332 o = ln->value;
1333 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1334 copylen += sdslen(o->ptr);
1335 listDelNode(c->reply,ln);
ed9b544e 1336 }
1337 /* Now the output buffer is empty, add the new single element */
6fdc78ac 1338 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
6b47e12e 1339 listAddNodeTail(c->reply,o);
ed9b544e 1340 }
1341}
1342
1343static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1344 redisClient *c = privdata;
1345 int nwritten = 0, totwritten = 0, objlen;
1346 robj *o;
1347 REDIS_NOTUSED(el);
1348 REDIS_NOTUSED(mask);
1349
1350 if (server.glueoutputbuf && listLength(c->reply) > 1)
1351 glueReplyBuffersIfNeeded(c);
1352 while(listLength(c->reply)) {
1353 o = listNodeValue(listFirst(c->reply));
1354 objlen = sdslen(o->ptr);
1355
1356 if (objlen == 0) {
1357 listDelNode(c->reply,listFirst(c->reply));
1358 continue;
1359 }
1360
1361 if (c->flags & REDIS_MASTER) {
6f376729 1362 /* Don't reply to a master */
ed9b544e 1363 nwritten = objlen - c->sentlen;
1364 } else {
a4d1ba9a 1365 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
ed9b544e 1366 if (nwritten <= 0) break;
1367 }
1368 c->sentlen += nwritten;
1369 totwritten += nwritten;
1370 /* If we fully sent the object on head go to the next one */
1371 if (c->sentlen == objlen) {
1372 listDelNode(c->reply,listFirst(c->reply));
1373 c->sentlen = 0;
1374 }
6f376729 1375 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1376 * bytes, in a single threaded server it's a good idea to server
1377 * other clients as well, even if a very large request comes from
1378 * super fast link that is always able to accept data (in real world
1379 * terms think to 'KEYS *' against the loopback interfae) */
1380 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
ed9b544e 1381 }
1382 if (nwritten == -1) {
1383 if (errno == EAGAIN) {
1384 nwritten = 0;
1385 } else {
1386 redisLog(REDIS_DEBUG,
1387 "Error writing to client: %s", strerror(errno));
1388 freeClient(c);
1389 return;
1390 }
1391 }
1392 if (totwritten > 0) c->lastinteraction = time(NULL);
1393 if (listLength(c->reply) == 0) {
1394 c->sentlen = 0;
1395 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1396 }
1397}
1398
1399static struct redisCommand *lookupCommand(char *name) {
1400 int j = 0;
1401 while(cmdTable[j].name != NULL) {
bb0b03a3 1402 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
ed9b544e 1403 j++;
1404 }
1405 return NULL;
1406}
1407
1408/* resetClient prepare the client to process the next command */
1409static void resetClient(redisClient *c) {
1410 freeClientArgv(c);
1411 c->bulklen = -1;
e8a74421 1412 c->multibulk = 0;
ed9b544e 1413}
1414
1415/* If this function gets called we already read a whole
1416 * command, argments are in the client argv/argc fields.
1417 * processCommand() execute the command or prepare the
1418 * server for a bulk read from the client.
1419 *
1420 * If 1 is returned the client is still alive and valid and
1421 * and other operations can be performed by the caller. Otherwise
1422 * if 0 is returned the client was destroied (i.e. after QUIT). */
1423static int processCommand(redisClient *c) {
1424 struct redisCommand *cmd;
1425 long long dirty;
1426
3fd78bcd 1427 /* Free some memory if needed (maxmemory setting) */
1428 if (server.maxmemory) freeMemoryIfNeeded();
1429
e8a74421 1430 /* Handle the multi bulk command type. This is an alternative protocol
1431 * supported by Redis in order to receive commands that are composed of
1432 * multiple binary-safe "bulk" arguments. The latency of processing is
1433 * a bit higher but this allows things like multi-sets, so if this
1434 * protocol is used only for MSET and similar commands this is a big win. */
1435 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1436 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1437 if (c->multibulk <= 0) {
1438 resetClient(c);
1439 return 1;
1440 } else {
1441 decrRefCount(c->argv[c->argc-1]);
1442 c->argc--;
1443 return 1;
1444 }
1445 } else if (c->multibulk) {
1446 if (c->bulklen == -1) {
1447 if (((char*)c->argv[0]->ptr)[0] != '$') {
1448 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1449 resetClient(c);
1450 return 1;
1451 } else {
1452 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1453 decrRefCount(c->argv[0]);
1454 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1455 c->argc--;
1456 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1457 resetClient(c);
1458 return 1;
1459 }
1460 c->argc--;
1461 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1462 return 1;
1463 }
1464 } else {
1465 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1466 c->mbargv[c->mbargc] = c->argv[0];
1467 c->mbargc++;
1468 c->argc--;
1469 c->multibulk--;
1470 if (c->multibulk == 0) {
1471 robj **auxargv;
1472 int auxargc;
1473
1474 /* Here we need to swap the multi-bulk argc/argv with the
1475 * normal argc/argv of the client structure. */
1476 auxargv = c->argv;
1477 c->argv = c->mbargv;
1478 c->mbargv = auxargv;
1479
1480 auxargc = c->argc;
1481 c->argc = c->mbargc;
1482 c->mbargc = auxargc;
1483
1484 /* We need to set bulklen to something different than -1
1485 * in order for the code below to process the command without
1486 * to try to read the last argument of a bulk command as
1487 * a special argument. */
1488 c->bulklen = 0;
1489 /* continue below and process the command */
1490 } else {
1491 c->bulklen = -1;
1492 return 1;
1493 }
1494 }
1495 }
1496 /* -- end of multi bulk commands processing -- */
1497
ed9b544e 1498 /* The QUIT command is handled as a special case. Normal command
1499 * procs are unable to close the client connection safely */
bb0b03a3 1500 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
ed9b544e 1501 freeClient(c);
1502 return 0;
1503 }
1504 cmd = lookupCommand(c->argv[0]->ptr);
1505 if (!cmd) {
1506 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1507 resetClient(c);
1508 return 1;
1509 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1510 (c->argc < -cmd->arity)) {
1511 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1512 resetClient(c);
1513 return 1;
3fd78bcd 1514 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1515 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1516 resetClient(c);
1517 return 1;
ed9b544e 1518 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1519 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1520
1521 decrRefCount(c->argv[c->argc-1]);
1522 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1523 c->argc--;
1524 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1525 resetClient(c);
1526 return 1;
1527 }
1528 c->argc--;
1529 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1530 /* It is possible that the bulk read is already in the
8d0490e7 1531 * buffer. Check this condition and handle it accordingly.
1532 * This is just a fast path, alternative to call processInputBuffer().
1533 * It's a good idea since the code is small and this condition
1534 * happens most of the times. */
ed9b544e 1535 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1536 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1537 c->argc++;
1538 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1539 } else {
1540 return 1;
1541 }
1542 }
10c43610 1543 /* Let's try to share objects on the command arguments vector */
1544 if (server.shareobjects) {
1545 int j;
1546 for(j = 1; j < c->argc; j++)
1547 c->argv[j] = tryObjectSharing(c->argv[j]);
1548 }
942a3961 1549 /* Let's try to encode the bulk object to save space. */
1550 if (cmd->flags & REDIS_CMD_BULK)
1551 tryObjectEncoding(c->argv[c->argc-1]);
1552
e63943a4 1553 /* Check if the user is authenticated */
1554 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1555 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1556 resetClient(c);
1557 return 1;
1558 }
1559
ed9b544e 1560 /* Exec the command */
1561 dirty = server.dirty;
1562 cmd->proc(c);
44b38ef4 1563 if (server.appendonly != 0)
1564 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
ed9b544e 1565 if (server.dirty-dirty != 0 && listLength(server.slaves))
3305306f 1566 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
87eca727 1567 if (listLength(server.monitors))
3305306f 1568 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
ed9b544e 1569 server.stat_numcommands++;
1570
1571 /* Prepare the client for the next command */
1572 if (c->flags & REDIS_CLOSE) {
1573 freeClient(c);
1574 return 0;
1575 }
1576 resetClient(c);
1577 return 1;
1578}
1579
87eca727 1580static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6208b3a7 1581 listNode *ln;
ed9b544e 1582 int outc = 0, j;
93ea3759 1583 robj **outv;
1584 /* (args*2)+1 is enough room for args, spaces, newlines */
1585 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1586
1587 if (argc <= REDIS_STATIC_ARGS) {
1588 outv = static_outv;
1589 } else {
1590 outv = zmalloc(sizeof(robj*)*(argc*2+1));
93ea3759 1591 }
ed9b544e 1592
1593 for (j = 0; j < argc; j++) {
1594 if (j != 0) outv[outc++] = shared.space;
1595 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1596 robj *lenobj;
1597
1598 lenobj = createObject(REDIS_STRING,
0ea663ea 1599 sdscatprintf(sdsempty(),"%d\r\n",
1600 stringObjectLen(argv[j])));
ed9b544e 1601 lenobj->refcount = 0;
1602 outv[outc++] = lenobj;
1603 }
1604 outv[outc++] = argv[j];
1605 }
1606 outv[outc++] = shared.crlf;
1607
40d224a9 1608 /* Increment all the refcounts at start and decrement at end in order to
1609 * be sure to free objects if there is no slave in a replication state
1610 * able to be feed with commands */
1611 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
6208b3a7 1612 listRewind(slaves);
1613 while((ln = listYield(slaves))) {
ed9b544e 1614 redisClient *slave = ln->value;
40d224a9 1615
1616 /* Don't feed slaves that are still waiting for BGSAVE to start */
6208b3a7 1617 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
40d224a9 1618
1619 /* Feed all the other slaves, MONITORs and so on */
ed9b544e 1620 if (slave->slaveseldb != dictid) {
1621 robj *selectcmd;
1622
1623 switch(dictid) {
1624 case 0: selectcmd = shared.select0; break;
1625 case 1: selectcmd = shared.select1; break;
1626 case 2: selectcmd = shared.select2; break;
1627 case 3: selectcmd = shared.select3; break;
1628 case 4: selectcmd = shared.select4; break;
1629 case 5: selectcmd = shared.select5; break;
1630 case 6: selectcmd = shared.select6; break;
1631 case 7: selectcmd = shared.select7; break;
1632 case 8: selectcmd = shared.select8; break;
1633 case 9: selectcmd = shared.select9; break;
1634 default:
1635 selectcmd = createObject(REDIS_STRING,
1636 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1637 selectcmd->refcount = 0;
1638 break;
1639 }
1640 addReply(slave,selectcmd);
1641 slave->slaveseldb = dictid;
1642 }
1643 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
ed9b544e 1644 }
40d224a9 1645 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
93ea3759 1646 if (outv != static_outv) zfree(outv);
ed9b544e 1647}
1648
44b38ef4 1649/* TODO: translate EXPIREs into EXPIRETOs */
1650static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1651 sds buf = sdsempty();
1652 int j;
1653 ssize_t nwritten;
1654
1655 /* The DB this command was targetting is not the same as the last command
1656 * we appendend. To issue a SELECT command is needed. */
1657 if (dictid != server.appendseldb) {
1658 char seldb[64];
1659
1660 snprintf(seldb,sizeof(seldb),"%d",dictid);
1661 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1662 strlen(seldb),seldb);
16f92547 1663 server.appendseldb = dictid;
44b38ef4 1664 }
1665 /* Append the actual command */
1666 buf = sdscatprintf(buf,"*%d\r\n",argc);
1667 for (j = 0; j < argc; j++) {
1668 robj *o = argv[j];
1669
1670 if (o->encoding != REDIS_ENCODING_RAW)
1671 o = getDecodedObject(o);
1672 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
1673 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
1674 buf = sdscatlen(buf,"\r\n",2);
1675 if (o != argv[j])
1676 decrRefCount(o);
1677 }
1678 /* We want to perform a single write. This should be guaranteed atomic
1679 * at least if the filesystem we are writing is a real physical one.
1680 * While this will save us against the server being killed I don't think
1681 * there is much to do about the whole server stopping for power problems
1682 * or alike */
1683 nwritten = write(server.appendfd,buf,sdslen(buf));
1684 if (nwritten != (unsigned)sdslen(buf)) {
1685 /* Ooops, we are in troubles. The best thing to do for now is
1686 * to simply exit instead to give the illusion that everything is
1687 * working as expected. */
1688 if (nwritten == -1) {
1689 redisLog(REDIS_WARNING,"Aborting on error writing to the append-only file: %s",strerror(errno));
1690 } else {
1691 redisLog(REDIS_WARNING,"Aborting on short write while writing to the append-only file: %s",strerror(errno));
1692 }
1693 abort();
1694 }
1695 fsync(server.appendfd); /* Let's try to get this data on the disk */
1696}
1697
638e42ac 1698static void processInputBuffer(redisClient *c) {
ed9b544e 1699again:
1700 if (c->bulklen == -1) {
1701 /* Read the first line of the query */
1702 char *p = strchr(c->querybuf,'\n');
1703 size_t querylen;
644fafa3 1704
ed9b544e 1705 if (p) {
1706 sds query, *argv;
1707 int argc, j;
1708
1709 query = c->querybuf;
1710 c->querybuf = sdsempty();
1711 querylen = 1+(p-(query));
1712 if (sdslen(query) > querylen) {
1713 /* leave data after the first line of the query in the buffer */
1714 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1715 }
1716 *p = '\0'; /* remove "\n" */
1717 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1718 sdsupdatelen(query);
1719
1720 /* Now we can split the query in arguments */
1721 if (sdslen(query) == 0) {
1722 /* Ignore empty query */
1723 sdsfree(query);
1724 return;
1725 }
1726 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
93ea3759 1727 sdsfree(query);
1728
1729 if (c->argv) zfree(c->argv);
1730 c->argv = zmalloc(sizeof(robj*)*argc);
93ea3759 1731
1732 for (j = 0; j < argc; j++) {
ed9b544e 1733 if (sdslen(argv[j])) {
1734 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1735 c->argc++;
1736 } else {
1737 sdsfree(argv[j]);
1738 }
1739 }
1740 zfree(argv);
1741 /* Execute the command. If the client is still valid
1742 * after processCommand() return and there is something
1743 * on the query buffer try to process the next command. */
af807d87 1744 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
ed9b544e 1745 return;
644fafa3 1746 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
ed9b544e 1747 redisLog(REDIS_DEBUG, "Client protocol error");
1748 freeClient(c);
1749 return;
1750 }
1751 } else {
1752 /* Bulk read handling. Note that if we are at this point
1753 the client already sent a command terminated with a newline,
1754 we are reading the bulk data that is actually the last
1755 argument of the command. */
1756 int qbl = sdslen(c->querybuf);
1757
1758 if (c->bulklen <= qbl) {
1759 /* Copy everything but the final CRLF as final argument */
1760 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1761 c->argc++;
1762 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
638e42ac 1763 /* Process the command. If the client is still valid after
1764 * the processing and there is more data in the buffer
1765 * try to parse it. */
1766 if (processCommand(c) && sdslen(c->querybuf)) goto again;
ed9b544e 1767 return;
1768 }
1769 }
1770}
1771
638e42ac 1772static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1773 redisClient *c = (redisClient*) privdata;
1774 char buf[REDIS_IOBUF_LEN];
1775 int nread;
1776 REDIS_NOTUSED(el);
1777 REDIS_NOTUSED(mask);
1778
1779 nread = read(fd, buf, REDIS_IOBUF_LEN);
1780 if (nread == -1) {
1781 if (errno == EAGAIN) {
1782 nread = 0;
1783 } else {
1784 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1785 freeClient(c);
1786 return;
1787 }
1788 } else if (nread == 0) {
1789 redisLog(REDIS_DEBUG, "Client closed connection");
1790 freeClient(c);
1791 return;
1792 }
1793 if (nread) {
1794 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1795 c->lastinteraction = time(NULL);
1796 } else {
1797 return;
1798 }
1799 processInputBuffer(c);
1800}
1801
ed9b544e 1802static int selectDb(redisClient *c, int id) {
1803 if (id < 0 || id >= server.dbnum)
1804 return REDIS_ERR;
3305306f 1805 c->db = &server.db[id];
ed9b544e 1806 return REDIS_OK;
1807}
1808
40d224a9 1809static void *dupClientReplyValue(void *o) {
1810 incrRefCount((robj*)o);
1811 return 0;
1812}
1813
ed9b544e 1814static redisClient *createClient(int fd) {
1815 redisClient *c = zmalloc(sizeof(*c));
1816
1817 anetNonBlock(NULL,fd);
1818 anetTcpNoDelay(NULL,fd);
1819 if (!c) return NULL;
1820 selectDb(c,0);
1821 c->fd = fd;
1822 c->querybuf = sdsempty();
1823 c->argc = 0;
93ea3759 1824 c->argv = NULL;
ed9b544e 1825 c->bulklen = -1;
e8a74421 1826 c->multibulk = 0;
1827 c->mbargc = 0;
1828 c->mbargv = NULL;
ed9b544e 1829 c->sentlen = 0;
1830 c->flags = 0;
1831 c->lastinteraction = time(NULL);
abcb223e 1832 c->authenticated = 0;
40d224a9 1833 c->replstate = REDIS_REPL_NONE;
6b47e12e 1834 c->reply = listCreate();
ed9b544e 1835 listSetFreeMethod(c->reply,decrRefCount);
40d224a9 1836 listSetDupMethod(c->reply,dupClientReplyValue);
ed9b544e 1837 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1838 readQueryFromClient, c, NULL) == AE_ERR) {
1839 freeClient(c);
1840 return NULL;
1841 }
6b47e12e 1842 listAddNodeTail(server.clients,c);
ed9b544e 1843 return c;
1844}
1845
1846static void addReply(redisClient *c, robj *obj) {
1847 if (listLength(c->reply) == 0 &&
6208b3a7 1848 (c->replstate == REDIS_REPL_NONE ||
1849 c->replstate == REDIS_REPL_ONLINE) &&
ed9b544e 1850 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1851 sendReplyToClient, c, NULL) == AE_ERR) return;
942a3961 1852 if (obj->encoding != REDIS_ENCODING_RAW) {
1853 obj = getDecodedObject(obj);
1854 } else {
1855 incrRefCount(obj);
1856 }
6b47e12e 1857 listAddNodeTail(c->reply,obj);
ed9b544e 1858}
1859
1860static void addReplySds(redisClient *c, sds s) {
1861 robj *o = createObject(REDIS_STRING,s);
1862 addReply(c,o);
1863 decrRefCount(o);
1864}
1865
942a3961 1866static void addReplyBulkLen(redisClient *c, robj *obj) {
1867 size_t len;
1868
1869 if (obj->encoding == REDIS_ENCODING_RAW) {
1870 len = sdslen(obj->ptr);
1871 } else {
1872 long n = (long)obj->ptr;
1873
1874 len = 1;
1875 if (n < 0) {
1876 len++;
1877 n = -n;
1878 }
1879 while((n = n/10) != 0) {
1880 len++;
1881 }
1882 }
1883 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1884}
1885
ed9b544e 1886static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1887 int cport, cfd;
1888 char cip[128];
285add55 1889 redisClient *c;
ed9b544e 1890 REDIS_NOTUSED(el);
1891 REDIS_NOTUSED(mask);
1892 REDIS_NOTUSED(privdata);
1893
1894 cfd = anetAccept(server.neterr, fd, cip, &cport);
1895 if (cfd == AE_ERR) {
1896 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1897 return;
1898 }
1899 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
285add55 1900 if ((c = createClient(cfd)) == NULL) {
ed9b544e 1901 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1902 close(cfd); /* May be already closed, just ingore errors */
1903 return;
1904 }
285add55 1905 /* If maxclient directive is set and this is one client more... close the
1906 * connection. Note that we create the client instead to check before
1907 * for this condition, since now the socket is already set in nonblocking
1908 * mode and we can send an error for free using the Kernel I/O */
1909 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1910 char *err = "-ERR max number of clients reached\r\n";
1911
1912 /* That's a best effort error message, don't check write errors */
d7fc9edb 1913 (void) write(c->fd,err,strlen(err));
285add55 1914 freeClient(c);
1915 return;
1916 }
ed9b544e 1917 server.stat_numconnections++;
1918}
1919
1920/* ======================= Redis objects implementation ===================== */
1921
1922static robj *createObject(int type, void *ptr) {
1923 robj *o;
1924
1925 if (listLength(server.objfreelist)) {
1926 listNode *head = listFirst(server.objfreelist);
1927 o = listNodeValue(head);
1928 listDelNode(server.objfreelist,head);
1929 } else {
1930 o = zmalloc(sizeof(*o));
1931 }
ed9b544e 1932 o->type = type;
942a3961 1933 o->encoding = REDIS_ENCODING_RAW;
ed9b544e 1934 o->ptr = ptr;
1935 o->refcount = 1;
1936 return o;
1937}
1938
1939static robj *createStringObject(char *ptr, size_t len) {
1940 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1941}
1942
1943static robj *createListObject(void) {
1944 list *l = listCreate();
1945
ed9b544e 1946 listSetFreeMethod(l,decrRefCount);
1947 return createObject(REDIS_LIST,l);
1948}
1949
1950static robj *createSetObject(void) {
1951 dict *d = dictCreate(&setDictType,NULL);
ed9b544e 1952 return createObject(REDIS_SET,d);
1953}
1954
1812e024 1955static robj *createZsetObject(void) {
6b47e12e 1956 zset *zs = zmalloc(sizeof(*zs));
1957
1958 zs->dict = dictCreate(&zsetDictType,NULL);
1959 zs->zsl = zslCreate();
1960 return createObject(REDIS_ZSET,zs);
1812e024 1961}
1962
ed9b544e 1963static void freeStringObject(robj *o) {
942a3961 1964 if (o->encoding == REDIS_ENCODING_RAW) {
1965 sdsfree(o->ptr);
1966 }
ed9b544e 1967}
1968
1969static void freeListObject(robj *o) {
1970 listRelease((list*) o->ptr);
1971}
1972
1973static void freeSetObject(robj *o) {
1974 dictRelease((dict*) o->ptr);
1975}
1976
fd8ccf44 1977static void freeZsetObject(robj *o) {
1978 zset *zs = o->ptr;
1979
1980 dictRelease(zs->dict);
1981 zslFree(zs->zsl);
1982 zfree(zs);
1983}
1984
ed9b544e 1985static void freeHashObject(robj *o) {
1986 dictRelease((dict*) o->ptr);
1987}
1988
1989static void incrRefCount(robj *o) {
1990 o->refcount++;
94754ccc 1991#ifdef DEBUG_REFCOUNT
1992 if (o->type == REDIS_STRING)
1993 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1994#endif
ed9b544e 1995}
1996
1997static void decrRefCount(void *obj) {
1998 robj *o = obj;
94754ccc 1999
2000#ifdef DEBUG_REFCOUNT
2001 if (o->type == REDIS_STRING)
2002 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2003#endif
ed9b544e 2004 if (--(o->refcount) == 0) {
2005 switch(o->type) {
2006 case REDIS_STRING: freeStringObject(o); break;
2007 case REDIS_LIST: freeListObject(o); break;
2008 case REDIS_SET: freeSetObject(o); break;
fd8ccf44 2009 case REDIS_ZSET: freeZsetObject(o); break;
ed9b544e 2010 case REDIS_HASH: freeHashObject(o); break;
2011 default: assert(0 != 0); break;
2012 }
2013 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2014 !listAddNodeHead(server.objfreelist,o))
2015 zfree(o);
2016 }
2017}
2018
942a3961 2019static robj *lookupKey(redisDb *db, robj *key) {
2020 dictEntry *de = dictFind(db->dict,key);
2021 return de ? dictGetEntryVal(de) : NULL;
2022}
2023
2024static robj *lookupKeyRead(redisDb *db, robj *key) {
2025 expireIfNeeded(db,key);
2026 return lookupKey(db,key);
2027}
2028
2029static robj *lookupKeyWrite(redisDb *db, robj *key) {
2030 deleteIfVolatile(db,key);
2031 return lookupKey(db,key);
2032}
2033
2034static int deleteKey(redisDb *db, robj *key) {
2035 int retval;
2036
2037 /* We need to protect key from destruction: after the first dictDelete()
2038 * it may happen that 'key' is no longer valid if we don't increment
2039 * it's count. This may happen when we get the object reference directly
2040 * from the hash table with dictRandomKey() or dict iterators */
2041 incrRefCount(key);
2042 if (dictSize(db->expires)) dictDelete(db->expires,key);
2043 retval = dictDelete(db->dict,key);
2044 decrRefCount(key);
2045
2046 return retval == DICT_OK;
2047}
2048
10c43610 2049/* Try to share an object against the shared objects pool */
2050static robj *tryObjectSharing(robj *o) {
2051 struct dictEntry *de;
2052 unsigned long c;
2053
3305306f 2054 if (o == NULL || server.shareobjects == 0) return o;
10c43610 2055
2056 assert(o->type == REDIS_STRING);
2057 de = dictFind(server.sharingpool,o);
2058 if (de) {
2059 robj *shared = dictGetEntryKey(de);
2060
2061 c = ((unsigned long) dictGetEntryVal(de))+1;
2062 dictGetEntryVal(de) = (void*) c;
2063 incrRefCount(shared);
2064 decrRefCount(o);
2065 return shared;
2066 } else {
2067 /* Here we are using a stream algorihtm: Every time an object is
2068 * shared we increment its count, everytime there is a miss we
2069 * recrement the counter of a random object. If this object reaches
2070 * zero we remove the object and put the current object instead. */
3305306f 2071 if (dictSize(server.sharingpool) >=
10c43610 2072 server.sharingpoolsize) {
2073 de = dictGetRandomKey(server.sharingpool);
2074 assert(de != NULL);
2075 c = ((unsigned long) dictGetEntryVal(de))-1;
2076 dictGetEntryVal(de) = (void*) c;
2077 if (c == 0) {
2078 dictDelete(server.sharingpool,de->key);
2079 }
2080 } else {
2081 c = 0; /* If the pool is empty we want to add this object */
2082 }
2083 if (c == 0) {
2084 int retval;
2085
2086 retval = dictAdd(server.sharingpool,o,(void*)1);
2087 assert(retval == DICT_OK);
2088 incrRefCount(o);
2089 }
2090 return o;
2091 }
2092}
2093
724a51b1 2094/* Check if the nul-terminated string 's' can be represented by a long
2095 * (that is, is a number that fits into long without any other space or
2096 * character before or after the digits).
2097 *
2098 * If so, the function returns REDIS_OK and *longval is set to the value
2099 * of the number. Otherwise REDIS_ERR is returned */
f69f2cba 2100static int isStringRepresentableAsLong(sds s, long *longval) {
724a51b1 2101 char buf[32], *endptr;
2102 long value;
2103 int slen;
2104
2105 value = strtol(s, &endptr, 10);
2106 if (endptr[0] != '\0') return REDIS_ERR;
2107 slen = snprintf(buf,32,"%ld",value);
2108
2109 /* If the number converted back into a string is not identical
2110 * then it's not possible to encode the string as integer */
f69f2cba 2111 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
724a51b1 2112 if (longval) *longval = value;
2113 return REDIS_OK;
2114}
2115
942a3961 2116/* Try to encode a string object in order to save space */
2117static int tryObjectEncoding(robj *o) {
2118 long value;
942a3961 2119 sds s = o->ptr;
3305306f 2120
942a3961 2121 if (o->encoding != REDIS_ENCODING_RAW)
2122 return REDIS_ERR; /* Already encoded */
3305306f 2123
942a3961 2124 /* It's not save to encode shared objects: shared objects can be shared
2125 * everywhere in the "object space" of Redis. Encoded objects can only
2126 * appear as "values" (and not, for instance, as keys) */
2127 if (o->refcount > 1) return REDIS_ERR;
3305306f 2128
942a3961 2129 /* Currently we try to encode only strings */
2130 assert(o->type == REDIS_STRING);
94754ccc 2131
724a51b1 2132 /* Check if we can represent this string as a long integer */
2133 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
942a3961 2134
2135 /* Ok, this object can be encoded */
2136 o->encoding = REDIS_ENCODING_INT;
2137 sdsfree(o->ptr);
2138 o->ptr = (void*) value;
2139 return REDIS_OK;
2140}
2141
2142/* Get a decoded version of an encoded object (returned as a new object) */
2143static robj *getDecodedObject(const robj *o) {
2144 robj *dec;
2145
2146 assert(o->encoding != REDIS_ENCODING_RAW);
2147 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2148 char buf[32];
2149
2150 snprintf(buf,32,"%ld",(long)o->ptr);
2151 dec = createStringObject(buf,strlen(buf));
2152 return dec;
2153 } else {
2154 assert(1 != 1);
2155 }
3305306f 2156}
2157
d7f43c08 2158/* Compare two string objects via strcmp() or alike.
2159 * Note that the objects may be integer-encoded. In such a case we
2160 * use snprintf() to get a string representation of the numbers on the stack
2161 * and compare the strings, it's much faster than calling getDecodedObject(). */
724a51b1 2162static int compareStringObjects(robj *a, robj *b) {
2163 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
d7f43c08 2164 char bufa[128], bufb[128], *astr, *bstr;
2165 int bothsds = 1;
724a51b1 2166
e197b441 2167 if (a == b) return 0;
d7f43c08 2168 if (a->encoding != REDIS_ENCODING_RAW) {
2169 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2170 astr = bufa;
2171 bothsds = 0;
724a51b1 2172 } else {
d7f43c08 2173 astr = a->ptr;
724a51b1 2174 }
d7f43c08 2175 if (b->encoding != REDIS_ENCODING_RAW) {
2176 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2177 bstr = bufb;
2178 bothsds = 0;
2179 } else {
2180 bstr = b->ptr;
2181 }
2182 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
724a51b1 2183}
2184
0ea663ea 2185static size_t stringObjectLen(robj *o) {
2186 assert(o->type == REDIS_STRING);
2187 if (o->encoding == REDIS_ENCODING_RAW) {
2188 return sdslen(o->ptr);
2189 } else {
2190 char buf[32];
2191
2192 return snprintf(buf,32,"%ld",(long)o->ptr);
2193 }
2194}
2195
ed9b544e 2196/*============================ DB saving/loading ============================ */
2197
f78fd11b 2198static int rdbSaveType(FILE *fp, unsigned char type) {
2199 if (fwrite(&type,1,1,fp) == 0) return -1;
2200 return 0;
2201}
2202
bb32ede5 2203static int rdbSaveTime(FILE *fp, time_t t) {
2204 int32_t t32 = (int32_t) t;
2205 if (fwrite(&t32,4,1,fp) == 0) return -1;
2206 return 0;
2207}
2208
e3566d4b 2209/* check rdbLoadLen() comments for more info */
f78fd11b 2210static int rdbSaveLen(FILE *fp, uint32_t len) {
2211 unsigned char buf[2];
2212
2213 if (len < (1<<6)) {
2214 /* Save a 6 bit len */
10c43610 2215 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
f78fd11b 2216 if (fwrite(buf,1,1,fp) == 0) return -1;
2217 } else if (len < (1<<14)) {
2218 /* Save a 14 bit len */
10c43610 2219 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
f78fd11b 2220 buf[1] = len&0xFF;
17be1a4a 2221 if (fwrite(buf,2,1,fp) == 0) return -1;
f78fd11b 2222 } else {
2223 /* Save a 32 bit len */
10c43610 2224 buf[0] = (REDIS_RDB_32BITLEN<<6);
f78fd11b 2225 if (fwrite(buf,1,1,fp) == 0) return -1;
2226 len = htonl(len);
2227 if (fwrite(&len,4,1,fp) == 0) return -1;
2228 }
2229 return 0;
2230}
2231
e3566d4b 2232/* String objects in the form "2391" "-100" without any space and with a
2233 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2234 * encoded as integers to save space */
56906eef 2235static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
e3566d4b 2236 long long value;
2237 char *endptr, buf[32];
2238
2239 /* Check if it's possible to encode this value as a number */
2240 value = strtoll(s, &endptr, 10);
2241 if (endptr[0] != '\0') return 0;
2242 snprintf(buf,32,"%lld",value);
2243
2244 /* If the number converted back into a string is not identical
2245 * then it's not possible to encode the string as integer */
2246 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2247
2248 /* Finally check if it fits in our ranges */
2249 if (value >= -(1<<7) && value <= (1<<7)-1) {
2250 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2251 enc[1] = value&0xFF;
2252 return 2;
2253 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2254 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2255 enc[1] = value&0xFF;
2256 enc[2] = (value>>8)&0xFF;
2257 return 3;
2258 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2259 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2260 enc[1] = value&0xFF;
2261 enc[2] = (value>>8)&0xFF;
2262 enc[3] = (value>>16)&0xFF;
2263 enc[4] = (value>>24)&0xFF;
2264 return 5;
2265 } else {
2266 return 0;
2267 }
2268}
2269
774e3047 2270static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2271 unsigned int comprlen, outlen;
2272 unsigned char byte;
2273 void *out;
2274
2275 /* We require at least four bytes compression for this to be worth it */
2276 outlen = sdslen(obj->ptr)-4;
2277 if (outlen <= 0) return 0;
3a2694c4 2278 if ((out = zmalloc(outlen+1)) == NULL) return 0;
774e3047 2279 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2280 if (comprlen == 0) {
88e85998 2281 zfree(out);
774e3047 2282 return 0;
2283 }
2284 /* Data compressed! Let's save it on disk */
2285 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2286 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2287 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2288 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2289 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
88e85998 2290 zfree(out);
774e3047 2291 return comprlen;
2292
2293writeerr:
88e85998 2294 zfree(out);
774e3047 2295 return -1;
2296}
2297
e3566d4b 2298/* Save a string objet as [len][data] on disk. If the object is a string
2299 * representation of an integer value we try to safe it in a special form */
942a3961 2300static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2301 size_t len;
e3566d4b 2302 int enclen;
10c43610 2303
942a3961 2304 len = sdslen(obj->ptr);
2305
774e3047 2306 /* Try integer encoding */
e3566d4b 2307 if (len <= 11) {
2308 unsigned char buf[5];
2309 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2310 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2311 return 0;
2312 }
2313 }
774e3047 2314
2315 /* Try LZF compression - under 20 bytes it's unable to compress even
88e85998 2316 * aaaaaaaaaaaaaaaaaa so skip it */
942a3961 2317 if (len > 20) {
774e3047 2318 int retval;
2319
2320 retval = rdbSaveLzfStringObject(fp,obj);
2321 if (retval == -1) return -1;
2322 if (retval > 0) return 0;
2323 /* retval == 0 means data can't be compressed, save the old way */
2324 }
2325
2326 /* Store verbatim */
10c43610 2327 if (rdbSaveLen(fp,len) == -1) return -1;
2328 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2329 return 0;
2330}
2331
942a3961 2332/* Like rdbSaveStringObjectRaw() but handle encoded objects */
2333static int rdbSaveStringObject(FILE *fp, robj *obj) {
2334 int retval;
2335 robj *dec;
2336
2337 if (obj->encoding != REDIS_ENCODING_RAW) {
2338 dec = getDecodedObject(obj);
2339 retval = rdbSaveStringObjectRaw(fp,dec);
2340 decrRefCount(dec);
2341 return retval;
2342 } else {
2343 return rdbSaveStringObjectRaw(fp,obj);
2344 }
2345}
2346
a7866db6 2347/* Save a double value. Doubles are saved as strings prefixed by an unsigned
2348 * 8 bit integer specifing the length of the representation.
2349 * This 8 bit integer has special values in order to specify the following
2350 * conditions:
2351 * 253: not a number
2352 * 254: + inf
2353 * 255: - inf
2354 */
2355static int rdbSaveDoubleValue(FILE *fp, double val) {
2356 unsigned char buf[128];
2357 int len;
2358
2359 if (isnan(val)) {
2360 buf[0] = 253;
2361 len = 1;
2362 } else if (!isfinite(val)) {
2363 len = 1;
2364 buf[0] = (val < 0) ? 255 : 254;
2365 } else {
2366 snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);
2367 buf[0] = strlen((char*)buf);
2368 len = buf[0]+1;
2369 }
2370 if (fwrite(buf,len,1,fp) == 0) return -1;
2371 return 0;
2372}
2373
ed9b544e 2374/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
f78fd11b 2375static int rdbSave(char *filename) {
ed9b544e 2376 dictIterator *di = NULL;
2377 dictEntry *de;
ed9b544e 2378 FILE *fp;
2379 char tmpfile[256];
2380 int j;
bb32ede5 2381 time_t now = time(NULL);
ed9b544e 2382
a3b21203 2383 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
ed9b544e 2384 fp = fopen(tmpfile,"w");
2385 if (!fp) {
2386 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2387 return REDIS_ERR;
2388 }
f78fd11b 2389 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
ed9b544e 2390 for (j = 0; j < server.dbnum; j++) {
bb32ede5 2391 redisDb *db = server.db+j;
2392 dict *d = db->dict;
3305306f 2393 if (dictSize(d) == 0) continue;
ed9b544e 2394 di = dictGetIterator(d);
2395 if (!di) {
2396 fclose(fp);
2397 return REDIS_ERR;
2398 }
2399
2400 /* Write the SELECT DB opcode */
f78fd11b 2401 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2402 if (rdbSaveLen(fp,j) == -1) goto werr;
ed9b544e 2403
2404 /* Iterate this DB writing every entry */
2405 while((de = dictNext(di)) != NULL) {
2406 robj *key = dictGetEntryKey(de);
2407 robj *o = dictGetEntryVal(de);
bb32ede5 2408 time_t expiretime = getExpire(db,key);
2409
2410 /* Save the expire time */
2411 if (expiretime != -1) {
2412 /* If this key is already expired skip it */
2413 if (expiretime < now) continue;
2414 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2415 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2416 }
2417 /* Save the key and associated value */
f78fd11b 2418 if (rdbSaveType(fp,o->type) == -1) goto werr;
10c43610 2419 if (rdbSaveStringObject(fp,key) == -1) goto werr;
f78fd11b 2420 if (o->type == REDIS_STRING) {
ed9b544e 2421 /* Save a string value */
10c43610 2422 if (rdbSaveStringObject(fp,o) == -1) goto werr;
f78fd11b 2423 } else if (o->type == REDIS_LIST) {
ed9b544e 2424 /* Save a list value */
2425 list *list = o->ptr;
6208b3a7 2426 listNode *ln;
ed9b544e 2427
6208b3a7 2428 listRewind(list);
f78fd11b 2429 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
6208b3a7 2430 while((ln = listYield(list))) {
ed9b544e 2431 robj *eleobj = listNodeValue(ln);
f78fd11b 2432
10c43610 2433 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 2434 }
f78fd11b 2435 } else if (o->type == REDIS_SET) {
ed9b544e 2436 /* Save a set value */
2437 dict *set = o->ptr;
2438 dictIterator *di = dictGetIterator(set);
2439 dictEntry *de;
2440
3305306f 2441 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
ed9b544e 2442 while((de = dictNext(di)) != NULL) {
10c43610 2443 robj *eleobj = dictGetEntryKey(de);
ed9b544e 2444
10c43610 2445 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 2446 }
2447 dictReleaseIterator(di);
2b59cfdf 2448 } else if (o->type == REDIS_ZSET) {
2449 /* Save a set value */
2450 zset *zs = o->ptr;
2451 dictIterator *di = dictGetIterator(zs->dict);
2452 dictEntry *de;
2453
2454 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2455 while((de = dictNext(di)) != NULL) {
2456 robj *eleobj = dictGetEntryKey(de);
2457 double *score = dictGetEntryVal(de);
2458
2459 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2460 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2461 }
2462 dictReleaseIterator(di);
ed9b544e 2463 } else {
2464 assert(0 != 0);
2465 }
2466 }
2467 dictReleaseIterator(di);
2468 }
2469 /* EOF opcode */
f78fd11b 2470 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2471
2472 /* Make sure data will not remain on the OS's output buffers */
ed9b544e 2473 fflush(fp);
2474 fsync(fileno(fp));
2475 fclose(fp);
2476
2477 /* Use RENAME to make sure the DB file is changed atomically only
2478 * if the generate DB file is ok. */
2479 if (rename(tmpfile,filename) == -1) {
325d1eb4 2480 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
ed9b544e 2481 unlink(tmpfile);
2482 return REDIS_ERR;
2483 }
2484 redisLog(REDIS_NOTICE,"DB saved on disk");
2485 server.dirty = 0;
2486 server.lastsave = time(NULL);
2487 return REDIS_OK;
2488
2489werr:
2490 fclose(fp);
2491 unlink(tmpfile);
2492 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2493 if (di) dictReleaseIterator(di);
2494 return REDIS_ERR;
2495}
2496
f78fd11b 2497static int rdbSaveBackground(char *filename) {
ed9b544e 2498 pid_t childpid;
2499
2500 if (server.bgsaveinprogress) return REDIS_ERR;
2501 if ((childpid = fork()) == 0) {
2502 /* Child */
2503 close(server.fd);
f78fd11b 2504 if (rdbSave(filename) == REDIS_OK) {
ed9b544e 2505 exit(0);
2506 } else {
2507 exit(1);
2508 }
2509 } else {
2510 /* Parent */
5a7c647e 2511 if (childpid == -1) {
2512 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2513 strerror(errno));
2514 return REDIS_ERR;
2515 }
ed9b544e 2516 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2517 server.bgsaveinprogress = 1;
9f3c422c 2518 server.bgsavechildpid = childpid;
ed9b544e 2519 return REDIS_OK;
2520 }
2521 return REDIS_OK; /* unreached */
2522}
2523
a3b21203 2524static void rdbRemoveTempFile(pid_t childpid) {
2525 char tmpfile[256];
2526
2527 snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
2528 unlink(tmpfile);
2529}
2530
f78fd11b 2531static int rdbLoadType(FILE *fp) {
2532 unsigned char type;
7b45bfb2 2533 if (fread(&type,1,1,fp) == 0) return -1;
2534 return type;
2535}
2536
bb32ede5 2537static time_t rdbLoadTime(FILE *fp) {
2538 int32_t t32;
2539 if (fread(&t32,4,1,fp) == 0) return -1;
2540 return (time_t) t32;
2541}
2542
e3566d4b 2543/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2544 * of this file for a description of how this are stored on disk.
2545 *
2546 * isencoded is set to 1 if the readed length is not actually a length but
2547 * an "encoding type", check the above comments for more info */
2548static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
f78fd11b 2549 unsigned char buf[2];
2550 uint32_t len;
2551
e3566d4b 2552 if (isencoded) *isencoded = 0;
f78fd11b 2553 if (rdbver == 0) {
2554 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2555 return ntohl(len);
2556 } else {
17be1a4a 2557 int type;
2558
f78fd11b 2559 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
17be1a4a 2560 type = (buf[0]&0xC0)>>6;
2561 if (type == REDIS_RDB_6BITLEN) {
f78fd11b 2562 /* Read a 6 bit len */
e3566d4b 2563 return buf[0]&0x3F;
2564 } else if (type == REDIS_RDB_ENCVAL) {
2565 /* Read a 6 bit len encoding type */
2566 if (isencoded) *isencoded = 1;
2567 return buf[0]&0x3F;
17be1a4a 2568 } else if (type == REDIS_RDB_14BITLEN) {
f78fd11b 2569 /* Read a 14 bit len */
2570 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2571 return ((buf[0]&0x3F)<<8)|buf[1];
2572 } else {
2573 /* Read a 32 bit len */
2574 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2575 return ntohl(len);
2576 }
2577 }
f78fd11b 2578}
2579
e3566d4b 2580static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2581 unsigned char enc[4];
2582 long long val;
2583
2584 if (enctype == REDIS_RDB_ENC_INT8) {
2585 if (fread(enc,1,1,fp) == 0) return NULL;
2586 val = (signed char)enc[0];
2587 } else if (enctype == REDIS_RDB_ENC_INT16) {
2588 uint16_t v;
2589 if (fread(enc,2,1,fp) == 0) return NULL;
2590 v = enc[0]|(enc[1]<<8);
2591 val = (int16_t)v;
2592 } else if (enctype == REDIS_RDB_ENC_INT32) {
2593 uint32_t v;
2594 if (fread(enc,4,1,fp) == 0) return NULL;
2595 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2596 val = (int32_t)v;
2597 } else {
2598 val = 0; /* anti-warning */
2599 assert(0!=0);
2600 }
2601 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2602}
2603
88e85998 2604static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2605 unsigned int len, clen;
2606 unsigned char *c = NULL;
2607 sds val = NULL;
2608
2609 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2610 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2611 if ((c = zmalloc(clen)) == NULL) goto err;
2612 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2613 if (fread(c,clen,1,fp) == 0) goto err;
2614 if (lzf_decompress(c,clen,val,len) == 0) goto err;
5109cdff 2615 zfree(c);
88e85998 2616 return createObject(REDIS_STRING,val);
2617err:
2618 zfree(c);
2619 sdsfree(val);
2620 return NULL;
2621}
2622
e3566d4b 2623static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2624 int isencoded;
2625 uint32_t len;
f78fd11b 2626 sds val;
2627
e3566d4b 2628 len = rdbLoadLen(fp,rdbver,&isencoded);
2629 if (isencoded) {
2630 switch(len) {
2631 case REDIS_RDB_ENC_INT8:
2632 case REDIS_RDB_ENC_INT16:
2633 case REDIS_RDB_ENC_INT32:
3305306f 2634 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
88e85998 2635 case REDIS_RDB_ENC_LZF:
2636 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
e3566d4b 2637 default:
2638 assert(0!=0);
2639 }
2640 }
2641
f78fd11b 2642 if (len == REDIS_RDB_LENERR) return NULL;
2643 val = sdsnewlen(NULL,len);
2644 if (len && fread(val,len,1,fp) == 0) {
2645 sdsfree(val);
2646 return NULL;
2647 }
10c43610 2648 return tryObjectSharing(createObject(REDIS_STRING,val));
f78fd11b 2649}
2650
a7866db6 2651/* For information about double serialization check rdbSaveDoubleValue() */
2652static int rdbLoadDoubleValue(FILE *fp, double *val) {
2653 char buf[128];
2654 unsigned char len;
2655
2656 if (fread(&len,1,1,fp) == 0) return -1;
2657 switch(len) {
2658 case 255: *val = R_NegInf; return 0;
2659 case 254: *val = R_PosInf; return 0;
2660 case 253: *val = R_Nan; return 0;
2661 default:
2662 if (fread(buf,len,1,fp) == 0) return -1;
2663 sscanf(buf, "%lg", val);
2664 return 0;
2665 }
2666}
2667
f78fd11b 2668static int rdbLoad(char *filename) {
ed9b544e 2669 FILE *fp;
f78fd11b 2670 robj *keyobj = NULL;
2671 uint32_t dbid;
bb32ede5 2672 int type, retval, rdbver;
3305306f 2673 dict *d = server.db[0].dict;
bb32ede5 2674 redisDb *db = server.db+0;
f78fd11b 2675 char buf[1024];
bb32ede5 2676 time_t expiretime = -1, now = time(NULL);
2677
ed9b544e 2678 fp = fopen(filename,"r");
2679 if (!fp) return REDIS_ERR;
2680 if (fread(buf,9,1,fp) == 0) goto eoferr;
f78fd11b 2681 buf[9] = '\0';
2682 if (memcmp(buf,"REDIS",5) != 0) {
ed9b544e 2683 fclose(fp);
2684 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2685 return REDIS_ERR;
2686 }
f78fd11b 2687 rdbver = atoi(buf+5);
2688 if (rdbver > 1) {
2689 fclose(fp);
2690 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2691 return REDIS_ERR;
2692 }
ed9b544e 2693 while(1) {
2694 robj *o;
2695
2696 /* Read type. */
f78fd11b 2697 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
bb32ede5 2698 if (type == REDIS_EXPIRETIME) {
2699 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2700 /* We read the time so we need to read the object type again */
2701 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2702 }
ed9b544e 2703 if (type == REDIS_EOF) break;
2704 /* Handle SELECT DB opcode as a special case */
2705 if (type == REDIS_SELECTDB) {
e3566d4b 2706 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2707 goto eoferr;
ed9b544e 2708 if (dbid >= (unsigned)server.dbnum) {
f78fd11b 2709 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
ed9b544e 2710 exit(1);
2711 }
bb32ede5 2712 db = server.db+dbid;
2713 d = db->dict;
ed9b544e 2714 continue;
2715 }
2716 /* Read key */
f78fd11b 2717 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
ed9b544e 2718
2719 if (type == REDIS_STRING) {
2720 /* Read string value */
f78fd11b 2721 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
942a3961 2722 tryObjectEncoding(o);
ed9b544e 2723 } else if (type == REDIS_LIST || type == REDIS_SET) {
2724 /* Read list/set value */
2725 uint32_t listlen;
f78fd11b 2726
e3566d4b 2727 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
f78fd11b 2728 goto eoferr;
ed9b544e 2729 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2730 /* Load every single element of the list/set */
2731 while(listlen--) {
2732 robj *ele;
2733
f78fd11b 2734 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
942a3961 2735 tryObjectEncoding(ele);
ed9b544e 2736 if (type == REDIS_LIST) {
6b47e12e 2737 listAddNodeTail((list*)o->ptr,ele);
ed9b544e 2738 } else {
6b47e12e 2739 dictAdd((dict*)o->ptr,ele,NULL);
ed9b544e 2740 }
ed9b544e 2741 }
2b59cfdf 2742 } else if (type == REDIS_ZSET) {
2743 /* Read list/set value */
2744 uint32_t zsetlen;
2745 zset *zs;
2746
2747 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2748 goto eoferr;
2749 o = createZsetObject();
2750 zs = o->ptr;
2751 /* Load every single element of the list/set */
2752 while(zsetlen--) {
2753 robj *ele;
2754 double *score = zmalloc(sizeof(double));
2755
2756 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2757 tryObjectEncoding(ele);
2758 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2759 dictAdd(zs->dict,ele,score);
2760 zslInsert(zs->zsl,*score,ele);
2761 incrRefCount(ele); /* added to skiplist */
2762 }
ed9b544e 2763 } else {
2764 assert(0 != 0);
2765 }
2766 /* Add the new object in the hash table */
f78fd11b 2767 retval = dictAdd(d,keyobj,o);
ed9b544e 2768 if (retval == DICT_ERR) {
f78fd11b 2769 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
ed9b544e 2770 exit(1);
2771 }
bb32ede5 2772 /* Set the expire time if needed */
2773 if (expiretime != -1) {
2774 setExpire(db,keyobj,expiretime);
2775 /* Delete this key if already expired */
2776 if (expiretime < now) deleteKey(db,keyobj);
2777 expiretime = -1;
2778 }
f78fd11b 2779 keyobj = o = NULL;
ed9b544e 2780 }
2781 fclose(fp);
2782 return REDIS_OK;
2783
2784eoferr: /* unexpected end of file is handled here with a fatal exit */
e3566d4b 2785 if (keyobj) decrRefCount(keyobj);
2786 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
ed9b544e 2787 exit(1);
2788 return REDIS_ERR; /* Just to avoid warning */
2789}
2790
2791/*================================== Commands =============================== */
2792
abcb223e 2793static void authCommand(redisClient *c) {
2e77c2ee 2794 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
abcb223e
BH
2795 c->authenticated = 1;
2796 addReply(c,shared.ok);
2797 } else {
2798 c->authenticated = 0;
fa4c0aba 2799 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
abcb223e
BH
2800 }
2801}
2802
ed9b544e 2803static void pingCommand(redisClient *c) {
2804 addReply(c,shared.pong);
2805}
2806
2807static void echoCommand(redisClient *c) {
942a3961 2808 addReplyBulkLen(c,c->argv[1]);
ed9b544e 2809 addReply(c,c->argv[1]);
2810 addReply(c,shared.crlf);
2811}
2812
2813/*=================================== Strings =============================== */
2814
2815static void setGenericCommand(redisClient *c, int nx) {
2816 int retval;
2817
3305306f 2818 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 2819 if (retval == DICT_ERR) {
2820 if (!nx) {
3305306f 2821 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 2822 incrRefCount(c->argv[2]);
2823 } else {
c937aa89 2824 addReply(c,shared.czero);
ed9b544e 2825 return;
2826 }
2827 } else {
2828 incrRefCount(c->argv[1]);
2829 incrRefCount(c->argv[2]);
2830 }
2831 server.dirty++;
3305306f 2832 removeExpire(c->db,c->argv[1]);
c937aa89 2833 addReply(c, nx ? shared.cone : shared.ok);
ed9b544e 2834}
2835
2836static void setCommand(redisClient *c) {
a4d1ba9a 2837 setGenericCommand(c,0);
ed9b544e 2838}
2839
2840static void setnxCommand(redisClient *c) {
a4d1ba9a 2841 setGenericCommand(c,1);
ed9b544e 2842}
2843
2844static void getCommand(redisClient *c) {
3305306f 2845 robj *o = lookupKeyRead(c->db,c->argv[1]);
2846
2847 if (o == NULL) {
c937aa89 2848 addReply(c,shared.nullbulk);
ed9b544e 2849 } else {
ed9b544e 2850 if (o->type != REDIS_STRING) {
c937aa89 2851 addReply(c,shared.wrongtypeerr);
ed9b544e 2852 } else {
942a3961 2853 addReplyBulkLen(c,o);
ed9b544e 2854 addReply(c,o);
2855 addReply(c,shared.crlf);
2856 }
2857 }
2858}
2859
f6b141c5 2860static void getsetCommand(redisClient *c) {
a431eb74 2861 getCommand(c);
2862 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2863 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2864 } else {
2865 incrRefCount(c->argv[1]);
2866 }
2867 incrRefCount(c->argv[2]);
2868 server.dirty++;
2869 removeExpire(c->db,c->argv[1]);
2870}
2871
70003d28 2872static void mgetCommand(redisClient *c) {
70003d28 2873 int j;
2874
c937aa89 2875 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
70003d28 2876 for (j = 1; j < c->argc; j++) {
3305306f 2877 robj *o = lookupKeyRead(c->db,c->argv[j]);
2878 if (o == NULL) {
c937aa89 2879 addReply(c,shared.nullbulk);
70003d28 2880 } else {
70003d28 2881 if (o->type != REDIS_STRING) {
c937aa89 2882 addReply(c,shared.nullbulk);
70003d28 2883 } else {
942a3961 2884 addReplyBulkLen(c,o);
70003d28 2885 addReply(c,o);
2886 addReply(c,shared.crlf);
2887 }
2888 }
2889 }
2890}
2891
d68ed120 2892static void incrDecrCommand(redisClient *c, long long incr) {
ed9b544e 2893 long long value;
2894 int retval;
2895 robj *o;
2896
3305306f 2897 o = lookupKeyWrite(c->db,c->argv[1]);
2898 if (o == NULL) {
ed9b544e 2899 value = 0;
2900 } else {
ed9b544e 2901 if (o->type != REDIS_STRING) {
2902 value = 0;
2903 } else {
2904 char *eptr;
2905
942a3961 2906 if (o->encoding == REDIS_ENCODING_RAW)
2907 value = strtoll(o->ptr, &eptr, 10);
2908 else if (o->encoding == REDIS_ENCODING_INT)
2909 value = (long)o->ptr;
2910 else
2911 assert(1 != 1);
ed9b544e 2912 }
2913 }
2914
2915 value += incr;
2916 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
942a3961 2917 tryObjectEncoding(o);
3305306f 2918 retval = dictAdd(c->db->dict,c->argv[1],o);
ed9b544e 2919 if (retval == DICT_ERR) {
3305306f 2920 dictReplace(c->db->dict,c->argv[1],o);
2921 removeExpire(c->db,c->argv[1]);
ed9b544e 2922 } else {
2923 incrRefCount(c->argv[1]);
2924 }
2925 server.dirty++;
c937aa89 2926 addReply(c,shared.colon);
ed9b544e 2927 addReply(c,o);
2928 addReply(c,shared.crlf);
2929}
2930
2931static void incrCommand(redisClient *c) {
a4d1ba9a 2932 incrDecrCommand(c,1);
ed9b544e 2933}
2934
2935static void decrCommand(redisClient *c) {
a4d1ba9a 2936 incrDecrCommand(c,-1);
ed9b544e 2937}
2938
2939static void incrbyCommand(redisClient *c) {
d68ed120 2940 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 2941 incrDecrCommand(c,incr);
ed9b544e 2942}
2943
2944static void decrbyCommand(redisClient *c) {
d68ed120 2945 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 2946 incrDecrCommand(c,-incr);
ed9b544e 2947}
2948
2949/* ========================= Type agnostic commands ========================= */
2950
2951static void delCommand(redisClient *c) {
5109cdff 2952 int deleted = 0, j;
2953
2954 for (j = 1; j < c->argc; j++) {
2955 if (deleteKey(c->db,c->argv[j])) {
2956 server.dirty++;
2957 deleted++;
2958 }
2959 }
2960 switch(deleted) {
2961 case 0:
c937aa89 2962 addReply(c,shared.czero);
5109cdff 2963 break;
2964 case 1:
2965 addReply(c,shared.cone);
2966 break;
2967 default:
2968 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2969 break;
ed9b544e 2970 }
2971}
2972
2973static void existsCommand(redisClient *c) {
3305306f 2974 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
ed9b544e 2975}
2976
2977static void selectCommand(redisClient *c) {
2978 int id = atoi(c->argv[1]->ptr);
2979
2980 if (selectDb(c,id) == REDIS_ERR) {
774e3047 2981 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
ed9b544e 2982 } else {
2983 addReply(c,shared.ok);
2984 }
2985}
2986
2987static void randomkeyCommand(redisClient *c) {
2988 dictEntry *de;
3305306f 2989
2990 while(1) {
2991 de = dictGetRandomKey(c->db->dict);
ce7bef07 2992 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3305306f 2993 }
ed9b544e 2994 if (de == NULL) {
ce7bef07 2995 addReply(c,shared.plus);
ed9b544e 2996 addReply(c,shared.crlf);
2997 } else {
c937aa89 2998 addReply(c,shared.plus);
ed9b544e 2999 addReply(c,dictGetEntryKey(de));
3000 addReply(c,shared.crlf);
3001 }
3002}
3003
3004static void keysCommand(redisClient *c) {
3005 dictIterator *di;
3006 dictEntry *de;
3007 sds pattern = c->argv[1]->ptr;
3008 int plen = sdslen(pattern);
3009 int numkeys = 0, keyslen = 0;
3010 robj *lenobj = createObject(REDIS_STRING,NULL);
3011
3305306f 3012 di = dictGetIterator(c->db->dict);
ed9b544e 3013 addReply(c,lenobj);
3014 decrRefCount(lenobj);
3015 while((de = dictNext(di)) != NULL) {
3016 robj *keyobj = dictGetEntryKey(de);
3305306f 3017
ed9b544e 3018 sds key = keyobj->ptr;
3019 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3020 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3305306f 3021 if (expireIfNeeded(c->db,keyobj) == 0) {
3022 if (numkeys != 0)
3023 addReply(c,shared.space);
3024 addReply(c,keyobj);
3025 numkeys++;
3026 keyslen += sdslen(key);
3027 }
ed9b544e 3028 }
3029 }
3030 dictReleaseIterator(di);
c937aa89 3031 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
ed9b544e 3032 addReply(c,shared.crlf);
3033}
3034
3035static void dbsizeCommand(redisClient *c) {
3036 addReplySds(c,
3305306f 3037 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
ed9b544e 3038}
3039
3040static void lastsaveCommand(redisClient *c) {
3041 addReplySds(c,
c937aa89 3042 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
ed9b544e 3043}
3044
3045static void typeCommand(redisClient *c) {
3305306f 3046 robj *o;
ed9b544e 3047 char *type;
3305306f 3048
3049 o = lookupKeyRead(c->db,c->argv[1]);
3050 if (o == NULL) {
c937aa89 3051 type = "+none";
ed9b544e 3052 } else {
ed9b544e 3053 switch(o->type) {
c937aa89 3054 case REDIS_STRING: type = "+string"; break;
3055 case REDIS_LIST: type = "+list"; break;
3056 case REDIS_SET: type = "+set"; break;
ed9b544e 3057 default: type = "unknown"; break;
3058 }
3059 }
3060 addReplySds(c,sdsnew(type));
3061 addReply(c,shared.crlf);
3062}
3063
3064static void saveCommand(redisClient *c) {
05557f6d 3065 if (server.bgsaveinprogress) {
3066 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3067 return;
3068 }
f78fd11b 3069 if (rdbSave(server.dbfilename) == REDIS_OK) {
ed9b544e 3070 addReply(c,shared.ok);
3071 } else {
3072 addReply(c,shared.err);
3073 }
3074}
3075
3076static void bgsaveCommand(redisClient *c) {
3077 if (server.bgsaveinprogress) {
3078 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3079 return;
3080 }
f78fd11b 3081 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
ed9b544e 3082 addReply(c,shared.ok);
3083 } else {
3084 addReply(c,shared.err);
3085 }
3086}
3087
3088static void shutdownCommand(redisClient *c) {
3089 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
a3b21203 3090 /* Kill the saving child if there is a background saving in progress.
3091 We want to avoid race conditions, for instance our saving child may
3092 overwrite the synchronous saving did by SHUTDOWN. */
9f3c422c 3093 if (server.bgsaveinprogress) {
3094 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3095 kill(server.bgsavechildpid,SIGKILL);
a3b21203 3096 rdbRemoveTempFile(server.bgsavechildpid);
9f3c422c 3097 }
a3b21203 3098 /* SYNC SAVE */
f78fd11b 3099 if (rdbSave(server.dbfilename) == REDIS_OK) {
9f3c422c 3100 if (server.daemonize)
b284af55 3101 unlink(server.pidfile);
b284af55 3102 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
ed9b544e 3103 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3104 exit(1);
3105 } else {
a3b21203 3106 /* Ooops.. error saving! The best we can do is to continue operating.
3107 * Note that if there was a background saving process, in the next
3108 * cron() Redis will be notified that the background saving aborted,
3109 * handling special stuff like slaves pending for synchronization... */
ed9b544e 3110 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3111 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3112 }
3113}
3114
3115static void renameGenericCommand(redisClient *c, int nx) {
ed9b544e 3116 robj *o;
3117
3118 /* To use the same key as src and dst is probably an error */
3119 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
c937aa89 3120 addReply(c,shared.sameobjecterr);
ed9b544e 3121 return;
3122 }
3123
3305306f 3124 o = lookupKeyWrite(c->db,c->argv[1]);
3125 if (o == NULL) {
c937aa89 3126 addReply(c,shared.nokeyerr);
ed9b544e 3127 return;
3128 }
ed9b544e 3129 incrRefCount(o);
3305306f 3130 deleteIfVolatile(c->db,c->argv[2]);
3131 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
ed9b544e 3132 if (nx) {
3133 decrRefCount(o);
c937aa89 3134 addReply(c,shared.czero);
ed9b544e 3135 return;
3136 }
3305306f 3137 dictReplace(c->db->dict,c->argv[2],o);
ed9b544e 3138 } else {
3139 incrRefCount(c->argv[2]);
3140 }
3305306f 3141 deleteKey(c->db,c->argv[1]);
ed9b544e 3142 server.dirty++;
c937aa89 3143 addReply(c,nx ? shared.cone : shared.ok);
ed9b544e 3144}
3145
3146static void renameCommand(redisClient *c) {
3147 renameGenericCommand(c,0);
3148}
3149
3150static void renamenxCommand(redisClient *c) {
3151 renameGenericCommand(c,1);
3152}
3153
3154static void moveCommand(redisClient *c) {
3305306f 3155 robj *o;
3156 redisDb *src, *dst;
ed9b544e 3157 int srcid;
3158
3159 /* Obtain source and target DB pointers */
3305306f 3160 src = c->db;
3161 srcid = c->db->id;
ed9b544e 3162 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
c937aa89 3163 addReply(c,shared.outofrangeerr);
ed9b544e 3164 return;
3165 }
3305306f 3166 dst = c->db;
3167 selectDb(c,srcid); /* Back to the source DB */
ed9b544e 3168
3169 /* If the user is moving using as target the same
3170 * DB as the source DB it is probably an error. */
3171 if (src == dst) {
c937aa89 3172 addReply(c,shared.sameobjecterr);
ed9b544e 3173 return;
3174 }
3175
3176 /* Check if the element exists and get a reference */
3305306f 3177 o = lookupKeyWrite(c->db,c->argv[1]);
3178 if (!o) {
c937aa89 3179 addReply(c,shared.czero);
ed9b544e 3180 return;
3181 }
3182
3183 /* Try to add the element to the target DB */
3305306f 3184 deleteIfVolatile(dst,c->argv[1]);
3185 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
c937aa89 3186 addReply(c,shared.czero);
ed9b544e 3187 return;
3188 }
3305306f 3189 incrRefCount(c->argv[1]);
ed9b544e 3190 incrRefCount(o);
3191
3192 /* OK! key moved, free the entry in the source DB */
3305306f 3193 deleteKey(src,c->argv[1]);
ed9b544e 3194 server.dirty++;
c937aa89 3195 addReply(c,shared.cone);
ed9b544e 3196}
3197
3198/* =================================== Lists ================================ */
3199static void pushGenericCommand(redisClient *c, int where) {
3200 robj *lobj;
ed9b544e 3201 list *list;
3305306f 3202
3203 lobj = lookupKeyWrite(c->db,c->argv[1]);
3204 if (lobj == NULL) {
ed9b544e 3205 lobj = createListObject();
3206 list = lobj->ptr;
3207 if (where == REDIS_HEAD) {
6b47e12e 3208 listAddNodeHead(list,c->argv[2]);
ed9b544e 3209 } else {
6b47e12e 3210 listAddNodeTail(list,c->argv[2]);
ed9b544e 3211 }
3305306f 3212 dictAdd(c->db->dict,c->argv[1],lobj);
ed9b544e 3213 incrRefCount(c->argv[1]);
3214 incrRefCount(c->argv[2]);
3215 } else {
ed9b544e 3216 if (lobj->type != REDIS_LIST) {
3217 addReply(c,shared.wrongtypeerr);
3218 return;
3219 }
3220 list = lobj->ptr;
3221 if (where == REDIS_HEAD) {
6b47e12e 3222 listAddNodeHead(list,c->argv[2]);
ed9b544e 3223 } else {
6b47e12e 3224 listAddNodeTail(list,c->argv[2]);
ed9b544e 3225 }
3226 incrRefCount(c->argv[2]);
3227 }
3228 server.dirty++;
3229 addReply(c,shared.ok);
3230}
3231
3232static void lpushCommand(redisClient *c) {
3233 pushGenericCommand(c,REDIS_HEAD);
3234}
3235
3236static void rpushCommand(redisClient *c) {
3237 pushGenericCommand(c,REDIS_TAIL);
3238}
3239
3240static void llenCommand(redisClient *c) {
3305306f 3241 robj *o;
ed9b544e 3242 list *l;
3243
3305306f 3244 o = lookupKeyRead(c->db,c->argv[1]);
3245 if (o == NULL) {
c937aa89 3246 addReply(c,shared.czero);
ed9b544e 3247 return;
3248 } else {
ed9b544e 3249 if (o->type != REDIS_LIST) {
c937aa89 3250 addReply(c,shared.wrongtypeerr);
ed9b544e 3251 } else {
3252 l = o->ptr;
c937aa89 3253 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
ed9b544e 3254 }
3255 }
3256}
3257
3258static void lindexCommand(redisClient *c) {
3305306f 3259 robj *o;
ed9b544e 3260 int index = atoi(c->argv[2]->ptr);
3261
3305306f 3262 o = lookupKeyRead(c->db,c->argv[1]);
3263 if (o == NULL) {
c937aa89 3264 addReply(c,shared.nullbulk);
ed9b544e 3265 } else {
ed9b544e 3266 if (o->type != REDIS_LIST) {
c937aa89 3267 addReply(c,shared.wrongtypeerr);
ed9b544e 3268 } else {
3269 list *list = o->ptr;
3270 listNode *ln;
3271
3272 ln = listIndex(list, index);
3273 if (ln == NULL) {
c937aa89 3274 addReply(c,shared.nullbulk);
ed9b544e 3275 } else {
3276 robj *ele = listNodeValue(ln);
942a3961 3277 addReplyBulkLen(c,ele);
ed9b544e 3278 addReply(c,ele);
3279 addReply(c,shared.crlf);
3280 }
3281 }
3282 }
3283}
3284
3285static void lsetCommand(redisClient *c) {
3305306f 3286 robj *o;
ed9b544e 3287 int index = atoi(c->argv[2]->ptr);
3288
3305306f 3289 o = lookupKeyWrite(c->db,c->argv[1]);
3290 if (o == NULL) {
ed9b544e 3291 addReply(c,shared.nokeyerr);
3292 } else {
ed9b544e 3293 if (o->type != REDIS_LIST) {
3294 addReply(c,shared.wrongtypeerr);
3295 } else {
3296 list *list = o->ptr;
3297 listNode *ln;
3298
3299 ln = listIndex(list, index);
3300 if (ln == NULL) {
c937aa89 3301 addReply(c,shared.outofrangeerr);
ed9b544e 3302 } else {
3303 robj *ele = listNodeValue(ln);
3304
3305 decrRefCount(ele);
3306 listNodeValue(ln) = c->argv[3];
3307 incrRefCount(c->argv[3]);
3308 addReply(c,shared.ok);
3309 server.dirty++;
3310 }
3311 }
3312 }
3313}
3314
3315static void popGenericCommand(redisClient *c, int where) {
3305306f 3316 robj *o;
3317
3318 o = lookupKeyWrite(c->db,c->argv[1]);
3319 if (o == NULL) {
c937aa89 3320 addReply(c,shared.nullbulk);
ed9b544e 3321 } else {
ed9b544e 3322 if (o->type != REDIS_LIST) {
c937aa89 3323 addReply(c,shared.wrongtypeerr);
ed9b544e 3324 } else {
3325 list *list = o->ptr;
3326 listNode *ln;
3327
3328 if (where == REDIS_HEAD)
3329 ln = listFirst(list);
3330 else
3331 ln = listLast(list);
3332
3333 if (ln == NULL) {
c937aa89 3334 addReply(c,shared.nullbulk);
ed9b544e 3335 } else {
3336 robj *ele = listNodeValue(ln);
942a3961 3337 addReplyBulkLen(c,ele);
ed9b544e 3338 addReply(c,ele);
3339 addReply(c,shared.crlf);
3340 listDelNode(list,ln);
3341 server.dirty++;
3342 }
3343 }
3344 }
3345}
3346
3347static void lpopCommand(redisClient *c) {
3348 popGenericCommand(c,REDIS_HEAD);
3349}
3350
3351static void rpopCommand(redisClient *c) {
3352 popGenericCommand(c,REDIS_TAIL);
3353}
3354
3355static void lrangeCommand(redisClient *c) {
3305306f 3356 robj *o;
ed9b544e 3357 int start = atoi(c->argv[2]->ptr);
3358 int end = atoi(c->argv[3]->ptr);
3305306f 3359
3360 o = lookupKeyRead(c->db,c->argv[1]);
3361 if (o == NULL) {
c937aa89 3362 addReply(c,shared.nullmultibulk);
ed9b544e 3363 } else {
ed9b544e 3364 if (o->type != REDIS_LIST) {
c937aa89 3365 addReply(c,shared.wrongtypeerr);
ed9b544e 3366 } else {
3367 list *list = o->ptr;
3368 listNode *ln;
3369 int llen = listLength(list);
3370 int rangelen, j;
3371 robj *ele;
3372
3373 /* convert negative indexes */
3374 if (start < 0) start = llen+start;
3375 if (end < 0) end = llen+end;
3376 if (start < 0) start = 0;
3377 if (end < 0) end = 0;
3378
3379 /* indexes sanity checks */
3380 if (start > end || start >= llen) {
3381 /* Out of range start or start > end result in empty list */
c937aa89 3382 addReply(c,shared.emptymultibulk);
ed9b544e 3383 return;
3384 }
3385 if (end >= llen) end = llen-1;
3386 rangelen = (end-start)+1;
3387
3388 /* Return the result in form of a multi-bulk reply */
3389 ln = listIndex(list, start);
c937aa89 3390 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
ed9b544e 3391 for (j = 0; j < rangelen; j++) {
3392 ele = listNodeValue(ln);
942a3961 3393 addReplyBulkLen(c,ele);
ed9b544e 3394 addReply(c,ele);
3395 addReply(c,shared.crlf);
3396 ln = ln->next;
3397 }
3398 }
3399 }
3400}
3401
3402static void ltrimCommand(redisClient *c) {
3305306f 3403 robj *o;
ed9b544e 3404 int start = atoi(c->argv[2]->ptr);
3405 int end = atoi(c->argv[3]->ptr);
3406
3305306f 3407 o = lookupKeyWrite(c->db,c->argv[1]);
3408 if (o == NULL) {
ed9b544e 3409 addReply(c,shared.nokeyerr);
3410 } else {
ed9b544e 3411 if (o->type != REDIS_LIST) {
3412 addReply(c,shared.wrongtypeerr);
3413 } else {
3414 list *list = o->ptr;
3415 listNode *ln;
3416 int llen = listLength(list);
3417 int j, ltrim, rtrim;
3418
3419 /* convert negative indexes */
3420 if (start < 0) start = llen+start;
3421 if (end < 0) end = llen+end;
3422 if (start < 0) start = 0;
3423 if (end < 0) end = 0;
3424
3425 /* indexes sanity checks */
3426 if (start > end || start >= llen) {
3427 /* Out of range start or start > end result in empty list */
3428 ltrim = llen;
3429 rtrim = 0;
3430 } else {
3431 if (end >= llen) end = llen-1;
3432 ltrim = start;
3433 rtrim = llen-end-1;
3434 }
3435
3436 /* Remove list elements to perform the trim */
3437 for (j = 0; j < ltrim; j++) {
3438 ln = listFirst(list);
3439 listDelNode(list,ln);
3440 }
3441 for (j = 0; j < rtrim; j++) {
3442 ln = listLast(list);
3443 listDelNode(list,ln);
3444 }
ed9b544e 3445 server.dirty++;
e59229a2 3446 addReply(c,shared.ok);
ed9b544e 3447 }
3448 }
3449}
3450
3451static void lremCommand(redisClient *c) {
3305306f 3452 robj *o;
ed9b544e 3453
3305306f 3454 o = lookupKeyWrite(c->db,c->argv[1]);
3455 if (o == NULL) {
33c08b39 3456 addReply(c,shared.czero);
ed9b544e 3457 } else {
ed9b544e 3458 if (o->type != REDIS_LIST) {
c937aa89 3459 addReply(c,shared.wrongtypeerr);
ed9b544e 3460 } else {
3461 list *list = o->ptr;
3462 listNode *ln, *next;
3463 int toremove = atoi(c->argv[2]->ptr);
3464 int removed = 0;
3465 int fromtail = 0;
3466
3467 if (toremove < 0) {
3468 toremove = -toremove;
3469 fromtail = 1;
3470 }
3471 ln = fromtail ? list->tail : list->head;
3472 while (ln) {
ed9b544e 3473 robj *ele = listNodeValue(ln);
a4d1ba9a 3474
3475 next = fromtail ? ln->prev : ln->next;
724a51b1 3476 if (compareStringObjects(ele,c->argv[3]) == 0) {
ed9b544e 3477 listDelNode(list,ln);
3478 server.dirty++;
3479 removed++;
3480 if (toremove && removed == toremove) break;
3481 }
3482 ln = next;
3483 }
c937aa89 3484 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
ed9b544e 3485 }
3486 }
3487}
3488
3489/* ==================================== Sets ================================ */
3490
3491static void saddCommand(redisClient *c) {
ed9b544e 3492 robj *set;
3493
3305306f 3494 set = lookupKeyWrite(c->db,c->argv[1]);
3495 if (set == NULL) {
ed9b544e 3496 set = createSetObject();
3305306f 3497 dictAdd(c->db->dict,c->argv[1],set);
ed9b544e 3498 incrRefCount(c->argv[1]);
3499 } else {
ed9b544e 3500 if (set->type != REDIS_SET) {
c937aa89 3501 addReply(c,shared.wrongtypeerr);
ed9b544e 3502 return;
3503 }
3504 }
3505 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3506 incrRefCount(c->argv[2]);
3507 server.dirty++;
c937aa89 3508 addReply(c,shared.cone);
ed9b544e 3509 } else {
c937aa89 3510 addReply(c,shared.czero);
ed9b544e 3511 }
3512}
3513
3514static void sremCommand(redisClient *c) {
3305306f 3515 robj *set;
ed9b544e 3516
3305306f 3517 set = lookupKeyWrite(c->db,c->argv[1]);
3518 if (set == NULL) {
c937aa89 3519 addReply(c,shared.czero);
ed9b544e 3520 } else {
ed9b544e 3521 if (set->type != REDIS_SET) {
c937aa89 3522 addReply(c,shared.wrongtypeerr);
ed9b544e 3523 return;
3524 }
3525 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3526 server.dirty++;
12fea928 3527 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
c937aa89 3528 addReply(c,shared.cone);
ed9b544e 3529 } else {
c937aa89 3530 addReply(c,shared.czero);
ed9b544e 3531 }
3532 }
3533}
3534
a4460ef4 3535static void smoveCommand(redisClient *c) {
3536 robj *srcset, *dstset;
3537
3538 srcset = lookupKeyWrite(c->db,c->argv[1]);
3539 dstset = lookupKeyWrite(c->db,c->argv[2]);
3540
3541 /* If the source key does not exist return 0, if it's of the wrong type
3542 * raise an error */
3543 if (srcset == NULL || srcset->type != REDIS_SET) {
3544 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3545 return;
3546 }
3547 /* Error if the destination key is not a set as well */
3548 if (dstset && dstset->type != REDIS_SET) {
3549 addReply(c,shared.wrongtypeerr);
3550 return;
3551 }
3552 /* Remove the element from the source set */
3553 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3554 /* Key not found in the src set! return zero */
3555 addReply(c,shared.czero);
3556 return;
3557 }
3558 server.dirty++;
3559 /* Add the element to the destination set */
3560 if (!dstset) {
3561 dstset = createSetObject();
3562 dictAdd(c->db->dict,c->argv[2],dstset);
3563 incrRefCount(c->argv[2]);
3564 }
3565 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3566 incrRefCount(c->argv[3]);
3567 addReply(c,shared.cone);
3568}
3569
ed9b544e 3570static void sismemberCommand(redisClient *c) {
3305306f 3571 robj *set;
ed9b544e 3572
3305306f 3573 set = lookupKeyRead(c->db,c->argv[1]);
3574 if (set == NULL) {
c937aa89 3575 addReply(c,shared.czero);
ed9b544e 3576 } else {
ed9b544e 3577 if (set->type != REDIS_SET) {
c937aa89 3578 addReply(c,shared.wrongtypeerr);
ed9b544e 3579 return;
3580 }
3581 if (dictFind(set->ptr,c->argv[2]))
c937aa89 3582 addReply(c,shared.cone);
ed9b544e 3583 else
c937aa89 3584 addReply(c,shared.czero);
ed9b544e 3585 }
3586}
3587
3588static void scardCommand(redisClient *c) {
3305306f 3589 robj *o;
ed9b544e 3590 dict *s;
3591
3305306f 3592 o = lookupKeyRead(c->db,c->argv[1]);
3593 if (o == NULL) {
c937aa89 3594 addReply(c,shared.czero);
ed9b544e 3595 return;
3596 } else {
ed9b544e 3597 if (o->type != REDIS_SET) {
c937aa89 3598 addReply(c,shared.wrongtypeerr);
ed9b544e 3599 } else {
3600 s = o->ptr;
c937aa89 3601 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3305306f 3602 dictSize(s)));
ed9b544e 3603 }
3604 }
3605}
3606
12fea928 3607static void spopCommand(redisClient *c) {
3608 robj *set;
3609 dictEntry *de;
3610
3611 set = lookupKeyWrite(c->db,c->argv[1]);
3612 if (set == NULL) {
3613 addReply(c,shared.nullbulk);
3614 } else {
3615 if (set->type != REDIS_SET) {
3616 addReply(c,shared.wrongtypeerr);
3617 return;
3618 }
3619 de = dictGetRandomKey(set->ptr);
3620 if (de == NULL) {
3621 addReply(c,shared.nullbulk);
3622 } else {
3623 robj *ele = dictGetEntryKey(de);
3624
942a3961 3625 addReplyBulkLen(c,ele);
12fea928 3626 addReply(c,ele);
3627 addReply(c,shared.crlf);
3628 dictDelete(set->ptr,ele);
3629 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3630 server.dirty++;
3631 }
3632 }
3633}
3634
2abb95a9 3635static void srandmemberCommand(redisClient *c) {
3636 robj *set;
3637 dictEntry *de;
3638
3639 set = lookupKeyRead(c->db,c->argv[1]);
3640 if (set == NULL) {
3641 addReply(c,shared.nullbulk);
3642 } else {
3643 if (set->type != REDIS_SET) {
3644 addReply(c,shared.wrongtypeerr);
3645 return;
3646 }
3647 de = dictGetRandomKey(set->ptr);
3648 if (de == NULL) {
3649 addReply(c,shared.nullbulk);
3650 } else {
3651 robj *ele = dictGetEntryKey(de);
3652
3653 addReplyBulkLen(c,ele);
3654 addReply(c,ele);
3655 addReply(c,shared.crlf);
3656 }
3657 }
3658}
3659
ed9b544e 3660static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3661 dict **d1 = (void*) s1, **d2 = (void*) s2;
3662
3305306f 3663 return dictSize(*d1)-dictSize(*d2);
ed9b544e 3664}
3665
3666static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3667 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3668 dictIterator *di;
3669 dictEntry *de;
3670 robj *lenobj = NULL, *dstset = NULL;
3671 int j, cardinality = 0;
3672
ed9b544e 3673 for (j = 0; j < setsnum; j++) {
3674 robj *setobj;
3305306f 3675
3676 setobj = dstkey ?
3677 lookupKeyWrite(c->db,setskeys[j]) :
3678 lookupKeyRead(c->db,setskeys[j]);
3679 if (!setobj) {
ed9b544e 3680 zfree(dv);
5faa6025 3681 if (dstkey) {
3682 deleteKey(c->db,dstkey);
3683 addReply(c,shared.ok);
3684 } else {
3685 addReply(c,shared.nullmultibulk);
3686 }
ed9b544e 3687 return;
3688 }
ed9b544e 3689 if (setobj->type != REDIS_SET) {
3690 zfree(dv);
c937aa89 3691 addReply(c,shared.wrongtypeerr);
ed9b544e 3692 return;
3693 }
3694 dv[j] = setobj->ptr;
3695 }
3696 /* Sort sets from the smallest to largest, this will improve our
3697 * algorithm's performace */
3698 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3699
3700 /* The first thing we should output is the total number of elements...
3701 * since this is a multi-bulk write, but at this stage we don't know
3702 * the intersection set size, so we use a trick, append an empty object
3703 * to the output list and save the pointer to later modify it with the
3704 * right length */
3705 if (!dstkey) {
3706 lenobj = createObject(REDIS_STRING,NULL);
3707 addReply(c,lenobj);
3708 decrRefCount(lenobj);
3709 } else {
3710 /* If we have a target key where to store the resulting set
3711 * create this key with an empty set inside */
3712 dstset = createSetObject();
ed9b544e 3713 }
3714
3715 /* Iterate all the elements of the first (smallest) set, and test
3716 * the element against all the other sets, if at least one set does
3717 * not include the element it is discarded */
3718 di = dictGetIterator(dv[0]);
ed9b544e 3719
3720 while((de = dictNext(di)) != NULL) {
3721 robj *ele;
3722
3723 for (j = 1; j < setsnum; j++)
3724 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3725 if (j != setsnum)
3726 continue; /* at least one set does not contain the member */
3727 ele = dictGetEntryKey(de);
3728 if (!dstkey) {
942a3961 3729 addReplyBulkLen(c,ele);
ed9b544e 3730 addReply(c,ele);
3731 addReply(c,shared.crlf);
3732 cardinality++;
3733 } else {
3734 dictAdd(dstset->ptr,ele,NULL);
3735 incrRefCount(ele);
3736 }
3737 }
3738 dictReleaseIterator(di);
3739
83cdfe18
AG
3740 if (dstkey) {
3741 /* Store the resulting set into the target */
3742 deleteKey(c->db,dstkey);
3743 dictAdd(c->db->dict,dstkey,dstset);
3744 incrRefCount(dstkey);
3745 }
3746
40d224a9 3747 if (!dstkey) {
c937aa89 3748 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
40d224a9 3749 } else {
03fd01c7 3750 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3751 dictSize((dict*)dstset->ptr)));
40d224a9 3752 server.dirty++;
3753 }
ed9b544e 3754 zfree(dv);
3755}
3756
3757static void sinterCommand(redisClient *c) {
3758 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3759}
3760
3761static void sinterstoreCommand(redisClient *c) {
3762 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3763}
3764
f4f56e1d 3765#define REDIS_OP_UNION 0
3766#define REDIS_OP_DIFF 1
3767
3768static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
40d224a9 3769 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3770 dictIterator *di;
3771 dictEntry *de;
f4f56e1d 3772 robj *dstset = NULL;
40d224a9 3773 int j, cardinality = 0;
3774
40d224a9 3775 for (j = 0; j < setsnum; j++) {
3776 robj *setobj;
3777
3778 setobj = dstkey ?
3779 lookupKeyWrite(c->db,setskeys[j]) :
3780 lookupKeyRead(c->db,setskeys[j]);
3781 if (!setobj) {
3782 dv[j] = NULL;
3783 continue;
3784 }
3785 if (setobj->type != REDIS_SET) {
3786 zfree(dv);
3787 addReply(c,shared.wrongtypeerr);
3788 return;
3789 }
3790 dv[j] = setobj->ptr;
3791 }
3792
3793 /* We need a temp set object to store our union. If the dstkey
3794 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3795 * this set object will be the resulting object to set into the target key*/
3796 dstset = createSetObject();
3797
40d224a9 3798 /* Iterate all the elements of all the sets, add every element a single
3799 * time to the result set */
3800 for (j = 0; j < setsnum; j++) {
51829ed3 3801 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
40d224a9 3802 if (!dv[j]) continue; /* non existing keys are like empty sets */
3803
3804 di = dictGetIterator(dv[j]);
40d224a9 3805
3806 while((de = dictNext(di)) != NULL) {
3807 robj *ele;
3808
3809 /* dictAdd will not add the same element multiple times */
3810 ele = dictGetEntryKey(de);
f4f56e1d 3811 if (op == REDIS_OP_UNION || j == 0) {
3812 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3813 incrRefCount(ele);
40d224a9 3814 cardinality++;
3815 }
f4f56e1d 3816 } else if (op == REDIS_OP_DIFF) {
3817 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3818 cardinality--;
3819 }
40d224a9 3820 }
3821 }
3822 dictReleaseIterator(di);
51829ed3
AG
3823
3824 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
40d224a9 3825 }
3826
f4f56e1d 3827 /* Output the content of the resulting set, if not in STORE mode */
3828 if (!dstkey) {
3829 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3830 di = dictGetIterator(dstset->ptr);
f4f56e1d 3831 while((de = dictNext(di)) != NULL) {
3832 robj *ele;
3833
3834 ele = dictGetEntryKey(de);
942a3961 3835 addReplyBulkLen(c,ele);
f4f56e1d 3836 addReply(c,ele);
3837 addReply(c,shared.crlf);
3838 }
3839 dictReleaseIterator(di);
83cdfe18
AG
3840 } else {
3841 /* If we have a target key where to store the resulting set
3842 * create this key with the result set inside */
3843 deleteKey(c->db,dstkey);
3844 dictAdd(c->db->dict,dstkey,dstset);
3845 incrRefCount(dstkey);
f4f56e1d 3846 }
3847
3848 /* Cleanup */
40d224a9 3849 if (!dstkey) {
40d224a9 3850 decrRefCount(dstset);
3851 } else {
03fd01c7 3852 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3853 dictSize((dict*)dstset->ptr)));
40d224a9 3854 server.dirty++;
3855 }
3856 zfree(dv);
3857}
3858
3859static void sunionCommand(redisClient *c) {
f4f56e1d 3860 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
40d224a9 3861}
3862
3863static void sunionstoreCommand(redisClient *c) {
f4f56e1d 3864 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3865}
3866
3867static void sdiffCommand(redisClient *c) {
3868 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3869}
3870
3871static void sdiffstoreCommand(redisClient *c) {
3872 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
40d224a9 3873}
3874
6b47e12e 3875/* ==================================== ZSets =============================== */
3876
3877/* ZSETs are ordered sets using two data structures to hold the same elements
3878 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3879 * data structure.
3880 *
3881 * The elements are added to an hash table mapping Redis objects to scores.
3882 * At the same time the elements are added to a skip list mapping scores
3883 * to Redis objects (so objects are sorted by scores in this "view"). */
3884
3885/* This skiplist implementation is almost a C translation of the original
3886 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3887 * Alternative to Balanced Trees", modified in three ways:
3888 * a) this implementation allows for repeated values.
3889 * b) the comparison is not just by key (our 'score') but by satellite data.
3890 * c) there is a back pointer, so it's a doubly linked list with the back
3891 * pointers being only at "level 1". This allows to traverse the list
3892 * from tail to head, useful for ZREVRANGE. */
3893
3894static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3895 zskiplistNode *zn = zmalloc(sizeof(*zn));
3896
3897 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3898 zn->score = score;
3899 zn->obj = obj;
3900 return zn;
3901}
3902
3903static zskiplist *zslCreate(void) {
3904 int j;
3905 zskiplist *zsl;
3906
3907 zsl = zmalloc(sizeof(*zsl));
3908 zsl->level = 1;
cc812361 3909 zsl->length = 0;
6b47e12e 3910 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3911 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3912 zsl->header->forward[j] = NULL;
e3870fab 3913 zsl->header->backward = NULL;
3914 zsl->tail = NULL;
6b47e12e 3915 return zsl;
3916}
3917
fd8ccf44 3918static void zslFreeNode(zskiplistNode *node) {
3919 decrRefCount(node->obj);
ad807e6f 3920 zfree(node->forward);
fd8ccf44 3921 zfree(node);
3922}
3923
3924static void zslFree(zskiplist *zsl) {
ad807e6f 3925 zskiplistNode *node = zsl->header->forward[0], *next;
fd8ccf44 3926
ad807e6f 3927 zfree(zsl->header->forward);
3928 zfree(zsl->header);
fd8ccf44 3929 while(node) {
599379dd 3930 next = node->forward[0];
fd8ccf44 3931 zslFreeNode(node);
3932 node = next;
3933 }
ad807e6f 3934 zfree(zsl);
fd8ccf44 3935}
3936
6b47e12e 3937static int zslRandomLevel(void) {
3938 int level = 1;
3939 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3940 level += 1;
3941 return level;
3942}
3943
3944static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3945 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3946 int i, level;
3947
3948 x = zsl->header;
3949 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 3950 while (x->forward[i] &&
3951 (x->forward[i]->score < score ||
3952 (x->forward[i]->score == score &&
3953 compareStringObjects(x->forward[i]->obj,obj) < 0)))
6b47e12e 3954 x = x->forward[i];
3955 update[i] = x;
3956 }
6b47e12e 3957 /* we assume the key is not already inside, since we allow duplicated
3958 * scores, and the re-insertion of score and redis object should never
3959 * happpen since the caller of zslInsert() should test in the hash table
3960 * if the element is already inside or not. */
3961 level = zslRandomLevel();
3962 if (level > zsl->level) {
3963 for (i = zsl->level; i < level; i++)
3964 update[i] = zsl->header;
3965 zsl->level = level;
3966 }
3967 x = zslCreateNode(level,score,obj);
3968 for (i = 0; i < level; i++) {
3969 x->forward[i] = update[i]->forward[i];
3970 update[i]->forward[i] = x;
3971 }
bb975144 3972 x->backward = (update[0] == zsl->header) ? NULL : update[0];
e3870fab 3973 if (x->forward[0])
3974 x->forward[0]->backward = x;
3975 else
3976 zsl->tail = x;
cc812361 3977 zsl->length++;
6b47e12e 3978}
3979
50c55df5 3980/* Delete an element with matching score/object from the skiplist. */
fd8ccf44 3981static int zslDelete(zskiplist *zsl, double score, robj *obj) {
e197b441 3982 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3983 int i;
3984
3985 x = zsl->header;
3986 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 3987 while (x->forward[i] &&
3988 (x->forward[i]->score < score ||
3989 (x->forward[i]->score == score &&
3990 compareStringObjects(x->forward[i]->obj,obj) < 0)))
e197b441 3991 x = x->forward[i];
3992 update[i] = x;
3993 }
3994 /* We may have multiple elements with the same score, what we need
3995 * is to find the element with both the right score and object. */
3996 x = x->forward[0];
50c55df5 3997 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
9d60e6e4 3998 for (i = 0; i < zsl->level; i++) {
3999 if (update[i]->forward[i] != x) break;
4000 update[i]->forward[i] = x->forward[i];
4001 }
4002 if (x->forward[0]) {
4003 x->forward[0]->backward = (x->backward == zsl->header) ?
4004 NULL : x->backward;
e197b441 4005 } else {
9d60e6e4 4006 zsl->tail = x->backward;
e197b441 4007 }
9d60e6e4 4008 zslFreeNode(x);
4009 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4010 zsl->level--;
4011 zsl->length--;
4012 return 1;
4013 } else {
4014 return 0; /* not found */
e197b441 4015 }
4016 return 0; /* not found */
fd8ccf44 4017}
4018
1807985b 4019/* Delete all the elements with score between min and max from the skiplist.
4020 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4021 * Note that this function takes the reference to the hash table view of the
4022 * sorted set, in order to remove the elements from the hash table too. */
4023static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4024 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4025 unsigned long removed = 0;
4026 int i;
4027
4028 x = zsl->header;
4029 for (i = zsl->level-1; i >= 0; i--) {
4030 while (x->forward[i] && x->forward[i]->score < min)
4031 x = x->forward[i];
4032 update[i] = x;
4033 }
4034 /* We may have multiple elements with the same score, what we need
4035 * is to find the element with both the right score and object. */
4036 x = x->forward[0];
4037 while (x && x->score <= max) {
4038 zskiplistNode *next;
4039
4040 for (i = 0; i < zsl->level; i++) {
4041 if (update[i]->forward[i] != x) break;
4042 update[i]->forward[i] = x->forward[i];
4043 }
4044 if (x->forward[0]) {
4045 x->forward[0]->backward = (x->backward == zsl->header) ?
4046 NULL : x->backward;
4047 } else {
4048 zsl->tail = x->backward;
4049 }
4050 next = x->forward[0];
4051 dictDelete(dict,x->obj);
4052 zslFreeNode(x);
4053 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4054 zsl->level--;
4055 zsl->length--;
4056 removed++;
4057 x = next;
4058 }
4059 return removed; /* not found */
4060}
4061
50c55df5 4062/* Find the first node having a score equal or greater than the specified one.
4063 * Returns NULL if there is no match. */
4064static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4065 zskiplistNode *x;
4066 int i;
4067
4068 x = zsl->header;
4069 for (i = zsl->level-1; i >= 0; i--) {
4070 while (x->forward[i] && x->forward[i]->score < score)
4071 x = x->forward[i];
4072 }
4073 /* We may have multiple elements with the same score, what we need
4074 * is to find the element with both the right score and object. */
4075 return x->forward[0];
4076}
4077
fd8ccf44 4078/* The actual Z-commands implementations */
4079
4080static void zaddCommand(redisClient *c) {
4081 robj *zsetobj;
4082 zset *zs;
4083 double *score;
4084
4085 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4086 if (zsetobj == NULL) {
4087 zsetobj = createZsetObject();
4088 dictAdd(c->db->dict,c->argv[1],zsetobj);
4089 incrRefCount(c->argv[1]);
4090 } else {
4091 if (zsetobj->type != REDIS_ZSET) {
4092 addReply(c,shared.wrongtypeerr);
4093 return;
4094 }
4095 }
4096 score = zmalloc(sizeof(double));
4097 *score = strtod(c->argv[2]->ptr,NULL);
4098 zs = zsetobj->ptr;
4099 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4100 /* case 1: New element */
4101 incrRefCount(c->argv[3]); /* added to hash */
4102 zslInsert(zs->zsl,*score,c->argv[3]);
4103 incrRefCount(c->argv[3]); /* added to skiplist */
4104 server.dirty++;
4105 addReply(c,shared.cone);
4106 } else {
4107 dictEntry *de;
4108 double *oldscore;
4109
4110 /* case 2: Score update operation */
4111 de = dictFind(zs->dict,c->argv[3]);
4112 assert(de != NULL);
4113 oldscore = dictGetEntryVal(de);
4114 if (*score != *oldscore) {
4115 int deleted;
4116
e197b441 4117 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
fd8ccf44 4118 assert(deleted != 0);
4119 zslInsert(zs->zsl,*score,c->argv[3]);
4120 incrRefCount(c->argv[3]);
2161a965 4121 dictReplace(zs->dict,c->argv[3],score);
fd8ccf44 4122 server.dirty++;
2161a965 4123 } else {
4124 zfree(score);
fd8ccf44 4125 }
4126 addReply(c,shared.czero);
4127 }
4128}
4129
1b7106e7 4130static void zremCommand(redisClient *c) {
4131 robj *zsetobj;
4132 zset *zs;
4133
4134 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4135 if (zsetobj == NULL) {
4136 addReply(c,shared.czero);
4137 } else {
4138 dictEntry *de;
4139 double *oldscore;
4140 int deleted;
4141
4142 if (zsetobj->type != REDIS_ZSET) {
4143 addReply(c,shared.wrongtypeerr);
4144 return;
4145 }
4146 zs = zsetobj->ptr;
4147 de = dictFind(zs->dict,c->argv[2]);
4148 if (de == NULL) {
4149 addReply(c,shared.czero);
4150 return;
4151 }
4152 /* Delete from the skiplist */
4153 oldscore = dictGetEntryVal(de);
4154 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4155 assert(deleted != 0);
4156
4157 /* Delete from the hash table */
4158 dictDelete(zs->dict,c->argv[2]);
4159 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4160 server.dirty++;
4161 addReply(c,shared.cone);
4162 }
4163}
4164
1807985b 4165static void zremrangebyscoreCommand(redisClient *c) {
4166 double min = strtod(c->argv[2]->ptr,NULL);
4167 double max = strtod(c->argv[3]->ptr,NULL);
4168 robj *zsetobj;
4169 zset *zs;
4170
4171 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4172 if (zsetobj == NULL) {
4173 addReply(c,shared.czero);
4174 } else {
4175 long deleted;
4176
4177 if (zsetobj->type != REDIS_ZSET) {
4178 addReply(c,shared.wrongtypeerr);
4179 return;
4180 }
4181 zs = zsetobj->ptr;
4182 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4183 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4184 server.dirty += deleted;
4185 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4186 }
4187}
4188
e3870fab 4189static void zrangeGenericCommand(redisClient *c, int reverse) {
cc812361 4190 robj *o;
4191 int start = atoi(c->argv[2]->ptr);
4192 int end = atoi(c->argv[3]->ptr);
4193
4194 o = lookupKeyRead(c->db,c->argv[1]);
4195 if (o == NULL) {
4196 addReply(c,shared.nullmultibulk);
4197 } else {
4198 if (o->type != REDIS_ZSET) {
4199 addReply(c,shared.wrongtypeerr);
4200 } else {
4201 zset *zsetobj = o->ptr;
4202 zskiplist *zsl = zsetobj->zsl;
4203 zskiplistNode *ln;
4204
4205 int llen = zsl->length;
4206 int rangelen, j;
4207 robj *ele;
4208
4209 /* convert negative indexes */
4210 if (start < 0) start = llen+start;
4211 if (end < 0) end = llen+end;
4212 if (start < 0) start = 0;
4213 if (end < 0) end = 0;
4214
4215 /* indexes sanity checks */
4216 if (start > end || start >= llen) {
4217 /* Out of range start or start > end result in empty list */
4218 addReply(c,shared.emptymultibulk);
4219 return;
4220 }
4221 if (end >= llen) end = llen-1;
4222 rangelen = (end-start)+1;
4223
4224 /* Return the result in form of a multi-bulk reply */
e3870fab 4225 if (reverse) {
4226 ln = zsl->tail;
4227 while (start--)
4228 ln = ln->backward;
4229 } else {
4230 ln = zsl->header->forward[0];
4231 while (start--)
4232 ln = ln->forward[0];
4233 }
cc812361 4234
4235 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4236 for (j = 0; j < rangelen; j++) {
0aad7a19 4237 ele = ln->obj;
cc812361 4238 addReplyBulkLen(c,ele);
4239 addReply(c,ele);
4240 addReply(c,shared.crlf);
e3870fab 4241 ln = reverse ? ln->backward : ln->forward[0];
cc812361 4242 }
4243 }
4244 }
4245}
4246
e3870fab 4247static void zrangeCommand(redisClient *c) {
4248 zrangeGenericCommand(c,0);
4249}
4250
4251static void zrevrangeCommand(redisClient *c) {
4252 zrangeGenericCommand(c,1);
4253}
4254
50c55df5 4255static void zrangebyscoreCommand(redisClient *c) {
4256 robj *o;
4257 double min = strtod(c->argv[2]->ptr,NULL);
4258 double max = strtod(c->argv[3]->ptr,NULL);
4259
4260 o = lookupKeyRead(c->db,c->argv[1]);
4261 if (o == NULL) {
4262 addReply(c,shared.nullmultibulk);
4263 } else {
4264 if (o->type != REDIS_ZSET) {
4265 addReply(c,shared.wrongtypeerr);
4266 } else {
4267 zset *zsetobj = o->ptr;
4268 zskiplist *zsl = zsetobj->zsl;
4269 zskiplistNode *ln;
4270 robj *ele, *lenobj;
4271 unsigned int rangelen = 0;
4272
4273 /* Get the first node with the score >= min */
4274 ln = zslFirstWithScore(zsl,min);
4275 if (ln == NULL) {
4276 /* No element matching the speciifed interval */
4277 addReply(c,shared.emptymultibulk);
4278 return;
4279 }
4280
4281 /* We don't know in advance how many matching elements there
4282 * are in the list, so we push this object that will represent
4283 * the multi-bulk length in the output buffer, and will "fix"
4284 * it later */
4285 lenobj = createObject(REDIS_STRING,NULL);
4286 addReply(c,lenobj);
4287
dbbc7285 4288 while(ln && ln->score <= max) {
50c55df5 4289 ele = ln->obj;
4290 addReplyBulkLen(c,ele);
4291 addReply(c,ele);
4292 addReply(c,shared.crlf);
4293 ln = ln->forward[0];
4294 rangelen++;
4295 }
4296 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4297 }
4298 }
4299}
4300
3c41331e 4301static void zcardCommand(redisClient *c) {
e197b441 4302 robj *o;
4303 zset *zs;
4304
4305 o = lookupKeyRead(c->db,c->argv[1]);
4306 if (o == NULL) {
4307 addReply(c,shared.czero);
4308 return;
4309 } else {
4310 if (o->type != REDIS_ZSET) {
4311 addReply(c,shared.wrongtypeerr);
4312 } else {
4313 zs = o->ptr;
4314 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4315 }
4316 }
4317}
4318
6e333bbe 4319static void zscoreCommand(redisClient *c) {
4320 robj *o;
4321 zset *zs;
4322
4323 o = lookupKeyRead(c->db,c->argv[1]);
4324 if (o == NULL) {
4325 addReply(c,shared.czero);
4326 return;
4327 } else {
4328 if (o->type != REDIS_ZSET) {
4329 addReply(c,shared.wrongtypeerr);
4330 } else {
4331 dictEntry *de;
4332
4333 zs = o->ptr;
4334 de = dictFind(zs->dict,c->argv[2]);
4335 if (!de) {
4336 addReply(c,shared.nullbulk);
4337 } else {
4338 char buf[128];
4339 double *score = dictGetEntryVal(de);
4340
4341 snprintf(buf,sizeof(buf),"%.16g",*score);
4342 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4343 strlen(buf),buf));
4344 }
4345 }
4346 }
4347}
4348
6b47e12e 4349/* ========================= Non type-specific commands ==================== */
4350
ed9b544e 4351static void flushdbCommand(redisClient *c) {
ca37e9cd 4352 server.dirty += dictSize(c->db->dict);
3305306f 4353 dictEmpty(c->db->dict);
4354 dictEmpty(c->db->expires);
ed9b544e 4355 addReply(c,shared.ok);
ed9b544e 4356}
4357
4358static void flushallCommand(redisClient *c) {
ca37e9cd 4359 server.dirty += emptyDb();
ed9b544e 4360 addReply(c,shared.ok);
f78fd11b 4361 rdbSave(server.dbfilename);
ca37e9cd 4362 server.dirty++;
ed9b544e 4363}
4364
56906eef 4365static redisSortOperation *createSortOperation(int type, robj *pattern) {
ed9b544e 4366 redisSortOperation *so = zmalloc(sizeof(*so));
ed9b544e 4367 so->type = type;
4368 so->pattern = pattern;
4369 return so;
4370}
4371
4372/* Return the value associated to the key with a name obtained
4373 * substituting the first occurence of '*' in 'pattern' with 'subst' */
56906eef 4374static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
ed9b544e 4375 char *p;
4376 sds spat, ssub;
4377 robj keyobj;
4378 int prefixlen, sublen, postfixlen;
ed9b544e 4379 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4380 struct {
f1017b3f 4381 long len;
4382 long free;
ed9b544e 4383 char buf[REDIS_SORTKEY_MAX+1];
4384 } keyname;
4385
942a3961 4386 if (subst->encoding == REDIS_ENCODING_RAW)
4387 incrRefCount(subst);
4388 else {
4389 subst = getDecodedObject(subst);
4390 }
4391
ed9b544e 4392 spat = pattern->ptr;
4393 ssub = subst->ptr;
4394 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4395 p = strchr(spat,'*');
4396 if (!p) return NULL;
4397
4398 prefixlen = p-spat;
4399 sublen = sdslen(ssub);
4400 postfixlen = sdslen(spat)-(prefixlen+1);
4401 memcpy(keyname.buf,spat,prefixlen);
4402 memcpy(keyname.buf+prefixlen,ssub,sublen);
4403 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4404 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4405 keyname.len = prefixlen+sublen+postfixlen;
4406
4407 keyobj.refcount = 1;
4408 keyobj.type = REDIS_STRING;
4409 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4410
942a3961 4411 decrRefCount(subst);
4412
a4d1ba9a 4413 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3305306f 4414 return lookupKeyRead(db,&keyobj);
ed9b544e 4415}
4416
4417/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4418 * the additional parameter is not standard but a BSD-specific we have to
4419 * pass sorting parameters via the global 'server' structure */
4420static int sortCompare(const void *s1, const void *s2) {
4421 const redisSortObject *so1 = s1, *so2 = s2;
4422 int cmp;
4423
4424 if (!server.sort_alpha) {
4425 /* Numeric sorting. Here it's trivial as we precomputed scores */
4426 if (so1->u.score > so2->u.score) {
4427 cmp = 1;
4428 } else if (so1->u.score < so2->u.score) {
4429 cmp = -1;
4430 } else {
4431 cmp = 0;
4432 }
4433 } else {
4434 /* Alphanumeric sorting */
4435 if (server.sort_bypattern) {
4436 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4437 /* At least one compare object is NULL */
4438 if (so1->u.cmpobj == so2->u.cmpobj)
4439 cmp = 0;
4440 else if (so1->u.cmpobj == NULL)
4441 cmp = -1;
4442 else
4443 cmp = 1;
4444 } else {
4445 /* We have both the objects, use strcoll */
4446 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4447 }
4448 } else {
4449 /* Compare elements directly */
942a3961 4450 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4451 so2->obj->encoding == REDIS_ENCODING_RAW) {
4452 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4453 } else {
4454 robj *dec1, *dec2;
4455
4456 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4457 so1->obj : getDecodedObject(so1->obj);
4458 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4459 so2->obj : getDecodedObject(so2->obj);
4460 cmp = strcoll(dec1->ptr,dec2->ptr);
4461 if (dec1 != so1->obj) decrRefCount(dec1);
4462 if (dec2 != so2->obj) decrRefCount(dec2);
4463 }
ed9b544e 4464 }
4465 }
4466 return server.sort_desc ? -cmp : cmp;
4467}
4468
4469/* The SORT command is the most complex command in Redis. Warning: this code
4470 * is optimized for speed and a bit less for readability */
4471static void sortCommand(redisClient *c) {
ed9b544e 4472 list *operations;
4473 int outputlen = 0;
4474 int desc = 0, alpha = 0;
4475 int limit_start = 0, limit_count = -1, start, end;
4476 int j, dontsort = 0, vectorlen;
4477 int getop = 0; /* GET operation counter */
4478 robj *sortval, *sortby = NULL;
4479 redisSortObject *vector; /* Resulting vector to sort */
4480
4481 /* Lookup the key to sort. It must be of the right types */
3305306f 4482 sortval = lookupKeyRead(c->db,c->argv[1]);
4483 if (sortval == NULL) {
c937aa89 4484 addReply(c,shared.nokeyerr);
ed9b544e 4485 return;
4486 }
ed9b544e 4487 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
c937aa89 4488 addReply(c,shared.wrongtypeerr);
ed9b544e 4489 return;
4490 }
4491
4492 /* Create a list of operations to perform for every sorted element.
4493 * Operations can be GET/DEL/INCR/DECR */
4494 operations = listCreate();
092dac2a 4495 listSetFreeMethod(operations,zfree);
ed9b544e 4496 j = 2;
4497
4498 /* Now we need to protect sortval incrementing its count, in the future
4499 * SORT may have options able to overwrite/delete keys during the sorting
4500 * and the sorted key itself may get destroied */
4501 incrRefCount(sortval);
4502
4503 /* The SORT command has an SQL-alike syntax, parse it */
4504 while(j < c->argc) {
4505 int leftargs = c->argc-j-1;
4506 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4507 desc = 0;
4508 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4509 desc = 1;
4510 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4511 alpha = 1;
4512 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4513 limit_start = atoi(c->argv[j+1]->ptr);
4514 limit_count = atoi(c->argv[j+2]->ptr);
4515 j+=2;
4516 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4517 sortby = c->argv[j+1];
4518 /* If the BY pattern does not contain '*', i.e. it is constant,
4519 * we don't need to sort nor to lookup the weight keys. */
4520 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4521 j++;
4522 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4523 listAddNodeTail(operations,createSortOperation(
4524 REDIS_SORT_GET,c->argv[j+1]));
4525 getop++;
4526 j++;
4527 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4528 listAddNodeTail(operations,createSortOperation(
4529 REDIS_SORT_DEL,c->argv[j+1]));
4530 j++;
4531 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4532 listAddNodeTail(operations,createSortOperation(
4533 REDIS_SORT_INCR,c->argv[j+1]));
4534 j++;
4535 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4536 listAddNodeTail(operations,createSortOperation(
4537 REDIS_SORT_DECR,c->argv[j+1]));
4538 j++;
4539 } else {
4540 decrRefCount(sortval);
4541 listRelease(operations);
c937aa89 4542 addReply(c,shared.syntaxerr);
ed9b544e 4543 return;
4544 }
4545 j++;
4546 }
4547
4548 /* Load the sorting vector with all the objects to sort */
4549 vectorlen = (sortval->type == REDIS_LIST) ?
4550 listLength((list*)sortval->ptr) :
3305306f 4551 dictSize((dict*)sortval->ptr);
ed9b544e 4552 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
ed9b544e 4553 j = 0;
4554 if (sortval->type == REDIS_LIST) {
4555 list *list = sortval->ptr;
6208b3a7 4556 listNode *ln;
4557
4558 listRewind(list);
4559 while((ln = listYield(list))) {
ed9b544e 4560 robj *ele = ln->value;
4561 vector[j].obj = ele;
4562 vector[j].u.score = 0;
4563 vector[j].u.cmpobj = NULL;
ed9b544e 4564 j++;
4565 }
4566 } else {
4567 dict *set = sortval->ptr;
4568 dictIterator *di;
4569 dictEntry *setele;
4570
4571 di = dictGetIterator(set);
ed9b544e 4572 while((setele = dictNext(di)) != NULL) {
4573 vector[j].obj = dictGetEntryKey(setele);
4574 vector[j].u.score = 0;
4575 vector[j].u.cmpobj = NULL;
4576 j++;
4577 }
4578 dictReleaseIterator(di);
4579 }
4580 assert(j == vectorlen);
4581
4582 /* Now it's time to load the right scores in the sorting vector */
4583 if (dontsort == 0) {
4584 for (j = 0; j < vectorlen; j++) {
4585 if (sortby) {
4586 robj *byval;
4587
3305306f 4588 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
ed9b544e 4589 if (!byval || byval->type != REDIS_STRING) continue;
4590 if (alpha) {
942a3961 4591 if (byval->encoding == REDIS_ENCODING_RAW) {
4592 vector[j].u.cmpobj = byval;
4593 incrRefCount(byval);
4594 } else {
4595 vector[j].u.cmpobj = getDecodedObject(byval);
4596 }
ed9b544e 4597 } else {
942a3961 4598 if (byval->encoding == REDIS_ENCODING_RAW) {
4599 vector[j].u.score = strtod(byval->ptr,NULL);
4600 } else {
f1017b3f 4601 if (byval->encoding == REDIS_ENCODING_INT) {
942a3961 4602 vector[j].u.score = (long)byval->ptr;
f1017b3f 4603 } else
942a3961 4604 assert(1 != 1);
4605 }
ed9b544e 4606 }
4607 } else {
942a3961 4608 if (!alpha) {
4609 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4610 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4611 else {
4612 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4613 vector[j].u.score = (long) vector[j].obj->ptr;
4614 else
4615 assert(1 != 1);
4616 }
4617 }
ed9b544e 4618 }
4619 }
4620 }
4621
4622 /* We are ready to sort the vector... perform a bit of sanity check
4623 * on the LIMIT option too. We'll use a partial version of quicksort. */
4624 start = (limit_start < 0) ? 0 : limit_start;
4625 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4626 if (start >= vectorlen) {
4627 start = vectorlen-1;
4628 end = vectorlen-2;
4629 }
4630 if (end >= vectorlen) end = vectorlen-1;
4631
4632 if (dontsort == 0) {
4633 server.sort_desc = desc;
4634 server.sort_alpha = alpha;
4635 server.sort_bypattern = sortby ? 1 : 0;
5f5b9840 4636 if (sortby && (start != 0 || end != vectorlen-1))
4637 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4638 else
4639 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
ed9b544e 4640 }
4641
4642 /* Send command output to the output buffer, performing the specified
4643 * GET/DEL/INCR/DECR operations if any. */
4644 outputlen = getop ? getop*(end-start+1) : end-start+1;
c937aa89 4645 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
ed9b544e 4646 for (j = start; j <= end; j++) {
6208b3a7 4647 listNode *ln;
ed9b544e 4648 if (!getop) {
942a3961 4649 addReplyBulkLen(c,vector[j].obj);
ed9b544e 4650 addReply(c,vector[j].obj);
4651 addReply(c,shared.crlf);
4652 }
6208b3a7 4653 listRewind(operations);
4654 while((ln = listYield(operations))) {
ed9b544e 4655 redisSortOperation *sop = ln->value;
3305306f 4656 robj *val = lookupKeyByPattern(c->db,sop->pattern,
ed9b544e 4657 vector[j].obj);
4658
4659 if (sop->type == REDIS_SORT_GET) {
4660 if (!val || val->type != REDIS_STRING) {
9eb00f21 4661 addReply(c,shared.nullbulk);
ed9b544e 4662 } else {
942a3961 4663 addReplyBulkLen(c,val);
ed9b544e 4664 addReply(c,val);
4665 addReply(c,shared.crlf);
4666 }
4667 } else if (sop->type == REDIS_SORT_DEL) {
4668 /* TODO */
4669 }
ed9b544e 4670 }
4671 }
4672
4673 /* Cleanup */
4674 decrRefCount(sortval);
4675 listRelease(operations);
4676 for (j = 0; j < vectorlen; j++) {
4677 if (sortby && alpha && vector[j].u.cmpobj)
4678 decrRefCount(vector[j].u.cmpobj);
4679 }
4680 zfree(vector);
4681}
4682
4683static void infoCommand(redisClient *c) {
4684 sds info;
4685 time_t uptime = time(NULL)-server.stat_starttime;
c3cb078d 4686 int j;
ed9b544e 4687
4688 info = sdscatprintf(sdsempty(),
4689 "redis_version:%s\r\n"
f1017b3f 4690 "arch_bits:%s\r\n"
a0f643ea 4691 "uptime_in_seconds:%d\r\n"
4692 "uptime_in_days:%d\r\n"
ed9b544e 4693 "connected_clients:%d\r\n"
4694 "connected_slaves:%d\r\n"
5fba9f71 4695 "used_memory:%zu\r\n"
ed9b544e 4696 "changes_since_last_save:%lld\r\n"
be2bb6b0 4697 "bgsave_in_progress:%d\r\n"
ed9b544e 4698 "last_save_time:%d\r\n"
4699 "total_connections_received:%lld\r\n"
4700 "total_commands_processed:%lld\r\n"
a0f643ea 4701 "role:%s\r\n"
ed9b544e 4702 ,REDIS_VERSION,
f1017b3f 4703 (sizeof(long) == 8) ? "64" : "32",
a0f643ea 4704 uptime,
4705 uptime/(3600*24),
ed9b544e 4706 listLength(server.clients)-listLength(server.slaves),
4707 listLength(server.slaves),
4708 server.usedmemory,
4709 server.dirty,
be2bb6b0 4710 server.bgsaveinprogress,
ed9b544e 4711 server.lastsave,
4712 server.stat_numconnections,
4713 server.stat_numcommands,
a0f643ea 4714 server.masterhost == NULL ? "master" : "slave"
ed9b544e 4715 );
a0f643ea 4716 if (server.masterhost) {
4717 info = sdscatprintf(info,
4718 "master_host:%s\r\n"
4719 "master_port:%d\r\n"
4720 "master_link_status:%s\r\n"
4721 "master_last_io_seconds_ago:%d\r\n"
4722 ,server.masterhost,
4723 server.masterport,
4724 (server.replstate == REDIS_REPL_CONNECTED) ?
4725 "up" : "down",
f72b934d 4726 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
a0f643ea 4727 );
4728 }
c3cb078d 4729 for (j = 0; j < server.dbnum; j++) {
4730 long long keys, vkeys;
4731
4732 keys = dictSize(server.db[j].dict);
4733 vkeys = dictSize(server.db[j].expires);
4734 if (keys || vkeys) {
4735 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4736 j, keys, vkeys);
4737 }
4738 }
c937aa89 4739 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
ed9b544e 4740 addReplySds(c,info);
70003d28 4741 addReply(c,shared.crlf);
ed9b544e 4742}
4743
3305306f 4744static void monitorCommand(redisClient *c) {
4745 /* ignore MONITOR if aleady slave or in monitor mode */
4746 if (c->flags & REDIS_SLAVE) return;
4747
4748 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4749 c->slaveseldb = 0;
6b47e12e 4750 listAddNodeTail(server.monitors,c);
3305306f 4751 addReply(c,shared.ok);
4752}
4753
4754/* ================================= Expire ================================= */
4755static int removeExpire(redisDb *db, robj *key) {
4756 if (dictDelete(db->expires,key) == DICT_OK) {
4757 return 1;
4758 } else {
4759 return 0;
4760 }
4761}
4762
4763static int setExpire(redisDb *db, robj *key, time_t when) {
4764 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4765 return 0;
4766 } else {
4767 incrRefCount(key);
4768 return 1;
4769 }
4770}
4771
bb32ede5 4772/* Return the expire time of the specified key, or -1 if no expire
4773 * is associated with this key (i.e. the key is non volatile) */
4774static time_t getExpire(redisDb *db, robj *key) {
4775 dictEntry *de;
4776
4777 /* No expire? return ASAP */
4778 if (dictSize(db->expires) == 0 ||
4779 (de = dictFind(db->expires,key)) == NULL) return -1;
4780
4781 return (time_t) dictGetEntryVal(de);
4782}
4783
3305306f 4784static int expireIfNeeded(redisDb *db, robj *key) {
4785 time_t when;
4786 dictEntry *de;
4787
4788 /* No expire? return ASAP */
4789 if (dictSize(db->expires) == 0 ||
4790 (de = dictFind(db->expires,key)) == NULL) return 0;
4791
4792 /* Lookup the expire */
4793 when = (time_t) dictGetEntryVal(de);
4794 if (time(NULL) <= when) return 0;
4795
4796 /* Delete the key */
4797 dictDelete(db->expires,key);
4798 return dictDelete(db->dict,key) == DICT_OK;
4799}
4800
4801static int deleteIfVolatile(redisDb *db, robj *key) {
4802 dictEntry *de;
4803
4804 /* No expire? return ASAP */
4805 if (dictSize(db->expires) == 0 ||
4806 (de = dictFind(db->expires,key)) == NULL) return 0;
4807
4808 /* Delete the key */
0c66a471 4809 server.dirty++;
3305306f 4810 dictDelete(db->expires,key);
4811 return dictDelete(db->dict,key) == DICT_OK;
4812}
4813
802e8373 4814static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
3305306f 4815 dictEntry *de;
3305306f 4816
802e8373 4817 de = dictFind(c->db->dict,key);
3305306f 4818 if (de == NULL) {
4819 addReply(c,shared.czero);
4820 return;
4821 }
43e5ccdf 4822 if (seconds < 0) {
4823 if (deleteKey(c->db,key)) server.dirty++;
4824 addReply(c, shared.cone);
3305306f 4825 return;
4826 } else {
4827 time_t when = time(NULL)+seconds;
802e8373 4828 if (setExpire(c->db,key,when)) {
3305306f 4829 addReply(c,shared.cone);
77423026 4830 server.dirty++;
4831 } else {
3305306f 4832 addReply(c,shared.czero);
77423026 4833 }
3305306f 4834 return;
4835 }
4836}
4837
802e8373 4838static void expireCommand(redisClient *c) {
4839 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
4840}
4841
4842static void expireatCommand(redisClient *c) {
4843 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
4844}
4845
fd88489a 4846static void ttlCommand(redisClient *c) {
4847 time_t expire;
4848 int ttl = -1;
4849
4850 expire = getExpire(c->db,c->argv[1]);
4851 if (expire != -1) {
4852 ttl = (int) (expire-time(NULL));
4853 if (ttl < 0) ttl = -1;
4854 }
4855 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4856}
4857
f6b141c5 4858static void msetGenericCommand(redisClient *c, int nx) {
4859 int j;
4860
4861 if ((c->argc % 2) == 0) {
4862 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4863 return;
4864 }
4865 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4866 * set nothing at all if at least one already key exists. */
4867 if (nx) {
4868 for (j = 1; j < c->argc; j += 2) {
4869 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4870 addReply(c, shared.czero);
4871 return;
4872 }
4873 }
4874 }
4875
4876 for (j = 1; j < c->argc; j += 2) {
2ed22c8b 4877 int retval;
4878
4879 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4880 if (retval == DICT_ERR) {
4881 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4882 incrRefCount(c->argv[j+1]);
4883 } else {
4884 incrRefCount(c->argv[j]);
4885 incrRefCount(c->argv[j+1]);
4886 }
f6b141c5 4887 removeExpire(c->db,c->argv[j]);
4888 }
4889 server.dirty += (c->argc-1)/2;
4890 addReply(c, nx ? shared.cone : shared.ok);
4891}
4892
4893static void msetCommand(redisClient *c) {
4894 msetGenericCommand(c,0);
4895}
4896
4897static void msetnxCommand(redisClient *c) {
4898 msetGenericCommand(c,1);
4899}
4900
ed9b544e 4901/* =============================== Replication ============================= */
4902
a4d1ba9a 4903static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 4904 ssize_t nwritten, ret = size;
4905 time_t start = time(NULL);
4906
4907 timeout++;
4908 while(size) {
4909 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4910 nwritten = write(fd,ptr,size);
4911 if (nwritten == -1) return -1;
4912 ptr += nwritten;
4913 size -= nwritten;
4914 }
4915 if ((time(NULL)-start) > timeout) {
4916 errno = ETIMEDOUT;
4917 return -1;
4918 }
4919 }
4920 return ret;
4921}
4922
a4d1ba9a 4923static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 4924 ssize_t nread, totread = 0;
4925 time_t start = time(NULL);
4926
4927 timeout++;
4928 while(size) {
4929 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4930 nread = read(fd,ptr,size);
4931 if (nread == -1) return -1;
4932 ptr += nread;
4933 size -= nread;
4934 totread += nread;
4935 }
4936 if ((time(NULL)-start) > timeout) {
4937 errno = ETIMEDOUT;
4938 return -1;
4939 }
4940 }
4941 return totread;
4942}
4943
4944static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
4945 ssize_t nread = 0;
4946
4947 size--;
4948 while(size) {
4949 char c;
4950
4951 if (syncRead(fd,&c,1,timeout) == -1) return -1;
4952 if (c == '\n') {
4953 *ptr = '\0';
4954 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
4955 return nread;
4956 } else {
4957 *ptr++ = c;
4958 *ptr = '\0';
4959 nread++;
4960 }
4961 }
4962 return nread;
4963}
4964
4965static void syncCommand(redisClient *c) {
40d224a9 4966 /* ignore SYNC if aleady slave or in monitor mode */
4967 if (c->flags & REDIS_SLAVE) return;
4968
4969 /* SYNC can't be issued when the server has pending data to send to
4970 * the client about already issued commands. We need a fresh reply
4971 * buffer registering the differences between the BGSAVE and the current
4972 * dataset, so that we can copy to other slaves if needed. */
4973 if (listLength(c->reply) != 0) {
4974 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4975 return;
4976 }
4977
4978 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4979 /* Here we need to check if there is a background saving operation
4980 * in progress, or if it is required to start one */
4981 if (server.bgsaveinprogress) {
4982 /* Ok a background save is in progress. Let's check if it is a good
4983 * one for replication, i.e. if there is another slave that is
4984 * registering differences since the server forked to save */
4985 redisClient *slave;
4986 listNode *ln;
4987
6208b3a7 4988 listRewind(server.slaves);
4989 while((ln = listYield(server.slaves))) {
40d224a9 4990 slave = ln->value;
4991 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
40d224a9 4992 }
4993 if (ln) {
4994 /* Perfect, the server is already registering differences for
4995 * another slave. Set the right state, and copy the buffer. */
4996 listRelease(c->reply);
4997 c->reply = listDup(slave->reply);
40d224a9 4998 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4999 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5000 } else {
5001 /* No way, we need to wait for the next BGSAVE in order to
5002 * register differences */
5003 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5004 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5005 }
5006 } else {
5007 /* Ok we don't have a BGSAVE in progress, let's start one */
5008 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5009 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5010 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5011 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5012 return;
5013 }
5014 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5015 }
6208b3a7 5016 c->repldbfd = -1;
40d224a9 5017 c->flags |= REDIS_SLAVE;
5018 c->slaveseldb = 0;
6b47e12e 5019 listAddNodeTail(server.slaves,c);
40d224a9 5020 return;
5021}
5022
6208b3a7 5023static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5024 redisClient *slave = privdata;
5025 REDIS_NOTUSED(el);
5026 REDIS_NOTUSED(mask);
5027 char buf[REDIS_IOBUF_LEN];
5028 ssize_t nwritten, buflen;
5029
5030 if (slave->repldboff == 0) {
5031 /* Write the bulk write count before to transfer the DB. In theory here
5032 * we don't know how much room there is in the output buffer of the
5033 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5034 * operations) will never be smaller than the few bytes we need. */
5035 sds bulkcount;
5036
5037 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5038 slave->repldbsize);
5039 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5040 {
5041 sdsfree(bulkcount);
5042 freeClient(slave);
5043 return;
5044 }
5045 sdsfree(bulkcount);
5046 }
5047 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5048 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5049 if (buflen <= 0) {
5050 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5051 (buflen == 0) ? "premature EOF" : strerror(errno));
5052 freeClient(slave);
5053 return;
5054 }
5055 if ((nwritten = write(fd,buf,buflen)) == -1) {
5056 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5057 strerror(errno));
5058 freeClient(slave);
5059 return;
5060 }
5061 slave->repldboff += nwritten;
5062 if (slave->repldboff == slave->repldbsize) {
5063 close(slave->repldbfd);
5064 slave->repldbfd = -1;
5065 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5066 slave->replstate = REDIS_REPL_ONLINE;
5067 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5068 sendReplyToClient, slave, NULL) == AE_ERR) {
5069 freeClient(slave);
5070 return;
5071 }
5072 addReplySds(slave,sdsempty());
5073 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5074 }
5075}
ed9b544e 5076
a3b21203 5077/* This function is called at the end of every backgrond saving.
5078 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5079 * otherwise REDIS_ERR is passed to the function.
5080 *
5081 * The goal of this function is to handle slaves waiting for a successful
5082 * background saving in order to perform non-blocking synchronization. */
5083static void updateSlavesWaitingBgsave(int bgsaveerr) {
6208b3a7 5084 listNode *ln;
5085 int startbgsave = 0;
ed9b544e 5086
6208b3a7 5087 listRewind(server.slaves);
5088 while((ln = listYield(server.slaves))) {
5089 redisClient *slave = ln->value;
ed9b544e 5090
6208b3a7 5091 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5092 startbgsave = 1;
5093 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5094 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
dde65f3f 5095 struct redis_stat buf;
6208b3a7 5096
5097 if (bgsaveerr != REDIS_OK) {
5098 freeClient(slave);
5099 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5100 continue;
5101 }
5102 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
dde65f3f 5103 redis_fstat(slave->repldbfd,&buf) == -1) {
6208b3a7 5104 freeClient(slave);
5105 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5106 continue;
5107 }
5108 slave->repldboff = 0;
5109 slave->repldbsize = buf.st_size;
5110 slave->replstate = REDIS_REPL_SEND_BULK;
5111 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5112 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5113 freeClient(slave);
5114 continue;
5115 }
5116 }
ed9b544e 5117 }
6208b3a7 5118 if (startbgsave) {
5119 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5120 listRewind(server.slaves);
5121 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5122 while((ln = listYield(server.slaves))) {
5123 redisClient *slave = ln->value;
ed9b544e 5124
6208b3a7 5125 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5126 freeClient(slave);
5127 }
5128 }
5129 }
ed9b544e 5130}
5131
5132static int syncWithMaster(void) {
5133 char buf[1024], tmpfile[256];
5134 int dumpsize;
5135 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5136 int dfd;
5137
5138 if (fd == -1) {
5139 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5140 strerror(errno));
5141 return REDIS_ERR;
5142 }
5143 /* Issue the SYNC command */
5144 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5145 close(fd);
5146 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5147 strerror(errno));
5148 return REDIS_ERR;
5149 }
5150 /* Read the bulk write count */
8c4d91fc 5151 if (syncReadLine(fd,buf,1024,3600) == -1) {
ed9b544e 5152 close(fd);
5153 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5154 strerror(errno));
5155 return REDIS_ERR;
5156 }
4aa701c1 5157 if (buf[0] != '$') {
5158 close(fd);
5159 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5160 return REDIS_ERR;
5161 }
c937aa89 5162 dumpsize = atoi(buf+1);
ed9b544e 5163 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5164 /* Read the bulk write data on a temp file */
5165 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5166 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5167 if (dfd == -1) {
5168 close(fd);
5169 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5170 return REDIS_ERR;
5171 }
5172 while(dumpsize) {
5173 int nread, nwritten;
5174
5175 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5176 if (nread == -1) {
5177 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5178 strerror(errno));
5179 close(fd);
5180 close(dfd);
5181 return REDIS_ERR;
5182 }
5183 nwritten = write(dfd,buf,nread);
5184 if (nwritten == -1) {
5185 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5186 close(fd);
5187 close(dfd);
5188 return REDIS_ERR;
5189 }
5190 dumpsize -= nread;
5191 }
5192 close(dfd);
5193 if (rename(tmpfile,server.dbfilename) == -1) {
5194 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5195 unlink(tmpfile);
5196 close(fd);
5197 return REDIS_ERR;
5198 }
5199 emptyDb();
f78fd11b 5200 if (rdbLoad(server.dbfilename) != REDIS_OK) {
ed9b544e 5201 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5202 close(fd);
5203 return REDIS_ERR;
5204 }
5205 server.master = createClient(fd);
5206 server.master->flags |= REDIS_MASTER;
5207 server.replstate = REDIS_REPL_CONNECTED;
5208 return REDIS_OK;
5209}
5210
321b0e13 5211static void slaveofCommand(redisClient *c) {
5212 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5213 !strcasecmp(c->argv[2]->ptr,"one")) {
5214 if (server.masterhost) {
5215 sdsfree(server.masterhost);
5216 server.masterhost = NULL;
5217 if (server.master) freeClient(server.master);
5218 server.replstate = REDIS_REPL_NONE;
5219 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5220 }
5221 } else {
5222 sdsfree(server.masterhost);
5223 server.masterhost = sdsdup(c->argv[1]->ptr);
5224 server.masterport = atoi(c->argv[2]->ptr);
5225 if (server.master) freeClient(server.master);
5226 server.replstate = REDIS_REPL_CONNECT;
5227 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5228 server.masterhost, server.masterport);
5229 }
5230 addReply(c,shared.ok);
5231}
5232
3fd78bcd 5233/* ============================ Maxmemory directive ======================== */
5234
5235/* This function gets called when 'maxmemory' is set on the config file to limit
5236 * the max memory used by the server, and we are out of memory.
5237 * This function will try to, in order:
5238 *
5239 * - Free objects from the free list
5240 * - Try to remove keys with an EXPIRE set
5241 *
5242 * It is not possible to free enough memory to reach used-memory < maxmemory
5243 * the server will start refusing commands that will enlarge even more the
5244 * memory usage.
5245 */
5246static void freeMemoryIfNeeded(void) {
5247 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5248 if (listLength(server.objfreelist)) {
5249 robj *o;
5250
5251 listNode *head = listFirst(server.objfreelist);
5252 o = listNodeValue(head);
5253 listDelNode(server.objfreelist,head);
5254 zfree(o);
5255 } else {
5256 int j, k, freed = 0;
5257
5258 for (j = 0; j < server.dbnum; j++) {
5259 int minttl = -1;
5260 robj *minkey = NULL;
5261 struct dictEntry *de;
5262
5263 if (dictSize(server.db[j].expires)) {
5264 freed = 1;
5265 /* From a sample of three keys drop the one nearest to
5266 * the natural expire */
5267 for (k = 0; k < 3; k++) {
5268 time_t t;
5269
5270 de = dictGetRandomKey(server.db[j].expires);
5271 t = (time_t) dictGetEntryVal(de);
5272 if (minttl == -1 || t < minttl) {
5273 minkey = dictGetEntryKey(de);
5274 minttl = t;
5275 }
5276 }
5277 deleteKey(server.db+j,minkey);
5278 }
5279 }
5280 if (!freed) return; /* nothing to free... */
5281 }
5282 }
5283}
5284
7f957c92 5285/* ================================= Debugging ============================== */
5286
5287static void debugCommand(redisClient *c) {
5288 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5289 *((char*)-1) = 'x';
333298da 5290 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5291 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5292 robj *key, *val;
5293
5294 if (!de) {
5295 addReply(c,shared.nokeyerr);
5296 return;
5297 }
5298 key = dictGetEntryKey(de);
5299 val = dictGetEntryVal(de);
5300 addReplySds(c,sdscatprintf(sdsempty(),
942a3961 5301 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5302 key, key->refcount, val, val->refcount, val->encoding));
7f957c92 5303 } else {
333298da 5304 addReplySds(c,sdsnew(
5305 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
7f957c92 5306 }
5307}
56906eef 5308
d76412d1 5309#ifdef HAVE_BACKTRACE
56906eef 5310static struct redisFunctionSym symsTable[] = {
724a51b1 5311{"compareStringObjects", (unsigned long)compareStringObjects},
5312{"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
942a3961 5313{"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5314{"dictEncObjHash", (unsigned long)dictEncObjHash},
5315{"incrDecrCommand", (unsigned long)incrDecrCommand},
56906eef 5316{"freeStringObject", (unsigned long)freeStringObject},
5317{"freeListObject", (unsigned long)freeListObject},
5318{"freeSetObject", (unsigned long)freeSetObject},
5319{"decrRefCount", (unsigned long)decrRefCount},
5320{"createObject", (unsigned long)createObject},
5321{"freeClient", (unsigned long)freeClient},
5322{"rdbLoad", (unsigned long)rdbLoad},
942a3961 5323{"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5324{"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
56906eef 5325{"addReply", (unsigned long)addReply},
5326{"addReplySds", (unsigned long)addReplySds},
5327{"incrRefCount", (unsigned long)incrRefCount},
5328{"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5329{"createStringObject", (unsigned long)createStringObject},
5330{"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5331{"syncWithMaster", (unsigned long)syncWithMaster},
5332{"tryObjectSharing", (unsigned long)tryObjectSharing},
942a3961 5333{"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5334{"getDecodedObject", (unsigned long)getDecodedObject},
56906eef 5335{"removeExpire", (unsigned long)removeExpire},
5336{"expireIfNeeded", (unsigned long)expireIfNeeded},
5337{"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5338{"deleteKey", (unsigned long)deleteKey},
5339{"getExpire", (unsigned long)getExpire},
5340{"setExpire", (unsigned long)setExpire},
a3b21203 5341{"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
56906eef 5342{"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5343{"authCommand", (unsigned long)authCommand},
5344{"pingCommand", (unsigned long)pingCommand},
5345{"echoCommand", (unsigned long)echoCommand},
5346{"setCommand", (unsigned long)setCommand},
5347{"setnxCommand", (unsigned long)setnxCommand},
5348{"getCommand", (unsigned long)getCommand},
5349{"delCommand", (unsigned long)delCommand},
5350{"existsCommand", (unsigned long)existsCommand},
5351{"incrCommand", (unsigned long)incrCommand},
5352{"decrCommand", (unsigned long)decrCommand},
5353{"incrbyCommand", (unsigned long)incrbyCommand},
5354{"decrbyCommand", (unsigned long)decrbyCommand},
5355{"selectCommand", (unsigned long)selectCommand},
5356{"randomkeyCommand", (unsigned long)randomkeyCommand},
5357{"keysCommand", (unsigned long)keysCommand},
5358{"dbsizeCommand", (unsigned long)dbsizeCommand},
5359{"lastsaveCommand", (unsigned long)lastsaveCommand},
5360{"saveCommand", (unsigned long)saveCommand},
5361{"bgsaveCommand", (unsigned long)bgsaveCommand},
5362{"shutdownCommand", (unsigned long)shutdownCommand},
5363{"moveCommand", (unsigned long)moveCommand},
5364{"renameCommand", (unsigned long)renameCommand},
5365{"renamenxCommand", (unsigned long)renamenxCommand},
5366{"lpushCommand", (unsigned long)lpushCommand},
5367{"rpushCommand", (unsigned long)rpushCommand},
5368{"lpopCommand", (unsigned long)lpopCommand},
5369{"rpopCommand", (unsigned long)rpopCommand},
5370{"llenCommand", (unsigned long)llenCommand},
5371{"lindexCommand", (unsigned long)lindexCommand},
5372{"lrangeCommand", (unsigned long)lrangeCommand},
5373{"ltrimCommand", (unsigned long)ltrimCommand},
5374{"typeCommand", (unsigned long)typeCommand},
5375{"lsetCommand", (unsigned long)lsetCommand},
5376{"saddCommand", (unsigned long)saddCommand},
5377{"sremCommand", (unsigned long)sremCommand},
5378{"smoveCommand", (unsigned long)smoveCommand},
5379{"sismemberCommand", (unsigned long)sismemberCommand},
5380{"scardCommand", (unsigned long)scardCommand},
12fea928 5381{"spopCommand", (unsigned long)spopCommand},
2abb95a9 5382{"srandmemberCommand", (unsigned long)srandmemberCommand},
56906eef 5383{"sinterCommand", (unsigned long)sinterCommand},
5384{"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5385{"sunionCommand", (unsigned long)sunionCommand},
5386{"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5387{"sdiffCommand", (unsigned long)sdiffCommand},
5388{"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5389{"syncCommand", (unsigned long)syncCommand},
5390{"flushdbCommand", (unsigned long)flushdbCommand},
5391{"flushallCommand", (unsigned long)flushallCommand},
5392{"sortCommand", (unsigned long)sortCommand},
5393{"lremCommand", (unsigned long)lremCommand},
5394{"infoCommand", (unsigned long)infoCommand},
5395{"mgetCommand", (unsigned long)mgetCommand},
5396{"monitorCommand", (unsigned long)monitorCommand},
5397{"expireCommand", (unsigned long)expireCommand},
802e8373 5398{"expireatCommand", (unsigned long)expireatCommand},
f6b141c5 5399{"getsetCommand", (unsigned long)getsetCommand},
56906eef 5400{"ttlCommand", (unsigned long)ttlCommand},
5401{"slaveofCommand", (unsigned long)slaveofCommand},
5402{"debugCommand", (unsigned long)debugCommand},
5403{"processCommand", (unsigned long)processCommand},
5404{"setupSigSegvAction", (unsigned long)setupSigSegvAction},
56906eef 5405{"readQueryFromClient", (unsigned long)readQueryFromClient},
a3b21203 5406{"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
f6b141c5 5407{"msetGenericCommand", (unsigned long)msetGenericCommand},
5408{"msetCommand", (unsigned long)msetCommand},
5409{"msetnxCommand", (unsigned long)msetnxCommand},
ace4ee54 5410{"zslCreateNode", (unsigned long)zslCreateNode},
5411{"zslCreate", (unsigned long)zslCreate},
5412{"zslFreeNode",(unsigned long)zslFreeNode},
5413{"zslFree",(unsigned long)zslFree},
5414{"zslRandomLevel",(unsigned long)zslRandomLevel},
5415{"zslInsert",(unsigned long)zslInsert},
5416{"zslDelete",(unsigned long)zslDelete},
5417{"createZsetObject",(unsigned long)createZsetObject},
5418{"zaddCommand",(unsigned long)zaddCommand},
e3870fab 5419{"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
cc812361 5420{"zrangeCommand",(unsigned long)zrangeCommand},
e3870fab 5421{"zrevrangeCommand",(unsigned long)zrevrangeCommand},
1b7106e7 5422{"zremCommand",(unsigned long)zremCommand},
2b59cfdf 5423{"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5424{"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
56906eef 5425{NULL,0}
5426};
5427
5428/* This function try to convert a pointer into a function name. It's used in
5429 * oreder to provide a backtrace under segmentation fault that's able to
5430 * display functions declared as static (otherwise the backtrace is useless). */
5431static char *findFuncName(void *pointer, unsigned long *offset){
5432 int i, ret = -1;
5433 unsigned long off, minoff = 0;
5434
5435 /* Try to match against the Symbol with the smallest offset */
5436 for (i=0; symsTable[i].pointer; i++) {
5437 unsigned long lp = (unsigned long) pointer;
5438
5439 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5440 off=lp-symsTable[i].pointer;
5441 if (ret < 0 || off < minoff) {
5442 minoff=off;
5443 ret=i;
5444 }
5445 }
5446 }
5447 if (ret == -1) return NULL;
5448 *offset = minoff;
5449 return symsTable[ret].name;
5450}
5451
5452static void *getMcontextEip(ucontext_t *uc) {
5453#if defined(__FreeBSD__)
5454 return (void*) uc->uc_mcontext.mc_eip;
5455#elif defined(__dietlibc__)
5456 return (void*) uc->uc_mcontext.eip;
06db1f50 5457#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
56906eef 5458 return (void*) uc->uc_mcontext->__ss.__eip;
06db1f50 5459#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
cb7e07cc 5460 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
06db1f50 5461 return (void*) uc->uc_mcontext->__ss.__rip;
cbc59b38 5462 #else
5463 return (void*) uc->uc_mcontext->__ss.__eip;
5464 #endif
b91cf5ef 5465#elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
56906eef 5466 return (void*) uc->uc_mcontext.gregs[REG_EIP];
b91cf5ef 5467#elif defined(__ia64__) /* Linux IA64 */
5468 return (void*) uc->uc_mcontext.sc_ip;
5469#else
5470 return NULL;
56906eef 5471#endif
5472}
5473
5474static void segvHandler(int sig, siginfo_t *info, void *secret) {
5475 void *trace[100];
5476 char **messages = NULL;
5477 int i, trace_size = 0;
5478 unsigned long offset=0;
5479 time_t uptime = time(NULL)-server.stat_starttime;
5480 ucontext_t *uc = (ucontext_t*) secret;
5481 REDIS_NOTUSED(info);
5482
5483 redisLog(REDIS_WARNING,
5484 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5485 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
433cc893 5486 "redis_version:%s; "
56906eef 5487 "uptime_in_seconds:%d; "
433cc893 5488 "connected_clients:%d; "
5489 "connected_slaves:%d; "
5490 "used_memory:%zu; "
5491 "changes_since_last_save:%lld; "
5492 "bgsave_in_progress:%d; "
5493 "last_save_time:%d; "
5494 "total_connections_received:%lld; "
5495 "total_commands_processed:%lld; "
5496 "role:%s;"
5497 ,REDIS_VERSION,
5498 uptime,
5499 listLength(server.clients)-listLength(server.slaves),
5500 listLength(server.slaves),
5501 server.usedmemory,
5502 server.dirty,
5503 server.bgsaveinprogress,
5504 server.lastsave,
5505 server.stat_numconnections,
5506 server.stat_numcommands,
5507 server.masterhost == NULL ? "master" : "slave"
5508 ));
56906eef 5509
5510 trace_size = backtrace(trace, 100);
de96dbfe 5511 /* overwrite sigaction with caller's address */
b91cf5ef 5512 if (getMcontextEip(uc) != NULL) {
5513 trace[1] = getMcontextEip(uc);
5514 }
56906eef 5515 messages = backtrace_symbols(trace, trace_size);
fe3bbfbe 5516
d76412d1 5517 for (i=1; i<trace_size; ++i) {
56906eef 5518 char *fn = findFuncName(trace[i], &offset), *p;
5519
5520 p = strchr(messages[i],'+');
5521 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5522 redisLog(REDIS_WARNING,"%s", messages[i]);
5523 } else {
5524 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5525 }
5526 }
5527 free(messages);
5528 exit(0);
fe3bbfbe 5529}
56906eef 5530
5531static void setupSigSegvAction(void) {
5532 struct sigaction act;
5533
5534 sigemptyset (&act.sa_mask);
5535 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5536 * is used. Otherwise, sa_handler is used */
5537 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5538 act.sa_sigaction = segvHandler;
5539 sigaction (SIGSEGV, &act, NULL);
5540 sigaction (SIGBUS, &act, NULL);
12fea928 5541 sigaction (SIGFPE, &act, NULL);
5542 sigaction (SIGILL, &act, NULL);
5543 sigaction (SIGBUS, &act, NULL);
e65fdc78 5544 return;
56906eef 5545}
d76412d1 5546#else /* HAVE_BACKTRACE */
5547static void setupSigSegvAction(void) {
5548}
5549#endif /* HAVE_BACKTRACE */
e65fdc78 5550
ed9b544e 5551/* =================================== Main! ================================ */
5552
0bc03378 5553#ifdef __linux__
5554int linuxOvercommitMemoryValue(void) {
5555 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
5556 char buf[64];
5557
5558 if (!fp) return -1;
5559 if (fgets(buf,64,fp) == NULL) {
5560 fclose(fp);
5561 return -1;
5562 }
5563 fclose(fp);
5564
5565 return atoi(buf);
5566}
5567
5568void linuxOvercommitMemoryWarning(void) {
5569 if (linuxOvercommitMemoryValue() == 0) {
b91cf5ef 5570 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
0bc03378 5571 }
5572}
5573#endif /* __linux__ */
5574
ed9b544e 5575static void daemonize(void) {
5576 int fd;
5577 FILE *fp;
5578
5579 if (fork() != 0) exit(0); /* parent exits */
5580 setsid(); /* create a new session */
5581
5582 /* Every output goes to /dev/null. If Redis is daemonized but
5583 * the 'logfile' is set to 'stdout' in the configuration file
5584 * it will not log at all. */
5585 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5586 dup2(fd, STDIN_FILENO);
5587 dup2(fd, STDOUT_FILENO);
5588 dup2(fd, STDERR_FILENO);
5589 if (fd > STDERR_FILENO) close(fd);
5590 }
5591 /* Try to write the pid file */
ed329fcf 5592 fp = fopen(server.pidfile,"w");
ed9b544e 5593 if (fp) {
5594 fprintf(fp,"%d\n",getpid());
5595 fclose(fp);
5596 }
5597}
5598
5599int main(int argc, char **argv) {
5600 initServerConfig();
5601 if (argc == 2) {
5602 ResetServerSaveParams();
5603 loadServerConfig(argv[1]);
5604 } else if (argc > 2) {
5605 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5606 exit(1);
8cca9b82 5607 } else {
5608 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
ed9b544e 5609 }
5610 initServer();
5611 if (server.daemonize) daemonize();
5612 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
b91cf5ef 5613#ifdef __linux__
5614 linuxOvercommitMemoryWarning();
5615#endif
f78fd11b 5616 if (rdbLoad(server.dbfilename) == REDIS_OK)
ed9b544e 5617 redisLog(REDIS_NOTICE,"DB loaded from disk");
5618 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5619 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
46713f83 5620 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
ed9b544e 5621 aeMain(server.el);
5622 aeDeleteEventLoop(server.el);
5623 return 0;
5624}