]> git.saurik.com Git - redis.git/blame - redis.c
first working implementation of BLPOP and BRPOP, still everything is to test well
[redis.git] / redis.c
CommitLineData
ed9b544e 1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this Redis build. */
#define REDIS_VERSION "1.3.0"
23d4709d 31
32#include "fmacros.h"
fbf9bcdb 33#include "config.h"
ed9b544e 34
35#include <stdio.h>
36#include <stdlib.h>
37#include <string.h>
38#include <time.h>
39#include <unistd.h>
c9468bcf 40#define __USE_POSIX199309
ed9b544e 41#include <signal.h>
fbf9bcdb 42
43#ifdef HAVE_BACKTRACE
c9468bcf 44#include <execinfo.h>
45#include <ucontext.h>
fbf9bcdb 46#endif /* HAVE_BACKTRACE */
47
ed9b544e 48#include <sys/wait.h>
49#include <errno.h>
50#include <assert.h>
51#include <ctype.h>
52#include <stdarg.h>
53#include <inttypes.h>
54#include <arpa/inet.h>
55#include <sys/stat.h>
56#include <fcntl.h>
57#include <sys/time.h>
58#include <sys/resource.h>
2895e862 59#include <sys/uio.h>
f78fd11b 60#include <limits.h>
a7866db6 61#include <math.h>
0bc1b2f6 62
63#if defined(__sun)
5043dff3 64#include "solarisfixes.h"
65#endif
ed9b544e 66
c9468bcf 67#include "redis.h"
ed9b544e 68#include "ae.h" /* Event driven programming library */
69#include "sds.h" /* Dynamic safe strings */
70#include "anet.h" /* Networking the easy way */
71#include "dict.h" /* Hash tables */
72#include "adlist.h" /* Linked lists */
73#include "zmalloc.h" /* total memory usage aware version of malloc/free */
5f5b9840 74#include "lzf.h" /* LZF compression library */
75#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
ed9b544e 76
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR (-1) /* parenthesized, as every macro expansion should be */

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* expire lookups per cron run */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
 * this flag will return an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys would waste a lot of space, so the two most significant bits of the
 * first byte select how the length is encoded:
 *
 * 00|000000 => the length is the remaining 6 bits of this byte
 * 01|000000 00000000 => the length is 14 bits: 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => a full 32 bit length follows
 * 11|000000 => a specially encoded object follows. The remaining six bits
 *              specify the kind of object that follows.
 *              See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */

/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 16 /* This client is in a MULTI context */
#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of the master.
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... The argument is parenthesized so that expressions
 * such as REDIS_NOTUSED(a+b) expand correctly. */
#define REDIS_NOTUSED(V) ((void)(V))

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
/* fsync policies for the append only file */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2

/* Our assertion: when _e is false, _redisAssert() receives the stringified
 * expression (it is defined elsewhere in this file and, per the original
 * note, can print the stack trace), then the process exits with status 1. */
#define redisAssert(_e) (!(_e) ? (_redisAssert(#_e),exit(1)) : (void)0)
static void _redisAssert(char *estr);
202
dfc5e96c 203/* We can print the stacktrace, so our assert is defined this way: */
204#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
205static void _redisAssert(char *estr);
206
ed9b544e 207/*================================= Data types ============================== */
208
209/* A redis object, that is a type able to hold a string / list / set */
210typedef struct redisObject {
ed9b544e 211 void *ptr;
942a3961 212 unsigned char type;
213 unsigned char encoding;
214 unsigned char notused[2];
ed9b544e 215 int refcount;
216} robj;
217
dfc5e96c 218/* Macro used to initalize a Redis object allocated on the stack.
219 * Note that this macro is taken near the structure definition to make sure
220 * we'll update it when the structure is changed, to avoid bugs like
221 * bug #85 introduced exactly in this way. */
222#define initStaticStringObject(_var,_ptr) do { \
223 _var.refcount = 1; \
224 _var.type = REDIS_STRING; \
225 _var.encoding = REDIS_ENCODING_RAW; \
226 _var.ptr = _ptr; \
227} while(0);
228
3305306f 229typedef struct redisDb {
4409877e 230 dict *dict; /* The keyspace for this DB */
231 dict *expires; /* Timeout of keys with a timeout set */
232 dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
3305306f 233 int id;
234} redisDb;
235
6e469882 236/* Client MULTI/EXEC state */
237typedef struct multiCmd {
238 robj **argv;
239 int argc;
240 struct redisCommand *cmd;
241} multiCmd;
242
243typedef struct multiState {
244 multiCmd *commands; /* Array of MULTI commands */
245 int count; /* Total number of MULTI commands */
246} multiState;
247
ed9b544e 248/* With multiplexing we need to take per-clinet state.
249 * Clients are taken in a liked list. */
250typedef struct redisClient {
251 int fd;
3305306f 252 redisDb *db;
ed9b544e 253 int dictid;
254 sds querybuf;
e8a74421 255 robj **argv, **mbargv;
256 int argc, mbargc;
40d224a9 257 int bulklen; /* bulk read len. -1 if not in bulk read mode */
e8a74421 258 int multibulk; /* multi bulk command format active */
ed9b544e 259 list *reply;
260 int sentlen;
261 time_t lastinteraction; /* time of the last interaction, used for timeout */
40d224a9 262 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
6e469882 263 /* REDIS_MULTI */
40d224a9 264 int slaveseldb; /* slave selected db, if this client is a slave */
265 int authenticated; /* when requirepass is non-NULL */
266 int replstate; /* replication state if this is a slave */
267 int repldbfd; /* replication DB file descriptor */
6e469882 268 long repldboff; /* replication DB file offset */
40d224a9 269 off_t repldbsize; /* replication DB file size */
6e469882 270 multiState mstate; /* MULTI/EXEC state */
4409877e 271 robj *blockingkey; /* The key we waiting to terminate a blocking
272 * operation such as BLPOP. Otherwise NULL. */
273 time_t blockingto; /* Blocking operation timeout. If UNIX current time
274 * is >= blockingto then the operation timed out. */
ed9b544e 275} redisClient;
276
277struct saveparam {
278 time_t seconds;
279 int changes;
280};
281
282/* Global server state structure */
283struct redisServer {
284 int port;
285 int fd;
3305306f 286 redisDb *db;
4409877e 287 dict *sharingpool; /* Poll used for object sharing */
10c43610 288 unsigned int sharingpoolsize;
ed9b544e 289 long long dirty; /* changes to DB from the last save */
290 list *clients;
87eca727 291 list *slaves, *monitors;
ed9b544e 292 char neterr[ANET_ERR_LEN];
293 aeEventLoop *el;
294 int cronloops; /* number of times the cron function run */
295 list *objfreelist; /* A list of freed objects to avoid malloc() */
296 time_t lastsave; /* Unix time of last save succeeede */
5fba9f71 297 size_t usedmemory; /* Used memory in megabytes */
ed9b544e 298 /* Fields used only for stats */
299 time_t stat_starttime; /* server start time */
300 long long stat_numcommands; /* number of processed commands */
301 long long stat_numconnections; /* number of connections received */
302 /* Configuration */
303 int verbosity;
304 int glueoutputbuf;
305 int maxidletime;
306 int dbnum;
307 int daemonize;
44b38ef4 308 int appendonly;
48f0308a 309 int appendfsync;
310 time_t lastfsync;
44b38ef4 311 int appendfd;
312 int appendseldb;
ed329fcf 313 char *pidfile;
9f3c422c 314 pid_t bgsavechildpid;
9d65a1bb 315 pid_t bgrewritechildpid;
316 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
ed9b544e 317 struct saveparam *saveparams;
318 int saveparamslen;
319 char *logfile;
320 char *bindaddr;
321 char *dbfilename;
44b38ef4 322 char *appendfilename;
abcb223e 323 char *requirepass;
10c43610 324 int shareobjects;
121f70cf 325 int rdbcompression;
ed9b544e 326 /* Replication related */
327 int isslave;
d0ccebcf 328 char *masterauth;
ed9b544e 329 char *masterhost;
330 int masterport;
40d224a9 331 redisClient *master; /* client that is master for this slave */
ed9b544e 332 int replstate;
285add55 333 unsigned int maxclients;
d4465900 334 unsigned long maxmemory;
ed9b544e 335 /* Sort parameters - qsort_r() is only available under BSD so we
336 * have to take this state global, in order to pass it to sortCompare() */
337 int sort_desc;
338 int sort_alpha;
339 int sort_bypattern;
340};
341
342typedef void redisCommandProc(redisClient *c);
343struct redisCommand {
344 char *name;
345 redisCommandProc *proc;
346 int arity;
347 int flags;
348};
349
de96dbfe 350struct redisFunctionSym {
351 char *name;
56906eef 352 unsigned long pointer;
de96dbfe 353};
354
ed9b544e 355typedef struct _redisSortObject {
356 robj *obj;
357 union {
358 double score;
359 robj *cmpobj;
360 } u;
361} redisSortObject;
362
363typedef struct _redisSortOperation {
364 int type;
365 robj *pattern;
366} redisSortOperation;
367
6b47e12e 368/* ZSETs use a specialized version of Skiplists */
369
370typedef struct zskiplistNode {
371 struct zskiplistNode **forward;
e3870fab 372 struct zskiplistNode *backward;
6b47e12e 373 double score;
374 robj *obj;
375} zskiplistNode;
376
377typedef struct zskiplist {
e3870fab 378 struct zskiplistNode *header, *tail;
d13f767c 379 unsigned long length;
6b47e12e 380 int level;
381} zskiplist;
382
1812e024 383typedef struct zset {
384 dict *dict;
6b47e12e 385 zskiplist *zsl;
1812e024 386} zset;
387
6b47e12e 388/* Our shared "common" objects */
389
ed9b544e 390struct sharedObjectsStruct {
c937aa89 391 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
6e469882 392 *colon, *nullbulk, *nullmultibulk, *queued,
c937aa89 393 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
394 *outofrangeerr, *plus,
ed9b544e 395 *select0, *select1, *select2, *select3, *select4,
396 *select5, *select6, *select7, *select8, *select9;
397} shared;
398
a7866db6 399/* Global vars that are actally used as constants. The following double
400 * values are used for double on-disk serialization, and are initialized
401 * at runtime to avoid strange compiler optimizations. */
402
403static double R_Zero, R_PosInf, R_NegInf, R_Nan;
404
ed9b544e 405/*================================ Prototypes =============================== */
406
407static void freeStringObject(robj *o);
408static void freeListObject(robj *o);
409static void freeSetObject(robj *o);
410static void decrRefCount(void *o);
411static robj *createObject(int type, void *ptr);
412static void freeClient(redisClient *c);
f78fd11b 413static int rdbLoad(char *filename);
ed9b544e 414static void addReply(redisClient *c, robj *obj);
415static void addReplySds(redisClient *c, sds s);
416static void incrRefCount(robj *o);
f78fd11b 417static int rdbSaveBackground(char *filename);
ed9b544e 418static robj *createStringObject(char *ptr, size_t len);
87eca727 419static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
44b38ef4 420static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
ed9b544e 421static int syncWithMaster(void);
10c43610 422static robj *tryObjectSharing(robj *o);
942a3961 423static int tryObjectEncoding(robj *o);
9d65a1bb 424static robj *getDecodedObject(robj *o);
3305306f 425static int removeExpire(redisDb *db, robj *key);
426static int expireIfNeeded(redisDb *db, robj *key);
427static int deleteIfVolatile(redisDb *db, robj *key);
94754ccc 428static int deleteKey(redisDb *db, robj *key);
bb32ede5 429static time_t getExpire(redisDb *db, robj *key);
430static int setExpire(redisDb *db, robj *key, time_t when);
a3b21203 431static void updateSlavesWaitingBgsave(int bgsaveerr);
3fd78bcd 432static void freeMemoryIfNeeded(void);
de96dbfe 433static int processCommand(redisClient *c);
56906eef 434static void setupSigSegvAction(void);
a3b21203 435static void rdbRemoveTempFile(pid_t childpid);
9d65a1bb 436static void aofRemoveTempFile(pid_t childpid);
0ea663ea 437static size_t stringObjectLen(robj *o);
638e42ac 438static void processInputBuffer(redisClient *c);
6b47e12e 439static zskiplist *zslCreate(void);
fd8ccf44 440static void zslFree(zskiplist *zsl);
2b59cfdf 441static void zslInsert(zskiplist *zsl, double score, robj *obj);
2895e862 442static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
6e469882 443static void initClientMultiState(redisClient *c);
444static void freeClientMultiState(redisClient *c);
445static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
4409877e 446static void unblockClient(redisClient *c);
447static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
ed9b544e 448
abcb223e 449static void authCommand(redisClient *c);
ed9b544e 450static void pingCommand(redisClient *c);
451static void echoCommand(redisClient *c);
452static void setCommand(redisClient *c);
453static void setnxCommand(redisClient *c);
454static void getCommand(redisClient *c);
455static void delCommand(redisClient *c);
456static void existsCommand(redisClient *c);
457static void incrCommand(redisClient *c);
458static void decrCommand(redisClient *c);
459static void incrbyCommand(redisClient *c);
460static void decrbyCommand(redisClient *c);
461static void selectCommand(redisClient *c);
462static void randomkeyCommand(redisClient *c);
463static void keysCommand(redisClient *c);
464static void dbsizeCommand(redisClient *c);
465static void lastsaveCommand(redisClient *c);
466static void saveCommand(redisClient *c);
467static void bgsaveCommand(redisClient *c);
9d65a1bb 468static void bgrewriteaofCommand(redisClient *c);
ed9b544e 469static void shutdownCommand(redisClient *c);
470static void moveCommand(redisClient *c);
471static void renameCommand(redisClient *c);
472static void renamenxCommand(redisClient *c);
473static void lpushCommand(redisClient *c);
474static void rpushCommand(redisClient *c);
475static void lpopCommand(redisClient *c);
476static void rpopCommand(redisClient *c);
477static void llenCommand(redisClient *c);
478static void lindexCommand(redisClient *c);
479static void lrangeCommand(redisClient *c);
480static void ltrimCommand(redisClient *c);
481static void typeCommand(redisClient *c);
482static void lsetCommand(redisClient *c);
483static void saddCommand(redisClient *c);
484static void sremCommand(redisClient *c);
a4460ef4 485static void smoveCommand(redisClient *c);
ed9b544e 486static void sismemberCommand(redisClient *c);
487static void scardCommand(redisClient *c);
12fea928 488static void spopCommand(redisClient *c);
2abb95a9 489static void srandmemberCommand(redisClient *c);
ed9b544e 490static void sinterCommand(redisClient *c);
491static void sinterstoreCommand(redisClient *c);
40d224a9 492static void sunionCommand(redisClient *c);
493static void sunionstoreCommand(redisClient *c);
f4f56e1d 494static void sdiffCommand(redisClient *c);
495static void sdiffstoreCommand(redisClient *c);
ed9b544e 496static void syncCommand(redisClient *c);
497static void flushdbCommand(redisClient *c);
498static void flushallCommand(redisClient *c);
499static void sortCommand(redisClient *c);
500static void lremCommand(redisClient *c);
0f5f7e9a 501static void rpoplpushcommand(redisClient *c);
ed9b544e 502static void infoCommand(redisClient *c);
70003d28 503static void mgetCommand(redisClient *c);
87eca727 504static void monitorCommand(redisClient *c);
3305306f 505static void expireCommand(redisClient *c);
802e8373 506static void expireatCommand(redisClient *c);
f6b141c5 507static void getsetCommand(redisClient *c);
fd88489a 508static void ttlCommand(redisClient *c);
321b0e13 509static void slaveofCommand(redisClient *c);
7f957c92 510static void debugCommand(redisClient *c);
f6b141c5 511static void msetCommand(redisClient *c);
512static void msetnxCommand(redisClient *c);
fd8ccf44 513static void zaddCommand(redisClient *c);
7db723ad 514static void zincrbyCommand(redisClient *c);
cc812361 515static void zrangeCommand(redisClient *c);
50c55df5 516static void zrangebyscoreCommand(redisClient *c);
e3870fab 517static void zrevrangeCommand(redisClient *c);
3c41331e 518static void zcardCommand(redisClient *c);
1b7106e7 519static void zremCommand(redisClient *c);
6e333bbe 520static void zscoreCommand(redisClient *c);
1807985b 521static void zremrangebyscoreCommand(redisClient *c);
6e469882 522static void multiCommand(redisClient *c);
523static void execCommand(redisClient *c);
4409877e 524static void blpopCommand(redisClient *c);
525static void brpopCommand(redisClient *c);
f6b141c5 526
ed9b544e 527/*================================= Globals ================================= */
528
529/* Global vars */
530static struct redisServer server; /* server global state */
531static struct redisCommand cmdTable[] = {
532 {"get",getCommand,2,REDIS_CMD_INLINE},
3fd78bcd 533 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
534 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
5109cdff 535 {"del",delCommand,-2,REDIS_CMD_INLINE},
ed9b544e 536 {"exists",existsCommand,2,REDIS_CMD_INLINE},
3fd78bcd 537 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
538 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
70003d28 539 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
3fd78bcd 540 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
541 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 542 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
543 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
4409877e 544 {"brpop",brpopCommand,3,REDIS_CMD_INLINE},
545 {"blpop",blpopCommand,3,REDIS_CMD_INLINE},
ed9b544e 546 {"llen",llenCommand,2,REDIS_CMD_INLINE},
547 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
3fd78bcd 548 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 549 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
550 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
551 {"lrem",lremCommand,4,REDIS_CMD_BULK},
0b13687c 552 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
3fd78bcd 553 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 554 {"srem",sremCommand,3,REDIS_CMD_BULK},
a4460ef4 555 {"smove",smoveCommand,4,REDIS_CMD_BULK},
ed9b544e 556 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
557 {"scard",scardCommand,2,REDIS_CMD_INLINE},
12fea928 558 {"spop",spopCommand,2,REDIS_CMD_INLINE},
2abb95a9 559 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
3fd78bcd 560 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
561 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
562 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
563 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
564 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
565 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 566 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
fd8ccf44 567 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
7db723ad 568 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
1b7106e7 569 {"zrem",zremCommand,3,REDIS_CMD_BULK},
1807985b 570 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
752da584 571 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
80181f78 572 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
752da584 573 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
3c41331e 574 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
6e333bbe 575 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
3fd78bcd 576 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
577 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
f6b141c5 578 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
579 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
580 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 581 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
582 {"select",selectCommand,2,REDIS_CMD_INLINE},
583 {"move",moveCommand,3,REDIS_CMD_INLINE},
584 {"rename",renameCommand,3,REDIS_CMD_INLINE},
585 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
321b0e13 586 {"expire",expireCommand,3,REDIS_CMD_INLINE},
802e8373 587 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
ed9b544e 588 {"keys",keysCommand,2,REDIS_CMD_INLINE},
589 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
abcb223e 590 {"auth",authCommand,2,REDIS_CMD_INLINE},
ed9b544e 591 {"ping",pingCommand,1,REDIS_CMD_INLINE},
592 {"echo",echoCommand,2,REDIS_CMD_BULK},
593 {"save",saveCommand,1,REDIS_CMD_INLINE},
594 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
9d65a1bb 595 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
ed9b544e 596 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
597 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
598 {"type",typeCommand,2,REDIS_CMD_INLINE},
6e469882 599 {"multi",multiCommand,1,REDIS_CMD_INLINE},
600 {"exec",execCommand,1,REDIS_CMD_INLINE},
ed9b544e 601 {"sync",syncCommand,1,REDIS_CMD_INLINE},
602 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
603 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
3fd78bcd 604 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 605 {"info",infoCommand,1,REDIS_CMD_INLINE},
87eca727 606 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
fd88489a 607 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
321b0e13 608 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
7f957c92 609 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
ed9b544e 610 {NULL,NULL,0,0}
611};
bcfc686d 612
ed9b544e 613/*============================ Utility functions ============================ */
614
615/* Glob-style pattern matching. */
616int stringmatchlen(const char *pattern, int patternLen,
617 const char *string, int stringLen, int nocase)
618{
619 while(patternLen) {
620 switch(pattern[0]) {
621 case '*':
622 while (pattern[1] == '*') {
623 pattern++;
624 patternLen--;
625 }
626 if (patternLen == 1)
627 return 1; /* match */
628 while(stringLen) {
629 if (stringmatchlen(pattern+1, patternLen-1,
630 string, stringLen, nocase))
631 return 1; /* match */
632 string++;
633 stringLen--;
634 }
635 return 0; /* no match */
636 break;
637 case '?':
638 if (stringLen == 0)
639 return 0; /* no match */
640 string++;
641 stringLen--;
642 break;
643 case '[':
644 {
645 int not, match;
646
647 pattern++;
648 patternLen--;
649 not = pattern[0] == '^';
650 if (not) {
651 pattern++;
652 patternLen--;
653 }
654 match = 0;
655 while(1) {
656 if (pattern[0] == '\\') {
657 pattern++;
658 patternLen--;
659 if (pattern[0] == string[0])
660 match = 1;
661 } else if (pattern[0] == ']') {
662 break;
663 } else if (patternLen == 0) {
664 pattern--;
665 patternLen++;
666 break;
667 } else if (pattern[1] == '-' && patternLen >= 3) {
668 int start = pattern[0];
669 int end = pattern[2];
670 int c = string[0];
671 if (start > end) {
672 int t = start;
673 start = end;
674 end = t;
675 }
676 if (nocase) {
677 start = tolower(start);
678 end = tolower(end);
679 c = tolower(c);
680 }
681 pattern += 2;
682 patternLen -= 2;
683 if (c >= start && c <= end)
684 match = 1;
685 } else {
686 if (!nocase) {
687 if (pattern[0] == string[0])
688 match = 1;
689 } else {
690 if (tolower((int)pattern[0]) == tolower((int)string[0]))
691 match = 1;
692 }
693 }
694 pattern++;
695 patternLen--;
696 }
697 if (not)
698 match = !match;
699 if (!match)
700 return 0; /* no match */
701 string++;
702 stringLen--;
703 break;
704 }
705 case '\\':
706 if (patternLen >= 2) {
707 pattern++;
708 patternLen--;
709 }
710 /* fall through */
711 default:
712 if (!nocase) {
713 if (pattern[0] != string[0])
714 return 0; /* no match */
715 } else {
716 if (tolower((int)pattern[0]) != tolower((int)string[0]))
717 return 0; /* no match */
718 }
719 string++;
720 stringLen--;
721 break;
722 }
723 pattern++;
724 patternLen--;
725 if (stringLen == 0) {
726 while(*pattern == '*') {
727 pattern++;
728 patternLen--;
729 }
730 break;
731 }
732 }
733 if (patternLen == 0 && stringLen == 0)
734 return 1;
735 return 0;
736}
737
56906eef 738static void redisLog(int level, const char *fmt, ...) {
ed9b544e 739 va_list ap;
740 FILE *fp;
741
742 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
743 if (!fp) return;
744
745 va_start(ap, fmt);
746 if (level >= server.verbosity) {
747 char *c = ".-*";
1904ecc1 748 char buf[64];
749 time_t now;
750
751 now = time(NULL);
6c9385e0 752 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
1904ecc1 753 fprintf(fp,"%s %c ",buf,c[level]);
ed9b544e 754 vfprintf(fp, fmt, ap);
755 fprintf(fp,"\n");
756 fflush(fp);
757 }
758 va_end(ap);
759
760 if (server.logfile) fclose(fp);
761}
762
763/*====================== Hash table type implementation ==================== */
764
765/* This is an hash table type that uses the SDS dynamic strings libary as
766 * keys and radis objects as values (objects can hold SDS strings,
767 * lists, sets). */
768
1812e024 769static void dictVanillaFree(void *privdata, void *val)
770{
771 DICT_NOTUSED(privdata);
772 zfree(val);
773}
774
4409877e 775static void dictListDestructor(void *privdata, void *val)
776{
777 DICT_NOTUSED(privdata);
778 listRelease((list*)val);
779}
780
ed9b544e 781static int sdsDictKeyCompare(void *privdata, const void *key1,
782 const void *key2)
783{
784 int l1,l2;
785 DICT_NOTUSED(privdata);
786
787 l1 = sdslen((sds)key1);
788 l2 = sdslen((sds)key2);
789 if (l1 != l2) return 0;
790 return memcmp(key1, key2, l1) == 0;
791}
792
793static void dictRedisObjectDestructor(void *privdata, void *val)
794{
795 DICT_NOTUSED(privdata);
796
797 decrRefCount(val);
798}
799
942a3961 800static int dictObjKeyCompare(void *privdata, const void *key1,
ed9b544e 801 const void *key2)
802{
803 const robj *o1 = key1, *o2 = key2;
804 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
805}
806
942a3961 807static unsigned int dictObjHash(const void *key) {
ed9b544e 808 const robj *o = key;
809 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
810}
811
942a3961 812static int dictEncObjKeyCompare(void *privdata, const void *key1,
813 const void *key2)
814{
9d65a1bb 815 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
816 int cmp;
942a3961 817
9d65a1bb 818 o1 = getDecodedObject(o1);
819 o2 = getDecodedObject(o2);
820 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
821 decrRefCount(o1);
822 decrRefCount(o2);
823 return cmp;
942a3961 824}
825
826static unsigned int dictEncObjHash(const void *key) {
9d65a1bb 827 robj *o = (robj*) key;
942a3961 828
9d65a1bb 829 o = getDecodedObject(o);
830 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
831 decrRefCount(o);
832 return hash;
942a3961 833}
834
/* Dict type for sets of Redis objects whose keys may be encoded: hashing and
 * comparison go through the decoded string form. Removing a key drops a
 * reference to it; values are never freed by the dict. initServer() also
 * uses this type for the per-DB 'expires' dict and the object sharing pool. */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};

/* Dict type for the member -> score map of sorted sets: keys are (possibly
 * encoded) Redis objects, values are heap blocks released with zfree(). */
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor of malloc(sizeof(double)) */
};

/* Dict type of the main keyspace (server.db[j].dict): keys are plain
 * (non-encoded) string objects, and both keys and values drop a reference
 * via decrRefCount() when removed. */
static dictType hashDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};

/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP): values are
 * released with listRelease() when an entry is removed. */
static dictType keylistDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictListDestructor          /* val destructor */
};
872
ed9b544e 873/* ========================= Random utility functions ======================= */
874
875/* Redis generally does not try to recover from out of memory conditions
876 * when allocating objects or strings, it is not clear if it will be possible
877 * to report this condition to the client since the networking layer itself
878 * is based on heap allocation for send buffers, so we simply abort.
879 * At least the code will be simpler to read... */
880static void oom(const char *msg) {
71c54b21 881 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
ed9b544e 882 sleep(1);
883 abort();
884}
885
886/* ====================== Redis server networking stuff ===================== */
56906eef 887static void closeTimedoutClients(void) {
ed9b544e 888 redisClient *c;
ed9b544e 889 listNode *ln;
890 time_t now = time(NULL);
891
6208b3a7 892 listRewind(server.clients);
893 while ((ln = listYield(server.clients)) != NULL) {
ed9b544e 894 c = listNodeValue(ln);
895 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
c7cf2ec9 896 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
ed9b544e 897 (now - c->lastinteraction > server.maxidletime)) {
898 redisLog(REDIS_DEBUG,"Closing idle client");
899 freeClient(c);
900 }
901 }
ed9b544e 902}
903
12fea928 904static int htNeedsResize(dict *dict) {
905 long long size, used;
906
907 size = dictSlots(dict);
908 used = dictSize(dict);
909 return (size && used && size > DICT_HT_INITIAL_SIZE &&
910 (used*100/size < REDIS_HT_MINFILL));
911}
912
0bc03378 913/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
914 * we resize the hash table to save memory */
56906eef 915static void tryResizeHashTables(void) {
0bc03378 916 int j;
917
918 for (j = 0; j < server.dbnum; j++) {
12fea928 919 if (htNeedsResize(server.db[j].dict)) {
920 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
0bc03378 921 dictResize(server.db[j].dict);
12fea928 922 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
0bc03378 923 }
12fea928 924 if (htNeedsResize(server.db[j].expires))
925 dictResize(server.db[j].expires);
0bc03378 926 }
927}
928
9d65a1bb 929/* A background saving child (BGSAVE) terminated its work. Handle this. */
930void backgroundSaveDoneHandler(int statloc) {
931 int exitcode = WEXITSTATUS(statloc);
932 int bysignal = WIFSIGNALED(statloc);
933
934 if (!bysignal && exitcode == 0) {
935 redisLog(REDIS_NOTICE,
936 "Background saving terminated with success");
937 server.dirty = 0;
938 server.lastsave = time(NULL);
939 } else if (!bysignal && exitcode != 0) {
940 redisLog(REDIS_WARNING, "Background saving error");
941 } else {
942 redisLog(REDIS_WARNING,
943 "Background saving terminated by signal");
944 rdbRemoveTempFile(server.bgsavechildpid);
945 }
946 server.bgsavechildpid = -1;
947 /* Possibly there are slaves waiting for a BGSAVE in order to be served
948 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
949 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
950}
951
952/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
953 * Handle this. */
954void backgroundRewriteDoneHandler(int statloc) {
955 int exitcode = WEXITSTATUS(statloc);
956 int bysignal = WIFSIGNALED(statloc);
957
958 if (!bysignal && exitcode == 0) {
959 int fd;
960 char tmpfile[256];
961
962 redisLog(REDIS_NOTICE,
963 "Background append only file rewriting terminated with success");
964 /* Now it's time to flush the differences accumulated by the parent */
965 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
966 fd = open(tmpfile,O_WRONLY|O_APPEND);
967 if (fd == -1) {
968 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
969 goto cleanup;
970 }
971 /* Flush our data... */
972 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
973 (signed) sdslen(server.bgrewritebuf)) {
974 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
975 close(fd);
976 goto cleanup;
977 }
b32627cd 978 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
9d65a1bb 979 /* Now our work is to rename the temp file into the stable file. And
980 * switch the file descriptor used by the server for append only. */
981 if (rename(tmpfile,server.appendfilename) == -1) {
982 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
983 close(fd);
984 goto cleanup;
985 }
986 /* Mission completed... almost */
987 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
988 if (server.appendfd != -1) {
989 /* If append only is actually enabled... */
990 close(server.appendfd);
991 server.appendfd = fd;
992 fsync(fd);
85a83172 993 server.appendseldb = -1; /* Make sure it will issue SELECT */
9d65a1bb 994 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
995 } else {
996 /* If append only is disabled we just generate a dump in this
997 * format. Why not? */
998 close(fd);
999 }
1000 } else if (!bysignal && exitcode != 0) {
1001 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1002 } else {
1003 redisLog(REDIS_WARNING,
1004 "Background append only file rewriting terminated by signal");
1005 }
1006cleanup:
1007 sdsfree(server.bgrewritebuf);
1008 server.bgrewritebuf = sdsempty();
1009 aofRemoveTempFile(server.bgrewritechildpid);
1010 server.bgrewritechildpid = -1;
1011}
1012
56906eef 1013static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
94754ccc 1014 int j, loops = server.cronloops++;
ed9b544e 1015 REDIS_NOTUSED(eventLoop);
1016 REDIS_NOTUSED(id);
1017 REDIS_NOTUSED(clientData);
1018
1019 /* Update the global state with the amount of used memory */
1020 server.usedmemory = zmalloc_used_memory();
1021
0bc03378 1022 /* Show some info about non-empty databases */
ed9b544e 1023 for (j = 0; j < server.dbnum; j++) {
dec423d9 1024 long long size, used, vkeys;
94754ccc 1025
3305306f 1026 size = dictSlots(server.db[j].dict);
1027 used = dictSize(server.db[j].dict);
94754ccc 1028 vkeys = dictSize(server.db[j].expires);
c3cb078d 1029 if (!(loops % 5) && (used || vkeys)) {
1030 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
a4d1ba9a 1031 /* dictPrintStats(server.dict); */
ed9b544e 1032 }
ed9b544e 1033 }
1034
0bc03378 1035 /* We don't want to resize the hash tables while a bacground saving
1036 * is in progress: the saving child is created using fork() that is
1037 * implemented with a copy-on-write semantic in most modern systems, so
1038 * if we resize the HT while there is the saving child at work actually
1039 * a lot of memory movements in the parent will cause a lot of pages
1040 * copied. */
9d65a1bb 1041 if (server.bgsavechildpid == -1) tryResizeHashTables();
0bc03378 1042
ed9b544e 1043 /* Show information about connected clients */
1044 if (!(loops % 5)) {
21aecf4b 1045 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
ed9b544e 1046 listLength(server.clients)-listLength(server.slaves),
1047 listLength(server.slaves),
10c43610 1048 server.usedmemory,
3305306f 1049 dictSize(server.sharingpool));
ed9b544e 1050 }
1051
1052 /* Close connections of timedout clients */
0150db36 1053 if (server.maxidletime && !(loops % 10))
ed9b544e 1054 closeTimedoutClients();
1055
9d65a1bb 1056 /* Check if a background saving or AOF rewrite in progress terminated */
1057 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
ed9b544e 1058 int statloc;
9d65a1bb 1059 pid_t pid;
1060
1061 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1062 if (pid == server.bgsavechildpid) {
1063 backgroundSaveDoneHandler(statloc);
ed9b544e 1064 } else {
9d65a1bb 1065 backgroundRewriteDoneHandler(statloc);
ed9b544e 1066 }
ed9b544e 1067 }
1068 } else {
1069 /* If there is not a background saving in progress check if
1070 * we have to save now */
1071 time_t now = time(NULL);
1072 for (j = 0; j < server.saveparamslen; j++) {
1073 struct saveparam *sp = server.saveparams+j;
1074
1075 if (server.dirty >= sp->changes &&
1076 now-server.lastsave > sp->seconds) {
1077 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1078 sp->changes, sp->seconds);
f78fd11b 1079 rdbSaveBackground(server.dbfilename);
ed9b544e 1080 break;
1081 }
1082 }
1083 }
94754ccc 1084
f2324293 1085 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1086 * will use few CPU cycles if there are few expiring keys, otherwise
1087 * it will get more aggressive to avoid that too much memory is used by
1088 * keys that can be removed from the keyspace. */
94754ccc 1089 for (j = 0; j < server.dbnum; j++) {
f2324293 1090 int expired;
94754ccc 1091 redisDb *db = server.db+j;
94754ccc 1092
f2324293 1093 /* Continue to expire if at the end of the cycle more than 25%
1094 * of the keys were expired. */
1095 do {
1096 int num = dictSize(db->expires);
94754ccc 1097 time_t now = time(NULL);
1098
f2324293 1099 expired = 0;
94754ccc 1100 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1101 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1102 while (num--) {
1103 dictEntry *de;
1104 time_t t;
1105
1106 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1107 t = (time_t) dictGetEntryVal(de);
1108 if (now > t) {
1109 deleteKey(db,dictGetEntryKey(de));
f2324293 1110 expired++;
94754ccc 1111 }
1112 }
f2324293 1113 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
94754ccc 1114 }
1115
ed9b544e 1116 /* Check if we should connect to a MASTER */
1117 if (server.replstate == REDIS_REPL_CONNECT) {
1118 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1119 if (syncWithMaster() == REDIS_OK) {
1120 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1121 }
1122 }
1123 return 1000;
1124}
1125
1126static void createSharedObjects(void) {
1127 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1128 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1129 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
c937aa89 1130 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1131 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1132 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1133 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1134 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1135 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
ed9b544e 1136 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
6e469882 1137 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
ed9b544e 1138 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1139 "-ERR Operation against a key holding the wrong kind of value\r\n"));
ed9b544e 1140 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1141 "-ERR no such key\r\n"));
ed9b544e 1142 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1143 "-ERR syntax error\r\n"));
c937aa89 1144 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1145 "-ERR source and destination objects are the same\r\n"));
1146 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1147 "-ERR index out of range\r\n"));
ed9b544e 1148 shared.space = createObject(REDIS_STRING,sdsnew(" "));
c937aa89 1149 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1150 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
ed9b544e 1151 shared.select0 = createStringObject("select 0\r\n",10);
1152 shared.select1 = createStringObject("select 1\r\n",10);
1153 shared.select2 = createStringObject("select 2\r\n",10);
1154 shared.select3 = createStringObject("select 3\r\n",10);
1155 shared.select4 = createStringObject("select 4\r\n",10);
1156 shared.select5 = createStringObject("select 5\r\n",10);
1157 shared.select6 = createStringObject("select 6\r\n",10);
1158 shared.select7 = createStringObject("select 7\r\n",10);
1159 shared.select8 = createStringObject("select 8\r\n",10);
1160 shared.select9 = createStringObject("select 9\r\n",10);
1161}
1162
1163static void appendServerSaveParams(time_t seconds, int changes) {
1164 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
ed9b544e 1165 server.saveparams[server.saveparamslen].seconds = seconds;
1166 server.saveparams[server.saveparamslen].changes = changes;
1167 server.saveparamslen++;
1168}
1169
bcfc686d 1170static void resetServerSaveParams() {
ed9b544e 1171 zfree(server.saveparams);
1172 server.saveparams = NULL;
1173 server.saveparamslen = 0;
1174}
1175
1176static void initServerConfig() {
1177 server.dbnum = REDIS_DEFAULT_DBNUM;
1178 server.port = REDIS_SERVERPORT;
1179 server.verbosity = REDIS_DEBUG;
1180 server.maxidletime = REDIS_MAXIDLETIME;
1181 server.saveparams = NULL;
1182 server.logfile = NULL; /* NULL = log on standard output */
1183 server.bindaddr = NULL;
1184 server.glueoutputbuf = 1;
1185 server.daemonize = 0;
44b38ef4 1186 server.appendonly = 0;
4e141d5a 1187 server.appendfsync = APPENDFSYNC_ALWAYS;
48f0308a 1188 server.lastfsync = time(NULL);
44b38ef4 1189 server.appendfd = -1;
1190 server.appendseldb = -1; /* Make sure the first time will not match */
ed329fcf 1191 server.pidfile = "/var/run/redis.pid";
ed9b544e 1192 server.dbfilename = "dump.rdb";
9d65a1bb 1193 server.appendfilename = "appendonly.aof";
abcb223e 1194 server.requirepass = NULL;
10c43610 1195 server.shareobjects = 0;
b0553789 1196 server.rdbcompression = 1;
21aecf4b 1197 server.sharingpoolsize = 1024;
285add55 1198 server.maxclients = 0;
3fd78bcd 1199 server.maxmemory = 0;
bcfc686d 1200 resetServerSaveParams();
ed9b544e 1201
1202 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1203 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1204 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1205 /* Replication related */
1206 server.isslave = 0;
d0ccebcf 1207 server.masterauth = NULL;
ed9b544e 1208 server.masterhost = NULL;
1209 server.masterport = 6379;
1210 server.master = NULL;
1211 server.replstate = REDIS_REPL_NONE;
a7866db6 1212
1213 /* Double constants initialization */
1214 R_Zero = 0.0;
1215 R_PosInf = 1.0/R_Zero;
1216 R_NegInf = -1.0/R_Zero;
1217 R_Nan = R_Zero/R_Zero;
ed9b544e 1218}
1219
1220static void initServer() {
1221 int j;
1222
1223 signal(SIGHUP, SIG_IGN);
1224 signal(SIGPIPE, SIG_IGN);
fe3bbfbe 1225 setupSigSegvAction();
ed9b544e 1226
1227 server.clients = listCreate();
1228 server.slaves = listCreate();
87eca727 1229 server.monitors = listCreate();
ed9b544e 1230 server.objfreelist = listCreate();
1231 createSharedObjects();
1232 server.el = aeCreateEventLoop();
3305306f 1233 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
10c43610 1234 server.sharingpool = dictCreate(&setDictType,NULL);
ed9b544e 1235 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1236 if (server.fd == -1) {
1237 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1238 exit(1);
1239 }
3305306f 1240 for (j = 0; j < server.dbnum; j++) {
1241 server.db[j].dict = dictCreate(&hashDictType,NULL);
1242 server.db[j].expires = dictCreate(&setDictType,NULL);
4409877e 1243 server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
3305306f 1244 server.db[j].id = j;
1245 }
ed9b544e 1246 server.cronloops = 0;
9f3c422c 1247 server.bgsavechildpid = -1;
9d65a1bb 1248 server.bgrewritechildpid = -1;
1249 server.bgrewritebuf = sdsempty();
ed9b544e 1250 server.lastsave = time(NULL);
1251 server.dirty = 0;
1252 server.usedmemory = 0;
1253 server.stat_numcommands = 0;
1254 server.stat_numconnections = 0;
1255 server.stat_starttime = time(NULL);
d8f8b666 1256 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
44b38ef4 1257
1258 if (server.appendonly) {
71eba477 1259 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
44b38ef4 1260 if (server.appendfd == -1) {
1261 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1262 strerror(errno));
1263 exit(1);
1264 }
1265 }
ed9b544e 1266}
1267
1268/* Empty the whole database */
ca37e9cd 1269static long long emptyDb() {
ed9b544e 1270 int j;
ca37e9cd 1271 long long removed = 0;
ed9b544e 1272
3305306f 1273 for (j = 0; j < server.dbnum; j++) {
ca37e9cd 1274 removed += dictSize(server.db[j].dict);
3305306f 1275 dictEmpty(server.db[j].dict);
1276 dictEmpty(server.db[j].expires);
1277 }
ca37e9cd 1278 return removed;
ed9b544e 1279}
1280
/* Convert a configuration boolean to an int: "yes" -> 1, "no" -> 0 (both
 * matched case-insensitively), anything else -> -1. */
static int yesnotoi(char *s) {
    int result = -1;

    if (strcasecmp(s,"yes") == 0)
        result = 1;
    else if (strcasecmp(s,"no") == 0)
        result = 0;
    return result;
}
1286
/* I agree, this is a very rudimental way to load a configuration...
   will improve later if the config gets more complex.

   Read configuration directives from 'filename' ("-" means standard input)
   and apply them to the global 'server' structure. Each line is trimmed of
   spaces, tabs, CR and LF; blank lines and lines starting with '#' are
   skipped; the rest is split on single spaces and the first word is the
   directive name, matched case-insensitively. An unknown directive, a wrong
   number of arguments or an invalid value prints a diagnostic with the line
   number to stderr and terminates the process with exit(1). */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines */
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments. The separator is a single space, so runs of
         * spaces produce empty arguments (and change argc). */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            /* Changes the process working directory right away, so relative
             * paths used later (dump and AOF files) resolve against it. */
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* "stdout" means: keep logging to standard output (NULL). */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            /* serverCron() starts the connection to the master once
             * replstate is REDIS_REPL_CONNECT. */
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
            server.masterauth = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
            if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
            if (!strcasecmp(argv[1],"no")) {
                server.appendfsync = APPENDFSYNC_NO;
            } else if (!strcasecmp(argv[1],"always")) {
                server.appendfsync = APPENDFSYNC_ALWAYS;
            } else if (!strcasecmp(argv[1],"everysec")) {
                server.appendfsync = APPENDFSYNC_EVERYSEC;
            } else {
                err = "argument must be 'no', 'always' or 'everysec'";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        /* Release the split arguments and the trimmed line. */
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

/* Fatal error: report the offending line and stop. Nothing is freed since
 * the process exits immediately. */
loaderr:
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1450
1451static void freeClientArgv(redisClient *c) {
1452 int j;
1453
1454 for (j = 0; j < c->argc; j++)
1455 decrRefCount(c->argv[j]);
e8a74421 1456 for (j = 0; j < c->mbargc; j++)
1457 decrRefCount(c->mbargv[j]);
ed9b544e 1458 c->argc = 0;
e8a74421 1459 c->mbargc = 0;
ed9b544e 1460}
1461
/* Release every resource owned by client 'c' and the client itself.
 *
 * The client is unblocked if needed, its file events are removed, its socket
 * is closed and it is unlinked from server.clients. A slave is also unlinked
 * from server.monitors or server.slaves, and its bulk transfer descriptor is
 * closed. If 'c' is our master, server.master is cleared and replstate is
 * set to REDIS_REPL_CONNECT so that serverCron() reconnects. 'c' must not be
 * used after this call. */
static void freeClient(redisClient *c) {
    listNode *ln;

    /* If the client being freed is blocked in a blocking call, querybuf must
     * be set to NULL *before* calling unblockClient(), so that
     * processInputBuffer() does not get called for it. It is also important
     * to remove the file events after this, because unblockClient() adds
     * the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClient(c);

    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    if (c->flags & REDIS_SLAVE) {
        /* A slave still receiving the initial dump has the file open. */
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* MONITOR clients carry the SLAVE flag but live in their own list. */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }
    if (c->flags & REDIS_MASTER) {
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
    }
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
1500
cc30e368 1501#define GLUEREPLY_UP_TO (1024)
ed9b544e 1502static void glueReplyBuffersIfNeeded(redisClient *c) {
c28b42ac 1503 int copylen = 0;
1504 char buf[GLUEREPLY_UP_TO];
6208b3a7 1505 listNode *ln;
ed9b544e 1506 robj *o;
1507
6208b3a7 1508 listRewind(c->reply);
1509 while((ln = listYield(c->reply))) {
c28b42ac 1510 int objlen;
1511
ed9b544e 1512 o = ln->value;
c28b42ac 1513 objlen = sdslen(o->ptr);
1514 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1515 memcpy(buf+copylen,o->ptr,objlen);
1516 copylen += objlen;
ed9b544e 1517 listDelNode(c->reply,ln);
c28b42ac 1518 } else {
1519 if (copylen == 0) return;
1520 break;
ed9b544e 1521 }
ed9b544e 1522 }
c28b42ac 1523 /* Now the output buffer is empty, add the new single element */
1524 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1525 listAddNodeHead(c->reply,o);
ed9b544e 1526}
1527
/* Writable-event handler: send the pending reply list of the client passed
 * as 'privdata' to socket 'fd'.
 *
 * Delegates to sendReplyToClientWritev() when output gluing is disabled, the
 * reply list is longer than REDIS_WRITEV_THRESHOLD and the client is not our
 * master. Otherwise objects are written one at a time from the head of the
 * list (glued first when glueoutputbuf is on), tracking partial writes in
 * c->sentlen. For a master connection nothing is written: the bytes are
 * only accounted as sent and discarded. At most about
 * REDIS_MAX_WRITE_PER_EVENT bytes are sent per call. A write error other
 * than EAGAIN frees the client. The writable event is removed once the list
 * is empty. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(listLength(c->reply)) {
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);

        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Empty objects are simply dropped. */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            /* nwritten keeps the failing value for the check after the loop. */
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
         * bytes: in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from a
         * super fast link that is always able to accept data (in a real world
         * scenario think about 'KEYS *' against the loopback interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            /* Socket buffer full: retry on the next writable event. */
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1593
2895e862 1594static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1595{
1596 redisClient *c = privdata;
1597 int nwritten = 0, totwritten = 0, objlen, willwrite;
1598 robj *o;
1599 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1600 int offset, ion = 0;
1601 REDIS_NOTUSED(el);
1602 REDIS_NOTUSED(mask);
1603
1604 listNode *node;
1605 while (listLength(c->reply)) {
1606 offset = c->sentlen;
1607 ion = 0;
1608 willwrite = 0;
1609
1610 /* fill-in the iov[] array */
1611 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1612 o = listNodeValue(node);
1613 objlen = sdslen(o->ptr);
1614
1615 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1616 break;
1617
1618 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1619 break; /* no more iovecs */
1620
1621 iov[ion].iov_base = ((char*)o->ptr) + offset;
1622 iov[ion].iov_len = objlen - offset;
1623 willwrite += objlen - offset;
1624 offset = 0; /* just for the first item */
1625 ion++;
1626 }
1627
1628 if(willwrite == 0)
1629 break;
1630
1631 /* write all collected blocks at once */
1632 if((nwritten = writev(fd, iov, ion)) < 0) {
1633 if (errno != EAGAIN) {
1634 redisLog(REDIS_DEBUG,
1635 "Error writing to client: %s", strerror(errno));
1636 freeClient(c);
1637 return;
1638 }
1639 break;
1640 }
1641
1642 totwritten += nwritten;
1643 offset = c->sentlen;
1644
1645 /* remove written robjs from c->reply */
1646 while (nwritten && listLength(c->reply)) {
1647 o = listNodeValue(listFirst(c->reply));
1648 objlen = sdslen(o->ptr);
1649
1650 if(nwritten >= objlen - offset) {
1651 listDelNode(c->reply, listFirst(c->reply));
1652 nwritten -= objlen - offset;
1653 c->sentlen = 0;
1654 } else {
1655 /* partial write */
1656 c->sentlen += nwritten;
1657 break;
1658 }
1659 offset = 0;
1660 }
1661 }
1662
1663 if (totwritten > 0)
1664 c->lastinteraction = time(NULL);
1665
1666 if (listLength(c->reply) == 0) {
1667 c->sentlen = 0;
1668 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1669 }
1670}
1671
ed9b544e 1672static struct redisCommand *lookupCommand(char *name) {
1673 int j = 0;
1674 while(cmdTable[j].name != NULL) {
bb0b03a3 1675 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
ed9b544e 1676 j++;
1677 }
1678 return NULL;
1679}
1680
1681/* resetClient prepare the client to process the next command */
1682static void resetClient(redisClient *c) {
1683 freeClientArgv(c);
1684 c->bulklen = -1;
e8a74421 1685 c->multibulk = 0;
ed9b544e 1686}
1687
6e469882 1688/* Call() is the core of Redis execution of a command */
1689static void call(redisClient *c, struct redisCommand *cmd) {
1690 long long dirty;
1691
1692 dirty = server.dirty;
1693 cmd->proc(c);
1694 if (server.appendonly && server.dirty-dirty)
1695 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1696 if (server.dirty-dirty && listLength(server.slaves))
1697 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1698 if (listLength(server.monitors))
1699 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1700 server.stat_numcommands++;
1701}
1702
ed9b544e 1703/* If this function gets called we already read a whole
1704 * command, argments are in the client argv/argc fields.
1705 * processCommand() execute the command or prepare the
1706 * server for a bulk read from the client.
1707 *
1708 * If 1 is returned the client is still alive and valid and
1709 * and other operations can be performed by the caller. Otherwise
1710 * if 0 is returned the client was destroied (i.e. after QUIT). */
1711static int processCommand(redisClient *c) {
1712 struct redisCommand *cmd;
ed9b544e 1713
3fd78bcd 1714 /* Free some memory if needed (maxmemory setting) */
1715 if (server.maxmemory) freeMemoryIfNeeded();
1716
e8a74421 1717 /* Handle the multi bulk command type. This is an alternative protocol
1718 * supported by Redis in order to receive commands that are composed of
1719 * multiple binary-safe "bulk" arguments. The latency of processing is
1720 * a bit higher but this allows things like multi-sets, so if this
1721 * protocol is used only for MSET and similar commands this is a big win. */
1722 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1723 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1724 if (c->multibulk <= 0) {
1725 resetClient(c);
1726 return 1;
1727 } else {
1728 decrRefCount(c->argv[c->argc-1]);
1729 c->argc--;
1730 return 1;
1731 }
1732 } else if (c->multibulk) {
1733 if (c->bulklen == -1) {
1734 if (((char*)c->argv[0]->ptr)[0] != '$') {
1735 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1736 resetClient(c);
1737 return 1;
1738 } else {
1739 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1740 decrRefCount(c->argv[0]);
1741 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1742 c->argc--;
1743 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1744 resetClient(c);
1745 return 1;
1746 }
1747 c->argc--;
1748 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1749 return 1;
1750 }
1751 } else {
1752 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1753 c->mbargv[c->mbargc] = c->argv[0];
1754 c->mbargc++;
1755 c->argc--;
1756 c->multibulk--;
1757 if (c->multibulk == 0) {
1758 robj **auxargv;
1759 int auxargc;
1760
1761 /* Here we need to swap the multi-bulk argc/argv with the
1762 * normal argc/argv of the client structure. */
1763 auxargv = c->argv;
1764 c->argv = c->mbargv;
1765 c->mbargv = auxargv;
1766
1767 auxargc = c->argc;
1768 c->argc = c->mbargc;
1769 c->mbargc = auxargc;
1770
1771 /* We need to set bulklen to something different than -1
1772 * in order for the code below to process the command without
1773 * to try to read the last argument of a bulk command as
1774 * a special argument. */
1775 c->bulklen = 0;
1776 /* continue below and process the command */
1777 } else {
1778 c->bulklen = -1;
1779 return 1;
1780 }
1781 }
1782 }
1783 /* -- end of multi bulk commands processing -- */
1784
ed9b544e 1785 /* The QUIT command is handled as a special case. Normal command
1786 * procs are unable to close the client connection safely */
bb0b03a3 1787 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
ed9b544e 1788 freeClient(c);
1789 return 0;
1790 }
1791 cmd = lookupCommand(c->argv[0]->ptr);
1792 if (!cmd) {
2c14807b 1793 addReplySds(c,
1794 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1795 (char*)c->argv[0]->ptr));
ed9b544e 1796 resetClient(c);
1797 return 1;
1798 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1799 (c->argc < -cmd->arity)) {
454d4e43 1800 addReplySds(c,
1801 sdscatprintf(sdsempty(),
1802 "-ERR wrong number of arguments for '%s' command\r\n",
1803 cmd->name));
ed9b544e 1804 resetClient(c);
1805 return 1;
3fd78bcd 1806 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1807 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1808 resetClient(c);
1809 return 1;
ed9b544e 1810 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1811 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1812
1813 decrRefCount(c->argv[c->argc-1]);
1814 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1815 c->argc--;
1816 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1817 resetClient(c);
1818 return 1;
1819 }
1820 c->argc--;
1821 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1822 /* It is possible that the bulk read is already in the
8d0490e7 1823 * buffer. Check this condition and handle it accordingly.
1824 * This is just a fast path, alternative to call processInputBuffer().
1825 * It's a good idea since the code is small and this condition
1826 * happens most of the times. */
ed9b544e 1827 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1828 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1829 c->argc++;
1830 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1831 } else {
1832 return 1;
1833 }
1834 }
10c43610 1835 /* Let's try to share objects on the command arguments vector */
1836 if (server.shareobjects) {
1837 int j;
1838 for(j = 1; j < c->argc; j++)
1839 c->argv[j] = tryObjectSharing(c->argv[j]);
1840 }
942a3961 1841 /* Let's try to encode the bulk object to save space. */
1842 if (cmd->flags & REDIS_CMD_BULK)
1843 tryObjectEncoding(c->argv[c->argc-1]);
1844
e63943a4 1845 /* Check if the user is authenticated */
1846 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1847 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1848 resetClient(c);
1849 return 1;
1850 }
1851
ed9b544e 1852 /* Exec the command */
6e469882 1853 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1854 queueMultiCommand(c,cmd);
1855 addReply(c,shared.queued);
1856 } else {
1857 call(c,cmd);
1858 }
ed9b544e 1859
1860 /* Prepare the client for the next command */
1861 if (c->flags & REDIS_CLOSE) {
1862 freeClient(c);
1863 return 0;
1864 }
1865 resetClient(c);
1866 return 1;
1867}
1868
87eca727 1869static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6208b3a7 1870 listNode *ln;
ed9b544e 1871 int outc = 0, j;
93ea3759 1872 robj **outv;
1873 /* (args*2)+1 is enough room for args, spaces, newlines */
1874 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1875
1876 if (argc <= REDIS_STATIC_ARGS) {
1877 outv = static_outv;
1878 } else {
1879 outv = zmalloc(sizeof(robj*)*(argc*2+1));
93ea3759 1880 }
ed9b544e 1881
1882 for (j = 0; j < argc; j++) {
1883 if (j != 0) outv[outc++] = shared.space;
1884 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1885 robj *lenobj;
1886
1887 lenobj = createObject(REDIS_STRING,
682ac724 1888 sdscatprintf(sdsempty(),"%lu\r\n",
83c6a618 1889 (unsigned long) stringObjectLen(argv[j])));
ed9b544e 1890 lenobj->refcount = 0;
1891 outv[outc++] = lenobj;
1892 }
1893 outv[outc++] = argv[j];
1894 }
1895 outv[outc++] = shared.crlf;
1896
40d224a9 1897 /* Increment all the refcounts at start and decrement at end in order to
1898 * be sure to free objects if there is no slave in a replication state
1899 * able to be feed with commands */
1900 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
6208b3a7 1901 listRewind(slaves);
1902 while((ln = listYield(slaves))) {
ed9b544e 1903 redisClient *slave = ln->value;
40d224a9 1904
1905 /* Don't feed slaves that are still waiting for BGSAVE to start */
6208b3a7 1906 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
40d224a9 1907
1908 /* Feed all the other slaves, MONITORs and so on */
ed9b544e 1909 if (slave->slaveseldb != dictid) {
1910 robj *selectcmd;
1911
1912 switch(dictid) {
1913 case 0: selectcmd = shared.select0; break;
1914 case 1: selectcmd = shared.select1; break;
1915 case 2: selectcmd = shared.select2; break;
1916 case 3: selectcmd = shared.select3; break;
1917 case 4: selectcmd = shared.select4; break;
1918 case 5: selectcmd = shared.select5; break;
1919 case 6: selectcmd = shared.select6; break;
1920 case 7: selectcmd = shared.select7; break;
1921 case 8: selectcmd = shared.select8; break;
1922 case 9: selectcmd = shared.select9; break;
1923 default:
1924 selectcmd = createObject(REDIS_STRING,
1925 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1926 selectcmd->refcount = 0;
1927 break;
1928 }
1929 addReply(slave,selectcmd);
1930 slave->slaveseldb = dictid;
1931 }
1932 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
ed9b544e 1933 }
40d224a9 1934 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
93ea3759 1935 if (outv != static_outv) zfree(outv);
ed9b544e 1936}
1937
638e42ac 1938static void processInputBuffer(redisClient *c) {
ed9b544e 1939again:
4409877e 1940 /* Before to process the input buffer, make sure the client is not
1941 * waitig for a blocking operation such as BLPOP. Note that the first
1942 * iteration the client is never blocked, otherwise the processInputBuffer
1943 * would not be called at all, but after the execution of the first commands
1944 * in the input buffer the client may be blocked, and the "goto again"
1945 * will try to reiterate. The following line will make it return asap. */
1946 if (c->flags & REDIS_BLOCKED) return;
ed9b544e 1947 if (c->bulklen == -1) {
1948 /* Read the first line of the query */
1949 char *p = strchr(c->querybuf,'\n');
1950 size_t querylen;
644fafa3 1951
ed9b544e 1952 if (p) {
1953 sds query, *argv;
1954 int argc, j;
1955
1956 query = c->querybuf;
1957 c->querybuf = sdsempty();
1958 querylen = 1+(p-(query));
1959 if (sdslen(query) > querylen) {
1960 /* leave data after the first line of the query in the buffer */
1961 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1962 }
1963 *p = '\0'; /* remove "\n" */
1964 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1965 sdsupdatelen(query);
1966
1967 /* Now we can split the query in arguments */
ed9b544e 1968 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
93ea3759 1969 sdsfree(query);
1970
1971 if (c->argv) zfree(c->argv);
1972 c->argv = zmalloc(sizeof(robj*)*argc);
93ea3759 1973
1974 for (j = 0; j < argc; j++) {
ed9b544e 1975 if (sdslen(argv[j])) {
1976 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1977 c->argc++;
1978 } else {
1979 sdsfree(argv[j]);
1980 }
1981 }
1982 zfree(argv);
7c49733c 1983 if (c->argc) {
1984 /* Execute the command. If the client is still valid
1985 * after processCommand() return and there is something
1986 * on the query buffer try to process the next command. */
1987 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1988 } else {
1989 /* Nothing to process, argc == 0. Just process the query
1990 * buffer if it's not empty or return to the caller */
1991 if (sdslen(c->querybuf)) goto again;
1992 }
ed9b544e 1993 return;
644fafa3 1994 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
ed9b544e 1995 redisLog(REDIS_DEBUG, "Client protocol error");
1996 freeClient(c);
1997 return;
1998 }
1999 } else {
2000 /* Bulk read handling. Note that if we are at this point
2001 the client already sent a command terminated with a newline,
2002 we are reading the bulk data that is actually the last
2003 argument of the command. */
2004 int qbl = sdslen(c->querybuf);
2005
2006 if (c->bulklen <= qbl) {
2007 /* Copy everything but the final CRLF as final argument */
2008 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2009 c->argc++;
2010 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
638e42ac 2011 /* Process the command. If the client is still valid after
2012 * the processing and there is more data in the buffer
2013 * try to parse it. */
2014 if (processCommand(c) && sdslen(c->querybuf)) goto again;
ed9b544e 2015 return;
2016 }
2017 }
2018}
2019
638e42ac 2020static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2021 redisClient *c = (redisClient*) privdata;
2022 char buf[REDIS_IOBUF_LEN];
2023 int nread;
2024 REDIS_NOTUSED(el);
2025 REDIS_NOTUSED(mask);
2026
2027 nread = read(fd, buf, REDIS_IOBUF_LEN);
2028 if (nread == -1) {
2029 if (errno == EAGAIN) {
2030 nread = 0;
2031 } else {
2032 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2033 freeClient(c);
2034 return;
2035 }
2036 } else if (nread == 0) {
2037 redisLog(REDIS_DEBUG, "Client closed connection");
2038 freeClient(c);
2039 return;
2040 }
2041 if (nread) {
2042 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2043 c->lastinteraction = time(NULL);
2044 } else {
2045 return;
2046 }
2047 processInputBuffer(c);
2048}
2049
ed9b544e 2050static int selectDb(redisClient *c, int id) {
2051 if (id < 0 || id >= server.dbnum)
2052 return REDIS_ERR;
3305306f 2053 c->db = &server.db[id];
ed9b544e 2054 return REDIS_OK;
2055}
2056
40d224a9 2057static void *dupClientReplyValue(void *o) {
2058 incrRefCount((robj*)o);
2059 return 0;
2060}
2061
ed9b544e 2062static redisClient *createClient(int fd) {
2063 redisClient *c = zmalloc(sizeof(*c));
2064
2065 anetNonBlock(NULL,fd);
2066 anetTcpNoDelay(NULL,fd);
2067 if (!c) return NULL;
2068 selectDb(c,0);
2069 c->fd = fd;
2070 c->querybuf = sdsempty();
2071 c->argc = 0;
93ea3759 2072 c->argv = NULL;
ed9b544e 2073 c->bulklen = -1;
e8a74421 2074 c->multibulk = 0;
2075 c->mbargc = 0;
2076 c->mbargv = NULL;
ed9b544e 2077 c->sentlen = 0;
2078 c->flags = 0;
2079 c->lastinteraction = time(NULL);
abcb223e 2080 c->authenticated = 0;
40d224a9 2081 c->replstate = REDIS_REPL_NONE;
6b47e12e 2082 c->reply = listCreate();
4409877e 2083 c->blockingkey = NULL;
ed9b544e 2084 listSetFreeMethod(c->reply,decrRefCount);
40d224a9 2085 listSetDupMethod(c->reply,dupClientReplyValue);
ed9b544e 2086 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
266373b2 2087 readQueryFromClient, c) == AE_ERR) {
ed9b544e 2088 freeClient(c);
2089 return NULL;
2090 }
6b47e12e 2091 listAddNodeTail(server.clients,c);
6e469882 2092 initClientMultiState(c);
ed9b544e 2093 return c;
2094}
2095
2096static void addReply(redisClient *c, robj *obj) {
2097 if (listLength(c->reply) == 0 &&
6208b3a7 2098 (c->replstate == REDIS_REPL_NONE ||
2099 c->replstate == REDIS_REPL_ONLINE) &&
ed9b544e 2100 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
266373b2 2101 sendReplyToClient, c) == AE_ERR) return;
9d65a1bb 2102 listAddNodeTail(c->reply,getDecodedObject(obj));
ed9b544e 2103}
2104
2105static void addReplySds(redisClient *c, sds s) {
2106 robj *o = createObject(REDIS_STRING,s);
2107 addReply(c,o);
2108 decrRefCount(o);
2109}
2110
e2665397 2111static void addReplyDouble(redisClient *c, double d) {
2112 char buf[128];
2113
2114 snprintf(buf,sizeof(buf),"%.17g",d);
682ac724 2115 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
83c6a618 2116 (unsigned long) strlen(buf),buf));
e2665397 2117}
2118
942a3961 2119static void addReplyBulkLen(redisClient *c, robj *obj) {
2120 size_t len;
2121
2122 if (obj->encoding == REDIS_ENCODING_RAW) {
2123 len = sdslen(obj->ptr);
2124 } else {
2125 long n = (long)obj->ptr;
2126
e054afda 2127 /* Compute how many bytes will take this integer as a radix 10 string */
942a3961 2128 len = 1;
2129 if (n < 0) {
2130 len++;
2131 n = -n;
2132 }
2133 while((n = n/10) != 0) {
2134 len++;
2135 }
2136 }
83c6a618 2137 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
942a3961 2138}
2139
ed9b544e 2140static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2141 int cport, cfd;
2142 char cip[128];
285add55 2143 redisClient *c;
ed9b544e 2144 REDIS_NOTUSED(el);
2145 REDIS_NOTUSED(mask);
2146 REDIS_NOTUSED(privdata);
2147
2148 cfd = anetAccept(server.neterr, fd, cip, &cport);
2149 if (cfd == AE_ERR) {
2150 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2151 return;
2152 }
2153 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
285add55 2154 if ((c = createClient(cfd)) == NULL) {
ed9b544e 2155 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2156 close(cfd); /* May be already closed, just ingore errors */
2157 return;
2158 }
285add55 2159 /* If maxclient directive is set and this is one client more... close the
2160 * connection. Note that we create the client instead to check before
2161 * for this condition, since now the socket is already set in nonblocking
2162 * mode and we can send an error for free using the Kernel I/O */
2163 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2164 char *err = "-ERR max number of clients reached\r\n";
2165
2166 /* That's a best effort error message, don't check write errors */
fee803ba 2167 if (write(c->fd,err,strlen(err)) == -1) {
2168 /* Nothing to do, Just to avoid the warning... */
2169 }
285add55 2170 freeClient(c);
2171 return;
2172 }
ed9b544e 2173 server.stat_numconnections++;
2174}
2175
2176/* ======================= Redis objects implementation ===================== */
2177
2178static robj *createObject(int type, void *ptr) {
2179 robj *o;
2180
2181 if (listLength(server.objfreelist)) {
2182 listNode *head = listFirst(server.objfreelist);
2183 o = listNodeValue(head);
2184 listDelNode(server.objfreelist,head);
2185 } else {
2186 o = zmalloc(sizeof(*o));
2187 }
ed9b544e 2188 o->type = type;
942a3961 2189 o->encoding = REDIS_ENCODING_RAW;
ed9b544e 2190 o->ptr = ptr;
2191 o->refcount = 1;
2192 return o;
2193}
2194
2195static robj *createStringObject(char *ptr, size_t len) {
2196 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2197}
2198
2199static robj *createListObject(void) {
2200 list *l = listCreate();
2201
ed9b544e 2202 listSetFreeMethod(l,decrRefCount);
2203 return createObject(REDIS_LIST,l);
2204}
2205
2206static robj *createSetObject(void) {
2207 dict *d = dictCreate(&setDictType,NULL);
ed9b544e 2208 return createObject(REDIS_SET,d);
2209}
2210
1812e024 2211static robj *createZsetObject(void) {
6b47e12e 2212 zset *zs = zmalloc(sizeof(*zs));
2213
2214 zs->dict = dictCreate(&zsetDictType,NULL);
2215 zs->zsl = zslCreate();
2216 return createObject(REDIS_ZSET,zs);
1812e024 2217}
2218
ed9b544e 2219static void freeStringObject(robj *o) {
942a3961 2220 if (o->encoding == REDIS_ENCODING_RAW) {
2221 sdsfree(o->ptr);
2222 }
ed9b544e 2223}
2224
2225static void freeListObject(robj *o) {
2226 listRelease((list*) o->ptr);
2227}
2228
2229static void freeSetObject(robj *o) {
2230 dictRelease((dict*) o->ptr);
2231}
2232
fd8ccf44 2233static void freeZsetObject(robj *o) {
2234 zset *zs = o->ptr;
2235
2236 dictRelease(zs->dict);
2237 zslFree(zs->zsl);
2238 zfree(zs);
2239}
2240
ed9b544e 2241static void freeHashObject(robj *o) {
2242 dictRelease((dict*) o->ptr);
2243}
2244
2245static void incrRefCount(robj *o) {
2246 o->refcount++;
94754ccc 2247#ifdef DEBUG_REFCOUNT
2248 if (o->type == REDIS_STRING)
2249 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2250#endif
ed9b544e 2251}
2252
2253static void decrRefCount(void *obj) {
2254 robj *o = obj;
94754ccc 2255
2256#ifdef DEBUG_REFCOUNT
2257 if (o->type == REDIS_STRING)
2258 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2259#endif
ed9b544e 2260 if (--(o->refcount) == 0) {
2261 switch(o->type) {
2262 case REDIS_STRING: freeStringObject(o); break;
2263 case REDIS_LIST: freeListObject(o); break;
2264 case REDIS_SET: freeSetObject(o); break;
fd8ccf44 2265 case REDIS_ZSET: freeZsetObject(o); break;
ed9b544e 2266 case REDIS_HASH: freeHashObject(o); break;
dfc5e96c 2267 default: redisAssert(0 != 0); break;
ed9b544e 2268 }
2269 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2270 !listAddNodeHead(server.objfreelist,o))
2271 zfree(o);
2272 }
2273}
2274
942a3961 2275static robj *lookupKey(redisDb *db, robj *key) {
2276 dictEntry *de = dictFind(db->dict,key);
2277 return de ? dictGetEntryVal(de) : NULL;
2278}
2279
2280static robj *lookupKeyRead(redisDb *db, robj *key) {
2281 expireIfNeeded(db,key);
2282 return lookupKey(db,key);
2283}
2284
2285static robj *lookupKeyWrite(redisDb *db, robj *key) {
2286 deleteIfVolatile(db,key);
2287 return lookupKey(db,key);
2288}
2289
2290static int deleteKey(redisDb *db, robj *key) {
2291 int retval;
2292
2293 /* We need to protect key from destruction: after the first dictDelete()
2294 * it may happen that 'key' is no longer valid if we don't increment
2295 * it's count. This may happen when we get the object reference directly
2296 * from the hash table with dictRandomKey() or dict iterators */
2297 incrRefCount(key);
2298 if (dictSize(db->expires)) dictDelete(db->expires,key);
2299 retval = dictDelete(db->dict,key);
2300 decrRefCount(key);
2301
2302 return retval == DICT_OK;
2303}
2304
10c43610 2305/* Try to share an object against the shared objects pool */
2306static robj *tryObjectSharing(robj *o) {
2307 struct dictEntry *de;
2308 unsigned long c;
2309
3305306f 2310 if (o == NULL || server.shareobjects == 0) return o;
10c43610 2311
dfc5e96c 2312 redisAssert(o->type == REDIS_STRING);
10c43610 2313 de = dictFind(server.sharingpool,o);
2314 if (de) {
2315 robj *shared = dictGetEntryKey(de);
2316
2317 c = ((unsigned long) dictGetEntryVal(de))+1;
2318 dictGetEntryVal(de) = (void*) c;
2319 incrRefCount(shared);
2320 decrRefCount(o);
2321 return shared;
2322 } else {
2323 /* Here we are using a stream algorihtm: Every time an object is
2324 * shared we increment its count, everytime there is a miss we
2325 * recrement the counter of a random object. If this object reaches
2326 * zero we remove the object and put the current object instead. */
3305306f 2327 if (dictSize(server.sharingpool) >=
10c43610 2328 server.sharingpoolsize) {
2329 de = dictGetRandomKey(server.sharingpool);
dfc5e96c 2330 redisAssert(de != NULL);
10c43610 2331 c = ((unsigned long) dictGetEntryVal(de))-1;
2332 dictGetEntryVal(de) = (void*) c;
2333 if (c == 0) {
2334 dictDelete(server.sharingpool,de->key);
2335 }
2336 } else {
2337 c = 0; /* If the pool is empty we want to add this object */
2338 }
2339 if (c == 0) {
2340 int retval;
2341
2342 retval = dictAdd(server.sharingpool,o,(void*)1);
dfc5e96c 2343 redisAssert(retval == DICT_OK);
10c43610 2344 incrRefCount(o);
2345 }
2346 return o;
2347 }
2348}
2349
724a51b1 2350/* Check if the nul-terminated string 's' can be represented by a long
2351 * (that is, is a number that fits into long without any other space or
2352 * character before or after the digits).
2353 *
2354 * If so, the function returns REDIS_OK and *longval is set to the value
2355 * of the number. Otherwise REDIS_ERR is returned */
f69f2cba 2356static int isStringRepresentableAsLong(sds s, long *longval) {
724a51b1 2357 char buf[32], *endptr;
2358 long value;
2359 int slen;
2360
2361 value = strtol(s, &endptr, 10);
2362 if (endptr[0] != '\0') return REDIS_ERR;
2363 slen = snprintf(buf,32,"%ld",value);
2364
2365 /* If the number converted back into a string is not identical
2366 * then it's not possible to encode the string as integer */
f69f2cba 2367 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
724a51b1 2368 if (longval) *longval = value;
2369 return REDIS_OK;
2370}
2371
942a3961 2372/* Try to encode a string object in order to save space */
2373static int tryObjectEncoding(robj *o) {
2374 long value;
942a3961 2375 sds s = o->ptr;
3305306f 2376
942a3961 2377 if (o->encoding != REDIS_ENCODING_RAW)
2378 return REDIS_ERR; /* Already encoded */
3305306f 2379
942a3961 2380 /* It's not save to encode shared objects: shared objects can be shared
2381 * everywhere in the "object space" of Redis. Encoded objects can only
2382 * appear as "values" (and not, for instance, as keys) */
2383 if (o->refcount > 1) return REDIS_ERR;
3305306f 2384
942a3961 2385 /* Currently we try to encode only strings */
dfc5e96c 2386 redisAssert(o->type == REDIS_STRING);
94754ccc 2387
724a51b1 2388 /* Check if we can represent this string as a long integer */
2389 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
942a3961 2390
2391 /* Ok, this object can be encoded */
2392 o->encoding = REDIS_ENCODING_INT;
2393 sdsfree(o->ptr);
2394 o->ptr = (void*) value;
2395 return REDIS_OK;
2396}
2397
9d65a1bb 2398/* Get a decoded version of an encoded object (returned as a new object).
2399 * If the object is already raw-encoded just increment the ref count. */
2400static robj *getDecodedObject(robj *o) {
942a3961 2401 robj *dec;
2402
9d65a1bb 2403 if (o->encoding == REDIS_ENCODING_RAW) {
2404 incrRefCount(o);
2405 return o;
2406 }
942a3961 2407 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2408 char buf[32];
2409
2410 snprintf(buf,32,"%ld",(long)o->ptr);
2411 dec = createStringObject(buf,strlen(buf));
2412 return dec;
2413 } else {
dfc5e96c 2414 redisAssert(1 != 1);
942a3961 2415 }
3305306f 2416}
2417
d7f43c08 2418/* Compare two string objects via strcmp() or alike.
2419 * Note that the objects may be integer-encoded. In such a case we
2420 * use snprintf() to get a string representation of the numbers on the stack
1fd9bc8a 2421 * and compare the strings, it's much faster than calling getDecodedObject().
2422 *
2423 * Important note: if objects are not integer encoded, but binary-safe strings,
2424 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2425 * binary safe. */
724a51b1 2426static int compareStringObjects(robj *a, robj *b) {
dfc5e96c 2427 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
d7f43c08 2428 char bufa[128], bufb[128], *astr, *bstr;
2429 int bothsds = 1;
724a51b1 2430
e197b441 2431 if (a == b) return 0;
d7f43c08 2432 if (a->encoding != REDIS_ENCODING_RAW) {
2433 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2434 astr = bufa;
2435 bothsds = 0;
724a51b1 2436 } else {
d7f43c08 2437 astr = a->ptr;
724a51b1 2438 }
d7f43c08 2439 if (b->encoding != REDIS_ENCODING_RAW) {
2440 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2441 bstr = bufb;
2442 bothsds = 0;
2443 } else {
2444 bstr = b->ptr;
2445 }
2446 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
724a51b1 2447}
2448
0ea663ea 2449static size_t stringObjectLen(robj *o) {
dfc5e96c 2450 redisAssert(o->type == REDIS_STRING);
0ea663ea 2451 if (o->encoding == REDIS_ENCODING_RAW) {
2452 return sdslen(o->ptr);
2453 } else {
2454 char buf[32];
2455
2456 return snprintf(buf,32,"%ld",(long)o->ptr);
2457 }
2458}
2459
ed9b544e 2460/*============================ DB saving/loading ============================ */
2461
/* Write the one-byte opcode/type 'type'. Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2466
/* Write 't' truncated to a 32 bit signed integer, in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t seconds = (int32_t) t;

    if (fwrite(&seconds,sizeof(seconds),1,fp) != 1) return -1;
    return 0;
}
2472
e3566d4b 2473/* check rdbLoadLen() comments for more info */
f78fd11b 2474static int rdbSaveLen(FILE *fp, uint32_t len) {
2475 unsigned char buf[2];
2476
2477 if (len < (1<<6)) {
2478 /* Save a 6 bit len */
10c43610 2479 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
f78fd11b 2480 if (fwrite(buf,1,1,fp) == 0) return -1;
2481 } else if (len < (1<<14)) {
2482 /* Save a 14 bit len */
10c43610 2483 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
f78fd11b 2484 buf[1] = len&0xFF;
17be1a4a 2485 if (fwrite(buf,2,1,fp) == 0) return -1;
f78fd11b 2486 } else {
2487 /* Save a 32 bit len */
10c43610 2488 buf[0] = (REDIS_RDB_32BITLEN<<6);
f78fd11b 2489 if (fwrite(buf,1,1,fp) == 0) return -1;
2490 len = htonl(len);
2491 if (fwrite(&len,4,1,fp) == 0) return -1;
2492 }
2493 return 0;
2494}
2495
e3566d4b 2496/* String objects in the form "2391" "-100" without any space and with a
2497 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2498 * encoded as integers to save space */
56906eef 2499static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
e3566d4b 2500 long long value;
2501 char *endptr, buf[32];
2502
2503 /* Check if it's possible to encode this value as a number */
2504 value = strtoll(s, &endptr, 10);
2505 if (endptr[0] != '\0') return 0;
2506 snprintf(buf,32,"%lld",value);
2507
2508 /* If the number converted back into a string is not identical
2509 * then it's not possible to encode the string as integer */
2510 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2511
2512 /* Finally check if it fits in our ranges */
2513 if (value >= -(1<<7) && value <= (1<<7)-1) {
2514 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2515 enc[1] = value&0xFF;
2516 return 2;
2517 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2518 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2519 enc[1] = value&0xFF;
2520 enc[2] = (value>>8)&0xFF;
2521 return 3;
2522 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2523 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2524 enc[1] = value&0xFF;
2525 enc[2] = (value>>8)&0xFF;
2526 enc[3] = (value>>16)&0xFF;
2527 enc[4] = (value>>24)&0xFF;
2528 return 5;
2529 } else {
2530 return 0;
2531 }
2532}
2533
774e3047 2534static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2535 unsigned int comprlen, outlen;
2536 unsigned char byte;
2537 void *out;
2538
2539 /* We require at least four bytes compression for this to be worth it */
2540 outlen = sdslen(obj->ptr)-4;
2541 if (outlen <= 0) return 0;
3a2694c4 2542 if ((out = zmalloc(outlen+1)) == NULL) return 0;
774e3047 2543 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2544 if (comprlen == 0) {
88e85998 2545 zfree(out);
774e3047 2546 return 0;
2547 }
2548 /* Data compressed! Let's save it on disk */
2549 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2550 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2551 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2552 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2553 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
88e85998 2554 zfree(out);
774e3047 2555 return comprlen;
2556
2557writeerr:
88e85998 2558 zfree(out);
774e3047 2559 return -1;
2560}
2561
e3566d4b 2562/* Save a string objet as [len][data] on disk. If the object is a string
2563 * representation of an integer value we try to safe it in a special form */
942a3961 2564static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2565 size_t len;
e3566d4b 2566 int enclen;
10c43610 2567
942a3961 2568 len = sdslen(obj->ptr);
2569
774e3047 2570 /* Try integer encoding */
e3566d4b 2571 if (len <= 11) {
2572 unsigned char buf[5];
2573 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2574 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2575 return 0;
2576 }
2577 }
774e3047 2578
2579 /* Try LZF compression - under 20 bytes it's unable to compress even
88e85998 2580 * aaaaaaaaaaaaaaaaaa so skip it */
121f70cf 2581 if (server.rdbcompression && len > 20) {
774e3047 2582 int retval;
2583
2584 retval = rdbSaveLzfStringObject(fp,obj);
2585 if (retval == -1) return -1;
2586 if (retval > 0) return 0;
2587 /* retval == 0 means data can't be compressed, save the old way */
2588 }
2589
2590 /* Store verbatim */
10c43610 2591 if (rdbSaveLen(fp,len) == -1) return -1;
2592 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2593 return 0;
2594}
2595
942a3961 2596/* Like rdbSaveStringObjectRaw() but handle encoded objects */
2597static int rdbSaveStringObject(FILE *fp, robj *obj) {
2598 int retval;
942a3961 2599
9d65a1bb 2600 obj = getDecodedObject(obj);
2601 retval = rdbSaveStringObjectRaw(fp,obj);
2602 decrRefCount(obj);
2603 return retval;
942a3961 2604}
2605
/* Save a double value. Finite doubles are saved as their "%.17g" string
 * representation prefixed by an unsigned 8 bit length. Special values are
 * saved as a single byte instead:
 *   253: not a number
 *   254: + inf
 *   255: - inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1);
        len += buf[0];
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2632
ed9b544e 2633/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
f78fd11b 2634static int rdbSave(char *filename) {
ed9b544e 2635 dictIterator *di = NULL;
2636 dictEntry *de;
ed9b544e 2637 FILE *fp;
2638 char tmpfile[256];
2639 int j;
bb32ede5 2640 time_t now = time(NULL);
ed9b544e 2641
a3b21203 2642 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
ed9b544e 2643 fp = fopen(tmpfile,"w");
2644 if (!fp) {
2645 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2646 return REDIS_ERR;
2647 }
f78fd11b 2648 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
ed9b544e 2649 for (j = 0; j < server.dbnum; j++) {
bb32ede5 2650 redisDb *db = server.db+j;
2651 dict *d = db->dict;
3305306f 2652 if (dictSize(d) == 0) continue;
ed9b544e 2653 di = dictGetIterator(d);
2654 if (!di) {
2655 fclose(fp);
2656 return REDIS_ERR;
2657 }
2658
2659 /* Write the SELECT DB opcode */
f78fd11b 2660 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2661 if (rdbSaveLen(fp,j) == -1) goto werr;
ed9b544e 2662
2663 /* Iterate this DB writing every entry */
2664 while((de = dictNext(di)) != NULL) {
2665 robj *key = dictGetEntryKey(de);
2666 robj *o = dictGetEntryVal(de);
bb32ede5 2667 time_t expiretime = getExpire(db,key);
2668
2669 /* Save the expire time */
2670 if (expiretime != -1) {
2671 /* If this key is already expired skip it */
2672 if (expiretime < now) continue;
2673 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2674 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2675 }
2676 /* Save the key and associated value */
f78fd11b 2677 if (rdbSaveType(fp,o->type) == -1) goto werr;
10c43610 2678 if (rdbSaveStringObject(fp,key) == -1) goto werr;
f78fd11b 2679 if (o->type == REDIS_STRING) {
ed9b544e 2680 /* Save a string value */
10c43610 2681 if (rdbSaveStringObject(fp,o) == -1) goto werr;
f78fd11b 2682 } else if (o->type == REDIS_LIST) {
ed9b544e 2683 /* Save a list value */
2684 list *list = o->ptr;
6208b3a7 2685 listNode *ln;
ed9b544e 2686
6208b3a7 2687 listRewind(list);
f78fd11b 2688 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
6208b3a7 2689 while((ln = listYield(list))) {
ed9b544e 2690 robj *eleobj = listNodeValue(ln);
f78fd11b 2691
10c43610 2692 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 2693 }
f78fd11b 2694 } else if (o->type == REDIS_SET) {
ed9b544e 2695 /* Save a set value */
2696 dict *set = o->ptr;
2697 dictIterator *di = dictGetIterator(set);
2698 dictEntry *de;
2699
3305306f 2700 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
ed9b544e 2701 while((de = dictNext(di)) != NULL) {
10c43610 2702 robj *eleobj = dictGetEntryKey(de);
ed9b544e 2703
10c43610 2704 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 2705 }
2706 dictReleaseIterator(di);
2b59cfdf 2707 } else if (o->type == REDIS_ZSET) {
2708 /* Save a set value */
2709 zset *zs = o->ptr;
2710 dictIterator *di = dictGetIterator(zs->dict);
2711 dictEntry *de;
2712
2713 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2714 while((de = dictNext(di)) != NULL) {
2715 robj *eleobj = dictGetEntryKey(de);
2716 double *score = dictGetEntryVal(de);
2717
2718 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2719 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2720 }
2721 dictReleaseIterator(di);
ed9b544e 2722 } else {
dfc5e96c 2723 redisAssert(0 != 0);
ed9b544e 2724 }
2725 }
2726 dictReleaseIterator(di);
2727 }
2728 /* EOF opcode */
f78fd11b 2729 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2730
2731 /* Make sure data will not remain on the OS's output buffers */
ed9b544e 2732 fflush(fp);
2733 fsync(fileno(fp));
2734 fclose(fp);
2735
2736 /* Use RENAME to make sure the DB file is changed atomically only
2737 * if the generate DB file is ok. */
2738 if (rename(tmpfile,filename) == -1) {
325d1eb4 2739 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
ed9b544e 2740 unlink(tmpfile);
2741 return REDIS_ERR;
2742 }
2743 redisLog(REDIS_NOTICE,"DB saved on disk");
2744 server.dirty = 0;
2745 server.lastsave = time(NULL);
2746 return REDIS_OK;
2747
2748werr:
2749 fclose(fp);
2750 unlink(tmpfile);
2751 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2752 if (di) dictReleaseIterator(di);
2753 return REDIS_ERR;
2754}
2755
f78fd11b 2756static int rdbSaveBackground(char *filename) {
ed9b544e 2757 pid_t childpid;
2758
9d65a1bb 2759 if (server.bgsavechildpid != -1) return REDIS_ERR;
ed9b544e 2760 if ((childpid = fork()) == 0) {
2761 /* Child */
2762 close(server.fd);
f78fd11b 2763 if (rdbSave(filename) == REDIS_OK) {
ed9b544e 2764 exit(0);
2765 } else {
2766 exit(1);
2767 }
2768 } else {
2769 /* Parent */
5a7c647e 2770 if (childpid == -1) {
2771 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2772 strerror(errno));
2773 return REDIS_ERR;
2774 }
ed9b544e 2775 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
9f3c422c 2776 server.bgsavechildpid = childpid;
ed9b544e 2777 return REDIS_OK;
2778 }
2779 return REDIS_OK; /* unreached */
2780}
2781
/* Delete "temp-<pid>.rdb", the temporary dump file written by the save
 * process with the given pid (see rdbSave()). Errors from unlink() are
 * ignored, e.g. when the file was never created. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int)childpid);
    unlink(path);
}
2788
/* Read a one byte type/opcode. Returns its value (0..255), or -1 on EOF or
 * read error. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 0) ? -1 : (int)type;
}
2794
/* Read a timestamp stored as a 32 bit signed integer in host byte order and
 * widen it to time_t. Returns -1 on EOF or read error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stored;

    if (fread(&stored,sizeof(stored),1,fp) != 1) return -1;
    return (time_t) stored;
}
2800
e3566d4b 2801/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2802 * of this file for a description of how this are stored on disk.
2803 *
2804 * isencoded is set to 1 if the readed length is not actually a length but
2805 * an "encoding type", check the above comments for more info */
2806static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
f78fd11b 2807 unsigned char buf[2];
2808 uint32_t len;
2809
e3566d4b 2810 if (isencoded) *isencoded = 0;
f78fd11b 2811 if (rdbver == 0) {
2812 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2813 return ntohl(len);
2814 } else {
17be1a4a 2815 int type;
2816
f78fd11b 2817 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
17be1a4a 2818 type = (buf[0]&0xC0)>>6;
2819 if (type == REDIS_RDB_6BITLEN) {
f78fd11b 2820 /* Read a 6 bit len */
e3566d4b 2821 return buf[0]&0x3F;
2822 } else if (type == REDIS_RDB_ENCVAL) {
2823 /* Read a 6 bit len encoding type */
2824 if (isencoded) *isencoded = 1;
2825 return buf[0]&0x3F;
17be1a4a 2826 } else if (type == REDIS_RDB_14BITLEN) {
f78fd11b 2827 /* Read a 14 bit len */
2828 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2829 return ((buf[0]&0x3F)<<8)|buf[1];
2830 } else {
2831 /* Read a 32 bit len */
2832 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2833 return ntohl(len);
2834 }
2835 }
f78fd11b 2836}
2837
e3566d4b 2838static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2839 unsigned char enc[4];
2840 long long val;
2841
2842 if (enctype == REDIS_RDB_ENC_INT8) {
2843 if (fread(enc,1,1,fp) == 0) return NULL;
2844 val = (signed char)enc[0];
2845 } else if (enctype == REDIS_RDB_ENC_INT16) {
2846 uint16_t v;
2847 if (fread(enc,2,1,fp) == 0) return NULL;
2848 v = enc[0]|(enc[1]<<8);
2849 val = (int16_t)v;
2850 } else if (enctype == REDIS_RDB_ENC_INT32) {
2851 uint32_t v;
2852 if (fread(enc,4,1,fp) == 0) return NULL;
2853 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2854 val = (int32_t)v;
2855 } else {
2856 val = 0; /* anti-warning */
dfc5e96c 2857 redisAssert(0!=0);
e3566d4b 2858 }
2859 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2860}
2861
88e85998 2862static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2863 unsigned int len, clen;
2864 unsigned char *c = NULL;
2865 sds val = NULL;
2866
2867 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2868 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2869 if ((c = zmalloc(clen)) == NULL) goto err;
2870 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2871 if (fread(c,clen,1,fp) == 0) goto err;
2872 if (lzf_decompress(c,clen,val,len) == 0) goto err;
5109cdff 2873 zfree(c);
88e85998 2874 return createObject(REDIS_STRING,val);
2875err:
2876 zfree(c);
2877 sdsfree(val);
2878 return NULL;
2879}
2880
e3566d4b 2881static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2882 int isencoded;
2883 uint32_t len;
f78fd11b 2884 sds val;
2885
e3566d4b 2886 len = rdbLoadLen(fp,rdbver,&isencoded);
2887 if (isencoded) {
2888 switch(len) {
2889 case REDIS_RDB_ENC_INT8:
2890 case REDIS_RDB_ENC_INT16:
2891 case REDIS_RDB_ENC_INT32:
3305306f 2892 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
88e85998 2893 case REDIS_RDB_ENC_LZF:
2894 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
e3566d4b 2895 default:
dfc5e96c 2896 redisAssert(0!=0);
e3566d4b 2897 }
2898 }
2899
f78fd11b 2900 if (len == REDIS_RDB_LENERR) return NULL;
2901 val = sdsnewlen(NULL,len);
2902 if (len && fread(val,len,1,fp) == 0) {
2903 sdsfree(val);
2904 return NULL;
2905 }
10c43610 2906 return tryObjectSharing(createObject(REDIS_STRING,val));
f78fd11b 2907}
2908
a7866db6 2909/* For information about double serialization check rdbSaveDoubleValue() */
2910static int rdbLoadDoubleValue(FILE *fp, double *val) {
2911 char buf[128];
2912 unsigned char len;
2913
2914 if (fread(&len,1,1,fp) == 0) return -1;
2915 switch(len) {
2916 case 255: *val = R_NegInf; return 0;
2917 case 254: *val = R_PosInf; return 0;
2918 case 253: *val = R_Nan; return 0;
2919 default:
2920 if (fread(buf,len,1,fp) == 0) return -1;
231d758e 2921 buf[len] = '\0';
a7866db6 2922 sscanf(buf, "%lg", val);
2923 return 0;
2924 }
2925}
2926
f78fd11b 2927static int rdbLoad(char *filename) {
ed9b544e 2928 FILE *fp;
f78fd11b 2929 robj *keyobj = NULL;
2930 uint32_t dbid;
bb32ede5 2931 int type, retval, rdbver;
3305306f 2932 dict *d = server.db[0].dict;
bb32ede5 2933 redisDb *db = server.db+0;
f78fd11b 2934 char buf[1024];
bb32ede5 2935 time_t expiretime = -1, now = time(NULL);
2936
ed9b544e 2937 fp = fopen(filename,"r");
2938 if (!fp) return REDIS_ERR;
2939 if (fread(buf,9,1,fp) == 0) goto eoferr;
f78fd11b 2940 buf[9] = '\0';
2941 if (memcmp(buf,"REDIS",5) != 0) {
ed9b544e 2942 fclose(fp);
2943 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2944 return REDIS_ERR;
2945 }
f78fd11b 2946 rdbver = atoi(buf+5);
2947 if (rdbver > 1) {
2948 fclose(fp);
2949 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2950 return REDIS_ERR;
2951 }
ed9b544e 2952 while(1) {
2953 robj *o;
2954
2955 /* Read type. */
f78fd11b 2956 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
bb32ede5 2957 if (type == REDIS_EXPIRETIME) {
2958 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2959 /* We read the time so we need to read the object type again */
2960 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2961 }
ed9b544e 2962 if (type == REDIS_EOF) break;
2963 /* Handle SELECT DB opcode as a special case */
2964 if (type == REDIS_SELECTDB) {
e3566d4b 2965 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2966 goto eoferr;
ed9b544e 2967 if (dbid >= (unsigned)server.dbnum) {
f78fd11b 2968 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
ed9b544e 2969 exit(1);
2970 }
bb32ede5 2971 db = server.db+dbid;
2972 d = db->dict;
ed9b544e 2973 continue;
2974 }
2975 /* Read key */
f78fd11b 2976 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
ed9b544e 2977
2978 if (type == REDIS_STRING) {
2979 /* Read string value */
f78fd11b 2980 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
942a3961 2981 tryObjectEncoding(o);
ed9b544e 2982 } else if (type == REDIS_LIST || type == REDIS_SET) {
2983 /* Read list/set value */
2984 uint32_t listlen;
f78fd11b 2985
e3566d4b 2986 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
f78fd11b 2987 goto eoferr;
ed9b544e 2988 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2989 /* Load every single element of the list/set */
2990 while(listlen--) {
2991 robj *ele;
2992
f78fd11b 2993 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
942a3961 2994 tryObjectEncoding(ele);
ed9b544e 2995 if (type == REDIS_LIST) {
6b47e12e 2996 listAddNodeTail((list*)o->ptr,ele);
ed9b544e 2997 } else {
6b47e12e 2998 dictAdd((dict*)o->ptr,ele,NULL);
ed9b544e 2999 }
ed9b544e 3000 }
2b59cfdf 3001 } else if (type == REDIS_ZSET) {
3002 /* Read list/set value */
3003 uint32_t zsetlen;
3004 zset *zs;
3005
3006 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
3007 goto eoferr;
3008 o = createZsetObject();
3009 zs = o->ptr;
3010 /* Load every single element of the list/set */
3011 while(zsetlen--) {
3012 robj *ele;
3013 double *score = zmalloc(sizeof(double));
3014
3015 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3016 tryObjectEncoding(ele);
3017 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
3018 dictAdd(zs->dict,ele,score);
3019 zslInsert(zs->zsl,*score,ele);
3020 incrRefCount(ele); /* added to skiplist */
3021 }
ed9b544e 3022 } else {
dfc5e96c 3023 redisAssert(0 != 0);
ed9b544e 3024 }
3025 /* Add the new object in the hash table */
f78fd11b 3026 retval = dictAdd(d,keyobj,o);
ed9b544e 3027 if (retval == DICT_ERR) {
f78fd11b 3028 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
ed9b544e 3029 exit(1);
3030 }
bb32ede5 3031 /* Set the expire time if needed */
3032 if (expiretime != -1) {
3033 setExpire(db,keyobj,expiretime);
3034 /* Delete this key if already expired */
3035 if (expiretime < now) deleteKey(db,keyobj);
3036 expiretime = -1;
3037 }
f78fd11b 3038 keyobj = o = NULL;
ed9b544e 3039 }
3040 fclose(fp);
3041 return REDIS_OK;
3042
3043eoferr: /* unexpected end of file is handled here with a fatal exit */
e3566d4b 3044 if (keyobj) decrRefCount(keyobj);
f80dff62 3045 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
ed9b544e 3046 exit(1);
3047 return REDIS_ERR; /* Just to avoid warning */
3048}
3049
3050/*================================== Commands =============================== */
3051
abcb223e 3052static void authCommand(redisClient *c) {
2e77c2ee 3053 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
abcb223e
BH
3054 c->authenticated = 1;
3055 addReply(c,shared.ok);
3056 } else {
3057 c->authenticated = 0;
fa4c0aba 3058 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
abcb223e
BH
3059 }
3060}
3061
ed9b544e 3062static void pingCommand(redisClient *c) {
3063 addReply(c,shared.pong);
3064}
3065
3066static void echoCommand(redisClient *c) {
942a3961 3067 addReplyBulkLen(c,c->argv[1]);
ed9b544e 3068 addReply(c,c->argv[1]);
3069 addReply(c,shared.crlf);
3070}
3071
3072/*=================================== Strings =============================== */
3073
3074static void setGenericCommand(redisClient *c, int nx) {
3075 int retval;
3076
333fd216 3077 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3305306f 3078 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 3079 if (retval == DICT_ERR) {
3080 if (!nx) {
3305306f 3081 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 3082 incrRefCount(c->argv[2]);
3083 } else {
c937aa89 3084 addReply(c,shared.czero);
ed9b544e 3085 return;
3086 }
3087 } else {
3088 incrRefCount(c->argv[1]);
3089 incrRefCount(c->argv[2]);
3090 }
3091 server.dirty++;
3305306f 3092 removeExpire(c->db,c->argv[1]);
c937aa89 3093 addReply(c, nx ? shared.cone : shared.ok);
ed9b544e 3094}
3095
3096static void setCommand(redisClient *c) {
a4d1ba9a 3097 setGenericCommand(c,0);
ed9b544e 3098}
3099
3100static void setnxCommand(redisClient *c) {
a4d1ba9a 3101 setGenericCommand(c,1);
ed9b544e 3102}
3103
322fc7d8 3104static int getGenericCommand(redisClient *c) {
3305306f 3105 robj *o = lookupKeyRead(c->db,c->argv[1]);
3106
3107 if (o == NULL) {
c937aa89 3108 addReply(c,shared.nullbulk);
322fc7d8 3109 return REDIS_OK;
ed9b544e 3110 } else {
ed9b544e 3111 if (o->type != REDIS_STRING) {
c937aa89 3112 addReply(c,shared.wrongtypeerr);
322fc7d8 3113 return REDIS_ERR;
ed9b544e 3114 } else {
942a3961 3115 addReplyBulkLen(c,o);
ed9b544e 3116 addReply(c,o);
3117 addReply(c,shared.crlf);
322fc7d8 3118 return REDIS_OK;
ed9b544e 3119 }
3120 }
3121}
3122
322fc7d8 3123static void getCommand(redisClient *c) {
3124 getGenericCommand(c);
3125}
3126
f6b141c5 3127static void getsetCommand(redisClient *c) {
322fc7d8 3128 if (getGenericCommand(c) == REDIS_ERR) return;
a431eb74 3129 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3130 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3131 } else {
3132 incrRefCount(c->argv[1]);
3133 }
3134 incrRefCount(c->argv[2]);
3135 server.dirty++;
3136 removeExpire(c->db,c->argv[1]);
3137}
3138
70003d28 3139static void mgetCommand(redisClient *c) {
70003d28 3140 int j;
3141
c937aa89 3142 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
70003d28 3143 for (j = 1; j < c->argc; j++) {
3305306f 3144 robj *o = lookupKeyRead(c->db,c->argv[j]);
3145 if (o == NULL) {
c937aa89 3146 addReply(c,shared.nullbulk);
70003d28 3147 } else {
70003d28 3148 if (o->type != REDIS_STRING) {
c937aa89 3149 addReply(c,shared.nullbulk);
70003d28 3150 } else {
942a3961 3151 addReplyBulkLen(c,o);
70003d28 3152 addReply(c,o);
3153 addReply(c,shared.crlf);
3154 }
3155 }
3156 }
3157}
3158
6c446631 3159static void msetGenericCommand(redisClient *c, int nx) {
906573e7 3160 int j, busykeys = 0;
6c446631 3161
3162 if ((c->argc % 2) == 0) {
454d4e43 3163 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
6c446631 3164 return;
3165 }
3166 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3167 * set nothing at all if at least one already key exists. */
3168 if (nx) {
3169 for (j = 1; j < c->argc; j += 2) {
906573e7 3170 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3171 busykeys++;
6c446631 3172 }
3173 }
3174 }
906573e7 3175 if (busykeys) {
3176 addReply(c, shared.czero);
3177 return;
3178 }
6c446631 3179
3180 for (j = 1; j < c->argc; j += 2) {
3181 int retval;
3182
17511391 3183 tryObjectEncoding(c->argv[j+1]);
6c446631 3184 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3185 if (retval == DICT_ERR) {
3186 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3187 incrRefCount(c->argv[j+1]);
3188 } else {
3189 incrRefCount(c->argv[j]);
3190 incrRefCount(c->argv[j+1]);
3191 }
3192 removeExpire(c->db,c->argv[j]);
3193 }
3194 server.dirty += (c->argc-1)/2;
3195 addReply(c, nx ? shared.cone : shared.ok);
3196}
3197
3198static void msetCommand(redisClient *c) {
3199 msetGenericCommand(c,0);
3200}
3201
3202static void msetnxCommand(redisClient *c) {
3203 msetGenericCommand(c,1);
3204}
3205
d68ed120 3206static void incrDecrCommand(redisClient *c, long long incr) {
ed9b544e 3207 long long value;
3208 int retval;
3209 robj *o;
3210
3305306f 3211 o = lookupKeyWrite(c->db,c->argv[1]);
3212 if (o == NULL) {
ed9b544e 3213 value = 0;
3214 } else {
ed9b544e 3215 if (o->type != REDIS_STRING) {
3216 value = 0;
3217 } else {
3218 char *eptr;
3219
942a3961 3220 if (o->encoding == REDIS_ENCODING_RAW)
3221 value = strtoll(o->ptr, &eptr, 10);
3222 else if (o->encoding == REDIS_ENCODING_INT)
3223 value = (long)o->ptr;
3224 else
dfc5e96c 3225 redisAssert(1 != 1);
ed9b544e 3226 }
3227 }
3228
3229 value += incr;
3230 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
942a3961 3231 tryObjectEncoding(o);
3305306f 3232 retval = dictAdd(c->db->dict,c->argv[1],o);
ed9b544e 3233 if (retval == DICT_ERR) {
3305306f 3234 dictReplace(c->db->dict,c->argv[1],o);
3235 removeExpire(c->db,c->argv[1]);
ed9b544e 3236 } else {
3237 incrRefCount(c->argv[1]);
3238 }
3239 server.dirty++;
c937aa89 3240 addReply(c,shared.colon);
ed9b544e 3241 addReply(c,o);
3242 addReply(c,shared.crlf);
3243}
3244
3245static void incrCommand(redisClient *c) {
a4d1ba9a 3246 incrDecrCommand(c,1);
ed9b544e 3247}
3248
3249static void decrCommand(redisClient *c) {
a4d1ba9a 3250 incrDecrCommand(c,-1);
ed9b544e 3251}
3252
3253static void incrbyCommand(redisClient *c) {
d68ed120 3254 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 3255 incrDecrCommand(c,incr);
ed9b544e 3256}
3257
3258static void decrbyCommand(redisClient *c) {
d68ed120 3259 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 3260 incrDecrCommand(c,-incr);
ed9b544e 3261}
3262
3263/* ========================= Type agnostic commands ========================= */
3264
3265static void delCommand(redisClient *c) {
5109cdff 3266 int deleted = 0, j;
3267
3268 for (j = 1; j < c->argc; j++) {
3269 if (deleteKey(c->db,c->argv[j])) {
3270 server.dirty++;
3271 deleted++;
3272 }
3273 }
3274 switch(deleted) {
3275 case 0:
c937aa89 3276 addReply(c,shared.czero);
5109cdff 3277 break;
3278 case 1:
3279 addReply(c,shared.cone);
3280 break;
3281 default:
3282 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3283 break;
ed9b544e 3284 }
3285}
3286
3287static void existsCommand(redisClient *c) {
3305306f 3288 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
ed9b544e 3289}
3290
3291static void selectCommand(redisClient *c) {
3292 int id = atoi(c->argv[1]->ptr);
3293
3294 if (selectDb(c,id) == REDIS_ERR) {
774e3047 3295 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
ed9b544e 3296 } else {
3297 addReply(c,shared.ok);
3298 }
3299}
3300
3301static void randomkeyCommand(redisClient *c) {
3302 dictEntry *de;
3305306f 3303
3304 while(1) {
3305 de = dictGetRandomKey(c->db->dict);
ce7bef07 3306 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3305306f 3307 }
ed9b544e 3308 if (de == NULL) {
ce7bef07 3309 addReply(c,shared.plus);
ed9b544e 3310 addReply(c,shared.crlf);
3311 } else {
c937aa89 3312 addReply(c,shared.plus);
ed9b544e 3313 addReply(c,dictGetEntryKey(de));
3314 addReply(c,shared.crlf);
3315 }
3316}
3317
3318static void keysCommand(redisClient *c) {
3319 dictIterator *di;
3320 dictEntry *de;
3321 sds pattern = c->argv[1]->ptr;
3322 int plen = sdslen(pattern);
682ac724 3323 unsigned long numkeys = 0, keyslen = 0;
ed9b544e 3324 robj *lenobj = createObject(REDIS_STRING,NULL);
3325
3305306f 3326 di = dictGetIterator(c->db->dict);
ed9b544e 3327 addReply(c,lenobj);
3328 decrRefCount(lenobj);
3329 while((de = dictNext(di)) != NULL) {
3330 robj *keyobj = dictGetEntryKey(de);
3305306f 3331
ed9b544e 3332 sds key = keyobj->ptr;
3333 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3334 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3305306f 3335 if (expireIfNeeded(c->db,keyobj) == 0) {
3336 if (numkeys != 0)
3337 addReply(c,shared.space);
3338 addReply(c,keyobj);
3339 numkeys++;
3340 keyslen += sdslen(key);
3341 }
ed9b544e 3342 }
3343 }
3344 dictReleaseIterator(di);
c937aa89 3345 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
ed9b544e 3346 addReply(c,shared.crlf);
3347}
3348
3349static void dbsizeCommand(redisClient *c) {
3350 addReplySds(c,
3305306f 3351 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
ed9b544e 3352}
3353
3354static void lastsaveCommand(redisClient *c) {
3355 addReplySds(c,
c937aa89 3356 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
ed9b544e 3357}
3358
3359static void typeCommand(redisClient *c) {
3305306f 3360 robj *o;
ed9b544e 3361 char *type;
3305306f 3362
3363 o = lookupKeyRead(c->db,c->argv[1]);
3364 if (o == NULL) {
c937aa89 3365 type = "+none";
ed9b544e 3366 } else {
ed9b544e 3367 switch(o->type) {
c937aa89 3368 case REDIS_STRING: type = "+string"; break;
3369 case REDIS_LIST: type = "+list"; break;
3370 case REDIS_SET: type = "+set"; break;
412a8bce 3371 case REDIS_ZSET: type = "+zset"; break;
ed9b544e 3372 default: type = "unknown"; break;
3373 }
3374 }
3375 addReplySds(c,sdsnew(type));
3376 addReply(c,shared.crlf);
3377}
3378
3379static void saveCommand(redisClient *c) {
9d65a1bb 3380 if (server.bgsavechildpid != -1) {
05557f6d 3381 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3382 return;
3383 }
f78fd11b 3384 if (rdbSave(server.dbfilename) == REDIS_OK) {
ed9b544e 3385 addReply(c,shared.ok);
3386 } else {
3387 addReply(c,shared.err);
3388 }
3389}
3390
3391static void bgsaveCommand(redisClient *c) {
9d65a1bb 3392 if (server.bgsavechildpid != -1) {
ed9b544e 3393 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3394 return;
3395 }
f78fd11b 3396 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
49b99ab4 3397 char *status = "+Background saving started\r\n";
3398 addReplySds(c,sdsnew(status));
ed9b544e 3399 } else {
3400 addReply(c,shared.err);
3401 }
3402}
3403
3404static void shutdownCommand(redisClient *c) {
3405 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
a3b21203 3406 /* Kill the saving child if there is a background saving in progress.
3407 We want to avoid race conditions, for instance our saving child may
3408 overwrite the synchronous saving did by SHUTDOWN. */
9d65a1bb 3409 if (server.bgsavechildpid != -1) {
9f3c422c 3410 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3411 kill(server.bgsavechildpid,SIGKILL);
a3b21203 3412 rdbRemoveTempFile(server.bgsavechildpid);
9f3c422c 3413 }
ac945e2d 3414 if (server.appendonly) {
3415 /* Append only file: fsync() the AOF and exit */
3416 fsync(server.appendfd);
3417 exit(0);
ed9b544e 3418 } else {
ac945e2d 3419 /* Snapshotting. Perform a SYNC SAVE and exit */
3420 if (rdbSave(server.dbfilename) == REDIS_OK) {
3421 if (server.daemonize)
3422 unlink(server.pidfile);
3423 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3424 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3425 exit(0);
3426 } else {
3427 /* Ooops.. error saving! The best we can do is to continue operating.
3428 * Note that if there was a background saving process, in the next
3429 * cron() Redis will be notified that the background saving aborted,
3430 * handling special stuff like slaves pending for synchronization... */
3431 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3432 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3433 }
ed9b544e 3434 }
3435}
3436
3437static void renameGenericCommand(redisClient *c, int nx) {
ed9b544e 3438 robj *o;
3439
3440 /* To use the same key as src and dst is probably an error */
3441 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
c937aa89 3442 addReply(c,shared.sameobjecterr);
ed9b544e 3443 return;
3444 }
3445
3305306f 3446 o = lookupKeyWrite(c->db,c->argv[1]);
3447 if (o == NULL) {
c937aa89 3448 addReply(c,shared.nokeyerr);
ed9b544e 3449 return;
3450 }
ed9b544e 3451 incrRefCount(o);
3305306f 3452 deleteIfVolatile(c->db,c->argv[2]);
3453 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
ed9b544e 3454 if (nx) {
3455 decrRefCount(o);
c937aa89 3456 addReply(c,shared.czero);
ed9b544e 3457 return;
3458 }
3305306f 3459 dictReplace(c->db->dict,c->argv[2],o);
ed9b544e 3460 } else {
3461 incrRefCount(c->argv[2]);
3462 }
3305306f 3463 deleteKey(c->db,c->argv[1]);
ed9b544e 3464 server.dirty++;
c937aa89 3465 addReply(c,nx ? shared.cone : shared.ok);
ed9b544e 3466}
3467
3468static void renameCommand(redisClient *c) {
3469 renameGenericCommand(c,0);
3470}
3471
3472static void renamenxCommand(redisClient *c) {
3473 renameGenericCommand(c,1);
3474}
3475
3476static void moveCommand(redisClient *c) {
3305306f 3477 robj *o;
3478 redisDb *src, *dst;
ed9b544e 3479 int srcid;
3480
3481 /* Obtain source and target DB pointers */
3305306f 3482 src = c->db;
3483 srcid = c->db->id;
ed9b544e 3484 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
c937aa89 3485 addReply(c,shared.outofrangeerr);
ed9b544e 3486 return;
3487 }
3305306f 3488 dst = c->db;
3489 selectDb(c,srcid); /* Back to the source DB */
ed9b544e 3490
3491 /* If the user is moving using as target the same
3492 * DB as the source DB it is probably an error. */
3493 if (src == dst) {
c937aa89 3494 addReply(c,shared.sameobjecterr);
ed9b544e 3495 return;
3496 }
3497
3498 /* Check if the element exists and get a reference */
3305306f 3499 o = lookupKeyWrite(c->db,c->argv[1]);
3500 if (!o) {
c937aa89 3501 addReply(c,shared.czero);
ed9b544e 3502 return;
3503 }
3504
3505 /* Try to add the element to the target DB */
3305306f 3506 deleteIfVolatile(dst,c->argv[1]);
3507 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
c937aa89 3508 addReply(c,shared.czero);
ed9b544e 3509 return;
3510 }
3305306f 3511 incrRefCount(c->argv[1]);
ed9b544e 3512 incrRefCount(o);
3513
3514 /* OK! key moved, free the entry in the source DB */
3305306f 3515 deleteKey(src,c->argv[1]);
ed9b544e 3516 server.dirty++;
c937aa89 3517 addReply(c,shared.cone);
ed9b544e 3518}
3519
3520/* =================================== Lists ================================ */
3521static void pushGenericCommand(redisClient *c, int where) {
3522 robj *lobj;
ed9b544e 3523 list *list;
3305306f 3524
3525 lobj = lookupKeyWrite(c->db,c->argv[1]);
3526 if (lobj == NULL) {
95242ab5 3527 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3528 addReply(c,shared.ok);
3529 return;
3530 }
ed9b544e 3531 lobj = createListObject();
3532 list = lobj->ptr;
3533 if (where == REDIS_HEAD) {
6b47e12e 3534 listAddNodeHead(list,c->argv[2]);
ed9b544e 3535 } else {
6b47e12e 3536 listAddNodeTail(list,c->argv[2]);
ed9b544e 3537 }
3305306f 3538 dictAdd(c->db->dict,c->argv[1],lobj);
ed9b544e 3539 incrRefCount(c->argv[1]);
3540 incrRefCount(c->argv[2]);
3541 } else {
ed9b544e 3542 if (lobj->type != REDIS_LIST) {
3543 addReply(c,shared.wrongtypeerr);
3544 return;
3545 }
95242ab5 3546 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3547 addReply(c,shared.ok);
3548 return;
3549 }
ed9b544e 3550 list = lobj->ptr;
3551 if (where == REDIS_HEAD) {
6b47e12e 3552 listAddNodeHead(list,c->argv[2]);
ed9b544e 3553 } else {
6b47e12e 3554 listAddNodeTail(list,c->argv[2]);
ed9b544e 3555 }
3556 incrRefCount(c->argv[2]);
3557 }
3558 server.dirty++;
3559 addReply(c,shared.ok);
3560}
3561
3562static void lpushCommand(redisClient *c) {
3563 pushGenericCommand(c,REDIS_HEAD);
3564}
3565
3566static void rpushCommand(redisClient *c) {
3567 pushGenericCommand(c,REDIS_TAIL);
3568}
3569
3570static void llenCommand(redisClient *c) {
3305306f 3571 robj *o;
ed9b544e 3572 list *l;
3573
3305306f 3574 o = lookupKeyRead(c->db,c->argv[1]);
3575 if (o == NULL) {
c937aa89 3576 addReply(c,shared.czero);
ed9b544e 3577 return;
3578 } else {
ed9b544e 3579 if (o->type != REDIS_LIST) {
c937aa89 3580 addReply(c,shared.wrongtypeerr);
ed9b544e 3581 } else {
3582 l = o->ptr;
c937aa89 3583 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
ed9b544e 3584 }
3585 }
3586}
3587
3588static void lindexCommand(redisClient *c) {
3305306f 3589 robj *o;
ed9b544e 3590 int index = atoi(c->argv[2]->ptr);
3591
3305306f 3592 o = lookupKeyRead(c->db,c->argv[1]);
3593 if (o == NULL) {
c937aa89 3594 addReply(c,shared.nullbulk);
ed9b544e 3595 } else {
ed9b544e 3596 if (o->type != REDIS_LIST) {
c937aa89 3597 addReply(c,shared.wrongtypeerr);
ed9b544e 3598 } else {
3599 list *list = o->ptr;
3600 listNode *ln;
3601
3602 ln = listIndex(list, index);
3603 if (ln == NULL) {
c937aa89 3604 addReply(c,shared.nullbulk);
ed9b544e 3605 } else {
3606 robj *ele = listNodeValue(ln);
942a3961 3607 addReplyBulkLen(c,ele);
ed9b544e 3608 addReply(c,ele);
3609 addReply(c,shared.crlf);
3610 }
3611 }
3612 }
3613}
3614
3615static void lsetCommand(redisClient *c) {
3305306f 3616 robj *o;
ed9b544e 3617 int index = atoi(c->argv[2]->ptr);
3618
3305306f 3619 o = lookupKeyWrite(c->db,c->argv[1]);
3620 if (o == NULL) {
ed9b544e 3621 addReply(c,shared.nokeyerr);
3622 } else {
ed9b544e 3623 if (o->type != REDIS_LIST) {
3624 addReply(c,shared.wrongtypeerr);
3625 } else {
3626 list *list = o->ptr;
3627 listNode *ln;
3628
3629 ln = listIndex(list, index);
3630 if (ln == NULL) {
c937aa89 3631 addReply(c,shared.outofrangeerr);
ed9b544e 3632 } else {
3633 robj *ele = listNodeValue(ln);
3634
3635 decrRefCount(ele);
3636 listNodeValue(ln) = c->argv[3];
3637 incrRefCount(c->argv[3]);
3638 addReply(c,shared.ok);
3639 server.dirty++;
3640 }
3641 }
3642 }
3643}
3644
3645static void popGenericCommand(redisClient *c, int where) {
3305306f 3646 robj *o;
3647
3648 o = lookupKeyWrite(c->db,c->argv[1]);
3649 if (o == NULL) {
c937aa89 3650 addReply(c,shared.nullbulk);
ed9b544e 3651 } else {
ed9b544e 3652 if (o->type != REDIS_LIST) {
c937aa89 3653 addReply(c,shared.wrongtypeerr);
ed9b544e 3654 } else {
3655 list *list = o->ptr;
3656 listNode *ln;
3657
3658 if (where == REDIS_HEAD)
3659 ln = listFirst(list);
3660 else
3661 ln = listLast(list);
3662
3663 if (ln == NULL) {
c937aa89 3664 addReply(c,shared.nullbulk);
ed9b544e 3665 } else {
3666 robj *ele = listNodeValue(ln);
942a3961 3667 addReplyBulkLen(c,ele);
ed9b544e 3668 addReply(c,ele);
3669 addReply(c,shared.crlf);
3670 listDelNode(list,ln);
3671 server.dirty++;
3672 }
3673 }
3674 }
3675}
3676
3677static void lpopCommand(redisClient *c) {
3678 popGenericCommand(c,REDIS_HEAD);
3679}
3680
3681static void rpopCommand(redisClient *c) {
3682 popGenericCommand(c,REDIS_TAIL);
3683}
3684
3685static void lrangeCommand(redisClient *c) {
3305306f 3686 robj *o;
ed9b544e 3687 int start = atoi(c->argv[2]->ptr);
3688 int end = atoi(c->argv[3]->ptr);
3305306f 3689
3690 o = lookupKeyRead(c->db,c->argv[1]);
3691 if (o == NULL) {
c937aa89 3692 addReply(c,shared.nullmultibulk);
ed9b544e 3693 } else {
ed9b544e 3694 if (o->type != REDIS_LIST) {
c937aa89 3695 addReply(c,shared.wrongtypeerr);
ed9b544e 3696 } else {
3697 list *list = o->ptr;
3698 listNode *ln;
3699 int llen = listLength(list);
3700 int rangelen, j;
3701 robj *ele;
3702
3703 /* convert negative indexes */
3704 if (start < 0) start = llen+start;
3705 if (end < 0) end = llen+end;
3706 if (start < 0) start = 0;
3707 if (end < 0) end = 0;
3708
3709 /* indexes sanity checks */
3710 if (start > end || start >= llen) {
3711 /* Out of range start or start > end result in empty list */
c937aa89 3712 addReply(c,shared.emptymultibulk);
ed9b544e 3713 return;
3714 }
3715 if (end >= llen) end = llen-1;
3716 rangelen = (end-start)+1;
3717
3718 /* Return the result in form of a multi-bulk reply */
3719 ln = listIndex(list, start);
c937aa89 3720 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
ed9b544e 3721 for (j = 0; j < rangelen; j++) {
3722 ele = listNodeValue(ln);
942a3961 3723 addReplyBulkLen(c,ele);
ed9b544e 3724 addReply(c,ele);
3725 addReply(c,shared.crlf);
3726 ln = ln->next;
3727 }
3728 }
3729 }
3730}
3731
3732static void ltrimCommand(redisClient *c) {
3305306f 3733 robj *o;
ed9b544e 3734 int start = atoi(c->argv[2]->ptr);
3735 int end = atoi(c->argv[3]->ptr);
3736
3305306f 3737 o = lookupKeyWrite(c->db,c->argv[1]);
3738 if (o == NULL) {
ab9d4cb1 3739 addReply(c,shared.ok);
ed9b544e 3740 } else {
ed9b544e 3741 if (o->type != REDIS_LIST) {
3742 addReply(c,shared.wrongtypeerr);
3743 } else {
3744 list *list = o->ptr;
3745 listNode *ln;
3746 int llen = listLength(list);
3747 int j, ltrim, rtrim;
3748
3749 /* convert negative indexes */
3750 if (start < 0) start = llen+start;
3751 if (end < 0) end = llen+end;
3752 if (start < 0) start = 0;
3753 if (end < 0) end = 0;
3754
3755 /* indexes sanity checks */
3756 if (start > end || start >= llen) {
3757 /* Out of range start or start > end result in empty list */
3758 ltrim = llen;
3759 rtrim = 0;
3760 } else {
3761 if (end >= llen) end = llen-1;
3762 ltrim = start;
3763 rtrim = llen-end-1;
3764 }
3765
3766 /* Remove list elements to perform the trim */
3767 for (j = 0; j < ltrim; j++) {
3768 ln = listFirst(list);
3769 listDelNode(list,ln);
3770 }
3771 for (j = 0; j < rtrim; j++) {
3772 ln = listLast(list);
3773 listDelNode(list,ln);
3774 }
ed9b544e 3775 server.dirty++;
e59229a2 3776 addReply(c,shared.ok);
ed9b544e 3777 }
3778 }
3779}
3780
3781static void lremCommand(redisClient *c) {
3305306f 3782 robj *o;
ed9b544e 3783
3305306f 3784 o = lookupKeyWrite(c->db,c->argv[1]);
3785 if (o == NULL) {
33c08b39 3786 addReply(c,shared.czero);
ed9b544e 3787 } else {
ed9b544e 3788 if (o->type != REDIS_LIST) {
c937aa89 3789 addReply(c,shared.wrongtypeerr);
ed9b544e 3790 } else {
3791 list *list = o->ptr;
3792 listNode *ln, *next;
3793 int toremove = atoi(c->argv[2]->ptr);
3794 int removed = 0;
3795 int fromtail = 0;
3796
3797 if (toremove < 0) {
3798 toremove = -toremove;
3799 fromtail = 1;
3800 }
3801 ln = fromtail ? list->tail : list->head;
3802 while (ln) {
ed9b544e 3803 robj *ele = listNodeValue(ln);
a4d1ba9a 3804
3805 next = fromtail ? ln->prev : ln->next;
724a51b1 3806 if (compareStringObjects(ele,c->argv[3]) == 0) {
ed9b544e 3807 listDelNode(list,ln);
3808 server.dirty++;
3809 removed++;
3810 if (toremove && removed == toremove) break;
3811 }
3812 ln = next;
3813 }
c937aa89 3814 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
ed9b544e 3815 }
3816 }
3817}
3818
12f9d551 3819/* This is the semantic of this command:
0f5f7e9a 3820 * RPOPLPUSH srclist dstlist:
12f9d551 3821 * IF LLEN(srclist) > 0
3822 * element = RPOP srclist
3823 * LPUSH dstlist element
3824 * RETURN element
3825 * ELSE
3826 * RETURN nil
3827 * END
3828 * END
3829 *
3830 * The idea is to be able to get an element from a list in a reliable way
3831 * since the element is not just returned but pushed against another list
3832 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3833 */
0f5f7e9a 3834static void rpoplpushcommand(redisClient *c) {
12f9d551 3835 robj *sobj;
3836
3837 sobj = lookupKeyWrite(c->db,c->argv[1]);
3838 if (sobj == NULL) {
3839 addReply(c,shared.nullbulk);
3840 } else {
3841 if (sobj->type != REDIS_LIST) {
3842 addReply(c,shared.wrongtypeerr);
3843 } else {
3844 list *srclist = sobj->ptr;
3845 listNode *ln = listLast(srclist);
3846
3847 if (ln == NULL) {
3848 addReply(c,shared.nullbulk);
3849 } else {
3850 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3851 robj *ele = listNodeValue(ln);
3852 list *dstlist;
3853
3854 if (dobj == NULL) {
3855
3856 /* Create the list if the key does not exist */
3857 dobj = createListObject();
3858 dictAdd(c->db->dict,c->argv[2],dobj);
3859 incrRefCount(c->argv[2]);
3860 } else if (dobj->type != REDIS_LIST) {
3861 addReply(c,shared.wrongtypeerr);
3862 return;
3863 }
3864 /* Add the element to the target list */
3865 dstlist = dobj->ptr;
3866 listAddNodeHead(dstlist,ele);
3867 incrRefCount(ele);
3868
3869 /* Send the element to the client as reply as well */
3870 addReplyBulkLen(c,ele);
3871 addReply(c,ele);
3872 addReply(c,shared.crlf);
3873
3874 /* Finally remove the element from the source list */
3875 listDelNode(srclist,ln);
3876 server.dirty++;
3877 }
3878 }
3879 }
3880}
3881
3882
ed9b544e 3883/* ==================================== Sets ================================ */
3884
3885static void saddCommand(redisClient *c) {
ed9b544e 3886 robj *set;
3887
3305306f 3888 set = lookupKeyWrite(c->db,c->argv[1]);
3889 if (set == NULL) {
ed9b544e 3890 set = createSetObject();
3305306f 3891 dictAdd(c->db->dict,c->argv[1],set);
ed9b544e 3892 incrRefCount(c->argv[1]);
3893 } else {
ed9b544e 3894 if (set->type != REDIS_SET) {
c937aa89 3895 addReply(c,shared.wrongtypeerr);
ed9b544e 3896 return;
3897 }
3898 }
3899 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3900 incrRefCount(c->argv[2]);
3901 server.dirty++;
c937aa89 3902 addReply(c,shared.cone);
ed9b544e 3903 } else {
c937aa89 3904 addReply(c,shared.czero);
ed9b544e 3905 }
3906}
3907
3908static void sremCommand(redisClient *c) {
3305306f 3909 robj *set;
ed9b544e 3910
3305306f 3911 set = lookupKeyWrite(c->db,c->argv[1]);
3912 if (set == NULL) {
c937aa89 3913 addReply(c,shared.czero);
ed9b544e 3914 } else {
ed9b544e 3915 if (set->type != REDIS_SET) {
c937aa89 3916 addReply(c,shared.wrongtypeerr);
ed9b544e 3917 return;
3918 }
3919 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3920 server.dirty++;
12fea928 3921 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
c937aa89 3922 addReply(c,shared.cone);
ed9b544e 3923 } else {
c937aa89 3924 addReply(c,shared.czero);
ed9b544e 3925 }
3926 }
3927}
3928
a4460ef4 3929static void smoveCommand(redisClient *c) {
3930 robj *srcset, *dstset;
3931
3932 srcset = lookupKeyWrite(c->db,c->argv[1]);
3933 dstset = lookupKeyWrite(c->db,c->argv[2]);
3934
3935 /* If the source key does not exist return 0, if it's of the wrong type
3936 * raise an error */
3937 if (srcset == NULL || srcset->type != REDIS_SET) {
3938 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3939 return;
3940 }
3941 /* Error if the destination key is not a set as well */
3942 if (dstset && dstset->type != REDIS_SET) {
3943 addReply(c,shared.wrongtypeerr);
3944 return;
3945 }
3946 /* Remove the element from the source set */
3947 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3948 /* Key not found in the src set! return zero */
3949 addReply(c,shared.czero);
3950 return;
3951 }
3952 server.dirty++;
3953 /* Add the element to the destination set */
3954 if (!dstset) {
3955 dstset = createSetObject();
3956 dictAdd(c->db->dict,c->argv[2],dstset);
3957 incrRefCount(c->argv[2]);
3958 }
3959 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3960 incrRefCount(c->argv[3]);
3961 addReply(c,shared.cone);
3962}
3963
ed9b544e 3964static void sismemberCommand(redisClient *c) {
3305306f 3965 robj *set;
ed9b544e 3966
3305306f 3967 set = lookupKeyRead(c->db,c->argv[1]);
3968 if (set == NULL) {
c937aa89 3969 addReply(c,shared.czero);
ed9b544e 3970 } else {
ed9b544e 3971 if (set->type != REDIS_SET) {
c937aa89 3972 addReply(c,shared.wrongtypeerr);
ed9b544e 3973 return;
3974 }
3975 if (dictFind(set->ptr,c->argv[2]))
c937aa89 3976 addReply(c,shared.cone);
ed9b544e 3977 else
c937aa89 3978 addReply(c,shared.czero);
ed9b544e 3979 }
3980}
3981
3982static void scardCommand(redisClient *c) {
3305306f 3983 robj *o;
ed9b544e 3984 dict *s;
3985
3305306f 3986 o = lookupKeyRead(c->db,c->argv[1]);
3987 if (o == NULL) {
c937aa89 3988 addReply(c,shared.czero);
ed9b544e 3989 return;
3990 } else {
ed9b544e 3991 if (o->type != REDIS_SET) {
c937aa89 3992 addReply(c,shared.wrongtypeerr);
ed9b544e 3993 } else {
3994 s = o->ptr;
682ac724 3995 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3305306f 3996 dictSize(s)));
ed9b544e 3997 }
3998 }
3999}
4000
12fea928 4001static void spopCommand(redisClient *c) {
4002 robj *set;
4003 dictEntry *de;
4004
4005 set = lookupKeyWrite(c->db,c->argv[1]);
4006 if (set == NULL) {
4007 addReply(c,shared.nullbulk);
4008 } else {
4009 if (set->type != REDIS_SET) {
4010 addReply(c,shared.wrongtypeerr);
4011 return;
4012 }
4013 de = dictGetRandomKey(set->ptr);
4014 if (de == NULL) {
4015 addReply(c,shared.nullbulk);
4016 } else {
4017 robj *ele = dictGetEntryKey(de);
4018
942a3961 4019 addReplyBulkLen(c,ele);
12fea928 4020 addReply(c,ele);
4021 addReply(c,shared.crlf);
4022 dictDelete(set->ptr,ele);
4023 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4024 server.dirty++;
4025 }
4026 }
4027}
4028
2abb95a9 4029static void srandmemberCommand(redisClient *c) {
4030 robj *set;
4031 dictEntry *de;
4032
4033 set = lookupKeyRead(c->db,c->argv[1]);
4034 if (set == NULL) {
4035 addReply(c,shared.nullbulk);
4036 } else {
4037 if (set->type != REDIS_SET) {
4038 addReply(c,shared.wrongtypeerr);
4039 return;
4040 }
4041 de = dictGetRandomKey(set->ptr);
4042 if (de == NULL) {
4043 addReply(c,shared.nullbulk);
4044 } else {
4045 robj *ele = dictGetEntryKey(de);
4046
4047 addReplyBulkLen(c,ele);
4048 addReply(c,ele);
4049 addReply(c,shared.crlf);
4050 }
4051 }
4052}
4053
ed9b544e 4054static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4055 dict **d1 = (void*) s1, **d2 = (void*) s2;
4056
3305306f 4057 return dictSize(*d1)-dictSize(*d2);
ed9b544e 4058}
4059
682ac724 4060static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
ed9b544e 4061 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4062 dictIterator *di;
4063 dictEntry *de;
4064 robj *lenobj = NULL, *dstset = NULL;
682ac724 4065 unsigned long j, cardinality = 0;
ed9b544e 4066
ed9b544e 4067 for (j = 0; j < setsnum; j++) {
4068 robj *setobj;
3305306f 4069
4070 setobj = dstkey ?
4071 lookupKeyWrite(c->db,setskeys[j]) :
4072 lookupKeyRead(c->db,setskeys[j]);
4073 if (!setobj) {
ed9b544e 4074 zfree(dv);
5faa6025 4075 if (dstkey) {
fdcaae84 4076 if (deleteKey(c->db,dstkey))
4077 server.dirty++;
0d36ded0 4078 addReply(c,shared.czero);
5faa6025 4079 } else {
4080 addReply(c,shared.nullmultibulk);
4081 }
ed9b544e 4082 return;
4083 }
ed9b544e 4084 if (setobj->type != REDIS_SET) {
4085 zfree(dv);
c937aa89 4086 addReply(c,shared.wrongtypeerr);
ed9b544e 4087 return;
4088 }
4089 dv[j] = setobj->ptr;
4090 }
4091 /* Sort sets from the smallest to largest, this will improve our
4092 * algorithm's performace */
4093 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4094
4095 /* The first thing we should output is the total number of elements...
4096 * since this is a multi-bulk write, but at this stage we don't know
4097 * the intersection set size, so we use a trick, append an empty object
4098 * to the output list and save the pointer to later modify it with the
4099 * right length */
4100 if (!dstkey) {
4101 lenobj = createObject(REDIS_STRING,NULL);
4102 addReply(c,lenobj);
4103 decrRefCount(lenobj);
4104 } else {
4105 /* If we have a target key where to store the resulting set
4106 * create this key with an empty set inside */
4107 dstset = createSetObject();
ed9b544e 4108 }
4109
4110 /* Iterate all the elements of the first (smallest) set, and test
4111 * the element against all the other sets, if at least one set does
4112 * not include the element it is discarded */
4113 di = dictGetIterator(dv[0]);
ed9b544e 4114
4115 while((de = dictNext(di)) != NULL) {
4116 robj *ele;
4117
4118 for (j = 1; j < setsnum; j++)
4119 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4120 if (j != setsnum)
4121 continue; /* at least one set does not contain the member */
4122 ele = dictGetEntryKey(de);
4123 if (!dstkey) {
942a3961 4124 addReplyBulkLen(c,ele);
ed9b544e 4125 addReply(c,ele);
4126 addReply(c,shared.crlf);
4127 cardinality++;
4128 } else {
4129 dictAdd(dstset->ptr,ele,NULL);
4130 incrRefCount(ele);
4131 }
4132 }
4133 dictReleaseIterator(di);
4134
83cdfe18
AG
4135 if (dstkey) {
4136 /* Store the resulting set into the target */
4137 deleteKey(c->db,dstkey);
4138 dictAdd(c->db->dict,dstkey,dstset);
4139 incrRefCount(dstkey);
4140 }
4141
40d224a9 4142 if (!dstkey) {
682ac724 4143 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
40d224a9 4144 } else {
682ac724 4145 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
03fd01c7 4146 dictSize((dict*)dstset->ptr)));
40d224a9 4147 server.dirty++;
4148 }
ed9b544e 4149 zfree(dv);
4150}
4151
4152static void sinterCommand(redisClient *c) {
4153 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4154}
4155
4156static void sinterstoreCommand(redisClient *c) {
4157 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4158}
4159
/* Operation selectors accepted by sunionDiffGenericCommand(). */
#define REDIS_OP_UNION  0   /* SUNION / SUNIONSTORE */
#define REDIS_OP_DIFF   1   /* SDIFF / SDIFFSTORE */
4162
4163static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
40d224a9 4164 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4165 dictIterator *di;
4166 dictEntry *de;
f4f56e1d 4167 robj *dstset = NULL;
40d224a9 4168 int j, cardinality = 0;
4169
40d224a9 4170 for (j = 0; j < setsnum; j++) {
4171 robj *setobj;
4172
4173 setobj = dstkey ?
4174 lookupKeyWrite(c->db,setskeys[j]) :
4175 lookupKeyRead(c->db,setskeys[j]);
4176 if (!setobj) {
4177 dv[j] = NULL;
4178 continue;
4179 }
4180 if (setobj->type != REDIS_SET) {
4181 zfree(dv);
4182 addReply(c,shared.wrongtypeerr);
4183 return;
4184 }
4185 dv[j] = setobj->ptr;
4186 }
4187
4188 /* We need a temp set object to store our union. If the dstkey
4189 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4190 * this set object will be the resulting object to set into the target key*/
4191 dstset = createSetObject();
4192
40d224a9 4193 /* Iterate all the elements of all the sets, add every element a single
4194 * time to the result set */
4195 for (j = 0; j < setsnum; j++) {
51829ed3 4196 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
40d224a9 4197 if (!dv[j]) continue; /* non existing keys are like empty sets */
4198
4199 di = dictGetIterator(dv[j]);
40d224a9 4200
4201 while((de = dictNext(di)) != NULL) {
4202 robj *ele;
4203
4204 /* dictAdd will not add the same element multiple times */
4205 ele = dictGetEntryKey(de);
f4f56e1d 4206 if (op == REDIS_OP_UNION || j == 0) {
4207 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4208 incrRefCount(ele);
40d224a9 4209 cardinality++;
4210 }
f4f56e1d 4211 } else if (op == REDIS_OP_DIFF) {
4212 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4213 cardinality--;
4214 }
40d224a9 4215 }
4216 }
4217 dictReleaseIterator(di);
51829ed3
AG
4218
4219 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
40d224a9 4220 }
4221
f4f56e1d 4222 /* Output the content of the resulting set, if not in STORE mode */
4223 if (!dstkey) {
4224 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4225 di = dictGetIterator(dstset->ptr);
f4f56e1d 4226 while((de = dictNext(di)) != NULL) {
4227 robj *ele;
4228
4229 ele = dictGetEntryKey(de);
942a3961 4230 addReplyBulkLen(c,ele);
f4f56e1d 4231 addReply(c,ele);
4232 addReply(c,shared.crlf);
4233 }
4234 dictReleaseIterator(di);
83cdfe18
AG
4235 } else {
4236 /* If we have a target key where to store the resulting set
4237 * create this key with the result set inside */
4238 deleteKey(c->db,dstkey);
4239 dictAdd(c->db->dict,dstkey,dstset);
4240 incrRefCount(dstkey);
f4f56e1d 4241 }
4242
4243 /* Cleanup */
40d224a9 4244 if (!dstkey) {
40d224a9 4245 decrRefCount(dstset);
4246 } else {
682ac724 4247 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
03fd01c7 4248 dictSize((dict*)dstset->ptr)));
40d224a9 4249 server.dirty++;
4250 }
4251 zfree(dv);
4252}
4253
4254static void sunionCommand(redisClient *c) {
f4f56e1d 4255 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
40d224a9 4256}
4257
4258static void sunionstoreCommand(redisClient *c) {
f4f56e1d 4259 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4260}
4261
4262static void sdiffCommand(redisClient *c) {
4263 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4264}
4265
4266static void sdiffstoreCommand(redisClient *c) {
4267 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
40d224a9 4268}
4269
6b47e12e 4270/* ==================================== ZSets =============================== */
4271
4272/* ZSETs are ordered sets using two data structures to hold the same elements
4273 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4274 * data structure.
4275 *
4276 * The elements are added to a hash table mapping Redis objects to scores.
4277 * At the same time the elements are added to a skip list mapping scores
4278 * to Redis objects (so objects are sorted by scores in this "view"). */
4279
4280/* This skiplist implementation is almost a C translation of the original
4281 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4282 * Alternative to Balanced Trees", modified in three ways:
4283 * a) this implementation allows for repeated values.
4284 * b) the comparison is not just by key (our 'score') but by satellite data.
4285 * c) there is a back pointer, so it's a doubly linked list with the back
4286 * pointers being only at "level 1". This allows to traverse the list
4287 * from tail to head, useful for ZREVRANGE. */
4288
4289static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4290 zskiplistNode *zn = zmalloc(sizeof(*zn));
4291
4292 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4293 zn->score = score;
4294 zn->obj = obj;
4295 return zn;
4296}
4297
4298static zskiplist *zslCreate(void) {
4299 int j;
4300 zskiplist *zsl;
4301
4302 zsl = zmalloc(sizeof(*zsl));
4303 zsl->level = 1;
cc812361 4304 zsl->length = 0;
6b47e12e 4305 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4306 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4307 zsl->header->forward[j] = NULL;
e3870fab 4308 zsl->header->backward = NULL;
4309 zsl->tail = NULL;
6b47e12e 4310 return zsl;
4311}
4312
fd8ccf44 4313static void zslFreeNode(zskiplistNode *node) {
4314 decrRefCount(node->obj);
ad807e6f 4315 zfree(node->forward);
fd8ccf44 4316 zfree(node);
4317}
4318
4319static void zslFree(zskiplist *zsl) {
ad807e6f 4320 zskiplistNode *node = zsl->header->forward[0], *next;
fd8ccf44 4321
ad807e6f 4322 zfree(zsl->header->forward);
4323 zfree(zsl->header);
fd8ccf44 4324 while(node) {
599379dd 4325 next = node->forward[0];
fd8ccf44 4326 zslFreeNode(node);
4327 node = next;
4328 }
ad807e6f 4329 zfree(zsl);
fd8ccf44 4330}
4331
6b47e12e 4332static int zslRandomLevel(void) {
4333 int level = 1;
4334 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4335 level += 1;
4336 return level;
4337}
4338
4339static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4340 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4341 int i, level;
4342
4343 x = zsl->header;
4344 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 4345 while (x->forward[i] &&
4346 (x->forward[i]->score < score ||
4347 (x->forward[i]->score == score &&
4348 compareStringObjects(x->forward[i]->obj,obj) < 0)))
6b47e12e 4349 x = x->forward[i];
4350 update[i] = x;
4351 }
6b47e12e 4352 /* we assume the key is not already inside, since we allow duplicated
4353 * scores, and the re-insertion of score and redis object should never
4354 * happpen since the caller of zslInsert() should test in the hash table
4355 * if the element is already inside or not. */
4356 level = zslRandomLevel();
4357 if (level > zsl->level) {
4358 for (i = zsl->level; i < level; i++)
4359 update[i] = zsl->header;
4360 zsl->level = level;
4361 }
4362 x = zslCreateNode(level,score,obj);
4363 for (i = 0; i < level; i++) {
4364 x->forward[i] = update[i]->forward[i];
4365 update[i]->forward[i] = x;
4366 }
bb975144 4367 x->backward = (update[0] == zsl->header) ? NULL : update[0];
e3870fab 4368 if (x->forward[0])
4369 x->forward[0]->backward = x;
4370 else
4371 zsl->tail = x;
cc812361 4372 zsl->length++;
6b47e12e 4373}
4374
50c55df5 4375/* Delete an element with matching score/object from the skiplist. */
fd8ccf44 4376static int zslDelete(zskiplist *zsl, double score, robj *obj) {
e197b441 4377 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4378 int i;
4379
4380 x = zsl->header;
4381 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 4382 while (x->forward[i] &&
4383 (x->forward[i]->score < score ||
4384 (x->forward[i]->score == score &&
4385 compareStringObjects(x->forward[i]->obj,obj) < 0)))
e197b441 4386 x = x->forward[i];
4387 update[i] = x;
4388 }
4389 /* We may have multiple elements with the same score, what we need
4390 * is to find the element with both the right score and object. */
4391 x = x->forward[0];
50c55df5 4392 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
9d60e6e4 4393 for (i = 0; i < zsl->level; i++) {
4394 if (update[i]->forward[i] != x) break;
4395 update[i]->forward[i] = x->forward[i];
4396 }
4397 if (x->forward[0]) {
4398 x->forward[0]->backward = (x->backward == zsl->header) ?
4399 NULL : x->backward;
e197b441 4400 } else {
9d60e6e4 4401 zsl->tail = x->backward;
e197b441 4402 }
9d60e6e4 4403 zslFreeNode(x);
4404 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4405 zsl->level--;
4406 zsl->length--;
4407 return 1;
4408 } else {
4409 return 0; /* not found */
e197b441 4410 }
4411 return 0; /* not found */
fd8ccf44 4412}
4413
1807985b 4414/* Delete all the elements with score between min and max from the skiplist.
4415 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4416 * Note that this function takes the reference to the hash table view of the
4417 * sorted set, in order to remove the elements from the hash table too. */
4418static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4419 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4420 unsigned long removed = 0;
4421 int i;
4422
4423 x = zsl->header;
4424 for (i = zsl->level-1; i >= 0; i--) {
4425 while (x->forward[i] && x->forward[i]->score < min)
4426 x = x->forward[i];
4427 update[i] = x;
4428 }
4429 /* We may have multiple elements with the same score, what we need
4430 * is to find the element with both the right score and object. */
4431 x = x->forward[0];
4432 while (x && x->score <= max) {
4433 zskiplistNode *next;
4434
4435 for (i = 0; i < zsl->level; i++) {
4436 if (update[i]->forward[i] != x) break;
4437 update[i]->forward[i] = x->forward[i];
4438 }
4439 if (x->forward[0]) {
4440 x->forward[0]->backward = (x->backward == zsl->header) ?
4441 NULL : x->backward;
4442 } else {
4443 zsl->tail = x->backward;
4444 }
4445 next = x->forward[0];
4446 dictDelete(dict,x->obj);
4447 zslFreeNode(x);
4448 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4449 zsl->level--;
4450 zsl->length--;
4451 removed++;
4452 x = next;
4453 }
4454 return removed; /* not found */
4455}
4456
50c55df5 4457/* Find the first node having a score equal or greater than the specified one.
4458 * Returns NULL if there is no match. */
4459static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4460 zskiplistNode *x;
4461 int i;
4462
4463 x = zsl->header;
4464 for (i = zsl->level-1; i >= 0; i--) {
4465 while (x->forward[i] && x->forward[i]->score < score)
4466 x = x->forward[i];
4467 }
4468 /* We may have multiple elements with the same score, what we need
4469 * is to find the element with both the right score and object. */
4470 return x->forward[0];
4471}
4472
fd8ccf44 4473/* The actual Z-commands implementations */
4474
7db723ad 4475/* This generic command implements both ZADD and ZINCRBY.
e2665397 4476 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
7db723ad 4477 * the increment if the operation is a ZINCRBY (doincrement == 1). */
e2665397 4478static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
fd8ccf44 4479 robj *zsetobj;
4480 zset *zs;
4481 double *score;
4482
e2665397 4483 zsetobj = lookupKeyWrite(c->db,key);
fd8ccf44 4484 if (zsetobj == NULL) {
4485 zsetobj = createZsetObject();
e2665397 4486 dictAdd(c->db->dict,key,zsetobj);
4487 incrRefCount(key);
fd8ccf44 4488 } else {
4489 if (zsetobj->type != REDIS_ZSET) {
4490 addReply(c,shared.wrongtypeerr);
4491 return;
4492 }
4493 }
fd8ccf44 4494 zs = zsetobj->ptr;
e2665397 4495
7db723ad 4496 /* Ok now since we implement both ZADD and ZINCRBY here the code
e2665397 4497 * needs to handle the two different conditions. It's all about setting
4498 * '*score', that is, the new score to set, to the right value. */
4499 score = zmalloc(sizeof(double));
4500 if (doincrement) {
4501 dictEntry *de;
4502
4503 /* Read the old score. If the element was not present starts from 0 */
4504 de = dictFind(zs->dict,ele);
4505 if (de) {
4506 double *oldscore = dictGetEntryVal(de);
4507 *score = *oldscore + scoreval;
4508 } else {
4509 *score = scoreval;
4510 }
4511 } else {
4512 *score = scoreval;
4513 }
4514
4515 /* What follows is a simple remove and re-insert operation that is common
7db723ad 4516 * to both ZADD and ZINCRBY... */
e2665397 4517 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
fd8ccf44 4518 /* case 1: New element */
e2665397 4519 incrRefCount(ele); /* added to hash */
4520 zslInsert(zs->zsl,*score,ele);
4521 incrRefCount(ele); /* added to skiplist */
fd8ccf44 4522 server.dirty++;
e2665397 4523 if (doincrement)
e2665397 4524 addReplyDouble(c,*score);
91d71bfc 4525 else
4526 addReply(c,shared.cone);
fd8ccf44 4527 } else {
4528 dictEntry *de;
4529 double *oldscore;
4530
4531 /* case 2: Score update operation */
e2665397 4532 de = dictFind(zs->dict,ele);
dfc5e96c 4533 redisAssert(de != NULL);
fd8ccf44 4534 oldscore = dictGetEntryVal(de);
4535 if (*score != *oldscore) {
4536 int deleted;
4537
e2665397 4538 /* Remove and insert the element in the skip list with new score */
4539 deleted = zslDelete(zs->zsl,*oldscore,ele);
dfc5e96c 4540 redisAssert(deleted != 0);
e2665397 4541 zslInsert(zs->zsl,*score,ele);
4542 incrRefCount(ele);
4543 /* Update the score in the hash table */
4544 dictReplace(zs->dict,ele,score);
fd8ccf44 4545 server.dirty++;
2161a965 4546 } else {
4547 zfree(score);
fd8ccf44 4548 }
e2665397 4549 if (doincrement)
4550 addReplyDouble(c,*score);
4551 else
4552 addReply(c,shared.czero);
fd8ccf44 4553 }
4554}
4555
e2665397 4556static void zaddCommand(redisClient *c) {
4557 double scoreval;
4558
4559 scoreval = strtod(c->argv[2]->ptr,NULL);
4560 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4561}
4562
7db723ad 4563static void zincrbyCommand(redisClient *c) {
e2665397 4564 double scoreval;
4565
4566 scoreval = strtod(c->argv[2]->ptr,NULL);
4567 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4568}
4569
1b7106e7 4570static void zremCommand(redisClient *c) {
4571 robj *zsetobj;
4572 zset *zs;
4573
4574 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4575 if (zsetobj == NULL) {
4576 addReply(c,shared.czero);
4577 } else {
4578 dictEntry *de;
4579 double *oldscore;
4580 int deleted;
4581
4582 if (zsetobj->type != REDIS_ZSET) {
4583 addReply(c,shared.wrongtypeerr);
4584 return;
4585 }
4586 zs = zsetobj->ptr;
4587 de = dictFind(zs->dict,c->argv[2]);
4588 if (de == NULL) {
4589 addReply(c,shared.czero);
4590 return;
4591 }
4592 /* Delete from the skiplist */
4593 oldscore = dictGetEntryVal(de);
4594 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
dfc5e96c 4595 redisAssert(deleted != 0);
1b7106e7 4596
4597 /* Delete from the hash table */
4598 dictDelete(zs->dict,c->argv[2]);
4599 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4600 server.dirty++;
4601 addReply(c,shared.cone);
4602 }
4603}
4604
1807985b 4605static void zremrangebyscoreCommand(redisClient *c) {
4606 double min = strtod(c->argv[2]->ptr,NULL);
4607 double max = strtod(c->argv[3]->ptr,NULL);
4608 robj *zsetobj;
4609 zset *zs;
4610
4611 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4612 if (zsetobj == NULL) {
4613 addReply(c,shared.czero);
4614 } else {
4615 long deleted;
4616
4617 if (zsetobj->type != REDIS_ZSET) {
4618 addReply(c,shared.wrongtypeerr);
4619 return;
4620 }
4621 zs = zsetobj->ptr;
4622 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4623 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4624 server.dirty += deleted;
4625 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4626 }
4627}
4628
e3870fab 4629static void zrangeGenericCommand(redisClient *c, int reverse) {
cc812361 4630 robj *o;
4631 int start = atoi(c->argv[2]->ptr);
4632 int end = atoi(c->argv[3]->ptr);
752da584 4633 int withscores = 0;
4634
4635 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4636 withscores = 1;
4637 } else if (c->argc >= 5) {
4638 addReply(c,shared.syntaxerr);
4639 return;
4640 }
cc812361 4641
4642 o = lookupKeyRead(c->db,c->argv[1]);
4643 if (o == NULL) {
4644 addReply(c,shared.nullmultibulk);
4645 } else {
4646 if (o->type != REDIS_ZSET) {
4647 addReply(c,shared.wrongtypeerr);
4648 } else {
4649 zset *zsetobj = o->ptr;
4650 zskiplist *zsl = zsetobj->zsl;
4651 zskiplistNode *ln;
4652
4653 int llen = zsl->length;
4654 int rangelen, j;
4655 robj *ele;
4656
4657 /* convert negative indexes */
4658 if (start < 0) start = llen+start;
4659 if (end < 0) end = llen+end;
4660 if (start < 0) start = 0;
4661 if (end < 0) end = 0;
4662
4663 /* indexes sanity checks */
4664 if (start > end || start >= llen) {
4665 /* Out of range start or start > end result in empty list */
4666 addReply(c,shared.emptymultibulk);
4667 return;
4668 }
4669 if (end >= llen) end = llen-1;
4670 rangelen = (end-start)+1;
4671
4672 /* Return the result in form of a multi-bulk reply */
e3870fab 4673 if (reverse) {
4674 ln = zsl->tail;
4675 while (start--)
4676 ln = ln->backward;
4677 } else {
4678 ln = zsl->header->forward[0];
4679 while (start--)
4680 ln = ln->forward[0];
4681 }
cc812361 4682
752da584 4683 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4684 withscores ? (rangelen*2) : rangelen));
cc812361 4685 for (j = 0; j < rangelen; j++) {
0aad7a19 4686 ele = ln->obj;
cc812361 4687 addReplyBulkLen(c,ele);
4688 addReply(c,ele);
4689 addReply(c,shared.crlf);
752da584 4690 if (withscores)
4691 addReplyDouble(c,ln->score);
e3870fab 4692 ln = reverse ? ln->backward : ln->forward[0];
cc812361 4693 }
4694 }
4695 }
4696}
4697
e3870fab 4698static void zrangeCommand(redisClient *c) {
4699 zrangeGenericCommand(c,0);
4700}
4701
4702static void zrevrangeCommand(redisClient *c) {
4703 zrangeGenericCommand(c,1);
4704}
4705
50c55df5 4706static void zrangebyscoreCommand(redisClient *c) {
4707 robj *o;
4708 double min = strtod(c->argv[2]->ptr,NULL);
4709 double max = strtod(c->argv[3]->ptr,NULL);
80181f78 4710 int offset = 0, limit = -1;
4711
4712 if (c->argc != 4 && c->argc != 7) {
454d4e43 4713 addReplySds(c,
4714 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
80181f78 4715 return;
4716 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4717 addReply(c,shared.syntaxerr);
4718 return;
4719 } else if (c->argc == 7) {
4720 offset = atoi(c->argv[5]->ptr);
4721 limit = atoi(c->argv[6]->ptr);
0b13687c 4722 if (offset < 0) offset = 0;
80181f78 4723 }
50c55df5 4724
4725 o = lookupKeyRead(c->db,c->argv[1]);
4726 if (o == NULL) {
4727 addReply(c,shared.nullmultibulk);
4728 } else {
4729 if (o->type != REDIS_ZSET) {
4730 addReply(c,shared.wrongtypeerr);
4731 } else {
4732 zset *zsetobj = o->ptr;
4733 zskiplist *zsl = zsetobj->zsl;
4734 zskiplistNode *ln;
4735 robj *ele, *lenobj;
4736 unsigned int rangelen = 0;
4737
4738 /* Get the first node with the score >= min */
4739 ln = zslFirstWithScore(zsl,min);
4740 if (ln == NULL) {
4741 /* No element matching the speciifed interval */
4742 addReply(c,shared.emptymultibulk);
4743 return;
4744 }
4745
4746 /* We don't know in advance how many matching elements there
4747 * are in the list, so we push this object that will represent
4748 * the multi-bulk length in the output buffer, and will "fix"
4749 * it later */
4750 lenobj = createObject(REDIS_STRING,NULL);
4751 addReply(c,lenobj);
c74e7c77 4752 decrRefCount(lenobj);
50c55df5 4753
dbbc7285 4754 while(ln && ln->score <= max) {
80181f78 4755 if (offset) {
4756 offset--;
4757 ln = ln->forward[0];
4758 continue;
4759 }
4760 if (limit == 0) break;
50c55df5 4761 ele = ln->obj;
4762 addReplyBulkLen(c,ele);
4763 addReply(c,ele);
4764 addReply(c,shared.crlf);
4765 ln = ln->forward[0];
4766 rangelen++;
80181f78 4767 if (limit > 0) limit--;
50c55df5 4768 }
4769 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4770 }
4771 }
4772}
4773
3c41331e 4774static void zcardCommand(redisClient *c) {
e197b441 4775 robj *o;
4776 zset *zs;
4777
4778 o = lookupKeyRead(c->db,c->argv[1]);
4779 if (o == NULL) {
4780 addReply(c,shared.czero);
4781 return;
4782 } else {
4783 if (o->type != REDIS_ZSET) {
4784 addReply(c,shared.wrongtypeerr);
4785 } else {
4786 zs = o->ptr;
682ac724 4787 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
e197b441 4788 }
4789 }
4790}
4791
6e333bbe 4792static void zscoreCommand(redisClient *c) {
4793 robj *o;
4794 zset *zs;
4795
4796 o = lookupKeyRead(c->db,c->argv[1]);
4797 if (o == NULL) {
96d8b4ee 4798 addReply(c,shared.nullbulk);
6e333bbe 4799 return;
4800 } else {
4801 if (o->type != REDIS_ZSET) {
4802 addReply(c,shared.wrongtypeerr);
4803 } else {
4804 dictEntry *de;
4805
4806 zs = o->ptr;
4807 de = dictFind(zs->dict,c->argv[2]);
4808 if (!de) {
4809 addReply(c,shared.nullbulk);
4810 } else {
6e333bbe 4811 double *score = dictGetEntryVal(de);
4812
e2665397 4813 addReplyDouble(c,*score);
6e333bbe 4814 }
4815 }
4816 }
4817}
4818
6b47e12e 4819/* ========================= Non type-specific commands ==================== */
4820
ed9b544e 4821static void flushdbCommand(redisClient *c) {
ca37e9cd 4822 server.dirty += dictSize(c->db->dict);
3305306f 4823 dictEmpty(c->db->dict);
4824 dictEmpty(c->db->expires);
ed9b544e 4825 addReply(c,shared.ok);
ed9b544e 4826}
4827
4828static void flushallCommand(redisClient *c) {
ca37e9cd 4829 server.dirty += emptyDb();
ed9b544e 4830 addReply(c,shared.ok);
f78fd11b 4831 rdbSave(server.dbfilename);
ca37e9cd 4832 server.dirty++;
ed9b544e 4833}
4834
56906eef 4835static redisSortOperation *createSortOperation(int type, robj *pattern) {
ed9b544e 4836 redisSortOperation *so = zmalloc(sizeof(*so));
ed9b544e 4837 so->type = type;
4838 so->pattern = pattern;
4839 return so;
4840}
4841
4842/* Return the value associated to the key with a name obtained
4843 * substituting the first occurence of '*' in 'pattern' with 'subst' */
56906eef 4844static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
ed9b544e 4845 char *p;
4846 sds spat, ssub;
4847 robj keyobj;
4848 int prefixlen, sublen, postfixlen;
ed9b544e 4849 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4850 struct {
f1017b3f 4851 long len;
4852 long free;
ed9b544e 4853 char buf[REDIS_SORTKEY_MAX+1];
4854 } keyname;
4855
28173a49 4856 /* If the pattern is "#" return the substitution object itself in order
4857 * to implement the "SORT ... GET #" feature. */
4858 spat = pattern->ptr;
4859 if (spat[0] == '#' && spat[1] == '\0') {
4860 return subst;
4861 }
4862
4863 /* The substitution object may be specially encoded. If so we create
9d65a1bb 4864 * a decoded object on the fly. Otherwise getDecodedObject will just
4865 * increment the ref count, that we'll decrement later. */
4866 subst = getDecodedObject(subst);
942a3961 4867
ed9b544e 4868 ssub = subst->ptr;
4869 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4870 p = strchr(spat,'*');
ed5a857a 4871 if (!p) {
4872 decrRefCount(subst);
4873 return NULL;
4874 }
ed9b544e 4875
4876 prefixlen = p-spat;
4877 sublen = sdslen(ssub);
4878 postfixlen = sdslen(spat)-(prefixlen+1);
4879 memcpy(keyname.buf,spat,prefixlen);
4880 memcpy(keyname.buf+prefixlen,ssub,sublen);
4881 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4882 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4883 keyname.len = prefixlen+sublen+postfixlen;
4884
dfc5e96c 4885 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
942a3961 4886 decrRefCount(subst);
4887
a4d1ba9a 4888 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3305306f 4889 return lookupKeyRead(db,&keyobj);
ed9b544e 4890}
4891
4892/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4893 * the additional parameter is not standard but a BSD-specific we have to
4894 * pass sorting parameters via the global 'server' structure */
4895static int sortCompare(const void *s1, const void *s2) {
4896 const redisSortObject *so1 = s1, *so2 = s2;
4897 int cmp;
4898
4899 if (!server.sort_alpha) {
4900 /* Numeric sorting. Here it's trivial as we precomputed scores */
4901 if (so1->u.score > so2->u.score) {
4902 cmp = 1;
4903 } else if (so1->u.score < so2->u.score) {
4904 cmp = -1;
4905 } else {
4906 cmp = 0;
4907 }
4908 } else {
4909 /* Alphanumeric sorting */
4910 if (server.sort_bypattern) {
4911 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4912 /* At least one compare object is NULL */
4913 if (so1->u.cmpobj == so2->u.cmpobj)
4914 cmp = 0;
4915 else if (so1->u.cmpobj == NULL)
4916 cmp = -1;
4917 else
4918 cmp = 1;
4919 } else {
4920 /* We have both the objects, use strcoll */
4921 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4922 }
4923 } else {
4924 /* Compare elements directly */
9d65a1bb 4925 robj *dec1, *dec2;
4926
4927 dec1 = getDecodedObject(so1->obj);
4928 dec2 = getDecodedObject(so2->obj);
4929 cmp = strcoll(dec1->ptr,dec2->ptr);
4930 decrRefCount(dec1);
4931 decrRefCount(dec2);
ed9b544e 4932 }
4933 }
4934 return server.sort_desc ? -cmp : cmp;
4935}
4936
4937/* The SORT command is the most complex command in Redis. Warning: this code
4938 * is optimized for speed and a bit less for readability */
4939static void sortCommand(redisClient *c) {
ed9b544e 4940 list *operations;
4941 int outputlen = 0;
4942 int desc = 0, alpha = 0;
4943 int limit_start = 0, limit_count = -1, start, end;
4944 int j, dontsort = 0, vectorlen;
4945 int getop = 0; /* GET operation counter */
443c6409 4946 robj *sortval, *sortby = NULL, *storekey = NULL;
ed9b544e 4947 redisSortObject *vector; /* Resulting vector to sort */
4948
4949 /* Lookup the key to sort. It must be of the right types */
3305306f 4950 sortval = lookupKeyRead(c->db,c->argv[1]);
4951 if (sortval == NULL) {
d922ae65 4952 addReply(c,shared.nullmultibulk);
ed9b544e 4953 return;
4954 }
a5eb649b 4955 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4956 sortval->type != REDIS_ZSET)
4957 {
c937aa89 4958 addReply(c,shared.wrongtypeerr);
ed9b544e 4959 return;
4960 }
4961
4962 /* Create a list of operations to perform for every sorted element.
4963 * Operations can be GET/DEL/INCR/DECR */
4964 operations = listCreate();
092dac2a 4965 listSetFreeMethod(operations,zfree);
ed9b544e 4966 j = 2;
4967
4968 /* Now we need to protect sortval incrementing its count, in the future
4969 * SORT may have options able to overwrite/delete keys during the sorting
4970 * and the sorted key itself may get destroied */
4971 incrRefCount(sortval);
4972
4973 /* The SORT command has an SQL-alike syntax, parse it */
4974 while(j < c->argc) {
4975 int leftargs = c->argc-j-1;
4976 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4977 desc = 0;
4978 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4979 desc = 1;
4980 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4981 alpha = 1;
4982 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4983 limit_start = atoi(c->argv[j+1]->ptr);
4984 limit_count = atoi(c->argv[j+2]->ptr);
4985 j+=2;
443c6409 4986 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4987 storekey = c->argv[j+1];
4988 j++;
ed9b544e 4989 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4990 sortby = c->argv[j+1];
4991 /* If the BY pattern does not contain '*', i.e. it is constant,
4992 * we don't need to sort nor to lookup the weight keys. */
4993 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4994 j++;
4995 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4996 listAddNodeTail(operations,createSortOperation(
4997 REDIS_SORT_GET,c->argv[j+1]));
4998 getop++;
4999 j++;
ed9b544e 5000 } else {
5001 decrRefCount(sortval);
5002 listRelease(operations);
c937aa89 5003 addReply(c,shared.syntaxerr);
ed9b544e 5004 return;
5005 }
5006 j++;
5007 }
5008
5009 /* Load the sorting vector with all the objects to sort */
a5eb649b 5010 switch(sortval->type) {
5011 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5012 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5013 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
dfc5e96c 5014 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
a5eb649b 5015 }
ed9b544e 5016 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
ed9b544e 5017 j = 0;
a5eb649b 5018
ed9b544e 5019 if (sortval->type == REDIS_LIST) {
5020 list *list = sortval->ptr;
6208b3a7 5021 listNode *ln;
5022
5023 listRewind(list);
5024 while((ln = listYield(list))) {
ed9b544e 5025 robj *ele = ln->value;
5026 vector[j].obj = ele;
5027 vector[j].u.score = 0;
5028 vector[j].u.cmpobj = NULL;
ed9b544e 5029 j++;
5030 }
5031 } else {
a5eb649b 5032 dict *set;
ed9b544e 5033 dictIterator *di;
5034 dictEntry *setele;
5035
a5eb649b 5036 if (sortval->type == REDIS_SET) {
5037 set = sortval->ptr;
5038 } else {
5039 zset *zs = sortval->ptr;
5040 set = zs->dict;
5041 }
5042
ed9b544e 5043 di = dictGetIterator(set);
ed9b544e 5044 while((setele = dictNext(di)) != NULL) {
5045 vector[j].obj = dictGetEntryKey(setele);
5046 vector[j].u.score = 0;
5047 vector[j].u.cmpobj = NULL;
5048 j++;
5049 }
5050 dictReleaseIterator(di);
5051 }
dfc5e96c 5052 redisAssert(j == vectorlen);
ed9b544e 5053
5054 /* Now it's time to load the right scores in the sorting vector */
5055 if (dontsort == 0) {
5056 for (j = 0; j < vectorlen; j++) {
5057 if (sortby) {
5058 robj *byval;
5059
3305306f 5060 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
ed9b544e 5061 if (!byval || byval->type != REDIS_STRING) continue;
5062 if (alpha) {
9d65a1bb 5063 vector[j].u.cmpobj = getDecodedObject(byval);
ed9b544e 5064 } else {
942a3961 5065 if (byval->encoding == REDIS_ENCODING_RAW) {
5066 vector[j].u.score = strtod(byval->ptr,NULL);
5067 } else {
9d65a1bb 5068 /* Don't need to decode the object if it's
5069 * integer-encoded (the only encoding supported) so
5070 * far. We can just cast it */
f1017b3f 5071 if (byval->encoding == REDIS_ENCODING_INT) {
942a3961 5072 vector[j].u.score = (long)byval->ptr;
f1017b3f 5073 } else
dfc5e96c 5074 redisAssert(1 != 1);
942a3961 5075 }
ed9b544e 5076 }
5077 } else {
942a3961 5078 if (!alpha) {
5079 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5080 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5081 else {
5082 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5083 vector[j].u.score = (long) vector[j].obj->ptr;
5084 else
dfc5e96c 5085 redisAssert(1 != 1);
942a3961 5086 }
5087 }
ed9b544e 5088 }
5089 }
5090 }
5091
5092 /* We are ready to sort the vector... perform a bit of sanity check
5093 * on the LIMIT option too. We'll use a partial version of quicksort. */
5094 start = (limit_start < 0) ? 0 : limit_start;
5095 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5096 if (start >= vectorlen) {
5097 start = vectorlen-1;
5098 end = vectorlen-2;
5099 }
5100 if (end >= vectorlen) end = vectorlen-1;
5101
5102 if (dontsort == 0) {
5103 server.sort_desc = desc;
5104 server.sort_alpha = alpha;
5105 server.sort_bypattern = sortby ? 1 : 0;
5f5b9840 5106 if (sortby && (start != 0 || end != vectorlen-1))
5107 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5108 else
5109 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
ed9b544e 5110 }
5111
5112 /* Send command output to the output buffer, performing the specified
5113 * GET/DEL/INCR/DECR operations if any. */
5114 outputlen = getop ? getop*(end-start+1) : end-start+1;
443c6409 5115 if (storekey == NULL) {
5116 /* STORE option not specified, sent the sorting result to client */
5117 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5118 for (j = start; j <= end; j++) {
5119 listNode *ln;
5120 if (!getop) {
5121 addReplyBulkLen(c,vector[j].obj);
5122 addReply(c,vector[j].obj);
5123 addReply(c,shared.crlf);
5124 }
5125 listRewind(operations);
5126 while((ln = listYield(operations))) {
5127 redisSortOperation *sop = ln->value;
5128 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5129 vector[j].obj);
5130
5131 if (sop->type == REDIS_SORT_GET) {
5132 if (!val || val->type != REDIS_STRING) {
5133 addReply(c,shared.nullbulk);
5134 } else {
5135 addReplyBulkLen(c,val);
5136 addReply(c,val);
5137 addReply(c,shared.crlf);
5138 }
5139 } else {
dfc5e96c 5140 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
443c6409 5141 }
5142 }
ed9b544e 5143 }
443c6409 5144 } else {
5145 robj *listObject = createListObject();
5146 list *listPtr = (list*) listObject->ptr;
5147
5148 /* STORE option specified, set the sorting result as a List object */
5149 for (j = start; j <= end; j++) {
5150 listNode *ln;
5151 if (!getop) {
5152 listAddNodeTail(listPtr,vector[j].obj);
5153 incrRefCount(vector[j].obj);
5154 }
5155 listRewind(operations);
5156 while((ln = listYield(operations))) {
5157 redisSortOperation *sop = ln->value;
5158 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5159 vector[j].obj);
5160
5161 if (sop->type == REDIS_SORT_GET) {
5162 if (!val || val->type != REDIS_STRING) {
5163 listAddNodeTail(listPtr,createStringObject("",0));
5164 } else {
5165 listAddNodeTail(listPtr,val);
5166 incrRefCount(val);
5167 }
ed9b544e 5168 } else {
dfc5e96c 5169 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
ed9b544e 5170 }
ed9b544e 5171 }
ed9b544e 5172 }
121796f7 5173 if (dictReplace(c->db->dict,storekey,listObject)) {
5174 incrRefCount(storekey);
5175 }
443c6409 5176 /* Note: we add 1 because the DB is dirty anyway since even if the
5177 * SORT result is empty a new key is set and maybe the old content
5178 * replaced. */
5179 server.dirty += 1+outputlen;
5180 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
ed9b544e 5181 }
5182
5183 /* Cleanup */
5184 decrRefCount(sortval);
5185 listRelease(operations);
5186 for (j = 0; j < vectorlen; j++) {
5187 if (sortby && alpha && vector[j].u.cmpobj)
5188 decrRefCount(vector[j].u.cmpobj);
5189 }
5190 zfree(vector);
5191}
5192
1c85b79f 5193/* Create the string returned by the INFO command. This is decoupled
5194 * by the INFO command itself as we need to report the same information
5195 * on memory corruption problems. */
5196static sds genRedisInfoString(void) {
ed9b544e 5197 sds info;
5198 time_t uptime = time(NULL)-server.stat_starttime;
c3cb078d 5199 int j;
ed9b544e 5200
5201 info = sdscatprintf(sdsempty(),
5202 "redis_version:%s\r\n"
f1017b3f 5203 "arch_bits:%s\r\n"
7a932b74 5204 "multiplexing_api:%s\r\n"
682ac724 5205 "uptime_in_seconds:%ld\r\n"
5206 "uptime_in_days:%ld\r\n"
ed9b544e 5207 "connected_clients:%d\r\n"
5208 "connected_slaves:%d\r\n"
5fba9f71 5209 "used_memory:%zu\r\n"
ed9b544e 5210 "changes_since_last_save:%lld\r\n"
be2bb6b0 5211 "bgsave_in_progress:%d\r\n"
682ac724 5212 "last_save_time:%ld\r\n"
b3fad521 5213 "bgrewriteaof_in_progress:%d\r\n"
ed9b544e 5214 "total_connections_received:%lld\r\n"
5215 "total_commands_processed:%lld\r\n"
a0f643ea 5216 "role:%s\r\n"
ed9b544e 5217 ,REDIS_VERSION,
f1017b3f 5218 (sizeof(long) == 8) ? "64" : "32",
7a932b74 5219 aeGetApiName(),
a0f643ea 5220 uptime,
5221 uptime/(3600*24),
ed9b544e 5222 listLength(server.clients)-listLength(server.slaves),
5223 listLength(server.slaves),
5224 server.usedmemory,
5225 server.dirty,
9d65a1bb 5226 server.bgsavechildpid != -1,
ed9b544e 5227 server.lastsave,
b3fad521 5228 server.bgrewritechildpid != -1,
ed9b544e 5229 server.stat_numconnections,
5230 server.stat_numcommands,
a0f643ea 5231 server.masterhost == NULL ? "master" : "slave"
ed9b544e 5232 );
a0f643ea 5233 if (server.masterhost) {
5234 info = sdscatprintf(info,
5235 "master_host:%s\r\n"
5236 "master_port:%d\r\n"
5237 "master_link_status:%s\r\n"
5238 "master_last_io_seconds_ago:%d\r\n"
5239 ,server.masterhost,
5240 server.masterport,
5241 (server.replstate == REDIS_REPL_CONNECTED) ?
5242 "up" : "down",
f72b934d 5243 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
a0f643ea 5244 );
5245 }
c3cb078d 5246 for (j = 0; j < server.dbnum; j++) {
5247 long long keys, vkeys;
5248
5249 keys = dictSize(server.db[j].dict);
5250 vkeys = dictSize(server.db[j].expires);
5251 if (keys || vkeys) {
9d65a1bb 5252 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
c3cb078d 5253 j, keys, vkeys);
5254 }
5255 }
1c85b79f 5256 return info;
5257}
5258
5259static void infoCommand(redisClient *c) {
5260 sds info = genRedisInfoString();
83c6a618 5261 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5262 (unsigned long)sdslen(info)));
ed9b544e 5263 addReplySds(c,info);
70003d28 5264 addReply(c,shared.crlf);
ed9b544e 5265}
5266
3305306f 5267static void monitorCommand(redisClient *c) {
5268 /* ignore MONITOR if aleady slave or in monitor mode */
5269 if (c->flags & REDIS_SLAVE) return;
5270
5271 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5272 c->slaveseldb = 0;
6b47e12e 5273 listAddNodeTail(server.monitors,c);
3305306f 5274 addReply(c,shared.ok);
5275}
5276
5277/* ================================= Expire ================================= */
5278static int removeExpire(redisDb *db, robj *key) {
5279 if (dictDelete(db->expires,key) == DICT_OK) {
5280 return 1;
5281 } else {
5282 return 0;
5283 }
5284}
5285
5286static int setExpire(redisDb *db, robj *key, time_t when) {
5287 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5288 return 0;
5289 } else {
5290 incrRefCount(key);
5291 return 1;
5292 }
5293}
5294
bb32ede5 5295/* Return the expire time of the specified key, or -1 if no expire
5296 * is associated with this key (i.e. the key is non volatile) */
5297static time_t getExpire(redisDb *db, robj *key) {
5298 dictEntry *de;
5299
5300 /* No expire? return ASAP */
5301 if (dictSize(db->expires) == 0 ||
5302 (de = dictFind(db->expires,key)) == NULL) return -1;
5303
5304 return (time_t) dictGetEntryVal(de);
5305}
5306
3305306f 5307static int expireIfNeeded(redisDb *db, robj *key) {
5308 time_t when;
5309 dictEntry *de;
5310
5311 /* No expire? return ASAP */
5312 if (dictSize(db->expires) == 0 ||
5313 (de = dictFind(db->expires,key)) == NULL) return 0;
5314
5315 /* Lookup the expire */
5316 when = (time_t) dictGetEntryVal(de);
5317 if (time(NULL) <= when) return 0;
5318
5319 /* Delete the key */
5320 dictDelete(db->expires,key);
5321 return dictDelete(db->dict,key) == DICT_OK;
5322}
5323
5324static int deleteIfVolatile(redisDb *db, robj *key) {
5325 dictEntry *de;
5326
5327 /* No expire? return ASAP */
5328 if (dictSize(db->expires) == 0 ||
5329 (de = dictFind(db->expires,key)) == NULL) return 0;
5330
5331 /* Delete the key */
0c66a471 5332 server.dirty++;
3305306f 5333 dictDelete(db->expires,key);
5334 return dictDelete(db->dict,key) == DICT_OK;
5335}
5336
802e8373 5337static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
3305306f 5338 dictEntry *de;
3305306f 5339
802e8373 5340 de = dictFind(c->db->dict,key);
3305306f 5341 if (de == NULL) {
5342 addReply(c,shared.czero);
5343 return;
5344 }
43e5ccdf 5345 if (seconds < 0) {
5346 if (deleteKey(c->db,key)) server.dirty++;
5347 addReply(c, shared.cone);
3305306f 5348 return;
5349 } else {
5350 time_t when = time(NULL)+seconds;
802e8373 5351 if (setExpire(c->db,key,when)) {
3305306f 5352 addReply(c,shared.cone);
77423026 5353 server.dirty++;
5354 } else {
3305306f 5355 addReply(c,shared.czero);
77423026 5356 }
3305306f 5357 return;
5358 }
5359}
5360
802e8373 5361static void expireCommand(redisClient *c) {
5362 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5363}
5364
5365static void expireatCommand(redisClient *c) {
5366 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5367}
5368
fd88489a 5369static void ttlCommand(redisClient *c) {
5370 time_t expire;
5371 int ttl = -1;
5372
5373 expire = getExpire(c->db,c->argv[1]);
5374 if (expire != -1) {
5375 ttl = (int) (expire-time(NULL));
5376 if (ttl < 0) ttl = -1;
5377 }
5378 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5379}
5380
6e469882 5381/* ================================ MULTI/EXEC ============================== */
5382
5383/* Client state initialization for MULTI/EXEC */
5384static void initClientMultiState(redisClient *c) {
5385 c->mstate.commands = NULL;
5386 c->mstate.count = 0;
5387}
5388
5389/* Release all the resources associated with MULTI/EXEC state */
5390static void freeClientMultiState(redisClient *c) {
5391 int j;
5392
5393 for (j = 0; j < c->mstate.count; j++) {
5394 int i;
5395 multiCmd *mc = c->mstate.commands+j;
5396
5397 for (i = 0; i < mc->argc; i++)
5398 decrRefCount(mc->argv[i]);
5399 zfree(mc->argv);
5400 }
5401 zfree(c->mstate.commands);
5402}
5403
5404/* Add a new command into the MULTI commands queue */
5405static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5406 multiCmd *mc;
5407 int j;
5408
5409 c->mstate.commands = zrealloc(c->mstate.commands,
5410 sizeof(multiCmd)*(c->mstate.count+1));
5411 mc = c->mstate.commands+c->mstate.count;
5412 mc->cmd = cmd;
5413 mc->argc = c->argc;
5414 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5415 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5416 for (j = 0; j < c->argc; j++)
5417 incrRefCount(mc->argv[j]);
5418 c->mstate.count++;
5419}
5420
5421static void multiCommand(redisClient *c) {
5422 c->flags |= REDIS_MULTI;
36c548f0 5423 addReply(c,shared.ok);
6e469882 5424}
5425
5426static void execCommand(redisClient *c) {
5427 int j;
5428 robj **orig_argv;
5429 int orig_argc;
5430
5431 if (!(c->flags & REDIS_MULTI)) {
5432 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5433 return;
5434 }
5435
5436 orig_argv = c->argv;
5437 orig_argc = c->argc;
5438 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5439 for (j = 0; j < c->mstate.count; j++) {
5440 c->argc = c->mstate.commands[j].argc;
5441 c->argv = c->mstate.commands[j].argv;
5442 call(c,c->mstate.commands[j].cmd);
5443 }
5444 c->argv = orig_argv;
5445 c->argc = orig_argc;
5446 freeClientMultiState(c);
5447 initClientMultiState(c);
5448 c->flags &= (~REDIS_MULTI);
5449}
5450
4409877e 5451/* =========================== Blocking Operations ========================= */
5452
5453/* Currently Redis blocking operations support is limited to list POP ops,
5454 * so the current implementation is not fully generic, but it is also not
5455 * completely specific so it will not require a rewrite to support new
5456 * kind of blocking operations in the future.
5457 *
5458 * Still it's important to note that list blocking operations can be already
5459 * used as a notification mechanism in order to implement other blocking
5460 * operations at application level, so there must be a very strong evidence
5461 * of usefulness and generality before new blocking operations are implemented.
5462 *
 * This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   when there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blockingkeys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
5477 *
5478 * The above comment and the source code should be enough in order to understand
5479 * the implementation and modify / fix it later.
5480 */
5481
5482/* Set a client in blocking mode for the specified key, with the specified
5483 * timeout */
5484static void blockForKey(redisClient *c, robj *key, time_t timeout) {
5485 dictEntry *de;
5486 list *l;
5487
5488 c->blockingkey = key;
5489 incrRefCount(key);
5490 c->blockingto = timeout;
5491 de = dictFind(c->db->blockingkeys,key);
5492 if (de == NULL) {
5493 int retval;
5494
5495 l = listCreate();
2affc3ed 5496 retval = dictAdd(c->db->blockingkeys,key,l);
9fe33a0e 5497 incrRefCount(key);
4409877e 5498 assert(retval == DICT_OK);
5499 } else {
5500 l = dictGetEntryVal(de);
5501 }
5502 listAddNodeTail(l,c);
5503 c->flags |= REDIS_BLOCKED;
5504 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
5505}
5506
5507/* Unblock a client that's waiting in a blocking operation such as BLPOP */
5508static void unblockClient(redisClient *c) {
5509 dictEntry *de;
5510 list *l;
5511
5512 /* Remove this client from the list of clients waiting for this key. */
5513 assert(c->blockingkey != NULL);
5514 de = dictFind(c->db->blockingkeys,c->blockingkey);
5515 assert(de != NULL);
5516 l = dictGetEntryVal(de);
5517 listDelNode(l,listSearchKey(l,c));
5518 /* If the list is empty we need to remove it to avoid wasting memory */
5519 if (listLength(l) == 0)
5520 dictDelete(c->db->blockingkeys,c->blockingkey);
5521 /* Finally set the right flags in the client structure */
5522 decrRefCount(c->blockingkey);
5523 c->blockingkey = NULL;
5524 c->flags &= (~REDIS_BLOCKED);
5525 /* Ok now we are ready to get read events from socket, note that we
5526 * can't trap errors here as it's possible that unblockClients() is
5527 * called from freeClient() itself, and the only thing we can do
5528 * if we failed to register the READABLE event is to kill the client.
5529 * Still the following function should never fail in the real world as
5530 * we are sure the file descriptor is sane, and we exit on out of mem. */
5531 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5532 /* As a final step we want to process data if there is some command waiting
5533 * in the input buffer. Note that this is safe even if unblockClient()
5534 * gets called from freeClient() because freeClient() will be smart
5535 * enough to call this function *after* c->querybuf was set to NULL. */
5536 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5537}
5538
5539/* This should be called from any function PUSHing into lists.
5540 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5541 * 'ele' is the element pushed.
5542 *
5543 * If the function returns 0 there was no client waiting for a list push
5544 * against this key.
5545 *
5546 * If the function returns 1 there was a client waiting for a list push
5547 * against this key, the element was passed to this client thus it's not
5548 * needed to actually add it to the list and the caller should return asap. */
5549static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5550 struct dictEntry *de;
5551 redisClient *receiver;
5552 list *l;
5553 listNode *ln;
5554
5555 de = dictFind(c->db->blockingkeys,key);
5556 if (de == NULL) return 0;
5557 l = dictGetEntryVal(de);
5558 ln = listFirst(l);
5559 assert(ln != NULL);
5560 receiver = ln->value;
4409877e 5561
5562 addReplyBulkLen(receiver,ele);
5563 addReply(receiver,ele);
5564 addReply(receiver,shared.crlf);
5565 unblockClient(receiver);
5566 return 1;
5567}
5568
5569/* Blocking RPOP/LPOP */
5570static void blockingPopGenericCommand(redisClient *c, int where) {
5571 robj *o;
5572 time_t timeout;
5573
5574 o = lookupKeyWrite(c->db,c->argv[1]);
5575 if (o != NULL) {
5576 if (o->type != REDIS_LIST) {
5577 popGenericCommand(c,where);
5578 return;
5579 } else {
5580 list *list = o->ptr;
5581 if (listLength(list) != 0) {
5582 /* If the list contains elements fall back to the usual
5583 * non-blocking POP operation */
5584 popGenericCommand(c,where);
5585 return;
5586 }
5587 }
5588 }
5589 /* If the list is empty or the key does not exists we must block */
5590 timeout = strtol(c->argv[2]->ptr,NULL,10);
5591 if (timeout > 0) timeout += time(NULL);
5592 blockForKey(c,c->argv[1],timeout);
5593}
5594
5595static void blpopCommand(redisClient *c) {
5596 blockingPopGenericCommand(c,REDIS_HEAD);
5597}
5598
5599static void brpopCommand(redisClient *c) {
5600 blockingPopGenericCommand(c,REDIS_TAIL);
5601}
5602
ed9b544e 5603/* =============================== Replication ============================= */
5604
a4d1ba9a 5605static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 5606 ssize_t nwritten, ret = size;
5607 time_t start = time(NULL);
5608
5609 timeout++;
5610 while(size) {
5611 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5612 nwritten = write(fd,ptr,size);
5613 if (nwritten == -1) return -1;
5614 ptr += nwritten;
5615 size -= nwritten;
5616 }
5617 if ((time(NULL)-start) > timeout) {
5618 errno = ETIMEDOUT;
5619 return -1;
5620 }
5621 }
5622 return ret;
5623}
5624
a4d1ba9a 5625static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 5626 ssize_t nread, totread = 0;
5627 time_t start = time(NULL);
5628
5629 timeout++;
5630 while(size) {
5631 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5632 nread = read(fd,ptr,size);
5633 if (nread == -1) return -1;
5634 ptr += nread;
5635 size -= nread;
5636 totread += nread;
5637 }
5638 if ((time(NULL)-start) > timeout) {
5639 errno = ETIMEDOUT;
5640 return -1;
5641 }
5642 }
5643 return totread;
5644}
5645
/* Read one "\n"-terminated line from 'fd' into 'ptr', one byte at a time via
 * syncRead(). The "\n" is not stored, a trailing "\r" is replaced by the
 * terminator, and the buffer is always NUL terminated.
 *
 * 'size' is the capacity of 'ptr' including the terminator, so at most
 * size-1 bytes are stored. If the line is longer, the first size-1 bytes are
 * returned and the rest of the line is left unread on the socket.
 *
 * Returns the number of bytes stored (a stripped "\r" is still counted), 0
 * if 'size' leaves no room at all, or -1 on error/timeout from syncRead(). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;    /* no room even for the terminator */
    *ptr = '\0';
    size--;                     /* reserve room for the terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Account for the stored byte: without this the loop never ends
         * on its own and a long line overflows the caller's buffer. */
        size--;
    }
    return nread;
}
5666
5667static void syncCommand(redisClient *c) {
40d224a9 5668 /* ignore SYNC if aleady slave or in monitor mode */
5669 if (c->flags & REDIS_SLAVE) return;
5670
5671 /* SYNC can't be issued when the server has pending data to send to
5672 * the client about already issued commands. We need a fresh reply
5673 * buffer registering the differences between the BGSAVE and the current
5674 * dataset, so that we can copy to other slaves if needed. */
5675 if (listLength(c->reply) != 0) {
5676 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5677 return;
5678 }
5679
5680 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5681 /* Here we need to check if there is a background saving operation
5682 * in progress, or if it is required to start one */
9d65a1bb 5683 if (server.bgsavechildpid != -1) {
40d224a9 5684 /* Ok a background save is in progress. Let's check if it is a good
5685 * one for replication, i.e. if there is another slave that is
5686 * registering differences since the server forked to save */
5687 redisClient *slave;
5688 listNode *ln;
5689
6208b3a7 5690 listRewind(server.slaves);
5691 while((ln = listYield(server.slaves))) {
40d224a9 5692 slave = ln->value;
5693 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
40d224a9 5694 }
5695 if (ln) {
5696 /* Perfect, the server is already registering differences for
5697 * another slave. Set the right state, and copy the buffer. */
5698 listRelease(c->reply);
5699 c->reply = listDup(slave->reply);
40d224a9 5700 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5701 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5702 } else {
5703 /* No way, we need to wait for the next BGSAVE in order to
5704 * register differences */
5705 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5706 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5707 }
5708 } else {
5709 /* Ok we don't have a BGSAVE in progress, let's start one */
5710 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5711 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5712 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5713 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5714 return;
5715 }
5716 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5717 }
6208b3a7 5718 c->repldbfd = -1;
40d224a9 5719 c->flags |= REDIS_SLAVE;
5720 c->slaveseldb = 0;
6b47e12e 5721 listAddNodeTail(server.slaves,c);
40d224a9 5722 return;
5723}
5724
6208b3a7 5725static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5726 redisClient *slave = privdata;
5727 REDIS_NOTUSED(el);
5728 REDIS_NOTUSED(mask);
5729 char buf[REDIS_IOBUF_LEN];
5730 ssize_t nwritten, buflen;
5731
5732 if (slave->repldboff == 0) {
5733 /* Write the bulk write count before to transfer the DB. In theory here
5734 * we don't know how much room there is in the output buffer of the
5735 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5736 * operations) will never be smaller than the few bytes we need. */
5737 sds bulkcount;
5738
5739 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5740 slave->repldbsize);
5741 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5742 {
5743 sdsfree(bulkcount);
5744 freeClient(slave);
5745 return;
5746 }
5747 sdsfree(bulkcount);
5748 }
5749 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5750 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5751 if (buflen <= 0) {
5752 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5753 (buflen == 0) ? "premature EOF" : strerror(errno));
5754 freeClient(slave);
5755 return;
5756 }
5757 if ((nwritten = write(fd,buf,buflen)) == -1) {
5758 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5759 strerror(errno));
5760 freeClient(slave);
5761 return;
5762 }
5763 slave->repldboff += nwritten;
5764 if (slave->repldboff == slave->repldbsize) {
5765 close(slave->repldbfd);
5766 slave->repldbfd = -1;
5767 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5768 slave->replstate = REDIS_REPL_ONLINE;
5769 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
266373b2 5770 sendReplyToClient, slave) == AE_ERR) {
6208b3a7 5771 freeClient(slave);
5772 return;
5773 }
5774 addReplySds(slave,sdsempty());
5775 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5776 }
5777}
ed9b544e 5778
a3b21203 5779/* This function is called at the end of every backgrond saving.
5780 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5781 * otherwise REDIS_ERR is passed to the function.
5782 *
5783 * The goal of this function is to handle slaves waiting for a successful
5784 * background saving in order to perform non-blocking synchronization. */
5785static void updateSlavesWaitingBgsave(int bgsaveerr) {
6208b3a7 5786 listNode *ln;
5787 int startbgsave = 0;
ed9b544e 5788
6208b3a7 5789 listRewind(server.slaves);
5790 while((ln = listYield(server.slaves))) {
5791 redisClient *slave = ln->value;
ed9b544e 5792
6208b3a7 5793 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5794 startbgsave = 1;
5795 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5796 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
dde65f3f 5797 struct redis_stat buf;
6208b3a7 5798
5799 if (bgsaveerr != REDIS_OK) {
5800 freeClient(slave);
5801 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5802 continue;
5803 }
5804 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
dde65f3f 5805 redis_fstat(slave->repldbfd,&buf) == -1) {
6208b3a7 5806 freeClient(slave);
5807 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5808 continue;
5809 }
5810 slave->repldboff = 0;
5811 slave->repldbsize = buf.st_size;
5812 slave->replstate = REDIS_REPL_SEND_BULK;
5813 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
266373b2 5814 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
6208b3a7 5815 freeClient(slave);
5816 continue;
5817 }
5818 }
ed9b544e 5819 }
6208b3a7 5820 if (startbgsave) {
5821 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5822 listRewind(server.slaves);
5823 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5824 while((ln = listYield(server.slaves))) {
5825 redisClient *slave = ln->value;
ed9b544e 5826
6208b3a7 5827 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5828 freeClient(slave);
5829 }
5830 }
5831 }
ed9b544e 5832}
5833
5834static int syncWithMaster(void) {
d0ccebcf 5835 char buf[1024], tmpfile[256], authcmd[1024];
ed9b544e 5836 int dumpsize;
5837 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5838 int dfd;
5839
5840 if (fd == -1) {
5841 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5842 strerror(errno));
5843 return REDIS_ERR;
5844 }
d0ccebcf 5845
5846 /* AUTH with the master if required. */
5847 if(server.masterauth) {
5848 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5849 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5850 close(fd);
5851 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5852 strerror(errno));
5853 return REDIS_ERR;
5854 }
5855 /* Read the AUTH result. */
5856 if (syncReadLine(fd,buf,1024,3600) == -1) {
5857 close(fd);
5858 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5859 strerror(errno));
5860 return REDIS_ERR;
5861 }
5862 if (buf[0] != '+') {
5863 close(fd);
5864 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5865 return REDIS_ERR;
5866 }
5867 }
5868
ed9b544e 5869 /* Issue the SYNC command */
5870 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5871 close(fd);
5872 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5873 strerror(errno));
5874 return REDIS_ERR;
5875 }
5876 /* Read the bulk write count */
8c4d91fc 5877 if (syncReadLine(fd,buf,1024,3600) == -1) {
ed9b544e 5878 close(fd);
5879 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5880 strerror(errno));
5881 return REDIS_ERR;
5882 }
4aa701c1 5883 if (buf[0] != '$') {
5884 close(fd);
5885 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5886 return REDIS_ERR;
5887 }
c937aa89 5888 dumpsize = atoi(buf+1);
ed9b544e 5889 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5890 /* Read the bulk write data on a temp file */
5891 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5892 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5893 if (dfd == -1) {
5894 close(fd);
5895 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5896 return REDIS_ERR;
5897 }
5898 while(dumpsize) {
5899 int nread, nwritten;
5900
5901 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5902 if (nread == -1) {
5903 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5904 strerror(errno));
5905 close(fd);
5906 close(dfd);
5907 return REDIS_ERR;
5908 }
5909 nwritten = write(dfd,buf,nread);
5910 if (nwritten == -1) {
5911 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5912 close(fd);
5913 close(dfd);
5914 return REDIS_ERR;
5915 }
5916 dumpsize -= nread;
5917 }
5918 close(dfd);
5919 if (rename(tmpfile,server.dbfilename) == -1) {
5920 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5921 unlink(tmpfile);
5922 close(fd);
5923 return REDIS_ERR;
5924 }
5925 emptyDb();
f78fd11b 5926 if (rdbLoad(server.dbfilename) != REDIS_OK) {
ed9b544e 5927 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5928 close(fd);
5929 return REDIS_ERR;
5930 }
5931 server.master = createClient(fd);
5932 server.master->flags |= REDIS_MASTER;
179b3952 5933 server.master->authenticated = 1;
ed9b544e 5934 server.replstate = REDIS_REPL_CONNECTED;
5935 return REDIS_OK;
5936}
5937
321b0e13 5938static void slaveofCommand(redisClient *c) {
5939 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5940 !strcasecmp(c->argv[2]->ptr,"one")) {
5941 if (server.masterhost) {
5942 sdsfree(server.masterhost);
5943 server.masterhost = NULL;
5944 if (server.master) freeClient(server.master);
5945 server.replstate = REDIS_REPL_NONE;
5946 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5947 }
5948 } else {
5949 sdsfree(server.masterhost);
5950 server.masterhost = sdsdup(c->argv[1]->ptr);
5951 server.masterport = atoi(c->argv[2]->ptr);
5952 if (server.master) freeClient(server.master);
5953 server.replstate = REDIS_REPL_CONNECT;
5954 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5955 server.masterhost, server.masterport);
5956 }
5957 addReply(c,shared.ok);
5958}
5959
3fd78bcd 5960/* ============================ Maxmemory directive ======================== */
5961
5962/* This function gets called when 'maxmemory' is set on the config file to limit
5963 * the max memory used by the server, and we are out of memory.
5964 * This function will try to, in order:
5965 *
5966 * - Free objects from the free list
5967 * - Try to remove keys with an EXPIRE set
5968 *
5969 * It is not possible to free enough memory to reach used-memory < maxmemory
5970 * the server will start refusing commands that will enlarge even more the
5971 * memory usage.
5972 */
5973static void freeMemoryIfNeeded(void) {
5974 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5975 if (listLength(server.objfreelist)) {
5976 robj *o;
5977
5978 listNode *head = listFirst(server.objfreelist);
5979 o = listNodeValue(head);
5980 listDelNode(server.objfreelist,head);
5981 zfree(o);
5982 } else {
5983 int j, k, freed = 0;
5984
5985 for (j = 0; j < server.dbnum; j++) {
5986 int minttl = -1;
5987 robj *minkey = NULL;
5988 struct dictEntry *de;
5989
5990 if (dictSize(server.db[j].expires)) {
5991 freed = 1;
5992 /* From a sample of three keys drop the one nearest to
5993 * the natural expire */
5994 for (k = 0; k < 3; k++) {
5995 time_t t;
5996
5997 de = dictGetRandomKey(server.db[j].expires);
5998 t = (time_t) dictGetEntryVal(de);
5999 if (minttl == -1 || t < minttl) {
6000 minkey = dictGetEntryKey(de);
6001 minttl = t;
6002 }
6003 }
6004 deleteKey(server.db+j,minkey);
6005 }
6006 }
6007 if (!freed) return; /* nothing to free... */
6008 }
6009 }
6010}
6011
f80dff62 6012/* ============================== Append Only file ========================== */
6013
6014static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6015 sds buf = sdsempty();
6016 int j;
6017 ssize_t nwritten;
6018 time_t now;
6019 robj *tmpargv[3];
6020
6021 /* The DB this command was targetting is not the same as the last command
6022 * we appendend. To issue a SELECT command is needed. */
6023 if (dictid != server.appendseldb) {
6024 char seldb[64];
6025
6026 snprintf(seldb,sizeof(seldb),"%d",dictid);
682ac724 6027 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
83c6a618 6028 (unsigned long)strlen(seldb),seldb);
f80dff62 6029 server.appendseldb = dictid;
6030 }
6031
6032 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6033 * EXPIREs into EXPIREATs calls */
6034 if (cmd->proc == expireCommand) {
6035 long when;
6036
6037 tmpargv[0] = createStringObject("EXPIREAT",8);
6038 tmpargv[1] = argv[1];
6039 incrRefCount(argv[1]);
6040 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6041 tmpargv[2] = createObject(REDIS_STRING,
6042 sdscatprintf(sdsempty(),"%ld",when));
6043 argv = tmpargv;
6044 }
6045
6046 /* Append the actual command */
6047 buf = sdscatprintf(buf,"*%d\r\n",argc);
6048 for (j = 0; j < argc; j++) {
6049 robj *o = argv[j];
6050
9d65a1bb 6051 o = getDecodedObject(o);
83c6a618 6052 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
f80dff62 6053 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6054 buf = sdscatlen(buf,"\r\n",2);
9d65a1bb 6055 decrRefCount(o);
f80dff62 6056 }
6057
6058 /* Free the objects from the modified argv for EXPIREAT */
6059 if (cmd->proc == expireCommand) {
6060 for (j = 0; j < 3; j++)
6061 decrRefCount(argv[j]);
6062 }
6063
6064 /* We want to perform a single write. This should be guaranteed atomic
6065 * at least if the filesystem we are writing is a real physical one.
6066 * While this will save us against the server being killed I don't think
6067 * there is much to do about the whole server stopping for power problems
6068 * or alike */
6069 nwritten = write(server.appendfd,buf,sdslen(buf));
6070 if (nwritten != (signed)sdslen(buf)) {
6071 /* Ooops, we are in troubles. The best thing to do for now is
6072 * to simply exit instead to give the illusion that everything is
6073 * working as expected. */
6074 if (nwritten == -1) {
6075 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6076 } else {
6077 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6078 }
6079 exit(1);
6080 }
85a83172 6081 /* If a background append only file rewriting is in progress we want to
6082 * accumulate the differences between the child DB and the current one
6083 * in a buffer, so that when the child process will do its work we
6084 * can append the differences to the new append only file. */
6085 if (server.bgrewritechildpid != -1)
6086 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6087
6088 sdsfree(buf);
f80dff62 6089 now = time(NULL);
6090 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6091 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6092 now-server.lastfsync > 1))
6093 {
6094 fsync(server.appendfd); /* Let's try to get this data on the disk */
6095 server.lastfsync = now;
6096 }
6097}
6098
6099/* In Redis commands are always executed in the context of a client, so in
6100 * order to load the append only file we need to create a fake client. */
6101static struct redisClient *createFakeClient(void) {
6102 struct redisClient *c = zmalloc(sizeof(*c));
6103
6104 selectDb(c,0);
6105 c->fd = -1;
6106 c->querybuf = sdsempty();
6107 c->argc = 0;
6108 c->argv = NULL;
6109 c->flags = 0;
9387d17d 6110 /* We set the fake client as a slave waiting for the synchronization
6111 * so that Redis will not try to send replies to this client. */
6112 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
f80dff62 6113 c->reply = listCreate();
6114 listSetFreeMethod(c->reply,decrRefCount);
6115 listSetDupMethod(c->reply,dupClientReplyValue);
6116 return c;
6117}
6118
6119static void freeFakeClient(struct redisClient *c) {
6120 sdsfree(c->querybuf);
6121 listRelease(c->reply);
6122 zfree(c);
6123}
6124
6125/* Replay the append log file. On error REDIS_OK is returned. On non fatal
6126 * error (the append only file is zero-length) REDIS_ERR is returned. On
6127 * fatal error an error message is logged and the program exists. */
6128int loadAppendOnlyFile(char *filename) {
6129 struct redisClient *fakeClient;
6130 FILE *fp = fopen(filename,"r");
6131 struct redis_stat sb;
6132
6133 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6134 return REDIS_ERR;
6135
6136 if (fp == NULL) {
6137 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6138 exit(1);
6139 }
6140
6141 fakeClient = createFakeClient();
6142 while(1) {
6143 int argc, j;
6144 unsigned long len;
6145 robj **argv;
6146 char buf[128];
6147 sds argsds;
6148 struct redisCommand *cmd;
6149
6150 if (fgets(buf,sizeof(buf),fp) == NULL) {
6151 if (feof(fp))
6152 break;
6153 else
6154 goto readerr;
6155 }
6156 if (buf[0] != '*') goto fmterr;
6157 argc = atoi(buf+1);
6158 argv = zmalloc(sizeof(robj*)*argc);
6159 for (j = 0; j < argc; j++) {
6160 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6161 if (buf[0] != '$') goto fmterr;
6162 len = strtol(buf+1,NULL,10);
6163 argsds = sdsnewlen(NULL,len);
0f151ef1 6164 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
f80dff62 6165 argv[j] = createObject(REDIS_STRING,argsds);
6166 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6167 }
6168
6169 /* Command lookup */
6170 cmd = lookupCommand(argv[0]->ptr);
6171 if (!cmd) {
6172 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6173 exit(1);
6174 }
6175 /* Try object sharing and encoding */
6176 if (server.shareobjects) {
6177 int j;
6178 for(j = 1; j < argc; j++)
6179 argv[j] = tryObjectSharing(argv[j]);
6180 }
6181 if (cmd->flags & REDIS_CMD_BULK)
6182 tryObjectEncoding(argv[argc-1]);
6183 /* Run the command in the context of a fake client */
6184 fakeClient->argc = argc;
6185 fakeClient->argv = argv;
6186 cmd->proc(fakeClient);
6187 /* Discard the reply objects list from the fake client */
6188 while(listLength(fakeClient->reply))
6189 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6190 /* Clean up, ready for the next command */
6191 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6192 zfree(argv);
6193 }
6194 fclose(fp);
6195 freeFakeClient(fakeClient);
6196 return REDIS_OK;
6197
6198readerr:
6199 if (feof(fp)) {
6200 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6201 } else {
6202 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6203 }
6204 exit(1);
6205fmterr:
6206 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6207 exit(1);
6208}
6209
9d65a1bb 6210/* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6211static int fwriteBulk(FILE *fp, robj *obj) {
6212 char buf[128];
6213 obj = getDecodedObject(obj);
6214 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6215 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
e96e4fbf 6216 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6217 goto err;
9d65a1bb 6218 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6219 decrRefCount(obj);
6220 return 1;
6221err:
6222 decrRefCount(obj);
6223 return 0;
6224}
6225
/* Write the double 'd' to 'fp' as a bulk "$<len>\r\n<value>\r\n", with the
 * value formatted by "%.17g" (enough significant digits to round-trip an
 * IEEE double). Returns 1 on success, 0 if an fwrite() fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char hdr[128], payload[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    /* The advertised length excludes the trailing CRLF. */
    snprintf(hdr,sizeof(hdr),"$%lu\r\n",(unsigned long)payloadlen-2);
    if (fwrite(hdr,strlen(hdr),1,fp) == 0) return 0;
    return fwrite(payload,payloadlen,1,fp) != 0;
}
6236
/* Write the long 'l' to 'fp' as a bulk "$<len>\r\n<value>\r\n".
 * Returns 1 on success, 0 if an fwrite() fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char hdr[128], payload[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    payloadlen = strlen(payload);
    /* The advertised length excludes the trailing CRLF. */
    snprintf(hdr,sizeof(hdr),"$%lu\r\n",(unsigned long)payloadlen-2);
    if (fwrite(hdr,strlen(hdr),1,fp) == 0) return 0;
    return fwrite(payload,payloadlen,1,fp) != 0;
}
6247
6248/* Write a sequence of commands able to fully rebuild the dataset into
6249 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6250static int rewriteAppendOnlyFile(char *filename) {
6251 dictIterator *di = NULL;
6252 dictEntry *de;
6253 FILE *fp;
6254 char tmpfile[256];
6255 int j;
6256 time_t now = time(NULL);
6257
6258 /* Note that we have to use a different temp name here compared to the
6259 * one used by rewriteAppendOnlyFileBackground() function. */
6260 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6261 fp = fopen(tmpfile,"w");
6262 if (!fp) {
6263 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6264 return REDIS_ERR;
6265 }
6266 for (j = 0; j < server.dbnum; j++) {
6267 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6268 redisDb *db = server.db+j;
6269 dict *d = db->dict;
6270 if (dictSize(d) == 0) continue;
6271 di = dictGetIterator(d);
6272 if (!di) {
6273 fclose(fp);
6274 return REDIS_ERR;
6275 }
6276
6277 /* SELECT the new DB */
6278 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
85a83172 6279 if (fwriteBulkLong(fp,j) == 0) goto werr;
9d65a1bb 6280
6281 /* Iterate this DB writing every entry */
6282 while((de = dictNext(di)) != NULL) {
6283 robj *key = dictGetEntryKey(de);
6284 robj *o = dictGetEntryVal(de);
6285 time_t expiretime = getExpire(db,key);
6286
6287 /* Save the key and associated value */
9d65a1bb 6288 if (o->type == REDIS_STRING) {
6289 /* Emit a SET command */
6290 char cmd[]="*3\r\n$3\r\nSET\r\n";
6291 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6292 /* Key and value */
6293 if (fwriteBulk(fp,key) == 0) goto werr;
6294 if (fwriteBulk(fp,o) == 0) goto werr;
6295 } else if (o->type == REDIS_LIST) {
6296 /* Emit the RPUSHes needed to rebuild the list */
6297 list *list = o->ptr;
6298 listNode *ln;
6299
6300 listRewind(list);
6301 while((ln = listYield(list))) {
6302 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6303 robj *eleobj = listNodeValue(ln);
6304
6305 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6306 if (fwriteBulk(fp,key) == 0) goto werr;
6307 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6308 }
6309 } else if (o->type == REDIS_SET) {
6310 /* Emit the SADDs needed to rebuild the set */
6311 dict *set = o->ptr;
6312 dictIterator *di = dictGetIterator(set);
6313 dictEntry *de;
6314
6315 while((de = dictNext(di)) != NULL) {
6316 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6317 robj *eleobj = dictGetEntryKey(de);
6318
6319 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6320 if (fwriteBulk(fp,key) == 0) goto werr;
6321 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6322 }
6323 dictReleaseIterator(di);
6324 } else if (o->type == REDIS_ZSET) {
6325 /* Emit the ZADDs needed to rebuild the sorted set */
6326 zset *zs = o->ptr;
6327 dictIterator *di = dictGetIterator(zs->dict);
6328 dictEntry *de;
6329
6330 while((de = dictNext(di)) != NULL) {
6331 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6332 robj *eleobj = dictGetEntryKey(de);
6333 double *score = dictGetEntryVal(de);
6334
6335 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6336 if (fwriteBulk(fp,key) == 0) goto werr;
6337 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6338 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6339 }
6340 dictReleaseIterator(di);
6341 } else {
dfc5e96c 6342 redisAssert(0 != 0);
9d65a1bb 6343 }
6344 /* Save the expire time */
6345 if (expiretime != -1) {
e96e4fbf 6346 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
9d65a1bb 6347 /* If this key is already expired skip it */
6348 if (expiretime < now) continue;
6349 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6350 if (fwriteBulk(fp,key) == 0) goto werr;
6351 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6352 }
6353 }
6354 dictReleaseIterator(di);
6355 }
6356
6357 /* Make sure data will not remain on the OS's output buffers */
6358 fflush(fp);
6359 fsync(fileno(fp));
6360 fclose(fp);
6361
6362 /* Use RENAME to make sure the DB file is changed atomically only
6363 * if the generate DB file is ok. */
6364 if (rename(tmpfile,filename) == -1) {
6365 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6366 unlink(tmpfile);
6367 return REDIS_ERR;
6368 }
6369 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6370 return REDIS_OK;
6371
6372werr:
6373 fclose(fp);
6374 unlink(tmpfile);
e96e4fbf 6375 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
9d65a1bb 6376 if (di) dictReleaseIterator(di);
6377 return REDIS_ERR;
6378}
6379
6380/* This is how rewriting of the append only file in background works:
6381 *
6382 * 1) The user calls BGREWRITEAOF
6383 * 2) Redis calls this function, that forks():
6384 * 2a) the child rewrite the append only file in a temp file.
6385 * 2b) the parent accumulates differences in server.bgrewritebuf.
6386 * 3) When the child finished '2a' exists.
6387 * 4) The parent will trap the exit code, if it's OK, will append the
6388 * data accumulated into server.bgrewritebuf into the temp file, and
6389 * finally will rename(2) the temp file in the actual file name.
6390 * The the new file is reopened as the new append only file. Profit!
6391 */
6392static int rewriteAppendOnlyFileBackground(void) {
6393 pid_t childpid;
6394
6395 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6396 if ((childpid = fork()) == 0) {
6397 /* Child */
6398 char tmpfile[256];
6399 close(server.fd);
6400
6401 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6402 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6403 exit(0);
6404 } else {
6405 exit(1);
6406 }
6407 } else {
6408 /* Parent */
6409 if (childpid == -1) {
6410 redisLog(REDIS_WARNING,
6411 "Can't rewrite append only file in background: fork: %s",
6412 strerror(errno));
6413 return REDIS_ERR;
6414 }
6415 redisLog(REDIS_NOTICE,
6416 "Background append only file rewriting started by pid %d",childpid);
6417 server.bgrewritechildpid = childpid;
85a83172 6418 /* We set appendseldb to -1 in order to force the next call to the
6419 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6420 * accumulated by the parent into server.bgrewritebuf will start
6421 * with a SELECT statement and it will be safe to merge. */
6422 server.appendseldb = -1;
9d65a1bb 6423 return REDIS_OK;
6424 }
6425 return REDIS_OK; /* unreached */
6426}
6427
6428static void bgrewriteaofCommand(redisClient *c) {
6429 if (server.bgrewritechildpid != -1) {
6430 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6431 return;
6432 }
6433 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
49b99ab4 6434 char *status = "+Background append only file rewriting started\r\n";
6435 addReplySds(c,sdsnew(status));
9d65a1bb 6436 } else {
6437 addReply(c,shared.err);
6438 }
6439}
6440
/* Delete the temp file written by the background AOF rewrite child
 * 'childpid' ("temp-rewriteaof-bg-<pid>.aof"). unlink() errors are ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    (void)unlink(path);
}
6447
7f957c92 6448/* ================================= Debugging ============================== */
6449
6450static void debugCommand(redisClient *c) {
6451 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6452 *((char*)-1) = 'x';
210e29f7 6453 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6454 if (rdbSave(server.dbfilename) != REDIS_OK) {
6455 addReply(c,shared.err);
6456 return;
6457 }
6458 emptyDb();
6459 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6460 addReply(c,shared.err);
6461 return;
6462 }
6463 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6464 addReply(c,shared.ok);
71c2b467 6465 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6466 emptyDb();
6467 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6468 addReply(c,shared.err);
6469 return;
6470 }
6471 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6472 addReply(c,shared.ok);
333298da 6473 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6474 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6475 robj *key, *val;
6476
6477 if (!de) {
6478 addReply(c,shared.nokeyerr);
6479 return;
6480 }
6481 key = dictGetEntryKey(de);
6482 val = dictGetEntryVal(de);
6483 addReplySds(c,sdscatprintf(sdsempty(),
942a3961 6484 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
682ac724 6485 (void*)key, key->refcount, (void*)val, val->refcount,
6486 val->encoding));
7f957c92 6487 } else {
333298da 6488 addReplySds(c,sdsnew(
210e29f7 6489 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
7f957c92 6490 }
6491}
56906eef 6492
dfc5e96c 6493static void _redisAssert(char *estr) {
6494 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6495 redisLog(REDIS_WARNING,"==> %s\n",estr);
6496#ifdef HAVE_BACKTRACE
6497 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6498 *((char*)-1) = 'x';
6499#endif
6500}
6501
bcfc686d 6502/* =================================== Main! ================================ */
56906eef 6503
bcfc686d 6504#ifdef __linux__
/* Return the integer in /proc/sys/vm/overcommit_memory, or -1 when the file
 * can't be opened or its first line can't be read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    got = fgets(line,sizeof(line),fp);
    fclose(fp);
    return (got == NULL) ? -1 : atoi(line);
}
6518
6519void linuxOvercommitMemoryWarning(void) {
6520 if (linuxOvercommitMemoryValue() == 0) {
6521 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6522 }
6523}
6524#endif /* __linux__ */
6525
6526static void daemonize(void) {
6527 int fd;
6528 FILE *fp;
6529
6530 if (fork() != 0) exit(0); /* parent exits */
71c54b21 6531 printf("New pid: %d\n", getpid());
bcfc686d 6532 setsid(); /* create a new session */
6533
6534 /* Every output goes to /dev/null. If Redis is daemonized but
6535 * the 'logfile' is set to 'stdout' in the configuration file
6536 * it will not log at all. */
6537 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6538 dup2(fd, STDIN_FILENO);
6539 dup2(fd, STDOUT_FILENO);
6540 dup2(fd, STDERR_FILENO);
6541 if (fd > STDERR_FILENO) close(fd);
6542 }
6543 /* Try to write the pid file */
6544 fp = fopen(server.pidfile,"w");
6545 if (fp) {
6546 fprintf(fp,"%d\n",getpid());
6547 fclose(fp);
56906eef 6548 }
56906eef 6549}
6550
bcfc686d 6551int main(int argc, char **argv) {
6552 initServerConfig();
6553 if (argc == 2) {
6554 resetServerSaveParams();
6555 loadServerConfig(argv[1]);
6556 } else if (argc > 2) {
6557 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6558 exit(1);
6559 } else {
6560 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6561 }
bcfc686d 6562 if (server.daemonize) daemonize();
71c54b21 6563 initServer();
bcfc686d 6564 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6565#ifdef __linux__
6566 linuxOvercommitMemoryWarning();
6567#endif
6568 if (server.appendonly) {
6569 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6570 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6571 } else {
6572 if (rdbLoad(server.dbfilename) == REDIS_OK)
6573 redisLog(REDIS_NOTICE,"DB loaded from disk");
6574 }
6575 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
266373b2 6576 acceptHandler, NULL) == AE_ERR) oom("creating file event");
bcfc686d 6577 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6578 aeMain(server.el);
6579 aeDeleteEventLoop(server.el);
6580 return 0;
6581}
6582
6583/* ============================= Backtrace support ========================= */
6584
6585#ifdef HAVE_BACKTRACE
6586static char *findFuncName(void *pointer, unsigned long *offset);
6587
/* Return the instruction pointer saved in the signal context 'uc', or NULL
 * on platforms this function does not know about. The context layout and
 * field names differ per OS, libc and architecture, hence the ladder. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
  #if __x86_64__
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__)
    return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32 bit */
#elif defined(__X86_64__) || defined(__x86_64__)
    /* The x86-64 <sys/ucontext.h> names the instruction pointer slot
     * REG_RIP; REG_EIP is the 32 bit name. */
    return (void*) uc->uc_mcontext.gregs[REG_RIP]; /* Linux 64 bit */
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
6613
6614static void segvHandler(int sig, siginfo_t *info, void *secret) {
6615 void *trace[100];
6616 char **messages = NULL;
6617 int i, trace_size = 0;
6618 unsigned long offset=0;
56906eef 6619 ucontext_t *uc = (ucontext_t*) secret;
1c85b79f 6620 sds infostring;
56906eef 6621 REDIS_NOTUSED(info);
6622
6623 redisLog(REDIS_WARNING,
6624 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
1c85b79f 6625 infostring = genRedisInfoString();
6626 redisLog(REDIS_WARNING, "%s",infostring);
6627 /* It's not safe to sdsfree() the returned string under memory
6628 * corruption conditions. Let it leak as we are going to abort */
56906eef 6629
6630 trace_size = backtrace(trace, 100);
de96dbfe 6631 /* overwrite sigaction with caller's address */
b91cf5ef 6632 if (getMcontextEip(uc) != NULL) {
6633 trace[1] = getMcontextEip(uc);
6634 }
56906eef 6635 messages = backtrace_symbols(trace, trace_size);
fe3bbfbe 6636
d76412d1 6637 for (i=1; i<trace_size; ++i) {
56906eef 6638 char *fn = findFuncName(trace[i], &offset), *p;
6639
6640 p = strchr(messages[i],'+');
6641 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6642 redisLog(REDIS_WARNING,"%s", messages[i]);
6643 } else {
6644 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6645 }
6646 }
1c85b79f 6647 // free(messages); Don't call free() with possibly corrupted memory.
56906eef 6648 exit(0);
fe3bbfbe 6649}
56906eef 6650
6651static void setupSigSegvAction(void) {
6652 struct sigaction act;
6653
6654 sigemptyset (&act.sa_mask);
6655 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6656 * is used. Otherwise, sa_handler is used */
6657 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6658 act.sa_sigaction = segvHandler;
6659 sigaction (SIGSEGV, &act, NULL);
6660 sigaction (SIGBUS, &act, NULL);
12fea928 6661 sigaction (SIGFPE, &act, NULL);
6662 sigaction (SIGILL, &act, NULL);
6663 sigaction (SIGBUS, &act, NULL);
e65fdc78 6664 return;
56906eef 6665}
e65fdc78 6666
bcfc686d 6667#include "staticsymbols.h"
6668/* This function try to convert a pointer into a function name. It's used in
6669 * oreder to provide a backtrace under segmentation fault that's able to
6670 * display functions declared as static (otherwise the backtrace is useless). */
6671static char *findFuncName(void *pointer, unsigned long *offset){
6672 int i, ret = -1;
6673 unsigned long off, minoff = 0;
ed9b544e 6674
bcfc686d 6675 /* Try to match against the Symbol with the smallest offset */
6676 for (i=0; symsTable[i].pointer; i++) {
6677 unsigned long lp = (unsigned long) pointer;
0bc03378 6678
bcfc686d 6679 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6680 off=lp-symsTable[i].pointer;
6681 if (ret < 0 || off < minoff) {
6682 minoff=off;
6683 ret=i;
6684 }
6685 }
0bc03378 6686 }
bcfc686d 6687 if (ret == -1) return NULL;
6688 *offset = minoff;
6689 return symsTable[ret].name;
0bc03378 6690}
bcfc686d 6691#else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without backtrace() support: there is no crash report to
     * print, so the signal dispositions are left untouched. */
    return;
}
bcfc686d 6694#endif /* HAVE_BACKTRACE */
0bc03378 6695
ed9b544e 6696
ed9b544e 6697
bcfc686d 6698/* The End */
6699
6700
ed9b544e 6701