]> git.saurik.com Git - redis.git/blob - redis.c
MULTI/EXEC first implementation
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.1.95"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77 /* Error codes */
78 #define REDIS_OK 0
79 #define REDIS_ERR -1
80
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
99
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103 /* Command flags (bit values, OR-ed together in the cmdTable entries) */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
111
112 /* Object types */
113 #define REDIS_STRING 0
114 #define REDIS_LIST 1
115 #define REDIS_SET 2
116 #define REDIS_ZSET 3
117 #define REDIS_HASH 4
118
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
127
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
131 *
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
138 *
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
146
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * accordingly to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
154
155 /* Client flags (bit values, OR-ed together in redisClient.flags) */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160 #define REDIS_MULTI 16 /* This client is in a MULTI context */
161
162 /* Slave replication state - slave side */
163 #define REDIS_REPL_NONE 0 /* No active replication */
164 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
165 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
166
167 /* Slave replication state - from the point of view of master
168 * Note that in SEND_BULK and ONLINE state the slave receives new updates
169 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
170 * to start the next background saving in order to send updates to it. */
171 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
172 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
173 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
174 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
175
176 /* List related stuff */
177 #define REDIS_HEAD 0
178 #define REDIS_TAIL 1
179
180 /* Sort operations */
181 #define REDIS_SORT_GET 0
182 #define REDIS_SORT_ASC 1
183 #define REDIS_SORT_DESC 2
184 #define REDIS_SORTKEY_MAX 1024
185
186 /* Log levels (redisLog() drops messages whose level is below
 * server.verbosity, and uses the level as an index into ".-*") */
187 #define REDIS_DEBUG 0
188 #define REDIS_NOTICE 1
189 #define REDIS_WARNING 2
190
191 /* Anti-warning macro: evaluates V as a void expression */
192 #define REDIS_NOTUSED(V) ((void) V)
193
194 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
195 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
196
197 /* Append only defines */
198 #define APPENDFSYNC_NO 0
199 #define APPENDFSYNC_ALWAYS 1
200 #define APPENDFSYNC_EVERYSEC 2
201
202 /* We can print the stacktrace, so our assert is defined this way:
 * when the expression is false, _redisAssert() is called with the
 * stringified expression and the process then terminates via exit(1). */
203 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
204 static void _redisAssert(char *estr);
205
206 /*================================= Data types ============================== */
207
/* The generic Redis value container (robj). A single struct is used for
 * every kind of value (string / list / set / ...); 'type' and 'encoding'
 * say how 'ptr' must be interpreted. */
typedef struct redisObject {
    void *ptr;                  /* payload; meaning depends on type/encoding */
    unsigned char type;         /* one of the REDIS_STRING, REDIS_LIST, ... defines */
    unsigned char encoding;     /* one of the REDIS_ENCODING_* defines */
    unsigned char notused[2];   /* unused bytes */
    int refcount;               /* reference count, see incrRefCount() and
                                 * decrRefCount() */
} robj;
216
/* Macro used to initialize a Redis object allocated on the stack as a raw
 * string with a reference count of 1.
 *
 *   _var  an lvalue of type robj (not a pointer to one)
 *   _ptr  the value stored in the 'ptr' field
 *
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * The expansion deliberately does NOT end with a semicolon: the caller
 * supplies it, which keeps "if (x) initStaticStringObject(o,p); else ..."
 * well formed. Both arguments are parenthesized so that expressions such
 * as *objptr can be passed safely. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
} while(0)
227
228 typedef struct redisDb {
229 dict *dict;
230 dict *expires;
231 int id;
232 } redisDb;
233
234 /* Client MULTI/EXEC state */
235 typedef struct multiCmd {
236 robj **argv;
237 int argc;
238 struct redisCommand *cmd;
239 } multiCmd;
240
241 typedef struct multiState {
242 multiCmd *commands; /* Array of MULTI commands */
243 int count; /* Total number of MULTI commands */
244 } multiState;
245
246 /* With multiplexing we need to take per-clinet state.
247 * Clients are taken in a liked list. */
248 typedef struct redisClient {
249 int fd;
250 redisDb *db;
251 int dictid;
252 sds querybuf;
253 robj **argv, **mbargv;
254 int argc, mbargc;
255 int bulklen; /* bulk read len. -1 if not in bulk read mode */
256 int multibulk; /* multi bulk command format active */
257 list *reply;
258 int sentlen;
259 time_t lastinteraction; /* time of the last interaction, used for timeout */
260 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
261 /* REDIS_MULTI */
262 int slaveseldb; /* slave selected db, if this client is a slave */
263 int authenticated; /* when requirepass is non-NULL */
264 int replstate; /* replication state if this is a slave */
265 int repldbfd; /* replication DB file descriptor */
266 long repldboff; /* replication DB file offset */
267 off_t repldbsize; /* replication DB file size */
268 multiState mstate; /* MULTI/EXEC state */
269 } redisClient;
270
/* One entry of the server.saveparams array: a (seconds, changes) pair. */
struct saveparam {
    time_t seconds;     /* time component of the pair */
    int changes;        /* change-count component of the pair */
};
275
276 /* Global server state structure */
277 struct redisServer {
278 int port;
279 int fd;
280 redisDb *db;
281 dict *sharingpool;
282 unsigned int sharingpoolsize;
283 long long dirty; /* changes to DB from the last save */
284 list *clients;
285 list *slaves, *monitors;
286 char neterr[ANET_ERR_LEN];
287 aeEventLoop *el;
288 int cronloops; /* number of times the cron function run */
289 list *objfreelist; /* A list of freed objects to avoid malloc() */
290 time_t lastsave; /* Unix time of last save succeeede */
291 size_t usedmemory; /* Used memory in megabytes */
292 /* Fields used only for stats */
293 time_t stat_starttime; /* server start time */
294 long long stat_numcommands; /* number of processed commands */
295 long long stat_numconnections; /* number of connections received */
296 /* Configuration */
297 int verbosity;
298 int glueoutputbuf;
299 int maxidletime;
300 int dbnum;
301 int daemonize;
302 int appendonly;
303 int appendfsync;
304 time_t lastfsync;
305 int appendfd;
306 int appendseldb;
307 char *pidfile;
308 pid_t bgsavechildpid;
309 pid_t bgrewritechildpid;
310 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
311 struct saveparam *saveparams;
312 int saveparamslen;
313 char *logfile;
314 char *bindaddr;
315 char *dbfilename;
316 char *appendfilename;
317 char *requirepass;
318 int shareobjects;
319 int rdbcompression;
320 /* Replication related */
321 int isslave;
322 char *masterauth;
323 char *masterhost;
324 int masterport;
325 redisClient *master; /* client that is master for this slave */
326 int replstate;
327 unsigned int maxclients;
328 unsigned long maxmemory;
329 /* Sort parameters - qsort_r() is only available under BSD so we
330 * have to take this state global, in order to pass it to sortCompare() */
331 int sort_desc;
332 int sort_alpha;
333 int sort_bypattern;
334 };
335
336 typedef void redisCommandProc(redisClient *c);
337 struct redisCommand {
338 char *name;
339 redisCommandProc *proc;
340 int arity;
341 int flags;
342 };
343
/* A (symbol name, address) pair. */
struct redisFunctionSym {
    char *name;             /* symbol name */
    unsigned long pointer;  /* address stored as an integer */
};
348
349 typedef struct _redisSortObject {
350 robj *obj;
351 union {
352 double score;
353 robj *cmpobj;
354 } u;
355 } redisSortObject;
356
357 typedef struct _redisSortOperation {
358 int type;
359 robj *pattern;
360 } redisSortOperation;
361
362 /* ZSETs use a specialized version of Skiplists */
363
364 typedef struct zskiplistNode {
365 struct zskiplistNode **forward;
366 struct zskiplistNode *backward;
367 double score;
368 robj *obj;
369 } zskiplistNode;
370
/* The skiplist itself: header and tail nodes plus bookkeeping counters. */
typedef struct zskiplist {
    struct zskiplistNode *header;   /* header node */
    struct zskiplistNode *tail;     /* tail node */
    unsigned long length;           /* element count */
    int level;                      /* level counter (bounded by
                                     * ZSKIPLIST_MAXLEVEL, presumably) */
} zskiplist;
376
377 typedef struct zset {
378 dict *dict;
379 zskiplist *zsl;
380 } zset;
381
382 /* Our shared "common" objects */
383
384 struct sharedObjectsStruct {
385 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
386 *colon, *nullbulk, *nullmultibulk, *queued,
387 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
388 *outofrangeerr, *plus,
389 *select0, *select1, *select2, *select3, *select4,
390 *select5, *select6, *select7, *select8, *select9;
391 } shared;
392
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero;
static double R_PosInf;
static double R_NegInf;
static double R_Nan;
398
399 /*================================ Prototypes =============================== */
400
401 static void freeStringObject(robj *o);
402 static void freeListObject(robj *o);
403 static void freeSetObject(robj *o);
404 static void decrRefCount(void *o);
405 static robj *createObject(int type, void *ptr);
406 static void freeClient(redisClient *c);
407 static int rdbLoad(char *filename);
408 static void addReply(redisClient *c, robj *obj);
409 static void addReplySds(redisClient *c, sds s);
410 static void incrRefCount(robj *o);
411 static int rdbSaveBackground(char *filename);
412 static robj *createStringObject(char *ptr, size_t len);
413 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
414 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
415 static int syncWithMaster(void);
416 static robj *tryObjectSharing(robj *o);
417 static int tryObjectEncoding(robj *o);
418 static robj *getDecodedObject(robj *o);
419 static int removeExpire(redisDb *db, robj *key);
420 static int expireIfNeeded(redisDb *db, robj *key);
421 static int deleteIfVolatile(redisDb *db, robj *key);
422 static int deleteKey(redisDb *db, robj *key);
423 static time_t getExpire(redisDb *db, robj *key);
424 static int setExpire(redisDb *db, robj *key, time_t when);
425 static void updateSlavesWaitingBgsave(int bgsaveerr);
426 static void freeMemoryIfNeeded(void);
427 static int processCommand(redisClient *c);
428 static void setupSigSegvAction(void);
429 static void rdbRemoveTempFile(pid_t childpid);
430 static void aofRemoveTempFile(pid_t childpid);
431 static size_t stringObjectLen(robj *o);
432 static void processInputBuffer(redisClient *c);
433 static zskiplist *zslCreate(void);
434 static void zslFree(zskiplist *zsl);
435 static void zslInsert(zskiplist *zsl, double score, robj *obj);
436 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
437 static void initClientMultiState(redisClient *c);
438 static void freeClientMultiState(redisClient *c);
439 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
440
441 static void authCommand(redisClient *c);
442 static void pingCommand(redisClient *c);
443 static void echoCommand(redisClient *c);
444 static void setCommand(redisClient *c);
445 static void setnxCommand(redisClient *c);
446 static void getCommand(redisClient *c);
447 static void delCommand(redisClient *c);
448 static void existsCommand(redisClient *c);
449 static void incrCommand(redisClient *c);
450 static void decrCommand(redisClient *c);
451 static void incrbyCommand(redisClient *c);
452 static void decrbyCommand(redisClient *c);
453 static void selectCommand(redisClient *c);
454 static void randomkeyCommand(redisClient *c);
455 static void keysCommand(redisClient *c);
456 static void dbsizeCommand(redisClient *c);
457 static void lastsaveCommand(redisClient *c);
458 static void saveCommand(redisClient *c);
459 static void bgsaveCommand(redisClient *c);
460 static void bgrewriteaofCommand(redisClient *c);
461 static void shutdownCommand(redisClient *c);
462 static void moveCommand(redisClient *c);
463 static void renameCommand(redisClient *c);
464 static void renamenxCommand(redisClient *c);
465 static void lpushCommand(redisClient *c);
466 static void rpushCommand(redisClient *c);
467 static void lpopCommand(redisClient *c);
468 static void rpopCommand(redisClient *c);
469 static void llenCommand(redisClient *c);
470 static void lindexCommand(redisClient *c);
471 static void lrangeCommand(redisClient *c);
472 static void ltrimCommand(redisClient *c);
473 static void typeCommand(redisClient *c);
474 static void lsetCommand(redisClient *c);
475 static void saddCommand(redisClient *c);
476 static void sremCommand(redisClient *c);
477 static void smoveCommand(redisClient *c);
478 static void sismemberCommand(redisClient *c);
479 static void scardCommand(redisClient *c);
480 static void spopCommand(redisClient *c);
481 static void srandmemberCommand(redisClient *c);
482 static void sinterCommand(redisClient *c);
483 static void sinterstoreCommand(redisClient *c);
484 static void sunionCommand(redisClient *c);
485 static void sunionstoreCommand(redisClient *c);
486 static void sdiffCommand(redisClient *c);
487 static void sdiffstoreCommand(redisClient *c);
488 static void syncCommand(redisClient *c);
489 static void flushdbCommand(redisClient *c);
490 static void flushallCommand(redisClient *c);
491 static void sortCommand(redisClient *c);
492 static void lremCommand(redisClient *c);
493 static void rpoplpushcommand(redisClient *c);
494 static void infoCommand(redisClient *c);
495 static void mgetCommand(redisClient *c);
496 static void monitorCommand(redisClient *c);
497 static void expireCommand(redisClient *c);
498 static void expireatCommand(redisClient *c);
499 static void getsetCommand(redisClient *c);
500 static void ttlCommand(redisClient *c);
501 static void slaveofCommand(redisClient *c);
502 static void debugCommand(redisClient *c);
503 static void msetCommand(redisClient *c);
504 static void msetnxCommand(redisClient *c);
505 static void zaddCommand(redisClient *c);
506 static void zincrbyCommand(redisClient *c);
507 static void zrangeCommand(redisClient *c);
508 static void zrangebyscoreCommand(redisClient *c);
509 static void zrevrangeCommand(redisClient *c);
510 static void zcardCommand(redisClient *c);
511 static void zremCommand(redisClient *c);
512 static void zscoreCommand(redisClient *c);
513 static void zremrangebyscoreCommand(redisClient *c);
514 static void multiCommand(redisClient *c);
515 static void execCommand(redisClient *c);
516
517 /*================================= Globals ================================= */
518
519 /* Global vars */
520 static struct redisServer server; /* server global state */
/* Command table. Each row is {name, proc, arity, flags}, following the field
 * order of struct redisCommand; flags is an OR of REDIS_CMD_* values and the
 * table ends with an all-NULL/zero sentinel row.
 * NOTE(review): several arities are negative (e.g. -2 for "del"); presumably
 * that means "at least N arguments" -- confirm against the arity check, which
 * is outside this view.
 * NOTE(review): "smembers" is served by sinterCommand, and the "rpoplpush"
 * handler is spelled rpoplpushcommand (lowercase 'c'), matching its
 * prototype. */
521 static struct redisCommand cmdTable[] = {
522 {"get",getCommand,2,REDIS_CMD_INLINE},
523 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
524 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
525 {"del",delCommand,-2,REDIS_CMD_INLINE},
526 {"exists",existsCommand,2,REDIS_CMD_INLINE},
527 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
528 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
530 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
531 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
532 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
533 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
534 {"llen",llenCommand,2,REDIS_CMD_INLINE},
535 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
536 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
537 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
538 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
539 {"lrem",lremCommand,4,REDIS_CMD_BULK},
540 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
541 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
542 {"srem",sremCommand,3,REDIS_CMD_BULK},
543 {"smove",smoveCommand,4,REDIS_CMD_BULK},
544 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
545 {"scard",scardCommand,2,REDIS_CMD_INLINE},
546 {"spop",spopCommand,2,REDIS_CMD_INLINE},
547 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
548 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
549 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
550 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
551 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
552 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
553 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
554 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
555 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
556 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
557 {"zrem",zremCommand,3,REDIS_CMD_BULK},
558 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
559 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
560 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
561 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
562 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
563 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
564 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
565 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
566 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
567 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
568 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
569 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
570 {"select",selectCommand,2,REDIS_CMD_INLINE},
571 {"move",moveCommand,3,REDIS_CMD_INLINE},
572 {"rename",renameCommand,3,REDIS_CMD_INLINE},
573 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
574 {"expire",expireCommand,3,REDIS_CMD_INLINE},
575 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
576 {"keys",keysCommand,2,REDIS_CMD_INLINE},
577 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
578 {"auth",authCommand,2,REDIS_CMD_INLINE},
579 {"ping",pingCommand,1,REDIS_CMD_INLINE},
580 {"echo",echoCommand,2,REDIS_CMD_BULK},
581 {"save",saveCommand,1,REDIS_CMD_INLINE},
582 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
583 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
584 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
585 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
586 {"type",typeCommand,2,REDIS_CMD_INLINE},
587 {"multi",multiCommand,1,REDIS_CMD_INLINE},
588 {"exec",execCommand,1,REDIS_CMD_INLINE},
589 {"sync",syncCommand,1,REDIS_CMD_INLINE},
590 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
591 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
592 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
593 {"info",infoCommand,1,REDIS_CMD_INLINE},
594 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
595 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
596 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
597 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
598 {NULL,NULL,0,0}
599 };
600
601 /*============================ Utility functions ============================ */
602
/* Glob-style pattern matching on length-delimited buffers.
 *
 * Supported syntax:
 *   *        matches any run of characters, including an empty one
 *   ?        matches exactly one character
 *   [abc]    matches one character of the class; [^abc] negates it,
 *            a-z denotes a range, and \x inside the class is a literal x
 *   \x       matches the character x literally
 *
 * pattern/patternLen  the pattern and its length in bytes
 * string/stringLen    the subject and its length in bytes
 * nocase              non-zero for case-insensitive comparison (tolower)
 *
 * Returns 1 when the whole subject matches the whole pattern, 0 otherwise.
 *
 * Neither buffer has to be NUL terminated: no byte at or beyond
 * pattern[patternLen] or string[stringLen] is ever read. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        /* stringLen can only be zero here on the first iteration (the
         * loop exits as soon as the subject is consumed). With an empty
         * subject only '*' may still match; every other pattern element
         * would have to read string[0], which does not exist. */
        if (stringLen == 0 && pattern[0] != '*')
            return 0; /* no match */

        switch(pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try to match the rest of the pattern at every position. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * pattern++ after the switch lands on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* A trailing backslash stands for itself. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Subject consumed: trailing '*' may still match nothing. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
725
726 static void redisLog(int level, const char *fmt, ...) {
727 va_list ap;
728 FILE *fp;
729
730 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
731 if (!fp) return;
732
733 va_start(ap, fmt);
734 if (level >= server.verbosity) {
735 char *c = ".-*";
736 char buf[64];
737 time_t now;
738
739 now = time(NULL);
740 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
741 fprintf(fp,"%s %c ",buf,c[level]);
742 vfprintf(fp, fmt, ap);
743 fprintf(fp,"\n");
744 fflush(fp);
745 }
746 va_end(ap);
747
748 if (server.logfile) fclose(fp);
749 }
750
751 /*====================== Hash table type implementation ==================== */
752
753 /* This is a hash table type that uses the SDS dynamic strings library as
754 * keys and redis objects as values (objects can hold SDS strings,
755 * lists, sets). */
756
/* Value destructor for dict entries whose value is a plain allocation:
 * the value is handed to zfree(). 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
762
763 static int sdsDictKeyCompare(void *privdata, const void *key1,
764 const void *key2)
765 {
766 int l1,l2;
767 DICT_NOTUSED(privdata);
768
769 l1 = sdslen((sds)key1);
770 l2 = sdslen((sds)key2);
771 if (l1 != l2) return 0;
772 return memcmp(key1, key2, l1) == 0;
773 }
774
/* Key/value destructor for dict entries holding a Redis object: the
 * object's reference is released with decrRefCount(). 'privdata' is not
 * used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
781
782 static int dictObjKeyCompare(void *privdata, const void *key1,
783 const void *key2)
784 {
785 const robj *o1 = key1, *o2 = key2;
786 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
787 }
788
789 static unsigned int dictObjHash(const void *key) {
790 const robj *o = key;
791 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
792 }
793
794 static int dictEncObjKeyCompare(void *privdata, const void *key1,
795 const void *key2)
796 {
797 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
798 int cmp;
799
800 o1 = getDecodedObject(o1);
801 o2 = getDecodedObject(o2);
802 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
803 decrRefCount(o1);
804 decrRefCount(o2);
805 return cmp;
806 }
807
808 static unsigned int dictEncObjHash(const void *key) {
809 robj *o = (robj*) key;
810
811 o = getDecodedObject(o);
812 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
813 decrRefCount(o);
814 return hash;
815 }
816
/* setDictType: keys are hashed and compared on their decoded form
 * (dictEncObjHash / dictEncObjKeyCompare) and are released through
 * dictRedisObjectDestructor, i.e. decrRefCount(). Values have no
 * destructor, and neither keys nor values are duplicated on insert. */
817 static dictType setDictType = {
818 dictEncObjHash, /* hash function */
819 NULL, /* key dup */
820 NULL, /* val dup */
821 dictEncObjKeyCompare, /* key compare */
822 dictRedisObjectDestructor, /* key destructor */
823 NULL /* val destructor */
824 };
825
/* zsetDictType: same key handling as setDictType (decoded-form hash and
 * compare, keys released with decrRefCount()), but values are plain
 * allocations released with zfree() through dictVanillaFree. */
826 static dictType zsetDictType = {
827 dictEncObjHash, /* hash function */
828 NULL, /* key dup */
829 NULL, /* val dup */
830 dictEncObjKeyCompare, /* key compare */
831 dictRedisObjectDestructor, /* key destructor */
832 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
833 };
834
/* hashDictType: keys are hashed and compared directly on the sds string in
 * their 'ptr' field (dictObjHash / dictObjKeyCompare, no decoding step);
 * both keys and values are Redis objects released with decrRefCount(). */
835 static dictType hashDictType = {
836 dictObjHash, /* hash function */
837 NULL, /* key dup */
838 NULL, /* val dup */
839 dictObjKeyCompare, /* key compare */
840 dictRedisObjectDestructor, /* key destructor */
841 dictRedisObjectDestructor /* val destructor */
842 };
843
844 /* ========================= Random utility functions ======================= */
845
846 /* Redis generally does not try to recover from out of memory conditions
847 * when allocating objects or strings, it is not clear if it will be possible
848 * to report this condition to the client since the networking layer itself
849 * is based on heap allocation for send buffers, so we simply abort.
850 * At least the code will be simpler to read... */
851 static void oom(const char *msg) {
852 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
853 sleep(1);
854 abort();
855 }
856
857 /* ====================== Redis server networking stuff ===================== */
858 static void closeTimedoutClients(void) {
859 redisClient *c;
860 listNode *ln;
861 time_t now = time(NULL);
862
863 listRewind(server.clients);
864 while ((ln = listYield(server.clients)) != NULL) {
865 c = listNodeValue(ln);
866 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
867 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
868 (now - c->lastinteraction > server.maxidletime)) {
869 redisLog(REDIS_DEBUG,"Closing idle client");
870 freeClient(c);
871 }
872 }
873 }
874
875 static int htNeedsResize(dict *dict) {
876 long long size, used;
877
878 size = dictSlots(dict);
879 used = dictSize(dict);
880 return (size && used && size > DICT_HT_INITIAL_SIZE &&
881 (used*100/size < REDIS_HT_MINFILL));
882 }
883
884 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
885 * we resize the hash table to save memory */
886 static void tryResizeHashTables(void) {
887 int j;
888
889 for (j = 0; j < server.dbnum; j++) {
890 if (htNeedsResize(server.db[j].dict)) {
891 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
892 dictResize(server.db[j].dict);
893 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
894 }
895 if (htNeedsResize(server.db[j].expires))
896 dictResize(server.db[j].expires);
897 }
898 }
899
900 /* A background saving child (BGSAVE) terminated its work. Handle this. */
901 void backgroundSaveDoneHandler(int statloc) {
902 int exitcode = WEXITSTATUS(statloc);
903 int bysignal = WIFSIGNALED(statloc);
904
905 if (!bysignal && exitcode == 0) {
906 redisLog(REDIS_NOTICE,
907 "Background saving terminated with success");
908 server.dirty = 0;
909 server.lastsave = time(NULL);
910 } else if (!bysignal && exitcode != 0) {
911 redisLog(REDIS_WARNING, "Background saving error");
912 } else {
913 redisLog(REDIS_WARNING,
914 "Background saving terminated by signal");
915 rdbRemoveTempFile(server.bgsavechildpid);
916 }
917 server.bgsavechildpid = -1;
918 /* Possibly there are slaves waiting for a BGSAVE in order to be served
919 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
920 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
921 }
922
923 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
924 * Handle this. */
925 void backgroundRewriteDoneHandler(int statloc) {
926 int exitcode = WEXITSTATUS(statloc);
927 int bysignal = WIFSIGNALED(statloc);
928
929 if (!bysignal && exitcode == 0) {
930 int fd;
931 char tmpfile[256];
932
933 redisLog(REDIS_NOTICE,
934 "Background append only file rewriting terminated with success");
935 /* Now it's time to flush the differences accumulated by the parent */
936 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
937 fd = open(tmpfile,O_WRONLY|O_APPEND);
938 if (fd == -1) {
939 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
940 goto cleanup;
941 }
942 /* Flush our data... */
943 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
944 (signed) sdslen(server.bgrewritebuf)) {
945 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
946 close(fd);
947 goto cleanup;
948 }
949 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
950 /* Now our work is to rename the temp file into the stable file. And
951 * switch the file descriptor used by the server for append only. */
952 if (rename(tmpfile,server.appendfilename) == -1) {
953 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
954 close(fd);
955 goto cleanup;
956 }
957 /* Mission completed... almost */
958 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
959 if (server.appendfd != -1) {
960 /* If append only is actually enabled... */
961 close(server.appendfd);
962 server.appendfd = fd;
963 fsync(fd);
964 server.appendseldb = -1; /* Make sure it will issue SELECT */
965 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
966 } else {
967 /* If append only is disabled we just generate a dump in this
968 * format. Why not? */
969 close(fd);
970 }
971 } else if (!bysignal && exitcode != 0) {
972 redisLog(REDIS_WARNING, "Background append only file rewriting error");
973 } else {
974 redisLog(REDIS_WARNING,
975 "Background append only file rewriting terminated by signal");
976 }
977 cleanup:
978 sdsfree(server.bgrewritebuf);
979 server.bgrewritebuf = sdsempty();
980 aofRemoveTempFile(server.bgrewritechildpid);
981 server.bgrewritechildpid = -1;
982 }
983
984 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
985 int j, loops = server.cronloops++;
986 REDIS_NOTUSED(eventLoop);
987 REDIS_NOTUSED(id);
988 REDIS_NOTUSED(clientData);
989
990 /* Update the global state with the amount of used memory */
991 server.usedmemory = zmalloc_used_memory();
992
993 /* Show some info about non-empty databases */
994 for (j = 0; j < server.dbnum; j++) {
995 long long size, used, vkeys;
996
997 size = dictSlots(server.db[j].dict);
998 used = dictSize(server.db[j].dict);
999 vkeys = dictSize(server.db[j].expires);
1000 if (!(loops % 5) && (used || vkeys)) {
1001 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1002 /* dictPrintStats(server.dict); */
1003 }
1004 }
1005
1006 /* We don't want to resize the hash tables while a bacground saving
1007 * is in progress: the saving child is created using fork() that is
1008 * implemented with a copy-on-write semantic in most modern systems, so
1009 * if we resize the HT while there is the saving child at work actually
1010 * a lot of memory movements in the parent will cause a lot of pages
1011 * copied. */
1012 if (server.bgsavechildpid == -1) tryResizeHashTables();
1013
1014 /* Show information about connected clients */
1015 if (!(loops % 5)) {
1016 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1017 listLength(server.clients)-listLength(server.slaves),
1018 listLength(server.slaves),
1019 server.usedmemory,
1020 dictSize(server.sharingpool));
1021 }
1022
1023 /* Close connections of timedout clients */
1024 if (server.maxidletime && !(loops % 10))
1025 closeTimedoutClients();
1026
1027 /* Check if a background saving or AOF rewrite in progress terminated */
1028 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1029 int statloc;
1030 pid_t pid;
1031
1032 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1033 if (pid == server.bgsavechildpid) {
1034 backgroundSaveDoneHandler(statloc);
1035 } else {
1036 backgroundRewriteDoneHandler(statloc);
1037 }
1038 }
1039 } else {
1040 /* If there is not a background saving in progress check if
1041 * we have to save now */
1042 time_t now = time(NULL);
1043 for (j = 0; j < server.saveparamslen; j++) {
1044 struct saveparam *sp = server.saveparams+j;
1045
1046 if (server.dirty >= sp->changes &&
1047 now-server.lastsave > sp->seconds) {
1048 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1049 sp->changes, sp->seconds);
1050 rdbSaveBackground(server.dbfilename);
1051 break;
1052 }
1053 }
1054 }
1055
1056 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1057 * will use few CPU cycles if there are few expiring keys, otherwise
1058 * it will get more aggressive to avoid that too much memory is used by
1059 * keys that can be removed from the keyspace. */
1060 for (j = 0; j < server.dbnum; j++) {
1061 int expired;
1062 redisDb *db = server.db+j;
1063
1064 /* Continue to expire if at the end of the cycle more than 25%
1065 * of the keys were expired. */
1066 do {
1067 int num = dictSize(db->expires);
1068 time_t now = time(NULL);
1069
1070 expired = 0;
1071 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1072 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1073 while (num--) {
1074 dictEntry *de;
1075 time_t t;
1076
1077 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1078 t = (time_t) dictGetEntryVal(de);
1079 if (now > t) {
1080 deleteKey(db,dictGetEntryKey(de));
1081 expired++;
1082 }
1083 }
1084 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1085 }
1086
1087 /* Check if we should connect to a MASTER */
1088 if (server.replstate == REDIS_REPL_CONNECT) {
1089 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1090 if (syncWithMaster() == REDIS_OK) {
1091 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1092 }
1093 }
1094 return 1000;
1095 }
1096
1097 static void createSharedObjects(void) {
1098 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1099 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1100 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1101 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1102 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1103 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1104 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1105 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1106 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1107 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1108 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
1109 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1110 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1111 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1112 "-ERR no such key\r\n"));
1113 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1114 "-ERR syntax error\r\n"));
1115 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1116 "-ERR source and destination objects are the same\r\n"));
1117 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1118 "-ERR index out of range\r\n"));
1119 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1120 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1121 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1122 shared.select0 = createStringObject("select 0\r\n",10);
1123 shared.select1 = createStringObject("select 1\r\n",10);
1124 shared.select2 = createStringObject("select 2\r\n",10);
1125 shared.select3 = createStringObject("select 3\r\n",10);
1126 shared.select4 = createStringObject("select 4\r\n",10);
1127 shared.select5 = createStringObject("select 5\r\n",10);
1128 shared.select6 = createStringObject("select 6\r\n",10);
1129 shared.select7 = createStringObject("select 7\r\n",10);
1130 shared.select8 = createStringObject("select 8\r\n",10);
1131 shared.select9 = createStringObject("select 9\r\n",10);
1132 }
1133
1134 static void appendServerSaveParams(time_t seconds, int changes) {
1135 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1136 server.saveparams[server.saveparamslen].seconds = seconds;
1137 server.saveparams[server.saveparamslen].changes = changes;
1138 server.saveparamslen++;
1139 }
1140
1141 static void resetServerSaveParams() {
1142 zfree(server.saveparams);
1143 server.saveparams = NULL;
1144 server.saveparamslen = 0;
1145 }
1146
1147 static void initServerConfig() {
1148 server.dbnum = REDIS_DEFAULT_DBNUM;
1149 server.port = REDIS_SERVERPORT;
1150 server.verbosity = REDIS_DEBUG;
1151 server.maxidletime = REDIS_MAXIDLETIME;
1152 server.saveparams = NULL;
1153 server.logfile = NULL; /* NULL = log on standard output */
1154 server.bindaddr = NULL;
1155 server.glueoutputbuf = 1;
1156 server.daemonize = 0;
1157 server.appendonly = 0;
1158 server.appendfsync = APPENDFSYNC_ALWAYS;
1159 server.lastfsync = time(NULL);
1160 server.appendfd = -1;
1161 server.appendseldb = -1; /* Make sure the first time will not match */
1162 server.pidfile = "/var/run/redis.pid";
1163 server.dbfilename = "dump.rdb";
1164 server.appendfilename = "appendonly.aof";
1165 server.requirepass = NULL;
1166 server.shareobjects = 0;
1167 server.rdbcompression = 1;
1168 server.sharingpoolsize = 1024;
1169 server.maxclients = 0;
1170 server.maxmemory = 0;
1171 resetServerSaveParams();
1172
1173 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1174 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1175 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1176 /* Replication related */
1177 server.isslave = 0;
1178 server.masterauth = NULL;
1179 server.masterhost = NULL;
1180 server.masterport = 6379;
1181 server.master = NULL;
1182 server.replstate = REDIS_REPL_NONE;
1183
1184 /* Double constants initialization */
1185 R_Zero = 0.0;
1186 R_PosInf = 1.0/R_Zero;
1187 R_NegInf = -1.0/R_Zero;
1188 R_Nan = R_Zero/R_Zero;
1189 }
1190
1191 static void initServer() {
1192 int j;
1193
1194 signal(SIGHUP, SIG_IGN);
1195 signal(SIGPIPE, SIG_IGN);
1196 setupSigSegvAction();
1197
1198 server.clients = listCreate();
1199 server.slaves = listCreate();
1200 server.monitors = listCreate();
1201 server.objfreelist = listCreate();
1202 createSharedObjects();
1203 server.el = aeCreateEventLoop();
1204 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1205 server.sharingpool = dictCreate(&setDictType,NULL);
1206 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1207 if (server.fd == -1) {
1208 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1209 exit(1);
1210 }
1211 for (j = 0; j < server.dbnum; j++) {
1212 server.db[j].dict = dictCreate(&hashDictType,NULL);
1213 server.db[j].expires = dictCreate(&setDictType,NULL);
1214 server.db[j].id = j;
1215 }
1216 server.cronloops = 0;
1217 server.bgsavechildpid = -1;
1218 server.bgrewritechildpid = -1;
1219 server.bgrewritebuf = sdsempty();
1220 server.lastsave = time(NULL);
1221 server.dirty = 0;
1222 server.usedmemory = 0;
1223 server.stat_numcommands = 0;
1224 server.stat_numconnections = 0;
1225 server.stat_starttime = time(NULL);
1226 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1227
1228 if (server.appendonly) {
1229 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1230 if (server.appendfd == -1) {
1231 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1232 strerror(errno));
1233 exit(1);
1234 }
1235 }
1236 }
1237
1238 /* Empty the whole database */
1239 static long long emptyDb() {
1240 int j;
1241 long long removed = 0;
1242
1243 for (j = 0; j < server.dbnum; j++) {
1244 removed += dictSize(server.db[j].dict);
1245 dictEmpty(server.db[j].dict);
1246 dictEmpty(server.db[j].expires);
1247 }
1248 return removed;
1249 }
1250
/* Convert a "yes" / "no" string (case insensitive) into 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1256
1257 /* I agree, this is a very rudimental way to load a configuration...
1258 will improve later if the config gets more complex */
1259 static void loadServerConfig(char *filename) {
1260 FILE *fp;
1261 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1262 int linenum = 0;
1263 sds line = NULL;
1264
1265 if (filename[0] == '-' && filename[1] == '\0')
1266 fp = stdin;
1267 else {
1268 if ((fp = fopen(filename,"r")) == NULL) {
1269 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1270 exit(1);
1271 }
1272 }
1273
1274 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1275 sds *argv;
1276 int argc, j;
1277
1278 linenum++;
1279 line = sdsnew(buf);
1280 line = sdstrim(line," \t\r\n");
1281
1282 /* Skip comments and blank lines*/
1283 if (line[0] == '#' || line[0] == '\0') {
1284 sdsfree(line);
1285 continue;
1286 }
1287
1288 /* Split into arguments */
1289 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1290 sdstolower(argv[0]);
1291
1292 /* Execute config directives */
1293 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1294 server.maxidletime = atoi(argv[1]);
1295 if (server.maxidletime < 0) {
1296 err = "Invalid timeout value"; goto loaderr;
1297 }
1298 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1299 server.port = atoi(argv[1]);
1300 if (server.port < 1 || server.port > 65535) {
1301 err = "Invalid port"; goto loaderr;
1302 }
1303 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1304 server.bindaddr = zstrdup(argv[1]);
1305 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1306 int seconds = atoi(argv[1]);
1307 int changes = atoi(argv[2]);
1308 if (seconds < 1 || changes < 0) {
1309 err = "Invalid save parameters"; goto loaderr;
1310 }
1311 appendServerSaveParams(seconds,changes);
1312 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1313 if (chdir(argv[1]) == -1) {
1314 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1315 argv[1], strerror(errno));
1316 exit(1);
1317 }
1318 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1319 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1320 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1321 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1322 else {
1323 err = "Invalid log level. Must be one of debug, notice, warning";
1324 goto loaderr;
1325 }
1326 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1327 FILE *logfp;
1328
1329 server.logfile = zstrdup(argv[1]);
1330 if (!strcasecmp(server.logfile,"stdout")) {
1331 zfree(server.logfile);
1332 server.logfile = NULL;
1333 }
1334 if (server.logfile) {
1335 /* Test if we are able to open the file. The server will not
1336 * be able to abort just for this problem later... */
1337 logfp = fopen(server.logfile,"a");
1338 if (logfp == NULL) {
1339 err = sdscatprintf(sdsempty(),
1340 "Can't open the log file: %s", strerror(errno));
1341 goto loaderr;
1342 }
1343 fclose(logfp);
1344 }
1345 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1346 server.dbnum = atoi(argv[1]);
1347 if (server.dbnum < 1) {
1348 err = "Invalid number of databases"; goto loaderr;
1349 }
1350 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1351 server.maxclients = atoi(argv[1]);
1352 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1353 server.maxmemory = strtoll(argv[1], NULL, 10);
1354 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1355 server.masterhost = sdsnew(argv[1]);
1356 server.masterport = atoi(argv[2]);
1357 server.replstate = REDIS_REPL_CONNECT;
1358 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1359 server.masterauth = zstrdup(argv[1]);
1360 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1361 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1362 err = "argument must be 'yes' or 'no'"; goto loaderr;
1363 }
1364 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1365 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1366 err = "argument must be 'yes' or 'no'"; goto loaderr;
1367 }
1368 } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
1369 if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
1370 err = "argument must be 'yes' or 'no'"; goto loaderr;
1371 }
1372 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1373 server.sharingpoolsize = atoi(argv[1]);
1374 if (server.sharingpoolsize < 1) {
1375 err = "invalid object sharing pool size"; goto loaderr;
1376 }
1377 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1378 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1379 err = "argument must be 'yes' or 'no'"; goto loaderr;
1380 }
1381 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1382 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1383 err = "argument must be 'yes' or 'no'"; goto loaderr;
1384 }
1385 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1386 if (!strcasecmp(argv[1],"no")) {
1387 server.appendfsync = APPENDFSYNC_NO;
1388 } else if (!strcasecmp(argv[1],"always")) {
1389 server.appendfsync = APPENDFSYNC_ALWAYS;
1390 } else if (!strcasecmp(argv[1],"everysec")) {
1391 server.appendfsync = APPENDFSYNC_EVERYSEC;
1392 } else {
1393 err = "argument must be 'no', 'always' or 'everysec'";
1394 goto loaderr;
1395 }
1396 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1397 server.requirepass = zstrdup(argv[1]);
1398 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1399 server.pidfile = zstrdup(argv[1]);
1400 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1401 server.dbfilename = zstrdup(argv[1]);
1402 } else {
1403 err = "Bad directive or wrong number of arguments"; goto loaderr;
1404 }
1405 for (j = 0; j < argc; j++)
1406 sdsfree(argv[j]);
1407 zfree(argv);
1408 sdsfree(line);
1409 }
1410 if (fp != stdin) fclose(fp);
1411 return;
1412
1413 loaderr:
1414 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1415 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1416 fprintf(stderr, ">>> '%s'\n", line);
1417 fprintf(stderr, "%s\n", err);
1418 exit(1);
1419 }
1420
1421 static void freeClientArgv(redisClient *c) {
1422 int j;
1423
1424 for (j = 0; j < c->argc; j++)
1425 decrRefCount(c->argv[j]);
1426 for (j = 0; j < c->mbargc; j++)
1427 decrRefCount(c->mbargv[j]);
1428 c->argc = 0;
1429 c->mbargc = 0;
1430 }
1431
1432 static void freeClient(redisClient *c) {
1433 listNode *ln;
1434
1435 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1436 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1437 sdsfree(c->querybuf);
1438 listRelease(c->reply);
1439 freeClientArgv(c);
1440 close(c->fd);
1441 ln = listSearchKey(server.clients,c);
1442 redisAssert(ln != NULL);
1443 listDelNode(server.clients,ln);
1444 if (c->flags & REDIS_SLAVE) {
1445 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1446 close(c->repldbfd);
1447 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1448 ln = listSearchKey(l,c);
1449 redisAssert(ln != NULL);
1450 listDelNode(l,ln);
1451 }
1452 if (c->flags & REDIS_MASTER) {
1453 server.master = NULL;
1454 server.replstate = REDIS_REPL_CONNECT;
1455 }
1456 zfree(c->argv);
1457 zfree(c->mbargv);
1458 freeClientMultiState(c);
1459 zfree(c);
1460 }
1461
1462 #define GLUEREPLY_UP_TO (1024)
1463 static void glueReplyBuffersIfNeeded(redisClient *c) {
1464 int copylen = 0;
1465 char buf[GLUEREPLY_UP_TO];
1466 listNode *ln;
1467 robj *o;
1468
1469 listRewind(c->reply);
1470 while((ln = listYield(c->reply))) {
1471 int objlen;
1472
1473 o = ln->value;
1474 objlen = sdslen(o->ptr);
1475 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1476 memcpy(buf+copylen,o->ptr,objlen);
1477 copylen += objlen;
1478 listDelNode(c->reply,ln);
1479 } else {
1480 if (copylen == 0) return;
1481 break;
1482 }
1483 }
1484 /* Now the output buffer is empty, add the new single element */
1485 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1486 listAddNodeHead(c->reply,o);
1487 }
1488
1489 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1490 redisClient *c = privdata;
1491 int nwritten = 0, totwritten = 0, objlen;
1492 robj *o;
1493 REDIS_NOTUSED(el);
1494 REDIS_NOTUSED(mask);
1495
1496 /* Use writev() if we have enough buffers to send */
1497 if (!server.glueoutputbuf &&
1498 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1499 !(c->flags & REDIS_MASTER))
1500 {
1501 sendReplyToClientWritev(el, fd, privdata, mask);
1502 return;
1503 }
1504
1505 while(listLength(c->reply)) {
1506 if (server.glueoutputbuf && listLength(c->reply) > 1)
1507 glueReplyBuffersIfNeeded(c);
1508
1509 o = listNodeValue(listFirst(c->reply));
1510 objlen = sdslen(o->ptr);
1511
1512 if (objlen == 0) {
1513 listDelNode(c->reply,listFirst(c->reply));
1514 continue;
1515 }
1516
1517 if (c->flags & REDIS_MASTER) {
1518 /* Don't reply to a master */
1519 nwritten = objlen - c->sentlen;
1520 } else {
1521 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1522 if (nwritten <= 0) break;
1523 }
1524 c->sentlen += nwritten;
1525 totwritten += nwritten;
1526 /* If we fully sent the object on head go to the next one */
1527 if (c->sentlen == objlen) {
1528 listDelNode(c->reply,listFirst(c->reply));
1529 c->sentlen = 0;
1530 }
1531 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1532 * bytes, in a single threaded server it's a good idea to serve
1533 * other clients as well, even if a very large request comes from
1534 * super fast link that is always able to accept data (in real world
1535 * scenario think about 'KEYS *' against the loopback interfae) */
1536 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1537 }
1538 if (nwritten == -1) {
1539 if (errno == EAGAIN) {
1540 nwritten = 0;
1541 } else {
1542 redisLog(REDIS_DEBUG,
1543 "Error writing to client: %s", strerror(errno));
1544 freeClient(c);
1545 return;
1546 }
1547 }
1548 if (totwritten > 0) c->lastinteraction = time(NULL);
1549 if (listLength(c->reply) == 0) {
1550 c->sentlen = 0;
1551 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1552 }
1553 }
1554
1555 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1556 {
1557 redisClient *c = privdata;
1558 int nwritten = 0, totwritten = 0, objlen, willwrite;
1559 robj *o;
1560 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1561 int offset, ion = 0;
1562 REDIS_NOTUSED(el);
1563 REDIS_NOTUSED(mask);
1564
1565 listNode *node;
1566 while (listLength(c->reply)) {
1567 offset = c->sentlen;
1568 ion = 0;
1569 willwrite = 0;
1570
1571 /* fill-in the iov[] array */
1572 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1573 o = listNodeValue(node);
1574 objlen = sdslen(o->ptr);
1575
1576 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1577 break;
1578
1579 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1580 break; /* no more iovecs */
1581
1582 iov[ion].iov_base = ((char*)o->ptr) + offset;
1583 iov[ion].iov_len = objlen - offset;
1584 willwrite += objlen - offset;
1585 offset = 0; /* just for the first item */
1586 ion++;
1587 }
1588
1589 if(willwrite == 0)
1590 break;
1591
1592 /* write all collected blocks at once */
1593 if((nwritten = writev(fd, iov, ion)) < 0) {
1594 if (errno != EAGAIN) {
1595 redisLog(REDIS_DEBUG,
1596 "Error writing to client: %s", strerror(errno));
1597 freeClient(c);
1598 return;
1599 }
1600 break;
1601 }
1602
1603 totwritten += nwritten;
1604 offset = c->sentlen;
1605
1606 /* remove written robjs from c->reply */
1607 while (nwritten && listLength(c->reply)) {
1608 o = listNodeValue(listFirst(c->reply));
1609 objlen = sdslen(o->ptr);
1610
1611 if(nwritten >= objlen - offset) {
1612 listDelNode(c->reply, listFirst(c->reply));
1613 nwritten -= objlen - offset;
1614 c->sentlen = 0;
1615 } else {
1616 /* partial write */
1617 c->sentlen += nwritten;
1618 break;
1619 }
1620 offset = 0;
1621 }
1622 }
1623
1624 if (totwritten > 0)
1625 c->lastinteraction = time(NULL);
1626
1627 if (listLength(c->reply) == 0) {
1628 c->sentlen = 0;
1629 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1630 }
1631 }
1632
1633 static struct redisCommand *lookupCommand(char *name) {
1634 int j = 0;
1635 while(cmdTable[j].name != NULL) {
1636 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1637 j++;
1638 }
1639 return NULL;
1640 }
1641
1642 /* resetClient prepare the client to process the next command */
1643 static void resetClient(redisClient *c) {
1644 freeClientArgv(c);
1645 c->bulklen = -1;
1646 c->multibulk = 0;
1647 }
1648
1649 /* Call() is the core of Redis execution of a command */
1650 static void call(redisClient *c, struct redisCommand *cmd) {
1651 long long dirty;
1652
1653 dirty = server.dirty;
1654 cmd->proc(c);
1655 if (server.appendonly && server.dirty-dirty)
1656 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1657 if (server.dirty-dirty && listLength(server.slaves))
1658 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1659 if (listLength(server.monitors))
1660 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1661 server.stat_numcommands++;
1662 }
1663
1664 /* If this function gets called we already read a whole
1665 * command, argments are in the client argv/argc fields.
1666 * processCommand() execute the command or prepare the
1667 * server for a bulk read from the client.
1668 *
1669 * If 1 is returned the client is still alive and valid and
1670 * and other operations can be performed by the caller. Otherwise
1671 * if 0 is returned the client was destroied (i.e. after QUIT). */
1672 static int processCommand(redisClient *c) {
1673 struct redisCommand *cmd;
1674
1675 /* Free some memory if needed (maxmemory setting) */
1676 if (server.maxmemory) freeMemoryIfNeeded();
1677
1678 /* Handle the multi bulk command type. This is an alternative protocol
1679 * supported by Redis in order to receive commands that are composed of
1680 * multiple binary-safe "bulk" arguments. The latency of processing is
1681 * a bit higher but this allows things like multi-sets, so if this
1682 * protocol is used only for MSET and similar commands this is a big win. */
1683 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1684 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1685 if (c->multibulk <= 0) {
1686 resetClient(c);
1687 return 1;
1688 } else {
1689 decrRefCount(c->argv[c->argc-1]);
1690 c->argc--;
1691 return 1;
1692 }
1693 } else if (c->multibulk) {
1694 if (c->bulklen == -1) {
1695 if (((char*)c->argv[0]->ptr)[0] != '$') {
1696 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1697 resetClient(c);
1698 return 1;
1699 } else {
1700 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1701 decrRefCount(c->argv[0]);
1702 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1703 c->argc--;
1704 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1705 resetClient(c);
1706 return 1;
1707 }
1708 c->argc--;
1709 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1710 return 1;
1711 }
1712 } else {
1713 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1714 c->mbargv[c->mbargc] = c->argv[0];
1715 c->mbargc++;
1716 c->argc--;
1717 c->multibulk--;
1718 if (c->multibulk == 0) {
1719 robj **auxargv;
1720 int auxargc;
1721
1722 /* Here we need to swap the multi-bulk argc/argv with the
1723 * normal argc/argv of the client structure. */
1724 auxargv = c->argv;
1725 c->argv = c->mbargv;
1726 c->mbargv = auxargv;
1727
1728 auxargc = c->argc;
1729 c->argc = c->mbargc;
1730 c->mbargc = auxargc;
1731
1732 /* We need to set bulklen to something different than -1
1733 * in order for the code below to process the command without
1734 * to try to read the last argument of a bulk command as
1735 * a special argument. */
1736 c->bulklen = 0;
1737 /* continue below and process the command */
1738 } else {
1739 c->bulklen = -1;
1740 return 1;
1741 }
1742 }
1743 }
1744 /* -- end of multi bulk commands processing -- */
1745
1746 /* The QUIT command is handled as a special case. Normal command
1747 * procs are unable to close the client connection safely */
1748 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1749 freeClient(c);
1750 return 0;
1751 }
1752 cmd = lookupCommand(c->argv[0]->ptr);
1753 if (!cmd) {
1754 addReplySds(c,
1755 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1756 (char*)c->argv[0]->ptr));
1757 resetClient(c);
1758 return 1;
1759 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1760 (c->argc < -cmd->arity)) {
1761 addReplySds(c,
1762 sdscatprintf(sdsempty(),
1763 "-ERR wrong number of arguments for '%s' command\r\n",
1764 cmd->name));
1765 resetClient(c);
1766 return 1;
1767 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1768 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1769 resetClient(c);
1770 return 1;
1771 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1772 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1773
1774 decrRefCount(c->argv[c->argc-1]);
1775 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1776 c->argc--;
1777 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1778 resetClient(c);
1779 return 1;
1780 }
1781 c->argc--;
1782 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1783 /* It is possible that the bulk read is already in the
1784 * buffer. Check this condition and handle it accordingly.
1785 * This is just a fast path, alternative to call processInputBuffer().
1786 * It's a good idea since the code is small and this condition
1787 * happens most of the times. */
1788 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1789 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1790 c->argc++;
1791 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1792 } else {
1793 return 1;
1794 }
1795 }
1796 /* Let's try to share objects on the command arguments vector */
1797 if (server.shareobjects) {
1798 int j;
1799 for(j = 1; j < c->argc; j++)
1800 c->argv[j] = tryObjectSharing(c->argv[j]);
1801 }
1802 /* Let's try to encode the bulk object to save space. */
1803 if (cmd->flags & REDIS_CMD_BULK)
1804 tryObjectEncoding(c->argv[c->argc-1]);
1805
1806 /* Check if the user is authenticated */
1807 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1808 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1809 resetClient(c);
1810 return 1;
1811 }
1812
1813 /* Exec the command */
1814 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1815 queueMultiCommand(c,cmd);
1816 addReply(c,shared.queued);
1817 } else {
1818 call(c,cmd);
1819 }
1820
1821 /* Prepare the client for the next command */
1822 if (c->flags & REDIS_CLOSE) {
1823 freeClient(c);
1824 return 0;
1825 }
1826 resetClient(c);
1827 return 1;
1828 }
1829
1830 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1831 listNode *ln;
1832 int outc = 0, j;
1833 robj **outv;
1834 /* (args*2)+1 is enough room for args, spaces, newlines */
1835 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1836
1837 if (argc <= REDIS_STATIC_ARGS) {
1838 outv = static_outv;
1839 } else {
1840 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1841 }
1842
1843 for (j = 0; j < argc; j++) {
1844 if (j != 0) outv[outc++] = shared.space;
1845 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1846 robj *lenobj;
1847
1848 lenobj = createObject(REDIS_STRING,
1849 sdscatprintf(sdsempty(),"%lu\r\n",
1850 (unsigned long) stringObjectLen(argv[j])));
1851 lenobj->refcount = 0;
1852 outv[outc++] = lenobj;
1853 }
1854 outv[outc++] = argv[j];
1855 }
1856 outv[outc++] = shared.crlf;
1857
1858 /* Increment all the refcounts at start and decrement at end in order to
1859 * be sure to free objects if there is no slave in a replication state
1860 * able to be feed with commands */
1861 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1862 listRewind(slaves);
1863 while((ln = listYield(slaves))) {
1864 redisClient *slave = ln->value;
1865
1866 /* Don't feed slaves that are still waiting for BGSAVE to start */
1867 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1868
1869 /* Feed all the other slaves, MONITORs and so on */
1870 if (slave->slaveseldb != dictid) {
1871 robj *selectcmd;
1872
1873 switch(dictid) {
1874 case 0: selectcmd = shared.select0; break;
1875 case 1: selectcmd = shared.select1; break;
1876 case 2: selectcmd = shared.select2; break;
1877 case 3: selectcmd = shared.select3; break;
1878 case 4: selectcmd = shared.select4; break;
1879 case 5: selectcmd = shared.select5; break;
1880 case 6: selectcmd = shared.select6; break;
1881 case 7: selectcmd = shared.select7; break;
1882 case 8: selectcmd = shared.select8; break;
1883 case 9: selectcmd = shared.select9; break;
1884 default:
1885 selectcmd = createObject(REDIS_STRING,
1886 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1887 selectcmd->refcount = 0;
1888 break;
1889 }
1890 addReply(slave,selectcmd);
1891 slave->slaveseldb = dictid;
1892 }
1893 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1894 }
1895 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1896 if (outv != static_outv) zfree(outv);
1897 }
1898
1899 static void processInputBuffer(redisClient *c) {
1900 again:
1901 if (c->bulklen == -1) {
1902 /* Read the first line of the query */
1903 char *p = strchr(c->querybuf,'\n');
1904 size_t querylen;
1905
1906 if (p) {
1907 sds query, *argv;
1908 int argc, j;
1909
1910 query = c->querybuf;
1911 c->querybuf = sdsempty();
1912 querylen = 1+(p-(query));
1913 if (sdslen(query) > querylen) {
1914 /* leave data after the first line of the query in the buffer */
1915 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1916 }
1917 *p = '\0'; /* remove "\n" */
1918 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1919 sdsupdatelen(query);
1920
1921 /* Now we can split the query in arguments */
1922 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1923 sdsfree(query);
1924
1925 if (c->argv) zfree(c->argv);
1926 c->argv = zmalloc(sizeof(robj*)*argc);
1927
1928 for (j = 0; j < argc; j++) {
1929 if (sdslen(argv[j])) {
1930 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1931 c->argc++;
1932 } else {
1933 sdsfree(argv[j]);
1934 }
1935 }
1936 zfree(argv);
1937 if (c->argc) {
1938 /* Execute the command. If the client is still valid
1939 * after processCommand() return and there is something
1940 * on the query buffer try to process the next command. */
1941 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1942 } else {
1943 /* Nothing to process, argc == 0. Just process the query
1944 * buffer if it's not empty or return to the caller */
1945 if (sdslen(c->querybuf)) goto again;
1946 }
1947 return;
1948 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1949 redisLog(REDIS_DEBUG, "Client protocol error");
1950 freeClient(c);
1951 return;
1952 }
1953 } else {
1954 /* Bulk read handling. Note that if we are at this point
1955 the client already sent a command terminated with a newline,
1956 we are reading the bulk data that is actually the last
1957 argument of the command. */
1958 int qbl = sdslen(c->querybuf);
1959
1960 if (c->bulklen <= qbl) {
1961 /* Copy everything but the final CRLF as final argument */
1962 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1963 c->argc++;
1964 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1965 /* Process the command. If the client is still valid after
1966 * the processing and there is more data in the buffer
1967 * try to parse it. */
1968 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1969 return;
1970 }
1971 }
1972 }
1973
1974 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1975 redisClient *c = (redisClient*) privdata;
1976 char buf[REDIS_IOBUF_LEN];
1977 int nread;
1978 REDIS_NOTUSED(el);
1979 REDIS_NOTUSED(mask);
1980
1981 nread = read(fd, buf, REDIS_IOBUF_LEN);
1982 if (nread == -1) {
1983 if (errno == EAGAIN) {
1984 nread = 0;
1985 } else {
1986 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1987 freeClient(c);
1988 return;
1989 }
1990 } else if (nread == 0) {
1991 redisLog(REDIS_DEBUG, "Client closed connection");
1992 freeClient(c);
1993 return;
1994 }
1995 if (nread) {
1996 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1997 c->lastinteraction = time(NULL);
1998 } else {
1999 return;
2000 }
2001 processInputBuffer(c);
2002 }
2003
2004 static int selectDb(redisClient *c, int id) {
2005 if (id < 0 || id >= server.dbnum)
2006 return REDIS_ERR;
2007 c->db = &server.db[id];
2008 return REDIS_OK;
2009 }
2010
2011 static void *dupClientReplyValue(void *o) {
2012 incrRefCount((robj*)o);
2013 return 0;
2014 }
2015
2016 static redisClient *createClient(int fd) {
2017 redisClient *c = zmalloc(sizeof(*c));
2018
2019 anetNonBlock(NULL,fd);
2020 anetTcpNoDelay(NULL,fd);
2021 if (!c) return NULL;
2022 selectDb(c,0);
2023 c->fd = fd;
2024 c->querybuf = sdsempty();
2025 c->argc = 0;
2026 c->argv = NULL;
2027 c->bulklen = -1;
2028 c->multibulk = 0;
2029 c->mbargc = 0;
2030 c->mbargv = NULL;
2031 c->sentlen = 0;
2032 c->flags = 0;
2033 c->lastinteraction = time(NULL);
2034 c->authenticated = 0;
2035 c->replstate = REDIS_REPL_NONE;
2036 c->reply = listCreate();
2037 listSetFreeMethod(c->reply,decrRefCount);
2038 listSetDupMethod(c->reply,dupClientReplyValue);
2039 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2040 readQueryFromClient, c) == AE_ERR) {
2041 freeClient(c);
2042 return NULL;
2043 }
2044 listAddNodeTail(server.clients,c);
2045 initClientMultiState(c);
2046 return c;
2047 }
2048
2049 static void addReply(redisClient *c, robj *obj) {
2050 if (listLength(c->reply) == 0 &&
2051 (c->replstate == REDIS_REPL_NONE ||
2052 c->replstate == REDIS_REPL_ONLINE) &&
2053 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2054 sendReplyToClient, c) == AE_ERR) return;
2055 listAddNodeTail(c->reply,getDecodedObject(obj));
2056 }
2057
2058 static void addReplySds(redisClient *c, sds s) {
2059 robj *o = createObject(REDIS_STRING,s);
2060 addReply(c,o);
2061 decrRefCount(o);
2062 }
2063
2064 static void addReplyDouble(redisClient *c, double d) {
2065 char buf[128];
2066
2067 snprintf(buf,sizeof(buf),"%.17g",d);
2068 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2069 (unsigned long) strlen(buf),buf));
2070 }
2071
2072 static void addReplyBulkLen(redisClient *c, robj *obj) {
2073 size_t len;
2074
2075 if (obj->encoding == REDIS_ENCODING_RAW) {
2076 len = sdslen(obj->ptr);
2077 } else {
2078 long n = (long)obj->ptr;
2079
2080 /* Compute how many bytes will take this integer as a radix 10 string */
2081 len = 1;
2082 if (n < 0) {
2083 len++;
2084 n = -n;
2085 }
2086 while((n = n/10) != 0) {
2087 len++;
2088 }
2089 }
2090 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2091 }
2092
2093 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2094 int cport, cfd;
2095 char cip[128];
2096 redisClient *c;
2097 REDIS_NOTUSED(el);
2098 REDIS_NOTUSED(mask);
2099 REDIS_NOTUSED(privdata);
2100
2101 cfd = anetAccept(server.neterr, fd, cip, &cport);
2102 if (cfd == AE_ERR) {
2103 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2104 return;
2105 }
2106 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2107 if ((c = createClient(cfd)) == NULL) {
2108 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2109 close(cfd); /* May be already closed, just ingore errors */
2110 return;
2111 }
2112 /* If maxclient directive is set and this is one client more... close the
2113 * connection. Note that we create the client instead to check before
2114 * for this condition, since now the socket is already set in nonblocking
2115 * mode and we can send an error for free using the Kernel I/O */
2116 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2117 char *err = "-ERR max number of clients reached\r\n";
2118
2119 /* That's a best effort error message, don't check write errors */
2120 if (write(c->fd,err,strlen(err)) == -1) {
2121 /* Nothing to do, Just to avoid the warning... */
2122 }
2123 freeClient(c);
2124 return;
2125 }
2126 server.stat_numconnections++;
2127 }
2128
2129 /* ======================= Redis objects implementation ===================== */
2130
2131 static robj *createObject(int type, void *ptr) {
2132 robj *o;
2133
2134 if (listLength(server.objfreelist)) {
2135 listNode *head = listFirst(server.objfreelist);
2136 o = listNodeValue(head);
2137 listDelNode(server.objfreelist,head);
2138 } else {
2139 o = zmalloc(sizeof(*o));
2140 }
2141 o->type = type;
2142 o->encoding = REDIS_ENCODING_RAW;
2143 o->ptr = ptr;
2144 o->refcount = 1;
2145 return o;
2146 }
2147
2148 static robj *createStringObject(char *ptr, size_t len) {
2149 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2150 }
2151
2152 static robj *createListObject(void) {
2153 list *l = listCreate();
2154
2155 listSetFreeMethod(l,decrRefCount);
2156 return createObject(REDIS_LIST,l);
2157 }
2158
2159 static robj *createSetObject(void) {
2160 dict *d = dictCreate(&setDictType,NULL);
2161 return createObject(REDIS_SET,d);
2162 }
2163
2164 static robj *createZsetObject(void) {
2165 zset *zs = zmalloc(sizeof(*zs));
2166
2167 zs->dict = dictCreate(&zsetDictType,NULL);
2168 zs->zsl = zslCreate();
2169 return createObject(REDIS_ZSET,zs);
2170 }
2171
2172 static void freeStringObject(robj *o) {
2173 if (o->encoding == REDIS_ENCODING_RAW) {
2174 sdsfree(o->ptr);
2175 }
2176 }
2177
2178 static void freeListObject(robj *o) {
2179 listRelease((list*) o->ptr);
2180 }
2181
2182 static void freeSetObject(robj *o) {
2183 dictRelease((dict*) o->ptr);
2184 }
2185
2186 static void freeZsetObject(robj *o) {
2187 zset *zs = o->ptr;
2188
2189 dictRelease(zs->dict);
2190 zslFree(zs->zsl);
2191 zfree(zs);
2192 }
2193
2194 static void freeHashObject(robj *o) {
2195 dictRelease((dict*) o->ptr);
2196 }
2197
2198 static void incrRefCount(robj *o) {
2199 o->refcount++;
2200 #ifdef DEBUG_REFCOUNT
2201 if (o->type == REDIS_STRING)
2202 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2203 #endif
2204 }
2205
2206 static void decrRefCount(void *obj) {
2207 robj *o = obj;
2208
2209 #ifdef DEBUG_REFCOUNT
2210 if (o->type == REDIS_STRING)
2211 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2212 #endif
2213 if (--(o->refcount) == 0) {
2214 switch(o->type) {
2215 case REDIS_STRING: freeStringObject(o); break;
2216 case REDIS_LIST: freeListObject(o); break;
2217 case REDIS_SET: freeSetObject(o); break;
2218 case REDIS_ZSET: freeZsetObject(o); break;
2219 case REDIS_HASH: freeHashObject(o); break;
2220 default: redisAssert(0 != 0); break;
2221 }
2222 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2223 !listAddNodeHead(server.objfreelist,o))
2224 zfree(o);
2225 }
2226 }
2227
2228 static robj *lookupKey(redisDb *db, robj *key) {
2229 dictEntry *de = dictFind(db->dict,key);
2230 return de ? dictGetEntryVal(de) : NULL;
2231 }
2232
2233 static robj *lookupKeyRead(redisDb *db, robj *key) {
2234 expireIfNeeded(db,key);
2235 return lookupKey(db,key);
2236 }
2237
2238 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2239 deleteIfVolatile(db,key);
2240 return lookupKey(db,key);
2241 }
2242
2243 static int deleteKey(redisDb *db, robj *key) {
2244 int retval;
2245
2246 /* We need to protect key from destruction: after the first dictDelete()
2247 * it may happen that 'key' is no longer valid if we don't increment
2248 * it's count. This may happen when we get the object reference directly
2249 * from the hash table with dictRandomKey() or dict iterators */
2250 incrRefCount(key);
2251 if (dictSize(db->expires)) dictDelete(db->expires,key);
2252 retval = dictDelete(db->dict,key);
2253 decrRefCount(key);
2254
2255 return retval == DICT_OK;
2256 }
2257
2258 /* Try to share an object against the shared objects pool */
2259 static robj *tryObjectSharing(robj *o) {
2260 struct dictEntry *de;
2261 unsigned long c;
2262
2263 if (o == NULL || server.shareobjects == 0) return o;
2264
2265 redisAssert(o->type == REDIS_STRING);
2266 de = dictFind(server.sharingpool,o);
2267 if (de) {
2268 robj *shared = dictGetEntryKey(de);
2269
2270 c = ((unsigned long) dictGetEntryVal(de))+1;
2271 dictGetEntryVal(de) = (void*) c;
2272 incrRefCount(shared);
2273 decrRefCount(o);
2274 return shared;
2275 } else {
2276 /* Here we are using a stream algorihtm: Every time an object is
2277 * shared we increment its count, everytime there is a miss we
2278 * recrement the counter of a random object. If this object reaches
2279 * zero we remove the object and put the current object instead. */
2280 if (dictSize(server.sharingpool) >=
2281 server.sharingpoolsize) {
2282 de = dictGetRandomKey(server.sharingpool);
2283 redisAssert(de != NULL);
2284 c = ((unsigned long) dictGetEntryVal(de))-1;
2285 dictGetEntryVal(de) = (void*) c;
2286 if (c == 0) {
2287 dictDelete(server.sharingpool,de->key);
2288 }
2289 } else {
2290 c = 0; /* If the pool is empty we want to add this object */
2291 }
2292 if (c == 0) {
2293 int retval;
2294
2295 retval = dictAdd(server.sharingpool,o,(void*)1);
2296 redisAssert(retval == DICT_OK);
2297 incrRefCount(o);
2298 }
2299 return o;
2300 }
2301 }
2302
2303 /* Check if the nul-terminated string 's' can be represented by a long
2304 * (that is, is a number that fits into long without any other space or
2305 * character before or after the digits).
2306 *
2307 * If so, the function returns REDIS_OK and *longval is set to the value
2308 * of the number. Otherwise REDIS_ERR is returned */
2309 static int isStringRepresentableAsLong(sds s, long *longval) {
2310 char buf[32], *endptr;
2311 long value;
2312 int slen;
2313
2314 value = strtol(s, &endptr, 10);
2315 if (endptr[0] != '\0') return REDIS_ERR;
2316 slen = snprintf(buf,32,"%ld",value);
2317
2318 /* If the number converted back into a string is not identical
2319 * then it's not possible to encode the string as integer */
2320 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2321 if (longval) *longval = value;
2322 return REDIS_OK;
2323 }
2324
2325 /* Try to encode a string object in order to save space */
2326 static int tryObjectEncoding(robj *o) {
2327 long value;
2328 sds s = o->ptr;
2329
2330 if (o->encoding != REDIS_ENCODING_RAW)
2331 return REDIS_ERR; /* Already encoded */
2332
2333 /* It's not save to encode shared objects: shared objects can be shared
2334 * everywhere in the "object space" of Redis. Encoded objects can only
2335 * appear as "values" (and not, for instance, as keys) */
2336 if (o->refcount > 1) return REDIS_ERR;
2337
2338 /* Currently we try to encode only strings */
2339 redisAssert(o->type == REDIS_STRING);
2340
2341 /* Check if we can represent this string as a long integer */
2342 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2343
2344 /* Ok, this object can be encoded */
2345 o->encoding = REDIS_ENCODING_INT;
2346 sdsfree(o->ptr);
2347 o->ptr = (void*) value;
2348 return REDIS_OK;
2349 }
2350
2351 /* Get a decoded version of an encoded object (returned as a new object).
2352 * If the object is already raw-encoded just increment the ref count. */
2353 static robj *getDecodedObject(robj *o) {
2354 robj *dec;
2355
2356 if (o->encoding == REDIS_ENCODING_RAW) {
2357 incrRefCount(o);
2358 return o;
2359 }
2360 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2361 char buf[32];
2362
2363 snprintf(buf,32,"%ld",(long)o->ptr);
2364 dec = createStringObject(buf,strlen(buf));
2365 return dec;
2366 } else {
2367 redisAssert(1 != 1);
2368 }
2369 }
2370
2371 /* Compare two string objects via strcmp() or alike.
2372 * Note that the objects may be integer-encoded. In such a case we
2373 * use snprintf() to get a string representation of the numbers on the stack
2374 * and compare the strings, it's much faster than calling getDecodedObject().
2375 *
2376 * Important note: if objects are not integer encoded, but binary-safe strings,
2377 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2378 * binary safe. */
2379 static int compareStringObjects(robj *a, robj *b) {
2380 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2381 char bufa[128], bufb[128], *astr, *bstr;
2382 int bothsds = 1;
2383
2384 if (a == b) return 0;
2385 if (a->encoding != REDIS_ENCODING_RAW) {
2386 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2387 astr = bufa;
2388 bothsds = 0;
2389 } else {
2390 astr = a->ptr;
2391 }
2392 if (b->encoding != REDIS_ENCODING_RAW) {
2393 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2394 bstr = bufb;
2395 bothsds = 0;
2396 } else {
2397 bstr = b->ptr;
2398 }
2399 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2400 }
2401
2402 static size_t stringObjectLen(robj *o) {
2403 redisAssert(o->type == REDIS_STRING);
2404 if (o->encoding == REDIS_ENCODING_RAW) {
2405 return sdslen(o->ptr);
2406 } else {
2407 char buf[32];
2408
2409 return snprintf(buf,32,"%ld",(long)o->ptr);
2410 }
2411 }
2412
2413 /*============================ DB saving/loading ============================ */
2414
/* Write the one byte 'type' opcode to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,sizeof(type),1,fp);

    return written == 0 ? -1 : 0;
}
2419
/* Write the time 't' to 'fp' as a 32 bit signed integer, using the in-memory
 * byte layout of an int32_t. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t secs = (int32_t) t;

    return fwrite(&secs,sizeof(secs),1,fp) == 0 ? -1 : 0;
}
2425
2426 /* check rdbLoadLen() comments for more info */
2427 static int rdbSaveLen(FILE *fp, uint32_t len) {
2428 unsigned char buf[2];
2429
2430 if (len < (1<<6)) {
2431 /* Save a 6 bit len */
2432 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2433 if (fwrite(buf,1,1,fp) == 0) return -1;
2434 } else if (len < (1<<14)) {
2435 /* Save a 14 bit len */
2436 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2437 buf[1] = len&0xFF;
2438 if (fwrite(buf,2,1,fp) == 0) return -1;
2439 } else {
2440 /* Save a 32 bit len */
2441 buf[0] = (REDIS_RDB_32BITLEN<<6);
2442 if (fwrite(buf,1,1,fp) == 0) return -1;
2443 len = htonl(len);
2444 if (fwrite(&len,4,1,fp) == 0) return -1;
2445 }
2446 return 0;
2447 }
2448
2449 /* String objects in the form "2391" "-100" without any space and with a
2450 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2451 * encoded as integers to save space */
2452 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2453 long long value;
2454 char *endptr, buf[32];
2455
2456 /* Check if it's possible to encode this value as a number */
2457 value = strtoll(s, &endptr, 10);
2458 if (endptr[0] != '\0') return 0;
2459 snprintf(buf,32,"%lld",value);
2460
2461 /* If the number converted back into a string is not identical
2462 * then it's not possible to encode the string as integer */
2463 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2464
2465 /* Finally check if it fits in our ranges */
2466 if (value >= -(1<<7) && value <= (1<<7)-1) {
2467 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2468 enc[1] = value&0xFF;
2469 return 2;
2470 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2471 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2472 enc[1] = value&0xFF;
2473 enc[2] = (value>>8)&0xFF;
2474 return 3;
2475 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2476 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2477 enc[1] = value&0xFF;
2478 enc[2] = (value>>8)&0xFF;
2479 enc[3] = (value>>16)&0xFF;
2480 enc[4] = (value>>24)&0xFF;
2481 return 5;
2482 } else {
2483 return 0;
2484 }
2485 }
2486
2487 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2488 unsigned int comprlen, outlen;
2489 unsigned char byte;
2490 void *out;
2491
2492 /* We require at least four bytes compression for this to be worth it */
2493 outlen = sdslen(obj->ptr)-4;
2494 if (outlen <= 0) return 0;
2495 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2496 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2497 if (comprlen == 0) {
2498 zfree(out);
2499 return 0;
2500 }
2501 /* Data compressed! Let's save it on disk */
2502 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2503 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2504 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2505 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2506 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2507 zfree(out);
2508 return comprlen;
2509
2510 writeerr:
2511 zfree(out);
2512 return -1;
2513 }
2514
2515 /* Save a string objet as [len][data] on disk. If the object is a string
2516 * representation of an integer value we try to safe it in a special form */
2517 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2518 size_t len;
2519 int enclen;
2520
2521 len = sdslen(obj->ptr);
2522
2523 /* Try integer encoding */
2524 if (len <= 11) {
2525 unsigned char buf[5];
2526 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2527 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2528 return 0;
2529 }
2530 }
2531
2532 /* Try LZF compression - under 20 bytes it's unable to compress even
2533 * aaaaaaaaaaaaaaaaaa so skip it */
2534 if (server.rdbcompression && len > 20) {
2535 int retval;
2536
2537 retval = rdbSaveLzfStringObject(fp,obj);
2538 if (retval == -1) return -1;
2539 if (retval > 0) return 0;
2540 /* retval == 0 means data can't be compressed, save the old way */
2541 }
2542
2543 /* Store verbatim */
2544 if (rdbSaveLen(fp,len) == -1) return -1;
2545 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2546 return 0;
2547 }
2548
2549 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2550 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2551 int retval;
2552
2553 obj = getDecodedObject(obj);
2554 retval = rdbSaveStringObjectRaw(fp,obj);
2555 decrRefCount(obj);
2556 return retval;
2557 }
2558
/* Save a double value. Doubles are saved as strings ("%.17g") prefixed by an
 * unsigned 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions, in which case no string follows:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char out[128];
    char *digits = (char*)out+1; /* the string starts after the length byte */
    int outlen = 1;

    if (isnan(val)) {
        out[0] = 253;
    } else if (!isfinite(val)) {
        out[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf(digits,sizeof(out)-1,"%.17g",val);
        out[0] = strlen(digits);
        outlen += out[0];
    }
    return fwrite(out,outlen,1,fp) == 0 ? -1 : 0;
}
2585
2586 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2587 static int rdbSave(char *filename) {
2588 dictIterator *di = NULL;
2589 dictEntry *de;
2590 FILE *fp;
2591 char tmpfile[256];
2592 int j;
2593 time_t now = time(NULL);
2594
2595 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2596 fp = fopen(tmpfile,"w");
2597 if (!fp) {
2598 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2599 return REDIS_ERR;
2600 }
2601 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2602 for (j = 0; j < server.dbnum; j++) {
2603 redisDb *db = server.db+j;
2604 dict *d = db->dict;
2605 if (dictSize(d) == 0) continue;
2606 di = dictGetIterator(d);
2607 if (!di) {
2608 fclose(fp);
2609 return REDIS_ERR;
2610 }
2611
2612 /* Write the SELECT DB opcode */
2613 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2614 if (rdbSaveLen(fp,j) == -1) goto werr;
2615
2616 /* Iterate this DB writing every entry */
2617 while((de = dictNext(di)) != NULL) {
2618 robj *key = dictGetEntryKey(de);
2619 robj *o = dictGetEntryVal(de);
2620 time_t expiretime = getExpire(db,key);
2621
2622 /* Save the expire time */
2623 if (expiretime != -1) {
2624 /* If this key is already expired skip it */
2625 if (expiretime < now) continue;
2626 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2627 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2628 }
2629 /* Save the key and associated value */
2630 if (rdbSaveType(fp,o->type) == -1) goto werr;
2631 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2632 if (o->type == REDIS_STRING) {
2633 /* Save a string value */
2634 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2635 } else if (o->type == REDIS_LIST) {
2636 /* Save a list value */
2637 list *list = o->ptr;
2638 listNode *ln;
2639
2640 listRewind(list);
2641 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2642 while((ln = listYield(list))) {
2643 robj *eleobj = listNodeValue(ln);
2644
2645 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2646 }
2647 } else if (o->type == REDIS_SET) {
2648 /* Save a set value */
2649 dict *set = o->ptr;
2650 dictIterator *di = dictGetIterator(set);
2651 dictEntry *de;
2652
2653 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2654 while((de = dictNext(di)) != NULL) {
2655 robj *eleobj = dictGetEntryKey(de);
2656
2657 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2658 }
2659 dictReleaseIterator(di);
2660 } else if (o->type == REDIS_ZSET) {
2661 /* Save a set value */
2662 zset *zs = o->ptr;
2663 dictIterator *di = dictGetIterator(zs->dict);
2664 dictEntry *de;
2665
2666 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2667 while((de = dictNext(di)) != NULL) {
2668 robj *eleobj = dictGetEntryKey(de);
2669 double *score = dictGetEntryVal(de);
2670
2671 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2672 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2673 }
2674 dictReleaseIterator(di);
2675 } else {
2676 redisAssert(0 != 0);
2677 }
2678 }
2679 dictReleaseIterator(di);
2680 }
2681 /* EOF opcode */
2682 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2683
2684 /* Make sure data will not remain on the OS's output buffers */
2685 fflush(fp);
2686 fsync(fileno(fp));
2687 fclose(fp);
2688
2689 /* Use RENAME to make sure the DB file is changed atomically only
2690 * if the generate DB file is ok. */
2691 if (rename(tmpfile,filename) == -1) {
2692 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2693 unlink(tmpfile);
2694 return REDIS_ERR;
2695 }
2696 redisLog(REDIS_NOTICE,"DB saved on disk");
2697 server.dirty = 0;
2698 server.lastsave = time(NULL);
2699 return REDIS_OK;
2700
2701 werr:
2702 fclose(fp);
2703 unlink(tmpfile);
2704 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2705 if (di) dictReleaseIterator(di);
2706 return REDIS_ERR;
2707 }
2708
2709 static int rdbSaveBackground(char *filename) {
2710 pid_t childpid;
2711
2712 if (server.bgsavechildpid != -1) return REDIS_ERR;
2713 if ((childpid = fork()) == 0) {
2714 /* Child */
2715 close(server.fd);
2716 if (rdbSave(filename) == REDIS_OK) {
2717 exit(0);
2718 } else {
2719 exit(1);
2720 }
2721 } else {
2722 /* Parent */
2723 if (childpid == -1) {
2724 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2725 strerror(errno));
2726 return REDIS_ERR;
2727 }
2728 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2729 server.bgsavechildpid = childpid;
2730 return REDIS_OK;
2731 }
2732 return REDIS_OK; /* unreached */
2733 }
2734
/* Remove the temporary dump file "temp-<pid>.rdb" associated with the
 * process id 'childpid'. Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];
    int pidnum = (int) childpid;

    snprintf(path,sizeof(path),"temp-%d.rdb",pidnum);
    unlink(path);
}
2741
/* Read a one byte type opcode from 'fp'.
 * Returns the byte value (0-255), or -1 if nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;
    size_t got = fread(&opcode,sizeof(opcode),1,fp);

    return got == 0 ? -1 : (int) opcode;
}
2747
/* Read a time saved as a 32 bit signed integer (in-memory byte layout of an
 * int32_t) from 'fp'. Returns the time, or -1 if it could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t secs;
    size_t got = fread(&secs,sizeof(secs),1,fp);

    if (got == 0) return -1;
    return (time_t) secs;
}
2753
2754 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2755 * of this file for a description of how this are stored on disk.
2756 *
2757 * isencoded is set to 1 if the readed length is not actually a length but
2758 * an "encoding type", check the above comments for more info */
2759 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2760 unsigned char buf[2];
2761 uint32_t len;
2762
2763 if (isencoded) *isencoded = 0;
2764 if (rdbver == 0) {
2765 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2766 return ntohl(len);
2767 } else {
2768 int type;
2769
2770 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2771 type = (buf[0]&0xC0)>>6;
2772 if (type == REDIS_RDB_6BITLEN) {
2773 /* Read a 6 bit len */
2774 return buf[0]&0x3F;
2775 } else if (type == REDIS_RDB_ENCVAL) {
2776 /* Read a 6 bit len encoding type */
2777 if (isencoded) *isencoded = 1;
2778 return buf[0]&0x3F;
2779 } else if (type == REDIS_RDB_14BITLEN) {
2780 /* Read a 14 bit len */
2781 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2782 return ((buf[0]&0x3F)<<8)|buf[1];
2783 } else {
2784 /* Read a 32 bit len */
2785 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2786 return ntohl(len);
2787 }
2788 }
2789 }
2790
2791 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2792 unsigned char enc[4];
2793 long long val;
2794
2795 if (enctype == REDIS_RDB_ENC_INT8) {
2796 if (fread(enc,1,1,fp) == 0) return NULL;
2797 val = (signed char)enc[0];
2798 } else if (enctype == REDIS_RDB_ENC_INT16) {
2799 uint16_t v;
2800 if (fread(enc,2,1,fp) == 0) return NULL;
2801 v = enc[0]|(enc[1]<<8);
2802 val = (int16_t)v;
2803 } else if (enctype == REDIS_RDB_ENC_INT32) {
2804 uint32_t v;
2805 if (fread(enc,4,1,fp) == 0) return NULL;
2806 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2807 val = (int32_t)v;
2808 } else {
2809 val = 0; /* anti-warning */
2810 redisAssert(0!=0);
2811 }
2812 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2813 }
2814
2815 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2816 unsigned int len, clen;
2817 unsigned char *c = NULL;
2818 sds val = NULL;
2819
2820 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2821 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2822 if ((c = zmalloc(clen)) == NULL) goto err;
2823 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2824 if (fread(c,clen,1,fp) == 0) goto err;
2825 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2826 zfree(c);
2827 return createObject(REDIS_STRING,val);
2828 err:
2829 zfree(c);
2830 sdsfree(val);
2831 return NULL;
2832 }
2833
2834 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2835 int isencoded;
2836 uint32_t len;
2837 sds val;
2838
2839 len = rdbLoadLen(fp,rdbver,&isencoded);
2840 if (isencoded) {
2841 switch(len) {
2842 case REDIS_RDB_ENC_INT8:
2843 case REDIS_RDB_ENC_INT16:
2844 case REDIS_RDB_ENC_INT32:
2845 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2846 case REDIS_RDB_ENC_LZF:
2847 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2848 default:
2849 redisAssert(0!=0);
2850 }
2851 }
2852
2853 if (len == REDIS_RDB_LENERR) return NULL;
2854 val = sdsnewlen(NULL,len);
2855 if (len && fread(val,len,1,fp) == 0) {
2856 sdsfree(val);
2857 return NULL;
2858 }
2859 return tryObjectSharing(createObject(REDIS_STRING,val));
2860 }
2861
2862 /* For information about double serialization check rdbSaveDoubleValue() */
2863 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2864 char buf[128];
2865 unsigned char len;
2866
2867 if (fread(&len,1,1,fp) == 0) return -1;
2868 switch(len) {
2869 case 255: *val = R_NegInf; return 0;
2870 case 254: *val = R_PosInf; return 0;
2871 case 253: *val = R_Nan; return 0;
2872 default:
2873 if (fread(buf,len,1,fp) == 0) return -1;
2874 buf[len] = '\0';
2875 sscanf(buf, "%lg", val);
2876 return 0;
2877 }
2878 }
2879
2880 static int rdbLoad(char *filename) {
2881 FILE *fp;
2882 robj *keyobj = NULL;
2883 uint32_t dbid;
2884 int type, retval, rdbver;
2885 dict *d = server.db[0].dict;
2886 redisDb *db = server.db+0;
2887 char buf[1024];
2888 time_t expiretime = -1, now = time(NULL);
2889
2890 fp = fopen(filename,"r");
2891 if (!fp) return REDIS_ERR;
2892 if (fread(buf,9,1,fp) == 0) goto eoferr;
2893 buf[9] = '\0';
2894 if (memcmp(buf,"REDIS",5) != 0) {
2895 fclose(fp);
2896 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2897 return REDIS_ERR;
2898 }
2899 rdbver = atoi(buf+5);
2900 if (rdbver > 1) {
2901 fclose(fp);
2902 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2903 return REDIS_ERR;
2904 }
2905 while(1) {
2906 robj *o;
2907
2908 /* Read type. */
2909 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2910 if (type == REDIS_EXPIRETIME) {
2911 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2912 /* We read the time so we need to read the object type again */
2913 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2914 }
2915 if (type == REDIS_EOF) break;
2916 /* Handle SELECT DB opcode as a special case */
2917 if (type == REDIS_SELECTDB) {
2918 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2919 goto eoferr;
2920 if (dbid >= (unsigned)server.dbnum) {
2921 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2922 exit(1);
2923 }
2924 db = server.db+dbid;
2925 d = db->dict;
2926 continue;
2927 }
2928 /* Read key */
2929 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2930
2931 if (type == REDIS_STRING) {
2932 /* Read string value */
2933 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2934 tryObjectEncoding(o);
2935 } else if (type == REDIS_LIST || type == REDIS_SET) {
2936 /* Read list/set value */
2937 uint32_t listlen;
2938
2939 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2940 goto eoferr;
2941 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2942 /* Load every single element of the list/set */
2943 while(listlen--) {
2944 robj *ele;
2945
2946 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2947 tryObjectEncoding(ele);
2948 if (type == REDIS_LIST) {
2949 listAddNodeTail((list*)o->ptr,ele);
2950 } else {
2951 dictAdd((dict*)o->ptr,ele,NULL);
2952 }
2953 }
2954 } else if (type == REDIS_ZSET) {
2955 /* Read list/set value */
2956 uint32_t zsetlen;
2957 zset *zs;
2958
2959 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2960 goto eoferr;
2961 o = createZsetObject();
2962 zs = o->ptr;
2963 /* Load every single element of the list/set */
2964 while(zsetlen--) {
2965 robj *ele;
2966 double *score = zmalloc(sizeof(double));
2967
2968 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2969 tryObjectEncoding(ele);
2970 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2971 dictAdd(zs->dict,ele,score);
2972 zslInsert(zs->zsl,*score,ele);
2973 incrRefCount(ele); /* added to skiplist */
2974 }
2975 } else {
2976 redisAssert(0 != 0);
2977 }
2978 /* Add the new object in the hash table */
2979 retval = dictAdd(d,keyobj,o);
2980 if (retval == DICT_ERR) {
2981 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2982 exit(1);
2983 }
2984 /* Set the expire time if needed */
2985 if (expiretime != -1) {
2986 setExpire(db,keyobj,expiretime);
2987 /* Delete this key if already expired */
2988 if (expiretime < now) deleteKey(db,keyobj);
2989 expiretime = -1;
2990 }
2991 keyobj = o = NULL;
2992 }
2993 fclose(fp);
2994 return REDIS_OK;
2995
2996 eoferr: /* unexpected end of file is handled here with a fatal exit */
2997 if (keyobj) decrRefCount(keyobj);
2998 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2999 exit(1);
3000 return REDIS_ERR; /* Just to avoid warning */
3001 }
3002
3003 /*================================== Commands =============================== */
3004
3005 static void authCommand(redisClient *c) {
3006 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3007 c->authenticated = 1;
3008 addReply(c,shared.ok);
3009 } else {
3010 c->authenticated = 0;
3011 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3012 }
3013 }
3014
3015 static void pingCommand(redisClient *c) {
3016 addReply(c,shared.pong);
3017 }
3018
3019 static void echoCommand(redisClient *c) {
3020 addReplyBulkLen(c,c->argv[1]);
3021 addReply(c,c->argv[1]);
3022 addReply(c,shared.crlf);
3023 }
3024
3025 /*=================================== Strings =============================== */
3026
3027 static void setGenericCommand(redisClient *c, int nx) {
3028 int retval;
3029
3030 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3031 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3032 if (retval == DICT_ERR) {
3033 if (!nx) {
3034 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3035 incrRefCount(c->argv[2]);
3036 } else {
3037 addReply(c,shared.czero);
3038 return;
3039 }
3040 } else {
3041 incrRefCount(c->argv[1]);
3042 incrRefCount(c->argv[2]);
3043 }
3044 server.dirty++;
3045 removeExpire(c->db,c->argv[1]);
3046 addReply(c, nx ? shared.cone : shared.ok);
3047 }
3048
3049 static void setCommand(redisClient *c) {
3050 setGenericCommand(c,0);
3051 }
3052
3053 static void setnxCommand(redisClient *c) {
3054 setGenericCommand(c,1);
3055 }
3056
3057 static int getGenericCommand(redisClient *c) {
3058 robj *o = lookupKeyRead(c->db,c->argv[1]);
3059
3060 if (o == NULL) {
3061 addReply(c,shared.nullbulk);
3062 return REDIS_OK;
3063 } else {
3064 if (o->type != REDIS_STRING) {
3065 addReply(c,shared.wrongtypeerr);
3066 return REDIS_ERR;
3067 } else {
3068 addReplyBulkLen(c,o);
3069 addReply(c,o);
3070 addReply(c,shared.crlf);
3071 return REDIS_OK;
3072 }
3073 }
3074 }
3075
3076 static void getCommand(redisClient *c) {
3077 getGenericCommand(c);
3078 }
3079
3080 static void getsetCommand(redisClient *c) {
3081 if (getGenericCommand(c) == REDIS_ERR) return;
3082 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3083 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3084 } else {
3085 incrRefCount(c->argv[1]);
3086 }
3087 incrRefCount(c->argv[2]);
3088 server.dirty++;
3089 removeExpire(c->db,c->argv[1]);
3090 }
3091
3092 static void mgetCommand(redisClient *c) {
3093 int j;
3094
3095 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3096 for (j = 1; j < c->argc; j++) {
3097 robj *o = lookupKeyRead(c->db,c->argv[j]);
3098 if (o == NULL) {
3099 addReply(c,shared.nullbulk);
3100 } else {
3101 if (o->type != REDIS_STRING) {
3102 addReply(c,shared.nullbulk);
3103 } else {
3104 addReplyBulkLen(c,o);
3105 addReply(c,o);
3106 addReply(c,shared.crlf);
3107 }
3108 }
3109 }
3110 }
3111
3112 static void msetGenericCommand(redisClient *c, int nx) {
3113 int j, busykeys = 0;
3114
3115 if ((c->argc % 2) == 0) {
3116 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3117 return;
3118 }
3119 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3120 * set nothing at all if at least one already key exists. */
3121 if (nx) {
3122 for (j = 1; j < c->argc; j += 2) {
3123 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3124 busykeys++;
3125 }
3126 }
3127 }
3128 if (busykeys) {
3129 addReply(c, shared.czero);
3130 return;
3131 }
3132
3133 for (j = 1; j < c->argc; j += 2) {
3134 int retval;
3135
3136 tryObjectEncoding(c->argv[j+1]);
3137 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3138 if (retval == DICT_ERR) {
3139 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3140 incrRefCount(c->argv[j+1]);
3141 } else {
3142 incrRefCount(c->argv[j]);
3143 incrRefCount(c->argv[j+1]);
3144 }
3145 removeExpire(c->db,c->argv[j]);
3146 }
3147 server.dirty += (c->argc-1)/2;
3148 addReply(c, nx ? shared.cone : shared.ok);
3149 }
3150
3151 static void msetCommand(redisClient *c) {
3152 msetGenericCommand(c,0);
3153 }
3154
3155 static void msetnxCommand(redisClient *c) {
3156 msetGenericCommand(c,1);
3157 }
3158
3159 static void incrDecrCommand(redisClient *c, long long incr) {
3160 long long value;
3161 int retval;
3162 robj *o;
3163
3164 o = lookupKeyWrite(c->db,c->argv[1]);
3165 if (o == NULL) {
3166 value = 0;
3167 } else {
3168 if (o->type != REDIS_STRING) {
3169 value = 0;
3170 } else {
3171 char *eptr;
3172
3173 if (o->encoding == REDIS_ENCODING_RAW)
3174 value = strtoll(o->ptr, &eptr, 10);
3175 else if (o->encoding == REDIS_ENCODING_INT)
3176 value = (long)o->ptr;
3177 else
3178 redisAssert(1 != 1);
3179 }
3180 }
3181
3182 value += incr;
3183 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3184 tryObjectEncoding(o);
3185 retval = dictAdd(c->db->dict,c->argv[1],o);
3186 if (retval == DICT_ERR) {
3187 dictReplace(c->db->dict,c->argv[1],o);
3188 removeExpire(c->db,c->argv[1]);
3189 } else {
3190 incrRefCount(c->argv[1]);
3191 }
3192 server.dirty++;
3193 addReply(c,shared.colon);
3194 addReply(c,o);
3195 addReply(c,shared.crlf);
3196 }
3197
3198 static void incrCommand(redisClient *c) {
3199 incrDecrCommand(c,1);
3200 }
3201
3202 static void decrCommand(redisClient *c) {
3203 incrDecrCommand(c,-1);
3204 }
3205
3206 static void incrbyCommand(redisClient *c) {
3207 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3208 incrDecrCommand(c,incr);
3209 }
3210
3211 static void decrbyCommand(redisClient *c) {
3212 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3213 incrDecrCommand(c,-incr);
3214 }
3215
3216 /* ========================= Type agnostic commands ========================= */
3217
3218 static void delCommand(redisClient *c) {
3219 int deleted = 0, j;
3220
3221 for (j = 1; j < c->argc; j++) {
3222 if (deleteKey(c->db,c->argv[j])) {
3223 server.dirty++;
3224 deleted++;
3225 }
3226 }
3227 switch(deleted) {
3228 case 0:
3229 addReply(c,shared.czero);
3230 break;
3231 case 1:
3232 addReply(c,shared.cone);
3233 break;
3234 default:
3235 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3236 break;
3237 }
3238 }
3239
3240 static void existsCommand(redisClient *c) {
3241 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3242 }
3243
3244 static void selectCommand(redisClient *c) {
3245 int id = atoi(c->argv[1]->ptr);
3246
3247 if (selectDb(c,id) == REDIS_ERR) {
3248 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3249 } else {
3250 addReply(c,shared.ok);
3251 }
3252 }
3253
3254 static void randomkeyCommand(redisClient *c) {
3255 dictEntry *de;
3256
3257 while(1) {
3258 de = dictGetRandomKey(c->db->dict);
3259 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3260 }
3261 if (de == NULL) {
3262 addReply(c,shared.plus);
3263 addReply(c,shared.crlf);
3264 } else {
3265 addReply(c,shared.plus);
3266 addReply(c,dictGetEntryKey(de));
3267 addReply(c,shared.crlf);
3268 }
3269 }
3270
3271 static void keysCommand(redisClient *c) {
3272 dictIterator *di;
3273 dictEntry *de;
3274 sds pattern = c->argv[1]->ptr;
3275 int plen = sdslen(pattern);
3276 unsigned long numkeys = 0, keyslen = 0;
3277 robj *lenobj = createObject(REDIS_STRING,NULL);
3278
3279 di = dictGetIterator(c->db->dict);
3280 addReply(c,lenobj);
3281 decrRefCount(lenobj);
3282 while((de = dictNext(di)) != NULL) {
3283 robj *keyobj = dictGetEntryKey(de);
3284
3285 sds key = keyobj->ptr;
3286 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3287 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3288 if (expireIfNeeded(c->db,keyobj) == 0) {
3289 if (numkeys != 0)
3290 addReply(c,shared.space);
3291 addReply(c,keyobj);
3292 numkeys++;
3293 keyslen += sdslen(key);
3294 }
3295 }
3296 }
3297 dictReleaseIterator(di);
3298 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3299 addReply(c,shared.crlf);
3300 }
3301
3302 static void dbsizeCommand(redisClient *c) {
3303 addReplySds(c,
3304 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3305 }
3306
3307 static void lastsaveCommand(redisClient *c) {
3308 addReplySds(c,
3309 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3310 }
3311
3312 static void typeCommand(redisClient *c) {
3313 robj *o;
3314 char *type;
3315
3316 o = lookupKeyRead(c->db,c->argv[1]);
3317 if (o == NULL) {
3318 type = "+none";
3319 } else {
3320 switch(o->type) {
3321 case REDIS_STRING: type = "+string"; break;
3322 case REDIS_LIST: type = "+list"; break;
3323 case REDIS_SET: type = "+set"; break;
3324 case REDIS_ZSET: type = "+zset"; break;
3325 default: type = "unknown"; break;
3326 }
3327 }
3328 addReplySds(c,sdsnew(type));
3329 addReply(c,shared.crlf);
3330 }
3331
3332 static void saveCommand(redisClient *c) {
3333 if (server.bgsavechildpid != -1) {
3334 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3335 return;
3336 }
3337 if (rdbSave(server.dbfilename) == REDIS_OK) {
3338 addReply(c,shared.ok);
3339 } else {
3340 addReply(c,shared.err);
3341 }
3342 }
3343
3344 static void bgsaveCommand(redisClient *c) {
3345 if (server.bgsavechildpid != -1) {
3346 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3347 return;
3348 }
3349 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3350 char *status = "+Background saving started\r\n";
3351 addReplySds(c,sdsnew(status));
3352 } else {
3353 addReply(c,shared.err);
3354 }
3355 }
3356
3357 static void shutdownCommand(redisClient *c) {
3358 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3359 /* Kill the saving child if there is a background saving in progress.
3360 We want to avoid race conditions, for instance our saving child may
3361 overwrite the synchronous saving did by SHUTDOWN. */
3362 if (server.bgsavechildpid != -1) {
3363 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3364 kill(server.bgsavechildpid,SIGKILL);
3365 rdbRemoveTempFile(server.bgsavechildpid);
3366 }
3367 if (server.appendonly) {
3368 /* Append only file: fsync() the AOF and exit */
3369 fsync(server.appendfd);
3370 exit(0);
3371 } else {
3372 /* Snapshotting. Perform a SYNC SAVE and exit */
3373 if (rdbSave(server.dbfilename) == REDIS_OK) {
3374 if (server.daemonize)
3375 unlink(server.pidfile);
3376 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3377 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3378 exit(0);
3379 } else {
3380 /* Ooops.. error saving! The best we can do is to continue operating.
3381 * Note that if there was a background saving process, in the next
3382 * cron() Redis will be notified that the background saving aborted,
3383 * handling special stuff like slaves pending for synchronization... */
3384 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3385 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3386 }
3387 }
3388 }
3389
3390 static void renameGenericCommand(redisClient *c, int nx) {
3391 robj *o;
3392
3393 /* To use the same key as src and dst is probably an error */
3394 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3395 addReply(c,shared.sameobjecterr);
3396 return;
3397 }
3398
3399 o = lookupKeyWrite(c->db,c->argv[1]);
3400 if (o == NULL) {
3401 addReply(c,shared.nokeyerr);
3402 return;
3403 }
3404 incrRefCount(o);
3405 deleteIfVolatile(c->db,c->argv[2]);
3406 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3407 if (nx) {
3408 decrRefCount(o);
3409 addReply(c,shared.czero);
3410 return;
3411 }
3412 dictReplace(c->db->dict,c->argv[2],o);
3413 } else {
3414 incrRefCount(c->argv[2]);
3415 }
3416 deleteKey(c->db,c->argv[1]);
3417 server.dirty++;
3418 addReply(c,nx ? shared.cone : shared.ok);
3419 }
3420
3421 static void renameCommand(redisClient *c) {
3422 renameGenericCommand(c,0);
3423 }
3424
3425 static void renamenxCommand(redisClient *c) {
3426 renameGenericCommand(c,1);
3427 }
3428
3429 static void moveCommand(redisClient *c) {
3430 robj *o;
3431 redisDb *src, *dst;
3432 int srcid;
3433
3434 /* Obtain source and target DB pointers */
3435 src = c->db;
3436 srcid = c->db->id;
3437 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3438 addReply(c,shared.outofrangeerr);
3439 return;
3440 }
3441 dst = c->db;
3442 selectDb(c,srcid); /* Back to the source DB */
3443
3444 /* If the user is moving using as target the same
3445 * DB as the source DB it is probably an error. */
3446 if (src == dst) {
3447 addReply(c,shared.sameobjecterr);
3448 return;
3449 }
3450
3451 /* Check if the element exists and get a reference */
3452 o = lookupKeyWrite(c->db,c->argv[1]);
3453 if (!o) {
3454 addReply(c,shared.czero);
3455 return;
3456 }
3457
3458 /* Try to add the element to the target DB */
3459 deleteIfVolatile(dst,c->argv[1]);
3460 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3461 addReply(c,shared.czero);
3462 return;
3463 }
3464 incrRefCount(c->argv[1]);
3465 incrRefCount(o);
3466
3467 /* OK! key moved, free the entry in the source DB */
3468 deleteKey(src,c->argv[1]);
3469 server.dirty++;
3470 addReply(c,shared.cone);
3471 }
3472
3473 /* =================================== Lists ================================ */
3474 static void pushGenericCommand(redisClient *c, int where) {
3475 robj *lobj;
3476 list *list;
3477
3478 lobj = lookupKeyWrite(c->db,c->argv[1]);
3479 if (lobj == NULL) {
3480 lobj = createListObject();
3481 list = lobj->ptr;
3482 if (where == REDIS_HEAD) {
3483 listAddNodeHead(list,c->argv[2]);
3484 } else {
3485 listAddNodeTail(list,c->argv[2]);
3486 }
3487 dictAdd(c->db->dict,c->argv[1],lobj);
3488 incrRefCount(c->argv[1]);
3489 incrRefCount(c->argv[2]);
3490 } else {
3491 if (lobj->type != REDIS_LIST) {
3492 addReply(c,shared.wrongtypeerr);
3493 return;
3494 }
3495 list = lobj->ptr;
3496 if (where == REDIS_HEAD) {
3497 listAddNodeHead(list,c->argv[2]);
3498 } else {
3499 listAddNodeTail(list,c->argv[2]);
3500 }
3501 incrRefCount(c->argv[2]);
3502 }
3503 server.dirty++;
3504 addReply(c,shared.ok);
3505 }
3506
3507 static void lpushCommand(redisClient *c) {
3508 pushGenericCommand(c,REDIS_HEAD);
3509 }
3510
3511 static void rpushCommand(redisClient *c) {
3512 pushGenericCommand(c,REDIS_TAIL);
3513 }
3514
3515 static void llenCommand(redisClient *c) {
3516 robj *o;
3517 list *l;
3518
3519 o = lookupKeyRead(c->db,c->argv[1]);
3520 if (o == NULL) {
3521 addReply(c,shared.czero);
3522 return;
3523 } else {
3524 if (o->type != REDIS_LIST) {
3525 addReply(c,shared.wrongtypeerr);
3526 } else {
3527 l = o->ptr;
3528 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3529 }
3530 }
3531 }
3532
3533 static void lindexCommand(redisClient *c) {
3534 robj *o;
3535 int index = atoi(c->argv[2]->ptr);
3536
3537 o = lookupKeyRead(c->db,c->argv[1]);
3538 if (o == NULL) {
3539 addReply(c,shared.nullbulk);
3540 } else {
3541 if (o->type != REDIS_LIST) {
3542 addReply(c,shared.wrongtypeerr);
3543 } else {
3544 list *list = o->ptr;
3545 listNode *ln;
3546
3547 ln = listIndex(list, index);
3548 if (ln == NULL) {
3549 addReply(c,shared.nullbulk);
3550 } else {
3551 robj *ele = listNodeValue(ln);
3552 addReplyBulkLen(c,ele);
3553 addReply(c,ele);
3554 addReply(c,shared.crlf);
3555 }
3556 }
3557 }
3558 }
3559
3560 static void lsetCommand(redisClient *c) {
3561 robj *o;
3562 int index = atoi(c->argv[2]->ptr);
3563
3564 o = lookupKeyWrite(c->db,c->argv[1]);
3565 if (o == NULL) {
3566 addReply(c,shared.nokeyerr);
3567 } else {
3568 if (o->type != REDIS_LIST) {
3569 addReply(c,shared.wrongtypeerr);
3570 } else {
3571 list *list = o->ptr;
3572 listNode *ln;
3573
3574 ln = listIndex(list, index);
3575 if (ln == NULL) {
3576 addReply(c,shared.outofrangeerr);
3577 } else {
3578 robj *ele = listNodeValue(ln);
3579
3580 decrRefCount(ele);
3581 listNodeValue(ln) = c->argv[3];
3582 incrRefCount(c->argv[3]);
3583 addReply(c,shared.ok);
3584 server.dirty++;
3585 }
3586 }
3587 }
3588 }
3589
3590 static void popGenericCommand(redisClient *c, int where) {
3591 robj *o;
3592
3593 o = lookupKeyWrite(c->db,c->argv[1]);
3594 if (o == NULL) {
3595 addReply(c,shared.nullbulk);
3596 } else {
3597 if (o->type != REDIS_LIST) {
3598 addReply(c,shared.wrongtypeerr);
3599 } else {
3600 list *list = o->ptr;
3601 listNode *ln;
3602
3603 if (where == REDIS_HEAD)
3604 ln = listFirst(list);
3605 else
3606 ln = listLast(list);
3607
3608 if (ln == NULL) {
3609 addReply(c,shared.nullbulk);
3610 } else {
3611 robj *ele = listNodeValue(ln);
3612 addReplyBulkLen(c,ele);
3613 addReply(c,ele);
3614 addReply(c,shared.crlf);
3615 listDelNode(list,ln);
3616 server.dirty++;
3617 }
3618 }
3619 }
3620 }
3621
3622 static void lpopCommand(redisClient *c) {
3623 popGenericCommand(c,REDIS_HEAD);
3624 }
3625
3626 static void rpopCommand(redisClient *c) {
3627 popGenericCommand(c,REDIS_TAIL);
3628 }
3629
3630 static void lrangeCommand(redisClient *c) {
3631 robj *o;
3632 int start = atoi(c->argv[2]->ptr);
3633 int end = atoi(c->argv[3]->ptr);
3634
3635 o = lookupKeyRead(c->db,c->argv[1]);
3636 if (o == NULL) {
3637 addReply(c,shared.nullmultibulk);
3638 } else {
3639 if (o->type != REDIS_LIST) {
3640 addReply(c,shared.wrongtypeerr);
3641 } else {
3642 list *list = o->ptr;
3643 listNode *ln;
3644 int llen = listLength(list);
3645 int rangelen, j;
3646 robj *ele;
3647
3648 /* convert negative indexes */
3649 if (start < 0) start = llen+start;
3650 if (end < 0) end = llen+end;
3651 if (start < 0) start = 0;
3652 if (end < 0) end = 0;
3653
3654 /* indexes sanity checks */
3655 if (start > end || start >= llen) {
3656 /* Out of range start or start > end result in empty list */
3657 addReply(c,shared.emptymultibulk);
3658 return;
3659 }
3660 if (end >= llen) end = llen-1;
3661 rangelen = (end-start)+1;
3662
3663 /* Return the result in form of a multi-bulk reply */
3664 ln = listIndex(list, start);
3665 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3666 for (j = 0; j < rangelen; j++) {
3667 ele = listNodeValue(ln);
3668 addReplyBulkLen(c,ele);
3669 addReply(c,ele);
3670 addReply(c,shared.crlf);
3671 ln = ln->next;
3672 }
3673 }
3674 }
3675 }
3676
3677 static void ltrimCommand(redisClient *c) {
3678 robj *o;
3679 int start = atoi(c->argv[2]->ptr);
3680 int end = atoi(c->argv[3]->ptr);
3681
3682 o = lookupKeyWrite(c->db,c->argv[1]);
3683 if (o == NULL) {
3684 addReply(c,shared.ok);
3685 } else {
3686 if (o->type != REDIS_LIST) {
3687 addReply(c,shared.wrongtypeerr);
3688 } else {
3689 list *list = o->ptr;
3690 listNode *ln;
3691 int llen = listLength(list);
3692 int j, ltrim, rtrim;
3693
3694 /* convert negative indexes */
3695 if (start < 0) start = llen+start;
3696 if (end < 0) end = llen+end;
3697 if (start < 0) start = 0;
3698 if (end < 0) end = 0;
3699
3700 /* indexes sanity checks */
3701 if (start > end || start >= llen) {
3702 /* Out of range start or start > end result in empty list */
3703 ltrim = llen;
3704 rtrim = 0;
3705 } else {
3706 if (end >= llen) end = llen-1;
3707 ltrim = start;
3708 rtrim = llen-end-1;
3709 }
3710
3711 /* Remove list elements to perform the trim */
3712 for (j = 0; j < ltrim; j++) {
3713 ln = listFirst(list);
3714 listDelNode(list,ln);
3715 }
3716 for (j = 0; j < rtrim; j++) {
3717 ln = listLast(list);
3718 listDelNode(list,ln);
3719 }
3720 server.dirty++;
3721 addReply(c,shared.ok);
3722 }
3723 }
3724 }
3725
3726 static void lremCommand(redisClient *c) {
3727 robj *o;
3728
3729 o = lookupKeyWrite(c->db,c->argv[1]);
3730 if (o == NULL) {
3731 addReply(c,shared.czero);
3732 } else {
3733 if (o->type != REDIS_LIST) {
3734 addReply(c,shared.wrongtypeerr);
3735 } else {
3736 list *list = o->ptr;
3737 listNode *ln, *next;
3738 int toremove = atoi(c->argv[2]->ptr);
3739 int removed = 0;
3740 int fromtail = 0;
3741
3742 if (toremove < 0) {
3743 toremove = -toremove;
3744 fromtail = 1;
3745 }
3746 ln = fromtail ? list->tail : list->head;
3747 while (ln) {
3748 robj *ele = listNodeValue(ln);
3749
3750 next = fromtail ? ln->prev : ln->next;
3751 if (compareStringObjects(ele,c->argv[3]) == 0) {
3752 listDelNode(list,ln);
3753 server.dirty++;
3754 removed++;
3755 if (toremove && removed == toremove) break;
3756 }
3757 ln = next;
3758 }
3759 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3760 }
3761 }
3762 }
3763
3764 /* This is the semantic of this command:
3765 * RPOPLPUSH srclist dstlist:
3766 * IF LLEN(srclist) > 0
3767 * element = RPOP srclist
3768 * LPUSH dstlist element
3769 * RETURN element
3770 * ELSE
3771 * RETURN nil
3772 * END
3773 * END
3774 *
3775 * The idea is to be able to get an element from a list in a reliable way
3776 * since the element is not just returned but pushed against another list
3777 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3778 */
3779 static void rpoplpushcommand(redisClient *c) {
3780 robj *sobj;
3781
3782 sobj = lookupKeyWrite(c->db,c->argv[1]);
3783 if (sobj == NULL) {
3784 addReply(c,shared.nullbulk);
3785 } else {
3786 if (sobj->type != REDIS_LIST) {
3787 addReply(c,shared.wrongtypeerr);
3788 } else {
3789 list *srclist = sobj->ptr;
3790 listNode *ln = listLast(srclist);
3791
3792 if (ln == NULL) {
3793 addReply(c,shared.nullbulk);
3794 } else {
3795 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3796 robj *ele = listNodeValue(ln);
3797 list *dstlist;
3798
3799 if (dobj == NULL) {
3800
3801 /* Create the list if the key does not exist */
3802 dobj = createListObject();
3803 dictAdd(c->db->dict,c->argv[2],dobj);
3804 incrRefCount(c->argv[2]);
3805 } else if (dobj->type != REDIS_LIST) {
3806 addReply(c,shared.wrongtypeerr);
3807 return;
3808 }
3809 /* Add the element to the target list */
3810 dstlist = dobj->ptr;
3811 listAddNodeHead(dstlist,ele);
3812 incrRefCount(ele);
3813
3814 /* Send the element to the client as reply as well */
3815 addReplyBulkLen(c,ele);
3816 addReply(c,ele);
3817 addReply(c,shared.crlf);
3818
3819 /* Finally remove the element from the source list */
3820 listDelNode(srclist,ln);
3821 server.dirty++;
3822 }
3823 }
3824 }
3825 }
3826
3827
3828 /* ==================================== Sets ================================ */
3829
3830 static void saddCommand(redisClient *c) {
3831 robj *set;
3832
3833 set = lookupKeyWrite(c->db,c->argv[1]);
3834 if (set == NULL) {
3835 set = createSetObject();
3836 dictAdd(c->db->dict,c->argv[1],set);
3837 incrRefCount(c->argv[1]);
3838 } else {
3839 if (set->type != REDIS_SET) {
3840 addReply(c,shared.wrongtypeerr);
3841 return;
3842 }
3843 }
3844 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3845 incrRefCount(c->argv[2]);
3846 server.dirty++;
3847 addReply(c,shared.cone);
3848 } else {
3849 addReply(c,shared.czero);
3850 }
3851 }
3852
3853 static void sremCommand(redisClient *c) {
3854 robj *set;
3855
3856 set = lookupKeyWrite(c->db,c->argv[1]);
3857 if (set == NULL) {
3858 addReply(c,shared.czero);
3859 } else {
3860 if (set->type != REDIS_SET) {
3861 addReply(c,shared.wrongtypeerr);
3862 return;
3863 }
3864 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3865 server.dirty++;
3866 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3867 addReply(c,shared.cone);
3868 } else {
3869 addReply(c,shared.czero);
3870 }
3871 }
3872 }
3873
3874 static void smoveCommand(redisClient *c) {
3875 robj *srcset, *dstset;
3876
3877 srcset = lookupKeyWrite(c->db,c->argv[1]);
3878 dstset = lookupKeyWrite(c->db,c->argv[2]);
3879
3880 /* If the source key does not exist return 0, if it's of the wrong type
3881 * raise an error */
3882 if (srcset == NULL || srcset->type != REDIS_SET) {
3883 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3884 return;
3885 }
3886 /* Error if the destination key is not a set as well */
3887 if (dstset && dstset->type != REDIS_SET) {
3888 addReply(c,shared.wrongtypeerr);
3889 return;
3890 }
3891 /* Remove the element from the source set */
3892 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3893 /* Key not found in the src set! return zero */
3894 addReply(c,shared.czero);
3895 return;
3896 }
3897 server.dirty++;
3898 /* Add the element to the destination set */
3899 if (!dstset) {
3900 dstset = createSetObject();
3901 dictAdd(c->db->dict,c->argv[2],dstset);
3902 incrRefCount(c->argv[2]);
3903 }
3904 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3905 incrRefCount(c->argv[3]);
3906 addReply(c,shared.cone);
3907 }
3908
3909 static void sismemberCommand(redisClient *c) {
3910 robj *set;
3911
3912 set = lookupKeyRead(c->db,c->argv[1]);
3913 if (set == NULL) {
3914 addReply(c,shared.czero);
3915 } else {
3916 if (set->type != REDIS_SET) {
3917 addReply(c,shared.wrongtypeerr);
3918 return;
3919 }
3920 if (dictFind(set->ptr,c->argv[2]))
3921 addReply(c,shared.cone);
3922 else
3923 addReply(c,shared.czero);
3924 }
3925 }
3926
3927 static void scardCommand(redisClient *c) {
3928 robj *o;
3929 dict *s;
3930
3931 o = lookupKeyRead(c->db,c->argv[1]);
3932 if (o == NULL) {
3933 addReply(c,shared.czero);
3934 return;
3935 } else {
3936 if (o->type != REDIS_SET) {
3937 addReply(c,shared.wrongtypeerr);
3938 } else {
3939 s = o->ptr;
3940 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3941 dictSize(s)));
3942 }
3943 }
3944 }
3945
3946 static void spopCommand(redisClient *c) {
3947 robj *set;
3948 dictEntry *de;
3949
3950 set = lookupKeyWrite(c->db,c->argv[1]);
3951 if (set == NULL) {
3952 addReply(c,shared.nullbulk);
3953 } else {
3954 if (set->type != REDIS_SET) {
3955 addReply(c,shared.wrongtypeerr);
3956 return;
3957 }
3958 de = dictGetRandomKey(set->ptr);
3959 if (de == NULL) {
3960 addReply(c,shared.nullbulk);
3961 } else {
3962 robj *ele = dictGetEntryKey(de);
3963
3964 addReplyBulkLen(c,ele);
3965 addReply(c,ele);
3966 addReply(c,shared.crlf);
3967 dictDelete(set->ptr,ele);
3968 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3969 server.dirty++;
3970 }
3971 }
3972 }
3973
3974 static void srandmemberCommand(redisClient *c) {
3975 robj *set;
3976 dictEntry *de;
3977
3978 set = lookupKeyRead(c->db,c->argv[1]);
3979 if (set == NULL) {
3980 addReply(c,shared.nullbulk);
3981 } else {
3982 if (set->type != REDIS_SET) {
3983 addReply(c,shared.wrongtypeerr);
3984 return;
3985 }
3986 de = dictGetRandomKey(set->ptr);
3987 if (de == NULL) {
3988 addReply(c,shared.nullbulk);
3989 } else {
3990 robj *ele = dictGetEntryKey(de);
3991
3992 addReplyBulkLen(c,ele);
3993 addReply(c,ele);
3994 addReply(c,shared.crlf);
3995 }
3996 }
3997 }
3998
3999 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4000 dict **d1 = (void*) s1, **d2 = (void*) s2;
4001
4002 return dictSize(*d1)-dictSize(*d2);
4003 }
4004
4005 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
4006 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4007 dictIterator *di;
4008 dictEntry *de;
4009 robj *lenobj = NULL, *dstset = NULL;
4010 unsigned long j, cardinality = 0;
4011
4012 for (j = 0; j < setsnum; j++) {
4013 robj *setobj;
4014
4015 setobj = dstkey ?
4016 lookupKeyWrite(c->db,setskeys[j]) :
4017 lookupKeyRead(c->db,setskeys[j]);
4018 if (!setobj) {
4019 zfree(dv);
4020 if (dstkey) {
4021 if (deleteKey(c->db,dstkey))
4022 server.dirty++;
4023 addReply(c,shared.czero);
4024 } else {
4025 addReply(c,shared.nullmultibulk);
4026 }
4027 return;
4028 }
4029 if (setobj->type != REDIS_SET) {
4030 zfree(dv);
4031 addReply(c,shared.wrongtypeerr);
4032 return;
4033 }
4034 dv[j] = setobj->ptr;
4035 }
4036 /* Sort sets from the smallest to largest, this will improve our
4037 * algorithm's performace */
4038 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4039
4040 /* The first thing we should output is the total number of elements...
4041 * since this is a multi-bulk write, but at this stage we don't know
4042 * the intersection set size, so we use a trick, append an empty object
4043 * to the output list and save the pointer to later modify it with the
4044 * right length */
4045 if (!dstkey) {
4046 lenobj = createObject(REDIS_STRING,NULL);
4047 addReply(c,lenobj);
4048 decrRefCount(lenobj);
4049 } else {
4050 /* If we have a target key where to store the resulting set
4051 * create this key with an empty set inside */
4052 dstset = createSetObject();
4053 }
4054
4055 /* Iterate all the elements of the first (smallest) set, and test
4056 * the element against all the other sets, if at least one set does
4057 * not include the element it is discarded */
4058 di = dictGetIterator(dv[0]);
4059
4060 while((de = dictNext(di)) != NULL) {
4061 robj *ele;
4062
4063 for (j = 1; j < setsnum; j++)
4064 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4065 if (j != setsnum)
4066 continue; /* at least one set does not contain the member */
4067 ele = dictGetEntryKey(de);
4068 if (!dstkey) {
4069 addReplyBulkLen(c,ele);
4070 addReply(c,ele);
4071 addReply(c,shared.crlf);
4072 cardinality++;
4073 } else {
4074 dictAdd(dstset->ptr,ele,NULL);
4075 incrRefCount(ele);
4076 }
4077 }
4078 dictReleaseIterator(di);
4079
4080 if (dstkey) {
4081 /* Store the resulting set into the target */
4082 deleteKey(c->db,dstkey);
4083 dictAdd(c->db->dict,dstkey,dstset);
4084 incrRefCount(dstkey);
4085 }
4086
4087 if (!dstkey) {
4088 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4089 } else {
4090 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4091 dictSize((dict*)dstset->ptr)));
4092 server.dirty++;
4093 }
4094 zfree(dv);
4095 }
4096
4097 static void sinterCommand(redisClient *c) {
4098 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4099 }
4100
4101 static void sinterstoreCommand(redisClient *c) {
4102 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4103 }
4104
4105 #define REDIS_OP_UNION 0
4106 #define REDIS_OP_DIFF 1
4107
4108 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4109 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4110 dictIterator *di;
4111 dictEntry *de;
4112 robj *dstset = NULL;
4113 int j, cardinality = 0;
4114
4115 for (j = 0; j < setsnum; j++) {
4116 robj *setobj;
4117
4118 setobj = dstkey ?
4119 lookupKeyWrite(c->db,setskeys[j]) :
4120 lookupKeyRead(c->db,setskeys[j]);
4121 if (!setobj) {
4122 dv[j] = NULL;
4123 continue;
4124 }
4125 if (setobj->type != REDIS_SET) {
4126 zfree(dv);
4127 addReply(c,shared.wrongtypeerr);
4128 return;
4129 }
4130 dv[j] = setobj->ptr;
4131 }
4132
4133 /* We need a temp set object to store our union. If the dstkey
4134 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4135 * this set object will be the resulting object to set into the target key*/
4136 dstset = createSetObject();
4137
4138 /* Iterate all the elements of all the sets, add every element a single
4139 * time to the result set */
4140 for (j = 0; j < setsnum; j++) {
4141 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4142 if (!dv[j]) continue; /* non existing keys are like empty sets */
4143
4144 di = dictGetIterator(dv[j]);
4145
4146 while((de = dictNext(di)) != NULL) {
4147 robj *ele;
4148
4149 /* dictAdd will not add the same element multiple times */
4150 ele = dictGetEntryKey(de);
4151 if (op == REDIS_OP_UNION || j == 0) {
4152 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4153 incrRefCount(ele);
4154 cardinality++;
4155 }
4156 } else if (op == REDIS_OP_DIFF) {
4157 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4158 cardinality--;
4159 }
4160 }
4161 }
4162 dictReleaseIterator(di);
4163
4164 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4165 }
4166
4167 /* Output the content of the resulting set, if not in STORE mode */
4168 if (!dstkey) {
4169 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4170 di = dictGetIterator(dstset->ptr);
4171 while((de = dictNext(di)) != NULL) {
4172 robj *ele;
4173
4174 ele = dictGetEntryKey(de);
4175 addReplyBulkLen(c,ele);
4176 addReply(c,ele);
4177 addReply(c,shared.crlf);
4178 }
4179 dictReleaseIterator(di);
4180 } else {
4181 /* If we have a target key where to store the resulting set
4182 * create this key with the result set inside */
4183 deleteKey(c->db,dstkey);
4184 dictAdd(c->db->dict,dstkey,dstset);
4185 incrRefCount(dstkey);
4186 }
4187
4188 /* Cleanup */
4189 if (!dstkey) {
4190 decrRefCount(dstset);
4191 } else {
4192 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4193 dictSize((dict*)dstset->ptr)));
4194 server.dirty++;
4195 }
4196 zfree(dv);
4197 }
4198
4199 static void sunionCommand(redisClient *c) {
4200 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4201 }
4202
4203 static void sunionstoreCommand(redisClient *c) {
4204 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4205 }
4206
4207 static void sdiffCommand(redisClient *c) {
4208 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4209 }
4210
4211 static void sdiffstoreCommand(redisClient *c) {
4212 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4213 }
4214
4215 /* ==================================== ZSets =============================== */
4216
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4224
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated values.
 * b) the comparison is not just by key (our 'score') but by satellite data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, useful for ZREVRANGE. */
4233
4234 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4235 zskiplistNode *zn = zmalloc(sizeof(*zn));
4236
4237 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4238 zn->score = score;
4239 zn->obj = obj;
4240 return zn;
4241 }
4242
4243 static zskiplist *zslCreate(void) {
4244 int j;
4245 zskiplist *zsl;
4246
4247 zsl = zmalloc(sizeof(*zsl));
4248 zsl->level = 1;
4249 zsl->length = 0;
4250 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4251 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4252 zsl->header->forward[j] = NULL;
4253 zsl->header->backward = NULL;
4254 zsl->tail = NULL;
4255 return zsl;
4256 }
4257
4258 static void zslFreeNode(zskiplistNode *node) {
4259 decrRefCount(node->obj);
4260 zfree(node->forward);
4261 zfree(node);
4262 }
4263
4264 static void zslFree(zskiplist *zsl) {
4265 zskiplistNode *node = zsl->header->forward[0], *next;
4266
4267 zfree(zsl->header->forward);
4268 zfree(zsl->header);
4269 while(node) {
4270 next = node->forward[0];
4271 zslFreeNode(node);
4272 node = next;
4273 }
4274 zfree(zsl);
4275 }
4276
4277 static int zslRandomLevel(void) {
4278 int level = 1;
4279 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4280 level += 1;
4281 return level;
4282 }
4283
4284 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4285 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4286 int i, level;
4287
4288 x = zsl->header;
4289 for (i = zsl->level-1; i >= 0; i--) {
4290 while (x->forward[i] &&
4291 (x->forward[i]->score < score ||
4292 (x->forward[i]->score == score &&
4293 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4294 x = x->forward[i];
4295 update[i] = x;
4296 }
4297 /* we assume the key is not already inside, since we allow duplicated
4298 * scores, and the re-insertion of score and redis object should never
4299 * happpen since the caller of zslInsert() should test in the hash table
4300 * if the element is already inside or not. */
4301 level = zslRandomLevel();
4302 if (level > zsl->level) {
4303 for (i = zsl->level; i < level; i++)
4304 update[i] = zsl->header;
4305 zsl->level = level;
4306 }
4307 x = zslCreateNode(level,score,obj);
4308 for (i = 0; i < level; i++) {
4309 x->forward[i] = update[i]->forward[i];
4310 update[i]->forward[i] = x;
4311 }
4312 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4313 if (x->forward[0])
4314 x->forward[0]->backward = x;
4315 else
4316 zsl->tail = x;
4317 zsl->length++;
4318 }
4319
4320 /* Delete an element with matching score/object from the skiplist. */
4321 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4322 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4323 int i;
4324
4325 x = zsl->header;
4326 for (i = zsl->level-1; i >= 0; i--) {
4327 while (x->forward[i] &&
4328 (x->forward[i]->score < score ||
4329 (x->forward[i]->score == score &&
4330 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4331 x = x->forward[i];
4332 update[i] = x;
4333 }
4334 /* We may have multiple elements with the same score, what we need
4335 * is to find the element with both the right score and object. */
4336 x = x->forward[0];
4337 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4338 for (i = 0; i < zsl->level; i++) {
4339 if (update[i]->forward[i] != x) break;
4340 update[i]->forward[i] = x->forward[i];
4341 }
4342 if (x->forward[0]) {
4343 x->forward[0]->backward = (x->backward == zsl->header) ?
4344 NULL : x->backward;
4345 } else {
4346 zsl->tail = x->backward;
4347 }
4348 zslFreeNode(x);
4349 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4350 zsl->level--;
4351 zsl->length--;
4352 return 1;
4353 } else {
4354 return 0; /* not found */
4355 }
4356 return 0; /* not found */
4357 }
4358
4359 /* Delete all the elements with score between min and max from the skiplist.
4360 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4361 * Note that this function takes the reference to the hash table view of the
4362 * sorted set, in order to remove the elements from the hash table too. */
4363 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4364 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4365 unsigned long removed = 0;
4366 int i;
4367
4368 x = zsl->header;
4369 for (i = zsl->level-1; i >= 0; i--) {
4370 while (x->forward[i] && x->forward[i]->score < min)
4371 x = x->forward[i];
4372 update[i] = x;
4373 }
4374 /* We may have multiple elements with the same score, what we need
4375 * is to find the element with both the right score and object. */
4376 x = x->forward[0];
4377 while (x && x->score <= max) {
4378 zskiplistNode *next;
4379
4380 for (i = 0; i < zsl->level; i++) {
4381 if (update[i]->forward[i] != x) break;
4382 update[i]->forward[i] = x->forward[i];
4383 }
4384 if (x->forward[0]) {
4385 x->forward[0]->backward = (x->backward == zsl->header) ?
4386 NULL : x->backward;
4387 } else {
4388 zsl->tail = x->backward;
4389 }
4390 next = x->forward[0];
4391 dictDelete(dict,x->obj);
4392 zslFreeNode(x);
4393 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4394 zsl->level--;
4395 zsl->length--;
4396 removed++;
4397 x = next;
4398 }
4399 return removed; /* not found */
4400 }
4401
4402 /* Find the first node having a score equal or greater than the specified one.
4403 * Returns NULL if there is no match. */
4404 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4405 zskiplistNode *x;
4406 int i;
4407
4408 x = zsl->header;
4409 for (i = zsl->level-1; i >= 0; i--) {
4410 while (x->forward[i] && x->forward[i]->score < score)
4411 x = x->forward[i];
4412 }
4413 /* We may have multiple elements with the same score, what we need
4414 * is to find the element with both the right score and object. */
4415 return x->forward[0];
4416 }
4417
4418 /* The actual Z-commands implementations */
4419
4420 /* This generic command implements both ZADD and ZINCRBY.
4421 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4422 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4423 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4424 robj *zsetobj;
4425 zset *zs;
4426 double *score;
4427
4428 zsetobj = lookupKeyWrite(c->db,key);
4429 if (zsetobj == NULL) {
4430 zsetobj = createZsetObject();
4431 dictAdd(c->db->dict,key,zsetobj);
4432 incrRefCount(key);
4433 } else {
4434 if (zsetobj->type != REDIS_ZSET) {
4435 addReply(c,shared.wrongtypeerr);
4436 return;
4437 }
4438 }
4439 zs = zsetobj->ptr;
4440
4441 /* Ok now since we implement both ZADD and ZINCRBY here the code
4442 * needs to handle the two different conditions. It's all about setting
4443 * '*score', that is, the new score to set, to the right value. */
4444 score = zmalloc(sizeof(double));
4445 if (doincrement) {
4446 dictEntry *de;
4447
4448 /* Read the old score. If the element was not present starts from 0 */
4449 de = dictFind(zs->dict,ele);
4450 if (de) {
4451 double *oldscore = dictGetEntryVal(de);
4452 *score = *oldscore + scoreval;
4453 } else {
4454 *score = scoreval;
4455 }
4456 } else {
4457 *score = scoreval;
4458 }
4459
4460 /* What follows is a simple remove and re-insert operation that is common
4461 * to both ZADD and ZINCRBY... */
4462 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4463 /* case 1: New element */
4464 incrRefCount(ele); /* added to hash */
4465 zslInsert(zs->zsl,*score,ele);
4466 incrRefCount(ele); /* added to skiplist */
4467 server.dirty++;
4468 if (doincrement)
4469 addReplyDouble(c,*score);
4470 else
4471 addReply(c,shared.cone);
4472 } else {
4473 dictEntry *de;
4474 double *oldscore;
4475
4476 /* case 2: Score update operation */
4477 de = dictFind(zs->dict,ele);
4478 redisAssert(de != NULL);
4479 oldscore = dictGetEntryVal(de);
4480 if (*score != *oldscore) {
4481 int deleted;
4482
4483 /* Remove and insert the element in the skip list with new score */
4484 deleted = zslDelete(zs->zsl,*oldscore,ele);
4485 redisAssert(deleted != 0);
4486 zslInsert(zs->zsl,*score,ele);
4487 incrRefCount(ele);
4488 /* Update the score in the hash table */
4489 dictReplace(zs->dict,ele,score);
4490 server.dirty++;
4491 } else {
4492 zfree(score);
4493 }
4494 if (doincrement)
4495 addReplyDouble(c,*score);
4496 else
4497 addReply(c,shared.czero);
4498 }
4499 }
4500
4501 static void zaddCommand(redisClient *c) {
4502 double scoreval;
4503
4504 scoreval = strtod(c->argv[2]->ptr,NULL);
4505 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4506 }
4507
4508 static void zincrbyCommand(redisClient *c) {
4509 double scoreval;
4510
4511 scoreval = strtod(c->argv[2]->ptr,NULL);
4512 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4513 }
4514
4515 static void zremCommand(redisClient *c) {
4516 robj *zsetobj;
4517 zset *zs;
4518
4519 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4520 if (zsetobj == NULL) {
4521 addReply(c,shared.czero);
4522 } else {
4523 dictEntry *de;
4524 double *oldscore;
4525 int deleted;
4526
4527 if (zsetobj->type != REDIS_ZSET) {
4528 addReply(c,shared.wrongtypeerr);
4529 return;
4530 }
4531 zs = zsetobj->ptr;
4532 de = dictFind(zs->dict,c->argv[2]);
4533 if (de == NULL) {
4534 addReply(c,shared.czero);
4535 return;
4536 }
4537 /* Delete from the skiplist */
4538 oldscore = dictGetEntryVal(de);
4539 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4540 redisAssert(deleted != 0);
4541
4542 /* Delete from the hash table */
4543 dictDelete(zs->dict,c->argv[2]);
4544 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4545 server.dirty++;
4546 addReply(c,shared.cone);
4547 }
4548 }
4549
4550 static void zremrangebyscoreCommand(redisClient *c) {
4551 double min = strtod(c->argv[2]->ptr,NULL);
4552 double max = strtod(c->argv[3]->ptr,NULL);
4553 robj *zsetobj;
4554 zset *zs;
4555
4556 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4557 if (zsetobj == NULL) {
4558 addReply(c,shared.czero);
4559 } else {
4560 long deleted;
4561
4562 if (zsetobj->type != REDIS_ZSET) {
4563 addReply(c,shared.wrongtypeerr);
4564 return;
4565 }
4566 zs = zsetobj->ptr;
4567 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4568 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4569 server.dirty += deleted;
4570 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4571 }
4572 }
4573
4574 static void zrangeGenericCommand(redisClient *c, int reverse) {
4575 robj *o;
4576 int start = atoi(c->argv[2]->ptr);
4577 int end = atoi(c->argv[3]->ptr);
4578 int withscores = 0;
4579
4580 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4581 withscores = 1;
4582 } else if (c->argc >= 5) {
4583 addReply(c,shared.syntaxerr);
4584 return;
4585 }
4586
4587 o = lookupKeyRead(c->db,c->argv[1]);
4588 if (o == NULL) {
4589 addReply(c,shared.nullmultibulk);
4590 } else {
4591 if (o->type != REDIS_ZSET) {
4592 addReply(c,shared.wrongtypeerr);
4593 } else {
4594 zset *zsetobj = o->ptr;
4595 zskiplist *zsl = zsetobj->zsl;
4596 zskiplistNode *ln;
4597
4598 int llen = zsl->length;
4599 int rangelen, j;
4600 robj *ele;
4601
4602 /* convert negative indexes */
4603 if (start < 0) start = llen+start;
4604 if (end < 0) end = llen+end;
4605 if (start < 0) start = 0;
4606 if (end < 0) end = 0;
4607
4608 /* indexes sanity checks */
4609 if (start > end || start >= llen) {
4610 /* Out of range start or start > end result in empty list */
4611 addReply(c,shared.emptymultibulk);
4612 return;
4613 }
4614 if (end >= llen) end = llen-1;
4615 rangelen = (end-start)+1;
4616
4617 /* Return the result in form of a multi-bulk reply */
4618 if (reverse) {
4619 ln = zsl->tail;
4620 while (start--)
4621 ln = ln->backward;
4622 } else {
4623 ln = zsl->header->forward[0];
4624 while (start--)
4625 ln = ln->forward[0];
4626 }
4627
4628 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4629 withscores ? (rangelen*2) : rangelen));
4630 for (j = 0; j < rangelen; j++) {
4631 ele = ln->obj;
4632 addReplyBulkLen(c,ele);
4633 addReply(c,ele);
4634 addReply(c,shared.crlf);
4635 if (withscores)
4636 addReplyDouble(c,ln->score);
4637 ln = reverse ? ln->backward : ln->forward[0];
4638 }
4639 }
4640 }
4641 }
4642
4643 static void zrangeCommand(redisClient *c) {
4644 zrangeGenericCommand(c,0);
4645 }
4646
4647 static void zrevrangeCommand(redisClient *c) {
4648 zrangeGenericCommand(c,1);
4649 }
4650
4651 static void zrangebyscoreCommand(redisClient *c) {
4652 robj *o;
4653 double min = strtod(c->argv[2]->ptr,NULL);
4654 double max = strtod(c->argv[3]->ptr,NULL);
4655 int offset = 0, limit = -1;
4656
4657 if (c->argc != 4 && c->argc != 7) {
4658 addReplySds(c,
4659 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4660 return;
4661 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4662 addReply(c,shared.syntaxerr);
4663 return;
4664 } else if (c->argc == 7) {
4665 offset = atoi(c->argv[5]->ptr);
4666 limit = atoi(c->argv[6]->ptr);
4667 if (offset < 0) offset = 0;
4668 }
4669
4670 o = lookupKeyRead(c->db,c->argv[1]);
4671 if (o == NULL) {
4672 addReply(c,shared.nullmultibulk);
4673 } else {
4674 if (o->type != REDIS_ZSET) {
4675 addReply(c,shared.wrongtypeerr);
4676 } else {
4677 zset *zsetobj = o->ptr;
4678 zskiplist *zsl = zsetobj->zsl;
4679 zskiplistNode *ln;
4680 robj *ele, *lenobj;
4681 unsigned int rangelen = 0;
4682
4683 /* Get the first node with the score >= min */
4684 ln = zslFirstWithScore(zsl,min);
4685 if (ln == NULL) {
4686 /* No element matching the speciifed interval */
4687 addReply(c,shared.emptymultibulk);
4688 return;
4689 }
4690
4691 /* We don't know in advance how many matching elements there
4692 * are in the list, so we push this object that will represent
4693 * the multi-bulk length in the output buffer, and will "fix"
4694 * it later */
4695 lenobj = createObject(REDIS_STRING,NULL);
4696 addReply(c,lenobj);
4697 decrRefCount(lenobj);
4698
4699 while(ln && ln->score <= max) {
4700 if (offset) {
4701 offset--;
4702 ln = ln->forward[0];
4703 continue;
4704 }
4705 if (limit == 0) break;
4706 ele = ln->obj;
4707 addReplyBulkLen(c,ele);
4708 addReply(c,ele);
4709 addReply(c,shared.crlf);
4710 ln = ln->forward[0];
4711 rangelen++;
4712 if (limit > 0) limit--;
4713 }
4714 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4715 }
4716 }
4717 }
4718
4719 static void zcardCommand(redisClient *c) {
4720 robj *o;
4721 zset *zs;
4722
4723 o = lookupKeyRead(c->db,c->argv[1]);
4724 if (o == NULL) {
4725 addReply(c,shared.czero);
4726 return;
4727 } else {
4728 if (o->type != REDIS_ZSET) {
4729 addReply(c,shared.wrongtypeerr);
4730 } else {
4731 zs = o->ptr;
4732 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4733 }
4734 }
4735 }
4736
4737 static void zscoreCommand(redisClient *c) {
4738 robj *o;
4739 zset *zs;
4740
4741 o = lookupKeyRead(c->db,c->argv[1]);
4742 if (o == NULL) {
4743 addReply(c,shared.nullbulk);
4744 return;
4745 } else {
4746 if (o->type != REDIS_ZSET) {
4747 addReply(c,shared.wrongtypeerr);
4748 } else {
4749 dictEntry *de;
4750
4751 zs = o->ptr;
4752 de = dictFind(zs->dict,c->argv[2]);
4753 if (!de) {
4754 addReply(c,shared.nullbulk);
4755 } else {
4756 double *score = dictGetEntryVal(de);
4757
4758 addReplyDouble(c,*score);
4759 }
4760 }
4761 }
4762 }
4763
4764 /* ========================= Non type-specific commands ==================== */
4765
4766 static void flushdbCommand(redisClient *c) {
4767 server.dirty += dictSize(c->db->dict);
4768 dictEmpty(c->db->dict);
4769 dictEmpty(c->db->expires);
4770 addReply(c,shared.ok);
4771 }
4772
4773 static void flushallCommand(redisClient *c) {
4774 server.dirty += emptyDb();
4775 addReply(c,shared.ok);
4776 rdbSave(server.dbfilename);
4777 server.dirty++;
4778 }
4779
4780 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4781 redisSortOperation *so = zmalloc(sizeof(*so));
4782 so->type = type;
4783 so->pattern = pattern;
4784 return so;
4785 }
4786
4787 /* Return the value associated to the key with a name obtained
4788 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4789 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4790 char *p;
4791 sds spat, ssub;
4792 robj keyobj;
4793 int prefixlen, sublen, postfixlen;
4794 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4795 struct {
4796 long len;
4797 long free;
4798 char buf[REDIS_SORTKEY_MAX+1];
4799 } keyname;
4800
4801 /* If the pattern is "#" return the substitution object itself in order
4802 * to implement the "SORT ... GET #" feature. */
4803 spat = pattern->ptr;
4804 if (spat[0] == '#' && spat[1] == '\0') {
4805 return subst;
4806 }
4807
4808 /* The substitution object may be specially encoded. If so we create
4809 * a decoded object on the fly. Otherwise getDecodedObject will just
4810 * increment the ref count, that we'll decrement later. */
4811 subst = getDecodedObject(subst);
4812
4813 ssub = subst->ptr;
4814 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4815 p = strchr(spat,'*');
4816 if (!p) {
4817 decrRefCount(subst);
4818 return NULL;
4819 }
4820
4821 prefixlen = p-spat;
4822 sublen = sdslen(ssub);
4823 postfixlen = sdslen(spat)-(prefixlen+1);
4824 memcpy(keyname.buf,spat,prefixlen);
4825 memcpy(keyname.buf+prefixlen,ssub,sublen);
4826 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4827 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4828 keyname.len = prefixlen+sublen+postfixlen;
4829
4830 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4831 decrRefCount(subst);
4832
4833 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4834 return lookupKeyRead(db,&keyobj);
4835 }
4836
4837 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4838 * the additional parameter is not standard but a BSD-specific we have to
4839 * pass sorting parameters via the global 'server' structure */
4840 static int sortCompare(const void *s1, const void *s2) {
4841 const redisSortObject *so1 = s1, *so2 = s2;
4842 int cmp;
4843
4844 if (!server.sort_alpha) {
4845 /* Numeric sorting. Here it's trivial as we precomputed scores */
4846 if (so1->u.score > so2->u.score) {
4847 cmp = 1;
4848 } else if (so1->u.score < so2->u.score) {
4849 cmp = -1;
4850 } else {
4851 cmp = 0;
4852 }
4853 } else {
4854 /* Alphanumeric sorting */
4855 if (server.sort_bypattern) {
4856 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4857 /* At least one compare object is NULL */
4858 if (so1->u.cmpobj == so2->u.cmpobj)
4859 cmp = 0;
4860 else if (so1->u.cmpobj == NULL)
4861 cmp = -1;
4862 else
4863 cmp = 1;
4864 } else {
4865 /* We have both the objects, use strcoll */
4866 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4867 }
4868 } else {
4869 /* Compare elements directly */
4870 robj *dec1, *dec2;
4871
4872 dec1 = getDecodedObject(so1->obj);
4873 dec2 = getDecodedObject(so2->obj);
4874 cmp = strcoll(dec1->ptr,dec2->ptr);
4875 decrRefCount(dec1);
4876 decrRefCount(dec2);
4877 }
4878 }
4879 return server.sort_desc ? -cmp : cmp;
4880 }
4881
4882 /* The SORT command is the most complex command in Redis. Warning: this code
4883 * is optimized for speed and a bit less for readability */
4884 static void sortCommand(redisClient *c) {
4885 list *operations;
4886 int outputlen = 0;
4887 int desc = 0, alpha = 0;
4888 int limit_start = 0, limit_count = -1, start, end;
4889 int j, dontsort = 0, vectorlen;
4890 int getop = 0; /* GET operation counter */
4891 robj *sortval, *sortby = NULL, *storekey = NULL;
4892 redisSortObject *vector; /* Resulting vector to sort */
4893
4894 /* Lookup the key to sort. It must be of the right types */
4895 sortval = lookupKeyRead(c->db,c->argv[1]);
4896 if (sortval == NULL) {
4897 addReply(c,shared.nullmultibulk);
4898 return;
4899 }
4900 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4901 sortval->type != REDIS_ZSET)
4902 {
4903 addReply(c,shared.wrongtypeerr);
4904 return;
4905 }
4906
4907 /* Create a list of operations to perform for every sorted element.
4908 * Operations can be GET/DEL/INCR/DECR */
4909 operations = listCreate();
4910 listSetFreeMethod(operations,zfree);
4911 j = 2;
4912
4913 /* Now we need to protect sortval incrementing its count, in the future
4914 * SORT may have options able to overwrite/delete keys during the sorting
4915 * and the sorted key itself may get destroied */
4916 incrRefCount(sortval);
4917
4918 /* The SORT command has an SQL-alike syntax, parse it */
4919 while(j < c->argc) {
4920 int leftargs = c->argc-j-1;
4921 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4922 desc = 0;
4923 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4924 desc = 1;
4925 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4926 alpha = 1;
4927 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4928 limit_start = atoi(c->argv[j+1]->ptr);
4929 limit_count = atoi(c->argv[j+2]->ptr);
4930 j+=2;
4931 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4932 storekey = c->argv[j+1];
4933 j++;
4934 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4935 sortby = c->argv[j+1];
4936 /* If the BY pattern does not contain '*', i.e. it is constant,
4937 * we don't need to sort nor to lookup the weight keys. */
4938 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4939 j++;
4940 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4941 listAddNodeTail(operations,createSortOperation(
4942 REDIS_SORT_GET,c->argv[j+1]));
4943 getop++;
4944 j++;
4945 } else {
4946 decrRefCount(sortval);
4947 listRelease(operations);
4948 addReply(c,shared.syntaxerr);
4949 return;
4950 }
4951 j++;
4952 }
4953
4954 /* Load the sorting vector with all the objects to sort */
4955 switch(sortval->type) {
4956 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
4957 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
4958 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
4959 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
4960 }
4961 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4962 j = 0;
4963
4964 if (sortval->type == REDIS_LIST) {
4965 list *list = sortval->ptr;
4966 listNode *ln;
4967
4968 listRewind(list);
4969 while((ln = listYield(list))) {
4970 robj *ele = ln->value;
4971 vector[j].obj = ele;
4972 vector[j].u.score = 0;
4973 vector[j].u.cmpobj = NULL;
4974 j++;
4975 }
4976 } else {
4977 dict *set;
4978 dictIterator *di;
4979 dictEntry *setele;
4980
4981 if (sortval->type == REDIS_SET) {
4982 set = sortval->ptr;
4983 } else {
4984 zset *zs = sortval->ptr;
4985 set = zs->dict;
4986 }
4987
4988 di = dictGetIterator(set);
4989 while((setele = dictNext(di)) != NULL) {
4990 vector[j].obj = dictGetEntryKey(setele);
4991 vector[j].u.score = 0;
4992 vector[j].u.cmpobj = NULL;
4993 j++;
4994 }
4995 dictReleaseIterator(di);
4996 }
4997 redisAssert(j == vectorlen);
4998
4999 /* Now it's time to load the right scores in the sorting vector */
5000 if (dontsort == 0) {
5001 for (j = 0; j < vectorlen; j++) {
5002 if (sortby) {
5003 robj *byval;
5004
5005 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5006 if (!byval || byval->type != REDIS_STRING) continue;
5007 if (alpha) {
5008 vector[j].u.cmpobj = getDecodedObject(byval);
5009 } else {
5010 if (byval->encoding == REDIS_ENCODING_RAW) {
5011 vector[j].u.score = strtod(byval->ptr,NULL);
5012 } else {
5013 /* Don't need to decode the object if it's
5014 * integer-encoded (the only encoding supported) so
5015 * far. We can just cast it */
5016 if (byval->encoding == REDIS_ENCODING_INT) {
5017 vector[j].u.score = (long)byval->ptr;
5018 } else
5019 redisAssert(1 != 1);
5020 }
5021 }
5022 } else {
5023 if (!alpha) {
5024 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5025 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5026 else {
5027 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5028 vector[j].u.score = (long) vector[j].obj->ptr;
5029 else
5030 redisAssert(1 != 1);
5031 }
5032 }
5033 }
5034 }
5035 }
5036
5037 /* We are ready to sort the vector... perform a bit of sanity check
5038 * on the LIMIT option too. We'll use a partial version of quicksort. */
5039 start = (limit_start < 0) ? 0 : limit_start;
5040 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5041 if (start >= vectorlen) {
5042 start = vectorlen-1;
5043 end = vectorlen-2;
5044 }
5045 if (end >= vectorlen) end = vectorlen-1;
5046
5047 if (dontsort == 0) {
5048 server.sort_desc = desc;
5049 server.sort_alpha = alpha;
5050 server.sort_bypattern = sortby ? 1 : 0;
5051 if (sortby && (start != 0 || end != vectorlen-1))
5052 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5053 else
5054 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5055 }
5056
5057 /* Send command output to the output buffer, performing the specified
5058 * GET/DEL/INCR/DECR operations if any. */
5059 outputlen = getop ? getop*(end-start+1) : end-start+1;
5060 if (storekey == NULL) {
5061 /* STORE option not specified, sent the sorting result to client */
5062 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5063 for (j = start; j <= end; j++) {
5064 listNode *ln;
5065 if (!getop) {
5066 addReplyBulkLen(c,vector[j].obj);
5067 addReply(c,vector[j].obj);
5068 addReply(c,shared.crlf);
5069 }
5070 listRewind(operations);
5071 while((ln = listYield(operations))) {
5072 redisSortOperation *sop = ln->value;
5073 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5074 vector[j].obj);
5075
5076 if (sop->type == REDIS_SORT_GET) {
5077 if (!val || val->type != REDIS_STRING) {
5078 addReply(c,shared.nullbulk);
5079 } else {
5080 addReplyBulkLen(c,val);
5081 addReply(c,val);
5082 addReply(c,shared.crlf);
5083 }
5084 } else {
5085 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5086 }
5087 }
5088 }
5089 } else {
5090 robj *listObject = createListObject();
5091 list *listPtr = (list*) listObject->ptr;
5092
5093 /* STORE option specified, set the sorting result as a List object */
5094 for (j = start; j <= end; j++) {
5095 listNode *ln;
5096 if (!getop) {
5097 listAddNodeTail(listPtr,vector[j].obj);
5098 incrRefCount(vector[j].obj);
5099 }
5100 listRewind(operations);
5101 while((ln = listYield(operations))) {
5102 redisSortOperation *sop = ln->value;
5103 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5104 vector[j].obj);
5105
5106 if (sop->type == REDIS_SORT_GET) {
5107 if (!val || val->type != REDIS_STRING) {
5108 listAddNodeTail(listPtr,createStringObject("",0));
5109 } else {
5110 listAddNodeTail(listPtr,val);
5111 incrRefCount(val);
5112 }
5113 } else {
5114 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5115 }
5116 }
5117 }
5118 if (dictReplace(c->db->dict,storekey,listObject)) {
5119 incrRefCount(storekey);
5120 }
5121 /* Note: we add 1 because the DB is dirty anyway since even if the
5122 * SORT result is empty a new key is set and maybe the old content
5123 * replaced. */
5124 server.dirty += 1+outputlen;
5125 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5126 }
5127
5128 /* Cleanup */
5129 decrRefCount(sortval);
5130 listRelease(operations);
5131 for (j = 0; j < vectorlen; j++) {
5132 if (sortby && alpha && vector[j].u.cmpobj)
5133 decrRefCount(vector[j].u.cmpobj);
5134 }
5135 zfree(vector);
5136 }
5137
5138 /* Create the string returned by the INFO command. This is decoupled
5139 * by the INFO command itself as we need to report the same information
5140 * on memory corruption problems. */
5141 static sds genRedisInfoString(void) {
5142 sds info;
5143 time_t uptime = time(NULL)-server.stat_starttime;
5144 int j;
5145
5146 info = sdscatprintf(sdsempty(),
5147 "redis_version:%s\r\n"
5148 "arch_bits:%s\r\n"
5149 "multiplexing_api:%s\r\n"
5150 "uptime_in_seconds:%ld\r\n"
5151 "uptime_in_days:%ld\r\n"
5152 "connected_clients:%d\r\n"
5153 "connected_slaves:%d\r\n"
5154 "used_memory:%zu\r\n"
5155 "changes_since_last_save:%lld\r\n"
5156 "bgsave_in_progress:%d\r\n"
5157 "last_save_time:%ld\r\n"
5158 "bgrewriteaof_in_progress:%d\r\n"
5159 "total_connections_received:%lld\r\n"
5160 "total_commands_processed:%lld\r\n"
5161 "role:%s\r\n"
5162 ,REDIS_VERSION,
5163 (sizeof(long) == 8) ? "64" : "32",
5164 aeGetApiName(),
5165 uptime,
5166 uptime/(3600*24),
5167 listLength(server.clients)-listLength(server.slaves),
5168 listLength(server.slaves),
5169 server.usedmemory,
5170 server.dirty,
5171 server.bgsavechildpid != -1,
5172 server.lastsave,
5173 server.bgrewritechildpid != -1,
5174 server.stat_numconnections,
5175 server.stat_numcommands,
5176 server.masterhost == NULL ? "master" : "slave"
5177 );
5178 if (server.masterhost) {
5179 info = sdscatprintf(info,
5180 "master_host:%s\r\n"
5181 "master_port:%d\r\n"
5182 "master_link_status:%s\r\n"
5183 "master_last_io_seconds_ago:%d\r\n"
5184 ,server.masterhost,
5185 server.masterport,
5186 (server.replstate == REDIS_REPL_CONNECTED) ?
5187 "up" : "down",
5188 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5189 );
5190 }
5191 for (j = 0; j < server.dbnum; j++) {
5192 long long keys, vkeys;
5193
5194 keys = dictSize(server.db[j].dict);
5195 vkeys = dictSize(server.db[j].expires);
5196 if (keys || vkeys) {
5197 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5198 j, keys, vkeys);
5199 }
5200 }
5201 return info;
5202 }
5203
5204 static void infoCommand(redisClient *c) {
5205 sds info = genRedisInfoString();
5206 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5207 (unsigned long)sdslen(info)));
5208 addReplySds(c,info);
5209 addReply(c,shared.crlf);
5210 }
5211
5212 static void monitorCommand(redisClient *c) {
5213 /* ignore MONITOR if aleady slave or in monitor mode */
5214 if (c->flags & REDIS_SLAVE) return;
5215
5216 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5217 c->slaveseldb = 0;
5218 listAddNodeTail(server.monitors,c);
5219 addReply(c,shared.ok);
5220 }
5221
5222 /* ================================= Expire ================================= */
5223 static int removeExpire(redisDb *db, robj *key) {
5224 if (dictDelete(db->expires,key) == DICT_OK) {
5225 return 1;
5226 } else {
5227 return 0;
5228 }
5229 }
5230
5231 static int setExpire(redisDb *db, robj *key, time_t when) {
5232 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5233 return 0;
5234 } else {
5235 incrRefCount(key);
5236 return 1;
5237 }
5238 }
5239
5240 /* Return the expire time of the specified key, or -1 if no expire
5241 * is associated with this key (i.e. the key is non volatile) */
5242 static time_t getExpire(redisDb *db, robj *key) {
5243 dictEntry *de;
5244
5245 /* No expire? return ASAP */
5246 if (dictSize(db->expires) == 0 ||
5247 (de = dictFind(db->expires,key)) == NULL) return -1;
5248
5249 return (time_t) dictGetEntryVal(de);
5250 }
5251
5252 static int expireIfNeeded(redisDb *db, robj *key) {
5253 time_t when;
5254 dictEntry *de;
5255
5256 /* No expire? return ASAP */
5257 if (dictSize(db->expires) == 0 ||
5258 (de = dictFind(db->expires,key)) == NULL) return 0;
5259
5260 /* Lookup the expire */
5261 when = (time_t) dictGetEntryVal(de);
5262 if (time(NULL) <= when) return 0;
5263
5264 /* Delete the key */
5265 dictDelete(db->expires,key);
5266 return dictDelete(db->dict,key) == DICT_OK;
5267 }
5268
5269 static int deleteIfVolatile(redisDb *db, robj *key) {
5270 dictEntry *de;
5271
5272 /* No expire? return ASAP */
5273 if (dictSize(db->expires) == 0 ||
5274 (de = dictFind(db->expires,key)) == NULL) return 0;
5275
5276 /* Delete the key */
5277 server.dirty++;
5278 dictDelete(db->expires,key);
5279 return dictDelete(db->dict,key) == DICT_OK;
5280 }
5281
5282 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5283 dictEntry *de;
5284
5285 de = dictFind(c->db->dict,key);
5286 if (de == NULL) {
5287 addReply(c,shared.czero);
5288 return;
5289 }
5290 if (seconds < 0) {
5291 if (deleteKey(c->db,key)) server.dirty++;
5292 addReply(c, shared.cone);
5293 return;
5294 } else {
5295 time_t when = time(NULL)+seconds;
5296 if (setExpire(c->db,key,when)) {
5297 addReply(c,shared.cone);
5298 server.dirty++;
5299 } else {
5300 addReply(c,shared.czero);
5301 }
5302 return;
5303 }
5304 }
5305
5306 static void expireCommand(redisClient *c) {
5307 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5308 }
5309
5310 static void expireatCommand(redisClient *c) {
5311 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5312 }
5313
5314 static void ttlCommand(redisClient *c) {
5315 time_t expire;
5316 int ttl = -1;
5317
5318 expire = getExpire(c->db,c->argv[1]);
5319 if (expire != -1) {
5320 ttl = (int) (expire-time(NULL));
5321 if (ttl < 0) ttl = -1;
5322 }
5323 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5324 }
5325
5326 /* ================================ MULTI/EXEC ============================== */
5327
5328 /* Client state initialization for MULTI/EXEC */
5329 static void initClientMultiState(redisClient *c) {
5330 c->mstate.commands = NULL;
5331 c->mstate.count = 0;
5332 }
5333
5334 /* Release all the resources associated with MULTI/EXEC state */
5335 static void freeClientMultiState(redisClient *c) {
5336 int j;
5337
5338 for (j = 0; j < c->mstate.count; j++) {
5339 int i;
5340 multiCmd *mc = c->mstate.commands+j;
5341
5342 for (i = 0; i < mc->argc; i++)
5343 decrRefCount(mc->argv[i]);
5344 zfree(mc->argv);
5345 }
5346 zfree(c->mstate.commands);
5347 }
5348
5349 /* Add a new command into the MULTI commands queue */
5350 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5351 multiCmd *mc;
5352 int j;
5353
5354 c->mstate.commands = zrealloc(c->mstate.commands,
5355 sizeof(multiCmd)*(c->mstate.count+1));
5356 mc = c->mstate.commands+c->mstate.count;
5357 mc->cmd = cmd;
5358 mc->argc = c->argc;
5359 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5360 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5361 for (j = 0; j < c->argc; j++)
5362 incrRefCount(mc->argv[j]);
5363 c->mstate.count++;
5364 }
5365
5366 static void multiCommand(redisClient *c) {
5367 c->flags |= REDIS_MULTI;
5368 }
5369
5370 static void execCommand(redisClient *c) {
5371 int j;
5372 robj **orig_argv;
5373 int orig_argc;
5374
5375 if (!(c->flags & REDIS_MULTI)) {
5376 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5377 return;
5378 }
5379
5380 orig_argv = c->argv;
5381 orig_argc = c->argc;
5382 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5383 for (j = 0; j < c->mstate.count; j++) {
5384 c->argc = c->mstate.commands[j].argc;
5385 c->argv = c->mstate.commands[j].argv;
5386 call(c,c->mstate.commands[j].cmd);
5387 }
5388 c->argv = orig_argv;
5389 c->argc = orig_argc;
5390 freeClientMultiState(c);
5391 initClientMultiState(c);
5392 c->flags &= (~REDIS_MULTI);
5393 }
5394
5395 /* =============================== Replication ============================= */
5396
5397 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5398 ssize_t nwritten, ret = size;
5399 time_t start = time(NULL);
5400
5401 timeout++;
5402 while(size) {
5403 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5404 nwritten = write(fd,ptr,size);
5405 if (nwritten == -1) return -1;
5406 ptr += nwritten;
5407 size -= nwritten;
5408 }
5409 if ((time(NULL)-start) > timeout) {
5410 errno = ETIMEDOUT;
5411 return -1;
5412 }
5413 }
5414 return ret;
5415 }
5416
5417 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5418 ssize_t nread, totread = 0;
5419 time_t start = time(NULL);
5420
5421 timeout++;
5422 while(size) {
5423 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5424 nread = read(fd,ptr,size);
5425 if (nread == -1) return -1;
5426 ptr += nread;
5427 size -= nread;
5428 totread += nread;
5429 }
5430 if ((time(NULL)-start) > timeout) {
5431 errno = ETIMEDOUT;
5432 return -1;
5433 }
5434 }
5435 return totread;
5436 }
5437
/* Read a line from 'fd' one byte at a time, storing at most size-1 bytes
 * plus a NUL terminator into 'ptr'. 'timeout' is passed to syncRead() for
 * every single byte.
 *
 * The line terminator ('\n', optionally preceded by '\r') is stripped from
 * the stored string. Returns the number of bytes stored before the '\n'
 * was seen (a trailing '\r' is counted even though it is overwritten with
 * NUL), or -1 if syncRead() fails. If the buffer fills up before a newline
 * is found, the bytes read so far are returned NUL terminated. A 'size'
 * <= 0 stores nothing and returns 0. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;
    *ptr = '\0'; /* the result is a valid string even if nothing is read */
    size--; /* reserve room for the terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the byte just stored: without this the loop never
         * stops on long lines and writes past the end of the buffer. */
        size--;
    }
    return nread;
}
5458
5459 static void syncCommand(redisClient *c) {
5460 /* ignore SYNC if aleady slave or in monitor mode */
5461 if (c->flags & REDIS_SLAVE) return;
5462
5463 /* SYNC can't be issued when the server has pending data to send to
5464 * the client about already issued commands. We need a fresh reply
5465 * buffer registering the differences between the BGSAVE and the current
5466 * dataset, so that we can copy to other slaves if needed. */
5467 if (listLength(c->reply) != 0) {
5468 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5469 return;
5470 }
5471
5472 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5473 /* Here we need to check if there is a background saving operation
5474 * in progress, or if it is required to start one */
5475 if (server.bgsavechildpid != -1) {
5476 /* Ok a background save is in progress. Let's check if it is a good
5477 * one for replication, i.e. if there is another slave that is
5478 * registering differences since the server forked to save */
5479 redisClient *slave;
5480 listNode *ln;
5481
5482 listRewind(server.slaves);
5483 while((ln = listYield(server.slaves))) {
5484 slave = ln->value;
5485 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5486 }
5487 if (ln) {
5488 /* Perfect, the server is already registering differences for
5489 * another slave. Set the right state, and copy the buffer. */
5490 listRelease(c->reply);
5491 c->reply = listDup(slave->reply);
5492 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5493 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5494 } else {
5495 /* No way, we need to wait for the next BGSAVE in order to
5496 * register differences */
5497 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5498 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5499 }
5500 } else {
5501 /* Ok we don't have a BGSAVE in progress, let's start one */
5502 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5503 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5504 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5505 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5506 return;
5507 }
5508 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5509 }
5510 c->repldbfd = -1;
5511 c->flags |= REDIS_SLAVE;
5512 c->slaveseldb = 0;
5513 listAddNodeTail(server.slaves,c);
5514 return;
5515 }
5516
5517 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5518 redisClient *slave = privdata;
5519 REDIS_NOTUSED(el);
5520 REDIS_NOTUSED(mask);
5521 char buf[REDIS_IOBUF_LEN];
5522 ssize_t nwritten, buflen;
5523
5524 if (slave->repldboff == 0) {
5525 /* Write the bulk write count before to transfer the DB. In theory here
5526 * we don't know how much room there is in the output buffer of the
5527 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5528 * operations) will never be smaller than the few bytes we need. */
5529 sds bulkcount;
5530
5531 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5532 slave->repldbsize);
5533 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5534 {
5535 sdsfree(bulkcount);
5536 freeClient(slave);
5537 return;
5538 }
5539 sdsfree(bulkcount);
5540 }
5541 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5542 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5543 if (buflen <= 0) {
5544 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5545 (buflen == 0) ? "premature EOF" : strerror(errno));
5546 freeClient(slave);
5547 return;
5548 }
5549 if ((nwritten = write(fd,buf,buflen)) == -1) {
5550 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5551 strerror(errno));
5552 freeClient(slave);
5553 return;
5554 }
5555 slave->repldboff += nwritten;
5556 if (slave->repldboff == slave->repldbsize) {
5557 close(slave->repldbfd);
5558 slave->repldbfd = -1;
5559 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5560 slave->replstate = REDIS_REPL_ONLINE;
5561 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5562 sendReplyToClient, slave) == AE_ERR) {
5563 freeClient(slave);
5564 return;
5565 }
5566 addReplySds(slave,sdsempty());
5567 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5568 }
5569 }
5570
5571 /* This function is called at the end of every backgrond saving.
5572 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5573 * otherwise REDIS_ERR is passed to the function.
5574 *
5575 * The goal of this function is to handle slaves waiting for a successful
5576 * background saving in order to perform non-blocking synchronization. */
5577 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5578 listNode *ln;
5579 int startbgsave = 0;
5580
5581 listRewind(server.slaves);
5582 while((ln = listYield(server.slaves))) {
5583 redisClient *slave = ln->value;
5584
5585 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5586 startbgsave = 1;
5587 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5588 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5589 struct redis_stat buf;
5590
5591 if (bgsaveerr != REDIS_OK) {
5592 freeClient(slave);
5593 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5594 continue;
5595 }
5596 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5597 redis_fstat(slave->repldbfd,&buf) == -1) {
5598 freeClient(slave);
5599 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5600 continue;
5601 }
5602 slave->repldboff = 0;
5603 slave->repldbsize = buf.st_size;
5604 slave->replstate = REDIS_REPL_SEND_BULK;
5605 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5606 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5607 freeClient(slave);
5608 continue;
5609 }
5610 }
5611 }
5612 if (startbgsave) {
5613 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5614 listRewind(server.slaves);
5615 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5616 while((ln = listYield(server.slaves))) {
5617 redisClient *slave = ln->value;
5618
5619 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5620 freeClient(slave);
5621 }
5622 }
5623 }
5624 }
5625
5626 static int syncWithMaster(void) {
5627 char buf[1024], tmpfile[256], authcmd[1024];
5628 int dumpsize;
5629 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5630 int dfd;
5631
5632 if (fd == -1) {
5633 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5634 strerror(errno));
5635 return REDIS_ERR;
5636 }
5637
5638 /* AUTH with the master if required. */
5639 if(server.masterauth) {
5640 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5641 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5642 close(fd);
5643 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5644 strerror(errno));
5645 return REDIS_ERR;
5646 }
5647 /* Read the AUTH result. */
5648 if (syncReadLine(fd,buf,1024,3600) == -1) {
5649 close(fd);
5650 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5651 strerror(errno));
5652 return REDIS_ERR;
5653 }
5654 if (buf[0] != '+') {
5655 close(fd);
5656 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5657 return REDIS_ERR;
5658 }
5659 }
5660
5661 /* Issue the SYNC command */
5662 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5663 close(fd);
5664 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5665 strerror(errno));
5666 return REDIS_ERR;
5667 }
5668 /* Read the bulk write count */
5669 if (syncReadLine(fd,buf,1024,3600) == -1) {
5670 close(fd);
5671 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5672 strerror(errno));
5673 return REDIS_ERR;
5674 }
5675 if (buf[0] != '$') {
5676 close(fd);
5677 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5678 return REDIS_ERR;
5679 }
5680 dumpsize = atoi(buf+1);
5681 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5682 /* Read the bulk write data on a temp file */
5683 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5684 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5685 if (dfd == -1) {
5686 close(fd);
5687 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5688 return REDIS_ERR;
5689 }
5690 while(dumpsize) {
5691 int nread, nwritten;
5692
5693 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5694 if (nread == -1) {
5695 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5696 strerror(errno));
5697 close(fd);
5698 close(dfd);
5699 return REDIS_ERR;
5700 }
5701 nwritten = write(dfd,buf,nread);
5702 if (nwritten == -1) {
5703 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5704 close(fd);
5705 close(dfd);
5706 return REDIS_ERR;
5707 }
5708 dumpsize -= nread;
5709 }
5710 close(dfd);
5711 if (rename(tmpfile,server.dbfilename) == -1) {
5712 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5713 unlink(tmpfile);
5714 close(fd);
5715 return REDIS_ERR;
5716 }
5717 emptyDb();
5718 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5719 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5720 close(fd);
5721 return REDIS_ERR;
5722 }
5723 server.master = createClient(fd);
5724 server.master->flags |= REDIS_MASTER;
5725 server.master->authenticated = 1;
5726 server.replstate = REDIS_REPL_CONNECTED;
5727 return REDIS_OK;
5728 }
5729
5730 static void slaveofCommand(redisClient *c) {
5731 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5732 !strcasecmp(c->argv[2]->ptr,"one")) {
5733 if (server.masterhost) {
5734 sdsfree(server.masterhost);
5735 server.masterhost = NULL;
5736 if (server.master) freeClient(server.master);
5737 server.replstate = REDIS_REPL_NONE;
5738 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5739 }
5740 } else {
5741 sdsfree(server.masterhost);
5742 server.masterhost = sdsdup(c->argv[1]->ptr);
5743 server.masterport = atoi(c->argv[2]->ptr);
5744 if (server.master) freeClient(server.master);
5745 server.replstate = REDIS_REPL_CONNECT;
5746 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5747 server.masterhost, server.masterport);
5748 }
5749 addReply(c,shared.ok);
5750 }
5751
5752 /* ============================ Maxmemory directive ======================== */
5753
5754 /* This function gets called when 'maxmemory' is set on the config file to limit
5755 * the max memory used by the server, and we are out of memory.
5756 * This function will try to, in order:
5757 *
5758 * - Free objects from the free list
5759 * - Try to remove keys with an EXPIRE set
5760 *
5761 * It is not possible to free enough memory to reach used-memory < maxmemory
5762 * the server will start refusing commands that will enlarge even more the
5763 * memory usage.
5764 */
5765 static void freeMemoryIfNeeded(void) {
5766 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5767 if (listLength(server.objfreelist)) {
5768 robj *o;
5769
5770 listNode *head = listFirst(server.objfreelist);
5771 o = listNodeValue(head);
5772 listDelNode(server.objfreelist,head);
5773 zfree(o);
5774 } else {
5775 int j, k, freed = 0;
5776
5777 for (j = 0; j < server.dbnum; j++) {
5778 int minttl = -1;
5779 robj *minkey = NULL;
5780 struct dictEntry *de;
5781
5782 if (dictSize(server.db[j].expires)) {
5783 freed = 1;
5784 /* From a sample of three keys drop the one nearest to
5785 * the natural expire */
5786 for (k = 0; k < 3; k++) {
5787 time_t t;
5788
5789 de = dictGetRandomKey(server.db[j].expires);
5790 t = (time_t) dictGetEntryVal(de);
5791 if (minttl == -1 || t < minttl) {
5792 minkey = dictGetEntryKey(de);
5793 minttl = t;
5794 }
5795 }
5796 deleteKey(server.db+j,minkey);
5797 }
5798 }
5799 if (!freed) return; /* nothing to free... */
5800 }
5801 }
5802 }
5803
5804 /* ============================== Append Only file ========================== */
5805
5806 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5807 sds buf = sdsempty();
5808 int j;
5809 ssize_t nwritten;
5810 time_t now;
5811 robj *tmpargv[3];
5812
5813 /* The DB this command was targetting is not the same as the last command
5814 * we appendend. To issue a SELECT command is needed. */
5815 if (dictid != server.appendseldb) {
5816 char seldb[64];
5817
5818 snprintf(seldb,sizeof(seldb),"%d",dictid);
5819 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5820 (unsigned long)strlen(seldb),seldb);
5821 server.appendseldb = dictid;
5822 }
5823
5824 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5825 * EXPIREs into EXPIREATs calls */
5826 if (cmd->proc == expireCommand) {
5827 long when;
5828
5829 tmpargv[0] = createStringObject("EXPIREAT",8);
5830 tmpargv[1] = argv[1];
5831 incrRefCount(argv[1]);
5832 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5833 tmpargv[2] = createObject(REDIS_STRING,
5834 sdscatprintf(sdsempty(),"%ld",when));
5835 argv = tmpargv;
5836 }
5837
5838 /* Append the actual command */
5839 buf = sdscatprintf(buf,"*%d\r\n",argc);
5840 for (j = 0; j < argc; j++) {
5841 robj *o = argv[j];
5842
5843 o = getDecodedObject(o);
5844 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
5845 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5846 buf = sdscatlen(buf,"\r\n",2);
5847 decrRefCount(o);
5848 }
5849
5850 /* Free the objects from the modified argv for EXPIREAT */
5851 if (cmd->proc == expireCommand) {
5852 for (j = 0; j < 3; j++)
5853 decrRefCount(argv[j]);
5854 }
5855
5856 /* We want to perform a single write. This should be guaranteed atomic
5857 * at least if the filesystem we are writing is a real physical one.
5858 * While this will save us against the server being killed I don't think
5859 * there is much to do about the whole server stopping for power problems
5860 * or alike */
5861 nwritten = write(server.appendfd,buf,sdslen(buf));
5862 if (nwritten != (signed)sdslen(buf)) {
5863 /* Ooops, we are in troubles. The best thing to do for now is
5864 * to simply exit instead to give the illusion that everything is
5865 * working as expected. */
5866 if (nwritten == -1) {
5867 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5868 } else {
5869 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5870 }
5871 exit(1);
5872 }
5873 /* If a background append only file rewriting is in progress we want to
5874 * accumulate the differences between the child DB and the current one
5875 * in a buffer, so that when the child process will do its work we
5876 * can append the differences to the new append only file. */
5877 if (server.bgrewritechildpid != -1)
5878 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5879
5880 sdsfree(buf);
5881 now = time(NULL);
5882 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5883 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5884 now-server.lastfsync > 1))
5885 {
5886 fsync(server.appendfd); /* Let's try to get this data on the disk */
5887 server.lastfsync = now;
5888 }
5889 }
5890
5891 /* In Redis commands are always executed in the context of a client, so in
5892 * order to load the append only file we need to create a fake client. */
5893 static struct redisClient *createFakeClient(void) {
5894 struct redisClient *c = zmalloc(sizeof(*c));
5895
5896 selectDb(c,0);
5897 c->fd = -1;
5898 c->querybuf = sdsempty();
5899 c->argc = 0;
5900 c->argv = NULL;
5901 c->flags = 0;
5902 /* We set the fake client as a slave waiting for the synchronization
5903 * so that Redis will not try to send replies to this client. */
5904 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5905 c->reply = listCreate();
5906 listSetFreeMethod(c->reply,decrRefCount);
5907 listSetDupMethod(c->reply,dupClientReplyValue);
5908 return c;
5909 }
5910
5911 static void freeFakeClient(struct redisClient *c) {
5912 sdsfree(c->querybuf);
5913 listRelease(c->reply);
5914 zfree(c);
5915 }
5916
5917 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5918 * error (the append only file is zero-length) REDIS_ERR is returned. On
5919 * fatal error an error message is logged and the program exists. */
5920 int loadAppendOnlyFile(char *filename) {
5921 struct redisClient *fakeClient;
5922 FILE *fp = fopen(filename,"r");
5923 struct redis_stat sb;
5924
5925 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5926 return REDIS_ERR;
5927
5928 if (fp == NULL) {
5929 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5930 exit(1);
5931 }
5932
5933 fakeClient = createFakeClient();
5934 while(1) {
5935 int argc, j;
5936 unsigned long len;
5937 robj **argv;
5938 char buf[128];
5939 sds argsds;
5940 struct redisCommand *cmd;
5941
5942 if (fgets(buf,sizeof(buf),fp) == NULL) {
5943 if (feof(fp))
5944 break;
5945 else
5946 goto readerr;
5947 }
5948 if (buf[0] != '*') goto fmterr;
5949 argc = atoi(buf+1);
5950 argv = zmalloc(sizeof(robj*)*argc);
5951 for (j = 0; j < argc; j++) {
5952 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5953 if (buf[0] != '$') goto fmterr;
5954 len = strtol(buf+1,NULL,10);
5955 argsds = sdsnewlen(NULL,len);
5956 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5957 argv[j] = createObject(REDIS_STRING,argsds);
5958 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5959 }
5960
5961 /* Command lookup */
5962 cmd = lookupCommand(argv[0]->ptr);
5963 if (!cmd) {
5964 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5965 exit(1);
5966 }
5967 /* Try object sharing and encoding */
5968 if (server.shareobjects) {
5969 int j;
5970 for(j = 1; j < argc; j++)
5971 argv[j] = tryObjectSharing(argv[j]);
5972 }
5973 if (cmd->flags & REDIS_CMD_BULK)
5974 tryObjectEncoding(argv[argc-1]);
5975 /* Run the command in the context of a fake client */
5976 fakeClient->argc = argc;
5977 fakeClient->argv = argv;
5978 cmd->proc(fakeClient);
5979 /* Discard the reply objects list from the fake client */
5980 while(listLength(fakeClient->reply))
5981 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5982 /* Clean up, ready for the next command */
5983 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5984 zfree(argv);
5985 }
5986 fclose(fp);
5987 freeFakeClient(fakeClient);
5988 return REDIS_OK;
5989
5990 readerr:
5991 if (feof(fp)) {
5992 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5993 } else {
5994 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5995 }
5996 exit(1);
5997 fmterr:
5998 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5999 exit(1);
6000 }
6001
6002 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6003 static int fwriteBulk(FILE *fp, robj *obj) {
6004 char buf[128];
6005 obj = getDecodedObject(obj);
6006 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6007 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6008 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6009 goto err;
6010 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6011 decrRefCount(obj);
6012 return 1;
6013 err:
6014 decrRefCount(obj);
6015 return 0;
6016 }
6017
/* Write the double 'd' to 'fp' in the bulk format
 * $<count>\r\n<payload>\r\n, where the payload is the value formatted
 * with "%.17g". Returns 1 on success and 0 if a write fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128];
    char header[128];
    size_t digits;

    /* The payload buffer already carries the trailing CRLF, which is not
     * part of the announced length. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    digits = strlen(payload)-2;
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)digits);

    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,strlen(payload),1,fp) == 0)
        return 0;
    return 1;
}
6028
/* Write the long 'l' to 'fp' in the bulk format
 * $<count>\r\n<payload>\r\n, where the payload is the value in decimal.
 * Returns 1 on success and 0 if a write fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128];
    char header[128];
    size_t digits;

    /* The payload buffer already carries the trailing CRLF, which is not
     * part of the announced length. */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    digits = strlen(payload)-2;
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)digits);

    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,strlen(payload),1,fp) == 0)
        return 0;
    return 1;
}
6039
6040 /* Write a sequence of commands able to fully rebuild the dataset into
6041 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6042 static int rewriteAppendOnlyFile(char *filename) {
6043 dictIterator *di = NULL;
6044 dictEntry *de;
6045 FILE *fp;
6046 char tmpfile[256];
6047 int j;
6048 time_t now = time(NULL);
6049
6050 /* Note that we have to use a different temp name here compared to the
6051 * one used by rewriteAppendOnlyFileBackground() function. */
6052 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6053 fp = fopen(tmpfile,"w");
6054 if (!fp) {
6055 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6056 return REDIS_ERR;
6057 }
6058 for (j = 0; j < server.dbnum; j++) {
6059 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6060 redisDb *db = server.db+j;
6061 dict *d = db->dict;
6062 if (dictSize(d) == 0) continue;
6063 di = dictGetIterator(d);
6064 if (!di) {
6065 fclose(fp);
6066 return REDIS_ERR;
6067 }
6068
6069 /* SELECT the new DB */
6070 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6071 if (fwriteBulkLong(fp,j) == 0) goto werr;
6072
6073 /* Iterate this DB writing every entry */
6074 while((de = dictNext(di)) != NULL) {
6075 robj *key = dictGetEntryKey(de);
6076 robj *o = dictGetEntryVal(de);
6077 time_t expiretime = getExpire(db,key);
6078
6079 /* Save the key and associated value */
6080 if (o->type == REDIS_STRING) {
6081 /* Emit a SET command */
6082 char cmd[]="*3\r\n$3\r\nSET\r\n";
6083 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6084 /* Key and value */
6085 if (fwriteBulk(fp,key) == 0) goto werr;
6086 if (fwriteBulk(fp,o) == 0) goto werr;
6087 } else if (o->type == REDIS_LIST) {
6088 /* Emit the RPUSHes needed to rebuild the list */
6089 list *list = o->ptr;
6090 listNode *ln;
6091
6092 listRewind(list);
6093 while((ln = listYield(list))) {
6094 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6095 robj *eleobj = listNodeValue(ln);
6096
6097 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6098 if (fwriteBulk(fp,key) == 0) goto werr;
6099 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6100 }
6101 } else if (o->type == REDIS_SET) {
6102 /* Emit the SADDs needed to rebuild the set */
6103 dict *set = o->ptr;
6104 dictIterator *di = dictGetIterator(set);
6105 dictEntry *de;
6106
6107 while((de = dictNext(di)) != NULL) {
6108 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6109 robj *eleobj = dictGetEntryKey(de);
6110
6111 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6112 if (fwriteBulk(fp,key) == 0) goto werr;
6113 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6114 }
6115 dictReleaseIterator(di);
6116 } else if (o->type == REDIS_ZSET) {
6117 /* Emit the ZADDs needed to rebuild the sorted set */
6118 zset *zs = o->ptr;
6119 dictIterator *di = dictGetIterator(zs->dict);
6120 dictEntry *de;
6121
6122 while((de = dictNext(di)) != NULL) {
6123 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6124 robj *eleobj = dictGetEntryKey(de);
6125 double *score = dictGetEntryVal(de);
6126
6127 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6128 if (fwriteBulk(fp,key) == 0) goto werr;
6129 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6130 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6131 }
6132 dictReleaseIterator(di);
6133 } else {
6134 redisAssert(0 != 0);
6135 }
6136 /* Save the expire time */
6137 if (expiretime != -1) {
6138 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6139 /* If this key is already expired skip it */
6140 if (expiretime < now) continue;
6141 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6142 if (fwriteBulk(fp,key) == 0) goto werr;
6143 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6144 }
6145 }
6146 dictReleaseIterator(di);
6147 }
6148
6149 /* Make sure data will not remain on the OS's output buffers */
6150 fflush(fp);
6151 fsync(fileno(fp));
6152 fclose(fp);
6153
6154 /* Use RENAME to make sure the DB file is changed atomically only
6155 * if the generate DB file is ok. */
6156 if (rename(tmpfile,filename) == -1) {
6157 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6158 unlink(tmpfile);
6159 return REDIS_ERR;
6160 }
6161 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6162 return REDIS_OK;
6163
6164 werr:
6165 fclose(fp);
6166 unlink(tmpfile);
6167 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6168 if (di) dictReleaseIterator(di);
6169 return REDIS_ERR;
6170 }
6171
6172 /* This is how rewriting of the append only file in background works:
6173 *
6174 * 1) The user calls BGREWRITEAOF
6175 * 2) Redis calls this function, that forks():
6176 * 2a) the child rewrite the append only file in a temp file.
6177 * 2b) the parent accumulates differences in server.bgrewritebuf.
6178 * 3) When the child finished '2a' exists.
6179 * 4) The parent will trap the exit code, if it's OK, will append the
6180 * data accumulated into server.bgrewritebuf into the temp file, and
6181 * finally will rename(2) the temp file in the actual file name.
6182 * The the new file is reopened as the new append only file. Profit!
6183 */
6184 static int rewriteAppendOnlyFileBackground(void) {
6185 pid_t childpid;
6186
6187 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6188 if ((childpid = fork()) == 0) {
6189 /* Child */
6190 char tmpfile[256];
6191 close(server.fd);
6192
6193 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6194 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6195 exit(0);
6196 } else {
6197 exit(1);
6198 }
6199 } else {
6200 /* Parent */
6201 if (childpid == -1) {
6202 redisLog(REDIS_WARNING,
6203 "Can't rewrite append only file in background: fork: %s",
6204 strerror(errno));
6205 return REDIS_ERR;
6206 }
6207 redisLog(REDIS_NOTICE,
6208 "Background append only file rewriting started by pid %d",childpid);
6209 server.bgrewritechildpid = childpid;
6210 /* We set appendseldb to -1 in order to force the next call to the
6211 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6212 * accumulated by the parent into server.bgrewritebuf will start
6213 * with a SELECT statement and it will be safe to merge. */
6214 server.appendseldb = -1;
6215 return REDIS_OK;
6216 }
6217 return REDIS_OK; /* unreached */
6218 }
6219
6220 static void bgrewriteaofCommand(redisClient *c) {
6221 if (server.bgrewritechildpid != -1) {
6222 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6223 return;
6224 }
6225 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6226 char *status = "+Background append only file rewriting started\r\n";
6227 addReplySds(c,sdsnew(status));
6228 } else {
6229 addReply(c,shared.err);
6230 }
6231 }
6232
/* Remove the temp file "temp-rewriteaof-bg-<childpid>.aof", the name used by
 * the background rewrite child with that pid. The unlink(2) result is
 * ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
6239
6240 /* ================================= Debugging ============================== */
6241
6242 static void debugCommand(redisClient *c) {
6243 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6244 *((char*)-1) = 'x';
6245 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6246 if (rdbSave(server.dbfilename) != REDIS_OK) {
6247 addReply(c,shared.err);
6248 return;
6249 }
6250 emptyDb();
6251 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6252 addReply(c,shared.err);
6253 return;
6254 }
6255 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6256 addReply(c,shared.ok);
6257 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6258 emptyDb();
6259 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6260 addReply(c,shared.err);
6261 return;
6262 }
6263 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6264 addReply(c,shared.ok);
6265 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6266 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6267 robj *key, *val;
6268
6269 if (!de) {
6270 addReply(c,shared.nokeyerr);
6271 return;
6272 }
6273 key = dictGetEntryKey(de);
6274 val = dictGetEntryVal(de);
6275 addReplySds(c,sdscatprintf(sdsempty(),
6276 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6277 (void*)key, key->refcount, (void*)val, val->refcount,
6278 val->encoding));
6279 } else {
6280 addReplySds(c,sdsnew(
6281 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6282 }
6283 }
6284
6285 static void _redisAssert(char *estr) {
6286 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6287 redisLog(REDIS_WARNING,"==> %s\n",estr);
6288 #ifdef HAVE_BACKTRACE
6289 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6290 *((char*)-1) = 'x';
6291 #endif
6292 }
6293
6294 /* =================================== Main! ================================ */
6295
6296 #ifdef __linux__
/* Return the first line of /proc/sys/vm/overcommit_memory converted with
 * atoi(), or -1 if the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp != NULL) {
        if (fgets(line,64,fp) != NULL) value = atoi(line);
        fclose(fp);
    }
    return value;
}
6310
6311 void linuxOvercommitMemoryWarning(void) {
6312 if (linuxOvercommitMemoryValue() == 0) {
6313 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6314 }
6315 }
6316 #endif /* __linux__ */
6317
6318 static void daemonize(void) {
6319 int fd;
6320 FILE *fp;
6321
6322 if (fork() != 0) exit(0); /* parent exits */
6323 printf("New pid: %d\n", getpid());
6324 setsid(); /* create a new session */
6325
6326 /* Every output goes to /dev/null. If Redis is daemonized but
6327 * the 'logfile' is set to 'stdout' in the configuration file
6328 * it will not log at all. */
6329 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6330 dup2(fd, STDIN_FILENO);
6331 dup2(fd, STDOUT_FILENO);
6332 dup2(fd, STDERR_FILENO);
6333 if (fd > STDERR_FILENO) close(fd);
6334 }
6335 /* Try to write the pid file */
6336 fp = fopen(server.pidfile,"w");
6337 if (fp) {
6338 fprintf(fp,"%d\n",getpid());
6339 fclose(fp);
6340 }
6341 }
6342
6343 int main(int argc, char **argv) {
6344 initServerConfig();
6345 if (argc == 2) {
6346 resetServerSaveParams();
6347 loadServerConfig(argv[1]);
6348 } else if (argc > 2) {
6349 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6350 exit(1);
6351 } else {
6352 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6353 }
6354 if (server.daemonize) daemonize();
6355 initServer();
6356 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6357 #ifdef __linux__
6358 linuxOvercommitMemoryWarning();
6359 #endif
6360 if (server.appendonly) {
6361 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6362 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6363 } else {
6364 if (rdbLoad(server.dbfilename) == REDIS_OK)
6365 redisLog(REDIS_NOTICE,"DB loaded from disk");
6366 }
6367 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6368 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6369 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6370 aeMain(server.el);
6371 aeDeleteEventLoop(server.el);
6372 return 0;
6373 }
6374
6375 /* ============================= Backtrace support ========================= */
6376
6377 #ifdef HAVE_BACKTRACE
6378 static char *findFuncName(void *pointer, unsigned long *offset);
6379
/* Return the instruction pointer stored in the signal context 'uc', or
 * NULL when there is no branch below for the platform being compiled.
 *
 * The x86-64 Linux branch is taken only when the C library headers expose
 * the REG_RIP register index as a macro (with glibc this is believed to
 * require _GNU_SOURCE -- verify against the build flags); when they don't,
 * NULL is returned like on any other unsupported platform. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
  #if __x86_64__
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__x86_64__) && defined(REG_RIP) /* Linux x86-64 */
    /* The compiler defines __x86_64__ (lowercase), and on this architecture
     * the instruction pointer lives in RIP: there is no REG_EIP. */
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
6405
6406 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6407 void *trace[100];
6408 char **messages = NULL;
6409 int i, trace_size = 0;
6410 unsigned long offset=0;
6411 ucontext_t *uc = (ucontext_t*) secret;
6412 sds infostring;
6413 REDIS_NOTUSED(info);
6414
6415 redisLog(REDIS_WARNING,
6416 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6417 infostring = genRedisInfoString();
6418 redisLog(REDIS_WARNING, "%s",infostring);
6419 /* It's not safe to sdsfree() the returned string under memory
6420 * corruption conditions. Let it leak as we are going to abort */
6421
6422 trace_size = backtrace(trace, 100);
6423 /* overwrite sigaction with caller's address */
6424 if (getMcontextEip(uc) != NULL) {
6425 trace[1] = getMcontextEip(uc);
6426 }
6427 messages = backtrace_symbols(trace, trace_size);
6428
6429 for (i=1; i<trace_size; ++i) {
6430 char *fn = findFuncName(trace[i], &offset), *p;
6431
6432 p = strchr(messages[i],'+');
6433 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6434 redisLog(REDIS_WARNING,"%s", messages[i]);
6435 } else {
6436 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6437 }
6438 }
6439 // free(messages); Don't call free() with possibly corrupted memory.
6440 exit(0);
6441 }
6442
6443 static void setupSigSegvAction(void) {
6444 struct sigaction act;
6445
6446 sigemptyset (&act.sa_mask);
6447 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6448 * is used. Otherwise, sa_handler is used */
6449 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6450 act.sa_sigaction = segvHandler;
6451 sigaction (SIGSEGV, &act, NULL);
6452 sigaction (SIGBUS, &act, NULL);
6453 sigaction (SIGFPE, &act, NULL);
6454 sigaction (SIGILL, &act, NULL);
6455 sigaction (SIGBUS, &act, NULL);
6456 return;
6457 }
6458
6459 #include "staticsymbols.h"
6460 /* This function try to convert a pointer into a function name. It's used in
6461 * oreder to provide a backtrace under segmentation fault that's able to
6462 * display functions declared as static (otherwise the backtrace is useless). */
6463 static char *findFuncName(void *pointer, unsigned long *offset){
6464 int i, ret = -1;
6465 unsigned long off, minoff = 0;
6466
6467 /* Try to match against the Symbol with the smallest offset */
6468 for (i=0; symsTable[i].pointer; i++) {
6469 unsigned long lp = (unsigned long) pointer;
6470
6471 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6472 off=lp-symsTable[i].pointer;
6473 if (ret < 0 || off < minoff) {
6474 minoff=off;
6475 ret=i;
6476 }
6477 }
6478 }
6479 if (ret == -1) return NULL;
6480 *offset = minoff;
6481 return symsTable[ret].name;
6482 }
6483 #else /* HAVE_BACKTRACE */
/* Without backtrace support there is no crash handler to install. */
static void setupSigSegvAction(void) {
    /* nothing to do */
}
6486 #endif /* HAVE_BACKTRACE */
6487
6488
6489
6490 /* The End */
6491
6492
6493