]> git.saurik.com Git - redis.git/blob - redis.c
75490f239f425cb8f04595070bd9cc16becd0243
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.1.90"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77 /* Error codes */
78 #define REDIS_OK 0
79 #define REDIS_ERR -1
80
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
95 /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
99
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103 /* Command flags */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
107 this flags will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short this commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
111
112 /* Object types */
113 #define REDIS_STRING 0
114 #define REDIS_LIST 1
115 #define REDIS_SET 2
116 #define REDIS_ZSET 3
117 #define REDIS_HASH 4
118
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
127
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpreter the length:
131 *
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
138 *
139 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
146
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining two bits specify a special encoding for the object
149 * accordingly to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
155 /* Client flags */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
165
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
174
175 /* List related stuff */
176 #define REDIS_HEAD 0
177 #define REDIS_TAIL 1
178
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
184
185 /* Log levels */
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
189
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
192
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
195
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
200
201 /* We can print the stacktrace, so our assert is defined this way: */
202 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
203 static void _redisAssert(char *estr);
204
205 /*================================= Data types ============================== */
206
207 /* A redis object, that is a type able to hold a string / list / set */
208 typedef struct redisObject {
209 void *ptr;
210 unsigned char type;
211 unsigned char encoding;
212 unsigned char notused[2];
213 int refcount;
214 } robj;
215
216 /* Macro used to initalize a Redis object allocated on the stack.
217 * Note that this macro is taken near the structure definition to make sure
218 * we'll update it when the structure is changed, to avoid bugs like
219 * bug #85 introduced exactly in this way. */
220 #define initStaticStringObject(_var,_ptr) do { \
221 _var.refcount = 1; \
222 _var.type = REDIS_STRING; \
223 _var.encoding = REDIS_ENCODING_RAW; \
224 _var.ptr = _ptr; \
225 } while(0);
226
227 typedef struct redisDb {
228 dict *dict;
229 dict *expires;
230 int id;
231 } redisDb;
232
233 /* With multiplexing we need to take per-clinet state.
234 * Clients are taken in a liked list. */
235 typedef struct redisClient {
236 int fd;
237 redisDb *db;
238 int dictid;
239 sds querybuf;
240 robj **argv, **mbargv;
241 int argc, mbargc;
242 int bulklen; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk; /* multi bulk command format active */
244 list *reply;
245 int sentlen;
246 time_t lastinteraction; /* time of the last interaction, used for timeout */
247 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb; /* slave selected db, if this client is a slave */
249 int authenticated; /* when requirepass is non-NULL */
250 int replstate; /* replication state if this is a slave */
251 int repldbfd; /* replication DB file descriptor */
252 long repldboff; /* replication DB file offset */
253 off_t repldbsize; /* replication DB file size */
254 } redisClient;
255
256 struct saveparam {
257 time_t seconds;
258 int changes;
259 };
260
261 /* Global server state structure */
262 struct redisServer {
263 int port;
264 int fd;
265 redisDb *db;
266 dict *sharingpool;
267 unsigned int sharingpoolsize;
268 long long dirty; /* changes to DB from the last save */
269 list *clients;
270 list *slaves, *monitors;
271 char neterr[ANET_ERR_LEN];
272 aeEventLoop *el;
273 int cronloops; /* number of times the cron function run */
274 list *objfreelist; /* A list of freed objects to avoid malloc() */
275 time_t lastsave; /* Unix time of last save succeeede */
276 size_t usedmemory; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime; /* server start time */
279 long long stat_numcommands; /* number of processed commands */
280 long long stat_numconnections; /* number of connections received */
281 /* Configuration */
282 int verbosity;
283 int glueoutputbuf;
284 int maxidletime;
285 int dbnum;
286 int daemonize;
287 int appendonly;
288 int appendfsync;
289 time_t lastfsync;
290 int appendfd;
291 int appendseldb;
292 char *pidfile;
293 pid_t bgsavechildpid;
294 pid_t bgrewritechildpid;
295 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam *saveparams;
297 int saveparamslen;
298 char *logfile;
299 char *bindaddr;
300 char *dbfilename;
301 char *appendfilename;
302 char *requirepass;
303 int shareobjects;
304 /* Replication related */
305 int isslave;
306 char *masterauth;
307 char *masterhost;
308 int masterport;
309 redisClient *master; /* client that is master for this slave */
310 int replstate;
311 unsigned int maxclients;
312 unsigned long maxmemory;
313 /* Sort parameters - qsort_r() is only available under BSD so we
314 * have to take this state global, in order to pass it to sortCompare() */
315 int sort_desc;
316 int sort_alpha;
317 int sort_bypattern;
318 };
319
320 typedef void redisCommandProc(redisClient *c);
321 struct redisCommand {
322 char *name;
323 redisCommandProc *proc;
324 int arity;
325 int flags;
326 };
327
328 struct redisFunctionSym {
329 char *name;
330 unsigned long pointer;
331 };
332
333 typedef struct _redisSortObject {
334 robj *obj;
335 union {
336 double score;
337 robj *cmpobj;
338 } u;
339 } redisSortObject;
340
341 typedef struct _redisSortOperation {
342 int type;
343 robj *pattern;
344 } redisSortOperation;
345
346 /* ZSETs use a specialized version of Skiplists */
347
348 typedef struct zskiplistNode {
349 struct zskiplistNode **forward;
350 struct zskiplistNode *backward;
351 double score;
352 robj *obj;
353 } zskiplistNode;
354
355 typedef struct zskiplist {
356 struct zskiplistNode *header, *tail;
357 unsigned long length;
358 int level;
359 } zskiplist;
360
361 typedef struct zset {
362 dict *dict;
363 zskiplist *zsl;
364 } zset;
365
366 /* Our shared "common" objects */
367
368 struct sharedObjectsStruct {
369 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
370 *colon, *nullbulk, *nullmultibulk,
371 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
372 *outofrangeerr, *plus,
373 *select0, *select1, *select2, *select3, *select4,
374 *select5, *select6, *select7, *select8, *select9;
375 } shared;
376
377 /* Global vars that are actally used as constants. The following double
378 * values are used for double on-disk serialization, and are initialized
379 * at runtime to avoid strange compiler optimizations. */
380
381 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
382
383 /*================================ Prototypes =============================== */
384
385 static void freeStringObject(robj *o);
386 static void freeListObject(robj *o);
387 static void freeSetObject(robj *o);
388 static void decrRefCount(void *o);
389 static robj *createObject(int type, void *ptr);
390 static void freeClient(redisClient *c);
391 static int rdbLoad(char *filename);
392 static void addReply(redisClient *c, robj *obj);
393 static void addReplySds(redisClient *c, sds s);
394 static void incrRefCount(robj *o);
395 static int rdbSaveBackground(char *filename);
396 static robj *createStringObject(char *ptr, size_t len);
397 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
398 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
399 static int syncWithMaster(void);
400 static robj *tryObjectSharing(robj *o);
401 static int tryObjectEncoding(robj *o);
402 static robj *getDecodedObject(robj *o);
403 static int removeExpire(redisDb *db, robj *key);
404 static int expireIfNeeded(redisDb *db, robj *key);
405 static int deleteIfVolatile(redisDb *db, robj *key);
406 static int deleteKey(redisDb *db, robj *key);
407 static time_t getExpire(redisDb *db, robj *key);
408 static int setExpire(redisDb *db, robj *key, time_t when);
409 static void updateSlavesWaitingBgsave(int bgsaveerr);
410 static void freeMemoryIfNeeded(void);
411 static int processCommand(redisClient *c);
412 static void setupSigSegvAction(void);
413 static void rdbRemoveTempFile(pid_t childpid);
414 static void aofRemoveTempFile(pid_t childpid);
415 static size_t stringObjectLen(robj *o);
416 static void processInputBuffer(redisClient *c);
417 static zskiplist *zslCreate(void);
418 static void zslFree(zskiplist *zsl);
419 static void zslInsert(zskiplist *zsl, double score, robj *obj);
420 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
421
422 static void authCommand(redisClient *c);
423 static void pingCommand(redisClient *c);
424 static void echoCommand(redisClient *c);
425 static void setCommand(redisClient *c);
426 static void setnxCommand(redisClient *c);
427 static void getCommand(redisClient *c);
428 static void delCommand(redisClient *c);
429 static void existsCommand(redisClient *c);
430 static void incrCommand(redisClient *c);
431 static void decrCommand(redisClient *c);
432 static void incrbyCommand(redisClient *c);
433 static void decrbyCommand(redisClient *c);
434 static void selectCommand(redisClient *c);
435 static void randomkeyCommand(redisClient *c);
436 static void keysCommand(redisClient *c);
437 static void dbsizeCommand(redisClient *c);
438 static void lastsaveCommand(redisClient *c);
439 static void saveCommand(redisClient *c);
440 static void bgsaveCommand(redisClient *c);
441 static void bgrewriteaofCommand(redisClient *c);
442 static void shutdownCommand(redisClient *c);
443 static void moveCommand(redisClient *c);
444 static void renameCommand(redisClient *c);
445 static void renamenxCommand(redisClient *c);
446 static void lpushCommand(redisClient *c);
447 static void rpushCommand(redisClient *c);
448 static void lpopCommand(redisClient *c);
449 static void rpopCommand(redisClient *c);
450 static void llenCommand(redisClient *c);
451 static void lindexCommand(redisClient *c);
452 static void lrangeCommand(redisClient *c);
453 static void ltrimCommand(redisClient *c);
454 static void typeCommand(redisClient *c);
455 static void lsetCommand(redisClient *c);
456 static void saddCommand(redisClient *c);
457 static void sremCommand(redisClient *c);
458 static void smoveCommand(redisClient *c);
459 static void sismemberCommand(redisClient *c);
460 static void scardCommand(redisClient *c);
461 static void spopCommand(redisClient *c);
462 static void srandmemberCommand(redisClient *c);
463 static void sinterCommand(redisClient *c);
464 static void sinterstoreCommand(redisClient *c);
465 static void sunionCommand(redisClient *c);
466 static void sunionstoreCommand(redisClient *c);
467 static void sdiffCommand(redisClient *c);
468 static void sdiffstoreCommand(redisClient *c);
469 static void syncCommand(redisClient *c);
470 static void flushdbCommand(redisClient *c);
471 static void flushallCommand(redisClient *c);
472 static void sortCommand(redisClient *c);
473 static void lremCommand(redisClient *c);
474 static void rpoplpushcommand(redisClient *c);
475 static void infoCommand(redisClient *c);
476 static void mgetCommand(redisClient *c);
477 static void monitorCommand(redisClient *c);
478 static void expireCommand(redisClient *c);
479 static void expireatCommand(redisClient *c);
480 static void getsetCommand(redisClient *c);
481 static void ttlCommand(redisClient *c);
482 static void slaveofCommand(redisClient *c);
483 static void debugCommand(redisClient *c);
484 static void msetCommand(redisClient *c);
485 static void msetnxCommand(redisClient *c);
486 static void zaddCommand(redisClient *c);
487 static void zincrbyCommand(redisClient *c);
488 static void zrangeCommand(redisClient *c);
489 static void zrangebyscoreCommand(redisClient *c);
490 static void zrevrangeCommand(redisClient *c);
491 static void zcardCommand(redisClient *c);
492 static void zremCommand(redisClient *c);
493 static void zscoreCommand(redisClient *c);
494 static void zremrangebyscoreCommand(redisClient *c);
495
496 /*================================= Globals ================================= */
497
498 /* Global vars */
499 static struct redisServer server; /* server global state */
500 static struct redisCommand cmdTable[] = {
501 {"get",getCommand,2,REDIS_CMD_INLINE},
502 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
503 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
504 {"del",delCommand,-2,REDIS_CMD_INLINE},
505 {"exists",existsCommand,2,REDIS_CMD_INLINE},
506 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
507 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
508 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
509 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
510 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
511 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
512 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
513 {"llen",llenCommand,2,REDIS_CMD_INLINE},
514 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
515 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
516 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
517 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
518 {"lrem",lremCommand,4,REDIS_CMD_BULK},
519 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
520 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"srem",sremCommand,3,REDIS_CMD_BULK},
522 {"smove",smoveCommand,4,REDIS_CMD_BULK},
523 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
524 {"scard",scardCommand,2,REDIS_CMD_INLINE},
525 {"spop",spopCommand,2,REDIS_CMD_INLINE},
526 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
527 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
528 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
531 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
532 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
533 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
534 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
535 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
536 {"zrem",zremCommand,3,REDIS_CMD_BULK},
537 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
538 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
539 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
540 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
541 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
542 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
543 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
544 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
545 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
546 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
547 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
548 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
549 {"select",selectCommand,2,REDIS_CMD_INLINE},
550 {"move",moveCommand,3,REDIS_CMD_INLINE},
551 {"rename",renameCommand,3,REDIS_CMD_INLINE},
552 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
553 {"expire",expireCommand,3,REDIS_CMD_INLINE},
554 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
555 {"keys",keysCommand,2,REDIS_CMD_INLINE},
556 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
557 {"auth",authCommand,2,REDIS_CMD_INLINE},
558 {"ping",pingCommand,1,REDIS_CMD_INLINE},
559 {"echo",echoCommand,2,REDIS_CMD_BULK},
560 {"save",saveCommand,1,REDIS_CMD_INLINE},
561 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
562 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
563 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
564 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
565 {"type",typeCommand,2,REDIS_CMD_INLINE},
566 {"sync",syncCommand,1,REDIS_CMD_INLINE},
567 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
568 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
569 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
570 {"info",infoCommand,1,REDIS_CMD_INLINE},
571 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
572 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
573 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
574 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
575 {NULL,NULL,0,0}
576 };
577
578 /*============================ Utility functions ============================ */
579
580 /* Glob-style pattern matching. */
581 int stringmatchlen(const char *pattern, int patternLen,
582 const char *string, int stringLen, int nocase)
583 {
584 while(patternLen) {
585 switch(pattern[0]) {
586 case '*':
587 while (pattern[1] == '*') {
588 pattern++;
589 patternLen--;
590 }
591 if (patternLen == 1)
592 return 1; /* match */
593 while(stringLen) {
594 if (stringmatchlen(pattern+1, patternLen-1,
595 string, stringLen, nocase))
596 return 1; /* match */
597 string++;
598 stringLen--;
599 }
600 return 0; /* no match */
601 break;
602 case '?':
603 if (stringLen == 0)
604 return 0; /* no match */
605 string++;
606 stringLen--;
607 break;
608 case '[':
609 {
610 int not, match;
611
612 pattern++;
613 patternLen--;
614 not = pattern[0] == '^';
615 if (not) {
616 pattern++;
617 patternLen--;
618 }
619 match = 0;
620 while(1) {
621 if (pattern[0] == '\\') {
622 pattern++;
623 patternLen--;
624 if (pattern[0] == string[0])
625 match = 1;
626 } else if (pattern[0] == ']') {
627 break;
628 } else if (patternLen == 0) {
629 pattern--;
630 patternLen++;
631 break;
632 } else if (pattern[1] == '-' && patternLen >= 3) {
633 int start = pattern[0];
634 int end = pattern[2];
635 int c = string[0];
636 if (start > end) {
637 int t = start;
638 start = end;
639 end = t;
640 }
641 if (nocase) {
642 start = tolower(start);
643 end = tolower(end);
644 c = tolower(c);
645 }
646 pattern += 2;
647 patternLen -= 2;
648 if (c >= start && c <= end)
649 match = 1;
650 } else {
651 if (!nocase) {
652 if (pattern[0] == string[0])
653 match = 1;
654 } else {
655 if (tolower((int)pattern[0]) == tolower((int)string[0]))
656 match = 1;
657 }
658 }
659 pattern++;
660 patternLen--;
661 }
662 if (not)
663 match = !match;
664 if (!match)
665 return 0; /* no match */
666 string++;
667 stringLen--;
668 break;
669 }
670 case '\\':
671 if (patternLen >= 2) {
672 pattern++;
673 patternLen--;
674 }
675 /* fall through */
676 default:
677 if (!nocase) {
678 if (pattern[0] != string[0])
679 return 0; /* no match */
680 } else {
681 if (tolower((int)pattern[0]) != tolower((int)string[0]))
682 return 0; /* no match */
683 }
684 string++;
685 stringLen--;
686 break;
687 }
688 pattern++;
689 patternLen--;
690 if (stringLen == 0) {
691 while(*pattern == '*') {
692 pattern++;
693 patternLen--;
694 }
695 break;
696 }
697 }
698 if (patternLen == 0 && stringLen == 0)
699 return 1;
700 return 0;
701 }
702
703 static void redisLog(int level, const char *fmt, ...) {
704 va_list ap;
705 FILE *fp;
706
707 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
708 if (!fp) return;
709
710 va_start(ap, fmt);
711 if (level >= server.verbosity) {
712 char *c = ".-*";
713 char buf[64];
714 time_t now;
715
716 now = time(NULL);
717 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
718 fprintf(fp,"%s %c ",buf,c[level]);
719 vfprintf(fp, fmt, ap);
720 fprintf(fp,"\n");
721 fflush(fp);
722 }
723 va_end(ap);
724
725 if (server.logfile) fclose(fp);
726 }
727
728 /*====================== Hash table type implementation ==================== */
729
730 /* This is an hash table type that uses the SDS dynamic strings libary as
731 * keys and radis objects as values (objects can hold SDS strings,
732 * lists, sets). */
733
734 static void dictVanillaFree(void *privdata, void *val)
735 {
736 DICT_NOTUSED(privdata);
737 zfree(val);
738 }
739
740 static int sdsDictKeyCompare(void *privdata, const void *key1,
741 const void *key2)
742 {
743 int l1,l2;
744 DICT_NOTUSED(privdata);
745
746 l1 = sdslen((sds)key1);
747 l2 = sdslen((sds)key2);
748 if (l1 != l2) return 0;
749 return memcmp(key1, key2, l1) == 0;
750 }
751
752 static void dictRedisObjectDestructor(void *privdata, void *val)
753 {
754 DICT_NOTUSED(privdata);
755
756 decrRefCount(val);
757 }
758
759 static int dictObjKeyCompare(void *privdata, const void *key1,
760 const void *key2)
761 {
762 const robj *o1 = key1, *o2 = key2;
763 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
764 }
765
766 static unsigned int dictObjHash(const void *key) {
767 const robj *o = key;
768 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
769 }
770
771 static int dictEncObjKeyCompare(void *privdata, const void *key1,
772 const void *key2)
773 {
774 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
775 int cmp;
776
777 o1 = getDecodedObject(o1);
778 o2 = getDecodedObject(o2);
779 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
780 decrRefCount(o1);
781 decrRefCount(o2);
782 return cmp;
783 }
784
785 static unsigned int dictEncObjHash(const void *key) {
786 robj *o = (robj*) key;
787
788 o = getDecodedObject(o);
789 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
790 decrRefCount(o);
791 return hash;
792 }
793
794 static dictType setDictType = {
795 dictEncObjHash, /* hash function */
796 NULL, /* key dup */
797 NULL, /* val dup */
798 dictEncObjKeyCompare, /* key compare */
799 dictRedisObjectDestructor, /* key destructor */
800 NULL /* val destructor */
801 };
802
803 static dictType zsetDictType = {
804 dictEncObjHash, /* hash function */
805 NULL, /* key dup */
806 NULL, /* val dup */
807 dictEncObjKeyCompare, /* key compare */
808 dictRedisObjectDestructor, /* key destructor */
809 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
810 };
811
812 static dictType hashDictType = {
813 dictObjHash, /* hash function */
814 NULL, /* key dup */
815 NULL, /* val dup */
816 dictObjKeyCompare, /* key compare */
817 dictRedisObjectDestructor, /* key destructor */
818 dictRedisObjectDestructor /* val destructor */
819 };
820
821 /* ========================= Random utility functions ======================= */
822
823 /* Redis generally does not try to recover from out of memory conditions
824 * when allocating objects or strings, it is not clear if it will be possible
825 * to report this condition to the client since the networking layer itself
826 * is based on heap allocation for send buffers, so we simply abort.
827 * At least the code will be simpler to read... */
828 static void oom(const char *msg) {
829 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
830 sleep(1);
831 abort();
832 }
833
834 /* ====================== Redis server networking stuff ===================== */
835 static void closeTimedoutClients(void) {
836 redisClient *c;
837 listNode *ln;
838 time_t now = time(NULL);
839
840 listRewind(server.clients);
841 while ((ln = listYield(server.clients)) != NULL) {
842 c = listNodeValue(ln);
843 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
844 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
845 (now - c->lastinteraction > server.maxidletime)) {
846 redisLog(REDIS_DEBUG,"Closing idle client");
847 freeClient(c);
848 }
849 }
850 }
851
852 static int htNeedsResize(dict *dict) {
853 long long size, used;
854
855 size = dictSlots(dict);
856 used = dictSize(dict);
857 return (size && used && size > DICT_HT_INITIAL_SIZE &&
858 (used*100/size < REDIS_HT_MINFILL));
859 }
860
861 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
862 * we resize the hash table to save memory */
863 static void tryResizeHashTables(void) {
864 int j;
865
866 for (j = 0; j < server.dbnum; j++) {
867 if (htNeedsResize(server.db[j].dict)) {
868 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
869 dictResize(server.db[j].dict);
870 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
871 }
872 if (htNeedsResize(server.db[j].expires))
873 dictResize(server.db[j].expires);
874 }
875 }
876
877 /* A background saving child (BGSAVE) terminated its work. Handle this. */
878 void backgroundSaveDoneHandler(int statloc) {
879 int exitcode = WEXITSTATUS(statloc);
880 int bysignal = WIFSIGNALED(statloc);
881
882 if (!bysignal && exitcode == 0) {
883 redisLog(REDIS_NOTICE,
884 "Background saving terminated with success");
885 server.dirty = 0;
886 server.lastsave = time(NULL);
887 } else if (!bysignal && exitcode != 0) {
888 redisLog(REDIS_WARNING, "Background saving error");
889 } else {
890 redisLog(REDIS_WARNING,
891 "Background saving terminated by signal");
892 rdbRemoveTempFile(server.bgsavechildpid);
893 }
894 server.bgsavechildpid = -1;
895 /* Possibly there are slaves waiting for a BGSAVE in order to be served
896 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
897 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
898 }
899
900 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
901 * Handle this. */
902 void backgroundRewriteDoneHandler(int statloc) {
903 int exitcode = WEXITSTATUS(statloc);
904 int bysignal = WIFSIGNALED(statloc);
905
906 if (!bysignal && exitcode == 0) {
907 int fd;
908 char tmpfile[256];
909
910 redisLog(REDIS_NOTICE,
911 "Background append only file rewriting terminated with success");
912 /* Now it's time to flush the differences accumulated by the parent */
913 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
914 fd = open(tmpfile,O_WRONLY|O_APPEND);
915 if (fd == -1) {
916 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
917 goto cleanup;
918 }
919 /* Flush our data... */
920 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
921 (signed) sdslen(server.bgrewritebuf)) {
922 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
923 close(fd);
924 goto cleanup;
925 }
926 redisLog(REDIS_WARNING,"Parent diff flushed into the new append log file with success");
927 /* Now our work is to rename the temp file into the stable file. And
928 * switch the file descriptor used by the server for append only. */
929 if (rename(tmpfile,server.appendfilename) == -1) {
930 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
931 close(fd);
932 goto cleanup;
933 }
934 /* Mission completed... almost */
935 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
936 if (server.appendfd != -1) {
937 /* If append only is actually enabled... */
938 close(server.appendfd);
939 server.appendfd = fd;
940 fsync(fd);
941 server.appendseldb = -1; /* Make sure it will issue SELECT */
942 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
943 } else {
944 /* If append only is disabled we just generate a dump in this
945 * format. Why not? */
946 close(fd);
947 }
948 } else if (!bysignal && exitcode != 0) {
949 redisLog(REDIS_WARNING, "Background append only file rewriting error");
950 } else {
951 redisLog(REDIS_WARNING,
952 "Background append only file rewriting terminated by signal");
953 }
954 cleanup:
955 sdsfree(server.bgrewritebuf);
956 server.bgrewritebuf = sdsempty();
957 aofRemoveTempFile(server.bgrewritechildpid);
958 server.bgrewritechildpid = -1;
959 }
960
961 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
962 int j, loops = server.cronloops++;
963 REDIS_NOTUSED(eventLoop);
964 REDIS_NOTUSED(id);
965 REDIS_NOTUSED(clientData);
966
967 /* Update the global state with the amount of used memory */
968 server.usedmemory = zmalloc_used_memory();
969
970 /* Show some info about non-empty databases */
971 for (j = 0; j < server.dbnum; j++) {
972 long long size, used, vkeys;
973
974 size = dictSlots(server.db[j].dict);
975 used = dictSize(server.db[j].dict);
976 vkeys = dictSize(server.db[j].expires);
977 if (!(loops % 5) && (used || vkeys)) {
978 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
979 /* dictPrintStats(server.dict); */
980 }
981 }
982
983 /* We don't want to resize the hash tables while a bacground saving
984 * is in progress: the saving child is created using fork() that is
985 * implemented with a copy-on-write semantic in most modern systems, so
986 * if we resize the HT while there is the saving child at work actually
987 * a lot of memory movements in the parent will cause a lot of pages
988 * copied. */
989 if (server.bgsavechildpid == -1) tryResizeHashTables();
990
991 /* Show information about connected clients */
992 if (!(loops % 5)) {
993 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
994 listLength(server.clients)-listLength(server.slaves),
995 listLength(server.slaves),
996 server.usedmemory,
997 dictSize(server.sharingpool));
998 }
999
1000 /* Close connections of timedout clients */
1001 if (server.maxidletime && !(loops % 10))
1002 closeTimedoutClients();
1003
1004 /* Check if a background saving or AOF rewrite in progress terminated */
1005 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1006 int statloc;
1007 pid_t pid;
1008
1009 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1010 if (pid == server.bgsavechildpid) {
1011 backgroundSaveDoneHandler(statloc);
1012 } else {
1013 backgroundRewriteDoneHandler(statloc);
1014 }
1015 }
1016 } else {
1017 /* If there is not a background saving in progress check if
1018 * we have to save now */
1019 time_t now = time(NULL);
1020 for (j = 0; j < server.saveparamslen; j++) {
1021 struct saveparam *sp = server.saveparams+j;
1022
1023 if (server.dirty >= sp->changes &&
1024 now-server.lastsave > sp->seconds) {
1025 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1026 sp->changes, sp->seconds);
1027 rdbSaveBackground(server.dbfilename);
1028 break;
1029 }
1030 }
1031 }
1032
1033 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1034 * will use few CPU cycles if there are few expiring keys, otherwise
1035 * it will get more aggressive to avoid that too much memory is used by
1036 * keys that can be removed from the keyspace. */
1037 for (j = 0; j < server.dbnum; j++) {
1038 int expired;
1039 redisDb *db = server.db+j;
1040
1041 /* Continue to expire if at the end of the cycle more than 25%
1042 * of the keys were expired. */
1043 do {
1044 int num = dictSize(db->expires);
1045 time_t now = time(NULL);
1046
1047 expired = 0;
1048 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1049 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1050 while (num--) {
1051 dictEntry *de;
1052 time_t t;
1053
1054 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1055 t = (time_t) dictGetEntryVal(de);
1056 if (now > t) {
1057 deleteKey(db,dictGetEntryKey(de));
1058 expired++;
1059 }
1060 }
1061 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1062 }
1063
1064 /* Check if we should connect to a MASTER */
1065 if (server.replstate == REDIS_REPL_CONNECT) {
1066 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1067 if (syncWithMaster() == REDIS_OK) {
1068 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1069 }
1070 }
1071 return 1000;
1072 }
1073
1074 static void createSharedObjects(void) {
1075 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1076 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1077 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1078 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1079 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1080 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1081 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1082 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1083 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1084 /* no such key */
1085 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1086 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1087 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1088 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1089 "-ERR no such key\r\n"));
1090 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1091 "-ERR syntax error\r\n"));
1092 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1093 "-ERR source and destination objects are the same\r\n"));
1094 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1095 "-ERR index out of range\r\n"));
1096 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1097 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1098 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1099 shared.select0 = createStringObject("select 0\r\n",10);
1100 shared.select1 = createStringObject("select 1\r\n",10);
1101 shared.select2 = createStringObject("select 2\r\n",10);
1102 shared.select3 = createStringObject("select 3\r\n",10);
1103 shared.select4 = createStringObject("select 4\r\n",10);
1104 shared.select5 = createStringObject("select 5\r\n",10);
1105 shared.select6 = createStringObject("select 6\r\n",10);
1106 shared.select7 = createStringObject("select 7\r\n",10);
1107 shared.select8 = createStringObject("select 8\r\n",10);
1108 shared.select9 = createStringObject("select 9\r\n",10);
1109 }
1110
1111 static void appendServerSaveParams(time_t seconds, int changes) {
1112 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1113 server.saveparams[server.saveparamslen].seconds = seconds;
1114 server.saveparams[server.saveparamslen].changes = changes;
1115 server.saveparamslen++;
1116 }
1117
1118 static void resetServerSaveParams() {
1119 zfree(server.saveparams);
1120 server.saveparams = NULL;
1121 server.saveparamslen = 0;
1122 }
1123
1124 static void initServerConfig() {
1125 server.dbnum = REDIS_DEFAULT_DBNUM;
1126 server.port = REDIS_SERVERPORT;
1127 server.verbosity = REDIS_DEBUG;
1128 server.maxidletime = REDIS_MAXIDLETIME;
1129 server.saveparams = NULL;
1130 server.logfile = NULL; /* NULL = log on standard output */
1131 server.bindaddr = NULL;
1132 server.glueoutputbuf = 1;
1133 server.daemonize = 0;
1134 server.appendonly = 0;
1135 server.appendfsync = APPENDFSYNC_ALWAYS;
1136 server.lastfsync = time(NULL);
1137 server.appendfd = -1;
1138 server.appendseldb = -1; /* Make sure the first time will not match */
1139 server.pidfile = "/var/run/redis.pid";
1140 server.dbfilename = "dump.rdb";
1141 server.appendfilename = "appendonly.aof";
1142 server.requirepass = NULL;
1143 server.shareobjects = 0;
1144 server.sharingpoolsize = 1024;
1145 server.maxclients = 0;
1146 server.maxmemory = 0;
1147 resetServerSaveParams();
1148
1149 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1150 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1151 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1152 /* Replication related */
1153 server.isslave = 0;
1154 server.masterauth = NULL;
1155 server.masterhost = NULL;
1156 server.masterport = 6379;
1157 server.master = NULL;
1158 server.replstate = REDIS_REPL_NONE;
1159
1160 /* Double constants initialization */
1161 R_Zero = 0.0;
1162 R_PosInf = 1.0/R_Zero;
1163 R_NegInf = -1.0/R_Zero;
1164 R_Nan = R_Zero/R_Zero;
1165 }
1166
1167 static void initServer() {
1168 int j;
1169
1170 signal(SIGHUP, SIG_IGN);
1171 signal(SIGPIPE, SIG_IGN);
1172 setupSigSegvAction();
1173
1174 server.clients = listCreate();
1175 server.slaves = listCreate();
1176 server.monitors = listCreate();
1177 server.objfreelist = listCreate();
1178 createSharedObjects();
1179 server.el = aeCreateEventLoop();
1180 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1181 server.sharingpool = dictCreate(&setDictType,NULL);
1182 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1183 if (server.fd == -1) {
1184 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1185 exit(1);
1186 }
1187 for (j = 0; j < server.dbnum; j++) {
1188 server.db[j].dict = dictCreate(&hashDictType,NULL);
1189 server.db[j].expires = dictCreate(&setDictType,NULL);
1190 server.db[j].id = j;
1191 }
1192 server.cronloops = 0;
1193 server.bgsavechildpid = -1;
1194 server.bgrewritechildpid = -1;
1195 server.bgrewritebuf = sdsempty();
1196 server.lastsave = time(NULL);
1197 server.dirty = 0;
1198 server.usedmemory = 0;
1199 server.stat_numcommands = 0;
1200 server.stat_numconnections = 0;
1201 server.stat_starttime = time(NULL);
1202 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1203
1204 if (server.appendonly) {
1205 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1206 if (server.appendfd == -1) {
1207 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1208 strerror(errno));
1209 exit(1);
1210 }
1211 }
1212 }
1213
1214 /* Empty the whole database */
1215 static long long emptyDb() {
1216 int j;
1217 long long removed = 0;
1218
1219 for (j = 0; j < server.dbnum; j++) {
1220 removed += dictSize(server.db[j].dict);
1221 dictEmpty(server.db[j].dict);
1222 dictEmpty(server.db[j].expires);
1223 }
1224 return removed;
1225 }
1226
1227 static int yesnotoi(char *s) {
1228 if (!strcasecmp(s,"yes")) return 1;
1229 else if (!strcasecmp(s,"no")) return 0;
1230 else return -1;
1231 }
1232
1233 /* I agree, this is a very rudimental way to load a configuration...
1234 will improve later if the config gets more complex */
1235 static void loadServerConfig(char *filename) {
1236 FILE *fp;
1237 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1238 int linenum = 0;
1239 sds line = NULL;
1240
1241 if (filename[0] == '-' && filename[1] == '\0')
1242 fp = stdin;
1243 else {
1244 if ((fp = fopen(filename,"r")) == NULL) {
1245 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1246 exit(1);
1247 }
1248 }
1249
1250 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1251 sds *argv;
1252 int argc, j;
1253
1254 linenum++;
1255 line = sdsnew(buf);
1256 line = sdstrim(line," \t\r\n");
1257
1258 /* Skip comments and blank lines*/
1259 if (line[0] == '#' || line[0] == '\0') {
1260 sdsfree(line);
1261 continue;
1262 }
1263
1264 /* Split into arguments */
1265 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1266 sdstolower(argv[0]);
1267
1268 /* Execute config directives */
1269 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1270 server.maxidletime = atoi(argv[1]);
1271 if (server.maxidletime < 0) {
1272 err = "Invalid timeout value"; goto loaderr;
1273 }
1274 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1275 server.port = atoi(argv[1]);
1276 if (server.port < 1 || server.port > 65535) {
1277 err = "Invalid port"; goto loaderr;
1278 }
1279 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1280 server.bindaddr = zstrdup(argv[1]);
1281 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1282 int seconds = atoi(argv[1]);
1283 int changes = atoi(argv[2]);
1284 if (seconds < 1 || changes < 0) {
1285 err = "Invalid save parameters"; goto loaderr;
1286 }
1287 appendServerSaveParams(seconds,changes);
1288 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1289 if (chdir(argv[1]) == -1) {
1290 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1291 argv[1], strerror(errno));
1292 exit(1);
1293 }
1294 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1295 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1296 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1297 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1298 else {
1299 err = "Invalid log level. Must be one of debug, notice, warning";
1300 goto loaderr;
1301 }
1302 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1303 FILE *logfp;
1304
1305 server.logfile = zstrdup(argv[1]);
1306 if (!strcasecmp(server.logfile,"stdout")) {
1307 zfree(server.logfile);
1308 server.logfile = NULL;
1309 }
1310 if (server.logfile) {
1311 /* Test if we are able to open the file. The server will not
1312 * be able to abort just for this problem later... */
1313 logfp = fopen(server.logfile,"a");
1314 if (logfp == NULL) {
1315 err = sdscatprintf(sdsempty(),
1316 "Can't open the log file: %s", strerror(errno));
1317 goto loaderr;
1318 }
1319 fclose(logfp);
1320 }
1321 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1322 server.dbnum = atoi(argv[1]);
1323 if (server.dbnum < 1) {
1324 err = "Invalid number of databases"; goto loaderr;
1325 }
1326 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1327 server.maxclients = atoi(argv[1]);
1328 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1329 server.maxmemory = strtoll(argv[1], NULL, 10);
1330 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1331 server.masterhost = sdsnew(argv[1]);
1332 server.masterport = atoi(argv[2]);
1333 server.replstate = REDIS_REPL_CONNECT;
1334 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1335 server.masterauth = zstrdup(argv[1]);
1336 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1337 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1338 err = "argument must be 'yes' or 'no'"; goto loaderr;
1339 }
1340 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1341 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1342 err = "argument must be 'yes' or 'no'"; goto loaderr;
1343 }
1344 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1345 server.sharingpoolsize = atoi(argv[1]);
1346 if (server.sharingpoolsize < 1) {
1347 err = "invalid object sharing pool size"; goto loaderr;
1348 }
1349 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1350 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1351 err = "argument must be 'yes' or 'no'"; goto loaderr;
1352 }
1353 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1354 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1355 err = "argument must be 'yes' or 'no'"; goto loaderr;
1356 }
1357 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1358 if (!strcasecmp(argv[1],"no")) {
1359 server.appendfsync = APPENDFSYNC_NO;
1360 } else if (!strcasecmp(argv[1],"always")) {
1361 server.appendfsync = APPENDFSYNC_ALWAYS;
1362 } else if (!strcasecmp(argv[1],"everysec")) {
1363 server.appendfsync = APPENDFSYNC_EVERYSEC;
1364 } else {
1365 err = "argument must be 'no', 'always' or 'everysec'";
1366 goto loaderr;
1367 }
1368 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1369 server.requirepass = zstrdup(argv[1]);
1370 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1371 server.pidfile = zstrdup(argv[1]);
1372 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1373 server.dbfilename = zstrdup(argv[1]);
1374 } else {
1375 err = "Bad directive or wrong number of arguments"; goto loaderr;
1376 }
1377 for (j = 0; j < argc; j++)
1378 sdsfree(argv[j]);
1379 zfree(argv);
1380 sdsfree(line);
1381 }
1382 if (fp != stdin) fclose(fp);
1383 return;
1384
1385 loaderr:
1386 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1387 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1388 fprintf(stderr, ">>> '%s'\n", line);
1389 fprintf(stderr, "%s\n", err);
1390 exit(1);
1391 }
1392
1393 static void freeClientArgv(redisClient *c) {
1394 int j;
1395
1396 for (j = 0; j < c->argc; j++)
1397 decrRefCount(c->argv[j]);
1398 for (j = 0; j < c->mbargc; j++)
1399 decrRefCount(c->mbargv[j]);
1400 c->argc = 0;
1401 c->mbargc = 0;
1402 }
1403
1404 static void freeClient(redisClient *c) {
1405 listNode *ln;
1406
1407 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1408 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1409 sdsfree(c->querybuf);
1410 listRelease(c->reply);
1411 freeClientArgv(c);
1412 close(c->fd);
1413 ln = listSearchKey(server.clients,c);
1414 redisAssert(ln != NULL);
1415 listDelNode(server.clients,ln);
1416 if (c->flags & REDIS_SLAVE) {
1417 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1418 close(c->repldbfd);
1419 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1420 ln = listSearchKey(l,c);
1421 redisAssert(ln != NULL);
1422 listDelNode(l,ln);
1423 }
1424 if (c->flags & REDIS_MASTER) {
1425 server.master = NULL;
1426 server.replstate = REDIS_REPL_CONNECT;
1427 }
1428 zfree(c->argv);
1429 zfree(c->mbargv);
1430 zfree(c);
1431 }
1432
1433 #define GLUEREPLY_UP_TO (1024)
1434 static void glueReplyBuffersIfNeeded(redisClient *c) {
1435 int copylen = 0;
1436 char buf[GLUEREPLY_UP_TO];
1437 listNode *ln;
1438 robj *o;
1439
1440 listRewind(c->reply);
1441 while((ln = listYield(c->reply))) {
1442 int objlen;
1443
1444 o = ln->value;
1445 objlen = sdslen(o->ptr);
1446 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1447 memcpy(buf+copylen,o->ptr,objlen);
1448 copylen += objlen;
1449 listDelNode(c->reply,ln);
1450 } else {
1451 if (copylen == 0) return;
1452 break;
1453 }
1454 }
1455 /* Now the output buffer is empty, add the new single element */
1456 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1457 listAddNodeHead(c->reply,o);
1458 }
1459
1460 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1461 redisClient *c = privdata;
1462 int nwritten = 0, totwritten = 0, objlen;
1463 robj *o;
1464 REDIS_NOTUSED(el);
1465 REDIS_NOTUSED(mask);
1466
1467 /* Use writev() if we have enough buffers to send */
1468 if (!server.glueoutputbuf &&
1469 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1470 !(c->flags & REDIS_MASTER))
1471 {
1472 sendReplyToClientWritev(el, fd, privdata, mask);
1473 return;
1474 }
1475
1476 while(listLength(c->reply)) {
1477 if (server.glueoutputbuf && listLength(c->reply) > 1)
1478 glueReplyBuffersIfNeeded(c);
1479
1480 o = listNodeValue(listFirst(c->reply));
1481 objlen = sdslen(o->ptr);
1482
1483 if (objlen == 0) {
1484 listDelNode(c->reply,listFirst(c->reply));
1485 continue;
1486 }
1487
1488 if (c->flags & REDIS_MASTER) {
1489 /* Don't reply to a master */
1490 nwritten = objlen - c->sentlen;
1491 } else {
1492 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1493 if (nwritten <= 0) break;
1494 }
1495 c->sentlen += nwritten;
1496 totwritten += nwritten;
1497 /* If we fully sent the object on head go to the next one */
1498 if (c->sentlen == objlen) {
1499 listDelNode(c->reply,listFirst(c->reply));
1500 c->sentlen = 0;
1501 }
1502 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1503 * bytes, in a single threaded server it's a good idea to serve
1504 * other clients as well, even if a very large request comes from
1505 * super fast link that is always able to accept data (in real world
1506 * scenario think about 'KEYS *' against the loopback interfae) */
1507 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1508 }
1509 if (nwritten == -1) {
1510 if (errno == EAGAIN) {
1511 nwritten = 0;
1512 } else {
1513 redisLog(REDIS_DEBUG,
1514 "Error writing to client: %s", strerror(errno));
1515 freeClient(c);
1516 return;
1517 }
1518 }
1519 if (totwritten > 0) c->lastinteraction = time(NULL);
1520 if (listLength(c->reply) == 0) {
1521 c->sentlen = 0;
1522 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1523 }
1524 }
1525
1526 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1527 {
1528 redisClient *c = privdata;
1529 int nwritten = 0, totwritten = 0, objlen, willwrite;
1530 robj *o;
1531 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1532 int offset, ion = 0;
1533 REDIS_NOTUSED(el);
1534 REDIS_NOTUSED(mask);
1535
1536 listNode *node;
1537 while (listLength(c->reply)) {
1538 offset = c->sentlen;
1539 ion = 0;
1540 willwrite = 0;
1541
1542 /* fill-in the iov[] array */
1543 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1544 o = listNodeValue(node);
1545 objlen = sdslen(o->ptr);
1546
1547 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1548 break;
1549
1550 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1551 break; /* no more iovecs */
1552
1553 iov[ion].iov_base = ((char*)o->ptr) + offset;
1554 iov[ion].iov_len = objlen - offset;
1555 willwrite += objlen - offset;
1556 offset = 0; /* just for the first item */
1557 ion++;
1558 }
1559
1560 if(willwrite == 0)
1561 break;
1562
1563 /* write all collected blocks at once */
1564 if((nwritten = writev(fd, iov, ion)) < 0) {
1565 if (errno != EAGAIN) {
1566 redisLog(REDIS_DEBUG,
1567 "Error writing to client: %s", strerror(errno));
1568 freeClient(c);
1569 return;
1570 }
1571 break;
1572 }
1573
1574 totwritten += nwritten;
1575 offset = c->sentlen;
1576
1577 /* remove written robjs from c->reply */
1578 while (nwritten && listLength(c->reply)) {
1579 o = listNodeValue(listFirst(c->reply));
1580 objlen = sdslen(o->ptr);
1581
1582 if(nwritten >= objlen - offset) {
1583 listDelNode(c->reply, listFirst(c->reply));
1584 nwritten -= objlen - offset;
1585 c->sentlen = 0;
1586 } else {
1587 /* partial write */
1588 c->sentlen += nwritten;
1589 break;
1590 }
1591 offset = 0;
1592 }
1593 }
1594
1595 if (totwritten > 0)
1596 c->lastinteraction = time(NULL);
1597
1598 if (listLength(c->reply) == 0) {
1599 c->sentlen = 0;
1600 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1601 }
1602 }
1603
1604 static struct redisCommand *lookupCommand(char *name) {
1605 int j = 0;
1606 while(cmdTable[j].name != NULL) {
1607 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1608 j++;
1609 }
1610 return NULL;
1611 }
1612
1613 /* resetClient prepare the client to process the next command */
1614 static void resetClient(redisClient *c) {
1615 freeClientArgv(c);
1616 c->bulklen = -1;
1617 c->multibulk = 0;
1618 }
1619
1620 /* If this function gets called we already read a whole
1621 * command, argments are in the client argv/argc fields.
1622 * processCommand() execute the command or prepare the
1623 * server for a bulk read from the client.
1624 *
1625 * If 1 is returned the client is still alive and valid and
1626 * and other operations can be performed by the caller. Otherwise
1627 * if 0 is returned the client was destroied (i.e. after QUIT). */
1628 static int processCommand(redisClient *c) {
1629 struct redisCommand *cmd;
1630 long long dirty;
1631
1632 /* Free some memory if needed (maxmemory setting) */
1633 if (server.maxmemory) freeMemoryIfNeeded();
1634
1635 /* Handle the multi bulk command type. This is an alternative protocol
1636 * supported by Redis in order to receive commands that are composed of
1637 * multiple binary-safe "bulk" arguments. The latency of processing is
1638 * a bit higher but this allows things like multi-sets, so if this
1639 * protocol is used only for MSET and similar commands this is a big win. */
1640 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1641 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1642 if (c->multibulk <= 0) {
1643 resetClient(c);
1644 return 1;
1645 } else {
1646 decrRefCount(c->argv[c->argc-1]);
1647 c->argc--;
1648 return 1;
1649 }
1650 } else if (c->multibulk) {
1651 if (c->bulklen == -1) {
1652 if (((char*)c->argv[0]->ptr)[0] != '$') {
1653 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1654 resetClient(c);
1655 return 1;
1656 } else {
1657 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1658 decrRefCount(c->argv[0]);
1659 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1660 c->argc--;
1661 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1662 resetClient(c);
1663 return 1;
1664 }
1665 c->argc--;
1666 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1667 return 1;
1668 }
1669 } else {
1670 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1671 c->mbargv[c->mbargc] = c->argv[0];
1672 c->mbargc++;
1673 c->argc--;
1674 c->multibulk--;
1675 if (c->multibulk == 0) {
1676 robj **auxargv;
1677 int auxargc;
1678
1679 /* Here we need to swap the multi-bulk argc/argv with the
1680 * normal argc/argv of the client structure. */
1681 auxargv = c->argv;
1682 c->argv = c->mbargv;
1683 c->mbargv = auxargv;
1684
1685 auxargc = c->argc;
1686 c->argc = c->mbargc;
1687 c->mbargc = auxargc;
1688
1689 /* We need to set bulklen to something different than -1
1690 * in order for the code below to process the command without
1691 * to try to read the last argument of a bulk command as
1692 * a special argument. */
1693 c->bulklen = 0;
1694 /* continue below and process the command */
1695 } else {
1696 c->bulklen = -1;
1697 return 1;
1698 }
1699 }
1700 }
1701 /* -- end of multi bulk commands processing -- */
1702
1703 /* The QUIT command is handled as a special case. Normal command
1704 * procs are unable to close the client connection safely */
1705 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1706 freeClient(c);
1707 return 0;
1708 }
1709 cmd = lookupCommand(c->argv[0]->ptr);
1710 if (!cmd) {
1711 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1712 resetClient(c);
1713 return 1;
1714 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1715 (c->argc < -cmd->arity)) {
1716 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1717 resetClient(c);
1718 return 1;
1719 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1720 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1721 resetClient(c);
1722 return 1;
1723 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1724 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1725
1726 decrRefCount(c->argv[c->argc-1]);
1727 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1728 c->argc--;
1729 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1730 resetClient(c);
1731 return 1;
1732 }
1733 c->argc--;
1734 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1735 /* It is possible that the bulk read is already in the
1736 * buffer. Check this condition and handle it accordingly.
1737 * This is just a fast path, alternative to call processInputBuffer().
1738 * It's a good idea since the code is small and this condition
1739 * happens most of the times. */
1740 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1741 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1742 c->argc++;
1743 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1744 } else {
1745 return 1;
1746 }
1747 }
1748 /* Let's try to share objects on the command arguments vector */
1749 if (server.shareobjects) {
1750 int j;
1751 for(j = 1; j < c->argc; j++)
1752 c->argv[j] = tryObjectSharing(c->argv[j]);
1753 }
1754 /* Let's try to encode the bulk object to save space. */
1755 if (cmd->flags & REDIS_CMD_BULK)
1756 tryObjectEncoding(c->argv[c->argc-1]);
1757
1758 /* Check if the user is authenticated */
1759 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1760 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1761 resetClient(c);
1762 return 1;
1763 }
1764
1765 /* Exec the command */
1766 dirty = server.dirty;
1767 cmd->proc(c);
1768 if (server.appendonly && server.dirty-dirty)
1769 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1770 if (server.dirty-dirty && listLength(server.slaves))
1771 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1772 if (listLength(server.monitors))
1773 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1774 server.stat_numcommands++;
1775
1776 /* Prepare the client for the next command */
1777 if (c->flags & REDIS_CLOSE) {
1778 freeClient(c);
1779 return 0;
1780 }
1781 resetClient(c);
1782 return 1;
1783 }
1784
1785 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1786 listNode *ln;
1787 int outc = 0, j;
1788 robj **outv;
1789 /* (args*2)+1 is enough room for args, spaces, newlines */
1790 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1791
1792 if (argc <= REDIS_STATIC_ARGS) {
1793 outv = static_outv;
1794 } else {
1795 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1796 }
1797
1798 for (j = 0; j < argc; j++) {
1799 if (j != 0) outv[outc++] = shared.space;
1800 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1801 robj *lenobj;
1802
1803 lenobj = createObject(REDIS_STRING,
1804 sdscatprintf(sdsempty(),"%lu\r\n",
1805 stringObjectLen(argv[j])));
1806 lenobj->refcount = 0;
1807 outv[outc++] = lenobj;
1808 }
1809 outv[outc++] = argv[j];
1810 }
1811 outv[outc++] = shared.crlf;
1812
1813 /* Increment all the refcounts at start and decrement at end in order to
1814 * be sure to free objects if there is no slave in a replication state
1815 * able to be feed with commands */
1816 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1817 listRewind(slaves);
1818 while((ln = listYield(slaves))) {
1819 redisClient *slave = ln->value;
1820
1821 /* Don't feed slaves that are still waiting for BGSAVE to start */
1822 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1823
1824 /* Feed all the other slaves, MONITORs and so on */
1825 if (slave->slaveseldb != dictid) {
1826 robj *selectcmd;
1827
1828 switch(dictid) {
1829 case 0: selectcmd = shared.select0; break;
1830 case 1: selectcmd = shared.select1; break;
1831 case 2: selectcmd = shared.select2; break;
1832 case 3: selectcmd = shared.select3; break;
1833 case 4: selectcmd = shared.select4; break;
1834 case 5: selectcmd = shared.select5; break;
1835 case 6: selectcmd = shared.select6; break;
1836 case 7: selectcmd = shared.select7; break;
1837 case 8: selectcmd = shared.select8; break;
1838 case 9: selectcmd = shared.select9; break;
1839 default:
1840 selectcmd = createObject(REDIS_STRING,
1841 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1842 selectcmd->refcount = 0;
1843 break;
1844 }
1845 addReply(slave,selectcmd);
1846 slave->slaveseldb = dictid;
1847 }
1848 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1849 }
1850 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1851 if (outv != static_outv) zfree(outv);
1852 }
1853
1854 static void processInputBuffer(redisClient *c) {
1855 again:
1856 if (c->bulklen == -1) {
1857 /* Read the first line of the query */
1858 char *p = strchr(c->querybuf,'\n');
1859 size_t querylen;
1860
1861 if (p) {
1862 sds query, *argv;
1863 int argc, j;
1864
1865 query = c->querybuf;
1866 c->querybuf = sdsempty();
1867 querylen = 1+(p-(query));
1868 if (sdslen(query) > querylen) {
1869 /* leave data after the first line of the query in the buffer */
1870 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1871 }
1872 *p = '\0'; /* remove "\n" */
1873 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1874 sdsupdatelen(query);
1875
1876 /* Now we can split the query in arguments */
1877 if (sdslen(query) == 0) {
1878 /* Ignore empty query */
1879 sdsfree(query);
1880 return;
1881 }
1882 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1883 sdsfree(query);
1884
1885 if (c->argv) zfree(c->argv);
1886 c->argv = zmalloc(sizeof(robj*)*argc);
1887
1888 for (j = 0; j < argc; j++) {
1889 if (sdslen(argv[j])) {
1890 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1891 c->argc++;
1892 } else {
1893 sdsfree(argv[j]);
1894 }
1895 }
1896 zfree(argv);
1897 /* Execute the command. If the client is still valid
1898 * after processCommand() return and there is something
1899 * on the query buffer try to process the next command. */
1900 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1901 return;
1902 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1903 redisLog(REDIS_DEBUG, "Client protocol error");
1904 freeClient(c);
1905 return;
1906 }
1907 } else {
1908 /* Bulk read handling. Note that if we are at this point
1909 the client already sent a command terminated with a newline,
1910 we are reading the bulk data that is actually the last
1911 argument of the command. */
1912 int qbl = sdslen(c->querybuf);
1913
1914 if (c->bulklen <= qbl) {
1915 /* Copy everything but the final CRLF as final argument */
1916 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1917 c->argc++;
1918 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1919 /* Process the command. If the client is still valid after
1920 * the processing and there is more data in the buffer
1921 * try to parse it. */
1922 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1923 return;
1924 }
1925 }
1926 }
1927
1928 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1929 redisClient *c = (redisClient*) privdata;
1930 char buf[REDIS_IOBUF_LEN];
1931 int nread;
1932 REDIS_NOTUSED(el);
1933 REDIS_NOTUSED(mask);
1934
1935 nread = read(fd, buf, REDIS_IOBUF_LEN);
1936 if (nread == -1) {
1937 if (errno == EAGAIN) {
1938 nread = 0;
1939 } else {
1940 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1941 freeClient(c);
1942 return;
1943 }
1944 } else if (nread == 0) {
1945 redisLog(REDIS_DEBUG, "Client closed connection");
1946 freeClient(c);
1947 return;
1948 }
1949 if (nread) {
1950 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1951 c->lastinteraction = time(NULL);
1952 } else {
1953 return;
1954 }
1955 processInputBuffer(c);
1956 }
1957
1958 static int selectDb(redisClient *c, int id) {
1959 if (id < 0 || id >= server.dbnum)
1960 return REDIS_ERR;
1961 c->db = &server.db[id];
1962 return REDIS_OK;
1963 }
1964
1965 static void *dupClientReplyValue(void *o) {
1966 incrRefCount((robj*)o);
1967 return 0;
1968 }
1969
1970 static redisClient *createClient(int fd) {
1971 redisClient *c = zmalloc(sizeof(*c));
1972
1973 anetNonBlock(NULL,fd);
1974 anetTcpNoDelay(NULL,fd);
1975 if (!c) return NULL;
1976 selectDb(c,0);
1977 c->fd = fd;
1978 c->querybuf = sdsempty();
1979 c->argc = 0;
1980 c->argv = NULL;
1981 c->bulklen = -1;
1982 c->multibulk = 0;
1983 c->mbargc = 0;
1984 c->mbargv = NULL;
1985 c->sentlen = 0;
1986 c->flags = 0;
1987 c->lastinteraction = time(NULL);
1988 c->authenticated = 0;
1989 c->replstate = REDIS_REPL_NONE;
1990 c->reply = listCreate();
1991 listSetFreeMethod(c->reply,decrRefCount);
1992 listSetDupMethod(c->reply,dupClientReplyValue);
1993 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1994 readQueryFromClient, c) == AE_ERR) {
1995 freeClient(c);
1996 return NULL;
1997 }
1998 listAddNodeTail(server.clients,c);
1999 return c;
2000 }
2001
2002 static void addReply(redisClient *c, robj *obj) {
2003 if (listLength(c->reply) == 0 &&
2004 (c->replstate == REDIS_REPL_NONE ||
2005 c->replstate == REDIS_REPL_ONLINE) &&
2006 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2007 sendReplyToClient, c) == AE_ERR) return;
2008 listAddNodeTail(c->reply,getDecodedObject(obj));
2009 }
2010
2011 static void addReplySds(redisClient *c, sds s) {
2012 robj *o = createObject(REDIS_STRING,s);
2013 addReply(c,o);
2014 decrRefCount(o);
2015 }
2016
2017 static void addReplyDouble(redisClient *c, double d) {
2018 char buf[128];
2019
2020 snprintf(buf,sizeof(buf),"%.17g",d);
2021 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2022 strlen(buf),buf));
2023 }
2024
2025 static void addReplyBulkLen(redisClient *c, robj *obj) {
2026 size_t len;
2027
2028 if (obj->encoding == REDIS_ENCODING_RAW) {
2029 len = sdslen(obj->ptr);
2030 } else {
2031 long n = (long)obj->ptr;
2032
2033 len = 1;
2034 if (n < 0) {
2035 len++;
2036 n = -n;
2037 }
2038 while((n = n/10) != 0) {
2039 len++;
2040 }
2041 }
2042 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",len));
2043 }
2044
2045 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2046 int cport, cfd;
2047 char cip[128];
2048 redisClient *c;
2049 REDIS_NOTUSED(el);
2050 REDIS_NOTUSED(mask);
2051 REDIS_NOTUSED(privdata);
2052
2053 cfd = anetAccept(server.neterr, fd, cip, &cport);
2054 if (cfd == AE_ERR) {
2055 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2056 return;
2057 }
2058 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2059 if ((c = createClient(cfd)) == NULL) {
2060 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2061 close(cfd); /* May be already closed, just ingore errors */
2062 return;
2063 }
2064 /* If maxclient directive is set and this is one client more... close the
2065 * connection. Note that we create the client instead to check before
2066 * for this condition, since now the socket is already set in nonblocking
2067 * mode and we can send an error for free using the Kernel I/O */
2068 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2069 char *err = "-ERR max number of clients reached\r\n";
2070
2071 /* That's a best effort error message, don't check write errors */
2072 if (write(c->fd,err,strlen(err)) == -1) {
2073 /* Nothing to do, Just to avoid the warning... */
2074 }
2075 freeClient(c);
2076 return;
2077 }
2078 server.stat_numconnections++;
2079 }
2080
2081 /* ======================= Redis objects implementation ===================== */
2082
2083 static robj *createObject(int type, void *ptr) {
2084 robj *o;
2085
2086 if (listLength(server.objfreelist)) {
2087 listNode *head = listFirst(server.objfreelist);
2088 o = listNodeValue(head);
2089 listDelNode(server.objfreelist,head);
2090 } else {
2091 o = zmalloc(sizeof(*o));
2092 }
2093 o->type = type;
2094 o->encoding = REDIS_ENCODING_RAW;
2095 o->ptr = ptr;
2096 o->refcount = 1;
2097 return o;
2098 }
2099
2100 static robj *createStringObject(char *ptr, size_t len) {
2101 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2102 }
2103
2104 static robj *createListObject(void) {
2105 list *l = listCreate();
2106
2107 listSetFreeMethod(l,decrRefCount);
2108 return createObject(REDIS_LIST,l);
2109 }
2110
2111 static robj *createSetObject(void) {
2112 dict *d = dictCreate(&setDictType,NULL);
2113 return createObject(REDIS_SET,d);
2114 }
2115
2116 static robj *createZsetObject(void) {
2117 zset *zs = zmalloc(sizeof(*zs));
2118
2119 zs->dict = dictCreate(&zsetDictType,NULL);
2120 zs->zsl = zslCreate();
2121 return createObject(REDIS_ZSET,zs);
2122 }
2123
2124 static void freeStringObject(robj *o) {
2125 if (o->encoding == REDIS_ENCODING_RAW) {
2126 sdsfree(o->ptr);
2127 }
2128 }
2129
2130 static void freeListObject(robj *o) {
2131 listRelease((list*) o->ptr);
2132 }
2133
2134 static void freeSetObject(robj *o) {
2135 dictRelease((dict*) o->ptr);
2136 }
2137
2138 static void freeZsetObject(robj *o) {
2139 zset *zs = o->ptr;
2140
2141 dictRelease(zs->dict);
2142 zslFree(zs->zsl);
2143 zfree(zs);
2144 }
2145
2146 static void freeHashObject(robj *o) {
2147 dictRelease((dict*) o->ptr);
2148 }
2149
2150 static void incrRefCount(robj *o) {
2151 o->refcount++;
2152 #ifdef DEBUG_REFCOUNT
2153 if (o->type == REDIS_STRING)
2154 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2155 #endif
2156 }
2157
2158 static void decrRefCount(void *obj) {
2159 robj *o = obj;
2160
2161 #ifdef DEBUG_REFCOUNT
2162 if (o->type == REDIS_STRING)
2163 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2164 #endif
2165 if (--(o->refcount) == 0) {
2166 switch(o->type) {
2167 case REDIS_STRING: freeStringObject(o); break;
2168 case REDIS_LIST: freeListObject(o); break;
2169 case REDIS_SET: freeSetObject(o); break;
2170 case REDIS_ZSET: freeZsetObject(o); break;
2171 case REDIS_HASH: freeHashObject(o); break;
2172 default: redisAssert(0 != 0); break;
2173 }
2174 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2175 !listAddNodeHead(server.objfreelist,o))
2176 zfree(o);
2177 }
2178 }
2179
2180 static robj *lookupKey(redisDb *db, robj *key) {
2181 dictEntry *de = dictFind(db->dict,key);
2182 return de ? dictGetEntryVal(de) : NULL;
2183 }
2184
2185 static robj *lookupKeyRead(redisDb *db, robj *key) {
2186 expireIfNeeded(db,key);
2187 return lookupKey(db,key);
2188 }
2189
2190 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2191 deleteIfVolatile(db,key);
2192 return lookupKey(db,key);
2193 }
2194
2195 static int deleteKey(redisDb *db, robj *key) {
2196 int retval;
2197
2198 /* We need to protect key from destruction: after the first dictDelete()
2199 * it may happen that 'key' is no longer valid if we don't increment
2200 * it's count. This may happen when we get the object reference directly
2201 * from the hash table with dictRandomKey() or dict iterators */
2202 incrRefCount(key);
2203 if (dictSize(db->expires)) dictDelete(db->expires,key);
2204 retval = dictDelete(db->dict,key);
2205 decrRefCount(key);
2206
2207 return retval == DICT_OK;
2208 }
2209
2210 /* Try to share an object against the shared objects pool */
2211 static robj *tryObjectSharing(robj *o) {
2212 struct dictEntry *de;
2213 unsigned long c;
2214
2215 if (o == NULL || server.shareobjects == 0) return o;
2216
2217 redisAssert(o->type == REDIS_STRING);
2218 de = dictFind(server.sharingpool,o);
2219 if (de) {
2220 robj *shared = dictGetEntryKey(de);
2221
2222 c = ((unsigned long) dictGetEntryVal(de))+1;
2223 dictGetEntryVal(de) = (void*) c;
2224 incrRefCount(shared);
2225 decrRefCount(o);
2226 return shared;
2227 } else {
2228 /* Here we are using a stream algorihtm: Every time an object is
2229 * shared we increment its count, everytime there is a miss we
2230 * recrement the counter of a random object. If this object reaches
2231 * zero we remove the object and put the current object instead. */
2232 if (dictSize(server.sharingpool) >=
2233 server.sharingpoolsize) {
2234 de = dictGetRandomKey(server.sharingpool);
2235 redisAssert(de != NULL);
2236 c = ((unsigned long) dictGetEntryVal(de))-1;
2237 dictGetEntryVal(de) = (void*) c;
2238 if (c == 0) {
2239 dictDelete(server.sharingpool,de->key);
2240 }
2241 } else {
2242 c = 0; /* If the pool is empty we want to add this object */
2243 }
2244 if (c == 0) {
2245 int retval;
2246
2247 retval = dictAdd(server.sharingpool,o,(void*)1);
2248 redisAssert(retval == DICT_OK);
2249 incrRefCount(o);
2250 }
2251 return o;
2252 }
2253 }
2254
2255 /* Check if the nul-terminated string 's' can be represented by a long
2256 * (that is, is a number that fits into long without any other space or
2257 * character before or after the digits).
2258 *
2259 * If so, the function returns REDIS_OK and *longval is set to the value
2260 * of the number. Otherwise REDIS_ERR is returned */
2261 static int isStringRepresentableAsLong(sds s, long *longval) {
2262 char buf[32], *endptr;
2263 long value;
2264 int slen;
2265
2266 value = strtol(s, &endptr, 10);
2267 if (endptr[0] != '\0') return REDIS_ERR;
2268 slen = snprintf(buf,32,"%ld",value);
2269
2270 /* If the number converted back into a string is not identical
2271 * then it's not possible to encode the string as integer */
2272 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2273 if (longval) *longval = value;
2274 return REDIS_OK;
2275 }
2276
2277 /* Try to encode a string object in order to save space */
2278 static int tryObjectEncoding(robj *o) {
2279 long value;
2280 sds s = o->ptr;
2281
2282 if (o->encoding != REDIS_ENCODING_RAW)
2283 return REDIS_ERR; /* Already encoded */
2284
2285 /* It's not save to encode shared objects: shared objects can be shared
2286 * everywhere in the "object space" of Redis. Encoded objects can only
2287 * appear as "values" (and not, for instance, as keys) */
2288 if (o->refcount > 1) return REDIS_ERR;
2289
2290 /* Currently we try to encode only strings */
2291 redisAssert(o->type == REDIS_STRING);
2292
2293 /* Check if we can represent this string as a long integer */
2294 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2295
2296 /* Ok, this object can be encoded */
2297 o->encoding = REDIS_ENCODING_INT;
2298 sdsfree(o->ptr);
2299 o->ptr = (void*) value;
2300 return REDIS_OK;
2301 }
2302
2303 /* Get a decoded version of an encoded object (returned as a new object).
2304 * If the object is already raw-encoded just increment the ref count. */
2305 static robj *getDecodedObject(robj *o) {
2306 robj *dec;
2307
2308 if (o->encoding == REDIS_ENCODING_RAW) {
2309 incrRefCount(o);
2310 return o;
2311 }
2312 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2313 char buf[32];
2314
2315 snprintf(buf,32,"%ld",(long)o->ptr);
2316 dec = createStringObject(buf,strlen(buf));
2317 return dec;
2318 } else {
2319 redisAssert(1 != 1);
2320 }
2321 }
2322
2323 /* Compare two string objects via strcmp() or alike.
2324 * Note that the objects may be integer-encoded. In such a case we
2325 * use snprintf() to get a string representation of the numbers on the stack
2326 * and compare the strings, it's much faster than calling getDecodedObject().
2327 *
2328 * Important note: if objects are not integer encoded, but binary-safe strings,
2329 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2330 * binary safe. */
2331 static int compareStringObjects(robj *a, robj *b) {
2332 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2333 char bufa[128], bufb[128], *astr, *bstr;
2334 int bothsds = 1;
2335
2336 if (a == b) return 0;
2337 if (a->encoding != REDIS_ENCODING_RAW) {
2338 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2339 astr = bufa;
2340 bothsds = 0;
2341 } else {
2342 astr = a->ptr;
2343 }
2344 if (b->encoding != REDIS_ENCODING_RAW) {
2345 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2346 bstr = bufb;
2347 bothsds = 0;
2348 } else {
2349 bstr = b->ptr;
2350 }
2351 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2352 }
2353
2354 static size_t stringObjectLen(robj *o) {
2355 redisAssert(o->type == REDIS_STRING);
2356 if (o->encoding == REDIS_ENCODING_RAW) {
2357 return sdslen(o->ptr);
2358 } else {
2359 char buf[32];
2360
2361 return snprintf(buf,32,"%ld",(long)o->ptr);
2362 }
2363 }
2364
2365 /*============================ DB saving/loading ============================ */
2366
2367 static int rdbSaveType(FILE *fp, unsigned char type) {
2368 if (fwrite(&type,1,1,fp) == 0) return -1;
2369 return 0;
2370 }
2371
2372 static int rdbSaveTime(FILE *fp, time_t t) {
2373 int32_t t32 = (int32_t) t;
2374 if (fwrite(&t32,4,1,fp) == 0) return -1;
2375 return 0;
2376 }
2377
2378 /* check rdbLoadLen() comments for more info */
2379 static int rdbSaveLen(FILE *fp, uint32_t len) {
2380 unsigned char buf[2];
2381
2382 if (len < (1<<6)) {
2383 /* Save a 6 bit len */
2384 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2385 if (fwrite(buf,1,1,fp) == 0) return -1;
2386 } else if (len < (1<<14)) {
2387 /* Save a 14 bit len */
2388 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2389 buf[1] = len&0xFF;
2390 if (fwrite(buf,2,1,fp) == 0) return -1;
2391 } else {
2392 /* Save a 32 bit len */
2393 buf[0] = (REDIS_RDB_32BITLEN<<6);
2394 if (fwrite(buf,1,1,fp) == 0) return -1;
2395 len = htonl(len);
2396 if (fwrite(&len,4,1,fp) == 0) return -1;
2397 }
2398 return 0;
2399 }
2400
2401 /* String objects in the form "2391" "-100" without any space and with a
2402 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2403 * encoded as integers to save space */
2404 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2405 long long value;
2406 char *endptr, buf[32];
2407
2408 /* Check if it's possible to encode this value as a number */
2409 value = strtoll(s, &endptr, 10);
2410 if (endptr[0] != '\0') return 0;
2411 snprintf(buf,32,"%lld",value);
2412
2413 /* If the number converted back into a string is not identical
2414 * then it's not possible to encode the string as integer */
2415 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2416
2417 /* Finally check if it fits in our ranges */
2418 if (value >= -(1<<7) && value <= (1<<7)-1) {
2419 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2420 enc[1] = value&0xFF;
2421 return 2;
2422 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2423 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2424 enc[1] = value&0xFF;
2425 enc[2] = (value>>8)&0xFF;
2426 return 3;
2427 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2428 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2429 enc[1] = value&0xFF;
2430 enc[2] = (value>>8)&0xFF;
2431 enc[3] = (value>>16)&0xFF;
2432 enc[4] = (value>>24)&0xFF;
2433 return 5;
2434 } else {
2435 return 0;
2436 }
2437 }
2438
2439 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2440 unsigned int comprlen, outlen;
2441 unsigned char byte;
2442 void *out;
2443
2444 /* We require at least four bytes compression for this to be worth it */
2445 outlen = sdslen(obj->ptr)-4;
2446 if (outlen <= 0) return 0;
2447 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2448 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2449 if (comprlen == 0) {
2450 zfree(out);
2451 return 0;
2452 }
2453 /* Data compressed! Let's save it on disk */
2454 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2455 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2456 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2457 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2458 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2459 zfree(out);
2460 return comprlen;
2461
2462 writeerr:
2463 zfree(out);
2464 return -1;
2465 }
2466
2467 /* Save a string objet as [len][data] on disk. If the object is a string
2468 * representation of an integer value we try to safe it in a special form */
2469 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2470 size_t len;
2471 int enclen;
2472
2473 len = sdslen(obj->ptr);
2474
2475 /* Try integer encoding */
2476 if (len <= 11) {
2477 unsigned char buf[5];
2478 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2479 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2480 return 0;
2481 }
2482 }
2483
2484 /* Try LZF compression - under 20 bytes it's unable to compress even
2485 * aaaaaaaaaaaaaaaaaa so skip it */
2486 if (len > 20) {
2487 int retval;
2488
2489 retval = rdbSaveLzfStringObject(fp,obj);
2490 if (retval == -1) return -1;
2491 if (retval > 0) return 0;
2492 /* retval == 0 means data can't be compressed, save the old way */
2493 }
2494
2495 /* Store verbatim */
2496 if (rdbSaveLen(fp,len) == -1) return -1;
2497 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2498 return 0;
2499 }
2500
2501 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2502 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2503 int retval;
2504
2505 obj = getDecodedObject(obj);
2506 retval = rdbSaveStringObjectRaw(fp,obj);
2507 decrRefCount(obj);
2508 return retval;
2509 }
2510
2511 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2512 * 8 bit integer specifing the length of the representation.
2513 * This 8 bit integer has special values in order to specify the following
2514 * conditions:
2515 * 253: not a number
2516 * 254: + inf
2517 * 255: - inf
2518 */
2519 static int rdbSaveDoubleValue(FILE *fp, double val) {
2520 unsigned char buf[128];
2521 int len;
2522
2523 if (isnan(val)) {
2524 buf[0] = 253;
2525 len = 1;
2526 } else if (!isfinite(val)) {
2527 len = 1;
2528 buf[0] = (val < 0) ? 255 : 254;
2529 } else {
2530 snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
2531 buf[0] = strlen((char*)buf+1);
2532 len = buf[0]+1;
2533 }
2534 if (fwrite(buf,len,1,fp) == 0) return -1;
2535 return 0;
2536 }
2537
2538 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2539 static int rdbSave(char *filename) {
2540 dictIterator *di = NULL;
2541 dictEntry *de;
2542 FILE *fp;
2543 char tmpfile[256];
2544 int j;
2545 time_t now = time(NULL);
2546
2547 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2548 fp = fopen(tmpfile,"w");
2549 if (!fp) {
2550 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2551 return REDIS_ERR;
2552 }
2553 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2554 for (j = 0; j < server.dbnum; j++) {
2555 redisDb *db = server.db+j;
2556 dict *d = db->dict;
2557 if (dictSize(d) == 0) continue;
2558 di = dictGetIterator(d);
2559 if (!di) {
2560 fclose(fp);
2561 return REDIS_ERR;
2562 }
2563
2564 /* Write the SELECT DB opcode */
2565 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2566 if (rdbSaveLen(fp,j) == -1) goto werr;
2567
2568 /* Iterate this DB writing every entry */
2569 while((de = dictNext(di)) != NULL) {
2570 robj *key = dictGetEntryKey(de);
2571 robj *o = dictGetEntryVal(de);
2572 time_t expiretime = getExpire(db,key);
2573
2574 /* Save the expire time */
2575 if (expiretime != -1) {
2576 /* If this key is already expired skip it */
2577 if (expiretime < now) continue;
2578 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2579 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2580 }
2581 /* Save the key and associated value */
2582 if (rdbSaveType(fp,o->type) == -1) goto werr;
2583 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2584 if (o->type == REDIS_STRING) {
2585 /* Save a string value */
2586 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2587 } else if (o->type == REDIS_LIST) {
2588 /* Save a list value */
2589 list *list = o->ptr;
2590 listNode *ln;
2591
2592 listRewind(list);
2593 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2594 while((ln = listYield(list))) {
2595 robj *eleobj = listNodeValue(ln);
2596
2597 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2598 }
2599 } else if (o->type == REDIS_SET) {
2600 /* Save a set value */
2601 dict *set = o->ptr;
2602 dictIterator *di = dictGetIterator(set);
2603 dictEntry *de;
2604
2605 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2606 while((de = dictNext(di)) != NULL) {
2607 robj *eleobj = dictGetEntryKey(de);
2608
2609 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2610 }
2611 dictReleaseIterator(di);
2612 } else if (o->type == REDIS_ZSET) {
2613 /* Save a set value */
2614 zset *zs = o->ptr;
2615 dictIterator *di = dictGetIterator(zs->dict);
2616 dictEntry *de;
2617
2618 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2619 while((de = dictNext(di)) != NULL) {
2620 robj *eleobj = dictGetEntryKey(de);
2621 double *score = dictGetEntryVal(de);
2622
2623 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2624 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2625 }
2626 dictReleaseIterator(di);
2627 } else {
2628 redisAssert(0 != 0);
2629 }
2630 }
2631 dictReleaseIterator(di);
2632 }
2633 /* EOF opcode */
2634 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2635
2636 /* Make sure data will not remain on the OS's output buffers */
2637 fflush(fp);
2638 fsync(fileno(fp));
2639 fclose(fp);
2640
2641 /* Use RENAME to make sure the DB file is changed atomically only
2642 * if the generate DB file is ok. */
2643 if (rename(tmpfile,filename) == -1) {
2644 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2645 unlink(tmpfile);
2646 return REDIS_ERR;
2647 }
2648 redisLog(REDIS_NOTICE,"DB saved on disk");
2649 server.dirty = 0;
2650 server.lastsave = time(NULL);
2651 return REDIS_OK;
2652
2653 werr:
2654 fclose(fp);
2655 unlink(tmpfile);
2656 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2657 if (di) dictReleaseIterator(di);
2658 return REDIS_ERR;
2659 }
2660
2661 static int rdbSaveBackground(char *filename) {
2662 pid_t childpid;
2663
2664 if (server.bgsavechildpid != -1) return REDIS_ERR;
2665 if ((childpid = fork()) == 0) {
2666 /* Child */
2667 close(server.fd);
2668 if (rdbSave(filename) == REDIS_OK) {
2669 exit(0);
2670 } else {
2671 exit(1);
2672 }
2673 } else {
2674 /* Parent */
2675 if (childpid == -1) {
2676 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2677 strerror(errno));
2678 return REDIS_ERR;
2679 }
2680 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2681 server.bgsavechildpid = childpid;
2682 return REDIS_OK;
2683 }
2684 return REDIS_OK; /* unreached */
2685 }
2686
2687 static void rdbRemoveTempFile(pid_t childpid) {
2688 char tmpfile[256];
2689
2690 snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
2691 unlink(tmpfile);
2692 }
2693
2694 static int rdbLoadType(FILE *fp) {
2695 unsigned char type;
2696 if (fread(&type,1,1,fp) == 0) return -1;
2697 return type;
2698 }
2699
2700 static time_t rdbLoadTime(FILE *fp) {
2701 int32_t t32;
2702 if (fread(&t32,4,1,fp) == 0) return -1;
2703 return (time_t) t32;
2704 }
2705
2706 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2707 * of this file for a description of how this are stored on disk.
2708 *
2709 * isencoded is set to 1 if the readed length is not actually a length but
2710 * an "encoding type", check the above comments for more info */
2711 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2712 unsigned char buf[2];
2713 uint32_t len;
2714
2715 if (isencoded) *isencoded = 0;
2716 if (rdbver == 0) {
2717 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2718 return ntohl(len);
2719 } else {
2720 int type;
2721
2722 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2723 type = (buf[0]&0xC0)>>6;
2724 if (type == REDIS_RDB_6BITLEN) {
2725 /* Read a 6 bit len */
2726 return buf[0]&0x3F;
2727 } else if (type == REDIS_RDB_ENCVAL) {
2728 /* Read a 6 bit len encoding type */
2729 if (isencoded) *isencoded = 1;
2730 return buf[0]&0x3F;
2731 } else if (type == REDIS_RDB_14BITLEN) {
2732 /* Read a 14 bit len */
2733 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2734 return ((buf[0]&0x3F)<<8)|buf[1];
2735 } else {
2736 /* Read a 32 bit len */
2737 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2738 return ntohl(len);
2739 }
2740 }
2741 }
2742
2743 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2744 unsigned char enc[4];
2745 long long val;
2746
2747 if (enctype == REDIS_RDB_ENC_INT8) {
2748 if (fread(enc,1,1,fp) == 0) return NULL;
2749 val = (signed char)enc[0];
2750 } else if (enctype == REDIS_RDB_ENC_INT16) {
2751 uint16_t v;
2752 if (fread(enc,2,1,fp) == 0) return NULL;
2753 v = enc[0]|(enc[1]<<8);
2754 val = (int16_t)v;
2755 } else if (enctype == REDIS_RDB_ENC_INT32) {
2756 uint32_t v;
2757 if (fread(enc,4,1,fp) == 0) return NULL;
2758 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2759 val = (int32_t)v;
2760 } else {
2761 val = 0; /* anti-warning */
2762 redisAssert(0!=0);
2763 }
2764 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2765 }
2766
2767 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2768 unsigned int len, clen;
2769 unsigned char *c = NULL;
2770 sds val = NULL;
2771
2772 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2773 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2774 if ((c = zmalloc(clen)) == NULL) goto err;
2775 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2776 if (fread(c,clen,1,fp) == 0) goto err;
2777 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2778 zfree(c);
2779 return createObject(REDIS_STRING,val);
2780 err:
2781 zfree(c);
2782 sdsfree(val);
2783 return NULL;
2784 }
2785
2786 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2787 int isencoded;
2788 uint32_t len;
2789 sds val;
2790
2791 len = rdbLoadLen(fp,rdbver,&isencoded);
2792 if (isencoded) {
2793 switch(len) {
2794 case REDIS_RDB_ENC_INT8:
2795 case REDIS_RDB_ENC_INT16:
2796 case REDIS_RDB_ENC_INT32:
2797 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2798 case REDIS_RDB_ENC_LZF:
2799 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2800 default:
2801 redisAssert(0!=0);
2802 }
2803 }
2804
2805 if (len == REDIS_RDB_LENERR) return NULL;
2806 val = sdsnewlen(NULL,len);
2807 if (len && fread(val,len,1,fp) == 0) {
2808 sdsfree(val);
2809 return NULL;
2810 }
2811 return tryObjectSharing(createObject(REDIS_STRING,val));
2812 }
2813
2814 /* For information about double serialization check rdbSaveDoubleValue() */
2815 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2816 char buf[128];
2817 unsigned char len;
2818
2819 if (fread(&len,1,1,fp) == 0) return -1;
2820 switch(len) {
2821 case 255: *val = R_NegInf; return 0;
2822 case 254: *val = R_PosInf; return 0;
2823 case 253: *val = R_Nan; return 0;
2824 default:
2825 if (fread(buf,len,1,fp) == 0) return -1;
2826 sscanf(buf, "%lg", val);
2827 return 0;
2828 }
2829 }
2830
2831 static int rdbLoad(char *filename) {
2832 FILE *fp;
2833 robj *keyobj = NULL;
2834 uint32_t dbid;
2835 int type, retval, rdbver;
2836 dict *d = server.db[0].dict;
2837 redisDb *db = server.db+0;
2838 char buf[1024];
2839 time_t expiretime = -1, now = time(NULL);
2840
2841 fp = fopen(filename,"r");
2842 if (!fp) return REDIS_ERR;
2843 if (fread(buf,9,1,fp) == 0) goto eoferr;
2844 buf[9] = '\0';
2845 if (memcmp(buf,"REDIS",5) != 0) {
2846 fclose(fp);
2847 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2848 return REDIS_ERR;
2849 }
2850 rdbver = atoi(buf+5);
2851 if (rdbver > 1) {
2852 fclose(fp);
2853 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2854 return REDIS_ERR;
2855 }
2856 while(1) {
2857 robj *o;
2858
2859 /* Read type. */
2860 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2861 if (type == REDIS_EXPIRETIME) {
2862 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2863 /* We read the time so we need to read the object type again */
2864 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2865 }
2866 if (type == REDIS_EOF) break;
2867 /* Handle SELECT DB opcode as a special case */
2868 if (type == REDIS_SELECTDB) {
2869 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2870 goto eoferr;
2871 if (dbid >= (unsigned)server.dbnum) {
2872 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2873 exit(1);
2874 }
2875 db = server.db+dbid;
2876 d = db->dict;
2877 continue;
2878 }
2879 /* Read key */
2880 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2881
2882 if (type == REDIS_STRING) {
2883 /* Read string value */
2884 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2885 tryObjectEncoding(o);
2886 } else if (type == REDIS_LIST || type == REDIS_SET) {
2887 /* Read list/set value */
2888 uint32_t listlen;
2889
2890 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2891 goto eoferr;
2892 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2893 /* Load every single element of the list/set */
2894 while(listlen--) {
2895 robj *ele;
2896
2897 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2898 tryObjectEncoding(ele);
2899 if (type == REDIS_LIST) {
2900 listAddNodeTail((list*)o->ptr,ele);
2901 } else {
2902 dictAdd((dict*)o->ptr,ele,NULL);
2903 }
2904 }
2905 } else if (type == REDIS_ZSET) {
2906 /* Read list/set value */
2907 uint32_t zsetlen;
2908 zset *zs;
2909
2910 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2911 goto eoferr;
2912 o = createZsetObject();
2913 zs = o->ptr;
2914 /* Load every single element of the list/set */
2915 while(zsetlen--) {
2916 robj *ele;
2917 double *score = zmalloc(sizeof(double));
2918
2919 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2920 tryObjectEncoding(ele);
2921 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2922 dictAdd(zs->dict,ele,score);
2923 zslInsert(zs->zsl,*score,ele);
2924 incrRefCount(ele); /* added to skiplist */
2925 }
2926 } else {
2927 redisAssert(0 != 0);
2928 }
2929 /* Add the new object in the hash table */
2930 retval = dictAdd(d,keyobj,o);
2931 if (retval == DICT_ERR) {
2932 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2933 exit(1);
2934 }
2935 /* Set the expire time if needed */
2936 if (expiretime != -1) {
2937 setExpire(db,keyobj,expiretime);
2938 /* Delete this key if already expired */
2939 if (expiretime < now) deleteKey(db,keyobj);
2940 expiretime = -1;
2941 }
2942 keyobj = o = NULL;
2943 }
2944 fclose(fp);
2945 return REDIS_OK;
2946
2947 eoferr: /* unexpected end of file is handled here with a fatal exit */
2948 if (keyobj) decrRefCount(keyobj);
2949 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2950 exit(1);
2951 return REDIS_ERR; /* Just to avoid warning */
2952 }
2953
2954 /*================================== Commands =============================== */
2955
2956 static void authCommand(redisClient *c) {
2957 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2958 c->authenticated = 1;
2959 addReply(c,shared.ok);
2960 } else {
2961 c->authenticated = 0;
2962 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2963 }
2964 }
2965
2966 static void pingCommand(redisClient *c) {
2967 addReply(c,shared.pong);
2968 }
2969
2970 static void echoCommand(redisClient *c) {
2971 addReplyBulkLen(c,c->argv[1]);
2972 addReply(c,c->argv[1]);
2973 addReply(c,shared.crlf);
2974 }
2975
2976 /*=================================== Strings =============================== */
2977
2978 static void setGenericCommand(redisClient *c, int nx) {
2979 int retval;
2980
2981 deleteIfVolatile(c->db,c->argv[1]);
2982 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2983 if (retval == DICT_ERR) {
2984 if (!nx) {
2985 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2986 incrRefCount(c->argv[2]);
2987 } else {
2988 addReply(c,shared.czero);
2989 return;
2990 }
2991 } else {
2992 incrRefCount(c->argv[1]);
2993 incrRefCount(c->argv[2]);
2994 }
2995 server.dirty++;
2996 removeExpire(c->db,c->argv[1]);
2997 addReply(c, nx ? shared.cone : shared.ok);
2998 }
2999
3000 static void setCommand(redisClient *c) {
3001 setGenericCommand(c,0);
3002 }
3003
3004 static void setnxCommand(redisClient *c) {
3005 setGenericCommand(c,1);
3006 }
3007
3008 static void getCommand(redisClient *c) {
3009 robj *o = lookupKeyRead(c->db,c->argv[1]);
3010
3011 if (o == NULL) {
3012 addReply(c,shared.nullbulk);
3013 } else {
3014 if (o->type != REDIS_STRING) {
3015 addReply(c,shared.wrongtypeerr);
3016 } else {
3017 addReplyBulkLen(c,o);
3018 addReply(c,o);
3019 addReply(c,shared.crlf);
3020 }
3021 }
3022 }
3023
3024 static void getsetCommand(redisClient *c) {
3025 getCommand(c);
3026 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3027 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3028 } else {
3029 incrRefCount(c->argv[1]);
3030 }
3031 incrRefCount(c->argv[2]);
3032 server.dirty++;
3033 removeExpire(c->db,c->argv[1]);
3034 }
3035
3036 static void mgetCommand(redisClient *c) {
3037 int j;
3038
3039 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3040 for (j = 1; j < c->argc; j++) {
3041 robj *o = lookupKeyRead(c->db,c->argv[j]);
3042 if (o == NULL) {
3043 addReply(c,shared.nullbulk);
3044 } else {
3045 if (o->type != REDIS_STRING) {
3046 addReply(c,shared.nullbulk);
3047 } else {
3048 addReplyBulkLen(c,o);
3049 addReply(c,o);
3050 addReply(c,shared.crlf);
3051 }
3052 }
3053 }
3054 }
3055
3056 static void msetGenericCommand(redisClient *c, int nx) {
3057 int j, busykeys = 0;
3058
3059 if ((c->argc % 2) == 0) {
3060 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
3061 return;
3062 }
3063 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3064 * set nothing at all if at least one already key exists. */
3065 if (nx) {
3066 for (j = 1; j < c->argc; j += 2) {
3067 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3068 busykeys++;
3069 }
3070 }
3071 }
3072 if (busykeys) {
3073 addReply(c, shared.czero);
3074 return;
3075 }
3076
3077 for (j = 1; j < c->argc; j += 2) {
3078 int retval;
3079
3080 tryObjectEncoding(c->argv[j+1]);
3081 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3082 if (retval == DICT_ERR) {
3083 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3084 incrRefCount(c->argv[j+1]);
3085 } else {
3086 incrRefCount(c->argv[j]);
3087 incrRefCount(c->argv[j+1]);
3088 }
3089 removeExpire(c->db,c->argv[j]);
3090 }
3091 server.dirty += (c->argc-1)/2;
3092 addReply(c, nx ? shared.cone : shared.ok);
3093 }
3094
3095 static void msetCommand(redisClient *c) {
3096 msetGenericCommand(c,0);
3097 }
3098
3099 static void msetnxCommand(redisClient *c) {
3100 msetGenericCommand(c,1);
3101 }
3102
3103 static void incrDecrCommand(redisClient *c, long long incr) {
3104 long long value;
3105 int retval;
3106 robj *o;
3107
3108 o = lookupKeyWrite(c->db,c->argv[1]);
3109 if (o == NULL) {
3110 value = 0;
3111 } else {
3112 if (o->type != REDIS_STRING) {
3113 value = 0;
3114 } else {
3115 char *eptr;
3116
3117 if (o->encoding == REDIS_ENCODING_RAW)
3118 value = strtoll(o->ptr, &eptr, 10);
3119 else if (o->encoding == REDIS_ENCODING_INT)
3120 value = (long)o->ptr;
3121 else
3122 redisAssert(1 != 1);
3123 }
3124 }
3125
3126 value += incr;
3127 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3128 tryObjectEncoding(o);
3129 retval = dictAdd(c->db->dict,c->argv[1],o);
3130 if (retval == DICT_ERR) {
3131 dictReplace(c->db->dict,c->argv[1],o);
3132 removeExpire(c->db,c->argv[1]);
3133 } else {
3134 incrRefCount(c->argv[1]);
3135 }
3136 server.dirty++;
3137 addReply(c,shared.colon);
3138 addReply(c,o);
3139 addReply(c,shared.crlf);
3140 }
3141
3142 static void incrCommand(redisClient *c) {
3143 incrDecrCommand(c,1);
3144 }
3145
3146 static void decrCommand(redisClient *c) {
3147 incrDecrCommand(c,-1);
3148 }
3149
3150 static void incrbyCommand(redisClient *c) {
3151 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3152 incrDecrCommand(c,incr);
3153 }
3154
3155 static void decrbyCommand(redisClient *c) {
3156 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3157 incrDecrCommand(c,-incr);
3158 }
3159
3160 /* ========================= Type agnostic commands ========================= */
3161
3162 static void delCommand(redisClient *c) {
3163 int deleted = 0, j;
3164
3165 for (j = 1; j < c->argc; j++) {
3166 if (deleteKey(c->db,c->argv[j])) {
3167 server.dirty++;
3168 deleted++;
3169 }
3170 }
3171 switch(deleted) {
3172 case 0:
3173 addReply(c,shared.czero);
3174 break;
3175 case 1:
3176 addReply(c,shared.cone);
3177 break;
3178 default:
3179 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3180 break;
3181 }
3182 }
3183
3184 static void existsCommand(redisClient *c) {
3185 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3186 }
3187
3188 static void selectCommand(redisClient *c) {
3189 int id = atoi(c->argv[1]->ptr);
3190
3191 if (selectDb(c,id) == REDIS_ERR) {
3192 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3193 } else {
3194 addReply(c,shared.ok);
3195 }
3196 }
3197
3198 static void randomkeyCommand(redisClient *c) {
3199 dictEntry *de;
3200
3201 while(1) {
3202 de = dictGetRandomKey(c->db->dict);
3203 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3204 }
3205 if (de == NULL) {
3206 addReply(c,shared.plus);
3207 addReply(c,shared.crlf);
3208 } else {
3209 addReply(c,shared.plus);
3210 addReply(c,dictGetEntryKey(de));
3211 addReply(c,shared.crlf);
3212 }
3213 }
3214
3215 static void keysCommand(redisClient *c) {
3216 dictIterator *di;
3217 dictEntry *de;
3218 sds pattern = c->argv[1]->ptr;
3219 int plen = sdslen(pattern);
3220 unsigned long numkeys = 0, keyslen = 0;
3221 robj *lenobj = createObject(REDIS_STRING,NULL);
3222
3223 di = dictGetIterator(c->db->dict);
3224 addReply(c,lenobj);
3225 decrRefCount(lenobj);
3226 while((de = dictNext(di)) != NULL) {
3227 robj *keyobj = dictGetEntryKey(de);
3228
3229 sds key = keyobj->ptr;
3230 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3231 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3232 if (expireIfNeeded(c->db,keyobj) == 0) {
3233 if (numkeys != 0)
3234 addReply(c,shared.space);
3235 addReply(c,keyobj);
3236 numkeys++;
3237 keyslen += sdslen(key);
3238 }
3239 }
3240 }
3241 dictReleaseIterator(di);
3242 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3243 addReply(c,shared.crlf);
3244 }
3245
3246 static void dbsizeCommand(redisClient *c) {
3247 addReplySds(c,
3248 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3249 }
3250
3251 static void lastsaveCommand(redisClient *c) {
3252 addReplySds(c,
3253 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3254 }
3255
3256 static void typeCommand(redisClient *c) {
3257 robj *o;
3258 char *type;
3259
3260 o = lookupKeyRead(c->db,c->argv[1]);
3261 if (o == NULL) {
3262 type = "+none";
3263 } else {
3264 switch(o->type) {
3265 case REDIS_STRING: type = "+string"; break;
3266 case REDIS_LIST: type = "+list"; break;
3267 case REDIS_SET: type = "+set"; break;
3268 case REDIS_ZSET: type = "+zset"; break;
3269 default: type = "unknown"; break;
3270 }
3271 }
3272 addReplySds(c,sdsnew(type));
3273 addReply(c,shared.crlf);
3274 }
3275
3276 static void saveCommand(redisClient *c) {
3277 if (server.bgsavechildpid != -1) {
3278 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3279 return;
3280 }
3281 if (rdbSave(server.dbfilename) == REDIS_OK) {
3282 addReply(c,shared.ok);
3283 } else {
3284 addReply(c,shared.err);
3285 }
3286 }
3287
3288 static void bgsaveCommand(redisClient *c) {
3289 if (server.bgsavechildpid != -1) {
3290 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3291 return;
3292 }
3293 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3294 addReply(c,shared.ok);
3295 } else {
3296 addReply(c,shared.err);
3297 }
3298 }
3299
3300 static void shutdownCommand(redisClient *c) {
3301 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3302 /* Kill the saving child if there is a background saving in progress.
3303 We want to avoid race conditions, for instance our saving child may
3304 overwrite the synchronous saving did by SHUTDOWN. */
3305 if (server.bgsavechildpid != -1) {
3306 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3307 kill(server.bgsavechildpid,SIGKILL);
3308 rdbRemoveTempFile(server.bgsavechildpid);
3309 }
3310 /* SYNC SAVE */
3311 if (rdbSave(server.dbfilename) == REDIS_OK) {
3312 if (server.daemonize)
3313 unlink(server.pidfile);
3314 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3315 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3316 exit(1);
3317 } else {
3318 /* Ooops.. error saving! The best we can do is to continue operating.
3319 * Note that if there was a background saving process, in the next
3320 * cron() Redis will be notified that the background saving aborted,
3321 * handling special stuff like slaves pending for synchronization... */
3322 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3323 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3324 }
3325 }
3326
3327 static void renameGenericCommand(redisClient *c, int nx) {
3328 robj *o;
3329
3330 /* To use the same key as src and dst is probably an error */
3331 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3332 addReply(c,shared.sameobjecterr);
3333 return;
3334 }
3335
3336 o = lookupKeyWrite(c->db,c->argv[1]);
3337 if (o == NULL) {
3338 addReply(c,shared.nokeyerr);
3339 return;
3340 }
3341 incrRefCount(o);
3342 deleteIfVolatile(c->db,c->argv[2]);
3343 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3344 if (nx) {
3345 decrRefCount(o);
3346 addReply(c,shared.czero);
3347 return;
3348 }
3349 dictReplace(c->db->dict,c->argv[2],o);
3350 } else {
3351 incrRefCount(c->argv[2]);
3352 }
3353 deleteKey(c->db,c->argv[1]);
3354 server.dirty++;
3355 addReply(c,nx ? shared.cone : shared.ok);
3356 }
3357
3358 static void renameCommand(redisClient *c) {
3359 renameGenericCommand(c,0);
3360 }
3361
3362 static void renamenxCommand(redisClient *c) {
3363 renameGenericCommand(c,1);
3364 }
3365
3366 static void moveCommand(redisClient *c) {
3367 robj *o;
3368 redisDb *src, *dst;
3369 int srcid;
3370
3371 /* Obtain source and target DB pointers */
3372 src = c->db;
3373 srcid = c->db->id;
3374 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3375 addReply(c,shared.outofrangeerr);
3376 return;
3377 }
3378 dst = c->db;
3379 selectDb(c,srcid); /* Back to the source DB */
3380
3381 /* If the user is moving using as target the same
3382 * DB as the source DB it is probably an error. */
3383 if (src == dst) {
3384 addReply(c,shared.sameobjecterr);
3385 return;
3386 }
3387
3388 /* Check if the element exists and get a reference */
3389 o = lookupKeyWrite(c->db,c->argv[1]);
3390 if (!o) {
3391 addReply(c,shared.czero);
3392 return;
3393 }
3394
3395 /* Try to add the element to the target DB */
3396 deleteIfVolatile(dst,c->argv[1]);
3397 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3398 addReply(c,shared.czero);
3399 return;
3400 }
3401 incrRefCount(c->argv[1]);
3402 incrRefCount(o);
3403
3404 /* OK! key moved, free the entry in the source DB */
3405 deleteKey(src,c->argv[1]);
3406 server.dirty++;
3407 addReply(c,shared.cone);
3408 }
3409
3410 /* =================================== Lists ================================ */
3411 static void pushGenericCommand(redisClient *c, int where) {
3412 robj *lobj;
3413 list *list;
3414
3415 lobj = lookupKeyWrite(c->db,c->argv[1]);
3416 if (lobj == NULL) {
3417 lobj = createListObject();
3418 list = lobj->ptr;
3419 if (where == REDIS_HEAD) {
3420 listAddNodeHead(list,c->argv[2]);
3421 } else {
3422 listAddNodeTail(list,c->argv[2]);
3423 }
3424 dictAdd(c->db->dict,c->argv[1],lobj);
3425 incrRefCount(c->argv[1]);
3426 incrRefCount(c->argv[2]);
3427 } else {
3428 if (lobj->type != REDIS_LIST) {
3429 addReply(c,shared.wrongtypeerr);
3430 return;
3431 }
3432 list = lobj->ptr;
3433 if (where == REDIS_HEAD) {
3434 listAddNodeHead(list,c->argv[2]);
3435 } else {
3436 listAddNodeTail(list,c->argv[2]);
3437 }
3438 incrRefCount(c->argv[2]);
3439 }
3440 server.dirty++;
3441 addReply(c,shared.ok);
3442 }
3443
3444 static void lpushCommand(redisClient *c) {
3445 pushGenericCommand(c,REDIS_HEAD);
3446 }
3447
3448 static void rpushCommand(redisClient *c) {
3449 pushGenericCommand(c,REDIS_TAIL);
3450 }
3451
3452 static void llenCommand(redisClient *c) {
3453 robj *o;
3454 list *l;
3455
3456 o = lookupKeyRead(c->db,c->argv[1]);
3457 if (o == NULL) {
3458 addReply(c,shared.czero);
3459 return;
3460 } else {
3461 if (o->type != REDIS_LIST) {
3462 addReply(c,shared.wrongtypeerr);
3463 } else {
3464 l = o->ptr;
3465 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3466 }
3467 }
3468 }
3469
3470 static void lindexCommand(redisClient *c) {
3471 robj *o;
3472 int index = atoi(c->argv[2]->ptr);
3473
3474 o = lookupKeyRead(c->db,c->argv[1]);
3475 if (o == NULL) {
3476 addReply(c,shared.nullbulk);
3477 } else {
3478 if (o->type != REDIS_LIST) {
3479 addReply(c,shared.wrongtypeerr);
3480 } else {
3481 list *list = o->ptr;
3482 listNode *ln;
3483
3484 ln = listIndex(list, index);
3485 if (ln == NULL) {
3486 addReply(c,shared.nullbulk);
3487 } else {
3488 robj *ele = listNodeValue(ln);
3489 addReplyBulkLen(c,ele);
3490 addReply(c,ele);
3491 addReply(c,shared.crlf);
3492 }
3493 }
3494 }
3495 }
3496
3497 static void lsetCommand(redisClient *c) {
3498 robj *o;
3499 int index = atoi(c->argv[2]->ptr);
3500
3501 o = lookupKeyWrite(c->db,c->argv[1]);
3502 if (o == NULL) {
3503 addReply(c,shared.nokeyerr);
3504 } else {
3505 if (o->type != REDIS_LIST) {
3506 addReply(c,shared.wrongtypeerr);
3507 } else {
3508 list *list = o->ptr;
3509 listNode *ln;
3510
3511 ln = listIndex(list, index);
3512 if (ln == NULL) {
3513 addReply(c,shared.outofrangeerr);
3514 } else {
3515 robj *ele = listNodeValue(ln);
3516
3517 decrRefCount(ele);
3518 listNodeValue(ln) = c->argv[3];
3519 incrRefCount(c->argv[3]);
3520 addReply(c,shared.ok);
3521 server.dirty++;
3522 }
3523 }
3524 }
3525 }
3526
3527 static void popGenericCommand(redisClient *c, int where) {
3528 robj *o;
3529
3530 o = lookupKeyWrite(c->db,c->argv[1]);
3531 if (o == NULL) {
3532 addReply(c,shared.nullbulk);
3533 } else {
3534 if (o->type != REDIS_LIST) {
3535 addReply(c,shared.wrongtypeerr);
3536 } else {
3537 list *list = o->ptr;
3538 listNode *ln;
3539
3540 if (where == REDIS_HEAD)
3541 ln = listFirst(list);
3542 else
3543 ln = listLast(list);
3544
3545 if (ln == NULL) {
3546 addReply(c,shared.nullbulk);
3547 } else {
3548 robj *ele = listNodeValue(ln);
3549 addReplyBulkLen(c,ele);
3550 addReply(c,ele);
3551 addReply(c,shared.crlf);
3552 listDelNode(list,ln);
3553 server.dirty++;
3554 }
3555 }
3556 }
3557 }
3558
3559 static void lpopCommand(redisClient *c) {
3560 popGenericCommand(c,REDIS_HEAD);
3561 }
3562
3563 static void rpopCommand(redisClient *c) {
3564 popGenericCommand(c,REDIS_TAIL);
3565 }
3566
3567 static void lrangeCommand(redisClient *c) {
3568 robj *o;
3569 int start = atoi(c->argv[2]->ptr);
3570 int end = atoi(c->argv[3]->ptr);
3571
3572 o = lookupKeyRead(c->db,c->argv[1]);
3573 if (o == NULL) {
3574 addReply(c,shared.nullmultibulk);
3575 } else {
3576 if (o->type != REDIS_LIST) {
3577 addReply(c,shared.wrongtypeerr);
3578 } else {
3579 list *list = o->ptr;
3580 listNode *ln;
3581 int llen = listLength(list);
3582 int rangelen, j;
3583 robj *ele;
3584
3585 /* convert negative indexes */
3586 if (start < 0) start = llen+start;
3587 if (end < 0) end = llen+end;
3588 if (start < 0) start = 0;
3589 if (end < 0) end = 0;
3590
3591 /* indexes sanity checks */
3592 if (start > end || start >= llen) {
3593 /* Out of range start or start > end result in empty list */
3594 addReply(c,shared.emptymultibulk);
3595 return;
3596 }
3597 if (end >= llen) end = llen-1;
3598 rangelen = (end-start)+1;
3599
3600 /* Return the result in form of a multi-bulk reply */
3601 ln = listIndex(list, start);
3602 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3603 for (j = 0; j < rangelen; j++) {
3604 ele = listNodeValue(ln);
3605 addReplyBulkLen(c,ele);
3606 addReply(c,ele);
3607 addReply(c,shared.crlf);
3608 ln = ln->next;
3609 }
3610 }
3611 }
3612 }
3613
3614 static void ltrimCommand(redisClient *c) {
3615 robj *o;
3616 int start = atoi(c->argv[2]->ptr);
3617 int end = atoi(c->argv[3]->ptr);
3618
3619 o = lookupKeyWrite(c->db,c->argv[1]);
3620 if (o == NULL) {
3621 addReply(c,shared.nokeyerr);
3622 } else {
3623 if (o->type != REDIS_LIST) {
3624 addReply(c,shared.wrongtypeerr);
3625 } else {
3626 list *list = o->ptr;
3627 listNode *ln;
3628 int llen = listLength(list);
3629 int j, ltrim, rtrim;
3630
3631 /* convert negative indexes */
3632 if (start < 0) start = llen+start;
3633 if (end < 0) end = llen+end;
3634 if (start < 0) start = 0;
3635 if (end < 0) end = 0;
3636
3637 /* indexes sanity checks */
3638 if (start > end || start >= llen) {
3639 /* Out of range start or start > end result in empty list */
3640 ltrim = llen;
3641 rtrim = 0;
3642 } else {
3643 if (end >= llen) end = llen-1;
3644 ltrim = start;
3645 rtrim = llen-end-1;
3646 }
3647
3648 /* Remove list elements to perform the trim */
3649 for (j = 0; j < ltrim; j++) {
3650 ln = listFirst(list);
3651 listDelNode(list,ln);
3652 }
3653 for (j = 0; j < rtrim; j++) {
3654 ln = listLast(list);
3655 listDelNode(list,ln);
3656 }
3657 server.dirty++;
3658 addReply(c,shared.ok);
3659 }
3660 }
3661 }
3662
3663 static void lremCommand(redisClient *c) {
3664 robj *o;
3665
3666 o = lookupKeyWrite(c->db,c->argv[1]);
3667 if (o == NULL) {
3668 addReply(c,shared.czero);
3669 } else {
3670 if (o->type != REDIS_LIST) {
3671 addReply(c,shared.wrongtypeerr);
3672 } else {
3673 list *list = o->ptr;
3674 listNode *ln, *next;
3675 int toremove = atoi(c->argv[2]->ptr);
3676 int removed = 0;
3677 int fromtail = 0;
3678
3679 if (toremove < 0) {
3680 toremove = -toremove;
3681 fromtail = 1;
3682 }
3683 ln = fromtail ? list->tail : list->head;
3684 while (ln) {
3685 robj *ele = listNodeValue(ln);
3686
3687 next = fromtail ? ln->prev : ln->next;
3688 if (compareStringObjects(ele,c->argv[3]) == 0) {
3689 listDelNode(list,ln);
3690 server.dirty++;
3691 removed++;
3692 if (toremove && removed == toremove) break;
3693 }
3694 ln = next;
3695 }
3696 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3697 }
3698 }
3699 }
3700
3701 /* This is the semantic of this command:
3702 * RPOPLPUSH srclist dstlist:
3703 * IF LLEN(srclist) > 0
3704 * element = RPOP srclist
3705 * LPUSH dstlist element
3706 * RETURN element
3707 * ELSE
3708 * RETURN nil
3709 * END
3710 * END
3711 *
3712 * The idea is to be able to get an element from a list in a reliable way
3713 * since the element is not just returned but pushed against another list
3714 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3715 */
3716 static void rpoplpushcommand(redisClient *c) {
3717 robj *sobj;
3718
3719 sobj = lookupKeyWrite(c->db,c->argv[1]);
3720 if (sobj == NULL) {
3721 addReply(c,shared.nullbulk);
3722 } else {
3723 if (sobj->type != REDIS_LIST) {
3724 addReply(c,shared.wrongtypeerr);
3725 } else {
3726 list *srclist = sobj->ptr;
3727 listNode *ln = listLast(srclist);
3728
3729 if (ln == NULL) {
3730 addReply(c,shared.nullbulk);
3731 } else {
3732 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3733 robj *ele = listNodeValue(ln);
3734 list *dstlist;
3735
3736 if (dobj == NULL) {
3737
3738 /* Create the list if the key does not exist */
3739 dobj = createListObject();
3740 dictAdd(c->db->dict,c->argv[2],dobj);
3741 incrRefCount(c->argv[2]);
3742 } else if (dobj->type != REDIS_LIST) {
3743 addReply(c,shared.wrongtypeerr);
3744 return;
3745 }
3746 /* Add the element to the target list */
3747 dstlist = dobj->ptr;
3748 listAddNodeHead(dstlist,ele);
3749 incrRefCount(ele);
3750
3751 /* Send the element to the client as reply as well */
3752 addReplyBulkLen(c,ele);
3753 addReply(c,ele);
3754 addReply(c,shared.crlf);
3755
3756 /* Finally remove the element from the source list */
3757 listDelNode(srclist,ln);
3758 server.dirty++;
3759 }
3760 }
3761 }
3762 }
3763
3764
3765 /* ==================================== Sets ================================ */
3766
3767 static void saddCommand(redisClient *c) {
3768 robj *set;
3769
3770 set = lookupKeyWrite(c->db,c->argv[1]);
3771 if (set == NULL) {
3772 set = createSetObject();
3773 dictAdd(c->db->dict,c->argv[1],set);
3774 incrRefCount(c->argv[1]);
3775 } else {
3776 if (set->type != REDIS_SET) {
3777 addReply(c,shared.wrongtypeerr);
3778 return;
3779 }
3780 }
3781 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3782 incrRefCount(c->argv[2]);
3783 server.dirty++;
3784 addReply(c,shared.cone);
3785 } else {
3786 addReply(c,shared.czero);
3787 }
3788 }
3789
3790 static void sremCommand(redisClient *c) {
3791 robj *set;
3792
3793 set = lookupKeyWrite(c->db,c->argv[1]);
3794 if (set == NULL) {
3795 addReply(c,shared.czero);
3796 } else {
3797 if (set->type != REDIS_SET) {
3798 addReply(c,shared.wrongtypeerr);
3799 return;
3800 }
3801 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3802 server.dirty++;
3803 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3804 addReply(c,shared.cone);
3805 } else {
3806 addReply(c,shared.czero);
3807 }
3808 }
3809 }
3810
3811 static void smoveCommand(redisClient *c) {
3812 robj *srcset, *dstset;
3813
3814 srcset = lookupKeyWrite(c->db,c->argv[1]);
3815 dstset = lookupKeyWrite(c->db,c->argv[2]);
3816
3817 /* If the source key does not exist return 0, if it's of the wrong type
3818 * raise an error */
3819 if (srcset == NULL || srcset->type != REDIS_SET) {
3820 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3821 return;
3822 }
3823 /* Error if the destination key is not a set as well */
3824 if (dstset && dstset->type != REDIS_SET) {
3825 addReply(c,shared.wrongtypeerr);
3826 return;
3827 }
3828 /* Remove the element from the source set */
3829 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3830 /* Key not found in the src set! return zero */
3831 addReply(c,shared.czero);
3832 return;
3833 }
3834 server.dirty++;
3835 /* Add the element to the destination set */
3836 if (!dstset) {
3837 dstset = createSetObject();
3838 dictAdd(c->db->dict,c->argv[2],dstset);
3839 incrRefCount(c->argv[2]);
3840 }
3841 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3842 incrRefCount(c->argv[3]);
3843 addReply(c,shared.cone);
3844 }
3845
3846 static void sismemberCommand(redisClient *c) {
3847 robj *set;
3848
3849 set = lookupKeyRead(c->db,c->argv[1]);
3850 if (set == NULL) {
3851 addReply(c,shared.czero);
3852 } else {
3853 if (set->type != REDIS_SET) {
3854 addReply(c,shared.wrongtypeerr);
3855 return;
3856 }
3857 if (dictFind(set->ptr,c->argv[2]))
3858 addReply(c,shared.cone);
3859 else
3860 addReply(c,shared.czero);
3861 }
3862 }
3863
3864 static void scardCommand(redisClient *c) {
3865 robj *o;
3866 dict *s;
3867
3868 o = lookupKeyRead(c->db,c->argv[1]);
3869 if (o == NULL) {
3870 addReply(c,shared.czero);
3871 return;
3872 } else {
3873 if (o->type != REDIS_SET) {
3874 addReply(c,shared.wrongtypeerr);
3875 } else {
3876 s = o->ptr;
3877 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3878 dictSize(s)));
3879 }
3880 }
3881 }
3882
3883 static void spopCommand(redisClient *c) {
3884 robj *set;
3885 dictEntry *de;
3886
3887 set = lookupKeyWrite(c->db,c->argv[1]);
3888 if (set == NULL) {
3889 addReply(c,shared.nullbulk);
3890 } else {
3891 if (set->type != REDIS_SET) {
3892 addReply(c,shared.wrongtypeerr);
3893 return;
3894 }
3895 de = dictGetRandomKey(set->ptr);
3896 if (de == NULL) {
3897 addReply(c,shared.nullbulk);
3898 } else {
3899 robj *ele = dictGetEntryKey(de);
3900
3901 addReplyBulkLen(c,ele);
3902 addReply(c,ele);
3903 addReply(c,shared.crlf);
3904 dictDelete(set->ptr,ele);
3905 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3906 server.dirty++;
3907 }
3908 }
3909 }
3910
3911 static void srandmemberCommand(redisClient *c) {
3912 robj *set;
3913 dictEntry *de;
3914
3915 set = lookupKeyRead(c->db,c->argv[1]);
3916 if (set == NULL) {
3917 addReply(c,shared.nullbulk);
3918 } else {
3919 if (set->type != REDIS_SET) {
3920 addReply(c,shared.wrongtypeerr);
3921 return;
3922 }
3923 de = dictGetRandomKey(set->ptr);
3924 if (de == NULL) {
3925 addReply(c,shared.nullbulk);
3926 } else {
3927 robj *ele = dictGetEntryKey(de);
3928
3929 addReplyBulkLen(c,ele);
3930 addReply(c,ele);
3931 addReply(c,shared.crlf);
3932 }
3933 }
3934 }
3935
3936 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3937 dict **d1 = (void*) s1, **d2 = (void*) s2;
3938
3939 return dictSize(*d1)-dictSize(*d2);
3940 }
3941
3942 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
3943 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3944 dictIterator *di;
3945 dictEntry *de;
3946 robj *lenobj = NULL, *dstset = NULL;
3947 unsigned long j, cardinality = 0;
3948
3949 for (j = 0; j < setsnum; j++) {
3950 robj *setobj;
3951
3952 setobj = dstkey ?
3953 lookupKeyWrite(c->db,setskeys[j]) :
3954 lookupKeyRead(c->db,setskeys[j]);
3955 if (!setobj) {
3956 zfree(dv);
3957 if (dstkey) {
3958 deleteKey(c->db,dstkey);
3959 addReply(c,shared.ok);
3960 } else {
3961 addReply(c,shared.nullmultibulk);
3962 }
3963 return;
3964 }
3965 if (setobj->type != REDIS_SET) {
3966 zfree(dv);
3967 addReply(c,shared.wrongtypeerr);
3968 return;
3969 }
3970 dv[j] = setobj->ptr;
3971 }
3972 /* Sort sets from the smallest to largest, this will improve our
3973 * algorithm's performace */
3974 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3975
3976 /* The first thing we should output is the total number of elements...
3977 * since this is a multi-bulk write, but at this stage we don't know
3978 * the intersection set size, so we use a trick, append an empty object
3979 * to the output list and save the pointer to later modify it with the
3980 * right length */
3981 if (!dstkey) {
3982 lenobj = createObject(REDIS_STRING,NULL);
3983 addReply(c,lenobj);
3984 decrRefCount(lenobj);
3985 } else {
3986 /* If we have a target key where to store the resulting set
3987 * create this key with an empty set inside */
3988 dstset = createSetObject();
3989 }
3990
3991 /* Iterate all the elements of the first (smallest) set, and test
3992 * the element against all the other sets, if at least one set does
3993 * not include the element it is discarded */
3994 di = dictGetIterator(dv[0]);
3995
3996 while((de = dictNext(di)) != NULL) {
3997 robj *ele;
3998
3999 for (j = 1; j < setsnum; j++)
4000 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4001 if (j != setsnum)
4002 continue; /* at least one set does not contain the member */
4003 ele = dictGetEntryKey(de);
4004 if (!dstkey) {
4005 addReplyBulkLen(c,ele);
4006 addReply(c,ele);
4007 addReply(c,shared.crlf);
4008 cardinality++;
4009 } else {
4010 dictAdd(dstset->ptr,ele,NULL);
4011 incrRefCount(ele);
4012 }
4013 }
4014 dictReleaseIterator(di);
4015
4016 if (dstkey) {
4017 /* Store the resulting set into the target */
4018 deleteKey(c->db,dstkey);
4019 dictAdd(c->db->dict,dstkey,dstset);
4020 incrRefCount(dstkey);
4021 }
4022
4023 if (!dstkey) {
4024 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4025 } else {
4026 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4027 dictSize((dict*)dstset->ptr)));
4028 server.dirty++;
4029 }
4030 zfree(dv);
4031 }
4032
4033 static void sinterCommand(redisClient *c) {
4034 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4035 }
4036
4037 static void sinterstoreCommand(redisClient *c) {
4038 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4039 }
4040
4041 #define REDIS_OP_UNION 0
4042 #define REDIS_OP_DIFF 1
4043
4044 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4045 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4046 dictIterator *di;
4047 dictEntry *de;
4048 robj *dstset = NULL;
4049 int j, cardinality = 0;
4050
4051 for (j = 0; j < setsnum; j++) {
4052 robj *setobj;
4053
4054 setobj = dstkey ?
4055 lookupKeyWrite(c->db,setskeys[j]) :
4056 lookupKeyRead(c->db,setskeys[j]);
4057 if (!setobj) {
4058 dv[j] = NULL;
4059 continue;
4060 }
4061 if (setobj->type != REDIS_SET) {
4062 zfree(dv);
4063 addReply(c,shared.wrongtypeerr);
4064 return;
4065 }
4066 dv[j] = setobj->ptr;
4067 }
4068
4069 /* We need a temp set object to store our union. If the dstkey
4070 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4071 * this set object will be the resulting object to set into the target key*/
4072 dstset = createSetObject();
4073
4074 /* Iterate all the elements of all the sets, add every element a single
4075 * time to the result set */
4076 for (j = 0; j < setsnum; j++) {
4077 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4078 if (!dv[j]) continue; /* non existing keys are like empty sets */
4079
4080 di = dictGetIterator(dv[j]);
4081
4082 while((de = dictNext(di)) != NULL) {
4083 robj *ele;
4084
4085 /* dictAdd will not add the same element multiple times */
4086 ele = dictGetEntryKey(de);
4087 if (op == REDIS_OP_UNION || j == 0) {
4088 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4089 incrRefCount(ele);
4090 cardinality++;
4091 }
4092 } else if (op == REDIS_OP_DIFF) {
4093 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4094 cardinality--;
4095 }
4096 }
4097 }
4098 dictReleaseIterator(di);
4099
4100 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4101 }
4102
4103 /* Output the content of the resulting set, if not in STORE mode */
4104 if (!dstkey) {
4105 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4106 di = dictGetIterator(dstset->ptr);
4107 while((de = dictNext(di)) != NULL) {
4108 robj *ele;
4109
4110 ele = dictGetEntryKey(de);
4111 addReplyBulkLen(c,ele);
4112 addReply(c,ele);
4113 addReply(c,shared.crlf);
4114 }
4115 dictReleaseIterator(di);
4116 } else {
4117 /* If we have a target key where to store the resulting set
4118 * create this key with the result set inside */
4119 deleteKey(c->db,dstkey);
4120 dictAdd(c->db->dict,dstkey,dstset);
4121 incrRefCount(dstkey);
4122 }
4123
4124 /* Cleanup */
4125 if (!dstkey) {
4126 decrRefCount(dstset);
4127 } else {
4128 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4129 dictSize((dict*)dstset->ptr)));
4130 server.dirty++;
4131 }
4132 zfree(dv);
4133 }
4134
4135 static void sunionCommand(redisClient *c) {
4136 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4137 }
4138
4139 static void sunionstoreCommand(redisClient *c) {
4140 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4141 }
4142
4143 static void sdiffCommand(redisClient *c) {
4144 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4145 }
4146
4147 static void sdiffstoreCommand(redisClient *c) {
4148 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4149 }
4150
4151 /* ==================================== ZSets =============================== */
4152
4153 /* ZSETs are ordered sets using two data structures to hold the same elements
4154 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4155 * data structure.
4156 *
4157 * The elements are added to an hash table mapping Redis objects to scores.
4158 * At the same time the elements are added to a skip list mapping scores
4159 * to Redis objects (so objects are sorted by scores in this "view"). */
4160
4161 /* This skiplist implementation is almost a C translation of the original
4162 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4163 * Alternative to Balanced Trees", modified in three ways:
4164 * a) this implementation allows for repeated values.
4165 * b) the comparison is not just by key (our 'score') but by satellite data.
4166 * c) there is a back pointer, so it's a doubly linked list with the back
4167 * pointers being only at "level 1". This allows to traverse the list
4168 * from tail to head, useful for ZREVRANGE. */
4169
4170 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4171 zskiplistNode *zn = zmalloc(sizeof(*zn));
4172
4173 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4174 zn->score = score;
4175 zn->obj = obj;
4176 return zn;
4177 }
4178
4179 static zskiplist *zslCreate(void) {
4180 int j;
4181 zskiplist *zsl;
4182
4183 zsl = zmalloc(sizeof(*zsl));
4184 zsl->level = 1;
4185 zsl->length = 0;
4186 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4187 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4188 zsl->header->forward[j] = NULL;
4189 zsl->header->backward = NULL;
4190 zsl->tail = NULL;
4191 return zsl;
4192 }
4193
4194 static void zslFreeNode(zskiplistNode *node) {
4195 decrRefCount(node->obj);
4196 zfree(node->forward);
4197 zfree(node);
4198 }
4199
4200 static void zslFree(zskiplist *zsl) {
4201 zskiplistNode *node = zsl->header->forward[0], *next;
4202
4203 zfree(zsl->header->forward);
4204 zfree(zsl->header);
4205 while(node) {
4206 next = node->forward[0];
4207 zslFreeNode(node);
4208 node = next;
4209 }
4210 zfree(zsl);
4211 }
4212
4213 static int zslRandomLevel(void) {
4214 int level = 1;
4215 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4216 level += 1;
4217 return level;
4218 }
4219
4220 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4221 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4222 int i, level;
4223
4224 x = zsl->header;
4225 for (i = zsl->level-1; i >= 0; i--) {
4226 while (x->forward[i] &&
4227 (x->forward[i]->score < score ||
4228 (x->forward[i]->score == score &&
4229 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4230 x = x->forward[i];
4231 update[i] = x;
4232 }
4233 /* we assume the key is not already inside, since we allow duplicated
4234 * scores, and the re-insertion of score and redis object should never
4235 * happpen since the caller of zslInsert() should test in the hash table
4236 * if the element is already inside or not. */
4237 level = zslRandomLevel();
4238 if (level > zsl->level) {
4239 for (i = zsl->level; i < level; i++)
4240 update[i] = zsl->header;
4241 zsl->level = level;
4242 }
4243 x = zslCreateNode(level,score,obj);
4244 for (i = 0; i < level; i++) {
4245 x->forward[i] = update[i]->forward[i];
4246 update[i]->forward[i] = x;
4247 }
4248 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4249 if (x->forward[0])
4250 x->forward[0]->backward = x;
4251 else
4252 zsl->tail = x;
4253 zsl->length++;
4254 }
4255
4256 /* Delete an element with matching score/object from the skiplist. */
4257 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4258 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4259 int i;
4260
4261 x = zsl->header;
4262 for (i = zsl->level-1; i >= 0; i--) {
4263 while (x->forward[i] &&
4264 (x->forward[i]->score < score ||
4265 (x->forward[i]->score == score &&
4266 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4267 x = x->forward[i];
4268 update[i] = x;
4269 }
4270 /* We may have multiple elements with the same score, what we need
4271 * is to find the element with both the right score and object. */
4272 x = x->forward[0];
4273 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4274 for (i = 0; i < zsl->level; i++) {
4275 if (update[i]->forward[i] != x) break;
4276 update[i]->forward[i] = x->forward[i];
4277 }
4278 if (x->forward[0]) {
4279 x->forward[0]->backward = (x->backward == zsl->header) ?
4280 NULL : x->backward;
4281 } else {
4282 zsl->tail = x->backward;
4283 }
4284 zslFreeNode(x);
4285 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4286 zsl->level--;
4287 zsl->length--;
4288 return 1;
4289 } else {
4290 return 0; /* not found */
4291 }
4292 return 0; /* not found */
4293 }
4294
4295 /* Delete all the elements with score between min and max from the skiplist.
4296 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4297 * Note that this function takes the reference to the hash table view of the
4298 * sorted set, in order to remove the elements from the hash table too. */
4299 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4300 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4301 unsigned long removed = 0;
4302 int i;
4303
4304 x = zsl->header;
4305 for (i = zsl->level-1; i >= 0; i--) {
4306 while (x->forward[i] && x->forward[i]->score < min)
4307 x = x->forward[i];
4308 update[i] = x;
4309 }
4310 /* We may have multiple elements with the same score, what we need
4311 * is to find the element with both the right score and object. */
4312 x = x->forward[0];
4313 while (x && x->score <= max) {
4314 zskiplistNode *next;
4315
4316 for (i = 0; i < zsl->level; i++) {
4317 if (update[i]->forward[i] != x) break;
4318 update[i]->forward[i] = x->forward[i];
4319 }
4320 if (x->forward[0]) {
4321 x->forward[0]->backward = (x->backward == zsl->header) ?
4322 NULL : x->backward;
4323 } else {
4324 zsl->tail = x->backward;
4325 }
4326 next = x->forward[0];
4327 dictDelete(dict,x->obj);
4328 zslFreeNode(x);
4329 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4330 zsl->level--;
4331 zsl->length--;
4332 removed++;
4333 x = next;
4334 }
4335 return removed; /* not found */
4336 }
4337
4338 /* Find the first node having a score equal or greater than the specified one.
4339 * Returns NULL if there is no match. */
4340 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4341 zskiplistNode *x;
4342 int i;
4343
4344 x = zsl->header;
4345 for (i = zsl->level-1; i >= 0; i--) {
4346 while (x->forward[i] && x->forward[i]->score < score)
4347 x = x->forward[i];
4348 }
4349 /* We may have multiple elements with the same score, what we need
4350 * is to find the element with both the right score and object. */
4351 return x->forward[0];
4352 }
4353
4354 /* The actual Z-commands implementations */
4355
4356 /* This generic command implements both ZADD and ZINCRBY.
4357 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4358 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4359 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4360 robj *zsetobj;
4361 zset *zs;
4362 double *score;
4363
4364 zsetobj = lookupKeyWrite(c->db,key);
4365 if (zsetobj == NULL) {
4366 zsetobj = createZsetObject();
4367 dictAdd(c->db->dict,key,zsetobj);
4368 incrRefCount(key);
4369 } else {
4370 if (zsetobj->type != REDIS_ZSET) {
4371 addReply(c,shared.wrongtypeerr);
4372 return;
4373 }
4374 }
4375 zs = zsetobj->ptr;
4376
4377 /* Ok now since we implement both ZADD and ZINCRBY here the code
4378 * needs to handle the two different conditions. It's all about setting
4379 * '*score', that is, the new score to set, to the right value. */
4380 score = zmalloc(sizeof(double));
4381 if (doincrement) {
4382 dictEntry *de;
4383
4384 /* Read the old score. If the element was not present starts from 0 */
4385 de = dictFind(zs->dict,ele);
4386 if (de) {
4387 double *oldscore = dictGetEntryVal(de);
4388 *score = *oldscore + scoreval;
4389 } else {
4390 *score = scoreval;
4391 }
4392 } else {
4393 *score = scoreval;
4394 }
4395
4396 /* What follows is a simple remove and re-insert operation that is common
4397 * to both ZADD and ZINCRBY... */
4398 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4399 /* case 1: New element */
4400 incrRefCount(ele); /* added to hash */
4401 zslInsert(zs->zsl,*score,ele);
4402 incrRefCount(ele); /* added to skiplist */
4403 server.dirty++;
4404 if (doincrement)
4405 addReplyDouble(c,*score);
4406 else
4407 addReply(c,shared.cone);
4408 } else {
4409 dictEntry *de;
4410 double *oldscore;
4411
4412 /* case 2: Score update operation */
4413 de = dictFind(zs->dict,ele);
4414 redisAssert(de != NULL);
4415 oldscore = dictGetEntryVal(de);
4416 if (*score != *oldscore) {
4417 int deleted;
4418
4419 /* Remove and insert the element in the skip list with new score */
4420 deleted = zslDelete(zs->zsl,*oldscore,ele);
4421 redisAssert(deleted != 0);
4422 zslInsert(zs->zsl,*score,ele);
4423 incrRefCount(ele);
4424 /* Update the score in the hash table */
4425 dictReplace(zs->dict,ele,score);
4426 server.dirty++;
4427 } else {
4428 zfree(score);
4429 }
4430 if (doincrement)
4431 addReplyDouble(c,*score);
4432 else
4433 addReply(c,shared.czero);
4434 }
4435 }
4436
4437 static void zaddCommand(redisClient *c) {
4438 double scoreval;
4439
4440 scoreval = strtod(c->argv[2]->ptr,NULL);
4441 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4442 }
4443
4444 static void zincrbyCommand(redisClient *c) {
4445 double scoreval;
4446
4447 scoreval = strtod(c->argv[2]->ptr,NULL);
4448 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4449 }
4450
4451 static void zremCommand(redisClient *c) {
4452 robj *zsetobj;
4453 zset *zs;
4454
4455 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4456 if (zsetobj == NULL) {
4457 addReply(c,shared.czero);
4458 } else {
4459 dictEntry *de;
4460 double *oldscore;
4461 int deleted;
4462
4463 if (zsetobj->type != REDIS_ZSET) {
4464 addReply(c,shared.wrongtypeerr);
4465 return;
4466 }
4467 zs = zsetobj->ptr;
4468 de = dictFind(zs->dict,c->argv[2]);
4469 if (de == NULL) {
4470 addReply(c,shared.czero);
4471 return;
4472 }
4473 /* Delete from the skiplist */
4474 oldscore = dictGetEntryVal(de);
4475 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4476 redisAssert(deleted != 0);
4477
4478 /* Delete from the hash table */
4479 dictDelete(zs->dict,c->argv[2]);
4480 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4481 server.dirty++;
4482 addReply(c,shared.cone);
4483 }
4484 }
4485
4486 static void zremrangebyscoreCommand(redisClient *c) {
4487 double min = strtod(c->argv[2]->ptr,NULL);
4488 double max = strtod(c->argv[3]->ptr,NULL);
4489 robj *zsetobj;
4490 zset *zs;
4491
4492 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4493 if (zsetobj == NULL) {
4494 addReply(c,shared.czero);
4495 } else {
4496 long deleted;
4497
4498 if (zsetobj->type != REDIS_ZSET) {
4499 addReply(c,shared.wrongtypeerr);
4500 return;
4501 }
4502 zs = zsetobj->ptr;
4503 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4504 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4505 server.dirty += deleted;
4506 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4507 }
4508 }
4509
4510 static void zrangeGenericCommand(redisClient *c, int reverse) {
4511 robj *o;
4512 int start = atoi(c->argv[2]->ptr);
4513 int end = atoi(c->argv[3]->ptr);
4514
4515 o = lookupKeyRead(c->db,c->argv[1]);
4516 if (o == NULL) {
4517 addReply(c,shared.nullmultibulk);
4518 } else {
4519 if (o->type != REDIS_ZSET) {
4520 addReply(c,shared.wrongtypeerr);
4521 } else {
4522 zset *zsetobj = o->ptr;
4523 zskiplist *zsl = zsetobj->zsl;
4524 zskiplistNode *ln;
4525
4526 int llen = zsl->length;
4527 int rangelen, j;
4528 robj *ele;
4529
4530 /* convert negative indexes */
4531 if (start < 0) start = llen+start;
4532 if (end < 0) end = llen+end;
4533 if (start < 0) start = 0;
4534 if (end < 0) end = 0;
4535
4536 /* indexes sanity checks */
4537 if (start > end || start >= llen) {
4538 /* Out of range start or start > end result in empty list */
4539 addReply(c,shared.emptymultibulk);
4540 return;
4541 }
4542 if (end >= llen) end = llen-1;
4543 rangelen = (end-start)+1;
4544
4545 /* Return the result in form of a multi-bulk reply */
4546 if (reverse) {
4547 ln = zsl->tail;
4548 while (start--)
4549 ln = ln->backward;
4550 } else {
4551 ln = zsl->header->forward[0];
4552 while (start--)
4553 ln = ln->forward[0];
4554 }
4555
4556 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4557 for (j = 0; j < rangelen; j++) {
4558 ele = ln->obj;
4559 addReplyBulkLen(c,ele);
4560 addReply(c,ele);
4561 addReply(c,shared.crlf);
4562 ln = reverse ? ln->backward : ln->forward[0];
4563 }
4564 }
4565 }
4566 }
4567
4568 static void zrangeCommand(redisClient *c) {
4569 zrangeGenericCommand(c,0);
4570 }
4571
4572 static void zrevrangeCommand(redisClient *c) {
4573 zrangeGenericCommand(c,1);
4574 }
4575
4576 static void zrangebyscoreCommand(redisClient *c) {
4577 robj *o;
4578 double min = strtod(c->argv[2]->ptr,NULL);
4579 double max = strtod(c->argv[3]->ptr,NULL);
4580 int offset = 0, limit = -1;
4581
4582 if (c->argc != 4 && c->argc != 7) {
4583 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4584 return;
4585 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4586 addReply(c,shared.syntaxerr);
4587 return;
4588 } else if (c->argc == 7) {
4589 offset = atoi(c->argv[5]->ptr);
4590 limit = atoi(c->argv[6]->ptr);
4591 if (offset < 0) offset = 0;
4592 }
4593
4594 o = lookupKeyRead(c->db,c->argv[1]);
4595 if (o == NULL) {
4596 addReply(c,shared.nullmultibulk);
4597 } else {
4598 if (o->type != REDIS_ZSET) {
4599 addReply(c,shared.wrongtypeerr);
4600 } else {
4601 zset *zsetobj = o->ptr;
4602 zskiplist *zsl = zsetobj->zsl;
4603 zskiplistNode *ln;
4604 robj *ele, *lenobj;
4605 unsigned int rangelen = 0;
4606
4607 /* Get the first node with the score >= min */
4608 ln = zslFirstWithScore(zsl,min);
4609 if (ln == NULL) {
4610 /* No element matching the speciifed interval */
4611 addReply(c,shared.emptymultibulk);
4612 return;
4613 }
4614
4615 /* We don't know in advance how many matching elements there
4616 * are in the list, so we push this object that will represent
4617 * the multi-bulk length in the output buffer, and will "fix"
4618 * it later */
4619 lenobj = createObject(REDIS_STRING,NULL);
4620 addReply(c,lenobj);
4621 decrRefCount(lenobj);
4622
4623 while(ln && ln->score <= max) {
4624 if (offset) {
4625 offset--;
4626 ln = ln->forward[0];
4627 continue;
4628 }
4629 if (limit == 0) break;
4630 ele = ln->obj;
4631 addReplyBulkLen(c,ele);
4632 addReply(c,ele);
4633 addReply(c,shared.crlf);
4634 ln = ln->forward[0];
4635 rangelen++;
4636 if (limit > 0) limit--;
4637 }
4638 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4639 }
4640 }
4641 }
4642
4643 static void zcardCommand(redisClient *c) {
4644 robj *o;
4645 zset *zs;
4646
4647 o = lookupKeyRead(c->db,c->argv[1]);
4648 if (o == NULL) {
4649 addReply(c,shared.czero);
4650 return;
4651 } else {
4652 if (o->type != REDIS_ZSET) {
4653 addReply(c,shared.wrongtypeerr);
4654 } else {
4655 zs = o->ptr;
4656 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4657 }
4658 }
4659 }
4660
4661 static void zscoreCommand(redisClient *c) {
4662 robj *o;
4663 zset *zs;
4664
4665 o = lookupKeyRead(c->db,c->argv[1]);
4666 if (o == NULL) {
4667 addReply(c,shared.nullbulk);
4668 return;
4669 } else {
4670 if (o->type != REDIS_ZSET) {
4671 addReply(c,shared.wrongtypeerr);
4672 } else {
4673 dictEntry *de;
4674
4675 zs = o->ptr;
4676 de = dictFind(zs->dict,c->argv[2]);
4677 if (!de) {
4678 addReply(c,shared.nullbulk);
4679 } else {
4680 double *score = dictGetEntryVal(de);
4681
4682 addReplyDouble(c,*score);
4683 }
4684 }
4685 }
4686 }
4687
4688 /* ========================= Non type-specific commands ==================== */
4689
4690 static void flushdbCommand(redisClient *c) {
4691 server.dirty += dictSize(c->db->dict);
4692 dictEmpty(c->db->dict);
4693 dictEmpty(c->db->expires);
4694 addReply(c,shared.ok);
4695 }
4696
4697 static void flushallCommand(redisClient *c) {
4698 server.dirty += emptyDb();
4699 addReply(c,shared.ok);
4700 rdbSave(server.dbfilename);
4701 server.dirty++;
4702 }
4703
4704 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4705 redisSortOperation *so = zmalloc(sizeof(*so));
4706 so->type = type;
4707 so->pattern = pattern;
4708 return so;
4709 }
4710
4711 /* Return the value associated to the key with a name obtained
4712 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4713 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4714 char *p;
4715 sds spat, ssub;
4716 robj keyobj;
4717 int prefixlen, sublen, postfixlen;
4718 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4719 struct {
4720 long len;
4721 long free;
4722 char buf[REDIS_SORTKEY_MAX+1];
4723 } keyname;
4724
4725 /* If the pattern is "#" return the substitution object itself in order
4726 * to implement the "SORT ... GET #" feature. */
4727 spat = pattern->ptr;
4728 if (spat[0] == '#' && spat[1] == '\0') {
4729 return subst;
4730 }
4731
4732 /* The substitution object may be specially encoded. If so we create
4733 * a decoded object on the fly. Otherwise getDecodedObject will just
4734 * increment the ref count, that we'll decrement later. */
4735 subst = getDecodedObject(subst);
4736
4737 ssub = subst->ptr;
4738 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4739 p = strchr(spat,'*');
4740 if (!p) {
4741 decrRefCount(subst);
4742 return NULL;
4743 }
4744
4745 prefixlen = p-spat;
4746 sublen = sdslen(ssub);
4747 postfixlen = sdslen(spat)-(prefixlen+1);
4748 memcpy(keyname.buf,spat,prefixlen);
4749 memcpy(keyname.buf+prefixlen,ssub,sublen);
4750 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4751 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4752 keyname.len = prefixlen+sublen+postfixlen;
4753
4754 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4755 decrRefCount(subst);
4756
4757 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4758 return lookupKeyRead(db,&keyobj);
4759 }
4760
4761 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4762 * the additional parameter is not standard but a BSD-specific we have to
4763 * pass sorting parameters via the global 'server' structure */
4764 static int sortCompare(const void *s1, const void *s2) {
4765 const redisSortObject *so1 = s1, *so2 = s2;
4766 int cmp;
4767
4768 if (!server.sort_alpha) {
4769 /* Numeric sorting. Here it's trivial as we precomputed scores */
4770 if (so1->u.score > so2->u.score) {
4771 cmp = 1;
4772 } else if (so1->u.score < so2->u.score) {
4773 cmp = -1;
4774 } else {
4775 cmp = 0;
4776 }
4777 } else {
4778 /* Alphanumeric sorting */
4779 if (server.sort_bypattern) {
4780 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4781 /* At least one compare object is NULL */
4782 if (so1->u.cmpobj == so2->u.cmpobj)
4783 cmp = 0;
4784 else if (so1->u.cmpobj == NULL)
4785 cmp = -1;
4786 else
4787 cmp = 1;
4788 } else {
4789 /* We have both the objects, use strcoll */
4790 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4791 }
4792 } else {
4793 /* Compare elements directly */
4794 robj *dec1, *dec2;
4795
4796 dec1 = getDecodedObject(so1->obj);
4797 dec2 = getDecodedObject(so2->obj);
4798 cmp = strcoll(dec1->ptr,dec2->ptr);
4799 decrRefCount(dec1);
4800 decrRefCount(dec2);
4801 }
4802 }
4803 return server.sort_desc ? -cmp : cmp;
4804 }
4805
4806 /* The SORT command is the most complex command in Redis. Warning: this code
4807 * is optimized for speed and a bit less for readability */
4808 static void sortCommand(redisClient *c) {
4809 list *operations;
4810 int outputlen = 0;
4811 int desc = 0, alpha = 0;
4812 int limit_start = 0, limit_count = -1, start, end;
4813 int j, dontsort = 0, vectorlen;
4814 int getop = 0; /* GET operation counter */
4815 robj *sortval, *sortby = NULL, *storekey = NULL;
4816 redisSortObject *vector; /* Resulting vector to sort */
4817
4818 /* Lookup the key to sort. It must be of the right types */
4819 sortval = lookupKeyRead(c->db,c->argv[1]);
4820 if (sortval == NULL) {
4821 addReply(c,shared.nokeyerr);
4822 return;
4823 }
4824 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4825 sortval->type != REDIS_ZSET)
4826 {
4827 addReply(c,shared.wrongtypeerr);
4828 return;
4829 }
4830
4831 /* Create a list of operations to perform for every sorted element.
4832 * Operations can be GET/DEL/INCR/DECR */
4833 operations = listCreate();
4834 listSetFreeMethod(operations,zfree);
4835 j = 2;
4836
4837 /* Now we need to protect sortval incrementing its count, in the future
4838 * SORT may have options able to overwrite/delete keys during the sorting
4839 * and the sorted key itself may get destroied */
4840 incrRefCount(sortval);
4841
4842 /* The SORT command has an SQL-alike syntax, parse it */
4843 while(j < c->argc) {
4844 int leftargs = c->argc-j-1;
4845 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4846 desc = 0;
4847 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4848 desc = 1;
4849 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4850 alpha = 1;
4851 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4852 limit_start = atoi(c->argv[j+1]->ptr);
4853 limit_count = atoi(c->argv[j+2]->ptr);
4854 j+=2;
4855 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4856 storekey = c->argv[j+1];
4857 j++;
4858 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4859 sortby = c->argv[j+1];
4860 /* If the BY pattern does not contain '*', i.e. it is constant,
4861 * we don't need to sort nor to lookup the weight keys. */
4862 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4863 j++;
4864 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4865 listAddNodeTail(operations,createSortOperation(
4866 REDIS_SORT_GET,c->argv[j+1]));
4867 getop++;
4868 j++;
4869 } else {
4870 decrRefCount(sortval);
4871 listRelease(operations);
4872 addReply(c,shared.syntaxerr);
4873 return;
4874 }
4875 j++;
4876 }
4877
4878 /* Load the sorting vector with all the objects to sort */
4879 switch(sortval->type) {
4880 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
4881 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
4882 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
4883 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
4884 }
4885 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4886 j = 0;
4887
4888 if (sortval->type == REDIS_LIST) {
4889 list *list = sortval->ptr;
4890 listNode *ln;
4891
4892 listRewind(list);
4893 while((ln = listYield(list))) {
4894 robj *ele = ln->value;
4895 vector[j].obj = ele;
4896 vector[j].u.score = 0;
4897 vector[j].u.cmpobj = NULL;
4898 j++;
4899 }
4900 } else {
4901 dict *set;
4902 dictIterator *di;
4903 dictEntry *setele;
4904
4905 if (sortval->type == REDIS_SET) {
4906 set = sortval->ptr;
4907 } else {
4908 zset *zs = sortval->ptr;
4909 set = zs->dict;
4910 }
4911
4912 di = dictGetIterator(set);
4913 while((setele = dictNext(di)) != NULL) {
4914 vector[j].obj = dictGetEntryKey(setele);
4915 vector[j].u.score = 0;
4916 vector[j].u.cmpobj = NULL;
4917 j++;
4918 }
4919 dictReleaseIterator(di);
4920 }
4921 redisAssert(j == vectorlen);
4922
4923 /* Now it's time to load the right scores in the sorting vector */
4924 if (dontsort == 0) {
4925 for (j = 0; j < vectorlen; j++) {
4926 if (sortby) {
4927 robj *byval;
4928
4929 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4930 if (!byval || byval->type != REDIS_STRING) continue;
4931 if (alpha) {
4932 vector[j].u.cmpobj = getDecodedObject(byval);
4933 } else {
4934 if (byval->encoding == REDIS_ENCODING_RAW) {
4935 vector[j].u.score = strtod(byval->ptr,NULL);
4936 } else {
4937 /* Don't need to decode the object if it's
4938 * integer-encoded (the only encoding supported) so
4939 * far. We can just cast it */
4940 if (byval->encoding == REDIS_ENCODING_INT) {
4941 vector[j].u.score = (long)byval->ptr;
4942 } else
4943 redisAssert(1 != 1);
4944 }
4945 }
4946 } else {
4947 if (!alpha) {
4948 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4949 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4950 else {
4951 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4952 vector[j].u.score = (long) vector[j].obj->ptr;
4953 else
4954 redisAssert(1 != 1);
4955 }
4956 }
4957 }
4958 }
4959 }
4960
4961 /* We are ready to sort the vector... perform a bit of sanity check
4962 * on the LIMIT option too. We'll use a partial version of quicksort. */
4963 start = (limit_start < 0) ? 0 : limit_start;
4964 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4965 if (start >= vectorlen) {
4966 start = vectorlen-1;
4967 end = vectorlen-2;
4968 }
4969 if (end >= vectorlen) end = vectorlen-1;
4970
4971 if (dontsort == 0) {
4972 server.sort_desc = desc;
4973 server.sort_alpha = alpha;
4974 server.sort_bypattern = sortby ? 1 : 0;
4975 if (sortby && (start != 0 || end != vectorlen-1))
4976 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4977 else
4978 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4979 }
4980
4981 /* Send command output to the output buffer, performing the specified
4982 * GET/DEL/INCR/DECR operations if any. */
4983 outputlen = getop ? getop*(end-start+1) : end-start+1;
4984 if (storekey == NULL) {
4985 /* STORE option not specified, sent the sorting result to client */
4986 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4987 for (j = start; j <= end; j++) {
4988 listNode *ln;
4989 if (!getop) {
4990 addReplyBulkLen(c,vector[j].obj);
4991 addReply(c,vector[j].obj);
4992 addReply(c,shared.crlf);
4993 }
4994 listRewind(operations);
4995 while((ln = listYield(operations))) {
4996 redisSortOperation *sop = ln->value;
4997 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4998 vector[j].obj);
4999
5000 if (sop->type == REDIS_SORT_GET) {
5001 if (!val || val->type != REDIS_STRING) {
5002 addReply(c,shared.nullbulk);
5003 } else {
5004 addReplyBulkLen(c,val);
5005 addReply(c,val);
5006 addReply(c,shared.crlf);
5007 }
5008 } else {
5009 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5010 }
5011 }
5012 }
5013 } else {
5014 robj *listObject = createListObject();
5015 list *listPtr = (list*) listObject->ptr;
5016
5017 /* STORE option specified, set the sorting result as a List object */
5018 for (j = start; j <= end; j++) {
5019 listNode *ln;
5020 if (!getop) {
5021 listAddNodeTail(listPtr,vector[j].obj);
5022 incrRefCount(vector[j].obj);
5023 }
5024 listRewind(operations);
5025 while((ln = listYield(operations))) {
5026 redisSortOperation *sop = ln->value;
5027 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5028 vector[j].obj);
5029
5030 if (sop->type == REDIS_SORT_GET) {
5031 if (!val || val->type != REDIS_STRING) {
5032 listAddNodeTail(listPtr,createStringObject("",0));
5033 } else {
5034 listAddNodeTail(listPtr,val);
5035 incrRefCount(val);
5036 }
5037 } else {
5038 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5039 }
5040 }
5041 }
5042 if (dictReplace(c->db->dict,storekey,listObject)) {
5043 incrRefCount(storekey);
5044 }
5045 /* Note: we add 1 because the DB is dirty anyway since even if the
5046 * SORT result is empty a new key is set and maybe the old content
5047 * replaced. */
5048 server.dirty += 1+outputlen;
5049 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5050 }
5051
5052 /* Cleanup */
5053 decrRefCount(sortval);
5054 listRelease(operations);
5055 for (j = 0; j < vectorlen; j++) {
5056 if (sortby && alpha && vector[j].u.cmpobj)
5057 decrRefCount(vector[j].u.cmpobj);
5058 }
5059 zfree(vector);
5060 }
5061
5062 /* Create the string returned by the INFO command. This is decoupled
5063 * by the INFO command itself as we need to report the same information
5064 * on memory corruption problems. */
5065 static sds genRedisInfoString(void) {
5066 sds info;
5067 time_t uptime = time(NULL)-server.stat_starttime;
5068 int j;
5069
5070 info = sdscatprintf(sdsempty(),
5071 "redis_version:%s\r\n"
5072 "arch_bits:%s\r\n"
5073 "multiplexing_api:%s\r\n"
5074 "uptime_in_seconds:%ld\r\n"
5075 "uptime_in_days:%ld\r\n"
5076 "connected_clients:%d\r\n"
5077 "connected_slaves:%d\r\n"
5078 "used_memory:%zu\r\n"
5079 "changes_since_last_save:%lld\r\n"
5080 "bgsave_in_progress:%d\r\n"
5081 "last_save_time:%ld\r\n"
5082 "total_connections_received:%lld\r\n"
5083 "total_commands_processed:%lld\r\n"
5084 "role:%s\r\n"
5085 ,REDIS_VERSION,
5086 (sizeof(long) == 8) ? "64" : "32",
5087 aeGetApiName(),
5088 uptime,
5089 uptime/(3600*24),
5090 listLength(server.clients)-listLength(server.slaves),
5091 listLength(server.slaves),
5092 server.usedmemory,
5093 server.dirty,
5094 server.bgsavechildpid != -1,
5095 server.lastsave,
5096 server.stat_numconnections,
5097 server.stat_numcommands,
5098 server.masterhost == NULL ? "master" : "slave"
5099 );
5100 if (server.masterhost) {
5101 info = sdscatprintf(info,
5102 "master_host:%s\r\n"
5103 "master_port:%d\r\n"
5104 "master_link_status:%s\r\n"
5105 "master_last_io_seconds_ago:%d\r\n"
5106 ,server.masterhost,
5107 server.masterport,
5108 (server.replstate == REDIS_REPL_CONNECTED) ?
5109 "up" : "down",
5110 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5111 );
5112 }
5113 for (j = 0; j < server.dbnum; j++) {
5114 long long keys, vkeys;
5115
5116 keys = dictSize(server.db[j].dict);
5117 vkeys = dictSize(server.db[j].expires);
5118 if (keys || vkeys) {
5119 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5120 j, keys, vkeys);
5121 }
5122 }
5123 return info;
5124 }
5125
5126 static void infoCommand(redisClient *c) {
5127 sds info = genRedisInfoString();
5128 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",sdslen(info)));
5129 addReplySds(c,info);
5130 addReply(c,shared.crlf);
5131 }
5132
5133 static void monitorCommand(redisClient *c) {
5134 /* ignore MONITOR if aleady slave or in monitor mode */
5135 if (c->flags & REDIS_SLAVE) return;
5136
5137 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5138 c->slaveseldb = 0;
5139 listAddNodeTail(server.monitors,c);
5140 addReply(c,shared.ok);
5141 }
5142
5143 /* ================================= Expire ================================= */
5144 static int removeExpire(redisDb *db, robj *key) {
5145 if (dictDelete(db->expires,key) == DICT_OK) {
5146 return 1;
5147 } else {
5148 return 0;
5149 }
5150 }
5151
5152 static int setExpire(redisDb *db, robj *key, time_t when) {
5153 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5154 return 0;
5155 } else {
5156 incrRefCount(key);
5157 return 1;
5158 }
5159 }
5160
5161 /* Return the expire time of the specified key, or -1 if no expire
5162 * is associated with this key (i.e. the key is non volatile) */
5163 static time_t getExpire(redisDb *db, robj *key) {
5164 dictEntry *de;
5165
5166 /* No expire? return ASAP */
5167 if (dictSize(db->expires) == 0 ||
5168 (de = dictFind(db->expires,key)) == NULL) return -1;
5169
5170 return (time_t) dictGetEntryVal(de);
5171 }
5172
5173 static int expireIfNeeded(redisDb *db, robj *key) {
5174 time_t when;
5175 dictEntry *de;
5176
5177 /* No expire? return ASAP */
5178 if (dictSize(db->expires) == 0 ||
5179 (de = dictFind(db->expires,key)) == NULL) return 0;
5180
5181 /* Lookup the expire */
5182 when = (time_t) dictGetEntryVal(de);
5183 if (time(NULL) <= when) return 0;
5184
5185 /* Delete the key */
5186 dictDelete(db->expires,key);
5187 return dictDelete(db->dict,key) == DICT_OK;
5188 }
5189
5190 static int deleteIfVolatile(redisDb *db, robj *key) {
5191 dictEntry *de;
5192
5193 /* No expire? return ASAP */
5194 if (dictSize(db->expires) == 0 ||
5195 (de = dictFind(db->expires,key)) == NULL) return 0;
5196
5197 /* Delete the key */
5198 server.dirty++;
5199 dictDelete(db->expires,key);
5200 return dictDelete(db->dict,key) == DICT_OK;
5201 }
5202
5203 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5204 dictEntry *de;
5205
5206 de = dictFind(c->db->dict,key);
5207 if (de == NULL) {
5208 addReply(c,shared.czero);
5209 return;
5210 }
5211 if (seconds < 0) {
5212 if (deleteKey(c->db,key)) server.dirty++;
5213 addReply(c, shared.cone);
5214 return;
5215 } else {
5216 time_t when = time(NULL)+seconds;
5217 if (setExpire(c->db,key,when)) {
5218 addReply(c,shared.cone);
5219 server.dirty++;
5220 } else {
5221 addReply(c,shared.czero);
5222 }
5223 return;
5224 }
5225 }
5226
5227 static void expireCommand(redisClient *c) {
5228 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5229 }
5230
5231 static void expireatCommand(redisClient *c) {
5232 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5233 }
5234
5235 static void ttlCommand(redisClient *c) {
5236 time_t expire;
5237 int ttl = -1;
5238
5239 expire = getExpire(c->db,c->argv[1]);
5240 if (expire != -1) {
5241 ttl = (int) (expire-time(NULL));
5242 if (ttl < 0) ttl = -1;
5243 }
5244 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5245 }
5246
5247 /* =============================== Replication ============================= */
5248
5249 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5250 ssize_t nwritten, ret = size;
5251 time_t start = time(NULL);
5252
5253 timeout++;
5254 while(size) {
5255 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5256 nwritten = write(fd,ptr,size);
5257 if (nwritten == -1) return -1;
5258 ptr += nwritten;
5259 size -= nwritten;
5260 }
5261 if ((time(NULL)-start) > timeout) {
5262 errno = ETIMEDOUT;
5263 return -1;
5264 }
5265 }
5266 return ret;
5267 }
5268
5269 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5270 ssize_t nread, totread = 0;
5271 time_t start = time(NULL);
5272
5273 timeout++;
5274 while(size) {
5275 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5276 nread = read(fd,ptr,size);
5277 if (nread == -1) return -1;
5278 ptr += nread;
5279 size -= nread;
5280 totread += nread;
5281 }
5282 if ((time(NULL)-start) > timeout) {
5283 errno = ETIMEDOUT;
5284 return -1;
5285 }
5286 }
5287 return totread;
5288 }
5289
5290 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
5291 ssize_t nread = 0;
5292
5293 size--;
5294 while(size) {
5295 char c;
5296
5297 if (syncRead(fd,&c,1,timeout) == -1) return -1;
5298 if (c == '\n') {
5299 *ptr = '\0';
5300 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
5301 return nread;
5302 } else {
5303 *ptr++ = c;
5304 *ptr = '\0';
5305 nread++;
5306 }
5307 }
5308 return nread;
5309 }
5310
5311 static void syncCommand(redisClient *c) {
5312 /* ignore SYNC if aleady slave or in monitor mode */
5313 if (c->flags & REDIS_SLAVE) return;
5314
5315 /* SYNC can't be issued when the server has pending data to send to
5316 * the client about already issued commands. We need a fresh reply
5317 * buffer registering the differences between the BGSAVE and the current
5318 * dataset, so that we can copy to other slaves if needed. */
5319 if (listLength(c->reply) != 0) {
5320 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5321 return;
5322 }
5323
5324 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5325 /* Here we need to check if there is a background saving operation
5326 * in progress, or if it is required to start one */
5327 if (server.bgsavechildpid != -1) {
5328 /* Ok a background save is in progress. Let's check if it is a good
5329 * one for replication, i.e. if there is another slave that is
5330 * registering differences since the server forked to save */
5331 redisClient *slave;
5332 listNode *ln;
5333
5334 listRewind(server.slaves);
5335 while((ln = listYield(server.slaves))) {
5336 slave = ln->value;
5337 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5338 }
5339 if (ln) {
5340 /* Perfect, the server is already registering differences for
5341 * another slave. Set the right state, and copy the buffer. */
5342 listRelease(c->reply);
5343 c->reply = listDup(slave->reply);
5344 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5345 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5346 } else {
5347 /* No way, we need to wait for the next BGSAVE in order to
5348 * register differences */
5349 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5350 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5351 }
5352 } else {
5353 /* Ok we don't have a BGSAVE in progress, let's start one */
5354 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5355 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5356 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5357 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5358 return;
5359 }
5360 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5361 }
5362 c->repldbfd = -1;
5363 c->flags |= REDIS_SLAVE;
5364 c->slaveseldb = 0;
5365 listAddNodeTail(server.slaves,c);
5366 return;
5367 }
5368
5369 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5370 redisClient *slave = privdata;
5371 REDIS_NOTUSED(el);
5372 REDIS_NOTUSED(mask);
5373 char buf[REDIS_IOBUF_LEN];
5374 ssize_t nwritten, buflen;
5375
5376 if (slave->repldboff == 0) {
5377 /* Write the bulk write count before to transfer the DB. In theory here
5378 * we don't know how much room there is in the output buffer of the
5379 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5380 * operations) will never be smaller than the few bytes we need. */
5381 sds bulkcount;
5382
5383 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5384 slave->repldbsize);
5385 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5386 {
5387 sdsfree(bulkcount);
5388 freeClient(slave);
5389 return;
5390 }
5391 sdsfree(bulkcount);
5392 }
5393 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5394 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5395 if (buflen <= 0) {
5396 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5397 (buflen == 0) ? "premature EOF" : strerror(errno));
5398 freeClient(slave);
5399 return;
5400 }
5401 if ((nwritten = write(fd,buf,buflen)) == -1) {
5402 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5403 strerror(errno));
5404 freeClient(slave);
5405 return;
5406 }
5407 slave->repldboff += nwritten;
5408 if (slave->repldboff == slave->repldbsize) {
5409 close(slave->repldbfd);
5410 slave->repldbfd = -1;
5411 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5412 slave->replstate = REDIS_REPL_ONLINE;
5413 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5414 sendReplyToClient, slave) == AE_ERR) {
5415 freeClient(slave);
5416 return;
5417 }
5418 addReplySds(slave,sdsempty());
5419 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5420 }
5421 }
5422
5423 /* This function is called at the end of every backgrond saving.
5424 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5425 * otherwise REDIS_ERR is passed to the function.
5426 *
5427 * The goal of this function is to handle slaves waiting for a successful
5428 * background saving in order to perform non-blocking synchronization. */
5429 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5430 listNode *ln;
5431 int startbgsave = 0;
5432
5433 listRewind(server.slaves);
5434 while((ln = listYield(server.slaves))) {
5435 redisClient *slave = ln->value;
5436
5437 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5438 startbgsave = 1;
5439 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5440 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5441 struct redis_stat buf;
5442
5443 if (bgsaveerr != REDIS_OK) {
5444 freeClient(slave);
5445 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5446 continue;
5447 }
5448 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5449 redis_fstat(slave->repldbfd,&buf) == -1) {
5450 freeClient(slave);
5451 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5452 continue;
5453 }
5454 slave->repldboff = 0;
5455 slave->repldbsize = buf.st_size;
5456 slave->replstate = REDIS_REPL_SEND_BULK;
5457 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5458 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5459 freeClient(slave);
5460 continue;
5461 }
5462 }
5463 }
5464 if (startbgsave) {
5465 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5466 listRewind(server.slaves);
5467 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5468 while((ln = listYield(server.slaves))) {
5469 redisClient *slave = ln->value;
5470
5471 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5472 freeClient(slave);
5473 }
5474 }
5475 }
5476 }
5477
5478 static int syncWithMaster(void) {
5479 char buf[1024], tmpfile[256], authcmd[1024];
5480 int dumpsize;
5481 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5482 int dfd;
5483
5484 if (fd == -1) {
5485 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5486 strerror(errno));
5487 return REDIS_ERR;
5488 }
5489
5490 /* AUTH with the master if required. */
5491 if(server.masterauth) {
5492 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5493 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5494 close(fd);
5495 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5496 strerror(errno));
5497 return REDIS_ERR;
5498 }
5499 /* Read the AUTH result. */
5500 if (syncReadLine(fd,buf,1024,3600) == -1) {
5501 close(fd);
5502 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5503 strerror(errno));
5504 return REDIS_ERR;
5505 }
5506 if (buf[0] != '+') {
5507 close(fd);
5508 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5509 return REDIS_ERR;
5510 }
5511 }
5512
5513 /* Issue the SYNC command */
5514 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5515 close(fd);
5516 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5517 strerror(errno));
5518 return REDIS_ERR;
5519 }
5520 /* Read the bulk write count */
5521 if (syncReadLine(fd,buf,1024,3600) == -1) {
5522 close(fd);
5523 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5524 strerror(errno));
5525 return REDIS_ERR;
5526 }
5527 if (buf[0] != '$') {
5528 close(fd);
5529 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5530 return REDIS_ERR;
5531 }
5532 dumpsize = atoi(buf+1);
5533 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5534 /* Read the bulk write data on a temp file */
5535 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5536 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5537 if (dfd == -1) {
5538 close(fd);
5539 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5540 return REDIS_ERR;
5541 }
5542 while(dumpsize) {
5543 int nread, nwritten;
5544
5545 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5546 if (nread == -1) {
5547 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5548 strerror(errno));
5549 close(fd);
5550 close(dfd);
5551 return REDIS_ERR;
5552 }
5553 nwritten = write(dfd,buf,nread);
5554 if (nwritten == -1) {
5555 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5556 close(fd);
5557 close(dfd);
5558 return REDIS_ERR;
5559 }
5560 dumpsize -= nread;
5561 }
5562 close(dfd);
5563 if (rename(tmpfile,server.dbfilename) == -1) {
5564 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5565 unlink(tmpfile);
5566 close(fd);
5567 return REDIS_ERR;
5568 }
5569 emptyDb();
5570 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5571 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5572 close(fd);
5573 return REDIS_ERR;
5574 }
5575 server.master = createClient(fd);
5576 server.master->flags |= REDIS_MASTER;
5577 server.replstate = REDIS_REPL_CONNECTED;
5578 return REDIS_OK;
5579 }
5580
5581 static void slaveofCommand(redisClient *c) {
5582 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5583 !strcasecmp(c->argv[2]->ptr,"one")) {
5584 if (server.masterhost) {
5585 sdsfree(server.masterhost);
5586 server.masterhost = NULL;
5587 if (server.master) freeClient(server.master);
5588 server.replstate = REDIS_REPL_NONE;
5589 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5590 }
5591 } else {
5592 sdsfree(server.masterhost);
5593 server.masterhost = sdsdup(c->argv[1]->ptr);
5594 server.masterport = atoi(c->argv[2]->ptr);
5595 if (server.master) freeClient(server.master);
5596 server.replstate = REDIS_REPL_CONNECT;
5597 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5598 server.masterhost, server.masterport);
5599 }
5600 addReply(c,shared.ok);
5601 }
5602
5603 /* ============================ Maxmemory directive ======================== */
5604
5605 /* This function gets called when 'maxmemory' is set on the config file to limit
5606 * the max memory used by the server, and we are out of memory.
5607 * This function will try to, in order:
5608 *
5609 * - Free objects from the free list
5610 * - Try to remove keys with an EXPIRE set
5611 *
5612 * It is not possible to free enough memory to reach used-memory < maxmemory
5613 * the server will start refusing commands that will enlarge even more the
5614 * memory usage.
5615 */
5616 static void freeMemoryIfNeeded(void) {
5617 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5618 if (listLength(server.objfreelist)) {
5619 robj *o;
5620
5621 listNode *head = listFirst(server.objfreelist);
5622 o = listNodeValue(head);
5623 listDelNode(server.objfreelist,head);
5624 zfree(o);
5625 } else {
5626 int j, k, freed = 0;
5627
5628 for (j = 0; j < server.dbnum; j++) {
5629 int minttl = -1;
5630 robj *minkey = NULL;
5631 struct dictEntry *de;
5632
5633 if (dictSize(server.db[j].expires)) {
5634 freed = 1;
5635 /* From a sample of three keys drop the one nearest to
5636 * the natural expire */
5637 for (k = 0; k < 3; k++) {
5638 time_t t;
5639
5640 de = dictGetRandomKey(server.db[j].expires);
5641 t = (time_t) dictGetEntryVal(de);
5642 if (minttl == -1 || t < minttl) {
5643 minkey = dictGetEntryKey(de);
5644 minttl = t;
5645 }
5646 }
5647 deleteKey(server.db+j,minkey);
5648 }
5649 }
5650 if (!freed) return; /* nothing to free... */
5651 }
5652 }
5653 }
5654
5655 /* ============================== Append Only file ========================== */
5656
5657 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5658 sds buf = sdsempty();
5659 int j;
5660 ssize_t nwritten;
5661 time_t now;
5662 robj *tmpargv[3];
5663
5664 /* The DB this command was targetting is not the same as the last command
5665 * we appendend. To issue a SELECT command is needed. */
5666 if (dictid != server.appendseldb) {
5667 char seldb[64];
5668
5669 snprintf(seldb,sizeof(seldb),"%d",dictid);
5670 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5671 strlen(seldb),seldb);
5672 server.appendseldb = dictid;
5673 }
5674
5675 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5676 * EXPIREs into EXPIREATs calls */
5677 if (cmd->proc == expireCommand) {
5678 long when;
5679
5680 tmpargv[0] = createStringObject("EXPIREAT",8);
5681 tmpargv[1] = argv[1];
5682 incrRefCount(argv[1]);
5683 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5684 tmpargv[2] = createObject(REDIS_STRING,
5685 sdscatprintf(sdsempty(),"%ld",when));
5686 argv = tmpargv;
5687 }
5688
5689 /* Append the actual command */
5690 buf = sdscatprintf(buf,"*%d\r\n",argc);
5691 for (j = 0; j < argc; j++) {
5692 robj *o = argv[j];
5693
5694 o = getDecodedObject(o);
5695 buf = sdscatprintf(buf,"$%lu\r\n",sdslen(o->ptr));
5696 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5697 buf = sdscatlen(buf,"\r\n",2);
5698 decrRefCount(o);
5699 }
5700
5701 /* Free the objects from the modified argv for EXPIREAT */
5702 if (cmd->proc == expireCommand) {
5703 for (j = 0; j < 3; j++)
5704 decrRefCount(argv[j]);
5705 }
5706
5707 /* We want to perform a single write. This should be guaranteed atomic
5708 * at least if the filesystem we are writing is a real physical one.
5709 * While this will save us against the server being killed I don't think
5710 * there is much to do about the whole server stopping for power problems
5711 * or alike */
5712 nwritten = write(server.appendfd,buf,sdslen(buf));
5713 if (nwritten != (signed)sdslen(buf)) {
5714 /* Ooops, we are in troubles. The best thing to do for now is
5715 * to simply exit instead to give the illusion that everything is
5716 * working as expected. */
5717 if (nwritten == -1) {
5718 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5719 } else {
5720 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5721 }
5722 exit(1);
5723 }
5724 /* If a background append only file rewriting is in progress we want to
5725 * accumulate the differences between the child DB and the current one
5726 * in a buffer, so that when the child process will do its work we
5727 * can append the differences to the new append only file. */
5728 if (server.bgrewritechildpid != -1)
5729 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5730
5731 sdsfree(buf);
5732 now = time(NULL);
5733 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5734 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5735 now-server.lastfsync > 1))
5736 {
5737 fsync(server.appendfd); /* Let's try to get this data on the disk */
5738 server.lastfsync = now;
5739 }
5740 }
5741
5742 /* In Redis commands are always executed in the context of a client, so in
5743 * order to load the append only file we need to create a fake client. */
5744 static struct redisClient *createFakeClient(void) {
5745 struct redisClient *c = zmalloc(sizeof(*c));
5746
5747 selectDb(c,0);
5748 c->fd = -1;
5749 c->querybuf = sdsempty();
5750 c->argc = 0;
5751 c->argv = NULL;
5752 c->flags = 0;
5753 /* We set the fake client as a slave waiting for the synchronization
5754 * so that Redis will not try to send replies to this client. */
5755 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5756 c->reply = listCreate();
5757 listSetFreeMethod(c->reply,decrRefCount);
5758 listSetDupMethod(c->reply,dupClientReplyValue);
5759 return c;
5760 }
5761
5762 static void freeFakeClient(struct redisClient *c) {
5763 sdsfree(c->querybuf);
5764 listRelease(c->reply);
5765 zfree(c);
5766 }
5767
5768 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5769 * error (the append only file is zero-length) REDIS_ERR is returned. On
5770 * fatal error an error message is logged and the program exists. */
5771 int loadAppendOnlyFile(char *filename) {
5772 struct redisClient *fakeClient;
5773 FILE *fp = fopen(filename,"r");
5774 struct redis_stat sb;
5775
5776 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5777 return REDIS_ERR;
5778
5779 if (fp == NULL) {
5780 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5781 exit(1);
5782 }
5783
5784 fakeClient = createFakeClient();
5785 while(1) {
5786 int argc, j;
5787 unsigned long len;
5788 robj **argv;
5789 char buf[128];
5790 sds argsds;
5791 struct redisCommand *cmd;
5792
5793 if (fgets(buf,sizeof(buf),fp) == NULL) {
5794 if (feof(fp))
5795 break;
5796 else
5797 goto readerr;
5798 }
5799 if (buf[0] != '*') goto fmterr;
5800 argc = atoi(buf+1);
5801 argv = zmalloc(sizeof(robj*)*argc);
5802 for (j = 0; j < argc; j++) {
5803 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5804 if (buf[0] != '$') goto fmterr;
5805 len = strtol(buf+1,NULL,10);
5806 argsds = sdsnewlen(NULL,len);
5807 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5808 argv[j] = createObject(REDIS_STRING,argsds);
5809 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5810 }
5811
5812 /* Command lookup */
5813 cmd = lookupCommand(argv[0]->ptr);
5814 if (!cmd) {
5815 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5816 exit(1);
5817 }
5818 /* Try object sharing and encoding */
5819 if (server.shareobjects) {
5820 int j;
5821 for(j = 1; j < argc; j++)
5822 argv[j] = tryObjectSharing(argv[j]);
5823 }
5824 if (cmd->flags & REDIS_CMD_BULK)
5825 tryObjectEncoding(argv[argc-1]);
5826 /* Run the command in the context of a fake client */
5827 fakeClient->argc = argc;
5828 fakeClient->argv = argv;
5829 cmd->proc(fakeClient);
5830 /* Discard the reply objects list from the fake client */
5831 while(listLength(fakeClient->reply))
5832 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5833 /* Clean up, ready for the next command */
5834 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5835 zfree(argv);
5836 }
5837 fclose(fp);
5838 freeFakeClient(fakeClient);
5839 return REDIS_OK;
5840
5841 readerr:
5842 if (feof(fp)) {
5843 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5844 } else {
5845 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5846 }
5847 exit(1);
5848 fmterr:
5849 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5850 exit(1);
5851 }
5852
5853 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5854 static int fwriteBulk(FILE *fp, robj *obj) {
5855 char buf[128];
5856 obj = getDecodedObject(obj);
5857 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5858 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5859 if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err;
5860 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5861 decrRefCount(obj);
5862 return 1;
5863 err:
5864 decrRefCount(obj);
5865 return 0;
5866 }
5867
5868 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
5869 static int fwriteBulkDouble(FILE *fp, double d) {
5870 char buf[128], dbuf[128];
5871
5872 snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
5873 snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2);
5874 if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
5875 if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0;
5876 return 1;
5877 }
5878
5879 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
5880 static int fwriteBulkLong(FILE *fp, long l) {
5881 char buf[128], lbuf[128];
5882
5883 snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l);
5884 snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(lbuf)-2);
5885 if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
5886 if (fwrite(lbuf,strlen(lbuf),1,fp) == 0) return 0;
5887 return 1;
5888 }
5889
5890 /* Write a sequence of commands able to fully rebuild the dataset into
5891 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5892 static int rewriteAppendOnlyFile(char *filename) {
5893 dictIterator *di = NULL;
5894 dictEntry *de;
5895 FILE *fp;
5896 char tmpfile[256];
5897 int j;
5898 time_t now = time(NULL);
5899
5900 /* Note that we have to use a different temp name here compared to the
5901 * one used by rewriteAppendOnlyFileBackground() function. */
5902 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5903 fp = fopen(tmpfile,"w");
5904 if (!fp) {
5905 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5906 return REDIS_ERR;
5907 }
5908 for (j = 0; j < server.dbnum; j++) {
5909 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5910 redisDb *db = server.db+j;
5911 dict *d = db->dict;
5912 if (dictSize(d) == 0) continue;
5913 di = dictGetIterator(d);
5914 if (!di) {
5915 fclose(fp);
5916 return REDIS_ERR;
5917 }
5918
5919 /* SELECT the new DB */
5920 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5921 if (fwriteBulkLong(fp,j) == 0) goto werr;
5922
5923 /* Iterate this DB writing every entry */
5924 while((de = dictNext(di)) != NULL) {
5925 robj *key = dictGetEntryKey(de);
5926 robj *o = dictGetEntryVal(de);
5927 time_t expiretime = getExpire(db,key);
5928
5929 /* Save the key and associated value */
5930 if (o->type == REDIS_STRING) {
5931 /* Emit a SET command */
5932 char cmd[]="*3\r\n$3\r\nSET\r\n";
5933 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5934 /* Key and value */
5935 if (fwriteBulk(fp,key) == 0) goto werr;
5936 if (fwriteBulk(fp,o) == 0) goto werr;
5937 } else if (o->type == REDIS_LIST) {
5938 /* Emit the RPUSHes needed to rebuild the list */
5939 list *list = o->ptr;
5940 listNode *ln;
5941
5942 listRewind(list);
5943 while((ln = listYield(list))) {
5944 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5945 robj *eleobj = listNodeValue(ln);
5946
5947 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5948 if (fwriteBulk(fp,key) == 0) goto werr;
5949 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5950 }
5951 } else if (o->type == REDIS_SET) {
5952 /* Emit the SADDs needed to rebuild the set */
5953 dict *set = o->ptr;
5954 dictIterator *di = dictGetIterator(set);
5955 dictEntry *de;
5956
5957 while((de = dictNext(di)) != NULL) {
5958 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5959 robj *eleobj = dictGetEntryKey(de);
5960
5961 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5962 if (fwriteBulk(fp,key) == 0) goto werr;
5963 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5964 }
5965 dictReleaseIterator(di);
5966 } else if (o->type == REDIS_ZSET) {
5967 /* Emit the ZADDs needed to rebuild the sorted set */
5968 zset *zs = o->ptr;
5969 dictIterator *di = dictGetIterator(zs->dict);
5970 dictEntry *de;
5971
5972 while((de = dictNext(di)) != NULL) {
5973 char cmd[]="*4\r\n$4\r\nZADD\r\n";
5974 robj *eleobj = dictGetEntryKey(de);
5975 double *score = dictGetEntryVal(de);
5976
5977 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5978 if (fwriteBulk(fp,key) == 0) goto werr;
5979 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
5980 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5981 }
5982 dictReleaseIterator(di);
5983 } else {
5984 redisAssert(0 != 0);
5985 }
5986 /* Save the expire time */
5987 if (expiretime != -1) {
5988 char cmd[]="*3\r\n$6\r\nEXPIRE\r\n";
5989 /* If this key is already expired skip it */
5990 if (expiretime < now) continue;
5991 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5992 if (fwriteBulk(fp,key) == 0) goto werr;
5993 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
5994 }
5995 }
5996 dictReleaseIterator(di);
5997 }
5998
5999 /* Make sure data will not remain on the OS's output buffers */
6000 fflush(fp);
6001 fsync(fileno(fp));
6002 fclose(fp);
6003
6004 /* Use RENAME to make sure the DB file is changed atomically only
6005 * if the generate DB file is ok. */
6006 if (rename(tmpfile,filename) == -1) {
6007 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6008 unlink(tmpfile);
6009 return REDIS_ERR;
6010 }
6011 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6012 return REDIS_OK;
6013
6014 werr:
6015 fclose(fp);
6016 unlink(tmpfile);
6017 redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno));
6018 if (di) dictReleaseIterator(di);
6019 return REDIS_ERR;
6020 }
6021
6022 /* This is how rewriting of the append only file in background works:
6023 *
6024 * 1) The user calls BGREWRITEAOF
6025 * 2) Redis calls this function, that forks():
6026 * 2a) the child rewrite the append only file in a temp file.
6027 * 2b) the parent accumulates differences in server.bgrewritebuf.
6028 * 3) When the child finished '2a' exists.
6029 * 4) The parent will trap the exit code, if it's OK, will append the
6030 * data accumulated into server.bgrewritebuf into the temp file, and
6031 * finally will rename(2) the temp file in the actual file name.
6032 * The the new file is reopened as the new append only file. Profit!
6033 */
6034 static int rewriteAppendOnlyFileBackground(void) {
6035 pid_t childpid;
6036
6037 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6038 if ((childpid = fork()) == 0) {
6039 /* Child */
6040 char tmpfile[256];
6041 close(server.fd);
6042
6043 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6044 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6045 exit(0);
6046 } else {
6047 exit(1);
6048 }
6049 } else {
6050 /* Parent */
6051 if (childpid == -1) {
6052 redisLog(REDIS_WARNING,
6053 "Can't rewrite append only file in background: fork: %s",
6054 strerror(errno));
6055 return REDIS_ERR;
6056 }
6057 redisLog(REDIS_NOTICE,
6058 "Background append only file rewriting started by pid %d",childpid);
6059 server.bgrewritechildpid = childpid;
6060 /* We set appendseldb to -1 in order to force the next call to the
6061 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6062 * accumulated by the parent into server.bgrewritebuf will start
6063 * with a SELECT statement and it will be safe to merge. */
6064 server.appendseldb = -1;
6065 return REDIS_OK;
6066 }
6067 return REDIS_OK; /* unreached */
6068 }
6069
6070 static void bgrewriteaofCommand(redisClient *c) {
6071 if (server.bgrewritechildpid != -1) {
6072 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6073 return;
6074 }
6075 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6076 addReply(c,shared.ok);
6077 } else {
6078 addReply(c,shared.err);
6079 }
6080 }
6081
6082 static void aofRemoveTempFile(pid_t childpid) {
6083 char tmpfile[256];
6084
6085 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid);
6086 unlink(tmpfile);
6087 }
6088
6089 /* ================================= Debugging ============================== */
6090
6091 static void debugCommand(redisClient *c) {
6092 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6093 *((char*)-1) = 'x';
6094 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6095 if (rdbSave(server.dbfilename) != REDIS_OK) {
6096 addReply(c,shared.err);
6097 return;
6098 }
6099 emptyDb();
6100 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6101 addReply(c,shared.err);
6102 return;
6103 }
6104 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6105 addReply(c,shared.ok);
6106 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6107 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6108 robj *key, *val;
6109
6110 if (!de) {
6111 addReply(c,shared.nokeyerr);
6112 return;
6113 }
6114 key = dictGetEntryKey(de);
6115 val = dictGetEntryVal(de);
6116 addReplySds(c,sdscatprintf(sdsempty(),
6117 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6118 (void*)key, key->refcount, (void*)val, val->refcount,
6119 val->encoding));
6120 } else {
6121 addReplySds(c,sdsnew(
6122 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6123 }
6124 }
6125
6126 static void _redisAssert(char *estr) {
6127 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6128 redisLog(REDIS_WARNING,"==> %s\n",estr);
6129 #ifdef HAVE_BACKTRACE
6130 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6131 *((char*)-1) = 'x';
6132 #endif
6133 }
6134
6135 /* =================================== Main! ================================ */
6136
6137 #ifdef __linux__
6138 int linuxOvercommitMemoryValue(void) {
6139 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
6140 char buf[64];
6141
6142 if (!fp) return -1;
6143 if (fgets(buf,64,fp) == NULL) {
6144 fclose(fp);
6145 return -1;
6146 }
6147 fclose(fp);
6148
6149 return atoi(buf);
6150 }
6151
6152 void linuxOvercommitMemoryWarning(void) {
6153 if (linuxOvercommitMemoryValue() == 0) {
6154 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6155 }
6156 }
6157 #endif /* __linux__ */
6158
6159 static void daemonize(void) {
6160 int fd;
6161 FILE *fp;
6162
6163 if (fork() != 0) exit(0); /* parent exits */
6164 printf("New pid: %d\n", getpid());
6165 setsid(); /* create a new session */
6166
6167 /* Every output goes to /dev/null. If Redis is daemonized but
6168 * the 'logfile' is set to 'stdout' in the configuration file
6169 * it will not log at all. */
6170 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6171 dup2(fd, STDIN_FILENO);
6172 dup2(fd, STDOUT_FILENO);
6173 dup2(fd, STDERR_FILENO);
6174 if (fd > STDERR_FILENO) close(fd);
6175 }
6176 /* Try to write the pid file */
6177 fp = fopen(server.pidfile,"w");
6178 if (fp) {
6179 fprintf(fp,"%d\n",getpid());
6180 fclose(fp);
6181 }
6182 }
6183
6184 int main(int argc, char **argv) {
6185 initServerConfig();
6186 if (argc == 2) {
6187 resetServerSaveParams();
6188 loadServerConfig(argv[1]);
6189 } else if (argc > 2) {
6190 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6191 exit(1);
6192 } else {
6193 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6194 }
6195 if (server.daemonize) daemonize();
6196 initServer();
6197 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6198 #ifdef __linux__
6199 linuxOvercommitMemoryWarning();
6200 #endif
6201 if (server.appendonly) {
6202 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6203 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6204 } else {
6205 if (rdbLoad(server.dbfilename) == REDIS_OK)
6206 redisLog(REDIS_NOTICE,"DB loaded from disk");
6207 }
6208 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6209 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6210 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6211 aeMain(server.el);
6212 aeDeleteEventLoop(server.el);
6213 return 0;
6214 }
6215
6216 /* ============================= Backtrace support ========================= */
6217
6218 #ifdef HAVE_BACKTRACE
6219 static char *findFuncName(void *pointer, unsigned long *offset);
6220
6221 static void *getMcontextEip(ucontext_t *uc) {
6222 #if defined(__FreeBSD__)
6223 return (void*) uc->uc_mcontext.mc_eip;
6224 #elif defined(__dietlibc__)
6225 return (void*) uc->uc_mcontext.eip;
6226 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6227 #if __x86_64__
6228 return (void*) uc->uc_mcontext->__ss.__rip;
6229 #else
6230 return (void*) uc->uc_mcontext->__ss.__eip;
6231 #endif
6232 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6233 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6234 return (void*) uc->uc_mcontext->__ss.__rip;
6235 #else
6236 return (void*) uc->uc_mcontext->__ss.__eip;
6237 #endif
6238 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6239 return (void*) uc->uc_mcontext.gregs[REG_EIP];
6240 #elif defined(__ia64__) /* Linux IA64 */
6241 return (void*) uc->uc_mcontext.sc_ip;
6242 #else
6243 return NULL;
6244 #endif
6245 }
6246
6247 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6248 void *trace[100];
6249 char **messages = NULL;
6250 int i, trace_size = 0;
6251 unsigned long offset=0;
6252 ucontext_t *uc = (ucontext_t*) secret;
6253 sds infostring;
6254 REDIS_NOTUSED(info);
6255
6256 redisLog(REDIS_WARNING,
6257 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6258 infostring = genRedisInfoString();
6259 redisLog(REDIS_WARNING, "%s",infostring);
6260 /* It's not safe to sdsfree() the returned string under memory
6261 * corruption conditions. Let it leak as we are going to abort */
6262
6263 trace_size = backtrace(trace, 100);
6264 /* overwrite sigaction with caller's address */
6265 if (getMcontextEip(uc) != NULL) {
6266 trace[1] = getMcontextEip(uc);
6267 }
6268 messages = backtrace_symbols(trace, trace_size);
6269
6270 for (i=1; i<trace_size; ++i) {
6271 char *fn = findFuncName(trace[i], &offset), *p;
6272
6273 p = strchr(messages[i],'+');
6274 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6275 redisLog(REDIS_WARNING,"%s", messages[i]);
6276 } else {
6277 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6278 }
6279 }
6280 // free(messages); Don't call free() with possibly corrupted memory.
6281 exit(0);
6282 }
6283
6284 static void setupSigSegvAction(void) {
6285 struct sigaction act;
6286
6287 sigemptyset (&act.sa_mask);
6288 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6289 * is used. Otherwise, sa_handler is used */
6290 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6291 act.sa_sigaction = segvHandler;
6292 sigaction (SIGSEGV, &act, NULL);
6293 sigaction (SIGBUS, &act, NULL);
6294 sigaction (SIGFPE, &act, NULL);
6295 sigaction (SIGILL, &act, NULL);
6296 sigaction (SIGBUS, &act, NULL);
6297 return;
6298 }
6299
6300 #include "staticsymbols.h"
6301 /* This function try to convert a pointer into a function name. It's used in
6302 * oreder to provide a backtrace under segmentation fault that's able to
6303 * display functions declared as static (otherwise the backtrace is useless). */
6304 static char *findFuncName(void *pointer, unsigned long *offset){
6305 int i, ret = -1;
6306 unsigned long off, minoff = 0;
6307
6308 /* Try to match against the Symbol with the smallest offset */
6309 for (i=0; symsTable[i].pointer; i++) {
6310 unsigned long lp = (unsigned long) pointer;
6311
6312 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6313 off=lp-symsTable[i].pointer;
6314 if (ret < 0 || off < minoff) {
6315 minoff=off;
6316 ret=i;
6317 }
6318 }
6319 }
6320 if (ret == -1) return NULL;
6321 *offset = minoff;
6322 return symsTable[ret].name;
6323 }
6324 #else /* HAVE_BACKTRACE */
6325 static void setupSigSegvAction(void) {
6326 }
6327 #endif /* HAVE_BACKTRACE */
6328
6329
6330
6331 /* The End */
6332
6333
6334