]> git.saurik.com Git - redis.git/blame - redis.c
saving code refactored a bit, added a function returning the number of bytes an objec...
[redis.git] / redis.c
CommitLineData
ed9b544e 1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "1.3.1"
23d4709d 31
32#include "fmacros.h"
fbf9bcdb 33#include "config.h"
ed9b544e 34
35#include <stdio.h>
36#include <stdlib.h>
37#include <string.h>
38#include <time.h>
39#include <unistd.h>
c9468bcf 40#define __USE_POSIX199309
ed9b544e 41#include <signal.h>
fbf9bcdb 42
43#ifdef HAVE_BACKTRACE
c9468bcf 44#include <execinfo.h>
45#include <ucontext.h>
fbf9bcdb 46#endif /* HAVE_BACKTRACE */
47
ed9b544e 48#include <sys/wait.h>
49#include <errno.h>
50#include <assert.h>
51#include <ctype.h>
52#include <stdarg.h>
53#include <inttypes.h>
54#include <arpa/inet.h>
55#include <sys/stat.h>
56#include <fcntl.h>
57#include <sys/time.h>
58#include <sys/resource.h>
2895e862 59#include <sys/uio.h>
f78fd11b 60#include <limits.h>
a7866db6 61#include <math.h>
0bc1b2f6 62
63#if defined(__sun)
5043dff3 64#include "solarisfixes.h"
65#endif
ed9b544e 66
c9468bcf 67#include "redis.h"
ed9b544e 68#include "ae.h" /* Event driven programming library */
69#include "sds.h" /* Dynamic safe strings */
70#include "anet.h" /* Networking the easy way */
71#include "dict.h" /* Hash tables */
72#include "adlist.h" /* Linked lists */
73#include "zmalloc.h" /* total memory usage aware version of malloc/free */
5f5b9840 74#include "lzf.h" /* LZF compression library */
75#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
ed9b544e 76
/* Error codes */
#define REDIS_OK                0
#define REDIS_ERR               (-1)

/* Static server configuration */
#define REDIS_SERVERPORT        6379    /* TCP port */
#define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */
#define REDIS_IOBUF_LEN         1024
#define REDIS_LOADBUF_LEN       1024
#define REDIS_STATIC_ARGS       4
#define REDIS_DEFAULT_DBNUM     16
#define REDIS_CONFIGLINE_MAX    1024
#define REDIS_OBJFREELIST_MAX   1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE  (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD  3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK          1       /* Bulk write command */
#define REDIS_CMD_INLINE        2       /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
 * this flag will return an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM       4

/* Object types */
#define REDIS_STRING            0
#define REDIS_LIST              1
#define REDIS_SET               2
#define REDIS_ZSET              3
#define REDIS_HASH              4

/* Objects encoding */
#define REDIS_ENCODING_RAW      0       /* Raw representation */
#define REDIS_ENCODING_INT      1       /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME        253
#define REDIS_SELECTDB          254
#define REDIS_EOF               255

/* Defines related to the dump file format. Storing a 32 bit length for every
 * short key would waste a lot of space, so the two most significant bits of
 * the first byte select how the length is encoded:
 *
 * 00|000000 => the length is the remaining 6 bits of this byte
 * 01|000000 00000000 => the length is 14 bits: these 6 bits + the next byte
 * 10|000000 [32 bit integer] => a full 32 bit length follows
 * 11|000000 => a specially encoded object follows. The remaining six bits
 *              specify the kind of object that follows.
 *              See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN       0
#define REDIS_RDB_14BITLEN      1
#define REDIS_RDB_32BITLEN      2
#define REDIS_RDB_ENCVAL        3
#define REDIS_RDB_LENERR        UINT_MAX

/* When the length of a string object stored on disk has the first two bits
 * set (REDIS_RDB_ENCVAL), the remaining six bits specify a special encoding
 * for the object according to the following defines: */
#define REDIS_RDB_ENC_INT8      0       /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16     1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32     2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF       3       /* string compressed with LZF */

/* Client flags */
#define REDIS_CLOSE             1       /* This client connection should be closed ASAP */
#define REDIS_SLAVE             2       /* This client is a slave server */
#define REDIS_MASTER            4       /* This client is a master server */
#define REDIS_MONITOR           8       /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI             16      /* This client is in a MULTI context */
#define REDIS_BLOCKED           32      /* The client is waiting in a blocking operation */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE         0       /* No active replication */
#define REDIS_REPL_CONNECT      1       /* Must connect to master */
#define REDIS_REPL_CONNECTED    2       /* Connected to master */

/* Slave replication state - from the point of view of the master.
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3  /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END   4  /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK         5  /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE            6  /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD              0
#define REDIS_TAIL              1

/* Sort operations */
#define REDIS_SORT_GET          0
#define REDIS_SORT_ASC          1
#define REDIS_SORT_DESC         2
#define REDIS_SORTKEY_MAX       1024

/* Log levels */
#define REDIS_DEBUG             0
#define REDIS_NOTICE            1
#define REDIS_WARNING           2

/* Anti-warning macro. The argument is parenthesized so that any expression
 * (e.g. REDIS_NOTUSED(a+b)) is cast to void as a whole. */
#define REDIS_NOTUSED(V) ((void)(V))

#define ZSKIPLIST_MAXLEVEL      32      /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P             0.25    /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO          0
#define APPENDFSYNC_ALWAYS      1
#define APPENDFSYNC_EVERYSEC    2
202
/* Assertion macro: when the expression is false, the failed expression text
 * is reported through _redisAssert() (defined elsewhere in this file; the
 * original comment notes it can print a stack trace) and the process then
 * exits with status 1. The macro is an expression of type void. */
#define redisAssert(_e) (!(_e) ? (_redisAssert(#_e), exit(1)) : (void)0)
static void _redisAssert(char *estr);
206
/*================================= Data types ============================== */

/* A redis object, that is a type able to hold a string / list / set */
typedef struct redisObject {
    void *ptr;
    unsigned char type;         /* one of REDIS_STRING, REDIS_LIST, ... */
    unsigned char encoding;     /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2];   /* NOTE(review): apparently padding only */
    int refcount;
} robj;

/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * The expansion intentionally has NO trailing semicolon: callers supply it,
 * so the invocation behaves as a single statement (it is safe in an
 * unbraced if/else). Arguments are parenthesized for macro hygiene. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
} while(0)
228
3305306f 229typedef struct redisDb {
4409877e 230 dict *dict; /* The keyspace for this DB */
231 dict *expires; /* Timeout of keys with a timeout set */
232 dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
3305306f 233 int id;
234} redisDb;
235
6e469882 236/* Client MULTI/EXEC state */
237typedef struct multiCmd {
238 robj **argv;
239 int argc;
240 struct redisCommand *cmd;
241} multiCmd;
242
243typedef struct multiState {
244 multiCmd *commands; /* Array of MULTI commands */
245 int count; /* Total number of MULTI commands */
246} multiState;
247
ed9b544e 248/* With multiplexing we need to take per-clinet state.
249 * Clients are taken in a liked list. */
250typedef struct redisClient {
251 int fd;
3305306f 252 redisDb *db;
ed9b544e 253 int dictid;
254 sds querybuf;
e8a74421 255 robj **argv, **mbargv;
256 int argc, mbargc;
40d224a9 257 int bulklen; /* bulk read len. -1 if not in bulk read mode */
e8a74421 258 int multibulk; /* multi bulk command format active */
ed9b544e 259 list *reply;
260 int sentlen;
261 time_t lastinteraction; /* time of the last interaction, used for timeout */
40d224a9 262 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
6e469882 263 /* REDIS_MULTI */
40d224a9 264 int slaveseldb; /* slave selected db, if this client is a slave */
265 int authenticated; /* when requirepass is non-NULL */
266 int replstate; /* replication state if this is a slave */
267 int repldbfd; /* replication DB file descriptor */
6e469882 268 long repldboff; /* replication DB file offset */
40d224a9 269 off_t repldbsize; /* replication DB file size */
6e469882 270 multiState mstate; /* MULTI/EXEC state */
b177fd30 271 robj **blockingkeys; /* The key we waiting to terminate a blocking
4409877e 272 * operation such as BLPOP. Otherwise NULL. */
b177fd30 273 int blockingkeysnum; /* Number of blocking keys */
4409877e 274 time_t blockingto; /* Blocking operation timeout. If UNIX current time
275 * is >= blockingto then the operation timed out. */
ed9b544e 276} redisClient;
277
/* A save point. NOTE(review): presumably a snapshot is due once at least
 * 'changes' writes happened within 'seconds' seconds — confirm against the
 * code that consumes server.saveparams. */
struct saveparam {
    time_t seconds;
    int changes;
};
282
283/* Global server state structure */
284struct redisServer {
285 int port;
286 int fd;
3305306f 287 redisDb *db;
4409877e 288 dict *sharingpool; /* Poll used for object sharing */
10c43610 289 unsigned int sharingpoolsize;
ed9b544e 290 long long dirty; /* changes to DB from the last save */
291 list *clients;
87eca727 292 list *slaves, *monitors;
ed9b544e 293 char neterr[ANET_ERR_LEN];
294 aeEventLoop *el;
295 int cronloops; /* number of times the cron function run */
296 list *objfreelist; /* A list of freed objects to avoid malloc() */
297 time_t lastsave; /* Unix time of last save succeeede */
5fba9f71 298 size_t usedmemory; /* Used memory in megabytes */
ed9b544e 299 /* Fields used only for stats */
300 time_t stat_starttime; /* server start time */
301 long long stat_numcommands; /* number of processed commands */
302 long long stat_numconnections; /* number of connections received */
303 /* Configuration */
304 int verbosity;
305 int glueoutputbuf;
306 int maxidletime;
307 int dbnum;
308 int daemonize;
44b38ef4 309 int appendonly;
48f0308a 310 int appendfsync;
311 time_t lastfsync;
44b38ef4 312 int appendfd;
313 int appendseldb;
ed329fcf 314 char *pidfile;
9f3c422c 315 pid_t bgsavechildpid;
9d65a1bb 316 pid_t bgrewritechildpid;
317 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
ed9b544e 318 struct saveparam *saveparams;
319 int saveparamslen;
320 char *logfile;
321 char *bindaddr;
322 char *dbfilename;
44b38ef4 323 char *appendfilename;
abcb223e 324 char *requirepass;
10c43610 325 int shareobjects;
121f70cf 326 int rdbcompression;
ed9b544e 327 /* Replication related */
328 int isslave;
d0ccebcf 329 char *masterauth;
ed9b544e 330 char *masterhost;
331 int masterport;
40d224a9 332 redisClient *master; /* client that is master for this slave */
ed9b544e 333 int replstate;
285add55 334 unsigned int maxclients;
d4465900 335 unsigned long maxmemory;
f86a74e9 336 unsigned int blockedclients;
ed9b544e 337 /* Sort parameters - qsort_r() is only available under BSD so we
338 * have to take this state global, in order to pass it to sortCompare() */
339 int sort_desc;
340 int sort_alpha;
341 int sort_bypattern;
342};
343
344typedef void redisCommandProc(redisClient *c);
345struct redisCommand {
346 char *name;
347 redisCommandProc *proc;
348 int arity;
349 int flags;
350};
351
/* A function name paired with an address stored as an unsigned long.
 * NOTE(review): presumably used to symbolize stack traces — confirm. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
356
ed9b544e 357typedef struct _redisSortObject {
358 robj *obj;
359 union {
360 double score;
361 robj *cmpobj;
362 } u;
363} redisSortObject;
364
365typedef struct _redisSortOperation {
366 int type;
367 robj *pattern;
368} redisSortOperation;
369
6b47e12e 370/* ZSETs use a specialized version of Skiplists */
371
372typedef struct zskiplistNode {
373 struct zskiplistNode **forward;
e3870fab 374 struct zskiplistNode *backward;
6b47e12e 375 double score;
376 robj *obj;
377} zskiplistNode;
378
379typedef struct zskiplist {
e3870fab 380 struct zskiplistNode *header, *tail;
d13f767c 381 unsigned long length;
6b47e12e 382 int level;
383} zskiplist;
384
1812e024 385typedef struct zset {
386 dict *dict;
6b47e12e 387 zskiplist *zsl;
1812e024 388} zset;
389
6b47e12e 390/* Our shared "common" objects */
391
ed9b544e 392struct sharedObjectsStruct {
c937aa89 393 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
6e469882 394 *colon, *nullbulk, *nullmultibulk, *queued,
c937aa89 395 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
396 *outofrangeerr, *plus,
ed9b544e 397 *select0, *select1, *select2, *select3, *select4,
398 *select5, *select6, *select7, *select8, *select9;
399} shared;
400
/* Global variables that are actually used as constants. These doubles are
 * used for on-disk serialization of double values and are initialized at
 * runtime (elsewhere) to avoid strange compiler optimizations. By their
 * names they presumably hold 0.0, +inf, -inf and NaN once initialized. */
static double R_Zero;
static double R_PosInf;
static double R_NegInf;
static double R_Nan;
406
ed9b544e 407/*================================ Prototypes =============================== */
408
409static void freeStringObject(robj *o);
410static void freeListObject(robj *o);
411static void freeSetObject(robj *o);
412static void decrRefCount(void *o);
413static robj *createObject(int type, void *ptr);
414static void freeClient(redisClient *c);
f78fd11b 415static int rdbLoad(char *filename);
ed9b544e 416static void addReply(redisClient *c, robj *obj);
417static void addReplySds(redisClient *c, sds s);
418static void incrRefCount(robj *o);
f78fd11b 419static int rdbSaveBackground(char *filename);
ed9b544e 420static robj *createStringObject(char *ptr, size_t len);
87eca727 421static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
44b38ef4 422static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
ed9b544e 423static int syncWithMaster(void);
10c43610 424static robj *tryObjectSharing(robj *o);
942a3961 425static int tryObjectEncoding(robj *o);
9d65a1bb 426static robj *getDecodedObject(robj *o);
3305306f 427static int removeExpire(redisDb *db, robj *key);
428static int expireIfNeeded(redisDb *db, robj *key);
429static int deleteIfVolatile(redisDb *db, robj *key);
94754ccc 430static int deleteKey(redisDb *db, robj *key);
bb32ede5 431static time_t getExpire(redisDb *db, robj *key);
432static int setExpire(redisDb *db, robj *key, time_t when);
a3b21203 433static void updateSlavesWaitingBgsave(int bgsaveerr);
3fd78bcd 434static void freeMemoryIfNeeded(void);
de96dbfe 435static int processCommand(redisClient *c);
56906eef 436static void setupSigSegvAction(void);
a3b21203 437static void rdbRemoveTempFile(pid_t childpid);
9d65a1bb 438static void aofRemoveTempFile(pid_t childpid);
0ea663ea 439static size_t stringObjectLen(robj *o);
638e42ac 440static void processInputBuffer(redisClient *c);
6b47e12e 441static zskiplist *zslCreate(void);
fd8ccf44 442static void zslFree(zskiplist *zsl);
2b59cfdf 443static void zslInsert(zskiplist *zsl, double score, robj *obj);
2895e862 444static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
6e469882 445static void initClientMultiState(redisClient *c);
446static void freeClientMultiState(redisClient *c);
447static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
4409877e 448static void unblockClient(redisClient *c);
449static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
ed9b544e 450
abcb223e 451static void authCommand(redisClient *c);
ed9b544e 452static void pingCommand(redisClient *c);
453static void echoCommand(redisClient *c);
454static void setCommand(redisClient *c);
455static void setnxCommand(redisClient *c);
456static void getCommand(redisClient *c);
457static void delCommand(redisClient *c);
458static void existsCommand(redisClient *c);
459static void incrCommand(redisClient *c);
460static void decrCommand(redisClient *c);
461static void incrbyCommand(redisClient *c);
462static void decrbyCommand(redisClient *c);
463static void selectCommand(redisClient *c);
464static void randomkeyCommand(redisClient *c);
465static void keysCommand(redisClient *c);
466static void dbsizeCommand(redisClient *c);
467static void lastsaveCommand(redisClient *c);
468static void saveCommand(redisClient *c);
469static void bgsaveCommand(redisClient *c);
9d65a1bb 470static void bgrewriteaofCommand(redisClient *c);
ed9b544e 471static void shutdownCommand(redisClient *c);
472static void moveCommand(redisClient *c);
473static void renameCommand(redisClient *c);
474static void renamenxCommand(redisClient *c);
475static void lpushCommand(redisClient *c);
476static void rpushCommand(redisClient *c);
477static void lpopCommand(redisClient *c);
478static void rpopCommand(redisClient *c);
479static void llenCommand(redisClient *c);
480static void lindexCommand(redisClient *c);
481static void lrangeCommand(redisClient *c);
482static void ltrimCommand(redisClient *c);
483static void typeCommand(redisClient *c);
484static void lsetCommand(redisClient *c);
485static void saddCommand(redisClient *c);
486static void sremCommand(redisClient *c);
a4460ef4 487static void smoveCommand(redisClient *c);
ed9b544e 488static void sismemberCommand(redisClient *c);
489static void scardCommand(redisClient *c);
12fea928 490static void spopCommand(redisClient *c);
2abb95a9 491static void srandmemberCommand(redisClient *c);
ed9b544e 492static void sinterCommand(redisClient *c);
493static void sinterstoreCommand(redisClient *c);
40d224a9 494static void sunionCommand(redisClient *c);
495static void sunionstoreCommand(redisClient *c);
f4f56e1d 496static void sdiffCommand(redisClient *c);
497static void sdiffstoreCommand(redisClient *c);
ed9b544e 498static void syncCommand(redisClient *c);
499static void flushdbCommand(redisClient *c);
500static void flushallCommand(redisClient *c);
501static void sortCommand(redisClient *c);
502static void lremCommand(redisClient *c);
0f5f7e9a 503static void rpoplpushcommand(redisClient *c);
ed9b544e 504static void infoCommand(redisClient *c);
70003d28 505static void mgetCommand(redisClient *c);
87eca727 506static void monitorCommand(redisClient *c);
3305306f 507static void expireCommand(redisClient *c);
802e8373 508static void expireatCommand(redisClient *c);
f6b141c5 509static void getsetCommand(redisClient *c);
fd88489a 510static void ttlCommand(redisClient *c);
321b0e13 511static void slaveofCommand(redisClient *c);
7f957c92 512static void debugCommand(redisClient *c);
f6b141c5 513static void msetCommand(redisClient *c);
514static void msetnxCommand(redisClient *c);
fd8ccf44 515static void zaddCommand(redisClient *c);
7db723ad 516static void zincrbyCommand(redisClient *c);
cc812361 517static void zrangeCommand(redisClient *c);
50c55df5 518static void zrangebyscoreCommand(redisClient *c);
e3870fab 519static void zrevrangeCommand(redisClient *c);
3c41331e 520static void zcardCommand(redisClient *c);
1b7106e7 521static void zremCommand(redisClient *c);
6e333bbe 522static void zscoreCommand(redisClient *c);
1807985b 523static void zremrangebyscoreCommand(redisClient *c);
6e469882 524static void multiCommand(redisClient *c);
525static void execCommand(redisClient *c);
4409877e 526static void blpopCommand(redisClient *c);
527static void brpopCommand(redisClient *c);
f6b141c5 528
ed9b544e 529/*================================= Globals ================================= */
530
531/* Global vars */
532static struct redisServer server; /* server global state */
533static struct redisCommand cmdTable[] = {
534 {"get",getCommand,2,REDIS_CMD_INLINE},
3fd78bcd 535 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
536 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
5109cdff 537 {"del",delCommand,-2,REDIS_CMD_INLINE},
ed9b544e 538 {"exists",existsCommand,2,REDIS_CMD_INLINE},
3fd78bcd 539 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
540 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
70003d28 541 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
3fd78bcd 542 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
543 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 544 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
545 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
b177fd30 546 {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
547 {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
ed9b544e 548 {"llen",llenCommand,2,REDIS_CMD_INLINE},
549 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
3fd78bcd 550 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 551 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
552 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
553 {"lrem",lremCommand,4,REDIS_CMD_BULK},
0b13687c 554 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
3fd78bcd 555 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 556 {"srem",sremCommand,3,REDIS_CMD_BULK},
a4460ef4 557 {"smove",smoveCommand,4,REDIS_CMD_BULK},
ed9b544e 558 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
559 {"scard",scardCommand,2,REDIS_CMD_INLINE},
12fea928 560 {"spop",spopCommand,2,REDIS_CMD_INLINE},
2abb95a9 561 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
3fd78bcd 562 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
563 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
564 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
565 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
566 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
567 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 568 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
fd8ccf44 569 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
7db723ad 570 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
1b7106e7 571 {"zrem",zremCommand,3,REDIS_CMD_BULK},
1807985b 572 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
752da584 573 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
80181f78 574 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
752da584 575 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
3c41331e 576 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
6e333bbe 577 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
3fd78bcd 578 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
579 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
f6b141c5 580 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
581 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
582 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 583 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
584 {"select",selectCommand,2,REDIS_CMD_INLINE},
585 {"move",moveCommand,3,REDIS_CMD_INLINE},
586 {"rename",renameCommand,3,REDIS_CMD_INLINE},
587 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
321b0e13 588 {"expire",expireCommand,3,REDIS_CMD_INLINE},
802e8373 589 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
ed9b544e 590 {"keys",keysCommand,2,REDIS_CMD_INLINE},
591 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
abcb223e 592 {"auth",authCommand,2,REDIS_CMD_INLINE},
ed9b544e 593 {"ping",pingCommand,1,REDIS_CMD_INLINE},
594 {"echo",echoCommand,2,REDIS_CMD_BULK},
595 {"save",saveCommand,1,REDIS_CMD_INLINE},
596 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
9d65a1bb 597 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
ed9b544e 598 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
599 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
600 {"type",typeCommand,2,REDIS_CMD_INLINE},
6e469882 601 {"multi",multiCommand,1,REDIS_CMD_INLINE},
602 {"exec",execCommand,1,REDIS_CMD_INLINE},
ed9b544e 603 {"sync",syncCommand,1,REDIS_CMD_INLINE},
604 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
605 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
3fd78bcd 606 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 607 {"info",infoCommand,1,REDIS_CMD_INLINE},
87eca727 608 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
fd88489a 609 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
321b0e13 610 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
7f957c92 611 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
ed9b544e 612 {NULL,NULL,0,0}
613};
bcfc686d 614
/*============================ Utility functions ============================ */

/* Glob-style pattern matching.
 *
 * Returns 1 if the 'stringLen' bytes at 'string' match the 'patternLen'
 * bytes at 'pattern', 0 otherwise. Supported syntax:
 *
 *   *       any sequence of characters, including the empty one
 *   ?       exactly one character
 *   [abc]   one character of the set; [^abc] negates the set and [a-z] is
 *           an inclusive range (the bounds may be given in either order)
 *   \x      the literal character x (also usable inside a set)
 *
 * When 'nocase' is non-zero the comparison is case-insensitive.
 *
 * No byte beyond patternLen/stringLen is ever read, so neither buffer has to
 * be NUL-terminated. Characters are handled as unsigned char: tolower()
 * requires that, and it gives bytes >= 0x80 a consistent order in ranges. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* A run of '*' is equivalent to a single one. */
            while (patternLen >= 2 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing '*' matches whatever is left */
            /* Try to match the rest of the pattern against every suffix. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                                   string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int negate, match = 0;
            int c;

            /* A set always consumes exactly one character, so it can never
             * match an exhausted string. */
            if (stringLen == 0)
                return 0; /* no match */
            c = (unsigned char)string[0];
            pattern++;
            patternLen--;
            negate = patternLen > 0 && pattern[0] == '^';
            if (negate) {
                pattern++;
                patternLen--;
            }
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back onto its last byte so the
                     * pattern++ after the switch leaves us at the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int ch = c;

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        ch = tolower(ch);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (ch >= start && ch <= end)
                        match = 1;
                } else if (!nocase) {
                    if (pattern[0] == string[0])
                        match = 1;
                } else {
                    if (tolower((unsigned char)pattern[0]) == tolower(c))
                        match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (negate)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is exhausted: only '*'s (which match the empty
             * string) may remain, and only within patternLen. */
            while (patternLen > 0 && pattern[0] == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
739
56906eef 740static void redisLog(int level, const char *fmt, ...) {
ed9b544e 741 va_list ap;
742 FILE *fp;
743
744 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
745 if (!fp) return;
746
747 va_start(ap, fmt);
748 if (level >= server.verbosity) {
749 char *c = ".-*";
1904ecc1 750 char buf[64];
751 time_t now;
752
753 now = time(NULL);
6c9385e0 754 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
1904ecc1 755 fprintf(fp,"%s %c ",buf,c[level]);
ed9b544e 756 vfprintf(fp, fmt, ap);
757 fprintf(fp,"\n");
758 fflush(fp);
759 }
760 va_end(ap);
761
762 if (server.logfile) fclose(fp);
763}
764
/*====================== Hash table type implementation ==================== */

/* Hash table types built on top of dict.c. Depending on the type, keys and
 * values are SDS dynamic strings or Redis objects (which can themselves hold
 * SDS strings, lists, sets, ...). */

/* Value destructor for plain heap blocks obtained from zmalloc(), such as
 * the double scores stored in a sorted set's dict. */
static void dictVanillaFree(void *privdata, void *val)
{
    (void)privdata; /* unused */
    zfree(val);
}
776
4409877e 777static void dictListDestructor(void *privdata, void *val)
778{
779 DICT_NOTUSED(privdata);
780 listRelease((list*)val);
781}
782
ed9b544e 783static int sdsDictKeyCompare(void *privdata, const void *key1,
784 const void *key2)
785{
786 int l1,l2;
787 DICT_NOTUSED(privdata);
788
789 l1 = sdslen((sds)key1);
790 l2 = sdslen((sds)key2);
791 if (l1 != l2) return 0;
792 return memcmp(key1, key2, l1) == 0;
793}
794
/* Destructor for Redis object keys/values: drops the dict's reference. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void)privdata; /* unused */
    decrRefCount(val);
}
801
942a3961 802static int dictObjKeyCompare(void *privdata, const void *key1,
ed9b544e 803 const void *key2)
804{
805 const robj *o1 = key1, *o2 = key2;
806 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
807}
808
942a3961 809static unsigned int dictObjHash(const void *key) {
ed9b544e 810 const robj *o = key;
811 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
812}
813
942a3961 814static int dictEncObjKeyCompare(void *privdata, const void *key1,
815 const void *key2)
816{
9d65a1bb 817 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
818 int cmp;
942a3961 819
9d65a1bb 820 o1 = getDecodedObject(o1);
821 o2 = getDecodedObject(o2);
822 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
823 decrRefCount(o1);
824 decrRefCount(o2);
825 return cmp;
942a3961 826}
827
828static unsigned int dictEncObjHash(const void *key) {
9d65a1bb 829 robj *o = (robj*) key;
942a3961 830
9d65a1bb 831 o = getDecodedObject(o);
832 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
833 decrRefCount(o);
834 return hash;
942a3961 835}
836
ed9b544e 837static dictType setDictType = {
942a3961 838 dictEncObjHash, /* hash function */
ed9b544e 839 NULL, /* key dup */
840 NULL, /* val dup */
942a3961 841 dictEncObjKeyCompare, /* key compare */
ed9b544e 842 dictRedisObjectDestructor, /* key destructor */
843 NULL /* val destructor */
844};
845
1812e024 846static dictType zsetDictType = {
847 dictEncObjHash, /* hash function */
848 NULL, /* key dup */
849 NULL, /* val dup */
850 dictEncObjKeyCompare, /* key compare */
851 dictRedisObjectDestructor, /* key destructor */
da0a1620 852 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
1812e024 853};
854
ed9b544e 855static dictType hashDictType = {
942a3961 856 dictObjHash, /* hash function */
ed9b544e 857 NULL, /* key dup */
858 NULL, /* val dup */
942a3961 859 dictObjKeyCompare, /* key compare */
ed9b544e 860 dictRedisObjectDestructor, /* key destructor */
861 dictRedisObjectDestructor /* val destructor */
862};
863
4409877e 864/* Keylist hash table type has unencoded redis objects as keys and
865 * lists as values. It's used for blocking operations (BLPOP) */
866static dictType keylistDictType = {
867 dictObjHash, /* hash function */
868 NULL, /* key dup */
869 NULL, /* val dup */
870 dictObjKeyCompare, /* key compare */
871 dictRedisObjectDestructor, /* key destructor */
872 dictListDestructor /* val destructor */
873};
874
ed9b544e 875/* ========================= Random utility functions ======================= */
876
877/* Redis generally does not try to recover from out of memory conditions
878 * when allocating objects or strings, it is not clear if it will be possible
879 * to report this condition to the client since the networking layer itself
880 * is based on heap allocation for send buffers, so we simply abort.
881 * At least the code will be simpler to read... */
882static void oom(const char *msg) {
71c54b21 883 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
ed9b544e 884 sleep(1);
885 abort();
886}
887
888/* ====================== Redis server networking stuff ===================== */
56906eef 889static void closeTimedoutClients(void) {
ed9b544e 890 redisClient *c;
ed9b544e 891 listNode *ln;
892 time_t now = time(NULL);
893
6208b3a7 894 listRewind(server.clients);
895 while ((ln = listYield(server.clients)) != NULL) {
ed9b544e 896 c = listNodeValue(ln);
f86a74e9 897 if (server.maxidletime &&
898 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
c7cf2ec9 899 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
f86a74e9 900 (now - c->lastinteraction > server.maxidletime))
901 {
ed9b544e 902 redisLog(REDIS_DEBUG,"Closing idle client");
903 freeClient(c);
f86a74e9 904 } else if (c->flags & REDIS_BLOCKED) {
58d976b8 905 if (c->blockingto != 0 && c->blockingto < now) {
b177fd30 906 addReply(c,shared.nullmultibulk);
f86a74e9 907 unblockClient(c);
908 }
ed9b544e 909 }
910 }
ed9b544e 911}
912
12fea928 913static int htNeedsResize(dict *dict) {
914 long long size, used;
915
916 size = dictSlots(dict);
917 used = dictSize(dict);
918 return (size && used && size > DICT_HT_INITIAL_SIZE &&
919 (used*100/size < REDIS_HT_MINFILL));
920}
921
0bc03378 922/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
923 * we resize the hash table to save memory */
56906eef 924static void tryResizeHashTables(void) {
0bc03378 925 int j;
926
927 for (j = 0; j < server.dbnum; j++) {
12fea928 928 if (htNeedsResize(server.db[j].dict)) {
929 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
0bc03378 930 dictResize(server.db[j].dict);
12fea928 931 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
0bc03378 932 }
12fea928 933 if (htNeedsResize(server.db[j].expires))
934 dictResize(server.db[j].expires);
0bc03378 935 }
936}
937
9d65a1bb 938/* A background saving child (BGSAVE) terminated its work. Handle this. */
939void backgroundSaveDoneHandler(int statloc) {
940 int exitcode = WEXITSTATUS(statloc);
941 int bysignal = WIFSIGNALED(statloc);
942
943 if (!bysignal && exitcode == 0) {
944 redisLog(REDIS_NOTICE,
945 "Background saving terminated with success");
946 server.dirty = 0;
947 server.lastsave = time(NULL);
948 } else if (!bysignal && exitcode != 0) {
949 redisLog(REDIS_WARNING, "Background saving error");
950 } else {
951 redisLog(REDIS_WARNING,
952 "Background saving terminated by signal");
953 rdbRemoveTempFile(server.bgsavechildpid);
954 }
955 server.bgsavechildpid = -1;
956 /* Possibly there are slaves waiting for a BGSAVE in order to be served
957 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
958 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
959}
960
961/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
962 * Handle this. */
963void backgroundRewriteDoneHandler(int statloc) {
964 int exitcode = WEXITSTATUS(statloc);
965 int bysignal = WIFSIGNALED(statloc);
966
967 if (!bysignal && exitcode == 0) {
968 int fd;
969 char tmpfile[256];
970
971 redisLog(REDIS_NOTICE,
972 "Background append only file rewriting terminated with success");
973 /* Now it's time to flush the differences accumulated by the parent */
974 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
975 fd = open(tmpfile,O_WRONLY|O_APPEND);
976 if (fd == -1) {
977 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
978 goto cleanup;
979 }
980 /* Flush our data... */
981 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
982 (signed) sdslen(server.bgrewritebuf)) {
983 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
984 close(fd);
985 goto cleanup;
986 }
b32627cd 987 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
9d65a1bb 988 /* Now our work is to rename the temp file into the stable file. And
989 * switch the file descriptor used by the server for append only. */
990 if (rename(tmpfile,server.appendfilename) == -1) {
991 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
992 close(fd);
993 goto cleanup;
994 }
995 /* Mission completed... almost */
996 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
997 if (server.appendfd != -1) {
998 /* If append only is actually enabled... */
999 close(server.appendfd);
1000 server.appendfd = fd;
1001 fsync(fd);
85a83172 1002 server.appendseldb = -1; /* Make sure it will issue SELECT */
9d65a1bb 1003 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1004 } else {
1005 /* If append only is disabled we just generate a dump in this
1006 * format. Why not? */
1007 close(fd);
1008 }
1009 } else if (!bysignal && exitcode != 0) {
1010 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1011 } else {
1012 redisLog(REDIS_WARNING,
1013 "Background append only file rewriting terminated by signal");
1014 }
1015cleanup:
1016 sdsfree(server.bgrewritebuf);
1017 server.bgrewritebuf = sdsempty();
1018 aofRemoveTempFile(server.bgrewritechildpid);
1019 server.bgrewritechildpid = -1;
1020}
1021
56906eef 1022static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
94754ccc 1023 int j, loops = server.cronloops++;
ed9b544e 1024 REDIS_NOTUSED(eventLoop);
1025 REDIS_NOTUSED(id);
1026 REDIS_NOTUSED(clientData);
1027
1028 /* Update the global state with the amount of used memory */
1029 server.usedmemory = zmalloc_used_memory();
1030
0bc03378 1031 /* Show some info about non-empty databases */
ed9b544e 1032 for (j = 0; j < server.dbnum; j++) {
dec423d9 1033 long long size, used, vkeys;
94754ccc 1034
3305306f 1035 size = dictSlots(server.db[j].dict);
1036 used = dictSize(server.db[j].dict);
94754ccc 1037 vkeys = dictSize(server.db[j].expires);
c3cb078d 1038 if (!(loops % 5) && (used || vkeys)) {
1039 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
a4d1ba9a 1040 /* dictPrintStats(server.dict); */
ed9b544e 1041 }
ed9b544e 1042 }
1043
0bc03378 1044 /* We don't want to resize the hash tables while a bacground saving
1045 * is in progress: the saving child is created using fork() that is
1046 * implemented with a copy-on-write semantic in most modern systems, so
1047 * if we resize the HT while there is the saving child at work actually
1048 * a lot of memory movements in the parent will cause a lot of pages
1049 * copied. */
9d65a1bb 1050 if (server.bgsavechildpid == -1) tryResizeHashTables();
0bc03378 1051
ed9b544e 1052 /* Show information about connected clients */
1053 if (!(loops % 5)) {
21aecf4b 1054 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
ed9b544e 1055 listLength(server.clients)-listLength(server.slaves),
1056 listLength(server.slaves),
10c43610 1057 server.usedmemory,
3305306f 1058 dictSize(server.sharingpool));
ed9b544e 1059 }
1060
1061 /* Close connections of timedout clients */
f86a74e9 1062 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
ed9b544e 1063 closeTimedoutClients();
1064
9d65a1bb 1065 /* Check if a background saving or AOF rewrite in progress terminated */
1066 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
ed9b544e 1067 int statloc;
9d65a1bb 1068 pid_t pid;
1069
1070 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1071 if (pid == server.bgsavechildpid) {
1072 backgroundSaveDoneHandler(statloc);
ed9b544e 1073 } else {
9d65a1bb 1074 backgroundRewriteDoneHandler(statloc);
ed9b544e 1075 }
ed9b544e 1076 }
1077 } else {
1078 /* If there is not a background saving in progress check if
1079 * we have to save now */
1080 time_t now = time(NULL);
1081 for (j = 0; j < server.saveparamslen; j++) {
1082 struct saveparam *sp = server.saveparams+j;
1083
1084 if (server.dirty >= sp->changes &&
1085 now-server.lastsave > sp->seconds) {
1086 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1087 sp->changes, sp->seconds);
f78fd11b 1088 rdbSaveBackground(server.dbfilename);
ed9b544e 1089 break;
1090 }
1091 }
1092 }
94754ccc 1093
f2324293 1094 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1095 * will use few CPU cycles if there are few expiring keys, otherwise
1096 * it will get more aggressive to avoid that too much memory is used by
1097 * keys that can be removed from the keyspace. */
94754ccc 1098 for (j = 0; j < server.dbnum; j++) {
f2324293 1099 int expired;
94754ccc 1100 redisDb *db = server.db+j;
94754ccc 1101
f2324293 1102 /* Continue to expire if at the end of the cycle more than 25%
1103 * of the keys were expired. */
1104 do {
1105 int num = dictSize(db->expires);
94754ccc 1106 time_t now = time(NULL);
1107
f2324293 1108 expired = 0;
94754ccc 1109 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1110 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1111 while (num--) {
1112 dictEntry *de;
1113 time_t t;
1114
1115 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1116 t = (time_t) dictGetEntryVal(de);
1117 if (now > t) {
1118 deleteKey(db,dictGetEntryKey(de));
f2324293 1119 expired++;
94754ccc 1120 }
1121 }
f2324293 1122 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
94754ccc 1123 }
1124
ed9b544e 1125 /* Check if we should connect to a MASTER */
1126 if (server.replstate == REDIS_REPL_CONNECT) {
1127 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1128 if (syncWithMaster() == REDIS_OK) {
1129 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1130 }
1131 }
1132 return 1000;
1133}
1134
1135static void createSharedObjects(void) {
1136 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1137 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1138 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
c937aa89 1139 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1140 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1141 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1142 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1143 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1144 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
ed9b544e 1145 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
6e469882 1146 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
ed9b544e 1147 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1148 "-ERR Operation against a key holding the wrong kind of value\r\n"));
ed9b544e 1149 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1150 "-ERR no such key\r\n"));
ed9b544e 1151 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1152 "-ERR syntax error\r\n"));
c937aa89 1153 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1154 "-ERR source and destination objects are the same\r\n"));
1155 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1156 "-ERR index out of range\r\n"));
ed9b544e 1157 shared.space = createObject(REDIS_STRING,sdsnew(" "));
c937aa89 1158 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1159 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
ed9b544e 1160 shared.select0 = createStringObject("select 0\r\n",10);
1161 shared.select1 = createStringObject("select 1\r\n",10);
1162 shared.select2 = createStringObject("select 2\r\n",10);
1163 shared.select3 = createStringObject("select 3\r\n",10);
1164 shared.select4 = createStringObject("select 4\r\n",10);
1165 shared.select5 = createStringObject("select 5\r\n",10);
1166 shared.select6 = createStringObject("select 6\r\n",10);
1167 shared.select7 = createStringObject("select 7\r\n",10);
1168 shared.select8 = createStringObject("select 8\r\n",10);
1169 shared.select9 = createStringObject("select 9\r\n",10);
1170}
1171
1172static void appendServerSaveParams(time_t seconds, int changes) {
1173 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
ed9b544e 1174 server.saveparams[server.saveparamslen].seconds = seconds;
1175 server.saveparams[server.saveparamslen].changes = changes;
1176 server.saveparamslen++;
1177}
1178
bcfc686d 1179static void resetServerSaveParams() {
ed9b544e 1180 zfree(server.saveparams);
1181 server.saveparams = NULL;
1182 server.saveparamslen = 0;
1183}
1184
1185static void initServerConfig() {
1186 server.dbnum = REDIS_DEFAULT_DBNUM;
1187 server.port = REDIS_SERVERPORT;
1188 server.verbosity = REDIS_DEBUG;
1189 server.maxidletime = REDIS_MAXIDLETIME;
1190 server.saveparams = NULL;
1191 server.logfile = NULL; /* NULL = log on standard output */
1192 server.bindaddr = NULL;
1193 server.glueoutputbuf = 1;
1194 server.daemonize = 0;
44b38ef4 1195 server.appendonly = 0;
4e141d5a 1196 server.appendfsync = APPENDFSYNC_ALWAYS;
48f0308a 1197 server.lastfsync = time(NULL);
44b38ef4 1198 server.appendfd = -1;
1199 server.appendseldb = -1; /* Make sure the first time will not match */
ed329fcf 1200 server.pidfile = "/var/run/redis.pid";
ed9b544e 1201 server.dbfilename = "dump.rdb";
9d65a1bb 1202 server.appendfilename = "appendonly.aof";
abcb223e 1203 server.requirepass = NULL;
10c43610 1204 server.shareobjects = 0;
b0553789 1205 server.rdbcompression = 1;
21aecf4b 1206 server.sharingpoolsize = 1024;
285add55 1207 server.maxclients = 0;
f86a74e9 1208 server.blockedclients = 0;
3fd78bcd 1209 server.maxmemory = 0;
bcfc686d 1210 resetServerSaveParams();
ed9b544e 1211
1212 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1213 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1214 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1215 /* Replication related */
1216 server.isslave = 0;
d0ccebcf 1217 server.masterauth = NULL;
ed9b544e 1218 server.masterhost = NULL;
1219 server.masterport = 6379;
1220 server.master = NULL;
1221 server.replstate = REDIS_REPL_NONE;
a7866db6 1222
1223 /* Double constants initialization */
1224 R_Zero = 0.0;
1225 R_PosInf = 1.0/R_Zero;
1226 R_NegInf = -1.0/R_Zero;
1227 R_Nan = R_Zero/R_Zero;
ed9b544e 1228}
1229
1230static void initServer() {
1231 int j;
1232
1233 signal(SIGHUP, SIG_IGN);
1234 signal(SIGPIPE, SIG_IGN);
fe3bbfbe 1235 setupSigSegvAction();
ed9b544e 1236
1237 server.clients = listCreate();
1238 server.slaves = listCreate();
87eca727 1239 server.monitors = listCreate();
ed9b544e 1240 server.objfreelist = listCreate();
1241 createSharedObjects();
1242 server.el = aeCreateEventLoop();
3305306f 1243 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
10c43610 1244 server.sharingpool = dictCreate(&setDictType,NULL);
ed9b544e 1245 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1246 if (server.fd == -1) {
1247 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1248 exit(1);
1249 }
3305306f 1250 for (j = 0; j < server.dbnum; j++) {
1251 server.db[j].dict = dictCreate(&hashDictType,NULL);
1252 server.db[j].expires = dictCreate(&setDictType,NULL);
4409877e 1253 server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
3305306f 1254 server.db[j].id = j;
1255 }
ed9b544e 1256 server.cronloops = 0;
9f3c422c 1257 server.bgsavechildpid = -1;
9d65a1bb 1258 server.bgrewritechildpid = -1;
1259 server.bgrewritebuf = sdsempty();
ed9b544e 1260 server.lastsave = time(NULL);
1261 server.dirty = 0;
1262 server.usedmemory = 0;
1263 server.stat_numcommands = 0;
1264 server.stat_numconnections = 0;
1265 server.stat_starttime = time(NULL);
d8f8b666 1266 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
44b38ef4 1267
1268 if (server.appendonly) {
71eba477 1269 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
44b38ef4 1270 if (server.appendfd == -1) {
1271 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1272 strerror(errno));
1273 exit(1);
1274 }
1275 }
ed9b544e 1276}
1277
1278/* Empty the whole database */
ca37e9cd 1279static long long emptyDb() {
ed9b544e 1280 int j;
ca37e9cd 1281 long long removed = 0;
ed9b544e 1282
3305306f 1283 for (j = 0; j < server.dbnum; j++) {
ca37e9cd 1284 removed += dictSize(server.db[j].dict);
3305306f 1285 dictEmpty(server.db[j].dict);
1286 dictEmpty(server.db[j].expires);
1287 }
ca37e9cd 1288 return removed;
ed9b544e 1289}
1290
/* Parse a boolean config value: "yes" -> 1, "no" -> 0 (both matched
 * case-insensitively), anything else -> -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1296
ed9b544e 1297/* I agree, this is a very rudimental way to load a configuration...
1298 will improve later if the config gets more complex */
1299static void loadServerConfig(char *filename) {
c9a111ac 1300 FILE *fp;
ed9b544e 1301 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1302 int linenum = 0;
1303 sds line = NULL;
c9a111ac 1304
1305 if (filename[0] == '-' && filename[1] == '\0')
1306 fp = stdin;
1307 else {
1308 if ((fp = fopen(filename,"r")) == NULL) {
1309 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1310 exit(1);
1311 }
ed9b544e 1312 }
c9a111ac 1313
ed9b544e 1314 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1315 sds *argv;
1316 int argc, j;
1317
1318 linenum++;
1319 line = sdsnew(buf);
1320 line = sdstrim(line," \t\r\n");
1321
1322 /* Skip comments and blank lines*/
1323 if (line[0] == '#' || line[0] == '\0') {
1324 sdsfree(line);
1325 continue;
1326 }
1327
1328 /* Split into arguments */
1329 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1330 sdstolower(argv[0]);
1331
1332 /* Execute config directives */
bb0b03a3 1333 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
ed9b544e 1334 server.maxidletime = atoi(argv[1]);
0150db36 1335 if (server.maxidletime < 0) {
ed9b544e 1336 err = "Invalid timeout value"; goto loaderr;
1337 }
bb0b03a3 1338 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
ed9b544e 1339 server.port = atoi(argv[1]);
1340 if (server.port < 1 || server.port > 65535) {
1341 err = "Invalid port"; goto loaderr;
1342 }
bb0b03a3 1343 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
ed9b544e 1344 server.bindaddr = zstrdup(argv[1]);
bb0b03a3 1345 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
ed9b544e 1346 int seconds = atoi(argv[1]);
1347 int changes = atoi(argv[2]);
1348 if (seconds < 1 || changes < 0) {
1349 err = "Invalid save parameters"; goto loaderr;
1350 }
1351 appendServerSaveParams(seconds,changes);
bb0b03a3 1352 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
ed9b544e 1353 if (chdir(argv[1]) == -1) {
1354 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1355 argv[1], strerror(errno));
1356 exit(1);
1357 }
bb0b03a3 1358 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1359 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1360 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1361 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
ed9b544e 1362 else {
1363 err = "Invalid log level. Must be one of debug, notice, warning";
1364 goto loaderr;
1365 }
bb0b03a3 1366 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
c9a111ac 1367 FILE *logfp;
ed9b544e 1368
1369 server.logfile = zstrdup(argv[1]);
bb0b03a3 1370 if (!strcasecmp(server.logfile,"stdout")) {
ed9b544e 1371 zfree(server.logfile);
1372 server.logfile = NULL;
1373 }
1374 if (server.logfile) {
1375 /* Test if we are able to open the file. The server will not
1376 * be able to abort just for this problem later... */
c9a111ac 1377 logfp = fopen(server.logfile,"a");
1378 if (logfp == NULL) {
ed9b544e 1379 err = sdscatprintf(sdsempty(),
1380 "Can't open the log file: %s", strerror(errno));
1381 goto loaderr;
1382 }
c9a111ac 1383 fclose(logfp);
ed9b544e 1384 }
bb0b03a3 1385 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
ed9b544e 1386 server.dbnum = atoi(argv[1]);
1387 if (server.dbnum < 1) {
1388 err = "Invalid number of databases"; goto loaderr;
1389 }
285add55 1390 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1391 server.maxclients = atoi(argv[1]);
3fd78bcd 1392 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
d4465900 1393 server.maxmemory = strtoll(argv[1], NULL, 10);
bb0b03a3 1394 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
ed9b544e 1395 server.masterhost = sdsnew(argv[1]);
1396 server.masterport = atoi(argv[2]);
1397 server.replstate = REDIS_REPL_CONNECT;
d0ccebcf 1398 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1399 server.masterauth = zstrdup(argv[1]);
bb0b03a3 1400 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
85dd2f3a 1401 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
ed9b544e 1402 err = "argument must be 'yes' or 'no'"; goto loaderr;
1403 }
bb0b03a3 1404 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
85dd2f3a 1405 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
10c43610 1406 err = "argument must be 'yes' or 'no'"; goto loaderr;
1407 }
121f70cf 1408 } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
1409 if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
1410 err = "argument must be 'yes' or 'no'"; goto loaderr;
1411 }
e52c65b9 1412 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1413 server.sharingpoolsize = atoi(argv[1]);
1414 if (server.sharingpoolsize < 1) {
1415 err = "invalid object sharing pool size"; goto loaderr;
1416 }
bb0b03a3 1417 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
85dd2f3a 1418 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
ed9b544e 1419 err = "argument must be 'yes' or 'no'"; goto loaderr;
1420 }
44b38ef4 1421 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1422 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1423 err = "argument must be 'yes' or 'no'"; goto loaderr;
1424 }
48f0308a 1425 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1766c6da 1426 if (!strcasecmp(argv[1],"no")) {
48f0308a 1427 server.appendfsync = APPENDFSYNC_NO;
1766c6da 1428 } else if (!strcasecmp(argv[1],"always")) {
48f0308a 1429 server.appendfsync = APPENDFSYNC_ALWAYS;
1766c6da 1430 } else if (!strcasecmp(argv[1],"everysec")) {
48f0308a 1431 server.appendfsync = APPENDFSYNC_EVERYSEC;
1432 } else {
1433 err = "argument must be 'no', 'always' or 'everysec'";
1434 goto loaderr;
1435 }
bb0b03a3 1436 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
abcb223e 1437 server.requirepass = zstrdup(argv[1]);
bb0b03a3 1438 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
ed329fcf 1439 server.pidfile = zstrdup(argv[1]);
bb0b03a3 1440 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
b8b553c8 1441 server.dbfilename = zstrdup(argv[1]);
ed9b544e 1442 } else {
1443 err = "Bad directive or wrong number of arguments"; goto loaderr;
1444 }
1445 for (j = 0; j < argc; j++)
1446 sdsfree(argv[j]);
1447 zfree(argv);
1448 sdsfree(line);
1449 }
c9a111ac 1450 if (fp != stdin) fclose(fp);
ed9b544e 1451 return;
1452
1453loaderr:
1454 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1455 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1456 fprintf(stderr, ">>> '%s'\n", line);
1457 fprintf(stderr, "%s\n", err);
1458 exit(1);
1459}
1460
1461static void freeClientArgv(redisClient *c) {
1462 int j;
1463
1464 for (j = 0; j < c->argc; j++)
1465 decrRefCount(c->argv[j]);
e8a74421 1466 for (j = 0; j < c->mbargc; j++)
1467 decrRefCount(c->mbargv[j]);
ed9b544e 1468 c->argc = 0;
e8a74421 1469 c->mbargc = 0;
ed9b544e 1470}
1471
1472static void freeClient(redisClient *c) {
1473 listNode *ln;
1474
4409877e 1475 /* Note that if the client we are freeing is blocked into a blocking
1476 * call, we have to set querybuf to NULL *before* to call unblockClient()
1477 * to avoid processInputBuffer() will get called. Also it is important
1478 * to remove the file events after this, because this call adds
1479 * the READABLE event. */
1480 sdsfree(c->querybuf);
1481 c->querybuf = NULL;
1482 if (c->flags & REDIS_BLOCKED)
1483 unblockClient(c);
1484
ed9b544e 1485 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1486 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
ed9b544e 1487 listRelease(c->reply);
1488 freeClientArgv(c);
1489 close(c->fd);
1490 ln = listSearchKey(server.clients,c);
dfc5e96c 1491 redisAssert(ln != NULL);
ed9b544e 1492 listDelNode(server.clients,ln);
1493 if (c->flags & REDIS_SLAVE) {
6208b3a7 1494 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1495 close(c->repldbfd);
87eca727 1496 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1497 ln = listSearchKey(l,c);
dfc5e96c 1498 redisAssert(ln != NULL);
87eca727 1499 listDelNode(l,ln);
ed9b544e 1500 }
1501 if (c->flags & REDIS_MASTER) {
1502 server.master = NULL;
1503 server.replstate = REDIS_REPL_CONNECT;
1504 }
93ea3759 1505 zfree(c->argv);
e8a74421 1506 zfree(c->mbargv);
6e469882 1507 freeClientMultiState(c);
ed9b544e 1508 zfree(c);
1509}
1510
cc30e368 1511#define GLUEREPLY_UP_TO (1024)
ed9b544e 1512static void glueReplyBuffersIfNeeded(redisClient *c) {
c28b42ac 1513 int copylen = 0;
1514 char buf[GLUEREPLY_UP_TO];
6208b3a7 1515 listNode *ln;
ed9b544e 1516 robj *o;
1517
6208b3a7 1518 listRewind(c->reply);
1519 while((ln = listYield(c->reply))) {
c28b42ac 1520 int objlen;
1521
ed9b544e 1522 o = ln->value;
c28b42ac 1523 objlen = sdslen(o->ptr);
1524 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1525 memcpy(buf+copylen,o->ptr,objlen);
1526 copylen += objlen;
ed9b544e 1527 listDelNode(c->reply,ln);
c28b42ac 1528 } else {
1529 if (copylen == 0) return;
1530 break;
ed9b544e 1531 }
ed9b544e 1532 }
c28b42ac 1533 /* Now the output buffer is empty, add the new single element */
1534 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1535 listAddNodeHead(c->reply,o);
ed9b544e 1536}
1537
1538static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1539 redisClient *c = privdata;
1540 int nwritten = 0, totwritten = 0, objlen;
1541 robj *o;
1542 REDIS_NOTUSED(el);
1543 REDIS_NOTUSED(mask);
1544
2895e862 1545 /* Use writev() if we have enough buffers to send */
7ea870c0 1546 if (!server.glueoutputbuf &&
1547 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1548 !(c->flags & REDIS_MASTER))
2895e862 1549 {
1550 sendReplyToClientWritev(el, fd, privdata, mask);
1551 return;
1552 }
2895e862 1553
ed9b544e 1554 while(listLength(c->reply)) {
c28b42ac 1555 if (server.glueoutputbuf && listLength(c->reply) > 1)
1556 glueReplyBuffersIfNeeded(c);
1557
ed9b544e 1558 o = listNodeValue(listFirst(c->reply));
1559 objlen = sdslen(o->ptr);
1560
1561 if (objlen == 0) {
1562 listDelNode(c->reply,listFirst(c->reply));
1563 continue;
1564 }
1565
1566 if (c->flags & REDIS_MASTER) {
6f376729 1567 /* Don't reply to a master */
ed9b544e 1568 nwritten = objlen - c->sentlen;
1569 } else {
a4d1ba9a 1570 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
ed9b544e 1571 if (nwritten <= 0) break;
1572 }
1573 c->sentlen += nwritten;
1574 totwritten += nwritten;
1575 /* If we fully sent the object on head go to the next one */
1576 if (c->sentlen == objlen) {
1577 listDelNode(c->reply,listFirst(c->reply));
1578 c->sentlen = 0;
1579 }
6f376729 1580 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
12f9d551 1581 * bytes, in a single threaded server it's a good idea to serve
6f376729 1582 * other clients as well, even if a very large request comes from
1583 * super fast link that is always able to accept data (in real world
12f9d551 1584 * scenario think about 'KEYS *' against the loopback interfae) */
6f376729 1585 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
ed9b544e 1586 }
1587 if (nwritten == -1) {
1588 if (errno == EAGAIN) {
1589 nwritten = 0;
1590 } else {
1591 redisLog(REDIS_DEBUG,
1592 "Error writing to client: %s", strerror(errno));
1593 freeClient(c);
1594 return;
1595 }
1596 }
1597 if (totwritten > 0) c->lastinteraction = time(NULL);
1598 if (listLength(c->reply) == 0) {
1599 c->sentlen = 0;
1600 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1601 }
1602}
1603
2895e862 1604static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1605{
1606 redisClient *c = privdata;
1607 int nwritten = 0, totwritten = 0, objlen, willwrite;
1608 robj *o;
1609 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1610 int offset, ion = 0;
1611 REDIS_NOTUSED(el);
1612 REDIS_NOTUSED(mask);
1613
1614 listNode *node;
1615 while (listLength(c->reply)) {
1616 offset = c->sentlen;
1617 ion = 0;
1618 willwrite = 0;
1619
1620 /* fill-in the iov[] array */
1621 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1622 o = listNodeValue(node);
1623 objlen = sdslen(o->ptr);
1624
1625 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1626 break;
1627
1628 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1629 break; /* no more iovecs */
1630
1631 iov[ion].iov_base = ((char*)o->ptr) + offset;
1632 iov[ion].iov_len = objlen - offset;
1633 willwrite += objlen - offset;
1634 offset = 0; /* just for the first item */
1635 ion++;
1636 }
1637
1638 if(willwrite == 0)
1639 break;
1640
1641 /* write all collected blocks at once */
1642 if((nwritten = writev(fd, iov, ion)) < 0) {
1643 if (errno != EAGAIN) {
1644 redisLog(REDIS_DEBUG,
1645 "Error writing to client: %s", strerror(errno));
1646 freeClient(c);
1647 return;
1648 }
1649 break;
1650 }
1651
1652 totwritten += nwritten;
1653 offset = c->sentlen;
1654
1655 /* remove written robjs from c->reply */
1656 while (nwritten && listLength(c->reply)) {
1657 o = listNodeValue(listFirst(c->reply));
1658 objlen = sdslen(o->ptr);
1659
1660 if(nwritten >= objlen - offset) {
1661 listDelNode(c->reply, listFirst(c->reply));
1662 nwritten -= objlen - offset;
1663 c->sentlen = 0;
1664 } else {
1665 /* partial write */
1666 c->sentlen += nwritten;
1667 break;
1668 }
1669 offset = 0;
1670 }
1671 }
1672
1673 if (totwritten > 0)
1674 c->lastinteraction = time(NULL);
1675
1676 if (listLength(c->reply) == 0) {
1677 c->sentlen = 0;
1678 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1679 }
1680}
1681
ed9b544e 1682static struct redisCommand *lookupCommand(char *name) {
1683 int j = 0;
1684 while(cmdTable[j].name != NULL) {
bb0b03a3 1685 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
ed9b544e 1686 j++;
1687 }
1688 return NULL;
1689}
1690
1691/* resetClient prepare the client to process the next command */
1692static void resetClient(redisClient *c) {
1693 freeClientArgv(c);
1694 c->bulklen = -1;
e8a74421 1695 c->multibulk = 0;
ed9b544e 1696}
1697
6e469882 1698/* Call() is the core of Redis execution of a command */
1699static void call(redisClient *c, struct redisCommand *cmd) {
1700 long long dirty;
1701
1702 dirty = server.dirty;
1703 cmd->proc(c);
1704 if (server.appendonly && server.dirty-dirty)
1705 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1706 if (server.dirty-dirty && listLength(server.slaves))
1707 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1708 if (listLength(server.monitors))
1709 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1710 server.stat_numcommands++;
1711}
1712
ed9b544e 1713/* If this function gets called we already read a whole
1714 * command, argments are in the client argv/argc fields.
1715 * processCommand() execute the command or prepare the
1716 * server for a bulk read from the client.
1717 *
1718 * If 1 is returned the client is still alive and valid and
1719 * and other operations can be performed by the caller. Otherwise
1720 * if 0 is returned the client was destroied (i.e. after QUIT). */
1721static int processCommand(redisClient *c) {
1722 struct redisCommand *cmd;
ed9b544e 1723
3fd78bcd 1724 /* Free some memory if needed (maxmemory setting) */
1725 if (server.maxmemory) freeMemoryIfNeeded();
1726
e8a74421 1727 /* Handle the multi bulk command type. This is an alternative protocol
1728 * supported by Redis in order to receive commands that are composed of
1729 * multiple binary-safe "bulk" arguments. The latency of processing is
1730 * a bit higher but this allows things like multi-sets, so if this
1731 * protocol is used only for MSET and similar commands this is a big win. */
1732 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1733 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1734 if (c->multibulk <= 0) {
1735 resetClient(c);
1736 return 1;
1737 } else {
1738 decrRefCount(c->argv[c->argc-1]);
1739 c->argc--;
1740 return 1;
1741 }
1742 } else if (c->multibulk) {
1743 if (c->bulklen == -1) {
1744 if (((char*)c->argv[0]->ptr)[0] != '$') {
1745 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1746 resetClient(c);
1747 return 1;
1748 } else {
1749 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1750 decrRefCount(c->argv[0]);
1751 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1752 c->argc--;
1753 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1754 resetClient(c);
1755 return 1;
1756 }
1757 c->argc--;
1758 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1759 return 1;
1760 }
1761 } else {
1762 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1763 c->mbargv[c->mbargc] = c->argv[0];
1764 c->mbargc++;
1765 c->argc--;
1766 c->multibulk--;
1767 if (c->multibulk == 0) {
1768 robj **auxargv;
1769 int auxargc;
1770
1771 /* Here we need to swap the multi-bulk argc/argv with the
1772 * normal argc/argv of the client structure. */
1773 auxargv = c->argv;
1774 c->argv = c->mbargv;
1775 c->mbargv = auxargv;
1776
1777 auxargc = c->argc;
1778 c->argc = c->mbargc;
1779 c->mbargc = auxargc;
1780
1781 /* We need to set bulklen to something different than -1
1782 * in order for the code below to process the command without
1783 * to try to read the last argument of a bulk command as
1784 * a special argument. */
1785 c->bulklen = 0;
1786 /* continue below and process the command */
1787 } else {
1788 c->bulklen = -1;
1789 return 1;
1790 }
1791 }
1792 }
1793 /* -- end of multi bulk commands processing -- */
1794
ed9b544e 1795 /* The QUIT command is handled as a special case. Normal command
1796 * procs are unable to close the client connection safely */
bb0b03a3 1797 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
ed9b544e 1798 freeClient(c);
1799 return 0;
1800 }
1801 cmd = lookupCommand(c->argv[0]->ptr);
1802 if (!cmd) {
2c14807b 1803 addReplySds(c,
1804 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1805 (char*)c->argv[0]->ptr));
ed9b544e 1806 resetClient(c);
1807 return 1;
1808 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1809 (c->argc < -cmd->arity)) {
454d4e43 1810 addReplySds(c,
1811 sdscatprintf(sdsempty(),
1812 "-ERR wrong number of arguments for '%s' command\r\n",
1813 cmd->name));
ed9b544e 1814 resetClient(c);
1815 return 1;
3fd78bcd 1816 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1817 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1818 resetClient(c);
1819 return 1;
ed9b544e 1820 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1821 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1822
1823 decrRefCount(c->argv[c->argc-1]);
1824 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1825 c->argc--;
1826 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1827 resetClient(c);
1828 return 1;
1829 }
1830 c->argc--;
1831 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1832 /* It is possible that the bulk read is already in the
8d0490e7 1833 * buffer. Check this condition and handle it accordingly.
1834 * This is just a fast path, alternative to call processInputBuffer().
1835 * It's a good idea since the code is small and this condition
1836 * happens most of the times. */
ed9b544e 1837 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1838 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1839 c->argc++;
1840 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1841 } else {
1842 return 1;
1843 }
1844 }
10c43610 1845 /* Let's try to share objects on the command arguments vector */
1846 if (server.shareobjects) {
1847 int j;
1848 for(j = 1; j < c->argc; j++)
1849 c->argv[j] = tryObjectSharing(c->argv[j]);
1850 }
942a3961 1851 /* Let's try to encode the bulk object to save space. */
1852 if (cmd->flags & REDIS_CMD_BULK)
1853 tryObjectEncoding(c->argv[c->argc-1]);
1854
e63943a4 1855 /* Check if the user is authenticated */
1856 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1857 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1858 resetClient(c);
1859 return 1;
1860 }
1861
ed9b544e 1862 /* Exec the command */
6e469882 1863 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1864 queueMultiCommand(c,cmd);
1865 addReply(c,shared.queued);
1866 } else {
1867 call(c,cmd);
1868 }
ed9b544e 1869
1870 /* Prepare the client for the next command */
1871 if (c->flags & REDIS_CLOSE) {
1872 freeClient(c);
1873 return 0;
1874 }
1875 resetClient(c);
1876 return 1;
1877}
1878
87eca727 1879static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6208b3a7 1880 listNode *ln;
ed9b544e 1881 int outc = 0, j;
93ea3759 1882 robj **outv;
1883 /* (args*2)+1 is enough room for args, spaces, newlines */
1884 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1885
1886 if (argc <= REDIS_STATIC_ARGS) {
1887 outv = static_outv;
1888 } else {
1889 outv = zmalloc(sizeof(robj*)*(argc*2+1));
93ea3759 1890 }
ed9b544e 1891
1892 for (j = 0; j < argc; j++) {
1893 if (j != 0) outv[outc++] = shared.space;
1894 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1895 robj *lenobj;
1896
1897 lenobj = createObject(REDIS_STRING,
682ac724 1898 sdscatprintf(sdsempty(),"%lu\r\n",
83c6a618 1899 (unsigned long) stringObjectLen(argv[j])));
ed9b544e 1900 lenobj->refcount = 0;
1901 outv[outc++] = lenobj;
1902 }
1903 outv[outc++] = argv[j];
1904 }
1905 outv[outc++] = shared.crlf;
1906
40d224a9 1907 /* Increment all the refcounts at start and decrement at end in order to
1908 * be sure to free objects if there is no slave in a replication state
1909 * able to be feed with commands */
1910 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
6208b3a7 1911 listRewind(slaves);
1912 while((ln = listYield(slaves))) {
ed9b544e 1913 redisClient *slave = ln->value;
40d224a9 1914
1915 /* Don't feed slaves that are still waiting for BGSAVE to start */
6208b3a7 1916 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
40d224a9 1917
1918 /* Feed all the other slaves, MONITORs and so on */
ed9b544e 1919 if (slave->slaveseldb != dictid) {
1920 robj *selectcmd;
1921
1922 switch(dictid) {
1923 case 0: selectcmd = shared.select0; break;
1924 case 1: selectcmd = shared.select1; break;
1925 case 2: selectcmd = shared.select2; break;
1926 case 3: selectcmd = shared.select3; break;
1927 case 4: selectcmd = shared.select4; break;
1928 case 5: selectcmd = shared.select5; break;
1929 case 6: selectcmd = shared.select6; break;
1930 case 7: selectcmd = shared.select7; break;
1931 case 8: selectcmd = shared.select8; break;
1932 case 9: selectcmd = shared.select9; break;
1933 default:
1934 selectcmd = createObject(REDIS_STRING,
1935 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1936 selectcmd->refcount = 0;
1937 break;
1938 }
1939 addReply(slave,selectcmd);
1940 slave->slaveseldb = dictid;
1941 }
1942 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
ed9b544e 1943 }
40d224a9 1944 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
93ea3759 1945 if (outv != static_outv) zfree(outv);
ed9b544e 1946}
1947
638e42ac 1948static void processInputBuffer(redisClient *c) {
ed9b544e 1949again:
4409877e 1950 /* Before to process the input buffer, make sure the client is not
1951 * waitig for a blocking operation such as BLPOP. Note that the first
1952 * iteration the client is never blocked, otherwise the processInputBuffer
1953 * would not be called at all, but after the execution of the first commands
1954 * in the input buffer the client may be blocked, and the "goto again"
1955 * will try to reiterate. The following line will make it return asap. */
1956 if (c->flags & REDIS_BLOCKED) return;
ed9b544e 1957 if (c->bulklen == -1) {
1958 /* Read the first line of the query */
1959 char *p = strchr(c->querybuf,'\n');
1960 size_t querylen;
644fafa3 1961
ed9b544e 1962 if (p) {
1963 sds query, *argv;
1964 int argc, j;
1965
1966 query = c->querybuf;
1967 c->querybuf = sdsempty();
1968 querylen = 1+(p-(query));
1969 if (sdslen(query) > querylen) {
1970 /* leave data after the first line of the query in the buffer */
1971 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1972 }
1973 *p = '\0'; /* remove "\n" */
1974 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1975 sdsupdatelen(query);
1976
1977 /* Now we can split the query in arguments */
ed9b544e 1978 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
93ea3759 1979 sdsfree(query);
1980
1981 if (c->argv) zfree(c->argv);
1982 c->argv = zmalloc(sizeof(robj*)*argc);
93ea3759 1983
1984 for (j = 0; j < argc; j++) {
ed9b544e 1985 if (sdslen(argv[j])) {
1986 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1987 c->argc++;
1988 } else {
1989 sdsfree(argv[j]);
1990 }
1991 }
1992 zfree(argv);
7c49733c 1993 if (c->argc) {
1994 /* Execute the command. If the client is still valid
1995 * after processCommand() return and there is something
1996 * on the query buffer try to process the next command. */
1997 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1998 } else {
1999 /* Nothing to process, argc == 0. Just process the query
2000 * buffer if it's not empty or return to the caller */
2001 if (sdslen(c->querybuf)) goto again;
2002 }
ed9b544e 2003 return;
644fafa3 2004 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
ed9b544e 2005 redisLog(REDIS_DEBUG, "Client protocol error");
2006 freeClient(c);
2007 return;
2008 }
2009 } else {
2010 /* Bulk read handling. Note that if we are at this point
2011 the client already sent a command terminated with a newline,
2012 we are reading the bulk data that is actually the last
2013 argument of the command. */
2014 int qbl = sdslen(c->querybuf);
2015
2016 if (c->bulklen <= qbl) {
2017 /* Copy everything but the final CRLF as final argument */
2018 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2019 c->argc++;
2020 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
638e42ac 2021 /* Process the command. If the client is still valid after
2022 * the processing and there is more data in the buffer
2023 * try to parse it. */
2024 if (processCommand(c) && sdslen(c->querybuf)) goto again;
ed9b544e 2025 return;
2026 }
2027 }
2028}
2029
638e42ac 2030static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2031 redisClient *c = (redisClient*) privdata;
2032 char buf[REDIS_IOBUF_LEN];
2033 int nread;
2034 REDIS_NOTUSED(el);
2035 REDIS_NOTUSED(mask);
2036
2037 nread = read(fd, buf, REDIS_IOBUF_LEN);
2038 if (nread == -1) {
2039 if (errno == EAGAIN) {
2040 nread = 0;
2041 } else {
2042 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2043 freeClient(c);
2044 return;
2045 }
2046 } else if (nread == 0) {
2047 redisLog(REDIS_DEBUG, "Client closed connection");
2048 freeClient(c);
2049 return;
2050 }
2051 if (nread) {
2052 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2053 c->lastinteraction = time(NULL);
2054 } else {
2055 return;
2056 }
2057 processInputBuffer(c);
2058}
2059
ed9b544e 2060static int selectDb(redisClient *c, int id) {
2061 if (id < 0 || id >= server.dbnum)
2062 return REDIS_ERR;
3305306f 2063 c->db = &server.db[id];
ed9b544e 2064 return REDIS_OK;
2065}
2066
40d224a9 2067static void *dupClientReplyValue(void *o) {
2068 incrRefCount((robj*)o);
2069 return 0;
2070}
2071
ed9b544e 2072static redisClient *createClient(int fd) {
2073 redisClient *c = zmalloc(sizeof(*c));
2074
2075 anetNonBlock(NULL,fd);
2076 anetTcpNoDelay(NULL,fd);
2077 if (!c) return NULL;
2078 selectDb(c,0);
2079 c->fd = fd;
2080 c->querybuf = sdsempty();
2081 c->argc = 0;
93ea3759 2082 c->argv = NULL;
ed9b544e 2083 c->bulklen = -1;
e8a74421 2084 c->multibulk = 0;
2085 c->mbargc = 0;
2086 c->mbargv = NULL;
ed9b544e 2087 c->sentlen = 0;
2088 c->flags = 0;
2089 c->lastinteraction = time(NULL);
abcb223e 2090 c->authenticated = 0;
40d224a9 2091 c->replstate = REDIS_REPL_NONE;
6b47e12e 2092 c->reply = listCreate();
b177fd30 2093 c->blockingkeys = NULL;
2094 c->blockingkeysnum = 0;
ed9b544e 2095 listSetFreeMethod(c->reply,decrRefCount);
40d224a9 2096 listSetDupMethod(c->reply,dupClientReplyValue);
ed9b544e 2097 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
266373b2 2098 readQueryFromClient, c) == AE_ERR) {
ed9b544e 2099 freeClient(c);
2100 return NULL;
2101 }
6b47e12e 2102 listAddNodeTail(server.clients,c);
6e469882 2103 initClientMultiState(c);
ed9b544e 2104 return c;
2105}
2106
2107static void addReply(redisClient *c, robj *obj) {
2108 if (listLength(c->reply) == 0 &&
6208b3a7 2109 (c->replstate == REDIS_REPL_NONE ||
2110 c->replstate == REDIS_REPL_ONLINE) &&
ed9b544e 2111 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
266373b2 2112 sendReplyToClient, c) == AE_ERR) return;
9d65a1bb 2113 listAddNodeTail(c->reply,getDecodedObject(obj));
ed9b544e 2114}
2115
2116static void addReplySds(redisClient *c, sds s) {
2117 robj *o = createObject(REDIS_STRING,s);
2118 addReply(c,o);
2119 decrRefCount(o);
2120}
2121
e2665397 2122static void addReplyDouble(redisClient *c, double d) {
2123 char buf[128];
2124
2125 snprintf(buf,sizeof(buf),"%.17g",d);
682ac724 2126 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
83c6a618 2127 (unsigned long) strlen(buf),buf));
e2665397 2128}
2129
942a3961 2130static void addReplyBulkLen(redisClient *c, robj *obj) {
2131 size_t len;
2132
2133 if (obj->encoding == REDIS_ENCODING_RAW) {
2134 len = sdslen(obj->ptr);
2135 } else {
2136 long n = (long)obj->ptr;
2137
e054afda 2138 /* Compute how many bytes will take this integer as a radix 10 string */
942a3961 2139 len = 1;
2140 if (n < 0) {
2141 len++;
2142 n = -n;
2143 }
2144 while((n = n/10) != 0) {
2145 len++;
2146 }
2147 }
83c6a618 2148 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
942a3961 2149}
2150
ed9b544e 2151static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2152 int cport, cfd;
2153 char cip[128];
285add55 2154 redisClient *c;
ed9b544e 2155 REDIS_NOTUSED(el);
2156 REDIS_NOTUSED(mask);
2157 REDIS_NOTUSED(privdata);
2158
2159 cfd = anetAccept(server.neterr, fd, cip, &cport);
2160 if (cfd == AE_ERR) {
2161 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2162 return;
2163 }
2164 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
285add55 2165 if ((c = createClient(cfd)) == NULL) {
ed9b544e 2166 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2167 close(cfd); /* May be already closed, just ingore errors */
2168 return;
2169 }
285add55 2170 /* If maxclient directive is set and this is one client more... close the
2171 * connection. Note that we create the client instead to check before
2172 * for this condition, since now the socket is already set in nonblocking
2173 * mode and we can send an error for free using the Kernel I/O */
2174 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2175 char *err = "-ERR max number of clients reached\r\n";
2176
2177 /* That's a best effort error message, don't check write errors */
fee803ba 2178 if (write(c->fd,err,strlen(err)) == -1) {
2179 /* Nothing to do, Just to avoid the warning... */
2180 }
285add55 2181 freeClient(c);
2182 return;
2183 }
ed9b544e 2184 server.stat_numconnections++;
2185}
2186
2187/* ======================= Redis objects implementation ===================== */
2188
2189static robj *createObject(int type, void *ptr) {
2190 robj *o;
2191
2192 if (listLength(server.objfreelist)) {
2193 listNode *head = listFirst(server.objfreelist);
2194 o = listNodeValue(head);
2195 listDelNode(server.objfreelist,head);
2196 } else {
2197 o = zmalloc(sizeof(*o));
2198 }
ed9b544e 2199 o->type = type;
942a3961 2200 o->encoding = REDIS_ENCODING_RAW;
ed9b544e 2201 o->ptr = ptr;
2202 o->refcount = 1;
2203 return o;
2204}
2205
2206static robj *createStringObject(char *ptr, size_t len) {
2207 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2208}
2209
2210static robj *createListObject(void) {
2211 list *l = listCreate();
2212
ed9b544e 2213 listSetFreeMethod(l,decrRefCount);
2214 return createObject(REDIS_LIST,l);
2215}
2216
2217static robj *createSetObject(void) {
2218 dict *d = dictCreate(&setDictType,NULL);
ed9b544e 2219 return createObject(REDIS_SET,d);
2220}
2221
1812e024 2222static robj *createZsetObject(void) {
6b47e12e 2223 zset *zs = zmalloc(sizeof(*zs));
2224
2225 zs->dict = dictCreate(&zsetDictType,NULL);
2226 zs->zsl = zslCreate();
2227 return createObject(REDIS_ZSET,zs);
1812e024 2228}
2229
ed9b544e 2230static void freeStringObject(robj *o) {
942a3961 2231 if (o->encoding == REDIS_ENCODING_RAW) {
2232 sdsfree(o->ptr);
2233 }
ed9b544e 2234}
2235
2236static void freeListObject(robj *o) {
2237 listRelease((list*) o->ptr);
2238}
2239
2240static void freeSetObject(robj *o) {
2241 dictRelease((dict*) o->ptr);
2242}
2243
fd8ccf44 2244static void freeZsetObject(robj *o) {
2245 zset *zs = o->ptr;
2246
2247 dictRelease(zs->dict);
2248 zslFree(zs->zsl);
2249 zfree(zs);
2250}
2251
ed9b544e 2252static void freeHashObject(robj *o) {
2253 dictRelease((dict*) o->ptr);
2254}
2255
2256static void incrRefCount(robj *o) {
2257 o->refcount++;
94754ccc 2258#ifdef DEBUG_REFCOUNT
2259 if (o->type == REDIS_STRING)
2260 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2261#endif
ed9b544e 2262}
2263
2264static void decrRefCount(void *obj) {
2265 robj *o = obj;
94754ccc 2266
2267#ifdef DEBUG_REFCOUNT
2268 if (o->type == REDIS_STRING)
2269 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2270#endif
ed9b544e 2271 if (--(o->refcount) == 0) {
2272 switch(o->type) {
2273 case REDIS_STRING: freeStringObject(o); break;
2274 case REDIS_LIST: freeListObject(o); break;
2275 case REDIS_SET: freeSetObject(o); break;
fd8ccf44 2276 case REDIS_ZSET: freeZsetObject(o); break;
ed9b544e 2277 case REDIS_HASH: freeHashObject(o); break;
dfc5e96c 2278 default: redisAssert(0 != 0); break;
ed9b544e 2279 }
2280 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2281 !listAddNodeHead(server.objfreelist,o))
2282 zfree(o);
2283 }
2284}
2285
942a3961 2286static robj *lookupKey(redisDb *db, robj *key) {
2287 dictEntry *de = dictFind(db->dict,key);
2288 return de ? dictGetEntryVal(de) : NULL;
2289}
2290
2291static robj *lookupKeyRead(redisDb *db, robj *key) {
2292 expireIfNeeded(db,key);
2293 return lookupKey(db,key);
2294}
2295
2296static robj *lookupKeyWrite(redisDb *db, robj *key) {
2297 deleteIfVolatile(db,key);
2298 return lookupKey(db,key);
2299}
2300
2301static int deleteKey(redisDb *db, robj *key) {
2302 int retval;
2303
2304 /* We need to protect key from destruction: after the first dictDelete()
2305 * it may happen that 'key' is no longer valid if we don't increment
2306 * it's count. This may happen when we get the object reference directly
2307 * from the hash table with dictRandomKey() or dict iterators */
2308 incrRefCount(key);
2309 if (dictSize(db->expires)) dictDelete(db->expires,key);
2310 retval = dictDelete(db->dict,key);
2311 decrRefCount(key);
2312
2313 return retval == DICT_OK;
2314}
2315
10c43610 2316/* Try to share an object against the shared objects pool */
2317static robj *tryObjectSharing(robj *o) {
2318 struct dictEntry *de;
2319 unsigned long c;
2320
3305306f 2321 if (o == NULL || server.shareobjects == 0) return o;
10c43610 2322
dfc5e96c 2323 redisAssert(o->type == REDIS_STRING);
10c43610 2324 de = dictFind(server.sharingpool,o);
2325 if (de) {
2326 robj *shared = dictGetEntryKey(de);
2327
2328 c = ((unsigned long) dictGetEntryVal(de))+1;
2329 dictGetEntryVal(de) = (void*) c;
2330 incrRefCount(shared);
2331 decrRefCount(o);
2332 return shared;
2333 } else {
2334 /* Here we are using a stream algorihtm: Every time an object is
2335 * shared we increment its count, everytime there is a miss we
2336 * recrement the counter of a random object. If this object reaches
2337 * zero we remove the object and put the current object instead. */
3305306f 2338 if (dictSize(server.sharingpool) >=
10c43610 2339 server.sharingpoolsize) {
2340 de = dictGetRandomKey(server.sharingpool);
dfc5e96c 2341 redisAssert(de != NULL);
10c43610 2342 c = ((unsigned long) dictGetEntryVal(de))-1;
2343 dictGetEntryVal(de) = (void*) c;
2344 if (c == 0) {
2345 dictDelete(server.sharingpool,de->key);
2346 }
2347 } else {
2348 c = 0; /* If the pool is empty we want to add this object */
2349 }
2350 if (c == 0) {
2351 int retval;
2352
2353 retval = dictAdd(server.sharingpool,o,(void*)1);
dfc5e96c 2354 redisAssert(retval == DICT_OK);
10c43610 2355 incrRefCount(o);
2356 }
2357 return o;
2358 }
2359}
2360
724a51b1 2361/* Check if the nul-terminated string 's' can be represented by a long
2362 * (that is, is a number that fits into long without any other space or
2363 * character before or after the digits).
2364 *
2365 * If so, the function returns REDIS_OK and *longval is set to the value
2366 * of the number. Otherwise REDIS_ERR is returned */
f69f2cba 2367static int isStringRepresentableAsLong(sds s, long *longval) {
724a51b1 2368 char buf[32], *endptr;
2369 long value;
2370 int slen;
2371
2372 value = strtol(s, &endptr, 10);
2373 if (endptr[0] != '\0') return REDIS_ERR;
2374 slen = snprintf(buf,32,"%ld",value);
2375
2376 /* If the number converted back into a string is not identical
2377 * then it's not possible to encode the string as integer */
f69f2cba 2378 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
724a51b1 2379 if (longval) *longval = value;
2380 return REDIS_OK;
2381}
2382
942a3961 2383/* Try to encode a string object in order to save space */
2384static int tryObjectEncoding(robj *o) {
2385 long value;
942a3961 2386 sds s = o->ptr;
3305306f 2387
942a3961 2388 if (o->encoding != REDIS_ENCODING_RAW)
2389 return REDIS_ERR; /* Already encoded */
3305306f 2390
942a3961 2391 /* It's not save to encode shared objects: shared objects can be shared
2392 * everywhere in the "object space" of Redis. Encoded objects can only
2393 * appear as "values" (and not, for instance, as keys) */
2394 if (o->refcount > 1) return REDIS_ERR;
3305306f 2395
942a3961 2396 /* Currently we try to encode only strings */
dfc5e96c 2397 redisAssert(o->type == REDIS_STRING);
94754ccc 2398
724a51b1 2399 /* Check if we can represent this string as a long integer */
2400 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
942a3961 2401
2402 /* Ok, this object can be encoded */
2403 o->encoding = REDIS_ENCODING_INT;
2404 sdsfree(o->ptr);
2405 o->ptr = (void*) value;
2406 return REDIS_OK;
2407}
2408
9d65a1bb 2409/* Get a decoded version of an encoded object (returned as a new object).
2410 * If the object is already raw-encoded just increment the ref count. */
2411static robj *getDecodedObject(robj *o) {
942a3961 2412 robj *dec;
2413
9d65a1bb 2414 if (o->encoding == REDIS_ENCODING_RAW) {
2415 incrRefCount(o);
2416 return o;
2417 }
942a3961 2418 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2419 char buf[32];
2420
2421 snprintf(buf,32,"%ld",(long)o->ptr);
2422 dec = createStringObject(buf,strlen(buf));
2423 return dec;
2424 } else {
dfc5e96c 2425 redisAssert(1 != 1);
942a3961 2426 }
3305306f 2427}
2428
d7f43c08 2429/* Compare two string objects via strcmp() or alike.
2430 * Note that the objects may be integer-encoded. In such a case we
2431 * use snprintf() to get a string representation of the numbers on the stack
1fd9bc8a 2432 * and compare the strings, it's much faster than calling getDecodedObject().
2433 *
2434 * Important note: if objects are not integer encoded, but binary-safe strings,
2435 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2436 * binary safe. */
724a51b1 2437static int compareStringObjects(robj *a, robj *b) {
dfc5e96c 2438 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
d7f43c08 2439 char bufa[128], bufb[128], *astr, *bstr;
2440 int bothsds = 1;
724a51b1 2441
e197b441 2442 if (a == b) return 0;
d7f43c08 2443 if (a->encoding != REDIS_ENCODING_RAW) {
2444 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2445 astr = bufa;
2446 bothsds = 0;
724a51b1 2447 } else {
d7f43c08 2448 astr = a->ptr;
724a51b1 2449 }
d7f43c08 2450 if (b->encoding != REDIS_ENCODING_RAW) {
2451 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2452 bstr = bufb;
2453 bothsds = 0;
2454 } else {
2455 bstr = b->ptr;
2456 }
2457 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
724a51b1 2458}
2459
0ea663ea 2460static size_t stringObjectLen(robj *o) {
dfc5e96c 2461 redisAssert(o->type == REDIS_STRING);
0ea663ea 2462 if (o->encoding == REDIS_ENCODING_RAW) {
2463 return sdslen(o->ptr);
2464 } else {
2465 char buf[32];
2466
2467 return snprintf(buf,32,"%ld",(long)o->ptr);
2468 }
2469}
2470
06233c45 2471/*============================ RDB saving/loading =========================== */
ed9b544e 2472
/* Write the one-byte type tag to fp. Returns 0 on success, -1 on write
 * error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2477
/* Save a timestamp as a 4-byte int32_t in host byte order (the value is
 * truncated to 32 bits). Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32;

    t32 = (int32_t) t;
    return (fwrite(&t32,4,1,fp) == 0) ? -1 : 0;
}
2483
e3566d4b 2484/* check rdbLoadLen() comments for more info */
f78fd11b 2485static int rdbSaveLen(FILE *fp, uint32_t len) {
2486 unsigned char buf[2];
2487
2488 if (len < (1<<6)) {
2489 /* Save a 6 bit len */
10c43610 2490 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
f78fd11b 2491 if (fwrite(buf,1,1,fp) == 0) return -1;
2492 } else if (len < (1<<14)) {
2493 /* Save a 14 bit len */
10c43610 2494 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
f78fd11b 2495 buf[1] = len&0xFF;
17be1a4a 2496 if (fwrite(buf,2,1,fp) == 0) return -1;
f78fd11b 2497 } else {
2498 /* Save a 32 bit len */
10c43610 2499 buf[0] = (REDIS_RDB_32BITLEN<<6);
f78fd11b 2500 if (fwrite(buf,1,1,fp) == 0) return -1;
2501 len = htonl(len);
2502 if (fwrite(&len,4,1,fp) == 0) return -1;
2503 }
2504 return 0;
2505}
2506
e3566d4b 2507/* String objects in the form "2391" "-100" without any space and with a
2508 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2509 * encoded as integers to save space */
56906eef 2510static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
e3566d4b 2511 long long value;
2512 char *endptr, buf[32];
2513
2514 /* Check if it's possible to encode this value as a number */
2515 value = strtoll(s, &endptr, 10);
2516 if (endptr[0] != '\0') return 0;
2517 snprintf(buf,32,"%lld",value);
2518
2519 /* If the number converted back into a string is not identical
2520 * then it's not possible to encode the string as integer */
2521 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2522
2523 /* Finally check if it fits in our ranges */
2524 if (value >= -(1<<7) && value <= (1<<7)-1) {
2525 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2526 enc[1] = value&0xFF;
2527 return 2;
2528 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2529 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2530 enc[1] = value&0xFF;
2531 enc[2] = (value>>8)&0xFF;
2532 return 3;
2533 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2534 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2535 enc[1] = value&0xFF;
2536 enc[2] = (value>>8)&0xFF;
2537 enc[3] = (value>>16)&0xFF;
2538 enc[4] = (value>>24)&0xFF;
2539 return 5;
2540 } else {
2541 return 0;
2542 }
2543}
2544
774e3047 2545static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2546 unsigned int comprlen, outlen;
2547 unsigned char byte;
2548 void *out;
2549
2550 /* We require at least four bytes compression for this to be worth it */
2551 outlen = sdslen(obj->ptr)-4;
2552 if (outlen <= 0) return 0;
3a2694c4 2553 if ((out = zmalloc(outlen+1)) == NULL) return 0;
774e3047 2554 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2555 if (comprlen == 0) {
88e85998 2556 zfree(out);
774e3047 2557 return 0;
2558 }
2559 /* Data compressed! Let's save it on disk */
2560 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2561 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2562 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2563 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2564 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
88e85998 2565 zfree(out);
774e3047 2566 return comprlen;
2567
2568writeerr:
88e85998 2569 zfree(out);
774e3047 2570 return -1;
2571}
2572
e3566d4b 2573/* Save a string objet as [len][data] on disk. If the object is a string
2574 * representation of an integer value we try to safe it in a special form */
942a3961 2575static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2576 size_t len;
e3566d4b 2577 int enclen;
10c43610 2578
942a3961 2579 len = sdslen(obj->ptr);
2580
774e3047 2581 /* Try integer encoding */
e3566d4b 2582 if (len <= 11) {
2583 unsigned char buf[5];
2584 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2585 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2586 return 0;
2587 }
2588 }
774e3047 2589
2590 /* Try LZF compression - under 20 bytes it's unable to compress even
88e85998 2591 * aaaaaaaaaaaaaaaaaa so skip it */
121f70cf 2592 if (server.rdbcompression && len > 20) {
774e3047 2593 int retval;
2594
2595 retval = rdbSaveLzfStringObject(fp,obj);
2596 if (retval == -1) return -1;
2597 if (retval > 0) return 0;
2598 /* retval == 0 means data can't be compressed, save the old way */
2599 }
2600
2601 /* Store verbatim */
10c43610 2602 if (rdbSaveLen(fp,len) == -1) return -1;
2603 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2604 return 0;
2605}
2606
942a3961 2607/* Like rdbSaveStringObjectRaw() but handle encoded objects */
2608static int rdbSaveStringObject(FILE *fp, robj *obj) {
2609 int retval;
942a3961 2610
9d65a1bb 2611 obj = getDecodedObject(obj);
2612 retval = rdbSaveStringObjectRaw(fp,obj);
2613 decrRefCount(obj);
2614 return retval;
942a3961 2615}
2616
/* Save a double value. The value is written as an unsigned 8 bit header
 * followed, for finite values, by the "%.17g" text representation; the
 * header then holds the length of that text. Special header values are
 * written alone, with no payload:
 *   253: not a number
 *   254: + inf
 *   255: - inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char out[128];
    size_t total = 1;   /* the header byte is always written */

    if (isnan(val)) {
        out[0] = 253;
    } else if (!isfinite(val)) {
        out[0] = (val > 0) ? 254 : 255;
    } else {
        char *text = (char*)out+1;

        snprintf(text,sizeof(out)-1,"%.17g",val);
        out[0] = strlen(text);
        total += out[0];
    }
    if (fwrite(out,total,1,fp) == 0) return -1;
    return 0;
}
2643
06233c45 2644/* Save a Redis object. */
2645static int rdbSaveObject(FILE *fp, robj *o) {
2646 if (o->type == REDIS_STRING) {
2647 /* Save a string value */
2648 if (rdbSaveStringObject(fp,o) == -1) return -1;
2649 } else if (o->type == REDIS_LIST) {
2650 /* Save a list value */
2651 list *list = o->ptr;
2652 listNode *ln;
2653
2654 listRewind(list);
2655 if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
2656 while((ln = listYield(list))) {
2657 robj *eleobj = listNodeValue(ln);
2658
2659 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2660 }
2661 } else if (o->type == REDIS_SET) {
2662 /* Save a set value */
2663 dict *set = o->ptr;
2664 dictIterator *di = dictGetIterator(set);
2665 dictEntry *de;
2666
2667 if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
2668 while((de = dictNext(di)) != NULL) {
2669 robj *eleobj = dictGetEntryKey(de);
2670
2671 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2672 }
2673 dictReleaseIterator(di);
2674 } else if (o->type == REDIS_ZSET) {
2675 /* Save a set value */
2676 zset *zs = o->ptr;
2677 dictIterator *di = dictGetIterator(zs->dict);
2678 dictEntry *de;
2679
2680 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) return -1;
2681 while((de = dictNext(di)) != NULL) {
2682 robj *eleobj = dictGetEntryKey(de);
2683 double *score = dictGetEntryVal(de);
2684
2685 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2686 if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
2687 }
2688 dictReleaseIterator(di);
2689 } else {
2690 redisAssert(0 != 0);
2691 }
2692 return 0;
2693}
2694
2695/* Return the length the object will have on disk if saved with
2696 * the rdbSaveObject() function. Currently we use a trick to get
2697 * this length with very little changes to the code. In the future
2698 * we could switch to a faster solution. */
2699static off_t rdbSavedObjectLen(robj *o) {
2700 static FILE *fp = NULL;
2701
2702 if (fp == NULL) fp = fopen("/dev/null","w");
2703 assert(fp != NULL);
2704
2705 rewind(fp);
2706 assert(rdbSaveObject(fp,o) != 1);
2707 return ftello(fp);
2708}
2709
ed9b544e 2710/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
f78fd11b 2711static int rdbSave(char *filename) {
ed9b544e 2712 dictIterator *di = NULL;
2713 dictEntry *de;
ed9b544e 2714 FILE *fp;
2715 char tmpfile[256];
2716 int j;
bb32ede5 2717 time_t now = time(NULL);
ed9b544e 2718
a3b21203 2719 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
ed9b544e 2720 fp = fopen(tmpfile,"w");
2721 if (!fp) {
2722 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2723 return REDIS_ERR;
2724 }
f78fd11b 2725 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
ed9b544e 2726 for (j = 0; j < server.dbnum; j++) {
bb32ede5 2727 redisDb *db = server.db+j;
2728 dict *d = db->dict;
3305306f 2729 if (dictSize(d) == 0) continue;
ed9b544e 2730 di = dictGetIterator(d);
2731 if (!di) {
2732 fclose(fp);
2733 return REDIS_ERR;
2734 }
2735
2736 /* Write the SELECT DB opcode */
f78fd11b 2737 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2738 if (rdbSaveLen(fp,j) == -1) goto werr;
ed9b544e 2739
2740 /* Iterate this DB writing every entry */
2741 while((de = dictNext(di)) != NULL) {
2742 robj *key = dictGetEntryKey(de);
2743 robj *o = dictGetEntryVal(de);
bb32ede5 2744 time_t expiretime = getExpire(db,key);
2745
2746 /* Save the expire time */
2747 if (expiretime != -1) {
2748 /* If this key is already expired skip it */
2749 if (expiretime < now) continue;
2750 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2751 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2752 }
2753 /* Save the key and associated value */
f78fd11b 2754 if (rdbSaveType(fp,o->type) == -1) goto werr;
10c43610 2755 if (rdbSaveStringObject(fp,key) == -1) goto werr;
06233c45 2756 /* Save the actual value */
2757 if (rdbSaveObject(fp,o) == -1) goto werr;
ed9b544e 2758 }
2759 dictReleaseIterator(di);
2760 }
2761 /* EOF opcode */
f78fd11b 2762 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2763
2764 /* Make sure data will not remain on the OS's output buffers */
ed9b544e 2765 fflush(fp);
2766 fsync(fileno(fp));
2767 fclose(fp);
2768
2769 /* Use RENAME to make sure the DB file is changed atomically only
2770 * if the generate DB file is ok. */
2771 if (rename(tmpfile,filename) == -1) {
325d1eb4 2772 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
ed9b544e 2773 unlink(tmpfile);
2774 return REDIS_ERR;
2775 }
2776 redisLog(REDIS_NOTICE,"DB saved on disk");
2777 server.dirty = 0;
2778 server.lastsave = time(NULL);
2779 return REDIS_OK;
2780
2781werr:
2782 fclose(fp);
2783 unlink(tmpfile);
2784 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2785 if (di) dictReleaseIterator(di);
2786 return REDIS_ERR;
2787}
2788
f78fd11b 2789static int rdbSaveBackground(char *filename) {
ed9b544e 2790 pid_t childpid;
2791
9d65a1bb 2792 if (server.bgsavechildpid != -1) return REDIS_ERR;
ed9b544e 2793 if ((childpid = fork()) == 0) {
2794 /* Child */
2795 close(server.fd);
f78fd11b 2796 if (rdbSave(filename) == REDIS_OK) {
ed9b544e 2797 exit(0);
2798 } else {
2799 exit(1);
2800 }
2801 } else {
2802 /* Parent */
5a7c647e 2803 if (childpid == -1) {
2804 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2805 strerror(errno));
2806 return REDIS_ERR;
2807 }
ed9b544e 2808 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
9f3c422c 2809 server.bgsavechildpid = childpid;
ed9b544e 2810 return REDIS_OK;
2811 }
2812 return REDIS_OK; /* unreached */
2813}
2814
/* Delete the "temp-<pid>.rdb" file that rdbSave() creates when running in
 * process 'childpid' (used after killing a background saving child). */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2821
/* Read a one-byte record type. Returns it as a value in 0..255, or -1 when
 * no byte could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
2827
/* Read a time value stored as a 4-byte int32_t in the host's byte order.
 * Returns -1 if the four bytes can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stored;

    if (fread(&stored,4,1,fp) != 1) return -1;
    return (time_t) stored;
}
2833
e3566d4b 2834/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2835 * of this file for a description of how this are stored on disk.
2836 *
2837 * isencoded is set to 1 if the readed length is not actually a length but
2838 * an "encoding type", check the above comments for more info */
2839static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
f78fd11b 2840 unsigned char buf[2];
2841 uint32_t len;
2842
e3566d4b 2843 if (isencoded) *isencoded = 0;
f78fd11b 2844 if (rdbver == 0) {
2845 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2846 return ntohl(len);
2847 } else {
17be1a4a 2848 int type;
2849
f78fd11b 2850 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
17be1a4a 2851 type = (buf[0]&0xC0)>>6;
2852 if (type == REDIS_RDB_6BITLEN) {
f78fd11b 2853 /* Read a 6 bit len */
e3566d4b 2854 return buf[0]&0x3F;
2855 } else if (type == REDIS_RDB_ENCVAL) {
2856 /* Read a 6 bit len encoding type */
2857 if (isencoded) *isencoded = 1;
2858 return buf[0]&0x3F;
17be1a4a 2859 } else if (type == REDIS_RDB_14BITLEN) {
f78fd11b 2860 /* Read a 14 bit len */
2861 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2862 return ((buf[0]&0x3F)<<8)|buf[1];
2863 } else {
2864 /* Read a 32 bit len */
2865 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2866 return ntohl(len);
2867 }
2868 }
f78fd11b 2869}
2870
e3566d4b 2871static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2872 unsigned char enc[4];
2873 long long val;
2874
2875 if (enctype == REDIS_RDB_ENC_INT8) {
2876 if (fread(enc,1,1,fp) == 0) return NULL;
2877 val = (signed char)enc[0];
2878 } else if (enctype == REDIS_RDB_ENC_INT16) {
2879 uint16_t v;
2880 if (fread(enc,2,1,fp) == 0) return NULL;
2881 v = enc[0]|(enc[1]<<8);
2882 val = (int16_t)v;
2883 } else if (enctype == REDIS_RDB_ENC_INT32) {
2884 uint32_t v;
2885 if (fread(enc,4,1,fp) == 0) return NULL;
2886 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2887 val = (int32_t)v;
2888 } else {
2889 val = 0; /* anti-warning */
dfc5e96c 2890 redisAssert(0!=0);
e3566d4b 2891 }
2892 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2893}
2894
88e85998 2895static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2896 unsigned int len, clen;
2897 unsigned char *c = NULL;
2898 sds val = NULL;
2899
2900 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2901 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2902 if ((c = zmalloc(clen)) == NULL) goto err;
2903 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2904 if (fread(c,clen,1,fp) == 0) goto err;
2905 if (lzf_decompress(c,clen,val,len) == 0) goto err;
5109cdff 2906 zfree(c);
88e85998 2907 return createObject(REDIS_STRING,val);
2908err:
2909 zfree(c);
2910 sdsfree(val);
2911 return NULL;
2912}
2913
e3566d4b 2914static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2915 int isencoded;
2916 uint32_t len;
f78fd11b 2917 sds val;
2918
e3566d4b 2919 len = rdbLoadLen(fp,rdbver,&isencoded);
2920 if (isencoded) {
2921 switch(len) {
2922 case REDIS_RDB_ENC_INT8:
2923 case REDIS_RDB_ENC_INT16:
2924 case REDIS_RDB_ENC_INT32:
3305306f 2925 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
88e85998 2926 case REDIS_RDB_ENC_LZF:
2927 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
e3566d4b 2928 default:
dfc5e96c 2929 redisAssert(0!=0);
e3566d4b 2930 }
2931 }
2932
f78fd11b 2933 if (len == REDIS_RDB_LENERR) return NULL;
2934 val = sdsnewlen(NULL,len);
2935 if (len && fread(val,len,1,fp) == 0) {
2936 sdsfree(val);
2937 return NULL;
2938 }
10c43610 2939 return tryObjectSharing(createObject(REDIS_STRING,val));
f78fd11b 2940}
2941
a7866db6 2942/* For information about double serialization check rdbSaveDoubleValue() */
2943static int rdbLoadDoubleValue(FILE *fp, double *val) {
2944 char buf[128];
2945 unsigned char len;
2946
2947 if (fread(&len,1,1,fp) == 0) return -1;
2948 switch(len) {
2949 case 255: *val = R_NegInf; return 0;
2950 case 254: *val = R_PosInf; return 0;
2951 case 253: *val = R_Nan; return 0;
2952 default:
2953 if (fread(buf,len,1,fp) == 0) return -1;
231d758e 2954 buf[len] = '\0';
a7866db6 2955 sscanf(buf, "%lg", val);
2956 return 0;
2957 }
2958}
2959
f78fd11b 2960static int rdbLoad(char *filename) {
ed9b544e 2961 FILE *fp;
f78fd11b 2962 robj *keyobj = NULL;
2963 uint32_t dbid;
bb32ede5 2964 int type, retval, rdbver;
3305306f 2965 dict *d = server.db[0].dict;
bb32ede5 2966 redisDb *db = server.db+0;
f78fd11b 2967 char buf[1024];
bb32ede5 2968 time_t expiretime = -1, now = time(NULL);
2969
ed9b544e 2970 fp = fopen(filename,"r");
2971 if (!fp) return REDIS_ERR;
2972 if (fread(buf,9,1,fp) == 0) goto eoferr;
f78fd11b 2973 buf[9] = '\0';
2974 if (memcmp(buf,"REDIS",5) != 0) {
ed9b544e 2975 fclose(fp);
2976 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2977 return REDIS_ERR;
2978 }
f78fd11b 2979 rdbver = atoi(buf+5);
2980 if (rdbver > 1) {
2981 fclose(fp);
2982 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2983 return REDIS_ERR;
2984 }
ed9b544e 2985 while(1) {
2986 robj *o;
2987
2988 /* Read type. */
f78fd11b 2989 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
bb32ede5 2990 if (type == REDIS_EXPIRETIME) {
2991 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2992 /* We read the time so we need to read the object type again */
2993 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2994 }
ed9b544e 2995 if (type == REDIS_EOF) break;
2996 /* Handle SELECT DB opcode as a special case */
2997 if (type == REDIS_SELECTDB) {
e3566d4b 2998 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2999 goto eoferr;
ed9b544e 3000 if (dbid >= (unsigned)server.dbnum) {
f78fd11b 3001 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
ed9b544e 3002 exit(1);
3003 }
bb32ede5 3004 db = server.db+dbid;
3005 d = db->dict;
ed9b544e 3006 continue;
3007 }
3008 /* Read key */
f78fd11b 3009 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
ed9b544e 3010
3011 if (type == REDIS_STRING) {
3012 /* Read string value */
f78fd11b 3013 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
942a3961 3014 tryObjectEncoding(o);
ed9b544e 3015 } else if (type == REDIS_LIST || type == REDIS_SET) {
3016 /* Read list/set value */
3017 uint32_t listlen;
f78fd11b 3018
e3566d4b 3019 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
f78fd11b 3020 goto eoferr;
ed9b544e 3021 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
3022 /* Load every single element of the list/set */
3023 while(listlen--) {
3024 robj *ele;
3025
f78fd11b 3026 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
942a3961 3027 tryObjectEncoding(ele);
ed9b544e 3028 if (type == REDIS_LIST) {
6b47e12e 3029 listAddNodeTail((list*)o->ptr,ele);
ed9b544e 3030 } else {
6b47e12e 3031 dictAdd((dict*)o->ptr,ele,NULL);
ed9b544e 3032 }
ed9b544e 3033 }
2b59cfdf 3034 } else if (type == REDIS_ZSET) {
3035 /* Read list/set value */
3036 uint32_t zsetlen;
3037 zset *zs;
3038
3039 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
3040 goto eoferr;
3041 o = createZsetObject();
3042 zs = o->ptr;
3043 /* Load every single element of the list/set */
3044 while(zsetlen--) {
3045 robj *ele;
3046 double *score = zmalloc(sizeof(double));
3047
3048 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3049 tryObjectEncoding(ele);
3050 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
3051 dictAdd(zs->dict,ele,score);
3052 zslInsert(zs->zsl,*score,ele);
3053 incrRefCount(ele); /* added to skiplist */
3054 }
ed9b544e 3055 } else {
dfc5e96c 3056 redisAssert(0 != 0);
ed9b544e 3057 }
3058 /* Add the new object in the hash table */
f78fd11b 3059 retval = dictAdd(d,keyobj,o);
ed9b544e 3060 if (retval == DICT_ERR) {
f78fd11b 3061 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
ed9b544e 3062 exit(1);
3063 }
bb32ede5 3064 /* Set the expire time if needed */
3065 if (expiretime != -1) {
3066 setExpire(db,keyobj,expiretime);
3067 /* Delete this key if already expired */
3068 if (expiretime < now) deleteKey(db,keyobj);
3069 expiretime = -1;
3070 }
f78fd11b 3071 keyobj = o = NULL;
ed9b544e 3072 }
3073 fclose(fp);
3074 return REDIS_OK;
3075
3076eoferr: /* unexpected end of file is handled here with a fatal exit */
e3566d4b 3077 if (keyobj) decrRefCount(keyobj);
f80dff62 3078 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
ed9b544e 3079 exit(1);
3080 return REDIS_ERR; /* Just to avoid warning */
3081}
3082
3083/*================================== Commands =============================== */
3084
abcb223e 3085static void authCommand(redisClient *c) {
2e77c2ee 3086 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
abcb223e
BH
3087 c->authenticated = 1;
3088 addReply(c,shared.ok);
3089 } else {
3090 c->authenticated = 0;
fa4c0aba 3091 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
abcb223e
BH
3092 }
3093}
3094
ed9b544e 3095static void pingCommand(redisClient *c) {
3096 addReply(c,shared.pong);
3097}
3098
3099static void echoCommand(redisClient *c) {
942a3961 3100 addReplyBulkLen(c,c->argv[1]);
ed9b544e 3101 addReply(c,c->argv[1]);
3102 addReply(c,shared.crlf);
3103}
3104
3105/*=================================== Strings =============================== */
3106
3107static void setGenericCommand(redisClient *c, int nx) {
3108 int retval;
3109
333fd216 3110 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3305306f 3111 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 3112 if (retval == DICT_ERR) {
3113 if (!nx) {
3305306f 3114 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 3115 incrRefCount(c->argv[2]);
3116 } else {
c937aa89 3117 addReply(c,shared.czero);
ed9b544e 3118 return;
3119 }
3120 } else {
3121 incrRefCount(c->argv[1]);
3122 incrRefCount(c->argv[2]);
3123 }
3124 server.dirty++;
3305306f 3125 removeExpire(c->db,c->argv[1]);
c937aa89 3126 addReply(c, nx ? shared.cone : shared.ok);
ed9b544e 3127}
3128
3129static void setCommand(redisClient *c) {
a4d1ba9a 3130 setGenericCommand(c,0);
ed9b544e 3131}
3132
3133static void setnxCommand(redisClient *c) {
a4d1ba9a 3134 setGenericCommand(c,1);
ed9b544e 3135}
3136
322fc7d8 3137static int getGenericCommand(redisClient *c) {
3305306f 3138 robj *o = lookupKeyRead(c->db,c->argv[1]);
3139
3140 if (o == NULL) {
c937aa89 3141 addReply(c,shared.nullbulk);
322fc7d8 3142 return REDIS_OK;
ed9b544e 3143 } else {
ed9b544e 3144 if (o->type != REDIS_STRING) {
c937aa89 3145 addReply(c,shared.wrongtypeerr);
322fc7d8 3146 return REDIS_ERR;
ed9b544e 3147 } else {
942a3961 3148 addReplyBulkLen(c,o);
ed9b544e 3149 addReply(c,o);
3150 addReply(c,shared.crlf);
322fc7d8 3151 return REDIS_OK;
ed9b544e 3152 }
3153 }
3154}
3155
322fc7d8 3156static void getCommand(redisClient *c) {
3157 getGenericCommand(c);
3158}
3159
f6b141c5 3160static void getsetCommand(redisClient *c) {
322fc7d8 3161 if (getGenericCommand(c) == REDIS_ERR) return;
a431eb74 3162 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3163 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3164 } else {
3165 incrRefCount(c->argv[1]);
3166 }
3167 incrRefCount(c->argv[2]);
3168 server.dirty++;
3169 removeExpire(c->db,c->argv[1]);
3170}
3171
70003d28 3172static void mgetCommand(redisClient *c) {
70003d28 3173 int j;
3174
c937aa89 3175 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
70003d28 3176 for (j = 1; j < c->argc; j++) {
3305306f 3177 robj *o = lookupKeyRead(c->db,c->argv[j]);
3178 if (o == NULL) {
c937aa89 3179 addReply(c,shared.nullbulk);
70003d28 3180 } else {
70003d28 3181 if (o->type != REDIS_STRING) {
c937aa89 3182 addReply(c,shared.nullbulk);
70003d28 3183 } else {
942a3961 3184 addReplyBulkLen(c,o);
70003d28 3185 addReply(c,o);
3186 addReply(c,shared.crlf);
3187 }
3188 }
3189 }
3190}
3191
6c446631 3192static void msetGenericCommand(redisClient *c, int nx) {
906573e7 3193 int j, busykeys = 0;
6c446631 3194
3195 if ((c->argc % 2) == 0) {
454d4e43 3196 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
6c446631 3197 return;
3198 }
3199 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3200 * set nothing at all if at least one already key exists. */
3201 if (nx) {
3202 for (j = 1; j < c->argc; j += 2) {
906573e7 3203 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3204 busykeys++;
6c446631 3205 }
3206 }
3207 }
906573e7 3208 if (busykeys) {
3209 addReply(c, shared.czero);
3210 return;
3211 }
6c446631 3212
3213 for (j = 1; j < c->argc; j += 2) {
3214 int retval;
3215
17511391 3216 tryObjectEncoding(c->argv[j+1]);
6c446631 3217 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3218 if (retval == DICT_ERR) {
3219 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3220 incrRefCount(c->argv[j+1]);
3221 } else {
3222 incrRefCount(c->argv[j]);
3223 incrRefCount(c->argv[j+1]);
3224 }
3225 removeExpire(c->db,c->argv[j]);
3226 }
3227 server.dirty += (c->argc-1)/2;
3228 addReply(c, nx ? shared.cone : shared.ok);
3229}
3230
3231static void msetCommand(redisClient *c) {
3232 msetGenericCommand(c,0);
3233}
3234
3235static void msetnxCommand(redisClient *c) {
3236 msetGenericCommand(c,1);
3237}
3238
d68ed120 3239static void incrDecrCommand(redisClient *c, long long incr) {
ed9b544e 3240 long long value;
3241 int retval;
3242 robj *o;
3243
3305306f 3244 o = lookupKeyWrite(c->db,c->argv[1]);
3245 if (o == NULL) {
ed9b544e 3246 value = 0;
3247 } else {
ed9b544e 3248 if (o->type != REDIS_STRING) {
3249 value = 0;
3250 } else {
3251 char *eptr;
3252
942a3961 3253 if (o->encoding == REDIS_ENCODING_RAW)
3254 value = strtoll(o->ptr, &eptr, 10);
3255 else if (o->encoding == REDIS_ENCODING_INT)
3256 value = (long)o->ptr;
3257 else
dfc5e96c 3258 redisAssert(1 != 1);
ed9b544e 3259 }
3260 }
3261
3262 value += incr;
3263 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
942a3961 3264 tryObjectEncoding(o);
3305306f 3265 retval = dictAdd(c->db->dict,c->argv[1],o);
ed9b544e 3266 if (retval == DICT_ERR) {
3305306f 3267 dictReplace(c->db->dict,c->argv[1],o);
3268 removeExpire(c->db,c->argv[1]);
ed9b544e 3269 } else {
3270 incrRefCount(c->argv[1]);
3271 }
3272 server.dirty++;
c937aa89 3273 addReply(c,shared.colon);
ed9b544e 3274 addReply(c,o);
3275 addReply(c,shared.crlf);
3276}
3277
3278static void incrCommand(redisClient *c) {
a4d1ba9a 3279 incrDecrCommand(c,1);
ed9b544e 3280}
3281
3282static void decrCommand(redisClient *c) {
a4d1ba9a 3283 incrDecrCommand(c,-1);
ed9b544e 3284}
3285
3286static void incrbyCommand(redisClient *c) {
d68ed120 3287 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 3288 incrDecrCommand(c,incr);
ed9b544e 3289}
3290
3291static void decrbyCommand(redisClient *c) {
d68ed120 3292 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 3293 incrDecrCommand(c,-incr);
ed9b544e 3294}
3295
3296/* ========================= Type agnostic commands ========================= */
3297
3298static void delCommand(redisClient *c) {
5109cdff 3299 int deleted = 0, j;
3300
3301 for (j = 1; j < c->argc; j++) {
3302 if (deleteKey(c->db,c->argv[j])) {
3303 server.dirty++;
3304 deleted++;
3305 }
3306 }
3307 switch(deleted) {
3308 case 0:
c937aa89 3309 addReply(c,shared.czero);
5109cdff 3310 break;
3311 case 1:
3312 addReply(c,shared.cone);
3313 break;
3314 default:
3315 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3316 break;
ed9b544e 3317 }
3318}
3319
3320static void existsCommand(redisClient *c) {
3305306f 3321 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
ed9b544e 3322}
3323
3324static void selectCommand(redisClient *c) {
3325 int id = atoi(c->argv[1]->ptr);
3326
3327 if (selectDb(c,id) == REDIS_ERR) {
774e3047 3328 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
ed9b544e 3329 } else {
3330 addReply(c,shared.ok);
3331 }
3332}
3333
3334static void randomkeyCommand(redisClient *c) {
3335 dictEntry *de;
3305306f 3336
3337 while(1) {
3338 de = dictGetRandomKey(c->db->dict);
ce7bef07 3339 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3305306f 3340 }
ed9b544e 3341 if (de == NULL) {
ce7bef07 3342 addReply(c,shared.plus);
ed9b544e 3343 addReply(c,shared.crlf);
3344 } else {
c937aa89 3345 addReply(c,shared.plus);
ed9b544e 3346 addReply(c,dictGetEntryKey(de));
3347 addReply(c,shared.crlf);
3348 }
3349}
3350
3351static void keysCommand(redisClient *c) {
3352 dictIterator *di;
3353 dictEntry *de;
3354 sds pattern = c->argv[1]->ptr;
3355 int plen = sdslen(pattern);
682ac724 3356 unsigned long numkeys = 0, keyslen = 0;
ed9b544e 3357 robj *lenobj = createObject(REDIS_STRING,NULL);
3358
3305306f 3359 di = dictGetIterator(c->db->dict);
ed9b544e 3360 addReply(c,lenobj);
3361 decrRefCount(lenobj);
3362 while((de = dictNext(di)) != NULL) {
3363 robj *keyobj = dictGetEntryKey(de);
3305306f 3364
ed9b544e 3365 sds key = keyobj->ptr;
3366 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3367 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3305306f 3368 if (expireIfNeeded(c->db,keyobj) == 0) {
3369 if (numkeys != 0)
3370 addReply(c,shared.space);
3371 addReply(c,keyobj);
3372 numkeys++;
3373 keyslen += sdslen(key);
3374 }
ed9b544e 3375 }
3376 }
3377 dictReleaseIterator(di);
c937aa89 3378 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
ed9b544e 3379 addReply(c,shared.crlf);
3380}
3381
3382static void dbsizeCommand(redisClient *c) {
3383 addReplySds(c,
3305306f 3384 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
ed9b544e 3385}
3386
3387static void lastsaveCommand(redisClient *c) {
3388 addReplySds(c,
c937aa89 3389 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
ed9b544e 3390}
3391
3392static void typeCommand(redisClient *c) {
3305306f 3393 robj *o;
ed9b544e 3394 char *type;
3305306f 3395
3396 o = lookupKeyRead(c->db,c->argv[1]);
3397 if (o == NULL) {
c937aa89 3398 type = "+none";
ed9b544e 3399 } else {
ed9b544e 3400 switch(o->type) {
c937aa89 3401 case REDIS_STRING: type = "+string"; break;
3402 case REDIS_LIST: type = "+list"; break;
3403 case REDIS_SET: type = "+set"; break;
412a8bce 3404 case REDIS_ZSET: type = "+zset"; break;
ed9b544e 3405 default: type = "unknown"; break;
3406 }
3407 }
3408 addReplySds(c,sdsnew(type));
3409 addReply(c,shared.crlf);
3410}
3411
3412static void saveCommand(redisClient *c) {
9d65a1bb 3413 if (server.bgsavechildpid != -1) {
05557f6d 3414 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3415 return;
3416 }
f78fd11b 3417 if (rdbSave(server.dbfilename) == REDIS_OK) {
ed9b544e 3418 addReply(c,shared.ok);
3419 } else {
3420 addReply(c,shared.err);
3421 }
3422}
3423
3424static void bgsaveCommand(redisClient *c) {
9d65a1bb 3425 if (server.bgsavechildpid != -1) {
ed9b544e 3426 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3427 return;
3428 }
f78fd11b 3429 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
49b99ab4 3430 char *status = "+Background saving started\r\n";
3431 addReplySds(c,sdsnew(status));
ed9b544e 3432 } else {
3433 addReply(c,shared.err);
3434 }
3435}
3436
3437static void shutdownCommand(redisClient *c) {
3438 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
a3b21203 3439 /* Kill the saving child if there is a background saving in progress.
3440 We want to avoid race conditions, for instance our saving child may
3441 overwrite the synchronous saving did by SHUTDOWN. */
9d65a1bb 3442 if (server.bgsavechildpid != -1) {
9f3c422c 3443 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3444 kill(server.bgsavechildpid,SIGKILL);
a3b21203 3445 rdbRemoveTempFile(server.bgsavechildpid);
9f3c422c 3446 }
ac945e2d 3447 if (server.appendonly) {
3448 /* Append only file: fsync() the AOF and exit */
3449 fsync(server.appendfd);
3450 exit(0);
ed9b544e 3451 } else {
ac945e2d 3452 /* Snapshotting. Perform a SYNC SAVE and exit */
3453 if (rdbSave(server.dbfilename) == REDIS_OK) {
3454 if (server.daemonize)
3455 unlink(server.pidfile);
3456 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3457 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3458 exit(0);
3459 } else {
3460 /* Ooops.. error saving! The best we can do is to continue operating.
3461 * Note that if there was a background saving process, in the next
3462 * cron() Redis will be notified that the background saving aborted,
3463 * handling special stuff like slaves pending for synchronization... */
3464 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3465 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3466 }
ed9b544e 3467 }
3468}
3469
3470static void renameGenericCommand(redisClient *c, int nx) {
ed9b544e 3471 robj *o;
3472
3473 /* To use the same key as src and dst is probably an error */
3474 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
c937aa89 3475 addReply(c,shared.sameobjecterr);
ed9b544e 3476 return;
3477 }
3478
3305306f 3479 o = lookupKeyWrite(c->db,c->argv[1]);
3480 if (o == NULL) {
c937aa89 3481 addReply(c,shared.nokeyerr);
ed9b544e 3482 return;
3483 }
ed9b544e 3484 incrRefCount(o);
3305306f 3485 deleteIfVolatile(c->db,c->argv[2]);
3486 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
ed9b544e 3487 if (nx) {
3488 decrRefCount(o);
c937aa89 3489 addReply(c,shared.czero);
ed9b544e 3490 return;
3491 }
3305306f 3492 dictReplace(c->db->dict,c->argv[2],o);
ed9b544e 3493 } else {
3494 incrRefCount(c->argv[2]);
3495 }
3305306f 3496 deleteKey(c->db,c->argv[1]);
ed9b544e 3497 server.dirty++;
c937aa89 3498 addReply(c,nx ? shared.cone : shared.ok);
ed9b544e 3499}
3500
3501static void renameCommand(redisClient *c) {
3502 renameGenericCommand(c,0);
3503}
3504
3505static void renamenxCommand(redisClient *c) {
3506 renameGenericCommand(c,1);
3507}
3508
3509static void moveCommand(redisClient *c) {
3305306f 3510 robj *o;
3511 redisDb *src, *dst;
ed9b544e 3512 int srcid;
3513
3514 /* Obtain source and target DB pointers */
3305306f 3515 src = c->db;
3516 srcid = c->db->id;
ed9b544e 3517 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
c937aa89 3518 addReply(c,shared.outofrangeerr);
ed9b544e 3519 return;
3520 }
3305306f 3521 dst = c->db;
3522 selectDb(c,srcid); /* Back to the source DB */
ed9b544e 3523
3524 /* If the user is moving using as target the same
3525 * DB as the source DB it is probably an error. */
3526 if (src == dst) {
c937aa89 3527 addReply(c,shared.sameobjecterr);
ed9b544e 3528 return;
3529 }
3530
3531 /* Check if the element exists and get a reference */
3305306f 3532 o = lookupKeyWrite(c->db,c->argv[1]);
3533 if (!o) {
c937aa89 3534 addReply(c,shared.czero);
ed9b544e 3535 return;
3536 }
3537
3538 /* Try to add the element to the target DB */
3305306f 3539 deleteIfVolatile(dst,c->argv[1]);
3540 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
c937aa89 3541 addReply(c,shared.czero);
ed9b544e 3542 return;
3543 }
3305306f 3544 incrRefCount(c->argv[1]);
ed9b544e 3545 incrRefCount(o);
3546
3547 /* OK! key moved, free the entry in the source DB */
3305306f 3548 deleteKey(src,c->argv[1]);
ed9b544e 3549 server.dirty++;
c937aa89 3550 addReply(c,shared.cone);
ed9b544e 3551}
3552
3553/* =================================== Lists ================================ */
3554static void pushGenericCommand(redisClient *c, int where) {
3555 robj *lobj;
ed9b544e 3556 list *list;
3305306f 3557
3558 lobj = lookupKeyWrite(c->db,c->argv[1]);
3559 if (lobj == NULL) {
95242ab5 3560 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3561 addReply(c,shared.ok);
3562 return;
3563 }
ed9b544e 3564 lobj = createListObject();
3565 list = lobj->ptr;
3566 if (where == REDIS_HEAD) {
6b47e12e 3567 listAddNodeHead(list,c->argv[2]);
ed9b544e 3568 } else {
6b47e12e 3569 listAddNodeTail(list,c->argv[2]);
ed9b544e 3570 }
3305306f 3571 dictAdd(c->db->dict,c->argv[1],lobj);
ed9b544e 3572 incrRefCount(c->argv[1]);
3573 incrRefCount(c->argv[2]);
3574 } else {
ed9b544e 3575 if (lobj->type != REDIS_LIST) {
3576 addReply(c,shared.wrongtypeerr);
3577 return;
3578 }
95242ab5 3579 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3580 addReply(c,shared.ok);
3581 return;
3582 }
ed9b544e 3583 list = lobj->ptr;
3584 if (where == REDIS_HEAD) {
6b47e12e 3585 listAddNodeHead(list,c->argv[2]);
ed9b544e 3586 } else {
6b47e12e 3587 listAddNodeTail(list,c->argv[2]);
ed9b544e 3588 }
3589 incrRefCount(c->argv[2]);
3590 }
3591 server.dirty++;
3592 addReply(c,shared.ok);
3593}
3594
3595static void lpushCommand(redisClient *c) {
3596 pushGenericCommand(c,REDIS_HEAD);
3597}
3598
3599static void rpushCommand(redisClient *c) {
3600 pushGenericCommand(c,REDIS_TAIL);
3601}
3602
3603static void llenCommand(redisClient *c) {
3305306f 3604 robj *o;
ed9b544e 3605 list *l;
3606
3305306f 3607 o = lookupKeyRead(c->db,c->argv[1]);
3608 if (o == NULL) {
c937aa89 3609 addReply(c,shared.czero);
ed9b544e 3610 return;
3611 } else {
ed9b544e 3612 if (o->type != REDIS_LIST) {
c937aa89 3613 addReply(c,shared.wrongtypeerr);
ed9b544e 3614 } else {
3615 l = o->ptr;
c937aa89 3616 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
ed9b544e 3617 }
3618 }
3619}
3620
3621static void lindexCommand(redisClient *c) {
3305306f 3622 robj *o;
ed9b544e 3623 int index = atoi(c->argv[2]->ptr);
3624
3305306f 3625 o = lookupKeyRead(c->db,c->argv[1]);
3626 if (o == NULL) {
c937aa89 3627 addReply(c,shared.nullbulk);
ed9b544e 3628 } else {
ed9b544e 3629 if (o->type != REDIS_LIST) {
c937aa89 3630 addReply(c,shared.wrongtypeerr);
ed9b544e 3631 } else {
3632 list *list = o->ptr;
3633 listNode *ln;
3634
3635 ln = listIndex(list, index);
3636 if (ln == NULL) {
c937aa89 3637 addReply(c,shared.nullbulk);
ed9b544e 3638 } else {
3639 robj *ele = listNodeValue(ln);
942a3961 3640 addReplyBulkLen(c,ele);
ed9b544e 3641 addReply(c,ele);
3642 addReply(c,shared.crlf);
3643 }
3644 }
3645 }
3646}
3647
3648static void lsetCommand(redisClient *c) {
3305306f 3649 robj *o;
ed9b544e 3650 int index = atoi(c->argv[2]->ptr);
3651
3305306f 3652 o = lookupKeyWrite(c->db,c->argv[1]);
3653 if (o == NULL) {
ed9b544e 3654 addReply(c,shared.nokeyerr);
3655 } else {
ed9b544e 3656 if (o->type != REDIS_LIST) {
3657 addReply(c,shared.wrongtypeerr);
3658 } else {
3659 list *list = o->ptr;
3660 listNode *ln;
3661
3662 ln = listIndex(list, index);
3663 if (ln == NULL) {
c937aa89 3664 addReply(c,shared.outofrangeerr);
ed9b544e 3665 } else {
3666 robj *ele = listNodeValue(ln);
3667
3668 decrRefCount(ele);
3669 listNodeValue(ln) = c->argv[3];
3670 incrRefCount(c->argv[3]);
3671 addReply(c,shared.ok);
3672 server.dirty++;
3673 }
3674 }
3675 }
3676}
3677
3678static void popGenericCommand(redisClient *c, int where) {
3305306f 3679 robj *o;
3680
3681 o = lookupKeyWrite(c->db,c->argv[1]);
3682 if (o == NULL) {
c937aa89 3683 addReply(c,shared.nullbulk);
ed9b544e 3684 } else {
ed9b544e 3685 if (o->type != REDIS_LIST) {
c937aa89 3686 addReply(c,shared.wrongtypeerr);
ed9b544e 3687 } else {
3688 list *list = o->ptr;
3689 listNode *ln;
3690
3691 if (where == REDIS_HEAD)
3692 ln = listFirst(list);
3693 else
3694 ln = listLast(list);
3695
3696 if (ln == NULL) {
c937aa89 3697 addReply(c,shared.nullbulk);
ed9b544e 3698 } else {
3699 robj *ele = listNodeValue(ln);
942a3961 3700 addReplyBulkLen(c,ele);
ed9b544e 3701 addReply(c,ele);
3702 addReply(c,shared.crlf);
3703 listDelNode(list,ln);
3704 server.dirty++;
3705 }
3706 }
3707 }
3708}
3709
3710static void lpopCommand(redisClient *c) {
3711 popGenericCommand(c,REDIS_HEAD);
3712}
3713
3714static void rpopCommand(redisClient *c) {
3715 popGenericCommand(c,REDIS_TAIL);
3716}
3717
3718static void lrangeCommand(redisClient *c) {
3305306f 3719 robj *o;
ed9b544e 3720 int start = atoi(c->argv[2]->ptr);
3721 int end = atoi(c->argv[3]->ptr);
3305306f 3722
3723 o = lookupKeyRead(c->db,c->argv[1]);
3724 if (o == NULL) {
c937aa89 3725 addReply(c,shared.nullmultibulk);
ed9b544e 3726 } else {
ed9b544e 3727 if (o->type != REDIS_LIST) {
c937aa89 3728 addReply(c,shared.wrongtypeerr);
ed9b544e 3729 } else {
3730 list *list = o->ptr;
3731 listNode *ln;
3732 int llen = listLength(list);
3733 int rangelen, j;
3734 robj *ele;
3735
3736 /* convert negative indexes */
3737 if (start < 0) start = llen+start;
3738 if (end < 0) end = llen+end;
3739 if (start < 0) start = 0;
3740 if (end < 0) end = 0;
3741
3742 /* indexes sanity checks */
3743 if (start > end || start >= llen) {
3744 /* Out of range start or start > end result in empty list */
c937aa89 3745 addReply(c,shared.emptymultibulk);
ed9b544e 3746 return;
3747 }
3748 if (end >= llen) end = llen-1;
3749 rangelen = (end-start)+1;
3750
3751 /* Return the result in form of a multi-bulk reply */
3752 ln = listIndex(list, start);
c937aa89 3753 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
ed9b544e 3754 for (j = 0; j < rangelen; j++) {
3755 ele = listNodeValue(ln);
942a3961 3756 addReplyBulkLen(c,ele);
ed9b544e 3757 addReply(c,ele);
3758 addReply(c,shared.crlf);
3759 ln = ln->next;
3760 }
3761 }
3762 }
3763}
3764
3765static void ltrimCommand(redisClient *c) {
3305306f 3766 robj *o;
ed9b544e 3767 int start = atoi(c->argv[2]->ptr);
3768 int end = atoi(c->argv[3]->ptr);
3769
3305306f 3770 o = lookupKeyWrite(c->db,c->argv[1]);
3771 if (o == NULL) {
ab9d4cb1 3772 addReply(c,shared.ok);
ed9b544e 3773 } else {
ed9b544e 3774 if (o->type != REDIS_LIST) {
3775 addReply(c,shared.wrongtypeerr);
3776 } else {
3777 list *list = o->ptr;
3778 listNode *ln;
3779 int llen = listLength(list);
3780 int j, ltrim, rtrim;
3781
3782 /* convert negative indexes */
3783 if (start < 0) start = llen+start;
3784 if (end < 0) end = llen+end;
3785 if (start < 0) start = 0;
3786 if (end < 0) end = 0;
3787
3788 /* indexes sanity checks */
3789 if (start > end || start >= llen) {
3790 /* Out of range start or start > end result in empty list */
3791 ltrim = llen;
3792 rtrim = 0;
3793 } else {
3794 if (end >= llen) end = llen-1;
3795 ltrim = start;
3796 rtrim = llen-end-1;
3797 }
3798
3799 /* Remove list elements to perform the trim */
3800 for (j = 0; j < ltrim; j++) {
3801 ln = listFirst(list);
3802 listDelNode(list,ln);
3803 }
3804 for (j = 0; j < rtrim; j++) {
3805 ln = listLast(list);
3806 listDelNode(list,ln);
3807 }
ed9b544e 3808 server.dirty++;
e59229a2 3809 addReply(c,shared.ok);
ed9b544e 3810 }
3811 }
3812}
3813
3814static void lremCommand(redisClient *c) {
3305306f 3815 robj *o;
ed9b544e 3816
3305306f 3817 o = lookupKeyWrite(c->db,c->argv[1]);
3818 if (o == NULL) {
33c08b39 3819 addReply(c,shared.czero);
ed9b544e 3820 } else {
ed9b544e 3821 if (o->type != REDIS_LIST) {
c937aa89 3822 addReply(c,shared.wrongtypeerr);
ed9b544e 3823 } else {
3824 list *list = o->ptr;
3825 listNode *ln, *next;
3826 int toremove = atoi(c->argv[2]->ptr);
3827 int removed = 0;
3828 int fromtail = 0;
3829
3830 if (toremove < 0) {
3831 toremove = -toremove;
3832 fromtail = 1;
3833 }
3834 ln = fromtail ? list->tail : list->head;
3835 while (ln) {
ed9b544e 3836 robj *ele = listNodeValue(ln);
a4d1ba9a 3837
3838 next = fromtail ? ln->prev : ln->next;
724a51b1 3839 if (compareStringObjects(ele,c->argv[3]) == 0) {
ed9b544e 3840 listDelNode(list,ln);
3841 server.dirty++;
3842 removed++;
3843 if (toremove && removed == toremove) break;
3844 }
3845 ln = next;
3846 }
c937aa89 3847 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
ed9b544e 3848 }
3849 }
3850}
3851
12f9d551 3852/* This is the semantic of this command:
0f5f7e9a 3853 * RPOPLPUSH srclist dstlist:
12f9d551 3854 * IF LLEN(srclist) > 0
3855 * element = RPOP srclist
3856 * LPUSH dstlist element
3857 * RETURN element
3858 * ELSE
3859 * RETURN nil
3860 * END
3861 * END
3862 *
3863 * The idea is to be able to get an element from a list in a reliable way
3864 * since the element is not just returned but pushed against another list
3865 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3866 */
0f5f7e9a 3867static void rpoplpushcommand(redisClient *c) {
12f9d551 3868 robj *sobj;
3869
3870 sobj = lookupKeyWrite(c->db,c->argv[1]);
3871 if (sobj == NULL) {
3872 addReply(c,shared.nullbulk);
3873 } else {
3874 if (sobj->type != REDIS_LIST) {
3875 addReply(c,shared.wrongtypeerr);
3876 } else {
3877 list *srclist = sobj->ptr;
3878 listNode *ln = listLast(srclist);
3879
3880 if (ln == NULL) {
3881 addReply(c,shared.nullbulk);
3882 } else {
3883 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3884 robj *ele = listNodeValue(ln);
3885 list *dstlist;
3886
e20fb74f 3887 if (dobj && dobj->type != REDIS_LIST) {
12f9d551 3888 addReply(c,shared.wrongtypeerr);
3889 return;
3890 }
e20fb74f 3891
3892 /* Add the element to the target list (unless it's directly
3893 * passed to some BLPOP-ing client */
3894 if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
3895 if (dobj == NULL) {
3896 /* Create the list if the key does not exist */
3897 dobj = createListObject();
3898 dictAdd(c->db->dict,c->argv[2],dobj);
3899 incrRefCount(c->argv[2]);
3900 }
3901 dstlist = dobj->ptr;
3902 listAddNodeHead(dstlist,ele);
3903 incrRefCount(ele);
3904 }
12f9d551 3905
3906 /* Send the element to the client as reply as well */
3907 addReplyBulkLen(c,ele);
3908 addReply(c,ele);
3909 addReply(c,shared.crlf);
3910
3911 /* Finally remove the element from the source list */
3912 listDelNode(srclist,ln);
3913 server.dirty++;
3914 }
3915 }
3916 }
3917}
3918
3919
ed9b544e 3920/* ==================================== Sets ================================ */
3921
3922static void saddCommand(redisClient *c) {
ed9b544e 3923 robj *set;
3924
3305306f 3925 set = lookupKeyWrite(c->db,c->argv[1]);
3926 if (set == NULL) {
ed9b544e 3927 set = createSetObject();
3305306f 3928 dictAdd(c->db->dict,c->argv[1],set);
ed9b544e 3929 incrRefCount(c->argv[1]);
3930 } else {
ed9b544e 3931 if (set->type != REDIS_SET) {
c937aa89 3932 addReply(c,shared.wrongtypeerr);
ed9b544e 3933 return;
3934 }
3935 }
3936 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3937 incrRefCount(c->argv[2]);
3938 server.dirty++;
c937aa89 3939 addReply(c,shared.cone);
ed9b544e 3940 } else {
c937aa89 3941 addReply(c,shared.czero);
ed9b544e 3942 }
3943}
3944
3945static void sremCommand(redisClient *c) {
3305306f 3946 robj *set;
ed9b544e 3947
3305306f 3948 set = lookupKeyWrite(c->db,c->argv[1]);
3949 if (set == NULL) {
c937aa89 3950 addReply(c,shared.czero);
ed9b544e 3951 } else {
ed9b544e 3952 if (set->type != REDIS_SET) {
c937aa89 3953 addReply(c,shared.wrongtypeerr);
ed9b544e 3954 return;
3955 }
3956 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3957 server.dirty++;
12fea928 3958 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
c937aa89 3959 addReply(c,shared.cone);
ed9b544e 3960 } else {
c937aa89 3961 addReply(c,shared.czero);
ed9b544e 3962 }
3963 }
3964}
3965
a4460ef4 3966static void smoveCommand(redisClient *c) {
3967 robj *srcset, *dstset;
3968
3969 srcset = lookupKeyWrite(c->db,c->argv[1]);
3970 dstset = lookupKeyWrite(c->db,c->argv[2]);
3971
3972 /* If the source key does not exist return 0, if it's of the wrong type
3973 * raise an error */
3974 if (srcset == NULL || srcset->type != REDIS_SET) {
3975 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3976 return;
3977 }
3978 /* Error if the destination key is not a set as well */
3979 if (dstset && dstset->type != REDIS_SET) {
3980 addReply(c,shared.wrongtypeerr);
3981 return;
3982 }
3983 /* Remove the element from the source set */
3984 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3985 /* Key not found in the src set! return zero */
3986 addReply(c,shared.czero);
3987 return;
3988 }
3989 server.dirty++;
3990 /* Add the element to the destination set */
3991 if (!dstset) {
3992 dstset = createSetObject();
3993 dictAdd(c->db->dict,c->argv[2],dstset);
3994 incrRefCount(c->argv[2]);
3995 }
3996 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3997 incrRefCount(c->argv[3]);
3998 addReply(c,shared.cone);
3999}
4000
ed9b544e 4001static void sismemberCommand(redisClient *c) {
3305306f 4002 robj *set;
ed9b544e 4003
3305306f 4004 set = lookupKeyRead(c->db,c->argv[1]);
4005 if (set == NULL) {
c937aa89 4006 addReply(c,shared.czero);
ed9b544e 4007 } else {
ed9b544e 4008 if (set->type != REDIS_SET) {
c937aa89 4009 addReply(c,shared.wrongtypeerr);
ed9b544e 4010 return;
4011 }
4012 if (dictFind(set->ptr,c->argv[2]))
c937aa89 4013 addReply(c,shared.cone);
ed9b544e 4014 else
c937aa89 4015 addReply(c,shared.czero);
ed9b544e 4016 }
4017}
4018
4019static void scardCommand(redisClient *c) {
3305306f 4020 robj *o;
ed9b544e 4021 dict *s;
4022
3305306f 4023 o = lookupKeyRead(c->db,c->argv[1]);
4024 if (o == NULL) {
c937aa89 4025 addReply(c,shared.czero);
ed9b544e 4026 return;
4027 } else {
ed9b544e 4028 if (o->type != REDIS_SET) {
c937aa89 4029 addReply(c,shared.wrongtypeerr);
ed9b544e 4030 } else {
4031 s = o->ptr;
682ac724 4032 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3305306f 4033 dictSize(s)));
ed9b544e 4034 }
4035 }
4036}
4037
12fea928 4038static void spopCommand(redisClient *c) {
4039 robj *set;
4040 dictEntry *de;
4041
4042 set = lookupKeyWrite(c->db,c->argv[1]);
4043 if (set == NULL) {
4044 addReply(c,shared.nullbulk);
4045 } else {
4046 if (set->type != REDIS_SET) {
4047 addReply(c,shared.wrongtypeerr);
4048 return;
4049 }
4050 de = dictGetRandomKey(set->ptr);
4051 if (de == NULL) {
4052 addReply(c,shared.nullbulk);
4053 } else {
4054 robj *ele = dictGetEntryKey(de);
4055
942a3961 4056 addReplyBulkLen(c,ele);
12fea928 4057 addReply(c,ele);
4058 addReply(c,shared.crlf);
4059 dictDelete(set->ptr,ele);
4060 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4061 server.dirty++;
4062 }
4063 }
4064}
4065
2abb95a9 4066static void srandmemberCommand(redisClient *c) {
4067 robj *set;
4068 dictEntry *de;
4069
4070 set = lookupKeyRead(c->db,c->argv[1]);
4071 if (set == NULL) {
4072 addReply(c,shared.nullbulk);
4073 } else {
4074 if (set->type != REDIS_SET) {
4075 addReply(c,shared.wrongtypeerr);
4076 return;
4077 }
4078 de = dictGetRandomKey(set->ptr);
4079 if (de == NULL) {
4080 addReply(c,shared.nullbulk);
4081 } else {
4082 robj *ele = dictGetEntryKey(de);
4083
4084 addReplyBulkLen(c,ele);
4085 addReply(c,ele);
4086 addReply(c,shared.crlf);
4087 }
4088 }
4089}
4090
ed9b544e 4091static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4092 dict **d1 = (void*) s1, **d2 = (void*) s2;
4093
3305306f 4094 return dictSize(*d1)-dictSize(*d2);
ed9b544e 4095}
4096
682ac724 4097static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
ed9b544e 4098 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4099 dictIterator *di;
4100 dictEntry *de;
4101 robj *lenobj = NULL, *dstset = NULL;
682ac724 4102 unsigned long j, cardinality = 0;
ed9b544e 4103
ed9b544e 4104 for (j = 0; j < setsnum; j++) {
4105 robj *setobj;
3305306f 4106
4107 setobj = dstkey ?
4108 lookupKeyWrite(c->db,setskeys[j]) :
4109 lookupKeyRead(c->db,setskeys[j]);
4110 if (!setobj) {
ed9b544e 4111 zfree(dv);
5faa6025 4112 if (dstkey) {
fdcaae84 4113 if (deleteKey(c->db,dstkey))
4114 server.dirty++;
0d36ded0 4115 addReply(c,shared.czero);
5faa6025 4116 } else {
4117 addReply(c,shared.nullmultibulk);
4118 }
ed9b544e 4119 return;
4120 }
ed9b544e 4121 if (setobj->type != REDIS_SET) {
4122 zfree(dv);
c937aa89 4123 addReply(c,shared.wrongtypeerr);
ed9b544e 4124 return;
4125 }
4126 dv[j] = setobj->ptr;
4127 }
4128 /* Sort sets from the smallest to largest, this will improve our
4129 * algorithm's performace */
4130 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4131
4132 /* The first thing we should output is the total number of elements...
4133 * since this is a multi-bulk write, but at this stage we don't know
4134 * the intersection set size, so we use a trick, append an empty object
4135 * to the output list and save the pointer to later modify it with the
4136 * right length */
4137 if (!dstkey) {
4138 lenobj = createObject(REDIS_STRING,NULL);
4139 addReply(c,lenobj);
4140 decrRefCount(lenobj);
4141 } else {
4142 /* If we have a target key where to store the resulting set
4143 * create this key with an empty set inside */
4144 dstset = createSetObject();
ed9b544e 4145 }
4146
4147 /* Iterate all the elements of the first (smallest) set, and test
4148 * the element against all the other sets, if at least one set does
4149 * not include the element it is discarded */
4150 di = dictGetIterator(dv[0]);
ed9b544e 4151
4152 while((de = dictNext(di)) != NULL) {
4153 robj *ele;
4154
4155 for (j = 1; j < setsnum; j++)
4156 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4157 if (j != setsnum)
4158 continue; /* at least one set does not contain the member */
4159 ele = dictGetEntryKey(de);
4160 if (!dstkey) {
942a3961 4161 addReplyBulkLen(c,ele);
ed9b544e 4162 addReply(c,ele);
4163 addReply(c,shared.crlf);
4164 cardinality++;
4165 } else {
4166 dictAdd(dstset->ptr,ele,NULL);
4167 incrRefCount(ele);
4168 }
4169 }
4170 dictReleaseIterator(di);
4171
83cdfe18
AG
4172 if (dstkey) {
4173 /* Store the resulting set into the target */
4174 deleteKey(c->db,dstkey);
4175 dictAdd(c->db->dict,dstkey,dstset);
4176 incrRefCount(dstkey);
4177 }
4178
40d224a9 4179 if (!dstkey) {
682ac724 4180 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
40d224a9 4181 } else {
682ac724 4182 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
03fd01c7 4183 dictSize((dict*)dstset->ptr)));
40d224a9 4184 server.dirty++;
4185 }
ed9b544e 4186 zfree(dv);
4187}
4188
4189static void sinterCommand(redisClient *c) {
4190 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4191}
4192
4193static void sinterstoreCommand(redisClient *c) {
4194 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4195}
4196
/* Operation selector for sunionDiffGenericCommand(). */
enum {
    REDIS_OP_UNION = 0,
    REDIS_OP_DIFF = 1
};
4199
4200static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
40d224a9 4201 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4202 dictIterator *di;
4203 dictEntry *de;
f4f56e1d 4204 robj *dstset = NULL;
40d224a9 4205 int j, cardinality = 0;
4206
40d224a9 4207 for (j = 0; j < setsnum; j++) {
4208 robj *setobj;
4209
4210 setobj = dstkey ?
4211 lookupKeyWrite(c->db,setskeys[j]) :
4212 lookupKeyRead(c->db,setskeys[j]);
4213 if (!setobj) {
4214 dv[j] = NULL;
4215 continue;
4216 }
4217 if (setobj->type != REDIS_SET) {
4218 zfree(dv);
4219 addReply(c,shared.wrongtypeerr);
4220 return;
4221 }
4222 dv[j] = setobj->ptr;
4223 }
4224
4225 /* We need a temp set object to store our union. If the dstkey
4226 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4227 * this set object will be the resulting object to set into the target key*/
4228 dstset = createSetObject();
4229
40d224a9 4230 /* Iterate all the elements of all the sets, add every element a single
4231 * time to the result set */
4232 for (j = 0; j < setsnum; j++) {
51829ed3 4233 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
40d224a9 4234 if (!dv[j]) continue; /* non existing keys are like empty sets */
4235
4236 di = dictGetIterator(dv[j]);
40d224a9 4237
4238 while((de = dictNext(di)) != NULL) {
4239 robj *ele;
4240
4241 /* dictAdd will not add the same element multiple times */
4242 ele = dictGetEntryKey(de);
f4f56e1d 4243 if (op == REDIS_OP_UNION || j == 0) {
4244 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4245 incrRefCount(ele);
40d224a9 4246 cardinality++;
4247 }
f4f56e1d 4248 } else if (op == REDIS_OP_DIFF) {
4249 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4250 cardinality--;
4251 }
40d224a9 4252 }
4253 }
4254 dictReleaseIterator(di);
51829ed3
AG
4255
4256 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
40d224a9 4257 }
4258
f4f56e1d 4259 /* Output the content of the resulting set, if not in STORE mode */
4260 if (!dstkey) {
4261 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4262 di = dictGetIterator(dstset->ptr);
f4f56e1d 4263 while((de = dictNext(di)) != NULL) {
4264 robj *ele;
4265
4266 ele = dictGetEntryKey(de);
942a3961 4267 addReplyBulkLen(c,ele);
f4f56e1d 4268 addReply(c,ele);
4269 addReply(c,shared.crlf);
4270 }
4271 dictReleaseIterator(di);
83cdfe18
AG
4272 } else {
4273 /* If we have a target key where to store the resulting set
4274 * create this key with the result set inside */
4275 deleteKey(c->db,dstkey);
4276 dictAdd(c->db->dict,dstkey,dstset);
4277 incrRefCount(dstkey);
f4f56e1d 4278 }
4279
4280 /* Cleanup */
40d224a9 4281 if (!dstkey) {
40d224a9 4282 decrRefCount(dstset);
4283 } else {
682ac724 4284 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
03fd01c7 4285 dictSize((dict*)dstset->ptr)));
40d224a9 4286 server.dirty++;
4287 }
4288 zfree(dv);
4289}
4290
4291static void sunionCommand(redisClient *c) {
f4f56e1d 4292 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
40d224a9 4293}
4294
4295static void sunionstoreCommand(redisClient *c) {
f4f56e1d 4296 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4297}
4298
4299static void sdiffCommand(redisClient *c) {
4300 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4301}
4302
4303static void sdiffstoreCommand(redisClient *c) {
4304 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
40d224a9 4305}
4306
6b47e12e 4307/* ==================================== ZSets =============================== */
4308
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4316
4317/* This skiplist implementation is almost a C translation of the original
4318 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4319 * Alternative to Balanced Trees", modified in three ways:
4320 * a) this implementation allows for repeated values.
4321 * b) the comparison is not just by key (our 'score') but by satellite data.
4322 * c) there is a back pointer, so it's a doubly linked list with the back
4323 * pointers being only at "level 1". This allows to traverse the list
4324 * from tail to head, useful for ZREVRANGE. */
4325
4326static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4327 zskiplistNode *zn = zmalloc(sizeof(*zn));
4328
4329 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4330 zn->score = score;
4331 zn->obj = obj;
4332 return zn;
4333}
4334
4335static zskiplist *zslCreate(void) {
4336 int j;
4337 zskiplist *zsl;
4338
4339 zsl = zmalloc(sizeof(*zsl));
4340 zsl->level = 1;
cc812361 4341 zsl->length = 0;
6b47e12e 4342 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4343 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4344 zsl->header->forward[j] = NULL;
e3870fab 4345 zsl->header->backward = NULL;
4346 zsl->tail = NULL;
6b47e12e 4347 return zsl;
4348}
4349
fd8ccf44 4350static void zslFreeNode(zskiplistNode *node) {
4351 decrRefCount(node->obj);
ad807e6f 4352 zfree(node->forward);
fd8ccf44 4353 zfree(node);
4354}
4355
4356static void zslFree(zskiplist *zsl) {
ad807e6f 4357 zskiplistNode *node = zsl->header->forward[0], *next;
fd8ccf44 4358
ad807e6f 4359 zfree(zsl->header->forward);
4360 zfree(zsl->header);
fd8ccf44 4361 while(node) {
599379dd 4362 next = node->forward[0];
fd8ccf44 4363 zslFreeNode(node);
4364 node = next;
4365 }
ad807e6f 4366 zfree(zsl);
fd8ccf44 4367}
4368
6b47e12e 4369static int zslRandomLevel(void) {
4370 int level = 1;
4371 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4372 level += 1;
4373 return level;
4374}
4375
4376static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4377 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4378 int i, level;
4379
4380 x = zsl->header;
4381 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 4382 while (x->forward[i] &&
4383 (x->forward[i]->score < score ||
4384 (x->forward[i]->score == score &&
4385 compareStringObjects(x->forward[i]->obj,obj) < 0)))
6b47e12e 4386 x = x->forward[i];
4387 update[i] = x;
4388 }
6b47e12e 4389 /* we assume the key is not already inside, since we allow duplicated
4390 * scores, and the re-insertion of score and redis object should never
4391 * happpen since the caller of zslInsert() should test in the hash table
4392 * if the element is already inside or not. */
4393 level = zslRandomLevel();
4394 if (level > zsl->level) {
4395 for (i = zsl->level; i < level; i++)
4396 update[i] = zsl->header;
4397 zsl->level = level;
4398 }
4399 x = zslCreateNode(level,score,obj);
4400 for (i = 0; i < level; i++) {
4401 x->forward[i] = update[i]->forward[i];
4402 update[i]->forward[i] = x;
4403 }
bb975144 4404 x->backward = (update[0] == zsl->header) ? NULL : update[0];
e3870fab 4405 if (x->forward[0])
4406 x->forward[0]->backward = x;
4407 else
4408 zsl->tail = x;
cc812361 4409 zsl->length++;
6b47e12e 4410}
4411
50c55df5 4412/* Delete an element with matching score/object from the skiplist. */
fd8ccf44 4413static int zslDelete(zskiplist *zsl, double score, robj *obj) {
e197b441 4414 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4415 int i;
4416
4417 x = zsl->header;
4418 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 4419 while (x->forward[i] &&
4420 (x->forward[i]->score < score ||
4421 (x->forward[i]->score == score &&
4422 compareStringObjects(x->forward[i]->obj,obj) < 0)))
e197b441 4423 x = x->forward[i];
4424 update[i] = x;
4425 }
4426 /* We may have multiple elements with the same score, what we need
4427 * is to find the element with both the right score and object. */
4428 x = x->forward[0];
50c55df5 4429 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
9d60e6e4 4430 for (i = 0; i < zsl->level; i++) {
4431 if (update[i]->forward[i] != x) break;
4432 update[i]->forward[i] = x->forward[i];
4433 }
4434 if (x->forward[0]) {
4435 x->forward[0]->backward = (x->backward == zsl->header) ?
4436 NULL : x->backward;
e197b441 4437 } else {
9d60e6e4 4438 zsl->tail = x->backward;
e197b441 4439 }
9d60e6e4 4440 zslFreeNode(x);
4441 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4442 zsl->level--;
4443 zsl->length--;
4444 return 1;
4445 } else {
4446 return 0; /* not found */
e197b441 4447 }
4448 return 0; /* not found */
fd8ccf44 4449}
4450
1807985b 4451/* Delete all the elements with score between min and max from the skiplist.
4452 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4453 * Note that this function takes the reference to the hash table view of the
4454 * sorted set, in order to remove the elements from the hash table too. */
4455static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4456 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4457 unsigned long removed = 0;
4458 int i;
4459
4460 x = zsl->header;
4461 for (i = zsl->level-1; i >= 0; i--) {
4462 while (x->forward[i] && x->forward[i]->score < min)
4463 x = x->forward[i];
4464 update[i] = x;
4465 }
4466 /* We may have multiple elements with the same score, what we need
4467 * is to find the element with both the right score and object. */
4468 x = x->forward[0];
4469 while (x && x->score <= max) {
4470 zskiplistNode *next;
4471
4472 for (i = 0; i < zsl->level; i++) {
4473 if (update[i]->forward[i] != x) break;
4474 update[i]->forward[i] = x->forward[i];
4475 }
4476 if (x->forward[0]) {
4477 x->forward[0]->backward = (x->backward == zsl->header) ?
4478 NULL : x->backward;
4479 } else {
4480 zsl->tail = x->backward;
4481 }
4482 next = x->forward[0];
4483 dictDelete(dict,x->obj);
4484 zslFreeNode(x);
4485 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4486 zsl->level--;
4487 zsl->length--;
4488 removed++;
4489 x = next;
4490 }
4491 return removed; /* not found */
4492}
4493
50c55df5 4494/* Find the first node having a score equal or greater than the specified one.
4495 * Returns NULL if there is no match. */
4496static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4497 zskiplistNode *x;
4498 int i;
4499
4500 x = zsl->header;
4501 for (i = zsl->level-1; i >= 0; i--) {
4502 while (x->forward[i] && x->forward[i]->score < score)
4503 x = x->forward[i];
4504 }
4505 /* We may have multiple elements with the same score, what we need
4506 * is to find the element with both the right score and object. */
4507 return x->forward[0];
4508}
4509
fd8ccf44 4510/* The actual Z-commands implementations */
4511
7db723ad 4512/* This generic command implements both ZADD and ZINCRBY.
e2665397 4513 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
7db723ad 4514 * the increment if the operation is a ZINCRBY (doincrement == 1). */
e2665397 4515static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
fd8ccf44 4516 robj *zsetobj;
4517 zset *zs;
4518 double *score;
4519
e2665397 4520 zsetobj = lookupKeyWrite(c->db,key);
fd8ccf44 4521 if (zsetobj == NULL) {
4522 zsetobj = createZsetObject();
e2665397 4523 dictAdd(c->db->dict,key,zsetobj);
4524 incrRefCount(key);
fd8ccf44 4525 } else {
4526 if (zsetobj->type != REDIS_ZSET) {
4527 addReply(c,shared.wrongtypeerr);
4528 return;
4529 }
4530 }
fd8ccf44 4531 zs = zsetobj->ptr;
e2665397 4532
7db723ad 4533 /* Ok now since we implement both ZADD and ZINCRBY here the code
e2665397 4534 * needs to handle the two different conditions. It's all about setting
4535 * '*score', that is, the new score to set, to the right value. */
4536 score = zmalloc(sizeof(double));
4537 if (doincrement) {
4538 dictEntry *de;
4539
4540 /* Read the old score. If the element was not present starts from 0 */
4541 de = dictFind(zs->dict,ele);
4542 if (de) {
4543 double *oldscore = dictGetEntryVal(de);
4544 *score = *oldscore + scoreval;
4545 } else {
4546 *score = scoreval;
4547 }
4548 } else {
4549 *score = scoreval;
4550 }
4551
4552 /* What follows is a simple remove and re-insert operation that is common
7db723ad 4553 * to both ZADD and ZINCRBY... */
e2665397 4554 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
fd8ccf44 4555 /* case 1: New element */
e2665397 4556 incrRefCount(ele); /* added to hash */
4557 zslInsert(zs->zsl,*score,ele);
4558 incrRefCount(ele); /* added to skiplist */
fd8ccf44 4559 server.dirty++;
e2665397 4560 if (doincrement)
e2665397 4561 addReplyDouble(c,*score);
91d71bfc 4562 else
4563 addReply(c,shared.cone);
fd8ccf44 4564 } else {
4565 dictEntry *de;
4566 double *oldscore;
4567
4568 /* case 2: Score update operation */
e2665397 4569 de = dictFind(zs->dict,ele);
dfc5e96c 4570 redisAssert(de != NULL);
fd8ccf44 4571 oldscore = dictGetEntryVal(de);
4572 if (*score != *oldscore) {
4573 int deleted;
4574
e2665397 4575 /* Remove and insert the element in the skip list with new score */
4576 deleted = zslDelete(zs->zsl,*oldscore,ele);
dfc5e96c 4577 redisAssert(deleted != 0);
e2665397 4578 zslInsert(zs->zsl,*score,ele);
4579 incrRefCount(ele);
4580 /* Update the score in the hash table */
4581 dictReplace(zs->dict,ele,score);
fd8ccf44 4582 server.dirty++;
2161a965 4583 } else {
4584 zfree(score);
fd8ccf44 4585 }
e2665397 4586 if (doincrement)
4587 addReplyDouble(c,*score);
4588 else
4589 addReply(c,shared.czero);
fd8ccf44 4590 }
4591}
4592
e2665397 4593static void zaddCommand(redisClient *c) {
4594 double scoreval;
4595
4596 scoreval = strtod(c->argv[2]->ptr,NULL);
4597 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4598}
4599
7db723ad 4600static void zincrbyCommand(redisClient *c) {
e2665397 4601 double scoreval;
4602
4603 scoreval = strtod(c->argv[2]->ptr,NULL);
4604 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4605}
4606
1b7106e7 4607static void zremCommand(redisClient *c) {
4608 robj *zsetobj;
4609 zset *zs;
4610
4611 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4612 if (zsetobj == NULL) {
4613 addReply(c,shared.czero);
4614 } else {
4615 dictEntry *de;
4616 double *oldscore;
4617 int deleted;
4618
4619 if (zsetobj->type != REDIS_ZSET) {
4620 addReply(c,shared.wrongtypeerr);
4621 return;
4622 }
4623 zs = zsetobj->ptr;
4624 de = dictFind(zs->dict,c->argv[2]);
4625 if (de == NULL) {
4626 addReply(c,shared.czero);
4627 return;
4628 }
4629 /* Delete from the skiplist */
4630 oldscore = dictGetEntryVal(de);
4631 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
dfc5e96c 4632 redisAssert(deleted != 0);
1b7106e7 4633
4634 /* Delete from the hash table */
4635 dictDelete(zs->dict,c->argv[2]);
4636 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4637 server.dirty++;
4638 addReply(c,shared.cone);
4639 }
4640}
4641
1807985b 4642static void zremrangebyscoreCommand(redisClient *c) {
4643 double min = strtod(c->argv[2]->ptr,NULL);
4644 double max = strtod(c->argv[3]->ptr,NULL);
4645 robj *zsetobj;
4646 zset *zs;
4647
4648 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4649 if (zsetobj == NULL) {
4650 addReply(c,shared.czero);
4651 } else {
4652 long deleted;
4653
4654 if (zsetobj->type != REDIS_ZSET) {
4655 addReply(c,shared.wrongtypeerr);
4656 return;
4657 }
4658 zs = zsetobj->ptr;
4659 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4660 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4661 server.dirty += deleted;
4662 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4663 }
4664}
4665
e3870fab 4666static void zrangeGenericCommand(redisClient *c, int reverse) {
cc812361 4667 robj *o;
4668 int start = atoi(c->argv[2]->ptr);
4669 int end = atoi(c->argv[3]->ptr);
752da584 4670 int withscores = 0;
4671
4672 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4673 withscores = 1;
4674 } else if (c->argc >= 5) {
4675 addReply(c,shared.syntaxerr);
4676 return;
4677 }
cc812361 4678
4679 o = lookupKeyRead(c->db,c->argv[1]);
4680 if (o == NULL) {
4681 addReply(c,shared.nullmultibulk);
4682 } else {
4683 if (o->type != REDIS_ZSET) {
4684 addReply(c,shared.wrongtypeerr);
4685 } else {
4686 zset *zsetobj = o->ptr;
4687 zskiplist *zsl = zsetobj->zsl;
4688 zskiplistNode *ln;
4689
4690 int llen = zsl->length;
4691 int rangelen, j;
4692 robj *ele;
4693
4694 /* convert negative indexes */
4695 if (start < 0) start = llen+start;
4696 if (end < 0) end = llen+end;
4697 if (start < 0) start = 0;
4698 if (end < 0) end = 0;
4699
4700 /* indexes sanity checks */
4701 if (start > end || start >= llen) {
4702 /* Out of range start or start > end result in empty list */
4703 addReply(c,shared.emptymultibulk);
4704 return;
4705 }
4706 if (end >= llen) end = llen-1;
4707 rangelen = (end-start)+1;
4708
4709 /* Return the result in form of a multi-bulk reply */
e3870fab 4710 if (reverse) {
4711 ln = zsl->tail;
4712 while (start--)
4713 ln = ln->backward;
4714 } else {
4715 ln = zsl->header->forward[0];
4716 while (start--)
4717 ln = ln->forward[0];
4718 }
cc812361 4719
752da584 4720 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4721 withscores ? (rangelen*2) : rangelen));
cc812361 4722 for (j = 0; j < rangelen; j++) {
0aad7a19 4723 ele = ln->obj;
cc812361 4724 addReplyBulkLen(c,ele);
4725 addReply(c,ele);
4726 addReply(c,shared.crlf);
752da584 4727 if (withscores)
4728 addReplyDouble(c,ln->score);
e3870fab 4729 ln = reverse ? ln->backward : ln->forward[0];
cc812361 4730 }
4731 }
4732 }
4733}
4734
e3870fab 4735static void zrangeCommand(redisClient *c) {
4736 zrangeGenericCommand(c,0);
4737}
4738
4739static void zrevrangeCommand(redisClient *c) {
4740 zrangeGenericCommand(c,1);
4741}
4742
50c55df5 4743static void zrangebyscoreCommand(redisClient *c) {
4744 robj *o;
4745 double min = strtod(c->argv[2]->ptr,NULL);
4746 double max = strtod(c->argv[3]->ptr,NULL);
80181f78 4747 int offset = 0, limit = -1;
4748
4749 if (c->argc != 4 && c->argc != 7) {
454d4e43 4750 addReplySds(c,
4751 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
80181f78 4752 return;
4753 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4754 addReply(c,shared.syntaxerr);
4755 return;
4756 } else if (c->argc == 7) {
4757 offset = atoi(c->argv[5]->ptr);
4758 limit = atoi(c->argv[6]->ptr);
0b13687c 4759 if (offset < 0) offset = 0;
80181f78 4760 }
50c55df5 4761
4762 o = lookupKeyRead(c->db,c->argv[1]);
4763 if (o == NULL) {
4764 addReply(c,shared.nullmultibulk);
4765 } else {
4766 if (o->type != REDIS_ZSET) {
4767 addReply(c,shared.wrongtypeerr);
4768 } else {
4769 zset *zsetobj = o->ptr;
4770 zskiplist *zsl = zsetobj->zsl;
4771 zskiplistNode *ln;
4772 robj *ele, *lenobj;
4773 unsigned int rangelen = 0;
4774
4775 /* Get the first node with the score >= min */
4776 ln = zslFirstWithScore(zsl,min);
4777 if (ln == NULL) {
4778 /* No element matching the speciifed interval */
4779 addReply(c,shared.emptymultibulk);
4780 return;
4781 }
4782
4783 /* We don't know in advance how many matching elements there
4784 * are in the list, so we push this object that will represent
4785 * the multi-bulk length in the output buffer, and will "fix"
4786 * it later */
4787 lenobj = createObject(REDIS_STRING,NULL);
4788 addReply(c,lenobj);
c74e7c77 4789 decrRefCount(lenobj);
50c55df5 4790
dbbc7285 4791 while(ln && ln->score <= max) {
80181f78 4792 if (offset) {
4793 offset--;
4794 ln = ln->forward[0];
4795 continue;
4796 }
4797 if (limit == 0) break;
50c55df5 4798 ele = ln->obj;
4799 addReplyBulkLen(c,ele);
4800 addReply(c,ele);
4801 addReply(c,shared.crlf);
4802 ln = ln->forward[0];
4803 rangelen++;
80181f78 4804 if (limit > 0) limit--;
50c55df5 4805 }
4806 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4807 }
4808 }
4809}
4810
3c41331e 4811static void zcardCommand(redisClient *c) {
e197b441 4812 robj *o;
4813 zset *zs;
4814
4815 o = lookupKeyRead(c->db,c->argv[1]);
4816 if (o == NULL) {
4817 addReply(c,shared.czero);
4818 return;
4819 } else {
4820 if (o->type != REDIS_ZSET) {
4821 addReply(c,shared.wrongtypeerr);
4822 } else {
4823 zs = o->ptr;
682ac724 4824 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
e197b441 4825 }
4826 }
4827}
4828
6e333bbe 4829static void zscoreCommand(redisClient *c) {
4830 robj *o;
4831 zset *zs;
4832
4833 o = lookupKeyRead(c->db,c->argv[1]);
4834 if (o == NULL) {
96d8b4ee 4835 addReply(c,shared.nullbulk);
6e333bbe 4836 return;
4837 } else {
4838 if (o->type != REDIS_ZSET) {
4839 addReply(c,shared.wrongtypeerr);
4840 } else {
4841 dictEntry *de;
4842
4843 zs = o->ptr;
4844 de = dictFind(zs->dict,c->argv[2]);
4845 if (!de) {
4846 addReply(c,shared.nullbulk);
4847 } else {
6e333bbe 4848 double *score = dictGetEntryVal(de);
4849
e2665397 4850 addReplyDouble(c,*score);
6e333bbe 4851 }
4852 }
4853 }
4854}
4855
6b47e12e 4856/* ========================= Non type-specific commands ==================== */
4857
ed9b544e 4858static void flushdbCommand(redisClient *c) {
ca37e9cd 4859 server.dirty += dictSize(c->db->dict);
3305306f 4860 dictEmpty(c->db->dict);
4861 dictEmpty(c->db->expires);
ed9b544e 4862 addReply(c,shared.ok);
ed9b544e 4863}
4864
4865static void flushallCommand(redisClient *c) {
ca37e9cd 4866 server.dirty += emptyDb();
ed9b544e 4867 addReply(c,shared.ok);
f78fd11b 4868 rdbSave(server.dbfilename);
ca37e9cd 4869 server.dirty++;
ed9b544e 4870}
4871
56906eef 4872static redisSortOperation *createSortOperation(int type, robj *pattern) {
ed9b544e 4873 redisSortOperation *so = zmalloc(sizeof(*so));
ed9b544e 4874 so->type = type;
4875 so->pattern = pattern;
4876 return so;
4877}
4878
4879/* Return the value associated to the key with a name obtained
4880 * substituting the first occurence of '*' in 'pattern' with 'subst' */
56906eef 4881static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
ed9b544e 4882 char *p;
4883 sds spat, ssub;
4884 robj keyobj;
4885 int prefixlen, sublen, postfixlen;
ed9b544e 4886 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4887 struct {
f1017b3f 4888 long len;
4889 long free;
ed9b544e 4890 char buf[REDIS_SORTKEY_MAX+1];
4891 } keyname;
4892
28173a49 4893 /* If the pattern is "#" return the substitution object itself in order
4894 * to implement the "SORT ... GET #" feature. */
4895 spat = pattern->ptr;
4896 if (spat[0] == '#' && spat[1] == '\0') {
4897 return subst;
4898 }
4899
4900 /* The substitution object may be specially encoded. If so we create
9d65a1bb 4901 * a decoded object on the fly. Otherwise getDecodedObject will just
4902 * increment the ref count, that we'll decrement later. */
4903 subst = getDecodedObject(subst);
942a3961 4904
ed9b544e 4905 ssub = subst->ptr;
4906 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4907 p = strchr(spat,'*');
ed5a857a 4908 if (!p) {
4909 decrRefCount(subst);
4910 return NULL;
4911 }
ed9b544e 4912
4913 prefixlen = p-spat;
4914 sublen = sdslen(ssub);
4915 postfixlen = sdslen(spat)-(prefixlen+1);
4916 memcpy(keyname.buf,spat,prefixlen);
4917 memcpy(keyname.buf+prefixlen,ssub,sublen);
4918 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4919 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4920 keyname.len = prefixlen+sublen+postfixlen;
4921
dfc5e96c 4922 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
942a3961 4923 decrRefCount(subst);
4924
a4d1ba9a 4925 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3305306f 4926 return lookupKeyRead(db,&keyobj);
ed9b544e 4927}
4928
4929/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4930 * the additional parameter is not standard but a BSD-specific we have to
4931 * pass sorting parameters via the global 'server' structure */
4932static int sortCompare(const void *s1, const void *s2) {
4933 const redisSortObject *so1 = s1, *so2 = s2;
4934 int cmp;
4935
4936 if (!server.sort_alpha) {
4937 /* Numeric sorting. Here it's trivial as we precomputed scores */
4938 if (so1->u.score > so2->u.score) {
4939 cmp = 1;
4940 } else if (so1->u.score < so2->u.score) {
4941 cmp = -1;
4942 } else {
4943 cmp = 0;
4944 }
4945 } else {
4946 /* Alphanumeric sorting */
4947 if (server.sort_bypattern) {
4948 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4949 /* At least one compare object is NULL */
4950 if (so1->u.cmpobj == so2->u.cmpobj)
4951 cmp = 0;
4952 else if (so1->u.cmpobj == NULL)
4953 cmp = -1;
4954 else
4955 cmp = 1;
4956 } else {
4957 /* We have both the objects, use strcoll */
4958 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4959 }
4960 } else {
4961 /* Compare elements directly */
9d65a1bb 4962 robj *dec1, *dec2;
4963
4964 dec1 = getDecodedObject(so1->obj);
4965 dec2 = getDecodedObject(so2->obj);
4966 cmp = strcoll(dec1->ptr,dec2->ptr);
4967 decrRefCount(dec1);
4968 decrRefCount(dec2);
ed9b544e 4969 }
4970 }
4971 return server.sort_desc ? -cmp : cmp;
4972}
4973
4974/* The SORT command is the most complex command in Redis. Warning: this code
4975 * is optimized for speed and a bit less for readability */
4976static void sortCommand(redisClient *c) {
ed9b544e 4977 list *operations;
4978 int outputlen = 0;
4979 int desc = 0, alpha = 0;
4980 int limit_start = 0, limit_count = -1, start, end;
4981 int j, dontsort = 0, vectorlen;
4982 int getop = 0; /* GET operation counter */
443c6409 4983 robj *sortval, *sortby = NULL, *storekey = NULL;
ed9b544e 4984 redisSortObject *vector; /* Resulting vector to sort */
4985
4986 /* Lookup the key to sort. It must be of the right types */
3305306f 4987 sortval = lookupKeyRead(c->db,c->argv[1]);
4988 if (sortval == NULL) {
d922ae65 4989 addReply(c,shared.nullmultibulk);
ed9b544e 4990 return;
4991 }
a5eb649b 4992 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4993 sortval->type != REDIS_ZSET)
4994 {
c937aa89 4995 addReply(c,shared.wrongtypeerr);
ed9b544e 4996 return;
4997 }
4998
4999 /* Create a list of operations to perform for every sorted element.
5000 * Operations can be GET/DEL/INCR/DECR */
5001 operations = listCreate();
092dac2a 5002 listSetFreeMethod(operations,zfree);
ed9b544e 5003 j = 2;
5004
5005 /* Now we need to protect sortval incrementing its count, in the future
5006 * SORT may have options able to overwrite/delete keys during the sorting
5007 * and the sorted key itself may get destroied */
5008 incrRefCount(sortval);
5009
5010 /* The SORT command has an SQL-alike syntax, parse it */
5011 while(j < c->argc) {
5012 int leftargs = c->argc-j-1;
5013 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
5014 desc = 0;
5015 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
5016 desc = 1;
5017 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
5018 alpha = 1;
5019 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
5020 limit_start = atoi(c->argv[j+1]->ptr);
5021 limit_count = atoi(c->argv[j+2]->ptr);
5022 j+=2;
443c6409 5023 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
5024 storekey = c->argv[j+1];
5025 j++;
ed9b544e 5026 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
5027 sortby = c->argv[j+1];
5028 /* If the BY pattern does not contain '*', i.e. it is constant,
5029 * we don't need to sort nor to lookup the weight keys. */
5030 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5031 j++;
5032 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5033 listAddNodeTail(operations,createSortOperation(
5034 REDIS_SORT_GET,c->argv[j+1]));
5035 getop++;
5036 j++;
ed9b544e 5037 } else {
5038 decrRefCount(sortval);
5039 listRelease(operations);
c937aa89 5040 addReply(c,shared.syntaxerr);
ed9b544e 5041 return;
5042 }
5043 j++;
5044 }
5045
5046 /* Load the sorting vector with all the objects to sort */
a5eb649b 5047 switch(sortval->type) {
5048 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5049 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5050 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
dfc5e96c 5051 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
a5eb649b 5052 }
ed9b544e 5053 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
ed9b544e 5054 j = 0;
a5eb649b 5055
ed9b544e 5056 if (sortval->type == REDIS_LIST) {
5057 list *list = sortval->ptr;
6208b3a7 5058 listNode *ln;
5059
5060 listRewind(list);
5061 while((ln = listYield(list))) {
ed9b544e 5062 robj *ele = ln->value;
5063 vector[j].obj = ele;
5064 vector[j].u.score = 0;
5065 vector[j].u.cmpobj = NULL;
ed9b544e 5066 j++;
5067 }
5068 } else {
a5eb649b 5069 dict *set;
ed9b544e 5070 dictIterator *di;
5071 dictEntry *setele;
5072
a5eb649b 5073 if (sortval->type == REDIS_SET) {
5074 set = sortval->ptr;
5075 } else {
5076 zset *zs = sortval->ptr;
5077 set = zs->dict;
5078 }
5079
ed9b544e 5080 di = dictGetIterator(set);
ed9b544e 5081 while((setele = dictNext(di)) != NULL) {
5082 vector[j].obj = dictGetEntryKey(setele);
5083 vector[j].u.score = 0;
5084 vector[j].u.cmpobj = NULL;
5085 j++;
5086 }
5087 dictReleaseIterator(di);
5088 }
dfc5e96c 5089 redisAssert(j == vectorlen);
ed9b544e 5090
5091 /* Now it's time to load the right scores in the sorting vector */
5092 if (dontsort == 0) {
5093 for (j = 0; j < vectorlen; j++) {
5094 if (sortby) {
5095 robj *byval;
5096
3305306f 5097 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
ed9b544e 5098 if (!byval || byval->type != REDIS_STRING) continue;
5099 if (alpha) {
9d65a1bb 5100 vector[j].u.cmpobj = getDecodedObject(byval);
ed9b544e 5101 } else {
942a3961 5102 if (byval->encoding == REDIS_ENCODING_RAW) {
5103 vector[j].u.score = strtod(byval->ptr,NULL);
5104 } else {
9d65a1bb 5105 /* Don't need to decode the object if it's
5106 * integer-encoded (the only encoding supported) so
5107 * far. We can just cast it */
f1017b3f 5108 if (byval->encoding == REDIS_ENCODING_INT) {
942a3961 5109 vector[j].u.score = (long)byval->ptr;
f1017b3f 5110 } else
dfc5e96c 5111 redisAssert(1 != 1);
942a3961 5112 }
ed9b544e 5113 }
5114 } else {
942a3961 5115 if (!alpha) {
5116 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5117 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5118 else {
5119 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5120 vector[j].u.score = (long) vector[j].obj->ptr;
5121 else
dfc5e96c 5122 redisAssert(1 != 1);
942a3961 5123 }
5124 }
ed9b544e 5125 }
5126 }
5127 }
5128
5129 /* We are ready to sort the vector... perform a bit of sanity check
5130 * on the LIMIT option too. We'll use a partial version of quicksort. */
5131 start = (limit_start < 0) ? 0 : limit_start;
5132 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5133 if (start >= vectorlen) {
5134 start = vectorlen-1;
5135 end = vectorlen-2;
5136 }
5137 if (end >= vectorlen) end = vectorlen-1;
5138
5139 if (dontsort == 0) {
5140 server.sort_desc = desc;
5141 server.sort_alpha = alpha;
5142 server.sort_bypattern = sortby ? 1 : 0;
5f5b9840 5143 if (sortby && (start != 0 || end != vectorlen-1))
5144 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5145 else
5146 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
ed9b544e 5147 }
5148
5149 /* Send command output to the output buffer, performing the specified
5150 * GET/DEL/INCR/DECR operations if any. */
5151 outputlen = getop ? getop*(end-start+1) : end-start+1;
443c6409 5152 if (storekey == NULL) {
5153 /* STORE option not specified, sent the sorting result to client */
5154 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5155 for (j = start; j <= end; j++) {
5156 listNode *ln;
5157 if (!getop) {
5158 addReplyBulkLen(c,vector[j].obj);
5159 addReply(c,vector[j].obj);
5160 addReply(c,shared.crlf);
5161 }
5162 listRewind(operations);
5163 while((ln = listYield(operations))) {
5164 redisSortOperation *sop = ln->value;
5165 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5166 vector[j].obj);
5167
5168 if (sop->type == REDIS_SORT_GET) {
5169 if (!val || val->type != REDIS_STRING) {
5170 addReply(c,shared.nullbulk);
5171 } else {
5172 addReplyBulkLen(c,val);
5173 addReply(c,val);
5174 addReply(c,shared.crlf);
5175 }
5176 } else {
dfc5e96c 5177 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
443c6409 5178 }
5179 }
ed9b544e 5180 }
443c6409 5181 } else {
5182 robj *listObject = createListObject();
5183 list *listPtr = (list*) listObject->ptr;
5184
5185 /* STORE option specified, set the sorting result as a List object */
5186 for (j = start; j <= end; j++) {
5187 listNode *ln;
5188 if (!getop) {
5189 listAddNodeTail(listPtr,vector[j].obj);
5190 incrRefCount(vector[j].obj);
5191 }
5192 listRewind(operations);
5193 while((ln = listYield(operations))) {
5194 redisSortOperation *sop = ln->value;
5195 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5196 vector[j].obj);
5197
5198 if (sop->type == REDIS_SORT_GET) {
5199 if (!val || val->type != REDIS_STRING) {
5200 listAddNodeTail(listPtr,createStringObject("",0));
5201 } else {
5202 listAddNodeTail(listPtr,val);
5203 incrRefCount(val);
5204 }
ed9b544e 5205 } else {
dfc5e96c 5206 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
ed9b544e 5207 }
ed9b544e 5208 }
ed9b544e 5209 }
121796f7 5210 if (dictReplace(c->db->dict,storekey,listObject)) {
5211 incrRefCount(storekey);
5212 }
443c6409 5213 /* Note: we add 1 because the DB is dirty anyway since even if the
5214 * SORT result is empty a new key is set and maybe the old content
5215 * replaced. */
5216 server.dirty += 1+outputlen;
5217 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
ed9b544e 5218 }
5219
5220 /* Cleanup */
5221 decrRefCount(sortval);
5222 listRelease(operations);
5223 for (j = 0; j < vectorlen; j++) {
5224 if (sortby && alpha && vector[j].u.cmpobj)
5225 decrRefCount(vector[j].u.cmpobj);
5226 }
5227 zfree(vector);
5228}
5229
1c85b79f 5230/* Create the string returned by the INFO command. This is decoupled
5231 * by the INFO command itself as we need to report the same information
5232 * on memory corruption problems. */
5233static sds genRedisInfoString(void) {
ed9b544e 5234 sds info;
5235 time_t uptime = time(NULL)-server.stat_starttime;
c3cb078d 5236 int j;
ed9b544e 5237
5238 info = sdscatprintf(sdsempty(),
5239 "redis_version:%s\r\n"
f1017b3f 5240 "arch_bits:%s\r\n"
7a932b74 5241 "multiplexing_api:%s\r\n"
682ac724 5242 "uptime_in_seconds:%ld\r\n"
5243 "uptime_in_days:%ld\r\n"
ed9b544e 5244 "connected_clients:%d\r\n"
5245 "connected_slaves:%d\r\n"
f86a74e9 5246 "blocked_clients:%d\r\n"
5fba9f71 5247 "used_memory:%zu\r\n"
ed9b544e 5248 "changes_since_last_save:%lld\r\n"
be2bb6b0 5249 "bgsave_in_progress:%d\r\n"
682ac724 5250 "last_save_time:%ld\r\n"
b3fad521 5251 "bgrewriteaof_in_progress:%d\r\n"
ed9b544e 5252 "total_connections_received:%lld\r\n"
5253 "total_commands_processed:%lld\r\n"
a0f643ea 5254 "role:%s\r\n"
ed9b544e 5255 ,REDIS_VERSION,
f1017b3f 5256 (sizeof(long) == 8) ? "64" : "32",
7a932b74 5257 aeGetApiName(),
a0f643ea 5258 uptime,
5259 uptime/(3600*24),
ed9b544e 5260 listLength(server.clients)-listLength(server.slaves),
5261 listLength(server.slaves),
f86a74e9 5262 server.blockedclients,
ed9b544e 5263 server.usedmemory,
5264 server.dirty,
9d65a1bb 5265 server.bgsavechildpid != -1,
ed9b544e 5266 server.lastsave,
b3fad521 5267 server.bgrewritechildpid != -1,
ed9b544e 5268 server.stat_numconnections,
5269 server.stat_numcommands,
a0f643ea 5270 server.masterhost == NULL ? "master" : "slave"
ed9b544e 5271 );
a0f643ea 5272 if (server.masterhost) {
5273 info = sdscatprintf(info,
5274 "master_host:%s\r\n"
5275 "master_port:%d\r\n"
5276 "master_link_status:%s\r\n"
5277 "master_last_io_seconds_ago:%d\r\n"
5278 ,server.masterhost,
5279 server.masterport,
5280 (server.replstate == REDIS_REPL_CONNECTED) ?
5281 "up" : "down",
f72b934d 5282 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
a0f643ea 5283 );
5284 }
c3cb078d 5285 for (j = 0; j < server.dbnum; j++) {
5286 long long keys, vkeys;
5287
5288 keys = dictSize(server.db[j].dict);
5289 vkeys = dictSize(server.db[j].expires);
5290 if (keys || vkeys) {
9d65a1bb 5291 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
c3cb078d 5292 j, keys, vkeys);
5293 }
5294 }
1c85b79f 5295 return info;
5296}
5297
5298static void infoCommand(redisClient *c) {
5299 sds info = genRedisInfoString();
83c6a618 5300 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5301 (unsigned long)sdslen(info)));
ed9b544e 5302 addReplySds(c,info);
70003d28 5303 addReply(c,shared.crlf);
ed9b544e 5304}
5305
3305306f 5306static void monitorCommand(redisClient *c) {
5307 /* ignore MONITOR if aleady slave or in monitor mode */
5308 if (c->flags & REDIS_SLAVE) return;
5309
5310 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5311 c->slaveseldb = 0;
6b47e12e 5312 listAddNodeTail(server.monitors,c);
3305306f 5313 addReply(c,shared.ok);
5314}
5315
5316/* ================================= Expire ================================= */
5317static int removeExpire(redisDb *db, robj *key) {
5318 if (dictDelete(db->expires,key) == DICT_OK) {
5319 return 1;
5320 } else {
5321 return 0;
5322 }
5323}
5324
5325static int setExpire(redisDb *db, robj *key, time_t when) {
5326 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5327 return 0;
5328 } else {
5329 incrRefCount(key);
5330 return 1;
5331 }
5332}
5333
bb32ede5 5334/* Return the expire time of the specified key, or -1 if no expire
5335 * is associated with this key (i.e. the key is non volatile) */
5336static time_t getExpire(redisDb *db, robj *key) {
5337 dictEntry *de;
5338
5339 /* No expire? return ASAP */
5340 if (dictSize(db->expires) == 0 ||
5341 (de = dictFind(db->expires,key)) == NULL) return -1;
5342
5343 return (time_t) dictGetEntryVal(de);
5344}
5345
3305306f 5346static int expireIfNeeded(redisDb *db, robj *key) {
5347 time_t when;
5348 dictEntry *de;
5349
5350 /* No expire? return ASAP */
5351 if (dictSize(db->expires) == 0 ||
5352 (de = dictFind(db->expires,key)) == NULL) return 0;
5353
5354 /* Lookup the expire */
5355 when = (time_t) dictGetEntryVal(de);
5356 if (time(NULL) <= when) return 0;
5357
5358 /* Delete the key */
5359 dictDelete(db->expires,key);
5360 return dictDelete(db->dict,key) == DICT_OK;
5361}
5362
5363static int deleteIfVolatile(redisDb *db, robj *key) {
5364 dictEntry *de;
5365
5366 /* No expire? return ASAP */
5367 if (dictSize(db->expires) == 0 ||
5368 (de = dictFind(db->expires,key)) == NULL) return 0;
5369
5370 /* Delete the key */
0c66a471 5371 server.dirty++;
3305306f 5372 dictDelete(db->expires,key);
5373 return dictDelete(db->dict,key) == DICT_OK;
5374}
5375
802e8373 5376static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
3305306f 5377 dictEntry *de;
3305306f 5378
802e8373 5379 de = dictFind(c->db->dict,key);
3305306f 5380 if (de == NULL) {
5381 addReply(c,shared.czero);
5382 return;
5383 }
43e5ccdf 5384 if (seconds < 0) {
5385 if (deleteKey(c->db,key)) server.dirty++;
5386 addReply(c, shared.cone);
3305306f 5387 return;
5388 } else {
5389 time_t when = time(NULL)+seconds;
802e8373 5390 if (setExpire(c->db,key,when)) {
3305306f 5391 addReply(c,shared.cone);
77423026 5392 server.dirty++;
5393 } else {
3305306f 5394 addReply(c,shared.czero);
77423026 5395 }
3305306f 5396 return;
5397 }
5398}
5399
802e8373 5400static void expireCommand(redisClient *c) {
5401 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5402}
5403
5404static void expireatCommand(redisClient *c) {
5405 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5406}
5407
fd88489a 5408static void ttlCommand(redisClient *c) {
5409 time_t expire;
5410 int ttl = -1;
5411
5412 expire = getExpire(c->db,c->argv[1]);
5413 if (expire != -1) {
5414 ttl = (int) (expire-time(NULL));
5415 if (ttl < 0) ttl = -1;
5416 }
5417 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5418}
5419
6e469882 5420/* ================================ MULTI/EXEC ============================== */
5421
5422/* Client state initialization for MULTI/EXEC */
5423static void initClientMultiState(redisClient *c) {
5424 c->mstate.commands = NULL;
5425 c->mstate.count = 0;
5426}
5427
5428/* Release all the resources associated with MULTI/EXEC state */
5429static void freeClientMultiState(redisClient *c) {
5430 int j;
5431
5432 for (j = 0; j < c->mstate.count; j++) {
5433 int i;
5434 multiCmd *mc = c->mstate.commands+j;
5435
5436 for (i = 0; i < mc->argc; i++)
5437 decrRefCount(mc->argv[i]);
5438 zfree(mc->argv);
5439 }
5440 zfree(c->mstate.commands);
5441}
5442
5443/* Add a new command into the MULTI commands queue */
5444static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5445 multiCmd *mc;
5446 int j;
5447
5448 c->mstate.commands = zrealloc(c->mstate.commands,
5449 sizeof(multiCmd)*(c->mstate.count+1));
5450 mc = c->mstate.commands+c->mstate.count;
5451 mc->cmd = cmd;
5452 mc->argc = c->argc;
5453 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5454 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5455 for (j = 0; j < c->argc; j++)
5456 incrRefCount(mc->argv[j]);
5457 c->mstate.count++;
5458}
5459
5460static void multiCommand(redisClient *c) {
5461 c->flags |= REDIS_MULTI;
36c548f0 5462 addReply(c,shared.ok);
6e469882 5463}
5464
5465static void execCommand(redisClient *c) {
5466 int j;
5467 robj **orig_argv;
5468 int orig_argc;
5469
5470 if (!(c->flags & REDIS_MULTI)) {
5471 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5472 return;
5473 }
5474
5475 orig_argv = c->argv;
5476 orig_argc = c->argc;
5477 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5478 for (j = 0; j < c->mstate.count; j++) {
5479 c->argc = c->mstate.commands[j].argc;
5480 c->argv = c->mstate.commands[j].argv;
5481 call(c,c->mstate.commands[j].cmd);
5482 }
5483 c->argv = orig_argv;
5484 c->argc = orig_argc;
5485 freeClientMultiState(c);
5486 initClientMultiState(c);
5487 c->flags &= (~REDIS_MULTI);
5488}
5489
4409877e 5490/* =========================== Blocking Operations ========================= */
5491
/* Currently Redis blocking operations support is limited to list POP ops,
 * so the current implementation is not fully generic, but it is also not
 * completely specific, so it will not require a rewrite to support new
 * kinds of blocking operations in the future.
 *
 * Still, it's important to note that list blocking operations can already
 * be used as a notification mechanism in order to implement other blocking
 * operations at application level, so there must be very strong evidence
 * of usefulness and generality before new blocking operations are implemented.
 *
 * This is how the current blocking POP works, using BLPOP as an example:
 * - If the user calls BLPOP and the key exists and contains a non-empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   when there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty, we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests while the blocking request is not served). Also we put the client
 *   in a dictionary (db->blockingkeys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically, instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
 *
 * The above comment and the source code should be enough in order to understand
 * the implementation and modify / fix it later.
 */
5520
5521/* Set a client in blocking mode for the specified key, with the specified
5522 * timeout */
b177fd30 5523static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
4409877e 5524 dictEntry *de;
5525 list *l;
b177fd30 5526 int j;
4409877e 5527
b177fd30 5528 c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
5529 c->blockingkeysnum = numkeys;
4409877e 5530 c->blockingto = timeout;
b177fd30 5531 for (j = 0; j < numkeys; j++) {
5532 /* Add the key in the client structure, to map clients -> keys */
5533 c->blockingkeys[j] = keys[j];
5534 incrRefCount(keys[j]);
4409877e 5535
b177fd30 5536 /* And in the other "side", to map keys -> clients */
5537 de = dictFind(c->db->blockingkeys,keys[j]);
5538 if (de == NULL) {
5539 int retval;
5540
5541 /* For every key we take a list of clients blocked for it */
5542 l = listCreate();
5543 retval = dictAdd(c->db->blockingkeys,keys[j],l);
5544 incrRefCount(keys[j]);
5545 assert(retval == DICT_OK);
5546 } else {
5547 l = dictGetEntryVal(de);
5548 }
5549 listAddNodeTail(l,c);
4409877e 5550 }
b177fd30 5551 /* Mark the client as a blocked client */
4409877e 5552 c->flags |= REDIS_BLOCKED;
5553 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
f86a74e9 5554 server.blockedclients++;
4409877e 5555}
5556
5557/* Unblock a client that's waiting in a blocking operation such as BLPOP */
5558static void unblockClient(redisClient *c) {
5559 dictEntry *de;
5560 list *l;
b177fd30 5561 int j;
4409877e 5562
b177fd30 5563 assert(c->blockingkeys != NULL);
5564 /* The client may wait for multiple keys, so unblock it for every key. */
5565 for (j = 0; j < c->blockingkeysnum; j++) {
5566 /* Remove this client from the list of clients waiting for this key. */
5567 de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
5568 assert(de != NULL);
5569 l = dictGetEntryVal(de);
5570 listDelNode(l,listSearchKey(l,c));
5571 /* If the list is empty we need to remove it to avoid wasting memory */
5572 if (listLength(l) == 0)
5573 dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
5574 decrRefCount(c->blockingkeys[j]);
5575 }
5576 /* Cleanup the client structure */
5577 zfree(c->blockingkeys);
5578 c->blockingkeys = NULL;
4409877e 5579 c->flags &= (~REDIS_BLOCKED);
f86a74e9 5580 server.blockedclients--;
4409877e 5581 /* Ok now we are ready to get read events from socket, note that we
5582 * can't trap errors here as it's possible that unblockClients() is
5583 * called from freeClient() itself, and the only thing we can do
5584 * if we failed to register the READABLE event is to kill the client.
5585 * Still the following function should never fail in the real world as
5586 * we are sure the file descriptor is sane, and we exit on out of mem. */
5587 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5588 /* As a final step we want to process data if there is some command waiting
5589 * in the input buffer. Note that this is safe even if unblockClient()
5590 * gets called from freeClient() because freeClient() will be smart
5591 * enough to call this function *after* c->querybuf was set to NULL. */
5592 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5593}
5594
5595/* This should be called from any function PUSHing into lists.
5596 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5597 * 'ele' is the element pushed.
5598 *
5599 * If the function returns 0 there was no client waiting for a list push
5600 * against this key.
5601 *
5602 * If the function returns 1 there was a client waiting for a list push
5603 * against this key, the element was passed to this client thus it's not
5604 * needed to actually add it to the list and the caller should return asap. */
5605static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5606 struct dictEntry *de;
5607 redisClient *receiver;
5608 list *l;
5609 listNode *ln;
5610
5611 de = dictFind(c->db->blockingkeys,key);
5612 if (de == NULL) return 0;
5613 l = dictGetEntryVal(de);
5614 ln = listFirst(l);
5615 assert(ln != NULL);
5616 receiver = ln->value;
4409877e 5617
b177fd30 5618 addReplySds(receiver,sdsnew("*2\r\n"));
5619 addReplyBulkLen(receiver,key);
5620 addReply(receiver,key);
5621 addReply(receiver,shared.crlf);
4409877e 5622 addReplyBulkLen(receiver,ele);
5623 addReply(receiver,ele);
5624 addReply(receiver,shared.crlf);
5625 unblockClient(receiver);
5626 return 1;
5627}
5628
5629/* Blocking RPOP/LPOP */
5630static void blockingPopGenericCommand(redisClient *c, int where) {
5631 robj *o;
5632 time_t timeout;
b177fd30 5633 int j;
4409877e 5634
b177fd30 5635 for (j = 1; j < c->argc-1; j++) {
5636 o = lookupKeyWrite(c->db,c->argv[j]);
5637 if (o != NULL) {
5638 if (o->type != REDIS_LIST) {
5639 addReply(c,shared.wrongtypeerr);
4409877e 5640 return;
b177fd30 5641 } else {
5642 list *list = o->ptr;
5643 if (listLength(list) != 0) {
5644 /* If the list contains elements fall back to the usual
5645 * non-blocking POP operation */
5646 robj *argv[2], **orig_argv;
5647 int orig_argc;
5648
5649 /* We need to alter the command arguments before to call
5650 * popGenericCommand() as the command takes a single key. */
5651 orig_argv = c->argv;
5652 orig_argc = c->argc;
5653 argv[1] = c->argv[j];
5654 c->argv = argv;
5655 c->argc = 2;
5656
5657 /* Also the return value is different, we need to output
5658 * the multi bulk reply header and the key name. The
5659 * "real" command will add the last element (the value)
5660 * for us. If this souds like an hack to you it's just
5661 * because it is... */
5662 addReplySds(c,sdsnew("*2\r\n"));
5663 addReplyBulkLen(c,argv[1]);
5664 addReply(c,argv[1]);
5665 addReply(c,shared.crlf);
5666 popGenericCommand(c,where);
5667
5668 /* Fix the client structure with the original stuff */
5669 c->argv = orig_argv;
5670 c->argc = orig_argc;
5671 return;
5672 }
4409877e 5673 }
5674 }
5675 }
5676 /* If the list is empty or the key does not exists we must block */
b177fd30 5677 timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
4409877e 5678 if (timeout > 0) timeout += time(NULL);
b177fd30 5679 blockForKeys(c,c->argv+1,c->argc-2,timeout);
4409877e 5680}
5681
5682static void blpopCommand(redisClient *c) {
5683 blockingPopGenericCommand(c,REDIS_HEAD);
5684}
5685
5686static void brpopCommand(redisClient *c) {
5687 blockingPopGenericCommand(c,REDIS_TAIL);
5688}
5689
ed9b544e 5690/* =============================== Replication ============================= */
5691
a4d1ba9a 5692static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 5693 ssize_t nwritten, ret = size;
5694 time_t start = time(NULL);
5695
5696 timeout++;
5697 while(size) {
5698 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5699 nwritten = write(fd,ptr,size);
5700 if (nwritten == -1) return -1;
5701 ptr += nwritten;
5702 size -= nwritten;
5703 }
5704 if ((time(NULL)-start) > timeout) {
5705 errno = ETIMEDOUT;
5706 return -1;
5707 }
5708 }
5709 return ret;
5710}
5711
a4d1ba9a 5712static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 5713 ssize_t nread, totread = 0;
5714 time_t start = time(NULL);
5715
5716 timeout++;
5717 while(size) {
5718 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5719 nread = read(fd,ptr,size);
5720 if (nread == -1) return -1;
5721 ptr += nread;
5722 size -= nread;
5723 totread += nread;
5724 }
5725 if ((time(NULL)-start) > timeout) {
5726 errno = ETIMEDOUT;
5727 return -1;
5728 }
5729 }
5730 return totread;
5731}
5732
/* Read one newline-terminated line from 'fd' into 'ptr', a buffer of
 * 'size' bytes. The stored line is always NUL terminated (when size > 0)
 * and the trailing "\n" — plus a preceding "\r", if present — is stripped.
 * Every byte is read through syncRead(), so 'timeout' applies per byte.
 *
 * At most size-1 characters are stored: if the buffer fills up before a
 * newline arrives, the truncated line is returned. The original loop never
 * decremented the remaining room, so a line longer than the buffer
 * overflowed 'ptr'.
 *
 * Returns the number of characters stored (terminator excluded), or -1 on
 * I/O error / timeout. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;
    *ptr = '\0';
    size--; /* reserve room for the NUL terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        size--;
    }
    return nread;
}
5753
5754static void syncCommand(redisClient *c) {
40d224a9 5755 /* ignore SYNC if aleady slave or in monitor mode */
5756 if (c->flags & REDIS_SLAVE) return;
5757
5758 /* SYNC can't be issued when the server has pending data to send to
5759 * the client about already issued commands. We need a fresh reply
5760 * buffer registering the differences between the BGSAVE and the current
5761 * dataset, so that we can copy to other slaves if needed. */
5762 if (listLength(c->reply) != 0) {
5763 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5764 return;
5765 }
5766
5767 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5768 /* Here we need to check if there is a background saving operation
5769 * in progress, or if it is required to start one */
9d65a1bb 5770 if (server.bgsavechildpid != -1) {
40d224a9 5771 /* Ok a background save is in progress. Let's check if it is a good
5772 * one for replication, i.e. if there is another slave that is
5773 * registering differences since the server forked to save */
5774 redisClient *slave;
5775 listNode *ln;
5776
6208b3a7 5777 listRewind(server.slaves);
5778 while((ln = listYield(server.slaves))) {
40d224a9 5779 slave = ln->value;
5780 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
40d224a9 5781 }
5782 if (ln) {
5783 /* Perfect, the server is already registering differences for
5784 * another slave. Set the right state, and copy the buffer. */
5785 listRelease(c->reply);
5786 c->reply = listDup(slave->reply);
40d224a9 5787 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5788 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5789 } else {
5790 /* No way, we need to wait for the next BGSAVE in order to
5791 * register differences */
5792 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5793 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5794 }
5795 } else {
5796 /* Ok we don't have a BGSAVE in progress, let's start one */
5797 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5798 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5799 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5800 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5801 return;
5802 }
5803 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5804 }
6208b3a7 5805 c->repldbfd = -1;
40d224a9 5806 c->flags |= REDIS_SLAVE;
5807 c->slaveseldb = 0;
6b47e12e 5808 listAddNodeTail(server.slaves,c);
40d224a9 5809 return;
5810}
5811
6208b3a7 5812static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5813 redisClient *slave = privdata;
5814 REDIS_NOTUSED(el);
5815 REDIS_NOTUSED(mask);
5816 char buf[REDIS_IOBUF_LEN];
5817 ssize_t nwritten, buflen;
5818
5819 if (slave->repldboff == 0) {
5820 /* Write the bulk write count before to transfer the DB. In theory here
5821 * we don't know how much room there is in the output buffer of the
5822 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5823 * operations) will never be smaller than the few bytes we need. */
5824 sds bulkcount;
5825
5826 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5827 slave->repldbsize);
5828 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5829 {
5830 sdsfree(bulkcount);
5831 freeClient(slave);
5832 return;
5833 }
5834 sdsfree(bulkcount);
5835 }
5836 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5837 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5838 if (buflen <= 0) {
5839 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5840 (buflen == 0) ? "premature EOF" : strerror(errno));
5841 freeClient(slave);
5842 return;
5843 }
5844 if ((nwritten = write(fd,buf,buflen)) == -1) {
5845 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5846 strerror(errno));
5847 freeClient(slave);
5848 return;
5849 }
5850 slave->repldboff += nwritten;
5851 if (slave->repldboff == slave->repldbsize) {
5852 close(slave->repldbfd);
5853 slave->repldbfd = -1;
5854 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5855 slave->replstate = REDIS_REPL_ONLINE;
5856 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
266373b2 5857 sendReplyToClient, slave) == AE_ERR) {
6208b3a7 5858 freeClient(slave);
5859 return;
5860 }
5861 addReplySds(slave,sdsempty());
5862 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5863 }
5864}
ed9b544e 5865
a3b21203 5866/* This function is called at the end of every backgrond saving.
5867 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5868 * otherwise REDIS_ERR is passed to the function.
5869 *
5870 * The goal of this function is to handle slaves waiting for a successful
5871 * background saving in order to perform non-blocking synchronization. */
5872static void updateSlavesWaitingBgsave(int bgsaveerr) {
6208b3a7 5873 listNode *ln;
5874 int startbgsave = 0;
ed9b544e 5875
6208b3a7 5876 listRewind(server.slaves);
5877 while((ln = listYield(server.slaves))) {
5878 redisClient *slave = ln->value;
ed9b544e 5879
6208b3a7 5880 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5881 startbgsave = 1;
5882 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5883 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
dde65f3f 5884 struct redis_stat buf;
6208b3a7 5885
5886 if (bgsaveerr != REDIS_OK) {
5887 freeClient(slave);
5888 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5889 continue;
5890 }
5891 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
dde65f3f 5892 redis_fstat(slave->repldbfd,&buf) == -1) {
6208b3a7 5893 freeClient(slave);
5894 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5895 continue;
5896 }
5897 slave->repldboff = 0;
5898 slave->repldbsize = buf.st_size;
5899 slave->replstate = REDIS_REPL_SEND_BULK;
5900 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
266373b2 5901 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
6208b3a7 5902 freeClient(slave);
5903 continue;
5904 }
5905 }
ed9b544e 5906 }
6208b3a7 5907 if (startbgsave) {
5908 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5909 listRewind(server.slaves);
5910 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5911 while((ln = listYield(server.slaves))) {
5912 redisClient *slave = ln->value;
ed9b544e 5913
6208b3a7 5914 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5915 freeClient(slave);
5916 }
5917 }
5918 }
ed9b544e 5919}
5920
5921static int syncWithMaster(void) {
d0ccebcf 5922 char buf[1024], tmpfile[256], authcmd[1024];
ed9b544e 5923 int dumpsize;
5924 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5925 int dfd;
5926
5927 if (fd == -1) {
5928 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5929 strerror(errno));
5930 return REDIS_ERR;
5931 }
d0ccebcf 5932
5933 /* AUTH with the master if required. */
5934 if(server.masterauth) {
5935 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5936 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5937 close(fd);
5938 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5939 strerror(errno));
5940 return REDIS_ERR;
5941 }
5942 /* Read the AUTH result. */
5943 if (syncReadLine(fd,buf,1024,3600) == -1) {
5944 close(fd);
5945 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5946 strerror(errno));
5947 return REDIS_ERR;
5948 }
5949 if (buf[0] != '+') {
5950 close(fd);
5951 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5952 return REDIS_ERR;
5953 }
5954 }
5955
ed9b544e 5956 /* Issue the SYNC command */
5957 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5958 close(fd);
5959 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5960 strerror(errno));
5961 return REDIS_ERR;
5962 }
5963 /* Read the bulk write count */
8c4d91fc 5964 if (syncReadLine(fd,buf,1024,3600) == -1) {
ed9b544e 5965 close(fd);
5966 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5967 strerror(errno));
5968 return REDIS_ERR;
5969 }
4aa701c1 5970 if (buf[0] != '$') {
5971 close(fd);
5972 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5973 return REDIS_ERR;
5974 }
c937aa89 5975 dumpsize = atoi(buf+1);
ed9b544e 5976 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5977 /* Read the bulk write data on a temp file */
5978 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5979 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5980 if (dfd == -1) {
5981 close(fd);
5982 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5983 return REDIS_ERR;
5984 }
5985 while(dumpsize) {
5986 int nread, nwritten;
5987
5988 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5989 if (nread == -1) {
5990 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5991 strerror(errno));
5992 close(fd);
5993 close(dfd);
5994 return REDIS_ERR;
5995 }
5996 nwritten = write(dfd,buf,nread);
5997 if (nwritten == -1) {
5998 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5999 close(fd);
6000 close(dfd);
6001 return REDIS_ERR;
6002 }
6003 dumpsize -= nread;
6004 }
6005 close(dfd);
6006 if (rename(tmpfile,server.dbfilename) == -1) {
6007 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
6008 unlink(tmpfile);
6009 close(fd);
6010 return REDIS_ERR;
6011 }
6012 emptyDb();
f78fd11b 6013 if (rdbLoad(server.dbfilename) != REDIS_OK) {
ed9b544e 6014 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
6015 close(fd);
6016 return REDIS_ERR;
6017 }
6018 server.master = createClient(fd);
6019 server.master->flags |= REDIS_MASTER;
179b3952 6020 server.master->authenticated = 1;
ed9b544e 6021 server.replstate = REDIS_REPL_CONNECTED;
6022 return REDIS_OK;
6023}
6024
321b0e13 6025static void slaveofCommand(redisClient *c) {
6026 if (!strcasecmp(c->argv[1]->ptr,"no") &&
6027 !strcasecmp(c->argv[2]->ptr,"one")) {
6028 if (server.masterhost) {
6029 sdsfree(server.masterhost);
6030 server.masterhost = NULL;
6031 if (server.master) freeClient(server.master);
6032 server.replstate = REDIS_REPL_NONE;
6033 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
6034 }
6035 } else {
6036 sdsfree(server.masterhost);
6037 server.masterhost = sdsdup(c->argv[1]->ptr);
6038 server.masterport = atoi(c->argv[2]->ptr);
6039 if (server.master) freeClient(server.master);
6040 server.replstate = REDIS_REPL_CONNECT;
6041 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
6042 server.masterhost, server.masterport);
6043 }
6044 addReply(c,shared.ok);
6045}
6046
3fd78bcd 6047/* ============================ Maxmemory directive ======================== */
6048
6049/* This function gets called when 'maxmemory' is set on the config file to limit
6050 * the max memory used by the server, and we are out of memory.
6051 * This function will try to, in order:
6052 *
6053 * - Free objects from the free list
6054 * - Try to remove keys with an EXPIRE set
6055 *
6056 * It is not possible to free enough memory to reach used-memory < maxmemory
6057 * the server will start refusing commands that will enlarge even more the
6058 * memory usage.
6059 */
6060static void freeMemoryIfNeeded(void) {
6061 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
6062 if (listLength(server.objfreelist)) {
6063 robj *o;
6064
6065 listNode *head = listFirst(server.objfreelist);
6066 o = listNodeValue(head);
6067 listDelNode(server.objfreelist,head);
6068 zfree(o);
6069 } else {
6070 int j, k, freed = 0;
6071
6072 for (j = 0; j < server.dbnum; j++) {
6073 int minttl = -1;
6074 robj *minkey = NULL;
6075 struct dictEntry *de;
6076
6077 if (dictSize(server.db[j].expires)) {
6078 freed = 1;
6079 /* From a sample of three keys drop the one nearest to
6080 * the natural expire */
6081 for (k = 0; k < 3; k++) {
6082 time_t t;
6083
6084 de = dictGetRandomKey(server.db[j].expires);
6085 t = (time_t) dictGetEntryVal(de);
6086 if (minttl == -1 || t < minttl) {
6087 minkey = dictGetEntryKey(de);
6088 minttl = t;
6089 }
6090 }
6091 deleteKey(server.db+j,minkey);
6092 }
6093 }
6094 if (!freed) return; /* nothing to free... */
6095 }
6096 }
6097}
6098
f80dff62 6099/* ============================== Append Only file ========================== */
6100
6101static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6102 sds buf = sdsempty();
6103 int j;
6104 ssize_t nwritten;
6105 time_t now;
6106 robj *tmpargv[3];
6107
6108 /* The DB this command was targetting is not the same as the last command
6109 * we appendend. To issue a SELECT command is needed. */
6110 if (dictid != server.appendseldb) {
6111 char seldb[64];
6112
6113 snprintf(seldb,sizeof(seldb),"%d",dictid);
682ac724 6114 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
83c6a618 6115 (unsigned long)strlen(seldb),seldb);
f80dff62 6116 server.appendseldb = dictid;
6117 }
6118
6119 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6120 * EXPIREs into EXPIREATs calls */
6121 if (cmd->proc == expireCommand) {
6122 long when;
6123
6124 tmpargv[0] = createStringObject("EXPIREAT",8);
6125 tmpargv[1] = argv[1];
6126 incrRefCount(argv[1]);
6127 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6128 tmpargv[2] = createObject(REDIS_STRING,
6129 sdscatprintf(sdsempty(),"%ld",when));
6130 argv = tmpargv;
6131 }
6132
6133 /* Append the actual command */
6134 buf = sdscatprintf(buf,"*%d\r\n",argc);
6135 for (j = 0; j < argc; j++) {
6136 robj *o = argv[j];
6137
9d65a1bb 6138 o = getDecodedObject(o);
83c6a618 6139 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
f80dff62 6140 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6141 buf = sdscatlen(buf,"\r\n",2);
9d65a1bb 6142 decrRefCount(o);
f80dff62 6143 }
6144
6145 /* Free the objects from the modified argv for EXPIREAT */
6146 if (cmd->proc == expireCommand) {
6147 for (j = 0; j < 3; j++)
6148 decrRefCount(argv[j]);
6149 }
6150
6151 /* We want to perform a single write. This should be guaranteed atomic
6152 * at least if the filesystem we are writing is a real physical one.
6153 * While this will save us against the server being killed I don't think
6154 * there is much to do about the whole server stopping for power problems
6155 * or alike */
6156 nwritten = write(server.appendfd,buf,sdslen(buf));
6157 if (nwritten != (signed)sdslen(buf)) {
6158 /* Ooops, we are in troubles. The best thing to do for now is
6159 * to simply exit instead to give the illusion that everything is
6160 * working as expected. */
6161 if (nwritten == -1) {
6162 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6163 } else {
6164 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6165 }
6166 exit(1);
6167 }
85a83172 6168 /* If a background append only file rewriting is in progress we want to
6169 * accumulate the differences between the child DB and the current one
6170 * in a buffer, so that when the child process will do its work we
6171 * can append the differences to the new append only file. */
6172 if (server.bgrewritechildpid != -1)
6173 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6174
6175 sdsfree(buf);
f80dff62 6176 now = time(NULL);
6177 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6178 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6179 now-server.lastfsync > 1))
6180 {
6181 fsync(server.appendfd); /* Let's try to get this data on the disk */
6182 server.lastfsync = now;
6183 }
6184}
6185
6186/* In Redis commands are always executed in the context of a client, so in
6187 * order to load the append only file we need to create a fake client. */
6188static struct redisClient *createFakeClient(void) {
6189 struct redisClient *c = zmalloc(sizeof(*c));
6190
6191 selectDb(c,0);
6192 c->fd = -1;
6193 c->querybuf = sdsempty();
6194 c->argc = 0;
6195 c->argv = NULL;
6196 c->flags = 0;
9387d17d 6197 /* We set the fake client as a slave waiting for the synchronization
6198 * so that Redis will not try to send replies to this client. */
6199 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
f80dff62 6200 c->reply = listCreate();
6201 listSetFreeMethod(c->reply,decrRefCount);
6202 listSetDupMethod(c->reply,dupClientReplyValue);
6203 return c;
6204}
6205
6206static void freeFakeClient(struct redisClient *c) {
6207 sdsfree(c->querybuf);
6208 listRelease(c->reply);
6209 zfree(c);
6210}
6211
6212/* Replay the append log file. On error REDIS_OK is returned. On non fatal
6213 * error (the append only file is zero-length) REDIS_ERR is returned. On
6214 * fatal error an error message is logged and the program exists. */
6215int loadAppendOnlyFile(char *filename) {
6216 struct redisClient *fakeClient;
6217 FILE *fp = fopen(filename,"r");
6218 struct redis_stat sb;
6219
6220 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6221 return REDIS_ERR;
6222
6223 if (fp == NULL) {
6224 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6225 exit(1);
6226 }
6227
6228 fakeClient = createFakeClient();
6229 while(1) {
6230 int argc, j;
6231 unsigned long len;
6232 robj **argv;
6233 char buf[128];
6234 sds argsds;
6235 struct redisCommand *cmd;
6236
6237 if (fgets(buf,sizeof(buf),fp) == NULL) {
6238 if (feof(fp))
6239 break;
6240 else
6241 goto readerr;
6242 }
6243 if (buf[0] != '*') goto fmterr;
6244 argc = atoi(buf+1);
6245 argv = zmalloc(sizeof(robj*)*argc);
6246 for (j = 0; j < argc; j++) {
6247 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6248 if (buf[0] != '$') goto fmterr;
6249 len = strtol(buf+1,NULL,10);
6250 argsds = sdsnewlen(NULL,len);
0f151ef1 6251 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
f80dff62 6252 argv[j] = createObject(REDIS_STRING,argsds);
6253 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6254 }
6255
6256 /* Command lookup */
6257 cmd = lookupCommand(argv[0]->ptr);
6258 if (!cmd) {
6259 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6260 exit(1);
6261 }
6262 /* Try object sharing and encoding */
6263 if (server.shareobjects) {
6264 int j;
6265 for(j = 1; j < argc; j++)
6266 argv[j] = tryObjectSharing(argv[j]);
6267 }
6268 if (cmd->flags & REDIS_CMD_BULK)
6269 tryObjectEncoding(argv[argc-1]);
6270 /* Run the command in the context of a fake client */
6271 fakeClient->argc = argc;
6272 fakeClient->argv = argv;
6273 cmd->proc(fakeClient);
6274 /* Discard the reply objects list from the fake client */
6275 while(listLength(fakeClient->reply))
6276 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6277 /* Clean up, ready for the next command */
6278 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6279 zfree(argv);
6280 }
6281 fclose(fp);
6282 freeFakeClient(fakeClient);
6283 return REDIS_OK;
6284
6285readerr:
6286 if (feof(fp)) {
6287 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6288 } else {
6289 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6290 }
6291 exit(1);
6292fmterr:
6293 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6294 exit(1);
6295}
6296
9d65a1bb 6297/* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6298static int fwriteBulk(FILE *fp, robj *obj) {
6299 char buf[128];
6300 obj = getDecodedObject(obj);
6301 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6302 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
e96e4fbf 6303 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6304 goto err;
9d65a1bb 6305 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6306 decrRefCount(obj);
6307 return 1;
6308err:
6309 decrRefCount(obj);
6310 return 0;
6311}
6312
/* Write 'd' to 'fp' as a bulk "$<count>\r\n<payload>\r\n". The payload is
 * printed with "%.17g", so the value round-trips exactly.
 * Returns 1 on success, 0 if either fwrite() fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char hdr[128], payload[128];
    size_t plen;

    /* The payload carries its own CRLF, which is not part of the count. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    plen = strlen(payload);
    snprintf(hdr,sizeof(hdr),"$%lu\r\n",(unsigned long)(plen-2));
    return fwrite(hdr,strlen(hdr),1,fp) != 0 &&
           fwrite(payload,plen,1,fp) != 0;
}
6323
/* Write 'l' to 'fp' as a bulk "$<count>\r\n<payload>\r\n".
 * Returns 1 on success, 0 if either fwrite() fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char hdr[128], payload[128];
    size_t plen;

    /* The payload carries its own CRLF, which is not part of the count. */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    plen = strlen(payload);
    snprintf(hdr,sizeof(hdr),"$%lu\r\n",(unsigned long)(plen-2));
    if (fwrite(hdr,strlen(hdr),1,fp) == 0) return 0;
    return fwrite(payload,plen,1,fp) != 0;
}
6334
6335/* Write a sequence of commands able to fully rebuild the dataset into
6336 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6337static int rewriteAppendOnlyFile(char *filename) {
6338 dictIterator *di = NULL;
6339 dictEntry *de;
6340 FILE *fp;
6341 char tmpfile[256];
6342 int j;
6343 time_t now = time(NULL);
6344
6345 /* Note that we have to use a different temp name here compared to the
6346 * one used by rewriteAppendOnlyFileBackground() function. */
6347 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6348 fp = fopen(tmpfile,"w");
6349 if (!fp) {
6350 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6351 return REDIS_ERR;
6352 }
6353 for (j = 0; j < server.dbnum; j++) {
6354 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6355 redisDb *db = server.db+j;
6356 dict *d = db->dict;
6357 if (dictSize(d) == 0) continue;
6358 di = dictGetIterator(d);
6359 if (!di) {
6360 fclose(fp);
6361 return REDIS_ERR;
6362 }
6363
6364 /* SELECT the new DB */
6365 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
85a83172 6366 if (fwriteBulkLong(fp,j) == 0) goto werr;
9d65a1bb 6367
6368 /* Iterate this DB writing every entry */
6369 while((de = dictNext(di)) != NULL) {
6370 robj *key = dictGetEntryKey(de);
6371 robj *o = dictGetEntryVal(de);
6372 time_t expiretime = getExpire(db,key);
6373
6374 /* Save the key and associated value */
9d65a1bb 6375 if (o->type == REDIS_STRING) {
6376 /* Emit a SET command */
6377 char cmd[]="*3\r\n$3\r\nSET\r\n";
6378 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6379 /* Key and value */
6380 if (fwriteBulk(fp,key) == 0) goto werr;
6381 if (fwriteBulk(fp,o) == 0) goto werr;
6382 } else if (o->type == REDIS_LIST) {
6383 /* Emit the RPUSHes needed to rebuild the list */
6384 list *list = o->ptr;
6385 listNode *ln;
6386
6387 listRewind(list);
6388 while((ln = listYield(list))) {
6389 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6390 robj *eleobj = listNodeValue(ln);
6391
6392 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6393 if (fwriteBulk(fp,key) == 0) goto werr;
6394 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6395 }
6396 } else if (o->type == REDIS_SET) {
6397 /* Emit the SADDs needed to rebuild the set */
6398 dict *set = o->ptr;
6399 dictIterator *di = dictGetIterator(set);
6400 dictEntry *de;
6401
6402 while((de = dictNext(di)) != NULL) {
6403 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6404 robj *eleobj = dictGetEntryKey(de);
6405
6406 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6407 if (fwriteBulk(fp,key) == 0) goto werr;
6408 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6409 }
6410 dictReleaseIterator(di);
6411 } else if (o->type == REDIS_ZSET) {
6412 /* Emit the ZADDs needed to rebuild the sorted set */
6413 zset *zs = o->ptr;
6414 dictIterator *di = dictGetIterator(zs->dict);
6415 dictEntry *de;
6416
6417 while((de = dictNext(di)) != NULL) {
6418 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6419 robj *eleobj = dictGetEntryKey(de);
6420 double *score = dictGetEntryVal(de);
6421
6422 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6423 if (fwriteBulk(fp,key) == 0) goto werr;
6424 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6425 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6426 }
6427 dictReleaseIterator(di);
6428 } else {
dfc5e96c 6429 redisAssert(0 != 0);
9d65a1bb 6430 }
6431 /* Save the expire time */
6432 if (expiretime != -1) {
e96e4fbf 6433 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
9d65a1bb 6434 /* If this key is already expired skip it */
6435 if (expiretime < now) continue;
6436 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6437 if (fwriteBulk(fp,key) == 0) goto werr;
6438 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6439 }
6440 }
6441 dictReleaseIterator(di);
6442 }
6443
6444 /* Make sure data will not remain on the OS's output buffers */
6445 fflush(fp);
6446 fsync(fileno(fp));
6447 fclose(fp);
6448
6449 /* Use RENAME to make sure the DB file is changed atomically only
6450 * if the generate DB file is ok. */
6451 if (rename(tmpfile,filename) == -1) {
6452 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6453 unlink(tmpfile);
6454 return REDIS_ERR;
6455 }
6456 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6457 return REDIS_OK;
6458
6459werr:
6460 fclose(fp);
6461 unlink(tmpfile);
e96e4fbf 6462 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
9d65a1bb 6463 if (di) dictReleaseIterator(di);
6464 return REDIS_ERR;
6465}
6466
6467/* This is how rewriting of the append only file in background works:
6468 *
6469 * 1) The user calls BGREWRITEAOF
6470 * 2) Redis calls this function, that forks():
6471 * 2a) the child rewrite the append only file in a temp file.
6472 * 2b) the parent accumulates differences in server.bgrewritebuf.
6473 * 3) When the child finished '2a' exists.
6474 * 4) The parent will trap the exit code, if it's OK, will append the
6475 * data accumulated into server.bgrewritebuf into the temp file, and
6476 * finally will rename(2) the temp file in the actual file name.
6477 * The the new file is reopened as the new append only file. Profit!
6478 */
6479static int rewriteAppendOnlyFileBackground(void) {
6480 pid_t childpid;
6481
6482 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6483 if ((childpid = fork()) == 0) {
6484 /* Child */
6485 char tmpfile[256];
6486 close(server.fd);
6487
6488 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6489 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6490 exit(0);
6491 } else {
6492 exit(1);
6493 }
6494 } else {
6495 /* Parent */
6496 if (childpid == -1) {
6497 redisLog(REDIS_WARNING,
6498 "Can't rewrite append only file in background: fork: %s",
6499 strerror(errno));
6500 return REDIS_ERR;
6501 }
6502 redisLog(REDIS_NOTICE,
6503 "Background append only file rewriting started by pid %d",childpid);
6504 server.bgrewritechildpid = childpid;
85a83172 6505 /* We set appendseldb to -1 in order to force the next call to the
6506 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6507 * accumulated by the parent into server.bgrewritebuf will start
6508 * with a SELECT statement and it will be safe to merge. */
6509 server.appendseldb = -1;
9d65a1bb 6510 return REDIS_OK;
6511 }
6512 return REDIS_OK; /* unreached */
6513}
6514
6515static void bgrewriteaofCommand(redisClient *c) {
6516 if (server.bgrewritechildpid != -1) {
6517 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6518 return;
6519 }
6520 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
49b99ab4 6521 char *status = "+Background append only file rewriting started\r\n";
6522 addReplySds(c,sdsnew(status));
9d65a1bb 6523 } else {
6524 addReply(c,shared.err);
6525 }
6526}
6527
/* Delete the temp file written by the background AOF rewrite child whose
 * pid is 'childpid' ("temp-rewriteaof-bg-<pid>.aof" in the current
 * directory). The unlink() result is ignored, so a missing file is not an
 * error. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int) childpid);
    unlink(path);
}
6534
7f957c92 6535/* ================================= Debugging ============================== */
6536
6537static void debugCommand(redisClient *c) {
6538 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6539 *((char*)-1) = 'x';
210e29f7 6540 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6541 if (rdbSave(server.dbfilename) != REDIS_OK) {
6542 addReply(c,shared.err);
6543 return;
6544 }
6545 emptyDb();
6546 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6547 addReply(c,shared.err);
6548 return;
6549 }
6550 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6551 addReply(c,shared.ok);
71c2b467 6552 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6553 emptyDb();
6554 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6555 addReply(c,shared.err);
6556 return;
6557 }
6558 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6559 addReply(c,shared.ok);
333298da 6560 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6561 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6562 robj *key, *val;
6563
6564 if (!de) {
6565 addReply(c,shared.nokeyerr);
6566 return;
6567 }
6568 key = dictGetEntryKey(de);
6569 val = dictGetEntryVal(de);
6570 addReplySds(c,sdscatprintf(sdsempty(),
06233c45 6571 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
682ac724 6572 (void*)key, key->refcount, (void*)val, val->refcount,
06233c45 6573 val->encoding, rdbSavedObjectLen(val)));
7f957c92 6574 } else {
333298da 6575 addReplySds(c,sdsnew(
210e29f7 6576 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
7f957c92 6577 }
6578}
56906eef 6579
dfc5e96c 6580static void _redisAssert(char *estr) {
6581 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6582 redisLog(REDIS_WARNING,"==> %s\n",estr);
6583#ifdef HAVE_BACKTRACE
6584 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6585 *((char*)-1) = 'x';
6586#endif
6587}
6588
bcfc686d 6589/* =================================== Main! ================================ */
56906eef 6590
bcfc686d 6591#ifdef __linux__
/* Read the kernel's vm.overcommit_memory setting from
 * /proc/sys/vm/overcommit_memory.
 *
 * Returns the mode (0, 1 or 2, the values the kernel documents), or -1 if
 * the file can't be opened or read, or its content is not one of those
 * values. strtol() is used instead of atoi() so that unparseable content is
 * reported as -1 rather than silently turning into mode 0 (which would
 * trigger a bogus warning in the caller). */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long mode;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    errno = 0;
    mode = strtol(buf,&end,10);
    /* end == buf: no digits at all; ERANGE: overflowed long. */
    if (end == buf || errno == ERANGE || mode < 0 || mode > 2) return -1;
    return (int) mode;
}
6605
6606void linuxOvercommitMemoryWarning(void) {
6607 if (linuxOvercommitMemoryValue() == 0) {
6608 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6609 }
6610}
6611#endif /* __linux__ */
6612
6613static void daemonize(void) {
6614 int fd;
6615 FILE *fp;
6616
6617 if (fork() != 0) exit(0); /* parent exits */
71c54b21 6618 printf("New pid: %d\n", getpid());
bcfc686d 6619 setsid(); /* create a new session */
6620
6621 /* Every output goes to /dev/null. If Redis is daemonized but
6622 * the 'logfile' is set to 'stdout' in the configuration file
6623 * it will not log at all. */
6624 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6625 dup2(fd, STDIN_FILENO);
6626 dup2(fd, STDOUT_FILENO);
6627 dup2(fd, STDERR_FILENO);
6628 if (fd > STDERR_FILENO) close(fd);
6629 }
6630 /* Try to write the pid file */
6631 fp = fopen(server.pidfile,"w");
6632 if (fp) {
6633 fprintf(fp,"%d\n",getpid());
6634 fclose(fp);
56906eef 6635 }
56906eef 6636}
6637
bcfc686d 6638int main(int argc, char **argv) {
6639 initServerConfig();
6640 if (argc == 2) {
6641 resetServerSaveParams();
6642 loadServerConfig(argv[1]);
6643 } else if (argc > 2) {
6644 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6645 exit(1);
6646 } else {
6647 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6648 }
bcfc686d 6649 if (server.daemonize) daemonize();
71c54b21 6650 initServer();
bcfc686d 6651 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6652#ifdef __linux__
6653 linuxOvercommitMemoryWarning();
6654#endif
6655 if (server.appendonly) {
6656 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6657 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6658 } else {
6659 if (rdbLoad(server.dbfilename) == REDIS_OK)
6660 redisLog(REDIS_NOTICE,"DB loaded from disk");
6661 }
6662 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
266373b2 6663 acceptHandler, NULL) == AE_ERR) oom("creating file event");
bcfc686d 6664 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6665 aeMain(server.el);
6666 aeDeleteEventLoop(server.el);
6667 return 0;
6668}
6669
6670/* ============================= Backtrace support ========================= */
6671
6672#ifdef HAVE_BACKTRACE
6673static char *findFuncName(void *pointer, unsigned long *offset);
6674
56906eef 6675static void *getMcontextEip(ucontext_t *uc) {
6676#if defined(__FreeBSD__)
6677 return (void*) uc->uc_mcontext.mc_eip;
6678#elif defined(__dietlibc__)
6679 return (void*) uc->uc_mcontext.eip;
06db1f50 6680#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
da0a1620 6681 #if __x86_64__
6682 return (void*) uc->uc_mcontext->__ss.__rip;
6683 #else
56906eef 6684 return (void*) uc->uc_mcontext->__ss.__eip;
da0a1620 6685 #endif
06db1f50 6686#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
cb7e07cc 6687 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
06db1f50 6688 return (void*) uc->uc_mcontext->__ss.__rip;
cbc59b38 6689 #else
6690 return (void*) uc->uc_mcontext->__ss.__eip;
6691 #endif
c04c9ac9 6692#elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6693 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
b91cf5ef 6694#elif defined(__ia64__) /* Linux IA64 */
6695 return (void*) uc->uc_mcontext.sc_ip;
6696#else
6697 return NULL;
56906eef 6698#endif
6699}
6700
6701static void segvHandler(int sig, siginfo_t *info, void *secret) {
6702 void *trace[100];
6703 char **messages = NULL;
6704 int i, trace_size = 0;
6705 unsigned long offset=0;
56906eef 6706 ucontext_t *uc = (ucontext_t*) secret;
1c85b79f 6707 sds infostring;
56906eef 6708 REDIS_NOTUSED(info);
6709
6710 redisLog(REDIS_WARNING,
6711 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
1c85b79f 6712 infostring = genRedisInfoString();
6713 redisLog(REDIS_WARNING, "%s",infostring);
6714 /* It's not safe to sdsfree() the returned string under memory
6715 * corruption conditions. Let it leak as we are going to abort */
56906eef 6716
6717 trace_size = backtrace(trace, 100);
de96dbfe 6718 /* overwrite sigaction with caller's address */
b91cf5ef 6719 if (getMcontextEip(uc) != NULL) {
6720 trace[1] = getMcontextEip(uc);
6721 }
56906eef 6722 messages = backtrace_symbols(trace, trace_size);
fe3bbfbe 6723
d76412d1 6724 for (i=1; i<trace_size; ++i) {
56906eef 6725 char *fn = findFuncName(trace[i], &offset), *p;
6726
6727 p = strchr(messages[i],'+');
6728 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6729 redisLog(REDIS_WARNING,"%s", messages[i]);
6730 } else {
6731 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6732 }
6733 }
b177fd30 6734 /* free(messages); Don't call free() with possibly corrupted memory. */
56906eef 6735 exit(0);
fe3bbfbe 6736}
56906eef 6737
6738static void setupSigSegvAction(void) {
6739 struct sigaction act;
6740
6741 sigemptyset (&act.sa_mask);
6742 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6743 * is used. Otherwise, sa_handler is used */
6744 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6745 act.sa_sigaction = segvHandler;
6746 sigaction (SIGSEGV, &act, NULL);
6747 sigaction (SIGBUS, &act, NULL);
12fea928 6748 sigaction (SIGFPE, &act, NULL);
6749 sigaction (SIGILL, &act, NULL);
6750 sigaction (SIGBUS, &act, NULL);
e65fdc78 6751 return;
56906eef 6752}
e65fdc78 6753
bcfc686d 6754#include "staticsymbols.h"
6755/* This function try to convert a pointer into a function name. It's used in
6756 * oreder to provide a backtrace under segmentation fault that's able to
6757 * display functions declared as static (otherwise the backtrace is useless). */
6758static char *findFuncName(void *pointer, unsigned long *offset){
6759 int i, ret = -1;
6760 unsigned long off, minoff = 0;
ed9b544e 6761
bcfc686d 6762 /* Try to match against the Symbol with the smallest offset */
6763 for (i=0; symsTable[i].pointer; i++) {
6764 unsigned long lp = (unsigned long) pointer;
0bc03378 6765
bcfc686d 6766 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6767 off=lp-symsTable[i].pointer;
6768 if (ret < 0 || off < minoff) {
6769 minoff=off;
6770 ret=i;
6771 }
6772 }
0bc03378 6773 }
bcfc686d 6774 if (ret == -1) return NULL;
6775 *offset = minoff;
6776 return symsTable[ret].name;
0bc03378 6777}
bcfc686d 6778#else /* HAVE_BACKTRACE */
/* Built without backtrace support: there is no crash handler to install,
 * so this is deliberately a no-op. */
static void setupSigSegvAction(void) {
    /* nothing to do */
}
bcfc686d 6781#endif /* HAVE_BACKTRACE */
0bc03378 6782
ed9b544e 6783
ed9b544e 6784
bcfc686d 6785/* The End */
6786
6787
ed9b544e 6788