]> git.saurik.com Git - redis.git/blob - redis.c
SPOP implemented. Hash table resizing for Sets and Expires too. Changed the resize...
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #define __USE_POSIX199309
40 #include <signal.h>
41 #include <execinfo.h>
42 #include <ucontext.h>
43 #include <sys/wait.h>
44 #include <errno.h>
45 #include <assert.h>
46 #include <ctype.h>
47 #include <stdarg.h>
48 #include <inttypes.h>
49 #include <arpa/inet.h>
50 #include <sys/stat.h>
51 #include <fcntl.h>
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #include <limits.h>
55 #include <execinfo.h>
56
57 #include "redis.h"
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
66
67 #include "config.h"
68
69 /* Error codes */
70 #define REDIS_OK 0
71 #define REDIS_ERR -1
72
73 /* Static server configuration */
74 #define REDIS_SERVERPORT 6379 /* TCP port */
75 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
76 #define REDIS_IOBUF_LEN 1024
77 #define REDIS_LOADBUF_LEN 1024
78 #define REDIS_STATIC_ARGS 4
79 #define REDIS_DEFAULT_DBNUM 16
80 #define REDIS_CONFIGLINE_MAX 1024
81 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
82 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
83 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
84 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
85
86 /* Hash table parameters */
87 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
88
89 /* Command flags */
90 #define REDIS_CMD_BULK 1 /* Bulk write command */
91 #define REDIS_CMD_INLINE 2 /* Inline command */
92 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
93 this flags will return an error when the 'maxmemory' option is set in the
94 config file and the server is using more than maxmemory bytes of memory.
95 In short this commands are denied on low memory conditions. */
96 #define REDIS_CMD_DENYOOM 4
97
98 /* Object types */
99 #define REDIS_STRING 0
100 #define REDIS_LIST 1
101 #define REDIS_SET 2
102 #define REDIS_HASH 3
103
104 /* Object types only used for dumping to disk */
105 #define REDIS_EXPIRETIME 253
106 #define REDIS_SELECTDB 254
107 #define REDIS_EOF 255
108
109 /* Defines related to the dump file format. To store 32 bits lengths for short
110 * keys requires a lot of space, so we check the most significant 2 bits of
111 * the first byte to interpret the length:
112 *
113 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
114 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
115 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
116 * 11|000000 this means: specially encoded object will follow. The six bits
117 * number specify the kind of object that follows.
118 * See the REDIS_RDB_ENC_* defines.
119 *
120 * Lengths up to 63 are stored using a single byte, most DB keys, and many
121 * values, will fit inside. */
122 #define REDIS_RDB_6BITLEN 0
123 #define REDIS_RDB_14BITLEN 1
124 #define REDIS_RDB_32BITLEN 2
125 #define REDIS_RDB_ENCVAL 3
126 #define REDIS_RDB_LENERR UINT_MAX
127
128 /* When a length of a string object stored on disk has the first two bits
129 * set, the remaining six bits specify a special encoding for the object
130 * accordingly to the following defines: */
131 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
132 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
133 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
134 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
135
136 /* Client flags */
137 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
138 #define REDIS_SLAVE 2 /* This client is a slave server */
139 #define REDIS_MASTER 4 /* This client is a master server */
140 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
141
142 /* Slave replication state - slave side */
143 #define REDIS_REPL_NONE 0 /* No active replication */
144 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
145 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
146
147 /* Slave replication state - from the point of view of master
148 * Note that in SEND_BULK and ONLINE state the slave receives new updates
149 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
150 * to start the next background saving in order to send updates to it. */
151 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
152 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
153 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
154 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
155
156 /* List related stuff */
157 #define REDIS_HEAD 0
158 #define REDIS_TAIL 1
159
160 /* Sort operations */
161 #define REDIS_SORT_GET 0
162 #define REDIS_SORT_DEL 1
163 #define REDIS_SORT_INCR 2
164 #define REDIS_SORT_DECR 3
165 #define REDIS_SORT_ASC 4
166 #define REDIS_SORT_DESC 5
167 #define REDIS_SORTKEY_MAX 1024
168
169 /* Log levels */
170 #define REDIS_DEBUG 0
171 #define REDIS_NOTICE 1
172 #define REDIS_WARNING 2
173
174 /* Anti-warning macro... */
175 #define REDIS_NOTUSED(V) ((void) V)
176
177
178 /*================================= Data types ============================== */
179
/* A Redis object: a reference counted value able to hold a string,
 * a list or a set. */
typedef struct redisObject {
    void *ptr;      /* payload, to be interpreted according to 'type' */
    int type;       /* kind of value held, see the "Object types" defines */
    int refcount;   /* reference counter */
} robj;
186
187 typedef struct redisDb {
188 dict *dict;
189 dict *expires;
190 int id;
191 } redisDb;
192
193 /* With multiplexing we need to take per-clinet state.
194 * Clients are taken in a liked list. */
195 typedef struct redisClient {
196 int fd;
197 redisDb *db;
198 int dictid;
199 sds querybuf;
200 robj **argv;
201 int argc;
202 int bulklen; /* bulk read len. -1 if not in bulk read mode */
203 list *reply;
204 int sentlen;
205 time_t lastinteraction; /* time of the last interaction, used for timeout */
206 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
207 int slaveseldb; /* slave selected db, if this client is a slave */
208 int authenticated; /* when requirepass is non-NULL */
209 int replstate; /* replication state if this is a slave */
210 int repldbfd; /* replication DB file descriptor */
211 long repldboff; /* replication DB file offset */
212 off_t repldbsize; /* replication DB file size */
213 } redisClient;
214
/* One automatic save rule: the cron job starts a background save when at
 * least 'changes' modifications were performed and more than 'seconds'
 * seconds elapsed since the last save. */
struct saveparam {
    time_t seconds;     /* minimum age of the last save, in seconds */
    int changes;        /* minimum number of changes */
};
219
220 /* Global server state structure */
221 struct redisServer {
222 int port;
223 int fd;
224 redisDb *db;
225 dict *sharingpool;
226 unsigned int sharingpoolsize;
227 long long dirty; /* changes to DB from the last save */
228 list *clients;
229 list *slaves, *monitors;
230 char neterr[ANET_ERR_LEN];
231 aeEventLoop *el;
232 int cronloops; /* number of times the cron function run */
233 list *objfreelist; /* A list of freed objects to avoid malloc() */
234 time_t lastsave; /* Unix time of last save succeeede */
235 size_t usedmemory; /* Used memory in megabytes */
236 /* Fields used only for stats */
237 time_t stat_starttime; /* server start time */
238 long long stat_numcommands; /* number of processed commands */
239 long long stat_numconnections; /* number of connections received */
240 /* Configuration */
241 int verbosity;
242 int glueoutputbuf;
243 int maxidletime;
244 int dbnum;
245 int daemonize;
246 char *pidfile;
247 int bgsaveinprogress;
248 pid_t bgsavechildpid;
249 struct saveparam *saveparams;
250 int saveparamslen;
251 char *logfile;
252 char *bindaddr;
253 char *dbfilename;
254 char *requirepass;
255 int shareobjects;
256 /* Replication related */
257 int isslave;
258 char *masterhost;
259 int masterport;
260 redisClient *master; /* client that is master for this slave */
261 int replstate;
262 unsigned int maxclients;
263 unsigned int maxmemory;
264 /* Sort parameters - qsort_r() is only available under BSD so we
265 * have to take this state global, in order to pass it to sortCompare() */
266 int sort_desc;
267 int sort_alpha;
268 int sort_bypattern;
269 };
270
271 typedef void redisCommandProc(redisClient *c);
272 struct redisCommand {
273 char *name;
274 redisCommandProc *proc;
275 int arity;
276 int flags;
277 };
278
/* Association between a function name and an address stored as an
 * unsigned long. */
struct redisFunctionSym {
    char *name;             /* symbol name */
    unsigned long pointer;  /* address of the symbol */
};
283
284 typedef struct _redisSortObject {
285 robj *obj;
286 union {
287 double score;
288 robj *cmpobj;
289 } u;
290 } redisSortObject;
291
292 typedef struct _redisSortOperation {
293 int type;
294 robj *pattern;
295 } redisSortOperation;
296
297 struct sharedObjectsStruct {
298 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
299 *colon, *nullbulk, *nullmultibulk,
300 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
301 *outofrangeerr, *plus,
302 *select0, *select1, *select2, *select3, *select4,
303 *select5, *select6, *select7, *select8, *select9;
304 } shared;
305
306 /*================================ Prototypes =============================== */
307
308 static void freeStringObject(robj *o);
309 static void freeListObject(robj *o);
310 static void freeSetObject(robj *o);
311 static void decrRefCount(void *o);
312 static robj *createObject(int type, void *ptr);
313 static void freeClient(redisClient *c);
314 static int rdbLoad(char *filename);
315 static void addReply(redisClient *c, robj *obj);
316 static void addReplySds(redisClient *c, sds s);
317 static void incrRefCount(robj *o);
318 static int rdbSaveBackground(char *filename);
319 static robj *createStringObject(char *ptr, size_t len);
320 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
321 static int syncWithMaster(void);
322 static robj *tryObjectSharing(robj *o);
323 static int removeExpire(redisDb *db, robj *key);
324 static int expireIfNeeded(redisDb *db, robj *key);
325 static int deleteIfVolatile(redisDb *db, robj *key);
326 static int deleteKey(redisDb *db, robj *key);
327 static time_t getExpire(redisDb *db, robj *key);
328 static int setExpire(redisDb *db, robj *key, time_t when);
329 static void updateSalvesWaitingBgsave(int bgsaveerr);
330 static void freeMemoryIfNeeded(void);
331 static int processCommand(redisClient *c);
332 static void setupSigSegvAction(void);
333
334 static void authCommand(redisClient *c);
335 static void pingCommand(redisClient *c);
336 static void echoCommand(redisClient *c);
337 static void setCommand(redisClient *c);
338 static void setnxCommand(redisClient *c);
339 static void getCommand(redisClient *c);
340 static void delCommand(redisClient *c);
341 static void existsCommand(redisClient *c);
342 static void incrCommand(redisClient *c);
343 static void decrCommand(redisClient *c);
344 static void incrbyCommand(redisClient *c);
345 static void decrbyCommand(redisClient *c);
346 static void selectCommand(redisClient *c);
347 static void randomkeyCommand(redisClient *c);
348 static void keysCommand(redisClient *c);
349 static void dbsizeCommand(redisClient *c);
350 static void lastsaveCommand(redisClient *c);
351 static void saveCommand(redisClient *c);
352 static void bgsaveCommand(redisClient *c);
353 static void shutdownCommand(redisClient *c);
354 static void moveCommand(redisClient *c);
355 static void renameCommand(redisClient *c);
356 static void renamenxCommand(redisClient *c);
357 static void lpushCommand(redisClient *c);
358 static void rpushCommand(redisClient *c);
359 static void lpopCommand(redisClient *c);
360 static void rpopCommand(redisClient *c);
361 static void llenCommand(redisClient *c);
362 static void lindexCommand(redisClient *c);
363 static void lrangeCommand(redisClient *c);
364 static void ltrimCommand(redisClient *c);
365 static void typeCommand(redisClient *c);
366 static void lsetCommand(redisClient *c);
367 static void saddCommand(redisClient *c);
368 static void sremCommand(redisClient *c);
369 static void smoveCommand(redisClient *c);
370 static void sismemberCommand(redisClient *c);
371 static void scardCommand(redisClient *c);
372 static void spopCommand(redisClient *c);
373 static void sinterCommand(redisClient *c);
374 static void sinterstoreCommand(redisClient *c);
375 static void sunionCommand(redisClient *c);
376 static void sunionstoreCommand(redisClient *c);
377 static void sdiffCommand(redisClient *c);
378 static void sdiffstoreCommand(redisClient *c);
379 static void syncCommand(redisClient *c);
380 static void flushdbCommand(redisClient *c);
381 static void flushallCommand(redisClient *c);
382 static void sortCommand(redisClient *c);
383 static void lremCommand(redisClient *c);
384 static void infoCommand(redisClient *c);
385 static void mgetCommand(redisClient *c);
386 static void monitorCommand(redisClient *c);
387 static void expireCommand(redisClient *c);
388 static void getSetCommand(redisClient *c);
389 static void ttlCommand(redisClient *c);
390 static void slaveofCommand(redisClient *c);
391 static void debugCommand(redisClient *c);
392 /*================================= Globals ================================= */
393
394 /* Global vars */
395 static struct redisServer server; /* server global state */
396 static struct redisCommand cmdTable[] = {
397 {"get",getCommand,2,REDIS_CMD_INLINE},
398 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
399 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
400 {"del",delCommand,-2,REDIS_CMD_INLINE},
401 {"exists",existsCommand,2,REDIS_CMD_INLINE},
402 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
403 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
404 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
405 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
406 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
407 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
408 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
409 {"llen",llenCommand,2,REDIS_CMD_INLINE},
410 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
411 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
412 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
413 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
414 {"lrem",lremCommand,4,REDIS_CMD_BULK},
415 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
416 {"srem",sremCommand,3,REDIS_CMD_BULK},
417 {"smove",smoveCommand,4,REDIS_CMD_BULK},
418 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
419 {"scard",scardCommand,2,REDIS_CMD_INLINE},
420 {"spop",spopCommand,2,REDIS_CMD_INLINE},
421 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
422 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
423 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
424 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
425 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
426 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
427 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
428 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
429 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
430 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
431 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
432 {"select",selectCommand,2,REDIS_CMD_INLINE},
433 {"move",moveCommand,3,REDIS_CMD_INLINE},
434 {"rename",renameCommand,3,REDIS_CMD_INLINE},
435 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
436 {"expire",expireCommand,3,REDIS_CMD_INLINE},
437 {"keys",keysCommand,2,REDIS_CMD_INLINE},
438 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
439 {"auth",authCommand,2,REDIS_CMD_INLINE},
440 {"ping",pingCommand,1,REDIS_CMD_INLINE},
441 {"echo",echoCommand,2,REDIS_CMD_BULK},
442 {"save",saveCommand,1,REDIS_CMD_INLINE},
443 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
444 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
445 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
446 {"type",typeCommand,2,REDIS_CMD_INLINE},
447 {"sync",syncCommand,1,REDIS_CMD_INLINE},
448 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
449 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
450 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
451 {"info",infoCommand,1,REDIS_CMD_INLINE},
452 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
453 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
454 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
455 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
456 {NULL,NULL,0,0}
457 };
458 /*============================ Utility functions ============================ */
459
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be
 * NUL-terminated: no byte past the given lengths is ever read.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte among the listed ones
 *   [^abc]   one byte not among the listed ones
 *   [a-z]    one byte in the inclusive range (the bounds may be swapped)
 *   \x       the byte x taken literally
 * A class that is not closed by ']' extends up to the end of the pattern.
 *
 * When 'nocase' is non-zero literal bytes and ranges are compared
 * case-insensitively; escaped bytes inside a class are always compared
 * exactly.
 *
 * Returns 1 if the string matches the pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing '*' matches whatever is left */
            /* Try to match the rest of the pattern at every position. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match = 0;

            /* A class always consumes one byte: an exhausted string can't
             * match, and string[0] must not be read in that case. */
            if (stringLen <= 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = (patternLen > 0 && pattern[0] == '^');
            if (not) {
                pattern++;
                patternLen--;
            }
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * increment below lands exactly on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        /* tolower() is only defined for unsigned char
                         * values (and EOF). */
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* Skip the backslash; a lone trailing one is a literal. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is over: only trailing '*' may still match. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
582
583 static void redisLog(int level, const char *fmt, ...) {
584 va_list ap;
585 FILE *fp;
586
587 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
588 if (!fp) return;
589
590 va_start(ap, fmt);
591 if (level >= server.verbosity) {
592 char *c = ".-*";
593 char buf[64];
594 time_t now;
595
596 now = time(NULL);
597 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
598 fprintf(fp,"%s %c ",buf,c[level]);
599 vfprintf(fp, fmt, ap);
600 fprintf(fp,"\n");
601 fflush(fp);
602 }
603 va_end(ap);
604
605 if (server.logfile) fclose(fp);
606 }
607
608 /*====================== Hash table type implementation ==================== */
609
610 /* This is a hash table type that uses the SDS dynamic strings library as
611 * keys and redis objects as values (objects can hold SDS strings,
612 * lists, sets). */
613
614 static int sdsDictKeyCompare(void *privdata, const void *key1,
615 const void *key2)
616 {
617 int l1,l2;
618 DICT_NOTUSED(privdata);
619
620 l1 = sdslen((sds)key1);
621 l2 = sdslen((sds)key2);
622 if (l1 != l2) return 0;
623 return memcmp(key1, key2, l1) == 0;
624 }
625
/* Hash table destructor for redis objects: drops the reference the hash
 * table was holding on 'val'. 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    void *obj = val;

    DICT_NOTUSED(privdata);
    decrRefCount(obj);
}
632
633 static int dictSdsKeyCompare(void *privdata, const void *key1,
634 const void *key2)
635 {
636 const robj *o1 = key1, *o2 = key2;
637 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
638 }
639
640 static unsigned int dictSdsHash(const void *key) {
641 const robj *o = key;
642 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
643 }
644
645 static dictType setDictType = {
646 dictSdsHash, /* hash function */
647 NULL, /* key dup */
648 NULL, /* val dup */
649 dictSdsKeyCompare, /* key compare */
650 dictRedisObjectDestructor, /* key destructor */
651 NULL /* val destructor */
652 };
653
654 static dictType hashDictType = {
655 dictSdsHash, /* hash function */
656 NULL, /* key dup */
657 NULL, /* val dup */
658 dictSdsKeyCompare, /* key compare */
659 dictRedisObjectDestructor, /* key destructor */
660 dictRedisObjectDestructor /* val destructor */
661 };
662
663 /* ========================= Random utility functions ======================= */
664
/* Out of memory handler.
 *
 * Redis does not try to recover from a failed allocation of objects or
 * strings: even reporting the error to the client may be impossible,
 * since the networking layer allocates its send buffers on the heap too.
 * So we print 'msg' on standard error, wait one second and abort. */
static void oom(const char *msg) {
    fputs(msg, stderr);
    fputs(": Out of memory\n", stderr);
    fflush(stderr);
    sleep(1);
    abort();
}
676
677 /* ====================== Redis server networking stuff ===================== */
678 static void closeTimedoutClients(void) {
679 redisClient *c;
680 listNode *ln;
681 time_t now = time(NULL);
682
683 listRewind(server.clients);
684 while ((ln = listYield(server.clients)) != NULL) {
685 c = listNodeValue(ln);
686 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
687 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
688 (now - c->lastinteraction > server.maxidletime)) {
689 redisLog(REDIS_DEBUG,"Closing idle client");
690 freeClient(c);
691 }
692 }
693 }
694
695 static int htNeedsResize(dict *dict) {
696 long long size, used;
697
698 size = dictSlots(dict);
699 used = dictSize(dict);
700 return (size && used && size > DICT_HT_INITIAL_SIZE &&
701 (used*100/size < REDIS_HT_MINFILL));
702 }
703
704 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
705 * we resize the hash table to save memory */
706 static void tryResizeHashTables(void) {
707 int j;
708
709 for (j = 0; j < server.dbnum; j++) {
710 if (htNeedsResize(server.db[j].dict)) {
711 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
712 dictResize(server.db[j].dict);
713 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
714 }
715 if (htNeedsResize(server.db[j].expires))
716 dictResize(server.db[j].expires);
717 }
718 }
719
720 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
721 int j, loops = server.cronloops++;
722 REDIS_NOTUSED(eventLoop);
723 REDIS_NOTUSED(id);
724 REDIS_NOTUSED(clientData);
725
726 /* Update the global state with the amount of used memory */
727 server.usedmemory = zmalloc_used_memory();
728
729 /* Show some info about non-empty databases */
730 for (j = 0; j < server.dbnum; j++) {
731 long long size, used, vkeys;
732
733 size = dictSlots(server.db[j].dict);
734 used = dictSize(server.db[j].dict);
735 vkeys = dictSize(server.db[j].expires);
736 if (!(loops % 5) && used > 0) {
737 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
738 /* dictPrintStats(server.dict); */
739 }
740 }
741
742 /* We don't want to resize the hash tables while a bacground saving
743 * is in progress: the saving child is created using fork() that is
744 * implemented with a copy-on-write semantic in most modern systems, so
745 * if we resize the HT while there is the saving child at work actually
746 * a lot of memory movements in the parent will cause a lot of pages
747 * copied. */
748 if (!server.bgsaveinprogress) tryResizeHashTables();
749
750 /* Show information about connected clients */
751 if (!(loops % 5)) {
752 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
753 listLength(server.clients)-listLength(server.slaves),
754 listLength(server.slaves),
755 server.usedmemory,
756 dictSize(server.sharingpool));
757 }
758
759 /* Close connections of timedout clients */
760 if (server.maxidletime && !(loops % 10))
761 closeTimedoutClients();
762
763 /* Check if a background saving in progress terminated */
764 if (server.bgsaveinprogress) {
765 int statloc;
766 /* XXX: TODO handle the case of the saving child killed */
767 if (wait4(-1,&statloc,WNOHANG,NULL)) {
768 int exitcode = WEXITSTATUS(statloc);
769 int bysignal = WIFSIGNALED(statloc);
770
771 if (!bysignal && exitcode == 0) {
772 redisLog(REDIS_NOTICE,
773 "Background saving terminated with success");
774 server.dirty = 0;
775 server.lastsave = time(NULL);
776 } else if (!bysignal && exitcode != 0) {
777 redisLog(REDIS_WARNING, "Background saving error");
778 } else {
779 redisLog(REDIS_WARNING,
780 "Background saving terminated by signal");
781 }
782 server.bgsaveinprogress = 0;
783 server.bgsavechildpid = -1;
784 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
785 }
786 } else {
787 /* If there is not a background saving in progress check if
788 * we have to save now */
789 time_t now = time(NULL);
790 for (j = 0; j < server.saveparamslen; j++) {
791 struct saveparam *sp = server.saveparams+j;
792
793 if (server.dirty >= sp->changes &&
794 now-server.lastsave > sp->seconds) {
795 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
796 sp->changes, sp->seconds);
797 rdbSaveBackground(server.dbfilename);
798 break;
799 }
800 }
801 }
802
803 /* Try to expire a few timed out keys */
804 for (j = 0; j < server.dbnum; j++) {
805 redisDb *db = server.db+j;
806 int num = dictSize(db->expires);
807
808 if (num) {
809 time_t now = time(NULL);
810
811 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
812 num = REDIS_EXPIRELOOKUPS_PER_CRON;
813 while (num--) {
814 dictEntry *de;
815 time_t t;
816
817 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
818 t = (time_t) dictGetEntryVal(de);
819 if (now > t) {
820 deleteKey(db,dictGetEntryKey(de));
821 }
822 }
823 }
824 }
825
826 /* Check if we should connect to a MASTER */
827 if (server.replstate == REDIS_REPL_CONNECT) {
828 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
829 if (syncWithMaster() == REDIS_OK) {
830 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
831 }
832 }
833 return 1000;
834 }
835
836 static void createSharedObjects(void) {
837 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
838 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
839 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
840 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
841 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
842 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
843 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
844 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
845 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
846 /* no such key */
847 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
848 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
849 "-ERR Operation against a key holding the wrong kind of value\r\n"));
850 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
851 "-ERR no such key\r\n"));
852 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
853 "-ERR syntax error\r\n"));
854 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
855 "-ERR source and destination objects are the same\r\n"));
856 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
857 "-ERR index out of range\r\n"));
858 shared.space = createObject(REDIS_STRING,sdsnew(" "));
859 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
860 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
861 shared.select0 = createStringObject("select 0\r\n",10);
862 shared.select1 = createStringObject("select 1\r\n",10);
863 shared.select2 = createStringObject("select 2\r\n",10);
864 shared.select3 = createStringObject("select 3\r\n",10);
865 shared.select4 = createStringObject("select 4\r\n",10);
866 shared.select5 = createStringObject("select 5\r\n",10);
867 shared.select6 = createStringObject("select 6\r\n",10);
868 shared.select7 = createStringObject("select 7\r\n",10);
869 shared.select8 = createStringObject("select 8\r\n",10);
870 shared.select9 = createStringObject("select 9\r\n",10);
871 }
872
873 static void appendServerSaveParams(time_t seconds, int changes) {
874 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
875 if (server.saveparams == NULL) oom("appendServerSaveParams");
876 server.saveparams[server.saveparamslen].seconds = seconds;
877 server.saveparams[server.saveparamslen].changes = changes;
878 server.saveparamslen++;
879 }
880
881 static void ResetServerSaveParams() {
882 zfree(server.saveparams);
883 server.saveparams = NULL;
884 server.saveparamslen = 0;
885 }
886
887 static void initServerConfig() {
888 server.dbnum = REDIS_DEFAULT_DBNUM;
889 server.port = REDIS_SERVERPORT;
890 server.verbosity = REDIS_DEBUG;
891 server.maxidletime = REDIS_MAXIDLETIME;
892 server.saveparams = NULL;
893 server.logfile = NULL; /* NULL = log on standard output */
894 server.bindaddr = NULL;
895 server.glueoutputbuf = 1;
896 server.daemonize = 0;
897 server.pidfile = "/var/run/redis.pid";
898 server.dbfilename = "dump.rdb";
899 server.requirepass = NULL;
900 server.shareobjects = 0;
901 server.maxclients = 0;
902 server.maxmemory = 0;
903 ResetServerSaveParams();
904
905 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
906 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
907 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
908 /* Replication related */
909 server.isslave = 0;
910 server.masterhost = NULL;
911 server.masterport = 6379;
912 server.master = NULL;
913 server.replstate = REDIS_REPL_NONE;
914 }
915
916 static void initServer() {
917 int j;
918
919 signal(SIGHUP, SIG_IGN);
920 signal(SIGPIPE, SIG_IGN);
921 setupSigSegvAction();
922
923 server.clients = listCreate();
924 server.slaves = listCreate();
925 server.monitors = listCreate();
926 server.objfreelist = listCreate();
927 createSharedObjects();
928 server.el = aeCreateEventLoop();
929 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
930 server.sharingpool = dictCreate(&setDictType,NULL);
931 server.sharingpoolsize = 1024;
932 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
933 oom("server initialization"); /* Fatal OOM */
934 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
935 if (server.fd == -1) {
936 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
937 exit(1);
938 }
939 for (j = 0; j < server.dbnum; j++) {
940 server.db[j].dict = dictCreate(&hashDictType,NULL);
941 server.db[j].expires = dictCreate(&setDictType,NULL);
942 server.db[j].id = j;
943 }
944 server.cronloops = 0;
945 server.bgsaveinprogress = 0;
946 server.bgsavechildpid = -1;
947 server.lastsave = time(NULL);
948 server.dirty = 0;
949 server.usedmemory = 0;
950 server.stat_numcommands = 0;
951 server.stat_numconnections = 0;
952 server.stat_starttime = time(NULL);
953 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
954 }
955
956 /* Empty the whole database */
957 static long long emptyDb() {
958 int j;
959 long long removed = 0;
960
961 for (j = 0; j < server.dbnum; j++) {
962 removed += dictSize(server.db[j].dict);
963 dictEmpty(server.db[j].dict);
964 dictEmpty(server.db[j].expires);
965 }
966 return removed;
967 }
968
/* Convert a case-insensitive "yes"/"no" string into 1/0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
974
975 /* I agree, this is a very rudimental way to load a configuration...
976 will improve later if the config gets more complex */
977 static void loadServerConfig(char *filename) {
978 FILE *fp = fopen(filename,"r");
979 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
980 int linenum = 0;
981 sds line = NULL;
982
983 if (!fp) {
984 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
985 exit(1);
986 }
987 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
988 sds *argv;
989 int argc, j;
990
991 linenum++;
992 line = sdsnew(buf);
993 line = sdstrim(line," \t\r\n");
994
995 /* Skip comments and blank lines*/
996 if (line[0] == '#' || line[0] == '\0') {
997 sdsfree(line);
998 continue;
999 }
1000
1001 /* Split into arguments */
1002 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1003 sdstolower(argv[0]);
1004
1005 /* Execute config directives */
1006 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1007 server.maxidletime = atoi(argv[1]);
1008 if (server.maxidletime < 0) {
1009 err = "Invalid timeout value"; goto loaderr;
1010 }
1011 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1012 server.port = atoi(argv[1]);
1013 if (server.port < 1 || server.port > 65535) {
1014 err = "Invalid port"; goto loaderr;
1015 }
1016 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1017 server.bindaddr = zstrdup(argv[1]);
1018 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1019 int seconds = atoi(argv[1]);
1020 int changes = atoi(argv[2]);
1021 if (seconds < 1 || changes < 0) {
1022 err = "Invalid save parameters"; goto loaderr;
1023 }
1024 appendServerSaveParams(seconds,changes);
1025 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1026 if (chdir(argv[1]) == -1) {
1027 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1028 argv[1], strerror(errno));
1029 exit(1);
1030 }
1031 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1032 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1033 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1034 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1035 else {
1036 err = "Invalid log level. Must be one of debug, notice, warning";
1037 goto loaderr;
1038 }
1039 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1040 FILE *fp;
1041
1042 server.logfile = zstrdup(argv[1]);
1043 if (!strcasecmp(server.logfile,"stdout")) {
1044 zfree(server.logfile);
1045 server.logfile = NULL;
1046 }
1047 if (server.logfile) {
1048 /* Test if we are able to open the file. The server will not
1049 * be able to abort just for this problem later... */
1050 fp = fopen(server.logfile,"a");
1051 if (fp == NULL) {
1052 err = sdscatprintf(sdsempty(),
1053 "Can't open the log file: %s", strerror(errno));
1054 goto loaderr;
1055 }
1056 fclose(fp);
1057 }
1058 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1059 server.dbnum = atoi(argv[1]);
1060 if (server.dbnum < 1) {
1061 err = "Invalid number of databases"; goto loaderr;
1062 }
1063 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1064 server.maxclients = atoi(argv[1]);
1065 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1066 server.maxmemory = atoi(argv[1]);
1067 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1068 server.masterhost = sdsnew(argv[1]);
1069 server.masterport = atoi(argv[2]);
1070 server.replstate = REDIS_REPL_CONNECT;
1071 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1072 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1073 err = "argument must be 'yes' or 'no'"; goto loaderr;
1074 }
1075 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1076 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1077 err = "argument must be 'yes' or 'no'"; goto loaderr;
1078 }
1079 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1080 server.sharingpoolsize = atoi(argv[1]);
1081 if (server.sharingpoolsize < 1) {
1082 err = "invalid object sharing pool size"; goto loaderr;
1083 }
1084 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1085 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1086 err = "argument must be 'yes' or 'no'"; goto loaderr;
1087 }
1088 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1089 server.requirepass = zstrdup(argv[1]);
1090 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1091 server.pidfile = zstrdup(argv[1]);
1092 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1093 server.dbfilename = zstrdup(argv[1]);
1094 } else {
1095 err = "Bad directive or wrong number of arguments"; goto loaderr;
1096 }
1097 for (j = 0; j < argc; j++)
1098 sdsfree(argv[j]);
1099 zfree(argv);
1100 sdsfree(line);
1101 }
1102 fclose(fp);
1103 return;
1104
1105 loaderr:
1106 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1107 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1108 fprintf(stderr, ">>> '%s'\n", line);
1109 fprintf(stderr, "%s\n", err);
1110 exit(1);
1111 }
1112
1113 static void freeClientArgv(redisClient *c) {
1114 int j;
1115
1116 for (j = 0; j < c->argc; j++)
1117 decrRefCount(c->argv[j]);
1118 c->argc = 0;
1119 }
1120
1121 static void freeClient(redisClient *c) {
1122 listNode *ln;
1123
1124 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1125 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1126 sdsfree(c->querybuf);
1127 listRelease(c->reply);
1128 freeClientArgv(c);
1129 close(c->fd);
1130 ln = listSearchKey(server.clients,c);
1131 assert(ln != NULL);
1132 listDelNode(server.clients,ln);
1133 if (c->flags & REDIS_SLAVE) {
1134 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1135 close(c->repldbfd);
1136 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1137 ln = listSearchKey(l,c);
1138 assert(ln != NULL);
1139 listDelNode(l,ln);
1140 }
1141 if (c->flags & REDIS_MASTER) {
1142 server.master = NULL;
1143 server.replstate = REDIS_REPL_CONNECT;
1144 }
1145 zfree(c->argv);
1146 zfree(c);
1147 }
1148
1149 static void glueReplyBuffersIfNeeded(redisClient *c) {
1150 int totlen = 0;
1151 listNode *ln;
1152 robj *o;
1153
1154 listRewind(c->reply);
1155 while((ln = listYield(c->reply))) {
1156 o = ln->value;
1157 totlen += sdslen(o->ptr);
1158 /* This optimization makes more sense if we don't have to copy
1159 * too much data */
1160 if (totlen > 1024) return;
1161 }
1162 if (totlen > 0) {
1163 char buf[1024];
1164 int copylen = 0;
1165
1166 listRewind(c->reply);
1167 while((ln = listYield(c->reply))) {
1168 o = ln->value;
1169 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1170 copylen += sdslen(o->ptr);
1171 listDelNode(c->reply,ln);
1172 }
1173 /* Now the output buffer is empty, add the new single element */
1174 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1175 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1176 }
1177 }
1178
1179 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1180 redisClient *c = privdata;
1181 int nwritten = 0, totwritten = 0, objlen;
1182 robj *o;
1183 REDIS_NOTUSED(el);
1184 REDIS_NOTUSED(mask);
1185
1186 if (server.glueoutputbuf && listLength(c->reply) > 1)
1187 glueReplyBuffersIfNeeded(c);
1188 while(listLength(c->reply)) {
1189 o = listNodeValue(listFirst(c->reply));
1190 objlen = sdslen(o->ptr);
1191
1192 if (objlen == 0) {
1193 listDelNode(c->reply,listFirst(c->reply));
1194 continue;
1195 }
1196
1197 if (c->flags & REDIS_MASTER) {
1198 /* Don't reply to a master */
1199 nwritten = objlen - c->sentlen;
1200 } else {
1201 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1202 if (nwritten <= 0) break;
1203 }
1204 c->sentlen += nwritten;
1205 totwritten += nwritten;
1206 /* If we fully sent the object on head go to the next one */
1207 if (c->sentlen == objlen) {
1208 listDelNode(c->reply,listFirst(c->reply));
1209 c->sentlen = 0;
1210 }
1211 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1212 * bytes, in a single threaded server it's a good idea to server
1213 * other clients as well, even if a very large request comes from
1214 * super fast link that is always able to accept data (in real world
1215 * terms think to 'KEYS *' against the loopback interfae) */
1216 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1217 }
1218 if (nwritten == -1) {
1219 if (errno == EAGAIN) {
1220 nwritten = 0;
1221 } else {
1222 redisLog(REDIS_DEBUG,
1223 "Error writing to client: %s", strerror(errno));
1224 freeClient(c);
1225 return;
1226 }
1227 }
1228 if (totwritten > 0) c->lastinteraction = time(NULL);
1229 if (listLength(c->reply) == 0) {
1230 c->sentlen = 0;
1231 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1232 }
1233 }
1234
1235 static struct redisCommand *lookupCommand(char *name) {
1236 int j = 0;
1237 while(cmdTable[j].name != NULL) {
1238 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1239 j++;
1240 }
1241 return NULL;
1242 }
1243
1244 /* resetClient prepare the client to process the next command */
1245 static void resetClient(redisClient *c) {
1246 freeClientArgv(c);
1247 c->bulklen = -1;
1248 }
1249
1250 /* If this function gets called we already read a whole
1251 * command, argments are in the client argv/argc fields.
1252 * processCommand() execute the command or prepare the
1253 * server for a bulk read from the client.
1254 *
1255 * If 1 is returned the client is still alive and valid and
1256 * and other operations can be performed by the caller. Otherwise
1257 * if 0 is returned the client was destroied (i.e. after QUIT). */
1258 static int processCommand(redisClient *c) {
1259 struct redisCommand *cmd;
1260 long long dirty;
1261
1262 /* Free some memory if needed (maxmemory setting) */
1263 if (server.maxmemory) freeMemoryIfNeeded();
1264
1265 /* The QUIT command is handled as a special case. Normal command
1266 * procs are unable to close the client connection safely */
1267 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1268 freeClient(c);
1269 return 0;
1270 }
1271 cmd = lookupCommand(c->argv[0]->ptr);
1272 if (!cmd) {
1273 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1274 resetClient(c);
1275 return 1;
1276 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1277 (c->argc < -cmd->arity)) {
1278 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1279 resetClient(c);
1280 return 1;
1281 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1282 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1283 resetClient(c);
1284 return 1;
1285 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1286 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1287
1288 decrRefCount(c->argv[c->argc-1]);
1289 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1290 c->argc--;
1291 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1292 resetClient(c);
1293 return 1;
1294 }
1295 c->argc--;
1296 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1297 /* It is possible that the bulk read is already in the
1298 * buffer. Check this condition and handle it accordingly */
1299 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1300 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1301 c->argc++;
1302 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1303 } else {
1304 return 1;
1305 }
1306 }
1307 /* Let's try to share objects on the command arguments vector */
1308 if (server.shareobjects) {
1309 int j;
1310 for(j = 1; j < c->argc; j++)
1311 c->argv[j] = tryObjectSharing(c->argv[j]);
1312 }
1313 /* Check if the user is authenticated */
1314 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1315 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1316 resetClient(c);
1317 return 1;
1318 }
1319
1320 /* Exec the command */
1321 dirty = server.dirty;
1322 cmd->proc(c);
1323 if (server.dirty-dirty != 0 && listLength(server.slaves))
1324 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1325 if (listLength(server.monitors))
1326 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1327 server.stat_numcommands++;
1328
1329 /* Prepare the client for the next command */
1330 if (c->flags & REDIS_CLOSE) {
1331 freeClient(c);
1332 return 0;
1333 }
1334 resetClient(c);
1335 return 1;
1336 }
1337
1338 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1339 listNode *ln;
1340 int outc = 0, j;
1341 robj **outv;
1342 /* (args*2)+1 is enough room for args, spaces, newlines */
1343 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1344
1345 if (argc <= REDIS_STATIC_ARGS) {
1346 outv = static_outv;
1347 } else {
1348 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1349 if (!outv) oom("replicationFeedSlaves");
1350 }
1351
1352 for (j = 0; j < argc; j++) {
1353 if (j != 0) outv[outc++] = shared.space;
1354 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1355 robj *lenobj;
1356
1357 lenobj = createObject(REDIS_STRING,
1358 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1359 lenobj->refcount = 0;
1360 outv[outc++] = lenobj;
1361 }
1362 outv[outc++] = argv[j];
1363 }
1364 outv[outc++] = shared.crlf;
1365
1366 /* Increment all the refcounts at start and decrement at end in order to
1367 * be sure to free objects if there is no slave in a replication state
1368 * able to be feed with commands */
1369 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1370 listRewind(slaves);
1371 while((ln = listYield(slaves))) {
1372 redisClient *slave = ln->value;
1373
1374 /* Don't feed slaves that are still waiting for BGSAVE to start */
1375 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1376
1377 /* Feed all the other slaves, MONITORs and so on */
1378 if (slave->slaveseldb != dictid) {
1379 robj *selectcmd;
1380
1381 switch(dictid) {
1382 case 0: selectcmd = shared.select0; break;
1383 case 1: selectcmd = shared.select1; break;
1384 case 2: selectcmd = shared.select2; break;
1385 case 3: selectcmd = shared.select3; break;
1386 case 4: selectcmd = shared.select4; break;
1387 case 5: selectcmd = shared.select5; break;
1388 case 6: selectcmd = shared.select6; break;
1389 case 7: selectcmd = shared.select7; break;
1390 case 8: selectcmd = shared.select8; break;
1391 case 9: selectcmd = shared.select9; break;
1392 default:
1393 selectcmd = createObject(REDIS_STRING,
1394 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1395 selectcmd->refcount = 0;
1396 break;
1397 }
1398 addReply(slave,selectcmd);
1399 slave->slaveseldb = dictid;
1400 }
1401 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1402 }
1403 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1404 if (outv != static_outv) zfree(outv);
1405 }
1406
1407 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1408 redisClient *c = (redisClient*) privdata;
1409 char buf[REDIS_IOBUF_LEN];
1410 int nread;
1411 REDIS_NOTUSED(el);
1412 REDIS_NOTUSED(mask);
1413
1414 nread = read(fd, buf, REDIS_IOBUF_LEN);
1415 if (nread == -1) {
1416 if (errno == EAGAIN) {
1417 nread = 0;
1418 } else {
1419 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1420 freeClient(c);
1421 return;
1422 }
1423 } else if (nread == 0) {
1424 redisLog(REDIS_DEBUG, "Client closed connection");
1425 freeClient(c);
1426 return;
1427 }
1428 if (nread) {
1429 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1430 c->lastinteraction = time(NULL);
1431 } else {
1432 return;
1433 }
1434
1435 again:
1436 if (c->bulklen == -1) {
1437 /* Read the first line of the query */
1438 char *p = strchr(c->querybuf,'\n');
1439 size_t querylen;
1440 if (p) {
1441 sds query, *argv;
1442 int argc, j;
1443
1444 query = c->querybuf;
1445 c->querybuf = sdsempty();
1446 querylen = 1+(p-(query));
1447 if (sdslen(query) > querylen) {
1448 /* leave data after the first line of the query in the buffer */
1449 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1450 }
1451 *p = '\0'; /* remove "\n" */
1452 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1453 sdsupdatelen(query);
1454
1455 /* Now we can split the query in arguments */
1456 if (sdslen(query) == 0) {
1457 /* Ignore empty query */
1458 sdsfree(query);
1459 return;
1460 }
1461 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1462 if (argv == NULL) oom("sdssplitlen");
1463 sdsfree(query);
1464
1465 if (c->argv) zfree(c->argv);
1466 c->argv = zmalloc(sizeof(robj*)*argc);
1467 if (c->argv == NULL) oom("allocating arguments list for client");
1468
1469 for (j = 0; j < argc; j++) {
1470 if (sdslen(argv[j])) {
1471 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1472 c->argc++;
1473 } else {
1474 sdsfree(argv[j]);
1475 }
1476 }
1477 zfree(argv);
1478 /* Execute the command. If the client is still valid
1479 * after processCommand() return and there is something
1480 * on the query buffer try to process the next command. */
1481 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1482 return;
1483 } else if (sdslen(c->querybuf) >= 1024*32) {
1484 redisLog(REDIS_DEBUG, "Client protocol error");
1485 freeClient(c);
1486 return;
1487 }
1488 } else {
1489 /* Bulk read handling. Note that if we are at this point
1490 the client already sent a command terminated with a newline,
1491 we are reading the bulk data that is actually the last
1492 argument of the command. */
1493 int qbl = sdslen(c->querybuf);
1494
1495 if (c->bulklen <= qbl) {
1496 /* Copy everything but the final CRLF as final argument */
1497 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1498 c->argc++;
1499 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1500 processCommand(c);
1501 return;
1502 }
1503 }
1504 }
1505
1506 static int selectDb(redisClient *c, int id) {
1507 if (id < 0 || id >= server.dbnum)
1508 return REDIS_ERR;
1509 c->db = &server.db[id];
1510 return REDIS_OK;
1511 }
1512
1513 static void *dupClientReplyValue(void *o) {
1514 incrRefCount((robj*)o);
1515 return 0;
1516 }
1517
1518 static redisClient *createClient(int fd) {
1519 redisClient *c = zmalloc(sizeof(*c));
1520
1521 anetNonBlock(NULL,fd);
1522 anetTcpNoDelay(NULL,fd);
1523 if (!c) return NULL;
1524 selectDb(c,0);
1525 c->fd = fd;
1526 c->querybuf = sdsempty();
1527 c->argc = 0;
1528 c->argv = NULL;
1529 c->bulklen = -1;
1530 c->sentlen = 0;
1531 c->flags = 0;
1532 c->lastinteraction = time(NULL);
1533 c->authenticated = 0;
1534 c->replstate = REDIS_REPL_NONE;
1535 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1536 listSetFreeMethod(c->reply,decrRefCount);
1537 listSetDupMethod(c->reply,dupClientReplyValue);
1538 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1539 readQueryFromClient, c, NULL) == AE_ERR) {
1540 freeClient(c);
1541 return NULL;
1542 }
1543 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1544 return c;
1545 }
1546
1547 static void addReply(redisClient *c, robj *obj) {
1548 if (listLength(c->reply) == 0 &&
1549 (c->replstate == REDIS_REPL_NONE ||
1550 c->replstate == REDIS_REPL_ONLINE) &&
1551 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1552 sendReplyToClient, c, NULL) == AE_ERR) return;
1553 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1554 incrRefCount(obj);
1555 }
1556
1557 static void addReplySds(redisClient *c, sds s) {
1558 robj *o = createObject(REDIS_STRING,s);
1559 addReply(c,o);
1560 decrRefCount(o);
1561 }
1562
1563 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1564 int cport, cfd;
1565 char cip[128];
1566 redisClient *c;
1567 REDIS_NOTUSED(el);
1568 REDIS_NOTUSED(mask);
1569 REDIS_NOTUSED(privdata);
1570
1571 cfd = anetAccept(server.neterr, fd, cip, &cport);
1572 if (cfd == AE_ERR) {
1573 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1574 return;
1575 }
1576 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1577 if ((c = createClient(cfd)) == NULL) {
1578 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1579 close(cfd); /* May be already closed, just ingore errors */
1580 return;
1581 }
1582 /* If maxclient directive is set and this is one client more... close the
1583 * connection. Note that we create the client instead to check before
1584 * for this condition, since now the socket is already set in nonblocking
1585 * mode and we can send an error for free using the Kernel I/O */
1586 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1587 char *err = "-ERR max number of clients reached\r\n";
1588
1589 /* That's a best effort error message, don't check write errors */
1590 (void) write(c->fd,err,strlen(err));
1591 freeClient(c);
1592 return;
1593 }
1594 server.stat_numconnections++;
1595 }
1596
1597 /* ======================= Redis objects implementation ===================== */
1598
1599 static robj *createObject(int type, void *ptr) {
1600 robj *o;
1601
1602 if (listLength(server.objfreelist)) {
1603 listNode *head = listFirst(server.objfreelist);
1604 o = listNodeValue(head);
1605 listDelNode(server.objfreelist,head);
1606 } else {
1607 o = zmalloc(sizeof(*o));
1608 }
1609 if (!o) oom("createObject");
1610 o->type = type;
1611 o->ptr = ptr;
1612 o->refcount = 1;
1613 return o;
1614 }
1615
1616 static robj *createStringObject(char *ptr, size_t len) {
1617 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1618 }
1619
1620 static robj *createListObject(void) {
1621 list *l = listCreate();
1622
1623 if (!l) oom("listCreate");
1624 listSetFreeMethod(l,decrRefCount);
1625 return createObject(REDIS_LIST,l);
1626 }
1627
1628 static robj *createSetObject(void) {
1629 dict *d = dictCreate(&setDictType,NULL);
1630 if (!d) oom("dictCreate");
1631 return createObject(REDIS_SET,d);
1632 }
1633
1634 static void freeStringObject(robj *o) {
1635 sdsfree(o->ptr);
1636 }
1637
1638 static void freeListObject(robj *o) {
1639 listRelease((list*) o->ptr);
1640 }
1641
1642 static void freeSetObject(robj *o) {
1643 dictRelease((dict*) o->ptr);
1644 }
1645
1646 static void freeHashObject(robj *o) {
1647 dictRelease((dict*) o->ptr);
1648 }
1649
1650 static void incrRefCount(robj *o) {
1651 o->refcount++;
1652 #ifdef DEBUG_REFCOUNT
1653 if (o->type == REDIS_STRING)
1654 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1655 #endif
1656 }
1657
1658 static void decrRefCount(void *obj) {
1659 robj *o = obj;
1660
1661 #ifdef DEBUG_REFCOUNT
1662 if (o->type == REDIS_STRING)
1663 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1664 #endif
1665 if (--(o->refcount) == 0) {
1666 switch(o->type) {
1667 case REDIS_STRING: freeStringObject(o); break;
1668 case REDIS_LIST: freeListObject(o); break;
1669 case REDIS_SET: freeSetObject(o); break;
1670 case REDIS_HASH: freeHashObject(o); break;
1671 default: assert(0 != 0); break;
1672 }
1673 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1674 !listAddNodeHead(server.objfreelist,o))
1675 zfree(o);
1676 }
1677 }
1678
1679 /* Try to share an object against the shared objects pool */
1680 static robj *tryObjectSharing(robj *o) {
1681 struct dictEntry *de;
1682 unsigned long c;
1683
1684 if (o == NULL || server.shareobjects == 0) return o;
1685
1686 assert(o->type == REDIS_STRING);
1687 de = dictFind(server.sharingpool,o);
1688 if (de) {
1689 robj *shared = dictGetEntryKey(de);
1690
1691 c = ((unsigned long) dictGetEntryVal(de))+1;
1692 dictGetEntryVal(de) = (void*) c;
1693 incrRefCount(shared);
1694 decrRefCount(o);
1695 return shared;
1696 } else {
1697 /* Here we are using a stream algorihtm: Every time an object is
1698 * shared we increment its count, everytime there is a miss we
1699 * recrement the counter of a random object. If this object reaches
1700 * zero we remove the object and put the current object instead. */
1701 if (dictSize(server.sharingpool) >=
1702 server.sharingpoolsize) {
1703 de = dictGetRandomKey(server.sharingpool);
1704 assert(de != NULL);
1705 c = ((unsigned long) dictGetEntryVal(de))-1;
1706 dictGetEntryVal(de) = (void*) c;
1707 if (c == 0) {
1708 dictDelete(server.sharingpool,de->key);
1709 }
1710 } else {
1711 c = 0; /* If the pool is empty we want to add this object */
1712 }
1713 if (c == 0) {
1714 int retval;
1715
1716 retval = dictAdd(server.sharingpool,o,(void*)1);
1717 assert(retval == DICT_OK);
1718 incrRefCount(o);
1719 }
1720 return o;
1721 }
1722 }
1723
1724 static robj *lookupKey(redisDb *db, robj *key) {
1725 dictEntry *de = dictFind(db->dict,key);
1726 return de ? dictGetEntryVal(de) : NULL;
1727 }
1728
1729 static robj *lookupKeyRead(redisDb *db, robj *key) {
1730 expireIfNeeded(db,key);
1731 return lookupKey(db,key);
1732 }
1733
1734 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1735 deleteIfVolatile(db,key);
1736 return lookupKey(db,key);
1737 }
1738
1739 static int deleteKey(redisDb *db, robj *key) {
1740 int retval;
1741
1742 /* We need to protect key from destruction: after the first dictDelete()
1743 * it may happen that 'key' is no longer valid if we don't increment
1744 * it's count. This may happen when we get the object reference directly
1745 * from the hash table with dictRandomKey() or dict iterators */
1746 incrRefCount(key);
1747 if (dictSize(db->expires)) dictDelete(db->expires,key);
1748 retval = dictDelete(db->dict,key);
1749 decrRefCount(key);
1750
1751 return retval == DICT_OK;
1752 }
1753
1754 /*============================ DB saving/loading ============================ */
1755
/* Write the one-byte 'type' to 'fp'.
 * Returns 0 on success, -1 if the write failed. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
1760
/* Write the time 't' to 'fp' as a 4-byte signed integer, in the byte
 * order of the host. Returns 0 on success, -1 if the write failed. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    if (fwrite(&t32,4,1,fp) != 0) return 0;
    return -1;
}
1766
1767 /* check rdbLoadLen() comments for more info */
1768 static int rdbSaveLen(FILE *fp, uint32_t len) {
1769 unsigned char buf[2];
1770
1771 if (len < (1<<6)) {
1772 /* Save a 6 bit len */
1773 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1774 if (fwrite(buf,1,1,fp) == 0) return -1;
1775 } else if (len < (1<<14)) {
1776 /* Save a 14 bit len */
1777 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1778 buf[1] = len&0xFF;
1779 if (fwrite(buf,2,1,fp) == 0) return -1;
1780 } else {
1781 /* Save a 32 bit len */
1782 buf[0] = (REDIS_RDB_32BITLEN<<6);
1783 if (fwrite(buf,1,1,fp) == 0) return -1;
1784 len = htonl(len);
1785 if (fwrite(&len,4,1,fp) == 0) return -1;
1786 }
1787 return 0;
1788 }
1789
1790 /* String objects in the form "2391" "-100" without any space and with a
1791 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1792 * encoded as integers to save space */
1793 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1794 long long value;
1795 char *endptr, buf[32];
1796
1797 /* Check if it's possible to encode this value as a number */
1798 value = strtoll(s, &endptr, 10);
1799 if (endptr[0] != '\0') return 0;
1800 snprintf(buf,32,"%lld",value);
1801
1802 /* If the number converted back into a string is not identical
1803 * then it's not possible to encode the string as integer */
1804 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1805
1806 /* Finally check if it fits in our ranges */
1807 if (value >= -(1<<7) && value <= (1<<7)-1) {
1808 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1809 enc[1] = value&0xFF;
1810 return 2;
1811 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1812 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1813 enc[1] = value&0xFF;
1814 enc[2] = (value>>8)&0xFF;
1815 return 3;
1816 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1817 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1818 enc[1] = value&0xFF;
1819 enc[2] = (value>>8)&0xFF;
1820 enc[3] = (value>>16)&0xFF;
1821 enc[4] = (value>>24)&0xFF;
1822 return 5;
1823 } else {
1824 return 0;
1825 }
1826 }
1827
1828 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1829 unsigned int comprlen, outlen;
1830 unsigned char byte;
1831 void *out;
1832
1833 /* We require at least four bytes compression for this to be worth it */
1834 outlen = sdslen(obj->ptr)-4;
1835 if (outlen <= 0) return 0;
1836 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1837 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1838 if (comprlen == 0) {
1839 zfree(out);
1840 return 0;
1841 }
1842 /* Data compressed! Let's save it on disk */
1843 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1844 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1845 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1846 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1847 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1848 zfree(out);
1849 return comprlen;
1850
1851 writeerr:
1852 zfree(out);
1853 return -1;
1854 }
1855
1856 /* Save a string objet as [len][data] on disk. If the object is a string
1857 * representation of an integer value we try to safe it in a special form */
1858 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1859 size_t len = sdslen(obj->ptr);
1860 int enclen;
1861
1862 /* Try integer encoding */
1863 if (len <= 11) {
1864 unsigned char buf[5];
1865 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1866 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1867 return 0;
1868 }
1869 }
1870
1871 /* Try LZF compression - under 20 bytes it's unable to compress even
1872 * aaaaaaaaaaaaaaaaaa so skip it */
1873 if (1 && len > 20) {
1874 int retval;
1875
1876 retval = rdbSaveLzfStringObject(fp,obj);
1877 if (retval == -1) return -1;
1878 if (retval > 0) return 0;
1879 /* retval == 0 means data can't be compressed, save the old way */
1880 }
1881
1882 /* Store verbatim */
1883 if (rdbSaveLen(fp,len) == -1) return -1;
1884 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1885 return 0;
1886 }
1887
1888 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1889 static int rdbSave(char *filename) {
1890 dictIterator *di = NULL;
1891 dictEntry *de;
1892 FILE *fp;
1893 char tmpfile[256];
1894 int j;
1895 time_t now = time(NULL);
1896
1897 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1898 fp = fopen(tmpfile,"w");
1899 if (!fp) {
1900 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1901 return REDIS_ERR;
1902 }
1903 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1904 for (j = 0; j < server.dbnum; j++) {
1905 redisDb *db = server.db+j;
1906 dict *d = db->dict;
1907 if (dictSize(d) == 0) continue;
1908 di = dictGetIterator(d);
1909 if (!di) {
1910 fclose(fp);
1911 return REDIS_ERR;
1912 }
1913
1914 /* Write the SELECT DB opcode */
1915 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1916 if (rdbSaveLen(fp,j) == -1) goto werr;
1917
1918 /* Iterate this DB writing every entry */
1919 while((de = dictNext(di)) != NULL) {
1920 robj *key = dictGetEntryKey(de);
1921 robj *o = dictGetEntryVal(de);
1922 time_t expiretime = getExpire(db,key);
1923
1924 /* Save the expire time */
1925 if (expiretime != -1) {
1926 /* If this key is already expired skip it */
1927 if (expiretime < now) continue;
1928 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1929 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1930 }
1931 /* Save the key and associated value */
1932 if (rdbSaveType(fp,o->type) == -1) goto werr;
1933 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1934 if (o->type == REDIS_STRING) {
1935 /* Save a string value */
1936 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1937 } else if (o->type == REDIS_LIST) {
1938 /* Save a list value */
1939 list *list = o->ptr;
1940 listNode *ln;
1941
1942 listRewind(list);
1943 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1944 while((ln = listYield(list))) {
1945 robj *eleobj = listNodeValue(ln);
1946
1947 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1948 }
1949 } else if (o->type == REDIS_SET) {
1950 /* Save a set value */
1951 dict *set = o->ptr;
1952 dictIterator *di = dictGetIterator(set);
1953 dictEntry *de;
1954
1955 if (!set) oom("dictGetIteraotr");
1956 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1957 while((de = dictNext(di)) != NULL) {
1958 robj *eleobj = dictGetEntryKey(de);
1959
1960 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1961 }
1962 dictReleaseIterator(di);
1963 } else {
1964 assert(0 != 0);
1965 }
1966 }
1967 dictReleaseIterator(di);
1968 }
1969 /* EOF opcode */
1970 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1971
1972 /* Make sure data will not remain on the OS's output buffers */
1973 fflush(fp);
1974 fsync(fileno(fp));
1975 fclose(fp);
1976
1977 /* Use RENAME to make sure the DB file is changed atomically only
1978 * if the generate DB file is ok. */
1979 if (rename(tmpfile,filename) == -1) {
1980 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1981 unlink(tmpfile);
1982 return REDIS_ERR;
1983 }
1984 redisLog(REDIS_NOTICE,"DB saved on disk");
1985 server.dirty = 0;
1986 server.lastsave = time(NULL);
1987 return REDIS_OK;
1988
1989 werr:
1990 fclose(fp);
1991 unlink(tmpfile);
1992 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1993 if (di) dictReleaseIterator(di);
1994 return REDIS_ERR;
1995 }
1996
1997 static int rdbSaveBackground(char *filename) {
1998 pid_t childpid;
1999
2000 if (server.bgsaveinprogress) return REDIS_ERR;
2001 if ((childpid = fork()) == 0) {
2002 /* Child */
2003 close(server.fd);
2004 if (rdbSave(filename) == REDIS_OK) {
2005 exit(0);
2006 } else {
2007 exit(1);
2008 }
2009 } else {
2010 /* Parent */
2011 if (childpid == -1) {
2012 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2013 strerror(errno));
2014 return REDIS_ERR;
2015 }
2016 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2017 server.bgsaveinprogress = 1;
2018 server.bgsavechildpid = childpid;
2019 return REDIS_OK;
2020 }
2021 return REDIS_OK; /* unreached */
2022 }
2023
/* Read a single type/opcode byte from 'fp'.
 * Returns the byte as a value in the 0-255 range, or -1 if it could not be
 * read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 0) ? -1 : type;
}
2029
/* Read a 4 byte time value from 'fp'. The bytes are read straight into an
 * int32_t, with no byte order conversion, and widened to time_t.
 * Returns -1 if the value could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t raw;

    if (fread(&raw,sizeof(raw),1,fp) != 1) return -1;
    return (time_t) raw;
}
2035
2036 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2037 * of this file for a description of how this are stored on disk.
2038 *
2039 * isencoded is set to 1 if the readed length is not actually a length but
2040 * an "encoding type", check the above comments for more info */
2041 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2042 unsigned char buf[2];
2043 uint32_t len;
2044
2045 if (isencoded) *isencoded = 0;
2046 if (rdbver == 0) {
2047 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2048 return ntohl(len);
2049 } else {
2050 int type;
2051
2052 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2053 type = (buf[0]&0xC0)>>6;
2054 if (type == REDIS_RDB_6BITLEN) {
2055 /* Read a 6 bit len */
2056 return buf[0]&0x3F;
2057 } else if (type == REDIS_RDB_ENCVAL) {
2058 /* Read a 6 bit len encoding type */
2059 if (isencoded) *isencoded = 1;
2060 return buf[0]&0x3F;
2061 } else if (type == REDIS_RDB_14BITLEN) {
2062 /* Read a 14 bit len */
2063 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2064 return ((buf[0]&0x3F)<<8)|buf[1];
2065 } else {
2066 /* Read a 32 bit len */
2067 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2068 return ntohl(len);
2069 }
2070 }
2071 }
2072
2073 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2074 unsigned char enc[4];
2075 long long val;
2076
2077 if (enctype == REDIS_RDB_ENC_INT8) {
2078 if (fread(enc,1,1,fp) == 0) return NULL;
2079 val = (signed char)enc[0];
2080 } else if (enctype == REDIS_RDB_ENC_INT16) {
2081 uint16_t v;
2082 if (fread(enc,2,1,fp) == 0) return NULL;
2083 v = enc[0]|(enc[1]<<8);
2084 val = (int16_t)v;
2085 } else if (enctype == REDIS_RDB_ENC_INT32) {
2086 uint32_t v;
2087 if (fread(enc,4,1,fp) == 0) return NULL;
2088 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2089 val = (int32_t)v;
2090 } else {
2091 val = 0; /* anti-warning */
2092 assert(0!=0);
2093 }
2094 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2095 }
2096
2097 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2098 unsigned int len, clen;
2099 unsigned char *c = NULL;
2100 sds val = NULL;
2101
2102 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2103 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2104 if ((c = zmalloc(clen)) == NULL) goto err;
2105 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2106 if (fread(c,clen,1,fp) == 0) goto err;
2107 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2108 zfree(c);
2109 return createObject(REDIS_STRING,val);
2110 err:
2111 zfree(c);
2112 sdsfree(val);
2113 return NULL;
2114 }
2115
2116 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2117 int isencoded;
2118 uint32_t len;
2119 sds val;
2120
2121 len = rdbLoadLen(fp,rdbver,&isencoded);
2122 if (isencoded) {
2123 switch(len) {
2124 case REDIS_RDB_ENC_INT8:
2125 case REDIS_RDB_ENC_INT16:
2126 case REDIS_RDB_ENC_INT32:
2127 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2128 case REDIS_RDB_ENC_LZF:
2129 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2130 default:
2131 assert(0!=0);
2132 }
2133 }
2134
2135 if (len == REDIS_RDB_LENERR) return NULL;
2136 val = sdsnewlen(NULL,len);
2137 if (len && fread(val,len,1,fp) == 0) {
2138 sdsfree(val);
2139 return NULL;
2140 }
2141 return tryObjectSharing(createObject(REDIS_STRING,val));
2142 }
2143
2144 static int rdbLoad(char *filename) {
2145 FILE *fp;
2146 robj *keyobj = NULL;
2147 uint32_t dbid;
2148 int type, retval, rdbver;
2149 dict *d = server.db[0].dict;
2150 redisDb *db = server.db+0;
2151 char buf[1024];
2152 time_t expiretime = -1, now = time(NULL);
2153
2154 fp = fopen(filename,"r");
2155 if (!fp) return REDIS_ERR;
2156 if (fread(buf,9,1,fp) == 0) goto eoferr;
2157 buf[9] = '\0';
2158 if (memcmp(buf,"REDIS",5) != 0) {
2159 fclose(fp);
2160 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2161 return REDIS_ERR;
2162 }
2163 rdbver = atoi(buf+5);
2164 if (rdbver > 1) {
2165 fclose(fp);
2166 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2167 return REDIS_ERR;
2168 }
2169 while(1) {
2170 robj *o;
2171
2172 /* Read type. */
2173 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2174 if (type == REDIS_EXPIRETIME) {
2175 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2176 /* We read the time so we need to read the object type again */
2177 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2178 }
2179 if (type == REDIS_EOF) break;
2180 /* Handle SELECT DB opcode as a special case */
2181 if (type == REDIS_SELECTDB) {
2182 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2183 goto eoferr;
2184 if (dbid >= (unsigned)server.dbnum) {
2185 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2186 exit(1);
2187 }
2188 db = server.db+dbid;
2189 d = db->dict;
2190 continue;
2191 }
2192 /* Read key */
2193 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2194
2195 if (type == REDIS_STRING) {
2196 /* Read string value */
2197 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2198 } else if (type == REDIS_LIST || type == REDIS_SET) {
2199 /* Read list/set value */
2200 uint32_t listlen;
2201
2202 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2203 goto eoferr;
2204 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2205 /* Load every single element of the list/set */
2206 while(listlen--) {
2207 robj *ele;
2208
2209 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2210 if (type == REDIS_LIST) {
2211 if (!listAddNodeTail((list*)o->ptr,ele))
2212 oom("listAddNodeTail");
2213 } else {
2214 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2215 oom("dictAdd");
2216 }
2217 }
2218 } else {
2219 assert(0 != 0);
2220 }
2221 /* Add the new object in the hash table */
2222 retval = dictAdd(d,keyobj,o);
2223 if (retval == DICT_ERR) {
2224 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2225 exit(1);
2226 }
2227 /* Set the expire time if needed */
2228 if (expiretime != -1) {
2229 setExpire(db,keyobj,expiretime);
2230 /* Delete this key if already expired */
2231 if (expiretime < now) deleteKey(db,keyobj);
2232 expiretime = -1;
2233 }
2234 keyobj = o = NULL;
2235 }
2236 fclose(fp);
2237 return REDIS_OK;
2238
2239 eoferr: /* unexpected end of file is handled here with a fatal exit */
2240 if (keyobj) decrRefCount(keyobj);
2241 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2242 exit(1);
2243 return REDIS_ERR; /* Just to avoid warning */
2244 }
2245
2246 /*================================== Commands =============================== */
2247
2248 static void authCommand(redisClient *c) {
2249 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2250 c->authenticated = 1;
2251 addReply(c,shared.ok);
2252 } else {
2253 c->authenticated = 0;
2254 addReply(c,shared.err);
2255 }
2256 }
2257
2258 static void pingCommand(redisClient *c) {
2259 addReply(c,shared.pong);
2260 }
2261
2262 static void echoCommand(redisClient *c) {
2263 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2264 (int)sdslen(c->argv[1]->ptr)));
2265 addReply(c,c->argv[1]);
2266 addReply(c,shared.crlf);
2267 }
2268
2269 /*=================================== Strings =============================== */
2270
2271 static void setGenericCommand(redisClient *c, int nx) {
2272 int retval;
2273
2274 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2275 if (retval == DICT_ERR) {
2276 if (!nx) {
2277 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2278 incrRefCount(c->argv[2]);
2279 } else {
2280 addReply(c,shared.czero);
2281 return;
2282 }
2283 } else {
2284 incrRefCount(c->argv[1]);
2285 incrRefCount(c->argv[2]);
2286 }
2287 server.dirty++;
2288 removeExpire(c->db,c->argv[1]);
2289 addReply(c, nx ? shared.cone : shared.ok);
2290 }
2291
2292 static void setCommand(redisClient *c) {
2293 setGenericCommand(c,0);
2294 }
2295
2296 static void setnxCommand(redisClient *c) {
2297 setGenericCommand(c,1);
2298 }
2299
2300 static void getCommand(redisClient *c) {
2301 robj *o = lookupKeyRead(c->db,c->argv[1]);
2302
2303 if (o == NULL) {
2304 addReply(c,shared.nullbulk);
2305 } else {
2306 if (o->type != REDIS_STRING) {
2307 addReply(c,shared.wrongtypeerr);
2308 } else {
2309 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2310 addReply(c,o);
2311 addReply(c,shared.crlf);
2312 }
2313 }
2314 }
2315
2316 static void getSetCommand(redisClient *c) {
2317 getCommand(c);
2318 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2319 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2320 } else {
2321 incrRefCount(c->argv[1]);
2322 }
2323 incrRefCount(c->argv[2]);
2324 server.dirty++;
2325 removeExpire(c->db,c->argv[1]);
2326 }
2327
2328 static void mgetCommand(redisClient *c) {
2329 int j;
2330
2331 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2332 for (j = 1; j < c->argc; j++) {
2333 robj *o = lookupKeyRead(c->db,c->argv[j]);
2334 if (o == NULL) {
2335 addReply(c,shared.nullbulk);
2336 } else {
2337 if (o->type != REDIS_STRING) {
2338 addReply(c,shared.nullbulk);
2339 } else {
2340 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2341 addReply(c,o);
2342 addReply(c,shared.crlf);
2343 }
2344 }
2345 }
2346 }
2347
2348 static void incrDecrCommand(redisClient *c, long long incr) {
2349 long long value;
2350 int retval;
2351 robj *o;
2352
2353 o = lookupKeyWrite(c->db,c->argv[1]);
2354 if (o == NULL) {
2355 value = 0;
2356 } else {
2357 if (o->type != REDIS_STRING) {
2358 value = 0;
2359 } else {
2360 char *eptr;
2361
2362 value = strtoll(o->ptr, &eptr, 10);
2363 }
2364 }
2365
2366 value += incr;
2367 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2368 retval = dictAdd(c->db->dict,c->argv[1],o);
2369 if (retval == DICT_ERR) {
2370 dictReplace(c->db->dict,c->argv[1],o);
2371 removeExpire(c->db,c->argv[1]);
2372 } else {
2373 incrRefCount(c->argv[1]);
2374 }
2375 server.dirty++;
2376 addReply(c,shared.colon);
2377 addReply(c,o);
2378 addReply(c,shared.crlf);
2379 }
2380
2381 static void incrCommand(redisClient *c) {
2382 incrDecrCommand(c,1);
2383 }
2384
2385 static void decrCommand(redisClient *c) {
2386 incrDecrCommand(c,-1);
2387 }
2388
2389 static void incrbyCommand(redisClient *c) {
2390 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2391 incrDecrCommand(c,incr);
2392 }
2393
2394 static void decrbyCommand(redisClient *c) {
2395 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2396 incrDecrCommand(c,-incr);
2397 }
2398
2399 /* ========================= Type agnostic commands ========================= */
2400
2401 static void delCommand(redisClient *c) {
2402 int deleted = 0, j;
2403
2404 for (j = 1; j < c->argc; j++) {
2405 if (deleteKey(c->db,c->argv[j])) {
2406 server.dirty++;
2407 deleted++;
2408 }
2409 }
2410 switch(deleted) {
2411 case 0:
2412 addReply(c,shared.czero);
2413 break;
2414 case 1:
2415 addReply(c,shared.cone);
2416 break;
2417 default:
2418 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2419 break;
2420 }
2421 }
2422
2423 static void existsCommand(redisClient *c) {
2424 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2425 }
2426
2427 static void selectCommand(redisClient *c) {
2428 int id = atoi(c->argv[1]->ptr);
2429
2430 if (selectDb(c,id) == REDIS_ERR) {
2431 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2432 } else {
2433 addReply(c,shared.ok);
2434 }
2435 }
2436
2437 static void randomkeyCommand(redisClient *c) {
2438 dictEntry *de;
2439
2440 while(1) {
2441 de = dictGetRandomKey(c->db->dict);
2442 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2443 }
2444 if (de == NULL) {
2445 addReply(c,shared.plus);
2446 addReply(c,shared.crlf);
2447 } else {
2448 addReply(c,shared.plus);
2449 addReply(c,dictGetEntryKey(de));
2450 addReply(c,shared.crlf);
2451 }
2452 }
2453
2454 static void keysCommand(redisClient *c) {
2455 dictIterator *di;
2456 dictEntry *de;
2457 sds pattern = c->argv[1]->ptr;
2458 int plen = sdslen(pattern);
2459 int numkeys = 0, keyslen = 0;
2460 robj *lenobj = createObject(REDIS_STRING,NULL);
2461
2462 di = dictGetIterator(c->db->dict);
2463 if (!di) oom("dictGetIterator");
2464 addReply(c,lenobj);
2465 decrRefCount(lenobj);
2466 while((de = dictNext(di)) != NULL) {
2467 robj *keyobj = dictGetEntryKey(de);
2468
2469 sds key = keyobj->ptr;
2470 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2471 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2472 if (expireIfNeeded(c->db,keyobj) == 0) {
2473 if (numkeys != 0)
2474 addReply(c,shared.space);
2475 addReply(c,keyobj);
2476 numkeys++;
2477 keyslen += sdslen(key);
2478 }
2479 }
2480 }
2481 dictReleaseIterator(di);
2482 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2483 addReply(c,shared.crlf);
2484 }
2485
2486 static void dbsizeCommand(redisClient *c) {
2487 addReplySds(c,
2488 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2489 }
2490
2491 static void lastsaveCommand(redisClient *c) {
2492 addReplySds(c,
2493 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2494 }
2495
2496 static void typeCommand(redisClient *c) {
2497 robj *o;
2498 char *type;
2499
2500 o = lookupKeyRead(c->db,c->argv[1]);
2501 if (o == NULL) {
2502 type = "+none";
2503 } else {
2504 switch(o->type) {
2505 case REDIS_STRING: type = "+string"; break;
2506 case REDIS_LIST: type = "+list"; break;
2507 case REDIS_SET: type = "+set"; break;
2508 default: type = "unknown"; break;
2509 }
2510 }
2511 addReplySds(c,sdsnew(type));
2512 addReply(c,shared.crlf);
2513 }
2514
2515 static void saveCommand(redisClient *c) {
2516 if (server.bgsaveinprogress) {
2517 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2518 return;
2519 }
2520 if (rdbSave(server.dbfilename) == REDIS_OK) {
2521 addReply(c,shared.ok);
2522 } else {
2523 addReply(c,shared.err);
2524 }
2525 }
2526
2527 static void bgsaveCommand(redisClient *c) {
2528 if (server.bgsaveinprogress) {
2529 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2530 return;
2531 }
2532 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2533 addReply(c,shared.ok);
2534 } else {
2535 addReply(c,shared.err);
2536 }
2537 }
2538
2539 static void shutdownCommand(redisClient *c) {
2540 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2541 if (server.bgsaveinprogress) {
2542 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2543 signal(SIGCHLD, SIG_IGN);
2544 kill(server.bgsavechildpid,SIGKILL);
2545 }
2546 if (rdbSave(server.dbfilename) == REDIS_OK) {
2547 if (server.daemonize)
2548 unlink(server.pidfile);
2549 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2550 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2551 exit(1);
2552 } else {
2553 signal(SIGCHLD, SIG_DFL);
2554 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2555 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2556 }
2557 }
2558
2559 static void renameGenericCommand(redisClient *c, int nx) {
2560 robj *o;
2561
2562 /* To use the same key as src and dst is probably an error */
2563 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2564 addReply(c,shared.sameobjecterr);
2565 return;
2566 }
2567
2568 o = lookupKeyWrite(c->db,c->argv[1]);
2569 if (o == NULL) {
2570 addReply(c,shared.nokeyerr);
2571 return;
2572 }
2573 incrRefCount(o);
2574 deleteIfVolatile(c->db,c->argv[2]);
2575 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2576 if (nx) {
2577 decrRefCount(o);
2578 addReply(c,shared.czero);
2579 return;
2580 }
2581 dictReplace(c->db->dict,c->argv[2],o);
2582 } else {
2583 incrRefCount(c->argv[2]);
2584 }
2585 deleteKey(c->db,c->argv[1]);
2586 server.dirty++;
2587 addReply(c,nx ? shared.cone : shared.ok);
2588 }
2589
2590 static void renameCommand(redisClient *c) {
2591 renameGenericCommand(c,0);
2592 }
2593
2594 static void renamenxCommand(redisClient *c) {
2595 renameGenericCommand(c,1);
2596 }
2597
2598 static void moveCommand(redisClient *c) {
2599 robj *o;
2600 redisDb *src, *dst;
2601 int srcid;
2602
2603 /* Obtain source and target DB pointers */
2604 src = c->db;
2605 srcid = c->db->id;
2606 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2607 addReply(c,shared.outofrangeerr);
2608 return;
2609 }
2610 dst = c->db;
2611 selectDb(c,srcid); /* Back to the source DB */
2612
2613 /* If the user is moving using as target the same
2614 * DB as the source DB it is probably an error. */
2615 if (src == dst) {
2616 addReply(c,shared.sameobjecterr);
2617 return;
2618 }
2619
2620 /* Check if the element exists and get a reference */
2621 o = lookupKeyWrite(c->db,c->argv[1]);
2622 if (!o) {
2623 addReply(c,shared.czero);
2624 return;
2625 }
2626
2627 /* Try to add the element to the target DB */
2628 deleteIfVolatile(dst,c->argv[1]);
2629 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2630 addReply(c,shared.czero);
2631 return;
2632 }
2633 incrRefCount(c->argv[1]);
2634 incrRefCount(o);
2635
2636 /* OK! key moved, free the entry in the source DB */
2637 deleteKey(src,c->argv[1]);
2638 server.dirty++;
2639 addReply(c,shared.cone);
2640 }
2641
2642 /* =================================== Lists ================================ */
2643 static void pushGenericCommand(redisClient *c, int where) {
2644 robj *lobj;
2645 list *list;
2646
2647 lobj = lookupKeyWrite(c->db,c->argv[1]);
2648 if (lobj == NULL) {
2649 lobj = createListObject();
2650 list = lobj->ptr;
2651 if (where == REDIS_HEAD) {
2652 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2653 } else {
2654 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2655 }
2656 dictAdd(c->db->dict,c->argv[1],lobj);
2657 incrRefCount(c->argv[1]);
2658 incrRefCount(c->argv[2]);
2659 } else {
2660 if (lobj->type != REDIS_LIST) {
2661 addReply(c,shared.wrongtypeerr);
2662 return;
2663 }
2664 list = lobj->ptr;
2665 if (where == REDIS_HEAD) {
2666 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2667 } else {
2668 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2669 }
2670 incrRefCount(c->argv[2]);
2671 }
2672 server.dirty++;
2673 addReply(c,shared.ok);
2674 }
2675
2676 static void lpushCommand(redisClient *c) {
2677 pushGenericCommand(c,REDIS_HEAD);
2678 }
2679
2680 static void rpushCommand(redisClient *c) {
2681 pushGenericCommand(c,REDIS_TAIL);
2682 }
2683
2684 static void llenCommand(redisClient *c) {
2685 robj *o;
2686 list *l;
2687
2688 o = lookupKeyRead(c->db,c->argv[1]);
2689 if (o == NULL) {
2690 addReply(c,shared.czero);
2691 return;
2692 } else {
2693 if (o->type != REDIS_LIST) {
2694 addReply(c,shared.wrongtypeerr);
2695 } else {
2696 l = o->ptr;
2697 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2698 }
2699 }
2700 }
2701
2702 static void lindexCommand(redisClient *c) {
2703 robj *o;
2704 int index = atoi(c->argv[2]->ptr);
2705
2706 o = lookupKeyRead(c->db,c->argv[1]);
2707 if (o == NULL) {
2708 addReply(c,shared.nullbulk);
2709 } else {
2710 if (o->type != REDIS_LIST) {
2711 addReply(c,shared.wrongtypeerr);
2712 } else {
2713 list *list = o->ptr;
2714 listNode *ln;
2715
2716 ln = listIndex(list, index);
2717 if (ln == NULL) {
2718 addReply(c,shared.nullbulk);
2719 } else {
2720 robj *ele = listNodeValue(ln);
2721 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2722 addReply(c,ele);
2723 addReply(c,shared.crlf);
2724 }
2725 }
2726 }
2727 }
2728
2729 static void lsetCommand(redisClient *c) {
2730 robj *o;
2731 int index = atoi(c->argv[2]->ptr);
2732
2733 o = lookupKeyWrite(c->db,c->argv[1]);
2734 if (o == NULL) {
2735 addReply(c,shared.nokeyerr);
2736 } else {
2737 if (o->type != REDIS_LIST) {
2738 addReply(c,shared.wrongtypeerr);
2739 } else {
2740 list *list = o->ptr;
2741 listNode *ln;
2742
2743 ln = listIndex(list, index);
2744 if (ln == NULL) {
2745 addReply(c,shared.outofrangeerr);
2746 } else {
2747 robj *ele = listNodeValue(ln);
2748
2749 decrRefCount(ele);
2750 listNodeValue(ln) = c->argv[3];
2751 incrRefCount(c->argv[3]);
2752 addReply(c,shared.ok);
2753 server.dirty++;
2754 }
2755 }
2756 }
2757 }
2758
2759 static void popGenericCommand(redisClient *c, int where) {
2760 robj *o;
2761
2762 o = lookupKeyWrite(c->db,c->argv[1]);
2763 if (o == NULL) {
2764 addReply(c,shared.nullbulk);
2765 } else {
2766 if (o->type != REDIS_LIST) {
2767 addReply(c,shared.wrongtypeerr);
2768 } else {
2769 list *list = o->ptr;
2770 listNode *ln;
2771
2772 if (where == REDIS_HEAD)
2773 ln = listFirst(list);
2774 else
2775 ln = listLast(list);
2776
2777 if (ln == NULL) {
2778 addReply(c,shared.nullbulk);
2779 } else {
2780 robj *ele = listNodeValue(ln);
2781 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2782 addReply(c,ele);
2783 addReply(c,shared.crlf);
2784 listDelNode(list,ln);
2785 server.dirty++;
2786 }
2787 }
2788 }
2789 }
2790
2791 static void lpopCommand(redisClient *c) {
2792 popGenericCommand(c,REDIS_HEAD);
2793 }
2794
2795 static void rpopCommand(redisClient *c) {
2796 popGenericCommand(c,REDIS_TAIL);
2797 }
2798
2799 static void lrangeCommand(redisClient *c) {
2800 robj *o;
2801 int start = atoi(c->argv[2]->ptr);
2802 int end = atoi(c->argv[3]->ptr);
2803
2804 o = lookupKeyRead(c->db,c->argv[1]);
2805 if (o == NULL) {
2806 addReply(c,shared.nullmultibulk);
2807 } else {
2808 if (o->type != REDIS_LIST) {
2809 addReply(c,shared.wrongtypeerr);
2810 } else {
2811 list *list = o->ptr;
2812 listNode *ln;
2813 int llen = listLength(list);
2814 int rangelen, j;
2815 robj *ele;
2816
2817 /* convert negative indexes */
2818 if (start < 0) start = llen+start;
2819 if (end < 0) end = llen+end;
2820 if (start < 0) start = 0;
2821 if (end < 0) end = 0;
2822
2823 /* indexes sanity checks */
2824 if (start > end || start >= llen) {
2825 /* Out of range start or start > end result in empty list */
2826 addReply(c,shared.emptymultibulk);
2827 return;
2828 }
2829 if (end >= llen) end = llen-1;
2830 rangelen = (end-start)+1;
2831
2832 /* Return the result in form of a multi-bulk reply */
2833 ln = listIndex(list, start);
2834 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2835 for (j = 0; j < rangelen; j++) {
2836 ele = listNodeValue(ln);
2837 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2838 addReply(c,ele);
2839 addReply(c,shared.crlf);
2840 ln = ln->next;
2841 }
2842 }
2843 }
2844 }
2845
2846 static void ltrimCommand(redisClient *c) {
2847 robj *o;
2848 int start = atoi(c->argv[2]->ptr);
2849 int end = atoi(c->argv[3]->ptr);
2850
2851 o = lookupKeyWrite(c->db,c->argv[1]);
2852 if (o == NULL) {
2853 addReply(c,shared.nokeyerr);
2854 } else {
2855 if (o->type != REDIS_LIST) {
2856 addReply(c,shared.wrongtypeerr);
2857 } else {
2858 list *list = o->ptr;
2859 listNode *ln;
2860 int llen = listLength(list);
2861 int j, ltrim, rtrim;
2862
2863 /* convert negative indexes */
2864 if (start < 0) start = llen+start;
2865 if (end < 0) end = llen+end;
2866 if (start < 0) start = 0;
2867 if (end < 0) end = 0;
2868
2869 /* indexes sanity checks */
2870 if (start > end || start >= llen) {
2871 /* Out of range start or start > end result in empty list */
2872 ltrim = llen;
2873 rtrim = 0;
2874 } else {
2875 if (end >= llen) end = llen-1;
2876 ltrim = start;
2877 rtrim = llen-end-1;
2878 }
2879
2880 /* Remove list elements to perform the trim */
2881 for (j = 0; j < ltrim; j++) {
2882 ln = listFirst(list);
2883 listDelNode(list,ln);
2884 }
2885 for (j = 0; j < rtrim; j++) {
2886 ln = listLast(list);
2887 listDelNode(list,ln);
2888 }
2889 addReply(c,shared.ok);
2890 server.dirty++;
2891 }
2892 }
2893 }
2894
2895 static void lremCommand(redisClient *c) {
2896 robj *o;
2897
2898 o = lookupKeyWrite(c->db,c->argv[1]);
2899 if (o == NULL) {
2900 addReply(c,shared.czero);
2901 } else {
2902 if (o->type != REDIS_LIST) {
2903 addReply(c,shared.wrongtypeerr);
2904 } else {
2905 list *list = o->ptr;
2906 listNode *ln, *next;
2907 int toremove = atoi(c->argv[2]->ptr);
2908 int removed = 0;
2909 int fromtail = 0;
2910
2911 if (toremove < 0) {
2912 toremove = -toremove;
2913 fromtail = 1;
2914 }
2915 ln = fromtail ? list->tail : list->head;
2916 while (ln) {
2917 robj *ele = listNodeValue(ln);
2918
2919 next = fromtail ? ln->prev : ln->next;
2920 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2921 listDelNode(list,ln);
2922 server.dirty++;
2923 removed++;
2924 if (toremove && removed == toremove) break;
2925 }
2926 ln = next;
2927 }
2928 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2929 }
2930 }
2931 }
2932
2933 /* ==================================== Sets ================================ */
2934
2935 static void saddCommand(redisClient *c) {
2936 robj *set;
2937
2938 set = lookupKeyWrite(c->db,c->argv[1]);
2939 if (set == NULL) {
2940 set = createSetObject();
2941 dictAdd(c->db->dict,c->argv[1],set);
2942 incrRefCount(c->argv[1]);
2943 } else {
2944 if (set->type != REDIS_SET) {
2945 addReply(c,shared.wrongtypeerr);
2946 return;
2947 }
2948 }
2949 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2950 incrRefCount(c->argv[2]);
2951 server.dirty++;
2952 addReply(c,shared.cone);
2953 } else {
2954 addReply(c,shared.czero);
2955 }
2956 }
2957
2958 static void sremCommand(redisClient *c) {
2959 robj *set;
2960
2961 set = lookupKeyWrite(c->db,c->argv[1]);
2962 if (set == NULL) {
2963 addReply(c,shared.czero);
2964 } else {
2965 if (set->type != REDIS_SET) {
2966 addReply(c,shared.wrongtypeerr);
2967 return;
2968 }
2969 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2970 server.dirty++;
2971 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
2972 addReply(c,shared.cone);
2973 } else {
2974 addReply(c,shared.czero);
2975 }
2976 }
2977 }
2978
2979 static void smoveCommand(redisClient *c) {
2980 robj *srcset, *dstset;
2981
2982 srcset = lookupKeyWrite(c->db,c->argv[1]);
2983 dstset = lookupKeyWrite(c->db,c->argv[2]);
2984
2985 /* If the source key does not exist return 0, if it's of the wrong type
2986 * raise an error */
2987 if (srcset == NULL || srcset->type != REDIS_SET) {
2988 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2989 return;
2990 }
2991 /* Error if the destination key is not a set as well */
2992 if (dstset && dstset->type != REDIS_SET) {
2993 addReply(c,shared.wrongtypeerr);
2994 return;
2995 }
2996 /* Remove the element from the source set */
2997 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2998 /* Key not found in the src set! return zero */
2999 addReply(c,shared.czero);
3000 return;
3001 }
3002 server.dirty++;
3003 /* Add the element to the destination set */
3004 if (!dstset) {
3005 dstset = createSetObject();
3006 dictAdd(c->db->dict,c->argv[2],dstset);
3007 incrRefCount(c->argv[2]);
3008 }
3009 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3010 incrRefCount(c->argv[3]);
3011 addReply(c,shared.cone);
3012 }
3013
3014 static void sismemberCommand(redisClient *c) {
3015 robj *set;
3016
3017 set = lookupKeyRead(c->db,c->argv[1]);
3018 if (set == NULL) {
3019 addReply(c,shared.czero);
3020 } else {
3021 if (set->type != REDIS_SET) {
3022 addReply(c,shared.wrongtypeerr);
3023 return;
3024 }
3025 if (dictFind(set->ptr,c->argv[2]))
3026 addReply(c,shared.cone);
3027 else
3028 addReply(c,shared.czero);
3029 }
3030 }
3031
3032 static void scardCommand(redisClient *c) {
3033 robj *o;
3034 dict *s;
3035
3036 o = lookupKeyRead(c->db,c->argv[1]);
3037 if (o == NULL) {
3038 addReply(c,shared.czero);
3039 return;
3040 } else {
3041 if (o->type != REDIS_SET) {
3042 addReply(c,shared.wrongtypeerr);
3043 } else {
3044 s = o->ptr;
3045 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3046 dictSize(s)));
3047 }
3048 }
3049 }
3050
3051 static void spopCommand(redisClient *c) {
3052 robj *set;
3053 dictEntry *de;
3054
3055 set = lookupKeyWrite(c->db,c->argv[1]);
3056 if (set == NULL) {
3057 addReply(c,shared.nullbulk);
3058 } else {
3059 if (set->type != REDIS_SET) {
3060 addReply(c,shared.wrongtypeerr);
3061 return;
3062 }
3063 de = dictGetRandomKey(set->ptr);
3064 if (de == NULL) {
3065 addReply(c,shared.nullbulk);
3066 } else {
3067 robj *ele = dictGetEntryKey(de);
3068
3069 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3070 addReply(c,ele);
3071 addReply(c,shared.crlf);
3072 dictDelete(set->ptr,ele);
3073 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3074 server.dirty++;
3075 }
3076 }
3077 }
3078
3079 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3080 dict **d1 = (void*) s1, **d2 = (void*) s2;
3081
3082 return dictSize(*d1)-dictSize(*d2);
3083 }
3084
3085 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3086 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3087 dictIterator *di;
3088 dictEntry *de;
3089 robj *lenobj = NULL, *dstset = NULL;
3090 int j, cardinality = 0;
3091
3092 if (!dv) oom("sinterGenericCommand");
3093 for (j = 0; j < setsnum; j++) {
3094 robj *setobj;
3095
3096 setobj = dstkey ?
3097 lookupKeyWrite(c->db,setskeys[j]) :
3098 lookupKeyRead(c->db,setskeys[j]);
3099 if (!setobj) {
3100 zfree(dv);
3101 if (dstkey) {
3102 deleteKey(c->db,dstkey);
3103 addReply(c,shared.ok);
3104 } else {
3105 addReply(c,shared.nullmultibulk);
3106 }
3107 return;
3108 }
3109 if (setobj->type != REDIS_SET) {
3110 zfree(dv);
3111 addReply(c,shared.wrongtypeerr);
3112 return;
3113 }
3114 dv[j] = setobj->ptr;
3115 }
3116 /* Sort sets from the smallest to largest, this will improve our
3117 * algorithm's performace */
3118 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3119
3120 /* The first thing we should output is the total number of elements...
3121 * since this is a multi-bulk write, but at this stage we don't know
3122 * the intersection set size, so we use a trick, append an empty object
3123 * to the output list and save the pointer to later modify it with the
3124 * right length */
3125 if (!dstkey) {
3126 lenobj = createObject(REDIS_STRING,NULL);
3127 addReply(c,lenobj);
3128 decrRefCount(lenobj);
3129 } else {
3130 /* If we have a target key where to store the resulting set
3131 * create this key with an empty set inside */
3132 dstset = createSetObject();
3133 }
3134
3135 /* Iterate all the elements of the first (smallest) set, and test
3136 * the element against all the other sets, if at least one set does
3137 * not include the element it is discarded */
3138 di = dictGetIterator(dv[0]);
3139 if (!di) oom("dictGetIterator");
3140
3141 while((de = dictNext(di)) != NULL) {
3142 robj *ele;
3143
3144 for (j = 1; j < setsnum; j++)
3145 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3146 if (j != setsnum)
3147 continue; /* at least one set does not contain the member */
3148 ele = dictGetEntryKey(de);
3149 if (!dstkey) {
3150 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3151 addReply(c,ele);
3152 addReply(c,shared.crlf);
3153 cardinality++;
3154 } else {
3155 dictAdd(dstset->ptr,ele,NULL);
3156 incrRefCount(ele);
3157 }
3158 }
3159 dictReleaseIterator(di);
3160
3161 if (dstkey) {
3162 /* Store the resulting set into the target */
3163 deleteKey(c->db,dstkey);
3164 dictAdd(c->db->dict,dstkey,dstset);
3165 incrRefCount(dstkey);
3166 }
3167
3168 if (!dstkey) {
3169 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3170 } else {
3171 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3172 dictSize((dict*)dstset->ptr)));
3173 server.dirty++;
3174 }
3175 zfree(dv);
3176 }
3177
3178 static void sinterCommand(redisClient *c) {
3179 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3180 }
3181
3182 static void sinterstoreCommand(redisClient *c) {
3183 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3184 }
3185
3186 #define REDIS_OP_UNION 0
3187 #define REDIS_OP_DIFF 1
3188
3189 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3190 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3191 dictIterator *di;
3192 dictEntry *de;
3193 robj *dstset = NULL;
3194 int j, cardinality = 0;
3195
3196 if (!dv) oom("sunionDiffGenericCommand");
3197 for (j = 0; j < setsnum; j++) {
3198 robj *setobj;
3199
3200 setobj = dstkey ?
3201 lookupKeyWrite(c->db,setskeys[j]) :
3202 lookupKeyRead(c->db,setskeys[j]);
3203 if (!setobj) {
3204 dv[j] = NULL;
3205 continue;
3206 }
3207 if (setobj->type != REDIS_SET) {
3208 zfree(dv);
3209 addReply(c,shared.wrongtypeerr);
3210 return;
3211 }
3212 dv[j] = setobj->ptr;
3213 }
3214
3215 /* We need a temp set object to store our union. If the dstkey
3216 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3217 * this set object will be the resulting object to set into the target key*/
3218 dstset = createSetObject();
3219
3220 /* Iterate all the elements of all the sets, add every element a single
3221 * time to the result set */
3222 for (j = 0; j < setsnum; j++) {
3223 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3224 if (!dv[j]) continue; /* non existing keys are like empty sets */
3225
3226 di = dictGetIterator(dv[j]);
3227 if (!di) oom("dictGetIterator");
3228
3229 while((de = dictNext(di)) != NULL) {
3230 robj *ele;
3231
3232 /* dictAdd will not add the same element multiple times */
3233 ele = dictGetEntryKey(de);
3234 if (op == REDIS_OP_UNION || j == 0) {
3235 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3236 incrRefCount(ele);
3237 cardinality++;
3238 }
3239 } else if (op == REDIS_OP_DIFF) {
3240 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3241 cardinality--;
3242 }
3243 }
3244 }
3245 dictReleaseIterator(di);
3246
3247 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3248 }
3249
3250 /* Output the content of the resulting set, if not in STORE mode */
3251 if (!dstkey) {
3252 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3253 di = dictGetIterator(dstset->ptr);
3254 if (!di) oom("dictGetIterator");
3255 while((de = dictNext(di)) != NULL) {
3256 robj *ele;
3257
3258 ele = dictGetEntryKey(de);
3259 addReplySds(c,sdscatprintf(sdsempty(),
3260 "$%d\r\n",sdslen(ele->ptr)));
3261 addReply(c,ele);
3262 addReply(c,shared.crlf);
3263 }
3264 dictReleaseIterator(di);
3265 } else {
3266 /* If we have a target key where to store the resulting set
3267 * create this key with the result set inside */
3268 deleteKey(c->db,dstkey);
3269 dictAdd(c->db->dict,dstkey,dstset);
3270 incrRefCount(dstkey);
3271 }
3272
3273 /* Cleanup */
3274 if (!dstkey) {
3275 decrRefCount(dstset);
3276 } else {
3277 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3278 dictSize((dict*)dstset->ptr)));
3279 server.dirty++;
3280 }
3281 zfree(dv);
3282 }
3283
3284 static void sunionCommand(redisClient *c) {
3285 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3286 }
3287
3288 static void sunionstoreCommand(redisClient *c) {
3289 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3290 }
3291
3292 static void sdiffCommand(redisClient *c) {
3293 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3294 }
3295
3296 static void sdiffstoreCommand(redisClient *c) {
3297 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3298 }
3299
3300 static void flushdbCommand(redisClient *c) {
3301 server.dirty += dictSize(c->db->dict);
3302 dictEmpty(c->db->dict);
3303 dictEmpty(c->db->expires);
3304 addReply(c,shared.ok);
3305 }
3306
3307 static void flushallCommand(redisClient *c) {
3308 server.dirty += emptyDb();
3309 addReply(c,shared.ok);
3310 rdbSave(server.dbfilename);
3311 server.dirty++;
3312 }
3313
3314 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3315 redisSortOperation *so = zmalloc(sizeof(*so));
3316 if (!so) oom("createSortOperation");
3317 so->type = type;
3318 so->pattern = pattern;
3319 return so;
3320 }
3321
3322 /* Return the value associated to the key with a name obtained
3323 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3324 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3325 char *p;
3326 sds spat, ssub;
3327 robj keyobj;
3328 int prefixlen, sublen, postfixlen;
3329 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3330 struct {
3331 long len;
3332 long free;
3333 char buf[REDIS_SORTKEY_MAX+1];
3334 } keyname;
3335
3336 spat = pattern->ptr;
3337 ssub = subst->ptr;
3338 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3339 p = strchr(spat,'*');
3340 if (!p) return NULL;
3341
3342 prefixlen = p-spat;
3343 sublen = sdslen(ssub);
3344 postfixlen = sdslen(spat)-(prefixlen+1);
3345 memcpy(keyname.buf,spat,prefixlen);
3346 memcpy(keyname.buf+prefixlen,ssub,sublen);
3347 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3348 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3349 keyname.len = prefixlen+sublen+postfixlen;
3350
3351 keyobj.refcount = 1;
3352 keyobj.type = REDIS_STRING;
3353 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3354
3355 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3356 return lookupKeyRead(db,&keyobj);
3357 }
3358
3359 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3360 * the additional parameter is not standard but a BSD-specific we have to
3361 * pass sorting parameters via the global 'server' structure */
3362 static int sortCompare(const void *s1, const void *s2) {
3363 const redisSortObject *so1 = s1, *so2 = s2;
3364 int cmp;
3365
3366 if (!server.sort_alpha) {
3367 /* Numeric sorting. Here it's trivial as we precomputed scores */
3368 if (so1->u.score > so2->u.score) {
3369 cmp = 1;
3370 } else if (so1->u.score < so2->u.score) {
3371 cmp = -1;
3372 } else {
3373 cmp = 0;
3374 }
3375 } else {
3376 /* Alphanumeric sorting */
3377 if (server.sort_bypattern) {
3378 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3379 /* At least one compare object is NULL */
3380 if (so1->u.cmpobj == so2->u.cmpobj)
3381 cmp = 0;
3382 else if (so1->u.cmpobj == NULL)
3383 cmp = -1;
3384 else
3385 cmp = 1;
3386 } else {
3387 /* We have both the objects, use strcoll */
3388 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3389 }
3390 } else {
3391 /* Compare elements directly */
3392 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3393 }
3394 }
3395 return server.sort_desc ? -cmp : cmp;
3396 }
3397
3398 /* The SORT command is the most complex command in Redis. Warning: this code
3399 * is optimized for speed and a bit less for readability */
3400 static void sortCommand(redisClient *c) {
3401 list *operations;
3402 int outputlen = 0;
3403 int desc = 0, alpha = 0;
3404 int limit_start = 0, limit_count = -1, start, end;
3405 int j, dontsort = 0, vectorlen;
3406 int getop = 0; /* GET operation counter */
3407 robj *sortval, *sortby = NULL;
3408 redisSortObject *vector; /* Resulting vector to sort */
3409
3410 /* Lookup the key to sort. It must be of the right types */
3411 sortval = lookupKeyRead(c->db,c->argv[1]);
3412 if (sortval == NULL) {
3413 addReply(c,shared.nokeyerr);
3414 return;
3415 }
3416 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3417 addReply(c,shared.wrongtypeerr);
3418 return;
3419 }
3420
3421 /* Create a list of operations to perform for every sorted element.
3422 * Operations can be GET/DEL/INCR/DECR */
3423 operations = listCreate();
3424 listSetFreeMethod(operations,zfree);
3425 j = 2;
3426
3427 /* Now we need to protect sortval incrementing its count, in the future
3428 * SORT may have options able to overwrite/delete keys during the sorting
3429 * and the sorted key itself may get destroied */
3430 incrRefCount(sortval);
3431
3432 /* The SORT command has an SQL-alike syntax, parse it */
3433 while(j < c->argc) {
3434 int leftargs = c->argc-j-1;
3435 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3436 desc = 0;
3437 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3438 desc = 1;
3439 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3440 alpha = 1;
3441 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3442 limit_start = atoi(c->argv[j+1]->ptr);
3443 limit_count = atoi(c->argv[j+2]->ptr);
3444 j+=2;
3445 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3446 sortby = c->argv[j+1];
3447 /* If the BY pattern does not contain '*', i.e. it is constant,
3448 * we don't need to sort nor to lookup the weight keys. */
3449 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3450 j++;
3451 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3452 listAddNodeTail(operations,createSortOperation(
3453 REDIS_SORT_GET,c->argv[j+1]));
3454 getop++;
3455 j++;
3456 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3457 listAddNodeTail(operations,createSortOperation(
3458 REDIS_SORT_DEL,c->argv[j+1]));
3459 j++;
3460 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3461 listAddNodeTail(operations,createSortOperation(
3462 REDIS_SORT_INCR,c->argv[j+1]));
3463 j++;
3464 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3465 listAddNodeTail(operations,createSortOperation(
3466 REDIS_SORT_DECR,c->argv[j+1]));
3467 j++;
3468 } else {
3469 decrRefCount(sortval);
3470 listRelease(operations);
3471 addReply(c,shared.syntaxerr);
3472 return;
3473 }
3474 j++;
3475 }
3476
3477 /* Load the sorting vector with all the objects to sort */
3478 vectorlen = (sortval->type == REDIS_LIST) ?
3479 listLength((list*)sortval->ptr) :
3480 dictSize((dict*)sortval->ptr);
3481 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3482 if (!vector) oom("allocating objects vector for SORT");
3483 j = 0;
3484 if (sortval->type == REDIS_LIST) {
3485 list *list = sortval->ptr;
3486 listNode *ln;
3487
3488 listRewind(list);
3489 while((ln = listYield(list))) {
3490 robj *ele = ln->value;
3491 vector[j].obj = ele;
3492 vector[j].u.score = 0;
3493 vector[j].u.cmpobj = NULL;
3494 j++;
3495 }
3496 } else {
3497 dict *set = sortval->ptr;
3498 dictIterator *di;
3499 dictEntry *setele;
3500
3501 di = dictGetIterator(set);
3502 if (!di) oom("dictGetIterator");
3503 while((setele = dictNext(di)) != NULL) {
3504 vector[j].obj = dictGetEntryKey(setele);
3505 vector[j].u.score = 0;
3506 vector[j].u.cmpobj = NULL;
3507 j++;
3508 }
3509 dictReleaseIterator(di);
3510 }
3511 assert(j == vectorlen);
3512
3513 /* Now it's time to load the right scores in the sorting vector */
3514 if (dontsort == 0) {
3515 for (j = 0; j < vectorlen; j++) {
3516 if (sortby) {
3517 robj *byval;
3518
3519 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3520 if (!byval || byval->type != REDIS_STRING) continue;
3521 if (alpha) {
3522 vector[j].u.cmpobj = byval;
3523 incrRefCount(byval);
3524 } else {
3525 vector[j].u.score = strtod(byval->ptr,NULL);
3526 }
3527 } else {
3528 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3529 }
3530 }
3531 }
3532
3533 /* We are ready to sort the vector... perform a bit of sanity check
3534 * on the LIMIT option too. We'll use a partial version of quicksort. */
3535 start = (limit_start < 0) ? 0 : limit_start;
3536 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3537 if (start >= vectorlen) {
3538 start = vectorlen-1;
3539 end = vectorlen-2;
3540 }
3541 if (end >= vectorlen) end = vectorlen-1;
3542
3543 if (dontsort == 0) {
3544 server.sort_desc = desc;
3545 server.sort_alpha = alpha;
3546 server.sort_bypattern = sortby ? 1 : 0;
3547 if (sortby && (start != 0 || end != vectorlen-1))
3548 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3549 else
3550 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3551 }
3552
3553 /* Send command output to the output buffer, performing the specified
3554 * GET/DEL/INCR/DECR operations if any. */
3555 outputlen = getop ? getop*(end-start+1) : end-start+1;
3556 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3557 for (j = start; j <= end; j++) {
3558 listNode *ln;
3559 if (!getop) {
3560 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3561 sdslen(vector[j].obj->ptr)));
3562 addReply(c,vector[j].obj);
3563 addReply(c,shared.crlf);
3564 }
3565 listRewind(operations);
3566 while((ln = listYield(operations))) {
3567 redisSortOperation *sop = ln->value;
3568 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3569 vector[j].obj);
3570
3571 if (sop->type == REDIS_SORT_GET) {
3572 if (!val || val->type != REDIS_STRING) {
3573 addReply(c,shared.nullbulk);
3574 } else {
3575 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3576 sdslen(val->ptr)));
3577 addReply(c,val);
3578 addReply(c,shared.crlf);
3579 }
3580 } else if (sop->type == REDIS_SORT_DEL) {
3581 /* TODO */
3582 }
3583 }
3584 }
3585
3586 /* Cleanup */
3587 decrRefCount(sortval);
3588 listRelease(operations);
3589 for (j = 0; j < vectorlen; j++) {
3590 if (sortby && alpha && vector[j].u.cmpobj)
3591 decrRefCount(vector[j].u.cmpobj);
3592 }
3593 zfree(vector);
3594 }
3595
3596 static void infoCommand(redisClient *c) {
3597 sds info;
3598 time_t uptime = time(NULL)-server.stat_starttime;
3599
3600 info = sdscatprintf(sdsempty(),
3601 "redis_version:%s\r\n"
3602 "uptime_in_seconds:%d\r\n"
3603 "uptime_in_days:%d\r\n"
3604 "connected_clients:%d\r\n"
3605 "connected_slaves:%d\r\n"
3606 "used_memory:%zu\r\n"
3607 "changes_since_last_save:%lld\r\n"
3608 "bgsave_in_progress:%d\r\n"
3609 "last_save_time:%d\r\n"
3610 "total_connections_received:%lld\r\n"
3611 "total_commands_processed:%lld\r\n"
3612 "role:%s\r\n"
3613 ,REDIS_VERSION,
3614 uptime,
3615 uptime/(3600*24),
3616 listLength(server.clients)-listLength(server.slaves),
3617 listLength(server.slaves),
3618 server.usedmemory,
3619 server.dirty,
3620 server.bgsaveinprogress,
3621 server.lastsave,
3622 server.stat_numconnections,
3623 server.stat_numcommands,
3624 server.masterhost == NULL ? "master" : "slave"
3625 );
3626 if (server.masterhost) {
3627 info = sdscatprintf(info,
3628 "master_host:%s\r\n"
3629 "master_port:%d\r\n"
3630 "master_link_status:%s\r\n"
3631 "master_last_io_seconds_ago:%d\r\n"
3632 ,server.masterhost,
3633 server.masterport,
3634 (server.replstate == REDIS_REPL_CONNECTED) ?
3635 "up" : "down",
3636 (int)(time(NULL)-server.master->lastinteraction)
3637 );
3638 }
3639 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3640 addReplySds(c,info);
3641 addReply(c,shared.crlf);
3642 }
3643
3644 static void monitorCommand(redisClient *c) {
3645 /* ignore MONITOR if aleady slave or in monitor mode */
3646 if (c->flags & REDIS_SLAVE) return;
3647
3648 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3649 c->slaveseldb = 0;
3650 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3651 addReply(c,shared.ok);
3652 }
3653
3654 /* ================================= Expire ================================= */
3655 static int removeExpire(redisDb *db, robj *key) {
3656 if (dictDelete(db->expires,key) == DICT_OK) {
3657 return 1;
3658 } else {
3659 return 0;
3660 }
3661 }
3662
3663 static int setExpire(redisDb *db, robj *key, time_t when) {
3664 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3665 return 0;
3666 } else {
3667 incrRefCount(key);
3668 return 1;
3669 }
3670 }
3671
3672 /* Return the expire time of the specified key, or -1 if no expire
3673 * is associated with this key (i.e. the key is non volatile) */
3674 static time_t getExpire(redisDb *db, robj *key) {
3675 dictEntry *de;
3676
3677 /* No expire? return ASAP */
3678 if (dictSize(db->expires) == 0 ||
3679 (de = dictFind(db->expires,key)) == NULL) return -1;
3680
3681 return (time_t) dictGetEntryVal(de);
3682 }
3683
3684 static int expireIfNeeded(redisDb *db, robj *key) {
3685 time_t when;
3686 dictEntry *de;
3687
3688 /* No expire? return ASAP */
3689 if (dictSize(db->expires) == 0 ||
3690 (de = dictFind(db->expires,key)) == NULL) return 0;
3691
3692 /* Lookup the expire */
3693 when = (time_t) dictGetEntryVal(de);
3694 if (time(NULL) <= when) return 0;
3695
3696 /* Delete the key */
3697 dictDelete(db->expires,key);
3698 return dictDelete(db->dict,key) == DICT_OK;
3699 }
3700
3701 static int deleteIfVolatile(redisDb *db, robj *key) {
3702 dictEntry *de;
3703
3704 /* No expire? return ASAP */
3705 if (dictSize(db->expires) == 0 ||
3706 (de = dictFind(db->expires,key)) == NULL) return 0;
3707
3708 /* Delete the key */
3709 server.dirty++;
3710 dictDelete(db->expires,key);
3711 return dictDelete(db->dict,key) == DICT_OK;
3712 }
3713
3714 static void expireCommand(redisClient *c) {
3715 dictEntry *de;
3716 int seconds = atoi(c->argv[2]->ptr);
3717
3718 de = dictFind(c->db->dict,c->argv[1]);
3719 if (de == NULL) {
3720 addReply(c,shared.czero);
3721 return;
3722 }
3723 if (seconds <= 0) {
3724 addReply(c, shared.czero);
3725 return;
3726 } else {
3727 time_t when = time(NULL)+seconds;
3728 if (setExpire(c->db,c->argv[1],when))
3729 addReply(c,shared.cone);
3730 else
3731 addReply(c,shared.czero);
3732 return;
3733 }
3734 }
3735
3736 static void ttlCommand(redisClient *c) {
3737 time_t expire;
3738 int ttl = -1;
3739
3740 expire = getExpire(c->db,c->argv[1]);
3741 if (expire != -1) {
3742 ttl = (int) (expire-time(NULL));
3743 if (ttl < 0) ttl = -1;
3744 }
3745 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3746 }
3747
3748 /* =============================== Replication ============================= */
3749
3750 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3751 ssize_t nwritten, ret = size;
3752 time_t start = time(NULL);
3753
3754 timeout++;
3755 while(size) {
3756 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3757 nwritten = write(fd,ptr,size);
3758 if (nwritten == -1) return -1;
3759 ptr += nwritten;
3760 size -= nwritten;
3761 }
3762 if ((time(NULL)-start) > timeout) {
3763 errno = ETIMEDOUT;
3764 return -1;
3765 }
3766 }
3767 return ret;
3768 }
3769
3770 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3771 ssize_t nread, totread = 0;
3772 time_t start = time(NULL);
3773
3774 timeout++;
3775 while(size) {
3776 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3777 nread = read(fd,ptr,size);
3778 if (nread == -1) return -1;
3779 ptr += nread;
3780 size -= nread;
3781 totread += nread;
3782 }
3783 if ((time(NULL)-start) > timeout) {
3784 errno = ETIMEDOUT;
3785 return -1;
3786 }
3787 }
3788 return totread;
3789 }
3790
/* Reads a line from 'fd' into the buffer 'ptr' of 'size' bytes, one byte
 * at a time through syncRead() using 'timeout' for every byte.
 *
 * The line ends at the first '\n', which is not stored; a '\r' right
 * before it is replaced by the terminator. The buffer is always null
 * terminated and never receives more than 'size'-1 characters: reading
 * stops when the buffer is full even if no newline was seen.
 *
 * Returns the number of characters read before the '\n' (a trailing '\r'
 * is included in the count), 0 when 'size' leaves no room for data, or -1
 * when syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0; /* not even room for the terminator */
    *ptr = '\0';
    size--; /* reserve one byte for the terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* Account for the stored byte: without this the loop is
             * unbounded and a long line overflows the buffer. */
            size--;
        }
    }
    return nread;
}
3811
3812 static void syncCommand(redisClient *c) {
3813 /* ignore SYNC if aleady slave or in monitor mode */
3814 if (c->flags & REDIS_SLAVE) return;
3815
3816 /* SYNC can't be issued when the server has pending data to send to
3817 * the client about already issued commands. We need a fresh reply
3818 * buffer registering the differences between the BGSAVE and the current
3819 * dataset, so that we can copy to other slaves if needed. */
3820 if (listLength(c->reply) != 0) {
3821 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3822 return;
3823 }
3824
3825 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3826 /* Here we need to check if there is a background saving operation
3827 * in progress, or if it is required to start one */
3828 if (server.bgsaveinprogress) {
3829 /* Ok a background save is in progress. Let's check if it is a good
3830 * one for replication, i.e. if there is another slave that is
3831 * registering differences since the server forked to save */
3832 redisClient *slave;
3833 listNode *ln;
3834
3835 listRewind(server.slaves);
3836 while((ln = listYield(server.slaves))) {
3837 slave = ln->value;
3838 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3839 }
3840 if (ln) {
3841 /* Perfect, the server is already registering differences for
3842 * another slave. Set the right state, and copy the buffer. */
3843 listRelease(c->reply);
3844 c->reply = listDup(slave->reply);
3845 if (!c->reply) oom("listDup copying slave reply list");
3846 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3847 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3848 } else {
3849 /* No way, we need to wait for the next BGSAVE in order to
3850 * register differences */
3851 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3852 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3853 }
3854 } else {
3855 /* Ok we don't have a BGSAVE in progress, let's start one */
3856 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3857 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3858 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3859 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3860 return;
3861 }
3862 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3863 }
3864 c->repldbfd = -1;
3865 c->flags |= REDIS_SLAVE;
3866 c->slaveseldb = 0;
3867 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3868 return;
3869 }
3870
3871 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3872 redisClient *slave = privdata;
3873 REDIS_NOTUSED(el);
3874 REDIS_NOTUSED(mask);
3875 char buf[REDIS_IOBUF_LEN];
3876 ssize_t nwritten, buflen;
3877
3878 if (slave->repldboff == 0) {
3879 /* Write the bulk write count before to transfer the DB. In theory here
3880 * we don't know how much room there is in the output buffer of the
3881 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3882 * operations) will never be smaller than the few bytes we need. */
3883 sds bulkcount;
3884
3885 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3886 slave->repldbsize);
3887 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3888 {
3889 sdsfree(bulkcount);
3890 freeClient(slave);
3891 return;
3892 }
3893 sdsfree(bulkcount);
3894 }
3895 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3896 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3897 if (buflen <= 0) {
3898 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3899 (buflen == 0) ? "premature EOF" : strerror(errno));
3900 freeClient(slave);
3901 return;
3902 }
3903 if ((nwritten = write(fd,buf,buflen)) == -1) {
3904 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3905 strerror(errno));
3906 freeClient(slave);
3907 return;
3908 }
3909 slave->repldboff += nwritten;
3910 if (slave->repldboff == slave->repldbsize) {
3911 close(slave->repldbfd);
3912 slave->repldbfd = -1;
3913 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3914 slave->replstate = REDIS_REPL_ONLINE;
3915 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3916 sendReplyToClient, slave, NULL) == AE_ERR) {
3917 freeClient(slave);
3918 return;
3919 }
3920 addReplySds(slave,sdsempty());
3921 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3922 }
3923 }
3924
3925 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3926 listNode *ln;
3927 int startbgsave = 0;
3928
3929 listRewind(server.slaves);
3930 while((ln = listYield(server.slaves))) {
3931 redisClient *slave = ln->value;
3932
3933 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3934 startbgsave = 1;
3935 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3936 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3937 struct redis_stat buf;
3938
3939 if (bgsaveerr != REDIS_OK) {
3940 freeClient(slave);
3941 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3942 continue;
3943 }
3944 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3945 redis_fstat(slave->repldbfd,&buf) == -1) {
3946 freeClient(slave);
3947 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3948 continue;
3949 }
3950 slave->repldboff = 0;
3951 slave->repldbsize = buf.st_size;
3952 slave->replstate = REDIS_REPL_SEND_BULK;
3953 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3954 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3955 freeClient(slave);
3956 continue;
3957 }
3958 }
3959 }
3960 if (startbgsave) {
3961 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3962 listRewind(server.slaves);
3963 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3964 while((ln = listYield(server.slaves))) {
3965 redisClient *slave = ln->value;
3966
3967 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3968 freeClient(slave);
3969 }
3970 }
3971 }
3972 }
3973
3974 static int syncWithMaster(void) {
3975 char buf[1024], tmpfile[256];
3976 int dumpsize;
3977 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3978 int dfd;
3979
3980 if (fd == -1) {
3981 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3982 strerror(errno));
3983 return REDIS_ERR;
3984 }
3985 /* Issue the SYNC command */
3986 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3987 close(fd);
3988 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3989 strerror(errno));
3990 return REDIS_ERR;
3991 }
3992 /* Read the bulk write count */
3993 if (syncReadLine(fd,buf,1024,3600) == -1) {
3994 close(fd);
3995 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3996 strerror(errno));
3997 return REDIS_ERR;
3998 }
3999 dumpsize = atoi(buf+1);
4000 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4001 /* Read the bulk write data on a temp file */
4002 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4003 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4004 if (dfd == -1) {
4005 close(fd);
4006 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4007 return REDIS_ERR;
4008 }
4009 while(dumpsize) {
4010 int nread, nwritten;
4011
4012 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4013 if (nread == -1) {
4014 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4015 strerror(errno));
4016 close(fd);
4017 close(dfd);
4018 return REDIS_ERR;
4019 }
4020 nwritten = write(dfd,buf,nread);
4021 if (nwritten == -1) {
4022 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4023 close(fd);
4024 close(dfd);
4025 return REDIS_ERR;
4026 }
4027 dumpsize -= nread;
4028 }
4029 close(dfd);
4030 if (rename(tmpfile,server.dbfilename) == -1) {
4031 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4032 unlink(tmpfile);
4033 close(fd);
4034 return REDIS_ERR;
4035 }
4036 emptyDb();
4037 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4038 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4039 close(fd);
4040 return REDIS_ERR;
4041 }
4042 server.master = createClient(fd);
4043 server.master->flags |= REDIS_MASTER;
4044 server.replstate = REDIS_REPL_CONNECTED;
4045 return REDIS_OK;
4046 }
4047
4048 static void slaveofCommand(redisClient *c) {
4049 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4050 !strcasecmp(c->argv[2]->ptr,"one")) {
4051 if (server.masterhost) {
4052 sdsfree(server.masterhost);
4053 server.masterhost = NULL;
4054 if (server.master) freeClient(server.master);
4055 server.replstate = REDIS_REPL_NONE;
4056 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4057 }
4058 } else {
4059 sdsfree(server.masterhost);
4060 server.masterhost = sdsdup(c->argv[1]->ptr);
4061 server.masterport = atoi(c->argv[2]->ptr);
4062 if (server.master) freeClient(server.master);
4063 server.replstate = REDIS_REPL_CONNECT;
4064 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4065 server.masterhost, server.masterport);
4066 }
4067 addReply(c,shared.ok);
4068 }
4069
4070 /* ============================ Maxmemory directive ======================== */
4071
4072 /* This function gets called when 'maxmemory' is set on the config file to limit
4073 * the max memory used by the server, and we are out of memory.
4074 * This function will try to, in order:
4075 *
4076 * - Free objects from the free list
4077 * - Try to remove keys with an EXPIRE set
4078 *
4079 * It is not possible to free enough memory to reach used-memory < maxmemory
4080 * the server will start refusing commands that will enlarge even more the
4081 * memory usage.
4082 */
4083 static void freeMemoryIfNeeded(void) {
4084 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4085 if (listLength(server.objfreelist)) {
4086 robj *o;
4087
4088 listNode *head = listFirst(server.objfreelist);
4089 o = listNodeValue(head);
4090 listDelNode(server.objfreelist,head);
4091 zfree(o);
4092 } else {
4093 int j, k, freed = 0;
4094
4095 for (j = 0; j < server.dbnum; j++) {
4096 int minttl = -1;
4097 robj *minkey = NULL;
4098 struct dictEntry *de;
4099
4100 if (dictSize(server.db[j].expires)) {
4101 freed = 1;
4102 /* From a sample of three keys drop the one nearest to
4103 * the natural expire */
4104 for (k = 0; k < 3; k++) {
4105 time_t t;
4106
4107 de = dictGetRandomKey(server.db[j].expires);
4108 t = (time_t) dictGetEntryVal(de);
4109 if (minttl == -1 || t < minttl) {
4110 minkey = dictGetEntryKey(de);
4111 minttl = t;
4112 }
4113 }
4114 deleteKey(server.db+j,minkey);
4115 }
4116 }
4117 if (!freed) return; /* nothing to free... */
4118 }
4119 }
4120 }
4121
4122 /* ================================= Debugging ============================== */
4123
4124 static void debugCommand(redisClient *c) {
4125 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4126 *((char*)-1) = 'x';
4127 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4128 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4129 robj *key, *val;
4130
4131 if (!de) {
4132 addReply(c,shared.nokeyerr);
4133 return;
4134 }
4135 key = dictGetEntryKey(de);
4136 val = dictGetEntryVal(de);
4137 addReplySds(c,sdscatprintf(sdsempty(),
4138 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4139 key, key->refcount, val, val->refcount));
4140 } else {
4141 addReplySds(c,sdsnew(
4142 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4143 }
4144 }
4145
4146 #ifdef HAVE_BACKTRACE
4147 static struct redisFunctionSym symsTable[] = {
4148 {"freeStringObject", (unsigned long)freeStringObject},
4149 {"freeListObject", (unsigned long)freeListObject},
4150 {"freeSetObject", (unsigned long)freeSetObject},
4151 {"decrRefCount", (unsigned long)decrRefCount},
4152 {"createObject", (unsigned long)createObject},
4153 {"freeClient", (unsigned long)freeClient},
4154 {"rdbLoad", (unsigned long)rdbLoad},
4155 {"addReply", (unsigned long)addReply},
4156 {"addReplySds", (unsigned long)addReplySds},
4157 {"incrRefCount", (unsigned long)incrRefCount},
4158 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4159 {"createStringObject", (unsigned long)createStringObject},
4160 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4161 {"syncWithMaster", (unsigned long)syncWithMaster},
4162 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4163 {"removeExpire", (unsigned long)removeExpire},
4164 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4165 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4166 {"deleteKey", (unsigned long)deleteKey},
4167 {"getExpire", (unsigned long)getExpire},
4168 {"setExpire", (unsigned long)setExpire},
4169 {"updateSalvesWaitingBgsave", (unsigned long)updateSalvesWaitingBgsave},
4170 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4171 {"authCommand", (unsigned long)authCommand},
4172 {"pingCommand", (unsigned long)pingCommand},
4173 {"echoCommand", (unsigned long)echoCommand},
4174 {"setCommand", (unsigned long)setCommand},
4175 {"setnxCommand", (unsigned long)setnxCommand},
4176 {"getCommand", (unsigned long)getCommand},
4177 {"delCommand", (unsigned long)delCommand},
4178 {"existsCommand", (unsigned long)existsCommand},
4179 {"incrCommand", (unsigned long)incrCommand},
4180 {"decrCommand", (unsigned long)decrCommand},
4181 {"incrbyCommand", (unsigned long)incrbyCommand},
4182 {"decrbyCommand", (unsigned long)decrbyCommand},
4183 {"selectCommand", (unsigned long)selectCommand},
4184 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4185 {"keysCommand", (unsigned long)keysCommand},
4186 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4187 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4188 {"saveCommand", (unsigned long)saveCommand},
4189 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4190 {"shutdownCommand", (unsigned long)shutdownCommand},
4191 {"moveCommand", (unsigned long)moveCommand},
4192 {"renameCommand", (unsigned long)renameCommand},
4193 {"renamenxCommand", (unsigned long)renamenxCommand},
4194 {"lpushCommand", (unsigned long)lpushCommand},
4195 {"rpushCommand", (unsigned long)rpushCommand},
4196 {"lpopCommand", (unsigned long)lpopCommand},
4197 {"rpopCommand", (unsigned long)rpopCommand},
4198 {"llenCommand", (unsigned long)llenCommand},
4199 {"lindexCommand", (unsigned long)lindexCommand},
4200 {"lrangeCommand", (unsigned long)lrangeCommand},
4201 {"ltrimCommand", (unsigned long)ltrimCommand},
4202 {"typeCommand", (unsigned long)typeCommand},
4203 {"lsetCommand", (unsigned long)lsetCommand},
4204 {"saddCommand", (unsigned long)saddCommand},
4205 {"sremCommand", (unsigned long)sremCommand},
4206 {"smoveCommand", (unsigned long)smoveCommand},
4207 {"sismemberCommand", (unsigned long)sismemberCommand},
4208 {"scardCommand", (unsigned long)scardCommand},
4209 {"spopCommand", (unsigned long)spopCommand},
4210 {"sinterCommand", (unsigned long)sinterCommand},
4211 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4212 {"sunionCommand", (unsigned long)sunionCommand},
4213 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4214 {"sdiffCommand", (unsigned long)sdiffCommand},
4215 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4216 {"syncCommand", (unsigned long)syncCommand},
4217 {"flushdbCommand", (unsigned long)flushdbCommand},
4218 {"flushallCommand", (unsigned long)flushallCommand},
4219 {"sortCommand", (unsigned long)sortCommand},
4220 {"lremCommand", (unsigned long)lremCommand},
4221 {"infoCommand", (unsigned long)infoCommand},
4222 {"mgetCommand", (unsigned long)mgetCommand},
4223 {"monitorCommand", (unsigned long)monitorCommand},
4224 {"expireCommand", (unsigned long)expireCommand},
4225 {"getSetCommand", (unsigned long)getSetCommand},
4226 {"ttlCommand", (unsigned long)ttlCommand},
4227 {"slaveofCommand", (unsigned long)slaveofCommand},
4228 {"debugCommand", (unsigned long)debugCommand},
4229 {"processCommand", (unsigned long)processCommand},
4230 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4231 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4232 {NULL,0}
4233 };
4234
4235 /* This function try to convert a pointer into a function name. It's used in
4236 * oreder to provide a backtrace under segmentation fault that's able to
4237 * display functions declared as static (otherwise the backtrace is useless). */
4238 static char *findFuncName(void *pointer, unsigned long *offset){
4239 int i, ret = -1;
4240 unsigned long off, minoff = 0;
4241
4242 /* Try to match against the Symbol with the smallest offset */
4243 for (i=0; symsTable[i].pointer; i++) {
4244 unsigned long lp = (unsigned long) pointer;
4245
4246 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4247 off=lp-symsTable[i].pointer;
4248 if (ret < 0 || off < minoff) {
4249 minoff=off;
4250 ret=i;
4251 }
4252 }
4253 }
4254 if (ret == -1) return NULL;
4255 *offset = minoff;
4256 return symsTable[ret].name;
4257 }
4258
4259 static void *getMcontextEip(ucontext_t *uc) {
4260 #if defined(__FreeBSD__)
4261 return (void*) uc->uc_mcontext.mc_eip;
4262 #elif defined(__dietlibc__)
4263 return (void*) uc->uc_mcontext.eip;
4264 #elif defined(__APPLE__)
4265 return (void*) uc->uc_mcontext->__ss.__eip;
4266 #else /* Linux */
4267 return (void*) uc->uc_mcontext.gregs[REG_EIP];
4268 #endif
4269 }
4270
4271 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4272 void *trace[100];
4273 char **messages = NULL;
4274 int i, trace_size = 0;
4275 unsigned long offset=0;
4276 time_t uptime = time(NULL)-server.stat_starttime;
4277 ucontext_t *uc = (ucontext_t*) secret;
4278 REDIS_NOTUSED(info);
4279
4280 redisLog(REDIS_WARNING,
4281 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4282 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4283 "redis_version:%s; "
4284 "uptime_in_seconds:%d; "
4285 "connected_clients:%d; "
4286 "connected_slaves:%d; "
4287 "used_memory:%zu; "
4288 "changes_since_last_save:%lld; "
4289 "bgsave_in_progress:%d; "
4290 "last_save_time:%d; "
4291 "total_connections_received:%lld; "
4292 "total_commands_processed:%lld; "
4293 "role:%s;"
4294 ,REDIS_VERSION,
4295 uptime,
4296 listLength(server.clients)-listLength(server.slaves),
4297 listLength(server.slaves),
4298 server.usedmemory,
4299 server.dirty,
4300 server.bgsaveinprogress,
4301 server.lastsave,
4302 server.stat_numconnections,
4303 server.stat_numcommands,
4304 server.masterhost == NULL ? "master" : "slave"
4305 ));
4306
4307 trace_size = backtrace(trace, 100);
4308 /* overwrite sigaction with caller's address */
4309 trace[1] = getMcontextEip(uc);
4310 messages = backtrace_symbols(trace, trace_size);
4311
4312 for (i=1; i<trace_size; ++i) {
4313 char *fn = findFuncName(trace[i], &offset), *p;
4314
4315 p = strchr(messages[i],'+');
4316 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4317 redisLog(REDIS_WARNING,"%s", messages[i]);
4318 } else {
4319 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4320 }
4321 }
4322 free(messages);
4323 exit(0);
4324 }
4325
4326 static void setupSigSegvAction(void) {
4327 struct sigaction act;
4328
4329 sigemptyset (&act.sa_mask);
4330 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4331 * is used. Otherwise, sa_handler is used */
4332 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4333 act.sa_sigaction = segvHandler;
4334 sigaction (SIGSEGV, &act, NULL);
4335 sigaction (SIGBUS, &act, NULL);
4336 sigaction (SIGFPE, &act, NULL);
4337 sigaction (SIGILL, &act, NULL);
4338 sigaction (SIGBUS, &act, NULL);
4339 return;
4340 }
4341 #else /* HAVE_BACKTRACE */
/* Variant compiled when HAVE_BACKTRACE is not defined: nothing is
 * installed, the function exists only so that callers need no #ifdef. */
static void setupSigSegvAction(void) {
    return;
}
4344 #endif /* HAVE_BACKTRACE */
4345
4346 /* =================================== Main! ================================ */
4347
4348 #ifdef __linux__
/* Read the current overcommit policy from /proc/sys/vm/overcommit_memory.
 *
 * Returns the integer parsed (with atoi) from the first line of the file,
 * or -1 if the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    if (fgets(line,64,procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
4362
4363 void linuxOvercommitMemoryWarning(void) {
4364 if (linuxOvercommitMemoryValue() == 0) {
4365 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4366 }
4367 }
4368 #endif /* __linux__ */
4369
4370 static void daemonize(void) {
4371 int fd;
4372 FILE *fp;
4373
4374 if (fork() != 0) exit(0); /* parent exits */
4375 setsid(); /* create a new session */
4376
4377 /* Every output goes to /dev/null. If Redis is daemonized but
4378 * the 'logfile' is set to 'stdout' in the configuration file
4379 * it will not log at all. */
4380 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4381 dup2(fd, STDIN_FILENO);
4382 dup2(fd, STDOUT_FILENO);
4383 dup2(fd, STDERR_FILENO);
4384 if (fd > STDERR_FILENO) close(fd);
4385 }
4386 /* Try to write the pid file */
4387 fp = fopen(server.pidfile,"w");
4388 if (fp) {
4389 fprintf(fp,"%d\n",getpid());
4390 fclose(fp);
4391 }
4392 }
4393
4394 int main(int argc, char **argv) {
4395 #ifdef __linux__
4396 linuxOvercommitMemoryWarning();
4397 #endif
4398
4399 initServerConfig();
4400 if (argc == 2) {
4401 ResetServerSaveParams();
4402 loadServerConfig(argv[1]);
4403 } else if (argc > 2) {
4404 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4405 exit(1);
4406 } else {
4407 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4408 }
4409 initServer();
4410 if (server.daemonize) daemonize();
4411 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4412 if (rdbLoad(server.dbfilename) == REDIS_OK)
4413 redisLog(REDIS_NOTICE,"DB loaded from disk");
4414 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4415 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4416 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4417 aeMain(server.el);
4418 aeDeleteEventLoop(server.el);
4419 return 0;
4420 }