]> git.saurik.com Git - redis.git/blob - redis.c
3b86e0847298b0b2f1c872d11f6b54add3760b13
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.3.0"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77 /* Error codes */
78 #define REDIS_OK 0
79 #define REDIS_ERR -1
80
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
99
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103 /* Command flags */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
111
112 /* Object types */
113 #define REDIS_STRING 0
114 #define REDIS_LIST 1
115 #define REDIS_SET 2
116 #define REDIS_ZSET 3
117 #define REDIS_HASH 4
118
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
127
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
131 *
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
138 *
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
146
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
155 /* Client flags */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160 #define REDIS_MULTI 16 /* This client is in a MULTI context */
161
162 /* Slave replication state - slave side */
163 #define REDIS_REPL_NONE 0 /* No active replication */
164 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
165 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
166
167 /* Slave replication state - from the point of view of master
168 * Note that in SEND_BULK and ONLINE state the slave receives new updates
169 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
170 * to start the next background saving in order to send updates to it. */
171 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
172 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
173 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
174 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
175
176 /* List related stuff */
177 #define REDIS_HEAD 0
178 #define REDIS_TAIL 1
179
180 /* Sort operations */
181 #define REDIS_SORT_GET 0
182 #define REDIS_SORT_ASC 1
183 #define REDIS_SORT_DESC 2
184 #define REDIS_SORTKEY_MAX 1024
185
186 /* Log levels */
187 #define REDIS_DEBUG 0
188 #define REDIS_NOTICE 1
189 #define REDIS_WARNING 2
190
191 /* Anti-warning macro... */
192 #define REDIS_NOTUSED(V) ((void) V)
193
194 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
195 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
196
197 /* Append only defines */
198 #define APPENDFSYNC_NO 0
199 #define APPENDFSYNC_ALWAYS 1
200 #define APPENDFSYNC_EVERYSEC 2
201
202 /* We can print the stacktrace, so our assert is defined this way: */
203 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
204 static void _redisAssert(char *estr);
205
206 /*================================= Data types ============================== */
207
208 /* A Redis object: a typed, reference-counted wrapper around a value
 * pointer. The object type defines in this file are REDIS_STRING,
 * REDIS_LIST, REDIS_SET, REDIS_ZSET and REDIS_HASH. */
209 typedef struct redisObject {
210 void *ptr; /* payload; the dict key helpers in this file read it as an sds */
211 unsigned char type; /* set to REDIS_STRING by initStaticStringObject() */
212 unsigned char encoding; /* set to REDIS_ENCODING_RAW by initStaticStringObject() */
213 unsigned char notused[2]; /* unused, as the name says */
214 int refcount; /* reference count; set to 1 by initStaticStringObject() */
215 } robj;
216
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * _var is the object itself (not a pointer to it), _ptr the value stored in
 * its 'ptr' field. The expansion is a single statement WITHOUT a trailing
 * semicolon, so "initStaticStringObject(o,p);" can be used as the body of an
 * unbraced if/else. Both arguments are parenthesized so that expressions
 * such as *objptr can be passed safely. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
} while(0)
227
/* State of one database. server.db is an array of these, indexed from 0 to
 * server.dbnum-1 (see tryResizeHashTables()). */
228 typedef struct redisDb {
229 dict *dict; /* hash table shrunk by tryResizeHashTables() when too sparse */
230 dict *expires; /* second table, shrunk the same way; presumably key -> expire time (see setExpire/getExpire prototypes) - TODO confirm */
231 int id; /* presumably the index of this database - TODO confirm */
232 } redisDb;
233
234 /* Client MULTI/EXEC state: one command held for a client.
 * NOTE(review): presumably filled in by queueMultiCommand() - confirm. */
235 typedef struct multiCmd {
236 robj **argv; /* argument vector of the command */
237 int argc; /* presumably the number of entries in argv - TODO confirm */
238 struct redisCommand *cmd; /* the command description, as passed to queueMultiCommand() */
239 } multiCmd;
240
/* Set of multiCmd entries belonging to one client; embedded in redisClient
 * as the 'mstate' field. */
241 typedef struct multiState {
242 multiCmd *commands; /* Array of MULTI commands */
243 int count; /* Total number of MULTI commands */
244 } multiState;
245
246 /* With multiplexing we need to keep per-client state.
247 * Clients are kept in a linked list (server.clients, walked for example
 * by closeTimedoutClients()). */
248 typedef struct redisClient {
249 int fd;
250 redisDb *db;
251 int dictid;
252 sds querybuf;
253 robj **argv, **mbargv;
254 int argc, mbargc;
255 int bulklen; /* bulk read len. -1 if not in bulk read mode */
256 int multibulk; /* multi bulk command format active */
257 list *reply;
258 int sentlen;
259 time_t lastinteraction; /* time of the last interaction, used for timeout
 * (compared against server.maxidletime) */
260 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
261 /* REDIS_MULTI */
262 int slaveseldb; /* slave selected db, if this client is a slave */
263 int authenticated; /* when requirepass is non-NULL */
264 int replstate; /* replication state if this is a slave */
265 int repldbfd; /* replication DB file descriptor */
266 long repldboff; /* replication DB file offset */
267 off_t repldbsize; /* replication DB file size */
268 multiState mstate; /* MULTI/EXEC state */
269 } redisClient;
270
271 struct saveparam {
272 time_t seconds;
273 int changes;
274 };
275
276 /* Global server state structure (the single instance is the file-scope
 * variable 'server') */
277 struct redisServer {
278 int port;
279 int fd;
280 redisDb *db; /* array of 'dbnum' databases, see tryResizeHashTables() */
281 dict *sharingpool;
282 unsigned int sharingpoolsize;
283 long long dirty; /* changes to DB from the last save; reset to 0 by
 * backgroundSaveDoneHandler() on success */
284 list *clients; /* list of redisClient, see closeTimedoutClients() */
285 list *slaves, *monitors;
286 char neterr[ANET_ERR_LEN];
287 aeEventLoop *el;
288 int cronloops; /* number of times the cron function ran */
289 list *objfreelist; /* A list of freed objects to avoid malloc() */
290 time_t lastsave; /* Unix time of the last successful save */
291 size_t usedmemory; /* Used memory in megabytes */
292 /* Fields used only for stats */
293 time_t stat_starttime; /* server start time */
294 long long stat_numcommands; /* number of processed commands */
295 long long stat_numconnections; /* number of connections received */
296 /* Configuration */
297 int verbosity; /* redisLog() only writes messages with level >= verbosity */
298 int glueoutputbuf;
299 int maxidletime; /* idle seconds after which closeTimedoutClients() frees a client */
300 int dbnum; /* number of entries in 'db' */
301 int daemonize;
302 int appendonly;
303 int appendfsync; /* presumably one of the APPENDFSYNC_* defines - TODO confirm */
304 time_t lastfsync;
305 int appendfd;
306 int appendseldb;
307 char *pidfile;
308 pid_t bgsavechildpid; /* set to -1 by backgroundSaveDoneHandler() */
309 pid_t bgrewritechildpid; /* used to build the "temp-rewriteaof-bg-<pid>.aof" name */
310 sds bgrewritebuf; /* buffer kept by the parent during append only rewrite */
311 struct saveparam *saveparams;
312 int saveparamslen;
313 char *logfile; /* NULL means redisLog() writes to stdout */
314 char *bindaddr;
315 char *dbfilename;
316 char *appendfilename;
317 char *requirepass;
318 int shareobjects;
319 int rdbcompression;
320 /* Replication related */
321 int isslave;
322 char *masterauth;
323 char *masterhost;
324 int masterport;
325 redisClient *master; /* client that is master for this slave */
326 int replstate;
327 unsigned int maxclients;
328 unsigned long maxmemory;
329 /* Sort parameters - qsort_r() is only available under BSD so we
330 * have to keep this state global, in order to pass it to sortCompare() */
331 int sort_desc;
332 int sort_alpha;
333 int sort_bypattern;
334 };
335
/* Signature of a command implementation: it receives the calling client. */
336 typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (cmdTable), initialized positionally as
 * {name, proc, arity, flags}. */
337 struct redisCommand {
338 char *name; /* command name, e.g. "get"; NULL in the last cmdTable entry */
339 redisCommandProc *proc; /* function implementing the command */
340 int arity; /* cmdTable uses both positive and negative values; presumably a negative value means "at least -arity arguments" - TODO confirm */
341 int flags; /* OR of the REDIS_CMD_* flags */
342 };
343
344 struct redisFunctionSym {
345 char *name;
346 unsigned long pointer;
347 };
348
349 typedef struct _redisSortObject {
350 robj *obj;
351 union {
352 double score;
353 robj *cmpobj;
354 } u;
355 } redisSortObject;
356
357 typedef struct _redisSortOperation {
358 int type;
359 robj *pattern;
360 } redisSortOperation;
361
362 /* ZSETs use a specialized version of Skiplists */
363
364 typedef struct zskiplistNode {
365 struct zskiplistNode **forward;
366 struct zskiplistNode *backward;
367 double score;
368 robj *obj;
369 } zskiplistNode;
370
371 typedef struct zskiplist {
372 struct zskiplistNode *header, *tail;
373 unsigned long length;
374 int level;
375 } zskiplist;
376
377 typedef struct zset {
378 dict *dict;
379 zskiplist *zsl;
380 } zset;
381
382 /* Our shared "common" objects */
383
384 struct sharedObjectsStruct {
385 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
386 *colon, *nullbulk, *nullmultibulk, *queued,
387 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
388 *outofrangeerr, *plus,
389 *select0, *select1, *select2, *select3, *select4,
390 *select5, *select6, *select7, *select8, *select9;
391 } shared;
392
393 /* Global vars that are actually used as constants. The following double
394 * values are used for double on-disk serialization, and are initialized
395 * at runtime to avoid strange compiler optimizations. */
396
397 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
398
399 /*================================ Prototypes =============================== */
400
401 static void freeStringObject(robj *o);
402 static void freeListObject(robj *o);
403 static void freeSetObject(robj *o);
404 static void decrRefCount(void *o);
405 static robj *createObject(int type, void *ptr);
406 static void freeClient(redisClient *c);
407 static int rdbLoad(char *filename);
408 static void addReply(redisClient *c, robj *obj);
409 static void addReplySds(redisClient *c, sds s);
410 static void incrRefCount(robj *o);
411 static int rdbSaveBackground(char *filename);
412 static robj *createStringObject(char *ptr, size_t len);
413 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
414 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
415 static int syncWithMaster(void);
416 static robj *tryObjectSharing(robj *o);
417 static int tryObjectEncoding(robj *o);
418 static robj *getDecodedObject(robj *o);
419 static int removeExpire(redisDb *db, robj *key);
420 static int expireIfNeeded(redisDb *db, robj *key);
421 static int deleteIfVolatile(redisDb *db, robj *key);
422 static int deleteKey(redisDb *db, robj *key);
423 static time_t getExpire(redisDb *db, robj *key);
424 static int setExpire(redisDb *db, robj *key, time_t when);
425 static void updateSlavesWaitingBgsave(int bgsaveerr);
426 static void freeMemoryIfNeeded(void);
427 static int processCommand(redisClient *c);
428 static void setupSigSegvAction(void);
429 static void rdbRemoveTempFile(pid_t childpid);
430 static void aofRemoveTempFile(pid_t childpid);
431 static size_t stringObjectLen(robj *o);
432 static void processInputBuffer(redisClient *c);
433 static zskiplist *zslCreate(void);
434 static void zslFree(zskiplist *zsl);
435 static void zslInsert(zskiplist *zsl, double score, robj *obj);
436 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
437 static void initClientMultiState(redisClient *c);
438 static void freeClientMultiState(redisClient *c);
439 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
440
441 static void authCommand(redisClient *c);
442 static void pingCommand(redisClient *c);
443 static void echoCommand(redisClient *c);
444 static void setCommand(redisClient *c);
445 static void setnxCommand(redisClient *c);
446 static void getCommand(redisClient *c);
447 static void delCommand(redisClient *c);
448 static void existsCommand(redisClient *c);
449 static void incrCommand(redisClient *c);
450 static void decrCommand(redisClient *c);
451 static void incrbyCommand(redisClient *c);
452 static void decrbyCommand(redisClient *c);
453 static void selectCommand(redisClient *c);
454 static void randomkeyCommand(redisClient *c);
455 static void keysCommand(redisClient *c);
456 static void dbsizeCommand(redisClient *c);
457 static void lastsaveCommand(redisClient *c);
458 static void saveCommand(redisClient *c);
459 static void bgsaveCommand(redisClient *c);
460 static void bgrewriteaofCommand(redisClient *c);
461 static void shutdownCommand(redisClient *c);
462 static void moveCommand(redisClient *c);
463 static void renameCommand(redisClient *c);
464 static void renamenxCommand(redisClient *c);
465 static void lpushCommand(redisClient *c);
466 static void rpushCommand(redisClient *c);
467 static void lpopCommand(redisClient *c);
468 static void rpopCommand(redisClient *c);
469 static void llenCommand(redisClient *c);
470 static void lindexCommand(redisClient *c);
471 static void lrangeCommand(redisClient *c);
472 static void ltrimCommand(redisClient *c);
473 static void typeCommand(redisClient *c);
474 static void lsetCommand(redisClient *c);
475 static void saddCommand(redisClient *c);
476 static void sremCommand(redisClient *c);
477 static void smoveCommand(redisClient *c);
478 static void sismemberCommand(redisClient *c);
479 static void scardCommand(redisClient *c);
480 static void spopCommand(redisClient *c);
481 static void srandmemberCommand(redisClient *c);
482 static void sinterCommand(redisClient *c);
483 static void sinterstoreCommand(redisClient *c);
484 static void sunionCommand(redisClient *c);
485 static void sunionstoreCommand(redisClient *c);
486 static void sdiffCommand(redisClient *c);
487 static void sdiffstoreCommand(redisClient *c);
488 static void syncCommand(redisClient *c);
489 static void flushdbCommand(redisClient *c);
490 static void flushallCommand(redisClient *c);
491 static void sortCommand(redisClient *c);
492 static void lremCommand(redisClient *c);
493 static void rpoplpushcommand(redisClient *c);
494 static void infoCommand(redisClient *c);
495 static void mgetCommand(redisClient *c);
496 static void monitorCommand(redisClient *c);
497 static void expireCommand(redisClient *c);
498 static void expireatCommand(redisClient *c);
499 static void getsetCommand(redisClient *c);
500 static void ttlCommand(redisClient *c);
501 static void slaveofCommand(redisClient *c);
502 static void debugCommand(redisClient *c);
503 static void msetCommand(redisClient *c);
504 static void msetnxCommand(redisClient *c);
505 static void zaddCommand(redisClient *c);
506 static void zincrbyCommand(redisClient *c);
507 static void zrangeCommand(redisClient *c);
508 static void zrangebyscoreCommand(redisClient *c);
509 static void zrevrangeCommand(redisClient *c);
510 static void zcardCommand(redisClient *c);
511 static void zremCommand(redisClient *c);
512 static void zscoreCommand(redisClient *c);
513 static void zremrangebyscoreCommand(redisClient *c);
514 static void multiCommand(redisClient *c);
515 static void execCommand(redisClient *c);
516 static void aofsyncCommand(redisClient *c);
517
518 /*================================= Globals ================================= */
519
520 /* Global vars */
521 static struct redisServer server; /* server global state */
522 static struct redisCommand cmdTable[] = {
523 {"get",getCommand,2,REDIS_CMD_INLINE},
524 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
525 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
526 {"del",delCommand,-2,REDIS_CMD_INLINE},
527 {"exists",existsCommand,2,REDIS_CMD_INLINE},
528 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
531 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
532 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
533 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
534 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
535 {"llen",llenCommand,2,REDIS_CMD_INLINE},
536 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
537 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
538 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
539 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
540 {"lrem",lremCommand,4,REDIS_CMD_BULK},
541 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
542 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
543 {"srem",sremCommand,3,REDIS_CMD_BULK},
544 {"smove",smoveCommand,4,REDIS_CMD_BULK},
545 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
546 {"scard",scardCommand,2,REDIS_CMD_INLINE},
547 {"spop",spopCommand,2,REDIS_CMD_INLINE},
548 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
549 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
550 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
551 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
552 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
553 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
554 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
555 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
556 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
557 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
558 {"zrem",zremCommand,3,REDIS_CMD_BULK},
559 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
560 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
561 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
562 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
563 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
564 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
565 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
566 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
567 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
568 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
569 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
570 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
571 {"select",selectCommand,2,REDIS_CMD_INLINE},
572 {"move",moveCommand,3,REDIS_CMD_INLINE},
573 {"rename",renameCommand,3,REDIS_CMD_INLINE},
574 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
575 {"expire",expireCommand,3,REDIS_CMD_INLINE},
576 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
577 {"keys",keysCommand,2,REDIS_CMD_INLINE},
578 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
579 {"auth",authCommand,2,REDIS_CMD_INLINE},
580 {"ping",pingCommand,1,REDIS_CMD_INLINE},
581 {"echo",echoCommand,2,REDIS_CMD_BULK},
582 {"save",saveCommand,1,REDIS_CMD_INLINE},
583 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
584 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
585 {"aofsync",aofsyncCommand,1,REDIS_CMD_INLINE},
586 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
587 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
588 {"type",typeCommand,2,REDIS_CMD_INLINE},
589 {"multi",multiCommand,1,REDIS_CMD_INLINE},
590 {"exec",execCommand,1,REDIS_CMD_INLINE},
591 {"sync",syncCommand,1,REDIS_CMD_INLINE},
592 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
593 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
594 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
595 {"info",infoCommand,1,REDIS_CMD_INLINE},
596 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
597 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
598 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
599 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
600 {NULL,NULL,0,0}
601 };
602
603 /*============================ Utility functions ============================ */
604
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be
 * NUL-terminated: no byte past the given lengths is read.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte out of a set; [^abc] negates the set, a-z is a range
 *            and \x inside the set is the literal byte x (always compared
 *            exactly, even with 'nocase')
 *   \x       the literal byte x
 *
 * A '[' set that is not closed by ']' ends at the end of the pattern.
 * If 'nocase' is non-zero bytes are compared after tolower().
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        /* Every pattern element but '*' consumes exactly one byte of the
         * string: with nothing left to consume there is no match, and we
         * must not read string[0]. */
        if (stringLen == 0 && pattern[0] != '*')
            return 0; /* no match */

        switch(pattern[0]) {
        case '*':
            /* Collapse runs of '*', peeking only inside the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try to match the rest of the pattern at every position. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back on the last valid byte
                     * so that the common increment below ends the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped byte. A '\' that is the last byte of the
                     * pattern is handled as a literal by the last branch. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* Skip the backslash and compare the next byte literally. A
             * trailing backslash is compared as an ordinary byte. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is over: only trailing '*' may remain. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
727
728 static void redisLog(int level, const char *fmt, ...) {
729 va_list ap;
730 FILE *fp;
731
732 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
733 if (!fp) return;
734
735 va_start(ap, fmt);
736 if (level >= server.verbosity) {
737 char *c = ".-*";
738 char buf[64];
739 time_t now;
740
741 now = time(NULL);
742 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
743 fprintf(fp,"%s %c ",buf,c[level]);
744 vfprintf(fp, fmt, ap);
745 fprintf(fp,"\n");
746 fflush(fp);
747 }
748 va_end(ap);
749
750 if (server.logfile) fclose(fp);
751 }
752
753 /*====================== Hash table type implementation ==================== */
754
755 /* This is a hash table type that uses the SDS dynamic strings library as
756 * keys and redis objects as values (objects can hold SDS strings,
757 * lists, sets). */
758
/* Dictionary value destructor that simply hands the value to zfree().
 * The per-dictionary private data is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    zfree(val);
}
764
765 static int sdsDictKeyCompare(void *privdata, const void *key1,
766 const void *key2)
767 {
768 int l1,l2;
769 DICT_NOTUSED(privdata);
770
771 l1 = sdslen((sds)key1);
772 l2 = sdslen((sds)key2);
773 if (l1 != l2) return 0;
774 return memcmp(key1, key2, l1) == 0;
775 }
776
/* Dictionary destructor for entries holding Redis objects: the entry is
 * released by passing it to decrRefCount(). The private data is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
783
784 static int dictObjKeyCompare(void *privdata, const void *key1,
785 const void *key2)
786 {
787 const robj *o1 = key1, *o2 = key2;
788 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
789 }
790
791 static unsigned int dictObjHash(const void *key) {
792 const robj *o = key;
793 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
794 }
795
796 static int dictEncObjKeyCompare(void *privdata, const void *key1,
797 const void *key2)
798 {
799 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
800 int cmp;
801
802 o1 = getDecodedObject(o1);
803 o2 = getDecodedObject(o2);
804 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
805 decrRefCount(o1);
806 decrRefCount(o2);
807 return cmp;
808 }
809
810 static unsigned int dictEncObjHash(const void *key) {
811 robj *o = (robj*) key;
812
813 o = getDecodedObject(o);
814 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
815 decrRefCount(o);
816 return hash;
817 }
818
/* Dictionary type with Redis objects as keys and no values: keys are hashed
 * and compared through getDecodedObject() (see dictEncObjHash() and
 * dictEncObjKeyCompare()) and released with decrRefCount().
 * NOTE(review): going by the name this is the type used for sets - confirm. */
819 static dictType setDictType = {
820 dictEncObjHash, /* hash function */
821 NULL, /* key dup */
822 NULL, /* val dup */
823 dictEncObjKeyCompare, /* key compare */
824 dictRedisObjectDestructor, /* key destructor */
825 NULL /* val destructor */
826 };
827
/* Same key handling as setDictType, but every entry also owns a value that
 * is released with zfree() (see dictVanillaFree()).
 * NOTE(review): going by the name this is the type used for the 'dict'
 * field of struct zset - confirm. */
828 static dictType zsetDictType = {
829 dictEncObjHash, /* hash function */
830 NULL, /* key dup */
831 NULL, /* val dup */
832 dictEncObjKeyCompare, /* key compare */
833 dictRedisObjectDestructor, /* key destructor */
834 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
835 };
836
/* Dictionary type with Redis objects as both keys and values. Keys are
 * hashed and compared directly as sds strings (dictObjHash() and
 * dictObjKeyCompare(), no getDecodedObject() step); keys and values are
 * both released with decrRefCount(). */
837 static dictType hashDictType = {
838 dictObjHash, /* hash function */
839 NULL, /* key dup */
840 NULL, /* val dup */
841 dictObjKeyCompare, /* key compare */
842 dictRedisObjectDestructor, /* key destructor */
843 dictRedisObjectDestructor /* val destructor */
844 };
845
846 /* ========================= Random utility functions ======================= */
847
848 /* Redis generally does not try to recover from out of memory conditions
849 * when allocating objects or strings, it is not clear if it will be possible
850 * to report this condition to the client since the networking layer itself
851 * is based on heap allocation for send buffers, so we simply abort.
852 * At least the code will be simpler to read... */
853 static void oom(const char *msg) {
854 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
855 sleep(1);
856 abort();
857 }
858
859 /* ====================== Redis server networking stuff ===================== */
860 static void closeTimedoutClients(void) {
861 redisClient *c;
862 listNode *ln;
863 time_t now = time(NULL);
864
865 listRewind(server.clients);
866 while ((ln = listYield(server.clients)) != NULL) {
867 c = listNodeValue(ln);
868 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
869 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
870 (now - c->lastinteraction > server.maxidletime)) {
871 redisLog(REDIS_DEBUG,"Closing idle client");
872 freeClient(c);
873 }
874 }
875 }
876
877 static int htNeedsResize(dict *dict) {
878 long long size, used;
879
880 size = dictSlots(dict);
881 used = dictSize(dict);
882 return (size && used && size > DICT_HT_INITIAL_SIZE &&
883 (used*100/size < REDIS_HT_MINFILL));
884 }
885
886 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
887 * we resize the hash table to save memory */
888 static void tryResizeHashTables(void) {
889 int j;
890
891 for (j = 0; j < server.dbnum; j++) {
892 if (htNeedsResize(server.db[j].dict)) {
893 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
894 dictResize(server.db[j].dict);
895 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
896 }
897 if (htNeedsResize(server.db[j].expires))
898 dictResize(server.db[j].expires);
899 }
900 }
901
902 /* A background saving child (BGSAVE) terminated its work. Handle this. */
903 void backgroundSaveDoneHandler(int statloc) {
904 int exitcode = WEXITSTATUS(statloc);
905 int bysignal = WIFSIGNALED(statloc);
906
907 if (!bysignal && exitcode == 0) {
908 redisLog(REDIS_NOTICE,
909 "Background saving terminated with success");
910 server.dirty = 0;
911 server.lastsave = time(NULL);
912 } else if (!bysignal && exitcode != 0) {
913 redisLog(REDIS_WARNING, "Background saving error");
914 } else {
915 redisLog(REDIS_WARNING,
916 "Background saving terminated by signal");
917 rdbRemoveTempFile(server.bgsavechildpid);
918 }
919 server.bgsavechildpid = -1;
920 /* Possibly there are slaves waiting for a BGSAVE in order to be served
921 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
922 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
923 }
924
925 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
926 * Handle this. */
927 void backgroundRewriteDoneHandler(int statloc) {
928 int exitcode = WEXITSTATUS(statloc);
929 int bysignal = WIFSIGNALED(statloc);
930
931 if (!bysignal && exitcode == 0) {
932 int fd;
933 char tmpfile[256];
934
935 redisLog(REDIS_NOTICE,
936 "Background append only file rewriting terminated with success");
937 /* Now it's time to flush the differences accumulated by the parent */
938 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
939 fd = open(tmpfile,O_WRONLY|O_APPEND);
940 if (fd == -1) {
941 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
942 goto cleanup;
943 }
944 /* Flush our data... */
945 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
946 (signed) sdslen(server.bgrewritebuf)) {
947 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
948 close(fd);
949 goto cleanup;
950 }
951 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
952 /* Now our work is to rename the temp file into the stable file. And
953 * switch the file descriptor used by the server for append only. */
954 if (rename(tmpfile,server.appendfilename) == -1) {
955 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
956 close(fd);
957 goto cleanup;
958 }
959 /* Mission completed... almost */
960 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
961 if (server.appendfd != -1) {
962 /* If append only is actually enabled... */
963 close(server.appendfd);
964 server.appendfd = fd;
965 fsync(fd);
966 server.appendseldb = -1; /* Make sure it will issue SELECT */
967 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
968 } else {
969 /* If append only is disabled we just generate a dump in this
970 * format. Why not? */
971 close(fd);
972 }
973 } else if (!bysignal && exitcode != 0) {
974 redisLog(REDIS_WARNING, "Background append only file rewriting error");
975 } else {
976 redisLog(REDIS_WARNING,
977 "Background append only file rewriting terminated by signal");
978 }
979 cleanup:
980 sdsfree(server.bgrewritebuf);
981 server.bgrewritebuf = sdsempty();
982 aofRemoveTempFile(server.bgrewritechildpid);
983 server.bgrewritechildpid = -1;
984 }
985
986 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
987 int j, loops = server.cronloops++;
988 REDIS_NOTUSED(eventLoop);
989 REDIS_NOTUSED(id);
990 REDIS_NOTUSED(clientData);
991
992 /* Update the global state with the amount of used memory */
993 server.usedmemory = zmalloc_used_memory();
994
995 /* Show some info about non-empty databases */
996 for (j = 0; j < server.dbnum; j++) {
997 long long size, used, vkeys;
998
999 size = dictSlots(server.db[j].dict);
1000 used = dictSize(server.db[j].dict);
1001 vkeys = dictSize(server.db[j].expires);
1002 if (!(loops % 5) && (used || vkeys)) {
1003 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1004 /* dictPrintStats(server.dict); */
1005 }
1006 }
1007
1008 /* We don't want to resize the hash tables while a bacground saving
1009 * is in progress: the saving child is created using fork() that is
1010 * implemented with a copy-on-write semantic in most modern systems, so
1011 * if we resize the HT while there is the saving child at work actually
1012 * a lot of memory movements in the parent will cause a lot of pages
1013 * copied. */
1014 if (server.bgsavechildpid == -1) tryResizeHashTables();
1015
1016 /* Show information about connected clients */
1017 if (!(loops % 5)) {
1018 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1019 listLength(server.clients)-listLength(server.slaves),
1020 listLength(server.slaves),
1021 server.usedmemory,
1022 dictSize(server.sharingpool));
1023 }
1024
1025 /* Close connections of timedout clients */
1026 if (server.maxidletime && !(loops % 10))
1027 closeTimedoutClients();
1028
1029 /* Check if a background saving or AOF rewrite in progress terminated */
1030 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1031 int statloc;
1032 pid_t pid;
1033
1034 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1035 if (pid == server.bgsavechildpid) {
1036 backgroundSaveDoneHandler(statloc);
1037 } else {
1038 backgroundRewriteDoneHandler(statloc);
1039 }
1040 }
1041 } else {
1042 /* If there is not a background saving in progress check if
1043 * we have to save now */
1044 time_t now = time(NULL);
1045 for (j = 0; j < server.saveparamslen; j++) {
1046 struct saveparam *sp = server.saveparams+j;
1047
1048 if (server.dirty >= sp->changes &&
1049 now-server.lastsave > sp->seconds) {
1050 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1051 sp->changes, sp->seconds);
1052 rdbSaveBackground(server.dbfilename);
1053 break;
1054 }
1055 }
1056 }
1057
1058 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1059 * will use few CPU cycles if there are few expiring keys, otherwise
1060 * it will get more aggressive to avoid that too much memory is used by
1061 * keys that can be removed from the keyspace. */
1062 for (j = 0; j < server.dbnum; j++) {
1063 int expired;
1064 redisDb *db = server.db+j;
1065
1066 /* Continue to expire if at the end of the cycle more than 25%
1067 * of the keys were expired. */
1068 do {
1069 int num = dictSize(db->expires);
1070 time_t now = time(NULL);
1071
1072 expired = 0;
1073 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1074 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1075 while (num--) {
1076 dictEntry *de;
1077 time_t t;
1078
1079 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1080 t = (time_t) dictGetEntryVal(de);
1081 if (now > t) {
1082 deleteKey(db,dictGetEntryKey(de));
1083 expired++;
1084 }
1085 }
1086 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1087 }
1088
1089 /* Check if we should connect to a MASTER */
1090 if (server.replstate == REDIS_REPL_CONNECT) {
1091 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1092 if (syncWithMaster() == REDIS_OK) {
1093 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1094 }
1095 }
1096 return 1000;
1097 }
1098
1099 static void createSharedObjects(void) {
1100 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1101 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1102 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1103 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1104 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1105 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1106 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1107 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1108 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1109 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1110 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
1111 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1112 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1113 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1114 "-ERR no such key\r\n"));
1115 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1116 "-ERR syntax error\r\n"));
1117 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1118 "-ERR source and destination objects are the same\r\n"));
1119 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1120 "-ERR index out of range\r\n"));
1121 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1122 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1123 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1124 shared.select0 = createStringObject("select 0\r\n",10);
1125 shared.select1 = createStringObject("select 1\r\n",10);
1126 shared.select2 = createStringObject("select 2\r\n",10);
1127 shared.select3 = createStringObject("select 3\r\n",10);
1128 shared.select4 = createStringObject("select 4\r\n",10);
1129 shared.select5 = createStringObject("select 5\r\n",10);
1130 shared.select6 = createStringObject("select 6\r\n",10);
1131 shared.select7 = createStringObject("select 7\r\n",10);
1132 shared.select8 = createStringObject("select 8\r\n",10);
1133 shared.select9 = createStringObject("select 9\r\n",10);
1134 }
1135
1136 static void appendServerSaveParams(time_t seconds, int changes) {
1137 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1138 server.saveparams[server.saveparamslen].seconds = seconds;
1139 server.saveparams[server.saveparamslen].changes = changes;
1140 server.saveparamslen++;
1141 }
1142
1143 static void resetServerSaveParams() {
1144 zfree(server.saveparams);
1145 server.saveparams = NULL;
1146 server.saveparamslen = 0;
1147 }
1148
1149 static void initServerConfig() {
1150 server.dbnum = REDIS_DEFAULT_DBNUM;
1151 server.port = REDIS_SERVERPORT;
1152 server.verbosity = REDIS_DEBUG;
1153 server.maxidletime = REDIS_MAXIDLETIME;
1154 server.saveparams = NULL;
1155 server.logfile = NULL; /* NULL = log on standard output */
1156 server.bindaddr = NULL;
1157 server.glueoutputbuf = 1;
1158 server.daemonize = 0;
1159 server.appendonly = 0;
1160 server.appendfsync = APPENDFSYNC_ALWAYS;
1161 server.lastfsync = time(NULL);
1162 server.appendfd = -1;
1163 server.appendseldb = -1; /* Make sure the first time will not match */
1164 server.pidfile = "/var/run/redis.pid";
1165 server.dbfilename = "dump.rdb";
1166 server.appendfilename = "appendonly.aof";
1167 server.requirepass = NULL;
1168 server.shareobjects = 0;
1169 server.rdbcompression = 1;
1170 server.sharingpoolsize = 1024;
1171 server.maxclients = 0;
1172 server.maxmemory = 0;
1173 resetServerSaveParams();
1174
1175 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1176 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1177 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1178 /* Replication related */
1179 server.isslave = 0;
1180 server.masterauth = NULL;
1181 server.masterhost = NULL;
1182 server.masterport = 6379;
1183 server.master = NULL;
1184 server.replstate = REDIS_REPL_NONE;
1185
1186 /* Double constants initialization */
1187 R_Zero = 0.0;
1188 R_PosInf = 1.0/R_Zero;
1189 R_NegInf = -1.0/R_Zero;
1190 R_Nan = R_Zero/R_Zero;
1191 }
1192
1193 static void initServer() {
1194 int j;
1195
1196 signal(SIGHUP, SIG_IGN);
1197 signal(SIGPIPE, SIG_IGN);
1198 setupSigSegvAction();
1199
1200 server.clients = listCreate();
1201 server.slaves = listCreate();
1202 server.monitors = listCreate();
1203 server.objfreelist = listCreate();
1204 createSharedObjects();
1205 server.el = aeCreateEventLoop();
1206 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1207 server.sharingpool = dictCreate(&setDictType,NULL);
1208 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1209 if (server.fd == -1) {
1210 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1211 exit(1);
1212 }
1213 for (j = 0; j < server.dbnum; j++) {
1214 server.db[j].dict = dictCreate(&hashDictType,NULL);
1215 server.db[j].expires = dictCreate(&setDictType,NULL);
1216 server.db[j].id = j;
1217 }
1218 server.cronloops = 0;
1219 server.bgsavechildpid = -1;
1220 server.bgrewritechildpid = -1;
1221 server.bgrewritebuf = sdsempty();
1222 server.lastsave = time(NULL);
1223 server.dirty = 0;
1224 server.usedmemory = 0;
1225 server.stat_numcommands = 0;
1226 server.stat_numconnections = 0;
1227 server.stat_starttime = time(NULL);
1228 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1229
1230 if (server.appendonly) {
1231 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1232 if (server.appendfd == -1) {
1233 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1234 strerror(errno));
1235 exit(1);
1236 }
1237 }
1238 }
1239
1240 /* Empty the whole database */
1241 static long long emptyDb() {
1242 int j;
1243 long long removed = 0;
1244
1245 for (j = 0; j < server.dbnum; j++) {
1246 removed += dictSize(server.db[j].dict);
1247 dictEmpty(server.db[j].dict);
1248 dictEmpty(server.db[j].expires);
1249 }
1250 return removed;
1251 }
1252
/* Convert a "yes" / "no" string (case insensitive) into an integer.
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1258
1259 /* I agree, this is a very rudimental way to load a configuration...
1260 will improve later if the config gets more complex */
1261 static void loadServerConfig(char *filename) {
1262 FILE *fp;
1263 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1264 int linenum = 0;
1265 sds line = NULL;
1266
1267 if (filename[0] == '-' && filename[1] == '\0')
1268 fp = stdin;
1269 else {
1270 if ((fp = fopen(filename,"r")) == NULL) {
1271 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1272 exit(1);
1273 }
1274 }
1275
1276 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1277 sds *argv;
1278 int argc, j;
1279
1280 linenum++;
1281 line = sdsnew(buf);
1282 line = sdstrim(line," \t\r\n");
1283
1284 /* Skip comments and blank lines*/
1285 if (line[0] == '#' || line[0] == '\0') {
1286 sdsfree(line);
1287 continue;
1288 }
1289
1290 /* Split into arguments */
1291 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1292 sdstolower(argv[0]);
1293
1294 /* Execute config directives */
1295 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1296 server.maxidletime = atoi(argv[1]);
1297 if (server.maxidletime < 0) {
1298 err = "Invalid timeout value"; goto loaderr;
1299 }
1300 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1301 server.port = atoi(argv[1]);
1302 if (server.port < 1 || server.port > 65535) {
1303 err = "Invalid port"; goto loaderr;
1304 }
1305 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1306 server.bindaddr = zstrdup(argv[1]);
1307 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1308 int seconds = atoi(argv[1]);
1309 int changes = atoi(argv[2]);
1310 if (seconds < 1 || changes < 0) {
1311 err = "Invalid save parameters"; goto loaderr;
1312 }
1313 appendServerSaveParams(seconds,changes);
1314 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1315 if (chdir(argv[1]) == -1) {
1316 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1317 argv[1], strerror(errno));
1318 exit(1);
1319 }
1320 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1321 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1322 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1323 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1324 else {
1325 err = "Invalid log level. Must be one of debug, notice, warning";
1326 goto loaderr;
1327 }
1328 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1329 FILE *logfp;
1330
1331 server.logfile = zstrdup(argv[1]);
1332 if (!strcasecmp(server.logfile,"stdout")) {
1333 zfree(server.logfile);
1334 server.logfile = NULL;
1335 }
1336 if (server.logfile) {
1337 /* Test if we are able to open the file. The server will not
1338 * be able to abort just for this problem later... */
1339 logfp = fopen(server.logfile,"a");
1340 if (logfp == NULL) {
1341 err = sdscatprintf(sdsempty(),
1342 "Can't open the log file: %s", strerror(errno));
1343 goto loaderr;
1344 }
1345 fclose(logfp);
1346 }
1347 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1348 server.dbnum = atoi(argv[1]);
1349 if (server.dbnum < 1) {
1350 err = "Invalid number of databases"; goto loaderr;
1351 }
1352 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1353 server.maxclients = atoi(argv[1]);
1354 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1355 server.maxmemory = strtoll(argv[1], NULL, 10);
1356 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1357 server.masterhost = sdsnew(argv[1]);
1358 server.masterport = atoi(argv[2]);
1359 server.replstate = REDIS_REPL_CONNECT;
1360 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1361 server.masterauth = zstrdup(argv[1]);
1362 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1363 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1364 err = "argument must be 'yes' or 'no'"; goto loaderr;
1365 }
1366 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1367 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1368 err = "argument must be 'yes' or 'no'"; goto loaderr;
1369 }
1370 } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
1371 if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
1372 err = "argument must be 'yes' or 'no'"; goto loaderr;
1373 }
1374 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1375 server.sharingpoolsize = atoi(argv[1]);
1376 if (server.sharingpoolsize < 1) {
1377 err = "invalid object sharing pool size"; goto loaderr;
1378 }
1379 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1380 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1381 err = "argument must be 'yes' or 'no'"; goto loaderr;
1382 }
1383 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1384 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1385 err = "argument must be 'yes' or 'no'"; goto loaderr;
1386 }
1387 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1388 if (!strcasecmp(argv[1],"no")) {
1389 server.appendfsync = APPENDFSYNC_NO;
1390 } else if (!strcasecmp(argv[1],"always")) {
1391 server.appendfsync = APPENDFSYNC_ALWAYS;
1392 } else if (!strcasecmp(argv[1],"everysec")) {
1393 server.appendfsync = APPENDFSYNC_EVERYSEC;
1394 } else {
1395 err = "argument must be 'no', 'always' or 'everysec'";
1396 goto loaderr;
1397 }
1398 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1399 server.requirepass = zstrdup(argv[1]);
1400 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1401 server.pidfile = zstrdup(argv[1]);
1402 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1403 server.dbfilename = zstrdup(argv[1]);
1404 } else {
1405 err = "Bad directive or wrong number of arguments"; goto loaderr;
1406 }
1407 for (j = 0; j < argc; j++)
1408 sdsfree(argv[j]);
1409 zfree(argv);
1410 sdsfree(line);
1411 }
1412 if (fp != stdin) fclose(fp);
1413 return;
1414
1415 loaderr:
1416 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1417 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1418 fprintf(stderr, ">>> '%s'\n", line);
1419 fprintf(stderr, "%s\n", err);
1420 exit(1);
1421 }
1422
1423 static void freeClientArgv(redisClient *c) {
1424 int j;
1425
1426 for (j = 0; j < c->argc; j++)
1427 decrRefCount(c->argv[j]);
1428 for (j = 0; j < c->mbargc; j++)
1429 decrRefCount(c->mbargv[j]);
1430 c->argc = 0;
1431 c->mbargc = 0;
1432 }
1433
1434 static void freeClient(redisClient *c) {
1435 listNode *ln;
1436
1437 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1438 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1439 sdsfree(c->querybuf);
1440 listRelease(c->reply);
1441 freeClientArgv(c);
1442 close(c->fd);
1443 ln = listSearchKey(server.clients,c);
1444 redisAssert(ln != NULL);
1445 listDelNode(server.clients,ln);
1446 if (c->flags & REDIS_SLAVE) {
1447 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1448 close(c->repldbfd);
1449 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1450 ln = listSearchKey(l,c);
1451 redisAssert(ln != NULL);
1452 listDelNode(l,ln);
1453 }
1454 if (c->flags & REDIS_MASTER) {
1455 server.master = NULL;
1456 server.replstate = REDIS_REPL_CONNECT;
1457 }
1458 zfree(c->argv);
1459 zfree(c->mbargv);
1460 freeClientMultiState(c);
1461 zfree(c);
1462 }
1463
1464 #define GLUEREPLY_UP_TO (1024)
1465 static void glueReplyBuffersIfNeeded(redisClient *c) {
1466 int copylen = 0;
1467 char buf[GLUEREPLY_UP_TO];
1468 listNode *ln;
1469 robj *o;
1470
1471 listRewind(c->reply);
1472 while((ln = listYield(c->reply))) {
1473 int objlen;
1474
1475 o = ln->value;
1476 objlen = sdslen(o->ptr);
1477 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1478 memcpy(buf+copylen,o->ptr,objlen);
1479 copylen += objlen;
1480 listDelNode(c->reply,ln);
1481 } else {
1482 if (copylen == 0) return;
1483 break;
1484 }
1485 }
1486 /* Now the output buffer is empty, add the new single element */
1487 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1488 listAddNodeHead(c->reply,o);
1489 }
1490
1491 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1492 redisClient *c = privdata;
1493 int nwritten = 0, totwritten = 0, objlen;
1494 robj *o;
1495 REDIS_NOTUSED(el);
1496 REDIS_NOTUSED(mask);
1497
1498 /* Use writev() if we have enough buffers to send */
1499 if (!server.glueoutputbuf &&
1500 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1501 !(c->flags & REDIS_MASTER))
1502 {
1503 sendReplyToClientWritev(el, fd, privdata, mask);
1504 return;
1505 }
1506
1507 while(listLength(c->reply)) {
1508 if (server.glueoutputbuf && listLength(c->reply) > 1)
1509 glueReplyBuffersIfNeeded(c);
1510
1511 o = listNodeValue(listFirst(c->reply));
1512 objlen = sdslen(o->ptr);
1513
1514 if (objlen == 0) {
1515 listDelNode(c->reply,listFirst(c->reply));
1516 continue;
1517 }
1518
1519 if (c->flags & REDIS_MASTER) {
1520 /* Don't reply to a master */
1521 nwritten = objlen - c->sentlen;
1522 } else {
1523 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1524 if (nwritten <= 0) break;
1525 }
1526 c->sentlen += nwritten;
1527 totwritten += nwritten;
1528 /* If we fully sent the object on head go to the next one */
1529 if (c->sentlen == objlen) {
1530 listDelNode(c->reply,listFirst(c->reply));
1531 c->sentlen = 0;
1532 }
1533 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1534 * bytes, in a single threaded server it's a good idea to serve
1535 * other clients as well, even if a very large request comes from
1536 * super fast link that is always able to accept data (in real world
1537 * scenario think about 'KEYS *' against the loopback interfae) */
1538 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1539 }
1540 if (nwritten == -1) {
1541 if (errno == EAGAIN) {
1542 nwritten = 0;
1543 } else {
1544 redisLog(REDIS_DEBUG,
1545 "Error writing to client: %s", strerror(errno));
1546 freeClient(c);
1547 return;
1548 }
1549 }
1550 if (totwritten > 0) c->lastinteraction = time(NULL);
1551 if (listLength(c->reply) == 0) {
1552 c->sentlen = 0;
1553 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1554 }
1555 }
1556
1557 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1558 {
1559 redisClient *c = privdata;
1560 int nwritten = 0, totwritten = 0, objlen, willwrite;
1561 robj *o;
1562 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1563 int offset, ion = 0;
1564 REDIS_NOTUSED(el);
1565 REDIS_NOTUSED(mask);
1566
1567 listNode *node;
1568 while (listLength(c->reply)) {
1569 offset = c->sentlen;
1570 ion = 0;
1571 willwrite = 0;
1572
1573 /* fill-in the iov[] array */
1574 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1575 o = listNodeValue(node);
1576 objlen = sdslen(o->ptr);
1577
1578 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1579 break;
1580
1581 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1582 break; /* no more iovecs */
1583
1584 iov[ion].iov_base = ((char*)o->ptr) + offset;
1585 iov[ion].iov_len = objlen - offset;
1586 willwrite += objlen - offset;
1587 offset = 0; /* just for the first item */
1588 ion++;
1589 }
1590
1591 if(willwrite == 0)
1592 break;
1593
1594 /* write all collected blocks at once */
1595 if((nwritten = writev(fd, iov, ion)) < 0) {
1596 if (errno != EAGAIN) {
1597 redisLog(REDIS_DEBUG,
1598 "Error writing to client: %s", strerror(errno));
1599 freeClient(c);
1600 return;
1601 }
1602 break;
1603 }
1604
1605 totwritten += nwritten;
1606 offset = c->sentlen;
1607
1608 /* remove written robjs from c->reply */
1609 while (nwritten && listLength(c->reply)) {
1610 o = listNodeValue(listFirst(c->reply));
1611 objlen = sdslen(o->ptr);
1612
1613 if(nwritten >= objlen - offset) {
1614 listDelNode(c->reply, listFirst(c->reply));
1615 nwritten -= objlen - offset;
1616 c->sentlen = 0;
1617 } else {
1618 /* partial write */
1619 c->sentlen += nwritten;
1620 break;
1621 }
1622 offset = 0;
1623 }
1624 }
1625
1626 if (totwritten > 0)
1627 c->lastinteraction = time(NULL);
1628
1629 if (listLength(c->reply) == 0) {
1630 c->sentlen = 0;
1631 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1632 }
1633 }
1634
1635 static struct redisCommand *lookupCommand(char *name) {
1636 int j = 0;
1637 while(cmdTable[j].name != NULL) {
1638 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1639 j++;
1640 }
1641 return NULL;
1642 }
1643
1644 /* resetClient prepare the client to process the next command */
1645 static void resetClient(redisClient *c) {
1646 freeClientArgv(c);
1647 c->bulklen = -1;
1648 c->multibulk = 0;
1649 }
1650
1651 /* Call() is the core of Redis execution of a command */
1652 static void call(redisClient *c, struct redisCommand *cmd) {
1653 long long dirty;
1654
1655 dirty = server.dirty;
1656 cmd->proc(c);
1657 if (server.appendonly && server.dirty-dirty)
1658 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1659 if (server.dirty-dirty && listLength(server.slaves))
1660 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1661 if (listLength(server.monitors))
1662 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1663 server.stat_numcommands++;
1664 }
1665
1666 /* If this function gets called we already read a whole
1667 * command, argments are in the client argv/argc fields.
1668 * processCommand() execute the command or prepare the
1669 * server for a bulk read from the client.
1670 *
1671 * If 1 is returned the client is still alive and valid and
1672 * and other operations can be performed by the caller. Otherwise
1673 * if 0 is returned the client was destroied (i.e. after QUIT). */
1674 static int processCommand(redisClient *c) {
1675 struct redisCommand *cmd;
1676
1677 /* Free some memory if needed (maxmemory setting) */
1678 if (server.maxmemory) freeMemoryIfNeeded();
1679
1680 /* Handle the multi bulk command type. This is an alternative protocol
1681 * supported by Redis in order to receive commands that are composed of
1682 * multiple binary-safe "bulk" arguments. The latency of processing is
1683 * a bit higher but this allows things like multi-sets, so if this
1684 * protocol is used only for MSET and similar commands this is a big win. */
1685 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1686 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1687 if (c->multibulk <= 0) {
1688 resetClient(c);
1689 return 1;
1690 } else {
1691 decrRefCount(c->argv[c->argc-1]);
1692 c->argc--;
1693 return 1;
1694 }
1695 } else if (c->multibulk) {
1696 if (c->bulklen == -1) {
1697 if (((char*)c->argv[0]->ptr)[0] != '$') {
1698 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1699 resetClient(c);
1700 return 1;
1701 } else {
1702 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1703 decrRefCount(c->argv[0]);
1704 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1705 c->argc--;
1706 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1707 resetClient(c);
1708 return 1;
1709 }
1710 c->argc--;
1711 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1712 return 1;
1713 }
1714 } else {
1715 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1716 c->mbargv[c->mbargc] = c->argv[0];
1717 c->mbargc++;
1718 c->argc--;
1719 c->multibulk--;
1720 if (c->multibulk == 0) {
1721 robj **auxargv;
1722 int auxargc;
1723
1724 /* Here we need to swap the multi-bulk argc/argv with the
1725 * normal argc/argv of the client structure. */
1726 auxargv = c->argv;
1727 c->argv = c->mbargv;
1728 c->mbargv = auxargv;
1729
1730 auxargc = c->argc;
1731 c->argc = c->mbargc;
1732 c->mbargc = auxargc;
1733
1734 /* We need to set bulklen to something different than -1
1735 * in order for the code below to process the command without
1736 * to try to read the last argument of a bulk command as
1737 * a special argument. */
1738 c->bulklen = 0;
1739 /* continue below and process the command */
1740 } else {
1741 c->bulklen = -1;
1742 return 1;
1743 }
1744 }
1745 }
1746 /* -- end of multi bulk commands processing -- */
1747
1748 /* The QUIT command is handled as a special case. Normal command
1749 * procs are unable to close the client connection safely */
1750 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1751 freeClient(c);
1752 return 0;
1753 }
1754 cmd = lookupCommand(c->argv[0]->ptr);
1755 if (!cmd) {
1756 addReplySds(c,
1757 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1758 (char*)c->argv[0]->ptr));
1759 resetClient(c);
1760 return 1;
1761 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1762 (c->argc < -cmd->arity)) {
1763 addReplySds(c,
1764 sdscatprintf(sdsempty(),
1765 "-ERR wrong number of arguments for '%s' command\r\n",
1766 cmd->name));
1767 resetClient(c);
1768 return 1;
1769 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1770 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1771 resetClient(c);
1772 return 1;
1773 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1774 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1775
1776 decrRefCount(c->argv[c->argc-1]);
1777 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1778 c->argc--;
1779 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1780 resetClient(c);
1781 return 1;
1782 }
1783 c->argc--;
1784 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1785 /* It is possible that the bulk read is already in the
1786 * buffer. Check this condition and handle it accordingly.
1787 * This is just a fast path, alternative to call processInputBuffer().
1788 * It's a good idea since the code is small and this condition
1789 * happens most of the times. */
1790 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1791 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1792 c->argc++;
1793 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1794 } else {
1795 return 1;
1796 }
1797 }
1798 /* Let's try to share objects on the command arguments vector */
1799 if (server.shareobjects) {
1800 int j;
1801 for(j = 1; j < c->argc; j++)
1802 c->argv[j] = tryObjectSharing(c->argv[j]);
1803 }
1804 /* Let's try to encode the bulk object to save space. */
1805 if (cmd->flags & REDIS_CMD_BULK)
1806 tryObjectEncoding(c->argv[c->argc-1]);
1807
1808 /* Check if the user is authenticated */
1809 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1810 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1811 resetClient(c);
1812 return 1;
1813 }
1814
1815 /* Exec the command */
1816 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1817 queueMultiCommand(c,cmd);
1818 addReply(c,shared.queued);
1819 } else {
1820 call(c,cmd);
1821 }
1822
1823 /* Prepare the client for the next command */
1824 if (c->flags & REDIS_CLOSE) {
1825 freeClient(c);
1826 return 0;
1827 }
1828 resetClient(c);
1829 return 1;
1830 }
1831
1832 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1833 listNode *ln;
1834 int outc = 0, j;
1835 robj **outv;
1836 /* (args*2)+1 is enough room for args, spaces, newlines */
1837 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1838
1839 if (argc <= REDIS_STATIC_ARGS) {
1840 outv = static_outv;
1841 } else {
1842 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1843 }
1844
1845 for (j = 0; j < argc; j++) {
1846 if (j != 0) outv[outc++] = shared.space;
1847 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1848 robj *lenobj;
1849
1850 lenobj = createObject(REDIS_STRING,
1851 sdscatprintf(sdsempty(),"%lu\r\n",
1852 (unsigned long) stringObjectLen(argv[j])));
1853 lenobj->refcount = 0;
1854 outv[outc++] = lenobj;
1855 }
1856 outv[outc++] = argv[j];
1857 }
1858 outv[outc++] = shared.crlf;
1859
1860 /* Increment all the refcounts at start and decrement at end in order to
1861 * be sure to free objects if there is no slave in a replication state
1862 * able to be feed with commands */
1863 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1864 listRewind(slaves);
1865 while((ln = listYield(slaves))) {
1866 redisClient *slave = ln->value;
1867
1868 /* Don't feed slaves that are still waiting for BGSAVE to start */
1869 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1870
1871 /* Feed all the other slaves, MONITORs and so on */
1872 if (slave->slaveseldb != dictid) {
1873 robj *selectcmd;
1874
1875 switch(dictid) {
1876 case 0: selectcmd = shared.select0; break;
1877 case 1: selectcmd = shared.select1; break;
1878 case 2: selectcmd = shared.select2; break;
1879 case 3: selectcmd = shared.select3; break;
1880 case 4: selectcmd = shared.select4; break;
1881 case 5: selectcmd = shared.select5; break;
1882 case 6: selectcmd = shared.select6; break;
1883 case 7: selectcmd = shared.select7; break;
1884 case 8: selectcmd = shared.select8; break;
1885 case 9: selectcmd = shared.select9; break;
1886 default:
1887 selectcmd = createObject(REDIS_STRING,
1888 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1889 selectcmd->refcount = 0;
1890 break;
1891 }
1892 addReply(slave,selectcmd);
1893 slave->slaveseldb = dictid;
1894 }
1895 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1896 }
1897 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1898 if (outv != static_outv) zfree(outv);
1899 }
1900
1901 static void processInputBuffer(redisClient *c) {
1902 again:
1903 if (c->bulklen == -1) {
1904 /* Read the first line of the query */
1905 char *p = strchr(c->querybuf,'\n');
1906 size_t querylen;
1907
1908 if (p) {
1909 sds query, *argv;
1910 int argc, j;
1911
1912 query = c->querybuf;
1913 c->querybuf = sdsempty();
1914 querylen = 1+(p-(query));
1915 if (sdslen(query) > querylen) {
1916 /* leave data after the first line of the query in the buffer */
1917 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1918 }
1919 *p = '\0'; /* remove "\n" */
1920 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1921 sdsupdatelen(query);
1922
1923 /* Now we can split the query in arguments */
1924 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1925 sdsfree(query);
1926
1927 if (c->argv) zfree(c->argv);
1928 c->argv = zmalloc(sizeof(robj*)*argc);
1929
1930 for (j = 0; j < argc; j++) {
1931 if (sdslen(argv[j])) {
1932 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1933 c->argc++;
1934 } else {
1935 sdsfree(argv[j]);
1936 }
1937 }
1938 zfree(argv);
1939 if (c->argc) {
1940 /* Execute the command. If the client is still valid
1941 * after processCommand() return and there is something
1942 * on the query buffer try to process the next command. */
1943 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1944 } else {
1945 /* Nothing to process, argc == 0. Just process the query
1946 * buffer if it's not empty or return to the caller */
1947 if (sdslen(c->querybuf)) goto again;
1948 }
1949 return;
1950 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1951 redisLog(REDIS_DEBUG, "Client protocol error");
1952 freeClient(c);
1953 return;
1954 }
1955 } else {
1956 /* Bulk read handling. Note that if we are at this point
1957 the client already sent a command terminated with a newline,
1958 we are reading the bulk data that is actually the last
1959 argument of the command. */
1960 int qbl = sdslen(c->querybuf);
1961
1962 if (c->bulklen <= qbl) {
1963 /* Copy everything but the final CRLF as final argument */
1964 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1965 c->argc++;
1966 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1967 /* Process the command. If the client is still valid after
1968 * the processing and there is more data in the buffer
1969 * try to parse it. */
1970 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1971 return;
1972 }
1973 }
1974 }
1975
1976 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1977 redisClient *c = (redisClient*) privdata;
1978 char buf[REDIS_IOBUF_LEN];
1979 int nread;
1980 REDIS_NOTUSED(el);
1981 REDIS_NOTUSED(mask);
1982
1983 nread = read(fd, buf, REDIS_IOBUF_LEN);
1984 if (nread == -1) {
1985 if (errno == EAGAIN) {
1986 nread = 0;
1987 } else {
1988 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1989 freeClient(c);
1990 return;
1991 }
1992 } else if (nread == 0) {
1993 redisLog(REDIS_DEBUG, "Client closed connection");
1994 freeClient(c);
1995 return;
1996 }
1997 if (nread) {
1998 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1999 c->lastinteraction = time(NULL);
2000 } else {
2001 return;
2002 }
2003 processInputBuffer(c);
2004 }
2005
2006 static int selectDb(redisClient *c, int id) {
2007 if (id < 0 || id >= server.dbnum)
2008 return REDIS_ERR;
2009 c->db = &server.db[id];
2010 return REDIS_OK;
2011 }
2012
2013 static void *dupClientReplyValue(void *o) {
2014 incrRefCount((robj*)o);
2015 return 0;
2016 }
2017
2018 static redisClient *createClient(int fd) {
2019 redisClient *c = zmalloc(sizeof(*c));
2020
2021 anetNonBlock(NULL,fd);
2022 anetTcpNoDelay(NULL,fd);
2023 if (!c) return NULL;
2024 selectDb(c,0);
2025 c->fd = fd;
2026 c->querybuf = sdsempty();
2027 c->argc = 0;
2028 c->argv = NULL;
2029 c->bulklen = -1;
2030 c->multibulk = 0;
2031 c->mbargc = 0;
2032 c->mbargv = NULL;
2033 c->sentlen = 0;
2034 c->flags = 0;
2035 c->lastinteraction = time(NULL);
2036 c->authenticated = 0;
2037 c->replstate = REDIS_REPL_NONE;
2038 c->reply = listCreate();
2039 listSetFreeMethod(c->reply,decrRefCount);
2040 listSetDupMethod(c->reply,dupClientReplyValue);
2041 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2042 readQueryFromClient, c) == AE_ERR) {
2043 freeClient(c);
2044 return NULL;
2045 }
2046 listAddNodeTail(server.clients,c);
2047 initClientMultiState(c);
2048 return c;
2049 }
2050
2051 static void addReply(redisClient *c, robj *obj) {
2052 if (listLength(c->reply) == 0 &&
2053 (c->replstate == REDIS_REPL_NONE ||
2054 c->replstate == REDIS_REPL_ONLINE) &&
2055 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2056 sendReplyToClient, c) == AE_ERR) return;
2057 listAddNodeTail(c->reply,getDecodedObject(obj));
2058 }
2059
2060 static void addReplySds(redisClient *c, sds s) {
2061 robj *o = createObject(REDIS_STRING,s);
2062 addReply(c,o);
2063 decrRefCount(o);
2064 }
2065
2066 static void addReplyDouble(redisClient *c, double d) {
2067 char buf[128];
2068
2069 snprintf(buf,sizeof(buf),"%.17g",d);
2070 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2071 (unsigned long) strlen(buf),buf));
2072 }
2073
2074 static void addReplyBulkLen(redisClient *c, robj *obj) {
2075 size_t len;
2076
2077 if (obj->encoding == REDIS_ENCODING_RAW) {
2078 len = sdslen(obj->ptr);
2079 } else {
2080 long n = (long)obj->ptr;
2081
2082 /* Compute how many bytes will take this integer as a radix 10 string */
2083 len = 1;
2084 if (n < 0) {
2085 len++;
2086 n = -n;
2087 }
2088 while((n = n/10) != 0) {
2089 len++;
2090 }
2091 }
2092 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2093 }
2094
2095 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2096 int cport, cfd;
2097 char cip[128];
2098 redisClient *c;
2099 REDIS_NOTUSED(el);
2100 REDIS_NOTUSED(mask);
2101 REDIS_NOTUSED(privdata);
2102
2103 cfd = anetAccept(server.neterr, fd, cip, &cport);
2104 if (cfd == AE_ERR) {
2105 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2106 return;
2107 }
2108 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2109 if ((c = createClient(cfd)) == NULL) {
2110 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2111 close(cfd); /* May be already closed, just ingore errors */
2112 return;
2113 }
2114 /* If maxclient directive is set and this is one client more... close the
2115 * connection. Note that we create the client instead to check before
2116 * for this condition, since now the socket is already set in nonblocking
2117 * mode and we can send an error for free using the Kernel I/O */
2118 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2119 char *err = "-ERR max number of clients reached\r\n";
2120
2121 /* That's a best effort error message, don't check write errors */
2122 if (write(c->fd,err,strlen(err)) == -1) {
2123 /* Nothing to do, Just to avoid the warning... */
2124 }
2125 freeClient(c);
2126 return;
2127 }
2128 server.stat_numconnections++;
2129 }
2130
2131 /* ======================= Redis objects implementation ===================== */
2132
2133 static robj *createObject(int type, void *ptr) {
2134 robj *o;
2135
2136 if (listLength(server.objfreelist)) {
2137 listNode *head = listFirst(server.objfreelist);
2138 o = listNodeValue(head);
2139 listDelNode(server.objfreelist,head);
2140 } else {
2141 o = zmalloc(sizeof(*o));
2142 }
2143 o->type = type;
2144 o->encoding = REDIS_ENCODING_RAW;
2145 o->ptr = ptr;
2146 o->refcount = 1;
2147 return o;
2148 }
2149
2150 static robj *createStringObject(char *ptr, size_t len) {
2151 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2152 }
2153
2154 static robj *createListObject(void) {
2155 list *l = listCreate();
2156
2157 listSetFreeMethod(l,decrRefCount);
2158 return createObject(REDIS_LIST,l);
2159 }
2160
2161 static robj *createSetObject(void) {
2162 dict *d = dictCreate(&setDictType,NULL);
2163 return createObject(REDIS_SET,d);
2164 }
2165
2166 static robj *createZsetObject(void) {
2167 zset *zs = zmalloc(sizeof(*zs));
2168
2169 zs->dict = dictCreate(&zsetDictType,NULL);
2170 zs->zsl = zslCreate();
2171 return createObject(REDIS_ZSET,zs);
2172 }
2173
2174 static void freeStringObject(robj *o) {
2175 if (o->encoding == REDIS_ENCODING_RAW) {
2176 sdsfree(o->ptr);
2177 }
2178 }
2179
2180 static void freeListObject(robj *o) {
2181 listRelease((list*) o->ptr);
2182 }
2183
2184 static void freeSetObject(robj *o) {
2185 dictRelease((dict*) o->ptr);
2186 }
2187
2188 static void freeZsetObject(robj *o) {
2189 zset *zs = o->ptr;
2190
2191 dictRelease(zs->dict);
2192 zslFree(zs->zsl);
2193 zfree(zs);
2194 }
2195
2196 static void freeHashObject(robj *o) {
2197 dictRelease((dict*) o->ptr);
2198 }
2199
2200 static void incrRefCount(robj *o) {
2201 o->refcount++;
2202 #ifdef DEBUG_REFCOUNT
2203 if (o->type == REDIS_STRING)
2204 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2205 #endif
2206 }
2207
2208 static void decrRefCount(void *obj) {
2209 robj *o = obj;
2210
2211 #ifdef DEBUG_REFCOUNT
2212 if (o->type == REDIS_STRING)
2213 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2214 #endif
2215 if (--(o->refcount) == 0) {
2216 switch(o->type) {
2217 case REDIS_STRING: freeStringObject(o); break;
2218 case REDIS_LIST: freeListObject(o); break;
2219 case REDIS_SET: freeSetObject(o); break;
2220 case REDIS_ZSET: freeZsetObject(o); break;
2221 case REDIS_HASH: freeHashObject(o); break;
2222 default: redisAssert(0 != 0); break;
2223 }
2224 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2225 !listAddNodeHead(server.objfreelist,o))
2226 zfree(o);
2227 }
2228 }
2229
2230 static robj *lookupKey(redisDb *db, robj *key) {
2231 dictEntry *de = dictFind(db->dict,key);
2232 return de ? dictGetEntryVal(de) : NULL;
2233 }
2234
2235 static robj *lookupKeyRead(redisDb *db, robj *key) {
2236 expireIfNeeded(db,key);
2237 return lookupKey(db,key);
2238 }
2239
2240 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2241 deleteIfVolatile(db,key);
2242 return lookupKey(db,key);
2243 }
2244
2245 static int deleteKey(redisDb *db, robj *key) {
2246 int retval;
2247
2248 /* We need to protect key from destruction: after the first dictDelete()
2249 * it may happen that 'key' is no longer valid if we don't increment
2250 * it's count. This may happen when we get the object reference directly
2251 * from the hash table with dictRandomKey() or dict iterators */
2252 incrRefCount(key);
2253 if (dictSize(db->expires)) dictDelete(db->expires,key);
2254 retval = dictDelete(db->dict,key);
2255 decrRefCount(key);
2256
2257 return retval == DICT_OK;
2258 }
2259
2260 /* Try to share an object against the shared objects pool */
2261 static robj *tryObjectSharing(robj *o) {
2262 struct dictEntry *de;
2263 unsigned long c;
2264
2265 if (o == NULL || server.shareobjects == 0) return o;
2266
2267 redisAssert(o->type == REDIS_STRING);
2268 de = dictFind(server.sharingpool,o);
2269 if (de) {
2270 robj *shared = dictGetEntryKey(de);
2271
2272 c = ((unsigned long) dictGetEntryVal(de))+1;
2273 dictGetEntryVal(de) = (void*) c;
2274 incrRefCount(shared);
2275 decrRefCount(o);
2276 return shared;
2277 } else {
2278 /* Here we are using a stream algorihtm: Every time an object is
2279 * shared we increment its count, everytime there is a miss we
2280 * recrement the counter of a random object. If this object reaches
2281 * zero we remove the object and put the current object instead. */
2282 if (dictSize(server.sharingpool) >=
2283 server.sharingpoolsize) {
2284 de = dictGetRandomKey(server.sharingpool);
2285 redisAssert(de != NULL);
2286 c = ((unsigned long) dictGetEntryVal(de))-1;
2287 dictGetEntryVal(de) = (void*) c;
2288 if (c == 0) {
2289 dictDelete(server.sharingpool,de->key);
2290 }
2291 } else {
2292 c = 0; /* If the pool is empty we want to add this object */
2293 }
2294 if (c == 0) {
2295 int retval;
2296
2297 retval = dictAdd(server.sharingpool,o,(void*)1);
2298 redisAssert(retval == DICT_OK);
2299 incrRefCount(o);
2300 }
2301 return o;
2302 }
2303 }
2304
2305 /* Check if the nul-terminated string 's' can be represented by a long
2306 * (that is, is a number that fits into long without any other space or
2307 * character before or after the digits).
2308 *
2309 * If so, the function returns REDIS_OK and *longval is set to the value
2310 * of the number. Otherwise REDIS_ERR is returned */
2311 static int isStringRepresentableAsLong(sds s, long *longval) {
2312 char buf[32], *endptr;
2313 long value;
2314 int slen;
2315
2316 value = strtol(s, &endptr, 10);
2317 if (endptr[0] != '\0') return REDIS_ERR;
2318 slen = snprintf(buf,32,"%ld",value);
2319
2320 /* If the number converted back into a string is not identical
2321 * then it's not possible to encode the string as integer */
2322 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2323 if (longval) *longval = value;
2324 return REDIS_OK;
2325 }
2326
2327 /* Try to encode a string object in order to save space */
2328 static int tryObjectEncoding(robj *o) {
2329 long value;
2330 sds s = o->ptr;
2331
2332 if (o->encoding != REDIS_ENCODING_RAW)
2333 return REDIS_ERR; /* Already encoded */
2334
2335 /* It's not save to encode shared objects: shared objects can be shared
2336 * everywhere in the "object space" of Redis. Encoded objects can only
2337 * appear as "values" (and not, for instance, as keys) */
2338 if (o->refcount > 1) return REDIS_ERR;
2339
2340 /* Currently we try to encode only strings */
2341 redisAssert(o->type == REDIS_STRING);
2342
2343 /* Check if we can represent this string as a long integer */
2344 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2345
2346 /* Ok, this object can be encoded */
2347 o->encoding = REDIS_ENCODING_INT;
2348 sdsfree(o->ptr);
2349 o->ptr = (void*) value;
2350 return REDIS_OK;
2351 }
2352
2353 /* Get a decoded version of an encoded object (returned as a new object).
2354 * If the object is already raw-encoded just increment the ref count. */
2355 static robj *getDecodedObject(robj *o) {
2356 robj *dec;
2357
2358 if (o->encoding == REDIS_ENCODING_RAW) {
2359 incrRefCount(o);
2360 return o;
2361 }
2362 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2363 char buf[32];
2364
2365 snprintf(buf,32,"%ld",(long)o->ptr);
2366 dec = createStringObject(buf,strlen(buf));
2367 return dec;
2368 } else {
2369 redisAssert(1 != 1);
2370 }
2371 }
2372
2373 /* Compare two string objects via strcmp() or alike.
2374 * Note that the objects may be integer-encoded. In such a case we
2375 * use snprintf() to get a string representation of the numbers on the stack
2376 * and compare the strings, it's much faster than calling getDecodedObject().
2377 *
2378 * Important note: if objects are not integer encoded, but binary-safe strings,
2379 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2380 * binary safe. */
2381 static int compareStringObjects(robj *a, robj *b) {
2382 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2383 char bufa[128], bufb[128], *astr, *bstr;
2384 int bothsds = 1;
2385
2386 if (a == b) return 0;
2387 if (a->encoding != REDIS_ENCODING_RAW) {
2388 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2389 astr = bufa;
2390 bothsds = 0;
2391 } else {
2392 astr = a->ptr;
2393 }
2394 if (b->encoding != REDIS_ENCODING_RAW) {
2395 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2396 bstr = bufb;
2397 bothsds = 0;
2398 } else {
2399 bstr = b->ptr;
2400 }
2401 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2402 }
2403
2404 static size_t stringObjectLen(robj *o) {
2405 redisAssert(o->type == REDIS_STRING);
2406 if (o->encoding == REDIS_ENCODING_RAW) {
2407 return sdslen(o->ptr);
2408 } else {
2409 char buf[32];
2410
2411 return snprintf(buf,32,"%ld",(long)o->ptr);
2412 }
2413 }
2414
2415 /*============================ DB saving/loading ============================ */
2416
/* Write the one byte 'type' to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2421
/* Write the time 't' to 'fp' as a 32 bit signed integer, as laid out in
 * memory (no byte order conversion is performed).
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return (fwrite(&t32,sizeof(t32),1,fp) == 0) ? -1 : 0;
}
2427
2428 /* check rdbLoadLen() comments for more info */
2429 static int rdbSaveLen(FILE *fp, uint32_t len) {
2430 unsigned char buf[2];
2431
2432 if (len < (1<<6)) {
2433 /* Save a 6 bit len */
2434 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2435 if (fwrite(buf,1,1,fp) == 0) return -1;
2436 } else if (len < (1<<14)) {
2437 /* Save a 14 bit len */
2438 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2439 buf[1] = len&0xFF;
2440 if (fwrite(buf,2,1,fp) == 0) return -1;
2441 } else {
2442 /* Save a 32 bit len */
2443 buf[0] = (REDIS_RDB_32BITLEN<<6);
2444 if (fwrite(buf,1,1,fp) == 0) return -1;
2445 len = htonl(len);
2446 if (fwrite(&len,4,1,fp) == 0) return -1;
2447 }
2448 return 0;
2449 }
2450
2451 /* String objects in the form "2391" "-100" without any space and with a
2452 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2453 * encoded as integers to save space */
2454 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2455 long long value;
2456 char *endptr, buf[32];
2457
2458 /* Check if it's possible to encode this value as a number */
2459 value = strtoll(s, &endptr, 10);
2460 if (endptr[0] != '\0') return 0;
2461 snprintf(buf,32,"%lld",value);
2462
2463 /* If the number converted back into a string is not identical
2464 * then it's not possible to encode the string as integer */
2465 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2466
2467 /* Finally check if it fits in our ranges */
2468 if (value >= -(1<<7) && value <= (1<<7)-1) {
2469 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2470 enc[1] = value&0xFF;
2471 return 2;
2472 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2473 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2474 enc[1] = value&0xFF;
2475 enc[2] = (value>>8)&0xFF;
2476 return 3;
2477 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2478 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2479 enc[1] = value&0xFF;
2480 enc[2] = (value>>8)&0xFF;
2481 enc[3] = (value>>16)&0xFF;
2482 enc[4] = (value>>24)&0xFF;
2483 return 5;
2484 } else {
2485 return 0;
2486 }
2487 }
2488
2489 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2490 unsigned int comprlen, outlen;
2491 unsigned char byte;
2492 void *out;
2493
2494 /* We require at least four bytes compression for this to be worth it */
2495 outlen = sdslen(obj->ptr)-4;
2496 if (outlen <= 0) return 0;
2497 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2498 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2499 if (comprlen == 0) {
2500 zfree(out);
2501 return 0;
2502 }
2503 /* Data compressed! Let's save it on disk */
2504 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2505 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2506 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2507 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2508 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2509 zfree(out);
2510 return comprlen;
2511
2512 writeerr:
2513 zfree(out);
2514 return -1;
2515 }
2516
2517 /* Save a string objet as [len][data] on disk. If the object is a string
2518 * representation of an integer value we try to safe it in a special form */
2519 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2520 size_t len;
2521 int enclen;
2522
2523 len = sdslen(obj->ptr);
2524
2525 /* Try integer encoding */
2526 if (len <= 11) {
2527 unsigned char buf[5];
2528 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2529 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2530 return 0;
2531 }
2532 }
2533
2534 /* Try LZF compression - under 20 bytes it's unable to compress even
2535 * aaaaaaaaaaaaaaaaaa so skip it */
2536 if (server.rdbcompression && len > 20) {
2537 int retval;
2538
2539 retval = rdbSaveLzfStringObject(fp,obj);
2540 if (retval == -1) return -1;
2541 if (retval > 0) return 0;
2542 /* retval == 0 means data can't be compressed, save the old way */
2543 }
2544
2545 /* Store verbatim */
2546 if (rdbSaveLen(fp,len) == -1) return -1;
2547 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2548 return 0;
2549 }
2550
2551 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2552 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2553 int retval;
2554
2555 obj = getDecodedObject(obj);
2556 retval = rdbSaveStringObjectRaw(fp,obj);
2557 decrRefCount(obj);
2558 return retval;
2559 }
2560
/* Save a double value. The first byte written is an unsigned 8 bit integer:
 *
 * 253:   the value is not a number, nothing follows
 * 254:   the value is +inf, nothing follows
 * 255:   the value is -inf, nothing follows
 * other: length of the "%.17g" representation of the value, which follows
 *        without any terminator
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1; /* the special values take just the first byte */

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        char *repr = (char*)buf+1;

        snprintf(repr,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(repr);
        len = buf[0]+1;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2587
2588 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2589 static int rdbSave(char *filename) {
2590 dictIterator *di = NULL;
2591 dictEntry *de;
2592 FILE *fp;
2593 char tmpfile[256];
2594 int j;
2595 time_t now = time(NULL);
2596
2597 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2598 fp = fopen(tmpfile,"w");
2599 if (!fp) {
2600 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2601 return REDIS_ERR;
2602 }
2603 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2604 for (j = 0; j < server.dbnum; j++) {
2605 redisDb *db = server.db+j;
2606 dict *d = db->dict;
2607 if (dictSize(d) == 0) continue;
2608 di = dictGetIterator(d);
2609 if (!di) {
2610 fclose(fp);
2611 return REDIS_ERR;
2612 }
2613
2614 /* Write the SELECT DB opcode */
2615 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2616 if (rdbSaveLen(fp,j) == -1) goto werr;
2617
2618 /* Iterate this DB writing every entry */
2619 while((de = dictNext(di)) != NULL) {
2620 robj *key = dictGetEntryKey(de);
2621 robj *o = dictGetEntryVal(de);
2622 time_t expiretime = getExpire(db,key);
2623
2624 /* Save the expire time */
2625 if (expiretime != -1) {
2626 /* If this key is already expired skip it */
2627 if (expiretime < now) continue;
2628 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2629 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2630 }
2631 /* Save the key and associated value */
2632 if (rdbSaveType(fp,o->type) == -1) goto werr;
2633 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2634 if (o->type == REDIS_STRING) {
2635 /* Save a string value */
2636 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2637 } else if (o->type == REDIS_LIST) {
2638 /* Save a list value */
2639 list *list = o->ptr;
2640 listNode *ln;
2641
2642 listRewind(list);
2643 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2644 while((ln = listYield(list))) {
2645 robj *eleobj = listNodeValue(ln);
2646
2647 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2648 }
2649 } else if (o->type == REDIS_SET) {
2650 /* Save a set value */
2651 dict *set = o->ptr;
2652 dictIterator *di = dictGetIterator(set);
2653 dictEntry *de;
2654
2655 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2656 while((de = dictNext(di)) != NULL) {
2657 robj *eleobj = dictGetEntryKey(de);
2658
2659 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2660 }
2661 dictReleaseIterator(di);
2662 } else if (o->type == REDIS_ZSET) {
2663 /* Save a set value */
2664 zset *zs = o->ptr;
2665 dictIterator *di = dictGetIterator(zs->dict);
2666 dictEntry *de;
2667
2668 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2669 while((de = dictNext(di)) != NULL) {
2670 robj *eleobj = dictGetEntryKey(de);
2671 double *score = dictGetEntryVal(de);
2672
2673 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2674 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2675 }
2676 dictReleaseIterator(di);
2677 } else {
2678 redisAssert(0 != 0);
2679 }
2680 }
2681 dictReleaseIterator(di);
2682 }
2683 /* EOF opcode */
2684 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2685
2686 /* Make sure data will not remain on the OS's output buffers */
2687 fflush(fp);
2688 fsync(fileno(fp));
2689 fclose(fp);
2690
2691 /* Use RENAME to make sure the DB file is changed atomically only
2692 * if the generate DB file is ok. */
2693 if (rename(tmpfile,filename) == -1) {
2694 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2695 unlink(tmpfile);
2696 return REDIS_ERR;
2697 }
2698 redisLog(REDIS_NOTICE,"DB saved on disk");
2699 server.dirty = 0;
2700 server.lastsave = time(NULL);
2701 return REDIS_OK;
2702
2703 werr:
2704 fclose(fp);
2705 unlink(tmpfile);
2706 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2707 if (di) dictReleaseIterator(di);
2708 return REDIS_ERR;
2709 }
2710
2711 static int rdbSaveBackground(char *filename) {
2712 pid_t childpid;
2713
2714 if (server.bgsavechildpid != -1) return REDIS_ERR;
2715 if ((childpid = fork()) == 0) {
2716 /* Child */
2717 close(server.fd);
2718 if (rdbSave(filename) == REDIS_OK) {
2719 exit(0);
2720 } else {
2721 exit(1);
2722 }
2723 } else {
2724 /* Parent */
2725 if (childpid == -1) {
2726 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2727 strerror(errno));
2728 return REDIS_ERR;
2729 }
2730 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2731 server.bgsavechildpid = childpid;
2732 return REDIS_OK;
2733 }
2734 return REDIS_OK; /* unreached */
2735 }
2736
/* Remove the temporary dump file "temp-<childpid>.rdb", that is the name
 * rdbSave() uses when running in the process with pid 'childpid'.
 * Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2743
/* Read a one byte type from 'fp'.
 * Returns the byte value (0-255), or -1 if it could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    if (fread(&type,1,1,fp) != 1) return -1;
    return type;
}
2749
/* Read a time saved by rdbSaveTime(): a 32 bit signed integer stored as
 * laid out in memory. Returns the time, or -1 if it could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    if (fread(&t32,sizeof(t32),1,fp) != 1) return -1;
    return (time_t) t32;
}
2755
2756 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2757 * of this file for a description of how this are stored on disk.
2758 *
2759 * isencoded is set to 1 if the readed length is not actually a length but
2760 * an "encoding type", check the above comments for more info */
2761 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2762 unsigned char buf[2];
2763 uint32_t len;
2764
2765 if (isencoded) *isencoded = 0;
2766 if (rdbver == 0) {
2767 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2768 return ntohl(len);
2769 } else {
2770 int type;
2771
2772 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2773 type = (buf[0]&0xC0)>>6;
2774 if (type == REDIS_RDB_6BITLEN) {
2775 /* Read a 6 bit len */
2776 return buf[0]&0x3F;
2777 } else if (type == REDIS_RDB_ENCVAL) {
2778 /* Read a 6 bit len encoding type */
2779 if (isencoded) *isencoded = 1;
2780 return buf[0]&0x3F;
2781 } else if (type == REDIS_RDB_14BITLEN) {
2782 /* Read a 14 bit len */
2783 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2784 return ((buf[0]&0x3F)<<8)|buf[1];
2785 } else {
2786 /* Read a 32 bit len */
2787 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2788 return ntohl(len);
2789 }
2790 }
2791 }
2792
2793 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2794 unsigned char enc[4];
2795 long long val;
2796
2797 if (enctype == REDIS_RDB_ENC_INT8) {
2798 if (fread(enc,1,1,fp) == 0) return NULL;
2799 val = (signed char)enc[0];
2800 } else if (enctype == REDIS_RDB_ENC_INT16) {
2801 uint16_t v;
2802 if (fread(enc,2,1,fp) == 0) return NULL;
2803 v = enc[0]|(enc[1]<<8);
2804 val = (int16_t)v;
2805 } else if (enctype == REDIS_RDB_ENC_INT32) {
2806 uint32_t v;
2807 if (fread(enc,4,1,fp) == 0) return NULL;
2808 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2809 val = (int32_t)v;
2810 } else {
2811 val = 0; /* anti-warning */
2812 redisAssert(0!=0);
2813 }
2814 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2815 }
2816
2817 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2818 unsigned int len, clen;
2819 unsigned char *c = NULL;
2820 sds val = NULL;
2821
2822 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2823 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2824 if ((c = zmalloc(clen)) == NULL) goto err;
2825 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2826 if (fread(c,clen,1,fp) == 0) goto err;
2827 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2828 zfree(c);
2829 return createObject(REDIS_STRING,val);
2830 err:
2831 zfree(c);
2832 sdsfree(val);
2833 return NULL;
2834 }
2835
2836 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2837 int isencoded;
2838 uint32_t len;
2839 sds val;
2840
2841 len = rdbLoadLen(fp,rdbver,&isencoded);
2842 if (isencoded) {
2843 switch(len) {
2844 case REDIS_RDB_ENC_INT8:
2845 case REDIS_RDB_ENC_INT16:
2846 case REDIS_RDB_ENC_INT32:
2847 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2848 case REDIS_RDB_ENC_LZF:
2849 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2850 default:
2851 redisAssert(0!=0);
2852 }
2853 }
2854
2855 if (len == REDIS_RDB_LENERR) return NULL;
2856 val = sdsnewlen(NULL,len);
2857 if (len && fread(val,len,1,fp) == 0) {
2858 sdsfree(val);
2859 return NULL;
2860 }
2861 return tryObjectSharing(createObject(REDIS_STRING,val));
2862 }
2863
2864 /* For information about double serialization check rdbSaveDoubleValue() */
2865 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2866 char buf[128];
2867 unsigned char len;
2868
2869 if (fread(&len,1,1,fp) == 0) return -1;
2870 switch(len) {
2871 case 255: *val = R_NegInf; return 0;
2872 case 254: *val = R_PosInf; return 0;
2873 case 253: *val = R_Nan; return 0;
2874 default:
2875 if (fread(buf,len,1,fp) == 0) return -1;
2876 buf[len] = '\0';
2877 sscanf(buf, "%lg", val);
2878 return 0;
2879 }
2880 }
2881
2882 static int rdbLoad(char *filename) {
2883 FILE *fp;
2884 robj *keyobj = NULL;
2885 uint32_t dbid;
2886 int type, retval, rdbver;
2887 dict *d = server.db[0].dict;
2888 redisDb *db = server.db+0;
2889 char buf[1024];
2890 time_t expiretime = -1, now = time(NULL);
2891
2892 fp = fopen(filename,"r");
2893 if (!fp) return REDIS_ERR;
2894 if (fread(buf,9,1,fp) == 0) goto eoferr;
2895 buf[9] = '\0';
2896 if (memcmp(buf,"REDIS",5) != 0) {
2897 fclose(fp);
2898 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2899 return REDIS_ERR;
2900 }
2901 rdbver = atoi(buf+5);
2902 if (rdbver > 1) {
2903 fclose(fp);
2904 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2905 return REDIS_ERR;
2906 }
2907 while(1) {
2908 robj *o;
2909
2910 /* Read type. */
2911 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2912 if (type == REDIS_EXPIRETIME) {
2913 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2914 /* We read the time so we need to read the object type again */
2915 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2916 }
2917 if (type == REDIS_EOF) break;
2918 /* Handle SELECT DB opcode as a special case */
2919 if (type == REDIS_SELECTDB) {
2920 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2921 goto eoferr;
2922 if (dbid >= (unsigned)server.dbnum) {
2923 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2924 exit(1);
2925 }
2926 db = server.db+dbid;
2927 d = db->dict;
2928 continue;
2929 }
2930 /* Read key */
2931 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2932
2933 if (type == REDIS_STRING) {
2934 /* Read string value */
2935 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2936 tryObjectEncoding(o);
2937 } else if (type == REDIS_LIST || type == REDIS_SET) {
2938 /* Read list/set value */
2939 uint32_t listlen;
2940
2941 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2942 goto eoferr;
2943 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2944 /* Load every single element of the list/set */
2945 while(listlen--) {
2946 robj *ele;
2947
2948 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2949 tryObjectEncoding(ele);
2950 if (type == REDIS_LIST) {
2951 listAddNodeTail((list*)o->ptr,ele);
2952 } else {
2953 dictAdd((dict*)o->ptr,ele,NULL);
2954 }
2955 }
2956 } else if (type == REDIS_ZSET) {
2957 /* Read list/set value */
2958 uint32_t zsetlen;
2959 zset *zs;
2960
2961 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2962 goto eoferr;
2963 o = createZsetObject();
2964 zs = o->ptr;
2965 /* Load every single element of the list/set */
2966 while(zsetlen--) {
2967 robj *ele;
2968 double *score = zmalloc(sizeof(double));
2969
2970 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2971 tryObjectEncoding(ele);
2972 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2973 dictAdd(zs->dict,ele,score);
2974 zslInsert(zs->zsl,*score,ele);
2975 incrRefCount(ele); /* added to skiplist */
2976 }
2977 } else {
2978 redisAssert(0 != 0);
2979 }
2980 /* Add the new object in the hash table */
2981 retval = dictAdd(d,keyobj,o);
2982 if (retval == DICT_ERR) {
2983 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2984 exit(1);
2985 }
2986 /* Set the expire time if needed */
2987 if (expiretime != -1) {
2988 setExpire(db,keyobj,expiretime);
2989 /* Delete this key if already expired */
2990 if (expiretime < now) deleteKey(db,keyobj);
2991 expiretime = -1;
2992 }
2993 keyobj = o = NULL;
2994 }
2995 fclose(fp);
2996 return REDIS_OK;
2997
2998 eoferr: /* unexpected end of file is handled here with a fatal exit */
2999 if (keyobj) decrRefCount(keyobj);
3000 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3001 exit(1);
3002 return REDIS_ERR; /* Just to avoid warning */
3003 }
3004
3005 /*================================== Commands =============================== */
3006
3007 static void authCommand(redisClient *c) {
3008 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3009 c->authenticated = 1;
3010 addReply(c,shared.ok);
3011 } else {
3012 c->authenticated = 0;
3013 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3014 }
3015 }
3016
3017 static void pingCommand(redisClient *c) {
3018 addReply(c,shared.pong);
3019 }
3020
3021 static void echoCommand(redisClient *c) {
3022 addReplyBulkLen(c,c->argv[1]);
3023 addReply(c,c->argv[1]);
3024 addReply(c,shared.crlf);
3025 }
3026
3027 /*=================================== Strings =============================== */
3028
3029 static void setGenericCommand(redisClient *c, int nx) {
3030 int retval;
3031
3032 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3033 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3034 if (retval == DICT_ERR) {
3035 if (!nx) {
3036 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3037 incrRefCount(c->argv[2]);
3038 } else {
3039 addReply(c,shared.czero);
3040 return;
3041 }
3042 } else {
3043 incrRefCount(c->argv[1]);
3044 incrRefCount(c->argv[2]);
3045 }
3046 server.dirty++;
3047 removeExpire(c->db,c->argv[1]);
3048 addReply(c, nx ? shared.cone : shared.ok);
3049 }
3050
3051 static void setCommand(redisClient *c) {
3052 setGenericCommand(c,0);
3053 }
3054
3055 static void setnxCommand(redisClient *c) {
3056 setGenericCommand(c,1);
3057 }
3058
3059 static int getGenericCommand(redisClient *c) {
3060 robj *o = lookupKeyRead(c->db,c->argv[1]);
3061
3062 if (o == NULL) {
3063 addReply(c,shared.nullbulk);
3064 return REDIS_OK;
3065 } else {
3066 if (o->type != REDIS_STRING) {
3067 addReply(c,shared.wrongtypeerr);
3068 return REDIS_ERR;
3069 } else {
3070 addReplyBulkLen(c,o);
3071 addReply(c,o);
3072 addReply(c,shared.crlf);
3073 return REDIS_OK;
3074 }
3075 }
3076 }
3077
3078 static void getCommand(redisClient *c) {
3079 getGenericCommand(c);
3080 }
3081
3082 static void getsetCommand(redisClient *c) {
3083 if (getGenericCommand(c) == REDIS_ERR) return;
3084 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3085 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3086 } else {
3087 incrRefCount(c->argv[1]);
3088 }
3089 incrRefCount(c->argv[2]);
3090 server.dirty++;
3091 removeExpire(c->db,c->argv[1]);
3092 }
3093
3094 static void mgetCommand(redisClient *c) {
3095 int j;
3096
3097 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3098 for (j = 1; j < c->argc; j++) {
3099 robj *o = lookupKeyRead(c->db,c->argv[j]);
3100 if (o == NULL) {
3101 addReply(c,shared.nullbulk);
3102 } else {
3103 if (o->type != REDIS_STRING) {
3104 addReply(c,shared.nullbulk);
3105 } else {
3106 addReplyBulkLen(c,o);
3107 addReply(c,o);
3108 addReply(c,shared.crlf);
3109 }
3110 }
3111 }
3112 }
3113
3114 static void msetGenericCommand(redisClient *c, int nx) {
3115 int j, busykeys = 0;
3116
3117 if ((c->argc % 2) == 0) {
3118 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3119 return;
3120 }
3121 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3122 * set nothing at all if at least one already key exists. */
3123 if (nx) {
3124 for (j = 1; j < c->argc; j += 2) {
3125 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3126 busykeys++;
3127 }
3128 }
3129 }
3130 if (busykeys) {
3131 addReply(c, shared.czero);
3132 return;
3133 }
3134
3135 for (j = 1; j < c->argc; j += 2) {
3136 int retval;
3137
3138 tryObjectEncoding(c->argv[j+1]);
3139 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3140 if (retval == DICT_ERR) {
3141 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3142 incrRefCount(c->argv[j+1]);
3143 } else {
3144 incrRefCount(c->argv[j]);
3145 incrRefCount(c->argv[j+1]);
3146 }
3147 removeExpire(c->db,c->argv[j]);
3148 }
3149 server.dirty += (c->argc-1)/2;
3150 addReply(c, nx ? shared.cone : shared.ok);
3151 }
3152
3153 static void msetCommand(redisClient *c) {
3154 msetGenericCommand(c,0);
3155 }
3156
3157 static void msetnxCommand(redisClient *c) {
3158 msetGenericCommand(c,1);
3159 }
3160
3161 static void incrDecrCommand(redisClient *c, long long incr) {
3162 long long value;
3163 int retval;
3164 robj *o;
3165
3166 o = lookupKeyWrite(c->db,c->argv[1]);
3167 if (o == NULL) {
3168 value = 0;
3169 } else {
3170 if (o->type != REDIS_STRING) {
3171 value = 0;
3172 } else {
3173 char *eptr;
3174
3175 if (o->encoding == REDIS_ENCODING_RAW)
3176 value = strtoll(o->ptr, &eptr, 10);
3177 else if (o->encoding == REDIS_ENCODING_INT)
3178 value = (long)o->ptr;
3179 else
3180 redisAssert(1 != 1);
3181 }
3182 }
3183
3184 value += incr;
3185 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3186 tryObjectEncoding(o);
3187 retval = dictAdd(c->db->dict,c->argv[1],o);
3188 if (retval == DICT_ERR) {
3189 dictReplace(c->db->dict,c->argv[1],o);
3190 removeExpire(c->db,c->argv[1]);
3191 } else {
3192 incrRefCount(c->argv[1]);
3193 }
3194 server.dirty++;
3195 addReply(c,shared.colon);
3196 addReply(c,o);
3197 addReply(c,shared.crlf);
3198 }
3199
3200 static void incrCommand(redisClient *c) {
3201 incrDecrCommand(c,1);
3202 }
3203
3204 static void decrCommand(redisClient *c) {
3205 incrDecrCommand(c,-1);
3206 }
3207
3208 static void incrbyCommand(redisClient *c) {
3209 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3210 incrDecrCommand(c,incr);
3211 }
3212
3213 static void decrbyCommand(redisClient *c) {
3214 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3215 incrDecrCommand(c,-incr);
3216 }
3217
3218 /* ========================= Type agnostic commands ========================= */
3219
3220 static void delCommand(redisClient *c) {
3221 int deleted = 0, j;
3222
3223 for (j = 1; j < c->argc; j++) {
3224 if (deleteKey(c->db,c->argv[j])) {
3225 server.dirty++;
3226 deleted++;
3227 }
3228 }
3229 switch(deleted) {
3230 case 0:
3231 addReply(c,shared.czero);
3232 break;
3233 case 1:
3234 addReply(c,shared.cone);
3235 break;
3236 default:
3237 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3238 break;
3239 }
3240 }
3241
3242 static void existsCommand(redisClient *c) {
3243 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3244 }
3245
3246 static void selectCommand(redisClient *c) {
3247 int id = atoi(c->argv[1]->ptr);
3248
3249 if (selectDb(c,id) == REDIS_ERR) {
3250 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3251 } else {
3252 addReply(c,shared.ok);
3253 }
3254 }
3255
3256 static void randomkeyCommand(redisClient *c) {
3257 dictEntry *de;
3258
3259 while(1) {
3260 de = dictGetRandomKey(c->db->dict);
3261 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3262 }
3263 if (de == NULL) {
3264 addReply(c,shared.plus);
3265 addReply(c,shared.crlf);
3266 } else {
3267 addReply(c,shared.plus);
3268 addReply(c,dictGetEntryKey(de));
3269 addReply(c,shared.crlf);
3270 }
3271 }
3272
3273 static void keysCommand(redisClient *c) {
3274 dictIterator *di;
3275 dictEntry *de;
3276 sds pattern = c->argv[1]->ptr;
3277 int plen = sdslen(pattern);
3278 unsigned long numkeys = 0, keyslen = 0;
3279 robj *lenobj = createObject(REDIS_STRING,NULL);
3280
3281 di = dictGetIterator(c->db->dict);
3282 addReply(c,lenobj);
3283 decrRefCount(lenobj);
3284 while((de = dictNext(di)) != NULL) {
3285 robj *keyobj = dictGetEntryKey(de);
3286
3287 sds key = keyobj->ptr;
3288 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3289 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3290 if (expireIfNeeded(c->db,keyobj) == 0) {
3291 if (numkeys != 0)
3292 addReply(c,shared.space);
3293 addReply(c,keyobj);
3294 numkeys++;
3295 keyslen += sdslen(key);
3296 }
3297 }
3298 }
3299 dictReleaseIterator(di);
3300 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3301 addReply(c,shared.crlf);
3302 }
3303
3304 static void dbsizeCommand(redisClient *c) {
3305 addReplySds(c,
3306 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3307 }
3308
3309 static void lastsaveCommand(redisClient *c) {
3310 addReplySds(c,
3311 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3312 }
3313
3314 static void typeCommand(redisClient *c) {
3315 robj *o;
3316 char *type;
3317
3318 o = lookupKeyRead(c->db,c->argv[1]);
3319 if (o == NULL) {
3320 type = "+none";
3321 } else {
3322 switch(o->type) {
3323 case REDIS_STRING: type = "+string"; break;
3324 case REDIS_LIST: type = "+list"; break;
3325 case REDIS_SET: type = "+set"; break;
3326 case REDIS_ZSET: type = "+zset"; break;
3327 default: type = "unknown"; break;
3328 }
3329 }
3330 addReplySds(c,sdsnew(type));
3331 addReply(c,shared.crlf);
3332 }
3333
3334 static void saveCommand(redisClient *c) {
3335 if (server.bgsavechildpid != -1) {
3336 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3337 return;
3338 }
3339 if (rdbSave(server.dbfilename) == REDIS_OK) {
3340 addReply(c,shared.ok);
3341 } else {
3342 addReply(c,shared.err);
3343 }
3344 }
3345
3346 static void bgsaveCommand(redisClient *c) {
3347 if (server.bgsavechildpid != -1) {
3348 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3349 return;
3350 }
3351 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3352 char *status = "+Background saving started\r\n";
3353 addReplySds(c,sdsnew(status));
3354 } else {
3355 addReply(c,shared.err);
3356 }
3357 }
3358
3359 static void shutdownCommand(redisClient *c) {
3360 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3361 /* Kill the saving child if there is a background saving in progress.
3362 We want to avoid race conditions, for instance our saving child may
3363 overwrite the synchronous saving did by SHUTDOWN. */
3364 if (server.bgsavechildpid != -1) {
3365 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3366 kill(server.bgsavechildpid,SIGKILL);
3367 rdbRemoveTempFile(server.bgsavechildpid);
3368 }
3369 if (server.appendonly) {
3370 /* Append only file: fsync() the AOF and exit */
3371 fsync(server.appendfd);
3372 exit(0);
3373 } else {
3374 /* Snapshotting. Perform a SYNC SAVE and exit */
3375 if (rdbSave(server.dbfilename) == REDIS_OK) {
3376 if (server.daemonize)
3377 unlink(server.pidfile);
3378 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3379 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3380 exit(0);
3381 } else {
3382 /* Ooops.. error saving! The best we can do is to continue operating.
3383 * Note that if there was a background saving process, in the next
3384 * cron() Redis will be notified that the background saving aborted,
3385 * handling special stuff like slaves pending for synchronization... */
3386 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3387 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3388 }
3389 }
3390 }
3391
3392 static void renameGenericCommand(redisClient *c, int nx) {
3393 robj *o;
3394
3395 /* To use the same key as src and dst is probably an error */
3396 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3397 addReply(c,shared.sameobjecterr);
3398 return;
3399 }
3400
3401 o = lookupKeyWrite(c->db,c->argv[1]);
3402 if (o == NULL) {
3403 addReply(c,shared.nokeyerr);
3404 return;
3405 }
3406 incrRefCount(o);
3407 deleteIfVolatile(c->db,c->argv[2]);
3408 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3409 if (nx) {
3410 decrRefCount(o);
3411 addReply(c,shared.czero);
3412 return;
3413 }
3414 dictReplace(c->db->dict,c->argv[2],o);
3415 } else {
3416 incrRefCount(c->argv[2]);
3417 }
3418 deleteKey(c->db,c->argv[1]);
3419 server.dirty++;
3420 addReply(c,nx ? shared.cone : shared.ok);
3421 }
3422
3423 static void renameCommand(redisClient *c) {
3424 renameGenericCommand(c,0);
3425 }
3426
3427 static void renamenxCommand(redisClient *c) {
3428 renameGenericCommand(c,1);
3429 }
3430
3431 static void moveCommand(redisClient *c) {
3432 robj *o;
3433 redisDb *src, *dst;
3434 int srcid;
3435
3436 /* Obtain source and target DB pointers */
3437 src = c->db;
3438 srcid = c->db->id;
3439 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3440 addReply(c,shared.outofrangeerr);
3441 return;
3442 }
3443 dst = c->db;
3444 selectDb(c,srcid); /* Back to the source DB */
3445
3446 /* If the user is moving using as target the same
3447 * DB as the source DB it is probably an error. */
3448 if (src == dst) {
3449 addReply(c,shared.sameobjecterr);
3450 return;
3451 }
3452
3453 /* Check if the element exists and get a reference */
3454 o = lookupKeyWrite(c->db,c->argv[1]);
3455 if (!o) {
3456 addReply(c,shared.czero);
3457 return;
3458 }
3459
3460 /* Try to add the element to the target DB */
3461 deleteIfVolatile(dst,c->argv[1]);
3462 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3463 addReply(c,shared.czero);
3464 return;
3465 }
3466 incrRefCount(c->argv[1]);
3467 incrRefCount(o);
3468
3469 /* OK! key moved, free the entry in the source DB */
3470 deleteKey(src,c->argv[1]);
3471 server.dirty++;
3472 addReply(c,shared.cone);
3473 }
3474
3475 /* =================================== Lists ================================ */
3476 static void pushGenericCommand(redisClient *c, int where) {
3477 robj *lobj;
3478 list *list;
3479
3480 lobj = lookupKeyWrite(c->db,c->argv[1]);
3481 if (lobj == NULL) {
3482 lobj = createListObject();
3483 list = lobj->ptr;
3484 if (where == REDIS_HEAD) {
3485 listAddNodeHead(list,c->argv[2]);
3486 } else {
3487 listAddNodeTail(list,c->argv[2]);
3488 }
3489 dictAdd(c->db->dict,c->argv[1],lobj);
3490 incrRefCount(c->argv[1]);
3491 incrRefCount(c->argv[2]);
3492 } else {
3493 if (lobj->type != REDIS_LIST) {
3494 addReply(c,shared.wrongtypeerr);
3495 return;
3496 }
3497 list = lobj->ptr;
3498 if (where == REDIS_HEAD) {
3499 listAddNodeHead(list,c->argv[2]);
3500 } else {
3501 listAddNodeTail(list,c->argv[2]);
3502 }
3503 incrRefCount(c->argv[2]);
3504 }
3505 server.dirty++;
3506 addReply(c,shared.ok);
3507 }
3508
3509 static void lpushCommand(redisClient *c) {
3510 pushGenericCommand(c,REDIS_HEAD);
3511 }
3512
3513 static void rpushCommand(redisClient *c) {
3514 pushGenericCommand(c,REDIS_TAIL);
3515 }
3516
3517 static void llenCommand(redisClient *c) {
3518 robj *o;
3519 list *l;
3520
3521 o = lookupKeyRead(c->db,c->argv[1]);
3522 if (o == NULL) {
3523 addReply(c,shared.czero);
3524 return;
3525 } else {
3526 if (o->type != REDIS_LIST) {
3527 addReply(c,shared.wrongtypeerr);
3528 } else {
3529 l = o->ptr;
3530 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3531 }
3532 }
3533 }
3534
3535 static void lindexCommand(redisClient *c) {
3536 robj *o;
3537 int index = atoi(c->argv[2]->ptr);
3538
3539 o = lookupKeyRead(c->db,c->argv[1]);
3540 if (o == NULL) {
3541 addReply(c,shared.nullbulk);
3542 } else {
3543 if (o->type != REDIS_LIST) {
3544 addReply(c,shared.wrongtypeerr);
3545 } else {
3546 list *list = o->ptr;
3547 listNode *ln;
3548
3549 ln = listIndex(list, index);
3550 if (ln == NULL) {
3551 addReply(c,shared.nullbulk);
3552 } else {
3553 robj *ele = listNodeValue(ln);
3554 addReplyBulkLen(c,ele);
3555 addReply(c,ele);
3556 addReply(c,shared.crlf);
3557 }
3558 }
3559 }
3560 }
3561
3562 static void lsetCommand(redisClient *c) {
3563 robj *o;
3564 int index = atoi(c->argv[2]->ptr);
3565
3566 o = lookupKeyWrite(c->db,c->argv[1]);
3567 if (o == NULL) {
3568 addReply(c,shared.nokeyerr);
3569 } else {
3570 if (o->type != REDIS_LIST) {
3571 addReply(c,shared.wrongtypeerr);
3572 } else {
3573 list *list = o->ptr;
3574 listNode *ln;
3575
3576 ln = listIndex(list, index);
3577 if (ln == NULL) {
3578 addReply(c,shared.outofrangeerr);
3579 } else {
3580 robj *ele = listNodeValue(ln);
3581
3582 decrRefCount(ele);
3583 listNodeValue(ln) = c->argv[3];
3584 incrRefCount(c->argv[3]);
3585 addReply(c,shared.ok);
3586 server.dirty++;
3587 }
3588 }
3589 }
3590 }
3591
3592 static void popGenericCommand(redisClient *c, int where) {
3593 robj *o;
3594
3595 o = lookupKeyWrite(c->db,c->argv[1]);
3596 if (o == NULL) {
3597 addReply(c,shared.nullbulk);
3598 } else {
3599 if (o->type != REDIS_LIST) {
3600 addReply(c,shared.wrongtypeerr);
3601 } else {
3602 list *list = o->ptr;
3603 listNode *ln;
3604
3605 if (where == REDIS_HEAD)
3606 ln = listFirst(list);
3607 else
3608 ln = listLast(list);
3609
3610 if (ln == NULL) {
3611 addReply(c,shared.nullbulk);
3612 } else {
3613 robj *ele = listNodeValue(ln);
3614 addReplyBulkLen(c,ele);
3615 addReply(c,ele);
3616 addReply(c,shared.crlf);
3617 listDelNode(list,ln);
3618 server.dirty++;
3619 }
3620 }
3621 }
3622 }
3623
3624 static void lpopCommand(redisClient *c) {
3625 popGenericCommand(c,REDIS_HEAD);
3626 }
3627
3628 static void rpopCommand(redisClient *c) {
3629 popGenericCommand(c,REDIS_TAIL);
3630 }
3631
3632 static void lrangeCommand(redisClient *c) {
3633 robj *o;
3634 int start = atoi(c->argv[2]->ptr);
3635 int end = atoi(c->argv[3]->ptr);
3636
3637 o = lookupKeyRead(c->db,c->argv[1]);
3638 if (o == NULL) {
3639 addReply(c,shared.nullmultibulk);
3640 } else {
3641 if (o->type != REDIS_LIST) {
3642 addReply(c,shared.wrongtypeerr);
3643 } else {
3644 list *list = o->ptr;
3645 listNode *ln;
3646 int llen = listLength(list);
3647 int rangelen, j;
3648 robj *ele;
3649
3650 /* convert negative indexes */
3651 if (start < 0) start = llen+start;
3652 if (end < 0) end = llen+end;
3653 if (start < 0) start = 0;
3654 if (end < 0) end = 0;
3655
3656 /* indexes sanity checks */
3657 if (start > end || start >= llen) {
3658 /* Out of range start or start > end result in empty list */
3659 addReply(c,shared.emptymultibulk);
3660 return;
3661 }
3662 if (end >= llen) end = llen-1;
3663 rangelen = (end-start)+1;
3664
3665 /* Return the result in form of a multi-bulk reply */
3666 ln = listIndex(list, start);
3667 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3668 for (j = 0; j < rangelen; j++) {
3669 ele = listNodeValue(ln);
3670 addReplyBulkLen(c,ele);
3671 addReply(c,ele);
3672 addReply(c,shared.crlf);
3673 ln = ln->next;
3674 }
3675 }
3676 }
3677 }
3678
3679 static void ltrimCommand(redisClient *c) {
3680 robj *o;
3681 int start = atoi(c->argv[2]->ptr);
3682 int end = atoi(c->argv[3]->ptr);
3683
3684 o = lookupKeyWrite(c->db,c->argv[1]);
3685 if (o == NULL) {
3686 addReply(c,shared.ok);
3687 } else {
3688 if (o->type != REDIS_LIST) {
3689 addReply(c,shared.wrongtypeerr);
3690 } else {
3691 list *list = o->ptr;
3692 listNode *ln;
3693 int llen = listLength(list);
3694 int j, ltrim, rtrim;
3695
3696 /* convert negative indexes */
3697 if (start < 0) start = llen+start;
3698 if (end < 0) end = llen+end;
3699 if (start < 0) start = 0;
3700 if (end < 0) end = 0;
3701
3702 /* indexes sanity checks */
3703 if (start > end || start >= llen) {
3704 /* Out of range start or start > end result in empty list */
3705 ltrim = llen;
3706 rtrim = 0;
3707 } else {
3708 if (end >= llen) end = llen-1;
3709 ltrim = start;
3710 rtrim = llen-end-1;
3711 }
3712
3713 /* Remove list elements to perform the trim */
3714 for (j = 0; j < ltrim; j++) {
3715 ln = listFirst(list);
3716 listDelNode(list,ln);
3717 }
3718 for (j = 0; j < rtrim; j++) {
3719 ln = listLast(list);
3720 listDelNode(list,ln);
3721 }
3722 server.dirty++;
3723 addReply(c,shared.ok);
3724 }
3725 }
3726 }
3727
3728 static void lremCommand(redisClient *c) {
3729 robj *o;
3730
3731 o = lookupKeyWrite(c->db,c->argv[1]);
3732 if (o == NULL) {
3733 addReply(c,shared.czero);
3734 } else {
3735 if (o->type != REDIS_LIST) {
3736 addReply(c,shared.wrongtypeerr);
3737 } else {
3738 list *list = o->ptr;
3739 listNode *ln, *next;
3740 int toremove = atoi(c->argv[2]->ptr);
3741 int removed = 0;
3742 int fromtail = 0;
3743
3744 if (toremove < 0) {
3745 toremove = -toremove;
3746 fromtail = 1;
3747 }
3748 ln = fromtail ? list->tail : list->head;
3749 while (ln) {
3750 robj *ele = listNodeValue(ln);
3751
3752 next = fromtail ? ln->prev : ln->next;
3753 if (compareStringObjects(ele,c->argv[3]) == 0) {
3754 listDelNode(list,ln);
3755 server.dirty++;
3756 removed++;
3757 if (toremove && removed == toremove) break;
3758 }
3759 ln = next;
3760 }
3761 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3762 }
3763 }
3764 }
3765
3766 /* This is the semantic of this command:
3767 * RPOPLPUSH srclist dstlist:
3768 * IF LLEN(srclist) > 0
3769 * element = RPOP srclist
3770 * LPUSH dstlist element
3771 * RETURN element
3772 * ELSE
3773 * RETURN nil
3774 * END
3775 * END
3776 *
3777 * The idea is to be able to get an element from a list in a reliable way
3778 * since the element is not just returned but pushed against another list
3779 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3780 */
3781 static void rpoplpushcommand(redisClient *c) {
3782 robj *sobj;
3783
3784 sobj = lookupKeyWrite(c->db,c->argv[1]);
3785 if (sobj == NULL) {
3786 addReply(c,shared.nullbulk);
3787 } else {
3788 if (sobj->type != REDIS_LIST) {
3789 addReply(c,shared.wrongtypeerr);
3790 } else {
3791 list *srclist = sobj->ptr;
3792 listNode *ln = listLast(srclist);
3793
3794 if (ln == NULL) {
3795 addReply(c,shared.nullbulk);
3796 } else {
3797 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3798 robj *ele = listNodeValue(ln);
3799 list *dstlist;
3800
3801 if (dobj == NULL) {
3802
3803 /* Create the list if the key does not exist */
3804 dobj = createListObject();
3805 dictAdd(c->db->dict,c->argv[2],dobj);
3806 incrRefCount(c->argv[2]);
3807 } else if (dobj->type != REDIS_LIST) {
3808 addReply(c,shared.wrongtypeerr);
3809 return;
3810 }
3811 /* Add the element to the target list */
3812 dstlist = dobj->ptr;
3813 listAddNodeHead(dstlist,ele);
3814 incrRefCount(ele);
3815
3816 /* Send the element to the client as reply as well */
3817 addReplyBulkLen(c,ele);
3818 addReply(c,ele);
3819 addReply(c,shared.crlf);
3820
3821 /* Finally remove the element from the source list */
3822 listDelNode(srclist,ln);
3823 server.dirty++;
3824 }
3825 }
3826 }
3827 }
3828
3829
3830 /* ==================================== Sets ================================ */
3831
3832 static void saddCommand(redisClient *c) {
3833 robj *set;
3834
3835 set = lookupKeyWrite(c->db,c->argv[1]);
3836 if (set == NULL) {
3837 set = createSetObject();
3838 dictAdd(c->db->dict,c->argv[1],set);
3839 incrRefCount(c->argv[1]);
3840 } else {
3841 if (set->type != REDIS_SET) {
3842 addReply(c,shared.wrongtypeerr);
3843 return;
3844 }
3845 }
3846 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3847 incrRefCount(c->argv[2]);
3848 server.dirty++;
3849 addReply(c,shared.cone);
3850 } else {
3851 addReply(c,shared.czero);
3852 }
3853 }
3854
3855 static void sremCommand(redisClient *c) {
3856 robj *set;
3857
3858 set = lookupKeyWrite(c->db,c->argv[1]);
3859 if (set == NULL) {
3860 addReply(c,shared.czero);
3861 } else {
3862 if (set->type != REDIS_SET) {
3863 addReply(c,shared.wrongtypeerr);
3864 return;
3865 }
3866 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3867 server.dirty++;
3868 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3869 addReply(c,shared.cone);
3870 } else {
3871 addReply(c,shared.czero);
3872 }
3873 }
3874 }
3875
3876 static void smoveCommand(redisClient *c) {
3877 robj *srcset, *dstset;
3878
3879 srcset = lookupKeyWrite(c->db,c->argv[1]);
3880 dstset = lookupKeyWrite(c->db,c->argv[2]);
3881
3882 /* If the source key does not exist return 0, if it's of the wrong type
3883 * raise an error */
3884 if (srcset == NULL || srcset->type != REDIS_SET) {
3885 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3886 return;
3887 }
3888 /* Error if the destination key is not a set as well */
3889 if (dstset && dstset->type != REDIS_SET) {
3890 addReply(c,shared.wrongtypeerr);
3891 return;
3892 }
3893 /* Remove the element from the source set */
3894 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3895 /* Key not found in the src set! return zero */
3896 addReply(c,shared.czero);
3897 return;
3898 }
3899 server.dirty++;
3900 /* Add the element to the destination set */
3901 if (!dstset) {
3902 dstset = createSetObject();
3903 dictAdd(c->db->dict,c->argv[2],dstset);
3904 incrRefCount(c->argv[2]);
3905 }
3906 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3907 incrRefCount(c->argv[3]);
3908 addReply(c,shared.cone);
3909 }
3910
3911 static void sismemberCommand(redisClient *c) {
3912 robj *set;
3913
3914 set = lookupKeyRead(c->db,c->argv[1]);
3915 if (set == NULL) {
3916 addReply(c,shared.czero);
3917 } else {
3918 if (set->type != REDIS_SET) {
3919 addReply(c,shared.wrongtypeerr);
3920 return;
3921 }
3922 if (dictFind(set->ptr,c->argv[2]))
3923 addReply(c,shared.cone);
3924 else
3925 addReply(c,shared.czero);
3926 }
3927 }
3928
3929 static void scardCommand(redisClient *c) {
3930 robj *o;
3931 dict *s;
3932
3933 o = lookupKeyRead(c->db,c->argv[1]);
3934 if (o == NULL) {
3935 addReply(c,shared.czero);
3936 return;
3937 } else {
3938 if (o->type != REDIS_SET) {
3939 addReply(c,shared.wrongtypeerr);
3940 } else {
3941 s = o->ptr;
3942 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3943 dictSize(s)));
3944 }
3945 }
3946 }
3947
3948 static void spopCommand(redisClient *c) {
3949 robj *set;
3950 dictEntry *de;
3951
3952 set = lookupKeyWrite(c->db,c->argv[1]);
3953 if (set == NULL) {
3954 addReply(c,shared.nullbulk);
3955 } else {
3956 if (set->type != REDIS_SET) {
3957 addReply(c,shared.wrongtypeerr);
3958 return;
3959 }
3960 de = dictGetRandomKey(set->ptr);
3961 if (de == NULL) {
3962 addReply(c,shared.nullbulk);
3963 } else {
3964 robj *ele = dictGetEntryKey(de);
3965
3966 addReplyBulkLen(c,ele);
3967 addReply(c,ele);
3968 addReply(c,shared.crlf);
3969 dictDelete(set->ptr,ele);
3970 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3971 server.dirty++;
3972 }
3973 }
3974 }
3975
3976 static void srandmemberCommand(redisClient *c) {
3977 robj *set;
3978 dictEntry *de;
3979
3980 set = lookupKeyRead(c->db,c->argv[1]);
3981 if (set == NULL) {
3982 addReply(c,shared.nullbulk);
3983 } else {
3984 if (set->type != REDIS_SET) {
3985 addReply(c,shared.wrongtypeerr);
3986 return;
3987 }
3988 de = dictGetRandomKey(set->ptr);
3989 if (de == NULL) {
3990 addReply(c,shared.nullbulk);
3991 } else {
3992 robj *ele = dictGetEntryKey(de);
3993
3994 addReplyBulkLen(c,ele);
3995 addReply(c,ele);
3996 addReply(c,shared.crlf);
3997 }
3998 }
3999 }
4000
4001 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4002 dict **d1 = (void*) s1, **d2 = (void*) s2;
4003
4004 return dictSize(*d1)-dictSize(*d2);
4005 }
4006
4007 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
4008 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4009 dictIterator *di;
4010 dictEntry *de;
4011 robj *lenobj = NULL, *dstset = NULL;
4012 unsigned long j, cardinality = 0;
4013
4014 for (j = 0; j < setsnum; j++) {
4015 robj *setobj;
4016
4017 setobj = dstkey ?
4018 lookupKeyWrite(c->db,setskeys[j]) :
4019 lookupKeyRead(c->db,setskeys[j]);
4020 if (!setobj) {
4021 zfree(dv);
4022 if (dstkey) {
4023 if (deleteKey(c->db,dstkey))
4024 server.dirty++;
4025 addReply(c,shared.czero);
4026 } else {
4027 addReply(c,shared.nullmultibulk);
4028 }
4029 return;
4030 }
4031 if (setobj->type != REDIS_SET) {
4032 zfree(dv);
4033 addReply(c,shared.wrongtypeerr);
4034 return;
4035 }
4036 dv[j] = setobj->ptr;
4037 }
4038 /* Sort sets from the smallest to largest, this will improve our
4039 * algorithm's performace */
4040 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4041
4042 /* The first thing we should output is the total number of elements...
4043 * since this is a multi-bulk write, but at this stage we don't know
4044 * the intersection set size, so we use a trick, append an empty object
4045 * to the output list and save the pointer to later modify it with the
4046 * right length */
4047 if (!dstkey) {
4048 lenobj = createObject(REDIS_STRING,NULL);
4049 addReply(c,lenobj);
4050 decrRefCount(lenobj);
4051 } else {
4052 /* If we have a target key where to store the resulting set
4053 * create this key with an empty set inside */
4054 dstset = createSetObject();
4055 }
4056
4057 /* Iterate all the elements of the first (smallest) set, and test
4058 * the element against all the other sets, if at least one set does
4059 * not include the element it is discarded */
4060 di = dictGetIterator(dv[0]);
4061
4062 while((de = dictNext(di)) != NULL) {
4063 robj *ele;
4064
4065 for (j = 1; j < setsnum; j++)
4066 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4067 if (j != setsnum)
4068 continue; /* at least one set does not contain the member */
4069 ele = dictGetEntryKey(de);
4070 if (!dstkey) {
4071 addReplyBulkLen(c,ele);
4072 addReply(c,ele);
4073 addReply(c,shared.crlf);
4074 cardinality++;
4075 } else {
4076 dictAdd(dstset->ptr,ele,NULL);
4077 incrRefCount(ele);
4078 }
4079 }
4080 dictReleaseIterator(di);
4081
4082 if (dstkey) {
4083 /* Store the resulting set into the target */
4084 deleteKey(c->db,dstkey);
4085 dictAdd(c->db->dict,dstkey,dstset);
4086 incrRefCount(dstkey);
4087 }
4088
4089 if (!dstkey) {
4090 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4091 } else {
4092 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4093 dictSize((dict*)dstset->ptr)));
4094 server.dirty++;
4095 }
4096 zfree(dv);
4097 }
4098
4099 static void sinterCommand(redisClient *c) {
4100 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4101 }
4102
4103 static void sinterstoreCommand(redisClient *c) {
4104 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4105 }
4106
4107 #define REDIS_OP_UNION 0
4108 #define REDIS_OP_DIFF 1
4109
4110 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4111 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4112 dictIterator *di;
4113 dictEntry *de;
4114 robj *dstset = NULL;
4115 int j, cardinality = 0;
4116
4117 for (j = 0; j < setsnum; j++) {
4118 robj *setobj;
4119
4120 setobj = dstkey ?
4121 lookupKeyWrite(c->db,setskeys[j]) :
4122 lookupKeyRead(c->db,setskeys[j]);
4123 if (!setobj) {
4124 dv[j] = NULL;
4125 continue;
4126 }
4127 if (setobj->type != REDIS_SET) {
4128 zfree(dv);
4129 addReply(c,shared.wrongtypeerr);
4130 return;
4131 }
4132 dv[j] = setobj->ptr;
4133 }
4134
4135 /* We need a temp set object to store our union. If the dstkey
4136 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4137 * this set object will be the resulting object to set into the target key*/
4138 dstset = createSetObject();
4139
4140 /* Iterate all the elements of all the sets, add every element a single
4141 * time to the result set */
4142 for (j = 0; j < setsnum; j++) {
4143 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4144 if (!dv[j]) continue; /* non existing keys are like empty sets */
4145
4146 di = dictGetIterator(dv[j]);
4147
4148 while((de = dictNext(di)) != NULL) {
4149 robj *ele;
4150
4151 /* dictAdd will not add the same element multiple times */
4152 ele = dictGetEntryKey(de);
4153 if (op == REDIS_OP_UNION || j == 0) {
4154 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4155 incrRefCount(ele);
4156 cardinality++;
4157 }
4158 } else if (op == REDIS_OP_DIFF) {
4159 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4160 cardinality--;
4161 }
4162 }
4163 }
4164 dictReleaseIterator(di);
4165
4166 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4167 }
4168
4169 /* Output the content of the resulting set, if not in STORE mode */
4170 if (!dstkey) {
4171 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4172 di = dictGetIterator(dstset->ptr);
4173 while((de = dictNext(di)) != NULL) {
4174 robj *ele;
4175
4176 ele = dictGetEntryKey(de);
4177 addReplyBulkLen(c,ele);
4178 addReply(c,ele);
4179 addReply(c,shared.crlf);
4180 }
4181 dictReleaseIterator(di);
4182 } else {
4183 /* If we have a target key where to store the resulting set
4184 * create this key with the result set inside */
4185 deleteKey(c->db,dstkey);
4186 dictAdd(c->db->dict,dstkey,dstset);
4187 incrRefCount(dstkey);
4188 }
4189
4190 /* Cleanup */
4191 if (!dstkey) {
4192 decrRefCount(dstset);
4193 } else {
4194 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4195 dictSize((dict*)dstset->ptr)));
4196 server.dirty++;
4197 }
4198 zfree(dv);
4199 }
4200
4201 static void sunionCommand(redisClient *c) {
4202 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4203 }
4204
4205 static void sunionstoreCommand(redisClient *c) {
4206 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4207 }
4208
4209 static void sdiffCommand(redisClient *c) {
4210 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4211 }
4212
4213 static void sdiffstoreCommand(redisClient *c) {
4214 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4215 }
4216
4217 /* ==================================== ZSets =============================== */
4218
4219 /* ZSETs are ordered sets using two data structures to hold the same elements
4220 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4221 * data structure.
4222 *
4223 * The elements are added to a hash table mapping Redis objects to scores.
4224 * At the same time the elements are added to a skip list mapping scores
4225 * to Redis objects (so objects are sorted by scores in this "view"). */
4226
4227 /* This skiplist implementation is almost a C translation of the original
4228 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4229 * Alternative to Balanced Trees", modified in three ways:
4230 * a) this implementation allows for repeated values.
4231 * b) the comparison is not just by key (our 'score') but by satellite data.
4232 * c) there is a back pointer, so it's a doubly linked list with the back
4233 * pointers being only at "level 1". This allows to traverse the list
4234 * from tail to head, useful for ZREVRANGE. */
4235
4236 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4237 zskiplistNode *zn = zmalloc(sizeof(*zn));
4238
4239 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4240 zn->score = score;
4241 zn->obj = obj;
4242 return zn;
4243 }
4244
4245 static zskiplist *zslCreate(void) {
4246 int j;
4247 zskiplist *zsl;
4248
4249 zsl = zmalloc(sizeof(*zsl));
4250 zsl->level = 1;
4251 zsl->length = 0;
4252 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4253 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4254 zsl->header->forward[j] = NULL;
4255 zsl->header->backward = NULL;
4256 zsl->tail = NULL;
4257 return zsl;
4258 }
4259
4260 static void zslFreeNode(zskiplistNode *node) {
4261 decrRefCount(node->obj);
4262 zfree(node->forward);
4263 zfree(node);
4264 }
4265
4266 static void zslFree(zskiplist *zsl) {
4267 zskiplistNode *node = zsl->header->forward[0], *next;
4268
4269 zfree(zsl->header->forward);
4270 zfree(zsl->header);
4271 while(node) {
4272 next = node->forward[0];
4273 zslFreeNode(node);
4274 node = next;
4275 }
4276 zfree(zsl);
4277 }
4278
4279 static int zslRandomLevel(void) {
4280 int level = 1;
4281 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4282 level += 1;
4283 return level;
4284 }
4285
4286 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4287 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4288 int i, level;
4289
4290 x = zsl->header;
4291 for (i = zsl->level-1; i >= 0; i--) {
4292 while (x->forward[i] &&
4293 (x->forward[i]->score < score ||
4294 (x->forward[i]->score == score &&
4295 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4296 x = x->forward[i];
4297 update[i] = x;
4298 }
4299 /* we assume the key is not already inside, since we allow duplicated
4300 * scores, and the re-insertion of score and redis object should never
4301 * happpen since the caller of zslInsert() should test in the hash table
4302 * if the element is already inside or not. */
4303 level = zslRandomLevel();
4304 if (level > zsl->level) {
4305 for (i = zsl->level; i < level; i++)
4306 update[i] = zsl->header;
4307 zsl->level = level;
4308 }
4309 x = zslCreateNode(level,score,obj);
4310 for (i = 0; i < level; i++) {
4311 x->forward[i] = update[i]->forward[i];
4312 update[i]->forward[i] = x;
4313 }
4314 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4315 if (x->forward[0])
4316 x->forward[0]->backward = x;
4317 else
4318 zsl->tail = x;
4319 zsl->length++;
4320 }
4321
4322 /* Delete an element with matching score/object from the skiplist. */
4323 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4324 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4325 int i;
4326
4327 x = zsl->header;
4328 for (i = zsl->level-1; i >= 0; i--) {
4329 while (x->forward[i] &&
4330 (x->forward[i]->score < score ||
4331 (x->forward[i]->score == score &&
4332 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4333 x = x->forward[i];
4334 update[i] = x;
4335 }
4336 /* We may have multiple elements with the same score, what we need
4337 * is to find the element with both the right score and object. */
4338 x = x->forward[0];
4339 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4340 for (i = 0; i < zsl->level; i++) {
4341 if (update[i]->forward[i] != x) break;
4342 update[i]->forward[i] = x->forward[i];
4343 }
4344 if (x->forward[0]) {
4345 x->forward[0]->backward = (x->backward == zsl->header) ?
4346 NULL : x->backward;
4347 } else {
4348 zsl->tail = x->backward;
4349 }
4350 zslFreeNode(x);
4351 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4352 zsl->level--;
4353 zsl->length--;
4354 return 1;
4355 } else {
4356 return 0; /* not found */
4357 }
4358 return 0; /* not found */
4359 }
4360
4361 /* Delete all the elements with score between min and max from the skiplist.
4362 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4363 * Note that this function takes the reference to the hash table view of the
4364 * sorted set, in order to remove the elements from the hash table too. */
4365 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4366 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4367 unsigned long removed = 0;
4368 int i;
4369
4370 x = zsl->header;
4371 for (i = zsl->level-1; i >= 0; i--) {
4372 while (x->forward[i] && x->forward[i]->score < min)
4373 x = x->forward[i];
4374 update[i] = x;
4375 }
4376 /* We may have multiple elements with the same score, what we need
4377 * is to find the element with both the right score and object. */
4378 x = x->forward[0];
4379 while (x && x->score <= max) {
4380 zskiplistNode *next;
4381
4382 for (i = 0; i < zsl->level; i++) {
4383 if (update[i]->forward[i] != x) break;
4384 update[i]->forward[i] = x->forward[i];
4385 }
4386 if (x->forward[0]) {
4387 x->forward[0]->backward = (x->backward == zsl->header) ?
4388 NULL : x->backward;
4389 } else {
4390 zsl->tail = x->backward;
4391 }
4392 next = x->forward[0];
4393 dictDelete(dict,x->obj);
4394 zslFreeNode(x);
4395 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4396 zsl->level--;
4397 zsl->length--;
4398 removed++;
4399 x = next;
4400 }
4401 return removed; /* not found */
4402 }
4403
4404 /* Find the first node having a score equal or greater than the specified one.
4405 * Returns NULL if there is no match. */
4406 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4407 zskiplistNode *x;
4408 int i;
4409
4410 x = zsl->header;
4411 for (i = zsl->level-1; i >= 0; i--) {
4412 while (x->forward[i] && x->forward[i]->score < score)
4413 x = x->forward[i];
4414 }
4415 /* We may have multiple elements with the same score, what we need
4416 * is to find the element with both the right score and object. */
4417 return x->forward[0];
4418 }
4419
4420 /* The actual Z-commands implementations */
4421
4422 /* This generic command implements both ZADD and ZINCRBY.
4423 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4424 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4425 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4426 robj *zsetobj;
4427 zset *zs;
4428 double *score;
4429
4430 zsetobj = lookupKeyWrite(c->db,key);
4431 if (zsetobj == NULL) {
4432 zsetobj = createZsetObject();
4433 dictAdd(c->db->dict,key,zsetobj);
4434 incrRefCount(key);
4435 } else {
4436 if (zsetobj->type != REDIS_ZSET) {
4437 addReply(c,shared.wrongtypeerr);
4438 return;
4439 }
4440 }
4441 zs = zsetobj->ptr;
4442
4443 /* Ok now since we implement both ZADD and ZINCRBY here the code
4444 * needs to handle the two different conditions. It's all about setting
4445 * '*score', that is, the new score to set, to the right value. */
4446 score = zmalloc(sizeof(double));
4447 if (doincrement) {
4448 dictEntry *de;
4449
4450 /* Read the old score. If the element was not present starts from 0 */
4451 de = dictFind(zs->dict,ele);
4452 if (de) {
4453 double *oldscore = dictGetEntryVal(de);
4454 *score = *oldscore + scoreval;
4455 } else {
4456 *score = scoreval;
4457 }
4458 } else {
4459 *score = scoreval;
4460 }
4461
4462 /* What follows is a simple remove and re-insert operation that is common
4463 * to both ZADD and ZINCRBY... */
4464 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4465 /* case 1: New element */
4466 incrRefCount(ele); /* added to hash */
4467 zslInsert(zs->zsl,*score,ele);
4468 incrRefCount(ele); /* added to skiplist */
4469 server.dirty++;
4470 if (doincrement)
4471 addReplyDouble(c,*score);
4472 else
4473 addReply(c,shared.cone);
4474 } else {
4475 dictEntry *de;
4476 double *oldscore;
4477
4478 /* case 2: Score update operation */
4479 de = dictFind(zs->dict,ele);
4480 redisAssert(de != NULL);
4481 oldscore = dictGetEntryVal(de);
4482 if (*score != *oldscore) {
4483 int deleted;
4484
4485 /* Remove and insert the element in the skip list with new score */
4486 deleted = zslDelete(zs->zsl,*oldscore,ele);
4487 redisAssert(deleted != 0);
4488 zslInsert(zs->zsl,*score,ele);
4489 incrRefCount(ele);
4490 /* Update the score in the hash table */
4491 dictReplace(zs->dict,ele,score);
4492 server.dirty++;
4493 } else {
4494 zfree(score);
4495 }
4496 if (doincrement)
4497 addReplyDouble(c,*score);
4498 else
4499 addReply(c,shared.czero);
4500 }
4501 }
4502
4503 static void zaddCommand(redisClient *c) {
4504 double scoreval;
4505
4506 scoreval = strtod(c->argv[2]->ptr,NULL);
4507 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4508 }
4509
4510 static void zincrbyCommand(redisClient *c) {
4511 double scoreval;
4512
4513 scoreval = strtod(c->argv[2]->ptr,NULL);
4514 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4515 }
4516
4517 static void zremCommand(redisClient *c) {
4518 robj *zsetobj;
4519 zset *zs;
4520
4521 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4522 if (zsetobj == NULL) {
4523 addReply(c,shared.czero);
4524 } else {
4525 dictEntry *de;
4526 double *oldscore;
4527 int deleted;
4528
4529 if (zsetobj->type != REDIS_ZSET) {
4530 addReply(c,shared.wrongtypeerr);
4531 return;
4532 }
4533 zs = zsetobj->ptr;
4534 de = dictFind(zs->dict,c->argv[2]);
4535 if (de == NULL) {
4536 addReply(c,shared.czero);
4537 return;
4538 }
4539 /* Delete from the skiplist */
4540 oldscore = dictGetEntryVal(de);
4541 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4542 redisAssert(deleted != 0);
4543
4544 /* Delete from the hash table */
4545 dictDelete(zs->dict,c->argv[2]);
4546 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4547 server.dirty++;
4548 addReply(c,shared.cone);
4549 }
4550 }
4551
4552 static void zremrangebyscoreCommand(redisClient *c) {
4553 double min = strtod(c->argv[2]->ptr,NULL);
4554 double max = strtod(c->argv[3]->ptr,NULL);
4555 robj *zsetobj;
4556 zset *zs;
4557
4558 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4559 if (zsetobj == NULL) {
4560 addReply(c,shared.czero);
4561 } else {
4562 long deleted;
4563
4564 if (zsetobj->type != REDIS_ZSET) {
4565 addReply(c,shared.wrongtypeerr);
4566 return;
4567 }
4568 zs = zsetobj->ptr;
4569 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4570 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4571 server.dirty += deleted;
4572 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4573 }
4574 }
4575
4576 static void zrangeGenericCommand(redisClient *c, int reverse) {
4577 robj *o;
4578 int start = atoi(c->argv[2]->ptr);
4579 int end = atoi(c->argv[3]->ptr);
4580 int withscores = 0;
4581
4582 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4583 withscores = 1;
4584 } else if (c->argc >= 5) {
4585 addReply(c,shared.syntaxerr);
4586 return;
4587 }
4588
4589 o = lookupKeyRead(c->db,c->argv[1]);
4590 if (o == NULL) {
4591 addReply(c,shared.nullmultibulk);
4592 } else {
4593 if (o->type != REDIS_ZSET) {
4594 addReply(c,shared.wrongtypeerr);
4595 } else {
4596 zset *zsetobj = o->ptr;
4597 zskiplist *zsl = zsetobj->zsl;
4598 zskiplistNode *ln;
4599
4600 int llen = zsl->length;
4601 int rangelen, j;
4602 robj *ele;
4603
4604 /* convert negative indexes */
4605 if (start < 0) start = llen+start;
4606 if (end < 0) end = llen+end;
4607 if (start < 0) start = 0;
4608 if (end < 0) end = 0;
4609
4610 /* indexes sanity checks */
4611 if (start > end || start >= llen) {
4612 /* Out of range start or start > end result in empty list */
4613 addReply(c,shared.emptymultibulk);
4614 return;
4615 }
4616 if (end >= llen) end = llen-1;
4617 rangelen = (end-start)+1;
4618
4619 /* Return the result in form of a multi-bulk reply */
4620 if (reverse) {
4621 ln = zsl->tail;
4622 while (start--)
4623 ln = ln->backward;
4624 } else {
4625 ln = zsl->header->forward[0];
4626 while (start--)
4627 ln = ln->forward[0];
4628 }
4629
4630 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4631 withscores ? (rangelen*2) : rangelen));
4632 for (j = 0; j < rangelen; j++) {
4633 ele = ln->obj;
4634 addReplyBulkLen(c,ele);
4635 addReply(c,ele);
4636 addReply(c,shared.crlf);
4637 if (withscores)
4638 addReplyDouble(c,ln->score);
4639 ln = reverse ? ln->backward : ln->forward[0];
4640 }
4641 }
4642 }
4643 }
4644
4645 static void zrangeCommand(redisClient *c) {
4646 zrangeGenericCommand(c,0);
4647 }
4648
4649 static void zrevrangeCommand(redisClient *c) {
4650 zrangeGenericCommand(c,1);
4651 }
4652
4653 static void zrangebyscoreCommand(redisClient *c) {
4654 robj *o;
4655 double min = strtod(c->argv[2]->ptr,NULL);
4656 double max = strtod(c->argv[3]->ptr,NULL);
4657 int offset = 0, limit = -1;
4658
4659 if (c->argc != 4 && c->argc != 7) {
4660 addReplySds(c,
4661 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4662 return;
4663 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4664 addReply(c,shared.syntaxerr);
4665 return;
4666 } else if (c->argc == 7) {
4667 offset = atoi(c->argv[5]->ptr);
4668 limit = atoi(c->argv[6]->ptr);
4669 if (offset < 0) offset = 0;
4670 }
4671
4672 o = lookupKeyRead(c->db,c->argv[1]);
4673 if (o == NULL) {
4674 addReply(c,shared.nullmultibulk);
4675 } else {
4676 if (o->type != REDIS_ZSET) {
4677 addReply(c,shared.wrongtypeerr);
4678 } else {
4679 zset *zsetobj = o->ptr;
4680 zskiplist *zsl = zsetobj->zsl;
4681 zskiplistNode *ln;
4682 robj *ele, *lenobj;
4683 unsigned int rangelen = 0;
4684
4685 /* Get the first node with the score >= min */
4686 ln = zslFirstWithScore(zsl,min);
4687 if (ln == NULL) {
4688 /* No element matching the speciifed interval */
4689 addReply(c,shared.emptymultibulk);
4690 return;
4691 }
4692
4693 /* We don't know in advance how many matching elements there
4694 * are in the list, so we push this object that will represent
4695 * the multi-bulk length in the output buffer, and will "fix"
4696 * it later */
4697 lenobj = createObject(REDIS_STRING,NULL);
4698 addReply(c,lenobj);
4699 decrRefCount(lenobj);
4700
4701 while(ln && ln->score <= max) {
4702 if (offset) {
4703 offset--;
4704 ln = ln->forward[0];
4705 continue;
4706 }
4707 if (limit == 0) break;
4708 ele = ln->obj;
4709 addReplyBulkLen(c,ele);
4710 addReply(c,ele);
4711 addReply(c,shared.crlf);
4712 ln = ln->forward[0];
4713 rangelen++;
4714 if (limit > 0) limit--;
4715 }
4716 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4717 }
4718 }
4719 }
4720
4721 static void zcardCommand(redisClient *c) {
4722 robj *o;
4723 zset *zs;
4724
4725 o = lookupKeyRead(c->db,c->argv[1]);
4726 if (o == NULL) {
4727 addReply(c,shared.czero);
4728 return;
4729 } else {
4730 if (o->type != REDIS_ZSET) {
4731 addReply(c,shared.wrongtypeerr);
4732 } else {
4733 zs = o->ptr;
4734 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4735 }
4736 }
4737 }
4738
4739 static void zscoreCommand(redisClient *c) {
4740 robj *o;
4741 zset *zs;
4742
4743 o = lookupKeyRead(c->db,c->argv[1]);
4744 if (o == NULL) {
4745 addReply(c,shared.nullbulk);
4746 return;
4747 } else {
4748 if (o->type != REDIS_ZSET) {
4749 addReply(c,shared.wrongtypeerr);
4750 } else {
4751 dictEntry *de;
4752
4753 zs = o->ptr;
4754 de = dictFind(zs->dict,c->argv[2]);
4755 if (!de) {
4756 addReply(c,shared.nullbulk);
4757 } else {
4758 double *score = dictGetEntryVal(de);
4759
4760 addReplyDouble(c,*score);
4761 }
4762 }
4763 }
4764 }
4765
4766 /* ========================= Non type-specific commands ==================== */
4767
4768 static void flushdbCommand(redisClient *c) {
4769 server.dirty += dictSize(c->db->dict);
4770 dictEmpty(c->db->dict);
4771 dictEmpty(c->db->expires);
4772 addReply(c,shared.ok);
4773 }
4774
4775 static void flushallCommand(redisClient *c) {
4776 server.dirty += emptyDb();
4777 addReply(c,shared.ok);
4778 rdbSave(server.dbfilename);
4779 server.dirty++;
4780 }
4781
4782 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4783 redisSortOperation *so = zmalloc(sizeof(*so));
4784 so->type = type;
4785 so->pattern = pattern;
4786 return so;
4787 }
4788
4789 /* Return the value associated to the key with a name obtained
4790 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4791 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4792 char *p;
4793 sds spat, ssub;
4794 robj keyobj;
4795 int prefixlen, sublen, postfixlen;
4796 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4797 struct {
4798 long len;
4799 long free;
4800 char buf[REDIS_SORTKEY_MAX+1];
4801 } keyname;
4802
4803 /* If the pattern is "#" return the substitution object itself in order
4804 * to implement the "SORT ... GET #" feature. */
4805 spat = pattern->ptr;
4806 if (spat[0] == '#' && spat[1] == '\0') {
4807 return subst;
4808 }
4809
4810 /* The substitution object may be specially encoded. If so we create
4811 * a decoded object on the fly. Otherwise getDecodedObject will just
4812 * increment the ref count, that we'll decrement later. */
4813 subst = getDecodedObject(subst);
4814
4815 ssub = subst->ptr;
4816 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4817 p = strchr(spat,'*');
4818 if (!p) {
4819 decrRefCount(subst);
4820 return NULL;
4821 }
4822
4823 prefixlen = p-spat;
4824 sublen = sdslen(ssub);
4825 postfixlen = sdslen(spat)-(prefixlen+1);
4826 memcpy(keyname.buf,spat,prefixlen);
4827 memcpy(keyname.buf+prefixlen,ssub,sublen);
4828 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4829 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4830 keyname.len = prefixlen+sublen+postfixlen;
4831
4832 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4833 decrRefCount(subst);
4834
4835 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4836 return lookupKeyRead(db,&keyobj);
4837 }
4838
4839 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4840 * the additional parameter is not standard but a BSD-specific we have to
4841 * pass sorting parameters via the global 'server' structure */
4842 static int sortCompare(const void *s1, const void *s2) {
4843 const redisSortObject *so1 = s1, *so2 = s2;
4844 int cmp;
4845
4846 if (!server.sort_alpha) {
4847 /* Numeric sorting. Here it's trivial as we precomputed scores */
4848 if (so1->u.score > so2->u.score) {
4849 cmp = 1;
4850 } else if (so1->u.score < so2->u.score) {
4851 cmp = -1;
4852 } else {
4853 cmp = 0;
4854 }
4855 } else {
4856 /* Alphanumeric sorting */
4857 if (server.sort_bypattern) {
4858 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4859 /* At least one compare object is NULL */
4860 if (so1->u.cmpobj == so2->u.cmpobj)
4861 cmp = 0;
4862 else if (so1->u.cmpobj == NULL)
4863 cmp = -1;
4864 else
4865 cmp = 1;
4866 } else {
4867 /* We have both the objects, use strcoll */
4868 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4869 }
4870 } else {
4871 /* Compare elements directly */
4872 robj *dec1, *dec2;
4873
4874 dec1 = getDecodedObject(so1->obj);
4875 dec2 = getDecodedObject(so2->obj);
4876 cmp = strcoll(dec1->ptr,dec2->ptr);
4877 decrRefCount(dec1);
4878 decrRefCount(dec2);
4879 }
4880 }
4881 return server.sort_desc ? -cmp : cmp;
4882 }
4883
4884 /* The SORT command is the most complex command in Redis. Warning: this code
4885 * is optimized for speed and a bit less for readability */
4886 static void sortCommand(redisClient *c) {
4887 list *operations;
4888 int outputlen = 0;
4889 int desc = 0, alpha = 0;
4890 int limit_start = 0, limit_count = -1, start, end;
4891 int j, dontsort = 0, vectorlen;
4892 int getop = 0; /* GET operation counter */
4893 robj *sortval, *sortby = NULL, *storekey = NULL;
4894 redisSortObject *vector; /* Resulting vector to sort */
4895
4896 /* Lookup the key to sort. It must be of the right types */
4897 sortval = lookupKeyRead(c->db,c->argv[1]);
4898 if (sortval == NULL) {
4899 addReply(c,shared.nullmultibulk);
4900 return;
4901 }
4902 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4903 sortval->type != REDIS_ZSET)
4904 {
4905 addReply(c,shared.wrongtypeerr);
4906 return;
4907 }
4908
4909 /* Create a list of operations to perform for every sorted element.
4910 * Operations can be GET/DEL/INCR/DECR */
4911 operations = listCreate();
4912 listSetFreeMethod(operations,zfree);
4913 j = 2;
4914
4915 /* Now we need to protect sortval incrementing its count, in the future
4916 * SORT may have options able to overwrite/delete keys during the sorting
4917 * and the sorted key itself may get destroied */
4918 incrRefCount(sortval);
4919
4920 /* The SORT command has an SQL-alike syntax, parse it */
4921 while(j < c->argc) {
4922 int leftargs = c->argc-j-1;
4923 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4924 desc = 0;
4925 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4926 desc = 1;
4927 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4928 alpha = 1;
4929 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4930 limit_start = atoi(c->argv[j+1]->ptr);
4931 limit_count = atoi(c->argv[j+2]->ptr);
4932 j+=2;
4933 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4934 storekey = c->argv[j+1];
4935 j++;
4936 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4937 sortby = c->argv[j+1];
4938 /* If the BY pattern does not contain '*', i.e. it is constant,
4939 * we don't need to sort nor to lookup the weight keys. */
4940 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4941 j++;
4942 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4943 listAddNodeTail(operations,createSortOperation(
4944 REDIS_SORT_GET,c->argv[j+1]));
4945 getop++;
4946 j++;
4947 } else {
4948 decrRefCount(sortval);
4949 listRelease(operations);
4950 addReply(c,shared.syntaxerr);
4951 return;
4952 }
4953 j++;
4954 }
4955
4956 /* Load the sorting vector with all the objects to sort */
4957 switch(sortval->type) {
4958 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
4959 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
4960 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
4961 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
4962 }
4963 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4964 j = 0;
4965
4966 if (sortval->type == REDIS_LIST) {
4967 list *list = sortval->ptr;
4968 listNode *ln;
4969
4970 listRewind(list);
4971 while((ln = listYield(list))) {
4972 robj *ele = ln->value;
4973 vector[j].obj = ele;
4974 vector[j].u.score = 0;
4975 vector[j].u.cmpobj = NULL;
4976 j++;
4977 }
4978 } else {
4979 dict *set;
4980 dictIterator *di;
4981 dictEntry *setele;
4982
4983 if (sortval->type == REDIS_SET) {
4984 set = sortval->ptr;
4985 } else {
4986 zset *zs = sortval->ptr;
4987 set = zs->dict;
4988 }
4989
4990 di = dictGetIterator(set);
4991 while((setele = dictNext(di)) != NULL) {
4992 vector[j].obj = dictGetEntryKey(setele);
4993 vector[j].u.score = 0;
4994 vector[j].u.cmpobj = NULL;
4995 j++;
4996 }
4997 dictReleaseIterator(di);
4998 }
4999 redisAssert(j == vectorlen);
5000
5001 /* Now it's time to load the right scores in the sorting vector */
5002 if (dontsort == 0) {
5003 for (j = 0; j < vectorlen; j++) {
5004 if (sortby) {
5005 robj *byval;
5006
5007 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5008 if (!byval || byval->type != REDIS_STRING) continue;
5009 if (alpha) {
5010 vector[j].u.cmpobj = getDecodedObject(byval);
5011 } else {
5012 if (byval->encoding == REDIS_ENCODING_RAW) {
5013 vector[j].u.score = strtod(byval->ptr,NULL);
5014 } else {
5015 /* Don't need to decode the object if it's
5016 * integer-encoded (the only encoding supported) so
5017 * far. We can just cast it */
5018 if (byval->encoding == REDIS_ENCODING_INT) {
5019 vector[j].u.score = (long)byval->ptr;
5020 } else
5021 redisAssert(1 != 1);
5022 }
5023 }
5024 } else {
5025 if (!alpha) {
5026 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5027 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5028 else {
5029 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5030 vector[j].u.score = (long) vector[j].obj->ptr;
5031 else
5032 redisAssert(1 != 1);
5033 }
5034 }
5035 }
5036 }
5037 }
5038
5039 /* We are ready to sort the vector... perform a bit of sanity check
5040 * on the LIMIT option too. We'll use a partial version of quicksort. */
5041 start = (limit_start < 0) ? 0 : limit_start;
5042 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5043 if (start >= vectorlen) {
5044 start = vectorlen-1;
5045 end = vectorlen-2;
5046 }
5047 if (end >= vectorlen) end = vectorlen-1;
5048
5049 if (dontsort == 0) {
5050 server.sort_desc = desc;
5051 server.sort_alpha = alpha;
5052 server.sort_bypattern = sortby ? 1 : 0;
5053 if (sortby && (start != 0 || end != vectorlen-1))
5054 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5055 else
5056 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5057 }
5058
5059 /* Send command output to the output buffer, performing the specified
5060 * GET/DEL/INCR/DECR operations if any. */
5061 outputlen = getop ? getop*(end-start+1) : end-start+1;
5062 if (storekey == NULL) {
5063 /* STORE option not specified, sent the sorting result to client */
5064 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5065 for (j = start; j <= end; j++) {
5066 listNode *ln;
5067 if (!getop) {
5068 addReplyBulkLen(c,vector[j].obj);
5069 addReply(c,vector[j].obj);
5070 addReply(c,shared.crlf);
5071 }
5072 listRewind(operations);
5073 while((ln = listYield(operations))) {
5074 redisSortOperation *sop = ln->value;
5075 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5076 vector[j].obj);
5077
5078 if (sop->type == REDIS_SORT_GET) {
5079 if (!val || val->type != REDIS_STRING) {
5080 addReply(c,shared.nullbulk);
5081 } else {
5082 addReplyBulkLen(c,val);
5083 addReply(c,val);
5084 addReply(c,shared.crlf);
5085 }
5086 } else {
5087 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5088 }
5089 }
5090 }
5091 } else {
5092 robj *listObject = createListObject();
5093 list *listPtr = (list*) listObject->ptr;
5094
5095 /* STORE option specified, set the sorting result as a List object */
5096 for (j = start; j <= end; j++) {
5097 listNode *ln;
5098 if (!getop) {
5099 listAddNodeTail(listPtr,vector[j].obj);
5100 incrRefCount(vector[j].obj);
5101 }
5102 listRewind(operations);
5103 while((ln = listYield(operations))) {
5104 redisSortOperation *sop = ln->value;
5105 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5106 vector[j].obj);
5107
5108 if (sop->type == REDIS_SORT_GET) {
5109 if (!val || val->type != REDIS_STRING) {
5110 listAddNodeTail(listPtr,createStringObject("",0));
5111 } else {
5112 listAddNodeTail(listPtr,val);
5113 incrRefCount(val);
5114 }
5115 } else {
5116 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5117 }
5118 }
5119 }
5120 if (dictReplace(c->db->dict,storekey,listObject)) {
5121 incrRefCount(storekey);
5122 }
5123 /* Note: we add 1 because the DB is dirty anyway since even if the
5124 * SORT result is empty a new key is set and maybe the old content
5125 * replaced. */
5126 server.dirty += 1+outputlen;
5127 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5128 }
5129
5130 /* Cleanup */
5131 decrRefCount(sortval);
5132 listRelease(operations);
5133 for (j = 0; j < vectorlen; j++) {
5134 if (sortby && alpha && vector[j].u.cmpobj)
5135 decrRefCount(vector[j].u.cmpobj);
5136 }
5137 zfree(vector);
5138 }
5139
5140 /* Create the string returned by the INFO command. This is decoupled
5141 * by the INFO command itself as we need to report the same information
5142 * on memory corruption problems. */
5143 static sds genRedisInfoString(void) {
5144 sds info;
5145 time_t uptime = time(NULL)-server.stat_starttime;
5146 int j;
5147
5148 info = sdscatprintf(sdsempty(),
5149 "redis_version:%s\r\n"
5150 "arch_bits:%s\r\n"
5151 "multiplexing_api:%s\r\n"
5152 "uptime_in_seconds:%ld\r\n"
5153 "uptime_in_days:%ld\r\n"
5154 "connected_clients:%d\r\n"
5155 "connected_slaves:%d\r\n"
5156 "used_memory:%zu\r\n"
5157 "changes_since_last_save:%lld\r\n"
5158 "bgsave_in_progress:%d\r\n"
5159 "last_save_time:%ld\r\n"
5160 "bgrewriteaof_in_progress:%d\r\n"
5161 "total_connections_received:%lld\r\n"
5162 "total_commands_processed:%lld\r\n"
5163 "role:%s\r\n"
5164 ,REDIS_VERSION,
5165 (sizeof(long) == 8) ? "64" : "32",
5166 aeGetApiName(),
5167 uptime,
5168 uptime/(3600*24),
5169 listLength(server.clients)-listLength(server.slaves),
5170 listLength(server.slaves),
5171 server.usedmemory,
5172 server.dirty,
5173 server.bgsavechildpid != -1,
5174 server.lastsave,
5175 server.bgrewritechildpid != -1,
5176 server.stat_numconnections,
5177 server.stat_numcommands,
5178 server.masterhost == NULL ? "master" : "slave"
5179 );
5180 if (server.masterhost) {
5181 info = sdscatprintf(info,
5182 "master_host:%s\r\n"
5183 "master_port:%d\r\n"
5184 "master_link_status:%s\r\n"
5185 "master_last_io_seconds_ago:%d\r\n"
5186 ,server.masterhost,
5187 server.masterport,
5188 (server.replstate == REDIS_REPL_CONNECTED) ?
5189 "up" : "down",
5190 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5191 );
5192 }
5193 for (j = 0; j < server.dbnum; j++) {
5194 long long keys, vkeys;
5195
5196 keys = dictSize(server.db[j].dict);
5197 vkeys = dictSize(server.db[j].expires);
5198 if (keys || vkeys) {
5199 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5200 j, keys, vkeys);
5201 }
5202 }
5203 return info;
5204 }
5205
5206 static void infoCommand(redisClient *c) {
5207 sds info = genRedisInfoString();
5208 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5209 (unsigned long)sdslen(info)));
5210 addReplySds(c,info);
5211 addReply(c,shared.crlf);
5212 }
5213
5214 static void monitorCommand(redisClient *c) {
5215 /* ignore MONITOR if aleady slave or in monitor mode */
5216 if (c->flags & REDIS_SLAVE) return;
5217
5218 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5219 c->slaveseldb = 0;
5220 listAddNodeTail(server.monitors,c);
5221 addReply(c,shared.ok);
5222 }
5223
5224 /* ================================= Expire ================================= */
5225 static int removeExpire(redisDb *db, robj *key) {
5226 if (dictDelete(db->expires,key) == DICT_OK) {
5227 return 1;
5228 } else {
5229 return 0;
5230 }
5231 }
5232
5233 static int setExpire(redisDb *db, robj *key, time_t when) {
5234 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5235 return 0;
5236 } else {
5237 incrRefCount(key);
5238 return 1;
5239 }
5240 }
5241
5242 /* Return the expire time of the specified key, or -1 if no expire
5243 * is associated with this key (i.e. the key is non volatile) */
5244 static time_t getExpire(redisDb *db, robj *key) {
5245 dictEntry *de;
5246
5247 /* No expire? return ASAP */
5248 if (dictSize(db->expires) == 0 ||
5249 (de = dictFind(db->expires,key)) == NULL) return -1;
5250
5251 return (time_t) dictGetEntryVal(de);
5252 }
5253
5254 static int expireIfNeeded(redisDb *db, robj *key) {
5255 time_t when;
5256 dictEntry *de;
5257
5258 /* No expire? return ASAP */
5259 if (dictSize(db->expires) == 0 ||
5260 (de = dictFind(db->expires,key)) == NULL) return 0;
5261
5262 /* Lookup the expire */
5263 when = (time_t) dictGetEntryVal(de);
5264 if (time(NULL) <= when) return 0;
5265
5266 /* Delete the key */
5267 dictDelete(db->expires,key);
5268 return dictDelete(db->dict,key) == DICT_OK;
5269 }
5270
5271 static int deleteIfVolatile(redisDb *db, robj *key) {
5272 dictEntry *de;
5273
5274 /* No expire? return ASAP */
5275 if (dictSize(db->expires) == 0 ||
5276 (de = dictFind(db->expires,key)) == NULL) return 0;
5277
5278 /* Delete the key */
5279 server.dirty++;
5280 dictDelete(db->expires,key);
5281 return dictDelete(db->dict,key) == DICT_OK;
5282 }
5283
5284 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5285 dictEntry *de;
5286
5287 de = dictFind(c->db->dict,key);
5288 if (de == NULL) {
5289 addReply(c,shared.czero);
5290 return;
5291 }
5292 if (seconds < 0) {
5293 if (deleteKey(c->db,key)) server.dirty++;
5294 addReply(c, shared.cone);
5295 return;
5296 } else {
5297 time_t when = time(NULL)+seconds;
5298 if (setExpire(c->db,key,when)) {
5299 addReply(c,shared.cone);
5300 server.dirty++;
5301 } else {
5302 addReply(c,shared.czero);
5303 }
5304 return;
5305 }
5306 }
5307
5308 static void expireCommand(redisClient *c) {
5309 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5310 }
5311
5312 static void expireatCommand(redisClient *c) {
5313 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5314 }
5315
5316 static void ttlCommand(redisClient *c) {
5317 time_t expire;
5318 int ttl = -1;
5319
5320 expire = getExpire(c->db,c->argv[1]);
5321 if (expire != -1) {
5322 ttl = (int) (expire-time(NULL));
5323 if (ttl < 0) ttl = -1;
5324 }
5325 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5326 }
5327
5328 /* ================================ MULTI/EXEC ============================== */
5329
5330 /* Client state initialization for MULTI/EXEC */
5331 static void initClientMultiState(redisClient *c) {
5332 c->mstate.commands = NULL;
5333 c->mstate.count = 0;
5334 }
5335
5336 /* Release all the resources associated with MULTI/EXEC state */
5337 static void freeClientMultiState(redisClient *c) {
5338 int j;
5339
5340 for (j = 0; j < c->mstate.count; j++) {
5341 int i;
5342 multiCmd *mc = c->mstate.commands+j;
5343
5344 for (i = 0; i < mc->argc; i++)
5345 decrRefCount(mc->argv[i]);
5346 zfree(mc->argv);
5347 }
5348 zfree(c->mstate.commands);
5349 }
5350
5351 /* Add a new command into the MULTI commands queue */
5352 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5353 multiCmd *mc;
5354 int j;
5355
5356 c->mstate.commands = zrealloc(c->mstate.commands,
5357 sizeof(multiCmd)*(c->mstate.count+1));
5358 mc = c->mstate.commands+c->mstate.count;
5359 mc->cmd = cmd;
5360 mc->argc = c->argc;
5361 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5362 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5363 for (j = 0; j < c->argc; j++)
5364 incrRefCount(mc->argv[j]);
5365 c->mstate.count++;
5366 }
5367
5368 static void multiCommand(redisClient *c) {
5369 c->flags |= REDIS_MULTI;
5370 addReply(c,shared.ok);
5371 }
5372
5373 static void execCommand(redisClient *c) {
5374 int j;
5375 robj **orig_argv;
5376 int orig_argc;
5377
5378 if (!(c->flags & REDIS_MULTI)) {
5379 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5380 return;
5381 }
5382
5383 orig_argv = c->argv;
5384 orig_argc = c->argc;
5385 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5386 for (j = 0; j < c->mstate.count; j++) {
5387 c->argc = c->mstate.commands[j].argc;
5388 c->argv = c->mstate.commands[j].argv;
5389 call(c,c->mstate.commands[j].cmd);
5390 }
5391 c->argv = orig_argv;
5392 c->argc = orig_argc;
5393 freeClientMultiState(c);
5394 initClientMultiState(c);
5395 c->flags &= (~REDIS_MULTI);
5396 }
5397
5398 /* =============================== Replication ============================= */
5399
5400 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5401 ssize_t nwritten, ret = size;
5402 time_t start = time(NULL);
5403
5404 timeout++;
5405 while(size) {
5406 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5407 nwritten = write(fd,ptr,size);
5408 if (nwritten == -1) return -1;
5409 ptr += nwritten;
5410 size -= nwritten;
5411 }
5412 if ((time(NULL)-start) > timeout) {
5413 errno = ETIMEDOUT;
5414 return -1;
5415 }
5416 }
5417 return ret;
5418 }
5419
5420 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5421 ssize_t nread, totread = 0;
5422 time_t start = time(NULL);
5423
5424 timeout++;
5425 while(size) {
5426 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5427 nread = read(fd,ptr,size);
5428 if (nread == -1) return -1;
5429 ptr += nread;
5430 size -= nread;
5431 totread += nread;
5432 }
5433 if ((time(NULL)-start) > timeout) {
5434 errno = ETIMEDOUT;
5435 return -1;
5436 }
5437 }
5438 return totread;
5439 }
5440
/* Read a line from 'fd' one byte at a time into the buffer 'ptr', which is
 * 'size' bytes long. The stored string is always nul terminated and never
 * uses more than 'size' bytes: reading stops either at the first '\n' or
 * when size-1 characters have been stored.
 *
 * The terminating '\n' is not stored, and a '\r' immediately preceding it
 * is overwritten with the terminator (but still counted in the return
 * value). 'timeout' is passed to syncRead() for every single byte.
 *
 * Returns the number of characters read before the newline, or -1 if
 * syncRead() fails. With size <= 0 nothing is read and 0 is returned. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;
    *ptr = '\0';
    size--; /* reserve room for the nul terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One slot of the buffer was consumed: without this the loop would
         * keep writing past the end of 'ptr' on overlong lines. */
        size--;
    }
    return nread;
}
5461
5462 static void syncCommand(redisClient *c) {
5463 /* ignore SYNC if aleady slave or in monitor mode */
5464 if (c->flags & REDIS_SLAVE) return;
5465
5466 /* SYNC can't be issued when the server has pending data to send to
5467 * the client about already issued commands. We need a fresh reply
5468 * buffer registering the differences between the BGSAVE and the current
5469 * dataset, so that we can copy to other slaves if needed. */
5470 if (listLength(c->reply) != 0) {
5471 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5472 return;
5473 }
5474
5475 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5476 /* Here we need to check if there is a background saving operation
5477 * in progress, or if it is required to start one */
5478 if (server.bgsavechildpid != -1) {
5479 /* Ok a background save is in progress. Let's check if it is a good
5480 * one for replication, i.e. if there is another slave that is
5481 * registering differences since the server forked to save */
5482 redisClient *slave;
5483 listNode *ln;
5484
5485 listRewind(server.slaves);
5486 while((ln = listYield(server.slaves))) {
5487 slave = ln->value;
5488 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5489 }
5490 if (ln) {
5491 /* Perfect, the server is already registering differences for
5492 * another slave. Set the right state, and copy the buffer. */
5493 listRelease(c->reply);
5494 c->reply = listDup(slave->reply);
5495 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5496 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5497 } else {
5498 /* No way, we need to wait for the next BGSAVE in order to
5499 * register differences */
5500 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5501 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5502 }
5503 } else {
5504 /* Ok we don't have a BGSAVE in progress, let's start one */
5505 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5506 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5507 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5508 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5509 return;
5510 }
5511 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5512 }
5513 c->repldbfd = -1;
5514 c->flags |= REDIS_SLAVE;
5515 c->slaveseldb = 0;
5516 listAddNodeTail(server.slaves,c);
5517 return;
5518 }
5519
5520 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5521 redisClient *slave = privdata;
5522 REDIS_NOTUSED(el);
5523 REDIS_NOTUSED(mask);
5524 char buf[REDIS_IOBUF_LEN];
5525 ssize_t nwritten, buflen;
5526
5527 if (slave->repldboff == 0) {
5528 /* Write the bulk write count before to transfer the DB. In theory here
5529 * we don't know how much room there is in the output buffer of the
5530 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5531 * operations) will never be smaller than the few bytes we need. */
5532 sds bulkcount;
5533
5534 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5535 slave->repldbsize);
5536 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5537 {
5538 sdsfree(bulkcount);
5539 freeClient(slave);
5540 return;
5541 }
5542 sdsfree(bulkcount);
5543 }
5544 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5545 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5546 if (buflen <= 0) {
5547 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5548 (buflen == 0) ? "premature EOF" : strerror(errno));
5549 freeClient(slave);
5550 return;
5551 }
5552 if ((nwritten = write(fd,buf,buflen)) == -1) {
5553 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5554 strerror(errno));
5555 freeClient(slave);
5556 return;
5557 }
5558 slave->repldboff += nwritten;
5559 if (slave->repldboff == slave->repldbsize) {
5560 close(slave->repldbfd);
5561 slave->repldbfd = -1;
5562 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5563 slave->replstate = REDIS_REPL_ONLINE;
5564 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5565 sendReplyToClient, slave) == AE_ERR) {
5566 freeClient(slave);
5567 return;
5568 }
5569 addReplySds(slave,sdsempty());
5570 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5571 }
5572 }
5573
5574 /* This function is called at the end of every backgrond saving.
5575 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5576 * otherwise REDIS_ERR is passed to the function.
5577 *
5578 * The goal of this function is to handle slaves waiting for a successful
5579 * background saving in order to perform non-blocking synchronization. */
5580 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5581 listNode *ln;
5582 int startbgsave = 0;
5583
5584 listRewind(server.slaves);
5585 while((ln = listYield(server.slaves))) {
5586 redisClient *slave = ln->value;
5587
5588 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5589 startbgsave = 1;
5590 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5591 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5592 struct redis_stat buf;
5593
5594 if (bgsaveerr != REDIS_OK) {
5595 freeClient(slave);
5596 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5597 continue;
5598 }
5599 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5600 redis_fstat(slave->repldbfd,&buf) == -1) {
5601 freeClient(slave);
5602 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5603 continue;
5604 }
5605 slave->repldboff = 0;
5606 slave->repldbsize = buf.st_size;
5607 slave->replstate = REDIS_REPL_SEND_BULK;
5608 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5609 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5610 freeClient(slave);
5611 continue;
5612 }
5613 }
5614 }
5615 if (startbgsave) {
5616 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5617 listRewind(server.slaves);
5618 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5619 while((ln = listYield(server.slaves))) {
5620 redisClient *slave = ln->value;
5621
5622 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5623 freeClient(slave);
5624 }
5625 }
5626 }
5627 }
5628
5629 static int syncWithMaster(void) {
5630 char buf[1024], tmpfile[256], authcmd[1024];
5631 int dumpsize;
5632 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5633 int dfd;
5634
5635 if (fd == -1) {
5636 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5637 strerror(errno));
5638 return REDIS_ERR;
5639 }
5640
5641 /* AUTH with the master if required. */
5642 if(server.masterauth) {
5643 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5644 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5645 close(fd);
5646 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5647 strerror(errno));
5648 return REDIS_ERR;
5649 }
5650 /* Read the AUTH result. */
5651 if (syncReadLine(fd,buf,1024,3600) == -1) {
5652 close(fd);
5653 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5654 strerror(errno));
5655 return REDIS_ERR;
5656 }
5657 if (buf[0] != '+') {
5658 close(fd);
5659 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5660 return REDIS_ERR;
5661 }
5662 }
5663
5664 /* Issue the SYNC command */
5665 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5666 close(fd);
5667 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5668 strerror(errno));
5669 return REDIS_ERR;
5670 }
5671 /* Read the bulk write count */
5672 if (syncReadLine(fd,buf,1024,3600) == -1) {
5673 close(fd);
5674 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5675 strerror(errno));
5676 return REDIS_ERR;
5677 }
5678 if (buf[0] != '$') {
5679 close(fd);
5680 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5681 return REDIS_ERR;
5682 }
5683 dumpsize = atoi(buf+1);
5684 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5685 /* Read the bulk write data on a temp file */
5686 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5687 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5688 if (dfd == -1) {
5689 close(fd);
5690 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5691 return REDIS_ERR;
5692 }
5693 while(dumpsize) {
5694 int nread, nwritten;
5695
5696 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5697 if (nread == -1) {
5698 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5699 strerror(errno));
5700 close(fd);
5701 close(dfd);
5702 return REDIS_ERR;
5703 }
5704 nwritten = write(dfd,buf,nread);
5705 if (nwritten == -1) {
5706 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5707 close(fd);
5708 close(dfd);
5709 return REDIS_ERR;
5710 }
5711 dumpsize -= nread;
5712 }
5713 close(dfd);
5714 if (rename(tmpfile,server.dbfilename) == -1) {
5715 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5716 unlink(tmpfile);
5717 close(fd);
5718 return REDIS_ERR;
5719 }
5720 emptyDb();
5721 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5722 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5723 close(fd);
5724 return REDIS_ERR;
5725 }
5726 server.master = createClient(fd);
5727 server.master->flags |= REDIS_MASTER;
5728 server.master->authenticated = 1;
5729 server.replstate = REDIS_REPL_CONNECTED;
5730 return REDIS_OK;
5731 }
5732
5733 static void slaveofCommand(redisClient *c) {
5734 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5735 !strcasecmp(c->argv[2]->ptr,"one")) {
5736 if (server.masterhost) {
5737 sdsfree(server.masterhost);
5738 server.masterhost = NULL;
5739 if (server.master) freeClient(server.master);
5740 server.replstate = REDIS_REPL_NONE;
5741 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5742 }
5743 } else {
5744 sdsfree(server.masterhost);
5745 server.masterhost = sdsdup(c->argv[1]->ptr);
5746 server.masterport = atoi(c->argv[2]->ptr);
5747 if (server.master) freeClient(server.master);
5748 server.replstate = REDIS_REPL_CONNECT;
5749 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5750 server.masterhost, server.masterport);
5751 }
5752 addReply(c,shared.ok);
5753 }
5754
5755 /* ============================ Maxmemory directive ======================== */
5756
5757 /* This function gets called when 'maxmemory' is set on the config file to limit
5758 * the max memory used by the server, and we are out of memory.
5759 * This function will try to, in order:
5760 *
5761 * - Free objects from the free list
5762 * - Try to remove keys with an EXPIRE set
5763 *
5764 * It is not possible to free enough memory to reach used-memory < maxmemory
5765 * the server will start refusing commands that will enlarge even more the
5766 * memory usage.
5767 */
5768 static void freeMemoryIfNeeded(void) {
5769 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5770 if (listLength(server.objfreelist)) {
5771 robj *o;
5772
5773 listNode *head = listFirst(server.objfreelist);
5774 o = listNodeValue(head);
5775 listDelNode(server.objfreelist,head);
5776 zfree(o);
5777 } else {
5778 int j, k, freed = 0;
5779
5780 for (j = 0; j < server.dbnum; j++) {
5781 int minttl = -1;
5782 robj *minkey = NULL;
5783 struct dictEntry *de;
5784
5785 if (dictSize(server.db[j].expires)) {
5786 freed = 1;
5787 /* From a sample of three keys drop the one nearest to
5788 * the natural expire */
5789 for (k = 0; k < 3; k++) {
5790 time_t t;
5791
5792 de = dictGetRandomKey(server.db[j].expires);
5793 t = (time_t) dictGetEntryVal(de);
5794 if (minttl == -1 || t < minttl) {
5795 minkey = dictGetEntryKey(de);
5796 minttl = t;
5797 }
5798 }
5799 deleteKey(server.db+j,minkey);
5800 }
5801 }
5802 if (!freed) return; /* nothing to free... */
5803 }
5804 }
5805 }
5806
5807 /* ============================== Append Only file ========================== */
5808
5809 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5810 sds buf = sdsempty();
5811 int j;
5812 ssize_t nwritten;
5813 time_t now;
5814 robj *tmpargv[3];
5815
5816 /* The DB this command was targetting is not the same as the last command
5817 * we appendend. To issue a SELECT command is needed. */
5818 if (dictid != server.appendseldb) {
5819 char seldb[64];
5820
5821 snprintf(seldb,sizeof(seldb),"%d",dictid);
5822 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5823 (unsigned long)strlen(seldb),seldb);
5824 server.appendseldb = dictid;
5825 }
5826
5827 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5828 * EXPIREs into EXPIREATs calls */
5829 if (cmd->proc == expireCommand) {
5830 long when;
5831
5832 tmpargv[0] = createStringObject("EXPIREAT",8);
5833 tmpargv[1] = argv[1];
5834 incrRefCount(argv[1]);
5835 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5836 tmpargv[2] = createObject(REDIS_STRING,
5837 sdscatprintf(sdsempty(),"%ld",when));
5838 argv = tmpargv;
5839 }
5840
5841 /* Append the actual command */
5842 buf = sdscatprintf(buf,"*%d\r\n",argc);
5843 for (j = 0; j < argc; j++) {
5844 robj *o = argv[j];
5845
5846 o = getDecodedObject(o);
5847 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
5848 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5849 buf = sdscatlen(buf,"\r\n",2);
5850 decrRefCount(o);
5851 }
5852
5853 /* Free the objects from the modified argv for EXPIREAT */
5854 if (cmd->proc == expireCommand) {
5855 for (j = 0; j < 3; j++)
5856 decrRefCount(argv[j]);
5857 }
5858
5859 /* We want to perform a single write. This should be guaranteed atomic
5860 * at least if the filesystem we are writing is a real physical one.
5861 * While this will save us against the server being killed I don't think
5862 * there is much to do about the whole server stopping for power problems
5863 * or alike */
5864 nwritten = write(server.appendfd,buf,sdslen(buf));
5865 if (nwritten != (signed)sdslen(buf)) {
5866 /* Ooops, we are in troubles. The best thing to do for now is
5867 * to simply exit instead to give the illusion that everything is
5868 * working as expected. */
5869 if (nwritten == -1) {
5870 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5871 } else {
5872 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5873 }
5874 exit(1);
5875 }
5876 /* If a background append only file rewriting is in progress we want to
5877 * accumulate the differences between the child DB and the current one
5878 * in a buffer, so that when the child process will do its work we
5879 * can append the differences to the new append only file. */
5880 if (server.bgrewritechildpid != -1)
5881 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5882
5883 sdsfree(buf);
5884 now = time(NULL);
5885 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5886 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5887 now-server.lastfsync > 1))
5888 {
5889 fsync(server.appendfd); /* Let's try to get this data on the disk */
5890 server.lastfsync = now;
5891 }
5892 }
5893
5894 /* In Redis commands are always executed in the context of a client, so in
5895 * order to load the append only file we need to create a fake client. */
5896 static struct redisClient *createFakeClient(void) {
5897 struct redisClient *c = zmalloc(sizeof(*c));
5898
5899 selectDb(c,0);
5900 c->fd = -1;
5901 c->querybuf = sdsempty();
5902 c->argc = 0;
5903 c->argv = NULL;
5904 c->flags = 0;
5905 /* We set the fake client as a slave waiting for the synchronization
5906 * so that Redis will not try to send replies to this client. */
5907 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5908 c->reply = listCreate();
5909 listSetFreeMethod(c->reply,decrRefCount);
5910 listSetDupMethod(c->reply,dupClientReplyValue);
5911 return c;
5912 }
5913
5914 static void freeFakeClient(struct redisClient *c) {
5915 sdsfree(c->querybuf);
5916 listRelease(c->reply);
5917 zfree(c);
5918 }
5919
5920 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5921 * error (the append only file is zero-length) REDIS_ERR is returned. On
5922 * fatal error an error message is logged and the program exists. */
5923 int loadAppendOnlyFile(char *filename) {
5924 struct redisClient *fakeClient;
5925 FILE *fp = fopen(filename,"r");
5926 struct redis_stat sb;
5927
5928 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5929 return REDIS_ERR;
5930
5931 if (fp == NULL) {
5932 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5933 exit(1);
5934 }
5935
5936 fakeClient = createFakeClient();
5937 while(1) {
5938 int argc, j;
5939 unsigned long len;
5940 robj **argv;
5941 char buf[128];
5942 sds argsds;
5943 struct redisCommand *cmd;
5944
5945 if (fgets(buf,sizeof(buf),fp) == NULL) {
5946 if (feof(fp))
5947 break;
5948 else
5949 goto readerr;
5950 }
5951 if (buf[0] != '*') goto fmterr;
5952 argc = atoi(buf+1);
5953 argv = zmalloc(sizeof(robj*)*argc);
5954 for (j = 0; j < argc; j++) {
5955 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5956 if (buf[0] != '$') goto fmterr;
5957 len = strtol(buf+1,NULL,10);
5958 argsds = sdsnewlen(NULL,len);
5959 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5960 argv[j] = createObject(REDIS_STRING,argsds);
5961 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5962 }
5963
5964 /* Command lookup */
5965 cmd = lookupCommand(argv[0]->ptr);
5966 if (!cmd) {
5967 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5968 exit(1);
5969 }
5970 /* Try object sharing and encoding */
5971 if (server.shareobjects) {
5972 int j;
5973 for(j = 1; j < argc; j++)
5974 argv[j] = tryObjectSharing(argv[j]);
5975 }
5976 if (cmd->flags & REDIS_CMD_BULK)
5977 tryObjectEncoding(argv[argc-1]);
5978 /* Run the command in the context of a fake client */
5979 fakeClient->argc = argc;
5980 fakeClient->argv = argv;
5981 cmd->proc(fakeClient);
5982 /* Discard the reply objects list from the fake client */
5983 while(listLength(fakeClient->reply))
5984 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5985 /* Clean up, ready for the next command */
5986 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5987 zfree(argv);
5988 }
5989 fclose(fp);
5990 freeFakeClient(fakeClient);
5991 return REDIS_OK;
5992
5993 readerr:
5994 if (feof(fp)) {
5995 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5996 } else {
5997 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5998 }
5999 exit(1);
6000 fmterr:
6001 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6002 exit(1);
6003 }
6004
6005 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6006 static int fwriteBulk(FILE *fp, robj *obj) {
6007 char buf[128];
6008 obj = getDecodedObject(obj);
6009 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6010 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6011 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6012 goto err;
6013 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6014 decrRefCount(obj);
6015 return 1;
6016 err:
6017 decrRefCount(obj);
6018 return 0;
6019 }
6020
/* Write the double 'd' to 'fp' in the bulk format
 * $<count>\r\n<payload>\r\n, where the payload is the value formatted with
 * "%.17g". Returns 1 on success, 0 if a write failed. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t paylen;

    /* The payload buffer already carries the trailing CRLF, which is not
     * part of the announced count. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    paylen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);
    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,paylen,1,fp) == 0)
    {
        return 0;
    }
    return 1;
}
6031
/* Write the long 'l' to 'fp' in the bulk format $<count>\r\n<payload>\r\n,
 * where the payload is the value in decimal. Returns 1 on success, 0 if a
 * write failed. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], header[128];
    size_t paylen;

    /* The payload buffer already carries the trailing CRLF, which is not
     * part of the announced count. */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    paylen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);
    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,paylen,1,fp) == 0)
    {
        return 0;
    }
    return 1;
}
6042
6043 /* Write a sequence of commands able to fully rebuild the dataset into
6044 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6045 static int rewriteAppendOnlyFile(char *filename) {
6046 dictIterator *di = NULL;
6047 dictEntry *de;
6048 FILE *fp;
6049 char tmpfile[256];
6050 int j;
6051 time_t now = time(NULL);
6052
6053 /* Note that we have to use a different temp name here compared to the
6054 * one used by rewriteAppendOnlyFileBackground() function. */
6055 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6056 fp = fopen(tmpfile,"w");
6057 if (!fp) {
6058 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6059 return REDIS_ERR;
6060 }
6061 for (j = 0; j < server.dbnum; j++) {
6062 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6063 redisDb *db = server.db+j;
6064 dict *d = db->dict;
6065 if (dictSize(d) == 0) continue;
6066 di = dictGetIterator(d);
6067 if (!di) {
6068 fclose(fp);
6069 return REDIS_ERR;
6070 }
6071
6072 /* SELECT the new DB */
6073 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6074 if (fwriteBulkLong(fp,j) == 0) goto werr;
6075
6076 /* Iterate this DB writing every entry */
6077 while((de = dictNext(di)) != NULL) {
6078 robj *key = dictGetEntryKey(de);
6079 robj *o = dictGetEntryVal(de);
6080 time_t expiretime = getExpire(db,key);
6081
6082 /* Save the key and associated value */
6083 if (o->type == REDIS_STRING) {
6084 /* Emit a SET command */
6085 char cmd[]="*3\r\n$3\r\nSET\r\n";
6086 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6087 /* Key and value */
6088 if (fwriteBulk(fp,key) == 0) goto werr;
6089 if (fwriteBulk(fp,o) == 0) goto werr;
6090 } else if (o->type == REDIS_LIST) {
6091 /* Emit the RPUSHes needed to rebuild the list */
6092 list *list = o->ptr;
6093 listNode *ln;
6094
6095 listRewind(list);
6096 while((ln = listYield(list))) {
6097 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6098 robj *eleobj = listNodeValue(ln);
6099
6100 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6101 if (fwriteBulk(fp,key) == 0) goto werr;
6102 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6103 }
6104 } else if (o->type == REDIS_SET) {
6105 /* Emit the SADDs needed to rebuild the set */
6106 dict *set = o->ptr;
6107 dictIterator *di = dictGetIterator(set);
6108 dictEntry *de;
6109
6110 while((de = dictNext(di)) != NULL) {
6111 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6112 robj *eleobj = dictGetEntryKey(de);
6113
6114 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6115 if (fwriteBulk(fp,key) == 0) goto werr;
6116 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6117 }
6118 dictReleaseIterator(di);
6119 } else if (o->type == REDIS_ZSET) {
6120 /* Emit the ZADDs needed to rebuild the sorted set */
6121 zset *zs = o->ptr;
6122 dictIterator *di = dictGetIterator(zs->dict);
6123 dictEntry *de;
6124
6125 while((de = dictNext(di)) != NULL) {
6126 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6127 robj *eleobj = dictGetEntryKey(de);
6128 double *score = dictGetEntryVal(de);
6129
6130 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6131 if (fwriteBulk(fp,key) == 0) goto werr;
6132 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6133 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6134 }
6135 dictReleaseIterator(di);
6136 } else {
6137 redisAssert(0 != 0);
6138 }
6139 /* Save the expire time */
6140 if (expiretime != -1) {
6141 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6142 /* If this key is already expired skip it */
6143 if (expiretime < now) continue;
6144 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6145 if (fwriteBulk(fp,key) == 0) goto werr;
6146 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6147 }
6148 }
6149 dictReleaseIterator(di);
6150 }
6151
6152 /* Make sure data will not remain on the OS's output buffers */
6153 fflush(fp);
6154 fsync(fileno(fp));
6155 fclose(fp);
6156
6157 /* Use RENAME to make sure the DB file is changed atomically only
6158 * if the generate DB file is ok. */
6159 if (rename(tmpfile,filename) == -1) {
6160 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6161 unlink(tmpfile);
6162 return REDIS_ERR;
6163 }
6164 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6165 return REDIS_OK;
6166
6167 werr:
6168 fclose(fp);
6169 unlink(tmpfile);
6170 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6171 if (di) dictReleaseIterator(di);
6172 return REDIS_ERR;
6173 }
6174
6175 /* This is how rewriting of the append only file in background works:
6176 *
6177 * 1) The user calls BGREWRITEAOF
6178 * 2) Redis calls this function, that forks():
6179 * 2a) the child rewrite the append only file in a temp file.
6180 * 2b) the parent accumulates differences in server.bgrewritebuf.
6181 * 3) When the child finished '2a' exists.
6182 * 4) The parent will trap the exit code, if it's OK, will append the
6183 * data accumulated into server.bgrewritebuf into the temp file, and
6184 * finally will rename(2) the temp file in the actual file name.
6185 * The the new file is reopened as the new append only file. Profit!
6186 */
6187 static int rewriteAppendOnlyFileBackground(void) {
6188 pid_t childpid;
6189
6190 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6191 if ((childpid = fork()) == 0) {
6192 /* Child */
6193 char tmpfile[256];
6194 close(server.fd);
6195
6196 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6197 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6198 exit(0);
6199 } else {
6200 exit(1);
6201 }
6202 } else {
6203 /* Parent */
6204 if (childpid == -1) {
6205 redisLog(REDIS_WARNING,
6206 "Can't rewrite append only file in background: fork: %s",
6207 strerror(errno));
6208 return REDIS_ERR;
6209 }
6210 redisLog(REDIS_NOTICE,
6211 "Background append only file rewriting started by pid %d",childpid);
6212 server.bgrewritechildpid = childpid;
6213 /* We set appendseldb to -1 in order to force the next call to the
6214 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6215 * accumulated by the parent into server.bgrewritebuf will start
6216 * with a SELECT statement and it will be safe to merge. */
6217 server.appendseldb = -1;
6218 return REDIS_OK;
6219 }
6220 return REDIS_OK; /* unreached */
6221 }
6222
6223 static void bgrewriteaofCommand(redisClient *c) {
6224 if (server.bgrewritechildpid != -1) {
6225 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6226 return;
6227 }
6228 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6229 char *status = "+Background append only file rewriting started\r\n";
6230 addReplySds(c,sdsnew(status));
6231 } else {
6232 addReply(c,shared.err);
6233 }
6234 }
6235
6236 static void aofsyncCommand(redisClient *c) {
6237 if (server.appendonly)
6238 fsync(server.appendfd);
6239 else
6240 addReplySds(c,sdsnew("-ERR Append Only File is not active\r\n"));
6241 }
6242
/* Remove the temp file named after 'childpid', that is the name used by the
 * child process created by rewriteAppendOnlyFileBackground(). The result of
 * unlink() is ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
6249
6250 /* ================================= Debugging ============================== */
6251
6252 static void debugCommand(redisClient *c) {
6253 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6254 *((char*)-1) = 'x';
6255 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6256 if (rdbSave(server.dbfilename) != REDIS_OK) {
6257 addReply(c,shared.err);
6258 return;
6259 }
6260 emptyDb();
6261 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6262 addReply(c,shared.err);
6263 return;
6264 }
6265 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6266 addReply(c,shared.ok);
6267 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6268 emptyDb();
6269 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6270 addReply(c,shared.err);
6271 return;
6272 }
6273 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6274 addReply(c,shared.ok);
6275 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6276 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6277 robj *key, *val;
6278
6279 if (!de) {
6280 addReply(c,shared.nokeyerr);
6281 return;
6282 }
6283 key = dictGetEntryKey(de);
6284 val = dictGetEntryVal(de);
6285 addReplySds(c,sdscatprintf(sdsempty(),
6286 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6287 (void*)key, key->refcount, (void*)val, val->refcount,
6288 val->encoding));
6289 } else {
6290 addReplySds(c,sdsnew(
6291 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6292 }
6293 }
6294
6295 static void _redisAssert(char *estr) {
6296 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6297 redisLog(REDIS_WARNING,"==> %s\n",estr);
6298 #ifdef HAVE_BACKTRACE
6299 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6300 *((char*)-1) = 'x';
6301 #endif
6302 }
6303
6304 /* =================================== Main! ================================ */
6305
6306 #ifdef __linux__
/* Return the content of /proc/sys/vm/overcommit_memory converted to an
 * integer with atoi(), or -1 if the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,64,fp) != NULL) value = atoi(line);
    fclose(fp);
    return value;
}
6320
6321 void linuxOvercommitMemoryWarning(void) {
6322 if (linuxOvercommitMemoryValue() == 0) {
6323 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6324 }
6325 }
6326 #endif /* __linux__ */
6327
6328 static void daemonize(void) {
6329 int fd;
6330 FILE *fp;
6331
6332 if (fork() != 0) exit(0); /* parent exits */
6333 printf("New pid: %d\n", getpid());
6334 setsid(); /* create a new session */
6335
6336 /* Every output goes to /dev/null. If Redis is daemonized but
6337 * the 'logfile' is set to 'stdout' in the configuration file
6338 * it will not log at all. */
6339 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6340 dup2(fd, STDIN_FILENO);
6341 dup2(fd, STDOUT_FILENO);
6342 dup2(fd, STDERR_FILENO);
6343 if (fd > STDERR_FILENO) close(fd);
6344 }
6345 /* Try to write the pid file */
6346 fp = fopen(server.pidfile,"w");
6347 if (fp) {
6348 fprintf(fp,"%d\n",getpid());
6349 fclose(fp);
6350 }
6351 }
6352
6353 int main(int argc, char **argv) {
6354 initServerConfig();
6355 if (argc == 2) {
6356 resetServerSaveParams();
6357 loadServerConfig(argv[1]);
6358 } else if (argc > 2) {
6359 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6360 exit(1);
6361 } else {
6362 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6363 }
6364 if (server.daemonize) daemonize();
6365 initServer();
6366 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6367 #ifdef __linux__
6368 linuxOvercommitMemoryWarning();
6369 #endif
6370 if (server.appendonly) {
6371 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6372 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6373 } else {
6374 if (rdbLoad(server.dbfilename) == REDIS_OK)
6375 redisLog(REDIS_NOTICE,"DB loaded from disk");
6376 }
6377 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6378 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6379 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6380 aeMain(server.el);
6381 aeDeleteEventLoop(server.el);
6382 return 0;
6383 }
6384
6385 /* ============================= Backtrace support ========================= */
6386
6387 #ifdef HAVE_BACKTRACE
6388 static char *findFuncName(void *pointer, unsigned long *offset);
6389
/* Return the instruction pointer saved into the signal context 'uc', that
 * is the address of the instruction that was executing when the signal was
 * delivered, or NULL on platforms where we don't know how to read it.
 *
 * The location of the register inside ucontext_t depends on the operating
 * system and on the architecture, so every supported combination has its
 * own branch. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
  #if __x86_64__
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__linux__) && (defined(__x86_64__) || defined(__X86_64__))
    /* Linux x86-64: compilers define __x86_64__ (lowercase), and the
     * register to read is RIP, not EIP. 16 is the index of RIP (REG_RIP)
     * in the glibc gregs array; the numeric value is used because the
     * REG_* names are only visible when GNU extensions are enabled. */
    return (void*) uc->uc_mcontext.gregs[16];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
6415
6416 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6417 void *trace[100];
6418 char **messages = NULL;
6419 int i, trace_size = 0;
6420 unsigned long offset=0;
6421 ucontext_t *uc = (ucontext_t*) secret;
6422 sds infostring;
6423 REDIS_NOTUSED(info);
6424
6425 redisLog(REDIS_WARNING,
6426 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6427 infostring = genRedisInfoString();
6428 redisLog(REDIS_WARNING, "%s",infostring);
6429 /* It's not safe to sdsfree() the returned string under memory
6430 * corruption conditions. Let it leak as we are going to abort */
6431
6432 trace_size = backtrace(trace, 100);
6433 /* overwrite sigaction with caller's address */
6434 if (getMcontextEip(uc) != NULL) {
6435 trace[1] = getMcontextEip(uc);
6436 }
6437 messages = backtrace_symbols(trace, trace_size);
6438
6439 for (i=1; i<trace_size; ++i) {
6440 char *fn = findFuncName(trace[i], &offset), *p;
6441
6442 p = strchr(messages[i],'+');
6443 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6444 redisLog(REDIS_WARNING,"%s", messages[i]);
6445 } else {
6446 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6447 }
6448 }
6449 // free(messages); Don't call free() with possibly corrupted memory.
6450 exit(0);
6451 }
6452
6453 static void setupSigSegvAction(void) {
6454 struct sigaction act;
6455
6456 sigemptyset (&act.sa_mask);
6457 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6458 * is used. Otherwise, sa_handler is used */
6459 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6460 act.sa_sigaction = segvHandler;
6461 sigaction (SIGSEGV, &act, NULL);
6462 sigaction (SIGBUS, &act, NULL);
6463 sigaction (SIGFPE, &act, NULL);
6464 sigaction (SIGILL, &act, NULL);
6465 sigaction (SIGBUS, &act, NULL);
6466 return;
6467 }
6468
6469 #include "staticsymbols.h"
6470 /* This function try to convert a pointer into a function name. It's used in
6471 * oreder to provide a backtrace under segmentation fault that's able to
6472 * display functions declared as static (otherwise the backtrace is useless). */
6473 static char *findFuncName(void *pointer, unsigned long *offset){
6474 int i, ret = -1;
6475 unsigned long off, minoff = 0;
6476
6477 /* Try to match against the Symbol with the smallest offset */
6478 for (i=0; symsTable[i].pointer; i++) {
6479 unsigned long lp = (unsigned long) pointer;
6480
6481 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6482 off=lp-symsTable[i].pointer;
6483 if (ret < 0 || off < minoff) {
6484 minoff=off;
6485 ret=i;
6486 }
6487 }
6488 }
6489 if (ret == -1) return NULL;
6490 *offset = minoff;
6491 return symsTable[ret].name;
6492 }
6493 #else /* HAVE_BACKTRACE */
/* Backtrace support is not compiled in: there is no handler to install. */
static void setupSigSegvAction(void) {
    /* Nothing to do. */
}
6496 #endif /* HAVE_BACKTRACE */
6497
6498
6499
6500 /* The End */
6501
6502
6503