]> git.saurik.com Git - redis.git/blob - redis.c
f0b69edd946517b4485e22189c9dfaa5aafab341
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Server version string. */
#define REDIS_VERSION "1.1.91"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR (-1)

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags (bit mask, combined with '|') */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4
111
/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing a 32 bit length for
 * every short key would waste a lot of space, so the two most significant
 * bits of the first byte select how the length is encoded:
 *
 * 00|000000 => the len is the remaining 6 bits of this byte
 * 01|000000 00000000 => the len is 14 bits: 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => a full 32 bit len follows
 * 11|000000 => a specially encoded object follows. The remaining six bits
 *              specify the kind of object that follows.
 *              See the REDIS_RDB_ENC_* defines.
 *
 * The REDIS_RDB_*LEN / REDIS_RDB_ENCVAL values below are those two-bit
 * prefixes (00, 01, 10, 11). Lengths up to 63 are stored using a single
 * byte; most DB keys, and many values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */

/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void)(V))

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2

/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
204
205 /*================================= Data types ============================== */
206
/* A redis object, that is a type able to hold a string / list / set (see
 * the REDIS_STRING ... REDIS_HASH type constants); 'encoding' is one of the
 * REDIS_ENCODING_* values. */
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char notused[2];
    int refcount;
} robj;

/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * The expansion deliberately has no trailing semicolon: callers write
 * `initStaticStringObject(o, p);` themselves. A semicolon inside the macro
 * would turn that into two statements and break an unbraced if/else. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
} while(0)
226
227 typedef struct redisDb {
228 dict *dict;
229 dict *expires;
230 int id;
231 } redisDb;
232
233 /* With multiplexing we need to take per-clinet state.
234 * Clients are taken in a liked list. */
235 typedef struct redisClient {
236 int fd;
237 redisDb *db;
238 int dictid;
239 sds querybuf;
240 robj **argv, **mbargv;
241 int argc, mbargc;
242 int bulklen; /* bulk read len. -1 if not in bulk read mode */
243 int multibulk; /* multi bulk command format active */
244 list *reply;
245 int sentlen;
246 time_t lastinteraction; /* time of the last interaction, used for timeout */
247 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
248 int slaveseldb; /* slave selected db, if this client is a slave */
249 int authenticated; /* when requirepass is non-NULL */
250 int replstate; /* replication state if this is a slave */
251 int repldbfd; /* replication DB file descriptor */
252 long repldboff; /* replication DB file offset */
253 off_t repldbsize; /* replication DB file size */
254 } redisClient;
255
256 struct saveparam {
257 time_t seconds;
258 int changes;
259 };
260
261 /* Global server state structure */
262 struct redisServer {
263 int port;
264 int fd;
265 redisDb *db;
266 dict *sharingpool;
267 unsigned int sharingpoolsize;
268 long long dirty; /* changes to DB from the last save */
269 list *clients;
270 list *slaves, *monitors;
271 char neterr[ANET_ERR_LEN];
272 aeEventLoop *el;
273 int cronloops; /* number of times the cron function run */
274 list *objfreelist; /* A list of freed objects to avoid malloc() */
275 time_t lastsave; /* Unix time of last save succeeede */
276 size_t usedmemory; /* Used memory in megabytes */
277 /* Fields used only for stats */
278 time_t stat_starttime; /* server start time */
279 long long stat_numcommands; /* number of processed commands */
280 long long stat_numconnections; /* number of connections received */
281 /* Configuration */
282 int verbosity;
283 int glueoutputbuf;
284 int maxidletime;
285 int dbnum;
286 int daemonize;
287 int appendonly;
288 int appendfsync;
289 time_t lastfsync;
290 int appendfd;
291 int appendseldb;
292 char *pidfile;
293 pid_t bgsavechildpid;
294 pid_t bgrewritechildpid;
295 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
296 struct saveparam *saveparams;
297 int saveparamslen;
298 char *logfile;
299 char *bindaddr;
300 char *dbfilename;
301 char *appendfilename;
302 char *requirepass;
303 int shareobjects;
304 /* Replication related */
305 int isslave;
306 char *masterauth;
307 char *masterhost;
308 int masterport;
309 redisClient *master; /* client that is master for this slave */
310 int replstate;
311 unsigned int maxclients;
312 unsigned long maxmemory;
313 /* Sort parameters - qsort_r() is only available under BSD so we
314 * have to take this state global, in order to pass it to sortCompare() */
315 int sort_desc;
316 int sort_alpha;
317 int sort_bypattern;
318 };
319
320 typedef void redisCommandProc(redisClient *c);
321 struct redisCommand {
322 char *name;
323 redisCommandProc *proc;
324 int arity;
325 int flags;
326 };
327
328 struct redisFunctionSym {
329 char *name;
330 unsigned long pointer;
331 };
332
333 typedef struct _redisSortObject {
334 robj *obj;
335 union {
336 double score;
337 robj *cmpobj;
338 } u;
339 } redisSortObject;
340
341 typedef struct _redisSortOperation {
342 int type;
343 robj *pattern;
344 } redisSortOperation;
345
346 /* ZSETs use a specialized version of Skiplists */
347
348 typedef struct zskiplistNode {
349 struct zskiplistNode **forward;
350 struct zskiplistNode *backward;
351 double score;
352 robj *obj;
353 } zskiplistNode;
354
355 typedef struct zskiplist {
356 struct zskiplistNode *header, *tail;
357 unsigned long length;
358 int level;
359 } zskiplist;
360
361 typedef struct zset {
362 dict *dict;
363 zskiplist *zsl;
364 } zset;
365
366 /* Our shared "common" objects */
367
368 struct sharedObjectsStruct {
369 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
370 *colon, *nullbulk, *nullmultibulk,
371 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
372 *outofrangeerr, *plus,
373 *select0, *select1, *select2, *select3, *select4,
374 *select5, *select6, *select7, *select8, *select9;
375 } shared;
376
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. Until that runtime
 * initialization they hold 0.0 (static storage duration). */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
382
383 /*================================ Prototypes =============================== */
384
385 static void freeStringObject(robj *o);
386 static void freeListObject(robj *o);
387 static void freeSetObject(robj *o);
388 static void decrRefCount(void *o);
389 static robj *createObject(int type, void *ptr);
390 static void freeClient(redisClient *c);
391 static int rdbLoad(char *filename);
392 static void addReply(redisClient *c, robj *obj);
393 static void addReplySds(redisClient *c, sds s);
394 static void incrRefCount(robj *o);
395 static int rdbSaveBackground(char *filename);
396 static robj *createStringObject(char *ptr, size_t len);
397 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
398 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
399 static int syncWithMaster(void);
400 static robj *tryObjectSharing(robj *o);
401 static int tryObjectEncoding(robj *o);
402 static robj *getDecodedObject(robj *o);
403 static int removeExpire(redisDb *db, robj *key);
404 static int expireIfNeeded(redisDb *db, robj *key);
405 static int deleteIfVolatile(redisDb *db, robj *key);
406 static int deleteKey(redisDb *db, robj *key);
407 static time_t getExpire(redisDb *db, robj *key);
408 static int setExpire(redisDb *db, robj *key, time_t when);
409 static void updateSlavesWaitingBgsave(int bgsaveerr);
410 static void freeMemoryIfNeeded(void);
411 static int processCommand(redisClient *c);
412 static void setupSigSegvAction(void);
413 static void rdbRemoveTempFile(pid_t childpid);
414 static void aofRemoveTempFile(pid_t childpid);
415 static size_t stringObjectLen(robj *o);
416 static void processInputBuffer(redisClient *c);
417 static zskiplist *zslCreate(void);
418 static void zslFree(zskiplist *zsl);
419 static void zslInsert(zskiplist *zsl, double score, robj *obj);
420 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
421
422 static void authCommand(redisClient *c);
423 static void pingCommand(redisClient *c);
424 static void echoCommand(redisClient *c);
425 static void setCommand(redisClient *c);
426 static void setnxCommand(redisClient *c);
427 static void getCommand(redisClient *c);
428 static void delCommand(redisClient *c);
429 static void existsCommand(redisClient *c);
430 static void incrCommand(redisClient *c);
431 static void decrCommand(redisClient *c);
432 static void incrbyCommand(redisClient *c);
433 static void decrbyCommand(redisClient *c);
434 static void selectCommand(redisClient *c);
435 static void randomkeyCommand(redisClient *c);
436 static void keysCommand(redisClient *c);
437 static void dbsizeCommand(redisClient *c);
438 static void lastsaveCommand(redisClient *c);
439 static void saveCommand(redisClient *c);
440 static void bgsaveCommand(redisClient *c);
441 static void bgrewriteaofCommand(redisClient *c);
442 static void shutdownCommand(redisClient *c);
443 static void moveCommand(redisClient *c);
444 static void renameCommand(redisClient *c);
445 static void renamenxCommand(redisClient *c);
446 static void lpushCommand(redisClient *c);
447 static void rpushCommand(redisClient *c);
448 static void lpopCommand(redisClient *c);
449 static void rpopCommand(redisClient *c);
450 static void llenCommand(redisClient *c);
451 static void lindexCommand(redisClient *c);
452 static void lrangeCommand(redisClient *c);
453 static void ltrimCommand(redisClient *c);
454 static void typeCommand(redisClient *c);
455 static void lsetCommand(redisClient *c);
456 static void saddCommand(redisClient *c);
457 static void sremCommand(redisClient *c);
458 static void smoveCommand(redisClient *c);
459 static void sismemberCommand(redisClient *c);
460 static void scardCommand(redisClient *c);
461 static void spopCommand(redisClient *c);
462 static void srandmemberCommand(redisClient *c);
463 static void sinterCommand(redisClient *c);
464 static void sinterstoreCommand(redisClient *c);
465 static void sunionCommand(redisClient *c);
466 static void sunionstoreCommand(redisClient *c);
467 static void sdiffCommand(redisClient *c);
468 static void sdiffstoreCommand(redisClient *c);
469 static void syncCommand(redisClient *c);
470 static void flushdbCommand(redisClient *c);
471 static void flushallCommand(redisClient *c);
472 static void sortCommand(redisClient *c);
473 static void lremCommand(redisClient *c);
474 static void rpoplpushcommand(redisClient *c);
475 static void infoCommand(redisClient *c);
476 static void mgetCommand(redisClient *c);
477 static void monitorCommand(redisClient *c);
478 static void expireCommand(redisClient *c);
479 static void expireatCommand(redisClient *c);
480 static void getsetCommand(redisClient *c);
481 static void ttlCommand(redisClient *c);
482 static void slaveofCommand(redisClient *c);
483 static void debugCommand(redisClient *c);
484 static void msetCommand(redisClient *c);
485 static void msetnxCommand(redisClient *c);
486 static void zaddCommand(redisClient *c);
487 static void zincrbyCommand(redisClient *c);
488 static void zrangeCommand(redisClient *c);
489 static void zrangebyscoreCommand(redisClient *c);
490 static void zrevrangeCommand(redisClient *c);
491 static void zcardCommand(redisClient *c);
492 static void zremCommand(redisClient *c);
493 static void zscoreCommand(redisClient *c);
494 static void zremrangebyscoreCommand(redisClient *c);
495
496 /*================================= Globals ================================= */
497
498 /* Global vars */
499 static struct redisServer server; /* server global state */
500 static struct redisCommand cmdTable[] = {
501 {"get",getCommand,2,REDIS_CMD_INLINE},
502 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
503 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
504 {"del",delCommand,-2,REDIS_CMD_INLINE},
505 {"exists",existsCommand,2,REDIS_CMD_INLINE},
506 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
507 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
508 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
509 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
510 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
511 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
512 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
513 {"llen",llenCommand,2,REDIS_CMD_INLINE},
514 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
515 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
516 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
517 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
518 {"lrem",lremCommand,4,REDIS_CMD_BULK},
519 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
520 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"srem",sremCommand,3,REDIS_CMD_BULK},
522 {"smove",smoveCommand,4,REDIS_CMD_BULK},
523 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
524 {"scard",scardCommand,2,REDIS_CMD_INLINE},
525 {"spop",spopCommand,2,REDIS_CMD_INLINE},
526 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
527 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
528 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
531 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
532 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
533 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
534 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
535 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
536 {"zrem",zremCommand,3,REDIS_CMD_BULK},
537 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
538 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
539 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
540 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
541 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
542 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
543 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
544 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
545 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
546 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
547 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
548 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
549 {"select",selectCommand,2,REDIS_CMD_INLINE},
550 {"move",moveCommand,3,REDIS_CMD_INLINE},
551 {"rename",renameCommand,3,REDIS_CMD_INLINE},
552 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
553 {"expire",expireCommand,3,REDIS_CMD_INLINE},
554 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
555 {"keys",keysCommand,2,REDIS_CMD_INLINE},
556 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
557 {"auth",authCommand,2,REDIS_CMD_INLINE},
558 {"ping",pingCommand,1,REDIS_CMD_INLINE},
559 {"echo",echoCommand,2,REDIS_CMD_BULK},
560 {"save",saveCommand,1,REDIS_CMD_INLINE},
561 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
562 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
563 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
564 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
565 {"type",typeCommand,2,REDIS_CMD_INLINE},
566 {"sync",syncCommand,1,REDIS_CMD_INLINE},
567 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
568 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
569 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
570 {"info",infoCommand,1,REDIS_CMD_INLINE},
571 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
572 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
573 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
574 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
575 {NULL,NULL,0,0}
576 };
577
578 /*============================ Utility functions ============================ */
579
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Supported syntax:
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [abc]   one byte from the set; [^abc] negates it; [a-z] is a range;
 *           inside the brackets a backslash escapes the next byte
 *   \x      the literal byte x
 * When 'nocase' is non-zero, bytes are compared through tolower().
 *
 * Both buffers are length-delimited: the function never reads at or past
 * pattern[patternLen] / string[stringLen], and stringLen never goes
 * negative, so callers may pass slices that are not NUL-terminated. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen) {
        switch (pattern[0]) {
        case '*':
            /* A run of stars matches exactly like a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one byte. With an empty subject the
             * old code read string[0] out of bounds and drove stringLen to
             * -1, after which the '*' loop above never terminated. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so the common
                     * pattern++ after the switch lands on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped byte; a lone trailing '\' is taken
                     * literally instead of running past the pattern. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        /* ctype functions need unsigned char values. */
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* a literal needs a byte to match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Subject exhausted: only trailing stars may remain. */
            while (patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
702
703 static void redisLog(int level, const char *fmt, ...) {
704 va_list ap;
705 FILE *fp;
706
707 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
708 if (!fp) return;
709
710 va_start(ap, fmt);
711 if (level >= server.verbosity) {
712 char *c = ".-*";
713 char buf[64];
714 time_t now;
715
716 now = time(NULL);
717 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
718 fprintf(fp,"%s %c ",buf,c[level]);
719 vfprintf(fp, fmt, ap);
720 fprintf(fp,"\n");
721 fflush(fp);
722 }
723 va_end(ap);
724
725 if (server.logfile) fclose(fp);
726 }
727
728 /*====================== Hash table type implementation ==================== */
729
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */
733
/* Value destructor for dicts whose values are plain zmalloc()-allocated
 * buffers: it just hands the pointer back to zfree(). */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
739
740 static int sdsDictKeyCompare(void *privdata, const void *key1,
741 const void *key2)
742 {
743 int l1,l2;
744 DICT_NOTUSED(privdata);
745
746 l1 = sdslen((sds)key1);
747 l2 = sdslen((sds)key2);
748 if (l1 != l2) return 0;
749 return memcmp(key1, key2, l1) == 0;
750 }
751
/* Dict destructor callback for entries that hold Redis objects: releases
 * one reference on the object through decrRefCount(). */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
758
759 static int dictObjKeyCompare(void *privdata, const void *key1,
760 const void *key2)
761 {
762 const robj *o1 = key1, *o2 = key2;
763 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
764 }
765
766 static unsigned int dictObjHash(const void *key) {
767 const robj *o = key;
768 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
769 }
770
771 static int dictEncObjKeyCompare(void *privdata, const void *key1,
772 const void *key2)
773 {
774 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
775 int cmp;
776
777 o1 = getDecodedObject(o1);
778 o2 = getDecodedObject(o2);
779 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
780 decrRefCount(o1);
781 decrRefCount(o2);
782 return cmp;
783 }
784
785 static unsigned int dictEncObjHash(const void *key) {
786 robj *o = (robj*) key;
787
788 o = getDecodedObject(o);
789 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
790 decrRefCount(o);
791 return hash;
792 }
793
794 static dictType setDictType = {
795 dictEncObjHash, /* hash function */
796 NULL, /* key dup */
797 NULL, /* val dup */
798 dictEncObjKeyCompare, /* key compare */
799 dictRedisObjectDestructor, /* key destructor */
800 NULL /* val destructor */
801 };
802
803 static dictType zsetDictType = {
804 dictEncObjHash, /* hash function */
805 NULL, /* key dup */
806 NULL, /* val dup */
807 dictEncObjKeyCompare, /* key compare */
808 dictRedisObjectDestructor, /* key destructor */
809 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
810 };
811
812 static dictType hashDictType = {
813 dictObjHash, /* hash function */
814 NULL, /* key dup */
815 NULL, /* val dup */
816 dictObjKeyCompare, /* key compare */
817 dictRedisObjectDestructor, /* key destructor */
818 dictRedisObjectDestructor /* val destructor */
819 };
820
821 /* ========================= Random utility functions ======================= */
822
823 /* Redis generally does not try to recover from out of memory conditions
824 * when allocating objects or strings, it is not clear if it will be possible
825 * to report this condition to the client since the networking layer itself
826 * is based on heap allocation for send buffers, so we simply abort.
827 * At least the code will be simpler to read... */
828 static void oom(const char *msg) {
829 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
830 sleep(1);
831 abort();
832 }
833
834 /* ====================== Redis server networking stuff ===================== */
835 static void closeTimedoutClients(void) {
836 redisClient *c;
837 listNode *ln;
838 time_t now = time(NULL);
839
840 listRewind(server.clients);
841 while ((ln = listYield(server.clients)) != NULL) {
842 c = listNodeValue(ln);
843 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
844 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
845 (now - c->lastinteraction > server.maxidletime)) {
846 redisLog(REDIS_DEBUG,"Closing idle client");
847 freeClient(c);
848 }
849 }
850 }
851
852 static int htNeedsResize(dict *dict) {
853 long long size, used;
854
855 size = dictSlots(dict);
856 used = dictSize(dict);
857 return (size && used && size > DICT_HT_INITIAL_SIZE &&
858 (used*100/size < REDIS_HT_MINFILL));
859 }
860
861 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
862 * we resize the hash table to save memory */
863 static void tryResizeHashTables(void) {
864 int j;
865
866 for (j = 0; j < server.dbnum; j++) {
867 if (htNeedsResize(server.db[j].dict)) {
868 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
869 dictResize(server.db[j].dict);
870 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
871 }
872 if (htNeedsResize(server.db[j].expires))
873 dictResize(server.db[j].expires);
874 }
875 }
876
877 /* A background saving child (BGSAVE) terminated its work. Handle this. */
878 void backgroundSaveDoneHandler(int statloc) {
879 int exitcode = WEXITSTATUS(statloc);
880 int bysignal = WIFSIGNALED(statloc);
881
882 if (!bysignal && exitcode == 0) {
883 redisLog(REDIS_NOTICE,
884 "Background saving terminated with success");
885 server.dirty = 0;
886 server.lastsave = time(NULL);
887 } else if (!bysignal && exitcode != 0) {
888 redisLog(REDIS_WARNING, "Background saving error");
889 } else {
890 redisLog(REDIS_WARNING,
891 "Background saving terminated by signal");
892 rdbRemoveTempFile(server.bgsavechildpid);
893 }
894 server.bgsavechildpid = -1;
895 /* Possibly there are slaves waiting for a BGSAVE in order to be served
896 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
897 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
898 }
899
900 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
901 * Handle this. */
902 void backgroundRewriteDoneHandler(int statloc) {
903 int exitcode = WEXITSTATUS(statloc);
904 int bysignal = WIFSIGNALED(statloc);
905
906 if (!bysignal && exitcode == 0) {
907 int fd;
908 char tmpfile[256];
909
910 redisLog(REDIS_NOTICE,
911 "Background append only file rewriting terminated with success");
912 /* Now it's time to flush the differences accumulated by the parent */
913 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
914 fd = open(tmpfile,O_WRONLY|O_APPEND);
915 if (fd == -1) {
916 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
917 goto cleanup;
918 }
919 /* Flush our data... */
920 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
921 (signed) sdslen(server.bgrewritebuf)) {
922 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
923 close(fd);
924 goto cleanup;
925 }
926 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
927 /* Now our work is to rename the temp file into the stable file. And
928 * switch the file descriptor used by the server for append only. */
929 if (rename(tmpfile,server.appendfilename) == -1) {
930 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
931 close(fd);
932 goto cleanup;
933 }
934 /* Mission completed... almost */
935 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
936 if (server.appendfd != -1) {
937 /* If append only is actually enabled... */
938 close(server.appendfd);
939 server.appendfd = fd;
940 fsync(fd);
941 server.appendseldb = -1; /* Make sure it will issue SELECT */
942 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
943 } else {
944 /* If append only is disabled we just generate a dump in this
945 * format. Why not? */
946 close(fd);
947 }
948 } else if (!bysignal && exitcode != 0) {
949 redisLog(REDIS_WARNING, "Background append only file rewriting error");
950 } else {
951 redisLog(REDIS_WARNING,
952 "Background append only file rewriting terminated by signal");
953 }
954 cleanup:
955 sdsfree(server.bgrewritebuf);
956 server.bgrewritebuf = sdsempty();
957 aofRemoveTempFile(server.bgrewritechildpid);
958 server.bgrewritechildpid = -1;
959 }
960
961 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
962 int j, loops = server.cronloops++;
963 REDIS_NOTUSED(eventLoop);
964 REDIS_NOTUSED(id);
965 REDIS_NOTUSED(clientData);
966
967 /* Update the global state with the amount of used memory */
968 server.usedmemory = zmalloc_used_memory();
969
970 /* Show some info about non-empty databases */
971 for (j = 0; j < server.dbnum; j++) {
972 long long size, used, vkeys;
973
974 size = dictSlots(server.db[j].dict);
975 used = dictSize(server.db[j].dict);
976 vkeys = dictSize(server.db[j].expires);
977 if (!(loops % 5) && (used || vkeys)) {
978 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
979 /* dictPrintStats(server.dict); */
980 }
981 }
982
983 /* We don't want to resize the hash tables while a bacground saving
984 * is in progress: the saving child is created using fork() that is
985 * implemented with a copy-on-write semantic in most modern systems, so
986 * if we resize the HT while there is the saving child at work actually
987 * a lot of memory movements in the parent will cause a lot of pages
988 * copied. */
989 if (server.bgsavechildpid == -1) tryResizeHashTables();
990
991 /* Show information about connected clients */
992 if (!(loops % 5)) {
993 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
994 listLength(server.clients)-listLength(server.slaves),
995 listLength(server.slaves),
996 server.usedmemory,
997 dictSize(server.sharingpool));
998 }
999
1000 /* Close connections of timedout clients */
1001 if (server.maxidletime && !(loops % 10))
1002 closeTimedoutClients();
1003
1004 /* Check if a background saving or AOF rewrite in progress terminated */
1005 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1006 int statloc;
1007 pid_t pid;
1008
1009 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1010 if (pid == server.bgsavechildpid) {
1011 backgroundSaveDoneHandler(statloc);
1012 } else {
1013 backgroundRewriteDoneHandler(statloc);
1014 }
1015 }
1016 } else {
1017 /* If there is not a background saving in progress check if
1018 * we have to save now */
1019 time_t now = time(NULL);
1020 for (j = 0; j < server.saveparamslen; j++) {
1021 struct saveparam *sp = server.saveparams+j;
1022
1023 if (server.dirty >= sp->changes &&
1024 now-server.lastsave > sp->seconds) {
1025 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1026 sp->changes, sp->seconds);
1027 rdbSaveBackground(server.dbfilename);
1028 break;
1029 }
1030 }
1031 }
1032
1033 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1034 * will use few CPU cycles if there are few expiring keys, otherwise
1035 * it will get more aggressive to avoid that too much memory is used by
1036 * keys that can be removed from the keyspace. */
1037 for (j = 0; j < server.dbnum; j++) {
1038 int expired;
1039 redisDb *db = server.db+j;
1040
1041 /* Continue to expire if at the end of the cycle more than 25%
1042 * of the keys were expired. */
1043 do {
1044 int num = dictSize(db->expires);
1045 time_t now = time(NULL);
1046
1047 expired = 0;
1048 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1049 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1050 while (num--) {
1051 dictEntry *de;
1052 time_t t;
1053
1054 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1055 t = (time_t) dictGetEntryVal(de);
1056 if (now > t) {
1057 deleteKey(db,dictGetEntryKey(de));
1058 expired++;
1059 }
1060 }
1061 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1062 }
1063
1064 /* Check if we should connect to a MASTER */
1065 if (server.replstate == REDIS_REPL_CONNECT) {
1066 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1067 if (syncWithMaster() == REDIS_OK) {
1068 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1069 }
1070 }
1071 return 1000;
1072 }
1073
1074 static void createSharedObjects(void) {
1075 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1076 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1077 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1078 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1079 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1080 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1081 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1082 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1083 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1084 /* no such key */
1085 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1086 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1087 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1088 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1089 "-ERR no such key\r\n"));
1090 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1091 "-ERR syntax error\r\n"));
1092 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1093 "-ERR source and destination objects are the same\r\n"));
1094 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1095 "-ERR index out of range\r\n"));
1096 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1097 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1098 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1099 shared.select0 = createStringObject("select 0\r\n",10);
1100 shared.select1 = createStringObject("select 1\r\n",10);
1101 shared.select2 = createStringObject("select 2\r\n",10);
1102 shared.select3 = createStringObject("select 3\r\n",10);
1103 shared.select4 = createStringObject("select 4\r\n",10);
1104 shared.select5 = createStringObject("select 5\r\n",10);
1105 shared.select6 = createStringObject("select 6\r\n",10);
1106 shared.select7 = createStringObject("select 7\r\n",10);
1107 shared.select8 = createStringObject("select 8\r\n",10);
1108 shared.select9 = createStringObject("select 9\r\n",10);
1109 }
1110
1111 static void appendServerSaveParams(time_t seconds, int changes) {
1112 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1113 server.saveparams[server.saveparamslen].seconds = seconds;
1114 server.saveparams[server.saveparamslen].changes = changes;
1115 server.saveparamslen++;
1116 }
1117
1118 static void resetServerSaveParams() {
1119 zfree(server.saveparams);
1120 server.saveparams = NULL;
1121 server.saveparamslen = 0;
1122 }
1123
1124 static void initServerConfig() {
1125 server.dbnum = REDIS_DEFAULT_DBNUM;
1126 server.port = REDIS_SERVERPORT;
1127 server.verbosity = REDIS_DEBUG;
1128 server.maxidletime = REDIS_MAXIDLETIME;
1129 server.saveparams = NULL;
1130 server.logfile = NULL; /* NULL = log on standard output */
1131 server.bindaddr = NULL;
1132 server.glueoutputbuf = 1;
1133 server.daemonize = 0;
1134 server.appendonly = 0;
1135 server.appendfsync = APPENDFSYNC_ALWAYS;
1136 server.lastfsync = time(NULL);
1137 server.appendfd = -1;
1138 server.appendseldb = -1; /* Make sure the first time will not match */
1139 server.pidfile = "/var/run/redis.pid";
1140 server.dbfilename = "dump.rdb";
1141 server.appendfilename = "appendonly.aof";
1142 server.requirepass = NULL;
1143 server.shareobjects = 0;
1144 server.sharingpoolsize = 1024;
1145 server.maxclients = 0;
1146 server.maxmemory = 0;
1147 resetServerSaveParams();
1148
1149 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1150 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1151 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1152 /* Replication related */
1153 server.isslave = 0;
1154 server.masterauth = NULL;
1155 server.masterhost = NULL;
1156 server.masterport = 6379;
1157 server.master = NULL;
1158 server.replstate = REDIS_REPL_NONE;
1159
1160 /* Double constants initialization */
1161 R_Zero = 0.0;
1162 R_PosInf = 1.0/R_Zero;
1163 R_NegInf = -1.0/R_Zero;
1164 R_Nan = R_Zero/R_Zero;
1165 }
1166
1167 static void initServer() {
1168 int j;
1169
1170 signal(SIGHUP, SIG_IGN);
1171 signal(SIGPIPE, SIG_IGN);
1172 setupSigSegvAction();
1173
1174 server.clients = listCreate();
1175 server.slaves = listCreate();
1176 server.monitors = listCreate();
1177 server.objfreelist = listCreate();
1178 createSharedObjects();
1179 server.el = aeCreateEventLoop();
1180 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1181 server.sharingpool = dictCreate(&setDictType,NULL);
1182 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1183 if (server.fd == -1) {
1184 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1185 exit(1);
1186 }
1187 for (j = 0; j < server.dbnum; j++) {
1188 server.db[j].dict = dictCreate(&hashDictType,NULL);
1189 server.db[j].expires = dictCreate(&setDictType,NULL);
1190 server.db[j].id = j;
1191 }
1192 server.cronloops = 0;
1193 server.bgsavechildpid = -1;
1194 server.bgrewritechildpid = -1;
1195 server.bgrewritebuf = sdsempty();
1196 server.lastsave = time(NULL);
1197 server.dirty = 0;
1198 server.usedmemory = 0;
1199 server.stat_numcommands = 0;
1200 server.stat_numconnections = 0;
1201 server.stat_starttime = time(NULL);
1202 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1203
1204 if (server.appendonly) {
1205 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1206 if (server.appendfd == -1) {
1207 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1208 strerror(errno));
1209 exit(1);
1210 }
1211 }
1212 }
1213
1214 /* Empty the whole database */
1215 static long long emptyDb() {
1216 int j;
1217 long long removed = 0;
1218
1219 for (j = 0; j < server.dbnum; j++) {
1220 removed += dictSize(server.db[j].dict);
1221 dictEmpty(server.db[j].dict);
1222 dictEmpty(server.db[j].expires);
1223 }
1224 return removed;
1225 }
1226
/* Parse a boolean configuration argument, ignoring case.
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. */
static int yesnotoi(char *s) {
    int result = -1;

    if (strcasecmp(s,"yes") == 0)
        result = 1;
    else if (strcasecmp(s,"no") == 0)
        result = 0;
    return result;
}
1232
1233 /* I agree, this is a very rudimental way to load a configuration...
1234 will improve later if the config gets more complex */
1235 static void loadServerConfig(char *filename) {
1236 FILE *fp;
1237 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1238 int linenum = 0;
1239 sds line = NULL;
1240
1241 if (filename[0] == '-' && filename[1] == '\0')
1242 fp = stdin;
1243 else {
1244 if ((fp = fopen(filename,"r")) == NULL) {
1245 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1246 exit(1);
1247 }
1248 }
1249
1250 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1251 sds *argv;
1252 int argc, j;
1253
1254 linenum++;
1255 line = sdsnew(buf);
1256 line = sdstrim(line," \t\r\n");
1257
1258 /* Skip comments and blank lines*/
1259 if (line[0] == '#' || line[0] == '\0') {
1260 sdsfree(line);
1261 continue;
1262 }
1263
1264 /* Split into arguments */
1265 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1266 sdstolower(argv[0]);
1267
1268 /* Execute config directives */
1269 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1270 server.maxidletime = atoi(argv[1]);
1271 if (server.maxidletime < 0) {
1272 err = "Invalid timeout value"; goto loaderr;
1273 }
1274 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1275 server.port = atoi(argv[1]);
1276 if (server.port < 1 || server.port > 65535) {
1277 err = "Invalid port"; goto loaderr;
1278 }
1279 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1280 server.bindaddr = zstrdup(argv[1]);
1281 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1282 int seconds = atoi(argv[1]);
1283 int changes = atoi(argv[2]);
1284 if (seconds < 1 || changes < 0) {
1285 err = "Invalid save parameters"; goto loaderr;
1286 }
1287 appendServerSaveParams(seconds,changes);
1288 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1289 if (chdir(argv[1]) == -1) {
1290 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1291 argv[1], strerror(errno));
1292 exit(1);
1293 }
1294 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1295 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1296 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1297 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1298 else {
1299 err = "Invalid log level. Must be one of debug, notice, warning";
1300 goto loaderr;
1301 }
1302 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1303 FILE *logfp;
1304
1305 server.logfile = zstrdup(argv[1]);
1306 if (!strcasecmp(server.logfile,"stdout")) {
1307 zfree(server.logfile);
1308 server.logfile = NULL;
1309 }
1310 if (server.logfile) {
1311 /* Test if we are able to open the file. The server will not
1312 * be able to abort just for this problem later... */
1313 logfp = fopen(server.logfile,"a");
1314 if (logfp == NULL) {
1315 err = sdscatprintf(sdsempty(),
1316 "Can't open the log file: %s", strerror(errno));
1317 goto loaderr;
1318 }
1319 fclose(logfp);
1320 }
1321 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1322 server.dbnum = atoi(argv[1]);
1323 if (server.dbnum < 1) {
1324 err = "Invalid number of databases"; goto loaderr;
1325 }
1326 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1327 server.maxclients = atoi(argv[1]);
1328 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1329 server.maxmemory = strtoll(argv[1], NULL, 10);
1330 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1331 server.masterhost = sdsnew(argv[1]);
1332 server.masterport = atoi(argv[2]);
1333 server.replstate = REDIS_REPL_CONNECT;
1334 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1335 server.masterauth = zstrdup(argv[1]);
1336 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1337 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1338 err = "argument must be 'yes' or 'no'"; goto loaderr;
1339 }
1340 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1341 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1342 err = "argument must be 'yes' or 'no'"; goto loaderr;
1343 }
1344 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1345 server.sharingpoolsize = atoi(argv[1]);
1346 if (server.sharingpoolsize < 1) {
1347 err = "invalid object sharing pool size"; goto loaderr;
1348 }
1349 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1350 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1351 err = "argument must be 'yes' or 'no'"; goto loaderr;
1352 }
1353 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1354 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1355 err = "argument must be 'yes' or 'no'"; goto loaderr;
1356 }
1357 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1358 if (!strcasecmp(argv[1],"no")) {
1359 server.appendfsync = APPENDFSYNC_NO;
1360 } else if (!strcasecmp(argv[1],"always")) {
1361 server.appendfsync = APPENDFSYNC_ALWAYS;
1362 } else if (!strcasecmp(argv[1],"everysec")) {
1363 server.appendfsync = APPENDFSYNC_EVERYSEC;
1364 } else {
1365 err = "argument must be 'no', 'always' or 'everysec'";
1366 goto loaderr;
1367 }
1368 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1369 server.requirepass = zstrdup(argv[1]);
1370 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1371 server.pidfile = zstrdup(argv[1]);
1372 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1373 server.dbfilename = zstrdup(argv[1]);
1374 } else {
1375 err = "Bad directive or wrong number of arguments"; goto loaderr;
1376 }
1377 for (j = 0; j < argc; j++)
1378 sdsfree(argv[j]);
1379 zfree(argv);
1380 sdsfree(line);
1381 }
1382 if (fp != stdin) fclose(fp);
1383 return;
1384
1385 loaderr:
1386 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1387 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1388 fprintf(stderr, ">>> '%s'\n", line);
1389 fprintf(stderr, "%s\n", err);
1390 exit(1);
1391 }
1392
1393 static void freeClientArgv(redisClient *c) {
1394 int j;
1395
1396 for (j = 0; j < c->argc; j++)
1397 decrRefCount(c->argv[j]);
1398 for (j = 0; j < c->mbargc; j++)
1399 decrRefCount(c->mbargv[j]);
1400 c->argc = 0;
1401 c->mbargc = 0;
1402 }
1403
1404 static void freeClient(redisClient *c) {
1405 listNode *ln;
1406
1407 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1408 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1409 sdsfree(c->querybuf);
1410 listRelease(c->reply);
1411 freeClientArgv(c);
1412 close(c->fd);
1413 ln = listSearchKey(server.clients,c);
1414 redisAssert(ln != NULL);
1415 listDelNode(server.clients,ln);
1416 if (c->flags & REDIS_SLAVE) {
1417 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1418 close(c->repldbfd);
1419 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1420 ln = listSearchKey(l,c);
1421 redisAssert(ln != NULL);
1422 listDelNode(l,ln);
1423 }
1424 if (c->flags & REDIS_MASTER) {
1425 server.master = NULL;
1426 server.replstate = REDIS_REPL_CONNECT;
1427 }
1428 zfree(c->argv);
1429 zfree(c->mbargv);
1430 zfree(c);
1431 }
1432
1433 #define GLUEREPLY_UP_TO (1024)
1434 static void glueReplyBuffersIfNeeded(redisClient *c) {
1435 int copylen = 0;
1436 char buf[GLUEREPLY_UP_TO];
1437 listNode *ln;
1438 robj *o;
1439
1440 listRewind(c->reply);
1441 while((ln = listYield(c->reply))) {
1442 int objlen;
1443
1444 o = ln->value;
1445 objlen = sdslen(o->ptr);
1446 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1447 memcpy(buf+copylen,o->ptr,objlen);
1448 copylen += objlen;
1449 listDelNode(c->reply,ln);
1450 } else {
1451 if (copylen == 0) return;
1452 break;
1453 }
1454 }
1455 /* Now the output buffer is empty, add the new single element */
1456 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1457 listAddNodeHead(c->reply,o);
1458 }
1459
1460 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1461 redisClient *c = privdata;
1462 int nwritten = 0, totwritten = 0, objlen;
1463 robj *o;
1464 REDIS_NOTUSED(el);
1465 REDIS_NOTUSED(mask);
1466
1467 /* Use writev() if we have enough buffers to send */
1468 if (!server.glueoutputbuf &&
1469 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1470 !(c->flags & REDIS_MASTER))
1471 {
1472 sendReplyToClientWritev(el, fd, privdata, mask);
1473 return;
1474 }
1475
1476 while(listLength(c->reply)) {
1477 if (server.glueoutputbuf && listLength(c->reply) > 1)
1478 glueReplyBuffersIfNeeded(c);
1479
1480 o = listNodeValue(listFirst(c->reply));
1481 objlen = sdslen(o->ptr);
1482
1483 if (objlen == 0) {
1484 listDelNode(c->reply,listFirst(c->reply));
1485 continue;
1486 }
1487
1488 if (c->flags & REDIS_MASTER) {
1489 /* Don't reply to a master */
1490 nwritten = objlen - c->sentlen;
1491 } else {
1492 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1493 if (nwritten <= 0) break;
1494 }
1495 c->sentlen += nwritten;
1496 totwritten += nwritten;
1497 /* If we fully sent the object on head go to the next one */
1498 if (c->sentlen == objlen) {
1499 listDelNode(c->reply,listFirst(c->reply));
1500 c->sentlen = 0;
1501 }
1502 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1503 * bytes, in a single threaded server it's a good idea to serve
1504 * other clients as well, even if a very large request comes from
1505 * super fast link that is always able to accept data (in real world
1506 * scenario think about 'KEYS *' against the loopback interfae) */
1507 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1508 }
1509 if (nwritten == -1) {
1510 if (errno == EAGAIN) {
1511 nwritten = 0;
1512 } else {
1513 redisLog(REDIS_DEBUG,
1514 "Error writing to client: %s", strerror(errno));
1515 freeClient(c);
1516 return;
1517 }
1518 }
1519 if (totwritten > 0) c->lastinteraction = time(NULL);
1520 if (listLength(c->reply) == 0) {
1521 c->sentlen = 0;
1522 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1523 }
1524 }
1525
1526 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1527 {
1528 redisClient *c = privdata;
1529 int nwritten = 0, totwritten = 0, objlen, willwrite;
1530 robj *o;
1531 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1532 int offset, ion = 0;
1533 REDIS_NOTUSED(el);
1534 REDIS_NOTUSED(mask);
1535
1536 listNode *node;
1537 while (listLength(c->reply)) {
1538 offset = c->sentlen;
1539 ion = 0;
1540 willwrite = 0;
1541
1542 /* fill-in the iov[] array */
1543 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1544 o = listNodeValue(node);
1545 objlen = sdslen(o->ptr);
1546
1547 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1548 break;
1549
1550 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1551 break; /* no more iovecs */
1552
1553 iov[ion].iov_base = ((char*)o->ptr) + offset;
1554 iov[ion].iov_len = objlen - offset;
1555 willwrite += objlen - offset;
1556 offset = 0; /* just for the first item */
1557 ion++;
1558 }
1559
1560 if(willwrite == 0)
1561 break;
1562
1563 /* write all collected blocks at once */
1564 if((nwritten = writev(fd, iov, ion)) < 0) {
1565 if (errno != EAGAIN) {
1566 redisLog(REDIS_DEBUG,
1567 "Error writing to client: %s", strerror(errno));
1568 freeClient(c);
1569 return;
1570 }
1571 break;
1572 }
1573
1574 totwritten += nwritten;
1575 offset = c->sentlen;
1576
1577 /* remove written robjs from c->reply */
1578 while (nwritten && listLength(c->reply)) {
1579 o = listNodeValue(listFirst(c->reply));
1580 objlen = sdslen(o->ptr);
1581
1582 if(nwritten >= objlen - offset) {
1583 listDelNode(c->reply, listFirst(c->reply));
1584 nwritten -= objlen - offset;
1585 c->sentlen = 0;
1586 } else {
1587 /* partial write */
1588 c->sentlen += nwritten;
1589 break;
1590 }
1591 offset = 0;
1592 }
1593 }
1594
1595 if (totwritten > 0)
1596 c->lastinteraction = time(NULL);
1597
1598 if (listLength(c->reply) == 0) {
1599 c->sentlen = 0;
1600 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1601 }
1602 }
1603
1604 static struct redisCommand *lookupCommand(char *name) {
1605 int j = 0;
1606 while(cmdTable[j].name != NULL) {
1607 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1608 j++;
1609 }
1610 return NULL;
1611 }
1612
1613 /* resetClient prepare the client to process the next command */
1614 static void resetClient(redisClient *c) {
1615 freeClientArgv(c);
1616 c->bulklen = -1;
1617 c->multibulk = 0;
1618 }
1619
1620 /* If this function gets called we already read a whole
1621 * command, argments are in the client argv/argc fields.
1622 * processCommand() execute the command or prepare the
1623 * server for a bulk read from the client.
1624 *
1625 * If 1 is returned the client is still alive and valid and
1626 * and other operations can be performed by the caller. Otherwise
1627 * if 0 is returned the client was destroied (i.e. after QUIT). */
1628 static int processCommand(redisClient *c) {
1629 struct redisCommand *cmd;
1630 long long dirty;
1631
1632 /* Free some memory if needed (maxmemory setting) */
1633 if (server.maxmemory) freeMemoryIfNeeded();
1634
1635 /* Handle the multi bulk command type. This is an alternative protocol
1636 * supported by Redis in order to receive commands that are composed of
1637 * multiple binary-safe "bulk" arguments. The latency of processing is
1638 * a bit higher but this allows things like multi-sets, so if this
1639 * protocol is used only for MSET and similar commands this is a big win. */
1640 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1641 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1642 if (c->multibulk <= 0) {
1643 resetClient(c);
1644 return 1;
1645 } else {
1646 decrRefCount(c->argv[c->argc-1]);
1647 c->argc--;
1648 return 1;
1649 }
1650 } else if (c->multibulk) {
1651 if (c->bulklen == -1) {
1652 if (((char*)c->argv[0]->ptr)[0] != '$') {
1653 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1654 resetClient(c);
1655 return 1;
1656 } else {
1657 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1658 decrRefCount(c->argv[0]);
1659 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1660 c->argc--;
1661 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1662 resetClient(c);
1663 return 1;
1664 }
1665 c->argc--;
1666 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1667 return 1;
1668 }
1669 } else {
1670 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1671 c->mbargv[c->mbargc] = c->argv[0];
1672 c->mbargc++;
1673 c->argc--;
1674 c->multibulk--;
1675 if (c->multibulk == 0) {
1676 robj **auxargv;
1677 int auxargc;
1678
1679 /* Here we need to swap the multi-bulk argc/argv with the
1680 * normal argc/argv of the client structure. */
1681 auxargv = c->argv;
1682 c->argv = c->mbargv;
1683 c->mbargv = auxargv;
1684
1685 auxargc = c->argc;
1686 c->argc = c->mbargc;
1687 c->mbargc = auxargc;
1688
1689 /* We need to set bulklen to something different than -1
1690 * in order for the code below to process the command without
1691 * to try to read the last argument of a bulk command as
1692 * a special argument. */
1693 c->bulklen = 0;
1694 /* continue below and process the command */
1695 } else {
1696 c->bulklen = -1;
1697 return 1;
1698 }
1699 }
1700 }
1701 /* -- end of multi bulk commands processing -- */
1702
1703 /* The QUIT command is handled as a special case. Normal command
1704 * procs are unable to close the client connection safely */
1705 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1706 freeClient(c);
1707 return 0;
1708 }
1709 cmd = lookupCommand(c->argv[0]->ptr);
1710 if (!cmd) {
1711 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1712 resetClient(c);
1713 return 1;
1714 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1715 (c->argc < -cmd->arity)) {
1716 addReplySds(c,
1717 sdscatprintf(sdsempty(),
1718 "-ERR wrong number of arguments for '%s' command\r\n",
1719 cmd->name));
1720 resetClient(c);
1721 return 1;
1722 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1723 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1724 resetClient(c);
1725 return 1;
1726 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1727 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1728
1729 decrRefCount(c->argv[c->argc-1]);
1730 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1731 c->argc--;
1732 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1733 resetClient(c);
1734 return 1;
1735 }
1736 c->argc--;
1737 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1738 /* It is possible that the bulk read is already in the
1739 * buffer. Check this condition and handle it accordingly.
1740 * This is just a fast path, alternative to call processInputBuffer().
1741 * It's a good idea since the code is small and this condition
1742 * happens most of the times. */
1743 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1744 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1745 c->argc++;
1746 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1747 } else {
1748 return 1;
1749 }
1750 }
1751 /* Let's try to share objects on the command arguments vector */
1752 if (server.shareobjects) {
1753 int j;
1754 for(j = 1; j < c->argc; j++)
1755 c->argv[j] = tryObjectSharing(c->argv[j]);
1756 }
1757 /* Let's try to encode the bulk object to save space. */
1758 if (cmd->flags & REDIS_CMD_BULK)
1759 tryObjectEncoding(c->argv[c->argc-1]);
1760
1761 /* Check if the user is authenticated */
1762 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1763 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1764 resetClient(c);
1765 return 1;
1766 }
1767
1768 /* Exec the command */
1769 dirty = server.dirty;
1770 cmd->proc(c);
1771 if (server.appendonly && server.dirty-dirty)
1772 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1773 if (server.dirty-dirty && listLength(server.slaves))
1774 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1775 if (listLength(server.monitors))
1776 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1777 server.stat_numcommands++;
1778
1779 /* Prepare the client for the next command */
1780 if (c->flags & REDIS_CLOSE) {
1781 freeClient(c);
1782 return 0;
1783 }
1784 resetClient(c);
1785 return 1;
1786 }
1787
1788 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1789 listNode *ln;
1790 int outc = 0, j;
1791 robj **outv;
1792 /* (args*2)+1 is enough room for args, spaces, newlines */
1793 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1794
1795 if (argc <= REDIS_STATIC_ARGS) {
1796 outv = static_outv;
1797 } else {
1798 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1799 }
1800
1801 for (j = 0; j < argc; j++) {
1802 if (j != 0) outv[outc++] = shared.space;
1803 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1804 robj *lenobj;
1805
1806 lenobj = createObject(REDIS_STRING,
1807 sdscatprintf(sdsempty(),"%lu\r\n",
1808 (unsigned long) stringObjectLen(argv[j])));
1809 lenobj->refcount = 0;
1810 outv[outc++] = lenobj;
1811 }
1812 outv[outc++] = argv[j];
1813 }
1814 outv[outc++] = shared.crlf;
1815
1816 /* Increment all the refcounts at start and decrement at end in order to
1817 * be sure to free objects if there is no slave in a replication state
1818 * able to be feed with commands */
1819 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1820 listRewind(slaves);
1821 while((ln = listYield(slaves))) {
1822 redisClient *slave = ln->value;
1823
1824 /* Don't feed slaves that are still waiting for BGSAVE to start */
1825 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1826
1827 /* Feed all the other slaves, MONITORs and so on */
1828 if (slave->slaveseldb != dictid) {
1829 robj *selectcmd;
1830
1831 switch(dictid) {
1832 case 0: selectcmd = shared.select0; break;
1833 case 1: selectcmd = shared.select1; break;
1834 case 2: selectcmd = shared.select2; break;
1835 case 3: selectcmd = shared.select3; break;
1836 case 4: selectcmd = shared.select4; break;
1837 case 5: selectcmd = shared.select5; break;
1838 case 6: selectcmd = shared.select6; break;
1839 case 7: selectcmd = shared.select7; break;
1840 case 8: selectcmd = shared.select8; break;
1841 case 9: selectcmd = shared.select9; break;
1842 default:
1843 selectcmd = createObject(REDIS_STRING,
1844 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1845 selectcmd->refcount = 0;
1846 break;
1847 }
1848 addReply(slave,selectcmd);
1849 slave->slaveseldb = dictid;
1850 }
1851 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1852 }
1853 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1854 if (outv != static_outv) zfree(outv);
1855 }
1856
1857 static void processInputBuffer(redisClient *c) {
1858 again:
1859 if (c->bulklen == -1) {
1860 /* Read the first line of the query */
1861 char *p = strchr(c->querybuf,'\n');
1862 size_t querylen;
1863
1864 if (p) {
1865 sds query, *argv;
1866 int argc, j;
1867
1868 query = c->querybuf;
1869 c->querybuf = sdsempty();
1870 querylen = 1+(p-(query));
1871 if (sdslen(query) > querylen) {
1872 /* leave data after the first line of the query in the buffer */
1873 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1874 }
1875 *p = '\0'; /* remove "\n" */
1876 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1877 sdsupdatelen(query);
1878
1879 /* Now we can split the query in arguments */
1880 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1881 sdsfree(query);
1882
1883 if (c->argv) zfree(c->argv);
1884 c->argv = zmalloc(sizeof(robj*)*argc);
1885
1886 for (j = 0; j < argc; j++) {
1887 if (sdslen(argv[j])) {
1888 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1889 c->argc++;
1890 } else {
1891 sdsfree(argv[j]);
1892 }
1893 }
1894 zfree(argv);
1895 if (c->argc) {
1896 /* Execute the command. If the client is still valid
1897 * after processCommand() return and there is something
1898 * on the query buffer try to process the next command. */
1899 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1900 } else {
1901 /* Nothing to process, argc == 0. Just process the query
1902 * buffer if it's not empty or return to the caller */
1903 if (sdslen(c->querybuf)) goto again;
1904 }
1905 return;
1906 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1907 redisLog(REDIS_DEBUG, "Client protocol error");
1908 freeClient(c);
1909 return;
1910 }
1911 } else {
1912 /* Bulk read handling. Note that if we are at this point
1913 the client already sent a command terminated with a newline,
1914 we are reading the bulk data that is actually the last
1915 argument of the command. */
1916 int qbl = sdslen(c->querybuf);
1917
1918 if (c->bulklen <= qbl) {
1919 /* Copy everything but the final CRLF as final argument */
1920 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1921 c->argc++;
1922 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1923 /* Process the command. If the client is still valid after
1924 * the processing and there is more data in the buffer
1925 * try to parse it. */
1926 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1927 return;
1928 }
1929 }
1930 }
1931
1932 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1933 redisClient *c = (redisClient*) privdata;
1934 char buf[REDIS_IOBUF_LEN];
1935 int nread;
1936 REDIS_NOTUSED(el);
1937 REDIS_NOTUSED(mask);
1938
1939 nread = read(fd, buf, REDIS_IOBUF_LEN);
1940 if (nread == -1) {
1941 if (errno == EAGAIN) {
1942 nread = 0;
1943 } else {
1944 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1945 freeClient(c);
1946 return;
1947 }
1948 } else if (nread == 0) {
1949 redisLog(REDIS_DEBUG, "Client closed connection");
1950 freeClient(c);
1951 return;
1952 }
1953 if (nread) {
1954 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1955 c->lastinteraction = time(NULL);
1956 } else {
1957 return;
1958 }
1959 processInputBuffer(c);
1960 }
1961
1962 static int selectDb(redisClient *c, int id) {
1963 if (id < 0 || id >= server.dbnum)
1964 return REDIS_ERR;
1965 c->db = &server.db[id];
1966 return REDIS_OK;
1967 }
1968
1969 static void *dupClientReplyValue(void *o) {
1970 incrRefCount((robj*)o);
1971 return 0;
1972 }
1973
1974 static redisClient *createClient(int fd) {
1975 redisClient *c = zmalloc(sizeof(*c));
1976
1977 anetNonBlock(NULL,fd);
1978 anetTcpNoDelay(NULL,fd);
1979 if (!c) return NULL;
1980 selectDb(c,0);
1981 c->fd = fd;
1982 c->querybuf = sdsempty();
1983 c->argc = 0;
1984 c->argv = NULL;
1985 c->bulklen = -1;
1986 c->multibulk = 0;
1987 c->mbargc = 0;
1988 c->mbargv = NULL;
1989 c->sentlen = 0;
1990 c->flags = 0;
1991 c->lastinteraction = time(NULL);
1992 c->authenticated = 0;
1993 c->replstate = REDIS_REPL_NONE;
1994 c->reply = listCreate();
1995 listSetFreeMethod(c->reply,decrRefCount);
1996 listSetDupMethod(c->reply,dupClientReplyValue);
1997 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1998 readQueryFromClient, c) == AE_ERR) {
1999 freeClient(c);
2000 return NULL;
2001 }
2002 listAddNodeTail(server.clients,c);
2003 return c;
2004 }
2005
2006 static void addReply(redisClient *c, robj *obj) {
2007 if (listLength(c->reply) == 0 &&
2008 (c->replstate == REDIS_REPL_NONE ||
2009 c->replstate == REDIS_REPL_ONLINE) &&
2010 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2011 sendReplyToClient, c) == AE_ERR) return;
2012 listAddNodeTail(c->reply,getDecodedObject(obj));
2013 }
2014
2015 static void addReplySds(redisClient *c, sds s) {
2016 robj *o = createObject(REDIS_STRING,s);
2017 addReply(c,o);
2018 decrRefCount(o);
2019 }
2020
2021 static void addReplyDouble(redisClient *c, double d) {
2022 char buf[128];
2023
2024 snprintf(buf,sizeof(buf),"%.17g",d);
2025 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2026 (unsigned long) strlen(buf),buf));
2027 }
2028
2029 static void addReplyBulkLen(redisClient *c, robj *obj) {
2030 size_t len;
2031
2032 if (obj->encoding == REDIS_ENCODING_RAW) {
2033 len = sdslen(obj->ptr);
2034 } else {
2035 long n = (long)obj->ptr;
2036
2037 len = 1;
2038 if (n < 0) {
2039 len++;
2040 n = -n;
2041 }
2042 while((n = n/10) != 0) {
2043 len++;
2044 }
2045 }
2046 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2047 }
2048
2049 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2050 int cport, cfd;
2051 char cip[128];
2052 redisClient *c;
2053 REDIS_NOTUSED(el);
2054 REDIS_NOTUSED(mask);
2055 REDIS_NOTUSED(privdata);
2056
2057 cfd = anetAccept(server.neterr, fd, cip, &cport);
2058 if (cfd == AE_ERR) {
2059 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2060 return;
2061 }
2062 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2063 if ((c = createClient(cfd)) == NULL) {
2064 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2065 close(cfd); /* May be already closed, just ingore errors */
2066 return;
2067 }
2068 /* If maxclient directive is set and this is one client more... close the
2069 * connection. Note that we create the client instead to check before
2070 * for this condition, since now the socket is already set in nonblocking
2071 * mode and we can send an error for free using the Kernel I/O */
2072 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2073 char *err = "-ERR max number of clients reached\r\n";
2074
2075 /* That's a best effort error message, don't check write errors */
2076 if (write(c->fd,err,strlen(err)) == -1) {
2077 /* Nothing to do, Just to avoid the warning... */
2078 }
2079 freeClient(c);
2080 return;
2081 }
2082 server.stat_numconnections++;
2083 }
2084
2085 /* ======================= Redis objects implementation ===================== */
2086
2087 static robj *createObject(int type, void *ptr) {
2088 robj *o;
2089
2090 if (listLength(server.objfreelist)) {
2091 listNode *head = listFirst(server.objfreelist);
2092 o = listNodeValue(head);
2093 listDelNode(server.objfreelist,head);
2094 } else {
2095 o = zmalloc(sizeof(*o));
2096 }
2097 o->type = type;
2098 o->encoding = REDIS_ENCODING_RAW;
2099 o->ptr = ptr;
2100 o->refcount = 1;
2101 return o;
2102 }
2103
2104 static robj *createStringObject(char *ptr, size_t len) {
2105 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2106 }
2107
2108 static robj *createListObject(void) {
2109 list *l = listCreate();
2110
2111 listSetFreeMethod(l,decrRefCount);
2112 return createObject(REDIS_LIST,l);
2113 }
2114
2115 static robj *createSetObject(void) {
2116 dict *d = dictCreate(&setDictType,NULL);
2117 return createObject(REDIS_SET,d);
2118 }
2119
2120 static robj *createZsetObject(void) {
2121 zset *zs = zmalloc(sizeof(*zs));
2122
2123 zs->dict = dictCreate(&zsetDictType,NULL);
2124 zs->zsl = zslCreate();
2125 return createObject(REDIS_ZSET,zs);
2126 }
2127
2128 static void freeStringObject(robj *o) {
2129 if (o->encoding == REDIS_ENCODING_RAW) {
2130 sdsfree(o->ptr);
2131 }
2132 }
2133
2134 static void freeListObject(robj *o) {
2135 listRelease((list*) o->ptr);
2136 }
2137
2138 static void freeSetObject(robj *o) {
2139 dictRelease((dict*) o->ptr);
2140 }
2141
2142 static void freeZsetObject(robj *o) {
2143 zset *zs = o->ptr;
2144
2145 dictRelease(zs->dict);
2146 zslFree(zs->zsl);
2147 zfree(zs);
2148 }
2149
2150 static void freeHashObject(robj *o) {
2151 dictRelease((dict*) o->ptr);
2152 }
2153
2154 static void incrRefCount(robj *o) {
2155 o->refcount++;
2156 #ifdef DEBUG_REFCOUNT
2157 if (o->type == REDIS_STRING)
2158 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2159 #endif
2160 }
2161
2162 static void decrRefCount(void *obj) {
2163 robj *o = obj;
2164
2165 #ifdef DEBUG_REFCOUNT
2166 if (o->type == REDIS_STRING)
2167 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2168 #endif
2169 if (--(o->refcount) == 0) {
2170 switch(o->type) {
2171 case REDIS_STRING: freeStringObject(o); break;
2172 case REDIS_LIST: freeListObject(o); break;
2173 case REDIS_SET: freeSetObject(o); break;
2174 case REDIS_ZSET: freeZsetObject(o); break;
2175 case REDIS_HASH: freeHashObject(o); break;
2176 default: redisAssert(0 != 0); break;
2177 }
2178 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2179 !listAddNodeHead(server.objfreelist,o))
2180 zfree(o);
2181 }
2182 }
2183
2184 static robj *lookupKey(redisDb *db, robj *key) {
2185 dictEntry *de = dictFind(db->dict,key);
2186 return de ? dictGetEntryVal(de) : NULL;
2187 }
2188
2189 static robj *lookupKeyRead(redisDb *db, robj *key) {
2190 expireIfNeeded(db,key);
2191 return lookupKey(db,key);
2192 }
2193
2194 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2195 deleteIfVolatile(db,key);
2196 return lookupKey(db,key);
2197 }
2198
2199 static int deleteKey(redisDb *db, robj *key) {
2200 int retval;
2201
2202 /* We need to protect key from destruction: after the first dictDelete()
2203 * it may happen that 'key' is no longer valid if we don't increment
2204 * it's count. This may happen when we get the object reference directly
2205 * from the hash table with dictRandomKey() or dict iterators */
2206 incrRefCount(key);
2207 if (dictSize(db->expires)) dictDelete(db->expires,key);
2208 retval = dictDelete(db->dict,key);
2209 decrRefCount(key);
2210
2211 return retval == DICT_OK;
2212 }
2213
2214 /* Try to share an object against the shared objects pool */
2215 static robj *tryObjectSharing(robj *o) {
2216 struct dictEntry *de;
2217 unsigned long c;
2218
2219 if (o == NULL || server.shareobjects == 0) return o;
2220
2221 redisAssert(o->type == REDIS_STRING);
2222 de = dictFind(server.sharingpool,o);
2223 if (de) {
2224 robj *shared = dictGetEntryKey(de);
2225
2226 c = ((unsigned long) dictGetEntryVal(de))+1;
2227 dictGetEntryVal(de) = (void*) c;
2228 incrRefCount(shared);
2229 decrRefCount(o);
2230 return shared;
2231 } else {
2232 /* Here we are using a stream algorihtm: Every time an object is
2233 * shared we increment its count, everytime there is a miss we
2234 * recrement the counter of a random object. If this object reaches
2235 * zero we remove the object and put the current object instead. */
2236 if (dictSize(server.sharingpool) >=
2237 server.sharingpoolsize) {
2238 de = dictGetRandomKey(server.sharingpool);
2239 redisAssert(de != NULL);
2240 c = ((unsigned long) dictGetEntryVal(de))-1;
2241 dictGetEntryVal(de) = (void*) c;
2242 if (c == 0) {
2243 dictDelete(server.sharingpool,de->key);
2244 }
2245 } else {
2246 c = 0; /* If the pool is empty we want to add this object */
2247 }
2248 if (c == 0) {
2249 int retval;
2250
2251 retval = dictAdd(server.sharingpool,o,(void*)1);
2252 redisAssert(retval == DICT_OK);
2253 incrRefCount(o);
2254 }
2255 return o;
2256 }
2257 }
2258
2259 /* Check if the nul-terminated string 's' can be represented by a long
2260 * (that is, is a number that fits into long without any other space or
2261 * character before or after the digits).
2262 *
2263 * If so, the function returns REDIS_OK and *longval is set to the value
2264 * of the number. Otherwise REDIS_ERR is returned */
2265 static int isStringRepresentableAsLong(sds s, long *longval) {
2266 char buf[32], *endptr;
2267 long value;
2268 int slen;
2269
2270 value = strtol(s, &endptr, 10);
2271 if (endptr[0] != '\0') return REDIS_ERR;
2272 slen = snprintf(buf,32,"%ld",value);
2273
2274 /* If the number converted back into a string is not identical
2275 * then it's not possible to encode the string as integer */
2276 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2277 if (longval) *longval = value;
2278 return REDIS_OK;
2279 }
2280
2281 /* Try to encode a string object in order to save space */
2282 static int tryObjectEncoding(robj *o) {
2283 long value;
2284 sds s = o->ptr;
2285
2286 if (o->encoding != REDIS_ENCODING_RAW)
2287 return REDIS_ERR; /* Already encoded */
2288
2289 /* It's not save to encode shared objects: shared objects can be shared
2290 * everywhere in the "object space" of Redis. Encoded objects can only
2291 * appear as "values" (and not, for instance, as keys) */
2292 if (o->refcount > 1) return REDIS_ERR;
2293
2294 /* Currently we try to encode only strings */
2295 redisAssert(o->type == REDIS_STRING);
2296
2297 /* Check if we can represent this string as a long integer */
2298 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2299
2300 /* Ok, this object can be encoded */
2301 o->encoding = REDIS_ENCODING_INT;
2302 sdsfree(o->ptr);
2303 o->ptr = (void*) value;
2304 return REDIS_OK;
2305 }
2306
2307 /* Get a decoded version of an encoded object (returned as a new object).
2308 * If the object is already raw-encoded just increment the ref count. */
2309 static robj *getDecodedObject(robj *o) {
2310 robj *dec;
2311
2312 if (o->encoding == REDIS_ENCODING_RAW) {
2313 incrRefCount(o);
2314 return o;
2315 }
2316 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2317 char buf[32];
2318
2319 snprintf(buf,32,"%ld",(long)o->ptr);
2320 dec = createStringObject(buf,strlen(buf));
2321 return dec;
2322 } else {
2323 redisAssert(1 != 1);
2324 }
2325 }
2326
2327 /* Compare two string objects via strcmp() or alike.
2328 * Note that the objects may be integer-encoded. In such a case we
2329 * use snprintf() to get a string representation of the numbers on the stack
2330 * and compare the strings, it's much faster than calling getDecodedObject().
2331 *
2332 * Important note: if objects are not integer encoded, but binary-safe strings,
2333 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2334 * binary safe. */
2335 static int compareStringObjects(robj *a, robj *b) {
2336 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2337 char bufa[128], bufb[128], *astr, *bstr;
2338 int bothsds = 1;
2339
2340 if (a == b) return 0;
2341 if (a->encoding != REDIS_ENCODING_RAW) {
2342 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2343 astr = bufa;
2344 bothsds = 0;
2345 } else {
2346 astr = a->ptr;
2347 }
2348 if (b->encoding != REDIS_ENCODING_RAW) {
2349 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2350 bstr = bufb;
2351 bothsds = 0;
2352 } else {
2353 bstr = b->ptr;
2354 }
2355 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2356 }
2357
2358 static size_t stringObjectLen(robj *o) {
2359 redisAssert(o->type == REDIS_STRING);
2360 if (o->encoding == REDIS_ENCODING_RAW) {
2361 return sdslen(o->ptr);
2362 } else {
2363 char buf[32];
2364
2365 return snprintf(buf,32,"%ld",(long)o->ptr);
2366 }
2367 }
2368
2369 /*============================ DB saving/loading ============================ */
2370
/* Write a single opcode/type byte. Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2375
/* Write a timestamp as a 32 bit integer in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return (fwrite(&t32,sizeof(t32),1,fp) == 1) ? 0 : -1;
}
2381
2382 /* check rdbLoadLen() comments for more info */
2383 static int rdbSaveLen(FILE *fp, uint32_t len) {
2384 unsigned char buf[2];
2385
2386 if (len < (1<<6)) {
2387 /* Save a 6 bit len */
2388 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2389 if (fwrite(buf,1,1,fp) == 0) return -1;
2390 } else if (len < (1<<14)) {
2391 /* Save a 14 bit len */
2392 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2393 buf[1] = len&0xFF;
2394 if (fwrite(buf,2,1,fp) == 0) return -1;
2395 } else {
2396 /* Save a 32 bit len */
2397 buf[0] = (REDIS_RDB_32BITLEN<<6);
2398 if (fwrite(buf,1,1,fp) == 0) return -1;
2399 len = htonl(len);
2400 if (fwrite(&len,4,1,fp) == 0) return -1;
2401 }
2402 return 0;
2403 }
2404
2405 /* String objects in the form "2391" "-100" without any space and with a
2406 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2407 * encoded as integers to save space */
2408 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2409 long long value;
2410 char *endptr, buf[32];
2411
2412 /* Check if it's possible to encode this value as a number */
2413 value = strtoll(s, &endptr, 10);
2414 if (endptr[0] != '\0') return 0;
2415 snprintf(buf,32,"%lld",value);
2416
2417 /* If the number converted back into a string is not identical
2418 * then it's not possible to encode the string as integer */
2419 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2420
2421 /* Finally check if it fits in our ranges */
2422 if (value >= -(1<<7) && value <= (1<<7)-1) {
2423 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2424 enc[1] = value&0xFF;
2425 return 2;
2426 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2427 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2428 enc[1] = value&0xFF;
2429 enc[2] = (value>>8)&0xFF;
2430 return 3;
2431 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2432 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2433 enc[1] = value&0xFF;
2434 enc[2] = (value>>8)&0xFF;
2435 enc[3] = (value>>16)&0xFF;
2436 enc[4] = (value>>24)&0xFF;
2437 return 5;
2438 } else {
2439 return 0;
2440 }
2441 }
2442
2443 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2444 unsigned int comprlen, outlen;
2445 unsigned char byte;
2446 void *out;
2447
2448 /* We require at least four bytes compression for this to be worth it */
2449 outlen = sdslen(obj->ptr)-4;
2450 if (outlen <= 0) return 0;
2451 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2452 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2453 if (comprlen == 0) {
2454 zfree(out);
2455 return 0;
2456 }
2457 /* Data compressed! Let's save it on disk */
2458 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2459 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2460 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2461 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2462 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2463 zfree(out);
2464 return comprlen;
2465
2466 writeerr:
2467 zfree(out);
2468 return -1;
2469 }
2470
2471 /* Save a string objet as [len][data] on disk. If the object is a string
2472 * representation of an integer value we try to safe it in a special form */
2473 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2474 size_t len;
2475 int enclen;
2476
2477 len = sdslen(obj->ptr);
2478
2479 /* Try integer encoding */
2480 if (len <= 11) {
2481 unsigned char buf[5];
2482 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2483 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2484 return 0;
2485 }
2486 }
2487
2488 /* Try LZF compression - under 20 bytes it's unable to compress even
2489 * aaaaaaaaaaaaaaaaaa so skip it */
2490 if (len > 20) {
2491 int retval;
2492
2493 retval = rdbSaveLzfStringObject(fp,obj);
2494 if (retval == -1) return -1;
2495 if (retval > 0) return 0;
2496 /* retval == 0 means data can't be compressed, save the old way */
2497 }
2498
2499 /* Store verbatim */
2500 if (rdbSaveLen(fp,len) == -1) return -1;
2501 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2502 return 0;
2503 }
2504
2505 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2506 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2507 int retval;
2508
2509 obj = getDecodedObject(obj);
2510 retval = rdbSaveStringObjectRaw(fp,obj);
2511 decrRefCount(obj);
2512 return retval;
2513 }
2514
/* Save a double as a length-prefixed "%.17g" string. The unsigned 8 bit
 * prefix holds the text length, except for three reserved values:
 *   253: not a number
 *   254: +inf
 *   255: -inf
 * (in which case no text follows). Returns 0 on success, -1 on error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        size_t textlen;

        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        textlen = strlen((char*)buf+1);
        buf[0] = (unsigned char) textlen;
        len = buf[0]+1;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2541
2542 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2543 static int rdbSave(char *filename) {
2544 dictIterator *di = NULL;
2545 dictEntry *de;
2546 FILE *fp;
2547 char tmpfile[256];
2548 int j;
2549 time_t now = time(NULL);
2550
2551 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2552 fp = fopen(tmpfile,"w");
2553 if (!fp) {
2554 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2555 return REDIS_ERR;
2556 }
2557 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2558 for (j = 0; j < server.dbnum; j++) {
2559 redisDb *db = server.db+j;
2560 dict *d = db->dict;
2561 if (dictSize(d) == 0) continue;
2562 di = dictGetIterator(d);
2563 if (!di) {
2564 fclose(fp);
2565 return REDIS_ERR;
2566 }
2567
2568 /* Write the SELECT DB opcode */
2569 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2570 if (rdbSaveLen(fp,j) == -1) goto werr;
2571
2572 /* Iterate this DB writing every entry */
2573 while((de = dictNext(di)) != NULL) {
2574 robj *key = dictGetEntryKey(de);
2575 robj *o = dictGetEntryVal(de);
2576 time_t expiretime = getExpire(db,key);
2577
2578 /* Save the expire time */
2579 if (expiretime != -1) {
2580 /* If this key is already expired skip it */
2581 if (expiretime < now) continue;
2582 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2583 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2584 }
2585 /* Save the key and associated value */
2586 if (rdbSaveType(fp,o->type) == -1) goto werr;
2587 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2588 if (o->type == REDIS_STRING) {
2589 /* Save a string value */
2590 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2591 } else if (o->type == REDIS_LIST) {
2592 /* Save a list value */
2593 list *list = o->ptr;
2594 listNode *ln;
2595
2596 listRewind(list);
2597 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2598 while((ln = listYield(list))) {
2599 robj *eleobj = listNodeValue(ln);
2600
2601 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2602 }
2603 } else if (o->type == REDIS_SET) {
2604 /* Save a set value */
2605 dict *set = o->ptr;
2606 dictIterator *di = dictGetIterator(set);
2607 dictEntry *de;
2608
2609 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2610 while((de = dictNext(di)) != NULL) {
2611 robj *eleobj = dictGetEntryKey(de);
2612
2613 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2614 }
2615 dictReleaseIterator(di);
2616 } else if (o->type == REDIS_ZSET) {
2617 /* Save a set value */
2618 zset *zs = o->ptr;
2619 dictIterator *di = dictGetIterator(zs->dict);
2620 dictEntry *de;
2621
2622 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2623 while((de = dictNext(di)) != NULL) {
2624 robj *eleobj = dictGetEntryKey(de);
2625 double *score = dictGetEntryVal(de);
2626
2627 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2628 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2629 }
2630 dictReleaseIterator(di);
2631 } else {
2632 redisAssert(0 != 0);
2633 }
2634 }
2635 dictReleaseIterator(di);
2636 }
2637 /* EOF opcode */
2638 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2639
2640 /* Make sure data will not remain on the OS's output buffers */
2641 fflush(fp);
2642 fsync(fileno(fp));
2643 fclose(fp);
2644
2645 /* Use RENAME to make sure the DB file is changed atomically only
2646 * if the generate DB file is ok. */
2647 if (rename(tmpfile,filename) == -1) {
2648 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2649 unlink(tmpfile);
2650 return REDIS_ERR;
2651 }
2652 redisLog(REDIS_NOTICE,"DB saved on disk");
2653 server.dirty = 0;
2654 server.lastsave = time(NULL);
2655 return REDIS_OK;
2656
2657 werr:
2658 fclose(fp);
2659 unlink(tmpfile);
2660 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2661 if (di) dictReleaseIterator(di);
2662 return REDIS_ERR;
2663 }
2664
2665 static int rdbSaveBackground(char *filename) {
2666 pid_t childpid;
2667
2668 if (server.bgsavechildpid != -1) return REDIS_ERR;
2669 if ((childpid = fork()) == 0) {
2670 /* Child */
2671 close(server.fd);
2672 if (rdbSave(filename) == REDIS_OK) {
2673 exit(0);
2674 } else {
2675 exit(1);
2676 }
2677 } else {
2678 /* Parent */
2679 if (childpid == -1) {
2680 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2681 strerror(errno));
2682 return REDIS_ERR;
2683 }
2684 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2685 server.bgsavechildpid = childpid;
2686 return REDIS_OK;
2687 }
2688 return REDIS_OK; /* unreached */
2689 }
2690
/* Remove the "temp-<pid>.rdb" file a background-save child with the given
 * pid may have left behind. Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2697
/* Read one opcode/type byte. Returns it as 0..255, or -1 on EOF/error. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 1) ? byte : -1;
}
2703
/* Read a 32 bit timestamp written by rdbSaveTime(). Returns -1 on
 * EOF/error (indistinguishable from a stored value of -1). */
static time_t rdbLoadTime(FILE *fp) {
    int32_t raw;

    if (fread(&raw,sizeof(raw),1,fp) != 1) return -1;
    return (time_t) raw;
}
2709
2710 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2711 * of this file for a description of how this are stored on disk.
2712 *
2713 * isencoded is set to 1 if the readed length is not actually a length but
2714 * an "encoding type", check the above comments for more info */
2715 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2716 unsigned char buf[2];
2717 uint32_t len;
2718
2719 if (isencoded) *isencoded = 0;
2720 if (rdbver == 0) {
2721 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2722 return ntohl(len);
2723 } else {
2724 int type;
2725
2726 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2727 type = (buf[0]&0xC0)>>6;
2728 if (type == REDIS_RDB_6BITLEN) {
2729 /* Read a 6 bit len */
2730 return buf[0]&0x3F;
2731 } else if (type == REDIS_RDB_ENCVAL) {
2732 /* Read a 6 bit len encoding type */
2733 if (isencoded) *isencoded = 1;
2734 return buf[0]&0x3F;
2735 } else if (type == REDIS_RDB_14BITLEN) {
2736 /* Read a 14 bit len */
2737 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2738 return ((buf[0]&0x3F)<<8)|buf[1];
2739 } else {
2740 /* Read a 32 bit len */
2741 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2742 return ntohl(len);
2743 }
2744 }
2745 }
2746
2747 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2748 unsigned char enc[4];
2749 long long val;
2750
2751 if (enctype == REDIS_RDB_ENC_INT8) {
2752 if (fread(enc,1,1,fp) == 0) return NULL;
2753 val = (signed char)enc[0];
2754 } else if (enctype == REDIS_RDB_ENC_INT16) {
2755 uint16_t v;
2756 if (fread(enc,2,1,fp) == 0) return NULL;
2757 v = enc[0]|(enc[1]<<8);
2758 val = (int16_t)v;
2759 } else if (enctype == REDIS_RDB_ENC_INT32) {
2760 uint32_t v;
2761 if (fread(enc,4,1,fp) == 0) return NULL;
2762 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2763 val = (int32_t)v;
2764 } else {
2765 val = 0; /* anti-warning */
2766 redisAssert(0!=0);
2767 }
2768 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2769 }
2770
2771 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2772 unsigned int len, clen;
2773 unsigned char *c = NULL;
2774 sds val = NULL;
2775
2776 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2777 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2778 if ((c = zmalloc(clen)) == NULL) goto err;
2779 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2780 if (fread(c,clen,1,fp) == 0) goto err;
2781 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2782 zfree(c);
2783 return createObject(REDIS_STRING,val);
2784 err:
2785 zfree(c);
2786 sdsfree(val);
2787 return NULL;
2788 }
2789
2790 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2791 int isencoded;
2792 uint32_t len;
2793 sds val;
2794
2795 len = rdbLoadLen(fp,rdbver,&isencoded);
2796 if (isencoded) {
2797 switch(len) {
2798 case REDIS_RDB_ENC_INT8:
2799 case REDIS_RDB_ENC_INT16:
2800 case REDIS_RDB_ENC_INT32:
2801 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2802 case REDIS_RDB_ENC_LZF:
2803 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2804 default:
2805 redisAssert(0!=0);
2806 }
2807 }
2808
2809 if (len == REDIS_RDB_LENERR) return NULL;
2810 val = sdsnewlen(NULL,len);
2811 if (len && fread(val,len,1,fp) == 0) {
2812 sdsfree(val);
2813 return NULL;
2814 }
2815 return tryObjectSharing(createObject(REDIS_STRING,val));
2816 }
2817
2818 /* For information about double serialization check rdbSaveDoubleValue() */
2819 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2820 char buf[128];
2821 unsigned char len;
2822
2823 if (fread(&len,1,1,fp) == 0) return -1;
2824 switch(len) {
2825 case 255: *val = R_NegInf; return 0;
2826 case 254: *val = R_PosInf; return 0;
2827 case 253: *val = R_Nan; return 0;
2828 default:
2829 if (fread(buf,len,1,fp) == 0) return -1;
2830 buf[len] = '\0';
2831 sscanf(buf, "%lg", val);
2832 return 0;
2833 }
2834 }
2835
2836 static int rdbLoad(char *filename) {
2837 FILE *fp;
2838 robj *keyobj = NULL;
2839 uint32_t dbid;
2840 int type, retval, rdbver;
2841 dict *d = server.db[0].dict;
2842 redisDb *db = server.db+0;
2843 char buf[1024];
2844 time_t expiretime = -1, now = time(NULL);
2845
2846 fp = fopen(filename,"r");
2847 if (!fp) return REDIS_ERR;
2848 if (fread(buf,9,1,fp) == 0) goto eoferr;
2849 buf[9] = '\0';
2850 if (memcmp(buf,"REDIS",5) != 0) {
2851 fclose(fp);
2852 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2853 return REDIS_ERR;
2854 }
2855 rdbver = atoi(buf+5);
2856 if (rdbver > 1) {
2857 fclose(fp);
2858 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2859 return REDIS_ERR;
2860 }
2861 while(1) {
2862 robj *o;
2863
2864 /* Read type. */
2865 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2866 if (type == REDIS_EXPIRETIME) {
2867 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2868 /* We read the time so we need to read the object type again */
2869 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2870 }
2871 if (type == REDIS_EOF) break;
2872 /* Handle SELECT DB opcode as a special case */
2873 if (type == REDIS_SELECTDB) {
2874 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2875 goto eoferr;
2876 if (dbid >= (unsigned)server.dbnum) {
2877 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2878 exit(1);
2879 }
2880 db = server.db+dbid;
2881 d = db->dict;
2882 continue;
2883 }
2884 /* Read key */
2885 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2886
2887 if (type == REDIS_STRING) {
2888 /* Read string value */
2889 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2890 tryObjectEncoding(o);
2891 } else if (type == REDIS_LIST || type == REDIS_SET) {
2892 /* Read list/set value */
2893 uint32_t listlen;
2894
2895 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2896 goto eoferr;
2897 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2898 /* Load every single element of the list/set */
2899 while(listlen--) {
2900 robj *ele;
2901
2902 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2903 tryObjectEncoding(ele);
2904 if (type == REDIS_LIST) {
2905 listAddNodeTail((list*)o->ptr,ele);
2906 } else {
2907 dictAdd((dict*)o->ptr,ele,NULL);
2908 }
2909 }
2910 } else if (type == REDIS_ZSET) {
2911 /* Read list/set value */
2912 uint32_t zsetlen;
2913 zset *zs;
2914
2915 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2916 goto eoferr;
2917 o = createZsetObject();
2918 zs = o->ptr;
2919 /* Load every single element of the list/set */
2920 while(zsetlen--) {
2921 robj *ele;
2922 double *score = zmalloc(sizeof(double));
2923
2924 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2925 tryObjectEncoding(ele);
2926 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2927 dictAdd(zs->dict,ele,score);
2928 zslInsert(zs->zsl,*score,ele);
2929 incrRefCount(ele); /* added to skiplist */
2930 }
2931 } else {
2932 redisAssert(0 != 0);
2933 }
2934 /* Add the new object in the hash table */
2935 retval = dictAdd(d,keyobj,o);
2936 if (retval == DICT_ERR) {
2937 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2938 exit(1);
2939 }
2940 /* Set the expire time if needed */
2941 if (expiretime != -1) {
2942 setExpire(db,keyobj,expiretime);
2943 /* Delete this key if already expired */
2944 if (expiretime < now) deleteKey(db,keyobj);
2945 expiretime = -1;
2946 }
2947 keyobj = o = NULL;
2948 }
2949 fclose(fp);
2950 return REDIS_OK;
2951
2952 eoferr: /* unexpected end of file is handled here with a fatal exit */
2953 if (keyobj) decrRefCount(keyobj);
2954 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2955 exit(1);
2956 return REDIS_ERR; /* Just to avoid warning */
2957 }
2958
2959 /*================================== Commands =============================== */
2960
2961 static void authCommand(redisClient *c) {
2962 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2963 c->authenticated = 1;
2964 addReply(c,shared.ok);
2965 } else {
2966 c->authenticated = 0;
2967 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2968 }
2969 }
2970
2971 static void pingCommand(redisClient *c) {
2972 addReply(c,shared.pong);
2973 }
2974
2975 static void echoCommand(redisClient *c) {
2976 addReplyBulkLen(c,c->argv[1]);
2977 addReply(c,c->argv[1]);
2978 addReply(c,shared.crlf);
2979 }
2980
2981 /*=================================== Strings =============================== */
2982
2983 static void setGenericCommand(redisClient *c, int nx) {
2984 int retval;
2985
2986 if (nx) deleteIfVolatile(c->db,c->argv[1]);
2987 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2988 if (retval == DICT_ERR) {
2989 if (!nx) {
2990 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2991 incrRefCount(c->argv[2]);
2992 } else {
2993 addReply(c,shared.czero);
2994 return;
2995 }
2996 } else {
2997 incrRefCount(c->argv[1]);
2998 incrRefCount(c->argv[2]);
2999 }
3000 server.dirty++;
3001 removeExpire(c->db,c->argv[1]);
3002 addReply(c, nx ? shared.cone : shared.ok);
3003 }
3004
3005 static void setCommand(redisClient *c) {
3006 setGenericCommand(c,0);
3007 }
3008
3009 static void setnxCommand(redisClient *c) {
3010 setGenericCommand(c,1);
3011 }
3012
3013 static void getCommand(redisClient *c) {
3014 robj *o = lookupKeyRead(c->db,c->argv[1]);
3015
3016 if (o == NULL) {
3017 addReply(c,shared.nullbulk);
3018 } else {
3019 if (o->type != REDIS_STRING) {
3020 addReply(c,shared.wrongtypeerr);
3021 } else {
3022 addReplyBulkLen(c,o);
3023 addReply(c,o);
3024 addReply(c,shared.crlf);
3025 }
3026 }
3027 }
3028
3029 static void getsetCommand(redisClient *c) {
3030 getCommand(c);
3031 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3032 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3033 } else {
3034 incrRefCount(c->argv[1]);
3035 }
3036 incrRefCount(c->argv[2]);
3037 server.dirty++;
3038 removeExpire(c->db,c->argv[1]);
3039 }
3040
3041 static void mgetCommand(redisClient *c) {
3042 int j;
3043
3044 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3045 for (j = 1; j < c->argc; j++) {
3046 robj *o = lookupKeyRead(c->db,c->argv[j]);
3047 if (o == NULL) {
3048 addReply(c,shared.nullbulk);
3049 } else {
3050 if (o->type != REDIS_STRING) {
3051 addReply(c,shared.nullbulk);
3052 } else {
3053 addReplyBulkLen(c,o);
3054 addReply(c,o);
3055 addReply(c,shared.crlf);
3056 }
3057 }
3058 }
3059 }
3060
3061 static void msetGenericCommand(redisClient *c, int nx) {
3062 int j, busykeys = 0;
3063
3064 if ((c->argc % 2) == 0) {
3065 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3066 return;
3067 }
3068 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3069 * set nothing at all if at least one already key exists. */
3070 if (nx) {
3071 for (j = 1; j < c->argc; j += 2) {
3072 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3073 busykeys++;
3074 }
3075 }
3076 }
3077 if (busykeys) {
3078 addReply(c, shared.czero);
3079 return;
3080 }
3081
3082 for (j = 1; j < c->argc; j += 2) {
3083 int retval;
3084
3085 tryObjectEncoding(c->argv[j+1]);
3086 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3087 if (retval == DICT_ERR) {
3088 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3089 incrRefCount(c->argv[j+1]);
3090 } else {
3091 incrRefCount(c->argv[j]);
3092 incrRefCount(c->argv[j+1]);
3093 }
3094 removeExpire(c->db,c->argv[j]);
3095 }
3096 server.dirty += (c->argc-1)/2;
3097 addReply(c, nx ? shared.cone : shared.ok);
3098 }
3099
3100 static void msetCommand(redisClient *c) {
3101 msetGenericCommand(c,0);
3102 }
3103
3104 static void msetnxCommand(redisClient *c) {
3105 msetGenericCommand(c,1);
3106 }
3107
3108 static void incrDecrCommand(redisClient *c, long long incr) {
3109 long long value;
3110 int retval;
3111 robj *o;
3112
3113 o = lookupKeyWrite(c->db,c->argv[1]);
3114 if (o == NULL) {
3115 value = 0;
3116 } else {
3117 if (o->type != REDIS_STRING) {
3118 value = 0;
3119 } else {
3120 char *eptr;
3121
3122 if (o->encoding == REDIS_ENCODING_RAW)
3123 value = strtoll(o->ptr, &eptr, 10);
3124 else if (o->encoding == REDIS_ENCODING_INT)
3125 value = (long)o->ptr;
3126 else
3127 redisAssert(1 != 1);
3128 }
3129 }
3130
3131 value += incr;
3132 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3133 tryObjectEncoding(o);
3134 retval = dictAdd(c->db->dict,c->argv[1],o);
3135 if (retval == DICT_ERR) {
3136 dictReplace(c->db->dict,c->argv[1],o);
3137 removeExpire(c->db,c->argv[1]);
3138 } else {
3139 incrRefCount(c->argv[1]);
3140 }
3141 server.dirty++;
3142 addReply(c,shared.colon);
3143 addReply(c,o);
3144 addReply(c,shared.crlf);
3145 }
3146
3147 static void incrCommand(redisClient *c) {
3148 incrDecrCommand(c,1);
3149 }
3150
3151 static void decrCommand(redisClient *c) {
3152 incrDecrCommand(c,-1);
3153 }
3154
3155 static void incrbyCommand(redisClient *c) {
3156 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3157 incrDecrCommand(c,incr);
3158 }
3159
3160 static void decrbyCommand(redisClient *c) {
3161 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3162 incrDecrCommand(c,-incr);
3163 }
3164
3165 /* ========================= Type agnostic commands ========================= */
3166
3167 static void delCommand(redisClient *c) {
3168 int deleted = 0, j;
3169
3170 for (j = 1; j < c->argc; j++) {
3171 if (deleteKey(c->db,c->argv[j])) {
3172 server.dirty++;
3173 deleted++;
3174 }
3175 }
3176 switch(deleted) {
3177 case 0:
3178 addReply(c,shared.czero);
3179 break;
3180 case 1:
3181 addReply(c,shared.cone);
3182 break;
3183 default:
3184 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3185 break;
3186 }
3187 }
3188
3189 static void existsCommand(redisClient *c) {
3190 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3191 }
3192
3193 static void selectCommand(redisClient *c) {
3194 int id = atoi(c->argv[1]->ptr);
3195
3196 if (selectDb(c,id) == REDIS_ERR) {
3197 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3198 } else {
3199 addReply(c,shared.ok);
3200 }
3201 }
3202
3203 static void randomkeyCommand(redisClient *c) {
3204 dictEntry *de;
3205
3206 while(1) {
3207 de = dictGetRandomKey(c->db->dict);
3208 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3209 }
3210 if (de == NULL) {
3211 addReply(c,shared.plus);
3212 addReply(c,shared.crlf);
3213 } else {
3214 addReply(c,shared.plus);
3215 addReply(c,dictGetEntryKey(de));
3216 addReply(c,shared.crlf);
3217 }
3218 }
3219
3220 static void keysCommand(redisClient *c) {
3221 dictIterator *di;
3222 dictEntry *de;
3223 sds pattern = c->argv[1]->ptr;
3224 int plen = sdslen(pattern);
3225 unsigned long numkeys = 0, keyslen = 0;
3226 robj *lenobj = createObject(REDIS_STRING,NULL);
3227
3228 di = dictGetIterator(c->db->dict);
3229 addReply(c,lenobj);
3230 decrRefCount(lenobj);
3231 while((de = dictNext(di)) != NULL) {
3232 robj *keyobj = dictGetEntryKey(de);
3233
3234 sds key = keyobj->ptr;
3235 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3236 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3237 if (expireIfNeeded(c->db,keyobj) == 0) {
3238 if (numkeys != 0)
3239 addReply(c,shared.space);
3240 addReply(c,keyobj);
3241 numkeys++;
3242 keyslen += sdslen(key);
3243 }
3244 }
3245 }
3246 dictReleaseIterator(di);
3247 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3248 addReply(c,shared.crlf);
3249 }
3250
3251 static void dbsizeCommand(redisClient *c) {
3252 addReplySds(c,
3253 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3254 }
3255
3256 static void lastsaveCommand(redisClient *c) {
3257 addReplySds(c,
3258 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3259 }
3260
3261 static void typeCommand(redisClient *c) {
3262 robj *o;
3263 char *type;
3264
3265 o = lookupKeyRead(c->db,c->argv[1]);
3266 if (o == NULL) {
3267 type = "+none";
3268 } else {
3269 switch(o->type) {
3270 case REDIS_STRING: type = "+string"; break;
3271 case REDIS_LIST: type = "+list"; break;
3272 case REDIS_SET: type = "+set"; break;
3273 case REDIS_ZSET: type = "+zset"; break;
3274 default: type = "unknown"; break;
3275 }
3276 }
3277 addReplySds(c,sdsnew(type));
3278 addReply(c,shared.crlf);
3279 }
3280
3281 static void saveCommand(redisClient *c) {
3282 if (server.bgsavechildpid != -1) {
3283 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3284 return;
3285 }
3286 if (rdbSave(server.dbfilename) == REDIS_OK) {
3287 addReply(c,shared.ok);
3288 } else {
3289 addReply(c,shared.err);
3290 }
3291 }
3292
3293 static void bgsaveCommand(redisClient *c) {
3294 if (server.bgsavechildpid != -1) {
3295 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3296 return;
3297 }
3298 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3299 char *status = "+Background saving started\r\n";
3300 addReplySds(c,sdsnew(status));
3301 } else {
3302 addReply(c,shared.err);
3303 }
3304 }
3305
3306 static void shutdownCommand(redisClient *c) {
3307 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3308 /* Kill the saving child if there is a background saving in progress.
3309 We want to avoid race conditions, for instance our saving child may
3310 overwrite the synchronous saving did by SHUTDOWN. */
3311 if (server.bgsavechildpid != -1) {
3312 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3313 kill(server.bgsavechildpid,SIGKILL);
3314 rdbRemoveTempFile(server.bgsavechildpid);
3315 }
3316 /* SYNC SAVE */
3317 if (rdbSave(server.dbfilename) == REDIS_OK) {
3318 if (server.daemonize)
3319 unlink(server.pidfile);
3320 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3321 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3322 exit(1);
3323 } else {
3324 /* Ooops.. error saving! The best we can do is to continue operating.
3325 * Note that if there was a background saving process, in the next
3326 * cron() Redis will be notified that the background saving aborted,
3327 * handling special stuff like slaves pending for synchronization... */
3328 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3329 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3330 }
3331 }
3332
3333 static void renameGenericCommand(redisClient *c, int nx) {
3334 robj *o;
3335
3336 /* To use the same key as src and dst is probably an error */
3337 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3338 addReply(c,shared.sameobjecterr);
3339 return;
3340 }
3341
3342 o = lookupKeyWrite(c->db,c->argv[1]);
3343 if (o == NULL) {
3344 addReply(c,shared.nokeyerr);
3345 return;
3346 }
3347 incrRefCount(o);
3348 deleteIfVolatile(c->db,c->argv[2]);
3349 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3350 if (nx) {
3351 decrRefCount(o);
3352 addReply(c,shared.czero);
3353 return;
3354 }
3355 dictReplace(c->db->dict,c->argv[2],o);
3356 } else {
3357 incrRefCount(c->argv[2]);
3358 }
3359 deleteKey(c->db,c->argv[1]);
3360 server.dirty++;
3361 addReply(c,nx ? shared.cone : shared.ok);
3362 }
3363
3364 static void renameCommand(redisClient *c) {
3365 renameGenericCommand(c,0);
3366 }
3367
3368 static void renamenxCommand(redisClient *c) {
3369 renameGenericCommand(c,1);
3370 }
3371
3372 static void moveCommand(redisClient *c) {
3373 robj *o;
3374 redisDb *src, *dst;
3375 int srcid;
3376
3377 /* Obtain source and target DB pointers */
3378 src = c->db;
3379 srcid = c->db->id;
3380 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3381 addReply(c,shared.outofrangeerr);
3382 return;
3383 }
3384 dst = c->db;
3385 selectDb(c,srcid); /* Back to the source DB */
3386
3387 /* If the user is moving using as target the same
3388 * DB as the source DB it is probably an error. */
3389 if (src == dst) {
3390 addReply(c,shared.sameobjecterr);
3391 return;
3392 }
3393
3394 /* Check if the element exists and get a reference */
3395 o = lookupKeyWrite(c->db,c->argv[1]);
3396 if (!o) {
3397 addReply(c,shared.czero);
3398 return;
3399 }
3400
3401 /* Try to add the element to the target DB */
3402 deleteIfVolatile(dst,c->argv[1]);
3403 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3404 addReply(c,shared.czero);
3405 return;
3406 }
3407 incrRefCount(c->argv[1]);
3408 incrRefCount(o);
3409
3410 /* OK! key moved, free the entry in the source DB */
3411 deleteKey(src,c->argv[1]);
3412 server.dirty++;
3413 addReply(c,shared.cone);
3414 }
3415
3416 /* =================================== Lists ================================ */
3417 static void pushGenericCommand(redisClient *c, int where) {
3418 robj *lobj;
3419 list *list;
3420
3421 lobj = lookupKeyWrite(c->db,c->argv[1]);
3422 if (lobj == NULL) {
3423 lobj = createListObject();
3424 list = lobj->ptr;
3425 if (where == REDIS_HEAD) {
3426 listAddNodeHead(list,c->argv[2]);
3427 } else {
3428 listAddNodeTail(list,c->argv[2]);
3429 }
3430 dictAdd(c->db->dict,c->argv[1],lobj);
3431 incrRefCount(c->argv[1]);
3432 incrRefCount(c->argv[2]);
3433 } else {
3434 if (lobj->type != REDIS_LIST) {
3435 addReply(c,shared.wrongtypeerr);
3436 return;
3437 }
3438 list = lobj->ptr;
3439 if (where == REDIS_HEAD) {
3440 listAddNodeHead(list,c->argv[2]);
3441 } else {
3442 listAddNodeTail(list,c->argv[2]);
3443 }
3444 incrRefCount(c->argv[2]);
3445 }
3446 server.dirty++;
3447 addReply(c,shared.ok);
3448 }
3449
3450 static void lpushCommand(redisClient *c) {
3451 pushGenericCommand(c,REDIS_HEAD);
3452 }
3453
3454 static void rpushCommand(redisClient *c) {
3455 pushGenericCommand(c,REDIS_TAIL);
3456 }
3457
3458 static void llenCommand(redisClient *c) {
3459 robj *o;
3460 list *l;
3461
3462 o = lookupKeyRead(c->db,c->argv[1]);
3463 if (o == NULL) {
3464 addReply(c,shared.czero);
3465 return;
3466 } else {
3467 if (o->type != REDIS_LIST) {
3468 addReply(c,shared.wrongtypeerr);
3469 } else {
3470 l = o->ptr;
3471 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3472 }
3473 }
3474 }
3475
3476 static void lindexCommand(redisClient *c) {
3477 robj *o;
3478 int index = atoi(c->argv[2]->ptr);
3479
3480 o = lookupKeyRead(c->db,c->argv[1]);
3481 if (o == NULL) {
3482 addReply(c,shared.nullbulk);
3483 } else {
3484 if (o->type != REDIS_LIST) {
3485 addReply(c,shared.wrongtypeerr);
3486 } else {
3487 list *list = o->ptr;
3488 listNode *ln;
3489
3490 ln = listIndex(list, index);
3491 if (ln == NULL) {
3492 addReply(c,shared.nullbulk);
3493 } else {
3494 robj *ele = listNodeValue(ln);
3495 addReplyBulkLen(c,ele);
3496 addReply(c,ele);
3497 addReply(c,shared.crlf);
3498 }
3499 }
3500 }
3501 }
3502
3503 static void lsetCommand(redisClient *c) {
3504 robj *o;
3505 int index = atoi(c->argv[2]->ptr);
3506
3507 o = lookupKeyWrite(c->db,c->argv[1]);
3508 if (o == NULL) {
3509 addReply(c,shared.nokeyerr);
3510 } else {
3511 if (o->type != REDIS_LIST) {
3512 addReply(c,shared.wrongtypeerr);
3513 } else {
3514 list *list = o->ptr;
3515 listNode *ln;
3516
3517 ln = listIndex(list, index);
3518 if (ln == NULL) {
3519 addReply(c,shared.outofrangeerr);
3520 } else {
3521 robj *ele = listNodeValue(ln);
3522
3523 decrRefCount(ele);
3524 listNodeValue(ln) = c->argv[3];
3525 incrRefCount(c->argv[3]);
3526 addReply(c,shared.ok);
3527 server.dirty++;
3528 }
3529 }
3530 }
3531 }
3532
3533 static void popGenericCommand(redisClient *c, int where) {
3534 robj *o;
3535
3536 o = lookupKeyWrite(c->db,c->argv[1]);
3537 if (o == NULL) {
3538 addReply(c,shared.nullbulk);
3539 } else {
3540 if (o->type != REDIS_LIST) {
3541 addReply(c,shared.wrongtypeerr);
3542 } else {
3543 list *list = o->ptr;
3544 listNode *ln;
3545
3546 if (where == REDIS_HEAD)
3547 ln = listFirst(list);
3548 else
3549 ln = listLast(list);
3550
3551 if (ln == NULL) {
3552 addReply(c,shared.nullbulk);
3553 } else {
3554 robj *ele = listNodeValue(ln);
3555 addReplyBulkLen(c,ele);
3556 addReply(c,ele);
3557 addReply(c,shared.crlf);
3558 listDelNode(list,ln);
3559 server.dirty++;
3560 }
3561 }
3562 }
3563 }
3564
3565 static void lpopCommand(redisClient *c) {
3566 popGenericCommand(c,REDIS_HEAD);
3567 }
3568
3569 static void rpopCommand(redisClient *c) {
3570 popGenericCommand(c,REDIS_TAIL);
3571 }
3572
3573 static void lrangeCommand(redisClient *c) {
3574 robj *o;
3575 int start = atoi(c->argv[2]->ptr);
3576 int end = atoi(c->argv[3]->ptr);
3577
3578 o = lookupKeyRead(c->db,c->argv[1]);
3579 if (o == NULL) {
3580 addReply(c,shared.nullmultibulk);
3581 } else {
3582 if (o->type != REDIS_LIST) {
3583 addReply(c,shared.wrongtypeerr);
3584 } else {
3585 list *list = o->ptr;
3586 listNode *ln;
3587 int llen = listLength(list);
3588 int rangelen, j;
3589 robj *ele;
3590
3591 /* convert negative indexes */
3592 if (start < 0) start = llen+start;
3593 if (end < 0) end = llen+end;
3594 if (start < 0) start = 0;
3595 if (end < 0) end = 0;
3596
3597 /* indexes sanity checks */
3598 if (start > end || start >= llen) {
3599 /* Out of range start or start > end result in empty list */
3600 addReply(c,shared.emptymultibulk);
3601 return;
3602 }
3603 if (end >= llen) end = llen-1;
3604 rangelen = (end-start)+1;
3605
3606 /* Return the result in form of a multi-bulk reply */
3607 ln = listIndex(list, start);
3608 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3609 for (j = 0; j < rangelen; j++) {
3610 ele = listNodeValue(ln);
3611 addReplyBulkLen(c,ele);
3612 addReply(c,ele);
3613 addReply(c,shared.crlf);
3614 ln = ln->next;
3615 }
3616 }
3617 }
3618 }
3619
3620 static void ltrimCommand(redisClient *c) {
3621 robj *o;
3622 int start = atoi(c->argv[2]->ptr);
3623 int end = atoi(c->argv[3]->ptr);
3624
3625 o = lookupKeyWrite(c->db,c->argv[1]);
3626 if (o == NULL) {
3627 addReply(c,shared.nokeyerr);
3628 } else {
3629 if (o->type != REDIS_LIST) {
3630 addReply(c,shared.wrongtypeerr);
3631 } else {
3632 list *list = o->ptr;
3633 listNode *ln;
3634 int llen = listLength(list);
3635 int j, ltrim, rtrim;
3636
3637 /* convert negative indexes */
3638 if (start < 0) start = llen+start;
3639 if (end < 0) end = llen+end;
3640 if (start < 0) start = 0;
3641 if (end < 0) end = 0;
3642
3643 /* indexes sanity checks */
3644 if (start > end || start >= llen) {
3645 /* Out of range start or start > end result in empty list */
3646 ltrim = llen;
3647 rtrim = 0;
3648 } else {
3649 if (end >= llen) end = llen-1;
3650 ltrim = start;
3651 rtrim = llen-end-1;
3652 }
3653
3654 /* Remove list elements to perform the trim */
3655 for (j = 0; j < ltrim; j++) {
3656 ln = listFirst(list);
3657 listDelNode(list,ln);
3658 }
3659 for (j = 0; j < rtrim; j++) {
3660 ln = listLast(list);
3661 listDelNode(list,ln);
3662 }
3663 server.dirty++;
3664 addReply(c,shared.ok);
3665 }
3666 }
3667 }
3668
3669 static void lremCommand(redisClient *c) {
3670 robj *o;
3671
3672 o = lookupKeyWrite(c->db,c->argv[1]);
3673 if (o == NULL) {
3674 addReply(c,shared.czero);
3675 } else {
3676 if (o->type != REDIS_LIST) {
3677 addReply(c,shared.wrongtypeerr);
3678 } else {
3679 list *list = o->ptr;
3680 listNode *ln, *next;
3681 int toremove = atoi(c->argv[2]->ptr);
3682 int removed = 0;
3683 int fromtail = 0;
3684
3685 if (toremove < 0) {
3686 toremove = -toremove;
3687 fromtail = 1;
3688 }
3689 ln = fromtail ? list->tail : list->head;
3690 while (ln) {
3691 robj *ele = listNodeValue(ln);
3692
3693 next = fromtail ? ln->prev : ln->next;
3694 if (compareStringObjects(ele,c->argv[3]) == 0) {
3695 listDelNode(list,ln);
3696 server.dirty++;
3697 removed++;
3698 if (toremove && removed == toremove) break;
3699 }
3700 ln = next;
3701 }
3702 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3703 }
3704 }
3705 }
3706
3707 /* This is the semantic of this command:
3708 * RPOPLPUSH srclist dstlist:
3709 * IF LLEN(srclist) > 0
3710 * element = RPOP srclist
3711 * LPUSH dstlist element
3712 * RETURN element
3713 * ELSE
3714 * RETURN nil
3715 * END
3716 * END
3717 *
3718 * The idea is to be able to get an element from a list in a reliable way
3719 * since the element is not just returned but pushed against another list
3720 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3721 */
3722 static void rpoplpushcommand(redisClient *c) {
3723 robj *sobj;
3724
3725 sobj = lookupKeyWrite(c->db,c->argv[1]);
3726 if (sobj == NULL) {
3727 addReply(c,shared.nullbulk);
3728 } else {
3729 if (sobj->type != REDIS_LIST) {
3730 addReply(c,shared.wrongtypeerr);
3731 } else {
3732 list *srclist = sobj->ptr;
3733 listNode *ln = listLast(srclist);
3734
3735 if (ln == NULL) {
3736 addReply(c,shared.nullbulk);
3737 } else {
3738 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3739 robj *ele = listNodeValue(ln);
3740 list *dstlist;
3741
3742 if (dobj == NULL) {
3743
3744 /* Create the list if the key does not exist */
3745 dobj = createListObject();
3746 dictAdd(c->db->dict,c->argv[2],dobj);
3747 incrRefCount(c->argv[2]);
3748 } else if (dobj->type != REDIS_LIST) {
3749 addReply(c,shared.wrongtypeerr);
3750 return;
3751 }
3752 /* Add the element to the target list */
3753 dstlist = dobj->ptr;
3754 listAddNodeHead(dstlist,ele);
3755 incrRefCount(ele);
3756
3757 /* Send the element to the client as reply as well */
3758 addReplyBulkLen(c,ele);
3759 addReply(c,ele);
3760 addReply(c,shared.crlf);
3761
3762 /* Finally remove the element from the source list */
3763 listDelNode(srclist,ln);
3764 server.dirty++;
3765 }
3766 }
3767 }
3768 }
3769
3770
3771 /* ==================================== Sets ================================ */
3772
3773 static void saddCommand(redisClient *c) {
3774 robj *set;
3775
3776 set = lookupKeyWrite(c->db,c->argv[1]);
3777 if (set == NULL) {
3778 set = createSetObject();
3779 dictAdd(c->db->dict,c->argv[1],set);
3780 incrRefCount(c->argv[1]);
3781 } else {
3782 if (set->type != REDIS_SET) {
3783 addReply(c,shared.wrongtypeerr);
3784 return;
3785 }
3786 }
3787 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3788 incrRefCount(c->argv[2]);
3789 server.dirty++;
3790 addReply(c,shared.cone);
3791 } else {
3792 addReply(c,shared.czero);
3793 }
3794 }
3795
3796 static void sremCommand(redisClient *c) {
3797 robj *set;
3798
3799 set = lookupKeyWrite(c->db,c->argv[1]);
3800 if (set == NULL) {
3801 addReply(c,shared.czero);
3802 } else {
3803 if (set->type != REDIS_SET) {
3804 addReply(c,shared.wrongtypeerr);
3805 return;
3806 }
3807 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3808 server.dirty++;
3809 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3810 addReply(c,shared.cone);
3811 } else {
3812 addReply(c,shared.czero);
3813 }
3814 }
3815 }
3816
3817 static void smoveCommand(redisClient *c) {
3818 robj *srcset, *dstset;
3819
3820 srcset = lookupKeyWrite(c->db,c->argv[1]);
3821 dstset = lookupKeyWrite(c->db,c->argv[2]);
3822
3823 /* If the source key does not exist return 0, if it's of the wrong type
3824 * raise an error */
3825 if (srcset == NULL || srcset->type != REDIS_SET) {
3826 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3827 return;
3828 }
3829 /* Error if the destination key is not a set as well */
3830 if (dstset && dstset->type != REDIS_SET) {
3831 addReply(c,shared.wrongtypeerr);
3832 return;
3833 }
3834 /* Remove the element from the source set */
3835 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3836 /* Key not found in the src set! return zero */
3837 addReply(c,shared.czero);
3838 return;
3839 }
3840 server.dirty++;
3841 /* Add the element to the destination set */
3842 if (!dstset) {
3843 dstset = createSetObject();
3844 dictAdd(c->db->dict,c->argv[2],dstset);
3845 incrRefCount(c->argv[2]);
3846 }
3847 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3848 incrRefCount(c->argv[3]);
3849 addReply(c,shared.cone);
3850 }
3851
3852 static void sismemberCommand(redisClient *c) {
3853 robj *set;
3854
3855 set = lookupKeyRead(c->db,c->argv[1]);
3856 if (set == NULL) {
3857 addReply(c,shared.czero);
3858 } else {
3859 if (set->type != REDIS_SET) {
3860 addReply(c,shared.wrongtypeerr);
3861 return;
3862 }
3863 if (dictFind(set->ptr,c->argv[2]))
3864 addReply(c,shared.cone);
3865 else
3866 addReply(c,shared.czero);
3867 }
3868 }
3869
3870 static void scardCommand(redisClient *c) {
3871 robj *o;
3872 dict *s;
3873
3874 o = lookupKeyRead(c->db,c->argv[1]);
3875 if (o == NULL) {
3876 addReply(c,shared.czero);
3877 return;
3878 } else {
3879 if (o->type != REDIS_SET) {
3880 addReply(c,shared.wrongtypeerr);
3881 } else {
3882 s = o->ptr;
3883 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
3884 dictSize(s)));
3885 }
3886 }
3887 }
3888
3889 static void spopCommand(redisClient *c) {
3890 robj *set;
3891 dictEntry *de;
3892
3893 set = lookupKeyWrite(c->db,c->argv[1]);
3894 if (set == NULL) {
3895 addReply(c,shared.nullbulk);
3896 } else {
3897 if (set->type != REDIS_SET) {
3898 addReply(c,shared.wrongtypeerr);
3899 return;
3900 }
3901 de = dictGetRandomKey(set->ptr);
3902 if (de == NULL) {
3903 addReply(c,shared.nullbulk);
3904 } else {
3905 robj *ele = dictGetEntryKey(de);
3906
3907 addReplyBulkLen(c,ele);
3908 addReply(c,ele);
3909 addReply(c,shared.crlf);
3910 dictDelete(set->ptr,ele);
3911 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3912 server.dirty++;
3913 }
3914 }
3915 }
3916
3917 static void srandmemberCommand(redisClient *c) {
3918 robj *set;
3919 dictEntry *de;
3920
3921 set = lookupKeyRead(c->db,c->argv[1]);
3922 if (set == NULL) {
3923 addReply(c,shared.nullbulk);
3924 } else {
3925 if (set->type != REDIS_SET) {
3926 addReply(c,shared.wrongtypeerr);
3927 return;
3928 }
3929 de = dictGetRandomKey(set->ptr);
3930 if (de == NULL) {
3931 addReply(c,shared.nullbulk);
3932 } else {
3933 robj *ele = dictGetEntryKey(de);
3934
3935 addReplyBulkLen(c,ele);
3936 addReply(c,ele);
3937 addReply(c,shared.crlf);
3938 }
3939 }
3940 }
3941
3942 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3943 dict **d1 = (void*) s1, **d2 = (void*) s2;
3944
3945 return dictSize(*d1)-dictSize(*d2);
3946 }
3947
3948 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
3949 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3950 dictIterator *di;
3951 dictEntry *de;
3952 robj *lenobj = NULL, *dstset = NULL;
3953 unsigned long j, cardinality = 0;
3954
3955 for (j = 0; j < setsnum; j++) {
3956 robj *setobj;
3957
3958 setobj = dstkey ?
3959 lookupKeyWrite(c->db,setskeys[j]) :
3960 lookupKeyRead(c->db,setskeys[j]);
3961 if (!setobj) {
3962 zfree(dv);
3963 if (dstkey) {
3964 deleteKey(c->db,dstkey);
3965 addReply(c,shared.czero);
3966 } else {
3967 addReply(c,shared.nullmultibulk);
3968 }
3969 return;
3970 }
3971 if (setobj->type != REDIS_SET) {
3972 zfree(dv);
3973 addReply(c,shared.wrongtypeerr);
3974 return;
3975 }
3976 dv[j] = setobj->ptr;
3977 }
3978 /* Sort sets from the smallest to largest, this will improve our
3979 * algorithm's performace */
3980 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3981
3982 /* The first thing we should output is the total number of elements...
3983 * since this is a multi-bulk write, but at this stage we don't know
3984 * the intersection set size, so we use a trick, append an empty object
3985 * to the output list and save the pointer to later modify it with the
3986 * right length */
3987 if (!dstkey) {
3988 lenobj = createObject(REDIS_STRING,NULL);
3989 addReply(c,lenobj);
3990 decrRefCount(lenobj);
3991 } else {
3992 /* If we have a target key where to store the resulting set
3993 * create this key with an empty set inside */
3994 dstset = createSetObject();
3995 }
3996
3997 /* Iterate all the elements of the first (smallest) set, and test
3998 * the element against all the other sets, if at least one set does
3999 * not include the element it is discarded */
4000 di = dictGetIterator(dv[0]);
4001
4002 while((de = dictNext(di)) != NULL) {
4003 robj *ele;
4004
4005 for (j = 1; j < setsnum; j++)
4006 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4007 if (j != setsnum)
4008 continue; /* at least one set does not contain the member */
4009 ele = dictGetEntryKey(de);
4010 if (!dstkey) {
4011 addReplyBulkLen(c,ele);
4012 addReply(c,ele);
4013 addReply(c,shared.crlf);
4014 cardinality++;
4015 } else {
4016 dictAdd(dstset->ptr,ele,NULL);
4017 incrRefCount(ele);
4018 }
4019 }
4020 dictReleaseIterator(di);
4021
4022 if (dstkey) {
4023 /* Store the resulting set into the target */
4024 deleteKey(c->db,dstkey);
4025 dictAdd(c->db->dict,dstkey,dstset);
4026 incrRefCount(dstkey);
4027 }
4028
4029 if (!dstkey) {
4030 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4031 } else {
4032 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4033 dictSize((dict*)dstset->ptr)));
4034 server.dirty++;
4035 }
4036 zfree(dv);
4037 }
4038
4039 static void sinterCommand(redisClient *c) {
4040 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4041 }
4042
4043 static void sinterstoreCommand(redisClient *c) {
4044 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4045 }
4046
/* Operation selector for sunionDiffGenericCommand(): REDIS_OP_UNION merges
 * all the sets, REDIS_OP_DIFF keeps the members of the first set that are
 * not present in any of the following ones. */
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
4049
4050 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4051 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4052 dictIterator *di;
4053 dictEntry *de;
4054 robj *dstset = NULL;
4055 int j, cardinality = 0;
4056
4057 for (j = 0; j < setsnum; j++) {
4058 robj *setobj;
4059
4060 setobj = dstkey ?
4061 lookupKeyWrite(c->db,setskeys[j]) :
4062 lookupKeyRead(c->db,setskeys[j]);
4063 if (!setobj) {
4064 dv[j] = NULL;
4065 continue;
4066 }
4067 if (setobj->type != REDIS_SET) {
4068 zfree(dv);
4069 addReply(c,shared.wrongtypeerr);
4070 return;
4071 }
4072 dv[j] = setobj->ptr;
4073 }
4074
4075 /* We need a temp set object to store our union. If the dstkey
4076 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4077 * this set object will be the resulting object to set into the target key*/
4078 dstset = createSetObject();
4079
4080 /* Iterate all the elements of all the sets, add every element a single
4081 * time to the result set */
4082 for (j = 0; j < setsnum; j++) {
4083 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4084 if (!dv[j]) continue; /* non existing keys are like empty sets */
4085
4086 di = dictGetIterator(dv[j]);
4087
4088 while((de = dictNext(di)) != NULL) {
4089 robj *ele;
4090
4091 /* dictAdd will not add the same element multiple times */
4092 ele = dictGetEntryKey(de);
4093 if (op == REDIS_OP_UNION || j == 0) {
4094 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4095 incrRefCount(ele);
4096 cardinality++;
4097 }
4098 } else if (op == REDIS_OP_DIFF) {
4099 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4100 cardinality--;
4101 }
4102 }
4103 }
4104 dictReleaseIterator(di);
4105
4106 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4107 }
4108
4109 /* Output the content of the resulting set, if not in STORE mode */
4110 if (!dstkey) {
4111 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4112 di = dictGetIterator(dstset->ptr);
4113 while((de = dictNext(di)) != NULL) {
4114 robj *ele;
4115
4116 ele = dictGetEntryKey(de);
4117 addReplyBulkLen(c,ele);
4118 addReply(c,ele);
4119 addReply(c,shared.crlf);
4120 }
4121 dictReleaseIterator(di);
4122 } else {
4123 /* If we have a target key where to store the resulting set
4124 * create this key with the result set inside */
4125 deleteKey(c->db,dstkey);
4126 dictAdd(c->db->dict,dstkey,dstset);
4127 incrRefCount(dstkey);
4128 }
4129
4130 /* Cleanup */
4131 if (!dstkey) {
4132 decrRefCount(dstset);
4133 } else {
4134 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4135 dictSize((dict*)dstset->ptr)));
4136 server.dirty++;
4137 }
4138 zfree(dv);
4139 }
4140
4141 static void sunionCommand(redisClient *c) {
4142 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4143 }
4144
4145 static void sunionstoreCommand(redisClient *c) {
4146 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4147 }
4148
4149 static void sdiffCommand(redisClient *c) {
4150 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4151 }
4152
4153 static void sdiffstoreCommand(redisClient *c) {
4154 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4155 }
4156
4157 /* ==================================== ZSets =============================== */
4158
4159 /* ZSETs are ordered sets using two data structures to hold the same elements
4160 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4161 * data structure.
4162 *
 * The elements are added to a hash table mapping Redis objects to scores.
4164 * At the same time the elements are added to a skip list mapping scores
4165 * to Redis objects (so objects are sorted by scores in this "view"). */
4166
4167 /* This skiplist implementation is almost a C translation of the original
4168 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4169 * Alternative to Balanced Trees", modified in three ways:
4170 * a) this implementation allows for repeated values.
4171 * b) the comparison is not just by key (our 'score') but by satellite data.
4172 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, useful for ZREVRANGE. */
4175
4176 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4177 zskiplistNode *zn = zmalloc(sizeof(*zn));
4178
4179 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4180 zn->score = score;
4181 zn->obj = obj;
4182 return zn;
4183 }
4184
4185 static zskiplist *zslCreate(void) {
4186 int j;
4187 zskiplist *zsl;
4188
4189 zsl = zmalloc(sizeof(*zsl));
4190 zsl->level = 1;
4191 zsl->length = 0;
4192 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4193 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4194 zsl->header->forward[j] = NULL;
4195 zsl->header->backward = NULL;
4196 zsl->tail = NULL;
4197 return zsl;
4198 }
4199
4200 static void zslFreeNode(zskiplistNode *node) {
4201 decrRefCount(node->obj);
4202 zfree(node->forward);
4203 zfree(node);
4204 }
4205
4206 static void zslFree(zskiplist *zsl) {
4207 zskiplistNode *node = zsl->header->forward[0], *next;
4208
4209 zfree(zsl->header->forward);
4210 zfree(zsl->header);
4211 while(node) {
4212 next = node->forward[0];
4213 zslFreeNode(node);
4214 node = next;
4215 }
4216 zfree(zsl);
4217 }
4218
4219 static int zslRandomLevel(void) {
4220 int level = 1;
4221 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4222 level += 1;
4223 return level;
4224 }
4225
4226 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4227 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4228 int i, level;
4229
4230 x = zsl->header;
4231 for (i = zsl->level-1; i >= 0; i--) {
4232 while (x->forward[i] &&
4233 (x->forward[i]->score < score ||
4234 (x->forward[i]->score == score &&
4235 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4236 x = x->forward[i];
4237 update[i] = x;
4238 }
4239 /* we assume the key is not already inside, since we allow duplicated
4240 * scores, and the re-insertion of score and redis object should never
4241 * happpen since the caller of zslInsert() should test in the hash table
4242 * if the element is already inside or not. */
4243 level = zslRandomLevel();
4244 if (level > zsl->level) {
4245 for (i = zsl->level; i < level; i++)
4246 update[i] = zsl->header;
4247 zsl->level = level;
4248 }
4249 x = zslCreateNode(level,score,obj);
4250 for (i = 0; i < level; i++) {
4251 x->forward[i] = update[i]->forward[i];
4252 update[i]->forward[i] = x;
4253 }
4254 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4255 if (x->forward[0])
4256 x->forward[0]->backward = x;
4257 else
4258 zsl->tail = x;
4259 zsl->length++;
4260 }
4261
4262 /* Delete an element with matching score/object from the skiplist. */
4263 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4264 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4265 int i;
4266
4267 x = zsl->header;
4268 for (i = zsl->level-1; i >= 0; i--) {
4269 while (x->forward[i] &&
4270 (x->forward[i]->score < score ||
4271 (x->forward[i]->score == score &&
4272 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4273 x = x->forward[i];
4274 update[i] = x;
4275 }
4276 /* We may have multiple elements with the same score, what we need
4277 * is to find the element with both the right score and object. */
4278 x = x->forward[0];
4279 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4280 for (i = 0; i < zsl->level; i++) {
4281 if (update[i]->forward[i] != x) break;
4282 update[i]->forward[i] = x->forward[i];
4283 }
4284 if (x->forward[0]) {
4285 x->forward[0]->backward = (x->backward == zsl->header) ?
4286 NULL : x->backward;
4287 } else {
4288 zsl->tail = x->backward;
4289 }
4290 zslFreeNode(x);
4291 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4292 zsl->level--;
4293 zsl->length--;
4294 return 1;
4295 } else {
4296 return 0; /* not found */
4297 }
4298 return 0; /* not found */
4299 }
4300
4301 /* Delete all the elements with score between min and max from the skiplist.
4302 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4303 * Note that this function takes the reference to the hash table view of the
4304 * sorted set, in order to remove the elements from the hash table too. */
4305 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4306 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4307 unsigned long removed = 0;
4308 int i;
4309
4310 x = zsl->header;
4311 for (i = zsl->level-1; i >= 0; i--) {
4312 while (x->forward[i] && x->forward[i]->score < min)
4313 x = x->forward[i];
4314 update[i] = x;
4315 }
4316 /* We may have multiple elements with the same score, what we need
4317 * is to find the element with both the right score and object. */
4318 x = x->forward[0];
4319 while (x && x->score <= max) {
4320 zskiplistNode *next;
4321
4322 for (i = 0; i < zsl->level; i++) {
4323 if (update[i]->forward[i] != x) break;
4324 update[i]->forward[i] = x->forward[i];
4325 }
4326 if (x->forward[0]) {
4327 x->forward[0]->backward = (x->backward == zsl->header) ?
4328 NULL : x->backward;
4329 } else {
4330 zsl->tail = x->backward;
4331 }
4332 next = x->forward[0];
4333 dictDelete(dict,x->obj);
4334 zslFreeNode(x);
4335 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4336 zsl->level--;
4337 zsl->length--;
4338 removed++;
4339 x = next;
4340 }
4341 return removed; /* not found */
4342 }
4343
4344 /* Find the first node having a score equal or greater than the specified one.
4345 * Returns NULL if there is no match. */
4346 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4347 zskiplistNode *x;
4348 int i;
4349
4350 x = zsl->header;
4351 for (i = zsl->level-1; i >= 0; i--) {
4352 while (x->forward[i] && x->forward[i]->score < score)
4353 x = x->forward[i];
4354 }
4355 /* We may have multiple elements with the same score, what we need
4356 * is to find the element with both the right score and object. */
4357 return x->forward[0];
4358 }
4359
4360 /* The actual Z-commands implementations */
4361
4362 /* This generic command implements both ZADD and ZINCRBY.
4363 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4364 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4365 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4366 robj *zsetobj;
4367 zset *zs;
4368 double *score;
4369
4370 zsetobj = lookupKeyWrite(c->db,key);
4371 if (zsetobj == NULL) {
4372 zsetobj = createZsetObject();
4373 dictAdd(c->db->dict,key,zsetobj);
4374 incrRefCount(key);
4375 } else {
4376 if (zsetobj->type != REDIS_ZSET) {
4377 addReply(c,shared.wrongtypeerr);
4378 return;
4379 }
4380 }
4381 zs = zsetobj->ptr;
4382
4383 /* Ok now since we implement both ZADD and ZINCRBY here the code
4384 * needs to handle the two different conditions. It's all about setting
4385 * '*score', that is, the new score to set, to the right value. */
4386 score = zmalloc(sizeof(double));
4387 if (doincrement) {
4388 dictEntry *de;
4389
4390 /* Read the old score. If the element was not present starts from 0 */
4391 de = dictFind(zs->dict,ele);
4392 if (de) {
4393 double *oldscore = dictGetEntryVal(de);
4394 *score = *oldscore + scoreval;
4395 } else {
4396 *score = scoreval;
4397 }
4398 } else {
4399 *score = scoreval;
4400 }
4401
4402 /* What follows is a simple remove and re-insert operation that is common
4403 * to both ZADD and ZINCRBY... */
4404 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4405 /* case 1: New element */
4406 incrRefCount(ele); /* added to hash */
4407 zslInsert(zs->zsl,*score,ele);
4408 incrRefCount(ele); /* added to skiplist */
4409 server.dirty++;
4410 if (doincrement)
4411 addReplyDouble(c,*score);
4412 else
4413 addReply(c,shared.cone);
4414 } else {
4415 dictEntry *de;
4416 double *oldscore;
4417
4418 /* case 2: Score update operation */
4419 de = dictFind(zs->dict,ele);
4420 redisAssert(de != NULL);
4421 oldscore = dictGetEntryVal(de);
4422 if (*score != *oldscore) {
4423 int deleted;
4424
4425 /* Remove and insert the element in the skip list with new score */
4426 deleted = zslDelete(zs->zsl,*oldscore,ele);
4427 redisAssert(deleted != 0);
4428 zslInsert(zs->zsl,*score,ele);
4429 incrRefCount(ele);
4430 /* Update the score in the hash table */
4431 dictReplace(zs->dict,ele,score);
4432 server.dirty++;
4433 } else {
4434 zfree(score);
4435 }
4436 if (doincrement)
4437 addReplyDouble(c,*score);
4438 else
4439 addReply(c,shared.czero);
4440 }
4441 }
4442
4443 static void zaddCommand(redisClient *c) {
4444 double scoreval;
4445
4446 scoreval = strtod(c->argv[2]->ptr,NULL);
4447 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4448 }
4449
4450 static void zincrbyCommand(redisClient *c) {
4451 double scoreval;
4452
4453 scoreval = strtod(c->argv[2]->ptr,NULL);
4454 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4455 }
4456
4457 static void zremCommand(redisClient *c) {
4458 robj *zsetobj;
4459 zset *zs;
4460
4461 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4462 if (zsetobj == NULL) {
4463 addReply(c,shared.czero);
4464 } else {
4465 dictEntry *de;
4466 double *oldscore;
4467 int deleted;
4468
4469 if (zsetobj->type != REDIS_ZSET) {
4470 addReply(c,shared.wrongtypeerr);
4471 return;
4472 }
4473 zs = zsetobj->ptr;
4474 de = dictFind(zs->dict,c->argv[2]);
4475 if (de == NULL) {
4476 addReply(c,shared.czero);
4477 return;
4478 }
4479 /* Delete from the skiplist */
4480 oldscore = dictGetEntryVal(de);
4481 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4482 redisAssert(deleted != 0);
4483
4484 /* Delete from the hash table */
4485 dictDelete(zs->dict,c->argv[2]);
4486 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4487 server.dirty++;
4488 addReply(c,shared.cone);
4489 }
4490 }
4491
4492 static void zremrangebyscoreCommand(redisClient *c) {
4493 double min = strtod(c->argv[2]->ptr,NULL);
4494 double max = strtod(c->argv[3]->ptr,NULL);
4495 robj *zsetobj;
4496 zset *zs;
4497
4498 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4499 if (zsetobj == NULL) {
4500 addReply(c,shared.czero);
4501 } else {
4502 long deleted;
4503
4504 if (zsetobj->type != REDIS_ZSET) {
4505 addReply(c,shared.wrongtypeerr);
4506 return;
4507 }
4508 zs = zsetobj->ptr;
4509 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4510 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4511 server.dirty += deleted;
4512 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4513 }
4514 }
4515
4516 static void zrangeGenericCommand(redisClient *c, int reverse) {
4517 robj *o;
4518 int start = atoi(c->argv[2]->ptr);
4519 int end = atoi(c->argv[3]->ptr);
4520
4521 o = lookupKeyRead(c->db,c->argv[1]);
4522 if (o == NULL) {
4523 addReply(c,shared.nullmultibulk);
4524 } else {
4525 if (o->type != REDIS_ZSET) {
4526 addReply(c,shared.wrongtypeerr);
4527 } else {
4528 zset *zsetobj = o->ptr;
4529 zskiplist *zsl = zsetobj->zsl;
4530 zskiplistNode *ln;
4531
4532 int llen = zsl->length;
4533 int rangelen, j;
4534 robj *ele;
4535
4536 /* convert negative indexes */
4537 if (start < 0) start = llen+start;
4538 if (end < 0) end = llen+end;
4539 if (start < 0) start = 0;
4540 if (end < 0) end = 0;
4541
4542 /* indexes sanity checks */
4543 if (start > end || start >= llen) {
4544 /* Out of range start or start > end result in empty list */
4545 addReply(c,shared.emptymultibulk);
4546 return;
4547 }
4548 if (end >= llen) end = llen-1;
4549 rangelen = (end-start)+1;
4550
4551 /* Return the result in form of a multi-bulk reply */
4552 if (reverse) {
4553 ln = zsl->tail;
4554 while (start--)
4555 ln = ln->backward;
4556 } else {
4557 ln = zsl->header->forward[0];
4558 while (start--)
4559 ln = ln->forward[0];
4560 }
4561
4562 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4563 for (j = 0; j < rangelen; j++) {
4564 ele = ln->obj;
4565 addReplyBulkLen(c,ele);
4566 addReply(c,ele);
4567 addReply(c,shared.crlf);
4568 ln = reverse ? ln->backward : ln->forward[0];
4569 }
4570 }
4571 }
4572 }
4573
4574 static void zrangeCommand(redisClient *c) {
4575 zrangeGenericCommand(c,0);
4576 }
4577
4578 static void zrevrangeCommand(redisClient *c) {
4579 zrangeGenericCommand(c,1);
4580 }
4581
4582 static void zrangebyscoreCommand(redisClient *c) {
4583 robj *o;
4584 double min = strtod(c->argv[2]->ptr,NULL);
4585 double max = strtod(c->argv[3]->ptr,NULL);
4586 int offset = 0, limit = -1;
4587
4588 if (c->argc != 4 && c->argc != 7) {
4589 addReplySds(c,
4590 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4591 return;
4592 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4593 addReply(c,shared.syntaxerr);
4594 return;
4595 } else if (c->argc == 7) {
4596 offset = atoi(c->argv[5]->ptr);
4597 limit = atoi(c->argv[6]->ptr);
4598 if (offset < 0) offset = 0;
4599 }
4600
4601 o = lookupKeyRead(c->db,c->argv[1]);
4602 if (o == NULL) {
4603 addReply(c,shared.nullmultibulk);
4604 } else {
4605 if (o->type != REDIS_ZSET) {
4606 addReply(c,shared.wrongtypeerr);
4607 } else {
4608 zset *zsetobj = o->ptr;
4609 zskiplist *zsl = zsetobj->zsl;
4610 zskiplistNode *ln;
4611 robj *ele, *lenobj;
4612 unsigned int rangelen = 0;
4613
4614 /* Get the first node with the score >= min */
4615 ln = zslFirstWithScore(zsl,min);
4616 if (ln == NULL) {
4617 /* No element matching the speciifed interval */
4618 addReply(c,shared.emptymultibulk);
4619 return;
4620 }
4621
4622 /* We don't know in advance how many matching elements there
4623 * are in the list, so we push this object that will represent
4624 * the multi-bulk length in the output buffer, and will "fix"
4625 * it later */
4626 lenobj = createObject(REDIS_STRING,NULL);
4627 addReply(c,lenobj);
4628 decrRefCount(lenobj);
4629
4630 while(ln && ln->score <= max) {
4631 if (offset) {
4632 offset--;
4633 ln = ln->forward[0];
4634 continue;
4635 }
4636 if (limit == 0) break;
4637 ele = ln->obj;
4638 addReplyBulkLen(c,ele);
4639 addReply(c,ele);
4640 addReply(c,shared.crlf);
4641 ln = ln->forward[0];
4642 rangelen++;
4643 if (limit > 0) limit--;
4644 }
4645 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4646 }
4647 }
4648 }
4649
4650 static void zcardCommand(redisClient *c) {
4651 robj *o;
4652 zset *zs;
4653
4654 o = lookupKeyRead(c->db,c->argv[1]);
4655 if (o == NULL) {
4656 addReply(c,shared.czero);
4657 return;
4658 } else {
4659 if (o->type != REDIS_ZSET) {
4660 addReply(c,shared.wrongtypeerr);
4661 } else {
4662 zs = o->ptr;
4663 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4664 }
4665 }
4666 }
4667
4668 static void zscoreCommand(redisClient *c) {
4669 robj *o;
4670 zset *zs;
4671
4672 o = lookupKeyRead(c->db,c->argv[1]);
4673 if (o == NULL) {
4674 addReply(c,shared.nullbulk);
4675 return;
4676 } else {
4677 if (o->type != REDIS_ZSET) {
4678 addReply(c,shared.wrongtypeerr);
4679 } else {
4680 dictEntry *de;
4681
4682 zs = o->ptr;
4683 de = dictFind(zs->dict,c->argv[2]);
4684 if (!de) {
4685 addReply(c,shared.nullbulk);
4686 } else {
4687 double *score = dictGetEntryVal(de);
4688
4689 addReplyDouble(c,*score);
4690 }
4691 }
4692 }
4693 }
4694
4695 /* ========================= Non type-specific commands ==================== */
4696
4697 static void flushdbCommand(redisClient *c) {
4698 server.dirty += dictSize(c->db->dict);
4699 dictEmpty(c->db->dict);
4700 dictEmpty(c->db->expires);
4701 addReply(c,shared.ok);
4702 }
4703
4704 static void flushallCommand(redisClient *c) {
4705 server.dirty += emptyDb();
4706 addReply(c,shared.ok);
4707 rdbSave(server.dbfilename);
4708 server.dirty++;
4709 }
4710
4711 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4712 redisSortOperation *so = zmalloc(sizeof(*so));
4713 so->type = type;
4714 so->pattern = pattern;
4715 return so;
4716 }
4717
4718 /* Return the value associated to the key with a name obtained
4719 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4720 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4721 char *p;
4722 sds spat, ssub;
4723 robj keyobj;
4724 int prefixlen, sublen, postfixlen;
4725 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4726 struct {
4727 long len;
4728 long free;
4729 char buf[REDIS_SORTKEY_MAX+1];
4730 } keyname;
4731
4732 /* If the pattern is "#" return the substitution object itself in order
4733 * to implement the "SORT ... GET #" feature. */
4734 spat = pattern->ptr;
4735 if (spat[0] == '#' && spat[1] == '\0') {
4736 return subst;
4737 }
4738
4739 /* The substitution object may be specially encoded. If so we create
4740 * a decoded object on the fly. Otherwise getDecodedObject will just
4741 * increment the ref count, that we'll decrement later. */
4742 subst = getDecodedObject(subst);
4743
4744 ssub = subst->ptr;
4745 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4746 p = strchr(spat,'*');
4747 if (!p) {
4748 decrRefCount(subst);
4749 return NULL;
4750 }
4751
4752 prefixlen = p-spat;
4753 sublen = sdslen(ssub);
4754 postfixlen = sdslen(spat)-(prefixlen+1);
4755 memcpy(keyname.buf,spat,prefixlen);
4756 memcpy(keyname.buf+prefixlen,ssub,sublen);
4757 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4758 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4759 keyname.len = prefixlen+sublen+postfixlen;
4760
4761 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4762 decrRefCount(subst);
4763
4764 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4765 return lookupKeyRead(db,&keyobj);
4766 }
4767
4768 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4769 * the additional parameter is not standard but a BSD-specific we have to
4770 * pass sorting parameters via the global 'server' structure */
4771 static int sortCompare(const void *s1, const void *s2) {
4772 const redisSortObject *so1 = s1, *so2 = s2;
4773 int cmp;
4774
4775 if (!server.sort_alpha) {
4776 /* Numeric sorting. Here it's trivial as we precomputed scores */
4777 if (so1->u.score > so2->u.score) {
4778 cmp = 1;
4779 } else if (so1->u.score < so2->u.score) {
4780 cmp = -1;
4781 } else {
4782 cmp = 0;
4783 }
4784 } else {
4785 /* Alphanumeric sorting */
4786 if (server.sort_bypattern) {
4787 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4788 /* At least one compare object is NULL */
4789 if (so1->u.cmpobj == so2->u.cmpobj)
4790 cmp = 0;
4791 else if (so1->u.cmpobj == NULL)
4792 cmp = -1;
4793 else
4794 cmp = 1;
4795 } else {
4796 /* We have both the objects, use strcoll */
4797 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4798 }
4799 } else {
4800 /* Compare elements directly */
4801 robj *dec1, *dec2;
4802
4803 dec1 = getDecodedObject(so1->obj);
4804 dec2 = getDecodedObject(so2->obj);
4805 cmp = strcoll(dec1->ptr,dec2->ptr);
4806 decrRefCount(dec1);
4807 decrRefCount(dec2);
4808 }
4809 }
4810 return server.sort_desc ? -cmp : cmp;
4811 }
4812
4813 /* The SORT command is the most complex command in Redis. Warning: this code
4814 * is optimized for speed and a bit less for readability */
4815 static void sortCommand(redisClient *c) {
4816 list *operations;
4817 int outputlen = 0;
4818 int desc = 0, alpha = 0;
4819 int limit_start = 0, limit_count = -1, start, end;
4820 int j, dontsort = 0, vectorlen;
4821 int getop = 0; /* GET operation counter */
4822 robj *sortval, *sortby = NULL, *storekey = NULL;
4823 redisSortObject *vector; /* Resulting vector to sort */
4824
4825 /* Lookup the key to sort. It must be of the right types */
4826 sortval = lookupKeyRead(c->db,c->argv[1]);
4827 if (sortval == NULL) {
4828 addReply(c,shared.nokeyerr);
4829 return;
4830 }
4831 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4832 sortval->type != REDIS_ZSET)
4833 {
4834 addReply(c,shared.wrongtypeerr);
4835 return;
4836 }
4837
4838 /* Create a list of operations to perform for every sorted element.
4839 * Operations can be GET/DEL/INCR/DECR */
4840 operations = listCreate();
4841 listSetFreeMethod(operations,zfree);
4842 j = 2;
4843
4844 /* Now we need to protect sortval incrementing its count, in the future
4845 * SORT may have options able to overwrite/delete keys during the sorting
4846 * and the sorted key itself may get destroied */
4847 incrRefCount(sortval);
4848
4849 /* The SORT command has an SQL-alike syntax, parse it */
4850 while(j < c->argc) {
4851 int leftargs = c->argc-j-1;
4852 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4853 desc = 0;
4854 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4855 desc = 1;
4856 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4857 alpha = 1;
4858 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4859 limit_start = atoi(c->argv[j+1]->ptr);
4860 limit_count = atoi(c->argv[j+2]->ptr);
4861 j+=2;
4862 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4863 storekey = c->argv[j+1];
4864 j++;
4865 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4866 sortby = c->argv[j+1];
4867 /* If the BY pattern does not contain '*', i.e. it is constant,
4868 * we don't need to sort nor to lookup the weight keys. */
4869 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4870 j++;
4871 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4872 listAddNodeTail(operations,createSortOperation(
4873 REDIS_SORT_GET,c->argv[j+1]));
4874 getop++;
4875 j++;
4876 } else {
4877 decrRefCount(sortval);
4878 listRelease(operations);
4879 addReply(c,shared.syntaxerr);
4880 return;
4881 }
4882 j++;
4883 }
4884
4885 /* Load the sorting vector with all the objects to sort */
4886 switch(sortval->type) {
4887 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
4888 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
4889 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
4890 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
4891 }
4892 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4893 j = 0;
4894
4895 if (sortval->type == REDIS_LIST) {
4896 list *list = sortval->ptr;
4897 listNode *ln;
4898
4899 listRewind(list);
4900 while((ln = listYield(list))) {
4901 robj *ele = ln->value;
4902 vector[j].obj = ele;
4903 vector[j].u.score = 0;
4904 vector[j].u.cmpobj = NULL;
4905 j++;
4906 }
4907 } else {
4908 dict *set;
4909 dictIterator *di;
4910 dictEntry *setele;
4911
4912 if (sortval->type == REDIS_SET) {
4913 set = sortval->ptr;
4914 } else {
4915 zset *zs = sortval->ptr;
4916 set = zs->dict;
4917 }
4918
4919 di = dictGetIterator(set);
4920 while((setele = dictNext(di)) != NULL) {
4921 vector[j].obj = dictGetEntryKey(setele);
4922 vector[j].u.score = 0;
4923 vector[j].u.cmpobj = NULL;
4924 j++;
4925 }
4926 dictReleaseIterator(di);
4927 }
4928 redisAssert(j == vectorlen);
4929
4930 /* Now it's time to load the right scores in the sorting vector */
4931 if (dontsort == 0) {
4932 for (j = 0; j < vectorlen; j++) {
4933 if (sortby) {
4934 robj *byval;
4935
4936 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4937 if (!byval || byval->type != REDIS_STRING) continue;
4938 if (alpha) {
4939 vector[j].u.cmpobj = getDecodedObject(byval);
4940 } else {
4941 if (byval->encoding == REDIS_ENCODING_RAW) {
4942 vector[j].u.score = strtod(byval->ptr,NULL);
4943 } else {
4944 /* Don't need to decode the object if it's
4945 * integer-encoded (the only encoding supported) so
4946 * far. We can just cast it */
4947 if (byval->encoding == REDIS_ENCODING_INT) {
4948 vector[j].u.score = (long)byval->ptr;
4949 } else
4950 redisAssert(1 != 1);
4951 }
4952 }
4953 } else {
4954 if (!alpha) {
4955 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4956 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4957 else {
4958 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4959 vector[j].u.score = (long) vector[j].obj->ptr;
4960 else
4961 redisAssert(1 != 1);
4962 }
4963 }
4964 }
4965 }
4966 }
4967
4968 /* We are ready to sort the vector... perform a bit of sanity check
4969 * on the LIMIT option too. We'll use a partial version of quicksort. */
4970 start = (limit_start < 0) ? 0 : limit_start;
4971 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4972 if (start >= vectorlen) {
4973 start = vectorlen-1;
4974 end = vectorlen-2;
4975 }
4976 if (end >= vectorlen) end = vectorlen-1;
4977
4978 if (dontsort == 0) {
4979 server.sort_desc = desc;
4980 server.sort_alpha = alpha;
4981 server.sort_bypattern = sortby ? 1 : 0;
4982 if (sortby && (start != 0 || end != vectorlen-1))
4983 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4984 else
4985 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4986 }
4987
4988 /* Send command output to the output buffer, performing the specified
4989 * GET/DEL/INCR/DECR operations if any. */
4990 outputlen = getop ? getop*(end-start+1) : end-start+1;
4991 if (storekey == NULL) {
4992 /* STORE option not specified, sent the sorting result to client */
4993 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4994 for (j = start; j <= end; j++) {
4995 listNode *ln;
4996 if (!getop) {
4997 addReplyBulkLen(c,vector[j].obj);
4998 addReply(c,vector[j].obj);
4999 addReply(c,shared.crlf);
5000 }
5001 listRewind(operations);
5002 while((ln = listYield(operations))) {
5003 redisSortOperation *sop = ln->value;
5004 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5005 vector[j].obj);
5006
5007 if (sop->type == REDIS_SORT_GET) {
5008 if (!val || val->type != REDIS_STRING) {
5009 addReply(c,shared.nullbulk);
5010 } else {
5011 addReplyBulkLen(c,val);
5012 addReply(c,val);
5013 addReply(c,shared.crlf);
5014 }
5015 } else {
5016 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5017 }
5018 }
5019 }
5020 } else {
5021 robj *listObject = createListObject();
5022 list *listPtr = (list*) listObject->ptr;
5023
5024 /* STORE option specified, set the sorting result as a List object */
5025 for (j = start; j <= end; j++) {
5026 listNode *ln;
5027 if (!getop) {
5028 listAddNodeTail(listPtr,vector[j].obj);
5029 incrRefCount(vector[j].obj);
5030 }
5031 listRewind(operations);
5032 while((ln = listYield(operations))) {
5033 redisSortOperation *sop = ln->value;
5034 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5035 vector[j].obj);
5036
5037 if (sop->type == REDIS_SORT_GET) {
5038 if (!val || val->type != REDIS_STRING) {
5039 listAddNodeTail(listPtr,createStringObject("",0));
5040 } else {
5041 listAddNodeTail(listPtr,val);
5042 incrRefCount(val);
5043 }
5044 } else {
5045 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5046 }
5047 }
5048 }
5049 if (dictReplace(c->db->dict,storekey,listObject)) {
5050 incrRefCount(storekey);
5051 }
5052 /* Note: we add 1 because the DB is dirty anyway since even if the
5053 * SORT result is empty a new key is set and maybe the old content
5054 * replaced. */
5055 server.dirty += 1+outputlen;
5056 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5057 }
5058
5059 /* Cleanup */
5060 decrRefCount(sortval);
5061 listRelease(operations);
5062 for (j = 0; j < vectorlen; j++) {
5063 if (sortby && alpha && vector[j].u.cmpobj)
5064 decrRefCount(vector[j].u.cmpobj);
5065 }
5066 zfree(vector);
5067 }
5068
5069 /* Create the string returned by the INFO command. This is decoupled
5070 * by the INFO command itself as we need to report the same information
5071 * on memory corruption problems. */
5072 static sds genRedisInfoString(void) {
5073 sds info;
5074 time_t uptime = time(NULL)-server.stat_starttime;
5075 int j;
5076
5077 info = sdscatprintf(sdsempty(),
5078 "redis_version:%s\r\n"
5079 "arch_bits:%s\r\n"
5080 "multiplexing_api:%s\r\n"
5081 "uptime_in_seconds:%ld\r\n"
5082 "uptime_in_days:%ld\r\n"
5083 "connected_clients:%d\r\n"
5084 "connected_slaves:%d\r\n"
5085 "used_memory:%zu\r\n"
5086 "changes_since_last_save:%lld\r\n"
5087 "bgsave_in_progress:%d\r\n"
5088 "last_save_time:%ld\r\n"
5089 "bgrewriteaof_in_progress:%d\r\n"
5090 "total_connections_received:%lld\r\n"
5091 "total_commands_processed:%lld\r\n"
5092 "role:%s\r\n"
5093 ,REDIS_VERSION,
5094 (sizeof(long) == 8) ? "64" : "32",
5095 aeGetApiName(),
5096 uptime,
5097 uptime/(3600*24),
5098 listLength(server.clients)-listLength(server.slaves),
5099 listLength(server.slaves),
5100 server.usedmemory,
5101 server.dirty,
5102 server.bgsavechildpid != -1,
5103 server.lastsave,
5104 server.bgrewritechildpid != -1,
5105 server.stat_numconnections,
5106 server.stat_numcommands,
5107 server.masterhost == NULL ? "master" : "slave"
5108 );
5109 if (server.masterhost) {
5110 info = sdscatprintf(info,
5111 "master_host:%s\r\n"
5112 "master_port:%d\r\n"
5113 "master_link_status:%s\r\n"
5114 "master_last_io_seconds_ago:%d\r\n"
5115 ,server.masterhost,
5116 server.masterport,
5117 (server.replstate == REDIS_REPL_CONNECTED) ?
5118 "up" : "down",
5119 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5120 );
5121 }
5122 for (j = 0; j < server.dbnum; j++) {
5123 long long keys, vkeys;
5124
5125 keys = dictSize(server.db[j].dict);
5126 vkeys = dictSize(server.db[j].expires);
5127 if (keys || vkeys) {
5128 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5129 j, keys, vkeys);
5130 }
5131 }
5132 return info;
5133 }
5134
5135 static void infoCommand(redisClient *c) {
5136 sds info = genRedisInfoString();
5137 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5138 (unsigned long)sdslen(info)));
5139 addReplySds(c,info);
5140 addReply(c,shared.crlf);
5141 }
5142
5143 static void monitorCommand(redisClient *c) {
5144 /* ignore MONITOR if aleady slave or in monitor mode */
5145 if (c->flags & REDIS_SLAVE) return;
5146
5147 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5148 c->slaveseldb = 0;
5149 listAddNodeTail(server.monitors,c);
5150 addReply(c,shared.ok);
5151 }
5152
5153 /* ================================= Expire ================================= */
5154 static int removeExpire(redisDb *db, robj *key) {
5155 if (dictDelete(db->expires,key) == DICT_OK) {
5156 return 1;
5157 } else {
5158 return 0;
5159 }
5160 }
5161
5162 static int setExpire(redisDb *db, robj *key, time_t when) {
5163 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5164 return 0;
5165 } else {
5166 incrRefCount(key);
5167 return 1;
5168 }
5169 }
5170
5171 /* Return the expire time of the specified key, or -1 if no expire
5172 * is associated with this key (i.e. the key is non volatile) */
5173 static time_t getExpire(redisDb *db, robj *key) {
5174 dictEntry *de;
5175
5176 /* No expire? return ASAP */
5177 if (dictSize(db->expires) == 0 ||
5178 (de = dictFind(db->expires,key)) == NULL) return -1;
5179
5180 return (time_t) dictGetEntryVal(de);
5181 }
5182
5183 static int expireIfNeeded(redisDb *db, robj *key) {
5184 time_t when;
5185 dictEntry *de;
5186
5187 /* No expire? return ASAP */
5188 if (dictSize(db->expires) == 0 ||
5189 (de = dictFind(db->expires,key)) == NULL) return 0;
5190
5191 /* Lookup the expire */
5192 when = (time_t) dictGetEntryVal(de);
5193 if (time(NULL) <= when) return 0;
5194
5195 /* Delete the key */
5196 dictDelete(db->expires,key);
5197 return dictDelete(db->dict,key) == DICT_OK;
5198 }
5199
5200 static int deleteIfVolatile(redisDb *db, robj *key) {
5201 dictEntry *de;
5202
5203 /* No expire? return ASAP */
5204 if (dictSize(db->expires) == 0 ||
5205 (de = dictFind(db->expires,key)) == NULL) return 0;
5206
5207 /* Delete the key */
5208 server.dirty++;
5209 dictDelete(db->expires,key);
5210 return dictDelete(db->dict,key) == DICT_OK;
5211 }
5212
5213 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5214 dictEntry *de;
5215
5216 de = dictFind(c->db->dict,key);
5217 if (de == NULL) {
5218 addReply(c,shared.czero);
5219 return;
5220 }
5221 if (seconds < 0) {
5222 if (deleteKey(c->db,key)) server.dirty++;
5223 addReply(c, shared.cone);
5224 return;
5225 } else {
5226 time_t when = time(NULL)+seconds;
5227 if (setExpire(c->db,key,when)) {
5228 addReply(c,shared.cone);
5229 server.dirty++;
5230 } else {
5231 addReply(c,shared.czero);
5232 }
5233 return;
5234 }
5235 }
5236
5237 static void expireCommand(redisClient *c) {
5238 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5239 }
5240
5241 static void expireatCommand(redisClient *c) {
5242 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5243 }
5244
5245 static void ttlCommand(redisClient *c) {
5246 time_t expire;
5247 int ttl = -1;
5248
5249 expire = getExpire(c->db,c->argv[1]);
5250 if (expire != -1) {
5251 ttl = (int) (expire-time(NULL));
5252 if (ttl < 0) ttl = -1;
5253 }
5254 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5255 }
5256
5257 /* =============================== Replication ============================= */
5258
5259 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5260 ssize_t nwritten, ret = size;
5261 time_t start = time(NULL);
5262
5263 timeout++;
5264 while(size) {
5265 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5266 nwritten = write(fd,ptr,size);
5267 if (nwritten == -1) return -1;
5268 ptr += nwritten;
5269 size -= nwritten;
5270 }
5271 if ((time(NULL)-start) > timeout) {
5272 errno = ETIMEDOUT;
5273 return -1;
5274 }
5275 }
5276 return ret;
5277 }
5278
5279 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5280 ssize_t nread, totread = 0;
5281 time_t start = time(NULL);
5282
5283 timeout++;
5284 while(size) {
5285 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5286 nread = read(fd,ptr,size);
5287 if (nread == -1) return -1;
5288 ptr += nread;
5289 size -= nread;
5290 totread += nread;
5291 }
5292 if ((time(NULL)-start) > timeout) {
5293 errno = ETIMEDOUT;
5294 return -1;
5295 }
5296 }
5297 return totread;
5298 }
5299
/* Read one line from 'fd' into the buffer 'ptr' of 'size' bytes, one byte at
 * a time through syncRead(). The terminating "\n" (and a preceding "\r", if
 * any) is stripped and the result is always NUL terminated when size >= 1.
 * At most size-1 characters are stored: a longer line is truncated and the
 * rest is left unread on the socket.
 *
 * Returns the number of characters stored, or -1 on I/O error / timeout. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0; /* no room even for the terminator */
    *ptr = '\0';
    size--; /* reserve room for the trailing NUL */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Without this decrement the bound was never enforced and a long
         * line from the peer overflowed the caller's buffer. */
        size--;
    }
    return nread;
}
5320
5321 static void syncCommand(redisClient *c) {
5322 /* ignore SYNC if aleady slave or in monitor mode */
5323 if (c->flags & REDIS_SLAVE) return;
5324
5325 /* SYNC can't be issued when the server has pending data to send to
5326 * the client about already issued commands. We need a fresh reply
5327 * buffer registering the differences between the BGSAVE and the current
5328 * dataset, so that we can copy to other slaves if needed. */
5329 if (listLength(c->reply) != 0) {
5330 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5331 return;
5332 }
5333
5334 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5335 /* Here we need to check if there is a background saving operation
5336 * in progress, or if it is required to start one */
5337 if (server.bgsavechildpid != -1) {
5338 /* Ok a background save is in progress. Let's check if it is a good
5339 * one for replication, i.e. if there is another slave that is
5340 * registering differences since the server forked to save */
5341 redisClient *slave;
5342 listNode *ln;
5343
5344 listRewind(server.slaves);
5345 while((ln = listYield(server.slaves))) {
5346 slave = ln->value;
5347 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5348 }
5349 if (ln) {
5350 /* Perfect, the server is already registering differences for
5351 * another slave. Set the right state, and copy the buffer. */
5352 listRelease(c->reply);
5353 c->reply = listDup(slave->reply);
5354 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5355 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5356 } else {
5357 /* No way, we need to wait for the next BGSAVE in order to
5358 * register differences */
5359 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5360 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5361 }
5362 } else {
5363 /* Ok we don't have a BGSAVE in progress, let's start one */
5364 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5365 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5366 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5367 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5368 return;
5369 }
5370 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5371 }
5372 c->repldbfd = -1;
5373 c->flags |= REDIS_SLAVE;
5374 c->slaveseldb = 0;
5375 listAddNodeTail(server.slaves,c);
5376 return;
5377 }
5378
5379 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5380 redisClient *slave = privdata;
5381 REDIS_NOTUSED(el);
5382 REDIS_NOTUSED(mask);
5383 char buf[REDIS_IOBUF_LEN];
5384 ssize_t nwritten, buflen;
5385
5386 if (slave->repldboff == 0) {
5387 /* Write the bulk write count before to transfer the DB. In theory here
5388 * we don't know how much room there is in the output buffer of the
5389 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5390 * operations) will never be smaller than the few bytes we need. */
5391 sds bulkcount;
5392
5393 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5394 slave->repldbsize);
5395 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5396 {
5397 sdsfree(bulkcount);
5398 freeClient(slave);
5399 return;
5400 }
5401 sdsfree(bulkcount);
5402 }
5403 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5404 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5405 if (buflen <= 0) {
5406 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5407 (buflen == 0) ? "premature EOF" : strerror(errno));
5408 freeClient(slave);
5409 return;
5410 }
5411 if ((nwritten = write(fd,buf,buflen)) == -1) {
5412 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5413 strerror(errno));
5414 freeClient(slave);
5415 return;
5416 }
5417 slave->repldboff += nwritten;
5418 if (slave->repldboff == slave->repldbsize) {
5419 close(slave->repldbfd);
5420 slave->repldbfd = -1;
5421 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5422 slave->replstate = REDIS_REPL_ONLINE;
5423 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5424 sendReplyToClient, slave) == AE_ERR) {
5425 freeClient(slave);
5426 return;
5427 }
5428 addReplySds(slave,sdsempty());
5429 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5430 }
5431 }
5432
5433 /* This function is called at the end of every background saving.
5434 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5435 * otherwise REDIS_ERR is passed to the function.
5436 *
5437 * The goal of this function is to handle slaves waiting for a successful
5438 * background saving in order to perform non-blocking synchronization. */
5439 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5440 listNode *ln;
5441 int startbgsave = 0;
5442
5443 listRewind(server.slaves);
5444 while((ln = listYield(server.slaves))) {
5445 redisClient *slave = ln->value;
5446
5447 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5448 startbgsave = 1;
5449 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5450 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5451 struct redis_stat buf;
5452
5453 if (bgsaveerr != REDIS_OK) {
5454 freeClient(slave);
5455 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5456 continue;
5457 }
5458 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5459 redis_fstat(slave->repldbfd,&buf) == -1) {
5460 freeClient(slave);
5461 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5462 continue;
5463 }
5464 slave->repldboff = 0;
5465 slave->repldbsize = buf.st_size;
5466 slave->replstate = REDIS_REPL_SEND_BULK;
5467 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5468 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5469 freeClient(slave);
5470 continue;
5471 }
5472 }
5473 }
5474 if (startbgsave) {
5475 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5476 listRewind(server.slaves);
5477 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5478 while((ln = listYield(server.slaves))) {
5479 redisClient *slave = ln->value;
5480
5481 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5482 freeClient(slave);
5483 }
5484 }
5485 }
5486 }
5487
5488 static int syncWithMaster(void) {
5489 char buf[1024], tmpfile[256], authcmd[1024];
5490 int dumpsize;
5491 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5492 int dfd;
5493
5494 if (fd == -1) {
5495 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5496 strerror(errno));
5497 return REDIS_ERR;
5498 }
5499
5500 /* AUTH with the master if required. */
5501 if(server.masterauth) {
5502 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5503 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5504 close(fd);
5505 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5506 strerror(errno));
5507 return REDIS_ERR;
5508 }
5509 /* Read the AUTH result. */
5510 if (syncReadLine(fd,buf,1024,3600) == -1) {
5511 close(fd);
5512 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5513 strerror(errno));
5514 return REDIS_ERR;
5515 }
5516 if (buf[0] != '+') {
5517 close(fd);
5518 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5519 return REDIS_ERR;
5520 }
5521 }
5522
5523 /* Issue the SYNC command */
5524 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5525 close(fd);
5526 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5527 strerror(errno));
5528 return REDIS_ERR;
5529 }
5530 /* Read the bulk write count */
5531 if (syncReadLine(fd,buf,1024,3600) == -1) {
5532 close(fd);
5533 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5534 strerror(errno));
5535 return REDIS_ERR;
5536 }
5537 if (buf[0] != '$') {
5538 close(fd);
5539 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5540 return REDIS_ERR;
5541 }
5542 dumpsize = atoi(buf+1);
5543 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5544 /* Read the bulk write data on a temp file */
5545 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5546 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5547 if (dfd == -1) {
5548 close(fd);
5549 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5550 return REDIS_ERR;
5551 }
5552 while(dumpsize) {
5553 int nread, nwritten;
5554
5555 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5556 if (nread == -1) {
5557 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5558 strerror(errno));
5559 close(fd);
5560 close(dfd);
5561 return REDIS_ERR;
5562 }
5563 nwritten = write(dfd,buf,nread);
5564 if (nwritten == -1) {
5565 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5566 close(fd);
5567 close(dfd);
5568 return REDIS_ERR;
5569 }
5570 dumpsize -= nread;
5571 }
5572 close(dfd);
5573 if (rename(tmpfile,server.dbfilename) == -1) {
5574 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5575 unlink(tmpfile);
5576 close(fd);
5577 return REDIS_ERR;
5578 }
5579 emptyDb();
5580 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5581 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5582 close(fd);
5583 return REDIS_ERR;
5584 }
5585 server.master = createClient(fd);
5586 server.master->flags |= REDIS_MASTER;
5587 server.master->authenticated = 1;
5588 server.replstate = REDIS_REPL_CONNECTED;
5589 return REDIS_OK;
5590 }
5591
5592 static void slaveofCommand(redisClient *c) {
5593 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5594 !strcasecmp(c->argv[2]->ptr,"one")) {
5595 if (server.masterhost) {
5596 sdsfree(server.masterhost);
5597 server.masterhost = NULL;
5598 if (server.master) freeClient(server.master);
5599 server.replstate = REDIS_REPL_NONE;
5600 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5601 }
5602 } else {
5603 sdsfree(server.masterhost);
5604 server.masterhost = sdsdup(c->argv[1]->ptr);
5605 server.masterport = atoi(c->argv[2]->ptr);
5606 if (server.master) freeClient(server.master);
5607 server.replstate = REDIS_REPL_CONNECT;
5608 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5609 server.masterhost, server.masterport);
5610 }
5611 addReply(c,shared.ok);
5612 }
5613
5614 /* ============================ Maxmemory directive ======================== */
5615
5616 /* This function gets called when 'maxmemory' is set on the config file to limit
5617 * the max memory used by the server, and we are out of memory.
5618 * This function will try to, in order:
5619 *
5620 * - Free objects from the free list
5621 * - Try to remove keys with an EXPIRE set
5622 *
5623 * If it is not possible to free enough memory to reach used-memory < maxmemory
5624 * the server will start refusing commands that would further enlarge the
5625 * memory usage.
5626 */
5627 static void freeMemoryIfNeeded(void) {
5628 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5629 if (listLength(server.objfreelist)) {
5630 robj *o;
5631
5632 listNode *head = listFirst(server.objfreelist);
5633 o = listNodeValue(head);
5634 listDelNode(server.objfreelist,head);
5635 zfree(o);
5636 } else {
5637 int j, k, freed = 0;
5638
5639 for (j = 0; j < server.dbnum; j++) {
5640 int minttl = -1;
5641 robj *minkey = NULL;
5642 struct dictEntry *de;
5643
5644 if (dictSize(server.db[j].expires)) {
5645 freed = 1;
5646 /* From a sample of three keys drop the one nearest to
5647 * the natural expire */
5648 for (k = 0; k < 3; k++) {
5649 time_t t;
5650
5651 de = dictGetRandomKey(server.db[j].expires);
5652 t = (time_t) dictGetEntryVal(de);
5653 if (minttl == -1 || t < minttl) {
5654 minkey = dictGetEntryKey(de);
5655 minttl = t;
5656 }
5657 }
5658 deleteKey(server.db+j,minkey);
5659 }
5660 }
5661 if (!freed) return; /* nothing to free... */
5662 }
5663 }
5664 }
5665
5666 /* ============================== Append Only file ========================== */
5667
5668 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5669 sds buf = sdsempty();
5670 int j;
5671 ssize_t nwritten;
5672 time_t now;
5673 robj *tmpargv[3];
5674
5675 /* The DB this command was targetting is not the same as the last command
5676 * we appendend. To issue a SELECT command is needed. */
5677 if (dictid != server.appendseldb) {
5678 char seldb[64];
5679
5680 snprintf(seldb,sizeof(seldb),"%d",dictid);
5681 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
5682 (unsigned long)strlen(seldb),seldb);
5683 server.appendseldb = dictid;
5684 }
5685
5686 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5687 * EXPIREs into EXPIREATs calls */
5688 if (cmd->proc == expireCommand) {
5689 long when;
5690
5691 tmpargv[0] = createStringObject("EXPIREAT",8);
5692 tmpargv[1] = argv[1];
5693 incrRefCount(argv[1]);
5694 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5695 tmpargv[2] = createObject(REDIS_STRING,
5696 sdscatprintf(sdsempty(),"%ld",when));
5697 argv = tmpargv;
5698 }
5699
5700 /* Append the actual command */
5701 buf = sdscatprintf(buf,"*%d\r\n",argc);
5702 for (j = 0; j < argc; j++) {
5703 robj *o = argv[j];
5704
5705 o = getDecodedObject(o);
5706 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
5707 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5708 buf = sdscatlen(buf,"\r\n",2);
5709 decrRefCount(o);
5710 }
5711
5712 /* Free the objects from the modified argv for EXPIREAT */
5713 if (cmd->proc == expireCommand) {
5714 for (j = 0; j < 3; j++)
5715 decrRefCount(argv[j]);
5716 }
5717
5718 /* We want to perform a single write. This should be guaranteed atomic
5719 * at least if the filesystem we are writing is a real physical one.
5720 * While this will save us against the server being killed I don't think
5721 * there is much to do about the whole server stopping for power problems
5722 * or alike */
5723 nwritten = write(server.appendfd,buf,sdslen(buf));
5724 if (nwritten != (signed)sdslen(buf)) {
5725 /* Ooops, we are in troubles. The best thing to do for now is
5726 * to simply exit instead to give the illusion that everything is
5727 * working as expected. */
5728 if (nwritten == -1) {
5729 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5730 } else {
5731 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5732 }
5733 exit(1);
5734 }
5735 /* If a background append only file rewriting is in progress we want to
5736 * accumulate the differences between the child DB and the current one
5737 * in a buffer, so that when the child process will do its work we
5738 * can append the differences to the new append only file. */
5739 if (server.bgrewritechildpid != -1)
5740 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5741
5742 sdsfree(buf);
5743 now = time(NULL);
5744 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5745 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5746 now-server.lastfsync > 1))
5747 {
5748 fsync(server.appendfd); /* Let's try to get this data on the disk */
5749 server.lastfsync = now;
5750 }
5751 }
5752
5753 /* In Redis commands are always executed in the context of a client, so in
5754 * order to load the append only file we need to create a fake client. */
5755 static struct redisClient *createFakeClient(void) {
5756 struct redisClient *c = zmalloc(sizeof(*c));
5757
5758 selectDb(c,0);
5759 c->fd = -1;
5760 c->querybuf = sdsempty();
5761 c->argc = 0;
5762 c->argv = NULL;
5763 c->flags = 0;
5764 /* We set the fake client as a slave waiting for the synchronization
5765 * so that Redis will not try to send replies to this client. */
5766 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5767 c->reply = listCreate();
5768 listSetFreeMethod(c->reply,decrRefCount);
5769 listSetDupMethod(c->reply,dupClientReplyValue);
5770 return c;
5771 }
5772
5773 static void freeFakeClient(struct redisClient *c) {
5774 sdsfree(c->querybuf);
5775 listRelease(c->reply);
5776 zfree(c);
5777 }
5778
5779 /* Replay the append log file. On success REDIS_OK is returned. On non fatal
5780 * error (the append only file is zero-length) REDIS_ERR is returned. On
5781 * fatal error an error message is logged and the program exits. */
5782 int loadAppendOnlyFile(char *filename) {
5783 struct redisClient *fakeClient;
5784 FILE *fp = fopen(filename,"r");
5785 struct redis_stat sb;
5786
5787 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5788 return REDIS_ERR;
5789
5790 if (fp == NULL) {
5791 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5792 exit(1);
5793 }
5794
5795 fakeClient = createFakeClient();
5796 while(1) {
5797 int argc, j;
5798 unsigned long len;
5799 robj **argv;
5800 char buf[128];
5801 sds argsds;
5802 struct redisCommand *cmd;
5803
5804 if (fgets(buf,sizeof(buf),fp) == NULL) {
5805 if (feof(fp))
5806 break;
5807 else
5808 goto readerr;
5809 }
5810 if (buf[0] != '*') goto fmterr;
5811 argc = atoi(buf+1);
5812 argv = zmalloc(sizeof(robj*)*argc);
5813 for (j = 0; j < argc; j++) {
5814 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5815 if (buf[0] != '$') goto fmterr;
5816 len = strtol(buf+1,NULL,10);
5817 argsds = sdsnewlen(NULL,len);
5818 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5819 argv[j] = createObject(REDIS_STRING,argsds);
5820 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5821 }
5822
5823 /* Command lookup */
5824 cmd = lookupCommand(argv[0]->ptr);
5825 if (!cmd) {
5826 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5827 exit(1);
5828 }
5829 /* Try object sharing and encoding */
5830 if (server.shareobjects) {
5831 int j;
5832 for(j = 1; j < argc; j++)
5833 argv[j] = tryObjectSharing(argv[j]);
5834 }
5835 if (cmd->flags & REDIS_CMD_BULK)
5836 tryObjectEncoding(argv[argc-1]);
5837 /* Run the command in the context of a fake client */
5838 fakeClient->argc = argc;
5839 fakeClient->argv = argv;
5840 cmd->proc(fakeClient);
5841 /* Discard the reply objects list from the fake client */
5842 while(listLength(fakeClient->reply))
5843 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5844 /* Clean up, ready for the next command */
5845 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5846 zfree(argv);
5847 }
5848 fclose(fp);
5849 freeFakeClient(fakeClient);
5850 return REDIS_OK;
5851
5852 readerr:
5853 if (feof(fp)) {
5854 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5855 } else {
5856 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5857 }
5858 exit(1);
5859 fmterr:
5860 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5861 exit(1);
5862 }
5863
5864 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5865 static int fwriteBulk(FILE *fp, robj *obj) {
5866 char buf[128];
5867 obj = getDecodedObject(obj);
5868 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5869 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5870 if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err;
5871 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5872 decrRefCount(obj);
5873 return 1;
5874 err:
5875 decrRefCount(obj);
5876 return 0;
5877 }
5878
5879 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
/* Write 'd' to 'fp' as a bulk "$<count>\r\n<value>\r\n", with the value
 * printed using %.17g. Returns 1 on success, 0 on write error. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char hdr[128], payload[128];
    size_t payloadlen;

    /* the payload carries its own CRLF, which the count must exclude */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    snprintf(hdr,sizeof(hdr),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(hdr,strlen(hdr),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
5889
5890 /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
/* Write 'l' to 'fp' as a bulk "$<count>\r\n<value>\r\n" in decimal.
 * Returns 1 on success, 0 on write error. */
static int fwriteBulkLong(FILE *fp, long l) {
    char hdr[128], payload[128];
    size_t payloadlen;

    /* the payload carries its own CRLF, which the count must exclude */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    payloadlen = strlen(payload);
    snprintf(hdr,sizeof(hdr),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(hdr,strlen(hdr),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
5900
5901 /* Write a sequence of commands able to fully rebuild the dataset into
5902 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5903 static int rewriteAppendOnlyFile(char *filename) {
5904 dictIterator *di = NULL;
5905 dictEntry *de;
5906 FILE *fp;
5907 char tmpfile[256];
5908 int j;
5909 time_t now = time(NULL);
5910
5911 /* Note that we have to use a different temp name here compared to the
5912 * one used by rewriteAppendOnlyFileBackground() function. */
5913 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5914 fp = fopen(tmpfile,"w");
5915 if (!fp) {
5916 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5917 return REDIS_ERR;
5918 }
5919 for (j = 0; j < server.dbnum; j++) {
5920 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5921 redisDb *db = server.db+j;
5922 dict *d = db->dict;
5923 if (dictSize(d) == 0) continue;
5924 di = dictGetIterator(d);
5925 if (!di) {
5926 fclose(fp);
5927 return REDIS_ERR;
5928 }
5929
5930 /* SELECT the new DB */
5931 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5932 if (fwriteBulkLong(fp,j) == 0) goto werr;
5933
5934 /* Iterate this DB writing every entry */
5935 while((de = dictNext(di)) != NULL) {
5936 robj *key = dictGetEntryKey(de);
5937 robj *o = dictGetEntryVal(de);
5938 time_t expiretime = getExpire(db,key);
5939
5940 /* Save the key and associated value */
5941 if (o->type == REDIS_STRING) {
5942 /* Emit a SET command */
5943 char cmd[]="*3\r\n$3\r\nSET\r\n";
5944 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5945 /* Key and value */
5946 if (fwriteBulk(fp,key) == 0) goto werr;
5947 if (fwriteBulk(fp,o) == 0) goto werr;
5948 } else if (o->type == REDIS_LIST) {
5949 /* Emit the RPUSHes needed to rebuild the list */
5950 list *list = o->ptr;
5951 listNode *ln;
5952
5953 listRewind(list);
5954 while((ln = listYield(list))) {
5955 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5956 robj *eleobj = listNodeValue(ln);
5957
5958 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5959 if (fwriteBulk(fp,key) == 0) goto werr;
5960 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5961 }
5962 } else if (o->type == REDIS_SET) {
5963 /* Emit the SADDs needed to rebuild the set */
5964 dict *set = o->ptr;
5965 dictIterator *di = dictGetIterator(set);
5966 dictEntry *de;
5967
5968 while((de = dictNext(di)) != NULL) {
5969 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5970 robj *eleobj = dictGetEntryKey(de);
5971
5972 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5973 if (fwriteBulk(fp,key) == 0) goto werr;
5974 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5975 }
5976 dictReleaseIterator(di);
5977 } else if (o->type == REDIS_ZSET) {
5978 /* Emit the ZADDs needed to rebuild the sorted set */
5979 zset *zs = o->ptr;
5980 dictIterator *di = dictGetIterator(zs->dict);
5981 dictEntry *de;
5982
5983 while((de = dictNext(di)) != NULL) {
5984 char cmd[]="*4\r\n$4\r\nZADD\r\n";
5985 robj *eleobj = dictGetEntryKey(de);
5986 double *score = dictGetEntryVal(de);
5987
5988 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5989 if (fwriteBulk(fp,key) == 0) goto werr;
5990 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
5991 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5992 }
5993 dictReleaseIterator(di);
5994 } else {
5995 redisAssert(0 != 0);
5996 }
5997 /* Save the expire time */
5998 if (expiretime != -1) {
5999 char cmd[]="*3\r\n$6\r\nEXPIRE\r\n";
6000 /* If this key is already expired skip it */
6001 if (expiretime < now) continue;
6002 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6003 if (fwriteBulk(fp,key) == 0) goto werr;
6004 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6005 }
6006 }
6007 dictReleaseIterator(di);
6008 }
6009
6010 /* Make sure data will not remain on the OS's output buffers */
6011 fflush(fp);
6012 fsync(fileno(fp));
6013 fclose(fp);
6014
6015 /* Use RENAME to make sure the DB file is changed atomically only
6016 * if the generate DB file is ok. */
6017 if (rename(tmpfile,filename) == -1) {
6018 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6019 unlink(tmpfile);
6020 return REDIS_ERR;
6021 }
6022 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6023 return REDIS_OK;
6024
6025 werr:
6026 fclose(fp);
6027 unlink(tmpfile);
6028 redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno));
6029 if (di) dictReleaseIterator(di);
6030 return REDIS_ERR;
6031 }
6032
6033 /* This is how rewriting of the append only file in background works:
6034 *
6035 * 1) The user calls BGREWRITEAOF
6036 * 2) Redis calls this function, that forks():
6037 * 2a) the child rewrite the append only file in a temp file.
6038 * 2b) the parent accumulates differences in server.bgrewritebuf.
6039 * 3) When the child finishes '2a' it exits.
6040 * 4) The parent will trap the exit code and, if it's OK, will append the
6041 * data accumulated into server.bgrewritebuf into the temp file, and
6042 * finally will rename(2) the temp file to the actual file name.
6043 * Then the new file is reopened as the new append only file. Profit!
6044 */
6045 static int rewriteAppendOnlyFileBackground(void) {
6046 pid_t childpid;
6047
6048 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6049 if ((childpid = fork()) == 0) {
6050 /* Child */
6051 char tmpfile[256];
6052 close(server.fd);
6053
6054 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6055 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6056 exit(0);
6057 } else {
6058 exit(1);
6059 }
6060 } else {
6061 /* Parent */
6062 if (childpid == -1) {
6063 redisLog(REDIS_WARNING,
6064 "Can't rewrite append only file in background: fork: %s",
6065 strerror(errno));
6066 return REDIS_ERR;
6067 }
6068 redisLog(REDIS_NOTICE,
6069 "Background append only file rewriting started by pid %d",childpid);
6070 server.bgrewritechildpid = childpid;
6071 /* We set appendseldb to -1 in order to force the next call to the
6072 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6073 * accumulated by the parent into server.bgrewritebuf will start
6074 * with a SELECT statement and it will be safe to merge. */
6075 server.appendseldb = -1;
6076 return REDIS_OK;
6077 }
6078 return REDIS_OK; /* unreached */
6079 }
6080
6081 static void bgrewriteaofCommand(redisClient *c) {
6082 if (server.bgrewritechildpid != -1) {
6083 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6084 return;
6085 }
6086 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6087 char *status = "+Background append only file rewriting started\r\n";
6088 addReplySds(c,sdsnew(status));
6089 } else {
6090 addReply(c,shared.err);
6091 }
6092 }
6093
/* Delete the temporary file written by the background rewrite child whose
 * pid is 'childpid'. Best effort: the result of unlink() is ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int) childpid);
    unlink(path);
}
6100
6101 /* ================================= Debugging ============================== */
6102
6103 static void debugCommand(redisClient *c) {
6104 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6105 *((char*)-1) = 'x';
6106 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6107 if (rdbSave(server.dbfilename) != REDIS_OK) {
6108 addReply(c,shared.err);
6109 return;
6110 }
6111 emptyDb();
6112 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6113 addReply(c,shared.err);
6114 return;
6115 }
6116 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6117 addReply(c,shared.ok);
6118 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6119 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6120 robj *key, *val;
6121
6122 if (!de) {
6123 addReply(c,shared.nokeyerr);
6124 return;
6125 }
6126 key = dictGetEntryKey(de);
6127 val = dictGetEntryVal(de);
6128 addReplySds(c,sdscatprintf(sdsempty(),
6129 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6130 (void*)key, key->refcount, (void*)val, val->refcount,
6131 val->encoding));
6132 } else {
6133 addReplySds(c,sdsnew(
6134 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6135 }
6136 }
6137
6138 static void _redisAssert(char *estr) {
6139 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6140 redisLog(REDIS_WARNING,"==> %s\n",estr);
6141 #ifdef HAVE_BACKTRACE
6142 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6143 *((char*)-1) = 'x';
6144 #endif
6145 }
6146
6147 /* =================================== Main! ================================ */
6148
6149 #ifdef __linux__
/* Return the integer stored in /proc/sys/vm/overcommit_memory, or -1 when
 * the file cannot be opened or read or does not contain a number. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long value;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    /* strtol() instead of atoi(): atoi() turns unparsable content into 0,
     * which linuxOvercommitMemoryWarning() would report as "set to 0". */
    errno = 0;
    value = strtol(buf,&end,10);
    if (end == buf || errno == ERANGE) return -1;
    if (*end != '\0' && *end != '\n') return -1;
    return (int) value;
}
6163
6164 void linuxOvercommitMemoryWarning(void) {
6165 if (linuxOvercommitMemoryValue() == 0) {
6166 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6167 }
6168 }
6169 #endif /* __linux__ */
6170
6171 static void daemonize(void) {
6172 int fd;
6173 FILE *fp;
6174
6175 if (fork() != 0) exit(0); /* parent exits */
6176 printf("New pid: %d\n", getpid());
6177 setsid(); /* create a new session */
6178
6179 /* Every output goes to /dev/null. If Redis is daemonized but
6180 * the 'logfile' is set to 'stdout' in the configuration file
6181 * it will not log at all. */
6182 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6183 dup2(fd, STDIN_FILENO);
6184 dup2(fd, STDOUT_FILENO);
6185 dup2(fd, STDERR_FILENO);
6186 if (fd > STDERR_FILENO) close(fd);
6187 }
6188 /* Try to write the pid file */
6189 fp = fopen(server.pidfile,"w");
6190 if (fp) {
6191 fprintf(fp,"%d\n",getpid());
6192 fclose(fp);
6193 }
6194 }
6195
6196 int main(int argc, char **argv) {
6197 initServerConfig();
6198 if (argc == 2) {
6199 resetServerSaveParams();
6200 loadServerConfig(argv[1]);
6201 } else if (argc > 2) {
6202 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6203 exit(1);
6204 } else {
6205 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6206 }
6207 if (server.daemonize) daemonize();
6208 initServer();
6209 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6210 #ifdef __linux__
6211 linuxOvercommitMemoryWarning();
6212 #endif
6213 if (server.appendonly) {
6214 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6215 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6216 } else {
6217 if (rdbLoad(server.dbfilename) == REDIS_OK)
6218 redisLog(REDIS_NOTICE,"DB loaded from disk");
6219 }
6220 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6221 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6222 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6223 aeMain(server.el);
6224 aeDeleteEventLoop(server.el);
6225 return 0;
6226 }
6227
6228 /* ============================= Backtrace support ========================= */
6229
6230 #ifdef HAVE_BACKTRACE
6231 static char *findFuncName(void *pointer, unsigned long *offset);
6232
6233 static void *getMcontextEip(ucontext_t *uc) {
6234 #if defined(__FreeBSD__)
6235 return (void*) uc->uc_mcontext.mc_eip;
6236 #elif defined(__dietlibc__)
6237 return (void*) uc->uc_mcontext.eip;
6238 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6239 #if __x86_64__
6240 return (void*) uc->uc_mcontext->__ss.__rip;
6241 #else
6242 return (void*) uc->uc_mcontext->__ss.__eip;
6243 #endif
6244 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6245 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6246 return (void*) uc->uc_mcontext->__ss.__rip;
6247 #else
6248 return (void*) uc->uc_mcontext->__ss.__eip;
6249 #endif
6250 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6251 return (void*) uc->uc_mcontext.gregs[REG_EIP];
6252 #elif defined(__ia64__) /* Linux IA64 */
6253 return (void*) uc->uc_mcontext.sc_ip;
6254 #else
6255 return NULL;
6256 #endif
6257 }
6258
6259 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6260 void *trace[100];
6261 char **messages = NULL;
6262 int i, trace_size = 0;
6263 unsigned long offset=0;
6264 ucontext_t *uc = (ucontext_t*) secret;
6265 sds infostring;
6266 REDIS_NOTUSED(info);
6267
6268 redisLog(REDIS_WARNING,
6269 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6270 infostring = genRedisInfoString();
6271 redisLog(REDIS_WARNING, "%s",infostring);
6272 /* It's not safe to sdsfree() the returned string under memory
6273 * corruption conditions. Let it leak as we are going to abort */
6274
6275 trace_size = backtrace(trace, 100);
6276 /* overwrite sigaction with caller's address */
6277 if (getMcontextEip(uc) != NULL) {
6278 trace[1] = getMcontextEip(uc);
6279 }
6280 messages = backtrace_symbols(trace, trace_size);
6281
6282 for (i=1; i<trace_size; ++i) {
6283 char *fn = findFuncName(trace[i], &offset), *p;
6284
6285 p = strchr(messages[i],'+');
6286 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6287 redisLog(REDIS_WARNING,"%s", messages[i]);
6288 } else {
6289 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6290 }
6291 }
6292 // free(messages); Don't call free() with possibly corrupted memory.
6293 exit(0);
6294 }
6295
6296 static void setupSigSegvAction(void) {
6297 struct sigaction act;
6298
6299 sigemptyset (&act.sa_mask);
6300 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6301 * is used. Otherwise, sa_handler is used */
6302 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6303 act.sa_sigaction = segvHandler;
6304 sigaction (SIGSEGV, &act, NULL);
6305 sigaction (SIGBUS, &act, NULL);
6306 sigaction (SIGFPE, &act, NULL);
6307 sigaction (SIGILL, &act, NULL);
6308 sigaction (SIGBUS, &act, NULL);
6309 return;
6310 }
6311
6312 #include "staticsymbols.h"
6313 /* This function try to convert a pointer into a function name. It's used in
6314 * oreder to provide a backtrace under segmentation fault that's able to
6315 * display functions declared as static (otherwise the backtrace is useless). */
6316 static char *findFuncName(void *pointer, unsigned long *offset){
6317 int i, ret = -1;
6318 unsigned long off, minoff = 0;
6319
6320 /* Try to match against the Symbol with the smallest offset */
6321 for (i=0; symsTable[i].pointer; i++) {
6322 unsigned long lp = (unsigned long) pointer;
6323
6324 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6325 off=lp-symsTable[i].pointer;
6326 if (ret < 0 || off < minoff) {
6327 minoff=off;
6328 ret=i;
6329 }
6330 }
6331 }
6332 if (ret == -1) return NULL;
6333 *offset = minoff;
6334 return symsTable[ret].name;
6335 }
6336 #else /* HAVE_BACKTRACE */
/* Built without backtrace support: no crash handler is installed. */
static void setupSigSegvAction(void) {
    return;
}
6339 #endif /* HAVE_BACKTRACE */
6340
6341
6342
6343 /* The End */
6344
6345
6346