]> git.saurik.com Git - redis.git/blob - redis.c
bfdc696cf7162ba352ee56d7dad125d8677a6da5
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.1.95"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77 /* Error codes */
78 #define REDIS_OK 0
79 #define REDIS_ERR -1
80
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
99
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103 /* Command flags */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
111
112 /* Object types */
113 #define REDIS_STRING 0
114 #define REDIS_LIST 1
115 #define REDIS_SET 2
116 #define REDIS_ZSET 3
117 #define REDIS_HASH 4
118
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
127
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
131 *
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
138 *
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
146
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
155 /* Client flags */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
165
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
174
175 /* List related stuff */
176 #define REDIS_HEAD 0
177 #define REDIS_TAIL 1
178
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
184
185 /* Log levels */
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
189
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
192
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
195
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
200
201 /* We can print the stacktrace, so our assert is defined this way: */
202 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
203 static void _redisAssert(char *estr);
204
205 /*================================= Data types ============================== */
206
/* Generic value container used for every Redis value (string / list / set).
 * The file initializes 'type' with REDIS_STRING-style codes and 'encoding'
 * with REDIS_ENCODING_* codes (see initStaticStringObject). */
typedef struct redisObject {
    void *ptr;                  /* payload; its meaning depends on type/encoding */
    unsigned char type;         /* object type code */
    unsigned char encoding;     /* representation code */
    unsigned char notused[2];   /* unused filler bytes */
    int refcount;               /* reference counter (see incrRefCount/decrRefCount) */
} robj;
215
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * _var is the object itself (not a pointer to it), _ptr the payload pointer.
 * The expansion is one statement WITHOUT a trailing semicolon: the caller
 * writes the ';', which keeps the macro usable as the body of an un-braced
 * if/else. _var is expanded several times, so it must be free of side
 * effects. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
} while(0)
226
227 typedef struct redisDb {
228 dict *dict;
229 dict *expires;
230 int id;
231 } redisDb;
232
233 /* With multiplexing we need to take per-clinet state.
234 * Clients are taken in a liked list. */
235 typedef struct redisClient {
236 int fd;
237 redisDb *db;
238 int dictid;
239 sds querybuf;
240 robj **argv, **mbargv;
241 int argc, mbargc;
242 int bulklen; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk; /* multi bulk command format active */
244 list *reply;
245 int sentlen;
246 time_t lastinteraction; /* time of the last interaction, used for timeout */
247 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb; /* slave selected db, if this client is a slave */
249 int authenticated; /* when requirepass is non-NULL */
250 int replstate; /* replication state if this is a slave */
251 int repldbfd; /* replication DB file descriptor */
252 long repldboff; /* replication DB file offset */
253 off_t repldbsize; /* replication DB file size */
254 } redisClient;
255
/* One entry of the server.saveparams array: a (seconds, changes) pair.
 * NOTE(review): presumably "save after 'seconds' if at least 'changes'
 * writes happened" -- confirm against the code consuming saveparams. */
struct saveparam {
    time_t seconds;
    int changes;
};
260
261 /* Global server state structure */
262 struct redisServer {
263 int port;
264 int fd;
265 redisDb *db;
266 dict *sharingpool;
267 unsigned int sharingpoolsize;
268 long long dirty; /* changes to DB from the last save */
269 list *clients;
270 list *slaves, *monitors;
271 char neterr[ANET_ERR_LEN];
272 aeEventLoop *el;
273 int cronloops; /* number of times the cron function run */
274 list *objfreelist; /* A list of freed objects to avoid malloc() */
275 time_t lastsave; /* Unix time of last save succeeede */
276 size_t usedmemory; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime; /* server start time */
279 long long stat_numcommands; /* number of processed commands */
280 long long stat_numconnections; /* number of connections received */
281 /* Configuration */
282 int verbosity;
283 int glueoutputbuf;
284 int maxidletime;
285 int dbnum;
286 int daemonize;
287 int appendonly;
288 int appendfsync;
289 time_t lastfsync;
290 int appendfd;
291 int appendseldb;
292 char *pidfile;
293 pid_t bgsavechildpid;
294 pid_t bgrewritechildpid;
295 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam *saveparams;
297 int saveparamslen;
298 char *logfile;
299 char *bindaddr;
300 char *dbfilename;
301 char *appendfilename;
302 char *requirepass;
303 int shareobjects;
304 int rdbcompression;
305 /* Replication related */
306 int isslave;
307 char *masterauth;
308 char *masterhost;
309 int masterport;
310 redisClient *master; /* client that is master for this slave */
311 int replstate;
312 unsigned int maxclients;
313 unsigned long maxmemory;
314 /* Sort parameters - qsort_r() is only available under BSD so we
315 * have to take this state global, in order to pass it to sortCompare() */
316 int sort_desc;
317 int sort_alpha;
318 int sort_bypattern;
319 };
320
321 typedef void redisCommandProc(redisClient *c);
322 struct redisCommand {
323 char *name;
324 redisCommandProc *proc;
325 int arity;
326 int flags;
327 };
328
/* A (name, address) pair describing a function symbol.
 * NOTE(review): presumably used to resolve addresses when printing stack
 * traces -- confirm against the users of this struct. */
struct redisFunctionSym {
    char *name;             /* symbol name */
    unsigned long pointer;  /* address stored as an integer */
};
333
334 typedef struct _redisSortObject {
335 robj *obj;
336 union {
337 double score;
338 robj *cmpobj;
339 } u;
340 } redisSortObject;
341
342 typedef struct _redisSortOperation {
343 int type;
344 robj *pattern;
345 } redisSortOperation;
346
347 /* ZSETs use a specialized version of Skiplists */
348
349 typedef struct zskiplistNode {
350 struct zskiplistNode **forward;
351 struct zskiplistNode *backward;
352 double score;
353 robj *obj;
354 } zskiplistNode;
355
/* Skiplist handle: both ends of the list, the element count and the
 * current level (ZSKIPLIST_MAXLEVEL is the declared maximum). */
typedef struct zskiplist {
    struct zskiplistNode *header;
    struct zskiplistNode *tail;
    unsigned long length;
    int level;
} zskiplist;
361
362 typedef struct zset {
363 dict *dict;
364 zskiplist *zsl;
365 } zset;
366
367 /* Our shared "common" objects */
368
369 struct sharedObjectsStruct {
370 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
371 *colon, *nullbulk, *nullmultibulk,
372 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
373 *outofrangeerr, *plus,
374 *select0, *select1, *select2, *select3, *select4,
375 *select5, *select6, *select7, *select8, *select9;
376 } shared;
377
378 /* Global vars that are actually used as constants. The following double
379 * values are used for double on-disk serialization, and are initialized
380 * at runtime to avoid strange compiler optimizations. */
381
382 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
383
384 /*================================ Prototypes =============================== */
385
386 static void freeStringObject(robj *o);
387 static void freeListObject(robj *o);
388 static void freeSetObject(robj *o);
389 static void decrRefCount(void *o);
390 static robj *createObject(int type, void *ptr);
391 static void freeClient(redisClient *c);
392 static int rdbLoad(char *filename);
393 static void addReply(redisClient *c, robj *obj);
394 static void addReplySds(redisClient *c, sds s);
395 static void incrRefCount(robj *o);
396 static int rdbSaveBackground(char *filename);
397 static robj *createStringObject(char *ptr, size_t len);
398 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
399 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
400 static int syncWithMaster(void);
401 static robj *tryObjectSharing(robj *o);
402 static int tryObjectEncoding(robj *o);
403 static robj *getDecodedObject(robj *o);
404 static int removeExpire(redisDb *db, robj *key);
405 static int expireIfNeeded(redisDb *db, robj *key);
406 static int deleteIfVolatile(redisDb *db, robj *key);
407 static int deleteKey(redisDb *db, robj *key);
408 static time_t getExpire(redisDb *db, robj *key);
409 static int setExpire(redisDb *db, robj *key, time_t when);
410 static void updateSlavesWaitingBgsave(int bgsaveerr);
411 static void freeMemoryIfNeeded(void);
412 static int processCommand(redisClient *c);
413 static void setupSigSegvAction(void);
414 static void rdbRemoveTempFile(pid_t childpid);
415 static void aofRemoveTempFile(pid_t childpid);
416 static size_t stringObjectLen(robj *o);
417 static void processInputBuffer(redisClient *c);
418 static zskiplist *zslCreate(void);
419 static void zslFree(zskiplist *zsl);
420 static void zslInsert(zskiplist *zsl, double score, robj *obj);
421 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
422
423 static void authCommand(redisClient *c);
424 static void pingCommand(redisClient *c);
425 static void echoCommand(redisClient *c);
426 static void setCommand(redisClient *c);
427 static void setnxCommand(redisClient *c);
428 static void getCommand(redisClient *c);
429 static void delCommand(redisClient *c);
430 static void existsCommand(redisClient *c);
431 static void incrCommand(redisClient *c);
432 static void decrCommand(redisClient *c);
433 static void incrbyCommand(redisClient *c);
434 static void decrbyCommand(redisClient *c);
435 static void selectCommand(redisClient *c);
436 static void randomkeyCommand(redisClient *c);
437 static void keysCommand(redisClient *c);
438 static void dbsizeCommand(redisClient *c);
439 static void lastsaveCommand(redisClient *c);
440 static void saveCommand(redisClient *c);
441 static void bgsaveCommand(redisClient *c);
442 static void bgrewriteaofCommand(redisClient *c);
443 static void shutdownCommand(redisClient *c);
444 static void moveCommand(redisClient *c);
445 static void renameCommand(redisClient *c);
446 static void renamenxCommand(redisClient *c);
447 static void lpushCommand(redisClient *c);
448 static void rpushCommand(redisClient *c);
449 static void lpopCommand(redisClient *c);
450 static void rpopCommand(redisClient *c);
451 static void llenCommand(redisClient *c);
452 static void lindexCommand(redisClient *c);
453 static void lrangeCommand(redisClient *c);
454 static void ltrimCommand(redisClient *c);
455 static void typeCommand(redisClient *c);
456 static void lsetCommand(redisClient *c);
457 static void saddCommand(redisClient *c);
458 static void sremCommand(redisClient *c);
459 static void smoveCommand(redisClient *c);
460 static void sismemberCommand(redisClient *c);
461 static void scardCommand(redisClient *c);
462 static void spopCommand(redisClient *c);
463 static void srandmemberCommand(redisClient *c);
464 static void sinterCommand(redisClient *c);
465 static void sinterstoreCommand(redisClient *c);
466 static void sunionCommand(redisClient *c);
467 static void sunionstoreCommand(redisClient *c);
468 static void sdiffCommand(redisClient *c);
469 static void sdiffstoreCommand(redisClient *c);
470 static void syncCommand(redisClient *c);
471 static void flushdbCommand(redisClient *c);
472 static void flushallCommand(redisClient *c);
473 static void sortCommand(redisClient *c);
474 static void lremCommand(redisClient *c);
475 static void rpoplpushcommand(redisClient *c);
476 static void infoCommand(redisClient *c);
477 static void mgetCommand(redisClient *c);
478 static void monitorCommand(redisClient *c);
479 static void expireCommand(redisClient *c);
480 static void expireatCommand(redisClient *c);
481 static void getsetCommand(redisClient *c);
482 static void ttlCommand(redisClient *c);
483 static void slaveofCommand(redisClient *c);
484 static void debugCommand(redisClient *c);
485 static void msetCommand(redisClient *c);
486 static void msetnxCommand(redisClient *c);
487 static void zaddCommand(redisClient *c);
488 static void zincrbyCommand(redisClient *c);
489 static void zrangeCommand(redisClient *c);
490 static void zrangebyscoreCommand(redisClient *c);
491 static void zrevrangeCommand(redisClient *c);
492 static void zcardCommand(redisClient *c);
493 static void zremCommand(redisClient *c);
494 static void zscoreCommand(redisClient *c);
495 static void zremrangebyscoreCommand(redisClient *c);
496
497 /*================================= Globals ================================= */
498
499 /* Global vars */
500 static struct redisServer server; /* server global state */
501 static struct redisCommand cmdTable[] = {
502 {"get",getCommand,2,REDIS_CMD_INLINE},
503 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
504 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
505 {"del",delCommand,-2,REDIS_CMD_INLINE},
506 {"exists",existsCommand,2,REDIS_CMD_INLINE},
507 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
508 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
509 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
510 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
511 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
512 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
513 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
514 {"llen",llenCommand,2,REDIS_CMD_INLINE},
515 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
516 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
517 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
518 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
519 {"lrem",lremCommand,4,REDIS_CMD_BULK},
520 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
522 {"srem",sremCommand,3,REDIS_CMD_BULK},
523 {"smove",smoveCommand,4,REDIS_CMD_BULK},
524 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
525 {"scard",scardCommand,2,REDIS_CMD_INLINE},
526 {"spop",spopCommand,2,REDIS_CMD_INLINE},
527 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
528 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
531 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
532 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
533 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
534 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
535 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
536 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
537 {"zrem",zremCommand,3,REDIS_CMD_BULK},
538 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
539 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
540 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
541 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
542 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
543 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
544 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
545 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
546 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
547 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
548 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
549 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
550 {"select",selectCommand,2,REDIS_CMD_INLINE},
551 {"move",moveCommand,3,REDIS_CMD_INLINE},
552 {"rename",renameCommand,3,REDIS_CMD_INLINE},
553 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
554 {"expire",expireCommand,3,REDIS_CMD_INLINE},
555 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
556 {"keys",keysCommand,2,REDIS_CMD_INLINE},
557 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
558 {"auth",authCommand,2,REDIS_CMD_INLINE},
559 {"ping",pingCommand,1,REDIS_CMD_INLINE},
560 {"echo",echoCommand,2,REDIS_CMD_BULK},
561 {"save",saveCommand,1,REDIS_CMD_INLINE},
562 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
563 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
564 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
565 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
566 {"type",typeCommand,2,REDIS_CMD_INLINE},
567 {"sync",syncCommand,1,REDIS_CMD_INLINE},
568 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
569 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
570 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
571 {"info",infoCommand,1,REDIS_CMD_INLINE},
572 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
573 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
574 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
575 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
576 {NULL,NULL,0,0}
577 };
578
579 /*============================ Utility functions ============================ */
580
/* Glob-style pattern matching over length-delimited buffers.
 *
 * Supported syntax: '*' (any run of bytes, also empty), '?' (exactly one
 * byte), '[...]' (one byte out of a class; '^' right after '[' negates it,
 * 'a-z' is a range, '\' escapes the next class byte) and '\x' (literal x).
 * A class with no closing ']' is accepted and ends where the pattern ends.
 *
 * pattern/patternLen: pattern bytes and their count.
 * string/stringLen:   subject bytes and their count.
 * nocase:             non-zero to compare case-insensitively.
 *
 * Returns 1 when the whole string matches the whole pattern, 0 otherwise.
 *
 * Neither buffer has to be NUL terminated: no byte at or beyond the given
 * lengths is read. Every construct other than '*' needs one subject byte,
 * so it fails immediately when the string is exhausted. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse a run of '*' without peeking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            /* Try the rest of the pattern at every remaining offset. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            if (stringLen == 0)
                return 0; /* a class must consume one byte */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * advance after the switch lands on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        /* tolower() is only defined for unsigned char
                         * values (and EOF). */
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    /* Plain class member; a lone trailing backslash also
                     * ends up here and is compared literally. */
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* a literal must consume one byte */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing '*' may remain. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
703
704 static void redisLog(int level, const char *fmt, ...) {
705 va_list ap;
706 FILE *fp;
707
708 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
709 if (!fp) return;
710
711 va_start(ap, fmt);
712 if (level >= server.verbosity) {
713 char *c = ".-*";
714 char buf[64];
715 time_t now;
716
717 now = time(NULL);
718 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
719 fprintf(fp,"%s %c ",buf,c[level]);
720 vfprintf(fp, fmt, ap);
721 fprintf(fp,"\n");
722 fflush(fp);
723 }
724 va_end(ap);
725
726 if (server.logfile) fclose(fp);
727 }
728
729 /*====================== Hash table type implementation ==================== */
730
731 /* This is a hash table type that uses the SDS dynamic strings library as
732 * keys and redis objects as values (objects can hold SDS strings,
733 * lists, sets). */
734
/* Value destructor that simply releases the value with zfree().
 * 'privdata' is ignored. */
static void dictVanillaFree(void *privdata, void *val)
{
    (void) privdata;
    zfree(val);
}
740
741 static int sdsDictKeyCompare(void *privdata, const void *key1,
742 const void *key2)
743 {
744 int l1,l2;
745 DICT_NOTUSED(privdata);
746
747 l1 = sdslen((sds)key1);
748 l2 = sdslen((sds)key2);
749 if (l1 != l2) return 0;
750 return memcmp(key1, key2, l1) == 0;
751 }
752
/* Destructor for keys or values that are Redis objects: drops one
 * reference through decrRefCount(). 'privdata' is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void) privdata;
    decrRefCount(val);
}
759
760 static int dictObjKeyCompare(void *privdata, const void *key1,
761 const void *key2)
762 {
763 const robj *o1 = key1, *o2 = key2;
764 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
765 }
766
767 static unsigned int dictObjHash(const void *key) {
768 const robj *o = key;
769 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
770 }
771
772 static int dictEncObjKeyCompare(void *privdata, const void *key1,
773 const void *key2)
774 {
775 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
776 int cmp;
777
778 o1 = getDecodedObject(o1);
779 o2 = getDecodedObject(o2);
780 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
781 decrRefCount(o1);
782 decrRefCount(o2);
783 return cmp;
784 }
785
786 static unsigned int dictEncObjHash(const void *key) {
787 robj *o = (robj*) key;
788
789 o = getDecodedObject(o);
790 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
791 decrRefCount(o);
792 return hash;
793 }
794
795 static dictType setDictType = {
796 dictEncObjHash, /* hash function */
797 NULL, /* key dup */
798 NULL, /* val dup */
799 dictEncObjKeyCompare, /* key compare */
800 dictRedisObjectDestructor, /* key destructor */
801 NULL /* val destructor */
802 };
803
804 static dictType zsetDictType = {
805 dictEncObjHash, /* hash function */
806 NULL, /* key dup */
807 NULL, /* val dup */
808 dictEncObjKeyCompare, /* key compare */
809 dictRedisObjectDestructor, /* key destructor */
810 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
811 };
812
813 static dictType hashDictType = {
814 dictObjHash, /* hash function */
815 NULL, /* key dup */
816 NULL, /* val dup */
817 dictObjKeyCompare, /* key compare */
818 dictRedisObjectDestructor, /* key destructor */
819 dictRedisObjectDestructor /* val destructor */
820 };
821
822 /* ========================= Random utility functions ======================= */
823
824 /* Redis generally does not try to recover from out of memory conditions
825 * when allocating objects or strings, it is not clear if it will be possible
826 * to report this condition to the client since the networking layer itself
827 * is based on heap allocation for send buffers, so we simply abort.
828 * At least the code will be simpler to read... */
829 static void oom(const char *msg) {
830 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
831 sleep(1);
832 abort();
833 }
834
835 /* ====================== Redis server networking stuff ===================== */
836 static void closeTimedoutClients(void) {
837 redisClient *c;
838 listNode *ln;
839 time_t now = time(NULL);
840
841 listRewind(server.clients);
842 while ((ln = listYield(server.clients)) != NULL) {
843 c = listNodeValue(ln);
844 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
845 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
846 (now - c->lastinteraction > server.maxidletime)) {
847 redisLog(REDIS_DEBUG,"Closing idle client");
848 freeClient(c);
849 }
850 }
851 }
852
853 static int htNeedsResize(dict *dict) {
854 long long size, used;
855
856 size = dictSlots(dict);
857 used = dictSize(dict);
858 return (size && used && size > DICT_HT_INITIAL_SIZE &&
859 (used*100/size < REDIS_HT_MINFILL));
860 }
861
862 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
863 * we resize the hash table to save memory */
864 static void tryResizeHashTables(void) {
865 int j;
866
867 for (j = 0; j < server.dbnum; j++) {
868 if (htNeedsResize(server.db[j].dict)) {
869 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
870 dictResize(server.db[j].dict);
871 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
872 }
873 if (htNeedsResize(server.db[j].expires))
874 dictResize(server.db[j].expires);
875 }
876 }
877
878 /* A background saving child (BGSAVE) terminated its work. Handle this. */
879 void backgroundSaveDoneHandler(int statloc) {
880 int exitcode = WEXITSTATUS(statloc);
881 int bysignal = WIFSIGNALED(statloc);
882
883 if (!bysignal && exitcode == 0) {
884 redisLog(REDIS_NOTICE,
885 "Background saving terminated with success");
886 server.dirty = 0;
887 server.lastsave = time(NULL);
888 } else if (!bysignal && exitcode != 0) {
889 redisLog(REDIS_WARNING, "Background saving error");
890 } else {
891 redisLog(REDIS_WARNING,
892 "Background saving terminated by signal");
893 rdbRemoveTempFile(server.bgsavechildpid);
894 }
895 server.bgsavechildpid = -1;
896 /* Possibly there are slaves waiting for a BGSAVE in order to be served
897 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
898 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
899 }
900
901 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
902 * Handle this. */
903 void backgroundRewriteDoneHandler(int statloc) {
904 int exitcode = WEXITSTATUS(statloc);
905 int bysignal = WIFSIGNALED(statloc);
906
907 if (!bysignal && exitcode == 0) {
908 int fd;
909 char tmpfile[256];
910
911 redisLog(REDIS_NOTICE,
912 "Background append only file rewriting terminated with success");
913 /* Now it's time to flush the differences accumulated by the parent */
914 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
915 fd = open(tmpfile,O_WRONLY|O_APPEND);
916 if (fd == -1) {
917 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
918 goto cleanup;
919 }
920 /* Flush our data... */
921 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
922 (signed) sdslen(server.bgrewritebuf)) {
923 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
924 close(fd);
925 goto cleanup;
926 }
927 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
928 /* Now our work is to rename the temp file into the stable file. And
929 * switch the file descriptor used by the server for append only. */
930 if (rename(tmpfile,server.appendfilename) == -1) {
931 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
932 close(fd);
933 goto cleanup;
934 }
935 /* Mission completed... almost */
936 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
937 if (server.appendfd != -1) {
938 /* If append only is actually enabled... */
939 close(server.appendfd);
940 server.appendfd = fd;
941 fsync(fd);
942 server.appendseldb = -1; /* Make sure it will issue SELECT */
943 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
944 } else {
945 /* If append only is disabled we just generate a dump in this
946 * format. Why not? */
947 close(fd);
948 }
949 } else if (!bysignal && exitcode != 0) {
950 redisLog(REDIS_WARNING, "Background append only file rewriting error");
951 } else {
952 redisLog(REDIS_WARNING,
953 "Background append only file rewriting terminated by signal");
954 }
955 cleanup:
956 sdsfree(server.bgrewritebuf);
957 server.bgrewritebuf = sdsempty();
958 aofRemoveTempFile(server.bgrewritechildpid);
959 server.bgrewritechildpid = -1;
960 }
961
962 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
963 int j, loops = server.cronloops++;
964 REDIS_NOTUSED(eventLoop);
965 REDIS_NOTUSED(id);
966 REDIS_NOTUSED(clientData);
967
968 /* Update the global state with the amount of used memory */
969 server.usedmemory = zmalloc_used_memory();
970
971 /* Show some info about non-empty databases */
972 for (j = 0; j < server.dbnum; j++) {
973 long long size, used, vkeys;
974
975 size = dictSlots(server.db[j].dict);
976 used = dictSize(server.db[j].dict);
977 vkeys = dictSize(server.db[j].expires);
978 if (!(loops % 5) && (used || vkeys)) {
979 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
980 /* dictPrintStats(server.dict); */
981 }
982 }
983
984 /* We don't want to resize the hash tables while a bacground saving
985 * is in progress: the saving child is created using fork() that is
986 * implemented with a copy-on-write semantic in most modern systems, so
987 * if we resize the HT while there is the saving child at work actually
988 * a lot of memory movements in the parent will cause a lot of pages
989 * copied. */
990 if (server.bgsavechildpid == -1) tryResizeHashTables();
991
992 /* Show information about connected clients */
993 if (!(loops % 5)) {
994 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
995 listLength(server.clients)-listLength(server.slaves),
996 listLength(server.slaves),
997 server.usedmemory,
998 dictSize(server.sharingpool));
999 }
1000
1001 /* Close connections of timedout clients */
1002 if (server.maxidletime && !(loops % 10))
1003 closeTimedoutClients();
1004
1005 /* Check if a background saving or AOF rewrite in progress terminated */
1006 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1007 int statloc;
1008 pid_t pid;
1009
1010 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1011 if (pid == server.bgsavechildpid) {
1012 backgroundSaveDoneHandler(statloc);
1013 } else {
1014 backgroundRewriteDoneHandler(statloc);
1015 }
1016 }
1017 } else {
1018 /* If there is not a background saving in progress check if
1019 * we have to save now */
1020 time_t now = time(NULL);
1021 for (j = 0; j < server.saveparamslen; j++) {
1022 struct saveparam *sp = server.saveparams+j;
1023
1024 if (server.dirty >= sp->changes &&
1025 now-server.lastsave > sp->seconds) {
1026 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1027 sp->changes, sp->seconds);
1028 rdbSaveBackground(server.dbfilename);
1029 break;
1030 }
1031 }
1032 }
1033
1034 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1035 * will use few CPU cycles if there are few expiring keys, otherwise
1036 * it will get more aggressive to avoid that too much memory is used by
1037 * keys that can be removed from the keyspace. */
1038 for (j = 0; j < server.dbnum; j++) {
1039 int expired;
1040 redisDb *db = server.db+j;
1041
1042 /* Continue to expire if at the end of the cycle more than 25%
1043 * of the keys were expired. */
1044 do {
1045 int num = dictSize(db->expires);
1046 time_t now = time(NULL);
1047
1048 expired = 0;
1049 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1050 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1051 while (num--) {
1052 dictEntry *de;
1053 time_t t;
1054
1055 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1056 t = (time_t) dictGetEntryVal(de);
1057 if (now > t) {
1058 deleteKey(db,dictGetEntryKey(de));
1059 expired++;
1060 }
1061 }
1062 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1063 }
1064
1065 /* Check if we should connect to a MASTER */
1066 if (server.replstate == REDIS_REPL_CONNECT) {
1067 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1068 if (syncWithMaster() == REDIS_OK) {
1069 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1070 }
1071 }
1072 return 1000;
1073 }
1074
1075 static void createSharedObjects(void) {
1076 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1077 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1078 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1079 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1080 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1081 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1082 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1083 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1084 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1085 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1086 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1087 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1088 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1089 "-ERR no such key\r\n"));
1090 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1091 "-ERR syntax error\r\n"));
1092 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1093 "-ERR source and destination objects are the same\r\n"));
1094 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1095 "-ERR index out of range\r\n"));
1096 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1097 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1098 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1099 shared.select0 = createStringObject("select 0\r\n",10);
1100 shared.select1 = createStringObject("select 1\r\n",10);
1101 shared.select2 = createStringObject("select 2\r\n",10);
1102 shared.select3 = createStringObject("select 3\r\n",10);
1103 shared.select4 = createStringObject("select 4\r\n",10);
1104 shared.select5 = createStringObject("select 5\r\n",10);
1105 shared.select6 = createStringObject("select 6\r\n",10);
1106 shared.select7 = createStringObject("select 7\r\n",10);
1107 shared.select8 = createStringObject("select 8\r\n",10);
1108 shared.select9 = createStringObject("select 9\r\n",10);
1109 }
1110
1111 static void appendServerSaveParams(time_t seconds, int changes) {
1112 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1113 server.saveparams[server.saveparamslen].seconds = seconds;
1114 server.saveparams[server.saveparamslen].changes = changes;
1115 server.saveparamslen++;
1116 }
1117
1118 static void resetServerSaveParams() {
1119 zfree(server.saveparams);
1120 server.saveparams = NULL;
1121 server.saveparamslen = 0;
1122 }
1123
1124 static void initServerConfig() {
1125 server.dbnum = REDIS_DEFAULT_DBNUM;
1126 server.port = REDIS_SERVERPORT;
1127 server.verbosity = REDIS_DEBUG;
1128 server.maxidletime = REDIS_MAXIDLETIME;
1129 server.saveparams = NULL;
1130 server.logfile = NULL; /* NULL = log on standard output */
1131 server.bindaddr = NULL;
1132 server.glueoutputbuf = 1;
1133 server.daemonize = 0;
1134 server.appendonly = 0;
1135 server.appendfsync = APPENDFSYNC_ALWAYS;
1136 server.lastfsync = time(NULL);
1137 server.appendfd = -1;
1138 server.appendseldb = -1; /* Make sure the first time will not match */
1139 server.pidfile = "/var/run/redis.pid";
1140 server.dbfilename = "dump.rdb";
1141 server.appendfilename = "appendonly.aof";
1142 server.requirepass = NULL;
1143 server.shareobjects = 0;
1144 server.rdbcompression = 1;
1145 server.sharingpoolsize = 1024;
1146 server.maxclients = 0;
1147 server.maxmemory = 0;
1148 resetServerSaveParams();
1149
1150 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1151 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1152 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1153 /* Replication related */
1154 server.isslave = 0;
1155 server.masterauth = NULL;
1156 server.masterhost = NULL;
1157 server.masterport = 6379;
1158 server.master = NULL;
1159 server.replstate = REDIS_REPL_NONE;
1160
1161 /* Double constants initialization */
1162 R_Zero = 0.0;
1163 R_PosInf = 1.0/R_Zero;
1164 R_NegInf = -1.0/R_Zero;
1165 R_Nan = R_Zero/R_Zero;
1166 }
1167
1168 static void initServer() {
1169 int j;
1170
1171 signal(SIGHUP, SIG_IGN);
1172 signal(SIGPIPE, SIG_IGN);
1173 setupSigSegvAction();
1174
1175 server.clients = listCreate();
1176 server.slaves = listCreate();
1177 server.monitors = listCreate();
1178 server.objfreelist = listCreate();
1179 createSharedObjects();
1180 server.el = aeCreateEventLoop();
1181 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1182 server.sharingpool = dictCreate(&setDictType,NULL);
1183 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1184 if (server.fd == -1) {
1185 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1186 exit(1);
1187 }
1188 for (j = 0; j < server.dbnum; j++) {
1189 server.db[j].dict = dictCreate(&hashDictType,NULL);
1190 server.db[j].expires = dictCreate(&setDictType,NULL);
1191 server.db[j].id = j;
1192 }
1193 server.cronloops = 0;
1194 server.bgsavechildpid = -1;
1195 server.bgrewritechildpid = -1;
1196 server.bgrewritebuf = sdsempty();
1197 server.lastsave = time(NULL);
1198 server.dirty = 0;
1199 server.usedmemory = 0;
1200 server.stat_numcommands = 0;
1201 server.stat_numconnections = 0;
1202 server.stat_starttime = time(NULL);
1203 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1204
1205 if (server.appendonly) {
1206 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1207 if (server.appendfd == -1) {
1208 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1209 strerror(errno));
1210 exit(1);
1211 }
1212 }
1213 }
1214
1215 /* Empty the whole database */
1216 static long long emptyDb() {
1217 int j;
1218 long long removed = 0;
1219
1220 for (j = 0; j < server.dbnum; j++) {
1221 removed += dictSize(server.db[j].dict);
1222 dictEmpty(server.db[j].dict);
1223 dictEmpty(server.db[j].expires);
1224 }
1225 return removed;
1226 }
1227
/* Convert a "yes"/"no" string (compared ignoring case) into 1/0.
 * Return -1 for any other string. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1233
1234 /* I agree, this is a very rudimental way to load a configuration...
1235 will improve later if the config gets more complex */
1236 static void loadServerConfig(char *filename) {
1237 FILE *fp;
1238 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1239 int linenum = 0;
1240 sds line = NULL;
1241
1242 if (filename[0] == '-' && filename[1] == '\0')
1243 fp = stdin;
1244 else {
1245 if ((fp = fopen(filename,"r")) == NULL) {
1246 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1247 exit(1);
1248 }
1249 }
1250
1251 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1252 sds *argv;
1253 int argc, j;
1254
1255 linenum++;
1256 line = sdsnew(buf);
1257 line = sdstrim(line," \t\r\n");
1258
1259 /* Skip comments and blank lines*/
1260 if (line[0] == '#' || line[0] == '\0') {
1261 sdsfree(line);
1262 continue;
1263 }
1264
1265 /* Split into arguments */
1266 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1267 sdstolower(argv[0]);
1268
1269 /* Execute config directives */
1270 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1271 server.maxidletime = atoi(argv[1]);
1272 if (server.maxidletime < 0) {
1273 err = "Invalid timeout value"; goto loaderr;
1274 }
1275 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1276 server.port = atoi(argv[1]);
1277 if (server.port < 1 || server.port > 65535) {
1278 err = "Invalid port"; goto loaderr;
1279 }
1280 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1281 server.bindaddr = zstrdup(argv[1]);
1282 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1283 int seconds = atoi(argv[1]);
1284 int changes = atoi(argv[2]);
1285 if (seconds < 1 || changes < 0) {
1286 err = "Invalid save parameters"; goto loaderr;
1287 }
1288 appendServerSaveParams(seconds,changes);
1289 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1290 if (chdir(argv[1]) == -1) {
1291 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1292 argv[1], strerror(errno));
1293 exit(1);
1294 }
1295 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1296 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1297 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1298 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1299 else {
1300 err = "Invalid log level. Must be one of debug, notice, warning";
1301 goto loaderr;
1302 }
1303 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1304 FILE *logfp;
1305
1306 server.logfile = zstrdup(argv[1]);
1307 if (!strcasecmp(server.logfile,"stdout")) {
1308 zfree(server.logfile);
1309 server.logfile = NULL;
1310 }
1311 if (server.logfile) {
1312 /* Test if we are able to open the file. The server will not
1313 * be able to abort just for this problem later... */
1314 logfp = fopen(server.logfile,"a");
1315 if (logfp == NULL) {
1316 err = sdscatprintf(sdsempty(),
1317 "Can't open the log file: %s", strerror(errno));
1318 goto loaderr;
1319 }
1320 fclose(logfp);
1321 }
1322 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1323 server.dbnum = atoi(argv[1]);
1324 if (server.dbnum < 1) {
1325 err = "Invalid number of databases"; goto loaderr;
1326 }
1327 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1328 server.maxclients = atoi(argv[1]);
1329 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1330 server.maxmemory = strtoll(argv[1], NULL, 10);
1331 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1332 server.masterhost = sdsnew(argv[1]);
1333 server.masterport = atoi(argv[2]);
1334 server.replstate = REDIS_REPL_CONNECT;
1335 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1336 server.masterauth = zstrdup(argv[1]);
1337 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1338 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1339 err = "argument must be 'yes' or 'no'"; goto loaderr;
1340 }
1341 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1342 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1343 err = "argument must be 'yes' or 'no'"; goto loaderr;
1344 }
1345 } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
1346 if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
1347 err = "argument must be 'yes' or 'no'"; goto loaderr;
1348 }
1349 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1350 server.sharingpoolsize = atoi(argv[1]);
1351 if (server.sharingpoolsize < 1) {
1352 err = "invalid object sharing pool size"; goto loaderr;
1353 }
1354 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1355 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1356 err = "argument must be 'yes' or 'no'"; goto loaderr;
1357 }
1358 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1359 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1360 err = "argument must be 'yes' or 'no'"; goto loaderr;
1361 }
1362 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1363 if (!strcasecmp(argv[1],"no")) {
1364 server.appendfsync = APPENDFSYNC_NO;
1365 } else if (!strcasecmp(argv[1],"always")) {
1366 server.appendfsync = APPENDFSYNC_ALWAYS;
1367 } else if (!strcasecmp(argv[1],"everysec")) {
1368 server.appendfsync = APPENDFSYNC_EVERYSEC;
1369 } else {
1370 err = "argument must be 'no', 'always' or 'everysec'";
1371 goto loaderr;
1372 }
1373 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1374 server.requirepass = zstrdup(argv[1]);
1375 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1376 server.pidfile = zstrdup(argv[1]);
1377 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1378 server.dbfilename = zstrdup(argv[1]);
1379 } else {
1380 err = "Bad directive or wrong number of arguments"; goto loaderr;
1381 }
1382 for (j = 0; j < argc; j++)
1383 sdsfree(argv[j]);
1384 zfree(argv);
1385 sdsfree(line);
1386 }
1387 if (fp != stdin) fclose(fp);
1388 return;
1389
1390 loaderr:
1391 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1392 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1393 fprintf(stderr, ">>> '%s'\n", line);
1394 fprintf(stderr, "%s\n", err);
1395 exit(1);
1396 }
1397
1398 static void freeClientArgv(redisClient *c) {
1399 int j;
1400
1401 for (j = 0; j < c->argc; j++)
1402 decrRefCount(c->argv[j]);
1403 for (j = 0; j < c->mbargc; j++)
1404 decrRefCount(c->mbargv[j]);
1405 c->argc = 0;
1406 c->mbargc = 0;
1407 }
1408
1409 static void freeClient(redisClient *c) {
1410 listNode *ln;
1411
1412 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1413 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1414 sdsfree(c->querybuf);
1415 listRelease(c->reply);
1416 freeClientArgv(c);
1417 close(c->fd);
1418 ln = listSearchKey(server.clients,c);
1419 redisAssert(ln != NULL);
1420 listDelNode(server.clients,ln);
1421 if (c->flags & REDIS_SLAVE) {
1422 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1423 close(c->repldbfd);
1424 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1425 ln = listSearchKey(l,c);
1426 redisAssert(ln != NULL);
1427 listDelNode(l,ln);
1428 }
1429 if (c->flags & REDIS_MASTER) {
1430 server.master = NULL;
1431 server.replstate = REDIS_REPL_CONNECT;
1432 }
1433 zfree(c->argv);
1434 zfree(c->mbargv);
1435 zfree(c);
1436 }
1437
1438 #define GLUEREPLY_UP_TO (1024)
1439 static void glueReplyBuffersIfNeeded(redisClient *c) {
1440 int copylen = 0;
1441 char buf[GLUEREPLY_UP_TO];
1442 listNode *ln;
1443 robj *o;
1444
1445 listRewind(c->reply);
1446 while((ln = listYield(c->reply))) {
1447 int objlen;
1448
1449 o = ln->value;
1450 objlen = sdslen(o->ptr);
1451 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1452 memcpy(buf+copylen,o->ptr,objlen);
1453 copylen += objlen;
1454 listDelNode(c->reply,ln);
1455 } else {
1456 if (copylen == 0) return;
1457 break;
1458 }
1459 }
1460 /* Now the output buffer is empty, add the new single element */
1461 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1462 listAddNodeHead(c->reply,o);
1463 }
1464
1465 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1466 redisClient *c = privdata;
1467 int nwritten = 0, totwritten = 0, objlen;
1468 robj *o;
1469 REDIS_NOTUSED(el);
1470 REDIS_NOTUSED(mask);
1471
1472 /* Use writev() if we have enough buffers to send */
1473 if (!server.glueoutputbuf &&
1474 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1475 !(c->flags & REDIS_MASTER))
1476 {
1477 sendReplyToClientWritev(el, fd, privdata, mask);
1478 return;
1479 }
1480
1481 while(listLength(c->reply)) {
1482 if (server.glueoutputbuf && listLength(c->reply) > 1)
1483 glueReplyBuffersIfNeeded(c);
1484
1485 o = listNodeValue(listFirst(c->reply));
1486 objlen = sdslen(o->ptr);
1487
1488 if (objlen == 0) {
1489 listDelNode(c->reply,listFirst(c->reply));
1490 continue;
1491 }
1492
1493 if (c->flags & REDIS_MASTER) {
1494 /* Don't reply to a master */
1495 nwritten = objlen - c->sentlen;
1496 } else {
1497 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1498 if (nwritten <= 0) break;
1499 }
1500 c->sentlen += nwritten;
1501 totwritten += nwritten;
1502 /* If we fully sent the object on head go to the next one */
1503 if (c->sentlen == objlen) {
1504 listDelNode(c->reply,listFirst(c->reply));
1505 c->sentlen = 0;
1506 }
1507 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1508 * bytes, in a single threaded server it's a good idea to serve
1509 * other clients as well, even if a very large request comes from
1510 * super fast link that is always able to accept data (in real world
1511 * scenario think about 'KEYS *' against the loopback interfae) */
1512 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1513 }
1514 if (nwritten == -1) {
1515 if (errno == EAGAIN) {
1516 nwritten = 0;
1517 } else {
1518 redisLog(REDIS_DEBUG,
1519 "Error writing to client: %s", strerror(errno));
1520 freeClient(c);
1521 return;
1522 }
1523 }
1524 if (totwritten > 0) c->lastinteraction = time(NULL);
1525 if (listLength(c->reply) == 0) {
1526 c->sentlen = 0;
1527 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1528 }
1529 }
1530
1531 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1532 {
1533 redisClient *c = privdata;
1534 int nwritten = 0, totwritten = 0, objlen, willwrite;
1535 robj *o;
1536 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1537 int offset, ion = 0;
1538 REDIS_NOTUSED(el);
1539 REDIS_NOTUSED(mask);
1540
1541 listNode *node;
1542 while (listLength(c->reply)) {
1543 offset = c->sentlen;
1544 ion = 0;
1545 willwrite = 0;
1546
1547 /* fill-in the iov[] array */
1548 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1549 o = listNodeValue(node);
1550 objlen = sdslen(o->ptr);
1551
1552 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1553 break;
1554
1555 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1556 break; /* no more iovecs */
1557
1558 iov[ion].iov_base = ((char*)o->ptr) + offset;
1559 iov[ion].iov_len = objlen - offset;
1560 willwrite += objlen - offset;
1561 offset = 0; /* just for the first item */
1562 ion++;
1563 }
1564
1565 if(willwrite == 0)
1566 break;
1567
1568 /* write all collected blocks at once */
1569 if((nwritten = writev(fd, iov, ion)) < 0) {
1570 if (errno != EAGAIN) {
1571 redisLog(REDIS_DEBUG,
1572 "Error writing to client: %s", strerror(errno));
1573 freeClient(c);
1574 return;
1575 }
1576 break;
1577 }
1578
1579 totwritten += nwritten;
1580 offset = c->sentlen;
1581
1582 /* remove written robjs from c->reply */
1583 while (nwritten && listLength(c->reply)) {
1584 o = listNodeValue(listFirst(c->reply));
1585 objlen = sdslen(o->ptr);
1586
1587 if(nwritten >= objlen - offset) {
1588 listDelNode(c->reply, listFirst(c->reply));
1589 nwritten -= objlen - offset;
1590 c->sentlen = 0;
1591 } else {
1592 /* partial write */
1593 c->sentlen += nwritten;
1594 break;
1595 }
1596 offset = 0;
1597 }
1598 }
1599
1600 if (totwritten > 0)
1601 c->lastinteraction = time(NULL);
1602
1603 if (listLength(c->reply) == 0) {
1604 c->sentlen = 0;
1605 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1606 }
1607 }
1608
1609 static struct redisCommand *lookupCommand(char *name) {
1610 int j = 0;
1611 while(cmdTable[j].name != NULL) {
1612 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1613 j++;
1614 }
1615 return NULL;
1616 }
1617
1618 /* resetClient prepare the client to process the next command */
1619 static void resetClient(redisClient *c) {
1620 freeClientArgv(c);
1621 c->bulklen = -1;
1622 c->multibulk = 0;
1623 }
1624
1625 /* If this function gets called we already read a whole
1626 * command, argments are in the client argv/argc fields.
1627 * processCommand() execute the command or prepare the
1628 * server for a bulk read from the client.
1629 *
1630 * If 1 is returned the client is still alive and valid and
1631 * and other operations can be performed by the caller. Otherwise
1632 * if 0 is returned the client was destroied (i.e. after QUIT). */
1633 static int processCommand(redisClient *c) {
1634 struct redisCommand *cmd;
1635 long long dirty;
1636
1637 /* Free some memory if needed (maxmemory setting) */
1638 if (server.maxmemory) freeMemoryIfNeeded();
1639
1640 /* Handle the multi bulk command type. This is an alternative protocol
1641 * supported by Redis in order to receive commands that are composed of
1642 * multiple binary-safe "bulk" arguments. The latency of processing is
1643 * a bit higher but this allows things like multi-sets, so if this
1644 * protocol is used only for MSET and similar commands this is a big win. */
1645 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1646 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1647 if (c->multibulk <= 0) {
1648 resetClient(c);
1649 return 1;
1650 } else {
1651 decrRefCount(c->argv[c->argc-1]);
1652 c->argc--;
1653 return 1;
1654 }
1655 } else if (c->multibulk) {
1656 if (c->bulklen == -1) {
1657 if (((char*)c->argv[0]->ptr)[0] != '$') {
1658 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1659 resetClient(c);
1660 return 1;
1661 } else {
1662 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1663 decrRefCount(c->argv[0]);
1664 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1665 c->argc--;
1666 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1667 resetClient(c);
1668 return 1;
1669 }
1670 c->argc--;
1671 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1672 return 1;
1673 }
1674 } else {
1675 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1676 c->mbargv[c->mbargc] = c->argv[0];
1677 c->mbargc++;
1678 c->argc--;
1679 c->multibulk--;
1680 if (c->multibulk == 0) {
1681 robj **auxargv;
1682 int auxargc;
1683
1684 /* Here we need to swap the multi-bulk argc/argv with the
1685 * normal argc/argv of the client structure. */
1686 auxargv = c->argv;
1687 c->argv = c->mbargv;
1688 c->mbargv = auxargv;
1689
1690 auxargc = c->argc;
1691 c->argc = c->mbargc;
1692 c->mbargc = auxargc;
1693
1694 /* We need to set bulklen to something different than -1
1695 * in order for the code below to process the command without
1696 * to try to read the last argument of a bulk command as
1697 * a special argument. */
1698 c->bulklen = 0;
1699 /* continue below and process the command */
1700 } else {
1701 c->bulklen = -1;
1702 return 1;
1703 }
1704 }
1705 }
1706 /* -- end of multi bulk commands processing -- */
1707
1708 /* The QUIT command is handled as a special case. Normal command
1709 * procs are unable to close the client connection safely */
1710 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1711 freeClient(c);
1712 return 0;
1713 }
1714 cmd = lookupCommand(c->argv[0]->ptr);
1715 if (!cmd) {
1716 addReplySds(c,
1717 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1718 (char*)c->argv[0]->ptr));
1719 resetClient(c);
1720 return 1;
1721 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1722 (c->argc < -cmd->arity)) {
1723 addReplySds(c,
1724 sdscatprintf(sdsempty(),
1725 "-ERR wrong number of arguments for '%s' command\r\n",
1726 cmd->name));
1727 resetClient(c);
1728 return 1;
1729 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1730 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1731 resetClient(c);
1732 return 1;
1733 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1734 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1735
1736 decrRefCount(c->argv[c->argc-1]);
1737 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1738 c->argc--;
1739 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1740 resetClient(c);
1741 return 1;
1742 }
1743 c->argc--;
1744 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1745 /* It is possible that the bulk read is already in the
1746 * buffer. Check this condition and handle it accordingly.
1747 * This is just a fast path, alternative to call processInputBuffer().
1748 * It's a good idea since the code is small and this condition
1749 * happens most of the times. */
1750 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1751 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1752 c->argc++;
1753 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1754 } else {
1755 return 1;
1756 }
1757 }
1758 /* Let's try to share objects on the command arguments vector */
1759 if (server.shareobjects) {
1760 int j;
1761 for(j = 1; j < c->argc; j++)
1762 c->argv[j] = tryObjectSharing(c->argv[j]);
1763 }
1764 /* Let's try to encode the bulk object to save space. */
1765 if (cmd->flags & REDIS_CMD_BULK)
1766 tryObjectEncoding(c->argv[c->argc-1]);
1767
1768 /* Check if the user is authenticated */
1769 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1770 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1771 resetClient(c);
1772 return 1;
1773 }
1774
1775 /* Exec the command */
1776 dirty = server.dirty;
1777 cmd->proc(c);
1778 if (server.appendonly && server.dirty-dirty)
1779 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1780 if (server.dirty-dirty && listLength(server.slaves))
1781 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1782 if (listLength(server.monitors))
1783 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1784 server.stat_numcommands++;
1785
1786 /* Prepare the client for the next command */
1787 if (c->flags & REDIS_CLOSE) {
1788 freeClient(c);
1789 return 0;
1790 }
1791 resetClient(c);
1792 return 1;
1793 }
1794
1795 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1796 listNode *ln;
1797 int outc = 0, j;
1798 robj **outv;
1799 /* (args*2)+1 is enough room for args, spaces, newlines */
1800 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1801
1802 if (argc <= REDIS_STATIC_ARGS) {
1803 outv = static_outv;
1804 } else {
1805 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1806 }
1807
1808 for (j = 0; j < argc; j++) {
1809 if (j != 0) outv[outc++] = shared.space;
1810 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1811 robj *lenobj;
1812
1813 lenobj = createObject(REDIS_STRING,
1814 sdscatprintf(sdsempty(),"%lu\r\n",
1815 (unsigned long) stringObjectLen(argv[j])));
1816 lenobj->refcount = 0;
1817 outv[outc++] = lenobj;
1818 }
1819 outv[outc++] = argv[j];
1820 }
1821 outv[outc++] = shared.crlf;
1822
1823 /* Increment all the refcounts at start and decrement at end in order to
1824 * be sure to free objects if there is no slave in a replication state
1825 * able to be feed with commands */
1826 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1827 listRewind(slaves);
1828 while((ln = listYield(slaves))) {
1829 redisClient *slave = ln->value;
1830
1831 /* Don't feed slaves that are still waiting for BGSAVE to start */
1832 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1833
1834 /* Feed all the other slaves, MONITORs and so on */
1835 if (slave->slaveseldb != dictid) {
1836 robj *selectcmd;
1837
1838 switch(dictid) {
1839 case 0: selectcmd = shared.select0; break;
1840 case 1: selectcmd = shared.select1; break;
1841 case 2: selectcmd = shared.select2; break;
1842 case 3: selectcmd = shared.select3; break;
1843 case 4: selectcmd = shared.select4; break;
1844 case 5: selectcmd = shared.select5; break;
1845 case 6: selectcmd = shared.select6; break;
1846 case 7: selectcmd = shared.select7; break;
1847 case 8: selectcmd = shared.select8; break;
1848 case 9: selectcmd = shared.select9; break;
1849 default:
1850 selectcmd = createObject(REDIS_STRING,
1851 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1852 selectcmd->refcount = 0;
1853 break;
1854 }
1855 addReply(slave,selectcmd);
1856 slave->slaveseldb = dictid;
1857 }
1858 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1859 }
1860 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1861 if (outv != static_outv) zfree(outv);
1862 }
1863
1864 static void processInputBuffer(redisClient *c) {
1865 again:
1866 if (c->bulklen == -1) {
1867 /* Read the first line of the query */
1868 char *p = strchr(c->querybuf,'\n');
1869 size_t querylen;
1870
1871 if (p) {
1872 sds query, *argv;
1873 int argc, j;
1874
1875 query = c->querybuf;
1876 c->querybuf = sdsempty();
1877 querylen = 1+(p-(query));
1878 if (sdslen(query) > querylen) {
1879 /* leave data after the first line of the query in the buffer */
1880 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1881 }
1882 *p = '\0'; /* remove "\n" */
1883 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1884 sdsupdatelen(query);
1885
1886 /* Now we can split the query in arguments */
1887 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1888 sdsfree(query);
1889
1890 if (c->argv) zfree(c->argv);
1891 c->argv = zmalloc(sizeof(robj*)*argc);
1892
1893 for (j = 0; j < argc; j++) {
1894 if (sdslen(argv[j])) {
1895 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1896 c->argc++;
1897 } else {
1898 sdsfree(argv[j]);
1899 }
1900 }
1901 zfree(argv);
1902 if (c->argc) {
1903 /* Execute the command. If the client is still valid
1904 * after processCommand() return and there is something
1905 * on the query buffer try to process the next command. */
1906 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1907 } else {
1908 /* Nothing to process, argc == 0. Just process the query
1909 * buffer if it's not empty or return to the caller */
1910 if (sdslen(c->querybuf)) goto again;
1911 }
1912 return;
1913 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1914 redisLog(REDIS_DEBUG, "Client protocol error");
1915 freeClient(c);
1916 return;
1917 }
1918 } else {
1919 /* Bulk read handling. Note that if we are at this point
1920 the client already sent a command terminated with a newline,
1921 we are reading the bulk data that is actually the last
1922 argument of the command. */
1923 int qbl = sdslen(c->querybuf);
1924
1925 if (c->bulklen <= qbl) {
1926 /* Copy everything but the final CRLF as final argument */
1927 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1928 c->argc++;
1929 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1930 /* Process the command. If the client is still valid after
1931 * the processing and there is more data in the buffer
1932 * try to parse it. */
1933 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1934 return;
1935 }
1936 }
1937 }
1938
1939 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1940 redisClient *c = (redisClient*) privdata;
1941 char buf[REDIS_IOBUF_LEN];
1942 int nread;
1943 REDIS_NOTUSED(el);
1944 REDIS_NOTUSED(mask);
1945
1946 nread = read(fd, buf, REDIS_IOBUF_LEN);
1947 if (nread == -1) {
1948 if (errno == EAGAIN) {
1949 nread = 0;
1950 } else {
1951 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1952 freeClient(c);
1953 return;
1954 }
1955 } else if (nread == 0) {
1956 redisLog(REDIS_DEBUG, "Client closed connection");
1957 freeClient(c);
1958 return;
1959 }
1960 if (nread) {
1961 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1962 c->lastinteraction = time(NULL);
1963 } else {
1964 return;
1965 }
1966 processInputBuffer(c);
1967 }
1968
1969 static int selectDb(redisClient *c, int id) {
1970 if (id < 0 || id >= server.dbnum)
1971 return REDIS_ERR;
1972 c->db = &server.db[id];
1973 return REDIS_OK;
1974 }
1975
1976 static void *dupClientReplyValue(void *o) {
1977 incrRefCount((robj*)o);
1978 return 0;
1979 }
1980
1981 static redisClient *createClient(int fd) {
1982 redisClient *c = zmalloc(sizeof(*c));
1983
1984 anetNonBlock(NULL,fd);
1985 anetTcpNoDelay(NULL,fd);
1986 if (!c) return NULL;
1987 selectDb(c,0);
1988 c->fd = fd;
1989 c->querybuf = sdsempty();
1990 c->argc = 0;
1991 c->argv = NULL;
1992 c->bulklen = -1;
1993 c->multibulk = 0;
1994 c->mbargc = 0;
1995 c->mbargv = NULL;
1996 c->sentlen = 0;
1997 c->flags = 0;
1998 c->lastinteraction = time(NULL);
1999 c->authenticated = 0;
2000 c->replstate = REDIS_REPL_NONE;
2001 c->reply = listCreate();
2002 listSetFreeMethod(c->reply,decrRefCount);
2003 listSetDupMethod(c->reply,dupClientReplyValue);
2004 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2005 readQueryFromClient, c) == AE_ERR) {
2006 freeClient(c);
2007 return NULL;
2008 }
2009 listAddNodeTail(server.clients,c);
2010 return c;
2011 }
2012
2013 static void addReply(redisClient *c, robj *obj) {
2014 if (listLength(c->reply) == 0 &&
2015 (c->replstate == REDIS_REPL_NONE ||
2016 c->replstate == REDIS_REPL_ONLINE) &&
2017 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2018 sendReplyToClient, c) == AE_ERR) return;
2019 listAddNodeTail(c->reply,getDecodedObject(obj));
2020 }
2021
2022 static void addReplySds(redisClient *c, sds s) {
2023 robj *o = createObject(REDIS_STRING,s);
2024 addReply(c,o);
2025 decrRefCount(o);
2026 }
2027
2028 static void addReplyDouble(redisClient *c, double d) {
2029 char buf[128];
2030
2031 snprintf(buf,sizeof(buf),"%.17g",d);
2032 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2033 (unsigned long) strlen(buf),buf));
2034 }
2035
2036 static void addReplyBulkLen(redisClient *c, robj *obj) {
2037 size_t len;
2038
2039 if (obj->encoding == REDIS_ENCODING_RAW) {
2040 len = sdslen(obj->ptr);
2041 } else {
2042 long n = (long)obj->ptr;
2043
2044 /* Compute how many bytes will take this integer as a radix 10 string */
2045 len = 1;
2046 if (n < 0) {
2047 len++;
2048 n = -n;
2049 }
2050 while((n = n/10) != 0) {
2051 len++;
2052 }
2053 }
2054 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2055 }
2056
2057 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2058 int cport, cfd;
2059 char cip[128];
2060 redisClient *c;
2061 REDIS_NOTUSED(el);
2062 REDIS_NOTUSED(mask);
2063 REDIS_NOTUSED(privdata);
2064
2065 cfd = anetAccept(server.neterr, fd, cip, &cport);
2066 if (cfd == AE_ERR) {
2067 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2068 return;
2069 }
2070 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2071 if ((c = createClient(cfd)) == NULL) {
2072 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2073 close(cfd); /* May be already closed, just ingore errors */
2074 return;
2075 }
2076 /* If maxclient directive is set and this is one client more... close the
2077 * connection. Note that we create the client instead to check before
2078 * for this condition, since now the socket is already set in nonblocking
2079 * mode and we can send an error for free using the Kernel I/O */
2080 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2081 char *err = "-ERR max number of clients reached\r\n";
2082
2083 /* That's a best effort error message, don't check write errors */
2084 if (write(c->fd,err,strlen(err)) == -1) {
2085 /* Nothing to do, Just to avoid the warning... */
2086 }
2087 freeClient(c);
2088 return;
2089 }
2090 server.stat_numconnections++;
2091 }
2092
2093 /* ======================= Redis objects implementation ===================== */
2094
2095 static robj *createObject(int type, void *ptr) {
2096 robj *o;
2097
2098 if (listLength(server.objfreelist)) {
2099 listNode *head = listFirst(server.objfreelist);
2100 o = listNodeValue(head);
2101 listDelNode(server.objfreelist,head);
2102 } else {
2103 o = zmalloc(sizeof(*o));
2104 }
2105 o->type = type;
2106 o->encoding = REDIS_ENCODING_RAW;
2107 o->ptr = ptr;
2108 o->refcount = 1;
2109 return o;
2110 }
2111
2112 static robj *createStringObject(char *ptr, size_t len) {
2113 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2114 }
2115
2116 static robj *createListObject(void) {
2117 list *l = listCreate();
2118
2119 listSetFreeMethod(l,decrRefCount);
2120 return createObject(REDIS_LIST,l);
2121 }
2122
2123 static robj *createSetObject(void) {
2124 dict *d = dictCreate(&setDictType,NULL);
2125 return createObject(REDIS_SET,d);
2126 }
2127
2128 static robj *createZsetObject(void) {
2129 zset *zs = zmalloc(sizeof(*zs));
2130
2131 zs->dict = dictCreate(&zsetDictType,NULL);
2132 zs->zsl = zslCreate();
2133 return createObject(REDIS_ZSET,zs);
2134 }
2135
2136 static void freeStringObject(robj *o) {
2137 if (o->encoding == REDIS_ENCODING_RAW) {
2138 sdsfree(o->ptr);
2139 }
2140 }
2141
2142 static void freeListObject(robj *o) {
2143 listRelease((list*) o->ptr);
2144 }
2145
2146 static void freeSetObject(robj *o) {
2147 dictRelease((dict*) o->ptr);
2148 }
2149
2150 static void freeZsetObject(robj *o) {
2151 zset *zs = o->ptr;
2152
2153 dictRelease(zs->dict);
2154 zslFree(zs->zsl);
2155 zfree(zs);
2156 }
2157
2158 static void freeHashObject(robj *o) {
2159 dictRelease((dict*) o->ptr);
2160 }
2161
2162 static void incrRefCount(robj *o) {
2163 o->refcount++;
2164 #ifdef DEBUG_REFCOUNT
2165 if (o->type == REDIS_STRING)
2166 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2167 #endif
2168 }
2169
2170 static void decrRefCount(void *obj) {
2171 robj *o = obj;
2172
2173 #ifdef DEBUG_REFCOUNT
2174 if (o->type == REDIS_STRING)
2175 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2176 #endif
2177 if (--(o->refcount) == 0) {
2178 switch(o->type) {
2179 case REDIS_STRING: freeStringObject(o); break;
2180 case REDIS_LIST: freeListObject(o); break;
2181 case REDIS_SET: freeSetObject(o); break;
2182 case REDIS_ZSET: freeZsetObject(o); break;
2183 case REDIS_HASH: freeHashObject(o); break;
2184 default: redisAssert(0 != 0); break;
2185 }
2186 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2187 !listAddNodeHead(server.objfreelist,o))
2188 zfree(o);
2189 }
2190 }
2191
2192 static robj *lookupKey(redisDb *db, robj *key) {
2193 dictEntry *de = dictFind(db->dict,key);
2194 return de ? dictGetEntryVal(de) : NULL;
2195 }
2196
2197 static robj *lookupKeyRead(redisDb *db, robj *key) {
2198 expireIfNeeded(db,key);
2199 return lookupKey(db,key);
2200 }
2201
2202 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2203 deleteIfVolatile(db,key);
2204 return lookupKey(db,key);
2205 }
2206
2207 static int deleteKey(redisDb *db, robj *key) {
2208 int retval;
2209
2210 /* We need to protect key from destruction: after the first dictDelete()
2211 * it may happen that 'key' is no longer valid if we don't increment
2212 * it's count. This may happen when we get the object reference directly
2213 * from the hash table with dictRandomKey() or dict iterators */
2214 incrRefCount(key);
2215 if (dictSize(db->expires)) dictDelete(db->expires,key);
2216 retval = dictDelete(db->dict,key);
2217 decrRefCount(key);
2218
2219 return retval == DICT_OK;
2220 }
2221
2222 /* Try to share an object against the shared objects pool */
2223 static robj *tryObjectSharing(robj *o) {
2224 struct dictEntry *de;
2225 unsigned long c;
2226
2227 if (o == NULL || server.shareobjects == 0) return o;
2228
2229 redisAssert(o->type == REDIS_STRING);
2230 de = dictFind(server.sharingpool,o);
2231 if (de) {
2232 robj *shared = dictGetEntryKey(de);
2233
2234 c = ((unsigned long) dictGetEntryVal(de))+1;
2235 dictGetEntryVal(de) = (void*) c;
2236 incrRefCount(shared);
2237 decrRefCount(o);
2238 return shared;
2239 } else {
2240 /* Here we are using a stream algorihtm: Every time an object is
2241 * shared we increment its count, everytime there is a miss we
2242 * recrement the counter of a random object. If this object reaches
2243 * zero we remove the object and put the current object instead. */
2244 if (dictSize(server.sharingpool) >=
2245 server.sharingpoolsize) {
2246 de = dictGetRandomKey(server.sharingpool);
2247 redisAssert(de != NULL);
2248 c = ((unsigned long) dictGetEntryVal(de))-1;
2249 dictGetEntryVal(de) = (void*) c;
2250 if (c == 0) {
2251 dictDelete(server.sharingpool,de->key);
2252 }
2253 } else {
2254 c = 0; /* If the pool is empty we want to add this object */
2255 }
2256 if (c == 0) {
2257 int retval;
2258
2259 retval = dictAdd(server.sharingpool,o,(void*)1);
2260 redisAssert(retval == DICT_OK);
2261 incrRefCount(o);
2262 }
2263 return o;
2264 }
2265 }
2266
2267 /* Check if the nul-terminated string 's' can be represented by a long
2268 * (that is, is a number that fits into long without any other space or
2269 * character before or after the digits).
2270 *
2271 * If so, the function returns REDIS_OK and *longval is set to the value
2272 * of the number. Otherwise REDIS_ERR is returned */
2273 static int isStringRepresentableAsLong(sds s, long *longval) {
2274 char buf[32], *endptr;
2275 long value;
2276 int slen;
2277
2278 value = strtol(s, &endptr, 10);
2279 if (endptr[0] != '\0') return REDIS_ERR;
2280 slen = snprintf(buf,32,"%ld",value);
2281
2282 /* If the number converted back into a string is not identical
2283 * then it's not possible to encode the string as integer */
2284 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2285 if (longval) *longval = value;
2286 return REDIS_OK;
2287 }
2288
2289 /* Try to encode a string object in order to save space */
2290 static int tryObjectEncoding(robj *o) {
2291 long value;
2292 sds s = o->ptr;
2293
2294 if (o->encoding != REDIS_ENCODING_RAW)
2295 return REDIS_ERR; /* Already encoded */
2296
2297 /* It's not save to encode shared objects: shared objects can be shared
2298 * everywhere in the "object space" of Redis. Encoded objects can only
2299 * appear as "values" (and not, for instance, as keys) */
2300 if (o->refcount > 1) return REDIS_ERR;
2301
2302 /* Currently we try to encode only strings */
2303 redisAssert(o->type == REDIS_STRING);
2304
2305 /* Check if we can represent this string as a long integer */
2306 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2307
2308 /* Ok, this object can be encoded */
2309 o->encoding = REDIS_ENCODING_INT;
2310 sdsfree(o->ptr);
2311 o->ptr = (void*) value;
2312 return REDIS_OK;
2313 }
2314
2315 /* Get a decoded version of an encoded object (returned as a new object).
2316 * If the object is already raw-encoded just increment the ref count. */
2317 static robj *getDecodedObject(robj *o) {
2318 robj *dec;
2319
2320 if (o->encoding == REDIS_ENCODING_RAW) {
2321 incrRefCount(o);
2322 return o;
2323 }
2324 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2325 char buf[32];
2326
2327 snprintf(buf,32,"%ld",(long)o->ptr);
2328 dec = createStringObject(buf,strlen(buf));
2329 return dec;
2330 } else {
2331 redisAssert(1 != 1);
2332 }
2333 }
2334
2335 /* Compare two string objects via strcmp() or alike.
2336 * Note that the objects may be integer-encoded. In such a case we
2337 * use snprintf() to get a string representation of the numbers on the stack
2338 * and compare the strings, it's much faster than calling getDecodedObject().
2339 *
2340 * Important note: if objects are not integer encoded, but binary-safe strings,
2341 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2342 * binary safe. */
2343 static int compareStringObjects(robj *a, robj *b) {
2344 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2345 char bufa[128], bufb[128], *astr, *bstr;
2346 int bothsds = 1;
2347
2348 if (a == b) return 0;
2349 if (a->encoding != REDIS_ENCODING_RAW) {
2350 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2351 astr = bufa;
2352 bothsds = 0;
2353 } else {
2354 astr = a->ptr;
2355 }
2356 if (b->encoding != REDIS_ENCODING_RAW) {
2357 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2358 bstr = bufb;
2359 bothsds = 0;
2360 } else {
2361 bstr = b->ptr;
2362 }
2363 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2364 }
2365
2366 static size_t stringObjectLen(robj *o) {
2367 redisAssert(o->type == REDIS_STRING);
2368 if (o->encoding == REDIS_ENCODING_RAW) {
2369 return sdslen(o->ptr);
2370 } else {
2371 char buf[32];
2372
2373 return snprintf(buf,32,"%ld",(long)o->ptr);
2374 }
2375 }
2376
2377 /*============================ DB saving/loading ============================ */
2378
/* Write the single byte 'type' to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2383
/* Write the time 't' to 'fp' as a 32 bit signed integer (4 bytes, in the
 * byte order of the host). Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2389
2390 /* check rdbLoadLen() comments for more info */
2391 static int rdbSaveLen(FILE *fp, uint32_t len) {
2392 unsigned char buf[2];
2393
2394 if (len < (1<<6)) {
2395 /* Save a 6 bit len */
2396 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2397 if (fwrite(buf,1,1,fp) == 0) return -1;
2398 } else if (len < (1<<14)) {
2399 /* Save a 14 bit len */
2400 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2401 buf[1] = len&0xFF;
2402 if (fwrite(buf,2,1,fp) == 0) return -1;
2403 } else {
2404 /* Save a 32 bit len */
2405 buf[0] = (REDIS_RDB_32BITLEN<<6);
2406 if (fwrite(buf,1,1,fp) == 0) return -1;
2407 len = htonl(len);
2408 if (fwrite(&len,4,1,fp) == 0) return -1;
2409 }
2410 return 0;
2411 }
2412
2413 /* String objects in the form "2391" "-100" without any space and with a
2414 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2415 * encoded as integers to save space */
2416 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2417 long long value;
2418 char *endptr, buf[32];
2419
2420 /* Check if it's possible to encode this value as a number */
2421 value = strtoll(s, &endptr, 10);
2422 if (endptr[0] != '\0') return 0;
2423 snprintf(buf,32,"%lld",value);
2424
2425 /* If the number converted back into a string is not identical
2426 * then it's not possible to encode the string as integer */
2427 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2428
2429 /* Finally check if it fits in our ranges */
2430 if (value >= -(1<<7) && value <= (1<<7)-1) {
2431 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2432 enc[1] = value&0xFF;
2433 return 2;
2434 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2435 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2436 enc[1] = value&0xFF;
2437 enc[2] = (value>>8)&0xFF;
2438 return 3;
2439 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2440 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2441 enc[1] = value&0xFF;
2442 enc[2] = (value>>8)&0xFF;
2443 enc[3] = (value>>16)&0xFF;
2444 enc[4] = (value>>24)&0xFF;
2445 return 5;
2446 } else {
2447 return 0;
2448 }
2449 }
2450
2451 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2452 unsigned int comprlen, outlen;
2453 unsigned char byte;
2454 void *out;
2455
2456 /* We require at least four bytes compression for this to be worth it */
2457 outlen = sdslen(obj->ptr)-4;
2458 if (outlen <= 0) return 0;
2459 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2460 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2461 if (comprlen == 0) {
2462 zfree(out);
2463 return 0;
2464 }
2465 /* Data compressed! Let's save it on disk */
2466 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2467 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2468 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2469 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2470 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2471 zfree(out);
2472 return comprlen;
2473
2474 writeerr:
2475 zfree(out);
2476 return -1;
2477 }
2478
2479 /* Save a string objet as [len][data] on disk. If the object is a string
2480 * representation of an integer value we try to safe it in a special form */
2481 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2482 size_t len;
2483 int enclen;
2484
2485 len = sdslen(obj->ptr);
2486
2487 /* Try integer encoding */
2488 if (len <= 11) {
2489 unsigned char buf[5];
2490 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2491 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2492 return 0;
2493 }
2494 }
2495
2496 /* Try LZF compression - under 20 bytes it's unable to compress even
2497 * aaaaaaaaaaaaaaaaaa so skip it */
2498 if (server.rdbcompression && len > 20) {
2499 int retval;
2500
2501 retval = rdbSaveLzfStringObject(fp,obj);
2502 if (retval == -1) return -1;
2503 if (retval > 0) return 0;
2504 /* retval == 0 means data can't be compressed, save the old way */
2505 }
2506
2507 /* Store verbatim */
2508 if (rdbSaveLen(fp,len) == -1) return -1;
2509 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2510 return 0;
2511 }
2512
2513 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2514 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2515 int retval;
2516
2517 obj = getDecodedObject(obj);
2518 retval = rdbSaveStringObjectRaw(fp,obj);
2519 decrRefCount(obj);
2520 return retval;
2521 }
2522
/* Save a double value. Doubles are saved as strings ("%.17g") prefixed by
 * an unsigned 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions, in which case no string follows:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1; /* the special values take just the prefix byte */

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        char *repr = (char*)buf+1;

        snprintf(repr,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(repr);
        len = buf[0]+1;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2549
2550 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2551 static int rdbSave(char *filename) {
2552 dictIterator *di = NULL;
2553 dictEntry *de;
2554 FILE *fp;
2555 char tmpfile[256];
2556 int j;
2557 time_t now = time(NULL);
2558
2559 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2560 fp = fopen(tmpfile,"w");
2561 if (!fp) {
2562 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2563 return REDIS_ERR;
2564 }
2565 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2566 for (j = 0; j < server.dbnum; j++) {
2567 redisDb *db = server.db+j;
2568 dict *d = db->dict;
2569 if (dictSize(d) == 0) continue;
2570 di = dictGetIterator(d);
2571 if (!di) {
2572 fclose(fp);
2573 return REDIS_ERR;
2574 }
2575
2576 /* Write the SELECT DB opcode */
2577 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2578 if (rdbSaveLen(fp,j) == -1) goto werr;
2579
2580 /* Iterate this DB writing every entry */
2581 while((de = dictNext(di)) != NULL) {
2582 robj *key = dictGetEntryKey(de);
2583 robj *o = dictGetEntryVal(de);
2584 time_t expiretime = getExpire(db,key);
2585
2586 /* Save the expire time */
2587 if (expiretime != -1) {
2588 /* If this key is already expired skip it */
2589 if (expiretime < now) continue;
2590 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2591 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2592 }
2593 /* Save the key and associated value */
2594 if (rdbSaveType(fp,o->type) == -1) goto werr;
2595 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2596 if (o->type == REDIS_STRING) {
2597 /* Save a string value */
2598 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2599 } else if (o->type == REDIS_LIST) {
2600 /* Save a list value */
2601 list *list = o->ptr;
2602 listNode *ln;
2603
2604 listRewind(list);
2605 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2606 while((ln = listYield(list))) {
2607 robj *eleobj = listNodeValue(ln);
2608
2609 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2610 }
2611 } else if (o->type == REDIS_SET) {
2612 /* Save a set value */
2613 dict *set = o->ptr;
2614 dictIterator *di = dictGetIterator(set);
2615 dictEntry *de;
2616
2617 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2618 while((de = dictNext(di)) != NULL) {
2619 robj *eleobj = dictGetEntryKey(de);
2620
2621 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2622 }
2623 dictReleaseIterator(di);
2624 } else if (o->type == REDIS_ZSET) {
2625 /* Save a set value */
2626 zset *zs = o->ptr;
2627 dictIterator *di = dictGetIterator(zs->dict);
2628 dictEntry *de;
2629
2630 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2631 while((de = dictNext(di)) != NULL) {
2632 robj *eleobj = dictGetEntryKey(de);
2633 double *score = dictGetEntryVal(de);
2634
2635 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2636 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2637 }
2638 dictReleaseIterator(di);
2639 } else {
2640 redisAssert(0 != 0);
2641 }
2642 }
2643 dictReleaseIterator(di);
2644 }
2645 /* EOF opcode */
2646 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2647
2648 /* Make sure data will not remain on the OS's output buffers */
2649 fflush(fp);
2650 fsync(fileno(fp));
2651 fclose(fp);
2652
2653 /* Use RENAME to make sure the DB file is changed atomically only
2654 * if the generate DB file is ok. */
2655 if (rename(tmpfile,filename) == -1) {
2656 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2657 unlink(tmpfile);
2658 return REDIS_ERR;
2659 }
2660 redisLog(REDIS_NOTICE,"DB saved on disk");
2661 server.dirty = 0;
2662 server.lastsave = time(NULL);
2663 return REDIS_OK;
2664
2665 werr:
2666 fclose(fp);
2667 unlink(tmpfile);
2668 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2669 if (di) dictReleaseIterator(di);
2670 return REDIS_ERR;
2671 }
2672
2673 static int rdbSaveBackground(char *filename) {
2674 pid_t childpid;
2675
2676 if (server.bgsavechildpid != -1) return REDIS_ERR;
2677 if ((childpid = fork()) == 0) {
2678 /* Child */
2679 close(server.fd);
2680 if (rdbSave(filename) == REDIS_OK) {
2681 exit(0);
2682 } else {
2683 exit(1);
2684 }
2685 } else {
2686 /* Parent */
2687 if (childpid == -1) {
2688 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2689 strerror(errno));
2690 return REDIS_ERR;
2691 }
2692 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2693 server.bgsavechildpid = childpid;
2694 return REDIS_OK;
2695 }
2696 return REDIS_OK; /* unreached */
2697 }
2698
/* Remove the temporary dump file "temp-<pid>.rdb" that belongs to the
 * process 'childpid' (the same name rdbSave() builds from its own pid).
 * Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2705
/* Read one byte from 'fp' and return it as a value in the 0-255 range, or
 * -1 if nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 0) ? -1 : type;
}
2711
/* Read a time saved by rdbSaveTime(): a 32 bit signed integer, 4 bytes in
 * the byte order of the host. Returns -1 if the value can't be read (note
 * that a stored value of -1 is indistinguishable from this error). */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) == 0) return -1;
    return (time_t) stamp;
}
2717
2718 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2719 * of this file for a description of how this are stored on disk.
2720 *
2721 * isencoded is set to 1 if the readed length is not actually a length but
2722 * an "encoding type", check the above comments for more info */
2723 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2724 unsigned char buf[2];
2725 uint32_t len;
2726
2727 if (isencoded) *isencoded = 0;
2728 if (rdbver == 0) {
2729 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2730 return ntohl(len);
2731 } else {
2732 int type;
2733
2734 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2735 type = (buf[0]&0xC0)>>6;
2736 if (type == REDIS_RDB_6BITLEN) {
2737 /* Read a 6 bit len */
2738 return buf[0]&0x3F;
2739 } else if (type == REDIS_RDB_ENCVAL) {
2740 /* Read a 6 bit len encoding type */
2741 if (isencoded) *isencoded = 1;
2742 return buf[0]&0x3F;
2743 } else if (type == REDIS_RDB_14BITLEN) {
2744 /* Read a 14 bit len */
2745 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2746 return ((buf[0]&0x3F)<<8)|buf[1];
2747 } else {
2748 /* Read a 32 bit len */
2749 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2750 return ntohl(len);
2751 }
2752 }
2753 }
2754
2755 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2756 unsigned char enc[4];
2757 long long val;
2758
2759 if (enctype == REDIS_RDB_ENC_INT8) {
2760 if (fread(enc,1,1,fp) == 0) return NULL;
2761 val = (signed char)enc[0];
2762 } else if (enctype == REDIS_RDB_ENC_INT16) {
2763 uint16_t v;
2764 if (fread(enc,2,1,fp) == 0) return NULL;
2765 v = enc[0]|(enc[1]<<8);
2766 val = (int16_t)v;
2767 } else if (enctype == REDIS_RDB_ENC_INT32) {
2768 uint32_t v;
2769 if (fread(enc,4,1,fp) == 0) return NULL;
2770 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2771 val = (int32_t)v;
2772 } else {
2773 val = 0; /* anti-warning */
2774 redisAssert(0!=0);
2775 }
2776 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2777 }
2778
2779 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2780 unsigned int len, clen;
2781 unsigned char *c = NULL;
2782 sds val = NULL;
2783
2784 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2785 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2786 if ((c = zmalloc(clen)) == NULL) goto err;
2787 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2788 if (fread(c,clen,1,fp) == 0) goto err;
2789 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2790 zfree(c);
2791 return createObject(REDIS_STRING,val);
2792 err:
2793 zfree(c);
2794 sdsfree(val);
2795 return NULL;
2796 }
2797
2798 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2799 int isencoded;
2800 uint32_t len;
2801 sds val;
2802
2803 len = rdbLoadLen(fp,rdbver,&isencoded);
2804 if (isencoded) {
2805 switch(len) {
2806 case REDIS_RDB_ENC_INT8:
2807 case REDIS_RDB_ENC_INT16:
2808 case REDIS_RDB_ENC_INT32:
2809 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2810 case REDIS_RDB_ENC_LZF:
2811 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2812 default:
2813 redisAssert(0!=0);
2814 }
2815 }
2816
2817 if (len == REDIS_RDB_LENERR) return NULL;
2818 val = sdsnewlen(NULL,len);
2819 if (len && fread(val,len,1,fp) == 0) {
2820 sdsfree(val);
2821 return NULL;
2822 }
2823 return tryObjectSharing(createObject(REDIS_STRING,val));
2824 }
2825
2826 /* For information about double serialization check rdbSaveDoubleValue() */
2827 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2828 char buf[128];
2829 unsigned char len;
2830
2831 if (fread(&len,1,1,fp) == 0) return -1;
2832 switch(len) {
2833 case 255: *val = R_NegInf; return 0;
2834 case 254: *val = R_PosInf; return 0;
2835 case 253: *val = R_Nan; return 0;
2836 default:
2837 if (fread(buf,len,1,fp) == 0) return -1;
2838 buf[len] = '\0';
2839 sscanf(buf, "%lg", val);
2840 return 0;
2841 }
2842 }
2843
2844 static int rdbLoad(char *filename) {
2845 FILE *fp;
2846 robj *keyobj = NULL;
2847 uint32_t dbid;
2848 int type, retval, rdbver;
2849 dict *d = server.db[0].dict;
2850 redisDb *db = server.db+0;
2851 char buf[1024];
2852 time_t expiretime = -1, now = time(NULL);
2853
2854 fp = fopen(filename,"r");
2855 if (!fp) return REDIS_ERR;
2856 if (fread(buf,9,1,fp) == 0) goto eoferr;
2857 buf[9] = '\0';
2858 if (memcmp(buf,"REDIS",5) != 0) {
2859 fclose(fp);
2860 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2861 return REDIS_ERR;
2862 }
2863 rdbver = atoi(buf+5);
2864 if (rdbver > 1) {
2865 fclose(fp);
2866 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2867 return REDIS_ERR;
2868 }
2869 while(1) {
2870 robj *o;
2871
2872 /* Read type. */
2873 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2874 if (type == REDIS_EXPIRETIME) {
2875 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2876 /* We read the time so we need to read the object type again */
2877 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2878 }
2879 if (type == REDIS_EOF) break;
2880 /* Handle SELECT DB opcode as a special case */
2881 if (type == REDIS_SELECTDB) {
2882 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2883 goto eoferr;
2884 if (dbid >= (unsigned)server.dbnum) {
2885 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2886 exit(1);
2887 }
2888 db = server.db+dbid;
2889 d = db->dict;
2890 continue;
2891 }
2892 /* Read key */
2893 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2894
2895 if (type == REDIS_STRING) {
2896 /* Read string value */
2897 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2898 tryObjectEncoding(o);
2899 } else if (type == REDIS_LIST || type == REDIS_SET) {
2900 /* Read list/set value */
2901 uint32_t listlen;
2902
2903 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2904 goto eoferr;
2905 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2906 /* Load every single element of the list/set */
2907 while(listlen--) {
2908 robj *ele;
2909
2910 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2911 tryObjectEncoding(ele);
2912 if (type == REDIS_LIST) {
2913 listAddNodeTail((list*)o->ptr,ele);
2914 } else {
2915 dictAdd((dict*)o->ptr,ele,NULL);
2916 }
2917 }
2918 } else if (type == REDIS_ZSET) {
2919 /* Read list/set value */
2920 uint32_t zsetlen;
2921 zset *zs;
2922
2923 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2924 goto eoferr;
2925 o = createZsetObject();
2926 zs = o->ptr;
2927 /* Load every single element of the list/set */
2928 while(zsetlen--) {
2929 robj *ele;
2930 double *score = zmalloc(sizeof(double));
2931
2932 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2933 tryObjectEncoding(ele);
2934 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2935 dictAdd(zs->dict,ele,score);
2936 zslInsert(zs->zsl,*score,ele);
2937 incrRefCount(ele); /* added to skiplist */
2938 }
2939 } else {
2940 redisAssert(0 != 0);
2941 }
2942 /* Add the new object in the hash table */
2943 retval = dictAdd(d,keyobj,o);
2944 if (retval == DICT_ERR) {
2945 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2946 exit(1);
2947 }
2948 /* Set the expire time if needed */
2949 if (expiretime != -1) {
2950 setExpire(db,keyobj,expiretime);
2951 /* Delete this key if already expired */
2952 if (expiretime < now) deleteKey(db,keyobj);
2953 expiretime = -1;
2954 }
2955 keyobj = o = NULL;
2956 }
2957 fclose(fp);
2958 return REDIS_OK;
2959
2960 eoferr: /* unexpected end of file is handled here with a fatal exit */
2961 if (keyobj) decrRefCount(keyobj);
2962 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2963 exit(1);
2964 return REDIS_ERR; /* Just to avoid warning */
2965 }
2966
2967 /*================================== Commands =============================== */
2968
2969 static void authCommand(redisClient *c) {
2970 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2971 c->authenticated = 1;
2972 addReply(c,shared.ok);
2973 } else {
2974 c->authenticated = 0;
2975 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2976 }
2977 }
2978
2979 static void pingCommand(redisClient *c) {
2980 addReply(c,shared.pong);
2981 }
2982
2983 static void echoCommand(redisClient *c) {
2984 addReplyBulkLen(c,c->argv[1]);
2985 addReply(c,c->argv[1]);
2986 addReply(c,shared.crlf);
2987 }
2988
2989 /*=================================== Strings =============================== */
2990
2991 static void setGenericCommand(redisClient *c, int nx) {
2992 int retval;
2993
2994 if (nx) deleteIfVolatile(c->db,c->argv[1]);
2995 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2996 if (retval == DICT_ERR) {
2997 if (!nx) {
2998 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2999 incrRefCount(c->argv[2]);
3000 } else {
3001 addReply(c,shared.czero);
3002 return;
3003 }
3004 } else {
3005 incrRefCount(c->argv[1]);
3006 incrRefCount(c->argv[2]);
3007 }
3008 server.dirty++;
3009 removeExpire(c->db,c->argv[1]);
3010 addReply(c, nx ? shared.cone : shared.ok);
3011 }
3012
3013 static void setCommand(redisClient *c) {
3014 setGenericCommand(c,0);
3015 }
3016
3017 static void setnxCommand(redisClient *c) {
3018 setGenericCommand(c,1);
3019 }
3020
3021 static int getGenericCommand(redisClient *c) {
3022 robj *o = lookupKeyRead(c->db,c->argv[1]);
3023
3024 if (o == NULL) {
3025 addReply(c,shared.nullbulk);
3026 return REDIS_OK;
3027 } else {
3028 if (o->type != REDIS_STRING) {
3029 addReply(c,shared.wrongtypeerr);
3030 return REDIS_ERR;
3031 } else {
3032 addReplyBulkLen(c,o);
3033 addReply(c,o);
3034 addReply(c,shared.crlf);
3035 return REDIS_OK;
3036 }
3037 }
3038 }
3039
3040 static void getCommand(redisClient *c) {
3041 getGenericCommand(c);
3042 }
3043
3044 static void getsetCommand(redisClient *c) {
3045 if (getGenericCommand(c) == REDIS_ERR) return;
3046 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3047 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3048 } else {
3049 incrRefCount(c->argv[1]);
3050 }
3051 incrRefCount(c->argv[2]);
3052 server.dirty++;
3053 removeExpire(c->db,c->argv[1]);
3054 }
3055
3056 static void mgetCommand(redisClient *c) {
3057 int j;
3058
3059 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3060 for (j = 1; j < c->argc; j++) {
3061 robj *o = lookupKeyRead(c->db,c->argv[j]);
3062 if (o == NULL) {
3063 addReply(c,shared.nullbulk);
3064 } else {
3065 if (o->type != REDIS_STRING) {
3066 addReply(c,shared.nullbulk);
3067 } else {
3068 addReplyBulkLen(c,o);
3069 addReply(c,o);
3070 addReply(c,shared.crlf);
3071 }
3072 }
3073 }
3074 }
3075
3076 static void msetGenericCommand(redisClient *c, int nx) {
3077 int j, busykeys = 0;
3078
3079 if ((c->argc % 2) == 0) {
3080 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3081 return;
3082 }
3083 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3084 * set nothing at all if at least one already key exists. */
3085 if (nx) {
3086 for (j = 1; j < c->argc; j += 2) {
3087 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3088 busykeys++;
3089 }
3090 }
3091 }
3092 if (busykeys) {
3093 addReply(c, shared.czero);
3094 return;
3095 }
3096
3097 for (j = 1; j < c->argc; j += 2) {
3098 int retval;
3099
3100 tryObjectEncoding(c->argv[j+1]);
3101 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3102 if (retval == DICT_ERR) {
3103 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3104 incrRefCount(c->argv[j+1]);
3105 } else {
3106 incrRefCount(c->argv[j]);
3107 incrRefCount(c->argv[j+1]);
3108 }
3109 removeExpire(c->db,c->argv[j]);
3110 }
3111 server.dirty += (c->argc-1)/2;
3112 addReply(c, nx ? shared.cone : shared.ok);
3113 }
3114
3115 static void msetCommand(redisClient *c) {
3116 msetGenericCommand(c,0);
3117 }
3118
3119 static void msetnxCommand(redisClient *c) {
3120 msetGenericCommand(c,1);
3121 }
3122
3123 static void incrDecrCommand(redisClient *c, long long incr) {
3124 long long value;
3125 int retval;
3126 robj *o;
3127
3128 o = lookupKeyWrite(c->db,c->argv[1]);
3129 if (o == NULL) {
3130 value = 0;
3131 } else {
3132 if (o->type != REDIS_STRING) {
3133 value = 0;
3134 } else {
3135 char *eptr;
3136
3137 if (o->encoding == REDIS_ENCODING_RAW)
3138 value = strtoll(o->ptr, &eptr, 10);
3139 else if (o->encoding == REDIS_ENCODING_INT)
3140 value = (long)o->ptr;
3141 else
3142 redisAssert(1 != 1);
3143 }
3144 }
3145
3146 value += incr;
3147 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3148 tryObjectEncoding(o);
3149 retval = dictAdd(c->db->dict,c->argv[1],o);
3150 if (retval == DICT_ERR) {
3151 dictReplace(c->db->dict,c->argv[1],o);
3152 removeExpire(c->db,c->argv[1]);
3153 } else {
3154 incrRefCount(c->argv[1]);
3155 }
3156 server.dirty++;
3157 addReply(c,shared.colon);
3158 addReply(c,o);
3159 addReply(c,shared.crlf);
3160 }
3161
3162 static void incrCommand(redisClient *c) {
3163 incrDecrCommand(c,1);
3164 }
3165
3166 static void decrCommand(redisClient *c) {
3167 incrDecrCommand(c,-1);
3168 }
3169
3170 static void incrbyCommand(redisClient *c) {
3171 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3172 incrDecrCommand(c,incr);
3173 }
3174
3175 static void decrbyCommand(redisClient *c) {
3176 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3177 incrDecrCommand(c,-incr);
3178 }
3179
3180 /* ========================= Type agnostic commands ========================= */
3181
3182 static void delCommand(redisClient *c) {
3183 int deleted = 0, j;
3184
3185 for (j = 1; j < c->argc; j++) {
3186 if (deleteKey(c->db,c->argv[j])) {
3187 server.dirty++;
3188 deleted++;
3189 }
3190 }
3191 switch(deleted) {
3192 case 0:
3193 addReply(c,shared.czero);
3194 break;
3195 case 1:
3196 addReply(c,shared.cone);
3197 break;
3198 default:
3199 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3200 break;
3201 }
3202 }
3203
3204 static void existsCommand(redisClient *c) {
3205 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3206 }
3207
3208 static void selectCommand(redisClient *c) {
3209 int id = atoi(c->argv[1]->ptr);
3210
3211 if (selectDb(c,id) == REDIS_ERR) {
3212 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3213 } else {
3214 addReply(c,shared.ok);
3215 }
3216 }
3217
3218 static void randomkeyCommand(redisClient *c) {
3219 dictEntry *de;
3220
3221 while(1) {
3222 de = dictGetRandomKey(c->db->dict);
3223 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3224 }
3225 if (de == NULL) {
3226 addReply(c,shared.plus);
3227 addReply(c,shared.crlf);
3228 } else {
3229 addReply(c,shared.plus);
3230 addReply(c,dictGetEntryKey(de));
3231 addReply(c,shared.crlf);
3232 }
3233 }
3234
3235 static void keysCommand(redisClient *c) {
3236 dictIterator *di;
3237 dictEntry *de;
3238 sds pattern = c->argv[1]->ptr;
3239 int plen = sdslen(pattern);
3240 unsigned long numkeys = 0, keyslen = 0;
3241 robj *lenobj = createObject(REDIS_STRING,NULL);
3242
3243 di = dictGetIterator(c->db->dict);
3244 addReply(c,lenobj);
3245 decrRefCount(lenobj);
3246 while((de = dictNext(di)) != NULL) {
3247 robj *keyobj = dictGetEntryKey(de);
3248
3249 sds key = keyobj->ptr;
3250 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3251 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3252 if (expireIfNeeded(c->db,keyobj) == 0) {
3253 if (numkeys != 0)
3254 addReply(c,shared.space);
3255 addReply(c,keyobj);
3256 numkeys++;
3257 keyslen += sdslen(key);
3258 }
3259 }
3260 }
3261 dictReleaseIterator(di);
3262 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3263 addReply(c,shared.crlf);
3264 }
3265
3266 static void dbsizeCommand(redisClient *c) {
3267 addReplySds(c,
3268 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3269 }
3270
3271 static void lastsaveCommand(redisClient *c) {
3272 addReplySds(c,
3273 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3274 }
3275
3276 static void typeCommand(redisClient *c) {
3277 robj *o;
3278 char *type;
3279
3280 o = lookupKeyRead(c->db,c->argv[1]);
3281 if (o == NULL) {
3282 type = "+none";
3283 } else {
3284 switch(o->type) {
3285 case REDIS_STRING: type = "+string"; break;
3286 case REDIS_LIST: type = "+list"; break;
3287 case REDIS_SET: type = "+set"; break;
3288 case REDIS_ZSET: type = "+zset"; break;
3289 default: type = "unknown"; break;
3290 }
3291 }
3292 addReplySds(c,sdsnew(type));
3293 addReply(c,shared.crlf);
3294 }
3295
3296 static void saveCommand(redisClient *c) {
3297 if (server.bgsavechildpid != -1) {
3298 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3299 return;
3300 }
3301 if (rdbSave(server.dbfilename) == REDIS_OK) {
3302 addReply(c,shared.ok);
3303 } else {
3304 addReply(c,shared.err);
3305 }
3306 }
3307
3308 static void bgsaveCommand(redisClient *c) {
3309 if (server.bgsavechildpid != -1) {
3310 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3311 return;
3312 }
3313 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3314 char *status = "+Background saving started\r\n";
3315 addReplySds(c,sdsnew(status));
3316 } else {
3317 addReply(c,shared.err);
3318 }
3319 }
3320
3321 static void shutdownCommand(redisClient *c) {
3322 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3323 /* Kill the saving child if there is a background saving in progress.
3324 We want to avoid race conditions, for instance our saving child may
3325 overwrite the synchronous saving did by SHUTDOWN. */
3326 if (server.bgsavechildpid != -1) {
3327 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3328 kill(server.bgsavechildpid,SIGKILL);
3329 rdbRemoveTempFile(server.bgsavechildpid);
3330 }
3331 if (server.appendonly) {
3332 /* Append only file: fsync() the AOF and exit */
3333 fsync(server.appendfd);
3334 exit(0);
3335 } else {
3336 /* Snapshotting. Perform a SYNC SAVE and exit */
3337 if (rdbSave(server.dbfilename) == REDIS_OK) {
3338 if (server.daemonize)
3339 unlink(server.pidfile);
3340 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3341 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3342 exit(0);
3343 } else {
3344 /* Ooops.. error saving! The best we can do is to continue operating.
3345 * Note that if there was a background saving process, in the next
3346 * cron() Redis will be notified that the background saving aborted,
3347 * handling special stuff like slaves pending for synchronization... */
3348 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3349 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3350 }
3351 }
3352 }
3353
3354 static void renameGenericCommand(redisClient *c, int nx) {
3355 robj *o;
3356
3357 /* To use the same key as src and dst is probably an error */
3358 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3359 addReply(c,shared.sameobjecterr);
3360 return;
3361 }
3362
3363 o = lookupKeyWrite(c->db,c->argv[1]);
3364 if (o == NULL) {
3365 addReply(c,shared.nokeyerr);
3366 return;
3367 }
3368 incrRefCount(o);
3369 deleteIfVolatile(c->db,c->argv[2]);
3370 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3371 if (nx) {
3372 decrRefCount(o);
3373 addReply(c,shared.czero);
3374 return;
3375 }
3376 dictReplace(c->db->dict,c->argv[2],o);
3377 } else {
3378 incrRefCount(c->argv[2]);
3379 }
3380 deleteKey(c->db,c->argv[1]);
3381 server.dirty++;
3382 addReply(c,nx ? shared.cone : shared.ok);
3383 }
3384
3385 static void renameCommand(redisClient *c) {
3386 renameGenericCommand(c,0);
3387 }
3388
3389 static void renamenxCommand(redisClient *c) {
3390 renameGenericCommand(c,1);
3391 }
3392
3393 static void moveCommand(redisClient *c) {
3394 robj *o;
3395 redisDb *src, *dst;
3396 int srcid;
3397
3398 /* Obtain source and target DB pointers */
3399 src = c->db;
3400 srcid = c->db->id;
3401 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3402 addReply(c,shared.outofrangeerr);
3403 return;
3404 }
3405 dst = c->db;
3406 selectDb(c,srcid); /* Back to the source DB */
3407
3408 /* If the user is moving using as target the same
3409 * DB as the source DB it is probably an error. */
3410 if (src == dst) {
3411 addReply(c,shared.sameobjecterr);
3412 return;
3413 }
3414
3415 /* Check if the element exists and get a reference */
3416 o = lookupKeyWrite(c->db,c->argv[1]);
3417 if (!o) {
3418 addReply(c,shared.czero);
3419 return;
3420 }
3421
3422 /* Try to add the element to the target DB */
3423 deleteIfVolatile(dst,c->argv[1]);
3424 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3425 addReply(c,shared.czero);
3426 return;
3427 }
3428 incrRefCount(c->argv[1]);
3429 incrRefCount(o);
3430
3431 /* OK! key moved, free the entry in the source DB */
3432 deleteKey(src,c->argv[1]);
3433 server.dirty++;
3434 addReply(c,shared.cone);
3435 }
3436
3437 /* =================================== Lists ================================ */
3438 static void pushGenericCommand(redisClient *c, int where) {
3439 robj *lobj;
3440 list *list;
3441
3442 lobj = lookupKeyWrite(c->db,c->argv[1]);
3443 if (lobj == NULL) {
3444 lobj = createListObject();
3445 list = lobj->ptr;
3446 if (where == REDIS_HEAD) {
3447 listAddNodeHead(list,c->argv[2]);
3448 } else {
3449 listAddNodeTail(list,c->argv[2]);
3450 }
3451 dictAdd(c->db->dict,c->argv[1],lobj);
3452 incrRefCount(c->argv[1]);
3453 incrRefCount(c->argv[2]);
3454 } else {
3455 if (lobj->type != REDIS_LIST) {
3456 addReply(c,shared.wrongtypeerr);
3457 return;
3458 }
3459 list = lobj->ptr;
3460 if (where == REDIS_HEAD) {
3461 listAddNodeHead(list,c->argv[2]);
3462 } else {
3463 listAddNodeTail(list,c->argv[2]);
3464 }
3465 incrRefCount(c->argv[2]);
3466 }
3467 server.dirty++;
3468 addReply(c,shared.ok);
3469 }
3470
3471 static void lpushCommand(redisClient *c) {
3472 pushGenericCommand(c,REDIS_HEAD);
3473 }
3474
3475 static void rpushCommand(redisClient *c) {
3476 pushGenericCommand(c,REDIS_TAIL);
3477 }
3478
3479 static void llenCommand(redisClient *c) {
3480 robj *o;
3481 list *l;
3482
3483 o = lookupKeyRead(c->db,c->argv[1]);
3484 if (o == NULL) {
3485 addReply(c,shared.czero);
3486 return;
3487 } else {
3488 if (o->type != REDIS_LIST) {
3489 addReply(c,shared.wrongtypeerr);
3490 } else {
3491 l = o->ptr;
3492 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3493 }
3494 }
3495 }
3496
3497 static void lindexCommand(redisClient *c) {
3498 robj *o;
3499 int index = atoi(c->argv[2]->ptr);
3500
3501 o = lookupKeyRead(c->db,c->argv[1]);
3502 if (o == NULL) {
3503 addReply(c,shared.nullbulk);
3504 } else {
3505 if (o->type != REDIS_LIST) {
3506 addReply(c,shared.wrongtypeerr);
3507 } else {
3508 list *list = o->ptr;
3509 listNode *ln;
3510
3511 ln = listIndex(list, index);
3512 if (ln == NULL) {
3513 addReply(c,shared.nullbulk);
3514 } else {
3515 robj *ele = listNodeValue(ln);
3516 addReplyBulkLen(c,ele);
3517 addReply(c,ele);
3518 addReply(c,shared.crlf);
3519 }
3520 }
3521 }
3522 }
3523
3524 static void lsetCommand(redisClient *c) {
3525 robj *o;
3526 int index = atoi(c->argv[2]->ptr);
3527
3528 o = lookupKeyWrite(c->db,c->argv[1]);
3529 if (o == NULL) {
3530 addReply(c,shared.nokeyerr);
3531 } else {
3532 if (o->type != REDIS_LIST) {
3533 addReply(c,shared.wrongtypeerr);
3534 } else {
3535 list *list = o->ptr;
3536 listNode *ln;
3537
3538 ln = listIndex(list, index);
3539 if (ln == NULL) {
3540 addReply(c,shared.outofrangeerr);
3541 } else {
3542 robj *ele = listNodeValue(ln);
3543
3544 decrRefCount(ele);
3545 listNodeValue(ln) = c->argv[3];
3546 incrRefCount(c->argv[3]);
3547 addReply(c,shared.ok);
3548 server.dirty++;
3549 }
3550 }
3551 }
3552 }
3553
3554 static void popGenericCommand(redisClient *c, int where) {
3555 robj *o;
3556
3557 o = lookupKeyWrite(c->db,c->argv[1]);
3558 if (o == NULL) {
3559 addReply(c,shared.nullbulk);
3560 } else {
3561 if (o->type != REDIS_LIST) {
3562 addReply(c,shared.wrongtypeerr);
3563 } else {
3564 list *list = o->ptr;
3565 listNode *ln;
3566
3567 if (where == REDIS_HEAD)
3568 ln = listFirst(list);
3569 else
3570 ln = listLast(list);
3571
3572 if (ln == NULL) {
3573 addReply(c,shared.nullbulk);
3574 } else {
3575 robj *ele = listNodeValue(ln);
3576 addReplyBulkLen(c,ele);
3577 addReply(c,ele);
3578 addReply(c,shared.crlf);
3579 listDelNode(list,ln);
3580 server.dirty++;
3581 }
3582 }
3583 }
3584 }
3585
3586 static void lpopCommand(redisClient *c) {
3587 popGenericCommand(c,REDIS_HEAD);
3588 }
3589
3590 static void rpopCommand(redisClient *c) {
3591 popGenericCommand(c,REDIS_TAIL);
3592 }
3593
3594 static void lrangeCommand(redisClient *c) {
3595 robj *o;
3596 int start = atoi(c->argv[2]->ptr);
3597 int end = atoi(c->argv[3]->ptr);
3598
3599 o = lookupKeyRead(c->db,c->argv[1]);
3600 if (o == NULL) {
3601 addReply(c,shared.nullmultibulk);
3602 } else {
3603 if (o->type != REDIS_LIST) {
3604 addReply(c,shared.wrongtypeerr);
3605 } else {
3606 list *list = o->ptr;
3607 listNode *ln;
3608 int llen = listLength(list);
3609 int rangelen, j;
3610 robj *ele;
3611
3612 /* convert negative indexes */
3613 if (start < 0) start = llen+start;
3614 if (end < 0) end = llen+end;
3615 if (start < 0) start = 0;
3616 if (end < 0) end = 0;
3617
3618 /* indexes sanity checks */
3619 if (start > end || start >= llen) {
3620 /* Out of range start or start > end result in empty list */
3621 addReply(c,shared.emptymultibulk);
3622 return;
3623 }
3624 if (end >= llen) end = llen-1;
3625 rangelen = (end-start)+1;
3626
3627 /* Return the result in form of a multi-bulk reply */
3628 ln = listIndex(list, start);
3629 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3630 for (j = 0; j < rangelen; j++) {
3631 ele = listNodeValue(ln);
3632 addReplyBulkLen(c,ele);
3633 addReply(c,ele);
3634 addReply(c,shared.crlf);
3635 ln = ln->next;
3636 }
3637 }
3638 }
3639 }
3640
3641 static void ltrimCommand(redisClient *c) {
3642 robj *o;
3643 int start = atoi(c->argv[2]->ptr);
3644 int end = atoi(c->argv[3]->ptr);
3645
3646 o = lookupKeyWrite(c->db,c->argv[1]);
3647 if (o == NULL) {
3648 addReply(c,shared.ok);
3649 } else {
3650 if (o->type != REDIS_LIST) {
3651 addReply(c,shared.wrongtypeerr);
3652 } else {
3653 list *list = o->ptr;
3654 listNode *ln;
3655 int llen = listLength(list);
3656 int j, ltrim, rtrim;
3657
3658 /* convert negative indexes */
3659 if (start < 0) start = llen+start;
3660 if (end < 0) end = llen+end;
3661 if (start < 0) start = 0;
3662 if (end < 0) end = 0;
3663
3664 /* indexes sanity checks */
3665 if (start > end || start >= llen) {
3666 /* Out of range start or start > end result in empty list */
3667 ltrim = llen;
3668 rtrim = 0;
3669 } else {
3670 if (end >= llen) end = llen-1;
3671 ltrim = start;
3672 rtrim = llen-end-1;
3673 }
3674
3675 /* Remove list elements to perform the trim */
3676 for (j = 0; j < ltrim; j++) {
3677 ln = listFirst(list);
3678 listDelNode(list,ln);
3679 }
3680 for (j = 0; j < rtrim; j++) {
3681 ln = listLast(list);
3682 listDelNode(list,ln);
3683 }
3684 server.dirty++;
3685 addReply(c,shared.ok);
3686 }
3687 }
3688 }
3689
3690 static void lremCommand(redisClient *c) {
3691 robj *o;
3692
3693 o = lookupKeyWrite(c->db,c->argv[1]);
3694 if (o == NULL) {
3695 addReply(c,shared.czero);
3696 } else {
3697 if (o->type != REDIS_LIST) {
3698 addReply(c,shared.wrongtypeerr);
3699 } else {
3700 list *list = o->ptr;
3701 listNode *ln, *next;
3702 int toremove = atoi(c->argv[2]->ptr);
3703 int removed = 0;
3704 int fromtail = 0;
3705
3706 if (toremove < 0) {
3707 toremove = -toremove;
3708 fromtail = 1;
3709 }
3710 ln = fromtail ? list->tail : list->head;
3711 while (ln) {
3712 robj *ele = listNodeValue(ln);
3713
3714 next = fromtail ? ln->prev : ln->next;
3715 if (compareStringObjects(ele,c->argv[3]) == 0) {
3716 listDelNode(list,ln);
3717 server.dirty++;
3718 removed++;
3719 if (toremove && removed == toremove) break;
3720 }
3721 ln = next;
3722 }
3723 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3724 }
3725 }
3726 }
3727
3728 /* This is the semantic of this command:
3729 * RPOPLPUSH srclist dstlist:
3730 * IF LLEN(srclist) > 0
3731 * element = RPOP srclist
3732 * LPUSH dstlist element
3733 * RETURN element
3734 * ELSE
3735 * RETURN nil
3736 * END
3737 * END
3738 *
3739 * The idea is to be able to get an element from a list in a reliable way
3740 * since the element is not just returned but pushed against another list
3741 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3742 */
3743 static void rpoplpushcommand(redisClient *c) {
3744 robj *sobj;
3745
3746 sobj = lookupKeyWrite(c->db,c->argv[1]);
3747 if (sobj == NULL) {
3748 addReply(c,shared.nullbulk);
3749 } else {
3750 if (sobj->type != REDIS_LIST) {
3751 addReply(c,shared.wrongtypeerr);
3752 } else {
3753 list *srclist = sobj->ptr;
3754 listNode *ln = listLast(srclist);
3755
3756 if (ln == NULL) {
3757 addReply(c,shared.nullbulk);
3758 } else {
3759 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3760 robj *ele = listNodeValue(ln);
3761 list *dstlist;
3762
3763 if (dobj == NULL) {
3764
3765 /* Create the list if the key does not exist */
3766 dobj = createListObject();
3767 dictAdd(c->db->dict,c->argv[2],dobj);
3768 incrRefCount(c->argv[2]);
3769 } else if (dobj->type != REDIS_LIST) {
3770 addReply(c,shared.wrongtypeerr);
3771 return;
3772 }
3773 /* Add the element to the target list */
3774 dstlist = dobj->ptr;
3775 listAddNodeHead(dstlist,ele);
3776 incrRefCount(ele);
3777
3778 /* Send the element to the client as reply as well */
3779 addReplyBulkLen(c,ele);
3780 addReply(c,ele);
3781 addReply(c,shared.crlf);
3782
3783 /* Finally remove the element from the source list */
3784 listDelNode(srclist,ln);
3785 server.dirty++;
3786 }
3787 }
3788 }
3789 }
3790
3791
3792 /* ==================================== Sets ================================ */
3793
3794 static void saddCommand(redisClient *c) {
3795 robj *set;
3796
3797 set = lookupKeyWrite(c->db,c->argv[1]);
3798 if (set == NULL) {
3799 set = createSetObject();
3800 dictAdd(c->db->dict,c->argv[1],set);
3801 incrRefCount(c->argv[1]);
3802 } else {
3803 if (set->type != REDIS_SET) {
3804 addReply(c,shared.wrongtypeerr);
3805 return;
3806 }
3807 }
3808 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3809 incrRefCount(c->argv[2]);
3810 server.dirty++;
3811 addReply(c,shared.cone);
3812 } else {
3813 addReply(c,shared.czero);
3814 }
3815 }
3816
3817 static void sremCommand(redisClient *c) {
3818 robj *set;
3819
3820 set = lookupKeyWrite(c->db,c->argv[1]);
3821 if (set == NULL) {
3822 addReply(c,shared.czero);
3823 } else {
3824 if (set->type != REDIS_SET) {
3825 addReply(c,shared.wrongtypeerr);
3826 return;
3827 }
3828 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3829 server.dirty++;
3830 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3831 addReply(c,shared.cone);
3832 } else {
3833 addReply(c,shared.czero);
3834 }
3835 }
3836 }
3837
3838 static void smoveCommand(redisClient *c) {
3839 robj *srcset, *dstset;
3840
3841 srcset = lookupKeyWrite(c->db,c->argv[1]);
3842 dstset = lookupKeyWrite(c->db,c->argv[2]);
3843
3844 /* If the source key does not exist return 0, if it's of the wrong type
3845 * raise an error */
3846 if (srcset == NULL || srcset->type != REDIS_SET) {
3847 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3848 return;
3849 }
3850 /* Error if the destination key is not a set as well */
3851 if (dstset && dstset->type != REDIS_SET) {
3852 addReply(c,shared.wrongtypeerr);
3853 return;
3854 }
3855 /* Remove the element from the source set */
3856 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3857 /* Key not found in the src set! return zero */
3858 addReply(c,shared.czero);
3859 return;
3860 }
3861 server.dirty++;
3862 /* Add the element to the destination set */
3863 if (!dstset) {
3864 dstset = createSetObject();
3865 dictAdd(c->db->dict,c->argv[2],dstset);
3866 incrRefCount(c->argv[2]);
3867 }
3868 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3869 incrRefCount(c->argv[3]);
3870 addReply(c,shared.cone);
3871 }
3872
3873 static void sismemberCommand(redisClient *c) {
3874 robj *set;
3875
3876 set = lookupKeyRead(c->db,c->argv[1]);
3877 if (set == NULL) {
3878 addReply(c,shared.czero);
3879 } else {
3880 if (set->type != REDIS_SET) {
3881 addReply(c,shared.wrongtypeerr);
3882 return;
3883 }
3884 if (dictFind(set->ptr,c->argv[2]))
3885 addReply(c,shared.cone);
3886 else
3887 addReply(c,shared.czero);
3888 }
3889 }
3890
3891 static void scardCommand(redisClient *c) {
3892 robj *o;
3893 dict *s;
3894
3895 o = lookupKeyRead(c->db,c->argv[1]);
3896 if (o == NULL) {
3897 addReply(c,shared.czero);
3898 return;
3899 } else {
3900 if (o->type != REDIS_SET) {
3901 addReply(c,shared.wrongtypeerr);
3902 } else {
3903 s = o->ptr;
3904 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3905 dictSize(s)));
3906 }
3907 }
3908 }
3909
3910 static void spopCommand(redisClient *c) {
3911 robj *set;
3912 dictEntry *de;
3913
3914 set = lookupKeyWrite(c->db,c->argv[1]);
3915 if (set == NULL) {
3916 addReply(c,shared.nullbulk);
3917 } else {
3918 if (set->type != REDIS_SET) {
3919 addReply(c,shared.wrongtypeerr);
3920 return;
3921 }
3922 de = dictGetRandomKey(set->ptr);
3923 if (de == NULL) {
3924 addReply(c,shared.nullbulk);
3925 } else {
3926 robj *ele = dictGetEntryKey(de);
3927
3928 addReplyBulkLen(c,ele);
3929 addReply(c,ele);
3930 addReply(c,shared.crlf);
3931 dictDelete(set->ptr,ele);
3932 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3933 server.dirty++;
3934 }
3935 }
3936 }
3937
3938 static void srandmemberCommand(redisClient *c) {
3939 robj *set;
3940 dictEntry *de;
3941
3942 set = lookupKeyRead(c->db,c->argv[1]);
3943 if (set == NULL) {
3944 addReply(c,shared.nullbulk);
3945 } else {
3946 if (set->type != REDIS_SET) {
3947 addReply(c,shared.wrongtypeerr);
3948 return;
3949 }
3950 de = dictGetRandomKey(set->ptr);
3951 if (de == NULL) {
3952 addReply(c,shared.nullbulk);
3953 } else {
3954 robj *ele = dictGetEntryKey(de);
3955
3956 addReplyBulkLen(c,ele);
3957 addReply(c,ele);
3958 addReply(c,shared.crlf);
3959 }
3960 }
3961 }
3962
3963 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3964 dict **d1 = (void*) s1, **d2 = (void*) s2;
3965
3966 return dictSize(*d1)-dictSize(*d2);
3967 }
3968
3969 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
3970 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3971 dictIterator *di;
3972 dictEntry *de;
3973 robj *lenobj = NULL, *dstset = NULL;
3974 unsigned long j, cardinality = 0;
3975
3976 for (j = 0; j < setsnum; j++) {
3977 robj *setobj;
3978
3979 setobj = dstkey ?
3980 lookupKeyWrite(c->db,setskeys[j]) :
3981 lookupKeyRead(c->db,setskeys[j]);
3982 if (!setobj) {
3983 zfree(dv);
3984 if (dstkey) {
3985 if (deleteKey(c->db,dstkey))
3986 server.dirty++;
3987 addReply(c,shared.czero);
3988 } else {
3989 addReply(c,shared.nullmultibulk);
3990 }
3991 return;
3992 }
3993 if (setobj->type != REDIS_SET) {
3994 zfree(dv);
3995 addReply(c,shared.wrongtypeerr);
3996 return;
3997 }
3998 dv[j] = setobj->ptr;
3999 }
4000 /* Sort sets from the smallest to largest, this will improve our
4001 * algorithm's performace */
4002 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4003
4004 /* The first thing we should output is the total number of elements...
4005 * since this is a multi-bulk write, but at this stage we don't know
4006 * the intersection set size, so we use a trick, append an empty object
4007 * to the output list and save the pointer to later modify it with the
4008 * right length */
4009 if (!dstkey) {
4010 lenobj = createObject(REDIS_STRING,NULL);
4011 addReply(c,lenobj);
4012 decrRefCount(lenobj);
4013 } else {
4014 /* If we have a target key where to store the resulting set
4015 * create this key with an empty set inside */
4016 dstset = createSetObject();
4017 }
4018
4019 /* Iterate all the elements of the first (smallest) set, and test
4020 * the element against all the other sets, if at least one set does
4021 * not include the element it is discarded */
4022 di = dictGetIterator(dv[0]);
4023
4024 while((de = dictNext(di)) != NULL) {
4025 robj *ele;
4026
4027 for (j = 1; j < setsnum; j++)
4028 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4029 if (j != setsnum)
4030 continue; /* at least one set does not contain the member */
4031 ele = dictGetEntryKey(de);
4032 if (!dstkey) {
4033 addReplyBulkLen(c,ele);
4034 addReply(c,ele);
4035 addReply(c,shared.crlf);
4036 cardinality++;
4037 } else {
4038 dictAdd(dstset->ptr,ele,NULL);
4039 incrRefCount(ele);
4040 }
4041 }
4042 dictReleaseIterator(di);
4043
4044 if (dstkey) {
4045 /* Store the resulting set into the target */
4046 deleteKey(c->db,dstkey);
4047 dictAdd(c->db->dict,dstkey,dstset);
4048 incrRefCount(dstkey);
4049 }
4050
4051 if (!dstkey) {
4052 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4053 } else {
4054 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4055 dictSize((dict*)dstset->ptr)));
4056 server.dirty++;
4057 }
4058 zfree(dv);
4059 }
4060
4061 static void sinterCommand(redisClient *c) {
4062 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4063 }
4064
4065 static void sinterstoreCommand(redisClient *c) {
4066 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4067 }
4068
4069 #define REDIS_OP_UNION 0
4070 #define REDIS_OP_DIFF 1
4071
4072 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4073 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4074 dictIterator *di;
4075 dictEntry *de;
4076 robj *dstset = NULL;
4077 int j, cardinality = 0;
4078
4079 for (j = 0; j < setsnum; j++) {
4080 robj *setobj;
4081
4082 setobj = dstkey ?
4083 lookupKeyWrite(c->db,setskeys[j]) :
4084 lookupKeyRead(c->db,setskeys[j]);
4085 if (!setobj) {
4086 dv[j] = NULL;
4087 continue;
4088 }
4089 if (setobj->type != REDIS_SET) {
4090 zfree(dv);
4091 addReply(c,shared.wrongtypeerr);
4092 return;
4093 }
4094 dv[j] = setobj->ptr;
4095 }
4096
4097 /* We need a temp set object to store our union. If the dstkey
4098 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4099 * this set object will be the resulting object to set into the target key*/
4100 dstset = createSetObject();
4101
4102 /* Iterate all the elements of all the sets, add every element a single
4103 * time to the result set */
4104 for (j = 0; j < setsnum; j++) {
4105 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4106 if (!dv[j]) continue; /* non existing keys are like empty sets */
4107
4108 di = dictGetIterator(dv[j]);
4109
4110 while((de = dictNext(di)) != NULL) {
4111 robj *ele;
4112
4113 /* dictAdd will not add the same element multiple times */
4114 ele = dictGetEntryKey(de);
4115 if (op == REDIS_OP_UNION || j == 0) {
4116 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4117 incrRefCount(ele);
4118 cardinality++;
4119 }
4120 } else if (op == REDIS_OP_DIFF) {
4121 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4122 cardinality--;
4123 }
4124 }
4125 }
4126 dictReleaseIterator(di);
4127
4128 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4129 }
4130
4131 /* Output the content of the resulting set, if not in STORE mode */
4132 if (!dstkey) {
4133 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4134 di = dictGetIterator(dstset->ptr);
4135 while((de = dictNext(di)) != NULL) {
4136 robj *ele;
4137
4138 ele = dictGetEntryKey(de);
4139 addReplyBulkLen(c,ele);
4140 addReply(c,ele);
4141 addReply(c,shared.crlf);
4142 }
4143 dictReleaseIterator(di);
4144 } else {
4145 /* If we have a target key where to store the resulting set
4146 * create this key with the result set inside */
4147 deleteKey(c->db,dstkey);
4148 dictAdd(c->db->dict,dstkey,dstset);
4149 incrRefCount(dstkey);
4150 }
4151
4152 /* Cleanup */
4153 if (!dstkey) {
4154 decrRefCount(dstset);
4155 } else {
4156 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4157 dictSize((dict*)dstset->ptr)));
4158 server.dirty++;
4159 }
4160 zfree(dv);
4161 }
4162
4163 static void sunionCommand(redisClient *c) {
4164 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4165 }
4166
4167 static void sunionstoreCommand(redisClient *c) {
4168 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4169 }
4170
4171 static void sdiffCommand(redisClient *c) {
4172 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4173 }
4174
4175 static void sdiffstoreCommand(redisClient *c) {
4176 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4177 }
4178
4179 /* ==================================== ZSets =============================== */
4180
4181 /* ZSETs are ordered sets using two data structures to hold the same elements
4182 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4183 * data structure.
4184 *
4185 * The elements are added to a hash table mapping Redis objects to scores.
4186 * At the same time the elements are added to a skip list mapping scores
4187 * to Redis objects (so objects are sorted by scores in this "view"). */
4188
4189 /* This skiplist implementation is almost a C translation of the original
4190 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4191 * Alternative to Balanced Trees", modified in three ways:
4192 * a) this implementation allows for repeated values.
4193 * b) the comparison is not just by key (our 'score') but by satellite data.
4194 * c) there is a back pointer, so it's a doubly linked list with the back
4195 * pointers being only at "level 1". This allows to traverse the list
4196 * from tail to head, useful for ZREVRANGE. */
4197
4198 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4199 zskiplistNode *zn = zmalloc(sizeof(*zn));
4200
4201 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4202 zn->score = score;
4203 zn->obj = obj;
4204 return zn;
4205 }
4206
4207 static zskiplist *zslCreate(void) {
4208 int j;
4209 zskiplist *zsl;
4210
4211 zsl = zmalloc(sizeof(*zsl));
4212 zsl->level = 1;
4213 zsl->length = 0;
4214 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4215 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4216 zsl->header->forward[j] = NULL;
4217 zsl->header->backward = NULL;
4218 zsl->tail = NULL;
4219 return zsl;
4220 }
4221
4222 static void zslFreeNode(zskiplistNode *node) {
4223 decrRefCount(node->obj);
4224 zfree(node->forward);
4225 zfree(node);
4226 }
4227
4228 static void zslFree(zskiplist *zsl) {
4229 zskiplistNode *node = zsl->header->forward[0], *next;
4230
4231 zfree(zsl->header->forward);
4232 zfree(zsl->header);
4233 while(node) {
4234 next = node->forward[0];
4235 zslFreeNode(node);
4236 node = next;
4237 }
4238 zfree(zsl);
4239 }
4240
4241 static int zslRandomLevel(void) {
4242 int level = 1;
4243 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4244 level += 1;
4245 return level;
4246 }
4247
4248 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4249 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4250 int i, level;
4251
4252 x = zsl->header;
4253 for (i = zsl->level-1; i >= 0; i--) {
4254 while (x->forward[i] &&
4255 (x->forward[i]->score < score ||
4256 (x->forward[i]->score == score &&
4257 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4258 x = x->forward[i];
4259 update[i] = x;
4260 }
4261 /* we assume the key is not already inside, since we allow duplicated
4262 * scores, and the re-insertion of score and redis object should never
4263 * happpen since the caller of zslInsert() should test in the hash table
4264 * if the element is already inside or not. */
4265 level = zslRandomLevel();
4266 if (level > zsl->level) {
4267 for (i = zsl->level; i < level; i++)
4268 update[i] = zsl->header;
4269 zsl->level = level;
4270 }
4271 x = zslCreateNode(level,score,obj);
4272 for (i = 0; i < level; i++) {
4273 x->forward[i] = update[i]->forward[i];
4274 update[i]->forward[i] = x;
4275 }
4276 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4277 if (x->forward[0])
4278 x->forward[0]->backward = x;
4279 else
4280 zsl->tail = x;
4281 zsl->length++;
4282 }
4283
4284 /* Delete an element with matching score/object from the skiplist. */
4285 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4286 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4287 int i;
4288
4289 x = zsl->header;
4290 for (i = zsl->level-1; i >= 0; i--) {
4291 while (x->forward[i] &&
4292 (x->forward[i]->score < score ||
4293 (x->forward[i]->score == score &&
4294 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4295 x = x->forward[i];
4296 update[i] = x;
4297 }
4298 /* We may have multiple elements with the same score, what we need
4299 * is to find the element with both the right score and object. */
4300 x = x->forward[0];
4301 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4302 for (i = 0; i < zsl->level; i++) {
4303 if (update[i]->forward[i] != x) break;
4304 update[i]->forward[i] = x->forward[i];
4305 }
4306 if (x->forward[0]) {
4307 x->forward[0]->backward = (x->backward == zsl->header) ?
4308 NULL : x->backward;
4309 } else {
4310 zsl->tail = x->backward;
4311 }
4312 zslFreeNode(x);
4313 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4314 zsl->level--;
4315 zsl->length--;
4316 return 1;
4317 } else {
4318 return 0; /* not found */
4319 }
4320 return 0; /* not found */
4321 }
4322
4323 /* Delete all the elements with score between min and max from the skiplist.
4324 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4325 * Note that this function takes the reference to the hash table view of the
4326 * sorted set, in order to remove the elements from the hash table too. */
4327 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4328 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4329 unsigned long removed = 0;
4330 int i;
4331
4332 x = zsl->header;
4333 for (i = zsl->level-1; i >= 0; i--) {
4334 while (x->forward[i] && x->forward[i]->score < min)
4335 x = x->forward[i];
4336 update[i] = x;
4337 }
4338 /* We may have multiple elements with the same score, what we need
4339 * is to find the element with both the right score and object. */
4340 x = x->forward[0];
4341 while (x && x->score <= max) {
4342 zskiplistNode *next;
4343
4344 for (i = 0; i < zsl->level; i++) {
4345 if (update[i]->forward[i] != x) break;
4346 update[i]->forward[i] = x->forward[i];
4347 }
4348 if (x->forward[0]) {
4349 x->forward[0]->backward = (x->backward == zsl->header) ?
4350 NULL : x->backward;
4351 } else {
4352 zsl->tail = x->backward;
4353 }
4354 next = x->forward[0];
4355 dictDelete(dict,x->obj);
4356 zslFreeNode(x);
4357 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4358 zsl->level--;
4359 zsl->length--;
4360 removed++;
4361 x = next;
4362 }
4363 return removed; /* not found */
4364 }
4365
4366 /* Find the first node having a score equal or greater than the specified one.
4367 * Returns NULL if there is no match. */
4368 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4369 zskiplistNode *x;
4370 int i;
4371
4372 x = zsl->header;
4373 for (i = zsl->level-1; i >= 0; i--) {
4374 while (x->forward[i] && x->forward[i]->score < score)
4375 x = x->forward[i];
4376 }
4377 /* We may have multiple elements with the same score, what we need
4378 * is to find the element with both the right score and object. */
4379 return x->forward[0];
4380 }
4381
4382 /* The actual Z-commands implementations */
4383
4384 /* This generic command implements both ZADD and ZINCRBY.
4385 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4386 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4387 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4388 robj *zsetobj;
4389 zset *zs;
4390 double *score;
4391
4392 zsetobj = lookupKeyWrite(c->db,key);
4393 if (zsetobj == NULL) {
4394 zsetobj = createZsetObject();
4395 dictAdd(c->db->dict,key,zsetobj);
4396 incrRefCount(key);
4397 } else {
4398 if (zsetobj->type != REDIS_ZSET) {
4399 addReply(c,shared.wrongtypeerr);
4400 return;
4401 }
4402 }
4403 zs = zsetobj->ptr;
4404
4405 /* Ok now since we implement both ZADD and ZINCRBY here the code
4406 * needs to handle the two different conditions. It's all about setting
4407 * '*score', that is, the new score to set, to the right value. */
4408 score = zmalloc(sizeof(double));
4409 if (doincrement) {
4410 dictEntry *de;
4411
4412 /* Read the old score. If the element was not present starts from 0 */
4413 de = dictFind(zs->dict,ele);
4414 if (de) {
4415 double *oldscore = dictGetEntryVal(de);
4416 *score = *oldscore + scoreval;
4417 } else {
4418 *score = scoreval;
4419 }
4420 } else {
4421 *score = scoreval;
4422 }
4423
4424 /* What follows is a simple remove and re-insert operation that is common
4425 * to both ZADD and ZINCRBY... */
4426 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4427 /* case 1: New element */
4428 incrRefCount(ele); /* added to hash */
4429 zslInsert(zs->zsl,*score,ele);
4430 incrRefCount(ele); /* added to skiplist */
4431 server.dirty++;
4432 if (doincrement)
4433 addReplyDouble(c,*score);
4434 else
4435 addReply(c,shared.cone);
4436 } else {
4437 dictEntry *de;
4438 double *oldscore;
4439
4440 /* case 2: Score update operation */
4441 de = dictFind(zs->dict,ele);
4442 redisAssert(de != NULL);
4443 oldscore = dictGetEntryVal(de);
4444 if (*score != *oldscore) {
4445 int deleted;
4446
4447 /* Remove and insert the element in the skip list with new score */
4448 deleted = zslDelete(zs->zsl,*oldscore,ele);
4449 redisAssert(deleted != 0);
4450 zslInsert(zs->zsl,*score,ele);
4451 incrRefCount(ele);
4452 /* Update the score in the hash table */
4453 dictReplace(zs->dict,ele,score);
4454 server.dirty++;
4455 } else {
4456 zfree(score);
4457 }
4458 if (doincrement)
4459 addReplyDouble(c,*score);
4460 else
4461 addReply(c,shared.czero);
4462 }
4463 }
4464
4465 static void zaddCommand(redisClient *c) {
4466 double scoreval;
4467
4468 scoreval = strtod(c->argv[2]->ptr,NULL);
4469 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4470 }
4471
4472 static void zincrbyCommand(redisClient *c) {
4473 double scoreval;
4474
4475 scoreval = strtod(c->argv[2]->ptr,NULL);
4476 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4477 }
4478
4479 static void zremCommand(redisClient *c) {
4480 robj *zsetobj;
4481 zset *zs;
4482
4483 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4484 if (zsetobj == NULL) {
4485 addReply(c,shared.czero);
4486 } else {
4487 dictEntry *de;
4488 double *oldscore;
4489 int deleted;
4490
4491 if (zsetobj->type != REDIS_ZSET) {
4492 addReply(c,shared.wrongtypeerr);
4493 return;
4494 }
4495 zs = zsetobj->ptr;
4496 de = dictFind(zs->dict,c->argv[2]);
4497 if (de == NULL) {
4498 addReply(c,shared.czero);
4499 return;
4500 }
4501 /* Delete from the skiplist */
4502 oldscore = dictGetEntryVal(de);
4503 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4504 redisAssert(deleted != 0);
4505
4506 /* Delete from the hash table */
4507 dictDelete(zs->dict,c->argv[2]);
4508 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4509 server.dirty++;
4510 addReply(c,shared.cone);
4511 }
4512 }
4513
4514 static void zremrangebyscoreCommand(redisClient *c) {
4515 double min = strtod(c->argv[2]->ptr,NULL);
4516 double max = strtod(c->argv[3]->ptr,NULL);
4517 robj *zsetobj;
4518 zset *zs;
4519
4520 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4521 if (zsetobj == NULL) {
4522 addReply(c,shared.czero);
4523 } else {
4524 long deleted;
4525
4526 if (zsetobj->type != REDIS_ZSET) {
4527 addReply(c,shared.wrongtypeerr);
4528 return;
4529 }
4530 zs = zsetobj->ptr;
4531 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4532 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4533 server.dirty += deleted;
4534 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4535 }
4536 }
4537
4538 static void zrangeGenericCommand(redisClient *c, int reverse) {
4539 robj *o;
4540 int start = atoi(c->argv[2]->ptr);
4541 int end = atoi(c->argv[3]->ptr);
4542 int withscores = 0;
4543
4544 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4545 withscores = 1;
4546 } else if (c->argc >= 5) {
4547 addReply(c,shared.syntaxerr);
4548 return;
4549 }
4550
4551 o = lookupKeyRead(c->db,c->argv[1]);
4552 if (o == NULL) {
4553 addReply(c,shared.nullmultibulk);
4554 } else {
4555 if (o->type != REDIS_ZSET) {
4556 addReply(c,shared.wrongtypeerr);
4557 } else {
4558 zset *zsetobj = o->ptr;
4559 zskiplist *zsl = zsetobj->zsl;
4560 zskiplistNode *ln;
4561
4562 int llen = zsl->length;
4563 int rangelen, j;
4564 robj *ele;
4565
4566 /* convert negative indexes */
4567 if (start < 0) start = llen+start;
4568 if (end < 0) end = llen+end;
4569 if (start < 0) start = 0;
4570 if (end < 0) end = 0;
4571
4572 /* indexes sanity checks */
4573 if (start > end || start >= llen) {
4574 /* Out of range start or start > end result in empty list */
4575 addReply(c,shared.emptymultibulk);
4576 return;
4577 }
4578 if (end >= llen) end = llen-1;
4579 rangelen = (end-start)+1;
4580
4581 /* Return the result in form of a multi-bulk reply */
4582 if (reverse) {
4583 ln = zsl->tail;
4584 while (start--)
4585 ln = ln->backward;
4586 } else {
4587 ln = zsl->header->forward[0];
4588 while (start--)
4589 ln = ln->forward[0];
4590 }
4591
4592 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4593 withscores ? (rangelen*2) : rangelen));
4594 for (j = 0; j < rangelen; j++) {
4595 ele = ln->obj;
4596 addReplyBulkLen(c,ele);
4597 addReply(c,ele);
4598 addReply(c,shared.crlf);
4599 if (withscores)
4600 addReplyDouble(c,ln->score);
4601 ln = reverse ? ln->backward : ln->forward[0];
4602 }
4603 }
4604 }
4605 }
4606
4607 static void zrangeCommand(redisClient *c) {
4608 zrangeGenericCommand(c,0);
4609 }
4610
4611 static void zrevrangeCommand(redisClient *c) {
4612 zrangeGenericCommand(c,1);
4613 }
4614
4615 static void zrangebyscoreCommand(redisClient *c) {
4616 robj *o;
4617 double min = strtod(c->argv[2]->ptr,NULL);
4618 double max = strtod(c->argv[3]->ptr,NULL);
4619 int offset = 0, limit = -1;
4620
4621 if (c->argc != 4 && c->argc != 7) {
4622 addReplySds(c,
4623 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4624 return;
4625 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4626 addReply(c,shared.syntaxerr);
4627 return;
4628 } else if (c->argc == 7) {
4629 offset = atoi(c->argv[5]->ptr);
4630 limit = atoi(c->argv[6]->ptr);
4631 if (offset < 0) offset = 0;
4632 }
4633
4634 o = lookupKeyRead(c->db,c->argv[1]);
4635 if (o == NULL) {
4636 addReply(c,shared.nullmultibulk);
4637 } else {
4638 if (o->type != REDIS_ZSET) {
4639 addReply(c,shared.wrongtypeerr);
4640 } else {
4641 zset *zsetobj = o->ptr;
4642 zskiplist *zsl = zsetobj->zsl;
4643 zskiplistNode *ln;
4644 robj *ele, *lenobj;
4645 unsigned int rangelen = 0;
4646
4647 /* Get the first node with the score >= min */
4648 ln = zslFirstWithScore(zsl,min);
4649 if (ln == NULL) {
4650 /* No element matching the speciifed interval */
4651 addReply(c,shared.emptymultibulk);
4652 return;
4653 }
4654
4655 /* We don't know in advance how many matching elements there
4656 * are in the list, so we push this object that will represent
4657 * the multi-bulk length in the output buffer, and will "fix"
4658 * it later */
4659 lenobj = createObject(REDIS_STRING,NULL);
4660 addReply(c,lenobj);
4661 decrRefCount(lenobj);
4662
4663 while(ln && ln->score <= max) {
4664 if (offset) {
4665 offset--;
4666 ln = ln->forward[0];
4667 continue;
4668 }
4669 if (limit == 0) break;
4670 ele = ln->obj;
4671 addReplyBulkLen(c,ele);
4672 addReply(c,ele);
4673 addReply(c,shared.crlf);
4674 ln = ln->forward[0];
4675 rangelen++;
4676 if (limit > 0) limit--;
4677 }
4678 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4679 }
4680 }
4681 }
4682
4683 static void zcardCommand(redisClient *c) {
4684 robj *o;
4685 zset *zs;
4686
4687 o = lookupKeyRead(c->db,c->argv[1]);
4688 if (o == NULL) {
4689 addReply(c,shared.czero);
4690 return;
4691 } else {
4692 if (o->type != REDIS_ZSET) {
4693 addReply(c,shared.wrongtypeerr);
4694 } else {
4695 zs = o->ptr;
4696 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4697 }
4698 }
4699 }
4700
4701 static void zscoreCommand(redisClient *c) {
4702 robj *o;
4703 zset *zs;
4704
4705 o = lookupKeyRead(c->db,c->argv[1]);
4706 if (o == NULL) {
4707 addReply(c,shared.nullbulk);
4708 return;
4709 } else {
4710 if (o->type != REDIS_ZSET) {
4711 addReply(c,shared.wrongtypeerr);
4712 } else {
4713 dictEntry *de;
4714
4715 zs = o->ptr;
4716 de = dictFind(zs->dict,c->argv[2]);
4717 if (!de) {
4718 addReply(c,shared.nullbulk);
4719 } else {
4720 double *score = dictGetEntryVal(de);
4721
4722 addReplyDouble(c,*score);
4723 }
4724 }
4725 }
4726 }
4727
4728 /* ========================= Non type-specific commands ==================== */
4729
4730 static void flushdbCommand(redisClient *c) {
4731 server.dirty += dictSize(c->db->dict);
4732 dictEmpty(c->db->dict);
4733 dictEmpty(c->db->expires);
4734 addReply(c,shared.ok);
4735 }
4736
4737 static void flushallCommand(redisClient *c) {
4738 server.dirty += emptyDb();
4739 addReply(c,shared.ok);
4740 rdbSave(server.dbfilename);
4741 server.dirty++;
4742 }
4743
4744 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4745 redisSortOperation *so = zmalloc(sizeof(*so));
4746 so->type = type;
4747 so->pattern = pattern;
4748 return so;
4749 }
4750
4751 /* Return the value associated to the key with a name obtained
4752 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4753 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4754 char *p;
4755 sds spat, ssub;
4756 robj keyobj;
4757 int prefixlen, sublen, postfixlen;
4758 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4759 struct {
4760 long len;
4761 long free;
4762 char buf[REDIS_SORTKEY_MAX+1];
4763 } keyname;
4764
4765 /* If the pattern is "#" return the substitution object itself in order
4766 * to implement the "SORT ... GET #" feature. */
4767 spat = pattern->ptr;
4768 if (spat[0] == '#' && spat[1] == '\0') {
4769 return subst;
4770 }
4771
4772 /* The substitution object may be specially encoded. If so we create
4773 * a decoded object on the fly. Otherwise getDecodedObject will just
4774 * increment the ref count, that we'll decrement later. */
4775 subst = getDecodedObject(subst);
4776
4777 ssub = subst->ptr;
4778 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4779 p = strchr(spat,'*');
4780 if (!p) {
4781 decrRefCount(subst);
4782 return NULL;
4783 }
4784
4785 prefixlen = p-spat;
4786 sublen = sdslen(ssub);
4787 postfixlen = sdslen(spat)-(prefixlen+1);
4788 memcpy(keyname.buf,spat,prefixlen);
4789 memcpy(keyname.buf+prefixlen,ssub,sublen);
4790 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4791 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4792 keyname.len = prefixlen+sublen+postfixlen;
4793
4794 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4795 decrRefCount(subst);
4796
4797 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4798 return lookupKeyRead(db,&keyobj);
4799 }
4800
4801 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4802 * the additional parameter is not standard but a BSD-specific we have to
4803 * pass sorting parameters via the global 'server' structure */
4804 static int sortCompare(const void *s1, const void *s2) {
4805 const redisSortObject *so1 = s1, *so2 = s2;
4806 int cmp;
4807
4808 if (!server.sort_alpha) {
4809 /* Numeric sorting. Here it's trivial as we precomputed scores */
4810 if (so1->u.score > so2->u.score) {
4811 cmp = 1;
4812 } else if (so1->u.score < so2->u.score) {
4813 cmp = -1;
4814 } else {
4815 cmp = 0;
4816 }
4817 } else {
4818 /* Alphanumeric sorting */
4819 if (server.sort_bypattern) {
4820 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4821 /* At least one compare object is NULL */
4822 if (so1->u.cmpobj == so2->u.cmpobj)
4823 cmp = 0;
4824 else if (so1->u.cmpobj == NULL)
4825 cmp = -1;
4826 else
4827 cmp = 1;
4828 } else {
4829 /* We have both the objects, use strcoll */
4830 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4831 }
4832 } else {
4833 /* Compare elements directly */
4834 robj *dec1, *dec2;
4835
4836 dec1 = getDecodedObject(so1->obj);
4837 dec2 = getDecodedObject(so2->obj);
4838 cmp = strcoll(dec1->ptr,dec2->ptr);
4839 decrRefCount(dec1);
4840 decrRefCount(dec2);
4841 }
4842 }
4843 return server.sort_desc ? -cmp : cmp;
4844 }
4845
4846 /* The SORT command is the most complex command in Redis. Warning: this code
4847 * is optimized for speed and a bit less for readability */
4848 static void sortCommand(redisClient *c) {
4849 list *operations;
4850 int outputlen = 0;
4851 int desc = 0, alpha = 0;
4852 int limit_start = 0, limit_count = -1, start, end;
4853 int j, dontsort = 0, vectorlen;
4854 int getop = 0; /* GET operation counter */
4855 robj *sortval, *sortby = NULL, *storekey = NULL;
4856 redisSortObject *vector; /* Resulting vector to sort */
4857
4858 /* Lookup the key to sort. It must be of the right types */
4859 sortval = lookupKeyRead(c->db,c->argv[1]);
4860 if (sortval == NULL) {
4861 addReply(c,shared.nullmultibulk);
4862 return;
4863 }
4864 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4865 sortval->type != REDIS_ZSET)
4866 {
4867 addReply(c,shared.wrongtypeerr);
4868 return;
4869 }
4870
4871 /* Create a list of operations to perform for every sorted element.
4872 * Operations can be GET/DEL/INCR/DECR */
4873 operations = listCreate();
4874 listSetFreeMethod(operations,zfree);
4875 j = 2;
4876
4877 /* Now we need to protect sortval incrementing its count, in the future
4878 * SORT may have options able to overwrite/delete keys during the sorting
4879 * and the sorted key itself may get destroied */
4880 incrRefCount(sortval);
4881
4882 /* The SORT command has an SQL-alike syntax, parse it */
4883 while(j < c->argc) {
4884 int leftargs = c->argc-j-1;
4885 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4886 desc = 0;
4887 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4888 desc = 1;
4889 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4890 alpha = 1;
4891 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4892 limit_start = atoi(c->argv[j+1]->ptr);
4893 limit_count = atoi(c->argv[j+2]->ptr);
4894 j+=2;
4895 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4896 storekey = c->argv[j+1];
4897 j++;
4898 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4899 sortby = c->argv[j+1];
4900 /* If the BY pattern does not contain '*', i.e. it is constant,
4901 * we don't need to sort nor to lookup the weight keys. */
4902 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4903 j++;
4904 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4905 listAddNodeTail(operations,createSortOperation(
4906 REDIS_SORT_GET,c->argv[j+1]));
4907 getop++;
4908 j++;
4909 } else {
4910 decrRefCount(sortval);
4911 listRelease(operations);
4912 addReply(c,shared.syntaxerr);
4913 return;
4914 }
4915 j++;
4916 }
4917
4918 /* Load the sorting vector with all the objects to sort */
4919 switch(sortval->type) {
4920 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
4921 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
4922 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
4923 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
4924 }
4925 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4926 j = 0;
4927
4928 if (sortval->type == REDIS_LIST) {
4929 list *list = sortval->ptr;
4930 listNode *ln;
4931
4932 listRewind(list);
4933 while((ln = listYield(list))) {
4934 robj *ele = ln->value;
4935 vector[j].obj = ele;
4936 vector[j].u.score = 0;
4937 vector[j].u.cmpobj = NULL;
4938 j++;
4939 }
4940 } else {
4941 dict *set;
4942 dictIterator *di;
4943 dictEntry *setele;
4944
4945 if (sortval->type == REDIS_SET) {
4946 set = sortval->ptr;
4947 } else {
4948 zset *zs = sortval->ptr;
4949 set = zs->dict;
4950 }
4951
4952 di = dictGetIterator(set);
4953 while((setele = dictNext(di)) != NULL) {
4954 vector[j].obj = dictGetEntryKey(setele);
4955 vector[j].u.score = 0;
4956 vector[j].u.cmpobj = NULL;
4957 j++;
4958 }
4959 dictReleaseIterator(di);
4960 }
4961 redisAssert(j == vectorlen);
4962
4963 /* Now it's time to load the right scores in the sorting vector */
4964 if (dontsort == 0) {
4965 for (j = 0; j < vectorlen; j++) {
4966 if (sortby) {
4967 robj *byval;
4968
4969 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4970 if (!byval || byval->type != REDIS_STRING) continue;
4971 if (alpha) {
4972 vector[j].u.cmpobj = getDecodedObject(byval);
4973 } else {
4974 if (byval->encoding == REDIS_ENCODING_RAW) {
4975 vector[j].u.score = strtod(byval->ptr,NULL);
4976 } else {
4977 /* Don't need to decode the object if it's
4978 * integer-encoded (the only encoding supported) so
4979 * far. We can just cast it */
4980 if (byval->encoding == REDIS_ENCODING_INT) {
4981 vector[j].u.score = (long)byval->ptr;
4982 } else
4983 redisAssert(1 != 1);
4984 }
4985 }
4986 } else {
4987 if (!alpha) {
4988 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4989 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4990 else {
4991 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4992 vector[j].u.score = (long) vector[j].obj->ptr;
4993 else
4994 redisAssert(1 != 1);
4995 }
4996 }
4997 }
4998 }
4999 }
5000
5001 /* We are ready to sort the vector... perform a bit of sanity check
5002 * on the LIMIT option too. We'll use a partial version of quicksort. */
5003 start = (limit_start < 0) ? 0 : limit_start;
5004 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5005 if (start >= vectorlen) {
5006 start = vectorlen-1;
5007 end = vectorlen-2;
5008 }
5009 if (end >= vectorlen) end = vectorlen-1;
5010
5011 if (dontsort == 0) {
5012 server.sort_desc = desc;
5013 server.sort_alpha = alpha;
5014 server.sort_bypattern = sortby ? 1 : 0;
5015 if (sortby && (start != 0 || end != vectorlen-1))
5016 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5017 else
5018 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5019 }
5020
5021 /* Send command output to the output buffer, performing the specified
5022 * GET/DEL/INCR/DECR operations if any. */
5023 outputlen = getop ? getop*(end-start+1) : end-start+1;
5024 if (storekey == NULL) {
5025 /* STORE option not specified, sent the sorting result to client */
5026 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5027 for (j = start; j <= end; j++) {
5028 listNode *ln;
5029 if (!getop) {
5030 addReplyBulkLen(c,vector[j].obj);
5031 addReply(c,vector[j].obj);
5032 addReply(c,shared.crlf);
5033 }
5034 listRewind(operations);
5035 while((ln = listYield(operations))) {
5036 redisSortOperation *sop = ln->value;
5037 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5038 vector[j].obj);
5039
5040 if (sop->type == REDIS_SORT_GET) {
5041 if (!val || val->type != REDIS_STRING) {
5042 addReply(c,shared.nullbulk);
5043 } else {
5044 addReplyBulkLen(c,val);
5045 addReply(c,val);
5046 addReply(c,shared.crlf);
5047 }
5048 } else {
5049 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5050 }
5051 }
5052 }
5053 } else {
5054 robj *listObject = createListObject();
5055 list *listPtr = (list*) listObject->ptr;
5056
5057 /* STORE option specified, set the sorting result as a List object */
5058 for (j = start; j <= end; j++) {
5059 listNode *ln;
5060 if (!getop) {
5061 listAddNodeTail(listPtr,vector[j].obj);
5062 incrRefCount(vector[j].obj);
5063 }
5064 listRewind(operations);
5065 while((ln = listYield(operations))) {
5066 redisSortOperation *sop = ln->value;
5067 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5068 vector[j].obj);
5069
5070 if (sop->type == REDIS_SORT_GET) {
5071 if (!val || val->type != REDIS_STRING) {
5072 listAddNodeTail(listPtr,createStringObject("",0));
5073 } else {
5074 listAddNodeTail(listPtr,val);
5075 incrRefCount(val);
5076 }
5077 } else {
5078 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5079 }
5080 }
5081 }
5082 if (dictReplace(c->db->dict,storekey,listObject)) {
5083 incrRefCount(storekey);
5084 }
5085 /* Note: we add 1 because the DB is dirty anyway since even if the
5086 * SORT result is empty a new key is set and maybe the old content
5087 * replaced. */
5088 server.dirty += 1+outputlen;
5089 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5090 }
5091
5092 /* Cleanup */
5093 decrRefCount(sortval);
5094 listRelease(operations);
5095 for (j = 0; j < vectorlen; j++) {
5096 if (sortby && alpha && vector[j].u.cmpobj)
5097 decrRefCount(vector[j].u.cmpobj);
5098 }
5099 zfree(vector);
5100 }
5101
5102 /* Create the string returned by the INFO command. This is decoupled
5103 * by the INFO command itself as we need to report the same information
5104 * on memory corruption problems. */
5105 static sds genRedisInfoString(void) {
5106 sds info;
5107 time_t uptime = time(NULL)-server.stat_starttime;
5108 int j;
5109
5110 info = sdscatprintf(sdsempty(),
5111 "redis_version:%s\r\n"
5112 "arch_bits:%s\r\n"
5113 "multiplexing_api:%s\r\n"
5114 "uptime_in_seconds:%ld\r\n"
5115 "uptime_in_days:%ld\r\n"
5116 "connected_clients:%d\r\n"
5117 "connected_slaves:%d\r\n"
5118 "used_memory:%zu\r\n"
5119 "changes_since_last_save:%lld\r\n"
5120 "bgsave_in_progress:%d\r\n"
5121 "last_save_time:%ld\r\n"
5122 "bgrewriteaof_in_progress:%d\r\n"
5123 "total_connections_received:%lld\r\n"
5124 "total_commands_processed:%lld\r\n"
5125 "role:%s\r\n"
5126 ,REDIS_VERSION,
5127 (sizeof(long) == 8) ? "64" : "32",
5128 aeGetApiName(),
5129 uptime,
5130 uptime/(3600*24),
5131 listLength(server.clients)-listLength(server.slaves),
5132 listLength(server.slaves),
5133 server.usedmemory,
5134 server.dirty,
5135 server.bgsavechildpid != -1,
5136 server.lastsave,
5137 server.bgrewritechildpid != -1,
5138 server.stat_numconnections,
5139 server.stat_numcommands,
5140 server.masterhost == NULL ? "master" : "slave"
5141 );
5142 if (server.masterhost) {
5143 info = sdscatprintf(info,
5144 "master_host:%s\r\n"
5145 "master_port:%d\r\n"
5146 "master_link_status:%s\r\n"
5147 "master_last_io_seconds_ago:%d\r\n"
5148 ,server.masterhost,
5149 server.masterport,
5150 (server.replstate == REDIS_REPL_CONNECTED) ?
5151 "up" : "down",
5152 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5153 );
5154 }
5155 for (j = 0; j < server.dbnum; j++) {
5156 long long keys, vkeys;
5157
5158 keys = dictSize(server.db[j].dict);
5159 vkeys = dictSize(server.db[j].expires);
5160 if (keys || vkeys) {
5161 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5162 j, keys, vkeys);
5163 }
5164 }
5165 return info;
5166 }
5167
5168 static void infoCommand(redisClient *c) {
5169 sds info = genRedisInfoString();
5170 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5171 (unsigned long)sdslen(info)));
5172 addReplySds(c,info);
5173 addReply(c,shared.crlf);
5174 }
5175
5176 static void monitorCommand(redisClient *c) {
5177 /* ignore MONITOR if aleady slave or in monitor mode */
5178 if (c->flags & REDIS_SLAVE) return;
5179
5180 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5181 c->slaveseldb = 0;
5182 listAddNodeTail(server.monitors,c);
5183 addReply(c,shared.ok);
5184 }
5185
5186 /* ================================= Expire ================================= */
5187 static int removeExpire(redisDb *db, robj *key) {
5188 if (dictDelete(db->expires,key) == DICT_OK) {
5189 return 1;
5190 } else {
5191 return 0;
5192 }
5193 }
5194
5195 static int setExpire(redisDb *db, robj *key, time_t when) {
5196 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5197 return 0;
5198 } else {
5199 incrRefCount(key);
5200 return 1;
5201 }
5202 }
5203
5204 /* Return the expire time of the specified key, or -1 if no expire
5205 * is associated with this key (i.e. the key is non volatile) */
5206 static time_t getExpire(redisDb *db, robj *key) {
5207 dictEntry *de;
5208
5209 /* No expire? return ASAP */
5210 if (dictSize(db->expires) == 0 ||
5211 (de = dictFind(db->expires,key)) == NULL) return -1;
5212
5213 return (time_t) dictGetEntryVal(de);
5214 }
5215
5216 static int expireIfNeeded(redisDb *db, robj *key) {
5217 time_t when;
5218 dictEntry *de;
5219
5220 /* No expire? return ASAP */
5221 if (dictSize(db->expires) == 0 ||
5222 (de = dictFind(db->expires,key)) == NULL) return 0;
5223
5224 /* Lookup the expire */
5225 when = (time_t) dictGetEntryVal(de);
5226 if (time(NULL) <= when) return 0;
5227
5228 /* Delete the key */
5229 dictDelete(db->expires,key);
5230 return dictDelete(db->dict,key) == DICT_OK;
5231 }
5232
5233 static int deleteIfVolatile(redisDb *db, robj *key) {
5234 dictEntry *de;
5235
5236 /* No expire? return ASAP */
5237 if (dictSize(db->expires) == 0 ||
5238 (de = dictFind(db->expires,key)) == NULL) return 0;
5239
5240 /* Delete the key */
5241 server.dirty++;
5242 dictDelete(db->expires,key);
5243 return dictDelete(db->dict,key) == DICT_OK;
5244 }
5245
5246 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5247 dictEntry *de;
5248
5249 de = dictFind(c->db->dict,key);
5250 if (de == NULL) {
5251 addReply(c,shared.czero);
5252 return;
5253 }
5254 if (seconds < 0) {
5255 if (deleteKey(c->db,key)) server.dirty++;
5256 addReply(c, shared.cone);
5257 return;
5258 } else {
5259 time_t when = time(NULL)+seconds;
5260 if (setExpire(c->db,key,when)) {
5261 addReply(c,shared.cone);
5262 server.dirty++;
5263 } else {
5264 addReply(c,shared.czero);
5265 }
5266 return;
5267 }
5268 }
5269
5270 static void expireCommand(redisClient *c) {
5271 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5272 }
5273
5274 static void expireatCommand(redisClient *c) {
5275 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5276 }
5277
5278 static void ttlCommand(redisClient *c) {
5279 time_t expire;
5280 int ttl = -1;
5281
5282 expire = getExpire(c->db,c->argv[1]);
5283 if (expire != -1) {
5284 ttl = (int) (expire-time(NULL));
5285 if (ttl < 0) ttl = -1;
5286 }
5287 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5288 }
5289
5290 /* =============================== Replication ============================= */
5291
5292 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5293 ssize_t nwritten, ret = size;
5294 time_t start = time(NULL);
5295
5296 timeout++;
5297 while(size) {
5298 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5299 nwritten = write(fd,ptr,size);
5300 if (nwritten == -1) return -1;
5301 ptr += nwritten;
5302 size -= nwritten;
5303 }
5304 if ((time(NULL)-start) > timeout) {
5305 errno = ETIMEDOUT;
5306 return -1;
5307 }
5308 }
5309 return ret;
5310 }
5311
5312 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5313 ssize_t nread, totread = 0;
5314 time_t start = time(NULL);
5315
5316 timeout++;
5317 while(size) {
5318 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5319 nread = read(fd,ptr,size);
5320 if (nread == -1) return -1;
5321 ptr += nread;
5322 size -= nread;
5323 totread += nread;
5324 }
5325 if ((time(NULL)-start) > timeout) {
5326 errno = ETIMEDOUT;
5327 return -1;
5328 }
5329 }
5330 return totread;
5331 }
5332
/* Read a line from 'fd' one byte at a time into the buffer 'ptr' of 'size'
 * bytes. Reading stops at the first '\n', which is not stored; a '\r' just
 * before it is stripped as well. The buffer is always left NUL terminated
 * and at most size-1 characters are stored: a longer line is truncated and
 * the rest of it is left unread on the descriptor.
 *
 * 'timeout' is applied by syncRead() to every single byte.
 *
 * Returns the number of characters stored before the line terminator (for
 * a CRLF terminated line the count includes the stripped '\r'), or -1 on
 * read error / timeout, or when 'size' is not positive (errno = EINVAL). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* There must be room for the terminator at least. */
    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';

    size--; /* reserve one byte for the final NUL */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One more byte of the buffer is used: without this decrement the
         * loop would keep writing past the end of the buffer. */
        size--;
    }
    return nread;
}
5353
5354 static void syncCommand(redisClient *c) {
5355 /* ignore SYNC if aleady slave or in monitor mode */
5356 if (c->flags & REDIS_SLAVE) return;
5357
5358 /* SYNC can't be issued when the server has pending data to send to
5359 * the client about already issued commands. We need a fresh reply
5360 * buffer registering the differences between the BGSAVE and the current
5361 * dataset, so that we can copy to other slaves if needed. */
5362 if (listLength(c->reply) != 0) {
5363 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5364 return;
5365 }
5366
5367 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5368 /* Here we need to check if there is a background saving operation
5369 * in progress, or if it is required to start one */
5370 if (server.bgsavechildpid != -1) {
5371 /* Ok a background save is in progress. Let's check if it is a good
5372 * one for replication, i.e. if there is another slave that is
5373 * registering differences since the server forked to save */
5374 redisClient *slave;
5375 listNode *ln;
5376
5377 listRewind(server.slaves);
5378 while((ln = listYield(server.slaves))) {
5379 slave = ln->value;
5380 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5381 }
5382 if (ln) {
5383 /* Perfect, the server is already registering differences for
5384 * another slave. Set the right state, and copy the buffer. */
5385 listRelease(c->reply);
5386 c->reply = listDup(slave->reply);
5387 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5388 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5389 } else {
5390 /* No way, we need to wait for the next BGSAVE in order to
5391 * register differences */
5392 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5393 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5394 }
5395 } else {
5396 /* Ok we don't have a BGSAVE in progress, let's start one */
5397 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5398 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5399 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5400 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5401 return;
5402 }
5403 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5404 }
5405 c->repldbfd = -1;
5406 c->flags |= REDIS_SLAVE;
5407 c->slaveseldb = 0;
5408 listAddNodeTail(server.slaves,c);
5409 return;
5410 }
5411
5412 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5413 redisClient *slave = privdata;
5414 REDIS_NOTUSED(el);
5415 REDIS_NOTUSED(mask);
5416 char buf[REDIS_IOBUF_LEN];
5417 ssize_t nwritten, buflen;
5418
5419 if (slave->repldboff == 0) {
5420 /* Write the bulk write count before to transfer the DB. In theory here
5421 * we don't know how much room there is in the output buffer of the
5422 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5423 * operations) will never be smaller than the few bytes we need. */
5424 sds bulkcount;
5425
5426 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5427 slave->repldbsize);
5428 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5429 {
5430 sdsfree(bulkcount);
5431 freeClient(slave);
5432 return;
5433 }
5434 sdsfree(bulkcount);
5435 }
5436 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5437 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5438 if (buflen <= 0) {
5439 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5440 (buflen == 0) ? "premature EOF" : strerror(errno));
5441 freeClient(slave);
5442 return;
5443 }
5444 if ((nwritten = write(fd,buf,buflen)) == -1) {
5445 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5446 strerror(errno));
5447 freeClient(slave);
5448 return;
5449 }
5450 slave->repldboff += nwritten;
5451 if (slave->repldboff == slave->repldbsize) {
5452 close(slave->repldbfd);
5453 slave->repldbfd = -1;
5454 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5455 slave->replstate = REDIS_REPL_ONLINE;
5456 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5457 sendReplyToClient, slave) == AE_ERR) {
5458 freeClient(slave);
5459 return;
5460 }
5461 addReplySds(slave,sdsempty());
5462 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5463 }
5464 }
5465
5466 /* This function is called at the end of every backgrond saving.
5467 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5468 * otherwise REDIS_ERR is passed to the function.
5469 *
5470 * The goal of this function is to handle slaves waiting for a successful
5471 * background saving in order to perform non-blocking synchronization. */
5472 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5473 listNode *ln;
5474 int startbgsave = 0;
5475
5476 listRewind(server.slaves);
5477 while((ln = listYield(server.slaves))) {
5478 redisClient *slave = ln->value;
5479
5480 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5481 startbgsave = 1;
5482 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5483 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5484 struct redis_stat buf;
5485
5486 if (bgsaveerr != REDIS_OK) {
5487 freeClient(slave);
5488 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5489 continue;
5490 }
5491 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5492 redis_fstat(slave->repldbfd,&buf) == -1) {
5493 freeClient(slave);
5494 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5495 continue;
5496 }
5497 slave->repldboff = 0;
5498 slave->repldbsize = buf.st_size;
5499 slave->replstate = REDIS_REPL_SEND_BULK;
5500 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5501 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5502 freeClient(slave);
5503 continue;
5504 }
5505 }
5506 }
5507 if (startbgsave) {
5508 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5509 listRewind(server.slaves);
5510 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5511 while((ln = listYield(server.slaves))) {
5512 redisClient *slave = ln->value;
5513
5514 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5515 freeClient(slave);
5516 }
5517 }
5518 }
5519 }
5520
5521 static int syncWithMaster(void) {
5522 char buf[1024], tmpfile[256], authcmd[1024];
5523 int dumpsize;
5524 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5525 int dfd;
5526
5527 if (fd == -1) {
5528 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5529 strerror(errno));
5530 return REDIS_ERR;
5531 }
5532
5533 /* AUTH with the master if required. */
5534 if(server.masterauth) {
5535 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5536 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5537 close(fd);
5538 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5539 strerror(errno));
5540 return REDIS_ERR;
5541 }
5542 /* Read the AUTH result. */
5543 if (syncReadLine(fd,buf,1024,3600) == -1) {
5544 close(fd);
5545 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5546 strerror(errno));
5547 return REDIS_ERR;
5548 }
5549 if (buf[0] != '+') {
5550 close(fd);
5551 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5552 return REDIS_ERR;
5553 }
5554 }
5555
5556 /* Issue the SYNC command */
5557 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5558 close(fd);
5559 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5560 strerror(errno));
5561 return REDIS_ERR;
5562 }
5563 /* Read the bulk write count */
5564 if (syncReadLine(fd,buf,1024,3600) == -1) {
5565 close(fd);
5566 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5567 strerror(errno));
5568 return REDIS_ERR;
5569 }
5570 if (buf[0] != '$') {
5571 close(fd);
5572 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5573 return REDIS_ERR;
5574 }
5575 dumpsize = atoi(buf+1);
5576 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5577 /* Read the bulk write data on a temp file */
5578 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5579 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5580 if (dfd == -1) {
5581 close(fd);
5582 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5583 return REDIS_ERR;
5584 }
5585 while(dumpsize) {
5586 int nread, nwritten;
5587
5588 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5589 if (nread == -1) {
5590 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5591 strerror(errno));
5592 close(fd);
5593 close(dfd);
5594 return REDIS_ERR;
5595 }
5596 nwritten = write(dfd,buf,nread);
5597 if (nwritten == -1) {
5598 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5599 close(fd);
5600 close(dfd);
5601 return REDIS_ERR;
5602 }
5603 dumpsize -= nread;
5604 }
5605 close(dfd);
5606 if (rename(tmpfile,server.dbfilename) == -1) {
5607 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5608 unlink(tmpfile);
5609 close(fd);
5610 return REDIS_ERR;
5611 }
5612 emptyDb();
5613 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5614 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5615 close(fd);
5616 return REDIS_ERR;
5617 }
5618 server.master = createClient(fd);
5619 server.master->flags |= REDIS_MASTER;
5620 server.master->authenticated = 1;
5621 server.replstate = REDIS_REPL_CONNECTED;
5622 return REDIS_OK;
5623 }
5624
5625 static void slaveofCommand(redisClient *c) {
5626 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5627 !strcasecmp(c->argv[2]->ptr,"one")) {
5628 if (server.masterhost) {
5629 sdsfree(server.masterhost);
5630 server.masterhost = NULL;
5631 if (server.master) freeClient(server.master);
5632 server.replstate = REDIS_REPL_NONE;
5633 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5634 }
5635 } else {
5636 sdsfree(server.masterhost);
5637 server.masterhost = sdsdup(c->argv[1]->ptr);
5638 server.masterport = atoi(c->argv[2]->ptr);
5639 if (server.master) freeClient(server.master);
5640 server.replstate = REDIS_REPL_CONNECT;
5641 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5642 server.masterhost, server.masterport);
5643 }
5644 addReply(c,shared.ok);
5645 }
5646
5647 /* ============================ Maxmemory directive ======================== */
5648
5649 /* This function gets called when 'maxmemory' is set on the config file to limit
5650 * the max memory used by the server, and we are out of memory.
5651 * This function will try to, in order:
5652 *
5653 * - Free objects from the free list
5654 * - Try to remove keys with an EXPIRE set
5655 *
5656 * It is not possible to free enough memory to reach used-memory < maxmemory
5657 * the server will start refusing commands that will enlarge even more the
5658 * memory usage.
5659 */
5660 static void freeMemoryIfNeeded(void) {
5661 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5662 if (listLength(server.objfreelist)) {
5663 robj *o;
5664
5665 listNode *head = listFirst(server.objfreelist);
5666 o = listNodeValue(head);
5667 listDelNode(server.objfreelist,head);
5668 zfree(o);
5669 } else {
5670 int j, k, freed = 0;
5671
5672 for (j = 0; j < server.dbnum; j++) {
5673 int minttl = -1;
5674 robj *minkey = NULL;
5675 struct dictEntry *de;
5676
5677 if (dictSize(server.db[j].expires)) {
5678 freed = 1;
5679 /* From a sample of three keys drop the one nearest to
5680 * the natural expire */
5681 for (k = 0; k < 3; k++) {
5682 time_t t;
5683
5684 de = dictGetRandomKey(server.db[j].expires);
5685 t = (time_t) dictGetEntryVal(de);
5686 if (minttl == -1 || t < minttl) {
5687 minkey = dictGetEntryKey(de);
5688 minttl = t;
5689 }
5690 }
5691 deleteKey(server.db+j,minkey);
5692 }
5693 }
5694 if (!freed) return; /* nothing to free... */
5695 }
5696 }
5697 }
5698
5699 /* ============================== Append Only file ========================== */
5700
5701 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5702 sds buf = sdsempty();
5703 int j;
5704 ssize_t nwritten;
5705 time_t now;
5706 robj *tmpargv[3];
5707
5708 /* The DB this command was targetting is not the same as the last command
5709 * we appendend. To issue a SELECT command is needed. */
5710 if (dictid != server.appendseldb) {
5711 char seldb[64];
5712
5713 snprintf(seldb,sizeof(seldb),"%d",dictid);
5714 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5715 (unsigned long)strlen(seldb),seldb);
5716 server.appendseldb = dictid;
5717 }
5718
5719 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5720 * EXPIREs into EXPIREATs calls */
5721 if (cmd->proc == expireCommand) {
5722 long when;
5723
5724 tmpargv[0] = createStringObject("EXPIREAT",8);
5725 tmpargv[1] = argv[1];
5726 incrRefCount(argv[1]);
5727 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5728 tmpargv[2] = createObject(REDIS_STRING,
5729 sdscatprintf(sdsempty(),"%ld",when));
5730 argv = tmpargv;
5731 }
5732
5733 /* Append the actual command */
5734 buf = sdscatprintf(buf,"*%d\r\n",argc);
5735 for (j = 0; j < argc; j++) {
5736 robj *o = argv[j];
5737
5738 o = getDecodedObject(o);
5739 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
5740 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5741 buf = sdscatlen(buf,"\r\n",2);
5742 decrRefCount(o);
5743 }
5744
5745 /* Free the objects from the modified argv for EXPIREAT */
5746 if (cmd->proc == expireCommand) {
5747 for (j = 0; j < 3; j++)
5748 decrRefCount(argv[j]);
5749 }
5750
5751 /* We want to perform a single write. This should be guaranteed atomic
5752 * at least if the filesystem we are writing is a real physical one.
5753 * While this will save us against the server being killed I don't think
5754 * there is much to do about the whole server stopping for power problems
5755 * or alike */
5756 nwritten = write(server.appendfd,buf,sdslen(buf));
5757 if (nwritten != (signed)sdslen(buf)) {
5758 /* Ooops, we are in troubles. The best thing to do for now is
5759 * to simply exit instead to give the illusion that everything is
5760 * working as expected. */
5761 if (nwritten == -1) {
5762 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5763 } else {
5764 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5765 }
5766 exit(1);
5767 }
5768 /* If a background append only file rewriting is in progress we want to
5769 * accumulate the differences between the child DB and the current one
5770 * in a buffer, so that when the child process will do its work we
5771 * can append the differences to the new append only file. */
5772 if (server.bgrewritechildpid != -1)
5773 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5774
5775 sdsfree(buf);
5776 now = time(NULL);
5777 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5778 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5779 now-server.lastfsync > 1))
5780 {
5781 fsync(server.appendfd); /* Let's try to get this data on the disk */
5782 server.lastfsync = now;
5783 }
5784 }
5785
5786 /* In Redis commands are always executed in the context of a client, so in
5787 * order to load the append only file we need to create a fake client. */
5788 static struct redisClient *createFakeClient(void) {
5789 struct redisClient *c = zmalloc(sizeof(*c));
5790
5791 selectDb(c,0);
5792 c->fd = -1;
5793 c->querybuf = sdsempty();
5794 c->argc = 0;
5795 c->argv = NULL;
5796 c->flags = 0;
5797 /* We set the fake client as a slave waiting for the synchronization
5798 * so that Redis will not try to send replies to this client. */
5799 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5800 c->reply = listCreate();
5801 listSetFreeMethod(c->reply,decrRefCount);
5802 listSetDupMethod(c->reply,dupClientReplyValue);
5803 return c;
5804 }
5805
5806 static void freeFakeClient(struct redisClient *c) {
5807 sdsfree(c->querybuf);
5808 listRelease(c->reply);
5809 zfree(c);
5810 }
5811
5812 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5813 * error (the append only file is zero-length) REDIS_ERR is returned. On
5814 * fatal error an error message is logged and the program exists. */
5815 int loadAppendOnlyFile(char *filename) {
5816 struct redisClient *fakeClient;
5817 FILE *fp = fopen(filename,"r");
5818 struct redis_stat sb;
5819
5820 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5821 return REDIS_ERR;
5822
5823 if (fp == NULL) {
5824 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5825 exit(1);
5826 }
5827
5828 fakeClient = createFakeClient();
5829 while(1) {
5830 int argc, j;
5831 unsigned long len;
5832 robj **argv;
5833 char buf[128];
5834 sds argsds;
5835 struct redisCommand *cmd;
5836
5837 if (fgets(buf,sizeof(buf),fp) == NULL) {
5838 if (feof(fp))
5839 break;
5840 else
5841 goto readerr;
5842 }
5843 if (buf[0] != '*') goto fmterr;
5844 argc = atoi(buf+1);
5845 argv = zmalloc(sizeof(robj*)*argc);
5846 for (j = 0; j < argc; j++) {
5847 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5848 if (buf[0] != '$') goto fmterr;
5849 len = strtol(buf+1,NULL,10);
5850 argsds = sdsnewlen(NULL,len);
5851 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5852 argv[j] = createObject(REDIS_STRING,argsds);
5853 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5854 }
5855
5856 /* Command lookup */
5857 cmd = lookupCommand(argv[0]->ptr);
5858 if (!cmd) {
5859 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5860 exit(1);
5861 }
5862 /* Try object sharing and encoding */
5863 if (server.shareobjects) {
5864 int j;
5865 for(j = 1; j < argc; j++)
5866 argv[j] = tryObjectSharing(argv[j]);
5867 }
5868 if (cmd->flags & REDIS_CMD_BULK)
5869 tryObjectEncoding(argv[argc-1]);
5870 /* Run the command in the context of a fake client */
5871 fakeClient->argc = argc;
5872 fakeClient->argv = argv;
5873 cmd->proc(fakeClient);
5874 /* Discard the reply objects list from the fake client */
5875 while(listLength(fakeClient->reply))
5876 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5877 /* Clean up, ready for the next command */
5878 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5879 zfree(argv);
5880 }
5881 fclose(fp);
5882 freeFakeClient(fakeClient);
5883 return REDIS_OK;
5884
5885 readerr:
5886 if (feof(fp)) {
5887 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5888 } else {
5889 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5890 }
5891 exit(1);
5892 fmterr:
5893 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5894 exit(1);
5895 }
5896
5897 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5898 static int fwriteBulk(FILE *fp, robj *obj) {
5899 char buf[128];
5900 obj = getDecodedObject(obj);
5901 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5902 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5903 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
5904 goto err;
5905 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5906 decrRefCount(obj);
5907 return 1;
5908 err:
5909 decrRefCount(obj);
5910 return 0;
5911 }
5912
/* Write the double 'd' to 'fp' in the bulk format
 * $<count>\r\n<payload>\r\n, where the payload is the value printed with
 * the "%.17g" format. Returns 1 on success, 0 on write error. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    unsigned long payloadlen;

    /* The payload buffer already carries the final CRLF, which must not be
     * counted in the advertised length. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = (unsigned long)strlen(payload)-2;
    snprintf(header,sizeof(header),"$%lu\r\n",payloadlen);

    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,strlen(payload),1,fp) != 0;
}
5923
/* Write the long 'l' to 'fp' in the bulk format
 * $<count>\r\n<payload>\r\n, where the payload is the decimal
 * representation of the value. Returns 1 on success, 0 on write error. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], header[128];
    unsigned long payloadlen;

    /* The payload buffer already carries the final CRLF, which must not be
     * counted in the advertised length. */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    payloadlen = (unsigned long)strlen(payload)-2;
    snprintf(header,sizeof(header),"$%lu\r\n",payloadlen);

    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,strlen(payload),1,fp) != 0;
}
5934
5935 /* Write a sequence of commands able to fully rebuild the dataset into
5936 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5937 static int rewriteAppendOnlyFile(char *filename) {
5938 dictIterator *di = NULL;
5939 dictEntry *de;
5940 FILE *fp;
5941 char tmpfile[256];
5942 int j;
5943 time_t now = time(NULL);
5944
5945 /* Note that we have to use a different temp name here compared to the
5946 * one used by rewriteAppendOnlyFileBackground() function. */
5947 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5948 fp = fopen(tmpfile,"w");
5949 if (!fp) {
5950 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5951 return REDIS_ERR;
5952 }
5953 for (j = 0; j < server.dbnum; j++) {
5954 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5955 redisDb *db = server.db+j;
5956 dict *d = db->dict;
5957 if (dictSize(d) == 0) continue;
5958 di = dictGetIterator(d);
5959 if (!di) {
5960 fclose(fp);
5961 return REDIS_ERR;
5962 }
5963
5964 /* SELECT the new DB */
5965 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5966 if (fwriteBulkLong(fp,j) == 0) goto werr;
5967
5968 /* Iterate this DB writing every entry */
5969 while((de = dictNext(di)) != NULL) {
5970 robj *key = dictGetEntryKey(de);
5971 robj *o = dictGetEntryVal(de);
5972 time_t expiretime = getExpire(db,key);
5973
5974 /* Save the key and associated value */
5975 if (o->type == REDIS_STRING) {
5976 /* Emit a SET command */
5977 char cmd[]="*3\r\n$3\r\nSET\r\n";
5978 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5979 /* Key and value */
5980 if (fwriteBulk(fp,key) == 0) goto werr;
5981 if (fwriteBulk(fp,o) == 0) goto werr;
5982 } else if (o->type == REDIS_LIST) {
5983 /* Emit the RPUSHes needed to rebuild the list */
5984 list *list = o->ptr;
5985 listNode *ln;
5986
5987 listRewind(list);
5988 while((ln = listYield(list))) {
5989 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5990 robj *eleobj = listNodeValue(ln);
5991
5992 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5993 if (fwriteBulk(fp,key) == 0) goto werr;
5994 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5995 }
5996 } else if (o->type == REDIS_SET) {
5997 /* Emit the SADDs needed to rebuild the set */
5998 dict *set = o->ptr;
5999 dictIterator *di = dictGetIterator(set);
6000 dictEntry *de;
6001
6002 while((de = dictNext(di)) != NULL) {
6003 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6004 robj *eleobj = dictGetEntryKey(de);
6005
6006 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6007 if (fwriteBulk(fp,key) == 0) goto werr;
6008 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6009 }
6010 dictReleaseIterator(di);
6011 } else if (o->type == REDIS_ZSET) {
6012 /* Emit the ZADDs needed to rebuild the sorted set */
6013 zset *zs = o->ptr;
6014 dictIterator *di = dictGetIterator(zs->dict);
6015 dictEntry *de;
6016
6017 while((de = dictNext(di)) != NULL) {
6018 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6019 robj *eleobj = dictGetEntryKey(de);
6020 double *score = dictGetEntryVal(de);
6021
6022 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6023 if (fwriteBulk(fp,key) == 0) goto werr;
6024 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6025 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6026 }
6027 dictReleaseIterator(di);
6028 } else {
6029 redisAssert(0 != 0);
6030 }
6031 /* Save the expire time */
6032 if (expiretime != -1) {
6033 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6034 /* If this key is already expired skip it */
6035 if (expiretime < now) continue;
6036 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6037 if (fwriteBulk(fp,key) == 0) goto werr;
6038 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6039 }
6040 }
6041 dictReleaseIterator(di);
6042 }
6043
6044 /* Make sure data will not remain on the OS's output buffers */
6045 fflush(fp);
6046 fsync(fileno(fp));
6047 fclose(fp);
6048
6049 /* Use RENAME to make sure the DB file is changed atomically only
6050 * if the generate DB file is ok. */
6051 if (rename(tmpfile,filename) == -1) {
6052 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6053 unlink(tmpfile);
6054 return REDIS_ERR;
6055 }
6056 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6057 return REDIS_OK;
6058
6059 werr:
6060 fclose(fp);
6061 unlink(tmpfile);
6062 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6063 if (di) dictReleaseIterator(di);
6064 return REDIS_ERR;
6065 }
6066
6067 /* This is how rewriting of the append only file in background works:
6068 *
6069 * 1) The user calls BGREWRITEAOF
6070 * 2) Redis calls this function, that forks():
6071 * 2a) the child rewrite the append only file in a temp file.
6072 * 2b) the parent accumulates differences in server.bgrewritebuf.
6073 * 3) When the child finished '2a' exits.
6074 * 4) The parent will trap the exit code, if it's OK, will append the
6075 * data accumulated into server.bgrewritebuf into the temp file, and
6076 * finally will rename(2) the temp file in the actual file name.
6077 * Then the new file is reopened as the new append only file. Profit!
6078 */
6079 static int rewriteAppendOnlyFileBackground(void) {
6080 pid_t childpid;
6081
6082 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6083 if ((childpid = fork()) == 0) {
6084 /* Child */
6085 char tmpfile[256];
6086 close(server.fd);
6087
6088 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6089 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6090 exit(0);
6091 } else {
6092 exit(1);
6093 }
6094 } else {
6095 /* Parent */
6096 if (childpid == -1) {
6097 redisLog(REDIS_WARNING,
6098 "Can't rewrite append only file in background: fork: %s",
6099 strerror(errno));
6100 return REDIS_ERR;
6101 }
6102 redisLog(REDIS_NOTICE,
6103 "Background append only file rewriting started by pid %d",childpid);
6104 server.bgrewritechildpid = childpid;
6105 /* We set appendseldb to -1 in order to force the next call to the
6106 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6107 * accumulated by the parent into server.bgrewritebuf will start
6108 * with a SELECT statement and it will be safe to merge. */
6109 server.appendseldb = -1;
6110 return REDIS_OK;
6111 }
6112 return REDIS_OK; /* unreached */
6113 }
6114
6115 static void bgrewriteaofCommand(redisClient *c) {
6116 if (server.bgrewritechildpid != -1) {
6117 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6118 return;
6119 }
6120 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6121 char *status = "+Background append only file rewriting started\r\n";
6122 addReplySds(c,sdsnew(status));
6123 } else {
6124 addReply(c,shared.err);
6125 }
6126 }
6127
/* Remove the temporary file "temp-rewriteaof-bg-<childpid>.aof" from the
 * current working directory. The result of unlink() is not checked, so a
 * missing file is silently ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    unlink(path);
}
6134
6135 /* ================================= Debugging ============================== */
6136
6137 static void debugCommand(redisClient *c) {
6138 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6139 *((char*)-1) = 'x';
6140 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6141 if (rdbSave(server.dbfilename) != REDIS_OK) {
6142 addReply(c,shared.err);
6143 return;
6144 }
6145 emptyDb();
6146 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6147 addReply(c,shared.err);
6148 return;
6149 }
6150 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6151 addReply(c,shared.ok);
6152 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6153 emptyDb();
6154 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6155 addReply(c,shared.err);
6156 return;
6157 }
6158 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6159 addReply(c,shared.ok);
6160 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6161 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6162 robj *key, *val;
6163
6164 if (!de) {
6165 addReply(c,shared.nokeyerr);
6166 return;
6167 }
6168 key = dictGetEntryKey(de);
6169 val = dictGetEntryVal(de);
6170 addReplySds(c,sdscatprintf(sdsempty(),
6171 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6172 (void*)key, key->refcount, (void*)val, val->refcount,
6173 val->encoding));
6174 } else {
6175 addReplySds(c,sdsnew(
6176 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6177 }
6178 }
6179
6180 static void _redisAssert(char *estr) {
6181 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6182 redisLog(REDIS_WARNING,"==> %s\n",estr);
6183 #ifdef HAVE_BACKTRACE
6184 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6185 *((char*)-1) = 'x';
6186 #endif
6187 }
6188
6189 /* =================================== Main! ================================ */
6190
6191 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its content converted
 * with atoi(). Returns -1 when the file can't be opened or no line can be
 * read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
6205
6206 void linuxOvercommitMemoryWarning(void) {
6207 if (linuxOvercommitMemoryValue() == 0) {
6208 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6209 }
6210 }
6211 #endif /* __linux__ */
6212
6213 static void daemonize(void) {
6214 int fd;
6215 FILE *fp;
6216
6217 if (fork() != 0) exit(0); /* parent exits */
6218 printf("New pid: %d\n", getpid());
6219 setsid(); /* create a new session */
6220
6221 /* Every output goes to /dev/null. If Redis is daemonized but
6222 * the 'logfile' is set to 'stdout' in the configuration file
6223 * it will not log at all. */
6224 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6225 dup2(fd, STDIN_FILENO);
6226 dup2(fd, STDOUT_FILENO);
6227 dup2(fd, STDERR_FILENO);
6228 if (fd > STDERR_FILENO) close(fd);
6229 }
6230 /* Try to write the pid file */
6231 fp = fopen(server.pidfile,"w");
6232 if (fp) {
6233 fprintf(fp,"%d\n",getpid());
6234 fclose(fp);
6235 }
6236 }
6237
6238 int main(int argc, char **argv) {
6239 initServerConfig();
6240 if (argc == 2) {
6241 resetServerSaveParams();
6242 loadServerConfig(argv[1]);
6243 } else if (argc > 2) {
6244 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6245 exit(1);
6246 } else {
6247 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6248 }
6249 if (server.daemonize) daemonize();
6250 initServer();
6251 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6252 #ifdef __linux__
6253 linuxOvercommitMemoryWarning();
6254 #endif
6255 if (server.appendonly) {
6256 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6257 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6258 } else {
6259 if (rdbLoad(server.dbfilename) == REDIS_OK)
6260 redisLog(REDIS_NOTICE,"DB loaded from disk");
6261 }
6262 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6263 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6264 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6265 aeMain(server.el);
6266 aeDeleteEventLoop(server.el);
6267 return 0;
6268 }
6269
6270 /* ============================= Backtrace support ========================= */
6271
6272 #ifdef HAVE_BACKTRACE
6273 static char *findFuncName(void *pointer, unsigned long *offset);
6274
6275 static void *getMcontextEip(ucontext_t *uc) {
6276 #if defined(__FreeBSD__)
6277 return (void*) uc->uc_mcontext.mc_eip;
6278 #elif defined(__dietlibc__)
6279 return (void*) uc->uc_mcontext.eip;
6280 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6281 #if __x86_64__
6282 return (void*) uc->uc_mcontext->__ss.__rip;
6283 #else
6284 return (void*) uc->uc_mcontext->__ss.__eip;
6285 #endif
6286 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6287 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6288 return (void*) uc->uc_mcontext->__ss.__rip;
6289 #else
6290 return (void*) uc->uc_mcontext->__ss.__eip;
6291 #endif
6292 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6293 return (void*) uc->uc_mcontext.gregs[REG_EIP];
6294 #elif defined(__ia64__) /* Linux IA64 */
6295 return (void*) uc->uc_mcontext.sc_ip;
6296 #else
6297 return NULL;
6298 #endif
6299 }
6300
6301 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6302 void *trace[100];
6303 char **messages = NULL;
6304 int i, trace_size = 0;
6305 unsigned long offset=0;
6306 ucontext_t *uc = (ucontext_t*) secret;
6307 sds infostring;
6308 REDIS_NOTUSED(info);
6309
6310 redisLog(REDIS_WARNING,
6311 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6312 infostring = genRedisInfoString();
6313 redisLog(REDIS_WARNING, "%s",infostring);
6314 /* It's not safe to sdsfree() the returned string under memory
6315 * corruption conditions. Let it leak as we are going to abort */
6316
6317 trace_size = backtrace(trace, 100);
6318 /* overwrite sigaction with caller's address */
6319 if (getMcontextEip(uc) != NULL) {
6320 trace[1] = getMcontextEip(uc);
6321 }
6322 messages = backtrace_symbols(trace, trace_size);
6323
6324 for (i=1; i<trace_size; ++i) {
6325 char *fn = findFuncName(trace[i], &offset), *p;
6326
6327 p = strchr(messages[i],'+');
6328 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6329 redisLog(REDIS_WARNING,"%s", messages[i]);
6330 } else {
6331 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6332 }
6333 }
6334 // free(messages); Don't call free() with possibly corrupted memory.
6335 exit(0);
6336 }
6337
6338 static void setupSigSegvAction(void) {
6339 struct sigaction act;
6340
6341 sigemptyset (&act.sa_mask);
6342 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6343 * is used. Otherwise, sa_handler is used */
6344 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6345 act.sa_sigaction = segvHandler;
6346 sigaction (SIGSEGV, &act, NULL);
6347 sigaction (SIGBUS, &act, NULL);
6348 sigaction (SIGFPE, &act, NULL);
6349 sigaction (SIGILL, &act, NULL);
6350 sigaction (SIGBUS, &act, NULL);
6351 return;
6352 }
6353
6354 #include "staticsymbols.h"
6355 /* This function tries to convert a pointer into a function name. It's used in
6356 * order to provide a backtrace under segmentation fault that's able to
6357 * display functions declared as static (otherwise the backtrace is useless). */
6358 static char *findFuncName(void *pointer, unsigned long *offset){
6359 int i, ret = -1;
6360 unsigned long off, minoff = 0;
6361
6362 /* Try to match against the Symbol with the smallest offset */
6363 for (i=0; symsTable[i].pointer; i++) {
6364 unsigned long lp = (unsigned long) pointer;
6365
6366 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6367 off=lp-symsTable[i].pointer;
6368 if (ret < 0 || off < minoff) {
6369 minoff=off;
6370 ret=i;
6371 }
6372 }
6373 }
6374 if (ret == -1) return NULL;
6375 *offset = minoff;
6376 return symsTable[ret].name;
6377 }
6378 #else /* HAVE_BACKTRACE */
/* Variant compiled when HAVE_BACKTRACE is not defined: no handler is
 * installed. */
static void setupSigSegvAction(void) {
    /* Intentionally empty. */
}
6381 #endif /* HAVE_BACKTRACE */
6382
6383
6384
6385 /* The End */
6386
6387
6388