]> git.saurik.com Git - redis.git/blame - redis.c
BLPOP timeouts implemented
[redis.git] / redis.c
CommitLineData
ed9b544e 1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
7c8163b8 30#define REDIS_VERSION "1.3.0"
23d4709d 31
32#include "fmacros.h"
fbf9bcdb 33#include "config.h"
ed9b544e 34
35#include <stdio.h>
36#include <stdlib.h>
37#include <string.h>
38#include <time.h>
39#include <unistd.h>
c9468bcf 40#define __USE_POSIX199309
ed9b544e 41#include <signal.h>
fbf9bcdb 42
43#ifdef HAVE_BACKTRACE
c9468bcf 44#include <execinfo.h>
45#include <ucontext.h>
fbf9bcdb 46#endif /* HAVE_BACKTRACE */
47
ed9b544e 48#include <sys/wait.h>
49#include <errno.h>
50#include <assert.h>
51#include <ctype.h>
52#include <stdarg.h>
53#include <inttypes.h>
54#include <arpa/inet.h>
55#include <sys/stat.h>
56#include <fcntl.h>
57#include <sys/time.h>
58#include <sys/resource.h>
2895e862 59#include <sys/uio.h>
f78fd11b 60#include <limits.h>
a7866db6 61#include <math.h>
0bc1b2f6 62
63#if defined(__sun)
5043dff3 64#include "solarisfixes.h"
65#endif
ed9b544e 66
c9468bcf 67#include "redis.h"
ed9b544e 68#include "ae.h" /* Event driven programming library */
69#include "sds.h" /* Dynamic safe strings */
70#include "anet.h" /* Networking the easy way */
71#include "dict.h" /* Hash tables */
72#include "adlist.h" /* Linked lists */
73#include "zmalloc.h" /* total memory usage aware version of malloc/free */
5f5b9840 74#include "lzf.h" /* LZF compression library */
75#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
ed9b544e 76
77/* Error codes */
78#define REDIS_OK 0
79#define REDIS_ERR -1
80
81/* Static server configuration */
82#define REDIS_SERVERPORT 6379 /* TCP port */
83#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
6208b3a7 84#define REDIS_IOBUF_LEN 1024
ed9b544e 85#define REDIS_LOADBUF_LEN 1024
93ea3759 86#define REDIS_STATIC_ARGS 4
ed9b544e 87#define REDIS_DEFAULT_DBNUM 16
88#define REDIS_CONFIGLINE_MAX 1024
89#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
94754ccc 91#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
6f376729 92#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
2895e862 93#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
95/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96#define REDIS_WRITEV_THRESHOLD 3
97/* Max number of iovecs used for each writev call */
98#define REDIS_WRITEV_IOVEC_COUNT 256
ed9b544e 99
100/* Hash table parameters */
101#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
ed9b544e 102
103/* Command flags */
3fd78bcd 104#define REDIS_CMD_BULK 1 /* Bulk write command */
105#define REDIS_CMD_INLINE 2 /* Inline command */
106/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
107 this flags will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short this commands are denied on low memory conditions. */
110#define REDIS_CMD_DENYOOM 4
ed9b544e 111
112/* Object types */
113#define REDIS_STRING 0
114#define REDIS_LIST 1
115#define REDIS_SET 2
1812e024 116#define REDIS_ZSET 3
117#define REDIS_HASH 4
f78fd11b 118
942a3961 119/* Objects encoding */
120#define REDIS_ENCODING_RAW 0 /* Raw representation */
121#define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
f78fd11b 123/* Object types only used for dumping to disk */
bb32ede5 124#define REDIS_EXPIRETIME 253
ed9b544e 125#define REDIS_SELECTDB 254
126#define REDIS_EOF 255
127
f78fd11b 128/* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpreter the length:
131 *
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
a4d1ba9a 135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
f78fd11b 138 *
10c43610 139 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
140 * values, will fit inside. */
f78fd11b 141#define REDIS_RDB_6BITLEN 0
142#define REDIS_RDB_14BITLEN 1
143#define REDIS_RDB_32BITLEN 2
17be1a4a 144#define REDIS_RDB_ENCVAL 3
f78fd11b 145#define REDIS_RDB_LENERR UINT_MAX
146
a4d1ba9a 147/* When a length of a string object stored on disk has the first two bits
148 * set, the remaining two bits specify a special encoding for the object
149 * accordingly to the following defines: */
150#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
774e3047 153#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
a4d1ba9a 154
ed9b544e 155/* Client flags */
156#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157#define REDIS_SLAVE 2 /* This client is a slave server */
158#define REDIS_MASTER 4 /* This client is a master server */
87eca727 159#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
6e469882 160#define REDIS_MULTI 16 /* This client is in a MULTI context */
4409877e 161#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
ed9b544e 162
40d224a9 163/* Slave replication state - slave side */
ed9b544e 164#define REDIS_REPL_NONE 0 /* No active replication */
165#define REDIS_REPL_CONNECT 1 /* Must connect to master */
166#define REDIS_REPL_CONNECTED 2 /* Connected to master */
167
40d224a9 168/* Slave replication state - from the point of view of master
169 * Note that in SEND_BULK and ONLINE state the slave receives new updates
170 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
171 * to start the next background saving in order to send updates to it. */
172#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
173#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
174#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
175#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
176
ed9b544e 177/* List related stuff */
178#define REDIS_HEAD 0
179#define REDIS_TAIL 1
180
181/* Sort operations */
182#define REDIS_SORT_GET 0
443c6409 183#define REDIS_SORT_ASC 1
184#define REDIS_SORT_DESC 2
ed9b544e 185#define REDIS_SORTKEY_MAX 1024
186
187/* Log levels */
188#define REDIS_DEBUG 0
189#define REDIS_NOTICE 1
190#define REDIS_WARNING 2
191
192/* Anti-warning macro... */
193#define REDIS_NOTUSED(V) ((void) V)
194
6b47e12e 195#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
196#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
ed9b544e 197
48f0308a 198/* Append only defines */
199#define APPENDFSYNC_NO 0
200#define APPENDFSYNC_ALWAYS 1
201#define APPENDFSYNC_EVERYSEC 2
202
/* Assertion that is never compiled out (unlike assert(3)). When the
 * expression is false, the stringified expression is handed to
 * _redisAssert() and the process then exits with status 1. We can print
 * the stacktrace, which is why our assert is defined this way. */
#define redisAssert(_e) \
    ((_e) ? (void)0 : (_redisAssert(#_e), exit(1)))
static void _redisAssert(char *estr);
206
ed9b544e 207/*================================= Data types ============================== */
208
/* A redis object: the reference-counted handle used for every value, able
 * to hold a string / list / set (see the REDIS_* object types). */
typedef struct redisObject {
    void *ptr;                  /* payload; meaning depends on type/encoding */
    unsigned char type;         /* one of REDIS_STRING, REDIS_LIST, ... */
    unsigned char encoding;     /* one of REDIS_ENCODING_* */
    unsigned char notused[2];   /* currently unused */
    int refcount;               /* see incrRefCount()/decrRefCount() */
} robj;
217
/* Initialize a Redis string object that lives on the stack (raw encoding,
 * refcount 1, ptr set to _ptr).
 *
 * This macro is kept right next to the structure definition on purpose, so
 * that it gets updated whenever the structure changes (bug #85 was
 * introduced exactly by forgetting that).
 *
 * NOTE(review): the expansion ends with "while(0);", i.e. it already carries
 * its own semicolon. That makes it unsafe inside an unbraced if/else; it is
 * kept as-is because call sites may omit the trailing ';'. */
#define initStaticStringObject(_var,_ptr) do {  \
        _var.refcount = 1;                      \
        _var.type = REDIS_STRING;               \
        _var.encoding = REDIS_ENCODING_RAW;     \
        _var.ptr = _ptr;                        \
    } while(0);
228
3305306f 229typedef struct redisDb {
4409877e 230 dict *dict; /* The keyspace for this DB */
231 dict *expires; /* Timeout of keys with a timeout set */
232 dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
3305306f 233 int id;
234} redisDb;
235
6e469882 236/* Client MULTI/EXEC state */
237typedef struct multiCmd {
238 robj **argv;
239 int argc;
240 struct redisCommand *cmd;
241} multiCmd;
242
243typedef struct multiState {
244 multiCmd *commands; /* Array of MULTI commands */
245 int count; /* Total number of MULTI commands */
246} multiState;
247
ed9b544e 248/* With multiplexing we need to take per-clinet state.
249 * Clients are taken in a liked list. */
250typedef struct redisClient {
251 int fd;
3305306f 252 redisDb *db;
ed9b544e 253 int dictid;
254 sds querybuf;
e8a74421 255 robj **argv, **mbargv;
256 int argc, mbargc;
40d224a9 257 int bulklen; /* bulk read len. -1 if not in bulk read mode */
e8a74421 258 int multibulk; /* multi bulk command format active */
ed9b544e 259 list *reply;
260 int sentlen;
261 time_t lastinteraction; /* time of the last interaction, used for timeout */
40d224a9 262 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
6e469882 263 /* REDIS_MULTI */
40d224a9 264 int slaveseldb; /* slave selected db, if this client is a slave */
265 int authenticated; /* when requirepass is non-NULL */
266 int replstate; /* replication state if this is a slave */
267 int repldbfd; /* replication DB file descriptor */
6e469882 268 long repldboff; /* replication DB file offset */
40d224a9 269 off_t repldbsize; /* replication DB file size */
6e469882 270 multiState mstate; /* MULTI/EXEC state */
4409877e 271 robj *blockingkey; /* The key we waiting to terminate a blocking
272 * operation such as BLPOP. Otherwise NULL. */
273 time_t blockingto; /* Blocking operation timeout. If UNIX current time
274 * is >= blockingto then the operation timed out. */
ed9b544e 275} redisClient;
276
/* A snapshotting rule: a number of seconds paired with a number of changes.
 * NOTE(review): presumably a background save is triggered when at least
 * 'changes' writes happened within 'seconds' — confirm where saveparams is
 * evaluated. */
struct saveparam {
    time_t seconds;
    int changes;
};
281
282/* Global server state structure */
283struct redisServer {
284 int port;
285 int fd;
3305306f 286 redisDb *db;
4409877e 287 dict *sharingpool; /* Poll used for object sharing */
10c43610 288 unsigned int sharingpoolsize;
ed9b544e 289 long long dirty; /* changes to DB from the last save */
290 list *clients;
87eca727 291 list *slaves, *monitors;
ed9b544e 292 char neterr[ANET_ERR_LEN];
293 aeEventLoop *el;
294 int cronloops; /* number of times the cron function run */
295 list *objfreelist; /* A list of freed objects to avoid malloc() */
296 time_t lastsave; /* Unix time of last save succeeede */
5fba9f71 297 size_t usedmemory; /* Used memory in megabytes */
ed9b544e 298 /* Fields used only for stats */
299 time_t stat_starttime; /* server start time */
300 long long stat_numcommands; /* number of processed commands */
301 long long stat_numconnections; /* number of connections received */
302 /* Configuration */
303 int verbosity;
304 int glueoutputbuf;
305 int maxidletime;
306 int dbnum;
307 int daemonize;
44b38ef4 308 int appendonly;
48f0308a 309 int appendfsync;
310 time_t lastfsync;
44b38ef4 311 int appendfd;
312 int appendseldb;
ed329fcf 313 char *pidfile;
9f3c422c 314 pid_t bgsavechildpid;
9d65a1bb 315 pid_t bgrewritechildpid;
316 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
ed9b544e 317 struct saveparam *saveparams;
318 int saveparamslen;
319 char *logfile;
320 char *bindaddr;
321 char *dbfilename;
44b38ef4 322 char *appendfilename;
abcb223e 323 char *requirepass;
10c43610 324 int shareobjects;
121f70cf 325 int rdbcompression;
ed9b544e 326 /* Replication related */
327 int isslave;
d0ccebcf 328 char *masterauth;
ed9b544e 329 char *masterhost;
330 int masterport;
40d224a9 331 redisClient *master; /* client that is master for this slave */
ed9b544e 332 int replstate;
285add55 333 unsigned int maxclients;
d4465900 334 unsigned long maxmemory;
f86a74e9 335 unsigned int blockedclients;
ed9b544e 336 /* Sort parameters - qsort_r() is only available under BSD so we
337 * have to take this state global, in order to pass it to sortCompare() */
338 int sort_desc;
339 int sort_alpha;
340 int sort_bypattern;
341};
342
343typedef void redisCommandProc(redisClient *c);
344struct redisCommand {
345 char *name;
346 redisCommandProc *proc;
347 int arity;
348 int flags;
349};
350
/* Associates a function name with its address, stored as an unsigned long.
 * NOTE(review): presumably used to symbolize raw addresses (e.g. when
 * printing a stack trace) — confirm at the use site. */
struct redisFunctionSym {
    char *name;             /* function name */
    unsigned long pointer;  /* function address */
};
355
ed9b544e 356typedef struct _redisSortObject {
357 robj *obj;
358 union {
359 double score;
360 robj *cmpobj;
361 } u;
362} redisSortObject;
363
364typedef struct _redisSortOperation {
365 int type;
366 robj *pattern;
367} redisSortOperation;
368
6b47e12e 369/* ZSETs use a specialized version of Skiplists */
370
371typedef struct zskiplistNode {
372 struct zskiplistNode **forward;
e3870fab 373 struct zskiplistNode *backward;
6b47e12e 374 double score;
375 robj *obj;
376} zskiplistNode;
377
378typedef struct zskiplist {
e3870fab 379 struct zskiplistNode *header, *tail;
d13f767c 380 unsigned long length;
6b47e12e 381 int level;
382} zskiplist;
383
1812e024 384typedef struct zset {
385 dict *dict;
6b47e12e 386 zskiplist *zsl;
1812e024 387} zset;
388
6b47e12e 389/* Our shared "common" objects */
390
ed9b544e 391struct sharedObjectsStruct {
c937aa89 392 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
6e469882 393 *colon, *nullbulk, *nullmultibulk, *queued,
c937aa89 394 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
395 *outofrangeerr, *plus,
ed9b544e 396 *select0, *select1, *select2, *select3, *select4,
397 *select5, *select6, *select7, *select8, *select9;
398} shared;
399
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
405
ed9b544e 406/*================================ Prototypes =============================== */
407
408static void freeStringObject(robj *o);
409static void freeListObject(robj *o);
410static void freeSetObject(robj *o);
411static void decrRefCount(void *o);
412static robj *createObject(int type, void *ptr);
413static void freeClient(redisClient *c);
f78fd11b 414static int rdbLoad(char *filename);
ed9b544e 415static void addReply(redisClient *c, robj *obj);
416static void addReplySds(redisClient *c, sds s);
417static void incrRefCount(robj *o);
f78fd11b 418static int rdbSaveBackground(char *filename);
ed9b544e 419static robj *createStringObject(char *ptr, size_t len);
87eca727 420static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
44b38ef4 421static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
ed9b544e 422static int syncWithMaster(void);
10c43610 423static robj *tryObjectSharing(robj *o);
942a3961 424static int tryObjectEncoding(robj *o);
9d65a1bb 425static robj *getDecodedObject(robj *o);
3305306f 426static int removeExpire(redisDb *db, robj *key);
427static int expireIfNeeded(redisDb *db, robj *key);
428static int deleteIfVolatile(redisDb *db, robj *key);
94754ccc 429static int deleteKey(redisDb *db, robj *key);
bb32ede5 430static time_t getExpire(redisDb *db, robj *key);
431static int setExpire(redisDb *db, robj *key, time_t when);
a3b21203 432static void updateSlavesWaitingBgsave(int bgsaveerr);
3fd78bcd 433static void freeMemoryIfNeeded(void);
de96dbfe 434static int processCommand(redisClient *c);
56906eef 435static void setupSigSegvAction(void);
a3b21203 436static void rdbRemoveTempFile(pid_t childpid);
9d65a1bb 437static void aofRemoveTempFile(pid_t childpid);
0ea663ea 438static size_t stringObjectLen(robj *o);
638e42ac 439static void processInputBuffer(redisClient *c);
6b47e12e 440static zskiplist *zslCreate(void);
fd8ccf44 441static void zslFree(zskiplist *zsl);
2b59cfdf 442static void zslInsert(zskiplist *zsl, double score, robj *obj);
2895e862 443static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
6e469882 444static void initClientMultiState(redisClient *c);
445static void freeClientMultiState(redisClient *c);
446static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
4409877e 447static void unblockClient(redisClient *c);
448static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
ed9b544e 449
abcb223e 450static void authCommand(redisClient *c);
ed9b544e 451static void pingCommand(redisClient *c);
452static void echoCommand(redisClient *c);
453static void setCommand(redisClient *c);
454static void setnxCommand(redisClient *c);
455static void getCommand(redisClient *c);
456static void delCommand(redisClient *c);
457static void existsCommand(redisClient *c);
458static void incrCommand(redisClient *c);
459static void decrCommand(redisClient *c);
460static void incrbyCommand(redisClient *c);
461static void decrbyCommand(redisClient *c);
462static void selectCommand(redisClient *c);
463static void randomkeyCommand(redisClient *c);
464static void keysCommand(redisClient *c);
465static void dbsizeCommand(redisClient *c);
466static void lastsaveCommand(redisClient *c);
467static void saveCommand(redisClient *c);
468static void bgsaveCommand(redisClient *c);
9d65a1bb 469static void bgrewriteaofCommand(redisClient *c);
ed9b544e 470static void shutdownCommand(redisClient *c);
471static void moveCommand(redisClient *c);
472static void renameCommand(redisClient *c);
473static void renamenxCommand(redisClient *c);
474static void lpushCommand(redisClient *c);
475static void rpushCommand(redisClient *c);
476static void lpopCommand(redisClient *c);
477static void rpopCommand(redisClient *c);
478static void llenCommand(redisClient *c);
479static void lindexCommand(redisClient *c);
480static void lrangeCommand(redisClient *c);
481static void ltrimCommand(redisClient *c);
482static void typeCommand(redisClient *c);
483static void lsetCommand(redisClient *c);
484static void saddCommand(redisClient *c);
485static void sremCommand(redisClient *c);
a4460ef4 486static void smoveCommand(redisClient *c);
ed9b544e 487static void sismemberCommand(redisClient *c);
488static void scardCommand(redisClient *c);
12fea928 489static void spopCommand(redisClient *c);
2abb95a9 490static void srandmemberCommand(redisClient *c);
ed9b544e 491static void sinterCommand(redisClient *c);
492static void sinterstoreCommand(redisClient *c);
40d224a9 493static void sunionCommand(redisClient *c);
494static void sunionstoreCommand(redisClient *c);
f4f56e1d 495static void sdiffCommand(redisClient *c);
496static void sdiffstoreCommand(redisClient *c);
ed9b544e 497static void syncCommand(redisClient *c);
498static void flushdbCommand(redisClient *c);
499static void flushallCommand(redisClient *c);
500static void sortCommand(redisClient *c);
501static void lremCommand(redisClient *c);
0f5f7e9a 502static void rpoplpushcommand(redisClient *c);
ed9b544e 503static void infoCommand(redisClient *c);
70003d28 504static void mgetCommand(redisClient *c);
87eca727 505static void monitorCommand(redisClient *c);
3305306f 506static void expireCommand(redisClient *c);
802e8373 507static void expireatCommand(redisClient *c);
f6b141c5 508static void getsetCommand(redisClient *c);
fd88489a 509static void ttlCommand(redisClient *c);
321b0e13 510static void slaveofCommand(redisClient *c);
7f957c92 511static void debugCommand(redisClient *c);
f6b141c5 512static void msetCommand(redisClient *c);
513static void msetnxCommand(redisClient *c);
fd8ccf44 514static void zaddCommand(redisClient *c);
7db723ad 515static void zincrbyCommand(redisClient *c);
cc812361 516static void zrangeCommand(redisClient *c);
50c55df5 517static void zrangebyscoreCommand(redisClient *c);
e3870fab 518static void zrevrangeCommand(redisClient *c);
3c41331e 519static void zcardCommand(redisClient *c);
1b7106e7 520static void zremCommand(redisClient *c);
6e333bbe 521static void zscoreCommand(redisClient *c);
1807985b 522static void zremrangebyscoreCommand(redisClient *c);
6e469882 523static void multiCommand(redisClient *c);
524static void execCommand(redisClient *c);
4409877e 525static void blpopCommand(redisClient *c);
526static void brpopCommand(redisClient *c);
f6b141c5 527
ed9b544e 528/*================================= Globals ================================= */
529
530/* Global vars */
531static struct redisServer server; /* server global state */
532static struct redisCommand cmdTable[] = {
533 {"get",getCommand,2,REDIS_CMD_INLINE},
3fd78bcd 534 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
535 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
5109cdff 536 {"del",delCommand,-2,REDIS_CMD_INLINE},
ed9b544e 537 {"exists",existsCommand,2,REDIS_CMD_INLINE},
3fd78bcd 538 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
539 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
70003d28 540 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
3fd78bcd 541 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
542 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 543 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
544 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
4409877e 545 {"brpop",brpopCommand,3,REDIS_CMD_INLINE},
546 {"blpop",blpopCommand,3,REDIS_CMD_INLINE},
ed9b544e 547 {"llen",llenCommand,2,REDIS_CMD_INLINE},
548 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
3fd78bcd 549 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 550 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
551 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
552 {"lrem",lremCommand,4,REDIS_CMD_BULK},
0b13687c 553 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
3fd78bcd 554 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 555 {"srem",sremCommand,3,REDIS_CMD_BULK},
a4460ef4 556 {"smove",smoveCommand,4,REDIS_CMD_BULK},
ed9b544e 557 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
558 {"scard",scardCommand,2,REDIS_CMD_INLINE},
12fea928 559 {"spop",spopCommand,2,REDIS_CMD_INLINE},
2abb95a9 560 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
3fd78bcd 561 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
562 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
563 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
564 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
565 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
566 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 567 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
fd8ccf44 568 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
7db723ad 569 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
1b7106e7 570 {"zrem",zremCommand,3,REDIS_CMD_BULK},
1807985b 571 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
752da584 572 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
80181f78 573 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
752da584 574 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
3c41331e 575 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
6e333bbe 576 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
3fd78bcd 577 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
578 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
f6b141c5 579 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
580 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
581 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
ed9b544e 582 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
583 {"select",selectCommand,2,REDIS_CMD_INLINE},
584 {"move",moveCommand,3,REDIS_CMD_INLINE},
585 {"rename",renameCommand,3,REDIS_CMD_INLINE},
586 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
321b0e13 587 {"expire",expireCommand,3,REDIS_CMD_INLINE},
802e8373 588 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
ed9b544e 589 {"keys",keysCommand,2,REDIS_CMD_INLINE},
590 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
abcb223e 591 {"auth",authCommand,2,REDIS_CMD_INLINE},
ed9b544e 592 {"ping",pingCommand,1,REDIS_CMD_INLINE},
593 {"echo",echoCommand,2,REDIS_CMD_BULK},
594 {"save",saveCommand,1,REDIS_CMD_INLINE},
595 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
9d65a1bb 596 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
ed9b544e 597 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
598 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
599 {"type",typeCommand,2,REDIS_CMD_INLINE},
6e469882 600 {"multi",multiCommand,1,REDIS_CMD_INLINE},
601 {"exec",execCommand,1,REDIS_CMD_INLINE},
ed9b544e 602 {"sync",syncCommand,1,REDIS_CMD_INLINE},
603 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
604 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
3fd78bcd 605 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
ed9b544e 606 {"info",infoCommand,1,REDIS_CMD_INLINE},
87eca727 607 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
fd88489a 608 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
321b0e13 609 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
7f957c92 610 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
ed9b544e 611 {NULL,NULL,0,0}
612};
bcfc686d 613
ed9b544e 614/*============================ Utility functions ============================ */
615
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. When 'nocase' is non-zero
 * bytes are compared after tolower().
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte from the set; [^abc] negates the set; [a-z] is a range
 *            (bounds may be given in either order); \x inside a set matches
 *            x literally. An unterminated '[' uses the rest of the pattern.
 *   \x       the byte x literally
 *
 * Every access is bounded by patternLen / stringLen, so neither buffer has
 * to be NUL-terminated. Bytes are passed to tolower() as unsigned char, as
 * required by <ctype.h>. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* A run of '*' is equivalent to a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern against every non-empty suffix. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A character class consumes one byte: it can never match an
             * exhausted string (previously stringLen could go negative). */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so the pattern++ /
                     * patternLen-- after the switch lands on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else if (!nocase) {
                    if (pattern[0] == string[0])
                        match = 1;
                } else {
                    if (tolower((unsigned char)pattern[0]) ==
                        tolower((unsigned char)string[0]))
                        match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Trailing '*'s match the empty remainder; stay within
             * patternLen instead of relying on a NUL terminator. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
738
56906eef 739static void redisLog(int level, const char *fmt, ...) {
ed9b544e 740 va_list ap;
741 FILE *fp;
742
743 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
744 if (!fp) return;
745
746 va_start(ap, fmt);
747 if (level >= server.verbosity) {
748 char *c = ".-*";
1904ecc1 749 char buf[64];
750 time_t now;
751
752 now = time(NULL);
6c9385e0 753 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
1904ecc1 754 fprintf(fp,"%s %c ",buf,c[level]);
ed9b544e 755 vfprintf(fp, fmt, ap);
756 fprintf(fp,"\n");
757 fflush(fp);
758 }
759 va_end(ap);
760
761 if (server.logfile) fclose(fp);
762}
763
764/*====================== Hash table type implementation ==================== */
765
766/* This is an hash table type that uses the SDS dynamic strings libary as
767 * keys and radis objects as values (objects can hold SDS strings,
768 * lists, sets). */
769
/* Dict destructor for plain zmalloc()ed memory: releases 'val' with zfree().
 * The privdata argument is ignored. */
static void dictVanillaFree(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    zfree(val);
}
775
4409877e 776static void dictListDestructor(void *privdata, void *val)
777{
778 DICT_NOTUSED(privdata);
779 listRelease((list*)val);
780}
781
ed9b544e 782static int sdsDictKeyCompare(void *privdata, const void *key1,
783 const void *key2)
784{
785 int l1,l2;
786 DICT_NOTUSED(privdata);
787
788 l1 = sdslen((sds)key1);
789 l2 = sdslen((sds)key2);
790 if (l1 != l2) return 0;
791 return memcmp(key1, key2, l1) == 0;
792}
793
/* Dict destructor for keys/values that are Redis objects: drops one
 * reference via decrRefCount(). The privdata argument is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
800
942a3961 801static int dictObjKeyCompare(void *privdata, const void *key1,
ed9b544e 802 const void *key2)
803{
804 const robj *o1 = key1, *o2 = key2;
805 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
806}
807
942a3961 808static unsigned int dictObjHash(const void *key) {
ed9b544e 809 const robj *o = key;
810 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
811}
812
942a3961 813static int dictEncObjKeyCompare(void *privdata, const void *key1,
814 const void *key2)
815{
9d65a1bb 816 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
817 int cmp;
942a3961 818
9d65a1bb 819 o1 = getDecodedObject(o1);
820 o2 = getDecodedObject(o2);
821 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
822 decrRefCount(o1);
823 decrRefCount(o2);
824 return cmp;
942a3961 825}
826
827static unsigned int dictEncObjHash(const void *key) {
9d65a1bb 828 robj *o = (robj*) key;
942a3961 829
9d65a1bb 830 o = getDecodedObject(o);
831 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
832 decrRefCount(o);
833 return hash;
942a3961 834}
835
ed9b544e 836static dictType setDictType = {
942a3961 837 dictEncObjHash, /* hash function */
ed9b544e 838 NULL, /* key dup */
839 NULL, /* val dup */
942a3961 840 dictEncObjKeyCompare, /* key compare */
ed9b544e 841 dictRedisObjectDestructor, /* key destructor */
842 NULL /* val destructor */
843};
844
1812e024 845static dictType zsetDictType = {
846 dictEncObjHash, /* hash function */
847 NULL, /* key dup */
848 NULL, /* val dup */
849 dictEncObjKeyCompare, /* key compare */
850 dictRedisObjectDestructor, /* key destructor */
da0a1620 851 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
1812e024 852};
853
ed9b544e 854static dictType hashDictType = {
942a3961 855 dictObjHash, /* hash function */
ed9b544e 856 NULL, /* key dup */
857 NULL, /* val dup */
942a3961 858 dictObjKeyCompare, /* key compare */
ed9b544e 859 dictRedisObjectDestructor, /* key destructor */
860 dictRedisObjectDestructor /* val destructor */
861};
862
4409877e 863/* Keylist hash table type has unencoded redis objects as keys and
864 * lists as values. It's used for blocking operations (BLPOP) */
865static dictType keylistDictType = {
866 dictObjHash, /* hash function */
867 NULL, /* key dup */
868 NULL, /* val dup */
869 dictObjKeyCompare, /* key compare */
870 dictRedisObjectDestructor, /* key destructor */
871 dictListDestructor /* val destructor */
872};
873
ed9b544e 874/* ========================= Random utility functions ======================= */
875
876/* Redis generally does not try to recover from out of memory conditions
877 * when allocating objects or strings, it is not clear if it will be possible
878 * to report this condition to the client since the networking layer itself
879 * is based on heap allocation for send buffers, so we simply abort.
880 * At least the code will be simpler to read... */
881static void oom(const char *msg) {
71c54b21 882 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
ed9b544e 883 sleep(1);
884 abort();
885}
886
887/* ====================== Redis server networking stuff ===================== */
56906eef 888static void closeTimedoutClients(void) {
ed9b544e 889 redisClient *c;
ed9b544e 890 listNode *ln;
891 time_t now = time(NULL);
892
6208b3a7 893 listRewind(server.clients);
894 while ((ln = listYield(server.clients)) != NULL) {
ed9b544e 895 c = listNodeValue(ln);
f86a74e9 896 if (server.maxidletime &&
897 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
c7cf2ec9 898 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
f86a74e9 899 (now - c->lastinteraction > server.maxidletime))
900 {
ed9b544e 901 redisLog(REDIS_DEBUG,"Closing idle client");
902 freeClient(c);
f86a74e9 903 } else if (c->flags & REDIS_BLOCKED) {
904 if (c->blockingto < now) {
905 addReply(c,shared.nullbulk);
906 unblockClient(c);
907 }
ed9b544e 908 }
909 }
ed9b544e 910}
911
12fea928 912static int htNeedsResize(dict *dict) {
913 long long size, used;
914
915 size = dictSlots(dict);
916 used = dictSize(dict);
917 return (size && used && size > DICT_HT_INITIAL_SIZE &&
918 (used*100/size < REDIS_HT_MINFILL));
919}
920
0bc03378 921/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
922 * we resize the hash table to save memory */
56906eef 923static void tryResizeHashTables(void) {
0bc03378 924 int j;
925
926 for (j = 0; j < server.dbnum; j++) {
12fea928 927 if (htNeedsResize(server.db[j].dict)) {
928 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
0bc03378 929 dictResize(server.db[j].dict);
12fea928 930 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
0bc03378 931 }
12fea928 932 if (htNeedsResize(server.db[j].expires))
933 dictResize(server.db[j].expires);
0bc03378 934 }
935}
936
9d65a1bb 937/* A background saving child (BGSAVE) terminated its work. Handle this. */
938void backgroundSaveDoneHandler(int statloc) {
939 int exitcode = WEXITSTATUS(statloc);
940 int bysignal = WIFSIGNALED(statloc);
941
942 if (!bysignal && exitcode == 0) {
943 redisLog(REDIS_NOTICE,
944 "Background saving terminated with success");
945 server.dirty = 0;
946 server.lastsave = time(NULL);
947 } else if (!bysignal && exitcode != 0) {
948 redisLog(REDIS_WARNING, "Background saving error");
949 } else {
950 redisLog(REDIS_WARNING,
951 "Background saving terminated by signal");
952 rdbRemoveTempFile(server.bgsavechildpid);
953 }
954 server.bgsavechildpid = -1;
955 /* Possibly there are slaves waiting for a BGSAVE in order to be served
956 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
957 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
958}
959
960/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
961 * Handle this. */
962void backgroundRewriteDoneHandler(int statloc) {
963 int exitcode = WEXITSTATUS(statloc);
964 int bysignal = WIFSIGNALED(statloc);
965
966 if (!bysignal && exitcode == 0) {
967 int fd;
968 char tmpfile[256];
969
970 redisLog(REDIS_NOTICE,
971 "Background append only file rewriting terminated with success");
972 /* Now it's time to flush the differences accumulated by the parent */
973 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
974 fd = open(tmpfile,O_WRONLY|O_APPEND);
975 if (fd == -1) {
976 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
977 goto cleanup;
978 }
979 /* Flush our data... */
980 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
981 (signed) sdslen(server.bgrewritebuf)) {
982 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
983 close(fd);
984 goto cleanup;
985 }
b32627cd 986 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
9d65a1bb 987 /* Now our work is to rename the temp file into the stable file. And
988 * switch the file descriptor used by the server for append only. */
989 if (rename(tmpfile,server.appendfilename) == -1) {
990 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
991 close(fd);
992 goto cleanup;
993 }
994 /* Mission completed... almost */
995 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
996 if (server.appendfd != -1) {
997 /* If append only is actually enabled... */
998 close(server.appendfd);
999 server.appendfd = fd;
1000 fsync(fd);
85a83172 1001 server.appendseldb = -1; /* Make sure it will issue SELECT */
9d65a1bb 1002 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1003 } else {
1004 /* If append only is disabled we just generate a dump in this
1005 * format. Why not? */
1006 close(fd);
1007 }
1008 } else if (!bysignal && exitcode != 0) {
1009 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1010 } else {
1011 redisLog(REDIS_WARNING,
1012 "Background append only file rewriting terminated by signal");
1013 }
1014cleanup:
1015 sdsfree(server.bgrewritebuf);
1016 server.bgrewritebuf = sdsempty();
1017 aofRemoveTempFile(server.bgrewritechildpid);
1018 server.bgrewritechildpid = -1;
1019}
1020
56906eef 1021static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
94754ccc 1022 int j, loops = server.cronloops++;
ed9b544e 1023 REDIS_NOTUSED(eventLoop);
1024 REDIS_NOTUSED(id);
1025 REDIS_NOTUSED(clientData);
1026
1027 /* Update the global state with the amount of used memory */
1028 server.usedmemory = zmalloc_used_memory();
1029
0bc03378 1030 /* Show some info about non-empty databases */
ed9b544e 1031 for (j = 0; j < server.dbnum; j++) {
dec423d9 1032 long long size, used, vkeys;
94754ccc 1033
3305306f 1034 size = dictSlots(server.db[j].dict);
1035 used = dictSize(server.db[j].dict);
94754ccc 1036 vkeys = dictSize(server.db[j].expires);
c3cb078d 1037 if (!(loops % 5) && (used || vkeys)) {
1038 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
a4d1ba9a 1039 /* dictPrintStats(server.dict); */
ed9b544e 1040 }
ed9b544e 1041 }
1042
0bc03378 1043 /* We don't want to resize the hash tables while a bacground saving
1044 * is in progress: the saving child is created using fork() that is
1045 * implemented with a copy-on-write semantic in most modern systems, so
1046 * if we resize the HT while there is the saving child at work actually
1047 * a lot of memory movements in the parent will cause a lot of pages
1048 * copied. */
9d65a1bb 1049 if (server.bgsavechildpid == -1) tryResizeHashTables();
0bc03378 1050
ed9b544e 1051 /* Show information about connected clients */
1052 if (!(loops % 5)) {
21aecf4b 1053 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
ed9b544e 1054 listLength(server.clients)-listLength(server.slaves),
1055 listLength(server.slaves),
10c43610 1056 server.usedmemory,
3305306f 1057 dictSize(server.sharingpool));
ed9b544e 1058 }
1059
1060 /* Close connections of timedout clients */
f86a74e9 1061 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
ed9b544e 1062 closeTimedoutClients();
1063
9d65a1bb 1064 /* Check if a background saving or AOF rewrite in progress terminated */
1065 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
ed9b544e 1066 int statloc;
9d65a1bb 1067 pid_t pid;
1068
1069 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1070 if (pid == server.bgsavechildpid) {
1071 backgroundSaveDoneHandler(statloc);
ed9b544e 1072 } else {
9d65a1bb 1073 backgroundRewriteDoneHandler(statloc);
ed9b544e 1074 }
ed9b544e 1075 }
1076 } else {
1077 /* If there is not a background saving in progress check if
1078 * we have to save now */
1079 time_t now = time(NULL);
1080 for (j = 0; j < server.saveparamslen; j++) {
1081 struct saveparam *sp = server.saveparams+j;
1082
1083 if (server.dirty >= sp->changes &&
1084 now-server.lastsave > sp->seconds) {
1085 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1086 sp->changes, sp->seconds);
f78fd11b 1087 rdbSaveBackground(server.dbfilename);
ed9b544e 1088 break;
1089 }
1090 }
1091 }
94754ccc 1092
f2324293 1093 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1094 * will use few CPU cycles if there are few expiring keys, otherwise
1095 * it will get more aggressive to avoid that too much memory is used by
1096 * keys that can be removed from the keyspace. */
94754ccc 1097 for (j = 0; j < server.dbnum; j++) {
f2324293 1098 int expired;
94754ccc 1099 redisDb *db = server.db+j;
94754ccc 1100
f2324293 1101 /* Continue to expire if at the end of the cycle more than 25%
1102 * of the keys were expired. */
1103 do {
1104 int num = dictSize(db->expires);
94754ccc 1105 time_t now = time(NULL);
1106
f2324293 1107 expired = 0;
94754ccc 1108 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1109 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1110 while (num--) {
1111 dictEntry *de;
1112 time_t t;
1113
1114 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1115 t = (time_t) dictGetEntryVal(de);
1116 if (now > t) {
1117 deleteKey(db,dictGetEntryKey(de));
f2324293 1118 expired++;
94754ccc 1119 }
1120 }
f2324293 1121 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
94754ccc 1122 }
1123
ed9b544e 1124 /* Check if we should connect to a MASTER */
1125 if (server.replstate == REDIS_REPL_CONNECT) {
1126 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1127 if (syncWithMaster() == REDIS_OK) {
1128 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1129 }
1130 }
1131 return 1000;
1132}
1133
1134static void createSharedObjects(void) {
1135 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1136 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1137 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
c937aa89 1138 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1139 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1140 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1141 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1142 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1143 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
ed9b544e 1144 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
6e469882 1145 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
ed9b544e 1146 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1147 "-ERR Operation against a key holding the wrong kind of value\r\n"));
ed9b544e 1148 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1149 "-ERR no such key\r\n"));
ed9b544e 1150 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1151 "-ERR syntax error\r\n"));
c937aa89 1152 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1153 "-ERR source and destination objects are the same\r\n"));
1154 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1155 "-ERR index out of range\r\n"));
ed9b544e 1156 shared.space = createObject(REDIS_STRING,sdsnew(" "));
c937aa89 1157 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1158 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
ed9b544e 1159 shared.select0 = createStringObject("select 0\r\n",10);
1160 shared.select1 = createStringObject("select 1\r\n",10);
1161 shared.select2 = createStringObject("select 2\r\n",10);
1162 shared.select3 = createStringObject("select 3\r\n",10);
1163 shared.select4 = createStringObject("select 4\r\n",10);
1164 shared.select5 = createStringObject("select 5\r\n",10);
1165 shared.select6 = createStringObject("select 6\r\n",10);
1166 shared.select7 = createStringObject("select 7\r\n",10);
1167 shared.select8 = createStringObject("select 8\r\n",10);
1168 shared.select9 = createStringObject("select 9\r\n",10);
1169}
1170
1171static void appendServerSaveParams(time_t seconds, int changes) {
1172 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
ed9b544e 1173 server.saveparams[server.saveparamslen].seconds = seconds;
1174 server.saveparams[server.saveparamslen].changes = changes;
1175 server.saveparamslen++;
1176}
1177
bcfc686d 1178static void resetServerSaveParams() {
ed9b544e 1179 zfree(server.saveparams);
1180 server.saveparams = NULL;
1181 server.saveparamslen = 0;
1182}
1183
1184static void initServerConfig() {
1185 server.dbnum = REDIS_DEFAULT_DBNUM;
1186 server.port = REDIS_SERVERPORT;
1187 server.verbosity = REDIS_DEBUG;
1188 server.maxidletime = REDIS_MAXIDLETIME;
1189 server.saveparams = NULL;
1190 server.logfile = NULL; /* NULL = log on standard output */
1191 server.bindaddr = NULL;
1192 server.glueoutputbuf = 1;
1193 server.daemonize = 0;
44b38ef4 1194 server.appendonly = 0;
4e141d5a 1195 server.appendfsync = APPENDFSYNC_ALWAYS;
48f0308a 1196 server.lastfsync = time(NULL);
44b38ef4 1197 server.appendfd = -1;
1198 server.appendseldb = -1; /* Make sure the first time will not match */
ed329fcf 1199 server.pidfile = "/var/run/redis.pid";
ed9b544e 1200 server.dbfilename = "dump.rdb";
9d65a1bb 1201 server.appendfilename = "appendonly.aof";
abcb223e 1202 server.requirepass = NULL;
10c43610 1203 server.shareobjects = 0;
b0553789 1204 server.rdbcompression = 1;
21aecf4b 1205 server.sharingpoolsize = 1024;
285add55 1206 server.maxclients = 0;
f86a74e9 1207 server.blockedclients = 0;
3fd78bcd 1208 server.maxmemory = 0;
bcfc686d 1209 resetServerSaveParams();
ed9b544e 1210
1211 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1212 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1213 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1214 /* Replication related */
1215 server.isslave = 0;
d0ccebcf 1216 server.masterauth = NULL;
ed9b544e 1217 server.masterhost = NULL;
1218 server.masterport = 6379;
1219 server.master = NULL;
1220 server.replstate = REDIS_REPL_NONE;
a7866db6 1221
1222 /* Double constants initialization */
1223 R_Zero = 0.0;
1224 R_PosInf = 1.0/R_Zero;
1225 R_NegInf = -1.0/R_Zero;
1226 R_Nan = R_Zero/R_Zero;
ed9b544e 1227}
1228
1229static void initServer() {
1230 int j;
1231
1232 signal(SIGHUP, SIG_IGN);
1233 signal(SIGPIPE, SIG_IGN);
fe3bbfbe 1234 setupSigSegvAction();
ed9b544e 1235
1236 server.clients = listCreate();
1237 server.slaves = listCreate();
87eca727 1238 server.monitors = listCreate();
ed9b544e 1239 server.objfreelist = listCreate();
1240 createSharedObjects();
1241 server.el = aeCreateEventLoop();
3305306f 1242 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
10c43610 1243 server.sharingpool = dictCreate(&setDictType,NULL);
ed9b544e 1244 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1245 if (server.fd == -1) {
1246 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1247 exit(1);
1248 }
3305306f 1249 for (j = 0; j < server.dbnum; j++) {
1250 server.db[j].dict = dictCreate(&hashDictType,NULL);
1251 server.db[j].expires = dictCreate(&setDictType,NULL);
4409877e 1252 server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
3305306f 1253 server.db[j].id = j;
1254 }
ed9b544e 1255 server.cronloops = 0;
9f3c422c 1256 server.bgsavechildpid = -1;
9d65a1bb 1257 server.bgrewritechildpid = -1;
1258 server.bgrewritebuf = sdsempty();
ed9b544e 1259 server.lastsave = time(NULL);
1260 server.dirty = 0;
1261 server.usedmemory = 0;
1262 server.stat_numcommands = 0;
1263 server.stat_numconnections = 0;
1264 server.stat_starttime = time(NULL);
d8f8b666 1265 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
44b38ef4 1266
1267 if (server.appendonly) {
71eba477 1268 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
44b38ef4 1269 if (server.appendfd == -1) {
1270 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1271 strerror(errno));
1272 exit(1);
1273 }
1274 }
ed9b544e 1275}
1276
1277/* Empty the whole database */
ca37e9cd 1278static long long emptyDb() {
ed9b544e 1279 int j;
ca37e9cd 1280 long long removed = 0;
ed9b544e 1281
3305306f 1282 for (j = 0; j < server.dbnum; j++) {
ca37e9cd 1283 removed += dictSize(server.db[j].dict);
3305306f 1284 dictEmpty(server.db[j].dict);
1285 dictEmpty(server.db[j].expires);
1286 }
ca37e9cd 1287 return removed;
ed9b544e 1288}
1289
/* Parse a boolean configuration value, case-insensitively.
 * Returns 1 for "yes", 0 for "no", -1 for anything else. */
static int yesnotoi(char *s) {
    return strcasecmp(s,"yes") == 0 ? 1 :
           strcasecmp(s,"no") == 0 ? 0 : -1;
}
1295
ed9b544e 1296/* I agree, this is a very rudimental way to load a configuration...
1297 will improve later if the config gets more complex */
1298static void loadServerConfig(char *filename) {
c9a111ac 1299 FILE *fp;
ed9b544e 1300 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1301 int linenum = 0;
1302 sds line = NULL;
c9a111ac 1303
1304 if (filename[0] == '-' && filename[1] == '\0')
1305 fp = stdin;
1306 else {
1307 if ((fp = fopen(filename,"r")) == NULL) {
1308 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1309 exit(1);
1310 }
ed9b544e 1311 }
c9a111ac 1312
ed9b544e 1313 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1314 sds *argv;
1315 int argc, j;
1316
1317 linenum++;
1318 line = sdsnew(buf);
1319 line = sdstrim(line," \t\r\n");
1320
1321 /* Skip comments and blank lines*/
1322 if (line[0] == '#' || line[0] == '\0') {
1323 sdsfree(line);
1324 continue;
1325 }
1326
1327 /* Split into arguments */
1328 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1329 sdstolower(argv[0]);
1330
1331 /* Execute config directives */
bb0b03a3 1332 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
ed9b544e 1333 server.maxidletime = atoi(argv[1]);
0150db36 1334 if (server.maxidletime < 0) {
ed9b544e 1335 err = "Invalid timeout value"; goto loaderr;
1336 }
bb0b03a3 1337 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
ed9b544e 1338 server.port = atoi(argv[1]);
1339 if (server.port < 1 || server.port > 65535) {
1340 err = "Invalid port"; goto loaderr;
1341 }
bb0b03a3 1342 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
ed9b544e 1343 server.bindaddr = zstrdup(argv[1]);
bb0b03a3 1344 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
ed9b544e 1345 int seconds = atoi(argv[1]);
1346 int changes = atoi(argv[2]);
1347 if (seconds < 1 || changes < 0) {
1348 err = "Invalid save parameters"; goto loaderr;
1349 }
1350 appendServerSaveParams(seconds,changes);
bb0b03a3 1351 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
ed9b544e 1352 if (chdir(argv[1]) == -1) {
1353 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1354 argv[1], strerror(errno));
1355 exit(1);
1356 }
bb0b03a3 1357 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1358 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1359 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1360 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
ed9b544e 1361 else {
1362 err = "Invalid log level. Must be one of debug, notice, warning";
1363 goto loaderr;
1364 }
bb0b03a3 1365 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
c9a111ac 1366 FILE *logfp;
ed9b544e 1367
1368 server.logfile = zstrdup(argv[1]);
bb0b03a3 1369 if (!strcasecmp(server.logfile,"stdout")) {
ed9b544e 1370 zfree(server.logfile);
1371 server.logfile = NULL;
1372 }
1373 if (server.logfile) {
1374 /* Test if we are able to open the file. The server will not
1375 * be able to abort just for this problem later... */
c9a111ac 1376 logfp = fopen(server.logfile,"a");
1377 if (logfp == NULL) {
ed9b544e 1378 err = sdscatprintf(sdsempty(),
1379 "Can't open the log file: %s", strerror(errno));
1380 goto loaderr;
1381 }
c9a111ac 1382 fclose(logfp);
ed9b544e 1383 }
bb0b03a3 1384 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
ed9b544e 1385 server.dbnum = atoi(argv[1]);
1386 if (server.dbnum < 1) {
1387 err = "Invalid number of databases"; goto loaderr;
1388 }
285add55 1389 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1390 server.maxclients = atoi(argv[1]);
3fd78bcd 1391 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
d4465900 1392 server.maxmemory = strtoll(argv[1], NULL, 10);
bb0b03a3 1393 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
ed9b544e 1394 server.masterhost = sdsnew(argv[1]);
1395 server.masterport = atoi(argv[2]);
1396 server.replstate = REDIS_REPL_CONNECT;
d0ccebcf 1397 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1398 server.masterauth = zstrdup(argv[1]);
bb0b03a3 1399 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
85dd2f3a 1400 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
ed9b544e 1401 err = "argument must be 'yes' or 'no'"; goto loaderr;
1402 }
bb0b03a3 1403 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
85dd2f3a 1404 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
10c43610 1405 err = "argument must be 'yes' or 'no'"; goto loaderr;
1406 }
121f70cf 1407 } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
1408 if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
1409 err = "argument must be 'yes' or 'no'"; goto loaderr;
1410 }
e52c65b9 1411 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1412 server.sharingpoolsize = atoi(argv[1]);
1413 if (server.sharingpoolsize < 1) {
1414 err = "invalid object sharing pool size"; goto loaderr;
1415 }
bb0b03a3 1416 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
85dd2f3a 1417 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
ed9b544e 1418 err = "argument must be 'yes' or 'no'"; goto loaderr;
1419 }
44b38ef4 1420 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1421 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1422 err = "argument must be 'yes' or 'no'"; goto loaderr;
1423 }
48f0308a 1424 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1766c6da 1425 if (!strcasecmp(argv[1],"no")) {
48f0308a 1426 server.appendfsync = APPENDFSYNC_NO;
1766c6da 1427 } else if (!strcasecmp(argv[1],"always")) {
48f0308a 1428 server.appendfsync = APPENDFSYNC_ALWAYS;
1766c6da 1429 } else if (!strcasecmp(argv[1],"everysec")) {
48f0308a 1430 server.appendfsync = APPENDFSYNC_EVERYSEC;
1431 } else {
1432 err = "argument must be 'no', 'always' or 'everysec'";
1433 goto loaderr;
1434 }
bb0b03a3 1435 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
abcb223e 1436 server.requirepass = zstrdup(argv[1]);
bb0b03a3 1437 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
ed329fcf 1438 server.pidfile = zstrdup(argv[1]);
bb0b03a3 1439 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
b8b553c8 1440 server.dbfilename = zstrdup(argv[1]);
ed9b544e 1441 } else {
1442 err = "Bad directive or wrong number of arguments"; goto loaderr;
1443 }
1444 for (j = 0; j < argc; j++)
1445 sdsfree(argv[j]);
1446 zfree(argv);
1447 sdsfree(line);
1448 }
c9a111ac 1449 if (fp != stdin) fclose(fp);
ed9b544e 1450 return;
1451
1452loaderr:
1453 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1454 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1455 fprintf(stderr, ">>> '%s'\n", line);
1456 fprintf(stderr, "%s\n", err);
1457 exit(1);
1458}
1459
1460static void freeClientArgv(redisClient *c) {
1461 int j;
1462
1463 for (j = 0; j < c->argc; j++)
1464 decrRefCount(c->argv[j]);
e8a74421 1465 for (j = 0; j < c->mbargc; j++)
1466 decrRefCount(c->mbargv[j]);
ed9b544e 1467 c->argc = 0;
e8a74421 1468 c->mbargc = 0;
ed9b544e 1469}
1470
1471static void freeClient(redisClient *c) {
1472 listNode *ln;
1473
4409877e 1474 /* Note that if the client we are freeing is blocked into a blocking
1475 * call, we have to set querybuf to NULL *before* to call unblockClient()
1476 * to avoid processInputBuffer() will get called. Also it is important
1477 * to remove the file events after this, because this call adds
1478 * the READABLE event. */
1479 sdsfree(c->querybuf);
1480 c->querybuf = NULL;
1481 if (c->flags & REDIS_BLOCKED)
1482 unblockClient(c);
1483
ed9b544e 1484 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1485 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
ed9b544e 1486 listRelease(c->reply);
1487 freeClientArgv(c);
1488 close(c->fd);
1489 ln = listSearchKey(server.clients,c);
dfc5e96c 1490 redisAssert(ln != NULL);
ed9b544e 1491 listDelNode(server.clients,ln);
1492 if (c->flags & REDIS_SLAVE) {
6208b3a7 1493 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1494 close(c->repldbfd);
87eca727 1495 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1496 ln = listSearchKey(l,c);
dfc5e96c 1497 redisAssert(ln != NULL);
87eca727 1498 listDelNode(l,ln);
ed9b544e 1499 }
1500 if (c->flags & REDIS_MASTER) {
1501 server.master = NULL;
1502 server.replstate = REDIS_REPL_CONNECT;
1503 }
93ea3759 1504 zfree(c->argv);
e8a74421 1505 zfree(c->mbargv);
6e469882 1506 freeClientMultiState(c);
ed9b544e 1507 zfree(c);
1508}
1509
cc30e368 1510#define GLUEREPLY_UP_TO (1024)
ed9b544e 1511static void glueReplyBuffersIfNeeded(redisClient *c) {
c28b42ac 1512 int copylen = 0;
1513 char buf[GLUEREPLY_UP_TO];
6208b3a7 1514 listNode *ln;
ed9b544e 1515 robj *o;
1516
6208b3a7 1517 listRewind(c->reply);
1518 while((ln = listYield(c->reply))) {
c28b42ac 1519 int objlen;
1520
ed9b544e 1521 o = ln->value;
c28b42ac 1522 objlen = sdslen(o->ptr);
1523 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1524 memcpy(buf+copylen,o->ptr,objlen);
1525 copylen += objlen;
ed9b544e 1526 listDelNode(c->reply,ln);
c28b42ac 1527 } else {
1528 if (copylen == 0) return;
1529 break;
ed9b544e 1530 }
ed9b544e 1531 }
c28b42ac 1532 /* Now the output buffer is empty, add the new single element */
1533 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1534 listAddNodeHead(c->reply,o);
ed9b544e 1535}
1536
1537static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1538 redisClient *c = privdata;
1539 int nwritten = 0, totwritten = 0, objlen;
1540 robj *o;
1541 REDIS_NOTUSED(el);
1542 REDIS_NOTUSED(mask);
1543
2895e862 1544 /* Use writev() if we have enough buffers to send */
7ea870c0 1545 if (!server.glueoutputbuf &&
1546 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1547 !(c->flags & REDIS_MASTER))
2895e862 1548 {
1549 sendReplyToClientWritev(el, fd, privdata, mask);
1550 return;
1551 }
2895e862 1552
ed9b544e 1553 while(listLength(c->reply)) {
c28b42ac 1554 if (server.glueoutputbuf && listLength(c->reply) > 1)
1555 glueReplyBuffersIfNeeded(c);
1556
ed9b544e 1557 o = listNodeValue(listFirst(c->reply));
1558 objlen = sdslen(o->ptr);
1559
1560 if (objlen == 0) {
1561 listDelNode(c->reply,listFirst(c->reply));
1562 continue;
1563 }
1564
1565 if (c->flags & REDIS_MASTER) {
6f376729 1566 /* Don't reply to a master */
ed9b544e 1567 nwritten = objlen - c->sentlen;
1568 } else {
a4d1ba9a 1569 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
ed9b544e 1570 if (nwritten <= 0) break;
1571 }
1572 c->sentlen += nwritten;
1573 totwritten += nwritten;
1574 /* If we fully sent the object on head go to the next one */
1575 if (c->sentlen == objlen) {
1576 listDelNode(c->reply,listFirst(c->reply));
1577 c->sentlen = 0;
1578 }
6f376729 1579 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
12f9d551 1580 * bytes, in a single threaded server it's a good idea to serve
6f376729 1581 * other clients as well, even if a very large request comes from
1582 * super fast link that is always able to accept data (in real world
12f9d551 1583 * scenario think about 'KEYS *' against the loopback interfae) */
6f376729 1584 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
ed9b544e 1585 }
1586 if (nwritten == -1) {
1587 if (errno == EAGAIN) {
1588 nwritten = 0;
1589 } else {
1590 redisLog(REDIS_DEBUG,
1591 "Error writing to client: %s", strerror(errno));
1592 freeClient(c);
1593 return;
1594 }
1595 }
1596 if (totwritten > 0) c->lastinteraction = time(NULL);
1597 if (listLength(c->reply) == 0) {
1598 c->sentlen = 0;
1599 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1600 }
1601}
1602
2895e862 1603static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1604{
1605 redisClient *c = privdata;
1606 int nwritten = 0, totwritten = 0, objlen, willwrite;
1607 robj *o;
1608 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1609 int offset, ion = 0;
1610 REDIS_NOTUSED(el);
1611 REDIS_NOTUSED(mask);
1612
1613 listNode *node;
1614 while (listLength(c->reply)) {
1615 offset = c->sentlen;
1616 ion = 0;
1617 willwrite = 0;
1618
1619 /* fill-in the iov[] array */
1620 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1621 o = listNodeValue(node);
1622 objlen = sdslen(o->ptr);
1623
1624 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1625 break;
1626
1627 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1628 break; /* no more iovecs */
1629
1630 iov[ion].iov_base = ((char*)o->ptr) + offset;
1631 iov[ion].iov_len = objlen - offset;
1632 willwrite += objlen - offset;
1633 offset = 0; /* just for the first item */
1634 ion++;
1635 }
1636
1637 if(willwrite == 0)
1638 break;
1639
1640 /* write all collected blocks at once */
1641 if((nwritten = writev(fd, iov, ion)) < 0) {
1642 if (errno != EAGAIN) {
1643 redisLog(REDIS_DEBUG,
1644 "Error writing to client: %s", strerror(errno));
1645 freeClient(c);
1646 return;
1647 }
1648 break;
1649 }
1650
1651 totwritten += nwritten;
1652 offset = c->sentlen;
1653
1654 /* remove written robjs from c->reply */
1655 while (nwritten && listLength(c->reply)) {
1656 o = listNodeValue(listFirst(c->reply));
1657 objlen = sdslen(o->ptr);
1658
1659 if(nwritten >= objlen - offset) {
1660 listDelNode(c->reply, listFirst(c->reply));
1661 nwritten -= objlen - offset;
1662 c->sentlen = 0;
1663 } else {
1664 /* partial write */
1665 c->sentlen += nwritten;
1666 break;
1667 }
1668 offset = 0;
1669 }
1670 }
1671
1672 if (totwritten > 0)
1673 c->lastinteraction = time(NULL);
1674
1675 if (listLength(c->reply) == 0) {
1676 c->sentlen = 0;
1677 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1678 }
1679}
1680
ed9b544e 1681static struct redisCommand *lookupCommand(char *name) {
1682 int j = 0;
1683 while(cmdTable[j].name != NULL) {
bb0b03a3 1684 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
ed9b544e 1685 j++;
1686 }
1687 return NULL;
1688}
1689
1690/* resetClient prepare the client to process the next command */
1691static void resetClient(redisClient *c) {
1692 freeClientArgv(c);
1693 c->bulklen = -1;
e8a74421 1694 c->multibulk = 0;
ed9b544e 1695}
1696
6e469882 1697/* Call() is the core of Redis execution of a command */
1698static void call(redisClient *c, struct redisCommand *cmd) {
1699 long long dirty;
1700
1701 dirty = server.dirty;
1702 cmd->proc(c);
1703 if (server.appendonly && server.dirty-dirty)
1704 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1705 if (server.dirty-dirty && listLength(server.slaves))
1706 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1707 if (listLength(server.monitors))
1708 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1709 server.stat_numcommands++;
1710}
1711
ed9b544e 1712/* If this function gets called we already read a whole
1713 * command, argments are in the client argv/argc fields.
1714 * processCommand() execute the command or prepare the
1715 * server for a bulk read from the client.
1716 *
1717 * If 1 is returned the client is still alive and valid and
1718 * and other operations can be performed by the caller. Otherwise
1719 * if 0 is returned the client was destroied (i.e. after QUIT). */
1720static int processCommand(redisClient *c) {
1721 struct redisCommand *cmd;
ed9b544e 1722
3fd78bcd 1723 /* Free some memory if needed (maxmemory setting) */
1724 if (server.maxmemory) freeMemoryIfNeeded();
1725
e8a74421 1726 /* Handle the multi bulk command type. This is an alternative protocol
1727 * supported by Redis in order to receive commands that are composed of
1728 * multiple binary-safe "bulk" arguments. The latency of processing is
1729 * a bit higher but this allows things like multi-sets, so if this
1730 * protocol is used only for MSET and similar commands this is a big win. */
1731 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1732 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1733 if (c->multibulk <= 0) {
1734 resetClient(c);
1735 return 1;
1736 } else {
1737 decrRefCount(c->argv[c->argc-1]);
1738 c->argc--;
1739 return 1;
1740 }
1741 } else if (c->multibulk) {
1742 if (c->bulklen == -1) {
1743 if (((char*)c->argv[0]->ptr)[0] != '$') {
1744 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1745 resetClient(c);
1746 return 1;
1747 } else {
1748 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1749 decrRefCount(c->argv[0]);
1750 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1751 c->argc--;
1752 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1753 resetClient(c);
1754 return 1;
1755 }
1756 c->argc--;
1757 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1758 return 1;
1759 }
1760 } else {
1761 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1762 c->mbargv[c->mbargc] = c->argv[0];
1763 c->mbargc++;
1764 c->argc--;
1765 c->multibulk--;
1766 if (c->multibulk == 0) {
1767 robj **auxargv;
1768 int auxargc;
1769
1770 /* Here we need to swap the multi-bulk argc/argv with the
1771 * normal argc/argv of the client structure. */
1772 auxargv = c->argv;
1773 c->argv = c->mbargv;
1774 c->mbargv = auxargv;
1775
1776 auxargc = c->argc;
1777 c->argc = c->mbargc;
1778 c->mbargc = auxargc;
1779
1780 /* We need to set bulklen to something different than -1
1781 * in order for the code below to process the command without
1782 * to try to read the last argument of a bulk command as
1783 * a special argument. */
1784 c->bulklen = 0;
1785 /* continue below and process the command */
1786 } else {
1787 c->bulklen = -1;
1788 return 1;
1789 }
1790 }
1791 }
1792 /* -- end of multi bulk commands processing -- */
1793
ed9b544e 1794 /* The QUIT command is handled as a special case. Normal command
1795 * procs are unable to close the client connection safely */
bb0b03a3 1796 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
ed9b544e 1797 freeClient(c);
1798 return 0;
1799 }
1800 cmd = lookupCommand(c->argv[0]->ptr);
1801 if (!cmd) {
2c14807b 1802 addReplySds(c,
1803 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1804 (char*)c->argv[0]->ptr));
ed9b544e 1805 resetClient(c);
1806 return 1;
1807 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1808 (c->argc < -cmd->arity)) {
454d4e43 1809 addReplySds(c,
1810 sdscatprintf(sdsempty(),
1811 "-ERR wrong number of arguments for '%s' command\r\n",
1812 cmd->name));
ed9b544e 1813 resetClient(c);
1814 return 1;
3fd78bcd 1815 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1816 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1817 resetClient(c);
1818 return 1;
ed9b544e 1819 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1820 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1821
1822 decrRefCount(c->argv[c->argc-1]);
1823 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1824 c->argc--;
1825 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1826 resetClient(c);
1827 return 1;
1828 }
1829 c->argc--;
1830 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1831 /* It is possible that the bulk read is already in the
8d0490e7 1832 * buffer. Check this condition and handle it accordingly.
1833 * This is just a fast path, alternative to call processInputBuffer().
1834 * It's a good idea since the code is small and this condition
1835 * happens most of the times. */
ed9b544e 1836 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1837 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1838 c->argc++;
1839 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1840 } else {
1841 return 1;
1842 }
1843 }
10c43610 1844 /* Let's try to share objects on the command arguments vector */
1845 if (server.shareobjects) {
1846 int j;
1847 for(j = 1; j < c->argc; j++)
1848 c->argv[j] = tryObjectSharing(c->argv[j]);
1849 }
942a3961 1850 /* Let's try to encode the bulk object to save space. */
1851 if (cmd->flags & REDIS_CMD_BULK)
1852 tryObjectEncoding(c->argv[c->argc-1]);
1853
e63943a4 1854 /* Check if the user is authenticated */
1855 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1856 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1857 resetClient(c);
1858 return 1;
1859 }
1860
ed9b544e 1861 /* Exec the command */
6e469882 1862 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1863 queueMultiCommand(c,cmd);
1864 addReply(c,shared.queued);
1865 } else {
1866 call(c,cmd);
1867 }
ed9b544e 1868
1869 /* Prepare the client for the next command */
1870 if (c->flags & REDIS_CLOSE) {
1871 freeClient(c);
1872 return 0;
1873 }
1874 resetClient(c);
1875 return 1;
1876}
1877
87eca727 1878static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6208b3a7 1879 listNode *ln;
ed9b544e 1880 int outc = 0, j;
93ea3759 1881 robj **outv;
1882 /* (args*2)+1 is enough room for args, spaces, newlines */
1883 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1884
1885 if (argc <= REDIS_STATIC_ARGS) {
1886 outv = static_outv;
1887 } else {
1888 outv = zmalloc(sizeof(robj*)*(argc*2+1));
93ea3759 1889 }
ed9b544e 1890
1891 for (j = 0; j < argc; j++) {
1892 if (j != 0) outv[outc++] = shared.space;
1893 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1894 robj *lenobj;
1895
1896 lenobj = createObject(REDIS_STRING,
682ac724 1897 sdscatprintf(sdsempty(),"%lu\r\n",
83c6a618 1898 (unsigned long) stringObjectLen(argv[j])));
ed9b544e 1899 lenobj->refcount = 0;
1900 outv[outc++] = lenobj;
1901 }
1902 outv[outc++] = argv[j];
1903 }
1904 outv[outc++] = shared.crlf;
1905
40d224a9 1906 /* Increment all the refcounts at start and decrement at end in order to
1907 * be sure to free objects if there is no slave in a replication state
1908 * able to be feed with commands */
1909 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
6208b3a7 1910 listRewind(slaves);
1911 while((ln = listYield(slaves))) {
ed9b544e 1912 redisClient *slave = ln->value;
40d224a9 1913
1914 /* Don't feed slaves that are still waiting for BGSAVE to start */
6208b3a7 1915 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
40d224a9 1916
1917 /* Feed all the other slaves, MONITORs and so on */
ed9b544e 1918 if (slave->slaveseldb != dictid) {
1919 robj *selectcmd;
1920
1921 switch(dictid) {
1922 case 0: selectcmd = shared.select0; break;
1923 case 1: selectcmd = shared.select1; break;
1924 case 2: selectcmd = shared.select2; break;
1925 case 3: selectcmd = shared.select3; break;
1926 case 4: selectcmd = shared.select4; break;
1927 case 5: selectcmd = shared.select5; break;
1928 case 6: selectcmd = shared.select6; break;
1929 case 7: selectcmd = shared.select7; break;
1930 case 8: selectcmd = shared.select8; break;
1931 case 9: selectcmd = shared.select9; break;
1932 default:
1933 selectcmd = createObject(REDIS_STRING,
1934 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1935 selectcmd->refcount = 0;
1936 break;
1937 }
1938 addReply(slave,selectcmd);
1939 slave->slaveseldb = dictid;
1940 }
1941 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
ed9b544e 1942 }
40d224a9 1943 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
93ea3759 1944 if (outv != static_outv) zfree(outv);
ed9b544e 1945}
1946
638e42ac 1947static void processInputBuffer(redisClient *c) {
ed9b544e 1948again:
4409877e 1949 /* Before to process the input buffer, make sure the client is not
1950 * waitig for a blocking operation such as BLPOP. Note that the first
1951 * iteration the client is never blocked, otherwise the processInputBuffer
1952 * would not be called at all, but after the execution of the first commands
1953 * in the input buffer the client may be blocked, and the "goto again"
1954 * will try to reiterate. The following line will make it return asap. */
1955 if (c->flags & REDIS_BLOCKED) return;
ed9b544e 1956 if (c->bulklen == -1) {
1957 /* Read the first line of the query */
1958 char *p = strchr(c->querybuf,'\n');
1959 size_t querylen;
644fafa3 1960
ed9b544e 1961 if (p) {
1962 sds query, *argv;
1963 int argc, j;
1964
1965 query = c->querybuf;
1966 c->querybuf = sdsempty();
1967 querylen = 1+(p-(query));
1968 if (sdslen(query) > querylen) {
1969 /* leave data after the first line of the query in the buffer */
1970 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1971 }
1972 *p = '\0'; /* remove "\n" */
1973 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1974 sdsupdatelen(query);
1975
1976 /* Now we can split the query in arguments */
ed9b544e 1977 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
93ea3759 1978 sdsfree(query);
1979
1980 if (c->argv) zfree(c->argv);
1981 c->argv = zmalloc(sizeof(robj*)*argc);
93ea3759 1982
1983 for (j = 0; j < argc; j++) {
ed9b544e 1984 if (sdslen(argv[j])) {
1985 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1986 c->argc++;
1987 } else {
1988 sdsfree(argv[j]);
1989 }
1990 }
1991 zfree(argv);
7c49733c 1992 if (c->argc) {
1993 /* Execute the command. If the client is still valid
1994 * after processCommand() return and there is something
1995 * on the query buffer try to process the next command. */
1996 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1997 } else {
1998 /* Nothing to process, argc == 0. Just process the query
1999 * buffer if it's not empty or return to the caller */
2000 if (sdslen(c->querybuf)) goto again;
2001 }
ed9b544e 2002 return;
644fafa3 2003 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
ed9b544e 2004 redisLog(REDIS_DEBUG, "Client protocol error");
2005 freeClient(c);
2006 return;
2007 }
2008 } else {
2009 /* Bulk read handling. Note that if we are at this point
2010 the client already sent a command terminated with a newline,
2011 we are reading the bulk data that is actually the last
2012 argument of the command. */
2013 int qbl = sdslen(c->querybuf);
2014
2015 if (c->bulklen <= qbl) {
2016 /* Copy everything but the final CRLF as final argument */
2017 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2018 c->argc++;
2019 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
638e42ac 2020 /* Process the command. If the client is still valid after
2021 * the processing and there is more data in the buffer
2022 * try to parse it. */
2023 if (processCommand(c) && sdslen(c->querybuf)) goto again;
ed9b544e 2024 return;
2025 }
2026 }
2027}
2028
638e42ac 2029static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2030 redisClient *c = (redisClient*) privdata;
2031 char buf[REDIS_IOBUF_LEN];
2032 int nread;
2033 REDIS_NOTUSED(el);
2034 REDIS_NOTUSED(mask);
2035
2036 nread = read(fd, buf, REDIS_IOBUF_LEN);
2037 if (nread == -1) {
2038 if (errno == EAGAIN) {
2039 nread = 0;
2040 } else {
2041 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2042 freeClient(c);
2043 return;
2044 }
2045 } else if (nread == 0) {
2046 redisLog(REDIS_DEBUG, "Client closed connection");
2047 freeClient(c);
2048 return;
2049 }
2050 if (nread) {
2051 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2052 c->lastinteraction = time(NULL);
2053 } else {
2054 return;
2055 }
2056 processInputBuffer(c);
2057}
2058
ed9b544e 2059static int selectDb(redisClient *c, int id) {
2060 if (id < 0 || id >= server.dbnum)
2061 return REDIS_ERR;
3305306f 2062 c->db = &server.db[id];
ed9b544e 2063 return REDIS_OK;
2064}
2065
40d224a9 2066static void *dupClientReplyValue(void *o) {
2067 incrRefCount((robj*)o);
2068 return 0;
2069}
2070
ed9b544e 2071static redisClient *createClient(int fd) {
2072 redisClient *c = zmalloc(sizeof(*c));
2073
2074 anetNonBlock(NULL,fd);
2075 anetTcpNoDelay(NULL,fd);
2076 if (!c) return NULL;
2077 selectDb(c,0);
2078 c->fd = fd;
2079 c->querybuf = sdsempty();
2080 c->argc = 0;
93ea3759 2081 c->argv = NULL;
ed9b544e 2082 c->bulklen = -1;
e8a74421 2083 c->multibulk = 0;
2084 c->mbargc = 0;
2085 c->mbargv = NULL;
ed9b544e 2086 c->sentlen = 0;
2087 c->flags = 0;
2088 c->lastinteraction = time(NULL);
abcb223e 2089 c->authenticated = 0;
40d224a9 2090 c->replstate = REDIS_REPL_NONE;
6b47e12e 2091 c->reply = listCreate();
4409877e 2092 c->blockingkey = NULL;
ed9b544e 2093 listSetFreeMethod(c->reply,decrRefCount);
40d224a9 2094 listSetDupMethod(c->reply,dupClientReplyValue);
ed9b544e 2095 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
266373b2 2096 readQueryFromClient, c) == AE_ERR) {
ed9b544e 2097 freeClient(c);
2098 return NULL;
2099 }
6b47e12e 2100 listAddNodeTail(server.clients,c);
6e469882 2101 initClientMultiState(c);
ed9b544e 2102 return c;
2103}
2104
2105static void addReply(redisClient *c, robj *obj) {
2106 if (listLength(c->reply) == 0 &&
6208b3a7 2107 (c->replstate == REDIS_REPL_NONE ||
2108 c->replstate == REDIS_REPL_ONLINE) &&
ed9b544e 2109 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
266373b2 2110 sendReplyToClient, c) == AE_ERR) return;
9d65a1bb 2111 listAddNodeTail(c->reply,getDecodedObject(obj));
ed9b544e 2112}
2113
2114static void addReplySds(redisClient *c, sds s) {
2115 robj *o = createObject(REDIS_STRING,s);
2116 addReply(c,o);
2117 decrRefCount(o);
2118}
2119
e2665397 2120static void addReplyDouble(redisClient *c, double d) {
2121 char buf[128];
2122
2123 snprintf(buf,sizeof(buf),"%.17g",d);
682ac724 2124 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
83c6a618 2125 (unsigned long) strlen(buf),buf));
e2665397 2126}
2127
942a3961 2128static void addReplyBulkLen(redisClient *c, robj *obj) {
2129 size_t len;
2130
2131 if (obj->encoding == REDIS_ENCODING_RAW) {
2132 len = sdslen(obj->ptr);
2133 } else {
2134 long n = (long)obj->ptr;
2135
e054afda 2136 /* Compute how many bytes will take this integer as a radix 10 string */
942a3961 2137 len = 1;
2138 if (n < 0) {
2139 len++;
2140 n = -n;
2141 }
2142 while((n = n/10) != 0) {
2143 len++;
2144 }
2145 }
83c6a618 2146 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
942a3961 2147}
2148
ed9b544e 2149static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2150 int cport, cfd;
2151 char cip[128];
285add55 2152 redisClient *c;
ed9b544e 2153 REDIS_NOTUSED(el);
2154 REDIS_NOTUSED(mask);
2155 REDIS_NOTUSED(privdata);
2156
2157 cfd = anetAccept(server.neterr, fd, cip, &cport);
2158 if (cfd == AE_ERR) {
2159 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2160 return;
2161 }
2162 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
285add55 2163 if ((c = createClient(cfd)) == NULL) {
ed9b544e 2164 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2165 close(cfd); /* May be already closed, just ingore errors */
2166 return;
2167 }
285add55 2168 /* If maxclient directive is set and this is one client more... close the
2169 * connection. Note that we create the client instead to check before
2170 * for this condition, since now the socket is already set in nonblocking
2171 * mode and we can send an error for free using the Kernel I/O */
2172 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2173 char *err = "-ERR max number of clients reached\r\n";
2174
2175 /* That's a best effort error message, don't check write errors */
fee803ba 2176 if (write(c->fd,err,strlen(err)) == -1) {
2177 /* Nothing to do, Just to avoid the warning... */
2178 }
285add55 2179 freeClient(c);
2180 return;
2181 }
ed9b544e 2182 server.stat_numconnections++;
2183}
2184
2185/* ======================= Redis objects implementation ===================== */
2186
2187static robj *createObject(int type, void *ptr) {
2188 robj *o;
2189
2190 if (listLength(server.objfreelist)) {
2191 listNode *head = listFirst(server.objfreelist);
2192 o = listNodeValue(head);
2193 listDelNode(server.objfreelist,head);
2194 } else {
2195 o = zmalloc(sizeof(*o));
2196 }
ed9b544e 2197 o->type = type;
942a3961 2198 o->encoding = REDIS_ENCODING_RAW;
ed9b544e 2199 o->ptr = ptr;
2200 o->refcount = 1;
2201 return o;
2202}
2203
2204static robj *createStringObject(char *ptr, size_t len) {
2205 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2206}
2207
2208static robj *createListObject(void) {
2209 list *l = listCreate();
2210
ed9b544e 2211 listSetFreeMethod(l,decrRefCount);
2212 return createObject(REDIS_LIST,l);
2213}
2214
2215static robj *createSetObject(void) {
2216 dict *d = dictCreate(&setDictType,NULL);
ed9b544e 2217 return createObject(REDIS_SET,d);
2218}
2219
1812e024 2220static robj *createZsetObject(void) {
6b47e12e 2221 zset *zs = zmalloc(sizeof(*zs));
2222
2223 zs->dict = dictCreate(&zsetDictType,NULL);
2224 zs->zsl = zslCreate();
2225 return createObject(REDIS_ZSET,zs);
1812e024 2226}
2227
ed9b544e 2228static void freeStringObject(robj *o) {
942a3961 2229 if (o->encoding == REDIS_ENCODING_RAW) {
2230 sdsfree(o->ptr);
2231 }
ed9b544e 2232}
2233
2234static void freeListObject(robj *o) {
2235 listRelease((list*) o->ptr);
2236}
2237
2238static void freeSetObject(robj *o) {
2239 dictRelease((dict*) o->ptr);
2240}
2241
fd8ccf44 2242static void freeZsetObject(robj *o) {
2243 zset *zs = o->ptr;
2244
2245 dictRelease(zs->dict);
2246 zslFree(zs->zsl);
2247 zfree(zs);
2248}
2249
ed9b544e 2250static void freeHashObject(robj *o) {
2251 dictRelease((dict*) o->ptr);
2252}
2253
2254static void incrRefCount(robj *o) {
2255 o->refcount++;
94754ccc 2256#ifdef DEBUG_REFCOUNT
2257 if (o->type == REDIS_STRING)
2258 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2259#endif
ed9b544e 2260}
2261
2262static void decrRefCount(void *obj) {
2263 robj *o = obj;
94754ccc 2264
2265#ifdef DEBUG_REFCOUNT
2266 if (o->type == REDIS_STRING)
2267 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2268#endif
ed9b544e 2269 if (--(o->refcount) == 0) {
2270 switch(o->type) {
2271 case REDIS_STRING: freeStringObject(o); break;
2272 case REDIS_LIST: freeListObject(o); break;
2273 case REDIS_SET: freeSetObject(o); break;
fd8ccf44 2274 case REDIS_ZSET: freeZsetObject(o); break;
ed9b544e 2275 case REDIS_HASH: freeHashObject(o); break;
dfc5e96c 2276 default: redisAssert(0 != 0); break;
ed9b544e 2277 }
2278 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2279 !listAddNodeHead(server.objfreelist,o))
2280 zfree(o);
2281 }
2282}
2283
942a3961 2284static robj *lookupKey(redisDb *db, robj *key) {
2285 dictEntry *de = dictFind(db->dict,key);
2286 return de ? dictGetEntryVal(de) : NULL;
2287}
2288
2289static robj *lookupKeyRead(redisDb *db, robj *key) {
2290 expireIfNeeded(db,key);
2291 return lookupKey(db,key);
2292}
2293
2294static robj *lookupKeyWrite(redisDb *db, robj *key) {
2295 deleteIfVolatile(db,key);
2296 return lookupKey(db,key);
2297}
2298
2299static int deleteKey(redisDb *db, robj *key) {
2300 int retval;
2301
2302 /* We need to protect key from destruction: after the first dictDelete()
2303 * it may happen that 'key' is no longer valid if we don't increment
2304 * it's count. This may happen when we get the object reference directly
2305 * from the hash table with dictRandomKey() or dict iterators */
2306 incrRefCount(key);
2307 if (dictSize(db->expires)) dictDelete(db->expires,key);
2308 retval = dictDelete(db->dict,key);
2309 decrRefCount(key);
2310
2311 return retval == DICT_OK;
2312}
2313
10c43610 2314/* Try to share an object against the shared objects pool */
2315static robj *tryObjectSharing(robj *o) {
2316 struct dictEntry *de;
2317 unsigned long c;
2318
3305306f 2319 if (o == NULL || server.shareobjects == 0) return o;
10c43610 2320
dfc5e96c 2321 redisAssert(o->type == REDIS_STRING);
10c43610 2322 de = dictFind(server.sharingpool,o);
2323 if (de) {
2324 robj *shared = dictGetEntryKey(de);
2325
2326 c = ((unsigned long) dictGetEntryVal(de))+1;
2327 dictGetEntryVal(de) = (void*) c;
2328 incrRefCount(shared);
2329 decrRefCount(o);
2330 return shared;
2331 } else {
2332 /* Here we are using a stream algorihtm: Every time an object is
2333 * shared we increment its count, everytime there is a miss we
2334 * recrement the counter of a random object. If this object reaches
2335 * zero we remove the object and put the current object instead. */
3305306f 2336 if (dictSize(server.sharingpool) >=
10c43610 2337 server.sharingpoolsize) {
2338 de = dictGetRandomKey(server.sharingpool);
dfc5e96c 2339 redisAssert(de != NULL);
10c43610 2340 c = ((unsigned long) dictGetEntryVal(de))-1;
2341 dictGetEntryVal(de) = (void*) c;
2342 if (c == 0) {
2343 dictDelete(server.sharingpool,de->key);
2344 }
2345 } else {
2346 c = 0; /* If the pool is empty we want to add this object */
2347 }
2348 if (c == 0) {
2349 int retval;
2350
2351 retval = dictAdd(server.sharingpool,o,(void*)1);
dfc5e96c 2352 redisAssert(retval == DICT_OK);
10c43610 2353 incrRefCount(o);
2354 }
2355 return o;
2356 }
2357}
2358
724a51b1 2359/* Check if the nul-terminated string 's' can be represented by a long
2360 * (that is, is a number that fits into long without any other space or
2361 * character before or after the digits).
2362 *
2363 * If so, the function returns REDIS_OK and *longval is set to the value
2364 * of the number. Otherwise REDIS_ERR is returned */
f69f2cba 2365static int isStringRepresentableAsLong(sds s, long *longval) {
724a51b1 2366 char buf[32], *endptr;
2367 long value;
2368 int slen;
2369
2370 value = strtol(s, &endptr, 10);
2371 if (endptr[0] != '\0') return REDIS_ERR;
2372 slen = snprintf(buf,32,"%ld",value);
2373
2374 /* If the number converted back into a string is not identical
2375 * then it's not possible to encode the string as integer */
f69f2cba 2376 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
724a51b1 2377 if (longval) *longval = value;
2378 return REDIS_OK;
2379}
2380
942a3961 2381/* Try to encode a string object in order to save space */
2382static int tryObjectEncoding(robj *o) {
2383 long value;
942a3961 2384 sds s = o->ptr;
3305306f 2385
942a3961 2386 if (o->encoding != REDIS_ENCODING_RAW)
2387 return REDIS_ERR; /* Already encoded */
3305306f 2388
942a3961 2389 /* It's not save to encode shared objects: shared objects can be shared
2390 * everywhere in the "object space" of Redis. Encoded objects can only
2391 * appear as "values" (and not, for instance, as keys) */
2392 if (o->refcount > 1) return REDIS_ERR;
3305306f 2393
942a3961 2394 /* Currently we try to encode only strings */
dfc5e96c 2395 redisAssert(o->type == REDIS_STRING);
94754ccc 2396
724a51b1 2397 /* Check if we can represent this string as a long integer */
2398 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
942a3961 2399
2400 /* Ok, this object can be encoded */
2401 o->encoding = REDIS_ENCODING_INT;
2402 sdsfree(o->ptr);
2403 o->ptr = (void*) value;
2404 return REDIS_OK;
2405}
2406
9d65a1bb 2407/* Get a decoded version of an encoded object (returned as a new object).
2408 * If the object is already raw-encoded just increment the ref count. */
2409static robj *getDecodedObject(robj *o) {
942a3961 2410 robj *dec;
2411
9d65a1bb 2412 if (o->encoding == REDIS_ENCODING_RAW) {
2413 incrRefCount(o);
2414 return o;
2415 }
942a3961 2416 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2417 char buf[32];
2418
2419 snprintf(buf,32,"%ld",(long)o->ptr);
2420 dec = createStringObject(buf,strlen(buf));
2421 return dec;
2422 } else {
dfc5e96c 2423 redisAssert(1 != 1);
942a3961 2424 }
3305306f 2425}
2426
d7f43c08 2427/* Compare two string objects via strcmp() or alike.
2428 * Note that the objects may be integer-encoded. In such a case we
2429 * use snprintf() to get a string representation of the numbers on the stack
1fd9bc8a 2430 * and compare the strings, it's much faster than calling getDecodedObject().
2431 *
2432 * Important note: if objects are not integer encoded, but binary-safe strings,
2433 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2434 * binary safe. */
724a51b1 2435static int compareStringObjects(robj *a, robj *b) {
dfc5e96c 2436 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
d7f43c08 2437 char bufa[128], bufb[128], *astr, *bstr;
2438 int bothsds = 1;
724a51b1 2439
e197b441 2440 if (a == b) return 0;
d7f43c08 2441 if (a->encoding != REDIS_ENCODING_RAW) {
2442 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2443 astr = bufa;
2444 bothsds = 0;
724a51b1 2445 } else {
d7f43c08 2446 astr = a->ptr;
724a51b1 2447 }
d7f43c08 2448 if (b->encoding != REDIS_ENCODING_RAW) {
2449 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2450 bstr = bufb;
2451 bothsds = 0;
2452 } else {
2453 bstr = b->ptr;
2454 }
2455 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
724a51b1 2456}
2457
0ea663ea 2458static size_t stringObjectLen(robj *o) {
dfc5e96c 2459 redisAssert(o->type == REDIS_STRING);
0ea663ea 2460 if (o->encoding == REDIS_ENCODING_RAW) {
2461 return sdslen(o->ptr);
2462 } else {
2463 char buf[32];
2464
2465 return snprintf(buf,32,"%ld",(long)o->ptr);
2466 }
2467}
2468
ed9b544e 2469/*============================ DB saving/loading ============================ */
2470
/* Write a single type/opcode byte to fp. Returns 0 on success, -1 on error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return fwrite(&type,1,1,fp) ? 0 : -1;
}
2475
/* Write t to fp as a 32-bit signed integer in host byte order.
 * Returns 0 on success, -1 on error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t seconds = (int32_t) t;

    return (fwrite(&seconds,4,1,fp) == 0) ? -1 : 0;
}
2481
e3566d4b 2482/* check rdbLoadLen() comments for more info */
f78fd11b 2483static int rdbSaveLen(FILE *fp, uint32_t len) {
2484 unsigned char buf[2];
2485
2486 if (len < (1<<6)) {
2487 /* Save a 6 bit len */
10c43610 2488 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
f78fd11b 2489 if (fwrite(buf,1,1,fp) == 0) return -1;
2490 } else if (len < (1<<14)) {
2491 /* Save a 14 bit len */
10c43610 2492 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
f78fd11b 2493 buf[1] = len&0xFF;
17be1a4a 2494 if (fwrite(buf,2,1,fp) == 0) return -1;
f78fd11b 2495 } else {
2496 /* Save a 32 bit len */
10c43610 2497 buf[0] = (REDIS_RDB_32BITLEN<<6);
f78fd11b 2498 if (fwrite(buf,1,1,fp) == 0) return -1;
2499 len = htonl(len);
2500 if (fwrite(&len,4,1,fp) == 0) return -1;
2501 }
2502 return 0;
2503}
2504
e3566d4b 2505/* String objects in the form "2391" "-100" without any space and with a
2506 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2507 * encoded as integers to save space */
56906eef 2508static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
e3566d4b 2509 long long value;
2510 char *endptr, buf[32];
2511
2512 /* Check if it's possible to encode this value as a number */
2513 value = strtoll(s, &endptr, 10);
2514 if (endptr[0] != '\0') return 0;
2515 snprintf(buf,32,"%lld",value);
2516
2517 /* If the number converted back into a string is not identical
2518 * then it's not possible to encode the string as integer */
2519 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2520
2521 /* Finally check if it fits in our ranges */
2522 if (value >= -(1<<7) && value <= (1<<7)-1) {
2523 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2524 enc[1] = value&0xFF;
2525 return 2;
2526 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2527 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2528 enc[1] = value&0xFF;
2529 enc[2] = (value>>8)&0xFF;
2530 return 3;
2531 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2532 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2533 enc[1] = value&0xFF;
2534 enc[2] = (value>>8)&0xFF;
2535 enc[3] = (value>>16)&0xFF;
2536 enc[4] = (value>>24)&0xFF;
2537 return 5;
2538 } else {
2539 return 0;
2540 }
2541}
2542
774e3047 2543static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2544 unsigned int comprlen, outlen;
2545 unsigned char byte;
2546 void *out;
2547
2548 /* We require at least four bytes compression for this to be worth it */
2549 outlen = sdslen(obj->ptr)-4;
2550 if (outlen <= 0) return 0;
3a2694c4 2551 if ((out = zmalloc(outlen+1)) == NULL) return 0;
774e3047 2552 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2553 if (comprlen == 0) {
88e85998 2554 zfree(out);
774e3047 2555 return 0;
2556 }
2557 /* Data compressed! Let's save it on disk */
2558 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2559 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2560 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2561 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2562 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
88e85998 2563 zfree(out);
774e3047 2564 return comprlen;
2565
2566writeerr:
88e85998 2567 zfree(out);
774e3047 2568 return -1;
2569}
2570
e3566d4b 2571/* Save a string objet as [len][data] on disk. If the object is a string
2572 * representation of an integer value we try to safe it in a special form */
942a3961 2573static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2574 size_t len;
e3566d4b 2575 int enclen;
10c43610 2576
942a3961 2577 len = sdslen(obj->ptr);
2578
774e3047 2579 /* Try integer encoding */
e3566d4b 2580 if (len <= 11) {
2581 unsigned char buf[5];
2582 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2583 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2584 return 0;
2585 }
2586 }
774e3047 2587
2588 /* Try LZF compression - under 20 bytes it's unable to compress even
88e85998 2589 * aaaaaaaaaaaaaaaaaa so skip it */
121f70cf 2590 if (server.rdbcompression && len > 20) {
774e3047 2591 int retval;
2592
2593 retval = rdbSaveLzfStringObject(fp,obj);
2594 if (retval == -1) return -1;
2595 if (retval > 0) return 0;
2596 /* retval == 0 means data can't be compressed, save the old way */
2597 }
2598
2599 /* Store verbatim */
10c43610 2600 if (rdbSaveLen(fp,len) == -1) return -1;
2601 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2602 return 0;
2603}
2604
942a3961 2605/* Like rdbSaveStringObjectRaw() but handle encoded objects */
2606static int rdbSaveStringObject(FILE *fp, robj *obj) {
2607 int retval;
942a3961 2608
9d65a1bb 2609 obj = getDecodedObject(obj);
2610 retval = rdbSaveStringObjectRaw(fp,obj);
2611 decrRefCount(obj);
2612 return retval;
942a3961 2613}
2614
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the "%.17g" representation.
 * Three length values are reserved for special values:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char out[128];
    int outlen = 1;

    if (isnan(val)) {
        out[0] = 253;
    } else if (isinf(val)) {
        out[0] = (val > 0) ? 254 : 255;
    } else {
        char *repr = (char*)out+1;

        snprintf(repr,sizeof(out)-1,"%.17g",val);
        out[0] = strlen(repr);
        outlen += out[0];
    }
    return (fwrite(out,outlen,1,fp) == 0) ? -1 : 0;
}
2641
ed9b544e 2642/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
f78fd11b 2643static int rdbSave(char *filename) {
ed9b544e 2644 dictIterator *di = NULL;
2645 dictEntry *de;
ed9b544e 2646 FILE *fp;
2647 char tmpfile[256];
2648 int j;
bb32ede5 2649 time_t now = time(NULL);
ed9b544e 2650
a3b21203 2651 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
ed9b544e 2652 fp = fopen(tmpfile,"w");
2653 if (!fp) {
2654 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2655 return REDIS_ERR;
2656 }
f78fd11b 2657 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
ed9b544e 2658 for (j = 0; j < server.dbnum; j++) {
bb32ede5 2659 redisDb *db = server.db+j;
2660 dict *d = db->dict;
3305306f 2661 if (dictSize(d) == 0) continue;
ed9b544e 2662 di = dictGetIterator(d);
2663 if (!di) {
2664 fclose(fp);
2665 return REDIS_ERR;
2666 }
2667
2668 /* Write the SELECT DB opcode */
f78fd11b 2669 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2670 if (rdbSaveLen(fp,j) == -1) goto werr;
ed9b544e 2671
2672 /* Iterate this DB writing every entry */
2673 while((de = dictNext(di)) != NULL) {
2674 robj *key = dictGetEntryKey(de);
2675 robj *o = dictGetEntryVal(de);
bb32ede5 2676 time_t expiretime = getExpire(db,key);
2677
2678 /* Save the expire time */
2679 if (expiretime != -1) {
2680 /* If this key is already expired skip it */
2681 if (expiretime < now) continue;
2682 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2683 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2684 }
2685 /* Save the key and associated value */
f78fd11b 2686 if (rdbSaveType(fp,o->type) == -1) goto werr;
10c43610 2687 if (rdbSaveStringObject(fp,key) == -1) goto werr;
f78fd11b 2688 if (o->type == REDIS_STRING) {
ed9b544e 2689 /* Save a string value */
10c43610 2690 if (rdbSaveStringObject(fp,o) == -1) goto werr;
f78fd11b 2691 } else if (o->type == REDIS_LIST) {
ed9b544e 2692 /* Save a list value */
2693 list *list = o->ptr;
6208b3a7 2694 listNode *ln;
ed9b544e 2695
6208b3a7 2696 listRewind(list);
f78fd11b 2697 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
6208b3a7 2698 while((ln = listYield(list))) {
ed9b544e 2699 robj *eleobj = listNodeValue(ln);
f78fd11b 2700
10c43610 2701 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 2702 }
f78fd11b 2703 } else if (o->type == REDIS_SET) {
ed9b544e 2704 /* Save a set value */
2705 dict *set = o->ptr;
2706 dictIterator *di = dictGetIterator(set);
2707 dictEntry *de;
2708
3305306f 2709 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
ed9b544e 2710 while((de = dictNext(di)) != NULL) {
10c43610 2711 robj *eleobj = dictGetEntryKey(de);
ed9b544e 2712
10c43610 2713 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 2714 }
2715 dictReleaseIterator(di);
2b59cfdf 2716 } else if (o->type == REDIS_ZSET) {
2717 /* Save a set value */
2718 zset *zs = o->ptr;
2719 dictIterator *di = dictGetIterator(zs->dict);
2720 dictEntry *de;
2721
2722 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2723 while((de = dictNext(di)) != NULL) {
2724 robj *eleobj = dictGetEntryKey(de);
2725 double *score = dictGetEntryVal(de);
2726
2727 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2728 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2729 }
2730 dictReleaseIterator(di);
ed9b544e 2731 } else {
dfc5e96c 2732 redisAssert(0 != 0);
ed9b544e 2733 }
2734 }
2735 dictReleaseIterator(di);
2736 }
2737 /* EOF opcode */
f78fd11b 2738 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2739
2740 /* Make sure data will not remain on the OS's output buffers */
ed9b544e 2741 fflush(fp);
2742 fsync(fileno(fp));
2743 fclose(fp);
2744
2745 /* Use RENAME to make sure the DB file is changed atomically only
2746 * if the generate DB file is ok. */
2747 if (rename(tmpfile,filename) == -1) {
325d1eb4 2748 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
ed9b544e 2749 unlink(tmpfile);
2750 return REDIS_ERR;
2751 }
2752 redisLog(REDIS_NOTICE,"DB saved on disk");
2753 server.dirty = 0;
2754 server.lastsave = time(NULL);
2755 return REDIS_OK;
2756
2757werr:
2758 fclose(fp);
2759 unlink(tmpfile);
2760 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2761 if (di) dictReleaseIterator(di);
2762 return REDIS_ERR;
2763}
2764
f78fd11b 2765static int rdbSaveBackground(char *filename) {
ed9b544e 2766 pid_t childpid;
2767
9d65a1bb 2768 if (server.bgsavechildpid != -1) return REDIS_ERR;
ed9b544e 2769 if ((childpid = fork()) == 0) {
2770 /* Child */
2771 close(server.fd);
f78fd11b 2772 if (rdbSave(filename) == REDIS_OK) {
ed9b544e 2773 exit(0);
2774 } else {
2775 exit(1);
2776 }
2777 } else {
2778 /* Parent */
5a7c647e 2779 if (childpid == -1) {
2780 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2781 strerror(errno));
2782 return REDIS_ERR;
2783 }
ed9b544e 2784 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
9f3c422c 2785 server.bgsavechildpid = childpid;
ed9b544e 2786 return REDIS_OK;
2787 }
2788 return REDIS_OK; /* unreached */
2789}
2790
/* Remove the "temp-<pid>.rdb" file that rdbSave() creates when running in
 * the process with the given pid, e.g. after that saving child has been
 * killed. Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int)childpid);
    unlink(path);
}
2797
/* Read a single type/opcode byte. Returns its value (0-255), or -1 on
 * short read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 1) ? byte : -1;
}
2803
/* Read a timestamp stored as a 32 bit signed integer in host byte order.
 * Returns the value as time_t, or -1 on short read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t) stamp;
}
2809
e3566d4b 2810/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2811 * of this file for a description of how this are stored on disk.
2812 *
2813 * isencoded is set to 1 if the readed length is not actually a length but
2814 * an "encoding type", check the above comments for more info */
2815static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
f78fd11b 2816 unsigned char buf[2];
2817 uint32_t len;
2818
e3566d4b 2819 if (isencoded) *isencoded = 0;
f78fd11b 2820 if (rdbver == 0) {
2821 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2822 return ntohl(len);
2823 } else {
17be1a4a 2824 int type;
2825
f78fd11b 2826 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
17be1a4a 2827 type = (buf[0]&0xC0)>>6;
2828 if (type == REDIS_RDB_6BITLEN) {
f78fd11b 2829 /* Read a 6 bit len */
e3566d4b 2830 return buf[0]&0x3F;
2831 } else if (type == REDIS_RDB_ENCVAL) {
2832 /* Read a 6 bit len encoding type */
2833 if (isencoded) *isencoded = 1;
2834 return buf[0]&0x3F;
17be1a4a 2835 } else if (type == REDIS_RDB_14BITLEN) {
f78fd11b 2836 /* Read a 14 bit len */
2837 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2838 return ((buf[0]&0x3F)<<8)|buf[1];
2839 } else {
2840 /* Read a 32 bit len */
2841 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2842 return ntohl(len);
2843 }
2844 }
f78fd11b 2845}
2846
e3566d4b 2847static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2848 unsigned char enc[4];
2849 long long val;
2850
2851 if (enctype == REDIS_RDB_ENC_INT8) {
2852 if (fread(enc,1,1,fp) == 0) return NULL;
2853 val = (signed char)enc[0];
2854 } else if (enctype == REDIS_RDB_ENC_INT16) {
2855 uint16_t v;
2856 if (fread(enc,2,1,fp) == 0) return NULL;
2857 v = enc[0]|(enc[1]<<8);
2858 val = (int16_t)v;
2859 } else if (enctype == REDIS_RDB_ENC_INT32) {
2860 uint32_t v;
2861 if (fread(enc,4,1,fp) == 0) return NULL;
2862 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2863 val = (int32_t)v;
2864 } else {
2865 val = 0; /* anti-warning */
dfc5e96c 2866 redisAssert(0!=0);
e3566d4b 2867 }
2868 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2869}
2870
88e85998 2871static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2872 unsigned int len, clen;
2873 unsigned char *c = NULL;
2874 sds val = NULL;
2875
2876 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2877 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2878 if ((c = zmalloc(clen)) == NULL) goto err;
2879 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2880 if (fread(c,clen,1,fp) == 0) goto err;
2881 if (lzf_decompress(c,clen,val,len) == 0) goto err;
5109cdff 2882 zfree(c);
88e85998 2883 return createObject(REDIS_STRING,val);
2884err:
2885 zfree(c);
2886 sdsfree(val);
2887 return NULL;
2888}
2889
e3566d4b 2890static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2891 int isencoded;
2892 uint32_t len;
f78fd11b 2893 sds val;
2894
e3566d4b 2895 len = rdbLoadLen(fp,rdbver,&isencoded);
2896 if (isencoded) {
2897 switch(len) {
2898 case REDIS_RDB_ENC_INT8:
2899 case REDIS_RDB_ENC_INT16:
2900 case REDIS_RDB_ENC_INT32:
3305306f 2901 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
88e85998 2902 case REDIS_RDB_ENC_LZF:
2903 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
e3566d4b 2904 default:
dfc5e96c 2905 redisAssert(0!=0);
e3566d4b 2906 }
2907 }
2908
f78fd11b 2909 if (len == REDIS_RDB_LENERR) return NULL;
2910 val = sdsnewlen(NULL,len);
2911 if (len && fread(val,len,1,fp) == 0) {
2912 sdsfree(val);
2913 return NULL;
2914 }
10c43610 2915 return tryObjectSharing(createObject(REDIS_STRING,val));
f78fd11b 2916}
2917
a7866db6 2918/* For information about double serialization check rdbSaveDoubleValue() */
2919static int rdbLoadDoubleValue(FILE *fp, double *val) {
2920 char buf[128];
2921 unsigned char len;
2922
2923 if (fread(&len,1,1,fp) == 0) return -1;
2924 switch(len) {
2925 case 255: *val = R_NegInf; return 0;
2926 case 254: *val = R_PosInf; return 0;
2927 case 253: *val = R_Nan; return 0;
2928 default:
2929 if (fread(buf,len,1,fp) == 0) return -1;
231d758e 2930 buf[len] = '\0';
a7866db6 2931 sscanf(buf, "%lg", val);
2932 return 0;
2933 }
2934}
2935
f78fd11b 2936static int rdbLoad(char *filename) {
ed9b544e 2937 FILE *fp;
f78fd11b 2938 robj *keyobj = NULL;
2939 uint32_t dbid;
bb32ede5 2940 int type, retval, rdbver;
3305306f 2941 dict *d = server.db[0].dict;
bb32ede5 2942 redisDb *db = server.db+0;
f78fd11b 2943 char buf[1024];
bb32ede5 2944 time_t expiretime = -1, now = time(NULL);
2945
ed9b544e 2946 fp = fopen(filename,"r");
2947 if (!fp) return REDIS_ERR;
2948 if (fread(buf,9,1,fp) == 0) goto eoferr;
f78fd11b 2949 buf[9] = '\0';
2950 if (memcmp(buf,"REDIS",5) != 0) {
ed9b544e 2951 fclose(fp);
2952 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2953 return REDIS_ERR;
2954 }
f78fd11b 2955 rdbver = atoi(buf+5);
2956 if (rdbver > 1) {
2957 fclose(fp);
2958 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2959 return REDIS_ERR;
2960 }
ed9b544e 2961 while(1) {
2962 robj *o;
2963
2964 /* Read type. */
f78fd11b 2965 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
bb32ede5 2966 if (type == REDIS_EXPIRETIME) {
2967 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2968 /* We read the time so we need to read the object type again */
2969 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2970 }
ed9b544e 2971 if (type == REDIS_EOF) break;
2972 /* Handle SELECT DB opcode as a special case */
2973 if (type == REDIS_SELECTDB) {
e3566d4b 2974 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2975 goto eoferr;
ed9b544e 2976 if (dbid >= (unsigned)server.dbnum) {
f78fd11b 2977 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
ed9b544e 2978 exit(1);
2979 }
bb32ede5 2980 db = server.db+dbid;
2981 d = db->dict;
ed9b544e 2982 continue;
2983 }
2984 /* Read key */
f78fd11b 2985 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
ed9b544e 2986
2987 if (type == REDIS_STRING) {
2988 /* Read string value */
f78fd11b 2989 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
942a3961 2990 tryObjectEncoding(o);
ed9b544e 2991 } else if (type == REDIS_LIST || type == REDIS_SET) {
2992 /* Read list/set value */
2993 uint32_t listlen;
f78fd11b 2994
e3566d4b 2995 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
f78fd11b 2996 goto eoferr;
ed9b544e 2997 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2998 /* Load every single element of the list/set */
2999 while(listlen--) {
3000 robj *ele;
3001
f78fd11b 3002 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
942a3961 3003 tryObjectEncoding(ele);
ed9b544e 3004 if (type == REDIS_LIST) {
6b47e12e 3005 listAddNodeTail((list*)o->ptr,ele);
ed9b544e 3006 } else {
6b47e12e 3007 dictAdd((dict*)o->ptr,ele,NULL);
ed9b544e 3008 }
ed9b544e 3009 }
2b59cfdf 3010 } else if (type == REDIS_ZSET) {
3011 /* Read list/set value */
3012 uint32_t zsetlen;
3013 zset *zs;
3014
3015 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
3016 goto eoferr;
3017 o = createZsetObject();
3018 zs = o->ptr;
3019 /* Load every single element of the list/set */
3020 while(zsetlen--) {
3021 robj *ele;
3022 double *score = zmalloc(sizeof(double));
3023
3024 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3025 tryObjectEncoding(ele);
3026 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
3027 dictAdd(zs->dict,ele,score);
3028 zslInsert(zs->zsl,*score,ele);
3029 incrRefCount(ele); /* added to skiplist */
3030 }
ed9b544e 3031 } else {
dfc5e96c 3032 redisAssert(0 != 0);
ed9b544e 3033 }
3034 /* Add the new object in the hash table */
f78fd11b 3035 retval = dictAdd(d,keyobj,o);
ed9b544e 3036 if (retval == DICT_ERR) {
f78fd11b 3037 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
ed9b544e 3038 exit(1);
3039 }
bb32ede5 3040 /* Set the expire time if needed */
3041 if (expiretime != -1) {
3042 setExpire(db,keyobj,expiretime);
3043 /* Delete this key if already expired */
3044 if (expiretime < now) deleteKey(db,keyobj);
3045 expiretime = -1;
3046 }
f78fd11b 3047 keyobj = o = NULL;
ed9b544e 3048 }
3049 fclose(fp);
3050 return REDIS_OK;
3051
3052eoferr: /* unexpected end of file is handled here with a fatal exit */
e3566d4b 3053 if (keyobj) decrRefCount(keyobj);
f80dff62 3054 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
ed9b544e 3055 exit(1);
3056 return REDIS_ERR; /* Just to avoid warning */
3057}
3058
3059/*================================== Commands =============================== */
3060
abcb223e 3061static void authCommand(redisClient *c) {
2e77c2ee 3062 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
abcb223e
BH
3063 c->authenticated = 1;
3064 addReply(c,shared.ok);
3065 } else {
3066 c->authenticated = 0;
fa4c0aba 3067 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
abcb223e
BH
3068 }
3069}
3070
ed9b544e 3071static void pingCommand(redisClient *c) {
3072 addReply(c,shared.pong);
3073}
3074
3075static void echoCommand(redisClient *c) {
942a3961 3076 addReplyBulkLen(c,c->argv[1]);
ed9b544e 3077 addReply(c,c->argv[1]);
3078 addReply(c,shared.crlf);
3079}
3080
3081/*=================================== Strings =============================== */
3082
3083static void setGenericCommand(redisClient *c, int nx) {
3084 int retval;
3085
333fd216 3086 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3305306f 3087 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 3088 if (retval == DICT_ERR) {
3089 if (!nx) {
3305306f 3090 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
ed9b544e 3091 incrRefCount(c->argv[2]);
3092 } else {
c937aa89 3093 addReply(c,shared.czero);
ed9b544e 3094 return;
3095 }
3096 } else {
3097 incrRefCount(c->argv[1]);
3098 incrRefCount(c->argv[2]);
3099 }
3100 server.dirty++;
3305306f 3101 removeExpire(c->db,c->argv[1]);
c937aa89 3102 addReply(c, nx ? shared.cone : shared.ok);
ed9b544e 3103}
3104
3105static void setCommand(redisClient *c) {
a4d1ba9a 3106 setGenericCommand(c,0);
ed9b544e 3107}
3108
3109static void setnxCommand(redisClient *c) {
a4d1ba9a 3110 setGenericCommand(c,1);
ed9b544e 3111}
3112
322fc7d8 3113static int getGenericCommand(redisClient *c) {
3305306f 3114 robj *o = lookupKeyRead(c->db,c->argv[1]);
3115
3116 if (o == NULL) {
c937aa89 3117 addReply(c,shared.nullbulk);
322fc7d8 3118 return REDIS_OK;
ed9b544e 3119 } else {
ed9b544e 3120 if (o->type != REDIS_STRING) {
c937aa89 3121 addReply(c,shared.wrongtypeerr);
322fc7d8 3122 return REDIS_ERR;
ed9b544e 3123 } else {
942a3961 3124 addReplyBulkLen(c,o);
ed9b544e 3125 addReply(c,o);
3126 addReply(c,shared.crlf);
322fc7d8 3127 return REDIS_OK;
ed9b544e 3128 }
3129 }
3130}
3131
322fc7d8 3132static void getCommand(redisClient *c) {
3133 getGenericCommand(c);
3134}
3135
f6b141c5 3136static void getsetCommand(redisClient *c) {
322fc7d8 3137 if (getGenericCommand(c) == REDIS_ERR) return;
a431eb74 3138 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3139 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3140 } else {
3141 incrRefCount(c->argv[1]);
3142 }
3143 incrRefCount(c->argv[2]);
3144 server.dirty++;
3145 removeExpire(c->db,c->argv[1]);
3146}
3147
70003d28 3148static void mgetCommand(redisClient *c) {
70003d28 3149 int j;
3150
c937aa89 3151 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
70003d28 3152 for (j = 1; j < c->argc; j++) {
3305306f 3153 robj *o = lookupKeyRead(c->db,c->argv[j]);
3154 if (o == NULL) {
c937aa89 3155 addReply(c,shared.nullbulk);
70003d28 3156 } else {
70003d28 3157 if (o->type != REDIS_STRING) {
c937aa89 3158 addReply(c,shared.nullbulk);
70003d28 3159 } else {
942a3961 3160 addReplyBulkLen(c,o);
70003d28 3161 addReply(c,o);
3162 addReply(c,shared.crlf);
3163 }
3164 }
3165 }
3166}
3167
6c446631 3168static void msetGenericCommand(redisClient *c, int nx) {
906573e7 3169 int j, busykeys = 0;
6c446631 3170
3171 if ((c->argc % 2) == 0) {
454d4e43 3172 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
6c446631 3173 return;
3174 }
3175 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3176 * set nothing at all if at least one already key exists. */
3177 if (nx) {
3178 for (j = 1; j < c->argc; j += 2) {
906573e7 3179 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3180 busykeys++;
6c446631 3181 }
3182 }
3183 }
906573e7 3184 if (busykeys) {
3185 addReply(c, shared.czero);
3186 return;
3187 }
6c446631 3188
3189 for (j = 1; j < c->argc; j += 2) {
3190 int retval;
3191
17511391 3192 tryObjectEncoding(c->argv[j+1]);
6c446631 3193 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3194 if (retval == DICT_ERR) {
3195 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3196 incrRefCount(c->argv[j+1]);
3197 } else {
3198 incrRefCount(c->argv[j]);
3199 incrRefCount(c->argv[j+1]);
3200 }
3201 removeExpire(c->db,c->argv[j]);
3202 }
3203 server.dirty += (c->argc-1)/2;
3204 addReply(c, nx ? shared.cone : shared.ok);
3205}
3206
3207static void msetCommand(redisClient *c) {
3208 msetGenericCommand(c,0);
3209}
3210
3211static void msetnxCommand(redisClient *c) {
3212 msetGenericCommand(c,1);
3213}
3214
d68ed120 3215static void incrDecrCommand(redisClient *c, long long incr) {
ed9b544e 3216 long long value;
3217 int retval;
3218 robj *o;
3219
3305306f 3220 o = lookupKeyWrite(c->db,c->argv[1]);
3221 if (o == NULL) {
ed9b544e 3222 value = 0;
3223 } else {
ed9b544e 3224 if (o->type != REDIS_STRING) {
3225 value = 0;
3226 } else {
3227 char *eptr;
3228
942a3961 3229 if (o->encoding == REDIS_ENCODING_RAW)
3230 value = strtoll(o->ptr, &eptr, 10);
3231 else if (o->encoding == REDIS_ENCODING_INT)
3232 value = (long)o->ptr;
3233 else
dfc5e96c 3234 redisAssert(1 != 1);
ed9b544e 3235 }
3236 }
3237
3238 value += incr;
3239 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
942a3961 3240 tryObjectEncoding(o);
3305306f 3241 retval = dictAdd(c->db->dict,c->argv[1],o);
ed9b544e 3242 if (retval == DICT_ERR) {
3305306f 3243 dictReplace(c->db->dict,c->argv[1],o);
3244 removeExpire(c->db,c->argv[1]);
ed9b544e 3245 } else {
3246 incrRefCount(c->argv[1]);
3247 }
3248 server.dirty++;
c937aa89 3249 addReply(c,shared.colon);
ed9b544e 3250 addReply(c,o);
3251 addReply(c,shared.crlf);
3252}
3253
3254static void incrCommand(redisClient *c) {
a4d1ba9a 3255 incrDecrCommand(c,1);
ed9b544e 3256}
3257
3258static void decrCommand(redisClient *c) {
a4d1ba9a 3259 incrDecrCommand(c,-1);
ed9b544e 3260}
3261
3262static void incrbyCommand(redisClient *c) {
d68ed120 3263 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 3264 incrDecrCommand(c,incr);
ed9b544e 3265}
3266
3267static void decrbyCommand(redisClient *c) {
d68ed120 3268 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
a4d1ba9a 3269 incrDecrCommand(c,-incr);
ed9b544e 3270}
3271
3272/* ========================= Type agnostic commands ========================= */
3273
3274static void delCommand(redisClient *c) {
5109cdff 3275 int deleted = 0, j;
3276
3277 for (j = 1; j < c->argc; j++) {
3278 if (deleteKey(c->db,c->argv[j])) {
3279 server.dirty++;
3280 deleted++;
3281 }
3282 }
3283 switch(deleted) {
3284 case 0:
c937aa89 3285 addReply(c,shared.czero);
5109cdff 3286 break;
3287 case 1:
3288 addReply(c,shared.cone);
3289 break;
3290 default:
3291 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3292 break;
ed9b544e 3293 }
3294}
3295
3296static void existsCommand(redisClient *c) {
3305306f 3297 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
ed9b544e 3298}
3299
3300static void selectCommand(redisClient *c) {
3301 int id = atoi(c->argv[1]->ptr);
3302
3303 if (selectDb(c,id) == REDIS_ERR) {
774e3047 3304 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
ed9b544e 3305 } else {
3306 addReply(c,shared.ok);
3307 }
3308}
3309
3310static void randomkeyCommand(redisClient *c) {
3311 dictEntry *de;
3305306f 3312
3313 while(1) {
3314 de = dictGetRandomKey(c->db->dict);
ce7bef07 3315 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3305306f 3316 }
ed9b544e 3317 if (de == NULL) {
ce7bef07 3318 addReply(c,shared.plus);
ed9b544e 3319 addReply(c,shared.crlf);
3320 } else {
c937aa89 3321 addReply(c,shared.plus);
ed9b544e 3322 addReply(c,dictGetEntryKey(de));
3323 addReply(c,shared.crlf);
3324 }
3325}
3326
3327static void keysCommand(redisClient *c) {
3328 dictIterator *di;
3329 dictEntry *de;
3330 sds pattern = c->argv[1]->ptr;
3331 int plen = sdslen(pattern);
682ac724 3332 unsigned long numkeys = 0, keyslen = 0;
ed9b544e 3333 robj *lenobj = createObject(REDIS_STRING,NULL);
3334
3305306f 3335 di = dictGetIterator(c->db->dict);
ed9b544e 3336 addReply(c,lenobj);
3337 decrRefCount(lenobj);
3338 while((de = dictNext(di)) != NULL) {
3339 robj *keyobj = dictGetEntryKey(de);
3305306f 3340
ed9b544e 3341 sds key = keyobj->ptr;
3342 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3343 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3305306f 3344 if (expireIfNeeded(c->db,keyobj) == 0) {
3345 if (numkeys != 0)
3346 addReply(c,shared.space);
3347 addReply(c,keyobj);
3348 numkeys++;
3349 keyslen += sdslen(key);
3350 }
ed9b544e 3351 }
3352 }
3353 dictReleaseIterator(di);
c937aa89 3354 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
ed9b544e 3355 addReply(c,shared.crlf);
3356}
3357
3358static void dbsizeCommand(redisClient *c) {
3359 addReplySds(c,
3305306f 3360 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
ed9b544e 3361}
3362
3363static void lastsaveCommand(redisClient *c) {
3364 addReplySds(c,
c937aa89 3365 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
ed9b544e 3366}
3367
3368static void typeCommand(redisClient *c) {
3305306f 3369 robj *o;
ed9b544e 3370 char *type;
3305306f 3371
3372 o = lookupKeyRead(c->db,c->argv[1]);
3373 if (o == NULL) {
c937aa89 3374 type = "+none";
ed9b544e 3375 } else {
ed9b544e 3376 switch(o->type) {
c937aa89 3377 case REDIS_STRING: type = "+string"; break;
3378 case REDIS_LIST: type = "+list"; break;
3379 case REDIS_SET: type = "+set"; break;
412a8bce 3380 case REDIS_ZSET: type = "+zset"; break;
ed9b544e 3381 default: type = "unknown"; break;
3382 }
3383 }
3384 addReplySds(c,sdsnew(type));
3385 addReply(c,shared.crlf);
3386}
3387
3388static void saveCommand(redisClient *c) {
9d65a1bb 3389 if (server.bgsavechildpid != -1) {
05557f6d 3390 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3391 return;
3392 }
f78fd11b 3393 if (rdbSave(server.dbfilename) == REDIS_OK) {
ed9b544e 3394 addReply(c,shared.ok);
3395 } else {
3396 addReply(c,shared.err);
3397 }
3398}
3399
3400static void bgsaveCommand(redisClient *c) {
9d65a1bb 3401 if (server.bgsavechildpid != -1) {
ed9b544e 3402 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3403 return;
3404 }
f78fd11b 3405 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
49b99ab4 3406 char *status = "+Background saving started\r\n";
3407 addReplySds(c,sdsnew(status));
ed9b544e 3408 } else {
3409 addReply(c,shared.err);
3410 }
3411}
3412
3413static void shutdownCommand(redisClient *c) {
3414 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
a3b21203 3415 /* Kill the saving child if there is a background saving in progress.
3416 We want to avoid race conditions, for instance our saving child may
3417 overwrite the synchronous saving did by SHUTDOWN. */
9d65a1bb 3418 if (server.bgsavechildpid != -1) {
9f3c422c 3419 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3420 kill(server.bgsavechildpid,SIGKILL);
a3b21203 3421 rdbRemoveTempFile(server.bgsavechildpid);
9f3c422c 3422 }
ac945e2d 3423 if (server.appendonly) {
3424 /* Append only file: fsync() the AOF and exit */
3425 fsync(server.appendfd);
3426 exit(0);
ed9b544e 3427 } else {
ac945e2d 3428 /* Snapshotting. Perform a SYNC SAVE and exit */
3429 if (rdbSave(server.dbfilename) == REDIS_OK) {
3430 if (server.daemonize)
3431 unlink(server.pidfile);
3432 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3433 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3434 exit(0);
3435 } else {
3436 /* Ooops.. error saving! The best we can do is to continue operating.
3437 * Note that if there was a background saving process, in the next
3438 * cron() Redis will be notified that the background saving aborted,
3439 * handling special stuff like slaves pending for synchronization... */
3440 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3441 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3442 }
ed9b544e 3443 }
3444}
3445
3446static void renameGenericCommand(redisClient *c, int nx) {
ed9b544e 3447 robj *o;
3448
3449 /* To use the same key as src and dst is probably an error */
3450 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
c937aa89 3451 addReply(c,shared.sameobjecterr);
ed9b544e 3452 return;
3453 }
3454
3305306f 3455 o = lookupKeyWrite(c->db,c->argv[1]);
3456 if (o == NULL) {
c937aa89 3457 addReply(c,shared.nokeyerr);
ed9b544e 3458 return;
3459 }
ed9b544e 3460 incrRefCount(o);
3305306f 3461 deleteIfVolatile(c->db,c->argv[2]);
3462 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
ed9b544e 3463 if (nx) {
3464 decrRefCount(o);
c937aa89 3465 addReply(c,shared.czero);
ed9b544e 3466 return;
3467 }
3305306f 3468 dictReplace(c->db->dict,c->argv[2],o);
ed9b544e 3469 } else {
3470 incrRefCount(c->argv[2]);
3471 }
3305306f 3472 deleteKey(c->db,c->argv[1]);
ed9b544e 3473 server.dirty++;
c937aa89 3474 addReply(c,nx ? shared.cone : shared.ok);
ed9b544e 3475}
3476
3477static void renameCommand(redisClient *c) {
3478 renameGenericCommand(c,0);
3479}
3480
3481static void renamenxCommand(redisClient *c) {
3482 renameGenericCommand(c,1);
3483}
3484
3485static void moveCommand(redisClient *c) {
3305306f 3486 robj *o;
3487 redisDb *src, *dst;
ed9b544e 3488 int srcid;
3489
3490 /* Obtain source and target DB pointers */
3305306f 3491 src = c->db;
3492 srcid = c->db->id;
ed9b544e 3493 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
c937aa89 3494 addReply(c,shared.outofrangeerr);
ed9b544e 3495 return;
3496 }
3305306f 3497 dst = c->db;
3498 selectDb(c,srcid); /* Back to the source DB */
ed9b544e 3499
3500 /* If the user is moving using as target the same
3501 * DB as the source DB it is probably an error. */
3502 if (src == dst) {
c937aa89 3503 addReply(c,shared.sameobjecterr);
ed9b544e 3504 return;
3505 }
3506
3507 /* Check if the element exists and get a reference */
3305306f 3508 o = lookupKeyWrite(c->db,c->argv[1]);
3509 if (!o) {
c937aa89 3510 addReply(c,shared.czero);
ed9b544e 3511 return;
3512 }
3513
3514 /* Try to add the element to the target DB */
3305306f 3515 deleteIfVolatile(dst,c->argv[1]);
3516 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
c937aa89 3517 addReply(c,shared.czero);
ed9b544e 3518 return;
3519 }
3305306f 3520 incrRefCount(c->argv[1]);
ed9b544e 3521 incrRefCount(o);
3522
3523 /* OK! key moved, free the entry in the source DB */
3305306f 3524 deleteKey(src,c->argv[1]);
ed9b544e 3525 server.dirty++;
c937aa89 3526 addReply(c,shared.cone);
ed9b544e 3527}
3528
3529/* =================================== Lists ================================ */
3530static void pushGenericCommand(redisClient *c, int where) {
3531 robj *lobj;
ed9b544e 3532 list *list;
3305306f 3533
3534 lobj = lookupKeyWrite(c->db,c->argv[1]);
3535 if (lobj == NULL) {
95242ab5 3536 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3537 addReply(c,shared.ok);
3538 return;
3539 }
ed9b544e 3540 lobj = createListObject();
3541 list = lobj->ptr;
3542 if (where == REDIS_HEAD) {
6b47e12e 3543 listAddNodeHead(list,c->argv[2]);
ed9b544e 3544 } else {
6b47e12e 3545 listAddNodeTail(list,c->argv[2]);
ed9b544e 3546 }
3305306f 3547 dictAdd(c->db->dict,c->argv[1],lobj);
ed9b544e 3548 incrRefCount(c->argv[1]);
3549 incrRefCount(c->argv[2]);
3550 } else {
ed9b544e 3551 if (lobj->type != REDIS_LIST) {
3552 addReply(c,shared.wrongtypeerr);
3553 return;
3554 }
95242ab5 3555 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3556 addReply(c,shared.ok);
3557 return;
3558 }
ed9b544e 3559 list = lobj->ptr;
3560 if (where == REDIS_HEAD) {
6b47e12e 3561 listAddNodeHead(list,c->argv[2]);
ed9b544e 3562 } else {
6b47e12e 3563 listAddNodeTail(list,c->argv[2]);
ed9b544e 3564 }
3565 incrRefCount(c->argv[2]);
3566 }
3567 server.dirty++;
3568 addReply(c,shared.ok);
3569}
3570
3571static void lpushCommand(redisClient *c) {
3572 pushGenericCommand(c,REDIS_HEAD);
3573}
3574
3575static void rpushCommand(redisClient *c) {
3576 pushGenericCommand(c,REDIS_TAIL);
3577}
3578
3579static void llenCommand(redisClient *c) {
3305306f 3580 robj *o;
ed9b544e 3581 list *l;
3582
3305306f 3583 o = lookupKeyRead(c->db,c->argv[1]);
3584 if (o == NULL) {
c937aa89 3585 addReply(c,shared.czero);
ed9b544e 3586 return;
3587 } else {
ed9b544e 3588 if (o->type != REDIS_LIST) {
c937aa89 3589 addReply(c,shared.wrongtypeerr);
ed9b544e 3590 } else {
3591 l = o->ptr;
c937aa89 3592 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
ed9b544e 3593 }
3594 }
3595}
3596
3597static void lindexCommand(redisClient *c) {
3305306f 3598 robj *o;
ed9b544e 3599 int index = atoi(c->argv[2]->ptr);
3600
3305306f 3601 o = lookupKeyRead(c->db,c->argv[1]);
3602 if (o == NULL) {
c937aa89 3603 addReply(c,shared.nullbulk);
ed9b544e 3604 } else {
ed9b544e 3605 if (o->type != REDIS_LIST) {
c937aa89 3606 addReply(c,shared.wrongtypeerr);
ed9b544e 3607 } else {
3608 list *list = o->ptr;
3609 listNode *ln;
3610
3611 ln = listIndex(list, index);
3612 if (ln == NULL) {
c937aa89 3613 addReply(c,shared.nullbulk);
ed9b544e 3614 } else {
3615 robj *ele = listNodeValue(ln);
942a3961 3616 addReplyBulkLen(c,ele);
ed9b544e 3617 addReply(c,ele);
3618 addReply(c,shared.crlf);
3619 }
3620 }
3621 }
3622}
3623
3624static void lsetCommand(redisClient *c) {
3305306f 3625 robj *o;
ed9b544e 3626 int index = atoi(c->argv[2]->ptr);
3627
3305306f 3628 o = lookupKeyWrite(c->db,c->argv[1]);
3629 if (o == NULL) {
ed9b544e 3630 addReply(c,shared.nokeyerr);
3631 } else {
ed9b544e 3632 if (o->type != REDIS_LIST) {
3633 addReply(c,shared.wrongtypeerr);
3634 } else {
3635 list *list = o->ptr;
3636 listNode *ln;
3637
3638 ln = listIndex(list, index);
3639 if (ln == NULL) {
c937aa89 3640 addReply(c,shared.outofrangeerr);
ed9b544e 3641 } else {
3642 robj *ele = listNodeValue(ln);
3643
3644 decrRefCount(ele);
3645 listNodeValue(ln) = c->argv[3];
3646 incrRefCount(c->argv[3]);
3647 addReply(c,shared.ok);
3648 server.dirty++;
3649 }
3650 }
3651 }
3652}
3653
3654static void popGenericCommand(redisClient *c, int where) {
3305306f 3655 robj *o;
3656
3657 o = lookupKeyWrite(c->db,c->argv[1]);
3658 if (o == NULL) {
c937aa89 3659 addReply(c,shared.nullbulk);
ed9b544e 3660 } else {
ed9b544e 3661 if (o->type != REDIS_LIST) {
c937aa89 3662 addReply(c,shared.wrongtypeerr);
ed9b544e 3663 } else {
3664 list *list = o->ptr;
3665 listNode *ln;
3666
3667 if (where == REDIS_HEAD)
3668 ln = listFirst(list);
3669 else
3670 ln = listLast(list);
3671
3672 if (ln == NULL) {
c937aa89 3673 addReply(c,shared.nullbulk);
ed9b544e 3674 } else {
3675 robj *ele = listNodeValue(ln);
942a3961 3676 addReplyBulkLen(c,ele);
ed9b544e 3677 addReply(c,ele);
3678 addReply(c,shared.crlf);
3679 listDelNode(list,ln);
3680 server.dirty++;
3681 }
3682 }
3683 }
3684}
3685
3686static void lpopCommand(redisClient *c) {
3687 popGenericCommand(c,REDIS_HEAD);
3688}
3689
3690static void rpopCommand(redisClient *c) {
3691 popGenericCommand(c,REDIS_TAIL);
3692}
3693
3694static void lrangeCommand(redisClient *c) {
3305306f 3695 robj *o;
ed9b544e 3696 int start = atoi(c->argv[2]->ptr);
3697 int end = atoi(c->argv[3]->ptr);
3305306f 3698
3699 o = lookupKeyRead(c->db,c->argv[1]);
3700 if (o == NULL) {
c937aa89 3701 addReply(c,shared.nullmultibulk);
ed9b544e 3702 } else {
ed9b544e 3703 if (o->type != REDIS_LIST) {
c937aa89 3704 addReply(c,shared.wrongtypeerr);
ed9b544e 3705 } else {
3706 list *list = o->ptr;
3707 listNode *ln;
3708 int llen = listLength(list);
3709 int rangelen, j;
3710 robj *ele;
3711
3712 /* convert negative indexes */
3713 if (start < 0) start = llen+start;
3714 if (end < 0) end = llen+end;
3715 if (start < 0) start = 0;
3716 if (end < 0) end = 0;
3717
3718 /* indexes sanity checks */
3719 if (start > end || start >= llen) {
3720 /* Out of range start or start > end result in empty list */
c937aa89 3721 addReply(c,shared.emptymultibulk);
ed9b544e 3722 return;
3723 }
3724 if (end >= llen) end = llen-1;
3725 rangelen = (end-start)+1;
3726
3727 /* Return the result in form of a multi-bulk reply */
3728 ln = listIndex(list, start);
c937aa89 3729 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
ed9b544e 3730 for (j = 0; j < rangelen; j++) {
3731 ele = listNodeValue(ln);
942a3961 3732 addReplyBulkLen(c,ele);
ed9b544e 3733 addReply(c,ele);
3734 addReply(c,shared.crlf);
3735 ln = ln->next;
3736 }
3737 }
3738 }
3739}
3740
3741static void ltrimCommand(redisClient *c) {
3305306f 3742 robj *o;
ed9b544e 3743 int start = atoi(c->argv[2]->ptr);
3744 int end = atoi(c->argv[3]->ptr);
3745
3305306f 3746 o = lookupKeyWrite(c->db,c->argv[1]);
3747 if (o == NULL) {
ab9d4cb1 3748 addReply(c,shared.ok);
ed9b544e 3749 } else {
ed9b544e 3750 if (o->type != REDIS_LIST) {
3751 addReply(c,shared.wrongtypeerr);
3752 } else {
3753 list *list = o->ptr;
3754 listNode *ln;
3755 int llen = listLength(list);
3756 int j, ltrim, rtrim;
3757
3758 /* convert negative indexes */
3759 if (start < 0) start = llen+start;
3760 if (end < 0) end = llen+end;
3761 if (start < 0) start = 0;
3762 if (end < 0) end = 0;
3763
3764 /* indexes sanity checks */
3765 if (start > end || start >= llen) {
3766 /* Out of range start or start > end result in empty list */
3767 ltrim = llen;
3768 rtrim = 0;
3769 } else {
3770 if (end >= llen) end = llen-1;
3771 ltrim = start;
3772 rtrim = llen-end-1;
3773 }
3774
3775 /* Remove list elements to perform the trim */
3776 for (j = 0; j < ltrim; j++) {
3777 ln = listFirst(list);
3778 listDelNode(list,ln);
3779 }
3780 for (j = 0; j < rtrim; j++) {
3781 ln = listLast(list);
3782 listDelNode(list,ln);
3783 }
ed9b544e 3784 server.dirty++;
e59229a2 3785 addReply(c,shared.ok);
ed9b544e 3786 }
3787 }
3788}
3789
3790static void lremCommand(redisClient *c) {
3305306f 3791 robj *o;
ed9b544e 3792
3305306f 3793 o = lookupKeyWrite(c->db,c->argv[1]);
3794 if (o == NULL) {
33c08b39 3795 addReply(c,shared.czero);
ed9b544e 3796 } else {
ed9b544e 3797 if (o->type != REDIS_LIST) {
c937aa89 3798 addReply(c,shared.wrongtypeerr);
ed9b544e 3799 } else {
3800 list *list = o->ptr;
3801 listNode *ln, *next;
3802 int toremove = atoi(c->argv[2]->ptr);
3803 int removed = 0;
3804 int fromtail = 0;
3805
3806 if (toremove < 0) {
3807 toremove = -toremove;
3808 fromtail = 1;
3809 }
3810 ln = fromtail ? list->tail : list->head;
3811 while (ln) {
ed9b544e 3812 robj *ele = listNodeValue(ln);
a4d1ba9a 3813
3814 next = fromtail ? ln->prev : ln->next;
724a51b1 3815 if (compareStringObjects(ele,c->argv[3]) == 0) {
ed9b544e 3816 listDelNode(list,ln);
3817 server.dirty++;
3818 removed++;
3819 if (toremove && removed == toremove) break;
3820 }
3821 ln = next;
3822 }
c937aa89 3823 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
ed9b544e 3824 }
3825 }
3826}
3827
12f9d551 3828/* This is the semantic of this command:
0f5f7e9a 3829 * RPOPLPUSH srclist dstlist:
12f9d551 3830 * IF LLEN(srclist) > 0
3831 * element = RPOP srclist
3832 * LPUSH dstlist element
3833 * RETURN element
3834 * ELSE
3835 * RETURN nil
3836 * END
3837 * END
3838 *
3839 * The idea is to be able to get an element from a list in a reliable way
3840 * since the element is not just returned but pushed against another list
3841 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3842 */
0f5f7e9a 3843static void rpoplpushcommand(redisClient *c) {
12f9d551 3844 robj *sobj;
3845
3846 sobj = lookupKeyWrite(c->db,c->argv[1]);
3847 if (sobj == NULL) {
3848 addReply(c,shared.nullbulk);
3849 } else {
3850 if (sobj->type != REDIS_LIST) {
3851 addReply(c,shared.wrongtypeerr);
3852 } else {
3853 list *srclist = sobj->ptr;
3854 listNode *ln = listLast(srclist);
3855
3856 if (ln == NULL) {
3857 addReply(c,shared.nullbulk);
3858 } else {
3859 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3860 robj *ele = listNodeValue(ln);
3861 list *dstlist;
3862
3863 if (dobj == NULL) {
3864
3865 /* Create the list if the key does not exist */
3866 dobj = createListObject();
3867 dictAdd(c->db->dict,c->argv[2],dobj);
3868 incrRefCount(c->argv[2]);
3869 } else if (dobj->type != REDIS_LIST) {
3870 addReply(c,shared.wrongtypeerr);
3871 return;
3872 }
3873 /* Add the element to the target list */
3874 dstlist = dobj->ptr;
3875 listAddNodeHead(dstlist,ele);
3876 incrRefCount(ele);
3877
3878 /* Send the element to the client as reply as well */
3879 addReplyBulkLen(c,ele);
3880 addReply(c,ele);
3881 addReply(c,shared.crlf);
3882
3883 /* Finally remove the element from the source list */
3884 listDelNode(srclist,ln);
3885 server.dirty++;
3886 }
3887 }
3888 }
3889}
3890
3891
ed9b544e 3892/* ==================================== Sets ================================ */
3893
3894static void saddCommand(redisClient *c) {
ed9b544e 3895 robj *set;
3896
3305306f 3897 set = lookupKeyWrite(c->db,c->argv[1]);
3898 if (set == NULL) {
ed9b544e 3899 set = createSetObject();
3305306f 3900 dictAdd(c->db->dict,c->argv[1],set);
ed9b544e 3901 incrRefCount(c->argv[1]);
3902 } else {
ed9b544e 3903 if (set->type != REDIS_SET) {
c937aa89 3904 addReply(c,shared.wrongtypeerr);
ed9b544e 3905 return;
3906 }
3907 }
3908 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3909 incrRefCount(c->argv[2]);
3910 server.dirty++;
c937aa89 3911 addReply(c,shared.cone);
ed9b544e 3912 } else {
c937aa89 3913 addReply(c,shared.czero);
ed9b544e 3914 }
3915}
3916
3917static void sremCommand(redisClient *c) {
3305306f 3918 robj *set;
ed9b544e 3919
3305306f 3920 set = lookupKeyWrite(c->db,c->argv[1]);
3921 if (set == NULL) {
c937aa89 3922 addReply(c,shared.czero);
ed9b544e 3923 } else {
ed9b544e 3924 if (set->type != REDIS_SET) {
c937aa89 3925 addReply(c,shared.wrongtypeerr);
ed9b544e 3926 return;
3927 }
3928 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3929 server.dirty++;
12fea928 3930 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
c937aa89 3931 addReply(c,shared.cone);
ed9b544e 3932 } else {
c937aa89 3933 addReply(c,shared.czero);
ed9b544e 3934 }
3935 }
3936}
3937
a4460ef4 3938static void smoveCommand(redisClient *c) {
3939 robj *srcset, *dstset;
3940
3941 srcset = lookupKeyWrite(c->db,c->argv[1]);
3942 dstset = lookupKeyWrite(c->db,c->argv[2]);
3943
3944 /* If the source key does not exist return 0, if it's of the wrong type
3945 * raise an error */
3946 if (srcset == NULL || srcset->type != REDIS_SET) {
3947 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3948 return;
3949 }
3950 /* Error if the destination key is not a set as well */
3951 if (dstset && dstset->type != REDIS_SET) {
3952 addReply(c,shared.wrongtypeerr);
3953 return;
3954 }
3955 /* Remove the element from the source set */
3956 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3957 /* Key not found in the src set! return zero */
3958 addReply(c,shared.czero);
3959 return;
3960 }
3961 server.dirty++;
3962 /* Add the element to the destination set */
3963 if (!dstset) {
3964 dstset = createSetObject();
3965 dictAdd(c->db->dict,c->argv[2],dstset);
3966 incrRefCount(c->argv[2]);
3967 }
3968 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3969 incrRefCount(c->argv[3]);
3970 addReply(c,shared.cone);
3971}
3972
ed9b544e 3973static void sismemberCommand(redisClient *c) {
3305306f 3974 robj *set;
ed9b544e 3975
3305306f 3976 set = lookupKeyRead(c->db,c->argv[1]);
3977 if (set == NULL) {
c937aa89 3978 addReply(c,shared.czero);
ed9b544e 3979 } else {
ed9b544e 3980 if (set->type != REDIS_SET) {
c937aa89 3981 addReply(c,shared.wrongtypeerr);
ed9b544e 3982 return;
3983 }
3984 if (dictFind(set->ptr,c->argv[2]))
c937aa89 3985 addReply(c,shared.cone);
ed9b544e 3986 else
c937aa89 3987 addReply(c,shared.czero);
ed9b544e 3988 }
3989}
3990
3991static void scardCommand(redisClient *c) {
3305306f 3992 robj *o;
ed9b544e 3993 dict *s;
3994
3305306f 3995 o = lookupKeyRead(c->db,c->argv[1]);
3996 if (o == NULL) {
c937aa89 3997 addReply(c,shared.czero);
ed9b544e 3998 return;
3999 } else {
ed9b544e 4000 if (o->type != REDIS_SET) {
c937aa89 4001 addReply(c,shared.wrongtypeerr);
ed9b544e 4002 } else {
4003 s = o->ptr;
682ac724 4004 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3305306f 4005 dictSize(s)));
ed9b544e 4006 }
4007 }
4008}
4009
12fea928 4010static void spopCommand(redisClient *c) {
4011 robj *set;
4012 dictEntry *de;
4013
4014 set = lookupKeyWrite(c->db,c->argv[1]);
4015 if (set == NULL) {
4016 addReply(c,shared.nullbulk);
4017 } else {
4018 if (set->type != REDIS_SET) {
4019 addReply(c,shared.wrongtypeerr);
4020 return;
4021 }
4022 de = dictGetRandomKey(set->ptr);
4023 if (de == NULL) {
4024 addReply(c,shared.nullbulk);
4025 } else {
4026 robj *ele = dictGetEntryKey(de);
4027
942a3961 4028 addReplyBulkLen(c,ele);
12fea928 4029 addReply(c,ele);
4030 addReply(c,shared.crlf);
4031 dictDelete(set->ptr,ele);
4032 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4033 server.dirty++;
4034 }
4035 }
4036}
4037
2abb95a9 4038static void srandmemberCommand(redisClient *c) {
4039 robj *set;
4040 dictEntry *de;
4041
4042 set = lookupKeyRead(c->db,c->argv[1]);
4043 if (set == NULL) {
4044 addReply(c,shared.nullbulk);
4045 } else {
4046 if (set->type != REDIS_SET) {
4047 addReply(c,shared.wrongtypeerr);
4048 return;
4049 }
4050 de = dictGetRandomKey(set->ptr);
4051 if (de == NULL) {
4052 addReply(c,shared.nullbulk);
4053 } else {
4054 robj *ele = dictGetEntryKey(de);
4055
4056 addReplyBulkLen(c,ele);
4057 addReply(c,ele);
4058 addReply(c,shared.crlf);
4059 }
4060 }
4061}
4062
ed9b544e 4063static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4064 dict **d1 = (void*) s1, **d2 = (void*) s2;
4065
3305306f 4066 return dictSize(*d1)-dictSize(*d2);
ed9b544e 4067}
4068
682ac724 4069static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
ed9b544e 4070 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4071 dictIterator *di;
4072 dictEntry *de;
4073 robj *lenobj = NULL, *dstset = NULL;
682ac724 4074 unsigned long j, cardinality = 0;
ed9b544e 4075
ed9b544e 4076 for (j = 0; j < setsnum; j++) {
4077 robj *setobj;
3305306f 4078
4079 setobj = dstkey ?
4080 lookupKeyWrite(c->db,setskeys[j]) :
4081 lookupKeyRead(c->db,setskeys[j]);
4082 if (!setobj) {
ed9b544e 4083 zfree(dv);
5faa6025 4084 if (dstkey) {
fdcaae84 4085 if (deleteKey(c->db,dstkey))
4086 server.dirty++;
0d36ded0 4087 addReply(c,shared.czero);
5faa6025 4088 } else {
4089 addReply(c,shared.nullmultibulk);
4090 }
ed9b544e 4091 return;
4092 }
ed9b544e 4093 if (setobj->type != REDIS_SET) {
4094 zfree(dv);
c937aa89 4095 addReply(c,shared.wrongtypeerr);
ed9b544e 4096 return;
4097 }
4098 dv[j] = setobj->ptr;
4099 }
4100 /* Sort sets from the smallest to largest, this will improve our
4101 * algorithm's performace */
4102 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4103
4104 /* The first thing we should output is the total number of elements...
4105 * since this is a multi-bulk write, but at this stage we don't know
4106 * the intersection set size, so we use a trick, append an empty object
4107 * to the output list and save the pointer to later modify it with the
4108 * right length */
4109 if (!dstkey) {
4110 lenobj = createObject(REDIS_STRING,NULL);
4111 addReply(c,lenobj);
4112 decrRefCount(lenobj);
4113 } else {
4114 /* If we have a target key where to store the resulting set
4115 * create this key with an empty set inside */
4116 dstset = createSetObject();
ed9b544e 4117 }
4118
4119 /* Iterate all the elements of the first (smallest) set, and test
4120 * the element against all the other sets, if at least one set does
4121 * not include the element it is discarded */
4122 di = dictGetIterator(dv[0]);
ed9b544e 4123
4124 while((de = dictNext(di)) != NULL) {
4125 robj *ele;
4126
4127 for (j = 1; j < setsnum; j++)
4128 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4129 if (j != setsnum)
4130 continue; /* at least one set does not contain the member */
4131 ele = dictGetEntryKey(de);
4132 if (!dstkey) {
942a3961 4133 addReplyBulkLen(c,ele);
ed9b544e 4134 addReply(c,ele);
4135 addReply(c,shared.crlf);
4136 cardinality++;
4137 } else {
4138 dictAdd(dstset->ptr,ele,NULL);
4139 incrRefCount(ele);
4140 }
4141 }
4142 dictReleaseIterator(di);
4143
83cdfe18
AG
4144 if (dstkey) {
4145 /* Store the resulting set into the target */
4146 deleteKey(c->db,dstkey);
4147 dictAdd(c->db->dict,dstkey,dstset);
4148 incrRefCount(dstkey);
4149 }
4150
40d224a9 4151 if (!dstkey) {
682ac724 4152 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
40d224a9 4153 } else {
682ac724 4154 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
03fd01c7 4155 dictSize((dict*)dstset->ptr)));
40d224a9 4156 server.dirty++;
4157 }
ed9b544e 4158 zfree(dv);
4159}
4160
4161static void sinterCommand(redisClient *c) {
4162 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4163}
4164
4165static void sinterstoreCommand(redisClient *c) {
4166 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4167}
4168
/* Operation selector for sunionDiffGenericCommand(). */
#define REDIS_OP_UNION 0    /* SUNION / SUNIONSTORE */
#define REDIS_OP_DIFF 1     /* SDIFF / SDIFFSTORE */
4171
4172static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
40d224a9 4173 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4174 dictIterator *di;
4175 dictEntry *de;
f4f56e1d 4176 robj *dstset = NULL;
40d224a9 4177 int j, cardinality = 0;
4178
40d224a9 4179 for (j = 0; j < setsnum; j++) {
4180 robj *setobj;
4181
4182 setobj = dstkey ?
4183 lookupKeyWrite(c->db,setskeys[j]) :
4184 lookupKeyRead(c->db,setskeys[j]);
4185 if (!setobj) {
4186 dv[j] = NULL;
4187 continue;
4188 }
4189 if (setobj->type != REDIS_SET) {
4190 zfree(dv);
4191 addReply(c,shared.wrongtypeerr);
4192 return;
4193 }
4194 dv[j] = setobj->ptr;
4195 }
4196
4197 /* We need a temp set object to store our union. If the dstkey
4198 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4199 * this set object will be the resulting object to set into the target key*/
4200 dstset = createSetObject();
4201
40d224a9 4202 /* Iterate all the elements of all the sets, add every element a single
4203 * time to the result set */
4204 for (j = 0; j < setsnum; j++) {
51829ed3 4205 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
40d224a9 4206 if (!dv[j]) continue; /* non existing keys are like empty sets */
4207
4208 di = dictGetIterator(dv[j]);
40d224a9 4209
4210 while((de = dictNext(di)) != NULL) {
4211 robj *ele;
4212
4213 /* dictAdd will not add the same element multiple times */
4214 ele = dictGetEntryKey(de);
f4f56e1d 4215 if (op == REDIS_OP_UNION || j == 0) {
4216 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4217 incrRefCount(ele);
40d224a9 4218 cardinality++;
4219 }
f4f56e1d 4220 } else if (op == REDIS_OP_DIFF) {
4221 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4222 cardinality--;
4223 }
40d224a9 4224 }
4225 }
4226 dictReleaseIterator(di);
51829ed3
AG
4227
4228 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
40d224a9 4229 }
4230
f4f56e1d 4231 /* Output the content of the resulting set, if not in STORE mode */
4232 if (!dstkey) {
4233 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4234 di = dictGetIterator(dstset->ptr);
f4f56e1d 4235 while((de = dictNext(di)) != NULL) {
4236 robj *ele;
4237
4238 ele = dictGetEntryKey(de);
942a3961 4239 addReplyBulkLen(c,ele);
f4f56e1d 4240 addReply(c,ele);
4241 addReply(c,shared.crlf);
4242 }
4243 dictReleaseIterator(di);
83cdfe18
AG
4244 } else {
4245 /* If we have a target key where to store the resulting set
4246 * create this key with the result set inside */
4247 deleteKey(c->db,dstkey);
4248 dictAdd(c->db->dict,dstkey,dstset);
4249 incrRefCount(dstkey);
f4f56e1d 4250 }
4251
4252 /* Cleanup */
40d224a9 4253 if (!dstkey) {
40d224a9 4254 decrRefCount(dstset);
4255 } else {
682ac724 4256 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
03fd01c7 4257 dictSize((dict*)dstset->ptr)));
40d224a9 4258 server.dirty++;
4259 }
4260 zfree(dv);
4261}
4262
4263static void sunionCommand(redisClient *c) {
f4f56e1d 4264 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
40d224a9 4265}
4266
4267static void sunionstoreCommand(redisClient *c) {
f4f56e1d 4268 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4269}
4270
4271static void sdiffCommand(redisClient *c) {
4272 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4273}
4274
4275static void sdiffstoreCommand(redisClient *c) {
4276 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
40d224a9 4277}
4278
6b47e12e 4279/* ==================================== ZSets =============================== */
4280
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations on a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4288
4289/* This skiplist implementation is almost a C translation of the original
4290 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4291 * Alternative to Balanced Trees", modified in three ways:
4292 * a) this implementation allows for repeated values.
4293 * b) the comparison is not just by key (our 'score') but by satellite data.
4294 * c) there is a back pointer, so it's a doubly linked list with the back
4295 * pointers being only at "level 1". This allows to traverse the list
4296 * from tail to head, useful for ZREVRANGE. */
4297
4298static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4299 zskiplistNode *zn = zmalloc(sizeof(*zn));
4300
4301 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4302 zn->score = score;
4303 zn->obj = obj;
4304 return zn;
4305}
4306
4307static zskiplist *zslCreate(void) {
4308 int j;
4309 zskiplist *zsl;
4310
4311 zsl = zmalloc(sizeof(*zsl));
4312 zsl->level = 1;
cc812361 4313 zsl->length = 0;
6b47e12e 4314 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4315 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4316 zsl->header->forward[j] = NULL;
e3870fab 4317 zsl->header->backward = NULL;
4318 zsl->tail = NULL;
6b47e12e 4319 return zsl;
4320}
4321
fd8ccf44 4322static void zslFreeNode(zskiplistNode *node) {
4323 decrRefCount(node->obj);
ad807e6f 4324 zfree(node->forward);
fd8ccf44 4325 zfree(node);
4326}
4327
4328static void zslFree(zskiplist *zsl) {
ad807e6f 4329 zskiplistNode *node = zsl->header->forward[0], *next;
fd8ccf44 4330
ad807e6f 4331 zfree(zsl->header->forward);
4332 zfree(zsl->header);
fd8ccf44 4333 while(node) {
599379dd 4334 next = node->forward[0];
fd8ccf44 4335 zslFreeNode(node);
4336 node = next;
4337 }
ad807e6f 4338 zfree(zsl);
fd8ccf44 4339}
4340
6b47e12e 4341static int zslRandomLevel(void) {
4342 int level = 1;
4343 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4344 level += 1;
4345 return level;
4346}
4347
4348static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4349 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4350 int i, level;
4351
4352 x = zsl->header;
4353 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 4354 while (x->forward[i] &&
4355 (x->forward[i]->score < score ||
4356 (x->forward[i]->score == score &&
4357 compareStringObjects(x->forward[i]->obj,obj) < 0)))
6b47e12e 4358 x = x->forward[i];
4359 update[i] = x;
4360 }
6b47e12e 4361 /* we assume the key is not already inside, since we allow duplicated
4362 * scores, and the re-insertion of score and redis object should never
4363 * happpen since the caller of zslInsert() should test in the hash table
4364 * if the element is already inside or not. */
4365 level = zslRandomLevel();
4366 if (level > zsl->level) {
4367 for (i = zsl->level; i < level; i++)
4368 update[i] = zsl->header;
4369 zsl->level = level;
4370 }
4371 x = zslCreateNode(level,score,obj);
4372 for (i = 0; i < level; i++) {
4373 x->forward[i] = update[i]->forward[i];
4374 update[i]->forward[i] = x;
4375 }
bb975144 4376 x->backward = (update[0] == zsl->header) ? NULL : update[0];
e3870fab 4377 if (x->forward[0])
4378 x->forward[0]->backward = x;
4379 else
4380 zsl->tail = x;
cc812361 4381 zsl->length++;
6b47e12e 4382}
4383
50c55df5 4384/* Delete an element with matching score/object from the skiplist. */
fd8ccf44 4385static int zslDelete(zskiplist *zsl, double score, robj *obj) {
e197b441 4386 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4387 int i;
4388
4389 x = zsl->header;
4390 for (i = zsl->level-1; i >= 0; i--) {
9d60e6e4 4391 while (x->forward[i] &&
4392 (x->forward[i]->score < score ||
4393 (x->forward[i]->score == score &&
4394 compareStringObjects(x->forward[i]->obj,obj) < 0)))
e197b441 4395 x = x->forward[i];
4396 update[i] = x;
4397 }
4398 /* We may have multiple elements with the same score, what we need
4399 * is to find the element with both the right score and object. */
4400 x = x->forward[0];
50c55df5 4401 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
9d60e6e4 4402 for (i = 0; i < zsl->level; i++) {
4403 if (update[i]->forward[i] != x) break;
4404 update[i]->forward[i] = x->forward[i];
4405 }
4406 if (x->forward[0]) {
4407 x->forward[0]->backward = (x->backward == zsl->header) ?
4408 NULL : x->backward;
e197b441 4409 } else {
9d60e6e4 4410 zsl->tail = x->backward;
e197b441 4411 }
9d60e6e4 4412 zslFreeNode(x);
4413 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4414 zsl->level--;
4415 zsl->length--;
4416 return 1;
4417 } else {
4418 return 0; /* not found */
e197b441 4419 }
4420 return 0; /* not found */
fd8ccf44 4421}
4422
1807985b 4423/* Delete all the elements with score between min and max from the skiplist.
4424 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4425 * Note that this function takes the reference to the hash table view of the
4426 * sorted set, in order to remove the elements from the hash table too. */
4427static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4428 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4429 unsigned long removed = 0;
4430 int i;
4431
4432 x = zsl->header;
4433 for (i = zsl->level-1; i >= 0; i--) {
4434 while (x->forward[i] && x->forward[i]->score < min)
4435 x = x->forward[i];
4436 update[i] = x;
4437 }
4438 /* We may have multiple elements with the same score, what we need
4439 * is to find the element with both the right score and object. */
4440 x = x->forward[0];
4441 while (x && x->score <= max) {
4442 zskiplistNode *next;
4443
4444 for (i = 0; i < zsl->level; i++) {
4445 if (update[i]->forward[i] != x) break;
4446 update[i]->forward[i] = x->forward[i];
4447 }
4448 if (x->forward[0]) {
4449 x->forward[0]->backward = (x->backward == zsl->header) ?
4450 NULL : x->backward;
4451 } else {
4452 zsl->tail = x->backward;
4453 }
4454 next = x->forward[0];
4455 dictDelete(dict,x->obj);
4456 zslFreeNode(x);
4457 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4458 zsl->level--;
4459 zsl->length--;
4460 removed++;
4461 x = next;
4462 }
4463 return removed; /* not found */
4464}
4465
50c55df5 4466/* Find the first node having a score equal or greater than the specified one.
4467 * Returns NULL if there is no match. */
4468static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4469 zskiplistNode *x;
4470 int i;
4471
4472 x = zsl->header;
4473 for (i = zsl->level-1; i >= 0; i--) {
4474 while (x->forward[i] && x->forward[i]->score < score)
4475 x = x->forward[i];
4476 }
4477 /* We may have multiple elements with the same score, what we need
4478 * is to find the element with both the right score and object. */
4479 return x->forward[0];
4480}
4481
fd8ccf44 4482/* The actual Z-commands implementations */
4483
7db723ad 4484/* This generic command implements both ZADD and ZINCRBY.
e2665397 4485 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
7db723ad 4486 * the increment if the operation is a ZINCRBY (doincrement == 1). */
e2665397 4487static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
fd8ccf44 4488 robj *zsetobj;
4489 zset *zs;
4490 double *score;
4491
e2665397 4492 zsetobj = lookupKeyWrite(c->db,key);
fd8ccf44 4493 if (zsetobj == NULL) {
4494 zsetobj = createZsetObject();
e2665397 4495 dictAdd(c->db->dict,key,zsetobj);
4496 incrRefCount(key);
fd8ccf44 4497 } else {
4498 if (zsetobj->type != REDIS_ZSET) {
4499 addReply(c,shared.wrongtypeerr);
4500 return;
4501 }
4502 }
fd8ccf44 4503 zs = zsetobj->ptr;
e2665397 4504
7db723ad 4505 /* Ok now since we implement both ZADD and ZINCRBY here the code
e2665397 4506 * needs to handle the two different conditions. It's all about setting
4507 * '*score', that is, the new score to set, to the right value. */
4508 score = zmalloc(sizeof(double));
4509 if (doincrement) {
4510 dictEntry *de;
4511
4512 /* Read the old score. If the element was not present starts from 0 */
4513 de = dictFind(zs->dict,ele);
4514 if (de) {
4515 double *oldscore = dictGetEntryVal(de);
4516 *score = *oldscore + scoreval;
4517 } else {
4518 *score = scoreval;
4519 }
4520 } else {
4521 *score = scoreval;
4522 }
4523
4524 /* What follows is a simple remove and re-insert operation that is common
7db723ad 4525 * to both ZADD and ZINCRBY... */
e2665397 4526 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
fd8ccf44 4527 /* case 1: New element */
e2665397 4528 incrRefCount(ele); /* added to hash */
4529 zslInsert(zs->zsl,*score,ele);
4530 incrRefCount(ele); /* added to skiplist */
fd8ccf44 4531 server.dirty++;
e2665397 4532 if (doincrement)
e2665397 4533 addReplyDouble(c,*score);
91d71bfc 4534 else
4535 addReply(c,shared.cone);
fd8ccf44 4536 } else {
4537 dictEntry *de;
4538 double *oldscore;
4539
4540 /* case 2: Score update operation */
e2665397 4541 de = dictFind(zs->dict,ele);
dfc5e96c 4542 redisAssert(de != NULL);
fd8ccf44 4543 oldscore = dictGetEntryVal(de);
4544 if (*score != *oldscore) {
4545 int deleted;
4546
e2665397 4547 /* Remove and insert the element in the skip list with new score */
4548 deleted = zslDelete(zs->zsl,*oldscore,ele);
dfc5e96c 4549 redisAssert(deleted != 0);
e2665397 4550 zslInsert(zs->zsl,*score,ele);
4551 incrRefCount(ele);
4552 /* Update the score in the hash table */
4553 dictReplace(zs->dict,ele,score);
fd8ccf44 4554 server.dirty++;
2161a965 4555 } else {
4556 zfree(score);
fd8ccf44 4557 }
e2665397 4558 if (doincrement)
4559 addReplyDouble(c,*score);
4560 else
4561 addReply(c,shared.czero);
fd8ccf44 4562 }
4563}
4564
e2665397 4565static void zaddCommand(redisClient *c) {
4566 double scoreval;
4567
4568 scoreval = strtod(c->argv[2]->ptr,NULL);
4569 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4570}
4571
7db723ad 4572static void zincrbyCommand(redisClient *c) {
e2665397 4573 double scoreval;
4574
4575 scoreval = strtod(c->argv[2]->ptr,NULL);
4576 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4577}
4578
1b7106e7 4579static void zremCommand(redisClient *c) {
4580 robj *zsetobj;
4581 zset *zs;
4582
4583 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4584 if (zsetobj == NULL) {
4585 addReply(c,shared.czero);
4586 } else {
4587 dictEntry *de;
4588 double *oldscore;
4589 int deleted;
4590
4591 if (zsetobj->type != REDIS_ZSET) {
4592 addReply(c,shared.wrongtypeerr);
4593 return;
4594 }
4595 zs = zsetobj->ptr;
4596 de = dictFind(zs->dict,c->argv[2]);
4597 if (de == NULL) {
4598 addReply(c,shared.czero);
4599 return;
4600 }
4601 /* Delete from the skiplist */
4602 oldscore = dictGetEntryVal(de);
4603 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
dfc5e96c 4604 redisAssert(deleted != 0);
1b7106e7 4605
4606 /* Delete from the hash table */
4607 dictDelete(zs->dict,c->argv[2]);
4608 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4609 server.dirty++;
4610 addReply(c,shared.cone);
4611 }
4612}
4613
1807985b 4614static void zremrangebyscoreCommand(redisClient *c) {
4615 double min = strtod(c->argv[2]->ptr,NULL);
4616 double max = strtod(c->argv[3]->ptr,NULL);
4617 robj *zsetobj;
4618 zset *zs;
4619
4620 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4621 if (zsetobj == NULL) {
4622 addReply(c,shared.czero);
4623 } else {
4624 long deleted;
4625
4626 if (zsetobj->type != REDIS_ZSET) {
4627 addReply(c,shared.wrongtypeerr);
4628 return;
4629 }
4630 zs = zsetobj->ptr;
4631 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4632 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4633 server.dirty += deleted;
4634 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4635 }
4636}
4637
e3870fab 4638static void zrangeGenericCommand(redisClient *c, int reverse) {
cc812361 4639 robj *o;
4640 int start = atoi(c->argv[2]->ptr);
4641 int end = atoi(c->argv[3]->ptr);
752da584 4642 int withscores = 0;
4643
4644 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4645 withscores = 1;
4646 } else if (c->argc >= 5) {
4647 addReply(c,shared.syntaxerr);
4648 return;
4649 }
cc812361 4650
4651 o = lookupKeyRead(c->db,c->argv[1]);
4652 if (o == NULL) {
4653 addReply(c,shared.nullmultibulk);
4654 } else {
4655 if (o->type != REDIS_ZSET) {
4656 addReply(c,shared.wrongtypeerr);
4657 } else {
4658 zset *zsetobj = o->ptr;
4659 zskiplist *zsl = zsetobj->zsl;
4660 zskiplistNode *ln;
4661
4662 int llen = zsl->length;
4663 int rangelen, j;
4664 robj *ele;
4665
4666 /* convert negative indexes */
4667 if (start < 0) start = llen+start;
4668 if (end < 0) end = llen+end;
4669 if (start < 0) start = 0;
4670 if (end < 0) end = 0;
4671
4672 /* indexes sanity checks */
4673 if (start > end || start >= llen) {
4674 /* Out of range start or start > end result in empty list */
4675 addReply(c,shared.emptymultibulk);
4676 return;
4677 }
4678 if (end >= llen) end = llen-1;
4679 rangelen = (end-start)+1;
4680
4681 /* Return the result in form of a multi-bulk reply */
e3870fab 4682 if (reverse) {
4683 ln = zsl->tail;
4684 while (start--)
4685 ln = ln->backward;
4686 } else {
4687 ln = zsl->header->forward[0];
4688 while (start--)
4689 ln = ln->forward[0];
4690 }
cc812361 4691
752da584 4692 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4693 withscores ? (rangelen*2) : rangelen));
cc812361 4694 for (j = 0; j < rangelen; j++) {
0aad7a19 4695 ele = ln->obj;
cc812361 4696 addReplyBulkLen(c,ele);
4697 addReply(c,ele);
4698 addReply(c,shared.crlf);
752da584 4699 if (withscores)
4700 addReplyDouble(c,ln->score);
e3870fab 4701 ln = reverse ? ln->backward : ln->forward[0];
cc812361 4702 }
4703 }
4704 }
4705}
4706
e3870fab 4707static void zrangeCommand(redisClient *c) {
4708 zrangeGenericCommand(c,0);
4709}
4710
4711static void zrevrangeCommand(redisClient *c) {
4712 zrangeGenericCommand(c,1);
4713}
4714
50c55df5 4715static void zrangebyscoreCommand(redisClient *c) {
4716 robj *o;
4717 double min = strtod(c->argv[2]->ptr,NULL);
4718 double max = strtod(c->argv[3]->ptr,NULL);
80181f78 4719 int offset = 0, limit = -1;
4720
4721 if (c->argc != 4 && c->argc != 7) {
454d4e43 4722 addReplySds(c,
4723 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
80181f78 4724 return;
4725 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4726 addReply(c,shared.syntaxerr);
4727 return;
4728 } else if (c->argc == 7) {
4729 offset = atoi(c->argv[5]->ptr);
4730 limit = atoi(c->argv[6]->ptr);
0b13687c 4731 if (offset < 0) offset = 0;
80181f78 4732 }
50c55df5 4733
4734 o = lookupKeyRead(c->db,c->argv[1]);
4735 if (o == NULL) {
4736 addReply(c,shared.nullmultibulk);
4737 } else {
4738 if (o->type != REDIS_ZSET) {
4739 addReply(c,shared.wrongtypeerr);
4740 } else {
4741 zset *zsetobj = o->ptr;
4742 zskiplist *zsl = zsetobj->zsl;
4743 zskiplistNode *ln;
4744 robj *ele, *lenobj;
4745 unsigned int rangelen = 0;
4746
4747 /* Get the first node with the score >= min */
4748 ln = zslFirstWithScore(zsl,min);
4749 if (ln == NULL) {
4750 /* No element matching the speciifed interval */
4751 addReply(c,shared.emptymultibulk);
4752 return;
4753 }
4754
4755 /* We don't know in advance how many matching elements there
4756 * are in the list, so we push this object that will represent
4757 * the multi-bulk length in the output buffer, and will "fix"
4758 * it later */
4759 lenobj = createObject(REDIS_STRING,NULL);
4760 addReply(c,lenobj);
c74e7c77 4761 decrRefCount(lenobj);
50c55df5 4762
dbbc7285 4763 while(ln && ln->score <= max) {
80181f78 4764 if (offset) {
4765 offset--;
4766 ln = ln->forward[0];
4767 continue;
4768 }
4769 if (limit == 0) break;
50c55df5 4770 ele = ln->obj;
4771 addReplyBulkLen(c,ele);
4772 addReply(c,ele);
4773 addReply(c,shared.crlf);
4774 ln = ln->forward[0];
4775 rangelen++;
80181f78 4776 if (limit > 0) limit--;
50c55df5 4777 }
4778 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4779 }
4780 }
4781}
4782
3c41331e 4783static void zcardCommand(redisClient *c) {
e197b441 4784 robj *o;
4785 zset *zs;
4786
4787 o = lookupKeyRead(c->db,c->argv[1]);
4788 if (o == NULL) {
4789 addReply(c,shared.czero);
4790 return;
4791 } else {
4792 if (o->type != REDIS_ZSET) {
4793 addReply(c,shared.wrongtypeerr);
4794 } else {
4795 zs = o->ptr;
682ac724 4796 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
e197b441 4797 }
4798 }
4799}
4800
6e333bbe 4801static void zscoreCommand(redisClient *c) {
4802 robj *o;
4803 zset *zs;
4804
4805 o = lookupKeyRead(c->db,c->argv[1]);
4806 if (o == NULL) {
96d8b4ee 4807 addReply(c,shared.nullbulk);
6e333bbe 4808 return;
4809 } else {
4810 if (o->type != REDIS_ZSET) {
4811 addReply(c,shared.wrongtypeerr);
4812 } else {
4813 dictEntry *de;
4814
4815 zs = o->ptr;
4816 de = dictFind(zs->dict,c->argv[2]);
4817 if (!de) {
4818 addReply(c,shared.nullbulk);
4819 } else {
6e333bbe 4820 double *score = dictGetEntryVal(de);
4821
e2665397 4822 addReplyDouble(c,*score);
6e333bbe 4823 }
4824 }
4825 }
4826}
4827
6b47e12e 4828/* ========================= Non type-specific commands ==================== */
4829
ed9b544e 4830static void flushdbCommand(redisClient *c) {
ca37e9cd 4831 server.dirty += dictSize(c->db->dict);
3305306f 4832 dictEmpty(c->db->dict);
4833 dictEmpty(c->db->expires);
ed9b544e 4834 addReply(c,shared.ok);
ed9b544e 4835}
4836
4837static void flushallCommand(redisClient *c) {
ca37e9cd 4838 server.dirty += emptyDb();
ed9b544e 4839 addReply(c,shared.ok);
f78fd11b 4840 rdbSave(server.dbfilename);
ca37e9cd 4841 server.dirty++;
ed9b544e 4842}
4843
56906eef 4844static redisSortOperation *createSortOperation(int type, robj *pattern) {
ed9b544e 4845 redisSortOperation *so = zmalloc(sizeof(*so));
ed9b544e 4846 so->type = type;
4847 so->pattern = pattern;
4848 return so;
4849}
4850
4851/* Return the value associated to the key with a name obtained
4852 * substituting the first occurence of '*' in 'pattern' with 'subst' */
56906eef 4853static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
ed9b544e 4854 char *p;
4855 sds spat, ssub;
4856 robj keyobj;
4857 int prefixlen, sublen, postfixlen;
ed9b544e 4858 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4859 struct {
f1017b3f 4860 long len;
4861 long free;
ed9b544e 4862 char buf[REDIS_SORTKEY_MAX+1];
4863 } keyname;
4864
28173a49 4865 /* If the pattern is "#" return the substitution object itself in order
4866 * to implement the "SORT ... GET #" feature. */
4867 spat = pattern->ptr;
4868 if (spat[0] == '#' && spat[1] == '\0') {
4869 return subst;
4870 }
4871
4872 /* The substitution object may be specially encoded. If so we create
9d65a1bb 4873 * a decoded object on the fly. Otherwise getDecodedObject will just
4874 * increment the ref count, that we'll decrement later. */
4875 subst = getDecodedObject(subst);
942a3961 4876
ed9b544e 4877 ssub = subst->ptr;
4878 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4879 p = strchr(spat,'*');
ed5a857a 4880 if (!p) {
4881 decrRefCount(subst);
4882 return NULL;
4883 }
ed9b544e 4884
4885 prefixlen = p-spat;
4886 sublen = sdslen(ssub);
4887 postfixlen = sdslen(spat)-(prefixlen+1);
4888 memcpy(keyname.buf,spat,prefixlen);
4889 memcpy(keyname.buf+prefixlen,ssub,sublen);
4890 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4891 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4892 keyname.len = prefixlen+sublen+postfixlen;
4893
dfc5e96c 4894 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
942a3961 4895 decrRefCount(subst);
4896
a4d1ba9a 4897 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3305306f 4898 return lookupKeyRead(db,&keyobj);
ed9b544e 4899}
4900
4901/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4902 * the additional parameter is not standard but a BSD-specific we have to
4903 * pass sorting parameters via the global 'server' structure */
4904static int sortCompare(const void *s1, const void *s2) {
4905 const redisSortObject *so1 = s1, *so2 = s2;
4906 int cmp;
4907
4908 if (!server.sort_alpha) {
4909 /* Numeric sorting. Here it's trivial as we precomputed scores */
4910 if (so1->u.score > so2->u.score) {
4911 cmp = 1;
4912 } else if (so1->u.score < so2->u.score) {
4913 cmp = -1;
4914 } else {
4915 cmp = 0;
4916 }
4917 } else {
4918 /* Alphanumeric sorting */
4919 if (server.sort_bypattern) {
4920 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4921 /* At least one compare object is NULL */
4922 if (so1->u.cmpobj == so2->u.cmpobj)
4923 cmp = 0;
4924 else if (so1->u.cmpobj == NULL)
4925 cmp = -1;
4926 else
4927 cmp = 1;
4928 } else {
4929 /* We have both the objects, use strcoll */
4930 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4931 }
4932 } else {
4933 /* Compare elements directly */
9d65a1bb 4934 robj *dec1, *dec2;
4935
4936 dec1 = getDecodedObject(so1->obj);
4937 dec2 = getDecodedObject(so2->obj);
4938 cmp = strcoll(dec1->ptr,dec2->ptr);
4939 decrRefCount(dec1);
4940 decrRefCount(dec2);
ed9b544e 4941 }
4942 }
4943 return server.sort_desc ? -cmp : cmp;
4944}
4945
4946/* The SORT command is the most complex command in Redis. Warning: this code
4947 * is optimized for speed and a bit less for readability */
4948static void sortCommand(redisClient *c) {
ed9b544e 4949 list *operations;
4950 int outputlen = 0;
4951 int desc = 0, alpha = 0;
4952 int limit_start = 0, limit_count = -1, start, end;
4953 int j, dontsort = 0, vectorlen;
4954 int getop = 0; /* GET operation counter */
443c6409 4955 robj *sortval, *sortby = NULL, *storekey = NULL;
ed9b544e 4956 redisSortObject *vector; /* Resulting vector to sort */
4957
4958 /* Lookup the key to sort. It must be of the right types */
3305306f 4959 sortval = lookupKeyRead(c->db,c->argv[1]);
4960 if (sortval == NULL) {
d922ae65 4961 addReply(c,shared.nullmultibulk);
ed9b544e 4962 return;
4963 }
a5eb649b 4964 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4965 sortval->type != REDIS_ZSET)
4966 {
c937aa89 4967 addReply(c,shared.wrongtypeerr);
ed9b544e 4968 return;
4969 }
4970
4971 /* Create a list of operations to perform for every sorted element.
4972 * Operations can be GET/DEL/INCR/DECR */
4973 operations = listCreate();
092dac2a 4974 listSetFreeMethod(operations,zfree);
ed9b544e 4975 j = 2;
4976
4977 /* Now we need to protect sortval incrementing its count, in the future
4978 * SORT may have options able to overwrite/delete keys during the sorting
4979 * and the sorted key itself may get destroied */
4980 incrRefCount(sortval);
4981
4982 /* The SORT command has an SQL-alike syntax, parse it */
4983 while(j < c->argc) {
4984 int leftargs = c->argc-j-1;
4985 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4986 desc = 0;
4987 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4988 desc = 1;
4989 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4990 alpha = 1;
4991 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4992 limit_start = atoi(c->argv[j+1]->ptr);
4993 limit_count = atoi(c->argv[j+2]->ptr);
4994 j+=2;
443c6409 4995 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4996 storekey = c->argv[j+1];
4997 j++;
ed9b544e 4998 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4999 sortby = c->argv[j+1];
5000 /* If the BY pattern does not contain '*', i.e. it is constant,
5001 * we don't need to sort nor to lookup the weight keys. */
5002 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5003 j++;
5004 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5005 listAddNodeTail(operations,createSortOperation(
5006 REDIS_SORT_GET,c->argv[j+1]));
5007 getop++;
5008 j++;
ed9b544e 5009 } else {
5010 decrRefCount(sortval);
5011 listRelease(operations);
c937aa89 5012 addReply(c,shared.syntaxerr);
ed9b544e 5013 return;
5014 }
5015 j++;
5016 }
5017
5018 /* Load the sorting vector with all the objects to sort */
a5eb649b 5019 switch(sortval->type) {
5020 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5021 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5022 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
dfc5e96c 5023 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
a5eb649b 5024 }
ed9b544e 5025 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
ed9b544e 5026 j = 0;
a5eb649b 5027
ed9b544e 5028 if (sortval->type == REDIS_LIST) {
5029 list *list = sortval->ptr;
6208b3a7 5030 listNode *ln;
5031
5032 listRewind(list);
5033 while((ln = listYield(list))) {
ed9b544e 5034 robj *ele = ln->value;
5035 vector[j].obj = ele;
5036 vector[j].u.score = 0;
5037 vector[j].u.cmpobj = NULL;
ed9b544e 5038 j++;
5039 }
5040 } else {
a5eb649b 5041 dict *set;
ed9b544e 5042 dictIterator *di;
5043 dictEntry *setele;
5044
a5eb649b 5045 if (sortval->type == REDIS_SET) {
5046 set = sortval->ptr;
5047 } else {
5048 zset *zs = sortval->ptr;
5049 set = zs->dict;
5050 }
5051
ed9b544e 5052 di = dictGetIterator(set);
ed9b544e 5053 while((setele = dictNext(di)) != NULL) {
5054 vector[j].obj = dictGetEntryKey(setele);
5055 vector[j].u.score = 0;
5056 vector[j].u.cmpobj = NULL;
5057 j++;
5058 }
5059 dictReleaseIterator(di);
5060 }
dfc5e96c 5061 redisAssert(j == vectorlen);
ed9b544e 5062
5063 /* Now it's time to load the right scores in the sorting vector */
5064 if (dontsort == 0) {
5065 for (j = 0; j < vectorlen; j++) {
5066 if (sortby) {
5067 robj *byval;
5068
3305306f 5069 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
ed9b544e 5070 if (!byval || byval->type != REDIS_STRING) continue;
5071 if (alpha) {
9d65a1bb 5072 vector[j].u.cmpobj = getDecodedObject(byval);
ed9b544e 5073 } else {
942a3961 5074 if (byval->encoding == REDIS_ENCODING_RAW) {
5075 vector[j].u.score = strtod(byval->ptr,NULL);
5076 } else {
9d65a1bb 5077 /* Don't need to decode the object if it's
5078 * integer-encoded (the only encoding supported) so
5079 * far. We can just cast it */
f1017b3f 5080 if (byval->encoding == REDIS_ENCODING_INT) {
942a3961 5081 vector[j].u.score = (long)byval->ptr;
f1017b3f 5082 } else
dfc5e96c 5083 redisAssert(1 != 1);
942a3961 5084 }
ed9b544e 5085 }
5086 } else {
942a3961 5087 if (!alpha) {
5088 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5089 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5090 else {
5091 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5092 vector[j].u.score = (long) vector[j].obj->ptr;
5093 else
dfc5e96c 5094 redisAssert(1 != 1);
942a3961 5095 }
5096 }
ed9b544e 5097 }
5098 }
5099 }
5100
5101 /* We are ready to sort the vector... perform a bit of sanity check
5102 * on the LIMIT option too. We'll use a partial version of quicksort. */
5103 start = (limit_start < 0) ? 0 : limit_start;
5104 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5105 if (start >= vectorlen) {
5106 start = vectorlen-1;
5107 end = vectorlen-2;
5108 }
5109 if (end >= vectorlen) end = vectorlen-1;
5110
5111 if (dontsort == 0) {
5112 server.sort_desc = desc;
5113 server.sort_alpha = alpha;
5114 server.sort_bypattern = sortby ? 1 : 0;
5f5b9840 5115 if (sortby && (start != 0 || end != vectorlen-1))
5116 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5117 else
5118 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
ed9b544e 5119 }
5120
5121 /* Send command output to the output buffer, performing the specified
5122 * GET/DEL/INCR/DECR operations if any. */
5123 outputlen = getop ? getop*(end-start+1) : end-start+1;
443c6409 5124 if (storekey == NULL) {
5125 /* STORE option not specified, sent the sorting result to client */
5126 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5127 for (j = start; j <= end; j++) {
5128 listNode *ln;
5129 if (!getop) {
5130 addReplyBulkLen(c,vector[j].obj);
5131 addReply(c,vector[j].obj);
5132 addReply(c,shared.crlf);
5133 }
5134 listRewind(operations);
5135 while((ln = listYield(operations))) {
5136 redisSortOperation *sop = ln->value;
5137 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5138 vector[j].obj);
5139
5140 if (sop->type == REDIS_SORT_GET) {
5141 if (!val || val->type != REDIS_STRING) {
5142 addReply(c,shared.nullbulk);
5143 } else {
5144 addReplyBulkLen(c,val);
5145 addReply(c,val);
5146 addReply(c,shared.crlf);
5147 }
5148 } else {
dfc5e96c 5149 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
443c6409 5150 }
5151 }
ed9b544e 5152 }
443c6409 5153 } else {
5154 robj *listObject = createListObject();
5155 list *listPtr = (list*) listObject->ptr;
5156
5157 /* STORE option specified, set the sorting result as a List object */
5158 for (j = start; j <= end; j++) {
5159 listNode *ln;
5160 if (!getop) {
5161 listAddNodeTail(listPtr,vector[j].obj);
5162 incrRefCount(vector[j].obj);
5163 }
5164 listRewind(operations);
5165 while((ln = listYield(operations))) {
5166 redisSortOperation *sop = ln->value;
5167 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5168 vector[j].obj);
5169
5170 if (sop->type == REDIS_SORT_GET) {
5171 if (!val || val->type != REDIS_STRING) {
5172 listAddNodeTail(listPtr,createStringObject("",0));
5173 } else {
5174 listAddNodeTail(listPtr,val);
5175 incrRefCount(val);
5176 }
ed9b544e 5177 } else {
dfc5e96c 5178 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
ed9b544e 5179 }
ed9b544e 5180 }
ed9b544e 5181 }
121796f7 5182 if (dictReplace(c->db->dict,storekey,listObject)) {
5183 incrRefCount(storekey);
5184 }
443c6409 5185 /* Note: we add 1 because the DB is dirty anyway since even if the
5186 * SORT result is empty a new key is set and maybe the old content
5187 * replaced. */
5188 server.dirty += 1+outputlen;
5189 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
ed9b544e 5190 }
5191
5192 /* Cleanup */
5193 decrRefCount(sortval);
5194 listRelease(operations);
5195 for (j = 0; j < vectorlen; j++) {
5196 if (sortby && alpha && vector[j].u.cmpobj)
5197 decrRefCount(vector[j].u.cmpobj);
5198 }
5199 zfree(vector);
5200}
5201
1c85b79f 5202/* Create the string returned by the INFO command. This is decoupled
5203 * by the INFO command itself as we need to report the same information
5204 * on memory corruption problems. */
5205static sds genRedisInfoString(void) {
ed9b544e 5206 sds info;
5207 time_t uptime = time(NULL)-server.stat_starttime;
c3cb078d 5208 int j;
ed9b544e 5209
5210 info = sdscatprintf(sdsempty(),
5211 "redis_version:%s\r\n"
f1017b3f 5212 "arch_bits:%s\r\n"
7a932b74 5213 "multiplexing_api:%s\r\n"
682ac724 5214 "uptime_in_seconds:%ld\r\n"
5215 "uptime_in_days:%ld\r\n"
ed9b544e 5216 "connected_clients:%d\r\n"
5217 "connected_slaves:%d\r\n"
f86a74e9 5218 "blocked_clients:%d\r\n"
5fba9f71 5219 "used_memory:%zu\r\n"
ed9b544e 5220 "changes_since_last_save:%lld\r\n"
be2bb6b0 5221 "bgsave_in_progress:%d\r\n"
682ac724 5222 "last_save_time:%ld\r\n"
b3fad521 5223 "bgrewriteaof_in_progress:%d\r\n"
ed9b544e 5224 "total_connections_received:%lld\r\n"
5225 "total_commands_processed:%lld\r\n"
a0f643ea 5226 "role:%s\r\n"
ed9b544e 5227 ,REDIS_VERSION,
f1017b3f 5228 (sizeof(long) == 8) ? "64" : "32",
7a932b74 5229 aeGetApiName(),
a0f643ea 5230 uptime,
5231 uptime/(3600*24),
ed9b544e 5232 listLength(server.clients)-listLength(server.slaves),
5233 listLength(server.slaves),
f86a74e9 5234 server.blockedclients,
ed9b544e 5235 server.usedmemory,
5236 server.dirty,
9d65a1bb 5237 server.bgsavechildpid != -1,
ed9b544e 5238 server.lastsave,
b3fad521 5239 server.bgrewritechildpid != -1,
ed9b544e 5240 server.stat_numconnections,
5241 server.stat_numcommands,
a0f643ea 5242 server.masterhost == NULL ? "master" : "slave"
ed9b544e 5243 );
a0f643ea 5244 if (server.masterhost) {
5245 info = sdscatprintf(info,
5246 "master_host:%s\r\n"
5247 "master_port:%d\r\n"
5248 "master_link_status:%s\r\n"
5249 "master_last_io_seconds_ago:%d\r\n"
5250 ,server.masterhost,
5251 server.masterport,
5252 (server.replstate == REDIS_REPL_CONNECTED) ?
5253 "up" : "down",
f72b934d 5254 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
a0f643ea 5255 );
5256 }
c3cb078d 5257 for (j = 0; j < server.dbnum; j++) {
5258 long long keys, vkeys;
5259
5260 keys = dictSize(server.db[j].dict);
5261 vkeys = dictSize(server.db[j].expires);
5262 if (keys || vkeys) {
9d65a1bb 5263 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
c3cb078d 5264 j, keys, vkeys);
5265 }
5266 }
1c85b79f 5267 return info;
5268}
5269
5270static void infoCommand(redisClient *c) {
5271 sds info = genRedisInfoString();
83c6a618 5272 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5273 (unsigned long)sdslen(info)));
ed9b544e 5274 addReplySds(c,info);
70003d28 5275 addReply(c,shared.crlf);
ed9b544e 5276}
5277
3305306f 5278static void monitorCommand(redisClient *c) {
5279 /* ignore MONITOR if aleady slave or in monitor mode */
5280 if (c->flags & REDIS_SLAVE) return;
5281
5282 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5283 c->slaveseldb = 0;
6b47e12e 5284 listAddNodeTail(server.monitors,c);
3305306f 5285 addReply(c,shared.ok);
5286}
5287
5288/* ================================= Expire ================================= */
5289static int removeExpire(redisDb *db, robj *key) {
5290 if (dictDelete(db->expires,key) == DICT_OK) {
5291 return 1;
5292 } else {
5293 return 0;
5294 }
5295}
5296
5297static int setExpire(redisDb *db, robj *key, time_t when) {
5298 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5299 return 0;
5300 } else {
5301 incrRefCount(key);
5302 return 1;
5303 }
5304}
5305
bb32ede5 5306/* Return the expire time of the specified key, or -1 if no expire
5307 * is associated with this key (i.e. the key is non volatile) */
5308static time_t getExpire(redisDb *db, robj *key) {
5309 dictEntry *de;
5310
5311 /* No expire? return ASAP */
5312 if (dictSize(db->expires) == 0 ||
5313 (de = dictFind(db->expires,key)) == NULL) return -1;
5314
5315 return (time_t) dictGetEntryVal(de);
5316}
5317
3305306f 5318static int expireIfNeeded(redisDb *db, robj *key) {
5319 time_t when;
5320 dictEntry *de;
5321
5322 /* No expire? return ASAP */
5323 if (dictSize(db->expires) == 0 ||
5324 (de = dictFind(db->expires,key)) == NULL) return 0;
5325
5326 /* Lookup the expire */
5327 when = (time_t) dictGetEntryVal(de);
5328 if (time(NULL) <= when) return 0;
5329
5330 /* Delete the key */
5331 dictDelete(db->expires,key);
5332 return dictDelete(db->dict,key) == DICT_OK;
5333}
5334
5335static int deleteIfVolatile(redisDb *db, robj *key) {
5336 dictEntry *de;
5337
5338 /* No expire? return ASAP */
5339 if (dictSize(db->expires) == 0 ||
5340 (de = dictFind(db->expires,key)) == NULL) return 0;
5341
5342 /* Delete the key */
0c66a471 5343 server.dirty++;
3305306f 5344 dictDelete(db->expires,key);
5345 return dictDelete(db->dict,key) == DICT_OK;
5346}
5347
802e8373 5348static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
3305306f 5349 dictEntry *de;
3305306f 5350
802e8373 5351 de = dictFind(c->db->dict,key);
3305306f 5352 if (de == NULL) {
5353 addReply(c,shared.czero);
5354 return;
5355 }
43e5ccdf 5356 if (seconds < 0) {
5357 if (deleteKey(c->db,key)) server.dirty++;
5358 addReply(c, shared.cone);
3305306f 5359 return;
5360 } else {
5361 time_t when = time(NULL)+seconds;
802e8373 5362 if (setExpire(c->db,key,when)) {
3305306f 5363 addReply(c,shared.cone);
77423026 5364 server.dirty++;
5365 } else {
3305306f 5366 addReply(c,shared.czero);
77423026 5367 }
3305306f 5368 return;
5369 }
5370}
5371
802e8373 5372static void expireCommand(redisClient *c) {
5373 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5374}
5375
5376static void expireatCommand(redisClient *c) {
5377 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5378}
5379
fd88489a 5380static void ttlCommand(redisClient *c) {
5381 time_t expire;
5382 int ttl = -1;
5383
5384 expire = getExpire(c->db,c->argv[1]);
5385 if (expire != -1) {
5386 ttl = (int) (expire-time(NULL));
5387 if (ttl < 0) ttl = -1;
5388 }
5389 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5390}
5391
6e469882 5392/* ================================ MULTI/EXEC ============================== */
5393
5394/* Client state initialization for MULTI/EXEC */
5395static void initClientMultiState(redisClient *c) {
5396 c->mstate.commands = NULL;
5397 c->mstate.count = 0;
5398}
5399
5400/* Release all the resources associated with MULTI/EXEC state */
5401static void freeClientMultiState(redisClient *c) {
5402 int j;
5403
5404 for (j = 0; j < c->mstate.count; j++) {
5405 int i;
5406 multiCmd *mc = c->mstate.commands+j;
5407
5408 for (i = 0; i < mc->argc; i++)
5409 decrRefCount(mc->argv[i]);
5410 zfree(mc->argv);
5411 }
5412 zfree(c->mstate.commands);
5413}
5414
5415/* Add a new command into the MULTI commands queue */
5416static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5417 multiCmd *mc;
5418 int j;
5419
5420 c->mstate.commands = zrealloc(c->mstate.commands,
5421 sizeof(multiCmd)*(c->mstate.count+1));
5422 mc = c->mstate.commands+c->mstate.count;
5423 mc->cmd = cmd;
5424 mc->argc = c->argc;
5425 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5426 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5427 for (j = 0; j < c->argc; j++)
5428 incrRefCount(mc->argv[j]);
5429 c->mstate.count++;
5430}
5431
5432static void multiCommand(redisClient *c) {
5433 c->flags |= REDIS_MULTI;
36c548f0 5434 addReply(c,shared.ok);
6e469882 5435}
5436
5437static void execCommand(redisClient *c) {
5438 int j;
5439 robj **orig_argv;
5440 int orig_argc;
5441
5442 if (!(c->flags & REDIS_MULTI)) {
5443 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5444 return;
5445 }
5446
5447 orig_argv = c->argv;
5448 orig_argc = c->argc;
5449 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5450 for (j = 0; j < c->mstate.count; j++) {
5451 c->argc = c->mstate.commands[j].argc;
5452 c->argv = c->mstate.commands[j].argv;
5453 call(c,c->mstate.commands[j].cmd);
5454 }
5455 c->argv = orig_argv;
5456 c->argc = orig_argc;
5457 freeClientMultiState(c);
5458 initClientMultiState(c);
5459 c->flags &= (~REDIS_MULTI);
5460}
5461
4409877e 5462/* =========================== Blocking Operations ========================= */
5463
5464/* Currently Redis blocking operations support is limited to list POP ops,
5465 * so the current implementation is not fully generic, but it is also not
5466 * completely specific so it will not require a rewrite to support new
5467 * kind of blocking operations in the future.
5468 *
5469 * Still it's important to note that list blocking operations can be already
5470 * used as a notification mechanism in order to implement other blocking
5471 * operations at application level, so there must be a very strong evidence
5472 * of usefulness and generality before new blocking operations are implemented.
5473 *
 * This is how the current blocking POP works, using BLPOP as an example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   when there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blockingkeys) mapping keys to a list of clients
 *   blocking for those keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
5488 *
5489 * The above comment and the source code should be enough in order to understand
5490 * the implementation and modify / fix it later.
5491 */
5492
5493/* Set a client in blocking mode for the specified key, with the specified
5494 * timeout */
5495static void blockForKey(redisClient *c, robj *key, time_t timeout) {
5496 dictEntry *de;
5497 list *l;
5498
5499 c->blockingkey = key;
5500 incrRefCount(key);
5501 c->blockingto = timeout;
5502 de = dictFind(c->db->blockingkeys,key);
5503 if (de == NULL) {
5504 int retval;
5505
5506 l = listCreate();
2affc3ed 5507 retval = dictAdd(c->db->blockingkeys,key,l);
9fe33a0e 5508 incrRefCount(key);
4409877e 5509 assert(retval == DICT_OK);
5510 } else {
5511 l = dictGetEntryVal(de);
5512 }
5513 listAddNodeTail(l,c);
5514 c->flags |= REDIS_BLOCKED;
5515 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
f86a74e9 5516 server.blockedclients++;
4409877e 5517}
5518
5519/* Unblock a client that's waiting in a blocking operation such as BLPOP */
5520static void unblockClient(redisClient *c) {
5521 dictEntry *de;
5522 list *l;
5523
5524 /* Remove this client from the list of clients waiting for this key. */
5525 assert(c->blockingkey != NULL);
5526 de = dictFind(c->db->blockingkeys,c->blockingkey);
5527 assert(de != NULL);
5528 l = dictGetEntryVal(de);
5529 listDelNode(l,listSearchKey(l,c));
5530 /* If the list is empty we need to remove it to avoid wasting memory */
5531 if (listLength(l) == 0)
5532 dictDelete(c->db->blockingkeys,c->blockingkey);
5533 /* Finally set the right flags in the client structure */
5534 decrRefCount(c->blockingkey);
5535 c->blockingkey = NULL;
5536 c->flags &= (~REDIS_BLOCKED);
f86a74e9 5537 server.blockedclients--;
4409877e 5538 /* Ok now we are ready to get read events from socket, note that we
5539 * can't trap errors here as it's possible that unblockClients() is
5540 * called from freeClient() itself, and the only thing we can do
5541 * if we failed to register the READABLE event is to kill the client.
5542 * Still the following function should never fail in the real world as
5543 * we are sure the file descriptor is sane, and we exit on out of mem. */
5544 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5545 /* As a final step we want to process data if there is some command waiting
5546 * in the input buffer. Note that this is safe even if unblockClient()
5547 * gets called from freeClient() because freeClient() will be smart
5548 * enough to call this function *after* c->querybuf was set to NULL. */
5549 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5550}
5551
5552/* This should be called from any function PUSHing into lists.
5553 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5554 * 'ele' is the element pushed.
5555 *
5556 * If the function returns 0 there was no client waiting for a list push
5557 * against this key.
5558 *
5559 * If the function returns 1 there was a client waiting for a list push
5560 * against this key, the element was passed to this client thus it's not
5561 * needed to actually add it to the list and the caller should return asap. */
5562static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5563 struct dictEntry *de;
5564 redisClient *receiver;
5565 list *l;
5566 listNode *ln;
5567
5568 de = dictFind(c->db->blockingkeys,key);
5569 if (de == NULL) return 0;
5570 l = dictGetEntryVal(de);
5571 ln = listFirst(l);
5572 assert(ln != NULL);
5573 receiver = ln->value;
4409877e 5574
5575 addReplyBulkLen(receiver,ele);
5576 addReply(receiver,ele);
5577 addReply(receiver,shared.crlf);
5578 unblockClient(receiver);
5579 return 1;
5580}
5581
5582/* Blocking RPOP/LPOP */
5583static void blockingPopGenericCommand(redisClient *c, int where) {
5584 robj *o;
5585 time_t timeout;
5586
5587 o = lookupKeyWrite(c->db,c->argv[1]);
5588 if (o != NULL) {
5589 if (o->type != REDIS_LIST) {
5590 popGenericCommand(c,where);
5591 return;
5592 } else {
5593 list *list = o->ptr;
5594 if (listLength(list) != 0) {
5595 /* If the list contains elements fall back to the usual
5596 * non-blocking POP operation */
5597 popGenericCommand(c,where);
5598 return;
5599 }
5600 }
5601 }
5602 /* If the list is empty or the key does not exists we must block */
5603 timeout = strtol(c->argv[2]->ptr,NULL,10);
5604 if (timeout > 0) timeout += time(NULL);
5605 blockForKey(c,c->argv[1],timeout);
5606}
5607
5608static void blpopCommand(redisClient *c) {
5609 blockingPopGenericCommand(c,REDIS_HEAD);
5610}
5611
5612static void brpopCommand(redisClient *c) {
5613 blockingPopGenericCommand(c,REDIS_TAIL);
5614}
5615
ed9b544e 5616/* =============================== Replication ============================= */
5617
a4d1ba9a 5618static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 5619 ssize_t nwritten, ret = size;
5620 time_t start = time(NULL);
5621
5622 timeout++;
5623 while(size) {
5624 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5625 nwritten = write(fd,ptr,size);
5626 if (nwritten == -1) return -1;
5627 ptr += nwritten;
5628 size -= nwritten;
5629 }
5630 if ((time(NULL)-start) > timeout) {
5631 errno = ETIMEDOUT;
5632 return -1;
5633 }
5634 }
5635 return ret;
5636}
5637
a4d1ba9a 5638static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
ed9b544e 5639 ssize_t nread, totread = 0;
5640 time_t start = time(NULL);
5641
5642 timeout++;
5643 while(size) {
5644 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5645 nread = read(fd,ptr,size);
5646 if (nread == -1) return -1;
5647 ptr += nread;
5648 size -= nread;
5649 totread += nread;
5650 }
5651 if ((time(NULL)-start) > timeout) {
5652 errno = ETIMEDOUT;
5653 return -1;
5654 }
5655 }
5656 return totread;
5657}
5658
/* Read one '\n'-terminated line from 'fd' into 'ptr', a buffer of 'size'
 * bytes. Each byte is fetched with syncRead() using 'timeout'.
 *
 * The '\n' and a '\r' immediately before it are stripped. The buffer is
 * always NUL-terminated. If the buffer fills up before a newline arrives,
 * the truncated line is returned and the rest of it is left unread.
 *
 * Returns the number of bytes consumed before the '\n'. A stripped '\r' is
 * still included in that count. Returns -1 on I/O error or timeout
 * (errno from syncRead), or with errno = EINVAL if 'size' < 1. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size < 1) {
        /* No room even for the terminating NUL. */
        errno = EINVAL;
        return -1;
    }
    size--;      /* reserve one byte for the terminating NUL */
    *ptr = '\0'; /* valid (empty) string even if nothing fits */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the byte just stored. Without this, a line longer
         * than the buffer (data coming from the network) overflowed it. */
        size--;
    }
    return nread;
}
5679
5680static void syncCommand(redisClient *c) {
40d224a9 5681 /* ignore SYNC if aleady slave or in monitor mode */
5682 if (c->flags & REDIS_SLAVE) return;
5683
5684 /* SYNC can't be issued when the server has pending data to send to
5685 * the client about already issued commands. We need a fresh reply
5686 * buffer registering the differences between the BGSAVE and the current
5687 * dataset, so that we can copy to other slaves if needed. */
5688 if (listLength(c->reply) != 0) {
5689 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5690 return;
5691 }
5692
5693 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5694 /* Here we need to check if there is a background saving operation
5695 * in progress, or if it is required to start one */
9d65a1bb 5696 if (server.bgsavechildpid != -1) {
40d224a9 5697 /* Ok a background save is in progress. Let's check if it is a good
5698 * one for replication, i.e. if there is another slave that is
5699 * registering differences since the server forked to save */
5700 redisClient *slave;
5701 listNode *ln;
5702
6208b3a7 5703 listRewind(server.slaves);
5704 while((ln = listYield(server.slaves))) {
40d224a9 5705 slave = ln->value;
5706 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
40d224a9 5707 }
5708 if (ln) {
5709 /* Perfect, the server is already registering differences for
5710 * another slave. Set the right state, and copy the buffer. */
5711 listRelease(c->reply);
5712 c->reply = listDup(slave->reply);
40d224a9 5713 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5714 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5715 } else {
5716 /* No way, we need to wait for the next BGSAVE in order to
5717 * register differences */
5718 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5719 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5720 }
5721 } else {
5722 /* Ok we don't have a BGSAVE in progress, let's start one */
5723 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5724 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5725 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5726 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5727 return;
5728 }
5729 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5730 }
6208b3a7 5731 c->repldbfd = -1;
40d224a9 5732 c->flags |= REDIS_SLAVE;
5733 c->slaveseldb = 0;
6b47e12e 5734 listAddNodeTail(server.slaves,c);
40d224a9 5735 return;
5736}
5737
6208b3a7 5738static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5739 redisClient *slave = privdata;
5740 REDIS_NOTUSED(el);
5741 REDIS_NOTUSED(mask);
5742 char buf[REDIS_IOBUF_LEN];
5743 ssize_t nwritten, buflen;
5744
5745 if (slave->repldboff == 0) {
5746 /* Write the bulk write count before to transfer the DB. In theory here
5747 * we don't know how much room there is in the output buffer of the
5748 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5749 * operations) will never be smaller than the few bytes we need. */
5750 sds bulkcount;
5751
5752 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5753 slave->repldbsize);
5754 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5755 {
5756 sdsfree(bulkcount);
5757 freeClient(slave);
5758 return;
5759 }
5760 sdsfree(bulkcount);
5761 }
5762 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5763 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5764 if (buflen <= 0) {
5765 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5766 (buflen == 0) ? "premature EOF" : strerror(errno));
5767 freeClient(slave);
5768 return;
5769 }
5770 if ((nwritten = write(fd,buf,buflen)) == -1) {
5771 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5772 strerror(errno));
5773 freeClient(slave);
5774 return;
5775 }
5776 slave->repldboff += nwritten;
5777 if (slave->repldboff == slave->repldbsize) {
5778 close(slave->repldbfd);
5779 slave->repldbfd = -1;
5780 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5781 slave->replstate = REDIS_REPL_ONLINE;
5782 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
266373b2 5783 sendReplyToClient, slave) == AE_ERR) {
6208b3a7 5784 freeClient(slave);
5785 return;
5786 }
5787 addReplySds(slave,sdsempty());
5788 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5789 }
5790}
ed9b544e 5791
a3b21203 5792/* This function is called at the end of every backgrond saving.
5793 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5794 * otherwise REDIS_ERR is passed to the function.
5795 *
5796 * The goal of this function is to handle slaves waiting for a successful
5797 * background saving in order to perform non-blocking synchronization. */
5798static void updateSlavesWaitingBgsave(int bgsaveerr) {
6208b3a7 5799 listNode *ln;
5800 int startbgsave = 0;
ed9b544e 5801
6208b3a7 5802 listRewind(server.slaves);
5803 while((ln = listYield(server.slaves))) {
5804 redisClient *slave = ln->value;
ed9b544e 5805
6208b3a7 5806 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5807 startbgsave = 1;
5808 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5809 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
dde65f3f 5810 struct redis_stat buf;
6208b3a7 5811
5812 if (bgsaveerr != REDIS_OK) {
5813 freeClient(slave);
5814 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5815 continue;
5816 }
5817 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
dde65f3f 5818 redis_fstat(slave->repldbfd,&buf) == -1) {
6208b3a7 5819 freeClient(slave);
5820 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5821 continue;
5822 }
5823 slave->repldboff = 0;
5824 slave->repldbsize = buf.st_size;
5825 slave->replstate = REDIS_REPL_SEND_BULK;
5826 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
266373b2 5827 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
6208b3a7 5828 freeClient(slave);
5829 continue;
5830 }
5831 }
ed9b544e 5832 }
6208b3a7 5833 if (startbgsave) {
5834 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5835 listRewind(server.slaves);
5836 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5837 while((ln = listYield(server.slaves))) {
5838 redisClient *slave = ln->value;
ed9b544e 5839
6208b3a7 5840 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5841 freeClient(slave);
5842 }
5843 }
5844 }
ed9b544e 5845}
5846
5847static int syncWithMaster(void) {
d0ccebcf 5848 char buf[1024], tmpfile[256], authcmd[1024];
ed9b544e 5849 int dumpsize;
5850 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5851 int dfd;
5852
5853 if (fd == -1) {
5854 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5855 strerror(errno));
5856 return REDIS_ERR;
5857 }
d0ccebcf 5858
5859 /* AUTH with the master if required. */
5860 if(server.masterauth) {
5861 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5862 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5863 close(fd);
5864 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5865 strerror(errno));
5866 return REDIS_ERR;
5867 }
5868 /* Read the AUTH result. */
5869 if (syncReadLine(fd,buf,1024,3600) == -1) {
5870 close(fd);
5871 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5872 strerror(errno));
5873 return REDIS_ERR;
5874 }
5875 if (buf[0] != '+') {
5876 close(fd);
5877 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5878 return REDIS_ERR;
5879 }
5880 }
5881
ed9b544e 5882 /* Issue the SYNC command */
5883 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5884 close(fd);
5885 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5886 strerror(errno));
5887 return REDIS_ERR;
5888 }
5889 /* Read the bulk write count */
8c4d91fc 5890 if (syncReadLine(fd,buf,1024,3600) == -1) {
ed9b544e 5891 close(fd);
5892 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5893 strerror(errno));
5894 return REDIS_ERR;
5895 }
4aa701c1 5896 if (buf[0] != '$') {
5897 close(fd);
5898 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5899 return REDIS_ERR;
5900 }
c937aa89 5901 dumpsize = atoi(buf+1);
ed9b544e 5902 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5903 /* Read the bulk write data on a temp file */
5904 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5905 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5906 if (dfd == -1) {
5907 close(fd);
5908 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5909 return REDIS_ERR;
5910 }
5911 while(dumpsize) {
5912 int nread, nwritten;
5913
5914 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5915 if (nread == -1) {
5916 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5917 strerror(errno));
5918 close(fd);
5919 close(dfd);
5920 return REDIS_ERR;
5921 }
5922 nwritten = write(dfd,buf,nread);
5923 if (nwritten == -1) {
5924 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5925 close(fd);
5926 close(dfd);
5927 return REDIS_ERR;
5928 }
5929 dumpsize -= nread;
5930 }
5931 close(dfd);
5932 if (rename(tmpfile,server.dbfilename) == -1) {
5933 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5934 unlink(tmpfile);
5935 close(fd);
5936 return REDIS_ERR;
5937 }
5938 emptyDb();
f78fd11b 5939 if (rdbLoad(server.dbfilename) != REDIS_OK) {
ed9b544e 5940 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5941 close(fd);
5942 return REDIS_ERR;
5943 }
5944 server.master = createClient(fd);
5945 server.master->flags |= REDIS_MASTER;
179b3952 5946 server.master->authenticated = 1;
ed9b544e 5947 server.replstate = REDIS_REPL_CONNECTED;
5948 return REDIS_OK;
5949}
5950
321b0e13 5951static void slaveofCommand(redisClient *c) {
5952 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5953 !strcasecmp(c->argv[2]->ptr,"one")) {
5954 if (server.masterhost) {
5955 sdsfree(server.masterhost);
5956 server.masterhost = NULL;
5957 if (server.master) freeClient(server.master);
5958 server.replstate = REDIS_REPL_NONE;
5959 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5960 }
5961 } else {
5962 sdsfree(server.masterhost);
5963 server.masterhost = sdsdup(c->argv[1]->ptr);
5964 server.masterport = atoi(c->argv[2]->ptr);
5965 if (server.master) freeClient(server.master);
5966 server.replstate = REDIS_REPL_CONNECT;
5967 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5968 server.masterhost, server.masterport);
5969 }
5970 addReply(c,shared.ok);
5971}
5972
3fd78bcd 5973/* ============================ Maxmemory directive ======================== */
5974
5975/* This function gets called when 'maxmemory' is set on the config file to limit
5976 * the max memory used by the server, and we are out of memory.
5977 * This function will try to, in order:
5978 *
5979 * - Free objects from the free list
5980 * - Try to remove keys with an EXPIRE set
5981 *
5982 * It is not possible to free enough memory to reach used-memory < maxmemory
5983 * the server will start refusing commands that will enlarge even more the
5984 * memory usage.
5985 */
5986static void freeMemoryIfNeeded(void) {
5987 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5988 if (listLength(server.objfreelist)) {
5989 robj *o;
5990
5991 listNode *head = listFirst(server.objfreelist);
5992 o = listNodeValue(head);
5993 listDelNode(server.objfreelist,head);
5994 zfree(o);
5995 } else {
5996 int j, k, freed = 0;
5997
5998 for (j = 0; j < server.dbnum; j++) {
5999 int minttl = -1;
6000 robj *minkey = NULL;
6001 struct dictEntry *de;
6002
6003 if (dictSize(server.db[j].expires)) {
6004 freed = 1;
6005 /* From a sample of three keys drop the one nearest to
6006 * the natural expire */
6007 for (k = 0; k < 3; k++) {
6008 time_t t;
6009
6010 de = dictGetRandomKey(server.db[j].expires);
6011 t = (time_t) dictGetEntryVal(de);
6012 if (minttl == -1 || t < minttl) {
6013 minkey = dictGetEntryKey(de);
6014 minttl = t;
6015 }
6016 }
6017 deleteKey(server.db+j,minkey);
6018 }
6019 }
6020 if (!freed) return; /* nothing to free... */
6021 }
6022 }
6023}
6024
f80dff62 6025/* ============================== Append Only file ========================== */
6026
6027static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6028 sds buf = sdsempty();
6029 int j;
6030 ssize_t nwritten;
6031 time_t now;
6032 robj *tmpargv[3];
6033
6034 /* The DB this command was targetting is not the same as the last command
6035 * we appendend. To issue a SELECT command is needed. */
6036 if (dictid != server.appendseldb) {
6037 char seldb[64];
6038
6039 snprintf(seldb,sizeof(seldb),"%d",dictid);
682ac724 6040 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
83c6a618 6041 (unsigned long)strlen(seldb),seldb);
f80dff62 6042 server.appendseldb = dictid;
6043 }
6044
6045 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6046 * EXPIREs into EXPIREATs calls */
6047 if (cmd->proc == expireCommand) {
6048 long when;
6049
6050 tmpargv[0] = createStringObject("EXPIREAT",8);
6051 tmpargv[1] = argv[1];
6052 incrRefCount(argv[1]);
6053 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6054 tmpargv[2] = createObject(REDIS_STRING,
6055 sdscatprintf(sdsempty(),"%ld",when));
6056 argv = tmpargv;
6057 }
6058
6059 /* Append the actual command */
6060 buf = sdscatprintf(buf,"*%d\r\n",argc);
6061 for (j = 0; j < argc; j++) {
6062 robj *o = argv[j];
6063
9d65a1bb 6064 o = getDecodedObject(o);
83c6a618 6065 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
f80dff62 6066 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6067 buf = sdscatlen(buf,"\r\n",2);
9d65a1bb 6068 decrRefCount(o);
f80dff62 6069 }
6070
6071 /* Free the objects from the modified argv for EXPIREAT */
6072 if (cmd->proc == expireCommand) {
6073 for (j = 0; j < 3; j++)
6074 decrRefCount(argv[j]);
6075 }
6076
6077 /* We want to perform a single write. This should be guaranteed atomic
6078 * at least if the filesystem we are writing is a real physical one.
6079 * While this will save us against the server being killed I don't think
6080 * there is much to do about the whole server stopping for power problems
6081 * or alike */
6082 nwritten = write(server.appendfd,buf,sdslen(buf));
6083 if (nwritten != (signed)sdslen(buf)) {
6084 /* Ooops, we are in troubles. The best thing to do for now is
6085 * to simply exit instead to give the illusion that everything is
6086 * working as expected. */
6087 if (nwritten == -1) {
6088 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6089 } else {
6090 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6091 }
6092 exit(1);
6093 }
85a83172 6094 /* If a background append only file rewriting is in progress we want to
6095 * accumulate the differences between the child DB and the current one
6096 * in a buffer, so that when the child process will do its work we
6097 * can append the differences to the new append only file. */
6098 if (server.bgrewritechildpid != -1)
6099 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6100
6101 sdsfree(buf);
f80dff62 6102 now = time(NULL);
6103 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6104 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6105 now-server.lastfsync > 1))
6106 {
6107 fsync(server.appendfd); /* Let's try to get this data on the disk */
6108 server.lastfsync = now;
6109 }
6110}
6111
6112/* In Redis commands are always executed in the context of a client, so in
6113 * order to load the append only file we need to create a fake client. */
6114static struct redisClient *createFakeClient(void) {
6115 struct redisClient *c = zmalloc(sizeof(*c));
6116
6117 selectDb(c,0);
6118 c->fd = -1;
6119 c->querybuf = sdsempty();
6120 c->argc = 0;
6121 c->argv = NULL;
6122 c->flags = 0;
9387d17d 6123 /* We set the fake client as a slave waiting for the synchronization
6124 * so that Redis will not try to send replies to this client. */
6125 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
f80dff62 6126 c->reply = listCreate();
6127 listSetFreeMethod(c->reply,decrRefCount);
6128 listSetDupMethod(c->reply,dupClientReplyValue);
6129 return c;
6130}
6131
6132static void freeFakeClient(struct redisClient *c) {
6133 sdsfree(c->querybuf);
6134 listRelease(c->reply);
6135 zfree(c);
6136}
6137
6138/* Replay the append log file. On error REDIS_OK is returned. On non fatal
6139 * error (the append only file is zero-length) REDIS_ERR is returned. On
6140 * fatal error an error message is logged and the program exists. */
6141int loadAppendOnlyFile(char *filename) {
6142 struct redisClient *fakeClient;
6143 FILE *fp = fopen(filename,"r");
6144 struct redis_stat sb;
6145
6146 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6147 return REDIS_ERR;
6148
6149 if (fp == NULL) {
6150 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6151 exit(1);
6152 }
6153
6154 fakeClient = createFakeClient();
6155 while(1) {
6156 int argc, j;
6157 unsigned long len;
6158 robj **argv;
6159 char buf[128];
6160 sds argsds;
6161 struct redisCommand *cmd;
6162
6163 if (fgets(buf,sizeof(buf),fp) == NULL) {
6164 if (feof(fp))
6165 break;
6166 else
6167 goto readerr;
6168 }
6169 if (buf[0] != '*') goto fmterr;
6170 argc = atoi(buf+1);
6171 argv = zmalloc(sizeof(robj*)*argc);
6172 for (j = 0; j < argc; j++) {
6173 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6174 if (buf[0] != '$') goto fmterr;
6175 len = strtol(buf+1,NULL,10);
6176 argsds = sdsnewlen(NULL,len);
0f151ef1 6177 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
f80dff62 6178 argv[j] = createObject(REDIS_STRING,argsds);
6179 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6180 }
6181
6182 /* Command lookup */
6183 cmd = lookupCommand(argv[0]->ptr);
6184 if (!cmd) {
6185 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6186 exit(1);
6187 }
6188 /* Try object sharing and encoding */
6189 if (server.shareobjects) {
6190 int j;
6191 for(j = 1; j < argc; j++)
6192 argv[j] = tryObjectSharing(argv[j]);
6193 }
6194 if (cmd->flags & REDIS_CMD_BULK)
6195 tryObjectEncoding(argv[argc-1]);
6196 /* Run the command in the context of a fake client */
6197 fakeClient->argc = argc;
6198 fakeClient->argv = argv;
6199 cmd->proc(fakeClient);
6200 /* Discard the reply objects list from the fake client */
6201 while(listLength(fakeClient->reply))
6202 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6203 /* Clean up, ready for the next command */
6204 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6205 zfree(argv);
6206 }
6207 fclose(fp);
6208 freeFakeClient(fakeClient);
6209 return REDIS_OK;
6210
6211readerr:
6212 if (feof(fp)) {
6213 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6214 } else {
6215 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6216 }
6217 exit(1);
6218fmterr:
6219 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6220 exit(1);
6221}
6222
9d65a1bb 6223/* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6224static int fwriteBulk(FILE *fp, robj *obj) {
6225 char buf[128];
6226 obj = getDecodedObject(obj);
6227 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6228 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
e96e4fbf 6229 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6230 goto err;
9d65a1bb 6231 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6232 decrRefCount(obj);
6233 return 1;
6234err:
6235 decrRefCount(obj);
6236 return 0;
6237}
6238
/* Write the double 'd' to 'fp' in bulk format: "$<count>\r\n<payload>\r\n".
 * The payload is printed with "%.17g". Returns 1 on success, 0 on write
 * error. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    /* the bulk count excludes the trailing CRLF */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)(payloadlen-2));
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
6249
/* Write the long 'l' to 'fp' in bulk format: "$<count>\r\n<payload>\r\n".
 * Returns 1 on success, 0 on write error. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], payload[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    payloadlen = strlen(payload);
    /* the bulk count excludes the trailing CRLF */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)(payloadlen-2));
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
6260
6261/* Write a sequence of commands able to fully rebuild the dataset into
6262 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6263static int rewriteAppendOnlyFile(char *filename) {
6264 dictIterator *di = NULL;
6265 dictEntry *de;
6266 FILE *fp;
6267 char tmpfile[256];
6268 int j;
6269 time_t now = time(NULL);
6270
6271 /* Note that we have to use a different temp name here compared to the
6272 * one used by rewriteAppendOnlyFileBackground() function. */
6273 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6274 fp = fopen(tmpfile,"w");
6275 if (!fp) {
6276 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6277 return REDIS_ERR;
6278 }
6279 for (j = 0; j < server.dbnum; j++) {
6280 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6281 redisDb *db = server.db+j;
6282 dict *d = db->dict;
6283 if (dictSize(d) == 0) continue;
6284 di = dictGetIterator(d);
6285 if (!di) {
6286 fclose(fp);
6287 return REDIS_ERR;
6288 }
6289
6290 /* SELECT the new DB */
6291 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
85a83172 6292 if (fwriteBulkLong(fp,j) == 0) goto werr;
9d65a1bb 6293
6294 /* Iterate this DB writing every entry */
6295 while((de = dictNext(di)) != NULL) {
6296 robj *key = dictGetEntryKey(de);
6297 robj *o = dictGetEntryVal(de);
6298 time_t expiretime = getExpire(db,key);
6299
6300 /* Save the key and associated value */
9d65a1bb 6301 if (o->type == REDIS_STRING) {
6302 /* Emit a SET command */
6303 char cmd[]="*3\r\n$3\r\nSET\r\n";
6304 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6305 /* Key and value */
6306 if (fwriteBulk(fp,key) == 0) goto werr;
6307 if (fwriteBulk(fp,o) == 0) goto werr;
6308 } else if (o->type == REDIS_LIST) {
6309 /* Emit the RPUSHes needed to rebuild the list */
6310 list *list = o->ptr;
6311 listNode *ln;
6312
6313 listRewind(list);
6314 while((ln = listYield(list))) {
6315 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6316 robj *eleobj = listNodeValue(ln);
6317
6318 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6319 if (fwriteBulk(fp,key) == 0) goto werr;
6320 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6321 }
6322 } else if (o->type == REDIS_SET) {
6323 /* Emit the SADDs needed to rebuild the set */
6324 dict *set = o->ptr;
6325 dictIterator *di = dictGetIterator(set);
6326 dictEntry *de;
6327
6328 while((de = dictNext(di)) != NULL) {
6329 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6330 robj *eleobj = dictGetEntryKey(de);
6331
6332 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6333 if (fwriteBulk(fp,key) == 0) goto werr;
6334 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6335 }
6336 dictReleaseIterator(di);
6337 } else if (o->type == REDIS_ZSET) {
6338 /* Emit the ZADDs needed to rebuild the sorted set */
6339 zset *zs = o->ptr;
6340 dictIterator *di = dictGetIterator(zs->dict);
6341 dictEntry *de;
6342
6343 while((de = dictNext(di)) != NULL) {
6344 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6345 robj *eleobj = dictGetEntryKey(de);
6346 double *score = dictGetEntryVal(de);
6347
6348 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6349 if (fwriteBulk(fp,key) == 0) goto werr;
6350 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6351 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6352 }
6353 dictReleaseIterator(di);
6354 } else {
dfc5e96c 6355 redisAssert(0 != 0);
9d65a1bb 6356 }
6357 /* Save the expire time */
6358 if (expiretime != -1) {
e96e4fbf 6359 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
9d65a1bb 6360 /* If this key is already expired skip it */
6361 if (expiretime < now) continue;
6362 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6363 if (fwriteBulk(fp,key) == 0) goto werr;
6364 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6365 }
6366 }
6367 dictReleaseIterator(di);
6368 }
6369
6370 /* Make sure data will not remain on the OS's output buffers */
6371 fflush(fp);
6372 fsync(fileno(fp));
6373 fclose(fp);
6374
6375 /* Use RENAME to make sure the DB file is changed atomically only
6376 * if the generate DB file is ok. */
6377 if (rename(tmpfile,filename) == -1) {
6378 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6379 unlink(tmpfile);
6380 return REDIS_ERR;
6381 }
6382 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6383 return REDIS_OK;
6384
6385werr:
6386 fclose(fp);
6387 unlink(tmpfile);
e96e4fbf 6388 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
9d65a1bb 6389 if (di) dictReleaseIterator(di);
6390 return REDIS_ERR;
6391}
6392
6393/* This is how rewriting of the append only file in background works:
6394 *
6395 * 1) The user calls BGREWRITEAOF
6396 * 2) Redis calls this function, that forks():
6397 * 2a) the child rewrite the append only file in a temp file.
6398 * 2b) the parent accumulates differences in server.bgrewritebuf.
6399 * 3) When the child finished '2a' exists.
6400 * 4) The parent will trap the exit code, if it's OK, will append the
6401 * data accumulated into server.bgrewritebuf into the temp file, and
6402 * finally will rename(2) the temp file in the actual file name.
6403 * The the new file is reopened as the new append only file. Profit!
6404 */
6405static int rewriteAppendOnlyFileBackground(void) {
6406 pid_t childpid;
6407
6408 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6409 if ((childpid = fork()) == 0) {
6410 /* Child */
6411 char tmpfile[256];
6412 close(server.fd);
6413
6414 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6415 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6416 exit(0);
6417 } else {
6418 exit(1);
6419 }
6420 } else {
6421 /* Parent */
6422 if (childpid == -1) {
6423 redisLog(REDIS_WARNING,
6424 "Can't rewrite append only file in background: fork: %s",
6425 strerror(errno));
6426 return REDIS_ERR;
6427 }
6428 redisLog(REDIS_NOTICE,
6429 "Background append only file rewriting started by pid %d",childpid);
6430 server.bgrewritechildpid = childpid;
85a83172 6431 /* We set appendseldb to -1 in order to force the next call to the
6432 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6433 * accumulated by the parent into server.bgrewritebuf will start
6434 * with a SELECT statement and it will be safe to merge. */
6435 server.appendseldb = -1;
9d65a1bb 6436 return REDIS_OK;
6437 }
6438 return REDIS_OK; /* unreached */
6439}
6440
6441static void bgrewriteaofCommand(redisClient *c) {
6442 if (server.bgrewritechildpid != -1) {
6443 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6444 return;
6445 }
6446 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
49b99ab4 6447 char *status = "+Background append only file rewriting started\r\n";
6448 addReplySds(c,sdsnew(status));
9d65a1bb 6449 } else {
6450 addReply(c,shared.err);
6451 }
6452}
6453
/* Delete the temp file written by the background rewrite child 'childpid',
 * named "temp-rewriteaof-bg-<pid>.aof". Errors from unlink() (e.g. the file
 * does not exist) are ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    unlink(path);
}
6460
7f957c92 6461/* ================================= Debugging ============================== */
6462
6463static void debugCommand(redisClient *c) {
6464 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6465 *((char*)-1) = 'x';
210e29f7 6466 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6467 if (rdbSave(server.dbfilename) != REDIS_OK) {
6468 addReply(c,shared.err);
6469 return;
6470 }
6471 emptyDb();
6472 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6473 addReply(c,shared.err);
6474 return;
6475 }
6476 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6477 addReply(c,shared.ok);
71c2b467 6478 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6479 emptyDb();
6480 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6481 addReply(c,shared.err);
6482 return;
6483 }
6484 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6485 addReply(c,shared.ok);
333298da 6486 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6487 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6488 robj *key, *val;
6489
6490 if (!de) {
6491 addReply(c,shared.nokeyerr);
6492 return;
6493 }
6494 key = dictGetEntryKey(de);
6495 val = dictGetEntryVal(de);
6496 addReplySds(c,sdscatprintf(sdsempty(),
942a3961 6497 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
682ac724 6498 (void*)key, key->refcount, (void*)val, val->refcount,
6499 val->encoding));
7f957c92 6500 } else {
333298da 6501 addReplySds(c,sdsnew(
210e29f7 6502 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
7f957c92 6503 }
6504}
56906eef 6505
dfc5e96c 6506static void _redisAssert(char *estr) {
6507 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6508 redisLog(REDIS_WARNING,"==> %s\n",estr);
6509#ifdef HAVE_BACKTRACE
6510 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6511 *((char*)-1) = 'x';
6512#endif
6513}
6514
bcfc686d 6515/* =================================== Main! ================================ */
56906eef 6516
bcfc686d 6517#ifdef __linux__
/* Return the value of /proc/sys/vm/overcommit_memory as parsed by atoi(),
 * or -1 when the file cannot be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char buf[64];
    char *line;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    line = fgets(buf,sizeof(buf),fp);
    fclose(fp);
    return (line == NULL) ? -1 : atoi(buf);
}
6531
6532void linuxOvercommitMemoryWarning(void) {
6533 if (linuxOvercommitMemoryValue() == 0) {
6534 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6535 }
6536}
6537#endif /* __linux__ */
6538
6539static void daemonize(void) {
6540 int fd;
6541 FILE *fp;
6542
6543 if (fork() != 0) exit(0); /* parent exits */
71c54b21 6544 printf("New pid: %d\n", getpid());
bcfc686d 6545 setsid(); /* create a new session */
6546
6547 /* Every output goes to /dev/null. If Redis is daemonized but
6548 * the 'logfile' is set to 'stdout' in the configuration file
6549 * it will not log at all. */
6550 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6551 dup2(fd, STDIN_FILENO);
6552 dup2(fd, STDOUT_FILENO);
6553 dup2(fd, STDERR_FILENO);
6554 if (fd > STDERR_FILENO) close(fd);
6555 }
6556 /* Try to write the pid file */
6557 fp = fopen(server.pidfile,"w");
6558 if (fp) {
6559 fprintf(fp,"%d\n",getpid());
6560 fclose(fp);
56906eef 6561 }
56906eef 6562}
6563
bcfc686d 6564int main(int argc, char **argv) {
6565 initServerConfig();
6566 if (argc == 2) {
6567 resetServerSaveParams();
6568 loadServerConfig(argv[1]);
6569 } else if (argc > 2) {
6570 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6571 exit(1);
6572 } else {
6573 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6574 }
bcfc686d 6575 if (server.daemonize) daemonize();
71c54b21 6576 initServer();
bcfc686d 6577 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6578#ifdef __linux__
6579 linuxOvercommitMemoryWarning();
6580#endif
6581 if (server.appendonly) {
6582 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6583 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6584 } else {
6585 if (rdbLoad(server.dbfilename) == REDIS_OK)
6586 redisLog(REDIS_NOTICE,"DB loaded from disk");
6587 }
6588 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
266373b2 6589 acceptHandler, NULL) == AE_ERR) oom("creating file event");
bcfc686d 6590 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6591 aeMain(server.el);
6592 aeDeleteEventLoop(server.el);
6593 return 0;
6594}
6595
6596/* ============================= Backtrace support ========================= */
6597
6598#ifdef HAVE_BACKTRACE
6599static char *findFuncName(void *pointer, unsigned long *offset);
6600
56906eef 6601static void *getMcontextEip(ucontext_t *uc) {
6602#if defined(__FreeBSD__)
6603 return (void*) uc->uc_mcontext.mc_eip;
6604#elif defined(__dietlibc__)
6605 return (void*) uc->uc_mcontext.eip;
06db1f50 6606#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
da0a1620 6607 #if __x86_64__
6608 return (void*) uc->uc_mcontext->__ss.__rip;
6609 #else
56906eef 6610 return (void*) uc->uc_mcontext->__ss.__eip;
da0a1620 6611 #endif
06db1f50 6612#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
cb7e07cc 6613 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
06db1f50 6614 return (void*) uc->uc_mcontext->__ss.__rip;
cbc59b38 6615 #else
6616 return (void*) uc->uc_mcontext->__ss.__eip;
6617 #endif
c04c9ac9 6618#elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6619 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
b91cf5ef 6620#elif defined(__ia64__) /* Linux IA64 */
6621 return (void*) uc->uc_mcontext.sc_ip;
6622#else
6623 return NULL;
56906eef 6624#endif
6625}
6626
6627static void segvHandler(int sig, siginfo_t *info, void *secret) {
6628 void *trace[100];
6629 char **messages = NULL;
6630 int i, trace_size = 0;
6631 unsigned long offset=0;
56906eef 6632 ucontext_t *uc = (ucontext_t*) secret;
1c85b79f 6633 sds infostring;
56906eef 6634 REDIS_NOTUSED(info);
6635
6636 redisLog(REDIS_WARNING,
6637 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
1c85b79f 6638 infostring = genRedisInfoString();
6639 redisLog(REDIS_WARNING, "%s",infostring);
6640 /* It's not safe to sdsfree() the returned string under memory
6641 * corruption conditions. Let it leak as we are going to abort */
56906eef 6642
6643 trace_size = backtrace(trace, 100);
de96dbfe 6644 /* overwrite sigaction with caller's address */
b91cf5ef 6645 if (getMcontextEip(uc) != NULL) {
6646 trace[1] = getMcontextEip(uc);
6647 }
56906eef 6648 messages = backtrace_symbols(trace, trace_size);
fe3bbfbe 6649
d76412d1 6650 for (i=1; i<trace_size; ++i) {
56906eef 6651 char *fn = findFuncName(trace[i], &offset), *p;
6652
6653 p = strchr(messages[i],'+');
6654 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6655 redisLog(REDIS_WARNING,"%s", messages[i]);
6656 } else {
6657 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6658 }
6659 }
1c85b79f 6660 // free(messages); Don't call free() with possibly corrupted memory.
56906eef 6661 exit(0);
fe3bbfbe 6662}
56906eef 6663
6664static void setupSigSegvAction(void) {
6665 struct sigaction act;
6666
6667 sigemptyset (&act.sa_mask);
6668 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6669 * is used. Otherwise, sa_handler is used */
6670 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6671 act.sa_sigaction = segvHandler;
6672 sigaction (SIGSEGV, &act, NULL);
6673 sigaction (SIGBUS, &act, NULL);
12fea928 6674 sigaction (SIGFPE, &act, NULL);
6675 sigaction (SIGILL, &act, NULL);
6676 sigaction (SIGBUS, &act, NULL);
e65fdc78 6677 return;
56906eef 6678}
e65fdc78 6679
bcfc686d 6680#include "staticsymbols.h"
6681/* This function try to convert a pointer into a function name. It's used in
6682 * oreder to provide a backtrace under segmentation fault that's able to
6683 * display functions declared as static (otherwise the backtrace is useless). */
6684static char *findFuncName(void *pointer, unsigned long *offset){
6685 int i, ret = -1;
6686 unsigned long off, minoff = 0;
ed9b544e 6687
bcfc686d 6688 /* Try to match against the Symbol with the smallest offset */
6689 for (i=0; symsTable[i].pointer; i++) {
6690 unsigned long lp = (unsigned long) pointer;
0bc03378 6691
bcfc686d 6692 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6693 off=lp-symsTable[i].pointer;
6694 if (ret < 0 || off < minoff) {
6695 minoff=off;
6696 ret=i;
6697 }
6698 }
0bc03378 6699 }
bcfc686d 6700 if (ret == -1) return NULL;
6701 *offset = minoff;
6702 return symsTable[ret].name;
0bc03378 6703}
bcfc686d 6704#else /* HAVE_BACKTRACE */
6705static void setupSigSegvAction(void) {
0bc03378 6706}
bcfc686d 6707#endif /* HAVE_BACKTRACE */
0bc03378 6708
ed9b544e 6709
ed9b544e 6710
bcfc686d 6711/* The End */
6712
6713
ed9b544e 6714