]> git.saurik.com Git - redis.git/blob - redis.c
Fixed some subtle bug in the command processing code almost impossible to spot in...
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string reported by this build of the server. */
#define REDIS_VERSION "1.1.91"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
 * this flag will return an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */

/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... The argument is parenthesized so that expressions
 * such as REDIS_NOTUSED(a+b) are discarded as a whole instead of expanding
 * to ((void)a)+b, which does not compile. */
#define REDIS_NOTUSED(V) ((void)(V))

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2

/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
204
205 /*================================= Data types ============================== */
206
/* A redis object: the generic container able to hold a string, list, set,
 * sorted set or hash. 'type' takes one of the REDIS_STRING..REDIS_HASH
 * values and 'encoding' one of the REDIS_ENCODING_* values. */
typedef struct redisObject {
    void *ptr;                  /* the payload */
    unsigned char type;         /* REDIS_STRING, REDIS_LIST, ... */
    unsigned char encoding;     /* REDIS_ENCODING_RAW / REDIS_ENCODING_INT */
    unsigned char notused[2];   /* unused (presumably padding) */
    int refcount;               /* reference count */
} robj;

/* Set up a Redis string object allocated on the stack: refcount 1,
 * REDIS_STRING type, raw encoding, payload '_ptr'.
 * The macro is kept right next to the structure definition so that it is
 * updated whenever the structure changes, avoiding bugs like bug #85 that
 * was introduced exactly that way.
 *
 * NOTE: the expansion ends with "while(0);" including the semicolon. Call
 * sites elsewhere may rely on that, so it is preserved; consequently the
 * macro must not be used as the lone statement of an if/else branch. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
} while(0);
226
227 typedef struct redisDb {
228 dict *dict;
229 dict *expires;
230 int id;
231 } redisDb;
232
233 /* With multiplexing we need to take per-clinet state.
234 * Clients are taken in a liked list. */
235 typedef struct redisClient {
236 int fd;
237 redisDb *db;
238 int dictid;
239 sds querybuf;
240 robj **argv, **mbargv;
241 int argc, mbargc;
242 int bulklen; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk; /* multi bulk command format active */
244 list *reply;
245 int sentlen;
246 time_t lastinteraction; /* time of the last interaction, used for timeout */
247 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb; /* slave selected db, if this client is a slave */
249 int authenticated; /* when requirepass is non-NULL */
250 int replstate; /* replication state if this is a slave */
251 int repldbfd; /* replication DB file descriptor */
252 long repldboff; /* replication DB file offset */
253 off_t repldbsize; /* replication DB file size */
254 } redisClient;
255
256 struct saveparam {
257 time_t seconds;
258 int changes;
259 };
260
261 /* Global server state structure */
262 struct redisServer {
263 int port;
264 int fd;
265 redisDb *db;
266 dict *sharingpool;
267 unsigned int sharingpoolsize;
268 long long dirty; /* changes to DB from the last save */
269 list *clients;
270 list *slaves, *monitors;
271 char neterr[ANET_ERR_LEN];
272 aeEventLoop *el;
273 int cronloops; /* number of times the cron function run */
274 list *objfreelist; /* A list of freed objects to avoid malloc() */
275 time_t lastsave; /* Unix time of last save succeeede */
276 size_t usedmemory; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime; /* server start time */
279 long long stat_numcommands; /* number of processed commands */
280 long long stat_numconnections; /* number of connections received */
281 /* Configuration */
282 int verbosity;
283 int glueoutputbuf;
284 int maxidletime;
285 int dbnum;
286 int daemonize;
287 int appendonly;
288 int appendfsync;
289 time_t lastfsync;
290 int appendfd;
291 int appendseldb;
292 char *pidfile;
293 pid_t bgsavechildpid;
294 pid_t bgrewritechildpid;
295 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam *saveparams;
297 int saveparamslen;
298 char *logfile;
299 char *bindaddr;
300 char *dbfilename;
301 char *appendfilename;
302 char *requirepass;
303 int shareobjects;
304 /* Replication related */
305 int isslave;
306 char *masterauth;
307 char *masterhost;
308 int masterport;
309 redisClient *master; /* client that is master for this slave */
310 int replstate;
311 unsigned int maxclients;
312 unsigned long maxmemory;
313 /* Sort parameters - qsort_r() is only available under BSD so we
314 * have to take this state global, in order to pass it to sortCompare() */
315 int sort_desc;
316 int sort_alpha;
317 int sort_bypattern;
318 };
319
320 typedef void redisCommandProc(redisClient *c);
321 struct redisCommand {
322 char *name;
323 redisCommandProc *proc;
324 int arity;
325 int flags;
326 };
327
328 struct redisFunctionSym {
329 char *name;
330 unsigned long pointer;
331 };
332
333 typedef struct _redisSortObject {
334 robj *obj;
335 union {
336 double score;
337 robj *cmpobj;
338 } u;
339 } redisSortObject;
340
341 typedef struct _redisSortOperation {
342 int type;
343 robj *pattern;
344 } redisSortOperation;
345
346 /* ZSETs use a specialized version of Skiplists */
347
348 typedef struct zskiplistNode {
349 struct zskiplistNode **forward;
350 struct zskiplistNode *backward;
351 double score;
352 robj *obj;
353 } zskiplistNode;
354
355 typedef struct zskiplist {
356 struct zskiplistNode *header, *tail;
357 unsigned long length;
358 int level;
359 } zskiplist;
360
361 typedef struct zset {
362 dict *dict;
363 zskiplist *zsl;
364 } zset;
365
366 /* Our shared "common" objects */
367
368 struct sharedObjectsStruct {
369 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
370 *colon, *nullbulk, *nullmultibulk,
371 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
372 *outofrangeerr, *plus,
373 *select0, *select1, *select2, *select3, *select4,
374 *select5, *select6, *select7, *select8, *select9;
375 } shared;
376
377 /* Global vars that are actally used as constants. The following double
378 * values are used for double on-disk serialization, and are initialized
379 * at runtime to avoid strange compiler optimizations. */
380
381 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
382
383 /*================================ Prototypes =============================== */
384
385 static void freeStringObject(robj *o);
386 static void freeListObject(robj *o);
387 static void freeSetObject(robj *o);
388 static void decrRefCount(void *o);
389 static robj *createObject(int type, void *ptr);
390 static void freeClient(redisClient *c);
391 static int rdbLoad(char *filename);
392 static void addReply(redisClient *c, robj *obj);
393 static void addReplySds(redisClient *c, sds s);
394 static void incrRefCount(robj *o);
395 static int rdbSaveBackground(char *filename);
396 static robj *createStringObject(char *ptr, size_t len);
397 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
398 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
399 static int syncWithMaster(void);
400 static robj *tryObjectSharing(robj *o);
401 static int tryObjectEncoding(robj *o);
402 static robj *getDecodedObject(robj *o);
403 static int removeExpire(redisDb *db, robj *key);
404 static int expireIfNeeded(redisDb *db, robj *key);
405 static int deleteIfVolatile(redisDb *db, robj *key);
406 static int deleteKey(redisDb *db, robj *key);
407 static time_t getExpire(redisDb *db, robj *key);
408 static int setExpire(redisDb *db, robj *key, time_t when);
409 static void updateSlavesWaitingBgsave(int bgsaveerr);
410 static void freeMemoryIfNeeded(void);
411 static int processCommand(redisClient *c);
412 static void setupSigSegvAction(void);
413 static void rdbRemoveTempFile(pid_t childpid);
414 static void aofRemoveTempFile(pid_t childpid);
415 static size_t stringObjectLen(robj *o);
416 static void processInputBuffer(redisClient *c);
417 static zskiplist *zslCreate(void);
418 static void zslFree(zskiplist *zsl);
419 static void zslInsert(zskiplist *zsl, double score, robj *obj);
420 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
421
422 static void authCommand(redisClient *c);
423 static void pingCommand(redisClient *c);
424 static void echoCommand(redisClient *c);
425 static void setCommand(redisClient *c);
426 static void setnxCommand(redisClient *c);
427 static void getCommand(redisClient *c);
428 static void delCommand(redisClient *c);
429 static void existsCommand(redisClient *c);
430 static void incrCommand(redisClient *c);
431 static void decrCommand(redisClient *c);
432 static void incrbyCommand(redisClient *c);
433 static void decrbyCommand(redisClient *c);
434 static void selectCommand(redisClient *c);
435 static void randomkeyCommand(redisClient *c);
436 static void keysCommand(redisClient *c);
437 static void dbsizeCommand(redisClient *c);
438 static void lastsaveCommand(redisClient *c);
439 static void saveCommand(redisClient *c);
440 static void bgsaveCommand(redisClient *c);
441 static void bgrewriteaofCommand(redisClient *c);
442 static void shutdownCommand(redisClient *c);
443 static void moveCommand(redisClient *c);
444 static void renameCommand(redisClient *c);
445 static void renamenxCommand(redisClient *c);
446 static void lpushCommand(redisClient *c);
447 static void rpushCommand(redisClient *c);
448 static void lpopCommand(redisClient *c);
449 static void rpopCommand(redisClient *c);
450 static void llenCommand(redisClient *c);
451 static void lindexCommand(redisClient *c);
452 static void lrangeCommand(redisClient *c);
453 static void ltrimCommand(redisClient *c);
454 static void typeCommand(redisClient *c);
455 static void lsetCommand(redisClient *c);
456 static void saddCommand(redisClient *c);
457 static void sremCommand(redisClient *c);
458 static void smoveCommand(redisClient *c);
459 static void sismemberCommand(redisClient *c);
460 static void scardCommand(redisClient *c);
461 static void spopCommand(redisClient *c);
462 static void srandmemberCommand(redisClient *c);
463 static void sinterCommand(redisClient *c);
464 static void sinterstoreCommand(redisClient *c);
465 static void sunionCommand(redisClient *c);
466 static void sunionstoreCommand(redisClient *c);
467 static void sdiffCommand(redisClient *c);
468 static void sdiffstoreCommand(redisClient *c);
469 static void syncCommand(redisClient *c);
470 static void flushdbCommand(redisClient *c);
471 static void flushallCommand(redisClient *c);
472 static void sortCommand(redisClient *c);
473 static void lremCommand(redisClient *c);
474 static void rpoplpushcommand(redisClient *c);
475 static void infoCommand(redisClient *c);
476 static void mgetCommand(redisClient *c);
477 static void monitorCommand(redisClient *c);
478 static void expireCommand(redisClient *c);
479 static void expireatCommand(redisClient *c);
480 static void getsetCommand(redisClient *c);
481 static void ttlCommand(redisClient *c);
482 static void slaveofCommand(redisClient *c);
483 static void debugCommand(redisClient *c);
484 static void msetCommand(redisClient *c);
485 static void msetnxCommand(redisClient *c);
486 static void zaddCommand(redisClient *c);
487 static void zincrbyCommand(redisClient *c);
488 static void zrangeCommand(redisClient *c);
489 static void zrangebyscoreCommand(redisClient *c);
490 static void zrevrangeCommand(redisClient *c);
491 static void zcardCommand(redisClient *c);
492 static void zremCommand(redisClient *c);
493 static void zscoreCommand(redisClient *c);
494 static void zremrangebyscoreCommand(redisClient *c);
495
496 /*================================= Globals ================================= */
497
498 /* Global vars */
499 static struct redisServer server; /* server global state */
500 static struct redisCommand cmdTable[] = {
501 {"get",getCommand,2,REDIS_CMD_INLINE},
502 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
503 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
504 {"del",delCommand,-2,REDIS_CMD_INLINE},
505 {"exists",existsCommand,2,REDIS_CMD_INLINE},
506 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
507 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
508 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
509 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
510 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
511 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
512 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
513 {"llen",llenCommand,2,REDIS_CMD_INLINE},
514 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
515 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
516 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
517 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
518 {"lrem",lremCommand,4,REDIS_CMD_BULK},
519 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
520 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"srem",sremCommand,3,REDIS_CMD_BULK},
522 {"smove",smoveCommand,4,REDIS_CMD_BULK},
523 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
524 {"scard",scardCommand,2,REDIS_CMD_INLINE},
525 {"spop",spopCommand,2,REDIS_CMD_INLINE},
526 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
527 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
528 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
531 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
532 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
533 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
534 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
535 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
536 {"zrem",zremCommand,3,REDIS_CMD_BULK},
537 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
538 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
539 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
540 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
541 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
542 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
543 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
544 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
545 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
546 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
547 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
548 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
549 {"select",selectCommand,2,REDIS_CMD_INLINE},
550 {"move",moveCommand,3,REDIS_CMD_INLINE},
551 {"rename",renameCommand,3,REDIS_CMD_INLINE},
552 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
553 {"expire",expireCommand,3,REDIS_CMD_INLINE},
554 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
555 {"keys",keysCommand,2,REDIS_CMD_INLINE},
556 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
557 {"auth",authCommand,2,REDIS_CMD_INLINE},
558 {"ping",pingCommand,1,REDIS_CMD_INLINE},
559 {"echo",echoCommand,2,REDIS_CMD_BULK},
560 {"save",saveCommand,1,REDIS_CMD_INLINE},
561 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
562 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
563 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
564 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
565 {"type",typeCommand,2,REDIS_CMD_INLINE},
566 {"sync",syncCommand,1,REDIS_CMD_INLINE},
567 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
568 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
569 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
570 {"info",infoCommand,1,REDIS_CMD_INLINE},
571 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
572 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
573 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
574 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
575 {NULL,NULL,0,0}
576 };
577
578 /*============================ Utility functions ============================ */
579
/* Glob-style pattern matching.
 *
 * Returns 1 if 'string' (stringLen bytes) matches 'pattern' (patternLen
 * bytes), 0 otherwise. Every read is bounded by the explicit lengths, so
 * neither buffer needs to be NUL terminated. Supported syntax:
 *
 *   *       any sequence of characters, including the empty one
 *   ?       exactly one character
 *   [abc]   one character of the set; [^abc] negates the set, [a-z] is
 *           a range (reversed ranges like [z-a] are accepted) and a
 *           backslash escapes the following character inside the set
 *   \x      the literal character x
 *
 * When 'nocase' is non-zero letters are compared case-insensitively
 * (escaped characters inside a set are always compared exactly). */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* A run of '*' is equivalent to a single one. Look at
             * pattern[1] only while it exists. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes one character: nothing to compare
             * against means no match (and string[0] must not be read). */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back onto the last byte so
                     * the common advance after the switch ends exactly at
                     * the end of the pattern. Checked first so pattern[0]
                     * is never read past the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == ']') {
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escape only when a character follows; a trailing
                     * backslash is treated as a literal below instead of
                     * driving patternLen negative. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Compare bytes as unsigned values: this gives a
                     * consistent byte order and keeps tolower()'s
                     * argument in its defined range. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* literal needs a character to match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Input exhausted: only trailing '*'s can still match. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
702
703 static void redisLog(int level, const char *fmt, ...) {
704 va_list ap;
705 FILE *fp;
706
707 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
708 if (!fp) return;
709
710 va_start(ap, fmt);
711 if (level >= server.verbosity) {
712 char *c = ".-*";
713 char buf[64];
714 time_t now;
715
716 now = time(NULL);
717 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
718 fprintf(fp,"%s %c ",buf,c[level]);
719 vfprintf(fp, fmt, ap);
720 fprintf(fp,"\n");
721 fflush(fp);
722 }
723 va_end(ap);
724
725 if (server.logfile) fclose(fp);
726 }
727
728 /*====================== Hash table type implementation ==================== */
729
/* Hash table types: the callbacks below plug Redis specific behavior into
 * the generic dict implementation. Keys are SDS dynamic strings or Redis
 * objects, and values are Redis objects (which can hold SDS strings,
 * lists, sets). */

/* Value destructor for plain zmalloc()ed values that need no further
 * cleanup (used by zsetDictType for its malloc(sizeof(double)) values). */
static void dictVanillaFree(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    zfree(val);
}
739
740 static int sdsDictKeyCompare(void *privdata, const void *key1,
741 const void *key2)
742 {
743 int l1,l2;
744 DICT_NOTUSED(privdata);
745
746 l1 = sdslen((sds)key1);
747 l2 = sdslen((sds)key2);
748 if (l1 != l2) return 0;
749 return memcmp(key1, key2, l1) == 0;
750 }
751
/* Key/value destructor for dict entries that hold Redis objects: releases
 * the reference owned by the table via decrRefCount(). */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
758
759 static int dictObjKeyCompare(void *privdata, const void *key1,
760 const void *key2)
761 {
762 const robj *o1 = key1, *o2 = key2;
763 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
764 }
765
766 static unsigned int dictObjHash(const void *key) {
767 const robj *o = key;
768 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
769 }
770
771 static int dictEncObjKeyCompare(void *privdata, const void *key1,
772 const void *key2)
773 {
774 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
775 int cmp;
776
777 o1 = getDecodedObject(o1);
778 o2 = getDecodedObject(o2);
779 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
780 decrRefCount(o1);
781 decrRefCount(o2);
782 return cmp;
783 }
784
785 static unsigned int dictEncObjHash(const void *key) {
786 robj *o = (robj*) key;
787
788 o = getDecodedObject(o);
789 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
790 decrRefCount(o);
791 return hash;
792 }
793
794 static dictType setDictType = {
795 dictEncObjHash, /* hash function */
796 NULL, /* key dup */
797 NULL, /* val dup */
798 dictEncObjKeyCompare, /* key compare */
799 dictRedisObjectDestructor, /* key destructor */
800 NULL /* val destructor */
801 };
802
803 static dictType zsetDictType = {
804 dictEncObjHash, /* hash function */
805 NULL, /* key dup */
806 NULL, /* val dup */
807 dictEncObjKeyCompare, /* key compare */
808 dictRedisObjectDestructor, /* key destructor */
809 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
810 };
811
812 static dictType hashDictType = {
813 dictObjHash, /* hash function */
814 NULL, /* key dup */
815 NULL, /* val dup */
816 dictObjKeyCompare, /* key compare */
817 dictRedisObjectDestructor, /* key destructor */
818 dictRedisObjectDestructor /* val destructor */
819 };
820
821 /* ========================= Random utility functions ======================= */
822
823 /* Redis generally does not try to recover from out of memory conditions
824 * when allocating objects or strings, it is not clear if it will be possible
825 * to report this condition to the client since the networking layer itself
826 * is based on heap allocation for send buffers, so we simply abort.
827 * At least the code will be simpler to read... */
828 static void oom(const char *msg) {
829 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
830 sleep(1);
831 abort();
832 }
833
834 /* ====================== Redis server networking stuff ===================== */
835 static void closeTimedoutClients(void) {
836 redisClient *c;
837 listNode *ln;
838 time_t now = time(NULL);
839
840 listRewind(server.clients);
841 while ((ln = listYield(server.clients)) != NULL) {
842 c = listNodeValue(ln);
843 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
844 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
845 (now - c->lastinteraction > server.maxidletime)) {
846 redisLog(REDIS_DEBUG,"Closing idle client");
847 freeClient(c);
848 }
849 }
850 }
851
852 static int htNeedsResize(dict *dict) {
853 long long size, used;
854
855 size = dictSlots(dict);
856 used = dictSize(dict);
857 return (size && used && size > DICT_HT_INITIAL_SIZE &&
858 (used*100/size < REDIS_HT_MINFILL));
859 }
860
861 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
862 * we resize the hash table to save memory */
863 static void tryResizeHashTables(void) {
864 int j;
865
866 for (j = 0; j < server.dbnum; j++) {
867 if (htNeedsResize(server.db[j].dict)) {
868 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
869 dictResize(server.db[j].dict);
870 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
871 }
872 if (htNeedsResize(server.db[j].expires))
873 dictResize(server.db[j].expires);
874 }
875 }
876
877 /* A background saving child (BGSAVE) terminated its work. Handle this. */
878 void backgroundSaveDoneHandler(int statloc) {
879 int exitcode = WEXITSTATUS(statloc);
880 int bysignal = WIFSIGNALED(statloc);
881
882 if (!bysignal && exitcode == 0) {
883 redisLog(REDIS_NOTICE,
884 "Background saving terminated with success");
885 server.dirty = 0;
886 server.lastsave = time(NULL);
887 } else if (!bysignal && exitcode != 0) {
888 redisLog(REDIS_WARNING, "Background saving error");
889 } else {
890 redisLog(REDIS_WARNING,
891 "Background saving terminated by signal");
892 rdbRemoveTempFile(server.bgsavechildpid);
893 }
894 server.bgsavechildpid = -1;
895 /* Possibly there are slaves waiting for a BGSAVE in order to be served
896 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
897 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
898 }
899
900 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
901 * Handle this. */
902 void backgroundRewriteDoneHandler(int statloc) {
903 int exitcode = WEXITSTATUS(statloc);
904 int bysignal = WIFSIGNALED(statloc);
905
906 if (!bysignal && exitcode == 0) {
907 int fd;
908 char tmpfile[256];
909
910 redisLog(REDIS_NOTICE,
911 "Background append only file rewriting terminated with success");
912 /* Now it's time to flush the differences accumulated by the parent */
913 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
914 fd = open(tmpfile,O_WRONLY|O_APPEND);
915 if (fd == -1) {
916 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
917 goto cleanup;
918 }
919 /* Flush our data... */
920 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
921 (signed) sdslen(server.bgrewritebuf)) {
922 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
923 close(fd);
924 goto cleanup;
925 }
926 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
927 /* Now our work is to rename the temp file into the stable file. And
928 * switch the file descriptor used by the server for append only. */
929 if (rename(tmpfile,server.appendfilename) == -1) {
930 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
931 close(fd);
932 goto cleanup;
933 }
934 /* Mission completed... almost */
935 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
936 if (server.appendfd != -1) {
937 /* If append only is actually enabled... */
938 close(server.appendfd);
939 server.appendfd = fd;
940 fsync(fd);
941 server.appendseldb = -1; /* Make sure it will issue SELECT */
942 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
943 } else {
944 /* If append only is disabled we just generate a dump in this
945 * format. Why not? */
946 close(fd);
947 }
948 } else if (!bysignal && exitcode != 0) {
949 redisLog(REDIS_WARNING, "Background append only file rewriting error");
950 } else {
951 redisLog(REDIS_WARNING,
952 "Background append only file rewriting terminated by signal");
953 }
954 cleanup:
955 sdsfree(server.bgrewritebuf);
956 server.bgrewritebuf = sdsempty();
957 aofRemoveTempFile(server.bgrewritechildpid);
958 server.bgrewritechildpid = -1;
959 }
960
961 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
962 int j, loops = server.cronloops++;
963 REDIS_NOTUSED(eventLoop);
964 REDIS_NOTUSED(id);
965 REDIS_NOTUSED(clientData);
966
967 /* Update the global state with the amount of used memory */
968 server.usedmemory = zmalloc_used_memory();
969
970 /* Show some info about non-empty databases */
971 for (j = 0; j < server.dbnum; j++) {
972 long long size, used, vkeys;
973
974 size = dictSlots(server.db[j].dict);
975 used = dictSize(server.db[j].dict);
976 vkeys = dictSize(server.db[j].expires);
977 if (!(loops % 5) && (used || vkeys)) {
978 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
979 /* dictPrintStats(server.dict); */
980 }
981 }
982
983 /* We don't want to resize the hash tables while a bacground saving
984 * is in progress: the saving child is created using fork() that is
985 * implemented with a copy-on-write semantic in most modern systems, so
986 * if we resize the HT while there is the saving child at work actually
987 * a lot of memory movements in the parent will cause a lot of pages
988 * copied. */
989 if (server.bgsavechildpid == -1) tryResizeHashTables();
990
991 /* Show information about connected clients */
992 if (!(loops % 5)) {
993 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
994 listLength(server.clients)-listLength(server.slaves),
995 listLength(server.slaves),
996 server.usedmemory,
997 dictSize(server.sharingpool));
998 }
999
1000 /* Close connections of timedout clients */
1001 if (server.maxidletime && !(loops % 10))
1002 closeTimedoutClients();
1003
1004 /* Check if a background saving or AOF rewrite in progress terminated */
1005 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1006 int statloc;
1007 pid_t pid;
1008
1009 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1010 if (pid == server.bgsavechildpid) {
1011 backgroundSaveDoneHandler(statloc);
1012 } else {
1013 backgroundRewriteDoneHandler(statloc);
1014 }
1015 }
1016 } else {
1017 /* If there is not a background saving in progress check if
1018 * we have to save now */
1019 time_t now = time(NULL);
1020 for (j = 0; j < server.saveparamslen; j++) {
1021 struct saveparam *sp = server.saveparams+j;
1022
1023 if (server.dirty >= sp->changes &&
1024 now-server.lastsave > sp->seconds) {
1025 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1026 sp->changes, sp->seconds);
1027 rdbSaveBackground(server.dbfilename);
1028 break;
1029 }
1030 }
1031 }
1032
1033 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1034 * will use few CPU cycles if there are few expiring keys, otherwise
1035 * it will get more aggressive to avoid that too much memory is used by
1036 * keys that can be removed from the keyspace. */
1037 for (j = 0; j < server.dbnum; j++) {
1038 int expired;
1039 redisDb *db = server.db+j;
1040
1041 /* Continue to expire if at the end of the cycle more than 25%
1042 * of the keys were expired. */
1043 do {
1044 int num = dictSize(db->expires);
1045 time_t now = time(NULL);
1046
1047 expired = 0;
1048 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1049 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1050 while (num--) {
1051 dictEntry *de;
1052 time_t t;
1053
1054 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1055 t = (time_t) dictGetEntryVal(de);
1056 if (now > t) {
1057 deleteKey(db,dictGetEntryKey(de));
1058 expired++;
1059 }
1060 }
1061 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1062 }
1063
1064 /* Check if we should connect to a MASTER */
1065 if (server.replstate == REDIS_REPL_CONNECT) {
1066 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1067 if (syncWithMaster() == REDIS_OK) {
1068 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1069 }
1070 }
1071 return 1000;
1072 }
1073
1074 static void createSharedObjects(void) {
1075 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1076 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1077 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1078 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1079 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1080 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1081 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1082 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1083 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1084 /* no such key */
1085 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1086 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1087 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1088 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1089 "-ERR no such key\r\n"));
1090 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1091 "-ERR syntax error\r\n"));
1092 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1093 "-ERR source and destination objects are the same\r\n"));
1094 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1095 "-ERR index out of range\r\n"));
1096 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1097 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1098 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1099 shared.select0 = createStringObject("select 0\r\n",10);
1100 shared.select1 = createStringObject("select 1\r\n",10);
1101 shared.select2 = createStringObject("select 2\r\n",10);
1102 shared.select3 = createStringObject("select 3\r\n",10);
1103 shared.select4 = createStringObject("select 4\r\n",10);
1104 shared.select5 = createStringObject("select 5\r\n",10);
1105 shared.select6 = createStringObject("select 6\r\n",10);
1106 shared.select7 = createStringObject("select 7\r\n",10);
1107 shared.select8 = createStringObject("select 8\r\n",10);
1108 shared.select9 = createStringObject("select 9\r\n",10);
1109 }
1110
1111 static void appendServerSaveParams(time_t seconds, int changes) {
1112 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1113 server.saveparams[server.saveparamslen].seconds = seconds;
1114 server.saveparams[server.saveparamslen].changes = changes;
1115 server.saveparamslen++;
1116 }
1117
1118 static void resetServerSaveParams() {
1119 zfree(server.saveparams);
1120 server.saveparams = NULL;
1121 server.saveparamslen = 0;
1122 }
1123
1124 static void initServerConfig() {
1125 server.dbnum = REDIS_DEFAULT_DBNUM;
1126 server.port = REDIS_SERVERPORT;
1127 server.verbosity = REDIS_DEBUG;
1128 server.maxidletime = REDIS_MAXIDLETIME;
1129 server.saveparams = NULL;
1130 server.logfile = NULL; /* NULL = log on standard output */
1131 server.bindaddr = NULL;
1132 server.glueoutputbuf = 1;
1133 server.daemonize = 0;
1134 server.appendonly = 0;
1135 server.appendfsync = APPENDFSYNC_ALWAYS;
1136 server.lastfsync = time(NULL);
1137 server.appendfd = -1;
1138 server.appendseldb = -1; /* Make sure the first time will not match */
1139 server.pidfile = "/var/run/redis.pid";
1140 server.dbfilename = "dump.rdb";
1141 server.appendfilename = "appendonly.aof";
1142 server.requirepass = NULL;
1143 server.shareobjects = 0;
1144 server.sharingpoolsize = 1024;
1145 server.maxclients = 0;
1146 server.maxmemory = 0;
1147 resetServerSaveParams();
1148
1149 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1150 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1151 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1152 /* Replication related */
1153 server.isslave = 0;
1154 server.masterauth = NULL;
1155 server.masterhost = NULL;
1156 server.masterport = 6379;
1157 server.master = NULL;
1158 server.replstate = REDIS_REPL_NONE;
1159
1160 /* Double constants initialization */
1161 R_Zero = 0.0;
1162 R_PosInf = 1.0/R_Zero;
1163 R_NegInf = -1.0/R_Zero;
1164 R_Nan = R_Zero/R_Zero;
1165 }
1166
1167 static void initServer() {
1168 int j;
1169
1170 signal(SIGHUP, SIG_IGN);
1171 signal(SIGPIPE, SIG_IGN);
1172 setupSigSegvAction();
1173
1174 server.clients = listCreate();
1175 server.slaves = listCreate();
1176 server.monitors = listCreate();
1177 server.objfreelist = listCreate();
1178 createSharedObjects();
1179 server.el = aeCreateEventLoop();
1180 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1181 server.sharingpool = dictCreate(&setDictType,NULL);
1182 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1183 if (server.fd == -1) {
1184 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1185 exit(1);
1186 }
1187 for (j = 0; j < server.dbnum; j++) {
1188 server.db[j].dict = dictCreate(&hashDictType,NULL);
1189 server.db[j].expires = dictCreate(&setDictType,NULL);
1190 server.db[j].id = j;
1191 }
1192 server.cronloops = 0;
1193 server.bgsavechildpid = -1;
1194 server.bgrewritechildpid = -1;
1195 server.bgrewritebuf = sdsempty();
1196 server.lastsave = time(NULL);
1197 server.dirty = 0;
1198 server.usedmemory = 0;
1199 server.stat_numcommands = 0;
1200 server.stat_numconnections = 0;
1201 server.stat_starttime = time(NULL);
1202 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1203
1204 if (server.appendonly) {
1205 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1206 if (server.appendfd == -1) {
1207 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1208 strerror(errno));
1209 exit(1);
1210 }
1211 }
1212 }
1213
1214 /* Empty the whole database */
1215 static long long emptyDb() {
1216 int j;
1217 long long removed = 0;
1218
1219 for (j = 0; j < server.dbnum; j++) {
1220 removed += dictSize(server.db[j].dict);
1221 dictEmpty(server.db[j].dict);
1222 dictEmpty(server.db[j].expires);
1223 }
1224 return removed;
1225 }
1226
/* Map a case-insensitive "yes"/"no" config argument to 1/0.
 * Returns -1 for any other string. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0)
        return 1;
    if (strcasecmp(s,"no") == 0)
        return 0;
    return -1;
}
1232
1233 /* I agree, this is a very rudimental way to load a configuration...
1234 will improve later if the config gets more complex */
1235 static void loadServerConfig(char *filename) {
1236 FILE *fp;
1237 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1238 int linenum = 0;
1239 sds line = NULL;
1240
1241 if (filename[0] == '-' && filename[1] == '\0')
1242 fp = stdin;
1243 else {
1244 if ((fp = fopen(filename,"r")) == NULL) {
1245 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1246 exit(1);
1247 }
1248 }
1249
1250 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1251 sds *argv;
1252 int argc, j;
1253
1254 linenum++;
1255 line = sdsnew(buf);
1256 line = sdstrim(line," \t\r\n");
1257
1258 /* Skip comments and blank lines*/
1259 if (line[0] == '#' || line[0] == '\0') {
1260 sdsfree(line);
1261 continue;
1262 }
1263
1264 /* Split into arguments */
1265 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1266 sdstolower(argv[0]);
1267
1268 /* Execute config directives */
1269 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1270 server.maxidletime = atoi(argv[1]);
1271 if (server.maxidletime < 0) {
1272 err = "Invalid timeout value"; goto loaderr;
1273 }
1274 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1275 server.port = atoi(argv[1]);
1276 if (server.port < 1 || server.port > 65535) {
1277 err = "Invalid port"; goto loaderr;
1278 }
1279 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1280 server.bindaddr = zstrdup(argv[1]);
1281 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1282 int seconds = atoi(argv[1]);
1283 int changes = atoi(argv[2]);
1284 if (seconds < 1 || changes < 0) {
1285 err = "Invalid save parameters"; goto loaderr;
1286 }
1287 appendServerSaveParams(seconds,changes);
1288 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1289 if (chdir(argv[1]) == -1) {
1290 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1291 argv[1], strerror(errno));
1292 exit(1);
1293 }
1294 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1295 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1296 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1297 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1298 else {
1299 err = "Invalid log level. Must be one of debug, notice, warning";
1300 goto loaderr;
1301 }
1302 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1303 FILE *logfp;
1304
1305 server.logfile = zstrdup(argv[1]);
1306 if (!strcasecmp(server.logfile,"stdout")) {
1307 zfree(server.logfile);
1308 server.logfile = NULL;
1309 }
1310 if (server.logfile) {
1311 /* Test if we are able to open the file. The server will not
1312 * be able to abort just for this problem later... */
1313 logfp = fopen(server.logfile,"a");
1314 if (logfp == NULL) {
1315 err = sdscatprintf(sdsempty(),
1316 "Can't open the log file: %s", strerror(errno));
1317 goto loaderr;
1318 }
1319 fclose(logfp);
1320 }
1321 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1322 server.dbnum = atoi(argv[1]);
1323 if (server.dbnum < 1) {
1324 err = "Invalid number of databases"; goto loaderr;
1325 }
1326 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1327 server.maxclients = atoi(argv[1]);
1328 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1329 server.maxmemory = strtoll(argv[1], NULL, 10);
1330 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1331 server.masterhost = sdsnew(argv[1]);
1332 server.masterport = atoi(argv[2]);
1333 server.replstate = REDIS_REPL_CONNECT;
1334 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1335 server.masterauth = zstrdup(argv[1]);
1336 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1337 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1338 err = "argument must be 'yes' or 'no'"; goto loaderr;
1339 }
1340 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1341 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1342 err = "argument must be 'yes' or 'no'"; goto loaderr;
1343 }
1344 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1345 server.sharingpoolsize = atoi(argv[1]);
1346 if (server.sharingpoolsize < 1) {
1347 err = "invalid object sharing pool size"; goto loaderr;
1348 }
1349 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1350 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1351 err = "argument must be 'yes' or 'no'"; goto loaderr;
1352 }
1353 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1354 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1355 err = "argument must be 'yes' or 'no'"; goto loaderr;
1356 }
1357 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1358 if (!strcasecmp(argv[1],"no")) {
1359 server.appendfsync = APPENDFSYNC_NO;
1360 } else if (!strcasecmp(argv[1],"always")) {
1361 server.appendfsync = APPENDFSYNC_ALWAYS;
1362 } else if (!strcasecmp(argv[1],"everysec")) {
1363 server.appendfsync = APPENDFSYNC_EVERYSEC;
1364 } else {
1365 err = "argument must be 'no', 'always' or 'everysec'";
1366 goto loaderr;
1367 }
1368 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1369 server.requirepass = zstrdup(argv[1]);
1370 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1371 server.pidfile = zstrdup(argv[1]);
1372 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1373 server.dbfilename = zstrdup(argv[1]);
1374 } else {
1375 err = "Bad directive or wrong number of arguments"; goto loaderr;
1376 }
1377 for (j = 0; j < argc; j++)
1378 sdsfree(argv[j]);
1379 zfree(argv);
1380 sdsfree(line);
1381 }
1382 if (fp != stdin) fclose(fp);
1383 return;
1384
1385 loaderr:
1386 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1387 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1388 fprintf(stderr, ">>> '%s'\n", line);
1389 fprintf(stderr, "%s\n", err);
1390 exit(1);
1391 }
1392
1393 static void freeClientArgv(redisClient *c) {
1394 int j;
1395
1396 for (j = 0; j < c->argc; j++)
1397 decrRefCount(c->argv[j]);
1398 for (j = 0; j < c->mbargc; j++)
1399 decrRefCount(c->mbargv[j]);
1400 c->argc = 0;
1401 c->mbargc = 0;
1402 }
1403
1404 static void freeClient(redisClient *c) {
1405 listNode *ln;
1406
1407 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1408 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1409 sdsfree(c->querybuf);
1410 listRelease(c->reply);
1411 freeClientArgv(c);
1412 close(c->fd);
1413 ln = listSearchKey(server.clients,c);
1414 redisAssert(ln != NULL);
1415 listDelNode(server.clients,ln);
1416 if (c->flags & REDIS_SLAVE) {
1417 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1418 close(c->repldbfd);
1419 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1420 ln = listSearchKey(l,c);
1421 redisAssert(ln != NULL);
1422 listDelNode(l,ln);
1423 }
1424 if (c->flags & REDIS_MASTER) {
1425 server.master = NULL;
1426 server.replstate = REDIS_REPL_CONNECT;
1427 }
1428 zfree(c->argv);
1429 zfree(c->mbargv);
1430 zfree(c);
1431 }
1432
1433 #define GLUEREPLY_UP_TO (1024)
1434 static void glueReplyBuffersIfNeeded(redisClient *c) {
1435 int copylen = 0;
1436 char buf[GLUEREPLY_UP_TO];
1437 listNode *ln;
1438 robj *o;
1439
1440 listRewind(c->reply);
1441 while((ln = listYield(c->reply))) {
1442 int objlen;
1443
1444 o = ln->value;
1445 objlen = sdslen(o->ptr);
1446 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1447 memcpy(buf+copylen,o->ptr,objlen);
1448 copylen += objlen;
1449 listDelNode(c->reply,ln);
1450 } else {
1451 if (copylen == 0) return;
1452 break;
1453 }
1454 }
1455 /* Now the output buffer is empty, add the new single element */
1456 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1457 listAddNodeHead(c->reply,o);
1458 }
1459
1460 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1461 redisClient *c = privdata;
1462 int nwritten = 0, totwritten = 0, objlen;
1463 robj *o;
1464 REDIS_NOTUSED(el);
1465 REDIS_NOTUSED(mask);
1466
1467 /* Use writev() if we have enough buffers to send */
1468 if (!server.glueoutputbuf &&
1469 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1470 !(c->flags & REDIS_MASTER))
1471 {
1472 sendReplyToClientWritev(el, fd, privdata, mask);
1473 return;
1474 }
1475
1476 while(listLength(c->reply)) {
1477 if (server.glueoutputbuf && listLength(c->reply) > 1)
1478 glueReplyBuffersIfNeeded(c);
1479
1480 o = listNodeValue(listFirst(c->reply));
1481 objlen = sdslen(o->ptr);
1482
1483 if (objlen == 0) {
1484 listDelNode(c->reply,listFirst(c->reply));
1485 continue;
1486 }
1487
1488 if (c->flags & REDIS_MASTER) {
1489 /* Don't reply to a master */
1490 nwritten = objlen - c->sentlen;
1491 } else {
1492 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1493 if (nwritten <= 0) break;
1494 }
1495 c->sentlen += nwritten;
1496 totwritten += nwritten;
1497 /* If we fully sent the object on head go to the next one */
1498 if (c->sentlen == objlen) {
1499 listDelNode(c->reply,listFirst(c->reply));
1500 c->sentlen = 0;
1501 }
1502 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1503 * bytes, in a single threaded server it's a good idea to serve
1504 * other clients as well, even if a very large request comes from
1505 * super fast link that is always able to accept data (in real world
1506 * scenario think about 'KEYS *' against the loopback interfae) */
1507 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1508 }
1509 if (nwritten == -1) {
1510 if (errno == EAGAIN) {
1511 nwritten = 0;
1512 } else {
1513 redisLog(REDIS_DEBUG,
1514 "Error writing to client: %s", strerror(errno));
1515 freeClient(c);
1516 return;
1517 }
1518 }
1519 if (totwritten > 0) c->lastinteraction = time(NULL);
1520 if (listLength(c->reply) == 0) {
1521 c->sentlen = 0;
1522 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1523 }
1524 }
1525
1526 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1527 {
1528 redisClient *c = privdata;
1529 int nwritten = 0, totwritten = 0, objlen, willwrite;
1530 robj *o;
1531 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1532 int offset, ion = 0;
1533 REDIS_NOTUSED(el);
1534 REDIS_NOTUSED(mask);
1535
1536 listNode *node;
1537 while (listLength(c->reply)) {
1538 offset = c->sentlen;
1539 ion = 0;
1540 willwrite = 0;
1541
1542 /* fill-in the iov[] array */
1543 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1544 o = listNodeValue(node);
1545 objlen = sdslen(o->ptr);
1546
1547 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1548 break;
1549
1550 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1551 break; /* no more iovecs */
1552
1553 iov[ion].iov_base = ((char*)o->ptr) + offset;
1554 iov[ion].iov_len = objlen - offset;
1555 willwrite += objlen - offset;
1556 offset = 0; /* just for the first item */
1557 ion++;
1558 }
1559
1560 if(willwrite == 0)
1561 break;
1562
1563 /* write all collected blocks at once */
1564 if((nwritten = writev(fd, iov, ion)) < 0) {
1565 if (errno != EAGAIN) {
1566 redisLog(REDIS_DEBUG,
1567 "Error writing to client: %s", strerror(errno));
1568 freeClient(c);
1569 return;
1570 }
1571 break;
1572 }
1573
1574 totwritten += nwritten;
1575 offset = c->sentlen;
1576
1577 /* remove written robjs from c->reply */
1578 while (nwritten && listLength(c->reply)) {
1579 o = listNodeValue(listFirst(c->reply));
1580 objlen = sdslen(o->ptr);
1581
1582 if(nwritten >= objlen - offset) {
1583 listDelNode(c->reply, listFirst(c->reply));
1584 nwritten -= objlen - offset;
1585 c->sentlen = 0;
1586 } else {
1587 /* partial write */
1588 c->sentlen += nwritten;
1589 break;
1590 }
1591 offset = 0;
1592 }
1593 }
1594
1595 if (totwritten > 0)
1596 c->lastinteraction = time(NULL);
1597
1598 if (listLength(c->reply) == 0) {
1599 c->sentlen = 0;
1600 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1601 }
1602 }
1603
1604 static struct redisCommand *lookupCommand(char *name) {
1605 int j = 0;
1606 while(cmdTable[j].name != NULL) {
1607 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1608 j++;
1609 }
1610 return NULL;
1611 }
1612
1613 /* resetClient prepare the client to process the next command */
1614 static void resetClient(redisClient *c) {
1615 freeClientArgv(c);
1616 c->bulklen = -1;
1617 c->multibulk = 0;
1618 }
1619
1620 /* If this function gets called we already read a whole
1621 * command, argments are in the client argv/argc fields.
1622 * processCommand() execute the command or prepare the
1623 * server for a bulk read from the client.
1624 *
1625 * If 1 is returned the client is still alive and valid and
1626 * and other operations can be performed by the caller. Otherwise
1627 * if 0 is returned the client was destroied (i.e. after QUIT). */
1628 static int processCommand(redisClient *c) {
1629 struct redisCommand *cmd;
1630 long long dirty;
1631
1632 /* Free some memory if needed (maxmemory setting) */
1633 if (server.maxmemory) freeMemoryIfNeeded();
1634
1635 /* Handle the multi bulk command type. This is an alternative protocol
1636 * supported by Redis in order to receive commands that are composed of
1637 * multiple binary-safe "bulk" arguments. The latency of processing is
1638 * a bit higher but this allows things like multi-sets, so if this
1639 * protocol is used only for MSET and similar commands this is a big win. */
1640 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1641 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1642 if (c->multibulk <= 0) {
1643 resetClient(c);
1644 return 1;
1645 } else {
1646 decrRefCount(c->argv[c->argc-1]);
1647 c->argc--;
1648 return 1;
1649 }
1650 } else if (c->multibulk) {
1651 if (c->bulklen == -1) {
1652 if (((char*)c->argv[0]->ptr)[0] != '$') {
1653 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1654 resetClient(c);
1655 return 1;
1656 } else {
1657 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1658 decrRefCount(c->argv[0]);
1659 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1660 c->argc--;
1661 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1662 resetClient(c);
1663 return 1;
1664 }
1665 c->argc--;
1666 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1667 return 1;
1668 }
1669 } else {
1670 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1671 c->mbargv[c->mbargc] = c->argv[0];
1672 c->mbargc++;
1673 c->argc--;
1674 c->multibulk--;
1675 if (c->multibulk == 0) {
1676 robj **auxargv;
1677 int auxargc;
1678
1679 /* Here we need to swap the multi-bulk argc/argv with the
1680 * normal argc/argv of the client structure. */
1681 auxargv = c->argv;
1682 c->argv = c->mbargv;
1683 c->mbargv = auxargv;
1684
1685 auxargc = c->argc;
1686 c->argc = c->mbargc;
1687 c->mbargc = auxargc;
1688
1689 /* We need to set bulklen to something different than -1
1690 * in order for the code below to process the command without
1691 * to try to read the last argument of a bulk command as
1692 * a special argument. */
1693 c->bulklen = 0;
1694 /* continue below and process the command */
1695 } else {
1696 c->bulklen = -1;
1697 return 1;
1698 }
1699 }
1700 }
1701 /* -- end of multi bulk commands processing -- */
1702
1703 /* The QUIT command is handled as a special case. Normal command
1704 * procs are unable to close the client connection safely */
1705 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1706 freeClient(c);
1707 return 0;
1708 }
1709 cmd = lookupCommand(c->argv[0]->ptr);
1710 if (!cmd) {
1711 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1712 resetClient(c);
1713 return 1;
1714 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1715 (c->argc < -cmd->arity)) {
1716 addReplySds(c,
1717 sdscatprintf(sdsempty(),
1718 "-ERR wrong number of arguments for '%s' command\r\n",
1719 cmd->name));
1720 resetClient(c);
1721 return 1;
1722 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1723 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1724 resetClient(c);
1725 return 1;
1726 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1727 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1728
1729 decrRefCount(c->argv[c->argc-1]);
1730 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1731 c->argc--;
1732 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1733 resetClient(c);
1734 return 1;
1735 }
1736 c->argc--;
1737 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1738 /* It is possible that the bulk read is already in the
1739 * buffer. Check this condition and handle it accordingly.
1740 * This is just a fast path, alternative to call processInputBuffer().
1741 * It's a good idea since the code is small and this condition
1742 * happens most of the times. */
1743 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1744 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1745 c->argc++;
1746 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1747 } else {
1748 return 1;
1749 }
1750 }
1751 /* Let's try to share objects on the command arguments vector */
1752 if (server.shareobjects) {
1753 int j;
1754 for(j = 1; j < c->argc; j++)
1755 c->argv[j] = tryObjectSharing(c->argv[j]);
1756 }
1757 /* Let's try to encode the bulk object to save space. */
1758 if (cmd->flags & REDIS_CMD_BULK)
1759 tryObjectEncoding(c->argv[c->argc-1]);
1760
1761 /* Check if the user is authenticated */
1762 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1763 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1764 resetClient(c);
1765 return 1;
1766 }
1767
1768 /* Exec the command */
1769 dirty = server.dirty;
1770 cmd->proc(c);
1771 if (server.appendonly && server.dirty-dirty)
1772 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1773 if (server.dirty-dirty && listLength(server.slaves))
1774 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1775 if (listLength(server.monitors))
1776 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1777 server.stat_numcommands++;
1778
1779 /* Prepare the client for the next command */
1780 if (c->flags & REDIS_CLOSE) {
1781 freeClient(c);
1782 return 0;
1783 }
1784 resetClient(c);
1785 return 1;
1786 }
1787
1788 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1789 listNode *ln;
1790 int outc = 0, j;
1791 robj **outv;
1792 /* (args*2)+1 is enough room for args, spaces, newlines */
1793 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1794
1795 if (argc <= REDIS_STATIC_ARGS) {
1796 outv = static_outv;
1797 } else {
1798 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1799 }
1800
1801 for (j = 0; j < argc; j++) {
1802 if (j != 0) outv[outc++] = shared.space;
1803 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1804 robj *lenobj;
1805
1806 lenobj = createObject(REDIS_STRING,
1807 sdscatprintf(sdsempty(),"%lu\r\n",
1808 (unsigned long) stringObjectLen(argv[j])));
1809 lenobj->refcount = 0;
1810 outv[outc++] = lenobj;
1811 }
1812 outv[outc++] = argv[j];
1813 }
1814 outv[outc++] = shared.crlf;
1815
1816 /* Increment all the refcounts at start and decrement at end in order to
1817 * be sure to free objects if there is no slave in a replication state
1818 * able to be feed with commands */
1819 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1820 listRewind(slaves);
1821 while((ln = listYield(slaves))) {
1822 redisClient *slave = ln->value;
1823
1824 /* Don't feed slaves that are still waiting for BGSAVE to start */
1825 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1826
1827 /* Feed all the other slaves, MONITORs and so on */
1828 if (slave->slaveseldb != dictid) {
1829 robj *selectcmd;
1830
1831 switch(dictid) {
1832 case 0: selectcmd = shared.select0; break;
1833 case 1: selectcmd = shared.select1; break;
1834 case 2: selectcmd = shared.select2; break;
1835 case 3: selectcmd = shared.select3; break;
1836 case 4: selectcmd = shared.select4; break;
1837 case 5: selectcmd = shared.select5; break;
1838 case 6: selectcmd = shared.select6; break;
1839 case 7: selectcmd = shared.select7; break;
1840 case 8: selectcmd = shared.select8; break;
1841 case 9: selectcmd = shared.select9; break;
1842 default:
1843 selectcmd = createObject(REDIS_STRING,
1844 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1845 selectcmd->refcount = 0;
1846 break;
1847 }
1848 addReply(slave,selectcmd);
1849 slave->slaveseldb = dictid;
1850 }
1851 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1852 }
1853 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1854 if (outv != static_outv) zfree(outv);
1855 }
1856
1857 static void processInputBuffer(redisClient *c) {
1858 again:
1859 if (c->bulklen == -1) {
1860 /* Read the first line of the query */
1861 char *p = strchr(c->querybuf,'\n');
1862 size_t querylen;
1863
1864 if (p) {
1865 sds query, *argv;
1866 int argc, j;
1867
1868 query = c->querybuf;
1869 c->querybuf = sdsempty();
1870 querylen = 1+(p-(query));
1871 if (sdslen(query) > querylen) {
1872 /* leave data after the first line of the query in the buffer */
1873 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1874 }
1875 *p = '\0'; /* remove "\n" */
1876 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1877 sdsupdatelen(query);
1878
1879 /* Now we can split the query in arguments */
1880 if (sdslen(query) == 0) {
1881 /* Ignore empty query */
1882 sdsfree(query);
1883 if (sdslen(c->querybuf)) goto again;
1884 return;
1885 }
1886 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1887 sdsfree(query);
1888
1889 if (c->argv) zfree(c->argv);
1890 c->argv = zmalloc(sizeof(robj*)*argc);
1891
1892 for (j = 0; j < argc; j++) {
1893 if (sdslen(argv[j])) {
1894 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1895 c->argc++;
1896 } else {
1897 sdsfree(argv[j]);
1898 }
1899 }
1900 zfree(argv);
1901 if (c->argc) {
1902 /* Execute the command. If the client is still valid
1903 * after processCommand() return and there is something
1904 * on the query buffer try to process the next command. */
1905 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1906 } else {
1907 /* Nothing to process, argc == 0. Just process the query
1908 * buffer if it's not empty or return to the caller */
1909 if (sdslen(c->querybuf)) goto again;
1910 }
1911 return;
1912 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1913 redisLog(REDIS_DEBUG, "Client protocol error");
1914 freeClient(c);
1915 return;
1916 }
1917 } else {
1918 /* Bulk read handling. Note that if we are at this point
1919 the client already sent a command terminated with a newline,
1920 we are reading the bulk data that is actually the last
1921 argument of the command. */
1922 int qbl = sdslen(c->querybuf);
1923
1924 if (c->bulklen <= qbl) {
1925 /* Copy everything but the final CRLF as final argument */
1926 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1927 c->argc++;
1928 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1929 /* Process the command. If the client is still valid after
1930 * the processing and there is more data in the buffer
1931 * try to parse it. */
1932 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1933 return;
1934 }
1935 }
1936 }
1937
1938 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1939 redisClient *c = (redisClient*) privdata;
1940 char buf[REDIS_IOBUF_LEN];
1941 int nread;
1942 REDIS_NOTUSED(el);
1943 REDIS_NOTUSED(mask);
1944
1945 nread = read(fd, buf, REDIS_IOBUF_LEN);
1946 if (nread == -1) {
1947 if (errno == EAGAIN) {
1948 nread = 0;
1949 } else {
1950 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1951 freeClient(c);
1952 return;
1953 }
1954 } else if (nread == 0) {
1955 redisLog(REDIS_DEBUG, "Client closed connection");
1956 freeClient(c);
1957 return;
1958 }
1959 if (nread) {
1960 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1961 c->lastinteraction = time(NULL);
1962 } else {
1963 return;
1964 }
1965 processInputBuffer(c);
1966 }
1967
1968 static int selectDb(redisClient *c, int id) {
1969 if (id < 0 || id >= server.dbnum)
1970 return REDIS_ERR;
1971 c->db = &server.db[id];
1972 return REDIS_OK;
1973 }
1974
1975 static void *dupClientReplyValue(void *o) {
1976 incrRefCount((robj*)o);
1977 return 0;
1978 }
1979
1980 static redisClient *createClient(int fd) {
1981 redisClient *c = zmalloc(sizeof(*c));
1982
1983 anetNonBlock(NULL,fd);
1984 anetTcpNoDelay(NULL,fd);
1985 if (!c) return NULL;
1986 selectDb(c,0);
1987 c->fd = fd;
1988 c->querybuf = sdsempty();
1989 c->argc = 0;
1990 c->argv = NULL;
1991 c->bulklen = -1;
1992 c->multibulk = 0;
1993 c->mbargc = 0;
1994 c->mbargv = NULL;
1995 c->sentlen = 0;
1996 c->flags = 0;
1997 c->lastinteraction = time(NULL);
1998 c->authenticated = 0;
1999 c->replstate = REDIS_REPL_NONE;
2000 c->reply = listCreate();
2001 listSetFreeMethod(c->reply,decrRefCount);
2002 listSetDupMethod(c->reply,dupClientReplyValue);
2003 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2004 readQueryFromClient, c) == AE_ERR) {
2005 freeClient(c);
2006 return NULL;
2007 }
2008 listAddNodeTail(server.clients,c);
2009 return c;
2010 }
2011
2012 static void addReply(redisClient *c, robj *obj) {
2013 if (listLength(c->reply) == 0 &&
2014 (c->replstate == REDIS_REPL_NONE ||
2015 c->replstate == REDIS_REPL_ONLINE) &&
2016 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2017 sendReplyToClient, c) == AE_ERR) return;
2018 listAddNodeTail(c->reply,getDecodedObject(obj));
2019 }
2020
2021 static void addReplySds(redisClient *c, sds s) {
2022 robj *o = createObject(REDIS_STRING,s);
2023 addReply(c,o);
2024 decrRefCount(o);
2025 }
2026
2027 static void addReplyDouble(redisClient *c, double d) {
2028 char buf[128];
2029
2030 snprintf(buf,sizeof(buf),"%.17g",d);
2031 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2032 (unsigned long) strlen(buf),buf));
2033 }
2034
2035 static void addReplyBulkLen(redisClient *c, robj *obj) {
2036 size_t len;
2037
2038 if (obj->encoding == REDIS_ENCODING_RAW) {
2039 len = sdslen(obj->ptr);
2040 } else {
2041 long n = (long)obj->ptr;
2042
2043 len = 1;
2044 if (n < 0) {
2045 len++;
2046 n = -n;
2047 }
2048 while((n = n/10) != 0) {
2049 len++;
2050 }
2051 }
2052 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2053 }
2054
2055 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2056 int cport, cfd;
2057 char cip[128];
2058 redisClient *c;
2059 REDIS_NOTUSED(el);
2060 REDIS_NOTUSED(mask);
2061 REDIS_NOTUSED(privdata);
2062
2063 cfd = anetAccept(server.neterr, fd, cip, &cport);
2064 if (cfd == AE_ERR) {
2065 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2066 return;
2067 }
2068 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2069 if ((c = createClient(cfd)) == NULL) {
2070 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2071 close(cfd); /* May be already closed, just ingore errors */
2072 return;
2073 }
2074 /* If maxclient directive is set and this is one client more... close the
2075 * connection. Note that we create the client instead to check before
2076 * for this condition, since now the socket is already set in nonblocking
2077 * mode and we can send an error for free using the Kernel I/O */
2078 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2079 char *err = "-ERR max number of clients reached\r\n";
2080
2081 /* That's a best effort error message, don't check write errors */
2082 if (write(c->fd,err,strlen(err)) == -1) {
2083 /* Nothing to do, Just to avoid the warning... */
2084 }
2085 freeClient(c);
2086 return;
2087 }
2088 server.stat_numconnections++;
2089 }
2090
2091 /* ======================= Redis objects implementation ===================== */
2092
2093 static robj *createObject(int type, void *ptr) {
2094 robj *o;
2095
2096 if (listLength(server.objfreelist)) {
2097 listNode *head = listFirst(server.objfreelist);
2098 o = listNodeValue(head);
2099 listDelNode(server.objfreelist,head);
2100 } else {
2101 o = zmalloc(sizeof(*o));
2102 }
2103 o->type = type;
2104 o->encoding = REDIS_ENCODING_RAW;
2105 o->ptr = ptr;
2106 o->refcount = 1;
2107 return o;
2108 }
2109
2110 static robj *createStringObject(char *ptr, size_t len) {
2111 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2112 }
2113
2114 static robj *createListObject(void) {
2115 list *l = listCreate();
2116
2117 listSetFreeMethod(l,decrRefCount);
2118 return createObject(REDIS_LIST,l);
2119 }
2120
2121 static robj *createSetObject(void) {
2122 dict *d = dictCreate(&setDictType,NULL);
2123 return createObject(REDIS_SET,d);
2124 }
2125
2126 static robj *createZsetObject(void) {
2127 zset *zs = zmalloc(sizeof(*zs));
2128
2129 zs->dict = dictCreate(&zsetDictType,NULL);
2130 zs->zsl = zslCreate();
2131 return createObject(REDIS_ZSET,zs);
2132 }
2133
2134 static void freeStringObject(robj *o) {
2135 if (o->encoding == REDIS_ENCODING_RAW) {
2136 sdsfree(o->ptr);
2137 }
2138 }
2139
2140 static void freeListObject(robj *o) {
2141 listRelease((list*) o->ptr);
2142 }
2143
2144 static void freeSetObject(robj *o) {
2145 dictRelease((dict*) o->ptr);
2146 }
2147
2148 static void freeZsetObject(robj *o) {
2149 zset *zs = o->ptr;
2150
2151 dictRelease(zs->dict);
2152 zslFree(zs->zsl);
2153 zfree(zs);
2154 }
2155
2156 static void freeHashObject(robj *o) {
2157 dictRelease((dict*) o->ptr);
2158 }
2159
2160 static void incrRefCount(robj *o) {
2161 o->refcount++;
2162 #ifdef DEBUG_REFCOUNT
2163 if (o->type == REDIS_STRING)
2164 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2165 #endif
2166 }
2167
2168 static void decrRefCount(void *obj) {
2169 robj *o = obj;
2170
2171 #ifdef DEBUG_REFCOUNT
2172 if (o->type == REDIS_STRING)
2173 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2174 #endif
2175 if (--(o->refcount) == 0) {
2176 switch(o->type) {
2177 case REDIS_STRING: freeStringObject(o); break;
2178 case REDIS_LIST: freeListObject(o); break;
2179 case REDIS_SET: freeSetObject(o); break;
2180 case REDIS_ZSET: freeZsetObject(o); break;
2181 case REDIS_HASH: freeHashObject(o); break;
2182 default: redisAssert(0 != 0); break;
2183 }
2184 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2185 !listAddNodeHead(server.objfreelist,o))
2186 zfree(o);
2187 }
2188 }
2189
2190 static robj *lookupKey(redisDb *db, robj *key) {
2191 dictEntry *de = dictFind(db->dict,key);
2192 return de ? dictGetEntryVal(de) : NULL;
2193 }
2194
2195 static robj *lookupKeyRead(redisDb *db, robj *key) {
2196 expireIfNeeded(db,key);
2197 return lookupKey(db,key);
2198 }
2199
2200 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2201 deleteIfVolatile(db,key);
2202 return lookupKey(db,key);
2203 }
2204
2205 static int deleteKey(redisDb *db, robj *key) {
2206 int retval;
2207
2208 /* We need to protect key from destruction: after the first dictDelete()
2209 * it may happen that 'key' is no longer valid if we don't increment
2210 * it's count. This may happen when we get the object reference directly
2211 * from the hash table with dictRandomKey() or dict iterators */
2212 incrRefCount(key);
2213 if (dictSize(db->expires)) dictDelete(db->expires,key);
2214 retval = dictDelete(db->dict,key);
2215 decrRefCount(key);
2216
2217 return retval == DICT_OK;
2218 }
2219
2220 /* Try to share an object against the shared objects pool */
2221 static robj *tryObjectSharing(robj *o) {
2222 struct dictEntry *de;
2223 unsigned long c;
2224
2225 if (o == NULL || server.shareobjects == 0) return o;
2226
2227 redisAssert(o->type == REDIS_STRING);
2228 de = dictFind(server.sharingpool,o);
2229 if (de) {
2230 robj *shared = dictGetEntryKey(de);
2231
2232 c = ((unsigned long) dictGetEntryVal(de))+1;
2233 dictGetEntryVal(de) = (void*) c;
2234 incrRefCount(shared);
2235 decrRefCount(o);
2236 return shared;
2237 } else {
2238 /* Here we are using a stream algorihtm: Every time an object is
2239 * shared we increment its count, everytime there is a miss we
2240 * recrement the counter of a random object. If this object reaches
2241 * zero we remove the object and put the current object instead. */
2242 if (dictSize(server.sharingpool) >=
2243 server.sharingpoolsize) {
2244 de = dictGetRandomKey(server.sharingpool);
2245 redisAssert(de != NULL);
2246 c = ((unsigned long) dictGetEntryVal(de))-1;
2247 dictGetEntryVal(de) = (void*) c;
2248 if (c == 0) {
2249 dictDelete(server.sharingpool,de->key);
2250 }
2251 } else {
2252 c = 0; /* If the pool is empty we want to add this object */
2253 }
2254 if (c == 0) {
2255 int retval;
2256
2257 retval = dictAdd(server.sharingpool,o,(void*)1);
2258 redisAssert(retval == DICT_OK);
2259 incrRefCount(o);
2260 }
2261 return o;
2262 }
2263 }
2264
2265 /* Check if the nul-terminated string 's' can be represented by a long
2266 * (that is, is a number that fits into long without any other space or
2267 * character before or after the digits).
2268 *
2269 * If so, the function returns REDIS_OK and *longval is set to the value
2270 * of the number. Otherwise REDIS_ERR is returned */
2271 static int isStringRepresentableAsLong(sds s, long *longval) {
2272 char buf[32], *endptr;
2273 long value;
2274 int slen;
2275
2276 value = strtol(s, &endptr, 10);
2277 if (endptr[0] != '\0') return REDIS_ERR;
2278 slen = snprintf(buf,32,"%ld",value);
2279
2280 /* If the number converted back into a string is not identical
2281 * then it's not possible to encode the string as integer */
2282 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2283 if (longval) *longval = value;
2284 return REDIS_OK;
2285 }
2286
2287 /* Try to encode a string object in order to save space */
2288 static int tryObjectEncoding(robj *o) {
2289 long value;
2290 sds s = o->ptr;
2291
2292 if (o->encoding != REDIS_ENCODING_RAW)
2293 return REDIS_ERR; /* Already encoded */
2294
2295 /* It's not save to encode shared objects: shared objects can be shared
2296 * everywhere in the "object space" of Redis. Encoded objects can only
2297 * appear as "values" (and not, for instance, as keys) */
2298 if (o->refcount > 1) return REDIS_ERR;
2299
2300 /* Currently we try to encode only strings */
2301 redisAssert(o->type == REDIS_STRING);
2302
2303 /* Check if we can represent this string as a long integer */
2304 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2305
2306 /* Ok, this object can be encoded */
2307 o->encoding = REDIS_ENCODING_INT;
2308 sdsfree(o->ptr);
2309 o->ptr = (void*) value;
2310 return REDIS_OK;
2311 }
2312
2313 /* Get a decoded version of an encoded object (returned as a new object).
2314 * If the object is already raw-encoded just increment the ref count. */
2315 static robj *getDecodedObject(robj *o) {
2316 robj *dec;
2317
2318 if (o->encoding == REDIS_ENCODING_RAW) {
2319 incrRefCount(o);
2320 return o;
2321 }
2322 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2323 char buf[32];
2324
2325 snprintf(buf,32,"%ld",(long)o->ptr);
2326 dec = createStringObject(buf,strlen(buf));
2327 return dec;
2328 } else {
2329 redisAssert(1 != 1);
2330 }
2331 }
2332
2333 /* Compare two string objects via strcmp() or alike.
2334 * Note that the objects may be integer-encoded. In such a case we
2335 * use snprintf() to get a string representation of the numbers on the stack
2336 * and compare the strings, it's much faster than calling getDecodedObject().
2337 *
2338 * Important note: if objects are not integer encoded, but binary-safe strings,
2339 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2340 * binary safe. */
2341 static int compareStringObjects(robj *a, robj *b) {
2342 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2343 char bufa[128], bufb[128], *astr, *bstr;
2344 int bothsds = 1;
2345
2346 if (a == b) return 0;
2347 if (a->encoding != REDIS_ENCODING_RAW) {
2348 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2349 astr = bufa;
2350 bothsds = 0;
2351 } else {
2352 astr = a->ptr;
2353 }
2354 if (b->encoding != REDIS_ENCODING_RAW) {
2355 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2356 bstr = bufb;
2357 bothsds = 0;
2358 } else {
2359 bstr = b->ptr;
2360 }
2361 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2362 }
2363
2364 static size_t stringObjectLen(robj *o) {
2365 redisAssert(o->type == REDIS_STRING);
2366 if (o->encoding == REDIS_ENCODING_RAW) {
2367 return sdslen(o->ptr);
2368 } else {
2369 char buf[32];
2370
2371 return snprintf(buf,32,"%ld",(long)o->ptr);
2372 }
2373 }
2374
2375 /*============================ DB saving/loading ============================ */
2376
/* Write a one byte type opcode. Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 1) ? 0 : -1;
}
2381
/* Write 't' truncated to a 32 bit integer, in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return (fwrite(&t32,4,1,fp) == 1) ? 0 : -1;
}
2387
2388 /* check rdbLoadLen() comments for more info */
2389 static int rdbSaveLen(FILE *fp, uint32_t len) {
2390 unsigned char buf[2];
2391
2392 if (len < (1<<6)) {
2393 /* Save a 6 bit len */
2394 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2395 if (fwrite(buf,1,1,fp) == 0) return -1;
2396 } else if (len < (1<<14)) {
2397 /* Save a 14 bit len */
2398 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2399 buf[1] = len&0xFF;
2400 if (fwrite(buf,2,1,fp) == 0) return -1;
2401 } else {
2402 /* Save a 32 bit len */
2403 buf[0] = (REDIS_RDB_32BITLEN<<6);
2404 if (fwrite(buf,1,1,fp) == 0) return -1;
2405 len = htonl(len);
2406 if (fwrite(&len,4,1,fp) == 0) return -1;
2407 }
2408 return 0;
2409 }
2410
2411 /* String objects in the form "2391" "-100" without any space and with a
2412 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2413 * encoded as integers to save space */
2414 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2415 long long value;
2416 char *endptr, buf[32];
2417
2418 /* Check if it's possible to encode this value as a number */
2419 value = strtoll(s, &endptr, 10);
2420 if (endptr[0] != '\0') return 0;
2421 snprintf(buf,32,"%lld",value);
2422
2423 /* If the number converted back into a string is not identical
2424 * then it's not possible to encode the string as integer */
2425 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2426
2427 /* Finally check if it fits in our ranges */
2428 if (value >= -(1<<7) && value <= (1<<7)-1) {
2429 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2430 enc[1] = value&0xFF;
2431 return 2;
2432 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2433 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2434 enc[1] = value&0xFF;
2435 enc[2] = (value>>8)&0xFF;
2436 return 3;
2437 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2438 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2439 enc[1] = value&0xFF;
2440 enc[2] = (value>>8)&0xFF;
2441 enc[3] = (value>>16)&0xFF;
2442 enc[4] = (value>>24)&0xFF;
2443 return 5;
2444 } else {
2445 return 0;
2446 }
2447 }
2448
2449 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2450 unsigned int comprlen, outlen;
2451 unsigned char byte;
2452 void *out;
2453
2454 /* We require at least four bytes compression for this to be worth it */
2455 outlen = sdslen(obj->ptr)-4;
2456 if (outlen <= 0) return 0;
2457 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2458 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2459 if (comprlen == 0) {
2460 zfree(out);
2461 return 0;
2462 }
2463 /* Data compressed! Let's save it on disk */
2464 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2465 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2466 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2467 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2468 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2469 zfree(out);
2470 return comprlen;
2471
2472 writeerr:
2473 zfree(out);
2474 return -1;
2475 }
2476
2477 /* Save a string objet as [len][data] on disk. If the object is a string
2478 * representation of an integer value we try to safe it in a special form */
2479 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2480 size_t len;
2481 int enclen;
2482
2483 len = sdslen(obj->ptr);
2484
2485 /* Try integer encoding */
2486 if (len <= 11) {
2487 unsigned char buf[5];
2488 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2489 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2490 return 0;
2491 }
2492 }
2493
2494 /* Try LZF compression - under 20 bytes it's unable to compress even
2495 * aaaaaaaaaaaaaaaaaa so skip it */
2496 if (len > 20) {
2497 int retval;
2498
2499 retval = rdbSaveLzfStringObject(fp,obj);
2500 if (retval == -1) return -1;
2501 if (retval > 0) return 0;
2502 /* retval == 0 means data can't be compressed, save the old way */
2503 }
2504
2505 /* Store verbatim */
2506 if (rdbSaveLen(fp,len) == -1) return -1;
2507 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2508 return 0;
2509 }
2510
2511 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2512 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2513 int retval;
2514
2515 obj = getDecodedObject(obj);
2516 retval = rdbSaveStringObjectRaw(fp,obj);
2517 decrRefCount(obj);
2518 return retval;
2519 }
2520
/* Save a double value as its "%.17g" string, prefixed by an unsigned
 * 8 bit integer giving the length of the representation. Special length
 * values mark non-finite numbers (no string follows):
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val > 0) ? 254 : 255;
    } else {
        char *repr = (char*)buf+1;

        snprintf(repr,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(repr);
        len += buf[0];
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2547
2548 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2549 static int rdbSave(char *filename) {
2550 dictIterator *di = NULL;
2551 dictEntry *de;
2552 FILE *fp;
2553 char tmpfile[256];
2554 int j;
2555 time_t now = time(NULL);
2556
2557 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2558 fp = fopen(tmpfile,"w");
2559 if (!fp) {
2560 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2561 return REDIS_ERR;
2562 }
2563 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2564 for (j = 0; j < server.dbnum; j++) {
2565 redisDb *db = server.db+j;
2566 dict *d = db->dict;
2567 if (dictSize(d) == 0) continue;
2568 di = dictGetIterator(d);
2569 if (!di) {
2570 fclose(fp);
2571 return REDIS_ERR;
2572 }
2573
2574 /* Write the SELECT DB opcode */
2575 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2576 if (rdbSaveLen(fp,j) == -1) goto werr;
2577
2578 /* Iterate this DB writing every entry */
2579 while((de = dictNext(di)) != NULL) {
2580 robj *key = dictGetEntryKey(de);
2581 robj *o = dictGetEntryVal(de);
2582 time_t expiretime = getExpire(db,key);
2583
2584 /* Save the expire time */
2585 if (expiretime != -1) {
2586 /* If this key is already expired skip it */
2587 if (expiretime < now) continue;
2588 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2589 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2590 }
2591 /* Save the key and associated value */
2592 if (rdbSaveType(fp,o->type) == -1) goto werr;
2593 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2594 if (o->type == REDIS_STRING) {
2595 /* Save a string value */
2596 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2597 } else if (o->type == REDIS_LIST) {
2598 /* Save a list value */
2599 list *list = o->ptr;
2600 listNode *ln;
2601
2602 listRewind(list);
2603 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2604 while((ln = listYield(list))) {
2605 robj *eleobj = listNodeValue(ln);
2606
2607 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2608 }
2609 } else if (o->type == REDIS_SET) {
2610 /* Save a set value */
2611 dict *set = o->ptr;
2612 dictIterator *di = dictGetIterator(set);
2613 dictEntry *de;
2614
2615 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2616 while((de = dictNext(di)) != NULL) {
2617 robj *eleobj = dictGetEntryKey(de);
2618
2619 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2620 }
2621 dictReleaseIterator(di);
2622 } else if (o->type == REDIS_ZSET) {
2623 /* Save a set value */
2624 zset *zs = o->ptr;
2625 dictIterator *di = dictGetIterator(zs->dict);
2626 dictEntry *de;
2627
2628 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2629 while((de = dictNext(di)) != NULL) {
2630 robj *eleobj = dictGetEntryKey(de);
2631 double *score = dictGetEntryVal(de);
2632
2633 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2634 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2635 }
2636 dictReleaseIterator(di);
2637 } else {
2638 redisAssert(0 != 0);
2639 }
2640 }
2641 dictReleaseIterator(di);
2642 }
2643 /* EOF opcode */
2644 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2645
2646 /* Make sure data will not remain on the OS's output buffers */
2647 fflush(fp);
2648 fsync(fileno(fp));
2649 fclose(fp);
2650
2651 /* Use RENAME to make sure the DB file is changed atomically only
2652 * if the generate DB file is ok. */
2653 if (rename(tmpfile,filename) == -1) {
2654 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2655 unlink(tmpfile);
2656 return REDIS_ERR;
2657 }
2658 redisLog(REDIS_NOTICE,"DB saved on disk");
2659 server.dirty = 0;
2660 server.lastsave = time(NULL);
2661 return REDIS_OK;
2662
2663 werr:
2664 fclose(fp);
2665 unlink(tmpfile);
2666 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2667 if (di) dictReleaseIterator(di);
2668 return REDIS_ERR;
2669 }
2670
2671 static int rdbSaveBackground(char *filename) {
2672 pid_t childpid;
2673
2674 if (server.bgsavechildpid != -1) return REDIS_ERR;
2675 if ((childpid = fork()) == 0) {
2676 /* Child */
2677 close(server.fd);
2678 if (rdbSave(filename) == REDIS_OK) {
2679 exit(0);
2680 } else {
2681 exit(1);
2682 }
2683 } else {
2684 /* Parent */
2685 if (childpid == -1) {
2686 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2687 strerror(errno));
2688 return REDIS_ERR;
2689 }
2690 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2691 server.bgsavechildpid = childpid;
2692 return REDIS_OK;
2693 }
2694 return REDIS_OK; /* unreached */
2695 }
2696
/* Remove the "temp-<pid>.rdb" file that child 'childpid' was writing.
 * Errors (e.g. the file does not exist) are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2703
/* Read a one byte type opcode. Returns it (0-255), or -1 on EOF/read error. */
static int rdbLoadType(FILE *fp) {
    unsigned char t;

    return (fread(&t,1,1,fp) == 1) ? t : -1;
}
2709
/* Read a 32 bit time value in host byte order, as written by rdbSaveTime().
 * Returns -1 on EOF/read error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t raw;

    if (fread(&raw,4,1,fp) != 1) return -1;
    return (time_t) raw;
}
2715
2716 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2717 * of this file for a description of how this are stored on disk.
2718 *
2719 * isencoded is set to 1 if the readed length is not actually a length but
2720 * an "encoding type", check the above comments for more info */
2721 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2722 unsigned char buf[2];
2723 uint32_t len;
2724
2725 if (isencoded) *isencoded = 0;
2726 if (rdbver == 0) {
2727 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2728 return ntohl(len);
2729 } else {
2730 int type;
2731
2732 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2733 type = (buf[0]&0xC0)>>6;
2734 if (type == REDIS_RDB_6BITLEN) {
2735 /* Read a 6 bit len */
2736 return buf[0]&0x3F;
2737 } else if (type == REDIS_RDB_ENCVAL) {
2738 /* Read a 6 bit len encoding type */
2739 if (isencoded) *isencoded = 1;
2740 return buf[0]&0x3F;
2741 } else if (type == REDIS_RDB_14BITLEN) {
2742 /* Read a 14 bit len */
2743 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2744 return ((buf[0]&0x3F)<<8)|buf[1];
2745 } else {
2746 /* Read a 32 bit len */
2747 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2748 return ntohl(len);
2749 }
2750 }
2751 }
2752
2753 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2754 unsigned char enc[4];
2755 long long val;
2756
2757 if (enctype == REDIS_RDB_ENC_INT8) {
2758 if (fread(enc,1,1,fp) == 0) return NULL;
2759 val = (signed char)enc[0];
2760 } else if (enctype == REDIS_RDB_ENC_INT16) {
2761 uint16_t v;
2762 if (fread(enc,2,1,fp) == 0) return NULL;
2763 v = enc[0]|(enc[1]<<8);
2764 val = (int16_t)v;
2765 } else if (enctype == REDIS_RDB_ENC_INT32) {
2766 uint32_t v;
2767 if (fread(enc,4,1,fp) == 0) return NULL;
2768 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2769 val = (int32_t)v;
2770 } else {
2771 val = 0; /* anti-warning */
2772 redisAssert(0!=0);
2773 }
2774 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2775 }
2776
2777 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2778 unsigned int len, clen;
2779 unsigned char *c = NULL;
2780 sds val = NULL;
2781
2782 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2783 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2784 if ((c = zmalloc(clen)) == NULL) goto err;
2785 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2786 if (fread(c,clen,1,fp) == 0) goto err;
2787 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2788 zfree(c);
2789 return createObject(REDIS_STRING,val);
2790 err:
2791 zfree(c);
2792 sdsfree(val);
2793 return NULL;
2794 }
2795
2796 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2797 int isencoded;
2798 uint32_t len;
2799 sds val;
2800
2801 len = rdbLoadLen(fp,rdbver,&isencoded);
2802 if (isencoded) {
2803 switch(len) {
2804 case REDIS_RDB_ENC_INT8:
2805 case REDIS_RDB_ENC_INT16:
2806 case REDIS_RDB_ENC_INT32:
2807 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2808 case REDIS_RDB_ENC_LZF:
2809 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2810 default:
2811 redisAssert(0!=0);
2812 }
2813 }
2814
2815 if (len == REDIS_RDB_LENERR) return NULL;
2816 val = sdsnewlen(NULL,len);
2817 if (len && fread(val,len,1,fp) == 0) {
2818 sdsfree(val);
2819 return NULL;
2820 }
2821 return tryObjectSharing(createObject(REDIS_STRING,val));
2822 }
2823
2824 /* For information about double serialization check rdbSaveDoubleValue() */
2825 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2826 char buf[128];
2827 unsigned char len;
2828
2829 if (fread(&len,1,1,fp) == 0) return -1;
2830 switch(len) {
2831 case 255: *val = R_NegInf; return 0;
2832 case 254: *val = R_PosInf; return 0;
2833 case 253: *val = R_Nan; return 0;
2834 default:
2835 if (fread(buf,len,1,fp) == 0) return -1;
2836 buf[len] = '\0';
2837 sscanf(buf, "%lg", val);
2838 return 0;
2839 }
2840 }
2841
/* Load the dataset stored in the RDB file 'filename' into server.db[].
 *
 * Returns REDIS_ERR, without touching the dataset, if the file cannot be
 * opened, lacks the "REDIS" signature, or uses an RDB version newer than 1.
 * Returns REDIS_OK once the REDIS_EOF opcode has been read.
 *
 * The following terminate the process with exit(1) instead of returning:
 * - any short read or decoding failure, including a truncated 9-byte header;
 * - a duplicated key;
 * - a SELECTDB opcode with an index >= server.dbnum. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;   /* keys go to DB 0 until a SELECTDB */
    redisDb *db = server.db+0;
    char buf[1024];
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* Header: 9 bytes, "REDIS" followed by the format version in ASCII. */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        /* An optional EXPIRETIME opcode precedes the type of the key it
         * applies to. */
        if (type == REDIS_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: element count, then each element */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    listAddNodeTail((list*)o->ptr,ele);
                } else {
                    dictAdd((dict*)o->ptr,ele,NULL);
                }
            }
        } else if (type == REDIS_ZSET) {
            /* Read sorted set value: member count, then (member, score)
             * pairs */
            uint32_t zsetlen;
            zset *zs;

            if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = createZsetObject();
            zs = o->ptr;
            /* Load every member with its score. The member is referenced by
             * both the dict and the skiplist, hence the extra reference. */
            while(zsetlen--) {
                robj *ele;
                double *score = zmalloc(sizeof(double));

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
                dictAdd(zs->dict,ele,score);
                zslInsert(zs->zsl,*score,ele);
                incrRefCount(ele); /* added to skiplist */
            }
        } else {
            redisAssert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            expiretime = -1;
        }
        /* Ownership moved into the dict; don't release keyobj on error. */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2964
2965 /*================================== Commands =============================== */
2966
2967 static void authCommand(redisClient *c) {
2968 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2969 c->authenticated = 1;
2970 addReply(c,shared.ok);
2971 } else {
2972 c->authenticated = 0;
2973 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2974 }
2975 }
2976
2977 static void pingCommand(redisClient *c) {
2978 addReply(c,shared.pong);
2979 }
2980
2981 static void echoCommand(redisClient *c) {
2982 addReplyBulkLen(c,c->argv[1]);
2983 addReply(c,c->argv[1]);
2984 addReply(c,shared.crlf);
2985 }
2986
2987 /*=================================== Strings =============================== */
2988
2989 static void setGenericCommand(redisClient *c, int nx) {
2990 int retval;
2991
2992 if (nx) deleteIfVolatile(c->db,c->argv[1]);
2993 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2994 if (retval == DICT_ERR) {
2995 if (!nx) {
2996 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2997 incrRefCount(c->argv[2]);
2998 } else {
2999 addReply(c,shared.czero);
3000 return;
3001 }
3002 } else {
3003 incrRefCount(c->argv[1]);
3004 incrRefCount(c->argv[2]);
3005 }
3006 server.dirty++;
3007 removeExpire(c->db,c->argv[1]);
3008 addReply(c, nx ? shared.cone : shared.ok);
3009 }
3010
3011 static void setCommand(redisClient *c) {
3012 setGenericCommand(c,0);
3013 }
3014
3015 static void setnxCommand(redisClient *c) {
3016 setGenericCommand(c,1);
3017 }
3018
3019 static void getCommand(redisClient *c) {
3020 robj *o = lookupKeyRead(c->db,c->argv[1]);
3021
3022 if (o == NULL) {
3023 addReply(c,shared.nullbulk);
3024 } else {
3025 if (o->type != REDIS_STRING) {
3026 addReply(c,shared.wrongtypeerr);
3027 } else {
3028 addReplyBulkLen(c,o);
3029 addReply(c,o);
3030 addReply(c,shared.crlf);
3031 }
3032 }
3033 }
3034
3035 static void getsetCommand(redisClient *c) {
3036 getCommand(c);
3037 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3038 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3039 } else {
3040 incrRefCount(c->argv[1]);
3041 }
3042 incrRefCount(c->argv[2]);
3043 server.dirty++;
3044 removeExpire(c->db,c->argv[1]);
3045 }
3046
3047 static void mgetCommand(redisClient *c) {
3048 int j;
3049
3050 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3051 for (j = 1; j < c->argc; j++) {
3052 robj *o = lookupKeyRead(c->db,c->argv[j]);
3053 if (o == NULL) {
3054 addReply(c,shared.nullbulk);
3055 } else {
3056 if (o->type != REDIS_STRING) {
3057 addReply(c,shared.nullbulk);
3058 } else {
3059 addReplyBulkLen(c,o);
3060 addReply(c,o);
3061 addReply(c,shared.crlf);
3062 }
3063 }
3064 }
3065 }
3066
3067 static void msetGenericCommand(redisClient *c, int nx) {
3068 int j, busykeys = 0;
3069
3070 if ((c->argc % 2) == 0) {
3071 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3072 return;
3073 }
3074 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3075 * set nothing at all if at least one already key exists. */
3076 if (nx) {
3077 for (j = 1; j < c->argc; j += 2) {
3078 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3079 busykeys++;
3080 }
3081 }
3082 }
3083 if (busykeys) {
3084 addReply(c, shared.czero);
3085 return;
3086 }
3087
3088 for (j = 1; j < c->argc; j += 2) {
3089 int retval;
3090
3091 tryObjectEncoding(c->argv[j+1]);
3092 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3093 if (retval == DICT_ERR) {
3094 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3095 incrRefCount(c->argv[j+1]);
3096 } else {
3097 incrRefCount(c->argv[j]);
3098 incrRefCount(c->argv[j+1]);
3099 }
3100 removeExpire(c->db,c->argv[j]);
3101 }
3102 server.dirty += (c->argc-1)/2;
3103 addReply(c, nx ? shared.cone : shared.ok);
3104 }
3105
3106 static void msetCommand(redisClient *c) {
3107 msetGenericCommand(c,0);
3108 }
3109
3110 static void msetnxCommand(redisClient *c) {
3111 msetGenericCommand(c,1);
3112 }
3113
3114 static void incrDecrCommand(redisClient *c, long long incr) {
3115 long long value;
3116 int retval;
3117 robj *o;
3118
3119 o = lookupKeyWrite(c->db,c->argv[1]);
3120 if (o == NULL) {
3121 value = 0;
3122 } else {
3123 if (o->type != REDIS_STRING) {
3124 value = 0;
3125 } else {
3126 char *eptr;
3127
3128 if (o->encoding == REDIS_ENCODING_RAW)
3129 value = strtoll(o->ptr, &eptr, 10);
3130 else if (o->encoding == REDIS_ENCODING_INT)
3131 value = (long)o->ptr;
3132 else
3133 redisAssert(1 != 1);
3134 }
3135 }
3136
3137 value += incr;
3138 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3139 tryObjectEncoding(o);
3140 retval = dictAdd(c->db->dict,c->argv[1],o);
3141 if (retval == DICT_ERR) {
3142 dictReplace(c->db->dict,c->argv[1],o);
3143 removeExpire(c->db,c->argv[1]);
3144 } else {
3145 incrRefCount(c->argv[1]);
3146 }
3147 server.dirty++;
3148 addReply(c,shared.colon);
3149 addReply(c,o);
3150 addReply(c,shared.crlf);
3151 }
3152
3153 static void incrCommand(redisClient *c) {
3154 incrDecrCommand(c,1);
3155 }
3156
3157 static void decrCommand(redisClient *c) {
3158 incrDecrCommand(c,-1);
3159 }
3160
3161 static void incrbyCommand(redisClient *c) {
3162 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3163 incrDecrCommand(c,incr);
3164 }
3165
3166 static void decrbyCommand(redisClient *c) {
3167 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3168 incrDecrCommand(c,-incr);
3169 }
3170
3171 /* ========================= Type agnostic commands ========================= */
3172
3173 static void delCommand(redisClient *c) {
3174 int deleted = 0, j;
3175
3176 for (j = 1; j < c->argc; j++) {
3177 if (deleteKey(c->db,c->argv[j])) {
3178 server.dirty++;
3179 deleted++;
3180 }
3181 }
3182 switch(deleted) {
3183 case 0:
3184 addReply(c,shared.czero);
3185 break;
3186 case 1:
3187 addReply(c,shared.cone);
3188 break;
3189 default:
3190 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3191 break;
3192 }
3193 }
3194
3195 static void existsCommand(redisClient *c) {
3196 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3197 }
3198
3199 static void selectCommand(redisClient *c) {
3200 int id = atoi(c->argv[1]->ptr);
3201
3202 if (selectDb(c,id) == REDIS_ERR) {
3203 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3204 } else {
3205 addReply(c,shared.ok);
3206 }
3207 }
3208
3209 static void randomkeyCommand(redisClient *c) {
3210 dictEntry *de;
3211
3212 while(1) {
3213 de = dictGetRandomKey(c->db->dict);
3214 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3215 }
3216 if (de == NULL) {
3217 addReply(c,shared.plus);
3218 addReply(c,shared.crlf);
3219 } else {
3220 addReply(c,shared.plus);
3221 addReply(c,dictGetEntryKey(de));
3222 addReply(c,shared.crlf);
3223 }
3224 }
3225
3226 static void keysCommand(redisClient *c) {
3227 dictIterator *di;
3228 dictEntry *de;
3229 sds pattern = c->argv[1]->ptr;
3230 int plen = sdslen(pattern);
3231 unsigned long numkeys = 0, keyslen = 0;
3232 robj *lenobj = createObject(REDIS_STRING,NULL);
3233
3234 di = dictGetIterator(c->db->dict);
3235 addReply(c,lenobj);
3236 decrRefCount(lenobj);
3237 while((de = dictNext(di)) != NULL) {
3238 robj *keyobj = dictGetEntryKey(de);
3239
3240 sds key = keyobj->ptr;
3241 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3242 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3243 if (expireIfNeeded(c->db,keyobj) == 0) {
3244 if (numkeys != 0)
3245 addReply(c,shared.space);
3246 addReply(c,keyobj);
3247 numkeys++;
3248 keyslen += sdslen(key);
3249 }
3250 }
3251 }
3252 dictReleaseIterator(di);
3253 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3254 addReply(c,shared.crlf);
3255 }
3256
3257 static void dbsizeCommand(redisClient *c) {
3258 addReplySds(c,
3259 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3260 }
3261
3262 static void lastsaveCommand(redisClient *c) {
3263 addReplySds(c,
3264 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3265 }
3266
3267 static void typeCommand(redisClient *c) {
3268 robj *o;
3269 char *type;
3270
3271 o = lookupKeyRead(c->db,c->argv[1]);
3272 if (o == NULL) {
3273 type = "+none";
3274 } else {
3275 switch(o->type) {
3276 case REDIS_STRING: type = "+string"; break;
3277 case REDIS_LIST: type = "+list"; break;
3278 case REDIS_SET: type = "+set"; break;
3279 case REDIS_ZSET: type = "+zset"; break;
3280 default: type = "unknown"; break;
3281 }
3282 }
3283 addReplySds(c,sdsnew(type));
3284 addReply(c,shared.crlf);
3285 }
3286
3287 static void saveCommand(redisClient *c) {
3288 if (server.bgsavechildpid != -1) {
3289 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3290 return;
3291 }
3292 if (rdbSave(server.dbfilename) == REDIS_OK) {
3293 addReply(c,shared.ok);
3294 } else {
3295 addReply(c,shared.err);
3296 }
3297 }
3298
3299 static void bgsaveCommand(redisClient *c) {
3300 if (server.bgsavechildpid != -1) {
3301 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3302 return;
3303 }
3304 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3305 char *status = "+Background saving started\r\n";
3306 addReplySds(c,sdsnew(status));
3307 } else {
3308 addReply(c,shared.err);
3309 }
3310 }
3311
3312 static void shutdownCommand(redisClient *c) {
3313 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3314 /* Kill the saving child if there is a background saving in progress.
3315 We want to avoid race conditions, for instance our saving child may
3316 overwrite the synchronous saving did by SHUTDOWN. */
3317 if (server.bgsavechildpid != -1) {
3318 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3319 kill(server.bgsavechildpid,SIGKILL);
3320 rdbRemoveTempFile(server.bgsavechildpid);
3321 }
3322 /* SYNC SAVE */
3323 if (rdbSave(server.dbfilename) == REDIS_OK) {
3324 if (server.daemonize)
3325 unlink(server.pidfile);
3326 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3327 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3328 exit(1);
3329 } else {
3330 /* Ooops.. error saving! The best we can do is to continue operating.
3331 * Note that if there was a background saving process, in the next
3332 * cron() Redis will be notified that the background saving aborted,
3333 * handling special stuff like slaves pending for synchronization... */
3334 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3335 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3336 }
3337 }
3338
3339 static void renameGenericCommand(redisClient *c, int nx) {
3340 robj *o;
3341
3342 /* To use the same key as src and dst is probably an error */
3343 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3344 addReply(c,shared.sameobjecterr);
3345 return;
3346 }
3347
3348 o = lookupKeyWrite(c->db,c->argv[1]);
3349 if (o == NULL) {
3350 addReply(c,shared.nokeyerr);
3351 return;
3352 }
3353 incrRefCount(o);
3354 deleteIfVolatile(c->db,c->argv[2]);
3355 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3356 if (nx) {
3357 decrRefCount(o);
3358 addReply(c,shared.czero);
3359 return;
3360 }
3361 dictReplace(c->db->dict,c->argv[2],o);
3362 } else {
3363 incrRefCount(c->argv[2]);
3364 }
3365 deleteKey(c->db,c->argv[1]);
3366 server.dirty++;
3367 addReply(c,nx ? shared.cone : shared.ok);
3368 }
3369
3370 static void renameCommand(redisClient *c) {
3371 renameGenericCommand(c,0);
3372 }
3373
3374 static void renamenxCommand(redisClient *c) {
3375 renameGenericCommand(c,1);
3376 }
3377
3378 static void moveCommand(redisClient *c) {
3379 robj *o;
3380 redisDb *src, *dst;
3381 int srcid;
3382
3383 /* Obtain source and target DB pointers */
3384 src = c->db;
3385 srcid = c->db->id;
3386 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3387 addReply(c,shared.outofrangeerr);
3388 return;
3389 }
3390 dst = c->db;
3391 selectDb(c,srcid); /* Back to the source DB */
3392
3393 /* If the user is moving using as target the same
3394 * DB as the source DB it is probably an error. */
3395 if (src == dst) {
3396 addReply(c,shared.sameobjecterr);
3397 return;
3398 }
3399
3400 /* Check if the element exists and get a reference */
3401 o = lookupKeyWrite(c->db,c->argv[1]);
3402 if (!o) {
3403 addReply(c,shared.czero);
3404 return;
3405 }
3406
3407 /* Try to add the element to the target DB */
3408 deleteIfVolatile(dst,c->argv[1]);
3409 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3410 addReply(c,shared.czero);
3411 return;
3412 }
3413 incrRefCount(c->argv[1]);
3414 incrRefCount(o);
3415
3416 /* OK! key moved, free the entry in the source DB */
3417 deleteKey(src,c->argv[1]);
3418 server.dirty++;
3419 addReply(c,shared.cone);
3420 }
3421
3422 /* =================================== Lists ================================ */
3423 static void pushGenericCommand(redisClient *c, int where) {
3424 robj *lobj;
3425 list *list;
3426
3427 lobj = lookupKeyWrite(c->db,c->argv[1]);
3428 if (lobj == NULL) {
3429 lobj = createListObject();
3430 list = lobj->ptr;
3431 if (where == REDIS_HEAD) {
3432 listAddNodeHead(list,c->argv[2]);
3433 } else {
3434 listAddNodeTail(list,c->argv[2]);
3435 }
3436 dictAdd(c->db->dict,c->argv[1],lobj);
3437 incrRefCount(c->argv[1]);
3438 incrRefCount(c->argv[2]);
3439 } else {
3440 if (lobj->type != REDIS_LIST) {
3441 addReply(c,shared.wrongtypeerr);
3442 return;
3443 }
3444 list = lobj->ptr;
3445 if (where == REDIS_HEAD) {
3446 listAddNodeHead(list,c->argv[2]);
3447 } else {
3448 listAddNodeTail(list,c->argv[2]);
3449 }
3450 incrRefCount(c->argv[2]);
3451 }
3452 server.dirty++;
3453 addReply(c,shared.ok);
3454 }
3455
3456 static void lpushCommand(redisClient *c) {
3457 pushGenericCommand(c,REDIS_HEAD);
3458 }
3459
3460 static void rpushCommand(redisClient *c) {
3461 pushGenericCommand(c,REDIS_TAIL);
3462 }
3463
3464 static void llenCommand(redisClient *c) {
3465 robj *o;
3466 list *l;
3467
3468 o = lookupKeyRead(c->db,c->argv[1]);
3469 if (o == NULL) {
3470 addReply(c,shared.czero);
3471 return;
3472 } else {
3473 if (o->type != REDIS_LIST) {
3474 addReply(c,shared.wrongtypeerr);
3475 } else {
3476 l = o->ptr;
3477 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3478 }
3479 }
3480 }
3481
3482 static void lindexCommand(redisClient *c) {
3483 robj *o;
3484 int index = atoi(c->argv[2]->ptr);
3485
3486 o = lookupKeyRead(c->db,c->argv[1]);
3487 if (o == NULL) {
3488 addReply(c,shared.nullbulk);
3489 } else {
3490 if (o->type != REDIS_LIST) {
3491 addReply(c,shared.wrongtypeerr);
3492 } else {
3493 list *list = o->ptr;
3494 listNode *ln;
3495
3496 ln = listIndex(list, index);
3497 if (ln == NULL) {
3498 addReply(c,shared.nullbulk);
3499 } else {
3500 robj *ele = listNodeValue(ln);
3501 addReplyBulkLen(c,ele);
3502 addReply(c,ele);
3503 addReply(c,shared.crlf);
3504 }
3505 }
3506 }
3507 }
3508
3509 static void lsetCommand(redisClient *c) {
3510 robj *o;
3511 int index = atoi(c->argv[2]->ptr);
3512
3513 o = lookupKeyWrite(c->db,c->argv[1]);
3514 if (o == NULL) {
3515 addReply(c,shared.nokeyerr);
3516 } else {
3517 if (o->type != REDIS_LIST) {
3518 addReply(c,shared.wrongtypeerr);
3519 } else {
3520 list *list = o->ptr;
3521 listNode *ln;
3522
3523 ln = listIndex(list, index);
3524 if (ln == NULL) {
3525 addReply(c,shared.outofrangeerr);
3526 } else {
3527 robj *ele = listNodeValue(ln);
3528
3529 decrRefCount(ele);
3530 listNodeValue(ln) = c->argv[3];
3531 incrRefCount(c->argv[3]);
3532 addReply(c,shared.ok);
3533 server.dirty++;
3534 }
3535 }
3536 }
3537 }
3538
3539 static void popGenericCommand(redisClient *c, int where) {
3540 robj *o;
3541
3542 o = lookupKeyWrite(c->db,c->argv[1]);
3543 if (o == NULL) {
3544 addReply(c,shared.nullbulk);
3545 } else {
3546 if (o->type != REDIS_LIST) {
3547 addReply(c,shared.wrongtypeerr);
3548 } else {
3549 list *list = o->ptr;
3550 listNode *ln;
3551
3552 if (where == REDIS_HEAD)
3553 ln = listFirst(list);
3554 else
3555 ln = listLast(list);
3556
3557 if (ln == NULL) {
3558 addReply(c,shared.nullbulk);
3559 } else {
3560 robj *ele = listNodeValue(ln);
3561 addReplyBulkLen(c,ele);
3562 addReply(c,ele);
3563 addReply(c,shared.crlf);
3564 listDelNode(list,ln);
3565 server.dirty++;
3566 }
3567 }
3568 }
3569 }
3570
3571 static void lpopCommand(redisClient *c) {
3572 popGenericCommand(c,REDIS_HEAD);
3573 }
3574
3575 static void rpopCommand(redisClient *c) {
3576 popGenericCommand(c,REDIS_TAIL);
3577 }
3578
3579 static void lrangeCommand(redisClient *c) {
3580 robj *o;
3581 int start = atoi(c->argv[2]->ptr);
3582 int end = atoi(c->argv[3]->ptr);
3583
3584 o = lookupKeyRead(c->db,c->argv[1]);
3585 if (o == NULL) {
3586 addReply(c,shared.nullmultibulk);
3587 } else {
3588 if (o->type != REDIS_LIST) {
3589 addReply(c,shared.wrongtypeerr);
3590 } else {
3591 list *list = o->ptr;
3592 listNode *ln;
3593 int llen = listLength(list);
3594 int rangelen, j;
3595 robj *ele;
3596
3597 /* convert negative indexes */
3598 if (start < 0) start = llen+start;
3599 if (end < 0) end = llen+end;
3600 if (start < 0) start = 0;
3601 if (end < 0) end = 0;
3602
3603 /* indexes sanity checks */
3604 if (start > end || start >= llen) {
3605 /* Out of range start or start > end result in empty list */
3606 addReply(c,shared.emptymultibulk);
3607 return;
3608 }
3609 if (end >= llen) end = llen-1;
3610 rangelen = (end-start)+1;
3611
3612 /* Return the result in form of a multi-bulk reply */
3613 ln = listIndex(list, start);
3614 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3615 for (j = 0; j < rangelen; j++) {
3616 ele = listNodeValue(ln);
3617 addReplyBulkLen(c,ele);
3618 addReply(c,ele);
3619 addReply(c,shared.crlf);
3620 ln = ln->next;
3621 }
3622 }
3623 }
3624 }
3625
3626 static void ltrimCommand(redisClient *c) {
3627 robj *o;
3628 int start = atoi(c->argv[2]->ptr);
3629 int end = atoi(c->argv[3]->ptr);
3630
3631 o = lookupKeyWrite(c->db,c->argv[1]);
3632 if (o == NULL) {
3633 addReply(c,shared.nokeyerr);
3634 } else {
3635 if (o->type != REDIS_LIST) {
3636 addReply(c,shared.wrongtypeerr);
3637 } else {
3638 list *list = o->ptr;
3639 listNode *ln;
3640 int llen = listLength(list);
3641 int j, ltrim, rtrim;
3642
3643 /* convert negative indexes */
3644 if (start < 0) start = llen+start;
3645 if (end < 0) end = llen+end;
3646 if (start < 0) start = 0;
3647 if (end < 0) end = 0;
3648
3649 /* indexes sanity checks */
3650 if (start > end || start >= llen) {
3651 /* Out of range start or start > end result in empty list */
3652 ltrim = llen;
3653 rtrim = 0;
3654 } else {
3655 if (end >= llen) end = llen-1;
3656 ltrim = start;
3657 rtrim = llen-end-1;
3658 }
3659
3660 /* Remove list elements to perform the trim */
3661 for (j = 0; j < ltrim; j++) {
3662 ln = listFirst(list);
3663 listDelNode(list,ln);
3664 }
3665 for (j = 0; j < rtrim; j++) {
3666 ln = listLast(list);
3667 listDelNode(list,ln);
3668 }
3669 server.dirty++;
3670 addReply(c,shared.ok);
3671 }
3672 }
3673 }
3674
3675 static void lremCommand(redisClient *c) {
3676 robj *o;
3677
3678 o = lookupKeyWrite(c->db,c->argv[1]);
3679 if (o == NULL) {
3680 addReply(c,shared.czero);
3681 } else {
3682 if (o->type != REDIS_LIST) {
3683 addReply(c,shared.wrongtypeerr);
3684 } else {
3685 list *list = o->ptr;
3686 listNode *ln, *next;
3687 int toremove = atoi(c->argv[2]->ptr);
3688 int removed = 0;
3689 int fromtail = 0;
3690
3691 if (toremove < 0) {
3692 toremove = -toremove;
3693 fromtail = 1;
3694 }
3695 ln = fromtail ? list->tail : list->head;
3696 while (ln) {
3697 robj *ele = listNodeValue(ln);
3698
3699 next = fromtail ? ln->prev : ln->next;
3700 if (compareStringObjects(ele,c->argv[3]) == 0) {
3701 listDelNode(list,ln);
3702 server.dirty++;
3703 removed++;
3704 if (toremove && removed == toremove) break;
3705 }
3706 ln = next;
3707 }
3708 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3709 }
3710 }
3711 }
3712
3713 /* This is the semantic of this command:
3714 * RPOPLPUSH srclist dstlist:
3715 * IF LLEN(srclist) > 0
3716 * element = RPOP srclist
3717 * LPUSH dstlist element
3718 * RETURN element
3719 * ELSE
3720 * RETURN nil
3721 * END
3722 * END
3723 *
 * The idea is to be able to get an element from a list in a reliable way,
 * since the element is not just returned but also pushed onto another list.
 * This command was originally proposed by Ezra Zygmuntowicz.
3727 */
3728 static void rpoplpushcommand(redisClient *c) {
3729 robj *sobj;
3730
3731 sobj = lookupKeyWrite(c->db,c->argv[1]);
3732 if (sobj == NULL) {
3733 addReply(c,shared.nullbulk);
3734 } else {
3735 if (sobj->type != REDIS_LIST) {
3736 addReply(c,shared.wrongtypeerr);
3737 } else {
3738 list *srclist = sobj->ptr;
3739 listNode *ln = listLast(srclist);
3740
3741 if (ln == NULL) {
3742 addReply(c,shared.nullbulk);
3743 } else {
3744 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3745 robj *ele = listNodeValue(ln);
3746 list *dstlist;
3747
3748 if (dobj == NULL) {
3749
3750 /* Create the list if the key does not exist */
3751 dobj = createListObject();
3752 dictAdd(c->db->dict,c->argv[2],dobj);
3753 incrRefCount(c->argv[2]);
3754 } else if (dobj->type != REDIS_LIST) {
3755 addReply(c,shared.wrongtypeerr);
3756 return;
3757 }
3758 /* Add the element to the target list */
3759 dstlist = dobj->ptr;
3760 listAddNodeHead(dstlist,ele);
3761 incrRefCount(ele);
3762
3763 /* Send the element to the client as reply as well */
3764 addReplyBulkLen(c,ele);
3765 addReply(c,ele);
3766 addReply(c,shared.crlf);
3767
3768 /* Finally remove the element from the source list */
3769 listDelNode(srclist,ln);
3770 server.dirty++;
3771 }
3772 }
3773 }
3774 }
3775
3776
3777 /* ==================================== Sets ================================ */
3778
3779 static void saddCommand(redisClient *c) {
3780 robj *set;
3781
3782 set = lookupKeyWrite(c->db,c->argv[1]);
3783 if (set == NULL) {
3784 set = createSetObject();
3785 dictAdd(c->db->dict,c->argv[1],set);
3786 incrRefCount(c->argv[1]);
3787 } else {
3788 if (set->type != REDIS_SET) {
3789 addReply(c,shared.wrongtypeerr);
3790 return;
3791 }
3792 }
3793 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3794 incrRefCount(c->argv[2]);
3795 server.dirty++;
3796 addReply(c,shared.cone);
3797 } else {
3798 addReply(c,shared.czero);
3799 }
3800 }
3801
3802 static void sremCommand(redisClient *c) {
3803 robj *set;
3804
3805 set = lookupKeyWrite(c->db,c->argv[1]);
3806 if (set == NULL) {
3807 addReply(c,shared.czero);
3808 } else {
3809 if (set->type != REDIS_SET) {
3810 addReply(c,shared.wrongtypeerr);
3811 return;
3812 }
3813 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3814 server.dirty++;
3815 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3816 addReply(c,shared.cone);
3817 } else {
3818 addReply(c,shared.czero);
3819 }
3820 }
3821 }
3822
3823 static void smoveCommand(redisClient *c) {
3824 robj *srcset, *dstset;
3825
3826 srcset = lookupKeyWrite(c->db,c->argv[1]);
3827 dstset = lookupKeyWrite(c->db,c->argv[2]);
3828
3829 /* If the source key does not exist return 0, if it's of the wrong type
3830 * raise an error */
3831 if (srcset == NULL || srcset->type != REDIS_SET) {
3832 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3833 return;
3834 }
3835 /* Error if the destination key is not a set as well */
3836 if (dstset && dstset->type != REDIS_SET) {
3837 addReply(c,shared.wrongtypeerr);
3838 return;
3839 }
3840 /* Remove the element from the source set */
3841 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3842 /* Key not found in the src set! return zero */
3843 addReply(c,shared.czero);
3844 return;
3845 }
3846 server.dirty++;
3847 /* Add the element to the destination set */
3848 if (!dstset) {
3849 dstset = createSetObject();
3850 dictAdd(c->db->dict,c->argv[2],dstset);
3851 incrRefCount(c->argv[2]);
3852 }
3853 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3854 incrRefCount(c->argv[3]);
3855 addReply(c,shared.cone);
3856 }
3857
3858 static void sismemberCommand(redisClient *c) {
3859 robj *set;
3860
3861 set = lookupKeyRead(c->db,c->argv[1]);
3862 if (set == NULL) {
3863 addReply(c,shared.czero);
3864 } else {
3865 if (set->type != REDIS_SET) {
3866 addReply(c,shared.wrongtypeerr);
3867 return;
3868 }
3869 if (dictFind(set->ptr,c->argv[2]))
3870 addReply(c,shared.cone);
3871 else
3872 addReply(c,shared.czero);
3873 }
3874 }
3875
3876 static void scardCommand(redisClient *c) {
3877 robj *o;
3878 dict *s;
3879
3880 o = lookupKeyRead(c->db,c->argv[1]);
3881 if (o == NULL) {
3882 addReply(c,shared.czero);
3883 return;
3884 } else {
3885 if (o->type != REDIS_SET) {
3886 addReply(c,shared.wrongtypeerr);
3887 } else {
3888 s = o->ptr;
3889 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3890 dictSize(s)));
3891 }
3892 }
3893 }
3894
3895 static void spopCommand(redisClient *c) {
3896 robj *set;
3897 dictEntry *de;
3898
3899 set = lookupKeyWrite(c->db,c->argv[1]);
3900 if (set == NULL) {
3901 addReply(c,shared.nullbulk);
3902 } else {
3903 if (set->type != REDIS_SET) {
3904 addReply(c,shared.wrongtypeerr);
3905 return;
3906 }
3907 de = dictGetRandomKey(set->ptr);
3908 if (de == NULL) {
3909 addReply(c,shared.nullbulk);
3910 } else {
3911 robj *ele = dictGetEntryKey(de);
3912
3913 addReplyBulkLen(c,ele);
3914 addReply(c,ele);
3915 addReply(c,shared.crlf);
3916 dictDelete(set->ptr,ele);
3917 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3918 server.dirty++;
3919 }
3920 }
3921 }
3922
3923 static void srandmemberCommand(redisClient *c) {
3924 robj *set;
3925 dictEntry *de;
3926
3927 set = lookupKeyRead(c->db,c->argv[1]);
3928 if (set == NULL) {
3929 addReply(c,shared.nullbulk);
3930 } else {
3931 if (set->type != REDIS_SET) {
3932 addReply(c,shared.wrongtypeerr);
3933 return;
3934 }
3935 de = dictGetRandomKey(set->ptr);
3936 if (de == NULL) {
3937 addReply(c,shared.nullbulk);
3938 } else {
3939 robj *ele = dictGetEntryKey(de);
3940
3941 addReplyBulkLen(c,ele);
3942 addReply(c,ele);
3943 addReply(c,shared.crlf);
3944 }
3945 }
3946 }
3947
3948 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3949 dict **d1 = (void*) s1, **d2 = (void*) s2;
3950
3951 return dictSize(*d1)-dictSize(*d2);
3952 }
3953
/* Shared implementation of SINTER (dstkey == NULL) and SINTERSTORE.
 *
 * setskeys/setsnum: keys of the sets to intersect.
 * dstkey: NULL for SINTER, which streams the members back as a multi-bulk
 *         reply. Otherwise (SINTERSTORE) the key that receives the result;
 *         the reply is then the result's cardinality.
 *
 * Keys are checked in order, and the first missing or mistyped one
 * decides the reply. A missing key makes the intersection empty:
 * SINTERSTORE deletes dstkey and replies 0, SINTER replies with a null
 * multi-bulk. A non-set value yields the wrong-type error. */
static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
    dict **dv = zmalloc(sizeof(dict*)*setsnum);
    dictIterator *di;
    dictEntry *de;
    robj *lenobj = NULL, *dstset = NULL;
    unsigned long j, cardinality = 0;

    for (j = 0; j < setsnum; j++) {
        robj *setobj;

        /* The STORE variant looks keys up for writing. */
        setobj = dstkey ?
                    lookupKeyWrite(c->db,setskeys[j]) :
                    lookupKeyRead(c->db,setskeys[j]);
        if (!setobj) {
            zfree(dv);
            if (dstkey) {
                deleteKey(c->db,dstkey);
                addReply(c,shared.czero);
            } else {
                addReply(c,shared.nullmultibulk);
            }
            return;
        }
        if (setobj->type != REDIS_SET) {
            zfree(dv);
            addReply(c,shared.wrongtypeerr);
            return;
        }
        dv[j] = setobj->ptr;
    }
    /* Sort sets from the smallest to largest: this improves our
     * algorithm's performance, since only the smallest set is iterated */
    qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);

    /* The first thing we should output is the total number of elements...
     * since this is a multi-bulk write, but at this stage we don't know
     * the intersection set size, so we use a trick, append an empty object
     * to the output list and save the pointer to later modify it with the
     * right length. lenobj is still written after decrRefCount(); this
     * presumably relies on addReply() holding its own reference. */
    if (!dstkey) {
        lenobj = createObject(REDIS_STRING,NULL);
        addReply(c,lenobj);
        decrRefCount(lenobj);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with an empty set inside */
        dstset = createSetObject();
    }

    /* Iterate all the elements of the first (smallest) set, and test
     * the element against all the other sets, if at least one set does
     * not include the element it is discarded */
    di = dictGetIterator(dv[0]);

    while((de = dictNext(di)) != NULL) {
        robj *ele;

        for (j = 1; j < setsnum; j++)
            if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
        if (j != setsnum)
            continue; /* at least one set does not contain the member */
        ele = dictGetEntryKey(de);
        if (!dstkey) {
            addReplyBulkLen(c,ele);
            addReply(c,ele);
            addReply(c,shared.crlf);
            cardinality++;
        } else {
            /* The result set shares the member object: take a reference */
            dictAdd(dstset->ptr,ele,NULL);
            incrRefCount(ele);
        }
    }
    dictReleaseIterator(di);

    if (dstkey) {
        /* Store the resulting set into the target (replacing any old
         * value; done after iterating since dstkey may be a source) */
        deleteKey(c->db,dstkey);
        dictAdd(c->db->dict,dstkey,dstset);
        incrRefCount(dstkey);
    }

    if (!dstkey) {
        /* Fill in the multi-bulk header queued earlier */
        lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
    } else {
        addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
            dictSize((dict*)dstset->ptr)));
        server.dirty++;
    }
    zfree(dv);
}
4044
4045 static void sinterCommand(redisClient *c) {
4046 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4047 }
4048
4049 static void sinterstoreCommand(redisClient *c) {
4050 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4051 }
4052
/* Operation selector for sunionDiffGenericCommand():
 * REDIS_OP_UNION computes the union of all the input sets,
 * REDIS_OP_DIFF computes the first set minus all the following ones. */
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
4055
4056 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4057 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4058 dictIterator *di;
4059 dictEntry *de;
4060 robj *dstset = NULL;
4061 int j, cardinality = 0;
4062
4063 for (j = 0; j < setsnum; j++) {
4064 robj *setobj;
4065
4066 setobj = dstkey ?
4067 lookupKeyWrite(c->db,setskeys[j]) :
4068 lookupKeyRead(c->db,setskeys[j]);
4069 if (!setobj) {
4070 dv[j] = NULL;
4071 continue;
4072 }
4073 if (setobj->type != REDIS_SET) {
4074 zfree(dv);
4075 addReply(c,shared.wrongtypeerr);
4076 return;
4077 }
4078 dv[j] = setobj->ptr;
4079 }
4080
4081 /* We need a temp set object to store our union. If the dstkey
4082 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4083 * this set object will be the resulting object to set into the target key*/
4084 dstset = createSetObject();
4085
4086 /* Iterate all the elements of all the sets, add every element a single
4087 * time to the result set */
4088 for (j = 0; j < setsnum; j++) {
4089 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4090 if (!dv[j]) continue; /* non existing keys are like empty sets */
4091
4092 di = dictGetIterator(dv[j]);
4093
4094 while((de = dictNext(di)) != NULL) {
4095 robj *ele;
4096
4097 /* dictAdd will not add the same element multiple times */
4098 ele = dictGetEntryKey(de);
4099 if (op == REDIS_OP_UNION || j == 0) {
4100 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4101 incrRefCount(ele);
4102 cardinality++;
4103 }
4104 } else if (op == REDIS_OP_DIFF) {
4105 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4106 cardinality--;
4107 }
4108 }
4109 }
4110 dictReleaseIterator(di);
4111
4112 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4113 }
4114
4115 /* Output the content of the resulting set, if not in STORE mode */
4116 if (!dstkey) {
4117 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4118 di = dictGetIterator(dstset->ptr);
4119 while((de = dictNext(di)) != NULL) {
4120 robj *ele;
4121
4122 ele = dictGetEntryKey(de);
4123 addReplyBulkLen(c,ele);
4124 addReply(c,ele);
4125 addReply(c,shared.crlf);
4126 }
4127 dictReleaseIterator(di);
4128 } else {
4129 /* If we have a target key where to store the resulting set
4130 * create this key with the result set inside */
4131 deleteKey(c->db,dstkey);
4132 dictAdd(c->db->dict,dstkey,dstset);
4133 incrRefCount(dstkey);
4134 }
4135
4136 /* Cleanup */
4137 if (!dstkey) {
4138 decrRefCount(dstset);
4139 } else {
4140 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4141 dictSize((dict*)dstset->ptr)));
4142 server.dirty++;
4143 }
4144 zfree(dv);
4145 }
4146
4147 static void sunionCommand(redisClient *c) {
4148 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4149 }
4150
4151 static void sunionstoreCommand(redisClient *c) {
4152 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4153 }
4154
4155 static void sdiffCommand(redisClient *c) {
4156 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4157 }
4158
4159 static void sdiffstoreCommand(redisClient *c) {
4160 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4161 }
4162
4163 /* ==================================== ZSets =============================== */
4164
4165 /* ZSETs are ordered sets using two data structures to hold the same elements
4166 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4167 * data structure.
4168 *
 * The elements are added to a hash table mapping Redis objects to scores.
4170 * At the same time the elements are added to a skip list mapping scores
4171 * to Redis objects (so objects are sorted by scores in this "view"). */
4172
4173 /* This skiplist implementation is almost a C translation of the original
4174 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4175 * Alternative to Balanced Trees", modified in three ways:
4176 * a) this implementation allows for repeated values.
4177 * b) the comparison is not just by key (our 'score') but by satellite data.
4178 * c) there is a back pointer, so it's a doubly linked list with the back
4179 * pointers being only at "level 1". This allows to traverse the list
4180 * from tail to head, useful for ZREVRANGE. */
4181
4182 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4183 zskiplistNode *zn = zmalloc(sizeof(*zn));
4184
4185 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4186 zn->score = score;
4187 zn->obj = obj;
4188 return zn;
4189 }
4190
4191 static zskiplist *zslCreate(void) {
4192 int j;
4193 zskiplist *zsl;
4194
4195 zsl = zmalloc(sizeof(*zsl));
4196 zsl->level = 1;
4197 zsl->length = 0;
4198 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4199 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4200 zsl->header->forward[j] = NULL;
4201 zsl->header->backward = NULL;
4202 zsl->tail = NULL;
4203 return zsl;
4204 }
4205
4206 static void zslFreeNode(zskiplistNode *node) {
4207 decrRefCount(node->obj);
4208 zfree(node->forward);
4209 zfree(node);
4210 }
4211
4212 static void zslFree(zskiplist *zsl) {
4213 zskiplistNode *node = zsl->header->forward[0], *next;
4214
4215 zfree(zsl->header->forward);
4216 zfree(zsl->header);
4217 while(node) {
4218 next = node->forward[0];
4219 zslFreeNode(node);
4220 node = next;
4221 }
4222 zfree(zsl);
4223 }
4224
4225 static int zslRandomLevel(void) {
4226 int level = 1;
4227 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4228 level += 1;
4229 return level;
4230 }
4231
4232 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4233 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4234 int i, level;
4235
4236 x = zsl->header;
4237 for (i = zsl->level-1; i >= 0; i--) {
4238 while (x->forward[i] &&
4239 (x->forward[i]->score < score ||
4240 (x->forward[i]->score == score &&
4241 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4242 x = x->forward[i];
4243 update[i] = x;
4244 }
4245 /* we assume the key is not already inside, since we allow duplicated
4246 * scores, and the re-insertion of score and redis object should never
4247 * happpen since the caller of zslInsert() should test in the hash table
4248 * if the element is already inside or not. */
4249 level = zslRandomLevel();
4250 if (level > zsl->level) {
4251 for (i = zsl->level; i < level; i++)
4252 update[i] = zsl->header;
4253 zsl->level = level;
4254 }
4255 x = zslCreateNode(level,score,obj);
4256 for (i = 0; i < level; i++) {
4257 x->forward[i] = update[i]->forward[i];
4258 update[i]->forward[i] = x;
4259 }
4260 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4261 if (x->forward[0])
4262 x->forward[0]->backward = x;
4263 else
4264 zsl->tail = x;
4265 zsl->length++;
4266 }
4267
4268 /* Delete an element with matching score/object from the skiplist. */
4269 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4270 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4271 int i;
4272
4273 x = zsl->header;
4274 for (i = zsl->level-1; i >= 0; i--) {
4275 while (x->forward[i] &&
4276 (x->forward[i]->score < score ||
4277 (x->forward[i]->score == score &&
4278 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4279 x = x->forward[i];
4280 update[i] = x;
4281 }
4282 /* We may have multiple elements with the same score, what we need
4283 * is to find the element with both the right score and object. */
4284 x = x->forward[0];
4285 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4286 for (i = 0; i < zsl->level; i++) {
4287 if (update[i]->forward[i] != x) break;
4288 update[i]->forward[i] = x->forward[i];
4289 }
4290 if (x->forward[0]) {
4291 x->forward[0]->backward = (x->backward == zsl->header) ?
4292 NULL : x->backward;
4293 } else {
4294 zsl->tail = x->backward;
4295 }
4296 zslFreeNode(x);
4297 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4298 zsl->level--;
4299 zsl->length--;
4300 return 1;
4301 } else {
4302 return 0; /* not found */
4303 }
4304 return 0; /* not found */
4305 }
4306
4307 /* Delete all the elements with score between min and max from the skiplist.
4308 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4309 * Note that this function takes the reference to the hash table view of the
4310 * sorted set, in order to remove the elements from the hash table too. */
4311 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4312 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4313 unsigned long removed = 0;
4314 int i;
4315
4316 x = zsl->header;
4317 for (i = zsl->level-1; i >= 0; i--) {
4318 while (x->forward[i] && x->forward[i]->score < min)
4319 x = x->forward[i];
4320 update[i] = x;
4321 }
4322 /* We may have multiple elements with the same score, what we need
4323 * is to find the element with both the right score and object. */
4324 x = x->forward[0];
4325 while (x && x->score <= max) {
4326 zskiplistNode *next;
4327
4328 for (i = 0; i < zsl->level; i++) {
4329 if (update[i]->forward[i] != x) break;
4330 update[i]->forward[i] = x->forward[i];
4331 }
4332 if (x->forward[0]) {
4333 x->forward[0]->backward = (x->backward == zsl->header) ?
4334 NULL : x->backward;
4335 } else {
4336 zsl->tail = x->backward;
4337 }
4338 next = x->forward[0];
4339 dictDelete(dict,x->obj);
4340 zslFreeNode(x);
4341 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4342 zsl->level--;
4343 zsl->length--;
4344 removed++;
4345 x = next;
4346 }
4347 return removed; /* not found */
4348 }
4349
4350 /* Find the first node having a score equal or greater than the specified one.
4351 * Returns NULL if there is no match. */
4352 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4353 zskiplistNode *x;
4354 int i;
4355
4356 x = zsl->header;
4357 for (i = zsl->level-1; i >= 0; i--) {
4358 while (x->forward[i] && x->forward[i]->score < score)
4359 x = x->forward[i];
4360 }
4361 /* We may have multiple elements with the same score, what we need
4362 * is to find the element with both the right score and object. */
4363 return x->forward[0];
4364 }
4365
4366 /* The actual Z-commands implementations */
4367
4368 /* This generic command implements both ZADD and ZINCRBY.
4369 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4370 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4371 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4372 robj *zsetobj;
4373 zset *zs;
4374 double *score;
4375
4376 zsetobj = lookupKeyWrite(c->db,key);
4377 if (zsetobj == NULL) {
4378 zsetobj = createZsetObject();
4379 dictAdd(c->db->dict,key,zsetobj);
4380 incrRefCount(key);
4381 } else {
4382 if (zsetobj->type != REDIS_ZSET) {
4383 addReply(c,shared.wrongtypeerr);
4384 return;
4385 }
4386 }
4387 zs = zsetobj->ptr;
4388
4389 /* Ok now since we implement both ZADD and ZINCRBY here the code
4390 * needs to handle the two different conditions. It's all about setting
4391 * '*score', that is, the new score to set, to the right value. */
4392 score = zmalloc(sizeof(double));
4393 if (doincrement) {
4394 dictEntry *de;
4395
4396 /* Read the old score. If the element was not present starts from 0 */
4397 de = dictFind(zs->dict,ele);
4398 if (de) {
4399 double *oldscore = dictGetEntryVal(de);
4400 *score = *oldscore + scoreval;
4401 } else {
4402 *score = scoreval;
4403 }
4404 } else {
4405 *score = scoreval;
4406 }
4407
4408 /* What follows is a simple remove and re-insert operation that is common
4409 * to both ZADD and ZINCRBY... */
4410 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4411 /* case 1: New element */
4412 incrRefCount(ele); /* added to hash */
4413 zslInsert(zs->zsl,*score,ele);
4414 incrRefCount(ele); /* added to skiplist */
4415 server.dirty++;
4416 if (doincrement)
4417 addReplyDouble(c,*score);
4418 else
4419 addReply(c,shared.cone);
4420 } else {
4421 dictEntry *de;
4422 double *oldscore;
4423
4424 /* case 2: Score update operation */
4425 de = dictFind(zs->dict,ele);
4426 redisAssert(de != NULL);
4427 oldscore = dictGetEntryVal(de);
4428 if (*score != *oldscore) {
4429 int deleted;
4430
4431 /* Remove and insert the element in the skip list with new score */
4432 deleted = zslDelete(zs->zsl,*oldscore,ele);
4433 redisAssert(deleted != 0);
4434 zslInsert(zs->zsl,*score,ele);
4435 incrRefCount(ele);
4436 /* Update the score in the hash table */
4437 dictReplace(zs->dict,ele,score);
4438 server.dirty++;
4439 } else {
4440 zfree(score);
4441 }
4442 if (doincrement)
4443 addReplyDouble(c,*score);
4444 else
4445 addReply(c,shared.czero);
4446 }
4447 }
4448
4449 static void zaddCommand(redisClient *c) {
4450 double scoreval;
4451
4452 scoreval = strtod(c->argv[2]->ptr,NULL);
4453 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4454 }
4455
4456 static void zincrbyCommand(redisClient *c) {
4457 double scoreval;
4458
4459 scoreval = strtod(c->argv[2]->ptr,NULL);
4460 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4461 }
4462
4463 static void zremCommand(redisClient *c) {
4464 robj *zsetobj;
4465 zset *zs;
4466
4467 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4468 if (zsetobj == NULL) {
4469 addReply(c,shared.czero);
4470 } else {
4471 dictEntry *de;
4472 double *oldscore;
4473 int deleted;
4474
4475 if (zsetobj->type != REDIS_ZSET) {
4476 addReply(c,shared.wrongtypeerr);
4477 return;
4478 }
4479 zs = zsetobj->ptr;
4480 de = dictFind(zs->dict,c->argv[2]);
4481 if (de == NULL) {
4482 addReply(c,shared.czero);
4483 return;
4484 }
4485 /* Delete from the skiplist */
4486 oldscore = dictGetEntryVal(de);
4487 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4488 redisAssert(deleted != 0);
4489
4490 /* Delete from the hash table */
4491 dictDelete(zs->dict,c->argv[2]);
4492 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4493 server.dirty++;
4494 addReply(c,shared.cone);
4495 }
4496 }
4497
4498 static void zremrangebyscoreCommand(redisClient *c) {
4499 double min = strtod(c->argv[2]->ptr,NULL);
4500 double max = strtod(c->argv[3]->ptr,NULL);
4501 robj *zsetobj;
4502 zset *zs;
4503
4504 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4505 if (zsetobj == NULL) {
4506 addReply(c,shared.czero);
4507 } else {
4508 long deleted;
4509
4510 if (zsetobj->type != REDIS_ZSET) {
4511 addReply(c,shared.wrongtypeerr);
4512 return;
4513 }
4514 zs = zsetobj->ptr;
4515 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4516 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4517 server.dirty += deleted;
4518 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4519 }
4520 }
4521
4522 static void zrangeGenericCommand(redisClient *c, int reverse) {
4523 robj *o;
4524 int start = atoi(c->argv[2]->ptr);
4525 int end = atoi(c->argv[3]->ptr);
4526
4527 o = lookupKeyRead(c->db,c->argv[1]);
4528 if (o == NULL) {
4529 addReply(c,shared.nullmultibulk);
4530 } else {
4531 if (o->type != REDIS_ZSET) {
4532 addReply(c,shared.wrongtypeerr);
4533 } else {
4534 zset *zsetobj = o->ptr;
4535 zskiplist *zsl = zsetobj->zsl;
4536 zskiplistNode *ln;
4537
4538 int llen = zsl->length;
4539 int rangelen, j;
4540 robj *ele;
4541
4542 /* convert negative indexes */
4543 if (start < 0) start = llen+start;
4544 if (end < 0) end = llen+end;
4545 if (start < 0) start = 0;
4546 if (end < 0) end = 0;
4547
4548 /* indexes sanity checks */
4549 if (start > end || start >= llen) {
4550 /* Out of range start or start > end result in empty list */
4551 addReply(c,shared.emptymultibulk);
4552 return;
4553 }
4554 if (end >= llen) end = llen-1;
4555 rangelen = (end-start)+1;
4556
4557 /* Return the result in form of a multi-bulk reply */
4558 if (reverse) {
4559 ln = zsl->tail;
4560 while (start--)
4561 ln = ln->backward;
4562 } else {
4563 ln = zsl->header->forward[0];
4564 while (start--)
4565 ln = ln->forward[0];
4566 }
4567
4568 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4569 for (j = 0; j < rangelen; j++) {
4570 ele = ln->obj;
4571 addReplyBulkLen(c,ele);
4572 addReply(c,ele);
4573 addReply(c,shared.crlf);
4574 ln = reverse ? ln->backward : ln->forward[0];
4575 }
4576 }
4577 }
4578 }
4579
4580 static void zrangeCommand(redisClient *c) {
4581 zrangeGenericCommand(c,0);
4582 }
4583
4584 static void zrevrangeCommand(redisClient *c) {
4585 zrangeGenericCommand(c,1);
4586 }
4587
4588 static void zrangebyscoreCommand(redisClient *c) {
4589 robj *o;
4590 double min = strtod(c->argv[2]->ptr,NULL);
4591 double max = strtod(c->argv[3]->ptr,NULL);
4592 int offset = 0, limit = -1;
4593
4594 if (c->argc != 4 && c->argc != 7) {
4595 addReplySds(c,
4596 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4597 return;
4598 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4599 addReply(c,shared.syntaxerr);
4600 return;
4601 } else if (c->argc == 7) {
4602 offset = atoi(c->argv[5]->ptr);
4603 limit = atoi(c->argv[6]->ptr);
4604 if (offset < 0) offset = 0;
4605 }
4606
4607 o = lookupKeyRead(c->db,c->argv[1]);
4608 if (o == NULL) {
4609 addReply(c,shared.nullmultibulk);
4610 } else {
4611 if (o->type != REDIS_ZSET) {
4612 addReply(c,shared.wrongtypeerr);
4613 } else {
4614 zset *zsetobj = o->ptr;
4615 zskiplist *zsl = zsetobj->zsl;
4616 zskiplistNode *ln;
4617 robj *ele, *lenobj;
4618 unsigned int rangelen = 0;
4619
4620 /* Get the first node with the score >= min */
4621 ln = zslFirstWithScore(zsl,min);
4622 if (ln == NULL) {
4623 /* No element matching the speciifed interval */
4624 addReply(c,shared.emptymultibulk);
4625 return;
4626 }
4627
4628 /* We don't know in advance how many matching elements there
4629 * are in the list, so we push this object that will represent
4630 * the multi-bulk length in the output buffer, and will "fix"
4631 * it later */
4632 lenobj = createObject(REDIS_STRING,NULL);
4633 addReply(c,lenobj);
4634 decrRefCount(lenobj);
4635
4636 while(ln && ln->score <= max) {
4637 if (offset) {
4638 offset--;
4639 ln = ln->forward[0];
4640 continue;
4641 }
4642 if (limit == 0) break;
4643 ele = ln->obj;
4644 addReplyBulkLen(c,ele);
4645 addReply(c,ele);
4646 addReply(c,shared.crlf);
4647 ln = ln->forward[0];
4648 rangelen++;
4649 if (limit > 0) limit--;
4650 }
4651 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4652 }
4653 }
4654 }
4655
4656 static void zcardCommand(redisClient *c) {
4657 robj *o;
4658 zset *zs;
4659
4660 o = lookupKeyRead(c->db,c->argv[1]);
4661 if (o == NULL) {
4662 addReply(c,shared.czero);
4663 return;
4664 } else {
4665 if (o->type != REDIS_ZSET) {
4666 addReply(c,shared.wrongtypeerr);
4667 } else {
4668 zs = o->ptr;
4669 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4670 }
4671 }
4672 }
4673
4674 static void zscoreCommand(redisClient *c) {
4675 robj *o;
4676 zset *zs;
4677
4678 o = lookupKeyRead(c->db,c->argv[1]);
4679 if (o == NULL) {
4680 addReply(c,shared.nullbulk);
4681 return;
4682 } else {
4683 if (o->type != REDIS_ZSET) {
4684 addReply(c,shared.wrongtypeerr);
4685 } else {
4686 dictEntry *de;
4687
4688 zs = o->ptr;
4689 de = dictFind(zs->dict,c->argv[2]);
4690 if (!de) {
4691 addReply(c,shared.nullbulk);
4692 } else {
4693 double *score = dictGetEntryVal(de);
4694
4695 addReplyDouble(c,*score);
4696 }
4697 }
4698 }
4699 }
4700
4701 /* ========================= Non type-specific commands ==================== */
4702
4703 static void flushdbCommand(redisClient *c) {
4704 server.dirty += dictSize(c->db->dict);
4705 dictEmpty(c->db->dict);
4706 dictEmpty(c->db->expires);
4707 addReply(c,shared.ok);
4708 }
4709
/* FLUSHALL: empty every database with emptyDb() (its return value,
 * presumably the number of removed keys, is added to server.dirty), reply
 * with shared.ok, then synchronously write the now empty dataset with
 * rdbSave().
 * NOTE(review): server.dirty is incremented again after rdbSave(). Presumably
 * rdbSave() resets the dirty counter, and the extra increment keeps the
 * command counted as a change (e.g. for AOF/slave propagation). Confirm
 * against rdbSave() and the command dispatcher before reordering. */
static void flushallCommand(redisClient *c) {
    server.dirty += emptyDb();
    addReply(c,shared.ok);
    rdbSave(server.dbfilename);
    server.dirty++;
}
4716
4717 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4718 redisSortOperation *so = zmalloc(sizeof(*so));
4719 so->type = type;
4720 so->pattern = pattern;
4721 return so;
4722 }
4723
4724 /* Return the value associated to the key with a name obtained
4725 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4726 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4727 char *p;
4728 sds spat, ssub;
4729 robj keyobj;
4730 int prefixlen, sublen, postfixlen;
4731 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4732 struct {
4733 long len;
4734 long free;
4735 char buf[REDIS_SORTKEY_MAX+1];
4736 } keyname;
4737
4738 /* If the pattern is "#" return the substitution object itself in order
4739 * to implement the "SORT ... GET #" feature. */
4740 spat = pattern->ptr;
4741 if (spat[0] == '#' && spat[1] == '\0') {
4742 return subst;
4743 }
4744
4745 /* The substitution object may be specially encoded. If so we create
4746 * a decoded object on the fly. Otherwise getDecodedObject will just
4747 * increment the ref count, that we'll decrement later. */
4748 subst = getDecodedObject(subst);
4749
4750 ssub = subst->ptr;
4751 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4752 p = strchr(spat,'*');
4753 if (!p) {
4754 decrRefCount(subst);
4755 return NULL;
4756 }
4757
4758 prefixlen = p-spat;
4759 sublen = sdslen(ssub);
4760 postfixlen = sdslen(spat)-(prefixlen+1);
4761 memcpy(keyname.buf,spat,prefixlen);
4762 memcpy(keyname.buf+prefixlen,ssub,sublen);
4763 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4764 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4765 keyname.len = prefixlen+sublen+postfixlen;
4766
4767 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4768 decrRefCount(subst);
4769
4770 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4771 return lookupKeyRead(db,&keyobj);
4772 }
4773
4774 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4775 * the additional parameter is not standard but a BSD-specific we have to
4776 * pass sorting parameters via the global 'server' structure */
4777 static int sortCompare(const void *s1, const void *s2) {
4778 const redisSortObject *so1 = s1, *so2 = s2;
4779 int cmp;
4780
4781 if (!server.sort_alpha) {
4782 /* Numeric sorting. Here it's trivial as we precomputed scores */
4783 if (so1->u.score > so2->u.score) {
4784 cmp = 1;
4785 } else if (so1->u.score < so2->u.score) {
4786 cmp = -1;
4787 } else {
4788 cmp = 0;
4789 }
4790 } else {
4791 /* Alphanumeric sorting */
4792 if (server.sort_bypattern) {
4793 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4794 /* At least one compare object is NULL */
4795 if (so1->u.cmpobj == so2->u.cmpobj)
4796 cmp = 0;
4797 else if (so1->u.cmpobj == NULL)
4798 cmp = -1;
4799 else
4800 cmp = 1;
4801 } else {
4802 /* We have both the objects, use strcoll */
4803 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4804 }
4805 } else {
4806 /* Compare elements directly */
4807 robj *dec1, *dec2;
4808
4809 dec1 = getDecodedObject(so1->obj);
4810 dec2 = getDecodedObject(so2->obj);
4811 cmp = strcoll(dec1->ptr,dec2->ptr);
4812 decrRefCount(dec1);
4813 decrRefCount(dec2);
4814 }
4815 }
4816 return server.sort_desc ? -cmp : cmp;
4817 }
4818
4819 /* The SORT command is the most complex command in Redis. Warning: this code
4820 * is optimized for speed and a bit less for readability */
4821 static void sortCommand(redisClient *c) {
4822 list *operations;
4823 int outputlen = 0;
4824 int desc = 0, alpha = 0;
4825 int limit_start = 0, limit_count = -1, start, end;
4826 int j, dontsort = 0, vectorlen;
4827 int getop = 0; /* GET operation counter */
4828 robj *sortval, *sortby = NULL, *storekey = NULL;
4829 redisSortObject *vector; /* Resulting vector to sort */
4830
4831 /* Lookup the key to sort. It must be of the right types */
4832 sortval = lookupKeyRead(c->db,c->argv[1]);
4833 if (sortval == NULL) {
4834 addReply(c,shared.nokeyerr);
4835 return;
4836 }
4837 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4838 sortval->type != REDIS_ZSET)
4839 {
4840 addReply(c,shared.wrongtypeerr);
4841 return;
4842 }
4843
4844 /* Create a list of operations to perform for every sorted element.
4845 * Operations can be GET/DEL/INCR/DECR */
4846 operations = listCreate();
4847 listSetFreeMethod(operations,zfree);
4848 j = 2;
4849
4850 /* Now we need to protect sortval incrementing its count, in the future
4851 * SORT may have options able to overwrite/delete keys during the sorting
4852 * and the sorted key itself may get destroied */
4853 incrRefCount(sortval);
4854
4855 /* The SORT command has an SQL-alike syntax, parse it */
4856 while(j < c->argc) {
4857 int leftargs = c->argc-j-1;
4858 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4859 desc = 0;
4860 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4861 desc = 1;
4862 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4863 alpha = 1;
4864 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4865 limit_start = atoi(c->argv[j+1]->ptr);
4866 limit_count = atoi(c->argv[j+2]->ptr);
4867 j+=2;
4868 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4869 storekey = c->argv[j+1];
4870 j++;
4871 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4872 sortby = c->argv[j+1];
4873 /* If the BY pattern does not contain '*', i.e. it is constant,
4874 * we don't need to sort nor to lookup the weight keys. */
4875 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4876 j++;
4877 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4878 listAddNodeTail(operations,createSortOperation(
4879 REDIS_SORT_GET,c->argv[j+1]));
4880 getop++;
4881 j++;
4882 } else {
4883 decrRefCount(sortval);
4884 listRelease(operations);
4885 addReply(c,shared.syntaxerr);
4886 return;
4887 }
4888 j++;
4889 }
4890
4891 /* Load the sorting vector with all the objects to sort */
4892 switch(sortval->type) {
4893 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
4894 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
4895 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
4896 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
4897 }
4898 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4899 j = 0;
4900
4901 if (sortval->type == REDIS_LIST) {
4902 list *list = sortval->ptr;
4903 listNode *ln;
4904
4905 listRewind(list);
4906 while((ln = listYield(list))) {
4907 robj *ele = ln->value;
4908 vector[j].obj = ele;
4909 vector[j].u.score = 0;
4910 vector[j].u.cmpobj = NULL;
4911 j++;
4912 }
4913 } else {
4914 dict *set;
4915 dictIterator *di;
4916 dictEntry *setele;
4917
4918 if (sortval->type == REDIS_SET) {
4919 set = sortval->ptr;
4920 } else {
4921 zset *zs = sortval->ptr;
4922 set = zs->dict;
4923 }
4924
4925 di = dictGetIterator(set);
4926 while((setele = dictNext(di)) != NULL) {
4927 vector[j].obj = dictGetEntryKey(setele);
4928 vector[j].u.score = 0;
4929 vector[j].u.cmpobj = NULL;
4930 j++;
4931 }
4932 dictReleaseIterator(di);
4933 }
4934 redisAssert(j == vectorlen);
4935
4936 /* Now it's time to load the right scores in the sorting vector */
4937 if (dontsort == 0) {
4938 for (j = 0; j < vectorlen; j++) {
4939 if (sortby) {
4940 robj *byval;
4941
4942 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4943 if (!byval || byval->type != REDIS_STRING) continue;
4944 if (alpha) {
4945 vector[j].u.cmpobj = getDecodedObject(byval);
4946 } else {
4947 if (byval->encoding == REDIS_ENCODING_RAW) {
4948 vector[j].u.score = strtod(byval->ptr,NULL);
4949 } else {
4950 /* Don't need to decode the object if it's
4951 * integer-encoded (the only encoding supported) so
4952 * far. We can just cast it */
4953 if (byval->encoding == REDIS_ENCODING_INT) {
4954 vector[j].u.score = (long)byval->ptr;
4955 } else
4956 redisAssert(1 != 1);
4957 }
4958 }
4959 } else {
4960 if (!alpha) {
4961 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4962 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4963 else {
4964 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4965 vector[j].u.score = (long) vector[j].obj->ptr;
4966 else
4967 redisAssert(1 != 1);
4968 }
4969 }
4970 }
4971 }
4972 }
4973
4974 /* We are ready to sort the vector... perform a bit of sanity check
4975 * on the LIMIT option too. We'll use a partial version of quicksort. */
4976 start = (limit_start < 0) ? 0 : limit_start;
4977 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4978 if (start >= vectorlen) {
4979 start = vectorlen-1;
4980 end = vectorlen-2;
4981 }
4982 if (end >= vectorlen) end = vectorlen-1;
4983
4984 if (dontsort == 0) {
4985 server.sort_desc = desc;
4986 server.sort_alpha = alpha;
4987 server.sort_bypattern = sortby ? 1 : 0;
4988 if (sortby && (start != 0 || end != vectorlen-1))
4989 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4990 else
4991 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4992 }
4993
4994 /* Send command output to the output buffer, performing the specified
4995 * GET/DEL/INCR/DECR operations if any. */
4996 outputlen = getop ? getop*(end-start+1) : end-start+1;
4997 if (storekey == NULL) {
4998 /* STORE option not specified, sent the sorting result to client */
4999 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5000 for (j = start; j <= end; j++) {
5001 listNode *ln;
5002 if (!getop) {
5003 addReplyBulkLen(c,vector[j].obj);
5004 addReply(c,vector[j].obj);
5005 addReply(c,shared.crlf);
5006 }
5007 listRewind(operations);
5008 while((ln = listYield(operations))) {
5009 redisSortOperation *sop = ln->value;
5010 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5011 vector[j].obj);
5012
5013 if (sop->type == REDIS_SORT_GET) {
5014 if (!val || val->type != REDIS_STRING) {
5015 addReply(c,shared.nullbulk);
5016 } else {
5017 addReplyBulkLen(c,val);
5018 addReply(c,val);
5019 addReply(c,shared.crlf);
5020 }
5021 } else {
5022 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5023 }
5024 }
5025 }
5026 } else {
5027 robj *listObject = createListObject();
5028 list *listPtr = (list*) listObject->ptr;
5029
5030 /* STORE option specified, set the sorting result as a List object */
5031 for (j = start; j <= end; j++) {
5032 listNode *ln;
5033 if (!getop) {
5034 listAddNodeTail(listPtr,vector[j].obj);
5035 incrRefCount(vector[j].obj);
5036 }
5037 listRewind(operations);
5038 while((ln = listYield(operations))) {
5039 redisSortOperation *sop = ln->value;
5040 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5041 vector[j].obj);
5042
5043 if (sop->type == REDIS_SORT_GET) {
5044 if (!val || val->type != REDIS_STRING) {
5045 listAddNodeTail(listPtr,createStringObject("",0));
5046 } else {
5047 listAddNodeTail(listPtr,val);
5048 incrRefCount(val);
5049 }
5050 } else {
5051 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5052 }
5053 }
5054 }
5055 if (dictReplace(c->db->dict,storekey,listObject)) {
5056 incrRefCount(storekey);
5057 }
5058 /* Note: we add 1 because the DB is dirty anyway since even if the
5059 * SORT result is empty a new key is set and maybe the old content
5060 * replaced. */
5061 server.dirty += 1+outputlen;
5062 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5063 }
5064
5065 /* Cleanup */
5066 decrRefCount(sortval);
5067 listRelease(operations);
5068 for (j = 0; j < vectorlen; j++) {
5069 if (sortby && alpha && vector[j].u.cmpobj)
5070 decrRefCount(vector[j].u.cmpobj);
5071 }
5072 zfree(vector);
5073 }
5074
5075 /* Create the string returned by the INFO command. This is decoupled
5076 * by the INFO command itself as we need to report the same information
5077 * on memory corruption problems. */
5078 static sds genRedisInfoString(void) {
5079 sds info;
5080 time_t uptime = time(NULL)-server.stat_starttime;
5081 int j;
5082
5083 info = sdscatprintf(sdsempty(),
5084 "redis_version:%s\r\n"
5085 "arch_bits:%s\r\n"
5086 "multiplexing_api:%s\r\n"
5087 "uptime_in_seconds:%ld\r\n"
5088 "uptime_in_days:%ld\r\n"
5089 "connected_clients:%d\r\n"
5090 "connected_slaves:%d\r\n"
5091 "used_memory:%zu\r\n"
5092 "changes_since_last_save:%lld\r\n"
5093 "bgsave_in_progress:%d\r\n"
5094 "last_save_time:%ld\r\n"
5095 "bgrewriteaof_in_progress:%d\r\n"
5096 "total_connections_received:%lld\r\n"
5097 "total_commands_processed:%lld\r\n"
5098 "role:%s\r\n"
5099 ,REDIS_VERSION,
5100 (sizeof(long) == 8) ? "64" : "32",
5101 aeGetApiName(),
5102 uptime,
5103 uptime/(3600*24),
5104 listLength(server.clients)-listLength(server.slaves),
5105 listLength(server.slaves),
5106 server.usedmemory,
5107 server.dirty,
5108 server.bgsavechildpid != -1,
5109 server.lastsave,
5110 server.bgrewritechildpid != -1,
5111 server.stat_numconnections,
5112 server.stat_numcommands,
5113 server.masterhost == NULL ? "master" : "slave"
5114 );
5115 if (server.masterhost) {
5116 info = sdscatprintf(info,
5117 "master_host:%s\r\n"
5118 "master_port:%d\r\n"
5119 "master_link_status:%s\r\n"
5120 "master_last_io_seconds_ago:%d\r\n"
5121 ,server.masterhost,
5122 server.masterport,
5123 (server.replstate == REDIS_REPL_CONNECTED) ?
5124 "up" : "down",
5125 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5126 );
5127 }
5128 for (j = 0; j < server.dbnum; j++) {
5129 long long keys, vkeys;
5130
5131 keys = dictSize(server.db[j].dict);
5132 vkeys = dictSize(server.db[j].expires);
5133 if (keys || vkeys) {
5134 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5135 j, keys, vkeys);
5136 }
5137 }
5138 return info;
5139 }
5140
5141 static void infoCommand(redisClient *c) {
5142 sds info = genRedisInfoString();
5143 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5144 (unsigned long)sdslen(info)));
5145 addReplySds(c,info);
5146 addReply(c,shared.crlf);
5147 }
5148
5149 static void monitorCommand(redisClient *c) {
5150 /* ignore MONITOR if aleady slave or in monitor mode */
5151 if (c->flags & REDIS_SLAVE) return;
5152
5153 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5154 c->slaveseldb = 0;
5155 listAddNodeTail(server.monitors,c);
5156 addReply(c,shared.ok);
5157 }
5158
5159 /* ================================= Expire ================================= */
5160 static int removeExpire(redisDb *db, robj *key) {
5161 if (dictDelete(db->expires,key) == DICT_OK) {
5162 return 1;
5163 } else {
5164 return 0;
5165 }
5166 }
5167
5168 static int setExpire(redisDb *db, robj *key, time_t when) {
5169 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5170 return 0;
5171 } else {
5172 incrRefCount(key);
5173 return 1;
5174 }
5175 }
5176
5177 /* Return the expire time of the specified key, or -1 if no expire
5178 * is associated with this key (i.e. the key is non volatile) */
5179 static time_t getExpire(redisDb *db, robj *key) {
5180 dictEntry *de;
5181
5182 /* No expire? return ASAP */
5183 if (dictSize(db->expires) == 0 ||
5184 (de = dictFind(db->expires,key)) == NULL) return -1;
5185
5186 return (time_t) dictGetEntryVal(de);
5187 }
5188
5189 static int expireIfNeeded(redisDb *db, robj *key) {
5190 time_t when;
5191 dictEntry *de;
5192
5193 /* No expire? return ASAP */
5194 if (dictSize(db->expires) == 0 ||
5195 (de = dictFind(db->expires,key)) == NULL) return 0;
5196
5197 /* Lookup the expire */
5198 when = (time_t) dictGetEntryVal(de);
5199 if (time(NULL) <= when) return 0;
5200
5201 /* Delete the key */
5202 dictDelete(db->expires,key);
5203 return dictDelete(db->dict,key) == DICT_OK;
5204 }
5205
5206 static int deleteIfVolatile(redisDb *db, robj *key) {
5207 dictEntry *de;
5208
5209 /* No expire? return ASAP */
5210 if (dictSize(db->expires) == 0 ||
5211 (de = dictFind(db->expires,key)) == NULL) return 0;
5212
5213 /* Delete the key */
5214 server.dirty++;
5215 dictDelete(db->expires,key);
5216 return dictDelete(db->dict,key) == DICT_OK;
5217 }
5218
5219 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5220 dictEntry *de;
5221
5222 de = dictFind(c->db->dict,key);
5223 if (de == NULL) {
5224 addReply(c,shared.czero);
5225 return;
5226 }
5227 if (seconds < 0) {
5228 if (deleteKey(c->db,key)) server.dirty++;
5229 addReply(c, shared.cone);
5230 return;
5231 } else {
5232 time_t when = time(NULL)+seconds;
5233 if (setExpire(c->db,key,when)) {
5234 addReply(c,shared.cone);
5235 server.dirty++;
5236 } else {
5237 addReply(c,shared.czero);
5238 }
5239 return;
5240 }
5241 }
5242
5243 static void expireCommand(redisClient *c) {
5244 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5245 }
5246
5247 static void expireatCommand(redisClient *c) {
5248 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5249 }
5250
5251 static void ttlCommand(redisClient *c) {
5252 time_t expire;
5253 int ttl = -1;
5254
5255 expire = getExpire(c->db,c->argv[1]);
5256 if (expire != -1) {
5257 ttl = (int) (expire-time(NULL));
5258 if (ttl < 0) ttl = -1;
5259 }
5260 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5261 }
5262
5263 /* =============================== Replication ============================= */
5264
5265 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5266 ssize_t nwritten, ret = size;
5267 time_t start = time(NULL);
5268
5269 timeout++;
5270 while(size) {
5271 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5272 nwritten = write(fd,ptr,size);
5273 if (nwritten == -1) return -1;
5274 ptr += nwritten;
5275 size -= nwritten;
5276 }
5277 if ((time(NULL)-start) > timeout) {
5278 errno = ETIMEDOUT;
5279 return -1;
5280 }
5281 }
5282 return ret;
5283 }
5284
5285 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5286 ssize_t nread, totread = 0;
5287 time_t start = time(NULL);
5288
5289 timeout++;
5290 while(size) {
5291 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5292 nread = read(fd,ptr,size);
5293 if (nread == -1) return -1;
5294 ptr += nread;
5295 size -= nread;
5296 totread += nread;
5297 }
5298 if ((time(NULL)-start) > timeout) {
5299 errno = ETIMEDOUT;
5300 return -1;
5301 }
5302 }
5303 return totread;
5304 }
5305
/* Read one line from 'fd' into 'ptr' (a buffer of 'size' bytes), one byte
 * at a time via syncRead(). The trailing "\n" (and a preceding "\r") is
 * stripped and the result is always NUL-terminated.
 * Returns the number of bytes stored before the terminator (which may
 * include a stripped '\r'), or -1 on I/O error/timeout or if 'size' leaves
 * no room for the terminator. If the buffer fills up before a newline is
 * seen, the truncated line is returned and the rest stays unread. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    size--;         /* reserve room for the terminator */
    *ptr = '\0';
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the byte just stored: without this the loop never
         * stops on long lines and writes past the end of the buffer. */
        size--;
    }
    return nread;
}
5326
5327 static void syncCommand(redisClient *c) {
5328 /* ignore SYNC if aleady slave or in monitor mode */
5329 if (c->flags & REDIS_SLAVE) return;
5330
5331 /* SYNC can't be issued when the server has pending data to send to
5332 * the client about already issued commands. We need a fresh reply
5333 * buffer registering the differences between the BGSAVE and the current
5334 * dataset, so that we can copy to other slaves if needed. */
5335 if (listLength(c->reply) != 0) {
5336 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5337 return;
5338 }
5339
5340 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5341 /* Here we need to check if there is a background saving operation
5342 * in progress, or if it is required to start one */
5343 if (server.bgsavechildpid != -1) {
5344 /* Ok a background save is in progress. Let's check if it is a good
5345 * one for replication, i.e. if there is another slave that is
5346 * registering differences since the server forked to save */
5347 redisClient *slave;
5348 listNode *ln;
5349
5350 listRewind(server.slaves);
5351 while((ln = listYield(server.slaves))) {
5352 slave = ln->value;
5353 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5354 }
5355 if (ln) {
5356 /* Perfect, the server is already registering differences for
5357 * another slave. Set the right state, and copy the buffer. */
5358 listRelease(c->reply);
5359 c->reply = listDup(slave->reply);
5360 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5361 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5362 } else {
5363 /* No way, we need to wait for the next BGSAVE in order to
5364 * register differences */
5365 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5366 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5367 }
5368 } else {
5369 /* Ok we don't have a BGSAVE in progress, let's start one */
5370 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5371 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5372 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5373 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5374 return;
5375 }
5376 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5377 }
5378 c->repldbfd = -1;
5379 c->flags |= REDIS_SLAVE;
5380 c->slaveseldb = 0;
5381 listAddNodeTail(server.slaves,c);
5382 return;
5383 }
5384
5385 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5386 redisClient *slave = privdata;
5387 REDIS_NOTUSED(el);
5388 REDIS_NOTUSED(mask);
5389 char buf[REDIS_IOBUF_LEN];
5390 ssize_t nwritten, buflen;
5391
5392 if (slave->repldboff == 0) {
5393 /* Write the bulk write count before to transfer the DB. In theory here
5394 * we don't know how much room there is in the output buffer of the
5395 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5396 * operations) will never be smaller than the few bytes we need. */
5397 sds bulkcount;
5398
5399 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5400 slave->repldbsize);
5401 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5402 {
5403 sdsfree(bulkcount);
5404 freeClient(slave);
5405 return;
5406 }
5407 sdsfree(bulkcount);
5408 }
5409 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5410 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5411 if (buflen <= 0) {
5412 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5413 (buflen == 0) ? "premature EOF" : strerror(errno));
5414 freeClient(slave);
5415 return;
5416 }
5417 if ((nwritten = write(fd,buf,buflen)) == -1) {
5418 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5419 strerror(errno));
5420 freeClient(slave);
5421 return;
5422 }
5423 slave->repldboff += nwritten;
5424 if (slave->repldboff == slave->repldbsize) {
5425 close(slave->repldbfd);
5426 slave->repldbfd = -1;
5427 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5428 slave->replstate = REDIS_REPL_ONLINE;
5429 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5430 sendReplyToClient, slave) == AE_ERR) {
5431 freeClient(slave);
5432 return;
5433 }
5434 addReplySds(slave,sdsempty());
5435 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5436 }
5437 }
5438
5439 /* This function is called at the end of every backgrond saving.
5440 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5441 * otherwise REDIS_ERR is passed to the function.
5442 *
5443 * The goal of this function is to handle slaves waiting for a successful
5444 * background saving in order to perform non-blocking synchronization. */
5445 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5446 listNode *ln;
5447 int startbgsave = 0;
5448
5449 listRewind(server.slaves);
5450 while((ln = listYield(server.slaves))) {
5451 redisClient *slave = ln->value;
5452
5453 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5454 startbgsave = 1;
5455 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5456 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5457 struct redis_stat buf;
5458
5459 if (bgsaveerr != REDIS_OK) {
5460 freeClient(slave);
5461 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5462 continue;
5463 }
5464 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5465 redis_fstat(slave->repldbfd,&buf) == -1) {
5466 freeClient(slave);
5467 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5468 continue;
5469 }
5470 slave->repldboff = 0;
5471 slave->repldbsize = buf.st_size;
5472 slave->replstate = REDIS_REPL_SEND_BULK;
5473 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5474 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5475 freeClient(slave);
5476 continue;
5477 }
5478 }
5479 }
5480 if (startbgsave) {
5481 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5482 listRewind(server.slaves);
5483 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5484 while((ln = listYield(server.slaves))) {
5485 redisClient *slave = ln->value;
5486
5487 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5488 freeClient(slave);
5489 }
5490 }
5491 }
5492 }
5493
5494 static int syncWithMaster(void) {
5495 char buf[1024], tmpfile[256], authcmd[1024];
5496 int dumpsize;
5497 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5498 int dfd;
5499
5500 if (fd == -1) {
5501 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5502 strerror(errno));
5503 return REDIS_ERR;
5504 }
5505
5506 /* AUTH with the master if required. */
5507 if(server.masterauth) {
5508 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5509 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5510 close(fd);
5511 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5512 strerror(errno));
5513 return REDIS_ERR;
5514 }
5515 /* Read the AUTH result. */
5516 if (syncReadLine(fd,buf,1024,3600) == -1) {
5517 close(fd);
5518 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5519 strerror(errno));
5520 return REDIS_ERR;
5521 }
5522 if (buf[0] != '+') {
5523 close(fd);
5524 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5525 return REDIS_ERR;
5526 }
5527 }
5528
5529 /* Issue the SYNC command */
5530 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5531 close(fd);
5532 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5533 strerror(errno));
5534 return REDIS_ERR;
5535 }
5536 /* Read the bulk write count */
5537 if (syncReadLine(fd,buf,1024,3600) == -1) {
5538 close(fd);
5539 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5540 strerror(errno));
5541 return REDIS_ERR;
5542 }
5543 if (buf[0] != '$') {
5544 close(fd);
5545 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5546 return REDIS_ERR;
5547 }
5548 dumpsize = atoi(buf+1);
5549 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5550 /* Read the bulk write data on a temp file */
5551 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5552 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5553 if (dfd == -1) {
5554 close(fd);
5555 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5556 return REDIS_ERR;
5557 }
5558 while(dumpsize) {
5559 int nread, nwritten;
5560
5561 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5562 if (nread == -1) {
5563 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5564 strerror(errno));
5565 close(fd);
5566 close(dfd);
5567 return REDIS_ERR;
5568 }
5569 nwritten = write(dfd,buf,nread);
5570 if (nwritten == -1) {
5571 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5572 close(fd);
5573 close(dfd);
5574 return REDIS_ERR;
5575 }
5576 dumpsize -= nread;
5577 }
5578 close(dfd);
5579 if (rename(tmpfile,server.dbfilename) == -1) {
5580 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5581 unlink(tmpfile);
5582 close(fd);
5583 return REDIS_ERR;
5584 }
5585 emptyDb();
5586 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5587 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5588 close(fd);
5589 return REDIS_ERR;
5590 }
5591 server.master = createClient(fd);
5592 server.master->flags |= REDIS_MASTER;
5593 server.master->authenticated = 1;
5594 server.replstate = REDIS_REPL_CONNECTED;
5595 return REDIS_OK;
5596 }
5597
5598 static void slaveofCommand(redisClient *c) {
5599 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5600 !strcasecmp(c->argv[2]->ptr,"one")) {
5601 if (server.masterhost) {
5602 sdsfree(server.masterhost);
5603 server.masterhost = NULL;
5604 if (server.master) freeClient(server.master);
5605 server.replstate = REDIS_REPL_NONE;
5606 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5607 }
5608 } else {
5609 sdsfree(server.masterhost);
5610 server.masterhost = sdsdup(c->argv[1]->ptr);
5611 server.masterport = atoi(c->argv[2]->ptr);
5612 if (server.master) freeClient(server.master);
5613 server.replstate = REDIS_REPL_CONNECT;
5614 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5615 server.masterhost, server.masterport);
5616 }
5617 addReply(c,shared.ok);
5618 }
5619
5620 /* ============================ Maxmemory directive ======================== */
5621
5622 /* This function gets called when 'maxmemory' is set on the config file to limit
5623 * the max memory used by the server, and we are out of memory.
5624 * This function will try to, in order:
5625 *
5626 * - Free objects from the free list
5627 * - Try to remove keys with an EXPIRE set
5628 *
5629 * It is not possible to free enough memory to reach used-memory < maxmemory
5630 * the server will start refusing commands that will enlarge even more the
5631 * memory usage.
5632 */
5633 static void freeMemoryIfNeeded(void) {
5634 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5635 if (listLength(server.objfreelist)) {
5636 robj *o;
5637
5638 listNode *head = listFirst(server.objfreelist);
5639 o = listNodeValue(head);
5640 listDelNode(server.objfreelist,head);
5641 zfree(o);
5642 } else {
5643 int j, k, freed = 0;
5644
5645 for (j = 0; j < server.dbnum; j++) {
5646 int minttl = -1;
5647 robj *minkey = NULL;
5648 struct dictEntry *de;
5649
5650 if (dictSize(server.db[j].expires)) {
5651 freed = 1;
5652 /* From a sample of three keys drop the one nearest to
5653 * the natural expire */
5654 for (k = 0; k < 3; k++) {
5655 time_t t;
5656
5657 de = dictGetRandomKey(server.db[j].expires);
5658 t = (time_t) dictGetEntryVal(de);
5659 if (minttl == -1 || t < minttl) {
5660 minkey = dictGetEntryKey(de);
5661 minttl = t;
5662 }
5663 }
5664 deleteKey(server.db+j,minkey);
5665 }
5666 }
5667 if (!freed) return; /* nothing to free... */
5668 }
5669 }
5670 }
5671
5672 /* ============================== Append Only file ========================== */
5673
5674 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5675 sds buf = sdsempty();
5676 int j;
5677 ssize_t nwritten;
5678 time_t now;
5679 robj *tmpargv[3];
5680
5681 /* The DB this command was targetting is not the same as the last command
5682 * we appendend. To issue a SELECT command is needed. */
5683 if (dictid != server.appendseldb) {
5684 char seldb[64];
5685
5686 snprintf(seldb,sizeof(seldb),"%d",dictid);
5687 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5688 (unsigned long)strlen(seldb),seldb);
5689 server.appendseldb = dictid;
5690 }
5691
5692 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5693 * EXPIREs into EXPIREATs calls */
5694 if (cmd->proc == expireCommand) {
5695 long when;
5696
5697 tmpargv[0] = createStringObject("EXPIREAT",8);
5698 tmpargv[1] = argv[1];
5699 incrRefCount(argv[1]);
5700 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5701 tmpargv[2] = createObject(REDIS_STRING,
5702 sdscatprintf(sdsempty(),"%ld",when));
5703 argv = tmpargv;
5704 }
5705
5706 /* Append the actual command */
5707 buf = sdscatprintf(buf,"*%d\r\n",argc);
5708 for (j = 0; j < argc; j++) {
5709 robj *o = argv[j];
5710
5711 o = getDecodedObject(o);
5712 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
5713 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5714 buf = sdscatlen(buf,"\r\n",2);
5715 decrRefCount(o);
5716 }
5717
5718 /* Free the objects from the modified argv for EXPIREAT */
5719 if (cmd->proc == expireCommand) {
5720 for (j = 0; j < 3; j++)
5721 decrRefCount(argv[j]);
5722 }
5723
5724 /* We want to perform a single write. This should be guaranteed atomic
5725 * at least if the filesystem we are writing is a real physical one.
5726 * While this will save us against the server being killed I don't think
5727 * there is much to do about the whole server stopping for power problems
5728 * or alike */
5729 nwritten = write(server.appendfd,buf,sdslen(buf));
5730 if (nwritten != (signed)sdslen(buf)) {
5731 /* Ooops, we are in troubles. The best thing to do for now is
5732 * to simply exit instead to give the illusion that everything is
5733 * working as expected. */
5734 if (nwritten == -1) {
5735 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5736 } else {
5737 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5738 }
5739 exit(1);
5740 }
5741 /* If a background append only file rewriting is in progress we want to
5742 * accumulate the differences between the child DB and the current one
5743 * in a buffer, so that when the child process will do its work we
5744 * can append the differences to the new append only file. */
5745 if (server.bgrewritechildpid != -1)
5746 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5747
5748 sdsfree(buf);
5749 now = time(NULL);
5750 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5751 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5752 now-server.lastfsync > 1))
5753 {
5754 fsync(server.appendfd); /* Let's try to get this data on the disk */
5755 server.lastfsync = now;
5756 }
5757 }
5758
5759 /* In Redis commands are always executed in the context of a client, so in
5760 * order to load the append only file we need to create a fake client. */
5761 static struct redisClient *createFakeClient(void) {
5762 struct redisClient *c = zmalloc(sizeof(*c));
5763
5764 selectDb(c,0);
5765 c->fd = -1;
5766 c->querybuf = sdsempty();
5767 c->argc = 0;
5768 c->argv = NULL;
5769 c->flags = 0;
5770 /* We set the fake client as a slave waiting for the synchronization
5771 * so that Redis will not try to send replies to this client. */
5772 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5773 c->reply = listCreate();
5774 listSetFreeMethod(c->reply,decrRefCount);
5775 listSetDupMethod(c->reply,dupClientReplyValue);
5776 return c;
5777 }
5778
5779 static void freeFakeClient(struct redisClient *c) {
5780 sdsfree(c->querybuf);
5781 listRelease(c->reply);
5782 zfree(c);
5783 }
5784
5785 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5786 * error (the append only file is zero-length) REDIS_ERR is returned. On
5787 * fatal error an error message is logged and the program exists. */
5788 int loadAppendOnlyFile(char *filename) {
5789 struct redisClient *fakeClient;
5790 FILE *fp = fopen(filename,"r");
5791 struct redis_stat sb;
5792
5793 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5794 return REDIS_ERR;
5795
5796 if (fp == NULL) {
5797 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5798 exit(1);
5799 }
5800
5801 fakeClient = createFakeClient();
5802 while(1) {
5803 int argc, j;
5804 unsigned long len;
5805 robj **argv;
5806 char buf[128];
5807 sds argsds;
5808 struct redisCommand *cmd;
5809
5810 if (fgets(buf,sizeof(buf),fp) == NULL) {
5811 if (feof(fp))
5812 break;
5813 else
5814 goto readerr;
5815 }
5816 if (buf[0] != '*') goto fmterr;
5817 argc = atoi(buf+1);
5818 argv = zmalloc(sizeof(robj*)*argc);
5819 for (j = 0; j < argc; j++) {
5820 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5821 if (buf[0] != '$') goto fmterr;
5822 len = strtol(buf+1,NULL,10);
5823 argsds = sdsnewlen(NULL,len);
5824 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5825 argv[j] = createObject(REDIS_STRING,argsds);
5826 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5827 }
5828
5829 /* Command lookup */
5830 cmd = lookupCommand(argv[0]->ptr);
5831 if (!cmd) {
5832 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5833 exit(1);
5834 }
5835 /* Try object sharing and encoding */
5836 if (server.shareobjects) {
5837 int j;
5838 for(j = 1; j < argc; j++)
5839 argv[j] = tryObjectSharing(argv[j]);
5840 }
5841 if (cmd->flags & REDIS_CMD_BULK)
5842 tryObjectEncoding(argv[argc-1]);
5843 /* Run the command in the context of a fake client */
5844 fakeClient->argc = argc;
5845 fakeClient->argv = argv;
5846 cmd->proc(fakeClient);
5847 /* Discard the reply objects list from the fake client */
5848 while(listLength(fakeClient->reply))
5849 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5850 /* Clean up, ready for the next command */
5851 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5852 zfree(argv);
5853 }
5854 fclose(fp);
5855 freeFakeClient(fakeClient);
5856 return REDIS_OK;
5857
5858 readerr:
5859 if (feof(fp)) {
5860 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5861 } else {
5862 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5863 }
5864 exit(1);
5865 fmterr:
5866 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5867 exit(1);
5868 }
5869
5870 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5871 static int fwriteBulk(FILE *fp, robj *obj) {
5872 char buf[128];
5873 obj = getDecodedObject(obj);
5874 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5875 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5876 if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err;
5877 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5878 decrRefCount(obj);
5879 return 1;
5880 err:
5881 decrRefCount(obj);
5882 return 0;
5883 }
5884
/* Write 'd' to 'fp' as a bulk payload: "$<len>\r\n<%.17g text>\r\n".
 * Returns 1 on success, 0 as soon as a write fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    /* The advertised length excludes the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
5895
/* Write 'l' to 'fp' as a bulk payload: "$<len>\r\n<decimal>\r\n".
 * Returns 1 on success, 0 as soon as a write fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], payload[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    payloadlen = strlen(payload);
    /* The advertised length excludes the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
5906
5907 /* Write a sequence of commands able to fully rebuild the dataset into
5908 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5909 static int rewriteAppendOnlyFile(char *filename) {
5910 dictIterator *di = NULL;
5911 dictEntry *de;
5912 FILE *fp;
5913 char tmpfile[256];
5914 int j;
5915 time_t now = time(NULL);
5916
5917 /* Note that we have to use a different temp name here compared to the
5918 * one used by rewriteAppendOnlyFileBackground() function. */
5919 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5920 fp = fopen(tmpfile,"w");
5921 if (!fp) {
5922 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5923 return REDIS_ERR;
5924 }
5925 for (j = 0; j < server.dbnum; j++) {
5926 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5927 redisDb *db = server.db+j;
5928 dict *d = db->dict;
5929 if (dictSize(d) == 0) continue;
5930 di = dictGetIterator(d);
5931 if (!di) {
5932 fclose(fp);
5933 return REDIS_ERR;
5934 }
5935
5936 /* SELECT the new DB */
5937 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5938 if (fwriteBulkLong(fp,j) == 0) goto werr;
5939
5940 /* Iterate this DB writing every entry */
5941 while((de = dictNext(di)) != NULL) {
5942 robj *key = dictGetEntryKey(de);
5943 robj *o = dictGetEntryVal(de);
5944 time_t expiretime = getExpire(db,key);
5945
5946 /* Save the key and associated value */
5947 if (o->type == REDIS_STRING) {
5948 /* Emit a SET command */
5949 char cmd[]="*3\r\n$3\r\nSET\r\n";
5950 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5951 /* Key and value */
5952 if (fwriteBulk(fp,key) == 0) goto werr;
5953 if (fwriteBulk(fp,o) == 0) goto werr;
5954 } else if (o->type == REDIS_LIST) {
5955 /* Emit the RPUSHes needed to rebuild the list */
5956 list *list = o->ptr;
5957 listNode *ln;
5958
5959 listRewind(list);
5960 while((ln = listYield(list))) {
5961 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5962 robj *eleobj = listNodeValue(ln);
5963
5964 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5965 if (fwriteBulk(fp,key) == 0) goto werr;
5966 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5967 }
5968 } else if (o->type == REDIS_SET) {
5969 /* Emit the SADDs needed to rebuild the set */
5970 dict *set = o->ptr;
5971 dictIterator *di = dictGetIterator(set);
5972 dictEntry *de;
5973
5974 while((de = dictNext(di)) != NULL) {
5975 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5976 robj *eleobj = dictGetEntryKey(de);
5977
5978 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5979 if (fwriteBulk(fp,key) == 0) goto werr;
5980 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5981 }
5982 dictReleaseIterator(di);
5983 } else if (o->type == REDIS_ZSET) {
5984 /* Emit the ZADDs needed to rebuild the sorted set */
5985 zset *zs = o->ptr;
5986 dictIterator *di = dictGetIterator(zs->dict);
5987 dictEntry *de;
5988
5989 while((de = dictNext(di)) != NULL) {
5990 char cmd[]="*4\r\n$4\r\nZADD\r\n";
5991 robj *eleobj = dictGetEntryKey(de);
5992 double *score = dictGetEntryVal(de);
5993
5994 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5995 if (fwriteBulk(fp,key) == 0) goto werr;
5996 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
5997 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5998 }
5999 dictReleaseIterator(di);
6000 } else {
6001 redisAssert(0 != 0);
6002 }
6003 /* Save the expire time */
6004 if (expiretime != -1) {
6005 char cmd[]="*3\r\n$6\r\nEXPIRE\r\n";
6006 /* If this key is already expired skip it */
6007 if (expiretime < now) continue;
6008 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6009 if (fwriteBulk(fp,key) == 0) goto werr;
6010 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6011 }
6012 }
6013 dictReleaseIterator(di);
6014 }
6015
6016 /* Make sure data will not remain on the OS's output buffers */
6017 fflush(fp);
6018 fsync(fileno(fp));
6019 fclose(fp);
6020
6021 /* Use RENAME to make sure the DB file is changed atomically only
6022 * if the generate DB file is ok. */
6023 if (rename(tmpfile,filename) == -1) {
6024 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6025 unlink(tmpfile);
6026 return REDIS_ERR;
6027 }
6028 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6029 return REDIS_OK;
6030
6031 werr:
6032 fclose(fp);
6033 unlink(tmpfile);
6034 redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno));
6035 if (di) dictReleaseIterator(di);
6036 return REDIS_ERR;
6037 }
6038
6039 /* This is how rewriting of the append only file in background works:
6040 *
6041 * 1) The user calls BGREWRITEAOF
6042 * 2) Redis calls this function, that forks():
6043 * 2a) the child rewrite the append only file in a temp file.
6044 * 2b) the parent accumulates differences in server.bgrewritebuf.
6045 * 3) When the child finished '2a' exists.
6046 * 4) The parent will trap the exit code, if it's OK, will append the
6047 * data accumulated into server.bgrewritebuf into the temp file, and
6048 * finally will rename(2) the temp file in the actual file name.
6049 * The the new file is reopened as the new append only file. Profit!
6050 */
6051 static int rewriteAppendOnlyFileBackground(void) {
6052 pid_t childpid;
6053
6054 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6055 if ((childpid = fork()) == 0) {
6056 /* Child */
6057 char tmpfile[256];
6058 close(server.fd);
6059
6060 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6061 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6062 exit(0);
6063 } else {
6064 exit(1);
6065 }
6066 } else {
6067 /* Parent */
6068 if (childpid == -1) {
6069 redisLog(REDIS_WARNING,
6070 "Can't rewrite append only file in background: fork: %s",
6071 strerror(errno));
6072 return REDIS_ERR;
6073 }
6074 redisLog(REDIS_NOTICE,
6075 "Background append only file rewriting started by pid %d",childpid);
6076 server.bgrewritechildpid = childpid;
6077 /* We set appendseldb to -1 in order to force the next call to the
6078 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6079 * accumulated by the parent into server.bgrewritebuf will start
6080 * with a SELECT statement and it will be safe to merge. */
6081 server.appendseldb = -1;
6082 return REDIS_OK;
6083 }
6084 return REDIS_OK; /* unreached */
6085 }
6086
6087 static void bgrewriteaofCommand(redisClient *c) {
6088 if (server.bgrewritechildpid != -1) {
6089 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6090 return;
6091 }
6092 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6093 char *status = "+Background append only file rewriting started\r\n";
6094 addReplySds(c,sdsnew(status));
6095 } else {
6096 addReply(c,shared.err);
6097 }
6098 }
6099
/* Delete the temporary file written by the background AOF rewrite child
 * whose pid is 'childpid'. The name pattern is the same one the child uses
 * ("temp-rewriteaof-bg-<pid>.aof"). The result of unlink() is ignored, so
 * a missing file is not an error. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int) childpid);
    unlink(path);
}
6106
6107 /* ================================= Debugging ============================== */
6108
6109 static void debugCommand(redisClient *c) {
6110 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6111 *((char*)-1) = 'x';
6112 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6113 if (rdbSave(server.dbfilename) != REDIS_OK) {
6114 addReply(c,shared.err);
6115 return;
6116 }
6117 emptyDb();
6118 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6119 addReply(c,shared.err);
6120 return;
6121 }
6122 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6123 addReply(c,shared.ok);
6124 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6125 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6126 robj *key, *val;
6127
6128 if (!de) {
6129 addReply(c,shared.nokeyerr);
6130 return;
6131 }
6132 key = dictGetEntryKey(de);
6133 val = dictGetEntryVal(de);
6134 addReplySds(c,sdscatprintf(sdsempty(),
6135 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6136 (void*)key, key->refcount, (void*)val, val->refcount,
6137 val->encoding));
6138 } else {
6139 addReplySds(c,sdsnew(
6140 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6141 }
6142 }
6143
6144 static void _redisAssert(char *estr) {
6145 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6146 redisLog(REDIS_WARNING,"==> %s\n",estr);
6147 #ifdef HAVE_BACKTRACE
6148 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6149 *((char*)-1) = 'x';
6150 #endif
6151 }
6152
6153 /* =================================== Main! ================================ */
6154
6155 #ifdef __linux__
/* Return the integer value in /proc/sys/vm/overcommit_memory.
 * Returns -1 in these cases:
 *   - the file cannot be opened or read;
 *   - its first line does not start with a number;
 *   - the number is out of range for long.
 * Unparsable content is therefore reported as an error rather than being
 * mistaken for 0. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long val;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    errno = 0;
    val = strtol(buf,&end,10);
    if (end == buf || errno == ERANGE) return -1; /* no digits / overflow */
    return (int) val;
}
6169
6170 void linuxOvercommitMemoryWarning(void) {
6171 if (linuxOvercommitMemoryValue() == 0) {
6172 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6173 }
6174 }
6175 #endif /* __linux__ */
6176
6177 static void daemonize(void) {
6178 int fd;
6179 FILE *fp;
6180
6181 if (fork() != 0) exit(0); /* parent exits */
6182 printf("New pid: %d\n", getpid());
6183 setsid(); /* create a new session */
6184
6185 /* Every output goes to /dev/null. If Redis is daemonized but
6186 * the 'logfile' is set to 'stdout' in the configuration file
6187 * it will not log at all. */
6188 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6189 dup2(fd, STDIN_FILENO);
6190 dup2(fd, STDOUT_FILENO);
6191 dup2(fd, STDERR_FILENO);
6192 if (fd > STDERR_FILENO) close(fd);
6193 }
6194 /* Try to write the pid file */
6195 fp = fopen(server.pidfile,"w");
6196 if (fp) {
6197 fprintf(fp,"%d\n",getpid());
6198 fclose(fp);
6199 }
6200 }
6201
6202 int main(int argc, char **argv) {
6203 initServerConfig();
6204 if (argc == 2) {
6205 resetServerSaveParams();
6206 loadServerConfig(argv[1]);
6207 } else if (argc > 2) {
6208 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6209 exit(1);
6210 } else {
6211 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6212 }
6213 if (server.daemonize) daemonize();
6214 initServer();
6215 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6216 #ifdef __linux__
6217 linuxOvercommitMemoryWarning();
6218 #endif
6219 if (server.appendonly) {
6220 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6221 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6222 } else {
6223 if (rdbLoad(server.dbfilename) == REDIS_OK)
6224 redisLog(REDIS_NOTICE,"DB loaded from disk");
6225 }
6226 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6227 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6228 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6229 aeMain(server.el);
6230 aeDeleteEventLoop(server.el);
6231 return 0;
6232 }
6233
6234 /* ============================= Backtrace support ========================= */
6235
6236 #ifdef HAVE_BACKTRACE
6237 static char *findFuncName(void *pointer, unsigned long *offset);
6238
6239 static void *getMcontextEip(ucontext_t *uc) {
6240 #if defined(__FreeBSD__)
6241 return (void*) uc->uc_mcontext.mc_eip;
6242 #elif defined(__dietlibc__)
6243 return (void*) uc->uc_mcontext.eip;
6244 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6245 #if __x86_64__
6246 return (void*) uc->uc_mcontext->__ss.__rip;
6247 #else
6248 return (void*) uc->uc_mcontext->__ss.__eip;
6249 #endif
6250 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6251 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6252 return (void*) uc->uc_mcontext->__ss.__rip;
6253 #else
6254 return (void*) uc->uc_mcontext->__ss.__eip;
6255 #endif
6256 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6257 return (void*) uc->uc_mcontext.gregs[REG_EIP];
6258 #elif defined(__ia64__) /* Linux IA64 */
6259 return (void*) uc->uc_mcontext.sc_ip;
6260 #else
6261 return NULL;
6262 #endif
6263 }
6264
6265 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6266 void *trace[100];
6267 char **messages = NULL;
6268 int i, trace_size = 0;
6269 unsigned long offset=0;
6270 ucontext_t *uc = (ucontext_t*) secret;
6271 sds infostring;
6272 REDIS_NOTUSED(info);
6273
6274 redisLog(REDIS_WARNING,
6275 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6276 infostring = genRedisInfoString();
6277 redisLog(REDIS_WARNING, "%s",infostring);
6278 /* It's not safe to sdsfree() the returned string under memory
6279 * corruption conditions. Let it leak as we are going to abort */
6280
6281 trace_size = backtrace(trace, 100);
6282 /* overwrite sigaction with caller's address */
6283 if (getMcontextEip(uc) != NULL) {
6284 trace[1] = getMcontextEip(uc);
6285 }
6286 messages = backtrace_symbols(trace, trace_size);
6287
6288 for (i=1; i<trace_size; ++i) {
6289 char *fn = findFuncName(trace[i], &offset), *p;
6290
6291 p = strchr(messages[i],'+');
6292 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6293 redisLog(REDIS_WARNING,"%s", messages[i]);
6294 } else {
6295 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6296 }
6297 }
6298 // free(messages); Don't call free() with possibly corrupted memory.
6299 exit(0);
6300 }
6301
6302 static void setupSigSegvAction(void) {
6303 struct sigaction act;
6304
6305 sigemptyset (&act.sa_mask);
6306 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6307 * is used. Otherwise, sa_handler is used */
6308 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6309 act.sa_sigaction = segvHandler;
6310 sigaction (SIGSEGV, &act, NULL);
6311 sigaction (SIGBUS, &act, NULL);
6312 sigaction (SIGFPE, &act, NULL);
6313 sigaction (SIGILL, &act, NULL);
6314 sigaction (SIGBUS, &act, NULL);
6315 return;
6316 }
6317
6318 #include "staticsymbols.h"
6319 /* This function try to convert a pointer into a function name. It's used in
6320 * oreder to provide a backtrace under segmentation fault that's able to
6321 * display functions declared as static (otherwise the backtrace is useless). */
6322 static char *findFuncName(void *pointer, unsigned long *offset){
6323 int i, ret = -1;
6324 unsigned long off, minoff = 0;
6325
6326 /* Try to match against the Symbol with the smallest offset */
6327 for (i=0; symsTable[i].pointer; i++) {
6328 unsigned long lp = (unsigned long) pointer;
6329
6330 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6331 off=lp-symsTable[i].pointer;
6332 if (ret < 0 || off < minoff) {
6333 minoff=off;
6334 ret=i;
6335 }
6336 }
6337 }
6338 if (ret == -1) return NULL;
6339 *offset = minoff;
6340 return symsTable[ret].name;
6341 }
6342 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without HAVE_BACKTRACE: there is no crash reporter to install,
     * so the default signal dispositions are left in place. */
    return;
}
6345 #endif /* HAVE_BACKTRACE */
6346
6347
6348
6349 /* The End */
6350
6351
6352