]> git.saurik.com Git - redis.git/blob - redis.c
b7dc921a82ed9bf3c53378d6bb6979bc0a696502
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "1.1.90"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags (bit mask stored in the 'flags' field of struct redisCommand) */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
127
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the two most significant bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: a specially encoded object will follow. The six bit
 *           number specifies the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When the length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
/* Client flags (bit mask stored in the 'flags' field of redisClient) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels, in increasing order of importance. redisLog() prints a message
 * only when its level is >= server.verbosity. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2
189
/* Anti-warning macro: marks V as intentionally unused. The argument is
 * parenthesized so that any expression, not only a plain identifier, can be
 * passed without changing how the cast binds. */
#define REDIS_NOTUSED(V) ((void)(V))
192
/* Skiplist parameters used by the sorted set implementation */
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines: possible values of server.appendfsync */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2
200
/* We can print the stacktrace, so our assert is defined this way:
 * when the expression _e is false, _redisAssert() is called with the
 * stringified expression and then the process exits with status 1. */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
204
205 /*================================= Data types ============================== */
206
/* A redis object, that is a type able to hold a string / list / set */
typedef struct redisObject {
    void *ptr;                  /* payload; interpretation depends on type/encoding */
    unsigned char type;         /* one of the REDIS_STRING ... REDIS_HASH object types */
    unsigned char encoding;     /* one of the REDIS_ENCODING_* values */
    unsigned char notused[2];
    int refcount;               /* reference count, see incrRefCount()/decrRefCount() */
} robj;
215
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * _var is the robj variable (not a pointer) to initialize, _ptr the value
 * stored in its 'ptr' field. The expansion is a single statement without a
 * trailing semicolon, so the macro can be used as the body of an if/else
 * without braces. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
} while(0)
226
/* A single database. The server holds an array of these (server.db). */
typedef struct redisDb {
    dict *dict;     /* main hash table of this DB */
    dict *expires;  /* hash table used by the expire functions (setExpire() etc.) */
    int id;
} redisDb;
232
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
255
/* One entry of the server.saveparams array: a (seconds, changes) pair. */
struct saveparam {
    time_t seconds;
    int changes;
};
260
/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db; /* array of server.dbnum databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function run */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of last successful save */
    size_t usedmemory; /* Used memory in megabytes */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* minimum log level printed by redisLog() */
    int glueoutputbuf;
    int maxidletime; /* idle seconds after which closeTimedoutClients() closes a client */
    int dbnum;
    int daemonize;
    int appendonly;
    int appendfsync;
    time_t lastfsync;
    int appendfd; /* append only file descriptor, -1 when append only is disabled */
    int appendseldb;
    char *pidfile;
    pid_t bgsavechildpid; /* set to -1 once the background save child has been handled */
    pid_t bgrewritechildpid;
    sds bgrewritebuf; /* buffer kept by parent during append only rewrite */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile; /* NULL means: log to standard output */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
319
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (see cmdTable). */
struct redisCommand {
    char *name;             /* command name */
    redisCommandProc *proc; /* function implementing the command */
    int arity;              /* NOTE(review): negative values presumably mean
                             * "at least -arity arguments" - confirm against
                             * the command dispatch code */
    int flags;              /* REDIS_CMD_* bit mask */
};
327
/* A (name, address) pair describing a function symbol. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
332
/* An element being sorted: the object itself plus the value it is compared
 * by, which is either a numeric score or another object. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;
340
/* A sort operation: a type (see the REDIS_SORT_* defines) and the pattern it
 * applies to. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
345
/* ZSETs use a specialized version of Skiplists */

/* A skiplist node: an array of forward pointers plus a single backward
 * pointer, and the (score, object) pair it holds. */
typedef struct zskiplistNode {
    struct zskiplistNode **forward;
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;
354
/* The skiplist itself: header and tail nodes, element count and level. */
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;
360
/* A sorted set is represented by a hash table together with a skiplist. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
365
/* Our shared "common" objects: a single global instance ('shared') holding
 * pointers to objects that are used over and over. */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
376
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
382
383 /*================================ Prototypes =============================== */
384
/* Forward declarations of file-local helpers. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static void aofRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);

/* Command implementations: each one has the redisCommandProc signature and
 * is referenced from cmdTable.
 * NOTE(review): rpoplpushcommand is the only one spelled with a lowercase
 * "command" suffix. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void bgrewriteaofCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zincrbyCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
495
496 /*================================= Globals ================================= */
497
/* Global vars */
static struct redisServer server; /* server global state */

/* The command table. Every entry is {name, proc, arity, flags}, matching the
 * fields of struct redisCommand; flags is a combination of the REDIS_CMD_*
 * defines. The table is terminated by an entry whose fields are all
 * NULL/0. Note that "smembers" is served by sinterCommand. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zrem",zremCommand,3,REDIS_CMD_BULK},
    {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
    {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
    {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
    {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
577
578 /*============================ Utility functions ============================ */
579
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Supported syntax:
 *
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [...]   one byte out of a set: a leading ^ negates the set, x-y is a
 *           range and a backslash escapes the next byte of the set
 *   \x      the byte x taken literally
 *
 * When 'nocase' is non zero bytes are compared after tolower().
 *
 * Returns 1 on match, 0 otherwise. No byte past the given lengths is read,
 * so neither buffer needs to be NUL terminated, and a construct that needs
 * to consume a byte never matches once the string is exhausted. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of consecutive stars. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try to match the rest of the pattern at every offset. */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes one byte of the string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the common
                     * advance after the switch lands on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal byte can't match an exhausted string. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Only trailing stars can still match the empty remainder. */
            while (patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
702
703 static void redisLog(int level, const char *fmt, ...) {
704 va_list ap;
705 FILE *fp;
706
707 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
708 if (!fp) return;
709
710 va_start(ap, fmt);
711 if (level >= server.verbosity) {
712 char *c = ".-*";
713 char buf[64];
714 time_t now;
715
716 now = time(NULL);
717 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
718 fprintf(fp,"%s %c ",buf,c[level]);
719 vfprintf(fp, fmt, ap);
720 fprintf(fp,"\n");
721 fflush(fp);
722 }
723 va_end(ap);
724
725 if (server.logfile) fclose(fp);
726 }
727
728 /*====================== Hash table type implementation ==================== */
729
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
733
/* Value destructor for values that are plain zmalloc()ed memory:
 * it just releases 'val' with zfree(). 'privdata' is ignored. */
static void dictVanillaFree(void *privdata, void *val)
{
    (void) privdata;
    zfree(val);
}
739
740 static int sdsDictKeyCompare(void *privdata, const void *key1,
741 const void *key2)
742 {
743 int l1,l2;
744 DICT_NOTUSED(privdata);
745
746 l1 = sdslen((sds)key1);
747 l2 = sdslen((sds)key2);
748 if (l1 != l2) return 0;
749 return memcmp(key1, key2, l1) == 0;
750 }
751
/* Destructor for keys or values that are redis objects: it drops one
 * reference from 'val' via decrRefCount(). 'privdata' is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void) privdata;
    decrRefCount(val);
}
758
759 static int dictObjKeyCompare(void *privdata, const void *key1,
760 const void *key2)
761 {
762 const robj *o1 = key1, *o2 = key2;
763 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
764 }
765
766 static unsigned int dictObjHash(const void *key) {
767 const robj *o = key;
768 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
769 }
770
771 static int dictEncObjKeyCompare(void *privdata, const void *key1,
772 const void *key2)
773 {
774 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
775 int cmp;
776
777 o1 = getDecodedObject(o1);
778 o2 = getDecodedObject(o2);
779 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
780 decrRefCount(o1);
781 decrRefCount(o2);
782 return cmp;
783 }
784
785 static unsigned int dictEncObjHash(const void *key) {
786 robj *o = (robj*) key;
787
788 o = getDecodedObject(o);
789 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
790 decrRefCount(o);
791 return hash;
792 }
793
/* Dict type with no values: keys are hashed and compared through the
 * decoding helpers (dictEncObjHash / dictEncObjKeyCompare) and released
 * with decrRefCount() when an entry is destroyed. */
static dictType setDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};
802
/* Same key handling as setDictType, but every entry also owns a value that
 * is released with zfree() (through dictVanillaFree). */
static dictType zsetDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree /* val destructor of malloc(sizeof(double)) */
};
811
/* Dict type where both keys and values are redis objects released with
 * decrRefCount(). Keys are hashed and compared directly on their 'ptr'
 * field (dictObjHash / dictObjKeyCompare), without any decoding step. */
static dictType hashDictType = {
    dictObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor /* val destructor */
};
820
821 /* ========================= Random utility functions ======================= */
822
/* Out of memory handler.
 *
 * Redis generally makes no attempt to recover from a failed allocation of
 * objects or strings: it is not even clear that the condition could be
 * reported to the client, because the networking layer itself needs heap
 * memory for its send buffers. So the error is printed on standard error,
 * prefixed by 'msg', and after a one second pause the process aborts.
 * This also keeps the code simpler to read. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
834
835 /* ====================== Redis server networking stuff ===================== */
836 static void closeTimedoutClients(void) {
837 redisClient *c;
838 listNode *ln;
839 time_t now = time(NULL);
840
841 listRewind(server.clients);
842 while ((ln = listYield(server.clients)) != NULL) {
843 c = listNodeValue(ln);
844 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
845 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
846 (now - c->lastinteraction > server.maxidletime)) {
847 redisLog(REDIS_DEBUG,"Closing idle client");
848 freeClient(c);
849 }
850 }
851 }
852
853 static int htNeedsResize(dict *dict) {
854 long long size, used;
855
856 size = dictSlots(dict);
857 used = dictSize(dict);
858 return (size && used && size > DICT_HT_INITIAL_SIZE &&
859 (used*100/size < REDIS_HT_MINFILL));
860 }
861
862 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
863 * we resize the hash table to save memory */
864 static void tryResizeHashTables(void) {
865 int j;
866
867 for (j = 0; j < server.dbnum; j++) {
868 if (htNeedsResize(server.db[j].dict)) {
869 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
870 dictResize(server.db[j].dict);
871 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
872 }
873 if (htNeedsResize(server.db[j].expires))
874 dictResize(server.db[j].expires);
875 }
876 }
877
878 /* A background saving child (BGSAVE) terminated its work. Handle this. */
879 void backgroundSaveDoneHandler(int statloc) {
880 int exitcode = WEXITSTATUS(statloc);
881 int bysignal = WIFSIGNALED(statloc);
882
883 if (!bysignal && exitcode == 0) {
884 redisLog(REDIS_NOTICE,
885 "Background saving terminated with success");
886 server.dirty = 0;
887 server.lastsave = time(NULL);
888 } else if (!bysignal && exitcode != 0) {
889 redisLog(REDIS_WARNING, "Background saving error");
890 } else {
891 redisLog(REDIS_WARNING,
892 "Background saving terminated by signal");
893 rdbRemoveTempFile(server.bgsavechildpid);
894 }
895 server.bgsavechildpid = -1;
896 /* Possibly there are slaves waiting for a BGSAVE in order to be served
897 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
898 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
899 }
900
901 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
902 * Handle this. */
903 void backgroundRewriteDoneHandler(int statloc) {
904 int exitcode = WEXITSTATUS(statloc);
905 int bysignal = WIFSIGNALED(statloc);
906
907 if (!bysignal && exitcode == 0) {
908 int fd;
909 char tmpfile[256];
910
911 redisLog(REDIS_NOTICE,
912 "Background append only file rewriting terminated with success");
913 /* Now it's time to flush the differences accumulated by the parent */
914 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
915 fd = open(tmpfile,O_WRONLY|O_APPEND);
916 if (fd == -1) {
917 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
918 goto cleanup;
919 }
920 /* Flush our data... */
921 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
922 (signed) sdslen(server.bgrewritebuf)) {
923 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
924 close(fd);
925 goto cleanup;
926 }
927 redisLog(REDIS_WARNING,"Parent diff flushed into the new append log file with success");
928 /* Now our work is to rename the temp file into the stable file. And
929 * switch the file descriptor used by the server for append only. */
930 if (rename(tmpfile,server.appendfilename) == -1) {
931 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
932 close(fd);
933 goto cleanup;
934 }
935 /* Mission completed... almost */
936 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
937 if (server.appendfd != -1) {
938 /* If append only is actually enabled... */
939 close(server.appendfd);
940 server.appendfd = fd;
941 fsync(fd);
942 server.appendseldb = -1; /* Make sure it will issue SELECT */
943 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
944 } else {
945 /* If append only is disabled we just generate a dump in this
946 * format. Why not? */
947 close(fd);
948 }
949 } else if (!bysignal && exitcode != 0) {
950 redisLog(REDIS_WARNING, "Background append only file rewriting error");
951 } else {
952 redisLog(REDIS_WARNING,
953 "Background append only file rewriting terminated by signal");
954 }
955 cleanup:
956 sdsfree(server.bgrewritebuf);
957 server.bgrewritebuf = sdsempty();
958 aofRemoveTempFile(server.bgrewritechildpid);
959 server.bgrewritechildpid = -1;
960 }
961
962 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
963 int j, loops = server.cronloops++;
964 REDIS_NOTUSED(eventLoop);
965 REDIS_NOTUSED(id);
966 REDIS_NOTUSED(clientData);
967
968 /* Update the global state with the amount of used memory */
969 server.usedmemory = zmalloc_used_memory();
970
971 /* Show some info about non-empty databases */
972 for (j = 0; j < server.dbnum; j++) {
973 long long size, used, vkeys;
974
975 size = dictSlots(server.db[j].dict);
976 used = dictSize(server.db[j].dict);
977 vkeys = dictSize(server.db[j].expires);
978 if (!(loops % 5) && (used || vkeys)) {
979 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
980 /* dictPrintStats(server.dict); */
981 }
982 }
983
984 /* We don't want to resize the hash tables while a bacground saving
985 * is in progress: the saving child is created using fork() that is
986 * implemented with a copy-on-write semantic in most modern systems, so
987 * if we resize the HT while there is the saving child at work actually
988 * a lot of memory movements in the parent will cause a lot of pages
989 * copied. */
990 if (server.bgsavechildpid == -1) tryResizeHashTables();
991
992 /* Show information about connected clients */
993 if (!(loops % 5)) {
994 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
995 listLength(server.clients)-listLength(server.slaves),
996 listLength(server.slaves),
997 server.usedmemory,
998 dictSize(server.sharingpool));
999 }
1000
1001 /* Close connections of timedout clients */
1002 if (server.maxidletime && !(loops % 10))
1003 closeTimedoutClients();
1004
1005 /* Check if a background saving or AOF rewrite in progress terminated */
1006 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1007 int statloc;
1008 pid_t pid;
1009
1010 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1011 if (pid == server.bgsavechildpid) {
1012 backgroundSaveDoneHandler(statloc);
1013 } else {
1014 backgroundRewriteDoneHandler(statloc);
1015 }
1016 }
1017 } else {
1018 /* If there is not a background saving in progress check if
1019 * we have to save now */
1020 time_t now = time(NULL);
1021 for (j = 0; j < server.saveparamslen; j++) {
1022 struct saveparam *sp = server.saveparams+j;
1023
1024 if (server.dirty >= sp->changes &&
1025 now-server.lastsave > sp->seconds) {
1026 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1027 sp->changes, sp->seconds);
1028 rdbSaveBackground(server.dbfilename);
1029 break;
1030 }
1031 }
1032 }
1033
1034 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1035 * will use few CPU cycles if there are few expiring keys, otherwise
1036 * it will get more aggressive to avoid that too much memory is used by
1037 * keys that can be removed from the keyspace. */
1038 for (j = 0; j < server.dbnum; j++) {
1039 int expired;
1040 redisDb *db = server.db+j;
1041
1042 /* Continue to expire if at the end of the cycle more than 25%
1043 * of the keys were expired. */
1044 do {
1045 int num = dictSize(db->expires);
1046 time_t now = time(NULL);
1047
1048 expired = 0;
1049 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1050 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1051 while (num--) {
1052 dictEntry *de;
1053 time_t t;
1054
1055 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1056 t = (time_t) dictGetEntryVal(de);
1057 if (now > t) {
1058 deleteKey(db,dictGetEntryKey(de));
1059 expired++;
1060 }
1061 }
1062 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1063 }
1064
1065 /* Check if we should connect to a MASTER */
1066 if (server.replstate == REDIS_REPL_CONNECT) {
1067 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1068 if (syncWithMaster() == REDIS_OK) {
1069 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1070 }
1071 }
1072 return 1000;
1073 }
1074
1075 static void createSharedObjects(void) {
1076 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1077 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1078 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1079 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1080 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1081 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1082 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1083 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1084 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1085 /* no such key */
1086 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1087 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1088 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1089 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1090 "-ERR no such key\r\n"));
1091 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1092 "-ERR syntax error\r\n"));
1093 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1094 "-ERR source and destination objects are the same\r\n"));
1095 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1096 "-ERR index out of range\r\n"));
1097 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1098 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1099 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1100 shared.select0 = createStringObject("select 0\r\n",10);
1101 shared.select1 = createStringObject("select 1\r\n",10);
1102 shared.select2 = createStringObject("select 2\r\n",10);
1103 shared.select3 = createStringObject("select 3\r\n",10);
1104 shared.select4 = createStringObject("select 4\r\n",10);
1105 shared.select5 = createStringObject("select 5\r\n",10);
1106 shared.select6 = createStringObject("select 6\r\n",10);
1107 shared.select7 = createStringObject("select 7\r\n",10);
1108 shared.select8 = createStringObject("select 8\r\n",10);
1109 shared.select9 = createStringObject("select 9\r\n",10);
1110 }
1111
1112 static void appendServerSaveParams(time_t seconds, int changes) {
1113 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1114 server.saveparams[server.saveparamslen].seconds = seconds;
1115 server.saveparams[server.saveparamslen].changes = changes;
1116 server.saveparamslen++;
1117 }
1118
1119 static void resetServerSaveParams() {
1120 zfree(server.saveparams);
1121 server.saveparams = NULL;
1122 server.saveparamslen = 0;
1123 }
1124
1125 static void initServerConfig() {
1126 server.dbnum = REDIS_DEFAULT_DBNUM;
1127 server.port = REDIS_SERVERPORT;
1128 server.verbosity = REDIS_DEBUG;
1129 server.maxidletime = REDIS_MAXIDLETIME;
1130 server.saveparams = NULL;
1131 server.logfile = NULL; /* NULL = log on standard output */
1132 server.bindaddr = NULL;
1133 server.glueoutputbuf = 1;
1134 server.daemonize = 0;
1135 server.appendonly = 0;
1136 server.appendfsync = APPENDFSYNC_ALWAYS;
1137 server.lastfsync = time(NULL);
1138 server.appendfd = -1;
1139 server.appendseldb = -1; /* Make sure the first time will not match */
1140 server.pidfile = "/var/run/redis.pid";
1141 server.dbfilename = "dump.rdb";
1142 server.appendfilename = "appendonly.aof";
1143 server.requirepass = NULL;
1144 server.shareobjects = 0;
1145 server.sharingpoolsize = 1024;
1146 server.maxclients = 0;
1147 server.maxmemory = 0;
1148 resetServerSaveParams();
1149
1150 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1151 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1152 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1153 /* Replication related */
1154 server.isslave = 0;
1155 server.masterauth = NULL;
1156 server.masterhost = NULL;
1157 server.masterport = 6379;
1158 server.master = NULL;
1159 server.replstate = REDIS_REPL_NONE;
1160
1161 /* Double constants initialization */
1162 R_Zero = 0.0;
1163 R_PosInf = 1.0/R_Zero;
1164 R_NegInf = -1.0/R_Zero;
1165 R_Nan = R_Zero/R_Zero;
1166 }
1167
1168 static void initServer() {
1169 int j;
1170
1171 signal(SIGHUP, SIG_IGN);
1172 signal(SIGPIPE, SIG_IGN);
1173 setupSigSegvAction();
1174
1175 server.clients = listCreate();
1176 server.slaves = listCreate();
1177 server.monitors = listCreate();
1178 server.objfreelist = listCreate();
1179 createSharedObjects();
1180 server.el = aeCreateEventLoop();
1181 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1182 server.sharingpool = dictCreate(&setDictType,NULL);
1183 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1184 if (server.fd == -1) {
1185 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1186 exit(1);
1187 }
1188 for (j = 0; j < server.dbnum; j++) {
1189 server.db[j].dict = dictCreate(&hashDictType,NULL);
1190 server.db[j].expires = dictCreate(&setDictType,NULL);
1191 server.db[j].id = j;
1192 }
1193 server.cronloops = 0;
1194 server.bgsavechildpid = -1;
1195 server.bgrewritechildpid = -1;
1196 server.bgrewritebuf = sdsempty();
1197 server.lastsave = time(NULL);
1198 server.dirty = 0;
1199 server.usedmemory = 0;
1200 server.stat_numcommands = 0;
1201 server.stat_numconnections = 0;
1202 server.stat_starttime = time(NULL);
1203 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1204
1205 if (server.appendonly) {
1206 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1207 if (server.appendfd == -1) {
1208 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1209 strerror(errno));
1210 exit(1);
1211 }
1212 }
1213 }
1214
1215 /* Empty the whole database */
1216 static long long emptyDb() {
1217 int j;
1218 long long removed = 0;
1219
1220 for (j = 0; j < server.dbnum; j++) {
1221 removed += dictSize(server.db[j].dict);
1222 dictEmpty(server.db[j].dict);
1223 dictEmpty(server.db[j].expires);
1224 }
1225 return removed;
1226 }
1227
/* Convert a "yes" / "no" string (compared ignoring the case) into 1 / 0.
 * Any other string returns -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1233
1234 /* I agree, this is a very rudimental way to load a configuration...
1235 will improve later if the config gets more complex */
1236 static void loadServerConfig(char *filename) {
1237 FILE *fp;
1238 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1239 int linenum = 0;
1240 sds line = NULL;
1241
1242 if (filename[0] == '-' && filename[1] == '\0')
1243 fp = stdin;
1244 else {
1245 if ((fp = fopen(filename,"r")) == NULL) {
1246 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1247 exit(1);
1248 }
1249 }
1250
1251 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1252 sds *argv;
1253 int argc, j;
1254
1255 linenum++;
1256 line = sdsnew(buf);
1257 line = sdstrim(line," \t\r\n");
1258
1259 /* Skip comments and blank lines*/
1260 if (line[0] == '#' || line[0] == '\0') {
1261 sdsfree(line);
1262 continue;
1263 }
1264
1265 /* Split into arguments */
1266 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1267 sdstolower(argv[0]);
1268
1269 /* Execute config directives */
1270 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1271 server.maxidletime = atoi(argv[1]);
1272 if (server.maxidletime < 0) {
1273 err = "Invalid timeout value"; goto loaderr;
1274 }
1275 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1276 server.port = atoi(argv[1]);
1277 if (server.port < 1 || server.port > 65535) {
1278 err = "Invalid port"; goto loaderr;
1279 }
1280 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1281 server.bindaddr = zstrdup(argv[1]);
1282 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1283 int seconds = atoi(argv[1]);
1284 int changes = atoi(argv[2]);
1285 if (seconds < 1 || changes < 0) {
1286 err = "Invalid save parameters"; goto loaderr;
1287 }
1288 appendServerSaveParams(seconds,changes);
1289 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1290 if (chdir(argv[1]) == -1) {
1291 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1292 argv[1], strerror(errno));
1293 exit(1);
1294 }
1295 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1296 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1297 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1298 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1299 else {
1300 err = "Invalid log level. Must be one of debug, notice, warning";
1301 goto loaderr;
1302 }
1303 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1304 FILE *logfp;
1305
1306 server.logfile = zstrdup(argv[1]);
1307 if (!strcasecmp(server.logfile,"stdout")) {
1308 zfree(server.logfile);
1309 server.logfile = NULL;
1310 }
1311 if (server.logfile) {
1312 /* Test if we are able to open the file. The server will not
1313 * be able to abort just for this problem later... */
1314 logfp = fopen(server.logfile,"a");
1315 if (logfp == NULL) {
1316 err = sdscatprintf(sdsempty(),
1317 "Can't open the log file: %s", strerror(errno));
1318 goto loaderr;
1319 }
1320 fclose(logfp);
1321 }
1322 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1323 server.dbnum = atoi(argv[1]);
1324 if (server.dbnum < 1) {
1325 err = "Invalid number of databases"; goto loaderr;
1326 }
1327 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1328 server.maxclients = atoi(argv[1]);
1329 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1330 server.maxmemory = strtoll(argv[1], NULL, 10);
1331 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1332 server.masterhost = sdsnew(argv[1]);
1333 server.masterport = atoi(argv[2]);
1334 server.replstate = REDIS_REPL_CONNECT;
1335 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1336 server.masterauth = zstrdup(argv[1]);
1337 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1338 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1339 err = "argument must be 'yes' or 'no'"; goto loaderr;
1340 }
1341 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1342 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1343 err = "argument must be 'yes' or 'no'"; goto loaderr;
1344 }
1345 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1346 server.sharingpoolsize = atoi(argv[1]);
1347 if (server.sharingpoolsize < 1) {
1348 err = "invalid object sharing pool size"; goto loaderr;
1349 }
1350 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1351 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1352 err = "argument must be 'yes' or 'no'"; goto loaderr;
1353 }
1354 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1355 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1356 err = "argument must be 'yes' or 'no'"; goto loaderr;
1357 }
1358 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1359 if (!strcasecmp(argv[1],"no")) {
1360 server.appendfsync = APPENDFSYNC_NO;
1361 } else if (!strcasecmp(argv[1],"always")) {
1362 server.appendfsync = APPENDFSYNC_ALWAYS;
1363 } else if (!strcasecmp(argv[1],"everysec")) {
1364 server.appendfsync = APPENDFSYNC_EVERYSEC;
1365 } else {
1366 err = "argument must be 'no', 'always' or 'everysec'";
1367 goto loaderr;
1368 }
1369 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1370 server.requirepass = zstrdup(argv[1]);
1371 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1372 server.pidfile = zstrdup(argv[1]);
1373 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1374 server.dbfilename = zstrdup(argv[1]);
1375 } else {
1376 err = "Bad directive or wrong number of arguments"; goto loaderr;
1377 }
1378 for (j = 0; j < argc; j++)
1379 sdsfree(argv[j]);
1380 zfree(argv);
1381 sdsfree(line);
1382 }
1383 if (fp != stdin) fclose(fp);
1384 return;
1385
1386 loaderr:
1387 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1388 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1389 fprintf(stderr, ">>> '%s'\n", line);
1390 fprintf(stderr, "%s\n", err);
1391 exit(1);
1392 }
1393
1394 static void freeClientArgv(redisClient *c) {
1395 int j;
1396
1397 for (j = 0; j < c->argc; j++)
1398 decrRefCount(c->argv[j]);
1399 for (j = 0; j < c->mbargc; j++)
1400 decrRefCount(c->mbargv[j]);
1401 c->argc = 0;
1402 c->mbargc = 0;
1403 }
1404
1405 static void freeClient(redisClient *c) {
1406 listNode *ln;
1407
1408 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1409 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1410 sdsfree(c->querybuf);
1411 listRelease(c->reply);
1412 freeClientArgv(c);
1413 close(c->fd);
1414 ln = listSearchKey(server.clients,c);
1415 redisAssert(ln != NULL);
1416 listDelNode(server.clients,ln);
1417 if (c->flags & REDIS_SLAVE) {
1418 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1419 close(c->repldbfd);
1420 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1421 ln = listSearchKey(l,c);
1422 redisAssert(ln != NULL);
1423 listDelNode(l,ln);
1424 }
1425 if (c->flags & REDIS_MASTER) {
1426 server.master = NULL;
1427 server.replstate = REDIS_REPL_CONNECT;
1428 }
1429 zfree(c->argv);
1430 zfree(c->mbargv);
1431 zfree(c);
1432 }
1433
1434 #define GLUEREPLY_UP_TO (1024)
1435 static void glueReplyBuffersIfNeeded(redisClient *c) {
1436 int copylen = 0;
1437 char buf[GLUEREPLY_UP_TO];
1438 listNode *ln;
1439 robj *o;
1440
1441 listRewind(c->reply);
1442 while((ln = listYield(c->reply))) {
1443 int objlen;
1444
1445 o = ln->value;
1446 objlen = sdslen(o->ptr);
1447 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1448 memcpy(buf+copylen,o->ptr,objlen);
1449 copylen += objlen;
1450 listDelNode(c->reply,ln);
1451 } else {
1452 if (copylen == 0) return;
1453 break;
1454 }
1455 }
1456 /* Now the output buffer is empty, add the new single element */
1457 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1458 listAddNodeHead(c->reply,o);
1459 }
1460
1461 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1462 redisClient *c = privdata;
1463 int nwritten = 0, totwritten = 0, objlen;
1464 robj *o;
1465 REDIS_NOTUSED(el);
1466 REDIS_NOTUSED(mask);
1467
1468 /* Use writev() if we have enough buffers to send */
1469 if (!server.glueoutputbuf &&
1470 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1471 !(c->flags & REDIS_MASTER))
1472 {
1473 sendReplyToClientWritev(el, fd, privdata, mask);
1474 return;
1475 }
1476
1477 while(listLength(c->reply)) {
1478 if (server.glueoutputbuf && listLength(c->reply) > 1)
1479 glueReplyBuffersIfNeeded(c);
1480
1481 o = listNodeValue(listFirst(c->reply));
1482 objlen = sdslen(o->ptr);
1483
1484 if (objlen == 0) {
1485 listDelNode(c->reply,listFirst(c->reply));
1486 continue;
1487 }
1488
1489 if (c->flags & REDIS_MASTER) {
1490 /* Don't reply to a master */
1491 nwritten = objlen - c->sentlen;
1492 } else {
1493 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1494 if (nwritten <= 0) break;
1495 }
1496 c->sentlen += nwritten;
1497 totwritten += nwritten;
1498 /* If we fully sent the object on head go to the next one */
1499 if (c->sentlen == objlen) {
1500 listDelNode(c->reply,listFirst(c->reply));
1501 c->sentlen = 0;
1502 }
1503 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1504 * bytes, in a single threaded server it's a good idea to serve
1505 * other clients as well, even if a very large request comes from
1506 * super fast link that is always able to accept data (in real world
1507 * scenario think about 'KEYS *' against the loopback interfae) */
1508 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1509 }
1510 if (nwritten == -1) {
1511 if (errno == EAGAIN) {
1512 nwritten = 0;
1513 } else {
1514 redisLog(REDIS_DEBUG,
1515 "Error writing to client: %s", strerror(errno));
1516 freeClient(c);
1517 return;
1518 }
1519 }
1520 if (totwritten > 0) c->lastinteraction = time(NULL);
1521 if (listLength(c->reply) == 0) {
1522 c->sentlen = 0;
1523 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1524 }
1525 }
1526
1527 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1528 {
1529 redisClient *c = privdata;
1530 int nwritten = 0, totwritten = 0, objlen, willwrite;
1531 robj *o;
1532 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1533 int offset, ion = 0;
1534 REDIS_NOTUSED(el);
1535 REDIS_NOTUSED(mask);
1536
1537 listNode *node;
1538 while (listLength(c->reply)) {
1539 offset = c->sentlen;
1540 ion = 0;
1541 willwrite = 0;
1542
1543 /* fill-in the iov[] array */
1544 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1545 o = listNodeValue(node);
1546 objlen = sdslen(o->ptr);
1547
1548 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1549 break;
1550
1551 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1552 break; /* no more iovecs */
1553
1554 iov[ion].iov_base = ((char*)o->ptr) + offset;
1555 iov[ion].iov_len = objlen - offset;
1556 willwrite += objlen - offset;
1557 offset = 0; /* just for the first item */
1558 ion++;
1559 }
1560
1561 if(willwrite == 0)
1562 break;
1563
1564 /* write all collected blocks at once */
1565 if((nwritten = writev(fd, iov, ion)) < 0) {
1566 if (errno != EAGAIN) {
1567 redisLog(REDIS_DEBUG,
1568 "Error writing to client: %s", strerror(errno));
1569 freeClient(c);
1570 return;
1571 }
1572 break;
1573 }
1574
1575 totwritten += nwritten;
1576 offset = c->sentlen;
1577
1578 /* remove written robjs from c->reply */
1579 while (nwritten && listLength(c->reply)) {
1580 o = listNodeValue(listFirst(c->reply));
1581 objlen = sdslen(o->ptr);
1582
1583 if(nwritten >= objlen - offset) {
1584 listDelNode(c->reply, listFirst(c->reply));
1585 nwritten -= objlen - offset;
1586 c->sentlen = 0;
1587 } else {
1588 /* partial write */
1589 c->sentlen += nwritten;
1590 break;
1591 }
1592 offset = 0;
1593 }
1594 }
1595
1596 if (totwritten > 0)
1597 c->lastinteraction = time(NULL);
1598
1599 if (listLength(c->reply) == 0) {
1600 c->sentlen = 0;
1601 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1602 }
1603 }
1604
1605 static struct redisCommand *lookupCommand(char *name) {
1606 int j = 0;
1607 while(cmdTable[j].name != NULL) {
1608 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1609 j++;
1610 }
1611 return NULL;
1612 }
1613
1614 /* resetClient prepare the client to process the next command */
1615 static void resetClient(redisClient *c) {
1616 freeClientArgv(c);
1617 c->bulklen = -1;
1618 c->multibulk = 0;
1619 }
1620
1621 /* If this function gets called we already read a whole
1622 * command, argments are in the client argv/argc fields.
1623 * processCommand() execute the command or prepare the
1624 * server for a bulk read from the client.
1625 *
1626 * If 1 is returned the client is still alive and valid and
1627 * and other operations can be performed by the caller. Otherwise
1628 * if 0 is returned the client was destroied (i.e. after QUIT). */
1629 static int processCommand(redisClient *c) {
1630 struct redisCommand *cmd;
1631 long long dirty;
1632
1633 /* Free some memory if needed (maxmemory setting) */
1634 if (server.maxmemory) freeMemoryIfNeeded();
1635
1636 /* Handle the multi bulk command type. This is an alternative protocol
1637 * supported by Redis in order to receive commands that are composed of
1638 * multiple binary-safe "bulk" arguments. The latency of processing is
1639 * a bit higher but this allows things like multi-sets, so if this
1640 * protocol is used only for MSET and similar commands this is a big win. */
1641 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1642 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1643 if (c->multibulk <= 0) {
1644 resetClient(c);
1645 return 1;
1646 } else {
1647 decrRefCount(c->argv[c->argc-1]);
1648 c->argc--;
1649 return 1;
1650 }
1651 } else if (c->multibulk) {
1652 if (c->bulklen == -1) {
1653 if (((char*)c->argv[0]->ptr)[0] != '$') {
1654 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1655 resetClient(c);
1656 return 1;
1657 } else {
1658 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1659 decrRefCount(c->argv[0]);
1660 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1661 c->argc--;
1662 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1663 resetClient(c);
1664 return 1;
1665 }
1666 c->argc--;
1667 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1668 return 1;
1669 }
1670 } else {
1671 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1672 c->mbargv[c->mbargc] = c->argv[0];
1673 c->mbargc++;
1674 c->argc--;
1675 c->multibulk--;
1676 if (c->multibulk == 0) {
1677 robj **auxargv;
1678 int auxargc;
1679
1680 /* Here we need to swap the multi-bulk argc/argv with the
1681 * normal argc/argv of the client structure. */
1682 auxargv = c->argv;
1683 c->argv = c->mbargv;
1684 c->mbargv = auxargv;
1685
1686 auxargc = c->argc;
1687 c->argc = c->mbargc;
1688 c->mbargc = auxargc;
1689
1690 /* We need to set bulklen to something different than -1
1691 * in order for the code below to process the command without
1692 * to try to read the last argument of a bulk command as
1693 * a special argument. */
1694 c->bulklen = 0;
1695 /* continue below and process the command */
1696 } else {
1697 c->bulklen = -1;
1698 return 1;
1699 }
1700 }
1701 }
1702 /* -- end of multi bulk commands processing -- */
1703
1704 /* The QUIT command is handled as a special case. Normal command
1705 * procs are unable to close the client connection safely */
1706 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1707 freeClient(c);
1708 return 0;
1709 }
1710 cmd = lookupCommand(c->argv[0]->ptr);
1711 if (!cmd) {
1712 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1713 resetClient(c);
1714 return 1;
1715 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1716 (c->argc < -cmd->arity)) {
1717 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1718 resetClient(c);
1719 return 1;
1720 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1721 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1722 resetClient(c);
1723 return 1;
1724 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1725 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1726
1727 decrRefCount(c->argv[c->argc-1]);
1728 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1729 c->argc--;
1730 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1731 resetClient(c);
1732 return 1;
1733 }
1734 c->argc--;
1735 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1736 /* It is possible that the bulk read is already in the
1737 * buffer. Check this condition and handle it accordingly.
1738 * This is just a fast path, alternative to call processInputBuffer().
1739 * It's a good idea since the code is small and this condition
1740 * happens most of the times. */
1741 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1742 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1743 c->argc++;
1744 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1745 } else {
1746 return 1;
1747 }
1748 }
1749 /* Let's try to share objects on the command arguments vector */
1750 if (server.shareobjects) {
1751 int j;
1752 for(j = 1; j < c->argc; j++)
1753 c->argv[j] = tryObjectSharing(c->argv[j]);
1754 }
1755 /* Let's try to encode the bulk object to save space. */
1756 if (cmd->flags & REDIS_CMD_BULK)
1757 tryObjectEncoding(c->argv[c->argc-1]);
1758
1759 /* Check if the user is authenticated */
1760 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1761 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1762 resetClient(c);
1763 return 1;
1764 }
1765
1766 /* Exec the command */
1767 dirty = server.dirty;
1768 cmd->proc(c);
1769 if (server.appendonly && server.dirty-dirty)
1770 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1771 if (server.dirty-dirty && listLength(server.slaves))
1772 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1773 if (listLength(server.monitors))
1774 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1775 server.stat_numcommands++;
1776
1777 /* Prepare the client for the next command */
1778 if (c->flags & REDIS_CLOSE) {
1779 freeClient(c);
1780 return 0;
1781 }
1782 resetClient(c);
1783 return 1;
1784 }
1785
1786 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1787 listNode *ln;
1788 int outc = 0, j;
1789 robj **outv;
1790 /* (args*2)+1 is enough room for args, spaces, newlines */
1791 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1792
1793 if (argc <= REDIS_STATIC_ARGS) {
1794 outv = static_outv;
1795 } else {
1796 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1797 }
1798
1799 for (j = 0; j < argc; j++) {
1800 if (j != 0) outv[outc++] = shared.space;
1801 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1802 robj *lenobj;
1803
1804 lenobj = createObject(REDIS_STRING,
1805 sdscatprintf(sdsempty(),"%lu\r\n",
1806 stringObjectLen(argv[j])));
1807 lenobj->refcount = 0;
1808 outv[outc++] = lenobj;
1809 }
1810 outv[outc++] = argv[j];
1811 }
1812 outv[outc++] = shared.crlf;
1813
1814 /* Increment all the refcounts at start and decrement at end in order to
1815 * be sure to free objects if there is no slave in a replication state
1816 * able to be feed with commands */
1817 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1818 listRewind(slaves);
1819 while((ln = listYield(slaves))) {
1820 redisClient *slave = ln->value;
1821
1822 /* Don't feed slaves that are still waiting for BGSAVE to start */
1823 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1824
1825 /* Feed all the other slaves, MONITORs and so on */
1826 if (slave->slaveseldb != dictid) {
1827 robj *selectcmd;
1828
1829 switch(dictid) {
1830 case 0: selectcmd = shared.select0; break;
1831 case 1: selectcmd = shared.select1; break;
1832 case 2: selectcmd = shared.select2; break;
1833 case 3: selectcmd = shared.select3; break;
1834 case 4: selectcmd = shared.select4; break;
1835 case 5: selectcmd = shared.select5; break;
1836 case 6: selectcmd = shared.select6; break;
1837 case 7: selectcmd = shared.select7; break;
1838 case 8: selectcmd = shared.select8; break;
1839 case 9: selectcmd = shared.select9; break;
1840 default:
1841 selectcmd = createObject(REDIS_STRING,
1842 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1843 selectcmd->refcount = 0;
1844 break;
1845 }
1846 addReply(slave,selectcmd);
1847 slave->slaveseldb = dictid;
1848 }
1849 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1850 }
1851 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1852 if (outv != static_outv) zfree(outv);
1853 }
1854
1855 static void processInputBuffer(redisClient *c) {
1856 again:
1857 if (c->bulklen == -1) {
1858 /* Read the first line of the query */
1859 char *p = strchr(c->querybuf,'\n');
1860 size_t querylen;
1861
1862 if (p) {
1863 sds query, *argv;
1864 int argc, j;
1865
1866 query = c->querybuf;
1867 c->querybuf = sdsempty();
1868 querylen = 1+(p-(query));
1869 if (sdslen(query) > querylen) {
1870 /* leave data after the first line of the query in the buffer */
1871 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1872 }
1873 *p = '\0'; /* remove "\n" */
1874 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1875 sdsupdatelen(query);
1876
1877 /* Now we can split the query in arguments */
1878 if (sdslen(query) == 0) {
1879 /* Ignore empty query */
1880 sdsfree(query);
1881 return;
1882 }
1883 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1884 sdsfree(query);
1885
1886 if (c->argv) zfree(c->argv);
1887 c->argv = zmalloc(sizeof(robj*)*argc);
1888
1889 for (j = 0; j < argc; j++) {
1890 if (sdslen(argv[j])) {
1891 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1892 c->argc++;
1893 } else {
1894 sdsfree(argv[j]);
1895 }
1896 }
1897 zfree(argv);
1898 /* Execute the command. If the client is still valid
1899 * after processCommand() return and there is something
1900 * on the query buffer try to process the next command. */
1901 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1902 return;
1903 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1904 redisLog(REDIS_DEBUG, "Client protocol error");
1905 freeClient(c);
1906 return;
1907 }
1908 } else {
1909 /* Bulk read handling. Note that if we are at this point
1910 the client already sent a command terminated with a newline,
1911 we are reading the bulk data that is actually the last
1912 argument of the command. */
1913 int qbl = sdslen(c->querybuf);
1914
1915 if (c->bulklen <= qbl) {
1916 /* Copy everything but the final CRLF as final argument */
1917 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1918 c->argc++;
1919 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1920 /* Process the command. If the client is still valid after
1921 * the processing and there is more data in the buffer
1922 * try to parse it. */
1923 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1924 return;
1925 }
1926 }
1927 }
1928
1929 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1930 redisClient *c = (redisClient*) privdata;
1931 char buf[REDIS_IOBUF_LEN];
1932 int nread;
1933 REDIS_NOTUSED(el);
1934 REDIS_NOTUSED(mask);
1935
1936 nread = read(fd, buf, REDIS_IOBUF_LEN);
1937 if (nread == -1) {
1938 if (errno == EAGAIN) {
1939 nread = 0;
1940 } else {
1941 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1942 freeClient(c);
1943 return;
1944 }
1945 } else if (nread == 0) {
1946 redisLog(REDIS_DEBUG, "Client closed connection");
1947 freeClient(c);
1948 return;
1949 }
1950 if (nread) {
1951 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1952 c->lastinteraction = time(NULL);
1953 } else {
1954 return;
1955 }
1956 processInputBuffer(c);
1957 }
1958
1959 static int selectDb(redisClient *c, int id) {
1960 if (id < 0 || id >= server.dbnum)
1961 return REDIS_ERR;
1962 c->db = &server.db[id];
1963 return REDIS_OK;
1964 }
1965
1966 static void *dupClientReplyValue(void *o) {
1967 incrRefCount((robj*)o);
1968 return 0;
1969 }
1970
1971 static redisClient *createClient(int fd) {
1972 redisClient *c = zmalloc(sizeof(*c));
1973
1974 anetNonBlock(NULL,fd);
1975 anetTcpNoDelay(NULL,fd);
1976 if (!c) return NULL;
1977 selectDb(c,0);
1978 c->fd = fd;
1979 c->querybuf = sdsempty();
1980 c->argc = 0;
1981 c->argv = NULL;
1982 c->bulklen = -1;
1983 c->multibulk = 0;
1984 c->mbargc = 0;
1985 c->mbargv = NULL;
1986 c->sentlen = 0;
1987 c->flags = 0;
1988 c->lastinteraction = time(NULL);
1989 c->authenticated = 0;
1990 c->replstate = REDIS_REPL_NONE;
1991 c->reply = listCreate();
1992 listSetFreeMethod(c->reply,decrRefCount);
1993 listSetDupMethod(c->reply,dupClientReplyValue);
1994 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1995 readQueryFromClient, c) == AE_ERR) {
1996 freeClient(c);
1997 return NULL;
1998 }
1999 listAddNodeTail(server.clients,c);
2000 return c;
2001 }
2002
2003 static void addReply(redisClient *c, robj *obj) {
2004 if (listLength(c->reply) == 0 &&
2005 (c->replstate == REDIS_REPL_NONE ||
2006 c->replstate == REDIS_REPL_ONLINE) &&
2007 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2008 sendReplyToClient, c) == AE_ERR) return;
2009 listAddNodeTail(c->reply,getDecodedObject(obj));
2010 }
2011
2012 static void addReplySds(redisClient *c, sds s) {
2013 robj *o = createObject(REDIS_STRING,s);
2014 addReply(c,o);
2015 decrRefCount(o);
2016 }
2017
2018 static void addReplyDouble(redisClient *c, double d) {
2019 char buf[128];
2020
2021 snprintf(buf,sizeof(buf),"%.17g",d);
2022 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2023 strlen(buf),buf));
2024 }
2025
2026 static void addReplyBulkLen(redisClient *c, robj *obj) {
2027 size_t len;
2028
2029 if (obj->encoding == REDIS_ENCODING_RAW) {
2030 len = sdslen(obj->ptr);
2031 } else {
2032 long n = (long)obj->ptr;
2033
2034 len = 1;
2035 if (n < 0) {
2036 len++;
2037 n = -n;
2038 }
2039 while((n = n/10) != 0) {
2040 len++;
2041 }
2042 }
2043 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",len));
2044 }
2045
2046 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2047 int cport, cfd;
2048 char cip[128];
2049 redisClient *c;
2050 REDIS_NOTUSED(el);
2051 REDIS_NOTUSED(mask);
2052 REDIS_NOTUSED(privdata);
2053
2054 cfd = anetAccept(server.neterr, fd, cip, &cport);
2055 if (cfd == AE_ERR) {
2056 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2057 return;
2058 }
2059 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2060 if ((c = createClient(cfd)) == NULL) {
2061 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2062 close(cfd); /* May be already closed, just ingore errors */
2063 return;
2064 }
2065 /* If maxclient directive is set and this is one client more... close the
2066 * connection. Note that we create the client instead to check before
2067 * for this condition, since now the socket is already set in nonblocking
2068 * mode and we can send an error for free using the Kernel I/O */
2069 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2070 char *err = "-ERR max number of clients reached\r\n";
2071
2072 /* That's a best effort error message, don't check write errors */
2073 if (write(c->fd,err,strlen(err)) == -1) {
2074 /* Nothing to do, Just to avoid the warning... */
2075 }
2076 freeClient(c);
2077 return;
2078 }
2079 server.stat_numconnections++;
2080 }
2081
2082 /* ======================= Redis objects implementation ===================== */
2083
2084 static robj *createObject(int type, void *ptr) {
2085 robj *o;
2086
2087 if (listLength(server.objfreelist)) {
2088 listNode *head = listFirst(server.objfreelist);
2089 o = listNodeValue(head);
2090 listDelNode(server.objfreelist,head);
2091 } else {
2092 o = zmalloc(sizeof(*o));
2093 }
2094 o->type = type;
2095 o->encoding = REDIS_ENCODING_RAW;
2096 o->ptr = ptr;
2097 o->refcount = 1;
2098 return o;
2099 }
2100
2101 static robj *createStringObject(char *ptr, size_t len) {
2102 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2103 }
2104
2105 static robj *createListObject(void) {
2106 list *l = listCreate();
2107
2108 listSetFreeMethod(l,decrRefCount);
2109 return createObject(REDIS_LIST,l);
2110 }
2111
2112 static robj *createSetObject(void) {
2113 dict *d = dictCreate(&setDictType,NULL);
2114 return createObject(REDIS_SET,d);
2115 }
2116
2117 static robj *createZsetObject(void) {
2118 zset *zs = zmalloc(sizeof(*zs));
2119
2120 zs->dict = dictCreate(&zsetDictType,NULL);
2121 zs->zsl = zslCreate();
2122 return createObject(REDIS_ZSET,zs);
2123 }
2124
2125 static void freeStringObject(robj *o) {
2126 if (o->encoding == REDIS_ENCODING_RAW) {
2127 sdsfree(o->ptr);
2128 }
2129 }
2130
2131 static void freeListObject(robj *o) {
2132 listRelease((list*) o->ptr);
2133 }
2134
2135 static void freeSetObject(robj *o) {
2136 dictRelease((dict*) o->ptr);
2137 }
2138
2139 static void freeZsetObject(robj *o) {
2140 zset *zs = o->ptr;
2141
2142 dictRelease(zs->dict);
2143 zslFree(zs->zsl);
2144 zfree(zs);
2145 }
2146
2147 static void freeHashObject(robj *o) {
2148 dictRelease((dict*) o->ptr);
2149 }
2150
2151 static void incrRefCount(robj *o) {
2152 o->refcount++;
2153 #ifdef DEBUG_REFCOUNT
2154 if (o->type == REDIS_STRING)
2155 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2156 #endif
2157 }
2158
2159 static void decrRefCount(void *obj) {
2160 robj *o = obj;
2161
2162 #ifdef DEBUG_REFCOUNT
2163 if (o->type == REDIS_STRING)
2164 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2165 #endif
2166 if (--(o->refcount) == 0) {
2167 switch(o->type) {
2168 case REDIS_STRING: freeStringObject(o); break;
2169 case REDIS_LIST: freeListObject(o); break;
2170 case REDIS_SET: freeSetObject(o); break;
2171 case REDIS_ZSET: freeZsetObject(o); break;
2172 case REDIS_HASH: freeHashObject(o); break;
2173 default: redisAssert(0 != 0); break;
2174 }
2175 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2176 !listAddNodeHead(server.objfreelist,o))
2177 zfree(o);
2178 }
2179 }
2180
2181 static robj *lookupKey(redisDb *db, robj *key) {
2182 dictEntry *de = dictFind(db->dict,key);
2183 return de ? dictGetEntryVal(de) : NULL;
2184 }
2185
2186 static robj *lookupKeyRead(redisDb *db, robj *key) {
2187 expireIfNeeded(db,key);
2188 return lookupKey(db,key);
2189 }
2190
2191 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2192 deleteIfVolatile(db,key);
2193 return lookupKey(db,key);
2194 }
2195
2196 static int deleteKey(redisDb *db, robj *key) {
2197 int retval;
2198
2199 /* We need to protect key from destruction: after the first dictDelete()
2200 * it may happen that 'key' is no longer valid if we don't increment
2201 * it's count. This may happen when we get the object reference directly
2202 * from the hash table with dictRandomKey() or dict iterators */
2203 incrRefCount(key);
2204 if (dictSize(db->expires)) dictDelete(db->expires,key);
2205 retval = dictDelete(db->dict,key);
2206 decrRefCount(key);
2207
2208 return retval == DICT_OK;
2209 }
2210
2211 /* Try to share an object against the shared objects pool */
2212 static robj *tryObjectSharing(robj *o) {
2213 struct dictEntry *de;
2214 unsigned long c;
2215
2216 if (o == NULL || server.shareobjects == 0) return o;
2217
2218 redisAssert(o->type == REDIS_STRING);
2219 de = dictFind(server.sharingpool,o);
2220 if (de) {
2221 robj *shared = dictGetEntryKey(de);
2222
2223 c = ((unsigned long) dictGetEntryVal(de))+1;
2224 dictGetEntryVal(de) = (void*) c;
2225 incrRefCount(shared);
2226 decrRefCount(o);
2227 return shared;
2228 } else {
2229 /* Here we are using a stream algorihtm: Every time an object is
2230 * shared we increment its count, everytime there is a miss we
2231 * recrement the counter of a random object. If this object reaches
2232 * zero we remove the object and put the current object instead. */
2233 if (dictSize(server.sharingpool) >=
2234 server.sharingpoolsize) {
2235 de = dictGetRandomKey(server.sharingpool);
2236 redisAssert(de != NULL);
2237 c = ((unsigned long) dictGetEntryVal(de))-1;
2238 dictGetEntryVal(de) = (void*) c;
2239 if (c == 0) {
2240 dictDelete(server.sharingpool,de->key);
2241 }
2242 } else {
2243 c = 0; /* If the pool is empty we want to add this object */
2244 }
2245 if (c == 0) {
2246 int retval;
2247
2248 retval = dictAdd(server.sharingpool,o,(void*)1);
2249 redisAssert(retval == DICT_OK);
2250 incrRefCount(o);
2251 }
2252 return o;
2253 }
2254 }
2255
2256 /* Check if the nul-terminated string 's' can be represented by a long
2257 * (that is, is a number that fits into long without any other space or
2258 * character before or after the digits).
2259 *
2260 * If so, the function returns REDIS_OK and *longval is set to the value
2261 * of the number. Otherwise REDIS_ERR is returned */
2262 static int isStringRepresentableAsLong(sds s, long *longval) {
2263 char buf[32], *endptr;
2264 long value;
2265 int slen;
2266
2267 value = strtol(s, &endptr, 10);
2268 if (endptr[0] != '\0') return REDIS_ERR;
2269 slen = snprintf(buf,32,"%ld",value);
2270
2271 /* If the number converted back into a string is not identical
2272 * then it's not possible to encode the string as integer */
2273 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2274 if (longval) *longval = value;
2275 return REDIS_OK;
2276 }
2277
2278 /* Try to encode a string object in order to save space */
2279 static int tryObjectEncoding(robj *o) {
2280 long value;
2281 sds s = o->ptr;
2282
2283 if (o->encoding != REDIS_ENCODING_RAW)
2284 return REDIS_ERR; /* Already encoded */
2285
2286 /* It's not save to encode shared objects: shared objects can be shared
2287 * everywhere in the "object space" of Redis. Encoded objects can only
2288 * appear as "values" (and not, for instance, as keys) */
2289 if (o->refcount > 1) return REDIS_ERR;
2290
2291 /* Currently we try to encode only strings */
2292 redisAssert(o->type == REDIS_STRING);
2293
2294 /* Check if we can represent this string as a long integer */
2295 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2296
2297 /* Ok, this object can be encoded */
2298 o->encoding = REDIS_ENCODING_INT;
2299 sdsfree(o->ptr);
2300 o->ptr = (void*) value;
2301 return REDIS_OK;
2302 }
2303
2304 /* Get a decoded version of an encoded object (returned as a new object).
2305 * If the object is already raw-encoded just increment the ref count. */
2306 static robj *getDecodedObject(robj *o) {
2307 robj *dec;
2308
2309 if (o->encoding == REDIS_ENCODING_RAW) {
2310 incrRefCount(o);
2311 return o;
2312 }
2313 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2314 char buf[32];
2315
2316 snprintf(buf,32,"%ld",(long)o->ptr);
2317 dec = createStringObject(buf,strlen(buf));
2318 return dec;
2319 } else {
2320 redisAssert(1 != 1);
2321 }
2322 }
2323
2324 /* Compare two string objects via strcmp() or alike.
2325 * Note that the objects may be integer-encoded. In such a case we
2326 * use snprintf() to get a string representation of the numbers on the stack
2327 * and compare the strings, it's much faster than calling getDecodedObject().
2328 *
2329 * Important note: if objects are not integer encoded, but binary-safe strings,
2330 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2331 * binary safe. */
2332 static int compareStringObjects(robj *a, robj *b) {
2333 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2334 char bufa[128], bufb[128], *astr, *bstr;
2335 int bothsds = 1;
2336
2337 if (a == b) return 0;
2338 if (a->encoding != REDIS_ENCODING_RAW) {
2339 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2340 astr = bufa;
2341 bothsds = 0;
2342 } else {
2343 astr = a->ptr;
2344 }
2345 if (b->encoding != REDIS_ENCODING_RAW) {
2346 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2347 bstr = bufb;
2348 bothsds = 0;
2349 } else {
2350 bstr = b->ptr;
2351 }
2352 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2353 }
2354
2355 static size_t stringObjectLen(robj *o) {
2356 redisAssert(o->type == REDIS_STRING);
2357 if (o->encoding == REDIS_ENCODING_RAW) {
2358 return sdslen(o->ptr);
2359 } else {
2360 char buf[32];
2361
2362 return snprintf(buf,32,"%ld",(long)o->ptr);
2363 }
2364 }
2365
2366 /*============================ DB saving/loading ============================ */
2367
/* Write the one byte 'type' to 'fp'. Returns 0 on success, -1 if the byte
 * could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2372
/* Write the time 't' to 'fp' as a 32 bit signed integer, in the byte order
 * of the host. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2378
2379 /* check rdbLoadLen() comments for more info */
2380 static int rdbSaveLen(FILE *fp, uint32_t len) {
2381 unsigned char buf[2];
2382
2383 if (len < (1<<6)) {
2384 /* Save a 6 bit len */
2385 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2386 if (fwrite(buf,1,1,fp) == 0) return -1;
2387 } else if (len < (1<<14)) {
2388 /* Save a 14 bit len */
2389 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2390 buf[1] = len&0xFF;
2391 if (fwrite(buf,2,1,fp) == 0) return -1;
2392 } else {
2393 /* Save a 32 bit len */
2394 buf[0] = (REDIS_RDB_32BITLEN<<6);
2395 if (fwrite(buf,1,1,fp) == 0) return -1;
2396 len = htonl(len);
2397 if (fwrite(&len,4,1,fp) == 0) return -1;
2398 }
2399 return 0;
2400 }
2401
2402 /* String objects in the form "2391" "-100" without any space and with a
2403 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2404 * encoded as integers to save space */
2405 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2406 long long value;
2407 char *endptr, buf[32];
2408
2409 /* Check if it's possible to encode this value as a number */
2410 value = strtoll(s, &endptr, 10);
2411 if (endptr[0] != '\0') return 0;
2412 snprintf(buf,32,"%lld",value);
2413
2414 /* If the number converted back into a string is not identical
2415 * then it's not possible to encode the string as integer */
2416 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2417
2418 /* Finally check if it fits in our ranges */
2419 if (value >= -(1<<7) && value <= (1<<7)-1) {
2420 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2421 enc[1] = value&0xFF;
2422 return 2;
2423 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2424 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2425 enc[1] = value&0xFF;
2426 enc[2] = (value>>8)&0xFF;
2427 return 3;
2428 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2429 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2430 enc[1] = value&0xFF;
2431 enc[2] = (value>>8)&0xFF;
2432 enc[3] = (value>>16)&0xFF;
2433 enc[4] = (value>>24)&0xFF;
2434 return 5;
2435 } else {
2436 return 0;
2437 }
2438 }
2439
2440 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2441 unsigned int comprlen, outlen;
2442 unsigned char byte;
2443 void *out;
2444
2445 /* We require at least four bytes compression for this to be worth it */
2446 outlen = sdslen(obj->ptr)-4;
2447 if (outlen <= 0) return 0;
2448 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2449 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2450 if (comprlen == 0) {
2451 zfree(out);
2452 return 0;
2453 }
2454 /* Data compressed! Let's save it on disk */
2455 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2456 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2457 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2458 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2459 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2460 zfree(out);
2461 return comprlen;
2462
2463 writeerr:
2464 zfree(out);
2465 return -1;
2466 }
2467
2468 /* Save a string objet as [len][data] on disk. If the object is a string
2469 * representation of an integer value we try to safe it in a special form */
2470 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2471 size_t len;
2472 int enclen;
2473
2474 len = sdslen(obj->ptr);
2475
2476 /* Try integer encoding */
2477 if (len <= 11) {
2478 unsigned char buf[5];
2479 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2480 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2481 return 0;
2482 }
2483 }
2484
2485 /* Try LZF compression - under 20 bytes it's unable to compress even
2486 * aaaaaaaaaaaaaaaaaa so skip it */
2487 if (len > 20) {
2488 int retval;
2489
2490 retval = rdbSaveLzfStringObject(fp,obj);
2491 if (retval == -1) return -1;
2492 if (retval > 0) return 0;
2493 /* retval == 0 means data can't be compressed, save the old way */
2494 }
2495
2496 /* Store verbatim */
2497 if (rdbSaveLen(fp,len) == -1) return -1;
2498 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2499 return 0;
2500 }
2501
2502 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2503 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2504 int retval;
2505
2506 obj = getDecodedObject(obj);
2507 retval = rdbSaveStringObjectRaw(fp,obj);
2508 decrRefCount(obj);
2509 return retval;
2510 }
2511
/* Save a double value. Doubles are saved as strings (formatted with
 * "%.17g") prefixed by an unsigned 8 bit integer specifying the length of
 * the representation. This 8 bit integer has special values in order to
 * specify the following conditions, in which case no string follows:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1; /* the special values take just the prefix byte */

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2538
2539 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2540 static int rdbSave(char *filename) {
2541 dictIterator *di = NULL;
2542 dictEntry *de;
2543 FILE *fp;
2544 char tmpfile[256];
2545 int j;
2546 time_t now = time(NULL);
2547
2548 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2549 fp = fopen(tmpfile,"w");
2550 if (!fp) {
2551 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2552 return REDIS_ERR;
2553 }
2554 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2555 for (j = 0; j < server.dbnum; j++) {
2556 redisDb *db = server.db+j;
2557 dict *d = db->dict;
2558 if (dictSize(d) == 0) continue;
2559 di = dictGetIterator(d);
2560 if (!di) {
2561 fclose(fp);
2562 return REDIS_ERR;
2563 }
2564
2565 /* Write the SELECT DB opcode */
2566 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2567 if (rdbSaveLen(fp,j) == -1) goto werr;
2568
2569 /* Iterate this DB writing every entry */
2570 while((de = dictNext(di)) != NULL) {
2571 robj *key = dictGetEntryKey(de);
2572 robj *o = dictGetEntryVal(de);
2573 time_t expiretime = getExpire(db,key);
2574
2575 /* Save the expire time */
2576 if (expiretime != -1) {
2577 /* If this key is already expired skip it */
2578 if (expiretime < now) continue;
2579 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2580 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2581 }
2582 /* Save the key and associated value */
2583 if (rdbSaveType(fp,o->type) == -1) goto werr;
2584 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2585 if (o->type == REDIS_STRING) {
2586 /* Save a string value */
2587 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2588 } else if (o->type == REDIS_LIST) {
2589 /* Save a list value */
2590 list *list = o->ptr;
2591 listNode *ln;
2592
2593 listRewind(list);
2594 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2595 while((ln = listYield(list))) {
2596 robj *eleobj = listNodeValue(ln);
2597
2598 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2599 }
2600 } else if (o->type == REDIS_SET) {
2601 /* Save a set value */
2602 dict *set = o->ptr;
2603 dictIterator *di = dictGetIterator(set);
2604 dictEntry *de;
2605
2606 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2607 while((de = dictNext(di)) != NULL) {
2608 robj *eleobj = dictGetEntryKey(de);
2609
2610 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2611 }
2612 dictReleaseIterator(di);
2613 } else if (o->type == REDIS_ZSET) {
2614 /* Save a set value */
2615 zset *zs = o->ptr;
2616 dictIterator *di = dictGetIterator(zs->dict);
2617 dictEntry *de;
2618
2619 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2620 while((de = dictNext(di)) != NULL) {
2621 robj *eleobj = dictGetEntryKey(de);
2622 double *score = dictGetEntryVal(de);
2623
2624 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2625 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2626 }
2627 dictReleaseIterator(di);
2628 } else {
2629 redisAssert(0 != 0);
2630 }
2631 }
2632 dictReleaseIterator(di);
2633 }
2634 /* EOF opcode */
2635 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2636
2637 /* Make sure data will not remain on the OS's output buffers */
2638 fflush(fp);
2639 fsync(fileno(fp));
2640 fclose(fp);
2641
2642 /* Use RENAME to make sure the DB file is changed atomically only
2643 * if the generate DB file is ok. */
2644 if (rename(tmpfile,filename) == -1) {
2645 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2646 unlink(tmpfile);
2647 return REDIS_ERR;
2648 }
2649 redisLog(REDIS_NOTICE,"DB saved on disk");
2650 server.dirty = 0;
2651 server.lastsave = time(NULL);
2652 return REDIS_OK;
2653
2654 werr:
2655 fclose(fp);
2656 unlink(tmpfile);
2657 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2658 if (di) dictReleaseIterator(di);
2659 return REDIS_ERR;
2660 }
2661
2662 static int rdbSaveBackground(char *filename) {
2663 pid_t childpid;
2664
2665 if (server.bgsavechildpid != -1) return REDIS_ERR;
2666 if ((childpid = fork()) == 0) {
2667 /* Child */
2668 close(server.fd);
2669 if (rdbSave(filename) == REDIS_OK) {
2670 exit(0);
2671 } else {
2672 exit(1);
2673 }
2674 } else {
2675 /* Parent */
2676 if (childpid == -1) {
2677 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2678 strerror(errno));
2679 return REDIS_ERR;
2680 }
2681 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2682 server.bgsavechildpid = childpid;
2683 return REDIS_OK;
2684 }
2685 return REDIS_OK; /* unreached */
2686 }
2687
/* Remove the temporary dump file "temp-<childpid>.rdb" (the name used by
 * rdbSave() for the process with that pid). The result of unlink() is not
 * checked. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2694
/* Read a one byte type from 'fp'. Returns the byte as a value in the range
 * 0-255, or -1 if it could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;
    size_t got = fread(&type,1,1,fp);

    if (got == 0) return -1;
    return type;
}
2700
/* Read a time saved by rdbSaveTime(): a 32 bit signed integer in the byte
 * order of the host. Returns -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) == 0) return -1;
    return (time_t) stamp;
}
2706
2707 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2708 * of this file for a description of how this are stored on disk.
2709 *
2710 * isencoded is set to 1 if the readed length is not actually a length but
2711 * an "encoding type", check the above comments for more info */
2712 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2713 unsigned char buf[2];
2714 uint32_t len;
2715
2716 if (isencoded) *isencoded = 0;
2717 if (rdbver == 0) {
2718 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2719 return ntohl(len);
2720 } else {
2721 int type;
2722
2723 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2724 type = (buf[0]&0xC0)>>6;
2725 if (type == REDIS_RDB_6BITLEN) {
2726 /* Read a 6 bit len */
2727 return buf[0]&0x3F;
2728 } else if (type == REDIS_RDB_ENCVAL) {
2729 /* Read a 6 bit len encoding type */
2730 if (isencoded) *isencoded = 1;
2731 return buf[0]&0x3F;
2732 } else if (type == REDIS_RDB_14BITLEN) {
2733 /* Read a 14 bit len */
2734 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2735 return ((buf[0]&0x3F)<<8)|buf[1];
2736 } else {
2737 /* Read a 32 bit len */
2738 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2739 return ntohl(len);
2740 }
2741 }
2742 }
2743
2744 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2745 unsigned char enc[4];
2746 long long val;
2747
2748 if (enctype == REDIS_RDB_ENC_INT8) {
2749 if (fread(enc,1,1,fp) == 0) return NULL;
2750 val = (signed char)enc[0];
2751 } else if (enctype == REDIS_RDB_ENC_INT16) {
2752 uint16_t v;
2753 if (fread(enc,2,1,fp) == 0) return NULL;
2754 v = enc[0]|(enc[1]<<8);
2755 val = (int16_t)v;
2756 } else if (enctype == REDIS_RDB_ENC_INT32) {
2757 uint32_t v;
2758 if (fread(enc,4,1,fp) == 0) return NULL;
2759 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2760 val = (int32_t)v;
2761 } else {
2762 val = 0; /* anti-warning */
2763 redisAssert(0!=0);
2764 }
2765 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2766 }
2767
2768 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2769 unsigned int len, clen;
2770 unsigned char *c = NULL;
2771 sds val = NULL;
2772
2773 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2774 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2775 if ((c = zmalloc(clen)) == NULL) goto err;
2776 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2777 if (fread(c,clen,1,fp) == 0) goto err;
2778 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2779 zfree(c);
2780 return createObject(REDIS_STRING,val);
2781 err:
2782 zfree(c);
2783 sdsfree(val);
2784 return NULL;
2785 }
2786
2787 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2788 int isencoded;
2789 uint32_t len;
2790 sds val;
2791
2792 len = rdbLoadLen(fp,rdbver,&isencoded);
2793 if (isencoded) {
2794 switch(len) {
2795 case REDIS_RDB_ENC_INT8:
2796 case REDIS_RDB_ENC_INT16:
2797 case REDIS_RDB_ENC_INT32:
2798 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2799 case REDIS_RDB_ENC_LZF:
2800 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2801 default:
2802 redisAssert(0!=0);
2803 }
2804 }
2805
2806 if (len == REDIS_RDB_LENERR) return NULL;
2807 val = sdsnewlen(NULL,len);
2808 if (len && fread(val,len,1,fp) == 0) {
2809 sdsfree(val);
2810 return NULL;
2811 }
2812 return tryObjectSharing(createObject(REDIS_STRING,val));
2813 }
2814
2815 /* For information about double serialization check rdbSaveDoubleValue() */
2816 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2817 char buf[128];
2818 unsigned char len;
2819
2820 if (fread(&len,1,1,fp) == 0) return -1;
2821 switch(len) {
2822 case 255: *val = R_NegInf; return 0;
2823 case 254: *val = R_PosInf; return 0;
2824 case 253: *val = R_Nan; return 0;
2825 default:
2826 if (fread(buf,len,1,fp) == 0) return -1;
2827 sscanf(buf, "%lg", val);
2828 return 0;
2829 }
2830 }
2831
2832 static int rdbLoad(char *filename) {
2833 FILE *fp;
2834 robj *keyobj = NULL;
2835 uint32_t dbid;
2836 int type, retval, rdbver;
2837 dict *d = server.db[0].dict;
2838 redisDb *db = server.db+0;
2839 char buf[1024];
2840 time_t expiretime = -1, now = time(NULL);
2841
2842 fp = fopen(filename,"r");
2843 if (!fp) return REDIS_ERR;
2844 if (fread(buf,9,1,fp) == 0) goto eoferr;
2845 buf[9] = '\0';
2846 if (memcmp(buf,"REDIS",5) != 0) {
2847 fclose(fp);
2848 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2849 return REDIS_ERR;
2850 }
2851 rdbver = atoi(buf+5);
2852 if (rdbver > 1) {
2853 fclose(fp);
2854 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2855 return REDIS_ERR;
2856 }
2857 while(1) {
2858 robj *o;
2859
2860 /* Read type. */
2861 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2862 if (type == REDIS_EXPIRETIME) {
2863 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2864 /* We read the time so we need to read the object type again */
2865 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2866 }
2867 if (type == REDIS_EOF) break;
2868 /* Handle SELECT DB opcode as a special case */
2869 if (type == REDIS_SELECTDB) {
2870 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2871 goto eoferr;
2872 if (dbid >= (unsigned)server.dbnum) {
2873 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2874 exit(1);
2875 }
2876 db = server.db+dbid;
2877 d = db->dict;
2878 continue;
2879 }
2880 /* Read key */
2881 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2882
2883 if (type == REDIS_STRING) {
2884 /* Read string value */
2885 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2886 tryObjectEncoding(o);
2887 } else if (type == REDIS_LIST || type == REDIS_SET) {
2888 /* Read list/set value */
2889 uint32_t listlen;
2890
2891 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2892 goto eoferr;
2893 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2894 /* Load every single element of the list/set */
2895 while(listlen--) {
2896 robj *ele;
2897
2898 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2899 tryObjectEncoding(ele);
2900 if (type == REDIS_LIST) {
2901 listAddNodeTail((list*)o->ptr,ele);
2902 } else {
2903 dictAdd((dict*)o->ptr,ele,NULL);
2904 }
2905 }
2906 } else if (type == REDIS_ZSET) {
2907 /* Read list/set value */
2908 uint32_t zsetlen;
2909 zset *zs;
2910
2911 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2912 goto eoferr;
2913 o = createZsetObject();
2914 zs = o->ptr;
2915 /* Load every single element of the list/set */
2916 while(zsetlen--) {
2917 robj *ele;
2918 double *score = zmalloc(sizeof(double));
2919
2920 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2921 tryObjectEncoding(ele);
2922 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2923 dictAdd(zs->dict,ele,score);
2924 zslInsert(zs->zsl,*score,ele);
2925 incrRefCount(ele); /* added to skiplist */
2926 }
2927 } else {
2928 redisAssert(0 != 0);
2929 }
2930 /* Add the new object in the hash table */
2931 retval = dictAdd(d,keyobj,o);
2932 if (retval == DICT_ERR) {
2933 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2934 exit(1);
2935 }
2936 /* Set the expire time if needed */
2937 if (expiretime != -1) {
2938 setExpire(db,keyobj,expiretime);
2939 /* Delete this key if already expired */
2940 if (expiretime < now) deleteKey(db,keyobj);
2941 expiretime = -1;
2942 }
2943 keyobj = o = NULL;
2944 }
2945 fclose(fp);
2946 return REDIS_OK;
2947
2948 eoferr: /* unexpected end of file is handled here with a fatal exit */
2949 if (keyobj) decrRefCount(keyobj);
2950 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2951 exit(1);
2952 return REDIS_ERR; /* Just to avoid warning */
2953 }
2954
2955 /*================================== Commands =============================== */
2956
2957 static void authCommand(redisClient *c) {
2958 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2959 c->authenticated = 1;
2960 addReply(c,shared.ok);
2961 } else {
2962 c->authenticated = 0;
2963 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2964 }
2965 }
2966
2967 static void pingCommand(redisClient *c) {
2968 addReply(c,shared.pong);
2969 }
2970
2971 static void echoCommand(redisClient *c) {
2972 addReplyBulkLen(c,c->argv[1]);
2973 addReply(c,c->argv[1]);
2974 addReply(c,shared.crlf);
2975 }
2976
2977 /*=================================== Strings =============================== */
2978
2979 static void setGenericCommand(redisClient *c, int nx) {
2980 int retval;
2981
2982 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2983 if (retval == DICT_ERR) {
2984 if (!nx) {
2985 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2986 incrRefCount(c->argv[2]);
2987 } else {
2988 addReply(c,shared.czero);
2989 return;
2990 }
2991 } else {
2992 incrRefCount(c->argv[1]);
2993 incrRefCount(c->argv[2]);
2994 }
2995 server.dirty++;
2996 removeExpire(c->db,c->argv[1]);
2997 addReply(c, nx ? shared.cone : shared.ok);
2998 }
2999
3000 static void setCommand(redisClient *c) {
3001 setGenericCommand(c,0);
3002 }
3003
3004 static void setnxCommand(redisClient *c) {
3005 setGenericCommand(c,1);
3006 }
3007
3008 static void getCommand(redisClient *c) {
3009 robj *o = lookupKeyRead(c->db,c->argv[1]);
3010
3011 if (o == NULL) {
3012 addReply(c,shared.nullbulk);
3013 } else {
3014 if (o->type != REDIS_STRING) {
3015 addReply(c,shared.wrongtypeerr);
3016 } else {
3017 addReplyBulkLen(c,o);
3018 addReply(c,o);
3019 addReply(c,shared.crlf);
3020 }
3021 }
3022 }
3023
3024 static void getsetCommand(redisClient *c) {
3025 getCommand(c);
3026 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3027 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3028 } else {
3029 incrRefCount(c->argv[1]);
3030 }
3031 incrRefCount(c->argv[2]);
3032 server.dirty++;
3033 removeExpire(c->db,c->argv[1]);
3034 }
3035
3036 static void mgetCommand(redisClient *c) {
3037 int j;
3038
3039 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3040 for (j = 1; j < c->argc; j++) {
3041 robj *o = lookupKeyRead(c->db,c->argv[j]);
3042 if (o == NULL) {
3043 addReply(c,shared.nullbulk);
3044 } else {
3045 if (o->type != REDIS_STRING) {
3046 addReply(c,shared.nullbulk);
3047 } else {
3048 addReplyBulkLen(c,o);
3049 addReply(c,o);
3050 addReply(c,shared.crlf);
3051 }
3052 }
3053 }
3054 }
3055
3056 static void msetGenericCommand(redisClient *c, int nx) {
3057 int j;
3058
3059 if ((c->argc % 2) == 0) {
3060 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
3061 return;
3062 }
3063 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3064 * set nothing at all if at least one already key exists. */
3065 if (nx) {
3066 for (j = 1; j < c->argc; j += 2) {
3067 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
3068 addReply(c, shared.czero);
3069 return;
3070 }
3071 }
3072 }
3073
3074 for (j = 1; j < c->argc; j += 2) {
3075 int retval;
3076
3077 tryObjectEncoding(c->argv[j+1]);
3078 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3079 if (retval == DICT_ERR) {
3080 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3081 incrRefCount(c->argv[j+1]);
3082 } else {
3083 incrRefCount(c->argv[j]);
3084 incrRefCount(c->argv[j+1]);
3085 }
3086 removeExpire(c->db,c->argv[j]);
3087 }
3088 server.dirty += (c->argc-1)/2;
3089 addReply(c, nx ? shared.cone : shared.ok);
3090 }
3091
3092 static void msetCommand(redisClient *c) {
3093 msetGenericCommand(c,0);
3094 }
3095
3096 static void msetnxCommand(redisClient *c) {
3097 msetGenericCommand(c,1);
3098 }
3099
3100 static void incrDecrCommand(redisClient *c, long long incr) {
3101 long long value;
3102 int retval;
3103 robj *o;
3104
3105 o = lookupKeyWrite(c->db,c->argv[1]);
3106 if (o == NULL) {
3107 value = 0;
3108 } else {
3109 if (o->type != REDIS_STRING) {
3110 value = 0;
3111 } else {
3112 char *eptr;
3113
3114 if (o->encoding == REDIS_ENCODING_RAW)
3115 value = strtoll(o->ptr, &eptr, 10);
3116 else if (o->encoding == REDIS_ENCODING_INT)
3117 value = (long)o->ptr;
3118 else
3119 redisAssert(1 != 1);
3120 }
3121 }
3122
3123 value += incr;
3124 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3125 tryObjectEncoding(o);
3126 retval = dictAdd(c->db->dict,c->argv[1],o);
3127 if (retval == DICT_ERR) {
3128 dictReplace(c->db->dict,c->argv[1],o);
3129 removeExpire(c->db,c->argv[1]);
3130 } else {
3131 incrRefCount(c->argv[1]);
3132 }
3133 server.dirty++;
3134 addReply(c,shared.colon);
3135 addReply(c,o);
3136 addReply(c,shared.crlf);
3137 }
3138
3139 static void incrCommand(redisClient *c) {
3140 incrDecrCommand(c,1);
3141 }
3142
3143 static void decrCommand(redisClient *c) {
3144 incrDecrCommand(c,-1);
3145 }
3146
3147 static void incrbyCommand(redisClient *c) {
3148 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3149 incrDecrCommand(c,incr);
3150 }
3151
3152 static void decrbyCommand(redisClient *c) {
3153 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3154 incrDecrCommand(c,-incr);
3155 }
3156
3157 /* ========================= Type agnostic commands ========================= */
3158
3159 static void delCommand(redisClient *c) {
3160 int deleted = 0, j;
3161
3162 for (j = 1; j < c->argc; j++) {
3163 if (deleteKey(c->db,c->argv[j])) {
3164 server.dirty++;
3165 deleted++;
3166 }
3167 }
3168 switch(deleted) {
3169 case 0:
3170 addReply(c,shared.czero);
3171 break;
3172 case 1:
3173 addReply(c,shared.cone);
3174 break;
3175 default:
3176 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3177 break;
3178 }
3179 }
3180
3181 static void existsCommand(redisClient *c) {
3182 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3183 }
3184
3185 static void selectCommand(redisClient *c) {
3186 int id = atoi(c->argv[1]->ptr);
3187
3188 if (selectDb(c,id) == REDIS_ERR) {
3189 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3190 } else {
3191 addReply(c,shared.ok);
3192 }
3193 }
3194
3195 static void randomkeyCommand(redisClient *c) {
3196 dictEntry *de;
3197
3198 while(1) {
3199 de = dictGetRandomKey(c->db->dict);
3200 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3201 }
3202 if (de == NULL) {
3203 addReply(c,shared.plus);
3204 addReply(c,shared.crlf);
3205 } else {
3206 addReply(c,shared.plus);
3207 addReply(c,dictGetEntryKey(de));
3208 addReply(c,shared.crlf);
3209 }
3210 }
3211
3212 static void keysCommand(redisClient *c) {
3213 dictIterator *di;
3214 dictEntry *de;
3215 sds pattern = c->argv[1]->ptr;
3216 int plen = sdslen(pattern);
3217 unsigned long numkeys = 0, keyslen = 0;
3218 robj *lenobj = createObject(REDIS_STRING,NULL);
3219
3220 di = dictGetIterator(c->db->dict);
3221 addReply(c,lenobj);
3222 decrRefCount(lenobj);
3223 while((de = dictNext(di)) != NULL) {
3224 robj *keyobj = dictGetEntryKey(de);
3225
3226 sds key = keyobj->ptr;
3227 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3228 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3229 if (expireIfNeeded(c->db,keyobj) == 0) {
3230 if (numkeys != 0)
3231 addReply(c,shared.space);
3232 addReply(c,keyobj);
3233 numkeys++;
3234 keyslen += sdslen(key);
3235 }
3236 }
3237 }
3238 dictReleaseIterator(di);
3239 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3240 addReply(c,shared.crlf);
3241 }
3242
3243 static void dbsizeCommand(redisClient *c) {
3244 addReplySds(c,
3245 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3246 }
3247
3248 static void lastsaveCommand(redisClient *c) {
3249 addReplySds(c,
3250 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3251 }
3252
3253 static void typeCommand(redisClient *c) {
3254 robj *o;
3255 char *type;
3256
3257 o = lookupKeyRead(c->db,c->argv[1]);
3258 if (o == NULL) {
3259 type = "+none";
3260 } else {
3261 switch(o->type) {
3262 case REDIS_STRING: type = "+string"; break;
3263 case REDIS_LIST: type = "+list"; break;
3264 case REDIS_SET: type = "+set"; break;
3265 case REDIS_ZSET: type = "+zset"; break;
3266 default: type = "unknown"; break;
3267 }
3268 }
3269 addReplySds(c,sdsnew(type));
3270 addReply(c,shared.crlf);
3271 }
3272
3273 static void saveCommand(redisClient *c) {
3274 if (server.bgsavechildpid != -1) {
3275 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3276 return;
3277 }
3278 if (rdbSave(server.dbfilename) == REDIS_OK) {
3279 addReply(c,shared.ok);
3280 } else {
3281 addReply(c,shared.err);
3282 }
3283 }
3284
3285 static void bgsaveCommand(redisClient *c) {
3286 if (server.bgsavechildpid != -1) {
3287 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3288 return;
3289 }
3290 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3291 addReply(c,shared.ok);
3292 } else {
3293 addReply(c,shared.err);
3294 }
3295 }
3296
3297 static void shutdownCommand(redisClient *c) {
3298 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3299 /* Kill the saving child if there is a background saving in progress.
3300 We want to avoid race conditions, for instance our saving child may
3301 overwrite the synchronous saving did by SHUTDOWN. */
3302 if (server.bgsavechildpid != -1) {
3303 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3304 kill(server.bgsavechildpid,SIGKILL);
3305 rdbRemoveTempFile(server.bgsavechildpid);
3306 }
3307 /* SYNC SAVE */
3308 if (rdbSave(server.dbfilename) == REDIS_OK) {
3309 if (server.daemonize)
3310 unlink(server.pidfile);
3311 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3312 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3313 exit(1);
3314 } else {
3315 /* Ooops.. error saving! The best we can do is to continue operating.
3316 * Note that if there was a background saving process, in the next
3317 * cron() Redis will be notified that the background saving aborted,
3318 * handling special stuff like slaves pending for synchronization... */
3319 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3320 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3321 }
3322 }
3323
3324 static void renameGenericCommand(redisClient *c, int nx) {
3325 robj *o;
3326
3327 /* To use the same key as src and dst is probably an error */
3328 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3329 addReply(c,shared.sameobjecterr);
3330 return;
3331 }
3332
3333 o = lookupKeyWrite(c->db,c->argv[1]);
3334 if (o == NULL) {
3335 addReply(c,shared.nokeyerr);
3336 return;
3337 }
3338 incrRefCount(o);
3339 deleteIfVolatile(c->db,c->argv[2]);
3340 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3341 if (nx) {
3342 decrRefCount(o);
3343 addReply(c,shared.czero);
3344 return;
3345 }
3346 dictReplace(c->db->dict,c->argv[2],o);
3347 } else {
3348 incrRefCount(c->argv[2]);
3349 }
3350 deleteKey(c->db,c->argv[1]);
3351 server.dirty++;
3352 addReply(c,nx ? shared.cone : shared.ok);
3353 }
3354
3355 static void renameCommand(redisClient *c) {
3356 renameGenericCommand(c,0);
3357 }
3358
3359 static void renamenxCommand(redisClient *c) {
3360 renameGenericCommand(c,1);
3361 }
3362
3363 static void moveCommand(redisClient *c) {
3364 robj *o;
3365 redisDb *src, *dst;
3366 int srcid;
3367
3368 /* Obtain source and target DB pointers */
3369 src = c->db;
3370 srcid = c->db->id;
3371 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3372 addReply(c,shared.outofrangeerr);
3373 return;
3374 }
3375 dst = c->db;
3376 selectDb(c,srcid); /* Back to the source DB */
3377
3378 /* If the user is moving using as target the same
3379 * DB as the source DB it is probably an error. */
3380 if (src == dst) {
3381 addReply(c,shared.sameobjecterr);
3382 return;
3383 }
3384
3385 /* Check if the element exists and get a reference */
3386 o = lookupKeyWrite(c->db,c->argv[1]);
3387 if (!o) {
3388 addReply(c,shared.czero);
3389 return;
3390 }
3391
3392 /* Try to add the element to the target DB */
3393 deleteIfVolatile(dst,c->argv[1]);
3394 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3395 addReply(c,shared.czero);
3396 return;
3397 }
3398 incrRefCount(c->argv[1]);
3399 incrRefCount(o);
3400
3401 /* OK! key moved, free the entry in the source DB */
3402 deleteKey(src,c->argv[1]);
3403 server.dirty++;
3404 addReply(c,shared.cone);
3405 }
3406
3407 /* =================================== Lists ================================ */
3408 static void pushGenericCommand(redisClient *c, int where) {
3409 robj *lobj;
3410 list *list;
3411
3412 lobj = lookupKeyWrite(c->db,c->argv[1]);
3413 if (lobj == NULL) {
3414 lobj = createListObject();
3415 list = lobj->ptr;
3416 if (where == REDIS_HEAD) {
3417 listAddNodeHead(list,c->argv[2]);
3418 } else {
3419 listAddNodeTail(list,c->argv[2]);
3420 }
3421 dictAdd(c->db->dict,c->argv[1],lobj);
3422 incrRefCount(c->argv[1]);
3423 incrRefCount(c->argv[2]);
3424 } else {
3425 if (lobj->type != REDIS_LIST) {
3426 addReply(c,shared.wrongtypeerr);
3427 return;
3428 }
3429 list = lobj->ptr;
3430 if (where == REDIS_HEAD) {
3431 listAddNodeHead(list,c->argv[2]);
3432 } else {
3433 listAddNodeTail(list,c->argv[2]);
3434 }
3435 incrRefCount(c->argv[2]);
3436 }
3437 server.dirty++;
3438 addReply(c,shared.ok);
3439 }
3440
3441 static void lpushCommand(redisClient *c) {
3442 pushGenericCommand(c,REDIS_HEAD);
3443 }
3444
3445 static void rpushCommand(redisClient *c) {
3446 pushGenericCommand(c,REDIS_TAIL);
3447 }
3448
3449 static void llenCommand(redisClient *c) {
3450 robj *o;
3451 list *l;
3452
3453 o = lookupKeyRead(c->db,c->argv[1]);
3454 if (o == NULL) {
3455 addReply(c,shared.czero);
3456 return;
3457 } else {
3458 if (o->type != REDIS_LIST) {
3459 addReply(c,shared.wrongtypeerr);
3460 } else {
3461 l = o->ptr;
3462 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3463 }
3464 }
3465 }
3466
3467 static void lindexCommand(redisClient *c) {
3468 robj *o;
3469 int index = atoi(c->argv[2]->ptr);
3470
3471 o = lookupKeyRead(c->db,c->argv[1]);
3472 if (o == NULL) {
3473 addReply(c,shared.nullbulk);
3474 } else {
3475 if (o->type != REDIS_LIST) {
3476 addReply(c,shared.wrongtypeerr);
3477 } else {
3478 list *list = o->ptr;
3479 listNode *ln;
3480
3481 ln = listIndex(list, index);
3482 if (ln == NULL) {
3483 addReply(c,shared.nullbulk);
3484 } else {
3485 robj *ele = listNodeValue(ln);
3486 addReplyBulkLen(c,ele);
3487 addReply(c,ele);
3488 addReply(c,shared.crlf);
3489 }
3490 }
3491 }
3492 }
3493
3494 static void lsetCommand(redisClient *c) {
3495 robj *o;
3496 int index = atoi(c->argv[2]->ptr);
3497
3498 o = lookupKeyWrite(c->db,c->argv[1]);
3499 if (o == NULL) {
3500 addReply(c,shared.nokeyerr);
3501 } else {
3502 if (o->type != REDIS_LIST) {
3503 addReply(c,shared.wrongtypeerr);
3504 } else {
3505 list *list = o->ptr;
3506 listNode *ln;
3507
3508 ln = listIndex(list, index);
3509 if (ln == NULL) {
3510 addReply(c,shared.outofrangeerr);
3511 } else {
3512 robj *ele = listNodeValue(ln);
3513
3514 decrRefCount(ele);
3515 listNodeValue(ln) = c->argv[3];
3516 incrRefCount(c->argv[3]);
3517 addReply(c,shared.ok);
3518 server.dirty++;
3519 }
3520 }
3521 }
3522 }
3523
3524 static void popGenericCommand(redisClient *c, int where) {
3525 robj *o;
3526
3527 o = lookupKeyWrite(c->db,c->argv[1]);
3528 if (o == NULL) {
3529 addReply(c,shared.nullbulk);
3530 } else {
3531 if (o->type != REDIS_LIST) {
3532 addReply(c,shared.wrongtypeerr);
3533 } else {
3534 list *list = o->ptr;
3535 listNode *ln;
3536
3537 if (where == REDIS_HEAD)
3538 ln = listFirst(list);
3539 else
3540 ln = listLast(list);
3541
3542 if (ln == NULL) {
3543 addReply(c,shared.nullbulk);
3544 } else {
3545 robj *ele = listNodeValue(ln);
3546 addReplyBulkLen(c,ele);
3547 addReply(c,ele);
3548 addReply(c,shared.crlf);
3549 listDelNode(list,ln);
3550 server.dirty++;
3551 }
3552 }
3553 }
3554 }
3555
3556 static void lpopCommand(redisClient *c) {
3557 popGenericCommand(c,REDIS_HEAD);
3558 }
3559
3560 static void rpopCommand(redisClient *c) {
3561 popGenericCommand(c,REDIS_TAIL);
3562 }
3563
3564 static void lrangeCommand(redisClient *c) {
3565 robj *o;
3566 int start = atoi(c->argv[2]->ptr);
3567 int end = atoi(c->argv[3]->ptr);
3568
3569 o = lookupKeyRead(c->db,c->argv[1]);
3570 if (o == NULL) {
3571 addReply(c,shared.nullmultibulk);
3572 } else {
3573 if (o->type != REDIS_LIST) {
3574 addReply(c,shared.wrongtypeerr);
3575 } else {
3576 list *list = o->ptr;
3577 listNode *ln;
3578 int llen = listLength(list);
3579 int rangelen, j;
3580 robj *ele;
3581
3582 /* convert negative indexes */
3583 if (start < 0) start = llen+start;
3584 if (end < 0) end = llen+end;
3585 if (start < 0) start = 0;
3586 if (end < 0) end = 0;
3587
3588 /* indexes sanity checks */
3589 if (start > end || start >= llen) {
3590 /* Out of range start or start > end result in empty list */
3591 addReply(c,shared.emptymultibulk);
3592 return;
3593 }
3594 if (end >= llen) end = llen-1;
3595 rangelen = (end-start)+1;
3596
3597 /* Return the result in form of a multi-bulk reply */
3598 ln = listIndex(list, start);
3599 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3600 for (j = 0; j < rangelen; j++) {
3601 ele = listNodeValue(ln);
3602 addReplyBulkLen(c,ele);
3603 addReply(c,ele);
3604 addReply(c,shared.crlf);
3605 ln = ln->next;
3606 }
3607 }
3608 }
3609 }
3610
3611 static void ltrimCommand(redisClient *c) {
3612 robj *o;
3613 int start = atoi(c->argv[2]->ptr);
3614 int end = atoi(c->argv[3]->ptr);
3615
3616 o = lookupKeyWrite(c->db,c->argv[1]);
3617 if (o == NULL) {
3618 addReply(c,shared.nokeyerr);
3619 } else {
3620 if (o->type != REDIS_LIST) {
3621 addReply(c,shared.wrongtypeerr);
3622 } else {
3623 list *list = o->ptr;
3624 listNode *ln;
3625 int llen = listLength(list);
3626 int j, ltrim, rtrim;
3627
3628 /* convert negative indexes */
3629 if (start < 0) start = llen+start;
3630 if (end < 0) end = llen+end;
3631 if (start < 0) start = 0;
3632 if (end < 0) end = 0;
3633
3634 /* indexes sanity checks */
3635 if (start > end || start >= llen) {
3636 /* Out of range start or start > end result in empty list */
3637 ltrim = llen;
3638 rtrim = 0;
3639 } else {
3640 if (end >= llen) end = llen-1;
3641 ltrim = start;
3642 rtrim = llen-end-1;
3643 }
3644
3645 /* Remove list elements to perform the trim */
3646 for (j = 0; j < ltrim; j++) {
3647 ln = listFirst(list);
3648 listDelNode(list,ln);
3649 }
3650 for (j = 0; j < rtrim; j++) {
3651 ln = listLast(list);
3652 listDelNode(list,ln);
3653 }
3654 server.dirty++;
3655 addReply(c,shared.ok);
3656 }
3657 }
3658 }
3659
3660 static void lremCommand(redisClient *c) {
3661 robj *o;
3662
3663 o = lookupKeyWrite(c->db,c->argv[1]);
3664 if (o == NULL) {
3665 addReply(c,shared.czero);
3666 } else {
3667 if (o->type != REDIS_LIST) {
3668 addReply(c,shared.wrongtypeerr);
3669 } else {
3670 list *list = o->ptr;
3671 listNode *ln, *next;
3672 int toremove = atoi(c->argv[2]->ptr);
3673 int removed = 0;
3674 int fromtail = 0;
3675
3676 if (toremove < 0) {
3677 toremove = -toremove;
3678 fromtail = 1;
3679 }
3680 ln = fromtail ? list->tail : list->head;
3681 while (ln) {
3682 robj *ele = listNodeValue(ln);
3683
3684 next = fromtail ? ln->prev : ln->next;
3685 if (compareStringObjects(ele,c->argv[3]) == 0) {
3686 listDelNode(list,ln);
3687 server.dirty++;
3688 removed++;
3689 if (toremove && removed == toremove) break;
3690 }
3691 ln = next;
3692 }
3693 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3694 }
3695 }
3696 }
3697
3698 /* This is the semantic of this command:
3699 * RPOPLPUSH srclist dstlist:
3700 * IF LLEN(srclist) > 0
3701 * element = RPOP srclist
3702 * LPUSH dstlist element
3703 * RETURN element
3704 * ELSE
3705 * RETURN nil
3706 * END
3707 * END
3708 *
3709 * The idea is to be able to get an element from a list in a reliable way
3710 * since the element is not just returned but pushed against another list
3711 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3712 */
3713 static void rpoplpushcommand(redisClient *c) {
3714 robj *sobj;
3715
3716 sobj = lookupKeyWrite(c->db,c->argv[1]);
3717 if (sobj == NULL) {
3718 addReply(c,shared.nullbulk);
3719 } else {
3720 if (sobj->type != REDIS_LIST) {
3721 addReply(c,shared.wrongtypeerr);
3722 } else {
3723 list *srclist = sobj->ptr;
3724 listNode *ln = listLast(srclist);
3725
3726 if (ln == NULL) {
3727 addReply(c,shared.nullbulk);
3728 } else {
3729 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3730 robj *ele = listNodeValue(ln);
3731 list *dstlist;
3732
3733 if (dobj == NULL) {
3734
3735 /* Create the list if the key does not exist */
3736 dobj = createListObject();
3737 dictAdd(c->db->dict,c->argv[2],dobj);
3738 incrRefCount(c->argv[2]);
3739 } else if (dobj->type != REDIS_LIST) {
3740 addReply(c,shared.wrongtypeerr);
3741 return;
3742 }
3743 /* Add the element to the target list */
3744 dstlist = dobj->ptr;
3745 listAddNodeHead(dstlist,ele);
3746 incrRefCount(ele);
3747
3748 /* Send the element to the client as reply as well */
3749 addReplyBulkLen(c,ele);
3750 addReply(c,ele);
3751 addReply(c,shared.crlf);
3752
3753 /* Finally remove the element from the source list */
3754 listDelNode(srclist,ln);
3755 server.dirty++;
3756 }
3757 }
3758 }
3759 }
3760
3761
3762 /* ==================================== Sets ================================ */
3763
3764 static void saddCommand(redisClient *c) {
3765 robj *set;
3766
3767 set = lookupKeyWrite(c->db,c->argv[1]);
3768 if (set == NULL) {
3769 set = createSetObject();
3770 dictAdd(c->db->dict,c->argv[1],set);
3771 incrRefCount(c->argv[1]);
3772 } else {
3773 if (set->type != REDIS_SET) {
3774 addReply(c,shared.wrongtypeerr);
3775 return;
3776 }
3777 }
3778 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3779 incrRefCount(c->argv[2]);
3780 server.dirty++;
3781 addReply(c,shared.cone);
3782 } else {
3783 addReply(c,shared.czero);
3784 }
3785 }
3786
3787 static void sremCommand(redisClient *c) {
3788 robj *set;
3789
3790 set = lookupKeyWrite(c->db,c->argv[1]);
3791 if (set == NULL) {
3792 addReply(c,shared.czero);
3793 } else {
3794 if (set->type != REDIS_SET) {
3795 addReply(c,shared.wrongtypeerr);
3796 return;
3797 }
3798 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3799 server.dirty++;
3800 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3801 addReply(c,shared.cone);
3802 } else {
3803 addReply(c,shared.czero);
3804 }
3805 }
3806 }
3807
3808 static void smoveCommand(redisClient *c) {
3809 robj *srcset, *dstset;
3810
3811 srcset = lookupKeyWrite(c->db,c->argv[1]);
3812 dstset = lookupKeyWrite(c->db,c->argv[2]);
3813
3814 /* If the source key does not exist return 0, if it's of the wrong type
3815 * raise an error */
3816 if (srcset == NULL || srcset->type != REDIS_SET) {
3817 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3818 return;
3819 }
3820 /* Error if the destination key is not a set as well */
3821 if (dstset && dstset->type != REDIS_SET) {
3822 addReply(c,shared.wrongtypeerr);
3823 return;
3824 }
3825 /* Remove the element from the source set */
3826 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3827 /* Key not found in the src set! return zero */
3828 addReply(c,shared.czero);
3829 return;
3830 }
3831 server.dirty++;
3832 /* Add the element to the destination set */
3833 if (!dstset) {
3834 dstset = createSetObject();
3835 dictAdd(c->db->dict,c->argv[2],dstset);
3836 incrRefCount(c->argv[2]);
3837 }
3838 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3839 incrRefCount(c->argv[3]);
3840 addReply(c,shared.cone);
3841 }
3842
3843 static void sismemberCommand(redisClient *c) {
3844 robj *set;
3845
3846 set = lookupKeyRead(c->db,c->argv[1]);
3847 if (set == NULL) {
3848 addReply(c,shared.czero);
3849 } else {
3850 if (set->type != REDIS_SET) {
3851 addReply(c,shared.wrongtypeerr);
3852 return;
3853 }
3854 if (dictFind(set->ptr,c->argv[2]))
3855 addReply(c,shared.cone);
3856 else
3857 addReply(c,shared.czero);
3858 }
3859 }
3860
3861 static void scardCommand(redisClient *c) {
3862 robj *o;
3863 dict *s;
3864
3865 o = lookupKeyRead(c->db,c->argv[1]);
3866 if (o == NULL) {
3867 addReply(c,shared.czero);
3868 return;
3869 } else {
3870 if (o->type != REDIS_SET) {
3871 addReply(c,shared.wrongtypeerr);
3872 } else {
3873 s = o->ptr;
3874 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3875 dictSize(s)));
3876 }
3877 }
3878 }
3879
3880 static void spopCommand(redisClient *c) {
3881 robj *set;
3882 dictEntry *de;
3883
3884 set = lookupKeyWrite(c->db,c->argv[1]);
3885 if (set == NULL) {
3886 addReply(c,shared.nullbulk);
3887 } else {
3888 if (set->type != REDIS_SET) {
3889 addReply(c,shared.wrongtypeerr);
3890 return;
3891 }
3892 de = dictGetRandomKey(set->ptr);
3893 if (de == NULL) {
3894 addReply(c,shared.nullbulk);
3895 } else {
3896 robj *ele = dictGetEntryKey(de);
3897
3898 addReplyBulkLen(c,ele);
3899 addReply(c,ele);
3900 addReply(c,shared.crlf);
3901 dictDelete(set->ptr,ele);
3902 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3903 server.dirty++;
3904 }
3905 }
3906 }
3907
3908 static void srandmemberCommand(redisClient *c) {
3909 robj *set;
3910 dictEntry *de;
3911
3912 set = lookupKeyRead(c->db,c->argv[1]);
3913 if (set == NULL) {
3914 addReply(c,shared.nullbulk);
3915 } else {
3916 if (set->type != REDIS_SET) {
3917 addReply(c,shared.wrongtypeerr);
3918 return;
3919 }
3920 de = dictGetRandomKey(set->ptr);
3921 if (de == NULL) {
3922 addReply(c,shared.nullbulk);
3923 } else {
3924 robj *ele = dictGetEntryKey(de);
3925
3926 addReplyBulkLen(c,ele);
3927 addReply(c,ele);
3928 addReply(c,shared.crlf);
3929 }
3930 }
3931 }
3932
3933 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3934 dict **d1 = (void*) s1, **d2 = (void*) s2;
3935
3936 return dictSize(*d1)-dictSize(*d2);
3937 }
3938
3939 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
3940 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3941 dictIterator *di;
3942 dictEntry *de;
3943 robj *lenobj = NULL, *dstset = NULL;
3944 unsigned long j, cardinality = 0;
3945
3946 for (j = 0; j < setsnum; j++) {
3947 robj *setobj;
3948
3949 setobj = dstkey ?
3950 lookupKeyWrite(c->db,setskeys[j]) :
3951 lookupKeyRead(c->db,setskeys[j]);
3952 if (!setobj) {
3953 zfree(dv);
3954 if (dstkey) {
3955 deleteKey(c->db,dstkey);
3956 addReply(c,shared.ok);
3957 } else {
3958 addReply(c,shared.nullmultibulk);
3959 }
3960 return;
3961 }
3962 if (setobj->type != REDIS_SET) {
3963 zfree(dv);
3964 addReply(c,shared.wrongtypeerr);
3965 return;
3966 }
3967 dv[j] = setobj->ptr;
3968 }
3969 /* Sort sets from the smallest to largest, this will improve our
3970 * algorithm's performace */
3971 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3972
3973 /* The first thing we should output is the total number of elements...
3974 * since this is a multi-bulk write, but at this stage we don't know
3975 * the intersection set size, so we use a trick, append an empty object
3976 * to the output list and save the pointer to later modify it with the
3977 * right length */
3978 if (!dstkey) {
3979 lenobj = createObject(REDIS_STRING,NULL);
3980 addReply(c,lenobj);
3981 decrRefCount(lenobj);
3982 } else {
3983 /* If we have a target key where to store the resulting set
3984 * create this key with an empty set inside */
3985 dstset = createSetObject();
3986 }
3987
3988 /* Iterate all the elements of the first (smallest) set, and test
3989 * the element against all the other sets, if at least one set does
3990 * not include the element it is discarded */
3991 di = dictGetIterator(dv[0]);
3992
3993 while((de = dictNext(di)) != NULL) {
3994 robj *ele;
3995
3996 for (j = 1; j < setsnum; j++)
3997 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3998 if (j != setsnum)
3999 continue; /* at least one set does not contain the member */
4000 ele = dictGetEntryKey(de);
4001 if (!dstkey) {
4002 addReplyBulkLen(c,ele);
4003 addReply(c,ele);
4004 addReply(c,shared.crlf);
4005 cardinality++;
4006 } else {
4007 dictAdd(dstset->ptr,ele,NULL);
4008 incrRefCount(ele);
4009 }
4010 }
4011 dictReleaseIterator(di);
4012
4013 if (dstkey) {
4014 /* Store the resulting set into the target */
4015 deleteKey(c->db,dstkey);
4016 dictAdd(c->db->dict,dstkey,dstset);
4017 incrRefCount(dstkey);
4018 }
4019
4020 if (!dstkey) {
4021 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4022 } else {
4023 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4024 dictSize((dict*)dstset->ptr)));
4025 server.dirty++;
4026 }
4027 zfree(dv);
4028 }
4029
4030 static void sinterCommand(redisClient *c) {
4031 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4032 }
4033
4034 static void sinterstoreCommand(redisClient *c) {
4035 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4036 }
4037
/* Operation selector: the 'op' argument of sunionDiffGenericCommand(). */
#define REDIS_OP_UNION 0    /* result is the union of the sets */
#define REDIS_OP_DIFF 1     /* result is the first set minus the others */
4040
4041 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4042 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4043 dictIterator *di;
4044 dictEntry *de;
4045 robj *dstset = NULL;
4046 int j, cardinality = 0;
4047
4048 for (j = 0; j < setsnum; j++) {
4049 robj *setobj;
4050
4051 setobj = dstkey ?
4052 lookupKeyWrite(c->db,setskeys[j]) :
4053 lookupKeyRead(c->db,setskeys[j]);
4054 if (!setobj) {
4055 dv[j] = NULL;
4056 continue;
4057 }
4058 if (setobj->type != REDIS_SET) {
4059 zfree(dv);
4060 addReply(c,shared.wrongtypeerr);
4061 return;
4062 }
4063 dv[j] = setobj->ptr;
4064 }
4065
4066 /* We need a temp set object to store our union. If the dstkey
4067 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4068 * this set object will be the resulting object to set into the target key*/
4069 dstset = createSetObject();
4070
4071 /* Iterate all the elements of all the sets, add every element a single
4072 * time to the result set */
4073 for (j = 0; j < setsnum; j++) {
4074 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4075 if (!dv[j]) continue; /* non existing keys are like empty sets */
4076
4077 di = dictGetIterator(dv[j]);
4078
4079 while((de = dictNext(di)) != NULL) {
4080 robj *ele;
4081
4082 /* dictAdd will not add the same element multiple times */
4083 ele = dictGetEntryKey(de);
4084 if (op == REDIS_OP_UNION || j == 0) {
4085 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4086 incrRefCount(ele);
4087 cardinality++;
4088 }
4089 } else if (op == REDIS_OP_DIFF) {
4090 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4091 cardinality--;
4092 }
4093 }
4094 }
4095 dictReleaseIterator(di);
4096
4097 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4098 }
4099
4100 /* Output the content of the resulting set, if not in STORE mode */
4101 if (!dstkey) {
4102 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4103 di = dictGetIterator(dstset->ptr);
4104 while((de = dictNext(di)) != NULL) {
4105 robj *ele;
4106
4107 ele = dictGetEntryKey(de);
4108 addReplyBulkLen(c,ele);
4109 addReply(c,ele);
4110 addReply(c,shared.crlf);
4111 }
4112 dictReleaseIterator(di);
4113 } else {
4114 /* If we have a target key where to store the resulting set
4115 * create this key with the result set inside */
4116 deleteKey(c->db,dstkey);
4117 dictAdd(c->db->dict,dstkey,dstset);
4118 incrRefCount(dstkey);
4119 }
4120
4121 /* Cleanup */
4122 if (!dstkey) {
4123 decrRefCount(dstset);
4124 } else {
4125 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4126 dictSize((dict*)dstset->ptr)));
4127 server.dirty++;
4128 }
4129 zfree(dv);
4130 }
4131
4132 static void sunionCommand(redisClient *c) {
4133 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4134 }
4135
4136 static void sunionstoreCommand(redisClient *c) {
4137 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4138 }
4139
4140 static void sdiffCommand(redisClient *c) {
4141 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4142 }
4143
4144 static void sdiffstoreCommand(redisClient *c) {
4145 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4146 }
4147
4148 /* ==================================== ZSets =============================== */
4149
4150 /* ZSETs are ordered sets using two data structures to hold the same elements
4151 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4152 * data structure.
4153 *
4154 * The elements are added to an hash table mapping Redis objects to scores.
4155 * At the same time the elements are added to a skip list mapping scores
4156 * to Redis objects (so objects are sorted by scores in this "view"). */
4157
4158 /* This skiplist implementation is almost a C translation of the original
4159 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4160 * Alternative to Balanced Trees", modified in three ways:
4161 * a) this implementation allows for repeated values.
4162 * b) the comparison is not just by key (our 'score') but by satellite data.
4163 * c) there is a back pointer, so it's a doubly linked list with the back
4164 * pointers being only at "level 1". This allows to traverse the list
4165 * from tail to head, useful for ZREVRANGE. */
4166
4167 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4168 zskiplistNode *zn = zmalloc(sizeof(*zn));
4169
4170 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4171 zn->score = score;
4172 zn->obj = obj;
4173 return zn;
4174 }
4175
4176 static zskiplist *zslCreate(void) {
4177 int j;
4178 zskiplist *zsl;
4179
4180 zsl = zmalloc(sizeof(*zsl));
4181 zsl->level = 1;
4182 zsl->length = 0;
4183 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4184 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4185 zsl->header->forward[j] = NULL;
4186 zsl->header->backward = NULL;
4187 zsl->tail = NULL;
4188 return zsl;
4189 }
4190
4191 static void zslFreeNode(zskiplistNode *node) {
4192 decrRefCount(node->obj);
4193 zfree(node->forward);
4194 zfree(node);
4195 }
4196
4197 static void zslFree(zskiplist *zsl) {
4198 zskiplistNode *node = zsl->header->forward[0], *next;
4199
4200 zfree(zsl->header->forward);
4201 zfree(zsl->header);
4202 while(node) {
4203 next = node->forward[0];
4204 zslFreeNode(node);
4205 node = next;
4206 }
4207 zfree(zsl);
4208 }
4209
4210 static int zslRandomLevel(void) {
4211 int level = 1;
4212 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4213 level += 1;
4214 return level;
4215 }
4216
4217 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4218 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4219 int i, level;
4220
4221 x = zsl->header;
4222 for (i = zsl->level-1; i >= 0; i--) {
4223 while (x->forward[i] &&
4224 (x->forward[i]->score < score ||
4225 (x->forward[i]->score == score &&
4226 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4227 x = x->forward[i];
4228 update[i] = x;
4229 }
4230 /* we assume the key is not already inside, since we allow duplicated
4231 * scores, and the re-insertion of score and redis object should never
4232 * happpen since the caller of zslInsert() should test in the hash table
4233 * if the element is already inside or not. */
4234 level = zslRandomLevel();
4235 if (level > zsl->level) {
4236 for (i = zsl->level; i < level; i++)
4237 update[i] = zsl->header;
4238 zsl->level = level;
4239 }
4240 x = zslCreateNode(level,score,obj);
4241 for (i = 0; i < level; i++) {
4242 x->forward[i] = update[i]->forward[i];
4243 update[i]->forward[i] = x;
4244 }
4245 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4246 if (x->forward[0])
4247 x->forward[0]->backward = x;
4248 else
4249 zsl->tail = x;
4250 zsl->length++;
4251 }
4252
4253 /* Delete an element with matching score/object from the skiplist. */
4254 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4255 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4256 int i;
4257
4258 x = zsl->header;
4259 for (i = zsl->level-1; i >= 0; i--) {
4260 while (x->forward[i] &&
4261 (x->forward[i]->score < score ||
4262 (x->forward[i]->score == score &&
4263 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4264 x = x->forward[i];
4265 update[i] = x;
4266 }
4267 /* We may have multiple elements with the same score, what we need
4268 * is to find the element with both the right score and object. */
4269 x = x->forward[0];
4270 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4271 for (i = 0; i < zsl->level; i++) {
4272 if (update[i]->forward[i] != x) break;
4273 update[i]->forward[i] = x->forward[i];
4274 }
4275 if (x->forward[0]) {
4276 x->forward[0]->backward = (x->backward == zsl->header) ?
4277 NULL : x->backward;
4278 } else {
4279 zsl->tail = x->backward;
4280 }
4281 zslFreeNode(x);
4282 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4283 zsl->level--;
4284 zsl->length--;
4285 return 1;
4286 } else {
4287 return 0; /* not found */
4288 }
4289 return 0; /* not found */
4290 }
4291
4292 /* Delete all the elements with score between min and max from the skiplist.
4293 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4294 * Note that this function takes the reference to the hash table view of the
4295 * sorted set, in order to remove the elements from the hash table too. */
4296 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4297 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4298 unsigned long removed = 0;
4299 int i;
4300
4301 x = zsl->header;
4302 for (i = zsl->level-1; i >= 0; i--) {
4303 while (x->forward[i] && x->forward[i]->score < min)
4304 x = x->forward[i];
4305 update[i] = x;
4306 }
4307 /* We may have multiple elements with the same score, what we need
4308 * is to find the element with both the right score and object. */
4309 x = x->forward[0];
4310 while (x && x->score <= max) {
4311 zskiplistNode *next;
4312
4313 for (i = 0; i < zsl->level; i++) {
4314 if (update[i]->forward[i] != x) break;
4315 update[i]->forward[i] = x->forward[i];
4316 }
4317 if (x->forward[0]) {
4318 x->forward[0]->backward = (x->backward == zsl->header) ?
4319 NULL : x->backward;
4320 } else {
4321 zsl->tail = x->backward;
4322 }
4323 next = x->forward[0];
4324 dictDelete(dict,x->obj);
4325 zslFreeNode(x);
4326 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4327 zsl->level--;
4328 zsl->length--;
4329 removed++;
4330 x = next;
4331 }
4332 return removed; /* not found */
4333 }
4334
4335 /* Find the first node having a score equal or greater than the specified one.
4336 * Returns NULL if there is no match. */
4337 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4338 zskiplistNode *x;
4339 int i;
4340
4341 x = zsl->header;
4342 for (i = zsl->level-1; i >= 0; i--) {
4343 while (x->forward[i] && x->forward[i]->score < score)
4344 x = x->forward[i];
4345 }
4346 /* We may have multiple elements with the same score, what we need
4347 * is to find the element with both the right score and object. */
4348 return x->forward[0];
4349 }
4350
4351 /* The actual Z-commands implementations */
4352
4353 /* This generic command implements both ZADD and ZINCRBY.
4354 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4355 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4356 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4357 robj *zsetobj;
4358 zset *zs;
4359 double *score;
4360
4361 zsetobj = lookupKeyWrite(c->db,key);
4362 if (zsetobj == NULL) {
4363 zsetobj = createZsetObject();
4364 dictAdd(c->db->dict,key,zsetobj);
4365 incrRefCount(key);
4366 } else {
4367 if (zsetobj->type != REDIS_ZSET) {
4368 addReply(c,shared.wrongtypeerr);
4369 return;
4370 }
4371 }
4372 zs = zsetobj->ptr;
4373
4374 /* Ok now since we implement both ZADD and ZINCRBY here the code
4375 * needs to handle the two different conditions. It's all about setting
4376 * '*score', that is, the new score to set, to the right value. */
4377 score = zmalloc(sizeof(double));
4378 if (doincrement) {
4379 dictEntry *de;
4380
4381 /* Read the old score. If the element was not present starts from 0 */
4382 de = dictFind(zs->dict,ele);
4383 if (de) {
4384 double *oldscore = dictGetEntryVal(de);
4385 *score = *oldscore + scoreval;
4386 } else {
4387 *score = scoreval;
4388 }
4389 } else {
4390 *score = scoreval;
4391 }
4392
4393 /* What follows is a simple remove and re-insert operation that is common
4394 * to both ZADD and ZINCRBY... */
4395 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4396 /* case 1: New element */
4397 incrRefCount(ele); /* added to hash */
4398 zslInsert(zs->zsl,*score,ele);
4399 incrRefCount(ele); /* added to skiplist */
4400 server.dirty++;
4401 if (doincrement)
4402 addReplyDouble(c,*score);
4403 else
4404 addReply(c,shared.cone);
4405 } else {
4406 dictEntry *de;
4407 double *oldscore;
4408
4409 /* case 2: Score update operation */
4410 de = dictFind(zs->dict,ele);
4411 redisAssert(de != NULL);
4412 oldscore = dictGetEntryVal(de);
4413 if (*score != *oldscore) {
4414 int deleted;
4415
4416 /* Remove and insert the element in the skip list with new score */
4417 deleted = zslDelete(zs->zsl,*oldscore,ele);
4418 redisAssert(deleted != 0);
4419 zslInsert(zs->zsl,*score,ele);
4420 incrRefCount(ele);
4421 /* Update the score in the hash table */
4422 dictReplace(zs->dict,ele,score);
4423 server.dirty++;
4424 } else {
4425 zfree(score);
4426 }
4427 if (doincrement)
4428 addReplyDouble(c,*score);
4429 else
4430 addReply(c,shared.czero);
4431 }
4432 }
4433
4434 static void zaddCommand(redisClient *c) {
4435 double scoreval;
4436
4437 scoreval = strtod(c->argv[2]->ptr,NULL);
4438 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4439 }
4440
4441 static void zincrbyCommand(redisClient *c) {
4442 double scoreval;
4443
4444 scoreval = strtod(c->argv[2]->ptr,NULL);
4445 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4446 }
4447
4448 static void zremCommand(redisClient *c) {
4449 robj *zsetobj;
4450 zset *zs;
4451
4452 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4453 if (zsetobj == NULL) {
4454 addReply(c,shared.czero);
4455 } else {
4456 dictEntry *de;
4457 double *oldscore;
4458 int deleted;
4459
4460 if (zsetobj->type != REDIS_ZSET) {
4461 addReply(c,shared.wrongtypeerr);
4462 return;
4463 }
4464 zs = zsetobj->ptr;
4465 de = dictFind(zs->dict,c->argv[2]);
4466 if (de == NULL) {
4467 addReply(c,shared.czero);
4468 return;
4469 }
4470 /* Delete from the skiplist */
4471 oldscore = dictGetEntryVal(de);
4472 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4473 redisAssert(deleted != 0);
4474
4475 /* Delete from the hash table */
4476 dictDelete(zs->dict,c->argv[2]);
4477 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4478 server.dirty++;
4479 addReply(c,shared.cone);
4480 }
4481 }
4482
4483 static void zremrangebyscoreCommand(redisClient *c) {
4484 double min = strtod(c->argv[2]->ptr,NULL);
4485 double max = strtod(c->argv[3]->ptr,NULL);
4486 robj *zsetobj;
4487 zset *zs;
4488
4489 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4490 if (zsetobj == NULL) {
4491 addReply(c,shared.czero);
4492 } else {
4493 long deleted;
4494
4495 if (zsetobj->type != REDIS_ZSET) {
4496 addReply(c,shared.wrongtypeerr);
4497 return;
4498 }
4499 zs = zsetobj->ptr;
4500 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4501 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4502 server.dirty += deleted;
4503 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4504 }
4505 }
4506
4507 static void zrangeGenericCommand(redisClient *c, int reverse) {
4508 robj *o;
4509 int start = atoi(c->argv[2]->ptr);
4510 int end = atoi(c->argv[3]->ptr);
4511
4512 o = lookupKeyRead(c->db,c->argv[1]);
4513 if (o == NULL) {
4514 addReply(c,shared.nullmultibulk);
4515 } else {
4516 if (o->type != REDIS_ZSET) {
4517 addReply(c,shared.wrongtypeerr);
4518 } else {
4519 zset *zsetobj = o->ptr;
4520 zskiplist *zsl = zsetobj->zsl;
4521 zskiplistNode *ln;
4522
4523 int llen = zsl->length;
4524 int rangelen, j;
4525 robj *ele;
4526
4527 /* convert negative indexes */
4528 if (start < 0) start = llen+start;
4529 if (end < 0) end = llen+end;
4530 if (start < 0) start = 0;
4531 if (end < 0) end = 0;
4532
4533 /* indexes sanity checks */
4534 if (start > end || start >= llen) {
4535 /* Out of range start or start > end result in empty list */
4536 addReply(c,shared.emptymultibulk);
4537 return;
4538 }
4539 if (end >= llen) end = llen-1;
4540 rangelen = (end-start)+1;
4541
4542 /* Return the result in form of a multi-bulk reply */
4543 if (reverse) {
4544 ln = zsl->tail;
4545 while (start--)
4546 ln = ln->backward;
4547 } else {
4548 ln = zsl->header->forward[0];
4549 while (start--)
4550 ln = ln->forward[0];
4551 }
4552
4553 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4554 for (j = 0; j < rangelen; j++) {
4555 ele = ln->obj;
4556 addReplyBulkLen(c,ele);
4557 addReply(c,ele);
4558 addReply(c,shared.crlf);
4559 ln = reverse ? ln->backward : ln->forward[0];
4560 }
4561 }
4562 }
4563 }
4564
4565 static void zrangeCommand(redisClient *c) {
4566 zrangeGenericCommand(c,0);
4567 }
4568
4569 static void zrevrangeCommand(redisClient *c) {
4570 zrangeGenericCommand(c,1);
4571 }
4572
4573 static void zrangebyscoreCommand(redisClient *c) {
4574 robj *o;
4575 double min = strtod(c->argv[2]->ptr,NULL);
4576 double max = strtod(c->argv[3]->ptr,NULL);
4577 int offset = 0, limit = -1;
4578
4579 if (c->argc != 4 && c->argc != 7) {
4580 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4581 return;
4582 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4583 addReply(c,shared.syntaxerr);
4584 return;
4585 } else if (c->argc == 7) {
4586 offset = atoi(c->argv[5]->ptr);
4587 limit = atoi(c->argv[6]->ptr);
4588 if (offset < 0) offset = 0;
4589 }
4590
4591 o = lookupKeyRead(c->db,c->argv[1]);
4592 if (o == NULL) {
4593 addReply(c,shared.nullmultibulk);
4594 } else {
4595 if (o->type != REDIS_ZSET) {
4596 addReply(c,shared.wrongtypeerr);
4597 } else {
4598 zset *zsetobj = o->ptr;
4599 zskiplist *zsl = zsetobj->zsl;
4600 zskiplistNode *ln;
4601 robj *ele, *lenobj;
4602 unsigned int rangelen = 0;
4603
4604 /* Get the first node with the score >= min */
4605 ln = zslFirstWithScore(zsl,min);
4606 if (ln == NULL) {
4607 /* No element matching the speciifed interval */
4608 addReply(c,shared.emptymultibulk);
4609 return;
4610 }
4611
4612 /* We don't know in advance how many matching elements there
4613 * are in the list, so we push this object that will represent
4614 * the multi-bulk length in the output buffer, and will "fix"
4615 * it later */
4616 lenobj = createObject(REDIS_STRING,NULL);
4617 addReply(c,lenobj);
4618 decrRefCount(lenobj);
4619
4620 while(ln && ln->score <= max) {
4621 if (offset) {
4622 offset--;
4623 ln = ln->forward[0];
4624 continue;
4625 }
4626 if (limit == 0) break;
4627 ele = ln->obj;
4628 addReplyBulkLen(c,ele);
4629 addReply(c,ele);
4630 addReply(c,shared.crlf);
4631 ln = ln->forward[0];
4632 rangelen++;
4633 if (limit > 0) limit--;
4634 }
4635 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4636 }
4637 }
4638 }
4639
4640 static void zcardCommand(redisClient *c) {
4641 robj *o;
4642 zset *zs;
4643
4644 o = lookupKeyRead(c->db,c->argv[1]);
4645 if (o == NULL) {
4646 addReply(c,shared.czero);
4647 return;
4648 } else {
4649 if (o->type != REDIS_ZSET) {
4650 addReply(c,shared.wrongtypeerr);
4651 } else {
4652 zs = o->ptr;
4653 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4654 }
4655 }
4656 }
4657
4658 static void zscoreCommand(redisClient *c) {
4659 robj *o;
4660 zset *zs;
4661
4662 o = lookupKeyRead(c->db,c->argv[1]);
4663 if (o == NULL) {
4664 addReply(c,shared.nullbulk);
4665 return;
4666 } else {
4667 if (o->type != REDIS_ZSET) {
4668 addReply(c,shared.wrongtypeerr);
4669 } else {
4670 dictEntry *de;
4671
4672 zs = o->ptr;
4673 de = dictFind(zs->dict,c->argv[2]);
4674 if (!de) {
4675 addReply(c,shared.nullbulk);
4676 } else {
4677 double *score = dictGetEntryVal(de);
4678
4679 addReplyDouble(c,*score);
4680 }
4681 }
4682 }
4683 }
4684
4685 /* ========================= Non type-specific commands ==================== */
4686
4687 static void flushdbCommand(redisClient *c) {
4688 server.dirty += dictSize(c->db->dict);
4689 dictEmpty(c->db->dict);
4690 dictEmpty(c->db->expires);
4691 addReply(c,shared.ok);
4692 }
4693
4694 static void flushallCommand(redisClient *c) {
4695 server.dirty += emptyDb();
4696 addReply(c,shared.ok);
4697 rdbSave(server.dbfilename);
4698 server.dirty++;
4699 }
4700
4701 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4702 redisSortOperation *so = zmalloc(sizeof(*so));
4703 so->type = type;
4704 so->pattern = pattern;
4705 return so;
4706 }
4707
4708 /* Return the value associated to the key with a name obtained
4709 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4710 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4711 char *p;
4712 sds spat, ssub;
4713 robj keyobj;
4714 int prefixlen, sublen, postfixlen;
4715 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4716 struct {
4717 long len;
4718 long free;
4719 char buf[REDIS_SORTKEY_MAX+1];
4720 } keyname;
4721
4722 /* If the pattern is "#" return the substitution object itself in order
4723 * to implement the "SORT ... GET #" feature. */
4724 spat = pattern->ptr;
4725 if (spat[0] == '#' && spat[1] == '\0') {
4726 return subst;
4727 }
4728
4729 /* The substitution object may be specially encoded. If so we create
4730 * a decoded object on the fly. Otherwise getDecodedObject will just
4731 * increment the ref count, that we'll decrement later. */
4732 subst = getDecodedObject(subst);
4733
4734 ssub = subst->ptr;
4735 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4736 p = strchr(spat,'*');
4737 if (!p) {
4738 decrRefCount(subst);
4739 return NULL;
4740 }
4741
4742 prefixlen = p-spat;
4743 sublen = sdslen(ssub);
4744 postfixlen = sdslen(spat)-(prefixlen+1);
4745 memcpy(keyname.buf,spat,prefixlen);
4746 memcpy(keyname.buf+prefixlen,ssub,sublen);
4747 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4748 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4749 keyname.len = prefixlen+sublen+postfixlen;
4750
4751 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4752 decrRefCount(subst);
4753
4754 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4755 return lookupKeyRead(db,&keyobj);
4756 }
4757
4758 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4759 * the additional parameter is not standard but a BSD-specific we have to
4760 * pass sorting parameters via the global 'server' structure */
4761 static int sortCompare(const void *s1, const void *s2) {
4762 const redisSortObject *so1 = s1, *so2 = s2;
4763 int cmp;
4764
4765 if (!server.sort_alpha) {
4766 /* Numeric sorting. Here it's trivial as we precomputed scores */
4767 if (so1->u.score > so2->u.score) {
4768 cmp = 1;
4769 } else if (so1->u.score < so2->u.score) {
4770 cmp = -1;
4771 } else {
4772 cmp = 0;
4773 }
4774 } else {
4775 /* Alphanumeric sorting */
4776 if (server.sort_bypattern) {
4777 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4778 /* At least one compare object is NULL */
4779 if (so1->u.cmpobj == so2->u.cmpobj)
4780 cmp = 0;
4781 else if (so1->u.cmpobj == NULL)
4782 cmp = -1;
4783 else
4784 cmp = 1;
4785 } else {
4786 /* We have both the objects, use strcoll */
4787 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4788 }
4789 } else {
4790 /* Compare elements directly */
4791 robj *dec1, *dec2;
4792
4793 dec1 = getDecodedObject(so1->obj);
4794 dec2 = getDecodedObject(so2->obj);
4795 cmp = strcoll(dec1->ptr,dec2->ptr);
4796 decrRefCount(dec1);
4797 decrRefCount(dec2);
4798 }
4799 }
4800 return server.sort_desc ? -cmp : cmp;
4801 }
4802
4803 /* The SORT command is the most complex command in Redis. Warning: this code
4804 * is optimized for speed and a bit less for readability */
4805 static void sortCommand(redisClient *c) {
4806 list *operations;
4807 int outputlen = 0;
4808 int desc = 0, alpha = 0;
4809 int limit_start = 0, limit_count = -1, start, end;
4810 int j, dontsort = 0, vectorlen;
4811 int getop = 0; /* GET operation counter */
4812 robj *sortval, *sortby = NULL, *storekey = NULL;
4813 redisSortObject *vector; /* Resulting vector to sort */
4814
4815 /* Lookup the key to sort. It must be of the right types */
4816 sortval = lookupKeyRead(c->db,c->argv[1]);
4817 if (sortval == NULL) {
4818 addReply(c,shared.nokeyerr);
4819 return;
4820 }
4821 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4822 sortval->type != REDIS_ZSET)
4823 {
4824 addReply(c,shared.wrongtypeerr);
4825 return;
4826 }
4827
4828 /* Create a list of operations to perform for every sorted element.
4829 * Operations can be GET/DEL/INCR/DECR */
4830 operations = listCreate();
4831 listSetFreeMethod(operations,zfree);
4832 j = 2;
4833
4834 /* Now we need to protect sortval incrementing its count, in the future
4835 * SORT may have options able to overwrite/delete keys during the sorting
4836 * and the sorted key itself may get destroied */
4837 incrRefCount(sortval);
4838
4839 /* The SORT command has an SQL-alike syntax, parse it */
4840 while(j < c->argc) {
4841 int leftargs = c->argc-j-1;
4842 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4843 desc = 0;
4844 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4845 desc = 1;
4846 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4847 alpha = 1;
4848 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4849 limit_start = atoi(c->argv[j+1]->ptr);
4850 limit_count = atoi(c->argv[j+2]->ptr);
4851 j+=2;
4852 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4853 storekey = c->argv[j+1];
4854 j++;
4855 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4856 sortby = c->argv[j+1];
4857 /* If the BY pattern does not contain '*', i.e. it is constant,
4858 * we don't need to sort nor to lookup the weight keys. */
4859 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4860 j++;
4861 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4862 listAddNodeTail(operations,createSortOperation(
4863 REDIS_SORT_GET,c->argv[j+1]));
4864 getop++;
4865 j++;
4866 } else {
4867 decrRefCount(sortval);
4868 listRelease(operations);
4869 addReply(c,shared.syntaxerr);
4870 return;
4871 }
4872 j++;
4873 }
4874
4875 /* Load the sorting vector with all the objects to sort */
4876 switch(sortval->type) {
4877 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
4878 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
4879 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
4880 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
4881 }
4882 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4883 j = 0;
4884
4885 if (sortval->type == REDIS_LIST) {
4886 list *list = sortval->ptr;
4887 listNode *ln;
4888
4889 listRewind(list);
4890 while((ln = listYield(list))) {
4891 robj *ele = ln->value;
4892 vector[j].obj = ele;
4893 vector[j].u.score = 0;
4894 vector[j].u.cmpobj = NULL;
4895 j++;
4896 }
4897 } else {
4898 dict *set;
4899 dictIterator *di;
4900 dictEntry *setele;
4901
4902 if (sortval->type == REDIS_SET) {
4903 set = sortval->ptr;
4904 } else {
4905 zset *zs = sortval->ptr;
4906 set = zs->dict;
4907 }
4908
4909 di = dictGetIterator(set);
4910 while((setele = dictNext(di)) != NULL) {
4911 vector[j].obj = dictGetEntryKey(setele);
4912 vector[j].u.score = 0;
4913 vector[j].u.cmpobj = NULL;
4914 j++;
4915 }
4916 dictReleaseIterator(di);
4917 }
4918 redisAssert(j == vectorlen);
4919
4920 /* Now it's time to load the right scores in the sorting vector */
4921 if (dontsort == 0) {
4922 for (j = 0; j < vectorlen; j++) {
4923 if (sortby) {
4924 robj *byval;
4925
4926 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4927 if (!byval || byval->type != REDIS_STRING) continue;
4928 if (alpha) {
4929 vector[j].u.cmpobj = getDecodedObject(byval);
4930 } else {
4931 if (byval->encoding == REDIS_ENCODING_RAW) {
4932 vector[j].u.score = strtod(byval->ptr,NULL);
4933 } else {
4934 /* Don't need to decode the object if it's
4935 * integer-encoded (the only encoding supported) so
4936 * far. We can just cast it */
4937 if (byval->encoding == REDIS_ENCODING_INT) {
4938 vector[j].u.score = (long)byval->ptr;
4939 } else
4940 redisAssert(1 != 1);
4941 }
4942 }
4943 } else {
4944 if (!alpha) {
4945 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4946 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4947 else {
4948 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4949 vector[j].u.score = (long) vector[j].obj->ptr;
4950 else
4951 redisAssert(1 != 1);
4952 }
4953 }
4954 }
4955 }
4956 }
4957
4958 /* We are ready to sort the vector... perform a bit of sanity check
4959 * on the LIMIT option too. We'll use a partial version of quicksort. */
4960 start = (limit_start < 0) ? 0 : limit_start;
4961 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4962 if (start >= vectorlen) {
4963 start = vectorlen-1;
4964 end = vectorlen-2;
4965 }
4966 if (end >= vectorlen) end = vectorlen-1;
4967
4968 if (dontsort == 0) {
4969 server.sort_desc = desc;
4970 server.sort_alpha = alpha;
4971 server.sort_bypattern = sortby ? 1 : 0;
4972 if (sortby && (start != 0 || end != vectorlen-1))
4973 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4974 else
4975 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4976 }
4977
4978 /* Send command output to the output buffer, performing the specified
4979 * GET/DEL/INCR/DECR operations if any. */
4980 outputlen = getop ? getop*(end-start+1) : end-start+1;
4981 if (storekey == NULL) {
4982 /* STORE option not specified, sent the sorting result to client */
4983 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4984 for (j = start; j <= end; j++) {
4985 listNode *ln;
4986 if (!getop) {
4987 addReplyBulkLen(c,vector[j].obj);
4988 addReply(c,vector[j].obj);
4989 addReply(c,shared.crlf);
4990 }
4991 listRewind(operations);
4992 while((ln = listYield(operations))) {
4993 redisSortOperation *sop = ln->value;
4994 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4995 vector[j].obj);
4996
4997 if (sop->type == REDIS_SORT_GET) {
4998 if (!val || val->type != REDIS_STRING) {
4999 addReply(c,shared.nullbulk);
5000 } else {
5001 addReplyBulkLen(c,val);
5002 addReply(c,val);
5003 addReply(c,shared.crlf);
5004 }
5005 } else {
5006 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5007 }
5008 }
5009 }
5010 } else {
5011 robj *listObject = createListObject();
5012 list *listPtr = (list*) listObject->ptr;
5013
5014 /* STORE option specified, set the sorting result as a List object */
5015 for (j = start; j <= end; j++) {
5016 listNode *ln;
5017 if (!getop) {
5018 listAddNodeTail(listPtr,vector[j].obj);
5019 incrRefCount(vector[j].obj);
5020 }
5021 listRewind(operations);
5022 while((ln = listYield(operations))) {
5023 redisSortOperation *sop = ln->value;
5024 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5025 vector[j].obj);
5026
5027 if (sop->type == REDIS_SORT_GET) {
5028 if (!val || val->type != REDIS_STRING) {
5029 listAddNodeTail(listPtr,createStringObject("",0));
5030 } else {
5031 listAddNodeTail(listPtr,val);
5032 incrRefCount(val);
5033 }
5034 } else {
5035 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5036 }
5037 }
5038 }
5039 if (dictReplace(c->db->dict,storekey,listObject)) {
5040 incrRefCount(storekey);
5041 }
5042 /* Note: we add 1 because the DB is dirty anyway since even if the
5043 * SORT result is empty a new key is set and maybe the old content
5044 * replaced. */
5045 server.dirty += 1+outputlen;
5046 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5047 }
5048
5049 /* Cleanup */
5050 decrRefCount(sortval);
5051 listRelease(operations);
5052 for (j = 0; j < vectorlen; j++) {
5053 if (sortby && alpha && vector[j].u.cmpobj)
5054 decrRefCount(vector[j].u.cmpobj);
5055 }
5056 zfree(vector);
5057 }
5058
5059 /* Create the string returned by the INFO command. This is decoupled
5060 * by the INFO command itself as we need to report the same information
5061 * on memory corruption problems. */
5062 static sds genRedisInfoString(void) {
5063 sds info;
5064 time_t uptime = time(NULL)-server.stat_starttime;
5065 int j;
5066
5067 info = sdscatprintf(sdsempty(),
5068 "redis_version:%s\r\n"
5069 "arch_bits:%s\r\n"
5070 "multiplexing_api:%s\r\n"
5071 "uptime_in_seconds:%ld\r\n"
5072 "uptime_in_days:%ld\r\n"
5073 "connected_clients:%d\r\n"
5074 "connected_slaves:%d\r\n"
5075 "used_memory:%zu\r\n"
5076 "changes_since_last_save:%lld\r\n"
5077 "bgsave_in_progress:%d\r\n"
5078 "last_save_time:%ld\r\n"
5079 "total_connections_received:%lld\r\n"
5080 "total_commands_processed:%lld\r\n"
5081 "role:%s\r\n"
5082 ,REDIS_VERSION,
5083 (sizeof(long) == 8) ? "64" : "32",
5084 aeGetApiName(),
5085 uptime,
5086 uptime/(3600*24),
5087 listLength(server.clients)-listLength(server.slaves),
5088 listLength(server.slaves),
5089 server.usedmemory,
5090 server.dirty,
5091 server.bgsavechildpid != -1,
5092 server.lastsave,
5093 server.stat_numconnections,
5094 server.stat_numcommands,
5095 server.masterhost == NULL ? "master" : "slave"
5096 );
5097 if (server.masterhost) {
5098 info = sdscatprintf(info,
5099 "master_host:%s\r\n"
5100 "master_port:%d\r\n"
5101 "master_link_status:%s\r\n"
5102 "master_last_io_seconds_ago:%d\r\n"
5103 ,server.masterhost,
5104 server.masterport,
5105 (server.replstate == REDIS_REPL_CONNECTED) ?
5106 "up" : "down",
5107 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5108 );
5109 }
5110 for (j = 0; j < server.dbnum; j++) {
5111 long long keys, vkeys;
5112
5113 keys = dictSize(server.db[j].dict);
5114 vkeys = dictSize(server.db[j].expires);
5115 if (keys || vkeys) {
5116 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5117 j, keys, vkeys);
5118 }
5119 }
5120 return info;
5121 }
5122
5123 static void infoCommand(redisClient *c) {
5124 sds info = genRedisInfoString();
5125 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",sdslen(info)));
5126 addReplySds(c,info);
5127 addReply(c,shared.crlf);
5128 }
5129
5130 static void monitorCommand(redisClient *c) {
5131 /* ignore MONITOR if aleady slave or in monitor mode */
5132 if (c->flags & REDIS_SLAVE) return;
5133
5134 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5135 c->slaveseldb = 0;
5136 listAddNodeTail(server.monitors,c);
5137 addReply(c,shared.ok);
5138 }
5139
5140 /* ================================= Expire ================================= */
5141 static int removeExpire(redisDb *db, robj *key) {
5142 if (dictDelete(db->expires,key) == DICT_OK) {
5143 return 1;
5144 } else {
5145 return 0;
5146 }
5147 }
5148
5149 static int setExpire(redisDb *db, robj *key, time_t when) {
5150 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5151 return 0;
5152 } else {
5153 incrRefCount(key);
5154 return 1;
5155 }
5156 }
5157
5158 /* Return the expire time of the specified key, or -1 if no expire
5159 * is associated with this key (i.e. the key is non volatile) */
5160 static time_t getExpire(redisDb *db, robj *key) {
5161 dictEntry *de;
5162
5163 /* No expire? return ASAP */
5164 if (dictSize(db->expires) == 0 ||
5165 (de = dictFind(db->expires,key)) == NULL) return -1;
5166
5167 return (time_t) dictGetEntryVal(de);
5168 }
5169
5170 static int expireIfNeeded(redisDb *db, robj *key) {
5171 time_t when;
5172 dictEntry *de;
5173
5174 /* No expire? return ASAP */
5175 if (dictSize(db->expires) == 0 ||
5176 (de = dictFind(db->expires,key)) == NULL) return 0;
5177
5178 /* Lookup the expire */
5179 when = (time_t) dictGetEntryVal(de);
5180 if (time(NULL) <= when) return 0;
5181
5182 /* Delete the key */
5183 dictDelete(db->expires,key);
5184 return dictDelete(db->dict,key) == DICT_OK;
5185 }
5186
5187 static int deleteIfVolatile(redisDb *db, robj *key) {
5188 dictEntry *de;
5189
5190 /* No expire? return ASAP */
5191 if (dictSize(db->expires) == 0 ||
5192 (de = dictFind(db->expires,key)) == NULL) return 0;
5193
5194 /* Delete the key */
5195 server.dirty++;
5196 dictDelete(db->expires,key);
5197 return dictDelete(db->dict,key) == DICT_OK;
5198 }
5199
5200 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5201 dictEntry *de;
5202
5203 de = dictFind(c->db->dict,key);
5204 if (de == NULL) {
5205 addReply(c,shared.czero);
5206 return;
5207 }
5208 if (seconds < 0) {
5209 if (deleteKey(c->db,key)) server.dirty++;
5210 addReply(c, shared.cone);
5211 return;
5212 } else {
5213 time_t when = time(NULL)+seconds;
5214 if (setExpire(c->db,key,when)) {
5215 addReply(c,shared.cone);
5216 server.dirty++;
5217 } else {
5218 addReply(c,shared.czero);
5219 }
5220 return;
5221 }
5222 }
5223
5224 static void expireCommand(redisClient *c) {
5225 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5226 }
5227
5228 static void expireatCommand(redisClient *c) {
5229 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5230 }
5231
5232 static void ttlCommand(redisClient *c) {
5233 time_t expire;
5234 int ttl = -1;
5235
5236 expire = getExpire(c->db,c->argv[1]);
5237 if (expire != -1) {
5238 ttl = (int) (expire-time(NULL));
5239 if (ttl < 0) ttl = -1;
5240 }
5241 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5242 }
5243
5244 /* =============================== Replication ============================= */
5245
5246 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5247 ssize_t nwritten, ret = size;
5248 time_t start = time(NULL);
5249
5250 timeout++;
5251 while(size) {
5252 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5253 nwritten = write(fd,ptr,size);
5254 if (nwritten == -1) return -1;
5255 ptr += nwritten;
5256 size -= nwritten;
5257 }
5258 if ((time(NULL)-start) > timeout) {
5259 errno = ETIMEDOUT;
5260 return -1;
5261 }
5262 }
5263 return ret;
5264 }
5265
5266 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5267 ssize_t nread, totread = 0;
5268 time_t start = time(NULL);
5269
5270 timeout++;
5271 while(size) {
5272 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5273 nread = read(fd,ptr,size);
5274 if (nread == -1) return -1;
5275 ptr += nread;
5276 size -= nread;
5277 totread += nread;
5278 }
5279 if ((time(NULL)-start) > timeout) {
5280 errno = ETIMEDOUT;
5281 return -1;
5282 }
5283 }
5284 return totread;
5285 }
5286
/* Read a line from 'fd' one byte at a time into the buffer 'ptr' of 'size'
 * bytes. Reading stops at the first '\n', which is not stored. A '\r' just
 * before the newline is replaced with a terminator. The buffer is always
 * null terminated when 'size' is at least 1.
 *
 * At most 'size'-1 bytes are stored: if no newline is found within that
 * limit the function returns what was read so far.
 *
 * Returns the number of bytes stored before the newline (a trailing '\r' is
 * included in the count), 0 if 'size' is not positive, or -1 if syncRead()
 * fails. 'timeout' is passed to every single byte syncRead() call. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;
    *ptr = '\0';
    size--; /* Reserve room for the null terminator. */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* Account for the byte just stored, otherwise a line longer
             * than the buffer would be written past its end. */
            size--;
        }
    }
    return nread;
}
5307
5308 static void syncCommand(redisClient *c) {
5309 /* ignore SYNC if aleady slave or in monitor mode */
5310 if (c->flags & REDIS_SLAVE) return;
5311
5312 /* SYNC can't be issued when the server has pending data to send to
5313 * the client about already issued commands. We need a fresh reply
5314 * buffer registering the differences between the BGSAVE and the current
5315 * dataset, so that we can copy to other slaves if needed. */
5316 if (listLength(c->reply) != 0) {
5317 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5318 return;
5319 }
5320
5321 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5322 /* Here we need to check if there is a background saving operation
5323 * in progress, or if it is required to start one */
5324 if (server.bgsavechildpid != -1) {
5325 /* Ok a background save is in progress. Let's check if it is a good
5326 * one for replication, i.e. if there is another slave that is
5327 * registering differences since the server forked to save */
5328 redisClient *slave;
5329 listNode *ln;
5330
5331 listRewind(server.slaves);
5332 while((ln = listYield(server.slaves))) {
5333 slave = ln->value;
5334 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5335 }
5336 if (ln) {
5337 /* Perfect, the server is already registering differences for
5338 * another slave. Set the right state, and copy the buffer. */
5339 listRelease(c->reply);
5340 c->reply = listDup(slave->reply);
5341 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5342 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5343 } else {
5344 /* No way, we need to wait for the next BGSAVE in order to
5345 * register differences */
5346 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5347 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5348 }
5349 } else {
5350 /* Ok we don't have a BGSAVE in progress, let's start one */
5351 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5352 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5353 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5354 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5355 return;
5356 }
5357 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5358 }
5359 c->repldbfd = -1;
5360 c->flags |= REDIS_SLAVE;
5361 c->slaveseldb = 0;
5362 listAddNodeTail(server.slaves,c);
5363 return;
5364 }
5365
5366 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5367 redisClient *slave = privdata;
5368 REDIS_NOTUSED(el);
5369 REDIS_NOTUSED(mask);
5370 char buf[REDIS_IOBUF_LEN];
5371 ssize_t nwritten, buflen;
5372
5373 if (slave->repldboff == 0) {
5374 /* Write the bulk write count before to transfer the DB. In theory here
5375 * we don't know how much room there is in the output buffer of the
5376 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5377 * operations) will never be smaller than the few bytes we need. */
5378 sds bulkcount;
5379
5380 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5381 slave->repldbsize);
5382 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5383 {
5384 sdsfree(bulkcount);
5385 freeClient(slave);
5386 return;
5387 }
5388 sdsfree(bulkcount);
5389 }
5390 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5391 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5392 if (buflen <= 0) {
5393 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5394 (buflen == 0) ? "premature EOF" : strerror(errno));
5395 freeClient(slave);
5396 return;
5397 }
5398 if ((nwritten = write(fd,buf,buflen)) == -1) {
5399 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5400 strerror(errno));
5401 freeClient(slave);
5402 return;
5403 }
5404 slave->repldboff += nwritten;
5405 if (slave->repldboff == slave->repldbsize) {
5406 close(slave->repldbfd);
5407 slave->repldbfd = -1;
5408 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5409 slave->replstate = REDIS_REPL_ONLINE;
5410 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5411 sendReplyToClient, slave) == AE_ERR) {
5412 freeClient(slave);
5413 return;
5414 }
5415 addReplySds(slave,sdsempty());
5416 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5417 }
5418 }
5419
5420 /* This function is called at the end of every backgrond saving.
5421 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5422 * otherwise REDIS_ERR is passed to the function.
5423 *
5424 * The goal of this function is to handle slaves waiting for a successful
5425 * background saving in order to perform non-blocking synchronization. */
5426 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5427 listNode *ln;
5428 int startbgsave = 0;
5429
5430 listRewind(server.slaves);
5431 while((ln = listYield(server.slaves))) {
5432 redisClient *slave = ln->value;
5433
5434 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5435 startbgsave = 1;
5436 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5437 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5438 struct redis_stat buf;
5439
5440 if (bgsaveerr != REDIS_OK) {
5441 freeClient(slave);
5442 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5443 continue;
5444 }
5445 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5446 redis_fstat(slave->repldbfd,&buf) == -1) {
5447 freeClient(slave);
5448 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5449 continue;
5450 }
5451 slave->repldboff = 0;
5452 slave->repldbsize = buf.st_size;
5453 slave->replstate = REDIS_REPL_SEND_BULK;
5454 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5455 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5456 freeClient(slave);
5457 continue;
5458 }
5459 }
5460 }
5461 if (startbgsave) {
5462 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5463 listRewind(server.slaves);
5464 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5465 while((ln = listYield(server.slaves))) {
5466 redisClient *slave = ln->value;
5467
5468 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5469 freeClient(slave);
5470 }
5471 }
5472 }
5473 }
5474
5475 static int syncWithMaster(void) {
5476 char buf[1024], tmpfile[256], authcmd[1024];
5477 int dumpsize;
5478 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5479 int dfd;
5480
5481 if (fd == -1) {
5482 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5483 strerror(errno));
5484 return REDIS_ERR;
5485 }
5486
5487 /* AUTH with the master if required. */
5488 if(server.masterauth) {
5489 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5490 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5491 close(fd);
5492 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5493 strerror(errno));
5494 return REDIS_ERR;
5495 }
5496 /* Read the AUTH result. */
5497 if (syncReadLine(fd,buf,1024,3600) == -1) {
5498 close(fd);
5499 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5500 strerror(errno));
5501 return REDIS_ERR;
5502 }
5503 if (buf[0] != '+') {
5504 close(fd);
5505 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5506 return REDIS_ERR;
5507 }
5508 }
5509
5510 /* Issue the SYNC command */
5511 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5512 close(fd);
5513 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5514 strerror(errno));
5515 return REDIS_ERR;
5516 }
5517 /* Read the bulk write count */
5518 if (syncReadLine(fd,buf,1024,3600) == -1) {
5519 close(fd);
5520 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5521 strerror(errno));
5522 return REDIS_ERR;
5523 }
5524 if (buf[0] != '$') {
5525 close(fd);
5526 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5527 return REDIS_ERR;
5528 }
5529 dumpsize = atoi(buf+1);
5530 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5531 /* Read the bulk write data on a temp file */
5532 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5533 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5534 if (dfd == -1) {
5535 close(fd);
5536 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5537 return REDIS_ERR;
5538 }
5539 while(dumpsize) {
5540 int nread, nwritten;
5541
5542 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5543 if (nread == -1) {
5544 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5545 strerror(errno));
5546 close(fd);
5547 close(dfd);
5548 return REDIS_ERR;
5549 }
5550 nwritten = write(dfd,buf,nread);
5551 if (nwritten == -1) {
5552 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5553 close(fd);
5554 close(dfd);
5555 return REDIS_ERR;
5556 }
5557 dumpsize -= nread;
5558 }
5559 close(dfd);
5560 if (rename(tmpfile,server.dbfilename) == -1) {
5561 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5562 unlink(tmpfile);
5563 close(fd);
5564 return REDIS_ERR;
5565 }
5566 emptyDb();
5567 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5568 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5569 close(fd);
5570 return REDIS_ERR;
5571 }
5572 server.master = createClient(fd);
5573 server.master->flags |= REDIS_MASTER;
5574 server.replstate = REDIS_REPL_CONNECTED;
5575 return REDIS_OK;
5576 }
5577
5578 static void slaveofCommand(redisClient *c) {
5579 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5580 !strcasecmp(c->argv[2]->ptr,"one")) {
5581 if (server.masterhost) {
5582 sdsfree(server.masterhost);
5583 server.masterhost = NULL;
5584 if (server.master) freeClient(server.master);
5585 server.replstate = REDIS_REPL_NONE;
5586 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5587 }
5588 } else {
5589 sdsfree(server.masterhost);
5590 server.masterhost = sdsdup(c->argv[1]->ptr);
5591 server.masterport = atoi(c->argv[2]->ptr);
5592 if (server.master) freeClient(server.master);
5593 server.replstate = REDIS_REPL_CONNECT;
5594 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5595 server.masterhost, server.masterport);
5596 }
5597 addReply(c,shared.ok);
5598 }
5599
5600 /* ============================ Maxmemory directive ======================== */
5601
5602 /* This function gets called when 'maxmemory' is set on the config file to limit
5603 * the max memory used by the server, and we are out of memory.
5604 * This function will try to, in order:
5605 *
5606 * - Free objects from the free list
5607 * - Try to remove keys with an EXPIRE set
5608 *
5609 * It is not possible to free enough memory to reach used-memory < maxmemory
5610 * the server will start refusing commands that will enlarge even more the
5611 * memory usage.
5612 */
5613 static void freeMemoryIfNeeded(void) {
5614 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5615 if (listLength(server.objfreelist)) {
5616 robj *o;
5617
5618 listNode *head = listFirst(server.objfreelist);
5619 o = listNodeValue(head);
5620 listDelNode(server.objfreelist,head);
5621 zfree(o);
5622 } else {
5623 int j, k, freed = 0;
5624
5625 for (j = 0; j < server.dbnum; j++) {
5626 int minttl = -1;
5627 robj *minkey = NULL;
5628 struct dictEntry *de;
5629
5630 if (dictSize(server.db[j].expires)) {
5631 freed = 1;
5632 /* From a sample of three keys drop the one nearest to
5633 * the natural expire */
5634 for (k = 0; k < 3; k++) {
5635 time_t t;
5636
5637 de = dictGetRandomKey(server.db[j].expires);
5638 t = (time_t) dictGetEntryVal(de);
5639 if (minttl == -1 || t < minttl) {
5640 minkey = dictGetEntryKey(de);
5641 minttl = t;
5642 }
5643 }
5644 deleteKey(server.db+j,minkey);
5645 }
5646 }
5647 if (!freed) return; /* nothing to free... */
5648 }
5649 }
5650 }
5651
5652 /* ============================== Append Only file ========================== */
5653
5654 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5655 sds buf = sdsempty();
5656 int j;
5657 ssize_t nwritten;
5658 time_t now;
5659 robj *tmpargv[3];
5660
5661 /* The DB this command was targetting is not the same as the last command
5662 * we appendend. To issue a SELECT command is needed. */
5663 if (dictid != server.appendseldb) {
5664 char seldb[64];
5665
5666 snprintf(seldb,sizeof(seldb),"%d",dictid);
5667 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5668 strlen(seldb),seldb);
5669 server.appendseldb = dictid;
5670 }
5671
5672 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5673 * EXPIREs into EXPIREATs calls */
5674 if (cmd->proc == expireCommand) {
5675 long when;
5676
5677 tmpargv[0] = createStringObject("EXPIREAT",8);
5678 tmpargv[1] = argv[1];
5679 incrRefCount(argv[1]);
5680 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5681 tmpargv[2] = createObject(REDIS_STRING,
5682 sdscatprintf(sdsempty(),"%ld",when));
5683 argv = tmpargv;
5684 }
5685
5686 /* Append the actual command */
5687 buf = sdscatprintf(buf,"*%d\r\n",argc);
5688 for (j = 0; j < argc; j++) {
5689 robj *o = argv[j];
5690
5691 o = getDecodedObject(o);
5692 buf = sdscatprintf(buf,"$%lu\r\n",sdslen(o->ptr));
5693 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5694 buf = sdscatlen(buf,"\r\n",2);
5695 decrRefCount(o);
5696 }
5697
5698 /* Free the objects from the modified argv for EXPIREAT */
5699 if (cmd->proc == expireCommand) {
5700 for (j = 0; j < 3; j++)
5701 decrRefCount(argv[j]);
5702 }
5703
5704 /* We want to perform a single write. This should be guaranteed atomic
5705 * at least if the filesystem we are writing is a real physical one.
5706 * While this will save us against the server being killed I don't think
5707 * there is much to do about the whole server stopping for power problems
5708 * or alike */
5709 nwritten = write(server.appendfd,buf,sdslen(buf));
5710 if (nwritten != (signed)sdslen(buf)) {
5711 /* Ooops, we are in troubles. The best thing to do for now is
5712 * to simply exit instead to give the illusion that everything is
5713 * working as expected. */
5714 if (nwritten == -1) {
5715 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5716 } else {
5717 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5718 }
5719 exit(1);
5720 }
5721 /* If a background append only file rewriting is in progress we want to
5722 * accumulate the differences between the child DB and the current one
5723 * in a buffer, so that when the child process will do its work we
5724 * can append the differences to the new append only file. */
5725 if (server.bgrewritechildpid != -1)
5726 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5727
5728 sdsfree(buf);
5729 now = time(NULL);
5730 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5731 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5732 now-server.lastfsync > 1))
5733 {
5734 fsync(server.appendfd); /* Let's try to get this data on the disk */
5735 server.lastfsync = now;
5736 }
5737 }
5738
5739 /* In Redis commands are always executed in the context of a client, so in
5740 * order to load the append only file we need to create a fake client. */
5741 static struct redisClient *createFakeClient(void) {
5742 struct redisClient *c = zmalloc(sizeof(*c));
5743
5744 selectDb(c,0);
5745 c->fd = -1;
5746 c->querybuf = sdsempty();
5747 c->argc = 0;
5748 c->argv = NULL;
5749 c->flags = 0;
5750 /* We set the fake client as a slave waiting for the synchronization
5751 * so that Redis will not try to send replies to this client. */
5752 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5753 c->reply = listCreate();
5754 listSetFreeMethod(c->reply,decrRefCount);
5755 listSetDupMethod(c->reply,dupClientReplyValue);
5756 return c;
5757 }
5758
5759 static void freeFakeClient(struct redisClient *c) {
5760 sdsfree(c->querybuf);
5761 listRelease(c->reply);
5762 zfree(c);
5763 }
5764
5765 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5766 * error (the append only file is zero-length) REDIS_ERR is returned. On
5767 * fatal error an error message is logged and the program exists. */
5768 int loadAppendOnlyFile(char *filename) {
5769 struct redisClient *fakeClient;
5770 FILE *fp = fopen(filename,"r");
5771 struct redis_stat sb;
5772
5773 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5774 return REDIS_ERR;
5775
5776 if (fp == NULL) {
5777 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5778 exit(1);
5779 }
5780
5781 fakeClient = createFakeClient();
5782 while(1) {
5783 int argc, j;
5784 unsigned long len;
5785 robj **argv;
5786 char buf[128];
5787 sds argsds;
5788 struct redisCommand *cmd;
5789
5790 if (fgets(buf,sizeof(buf),fp) == NULL) {
5791 if (feof(fp))
5792 break;
5793 else
5794 goto readerr;
5795 }
5796 if (buf[0] != '*') goto fmterr;
5797 argc = atoi(buf+1);
5798 argv = zmalloc(sizeof(robj*)*argc);
5799 for (j = 0; j < argc; j++) {
5800 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5801 if (buf[0] != '$') goto fmterr;
5802 len = strtol(buf+1,NULL,10);
5803 argsds = sdsnewlen(NULL,len);
5804 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5805 argv[j] = createObject(REDIS_STRING,argsds);
5806 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5807 }
5808
5809 /* Command lookup */
5810 cmd = lookupCommand(argv[0]->ptr);
5811 if (!cmd) {
5812 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5813 exit(1);
5814 }
5815 /* Try object sharing and encoding */
5816 if (server.shareobjects) {
5817 int j;
5818 for(j = 1; j < argc; j++)
5819 argv[j] = tryObjectSharing(argv[j]);
5820 }
5821 if (cmd->flags & REDIS_CMD_BULK)
5822 tryObjectEncoding(argv[argc-1]);
5823 /* Run the command in the context of a fake client */
5824 fakeClient->argc = argc;
5825 fakeClient->argv = argv;
5826 cmd->proc(fakeClient);
5827 /* Discard the reply objects list from the fake client */
5828 while(listLength(fakeClient->reply))
5829 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5830 /* Clean up, ready for the next command */
5831 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5832 zfree(argv);
5833 }
5834 fclose(fp);
5835 freeFakeClient(fakeClient);
5836 return REDIS_OK;
5837
5838 readerr:
5839 if (feof(fp)) {
5840 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5841 } else {
5842 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5843 }
5844 exit(1);
5845 fmterr:
5846 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5847 exit(1);
5848 }
5849
5850 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5851 static int fwriteBulk(FILE *fp, robj *obj) {
5852 char buf[128];
5853 obj = getDecodedObject(obj);
5854 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5855 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5856 if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err;
5857 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5858 decrRefCount(obj);
5859 return 1;
5860 err:
5861 decrRefCount(obj);
5862 return 0;
5863 }
5864
/* Emit the double 'd' on 'fp' as a bulk entry, $<count>\r\n<payload>\r\n,
 * where the payload is the value printed with the "%.17g" format.
 * Returns 1 on success, 0 if one of the two writes fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    size_t paylen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    paylen = strlen(payload);
    /* The advertised count does not include the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);

    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,paylen,1,fp) == 0)
    {
        return 0;
    }
    return 1;
}
5875
/* Emit the long 'l' on 'fp' as a bulk entry, $<count>\r\n<payload>\r\n,
 * where the payload is the value printed in base 10.
 * Returns 1 on success, 0 if one of the two writes fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], payload[128];
    size_t paylen;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    paylen = strlen(payload);
    /* The advertised count does not include the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);

    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,paylen,1,fp) == 0)
    {
        return 0;
    }
    return 1;
}
5886
5887 /* Write a sequence of commands able to fully rebuild the dataset into
5888 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5889 static int rewriteAppendOnlyFile(char *filename) {
5890 dictIterator *di = NULL;
5891 dictEntry *de;
5892 FILE *fp;
5893 char tmpfile[256];
5894 int j;
5895 time_t now = time(NULL);
5896
5897 /* Note that we have to use a different temp name here compared to the
5898 * one used by rewriteAppendOnlyFileBackground() function. */
5899 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5900 fp = fopen(tmpfile,"w");
5901 if (!fp) {
5902 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5903 return REDIS_ERR;
5904 }
5905 for (j = 0; j < server.dbnum; j++) {
5906 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5907 redisDb *db = server.db+j;
5908 dict *d = db->dict;
5909 if (dictSize(d) == 0) continue;
5910 di = dictGetIterator(d);
5911 if (!di) {
5912 fclose(fp);
5913 return REDIS_ERR;
5914 }
5915
5916 /* SELECT the new DB */
5917 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5918 if (fwriteBulkLong(fp,j) == 0) goto werr;
5919
5920 /* Iterate this DB writing every entry */
5921 while((de = dictNext(di)) != NULL) {
5922 robj *key = dictGetEntryKey(de);
5923 robj *o = dictGetEntryVal(de);
5924 time_t expiretime = getExpire(db,key);
5925
5926 /* Save the key and associated value */
5927 if (o->type == REDIS_STRING) {
5928 /* Emit a SET command */
5929 char cmd[]="*3\r\n$3\r\nSET\r\n";
5930 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5931 /* Key and value */
5932 if (fwriteBulk(fp,key) == 0) goto werr;
5933 if (fwriteBulk(fp,o) == 0) goto werr;
5934 } else if (o->type == REDIS_LIST) {
5935 /* Emit the RPUSHes needed to rebuild the list */
5936 list *list = o->ptr;
5937 listNode *ln;
5938
5939 listRewind(list);
5940 while((ln = listYield(list))) {
5941 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5942 robj *eleobj = listNodeValue(ln);
5943
5944 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5945 if (fwriteBulk(fp,key) == 0) goto werr;
5946 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5947 }
5948 } else if (o->type == REDIS_SET) {
5949 /* Emit the SADDs needed to rebuild the set */
5950 dict *set = o->ptr;
5951 dictIterator *di = dictGetIterator(set);
5952 dictEntry *de;
5953
5954 while((de = dictNext(di)) != NULL) {
5955 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5956 robj *eleobj = dictGetEntryKey(de);
5957
5958 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5959 if (fwriteBulk(fp,key) == 0) goto werr;
5960 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5961 }
5962 dictReleaseIterator(di);
5963 } else if (o->type == REDIS_ZSET) {
5964 /* Emit the ZADDs needed to rebuild the sorted set */
5965 zset *zs = o->ptr;
5966 dictIterator *di = dictGetIterator(zs->dict);
5967 dictEntry *de;
5968
5969 while((de = dictNext(di)) != NULL) {
5970 char cmd[]="*4\r\n$4\r\nZADD\r\n";
5971 robj *eleobj = dictGetEntryKey(de);
5972 double *score = dictGetEntryVal(de);
5973
5974 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5975 if (fwriteBulk(fp,key) == 0) goto werr;
5976 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
5977 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5978 }
5979 dictReleaseIterator(di);
5980 } else {
5981 redisAssert(0 != 0);
5982 }
5983 /* Save the expire time */
5984 if (expiretime != -1) {
5985 char cmd[]="*3\r\n$6\r\nEXPIRE\r\n";
5986 /* If this key is already expired skip it */
5987 if (expiretime < now) continue;
5988 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5989 if (fwriteBulk(fp,key) == 0) goto werr;
5990 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
5991 }
5992 }
5993 dictReleaseIterator(di);
5994 }
5995
5996 /* Make sure data will not remain on the OS's output buffers */
5997 fflush(fp);
5998 fsync(fileno(fp));
5999 fclose(fp);
6000
6001 /* Use RENAME to make sure the DB file is changed atomically only
6002 * if the generate DB file is ok. */
6003 if (rename(tmpfile,filename) == -1) {
6004 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6005 unlink(tmpfile);
6006 return REDIS_ERR;
6007 }
6008 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6009 return REDIS_OK;
6010
6011 werr:
6012 fclose(fp);
6013 unlink(tmpfile);
6014 redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno));
6015 if (di) dictReleaseIterator(di);
6016 return REDIS_ERR;
6017 }
6018
6019 /* This is how rewriting of the append only file in background works:
6020 *
6021 * 1) The user calls BGREWRITEAOF
6022 * 2) Redis calls this function, that forks():
6023 * 2a) the child rewrite the append only file in a temp file.
6024 * 2b) the parent accumulates differences in server.bgrewritebuf.
6025 * 3) When the child finished '2a' exists.
6026 * 4) The parent will trap the exit code, if it's OK, will append the
6027 * data accumulated into server.bgrewritebuf into the temp file, and
6028 * finally will rename(2) the temp file in the actual file name.
6029 * The the new file is reopened as the new append only file. Profit!
6030 */
6031 static int rewriteAppendOnlyFileBackground(void) {
6032 pid_t childpid;
6033
6034 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6035 if ((childpid = fork()) == 0) {
6036 /* Child */
6037 char tmpfile[256];
6038 close(server.fd);
6039
6040 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6041 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6042 exit(0);
6043 } else {
6044 exit(1);
6045 }
6046 } else {
6047 /* Parent */
6048 if (childpid == -1) {
6049 redisLog(REDIS_WARNING,
6050 "Can't rewrite append only file in background: fork: %s",
6051 strerror(errno));
6052 return REDIS_ERR;
6053 }
6054 redisLog(REDIS_NOTICE,
6055 "Background append only file rewriting started by pid %d",childpid);
6056 server.bgrewritechildpid = childpid;
6057 /* We set appendseldb to -1 in order to force the next call to the
6058 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6059 * accumulated by the parent into server.bgrewritebuf will start
6060 * with a SELECT statement and it will be safe to merge. */
6061 server.appendseldb = -1;
6062 return REDIS_OK;
6063 }
6064 return REDIS_OK; /* unreached */
6065 }
6066
6067 static void bgrewriteaofCommand(redisClient *c) {
6068 if (server.bgrewritechildpid != -1) {
6069 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6070 return;
6071 }
6072 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6073 addReply(c,shared.ok);
6074 } else {
6075 addReply(c,shared.err);
6076 }
6077 }
6078
/* Remove the temp file "temp-rewriteaof-bg-<pid>.aof" for the given
 * child pid. The result of unlink() is ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];
    int pid = (int) childpid;

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",pid);
    unlink(path);
}
6085
6086 /* ================================= Debugging ============================== */
6087
6088 static void debugCommand(redisClient *c) {
6089 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6090 *((char*)-1) = 'x';
6091 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6092 if (rdbSave(server.dbfilename) != REDIS_OK) {
6093 addReply(c,shared.err);
6094 return;
6095 }
6096 emptyDb();
6097 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6098 addReply(c,shared.err);
6099 return;
6100 }
6101 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6102 addReply(c,shared.ok);
6103 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6104 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6105 robj *key, *val;
6106
6107 if (!de) {
6108 addReply(c,shared.nokeyerr);
6109 return;
6110 }
6111 key = dictGetEntryKey(de);
6112 val = dictGetEntryVal(de);
6113 addReplySds(c,sdscatprintf(sdsempty(),
6114 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6115 (void*)key, key->refcount, (void*)val, val->refcount,
6116 val->encoding));
6117 } else {
6118 addReplySds(c,sdsnew(
6119 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6120 }
6121 }
6122
6123 static void _redisAssert(char *estr) {
6124 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6125 redisLog(REDIS_WARNING,"==> %s\n",estr);
6126 #ifdef HAVE_BACKTRACE
6127 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6128 *((char*)-1) = 'x';
6129 #endif
6130 }
6131
6132 /* =================================== Main! ================================ */
6133
6134 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its content converted
 * with atoi(). Returns -1 if the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
6148
6149 void linuxOvercommitMemoryWarning(void) {
6150 if (linuxOvercommitMemoryValue() == 0) {
6151 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6152 }
6153 }
6154 #endif /* __linux__ */
6155
6156 static void daemonize(void) {
6157 int fd;
6158 FILE *fp;
6159
6160 if (fork() != 0) exit(0); /* parent exits */
6161 setsid(); /* create a new session */
6162
6163 /* Every output goes to /dev/null. If Redis is daemonized but
6164 * the 'logfile' is set to 'stdout' in the configuration file
6165 * it will not log at all. */
6166 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6167 dup2(fd, STDIN_FILENO);
6168 dup2(fd, STDOUT_FILENO);
6169 dup2(fd, STDERR_FILENO);
6170 if (fd > STDERR_FILENO) close(fd);
6171 }
6172 /* Try to write the pid file */
6173 fp = fopen(server.pidfile,"w");
6174 if (fp) {
6175 fprintf(fp,"%d\n",getpid());
6176 fclose(fp);
6177 }
6178 }
6179
6180 int main(int argc, char **argv) {
6181 initServerConfig();
6182 if (argc == 2) {
6183 resetServerSaveParams();
6184 loadServerConfig(argv[1]);
6185 } else if (argc > 2) {
6186 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6187 exit(1);
6188 } else {
6189 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6190 }
6191 initServer();
6192 if (server.daemonize) daemonize();
6193 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6194 #ifdef __linux__
6195 linuxOvercommitMemoryWarning();
6196 #endif
6197 if (server.appendonly) {
6198 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6199 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6200 } else {
6201 if (rdbLoad(server.dbfilename) == REDIS_OK)
6202 redisLog(REDIS_NOTICE,"DB loaded from disk");
6203 }
6204 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6205 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6206 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6207 aeMain(server.el);
6208 aeDeleteEventLoop(server.el);
6209 return 0;
6210 }
6211
6212 /* ============================= Backtrace support ========================= */
6213
6214 #ifdef HAVE_BACKTRACE
6215 static char *findFuncName(void *pointer, unsigned long *offset);
6216
6217 static void *getMcontextEip(ucontext_t *uc) {
6218 #if defined(__FreeBSD__)
6219 return (void*) uc->uc_mcontext.mc_eip;
6220 #elif defined(__dietlibc__)
6221 return (void*) uc->uc_mcontext.eip;
6222 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6223 #if __x86_64__
6224 return (void*) uc->uc_mcontext->__ss.__rip;
6225 #else
6226 return (void*) uc->uc_mcontext->__ss.__eip;
6227 #endif
6228 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6229 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6230 return (void*) uc->uc_mcontext->__ss.__rip;
6231 #else
6232 return (void*) uc->uc_mcontext->__ss.__eip;
6233 #endif
6234 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6235 return (void*) uc->uc_mcontext.gregs[REG_EIP];
6236 #elif defined(__ia64__) /* Linux IA64 */
6237 return (void*) uc->uc_mcontext.sc_ip;
6238 #else
6239 return NULL;
6240 #endif
6241 }
6242
6243 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6244 void *trace[100];
6245 char **messages = NULL;
6246 int i, trace_size = 0;
6247 unsigned long offset=0;
6248 ucontext_t *uc = (ucontext_t*) secret;
6249 sds infostring;
6250 REDIS_NOTUSED(info);
6251
6252 redisLog(REDIS_WARNING,
6253 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6254 infostring = genRedisInfoString();
6255 redisLog(REDIS_WARNING, "%s",infostring);
6256 /* It's not safe to sdsfree() the returned string under memory
6257 * corruption conditions. Let it leak as we are going to abort */
6258
6259 trace_size = backtrace(trace, 100);
6260 /* overwrite sigaction with caller's address */
6261 if (getMcontextEip(uc) != NULL) {
6262 trace[1] = getMcontextEip(uc);
6263 }
6264 messages = backtrace_symbols(trace, trace_size);
6265
6266 for (i=1; i<trace_size; ++i) {
6267 char *fn = findFuncName(trace[i], &offset), *p;
6268
6269 p = strchr(messages[i],'+');
6270 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6271 redisLog(REDIS_WARNING,"%s", messages[i]);
6272 } else {
6273 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6274 }
6275 }
6276 // free(messages); Don't call free() with possibly corrupted memory.
6277 exit(0);
6278 }
6279
6280 static void setupSigSegvAction(void) {
6281 struct sigaction act;
6282
6283 sigemptyset (&act.sa_mask);
6284 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6285 * is used. Otherwise, sa_handler is used */
6286 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6287 act.sa_sigaction = segvHandler;
6288 sigaction (SIGSEGV, &act, NULL);
6289 sigaction (SIGBUS, &act, NULL);
6290 sigaction (SIGFPE, &act, NULL);
6291 sigaction (SIGILL, &act, NULL);
6292 sigaction (SIGBUS, &act, NULL);
6293 return;
6294 }
6295
6296 #include "staticsymbols.h"
6297 /* This function try to convert a pointer into a function name. It's used in
6298 * oreder to provide a backtrace under segmentation fault that's able to
6299 * display functions declared as static (otherwise the backtrace is useless). */
6300 static char *findFuncName(void *pointer, unsigned long *offset){
6301 int i, ret = -1;
6302 unsigned long off, minoff = 0;
6303
6304 /* Try to match against the Symbol with the smallest offset */
6305 for (i=0; symsTable[i].pointer; i++) {
6306 unsigned long lp = (unsigned long) pointer;
6307
6308 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6309 off=lp-symsTable[i].pointer;
6310 if (ret < 0 || off < minoff) {
6311 minoff=off;
6312 ret=i;
6313 }
6314 }
6315 }
6316 if (ret == -1) return NULL;
6317 *offset = minoff;
6318 return symsTable[ret].name;
6319 }
6320 #else /* HAVE_BACKTRACE */
/* Backtrace support is not compiled in: nothing to install. */
static void setupSigSegvAction(void) {
    /* intentionally empty */
}
6323 #endif /* HAVE_BACKTRACE */
6324
6325
6326
6327 /* The End */
6328
6329
6330