]> git.saurik.com Git - redis.git/blob - redis.c
some minor changes to the backtrace code
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #define __USE_POSIX199309
40 #include <signal.h>
41 #include <execinfo.h>
42 #include <ucontext.h>
43 #include <sys/wait.h>
44 #include <errno.h>
45 #include <assert.h>
46 #include <ctype.h>
47 #include <stdarg.h>
48 #include <inttypes.h>
49 #include <arpa/inet.h>
50 #include <sys/stat.h>
51 #include <fcntl.h>
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #include <limits.h>
55 #include <execinfo.h>
56
57 #include "redis.h"
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
66
67 #include "config.h"
68
/* Generic return codes used by internal functions */
#define REDIS_OK                0
#define REDIS_ERR               -1

/* Compile-time server defaults and limits */
#define REDIS_SERVERPORT        6379    /* TCP port */
#define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */
#define REDIS_IOBUF_LEN         1024
#define REDIS_LOADBUF_LEN       1024
#define REDIS_STATIC_ARGS       4
#define REDIS_DEFAULT_DBNUM     16
#define REDIS_CONFIGLINE_MAX    1024
#define REDIS_OBJFREELIST_MAX   1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)

/* Hash table parameters */
#define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */
#define REDIS_HT_MINSLOTS       16384   /* Never resize the HT under this */

/* Command flags */
#define REDIS_CMD_BULK          1       /* Bulk write command */
#define REDIS_CMD_INLINE        2       /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: every command marked with
 * this flag returns an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short, these commands are denied under low memory conditions. */
#define REDIS_CMD_DENYOOM       4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_HASH 3

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing a 32 bit length for
 * every short key would waste a lot of space, so the two most significant
 * bits of the first byte tell how to interpret the length:
 *
 * 00|000000 => the two MSB are 00: the len is the 6 low bits of this byte
 * 01|000000 00000000 => 01: the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => 10: a full 32 bit len will follow
 * 11|000000 => a specially encoded object will follow. The six low bits
 *              specify the kind of object that follows.
 *              See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When the length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3         /* string compressed with LZF */

/* Client flags */
#define REDIS_CLOSE 1       /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2       /* This client is a slave server */
#define REDIS_MASTER 4      /* This client is a master server */
#define REDIS_MONITOR 8     /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0       /* No active replication */
#define REDIS_REPL_CONNECT 1    /* Must connect to master */
#define REDIS_REPL_CONNECTED 2  /* Connected to master */

/* Slave replication state - from the point of view of the master.
 * Note that in the SEND_BULK and ONLINE states the slave receives new
 * updates in its output queue. In the WAIT_BGSAVE states instead the server
 * is waiting to start the next background saving in order to send updates
 * to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels (also used as an index into the level marker string in
 * redisLog()) */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Silences "unused parameter" compiler warnings */
#define REDIS_NOTUSED(V) ((void) V)
177
178
179 /*================================= Data types ============================== */
180
/* A Redis object: a reference counted container able to hold a
 * string / list / set. */
typedef struct redisObject {
    void *ptr;      /* payload; its concrete type depends on 'type' */
    int type;       /* presumably one of the REDIS_STRING..REDIS_HASH defines */
    int refcount;   /* reference count, see incrRefCount()/decrRefCount() */
} robj;
187
188 typedef struct redisDb {
189 dict *dict;
190 dict *expires;
191 int id;
192 } redisDb;
193
194 /* With multiplexing we need to take per-clinet state.
195 * Clients are taken in a liked list. */
196 typedef struct redisClient {
197 int fd;
198 redisDb *db;
199 int dictid;
200 sds querybuf;
201 robj **argv;
202 int argc;
203 int bulklen; /* bulk read len. -1 if not in bulk read mode */
204 list *reply;
205 int sentlen;
206 time_t lastinteraction; /* time of the last interaction, used for timeout */
207 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
208 int slaveseldb; /* slave selected db, if this client is a slave */
209 int authenticated; /* when requirepass is non-NULL */
210 int replstate; /* replication state if this is a slave */
211 int repldbfd; /* replication DB file descriptor */
212 long repldboff; /* replication DB file offset */
213 off_t repldbsize; /* replication DB file size */
214 } redisClient;
215
/* One automatic save point. serverCron() starts a background save when at
 * least 'changes' modifications are pending and more than 'seconds' have
 * elapsed since the last successful save. */
struct saveparam {
    time_t seconds;
    int changes;
};
220
221 /* Global server state structure */
222 struct redisServer {
223 int port;
224 int fd;
225 redisDb *db;
226 dict *sharingpool;
227 unsigned int sharingpoolsize;
228 long long dirty; /* changes to DB from the last save */
229 list *clients;
230 list *slaves, *monitors;
231 char neterr[ANET_ERR_LEN];
232 aeEventLoop *el;
233 int cronloops; /* number of times the cron function run */
234 list *objfreelist; /* A list of freed objects to avoid malloc() */
235 time_t lastsave; /* Unix time of last save succeeede */
236 size_t usedmemory; /* Used memory in megabytes */
237 /* Fields used only for stats */
238 time_t stat_starttime; /* server start time */
239 long long stat_numcommands; /* number of processed commands */
240 long long stat_numconnections; /* number of connections received */
241 /* Configuration */
242 int verbosity;
243 int glueoutputbuf;
244 int maxidletime;
245 int dbnum;
246 int daemonize;
247 char *pidfile;
248 int bgsaveinprogress;
249 pid_t bgsavechildpid;
250 struct saveparam *saveparams;
251 int saveparamslen;
252 char *logfile;
253 char *bindaddr;
254 char *dbfilename;
255 char *requirepass;
256 int shareobjects;
257 /* Replication related */
258 int isslave;
259 char *masterhost;
260 int masterport;
261 redisClient *master; /* client that is master for this slave */
262 int replstate;
263 unsigned int maxclients;
264 unsigned int maxmemory;
265 /* Sort parameters - qsort_r() is only available under BSD so we
266 * have to take this state global, in order to pass it to sortCompare() */
267 int sort_desc;
268 int sort_alpha;
269 int sort_bypattern;
270 };
271
272 typedef void redisCommandProc(redisClient *c);
273 struct redisCommand {
274 char *name;
275 redisCommandProc *proc;
276 int arity;
277 int flags;
278 };
279
/* Pairs a function name with an address stored as an integer.
 * NOTE(review): presumably used to symbolize backtraces - confirm at the
 * point of use, which is outside this chunk. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
284
285 typedef struct _redisSortObject {
286 robj *obj;
287 union {
288 double score;
289 robj *cmpobj;
290 } u;
291 } redisSortObject;
292
293 typedef struct _redisSortOperation {
294 int type;
295 robj *pattern;
296 } redisSortOperation;
297
298 struct sharedObjectsStruct {
299 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
300 *colon, *nullbulk, *nullmultibulk,
301 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
302 *outofrangeerr, *plus,
303 *select0, *select1, *select2, *select3, *select4,
304 *select5, *select6, *select7, *select8, *select9;
305 } shared;
306
307 /*================================ Prototypes =============================== */
308
309 static void freeStringObject(robj *o);
310 static void freeListObject(robj *o);
311 static void freeSetObject(robj *o);
312 static void decrRefCount(void *o);
313 static robj *createObject(int type, void *ptr);
314 static void freeClient(redisClient *c);
315 static int rdbLoad(char *filename);
316 static void addReply(redisClient *c, robj *obj);
317 static void addReplySds(redisClient *c, sds s);
318 static void incrRefCount(robj *o);
319 static int rdbSaveBackground(char *filename);
320 static robj *createStringObject(char *ptr, size_t len);
321 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
322 static int syncWithMaster(void);
323 static robj *tryObjectSharing(robj *o);
324 static int removeExpire(redisDb *db, robj *key);
325 static int expireIfNeeded(redisDb *db, robj *key);
326 static int deleteIfVolatile(redisDb *db, robj *key);
327 static int deleteKey(redisDb *db, robj *key);
328 static time_t getExpire(redisDb *db, robj *key);
329 static int setExpire(redisDb *db, robj *key, time_t when);
330 static void updateSalvesWaitingBgsave(int bgsaveerr);
331 static void freeMemoryIfNeeded(void);
332 static int processCommand(redisClient *c);
333 static void setupSigSegvAction(void);
334
335 static void authCommand(redisClient *c);
336 static void pingCommand(redisClient *c);
337 static void echoCommand(redisClient *c);
338 static void setCommand(redisClient *c);
339 static void setnxCommand(redisClient *c);
340 static void getCommand(redisClient *c);
341 static void delCommand(redisClient *c);
342 static void existsCommand(redisClient *c);
343 static void incrCommand(redisClient *c);
344 static void decrCommand(redisClient *c);
345 static void incrbyCommand(redisClient *c);
346 static void decrbyCommand(redisClient *c);
347 static void selectCommand(redisClient *c);
348 static void randomkeyCommand(redisClient *c);
349 static void keysCommand(redisClient *c);
350 static void dbsizeCommand(redisClient *c);
351 static void lastsaveCommand(redisClient *c);
352 static void saveCommand(redisClient *c);
353 static void bgsaveCommand(redisClient *c);
354 static void shutdownCommand(redisClient *c);
355 static void moveCommand(redisClient *c);
356 static void renameCommand(redisClient *c);
357 static void renamenxCommand(redisClient *c);
358 static void lpushCommand(redisClient *c);
359 static void rpushCommand(redisClient *c);
360 static void lpopCommand(redisClient *c);
361 static void rpopCommand(redisClient *c);
362 static void llenCommand(redisClient *c);
363 static void lindexCommand(redisClient *c);
364 static void lrangeCommand(redisClient *c);
365 static void ltrimCommand(redisClient *c);
366 static void typeCommand(redisClient *c);
367 static void lsetCommand(redisClient *c);
368 static void saddCommand(redisClient *c);
369 static void sremCommand(redisClient *c);
370 static void smoveCommand(redisClient *c);
371 static void sismemberCommand(redisClient *c);
372 static void scardCommand(redisClient *c);
373 static void sinterCommand(redisClient *c);
374 static void sinterstoreCommand(redisClient *c);
375 static void sunionCommand(redisClient *c);
376 static void sunionstoreCommand(redisClient *c);
377 static void sdiffCommand(redisClient *c);
378 static void sdiffstoreCommand(redisClient *c);
379 static void syncCommand(redisClient *c);
380 static void flushdbCommand(redisClient *c);
381 static void flushallCommand(redisClient *c);
382 static void sortCommand(redisClient *c);
383 static void lremCommand(redisClient *c);
384 static void infoCommand(redisClient *c);
385 static void mgetCommand(redisClient *c);
386 static void monitorCommand(redisClient *c);
387 static void expireCommand(redisClient *c);
388 static void getSetCommand(redisClient *c);
389 static void ttlCommand(redisClient *c);
390 static void slaveofCommand(redisClient *c);
391 static void debugCommand(redisClient *c);
392 /*================================= Globals ================================= */
393
394 /* Global vars */
395 static struct redisServer server; /* server global state */
396 static struct redisCommand cmdTable[] = {
397 {"get",getCommand,2,REDIS_CMD_INLINE},
398 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
399 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
400 {"del",delCommand,-2,REDIS_CMD_INLINE},
401 {"exists",existsCommand,2,REDIS_CMD_INLINE},
402 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
403 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
404 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
405 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
406 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
407 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
408 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
409 {"llen",llenCommand,2,REDIS_CMD_INLINE},
410 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
411 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
412 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
413 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
414 {"lrem",lremCommand,4,REDIS_CMD_BULK},
415 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
416 {"srem",sremCommand,3,REDIS_CMD_BULK},
417 {"smove",smoveCommand,4,REDIS_CMD_BULK},
418 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
419 {"scard",scardCommand,2,REDIS_CMD_INLINE},
420 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
421 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
422 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
423 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
424 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
425 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
426 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
427 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
428 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
429 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
430 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
431 {"select",selectCommand,2,REDIS_CMD_INLINE},
432 {"move",moveCommand,3,REDIS_CMD_INLINE},
433 {"rename",renameCommand,3,REDIS_CMD_INLINE},
434 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
435 {"expire",expireCommand,3,REDIS_CMD_INLINE},
436 {"keys",keysCommand,2,REDIS_CMD_INLINE},
437 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
438 {"auth",authCommand,2,REDIS_CMD_INLINE},
439 {"ping",pingCommand,1,REDIS_CMD_INLINE},
440 {"echo",echoCommand,2,REDIS_CMD_BULK},
441 {"save",saveCommand,1,REDIS_CMD_INLINE},
442 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
443 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
444 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
445 {"type",typeCommand,2,REDIS_CMD_INLINE},
446 {"sync",syncCommand,1,REDIS_CMD_INLINE},
447 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
448 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
449 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
450 {"info",infoCommand,1,REDIS_CMD_INLINE},
451 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
452 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
453 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
454 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
455 {NULL,NULL,0,0}
456 };
457 /*============================ Utility functions ============================ */
458
/* Glob-style pattern matching over length-delimited buffers.
 *
 * Supported syntax:
 *   *        matches any sequence of characters, including the empty one
 *   ?        matches exactly one character
 *   [abc]    matches one character out of the listed ones
 *   [^abc]   matches one character NOT in the list
 *   [a-z]    matches one character in the (inclusive) range
 *   \x       matches the character x literally
 *
 * 'pattern' / 'string' are NOT required to be NUL terminated: only the
 * first 'patternLen' / 'stringLen' bytes are ever inspected. Both lengths
 * must be >= 0. When 'nocase' is non-zero, literals and ranges are compared
 * after tolower(); escaped characters inside [...] are always compared
 * exactly.
 *
 * Returns 1 when the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    /* Every construct except '*' consumes one string character, so the
     * loop only runs while there is still input on both sides. */
    while (patternLen && stringLen) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*', never looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            /* Try to match the rest of the pattern at every offset. */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * pattern++ after the switch lands on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    /* Escaped member, compared exactly. A backslash that is
                     * the very last pattern byte is treated as a literal. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        /* <ctype.h> functions require a value representable
                         * as unsigned char (or EOF). */
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
    }

    /* Once the string is exhausted only a run of '*' may still match. */
    if (stringLen == 0) {
        while (patternLen && pattern[0] == '*') {
            pattern++;
            patternLen--;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
581
582 static void redisLog(int level, const char *fmt, ...) {
583 va_list ap;
584 FILE *fp;
585
586 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
587 if (!fp) return;
588
589 va_start(ap, fmt);
590 if (level >= server.verbosity) {
591 char *c = ".-*";
592 char buf[64];
593 time_t now;
594
595 now = time(NULL);
596 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
597 fprintf(fp,"%s %c ",buf,c[level]);
598 vfprintf(fp, fmt, ap);
599 fprintf(fp,"\n");
600 fflush(fp);
601 }
602 va_end(ap);
603
604 if (server.logfile) fclose(fp);
605 }
606
607 /*====================== Hash table type implementation ==================== */
608
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */
612
613 static int sdsDictKeyCompare(void *privdata, const void *key1,
614 const void *key2)
615 {
616 int l1,l2;
617 DICT_NOTUSED(privdata);
618
619 l1 = sdslen((sds)key1);
620 l2 = sdslen((sds)key2);
621 if (l1 != l2) return 0;
622 return memcmp(key1, key2, l1) == 0;
623 }
624
/* Dict destructor callback: drops one reference from the stored object
 * by calling decrRefCount() on it. 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
631
632 static int dictSdsKeyCompare(void *privdata, const void *key1,
633 const void *key2)
634 {
635 const robj *o1 = key1, *o2 = key2;
636 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
637 }
638
639 static unsigned int dictSdsHash(const void *key) {
640 const robj *o = key;
641 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
642 }
643
644 static dictType setDictType = {
645 dictSdsHash, /* hash function */
646 NULL, /* key dup */
647 NULL, /* val dup */
648 dictSdsKeyCompare, /* key compare */
649 dictRedisObjectDestructor, /* key destructor */
650 NULL /* val destructor */
651 };
652
653 static dictType hashDictType = {
654 dictSdsHash, /* hash function */
655 NULL, /* key dup */
656 NULL, /* val dup */
657 dictSdsKeyCompare, /* key compare */
658 dictRedisObjectDestructor, /* key destructor */
659 dictRedisObjectDestructor /* val destructor */
660 };
661
662 /* ========================= Random utility functions ======================= */
663
/* Out of memory handler.
 *
 * Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not even clear whether the
 * condition could be reported to the client, since the networking layer
 * itself relies on heap allocated send buffers. So the policy is simply to
 * print '<msg>: Out of memory' on stderr, wait one second and abort().
 * This function never returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);
    sleep(1);
    abort();
}
675
676 /* ====================== Redis server networking stuff ===================== */
677 static void closeTimedoutClients(void) {
678 redisClient *c;
679 listNode *ln;
680 time_t now = time(NULL);
681
682 listRewind(server.clients);
683 while ((ln = listYield(server.clients)) != NULL) {
684 c = listNodeValue(ln);
685 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
686 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
687 (now - c->lastinteraction > server.maxidletime)) {
688 redisLog(REDIS_DEBUG,"Closing idle client");
689 freeClient(c);
690 }
691 }
692 }
693
694 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
695 * we resize the hash table to save memory */
696 static void tryResizeHashTables(void) {
697 int j;
698
699 for (j = 0; j < server.dbnum; j++) {
700 long long size, used;
701
702 size = dictSlots(server.db[j].dict);
703 used = dictSize(server.db[j].dict);
704 if (size && used && size > REDIS_HT_MINSLOTS &&
705 (used*100/size < REDIS_HT_MINFILL)) {
706 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
707 dictResize(server.db[j].dict);
708 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
709 }
710 }
711 }
712
713 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
714 int j, loops = server.cronloops++;
715 REDIS_NOTUSED(eventLoop);
716 REDIS_NOTUSED(id);
717 REDIS_NOTUSED(clientData);
718
719 /* Update the global state with the amount of used memory */
720 server.usedmemory = zmalloc_used_memory();
721
722 /* Show some info about non-empty databases */
723 for (j = 0; j < server.dbnum; j++) {
724 long long size, used, vkeys;
725
726 size = dictSlots(server.db[j].dict);
727 used = dictSize(server.db[j].dict);
728 vkeys = dictSize(server.db[j].expires);
729 if (!(loops % 5) && used > 0) {
730 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
731 /* dictPrintStats(server.dict); */
732 }
733 }
734
735 /* We don't want to resize the hash tables while a bacground saving
736 * is in progress: the saving child is created using fork() that is
737 * implemented with a copy-on-write semantic in most modern systems, so
738 * if we resize the HT while there is the saving child at work actually
739 * a lot of memory movements in the parent will cause a lot of pages
740 * copied. */
741 if (!server.bgsaveinprogress) tryResizeHashTables();
742
743 /* Show information about connected clients */
744 if (!(loops % 5)) {
745 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
746 listLength(server.clients)-listLength(server.slaves),
747 listLength(server.slaves),
748 server.usedmemory,
749 dictSize(server.sharingpool));
750 }
751
752 /* Close connections of timedout clients */
753 if (server.maxidletime && !(loops % 10))
754 closeTimedoutClients();
755
756 /* Check if a background saving in progress terminated */
757 if (server.bgsaveinprogress) {
758 int statloc;
759 /* XXX: TODO handle the case of the saving child killed */
760 if (wait4(-1,&statloc,WNOHANG,NULL)) {
761 int exitcode = WEXITSTATUS(statloc);
762 int bysignal = WIFSIGNALED(statloc);
763
764 if (!bysignal && exitcode == 0) {
765 redisLog(REDIS_NOTICE,
766 "Background saving terminated with success");
767 server.dirty = 0;
768 server.lastsave = time(NULL);
769 } else if (!bysignal && exitcode != 0) {
770 redisLog(REDIS_WARNING, "Background saving error");
771 } else {
772 redisLog(REDIS_WARNING,
773 "Background saving terminated by signal");
774 }
775 server.bgsaveinprogress = 0;
776 server.bgsavechildpid = -1;
777 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
778 }
779 } else {
780 /* If there is not a background saving in progress check if
781 * we have to save now */
782 time_t now = time(NULL);
783 for (j = 0; j < server.saveparamslen; j++) {
784 struct saveparam *sp = server.saveparams+j;
785
786 if (server.dirty >= sp->changes &&
787 now-server.lastsave > sp->seconds) {
788 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
789 sp->changes, sp->seconds);
790 rdbSaveBackground(server.dbfilename);
791 break;
792 }
793 }
794 }
795
796 /* Try to expire a few timed out keys */
797 for (j = 0; j < server.dbnum; j++) {
798 redisDb *db = server.db+j;
799 int num = dictSize(db->expires);
800
801 if (num) {
802 time_t now = time(NULL);
803
804 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
805 num = REDIS_EXPIRELOOKUPS_PER_CRON;
806 while (num--) {
807 dictEntry *de;
808 time_t t;
809
810 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
811 t = (time_t) dictGetEntryVal(de);
812 if (now > t) {
813 deleteKey(db,dictGetEntryKey(de));
814 }
815 }
816 }
817 }
818
819 /* Check if we should connect to a MASTER */
820 if (server.replstate == REDIS_REPL_CONNECT) {
821 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
822 if (syncWithMaster() == REDIS_OK) {
823 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
824 }
825 }
826 return 1000;
827 }
828
829 static void createSharedObjects(void) {
830 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
831 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
832 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
833 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
834 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
835 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
836 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
837 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
838 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
839 /* no such key */
840 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
841 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
842 "-ERR Operation against a key holding the wrong kind of value\r\n"));
843 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
844 "-ERR no such key\r\n"));
845 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
846 "-ERR syntax error\r\n"));
847 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
848 "-ERR source and destination objects are the same\r\n"));
849 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
850 "-ERR index out of range\r\n"));
851 shared.space = createObject(REDIS_STRING,sdsnew(" "));
852 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
853 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
854 shared.select0 = createStringObject("select 0\r\n",10);
855 shared.select1 = createStringObject("select 1\r\n",10);
856 shared.select2 = createStringObject("select 2\r\n",10);
857 shared.select3 = createStringObject("select 3\r\n",10);
858 shared.select4 = createStringObject("select 4\r\n",10);
859 shared.select5 = createStringObject("select 5\r\n",10);
860 shared.select6 = createStringObject("select 6\r\n",10);
861 shared.select7 = createStringObject("select 7\r\n",10);
862 shared.select8 = createStringObject("select 8\r\n",10);
863 shared.select9 = createStringObject("select 9\r\n",10);
864 }
865
866 static void appendServerSaveParams(time_t seconds, int changes) {
867 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
868 if (server.saveparams == NULL) oom("appendServerSaveParams");
869 server.saveparams[server.saveparamslen].seconds = seconds;
870 server.saveparams[server.saveparamslen].changes = changes;
871 server.saveparamslen++;
872 }
873
874 static void ResetServerSaveParams() {
875 zfree(server.saveparams);
876 server.saveparams = NULL;
877 server.saveparamslen = 0;
878 }
879
880 static void initServerConfig() {
881 server.dbnum = REDIS_DEFAULT_DBNUM;
882 server.port = REDIS_SERVERPORT;
883 server.verbosity = REDIS_DEBUG;
884 server.maxidletime = REDIS_MAXIDLETIME;
885 server.saveparams = NULL;
886 server.logfile = NULL; /* NULL = log on standard output */
887 server.bindaddr = NULL;
888 server.glueoutputbuf = 1;
889 server.daemonize = 0;
890 server.pidfile = "/var/run/redis.pid";
891 server.dbfilename = "dump.rdb";
892 server.requirepass = NULL;
893 server.shareobjects = 0;
894 server.maxclients = 0;
895 server.maxmemory = 0;
896 ResetServerSaveParams();
897
898 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
899 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
900 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
901 /* Replication related */
902 server.isslave = 0;
903 server.masterhost = NULL;
904 server.masterport = 6379;
905 server.master = NULL;
906 server.replstate = REDIS_REPL_NONE;
907 }
908
909 static void initServer() {
910 int j;
911
912 signal(SIGHUP, SIG_IGN);
913 signal(SIGPIPE, SIG_IGN);
914 setupSigSegvAction();
915
916 server.clients = listCreate();
917 server.slaves = listCreate();
918 server.monitors = listCreate();
919 server.objfreelist = listCreate();
920 createSharedObjects();
921 server.el = aeCreateEventLoop();
922 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
923 server.sharingpool = dictCreate(&setDictType,NULL);
924 server.sharingpoolsize = 1024;
925 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
926 oom("server initialization"); /* Fatal OOM */
927 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
928 if (server.fd == -1) {
929 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
930 exit(1);
931 }
932 for (j = 0; j < server.dbnum; j++) {
933 server.db[j].dict = dictCreate(&hashDictType,NULL);
934 server.db[j].expires = dictCreate(&setDictType,NULL);
935 server.db[j].id = j;
936 }
937 server.cronloops = 0;
938 server.bgsaveinprogress = 0;
939 server.bgsavechildpid = -1;
940 server.lastsave = time(NULL);
941 server.dirty = 0;
942 server.usedmemory = 0;
943 server.stat_numcommands = 0;
944 server.stat_numconnections = 0;
945 server.stat_starttime = time(NULL);
946 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
947 }
948
949 /* Empty the whole database */
950 static long long emptyDb() {
951 int j;
952 long long removed = 0;
953
954 for (j = 0; j < server.dbnum; j++) {
955 removed += dictSize(server.db[j].dict);
956 dictEmpty(server.db[j].dict);
957 dictEmpty(server.db[j].expires);
958 }
959 return removed;
960 }
961
/* Map "yes" to 1 and "no" to 0, ignoring case. Any other string gives -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
967
968 /* I agree, this is a very rudimental way to load a configuration...
969 will improve later if the config gets more complex */
970 static void loadServerConfig(char *filename) {
971 FILE *fp = fopen(filename,"r");
972 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
973 int linenum = 0;
974 sds line = NULL;
975
976 if (!fp) {
977 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
978 exit(1);
979 }
980 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
981 sds *argv;
982 int argc, j;
983
984 linenum++;
985 line = sdsnew(buf);
986 line = sdstrim(line," \t\r\n");
987
988 /* Skip comments and blank lines*/
989 if (line[0] == '#' || line[0] == '\0') {
990 sdsfree(line);
991 continue;
992 }
993
994 /* Split into arguments */
995 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
996 sdstolower(argv[0]);
997
998 /* Execute config directives */
999 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1000 server.maxidletime = atoi(argv[1]);
1001 if (server.maxidletime < 0) {
1002 err = "Invalid timeout value"; goto loaderr;
1003 }
1004 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1005 server.port = atoi(argv[1]);
1006 if (server.port < 1 || server.port > 65535) {
1007 err = "Invalid port"; goto loaderr;
1008 }
1009 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1010 server.bindaddr = zstrdup(argv[1]);
1011 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1012 int seconds = atoi(argv[1]);
1013 int changes = atoi(argv[2]);
1014 if (seconds < 1 || changes < 0) {
1015 err = "Invalid save parameters"; goto loaderr;
1016 }
1017 appendServerSaveParams(seconds,changes);
1018 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1019 if (chdir(argv[1]) == -1) {
1020 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1021 argv[1], strerror(errno));
1022 exit(1);
1023 }
1024 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1025 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1026 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1027 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1028 else {
1029 err = "Invalid log level. Must be one of debug, notice, warning";
1030 goto loaderr;
1031 }
1032 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1033 FILE *fp;
1034
1035 server.logfile = zstrdup(argv[1]);
1036 if (!strcasecmp(server.logfile,"stdout")) {
1037 zfree(server.logfile);
1038 server.logfile = NULL;
1039 }
1040 if (server.logfile) {
1041 /* Test if we are able to open the file. The server will not
1042 * be able to abort just for this problem later... */
1043 fp = fopen(server.logfile,"a");
1044 if (fp == NULL) {
1045 err = sdscatprintf(sdsempty(),
1046 "Can't open the log file: %s", strerror(errno));
1047 goto loaderr;
1048 }
1049 fclose(fp);
1050 }
1051 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1052 server.dbnum = atoi(argv[1]);
1053 if (server.dbnum < 1) {
1054 err = "Invalid number of databases"; goto loaderr;
1055 }
1056 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1057 server.maxclients = atoi(argv[1]);
1058 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1059 server.maxmemory = atoi(argv[1]);
1060 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1061 server.masterhost = sdsnew(argv[1]);
1062 server.masterport = atoi(argv[2]);
1063 server.replstate = REDIS_REPL_CONNECT;
1064 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1065 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1066 err = "argument must be 'yes' or 'no'"; goto loaderr;
1067 }
1068 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1069 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1070 err = "argument must be 'yes' or 'no'"; goto loaderr;
1071 }
1072 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1073 server.sharingpoolsize = atoi(argv[1]);
1074 if (server.sharingpoolsize < 1) {
1075 err = "invalid object sharing pool size"; goto loaderr;
1076 }
1077 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1078 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1079 err = "argument must be 'yes' or 'no'"; goto loaderr;
1080 }
1081 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1082 server.requirepass = zstrdup(argv[1]);
1083 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1084 server.pidfile = zstrdup(argv[1]);
1085 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1086 server.dbfilename = zstrdup(argv[1]);
1087 } else {
1088 err = "Bad directive or wrong number of arguments"; goto loaderr;
1089 }
1090 for (j = 0; j < argc; j++)
1091 sdsfree(argv[j]);
1092 zfree(argv);
1093 sdsfree(line);
1094 }
1095 fclose(fp);
1096 return;
1097
1098 loaderr:
1099 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1100 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1101 fprintf(stderr, ">>> '%s'\n", line);
1102 fprintf(stderr, "%s\n", err);
1103 exit(1);
1104 }
1105
1106 static void freeClientArgv(redisClient *c) {
1107 int j;
1108
1109 for (j = 0; j < c->argc; j++)
1110 decrRefCount(c->argv[j]);
1111 c->argc = 0;
1112 }
1113
1114 static void freeClient(redisClient *c) {
1115 listNode *ln;
1116
1117 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1118 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1119 sdsfree(c->querybuf);
1120 listRelease(c->reply);
1121 freeClientArgv(c);
1122 close(c->fd);
1123 ln = listSearchKey(server.clients,c);
1124 assert(ln != NULL);
1125 listDelNode(server.clients,ln);
1126 if (c->flags & REDIS_SLAVE) {
1127 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1128 close(c->repldbfd);
1129 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1130 ln = listSearchKey(l,c);
1131 assert(ln != NULL);
1132 listDelNode(l,ln);
1133 }
1134 if (c->flags & REDIS_MASTER) {
1135 server.master = NULL;
1136 server.replstate = REDIS_REPL_CONNECT;
1137 }
1138 zfree(c->argv);
1139 zfree(c);
1140 }
1141
1142 static void glueReplyBuffersIfNeeded(redisClient *c) {
1143 int totlen = 0;
1144 listNode *ln;
1145 robj *o;
1146
1147 listRewind(c->reply);
1148 while((ln = listYield(c->reply))) {
1149 o = ln->value;
1150 totlen += sdslen(o->ptr);
1151 /* This optimization makes more sense if we don't have to copy
1152 * too much data */
1153 if (totlen > 1024) return;
1154 }
1155 if (totlen > 0) {
1156 char buf[1024];
1157 int copylen = 0;
1158
1159 listRewind(c->reply);
1160 while((ln = listYield(c->reply))) {
1161 o = ln->value;
1162 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1163 copylen += sdslen(o->ptr);
1164 listDelNode(c->reply,ln);
1165 }
1166 /* Now the output buffer is empty, add the new single element */
1167 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1168 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1169 }
1170 }
1171
1172 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1173 redisClient *c = privdata;
1174 int nwritten = 0, totwritten = 0, objlen;
1175 robj *o;
1176 REDIS_NOTUSED(el);
1177 REDIS_NOTUSED(mask);
1178
1179 if (server.glueoutputbuf && listLength(c->reply) > 1)
1180 glueReplyBuffersIfNeeded(c);
1181 while(listLength(c->reply)) {
1182 o = listNodeValue(listFirst(c->reply));
1183 objlen = sdslen(o->ptr);
1184
1185 if (objlen == 0) {
1186 listDelNode(c->reply,listFirst(c->reply));
1187 continue;
1188 }
1189
1190 if (c->flags & REDIS_MASTER) {
1191 /* Don't reply to a master */
1192 nwritten = objlen - c->sentlen;
1193 } else {
1194 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1195 if (nwritten <= 0) break;
1196 }
1197 c->sentlen += nwritten;
1198 totwritten += nwritten;
1199 /* If we fully sent the object on head go to the next one */
1200 if (c->sentlen == objlen) {
1201 listDelNode(c->reply,listFirst(c->reply));
1202 c->sentlen = 0;
1203 }
1204 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1205 * bytes, in a single threaded server it's a good idea to server
1206 * other clients as well, even if a very large request comes from
1207 * super fast link that is always able to accept data (in real world
1208 * terms think to 'KEYS *' against the loopback interfae) */
1209 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1210 }
1211 if (nwritten == -1) {
1212 if (errno == EAGAIN) {
1213 nwritten = 0;
1214 } else {
1215 redisLog(REDIS_DEBUG,
1216 "Error writing to client: %s", strerror(errno));
1217 freeClient(c);
1218 return;
1219 }
1220 }
1221 if (totwritten > 0) c->lastinteraction = time(NULL);
1222 if (listLength(c->reply) == 0) {
1223 c->sentlen = 0;
1224 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1225 }
1226 }
1227
1228 static struct redisCommand *lookupCommand(char *name) {
1229 int j = 0;
1230 while(cmdTable[j].name != NULL) {
1231 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1232 j++;
1233 }
1234 return NULL;
1235 }
1236
1237 /* resetClient prepare the client to process the next command */
1238 static void resetClient(redisClient *c) {
1239 freeClientArgv(c);
1240 c->bulklen = -1;
1241 }
1242
1243 /* If this function gets called we already read a whole
1244 * command, argments are in the client argv/argc fields.
1245 * processCommand() execute the command or prepare the
1246 * server for a bulk read from the client.
1247 *
1248 * If 1 is returned the client is still alive and valid and
1249 * and other operations can be performed by the caller. Otherwise
1250 * if 0 is returned the client was destroied (i.e. after QUIT). */
1251 static int processCommand(redisClient *c) {
1252 struct redisCommand *cmd;
1253 long long dirty;
1254
1255 /* Free some memory if needed (maxmemory setting) */
1256 if (server.maxmemory) freeMemoryIfNeeded();
1257
1258 /* The QUIT command is handled as a special case. Normal command
1259 * procs are unable to close the client connection safely */
1260 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1261 freeClient(c);
1262 return 0;
1263 }
1264 cmd = lookupCommand(c->argv[0]->ptr);
1265 if (!cmd) {
1266 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1267 resetClient(c);
1268 return 1;
1269 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1270 (c->argc < -cmd->arity)) {
1271 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1272 resetClient(c);
1273 return 1;
1274 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1275 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1276 resetClient(c);
1277 return 1;
1278 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1279 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1280
1281 decrRefCount(c->argv[c->argc-1]);
1282 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1283 c->argc--;
1284 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1285 resetClient(c);
1286 return 1;
1287 }
1288 c->argc--;
1289 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1290 /* It is possible that the bulk read is already in the
1291 * buffer. Check this condition and handle it accordingly */
1292 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1293 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1294 c->argc++;
1295 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1296 } else {
1297 return 1;
1298 }
1299 }
1300 /* Let's try to share objects on the command arguments vector */
1301 if (server.shareobjects) {
1302 int j;
1303 for(j = 1; j < c->argc; j++)
1304 c->argv[j] = tryObjectSharing(c->argv[j]);
1305 }
1306 /* Check if the user is authenticated */
1307 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1308 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1309 resetClient(c);
1310 return 1;
1311 }
1312
1313 /* Exec the command */
1314 dirty = server.dirty;
1315 cmd->proc(c);
1316 if (server.dirty-dirty != 0 && listLength(server.slaves))
1317 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1318 if (listLength(server.monitors))
1319 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1320 server.stat_numcommands++;
1321
1322 /* Prepare the client for the next command */
1323 if (c->flags & REDIS_CLOSE) {
1324 freeClient(c);
1325 return 0;
1326 }
1327 resetClient(c);
1328 return 1;
1329 }
1330
1331 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1332 listNode *ln;
1333 int outc = 0, j;
1334 robj **outv;
1335 /* (args*2)+1 is enough room for args, spaces, newlines */
1336 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1337
1338 if (argc <= REDIS_STATIC_ARGS) {
1339 outv = static_outv;
1340 } else {
1341 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1342 if (!outv) oom("replicationFeedSlaves");
1343 }
1344
1345 for (j = 0; j < argc; j++) {
1346 if (j != 0) outv[outc++] = shared.space;
1347 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1348 robj *lenobj;
1349
1350 lenobj = createObject(REDIS_STRING,
1351 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1352 lenobj->refcount = 0;
1353 outv[outc++] = lenobj;
1354 }
1355 outv[outc++] = argv[j];
1356 }
1357 outv[outc++] = shared.crlf;
1358
1359 /* Increment all the refcounts at start and decrement at end in order to
1360 * be sure to free objects if there is no slave in a replication state
1361 * able to be feed with commands */
1362 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1363 listRewind(slaves);
1364 while((ln = listYield(slaves))) {
1365 redisClient *slave = ln->value;
1366
1367 /* Don't feed slaves that are still waiting for BGSAVE to start */
1368 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1369
1370 /* Feed all the other slaves, MONITORs and so on */
1371 if (slave->slaveseldb != dictid) {
1372 robj *selectcmd;
1373
1374 switch(dictid) {
1375 case 0: selectcmd = shared.select0; break;
1376 case 1: selectcmd = shared.select1; break;
1377 case 2: selectcmd = shared.select2; break;
1378 case 3: selectcmd = shared.select3; break;
1379 case 4: selectcmd = shared.select4; break;
1380 case 5: selectcmd = shared.select5; break;
1381 case 6: selectcmd = shared.select6; break;
1382 case 7: selectcmd = shared.select7; break;
1383 case 8: selectcmd = shared.select8; break;
1384 case 9: selectcmd = shared.select9; break;
1385 default:
1386 selectcmd = createObject(REDIS_STRING,
1387 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1388 selectcmd->refcount = 0;
1389 break;
1390 }
1391 addReply(slave,selectcmd);
1392 slave->slaveseldb = dictid;
1393 }
1394 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1395 }
1396 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1397 if (outv != static_outv) zfree(outv);
1398 }
1399
1400 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1401 redisClient *c = (redisClient*) privdata;
1402 char buf[REDIS_IOBUF_LEN];
1403 int nread;
1404 REDIS_NOTUSED(el);
1405 REDIS_NOTUSED(mask);
1406
1407 nread = read(fd, buf, REDIS_IOBUF_LEN);
1408 if (nread == -1) {
1409 if (errno == EAGAIN) {
1410 nread = 0;
1411 } else {
1412 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1413 freeClient(c);
1414 return;
1415 }
1416 } else if (nread == 0) {
1417 redisLog(REDIS_DEBUG, "Client closed connection");
1418 freeClient(c);
1419 return;
1420 }
1421 if (nread) {
1422 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1423 c->lastinteraction = time(NULL);
1424 } else {
1425 return;
1426 }
1427
1428 again:
1429 if (c->bulklen == -1) {
1430 /* Read the first line of the query */
1431 char *p = strchr(c->querybuf,'\n');
1432 size_t querylen;
1433 if (p) {
1434 sds query, *argv;
1435 int argc, j;
1436
1437 query = c->querybuf;
1438 c->querybuf = sdsempty();
1439 querylen = 1+(p-(query));
1440 if (sdslen(query) > querylen) {
1441 /* leave data after the first line of the query in the buffer */
1442 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1443 }
1444 *p = '\0'; /* remove "\n" */
1445 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1446 sdsupdatelen(query);
1447
1448 /* Now we can split the query in arguments */
1449 if (sdslen(query) == 0) {
1450 /* Ignore empty query */
1451 sdsfree(query);
1452 return;
1453 }
1454 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1455 if (argv == NULL) oom("sdssplitlen");
1456 sdsfree(query);
1457
1458 if (c->argv) zfree(c->argv);
1459 c->argv = zmalloc(sizeof(robj*)*argc);
1460 if (c->argv == NULL) oom("allocating arguments list for client");
1461
1462 for (j = 0; j < argc; j++) {
1463 if (sdslen(argv[j])) {
1464 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1465 c->argc++;
1466 } else {
1467 sdsfree(argv[j]);
1468 }
1469 }
1470 zfree(argv);
1471 /* Execute the command. If the client is still valid
1472 * after processCommand() return and there is something
1473 * on the query buffer try to process the next command. */
1474 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1475 return;
1476 } else if (sdslen(c->querybuf) >= 1024*32) {
1477 redisLog(REDIS_DEBUG, "Client protocol error");
1478 freeClient(c);
1479 return;
1480 }
1481 } else {
1482 /* Bulk read handling. Note that if we are at this point
1483 the client already sent a command terminated with a newline,
1484 we are reading the bulk data that is actually the last
1485 argument of the command. */
1486 int qbl = sdslen(c->querybuf);
1487
1488 if (c->bulklen <= qbl) {
1489 /* Copy everything but the final CRLF as final argument */
1490 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1491 c->argc++;
1492 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1493 processCommand(c);
1494 return;
1495 }
1496 }
1497 }
1498
1499 static int selectDb(redisClient *c, int id) {
1500 if (id < 0 || id >= server.dbnum)
1501 return REDIS_ERR;
1502 c->db = &server.db[id];
1503 return REDIS_OK;
1504 }
1505
1506 static void *dupClientReplyValue(void *o) {
1507 incrRefCount((robj*)o);
1508 return 0;
1509 }
1510
1511 static redisClient *createClient(int fd) {
1512 redisClient *c = zmalloc(sizeof(*c));
1513
1514 anetNonBlock(NULL,fd);
1515 anetTcpNoDelay(NULL,fd);
1516 if (!c) return NULL;
1517 selectDb(c,0);
1518 c->fd = fd;
1519 c->querybuf = sdsempty();
1520 c->argc = 0;
1521 c->argv = NULL;
1522 c->bulklen = -1;
1523 c->sentlen = 0;
1524 c->flags = 0;
1525 c->lastinteraction = time(NULL);
1526 c->authenticated = 0;
1527 c->replstate = REDIS_REPL_NONE;
1528 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1529 listSetFreeMethod(c->reply,decrRefCount);
1530 listSetDupMethod(c->reply,dupClientReplyValue);
1531 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1532 readQueryFromClient, c, NULL) == AE_ERR) {
1533 freeClient(c);
1534 return NULL;
1535 }
1536 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1537 return c;
1538 }
1539
1540 static void addReply(redisClient *c, robj *obj) {
1541 if (listLength(c->reply) == 0 &&
1542 (c->replstate == REDIS_REPL_NONE ||
1543 c->replstate == REDIS_REPL_ONLINE) &&
1544 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1545 sendReplyToClient, c, NULL) == AE_ERR) return;
1546 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1547 incrRefCount(obj);
1548 }
1549
1550 static void addReplySds(redisClient *c, sds s) {
1551 robj *o = createObject(REDIS_STRING,s);
1552 addReply(c,o);
1553 decrRefCount(o);
1554 }
1555
1556 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1557 int cport, cfd;
1558 char cip[128];
1559 redisClient *c;
1560 REDIS_NOTUSED(el);
1561 REDIS_NOTUSED(mask);
1562 REDIS_NOTUSED(privdata);
1563
1564 cfd = anetAccept(server.neterr, fd, cip, &cport);
1565 if (cfd == AE_ERR) {
1566 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1567 return;
1568 }
1569 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1570 if ((c = createClient(cfd)) == NULL) {
1571 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1572 close(cfd); /* May be already closed, just ingore errors */
1573 return;
1574 }
1575 /* If maxclient directive is set and this is one client more... close the
1576 * connection. Note that we create the client instead to check before
1577 * for this condition, since now the socket is already set in nonblocking
1578 * mode and we can send an error for free using the Kernel I/O */
1579 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1580 char *err = "-ERR max number of clients reached\r\n";
1581
1582 /* That's a best effort error message, don't check write errors */
1583 (void) write(c->fd,err,strlen(err));
1584 freeClient(c);
1585 return;
1586 }
1587 server.stat_numconnections++;
1588 }
1589
1590 /* ======================= Redis objects implementation ===================== */
1591
1592 static robj *createObject(int type, void *ptr) {
1593 robj *o;
1594
1595 if (listLength(server.objfreelist)) {
1596 listNode *head = listFirst(server.objfreelist);
1597 o = listNodeValue(head);
1598 listDelNode(server.objfreelist,head);
1599 } else {
1600 o = zmalloc(sizeof(*o));
1601 }
1602 if (!o) oom("createObject");
1603 o->type = type;
1604 o->ptr = ptr;
1605 o->refcount = 1;
1606 return o;
1607 }
1608
1609 static robj *createStringObject(char *ptr, size_t len) {
1610 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1611 }
1612
1613 static robj *createListObject(void) {
1614 list *l = listCreate();
1615
1616 if (!l) oom("listCreate");
1617 listSetFreeMethod(l,decrRefCount);
1618 return createObject(REDIS_LIST,l);
1619 }
1620
1621 static robj *createSetObject(void) {
1622 dict *d = dictCreate(&setDictType,NULL);
1623 if (!d) oom("dictCreate");
1624 return createObject(REDIS_SET,d);
1625 }
1626
1627 static void freeStringObject(robj *o) {
1628 sdsfree(o->ptr);
1629 }
1630
1631 static void freeListObject(robj *o) {
1632 listRelease((list*) o->ptr);
1633 }
1634
1635 static void freeSetObject(robj *o) {
1636 dictRelease((dict*) o->ptr);
1637 }
1638
1639 static void freeHashObject(robj *o) {
1640 dictRelease((dict*) o->ptr);
1641 }
1642
1643 static void incrRefCount(robj *o) {
1644 o->refcount++;
1645 #ifdef DEBUG_REFCOUNT
1646 if (o->type == REDIS_STRING)
1647 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1648 #endif
1649 }
1650
1651 static void decrRefCount(void *obj) {
1652 robj *o = obj;
1653
1654 #ifdef DEBUG_REFCOUNT
1655 if (o->type == REDIS_STRING)
1656 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1657 #endif
1658 if (--(o->refcount) == 0) {
1659 switch(o->type) {
1660 case REDIS_STRING: freeStringObject(o); break;
1661 case REDIS_LIST: freeListObject(o); break;
1662 case REDIS_SET: freeSetObject(o); break;
1663 case REDIS_HASH: freeHashObject(o); break;
1664 default: assert(0 != 0); break;
1665 }
1666 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1667 !listAddNodeHead(server.objfreelist,o))
1668 zfree(o);
1669 }
1670 }
1671
1672 /* Try to share an object against the shared objects pool */
1673 static robj *tryObjectSharing(robj *o) {
1674 struct dictEntry *de;
1675 unsigned long c;
1676
1677 if (o == NULL || server.shareobjects == 0) return o;
1678
1679 assert(o->type == REDIS_STRING);
1680 de = dictFind(server.sharingpool,o);
1681 if (de) {
1682 robj *shared = dictGetEntryKey(de);
1683
1684 c = ((unsigned long) dictGetEntryVal(de))+1;
1685 dictGetEntryVal(de) = (void*) c;
1686 incrRefCount(shared);
1687 decrRefCount(o);
1688 return shared;
1689 } else {
1690 /* Here we are using a stream algorihtm: Every time an object is
1691 * shared we increment its count, everytime there is a miss we
1692 * recrement the counter of a random object. If this object reaches
1693 * zero we remove the object and put the current object instead. */
1694 if (dictSize(server.sharingpool) >=
1695 server.sharingpoolsize) {
1696 de = dictGetRandomKey(server.sharingpool);
1697 assert(de != NULL);
1698 c = ((unsigned long) dictGetEntryVal(de))-1;
1699 dictGetEntryVal(de) = (void*) c;
1700 if (c == 0) {
1701 dictDelete(server.sharingpool,de->key);
1702 }
1703 } else {
1704 c = 0; /* If the pool is empty we want to add this object */
1705 }
1706 if (c == 0) {
1707 int retval;
1708
1709 retval = dictAdd(server.sharingpool,o,(void*)1);
1710 assert(retval == DICT_OK);
1711 incrRefCount(o);
1712 }
1713 return o;
1714 }
1715 }
1716
1717 static robj *lookupKey(redisDb *db, robj *key) {
1718 dictEntry *de = dictFind(db->dict,key);
1719 return de ? dictGetEntryVal(de) : NULL;
1720 }
1721
1722 static robj *lookupKeyRead(redisDb *db, robj *key) {
1723 expireIfNeeded(db,key);
1724 return lookupKey(db,key);
1725 }
1726
1727 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1728 deleteIfVolatile(db,key);
1729 return lookupKey(db,key);
1730 }
1731
1732 static int deleteKey(redisDb *db, robj *key) {
1733 int retval;
1734
1735 /* We need to protect key from destruction: after the first dictDelete()
1736 * it may happen that 'key' is no longer valid if we don't increment
1737 * it's count. This may happen when we get the object reference directly
1738 * from the hash table with dictRandomKey() or dict iterators */
1739 incrRefCount(key);
1740 if (dictSize(db->expires)) dictDelete(db->expires,key);
1741 retval = dictDelete(db->dict,key);
1742 decrRefCount(key);
1743
1744 return retval == DICT_OK;
1745 }
1746
1747 /*============================ DB saving/loading ============================ */
1748
/* Write the one byte 'type' to 'fp'. Returns 0 on success, -1 if the
 * write failed. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
1753
/* Write the time 't' to 'fp' as a 4 byte signed integer in host byte
 * order (the value is converted to int32_t first). Returns 0 on success,
 * -1 if the write failed. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32;

    t32 = (int32_t) t;
    return (fwrite(&t32,4,1,fp) == 0) ? -1 : 0;
}
1759
1760 /* check rdbLoadLen() comments for more info */
1761 static int rdbSaveLen(FILE *fp, uint32_t len) {
1762 unsigned char buf[2];
1763
1764 if (len < (1<<6)) {
1765 /* Save a 6 bit len */
1766 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1767 if (fwrite(buf,1,1,fp) == 0) return -1;
1768 } else if (len < (1<<14)) {
1769 /* Save a 14 bit len */
1770 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1771 buf[1] = len&0xFF;
1772 if (fwrite(buf,2,1,fp) == 0) return -1;
1773 } else {
1774 /* Save a 32 bit len */
1775 buf[0] = (REDIS_RDB_32BITLEN<<6);
1776 if (fwrite(buf,1,1,fp) == 0) return -1;
1777 len = htonl(len);
1778 if (fwrite(&len,4,1,fp) == 0) return -1;
1779 }
1780 return 0;
1781 }
1782
1783 /* String objects in the form "2391" "-100" without any space and with a
1784 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1785 * encoded as integers to save space */
1786 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1787 long long value;
1788 char *endptr, buf[32];
1789
1790 /* Check if it's possible to encode this value as a number */
1791 value = strtoll(s, &endptr, 10);
1792 if (endptr[0] != '\0') return 0;
1793 snprintf(buf,32,"%lld",value);
1794
1795 /* If the number converted back into a string is not identical
1796 * then it's not possible to encode the string as integer */
1797 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1798
1799 /* Finally check if it fits in our ranges */
1800 if (value >= -(1<<7) && value <= (1<<7)-1) {
1801 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1802 enc[1] = value&0xFF;
1803 return 2;
1804 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1805 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1806 enc[1] = value&0xFF;
1807 enc[2] = (value>>8)&0xFF;
1808 return 3;
1809 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1810 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1811 enc[1] = value&0xFF;
1812 enc[2] = (value>>8)&0xFF;
1813 enc[3] = (value>>16)&0xFF;
1814 enc[4] = (value>>24)&0xFF;
1815 return 5;
1816 } else {
1817 return 0;
1818 }
1819 }
1820
1821 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1822 unsigned int comprlen, outlen;
1823 unsigned char byte;
1824 void *out;
1825
1826 /* We require at least four bytes compression for this to be worth it */
1827 outlen = sdslen(obj->ptr)-4;
1828 if (outlen <= 0) return 0;
1829 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1830 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1831 if (comprlen == 0) {
1832 zfree(out);
1833 return 0;
1834 }
1835 /* Data compressed! Let's save it on disk */
1836 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1837 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1838 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1839 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1840 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1841 zfree(out);
1842 return comprlen;
1843
1844 writeerr:
1845 zfree(out);
1846 return -1;
1847 }
1848
1849 /* Save a string objet as [len][data] on disk. If the object is a string
1850 * representation of an integer value we try to safe it in a special form */
1851 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1852 size_t len = sdslen(obj->ptr);
1853 int enclen;
1854
1855 /* Try integer encoding */
1856 if (len <= 11) {
1857 unsigned char buf[5];
1858 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1859 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1860 return 0;
1861 }
1862 }
1863
1864 /* Try LZF compression - under 20 bytes it's unable to compress even
1865 * aaaaaaaaaaaaaaaaaa so skip it */
1866 if (1 && len > 20) {
1867 int retval;
1868
1869 retval = rdbSaveLzfStringObject(fp,obj);
1870 if (retval == -1) return -1;
1871 if (retval > 0) return 0;
1872 /* retval == 0 means data can't be compressed, save the old way */
1873 }
1874
1875 /* Store verbatim */
1876 if (rdbSaveLen(fp,len) == -1) return -1;
1877 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1878 return 0;
1879 }
1880
1881 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1882 static int rdbSave(char *filename) {
1883 dictIterator *di = NULL;
1884 dictEntry *de;
1885 FILE *fp;
1886 char tmpfile[256];
1887 int j;
1888 time_t now = time(NULL);
1889
1890 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1891 fp = fopen(tmpfile,"w");
1892 if (!fp) {
1893 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1894 return REDIS_ERR;
1895 }
1896 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1897 for (j = 0; j < server.dbnum; j++) {
1898 redisDb *db = server.db+j;
1899 dict *d = db->dict;
1900 if (dictSize(d) == 0) continue;
1901 di = dictGetIterator(d);
1902 if (!di) {
1903 fclose(fp);
1904 return REDIS_ERR;
1905 }
1906
1907 /* Write the SELECT DB opcode */
1908 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1909 if (rdbSaveLen(fp,j) == -1) goto werr;
1910
1911 /* Iterate this DB writing every entry */
1912 while((de = dictNext(di)) != NULL) {
1913 robj *key = dictGetEntryKey(de);
1914 robj *o = dictGetEntryVal(de);
1915 time_t expiretime = getExpire(db,key);
1916
1917 /* Save the expire time */
1918 if (expiretime != -1) {
1919 /* If this key is already expired skip it */
1920 if (expiretime < now) continue;
1921 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1922 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1923 }
1924 /* Save the key and associated value */
1925 if (rdbSaveType(fp,o->type) == -1) goto werr;
1926 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1927 if (o->type == REDIS_STRING) {
1928 /* Save a string value */
1929 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1930 } else if (o->type == REDIS_LIST) {
1931 /* Save a list value */
1932 list *list = o->ptr;
1933 listNode *ln;
1934
1935 listRewind(list);
1936 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1937 while((ln = listYield(list))) {
1938 robj *eleobj = listNodeValue(ln);
1939
1940 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1941 }
1942 } else if (o->type == REDIS_SET) {
1943 /* Save a set value */
1944 dict *set = o->ptr;
1945 dictIterator *di = dictGetIterator(set);
1946 dictEntry *de;
1947
1948 if (!set) oom("dictGetIteraotr");
1949 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1950 while((de = dictNext(di)) != NULL) {
1951 robj *eleobj = dictGetEntryKey(de);
1952
1953 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1954 }
1955 dictReleaseIterator(di);
1956 } else {
1957 assert(0 != 0);
1958 }
1959 }
1960 dictReleaseIterator(di);
1961 }
1962 /* EOF opcode */
1963 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1964
1965 /* Make sure data will not remain on the OS's output buffers */
1966 fflush(fp);
1967 fsync(fileno(fp));
1968 fclose(fp);
1969
1970 /* Use RENAME to make sure the DB file is changed atomically only
1971 * if the generate DB file is ok. */
1972 if (rename(tmpfile,filename) == -1) {
1973 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1974 unlink(tmpfile);
1975 return REDIS_ERR;
1976 }
1977 redisLog(REDIS_NOTICE,"DB saved on disk");
1978 server.dirty = 0;
1979 server.lastsave = time(NULL);
1980 return REDIS_OK;
1981
1982 werr:
1983 fclose(fp);
1984 unlink(tmpfile);
1985 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1986 if (di) dictReleaseIterator(di);
1987 return REDIS_ERR;
1988 }
1989
1990 static int rdbSaveBackground(char *filename) {
1991 pid_t childpid;
1992
1993 if (server.bgsaveinprogress) return REDIS_ERR;
1994 if ((childpid = fork()) == 0) {
1995 /* Child */
1996 close(server.fd);
1997 if (rdbSave(filename) == REDIS_OK) {
1998 exit(0);
1999 } else {
2000 exit(1);
2001 }
2002 } else {
2003 /* Parent */
2004 if (childpid == -1) {
2005 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2006 strerror(errno));
2007 return REDIS_ERR;
2008 }
2009 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2010 server.bgsaveinprogress = 1;
2011 server.bgsavechildpid = childpid;
2012 return REDIS_OK;
2013 }
2014 return REDIS_OK; /* unreached */
2015 }
2016
/* Read a single byte from 'fp' and return it as a value in the 0-255
 * range, or -1 if the byte could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,sizeof(type),1,fp) == 1) ? (int)type : -1;
}
2022
/* Read a 32 bit signed time value from 'fp', in the byte order of the
 * host, and return it as a time_t. Returns -1 if the four bytes could not
 * be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    return (fread(&t32,sizeof(t32),1,fp) == 1) ? (time_t)t32 : (time_t)-1;
}
2028
2029 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2030 * of this file for a description of how this are stored on disk.
2031 *
2032 * isencoded is set to 1 if the readed length is not actually a length but
2033 * an "encoding type", check the above comments for more info */
2034 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2035 unsigned char buf[2];
2036 uint32_t len;
2037
2038 if (isencoded) *isencoded = 0;
2039 if (rdbver == 0) {
2040 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2041 return ntohl(len);
2042 } else {
2043 int type;
2044
2045 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2046 type = (buf[0]&0xC0)>>6;
2047 if (type == REDIS_RDB_6BITLEN) {
2048 /* Read a 6 bit len */
2049 return buf[0]&0x3F;
2050 } else if (type == REDIS_RDB_ENCVAL) {
2051 /* Read a 6 bit len encoding type */
2052 if (isencoded) *isencoded = 1;
2053 return buf[0]&0x3F;
2054 } else if (type == REDIS_RDB_14BITLEN) {
2055 /* Read a 14 bit len */
2056 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2057 return ((buf[0]&0x3F)<<8)|buf[1];
2058 } else {
2059 /* Read a 32 bit len */
2060 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2061 return ntohl(len);
2062 }
2063 }
2064 }
2065
2066 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2067 unsigned char enc[4];
2068 long long val;
2069
2070 if (enctype == REDIS_RDB_ENC_INT8) {
2071 if (fread(enc,1,1,fp) == 0) return NULL;
2072 val = (signed char)enc[0];
2073 } else if (enctype == REDIS_RDB_ENC_INT16) {
2074 uint16_t v;
2075 if (fread(enc,2,1,fp) == 0) return NULL;
2076 v = enc[0]|(enc[1]<<8);
2077 val = (int16_t)v;
2078 } else if (enctype == REDIS_RDB_ENC_INT32) {
2079 uint32_t v;
2080 if (fread(enc,4,1,fp) == 0) return NULL;
2081 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2082 val = (int32_t)v;
2083 } else {
2084 val = 0; /* anti-warning */
2085 assert(0!=0);
2086 }
2087 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2088 }
2089
2090 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2091 unsigned int len, clen;
2092 unsigned char *c = NULL;
2093 sds val = NULL;
2094
2095 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2096 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2097 if ((c = zmalloc(clen)) == NULL) goto err;
2098 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2099 if (fread(c,clen,1,fp) == 0) goto err;
2100 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2101 zfree(c);
2102 return createObject(REDIS_STRING,val);
2103 err:
2104 zfree(c);
2105 sdsfree(val);
2106 return NULL;
2107 }
2108
2109 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2110 int isencoded;
2111 uint32_t len;
2112 sds val;
2113
2114 len = rdbLoadLen(fp,rdbver,&isencoded);
2115 if (isencoded) {
2116 switch(len) {
2117 case REDIS_RDB_ENC_INT8:
2118 case REDIS_RDB_ENC_INT16:
2119 case REDIS_RDB_ENC_INT32:
2120 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2121 case REDIS_RDB_ENC_LZF:
2122 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2123 default:
2124 assert(0!=0);
2125 }
2126 }
2127
2128 if (len == REDIS_RDB_LENERR) return NULL;
2129 val = sdsnewlen(NULL,len);
2130 if (len && fread(val,len,1,fp) == 0) {
2131 sdsfree(val);
2132 return NULL;
2133 }
2134 return tryObjectSharing(createObject(REDIS_STRING,val));
2135 }
2136
2137 static int rdbLoad(char *filename) {
2138 FILE *fp;
2139 robj *keyobj = NULL;
2140 uint32_t dbid;
2141 int type, retval, rdbver;
2142 dict *d = server.db[0].dict;
2143 redisDb *db = server.db+0;
2144 char buf[1024];
2145 time_t expiretime = -1, now = time(NULL);
2146
2147 fp = fopen(filename,"r");
2148 if (!fp) return REDIS_ERR;
2149 if (fread(buf,9,1,fp) == 0) goto eoferr;
2150 buf[9] = '\0';
2151 if (memcmp(buf,"REDIS",5) != 0) {
2152 fclose(fp);
2153 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2154 return REDIS_ERR;
2155 }
2156 rdbver = atoi(buf+5);
2157 if (rdbver > 1) {
2158 fclose(fp);
2159 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2160 return REDIS_ERR;
2161 }
2162 while(1) {
2163 robj *o;
2164
2165 /* Read type. */
2166 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2167 if (type == REDIS_EXPIRETIME) {
2168 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2169 /* We read the time so we need to read the object type again */
2170 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2171 }
2172 if (type == REDIS_EOF) break;
2173 /* Handle SELECT DB opcode as a special case */
2174 if (type == REDIS_SELECTDB) {
2175 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2176 goto eoferr;
2177 if (dbid >= (unsigned)server.dbnum) {
2178 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2179 exit(1);
2180 }
2181 db = server.db+dbid;
2182 d = db->dict;
2183 continue;
2184 }
2185 /* Read key */
2186 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2187
2188 if (type == REDIS_STRING) {
2189 /* Read string value */
2190 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2191 } else if (type == REDIS_LIST || type == REDIS_SET) {
2192 /* Read list/set value */
2193 uint32_t listlen;
2194
2195 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2196 goto eoferr;
2197 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2198 /* Load every single element of the list/set */
2199 while(listlen--) {
2200 robj *ele;
2201
2202 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2203 if (type == REDIS_LIST) {
2204 if (!listAddNodeTail((list*)o->ptr,ele))
2205 oom("listAddNodeTail");
2206 } else {
2207 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2208 oom("dictAdd");
2209 }
2210 }
2211 } else {
2212 assert(0 != 0);
2213 }
2214 /* Add the new object in the hash table */
2215 retval = dictAdd(d,keyobj,o);
2216 if (retval == DICT_ERR) {
2217 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2218 exit(1);
2219 }
2220 /* Set the expire time if needed */
2221 if (expiretime != -1) {
2222 setExpire(db,keyobj,expiretime);
2223 /* Delete this key if already expired */
2224 if (expiretime < now) deleteKey(db,keyobj);
2225 expiretime = -1;
2226 }
2227 keyobj = o = NULL;
2228 }
2229 fclose(fp);
2230 return REDIS_OK;
2231
2232 eoferr: /* unexpected end of file is handled here with a fatal exit */
2233 if (keyobj) decrRefCount(keyobj);
2234 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2235 exit(1);
2236 return REDIS_ERR; /* Just to avoid warning */
2237 }
2238
2239 /*================================== Commands =============================== */
2240
2241 static void authCommand(redisClient *c) {
2242 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2243 c->authenticated = 1;
2244 addReply(c,shared.ok);
2245 } else {
2246 c->authenticated = 0;
2247 addReply(c,shared.err);
2248 }
2249 }
2250
2251 static void pingCommand(redisClient *c) {
2252 addReply(c,shared.pong);
2253 }
2254
2255 static void echoCommand(redisClient *c) {
2256 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2257 (int)sdslen(c->argv[1]->ptr)));
2258 addReply(c,c->argv[1]);
2259 addReply(c,shared.crlf);
2260 }
2261
2262 /*=================================== Strings =============================== */
2263
2264 static void setGenericCommand(redisClient *c, int nx) {
2265 int retval;
2266
2267 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2268 if (retval == DICT_ERR) {
2269 if (!nx) {
2270 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2271 incrRefCount(c->argv[2]);
2272 } else {
2273 addReply(c,shared.czero);
2274 return;
2275 }
2276 } else {
2277 incrRefCount(c->argv[1]);
2278 incrRefCount(c->argv[2]);
2279 }
2280 server.dirty++;
2281 removeExpire(c->db,c->argv[1]);
2282 addReply(c, nx ? shared.cone : shared.ok);
2283 }
2284
2285 static void setCommand(redisClient *c) {
2286 setGenericCommand(c,0);
2287 }
2288
2289 static void setnxCommand(redisClient *c) {
2290 setGenericCommand(c,1);
2291 }
2292
2293 static void getCommand(redisClient *c) {
2294 robj *o = lookupKeyRead(c->db,c->argv[1]);
2295
2296 if (o == NULL) {
2297 addReply(c,shared.nullbulk);
2298 } else {
2299 if (o->type != REDIS_STRING) {
2300 addReply(c,shared.wrongtypeerr);
2301 } else {
2302 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2303 addReply(c,o);
2304 addReply(c,shared.crlf);
2305 }
2306 }
2307 }
2308
2309 static void getSetCommand(redisClient *c) {
2310 getCommand(c);
2311 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2312 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2313 } else {
2314 incrRefCount(c->argv[1]);
2315 }
2316 incrRefCount(c->argv[2]);
2317 server.dirty++;
2318 removeExpire(c->db,c->argv[1]);
2319 }
2320
2321 static void mgetCommand(redisClient *c) {
2322 int j;
2323
2324 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2325 for (j = 1; j < c->argc; j++) {
2326 robj *o = lookupKeyRead(c->db,c->argv[j]);
2327 if (o == NULL) {
2328 addReply(c,shared.nullbulk);
2329 } else {
2330 if (o->type != REDIS_STRING) {
2331 addReply(c,shared.nullbulk);
2332 } else {
2333 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2334 addReply(c,o);
2335 addReply(c,shared.crlf);
2336 }
2337 }
2338 }
2339 }
2340
2341 static void incrDecrCommand(redisClient *c, long long incr) {
2342 long long value;
2343 int retval;
2344 robj *o;
2345
2346 o = lookupKeyWrite(c->db,c->argv[1]);
2347 if (o == NULL) {
2348 value = 0;
2349 } else {
2350 if (o->type != REDIS_STRING) {
2351 value = 0;
2352 } else {
2353 char *eptr;
2354
2355 value = strtoll(o->ptr, &eptr, 10);
2356 }
2357 }
2358
2359 value += incr;
2360 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2361 retval = dictAdd(c->db->dict,c->argv[1],o);
2362 if (retval == DICT_ERR) {
2363 dictReplace(c->db->dict,c->argv[1],o);
2364 removeExpire(c->db,c->argv[1]);
2365 } else {
2366 incrRefCount(c->argv[1]);
2367 }
2368 server.dirty++;
2369 addReply(c,shared.colon);
2370 addReply(c,o);
2371 addReply(c,shared.crlf);
2372 }
2373
2374 static void incrCommand(redisClient *c) {
2375 incrDecrCommand(c,1);
2376 }
2377
2378 static void decrCommand(redisClient *c) {
2379 incrDecrCommand(c,-1);
2380 }
2381
2382 static void incrbyCommand(redisClient *c) {
2383 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2384 incrDecrCommand(c,incr);
2385 }
2386
2387 static void decrbyCommand(redisClient *c) {
2388 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2389 incrDecrCommand(c,-incr);
2390 }
2391
2392 /* ========================= Type agnostic commands ========================= */
2393
2394 static void delCommand(redisClient *c) {
2395 int deleted = 0, j;
2396
2397 for (j = 1; j < c->argc; j++) {
2398 if (deleteKey(c->db,c->argv[j])) {
2399 server.dirty++;
2400 deleted++;
2401 }
2402 }
2403 switch(deleted) {
2404 case 0:
2405 addReply(c,shared.czero);
2406 break;
2407 case 1:
2408 addReply(c,shared.cone);
2409 break;
2410 default:
2411 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2412 break;
2413 }
2414 }
2415
2416 static void existsCommand(redisClient *c) {
2417 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2418 }
2419
2420 static void selectCommand(redisClient *c) {
2421 int id = atoi(c->argv[1]->ptr);
2422
2423 if (selectDb(c,id) == REDIS_ERR) {
2424 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2425 } else {
2426 addReply(c,shared.ok);
2427 }
2428 }
2429
2430 static void randomkeyCommand(redisClient *c) {
2431 dictEntry *de;
2432
2433 while(1) {
2434 de = dictGetRandomKey(c->db->dict);
2435 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2436 }
2437 if (de == NULL) {
2438 addReply(c,shared.plus);
2439 addReply(c,shared.crlf);
2440 } else {
2441 addReply(c,shared.plus);
2442 addReply(c,dictGetEntryKey(de));
2443 addReply(c,shared.crlf);
2444 }
2445 }
2446
2447 static void keysCommand(redisClient *c) {
2448 dictIterator *di;
2449 dictEntry *de;
2450 sds pattern = c->argv[1]->ptr;
2451 int plen = sdslen(pattern);
2452 int numkeys = 0, keyslen = 0;
2453 robj *lenobj = createObject(REDIS_STRING,NULL);
2454
2455 di = dictGetIterator(c->db->dict);
2456 if (!di) oom("dictGetIterator");
2457 addReply(c,lenobj);
2458 decrRefCount(lenobj);
2459 while((de = dictNext(di)) != NULL) {
2460 robj *keyobj = dictGetEntryKey(de);
2461
2462 sds key = keyobj->ptr;
2463 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2464 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2465 if (expireIfNeeded(c->db,keyobj) == 0) {
2466 if (numkeys != 0)
2467 addReply(c,shared.space);
2468 addReply(c,keyobj);
2469 numkeys++;
2470 keyslen += sdslen(key);
2471 }
2472 }
2473 }
2474 dictReleaseIterator(di);
2475 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2476 addReply(c,shared.crlf);
2477 }
2478
2479 static void dbsizeCommand(redisClient *c) {
2480 addReplySds(c,
2481 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2482 }
2483
2484 static void lastsaveCommand(redisClient *c) {
2485 addReplySds(c,
2486 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2487 }
2488
2489 static void typeCommand(redisClient *c) {
2490 robj *o;
2491 char *type;
2492
2493 o = lookupKeyRead(c->db,c->argv[1]);
2494 if (o == NULL) {
2495 type = "+none";
2496 } else {
2497 switch(o->type) {
2498 case REDIS_STRING: type = "+string"; break;
2499 case REDIS_LIST: type = "+list"; break;
2500 case REDIS_SET: type = "+set"; break;
2501 default: type = "unknown"; break;
2502 }
2503 }
2504 addReplySds(c,sdsnew(type));
2505 addReply(c,shared.crlf);
2506 }
2507
2508 static void saveCommand(redisClient *c) {
2509 if (server.bgsaveinprogress) {
2510 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2511 return;
2512 }
2513 if (rdbSave(server.dbfilename) == REDIS_OK) {
2514 addReply(c,shared.ok);
2515 } else {
2516 addReply(c,shared.err);
2517 }
2518 }
2519
2520 static void bgsaveCommand(redisClient *c) {
2521 if (server.bgsaveinprogress) {
2522 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2523 return;
2524 }
2525 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2526 addReply(c,shared.ok);
2527 } else {
2528 addReply(c,shared.err);
2529 }
2530 }
2531
2532 static void shutdownCommand(redisClient *c) {
2533 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2534 if (server.bgsaveinprogress) {
2535 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2536 signal(SIGCHLD, SIG_IGN);
2537 kill(server.bgsavechildpid,SIGKILL);
2538 }
2539 if (rdbSave(server.dbfilename) == REDIS_OK) {
2540 if (server.daemonize)
2541 unlink(server.pidfile);
2542 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2543 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2544 exit(1);
2545 } else {
2546 signal(SIGCHLD, SIG_DFL);
2547 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2548 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2549 }
2550 }
2551
2552 static void renameGenericCommand(redisClient *c, int nx) {
2553 robj *o;
2554
2555 /* To use the same key as src and dst is probably an error */
2556 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2557 addReply(c,shared.sameobjecterr);
2558 return;
2559 }
2560
2561 o = lookupKeyWrite(c->db,c->argv[1]);
2562 if (o == NULL) {
2563 addReply(c,shared.nokeyerr);
2564 return;
2565 }
2566 incrRefCount(o);
2567 deleteIfVolatile(c->db,c->argv[2]);
2568 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2569 if (nx) {
2570 decrRefCount(o);
2571 addReply(c,shared.czero);
2572 return;
2573 }
2574 dictReplace(c->db->dict,c->argv[2],o);
2575 } else {
2576 incrRefCount(c->argv[2]);
2577 }
2578 deleteKey(c->db,c->argv[1]);
2579 server.dirty++;
2580 addReply(c,nx ? shared.cone : shared.ok);
2581 }
2582
2583 static void renameCommand(redisClient *c) {
2584 renameGenericCommand(c,0);
2585 }
2586
2587 static void renamenxCommand(redisClient *c) {
2588 renameGenericCommand(c,1);
2589 }
2590
2591 static void moveCommand(redisClient *c) {
2592 robj *o;
2593 redisDb *src, *dst;
2594 int srcid;
2595
2596 /* Obtain source and target DB pointers */
2597 src = c->db;
2598 srcid = c->db->id;
2599 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2600 addReply(c,shared.outofrangeerr);
2601 return;
2602 }
2603 dst = c->db;
2604 selectDb(c,srcid); /* Back to the source DB */
2605
2606 /* If the user is moving using as target the same
2607 * DB as the source DB it is probably an error. */
2608 if (src == dst) {
2609 addReply(c,shared.sameobjecterr);
2610 return;
2611 }
2612
2613 /* Check if the element exists and get a reference */
2614 o = lookupKeyWrite(c->db,c->argv[1]);
2615 if (!o) {
2616 addReply(c,shared.czero);
2617 return;
2618 }
2619
2620 /* Try to add the element to the target DB */
2621 deleteIfVolatile(dst,c->argv[1]);
2622 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2623 addReply(c,shared.czero);
2624 return;
2625 }
2626 incrRefCount(c->argv[1]);
2627 incrRefCount(o);
2628
2629 /* OK! key moved, free the entry in the source DB */
2630 deleteKey(src,c->argv[1]);
2631 server.dirty++;
2632 addReply(c,shared.cone);
2633 }
2634
2635 /* =================================== Lists ================================ */
2636 static void pushGenericCommand(redisClient *c, int where) {
2637 robj *lobj;
2638 list *list;
2639
2640 lobj = lookupKeyWrite(c->db,c->argv[1]);
2641 if (lobj == NULL) {
2642 lobj = createListObject();
2643 list = lobj->ptr;
2644 if (where == REDIS_HEAD) {
2645 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2646 } else {
2647 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2648 }
2649 dictAdd(c->db->dict,c->argv[1],lobj);
2650 incrRefCount(c->argv[1]);
2651 incrRefCount(c->argv[2]);
2652 } else {
2653 if (lobj->type != REDIS_LIST) {
2654 addReply(c,shared.wrongtypeerr);
2655 return;
2656 }
2657 list = lobj->ptr;
2658 if (where == REDIS_HEAD) {
2659 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2660 } else {
2661 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2662 }
2663 incrRefCount(c->argv[2]);
2664 }
2665 server.dirty++;
2666 addReply(c,shared.ok);
2667 }
2668
2669 static void lpushCommand(redisClient *c) {
2670 pushGenericCommand(c,REDIS_HEAD);
2671 }
2672
2673 static void rpushCommand(redisClient *c) {
2674 pushGenericCommand(c,REDIS_TAIL);
2675 }
2676
2677 static void llenCommand(redisClient *c) {
2678 robj *o;
2679 list *l;
2680
2681 o = lookupKeyRead(c->db,c->argv[1]);
2682 if (o == NULL) {
2683 addReply(c,shared.czero);
2684 return;
2685 } else {
2686 if (o->type != REDIS_LIST) {
2687 addReply(c,shared.wrongtypeerr);
2688 } else {
2689 l = o->ptr;
2690 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2691 }
2692 }
2693 }
2694
2695 static void lindexCommand(redisClient *c) {
2696 robj *o;
2697 int index = atoi(c->argv[2]->ptr);
2698
2699 o = lookupKeyRead(c->db,c->argv[1]);
2700 if (o == NULL) {
2701 addReply(c,shared.nullbulk);
2702 } else {
2703 if (o->type != REDIS_LIST) {
2704 addReply(c,shared.wrongtypeerr);
2705 } else {
2706 list *list = o->ptr;
2707 listNode *ln;
2708
2709 ln = listIndex(list, index);
2710 if (ln == NULL) {
2711 addReply(c,shared.nullbulk);
2712 } else {
2713 robj *ele = listNodeValue(ln);
2714 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2715 addReply(c,ele);
2716 addReply(c,shared.crlf);
2717 }
2718 }
2719 }
2720 }
2721
2722 static void lsetCommand(redisClient *c) {
2723 robj *o;
2724 int index = atoi(c->argv[2]->ptr);
2725
2726 o = lookupKeyWrite(c->db,c->argv[1]);
2727 if (o == NULL) {
2728 addReply(c,shared.nokeyerr);
2729 } else {
2730 if (o->type != REDIS_LIST) {
2731 addReply(c,shared.wrongtypeerr);
2732 } else {
2733 list *list = o->ptr;
2734 listNode *ln;
2735
2736 ln = listIndex(list, index);
2737 if (ln == NULL) {
2738 addReply(c,shared.outofrangeerr);
2739 } else {
2740 robj *ele = listNodeValue(ln);
2741
2742 decrRefCount(ele);
2743 listNodeValue(ln) = c->argv[3];
2744 incrRefCount(c->argv[3]);
2745 addReply(c,shared.ok);
2746 server.dirty++;
2747 }
2748 }
2749 }
2750 }
2751
2752 static void popGenericCommand(redisClient *c, int where) {
2753 robj *o;
2754
2755 o = lookupKeyWrite(c->db,c->argv[1]);
2756 if (o == NULL) {
2757 addReply(c,shared.nullbulk);
2758 } else {
2759 if (o->type != REDIS_LIST) {
2760 addReply(c,shared.wrongtypeerr);
2761 } else {
2762 list *list = o->ptr;
2763 listNode *ln;
2764
2765 if (where == REDIS_HEAD)
2766 ln = listFirst(list);
2767 else
2768 ln = listLast(list);
2769
2770 if (ln == NULL) {
2771 addReply(c,shared.nullbulk);
2772 } else {
2773 robj *ele = listNodeValue(ln);
2774 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2775 addReply(c,ele);
2776 addReply(c,shared.crlf);
2777 listDelNode(list,ln);
2778 server.dirty++;
2779 }
2780 }
2781 }
2782 }
2783
2784 static void lpopCommand(redisClient *c) {
2785 popGenericCommand(c,REDIS_HEAD);
2786 }
2787
2788 static void rpopCommand(redisClient *c) {
2789 popGenericCommand(c,REDIS_TAIL);
2790 }
2791
2792 static void lrangeCommand(redisClient *c) {
2793 robj *o;
2794 int start = atoi(c->argv[2]->ptr);
2795 int end = atoi(c->argv[3]->ptr);
2796
2797 o = lookupKeyRead(c->db,c->argv[1]);
2798 if (o == NULL) {
2799 addReply(c,shared.nullmultibulk);
2800 } else {
2801 if (o->type != REDIS_LIST) {
2802 addReply(c,shared.wrongtypeerr);
2803 } else {
2804 list *list = o->ptr;
2805 listNode *ln;
2806 int llen = listLength(list);
2807 int rangelen, j;
2808 robj *ele;
2809
2810 /* convert negative indexes */
2811 if (start < 0) start = llen+start;
2812 if (end < 0) end = llen+end;
2813 if (start < 0) start = 0;
2814 if (end < 0) end = 0;
2815
2816 /* indexes sanity checks */
2817 if (start > end || start >= llen) {
2818 /* Out of range start or start > end result in empty list */
2819 addReply(c,shared.emptymultibulk);
2820 return;
2821 }
2822 if (end >= llen) end = llen-1;
2823 rangelen = (end-start)+1;
2824
2825 /* Return the result in form of a multi-bulk reply */
2826 ln = listIndex(list, start);
2827 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2828 for (j = 0; j < rangelen; j++) {
2829 ele = listNodeValue(ln);
2830 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2831 addReply(c,ele);
2832 addReply(c,shared.crlf);
2833 ln = ln->next;
2834 }
2835 }
2836 }
2837 }
2838
2839 static void ltrimCommand(redisClient *c) {
2840 robj *o;
2841 int start = atoi(c->argv[2]->ptr);
2842 int end = atoi(c->argv[3]->ptr);
2843
2844 o = lookupKeyWrite(c->db,c->argv[1]);
2845 if (o == NULL) {
2846 addReply(c,shared.nokeyerr);
2847 } else {
2848 if (o->type != REDIS_LIST) {
2849 addReply(c,shared.wrongtypeerr);
2850 } else {
2851 list *list = o->ptr;
2852 listNode *ln;
2853 int llen = listLength(list);
2854 int j, ltrim, rtrim;
2855
2856 /* convert negative indexes */
2857 if (start < 0) start = llen+start;
2858 if (end < 0) end = llen+end;
2859 if (start < 0) start = 0;
2860 if (end < 0) end = 0;
2861
2862 /* indexes sanity checks */
2863 if (start > end || start >= llen) {
2864 /* Out of range start or start > end result in empty list */
2865 ltrim = llen;
2866 rtrim = 0;
2867 } else {
2868 if (end >= llen) end = llen-1;
2869 ltrim = start;
2870 rtrim = llen-end-1;
2871 }
2872
2873 /* Remove list elements to perform the trim */
2874 for (j = 0; j < ltrim; j++) {
2875 ln = listFirst(list);
2876 listDelNode(list,ln);
2877 }
2878 for (j = 0; j < rtrim; j++) {
2879 ln = listLast(list);
2880 listDelNode(list,ln);
2881 }
2882 addReply(c,shared.ok);
2883 server.dirty++;
2884 }
2885 }
2886 }
2887
2888 static void lremCommand(redisClient *c) {
2889 robj *o;
2890
2891 o = lookupKeyWrite(c->db,c->argv[1]);
2892 if (o == NULL) {
2893 addReply(c,shared.czero);
2894 } else {
2895 if (o->type != REDIS_LIST) {
2896 addReply(c,shared.wrongtypeerr);
2897 } else {
2898 list *list = o->ptr;
2899 listNode *ln, *next;
2900 int toremove = atoi(c->argv[2]->ptr);
2901 int removed = 0;
2902 int fromtail = 0;
2903
2904 if (toremove < 0) {
2905 toremove = -toremove;
2906 fromtail = 1;
2907 }
2908 ln = fromtail ? list->tail : list->head;
2909 while (ln) {
2910 robj *ele = listNodeValue(ln);
2911
2912 next = fromtail ? ln->prev : ln->next;
2913 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2914 listDelNode(list,ln);
2915 server.dirty++;
2916 removed++;
2917 if (toremove && removed == toremove) break;
2918 }
2919 ln = next;
2920 }
2921 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2922 }
2923 }
2924 }
2925
2926 /* ==================================== Sets ================================ */
2927
2928 static void saddCommand(redisClient *c) {
2929 robj *set;
2930
2931 set = lookupKeyWrite(c->db,c->argv[1]);
2932 if (set == NULL) {
2933 set = createSetObject();
2934 dictAdd(c->db->dict,c->argv[1],set);
2935 incrRefCount(c->argv[1]);
2936 } else {
2937 if (set->type != REDIS_SET) {
2938 addReply(c,shared.wrongtypeerr);
2939 return;
2940 }
2941 }
2942 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2943 incrRefCount(c->argv[2]);
2944 server.dirty++;
2945 addReply(c,shared.cone);
2946 } else {
2947 addReply(c,shared.czero);
2948 }
2949 }
2950
2951 static void sremCommand(redisClient *c) {
2952 robj *set;
2953
2954 set = lookupKeyWrite(c->db,c->argv[1]);
2955 if (set == NULL) {
2956 addReply(c,shared.czero);
2957 } else {
2958 if (set->type != REDIS_SET) {
2959 addReply(c,shared.wrongtypeerr);
2960 return;
2961 }
2962 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2963 server.dirty++;
2964 addReply(c,shared.cone);
2965 } else {
2966 addReply(c,shared.czero);
2967 }
2968 }
2969 }
2970
2971 static void smoveCommand(redisClient *c) {
2972 robj *srcset, *dstset;
2973
2974 srcset = lookupKeyWrite(c->db,c->argv[1]);
2975 dstset = lookupKeyWrite(c->db,c->argv[2]);
2976
2977 /* If the source key does not exist return 0, if it's of the wrong type
2978 * raise an error */
2979 if (srcset == NULL || srcset->type != REDIS_SET) {
2980 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2981 return;
2982 }
2983 /* Error if the destination key is not a set as well */
2984 if (dstset && dstset->type != REDIS_SET) {
2985 addReply(c,shared.wrongtypeerr);
2986 return;
2987 }
2988 /* Remove the element from the source set */
2989 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2990 /* Key not found in the src set! return zero */
2991 addReply(c,shared.czero);
2992 return;
2993 }
2994 server.dirty++;
2995 /* Add the element to the destination set */
2996 if (!dstset) {
2997 dstset = createSetObject();
2998 dictAdd(c->db->dict,c->argv[2],dstset);
2999 incrRefCount(c->argv[2]);
3000 }
3001 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3002 incrRefCount(c->argv[3]);
3003 addReply(c,shared.cone);
3004 }
3005
3006 static void sismemberCommand(redisClient *c) {
3007 robj *set;
3008
3009 set = lookupKeyRead(c->db,c->argv[1]);
3010 if (set == NULL) {
3011 addReply(c,shared.czero);
3012 } else {
3013 if (set->type != REDIS_SET) {
3014 addReply(c,shared.wrongtypeerr);
3015 return;
3016 }
3017 if (dictFind(set->ptr,c->argv[2]))
3018 addReply(c,shared.cone);
3019 else
3020 addReply(c,shared.czero);
3021 }
3022 }
3023
3024 static void scardCommand(redisClient *c) {
3025 robj *o;
3026 dict *s;
3027
3028 o = lookupKeyRead(c->db,c->argv[1]);
3029 if (o == NULL) {
3030 addReply(c,shared.czero);
3031 return;
3032 } else {
3033 if (o->type != REDIS_SET) {
3034 addReply(c,shared.wrongtypeerr);
3035 } else {
3036 s = o->ptr;
3037 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3038 dictSize(s)));
3039 }
3040 }
3041 }
3042
3043 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3044 dict **d1 = (void*) s1, **d2 = (void*) s2;
3045
3046 return dictSize(*d1)-dictSize(*d2);
3047 }
3048
3049 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3050 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3051 dictIterator *di;
3052 dictEntry *de;
3053 robj *lenobj = NULL, *dstset = NULL;
3054 int j, cardinality = 0;
3055
3056 if (!dv) oom("sinterGenericCommand");
3057 for (j = 0; j < setsnum; j++) {
3058 robj *setobj;
3059
3060 setobj = dstkey ?
3061 lookupKeyWrite(c->db,setskeys[j]) :
3062 lookupKeyRead(c->db,setskeys[j]);
3063 if (!setobj) {
3064 zfree(dv);
3065 if (dstkey) {
3066 deleteKey(c->db,dstkey);
3067 addReply(c,shared.ok);
3068 } else {
3069 addReply(c,shared.nullmultibulk);
3070 }
3071 return;
3072 }
3073 if (setobj->type != REDIS_SET) {
3074 zfree(dv);
3075 addReply(c,shared.wrongtypeerr);
3076 return;
3077 }
3078 dv[j] = setobj->ptr;
3079 }
3080 /* Sort sets from the smallest to largest, this will improve our
3081 * algorithm's performace */
3082 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3083
3084 /* The first thing we should output is the total number of elements...
3085 * since this is a multi-bulk write, but at this stage we don't know
3086 * the intersection set size, so we use a trick, append an empty object
3087 * to the output list and save the pointer to later modify it with the
3088 * right length */
3089 if (!dstkey) {
3090 lenobj = createObject(REDIS_STRING,NULL);
3091 addReply(c,lenobj);
3092 decrRefCount(lenobj);
3093 } else {
3094 /* If we have a target key where to store the resulting set
3095 * create this key with an empty set inside */
3096 dstset = createSetObject();
3097 }
3098
3099 /* Iterate all the elements of the first (smallest) set, and test
3100 * the element against all the other sets, if at least one set does
3101 * not include the element it is discarded */
3102 di = dictGetIterator(dv[0]);
3103 if (!di) oom("dictGetIterator");
3104
3105 while((de = dictNext(di)) != NULL) {
3106 robj *ele;
3107
3108 for (j = 1; j < setsnum; j++)
3109 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3110 if (j != setsnum)
3111 continue; /* at least one set does not contain the member */
3112 ele = dictGetEntryKey(de);
3113 if (!dstkey) {
3114 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3115 addReply(c,ele);
3116 addReply(c,shared.crlf);
3117 cardinality++;
3118 } else {
3119 dictAdd(dstset->ptr,ele,NULL);
3120 incrRefCount(ele);
3121 }
3122 }
3123 dictReleaseIterator(di);
3124
3125 if (dstkey) {
3126 /* Store the resulting set into the target */
3127 deleteKey(c->db,dstkey);
3128 dictAdd(c->db->dict,dstkey,dstset);
3129 incrRefCount(dstkey);
3130 }
3131
3132 if (!dstkey) {
3133 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3134 } else {
3135 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3136 dictSize((dict*)dstset->ptr)));
3137 server.dirty++;
3138 }
3139 zfree(dv);
3140 }
3141
3142 static void sinterCommand(redisClient *c) {
3143 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3144 }
3145
3146 static void sinterstoreCommand(redisClient *c) {
3147 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3148 }
3149
3150 #define REDIS_OP_UNION 0
3151 #define REDIS_OP_DIFF 1
3152
3153 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3154 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3155 dictIterator *di;
3156 dictEntry *de;
3157 robj *dstset = NULL;
3158 int j, cardinality = 0;
3159
3160 if (!dv) oom("sunionDiffGenericCommand");
3161 for (j = 0; j < setsnum; j++) {
3162 robj *setobj;
3163
3164 setobj = dstkey ?
3165 lookupKeyWrite(c->db,setskeys[j]) :
3166 lookupKeyRead(c->db,setskeys[j]);
3167 if (!setobj) {
3168 dv[j] = NULL;
3169 continue;
3170 }
3171 if (setobj->type != REDIS_SET) {
3172 zfree(dv);
3173 addReply(c,shared.wrongtypeerr);
3174 return;
3175 }
3176 dv[j] = setobj->ptr;
3177 }
3178
3179 /* We need a temp set object to store our union. If the dstkey
3180 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3181 * this set object will be the resulting object to set into the target key*/
3182 dstset = createSetObject();
3183
3184 /* Iterate all the elements of all the sets, add every element a single
3185 * time to the result set */
3186 for (j = 0; j < setsnum; j++) {
3187 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3188 if (!dv[j]) continue; /* non existing keys are like empty sets */
3189
3190 di = dictGetIterator(dv[j]);
3191 if (!di) oom("dictGetIterator");
3192
3193 while((de = dictNext(di)) != NULL) {
3194 robj *ele;
3195
3196 /* dictAdd will not add the same element multiple times */
3197 ele = dictGetEntryKey(de);
3198 if (op == REDIS_OP_UNION || j == 0) {
3199 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3200 incrRefCount(ele);
3201 cardinality++;
3202 }
3203 } else if (op == REDIS_OP_DIFF) {
3204 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3205 cardinality--;
3206 }
3207 }
3208 }
3209 dictReleaseIterator(di);
3210
3211 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3212 }
3213
3214 /* Output the content of the resulting set, if not in STORE mode */
3215 if (!dstkey) {
3216 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3217 di = dictGetIterator(dstset->ptr);
3218 if (!di) oom("dictGetIterator");
3219 while((de = dictNext(di)) != NULL) {
3220 robj *ele;
3221
3222 ele = dictGetEntryKey(de);
3223 addReplySds(c,sdscatprintf(sdsempty(),
3224 "$%d\r\n",sdslen(ele->ptr)));
3225 addReply(c,ele);
3226 addReply(c,shared.crlf);
3227 }
3228 dictReleaseIterator(di);
3229 } else {
3230 /* If we have a target key where to store the resulting set
3231 * create this key with the result set inside */
3232 deleteKey(c->db,dstkey);
3233 dictAdd(c->db->dict,dstkey,dstset);
3234 incrRefCount(dstkey);
3235 }
3236
3237 /* Cleanup */
3238 if (!dstkey) {
3239 decrRefCount(dstset);
3240 } else {
3241 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3242 dictSize((dict*)dstset->ptr)));
3243 server.dirty++;
3244 }
3245 zfree(dv);
3246 }
3247
3248 static void sunionCommand(redisClient *c) {
3249 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3250 }
3251
3252 static void sunionstoreCommand(redisClient *c) {
3253 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3254 }
3255
3256 static void sdiffCommand(redisClient *c) {
3257 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3258 }
3259
3260 static void sdiffstoreCommand(redisClient *c) {
3261 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3262 }
3263
3264 static void flushdbCommand(redisClient *c) {
3265 server.dirty += dictSize(c->db->dict);
3266 dictEmpty(c->db->dict);
3267 dictEmpty(c->db->expires);
3268 addReply(c,shared.ok);
3269 }
3270
3271 static void flushallCommand(redisClient *c) {
3272 server.dirty += emptyDb();
3273 addReply(c,shared.ok);
3274 rdbSave(server.dbfilename);
3275 server.dirty++;
3276 }
3277
3278 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3279 redisSortOperation *so = zmalloc(sizeof(*so));
3280 if (!so) oom("createSortOperation");
3281 so->type = type;
3282 so->pattern = pattern;
3283 return so;
3284 }
3285
3286 /* Return the value associated to the key with a name obtained
3287 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3288 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3289 char *p;
3290 sds spat, ssub;
3291 robj keyobj;
3292 int prefixlen, sublen, postfixlen;
3293 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3294 struct {
3295 long len;
3296 long free;
3297 char buf[REDIS_SORTKEY_MAX+1];
3298 } keyname;
3299
3300 spat = pattern->ptr;
3301 ssub = subst->ptr;
3302 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3303 p = strchr(spat,'*');
3304 if (!p) return NULL;
3305
3306 prefixlen = p-spat;
3307 sublen = sdslen(ssub);
3308 postfixlen = sdslen(spat)-(prefixlen+1);
3309 memcpy(keyname.buf,spat,prefixlen);
3310 memcpy(keyname.buf+prefixlen,ssub,sublen);
3311 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3312 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3313 keyname.len = prefixlen+sublen+postfixlen;
3314
3315 keyobj.refcount = 1;
3316 keyobj.type = REDIS_STRING;
3317 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3318
3319 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3320 return lookupKeyRead(db,&keyobj);
3321 }
3322
3323 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3324 * the additional parameter is not standard but a BSD-specific we have to
3325 * pass sorting parameters via the global 'server' structure */
3326 static int sortCompare(const void *s1, const void *s2) {
3327 const redisSortObject *so1 = s1, *so2 = s2;
3328 int cmp;
3329
3330 if (!server.sort_alpha) {
3331 /* Numeric sorting. Here it's trivial as we precomputed scores */
3332 if (so1->u.score > so2->u.score) {
3333 cmp = 1;
3334 } else if (so1->u.score < so2->u.score) {
3335 cmp = -1;
3336 } else {
3337 cmp = 0;
3338 }
3339 } else {
3340 /* Alphanumeric sorting */
3341 if (server.sort_bypattern) {
3342 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3343 /* At least one compare object is NULL */
3344 if (so1->u.cmpobj == so2->u.cmpobj)
3345 cmp = 0;
3346 else if (so1->u.cmpobj == NULL)
3347 cmp = -1;
3348 else
3349 cmp = 1;
3350 } else {
3351 /* We have both the objects, use strcoll */
3352 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3353 }
3354 } else {
3355 /* Compare elements directly */
3356 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3357 }
3358 }
3359 return server.sort_desc ? -cmp : cmp;
3360 }
3361
3362 /* The SORT command is the most complex command in Redis. Warning: this code
3363 * is optimized for speed and a bit less for readability */
3364 static void sortCommand(redisClient *c) {
3365 list *operations;
3366 int outputlen = 0;
3367 int desc = 0, alpha = 0;
3368 int limit_start = 0, limit_count = -1, start, end;
3369 int j, dontsort = 0, vectorlen;
3370 int getop = 0; /* GET operation counter */
3371 robj *sortval, *sortby = NULL;
3372 redisSortObject *vector; /* Resulting vector to sort */
3373
3374 /* Lookup the key to sort. It must be of the right types */
3375 sortval = lookupKeyRead(c->db,c->argv[1]);
3376 if (sortval == NULL) {
3377 addReply(c,shared.nokeyerr);
3378 return;
3379 }
3380 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3381 addReply(c,shared.wrongtypeerr);
3382 return;
3383 }
3384
3385 /* Create a list of operations to perform for every sorted element.
3386 * Operations can be GET/DEL/INCR/DECR */
3387 operations = listCreate();
3388 listSetFreeMethod(operations,zfree);
3389 j = 2;
3390
3391 /* Now we need to protect sortval incrementing its count, in the future
3392 * SORT may have options able to overwrite/delete keys during the sorting
3393 * and the sorted key itself may get destroied */
3394 incrRefCount(sortval);
3395
3396 /* The SORT command has an SQL-alike syntax, parse it */
3397 while(j < c->argc) {
3398 int leftargs = c->argc-j-1;
3399 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3400 desc = 0;
3401 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3402 desc = 1;
3403 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3404 alpha = 1;
3405 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3406 limit_start = atoi(c->argv[j+1]->ptr);
3407 limit_count = atoi(c->argv[j+2]->ptr);
3408 j+=2;
3409 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3410 sortby = c->argv[j+1];
3411 /* If the BY pattern does not contain '*', i.e. it is constant,
3412 * we don't need to sort nor to lookup the weight keys. */
3413 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3414 j++;
3415 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3416 listAddNodeTail(operations,createSortOperation(
3417 REDIS_SORT_GET,c->argv[j+1]));
3418 getop++;
3419 j++;
3420 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3421 listAddNodeTail(operations,createSortOperation(
3422 REDIS_SORT_DEL,c->argv[j+1]));
3423 j++;
3424 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3425 listAddNodeTail(operations,createSortOperation(
3426 REDIS_SORT_INCR,c->argv[j+1]));
3427 j++;
3428 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3429 listAddNodeTail(operations,createSortOperation(
3430 REDIS_SORT_DECR,c->argv[j+1]));
3431 j++;
3432 } else {
3433 decrRefCount(sortval);
3434 listRelease(operations);
3435 addReply(c,shared.syntaxerr);
3436 return;
3437 }
3438 j++;
3439 }
3440
3441 /* Load the sorting vector with all the objects to sort */
3442 vectorlen = (sortval->type == REDIS_LIST) ?
3443 listLength((list*)sortval->ptr) :
3444 dictSize((dict*)sortval->ptr);
3445 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3446 if (!vector) oom("allocating objects vector for SORT");
3447 j = 0;
3448 if (sortval->type == REDIS_LIST) {
3449 list *list = sortval->ptr;
3450 listNode *ln;
3451
3452 listRewind(list);
3453 while((ln = listYield(list))) {
3454 robj *ele = ln->value;
3455 vector[j].obj = ele;
3456 vector[j].u.score = 0;
3457 vector[j].u.cmpobj = NULL;
3458 j++;
3459 }
3460 } else {
3461 dict *set = sortval->ptr;
3462 dictIterator *di;
3463 dictEntry *setele;
3464
3465 di = dictGetIterator(set);
3466 if (!di) oom("dictGetIterator");
3467 while((setele = dictNext(di)) != NULL) {
3468 vector[j].obj = dictGetEntryKey(setele);
3469 vector[j].u.score = 0;
3470 vector[j].u.cmpobj = NULL;
3471 j++;
3472 }
3473 dictReleaseIterator(di);
3474 }
3475 assert(j == vectorlen);
3476
3477 /* Now it's time to load the right scores in the sorting vector */
3478 if (dontsort == 0) {
3479 for (j = 0; j < vectorlen; j++) {
3480 if (sortby) {
3481 robj *byval;
3482
3483 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3484 if (!byval || byval->type != REDIS_STRING) continue;
3485 if (alpha) {
3486 vector[j].u.cmpobj = byval;
3487 incrRefCount(byval);
3488 } else {
3489 vector[j].u.score = strtod(byval->ptr,NULL);
3490 }
3491 } else {
3492 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3493 }
3494 }
3495 }
3496
3497 /* We are ready to sort the vector... perform a bit of sanity check
3498 * on the LIMIT option too. We'll use a partial version of quicksort. */
3499 start = (limit_start < 0) ? 0 : limit_start;
3500 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3501 if (start >= vectorlen) {
3502 start = vectorlen-1;
3503 end = vectorlen-2;
3504 }
3505 if (end >= vectorlen) end = vectorlen-1;
3506
3507 if (dontsort == 0) {
3508 server.sort_desc = desc;
3509 server.sort_alpha = alpha;
3510 server.sort_bypattern = sortby ? 1 : 0;
3511 if (sortby && (start != 0 || end != vectorlen-1))
3512 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3513 else
3514 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3515 }
3516
3517 /* Send command output to the output buffer, performing the specified
3518 * GET/DEL/INCR/DECR operations if any. */
3519 outputlen = getop ? getop*(end-start+1) : end-start+1;
3520 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3521 for (j = start; j <= end; j++) {
3522 listNode *ln;
3523 if (!getop) {
3524 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3525 sdslen(vector[j].obj->ptr)));
3526 addReply(c,vector[j].obj);
3527 addReply(c,shared.crlf);
3528 }
3529 listRewind(operations);
3530 while((ln = listYield(operations))) {
3531 redisSortOperation *sop = ln->value;
3532 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3533 vector[j].obj);
3534
3535 if (sop->type == REDIS_SORT_GET) {
3536 if (!val || val->type != REDIS_STRING) {
3537 addReply(c,shared.nullbulk);
3538 } else {
3539 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3540 sdslen(val->ptr)));
3541 addReply(c,val);
3542 addReply(c,shared.crlf);
3543 }
3544 } else if (sop->type == REDIS_SORT_DEL) {
3545 /* TODO */
3546 }
3547 }
3548 }
3549
3550 /* Cleanup */
3551 decrRefCount(sortval);
3552 listRelease(operations);
3553 for (j = 0; j < vectorlen; j++) {
3554 if (sortby && alpha && vector[j].u.cmpobj)
3555 decrRefCount(vector[j].u.cmpobj);
3556 }
3557 zfree(vector);
3558 }
3559
3560 static void infoCommand(redisClient *c) {
3561 sds info;
3562 time_t uptime = time(NULL)-server.stat_starttime;
3563
3564 info = sdscatprintf(sdsempty(),
3565 "redis_version:%s\r\n"
3566 "uptime_in_seconds:%d\r\n"
3567 "uptime_in_days:%d\r\n"
3568 "connected_clients:%d\r\n"
3569 "connected_slaves:%d\r\n"
3570 "used_memory:%zu\r\n"
3571 "changes_since_last_save:%lld\r\n"
3572 "bgsave_in_progress:%d\r\n"
3573 "last_save_time:%d\r\n"
3574 "total_connections_received:%lld\r\n"
3575 "total_commands_processed:%lld\r\n"
3576 "role:%s\r\n"
3577 ,REDIS_VERSION,
3578 uptime,
3579 uptime/(3600*24),
3580 listLength(server.clients)-listLength(server.slaves),
3581 listLength(server.slaves),
3582 server.usedmemory,
3583 server.dirty,
3584 server.bgsaveinprogress,
3585 server.lastsave,
3586 server.stat_numconnections,
3587 server.stat_numcommands,
3588 server.masterhost == NULL ? "master" : "slave"
3589 );
3590 if (server.masterhost) {
3591 info = sdscatprintf(info,
3592 "master_host:%s\r\n"
3593 "master_port:%d\r\n"
3594 "master_link_status:%s\r\n"
3595 "master_last_io_seconds_ago:%d\r\n"
3596 ,server.masterhost,
3597 server.masterport,
3598 (server.replstate == REDIS_REPL_CONNECTED) ?
3599 "up" : "down",
3600 (int)(time(NULL)-server.master->lastinteraction)
3601 );
3602 }
3603 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3604 addReplySds(c,info);
3605 addReply(c,shared.crlf);
3606 }
3607
3608 static void monitorCommand(redisClient *c) {
3609 /* ignore MONITOR if aleady slave or in monitor mode */
3610 if (c->flags & REDIS_SLAVE) return;
3611
3612 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3613 c->slaveseldb = 0;
3614 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3615 addReply(c,shared.ok);
3616 }
3617
3618 /* ================================= Expire ================================= */
3619 static int removeExpire(redisDb *db, robj *key) {
3620 if (dictDelete(db->expires,key) == DICT_OK) {
3621 return 1;
3622 } else {
3623 return 0;
3624 }
3625 }
3626
3627 static int setExpire(redisDb *db, robj *key, time_t when) {
3628 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3629 return 0;
3630 } else {
3631 incrRefCount(key);
3632 return 1;
3633 }
3634 }
3635
3636 /* Return the expire time of the specified key, or -1 if no expire
3637 * is associated with this key (i.e. the key is non volatile) */
3638 static time_t getExpire(redisDb *db, robj *key) {
3639 dictEntry *de;
3640
3641 /* No expire? return ASAP */
3642 if (dictSize(db->expires) == 0 ||
3643 (de = dictFind(db->expires,key)) == NULL) return -1;
3644
3645 return (time_t) dictGetEntryVal(de);
3646 }
3647
3648 static int expireIfNeeded(redisDb *db, robj *key) {
3649 time_t when;
3650 dictEntry *de;
3651
3652 /* No expire? return ASAP */
3653 if (dictSize(db->expires) == 0 ||
3654 (de = dictFind(db->expires,key)) == NULL) return 0;
3655
3656 /* Lookup the expire */
3657 when = (time_t) dictGetEntryVal(de);
3658 if (time(NULL) <= when) return 0;
3659
3660 /* Delete the key */
3661 dictDelete(db->expires,key);
3662 return dictDelete(db->dict,key) == DICT_OK;
3663 }
3664
3665 static int deleteIfVolatile(redisDb *db, robj *key) {
3666 dictEntry *de;
3667
3668 /* No expire? return ASAP */
3669 if (dictSize(db->expires) == 0 ||
3670 (de = dictFind(db->expires,key)) == NULL) return 0;
3671
3672 /* Delete the key */
3673 server.dirty++;
3674 dictDelete(db->expires,key);
3675 return dictDelete(db->dict,key) == DICT_OK;
3676 }
3677
3678 static void expireCommand(redisClient *c) {
3679 dictEntry *de;
3680 int seconds = atoi(c->argv[2]->ptr);
3681
3682 de = dictFind(c->db->dict,c->argv[1]);
3683 if (de == NULL) {
3684 addReply(c,shared.czero);
3685 return;
3686 }
3687 if (seconds <= 0) {
3688 addReply(c, shared.czero);
3689 return;
3690 } else {
3691 time_t when = time(NULL)+seconds;
3692 if (setExpire(c->db,c->argv[1],when))
3693 addReply(c,shared.cone);
3694 else
3695 addReply(c,shared.czero);
3696 return;
3697 }
3698 }
3699
3700 static void ttlCommand(redisClient *c) {
3701 time_t expire;
3702 int ttl = -1;
3703
3704 expire = getExpire(c->db,c->argv[1]);
3705 if (expire != -1) {
3706 ttl = (int) (expire-time(NULL));
3707 if (ttl < 0) ttl = -1;
3708 }
3709 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3710 }
3711
3712 /* =============================== Replication ============================= */
3713
3714 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3715 ssize_t nwritten, ret = size;
3716 time_t start = time(NULL);
3717
3718 timeout++;
3719 while(size) {
3720 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3721 nwritten = write(fd,ptr,size);
3722 if (nwritten == -1) return -1;
3723 ptr += nwritten;
3724 size -= nwritten;
3725 }
3726 if ((time(NULL)-start) > timeout) {
3727 errno = ETIMEDOUT;
3728 return -1;
3729 }
3730 }
3731 return ret;
3732 }
3733
3734 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3735 ssize_t nread, totread = 0;
3736 time_t start = time(NULL);
3737
3738 timeout++;
3739 while(size) {
3740 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3741 nread = read(fd,ptr,size);
3742 if (nread == -1) return -1;
3743 ptr += nread;
3744 size -= nread;
3745 totread += nread;
3746 }
3747 if ((time(NULL)-start) > timeout) {
3748 errno = ETIMEDOUT;
3749 return -1;
3750 }
3751 }
3752 return totread;
3753 }
3754
/* Read a line from 'fd' into the 'size' bytes buffer 'ptr', one byte at a
 * time via syncRead() with the given per-byte 'timeout'.
 *
 * Reading stops at '\n' (which is not stored; a trailing '\r' is replaced
 * by the terminator) or when the buffer is full. The buffer is always
 * null terminated when size > 0. Returns the number of bytes stored before
 * the newline (a stripped '\r' is still counted), or -1 if syncRead()
 * fails.
 *
 * The remaining room is decremented for every stored byte: without that
 * the bound would never be enforced and a long line would overflow 'ptr'. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0; /* no room, not even for the terminator */
    *ptr = '\0';
    size--; /* reserve one byte for the terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            size--;
        }
    }
    return nread;
}
3775
3776 static void syncCommand(redisClient *c) {
3777 /* ignore SYNC if aleady slave or in monitor mode */
3778 if (c->flags & REDIS_SLAVE) return;
3779
3780 /* SYNC can't be issued when the server has pending data to send to
3781 * the client about already issued commands. We need a fresh reply
3782 * buffer registering the differences between the BGSAVE and the current
3783 * dataset, so that we can copy to other slaves if needed. */
3784 if (listLength(c->reply) != 0) {
3785 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3786 return;
3787 }
3788
3789 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3790 /* Here we need to check if there is a background saving operation
3791 * in progress, or if it is required to start one */
3792 if (server.bgsaveinprogress) {
3793 /* Ok a background save is in progress. Let's check if it is a good
3794 * one for replication, i.e. if there is another slave that is
3795 * registering differences since the server forked to save */
3796 redisClient *slave;
3797 listNode *ln;
3798
3799 listRewind(server.slaves);
3800 while((ln = listYield(server.slaves))) {
3801 slave = ln->value;
3802 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3803 }
3804 if (ln) {
3805 /* Perfect, the server is already registering differences for
3806 * another slave. Set the right state, and copy the buffer. */
3807 listRelease(c->reply);
3808 c->reply = listDup(slave->reply);
3809 if (!c->reply) oom("listDup copying slave reply list");
3810 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3811 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3812 } else {
3813 /* No way, we need to wait for the next BGSAVE in order to
3814 * register differences */
3815 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3816 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3817 }
3818 } else {
3819 /* Ok we don't have a BGSAVE in progress, let's start one */
3820 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3821 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3822 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3823 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3824 return;
3825 }
3826 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3827 }
3828 c->repldbfd = -1;
3829 c->flags |= REDIS_SLAVE;
3830 c->slaveseldb = 0;
3831 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3832 return;
3833 }
3834
3835 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3836 redisClient *slave = privdata;
3837 REDIS_NOTUSED(el);
3838 REDIS_NOTUSED(mask);
3839 char buf[REDIS_IOBUF_LEN];
3840 ssize_t nwritten, buflen;
3841
3842 if (slave->repldboff == 0) {
3843 /* Write the bulk write count before to transfer the DB. In theory here
3844 * we don't know how much room there is in the output buffer of the
3845 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3846 * operations) will never be smaller than the few bytes we need. */
3847 sds bulkcount;
3848
3849 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3850 slave->repldbsize);
3851 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3852 {
3853 sdsfree(bulkcount);
3854 freeClient(slave);
3855 return;
3856 }
3857 sdsfree(bulkcount);
3858 }
3859 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3860 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3861 if (buflen <= 0) {
3862 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3863 (buflen == 0) ? "premature EOF" : strerror(errno));
3864 freeClient(slave);
3865 return;
3866 }
3867 if ((nwritten = write(fd,buf,buflen)) == -1) {
3868 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3869 strerror(errno));
3870 freeClient(slave);
3871 return;
3872 }
3873 slave->repldboff += nwritten;
3874 if (slave->repldboff == slave->repldbsize) {
3875 close(slave->repldbfd);
3876 slave->repldbfd = -1;
3877 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3878 slave->replstate = REDIS_REPL_ONLINE;
3879 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3880 sendReplyToClient, slave, NULL) == AE_ERR) {
3881 freeClient(slave);
3882 return;
3883 }
3884 addReplySds(slave,sdsempty());
3885 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3886 }
3887 }
3888
3889 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3890 listNode *ln;
3891 int startbgsave = 0;
3892
3893 listRewind(server.slaves);
3894 while((ln = listYield(server.slaves))) {
3895 redisClient *slave = ln->value;
3896
3897 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3898 startbgsave = 1;
3899 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3900 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3901 struct redis_stat buf;
3902
3903 if (bgsaveerr != REDIS_OK) {
3904 freeClient(slave);
3905 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3906 continue;
3907 }
3908 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3909 redis_fstat(slave->repldbfd,&buf) == -1) {
3910 freeClient(slave);
3911 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3912 continue;
3913 }
3914 slave->repldboff = 0;
3915 slave->repldbsize = buf.st_size;
3916 slave->replstate = REDIS_REPL_SEND_BULK;
3917 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3918 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3919 freeClient(slave);
3920 continue;
3921 }
3922 }
3923 }
3924 if (startbgsave) {
3925 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3926 listRewind(server.slaves);
3927 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3928 while((ln = listYield(server.slaves))) {
3929 redisClient *slave = ln->value;
3930
3931 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3932 freeClient(slave);
3933 }
3934 }
3935 }
3936 }
3937
3938 static int syncWithMaster(void) {
3939 char buf[1024], tmpfile[256];
3940 int dumpsize;
3941 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3942 int dfd;
3943
3944 if (fd == -1) {
3945 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3946 strerror(errno));
3947 return REDIS_ERR;
3948 }
3949 /* Issue the SYNC command */
3950 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3951 close(fd);
3952 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3953 strerror(errno));
3954 return REDIS_ERR;
3955 }
3956 /* Read the bulk write count */
3957 if (syncReadLine(fd,buf,1024,3600) == -1) {
3958 close(fd);
3959 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3960 strerror(errno));
3961 return REDIS_ERR;
3962 }
3963 dumpsize = atoi(buf+1);
3964 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3965 /* Read the bulk write data on a temp file */
3966 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3967 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3968 if (dfd == -1) {
3969 close(fd);
3970 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3971 return REDIS_ERR;
3972 }
3973 while(dumpsize) {
3974 int nread, nwritten;
3975
3976 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3977 if (nread == -1) {
3978 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3979 strerror(errno));
3980 close(fd);
3981 close(dfd);
3982 return REDIS_ERR;
3983 }
3984 nwritten = write(dfd,buf,nread);
3985 if (nwritten == -1) {
3986 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3987 close(fd);
3988 close(dfd);
3989 return REDIS_ERR;
3990 }
3991 dumpsize -= nread;
3992 }
3993 close(dfd);
3994 if (rename(tmpfile,server.dbfilename) == -1) {
3995 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3996 unlink(tmpfile);
3997 close(fd);
3998 return REDIS_ERR;
3999 }
4000 emptyDb();
4001 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4002 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4003 close(fd);
4004 return REDIS_ERR;
4005 }
4006 server.master = createClient(fd);
4007 server.master->flags |= REDIS_MASTER;
4008 server.replstate = REDIS_REPL_CONNECTED;
4009 return REDIS_OK;
4010 }
4011
4012 static void slaveofCommand(redisClient *c) {
4013 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4014 !strcasecmp(c->argv[2]->ptr,"one")) {
4015 if (server.masterhost) {
4016 sdsfree(server.masterhost);
4017 server.masterhost = NULL;
4018 if (server.master) freeClient(server.master);
4019 server.replstate = REDIS_REPL_NONE;
4020 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4021 }
4022 } else {
4023 sdsfree(server.masterhost);
4024 server.masterhost = sdsdup(c->argv[1]->ptr);
4025 server.masterport = atoi(c->argv[2]->ptr);
4026 if (server.master) freeClient(server.master);
4027 server.replstate = REDIS_REPL_CONNECT;
4028 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4029 server.masterhost, server.masterport);
4030 }
4031 addReply(c,shared.ok);
4032 }
4033
4034 /* ============================ Maxmemory directive ======================== */
4035
4036 /* This function gets called when 'maxmemory' is set on the config file to limit
4037 * the max memory used by the server, and we are out of memory.
4038 * This function will try to, in order:
4039 *
4040 * - Free objects from the free list
4041 * - Try to remove keys with an EXPIRE set
4042 *
4043 * It is not possible to free enough memory to reach used-memory < maxmemory
4044 * the server will start refusing commands that will enlarge even more the
4045 * memory usage.
4046 */
4047 static void freeMemoryIfNeeded(void) {
4048 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4049 if (listLength(server.objfreelist)) {
4050 robj *o;
4051
4052 listNode *head = listFirst(server.objfreelist);
4053 o = listNodeValue(head);
4054 listDelNode(server.objfreelist,head);
4055 zfree(o);
4056 } else {
4057 int j, k, freed = 0;
4058
4059 for (j = 0; j < server.dbnum; j++) {
4060 int minttl = -1;
4061 robj *minkey = NULL;
4062 struct dictEntry *de;
4063
4064 if (dictSize(server.db[j].expires)) {
4065 freed = 1;
4066 /* From a sample of three keys drop the one nearest to
4067 * the natural expire */
4068 for (k = 0; k < 3; k++) {
4069 time_t t;
4070
4071 de = dictGetRandomKey(server.db[j].expires);
4072 t = (time_t) dictGetEntryVal(de);
4073 if (minttl == -1 || t < minttl) {
4074 minkey = dictGetEntryKey(de);
4075 minttl = t;
4076 }
4077 }
4078 deleteKey(server.db+j,minkey);
4079 }
4080 }
4081 if (!freed) return; /* nothing to free... */
4082 }
4083 }
4084 }
4085
4086 /* ================================= Debugging ============================== */
4087
4088 static void debugCommand(redisClient *c) {
4089 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4090 *((char*)-1) = 'x';
4091 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4092 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4093 robj *key, *val;
4094
4095 if (!de) {
4096 addReply(c,shared.nokeyerr);
4097 return;
4098 }
4099 key = dictGetEntryKey(de);
4100 val = dictGetEntryVal(de);
4101 addReplySds(c,sdscatprintf(sdsempty(),
4102 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4103 key, key->refcount, val, val->refcount));
4104 } else {
4105 addReplySds(c,sdsnew(
4106 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4107 }
4108 }
4109
4110 #ifdef HAVE_BACKTRACE
4111 static struct redisFunctionSym symsTable[] = {
4112 {"freeStringObject", (unsigned long)freeStringObject},
4113 {"freeListObject", (unsigned long)freeListObject},
4114 {"freeSetObject", (unsigned long)freeSetObject},
4115 {"decrRefCount", (unsigned long)decrRefCount},
4116 {"createObject", (unsigned long)createObject},
4117 {"freeClient", (unsigned long)freeClient},
4118 {"rdbLoad", (unsigned long)rdbLoad},
4119 {"addReply", (unsigned long)addReply},
4120 {"addReplySds", (unsigned long)addReplySds},
4121 {"incrRefCount", (unsigned long)incrRefCount},
4122 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4123 {"createStringObject", (unsigned long)createStringObject},
4124 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4125 {"syncWithMaster", (unsigned long)syncWithMaster},
4126 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4127 {"removeExpire", (unsigned long)removeExpire},
4128 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4129 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4130 {"deleteKey", (unsigned long)deleteKey},
4131 {"getExpire", (unsigned long)getExpire},
4132 {"setExpire", (unsigned long)setExpire},
4133 {"updateSalvesWaitingBgsave", (unsigned long)updateSalvesWaitingBgsave},
4134 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4135 {"authCommand", (unsigned long)authCommand},
4136 {"pingCommand", (unsigned long)pingCommand},
4137 {"echoCommand", (unsigned long)echoCommand},
4138 {"setCommand", (unsigned long)setCommand},
4139 {"setnxCommand", (unsigned long)setnxCommand},
4140 {"getCommand", (unsigned long)getCommand},
4141 {"delCommand", (unsigned long)delCommand},
4142 {"existsCommand", (unsigned long)existsCommand},
4143 {"incrCommand", (unsigned long)incrCommand},
4144 {"decrCommand", (unsigned long)decrCommand},
4145 {"incrbyCommand", (unsigned long)incrbyCommand},
4146 {"decrbyCommand", (unsigned long)decrbyCommand},
4147 {"selectCommand", (unsigned long)selectCommand},
4148 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4149 {"keysCommand", (unsigned long)keysCommand},
4150 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4151 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4152 {"saveCommand", (unsigned long)saveCommand},
4153 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4154 {"shutdownCommand", (unsigned long)shutdownCommand},
4155 {"moveCommand", (unsigned long)moveCommand},
4156 {"renameCommand", (unsigned long)renameCommand},
4157 {"renamenxCommand", (unsigned long)renamenxCommand},
4158 {"lpushCommand", (unsigned long)lpushCommand},
4159 {"rpushCommand", (unsigned long)rpushCommand},
4160 {"lpopCommand", (unsigned long)lpopCommand},
4161 {"rpopCommand", (unsigned long)rpopCommand},
4162 {"llenCommand", (unsigned long)llenCommand},
4163 {"lindexCommand", (unsigned long)lindexCommand},
4164 {"lrangeCommand", (unsigned long)lrangeCommand},
4165 {"ltrimCommand", (unsigned long)ltrimCommand},
4166 {"typeCommand", (unsigned long)typeCommand},
4167 {"lsetCommand", (unsigned long)lsetCommand},
4168 {"saddCommand", (unsigned long)saddCommand},
4169 {"sremCommand", (unsigned long)sremCommand},
4170 {"smoveCommand", (unsigned long)smoveCommand},
4171 {"sismemberCommand", (unsigned long)sismemberCommand},
4172 {"scardCommand", (unsigned long)scardCommand},
4173 {"sinterCommand", (unsigned long)sinterCommand},
4174 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4175 {"sunionCommand", (unsigned long)sunionCommand},
4176 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4177 {"sdiffCommand", (unsigned long)sdiffCommand},
4178 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4179 {"syncCommand", (unsigned long)syncCommand},
4180 {"flushdbCommand", (unsigned long)flushdbCommand},
4181 {"flushallCommand", (unsigned long)flushallCommand},
4182 {"sortCommand", (unsigned long)sortCommand},
4183 {"lremCommand", (unsigned long)lremCommand},
4184 {"infoCommand", (unsigned long)infoCommand},
4185 {"mgetCommand", (unsigned long)mgetCommand},
4186 {"monitorCommand", (unsigned long)monitorCommand},
4187 {"expireCommand", (unsigned long)expireCommand},
4188 {"getSetCommand", (unsigned long)getSetCommand},
4189 {"ttlCommand", (unsigned long)ttlCommand},
4190 {"slaveofCommand", (unsigned long)slaveofCommand},
4191 {"debugCommand", (unsigned long)debugCommand},
4192 {"processCommand", (unsigned long)processCommand},
4193 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4194 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4195 {NULL,0}
4196 };
4197
4198 /* This function try to convert a pointer into a function name. It's used in
4199 * oreder to provide a backtrace under segmentation fault that's able to
4200 * display functions declared as static (otherwise the backtrace is useless). */
4201 static char *findFuncName(void *pointer, unsigned long *offset){
4202 int i, ret = -1;
4203 unsigned long off, minoff = 0;
4204
4205 /* Try to match against the Symbol with the smallest offset */
4206 for (i=0; symsTable[i].pointer; i++) {
4207 unsigned long lp = (unsigned long) pointer;
4208
4209 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4210 off=lp-symsTable[i].pointer;
4211 if (ret < 0 || off < minoff) {
4212 minoff=off;
4213 ret=i;
4214 }
4215 }
4216 }
4217 if (ret == -1) return NULL;
4218 *offset = minoff;
4219 return symsTable[ret].name;
4220 }
4221
/* Return the instruction pointer saved in the machine context 'uc', that is
 * the address of the instruction that was executing when the signal was
 * delivered.
 *
 * The name and location of the register is platform specific: both the
 * 32 bit (EIP) and the 64 bit (RIP) x86 variants are handled. On platforms
 * where we don't know how to fetch it NULL is returned, so that the code
 * still compiles and the caller simply gets no extra information. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__) && (defined(__x86_64__) || defined(__amd64__))
    return (void*) uc->uc_mcontext.mc_rip;
#elif defined(__FreeBSD__) && defined(__i386__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && defined(__x86_64__)
    return (void*) uc->uc_mcontext->__ss.__rip;
#elif defined(__APPLE__) && defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__x86_64__) && defined(REG_RIP) /* Linux 64 bit */
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
#elif defined(__i386__) /* Linux 32 bit */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#else /* Unknown platform */
    (void) uc;
    return NULL;
#endif
}
4233
4234 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4235 void *trace[100];
4236 char **messages = NULL;
4237 int i, trace_size = 0;
4238 unsigned long offset=0;
4239 time_t uptime = time(NULL)-server.stat_starttime;
4240 ucontext_t *uc = (ucontext_t*) secret;
4241 REDIS_NOTUSED(info);
4242
4243 redisLog(REDIS_WARNING,
4244 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4245 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4246 "redis_version:%s; "
4247 "uptime_in_seconds:%d; "
4248 "connected_clients:%d; "
4249 "connected_slaves:%d; "
4250 "used_memory:%zu; "
4251 "changes_since_last_save:%lld; "
4252 "bgsave_in_progress:%d; "
4253 "last_save_time:%d; "
4254 "total_connections_received:%lld; "
4255 "total_commands_processed:%lld; "
4256 "role:%s;"
4257 ,REDIS_VERSION,
4258 uptime,
4259 listLength(server.clients)-listLength(server.slaves),
4260 listLength(server.slaves),
4261 server.usedmemory,
4262 server.dirty,
4263 server.bgsaveinprogress,
4264 server.lastsave,
4265 server.stat_numconnections,
4266 server.stat_numcommands,
4267 server.masterhost == NULL ? "master" : "slave"
4268 ));
4269
4270 trace_size = backtrace(trace, 100);
4271 /* overwrite sigaction with caller's address */
4272 trace[1] = getMcontextEip(uc);
4273 messages = backtrace_symbols(trace, trace_size);
4274
4275 for (i=1; i<trace_size; ++i) {
4276 char *fn = findFuncName(trace[i], &offset), *p;
4277
4278 p = strchr(messages[i],'+');
4279 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4280 redisLog(REDIS_WARNING,"%s", messages[i]);
4281 } else {
4282 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4283 }
4284 }
4285 free(messages);
4286 exit(0);
4287 }
4288
4289 static void setupSigSegvAction(void) {
4290 struct sigaction act;
4291
4292 sigemptyset (&act.sa_mask);
4293 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4294 * is used. Otherwise, sa_handler is used */
4295 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4296 act.sa_sigaction = segvHandler;
4297 sigaction (SIGSEGV, &act, NULL);
4298 sigaction (SIGBUS, &act, NULL);
4299 return;
4300 }
4301 #else /* HAVE_BACKTRACE */
/* Variant used when HAVE_BACKTRACE is not defined: intentionally a no-op,
 * no signal handler is installed. */
static void setupSigSegvAction(void) {
    return;
}
4304 #endif /* HAVE_BACKTRACE */
4305
4306 /* =================================== Main! ================================ */
4307
4308 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its first line converted
 * to an integer with atoi(). Returns -1 if the file can't be opened or
 * read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return value;
    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
4322
4323 void linuxOvercommitMemoryWarning(void) {
4324 if (linuxOvercommitMemoryValue() == 0) {
4325 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4326 }
4327 }
4328 #endif /* __linux__ */
4329
4330 static void daemonize(void) {
4331 int fd;
4332 FILE *fp;
4333
4334 if (fork() != 0) exit(0); /* parent exits */
4335 setsid(); /* create a new session */
4336
4337 /* Every output goes to /dev/null. If Redis is daemonized but
4338 * the 'logfile' is set to 'stdout' in the configuration file
4339 * it will not log at all. */
4340 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4341 dup2(fd, STDIN_FILENO);
4342 dup2(fd, STDOUT_FILENO);
4343 dup2(fd, STDERR_FILENO);
4344 if (fd > STDERR_FILENO) close(fd);
4345 }
4346 /* Try to write the pid file */
4347 fp = fopen(server.pidfile,"w");
4348 if (fp) {
4349 fprintf(fp,"%d\n",getpid());
4350 fclose(fp);
4351 }
4352 }
4353
4354 int main(int argc, char **argv) {
4355 #ifdef __linux__
4356 linuxOvercommitMemoryWarning();
4357 #endif
4358
4359 initServerConfig();
4360 if (argc == 2) {
4361 ResetServerSaveParams();
4362 loadServerConfig(argv[1]);
4363 } else if (argc > 2) {
4364 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4365 exit(1);
4366 } else {
4367 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4368 }
4369 initServer();
4370 if (server.daemonize) daemonize();
4371 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4372 if (rdbLoad(server.dbfilename) == REDIS_OK)
4373 redisLog(REDIS_NOTICE,"DB loaded from disk");
4374 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4375 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4376 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4377 aeMain(server.el);
4378 aeDeleteEventLoop(server.el);
4379 return 0;
4380 }