]> git.saurik.com Git - redis.git/blob - redis.c
Slightly modified the ZREVRANGE test to cover a few more lines of code
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.1.93"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR (-1)

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
 * this flag will return an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing a full 32 bit length for
 * every short key would waste a lot of space, so the two most significant
 * bits of the first byte select how the length is encoded:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: a specially encoded object will follow. The six
 *           bits specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */

/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void)(V))

/* With P = 1/4, 32 levels are enough for 4^32 = 2^64 elements. */
#define ZSKIPLIST_MAXLEVEL 32
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2

/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
204
/*================================= Data types ============================== */

/* A redis object, that is a type able to hold a string / list / set */
typedef struct redisObject {
    void *ptr;
    unsigned char type;       /* REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
    unsigned char encoding;   /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2]; /* unused (presumably padding) */
    int refcount;             /* see incrRefCount()/decrRefCount() */
} robj;

/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * The expansion intentionally has no trailing semicolon: the caller writes
 * it, so the macro behaves as a single statement, including in an unbraced
 * if/else. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
} while(0)
226
227 typedef struct redisDb {
228 dict *dict;
229 dict *expires;
230 int id;
231 } redisDb;
232
233 /* With multiplexing we need to take per-clinet state.
234 * Clients are taken in a liked list. */
235 typedef struct redisClient {
236 int fd;
237 redisDb *db;
238 int dictid;
239 sds querybuf;
240 robj **argv, **mbargv;
241 int argc, mbargc;
242 int bulklen; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk; /* multi bulk command format active */
244 list *reply;
245 int sentlen;
246 time_t lastinteraction; /* time of the last interaction, used for timeout */
247 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb; /* slave selected db, if this client is a slave */
249 int authenticated; /* when requirepass is non-NULL */
250 int replstate; /* replication state if this is a slave */
251 int repldbfd; /* replication DB file descriptor */
252 long repldboff; /* replication DB file offset */
253 off_t repldbsize; /* replication DB file size */
254 } redisClient;
255
256 struct saveparam {
257 time_t seconds;
258 int changes;
259 };
260
261 /* Global server state structure */
262 struct redisServer {
263 int port;
264 int fd;
265 redisDb *db;
266 dict *sharingpool;
267 unsigned int sharingpoolsize;
268 long long dirty; /* changes to DB from the last save */
269 list *clients;
270 list *slaves, *monitors;
271 char neterr[ANET_ERR_LEN];
272 aeEventLoop *el;
273 int cronloops; /* number of times the cron function run */
274 list *objfreelist; /* A list of freed objects to avoid malloc() */
275 time_t lastsave; /* Unix time of last save succeeede */
276 size_t usedmemory; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime; /* server start time */
279 long long stat_numcommands; /* number of processed commands */
280 long long stat_numconnections; /* number of connections received */
281 /* Configuration */
282 int verbosity;
283 int glueoutputbuf;
284 int maxidletime;
285 int dbnum;
286 int daemonize;
287 int appendonly;
288 int appendfsync;
289 time_t lastfsync;
290 int appendfd;
291 int appendseldb;
292 char *pidfile;
293 pid_t bgsavechildpid;
294 pid_t bgrewritechildpid;
295 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam *saveparams;
297 int saveparamslen;
298 char *logfile;
299 char *bindaddr;
300 char *dbfilename;
301 char *appendfilename;
302 char *requirepass;
303 int shareobjects;
304 int rdbcompression;
305 /* Replication related */
306 int isslave;
307 char *masterauth;
308 char *masterhost;
309 int masterport;
310 redisClient *master; /* client that is master for this slave */
311 int replstate;
312 unsigned int maxclients;
313 unsigned long maxmemory;
314 /* Sort parameters - qsort_r() is only available under BSD so we
315 * have to take this state global, in order to pass it to sortCompare() */
316 int sort_desc;
317 int sort_alpha;
318 int sort_bypattern;
319 };
320
321 typedef void redisCommandProc(redisClient *c);
322 struct redisCommand {
323 char *name;
324 redisCommandProc *proc;
325 int arity;
326 int flags;
327 };
328
329 struct redisFunctionSym {
330 char *name;
331 unsigned long pointer;
332 };
333
334 typedef struct _redisSortObject {
335 robj *obj;
336 union {
337 double score;
338 robj *cmpobj;
339 } u;
340 } redisSortObject;
341
342 typedef struct _redisSortOperation {
343 int type;
344 robj *pattern;
345 } redisSortOperation;
346
347 /* ZSETs use a specialized version of Skiplists */
348
349 typedef struct zskiplistNode {
350 struct zskiplistNode **forward;
351 struct zskiplistNode *backward;
352 double score;
353 robj *obj;
354 } zskiplistNode;
355
356 typedef struct zskiplist {
357 struct zskiplistNode *header, *tail;
358 unsigned long length;
359 int level;
360 } zskiplist;
361
362 typedef struct zset {
363 dict *dict;
364 zskiplist *zsl;
365 } zset;
366
367 /* Our shared "common" objects */
368
369 struct sharedObjectsStruct {
370 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
371 *colon, *nullbulk, *nullmultibulk,
372 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
373 *outofrangeerr, *plus,
374 *select0, *select1, *select2, *select3, *select4,
375 *select5, *select6, *select7, *select8, *select9;
376 } shared;
377
378 /* Global vars that are actally used as constants. The following double
379 * values are used for double on-disk serialization, and are initialized
380 * at runtime to avoid strange compiler optimizations. */
381
382 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
383
384 /*================================ Prototypes =============================== */
385
386 static void freeStringObject(robj *o);
387 static void freeListObject(robj *o);
388 static void freeSetObject(robj *o);
389 static void decrRefCount(void *o);
390 static robj *createObject(int type, void *ptr);
391 static void freeClient(redisClient *c);
392 static int rdbLoad(char *filename);
393 static void addReply(redisClient *c, robj *obj);
394 static void addReplySds(redisClient *c, sds s);
395 static void incrRefCount(robj *o);
396 static int rdbSaveBackground(char *filename);
397 static robj *createStringObject(char *ptr, size_t len);
398 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
399 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
400 static int syncWithMaster(void);
401 static robj *tryObjectSharing(robj *o);
402 static int tryObjectEncoding(robj *o);
403 static robj *getDecodedObject(robj *o);
404 static int removeExpire(redisDb *db, robj *key);
405 static int expireIfNeeded(redisDb *db, robj *key);
406 static int deleteIfVolatile(redisDb *db, robj *key);
407 static int deleteKey(redisDb *db, robj *key);
408 static time_t getExpire(redisDb *db, robj *key);
409 static int setExpire(redisDb *db, robj *key, time_t when);
410 static void updateSlavesWaitingBgsave(int bgsaveerr);
411 static void freeMemoryIfNeeded(void);
412 static int processCommand(redisClient *c);
413 static void setupSigSegvAction(void);
414 static void rdbRemoveTempFile(pid_t childpid);
415 static void aofRemoveTempFile(pid_t childpid);
416 static size_t stringObjectLen(robj *o);
417 static void processInputBuffer(redisClient *c);
418 static zskiplist *zslCreate(void);
419 static void zslFree(zskiplist *zsl);
420 static void zslInsert(zskiplist *zsl, double score, robj *obj);
421 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
422
423 static void authCommand(redisClient *c);
424 static void pingCommand(redisClient *c);
425 static void echoCommand(redisClient *c);
426 static void setCommand(redisClient *c);
427 static void setnxCommand(redisClient *c);
428 static void getCommand(redisClient *c);
429 static void delCommand(redisClient *c);
430 static void existsCommand(redisClient *c);
431 static void incrCommand(redisClient *c);
432 static void decrCommand(redisClient *c);
433 static void incrbyCommand(redisClient *c);
434 static void decrbyCommand(redisClient *c);
435 static void selectCommand(redisClient *c);
436 static void randomkeyCommand(redisClient *c);
437 static void keysCommand(redisClient *c);
438 static void dbsizeCommand(redisClient *c);
439 static void lastsaveCommand(redisClient *c);
440 static void saveCommand(redisClient *c);
441 static void bgsaveCommand(redisClient *c);
442 static void bgrewriteaofCommand(redisClient *c);
443 static void shutdownCommand(redisClient *c);
444 static void moveCommand(redisClient *c);
445 static void renameCommand(redisClient *c);
446 static void renamenxCommand(redisClient *c);
447 static void lpushCommand(redisClient *c);
448 static void rpushCommand(redisClient *c);
449 static void lpopCommand(redisClient *c);
450 static void rpopCommand(redisClient *c);
451 static void llenCommand(redisClient *c);
452 static void lindexCommand(redisClient *c);
453 static void lrangeCommand(redisClient *c);
454 static void ltrimCommand(redisClient *c);
455 static void typeCommand(redisClient *c);
456 static void lsetCommand(redisClient *c);
457 static void saddCommand(redisClient *c);
458 static void sremCommand(redisClient *c);
459 static void smoveCommand(redisClient *c);
460 static void sismemberCommand(redisClient *c);
461 static void scardCommand(redisClient *c);
462 static void spopCommand(redisClient *c);
463 static void srandmemberCommand(redisClient *c);
464 static void sinterCommand(redisClient *c);
465 static void sinterstoreCommand(redisClient *c);
466 static void sunionCommand(redisClient *c);
467 static void sunionstoreCommand(redisClient *c);
468 static void sdiffCommand(redisClient *c);
469 static void sdiffstoreCommand(redisClient *c);
470 static void syncCommand(redisClient *c);
471 static void flushdbCommand(redisClient *c);
472 static void flushallCommand(redisClient *c);
473 static void sortCommand(redisClient *c);
474 static void lremCommand(redisClient *c);
475 static void rpoplpushcommand(redisClient *c);
476 static void infoCommand(redisClient *c);
477 static void mgetCommand(redisClient *c);
478 static void monitorCommand(redisClient *c);
479 static void expireCommand(redisClient *c);
480 static void expireatCommand(redisClient *c);
481 static void getsetCommand(redisClient *c);
482 static void ttlCommand(redisClient *c);
483 static void slaveofCommand(redisClient *c);
484 static void debugCommand(redisClient *c);
485 static void msetCommand(redisClient *c);
486 static void msetnxCommand(redisClient *c);
487 static void zaddCommand(redisClient *c);
488 static void zincrbyCommand(redisClient *c);
489 static void zrangeCommand(redisClient *c);
490 static void zrangebyscoreCommand(redisClient *c);
491 static void zrevrangeCommand(redisClient *c);
492 static void zcardCommand(redisClient *c);
493 static void zremCommand(redisClient *c);
494 static void zscoreCommand(redisClient *c);
495 static void zremrangebyscoreCommand(redisClient *c);
496
497 /*================================= Globals ================================= */
498
499 /* Global vars */
500 static struct redisServer server; /* server global state */
501 static struct redisCommand cmdTable[] = {
502 {"get",getCommand,2,REDIS_CMD_INLINE},
503 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
504 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
505 {"del",delCommand,-2,REDIS_CMD_INLINE},
506 {"exists",existsCommand,2,REDIS_CMD_INLINE},
507 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
508 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
509 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
510 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
511 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
512 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
513 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
514 {"llen",llenCommand,2,REDIS_CMD_INLINE},
515 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
516 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
517 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
518 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
519 {"lrem",lremCommand,4,REDIS_CMD_BULK},
520 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
522 {"srem",sremCommand,3,REDIS_CMD_BULK},
523 {"smove",smoveCommand,4,REDIS_CMD_BULK},
524 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
525 {"scard",scardCommand,2,REDIS_CMD_INLINE},
526 {"spop",spopCommand,2,REDIS_CMD_INLINE},
527 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
528 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
531 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
532 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
533 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
534 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
535 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
536 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
537 {"zrem",zremCommand,3,REDIS_CMD_BULK},
538 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
539 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
540 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
541 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
542 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
543 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
544 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
545 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
546 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
547 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
548 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
549 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
550 {"select",selectCommand,2,REDIS_CMD_INLINE},
551 {"move",moveCommand,3,REDIS_CMD_INLINE},
552 {"rename",renameCommand,3,REDIS_CMD_INLINE},
553 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
554 {"expire",expireCommand,3,REDIS_CMD_INLINE},
555 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
556 {"keys",keysCommand,2,REDIS_CMD_INLINE},
557 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
558 {"auth",authCommand,2,REDIS_CMD_INLINE},
559 {"ping",pingCommand,1,REDIS_CMD_INLINE},
560 {"echo",echoCommand,2,REDIS_CMD_BULK},
561 {"save",saveCommand,1,REDIS_CMD_INLINE},
562 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
563 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
564 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
565 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
566 {"type",typeCommand,2,REDIS_CMD_INLINE},
567 {"sync",syncCommand,1,REDIS_CMD_INLINE},
568 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
569 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
570 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
571 {"info",infoCommand,1,REDIS_CMD_INLINE},
572 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
573 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
574 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
575 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
576 {NULL,NULL,0,0}
577 };
578
/*============================ Utility functions ============================ */

/* Glob-style pattern matching.
 *
 * Returns 1 if 'string' (stringLen bytes) matches 'pattern' (patternLen
 * bytes), 0 otherwise. Supported syntax:
 *   *       any sequence of characters, including the empty one
 *   ?       exactly one character
 *   [abc]   one character from the set; [^abc] negates it, [a-z] is a range
 *   \x      the literal character x
 * When 'nocase' is non-zero, literal characters and ranges are compared
 * case-insensitively (escaped characters inside [...] are not).
 *
 * Both buffers are only accessed within their declared lengths, so they do
 * not need to be NUL-terminated. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of consecutive stars. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int negate, match;

            /* A character class (even a negated one) must consume exactly
             * one character, so it can never match an empty string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            negate = patternLen > 0 && pattern[0] == '^';
            if (negate) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back onto the last pattern
                     * byte so the increment at the bottom of the outer loop
                     * leaves patternLen at exactly 0. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Compare as unsigned char: ranges must not depend on
                     * the platform's char signedness, and tolower() needs
                     * a non-negative argument. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (negate)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match: pattern needs a char, none left */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Trailing stars can still match the empty remainder. */
            while(patternLen > 0 && pattern[0] == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
703
704 static void redisLog(int level, const char *fmt, ...) {
705 va_list ap;
706 FILE *fp;
707
708 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
709 if (!fp) return;
710
711 va_start(ap, fmt);
712 if (level >= server.verbosity) {
713 char *c = ".-*";
714 char buf[64];
715 time_t now;
716
717 now = time(NULL);
718 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
719 fprintf(fp,"%s %c ",buf,c[level]);
720 vfprintf(fp, fmt, ap);
721 fprintf(fp,"\n");
722 fflush(fp);
723 }
724 va_end(ap);
725
726 if (server.logfile) fclose(fp);
727 }
728
/*====================== Hash table type implementation ==================== */

/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */

/* Value destructor for plain zmalloc()'d values, such as the
 * malloc(sizeof(double)) scores held by zsetDictType: just free them. */
static void dictVanillaFree(void *privdata, void *ptr)
{
    DICT_NOTUSED(privdata);
    zfree(ptr);
}
740
741 static int sdsDictKeyCompare(void *privdata, const void *key1,
742 const void *key2)
743 {
744 int l1,l2;
745 DICT_NOTUSED(privdata);
746
747 l1 = sdslen((sds)key1);
748 l2 = sdslen((sds)key2);
749 if (l1 != l2) return 0;
750 return memcmp(key1, key2, l1) == 0;
751 }
752
/* Dict destructor for keys/values that are Redis objects: release the
 * dict's reference with decrRefCount(). */
static void dictRedisObjectDestructor(void *privdata, void *obj)
{
    DICT_NOTUSED(privdata);
    decrRefCount(obj);
}
759
760 static int dictObjKeyCompare(void *privdata, const void *key1,
761 const void *key2)
762 {
763 const robj *o1 = key1, *o2 = key2;
764 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
765 }
766
767 static unsigned int dictObjHash(const void *key) {
768 const robj *o = key;
769 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
770 }
771
772 static int dictEncObjKeyCompare(void *privdata, const void *key1,
773 const void *key2)
774 {
775 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
776 int cmp;
777
778 o1 = getDecodedObject(o1);
779 o2 = getDecodedObject(o2);
780 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
781 decrRefCount(o1);
782 decrRefCount(o2);
783 return cmp;
784 }
785
786 static unsigned int dictEncObjHash(const void *key) {
787 robj *o = (robj*) key;
788
789 o = getDecodedObject(o);
790 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
791 decrRefCount(o);
792 return hash;
793 }
794
795 static dictType setDictType = {
796 dictEncObjHash, /* hash function */
797 NULL, /* key dup */
798 NULL, /* val dup */
799 dictEncObjKeyCompare, /* key compare */
800 dictRedisObjectDestructor, /* key destructor */
801 NULL /* val destructor */
802 };
803
804 static dictType zsetDictType = {
805 dictEncObjHash, /* hash function */
806 NULL, /* key dup */
807 NULL, /* val dup */
808 dictEncObjKeyCompare, /* key compare */
809 dictRedisObjectDestructor, /* key destructor */
810 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
811 };
812
813 static dictType hashDictType = {
814 dictObjHash, /* hash function */
815 NULL, /* key dup */
816 NULL, /* val dup */
817 dictObjKeyCompare, /* key compare */
818 dictRedisObjectDestructor, /* key destructor */
819 dictRedisObjectDestructor /* val destructor */
820 };
821
822 /* ========================= Random utility functions ======================= */
823
824 /* Redis generally does not try to recover from out of memory conditions
825 * when allocating objects or strings, it is not clear if it will be possible
826 * to report this condition to the client since the networking layer itself
827 * is based on heap allocation for send buffers, so we simply abort.
828 * At least the code will be simpler to read... */
829 static void oom(const char *msg) {
830 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
831 sleep(1);
832 abort();
833 }
834
835 /* ====================== Redis server networking stuff ===================== */
836 static void closeTimedoutClients(void) {
837 redisClient *c;
838 listNode *ln;
839 time_t now = time(NULL);
840
841 listRewind(server.clients);
842 while ((ln = listYield(server.clients)) != NULL) {
843 c = listNodeValue(ln);
844 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
845 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
846 (now - c->lastinteraction > server.maxidletime)) {
847 redisLog(REDIS_DEBUG,"Closing idle client");
848 freeClient(c);
849 }
850 }
851 }
852
853 static int htNeedsResize(dict *dict) {
854 long long size, used;
855
856 size = dictSlots(dict);
857 used = dictSize(dict);
858 return (size && used && size > DICT_HT_INITIAL_SIZE &&
859 (used*100/size < REDIS_HT_MINFILL));
860 }
861
862 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
863 * we resize the hash table to save memory */
864 static void tryResizeHashTables(void) {
865 int j;
866
867 for (j = 0; j < server.dbnum; j++) {
868 if (htNeedsResize(server.db[j].dict)) {
869 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
870 dictResize(server.db[j].dict);
871 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
872 }
873 if (htNeedsResize(server.db[j].expires))
874 dictResize(server.db[j].expires);
875 }
876 }
877
878 /* A background saving child (BGSAVE) terminated its work. Handle this. */
879 void backgroundSaveDoneHandler(int statloc) {
880 int exitcode = WEXITSTATUS(statloc);
881 int bysignal = WIFSIGNALED(statloc);
882
883 if (!bysignal && exitcode == 0) {
884 redisLog(REDIS_NOTICE,
885 "Background saving terminated with success");
886 server.dirty = 0;
887 server.lastsave = time(NULL);
888 } else if (!bysignal && exitcode != 0) {
889 redisLog(REDIS_WARNING, "Background saving error");
890 } else {
891 redisLog(REDIS_WARNING,
892 "Background saving terminated by signal");
893 rdbRemoveTempFile(server.bgsavechildpid);
894 }
895 server.bgsavechildpid = -1;
896 /* Possibly there are slaves waiting for a BGSAVE in order to be served
897 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
898 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
899 }
900
901 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
902 * Handle this. */
903 void backgroundRewriteDoneHandler(int statloc) {
904 int exitcode = WEXITSTATUS(statloc);
905 int bysignal = WIFSIGNALED(statloc);
906
907 if (!bysignal && exitcode == 0) {
908 int fd;
909 char tmpfile[256];
910
911 redisLog(REDIS_NOTICE,
912 "Background append only file rewriting terminated with success");
913 /* Now it's time to flush the differences accumulated by the parent */
914 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
915 fd = open(tmpfile,O_WRONLY|O_APPEND);
916 if (fd == -1) {
917 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
918 goto cleanup;
919 }
920 /* Flush our data... */
921 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
922 (signed) sdslen(server.bgrewritebuf)) {
923 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
924 close(fd);
925 goto cleanup;
926 }
927 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
928 /* Now our work is to rename the temp file into the stable file. And
929 * switch the file descriptor used by the server for append only. */
930 if (rename(tmpfile,server.appendfilename) == -1) {
931 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
932 close(fd);
933 goto cleanup;
934 }
935 /* Mission completed... almost */
936 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
937 if (server.appendfd != -1) {
938 /* If append only is actually enabled... */
939 close(server.appendfd);
940 server.appendfd = fd;
941 fsync(fd);
942 server.appendseldb = -1; /* Make sure it will issue SELECT */
943 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
944 } else {
945 /* If append only is disabled we just generate a dump in this
946 * format. Why not? */
947 close(fd);
948 }
949 } else if (!bysignal && exitcode != 0) {
950 redisLog(REDIS_WARNING, "Background append only file rewriting error");
951 } else {
952 redisLog(REDIS_WARNING,
953 "Background append only file rewriting terminated by signal");
954 }
955 cleanup:
956 sdsfree(server.bgrewritebuf);
957 server.bgrewritebuf = sdsempty();
958 aofRemoveTempFile(server.bgrewritechildpid);
959 server.bgrewritechildpid = -1;
960 }
961
962 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
963 int j, loops = server.cronloops++;
964 REDIS_NOTUSED(eventLoop);
965 REDIS_NOTUSED(id);
966 REDIS_NOTUSED(clientData);
967
968 /* Update the global state with the amount of used memory */
969 server.usedmemory = zmalloc_used_memory();
970
971 /* Show some info about non-empty databases */
972 for (j = 0; j < server.dbnum; j++) {
973 long long size, used, vkeys;
974
975 size = dictSlots(server.db[j].dict);
976 used = dictSize(server.db[j].dict);
977 vkeys = dictSize(server.db[j].expires);
978 if (!(loops % 5) && (used || vkeys)) {
979 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
980 /* dictPrintStats(server.dict); */
981 }
982 }
983
984 /* We don't want to resize the hash tables while a bacground saving
985 * is in progress: the saving child is created using fork() that is
986 * implemented with a copy-on-write semantic in most modern systems, so
987 * if we resize the HT while there is the saving child at work actually
988 * a lot of memory movements in the parent will cause a lot of pages
989 * copied. */
990 if (server.bgsavechildpid == -1) tryResizeHashTables();
991
992 /* Show information about connected clients */
993 if (!(loops % 5)) {
994 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
995 listLength(server.clients)-listLength(server.slaves),
996 listLength(server.slaves),
997 server.usedmemory,
998 dictSize(server.sharingpool));
999 }
1000
1001 /* Close connections of timedout clients */
1002 if (server.maxidletime && !(loops % 10))
1003 closeTimedoutClients();
1004
1005 /* Check if a background saving or AOF rewrite in progress terminated */
1006 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1007 int statloc;
1008 pid_t pid;
1009
1010 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1011 if (pid == server.bgsavechildpid) {
1012 backgroundSaveDoneHandler(statloc);
1013 } else {
1014 backgroundRewriteDoneHandler(statloc);
1015 }
1016 }
1017 } else {
1018 /* If there is not a background saving in progress check if
1019 * we have to save now */
1020 time_t now = time(NULL);
1021 for (j = 0; j < server.saveparamslen; j++) {
1022 struct saveparam *sp = server.saveparams+j;
1023
1024 if (server.dirty >= sp->changes &&
1025 now-server.lastsave > sp->seconds) {
1026 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1027 sp->changes, sp->seconds);
1028 rdbSaveBackground(server.dbfilename);
1029 break;
1030 }
1031 }
1032 }
1033
1034 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1035 * will use few CPU cycles if there are few expiring keys, otherwise
1036 * it will get more aggressive to avoid that too much memory is used by
1037 * keys that can be removed from the keyspace. */
1038 for (j = 0; j < server.dbnum; j++) {
1039 int expired;
1040 redisDb *db = server.db+j;
1041
1042 /* Continue to expire if at the end of the cycle more than 25%
1043 * of the keys were expired. */
1044 do {
1045 int num = dictSize(db->expires);
1046 time_t now = time(NULL);
1047
1048 expired = 0;
1049 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1050 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1051 while (num--) {
1052 dictEntry *de;
1053 time_t t;
1054
1055 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1056 t = (time_t) dictGetEntryVal(de);
1057 if (now > t) {
1058 deleteKey(db,dictGetEntryKey(de));
1059 expired++;
1060 }
1061 }
1062 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1063 }
1064
1065 /* Check if we should connect to a MASTER */
1066 if (server.replstate == REDIS_REPL_CONNECT) {
1067 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1068 if (syncWithMaster() == REDIS_OK) {
1069 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1070 }
1071 }
1072 return 1000;
1073 }
1074
1075 static void createSharedObjects(void) {
1076 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1077 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1078 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1079 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1080 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1081 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1082 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1083 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1084 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1085 /* no such key */
1086 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1087 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1088 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1089 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1090 "-ERR no such key\r\n"));
1091 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1092 "-ERR syntax error\r\n"));
1093 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1094 "-ERR source and destination objects are the same\r\n"));
1095 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1096 "-ERR index out of range\r\n"));
1097 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1098 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1099 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1100 shared.select0 = createStringObject("select 0\r\n",10);
1101 shared.select1 = createStringObject("select 1\r\n",10);
1102 shared.select2 = createStringObject("select 2\r\n",10);
1103 shared.select3 = createStringObject("select 3\r\n",10);
1104 shared.select4 = createStringObject("select 4\r\n",10);
1105 shared.select5 = createStringObject("select 5\r\n",10);
1106 shared.select6 = createStringObject("select 6\r\n",10);
1107 shared.select7 = createStringObject("select 7\r\n",10);
1108 shared.select8 = createStringObject("select 8\r\n",10);
1109 shared.select9 = createStringObject("select 9\r\n",10);
1110 }
1111
1112 static void appendServerSaveParams(time_t seconds, int changes) {
1113 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1114 server.saveparams[server.saveparamslen].seconds = seconds;
1115 server.saveparams[server.saveparamslen].changes = changes;
1116 server.saveparamslen++;
1117 }
1118
1119 static void resetServerSaveParams() {
1120 zfree(server.saveparams);
1121 server.saveparams = NULL;
1122 server.saveparamslen = 0;
1123 }
1124
1125 static void initServerConfig() {
1126 server.dbnum = REDIS_DEFAULT_DBNUM;
1127 server.port = REDIS_SERVERPORT;
1128 server.verbosity = REDIS_DEBUG;
1129 server.maxidletime = REDIS_MAXIDLETIME;
1130 server.saveparams = NULL;
1131 server.logfile = NULL; /* NULL = log on standard output */
1132 server.bindaddr = NULL;
1133 server.glueoutputbuf = 1;
1134 server.daemonize = 0;
1135 server.appendonly = 0;
1136 server.appendfsync = APPENDFSYNC_ALWAYS;
1137 server.lastfsync = time(NULL);
1138 server.appendfd = -1;
1139 server.appendseldb = -1; /* Make sure the first time will not match */
1140 server.pidfile = "/var/run/redis.pid";
1141 server.dbfilename = "dump.rdb";
1142 server.appendfilename = "appendonly.aof";
1143 server.requirepass = NULL;
1144 server.shareobjects = 0;
1145 server.rdbcompression = 1;
1146 server.sharingpoolsize = 1024;
1147 server.maxclients = 0;
1148 server.maxmemory = 0;
1149 resetServerSaveParams();
1150
1151 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1152 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1153 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1154 /* Replication related */
1155 server.isslave = 0;
1156 server.masterauth = NULL;
1157 server.masterhost = NULL;
1158 server.masterport = 6379;
1159 server.master = NULL;
1160 server.replstate = REDIS_REPL_NONE;
1161
1162 /* Double constants initialization */
1163 R_Zero = 0.0;
1164 R_PosInf = 1.0/R_Zero;
1165 R_NegInf = -1.0/R_Zero;
1166 R_Nan = R_Zero/R_Zero;
1167 }
1168
1169 static void initServer() {
1170 int j;
1171
1172 signal(SIGHUP, SIG_IGN);
1173 signal(SIGPIPE, SIG_IGN);
1174 setupSigSegvAction();
1175
1176 server.clients = listCreate();
1177 server.slaves = listCreate();
1178 server.monitors = listCreate();
1179 server.objfreelist = listCreate();
1180 createSharedObjects();
1181 server.el = aeCreateEventLoop();
1182 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1183 server.sharingpool = dictCreate(&setDictType,NULL);
1184 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1185 if (server.fd == -1) {
1186 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1187 exit(1);
1188 }
1189 for (j = 0; j < server.dbnum; j++) {
1190 server.db[j].dict = dictCreate(&hashDictType,NULL);
1191 server.db[j].expires = dictCreate(&setDictType,NULL);
1192 server.db[j].id = j;
1193 }
1194 server.cronloops = 0;
1195 server.bgsavechildpid = -1;
1196 server.bgrewritechildpid = -1;
1197 server.bgrewritebuf = sdsempty();
1198 server.lastsave = time(NULL);
1199 server.dirty = 0;
1200 server.usedmemory = 0;
1201 server.stat_numcommands = 0;
1202 server.stat_numconnections = 0;
1203 server.stat_starttime = time(NULL);
1204 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1205
1206 if (server.appendonly) {
1207 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1208 if (server.appendfd == -1) {
1209 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1210 strerror(errno));
1211 exit(1);
1212 }
1213 }
1214 }
1215
1216 /* Empty the whole database */
1217 static long long emptyDb() {
1218 int j;
1219 long long removed = 0;
1220
1221 for (j = 0; j < server.dbnum; j++) {
1222 removed += dictSize(server.db[j].dict);
1223 dictEmpty(server.db[j].dict);
1224 dictEmpty(server.db[j].expires);
1225 }
1226 return removed;
1227 }
1228
/* Parse a config boolean, ignoring case: "yes" -> 1, "no" -> 0, and
 * anything else -> -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1234
/* I agree, this is a very rudimentary way to load a configuration...
   will improve later if the config gets more complex.

   Read configuration directives from 'filename' ("-" means standard input)
   and apply them to the global 'server' structure.

   Format: one directive per line. Leading and trailing blanks are trimmed;
   lines starting with '#' and empty lines are ignored. Arguments are
   separated by single spaces, and the directive name is case-insensitive.

   On an unreadable file, an unknown directive, a wrong argument count or
   an invalid value, the process prints a diagnostic that includes the line
   number and exits with status 1. */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    /* NOTE(review): lines longer than REDIS_CONFIGLINE_MAX are returned by
     * fgets() in pieces, and each piece is parsed as its own line. */
    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines */
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives. Each branch checks the exact argument
         * count; a directive with the wrong count falls through to the
         * final "Bad directive" error. */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            /* Each "save" line adds one rule to the existing list. */
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            /* Changes the process working directory immediately, so later
             * relative paths (dump file, log file, ...) resolve from it. */
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* "stdout" (any case) selects standard output, i.e. a NULL
             * logfile. */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            /* Only records the master; serverCron() performs the actual
             * connection while replstate is REDIS_REPL_CONNECT. */
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
            server.masterauth = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
            if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
            if (!strcasecmp(argv[1],"no")) {
                server.appendfsync = APPENDFSYNC_NO;
            } else if (!strcasecmp(argv[1],"always")) {
                server.appendfsync = APPENDFSYNC_ALWAYS;
            } else if (!strcasecmp(argv[1],"everysec")) {
                server.appendfsync = APPENDFSYNC_EVERYSEC;
            } else {
                err = "argument must be 'no', 'always' or 'everysec'";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        /* Directive applied: release this line's temporary strings. */
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

loaderr:
    /* Fatal: report the offending line and the reason, then exit. Nothing
     * is freed because the process terminates right away. */
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1398
1399 static void freeClientArgv(redisClient *c) {
1400 int j;
1401
1402 for (j = 0; j < c->argc; j++)
1403 decrRefCount(c->argv[j]);
1404 for (j = 0; j < c->mbargc; j++)
1405 decrRefCount(c->mbargv[j]);
1406 c->argc = 0;
1407 c->mbargc = 0;
1408 }
1409
1410 static void freeClient(redisClient *c) {
1411 listNode *ln;
1412
1413 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1414 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1415 sdsfree(c->querybuf);
1416 listRelease(c->reply);
1417 freeClientArgv(c);
1418 close(c->fd);
1419 ln = listSearchKey(server.clients,c);
1420 redisAssert(ln != NULL);
1421 listDelNode(server.clients,ln);
1422 if (c->flags & REDIS_SLAVE) {
1423 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1424 close(c->repldbfd);
1425 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1426 ln = listSearchKey(l,c);
1427 redisAssert(ln != NULL);
1428 listDelNode(l,ln);
1429 }
1430 if (c->flags & REDIS_MASTER) {
1431 server.master = NULL;
1432 server.replstate = REDIS_REPL_CONNECT;
1433 }
1434 zfree(c->argv);
1435 zfree(c->mbargv);
1436 zfree(c);
1437 }
1438
1439 #define GLUEREPLY_UP_TO (1024)
1440 static void glueReplyBuffersIfNeeded(redisClient *c) {
1441 int copylen = 0;
1442 char buf[GLUEREPLY_UP_TO];
1443 listNode *ln;
1444 robj *o;
1445
1446 listRewind(c->reply);
1447 while((ln = listYield(c->reply))) {
1448 int objlen;
1449
1450 o = ln->value;
1451 objlen = sdslen(o->ptr);
1452 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1453 memcpy(buf+copylen,o->ptr,objlen);
1454 copylen += objlen;
1455 listDelNode(c->reply,ln);
1456 } else {
1457 if (copylen == 0) return;
1458 break;
1459 }
1460 }
1461 /* Now the output buffer is empty, add the new single element */
1462 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1463 listAddNodeHead(c->reply,o);
1464 }
1465
1466 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1467 redisClient *c = privdata;
1468 int nwritten = 0, totwritten = 0, objlen;
1469 robj *o;
1470 REDIS_NOTUSED(el);
1471 REDIS_NOTUSED(mask);
1472
1473 /* Use writev() if we have enough buffers to send */
1474 if (!server.glueoutputbuf &&
1475 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1476 !(c->flags & REDIS_MASTER))
1477 {
1478 sendReplyToClientWritev(el, fd, privdata, mask);
1479 return;
1480 }
1481
1482 while(listLength(c->reply)) {
1483 if (server.glueoutputbuf && listLength(c->reply) > 1)
1484 glueReplyBuffersIfNeeded(c);
1485
1486 o = listNodeValue(listFirst(c->reply));
1487 objlen = sdslen(o->ptr);
1488
1489 if (objlen == 0) {
1490 listDelNode(c->reply,listFirst(c->reply));
1491 continue;
1492 }
1493
1494 if (c->flags & REDIS_MASTER) {
1495 /* Don't reply to a master */
1496 nwritten = objlen - c->sentlen;
1497 } else {
1498 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1499 if (nwritten <= 0) break;
1500 }
1501 c->sentlen += nwritten;
1502 totwritten += nwritten;
1503 /* If we fully sent the object on head go to the next one */
1504 if (c->sentlen == objlen) {
1505 listDelNode(c->reply,listFirst(c->reply));
1506 c->sentlen = 0;
1507 }
1508 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1509 * bytes, in a single threaded server it's a good idea to serve
1510 * other clients as well, even if a very large request comes from
1511 * super fast link that is always able to accept data (in real world
1512 * scenario think about 'KEYS *' against the loopback interfae) */
1513 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1514 }
1515 if (nwritten == -1) {
1516 if (errno == EAGAIN) {
1517 nwritten = 0;
1518 } else {
1519 redisLog(REDIS_DEBUG,
1520 "Error writing to client: %s", strerror(errno));
1521 freeClient(c);
1522 return;
1523 }
1524 }
1525 if (totwritten > 0) c->lastinteraction = time(NULL);
1526 if (listLength(c->reply) == 0) {
1527 c->sentlen = 0;
1528 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1529 }
1530 }
1531
1532 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1533 {
1534 redisClient *c = privdata;
1535 int nwritten = 0, totwritten = 0, objlen, willwrite;
1536 robj *o;
1537 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1538 int offset, ion = 0;
1539 REDIS_NOTUSED(el);
1540 REDIS_NOTUSED(mask);
1541
1542 listNode *node;
1543 while (listLength(c->reply)) {
1544 offset = c->sentlen;
1545 ion = 0;
1546 willwrite = 0;
1547
1548 /* fill-in the iov[] array */
1549 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1550 o = listNodeValue(node);
1551 objlen = sdslen(o->ptr);
1552
1553 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1554 break;
1555
1556 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1557 break; /* no more iovecs */
1558
1559 iov[ion].iov_base = ((char*)o->ptr) + offset;
1560 iov[ion].iov_len = objlen - offset;
1561 willwrite += objlen - offset;
1562 offset = 0; /* just for the first item */
1563 ion++;
1564 }
1565
1566 if(willwrite == 0)
1567 break;
1568
1569 /* write all collected blocks at once */
1570 if((nwritten = writev(fd, iov, ion)) < 0) {
1571 if (errno != EAGAIN) {
1572 redisLog(REDIS_DEBUG,
1573 "Error writing to client: %s", strerror(errno));
1574 freeClient(c);
1575 return;
1576 }
1577 break;
1578 }
1579
1580 totwritten += nwritten;
1581 offset = c->sentlen;
1582
1583 /* remove written robjs from c->reply */
1584 while (nwritten && listLength(c->reply)) {
1585 o = listNodeValue(listFirst(c->reply));
1586 objlen = sdslen(o->ptr);
1587
1588 if(nwritten >= objlen - offset) {
1589 listDelNode(c->reply, listFirst(c->reply));
1590 nwritten -= objlen - offset;
1591 c->sentlen = 0;
1592 } else {
1593 /* partial write */
1594 c->sentlen += nwritten;
1595 break;
1596 }
1597 offset = 0;
1598 }
1599 }
1600
1601 if (totwritten > 0)
1602 c->lastinteraction = time(NULL);
1603
1604 if (listLength(c->reply) == 0) {
1605 c->sentlen = 0;
1606 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1607 }
1608 }
1609
1610 static struct redisCommand *lookupCommand(char *name) {
1611 int j = 0;
1612 while(cmdTable[j].name != NULL) {
1613 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1614 j++;
1615 }
1616 return NULL;
1617 }
1618
1619 /* resetClient prepare the client to process the next command */
1620 static void resetClient(redisClient *c) {
1621 freeClientArgv(c);
1622 c->bulklen = -1;
1623 c->multibulk = 0;
1624 }
1625
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * It is also the state machine of the multi-bulk protocol:
 * - the "*<count>" header,
 * - each "$<len>" line,
 * - each bulk payload
 * arrive here as separate calls and are accumulated in mbargv until
 * the full command is available.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
static int processCommand(redisClient *c) {
    struct redisCommand *cmd;
    long long dirty;

    /* Free some memory if needed (maxmemory setting) */
    if (server.maxmemory) freeMemoryIfNeeded();

    /* Handle the multi bulk command type. This is an alternative protocol
     * supported by Redis in order to receive commands that are composed of
     * multiple binary-safe "bulk" arguments. The latency of processing is
     * a bit higher but this allows things like multi-sets, so if this
     * protocol is used only for MSET and similar commands this is a big win. */
    if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
        /* "*<count>" header: remember how many bulk arguments follow and
         * drop the header itself. A non-positive count just resets the
         * client, without any reply. */
        c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
        if (c->multibulk <= 0) {
            resetClient(c);
            return 1;
        } else {
            decrRefCount(c->argv[c->argc-1]);
            c->argc--;
            return 1;
        }
    } else if (c->multibulk) {
        if (c->bulklen == -1) {
            /* Expecting a "$<len>" line announcing the next argument. */
            if (((char*)c->argv[0]->ptr)[0] != '$') {
                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
                resetClient(c);
                return 1;
            } else {
                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
                decrRefCount(c->argv[0]);
                if (bulklen < 0 || bulklen > 1024*1024*1024) {
                    c->argc--;
                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
                    resetClient(c);
                    return 1;
                }
                c->argc--;
                c->bulklen = bulklen+2; /* add two bytes for CR+LF */
                return 1;
            }
        } else {
            /* argv[0] holds the bulk payload just read (presumably by
             * processInputBuffer(); confirm). Move it into mbargv. */
            c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
            c->mbargv[c->mbargc] = c->argv[0];
            c->mbargc++;
            c->argc--;
            c->multibulk--;
            if (c->multibulk == 0) {
                robj **auxargv;
                int auxargc;

                /* Here we need to swap the multi-bulk argc/argv with the
                 * normal argc/argv of the client structure. */
                auxargv = c->argv;
                c->argv = c->mbargv;
                c->mbargv = auxargv;

                auxargc = c->argc;
                c->argc = c->mbargc;
                c->mbargc = auxargc;

                /* We need to set bulklen to something different than -1
                 * in order for the code below to process the command without
                 * trying to read the last argument of a bulk command as
                 * a special argument. */
                c->bulklen = 0;
                /* continue below and process the command */
            } else {
                /* More arguments to come: wait for the next "$<len>". */
                c->bulklen = -1;
                return 1;
            }
        }
    }
    /* -- end of multi bulk commands processing -- */

    /* The QUIT command is handled as a special case. Normal command
     * procs are unable to close the client connection safely */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        freeClient(c);
        return 0;
    }
    cmd = lookupCommand(c->argv[0]->ptr);
    if (!cmd) {
        addReplySds(c,sdsnew("-ERR unknown command\r\n"));
        resetClient(c);
        return 1;
    } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
               (c->argc < -cmd->arity)) {
        /* Positive arity: exact argument count. Negative arity: at least
         * -arity arguments. */
        addReplySds(c,
            sdscatprintf(sdsempty(),
                "-ERR wrong number of arguments for '%s' command\r\n",
                cmd->name));
        resetClient(c);
        return 1;
    } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
        resetClient(c);
        return 1;
    } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
        /* Inline bulk command: the last argument is the payload length,
         * and the payload itself follows in the query buffer. */
        int bulklen = atoi(c->argv[c->argc-1]->ptr);

        decrRefCount(c->argv[c->argc-1]);
        if (bulklen < 0 || bulklen > 1024*1024*1024) {
            c->argc--;
            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
            resetClient(c);
            return 1;
        }
        c->argc--;
        c->bulklen = bulklen+2; /* add two bytes for CR+LF */
        /* It is possible that the bulk read is already in the
         * buffer. Check this condition and handle it accordingly.
         * This is just a fast path, alternative to call processInputBuffer().
         * It's a good idea since the code is small and this condition
         * happens most of the times. */
        if ((signed)sdslen(c->querybuf) >= c->bulklen) {
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
        } else {
            return 1;
        }
    }
    /* Let's try to share objects on the command arguments vector */
    if (server.shareobjects) {
        int j;
        for(j = 1; j < c->argc; j++)
            c->argv[j] = tryObjectSharing(c->argv[j]);
    }
    /* Let's try to encode the bulk object to save space. */
    if (cmd->flags & REDIS_CMD_BULK)
        tryObjectEncoding(c->argv[c->argc-1]);

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
        resetClient(c);
        return 1;
    }

    /* Exec the command. A change of server.dirty means the command
     * modified the dataset, so it is propagated to the append only file
     * and to slaves. MONITOR clients see every command. */
    dirty = server.dirty;
    cmd->proc(c);
    if (server.appendonly && server.dirty-dirty)
        feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
    if (server.dirty-dirty && listLength(server.slaves))
        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;

    /* Prepare the client for the next command */
    if (c->flags & REDIS_CLOSE) {
        freeClient(c);
        return 0;
    }
    resetClient(c);
    return 1;
}
1793
1794 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1795 listNode *ln;
1796 int outc = 0, j;
1797 robj **outv;
1798 /* (args*2)+1 is enough room for args, spaces, newlines */
1799 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1800
1801 if (argc <= REDIS_STATIC_ARGS) {
1802 outv = static_outv;
1803 } else {
1804 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1805 }
1806
1807 for (j = 0; j < argc; j++) {
1808 if (j != 0) outv[outc++] = shared.space;
1809 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1810 robj *lenobj;
1811
1812 lenobj = createObject(REDIS_STRING,
1813 sdscatprintf(sdsempty(),"%lu\r\n",
1814 (unsigned long) stringObjectLen(argv[j])));
1815 lenobj->refcount = 0;
1816 outv[outc++] = lenobj;
1817 }
1818 outv[outc++] = argv[j];
1819 }
1820 outv[outc++] = shared.crlf;
1821
1822 /* Increment all the refcounts at start and decrement at end in order to
1823 * be sure to free objects if there is no slave in a replication state
1824 * able to be feed with commands */
1825 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1826 listRewind(slaves);
1827 while((ln = listYield(slaves))) {
1828 redisClient *slave = ln->value;
1829
1830 /* Don't feed slaves that are still waiting for BGSAVE to start */
1831 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1832
1833 /* Feed all the other slaves, MONITORs and so on */
1834 if (slave->slaveseldb != dictid) {
1835 robj *selectcmd;
1836
1837 switch(dictid) {
1838 case 0: selectcmd = shared.select0; break;
1839 case 1: selectcmd = shared.select1; break;
1840 case 2: selectcmd = shared.select2; break;
1841 case 3: selectcmd = shared.select3; break;
1842 case 4: selectcmd = shared.select4; break;
1843 case 5: selectcmd = shared.select5; break;
1844 case 6: selectcmd = shared.select6; break;
1845 case 7: selectcmd = shared.select7; break;
1846 case 8: selectcmd = shared.select8; break;
1847 case 9: selectcmd = shared.select9; break;
1848 default:
1849 selectcmd = createObject(REDIS_STRING,
1850 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1851 selectcmd->refcount = 0;
1852 break;
1853 }
1854 addReply(slave,selectcmd);
1855 slave->slaveseldb = dictid;
1856 }
1857 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1858 }
1859 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1860 if (outv != static_outv) zfree(outv);
1861 }
1862
1863 static void processInputBuffer(redisClient *c) {
1864 again:
1865 if (c->bulklen == -1) {
1866 /* Read the first line of the query */
1867 char *p = strchr(c->querybuf,'\n');
1868 size_t querylen;
1869
1870 if (p) {
1871 sds query, *argv;
1872 int argc, j;
1873
1874 query = c->querybuf;
1875 c->querybuf = sdsempty();
1876 querylen = 1+(p-(query));
1877 if (sdslen(query) > querylen) {
1878 /* leave data after the first line of the query in the buffer */
1879 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1880 }
1881 *p = '\0'; /* remove "\n" */
1882 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1883 sdsupdatelen(query);
1884
1885 /* Now we can split the query in arguments */
1886 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1887 sdsfree(query);
1888
1889 if (c->argv) zfree(c->argv);
1890 c->argv = zmalloc(sizeof(robj*)*argc);
1891
1892 for (j = 0; j < argc; j++) {
1893 if (sdslen(argv[j])) {
1894 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1895 c->argc++;
1896 } else {
1897 sdsfree(argv[j]);
1898 }
1899 }
1900 zfree(argv);
1901 if (c->argc) {
1902 /* Execute the command. If the client is still valid
1903 * after processCommand() return and there is something
1904 * on the query buffer try to process the next command. */
1905 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1906 } else {
1907 /* Nothing to process, argc == 0. Just process the query
1908 * buffer if it's not empty or return to the caller */
1909 if (sdslen(c->querybuf)) goto again;
1910 }
1911 return;
1912 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1913 redisLog(REDIS_DEBUG, "Client protocol error");
1914 freeClient(c);
1915 return;
1916 }
1917 } else {
1918 /* Bulk read handling. Note that if we are at this point
1919 the client already sent a command terminated with a newline,
1920 we are reading the bulk data that is actually the last
1921 argument of the command. */
1922 int qbl = sdslen(c->querybuf);
1923
1924 if (c->bulklen <= qbl) {
1925 /* Copy everything but the final CRLF as final argument */
1926 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1927 c->argc++;
1928 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1929 /* Process the command. If the client is still valid after
1930 * the processing and there is more data in the buffer
1931 * try to parse it. */
1932 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1933 return;
1934 }
1935 }
1936 }
1937
1938 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1939 redisClient *c = (redisClient*) privdata;
1940 char buf[REDIS_IOBUF_LEN];
1941 int nread;
1942 REDIS_NOTUSED(el);
1943 REDIS_NOTUSED(mask);
1944
1945 nread = read(fd, buf, REDIS_IOBUF_LEN);
1946 if (nread == -1) {
1947 if (errno == EAGAIN) {
1948 nread = 0;
1949 } else {
1950 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1951 freeClient(c);
1952 return;
1953 }
1954 } else if (nread == 0) {
1955 redisLog(REDIS_DEBUG, "Client closed connection");
1956 freeClient(c);
1957 return;
1958 }
1959 if (nread) {
1960 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1961 c->lastinteraction = time(NULL);
1962 } else {
1963 return;
1964 }
1965 processInputBuffer(c);
1966 }
1967
1968 static int selectDb(redisClient *c, int id) {
1969 if (id < 0 || id >= server.dbnum)
1970 return REDIS_ERR;
1971 c->db = &server.db[id];
1972 return REDIS_OK;
1973 }
1974
1975 static void *dupClientReplyValue(void *o) {
1976 incrRefCount((robj*)o);
1977 return 0;
1978 }
1979
1980 static redisClient *createClient(int fd) {
1981 redisClient *c = zmalloc(sizeof(*c));
1982
1983 anetNonBlock(NULL,fd);
1984 anetTcpNoDelay(NULL,fd);
1985 if (!c) return NULL;
1986 selectDb(c,0);
1987 c->fd = fd;
1988 c->querybuf = sdsempty();
1989 c->argc = 0;
1990 c->argv = NULL;
1991 c->bulklen = -1;
1992 c->multibulk = 0;
1993 c->mbargc = 0;
1994 c->mbargv = NULL;
1995 c->sentlen = 0;
1996 c->flags = 0;
1997 c->lastinteraction = time(NULL);
1998 c->authenticated = 0;
1999 c->replstate = REDIS_REPL_NONE;
2000 c->reply = listCreate();
2001 listSetFreeMethod(c->reply,decrRefCount);
2002 listSetDupMethod(c->reply,dupClientReplyValue);
2003 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2004 readQueryFromClient, c) == AE_ERR) {
2005 freeClient(c);
2006 return NULL;
2007 }
2008 listAddNodeTail(server.clients,c);
2009 return c;
2010 }
2011
2012 static void addReply(redisClient *c, robj *obj) {
2013 if (listLength(c->reply) == 0 &&
2014 (c->replstate == REDIS_REPL_NONE ||
2015 c->replstate == REDIS_REPL_ONLINE) &&
2016 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2017 sendReplyToClient, c) == AE_ERR) return;
2018 listAddNodeTail(c->reply,getDecodedObject(obj));
2019 }
2020
2021 static void addReplySds(redisClient *c, sds s) {
2022 robj *o = createObject(REDIS_STRING,s);
2023 addReply(c,o);
2024 decrRefCount(o);
2025 }
2026
2027 static void addReplyDouble(redisClient *c, double d) {
2028 char buf[128];
2029
2030 snprintf(buf,sizeof(buf),"%.17g",d);
2031 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2032 (unsigned long) strlen(buf),buf));
2033 }
2034
2035 static void addReplyBulkLen(redisClient *c, robj *obj) {
2036 size_t len;
2037
2038 if (obj->encoding == REDIS_ENCODING_RAW) {
2039 len = sdslen(obj->ptr);
2040 } else {
2041 long n = (long)obj->ptr;
2042
2043 /* Compute how many bytes will take this integer as a radix 10 string */
2044 len = 1;
2045 if (n < 0) {
2046 len++;
2047 n = -n;
2048 }
2049 while((n = n/10) != 0) {
2050 len++;
2051 }
2052 }
2053 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2054 }
2055
2056 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2057 int cport, cfd;
2058 char cip[128];
2059 redisClient *c;
2060 REDIS_NOTUSED(el);
2061 REDIS_NOTUSED(mask);
2062 REDIS_NOTUSED(privdata);
2063
2064 cfd = anetAccept(server.neterr, fd, cip, &cport);
2065 if (cfd == AE_ERR) {
2066 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2067 return;
2068 }
2069 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2070 if ((c = createClient(cfd)) == NULL) {
2071 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2072 close(cfd); /* May be already closed, just ingore errors */
2073 return;
2074 }
2075 /* If maxclient directive is set and this is one client more... close the
2076 * connection. Note that we create the client instead to check before
2077 * for this condition, since now the socket is already set in nonblocking
2078 * mode and we can send an error for free using the Kernel I/O */
2079 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2080 char *err = "-ERR max number of clients reached\r\n";
2081
2082 /* That's a best effort error message, don't check write errors */
2083 if (write(c->fd,err,strlen(err)) == -1) {
2084 /* Nothing to do, Just to avoid the warning... */
2085 }
2086 freeClient(c);
2087 return;
2088 }
2089 server.stat_numconnections++;
2090 }
2091
2092 /* ======================= Redis objects implementation ===================== */
2093
2094 static robj *createObject(int type, void *ptr) {
2095 robj *o;
2096
2097 if (listLength(server.objfreelist)) {
2098 listNode *head = listFirst(server.objfreelist);
2099 o = listNodeValue(head);
2100 listDelNode(server.objfreelist,head);
2101 } else {
2102 o = zmalloc(sizeof(*o));
2103 }
2104 o->type = type;
2105 o->encoding = REDIS_ENCODING_RAW;
2106 o->ptr = ptr;
2107 o->refcount = 1;
2108 return o;
2109 }
2110
2111 static robj *createStringObject(char *ptr, size_t len) {
2112 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2113 }
2114
2115 static robj *createListObject(void) {
2116 list *l = listCreate();
2117
2118 listSetFreeMethod(l,decrRefCount);
2119 return createObject(REDIS_LIST,l);
2120 }
2121
2122 static robj *createSetObject(void) {
2123 dict *d = dictCreate(&setDictType,NULL);
2124 return createObject(REDIS_SET,d);
2125 }
2126
2127 static robj *createZsetObject(void) {
2128 zset *zs = zmalloc(sizeof(*zs));
2129
2130 zs->dict = dictCreate(&zsetDictType,NULL);
2131 zs->zsl = zslCreate();
2132 return createObject(REDIS_ZSET,zs);
2133 }
2134
2135 static void freeStringObject(robj *o) {
2136 if (o->encoding == REDIS_ENCODING_RAW) {
2137 sdsfree(o->ptr);
2138 }
2139 }
2140
2141 static void freeListObject(robj *o) {
2142 listRelease((list*) o->ptr);
2143 }
2144
2145 static void freeSetObject(robj *o) {
2146 dictRelease((dict*) o->ptr);
2147 }
2148
2149 static void freeZsetObject(robj *o) {
2150 zset *zs = o->ptr;
2151
2152 dictRelease(zs->dict);
2153 zslFree(zs->zsl);
2154 zfree(zs);
2155 }
2156
2157 static void freeHashObject(robj *o) {
2158 dictRelease((dict*) o->ptr);
2159 }
2160
2161 static void incrRefCount(robj *o) {
2162 o->refcount++;
2163 #ifdef DEBUG_REFCOUNT
2164 if (o->type == REDIS_STRING)
2165 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2166 #endif
2167 }
2168
2169 static void decrRefCount(void *obj) {
2170 robj *o = obj;
2171
2172 #ifdef DEBUG_REFCOUNT
2173 if (o->type == REDIS_STRING)
2174 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2175 #endif
2176 if (--(o->refcount) == 0) {
2177 switch(o->type) {
2178 case REDIS_STRING: freeStringObject(o); break;
2179 case REDIS_LIST: freeListObject(o); break;
2180 case REDIS_SET: freeSetObject(o); break;
2181 case REDIS_ZSET: freeZsetObject(o); break;
2182 case REDIS_HASH: freeHashObject(o); break;
2183 default: redisAssert(0 != 0); break;
2184 }
2185 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2186 !listAddNodeHead(server.objfreelist,o))
2187 zfree(o);
2188 }
2189 }
2190
2191 static robj *lookupKey(redisDb *db, robj *key) {
2192 dictEntry *de = dictFind(db->dict,key);
2193 return de ? dictGetEntryVal(de) : NULL;
2194 }
2195
2196 static robj *lookupKeyRead(redisDb *db, robj *key) {
2197 expireIfNeeded(db,key);
2198 return lookupKey(db,key);
2199 }
2200
2201 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2202 deleteIfVolatile(db,key);
2203 return lookupKey(db,key);
2204 }
2205
2206 static int deleteKey(redisDb *db, robj *key) {
2207 int retval;
2208
2209 /* We need to protect key from destruction: after the first dictDelete()
2210 * it may happen that 'key' is no longer valid if we don't increment
2211 * it's count. This may happen when we get the object reference directly
2212 * from the hash table with dictRandomKey() or dict iterators */
2213 incrRefCount(key);
2214 if (dictSize(db->expires)) dictDelete(db->expires,key);
2215 retval = dictDelete(db->dict,key);
2216 decrRefCount(key);
2217
2218 return retval == DICT_OK;
2219 }
2220
2221 /* Try to share an object against the shared objects pool */
2222 static robj *tryObjectSharing(robj *o) {
2223 struct dictEntry *de;
2224 unsigned long c;
2225
2226 if (o == NULL || server.shareobjects == 0) return o;
2227
2228 redisAssert(o->type == REDIS_STRING);
2229 de = dictFind(server.sharingpool,o);
2230 if (de) {
2231 robj *shared = dictGetEntryKey(de);
2232
2233 c = ((unsigned long) dictGetEntryVal(de))+1;
2234 dictGetEntryVal(de) = (void*) c;
2235 incrRefCount(shared);
2236 decrRefCount(o);
2237 return shared;
2238 } else {
2239 /* Here we are using a stream algorihtm: Every time an object is
2240 * shared we increment its count, everytime there is a miss we
2241 * recrement the counter of a random object. If this object reaches
2242 * zero we remove the object and put the current object instead. */
2243 if (dictSize(server.sharingpool) >=
2244 server.sharingpoolsize) {
2245 de = dictGetRandomKey(server.sharingpool);
2246 redisAssert(de != NULL);
2247 c = ((unsigned long) dictGetEntryVal(de))-1;
2248 dictGetEntryVal(de) = (void*) c;
2249 if (c == 0) {
2250 dictDelete(server.sharingpool,de->key);
2251 }
2252 } else {
2253 c = 0; /* If the pool is empty we want to add this object */
2254 }
2255 if (c == 0) {
2256 int retval;
2257
2258 retval = dictAdd(server.sharingpool,o,(void*)1);
2259 redisAssert(retval == DICT_OK);
2260 incrRefCount(o);
2261 }
2262 return o;
2263 }
2264 }
2265
2266 /* Check if the nul-terminated string 's' can be represented by a long
2267 * (that is, is a number that fits into long without any other space or
2268 * character before or after the digits).
2269 *
2270 * If so, the function returns REDIS_OK and *longval is set to the value
2271 * of the number. Otherwise REDIS_ERR is returned */
2272 static int isStringRepresentableAsLong(sds s, long *longval) {
2273 char buf[32], *endptr;
2274 long value;
2275 int slen;
2276
2277 value = strtol(s, &endptr, 10);
2278 if (endptr[0] != '\0') return REDIS_ERR;
2279 slen = snprintf(buf,32,"%ld",value);
2280
2281 /* If the number converted back into a string is not identical
2282 * then it's not possible to encode the string as integer */
2283 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2284 if (longval) *longval = value;
2285 return REDIS_OK;
2286 }
2287
2288 /* Try to encode a string object in order to save space */
2289 static int tryObjectEncoding(robj *o) {
2290 long value;
2291 sds s = o->ptr;
2292
2293 if (o->encoding != REDIS_ENCODING_RAW)
2294 return REDIS_ERR; /* Already encoded */
2295
2296 /* It's not save to encode shared objects: shared objects can be shared
2297 * everywhere in the "object space" of Redis. Encoded objects can only
2298 * appear as "values" (and not, for instance, as keys) */
2299 if (o->refcount > 1) return REDIS_ERR;
2300
2301 /* Currently we try to encode only strings */
2302 redisAssert(o->type == REDIS_STRING);
2303
2304 /* Check if we can represent this string as a long integer */
2305 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2306
2307 /* Ok, this object can be encoded */
2308 o->encoding = REDIS_ENCODING_INT;
2309 sdsfree(o->ptr);
2310 o->ptr = (void*) value;
2311 return REDIS_OK;
2312 }
2313
2314 /* Get a decoded version of an encoded object (returned as a new object).
2315 * If the object is already raw-encoded just increment the ref count. */
2316 static robj *getDecodedObject(robj *o) {
2317 robj *dec;
2318
2319 if (o->encoding == REDIS_ENCODING_RAW) {
2320 incrRefCount(o);
2321 return o;
2322 }
2323 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2324 char buf[32];
2325
2326 snprintf(buf,32,"%ld",(long)o->ptr);
2327 dec = createStringObject(buf,strlen(buf));
2328 return dec;
2329 } else {
2330 redisAssert(1 != 1);
2331 }
2332 }
2333
2334 /* Compare two string objects via strcmp() or alike.
2335 * Note that the objects may be integer-encoded. In such a case we
2336 * use snprintf() to get a string representation of the numbers on the stack
2337 * and compare the strings, it's much faster than calling getDecodedObject().
2338 *
2339 * Important note: if objects are not integer encoded, but binary-safe strings,
2340 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2341 * binary safe. */
2342 static int compareStringObjects(robj *a, robj *b) {
2343 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2344 char bufa[128], bufb[128], *astr, *bstr;
2345 int bothsds = 1;
2346
2347 if (a == b) return 0;
2348 if (a->encoding != REDIS_ENCODING_RAW) {
2349 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2350 astr = bufa;
2351 bothsds = 0;
2352 } else {
2353 astr = a->ptr;
2354 }
2355 if (b->encoding != REDIS_ENCODING_RAW) {
2356 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2357 bstr = bufb;
2358 bothsds = 0;
2359 } else {
2360 bstr = b->ptr;
2361 }
2362 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2363 }
2364
2365 static size_t stringObjectLen(robj *o) {
2366 redisAssert(o->type == REDIS_STRING);
2367 if (o->encoding == REDIS_ENCODING_RAW) {
2368 return sdslen(o->ptr);
2369 } else {
2370 char buf[32];
2371
2372 return snprintf(buf,32,"%ld",(long)o->ptr);
2373 }
2374 }
2375
2376 /*============================ DB saving/loading ============================ */
2377
/* Write a single opcode/type byte. Returns 0 on success, -1 on error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return fwrite(&type,1,1,fp) ? 0 : -1;
}
2382
/* Write 't' truncated to a 32 bit signed integer in host byte order.
 * Returns 0 on success, -1 on error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t secs = (int32_t) t;

    return fwrite(&secs,sizeof(secs),1,fp) ? 0 : -1;
}
2388
2389 /* check rdbLoadLen() comments for more info */
2390 static int rdbSaveLen(FILE *fp, uint32_t len) {
2391 unsigned char buf[2];
2392
2393 if (len < (1<<6)) {
2394 /* Save a 6 bit len */
2395 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2396 if (fwrite(buf,1,1,fp) == 0) return -1;
2397 } else if (len < (1<<14)) {
2398 /* Save a 14 bit len */
2399 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2400 buf[1] = len&0xFF;
2401 if (fwrite(buf,2,1,fp) == 0) return -1;
2402 } else {
2403 /* Save a 32 bit len */
2404 buf[0] = (REDIS_RDB_32BITLEN<<6);
2405 if (fwrite(buf,1,1,fp) == 0) return -1;
2406 len = htonl(len);
2407 if (fwrite(&len,4,1,fp) == 0) return -1;
2408 }
2409 return 0;
2410 }
2411
2412 /* String objects in the form "2391" "-100" without any space and with a
2413 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2414 * encoded as integers to save space */
2415 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2416 long long value;
2417 char *endptr, buf[32];
2418
2419 /* Check if it's possible to encode this value as a number */
2420 value = strtoll(s, &endptr, 10);
2421 if (endptr[0] != '\0') return 0;
2422 snprintf(buf,32,"%lld",value);
2423
2424 /* If the number converted back into a string is not identical
2425 * then it's not possible to encode the string as integer */
2426 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2427
2428 /* Finally check if it fits in our ranges */
2429 if (value >= -(1<<7) && value <= (1<<7)-1) {
2430 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2431 enc[1] = value&0xFF;
2432 return 2;
2433 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2434 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2435 enc[1] = value&0xFF;
2436 enc[2] = (value>>8)&0xFF;
2437 return 3;
2438 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2439 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2440 enc[1] = value&0xFF;
2441 enc[2] = (value>>8)&0xFF;
2442 enc[3] = (value>>16)&0xFF;
2443 enc[4] = (value>>24)&0xFF;
2444 return 5;
2445 } else {
2446 return 0;
2447 }
2448 }
2449
2450 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2451 unsigned int comprlen, outlen;
2452 unsigned char byte;
2453 void *out;
2454
2455 /* We require at least four bytes compression for this to be worth it */
2456 outlen = sdslen(obj->ptr)-4;
2457 if (outlen <= 0) return 0;
2458 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2459 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2460 if (comprlen == 0) {
2461 zfree(out);
2462 return 0;
2463 }
2464 /* Data compressed! Let's save it on disk */
2465 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2466 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2467 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2468 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2469 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2470 zfree(out);
2471 return comprlen;
2472
2473 writeerr:
2474 zfree(out);
2475 return -1;
2476 }
2477
2478 /* Save a string objet as [len][data] on disk. If the object is a string
2479 * representation of an integer value we try to safe it in a special form */
2480 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2481 size_t len;
2482 int enclen;
2483
2484 len = sdslen(obj->ptr);
2485
2486 /* Try integer encoding */
2487 if (len <= 11) {
2488 unsigned char buf[5];
2489 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2490 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2491 return 0;
2492 }
2493 }
2494
2495 /* Try LZF compression - under 20 bytes it's unable to compress even
2496 * aaaaaaaaaaaaaaaaaa so skip it */
2497 if (server.rdbcompression && len > 20) {
2498 int retval;
2499
2500 retval = rdbSaveLzfStringObject(fp,obj);
2501 if (retval == -1) return -1;
2502 if (retval > 0) return 0;
2503 /* retval == 0 means data can't be compressed, save the old way */
2504 }
2505
2506 /* Store verbatim */
2507 if (rdbSaveLen(fp,len) == -1) return -1;
2508 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2509 return 0;
2510 }
2511
2512 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2513 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2514 int retval;
2515
2516 obj = getDecodedObject(obj);
2517 retval = rdbSaveStringObjectRaw(fp,obj);
2518 decrRefCount(obj);
2519 return retval;
2520 }
2521
/* Write a double as one length byte followed by its "%.17g" text.
 * Three length values are reserved for non-finite numbers, which are
 * written as that single byte only:
 *   253: NaN
 *   254: +inf
 *   255: -inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char out[128];
    int outlen = 1;

    if (isnan(val)) {
        out[0] = 253;
    } else if (!isfinite(val)) {
        out[0] = (val > 0) ? 254 : 255;
    } else {
        snprintf((char*)out+1,sizeof(out)-1,"%.17g",val);
        out[0] = (unsigned char) strlen((char*)out+1);
        outlen += out[0];
    }
    return (fwrite(out,outlen,1,fp) == 0) ? -1 : 0;
}
2548
2549 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2550 static int rdbSave(char *filename) {
2551 dictIterator *di = NULL;
2552 dictEntry *de;
2553 FILE *fp;
2554 char tmpfile[256];
2555 int j;
2556 time_t now = time(NULL);
2557
2558 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2559 fp = fopen(tmpfile,"w");
2560 if (!fp) {
2561 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2562 return REDIS_ERR;
2563 }
2564 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2565 for (j = 0; j < server.dbnum; j++) {
2566 redisDb *db = server.db+j;
2567 dict *d = db->dict;
2568 if (dictSize(d) == 0) continue;
2569 di = dictGetIterator(d);
2570 if (!di) {
2571 fclose(fp);
2572 return REDIS_ERR;
2573 }
2574
2575 /* Write the SELECT DB opcode */
2576 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2577 if (rdbSaveLen(fp,j) == -1) goto werr;
2578
2579 /* Iterate this DB writing every entry */
2580 while((de = dictNext(di)) != NULL) {
2581 robj *key = dictGetEntryKey(de);
2582 robj *o = dictGetEntryVal(de);
2583 time_t expiretime = getExpire(db,key);
2584
2585 /* Save the expire time */
2586 if (expiretime != -1) {
2587 /* If this key is already expired skip it */
2588 if (expiretime < now) continue;
2589 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2590 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2591 }
2592 /* Save the key and associated value */
2593 if (rdbSaveType(fp,o->type) == -1) goto werr;
2594 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2595 if (o->type == REDIS_STRING) {
2596 /* Save a string value */
2597 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2598 } else if (o->type == REDIS_LIST) {
2599 /* Save a list value */
2600 list *list = o->ptr;
2601 listNode *ln;
2602
2603 listRewind(list);
2604 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2605 while((ln = listYield(list))) {
2606 robj *eleobj = listNodeValue(ln);
2607
2608 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2609 }
2610 } else if (o->type == REDIS_SET) {
2611 /* Save a set value */
2612 dict *set = o->ptr;
2613 dictIterator *di = dictGetIterator(set);
2614 dictEntry *de;
2615
2616 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2617 while((de = dictNext(di)) != NULL) {
2618 robj *eleobj = dictGetEntryKey(de);
2619
2620 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2621 }
2622 dictReleaseIterator(di);
2623 } else if (o->type == REDIS_ZSET) {
2624 /* Save a set value */
2625 zset *zs = o->ptr;
2626 dictIterator *di = dictGetIterator(zs->dict);
2627 dictEntry *de;
2628
2629 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2630 while((de = dictNext(di)) != NULL) {
2631 robj *eleobj = dictGetEntryKey(de);
2632 double *score = dictGetEntryVal(de);
2633
2634 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2635 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2636 }
2637 dictReleaseIterator(di);
2638 } else {
2639 redisAssert(0 != 0);
2640 }
2641 }
2642 dictReleaseIterator(di);
2643 }
2644 /* EOF opcode */
2645 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2646
2647 /* Make sure data will not remain on the OS's output buffers */
2648 fflush(fp);
2649 fsync(fileno(fp));
2650 fclose(fp);
2651
2652 /* Use RENAME to make sure the DB file is changed atomically only
2653 * if the generate DB file is ok. */
2654 if (rename(tmpfile,filename) == -1) {
2655 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2656 unlink(tmpfile);
2657 return REDIS_ERR;
2658 }
2659 redisLog(REDIS_NOTICE,"DB saved on disk");
2660 server.dirty = 0;
2661 server.lastsave = time(NULL);
2662 return REDIS_OK;
2663
2664 werr:
2665 fclose(fp);
2666 unlink(tmpfile);
2667 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2668 if (di) dictReleaseIterator(di);
2669 return REDIS_ERR;
2670 }
2671
2672 static int rdbSaveBackground(char *filename) {
2673 pid_t childpid;
2674
2675 if (server.bgsavechildpid != -1) return REDIS_ERR;
2676 if ((childpid = fork()) == 0) {
2677 /* Child */
2678 close(server.fd);
2679 if (rdbSave(filename) == REDIS_OK) {
2680 exit(0);
2681 } else {
2682 exit(1);
2683 }
2684 } else {
2685 /* Parent */
2686 if (childpid == -1) {
2687 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2688 strerror(errno));
2689 return REDIS_ERR;
2690 }
2691 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2692 server.bgsavechildpid = childpid;
2693 return REDIS_OK;
2694 }
2695 return REDIS_OK; /* unreached */
2696 }
2697
/* Delete the "temp-<pid>.rdb" file a save process with pid 'childpid'
 * would have been writing. Errors (e.g. the file not existing) are
 * ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2704
/* Read one opcode/type byte. Returns it as 0..255, or -1 on EOF/error. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return fread(&byte,1,1,fp) ? byte : -1;
}
2710
/* Read a 32 bit signed time value written by rdbSaveTime() (host byte
 * order). Returns -1 on EOF/error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t secs;

    if (fread(&secs,sizeof(secs),1,fp) != 1) return -1;
    return (time_t) secs;
}
2716
2717 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2718 * of this file for a description of how this are stored on disk.
2719 *
2720 * isencoded is set to 1 if the readed length is not actually a length but
2721 * an "encoding type", check the above comments for more info */
2722 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2723 unsigned char buf[2];
2724 uint32_t len;
2725
2726 if (isencoded) *isencoded = 0;
2727 if (rdbver == 0) {
2728 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2729 return ntohl(len);
2730 } else {
2731 int type;
2732
2733 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2734 type = (buf[0]&0xC0)>>6;
2735 if (type == REDIS_RDB_6BITLEN) {
2736 /* Read a 6 bit len */
2737 return buf[0]&0x3F;
2738 } else if (type == REDIS_RDB_ENCVAL) {
2739 /* Read a 6 bit len encoding type */
2740 if (isencoded) *isencoded = 1;
2741 return buf[0]&0x3F;
2742 } else if (type == REDIS_RDB_14BITLEN) {
2743 /* Read a 14 bit len */
2744 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2745 return ((buf[0]&0x3F)<<8)|buf[1];
2746 } else {
2747 /* Read a 32 bit len */
2748 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2749 return ntohl(len);
2750 }
2751 }
2752 }
2753
2754 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2755 unsigned char enc[4];
2756 long long val;
2757
2758 if (enctype == REDIS_RDB_ENC_INT8) {
2759 if (fread(enc,1,1,fp) == 0) return NULL;
2760 val = (signed char)enc[0];
2761 } else if (enctype == REDIS_RDB_ENC_INT16) {
2762 uint16_t v;
2763 if (fread(enc,2,1,fp) == 0) return NULL;
2764 v = enc[0]|(enc[1]<<8);
2765 val = (int16_t)v;
2766 } else if (enctype == REDIS_RDB_ENC_INT32) {
2767 uint32_t v;
2768 if (fread(enc,4,1,fp) == 0) return NULL;
2769 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2770 val = (int32_t)v;
2771 } else {
2772 val = 0; /* anti-warning */
2773 redisAssert(0!=0);
2774 }
2775 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2776 }
2777
2778 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2779 unsigned int len, clen;
2780 unsigned char *c = NULL;
2781 sds val = NULL;
2782
2783 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2784 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2785 if ((c = zmalloc(clen)) == NULL) goto err;
2786 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2787 if (fread(c,clen,1,fp) == 0) goto err;
2788 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2789 zfree(c);
2790 return createObject(REDIS_STRING,val);
2791 err:
2792 zfree(c);
2793 sdsfree(val);
2794 return NULL;
2795 }
2796
2797 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2798 int isencoded;
2799 uint32_t len;
2800 sds val;
2801
2802 len = rdbLoadLen(fp,rdbver,&isencoded);
2803 if (isencoded) {
2804 switch(len) {
2805 case REDIS_RDB_ENC_INT8:
2806 case REDIS_RDB_ENC_INT16:
2807 case REDIS_RDB_ENC_INT32:
2808 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2809 case REDIS_RDB_ENC_LZF:
2810 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2811 default:
2812 redisAssert(0!=0);
2813 }
2814 }
2815
2816 if (len == REDIS_RDB_LENERR) return NULL;
2817 val = sdsnewlen(NULL,len);
2818 if (len && fread(val,len,1,fp) == 0) {
2819 sdsfree(val);
2820 return NULL;
2821 }
2822 return tryObjectSharing(createObject(REDIS_STRING,val));
2823 }
2824
2825 /* For information about double serialization check rdbSaveDoubleValue() */
2826 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2827 char buf[128];
2828 unsigned char len;
2829
2830 if (fread(&len,1,1,fp) == 0) return -1;
2831 switch(len) {
2832 case 255: *val = R_NegInf; return 0;
2833 case 254: *val = R_PosInf; return 0;
2834 case 253: *val = R_Nan; return 0;
2835 default:
2836 if (fread(buf,len,1,fp) == 0) return -1;
2837 buf[len] = '\0';
2838 sscanf(buf, "%lg", val);
2839 return 0;
2840 }
2841 }
2842
/* Load the dataset stored in the RDB file 'filename' into server.db.
 *
 * The file starts with a 9 byte header: the "REDIS" signature followed by
 * the format version, parsed with atoi(). Records follow the header. Each
 * record is an optional REDIS_EXPIRETIME opcode plus time, then a type
 * byte. REDIS_SELECTDB switches the target DB, REDIS_EOF ends the stream,
 * and any other type is followed by a key and a value of that type.
 *
 * Returns REDIS_ERR if the file can't be opened, has a wrong signature or
 * an unsupported version, and REDIS_OK on success. Short reads, duplicated
 * keys and out of range DB ids are fatal: the process exits. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;
    redisDb *db = server.db+0;
    char buf[1024]; /* only the 9 byte header + terminator are used */
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* Header: "REDIS" signature followed by the format version. */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case: every following key
         * goes to the selected DB until the next SELECTDB. */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: an element count, then the elements */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    listAddNodeTail((list*)o->ptr,ele);
                } else {
                    dictAdd((dict*)o->ptr,ele,NULL);
                }
            }
        } else if (type == REDIS_ZSET) {
            /* Read sorted set value: an element count, then member/score
             * pairs */
            uint32_t zsetlen;
            zset *zs;

            if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = createZsetObject();
            zs = o->ptr;
            /* Load every single element of the sorted set */
            while(zsetlen--) {
                robj *ele;
                double *score = zmalloc(sizeof(double));

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
                /* The member is referenced by both the dict (which also
                 * owns the heap allocated score) and the skiplist. */
                dictAdd(zs->dict,ele,score);
                zslInsert(zs->zsl,*score,ele);
                incrRefCount(ele); /* added to skiplist */
            }
        } else {
            redisAssert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            expiretime = -1;
        }
        /* Ownership moved to the DB; the error path must not free them. */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    /* Partially built values, pending scores and fp are not released here:
     * the process exits right below. */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2965
2966 /*================================== Commands =============================== */
2967
2968 static void authCommand(redisClient *c) {
2969 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2970 c->authenticated = 1;
2971 addReply(c,shared.ok);
2972 } else {
2973 c->authenticated = 0;
2974 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2975 }
2976 }
2977
2978 static void pingCommand(redisClient *c) {
2979 addReply(c,shared.pong);
2980 }
2981
2982 static void echoCommand(redisClient *c) {
2983 addReplyBulkLen(c,c->argv[1]);
2984 addReply(c,c->argv[1]);
2985 addReply(c,shared.crlf);
2986 }
2987
2988 /*=================================== Strings =============================== */
2989
2990 static void setGenericCommand(redisClient *c, int nx) {
2991 int retval;
2992
2993 if (nx) deleteIfVolatile(c->db,c->argv[1]);
2994 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2995 if (retval == DICT_ERR) {
2996 if (!nx) {
2997 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2998 incrRefCount(c->argv[2]);
2999 } else {
3000 addReply(c,shared.czero);
3001 return;
3002 }
3003 } else {
3004 incrRefCount(c->argv[1]);
3005 incrRefCount(c->argv[2]);
3006 }
3007 server.dirty++;
3008 removeExpire(c->db,c->argv[1]);
3009 addReply(c, nx ? shared.cone : shared.ok);
3010 }
3011
3012 static void setCommand(redisClient *c) {
3013 setGenericCommand(c,0);
3014 }
3015
3016 static void setnxCommand(redisClient *c) {
3017 setGenericCommand(c,1);
3018 }
3019
3020 static void getCommand(redisClient *c) {
3021 robj *o = lookupKeyRead(c->db,c->argv[1]);
3022
3023 if (o == NULL) {
3024 addReply(c,shared.nullbulk);
3025 } else {
3026 if (o->type != REDIS_STRING) {
3027 addReply(c,shared.wrongtypeerr);
3028 } else {
3029 addReplyBulkLen(c,o);
3030 addReply(c,o);
3031 addReply(c,shared.crlf);
3032 }
3033 }
3034 }
3035
3036 static void getsetCommand(redisClient *c) {
3037 getCommand(c);
3038 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3039 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3040 } else {
3041 incrRefCount(c->argv[1]);
3042 }
3043 incrRefCount(c->argv[2]);
3044 server.dirty++;
3045 removeExpire(c->db,c->argv[1]);
3046 }
3047
3048 static void mgetCommand(redisClient *c) {
3049 int j;
3050
3051 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3052 for (j = 1; j < c->argc; j++) {
3053 robj *o = lookupKeyRead(c->db,c->argv[j]);
3054 if (o == NULL) {
3055 addReply(c,shared.nullbulk);
3056 } else {
3057 if (o->type != REDIS_STRING) {
3058 addReply(c,shared.nullbulk);
3059 } else {
3060 addReplyBulkLen(c,o);
3061 addReply(c,o);
3062 addReply(c,shared.crlf);
3063 }
3064 }
3065 }
3066 }
3067
3068 static void msetGenericCommand(redisClient *c, int nx) {
3069 int j, busykeys = 0;
3070
3071 if ((c->argc % 2) == 0) {
3072 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3073 return;
3074 }
3075 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3076 * set nothing at all if at least one already key exists. */
3077 if (nx) {
3078 for (j = 1; j < c->argc; j += 2) {
3079 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3080 busykeys++;
3081 }
3082 }
3083 }
3084 if (busykeys) {
3085 addReply(c, shared.czero);
3086 return;
3087 }
3088
3089 for (j = 1; j < c->argc; j += 2) {
3090 int retval;
3091
3092 tryObjectEncoding(c->argv[j+1]);
3093 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3094 if (retval == DICT_ERR) {
3095 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3096 incrRefCount(c->argv[j+1]);
3097 } else {
3098 incrRefCount(c->argv[j]);
3099 incrRefCount(c->argv[j+1]);
3100 }
3101 removeExpire(c->db,c->argv[j]);
3102 }
3103 server.dirty += (c->argc-1)/2;
3104 addReply(c, nx ? shared.cone : shared.ok);
3105 }
3106
3107 static void msetCommand(redisClient *c) {
3108 msetGenericCommand(c,0);
3109 }
3110
3111 static void msetnxCommand(redisClient *c) {
3112 msetGenericCommand(c,1);
3113 }
3114
3115 static void incrDecrCommand(redisClient *c, long long incr) {
3116 long long value;
3117 int retval;
3118 robj *o;
3119
3120 o = lookupKeyWrite(c->db,c->argv[1]);
3121 if (o == NULL) {
3122 value = 0;
3123 } else {
3124 if (o->type != REDIS_STRING) {
3125 value = 0;
3126 } else {
3127 char *eptr;
3128
3129 if (o->encoding == REDIS_ENCODING_RAW)
3130 value = strtoll(o->ptr, &eptr, 10);
3131 else if (o->encoding == REDIS_ENCODING_INT)
3132 value = (long)o->ptr;
3133 else
3134 redisAssert(1 != 1);
3135 }
3136 }
3137
3138 value += incr;
3139 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3140 tryObjectEncoding(o);
3141 retval = dictAdd(c->db->dict,c->argv[1],o);
3142 if (retval == DICT_ERR) {
3143 dictReplace(c->db->dict,c->argv[1],o);
3144 removeExpire(c->db,c->argv[1]);
3145 } else {
3146 incrRefCount(c->argv[1]);
3147 }
3148 server.dirty++;
3149 addReply(c,shared.colon);
3150 addReply(c,o);
3151 addReply(c,shared.crlf);
3152 }
3153
3154 static void incrCommand(redisClient *c) {
3155 incrDecrCommand(c,1);
3156 }
3157
3158 static void decrCommand(redisClient *c) {
3159 incrDecrCommand(c,-1);
3160 }
3161
3162 static void incrbyCommand(redisClient *c) {
3163 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3164 incrDecrCommand(c,incr);
3165 }
3166
3167 static void decrbyCommand(redisClient *c) {
3168 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3169 incrDecrCommand(c,-incr);
3170 }
3171
3172 /* ========================= Type agnostic commands ========================= */
3173
3174 static void delCommand(redisClient *c) {
3175 int deleted = 0, j;
3176
3177 for (j = 1; j < c->argc; j++) {
3178 if (deleteKey(c->db,c->argv[j])) {
3179 server.dirty++;
3180 deleted++;
3181 }
3182 }
3183 switch(deleted) {
3184 case 0:
3185 addReply(c,shared.czero);
3186 break;
3187 case 1:
3188 addReply(c,shared.cone);
3189 break;
3190 default:
3191 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3192 break;
3193 }
3194 }
3195
3196 static void existsCommand(redisClient *c) {
3197 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3198 }
3199
3200 static void selectCommand(redisClient *c) {
3201 int id = atoi(c->argv[1]->ptr);
3202
3203 if (selectDb(c,id) == REDIS_ERR) {
3204 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3205 } else {
3206 addReply(c,shared.ok);
3207 }
3208 }
3209
3210 static void randomkeyCommand(redisClient *c) {
3211 dictEntry *de;
3212
3213 while(1) {
3214 de = dictGetRandomKey(c->db->dict);
3215 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3216 }
3217 if (de == NULL) {
3218 addReply(c,shared.plus);
3219 addReply(c,shared.crlf);
3220 } else {
3221 addReply(c,shared.plus);
3222 addReply(c,dictGetEntryKey(de));
3223 addReply(c,shared.crlf);
3224 }
3225 }
3226
3227 static void keysCommand(redisClient *c) {
3228 dictIterator *di;
3229 dictEntry *de;
3230 sds pattern = c->argv[1]->ptr;
3231 int plen = sdslen(pattern);
3232 unsigned long numkeys = 0, keyslen = 0;
3233 robj *lenobj = createObject(REDIS_STRING,NULL);
3234
3235 di = dictGetIterator(c->db->dict);
3236 addReply(c,lenobj);
3237 decrRefCount(lenobj);
3238 while((de = dictNext(di)) != NULL) {
3239 robj *keyobj = dictGetEntryKey(de);
3240
3241 sds key = keyobj->ptr;
3242 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3243 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3244 if (expireIfNeeded(c->db,keyobj) == 0) {
3245 if (numkeys != 0)
3246 addReply(c,shared.space);
3247 addReply(c,keyobj);
3248 numkeys++;
3249 keyslen += sdslen(key);
3250 }
3251 }
3252 }
3253 dictReleaseIterator(di);
3254 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3255 addReply(c,shared.crlf);
3256 }
3257
3258 static void dbsizeCommand(redisClient *c) {
3259 addReplySds(c,
3260 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3261 }
3262
3263 static void lastsaveCommand(redisClient *c) {
3264 addReplySds(c,
3265 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3266 }
3267
3268 static void typeCommand(redisClient *c) {
3269 robj *o;
3270 char *type;
3271
3272 o = lookupKeyRead(c->db,c->argv[1]);
3273 if (o == NULL) {
3274 type = "+none";
3275 } else {
3276 switch(o->type) {
3277 case REDIS_STRING: type = "+string"; break;
3278 case REDIS_LIST: type = "+list"; break;
3279 case REDIS_SET: type = "+set"; break;
3280 case REDIS_ZSET: type = "+zset"; break;
3281 default: type = "unknown"; break;
3282 }
3283 }
3284 addReplySds(c,sdsnew(type));
3285 addReply(c,shared.crlf);
3286 }
3287
3288 static void saveCommand(redisClient *c) {
3289 if (server.bgsavechildpid != -1) {
3290 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3291 return;
3292 }
3293 if (rdbSave(server.dbfilename) == REDIS_OK) {
3294 addReply(c,shared.ok);
3295 } else {
3296 addReply(c,shared.err);
3297 }
3298 }
3299
3300 static void bgsaveCommand(redisClient *c) {
3301 if (server.bgsavechildpid != -1) {
3302 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3303 return;
3304 }
3305 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3306 char *status = "+Background saving started\r\n";
3307 addReplySds(c,sdsnew(status));
3308 } else {
3309 addReply(c,shared.err);
3310 }
3311 }
3312
3313 static void shutdownCommand(redisClient *c) {
3314 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3315 /* Kill the saving child if there is a background saving in progress.
3316 We want to avoid race conditions, for instance our saving child may
3317 overwrite the synchronous saving did by SHUTDOWN. */
3318 if (server.bgsavechildpid != -1) {
3319 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3320 kill(server.bgsavechildpid,SIGKILL);
3321 rdbRemoveTempFile(server.bgsavechildpid);
3322 }
3323 if (server.appendonly) {
3324 /* Append only file: fsync() the AOF and exit */
3325 fsync(server.appendfd);
3326 exit(0);
3327 } else {
3328 /* Snapshotting. Perform a SYNC SAVE and exit */
3329 if (rdbSave(server.dbfilename) == REDIS_OK) {
3330 if (server.daemonize)
3331 unlink(server.pidfile);
3332 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3333 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3334 exit(0);
3335 } else {
3336 /* Ooops.. error saving! The best we can do is to continue operating.
3337 * Note that if there was a background saving process, in the next
3338 * cron() Redis will be notified that the background saving aborted,
3339 * handling special stuff like slaves pending for synchronization... */
3340 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3341 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3342 }
3343 }
3344 }
3345
3346 static void renameGenericCommand(redisClient *c, int nx) {
3347 robj *o;
3348
3349 /* To use the same key as src and dst is probably an error */
3350 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3351 addReply(c,shared.sameobjecterr);
3352 return;
3353 }
3354
3355 o = lookupKeyWrite(c->db,c->argv[1]);
3356 if (o == NULL) {
3357 addReply(c,shared.nokeyerr);
3358 return;
3359 }
3360 incrRefCount(o);
3361 deleteIfVolatile(c->db,c->argv[2]);
3362 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3363 if (nx) {
3364 decrRefCount(o);
3365 addReply(c,shared.czero);
3366 return;
3367 }
3368 dictReplace(c->db->dict,c->argv[2],o);
3369 } else {
3370 incrRefCount(c->argv[2]);
3371 }
3372 deleteKey(c->db,c->argv[1]);
3373 server.dirty++;
3374 addReply(c,nx ? shared.cone : shared.ok);
3375 }
3376
3377 static void renameCommand(redisClient *c) {
3378 renameGenericCommand(c,0);
3379 }
3380
3381 static void renamenxCommand(redisClient *c) {
3382 renameGenericCommand(c,1);
3383 }
3384
3385 static void moveCommand(redisClient *c) {
3386 robj *o;
3387 redisDb *src, *dst;
3388 int srcid;
3389
3390 /* Obtain source and target DB pointers */
3391 src = c->db;
3392 srcid = c->db->id;
3393 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3394 addReply(c,shared.outofrangeerr);
3395 return;
3396 }
3397 dst = c->db;
3398 selectDb(c,srcid); /* Back to the source DB */
3399
3400 /* If the user is moving using as target the same
3401 * DB as the source DB it is probably an error. */
3402 if (src == dst) {
3403 addReply(c,shared.sameobjecterr);
3404 return;
3405 }
3406
3407 /* Check if the element exists and get a reference */
3408 o = lookupKeyWrite(c->db,c->argv[1]);
3409 if (!o) {
3410 addReply(c,shared.czero);
3411 return;
3412 }
3413
3414 /* Try to add the element to the target DB */
3415 deleteIfVolatile(dst,c->argv[1]);
3416 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3417 addReply(c,shared.czero);
3418 return;
3419 }
3420 incrRefCount(c->argv[1]);
3421 incrRefCount(o);
3422
3423 /* OK! key moved, free the entry in the source DB */
3424 deleteKey(src,c->argv[1]);
3425 server.dirty++;
3426 addReply(c,shared.cone);
3427 }
3428
3429 /* =================================== Lists ================================ */
3430 static void pushGenericCommand(redisClient *c, int where) {
3431 robj *lobj;
3432 list *list;
3433
3434 lobj = lookupKeyWrite(c->db,c->argv[1]);
3435 if (lobj == NULL) {
3436 lobj = createListObject();
3437 list = lobj->ptr;
3438 if (where == REDIS_HEAD) {
3439 listAddNodeHead(list,c->argv[2]);
3440 } else {
3441 listAddNodeTail(list,c->argv[2]);
3442 }
3443 dictAdd(c->db->dict,c->argv[1],lobj);
3444 incrRefCount(c->argv[1]);
3445 incrRefCount(c->argv[2]);
3446 } else {
3447 if (lobj->type != REDIS_LIST) {
3448 addReply(c,shared.wrongtypeerr);
3449 return;
3450 }
3451 list = lobj->ptr;
3452 if (where == REDIS_HEAD) {
3453 listAddNodeHead(list,c->argv[2]);
3454 } else {
3455 listAddNodeTail(list,c->argv[2]);
3456 }
3457 incrRefCount(c->argv[2]);
3458 }
3459 server.dirty++;
3460 addReply(c,shared.ok);
3461 }
3462
3463 static void lpushCommand(redisClient *c) {
3464 pushGenericCommand(c,REDIS_HEAD);
3465 }
3466
3467 static void rpushCommand(redisClient *c) {
3468 pushGenericCommand(c,REDIS_TAIL);
3469 }
3470
3471 static void llenCommand(redisClient *c) {
3472 robj *o;
3473 list *l;
3474
3475 o = lookupKeyRead(c->db,c->argv[1]);
3476 if (o == NULL) {
3477 addReply(c,shared.czero);
3478 return;
3479 } else {
3480 if (o->type != REDIS_LIST) {
3481 addReply(c,shared.wrongtypeerr);
3482 } else {
3483 l = o->ptr;
3484 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3485 }
3486 }
3487 }
3488
3489 static void lindexCommand(redisClient *c) {
3490 robj *o;
3491 int index = atoi(c->argv[2]->ptr);
3492
3493 o = lookupKeyRead(c->db,c->argv[1]);
3494 if (o == NULL) {
3495 addReply(c,shared.nullbulk);
3496 } else {
3497 if (o->type != REDIS_LIST) {
3498 addReply(c,shared.wrongtypeerr);
3499 } else {
3500 list *list = o->ptr;
3501 listNode *ln;
3502
3503 ln = listIndex(list, index);
3504 if (ln == NULL) {
3505 addReply(c,shared.nullbulk);
3506 } else {
3507 robj *ele = listNodeValue(ln);
3508 addReplyBulkLen(c,ele);
3509 addReply(c,ele);
3510 addReply(c,shared.crlf);
3511 }
3512 }
3513 }
3514 }
3515
3516 static void lsetCommand(redisClient *c) {
3517 robj *o;
3518 int index = atoi(c->argv[2]->ptr);
3519
3520 o = lookupKeyWrite(c->db,c->argv[1]);
3521 if (o == NULL) {
3522 addReply(c,shared.nokeyerr);
3523 } else {
3524 if (o->type != REDIS_LIST) {
3525 addReply(c,shared.wrongtypeerr);
3526 } else {
3527 list *list = o->ptr;
3528 listNode *ln;
3529
3530 ln = listIndex(list, index);
3531 if (ln == NULL) {
3532 addReply(c,shared.outofrangeerr);
3533 } else {
3534 robj *ele = listNodeValue(ln);
3535
3536 decrRefCount(ele);
3537 listNodeValue(ln) = c->argv[3];
3538 incrRefCount(c->argv[3]);
3539 addReply(c,shared.ok);
3540 server.dirty++;
3541 }
3542 }
3543 }
3544 }
3545
3546 static void popGenericCommand(redisClient *c, int where) {
3547 robj *o;
3548
3549 o = lookupKeyWrite(c->db,c->argv[1]);
3550 if (o == NULL) {
3551 addReply(c,shared.nullbulk);
3552 } else {
3553 if (o->type != REDIS_LIST) {
3554 addReply(c,shared.wrongtypeerr);
3555 } else {
3556 list *list = o->ptr;
3557 listNode *ln;
3558
3559 if (where == REDIS_HEAD)
3560 ln = listFirst(list);
3561 else
3562 ln = listLast(list);
3563
3564 if (ln == NULL) {
3565 addReply(c,shared.nullbulk);
3566 } else {
3567 robj *ele = listNodeValue(ln);
3568 addReplyBulkLen(c,ele);
3569 addReply(c,ele);
3570 addReply(c,shared.crlf);
3571 listDelNode(list,ln);
3572 server.dirty++;
3573 }
3574 }
3575 }
3576 }
3577
3578 static void lpopCommand(redisClient *c) {
3579 popGenericCommand(c,REDIS_HEAD);
3580 }
3581
3582 static void rpopCommand(redisClient *c) {
3583 popGenericCommand(c,REDIS_TAIL);
3584 }
3585
3586 static void lrangeCommand(redisClient *c) {
3587 robj *o;
3588 int start = atoi(c->argv[2]->ptr);
3589 int end = atoi(c->argv[3]->ptr);
3590
3591 o = lookupKeyRead(c->db,c->argv[1]);
3592 if (o == NULL) {
3593 addReply(c,shared.nullmultibulk);
3594 } else {
3595 if (o->type != REDIS_LIST) {
3596 addReply(c,shared.wrongtypeerr);
3597 } else {
3598 list *list = o->ptr;
3599 listNode *ln;
3600 int llen = listLength(list);
3601 int rangelen, j;
3602 robj *ele;
3603
3604 /* convert negative indexes */
3605 if (start < 0) start = llen+start;
3606 if (end < 0) end = llen+end;
3607 if (start < 0) start = 0;
3608 if (end < 0) end = 0;
3609
3610 /* indexes sanity checks */
3611 if (start > end || start >= llen) {
3612 /* Out of range start or start > end result in empty list */
3613 addReply(c,shared.emptymultibulk);
3614 return;
3615 }
3616 if (end >= llen) end = llen-1;
3617 rangelen = (end-start)+1;
3618
3619 /* Return the result in form of a multi-bulk reply */
3620 ln = listIndex(list, start);
3621 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3622 for (j = 0; j < rangelen; j++) {
3623 ele = listNodeValue(ln);
3624 addReplyBulkLen(c,ele);
3625 addReply(c,ele);
3626 addReply(c,shared.crlf);
3627 ln = ln->next;
3628 }
3629 }
3630 }
3631 }
3632
3633 static void ltrimCommand(redisClient *c) {
3634 robj *o;
3635 int start = atoi(c->argv[2]->ptr);
3636 int end = atoi(c->argv[3]->ptr);
3637
3638 o = lookupKeyWrite(c->db,c->argv[1]);
3639 if (o == NULL) {
3640 addReply(c,shared.ok);
3641 } else {
3642 if (o->type != REDIS_LIST) {
3643 addReply(c,shared.wrongtypeerr);
3644 } else {
3645 list *list = o->ptr;
3646 listNode *ln;
3647 int llen = listLength(list);
3648 int j, ltrim, rtrim;
3649
3650 /* convert negative indexes */
3651 if (start < 0) start = llen+start;
3652 if (end < 0) end = llen+end;
3653 if (start < 0) start = 0;
3654 if (end < 0) end = 0;
3655
3656 /* indexes sanity checks */
3657 if (start > end || start >= llen) {
3658 /* Out of range start or start > end result in empty list */
3659 ltrim = llen;
3660 rtrim = 0;
3661 } else {
3662 if (end >= llen) end = llen-1;
3663 ltrim = start;
3664 rtrim = llen-end-1;
3665 }
3666
3667 /* Remove list elements to perform the trim */
3668 for (j = 0; j < ltrim; j++) {
3669 ln = listFirst(list);
3670 listDelNode(list,ln);
3671 }
3672 for (j = 0; j < rtrim; j++) {
3673 ln = listLast(list);
3674 listDelNode(list,ln);
3675 }
3676 server.dirty++;
3677 addReply(c,shared.ok);
3678 }
3679 }
3680 }
3681
3682 static void lremCommand(redisClient *c) {
3683 robj *o;
3684
3685 o = lookupKeyWrite(c->db,c->argv[1]);
3686 if (o == NULL) {
3687 addReply(c,shared.czero);
3688 } else {
3689 if (o->type != REDIS_LIST) {
3690 addReply(c,shared.wrongtypeerr);
3691 } else {
3692 list *list = o->ptr;
3693 listNode *ln, *next;
3694 int toremove = atoi(c->argv[2]->ptr);
3695 int removed = 0;
3696 int fromtail = 0;
3697
3698 if (toremove < 0) {
3699 toremove = -toremove;
3700 fromtail = 1;
3701 }
3702 ln = fromtail ? list->tail : list->head;
3703 while (ln) {
3704 robj *ele = listNodeValue(ln);
3705
3706 next = fromtail ? ln->prev : ln->next;
3707 if (compareStringObjects(ele,c->argv[3]) == 0) {
3708 listDelNode(list,ln);
3709 server.dirty++;
3710 removed++;
3711 if (toremove && removed == toremove) break;
3712 }
3713 ln = next;
3714 }
3715 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3716 }
3717 }
3718 }
3719
3720 /* This is the semantic of this command:
3721 * RPOPLPUSH srclist dstlist:
3722 * IF LLEN(srclist) > 0
3723 * element = RPOP srclist
3724 * LPUSH dstlist element
3725 * RETURN element
3726 * ELSE
3727 * RETURN nil
3728 * END
3729 * END
3730 *
3731 * The idea is to be able to get an element from a list in a reliable way
3732 * since the element is not just returned but pushed against another list
3733 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3734 */
3735 static void rpoplpushcommand(redisClient *c) {
3736 robj *sobj;
3737
3738 sobj = lookupKeyWrite(c->db,c->argv[1]);
3739 if (sobj == NULL) {
3740 addReply(c,shared.nullbulk);
3741 } else {
3742 if (sobj->type != REDIS_LIST) {
3743 addReply(c,shared.wrongtypeerr);
3744 } else {
3745 list *srclist = sobj->ptr;
3746 listNode *ln = listLast(srclist);
3747
3748 if (ln == NULL) {
3749 addReply(c,shared.nullbulk);
3750 } else {
3751 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3752 robj *ele = listNodeValue(ln);
3753 list *dstlist;
3754
3755 if (dobj == NULL) {
3756
3757 /* Create the list if the key does not exist */
3758 dobj = createListObject();
3759 dictAdd(c->db->dict,c->argv[2],dobj);
3760 incrRefCount(c->argv[2]);
3761 } else if (dobj->type != REDIS_LIST) {
3762 addReply(c,shared.wrongtypeerr);
3763 return;
3764 }
3765 /* Add the element to the target list */
3766 dstlist = dobj->ptr;
3767 listAddNodeHead(dstlist,ele);
3768 incrRefCount(ele);
3769
3770 /* Send the element to the client as reply as well */
3771 addReplyBulkLen(c,ele);
3772 addReply(c,ele);
3773 addReply(c,shared.crlf);
3774
3775 /* Finally remove the element from the source list */
3776 listDelNode(srclist,ln);
3777 server.dirty++;
3778 }
3779 }
3780 }
3781 }
3782
3783
3784 /* ==================================== Sets ================================ */
3785
3786 static void saddCommand(redisClient *c) {
3787 robj *set;
3788
3789 set = lookupKeyWrite(c->db,c->argv[1]);
3790 if (set == NULL) {
3791 set = createSetObject();
3792 dictAdd(c->db->dict,c->argv[1],set);
3793 incrRefCount(c->argv[1]);
3794 } else {
3795 if (set->type != REDIS_SET) {
3796 addReply(c,shared.wrongtypeerr);
3797 return;
3798 }
3799 }
3800 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3801 incrRefCount(c->argv[2]);
3802 server.dirty++;
3803 addReply(c,shared.cone);
3804 } else {
3805 addReply(c,shared.czero);
3806 }
3807 }
3808
3809 static void sremCommand(redisClient *c) {
3810 robj *set;
3811
3812 set = lookupKeyWrite(c->db,c->argv[1]);
3813 if (set == NULL) {
3814 addReply(c,shared.czero);
3815 } else {
3816 if (set->type != REDIS_SET) {
3817 addReply(c,shared.wrongtypeerr);
3818 return;
3819 }
3820 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3821 server.dirty++;
3822 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3823 addReply(c,shared.cone);
3824 } else {
3825 addReply(c,shared.czero);
3826 }
3827 }
3828 }
3829
3830 static void smoveCommand(redisClient *c) {
3831 robj *srcset, *dstset;
3832
3833 srcset = lookupKeyWrite(c->db,c->argv[1]);
3834 dstset = lookupKeyWrite(c->db,c->argv[2]);
3835
3836 /* If the source key does not exist return 0, if it's of the wrong type
3837 * raise an error */
3838 if (srcset == NULL || srcset->type != REDIS_SET) {
3839 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3840 return;
3841 }
3842 /* Error if the destination key is not a set as well */
3843 if (dstset && dstset->type != REDIS_SET) {
3844 addReply(c,shared.wrongtypeerr);
3845 return;
3846 }
3847 /* Remove the element from the source set */
3848 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3849 /* Key not found in the src set! return zero */
3850 addReply(c,shared.czero);
3851 return;
3852 }
3853 server.dirty++;
3854 /* Add the element to the destination set */
3855 if (!dstset) {
3856 dstset = createSetObject();
3857 dictAdd(c->db->dict,c->argv[2],dstset);
3858 incrRefCount(c->argv[2]);
3859 }
3860 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3861 incrRefCount(c->argv[3]);
3862 addReply(c,shared.cone);
3863 }
3864
3865 static void sismemberCommand(redisClient *c) {
3866 robj *set;
3867
3868 set = lookupKeyRead(c->db,c->argv[1]);
3869 if (set == NULL) {
3870 addReply(c,shared.czero);
3871 } else {
3872 if (set->type != REDIS_SET) {
3873 addReply(c,shared.wrongtypeerr);
3874 return;
3875 }
3876 if (dictFind(set->ptr,c->argv[2]))
3877 addReply(c,shared.cone);
3878 else
3879 addReply(c,shared.czero);
3880 }
3881 }
3882
3883 static void scardCommand(redisClient *c) {
3884 robj *o;
3885 dict *s;
3886
3887 o = lookupKeyRead(c->db,c->argv[1]);
3888 if (o == NULL) {
3889 addReply(c,shared.czero);
3890 return;
3891 } else {
3892 if (o->type != REDIS_SET) {
3893 addReply(c,shared.wrongtypeerr);
3894 } else {
3895 s = o->ptr;
3896 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3897 dictSize(s)));
3898 }
3899 }
3900 }
3901
3902 static void spopCommand(redisClient *c) {
3903 robj *set;
3904 dictEntry *de;
3905
3906 set = lookupKeyWrite(c->db,c->argv[1]);
3907 if (set == NULL) {
3908 addReply(c,shared.nullbulk);
3909 } else {
3910 if (set->type != REDIS_SET) {
3911 addReply(c,shared.wrongtypeerr);
3912 return;
3913 }
3914 de = dictGetRandomKey(set->ptr);
3915 if (de == NULL) {
3916 addReply(c,shared.nullbulk);
3917 } else {
3918 robj *ele = dictGetEntryKey(de);
3919
3920 addReplyBulkLen(c,ele);
3921 addReply(c,ele);
3922 addReply(c,shared.crlf);
3923 dictDelete(set->ptr,ele);
3924 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3925 server.dirty++;
3926 }
3927 }
3928 }
3929
3930 static void srandmemberCommand(redisClient *c) {
3931 robj *set;
3932 dictEntry *de;
3933
3934 set = lookupKeyRead(c->db,c->argv[1]);
3935 if (set == NULL) {
3936 addReply(c,shared.nullbulk);
3937 } else {
3938 if (set->type != REDIS_SET) {
3939 addReply(c,shared.wrongtypeerr);
3940 return;
3941 }
3942 de = dictGetRandomKey(set->ptr);
3943 if (de == NULL) {
3944 addReply(c,shared.nullbulk);
3945 } else {
3946 robj *ele = dictGetEntryKey(de);
3947
3948 addReplyBulkLen(c,ele);
3949 addReply(c,ele);
3950 addReply(c,shared.crlf);
3951 }
3952 }
3953 }
3954
3955 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3956 dict **d1 = (void*) s1, **d2 = (void*) s2;
3957
3958 return dictSize(*d1)-dictSize(*d2);
3959 }
3960
3961 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
3962 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3963 dictIterator *di;
3964 dictEntry *de;
3965 robj *lenobj = NULL, *dstset = NULL;
3966 unsigned long j, cardinality = 0;
3967
3968 for (j = 0; j < setsnum; j++) {
3969 robj *setobj;
3970
3971 setobj = dstkey ?
3972 lookupKeyWrite(c->db,setskeys[j]) :
3973 lookupKeyRead(c->db,setskeys[j]);
3974 if (!setobj) {
3975 zfree(dv);
3976 if (dstkey) {
3977 if (deleteKey(c->db,dstkey))
3978 server.dirty++;
3979 addReply(c,shared.czero);
3980 } else {
3981 addReply(c,shared.nullmultibulk);
3982 }
3983 return;
3984 }
3985 if (setobj->type != REDIS_SET) {
3986 zfree(dv);
3987 addReply(c,shared.wrongtypeerr);
3988 return;
3989 }
3990 dv[j] = setobj->ptr;
3991 }
3992 /* Sort sets from the smallest to largest, this will improve our
3993 * algorithm's performace */
3994 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3995
3996 /* The first thing we should output is the total number of elements...
3997 * since this is a multi-bulk write, but at this stage we don't know
3998 * the intersection set size, so we use a trick, append an empty object
3999 * to the output list and save the pointer to later modify it with the
4000 * right length */
4001 if (!dstkey) {
4002 lenobj = createObject(REDIS_STRING,NULL);
4003 addReply(c,lenobj);
4004 decrRefCount(lenobj);
4005 } else {
4006 /* If we have a target key where to store the resulting set
4007 * create this key with an empty set inside */
4008 dstset = createSetObject();
4009 }
4010
4011 /* Iterate all the elements of the first (smallest) set, and test
4012 * the element against all the other sets, if at least one set does
4013 * not include the element it is discarded */
4014 di = dictGetIterator(dv[0]);
4015
4016 while((de = dictNext(di)) != NULL) {
4017 robj *ele;
4018
4019 for (j = 1; j < setsnum; j++)
4020 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4021 if (j != setsnum)
4022 continue; /* at least one set does not contain the member */
4023 ele = dictGetEntryKey(de);
4024 if (!dstkey) {
4025 addReplyBulkLen(c,ele);
4026 addReply(c,ele);
4027 addReply(c,shared.crlf);
4028 cardinality++;
4029 } else {
4030 dictAdd(dstset->ptr,ele,NULL);
4031 incrRefCount(ele);
4032 }
4033 }
4034 dictReleaseIterator(di);
4035
4036 if (dstkey) {
4037 /* Store the resulting set into the target */
4038 deleteKey(c->db,dstkey);
4039 dictAdd(c->db->dict,dstkey,dstset);
4040 incrRefCount(dstkey);
4041 }
4042
4043 if (!dstkey) {
4044 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4045 } else {
4046 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4047 dictSize((dict*)dstset->ptr)));
4048 server.dirty++;
4049 }
4050 zfree(dv);
4051 }
4052
4053 static void sinterCommand(redisClient *c) {
4054 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4055 }
4056
4057 static void sinterstoreCommand(redisClient *c) {
4058 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4059 }
4060
/* Operation selector passed to sunionDiffGenericCommand(). */
enum {
    REDIS_OP_UNION = 0, /* SUNION / SUNIONSTORE */
    REDIS_OP_DIFF = 1   /* SDIFF / SDIFFSTORE */
};
4063
4064 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4065 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4066 dictIterator *di;
4067 dictEntry *de;
4068 robj *dstset = NULL;
4069 int j, cardinality = 0;
4070
4071 for (j = 0; j < setsnum; j++) {
4072 robj *setobj;
4073
4074 setobj = dstkey ?
4075 lookupKeyWrite(c->db,setskeys[j]) :
4076 lookupKeyRead(c->db,setskeys[j]);
4077 if (!setobj) {
4078 dv[j] = NULL;
4079 continue;
4080 }
4081 if (setobj->type != REDIS_SET) {
4082 zfree(dv);
4083 addReply(c,shared.wrongtypeerr);
4084 return;
4085 }
4086 dv[j] = setobj->ptr;
4087 }
4088
4089 /* We need a temp set object to store our union. If the dstkey
4090 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4091 * this set object will be the resulting object to set into the target key*/
4092 dstset = createSetObject();
4093
4094 /* Iterate all the elements of all the sets, add every element a single
4095 * time to the result set */
4096 for (j = 0; j < setsnum; j++) {
4097 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4098 if (!dv[j]) continue; /* non existing keys are like empty sets */
4099
4100 di = dictGetIterator(dv[j]);
4101
4102 while((de = dictNext(di)) != NULL) {
4103 robj *ele;
4104
4105 /* dictAdd will not add the same element multiple times */
4106 ele = dictGetEntryKey(de);
4107 if (op == REDIS_OP_UNION || j == 0) {
4108 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4109 incrRefCount(ele);
4110 cardinality++;
4111 }
4112 } else if (op == REDIS_OP_DIFF) {
4113 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4114 cardinality--;
4115 }
4116 }
4117 }
4118 dictReleaseIterator(di);
4119
4120 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4121 }
4122
4123 /* Output the content of the resulting set, if not in STORE mode */
4124 if (!dstkey) {
4125 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4126 di = dictGetIterator(dstset->ptr);
4127 while((de = dictNext(di)) != NULL) {
4128 robj *ele;
4129
4130 ele = dictGetEntryKey(de);
4131 addReplyBulkLen(c,ele);
4132 addReply(c,ele);
4133 addReply(c,shared.crlf);
4134 }
4135 dictReleaseIterator(di);
4136 } else {
4137 /* If we have a target key where to store the resulting set
4138 * create this key with the result set inside */
4139 deleteKey(c->db,dstkey);
4140 dictAdd(c->db->dict,dstkey,dstset);
4141 incrRefCount(dstkey);
4142 }
4143
4144 /* Cleanup */
4145 if (!dstkey) {
4146 decrRefCount(dstset);
4147 } else {
4148 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4149 dictSize((dict*)dstset->ptr)));
4150 server.dirty++;
4151 }
4152 zfree(dv);
4153 }
4154
4155 static void sunionCommand(redisClient *c) {
4156 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4157 }
4158
4159 static void sunionstoreCommand(redisClient *c) {
4160 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4161 }
4162
4163 static void sdiffCommand(redisClient *c) {
4164 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4165 }
4166
4167 static void sdiffstoreCommand(redisClient *c) {
4168 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4169 }
4170
4171 /* ==================================== ZSets =============================== */
4172
4173 /* ZSETs are ordered sets using two data structures to hold the same elements
4174 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4175 * data structure.
4176 *
 * The elements are added to a hash table mapping Redis objects to scores.
4178 * At the same time the elements are added to a skip list mapping scores
4179 * to Redis objects (so objects are sorted by scores in this "view"). */
4180
4181 /* This skiplist implementation is almost a C translation of the original
4182 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4183 * Alternative to Balanced Trees", modified in three ways:
4184 * a) this implementation allows for repeated values.
4185 * b) the comparison is not just by key (our 'score') but by satellite data.
4186 * c) there is a back pointer, so it's a doubly linked list with the back
4187 * pointers being only at "level 1". This allows to traverse the list
4188 * from tail to head, useful for ZREVRANGE. */
4189
4190 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4191 zskiplistNode *zn = zmalloc(sizeof(*zn));
4192
4193 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4194 zn->score = score;
4195 zn->obj = obj;
4196 return zn;
4197 }
4198
4199 static zskiplist *zslCreate(void) {
4200 int j;
4201 zskiplist *zsl;
4202
4203 zsl = zmalloc(sizeof(*zsl));
4204 zsl->level = 1;
4205 zsl->length = 0;
4206 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4207 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4208 zsl->header->forward[j] = NULL;
4209 zsl->header->backward = NULL;
4210 zsl->tail = NULL;
4211 return zsl;
4212 }
4213
4214 static void zslFreeNode(zskiplistNode *node) {
4215 decrRefCount(node->obj);
4216 zfree(node->forward);
4217 zfree(node);
4218 }
4219
4220 static void zslFree(zskiplist *zsl) {
4221 zskiplistNode *node = zsl->header->forward[0], *next;
4222
4223 zfree(zsl->header->forward);
4224 zfree(zsl->header);
4225 while(node) {
4226 next = node->forward[0];
4227 zslFreeNode(node);
4228 node = next;
4229 }
4230 zfree(zsl);
4231 }
4232
4233 static int zslRandomLevel(void) {
4234 int level = 1;
4235 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4236 level += 1;
4237 return level;
4238 }
4239
4240 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4241 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4242 int i, level;
4243
4244 x = zsl->header;
4245 for (i = zsl->level-1; i >= 0; i--) {
4246 while (x->forward[i] &&
4247 (x->forward[i]->score < score ||
4248 (x->forward[i]->score == score &&
4249 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4250 x = x->forward[i];
4251 update[i] = x;
4252 }
4253 /* we assume the key is not already inside, since we allow duplicated
4254 * scores, and the re-insertion of score and redis object should never
4255 * happpen since the caller of zslInsert() should test in the hash table
4256 * if the element is already inside or not. */
4257 level = zslRandomLevel();
4258 if (level > zsl->level) {
4259 for (i = zsl->level; i < level; i++)
4260 update[i] = zsl->header;
4261 zsl->level = level;
4262 }
4263 x = zslCreateNode(level,score,obj);
4264 for (i = 0; i < level; i++) {
4265 x->forward[i] = update[i]->forward[i];
4266 update[i]->forward[i] = x;
4267 }
4268 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4269 if (x->forward[0])
4270 x->forward[0]->backward = x;
4271 else
4272 zsl->tail = x;
4273 zsl->length++;
4274 }
4275
4276 /* Delete an element with matching score/object from the skiplist. */
4277 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4278 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4279 int i;
4280
4281 x = zsl->header;
4282 for (i = zsl->level-1; i >= 0; i--) {
4283 while (x->forward[i] &&
4284 (x->forward[i]->score < score ||
4285 (x->forward[i]->score == score &&
4286 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4287 x = x->forward[i];
4288 update[i] = x;
4289 }
4290 /* We may have multiple elements with the same score, what we need
4291 * is to find the element with both the right score and object. */
4292 x = x->forward[0];
4293 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4294 for (i = 0; i < zsl->level; i++) {
4295 if (update[i]->forward[i] != x) break;
4296 update[i]->forward[i] = x->forward[i];
4297 }
4298 if (x->forward[0]) {
4299 x->forward[0]->backward = (x->backward == zsl->header) ?
4300 NULL : x->backward;
4301 } else {
4302 zsl->tail = x->backward;
4303 }
4304 zslFreeNode(x);
4305 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4306 zsl->level--;
4307 zsl->length--;
4308 return 1;
4309 } else {
4310 return 0; /* not found */
4311 }
4312 return 0; /* not found */
4313 }
4314
4315 /* Delete all the elements with score between min and max from the skiplist.
4316 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4317 * Note that this function takes the reference to the hash table view of the
4318 * sorted set, in order to remove the elements from the hash table too. */
4319 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4320 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4321 unsigned long removed = 0;
4322 int i;
4323
4324 x = zsl->header;
4325 for (i = zsl->level-1; i >= 0; i--) {
4326 while (x->forward[i] && x->forward[i]->score < min)
4327 x = x->forward[i];
4328 update[i] = x;
4329 }
4330 /* We may have multiple elements with the same score, what we need
4331 * is to find the element with both the right score and object. */
4332 x = x->forward[0];
4333 while (x && x->score <= max) {
4334 zskiplistNode *next;
4335
4336 for (i = 0; i < zsl->level; i++) {
4337 if (update[i]->forward[i] != x) break;
4338 update[i]->forward[i] = x->forward[i];
4339 }
4340 if (x->forward[0]) {
4341 x->forward[0]->backward = (x->backward == zsl->header) ?
4342 NULL : x->backward;
4343 } else {
4344 zsl->tail = x->backward;
4345 }
4346 next = x->forward[0];
4347 dictDelete(dict,x->obj);
4348 zslFreeNode(x);
4349 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4350 zsl->level--;
4351 zsl->length--;
4352 removed++;
4353 x = next;
4354 }
4355 return removed; /* not found */
4356 }
4357
4358 /* Find the first node having a score equal or greater than the specified one.
4359 * Returns NULL if there is no match. */
4360 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4361 zskiplistNode *x;
4362 int i;
4363
4364 x = zsl->header;
4365 for (i = zsl->level-1; i >= 0; i--) {
4366 while (x->forward[i] && x->forward[i]->score < score)
4367 x = x->forward[i];
4368 }
4369 /* We may have multiple elements with the same score, what we need
4370 * is to find the element with both the right score and object. */
4371 return x->forward[0];
4372 }
4373
4374 /* The actual Z-commands implementations */
4375
4376 /* This generic command implements both ZADD and ZINCRBY.
4377 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4378 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4379 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4380 robj *zsetobj;
4381 zset *zs;
4382 double *score;
4383
4384 zsetobj = lookupKeyWrite(c->db,key);
4385 if (zsetobj == NULL) {
4386 zsetobj = createZsetObject();
4387 dictAdd(c->db->dict,key,zsetobj);
4388 incrRefCount(key);
4389 } else {
4390 if (zsetobj->type != REDIS_ZSET) {
4391 addReply(c,shared.wrongtypeerr);
4392 return;
4393 }
4394 }
4395 zs = zsetobj->ptr;
4396
4397 /* Ok now since we implement both ZADD and ZINCRBY here the code
4398 * needs to handle the two different conditions. It's all about setting
4399 * '*score', that is, the new score to set, to the right value. */
4400 score = zmalloc(sizeof(double));
4401 if (doincrement) {
4402 dictEntry *de;
4403
4404 /* Read the old score. If the element was not present starts from 0 */
4405 de = dictFind(zs->dict,ele);
4406 if (de) {
4407 double *oldscore = dictGetEntryVal(de);
4408 *score = *oldscore + scoreval;
4409 } else {
4410 *score = scoreval;
4411 }
4412 } else {
4413 *score = scoreval;
4414 }
4415
4416 /* What follows is a simple remove and re-insert operation that is common
4417 * to both ZADD and ZINCRBY... */
4418 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4419 /* case 1: New element */
4420 incrRefCount(ele); /* added to hash */
4421 zslInsert(zs->zsl,*score,ele);
4422 incrRefCount(ele); /* added to skiplist */
4423 server.dirty++;
4424 if (doincrement)
4425 addReplyDouble(c,*score);
4426 else
4427 addReply(c,shared.cone);
4428 } else {
4429 dictEntry *de;
4430 double *oldscore;
4431
4432 /* case 2: Score update operation */
4433 de = dictFind(zs->dict,ele);
4434 redisAssert(de != NULL);
4435 oldscore = dictGetEntryVal(de);
4436 if (*score != *oldscore) {
4437 int deleted;
4438
4439 /* Remove and insert the element in the skip list with new score */
4440 deleted = zslDelete(zs->zsl,*oldscore,ele);
4441 redisAssert(deleted != 0);
4442 zslInsert(zs->zsl,*score,ele);
4443 incrRefCount(ele);
4444 /* Update the score in the hash table */
4445 dictReplace(zs->dict,ele,score);
4446 server.dirty++;
4447 } else {
4448 zfree(score);
4449 }
4450 if (doincrement)
4451 addReplyDouble(c,*score);
4452 else
4453 addReply(c,shared.czero);
4454 }
4455 }
4456
4457 static void zaddCommand(redisClient *c) {
4458 double scoreval;
4459
4460 scoreval = strtod(c->argv[2]->ptr,NULL);
4461 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4462 }
4463
4464 static void zincrbyCommand(redisClient *c) {
4465 double scoreval;
4466
4467 scoreval = strtod(c->argv[2]->ptr,NULL);
4468 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4469 }
4470
4471 static void zremCommand(redisClient *c) {
4472 robj *zsetobj;
4473 zset *zs;
4474
4475 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4476 if (zsetobj == NULL) {
4477 addReply(c,shared.czero);
4478 } else {
4479 dictEntry *de;
4480 double *oldscore;
4481 int deleted;
4482
4483 if (zsetobj->type != REDIS_ZSET) {
4484 addReply(c,shared.wrongtypeerr);
4485 return;
4486 }
4487 zs = zsetobj->ptr;
4488 de = dictFind(zs->dict,c->argv[2]);
4489 if (de == NULL) {
4490 addReply(c,shared.czero);
4491 return;
4492 }
4493 /* Delete from the skiplist */
4494 oldscore = dictGetEntryVal(de);
4495 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4496 redisAssert(deleted != 0);
4497
4498 /* Delete from the hash table */
4499 dictDelete(zs->dict,c->argv[2]);
4500 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4501 server.dirty++;
4502 addReply(c,shared.cone);
4503 }
4504 }
4505
4506 static void zremrangebyscoreCommand(redisClient *c) {
4507 double min = strtod(c->argv[2]->ptr,NULL);
4508 double max = strtod(c->argv[3]->ptr,NULL);
4509 robj *zsetobj;
4510 zset *zs;
4511
4512 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4513 if (zsetobj == NULL) {
4514 addReply(c,shared.czero);
4515 } else {
4516 long deleted;
4517
4518 if (zsetobj->type != REDIS_ZSET) {
4519 addReply(c,shared.wrongtypeerr);
4520 return;
4521 }
4522 zs = zsetobj->ptr;
4523 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4524 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4525 server.dirty += deleted;
4526 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4527 }
4528 }
4529
4530 static void zrangeGenericCommand(redisClient *c, int reverse) {
4531 robj *o;
4532 int start = atoi(c->argv[2]->ptr);
4533 int end = atoi(c->argv[3]->ptr);
4534
4535 o = lookupKeyRead(c->db,c->argv[1]);
4536 if (o == NULL) {
4537 addReply(c,shared.nullmultibulk);
4538 } else {
4539 if (o->type != REDIS_ZSET) {
4540 addReply(c,shared.wrongtypeerr);
4541 } else {
4542 zset *zsetobj = o->ptr;
4543 zskiplist *zsl = zsetobj->zsl;
4544 zskiplistNode *ln;
4545
4546 int llen = zsl->length;
4547 int rangelen, j;
4548 robj *ele;
4549
4550 /* convert negative indexes */
4551 if (start < 0) start = llen+start;
4552 if (end < 0) end = llen+end;
4553 if (start < 0) start = 0;
4554 if (end < 0) end = 0;
4555
4556 /* indexes sanity checks */
4557 if (start > end || start >= llen) {
4558 /* Out of range start or start > end result in empty list */
4559 addReply(c,shared.emptymultibulk);
4560 return;
4561 }
4562 if (end >= llen) end = llen-1;
4563 rangelen = (end-start)+1;
4564
4565 /* Return the result in form of a multi-bulk reply */
4566 if (reverse) {
4567 ln = zsl->tail;
4568 while (start--)
4569 ln = ln->backward;
4570 } else {
4571 ln = zsl->header->forward[0];
4572 while (start--)
4573 ln = ln->forward[0];
4574 }
4575
4576 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4577 for (j = 0; j < rangelen; j++) {
4578 ele = ln->obj;
4579 addReplyBulkLen(c,ele);
4580 addReply(c,ele);
4581 addReply(c,shared.crlf);
4582 ln = reverse ? ln->backward : ln->forward[0];
4583 }
4584 }
4585 }
4586 }
4587
4588 static void zrangeCommand(redisClient *c) {
4589 zrangeGenericCommand(c,0);
4590 }
4591
4592 static void zrevrangeCommand(redisClient *c) {
4593 zrangeGenericCommand(c,1);
4594 }
4595
4596 static void zrangebyscoreCommand(redisClient *c) {
4597 robj *o;
4598 double min = strtod(c->argv[2]->ptr,NULL);
4599 double max = strtod(c->argv[3]->ptr,NULL);
4600 int offset = 0, limit = -1;
4601
4602 if (c->argc != 4 && c->argc != 7) {
4603 addReplySds(c,
4604 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4605 return;
4606 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4607 addReply(c,shared.syntaxerr);
4608 return;
4609 } else if (c->argc == 7) {
4610 offset = atoi(c->argv[5]->ptr);
4611 limit = atoi(c->argv[6]->ptr);
4612 if (offset < 0) offset = 0;
4613 }
4614
4615 o = lookupKeyRead(c->db,c->argv[1]);
4616 if (o == NULL) {
4617 addReply(c,shared.nullmultibulk);
4618 } else {
4619 if (o->type != REDIS_ZSET) {
4620 addReply(c,shared.wrongtypeerr);
4621 } else {
4622 zset *zsetobj = o->ptr;
4623 zskiplist *zsl = zsetobj->zsl;
4624 zskiplistNode *ln;
4625 robj *ele, *lenobj;
4626 unsigned int rangelen = 0;
4627
4628 /* Get the first node with the score >= min */
4629 ln = zslFirstWithScore(zsl,min);
4630 if (ln == NULL) {
4631 /* No element matching the speciifed interval */
4632 addReply(c,shared.emptymultibulk);
4633 return;
4634 }
4635
4636 /* We don't know in advance how many matching elements there
4637 * are in the list, so we push this object that will represent
4638 * the multi-bulk length in the output buffer, and will "fix"
4639 * it later */
4640 lenobj = createObject(REDIS_STRING,NULL);
4641 addReply(c,lenobj);
4642 decrRefCount(lenobj);
4643
4644 while(ln && ln->score <= max) {
4645 if (offset) {
4646 offset--;
4647 ln = ln->forward[0];
4648 continue;
4649 }
4650 if (limit == 0) break;
4651 ele = ln->obj;
4652 addReplyBulkLen(c,ele);
4653 addReply(c,ele);
4654 addReply(c,shared.crlf);
4655 ln = ln->forward[0];
4656 rangelen++;
4657 if (limit > 0) limit--;
4658 }
4659 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4660 }
4661 }
4662 }
4663
4664 static void zcardCommand(redisClient *c) {
4665 robj *o;
4666 zset *zs;
4667
4668 o = lookupKeyRead(c->db,c->argv[1]);
4669 if (o == NULL) {
4670 addReply(c,shared.czero);
4671 return;
4672 } else {
4673 if (o->type != REDIS_ZSET) {
4674 addReply(c,shared.wrongtypeerr);
4675 } else {
4676 zs = o->ptr;
4677 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4678 }
4679 }
4680 }
4681
4682 static void zscoreCommand(redisClient *c) {
4683 robj *o;
4684 zset *zs;
4685
4686 o = lookupKeyRead(c->db,c->argv[1]);
4687 if (o == NULL) {
4688 addReply(c,shared.nullbulk);
4689 return;
4690 } else {
4691 if (o->type != REDIS_ZSET) {
4692 addReply(c,shared.wrongtypeerr);
4693 } else {
4694 dictEntry *de;
4695
4696 zs = o->ptr;
4697 de = dictFind(zs->dict,c->argv[2]);
4698 if (!de) {
4699 addReply(c,shared.nullbulk);
4700 } else {
4701 double *score = dictGetEntryVal(de);
4702
4703 addReplyDouble(c,*score);
4704 }
4705 }
4706 }
4707 }
4708
4709 /* ========================= Non type-specific commands ==================== */
4710
4711 static void flushdbCommand(redisClient *c) {
4712 server.dirty += dictSize(c->db->dict);
4713 dictEmpty(c->db->dict);
4714 dictEmpty(c->db->expires);
4715 addReply(c,shared.ok);
4716 }
4717
4718 static void flushallCommand(redisClient *c) {
4719 server.dirty += emptyDb();
4720 addReply(c,shared.ok);
4721 rdbSave(server.dbfilename);
4722 server.dirty++;
4723 }
4724
4725 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4726 redisSortOperation *so = zmalloc(sizeof(*so));
4727 so->type = type;
4728 so->pattern = pattern;
4729 return so;
4730 }
4731
4732 /* Return the value associated to the key with a name obtained
4733 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4734 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4735 char *p;
4736 sds spat, ssub;
4737 robj keyobj;
4738 int prefixlen, sublen, postfixlen;
4739 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4740 struct {
4741 long len;
4742 long free;
4743 char buf[REDIS_SORTKEY_MAX+1];
4744 } keyname;
4745
4746 /* If the pattern is "#" return the substitution object itself in order
4747 * to implement the "SORT ... GET #" feature. */
4748 spat = pattern->ptr;
4749 if (spat[0] == '#' && spat[1] == '\0') {
4750 return subst;
4751 }
4752
4753 /* The substitution object may be specially encoded. If so we create
4754 * a decoded object on the fly. Otherwise getDecodedObject will just
4755 * increment the ref count, that we'll decrement later. */
4756 subst = getDecodedObject(subst);
4757
4758 ssub = subst->ptr;
4759 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4760 p = strchr(spat,'*');
4761 if (!p) {
4762 decrRefCount(subst);
4763 return NULL;
4764 }
4765
4766 prefixlen = p-spat;
4767 sublen = sdslen(ssub);
4768 postfixlen = sdslen(spat)-(prefixlen+1);
4769 memcpy(keyname.buf,spat,prefixlen);
4770 memcpy(keyname.buf+prefixlen,ssub,sublen);
4771 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4772 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4773 keyname.len = prefixlen+sublen+postfixlen;
4774
4775 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4776 decrRefCount(subst);
4777
4778 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4779 return lookupKeyRead(db,&keyobj);
4780 }
4781
4782 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4783 * the additional parameter is not standard but a BSD-specific we have to
4784 * pass sorting parameters via the global 'server' structure */
4785 static int sortCompare(const void *s1, const void *s2) {
4786 const redisSortObject *so1 = s1, *so2 = s2;
4787 int cmp;
4788
4789 if (!server.sort_alpha) {
4790 /* Numeric sorting. Here it's trivial as we precomputed scores */
4791 if (so1->u.score > so2->u.score) {
4792 cmp = 1;
4793 } else if (so1->u.score < so2->u.score) {
4794 cmp = -1;
4795 } else {
4796 cmp = 0;
4797 }
4798 } else {
4799 /* Alphanumeric sorting */
4800 if (server.sort_bypattern) {
4801 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4802 /* At least one compare object is NULL */
4803 if (so1->u.cmpobj == so2->u.cmpobj)
4804 cmp = 0;
4805 else if (so1->u.cmpobj == NULL)
4806 cmp = -1;
4807 else
4808 cmp = 1;
4809 } else {
4810 /* We have both the objects, use strcoll */
4811 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4812 }
4813 } else {
4814 /* Compare elements directly */
4815 robj *dec1, *dec2;
4816
4817 dec1 = getDecodedObject(so1->obj);
4818 dec2 = getDecodedObject(so2->obj);
4819 cmp = strcoll(dec1->ptr,dec2->ptr);
4820 decrRefCount(dec1);
4821 decrRefCount(dec2);
4822 }
4823 }
4824 return server.sort_desc ? -cmp : cmp;
4825 }
4826
4827 /* The SORT command is the most complex command in Redis. Warning: this code
4828 * is optimized for speed and a bit less for readability */
4829 static void sortCommand(redisClient *c) {
4830 list *operations;
4831 int outputlen = 0;
4832 int desc = 0, alpha = 0;
4833 int limit_start = 0, limit_count = -1, start, end;
4834 int j, dontsort = 0, vectorlen;
4835 int getop = 0; /* GET operation counter */
4836 robj *sortval, *sortby = NULL, *storekey = NULL;
4837 redisSortObject *vector; /* Resulting vector to sort */
4838
4839 /* Lookup the key to sort. It must be of the right types */
4840 sortval = lookupKeyRead(c->db,c->argv[1]);
4841 if (sortval == NULL) {
4842 addReply(c,shared.nokeyerr);
4843 return;
4844 }
4845 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4846 sortval->type != REDIS_ZSET)
4847 {
4848 addReply(c,shared.wrongtypeerr);
4849 return;
4850 }
4851
4852 /* Create a list of operations to perform for every sorted element.
4853 * Operations can be GET/DEL/INCR/DECR */
4854 operations = listCreate();
4855 listSetFreeMethod(operations,zfree);
4856 j = 2;
4857
4858 /* Now we need to protect sortval incrementing its count, in the future
4859 * SORT may have options able to overwrite/delete keys during the sorting
4860 * and the sorted key itself may get destroied */
4861 incrRefCount(sortval);
4862
4863 /* The SORT command has an SQL-alike syntax, parse it */
4864 while(j < c->argc) {
4865 int leftargs = c->argc-j-1;
4866 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4867 desc = 0;
4868 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4869 desc = 1;
4870 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4871 alpha = 1;
4872 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4873 limit_start = atoi(c->argv[j+1]->ptr);
4874 limit_count = atoi(c->argv[j+2]->ptr);
4875 j+=2;
4876 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4877 storekey = c->argv[j+1];
4878 j++;
4879 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4880 sortby = c->argv[j+1];
4881 /* If the BY pattern does not contain '*', i.e. it is constant,
4882 * we don't need to sort nor to lookup the weight keys. */
4883 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4884 j++;
4885 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4886 listAddNodeTail(operations,createSortOperation(
4887 REDIS_SORT_GET,c->argv[j+1]));
4888 getop++;
4889 j++;
4890 } else {
4891 decrRefCount(sortval);
4892 listRelease(operations);
4893 addReply(c,shared.syntaxerr);
4894 return;
4895 }
4896 j++;
4897 }
4898
4899 /* Load the sorting vector with all the objects to sort */
4900 switch(sortval->type) {
4901 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
4902 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
4903 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
4904 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
4905 }
4906 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4907 j = 0;
4908
4909 if (sortval->type == REDIS_LIST) {
4910 list *list = sortval->ptr;
4911 listNode *ln;
4912
4913 listRewind(list);
4914 while((ln = listYield(list))) {
4915 robj *ele = ln->value;
4916 vector[j].obj = ele;
4917 vector[j].u.score = 0;
4918 vector[j].u.cmpobj = NULL;
4919 j++;
4920 }
4921 } else {
4922 dict *set;
4923 dictIterator *di;
4924 dictEntry *setele;
4925
4926 if (sortval->type == REDIS_SET) {
4927 set = sortval->ptr;
4928 } else {
4929 zset *zs = sortval->ptr;
4930 set = zs->dict;
4931 }
4932
4933 di = dictGetIterator(set);
4934 while((setele = dictNext(di)) != NULL) {
4935 vector[j].obj = dictGetEntryKey(setele);
4936 vector[j].u.score = 0;
4937 vector[j].u.cmpobj = NULL;
4938 j++;
4939 }
4940 dictReleaseIterator(di);
4941 }
4942 redisAssert(j == vectorlen);
4943
4944 /* Now it's time to load the right scores in the sorting vector */
4945 if (dontsort == 0) {
4946 for (j = 0; j < vectorlen; j++) {
4947 if (sortby) {
4948 robj *byval;
4949
4950 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4951 if (!byval || byval->type != REDIS_STRING) continue;
4952 if (alpha) {
4953 vector[j].u.cmpobj = getDecodedObject(byval);
4954 } else {
4955 if (byval->encoding == REDIS_ENCODING_RAW) {
4956 vector[j].u.score = strtod(byval->ptr,NULL);
4957 } else {
4958 /* Don't need to decode the object if it's
4959 * integer-encoded (the only encoding supported) so
4960 * far. We can just cast it */
4961 if (byval->encoding == REDIS_ENCODING_INT) {
4962 vector[j].u.score = (long)byval->ptr;
4963 } else
4964 redisAssert(1 != 1);
4965 }
4966 }
4967 } else {
4968 if (!alpha) {
4969 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4970 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4971 else {
4972 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4973 vector[j].u.score = (long) vector[j].obj->ptr;
4974 else
4975 redisAssert(1 != 1);
4976 }
4977 }
4978 }
4979 }
4980 }
4981
4982 /* We are ready to sort the vector... perform a bit of sanity check
4983 * on the LIMIT option too. We'll use a partial version of quicksort. */
4984 start = (limit_start < 0) ? 0 : limit_start;
4985 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4986 if (start >= vectorlen) {
4987 start = vectorlen-1;
4988 end = vectorlen-2;
4989 }
4990 if (end >= vectorlen) end = vectorlen-1;
4991
4992 if (dontsort == 0) {
4993 server.sort_desc = desc;
4994 server.sort_alpha = alpha;
4995 server.sort_bypattern = sortby ? 1 : 0;
4996 if (sortby && (start != 0 || end != vectorlen-1))
4997 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4998 else
4999 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5000 }
5001
5002 /* Send command output to the output buffer, performing the specified
5003 * GET/DEL/INCR/DECR operations if any. */
5004 outputlen = getop ? getop*(end-start+1) : end-start+1;
5005 if (storekey == NULL) {
5006 /* STORE option not specified, sent the sorting result to client */
5007 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5008 for (j = start; j <= end; j++) {
5009 listNode *ln;
5010 if (!getop) {
5011 addReplyBulkLen(c,vector[j].obj);
5012 addReply(c,vector[j].obj);
5013 addReply(c,shared.crlf);
5014 }
5015 listRewind(operations);
5016 while((ln = listYield(operations))) {
5017 redisSortOperation *sop = ln->value;
5018 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5019 vector[j].obj);
5020
5021 if (sop->type == REDIS_SORT_GET) {
5022 if (!val || val->type != REDIS_STRING) {
5023 addReply(c,shared.nullbulk);
5024 } else {
5025 addReplyBulkLen(c,val);
5026 addReply(c,val);
5027 addReply(c,shared.crlf);
5028 }
5029 } else {
5030 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5031 }
5032 }
5033 }
5034 } else {
5035 robj *listObject = createListObject();
5036 list *listPtr = (list*) listObject->ptr;
5037
5038 /* STORE option specified, set the sorting result as a List object */
5039 for (j = start; j <= end; j++) {
5040 listNode *ln;
5041 if (!getop) {
5042 listAddNodeTail(listPtr,vector[j].obj);
5043 incrRefCount(vector[j].obj);
5044 }
5045 listRewind(operations);
5046 while((ln = listYield(operations))) {
5047 redisSortOperation *sop = ln->value;
5048 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5049 vector[j].obj);
5050
5051 if (sop->type == REDIS_SORT_GET) {
5052 if (!val || val->type != REDIS_STRING) {
5053 listAddNodeTail(listPtr,createStringObject("",0));
5054 } else {
5055 listAddNodeTail(listPtr,val);
5056 incrRefCount(val);
5057 }
5058 } else {
5059 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5060 }
5061 }
5062 }
5063 if (dictReplace(c->db->dict,storekey,listObject)) {
5064 incrRefCount(storekey);
5065 }
5066 /* Note: we add 1 because the DB is dirty anyway since even if the
5067 * SORT result is empty a new key is set and maybe the old content
5068 * replaced. */
5069 server.dirty += 1+outputlen;
5070 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5071 }
5072
5073 /* Cleanup */
5074 decrRefCount(sortval);
5075 listRelease(operations);
5076 for (j = 0; j < vectorlen; j++) {
5077 if (sortby && alpha && vector[j].u.cmpobj)
5078 decrRefCount(vector[j].u.cmpobj);
5079 }
5080 zfree(vector);
5081 }
5082
5083 /* Create the string returned by the INFO command. This is decoupled
5084 * by the INFO command itself as we need to report the same information
5085 * on memory corruption problems. */
5086 static sds genRedisInfoString(void) {
5087 sds info;
5088 time_t uptime = time(NULL)-server.stat_starttime;
5089 int j;
5090
5091 info = sdscatprintf(sdsempty(),
5092 "redis_version:%s\r\n"
5093 "arch_bits:%s\r\n"
5094 "multiplexing_api:%s\r\n"
5095 "uptime_in_seconds:%ld\r\n"
5096 "uptime_in_days:%ld\r\n"
5097 "connected_clients:%d\r\n"
5098 "connected_slaves:%d\r\n"
5099 "used_memory:%zu\r\n"
5100 "changes_since_last_save:%lld\r\n"
5101 "bgsave_in_progress:%d\r\n"
5102 "last_save_time:%ld\r\n"
5103 "bgrewriteaof_in_progress:%d\r\n"
5104 "total_connections_received:%lld\r\n"
5105 "total_commands_processed:%lld\r\n"
5106 "role:%s\r\n"
5107 ,REDIS_VERSION,
5108 (sizeof(long) == 8) ? "64" : "32",
5109 aeGetApiName(),
5110 uptime,
5111 uptime/(3600*24),
5112 listLength(server.clients)-listLength(server.slaves),
5113 listLength(server.slaves),
5114 server.usedmemory,
5115 server.dirty,
5116 server.bgsavechildpid != -1,
5117 server.lastsave,
5118 server.bgrewritechildpid != -1,
5119 server.stat_numconnections,
5120 server.stat_numcommands,
5121 server.masterhost == NULL ? "master" : "slave"
5122 );
5123 if (server.masterhost) {
5124 info = sdscatprintf(info,
5125 "master_host:%s\r\n"
5126 "master_port:%d\r\n"
5127 "master_link_status:%s\r\n"
5128 "master_last_io_seconds_ago:%d\r\n"
5129 ,server.masterhost,
5130 server.masterport,
5131 (server.replstate == REDIS_REPL_CONNECTED) ?
5132 "up" : "down",
5133 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5134 );
5135 }
5136 for (j = 0; j < server.dbnum; j++) {
5137 long long keys, vkeys;
5138
5139 keys = dictSize(server.db[j].dict);
5140 vkeys = dictSize(server.db[j].expires);
5141 if (keys || vkeys) {
5142 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5143 j, keys, vkeys);
5144 }
5145 }
5146 return info;
5147 }
5148
5149 static void infoCommand(redisClient *c) {
5150 sds info = genRedisInfoString();
5151 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5152 (unsigned long)sdslen(info)));
5153 addReplySds(c,info);
5154 addReply(c,shared.crlf);
5155 }
5156
5157 static void monitorCommand(redisClient *c) {
5158 /* ignore MONITOR if aleady slave or in monitor mode */
5159 if (c->flags & REDIS_SLAVE) return;
5160
5161 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5162 c->slaveseldb = 0;
5163 listAddNodeTail(server.monitors,c);
5164 addReply(c,shared.ok);
5165 }
5166
5167 /* ================================= Expire ================================= */
5168 static int removeExpire(redisDb *db, robj *key) {
5169 if (dictDelete(db->expires,key) == DICT_OK) {
5170 return 1;
5171 } else {
5172 return 0;
5173 }
5174 }
5175
5176 static int setExpire(redisDb *db, robj *key, time_t when) {
5177 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5178 return 0;
5179 } else {
5180 incrRefCount(key);
5181 return 1;
5182 }
5183 }
5184
5185 /* Return the expire time of the specified key, or -1 if no expire
5186 * is associated with this key (i.e. the key is non volatile) */
5187 static time_t getExpire(redisDb *db, robj *key) {
5188 dictEntry *de;
5189
5190 /* No expire? return ASAP */
5191 if (dictSize(db->expires) == 0 ||
5192 (de = dictFind(db->expires,key)) == NULL) return -1;
5193
5194 return (time_t) dictGetEntryVal(de);
5195 }
5196
5197 static int expireIfNeeded(redisDb *db, robj *key) {
5198 time_t when;
5199 dictEntry *de;
5200
5201 /* No expire? return ASAP */
5202 if (dictSize(db->expires) == 0 ||
5203 (de = dictFind(db->expires,key)) == NULL) return 0;
5204
5205 /* Lookup the expire */
5206 when = (time_t) dictGetEntryVal(de);
5207 if (time(NULL) <= when) return 0;
5208
5209 /* Delete the key */
5210 dictDelete(db->expires,key);
5211 return dictDelete(db->dict,key) == DICT_OK;
5212 }
5213
5214 static int deleteIfVolatile(redisDb *db, robj *key) {
5215 dictEntry *de;
5216
5217 /* No expire? return ASAP */
5218 if (dictSize(db->expires) == 0 ||
5219 (de = dictFind(db->expires,key)) == NULL) return 0;
5220
5221 /* Delete the key */
5222 server.dirty++;
5223 dictDelete(db->expires,key);
5224 return dictDelete(db->dict,key) == DICT_OK;
5225 }
5226
5227 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5228 dictEntry *de;
5229
5230 de = dictFind(c->db->dict,key);
5231 if (de == NULL) {
5232 addReply(c,shared.czero);
5233 return;
5234 }
5235 if (seconds < 0) {
5236 if (deleteKey(c->db,key)) server.dirty++;
5237 addReply(c, shared.cone);
5238 return;
5239 } else {
5240 time_t when = time(NULL)+seconds;
5241 if (setExpire(c->db,key,when)) {
5242 addReply(c,shared.cone);
5243 server.dirty++;
5244 } else {
5245 addReply(c,shared.czero);
5246 }
5247 return;
5248 }
5249 }
5250
5251 static void expireCommand(redisClient *c) {
5252 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5253 }
5254
5255 static void expireatCommand(redisClient *c) {
5256 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5257 }
5258
5259 static void ttlCommand(redisClient *c) {
5260 time_t expire;
5261 int ttl = -1;
5262
5263 expire = getExpire(c->db,c->argv[1]);
5264 if (expire != -1) {
5265 ttl = (int) (expire-time(NULL));
5266 if (ttl < 0) ttl = -1;
5267 }
5268 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5269 }
5270
5271 /* =============================== Replication ============================= */
5272
5273 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5274 ssize_t nwritten, ret = size;
5275 time_t start = time(NULL);
5276
5277 timeout++;
5278 while(size) {
5279 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5280 nwritten = write(fd,ptr,size);
5281 if (nwritten == -1) return -1;
5282 ptr += nwritten;
5283 size -= nwritten;
5284 }
5285 if ((time(NULL)-start) > timeout) {
5286 errno = ETIMEDOUT;
5287 return -1;
5288 }
5289 }
5290 return ret;
5291 }
5292
5293 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5294 ssize_t nread, totread = 0;
5295 time_t start = time(NULL);
5296
5297 timeout++;
5298 while(size) {
5299 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5300 nread = read(fd,ptr,size);
5301 if (nread == -1) return -1;
5302 ptr += nread;
5303 size -= nread;
5304 totread += nread;
5305 }
5306 if ((time(NULL)-start) > timeout) {
5307 errno = ETIMEDOUT;
5308 return -1;
5309 }
5310 }
5311 return totread;
5312 }
5313
5314 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
5315 ssize_t nread = 0;
5316
5317 size--;
5318 while(size) {
5319 char c;
5320
5321 if (syncRead(fd,&c,1,timeout) == -1) return -1;
5322 if (c == '\n') {
5323 *ptr = '\0';
5324 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
5325 return nread;
5326 } else {
5327 *ptr++ = c;
5328 *ptr = '\0';
5329 nread++;
5330 }
5331 }
5332 return nread;
5333 }
5334
5335 static void syncCommand(redisClient *c) {
5336 /* ignore SYNC if aleady slave or in monitor mode */
5337 if (c->flags & REDIS_SLAVE) return;
5338
5339 /* SYNC can't be issued when the server has pending data to send to
5340 * the client about already issued commands. We need a fresh reply
5341 * buffer registering the differences between the BGSAVE and the current
5342 * dataset, so that we can copy to other slaves if needed. */
5343 if (listLength(c->reply) != 0) {
5344 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5345 return;
5346 }
5347
5348 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5349 /* Here we need to check if there is a background saving operation
5350 * in progress, or if it is required to start one */
5351 if (server.bgsavechildpid != -1) {
5352 /* Ok a background save is in progress. Let's check if it is a good
5353 * one for replication, i.e. if there is another slave that is
5354 * registering differences since the server forked to save */
5355 redisClient *slave;
5356 listNode *ln;
5357
5358 listRewind(server.slaves);
5359 while((ln = listYield(server.slaves))) {
5360 slave = ln->value;
5361 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5362 }
5363 if (ln) {
5364 /* Perfect, the server is already registering differences for
5365 * another slave. Set the right state, and copy the buffer. */
5366 listRelease(c->reply);
5367 c->reply = listDup(slave->reply);
5368 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5369 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5370 } else {
5371 /* No way, we need to wait for the next BGSAVE in order to
5372 * register differences */
5373 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5374 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5375 }
5376 } else {
5377 /* Ok we don't have a BGSAVE in progress, let's start one */
5378 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5379 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5380 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5381 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5382 return;
5383 }
5384 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5385 }
5386 c->repldbfd = -1;
5387 c->flags |= REDIS_SLAVE;
5388 c->slaveseldb = 0;
5389 listAddNodeTail(server.slaves,c);
5390 return;
5391 }
5392
5393 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5394 redisClient *slave = privdata;
5395 REDIS_NOTUSED(el);
5396 REDIS_NOTUSED(mask);
5397 char buf[REDIS_IOBUF_LEN];
5398 ssize_t nwritten, buflen;
5399
5400 if (slave->repldboff == 0) {
5401 /* Write the bulk write count before to transfer the DB. In theory here
5402 * we don't know how much room there is in the output buffer of the
5403 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5404 * operations) will never be smaller than the few bytes we need. */
5405 sds bulkcount;
5406
5407 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5408 slave->repldbsize);
5409 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5410 {
5411 sdsfree(bulkcount);
5412 freeClient(slave);
5413 return;
5414 }
5415 sdsfree(bulkcount);
5416 }
5417 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5418 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5419 if (buflen <= 0) {
5420 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5421 (buflen == 0) ? "premature EOF" : strerror(errno));
5422 freeClient(slave);
5423 return;
5424 }
5425 if ((nwritten = write(fd,buf,buflen)) == -1) {
5426 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5427 strerror(errno));
5428 freeClient(slave);
5429 return;
5430 }
5431 slave->repldboff += nwritten;
5432 if (slave->repldboff == slave->repldbsize) {
5433 close(slave->repldbfd);
5434 slave->repldbfd = -1;
5435 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5436 slave->replstate = REDIS_REPL_ONLINE;
5437 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5438 sendReplyToClient, slave) == AE_ERR) {
5439 freeClient(slave);
5440 return;
5441 }
5442 addReplySds(slave,sdsempty());
5443 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5444 }
5445 }
5446
5447 /* This function is called at the end of every backgrond saving.
5448 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5449 * otherwise REDIS_ERR is passed to the function.
5450 *
5451 * The goal of this function is to handle slaves waiting for a successful
5452 * background saving in order to perform non-blocking synchronization. */
5453 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5454 listNode *ln;
5455 int startbgsave = 0;
5456
5457 listRewind(server.slaves);
5458 while((ln = listYield(server.slaves))) {
5459 redisClient *slave = ln->value;
5460
5461 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5462 startbgsave = 1;
5463 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5464 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5465 struct redis_stat buf;
5466
5467 if (bgsaveerr != REDIS_OK) {
5468 freeClient(slave);
5469 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5470 continue;
5471 }
5472 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5473 redis_fstat(slave->repldbfd,&buf) == -1) {
5474 freeClient(slave);
5475 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5476 continue;
5477 }
5478 slave->repldboff = 0;
5479 slave->repldbsize = buf.st_size;
5480 slave->replstate = REDIS_REPL_SEND_BULK;
5481 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5482 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5483 freeClient(slave);
5484 continue;
5485 }
5486 }
5487 }
5488 if (startbgsave) {
5489 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5490 listRewind(server.slaves);
5491 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5492 while((ln = listYield(server.slaves))) {
5493 redisClient *slave = ln->value;
5494
5495 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5496 freeClient(slave);
5497 }
5498 }
5499 }
5500 }
5501
5502 static int syncWithMaster(void) {
5503 char buf[1024], tmpfile[256], authcmd[1024];
5504 int dumpsize;
5505 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5506 int dfd;
5507
5508 if (fd == -1) {
5509 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5510 strerror(errno));
5511 return REDIS_ERR;
5512 }
5513
5514 /* AUTH with the master if required. */
5515 if(server.masterauth) {
5516 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5517 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5518 close(fd);
5519 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5520 strerror(errno));
5521 return REDIS_ERR;
5522 }
5523 /* Read the AUTH result. */
5524 if (syncReadLine(fd,buf,1024,3600) == -1) {
5525 close(fd);
5526 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5527 strerror(errno));
5528 return REDIS_ERR;
5529 }
5530 if (buf[0] != '+') {
5531 close(fd);
5532 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5533 return REDIS_ERR;
5534 }
5535 }
5536
5537 /* Issue the SYNC command */
5538 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5539 close(fd);
5540 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5541 strerror(errno));
5542 return REDIS_ERR;
5543 }
5544 /* Read the bulk write count */
5545 if (syncReadLine(fd,buf,1024,3600) == -1) {
5546 close(fd);
5547 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5548 strerror(errno));
5549 return REDIS_ERR;
5550 }
5551 if (buf[0] != '$') {
5552 close(fd);
5553 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5554 return REDIS_ERR;
5555 }
5556 dumpsize = atoi(buf+1);
5557 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5558 /* Read the bulk write data on a temp file */
5559 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5560 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5561 if (dfd == -1) {
5562 close(fd);
5563 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5564 return REDIS_ERR;
5565 }
5566 while(dumpsize) {
5567 int nread, nwritten;
5568
5569 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5570 if (nread == -1) {
5571 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5572 strerror(errno));
5573 close(fd);
5574 close(dfd);
5575 return REDIS_ERR;
5576 }
5577 nwritten = write(dfd,buf,nread);
5578 if (nwritten == -1) {
5579 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5580 close(fd);
5581 close(dfd);
5582 return REDIS_ERR;
5583 }
5584 dumpsize -= nread;
5585 }
5586 close(dfd);
5587 if (rename(tmpfile,server.dbfilename) == -1) {
5588 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5589 unlink(tmpfile);
5590 close(fd);
5591 return REDIS_ERR;
5592 }
5593 emptyDb();
5594 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5595 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5596 close(fd);
5597 return REDIS_ERR;
5598 }
5599 server.master = createClient(fd);
5600 server.master->flags |= REDIS_MASTER;
5601 server.master->authenticated = 1;
5602 server.replstate = REDIS_REPL_CONNECTED;
5603 return REDIS_OK;
5604 }
5605
5606 static void slaveofCommand(redisClient *c) {
5607 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5608 !strcasecmp(c->argv[2]->ptr,"one")) {
5609 if (server.masterhost) {
5610 sdsfree(server.masterhost);
5611 server.masterhost = NULL;
5612 if (server.master) freeClient(server.master);
5613 server.replstate = REDIS_REPL_NONE;
5614 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5615 }
5616 } else {
5617 sdsfree(server.masterhost);
5618 server.masterhost = sdsdup(c->argv[1]->ptr);
5619 server.masterport = atoi(c->argv[2]->ptr);
5620 if (server.master) freeClient(server.master);
5621 server.replstate = REDIS_REPL_CONNECT;
5622 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5623 server.masterhost, server.masterport);
5624 }
5625 addReply(c,shared.ok);
5626 }
5627
5628 /* ============================ Maxmemory directive ======================== */
5629
5630 /* This function gets called when 'maxmemory' is set on the config file to limit
5631 * the max memory used by the server, and we are out of memory.
5632 * This function will try to, in order:
5633 *
5634 * - Free objects from the free list
5635 * - Try to remove keys with an EXPIRE set
5636 *
5637 * It is not possible to free enough memory to reach used-memory < maxmemory
5638 * the server will start refusing commands that will enlarge even more the
5639 * memory usage.
5640 */
5641 static void freeMemoryIfNeeded(void) {
5642 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5643 if (listLength(server.objfreelist)) {
5644 robj *o;
5645
5646 listNode *head = listFirst(server.objfreelist);
5647 o = listNodeValue(head);
5648 listDelNode(server.objfreelist,head);
5649 zfree(o);
5650 } else {
5651 int j, k, freed = 0;
5652
5653 for (j = 0; j < server.dbnum; j++) {
5654 int minttl = -1;
5655 robj *minkey = NULL;
5656 struct dictEntry *de;
5657
5658 if (dictSize(server.db[j].expires)) {
5659 freed = 1;
5660 /* From a sample of three keys drop the one nearest to
5661 * the natural expire */
5662 for (k = 0; k < 3; k++) {
5663 time_t t;
5664
5665 de = dictGetRandomKey(server.db[j].expires);
5666 t = (time_t) dictGetEntryVal(de);
5667 if (minttl == -1 || t < minttl) {
5668 minkey = dictGetEntryKey(de);
5669 minttl = t;
5670 }
5671 }
5672 deleteKey(server.db+j,minkey);
5673 }
5674 }
5675 if (!freed) return; /* nothing to free... */
5676 }
5677 }
5678 }
5679
5680 /* ============================== Append Only file ========================== */
5681
5682 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5683 sds buf = sdsempty();
5684 int j;
5685 ssize_t nwritten;
5686 time_t now;
5687 robj *tmpargv[3];
5688
5689 /* The DB this command was targetting is not the same as the last command
5690 * we appendend. To issue a SELECT command is needed. */
5691 if (dictid != server.appendseldb) {
5692 char seldb[64];
5693
5694 snprintf(seldb,sizeof(seldb),"%d",dictid);
5695 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5696 (unsigned long)strlen(seldb),seldb);
5697 server.appendseldb = dictid;
5698 }
5699
5700 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5701 * EXPIREs into EXPIREATs calls */
5702 if (cmd->proc == expireCommand) {
5703 long when;
5704
5705 tmpargv[0] = createStringObject("EXPIREAT",8);
5706 tmpargv[1] = argv[1];
5707 incrRefCount(argv[1]);
5708 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5709 tmpargv[2] = createObject(REDIS_STRING,
5710 sdscatprintf(sdsempty(),"%ld",when));
5711 argv = tmpargv;
5712 }
5713
5714 /* Append the actual command */
5715 buf = sdscatprintf(buf,"*%d\r\n",argc);
5716 for (j = 0; j < argc; j++) {
5717 robj *o = argv[j];
5718
5719 o = getDecodedObject(o);
5720 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
5721 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5722 buf = sdscatlen(buf,"\r\n",2);
5723 decrRefCount(o);
5724 }
5725
5726 /* Free the objects from the modified argv for EXPIREAT */
5727 if (cmd->proc == expireCommand) {
5728 for (j = 0; j < 3; j++)
5729 decrRefCount(argv[j]);
5730 }
5731
5732 /* We want to perform a single write. This should be guaranteed atomic
5733 * at least if the filesystem we are writing is a real physical one.
5734 * While this will save us against the server being killed I don't think
5735 * there is much to do about the whole server stopping for power problems
5736 * or alike */
5737 nwritten = write(server.appendfd,buf,sdslen(buf));
5738 if (nwritten != (signed)sdslen(buf)) {
5739 /* Ooops, we are in troubles. The best thing to do for now is
5740 * to simply exit instead to give the illusion that everything is
5741 * working as expected. */
5742 if (nwritten == -1) {
5743 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5744 } else {
5745 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5746 }
5747 exit(1);
5748 }
5749 /* If a background append only file rewriting is in progress we want to
5750 * accumulate the differences between the child DB and the current one
5751 * in a buffer, so that when the child process will do its work we
5752 * can append the differences to the new append only file. */
5753 if (server.bgrewritechildpid != -1)
5754 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5755
5756 sdsfree(buf);
5757 now = time(NULL);
5758 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5759 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5760 now-server.lastfsync > 1))
5761 {
5762 fsync(server.appendfd); /* Let's try to get this data on the disk */
5763 server.lastfsync = now;
5764 }
5765 }
5766
5767 /* In Redis commands are always executed in the context of a client, so in
5768 * order to load the append only file we need to create a fake client. */
5769 static struct redisClient *createFakeClient(void) {
5770 struct redisClient *c = zmalloc(sizeof(*c));
5771
5772 selectDb(c,0);
5773 c->fd = -1;
5774 c->querybuf = sdsempty();
5775 c->argc = 0;
5776 c->argv = NULL;
5777 c->flags = 0;
5778 /* We set the fake client as a slave waiting for the synchronization
5779 * so that Redis will not try to send replies to this client. */
5780 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5781 c->reply = listCreate();
5782 listSetFreeMethod(c->reply,decrRefCount);
5783 listSetDupMethod(c->reply,dupClientReplyValue);
5784 return c;
5785 }
5786
5787 static void freeFakeClient(struct redisClient *c) {
5788 sdsfree(c->querybuf);
5789 listRelease(c->reply);
5790 zfree(c);
5791 }
5792
5793 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5794 * error (the append only file is zero-length) REDIS_ERR is returned. On
5795 * fatal error an error message is logged and the program exists. */
5796 int loadAppendOnlyFile(char *filename) {
5797 struct redisClient *fakeClient;
5798 FILE *fp = fopen(filename,"r");
5799 struct redis_stat sb;
5800
5801 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5802 return REDIS_ERR;
5803
5804 if (fp == NULL) {
5805 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5806 exit(1);
5807 }
5808
5809 fakeClient = createFakeClient();
5810 while(1) {
5811 int argc, j;
5812 unsigned long len;
5813 robj **argv;
5814 char buf[128];
5815 sds argsds;
5816 struct redisCommand *cmd;
5817
5818 if (fgets(buf,sizeof(buf),fp) == NULL) {
5819 if (feof(fp))
5820 break;
5821 else
5822 goto readerr;
5823 }
5824 if (buf[0] != '*') goto fmterr;
5825 argc = atoi(buf+1);
5826 argv = zmalloc(sizeof(robj*)*argc);
5827 for (j = 0; j < argc; j++) {
5828 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5829 if (buf[0] != '$') goto fmterr;
5830 len = strtol(buf+1,NULL,10);
5831 argsds = sdsnewlen(NULL,len);
5832 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5833 argv[j] = createObject(REDIS_STRING,argsds);
5834 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5835 }
5836
5837 /* Command lookup */
5838 cmd = lookupCommand(argv[0]->ptr);
5839 if (!cmd) {
5840 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5841 exit(1);
5842 }
5843 /* Try object sharing and encoding */
5844 if (server.shareobjects) {
5845 int j;
5846 for(j = 1; j < argc; j++)
5847 argv[j] = tryObjectSharing(argv[j]);
5848 }
5849 if (cmd->flags & REDIS_CMD_BULK)
5850 tryObjectEncoding(argv[argc-1]);
5851 /* Run the command in the context of a fake client */
5852 fakeClient->argc = argc;
5853 fakeClient->argv = argv;
5854 cmd->proc(fakeClient);
5855 /* Discard the reply objects list from the fake client */
5856 while(listLength(fakeClient->reply))
5857 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5858 /* Clean up, ready for the next command */
5859 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5860 zfree(argv);
5861 }
5862 fclose(fp);
5863 freeFakeClient(fakeClient);
5864 return REDIS_OK;
5865
5866 readerr:
5867 if (feof(fp)) {
5868 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5869 } else {
5870 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5871 }
5872 exit(1);
5873 fmterr:
5874 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5875 exit(1);
5876 }
5877
5878 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5879 static int fwriteBulk(FILE *fp, robj *obj) {
5880 char buf[128];
5881 obj = getDecodedObject(obj);
5882 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5883 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5884 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
5885 goto err;
5886 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5887 decrRefCount(obj);
5888 return 1;
5889 err:
5890 decrRefCount(obj);
5891 return 0;
5892 }
5893
5894 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
5895 static int fwriteBulkDouble(FILE *fp, double d) {
5896 char buf[128], dbuf[128];
5897
5898 snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
5899 snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2);
5900 if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
5901 if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0;
5902 return 1;
5903 }
5904
5905 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
5906 static int fwriteBulkLong(FILE *fp, long l) {
5907 char buf[128], lbuf[128];
5908
5909 snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l);
5910 snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(lbuf)-2);
5911 if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
5912 if (fwrite(lbuf,strlen(lbuf),1,fp) == 0) return 0;
5913 return 1;
5914 }
5915
5916 /* Write a sequence of commands able to fully rebuild the dataset into
5917 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5918 static int rewriteAppendOnlyFile(char *filename) {
5919 dictIterator *di = NULL;
5920 dictEntry *de;
5921 FILE *fp;
5922 char tmpfile[256];
5923 int j;
5924 time_t now = time(NULL);
5925
5926 /* Note that we have to use a different temp name here compared to the
5927 * one used by rewriteAppendOnlyFileBackground() function. */
5928 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5929 fp = fopen(tmpfile,"w");
5930 if (!fp) {
5931 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5932 return REDIS_ERR;
5933 }
5934 for (j = 0; j < server.dbnum; j++) {
5935 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5936 redisDb *db = server.db+j;
5937 dict *d = db->dict;
5938 if (dictSize(d) == 0) continue;
5939 di = dictGetIterator(d);
5940 if (!di) {
5941 fclose(fp);
5942 return REDIS_ERR;
5943 }
5944
5945 /* SELECT the new DB */
5946 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5947 if (fwriteBulkLong(fp,j) == 0) goto werr;
5948
5949 /* Iterate this DB writing every entry */
5950 while((de = dictNext(di)) != NULL) {
5951 robj *key = dictGetEntryKey(de);
5952 robj *o = dictGetEntryVal(de);
5953 time_t expiretime = getExpire(db,key);
5954
5955 /* Save the key and associated value */
5956 if (o->type == REDIS_STRING) {
5957 /* Emit a SET command */
5958 char cmd[]="*3\r\n$3\r\nSET\r\n";
5959 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5960 /* Key and value */
5961 if (fwriteBulk(fp,key) == 0) goto werr;
5962 if (fwriteBulk(fp,o) == 0) goto werr;
5963 } else if (o->type == REDIS_LIST) {
5964 /* Emit the RPUSHes needed to rebuild the list */
5965 list *list = o->ptr;
5966 listNode *ln;
5967
5968 listRewind(list);
5969 while((ln = listYield(list))) {
5970 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5971 robj *eleobj = listNodeValue(ln);
5972
5973 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5974 if (fwriteBulk(fp,key) == 0) goto werr;
5975 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5976 }
5977 } else if (o->type == REDIS_SET) {
5978 /* Emit the SADDs needed to rebuild the set */
5979 dict *set = o->ptr;
5980 dictIterator *di = dictGetIterator(set);
5981 dictEntry *de;
5982
5983 while((de = dictNext(di)) != NULL) {
5984 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5985 robj *eleobj = dictGetEntryKey(de);
5986
5987 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5988 if (fwriteBulk(fp,key) == 0) goto werr;
5989 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5990 }
5991 dictReleaseIterator(di);
5992 } else if (o->type == REDIS_ZSET) {
5993 /* Emit the ZADDs needed to rebuild the sorted set */
5994 zset *zs = o->ptr;
5995 dictIterator *di = dictGetIterator(zs->dict);
5996 dictEntry *de;
5997
5998 while((de = dictNext(di)) != NULL) {
5999 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6000 robj *eleobj = dictGetEntryKey(de);
6001 double *score = dictGetEntryVal(de);
6002
6003 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6004 if (fwriteBulk(fp,key) == 0) goto werr;
6005 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6006 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6007 }
6008 dictReleaseIterator(di);
6009 } else {
6010 redisAssert(0 != 0);
6011 }
6012 /* Save the expire time */
6013 if (expiretime != -1) {
6014 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6015 /* If this key is already expired skip it */
6016 if (expiretime < now) continue;
6017 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6018 if (fwriteBulk(fp,key) == 0) goto werr;
6019 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6020 }
6021 }
6022 dictReleaseIterator(di);
6023 }
6024
6025 /* Make sure data will not remain on the OS's output buffers */
6026 fflush(fp);
6027 fsync(fileno(fp));
6028 fclose(fp);
6029
6030 /* Use RENAME to make sure the DB file is changed atomically only
6031 * if the generate DB file is ok. */
6032 if (rename(tmpfile,filename) == -1) {
6033 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6034 unlink(tmpfile);
6035 return REDIS_ERR;
6036 }
6037 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6038 return REDIS_OK;
6039
6040 werr:
6041 fclose(fp);
6042 unlink(tmpfile);
6043 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6044 if (di) dictReleaseIterator(di);
6045 return REDIS_ERR;
6046 }
6047
6048 /* This is how rewriting of the append only file in background works:
6049 *
6050 * 1) The user calls BGREWRITEAOF
6051 * 2) Redis calls this function, that forks():
6052 * 2a) the child rewrite the append only file in a temp file.
6053 * 2b) the parent accumulates differences in server.bgrewritebuf.
6054 * 3) When the child finished '2a' exists.
6055 * 4) The parent will trap the exit code, if it's OK, will append the
6056 * data accumulated into server.bgrewritebuf into the temp file, and
6057 * finally will rename(2) the temp file in the actual file name.
6058 * The the new file is reopened as the new append only file. Profit!
6059 */
6060 static int rewriteAppendOnlyFileBackground(void) {
6061 pid_t childpid;
6062
6063 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6064 if ((childpid = fork()) == 0) {
6065 /* Child */
6066 char tmpfile[256];
6067 close(server.fd);
6068
6069 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6070 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6071 exit(0);
6072 } else {
6073 exit(1);
6074 }
6075 } else {
6076 /* Parent */
6077 if (childpid == -1) {
6078 redisLog(REDIS_WARNING,
6079 "Can't rewrite append only file in background: fork: %s",
6080 strerror(errno));
6081 return REDIS_ERR;
6082 }
6083 redisLog(REDIS_NOTICE,
6084 "Background append only file rewriting started by pid %d",childpid);
6085 server.bgrewritechildpid = childpid;
6086 /* We set appendseldb to -1 in order to force the next call to the
6087 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6088 * accumulated by the parent into server.bgrewritebuf will start
6089 * with a SELECT statement and it will be safe to merge. */
6090 server.appendseldb = -1;
6091 return REDIS_OK;
6092 }
6093 return REDIS_OK; /* unreached */
6094 }
6095
6096 static void bgrewriteaofCommand(redisClient *c) {
6097 if (server.bgrewritechildpid != -1) {
6098 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6099 return;
6100 }
6101 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6102 char *status = "+Background append only file rewriting started\r\n";
6103 addReplySds(c,sdsnew(status));
6104 } else {
6105 addReply(c,shared.err);
6106 }
6107 }
6108
/* Delete the temporary file written by the BGREWRITEAOF child whose pid is
 * childpid. The name must match the one the child builds in
 * rewriteAppendOnlyFileBackground(). Errors from unlink() are ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int) childpid);
    unlink(path);
}
6115
6116 /* ================================= Debugging ============================== */
6117
6118 static void debugCommand(redisClient *c) {
6119 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6120 *((char*)-1) = 'x';
6121 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6122 if (rdbSave(server.dbfilename) != REDIS_OK) {
6123 addReply(c,shared.err);
6124 return;
6125 }
6126 emptyDb();
6127 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6128 addReply(c,shared.err);
6129 return;
6130 }
6131 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6132 addReply(c,shared.ok);
6133 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6134 emptyDb();
6135 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6136 addReply(c,shared.err);
6137 return;
6138 }
6139 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6140 addReply(c,shared.ok);
6141 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6142 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6143 robj *key, *val;
6144
6145 if (!de) {
6146 addReply(c,shared.nokeyerr);
6147 return;
6148 }
6149 key = dictGetEntryKey(de);
6150 val = dictGetEntryVal(de);
6151 addReplySds(c,sdscatprintf(sdsempty(),
6152 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6153 (void*)key, key->refcount, (void*)val, val->refcount,
6154 val->encoding));
6155 } else {
6156 addReplySds(c,sdsnew(
6157 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6158 }
6159 }
6160
6161 static void _redisAssert(char *estr) {
6162 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6163 redisLog(REDIS_WARNING,"==> %s\n",estr);
6164 #ifdef HAVE_BACKTRACE
6165 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6166 *((char*)-1) = 'x';
6167 #endif
6168 }
6169
6170 /* =================================== Main! ================================ */
6171
6172 #ifdef __linux__
/* Parse the text read from /proc/sys/vm/overcommit_memory.
 * Leading whitespace and trailing text (such as the newline) are accepted.
 * Returns the value, or -1 when buf does not start with a number or the
 * number is not one of the values Linux defines for this sysctl (0, 1, 2). */
static int linuxParseOvercommitValue(const char *buf) {
    char *end;
    long val;

    errno = 0;
    val = strtol(buf,&end,10);
    if (end == buf || errno == ERANGE) return -1;
    if (val < 0 || val > 2) return -1;
    return (int) val;
}

/* Return the current vm.overcommit_memory setting, or -1 if it cannot be
 * read or parsed. Unlike atoi(), which turns unparsable text into 0, bad
 * content is reported as -1 so a caller testing for 0 does not act on it. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    return linuxParseOvercommitValue(buf);
}
6186
6187 void linuxOvercommitMemoryWarning(void) {
6188 if (linuxOvercommitMemoryValue() == 0) {
6189 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6190 }
6191 }
6192 #endif /* __linux__ */
6193
6194 static void daemonize(void) {
6195 int fd;
6196 FILE *fp;
6197
6198 if (fork() != 0) exit(0); /* parent exits */
6199 printf("New pid: %d\n", getpid());
6200 setsid(); /* create a new session */
6201
6202 /* Every output goes to /dev/null. If Redis is daemonized but
6203 * the 'logfile' is set to 'stdout' in the configuration file
6204 * it will not log at all. */
6205 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6206 dup2(fd, STDIN_FILENO);
6207 dup2(fd, STDOUT_FILENO);
6208 dup2(fd, STDERR_FILENO);
6209 if (fd > STDERR_FILENO) close(fd);
6210 }
6211 /* Try to write the pid file */
6212 fp = fopen(server.pidfile,"w");
6213 if (fp) {
6214 fprintf(fp,"%d\n",getpid());
6215 fclose(fp);
6216 }
6217 }
6218
6219 int main(int argc, char **argv) {
6220 initServerConfig();
6221 if (argc == 2) {
6222 resetServerSaveParams();
6223 loadServerConfig(argv[1]);
6224 } else if (argc > 2) {
6225 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6226 exit(1);
6227 } else {
6228 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6229 }
6230 if (server.daemonize) daemonize();
6231 initServer();
6232 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6233 #ifdef __linux__
6234 linuxOvercommitMemoryWarning();
6235 #endif
6236 if (server.appendonly) {
6237 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6238 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6239 } else {
6240 if (rdbLoad(server.dbfilename) == REDIS_OK)
6241 redisLog(REDIS_NOTICE,"DB loaded from disk");
6242 }
6243 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6244 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6245 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6246 aeMain(server.el);
6247 aeDeleteEventLoop(server.el);
6248 return 0;
6249 }
6250
6251 /* ============================= Backtrace support ========================= */
6252
6253 #ifdef HAVE_BACKTRACE
6254 static char *findFuncName(void *pointer, unsigned long *offset);
6255
6256 static void *getMcontextEip(ucontext_t *uc) {
6257 #if defined(__FreeBSD__)
6258 return (void*) uc->uc_mcontext.mc_eip;
6259 #elif defined(__dietlibc__)
6260 return (void*) uc->uc_mcontext.eip;
6261 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6262 #if __x86_64__
6263 return (void*) uc->uc_mcontext->__ss.__rip;
6264 #else
6265 return (void*) uc->uc_mcontext->__ss.__eip;
6266 #endif
6267 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6268 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6269 return (void*) uc->uc_mcontext->__ss.__rip;
6270 #else
6271 return (void*) uc->uc_mcontext->__ss.__eip;
6272 #endif
6273 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6274 return (void*) uc->uc_mcontext.gregs[REG_EIP];
6275 #elif defined(__ia64__) /* Linux IA64 */
6276 return (void*) uc->uc_mcontext.sc_ip;
6277 #else
6278 return NULL;
6279 #endif
6280 }
6281
6282 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6283 void *trace[100];
6284 char **messages = NULL;
6285 int i, trace_size = 0;
6286 unsigned long offset=0;
6287 ucontext_t *uc = (ucontext_t*) secret;
6288 sds infostring;
6289 REDIS_NOTUSED(info);
6290
6291 redisLog(REDIS_WARNING,
6292 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6293 infostring = genRedisInfoString();
6294 redisLog(REDIS_WARNING, "%s",infostring);
6295 /* It's not safe to sdsfree() the returned string under memory
6296 * corruption conditions. Let it leak as we are going to abort */
6297
6298 trace_size = backtrace(trace, 100);
6299 /* overwrite sigaction with caller's address */
6300 if (getMcontextEip(uc) != NULL) {
6301 trace[1] = getMcontextEip(uc);
6302 }
6303 messages = backtrace_symbols(trace, trace_size);
6304
6305 for (i=1; i<trace_size; ++i) {
6306 char *fn = findFuncName(trace[i], &offset), *p;
6307
6308 p = strchr(messages[i],'+');
6309 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6310 redisLog(REDIS_WARNING,"%s", messages[i]);
6311 } else {
6312 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6313 }
6314 }
6315 // free(messages); Don't call free() with possibly corrupted memory.
6316 exit(0);
6317 }
6318
6319 static void setupSigSegvAction(void) {
6320 struct sigaction act;
6321
6322 sigemptyset (&act.sa_mask);
6323 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6324 * is used. Otherwise, sa_handler is used */
6325 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6326 act.sa_sigaction = segvHandler;
6327 sigaction (SIGSEGV, &act, NULL);
6328 sigaction (SIGBUS, &act, NULL);
6329 sigaction (SIGFPE, &act, NULL);
6330 sigaction (SIGILL, &act, NULL);
6331 sigaction (SIGBUS, &act, NULL);
6332 return;
6333 }
6334
6335 #include "staticsymbols.h"
6336 /* This function try to convert a pointer into a function name. It's used in
6337 * oreder to provide a backtrace under segmentation fault that's able to
6338 * display functions declared as static (otherwise the backtrace is useless). */
6339 static char *findFuncName(void *pointer, unsigned long *offset){
6340 int i, ret = -1;
6341 unsigned long off, minoff = 0;
6342
6343 /* Try to match against the Symbol with the smallest offset */
6344 for (i=0; symsTable[i].pointer; i++) {
6345 unsigned long lp = (unsigned long) pointer;
6346
6347 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6348 off=lp-symsTable[i].pointer;
6349 if (ret < 0 || off < minoff) {
6350 minoff=off;
6351 ret=i;
6352 }
6353 }
6354 }
6355 if (ret == -1) return NULL;
6356 *offset = minoff;
6357 return symsTable[ret].name;
6358 }
6359 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without backtrace support: leave the signal handlers as they are. */
}
6362 #endif /* HAVE_BACKTRACE */
6363
6364
6365
6366 /* The End */
6367
6368
6369