]> git.saurik.com Git - redis.git/blob - redis.c
622c7d7456b8b411dee80e6e69e6fb63d8b58ac3
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version of this source release, as a string literal. */
#define REDIS_VERSION "1.1.93"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR (-1) /* parenthesized: negative macro values must be */

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags (bit values, OR-ed together in the command table) */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the two most significant bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. The REDIS_RDB_*LEN / ENCVAL values below are the
 * two-bit prefixes themselves (0 = 00, 1 = 01, 2 = 10, 3 = 11). */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set (REDIS_RDB_ENCVAL), the remaining six bits specify a special encoding
 * for the object according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
154
/* Client flags: bit values OR-ed into redisClient.flags and tested with '&'
 * (e.g. closeTimedoutClients() skips REDIS_SLAVE and REDIS_MASTER clients). */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits for a bgsave to start before feeding the slave */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits for the bgsave to end before the bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff: which end of a list an operation works on */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() only emits messages whose level is >= the
 * configured server.verbosity, and uses the level as an index into ".-*"
 * to pick the marker printed after the timestamp ('.' debug, '-' notice,
 * '*' warning), so these values must stay 0, 1 and 2. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: casts an unused variable to void. */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2

/* We can print the stacktrace, so our assert is defined this way:
 * when the expression is false, _redisAssert() receives the stringified
 * expression and then the process exits with status 1. _redisAssert() is
 * defined elsewhere in this file (NOTE(review): presumably it logs the
 * failed expression and, with HAVE_BACKTRACE, a stack trace). */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
204
205 /*================================= Data types ============================== */
206
/* A redis object, that is a type able to hold a string / list / set / sorted
 * set / hash. 'type' holds one of the REDIS_STRING.. REDIS_HASH values and
 * 'encoding' one of the REDIS_ENCODING_* values; 'ptr' points to the payload
 * (an sds string for raw-encoded string objects). */
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char notused[2];
    int refcount;
} robj;

/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * Both arguments are parenthesized in the expansion, so any lvalue of type
 * robj works as '_var' (e.g. '*objptr' or 'array[i]'), and any pointer
 * expression works as '_ptr'.
 *
 * NOTE: the expansion ends with ';' after 'while(0)'. It is kept for
 * compatibility with existing call sites, but it means the macro must not
 * be used as the unbraced body of an if/else. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
} while(0);
226
/* One logical database. The cron code resizes both tables when they become
 * sparse (see tryResizeHashTables()). */
typedef struct redisDb {
    dict *dict;    /* the key space */
    dict *expires; /* keys with an expire set (see setExpire()/getExpire()) */
    int id;        /* presumably the index of this DB in server.db -- confirm */
} redisDb;

/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv; /* mbargv/mbargc: presumably args of a multi bulk request */
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;

/* One "save <seconds> <changes>" rule; presumably a save is triggered when
 * at least 'changes' writes happened within 'seconds' -- confirm in cron. */
struct saveparam {
    time_t seconds;
    int changes;
};
260
/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db; /* array of dbnum databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save (reset on successful save) */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function run */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of the last successful save */
    size_t usedmemory; /* Used memory in megabytes (NOTE(review): unit not confirmed here) */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* redisLog() drops messages with a level below this */
    int glueoutputbuf;
    int maxidletime; /* client idle timeout, in seconds */
    int dbnum; /* number of entries in 'db' */
    int daemonize;
    int appendonly;
    int appendfsync;
    time_t lastfsync;
    int appendfd; /* append only file descriptor, -1 when not in use */
    int appendseldb; /* DB last SELECTed in the AOF; -1 forces a new SELECT */
    char *pidfile;
    pid_t bgsavechildpid; /* BGSAVE child pid, -1 when none is running */
    pid_t bgrewritechildpid; /* BGREWRITEAOF child pid */
    sds bgrewritebuf; /* buffer kept by parent during append only rewrite */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile; /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    int rdbcompression;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
320
/* Signature of every command implementation. */
typedef void redisCommandProc(redisClient *c);

/* One entry of the command table (cmdTable). */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity; /* argc including the command name; negative for variadic commands */
    int flags; /* REDIS_CMD_* bits */
};

/* A (name, address) pair. NOTE(review): presumably used to map addresses
 * to function names when printing stack traces -- confirm. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};

/* An element being sorted by SORT, with either a numeric score or an
 * object to compare against, depending on the sort mode. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;

/* A SORT sub-operation; 'type' presumably holds a REDIS_SORT_* value such
 * as REDIS_SORT_GET -- confirm in sortCommand(). */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
346
/* ZSETs use a specialized version of Skiplists */

typedef struct zskiplistNode {
    struct zskiplistNode **forward; /* one forward pointer per level */
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

/* A sorted set: a dict (see zsetDictType, whose values are heap-allocated
 * doubles) plus a skiplist over the same elements. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

/* Our shared "common" objects */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;

/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
383
384 /*================================ Prototypes =============================== */
385
/* Internal helpers (defined later in this file). */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static void aofRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);

/* Command implementations, referenced by the cmdTable command table.
 * Note: rpoplpushcommand is spelled with a lowercase 'c', unlike the others. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void bgrewriteaofCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zincrbyCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
496
497 /*================================= Globals ================================= */
498
499 /* Global vars */
500 static struct redisServer server; /* server global state */
501 static struct redisCommand cmdTable[] = {
502 {"get",getCommand,2,REDIS_CMD_INLINE},
503 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
504 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
505 {"del",delCommand,-2,REDIS_CMD_INLINE},
506 {"exists",existsCommand,2,REDIS_CMD_INLINE},
507 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
508 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
509 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
510 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
511 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
512 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
513 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
514 {"llen",llenCommand,2,REDIS_CMD_INLINE},
515 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
516 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
517 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
518 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
519 {"lrem",lremCommand,4,REDIS_CMD_BULK},
520 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
522 {"srem",sremCommand,3,REDIS_CMD_BULK},
523 {"smove",smoveCommand,4,REDIS_CMD_BULK},
524 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
525 {"scard",scardCommand,2,REDIS_CMD_INLINE},
526 {"spop",spopCommand,2,REDIS_CMD_INLINE},
527 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
528 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
531 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
532 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
533 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
534 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
535 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
536 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
537 {"zrem",zremCommand,3,REDIS_CMD_BULK},
538 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
539 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
540 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
541 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
542 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
543 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
544 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
545 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
546 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
547 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
548 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
549 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
550 {"select",selectCommand,2,REDIS_CMD_INLINE},
551 {"move",moveCommand,3,REDIS_CMD_INLINE},
552 {"rename",renameCommand,3,REDIS_CMD_INLINE},
553 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
554 {"expire",expireCommand,3,REDIS_CMD_INLINE},
555 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
556 {"keys",keysCommand,2,REDIS_CMD_INLINE},
557 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
558 {"auth",authCommand,2,REDIS_CMD_INLINE},
559 {"ping",pingCommand,1,REDIS_CMD_INLINE},
560 {"echo",echoCommand,2,REDIS_CMD_BULK},
561 {"save",saveCommand,1,REDIS_CMD_INLINE},
562 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
563 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
564 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
565 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
566 {"type",typeCommand,2,REDIS_CMD_INLINE},
567 {"sync",syncCommand,1,REDIS_CMD_INLINE},
568 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
569 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
570 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
571 {"info",infoCommand,1,REDIS_CMD_INLINE},
572 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
573 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
574 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
575 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
576 {NULL,NULL,0,0}
577 };
578
579 /*============================ Utility functions ============================ */
580
/* Glob-style pattern matching.
 *
 * Returns 1 when the first 'stringLen' bytes of 'string' match the first
 * 'patternLen' bytes of 'pattern', 0 otherwise. All reads are bounded by
 * the explicit lengths, so neither buffer has to be NUL-terminated.
 *
 * Supported syntax:
 *   *       any sequence of characters, including the empty one
 *   ?       exactly one character
 *   [abc]   one character of the set; [^abc] negates it, [a-z] is a range
 *           (range ends are compared as unsigned bytes)
 *   \x      the character x taken literally
 *
 * When 'nocase' is non-zero, characters are compared case-insensitively
 * (escaped characters inside a [...] set are still compared exactly). */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse consecutive stars: "a**b" behaves like "a*b". */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match: a trailing star eats everything */
            /* Try to match the rest of the pattern at every offset. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes exactly one character, so it can never
             * match an exhausted string. Without this check a negated set
             * such as "[^a]" "matched" the empty string and drove stringLen
             * negative, so e.g. "[^a]*" matched "". */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back one byte so the increment
                     * after the switch lands exactly on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped character; a lone trailing '\' is literal. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char) pattern[0];
                    int end = (unsigned char) pattern[2];
                    int c = (unsigned char) string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        /* tolower() needs an unsigned char value (or EOF). */
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match: a literal needs one character */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Only trailing stars can still match the empty remainder. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
703
704 static void redisLog(int level, const char *fmt, ...) {
705 va_list ap;
706 FILE *fp;
707
708 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
709 if (!fp) return;
710
711 va_start(ap, fmt);
712 if (level >= server.verbosity) {
713 char *c = ".-*";
714 char buf[64];
715 time_t now;
716
717 now = time(NULL);
718 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
719 fprintf(fp,"%s %c ",buf,c[level]);
720 vfprintf(fp, fmt, ap);
721 fprintf(fp,"\n");
722 fflush(fp);
723 }
724 va_end(ap);
725
726 if (server.logfile) fclose(fp);
727 }
728
729 /*====================== Hash table type implementation ==================== */
730
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */
734
735 static void dictVanillaFree(void *privdata, void *val)
736 {
737 DICT_NOTUSED(privdata);
738 zfree(val);
739 }
740
741 static int sdsDictKeyCompare(void *privdata, const void *key1,
742 const void *key2)
743 {
744 int l1,l2;
745 DICT_NOTUSED(privdata);
746
747 l1 = sdslen((sds)key1);
748 l2 = sdslen((sds)key2);
749 if (l1 != l2) return 0;
750 return memcmp(key1, key2, l1) == 0;
751 }
752
753 static void dictRedisObjectDestructor(void *privdata, void *val)
754 {
755 DICT_NOTUSED(privdata);
756
757 decrRefCount(val);
758 }
759
760 static int dictObjKeyCompare(void *privdata, const void *key1,
761 const void *key2)
762 {
763 const robj *o1 = key1, *o2 = key2;
764 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
765 }
766
767 static unsigned int dictObjHash(const void *key) {
768 const robj *o = key;
769 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
770 }
771
772 static int dictEncObjKeyCompare(void *privdata, const void *key1,
773 const void *key2)
774 {
775 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
776 int cmp;
777
778 o1 = getDecodedObject(o1);
779 o2 = getDecodedObject(o2);
780 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
781 decrRefCount(o1);
782 decrRefCount(o2);
783 return cmp;
784 }
785
786 static unsigned int dictEncObjHash(const void *key) {
787 robj *o = (robj*) key;
788
789 o = getDecodedObject(o);
790 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
791 decrRefCount(o);
792 return hash;
793 }
794
/* Sets: robj keys of any encoding (compared/hashed in decoded form), no
 * values. The dict owns a reference to every key. */
static dictType setDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};

/* Sorted sets: robj member keys as above; values are zmalloc()-ed doubles
 * (the scores), freed with dictVanillaFree. */
static dictType zsetDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree /* val destructor of malloc(sizeof(double)) */
};

/* robj -> robj tables whose keys must have an sds 'ptr' (no decoding is
 * done). Both keys and values are reference-counted and released on
 * removal. NOTE(review): the uses of this type are not visible in this
 * chunk (presumably the key space) -- confirm. */
static dictType hashDictType = {
    dictObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor /* val destructor */
};
821
822 /* ========================= Random utility functions ======================= */
823
824 /* Redis generally does not try to recover from out of memory conditions
825 * when allocating objects or strings, it is not clear if it will be possible
826 * to report this condition to the client since the networking layer itself
827 * is based on heap allocation for send buffers, so we simply abort.
828 * At least the code will be simpler to read... */
829 static void oom(const char *msg) {
830 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
831 sleep(1);
832 abort();
833 }
834
835 /* ====================== Redis server networking stuff ===================== */
836 static void closeTimedoutClients(void) {
837 redisClient *c;
838 listNode *ln;
839 time_t now = time(NULL);
840
841 listRewind(server.clients);
842 while ((ln = listYield(server.clients)) != NULL) {
843 c = listNodeValue(ln);
844 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
845 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
846 (now - c->lastinteraction > server.maxidletime)) {
847 redisLog(REDIS_DEBUG,"Closing idle client");
848 freeClient(c);
849 }
850 }
851 }
852
853 static int htNeedsResize(dict *dict) {
854 long long size, used;
855
856 size = dictSlots(dict);
857 used = dictSize(dict);
858 return (size && used && size > DICT_HT_INITIAL_SIZE &&
859 (used*100/size < REDIS_HT_MINFILL));
860 }
861
862 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
863 * we resize the hash table to save memory */
864 static void tryResizeHashTables(void) {
865 int j;
866
867 for (j = 0; j < server.dbnum; j++) {
868 if (htNeedsResize(server.db[j].dict)) {
869 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
870 dictResize(server.db[j].dict);
871 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
872 }
873 if (htNeedsResize(server.db[j].expires))
874 dictResize(server.db[j].expires);
875 }
876 }
877
878 /* A background saving child (BGSAVE) terminated its work. Handle this. */
879 void backgroundSaveDoneHandler(int statloc) {
880 int exitcode = WEXITSTATUS(statloc);
881 int bysignal = WIFSIGNALED(statloc);
882
883 if (!bysignal && exitcode == 0) {
884 redisLog(REDIS_NOTICE,
885 "Background saving terminated with success");
886 server.dirty = 0;
887 server.lastsave = time(NULL);
888 } else if (!bysignal && exitcode != 0) {
889 redisLog(REDIS_WARNING, "Background saving error");
890 } else {
891 redisLog(REDIS_WARNING,
892 "Background saving terminated by signal");
893 rdbRemoveTempFile(server.bgsavechildpid);
894 }
895 server.bgsavechildpid = -1;
896 /* Possibly there are slaves waiting for a BGSAVE in order to be served
897 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
898 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
899 }
900
901 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
902 * Handle this. */
903 void backgroundRewriteDoneHandler(int statloc) {
904 int exitcode = WEXITSTATUS(statloc);
905 int bysignal = WIFSIGNALED(statloc);
906
907 if (!bysignal && exitcode == 0) {
908 int fd;
909 char tmpfile[256];
910
911 redisLog(REDIS_NOTICE,
912 "Background append only file rewriting terminated with success");
913 /* Now it's time to flush the differences accumulated by the parent */
914 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
915 fd = open(tmpfile,O_WRONLY|O_APPEND);
916 if (fd == -1) {
917 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
918 goto cleanup;
919 }
920 /* Flush our data... */
921 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
922 (signed) sdslen(server.bgrewritebuf)) {
923 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
924 close(fd);
925 goto cleanup;
926 }
927 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
928 /* Now our work is to rename the temp file into the stable file. And
929 * switch the file descriptor used by the server for append only. */
930 if (rename(tmpfile,server.appendfilename) == -1) {
931 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
932 close(fd);
933 goto cleanup;
934 }
935 /* Mission completed... almost */
936 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
937 if (server.appendfd != -1) {
938 /* If append only is actually enabled... */
939 close(server.appendfd);
940 server.appendfd = fd;
941 fsync(fd);
942 server.appendseldb = -1; /* Make sure it will issue SELECT */
943 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
944 } else {
945 /* If append only is disabled we just generate a dump in this
946 * format. Why not? */
947 close(fd);
948 }
949 } else if (!bysignal && exitcode != 0) {
950 redisLog(REDIS_WARNING, "Background append only file rewriting error");
951 } else {
952 redisLog(REDIS_WARNING,
953 "Background append only file rewriting terminated by signal");
954 }
955 cleanup:
956 sdsfree(server.bgrewritebuf);
957 server.bgrewritebuf = sdsempty();
958 aofRemoveTempFile(server.bgrewritechildpid);
959 server.bgrewritechildpid = -1;
960 }
961
962 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
963 int j, loops = server.cronloops++;
964 REDIS_NOTUSED(eventLoop);
965 REDIS_NOTUSED(id);
966 REDIS_NOTUSED(clientData);
967
968 /* Update the global state with the amount of used memory */
969 server.usedmemory = zmalloc_used_memory();
970
971 /* Show some info about non-empty databases */
972 for (j = 0; j < server.dbnum; j++) {
973 long long size, used, vkeys;
974
975 size = dictSlots(server.db[j].dict);
976 used = dictSize(server.db[j].dict);
977 vkeys = dictSize(server.db[j].expires);
978 if (!(loops % 5) && (used || vkeys)) {
979 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
980 /* dictPrintStats(server.dict); */
981 }
982 }
983
984 /* We don't want to resize the hash tables while a bacground saving
985 * is in progress: the saving child is created using fork() that is
986 * implemented with a copy-on-write semantic in most modern systems, so
987 * if we resize the HT while there is the saving child at work actually
988 * a lot of memory movements in the parent will cause a lot of pages
989 * copied. */
990 if (server.bgsavechildpid == -1) tryResizeHashTables();
991
992 /* Show information about connected clients */
993 if (!(loops % 5)) {
994 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
995 listLength(server.clients)-listLength(server.slaves),
996 listLength(server.slaves),
997 server.usedmemory,
998 dictSize(server.sharingpool));
999 }
1000
1001 /* Close connections of timedout clients */
1002 if (server.maxidletime && !(loops % 10))
1003 closeTimedoutClients();
1004
1005 /* Check if a background saving or AOF rewrite in progress terminated */
1006 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1007 int statloc;
1008 pid_t pid;
1009
1010 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1011 if (pid == server.bgsavechildpid) {
1012 backgroundSaveDoneHandler(statloc);
1013 } else {
1014 backgroundRewriteDoneHandler(statloc);
1015 }
1016 }
1017 } else {
1018 /* If there is not a background saving in progress check if
1019 * we have to save now */
1020 time_t now = time(NULL);
1021 for (j = 0; j < server.saveparamslen; j++) {
1022 struct saveparam *sp = server.saveparams+j;
1023
1024 if (server.dirty >= sp->changes &&
1025 now-server.lastsave > sp->seconds) {
1026 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1027 sp->changes, sp->seconds);
1028 rdbSaveBackground(server.dbfilename);
1029 break;
1030 }
1031 }
1032 }
1033
1034 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1035 * will use few CPU cycles if there are few expiring keys, otherwise
1036 * it will get more aggressive to avoid that too much memory is used by
1037 * keys that can be removed from the keyspace. */
1038 for (j = 0; j < server.dbnum; j++) {
1039 int expired;
1040 redisDb *db = server.db+j;
1041
1042 /* Continue to expire if at the end of the cycle more than 25%
1043 * of the keys were expired. */
1044 do {
1045 int num = dictSize(db->expires);
1046 time_t now = time(NULL);
1047
1048 expired = 0;
1049 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1050 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1051 while (num--) {
1052 dictEntry *de;
1053 time_t t;
1054
1055 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1056 t = (time_t) dictGetEntryVal(de);
1057 if (now > t) {
1058 deleteKey(db,dictGetEntryKey(de));
1059 expired++;
1060 }
1061 }
1062 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1063 }
1064
1065 /* Check if we should connect to a MASTER */
1066 if (server.replstate == REDIS_REPL_CONNECT) {
1067 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1068 if (syncWithMaster() == REDIS_OK) {
1069 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1070 }
1071 }
1072 return 1000;
1073 }
1074
1075 static void createSharedObjects(void) {
1076 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1077 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1078 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1079 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1080 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1081 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1082 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1083 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1084 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1085 /* no such key */
1086 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1087 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1088 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1089 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1090 "-ERR no such key\r\n"));
1091 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1092 "-ERR syntax error\r\n"));
1093 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1094 "-ERR source and destination objects are the same\r\n"));
1095 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1096 "-ERR index out of range\r\n"));
1097 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1098 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1099 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1100 shared.select0 = createStringObject("select 0\r\n",10);
1101 shared.select1 = createStringObject("select 1\r\n",10);
1102 shared.select2 = createStringObject("select 2\r\n",10);
1103 shared.select3 = createStringObject("select 3\r\n",10);
1104 shared.select4 = createStringObject("select 4\r\n",10);
1105 shared.select5 = createStringObject("select 5\r\n",10);
1106 shared.select6 = createStringObject("select 6\r\n",10);
1107 shared.select7 = createStringObject("select 7\r\n",10);
1108 shared.select8 = createStringObject("select 8\r\n",10);
1109 shared.select9 = createStringObject("select 9\r\n",10);
1110 }
1111
1112 static void appendServerSaveParams(time_t seconds, int changes) {
1113 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1114 server.saveparams[server.saveparamslen].seconds = seconds;
1115 server.saveparams[server.saveparamslen].changes = changes;
1116 server.saveparamslen++;
1117 }
1118
1119 static void resetServerSaveParams() {
1120 zfree(server.saveparams);
1121 server.saveparams = NULL;
1122 server.saveparamslen = 0;
1123 }
1124
1125 static void initServerConfig() {
1126 server.dbnum = REDIS_DEFAULT_DBNUM;
1127 server.port = REDIS_SERVERPORT;
1128 server.verbosity = REDIS_DEBUG;
1129 server.maxidletime = REDIS_MAXIDLETIME;
1130 server.saveparams = NULL;
1131 server.logfile = NULL; /* NULL = log on standard output */
1132 server.bindaddr = NULL;
1133 server.glueoutputbuf = 1;
1134 server.daemonize = 0;
1135 server.appendonly = 0;
1136 server.appendfsync = APPENDFSYNC_ALWAYS;
1137 server.lastfsync = time(NULL);
1138 server.appendfd = -1;
1139 server.appendseldb = -1; /* Make sure the first time will not match */
1140 server.pidfile = "/var/run/redis.pid";
1141 server.dbfilename = "dump.rdb";
1142 server.appendfilename = "appendonly.aof";
1143 server.requirepass = NULL;
1144 server.shareobjects = 0;
1145 server.rdbcompression = 1;
1146 server.sharingpoolsize = 1024;
1147 server.maxclients = 0;
1148 server.maxmemory = 0;
1149 resetServerSaveParams();
1150
1151 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1152 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1153 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1154 /* Replication related */
1155 server.isslave = 0;
1156 server.masterauth = NULL;
1157 server.masterhost = NULL;
1158 server.masterport = 6379;
1159 server.master = NULL;
1160 server.replstate = REDIS_REPL_NONE;
1161
1162 /* Double constants initialization */
1163 R_Zero = 0.0;
1164 R_PosInf = 1.0/R_Zero;
1165 R_NegInf = -1.0/R_Zero;
1166 R_Nan = R_Zero/R_Zero;
1167 }
1168
1169 static void initServer() {
1170 int j;
1171
1172 signal(SIGHUP, SIG_IGN);
1173 signal(SIGPIPE, SIG_IGN);
1174 setupSigSegvAction();
1175
1176 server.clients = listCreate();
1177 server.slaves = listCreate();
1178 server.monitors = listCreate();
1179 server.objfreelist = listCreate();
1180 createSharedObjects();
1181 server.el = aeCreateEventLoop();
1182 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1183 server.sharingpool = dictCreate(&setDictType,NULL);
1184 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1185 if (server.fd == -1) {
1186 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1187 exit(1);
1188 }
1189 for (j = 0; j < server.dbnum; j++) {
1190 server.db[j].dict = dictCreate(&hashDictType,NULL);
1191 server.db[j].expires = dictCreate(&setDictType,NULL);
1192 server.db[j].id = j;
1193 }
1194 server.cronloops = 0;
1195 server.bgsavechildpid = -1;
1196 server.bgrewritechildpid = -1;
1197 server.bgrewritebuf = sdsempty();
1198 server.lastsave = time(NULL);
1199 server.dirty = 0;
1200 server.usedmemory = 0;
1201 server.stat_numcommands = 0;
1202 server.stat_numconnections = 0;
1203 server.stat_starttime = time(NULL);
1204 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1205
1206 if (server.appendonly) {
1207 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1208 if (server.appendfd == -1) {
1209 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1210 strerror(errno));
1211 exit(1);
1212 }
1213 }
1214 }
1215
1216 /* Empty the whole database */
1217 static long long emptyDb() {
1218 int j;
1219 long long removed = 0;
1220
1221 for (j = 0; j < server.dbnum; j++) {
1222 removed += dictSize(server.db[j].dict);
1223 dictEmpty(server.db[j].dict);
1224 dictEmpty(server.db[j].expires);
1225 }
1226 return removed;
1227 }
1228
/* Parse a boolean config argument (case-insensitive).
 * Returns 1 for "yes", 0 for "no", -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1234
1235 /* I agree, this is a very rudimental way to load a configuration...
1236 will improve later if the config gets more complex */
1237 static void loadServerConfig(char *filename) {
1238 FILE *fp;
1239 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1240 int linenum = 0;
1241 sds line = NULL;
1242
1243 if (filename[0] == '-' && filename[1] == '\0')
1244 fp = stdin;
1245 else {
1246 if ((fp = fopen(filename,"r")) == NULL) {
1247 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1248 exit(1);
1249 }
1250 }
1251
1252 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1253 sds *argv;
1254 int argc, j;
1255
1256 linenum++;
1257 line = sdsnew(buf);
1258 line = sdstrim(line," \t\r\n");
1259
1260 /* Skip comments and blank lines*/
1261 if (line[0] == '#' || line[0] == '\0') {
1262 sdsfree(line);
1263 continue;
1264 }
1265
1266 /* Split into arguments */
1267 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1268 sdstolower(argv[0]);
1269
1270 /* Execute config directives */
1271 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1272 server.maxidletime = atoi(argv[1]);
1273 if (server.maxidletime < 0) {
1274 err = "Invalid timeout value"; goto loaderr;
1275 }
1276 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1277 server.port = atoi(argv[1]);
1278 if (server.port < 1 || server.port > 65535) {
1279 err = "Invalid port"; goto loaderr;
1280 }
1281 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1282 server.bindaddr = zstrdup(argv[1]);
1283 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1284 int seconds = atoi(argv[1]);
1285 int changes = atoi(argv[2]);
1286 if (seconds < 1 || changes < 0) {
1287 err = "Invalid save parameters"; goto loaderr;
1288 }
1289 appendServerSaveParams(seconds,changes);
1290 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1291 if (chdir(argv[1]) == -1) {
1292 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1293 argv[1], strerror(errno));
1294 exit(1);
1295 }
1296 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1297 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1298 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1299 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1300 else {
1301 err = "Invalid log level. Must be one of debug, notice, warning";
1302 goto loaderr;
1303 }
1304 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1305 FILE *logfp;
1306
1307 server.logfile = zstrdup(argv[1]);
1308 if (!strcasecmp(server.logfile,"stdout")) {
1309 zfree(server.logfile);
1310 server.logfile = NULL;
1311 }
1312 if (server.logfile) {
1313 /* Test if we are able to open the file. The server will not
1314 * be able to abort just for this problem later... */
1315 logfp = fopen(server.logfile,"a");
1316 if (logfp == NULL) {
1317 err = sdscatprintf(sdsempty(),
1318 "Can't open the log file: %s", strerror(errno));
1319 goto loaderr;
1320 }
1321 fclose(logfp);
1322 }
1323 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1324 server.dbnum = atoi(argv[1]);
1325 if (server.dbnum < 1) {
1326 err = "Invalid number of databases"; goto loaderr;
1327 }
1328 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1329 server.maxclients = atoi(argv[1]);
1330 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1331 server.maxmemory = strtoll(argv[1], NULL, 10);
1332 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1333 server.masterhost = sdsnew(argv[1]);
1334 server.masterport = atoi(argv[2]);
1335 server.replstate = REDIS_REPL_CONNECT;
1336 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1337 server.masterauth = zstrdup(argv[1]);
1338 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1339 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1340 err = "argument must be 'yes' or 'no'"; goto loaderr;
1341 }
1342 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1343 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1344 err = "argument must be 'yes' or 'no'"; goto loaderr;
1345 }
1346 } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
1347 if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
1348 err = "argument must be 'yes' or 'no'"; goto loaderr;
1349 }
1350 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1351 server.sharingpoolsize = atoi(argv[1]);
1352 if (server.sharingpoolsize < 1) {
1353 err = "invalid object sharing pool size"; goto loaderr;
1354 }
1355 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1356 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1357 err = "argument must be 'yes' or 'no'"; goto loaderr;
1358 }
1359 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1360 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1361 err = "argument must be 'yes' or 'no'"; goto loaderr;
1362 }
1363 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1364 if (!strcasecmp(argv[1],"no")) {
1365 server.appendfsync = APPENDFSYNC_NO;
1366 } else if (!strcasecmp(argv[1],"always")) {
1367 server.appendfsync = APPENDFSYNC_ALWAYS;
1368 } else if (!strcasecmp(argv[1],"everysec")) {
1369 server.appendfsync = APPENDFSYNC_EVERYSEC;
1370 } else {
1371 err = "argument must be 'no', 'always' or 'everysec'";
1372 goto loaderr;
1373 }
1374 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1375 server.requirepass = zstrdup(argv[1]);
1376 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1377 server.pidfile = zstrdup(argv[1]);
1378 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1379 server.dbfilename = zstrdup(argv[1]);
1380 } else {
1381 err = "Bad directive or wrong number of arguments"; goto loaderr;
1382 }
1383 for (j = 0; j < argc; j++)
1384 sdsfree(argv[j]);
1385 zfree(argv);
1386 sdsfree(line);
1387 }
1388 if (fp != stdin) fclose(fp);
1389 return;
1390
1391 loaderr:
1392 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1393 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1394 fprintf(stderr, ">>> '%s'\n", line);
1395 fprintf(stderr, "%s\n", err);
1396 exit(1);
1397 }
1398
1399 static void freeClientArgv(redisClient *c) {
1400 int j;
1401
1402 for (j = 0; j < c->argc; j++)
1403 decrRefCount(c->argv[j]);
1404 for (j = 0; j < c->mbargc; j++)
1405 decrRefCount(c->mbargv[j]);
1406 c->argc = 0;
1407 c->mbargc = 0;
1408 }
1409
1410 static void freeClient(redisClient *c) {
1411 listNode *ln;
1412
1413 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1414 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1415 sdsfree(c->querybuf);
1416 listRelease(c->reply);
1417 freeClientArgv(c);
1418 close(c->fd);
1419 ln = listSearchKey(server.clients,c);
1420 redisAssert(ln != NULL);
1421 listDelNode(server.clients,ln);
1422 if (c->flags & REDIS_SLAVE) {
1423 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1424 close(c->repldbfd);
1425 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1426 ln = listSearchKey(l,c);
1427 redisAssert(ln != NULL);
1428 listDelNode(l,ln);
1429 }
1430 if (c->flags & REDIS_MASTER) {
1431 server.master = NULL;
1432 server.replstate = REDIS_REPL_CONNECT;
1433 }
1434 zfree(c->argv);
1435 zfree(c->mbargv);
1436 zfree(c);
1437 }
1438
1439 #define GLUEREPLY_UP_TO (1024)
1440 static void glueReplyBuffersIfNeeded(redisClient *c) {
1441 int copylen = 0;
1442 char buf[GLUEREPLY_UP_TO];
1443 listNode *ln;
1444 robj *o;
1445
1446 listRewind(c->reply);
1447 while((ln = listYield(c->reply))) {
1448 int objlen;
1449
1450 o = ln->value;
1451 objlen = sdslen(o->ptr);
1452 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1453 memcpy(buf+copylen,o->ptr,objlen);
1454 copylen += objlen;
1455 listDelNode(c->reply,ln);
1456 } else {
1457 if (copylen == 0) return;
1458 break;
1459 }
1460 }
1461 /* Now the output buffer is empty, add the new single element */
1462 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1463 listAddNodeHead(c->reply,o);
1464 }
1465
1466 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1467 redisClient *c = privdata;
1468 int nwritten = 0, totwritten = 0, objlen;
1469 robj *o;
1470 REDIS_NOTUSED(el);
1471 REDIS_NOTUSED(mask);
1472
1473 /* Use writev() if we have enough buffers to send */
1474 if (!server.glueoutputbuf &&
1475 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1476 !(c->flags & REDIS_MASTER))
1477 {
1478 sendReplyToClientWritev(el, fd, privdata, mask);
1479 return;
1480 }
1481
1482 while(listLength(c->reply)) {
1483 if (server.glueoutputbuf && listLength(c->reply) > 1)
1484 glueReplyBuffersIfNeeded(c);
1485
1486 o = listNodeValue(listFirst(c->reply));
1487 objlen = sdslen(o->ptr);
1488
1489 if (objlen == 0) {
1490 listDelNode(c->reply,listFirst(c->reply));
1491 continue;
1492 }
1493
1494 if (c->flags & REDIS_MASTER) {
1495 /* Don't reply to a master */
1496 nwritten = objlen - c->sentlen;
1497 } else {
1498 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1499 if (nwritten <= 0) break;
1500 }
1501 c->sentlen += nwritten;
1502 totwritten += nwritten;
1503 /* If we fully sent the object on head go to the next one */
1504 if (c->sentlen == objlen) {
1505 listDelNode(c->reply,listFirst(c->reply));
1506 c->sentlen = 0;
1507 }
1508 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1509 * bytes, in a single threaded server it's a good idea to serve
1510 * other clients as well, even if a very large request comes from
1511 * super fast link that is always able to accept data (in real world
1512 * scenario think about 'KEYS *' against the loopback interfae) */
1513 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1514 }
1515 if (nwritten == -1) {
1516 if (errno == EAGAIN) {
1517 nwritten = 0;
1518 } else {
1519 redisLog(REDIS_DEBUG,
1520 "Error writing to client: %s", strerror(errno));
1521 freeClient(c);
1522 return;
1523 }
1524 }
1525 if (totwritten > 0) c->lastinteraction = time(NULL);
1526 if (listLength(c->reply) == 0) {
1527 c->sentlen = 0;
1528 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1529 }
1530 }
1531
1532 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1533 {
1534 redisClient *c = privdata;
1535 int nwritten = 0, totwritten = 0, objlen, willwrite;
1536 robj *o;
1537 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1538 int offset, ion = 0;
1539 REDIS_NOTUSED(el);
1540 REDIS_NOTUSED(mask);
1541
1542 listNode *node;
1543 while (listLength(c->reply)) {
1544 offset = c->sentlen;
1545 ion = 0;
1546 willwrite = 0;
1547
1548 /* fill-in the iov[] array */
1549 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1550 o = listNodeValue(node);
1551 objlen = sdslen(o->ptr);
1552
1553 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1554 break;
1555
1556 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1557 break; /* no more iovecs */
1558
1559 iov[ion].iov_base = ((char*)o->ptr) + offset;
1560 iov[ion].iov_len = objlen - offset;
1561 willwrite += objlen - offset;
1562 offset = 0; /* just for the first item */
1563 ion++;
1564 }
1565
1566 if(willwrite == 0)
1567 break;
1568
1569 /* write all collected blocks at once */
1570 if((nwritten = writev(fd, iov, ion)) < 0) {
1571 if (errno != EAGAIN) {
1572 redisLog(REDIS_DEBUG,
1573 "Error writing to client: %s", strerror(errno));
1574 freeClient(c);
1575 return;
1576 }
1577 break;
1578 }
1579
1580 totwritten += nwritten;
1581 offset = c->sentlen;
1582
1583 /* remove written robjs from c->reply */
1584 while (nwritten && listLength(c->reply)) {
1585 o = listNodeValue(listFirst(c->reply));
1586 objlen = sdslen(o->ptr);
1587
1588 if(nwritten >= objlen - offset) {
1589 listDelNode(c->reply, listFirst(c->reply));
1590 nwritten -= objlen - offset;
1591 c->sentlen = 0;
1592 } else {
1593 /* partial write */
1594 c->sentlen += nwritten;
1595 break;
1596 }
1597 offset = 0;
1598 }
1599 }
1600
1601 if (totwritten > 0)
1602 c->lastinteraction = time(NULL);
1603
1604 if (listLength(c->reply) == 0) {
1605 c->sentlen = 0;
1606 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1607 }
1608 }
1609
1610 static struct redisCommand *lookupCommand(char *name) {
1611 int j = 0;
1612 while(cmdTable[j].name != NULL) {
1613 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1614 j++;
1615 }
1616 return NULL;
1617 }
1618
1619 /* resetClient prepare the client to process the next command */
1620 static void resetClient(redisClient *c) {
1621 freeClientArgv(c);
1622 c->bulklen = -1;
1623 c->multibulk = 0;
1624 }
1625
1626 /* If this function gets called we already read a whole
1627 * command, argments are in the client argv/argc fields.
1628 * processCommand() execute the command or prepare the
1629 * server for a bulk read from the client.
1630 *
1631 * If 1 is returned the client is still alive and valid and
1632 * and other operations can be performed by the caller. Otherwise
1633 * if 0 is returned the client was destroied (i.e. after QUIT). */
1634 static int processCommand(redisClient *c) {
1635 struct redisCommand *cmd;
1636 long long dirty;
1637
1638 /* Free some memory if needed (maxmemory setting) */
1639 if (server.maxmemory) freeMemoryIfNeeded();
1640
1641 /* Handle the multi bulk command type. This is an alternative protocol
1642 * supported by Redis in order to receive commands that are composed of
1643 * multiple binary-safe "bulk" arguments. The latency of processing is
1644 * a bit higher but this allows things like multi-sets, so if this
1645 * protocol is used only for MSET and similar commands this is a big win. */
1646 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1647 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1648 if (c->multibulk <= 0) {
1649 resetClient(c);
1650 return 1;
1651 } else {
1652 decrRefCount(c->argv[c->argc-1]);
1653 c->argc--;
1654 return 1;
1655 }
1656 } else if (c->multibulk) {
1657 if (c->bulklen == -1) {
1658 if (((char*)c->argv[0]->ptr)[0] != '$') {
1659 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1660 resetClient(c);
1661 return 1;
1662 } else {
1663 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1664 decrRefCount(c->argv[0]);
1665 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1666 c->argc--;
1667 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1668 resetClient(c);
1669 return 1;
1670 }
1671 c->argc--;
1672 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1673 return 1;
1674 }
1675 } else {
1676 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1677 c->mbargv[c->mbargc] = c->argv[0];
1678 c->mbargc++;
1679 c->argc--;
1680 c->multibulk--;
1681 if (c->multibulk == 0) {
1682 robj **auxargv;
1683 int auxargc;
1684
1685 /* Here we need to swap the multi-bulk argc/argv with the
1686 * normal argc/argv of the client structure. */
1687 auxargv = c->argv;
1688 c->argv = c->mbargv;
1689 c->mbargv = auxargv;
1690
1691 auxargc = c->argc;
1692 c->argc = c->mbargc;
1693 c->mbargc = auxargc;
1694
1695 /* We need to set bulklen to something different than -1
1696 * in order for the code below to process the command without
1697 * to try to read the last argument of a bulk command as
1698 * a special argument. */
1699 c->bulklen = 0;
1700 /* continue below and process the command */
1701 } else {
1702 c->bulklen = -1;
1703 return 1;
1704 }
1705 }
1706 }
1707 /* -- end of multi bulk commands processing -- */
1708
1709 /* The QUIT command is handled as a special case. Normal command
1710 * procs are unable to close the client connection safely */
1711 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1712 freeClient(c);
1713 return 0;
1714 }
1715 cmd = lookupCommand(c->argv[0]->ptr);
1716 if (!cmd) {
1717 addReplySds(c,
1718 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1719 (char*)c->argv[0]->ptr));
1720 resetClient(c);
1721 return 1;
1722 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1723 (c->argc < -cmd->arity)) {
1724 addReplySds(c,
1725 sdscatprintf(sdsempty(),
1726 "-ERR wrong number of arguments for '%s' command\r\n",
1727 cmd->name));
1728 resetClient(c);
1729 return 1;
1730 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1731 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1732 resetClient(c);
1733 return 1;
1734 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1735 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1736
1737 decrRefCount(c->argv[c->argc-1]);
1738 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1739 c->argc--;
1740 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1741 resetClient(c);
1742 return 1;
1743 }
1744 c->argc--;
1745 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1746 /* It is possible that the bulk read is already in the
1747 * buffer. Check this condition and handle it accordingly.
1748 * This is just a fast path, alternative to call processInputBuffer().
1749 * It's a good idea since the code is small and this condition
1750 * happens most of the times. */
1751 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1752 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1753 c->argc++;
1754 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1755 } else {
1756 return 1;
1757 }
1758 }
1759 /* Let's try to share objects on the command arguments vector */
1760 if (server.shareobjects) {
1761 int j;
1762 for(j = 1; j < c->argc; j++)
1763 c->argv[j] = tryObjectSharing(c->argv[j]);
1764 }
1765 /* Let's try to encode the bulk object to save space. */
1766 if (cmd->flags & REDIS_CMD_BULK)
1767 tryObjectEncoding(c->argv[c->argc-1]);
1768
1769 /* Check if the user is authenticated */
1770 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1771 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1772 resetClient(c);
1773 return 1;
1774 }
1775
1776 /* Exec the command */
1777 dirty = server.dirty;
1778 cmd->proc(c);
1779 if (server.appendonly && server.dirty-dirty)
1780 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1781 if (server.dirty-dirty && listLength(server.slaves))
1782 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1783 if (listLength(server.monitors))
1784 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1785 server.stat_numcommands++;
1786
1787 /* Prepare the client for the next command */
1788 if (c->flags & REDIS_CLOSE) {
1789 freeClient(c);
1790 return 0;
1791 }
1792 resetClient(c);
1793 return 1;
1794 }
1795
1796 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1797 listNode *ln;
1798 int outc = 0, j;
1799 robj **outv;
1800 /* (args*2)+1 is enough room for args, spaces, newlines */
1801 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1802
1803 if (argc <= REDIS_STATIC_ARGS) {
1804 outv = static_outv;
1805 } else {
1806 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1807 }
1808
1809 for (j = 0; j < argc; j++) {
1810 if (j != 0) outv[outc++] = shared.space;
1811 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1812 robj *lenobj;
1813
1814 lenobj = createObject(REDIS_STRING,
1815 sdscatprintf(sdsempty(),"%lu\r\n",
1816 (unsigned long) stringObjectLen(argv[j])));
1817 lenobj->refcount = 0;
1818 outv[outc++] = lenobj;
1819 }
1820 outv[outc++] = argv[j];
1821 }
1822 outv[outc++] = shared.crlf;
1823
1824 /* Increment all the refcounts at start and decrement at end in order to
1825 * be sure to free objects if there is no slave in a replication state
1826 * able to be feed with commands */
1827 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1828 listRewind(slaves);
1829 while((ln = listYield(slaves))) {
1830 redisClient *slave = ln->value;
1831
1832 /* Don't feed slaves that are still waiting for BGSAVE to start */
1833 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1834
1835 /* Feed all the other slaves, MONITORs and so on */
1836 if (slave->slaveseldb != dictid) {
1837 robj *selectcmd;
1838
1839 switch(dictid) {
1840 case 0: selectcmd = shared.select0; break;
1841 case 1: selectcmd = shared.select1; break;
1842 case 2: selectcmd = shared.select2; break;
1843 case 3: selectcmd = shared.select3; break;
1844 case 4: selectcmd = shared.select4; break;
1845 case 5: selectcmd = shared.select5; break;
1846 case 6: selectcmd = shared.select6; break;
1847 case 7: selectcmd = shared.select7; break;
1848 case 8: selectcmd = shared.select8; break;
1849 case 9: selectcmd = shared.select9; break;
1850 default:
1851 selectcmd = createObject(REDIS_STRING,
1852 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1853 selectcmd->refcount = 0;
1854 break;
1855 }
1856 addReply(slave,selectcmd);
1857 slave->slaveseldb = dictid;
1858 }
1859 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1860 }
1861 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1862 if (outv != static_outv) zfree(outv);
1863 }
1864
1865 static void processInputBuffer(redisClient *c) {
1866 again:
1867 if (c->bulklen == -1) {
1868 /* Read the first line of the query */
1869 char *p = strchr(c->querybuf,'\n');
1870 size_t querylen;
1871
1872 if (p) {
1873 sds query, *argv;
1874 int argc, j;
1875
1876 query = c->querybuf;
1877 c->querybuf = sdsempty();
1878 querylen = 1+(p-(query));
1879 if (sdslen(query) > querylen) {
1880 /* leave data after the first line of the query in the buffer */
1881 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1882 }
1883 *p = '\0'; /* remove "\n" */
1884 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1885 sdsupdatelen(query);
1886
1887 /* Now we can split the query in arguments */
1888 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1889 sdsfree(query);
1890
1891 if (c->argv) zfree(c->argv);
1892 c->argv = zmalloc(sizeof(robj*)*argc);
1893
1894 for (j = 0; j < argc; j++) {
1895 if (sdslen(argv[j])) {
1896 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1897 c->argc++;
1898 } else {
1899 sdsfree(argv[j]);
1900 }
1901 }
1902 zfree(argv);
1903 if (c->argc) {
1904 /* Execute the command. If the client is still valid
1905 * after processCommand() return and there is something
1906 * on the query buffer try to process the next command. */
1907 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1908 } else {
1909 /* Nothing to process, argc == 0. Just process the query
1910 * buffer if it's not empty or return to the caller */
1911 if (sdslen(c->querybuf)) goto again;
1912 }
1913 return;
1914 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1915 redisLog(REDIS_DEBUG, "Client protocol error");
1916 freeClient(c);
1917 return;
1918 }
1919 } else {
1920 /* Bulk read handling. Note that if we are at this point
1921 the client already sent a command terminated with a newline,
1922 we are reading the bulk data that is actually the last
1923 argument of the command. */
1924 int qbl = sdslen(c->querybuf);
1925
1926 if (c->bulklen <= qbl) {
1927 /* Copy everything but the final CRLF as final argument */
1928 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1929 c->argc++;
1930 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1931 /* Process the command. If the client is still valid after
1932 * the processing and there is more data in the buffer
1933 * try to parse it. */
1934 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1935 return;
1936 }
1937 }
1938 }
1939
1940 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1941 redisClient *c = (redisClient*) privdata;
1942 char buf[REDIS_IOBUF_LEN];
1943 int nread;
1944 REDIS_NOTUSED(el);
1945 REDIS_NOTUSED(mask);
1946
1947 nread = read(fd, buf, REDIS_IOBUF_LEN);
1948 if (nread == -1) {
1949 if (errno == EAGAIN) {
1950 nread = 0;
1951 } else {
1952 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1953 freeClient(c);
1954 return;
1955 }
1956 } else if (nread == 0) {
1957 redisLog(REDIS_DEBUG, "Client closed connection");
1958 freeClient(c);
1959 return;
1960 }
1961 if (nread) {
1962 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1963 c->lastinteraction = time(NULL);
1964 } else {
1965 return;
1966 }
1967 processInputBuffer(c);
1968 }
1969
1970 static int selectDb(redisClient *c, int id) {
1971 if (id < 0 || id >= server.dbnum)
1972 return REDIS_ERR;
1973 c->db = &server.db[id];
1974 return REDIS_OK;
1975 }
1976
1977 static void *dupClientReplyValue(void *o) {
1978 incrRefCount((robj*)o);
1979 return 0;
1980 }
1981
1982 static redisClient *createClient(int fd) {
1983 redisClient *c = zmalloc(sizeof(*c));
1984
1985 anetNonBlock(NULL,fd);
1986 anetTcpNoDelay(NULL,fd);
1987 if (!c) return NULL;
1988 selectDb(c,0);
1989 c->fd = fd;
1990 c->querybuf = sdsempty();
1991 c->argc = 0;
1992 c->argv = NULL;
1993 c->bulklen = -1;
1994 c->multibulk = 0;
1995 c->mbargc = 0;
1996 c->mbargv = NULL;
1997 c->sentlen = 0;
1998 c->flags = 0;
1999 c->lastinteraction = time(NULL);
2000 c->authenticated = 0;
2001 c->replstate = REDIS_REPL_NONE;
2002 c->reply = listCreate();
2003 listSetFreeMethod(c->reply,decrRefCount);
2004 listSetDupMethod(c->reply,dupClientReplyValue);
2005 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2006 readQueryFromClient, c) == AE_ERR) {
2007 freeClient(c);
2008 return NULL;
2009 }
2010 listAddNodeTail(server.clients,c);
2011 return c;
2012 }
2013
2014 static void addReply(redisClient *c, robj *obj) {
2015 if (listLength(c->reply) == 0 &&
2016 (c->replstate == REDIS_REPL_NONE ||
2017 c->replstate == REDIS_REPL_ONLINE) &&
2018 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2019 sendReplyToClient, c) == AE_ERR) return;
2020 listAddNodeTail(c->reply,getDecodedObject(obj));
2021 }
2022
2023 static void addReplySds(redisClient *c, sds s) {
2024 robj *o = createObject(REDIS_STRING,s);
2025 addReply(c,o);
2026 decrRefCount(o);
2027 }
2028
2029 static void addReplyDouble(redisClient *c, double d) {
2030 char buf[128];
2031
2032 snprintf(buf,sizeof(buf),"%.17g",d);
2033 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2034 (unsigned long) strlen(buf),buf));
2035 }
2036
2037 static void addReplyBulkLen(redisClient *c, robj *obj) {
2038 size_t len;
2039
2040 if (obj->encoding == REDIS_ENCODING_RAW) {
2041 len = sdslen(obj->ptr);
2042 } else {
2043 long n = (long)obj->ptr;
2044
2045 /* Compute how many bytes will take this integer as a radix 10 string */
2046 len = 1;
2047 if (n < 0) {
2048 len++;
2049 n = -n;
2050 }
2051 while((n = n/10) != 0) {
2052 len++;
2053 }
2054 }
2055 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2056 }
2057
2058 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2059 int cport, cfd;
2060 char cip[128];
2061 redisClient *c;
2062 REDIS_NOTUSED(el);
2063 REDIS_NOTUSED(mask);
2064 REDIS_NOTUSED(privdata);
2065
2066 cfd = anetAccept(server.neterr, fd, cip, &cport);
2067 if (cfd == AE_ERR) {
2068 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2069 return;
2070 }
2071 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2072 if ((c = createClient(cfd)) == NULL) {
2073 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2074 close(cfd); /* May be already closed, just ingore errors */
2075 return;
2076 }
2077 /* If maxclient directive is set and this is one client more... close the
2078 * connection. Note that we create the client instead to check before
2079 * for this condition, since now the socket is already set in nonblocking
2080 * mode and we can send an error for free using the Kernel I/O */
2081 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2082 char *err = "-ERR max number of clients reached\r\n";
2083
2084 /* That's a best effort error message, don't check write errors */
2085 if (write(c->fd,err,strlen(err)) == -1) {
2086 /* Nothing to do, Just to avoid the warning... */
2087 }
2088 freeClient(c);
2089 return;
2090 }
2091 server.stat_numconnections++;
2092 }
2093
2094 /* ======================= Redis objects implementation ===================== */
2095
2096 static robj *createObject(int type, void *ptr) {
2097 robj *o;
2098
2099 if (listLength(server.objfreelist)) {
2100 listNode *head = listFirst(server.objfreelist);
2101 o = listNodeValue(head);
2102 listDelNode(server.objfreelist,head);
2103 } else {
2104 o = zmalloc(sizeof(*o));
2105 }
2106 o->type = type;
2107 o->encoding = REDIS_ENCODING_RAW;
2108 o->ptr = ptr;
2109 o->refcount = 1;
2110 return o;
2111 }
2112
2113 static robj *createStringObject(char *ptr, size_t len) {
2114 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2115 }
2116
2117 static robj *createListObject(void) {
2118 list *l = listCreate();
2119
2120 listSetFreeMethod(l,decrRefCount);
2121 return createObject(REDIS_LIST,l);
2122 }
2123
2124 static robj *createSetObject(void) {
2125 dict *d = dictCreate(&setDictType,NULL);
2126 return createObject(REDIS_SET,d);
2127 }
2128
2129 static robj *createZsetObject(void) {
2130 zset *zs = zmalloc(sizeof(*zs));
2131
2132 zs->dict = dictCreate(&zsetDictType,NULL);
2133 zs->zsl = zslCreate();
2134 return createObject(REDIS_ZSET,zs);
2135 }
2136
2137 static void freeStringObject(robj *o) {
2138 if (o->encoding == REDIS_ENCODING_RAW) {
2139 sdsfree(o->ptr);
2140 }
2141 }
2142
2143 static void freeListObject(robj *o) {
2144 listRelease((list*) o->ptr);
2145 }
2146
2147 static void freeSetObject(robj *o) {
2148 dictRelease((dict*) o->ptr);
2149 }
2150
2151 static void freeZsetObject(robj *o) {
2152 zset *zs = o->ptr;
2153
2154 dictRelease(zs->dict);
2155 zslFree(zs->zsl);
2156 zfree(zs);
2157 }
2158
2159 static void freeHashObject(robj *o) {
2160 dictRelease((dict*) o->ptr);
2161 }
2162
2163 static void incrRefCount(robj *o) {
2164 o->refcount++;
2165 #ifdef DEBUG_REFCOUNT
2166 if (o->type == REDIS_STRING)
2167 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2168 #endif
2169 }
2170
2171 static void decrRefCount(void *obj) {
2172 robj *o = obj;
2173
2174 #ifdef DEBUG_REFCOUNT
2175 if (o->type == REDIS_STRING)
2176 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2177 #endif
2178 if (--(o->refcount) == 0) {
2179 switch(o->type) {
2180 case REDIS_STRING: freeStringObject(o); break;
2181 case REDIS_LIST: freeListObject(o); break;
2182 case REDIS_SET: freeSetObject(o); break;
2183 case REDIS_ZSET: freeZsetObject(o); break;
2184 case REDIS_HASH: freeHashObject(o); break;
2185 default: redisAssert(0 != 0); break;
2186 }
2187 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2188 !listAddNodeHead(server.objfreelist,o))
2189 zfree(o);
2190 }
2191 }
2192
2193 static robj *lookupKey(redisDb *db, robj *key) {
2194 dictEntry *de = dictFind(db->dict,key);
2195 return de ? dictGetEntryVal(de) : NULL;
2196 }
2197
2198 static robj *lookupKeyRead(redisDb *db, robj *key) {
2199 expireIfNeeded(db,key);
2200 return lookupKey(db,key);
2201 }
2202
2203 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2204 deleteIfVolatile(db,key);
2205 return lookupKey(db,key);
2206 }
2207
2208 static int deleteKey(redisDb *db, robj *key) {
2209 int retval;
2210
2211 /* We need to protect key from destruction: after the first dictDelete()
2212 * it may happen that 'key' is no longer valid if we don't increment
2213 * it's count. This may happen when we get the object reference directly
2214 * from the hash table with dictRandomKey() or dict iterators */
2215 incrRefCount(key);
2216 if (dictSize(db->expires)) dictDelete(db->expires,key);
2217 retval = dictDelete(db->dict,key);
2218 decrRefCount(key);
2219
2220 return retval == DICT_OK;
2221 }
2222
2223 /* Try to share an object against the shared objects pool */
2224 static robj *tryObjectSharing(robj *o) {
2225 struct dictEntry *de;
2226 unsigned long c;
2227
2228 if (o == NULL || server.shareobjects == 0) return o;
2229
2230 redisAssert(o->type == REDIS_STRING);
2231 de = dictFind(server.sharingpool,o);
2232 if (de) {
2233 robj *shared = dictGetEntryKey(de);
2234
2235 c = ((unsigned long) dictGetEntryVal(de))+1;
2236 dictGetEntryVal(de) = (void*) c;
2237 incrRefCount(shared);
2238 decrRefCount(o);
2239 return shared;
2240 } else {
2241 /* Here we are using a stream algorihtm: Every time an object is
2242 * shared we increment its count, everytime there is a miss we
2243 * recrement the counter of a random object. If this object reaches
2244 * zero we remove the object and put the current object instead. */
2245 if (dictSize(server.sharingpool) >=
2246 server.sharingpoolsize) {
2247 de = dictGetRandomKey(server.sharingpool);
2248 redisAssert(de != NULL);
2249 c = ((unsigned long) dictGetEntryVal(de))-1;
2250 dictGetEntryVal(de) = (void*) c;
2251 if (c == 0) {
2252 dictDelete(server.sharingpool,de->key);
2253 }
2254 } else {
2255 c = 0; /* If the pool is empty we want to add this object */
2256 }
2257 if (c == 0) {
2258 int retval;
2259
2260 retval = dictAdd(server.sharingpool,o,(void*)1);
2261 redisAssert(retval == DICT_OK);
2262 incrRefCount(o);
2263 }
2264 return o;
2265 }
2266 }
2267
2268 /* Check if the nul-terminated string 's' can be represented by a long
2269 * (that is, is a number that fits into long without any other space or
2270 * character before or after the digits).
2271 *
2272 * If so, the function returns REDIS_OK and *longval is set to the value
2273 * of the number. Otherwise REDIS_ERR is returned */
2274 static int isStringRepresentableAsLong(sds s, long *longval) {
2275 char buf[32], *endptr;
2276 long value;
2277 int slen;
2278
2279 value = strtol(s, &endptr, 10);
2280 if (endptr[0] != '\0') return REDIS_ERR;
2281 slen = snprintf(buf,32,"%ld",value);
2282
2283 /* If the number converted back into a string is not identical
2284 * then it's not possible to encode the string as integer */
2285 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2286 if (longval) *longval = value;
2287 return REDIS_OK;
2288 }
2289
2290 /* Try to encode a string object in order to save space */
2291 static int tryObjectEncoding(robj *o) {
2292 long value;
2293 sds s = o->ptr;
2294
2295 if (o->encoding != REDIS_ENCODING_RAW)
2296 return REDIS_ERR; /* Already encoded */
2297
2298 /* It's not save to encode shared objects: shared objects can be shared
2299 * everywhere in the "object space" of Redis. Encoded objects can only
2300 * appear as "values" (and not, for instance, as keys) */
2301 if (o->refcount > 1) return REDIS_ERR;
2302
2303 /* Currently we try to encode only strings */
2304 redisAssert(o->type == REDIS_STRING);
2305
2306 /* Check if we can represent this string as a long integer */
2307 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2308
2309 /* Ok, this object can be encoded */
2310 o->encoding = REDIS_ENCODING_INT;
2311 sdsfree(o->ptr);
2312 o->ptr = (void*) value;
2313 return REDIS_OK;
2314 }
2315
2316 /* Get a decoded version of an encoded object (returned as a new object).
2317 * If the object is already raw-encoded just increment the ref count. */
2318 static robj *getDecodedObject(robj *o) {
2319 robj *dec;
2320
2321 if (o->encoding == REDIS_ENCODING_RAW) {
2322 incrRefCount(o);
2323 return o;
2324 }
2325 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2326 char buf[32];
2327
2328 snprintf(buf,32,"%ld",(long)o->ptr);
2329 dec = createStringObject(buf,strlen(buf));
2330 return dec;
2331 } else {
2332 redisAssert(1 != 1);
2333 }
2334 }
2335
2336 /* Compare two string objects via strcmp() or alike.
2337 * Note that the objects may be integer-encoded. In such a case we
2338 * use snprintf() to get a string representation of the numbers on the stack
2339 * and compare the strings, it's much faster than calling getDecodedObject().
2340 *
2341 * Important note: if objects are not integer encoded, but binary-safe strings,
2342 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2343 * binary safe. */
2344 static int compareStringObjects(robj *a, robj *b) {
2345 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2346 char bufa[128], bufb[128], *astr, *bstr;
2347 int bothsds = 1;
2348
2349 if (a == b) return 0;
2350 if (a->encoding != REDIS_ENCODING_RAW) {
2351 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2352 astr = bufa;
2353 bothsds = 0;
2354 } else {
2355 astr = a->ptr;
2356 }
2357 if (b->encoding != REDIS_ENCODING_RAW) {
2358 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2359 bstr = bufb;
2360 bothsds = 0;
2361 } else {
2362 bstr = b->ptr;
2363 }
2364 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2365 }
2366
2367 static size_t stringObjectLen(robj *o) {
2368 redisAssert(o->type == REDIS_STRING);
2369 if (o->encoding == REDIS_ENCODING_RAW) {
2370 return sdslen(o->ptr);
2371 } else {
2372 char buf[32];
2373
2374 return snprintf(buf,32,"%ld",(long)o->ptr);
2375 }
2376 }
2377
2378 /*============================ DB saving/loading ============================ */
2379
/* Write the one-byte type/opcode 'type'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return fwrite(&type,1,1,fp) == 0 ? -1 : 0;
}
2384
/* Write 't' truncated to a 4-byte signed integer in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t secs = (int32_t) t;

    return fwrite(&secs,sizeof(secs),1,fp) == 0 ? -1 : 0;
}
2390
2391 /* check rdbLoadLen() comments for more info */
2392 static int rdbSaveLen(FILE *fp, uint32_t len) {
2393 unsigned char buf[2];
2394
2395 if (len < (1<<6)) {
2396 /* Save a 6 bit len */
2397 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2398 if (fwrite(buf,1,1,fp) == 0) return -1;
2399 } else if (len < (1<<14)) {
2400 /* Save a 14 bit len */
2401 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2402 buf[1] = len&0xFF;
2403 if (fwrite(buf,2,1,fp) == 0) return -1;
2404 } else {
2405 /* Save a 32 bit len */
2406 buf[0] = (REDIS_RDB_32BITLEN<<6);
2407 if (fwrite(buf,1,1,fp) == 0) return -1;
2408 len = htonl(len);
2409 if (fwrite(&len,4,1,fp) == 0) return -1;
2410 }
2411 return 0;
2412 }
2413
2414 /* String objects in the form "2391" "-100" without any space and with a
2415 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2416 * encoded as integers to save space */
2417 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2418 long long value;
2419 char *endptr, buf[32];
2420
2421 /* Check if it's possible to encode this value as a number */
2422 value = strtoll(s, &endptr, 10);
2423 if (endptr[0] != '\0') return 0;
2424 snprintf(buf,32,"%lld",value);
2425
2426 /* If the number converted back into a string is not identical
2427 * then it's not possible to encode the string as integer */
2428 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2429
2430 /* Finally check if it fits in our ranges */
2431 if (value >= -(1<<7) && value <= (1<<7)-1) {
2432 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2433 enc[1] = value&0xFF;
2434 return 2;
2435 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2436 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2437 enc[1] = value&0xFF;
2438 enc[2] = (value>>8)&0xFF;
2439 return 3;
2440 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2441 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2442 enc[1] = value&0xFF;
2443 enc[2] = (value>>8)&0xFF;
2444 enc[3] = (value>>16)&0xFF;
2445 enc[4] = (value>>24)&0xFF;
2446 return 5;
2447 } else {
2448 return 0;
2449 }
2450 }
2451
2452 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2453 unsigned int comprlen, outlen;
2454 unsigned char byte;
2455 void *out;
2456
2457 /* We require at least four bytes compression for this to be worth it */
2458 outlen = sdslen(obj->ptr)-4;
2459 if (outlen <= 0) return 0;
2460 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2461 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2462 if (comprlen == 0) {
2463 zfree(out);
2464 return 0;
2465 }
2466 /* Data compressed! Let's save it on disk */
2467 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2468 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2469 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2470 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2471 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2472 zfree(out);
2473 return comprlen;
2474
2475 writeerr:
2476 zfree(out);
2477 return -1;
2478 }
2479
2480 /* Save a string objet as [len][data] on disk. If the object is a string
2481 * representation of an integer value we try to safe it in a special form */
2482 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2483 size_t len;
2484 int enclen;
2485
2486 len = sdslen(obj->ptr);
2487
2488 /* Try integer encoding */
2489 if (len <= 11) {
2490 unsigned char buf[5];
2491 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2492 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2493 return 0;
2494 }
2495 }
2496
2497 /* Try LZF compression - under 20 bytes it's unable to compress even
2498 * aaaaaaaaaaaaaaaaaa so skip it */
2499 if (server.rdbcompression && len > 20) {
2500 int retval;
2501
2502 retval = rdbSaveLzfStringObject(fp,obj);
2503 if (retval == -1) return -1;
2504 if (retval > 0) return 0;
2505 /* retval == 0 means data can't be compressed, save the old way */
2506 }
2507
2508 /* Store verbatim */
2509 if (rdbSaveLen(fp,len) == -1) return -1;
2510 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2511 return 0;
2512 }
2513
2514 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2515 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2516 int retval;
2517
2518 obj = getDecodedObject(obj);
2519 retval = rdbSaveStringObjectRaw(fp,obj);
2520 decrRefCount(obj);
2521 return retval;
2522 }
2523
/* Save a double as a length-prefixed string. The first byte is an unsigned
 * 8-bit length of the "%.17g" representation, followed by its characters.
 * Three length values are reserved for non-finite numbers and are written
 * with no following bytes:
 *   253: not a number
 *   254: +inf
 *   255: -inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char out[128];
    int outlen = 1;

    if (isnan(val)) {
        out[0] = 253;
    } else if (!isfinite(val)) {
        out[0] = (val > 0) ? 254 : 255;
    } else {
        char *digits = (char*)out+1;

        snprintf(digits,sizeof(out)-1,"%.17g",val);
        out[0] = strlen(digits);
        outlen += out[0];
    }
    return fwrite(out,outlen,1,fp) == 0 ? -1 : 0;
}
2550
2551 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2552 static int rdbSave(char *filename) {
2553 dictIterator *di = NULL;
2554 dictEntry *de;
2555 FILE *fp;
2556 char tmpfile[256];
2557 int j;
2558 time_t now = time(NULL);
2559
2560 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2561 fp = fopen(tmpfile,"w");
2562 if (!fp) {
2563 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2564 return REDIS_ERR;
2565 }
2566 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2567 for (j = 0; j < server.dbnum; j++) {
2568 redisDb *db = server.db+j;
2569 dict *d = db->dict;
2570 if (dictSize(d) == 0) continue;
2571 di = dictGetIterator(d);
2572 if (!di) {
2573 fclose(fp);
2574 return REDIS_ERR;
2575 }
2576
2577 /* Write the SELECT DB opcode */
2578 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2579 if (rdbSaveLen(fp,j) == -1) goto werr;
2580
2581 /* Iterate this DB writing every entry */
2582 while((de = dictNext(di)) != NULL) {
2583 robj *key = dictGetEntryKey(de);
2584 robj *o = dictGetEntryVal(de);
2585 time_t expiretime = getExpire(db,key);
2586
2587 /* Save the expire time */
2588 if (expiretime != -1) {
2589 /* If this key is already expired skip it */
2590 if (expiretime < now) continue;
2591 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2592 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2593 }
2594 /* Save the key and associated value */
2595 if (rdbSaveType(fp,o->type) == -1) goto werr;
2596 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2597 if (o->type == REDIS_STRING) {
2598 /* Save a string value */
2599 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2600 } else if (o->type == REDIS_LIST) {
2601 /* Save a list value */
2602 list *list = o->ptr;
2603 listNode *ln;
2604
2605 listRewind(list);
2606 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2607 while((ln = listYield(list))) {
2608 robj *eleobj = listNodeValue(ln);
2609
2610 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2611 }
2612 } else if (o->type == REDIS_SET) {
2613 /* Save a set value */
2614 dict *set = o->ptr;
2615 dictIterator *di = dictGetIterator(set);
2616 dictEntry *de;
2617
2618 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2619 while((de = dictNext(di)) != NULL) {
2620 robj *eleobj = dictGetEntryKey(de);
2621
2622 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2623 }
2624 dictReleaseIterator(di);
2625 } else if (o->type == REDIS_ZSET) {
2626 /* Save a set value */
2627 zset *zs = o->ptr;
2628 dictIterator *di = dictGetIterator(zs->dict);
2629 dictEntry *de;
2630
2631 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2632 while((de = dictNext(di)) != NULL) {
2633 robj *eleobj = dictGetEntryKey(de);
2634 double *score = dictGetEntryVal(de);
2635
2636 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2637 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2638 }
2639 dictReleaseIterator(di);
2640 } else {
2641 redisAssert(0 != 0);
2642 }
2643 }
2644 dictReleaseIterator(di);
2645 }
2646 /* EOF opcode */
2647 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2648
2649 /* Make sure data will not remain on the OS's output buffers */
2650 fflush(fp);
2651 fsync(fileno(fp));
2652 fclose(fp);
2653
2654 /* Use RENAME to make sure the DB file is changed atomically only
2655 * if the generate DB file is ok. */
2656 if (rename(tmpfile,filename) == -1) {
2657 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2658 unlink(tmpfile);
2659 return REDIS_ERR;
2660 }
2661 redisLog(REDIS_NOTICE,"DB saved on disk");
2662 server.dirty = 0;
2663 server.lastsave = time(NULL);
2664 return REDIS_OK;
2665
2666 werr:
2667 fclose(fp);
2668 unlink(tmpfile);
2669 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2670 if (di) dictReleaseIterator(di);
2671 return REDIS_ERR;
2672 }
2673
2674 static int rdbSaveBackground(char *filename) {
2675 pid_t childpid;
2676
2677 if (server.bgsavechildpid != -1) return REDIS_ERR;
2678 if ((childpid = fork()) == 0) {
2679 /* Child */
2680 close(server.fd);
2681 if (rdbSave(filename) == REDIS_OK) {
2682 exit(0);
2683 } else {
2684 exit(1);
2685 }
2686 } else {
2687 /* Parent */
2688 if (childpid == -1) {
2689 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2690 strerror(errno));
2691 return REDIS_ERR;
2692 }
2693 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2694 server.bgsavechildpid = childpid;
2695 return REDIS_OK;
2696 }
2697 return REDIS_OK; /* unreached */
2698 }
2699
/* Remove "temp-<pid>.rdb", the temporary file rdbSave() writes when it runs
 * in process 'childpid'. Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2706
/* Read a one-byte type/opcode. Returns its value (0-255), or -1 on EOF or
 * read error. */
static int rdbLoadType(FILE *fp) {
    unsigned char t;

    return fread(&t,1,1,fp) == 0 ? -1 : t;
}
2712
/* Read a 4-byte signed timestamp in host byte order, as written by
 * rdbSaveTime(). Returns -1 on EOF or read error; callers cannot tell this
 * apart from a stored value of -1. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t) stamp;
}
2718
2719 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2720 * of this file for a description of how this are stored on disk.
2721 *
2722 * isencoded is set to 1 if the readed length is not actually a length but
2723 * an "encoding type", check the above comments for more info */
2724 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2725 unsigned char buf[2];
2726 uint32_t len;
2727
2728 if (isencoded) *isencoded = 0;
2729 if (rdbver == 0) {
2730 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2731 return ntohl(len);
2732 } else {
2733 int type;
2734
2735 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2736 type = (buf[0]&0xC0)>>6;
2737 if (type == REDIS_RDB_6BITLEN) {
2738 /* Read a 6 bit len */
2739 return buf[0]&0x3F;
2740 } else if (type == REDIS_RDB_ENCVAL) {
2741 /* Read a 6 bit len encoding type */
2742 if (isencoded) *isencoded = 1;
2743 return buf[0]&0x3F;
2744 } else if (type == REDIS_RDB_14BITLEN) {
2745 /* Read a 14 bit len */
2746 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2747 return ((buf[0]&0x3F)<<8)|buf[1];
2748 } else {
2749 /* Read a 32 bit len */
2750 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2751 return ntohl(len);
2752 }
2753 }
2754 }
2755
2756 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2757 unsigned char enc[4];
2758 long long val;
2759
2760 if (enctype == REDIS_RDB_ENC_INT8) {
2761 if (fread(enc,1,1,fp) == 0) return NULL;
2762 val = (signed char)enc[0];
2763 } else if (enctype == REDIS_RDB_ENC_INT16) {
2764 uint16_t v;
2765 if (fread(enc,2,1,fp) == 0) return NULL;
2766 v = enc[0]|(enc[1]<<8);
2767 val = (int16_t)v;
2768 } else if (enctype == REDIS_RDB_ENC_INT32) {
2769 uint32_t v;
2770 if (fread(enc,4,1,fp) == 0) return NULL;
2771 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2772 val = (int32_t)v;
2773 } else {
2774 val = 0; /* anti-warning */
2775 redisAssert(0!=0);
2776 }
2777 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2778 }
2779
2780 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2781 unsigned int len, clen;
2782 unsigned char *c = NULL;
2783 sds val = NULL;
2784
2785 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2786 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2787 if ((c = zmalloc(clen)) == NULL) goto err;
2788 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2789 if (fread(c,clen,1,fp) == 0) goto err;
2790 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2791 zfree(c);
2792 return createObject(REDIS_STRING,val);
2793 err:
2794 zfree(c);
2795 sdsfree(val);
2796 return NULL;
2797 }
2798
2799 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2800 int isencoded;
2801 uint32_t len;
2802 sds val;
2803
2804 len = rdbLoadLen(fp,rdbver,&isencoded);
2805 if (isencoded) {
2806 switch(len) {
2807 case REDIS_RDB_ENC_INT8:
2808 case REDIS_RDB_ENC_INT16:
2809 case REDIS_RDB_ENC_INT32:
2810 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2811 case REDIS_RDB_ENC_LZF:
2812 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2813 default:
2814 redisAssert(0!=0);
2815 }
2816 }
2817
2818 if (len == REDIS_RDB_LENERR) return NULL;
2819 val = sdsnewlen(NULL,len);
2820 if (len && fread(val,len,1,fp) == 0) {
2821 sdsfree(val);
2822 return NULL;
2823 }
2824 return tryObjectSharing(createObject(REDIS_STRING,val));
2825 }
2826
2827 /* For information about double serialization check rdbSaveDoubleValue() */
2828 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2829 char buf[128];
2830 unsigned char len;
2831
2832 if (fread(&len,1,1,fp) == 0) return -1;
2833 switch(len) {
2834 case 255: *val = R_NegInf; return 0;
2835 case 254: *val = R_PosInf; return 0;
2836 case 253: *val = R_Nan; return 0;
2837 default:
2838 if (fread(buf,len,1,fp) == 0) return -1;
2839 buf[len] = '\0';
2840 sscanf(buf, "%lg", val);
2841 return 0;
2842 }
2843 }
2844
2845 static int rdbLoad(char *filename) {
2846 FILE *fp;
2847 robj *keyobj = NULL;
2848 uint32_t dbid;
2849 int type, retval, rdbver;
2850 dict *d = server.db[0].dict;
2851 redisDb *db = server.db+0;
2852 char buf[1024];
2853 time_t expiretime = -1, now = time(NULL);
2854
2855 fp = fopen(filename,"r");
2856 if (!fp) return REDIS_ERR;
2857 if (fread(buf,9,1,fp) == 0) goto eoferr;
2858 buf[9] = '\0';
2859 if (memcmp(buf,"REDIS",5) != 0) {
2860 fclose(fp);
2861 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2862 return REDIS_ERR;
2863 }
2864 rdbver = atoi(buf+5);
2865 if (rdbver > 1) {
2866 fclose(fp);
2867 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2868 return REDIS_ERR;
2869 }
2870 while(1) {
2871 robj *o;
2872
2873 /* Read type. */
2874 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2875 if (type == REDIS_EXPIRETIME) {
2876 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2877 /* We read the time so we need to read the object type again */
2878 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2879 }
2880 if (type == REDIS_EOF) break;
2881 /* Handle SELECT DB opcode as a special case */
2882 if (type == REDIS_SELECTDB) {
2883 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2884 goto eoferr;
2885 if (dbid >= (unsigned)server.dbnum) {
2886 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2887 exit(1);
2888 }
2889 db = server.db+dbid;
2890 d = db->dict;
2891 continue;
2892 }
2893 /* Read key */
2894 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2895
2896 if (type == REDIS_STRING) {
2897 /* Read string value */
2898 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2899 tryObjectEncoding(o);
2900 } else if (type == REDIS_LIST || type == REDIS_SET) {
2901 /* Read list/set value */
2902 uint32_t listlen;
2903
2904 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2905 goto eoferr;
2906 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2907 /* Load every single element of the list/set */
2908 while(listlen--) {
2909 robj *ele;
2910
2911 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2912 tryObjectEncoding(ele);
2913 if (type == REDIS_LIST) {
2914 listAddNodeTail((list*)o->ptr,ele);
2915 } else {
2916 dictAdd((dict*)o->ptr,ele,NULL);
2917 }
2918 }
2919 } else if (type == REDIS_ZSET) {
2920 /* Read list/set value */
2921 uint32_t zsetlen;
2922 zset *zs;
2923
2924 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2925 goto eoferr;
2926 o = createZsetObject();
2927 zs = o->ptr;
2928 /* Load every single element of the list/set */
2929 while(zsetlen--) {
2930 robj *ele;
2931 double *score = zmalloc(sizeof(double));
2932
2933 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2934 tryObjectEncoding(ele);
2935 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2936 dictAdd(zs->dict,ele,score);
2937 zslInsert(zs->zsl,*score,ele);
2938 incrRefCount(ele); /* added to skiplist */
2939 }
2940 } else {
2941 redisAssert(0 != 0);
2942 }
2943 /* Add the new object in the hash table */
2944 retval = dictAdd(d,keyobj,o);
2945 if (retval == DICT_ERR) {
2946 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2947 exit(1);
2948 }
2949 /* Set the expire time if needed */
2950 if (expiretime != -1) {
2951 setExpire(db,keyobj,expiretime);
2952 /* Delete this key if already expired */
2953 if (expiretime < now) deleteKey(db,keyobj);
2954 expiretime = -1;
2955 }
2956 keyobj = o = NULL;
2957 }
2958 fclose(fp);
2959 return REDIS_OK;
2960
2961 eoferr: /* unexpected end of file is handled here with a fatal exit */
2962 if (keyobj) decrRefCount(keyobj);
2963 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2964 exit(1);
2965 return REDIS_ERR; /* Just to avoid warning */
2966 }
2967
2968 /*================================== Commands =============================== */
2969
2970 static void authCommand(redisClient *c) {
2971 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2972 c->authenticated = 1;
2973 addReply(c,shared.ok);
2974 } else {
2975 c->authenticated = 0;
2976 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2977 }
2978 }
2979
2980 static void pingCommand(redisClient *c) {
2981 addReply(c,shared.pong);
2982 }
2983
2984 static void echoCommand(redisClient *c) {
2985 addReplyBulkLen(c,c->argv[1]);
2986 addReply(c,c->argv[1]);
2987 addReply(c,shared.crlf);
2988 }
2989
2990 /*=================================== Strings =============================== */
2991
2992 static void setGenericCommand(redisClient *c, int nx) {
2993 int retval;
2994
2995 if (nx) deleteIfVolatile(c->db,c->argv[1]);
2996 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2997 if (retval == DICT_ERR) {
2998 if (!nx) {
2999 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3000 incrRefCount(c->argv[2]);
3001 } else {
3002 addReply(c,shared.czero);
3003 return;
3004 }
3005 } else {
3006 incrRefCount(c->argv[1]);
3007 incrRefCount(c->argv[2]);
3008 }
3009 server.dirty++;
3010 removeExpire(c->db,c->argv[1]);
3011 addReply(c, nx ? shared.cone : shared.ok);
3012 }
3013
3014 static void setCommand(redisClient *c) {
3015 setGenericCommand(c,0);
3016 }
3017
3018 static void setnxCommand(redisClient *c) {
3019 setGenericCommand(c,1);
3020 }
3021
3022 static void getCommand(redisClient *c) {
3023 robj *o = lookupKeyRead(c->db,c->argv[1]);
3024
3025 if (o == NULL) {
3026 addReply(c,shared.nullbulk);
3027 } else {
3028 if (o->type != REDIS_STRING) {
3029 addReply(c,shared.wrongtypeerr);
3030 } else {
3031 addReplyBulkLen(c,o);
3032 addReply(c,o);
3033 addReply(c,shared.crlf);
3034 }
3035 }
3036 }
3037
3038 static void getsetCommand(redisClient *c) {
3039 getCommand(c);
3040 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3041 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3042 } else {
3043 incrRefCount(c->argv[1]);
3044 }
3045 incrRefCount(c->argv[2]);
3046 server.dirty++;
3047 removeExpire(c->db,c->argv[1]);
3048 }
3049
3050 static void mgetCommand(redisClient *c) {
3051 int j;
3052
3053 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3054 for (j = 1; j < c->argc; j++) {
3055 robj *o = lookupKeyRead(c->db,c->argv[j]);
3056 if (o == NULL) {
3057 addReply(c,shared.nullbulk);
3058 } else {
3059 if (o->type != REDIS_STRING) {
3060 addReply(c,shared.nullbulk);
3061 } else {
3062 addReplyBulkLen(c,o);
3063 addReply(c,o);
3064 addReply(c,shared.crlf);
3065 }
3066 }
3067 }
3068 }
3069
3070 static void msetGenericCommand(redisClient *c, int nx) {
3071 int j, busykeys = 0;
3072
3073 if ((c->argc % 2) == 0) {
3074 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3075 return;
3076 }
3077 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3078 * set nothing at all if at least one already key exists. */
3079 if (nx) {
3080 for (j = 1; j < c->argc; j += 2) {
3081 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3082 busykeys++;
3083 }
3084 }
3085 }
3086 if (busykeys) {
3087 addReply(c, shared.czero);
3088 return;
3089 }
3090
3091 for (j = 1; j < c->argc; j += 2) {
3092 int retval;
3093
3094 tryObjectEncoding(c->argv[j+1]);
3095 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3096 if (retval == DICT_ERR) {
3097 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3098 incrRefCount(c->argv[j+1]);
3099 } else {
3100 incrRefCount(c->argv[j]);
3101 incrRefCount(c->argv[j+1]);
3102 }
3103 removeExpire(c->db,c->argv[j]);
3104 }
3105 server.dirty += (c->argc-1)/2;
3106 addReply(c, nx ? shared.cone : shared.ok);
3107 }
3108
3109 static void msetCommand(redisClient *c) {
3110 msetGenericCommand(c,0);
3111 }
3112
3113 static void msetnxCommand(redisClient *c) {
3114 msetGenericCommand(c,1);
3115 }
3116
3117 static void incrDecrCommand(redisClient *c, long long incr) {
3118 long long value;
3119 int retval;
3120 robj *o;
3121
3122 o = lookupKeyWrite(c->db,c->argv[1]);
3123 if (o == NULL) {
3124 value = 0;
3125 } else {
3126 if (o->type != REDIS_STRING) {
3127 value = 0;
3128 } else {
3129 char *eptr;
3130
3131 if (o->encoding == REDIS_ENCODING_RAW)
3132 value = strtoll(o->ptr, &eptr, 10);
3133 else if (o->encoding == REDIS_ENCODING_INT)
3134 value = (long)o->ptr;
3135 else
3136 redisAssert(1 != 1);
3137 }
3138 }
3139
3140 value += incr;
3141 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3142 tryObjectEncoding(o);
3143 retval = dictAdd(c->db->dict,c->argv[1],o);
3144 if (retval == DICT_ERR) {
3145 dictReplace(c->db->dict,c->argv[1],o);
3146 removeExpire(c->db,c->argv[1]);
3147 } else {
3148 incrRefCount(c->argv[1]);
3149 }
3150 server.dirty++;
3151 addReply(c,shared.colon);
3152 addReply(c,o);
3153 addReply(c,shared.crlf);
3154 }
3155
3156 static void incrCommand(redisClient *c) {
3157 incrDecrCommand(c,1);
3158 }
3159
3160 static void decrCommand(redisClient *c) {
3161 incrDecrCommand(c,-1);
3162 }
3163
3164 static void incrbyCommand(redisClient *c) {
3165 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3166 incrDecrCommand(c,incr);
3167 }
3168
3169 static void decrbyCommand(redisClient *c) {
3170 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3171 incrDecrCommand(c,-incr);
3172 }
3173
3174 /* ========================= Type agnostic commands ========================= */
3175
3176 static void delCommand(redisClient *c) {
3177 int deleted = 0, j;
3178
3179 for (j = 1; j < c->argc; j++) {
3180 if (deleteKey(c->db,c->argv[j])) {
3181 server.dirty++;
3182 deleted++;
3183 }
3184 }
3185 switch(deleted) {
3186 case 0:
3187 addReply(c,shared.czero);
3188 break;
3189 case 1:
3190 addReply(c,shared.cone);
3191 break;
3192 default:
3193 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3194 break;
3195 }
3196 }
3197
3198 static void existsCommand(redisClient *c) {
3199 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3200 }
3201
3202 static void selectCommand(redisClient *c) {
3203 int id = atoi(c->argv[1]->ptr);
3204
3205 if (selectDb(c,id) == REDIS_ERR) {
3206 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3207 } else {
3208 addReply(c,shared.ok);
3209 }
3210 }
3211
3212 static void randomkeyCommand(redisClient *c) {
3213 dictEntry *de;
3214
3215 while(1) {
3216 de = dictGetRandomKey(c->db->dict);
3217 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3218 }
3219 if (de == NULL) {
3220 addReply(c,shared.plus);
3221 addReply(c,shared.crlf);
3222 } else {
3223 addReply(c,shared.plus);
3224 addReply(c,dictGetEntryKey(de));
3225 addReply(c,shared.crlf);
3226 }
3227 }
3228
3229 static void keysCommand(redisClient *c) {
3230 dictIterator *di;
3231 dictEntry *de;
3232 sds pattern = c->argv[1]->ptr;
3233 int plen = sdslen(pattern);
3234 unsigned long numkeys = 0, keyslen = 0;
3235 robj *lenobj = createObject(REDIS_STRING,NULL);
3236
3237 di = dictGetIterator(c->db->dict);
3238 addReply(c,lenobj);
3239 decrRefCount(lenobj);
3240 while((de = dictNext(di)) != NULL) {
3241 robj *keyobj = dictGetEntryKey(de);
3242
3243 sds key = keyobj->ptr;
3244 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3245 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3246 if (expireIfNeeded(c->db,keyobj) == 0) {
3247 if (numkeys != 0)
3248 addReply(c,shared.space);
3249 addReply(c,keyobj);
3250 numkeys++;
3251 keyslen += sdslen(key);
3252 }
3253 }
3254 }
3255 dictReleaseIterator(di);
3256 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3257 addReply(c,shared.crlf);
3258 }
3259
3260 static void dbsizeCommand(redisClient *c) {
3261 addReplySds(c,
3262 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3263 }
3264
3265 static void lastsaveCommand(redisClient *c) {
3266 addReplySds(c,
3267 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3268 }
3269
3270 static void typeCommand(redisClient *c) {
3271 robj *o;
3272 char *type;
3273
3274 o = lookupKeyRead(c->db,c->argv[1]);
3275 if (o == NULL) {
3276 type = "+none";
3277 } else {
3278 switch(o->type) {
3279 case REDIS_STRING: type = "+string"; break;
3280 case REDIS_LIST: type = "+list"; break;
3281 case REDIS_SET: type = "+set"; break;
3282 case REDIS_ZSET: type = "+zset"; break;
3283 default: type = "unknown"; break;
3284 }
3285 }
3286 addReplySds(c,sdsnew(type));
3287 addReply(c,shared.crlf);
3288 }
3289
3290 static void saveCommand(redisClient *c) {
3291 if (server.bgsavechildpid != -1) {
3292 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3293 return;
3294 }
3295 if (rdbSave(server.dbfilename) == REDIS_OK) {
3296 addReply(c,shared.ok);
3297 } else {
3298 addReply(c,shared.err);
3299 }
3300 }
3301
3302 static void bgsaveCommand(redisClient *c) {
3303 if (server.bgsavechildpid != -1) {
3304 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3305 return;
3306 }
3307 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3308 char *status = "+Background saving started\r\n";
3309 addReplySds(c,sdsnew(status));
3310 } else {
3311 addReply(c,shared.err);
3312 }
3313 }
3314
3315 static void shutdownCommand(redisClient *c) {
3316 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3317 /* Kill the saving child if there is a background saving in progress.
3318 We want to avoid race conditions, for instance our saving child may
3319 overwrite the synchronous saving did by SHUTDOWN. */
3320 if (server.bgsavechildpid != -1) {
3321 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3322 kill(server.bgsavechildpid,SIGKILL);
3323 rdbRemoveTempFile(server.bgsavechildpid);
3324 }
3325 if (server.appendonly) {
3326 /* Append only file: fsync() the AOF and exit */
3327 fsync(server.appendfd);
3328 exit(0);
3329 } else {
3330 /* Snapshotting. Perform a SYNC SAVE and exit */
3331 if (rdbSave(server.dbfilename) == REDIS_OK) {
3332 if (server.daemonize)
3333 unlink(server.pidfile);
3334 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3335 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3336 exit(0);
3337 } else {
3338 /* Ooops.. error saving! The best we can do is to continue operating.
3339 * Note that if there was a background saving process, in the next
3340 * cron() Redis will be notified that the background saving aborted,
3341 * handling special stuff like slaves pending for synchronization... */
3342 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3343 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3344 }
3345 }
3346 }
3347
3348 static void renameGenericCommand(redisClient *c, int nx) {
3349 robj *o;
3350
3351 /* To use the same key as src and dst is probably an error */
3352 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3353 addReply(c,shared.sameobjecterr);
3354 return;
3355 }
3356
3357 o = lookupKeyWrite(c->db,c->argv[1]);
3358 if (o == NULL) {
3359 addReply(c,shared.nokeyerr);
3360 return;
3361 }
3362 incrRefCount(o);
3363 deleteIfVolatile(c->db,c->argv[2]);
3364 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3365 if (nx) {
3366 decrRefCount(o);
3367 addReply(c,shared.czero);
3368 return;
3369 }
3370 dictReplace(c->db->dict,c->argv[2],o);
3371 } else {
3372 incrRefCount(c->argv[2]);
3373 }
3374 deleteKey(c->db,c->argv[1]);
3375 server.dirty++;
3376 addReply(c,nx ? shared.cone : shared.ok);
3377 }
3378
3379 static void renameCommand(redisClient *c) {
3380 renameGenericCommand(c,0);
3381 }
3382
3383 static void renamenxCommand(redisClient *c) {
3384 renameGenericCommand(c,1);
3385 }
3386
3387 static void moveCommand(redisClient *c) {
3388 robj *o;
3389 redisDb *src, *dst;
3390 int srcid;
3391
3392 /* Obtain source and target DB pointers */
3393 src = c->db;
3394 srcid = c->db->id;
3395 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3396 addReply(c,shared.outofrangeerr);
3397 return;
3398 }
3399 dst = c->db;
3400 selectDb(c,srcid); /* Back to the source DB */
3401
3402 /* If the user is moving using as target the same
3403 * DB as the source DB it is probably an error. */
3404 if (src == dst) {
3405 addReply(c,shared.sameobjecterr);
3406 return;
3407 }
3408
3409 /* Check if the element exists and get a reference */
3410 o = lookupKeyWrite(c->db,c->argv[1]);
3411 if (!o) {
3412 addReply(c,shared.czero);
3413 return;
3414 }
3415
3416 /* Try to add the element to the target DB */
3417 deleteIfVolatile(dst,c->argv[1]);
3418 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3419 addReply(c,shared.czero);
3420 return;
3421 }
3422 incrRefCount(c->argv[1]);
3423 incrRefCount(o);
3424
3425 /* OK! key moved, free the entry in the source DB */
3426 deleteKey(src,c->argv[1]);
3427 server.dirty++;
3428 addReply(c,shared.cone);
3429 }
3430
3431 /* =================================== Lists ================================ */
3432 static void pushGenericCommand(redisClient *c, int where) {
3433 robj *lobj;
3434 list *list;
3435
3436 lobj = lookupKeyWrite(c->db,c->argv[1]);
3437 if (lobj == NULL) {
3438 lobj = createListObject();
3439 list = lobj->ptr;
3440 if (where == REDIS_HEAD) {
3441 listAddNodeHead(list,c->argv[2]);
3442 } else {
3443 listAddNodeTail(list,c->argv[2]);
3444 }
3445 dictAdd(c->db->dict,c->argv[1],lobj);
3446 incrRefCount(c->argv[1]);
3447 incrRefCount(c->argv[2]);
3448 } else {
3449 if (lobj->type != REDIS_LIST) {
3450 addReply(c,shared.wrongtypeerr);
3451 return;
3452 }
3453 list = lobj->ptr;
3454 if (where == REDIS_HEAD) {
3455 listAddNodeHead(list,c->argv[2]);
3456 } else {
3457 listAddNodeTail(list,c->argv[2]);
3458 }
3459 incrRefCount(c->argv[2]);
3460 }
3461 server.dirty++;
3462 addReply(c,shared.ok);
3463 }
3464
3465 static void lpushCommand(redisClient *c) {
3466 pushGenericCommand(c,REDIS_HEAD);
3467 }
3468
3469 static void rpushCommand(redisClient *c) {
3470 pushGenericCommand(c,REDIS_TAIL);
3471 }
3472
3473 static void llenCommand(redisClient *c) {
3474 robj *o;
3475 list *l;
3476
3477 o = lookupKeyRead(c->db,c->argv[1]);
3478 if (o == NULL) {
3479 addReply(c,shared.czero);
3480 return;
3481 } else {
3482 if (o->type != REDIS_LIST) {
3483 addReply(c,shared.wrongtypeerr);
3484 } else {
3485 l = o->ptr;
3486 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3487 }
3488 }
3489 }
3490
3491 static void lindexCommand(redisClient *c) {
3492 robj *o;
3493 int index = atoi(c->argv[2]->ptr);
3494
3495 o = lookupKeyRead(c->db,c->argv[1]);
3496 if (o == NULL) {
3497 addReply(c,shared.nullbulk);
3498 } else {
3499 if (o->type != REDIS_LIST) {
3500 addReply(c,shared.wrongtypeerr);
3501 } else {
3502 list *list = o->ptr;
3503 listNode *ln;
3504
3505 ln = listIndex(list, index);
3506 if (ln == NULL) {
3507 addReply(c,shared.nullbulk);
3508 } else {
3509 robj *ele = listNodeValue(ln);
3510 addReplyBulkLen(c,ele);
3511 addReply(c,ele);
3512 addReply(c,shared.crlf);
3513 }
3514 }
3515 }
3516 }
3517
3518 static void lsetCommand(redisClient *c) {
3519 robj *o;
3520 int index = atoi(c->argv[2]->ptr);
3521
3522 o = lookupKeyWrite(c->db,c->argv[1]);
3523 if (o == NULL) {
3524 addReply(c,shared.nokeyerr);
3525 } else {
3526 if (o->type != REDIS_LIST) {
3527 addReply(c,shared.wrongtypeerr);
3528 } else {
3529 list *list = o->ptr;
3530 listNode *ln;
3531
3532 ln = listIndex(list, index);
3533 if (ln == NULL) {
3534 addReply(c,shared.outofrangeerr);
3535 } else {
3536 robj *ele = listNodeValue(ln);
3537
3538 decrRefCount(ele);
3539 listNodeValue(ln) = c->argv[3];
3540 incrRefCount(c->argv[3]);
3541 addReply(c,shared.ok);
3542 server.dirty++;
3543 }
3544 }
3545 }
3546 }
3547
3548 static void popGenericCommand(redisClient *c, int where) {
3549 robj *o;
3550
3551 o = lookupKeyWrite(c->db,c->argv[1]);
3552 if (o == NULL) {
3553 addReply(c,shared.nullbulk);
3554 } else {
3555 if (o->type != REDIS_LIST) {
3556 addReply(c,shared.wrongtypeerr);
3557 } else {
3558 list *list = o->ptr;
3559 listNode *ln;
3560
3561 if (where == REDIS_HEAD)
3562 ln = listFirst(list);
3563 else
3564 ln = listLast(list);
3565
3566 if (ln == NULL) {
3567 addReply(c,shared.nullbulk);
3568 } else {
3569 robj *ele = listNodeValue(ln);
3570 addReplyBulkLen(c,ele);
3571 addReply(c,ele);
3572 addReply(c,shared.crlf);
3573 listDelNode(list,ln);
3574 server.dirty++;
3575 }
3576 }
3577 }
3578 }
3579
3580 static void lpopCommand(redisClient *c) {
3581 popGenericCommand(c,REDIS_HEAD);
3582 }
3583
3584 static void rpopCommand(redisClient *c) {
3585 popGenericCommand(c,REDIS_TAIL);
3586 }
3587
3588 static void lrangeCommand(redisClient *c) {
3589 robj *o;
3590 int start = atoi(c->argv[2]->ptr);
3591 int end = atoi(c->argv[3]->ptr);
3592
3593 o = lookupKeyRead(c->db,c->argv[1]);
3594 if (o == NULL) {
3595 addReply(c,shared.nullmultibulk);
3596 } else {
3597 if (o->type != REDIS_LIST) {
3598 addReply(c,shared.wrongtypeerr);
3599 } else {
3600 list *list = o->ptr;
3601 listNode *ln;
3602 int llen = listLength(list);
3603 int rangelen, j;
3604 robj *ele;
3605
3606 /* convert negative indexes */
3607 if (start < 0) start = llen+start;
3608 if (end < 0) end = llen+end;
3609 if (start < 0) start = 0;
3610 if (end < 0) end = 0;
3611
3612 /* indexes sanity checks */
3613 if (start > end || start >= llen) {
3614 /* Out of range start or start > end result in empty list */
3615 addReply(c,shared.emptymultibulk);
3616 return;
3617 }
3618 if (end >= llen) end = llen-1;
3619 rangelen = (end-start)+1;
3620
3621 /* Return the result in form of a multi-bulk reply */
3622 ln = listIndex(list, start);
3623 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3624 for (j = 0; j < rangelen; j++) {
3625 ele = listNodeValue(ln);
3626 addReplyBulkLen(c,ele);
3627 addReply(c,ele);
3628 addReply(c,shared.crlf);
3629 ln = ln->next;
3630 }
3631 }
3632 }
3633 }
3634
3635 static void ltrimCommand(redisClient *c) {
3636 robj *o;
3637 int start = atoi(c->argv[2]->ptr);
3638 int end = atoi(c->argv[3]->ptr);
3639
3640 o = lookupKeyWrite(c->db,c->argv[1]);
3641 if (o == NULL) {
3642 addReply(c,shared.ok);
3643 } else {
3644 if (o->type != REDIS_LIST) {
3645 addReply(c,shared.wrongtypeerr);
3646 } else {
3647 list *list = o->ptr;
3648 listNode *ln;
3649 int llen = listLength(list);
3650 int j, ltrim, rtrim;
3651
3652 /* convert negative indexes */
3653 if (start < 0) start = llen+start;
3654 if (end < 0) end = llen+end;
3655 if (start < 0) start = 0;
3656 if (end < 0) end = 0;
3657
3658 /* indexes sanity checks */
3659 if (start > end || start >= llen) {
3660 /* Out of range start or start > end result in empty list */
3661 ltrim = llen;
3662 rtrim = 0;
3663 } else {
3664 if (end >= llen) end = llen-1;
3665 ltrim = start;
3666 rtrim = llen-end-1;
3667 }
3668
3669 /* Remove list elements to perform the trim */
3670 for (j = 0; j < ltrim; j++) {
3671 ln = listFirst(list);
3672 listDelNode(list,ln);
3673 }
3674 for (j = 0; j < rtrim; j++) {
3675 ln = listLast(list);
3676 listDelNode(list,ln);
3677 }
3678 server.dirty++;
3679 addReply(c,shared.ok);
3680 }
3681 }
3682 }
3683
3684 static void lremCommand(redisClient *c) {
3685 robj *o;
3686
3687 o = lookupKeyWrite(c->db,c->argv[1]);
3688 if (o == NULL) {
3689 addReply(c,shared.czero);
3690 } else {
3691 if (o->type != REDIS_LIST) {
3692 addReply(c,shared.wrongtypeerr);
3693 } else {
3694 list *list = o->ptr;
3695 listNode *ln, *next;
3696 int toremove = atoi(c->argv[2]->ptr);
3697 int removed = 0;
3698 int fromtail = 0;
3699
3700 if (toremove < 0) {
3701 toremove = -toremove;
3702 fromtail = 1;
3703 }
3704 ln = fromtail ? list->tail : list->head;
3705 while (ln) {
3706 robj *ele = listNodeValue(ln);
3707
3708 next = fromtail ? ln->prev : ln->next;
3709 if (compareStringObjects(ele,c->argv[3]) == 0) {
3710 listDelNode(list,ln);
3711 server.dirty++;
3712 removed++;
3713 if (toremove && removed == toremove) break;
3714 }
3715 ln = next;
3716 }
3717 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3718 }
3719 }
3720 }
3721
3722 /* This is the semantic of this command:
3723 * RPOPLPUSH srclist dstlist:
3724 * IF LLEN(srclist) > 0
3725 * element = RPOP srclist
3726 * LPUSH dstlist element
3727 * RETURN element
3728 * ELSE
3729 * RETURN nil
3730 * END
3731 * END
3732 *
3733 * The idea is to be able to get an element from a list in a reliable way
3734 * since the element is not just returned but pushed against another list
3735 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3736 */
3737 static void rpoplpushcommand(redisClient *c) {
3738 robj *sobj;
3739
3740 sobj = lookupKeyWrite(c->db,c->argv[1]);
3741 if (sobj == NULL) {
3742 addReply(c,shared.nullbulk);
3743 } else {
3744 if (sobj->type != REDIS_LIST) {
3745 addReply(c,shared.wrongtypeerr);
3746 } else {
3747 list *srclist = sobj->ptr;
3748 listNode *ln = listLast(srclist);
3749
3750 if (ln == NULL) {
3751 addReply(c,shared.nullbulk);
3752 } else {
3753 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3754 robj *ele = listNodeValue(ln);
3755 list *dstlist;
3756
3757 if (dobj == NULL) {
3758
3759 /* Create the list if the key does not exist */
3760 dobj = createListObject();
3761 dictAdd(c->db->dict,c->argv[2],dobj);
3762 incrRefCount(c->argv[2]);
3763 } else if (dobj->type != REDIS_LIST) {
3764 addReply(c,shared.wrongtypeerr);
3765 return;
3766 }
3767 /* Add the element to the target list */
3768 dstlist = dobj->ptr;
3769 listAddNodeHead(dstlist,ele);
3770 incrRefCount(ele);
3771
3772 /* Send the element to the client as reply as well */
3773 addReplyBulkLen(c,ele);
3774 addReply(c,ele);
3775 addReply(c,shared.crlf);
3776
3777 /* Finally remove the element from the source list */
3778 listDelNode(srclist,ln);
3779 server.dirty++;
3780 }
3781 }
3782 }
3783 }
3784
3785
3786 /* ==================================== Sets ================================ */
3787
3788 static void saddCommand(redisClient *c) {
3789 robj *set;
3790
3791 set = lookupKeyWrite(c->db,c->argv[1]);
3792 if (set == NULL) {
3793 set = createSetObject();
3794 dictAdd(c->db->dict,c->argv[1],set);
3795 incrRefCount(c->argv[1]);
3796 } else {
3797 if (set->type != REDIS_SET) {
3798 addReply(c,shared.wrongtypeerr);
3799 return;
3800 }
3801 }
3802 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3803 incrRefCount(c->argv[2]);
3804 server.dirty++;
3805 addReply(c,shared.cone);
3806 } else {
3807 addReply(c,shared.czero);
3808 }
3809 }
3810
3811 static void sremCommand(redisClient *c) {
3812 robj *set;
3813
3814 set = lookupKeyWrite(c->db,c->argv[1]);
3815 if (set == NULL) {
3816 addReply(c,shared.czero);
3817 } else {
3818 if (set->type != REDIS_SET) {
3819 addReply(c,shared.wrongtypeerr);
3820 return;
3821 }
3822 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3823 server.dirty++;
3824 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3825 addReply(c,shared.cone);
3826 } else {
3827 addReply(c,shared.czero);
3828 }
3829 }
3830 }
3831
3832 static void smoveCommand(redisClient *c) {
3833 robj *srcset, *dstset;
3834
3835 srcset = lookupKeyWrite(c->db,c->argv[1]);
3836 dstset = lookupKeyWrite(c->db,c->argv[2]);
3837
3838 /* If the source key does not exist return 0, if it's of the wrong type
3839 * raise an error */
3840 if (srcset == NULL || srcset->type != REDIS_SET) {
3841 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3842 return;
3843 }
3844 /* Error if the destination key is not a set as well */
3845 if (dstset && dstset->type != REDIS_SET) {
3846 addReply(c,shared.wrongtypeerr);
3847 return;
3848 }
3849 /* Remove the element from the source set */
3850 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3851 /* Key not found in the src set! return zero */
3852 addReply(c,shared.czero);
3853 return;
3854 }
3855 server.dirty++;
3856 /* Add the element to the destination set */
3857 if (!dstset) {
3858 dstset = createSetObject();
3859 dictAdd(c->db->dict,c->argv[2],dstset);
3860 incrRefCount(c->argv[2]);
3861 }
3862 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3863 incrRefCount(c->argv[3]);
3864 addReply(c,shared.cone);
3865 }
3866
3867 static void sismemberCommand(redisClient *c) {
3868 robj *set;
3869
3870 set = lookupKeyRead(c->db,c->argv[1]);
3871 if (set == NULL) {
3872 addReply(c,shared.czero);
3873 } else {
3874 if (set->type != REDIS_SET) {
3875 addReply(c,shared.wrongtypeerr);
3876 return;
3877 }
3878 if (dictFind(set->ptr,c->argv[2]))
3879 addReply(c,shared.cone);
3880 else
3881 addReply(c,shared.czero);
3882 }
3883 }
3884
3885 static void scardCommand(redisClient *c) {
3886 robj *o;
3887 dict *s;
3888
3889 o = lookupKeyRead(c->db,c->argv[1]);
3890 if (o == NULL) {
3891 addReply(c,shared.czero);
3892 return;
3893 } else {
3894 if (o->type != REDIS_SET) {
3895 addReply(c,shared.wrongtypeerr);
3896 } else {
3897 s = o->ptr;
3898 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3899 dictSize(s)));
3900 }
3901 }
3902 }
3903
3904 static void spopCommand(redisClient *c) {
3905 robj *set;
3906 dictEntry *de;
3907
3908 set = lookupKeyWrite(c->db,c->argv[1]);
3909 if (set == NULL) {
3910 addReply(c,shared.nullbulk);
3911 } else {
3912 if (set->type != REDIS_SET) {
3913 addReply(c,shared.wrongtypeerr);
3914 return;
3915 }
3916 de = dictGetRandomKey(set->ptr);
3917 if (de == NULL) {
3918 addReply(c,shared.nullbulk);
3919 } else {
3920 robj *ele = dictGetEntryKey(de);
3921
3922 addReplyBulkLen(c,ele);
3923 addReply(c,ele);
3924 addReply(c,shared.crlf);
3925 dictDelete(set->ptr,ele);
3926 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3927 server.dirty++;
3928 }
3929 }
3930 }
3931
3932 static void srandmemberCommand(redisClient *c) {
3933 robj *set;
3934 dictEntry *de;
3935
3936 set = lookupKeyRead(c->db,c->argv[1]);
3937 if (set == NULL) {
3938 addReply(c,shared.nullbulk);
3939 } else {
3940 if (set->type != REDIS_SET) {
3941 addReply(c,shared.wrongtypeerr);
3942 return;
3943 }
3944 de = dictGetRandomKey(set->ptr);
3945 if (de == NULL) {
3946 addReply(c,shared.nullbulk);
3947 } else {
3948 robj *ele = dictGetEntryKey(de);
3949
3950 addReplyBulkLen(c,ele);
3951 addReply(c,ele);
3952 addReply(c,shared.crlf);
3953 }
3954 }
3955 }
3956
3957 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3958 dict **d1 = (void*) s1, **d2 = (void*) s2;
3959
3960 return dictSize(*d1)-dictSize(*d2);
3961 }
3962
3963 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
3964 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3965 dictIterator *di;
3966 dictEntry *de;
3967 robj *lenobj = NULL, *dstset = NULL;
3968 unsigned long j, cardinality = 0;
3969
3970 for (j = 0; j < setsnum; j++) {
3971 robj *setobj;
3972
3973 setobj = dstkey ?
3974 lookupKeyWrite(c->db,setskeys[j]) :
3975 lookupKeyRead(c->db,setskeys[j]);
3976 if (!setobj) {
3977 zfree(dv);
3978 if (dstkey) {
3979 if (deleteKey(c->db,dstkey))
3980 server.dirty++;
3981 addReply(c,shared.czero);
3982 } else {
3983 addReply(c,shared.nullmultibulk);
3984 }
3985 return;
3986 }
3987 if (setobj->type != REDIS_SET) {
3988 zfree(dv);
3989 addReply(c,shared.wrongtypeerr);
3990 return;
3991 }
3992 dv[j] = setobj->ptr;
3993 }
3994 /* Sort sets from the smallest to largest, this will improve our
3995 * algorithm's performace */
3996 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3997
3998 /* The first thing we should output is the total number of elements...
3999 * since this is a multi-bulk write, but at this stage we don't know
4000 * the intersection set size, so we use a trick, append an empty object
4001 * to the output list and save the pointer to later modify it with the
4002 * right length */
4003 if (!dstkey) {
4004 lenobj = createObject(REDIS_STRING,NULL);
4005 addReply(c,lenobj);
4006 decrRefCount(lenobj);
4007 } else {
4008 /* If we have a target key where to store the resulting set
4009 * create this key with an empty set inside */
4010 dstset = createSetObject();
4011 }
4012
4013 /* Iterate all the elements of the first (smallest) set, and test
4014 * the element against all the other sets, if at least one set does
4015 * not include the element it is discarded */
4016 di = dictGetIterator(dv[0]);
4017
4018 while((de = dictNext(di)) != NULL) {
4019 robj *ele;
4020
4021 for (j = 1; j < setsnum; j++)
4022 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4023 if (j != setsnum)
4024 continue; /* at least one set does not contain the member */
4025 ele = dictGetEntryKey(de);
4026 if (!dstkey) {
4027 addReplyBulkLen(c,ele);
4028 addReply(c,ele);
4029 addReply(c,shared.crlf);
4030 cardinality++;
4031 } else {
4032 dictAdd(dstset->ptr,ele,NULL);
4033 incrRefCount(ele);
4034 }
4035 }
4036 dictReleaseIterator(di);
4037
4038 if (dstkey) {
4039 /* Store the resulting set into the target */
4040 deleteKey(c->db,dstkey);
4041 dictAdd(c->db->dict,dstkey,dstset);
4042 incrRefCount(dstkey);
4043 }
4044
4045 if (!dstkey) {
4046 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4047 } else {
4048 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4049 dictSize((dict*)dstset->ptr)));
4050 server.dirty++;
4051 }
4052 zfree(dv);
4053 }
4054
4055 static void sinterCommand(redisClient *c) {
4056 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4057 }
4058
4059 static void sinterstoreCommand(redisClient *c) {
4060 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4061 }
4062
4063 #define REDIS_OP_UNION 0
4064 #define REDIS_OP_DIFF 1
4065
4066 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4067 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4068 dictIterator *di;
4069 dictEntry *de;
4070 robj *dstset = NULL;
4071 int j, cardinality = 0;
4072
4073 for (j = 0; j < setsnum; j++) {
4074 robj *setobj;
4075
4076 setobj = dstkey ?
4077 lookupKeyWrite(c->db,setskeys[j]) :
4078 lookupKeyRead(c->db,setskeys[j]);
4079 if (!setobj) {
4080 dv[j] = NULL;
4081 continue;
4082 }
4083 if (setobj->type != REDIS_SET) {
4084 zfree(dv);
4085 addReply(c,shared.wrongtypeerr);
4086 return;
4087 }
4088 dv[j] = setobj->ptr;
4089 }
4090
4091 /* We need a temp set object to store our union. If the dstkey
4092 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4093 * this set object will be the resulting object to set into the target key*/
4094 dstset = createSetObject();
4095
4096 /* Iterate all the elements of all the sets, add every element a single
4097 * time to the result set */
4098 for (j = 0; j < setsnum; j++) {
4099 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4100 if (!dv[j]) continue; /* non existing keys are like empty sets */
4101
4102 di = dictGetIterator(dv[j]);
4103
4104 while((de = dictNext(di)) != NULL) {
4105 robj *ele;
4106
4107 /* dictAdd will not add the same element multiple times */
4108 ele = dictGetEntryKey(de);
4109 if (op == REDIS_OP_UNION || j == 0) {
4110 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4111 incrRefCount(ele);
4112 cardinality++;
4113 }
4114 } else if (op == REDIS_OP_DIFF) {
4115 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4116 cardinality--;
4117 }
4118 }
4119 }
4120 dictReleaseIterator(di);
4121
4122 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4123 }
4124
4125 /* Output the content of the resulting set, if not in STORE mode */
4126 if (!dstkey) {
4127 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4128 di = dictGetIterator(dstset->ptr);
4129 while((de = dictNext(di)) != NULL) {
4130 robj *ele;
4131
4132 ele = dictGetEntryKey(de);
4133 addReplyBulkLen(c,ele);
4134 addReply(c,ele);
4135 addReply(c,shared.crlf);
4136 }
4137 dictReleaseIterator(di);
4138 } else {
4139 /* If we have a target key where to store the resulting set
4140 * create this key with the result set inside */
4141 deleteKey(c->db,dstkey);
4142 dictAdd(c->db->dict,dstkey,dstset);
4143 incrRefCount(dstkey);
4144 }
4145
4146 /* Cleanup */
4147 if (!dstkey) {
4148 decrRefCount(dstset);
4149 } else {
4150 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4151 dictSize((dict*)dstset->ptr)));
4152 server.dirty++;
4153 }
4154 zfree(dv);
4155 }
4156
4157 static void sunionCommand(redisClient *c) {
4158 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4159 }
4160
4161 static void sunionstoreCommand(redisClient *c) {
4162 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4163 }
4164
4165 static void sdiffCommand(redisClient *c) {
4166 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4167 }
4168
4169 static void sdiffstoreCommand(redisClient *c) {
4170 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4171 }
4172
4173 /* ==================================== ZSets =============================== */
4174
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4182
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated scores.
 * b) the comparison is not just by key (our 'score') but by satellite data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, useful for ZREVRANGE. */
4191
4192 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4193 zskiplistNode *zn = zmalloc(sizeof(*zn));
4194
4195 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4196 zn->score = score;
4197 zn->obj = obj;
4198 return zn;
4199 }
4200
4201 static zskiplist *zslCreate(void) {
4202 int j;
4203 zskiplist *zsl;
4204
4205 zsl = zmalloc(sizeof(*zsl));
4206 zsl->level = 1;
4207 zsl->length = 0;
4208 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4209 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4210 zsl->header->forward[j] = NULL;
4211 zsl->header->backward = NULL;
4212 zsl->tail = NULL;
4213 return zsl;
4214 }
4215
4216 static void zslFreeNode(zskiplistNode *node) {
4217 decrRefCount(node->obj);
4218 zfree(node->forward);
4219 zfree(node);
4220 }
4221
4222 static void zslFree(zskiplist *zsl) {
4223 zskiplistNode *node = zsl->header->forward[0], *next;
4224
4225 zfree(zsl->header->forward);
4226 zfree(zsl->header);
4227 while(node) {
4228 next = node->forward[0];
4229 zslFreeNode(node);
4230 node = next;
4231 }
4232 zfree(zsl);
4233 }
4234
4235 static int zslRandomLevel(void) {
4236 int level = 1;
4237 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4238 level += 1;
4239 return level;
4240 }
4241
4242 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4243 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4244 int i, level;
4245
4246 x = zsl->header;
4247 for (i = zsl->level-1; i >= 0; i--) {
4248 while (x->forward[i] &&
4249 (x->forward[i]->score < score ||
4250 (x->forward[i]->score == score &&
4251 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4252 x = x->forward[i];
4253 update[i] = x;
4254 }
4255 /* we assume the key is not already inside, since we allow duplicated
4256 * scores, and the re-insertion of score and redis object should never
4257 * happpen since the caller of zslInsert() should test in the hash table
4258 * if the element is already inside or not. */
4259 level = zslRandomLevel();
4260 if (level > zsl->level) {
4261 for (i = zsl->level; i < level; i++)
4262 update[i] = zsl->header;
4263 zsl->level = level;
4264 }
4265 x = zslCreateNode(level,score,obj);
4266 for (i = 0; i < level; i++) {
4267 x->forward[i] = update[i]->forward[i];
4268 update[i]->forward[i] = x;
4269 }
4270 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4271 if (x->forward[0])
4272 x->forward[0]->backward = x;
4273 else
4274 zsl->tail = x;
4275 zsl->length++;
4276 }
4277
4278 /* Delete an element with matching score/object from the skiplist. */
4279 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4280 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4281 int i;
4282
4283 x = zsl->header;
4284 for (i = zsl->level-1; i >= 0; i--) {
4285 while (x->forward[i] &&
4286 (x->forward[i]->score < score ||
4287 (x->forward[i]->score == score &&
4288 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4289 x = x->forward[i];
4290 update[i] = x;
4291 }
4292 /* We may have multiple elements with the same score, what we need
4293 * is to find the element with both the right score and object. */
4294 x = x->forward[0];
4295 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4296 for (i = 0; i < zsl->level; i++) {
4297 if (update[i]->forward[i] != x) break;
4298 update[i]->forward[i] = x->forward[i];
4299 }
4300 if (x->forward[0]) {
4301 x->forward[0]->backward = (x->backward == zsl->header) ?
4302 NULL : x->backward;
4303 } else {
4304 zsl->tail = x->backward;
4305 }
4306 zslFreeNode(x);
4307 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4308 zsl->level--;
4309 zsl->length--;
4310 return 1;
4311 } else {
4312 return 0; /* not found */
4313 }
4314 return 0; /* not found */
4315 }
4316
4317 /* Delete all the elements with score between min and max from the skiplist.
4318 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4319 * Note that this function takes the reference to the hash table view of the
4320 * sorted set, in order to remove the elements from the hash table too. */
4321 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4322 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4323 unsigned long removed = 0;
4324 int i;
4325
4326 x = zsl->header;
4327 for (i = zsl->level-1; i >= 0; i--) {
4328 while (x->forward[i] && x->forward[i]->score < min)
4329 x = x->forward[i];
4330 update[i] = x;
4331 }
4332 /* We may have multiple elements with the same score, what we need
4333 * is to find the element with both the right score and object. */
4334 x = x->forward[0];
4335 while (x && x->score <= max) {
4336 zskiplistNode *next;
4337
4338 for (i = 0; i < zsl->level; i++) {
4339 if (update[i]->forward[i] != x) break;
4340 update[i]->forward[i] = x->forward[i];
4341 }
4342 if (x->forward[0]) {
4343 x->forward[0]->backward = (x->backward == zsl->header) ?
4344 NULL : x->backward;
4345 } else {
4346 zsl->tail = x->backward;
4347 }
4348 next = x->forward[0];
4349 dictDelete(dict,x->obj);
4350 zslFreeNode(x);
4351 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4352 zsl->level--;
4353 zsl->length--;
4354 removed++;
4355 x = next;
4356 }
4357 return removed; /* not found */
4358 }
4359
4360 /* Find the first node having a score equal or greater than the specified one.
4361 * Returns NULL if there is no match. */
4362 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4363 zskiplistNode *x;
4364 int i;
4365
4366 x = zsl->header;
4367 for (i = zsl->level-1; i >= 0; i--) {
4368 while (x->forward[i] && x->forward[i]->score < score)
4369 x = x->forward[i];
4370 }
4371 /* We may have multiple elements with the same score, what we need
4372 * is to find the element with both the right score and object. */
4373 return x->forward[0];
4374 }
4375
4376 /* The actual Z-commands implementations */
4377
4378 /* This generic command implements both ZADD and ZINCRBY.
4379 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4380 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4381 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4382 robj *zsetobj;
4383 zset *zs;
4384 double *score;
4385
4386 zsetobj = lookupKeyWrite(c->db,key);
4387 if (zsetobj == NULL) {
4388 zsetobj = createZsetObject();
4389 dictAdd(c->db->dict,key,zsetobj);
4390 incrRefCount(key);
4391 } else {
4392 if (zsetobj->type != REDIS_ZSET) {
4393 addReply(c,shared.wrongtypeerr);
4394 return;
4395 }
4396 }
4397 zs = zsetobj->ptr;
4398
4399 /* Ok now since we implement both ZADD and ZINCRBY here the code
4400 * needs to handle the two different conditions. It's all about setting
4401 * '*score', that is, the new score to set, to the right value. */
4402 score = zmalloc(sizeof(double));
4403 if (doincrement) {
4404 dictEntry *de;
4405
4406 /* Read the old score. If the element was not present starts from 0 */
4407 de = dictFind(zs->dict,ele);
4408 if (de) {
4409 double *oldscore = dictGetEntryVal(de);
4410 *score = *oldscore + scoreval;
4411 } else {
4412 *score = scoreval;
4413 }
4414 } else {
4415 *score = scoreval;
4416 }
4417
4418 /* What follows is a simple remove and re-insert operation that is common
4419 * to both ZADD and ZINCRBY... */
4420 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4421 /* case 1: New element */
4422 incrRefCount(ele); /* added to hash */
4423 zslInsert(zs->zsl,*score,ele);
4424 incrRefCount(ele); /* added to skiplist */
4425 server.dirty++;
4426 if (doincrement)
4427 addReplyDouble(c,*score);
4428 else
4429 addReply(c,shared.cone);
4430 } else {
4431 dictEntry *de;
4432 double *oldscore;
4433
4434 /* case 2: Score update operation */
4435 de = dictFind(zs->dict,ele);
4436 redisAssert(de != NULL);
4437 oldscore = dictGetEntryVal(de);
4438 if (*score != *oldscore) {
4439 int deleted;
4440
4441 /* Remove and insert the element in the skip list with new score */
4442 deleted = zslDelete(zs->zsl,*oldscore,ele);
4443 redisAssert(deleted != 0);
4444 zslInsert(zs->zsl,*score,ele);
4445 incrRefCount(ele);
4446 /* Update the score in the hash table */
4447 dictReplace(zs->dict,ele,score);
4448 server.dirty++;
4449 } else {
4450 zfree(score);
4451 }
4452 if (doincrement)
4453 addReplyDouble(c,*score);
4454 else
4455 addReply(c,shared.czero);
4456 }
4457 }
4458
4459 static void zaddCommand(redisClient *c) {
4460 double scoreval;
4461
4462 scoreval = strtod(c->argv[2]->ptr,NULL);
4463 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4464 }
4465
4466 static void zincrbyCommand(redisClient *c) {
4467 double scoreval;
4468
4469 scoreval = strtod(c->argv[2]->ptr,NULL);
4470 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4471 }
4472
4473 static void zremCommand(redisClient *c) {
4474 robj *zsetobj;
4475 zset *zs;
4476
4477 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4478 if (zsetobj == NULL) {
4479 addReply(c,shared.czero);
4480 } else {
4481 dictEntry *de;
4482 double *oldscore;
4483 int deleted;
4484
4485 if (zsetobj->type != REDIS_ZSET) {
4486 addReply(c,shared.wrongtypeerr);
4487 return;
4488 }
4489 zs = zsetobj->ptr;
4490 de = dictFind(zs->dict,c->argv[2]);
4491 if (de == NULL) {
4492 addReply(c,shared.czero);
4493 return;
4494 }
4495 /* Delete from the skiplist */
4496 oldscore = dictGetEntryVal(de);
4497 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4498 redisAssert(deleted != 0);
4499
4500 /* Delete from the hash table */
4501 dictDelete(zs->dict,c->argv[2]);
4502 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4503 server.dirty++;
4504 addReply(c,shared.cone);
4505 }
4506 }
4507
4508 static void zremrangebyscoreCommand(redisClient *c) {
4509 double min = strtod(c->argv[2]->ptr,NULL);
4510 double max = strtod(c->argv[3]->ptr,NULL);
4511 robj *zsetobj;
4512 zset *zs;
4513
4514 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4515 if (zsetobj == NULL) {
4516 addReply(c,shared.czero);
4517 } else {
4518 long deleted;
4519
4520 if (zsetobj->type != REDIS_ZSET) {
4521 addReply(c,shared.wrongtypeerr);
4522 return;
4523 }
4524 zs = zsetobj->ptr;
4525 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4526 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4527 server.dirty += deleted;
4528 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4529 }
4530 }
4531
4532 static void zrangeGenericCommand(redisClient *c, int reverse) {
4533 robj *o;
4534 int start = atoi(c->argv[2]->ptr);
4535 int end = atoi(c->argv[3]->ptr);
4536 int withscores = 0;
4537
4538 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4539 withscores = 1;
4540 } else if (c->argc >= 5) {
4541 addReply(c,shared.syntaxerr);
4542 return;
4543 }
4544
4545 o = lookupKeyRead(c->db,c->argv[1]);
4546 if (o == NULL) {
4547 addReply(c,shared.nullmultibulk);
4548 } else {
4549 if (o->type != REDIS_ZSET) {
4550 addReply(c,shared.wrongtypeerr);
4551 } else {
4552 zset *zsetobj = o->ptr;
4553 zskiplist *zsl = zsetobj->zsl;
4554 zskiplistNode *ln;
4555
4556 int llen = zsl->length;
4557 int rangelen, j;
4558 robj *ele;
4559
4560 /* convert negative indexes */
4561 if (start < 0) start = llen+start;
4562 if (end < 0) end = llen+end;
4563 if (start < 0) start = 0;
4564 if (end < 0) end = 0;
4565
4566 /* indexes sanity checks */
4567 if (start > end || start >= llen) {
4568 /* Out of range start or start > end result in empty list */
4569 addReply(c,shared.emptymultibulk);
4570 return;
4571 }
4572 if (end >= llen) end = llen-1;
4573 rangelen = (end-start)+1;
4574
4575 /* Return the result in form of a multi-bulk reply */
4576 if (reverse) {
4577 ln = zsl->tail;
4578 while (start--)
4579 ln = ln->backward;
4580 } else {
4581 ln = zsl->header->forward[0];
4582 while (start--)
4583 ln = ln->forward[0];
4584 }
4585
4586 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4587 withscores ? (rangelen*2) : rangelen));
4588 for (j = 0; j < rangelen; j++) {
4589 ele = ln->obj;
4590 addReplyBulkLen(c,ele);
4591 addReply(c,ele);
4592 addReply(c,shared.crlf);
4593 if (withscores)
4594 addReplyDouble(c,ln->score);
4595 ln = reverse ? ln->backward : ln->forward[0];
4596 }
4597 }
4598 }
4599 }
4600
4601 static void zrangeCommand(redisClient *c) {
4602 zrangeGenericCommand(c,0);
4603 }
4604
4605 static void zrevrangeCommand(redisClient *c) {
4606 zrangeGenericCommand(c,1);
4607 }
4608
4609 static void zrangebyscoreCommand(redisClient *c) {
4610 robj *o;
4611 double min = strtod(c->argv[2]->ptr,NULL);
4612 double max = strtod(c->argv[3]->ptr,NULL);
4613 int offset = 0, limit = -1;
4614
4615 if (c->argc != 4 && c->argc != 7) {
4616 addReplySds(c,
4617 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4618 return;
4619 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4620 addReply(c,shared.syntaxerr);
4621 return;
4622 } else if (c->argc == 7) {
4623 offset = atoi(c->argv[5]->ptr);
4624 limit = atoi(c->argv[6]->ptr);
4625 if (offset < 0) offset = 0;
4626 }
4627
4628 o = lookupKeyRead(c->db,c->argv[1]);
4629 if (o == NULL) {
4630 addReply(c,shared.nullmultibulk);
4631 } else {
4632 if (o->type != REDIS_ZSET) {
4633 addReply(c,shared.wrongtypeerr);
4634 } else {
4635 zset *zsetobj = o->ptr;
4636 zskiplist *zsl = zsetobj->zsl;
4637 zskiplistNode *ln;
4638 robj *ele, *lenobj;
4639 unsigned int rangelen = 0;
4640
4641 /* Get the first node with the score >= min */
4642 ln = zslFirstWithScore(zsl,min);
4643 if (ln == NULL) {
4644 /* No element matching the speciifed interval */
4645 addReply(c,shared.emptymultibulk);
4646 return;
4647 }
4648
4649 /* We don't know in advance how many matching elements there
4650 * are in the list, so we push this object that will represent
4651 * the multi-bulk length in the output buffer, and will "fix"
4652 * it later */
4653 lenobj = createObject(REDIS_STRING,NULL);
4654 addReply(c,lenobj);
4655 decrRefCount(lenobj);
4656
4657 while(ln && ln->score <= max) {
4658 if (offset) {
4659 offset--;
4660 ln = ln->forward[0];
4661 continue;
4662 }
4663 if (limit == 0) break;
4664 ele = ln->obj;
4665 addReplyBulkLen(c,ele);
4666 addReply(c,ele);
4667 addReply(c,shared.crlf);
4668 ln = ln->forward[0];
4669 rangelen++;
4670 if (limit > 0) limit--;
4671 }
4672 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4673 }
4674 }
4675 }
4676
4677 static void zcardCommand(redisClient *c) {
4678 robj *o;
4679 zset *zs;
4680
4681 o = lookupKeyRead(c->db,c->argv[1]);
4682 if (o == NULL) {
4683 addReply(c,shared.czero);
4684 return;
4685 } else {
4686 if (o->type != REDIS_ZSET) {
4687 addReply(c,shared.wrongtypeerr);
4688 } else {
4689 zs = o->ptr;
4690 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4691 }
4692 }
4693 }
4694
4695 static void zscoreCommand(redisClient *c) {
4696 robj *o;
4697 zset *zs;
4698
4699 o = lookupKeyRead(c->db,c->argv[1]);
4700 if (o == NULL) {
4701 addReply(c,shared.nullbulk);
4702 return;
4703 } else {
4704 if (o->type != REDIS_ZSET) {
4705 addReply(c,shared.wrongtypeerr);
4706 } else {
4707 dictEntry *de;
4708
4709 zs = o->ptr;
4710 de = dictFind(zs->dict,c->argv[2]);
4711 if (!de) {
4712 addReply(c,shared.nullbulk);
4713 } else {
4714 double *score = dictGetEntryVal(de);
4715
4716 addReplyDouble(c,*score);
4717 }
4718 }
4719 }
4720 }
4721
4722 /* ========================= Non type-specific commands ==================== */
4723
4724 static void flushdbCommand(redisClient *c) {
4725 server.dirty += dictSize(c->db->dict);
4726 dictEmpty(c->db->dict);
4727 dictEmpty(c->db->expires);
4728 addReply(c,shared.ok);
4729 }
4730
4731 static void flushallCommand(redisClient *c) {
4732 server.dirty += emptyDb();
4733 addReply(c,shared.ok);
4734 rdbSave(server.dbfilename);
4735 server.dirty++;
4736 }
4737
4738 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4739 redisSortOperation *so = zmalloc(sizeof(*so));
4740 so->type = type;
4741 so->pattern = pattern;
4742 return so;
4743 }
4744
4745 /* Return the value associated to the key with a name obtained
4746 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4747 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4748 char *p;
4749 sds spat, ssub;
4750 robj keyobj;
4751 int prefixlen, sublen, postfixlen;
4752 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4753 struct {
4754 long len;
4755 long free;
4756 char buf[REDIS_SORTKEY_MAX+1];
4757 } keyname;
4758
4759 /* If the pattern is "#" return the substitution object itself in order
4760 * to implement the "SORT ... GET #" feature. */
4761 spat = pattern->ptr;
4762 if (spat[0] == '#' && spat[1] == '\0') {
4763 return subst;
4764 }
4765
4766 /* The substitution object may be specially encoded. If so we create
4767 * a decoded object on the fly. Otherwise getDecodedObject will just
4768 * increment the ref count, that we'll decrement later. */
4769 subst = getDecodedObject(subst);
4770
4771 ssub = subst->ptr;
4772 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4773 p = strchr(spat,'*');
4774 if (!p) {
4775 decrRefCount(subst);
4776 return NULL;
4777 }
4778
4779 prefixlen = p-spat;
4780 sublen = sdslen(ssub);
4781 postfixlen = sdslen(spat)-(prefixlen+1);
4782 memcpy(keyname.buf,spat,prefixlen);
4783 memcpy(keyname.buf+prefixlen,ssub,sublen);
4784 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4785 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4786 keyname.len = prefixlen+sublen+postfixlen;
4787
4788 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4789 decrRefCount(subst);
4790
4791 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4792 return lookupKeyRead(db,&keyobj);
4793 }
4794
4795 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4796 * the additional parameter is not standard but a BSD-specific we have to
4797 * pass sorting parameters via the global 'server' structure */
4798 static int sortCompare(const void *s1, const void *s2) {
4799 const redisSortObject *so1 = s1, *so2 = s2;
4800 int cmp;
4801
4802 if (!server.sort_alpha) {
4803 /* Numeric sorting. Here it's trivial as we precomputed scores */
4804 if (so1->u.score > so2->u.score) {
4805 cmp = 1;
4806 } else if (so1->u.score < so2->u.score) {
4807 cmp = -1;
4808 } else {
4809 cmp = 0;
4810 }
4811 } else {
4812 /* Alphanumeric sorting */
4813 if (server.sort_bypattern) {
4814 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4815 /* At least one compare object is NULL */
4816 if (so1->u.cmpobj == so2->u.cmpobj)
4817 cmp = 0;
4818 else if (so1->u.cmpobj == NULL)
4819 cmp = -1;
4820 else
4821 cmp = 1;
4822 } else {
4823 /* We have both the objects, use strcoll */
4824 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4825 }
4826 } else {
4827 /* Compare elements directly */
4828 robj *dec1, *dec2;
4829
4830 dec1 = getDecodedObject(so1->obj);
4831 dec2 = getDecodedObject(so2->obj);
4832 cmp = strcoll(dec1->ptr,dec2->ptr);
4833 decrRefCount(dec1);
4834 decrRefCount(dec2);
4835 }
4836 }
4837 return server.sort_desc ? -cmp : cmp;
4838 }
4839
4840 /* The SORT command is the most complex command in Redis. Warning: this code
4841 * is optimized for speed and a bit less for readability */
4842 static void sortCommand(redisClient *c) {
4843 list *operations;
4844 int outputlen = 0;
4845 int desc = 0, alpha = 0;
4846 int limit_start = 0, limit_count = -1, start, end;
4847 int j, dontsort = 0, vectorlen;
4848 int getop = 0; /* GET operation counter */
4849 robj *sortval, *sortby = NULL, *storekey = NULL;
4850 redisSortObject *vector; /* Resulting vector to sort */
4851
4852 /* Lookup the key to sort. It must be of the right types */
4853 sortval = lookupKeyRead(c->db,c->argv[1]);
4854 if (sortval == NULL) {
4855 addReply(c,shared.nullmultibulk);
4856 return;
4857 }
4858 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4859 sortval->type != REDIS_ZSET)
4860 {
4861 addReply(c,shared.wrongtypeerr);
4862 return;
4863 }
4864
4865 /* Create a list of operations to perform for every sorted element.
4866 * Operations can be GET/DEL/INCR/DECR */
4867 operations = listCreate();
4868 listSetFreeMethod(operations,zfree);
4869 j = 2;
4870
4871 /* Now we need to protect sortval incrementing its count, in the future
4872 * SORT may have options able to overwrite/delete keys during the sorting
4873 * and the sorted key itself may get destroied */
4874 incrRefCount(sortval);
4875
4876 /* The SORT command has an SQL-alike syntax, parse it */
4877 while(j < c->argc) {
4878 int leftargs = c->argc-j-1;
4879 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4880 desc = 0;
4881 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4882 desc = 1;
4883 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4884 alpha = 1;
4885 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4886 limit_start = atoi(c->argv[j+1]->ptr);
4887 limit_count = atoi(c->argv[j+2]->ptr);
4888 j+=2;
4889 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4890 storekey = c->argv[j+1];
4891 j++;
4892 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4893 sortby = c->argv[j+1];
4894 /* If the BY pattern does not contain '*', i.e. it is constant,
4895 * we don't need to sort nor to lookup the weight keys. */
4896 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4897 j++;
4898 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4899 listAddNodeTail(operations,createSortOperation(
4900 REDIS_SORT_GET,c->argv[j+1]));
4901 getop++;
4902 j++;
4903 } else {
4904 decrRefCount(sortval);
4905 listRelease(operations);
4906 addReply(c,shared.syntaxerr);
4907 return;
4908 }
4909 j++;
4910 }
4911
4912 /* Load the sorting vector with all the objects to sort */
4913 switch(sortval->type) {
4914 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
4915 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
4916 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
4917 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
4918 }
4919 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4920 j = 0;
4921
4922 if (sortval->type == REDIS_LIST) {
4923 list *list = sortval->ptr;
4924 listNode *ln;
4925
4926 listRewind(list);
4927 while((ln = listYield(list))) {
4928 robj *ele = ln->value;
4929 vector[j].obj = ele;
4930 vector[j].u.score = 0;
4931 vector[j].u.cmpobj = NULL;
4932 j++;
4933 }
4934 } else {
4935 dict *set;
4936 dictIterator *di;
4937 dictEntry *setele;
4938
4939 if (sortval->type == REDIS_SET) {
4940 set = sortval->ptr;
4941 } else {
4942 zset *zs = sortval->ptr;
4943 set = zs->dict;
4944 }
4945
4946 di = dictGetIterator(set);
4947 while((setele = dictNext(di)) != NULL) {
4948 vector[j].obj = dictGetEntryKey(setele);
4949 vector[j].u.score = 0;
4950 vector[j].u.cmpobj = NULL;
4951 j++;
4952 }
4953 dictReleaseIterator(di);
4954 }
4955 redisAssert(j == vectorlen);
4956
4957 /* Now it's time to load the right scores in the sorting vector */
4958 if (dontsort == 0) {
4959 for (j = 0; j < vectorlen; j++) {
4960 if (sortby) {
4961 robj *byval;
4962
4963 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4964 if (!byval || byval->type != REDIS_STRING) continue;
4965 if (alpha) {
4966 vector[j].u.cmpobj = getDecodedObject(byval);
4967 } else {
4968 if (byval->encoding == REDIS_ENCODING_RAW) {
4969 vector[j].u.score = strtod(byval->ptr,NULL);
4970 } else {
4971 /* Don't need to decode the object if it's
4972 * integer-encoded (the only encoding supported) so
4973 * far. We can just cast it */
4974 if (byval->encoding == REDIS_ENCODING_INT) {
4975 vector[j].u.score = (long)byval->ptr;
4976 } else
4977 redisAssert(1 != 1);
4978 }
4979 }
4980 } else {
4981 if (!alpha) {
4982 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4983 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4984 else {
4985 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4986 vector[j].u.score = (long) vector[j].obj->ptr;
4987 else
4988 redisAssert(1 != 1);
4989 }
4990 }
4991 }
4992 }
4993 }
4994
4995 /* We are ready to sort the vector... perform a bit of sanity check
4996 * on the LIMIT option too. We'll use a partial version of quicksort. */
4997 start = (limit_start < 0) ? 0 : limit_start;
4998 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4999 if (start >= vectorlen) {
5000 start = vectorlen-1;
5001 end = vectorlen-2;
5002 }
5003 if (end >= vectorlen) end = vectorlen-1;
5004
5005 if (dontsort == 0) {
5006 server.sort_desc = desc;
5007 server.sort_alpha = alpha;
5008 server.sort_bypattern = sortby ? 1 : 0;
5009 if (sortby && (start != 0 || end != vectorlen-1))
5010 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5011 else
5012 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5013 }
5014
5015 /* Send command output to the output buffer, performing the specified
5016 * GET/DEL/INCR/DECR operations if any. */
5017 outputlen = getop ? getop*(end-start+1) : end-start+1;
5018 if (storekey == NULL) {
5019 /* STORE option not specified, sent the sorting result to client */
5020 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5021 for (j = start; j <= end; j++) {
5022 listNode *ln;
5023 if (!getop) {
5024 addReplyBulkLen(c,vector[j].obj);
5025 addReply(c,vector[j].obj);
5026 addReply(c,shared.crlf);
5027 }
5028 listRewind(operations);
5029 while((ln = listYield(operations))) {
5030 redisSortOperation *sop = ln->value;
5031 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5032 vector[j].obj);
5033
5034 if (sop->type == REDIS_SORT_GET) {
5035 if (!val || val->type != REDIS_STRING) {
5036 addReply(c,shared.nullbulk);
5037 } else {
5038 addReplyBulkLen(c,val);
5039 addReply(c,val);
5040 addReply(c,shared.crlf);
5041 }
5042 } else {
5043 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5044 }
5045 }
5046 }
5047 } else {
5048 robj *listObject = createListObject();
5049 list *listPtr = (list*) listObject->ptr;
5050
5051 /* STORE option specified, set the sorting result as a List object */
5052 for (j = start; j <= end; j++) {
5053 listNode *ln;
5054 if (!getop) {
5055 listAddNodeTail(listPtr,vector[j].obj);
5056 incrRefCount(vector[j].obj);
5057 }
5058 listRewind(operations);
5059 while((ln = listYield(operations))) {
5060 redisSortOperation *sop = ln->value;
5061 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5062 vector[j].obj);
5063
5064 if (sop->type == REDIS_SORT_GET) {
5065 if (!val || val->type != REDIS_STRING) {
5066 listAddNodeTail(listPtr,createStringObject("",0));
5067 } else {
5068 listAddNodeTail(listPtr,val);
5069 incrRefCount(val);
5070 }
5071 } else {
5072 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5073 }
5074 }
5075 }
5076 if (dictReplace(c->db->dict,storekey,listObject)) {
5077 incrRefCount(storekey);
5078 }
5079 /* Note: we add 1 because the DB is dirty anyway since even if the
5080 * SORT result is empty a new key is set and maybe the old content
5081 * replaced. */
5082 server.dirty += 1+outputlen;
5083 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5084 }
5085
5086 /* Cleanup */
5087 decrRefCount(sortval);
5088 listRelease(operations);
5089 for (j = 0; j < vectorlen; j++) {
5090 if (sortby && alpha && vector[j].u.cmpobj)
5091 decrRefCount(vector[j].u.cmpobj);
5092 }
5093 zfree(vector);
5094 }
5095
5096 /* Create the string returned by the INFO command. This is decoupled
5097 * by the INFO command itself as we need to report the same information
5098 * on memory corruption problems. */
5099 static sds genRedisInfoString(void) {
5100 sds info;
5101 time_t uptime = time(NULL)-server.stat_starttime;
5102 int j;
5103
5104 info = sdscatprintf(sdsempty(),
5105 "redis_version:%s\r\n"
5106 "arch_bits:%s\r\n"
5107 "multiplexing_api:%s\r\n"
5108 "uptime_in_seconds:%ld\r\n"
5109 "uptime_in_days:%ld\r\n"
5110 "connected_clients:%d\r\n"
5111 "connected_slaves:%d\r\n"
5112 "used_memory:%zu\r\n"
5113 "changes_since_last_save:%lld\r\n"
5114 "bgsave_in_progress:%d\r\n"
5115 "last_save_time:%ld\r\n"
5116 "bgrewriteaof_in_progress:%d\r\n"
5117 "total_connections_received:%lld\r\n"
5118 "total_commands_processed:%lld\r\n"
5119 "role:%s\r\n"
5120 ,REDIS_VERSION,
5121 (sizeof(long) == 8) ? "64" : "32",
5122 aeGetApiName(),
5123 uptime,
5124 uptime/(3600*24),
5125 listLength(server.clients)-listLength(server.slaves),
5126 listLength(server.slaves),
5127 server.usedmemory,
5128 server.dirty,
5129 server.bgsavechildpid != -1,
5130 server.lastsave,
5131 server.bgrewritechildpid != -1,
5132 server.stat_numconnections,
5133 server.stat_numcommands,
5134 server.masterhost == NULL ? "master" : "slave"
5135 );
5136 if (server.masterhost) {
5137 info = sdscatprintf(info,
5138 "master_host:%s\r\n"
5139 "master_port:%d\r\n"
5140 "master_link_status:%s\r\n"
5141 "master_last_io_seconds_ago:%d\r\n"
5142 ,server.masterhost,
5143 server.masterport,
5144 (server.replstate == REDIS_REPL_CONNECTED) ?
5145 "up" : "down",
5146 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5147 );
5148 }
5149 for (j = 0; j < server.dbnum; j++) {
5150 long long keys, vkeys;
5151
5152 keys = dictSize(server.db[j].dict);
5153 vkeys = dictSize(server.db[j].expires);
5154 if (keys || vkeys) {
5155 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5156 j, keys, vkeys);
5157 }
5158 }
5159 return info;
5160 }
5161
5162 static void infoCommand(redisClient *c) {
5163 sds info = genRedisInfoString();
5164 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5165 (unsigned long)sdslen(info)));
5166 addReplySds(c,info);
5167 addReply(c,shared.crlf);
5168 }
5169
5170 static void monitorCommand(redisClient *c) {
5171 /* ignore MONITOR if aleady slave or in monitor mode */
5172 if (c->flags & REDIS_SLAVE) return;
5173
5174 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5175 c->slaveseldb = 0;
5176 listAddNodeTail(server.monitors,c);
5177 addReply(c,shared.ok);
5178 }
5179
5180 /* ================================= Expire ================================= */
5181 static int removeExpire(redisDb *db, robj *key) {
5182 if (dictDelete(db->expires,key) == DICT_OK) {
5183 return 1;
5184 } else {
5185 return 0;
5186 }
5187 }
5188
5189 static int setExpire(redisDb *db, robj *key, time_t when) {
5190 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5191 return 0;
5192 } else {
5193 incrRefCount(key);
5194 return 1;
5195 }
5196 }
5197
5198 /* Return the expire time of the specified key, or -1 if no expire
5199 * is associated with this key (i.e. the key is non volatile) */
5200 static time_t getExpire(redisDb *db, robj *key) {
5201 dictEntry *de;
5202
5203 /* No expire? return ASAP */
5204 if (dictSize(db->expires) == 0 ||
5205 (de = dictFind(db->expires,key)) == NULL) return -1;
5206
5207 return (time_t) dictGetEntryVal(de);
5208 }
5209
5210 static int expireIfNeeded(redisDb *db, robj *key) {
5211 time_t when;
5212 dictEntry *de;
5213
5214 /* No expire? return ASAP */
5215 if (dictSize(db->expires) == 0 ||
5216 (de = dictFind(db->expires,key)) == NULL) return 0;
5217
5218 /* Lookup the expire */
5219 when = (time_t) dictGetEntryVal(de);
5220 if (time(NULL) <= when) return 0;
5221
5222 /* Delete the key */
5223 dictDelete(db->expires,key);
5224 return dictDelete(db->dict,key) == DICT_OK;
5225 }
5226
5227 static int deleteIfVolatile(redisDb *db, robj *key) {
5228 dictEntry *de;
5229
5230 /* No expire? return ASAP */
5231 if (dictSize(db->expires) == 0 ||
5232 (de = dictFind(db->expires,key)) == NULL) return 0;
5233
5234 /* Delete the key */
5235 server.dirty++;
5236 dictDelete(db->expires,key);
5237 return dictDelete(db->dict,key) == DICT_OK;
5238 }
5239
5240 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5241 dictEntry *de;
5242
5243 de = dictFind(c->db->dict,key);
5244 if (de == NULL) {
5245 addReply(c,shared.czero);
5246 return;
5247 }
5248 if (seconds < 0) {
5249 if (deleteKey(c->db,key)) server.dirty++;
5250 addReply(c, shared.cone);
5251 return;
5252 } else {
5253 time_t when = time(NULL)+seconds;
5254 if (setExpire(c->db,key,when)) {
5255 addReply(c,shared.cone);
5256 server.dirty++;
5257 } else {
5258 addReply(c,shared.czero);
5259 }
5260 return;
5261 }
5262 }
5263
5264 static void expireCommand(redisClient *c) {
5265 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5266 }
5267
5268 static void expireatCommand(redisClient *c) {
5269 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5270 }
5271
5272 static void ttlCommand(redisClient *c) {
5273 time_t expire;
5274 int ttl = -1;
5275
5276 expire = getExpire(c->db,c->argv[1]);
5277 if (expire != -1) {
5278 ttl = (int) (expire-time(NULL));
5279 if (ttl < 0) ttl = -1;
5280 }
5281 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5282 }
5283
5284 /* =============================== Replication ============================= */
5285
5286 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5287 ssize_t nwritten, ret = size;
5288 time_t start = time(NULL);
5289
5290 timeout++;
5291 while(size) {
5292 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5293 nwritten = write(fd,ptr,size);
5294 if (nwritten == -1) return -1;
5295 ptr += nwritten;
5296 size -= nwritten;
5297 }
5298 if ((time(NULL)-start) > timeout) {
5299 errno = ETIMEDOUT;
5300 return -1;
5301 }
5302 }
5303 return ret;
5304 }
5305
5306 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5307 ssize_t nread, totread = 0;
5308 time_t start = time(NULL);
5309
5310 timeout++;
5311 while(size) {
5312 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5313 nread = read(fd,ptr,size);
5314 if (nread == -1) return -1;
5315 ptr += nread;
5316 size -= nread;
5317 totread += nread;
5318 }
5319 if ((time(NULL)-start) > timeout) {
5320 errno = ETIMEDOUT;
5321 return -1;
5322 }
5323 }
5324 return totread;
5325 }
5326
/* Read a '\n'-terminated line from 'fd' into 'ptr' (capacity 'size'
 * bytes), one byte at a time via syncRead().
 *
 * At most size-1 characters are stored and the buffer is always kept
 * NUL-terminated. The '\n' is not stored, and a '\r' right before it is
 * replaced by '\0'.
 *
 * Returns the number of characters stored before the newline (a stripped
 * '\r' is still counted), or -1 on read error/timeout. If the buffer fills
 * up before a newline arrives, the characters read so far are returned and
 * the rest of the line stays in the socket. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* reserve room for the terminating NUL */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the stored byte: without this a line longer than the
         * buffer (e.g. from a misbehaving master) overflows 'ptr'. */
        size--;
    }
    return nread;
}
5347
5348 static void syncCommand(redisClient *c) {
5349 /* ignore SYNC if aleady slave or in monitor mode */
5350 if (c->flags & REDIS_SLAVE) return;
5351
5352 /* SYNC can't be issued when the server has pending data to send to
5353 * the client about already issued commands. We need a fresh reply
5354 * buffer registering the differences between the BGSAVE and the current
5355 * dataset, so that we can copy to other slaves if needed. */
5356 if (listLength(c->reply) != 0) {
5357 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5358 return;
5359 }
5360
5361 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5362 /* Here we need to check if there is a background saving operation
5363 * in progress, or if it is required to start one */
5364 if (server.bgsavechildpid != -1) {
5365 /* Ok a background save is in progress. Let's check if it is a good
5366 * one for replication, i.e. if there is another slave that is
5367 * registering differences since the server forked to save */
5368 redisClient *slave;
5369 listNode *ln;
5370
5371 listRewind(server.slaves);
5372 while((ln = listYield(server.slaves))) {
5373 slave = ln->value;
5374 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5375 }
5376 if (ln) {
5377 /* Perfect, the server is already registering differences for
5378 * another slave. Set the right state, and copy the buffer. */
5379 listRelease(c->reply);
5380 c->reply = listDup(slave->reply);
5381 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5382 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5383 } else {
5384 /* No way, we need to wait for the next BGSAVE in order to
5385 * register differences */
5386 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5387 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5388 }
5389 } else {
5390 /* Ok we don't have a BGSAVE in progress, let's start one */
5391 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5392 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5393 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5394 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5395 return;
5396 }
5397 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5398 }
5399 c->repldbfd = -1;
5400 c->flags |= REDIS_SLAVE;
5401 c->slaveseldb = 0;
5402 listAddNodeTail(server.slaves,c);
5403 return;
5404 }
5405
5406 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5407 redisClient *slave = privdata;
5408 REDIS_NOTUSED(el);
5409 REDIS_NOTUSED(mask);
5410 char buf[REDIS_IOBUF_LEN];
5411 ssize_t nwritten, buflen;
5412
5413 if (slave->repldboff == 0) {
5414 /* Write the bulk write count before to transfer the DB. In theory here
5415 * we don't know how much room there is in the output buffer of the
5416 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5417 * operations) will never be smaller than the few bytes we need. */
5418 sds bulkcount;
5419
5420 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5421 slave->repldbsize);
5422 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5423 {
5424 sdsfree(bulkcount);
5425 freeClient(slave);
5426 return;
5427 }
5428 sdsfree(bulkcount);
5429 }
5430 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5431 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5432 if (buflen <= 0) {
5433 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5434 (buflen == 0) ? "premature EOF" : strerror(errno));
5435 freeClient(slave);
5436 return;
5437 }
5438 if ((nwritten = write(fd,buf,buflen)) == -1) {
5439 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5440 strerror(errno));
5441 freeClient(slave);
5442 return;
5443 }
5444 slave->repldboff += nwritten;
5445 if (slave->repldboff == slave->repldbsize) {
5446 close(slave->repldbfd);
5447 slave->repldbfd = -1;
5448 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5449 slave->replstate = REDIS_REPL_ONLINE;
5450 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5451 sendReplyToClient, slave) == AE_ERR) {
5452 freeClient(slave);
5453 return;
5454 }
5455 addReplySds(slave,sdsempty());
5456 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5457 }
5458 }
5459
5460 /* This function is called at the end of every backgrond saving.
5461 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5462 * otherwise REDIS_ERR is passed to the function.
5463 *
5464 * The goal of this function is to handle slaves waiting for a successful
5465 * background saving in order to perform non-blocking synchronization. */
5466 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5467 listNode *ln;
5468 int startbgsave = 0;
5469
5470 listRewind(server.slaves);
5471 while((ln = listYield(server.slaves))) {
5472 redisClient *slave = ln->value;
5473
5474 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5475 startbgsave = 1;
5476 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5477 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5478 struct redis_stat buf;
5479
5480 if (bgsaveerr != REDIS_OK) {
5481 freeClient(slave);
5482 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5483 continue;
5484 }
5485 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5486 redis_fstat(slave->repldbfd,&buf) == -1) {
5487 freeClient(slave);
5488 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5489 continue;
5490 }
5491 slave->repldboff = 0;
5492 slave->repldbsize = buf.st_size;
5493 slave->replstate = REDIS_REPL_SEND_BULK;
5494 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5495 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5496 freeClient(slave);
5497 continue;
5498 }
5499 }
5500 }
5501 if (startbgsave) {
5502 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5503 listRewind(server.slaves);
5504 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5505 while((ln = listYield(server.slaves))) {
5506 redisClient *slave = ln->value;
5507
5508 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5509 freeClient(slave);
5510 }
5511 }
5512 }
5513 }
5514
5515 static int syncWithMaster(void) {
5516 char buf[1024], tmpfile[256], authcmd[1024];
5517 int dumpsize;
5518 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5519 int dfd;
5520
5521 if (fd == -1) {
5522 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5523 strerror(errno));
5524 return REDIS_ERR;
5525 }
5526
5527 /* AUTH with the master if required. */
5528 if(server.masterauth) {
5529 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5530 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5531 close(fd);
5532 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5533 strerror(errno));
5534 return REDIS_ERR;
5535 }
5536 /* Read the AUTH result. */
5537 if (syncReadLine(fd,buf,1024,3600) == -1) {
5538 close(fd);
5539 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5540 strerror(errno));
5541 return REDIS_ERR;
5542 }
5543 if (buf[0] != '+') {
5544 close(fd);
5545 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5546 return REDIS_ERR;
5547 }
5548 }
5549
5550 /* Issue the SYNC command */
5551 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5552 close(fd);
5553 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5554 strerror(errno));
5555 return REDIS_ERR;
5556 }
5557 /* Read the bulk write count */
5558 if (syncReadLine(fd,buf,1024,3600) == -1) {
5559 close(fd);
5560 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5561 strerror(errno));
5562 return REDIS_ERR;
5563 }
5564 if (buf[0] != '$') {
5565 close(fd);
5566 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5567 return REDIS_ERR;
5568 }
5569 dumpsize = atoi(buf+1);
5570 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5571 /* Read the bulk write data on a temp file */
5572 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5573 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5574 if (dfd == -1) {
5575 close(fd);
5576 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5577 return REDIS_ERR;
5578 }
5579 while(dumpsize) {
5580 int nread, nwritten;
5581
5582 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5583 if (nread == -1) {
5584 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5585 strerror(errno));
5586 close(fd);
5587 close(dfd);
5588 return REDIS_ERR;
5589 }
5590 nwritten = write(dfd,buf,nread);
5591 if (nwritten == -1) {
5592 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5593 close(fd);
5594 close(dfd);
5595 return REDIS_ERR;
5596 }
5597 dumpsize -= nread;
5598 }
5599 close(dfd);
5600 if (rename(tmpfile,server.dbfilename) == -1) {
5601 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5602 unlink(tmpfile);
5603 close(fd);
5604 return REDIS_ERR;
5605 }
5606 emptyDb();
5607 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5608 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5609 close(fd);
5610 return REDIS_ERR;
5611 }
5612 server.master = createClient(fd);
5613 server.master->flags |= REDIS_MASTER;
5614 server.master->authenticated = 1;
5615 server.replstate = REDIS_REPL_CONNECTED;
5616 return REDIS_OK;
5617 }
5618
5619 static void slaveofCommand(redisClient *c) {
5620 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5621 !strcasecmp(c->argv[2]->ptr,"one")) {
5622 if (server.masterhost) {
5623 sdsfree(server.masterhost);
5624 server.masterhost = NULL;
5625 if (server.master) freeClient(server.master);
5626 server.replstate = REDIS_REPL_NONE;
5627 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5628 }
5629 } else {
5630 sdsfree(server.masterhost);
5631 server.masterhost = sdsdup(c->argv[1]->ptr);
5632 server.masterport = atoi(c->argv[2]->ptr);
5633 if (server.master) freeClient(server.master);
5634 server.replstate = REDIS_REPL_CONNECT;
5635 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5636 server.masterhost, server.masterport);
5637 }
5638 addReply(c,shared.ok);
5639 }
5640
5641 /* ============================ Maxmemory directive ======================== */
5642
5643 /* This function gets called when 'maxmemory' is set on the config file to limit
5644 * the max memory used by the server, and we are out of memory.
5645 * This function will try to, in order:
5646 *
5647 * - Free objects from the free list
5648 * - Try to remove keys with an EXPIRE set
5649 *
5650 * It is not possible to free enough memory to reach used-memory < maxmemory
5651 * the server will start refusing commands that will enlarge even more the
5652 * memory usage.
5653 */
5654 static void freeMemoryIfNeeded(void) {
5655 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5656 if (listLength(server.objfreelist)) {
5657 robj *o;
5658
5659 listNode *head = listFirst(server.objfreelist);
5660 o = listNodeValue(head);
5661 listDelNode(server.objfreelist,head);
5662 zfree(o);
5663 } else {
5664 int j, k, freed = 0;
5665
5666 for (j = 0; j < server.dbnum; j++) {
5667 int minttl = -1;
5668 robj *minkey = NULL;
5669 struct dictEntry *de;
5670
5671 if (dictSize(server.db[j].expires)) {
5672 freed = 1;
5673 /* From a sample of three keys drop the one nearest to
5674 * the natural expire */
5675 for (k = 0; k < 3; k++) {
5676 time_t t;
5677
5678 de = dictGetRandomKey(server.db[j].expires);
5679 t = (time_t) dictGetEntryVal(de);
5680 if (minttl == -1 || t < minttl) {
5681 minkey = dictGetEntryKey(de);
5682 minttl = t;
5683 }
5684 }
5685 deleteKey(server.db+j,minkey);
5686 }
5687 }
5688 if (!freed) return; /* nothing to free... */
5689 }
5690 }
5691 }
5692
5693 /* ============================== Append Only file ========================== */
5694
5695 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5696 sds buf = sdsempty();
5697 int j;
5698 ssize_t nwritten;
5699 time_t now;
5700 robj *tmpargv[3];
5701
5702 /* The DB this command was targetting is not the same as the last command
5703 * we appendend. To issue a SELECT command is needed. */
5704 if (dictid != server.appendseldb) {
5705 char seldb[64];
5706
5707 snprintf(seldb,sizeof(seldb),"%d",dictid);
5708 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5709 (unsigned long)strlen(seldb),seldb);
5710 server.appendseldb = dictid;
5711 }
5712
5713 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5714 * EXPIREs into EXPIREATs calls */
5715 if (cmd->proc == expireCommand) {
5716 long when;
5717
5718 tmpargv[0] = createStringObject("EXPIREAT",8);
5719 tmpargv[1] = argv[1];
5720 incrRefCount(argv[1]);
5721 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5722 tmpargv[2] = createObject(REDIS_STRING,
5723 sdscatprintf(sdsempty(),"%ld",when));
5724 argv = tmpargv;
5725 }
5726
5727 /* Append the actual command */
5728 buf = sdscatprintf(buf,"*%d\r\n",argc);
5729 for (j = 0; j < argc; j++) {
5730 robj *o = argv[j];
5731
5732 o = getDecodedObject(o);
5733 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
5734 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5735 buf = sdscatlen(buf,"\r\n",2);
5736 decrRefCount(o);
5737 }
5738
5739 /* Free the objects from the modified argv for EXPIREAT */
5740 if (cmd->proc == expireCommand) {
5741 for (j = 0; j < 3; j++)
5742 decrRefCount(argv[j]);
5743 }
5744
5745 /* We want to perform a single write. This should be guaranteed atomic
5746 * at least if the filesystem we are writing is a real physical one.
5747 * While this will save us against the server being killed I don't think
5748 * there is much to do about the whole server stopping for power problems
5749 * or alike */
5750 nwritten = write(server.appendfd,buf,sdslen(buf));
5751 if (nwritten != (signed)sdslen(buf)) {
5752 /* Ooops, we are in troubles. The best thing to do for now is
5753 * to simply exit instead to give the illusion that everything is
5754 * working as expected. */
5755 if (nwritten == -1) {
5756 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5757 } else {
5758 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5759 }
5760 exit(1);
5761 }
5762 /* If a background append only file rewriting is in progress we want to
5763 * accumulate the differences between the child DB and the current one
5764 * in a buffer, so that when the child process will do its work we
5765 * can append the differences to the new append only file. */
5766 if (server.bgrewritechildpid != -1)
5767 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5768
5769 sdsfree(buf);
5770 now = time(NULL);
5771 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5772 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5773 now-server.lastfsync > 1))
5774 {
5775 fsync(server.appendfd); /* Let's try to get this data on the disk */
5776 server.lastfsync = now;
5777 }
5778 }
5779
5780 /* In Redis commands are always executed in the context of a client, so in
5781 * order to load the append only file we need to create a fake client. */
5782 static struct redisClient *createFakeClient(void) {
5783 struct redisClient *c = zmalloc(sizeof(*c));
5784
5785 selectDb(c,0);
5786 c->fd = -1;
5787 c->querybuf = sdsempty();
5788 c->argc = 0;
5789 c->argv = NULL;
5790 c->flags = 0;
5791 /* We set the fake client as a slave waiting for the synchronization
5792 * so that Redis will not try to send replies to this client. */
5793 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5794 c->reply = listCreate();
5795 listSetFreeMethod(c->reply,decrRefCount);
5796 listSetDupMethod(c->reply,dupClientReplyValue);
5797 return c;
5798 }
5799
5800 static void freeFakeClient(struct redisClient *c) {
5801 sdsfree(c->querybuf);
5802 listRelease(c->reply);
5803 zfree(c);
5804 }
5805
5806 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5807 * error (the append only file is zero-length) REDIS_ERR is returned. On
5808 * fatal error an error message is logged and the program exists. */
5809 int loadAppendOnlyFile(char *filename) {
5810 struct redisClient *fakeClient;
5811 FILE *fp = fopen(filename,"r");
5812 struct redis_stat sb;
5813
5814 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5815 return REDIS_ERR;
5816
5817 if (fp == NULL) {
5818 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5819 exit(1);
5820 }
5821
5822 fakeClient = createFakeClient();
5823 while(1) {
5824 int argc, j;
5825 unsigned long len;
5826 robj **argv;
5827 char buf[128];
5828 sds argsds;
5829 struct redisCommand *cmd;
5830
5831 if (fgets(buf,sizeof(buf),fp) == NULL) {
5832 if (feof(fp))
5833 break;
5834 else
5835 goto readerr;
5836 }
5837 if (buf[0] != '*') goto fmterr;
5838 argc = atoi(buf+1);
5839 argv = zmalloc(sizeof(robj*)*argc);
5840 for (j = 0; j < argc; j++) {
5841 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5842 if (buf[0] != '$') goto fmterr;
5843 len = strtol(buf+1,NULL,10);
5844 argsds = sdsnewlen(NULL,len);
5845 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5846 argv[j] = createObject(REDIS_STRING,argsds);
5847 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5848 }
5849
5850 /* Command lookup */
5851 cmd = lookupCommand(argv[0]->ptr);
5852 if (!cmd) {
5853 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5854 exit(1);
5855 }
5856 /* Try object sharing and encoding */
5857 if (server.shareobjects) {
5858 int j;
5859 for(j = 1; j < argc; j++)
5860 argv[j] = tryObjectSharing(argv[j]);
5861 }
5862 if (cmd->flags & REDIS_CMD_BULK)
5863 tryObjectEncoding(argv[argc-1]);
5864 /* Run the command in the context of a fake client */
5865 fakeClient->argc = argc;
5866 fakeClient->argv = argv;
5867 cmd->proc(fakeClient);
5868 /* Discard the reply objects list from the fake client */
5869 while(listLength(fakeClient->reply))
5870 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5871 /* Clean up, ready for the next command */
5872 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5873 zfree(argv);
5874 }
5875 fclose(fp);
5876 freeFakeClient(fakeClient);
5877 return REDIS_OK;
5878
5879 readerr:
5880 if (feof(fp)) {
5881 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5882 } else {
5883 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5884 }
5885 exit(1);
5886 fmterr:
5887 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5888 exit(1);
5889 }
5890
5891 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5892 static int fwriteBulk(FILE *fp, robj *obj) {
5893 char buf[128];
5894 obj = getDecodedObject(obj);
5895 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5896 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5897 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
5898 goto err;
5899 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5900 decrRefCount(obj);
5901 return 1;
5902 err:
5903 decrRefCount(obj);
5904 return 0;
5905 }
5906
/* Write 'd' to 'fp' in bulk format: $<count>\r\n<payload>\r\n, where the
 * payload is d printed with "%.17g". Returns 1 on success, 0 if an
 * fwrite() fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    size_t payloadlen;

    /* The payload carries its own trailing CRLF, which the advertised
     * length must not include (hence the -2). */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
5917
/* Write 'l' to 'fp' in bulk format: $<count>\r\n<payload>\r\n, where the
 * payload is l in decimal. Returns 1 on success, 0 if an fwrite() fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], payload[128];
    size_t payloadlen;

    /* The payload carries its own trailing CRLF, which the advertised
     * length must not include (hence the -2). */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    payloadlen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
5928
5929 /* Write a sequence of commands able to fully rebuild the dataset into
5930 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5931 static int rewriteAppendOnlyFile(char *filename) {
5932 dictIterator *di = NULL;
5933 dictEntry *de;
5934 FILE *fp;
5935 char tmpfile[256];
5936 int j;
5937 time_t now = time(NULL);
5938
5939 /* Note that we have to use a different temp name here compared to the
5940 * one used by rewriteAppendOnlyFileBackground() function. */
5941 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5942 fp = fopen(tmpfile,"w");
5943 if (!fp) {
5944 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5945 return REDIS_ERR;
5946 }
5947 for (j = 0; j < server.dbnum; j++) {
5948 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5949 redisDb *db = server.db+j;
5950 dict *d = db->dict;
5951 if (dictSize(d) == 0) continue;
5952 di = dictGetIterator(d);
5953 if (!di) {
5954 fclose(fp);
5955 return REDIS_ERR;
5956 }
5957
5958 /* SELECT the new DB */
5959 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5960 if (fwriteBulkLong(fp,j) == 0) goto werr;
5961
5962 /* Iterate this DB writing every entry */
5963 while((de = dictNext(di)) != NULL) {
5964 robj *key = dictGetEntryKey(de);
5965 robj *o = dictGetEntryVal(de);
5966 time_t expiretime = getExpire(db,key);
5967
5968 /* Save the key and associated value */
5969 if (o->type == REDIS_STRING) {
5970 /* Emit a SET command */
5971 char cmd[]="*3\r\n$3\r\nSET\r\n";
5972 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5973 /* Key and value */
5974 if (fwriteBulk(fp,key) == 0) goto werr;
5975 if (fwriteBulk(fp,o) == 0) goto werr;
5976 } else if (o->type == REDIS_LIST) {
5977 /* Emit the RPUSHes needed to rebuild the list */
5978 list *list = o->ptr;
5979 listNode *ln;
5980
5981 listRewind(list);
5982 while((ln = listYield(list))) {
5983 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5984 robj *eleobj = listNodeValue(ln);
5985
5986 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5987 if (fwriteBulk(fp,key) == 0) goto werr;
5988 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5989 }
5990 } else if (o->type == REDIS_SET) {
5991 /* Emit the SADDs needed to rebuild the set */
5992 dict *set = o->ptr;
5993 dictIterator *di = dictGetIterator(set);
5994 dictEntry *de;
5995
5996 while((de = dictNext(di)) != NULL) {
5997 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5998 robj *eleobj = dictGetEntryKey(de);
5999
6000 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6001 if (fwriteBulk(fp,key) == 0) goto werr;
6002 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6003 }
6004 dictReleaseIterator(di);
6005 } else if (o->type == REDIS_ZSET) {
6006 /* Emit the ZADDs needed to rebuild the sorted set */
6007 zset *zs = o->ptr;
6008 dictIterator *di = dictGetIterator(zs->dict);
6009 dictEntry *de;
6010
6011 while((de = dictNext(di)) != NULL) {
6012 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6013 robj *eleobj = dictGetEntryKey(de);
6014 double *score = dictGetEntryVal(de);
6015
6016 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6017 if (fwriteBulk(fp,key) == 0) goto werr;
6018 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6019 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6020 }
6021 dictReleaseIterator(di);
6022 } else {
6023 redisAssert(0 != 0);
6024 }
6025 /* Save the expire time */
6026 if (expiretime != -1) {
6027 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6028 /* If this key is already expired skip it */
6029 if (expiretime < now) continue;
6030 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6031 if (fwriteBulk(fp,key) == 0) goto werr;
6032 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6033 }
6034 }
6035 dictReleaseIterator(di);
6036 }
6037
6038 /* Make sure data will not remain on the OS's output buffers */
6039 fflush(fp);
6040 fsync(fileno(fp));
6041 fclose(fp);
6042
6043 /* Use RENAME to make sure the DB file is changed atomically only
6044 * if the generate DB file is ok. */
6045 if (rename(tmpfile,filename) == -1) {
6046 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6047 unlink(tmpfile);
6048 return REDIS_ERR;
6049 }
6050 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6051 return REDIS_OK;
6052
6053 werr:
6054 fclose(fp);
6055 unlink(tmpfile);
6056 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6057 if (di) dictReleaseIterator(di);
6058 return REDIS_ERR;
6059 }
6060
6061 /* This is how rewriting of the append only file in background works:
6062 *
6063 * 1) The user calls BGREWRITEAOF
6064 * 2) Redis calls this function, that forks():
6065 * 2a) the child rewrite the append only file in a temp file.
6066 * 2b) the parent accumulates differences in server.bgrewritebuf.
6067 * 3) When the child finished '2a' exists.
6068 * 4) The parent will trap the exit code, if it's OK, will append the
6069 * data accumulated into server.bgrewritebuf into the temp file, and
6070 * finally will rename(2) the temp file in the actual file name.
6071 * The the new file is reopened as the new append only file. Profit!
6072 */
6073 static int rewriteAppendOnlyFileBackground(void) {
6074 pid_t childpid;
6075
6076 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6077 if ((childpid = fork()) == 0) {
6078 /* Child */
6079 char tmpfile[256];
6080 close(server.fd);
6081
6082 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6083 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6084 exit(0);
6085 } else {
6086 exit(1);
6087 }
6088 } else {
6089 /* Parent */
6090 if (childpid == -1) {
6091 redisLog(REDIS_WARNING,
6092 "Can't rewrite append only file in background: fork: %s",
6093 strerror(errno));
6094 return REDIS_ERR;
6095 }
6096 redisLog(REDIS_NOTICE,
6097 "Background append only file rewriting started by pid %d",childpid);
6098 server.bgrewritechildpid = childpid;
6099 /* We set appendseldb to -1 in order to force the next call to the
6100 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6101 * accumulated by the parent into server.bgrewritebuf will start
6102 * with a SELECT statement and it will be safe to merge. */
6103 server.appendseldb = -1;
6104 return REDIS_OK;
6105 }
6106 return REDIS_OK; /* unreached */
6107 }
6108
6109 static void bgrewriteaofCommand(redisClient *c) {
6110 if (server.bgrewritechildpid != -1) {
6111 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6112 return;
6113 }
6114 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6115 char *status = "+Background append only file rewriting started\r\n";
6116 addReplySds(c,sdsnew(status));
6117 } else {
6118 addReply(c,shared.err);
6119 }
6120 }
6121
/* Delete the temp file written by the background AOF rewrite child whose
 * pid is 'childpid'. The name pattern matches the one the child builds in
 * rewriteAppendOnlyFileBackground(). unlink() errors (e.g. the file was
 * never created) are deliberately ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int) childpid);
    unlink(path);
}
6128
6129 /* ================================= Debugging ============================== */
6130
6131 static void debugCommand(redisClient *c) {
6132 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6133 *((char*)-1) = 'x';
6134 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6135 if (rdbSave(server.dbfilename) != REDIS_OK) {
6136 addReply(c,shared.err);
6137 return;
6138 }
6139 emptyDb();
6140 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6141 addReply(c,shared.err);
6142 return;
6143 }
6144 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6145 addReply(c,shared.ok);
6146 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6147 emptyDb();
6148 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6149 addReply(c,shared.err);
6150 return;
6151 }
6152 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6153 addReply(c,shared.ok);
6154 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6155 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6156 robj *key, *val;
6157
6158 if (!de) {
6159 addReply(c,shared.nokeyerr);
6160 return;
6161 }
6162 key = dictGetEntryKey(de);
6163 val = dictGetEntryVal(de);
6164 addReplySds(c,sdscatprintf(sdsempty(),
6165 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6166 (void*)key, key->refcount, (void*)val, val->refcount,
6167 val->encoding));
6168 } else {
6169 addReplySds(c,sdsnew(
6170 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6171 }
6172 }
6173
6174 static void _redisAssert(char *estr) {
6175 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6176 redisLog(REDIS_WARNING,"==> %s\n",estr);
6177 #ifdef HAVE_BACKTRACE
6178 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6179 *((char*)-1) = 'x';
6180 #endif
6181 }
6182
6183 /* =================================== Main! ================================ */
6184
6185 #ifdef __linux__
/* Return the kernel's vm.overcommit_memory setting, read from
 * /proc/sys/vm/overcommit_memory, or -1 when the file can't be opened or
 * read, or when its content does not start with a number. (atoi() used to
 * turn unparsable content into 0, which the caller reports as "overcommit
 * disabled".) */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long value;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    value = strtol(buf,&end,10);
    if (end == buf) return -1; /* no digits at all */
    return (int) value;
}
6199
6200 void linuxOvercommitMemoryWarning(void) {
6201 if (linuxOvercommitMemoryValue() == 0) {
6202 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6203 }
6204 }
6205 #endif /* __linux__ */
6206
6207 static void daemonize(void) {
6208 int fd;
6209 FILE *fp;
6210
6211 if (fork() != 0) exit(0); /* parent exits */
6212 printf("New pid: %d\n", getpid());
6213 setsid(); /* create a new session */
6214
6215 /* Every output goes to /dev/null. If Redis is daemonized but
6216 * the 'logfile' is set to 'stdout' in the configuration file
6217 * it will not log at all. */
6218 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6219 dup2(fd, STDIN_FILENO);
6220 dup2(fd, STDOUT_FILENO);
6221 dup2(fd, STDERR_FILENO);
6222 if (fd > STDERR_FILENO) close(fd);
6223 }
6224 /* Try to write the pid file */
6225 fp = fopen(server.pidfile,"w");
6226 if (fp) {
6227 fprintf(fp,"%d\n",getpid());
6228 fclose(fp);
6229 }
6230 }
6231
6232 int main(int argc, char **argv) {
6233 initServerConfig();
6234 if (argc == 2) {
6235 resetServerSaveParams();
6236 loadServerConfig(argv[1]);
6237 } else if (argc > 2) {
6238 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6239 exit(1);
6240 } else {
6241 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6242 }
6243 if (server.daemonize) daemonize();
6244 initServer();
6245 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6246 #ifdef __linux__
6247 linuxOvercommitMemoryWarning();
6248 #endif
6249 if (server.appendonly) {
6250 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6251 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6252 } else {
6253 if (rdbLoad(server.dbfilename) == REDIS_OK)
6254 redisLog(REDIS_NOTICE,"DB loaded from disk");
6255 }
6256 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6257 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6258 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6259 aeMain(server.el);
6260 aeDeleteEventLoop(server.el);
6261 return 0;
6262 }
6263
6264 /* ============================= Backtrace support ========================= */
6265
6266 #ifdef HAVE_BACKTRACE
6267 static char *findFuncName(void *pointer, unsigned long *offset);
6268
6269 static void *getMcontextEip(ucontext_t *uc) {
6270 #if defined(__FreeBSD__)
6271 return (void*) uc->uc_mcontext.mc_eip;
6272 #elif defined(__dietlibc__)
6273 return (void*) uc->uc_mcontext.eip;
6274 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6275 #if __x86_64__
6276 return (void*) uc->uc_mcontext->__ss.__rip;
6277 #else
6278 return (void*) uc->uc_mcontext->__ss.__eip;
6279 #endif
6280 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6281 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6282 return (void*) uc->uc_mcontext->__ss.__rip;
6283 #else
6284 return (void*) uc->uc_mcontext->__ss.__eip;
6285 #endif
6286 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6287 return (void*) uc->uc_mcontext.gregs[REG_EIP];
6288 #elif defined(__ia64__) /* Linux IA64 */
6289 return (void*) uc->uc_mcontext.sc_ip;
6290 #else
6291 return NULL;
6292 #endif
6293 }
6294
6295 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6296 void *trace[100];
6297 char **messages = NULL;
6298 int i, trace_size = 0;
6299 unsigned long offset=0;
6300 ucontext_t *uc = (ucontext_t*) secret;
6301 sds infostring;
6302 REDIS_NOTUSED(info);
6303
6304 redisLog(REDIS_WARNING,
6305 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6306 infostring = genRedisInfoString();
6307 redisLog(REDIS_WARNING, "%s",infostring);
6308 /* It's not safe to sdsfree() the returned string under memory
6309 * corruption conditions. Let it leak as we are going to abort */
6310
6311 trace_size = backtrace(trace, 100);
6312 /* overwrite sigaction with caller's address */
6313 if (getMcontextEip(uc) != NULL) {
6314 trace[1] = getMcontextEip(uc);
6315 }
6316 messages = backtrace_symbols(trace, trace_size);
6317
6318 for (i=1; i<trace_size; ++i) {
6319 char *fn = findFuncName(trace[i], &offset), *p;
6320
6321 p = strchr(messages[i],'+');
6322 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6323 redisLog(REDIS_WARNING,"%s", messages[i]);
6324 } else {
6325 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6326 }
6327 }
6328 // free(messages); Don't call free() with possibly corrupted memory.
6329 exit(0);
6330 }
6331
6332 static void setupSigSegvAction(void) {
6333 struct sigaction act;
6334
6335 sigemptyset (&act.sa_mask);
6336 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6337 * is used. Otherwise, sa_handler is used */
6338 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6339 act.sa_sigaction = segvHandler;
6340 sigaction (SIGSEGV, &act, NULL);
6341 sigaction (SIGBUS, &act, NULL);
6342 sigaction (SIGFPE, &act, NULL);
6343 sigaction (SIGILL, &act, NULL);
6344 sigaction (SIGBUS, &act, NULL);
6345 return;
6346 }
6347
6348 #include "staticsymbols.h"
6349 /* This function try to convert a pointer into a function name. It's used in
6350 * oreder to provide a backtrace under segmentation fault that's able to
6351 * display functions declared as static (otherwise the backtrace is useless). */
6352 static char *findFuncName(void *pointer, unsigned long *offset){
6353 int i, ret = -1;
6354 unsigned long off, minoff = 0;
6355
6356 /* Try to match against the Symbol with the smallest offset */
6357 for (i=0; symsTable[i].pointer; i++) {
6358 unsigned long lp = (unsigned long) pointer;
6359
6360 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6361 off=lp-symsTable[i].pointer;
6362 if (ret < 0 || off < minoff) {
6363 minoff=off;
6364 ret=i;
6365 }
6366 }
6367 }
6368 if (ret == -1) return NULL;
6369 *offset = minoff;
6370 return symsTable[ret].name;
6371 }
6372 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without HAVE_BACKTRACE: there is no stack-trace handler to
     * install, so fatal signals keep their default disposition. */
}
6375 #endif /* HAVE_BACKTRACE */
6376
6377
6378
6379 /* The End */
6380
6381
6382