]> git.saurik.com Git - redis.git/blob - redis.c
Fixed issue #121
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this Redis release. */
#define REDIS_VERSION "1.1.91"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes */
/* NOTE(review): REDIS_ERR expands to an unparenthesized -1. */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
/* Presumably the default for server.maxidletime, which closeTimedoutClients()
 * compares against a difference of time(NULL) values, i.e. seconds. */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
/* htNeedsResize() compares used*100/size against this value, so it is a
 * percentage of used slots. */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags: bit values OR-ed together in the cmdTable 'flags' field. */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
   these flags will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4
111
/* Object types (stored in robj.type) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding (stored in robj.encoding) */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. To store 32 bits lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
/* NOTE(review): sentinel length value; presumably returned by the length
 * loader on read errors -- confirm in rdbLoadLen(). */
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * accordingly to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
154
/* Client flags: bit values combined in redisClient.flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() drops messages whose level is below
 * server.verbosity and uses the level as an index into the ".-*" marker
 * string, so values must stay within 0..2. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void) V)

/* With P = 1/4, 32 levels are expected to cover about 4^32 = 2^64 elements. */
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2

/* We can print the stacktrace, so our assert is defined this way: */
/* Evaluates _e once; when it is false, _redisAssert() is called with the
 * stringified expression and the process exits with status 1. */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
204
/*================================= Data types ============================== */

/* A redis object, that is a type able to hold a string / list / set */
typedef struct redisObject {
    void *ptr;                  /* payload; meaning depends on type/encoding */
    unsigned char type;         /* one of REDIS_STRING .. REDIS_HASH */
    unsigned char encoding;     /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2];   /* unused padding */
    int refcount;               /* released through decrRefCount() */
} robj;

/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is taken near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * NOTE(review): the expansion ends with "while(0);" -- the extra ';' means
 * the macro cannot be used as the body of an unbraced if/else. It is left
 * as-is because call sites elsewhere may omit their own semicolon. The
 * 'notused' bytes are not initialized. */
#define initStaticStringObject(_var,_ptr) do { \
    _var.refcount = 1; \
    _var.type = REDIS_STRING; \
    _var.encoding = REDIS_ENCODING_RAW; \
    _var.ptr = _ptr; \
} while(0);
226
/* One logical database. tryResizeHashTables() shrinks both tables.
 * NOTE(review): presumably 'dict' holds key -> value and 'expires' holds
 * key -> expire time (see setExpire/getExpire); confirm there. */
typedef struct redisDb {
    dict *dict;
    dict *expires;
    int id;         /* presumably the index of this DB -- confirm */
} redisDb;

/* With multiplexing we need to take per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;  /* mbargv/mbargc: presumably the multi-bulk argv being built */
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
255
/* One automatic-save rule. NOTE(review): presumably "save after 'seconds'
 * if at least 'changes' writes happened" -- confirm in serverCron(). */
struct saveparam {
    time_t seconds;
    int changes;
};

/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db;
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function run */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of last save succeeded */
    size_t usedmemory; /* Used memory in megabytes -- NOTE(review): unit unverified */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity;      /* minimum level redisLog() writes */
    int glueoutputbuf;
    int maxidletime;    /* idle seconds before closeTimedoutClients() drops a client */
    int dbnum;
    int daemonize;
    int appendonly;
    int appendfsync;
    time_t lastfsync;
    int appendfd;       /* -1 when append only is not active */
    int appendseldb;    /* -1 forces a SELECT to be emitted */
    char *pidfile;
    pid_t bgsavechildpid;       /* -1 when no BGSAVE child is running */
    pid_t bgrewritechildpid;
    sds bgrewritebuf; /* buffer taken by parent during append only rewrite */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile;      /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
319
/* Signature shared by every *Command() implementation listed in cmdTable. */
typedef void redisCommandProc(redisClient *c);
/* One cmdTable entry: command name, implementation, arity and
 * REDIS_CMD_* flags. */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    int flags;
};

/* Name/address pair. NOTE(review): presumably used to resolve function
 * addresses in stack traces -- confirm where it is used. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};

/* An element being sorted by SORT together with its sort key: either a
 * numeric score or an object to compare against. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;

/* A SORT sub-operation; 'type' is one of the REDIS_SORT_* values. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
345
/* ZSETs use a specialized version of Skiplists */

/* Skiplist node: 'forward' holds one next-pointer per level of the node
 * (at most ZSKIPLIST_MAXLEVEL). NOTE(review): 'backward' presumably links
 * to the previous node on the lowest level -- confirm in zslInsert(). */
typedef struct zskiplistNode {
    struct zskiplistNode **forward;
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

/* A sorted set is stored twice: in a dict (see zsetDictType: member ->
 * heap-allocated double score) and in a skiplist ordered by score. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
365
/* Our shared "common" objects */

/* NOTE(review): presumably created once at startup and reused for replies
 * instead of allocating new objects -- confirm in createSharedObjects(). */
struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;

/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
382
/*================================ Prototypes =============================== */

/* Internal helpers (object lifecycle, persistence, replication, expires). */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static void aofRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);

/* Command implementations, referenced from cmdTable below. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void bgrewriteaofCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zincrbyCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
495
496 /*================================= Globals ================================= */
497
498 /* Global vars */
499 static struct redisServer server; /* server global state */
500 static struct redisCommand cmdTable[] = {
501 {"get",getCommand,2,REDIS_CMD_INLINE},
502 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
503 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
504 {"del",delCommand,-2,REDIS_CMD_INLINE},
505 {"exists",existsCommand,2,REDIS_CMD_INLINE},
506 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
507 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
508 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
509 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
510 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
511 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
512 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
513 {"llen",llenCommand,2,REDIS_CMD_INLINE},
514 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
515 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
516 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
517 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
518 {"lrem",lremCommand,4,REDIS_CMD_BULK},
519 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
520 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"srem",sremCommand,3,REDIS_CMD_BULK},
522 {"smove",smoveCommand,4,REDIS_CMD_BULK},
523 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
524 {"scard",scardCommand,2,REDIS_CMD_INLINE},
525 {"spop",spopCommand,2,REDIS_CMD_INLINE},
526 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
527 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
528 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
531 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
532 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
533 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
534 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
535 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
536 {"zrem",zremCommand,3,REDIS_CMD_BULK},
537 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
538 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
539 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
540 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
541 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
542 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
543 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
544 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
545 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
546 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
547 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
548 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
549 {"select",selectCommand,2,REDIS_CMD_INLINE},
550 {"move",moveCommand,3,REDIS_CMD_INLINE},
551 {"rename",renameCommand,3,REDIS_CMD_INLINE},
552 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
553 {"expire",expireCommand,3,REDIS_CMD_INLINE},
554 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
555 {"keys",keysCommand,2,REDIS_CMD_INLINE},
556 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
557 {"auth",authCommand,2,REDIS_CMD_INLINE},
558 {"ping",pingCommand,1,REDIS_CMD_INLINE},
559 {"echo",echoCommand,2,REDIS_CMD_BULK},
560 {"save",saveCommand,1,REDIS_CMD_INLINE},
561 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
562 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
563 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
564 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
565 {"type",typeCommand,2,REDIS_CMD_INLINE},
566 {"sync",syncCommand,1,REDIS_CMD_INLINE},
567 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
568 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
569 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
570 {"info",infoCommand,1,REDIS_CMD_INLINE},
571 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
572 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
573 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
574 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
575 {NULL,NULL,0,0}
576 };
577
578 /*============================ Utility functions ============================ */
579
/* Glob-style pattern matching.
 *
 * Returns 1 if the first 'stringLen' bytes of 'string' match the first
 * 'patternLen' bytes of 'pattern', 0 otherwise. Every read is bounded by
 * the explicit lengths, so neither buffer has to be NUL-terminated.
 *
 * Supported syntax:
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [...]   one byte from a set: [abc], negated [^abc], ranges [a-z]
 *           (bounds accepted in either order), \x for a literal x.
 *           An unterminated set extends to the end of the pattern.
 *   \x      the literal byte x (a lone trailing '\' matches itself)
 *
 * When 'nocase' is non-zero bytes are compared after tolower(), except for
 * escaped bytes inside a [...] set, which are compared exactly.
 * Bytes are handled as unsigned char values, so the result does not depend
 * on the signedness of plain char and tolower() never receives a negative
 * argument (which would be undefined behavior). */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* A run of '*' is equivalent to a single one. Check the length
             * before peeking at pattern[1]. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                                   string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes exactly one byte of the string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the increment
                     * after the switch leaves the pattern exhausted. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* a literal byte needs a byte to match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Only trailing '*'s can still match the empty remainder; stay
             * within patternLen instead of trusting a NUL terminator. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
702
703 static void redisLog(int level, const char *fmt, ...) {
704 va_list ap;
705 FILE *fp;
706
707 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
708 if (!fp) return;
709
710 va_start(ap, fmt);
711 if (level >= server.verbosity) {
712 char *c = ".-*";
713 char buf[64];
714 time_t now;
715
716 now = time(NULL);
717 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
718 fprintf(fp,"%s %c ",buf,c[level]);
719 vfprintf(fp, fmt, ap);
720 fprintf(fp,"\n");
721 fflush(fp);
722 }
723 va_end(ap);
724
725 if (server.logfile) fclose(fp);
726 }
727
728 /*====================== Hash table type implementation ==================== */
729
730 /* This is an hash table type that uses the SDS dynamic strings libary as
731 * keys and radis objects as values (objects can hold SDS strings,
732 * lists, sets). */
733
734 static void dictVanillaFree(void *privdata, void *val)
735 {
736 DICT_NOTUSED(privdata);
737 zfree(val);
738 }
739
740 static int sdsDictKeyCompare(void *privdata, const void *key1,
741 const void *key2)
742 {
743 int l1,l2;
744 DICT_NOTUSED(privdata);
745
746 l1 = sdslen((sds)key1);
747 l2 = sdslen((sds)key2);
748 if (l1 != l2) return 0;
749 return memcmp(key1, key2, l1) == 0;
750 }
751
752 static void dictRedisObjectDestructor(void *privdata, void *val)
753 {
754 DICT_NOTUSED(privdata);
755
756 decrRefCount(val);
757 }
758
759 static int dictObjKeyCompare(void *privdata, const void *key1,
760 const void *key2)
761 {
762 const robj *o1 = key1, *o2 = key2;
763 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
764 }
765
766 static unsigned int dictObjHash(const void *key) {
767 const robj *o = key;
768 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
769 }
770
771 static int dictEncObjKeyCompare(void *privdata, const void *key1,
772 const void *key2)
773 {
774 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
775 int cmp;
776
777 o1 = getDecodedObject(o1);
778 o2 = getDecodedObject(o2);
779 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
780 decrRefCount(o1);
781 decrRefCount(o2);
782 return cmp;
783 }
784
785 static unsigned int dictEncObjHash(const void *key) {
786 robj *o = (robj*) key;
787
788 o = getDecodedObject(o);
789 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
790 decrRefCount(o);
791 return hash;
792 }
793
/* Sets: keys are Redis objects that may be integer encoded, hence the
 * decoding hash/compare callbacks; no value destructor is installed. */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};

/* Sorted sets (zset.dict): member object -> heap-allocated double score. */
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor of malloc(sizeof(double)) */
};

/* Keys and values are both Redis objects; keys are hashed and compared as
 * raw SDS strings. NOTE(review): not referenced in this chunk -- presumably
 * the type of redisDb.dict; confirm where the DBs are created. */
static dictType hashDictType = {
    dictObjHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictObjKeyCompare,         /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor  /* val destructor */
};
820
821 /* ========================= Random utility functions ======================= */
822
823 /* Redis generally does not try to recover from out of memory conditions
824 * when allocating objects or strings, it is not clear if it will be possible
825 * to report this condition to the client since the networking layer itself
826 * is based on heap allocation for send buffers, so we simply abort.
827 * At least the code will be simpler to read... */
828 static void oom(const char *msg) {
829 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
830 sleep(1);
831 abort();
832 }
833
834 /* ====================== Redis server networking stuff ===================== */
835 static void closeTimedoutClients(void) {
836 redisClient *c;
837 listNode *ln;
838 time_t now = time(NULL);
839
840 listRewind(server.clients);
841 while ((ln = listYield(server.clients)) != NULL) {
842 c = listNodeValue(ln);
843 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
844 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
845 (now - c->lastinteraction > server.maxidletime)) {
846 redisLog(REDIS_DEBUG,"Closing idle client");
847 freeClient(c);
848 }
849 }
850 }
851
852 static int htNeedsResize(dict *dict) {
853 long long size, used;
854
855 size = dictSlots(dict);
856 used = dictSize(dict);
857 return (size && used && size > DICT_HT_INITIAL_SIZE &&
858 (used*100/size < REDIS_HT_MINFILL));
859 }
860
861 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
862 * we resize the hash table to save memory */
863 static void tryResizeHashTables(void) {
864 int j;
865
866 for (j = 0; j < server.dbnum; j++) {
867 if (htNeedsResize(server.db[j].dict)) {
868 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
869 dictResize(server.db[j].dict);
870 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
871 }
872 if (htNeedsResize(server.db[j].expires))
873 dictResize(server.db[j].expires);
874 }
875 }
876
877 /* A background saving child (BGSAVE) terminated its work. Handle this. */
878 void backgroundSaveDoneHandler(int statloc) {
879 int exitcode = WEXITSTATUS(statloc);
880 int bysignal = WIFSIGNALED(statloc);
881
882 if (!bysignal && exitcode == 0) {
883 redisLog(REDIS_NOTICE,
884 "Background saving terminated with success");
885 server.dirty = 0;
886 server.lastsave = time(NULL);
887 } else if (!bysignal && exitcode != 0) {
888 redisLog(REDIS_WARNING, "Background saving error");
889 } else {
890 redisLog(REDIS_WARNING,
891 "Background saving terminated by signal");
892 rdbRemoveTempFile(server.bgsavechildpid);
893 }
894 server.bgsavechildpid = -1;
895 /* Possibly there are slaves waiting for a BGSAVE in order to be served
896 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
897 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
898 }
899
900 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
901 * Handle this. */
902 void backgroundRewriteDoneHandler(int statloc) {
903 int exitcode = WEXITSTATUS(statloc);
904 int bysignal = WIFSIGNALED(statloc);
905
906 if (!bysignal && exitcode == 0) {
907 int fd;
908 char tmpfile[256];
909
910 redisLog(REDIS_NOTICE,
911 "Background append only file rewriting terminated with success");
912 /* Now it's time to flush the differences accumulated by the parent */
913 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
914 fd = open(tmpfile,O_WRONLY|O_APPEND);
915 if (fd == -1) {
916 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
917 goto cleanup;
918 }
919 /* Flush our data... */
920 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
921 (signed) sdslen(server.bgrewritebuf)) {
922 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
923 close(fd);
924 goto cleanup;
925 }
926 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
927 /* Now our work is to rename the temp file into the stable file. And
928 * switch the file descriptor used by the server for append only. */
929 if (rename(tmpfile,server.appendfilename) == -1) {
930 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
931 close(fd);
932 goto cleanup;
933 }
934 /* Mission completed... almost */
935 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
936 if (server.appendfd != -1) {
937 /* If append only is actually enabled... */
938 close(server.appendfd);
939 server.appendfd = fd;
940 fsync(fd);
941 server.appendseldb = -1; /* Make sure it will issue SELECT */
942 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
943 } else {
944 /* If append only is disabled we just generate a dump in this
945 * format. Why not? */
946 close(fd);
947 }
948 } else if (!bysignal && exitcode != 0) {
949 redisLog(REDIS_WARNING, "Background append only file rewriting error");
950 } else {
951 redisLog(REDIS_WARNING,
952 "Background append only file rewriting terminated by signal");
953 }
954 cleanup:
955 sdsfree(server.bgrewritebuf);
956 server.bgrewritebuf = sdsempty();
957 aofRemoveTempFile(server.bgrewritechildpid);
958 server.bgrewritechildpid = -1;
959 }
960
961 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
962 int j, loops = server.cronloops++;
963 REDIS_NOTUSED(eventLoop);
964 REDIS_NOTUSED(id);
965 REDIS_NOTUSED(clientData);
966
967 /* Update the global state with the amount of used memory */
968 server.usedmemory = zmalloc_used_memory();
969
970 /* Show some info about non-empty databases */
971 for (j = 0; j < server.dbnum; j++) {
972 long long size, used, vkeys;
973
974 size = dictSlots(server.db[j].dict);
975 used = dictSize(server.db[j].dict);
976 vkeys = dictSize(server.db[j].expires);
977 if (!(loops % 5) && (used || vkeys)) {
978 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
979 /* dictPrintStats(server.dict); */
980 }
981 }
982
983 /* We don't want to resize the hash tables while a bacground saving
984 * is in progress: the saving child is created using fork() that is
985 * implemented with a copy-on-write semantic in most modern systems, so
986 * if we resize the HT while there is the saving child at work actually
987 * a lot of memory movements in the parent will cause a lot of pages
988 * copied. */
989 if (server.bgsavechildpid == -1) tryResizeHashTables();
990
991 /* Show information about connected clients */
992 if (!(loops % 5)) {
993 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
994 listLength(server.clients)-listLength(server.slaves),
995 listLength(server.slaves),
996 server.usedmemory,
997 dictSize(server.sharingpool));
998 }
999
1000 /* Close connections of timedout clients */
1001 if (server.maxidletime && !(loops % 10))
1002 closeTimedoutClients();
1003
1004 /* Check if a background saving or AOF rewrite in progress terminated */
1005 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1006 int statloc;
1007 pid_t pid;
1008
1009 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1010 if (pid == server.bgsavechildpid) {
1011 backgroundSaveDoneHandler(statloc);
1012 } else {
1013 backgroundRewriteDoneHandler(statloc);
1014 }
1015 }
1016 } else {
1017 /* If there is not a background saving in progress check if
1018 * we have to save now */
1019 time_t now = time(NULL);
1020 for (j = 0; j < server.saveparamslen; j++) {
1021 struct saveparam *sp = server.saveparams+j;
1022
1023 if (server.dirty >= sp->changes &&
1024 now-server.lastsave > sp->seconds) {
1025 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1026 sp->changes, sp->seconds);
1027 rdbSaveBackground(server.dbfilename);
1028 break;
1029 }
1030 }
1031 }
1032
1033 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1034 * will use few CPU cycles if there are few expiring keys, otherwise
1035 * it will get more aggressive to avoid that too much memory is used by
1036 * keys that can be removed from the keyspace. */
1037 for (j = 0; j < server.dbnum; j++) {
1038 int expired;
1039 redisDb *db = server.db+j;
1040
1041 /* Continue to expire if at the end of the cycle more than 25%
1042 * of the keys were expired. */
1043 do {
1044 int num = dictSize(db->expires);
1045 time_t now = time(NULL);
1046
1047 expired = 0;
1048 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1049 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1050 while (num--) {
1051 dictEntry *de;
1052 time_t t;
1053
1054 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1055 t = (time_t) dictGetEntryVal(de);
1056 if (now > t) {
1057 deleteKey(db,dictGetEntryKey(de));
1058 expired++;
1059 }
1060 }
1061 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1062 }
1063
1064 /* Check if we should connect to a MASTER */
1065 if (server.replstate == REDIS_REPL_CONNECT) {
1066 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1067 if (syncWithMaster() == REDIS_OK) {
1068 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1069 }
1070 }
1071 return 1000;
1072 }
1073
1074 static void createSharedObjects(void) {
1075 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1076 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1077 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1078 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1079 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1080 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1081 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1082 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1083 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1084 /* no such key */
1085 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1086 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1087 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1088 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1089 "-ERR no such key\r\n"));
1090 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1091 "-ERR syntax error\r\n"));
1092 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1093 "-ERR source and destination objects are the same\r\n"));
1094 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1095 "-ERR index out of range\r\n"));
1096 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1097 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1098 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1099 shared.select0 = createStringObject("select 0\r\n",10);
1100 shared.select1 = createStringObject("select 1\r\n",10);
1101 shared.select2 = createStringObject("select 2\r\n",10);
1102 shared.select3 = createStringObject("select 3\r\n",10);
1103 shared.select4 = createStringObject("select 4\r\n",10);
1104 shared.select5 = createStringObject("select 5\r\n",10);
1105 shared.select6 = createStringObject("select 6\r\n",10);
1106 shared.select7 = createStringObject("select 7\r\n",10);
1107 shared.select8 = createStringObject("select 8\r\n",10);
1108 shared.select9 = createStringObject("select 9\r\n",10);
1109 }
1110
1111 static void appendServerSaveParams(time_t seconds, int changes) {
1112 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1113 server.saveparams[server.saveparamslen].seconds = seconds;
1114 server.saveparams[server.saveparamslen].changes = changes;
1115 server.saveparamslen++;
1116 }
1117
1118 static void resetServerSaveParams() {
1119 zfree(server.saveparams);
1120 server.saveparams = NULL;
1121 server.saveparamslen = 0;
1122 }
1123
1124 static void initServerConfig() {
1125 server.dbnum = REDIS_DEFAULT_DBNUM;
1126 server.port = REDIS_SERVERPORT;
1127 server.verbosity = REDIS_DEBUG;
1128 server.maxidletime = REDIS_MAXIDLETIME;
1129 server.saveparams = NULL;
1130 server.logfile = NULL; /* NULL = log on standard output */
1131 server.bindaddr = NULL;
1132 server.glueoutputbuf = 1;
1133 server.daemonize = 0;
1134 server.appendonly = 0;
1135 server.appendfsync = APPENDFSYNC_ALWAYS;
1136 server.lastfsync = time(NULL);
1137 server.appendfd = -1;
1138 server.appendseldb = -1; /* Make sure the first time will not match */
1139 server.pidfile = "/var/run/redis.pid";
1140 server.dbfilename = "dump.rdb";
1141 server.appendfilename = "appendonly.aof";
1142 server.requirepass = NULL;
1143 server.shareobjects = 0;
1144 server.sharingpoolsize = 1024;
1145 server.maxclients = 0;
1146 server.maxmemory = 0;
1147 resetServerSaveParams();
1148
1149 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1150 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1151 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1152 /* Replication related */
1153 server.isslave = 0;
1154 server.masterauth = NULL;
1155 server.masterhost = NULL;
1156 server.masterport = 6379;
1157 server.master = NULL;
1158 server.replstate = REDIS_REPL_NONE;
1159
1160 /* Double constants initialization */
1161 R_Zero = 0.0;
1162 R_PosInf = 1.0/R_Zero;
1163 R_NegInf = -1.0/R_Zero;
1164 R_Nan = R_Zero/R_Zero;
1165 }
1166
1167 static void initServer() {
1168 int j;
1169
1170 signal(SIGHUP, SIG_IGN);
1171 signal(SIGPIPE, SIG_IGN);
1172 setupSigSegvAction();
1173
1174 server.clients = listCreate();
1175 server.slaves = listCreate();
1176 server.monitors = listCreate();
1177 server.objfreelist = listCreate();
1178 createSharedObjects();
1179 server.el = aeCreateEventLoop();
1180 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1181 server.sharingpool = dictCreate(&setDictType,NULL);
1182 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1183 if (server.fd == -1) {
1184 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1185 exit(1);
1186 }
1187 for (j = 0; j < server.dbnum; j++) {
1188 server.db[j].dict = dictCreate(&hashDictType,NULL);
1189 server.db[j].expires = dictCreate(&setDictType,NULL);
1190 server.db[j].id = j;
1191 }
1192 server.cronloops = 0;
1193 server.bgsavechildpid = -1;
1194 server.bgrewritechildpid = -1;
1195 server.bgrewritebuf = sdsempty();
1196 server.lastsave = time(NULL);
1197 server.dirty = 0;
1198 server.usedmemory = 0;
1199 server.stat_numcommands = 0;
1200 server.stat_numconnections = 0;
1201 server.stat_starttime = time(NULL);
1202 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1203
1204 if (server.appendonly) {
1205 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1206 if (server.appendfd == -1) {
1207 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1208 strerror(errno));
1209 exit(1);
1210 }
1211 }
1212 }
1213
1214 /* Empty the whole database */
1215 static long long emptyDb() {
1216 int j;
1217 long long removed = 0;
1218
1219 for (j = 0; j < server.dbnum; j++) {
1220 removed += dictSize(server.db[j].dict);
1221 dictEmpty(server.db[j].dict);
1222 dictEmpty(server.db[j].expires);
1223 }
1224 return removed;
1225 }
1226
/* Map a case-insensitive "yes"/"no" config argument to 1/0.
 * Any other string yields -1 so the caller can report a config error. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1232
1233 /* I agree, this is a very rudimental way to load a configuration...
1234 will improve later if the config gets more complex */
1235 static void loadServerConfig(char *filename) {
1236 FILE *fp;
1237 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1238 int linenum = 0;
1239 sds line = NULL;
1240
1241 if (filename[0] == '-' && filename[1] == '\0')
1242 fp = stdin;
1243 else {
1244 if ((fp = fopen(filename,"r")) == NULL) {
1245 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1246 exit(1);
1247 }
1248 }
1249
1250 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1251 sds *argv;
1252 int argc, j;
1253
1254 linenum++;
1255 line = sdsnew(buf);
1256 line = sdstrim(line," \t\r\n");
1257
1258 /* Skip comments and blank lines*/
1259 if (line[0] == '#' || line[0] == '\0') {
1260 sdsfree(line);
1261 continue;
1262 }
1263
1264 /* Split into arguments */
1265 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1266 sdstolower(argv[0]);
1267
1268 /* Execute config directives */
1269 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1270 server.maxidletime = atoi(argv[1]);
1271 if (server.maxidletime < 0) {
1272 err = "Invalid timeout value"; goto loaderr;
1273 }
1274 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1275 server.port = atoi(argv[1]);
1276 if (server.port < 1 || server.port > 65535) {
1277 err = "Invalid port"; goto loaderr;
1278 }
1279 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1280 server.bindaddr = zstrdup(argv[1]);
1281 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1282 int seconds = atoi(argv[1]);
1283 int changes = atoi(argv[2]);
1284 if (seconds < 1 || changes < 0) {
1285 err = "Invalid save parameters"; goto loaderr;
1286 }
1287 appendServerSaveParams(seconds,changes);
1288 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1289 if (chdir(argv[1]) == -1) {
1290 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1291 argv[1], strerror(errno));
1292 exit(1);
1293 }
1294 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1295 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1296 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1297 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1298 else {
1299 err = "Invalid log level. Must be one of debug, notice, warning";
1300 goto loaderr;
1301 }
1302 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1303 FILE *logfp;
1304
1305 server.logfile = zstrdup(argv[1]);
1306 if (!strcasecmp(server.logfile,"stdout")) {
1307 zfree(server.logfile);
1308 server.logfile = NULL;
1309 }
1310 if (server.logfile) {
1311 /* Test if we are able to open the file. The server will not
1312 * be able to abort just for this problem later... */
1313 logfp = fopen(server.logfile,"a");
1314 if (logfp == NULL) {
1315 err = sdscatprintf(sdsempty(),
1316 "Can't open the log file: %s", strerror(errno));
1317 goto loaderr;
1318 }
1319 fclose(logfp);
1320 }
1321 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1322 server.dbnum = atoi(argv[1]);
1323 if (server.dbnum < 1) {
1324 err = "Invalid number of databases"; goto loaderr;
1325 }
1326 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1327 server.maxclients = atoi(argv[1]);
1328 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1329 server.maxmemory = strtoll(argv[1], NULL, 10);
1330 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1331 server.masterhost = sdsnew(argv[1]);
1332 server.masterport = atoi(argv[2]);
1333 server.replstate = REDIS_REPL_CONNECT;
1334 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1335 server.masterauth = zstrdup(argv[1]);
1336 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1337 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1338 err = "argument must be 'yes' or 'no'"; goto loaderr;
1339 }
1340 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1341 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1342 err = "argument must be 'yes' or 'no'"; goto loaderr;
1343 }
1344 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1345 server.sharingpoolsize = atoi(argv[1]);
1346 if (server.sharingpoolsize < 1) {
1347 err = "invalid object sharing pool size"; goto loaderr;
1348 }
1349 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1350 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1351 err = "argument must be 'yes' or 'no'"; goto loaderr;
1352 }
1353 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1354 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1355 err = "argument must be 'yes' or 'no'"; goto loaderr;
1356 }
1357 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1358 if (!strcasecmp(argv[1],"no")) {
1359 server.appendfsync = APPENDFSYNC_NO;
1360 } else if (!strcasecmp(argv[1],"always")) {
1361 server.appendfsync = APPENDFSYNC_ALWAYS;
1362 } else if (!strcasecmp(argv[1],"everysec")) {
1363 server.appendfsync = APPENDFSYNC_EVERYSEC;
1364 } else {
1365 err = "argument must be 'no', 'always' or 'everysec'";
1366 goto loaderr;
1367 }
1368 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1369 server.requirepass = zstrdup(argv[1]);
1370 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1371 server.pidfile = zstrdup(argv[1]);
1372 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1373 server.dbfilename = zstrdup(argv[1]);
1374 } else {
1375 err = "Bad directive or wrong number of arguments"; goto loaderr;
1376 }
1377 for (j = 0; j < argc; j++)
1378 sdsfree(argv[j]);
1379 zfree(argv);
1380 sdsfree(line);
1381 }
1382 if (fp != stdin) fclose(fp);
1383 return;
1384
1385 loaderr:
1386 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1387 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1388 fprintf(stderr, ">>> '%s'\n", line);
1389 fprintf(stderr, "%s\n", err);
1390 exit(1);
1391 }
1392
1393 static void freeClientArgv(redisClient *c) {
1394 int j;
1395
1396 for (j = 0; j < c->argc; j++)
1397 decrRefCount(c->argv[j]);
1398 for (j = 0; j < c->mbargc; j++)
1399 decrRefCount(c->mbargv[j]);
1400 c->argc = 0;
1401 c->mbargc = 0;
1402 }
1403
1404 static void freeClient(redisClient *c) {
1405 listNode *ln;
1406
1407 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1408 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1409 sdsfree(c->querybuf);
1410 listRelease(c->reply);
1411 freeClientArgv(c);
1412 close(c->fd);
1413 ln = listSearchKey(server.clients,c);
1414 redisAssert(ln != NULL);
1415 listDelNode(server.clients,ln);
1416 if (c->flags & REDIS_SLAVE) {
1417 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1418 close(c->repldbfd);
1419 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1420 ln = listSearchKey(l,c);
1421 redisAssert(ln != NULL);
1422 listDelNode(l,ln);
1423 }
1424 if (c->flags & REDIS_MASTER) {
1425 server.master = NULL;
1426 server.replstate = REDIS_REPL_CONNECT;
1427 }
1428 zfree(c->argv);
1429 zfree(c->mbargv);
1430 zfree(c);
1431 }
1432
1433 #define GLUEREPLY_UP_TO (1024)
1434 static void glueReplyBuffersIfNeeded(redisClient *c) {
1435 int copylen = 0;
1436 char buf[GLUEREPLY_UP_TO];
1437 listNode *ln;
1438 robj *o;
1439
1440 listRewind(c->reply);
1441 while((ln = listYield(c->reply))) {
1442 int objlen;
1443
1444 o = ln->value;
1445 objlen = sdslen(o->ptr);
1446 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1447 memcpy(buf+copylen,o->ptr,objlen);
1448 copylen += objlen;
1449 listDelNode(c->reply,ln);
1450 } else {
1451 if (copylen == 0) return;
1452 break;
1453 }
1454 }
1455 /* Now the output buffer is empty, add the new single element */
1456 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1457 listAddNodeHead(c->reply,o);
1458 }
1459
1460 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1461 redisClient *c = privdata;
1462 int nwritten = 0, totwritten = 0, objlen;
1463 robj *o;
1464 REDIS_NOTUSED(el);
1465 REDIS_NOTUSED(mask);
1466
1467 /* Use writev() if we have enough buffers to send */
1468 if (!server.glueoutputbuf &&
1469 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1470 !(c->flags & REDIS_MASTER))
1471 {
1472 sendReplyToClientWritev(el, fd, privdata, mask);
1473 return;
1474 }
1475
1476 while(listLength(c->reply)) {
1477 if (server.glueoutputbuf && listLength(c->reply) > 1)
1478 glueReplyBuffersIfNeeded(c);
1479
1480 o = listNodeValue(listFirst(c->reply));
1481 objlen = sdslen(o->ptr);
1482
1483 if (objlen == 0) {
1484 listDelNode(c->reply,listFirst(c->reply));
1485 continue;
1486 }
1487
1488 if (c->flags & REDIS_MASTER) {
1489 /* Don't reply to a master */
1490 nwritten = objlen - c->sentlen;
1491 } else {
1492 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1493 if (nwritten <= 0) break;
1494 }
1495 c->sentlen += nwritten;
1496 totwritten += nwritten;
1497 /* If we fully sent the object on head go to the next one */
1498 if (c->sentlen == objlen) {
1499 listDelNode(c->reply,listFirst(c->reply));
1500 c->sentlen = 0;
1501 }
1502 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1503 * bytes, in a single threaded server it's a good idea to serve
1504 * other clients as well, even if a very large request comes from
1505 * super fast link that is always able to accept data (in real world
1506 * scenario think about 'KEYS *' against the loopback interfae) */
1507 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1508 }
1509 if (nwritten == -1) {
1510 if (errno == EAGAIN) {
1511 nwritten = 0;
1512 } else {
1513 redisLog(REDIS_DEBUG,
1514 "Error writing to client: %s", strerror(errno));
1515 freeClient(c);
1516 return;
1517 }
1518 }
1519 if (totwritten > 0) c->lastinteraction = time(NULL);
1520 if (listLength(c->reply) == 0) {
1521 c->sentlen = 0;
1522 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1523 }
1524 }
1525
1526 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1527 {
1528 redisClient *c = privdata;
1529 int nwritten = 0, totwritten = 0, objlen, willwrite;
1530 robj *o;
1531 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1532 int offset, ion = 0;
1533 REDIS_NOTUSED(el);
1534 REDIS_NOTUSED(mask);
1535
1536 listNode *node;
1537 while (listLength(c->reply)) {
1538 offset = c->sentlen;
1539 ion = 0;
1540 willwrite = 0;
1541
1542 /* fill-in the iov[] array */
1543 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1544 o = listNodeValue(node);
1545 objlen = sdslen(o->ptr);
1546
1547 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1548 break;
1549
1550 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1551 break; /* no more iovecs */
1552
1553 iov[ion].iov_base = ((char*)o->ptr) + offset;
1554 iov[ion].iov_len = objlen - offset;
1555 willwrite += objlen - offset;
1556 offset = 0; /* just for the first item */
1557 ion++;
1558 }
1559
1560 if(willwrite == 0)
1561 break;
1562
1563 /* write all collected blocks at once */
1564 if((nwritten = writev(fd, iov, ion)) < 0) {
1565 if (errno != EAGAIN) {
1566 redisLog(REDIS_DEBUG,
1567 "Error writing to client: %s", strerror(errno));
1568 freeClient(c);
1569 return;
1570 }
1571 break;
1572 }
1573
1574 totwritten += nwritten;
1575 offset = c->sentlen;
1576
1577 /* remove written robjs from c->reply */
1578 while (nwritten && listLength(c->reply)) {
1579 o = listNodeValue(listFirst(c->reply));
1580 objlen = sdslen(o->ptr);
1581
1582 if(nwritten >= objlen - offset) {
1583 listDelNode(c->reply, listFirst(c->reply));
1584 nwritten -= objlen - offset;
1585 c->sentlen = 0;
1586 } else {
1587 /* partial write */
1588 c->sentlen += nwritten;
1589 break;
1590 }
1591 offset = 0;
1592 }
1593 }
1594
1595 if (totwritten > 0)
1596 c->lastinteraction = time(NULL);
1597
1598 if (listLength(c->reply) == 0) {
1599 c->sentlen = 0;
1600 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1601 }
1602 }
1603
1604 static struct redisCommand *lookupCommand(char *name) {
1605 int j = 0;
1606 while(cmdTable[j].name != NULL) {
1607 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1608 j++;
1609 }
1610 return NULL;
1611 }
1612
1613 /* resetClient prepare the client to process the next command */
1614 static void resetClient(redisClient *c) {
1615 freeClientArgv(c);
1616 c->bulklen = -1;
1617 c->multibulk = 0;
1618 }
1619
1620 /* If this function gets called we already read a whole
1621 * command, argments are in the client argv/argc fields.
1622 * processCommand() execute the command or prepare the
1623 * server for a bulk read from the client.
1624 *
1625 * If 1 is returned the client is still alive and valid and
1626 * and other operations can be performed by the caller. Otherwise
1627 * if 0 is returned the client was destroied (i.e. after QUIT). */
1628 static int processCommand(redisClient *c) {
1629 struct redisCommand *cmd;
1630 long long dirty;
1631
1632 /* Free some memory if needed (maxmemory setting) */
1633 if (server.maxmemory) freeMemoryIfNeeded();
1634
1635 /* Handle the multi bulk command type. This is an alternative protocol
1636 * supported by Redis in order to receive commands that are composed of
1637 * multiple binary-safe "bulk" arguments. The latency of processing is
1638 * a bit higher but this allows things like multi-sets, so if this
1639 * protocol is used only for MSET and similar commands this is a big win. */
1640 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1641 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1642 if (c->multibulk <= 0) {
1643 resetClient(c);
1644 return 1;
1645 } else {
1646 decrRefCount(c->argv[c->argc-1]);
1647 c->argc--;
1648 return 1;
1649 }
1650 } else if (c->multibulk) {
1651 if (c->bulklen == -1) {
1652 if (((char*)c->argv[0]->ptr)[0] != '$') {
1653 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1654 resetClient(c);
1655 return 1;
1656 } else {
1657 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1658 decrRefCount(c->argv[0]);
1659 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1660 c->argc--;
1661 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1662 resetClient(c);
1663 return 1;
1664 }
1665 c->argc--;
1666 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1667 return 1;
1668 }
1669 } else {
1670 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1671 c->mbargv[c->mbargc] = c->argv[0];
1672 c->mbargc++;
1673 c->argc--;
1674 c->multibulk--;
1675 if (c->multibulk == 0) {
1676 robj **auxargv;
1677 int auxargc;
1678
1679 /* Here we need to swap the multi-bulk argc/argv with the
1680 * normal argc/argv of the client structure. */
1681 auxargv = c->argv;
1682 c->argv = c->mbargv;
1683 c->mbargv = auxargv;
1684
1685 auxargc = c->argc;
1686 c->argc = c->mbargc;
1687 c->mbargc = auxargc;
1688
1689 /* We need to set bulklen to something different than -1
1690 * in order for the code below to process the command without
1691 * to try to read the last argument of a bulk command as
1692 * a special argument. */
1693 c->bulklen = 0;
1694 /* continue below and process the command */
1695 } else {
1696 c->bulklen = -1;
1697 return 1;
1698 }
1699 }
1700 }
1701 /* -- end of multi bulk commands processing -- */
1702
1703 /* The QUIT command is handled as a special case. Normal command
1704 * procs are unable to close the client connection safely */
1705 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1706 freeClient(c);
1707 return 0;
1708 }
1709 cmd = lookupCommand(c->argv[0]->ptr);
1710 if (!cmd) {
1711 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1712 resetClient(c);
1713 return 1;
1714 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1715 (c->argc < -cmd->arity)) {
1716 addReplySds(c,
1717 sdscatprintf(sdsempty(),
1718 "-ERR wrong number of arguments for '%s' command\r\n",
1719 cmd->name));
1720 resetClient(c);
1721 return 1;
1722 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1723 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1724 resetClient(c);
1725 return 1;
1726 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1727 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1728
1729 decrRefCount(c->argv[c->argc-1]);
1730 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1731 c->argc--;
1732 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1733 resetClient(c);
1734 return 1;
1735 }
1736 c->argc--;
1737 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1738 /* It is possible that the bulk read is already in the
1739 * buffer. Check this condition and handle it accordingly.
1740 * This is just a fast path, alternative to call processInputBuffer().
1741 * It's a good idea since the code is small and this condition
1742 * happens most of the times. */
1743 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1744 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1745 c->argc++;
1746 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1747 } else {
1748 return 1;
1749 }
1750 }
1751 /* Let's try to share objects on the command arguments vector */
1752 if (server.shareobjects) {
1753 int j;
1754 for(j = 1; j < c->argc; j++)
1755 c->argv[j] = tryObjectSharing(c->argv[j]);
1756 }
1757 /* Let's try to encode the bulk object to save space. */
1758 if (cmd->flags & REDIS_CMD_BULK)
1759 tryObjectEncoding(c->argv[c->argc-1]);
1760
1761 /* Check if the user is authenticated */
1762 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1763 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1764 resetClient(c);
1765 return 1;
1766 }
1767
1768 /* Exec the command */
1769 dirty = server.dirty;
1770 cmd->proc(c);
1771 if (server.appendonly && server.dirty-dirty)
1772 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1773 if (server.dirty-dirty && listLength(server.slaves))
1774 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1775 if (listLength(server.monitors))
1776 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1777 server.stat_numcommands++;
1778
1779 /* Prepare the client for the next command */
1780 if (c->flags & REDIS_CLOSE) {
1781 freeClient(c);
1782 return 0;
1783 }
1784 resetClient(c);
1785 return 1;
1786 }
1787
1788 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1789 listNode *ln;
1790 int outc = 0, j;
1791 robj **outv;
1792 /* (args*2)+1 is enough room for args, spaces, newlines */
1793 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1794
1795 if (argc <= REDIS_STATIC_ARGS) {
1796 outv = static_outv;
1797 } else {
1798 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1799 }
1800
1801 for (j = 0; j < argc; j++) {
1802 if (j != 0) outv[outc++] = shared.space;
1803 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1804 robj *lenobj;
1805
1806 lenobj = createObject(REDIS_STRING,
1807 sdscatprintf(sdsempty(),"%lu\r\n",
1808 (unsigned long) stringObjectLen(argv[j])));
1809 lenobj->refcount = 0;
1810 outv[outc++] = lenobj;
1811 }
1812 outv[outc++] = argv[j];
1813 }
1814 outv[outc++] = shared.crlf;
1815
1816 /* Increment all the refcounts at start and decrement at end in order to
1817 * be sure to free objects if there is no slave in a replication state
1818 * able to be feed with commands */
1819 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1820 listRewind(slaves);
1821 while((ln = listYield(slaves))) {
1822 redisClient *slave = ln->value;
1823
1824 /* Don't feed slaves that are still waiting for BGSAVE to start */
1825 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1826
1827 /* Feed all the other slaves, MONITORs and so on */
1828 if (slave->slaveseldb != dictid) {
1829 robj *selectcmd;
1830
1831 switch(dictid) {
1832 case 0: selectcmd = shared.select0; break;
1833 case 1: selectcmd = shared.select1; break;
1834 case 2: selectcmd = shared.select2; break;
1835 case 3: selectcmd = shared.select3; break;
1836 case 4: selectcmd = shared.select4; break;
1837 case 5: selectcmd = shared.select5; break;
1838 case 6: selectcmd = shared.select6; break;
1839 case 7: selectcmd = shared.select7; break;
1840 case 8: selectcmd = shared.select8; break;
1841 case 9: selectcmd = shared.select9; break;
1842 default:
1843 selectcmd = createObject(REDIS_STRING,
1844 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1845 selectcmd->refcount = 0;
1846 break;
1847 }
1848 addReply(slave,selectcmd);
1849 slave->slaveseldb = dictid;
1850 }
1851 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1852 }
1853 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1854 if (outv != static_outv) zfree(outv);
1855 }
1856
1857 static void processInputBuffer(redisClient *c) {
1858 again:
1859 if (c->bulklen == -1) {
1860 /* Read the first line of the query */
1861 char *p = strchr(c->querybuf,'\n');
1862 size_t querylen;
1863
1864 if (p) {
1865 sds query, *argv;
1866 int argc, j;
1867
1868 query = c->querybuf;
1869 c->querybuf = sdsempty();
1870 querylen = 1+(p-(query));
1871 if (sdslen(query) > querylen) {
1872 /* leave data after the first line of the query in the buffer */
1873 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1874 }
1875 *p = '\0'; /* remove "\n" */
1876 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1877 sdsupdatelen(query);
1878
1879 /* Now we can split the query in arguments */
1880 if (sdslen(query) == 0) {
1881 /* Ignore empty query */
1882 sdsfree(query);
1883 return;
1884 }
1885 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1886 sdsfree(query);
1887
1888 if (c->argv) zfree(c->argv);
1889 c->argv = zmalloc(sizeof(robj*)*argc);
1890
1891 for (j = 0; j < argc; j++) {
1892 if (sdslen(argv[j])) {
1893 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1894 c->argc++;
1895 } else {
1896 sdsfree(argv[j]);
1897 }
1898 }
1899 zfree(argv);
1900 /* Execute the command. If the client is still valid
1901 * after processCommand() return and there is something
1902 * on the query buffer try to process the next command. */
1903 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1904 return;
1905 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1906 redisLog(REDIS_DEBUG, "Client protocol error");
1907 freeClient(c);
1908 return;
1909 }
1910 } else {
1911 /* Bulk read handling. Note that if we are at this point
1912 the client already sent a command terminated with a newline,
1913 we are reading the bulk data that is actually the last
1914 argument of the command. */
1915 int qbl = sdslen(c->querybuf);
1916
1917 if (c->bulklen <= qbl) {
1918 /* Copy everything but the final CRLF as final argument */
1919 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1920 c->argc++;
1921 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1922 /* Process the command. If the client is still valid after
1923 * the processing and there is more data in the buffer
1924 * try to parse it. */
1925 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1926 return;
1927 }
1928 }
1929 }
1930
1931 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1932 redisClient *c = (redisClient*) privdata;
1933 char buf[REDIS_IOBUF_LEN];
1934 int nread;
1935 REDIS_NOTUSED(el);
1936 REDIS_NOTUSED(mask);
1937
1938 nread = read(fd, buf, REDIS_IOBUF_LEN);
1939 if (nread == -1) {
1940 if (errno == EAGAIN) {
1941 nread = 0;
1942 } else {
1943 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1944 freeClient(c);
1945 return;
1946 }
1947 } else if (nread == 0) {
1948 redisLog(REDIS_DEBUG, "Client closed connection");
1949 freeClient(c);
1950 return;
1951 }
1952 if (nread) {
1953 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1954 c->lastinteraction = time(NULL);
1955 } else {
1956 return;
1957 }
1958 processInputBuffer(c);
1959 }
1960
1961 static int selectDb(redisClient *c, int id) {
1962 if (id < 0 || id >= server.dbnum)
1963 return REDIS_ERR;
1964 c->db = &server.db[id];
1965 return REDIS_OK;
1966 }
1967
1968 static void *dupClientReplyValue(void *o) {
1969 incrRefCount((robj*)o);
1970 return 0;
1971 }
1972
1973 static redisClient *createClient(int fd) {
1974 redisClient *c = zmalloc(sizeof(*c));
1975
1976 anetNonBlock(NULL,fd);
1977 anetTcpNoDelay(NULL,fd);
1978 if (!c) return NULL;
1979 selectDb(c,0);
1980 c->fd = fd;
1981 c->querybuf = sdsempty();
1982 c->argc = 0;
1983 c->argv = NULL;
1984 c->bulklen = -1;
1985 c->multibulk = 0;
1986 c->mbargc = 0;
1987 c->mbargv = NULL;
1988 c->sentlen = 0;
1989 c->flags = 0;
1990 c->lastinteraction = time(NULL);
1991 c->authenticated = 0;
1992 c->replstate = REDIS_REPL_NONE;
1993 c->reply = listCreate();
1994 listSetFreeMethod(c->reply,decrRefCount);
1995 listSetDupMethod(c->reply,dupClientReplyValue);
1996 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1997 readQueryFromClient, c) == AE_ERR) {
1998 freeClient(c);
1999 return NULL;
2000 }
2001 listAddNodeTail(server.clients,c);
2002 return c;
2003 }
2004
2005 static void addReply(redisClient *c, robj *obj) {
2006 if (listLength(c->reply) == 0 &&
2007 (c->replstate == REDIS_REPL_NONE ||
2008 c->replstate == REDIS_REPL_ONLINE) &&
2009 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2010 sendReplyToClient, c) == AE_ERR) return;
2011 listAddNodeTail(c->reply,getDecodedObject(obj));
2012 }
2013
2014 static void addReplySds(redisClient *c, sds s) {
2015 robj *o = createObject(REDIS_STRING,s);
2016 addReply(c,o);
2017 decrRefCount(o);
2018 }
2019
2020 static void addReplyDouble(redisClient *c, double d) {
2021 char buf[128];
2022
2023 snprintf(buf,sizeof(buf),"%.17g",d);
2024 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2025 (unsigned long) strlen(buf),buf));
2026 }
2027
2028 static void addReplyBulkLen(redisClient *c, robj *obj) {
2029 size_t len;
2030
2031 if (obj->encoding == REDIS_ENCODING_RAW) {
2032 len = sdslen(obj->ptr);
2033 } else {
2034 long n = (long)obj->ptr;
2035
2036 len = 1;
2037 if (n < 0) {
2038 len++;
2039 n = -n;
2040 }
2041 while((n = n/10) != 0) {
2042 len++;
2043 }
2044 }
2045 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2046 }
2047
2048 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2049 int cport, cfd;
2050 char cip[128];
2051 redisClient *c;
2052 REDIS_NOTUSED(el);
2053 REDIS_NOTUSED(mask);
2054 REDIS_NOTUSED(privdata);
2055
2056 cfd = anetAccept(server.neterr, fd, cip, &cport);
2057 if (cfd == AE_ERR) {
2058 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2059 return;
2060 }
2061 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2062 if ((c = createClient(cfd)) == NULL) {
2063 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2064 close(cfd); /* May be already closed, just ingore errors */
2065 return;
2066 }
2067 /* If maxclient directive is set and this is one client more... close the
2068 * connection. Note that we create the client instead to check before
2069 * for this condition, since now the socket is already set in nonblocking
2070 * mode and we can send an error for free using the Kernel I/O */
2071 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2072 char *err = "-ERR max number of clients reached\r\n";
2073
2074 /* That's a best effort error message, don't check write errors */
2075 if (write(c->fd,err,strlen(err)) == -1) {
2076 /* Nothing to do, Just to avoid the warning... */
2077 }
2078 freeClient(c);
2079 return;
2080 }
2081 server.stat_numconnections++;
2082 }
2083
2084 /* ======================= Redis objects implementation ===================== */
2085
2086 static robj *createObject(int type, void *ptr) {
2087 robj *o;
2088
2089 if (listLength(server.objfreelist)) {
2090 listNode *head = listFirst(server.objfreelist);
2091 o = listNodeValue(head);
2092 listDelNode(server.objfreelist,head);
2093 } else {
2094 o = zmalloc(sizeof(*o));
2095 }
2096 o->type = type;
2097 o->encoding = REDIS_ENCODING_RAW;
2098 o->ptr = ptr;
2099 o->refcount = 1;
2100 return o;
2101 }
2102
2103 static robj *createStringObject(char *ptr, size_t len) {
2104 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2105 }
2106
2107 static robj *createListObject(void) {
2108 list *l = listCreate();
2109
2110 listSetFreeMethod(l,decrRefCount);
2111 return createObject(REDIS_LIST,l);
2112 }
2113
2114 static robj *createSetObject(void) {
2115 dict *d = dictCreate(&setDictType,NULL);
2116 return createObject(REDIS_SET,d);
2117 }
2118
2119 static robj *createZsetObject(void) {
2120 zset *zs = zmalloc(sizeof(*zs));
2121
2122 zs->dict = dictCreate(&zsetDictType,NULL);
2123 zs->zsl = zslCreate();
2124 return createObject(REDIS_ZSET,zs);
2125 }
2126
2127 static void freeStringObject(robj *o) {
2128 if (o->encoding == REDIS_ENCODING_RAW) {
2129 sdsfree(o->ptr);
2130 }
2131 }
2132
2133 static void freeListObject(robj *o) {
2134 listRelease((list*) o->ptr);
2135 }
2136
2137 static void freeSetObject(robj *o) {
2138 dictRelease((dict*) o->ptr);
2139 }
2140
2141 static void freeZsetObject(robj *o) {
2142 zset *zs = o->ptr;
2143
2144 dictRelease(zs->dict);
2145 zslFree(zs->zsl);
2146 zfree(zs);
2147 }
2148
2149 static void freeHashObject(robj *o) {
2150 dictRelease((dict*) o->ptr);
2151 }
2152
2153 static void incrRefCount(robj *o) {
2154 o->refcount++;
2155 #ifdef DEBUG_REFCOUNT
2156 if (o->type == REDIS_STRING)
2157 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2158 #endif
2159 }
2160
2161 static void decrRefCount(void *obj) {
2162 robj *o = obj;
2163
2164 #ifdef DEBUG_REFCOUNT
2165 if (o->type == REDIS_STRING)
2166 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2167 #endif
2168 if (--(o->refcount) == 0) {
2169 switch(o->type) {
2170 case REDIS_STRING: freeStringObject(o); break;
2171 case REDIS_LIST: freeListObject(o); break;
2172 case REDIS_SET: freeSetObject(o); break;
2173 case REDIS_ZSET: freeZsetObject(o); break;
2174 case REDIS_HASH: freeHashObject(o); break;
2175 default: redisAssert(0 != 0); break;
2176 }
2177 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2178 !listAddNodeHead(server.objfreelist,o))
2179 zfree(o);
2180 }
2181 }
2182
2183 static robj *lookupKey(redisDb *db, robj *key) {
2184 dictEntry *de = dictFind(db->dict,key);
2185 return de ? dictGetEntryVal(de) : NULL;
2186 }
2187
2188 static robj *lookupKeyRead(redisDb *db, robj *key) {
2189 expireIfNeeded(db,key);
2190 return lookupKey(db,key);
2191 }
2192
2193 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2194 deleteIfVolatile(db,key);
2195 return lookupKey(db,key);
2196 }
2197
2198 static int deleteKey(redisDb *db, robj *key) {
2199 int retval;
2200
2201 /* We need to protect key from destruction: after the first dictDelete()
2202 * it may happen that 'key' is no longer valid if we don't increment
2203 * it's count. This may happen when we get the object reference directly
2204 * from the hash table with dictRandomKey() or dict iterators */
2205 incrRefCount(key);
2206 if (dictSize(db->expires)) dictDelete(db->expires,key);
2207 retval = dictDelete(db->dict,key);
2208 decrRefCount(key);
2209
2210 return retval == DICT_OK;
2211 }
2212
2213 /* Try to share an object against the shared objects pool */
2214 static robj *tryObjectSharing(robj *o) {
2215 struct dictEntry *de;
2216 unsigned long c;
2217
2218 if (o == NULL || server.shareobjects == 0) return o;
2219
2220 redisAssert(o->type == REDIS_STRING);
2221 de = dictFind(server.sharingpool,o);
2222 if (de) {
2223 robj *shared = dictGetEntryKey(de);
2224
2225 c = ((unsigned long) dictGetEntryVal(de))+1;
2226 dictGetEntryVal(de) = (void*) c;
2227 incrRefCount(shared);
2228 decrRefCount(o);
2229 return shared;
2230 } else {
2231 /* Here we are using a stream algorihtm: Every time an object is
2232 * shared we increment its count, everytime there is a miss we
2233 * recrement the counter of a random object. If this object reaches
2234 * zero we remove the object and put the current object instead. */
2235 if (dictSize(server.sharingpool) >=
2236 server.sharingpoolsize) {
2237 de = dictGetRandomKey(server.sharingpool);
2238 redisAssert(de != NULL);
2239 c = ((unsigned long) dictGetEntryVal(de))-1;
2240 dictGetEntryVal(de) = (void*) c;
2241 if (c == 0) {
2242 dictDelete(server.sharingpool,de->key);
2243 }
2244 } else {
2245 c = 0; /* If the pool is empty we want to add this object */
2246 }
2247 if (c == 0) {
2248 int retval;
2249
2250 retval = dictAdd(server.sharingpool,o,(void*)1);
2251 redisAssert(retval == DICT_OK);
2252 incrRefCount(o);
2253 }
2254 return o;
2255 }
2256 }
2257
2258 /* Check if the nul-terminated string 's' can be represented by a long
2259 * (that is, is a number that fits into long without any other space or
2260 * character before or after the digits).
2261 *
2262 * If so, the function returns REDIS_OK and *longval is set to the value
2263 * of the number. Otherwise REDIS_ERR is returned */
2264 static int isStringRepresentableAsLong(sds s, long *longval) {
2265 char buf[32], *endptr;
2266 long value;
2267 int slen;
2268
2269 value = strtol(s, &endptr, 10);
2270 if (endptr[0] != '\0') return REDIS_ERR;
2271 slen = snprintf(buf,32,"%ld",value);
2272
2273 /* If the number converted back into a string is not identical
2274 * then it's not possible to encode the string as integer */
2275 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2276 if (longval) *longval = value;
2277 return REDIS_OK;
2278 }
2279
2280 /* Try to encode a string object in order to save space */
2281 static int tryObjectEncoding(robj *o) {
2282 long value;
2283 sds s = o->ptr;
2284
2285 if (o->encoding != REDIS_ENCODING_RAW)
2286 return REDIS_ERR; /* Already encoded */
2287
2288 /* It's not save to encode shared objects: shared objects can be shared
2289 * everywhere in the "object space" of Redis. Encoded objects can only
2290 * appear as "values" (and not, for instance, as keys) */
2291 if (o->refcount > 1) return REDIS_ERR;
2292
2293 /* Currently we try to encode only strings */
2294 redisAssert(o->type == REDIS_STRING);
2295
2296 /* Check if we can represent this string as a long integer */
2297 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2298
2299 /* Ok, this object can be encoded */
2300 o->encoding = REDIS_ENCODING_INT;
2301 sdsfree(o->ptr);
2302 o->ptr = (void*) value;
2303 return REDIS_OK;
2304 }
2305
2306 /* Get a decoded version of an encoded object (returned as a new object).
2307 * If the object is already raw-encoded just increment the ref count. */
2308 static robj *getDecodedObject(robj *o) {
2309 robj *dec;
2310
2311 if (o->encoding == REDIS_ENCODING_RAW) {
2312 incrRefCount(o);
2313 return o;
2314 }
2315 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2316 char buf[32];
2317
2318 snprintf(buf,32,"%ld",(long)o->ptr);
2319 dec = createStringObject(buf,strlen(buf));
2320 return dec;
2321 } else {
2322 redisAssert(1 != 1);
2323 }
2324 }
2325
2326 /* Compare two string objects via strcmp() or alike.
2327 * Note that the objects may be integer-encoded. In such a case we
2328 * use snprintf() to get a string representation of the numbers on the stack
2329 * and compare the strings, it's much faster than calling getDecodedObject().
2330 *
2331 * Important note: if objects are not integer encoded, but binary-safe strings,
2332 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2333 * binary safe. */
2334 static int compareStringObjects(robj *a, robj *b) {
2335 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2336 char bufa[128], bufb[128], *astr, *bstr;
2337 int bothsds = 1;
2338
2339 if (a == b) return 0;
2340 if (a->encoding != REDIS_ENCODING_RAW) {
2341 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2342 astr = bufa;
2343 bothsds = 0;
2344 } else {
2345 astr = a->ptr;
2346 }
2347 if (b->encoding != REDIS_ENCODING_RAW) {
2348 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2349 bstr = bufb;
2350 bothsds = 0;
2351 } else {
2352 bstr = b->ptr;
2353 }
2354 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2355 }
2356
2357 static size_t stringObjectLen(robj *o) {
2358 redisAssert(o->type == REDIS_STRING);
2359 if (o->encoding == REDIS_ENCODING_RAW) {
2360 return sdslen(o->ptr);
2361 } else {
2362 char buf[32];
2363
2364 return snprintf(buf,32,"%ld",(long)o->ptr);
2365 }
2366 }
2367
2368 /*============================ DB saving/loading ============================ */
2369
/* Write the one-byte opcode / object type 'type'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 1) ? 0 : -1;
}
2374
/* Write 't' as a 4-byte int32_t in host byte order (the cast truncates
 * times outside the int32_t range). Returns 0 on success, -1 on error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    if (fwrite(&t32,sizeof(t32),1,fp) != 1) return -1;
    return 0;
}
2380
2381 /* check rdbLoadLen() comments for more info */
2382 static int rdbSaveLen(FILE *fp, uint32_t len) {
2383 unsigned char buf[2];
2384
2385 if (len < (1<<6)) {
2386 /* Save a 6 bit len */
2387 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2388 if (fwrite(buf,1,1,fp) == 0) return -1;
2389 } else if (len < (1<<14)) {
2390 /* Save a 14 bit len */
2391 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2392 buf[1] = len&0xFF;
2393 if (fwrite(buf,2,1,fp) == 0) return -1;
2394 } else {
2395 /* Save a 32 bit len */
2396 buf[0] = (REDIS_RDB_32BITLEN<<6);
2397 if (fwrite(buf,1,1,fp) == 0) return -1;
2398 len = htonl(len);
2399 if (fwrite(&len,4,1,fp) == 0) return -1;
2400 }
2401 return 0;
2402 }
2403
2404 /* String objects in the form "2391" "-100" without any space and with a
2405 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2406 * encoded as integers to save space */
2407 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2408 long long value;
2409 char *endptr, buf[32];
2410
2411 /* Check if it's possible to encode this value as a number */
2412 value = strtoll(s, &endptr, 10);
2413 if (endptr[0] != '\0') return 0;
2414 snprintf(buf,32,"%lld",value);
2415
2416 /* If the number converted back into a string is not identical
2417 * then it's not possible to encode the string as integer */
2418 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2419
2420 /* Finally check if it fits in our ranges */
2421 if (value >= -(1<<7) && value <= (1<<7)-1) {
2422 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2423 enc[1] = value&0xFF;
2424 return 2;
2425 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2426 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2427 enc[1] = value&0xFF;
2428 enc[2] = (value>>8)&0xFF;
2429 return 3;
2430 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2431 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2432 enc[1] = value&0xFF;
2433 enc[2] = (value>>8)&0xFF;
2434 enc[3] = (value>>16)&0xFF;
2435 enc[4] = (value>>24)&0xFF;
2436 return 5;
2437 } else {
2438 return 0;
2439 }
2440 }
2441
2442 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2443 unsigned int comprlen, outlen;
2444 unsigned char byte;
2445 void *out;
2446
2447 /* We require at least four bytes compression for this to be worth it */
2448 outlen = sdslen(obj->ptr)-4;
2449 if (outlen <= 0) return 0;
2450 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2451 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2452 if (comprlen == 0) {
2453 zfree(out);
2454 return 0;
2455 }
2456 /* Data compressed! Let's save it on disk */
2457 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2458 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2459 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2460 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2461 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2462 zfree(out);
2463 return comprlen;
2464
2465 writeerr:
2466 zfree(out);
2467 return -1;
2468 }
2469
2470 /* Save a string objet as [len][data] on disk. If the object is a string
2471 * representation of an integer value we try to safe it in a special form */
2472 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2473 size_t len;
2474 int enclen;
2475
2476 len = sdslen(obj->ptr);
2477
2478 /* Try integer encoding */
2479 if (len <= 11) {
2480 unsigned char buf[5];
2481 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2482 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2483 return 0;
2484 }
2485 }
2486
2487 /* Try LZF compression - under 20 bytes it's unable to compress even
2488 * aaaaaaaaaaaaaaaaaa so skip it */
2489 if (len > 20) {
2490 int retval;
2491
2492 retval = rdbSaveLzfStringObject(fp,obj);
2493 if (retval == -1) return -1;
2494 if (retval > 0) return 0;
2495 /* retval == 0 means data can't be compressed, save the old way */
2496 }
2497
2498 /* Store verbatim */
2499 if (rdbSaveLen(fp,len) == -1) return -1;
2500 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2501 return 0;
2502 }
2503
2504 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2505 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2506 int retval;
2507
2508 obj = getDecodedObject(obj);
2509 retval = rdbSaveStringObjectRaw(fp,obj);
2510 decrRefCount(obj);
2511 return retval;
2512 }
2513
/* Save a double as a length-prefixed string. The first byte is the length
 * of the "%.17g" text that follows, or one of these special values with
 * nothing following:
 *   253: not a number
 *   254: +inf
 *   255: -inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (isinf(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        char *digits = (char*)buf+1;

        snprintf(digits,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(digits);
        len += buf[0];
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2540
2541 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2542 static int rdbSave(char *filename) {
2543 dictIterator *di = NULL;
2544 dictEntry *de;
2545 FILE *fp;
2546 char tmpfile[256];
2547 int j;
2548 time_t now = time(NULL);
2549
2550 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2551 fp = fopen(tmpfile,"w");
2552 if (!fp) {
2553 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2554 return REDIS_ERR;
2555 }
2556 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2557 for (j = 0; j < server.dbnum; j++) {
2558 redisDb *db = server.db+j;
2559 dict *d = db->dict;
2560 if (dictSize(d) == 0) continue;
2561 di = dictGetIterator(d);
2562 if (!di) {
2563 fclose(fp);
2564 return REDIS_ERR;
2565 }
2566
2567 /* Write the SELECT DB opcode */
2568 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2569 if (rdbSaveLen(fp,j) == -1) goto werr;
2570
2571 /* Iterate this DB writing every entry */
2572 while((de = dictNext(di)) != NULL) {
2573 robj *key = dictGetEntryKey(de);
2574 robj *o = dictGetEntryVal(de);
2575 time_t expiretime = getExpire(db,key);
2576
2577 /* Save the expire time */
2578 if (expiretime != -1) {
2579 /* If this key is already expired skip it */
2580 if (expiretime < now) continue;
2581 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2582 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2583 }
2584 /* Save the key and associated value */
2585 if (rdbSaveType(fp,o->type) == -1) goto werr;
2586 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2587 if (o->type == REDIS_STRING) {
2588 /* Save a string value */
2589 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2590 } else if (o->type == REDIS_LIST) {
2591 /* Save a list value */
2592 list *list = o->ptr;
2593 listNode *ln;
2594
2595 listRewind(list);
2596 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2597 while((ln = listYield(list))) {
2598 robj *eleobj = listNodeValue(ln);
2599
2600 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2601 }
2602 } else if (o->type == REDIS_SET) {
2603 /* Save a set value */
2604 dict *set = o->ptr;
2605 dictIterator *di = dictGetIterator(set);
2606 dictEntry *de;
2607
2608 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2609 while((de = dictNext(di)) != NULL) {
2610 robj *eleobj = dictGetEntryKey(de);
2611
2612 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2613 }
2614 dictReleaseIterator(di);
2615 } else if (o->type == REDIS_ZSET) {
2616 /* Save a set value */
2617 zset *zs = o->ptr;
2618 dictIterator *di = dictGetIterator(zs->dict);
2619 dictEntry *de;
2620
2621 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2622 while((de = dictNext(di)) != NULL) {
2623 robj *eleobj = dictGetEntryKey(de);
2624 double *score = dictGetEntryVal(de);
2625
2626 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2627 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2628 }
2629 dictReleaseIterator(di);
2630 } else {
2631 redisAssert(0 != 0);
2632 }
2633 }
2634 dictReleaseIterator(di);
2635 }
2636 /* EOF opcode */
2637 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2638
2639 /* Make sure data will not remain on the OS's output buffers */
2640 fflush(fp);
2641 fsync(fileno(fp));
2642 fclose(fp);
2643
2644 /* Use RENAME to make sure the DB file is changed atomically only
2645 * if the generate DB file is ok. */
2646 if (rename(tmpfile,filename) == -1) {
2647 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2648 unlink(tmpfile);
2649 return REDIS_ERR;
2650 }
2651 redisLog(REDIS_NOTICE,"DB saved on disk");
2652 server.dirty = 0;
2653 server.lastsave = time(NULL);
2654 return REDIS_OK;
2655
2656 werr:
2657 fclose(fp);
2658 unlink(tmpfile);
2659 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2660 if (di) dictReleaseIterator(di);
2661 return REDIS_ERR;
2662 }
2663
2664 static int rdbSaveBackground(char *filename) {
2665 pid_t childpid;
2666
2667 if (server.bgsavechildpid != -1) return REDIS_ERR;
2668 if ((childpid = fork()) == 0) {
2669 /* Child */
2670 close(server.fd);
2671 if (rdbSave(filename) == REDIS_OK) {
2672 exit(0);
2673 } else {
2674 exit(1);
2675 }
2676 } else {
2677 /* Parent */
2678 if (childpid == -1) {
2679 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2680 strerror(errno));
2681 return REDIS_ERR;
2682 }
2683 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2684 server.bgsavechildpid = childpid;
2685 return REDIS_OK;
2686 }
2687 return REDIS_OK; /* unreached */
2688 }
2689
/* Delete the temporary file "temp-<pid>.rdb" that rdbSave() writes when
 * running in process 'childpid'. unlink() errors (e.g. the file does not
 * exist) are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2696
/* Read a one-byte opcode / object type. Returns it as 0..255, or -1 on
 * EOF or read error. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 1) ? type : -1;
}
2702
/* Read a time written by rdbSaveTime() (4-byte int32_t, host byte order).
 * Returns -1 on EOF or read error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    if (fread(&t32,sizeof(t32),1,fp) != 1) return -1;
    return (time_t) t32;
}
2708
2709 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2710 * of this file for a description of how this are stored on disk.
2711 *
2712 * isencoded is set to 1 if the readed length is not actually a length but
2713 * an "encoding type", check the above comments for more info */
2714 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2715 unsigned char buf[2];
2716 uint32_t len;
2717
2718 if (isencoded) *isencoded = 0;
2719 if (rdbver == 0) {
2720 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2721 return ntohl(len);
2722 } else {
2723 int type;
2724
2725 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2726 type = (buf[0]&0xC0)>>6;
2727 if (type == REDIS_RDB_6BITLEN) {
2728 /* Read a 6 bit len */
2729 return buf[0]&0x3F;
2730 } else if (type == REDIS_RDB_ENCVAL) {
2731 /* Read a 6 bit len encoding type */
2732 if (isencoded) *isencoded = 1;
2733 return buf[0]&0x3F;
2734 } else if (type == REDIS_RDB_14BITLEN) {
2735 /* Read a 14 bit len */
2736 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2737 return ((buf[0]&0x3F)<<8)|buf[1];
2738 } else {
2739 /* Read a 32 bit len */
2740 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2741 return ntohl(len);
2742 }
2743 }
2744 }
2745
2746 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2747 unsigned char enc[4];
2748 long long val;
2749
2750 if (enctype == REDIS_RDB_ENC_INT8) {
2751 if (fread(enc,1,1,fp) == 0) return NULL;
2752 val = (signed char)enc[0];
2753 } else if (enctype == REDIS_RDB_ENC_INT16) {
2754 uint16_t v;
2755 if (fread(enc,2,1,fp) == 0) return NULL;
2756 v = enc[0]|(enc[1]<<8);
2757 val = (int16_t)v;
2758 } else if (enctype == REDIS_RDB_ENC_INT32) {
2759 uint32_t v;
2760 if (fread(enc,4,1,fp) == 0) return NULL;
2761 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2762 val = (int32_t)v;
2763 } else {
2764 val = 0; /* anti-warning */
2765 redisAssert(0!=0);
2766 }
2767 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2768 }
2769
2770 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2771 unsigned int len, clen;
2772 unsigned char *c = NULL;
2773 sds val = NULL;
2774
2775 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2776 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2777 if ((c = zmalloc(clen)) == NULL) goto err;
2778 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2779 if (fread(c,clen,1,fp) == 0) goto err;
2780 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2781 zfree(c);
2782 return createObject(REDIS_STRING,val);
2783 err:
2784 zfree(c);
2785 sdsfree(val);
2786 return NULL;
2787 }
2788
2789 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2790 int isencoded;
2791 uint32_t len;
2792 sds val;
2793
2794 len = rdbLoadLen(fp,rdbver,&isencoded);
2795 if (isencoded) {
2796 switch(len) {
2797 case REDIS_RDB_ENC_INT8:
2798 case REDIS_RDB_ENC_INT16:
2799 case REDIS_RDB_ENC_INT32:
2800 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2801 case REDIS_RDB_ENC_LZF:
2802 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2803 default:
2804 redisAssert(0!=0);
2805 }
2806 }
2807
2808 if (len == REDIS_RDB_LENERR) return NULL;
2809 val = sdsnewlen(NULL,len);
2810 if (len && fread(val,len,1,fp) == 0) {
2811 sdsfree(val);
2812 return NULL;
2813 }
2814 return tryObjectSharing(createObject(REDIS_STRING,val));
2815 }
2816
2817 /* For information about double serialization check rdbSaveDoubleValue() */
2818 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2819 char buf[128];
2820 unsigned char len;
2821
2822 if (fread(&len,1,1,fp) == 0) return -1;
2823 switch(len) {
2824 case 255: *val = R_NegInf; return 0;
2825 case 254: *val = R_PosInf; return 0;
2826 case 253: *val = R_Nan; return 0;
2827 default:
2828 if (fread(buf,len,1,fp) == 0) return -1;
2829 buf[len] = '\0';
2830 sscanf(buf, "%lg", val);
2831 return 0;
2832 }
2833 }
2834
/* Load the dataset stored in the RDB dump file 'filename' into server.db.
 *
 * Layout:
 * - A 9 byte header: "REDIS" followed by 4 bytes holding the format version
 *   as text (parsed with atoi). Only versions <= 1 are accepted.
 * - A sequence of records, each introduced by a type read with
 *   rdbLoadType():
 *   - REDIS_EXPIRETIME: an expire time for the next key; the real type
 *     follows it.
 *   - REDIS_SELECTDB: the next length is the index of the database that
 *     following keys go into.
 *   - REDIS_EOF: end of data.
 *   - REDIS_STRING / REDIS_LIST / REDIS_SET / REDIS_ZSET: a key string
 *     followed by the value in the matching encoding.
 *
 * Returns REDIS_ERR if the file cannot be opened, has a wrong signature or
 * an unsupported version, and REDIS_OK when everything was loaded.
 *
 * The following are fatal and make the process log and exit(1):
 * - short reads or other load failures (the eoferr path);
 * - a database index >= server.dbnum;
 * - duplicated keys. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;      /* target dict, changed by SELECTDB */
    redisDb *db = server.db+0;
    char buf[1024];
    /* -1 means "no expire pending for the next key". */
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: element count, then the elements */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    listAddNodeTail((list*)o->ptr,ele);
                } else {
                    dictAdd((dict*)o->ptr,ele,NULL);
                }
            }
        } else if (type == REDIS_ZSET) {
            /* Read sorted set value: element count, then (member, score)
             * pairs */
            uint32_t zsetlen;
            zset *zs;

            if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = createZsetObject();
            zs = o->ptr;
            /* Load every single member/score pair of the sorted set */
            while(zsetlen--) {
                robj *ele;
                /* NOTE(review): score leaks if a read below fails; harmless
                 * because the eoferr path exits the process. */
                double *score = zmalloc(sizeof(double));

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
                /* The member is indexed both by the dict (member -> score)
                 * and by the skiplist, hence the extra reference. */
                dictAdd(zs->dict,ele,score);
                zslInsert(zs->zsl,*score,ele);
                incrRefCount(ele); /* added to skiplist */
            }
        } else {
            redisAssert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            expiretime = -1;
        }
        /* keyobj now belongs to the dict: clear it so the eoferr path does
         * not release it on a later failure. */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2957
2958 /*================================== Commands =============================== */
2959
2960 static void authCommand(redisClient *c) {
2961 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2962 c->authenticated = 1;
2963 addReply(c,shared.ok);
2964 } else {
2965 c->authenticated = 0;
2966 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2967 }
2968 }
2969
2970 static void pingCommand(redisClient *c) {
2971 addReply(c,shared.pong);
2972 }
2973
2974 static void echoCommand(redisClient *c) {
2975 addReplyBulkLen(c,c->argv[1]);
2976 addReply(c,c->argv[1]);
2977 addReply(c,shared.crlf);
2978 }
2979
2980 /*=================================== Strings =============================== */
2981
2982 static void setGenericCommand(redisClient *c, int nx) {
2983 int retval;
2984
2985 if (nx) deleteIfVolatile(c->db,c->argv[1]);
2986 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2987 if (retval == DICT_ERR) {
2988 if (!nx) {
2989 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2990 incrRefCount(c->argv[2]);
2991 } else {
2992 addReply(c,shared.czero);
2993 return;
2994 }
2995 } else {
2996 incrRefCount(c->argv[1]);
2997 incrRefCount(c->argv[2]);
2998 }
2999 server.dirty++;
3000 removeExpire(c->db,c->argv[1]);
3001 addReply(c, nx ? shared.cone : shared.ok);
3002 }
3003
3004 static void setCommand(redisClient *c) {
3005 setGenericCommand(c,0);
3006 }
3007
3008 static void setnxCommand(redisClient *c) {
3009 setGenericCommand(c,1);
3010 }
3011
3012 static void getCommand(redisClient *c) {
3013 robj *o = lookupKeyRead(c->db,c->argv[1]);
3014
3015 if (o == NULL) {
3016 addReply(c,shared.nullbulk);
3017 } else {
3018 if (o->type != REDIS_STRING) {
3019 addReply(c,shared.wrongtypeerr);
3020 } else {
3021 addReplyBulkLen(c,o);
3022 addReply(c,o);
3023 addReply(c,shared.crlf);
3024 }
3025 }
3026 }
3027
3028 static void getsetCommand(redisClient *c) {
3029 getCommand(c);
3030 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3031 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3032 } else {
3033 incrRefCount(c->argv[1]);
3034 }
3035 incrRefCount(c->argv[2]);
3036 server.dirty++;
3037 removeExpire(c->db,c->argv[1]);
3038 }
3039
3040 static void mgetCommand(redisClient *c) {
3041 int j;
3042
3043 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3044 for (j = 1; j < c->argc; j++) {
3045 robj *o = lookupKeyRead(c->db,c->argv[j]);
3046 if (o == NULL) {
3047 addReply(c,shared.nullbulk);
3048 } else {
3049 if (o->type != REDIS_STRING) {
3050 addReply(c,shared.nullbulk);
3051 } else {
3052 addReplyBulkLen(c,o);
3053 addReply(c,o);
3054 addReply(c,shared.crlf);
3055 }
3056 }
3057 }
3058 }
3059
3060 static void msetGenericCommand(redisClient *c, int nx) {
3061 int j, busykeys = 0;
3062
3063 if ((c->argc % 2) == 0) {
3064 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3065 return;
3066 }
3067 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3068 * set nothing at all if at least one already key exists. */
3069 if (nx) {
3070 for (j = 1; j < c->argc; j += 2) {
3071 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3072 busykeys++;
3073 }
3074 }
3075 }
3076 if (busykeys) {
3077 addReply(c, shared.czero);
3078 return;
3079 }
3080
3081 for (j = 1; j < c->argc; j += 2) {
3082 int retval;
3083
3084 tryObjectEncoding(c->argv[j+1]);
3085 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3086 if (retval == DICT_ERR) {
3087 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3088 incrRefCount(c->argv[j+1]);
3089 } else {
3090 incrRefCount(c->argv[j]);
3091 incrRefCount(c->argv[j+1]);
3092 }
3093 removeExpire(c->db,c->argv[j]);
3094 }
3095 server.dirty += (c->argc-1)/2;
3096 addReply(c, nx ? shared.cone : shared.ok);
3097 }
3098
3099 static void msetCommand(redisClient *c) {
3100 msetGenericCommand(c,0);
3101 }
3102
3103 static void msetnxCommand(redisClient *c) {
3104 msetGenericCommand(c,1);
3105 }
3106
3107 static void incrDecrCommand(redisClient *c, long long incr) {
3108 long long value;
3109 int retval;
3110 robj *o;
3111
3112 o = lookupKeyWrite(c->db,c->argv[1]);
3113 if (o == NULL) {
3114 value = 0;
3115 } else {
3116 if (o->type != REDIS_STRING) {
3117 value = 0;
3118 } else {
3119 char *eptr;
3120
3121 if (o->encoding == REDIS_ENCODING_RAW)
3122 value = strtoll(o->ptr, &eptr, 10);
3123 else if (o->encoding == REDIS_ENCODING_INT)
3124 value = (long)o->ptr;
3125 else
3126 redisAssert(1 != 1);
3127 }
3128 }
3129
3130 value += incr;
3131 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3132 tryObjectEncoding(o);
3133 retval = dictAdd(c->db->dict,c->argv[1],o);
3134 if (retval == DICT_ERR) {
3135 dictReplace(c->db->dict,c->argv[1],o);
3136 removeExpire(c->db,c->argv[1]);
3137 } else {
3138 incrRefCount(c->argv[1]);
3139 }
3140 server.dirty++;
3141 addReply(c,shared.colon);
3142 addReply(c,o);
3143 addReply(c,shared.crlf);
3144 }
3145
3146 static void incrCommand(redisClient *c) {
3147 incrDecrCommand(c,1);
3148 }
3149
3150 static void decrCommand(redisClient *c) {
3151 incrDecrCommand(c,-1);
3152 }
3153
3154 static void incrbyCommand(redisClient *c) {
3155 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3156 incrDecrCommand(c,incr);
3157 }
3158
3159 static void decrbyCommand(redisClient *c) {
3160 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3161 incrDecrCommand(c,-incr);
3162 }
3163
3164 /* ========================= Type agnostic commands ========================= */
3165
3166 static void delCommand(redisClient *c) {
3167 int deleted = 0, j;
3168
3169 for (j = 1; j < c->argc; j++) {
3170 if (deleteKey(c->db,c->argv[j])) {
3171 server.dirty++;
3172 deleted++;
3173 }
3174 }
3175 switch(deleted) {
3176 case 0:
3177 addReply(c,shared.czero);
3178 break;
3179 case 1:
3180 addReply(c,shared.cone);
3181 break;
3182 default:
3183 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3184 break;
3185 }
3186 }
3187
3188 static void existsCommand(redisClient *c) {
3189 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3190 }
3191
3192 static void selectCommand(redisClient *c) {
3193 int id = atoi(c->argv[1]->ptr);
3194
3195 if (selectDb(c,id) == REDIS_ERR) {
3196 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3197 } else {
3198 addReply(c,shared.ok);
3199 }
3200 }
3201
3202 static void randomkeyCommand(redisClient *c) {
3203 dictEntry *de;
3204
3205 while(1) {
3206 de = dictGetRandomKey(c->db->dict);
3207 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3208 }
3209 if (de == NULL) {
3210 addReply(c,shared.plus);
3211 addReply(c,shared.crlf);
3212 } else {
3213 addReply(c,shared.plus);
3214 addReply(c,dictGetEntryKey(de));
3215 addReply(c,shared.crlf);
3216 }
3217 }
3218
3219 static void keysCommand(redisClient *c) {
3220 dictIterator *di;
3221 dictEntry *de;
3222 sds pattern = c->argv[1]->ptr;
3223 int plen = sdslen(pattern);
3224 unsigned long numkeys = 0, keyslen = 0;
3225 robj *lenobj = createObject(REDIS_STRING,NULL);
3226
3227 di = dictGetIterator(c->db->dict);
3228 addReply(c,lenobj);
3229 decrRefCount(lenobj);
3230 while((de = dictNext(di)) != NULL) {
3231 robj *keyobj = dictGetEntryKey(de);
3232
3233 sds key = keyobj->ptr;
3234 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3235 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3236 if (expireIfNeeded(c->db,keyobj) == 0) {
3237 if (numkeys != 0)
3238 addReply(c,shared.space);
3239 addReply(c,keyobj);
3240 numkeys++;
3241 keyslen += sdslen(key);
3242 }
3243 }
3244 }
3245 dictReleaseIterator(di);
3246 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3247 addReply(c,shared.crlf);
3248 }
3249
3250 static void dbsizeCommand(redisClient *c) {
3251 addReplySds(c,
3252 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3253 }
3254
3255 static void lastsaveCommand(redisClient *c) {
3256 addReplySds(c,
3257 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3258 }
3259
3260 static void typeCommand(redisClient *c) {
3261 robj *o;
3262 char *type;
3263
3264 o = lookupKeyRead(c->db,c->argv[1]);
3265 if (o == NULL) {
3266 type = "+none";
3267 } else {
3268 switch(o->type) {
3269 case REDIS_STRING: type = "+string"; break;
3270 case REDIS_LIST: type = "+list"; break;
3271 case REDIS_SET: type = "+set"; break;
3272 case REDIS_ZSET: type = "+zset"; break;
3273 default: type = "unknown"; break;
3274 }
3275 }
3276 addReplySds(c,sdsnew(type));
3277 addReply(c,shared.crlf);
3278 }
3279
3280 static void saveCommand(redisClient *c) {
3281 if (server.bgsavechildpid != -1) {
3282 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3283 return;
3284 }
3285 if (rdbSave(server.dbfilename) == REDIS_OK) {
3286 addReply(c,shared.ok);
3287 } else {
3288 addReply(c,shared.err);
3289 }
3290 }
3291
3292 static void bgsaveCommand(redisClient *c) {
3293 if (server.bgsavechildpid != -1) {
3294 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3295 return;
3296 }
3297 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3298 char *status = "+Background saving started\r\n";
3299 addReplySds(c,sdsnew(status));
3300 } else {
3301 addReply(c,shared.err);
3302 }
3303 }
3304
3305 static void shutdownCommand(redisClient *c) {
3306 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3307 /* Kill the saving child if there is a background saving in progress.
3308 We want to avoid race conditions, for instance our saving child may
3309 overwrite the synchronous saving did by SHUTDOWN. */
3310 if (server.bgsavechildpid != -1) {
3311 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3312 kill(server.bgsavechildpid,SIGKILL);
3313 rdbRemoveTempFile(server.bgsavechildpid);
3314 }
3315 /* SYNC SAVE */
3316 if (rdbSave(server.dbfilename) == REDIS_OK) {
3317 if (server.daemonize)
3318 unlink(server.pidfile);
3319 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3320 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3321 exit(1);
3322 } else {
3323 /* Ooops.. error saving! The best we can do is to continue operating.
3324 * Note that if there was a background saving process, in the next
3325 * cron() Redis will be notified that the background saving aborted,
3326 * handling special stuff like slaves pending for synchronization... */
3327 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3328 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3329 }
3330 }
3331
3332 static void renameGenericCommand(redisClient *c, int nx) {
3333 robj *o;
3334
3335 /* To use the same key as src and dst is probably an error */
3336 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3337 addReply(c,shared.sameobjecterr);
3338 return;
3339 }
3340
3341 o = lookupKeyWrite(c->db,c->argv[1]);
3342 if (o == NULL) {
3343 addReply(c,shared.nokeyerr);
3344 return;
3345 }
3346 incrRefCount(o);
3347 deleteIfVolatile(c->db,c->argv[2]);
3348 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3349 if (nx) {
3350 decrRefCount(o);
3351 addReply(c,shared.czero);
3352 return;
3353 }
3354 dictReplace(c->db->dict,c->argv[2],o);
3355 } else {
3356 incrRefCount(c->argv[2]);
3357 }
3358 deleteKey(c->db,c->argv[1]);
3359 server.dirty++;
3360 addReply(c,nx ? shared.cone : shared.ok);
3361 }
3362
3363 static void renameCommand(redisClient *c) {
3364 renameGenericCommand(c,0);
3365 }
3366
3367 static void renamenxCommand(redisClient *c) {
3368 renameGenericCommand(c,1);
3369 }
3370
3371 static void moveCommand(redisClient *c) {
3372 robj *o;
3373 redisDb *src, *dst;
3374 int srcid;
3375
3376 /* Obtain source and target DB pointers */
3377 src = c->db;
3378 srcid = c->db->id;
3379 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3380 addReply(c,shared.outofrangeerr);
3381 return;
3382 }
3383 dst = c->db;
3384 selectDb(c,srcid); /* Back to the source DB */
3385
3386 /* If the user is moving using as target the same
3387 * DB as the source DB it is probably an error. */
3388 if (src == dst) {
3389 addReply(c,shared.sameobjecterr);
3390 return;
3391 }
3392
3393 /* Check if the element exists and get a reference */
3394 o = lookupKeyWrite(c->db,c->argv[1]);
3395 if (!o) {
3396 addReply(c,shared.czero);
3397 return;
3398 }
3399
3400 /* Try to add the element to the target DB */
3401 deleteIfVolatile(dst,c->argv[1]);
3402 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3403 addReply(c,shared.czero);
3404 return;
3405 }
3406 incrRefCount(c->argv[1]);
3407 incrRefCount(o);
3408
3409 /* OK! key moved, free the entry in the source DB */
3410 deleteKey(src,c->argv[1]);
3411 server.dirty++;
3412 addReply(c,shared.cone);
3413 }
3414
3415 /* =================================== Lists ================================ */
3416 static void pushGenericCommand(redisClient *c, int where) {
3417 robj *lobj;
3418 list *list;
3419
3420 lobj = lookupKeyWrite(c->db,c->argv[1]);
3421 if (lobj == NULL) {
3422 lobj = createListObject();
3423 list = lobj->ptr;
3424 if (where == REDIS_HEAD) {
3425 listAddNodeHead(list,c->argv[2]);
3426 } else {
3427 listAddNodeTail(list,c->argv[2]);
3428 }
3429 dictAdd(c->db->dict,c->argv[1],lobj);
3430 incrRefCount(c->argv[1]);
3431 incrRefCount(c->argv[2]);
3432 } else {
3433 if (lobj->type != REDIS_LIST) {
3434 addReply(c,shared.wrongtypeerr);
3435 return;
3436 }
3437 list = lobj->ptr;
3438 if (where == REDIS_HEAD) {
3439 listAddNodeHead(list,c->argv[2]);
3440 } else {
3441 listAddNodeTail(list,c->argv[2]);
3442 }
3443 incrRefCount(c->argv[2]);
3444 }
3445 server.dirty++;
3446 addReply(c,shared.ok);
3447 }
3448
3449 static void lpushCommand(redisClient *c) {
3450 pushGenericCommand(c,REDIS_HEAD);
3451 }
3452
3453 static void rpushCommand(redisClient *c) {
3454 pushGenericCommand(c,REDIS_TAIL);
3455 }
3456
3457 static void llenCommand(redisClient *c) {
3458 robj *o;
3459 list *l;
3460
3461 o = lookupKeyRead(c->db,c->argv[1]);
3462 if (o == NULL) {
3463 addReply(c,shared.czero);
3464 return;
3465 } else {
3466 if (o->type != REDIS_LIST) {
3467 addReply(c,shared.wrongtypeerr);
3468 } else {
3469 l = o->ptr;
3470 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3471 }
3472 }
3473 }
3474
3475 static void lindexCommand(redisClient *c) {
3476 robj *o;
3477 int index = atoi(c->argv[2]->ptr);
3478
3479 o = lookupKeyRead(c->db,c->argv[1]);
3480 if (o == NULL) {
3481 addReply(c,shared.nullbulk);
3482 } else {
3483 if (o->type != REDIS_LIST) {
3484 addReply(c,shared.wrongtypeerr);
3485 } else {
3486 list *list = o->ptr;
3487 listNode *ln;
3488
3489 ln = listIndex(list, index);
3490 if (ln == NULL) {
3491 addReply(c,shared.nullbulk);
3492 } else {
3493 robj *ele = listNodeValue(ln);
3494 addReplyBulkLen(c,ele);
3495 addReply(c,ele);
3496 addReply(c,shared.crlf);
3497 }
3498 }
3499 }
3500 }
3501
3502 static void lsetCommand(redisClient *c) {
3503 robj *o;
3504 int index = atoi(c->argv[2]->ptr);
3505
3506 o = lookupKeyWrite(c->db,c->argv[1]);
3507 if (o == NULL) {
3508 addReply(c,shared.nokeyerr);
3509 } else {
3510 if (o->type != REDIS_LIST) {
3511 addReply(c,shared.wrongtypeerr);
3512 } else {
3513 list *list = o->ptr;
3514 listNode *ln;
3515
3516 ln = listIndex(list, index);
3517 if (ln == NULL) {
3518 addReply(c,shared.outofrangeerr);
3519 } else {
3520 robj *ele = listNodeValue(ln);
3521
3522 decrRefCount(ele);
3523 listNodeValue(ln) = c->argv[3];
3524 incrRefCount(c->argv[3]);
3525 addReply(c,shared.ok);
3526 server.dirty++;
3527 }
3528 }
3529 }
3530 }
3531
3532 static void popGenericCommand(redisClient *c, int where) {
3533 robj *o;
3534
3535 o = lookupKeyWrite(c->db,c->argv[1]);
3536 if (o == NULL) {
3537 addReply(c,shared.nullbulk);
3538 } else {
3539 if (o->type != REDIS_LIST) {
3540 addReply(c,shared.wrongtypeerr);
3541 } else {
3542 list *list = o->ptr;
3543 listNode *ln;
3544
3545 if (where == REDIS_HEAD)
3546 ln = listFirst(list);
3547 else
3548 ln = listLast(list);
3549
3550 if (ln == NULL) {
3551 addReply(c,shared.nullbulk);
3552 } else {
3553 robj *ele = listNodeValue(ln);
3554 addReplyBulkLen(c,ele);
3555 addReply(c,ele);
3556 addReply(c,shared.crlf);
3557 listDelNode(list,ln);
3558 server.dirty++;
3559 }
3560 }
3561 }
3562 }
3563
3564 static void lpopCommand(redisClient *c) {
3565 popGenericCommand(c,REDIS_HEAD);
3566 }
3567
3568 static void rpopCommand(redisClient *c) {
3569 popGenericCommand(c,REDIS_TAIL);
3570 }
3571
3572 static void lrangeCommand(redisClient *c) {
3573 robj *o;
3574 int start = atoi(c->argv[2]->ptr);
3575 int end = atoi(c->argv[3]->ptr);
3576
3577 o = lookupKeyRead(c->db,c->argv[1]);
3578 if (o == NULL) {
3579 addReply(c,shared.nullmultibulk);
3580 } else {
3581 if (o->type != REDIS_LIST) {
3582 addReply(c,shared.wrongtypeerr);
3583 } else {
3584 list *list = o->ptr;
3585 listNode *ln;
3586 int llen = listLength(list);
3587 int rangelen, j;
3588 robj *ele;
3589
3590 /* convert negative indexes */
3591 if (start < 0) start = llen+start;
3592 if (end < 0) end = llen+end;
3593 if (start < 0) start = 0;
3594 if (end < 0) end = 0;
3595
3596 /* indexes sanity checks */
3597 if (start > end || start >= llen) {
3598 /* Out of range start or start > end result in empty list */
3599 addReply(c,shared.emptymultibulk);
3600 return;
3601 }
3602 if (end >= llen) end = llen-1;
3603 rangelen = (end-start)+1;
3604
3605 /* Return the result in form of a multi-bulk reply */
3606 ln = listIndex(list, start);
3607 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3608 for (j = 0; j < rangelen; j++) {
3609 ele = listNodeValue(ln);
3610 addReplyBulkLen(c,ele);
3611 addReply(c,ele);
3612 addReply(c,shared.crlf);
3613 ln = ln->next;
3614 }
3615 }
3616 }
3617 }
3618
3619 static void ltrimCommand(redisClient *c) {
3620 robj *o;
3621 int start = atoi(c->argv[2]->ptr);
3622 int end = atoi(c->argv[3]->ptr);
3623
3624 o = lookupKeyWrite(c->db,c->argv[1]);
3625 if (o == NULL) {
3626 addReply(c,shared.nokeyerr);
3627 } else {
3628 if (o->type != REDIS_LIST) {
3629 addReply(c,shared.wrongtypeerr);
3630 } else {
3631 list *list = o->ptr;
3632 listNode *ln;
3633 int llen = listLength(list);
3634 int j, ltrim, rtrim;
3635
3636 /* convert negative indexes */
3637 if (start < 0) start = llen+start;
3638 if (end < 0) end = llen+end;
3639 if (start < 0) start = 0;
3640 if (end < 0) end = 0;
3641
3642 /* indexes sanity checks */
3643 if (start > end || start >= llen) {
3644 /* Out of range start or start > end result in empty list */
3645 ltrim = llen;
3646 rtrim = 0;
3647 } else {
3648 if (end >= llen) end = llen-1;
3649 ltrim = start;
3650 rtrim = llen-end-1;
3651 }
3652
3653 /* Remove list elements to perform the trim */
3654 for (j = 0; j < ltrim; j++) {
3655 ln = listFirst(list);
3656 listDelNode(list,ln);
3657 }
3658 for (j = 0; j < rtrim; j++) {
3659 ln = listLast(list);
3660 listDelNode(list,ln);
3661 }
3662 server.dirty++;
3663 addReply(c,shared.ok);
3664 }
3665 }
3666 }
3667
3668 static void lremCommand(redisClient *c) {
3669 robj *o;
3670
3671 o = lookupKeyWrite(c->db,c->argv[1]);
3672 if (o == NULL) {
3673 addReply(c,shared.czero);
3674 } else {
3675 if (o->type != REDIS_LIST) {
3676 addReply(c,shared.wrongtypeerr);
3677 } else {
3678 list *list = o->ptr;
3679 listNode *ln, *next;
3680 int toremove = atoi(c->argv[2]->ptr);
3681 int removed = 0;
3682 int fromtail = 0;
3683
3684 if (toremove < 0) {
3685 toremove = -toremove;
3686 fromtail = 1;
3687 }
3688 ln = fromtail ? list->tail : list->head;
3689 while (ln) {
3690 robj *ele = listNodeValue(ln);
3691
3692 next = fromtail ? ln->prev : ln->next;
3693 if (compareStringObjects(ele,c->argv[3]) == 0) {
3694 listDelNode(list,ln);
3695 server.dirty++;
3696 removed++;
3697 if (toremove && removed == toremove) break;
3698 }
3699 ln = next;
3700 }
3701 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3702 }
3703 }
3704 }
3705
3706 /* This is the semantic of this command:
3707 * RPOPLPUSH srclist dstlist:
3708 * IF LLEN(srclist) > 0
3709 * element = RPOP srclist
3710 * LPUSH dstlist element
3711 * RETURN element
3712 * ELSE
3713 * RETURN nil
3714 * END
3715 * END
3716 *
3717 * The idea is to be able to get an element from a list in a reliable way
3718 * since the element is not just returned but pushed against another list
3719 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3720 */
3721 static void rpoplpushcommand(redisClient *c) {
3722 robj *sobj;
3723
3724 sobj = lookupKeyWrite(c->db,c->argv[1]);
3725 if (sobj == NULL) {
3726 addReply(c,shared.nullbulk);
3727 } else {
3728 if (sobj->type != REDIS_LIST) {
3729 addReply(c,shared.wrongtypeerr);
3730 } else {
3731 list *srclist = sobj->ptr;
3732 listNode *ln = listLast(srclist);
3733
3734 if (ln == NULL) {
3735 addReply(c,shared.nullbulk);
3736 } else {
3737 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3738 robj *ele = listNodeValue(ln);
3739 list *dstlist;
3740
3741 if (dobj == NULL) {
3742
3743 /* Create the list if the key does not exist */
3744 dobj = createListObject();
3745 dictAdd(c->db->dict,c->argv[2],dobj);
3746 incrRefCount(c->argv[2]);
3747 } else if (dobj->type != REDIS_LIST) {
3748 addReply(c,shared.wrongtypeerr);
3749 return;
3750 }
3751 /* Add the element to the target list */
3752 dstlist = dobj->ptr;
3753 listAddNodeHead(dstlist,ele);
3754 incrRefCount(ele);
3755
3756 /* Send the element to the client as reply as well */
3757 addReplyBulkLen(c,ele);
3758 addReply(c,ele);
3759 addReply(c,shared.crlf);
3760
3761 /* Finally remove the element from the source list */
3762 listDelNode(srclist,ln);
3763 server.dirty++;
3764 }
3765 }
3766 }
3767 }
3768
3769
3770 /* ==================================== Sets ================================ */
3771
3772 static void saddCommand(redisClient *c) {
3773 robj *set;
3774
3775 set = lookupKeyWrite(c->db,c->argv[1]);
3776 if (set == NULL) {
3777 set = createSetObject();
3778 dictAdd(c->db->dict,c->argv[1],set);
3779 incrRefCount(c->argv[1]);
3780 } else {
3781 if (set->type != REDIS_SET) {
3782 addReply(c,shared.wrongtypeerr);
3783 return;
3784 }
3785 }
3786 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3787 incrRefCount(c->argv[2]);
3788 server.dirty++;
3789 addReply(c,shared.cone);
3790 } else {
3791 addReply(c,shared.czero);
3792 }
3793 }
3794
3795 static void sremCommand(redisClient *c) {
3796 robj *set;
3797
3798 set = lookupKeyWrite(c->db,c->argv[1]);
3799 if (set == NULL) {
3800 addReply(c,shared.czero);
3801 } else {
3802 if (set->type != REDIS_SET) {
3803 addReply(c,shared.wrongtypeerr);
3804 return;
3805 }
3806 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3807 server.dirty++;
3808 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3809 addReply(c,shared.cone);
3810 } else {
3811 addReply(c,shared.czero);
3812 }
3813 }
3814 }
3815
3816 static void smoveCommand(redisClient *c) {
3817 robj *srcset, *dstset;
3818
3819 srcset = lookupKeyWrite(c->db,c->argv[1]);
3820 dstset = lookupKeyWrite(c->db,c->argv[2]);
3821
3822 /* If the source key does not exist return 0, if it's of the wrong type
3823 * raise an error */
3824 if (srcset == NULL || srcset->type != REDIS_SET) {
3825 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3826 return;
3827 }
3828 /* Error if the destination key is not a set as well */
3829 if (dstset && dstset->type != REDIS_SET) {
3830 addReply(c,shared.wrongtypeerr);
3831 return;
3832 }
3833 /* Remove the element from the source set */
3834 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3835 /* Key not found in the src set! return zero */
3836 addReply(c,shared.czero);
3837 return;
3838 }
3839 server.dirty++;
3840 /* Add the element to the destination set */
3841 if (!dstset) {
3842 dstset = createSetObject();
3843 dictAdd(c->db->dict,c->argv[2],dstset);
3844 incrRefCount(c->argv[2]);
3845 }
3846 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3847 incrRefCount(c->argv[3]);
3848 addReply(c,shared.cone);
3849 }
3850
3851 static void sismemberCommand(redisClient *c) {
3852 robj *set;
3853
3854 set = lookupKeyRead(c->db,c->argv[1]);
3855 if (set == NULL) {
3856 addReply(c,shared.czero);
3857 } else {
3858 if (set->type != REDIS_SET) {
3859 addReply(c,shared.wrongtypeerr);
3860 return;
3861 }
3862 if (dictFind(set->ptr,c->argv[2]))
3863 addReply(c,shared.cone);
3864 else
3865 addReply(c,shared.czero);
3866 }
3867 }
3868
3869 static void scardCommand(redisClient *c) {
3870 robj *o;
3871 dict *s;
3872
3873 o = lookupKeyRead(c->db,c->argv[1]);
3874 if (o == NULL) {
3875 addReply(c,shared.czero);
3876 return;
3877 } else {
3878 if (o->type != REDIS_SET) {
3879 addReply(c,shared.wrongtypeerr);
3880 } else {
3881 s = o->ptr;
3882 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3883 dictSize(s)));
3884 }
3885 }
3886 }
3887
3888 static void spopCommand(redisClient *c) {
3889 robj *set;
3890 dictEntry *de;
3891
3892 set = lookupKeyWrite(c->db,c->argv[1]);
3893 if (set == NULL) {
3894 addReply(c,shared.nullbulk);
3895 } else {
3896 if (set->type != REDIS_SET) {
3897 addReply(c,shared.wrongtypeerr);
3898 return;
3899 }
3900 de = dictGetRandomKey(set->ptr);
3901 if (de == NULL) {
3902 addReply(c,shared.nullbulk);
3903 } else {
3904 robj *ele = dictGetEntryKey(de);
3905
3906 addReplyBulkLen(c,ele);
3907 addReply(c,ele);
3908 addReply(c,shared.crlf);
3909 dictDelete(set->ptr,ele);
3910 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3911 server.dirty++;
3912 }
3913 }
3914 }
3915
3916 static void srandmemberCommand(redisClient *c) {
3917 robj *set;
3918 dictEntry *de;
3919
3920 set = lookupKeyRead(c->db,c->argv[1]);
3921 if (set == NULL) {
3922 addReply(c,shared.nullbulk);
3923 } else {
3924 if (set->type != REDIS_SET) {
3925 addReply(c,shared.wrongtypeerr);
3926 return;
3927 }
3928 de = dictGetRandomKey(set->ptr);
3929 if (de == NULL) {
3930 addReply(c,shared.nullbulk);
3931 } else {
3932 robj *ele = dictGetEntryKey(de);
3933
3934 addReplyBulkLen(c,ele);
3935 addReply(c,ele);
3936 addReply(c,shared.crlf);
3937 }
3938 }
3939 }
3940
3941 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3942 dict **d1 = (void*) s1, **d2 = (void*) s2;
3943
3944 return dictSize(*d1)-dictSize(*d2);
3945 }
3946
3947 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
3948 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3949 dictIterator *di;
3950 dictEntry *de;
3951 robj *lenobj = NULL, *dstset = NULL;
3952 unsigned long j, cardinality = 0;
3953
3954 for (j = 0; j < setsnum; j++) {
3955 robj *setobj;
3956
3957 setobj = dstkey ?
3958 lookupKeyWrite(c->db,setskeys[j]) :
3959 lookupKeyRead(c->db,setskeys[j]);
3960 if (!setobj) {
3961 zfree(dv);
3962 if (dstkey) {
3963 deleteKey(c->db,dstkey);
3964 addReply(c,shared.czero);
3965 } else {
3966 addReply(c,shared.nullmultibulk);
3967 }
3968 return;
3969 }
3970 if (setobj->type != REDIS_SET) {
3971 zfree(dv);
3972 addReply(c,shared.wrongtypeerr);
3973 return;
3974 }
3975 dv[j] = setobj->ptr;
3976 }
3977 /* Sort sets from the smallest to largest, this will improve our
3978 * algorithm's performace */
3979 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3980
3981 /* The first thing we should output is the total number of elements...
3982 * since this is a multi-bulk write, but at this stage we don't know
3983 * the intersection set size, so we use a trick, append an empty object
3984 * to the output list and save the pointer to later modify it with the
3985 * right length */
3986 if (!dstkey) {
3987 lenobj = createObject(REDIS_STRING,NULL);
3988 addReply(c,lenobj);
3989 decrRefCount(lenobj);
3990 } else {
3991 /* If we have a target key where to store the resulting set
3992 * create this key with an empty set inside */
3993 dstset = createSetObject();
3994 }
3995
3996 /* Iterate all the elements of the first (smallest) set, and test
3997 * the element against all the other sets, if at least one set does
3998 * not include the element it is discarded */
3999 di = dictGetIterator(dv[0]);
4000
4001 while((de = dictNext(di)) != NULL) {
4002 robj *ele;
4003
4004 for (j = 1; j < setsnum; j++)
4005 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4006 if (j != setsnum)
4007 continue; /* at least one set does not contain the member */
4008 ele = dictGetEntryKey(de);
4009 if (!dstkey) {
4010 addReplyBulkLen(c,ele);
4011 addReply(c,ele);
4012 addReply(c,shared.crlf);
4013 cardinality++;
4014 } else {
4015 dictAdd(dstset->ptr,ele,NULL);
4016 incrRefCount(ele);
4017 }
4018 }
4019 dictReleaseIterator(di);
4020
4021 if (dstkey) {
4022 /* Store the resulting set into the target */
4023 deleteKey(c->db,dstkey);
4024 dictAdd(c->db->dict,dstkey,dstset);
4025 incrRefCount(dstkey);
4026 }
4027
4028 if (!dstkey) {
4029 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4030 } else {
4031 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4032 dictSize((dict*)dstset->ptr)));
4033 server.dirty++;
4034 }
4035 zfree(dv);
4036 }
4037
4038 static void sinterCommand(redisClient *c) {
4039 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4040 }
4041
4042 static void sinterstoreCommand(redisClient *c) {
4043 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4044 }
4045
/* Operation selector for sunionDiffGenericCommand(): REDIS_OP_UNION
 * collects every element of every input set, REDIS_OP_DIFF removes from
 * the first set the elements found in any of the following sets. */
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
4048
4049 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4050 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4051 dictIterator *di;
4052 dictEntry *de;
4053 robj *dstset = NULL;
4054 int j, cardinality = 0;
4055
4056 for (j = 0; j < setsnum; j++) {
4057 robj *setobj;
4058
4059 setobj = dstkey ?
4060 lookupKeyWrite(c->db,setskeys[j]) :
4061 lookupKeyRead(c->db,setskeys[j]);
4062 if (!setobj) {
4063 dv[j] = NULL;
4064 continue;
4065 }
4066 if (setobj->type != REDIS_SET) {
4067 zfree(dv);
4068 addReply(c,shared.wrongtypeerr);
4069 return;
4070 }
4071 dv[j] = setobj->ptr;
4072 }
4073
4074 /* We need a temp set object to store our union. If the dstkey
4075 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4076 * this set object will be the resulting object to set into the target key*/
4077 dstset = createSetObject();
4078
4079 /* Iterate all the elements of all the sets, add every element a single
4080 * time to the result set */
4081 for (j = 0; j < setsnum; j++) {
4082 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4083 if (!dv[j]) continue; /* non existing keys are like empty sets */
4084
4085 di = dictGetIterator(dv[j]);
4086
4087 while((de = dictNext(di)) != NULL) {
4088 robj *ele;
4089
4090 /* dictAdd will not add the same element multiple times */
4091 ele = dictGetEntryKey(de);
4092 if (op == REDIS_OP_UNION || j == 0) {
4093 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4094 incrRefCount(ele);
4095 cardinality++;
4096 }
4097 } else if (op == REDIS_OP_DIFF) {
4098 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4099 cardinality--;
4100 }
4101 }
4102 }
4103 dictReleaseIterator(di);
4104
4105 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4106 }
4107
4108 /* Output the content of the resulting set, if not in STORE mode */
4109 if (!dstkey) {
4110 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4111 di = dictGetIterator(dstset->ptr);
4112 while((de = dictNext(di)) != NULL) {
4113 robj *ele;
4114
4115 ele = dictGetEntryKey(de);
4116 addReplyBulkLen(c,ele);
4117 addReply(c,ele);
4118 addReply(c,shared.crlf);
4119 }
4120 dictReleaseIterator(di);
4121 } else {
4122 /* If we have a target key where to store the resulting set
4123 * create this key with the result set inside */
4124 deleteKey(c->db,dstkey);
4125 dictAdd(c->db->dict,dstkey,dstset);
4126 incrRefCount(dstkey);
4127 }
4128
4129 /* Cleanup */
4130 if (!dstkey) {
4131 decrRefCount(dstset);
4132 } else {
4133 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4134 dictSize((dict*)dstset->ptr)));
4135 server.dirty++;
4136 }
4137 zfree(dv);
4138 }
4139
4140 static void sunionCommand(redisClient *c) {
4141 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4142 }
4143
4144 static void sunionstoreCommand(redisClient *c) {
4145 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4146 }
4147
4148 static void sdiffCommand(redisClient *c) {
4149 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4150 }
4151
4152 static void sdiffstoreCommand(redisClient *c) {
4153 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4154 }
4155
4156 /* ==================================== ZSets =============================== */
4157
4158 /* ZSETs are ordered sets using two data structures to hold the same elements
4159 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4160 * data structure.
4161 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4165
4166 /* This skiplist implementation is almost a C translation of the original
4167 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4168 * Alternative to Balanced Trees", modified in three ways:
4169 * a) this implementation allows for repeated values.
4170 * b) the comparison is not just by key (our 'score') but by satellite data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 *    pointers being only at "level 1". This allows traversing the list
 *    from tail to head, useful for ZREVRANGE. */
4174
4175 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4176 zskiplistNode *zn = zmalloc(sizeof(*zn));
4177
4178 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4179 zn->score = score;
4180 zn->obj = obj;
4181 return zn;
4182 }
4183
4184 static zskiplist *zslCreate(void) {
4185 int j;
4186 zskiplist *zsl;
4187
4188 zsl = zmalloc(sizeof(*zsl));
4189 zsl->level = 1;
4190 zsl->length = 0;
4191 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4192 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4193 zsl->header->forward[j] = NULL;
4194 zsl->header->backward = NULL;
4195 zsl->tail = NULL;
4196 return zsl;
4197 }
4198
4199 static void zslFreeNode(zskiplistNode *node) {
4200 decrRefCount(node->obj);
4201 zfree(node->forward);
4202 zfree(node);
4203 }
4204
4205 static void zslFree(zskiplist *zsl) {
4206 zskiplistNode *node = zsl->header->forward[0], *next;
4207
4208 zfree(zsl->header->forward);
4209 zfree(zsl->header);
4210 while(node) {
4211 next = node->forward[0];
4212 zslFreeNode(node);
4213 node = next;
4214 }
4215 zfree(zsl);
4216 }
4217
4218 static int zslRandomLevel(void) {
4219 int level = 1;
4220 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4221 level += 1;
4222 return level;
4223 }
4224
4225 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4226 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4227 int i, level;
4228
4229 x = zsl->header;
4230 for (i = zsl->level-1; i >= 0; i--) {
4231 while (x->forward[i] &&
4232 (x->forward[i]->score < score ||
4233 (x->forward[i]->score == score &&
4234 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4235 x = x->forward[i];
4236 update[i] = x;
4237 }
4238 /* we assume the key is not already inside, since we allow duplicated
4239 * scores, and the re-insertion of score and redis object should never
4240 * happpen since the caller of zslInsert() should test in the hash table
4241 * if the element is already inside or not. */
4242 level = zslRandomLevel();
4243 if (level > zsl->level) {
4244 for (i = zsl->level; i < level; i++)
4245 update[i] = zsl->header;
4246 zsl->level = level;
4247 }
4248 x = zslCreateNode(level,score,obj);
4249 for (i = 0; i < level; i++) {
4250 x->forward[i] = update[i]->forward[i];
4251 update[i]->forward[i] = x;
4252 }
4253 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4254 if (x->forward[0])
4255 x->forward[0]->backward = x;
4256 else
4257 zsl->tail = x;
4258 zsl->length++;
4259 }
4260
4261 /* Delete an element with matching score/object from the skiplist. */
4262 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4263 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4264 int i;
4265
4266 x = zsl->header;
4267 for (i = zsl->level-1; i >= 0; i--) {
4268 while (x->forward[i] &&
4269 (x->forward[i]->score < score ||
4270 (x->forward[i]->score == score &&
4271 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4272 x = x->forward[i];
4273 update[i] = x;
4274 }
4275 /* We may have multiple elements with the same score, what we need
4276 * is to find the element with both the right score and object. */
4277 x = x->forward[0];
4278 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4279 for (i = 0; i < zsl->level; i++) {
4280 if (update[i]->forward[i] != x) break;
4281 update[i]->forward[i] = x->forward[i];
4282 }
4283 if (x->forward[0]) {
4284 x->forward[0]->backward = (x->backward == zsl->header) ?
4285 NULL : x->backward;
4286 } else {
4287 zsl->tail = x->backward;
4288 }
4289 zslFreeNode(x);
4290 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4291 zsl->level--;
4292 zsl->length--;
4293 return 1;
4294 } else {
4295 return 0; /* not found */
4296 }
4297 return 0; /* not found */
4298 }
4299
4300 /* Delete all the elements with score between min and max from the skiplist.
4301 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4302 * Note that this function takes the reference to the hash table view of the
4303 * sorted set, in order to remove the elements from the hash table too. */
4304 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4305 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4306 unsigned long removed = 0;
4307 int i;
4308
4309 x = zsl->header;
4310 for (i = zsl->level-1; i >= 0; i--) {
4311 while (x->forward[i] && x->forward[i]->score < min)
4312 x = x->forward[i];
4313 update[i] = x;
4314 }
4315 /* We may have multiple elements with the same score, what we need
4316 * is to find the element with both the right score and object. */
4317 x = x->forward[0];
4318 while (x && x->score <= max) {
4319 zskiplistNode *next;
4320
4321 for (i = 0; i < zsl->level; i++) {
4322 if (update[i]->forward[i] != x) break;
4323 update[i]->forward[i] = x->forward[i];
4324 }
4325 if (x->forward[0]) {
4326 x->forward[0]->backward = (x->backward == zsl->header) ?
4327 NULL : x->backward;
4328 } else {
4329 zsl->tail = x->backward;
4330 }
4331 next = x->forward[0];
4332 dictDelete(dict,x->obj);
4333 zslFreeNode(x);
4334 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4335 zsl->level--;
4336 zsl->length--;
4337 removed++;
4338 x = next;
4339 }
4340 return removed; /* not found */
4341 }
4342
4343 /* Find the first node having a score equal or greater than the specified one.
4344 * Returns NULL if there is no match. */
4345 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4346 zskiplistNode *x;
4347 int i;
4348
4349 x = zsl->header;
4350 for (i = zsl->level-1; i >= 0; i--) {
4351 while (x->forward[i] && x->forward[i]->score < score)
4352 x = x->forward[i];
4353 }
4354 /* We may have multiple elements with the same score, what we need
4355 * is to find the element with both the right score and object. */
4356 return x->forward[0];
4357 }
4358
4359 /* The actual Z-commands implementations */
4360
4361 /* This generic command implements both ZADD and ZINCRBY.
4362 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4363 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4364 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4365 robj *zsetobj;
4366 zset *zs;
4367 double *score;
4368
4369 zsetobj = lookupKeyWrite(c->db,key);
4370 if (zsetobj == NULL) {
4371 zsetobj = createZsetObject();
4372 dictAdd(c->db->dict,key,zsetobj);
4373 incrRefCount(key);
4374 } else {
4375 if (zsetobj->type != REDIS_ZSET) {
4376 addReply(c,shared.wrongtypeerr);
4377 return;
4378 }
4379 }
4380 zs = zsetobj->ptr;
4381
4382 /* Ok now since we implement both ZADD and ZINCRBY here the code
4383 * needs to handle the two different conditions. It's all about setting
4384 * '*score', that is, the new score to set, to the right value. */
4385 score = zmalloc(sizeof(double));
4386 if (doincrement) {
4387 dictEntry *de;
4388
4389 /* Read the old score. If the element was not present starts from 0 */
4390 de = dictFind(zs->dict,ele);
4391 if (de) {
4392 double *oldscore = dictGetEntryVal(de);
4393 *score = *oldscore + scoreval;
4394 } else {
4395 *score = scoreval;
4396 }
4397 } else {
4398 *score = scoreval;
4399 }
4400
4401 /* What follows is a simple remove and re-insert operation that is common
4402 * to both ZADD and ZINCRBY... */
4403 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4404 /* case 1: New element */
4405 incrRefCount(ele); /* added to hash */
4406 zslInsert(zs->zsl,*score,ele);
4407 incrRefCount(ele); /* added to skiplist */
4408 server.dirty++;
4409 if (doincrement)
4410 addReplyDouble(c,*score);
4411 else
4412 addReply(c,shared.cone);
4413 } else {
4414 dictEntry *de;
4415 double *oldscore;
4416
4417 /* case 2: Score update operation */
4418 de = dictFind(zs->dict,ele);
4419 redisAssert(de != NULL);
4420 oldscore = dictGetEntryVal(de);
4421 if (*score != *oldscore) {
4422 int deleted;
4423
4424 /* Remove and insert the element in the skip list with new score */
4425 deleted = zslDelete(zs->zsl,*oldscore,ele);
4426 redisAssert(deleted != 0);
4427 zslInsert(zs->zsl,*score,ele);
4428 incrRefCount(ele);
4429 /* Update the score in the hash table */
4430 dictReplace(zs->dict,ele,score);
4431 server.dirty++;
4432 } else {
4433 zfree(score);
4434 }
4435 if (doincrement)
4436 addReplyDouble(c,*score);
4437 else
4438 addReply(c,shared.czero);
4439 }
4440 }
4441
4442 static void zaddCommand(redisClient *c) {
4443 double scoreval;
4444
4445 scoreval = strtod(c->argv[2]->ptr,NULL);
4446 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4447 }
4448
4449 static void zincrbyCommand(redisClient *c) {
4450 double scoreval;
4451
4452 scoreval = strtod(c->argv[2]->ptr,NULL);
4453 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4454 }
4455
4456 static void zremCommand(redisClient *c) {
4457 robj *zsetobj;
4458 zset *zs;
4459
4460 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4461 if (zsetobj == NULL) {
4462 addReply(c,shared.czero);
4463 } else {
4464 dictEntry *de;
4465 double *oldscore;
4466 int deleted;
4467
4468 if (zsetobj->type != REDIS_ZSET) {
4469 addReply(c,shared.wrongtypeerr);
4470 return;
4471 }
4472 zs = zsetobj->ptr;
4473 de = dictFind(zs->dict,c->argv[2]);
4474 if (de == NULL) {
4475 addReply(c,shared.czero);
4476 return;
4477 }
4478 /* Delete from the skiplist */
4479 oldscore = dictGetEntryVal(de);
4480 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4481 redisAssert(deleted != 0);
4482
4483 /* Delete from the hash table */
4484 dictDelete(zs->dict,c->argv[2]);
4485 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4486 server.dirty++;
4487 addReply(c,shared.cone);
4488 }
4489 }
4490
4491 static void zremrangebyscoreCommand(redisClient *c) {
4492 double min = strtod(c->argv[2]->ptr,NULL);
4493 double max = strtod(c->argv[3]->ptr,NULL);
4494 robj *zsetobj;
4495 zset *zs;
4496
4497 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4498 if (zsetobj == NULL) {
4499 addReply(c,shared.czero);
4500 } else {
4501 long deleted;
4502
4503 if (zsetobj->type != REDIS_ZSET) {
4504 addReply(c,shared.wrongtypeerr);
4505 return;
4506 }
4507 zs = zsetobj->ptr;
4508 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4509 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4510 server.dirty += deleted;
4511 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4512 }
4513 }
4514
4515 static void zrangeGenericCommand(redisClient *c, int reverse) {
4516 robj *o;
4517 int start = atoi(c->argv[2]->ptr);
4518 int end = atoi(c->argv[3]->ptr);
4519
4520 o = lookupKeyRead(c->db,c->argv[1]);
4521 if (o == NULL) {
4522 addReply(c,shared.nullmultibulk);
4523 } else {
4524 if (o->type != REDIS_ZSET) {
4525 addReply(c,shared.wrongtypeerr);
4526 } else {
4527 zset *zsetobj = o->ptr;
4528 zskiplist *zsl = zsetobj->zsl;
4529 zskiplistNode *ln;
4530
4531 int llen = zsl->length;
4532 int rangelen, j;
4533 robj *ele;
4534
4535 /* convert negative indexes */
4536 if (start < 0) start = llen+start;
4537 if (end < 0) end = llen+end;
4538 if (start < 0) start = 0;
4539 if (end < 0) end = 0;
4540
4541 /* indexes sanity checks */
4542 if (start > end || start >= llen) {
4543 /* Out of range start or start > end result in empty list */
4544 addReply(c,shared.emptymultibulk);
4545 return;
4546 }
4547 if (end >= llen) end = llen-1;
4548 rangelen = (end-start)+1;
4549
4550 /* Return the result in form of a multi-bulk reply */
4551 if (reverse) {
4552 ln = zsl->tail;
4553 while (start--)
4554 ln = ln->backward;
4555 } else {
4556 ln = zsl->header->forward[0];
4557 while (start--)
4558 ln = ln->forward[0];
4559 }
4560
4561 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4562 for (j = 0; j < rangelen; j++) {
4563 ele = ln->obj;
4564 addReplyBulkLen(c,ele);
4565 addReply(c,ele);
4566 addReply(c,shared.crlf);
4567 ln = reverse ? ln->backward : ln->forward[0];
4568 }
4569 }
4570 }
4571 }
4572
4573 static void zrangeCommand(redisClient *c) {
4574 zrangeGenericCommand(c,0);
4575 }
4576
4577 static void zrevrangeCommand(redisClient *c) {
4578 zrangeGenericCommand(c,1);
4579 }
4580
4581 static void zrangebyscoreCommand(redisClient *c) {
4582 robj *o;
4583 double min = strtod(c->argv[2]->ptr,NULL);
4584 double max = strtod(c->argv[3]->ptr,NULL);
4585 int offset = 0, limit = -1;
4586
4587 if (c->argc != 4 && c->argc != 7) {
4588 addReplySds(c,
4589 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4590 return;
4591 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4592 addReply(c,shared.syntaxerr);
4593 return;
4594 } else if (c->argc == 7) {
4595 offset = atoi(c->argv[5]->ptr);
4596 limit = atoi(c->argv[6]->ptr);
4597 if (offset < 0) offset = 0;
4598 }
4599
4600 o = lookupKeyRead(c->db,c->argv[1]);
4601 if (o == NULL) {
4602 addReply(c,shared.nullmultibulk);
4603 } else {
4604 if (o->type != REDIS_ZSET) {
4605 addReply(c,shared.wrongtypeerr);
4606 } else {
4607 zset *zsetobj = o->ptr;
4608 zskiplist *zsl = zsetobj->zsl;
4609 zskiplistNode *ln;
4610 robj *ele, *lenobj;
4611 unsigned int rangelen = 0;
4612
4613 /* Get the first node with the score >= min */
4614 ln = zslFirstWithScore(zsl,min);
4615 if (ln == NULL) {
4616 /* No element matching the speciifed interval */
4617 addReply(c,shared.emptymultibulk);
4618 return;
4619 }
4620
4621 /* We don't know in advance how many matching elements there
4622 * are in the list, so we push this object that will represent
4623 * the multi-bulk length in the output buffer, and will "fix"
4624 * it later */
4625 lenobj = createObject(REDIS_STRING,NULL);
4626 addReply(c,lenobj);
4627 decrRefCount(lenobj);
4628
4629 while(ln && ln->score <= max) {
4630 if (offset) {
4631 offset--;
4632 ln = ln->forward[0];
4633 continue;
4634 }
4635 if (limit == 0) break;
4636 ele = ln->obj;
4637 addReplyBulkLen(c,ele);
4638 addReply(c,ele);
4639 addReply(c,shared.crlf);
4640 ln = ln->forward[0];
4641 rangelen++;
4642 if (limit > 0) limit--;
4643 }
4644 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4645 }
4646 }
4647 }
4648
4649 static void zcardCommand(redisClient *c) {
4650 robj *o;
4651 zset *zs;
4652
4653 o = lookupKeyRead(c->db,c->argv[1]);
4654 if (o == NULL) {
4655 addReply(c,shared.czero);
4656 return;
4657 } else {
4658 if (o->type != REDIS_ZSET) {
4659 addReply(c,shared.wrongtypeerr);
4660 } else {
4661 zs = o->ptr;
4662 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4663 }
4664 }
4665 }
4666
4667 static void zscoreCommand(redisClient *c) {
4668 robj *o;
4669 zset *zs;
4670
4671 o = lookupKeyRead(c->db,c->argv[1]);
4672 if (o == NULL) {
4673 addReply(c,shared.nullbulk);
4674 return;
4675 } else {
4676 if (o->type != REDIS_ZSET) {
4677 addReply(c,shared.wrongtypeerr);
4678 } else {
4679 dictEntry *de;
4680
4681 zs = o->ptr;
4682 de = dictFind(zs->dict,c->argv[2]);
4683 if (!de) {
4684 addReply(c,shared.nullbulk);
4685 } else {
4686 double *score = dictGetEntryVal(de);
4687
4688 addReplyDouble(c,*score);
4689 }
4690 }
4691 }
4692 }
4693
4694 /* ========================= Non type-specific commands ==================== */
4695
4696 static void flushdbCommand(redisClient *c) {
4697 server.dirty += dictSize(c->db->dict);
4698 dictEmpty(c->db->dict);
4699 dictEmpty(c->db->expires);
4700 addReply(c,shared.ok);
4701 }
4702
4703 static void flushallCommand(redisClient *c) {
4704 server.dirty += emptyDb();
4705 addReply(c,shared.ok);
4706 rdbSave(server.dbfilename);
4707 server.dirty++;
4708 }
4709
4710 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4711 redisSortOperation *so = zmalloc(sizeof(*so));
4712 so->type = type;
4713 so->pattern = pattern;
4714 return so;
4715 }
4716
4717 /* Return the value associated to the key with a name obtained
4718 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4719 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4720 char *p;
4721 sds spat, ssub;
4722 robj keyobj;
4723 int prefixlen, sublen, postfixlen;
4724 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4725 struct {
4726 long len;
4727 long free;
4728 char buf[REDIS_SORTKEY_MAX+1];
4729 } keyname;
4730
4731 /* If the pattern is "#" return the substitution object itself in order
4732 * to implement the "SORT ... GET #" feature. */
4733 spat = pattern->ptr;
4734 if (spat[0] == '#' && spat[1] == '\0') {
4735 return subst;
4736 }
4737
4738 /* The substitution object may be specially encoded. If so we create
4739 * a decoded object on the fly. Otherwise getDecodedObject will just
4740 * increment the ref count, that we'll decrement later. */
4741 subst = getDecodedObject(subst);
4742
4743 ssub = subst->ptr;
4744 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4745 p = strchr(spat,'*');
4746 if (!p) {
4747 decrRefCount(subst);
4748 return NULL;
4749 }
4750
4751 prefixlen = p-spat;
4752 sublen = sdslen(ssub);
4753 postfixlen = sdslen(spat)-(prefixlen+1);
4754 memcpy(keyname.buf,spat,prefixlen);
4755 memcpy(keyname.buf+prefixlen,ssub,sublen);
4756 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4757 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4758 keyname.len = prefixlen+sublen+postfixlen;
4759
4760 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4761 decrRefCount(subst);
4762
4763 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4764 return lookupKeyRead(db,&keyobj);
4765 }
4766
4767 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4768 * the additional parameter is not standard but a BSD-specific we have to
4769 * pass sorting parameters via the global 'server' structure */
4770 static int sortCompare(const void *s1, const void *s2) {
4771 const redisSortObject *so1 = s1, *so2 = s2;
4772 int cmp;
4773
4774 if (!server.sort_alpha) {
4775 /* Numeric sorting. Here it's trivial as we precomputed scores */
4776 if (so1->u.score > so2->u.score) {
4777 cmp = 1;
4778 } else if (so1->u.score < so2->u.score) {
4779 cmp = -1;
4780 } else {
4781 cmp = 0;
4782 }
4783 } else {
4784 /* Alphanumeric sorting */
4785 if (server.sort_bypattern) {
4786 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4787 /* At least one compare object is NULL */
4788 if (so1->u.cmpobj == so2->u.cmpobj)
4789 cmp = 0;
4790 else if (so1->u.cmpobj == NULL)
4791 cmp = -1;
4792 else
4793 cmp = 1;
4794 } else {
4795 /* We have both the objects, use strcoll */
4796 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4797 }
4798 } else {
4799 /* Compare elements directly */
4800 robj *dec1, *dec2;
4801
4802 dec1 = getDecodedObject(so1->obj);
4803 dec2 = getDecodedObject(so2->obj);
4804 cmp = strcoll(dec1->ptr,dec2->ptr);
4805 decrRefCount(dec1);
4806 decrRefCount(dec2);
4807 }
4808 }
4809 return server.sort_desc ? -cmp : cmp;
4810 }
4811
/* The SORT command is the most complex command in Redis. Warning: this code
 * is optimized for speed and a bit less for readability.
 *
 * SORT key [BY pattern] [LIMIT start count] [GET pattern ...] [ASC|DESC]
 *          [ALPHA] [STORE dstkey]
 *
 * Sorts the elements of a list, set or sorted set. Without STORE the
 * result is sent as a multi-bulk reply; with STORE it is saved as a list
 * under dstkey and the client receives the number of stored entries. */
static void sortCommand(redisClient *c) {
    list *operations;
    int outputlen = 0;
    int desc = 0, alpha = 0;
    int limit_start = 0, limit_count = -1, start, end;
    int j, dontsort = 0, vectorlen;
    int getop = 0; /* GET operation counter */
    robj *sortval, *sortby = NULL, *storekey = NULL;
    redisSortObject *vector; /* Resulting vector to sort */

    /* Lookup the key to sort. It must be of the right types */
    sortval = lookupKeyRead(c->db,c->argv[1]);
    if (sortval == NULL) {
        addReply(c,shared.nokeyerr);
        return;
    }
    if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
        sortval->type != REDIS_ZSET)
    {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    /* Create a list of operations to perform for every sorted element.
     * Only GET (REDIS_SORT_GET) is currently parsed below. The list owns
     * its values and frees them with zfree on release. */
    operations = listCreate();
    listSetFreeMethod(operations,zfree);
    j = 2;

    /* Now we need to protect sortval incrementing its count, in the future
     * SORT may have options able to overwrite/delete keys during the sorting
     * and the sorted key itself may get destroyed */
    incrRefCount(sortval);

    /* The SORT command has an SQL-alike syntax, parse it. 'leftargs' is the
     * number of arguments following argv[j], used to check that options
     * taking parameters actually have them. */
    while(j < c->argc) {
        int leftargs = c->argc-j-1;
        if (!strcasecmp(c->argv[j]->ptr,"asc")) {
            desc = 0;
        } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
            desc = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
            alpha = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
            limit_start = atoi(c->argv[j+1]->ptr);
            limit_count = atoi(c->argv[j+2]->ptr);
            j+=2;
        } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
            storekey = c->argv[j+1];
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
            sortby = c->argv[j+1];
            /* If the BY pattern does not contain '*', i.e. it is constant,
             * we don't need to sort nor to lookup the weight keys. */
            if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
            listAddNodeTail(operations,createSortOperation(
                REDIS_SORT_GET,c->argv[j+1]));
            getop++;
            j++;
        } else {
            /* Unknown option (or missing option argument): undo the
             * reference taken above and bail out. */
            decrRefCount(sortval);
            listRelease(operations);
            addReply(c,shared.syntaxerr);
            return;
        }
        j++;
    }

    /* Load the sorting vector with all the objects to sort */
    switch(sortval->type) {
    case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
    case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
    case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
    default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
    }
    vector = zmalloc(sizeof(redisSortObject)*vectorlen);
    j = 0;

    /* The vector borrows the element objects (no incrRefCount): they stay
     * alive because sortval itself is protected by the reference above.
     * NOTE(review): u.score and u.cmpobj are both cleared, which suggests
     * 'u' is a union — confirm against the redisSortObject definition. */
    if (sortval->type == REDIS_LIST) {
        list *list = sortval->ptr;
        listNode *ln;

        listRewind(list);
        while((ln = listYield(list))) {
            robj *ele = ln->value;
            vector[j].obj = ele;
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
    } else {
        dict *set;
        dictIterator *di;
        dictEntry *setele;

        /* For a sorted set only the members (the keys of its hash table
         * view) are sorted, not their scores */
        if (sortval->type == REDIS_SET) {
            set = sortval->ptr;
        } else {
            zset *zs = sortval->ptr;
            set = zs->dict;
        }

        di = dictGetIterator(set);
        while((setele = dictNext(di)) != NULL) {
            vector[j].obj = dictGetEntryKey(setele);
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        dictReleaseIterator(di);
    }
    redisAssert(j == vectorlen);

    /* Now it's time to load the right scores in the sorting vector.
     * With BY: the weight is the string value of the key built from the
     * pattern (missing or non-string keys keep score 0 / cmpobj NULL).
     * ALPHA BY keeps a decoded object reference in u.cmpobj, released in
     * the cleanup section. Without BY, numeric sorts parse the element
     * itself; ALPHA without BY compares the elements directly. */
    if (dontsort == 0) {
        for (j = 0; j < vectorlen; j++) {
            if (sortby) {
                robj *byval;

                byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
                if (!byval || byval->type != REDIS_STRING) continue;
                if (alpha) {
                    vector[j].u.cmpobj = getDecodedObject(byval);
                } else {
                    if (byval->encoding == REDIS_ENCODING_RAW) {
                        vector[j].u.score = strtod(byval->ptr,NULL);
                    } else {
                        /* Don't need to decode the object if it's
                         * integer-encoded (the only encoding supported) so
                         * far. We can just cast it */
                        if (byval->encoding == REDIS_ENCODING_INT) {
                            vector[j].u.score = (long)byval->ptr;
                        } else
                            redisAssert(1 != 1);
                    }
                }
            } else {
                if (!alpha) {
                    if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
                        vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
                    else {
                        if (vector[j].obj->encoding == REDIS_ENCODING_INT)
                            vector[j].u.score = (long) vector[j].obj->ptr;
                        else
                            redisAssert(1 != 1);
                    }
                }
            }
        }
    }

    /* We are ready to sort the vector... perform a bit of sanity check
     * on the LIMIT option too. We'll use a partial version of quicksort.
     * [start,end] is the inclusive output window. A negative count means
     * "up to the last element". An offset past the end produces the empty
     * window start = vectorlen-1, end = vectorlen-2 (end-start+1 == 0). */
    start = (limit_start < 0) ? 0 : limit_start;
    end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
    if (start >= vectorlen) {
        start = vectorlen-1;
        end = vectorlen-2;
    }
    if (end >= vectorlen) end = vectorlen-1;

    /* The comparator reads its options from these globals. pqsort() is
     * used only when sorting BY a pattern and the window does not cover
     * the whole vector; it is asked to order just [start,end]. */
    if (dontsort == 0) {
        server.sort_desc = desc;
        server.sort_alpha = alpha;
        server.sort_bypattern = sortby ? 1 : 0;
        if (sortby && (start != 0 || end != vectorlen-1))
            pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
        else
            qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
    }

    /* Send command output to the output buffer, performing the specified
     * GET operations if any. With GET options, each element produces one
     * entry per GET (the element itself is not emitted); otherwise one
     * entry per element. */
    outputlen = getop ? getop*(end-start+1) : end-start+1;
    if (storekey == NULL) {
        /* STORE option not specified, send the sorting result to client */
        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
        for (j = start; j <= end; j++) {
            listNode *ln;
            if (!getop) {
                addReplyBulkLen(c,vector[j].obj);
                addReply(c,vector[j].obj);
                addReply(c,shared.crlf);
            }
            listRewind(operations);
            while((ln = listYield(operations))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj);

                if (sop->type == REDIS_SORT_GET) {
                    /* Missing or non-string keys produce a null bulk */
                    if (!val || val->type != REDIS_STRING) {
                        addReply(c,shared.nullbulk);
                    } else {
                        addReplyBulkLen(c,val);
                        addReply(c,val);
                        addReply(c,shared.crlf);
                    }
                } else {
                    redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
                }
            }
        }
    } else {
        robj *listObject = createListObject();
        list *listPtr = (list*) listObject->ptr;

        /* STORE option specified, set the sorting result as a List object.
         * Every value appended to the list gets its own reference. */
        for (j = start; j <= end; j++) {
            listNode *ln;
            if (!getop) {
                listAddNodeTail(listPtr,vector[j].obj);
                incrRefCount(vector[j].obj);
            }
            listRewind(operations);
            while((ln = listYield(operations))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj);

                if (sop->type == REDIS_SORT_GET) {
                    /* Missing or non-string keys are stored as "" */
                    if (!val || val->type != REDIS_STRING) {
                        listAddNodeTail(listPtr,createStringObject("",0));
                    } else {
                        listAddNodeTail(listPtr,val);
                        incrRefCount(val);
                    }
                } else {
                    redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
                }
            }
        }
        /* NOTE(review): presumably dictReplace() returns nonzero only when
         * storekey was newly inserted, which is the only case where the
         * dict needs its own reference to the key — confirm in dict.c. */
        if (dictReplace(c->db->dict,storekey,listObject)) {
            incrRefCount(storekey);
        }
        /* Note: we add 1 because the DB is dirty anyway since even if the
         * SORT result is empty a new key is set and maybe the old content
         * replaced. */
        server.dirty += 1+outputlen;
        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
    }

    /* Cleanup: only ALPHA BY sorts hold references in u.cmpobj */
    decrRefCount(sortval);
    listRelease(operations);
    for (j = 0; j < vectorlen; j++) {
        if (sortby && alpha && vector[j].u.cmpobj)
            decrRefCount(vector[j].u.cmpobj);
    }
    zfree(vector);
}
5067
5068 /* Create the string returned by the INFO command. This is decoupled
5069 * by the INFO command itself as we need to report the same information
5070 * on memory corruption problems. */
5071 static sds genRedisInfoString(void) {
5072 sds info;
5073 time_t uptime = time(NULL)-server.stat_starttime;
5074 int j;
5075
5076 info = sdscatprintf(sdsempty(),
5077 "redis_version:%s\r\n"
5078 "arch_bits:%s\r\n"
5079 "multiplexing_api:%s\r\n"
5080 "uptime_in_seconds:%ld\r\n"
5081 "uptime_in_days:%ld\r\n"
5082 "connected_clients:%d\r\n"
5083 "connected_slaves:%d\r\n"
5084 "used_memory:%zu\r\n"
5085 "changes_since_last_save:%lld\r\n"
5086 "bgsave_in_progress:%d\r\n"
5087 "last_save_time:%ld\r\n"
5088 "bgrewriteaof_in_progress:%d\r\n"
5089 "total_connections_received:%lld\r\n"
5090 "total_commands_processed:%lld\r\n"
5091 "role:%s\r\n"
5092 ,REDIS_VERSION,
5093 (sizeof(long) == 8) ? "64" : "32",
5094 aeGetApiName(),
5095 uptime,
5096 uptime/(3600*24),
5097 listLength(server.clients)-listLength(server.slaves),
5098 listLength(server.slaves),
5099 server.usedmemory,
5100 server.dirty,
5101 server.bgsavechildpid != -1,
5102 server.lastsave,
5103 server.bgrewritechildpid != -1,
5104 server.stat_numconnections,
5105 server.stat_numcommands,
5106 server.masterhost == NULL ? "master" : "slave"
5107 );
5108 if (server.masterhost) {
5109 info = sdscatprintf(info,
5110 "master_host:%s\r\n"
5111 "master_port:%d\r\n"
5112 "master_link_status:%s\r\n"
5113 "master_last_io_seconds_ago:%d\r\n"
5114 ,server.masterhost,
5115 server.masterport,
5116 (server.replstate == REDIS_REPL_CONNECTED) ?
5117 "up" : "down",
5118 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5119 );
5120 }
5121 for (j = 0; j < server.dbnum; j++) {
5122 long long keys, vkeys;
5123
5124 keys = dictSize(server.db[j].dict);
5125 vkeys = dictSize(server.db[j].expires);
5126 if (keys || vkeys) {
5127 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5128 j, keys, vkeys);
5129 }
5130 }
5131 return info;
5132 }
5133
5134 static void infoCommand(redisClient *c) {
5135 sds info = genRedisInfoString();
5136 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5137 (unsigned long)sdslen(info)));
5138 addReplySds(c,info);
5139 addReply(c,shared.crlf);
5140 }
5141
5142 static void monitorCommand(redisClient *c) {
5143 /* ignore MONITOR if aleady slave or in monitor mode */
5144 if (c->flags & REDIS_SLAVE) return;
5145
5146 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5147 c->slaveseldb = 0;
5148 listAddNodeTail(server.monitors,c);
5149 addReply(c,shared.ok);
5150 }
5151
5152 /* ================================= Expire ================================= */
5153 static int removeExpire(redisDb *db, robj *key) {
5154 if (dictDelete(db->expires,key) == DICT_OK) {
5155 return 1;
5156 } else {
5157 return 0;
5158 }
5159 }
5160
5161 static int setExpire(redisDb *db, robj *key, time_t when) {
5162 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5163 return 0;
5164 } else {
5165 incrRefCount(key);
5166 return 1;
5167 }
5168 }
5169
5170 /* Return the expire time of the specified key, or -1 if no expire
5171 * is associated with this key (i.e. the key is non volatile) */
5172 static time_t getExpire(redisDb *db, robj *key) {
5173 dictEntry *de;
5174
5175 /* No expire? return ASAP */
5176 if (dictSize(db->expires) == 0 ||
5177 (de = dictFind(db->expires,key)) == NULL) return -1;
5178
5179 return (time_t) dictGetEntryVal(de);
5180 }
5181
5182 static int expireIfNeeded(redisDb *db, robj *key) {
5183 time_t when;
5184 dictEntry *de;
5185
5186 /* No expire? return ASAP */
5187 if (dictSize(db->expires) == 0 ||
5188 (de = dictFind(db->expires,key)) == NULL) return 0;
5189
5190 /* Lookup the expire */
5191 when = (time_t) dictGetEntryVal(de);
5192 if (time(NULL) <= when) return 0;
5193
5194 /* Delete the key */
5195 dictDelete(db->expires,key);
5196 return dictDelete(db->dict,key) == DICT_OK;
5197 }
5198
5199 static int deleteIfVolatile(redisDb *db, robj *key) {
5200 dictEntry *de;
5201
5202 /* No expire? return ASAP */
5203 if (dictSize(db->expires) == 0 ||
5204 (de = dictFind(db->expires,key)) == NULL) return 0;
5205
5206 /* Delete the key */
5207 server.dirty++;
5208 dictDelete(db->expires,key);
5209 return dictDelete(db->dict,key) == DICT_OK;
5210 }
5211
5212 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5213 dictEntry *de;
5214
5215 de = dictFind(c->db->dict,key);
5216 if (de == NULL) {
5217 addReply(c,shared.czero);
5218 return;
5219 }
5220 if (seconds < 0) {
5221 if (deleteKey(c->db,key)) server.dirty++;
5222 addReply(c, shared.cone);
5223 return;
5224 } else {
5225 time_t when = time(NULL)+seconds;
5226 if (setExpire(c->db,key,when)) {
5227 addReply(c,shared.cone);
5228 server.dirty++;
5229 } else {
5230 addReply(c,shared.czero);
5231 }
5232 return;
5233 }
5234 }
5235
5236 static void expireCommand(redisClient *c) {
5237 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5238 }
5239
5240 static void expireatCommand(redisClient *c) {
5241 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5242 }
5243
5244 static void ttlCommand(redisClient *c) {
5245 time_t expire;
5246 int ttl = -1;
5247
5248 expire = getExpire(c->db,c->argv[1]);
5249 if (expire != -1) {
5250 ttl = (int) (expire-time(NULL));
5251 if (ttl < 0) ttl = -1;
5252 }
5253 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5254 }
5255
5256 /* =============================== Replication ============================= */
5257
5258 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5259 ssize_t nwritten, ret = size;
5260 time_t start = time(NULL);
5261
5262 timeout++;
5263 while(size) {
5264 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5265 nwritten = write(fd,ptr,size);
5266 if (nwritten == -1) return -1;
5267 ptr += nwritten;
5268 size -= nwritten;
5269 }
5270 if ((time(NULL)-start) > timeout) {
5271 errno = ETIMEDOUT;
5272 return -1;
5273 }
5274 }
5275 return ret;
5276 }
5277
5278 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5279 ssize_t nread, totread = 0;
5280 time_t start = time(NULL);
5281
5282 timeout++;
5283 while(size) {
5284 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5285 nread = read(fd,ptr,size);
5286 if (nread == -1) return -1;
5287 ptr += nread;
5288 size -= nread;
5289 totread += nread;
5290 }
5291 if ((time(NULL)-start) > timeout) {
5292 errno = ETIMEDOUT;
5293 return -1;
5294 }
5295 }
5296 return totread;
5297 }
5298
/* Read one '\n'-terminated line from 'fd' into the 'size'-byte buffer
 * 'ptr', one byte at a time via syncRead().
 *
 * At most size-1 characters are stored and the buffer is always
 * NUL-terminated (when size >= 1). The newline is not stored and a '\r'
 * right before it is blanked out. If the buffer fills up before a newline
 * arrives, the partial line is returned and the rest stays unread.
 * Returns the number of characters stored (a blanked '\r' is still
 * counted), 0 when size <= 0, or -1 on I/O error. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;  /* no room, not even for the terminator */
    *ptr = '\0';              /* valid empty string even when size == 1 */
    size--;                   /* reserve room for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Account for the stored byte: without this a line longer than
         * the buffer would be written past its end. */
        size--;
    }
    return nread;
}
5319
5320 static void syncCommand(redisClient *c) {
5321 /* ignore SYNC if aleady slave or in monitor mode */
5322 if (c->flags & REDIS_SLAVE) return;
5323
5324 /* SYNC can't be issued when the server has pending data to send to
5325 * the client about already issued commands. We need a fresh reply
5326 * buffer registering the differences between the BGSAVE and the current
5327 * dataset, so that we can copy to other slaves if needed. */
5328 if (listLength(c->reply) != 0) {
5329 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5330 return;
5331 }
5332
5333 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5334 /* Here we need to check if there is a background saving operation
5335 * in progress, or if it is required to start one */
5336 if (server.bgsavechildpid != -1) {
5337 /* Ok a background save is in progress. Let's check if it is a good
5338 * one for replication, i.e. if there is another slave that is
5339 * registering differences since the server forked to save */
5340 redisClient *slave;
5341 listNode *ln;
5342
5343 listRewind(server.slaves);
5344 while((ln = listYield(server.slaves))) {
5345 slave = ln->value;
5346 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5347 }
5348 if (ln) {
5349 /* Perfect, the server is already registering differences for
5350 * another slave. Set the right state, and copy the buffer. */
5351 listRelease(c->reply);
5352 c->reply = listDup(slave->reply);
5353 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5354 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5355 } else {
5356 /* No way, we need to wait for the next BGSAVE in order to
5357 * register differences */
5358 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5359 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5360 }
5361 } else {
5362 /* Ok we don't have a BGSAVE in progress, let's start one */
5363 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5364 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5365 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5366 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5367 return;
5368 }
5369 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5370 }
5371 c->repldbfd = -1;
5372 c->flags |= REDIS_SLAVE;
5373 c->slaveseldb = 0;
5374 listAddNodeTail(server.slaves,c);
5375 return;
5376 }
5377
5378 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5379 redisClient *slave = privdata;
5380 REDIS_NOTUSED(el);
5381 REDIS_NOTUSED(mask);
5382 char buf[REDIS_IOBUF_LEN];
5383 ssize_t nwritten, buflen;
5384
5385 if (slave->repldboff == 0) {
5386 /* Write the bulk write count before to transfer the DB. In theory here
5387 * we don't know how much room there is in the output buffer of the
5388 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5389 * operations) will never be smaller than the few bytes we need. */
5390 sds bulkcount;
5391
5392 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5393 slave->repldbsize);
5394 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5395 {
5396 sdsfree(bulkcount);
5397 freeClient(slave);
5398 return;
5399 }
5400 sdsfree(bulkcount);
5401 }
5402 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5403 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5404 if (buflen <= 0) {
5405 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5406 (buflen == 0) ? "premature EOF" : strerror(errno));
5407 freeClient(slave);
5408 return;
5409 }
5410 if ((nwritten = write(fd,buf,buflen)) == -1) {
5411 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5412 strerror(errno));
5413 freeClient(slave);
5414 return;
5415 }
5416 slave->repldboff += nwritten;
5417 if (slave->repldboff == slave->repldbsize) {
5418 close(slave->repldbfd);
5419 slave->repldbfd = -1;
5420 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5421 slave->replstate = REDIS_REPL_ONLINE;
5422 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5423 sendReplyToClient, slave) == AE_ERR) {
5424 freeClient(slave);
5425 return;
5426 }
5427 addReplySds(slave,sdsempty());
5428 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5429 }
5430 }
5431
5432 /* This function is called at the end of every backgrond saving.
5433 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5434 * otherwise REDIS_ERR is passed to the function.
5435 *
5436 * The goal of this function is to handle slaves waiting for a successful
5437 * background saving in order to perform non-blocking synchronization. */
5438 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5439 listNode *ln;
5440 int startbgsave = 0;
5441
5442 listRewind(server.slaves);
5443 while((ln = listYield(server.slaves))) {
5444 redisClient *slave = ln->value;
5445
5446 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5447 startbgsave = 1;
5448 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5449 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5450 struct redis_stat buf;
5451
5452 if (bgsaveerr != REDIS_OK) {
5453 freeClient(slave);
5454 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5455 continue;
5456 }
5457 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5458 redis_fstat(slave->repldbfd,&buf) == -1) {
5459 freeClient(slave);
5460 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5461 continue;
5462 }
5463 slave->repldboff = 0;
5464 slave->repldbsize = buf.st_size;
5465 slave->replstate = REDIS_REPL_SEND_BULK;
5466 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5467 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5468 freeClient(slave);
5469 continue;
5470 }
5471 }
5472 }
5473 if (startbgsave) {
5474 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5475 listRewind(server.slaves);
5476 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5477 while((ln = listYield(server.slaves))) {
5478 redisClient *slave = ln->value;
5479
5480 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5481 freeClient(slave);
5482 }
5483 }
5484 }
5485 }
5486
5487 static int syncWithMaster(void) {
5488 char buf[1024], tmpfile[256], authcmd[1024];
5489 int dumpsize;
5490 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5491 int dfd;
5492
5493 if (fd == -1) {
5494 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5495 strerror(errno));
5496 return REDIS_ERR;
5497 }
5498
5499 /* AUTH with the master if required. */
5500 if(server.masterauth) {
5501 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5502 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5503 close(fd);
5504 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5505 strerror(errno));
5506 return REDIS_ERR;
5507 }
5508 /* Read the AUTH result. */
5509 if (syncReadLine(fd,buf,1024,3600) == -1) {
5510 close(fd);
5511 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5512 strerror(errno));
5513 return REDIS_ERR;
5514 }
5515 if (buf[0] != '+') {
5516 close(fd);
5517 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5518 return REDIS_ERR;
5519 }
5520 }
5521
5522 /* Issue the SYNC command */
5523 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5524 close(fd);
5525 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5526 strerror(errno));
5527 return REDIS_ERR;
5528 }
5529 /* Read the bulk write count */
5530 if (syncReadLine(fd,buf,1024,3600) == -1) {
5531 close(fd);
5532 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5533 strerror(errno));
5534 return REDIS_ERR;
5535 }
5536 if (buf[0] != '$') {
5537 close(fd);
5538 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5539 return REDIS_ERR;
5540 }
5541 dumpsize = atoi(buf+1);
5542 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5543 /* Read the bulk write data on a temp file */
5544 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5545 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5546 if (dfd == -1) {
5547 close(fd);
5548 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5549 return REDIS_ERR;
5550 }
5551 while(dumpsize) {
5552 int nread, nwritten;
5553
5554 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5555 if (nread == -1) {
5556 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5557 strerror(errno));
5558 close(fd);
5559 close(dfd);
5560 return REDIS_ERR;
5561 }
5562 nwritten = write(dfd,buf,nread);
5563 if (nwritten == -1) {
5564 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5565 close(fd);
5566 close(dfd);
5567 return REDIS_ERR;
5568 }
5569 dumpsize -= nread;
5570 }
5571 close(dfd);
5572 if (rename(tmpfile,server.dbfilename) == -1) {
5573 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5574 unlink(tmpfile);
5575 close(fd);
5576 return REDIS_ERR;
5577 }
5578 emptyDb();
5579 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5580 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5581 close(fd);
5582 return REDIS_ERR;
5583 }
5584 server.master = createClient(fd);
5585 server.master->flags |= REDIS_MASTER;
5586 server.master->authenticated = 1;
5587 server.replstate = REDIS_REPL_CONNECTED;
5588 return REDIS_OK;
5589 }
5590
5591 static void slaveofCommand(redisClient *c) {
5592 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5593 !strcasecmp(c->argv[2]->ptr,"one")) {
5594 if (server.masterhost) {
5595 sdsfree(server.masterhost);
5596 server.masterhost = NULL;
5597 if (server.master) freeClient(server.master);
5598 server.replstate = REDIS_REPL_NONE;
5599 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5600 }
5601 } else {
5602 sdsfree(server.masterhost);
5603 server.masterhost = sdsdup(c->argv[1]->ptr);
5604 server.masterport = atoi(c->argv[2]->ptr);
5605 if (server.master) freeClient(server.master);
5606 server.replstate = REDIS_REPL_CONNECT;
5607 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5608 server.masterhost, server.masterport);
5609 }
5610 addReply(c,shared.ok);
5611 }
5612
5613 /* ============================ Maxmemory directive ======================== */
5614
5615 /* This function gets called when 'maxmemory' is set on the config file to limit
5616 * the max memory used by the server, and we are out of memory.
5617 * This function will try to, in order:
5618 *
5619 * - Free objects from the free list
5620 * - Try to remove keys with an EXPIRE set
5621 *
5622 * It is not possible to free enough memory to reach used-memory < maxmemory
5623 * the server will start refusing commands that will enlarge even more the
5624 * memory usage.
5625 */
5626 static void freeMemoryIfNeeded(void) {
5627 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5628 if (listLength(server.objfreelist)) {
5629 robj *o;
5630
5631 listNode *head = listFirst(server.objfreelist);
5632 o = listNodeValue(head);
5633 listDelNode(server.objfreelist,head);
5634 zfree(o);
5635 } else {
5636 int j, k, freed = 0;
5637
5638 for (j = 0; j < server.dbnum; j++) {
5639 int minttl = -1;
5640 robj *minkey = NULL;
5641 struct dictEntry *de;
5642
5643 if (dictSize(server.db[j].expires)) {
5644 freed = 1;
5645 /* From a sample of three keys drop the one nearest to
5646 * the natural expire */
5647 for (k = 0; k < 3; k++) {
5648 time_t t;
5649
5650 de = dictGetRandomKey(server.db[j].expires);
5651 t = (time_t) dictGetEntryVal(de);
5652 if (minttl == -1 || t < minttl) {
5653 minkey = dictGetEntryKey(de);
5654 minttl = t;
5655 }
5656 }
5657 deleteKey(server.db+j,minkey);
5658 }
5659 }
5660 if (!freed) return; /* nothing to free... */
5661 }
5662 }
5663 }
5664
5665 /* ============================== Append Only file ========================== */
5666
5667 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5668 sds buf = sdsempty();
5669 int j;
5670 ssize_t nwritten;
5671 time_t now;
5672 robj *tmpargv[3];
5673
5674 /* The DB this command was targetting is not the same as the last command
5675 * we appendend. To issue a SELECT command is needed. */
5676 if (dictid != server.appendseldb) {
5677 char seldb[64];
5678
5679 snprintf(seldb,sizeof(seldb),"%d",dictid);
5680 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5681 (unsigned long)strlen(seldb),seldb);
5682 server.appendseldb = dictid;
5683 }
5684
5685 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5686 * EXPIREs into EXPIREATs calls */
5687 if (cmd->proc == expireCommand) {
5688 long when;
5689
5690 tmpargv[0] = createStringObject("EXPIREAT",8);
5691 tmpargv[1] = argv[1];
5692 incrRefCount(argv[1]);
5693 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5694 tmpargv[2] = createObject(REDIS_STRING,
5695 sdscatprintf(sdsempty(),"%ld",when));
5696 argv = tmpargv;
5697 }
5698
5699 /* Append the actual command */
5700 buf = sdscatprintf(buf,"*%d\r\n",argc);
5701 for (j = 0; j < argc; j++) {
5702 robj *o = argv[j];
5703
5704 o = getDecodedObject(o);
5705 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
5706 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5707 buf = sdscatlen(buf,"\r\n",2);
5708 decrRefCount(o);
5709 }
5710
5711 /* Free the objects from the modified argv for EXPIREAT */
5712 if (cmd->proc == expireCommand) {
5713 for (j = 0; j < 3; j++)
5714 decrRefCount(argv[j]);
5715 }
5716
5717 /* We want to perform a single write. This should be guaranteed atomic
5718 * at least if the filesystem we are writing is a real physical one.
5719 * While this will save us against the server being killed I don't think
5720 * there is much to do about the whole server stopping for power problems
5721 * or alike */
5722 nwritten = write(server.appendfd,buf,sdslen(buf));
5723 if (nwritten != (signed)sdslen(buf)) {
5724 /* Ooops, we are in troubles. The best thing to do for now is
5725 * to simply exit instead to give the illusion that everything is
5726 * working as expected. */
5727 if (nwritten == -1) {
5728 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5729 } else {
5730 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5731 }
5732 exit(1);
5733 }
5734 /* If a background append only file rewriting is in progress we want to
5735 * accumulate the differences between the child DB and the current one
5736 * in a buffer, so that when the child process will do its work we
5737 * can append the differences to the new append only file. */
5738 if (server.bgrewritechildpid != -1)
5739 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5740
5741 sdsfree(buf);
5742 now = time(NULL);
5743 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5744 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5745 now-server.lastfsync > 1))
5746 {
5747 fsync(server.appendfd); /* Let's try to get this data on the disk */
5748 server.lastfsync = now;
5749 }
5750 }
5751
5752 /* In Redis commands are always executed in the context of a client, so in
5753 * order to load the append only file we need to create a fake client. */
5754 static struct redisClient *createFakeClient(void) {
5755 struct redisClient *c = zmalloc(sizeof(*c));
5756
5757 selectDb(c,0);
5758 c->fd = -1;
5759 c->querybuf = sdsempty();
5760 c->argc = 0;
5761 c->argv = NULL;
5762 c->flags = 0;
5763 /* We set the fake client as a slave waiting for the synchronization
5764 * so that Redis will not try to send replies to this client. */
5765 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5766 c->reply = listCreate();
5767 listSetFreeMethod(c->reply,decrRefCount);
5768 listSetDupMethod(c->reply,dupClientReplyValue);
5769 return c;
5770 }
5771
5772 static void freeFakeClient(struct redisClient *c) {
5773 sdsfree(c->querybuf);
5774 listRelease(c->reply);
5775 zfree(c);
5776 }
5777
5778 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5779 * error (the append only file is zero-length) REDIS_ERR is returned. On
5780 * fatal error an error message is logged and the program exists. */
5781 int loadAppendOnlyFile(char *filename) {
5782 struct redisClient *fakeClient;
5783 FILE *fp = fopen(filename,"r");
5784 struct redis_stat sb;
5785
5786 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5787 return REDIS_ERR;
5788
5789 if (fp == NULL) {
5790 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5791 exit(1);
5792 }
5793
5794 fakeClient = createFakeClient();
5795 while(1) {
5796 int argc, j;
5797 unsigned long len;
5798 robj **argv;
5799 char buf[128];
5800 sds argsds;
5801 struct redisCommand *cmd;
5802
5803 if (fgets(buf,sizeof(buf),fp) == NULL) {
5804 if (feof(fp))
5805 break;
5806 else
5807 goto readerr;
5808 }
5809 if (buf[0] != '*') goto fmterr;
5810 argc = atoi(buf+1);
5811 argv = zmalloc(sizeof(robj*)*argc);
5812 for (j = 0; j < argc; j++) {
5813 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5814 if (buf[0] != '$') goto fmterr;
5815 len = strtol(buf+1,NULL,10);
5816 argsds = sdsnewlen(NULL,len);
5817 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5818 argv[j] = createObject(REDIS_STRING,argsds);
5819 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5820 }
5821
5822 /* Command lookup */
5823 cmd = lookupCommand(argv[0]->ptr);
5824 if (!cmd) {
5825 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5826 exit(1);
5827 }
5828 /* Try object sharing and encoding */
5829 if (server.shareobjects) {
5830 int j;
5831 for(j = 1; j < argc; j++)
5832 argv[j] = tryObjectSharing(argv[j]);
5833 }
5834 if (cmd->flags & REDIS_CMD_BULK)
5835 tryObjectEncoding(argv[argc-1]);
5836 /* Run the command in the context of a fake client */
5837 fakeClient->argc = argc;
5838 fakeClient->argv = argv;
5839 cmd->proc(fakeClient);
5840 /* Discard the reply objects list from the fake client */
5841 while(listLength(fakeClient->reply))
5842 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5843 /* Clean up, ready for the next command */
5844 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5845 zfree(argv);
5846 }
5847 fclose(fp);
5848 freeFakeClient(fakeClient);
5849 return REDIS_OK;
5850
5851 readerr:
5852 if (feof(fp)) {
5853 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5854 } else {
5855 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5856 }
5857 exit(1);
5858 fmterr:
5859 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5860 exit(1);
5861 }
5862
5863 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5864 static int fwriteBulk(FILE *fp, robj *obj) {
5865 char buf[128];
5866 obj = getDecodedObject(obj);
5867 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5868 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5869 if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err;
5870 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5871 decrRefCount(obj);
5872 return 1;
5873 err:
5874 decrRefCount(obj);
5875 return 0;
5876 }
5877
/* Emit the double 'd' as a bulk payload: "$<len>\r\n<%.17g digits>\r\n".
 * Returns 1 on success, 0 as soon as an fwrite() fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char hdr[128], payload[128];
    size_t digits;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    digits = strlen(payload) - 2; /* the trailing CRLF is not counted */
    snprintf(hdr,sizeof(hdr),"$%lu\r\n",(unsigned long)digits);
    return fwrite(hdr,strlen(hdr),1,fp) != 0 &&
           fwrite(payload,strlen(payload),1,fp) != 0;
}
5888
/* Emit the long 'l' as a bulk payload: "$<len>\r\n<digits>\r\n".
 * Returns 1 on success, 0 if either fwrite() fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], hdr[128];
    unsigned long digits;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    digits = (unsigned long)strlen(payload)-2; /* CRLF excluded */
    snprintf(hdr,sizeof(hdr),"$%lu\r\n",digits);
    if (fwrite(hdr,strlen(hdr),1,fp) == 0) return 0;
    return fwrite(payload,strlen(payload),1,fp) != 0;
}
5899
5900 /* Write a sequence of commands able to fully rebuild the dataset into
5901 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5902 static int rewriteAppendOnlyFile(char *filename) {
5903 dictIterator *di = NULL;
5904 dictEntry *de;
5905 FILE *fp;
5906 char tmpfile[256];
5907 int j;
5908 time_t now = time(NULL);
5909
5910 /* Note that we have to use a different temp name here compared to the
5911 * one used by rewriteAppendOnlyFileBackground() function. */
5912 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5913 fp = fopen(tmpfile,"w");
5914 if (!fp) {
5915 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5916 return REDIS_ERR;
5917 }
5918 for (j = 0; j < server.dbnum; j++) {
5919 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5920 redisDb *db = server.db+j;
5921 dict *d = db->dict;
5922 if (dictSize(d) == 0) continue;
5923 di = dictGetIterator(d);
5924 if (!di) {
5925 fclose(fp);
5926 return REDIS_ERR;
5927 }
5928
5929 /* SELECT the new DB */
5930 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5931 if (fwriteBulkLong(fp,j) == 0) goto werr;
5932
5933 /* Iterate this DB writing every entry */
5934 while((de = dictNext(di)) != NULL) {
5935 robj *key = dictGetEntryKey(de);
5936 robj *o = dictGetEntryVal(de);
5937 time_t expiretime = getExpire(db,key);
5938
5939 /* Save the key and associated value */
5940 if (o->type == REDIS_STRING) {
5941 /* Emit a SET command */
5942 char cmd[]="*3\r\n$3\r\nSET\r\n";
5943 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5944 /* Key and value */
5945 if (fwriteBulk(fp,key) == 0) goto werr;
5946 if (fwriteBulk(fp,o) == 0) goto werr;
5947 } else if (o->type == REDIS_LIST) {
5948 /* Emit the RPUSHes needed to rebuild the list */
5949 list *list = o->ptr;
5950 listNode *ln;
5951
5952 listRewind(list);
5953 while((ln = listYield(list))) {
5954 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5955 robj *eleobj = listNodeValue(ln);
5956
5957 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5958 if (fwriteBulk(fp,key) == 0) goto werr;
5959 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5960 }
5961 } else if (o->type == REDIS_SET) {
5962 /* Emit the SADDs needed to rebuild the set */
5963 dict *set = o->ptr;
5964 dictIterator *di = dictGetIterator(set);
5965 dictEntry *de;
5966
5967 while((de = dictNext(di)) != NULL) {
5968 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5969 robj *eleobj = dictGetEntryKey(de);
5970
5971 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5972 if (fwriteBulk(fp,key) == 0) goto werr;
5973 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5974 }
5975 dictReleaseIterator(di);
5976 } else if (o->type == REDIS_ZSET) {
5977 /* Emit the ZADDs needed to rebuild the sorted set */
5978 zset *zs = o->ptr;
5979 dictIterator *di = dictGetIterator(zs->dict);
5980 dictEntry *de;
5981
5982 while((de = dictNext(di)) != NULL) {
5983 char cmd[]="*4\r\n$4\r\nZADD\r\n";
5984 robj *eleobj = dictGetEntryKey(de);
5985 double *score = dictGetEntryVal(de);
5986
5987 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5988 if (fwriteBulk(fp,key) == 0) goto werr;
5989 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
5990 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5991 }
5992 dictReleaseIterator(di);
5993 } else {
5994 redisAssert(0 != 0);
5995 }
5996 /* Save the expire time */
5997 if (expiretime != -1) {
5998 char cmd[]="*3\r\n$6\r\nEXPIRE\r\n";
5999 /* If this key is already expired skip it */
6000 if (expiretime < now) continue;
6001 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6002 if (fwriteBulk(fp,key) == 0) goto werr;
6003 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6004 }
6005 }
6006 dictReleaseIterator(di);
6007 }
6008
6009 /* Make sure data will not remain on the OS's output buffers */
6010 fflush(fp);
6011 fsync(fileno(fp));
6012 fclose(fp);
6013
6014 /* Use RENAME to make sure the DB file is changed atomically only
6015 * if the generate DB file is ok. */
6016 if (rename(tmpfile,filename) == -1) {
6017 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6018 unlink(tmpfile);
6019 return REDIS_ERR;
6020 }
6021 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6022 return REDIS_OK;
6023
6024 werr:
6025 fclose(fp);
6026 unlink(tmpfile);
6027 redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno));
6028 if (di) dictReleaseIterator(di);
6029 return REDIS_ERR;
6030 }
6031
6032 /* This is how rewriting of the append only file in background works:
6033 *
6034 * 1) The user calls BGREWRITEAOF
6035 * 2) Redis calls this function, that forks():
6036 * 2a) the child rewrite the append only file in a temp file.
6037 * 2b) the parent accumulates differences in server.bgrewritebuf.
6038 * 3) When the child finished '2a' exists.
6039 * 4) The parent will trap the exit code, if it's OK, will append the
6040 * data accumulated into server.bgrewritebuf into the temp file, and
6041 * finally will rename(2) the temp file in the actual file name.
6042 * The the new file is reopened as the new append only file. Profit!
6043 */
6044 static int rewriteAppendOnlyFileBackground(void) {
6045 pid_t childpid;
6046
6047 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6048 if ((childpid = fork()) == 0) {
6049 /* Child */
6050 char tmpfile[256];
6051 close(server.fd);
6052
6053 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6054 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6055 exit(0);
6056 } else {
6057 exit(1);
6058 }
6059 } else {
6060 /* Parent */
6061 if (childpid == -1) {
6062 redisLog(REDIS_WARNING,
6063 "Can't rewrite append only file in background: fork: %s",
6064 strerror(errno));
6065 return REDIS_ERR;
6066 }
6067 redisLog(REDIS_NOTICE,
6068 "Background append only file rewriting started by pid %d",childpid);
6069 server.bgrewritechildpid = childpid;
6070 /* We set appendseldb to -1 in order to force the next call to the
6071 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6072 * accumulated by the parent into server.bgrewritebuf will start
6073 * with a SELECT statement and it will be safe to merge. */
6074 server.appendseldb = -1;
6075 return REDIS_OK;
6076 }
6077 return REDIS_OK; /* unreached */
6078 }
6079
6080 static void bgrewriteaofCommand(redisClient *c) {
6081 if (server.bgrewritechildpid != -1) {
6082 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6083 return;
6084 }
6085 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6086 char *status = "+Background append only file rewriting started\r\n";
6087 addReplySds(c,sdsnew(status));
6088 } else {
6089 addReply(c,shared.err);
6090 }
6091 }
6092
/* Delete the temporary file written by the rewrite child 'childpid'.
 * The name is built with the same pattern the child uses in
 * rewriteAppendOnlyFileBackground(); a missing file is not an error
 * (the unlink() result is ignored). */
static void aofRemoveTempFile(pid_t childpid) {
    char name[256];

    snprintf(name,sizeof(name),"temp-rewriteaof-bg-%d.aof",(int) childpid);
    unlink(name);
}
6099
6100 /* ================================= Debugging ============================== */
6101
6102 static void debugCommand(redisClient *c) {
6103 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6104 *((char*)-1) = 'x';
6105 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6106 if (rdbSave(server.dbfilename) != REDIS_OK) {
6107 addReply(c,shared.err);
6108 return;
6109 }
6110 emptyDb();
6111 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6112 addReply(c,shared.err);
6113 return;
6114 }
6115 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6116 addReply(c,shared.ok);
6117 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6118 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6119 robj *key, *val;
6120
6121 if (!de) {
6122 addReply(c,shared.nokeyerr);
6123 return;
6124 }
6125 key = dictGetEntryKey(de);
6126 val = dictGetEntryVal(de);
6127 addReplySds(c,sdscatprintf(sdsempty(),
6128 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6129 (void*)key, key->refcount, (void*)val, val->refcount,
6130 val->encoding));
6131 } else {
6132 addReplySds(c,sdsnew(
6133 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6134 }
6135 }
6136
6137 static void _redisAssert(char *estr) {
6138 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6139 redisLog(REDIS_WARNING,"==> %s\n",estr);
6140 #ifdef HAVE_BACKTRACE
6141 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6142 *((char*)-1) = 'x';
6143 #endif
6144 }
6145
6146 /* =================================== Main! ================================ */
6147
6148 #ifdef __linux__
/* Return the current value of /proc/sys/vm/overcommit_memory, or -1 when
 * the file cannot be opened or read, or does not start with a number.
 *
 * strtol() with an end pointer is used instead of atoi(): atoi() turns
 * unparsable content into 0, which linuxOvercommitMemoryWarning() would
 * then misreport as overcommit being set to 0. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long value;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    value = strtol(buf,&end,10);
    if (end == buf) return -1; /* no digits at all */
    return (int) value;
}
6162
6163 void linuxOvercommitMemoryWarning(void) {
6164 if (linuxOvercommitMemoryValue() == 0) {
6165 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6166 }
6167 }
6168 #endif /* __linux__ */
6169
6170 static void daemonize(void) {
6171 int fd;
6172 FILE *fp;
6173
6174 if (fork() != 0) exit(0); /* parent exits */
6175 printf("New pid: %d\n", getpid());
6176 setsid(); /* create a new session */
6177
6178 /* Every output goes to /dev/null. If Redis is daemonized but
6179 * the 'logfile' is set to 'stdout' in the configuration file
6180 * it will not log at all. */
6181 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6182 dup2(fd, STDIN_FILENO);
6183 dup2(fd, STDOUT_FILENO);
6184 dup2(fd, STDERR_FILENO);
6185 if (fd > STDERR_FILENO) close(fd);
6186 }
6187 /* Try to write the pid file */
6188 fp = fopen(server.pidfile,"w");
6189 if (fp) {
6190 fprintf(fp,"%d\n",getpid());
6191 fclose(fp);
6192 }
6193 }
6194
6195 int main(int argc, char **argv) {
6196 initServerConfig();
6197 if (argc == 2) {
6198 resetServerSaveParams();
6199 loadServerConfig(argv[1]);
6200 } else if (argc > 2) {
6201 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6202 exit(1);
6203 } else {
6204 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6205 }
6206 if (server.daemonize) daemonize();
6207 initServer();
6208 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6209 #ifdef __linux__
6210 linuxOvercommitMemoryWarning();
6211 #endif
6212 if (server.appendonly) {
6213 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6214 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6215 } else {
6216 if (rdbLoad(server.dbfilename) == REDIS_OK)
6217 redisLog(REDIS_NOTICE,"DB loaded from disk");
6218 }
6219 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6220 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6221 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6222 aeMain(server.el);
6223 aeDeleteEventLoop(server.el);
6224 return 0;
6225 }
6226
6227 /* ============================= Backtrace support ========================= */
6228
6229 #ifdef HAVE_BACKTRACE
6230 static char *findFuncName(void *pointer, unsigned long *offset);
6231
6232 static void *getMcontextEip(ucontext_t *uc) {
6233 #if defined(__FreeBSD__)
6234 return (void*) uc->uc_mcontext.mc_eip;
6235 #elif defined(__dietlibc__)
6236 return (void*) uc->uc_mcontext.eip;
6237 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6238 #if __x86_64__
6239 return (void*) uc->uc_mcontext->__ss.__rip;
6240 #else
6241 return (void*) uc->uc_mcontext->__ss.__eip;
6242 #endif
6243 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6244 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6245 return (void*) uc->uc_mcontext->__ss.__rip;
6246 #else
6247 return (void*) uc->uc_mcontext->__ss.__eip;
6248 #endif
6249 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6250 return (void*) uc->uc_mcontext.gregs[REG_EIP];
6251 #elif defined(__ia64__) /* Linux IA64 */
6252 return (void*) uc->uc_mcontext.sc_ip;
6253 #else
6254 return NULL;
6255 #endif
6256 }
6257
6258 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6259 void *trace[100];
6260 char **messages = NULL;
6261 int i, trace_size = 0;
6262 unsigned long offset=0;
6263 ucontext_t *uc = (ucontext_t*) secret;
6264 sds infostring;
6265 REDIS_NOTUSED(info);
6266
6267 redisLog(REDIS_WARNING,
6268 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6269 infostring = genRedisInfoString();
6270 redisLog(REDIS_WARNING, "%s",infostring);
6271 /* It's not safe to sdsfree() the returned string under memory
6272 * corruption conditions. Let it leak as we are going to abort */
6273
6274 trace_size = backtrace(trace, 100);
6275 /* overwrite sigaction with caller's address */
6276 if (getMcontextEip(uc) != NULL) {
6277 trace[1] = getMcontextEip(uc);
6278 }
6279 messages = backtrace_symbols(trace, trace_size);
6280
6281 for (i=1; i<trace_size; ++i) {
6282 char *fn = findFuncName(trace[i], &offset), *p;
6283
6284 p = strchr(messages[i],'+');
6285 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6286 redisLog(REDIS_WARNING,"%s", messages[i]);
6287 } else {
6288 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6289 }
6290 }
6291 // free(messages); Don't call free() with possibly corrupted memory.
6292 exit(0);
6293 }
6294
6295 static void setupSigSegvAction(void) {
6296 struct sigaction act;
6297
6298 sigemptyset (&act.sa_mask);
6299 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6300 * is used. Otherwise, sa_handler is used */
6301 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6302 act.sa_sigaction = segvHandler;
6303 sigaction (SIGSEGV, &act, NULL);
6304 sigaction (SIGBUS, &act, NULL);
6305 sigaction (SIGFPE, &act, NULL);
6306 sigaction (SIGILL, &act, NULL);
6307 sigaction (SIGBUS, &act, NULL);
6308 return;
6309 }
6310
6311 #include "staticsymbols.h"
6312 /* This function try to convert a pointer into a function name. It's used in
6313 * oreder to provide a backtrace under segmentation fault that's able to
6314 * display functions declared as static (otherwise the backtrace is useless). */
6315 static char *findFuncName(void *pointer, unsigned long *offset){
6316 int i, ret = -1;
6317 unsigned long off, minoff = 0;
6318
6319 /* Try to match against the Symbol with the smallest offset */
6320 for (i=0; symsTable[i].pointer; i++) {
6321 unsigned long lp = (unsigned long) pointer;
6322
6323 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6324 off=lp-symsTable[i].pointer;
6325 if (ret < 0 || off < minoff) {
6326 minoff=off;
6327 ret=i;
6328 }
6329 }
6330 }
6331 if (ret == -1) return NULL;
6332 *offset = minoff;
6333 return symsTable[ret].name;
6334 }
6335 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without HAVE_BACKTRACE: no crash reporter is installed and
     * signal dispositions are left unchanged. */
    return;
}
6338 #endif /* HAVE_BACKTRACE */
6339
6340
6341
6342 /* The End */
6343
6344
6345