]> git.saurik.com Git - redis.git/blob - redis.c
a44be7bf442c3a28d5b551b0890826fcbfb29b82
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #define __USE_POSIX199309
40 #include <signal.h>
41 #include <execinfo.h>
42 #include <ucontext.h>
43 #include <sys/wait.h>
44 #include <errno.h>
45 #include <assert.h>
46 #include <ctype.h>
47 #include <stdarg.h>
48 #include <inttypes.h>
49 #include <arpa/inet.h>
50 #include <sys/stat.h>
51 #include <fcntl.h>
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #include <limits.h>
55 #include <execinfo.h>
56
57 #include "redis.h"
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
66
67 #include "config.h"
68
/* Error codes: generic success / failure return values */
#define REDIS_OK                0
#define REDIS_ERR               -1

/* Static server configuration (compile-time defaults and limits) */
#define REDIS_SERVERPORT        6379    /* TCP port */
#define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */
#define REDIS_IOBUF_LEN         1024
#define REDIS_LOADBUF_LEN       1024
#define REDIS_STATIC_ARGS       4
#define REDIS_DEFAULT_DBNUM     16
#define REDIS_CONFIGLINE_MAX    1024
#define REDIS_OBJFREELIST_MAX   1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)

/* Hash table parameters (see tryResizeHashTables()) */
#define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */
#define REDIS_HT_MINSLOTS       16384   /* Never resize the HT under this */

/* Command flags (bit flags, OR-ed together in redisCommand.flags) */
#define REDIS_CMD_BULK          1       /* Bulk write command */
#define REDIS_CMD_INLINE        2       /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM       4
98
/* Object types (values stored in redisObject.type) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_HASH 3

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys would require a lot of space, so we check the two most significant
 * bits of the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specifies the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When the length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3         /* string compressed with FASTLZ */
136
/* Client flags (bit flags, OR-ed together in redisClient.flags) */
#define REDIS_CLOSE 1       /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2       /* This client is a slave server */
#define REDIS_MASTER 4      /* This client is a master server */
#define REDIS_MONITOR 8      /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0   /* No active replication */
#define REDIS_REPL_CONNECT 1    /* Must connect to master */
#define REDIS_REPL_CONNECTED 2  /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff: which end of a list an operation works on */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations (values for redisSortOperation.type) and sort limits */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels, in increasing order of severity. redisLog() emits a message
 * only when its level is >= server.verbosity. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: marks a variable or parameter as intentionally unused */
#define REDIS_NOTUSED(V) ((void) V)
177
178
179 /*================================= Data types ============================== */
180
/* A redis object, that is a type able to hold a string / list / set */
typedef struct redisObject {
    void *ptr;      /* type-dependent payload (an sds string for REDIS_STRING
                     * objects, see dictSdsHash()) */
    int type;       /* one of the REDIS_STRING / REDIS_LIST / REDIS_SET /
                     * REDIS_HASH object types */
    int refcount;   /* reference count; presumably managed through
                     * incrRefCount() / decrRefCount() - confirm there */
} robj;
187
/* One logical database. initServer() allocates server.dbnum of these. */
typedef struct redisDb {
    dict *dict;     /* main table, created with hashDictType */
    dict *expires;  /* keys with a timeout set; serverCron() reads each entry
                     * value as the time_t at which the key expires */
    int id;         /* index of this database inside server.db[] */
} redisDb;
193
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;                 /* client socket descriptor - presumably; confirm
                             * against the accept/create-client code */
    redisDb *db;            /* presumably the currently selected database */
    int dictid;
    sds querybuf;           /* presumably the buffer of not yet parsed input */
    robj **argv;            /* arguments of the command being processed */
    int argc;               /* number of entries in argv */
    int bulklen;            /* bulk read len. -1 if not in bulk read mode */
    list *reply;            /* presumably the list of pending reply objects */
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags;              /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER |
                             * REDIS_MONITOR */
    int slaveseldb;         /* slave selected db, if this client is a slave */
    int authenticated;      /* when requirepass is non-NULL */
    int replstate;          /* replication state if this is a slave */
    int repldbfd;           /* replication DB file descriptor */
    long repldboff;         /* replication DB file offset */
    off_t repldbsize;       /* replication DB file size */
} redisClient;
215
/* One automatic-save rule. serverCron() starts a background save when at
 * least 'changes' modifications are pending (server.dirty) and more than
 * 'seconds' seconds have elapsed since server.lastsave. */
struct saveparam {
    time_t seconds;     /* minimum age of the last save, in seconds */
    int changes;        /* minimum number of changes since the last save */
};
220
/* Global server state structure */
struct redisServer {
    int port;                   /* TCP port to listen on */
    int fd;                     /* listening socket (from anetTcpServer()) */
    redisDb *db;                /* array of 'dbnum' databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty;            /* changes to DB from the last save */
    list *clients;              /* all the connected clients */
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];  /* buffer for networking error messages */
    aeEventLoop *el;            /* the event loop */
    int cronloops;              /* number of times the cron function run */
    list *objfreelist;          /* A list of freed objects to avoid malloc() */
    time_t lastsave;            /* Unix time of last successful save */
    size_t usedmemory;          /* Used memory as reported by
                                 * zmalloc_used_memory(); serverCron() logs
                                 * this value as bytes */
    /* Fields used only for stats */
    time_t stat_starttime;         /* server start time */
    long long stat_numcommands;    /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity;              /* minimum log level emitted by redisLog() */
    int glueoutputbuf;
    int maxidletime;            /* client idle timeout in seconds, 0 disables
                                 * the check in serverCron() */
    int dbnum;                  /* number of entries in db[] */
    int daemonize;
    char *pidfile;
    int bgsaveinprogress;       /* non-zero while a background save runs */
    pid_t bgsavechildpid;       /* -1 when no background save is running */
    struct saveparam *saveparams; /* array of automatic-save rules */
    int saveparamslen;          /* number of entries in saveparams */
    char *logfile;              /* NULL = log on standard output */
    char *bindaddr;
    char *dbfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master;    /* client that is master for this slave */
    int replstate;          /* one of the slave-side REDIS_REPL_* states */
    unsigned int maxclients;
    unsigned int maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
271
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);

/* One entry of the command table (see cmdTable). */
struct redisCommand {
    char *name;             /* command name, lower case in cmdTable */
    redisCommandProc *proc; /* function implementing the command */
    int arity;              /* expected argument count. Negative values are
                             * used in cmdTable for variadic commands (e.g.
                             * del, mget) - presumably "at least -arity
                             * arguments"; confirm in processCommand() */
    int flags;              /* OR of the REDIS_CMD_* flags */
};
279
/* Pairs a function name with its address.
 * NOTE(review): presumably used to resolve addresses to names in the SIGSEGV
 * handler - confirm against the code that uses it. */
struct redisFunctionSym {
    char *name;             /* function name */
    unsigned long pointer;  /* function address stored as an integer */
};
284
/* One element handled by the SORT command: the object itself plus the value
 * it is compared by.
 * NOTE(review): which union member is active presumably depends on the
 * server.sort_alpha / server.sort_bypattern settings - confirm in
 * sortCompare(). */
typedef struct _redisSortObject {
    robj *obj;          /* the element being sorted */
    union {
        double score;   /* numeric sort weight */
        robj *cmpobj;   /* object used for the comparison */
    } u;
} redisSortObject;
292
/* An operation attached to a SORT command. */
typedef struct _redisSortOperation {
    int type;       /* presumably one of the REDIS_SORT_* operations */
    robj *pattern;  /* pattern argument of the operation */
} redisSortOperation;
297
/* Frequently used reply strings, allocated once by createSharedObjects() and
 * kept in the global 'shared'. The selectN members hold the text of a
 * "select N" command for databases 0..9. */
struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
306
307 /*================================ Prototypes =============================== */
308
/* Forward declarations of file-local helpers that are used before they are
 * defined. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSalvesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void segvHandler(int sig, siginfo_t *info, void *secret);
static void setupSigSegvAction(void);

/* Command implementations: one function per entry of cmdTable, all sharing
 * the redisCommandProc signature. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void getSetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
393 /*================================= Globals ================================= */
394
/* Global vars */
static struct redisServer server; /* server global state */

/* Table of the supported commands. Each entry is
 * {name, implementation, arity, flags}; the table is terminated by an entry
 * whose name is NULL.
 * NOTE(review): negative arities are used for the variadic commands,
 * presumably meaning "at least -arity arguments" - confirm in
 * processCommand(). */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    /* smembers reuses the sinter implementation with a single key */
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
458 /*============================ Utility functions ============================ */
459
/* Lower-case a value that was read from a plain 'char'. tolower() is only
 * defined for EOF and values representable as unsigned char, so bytes that
 * show up as negative (high bytes where 'char' is signed) are returned
 * unchanged instead of being handed to tolower(). */
static int stringmatchLower(int c) {
    return (c < 0) ? c : tolower(c);
}

/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be NUL terminated:
 * no byte outside of the given lengths is ever read.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte out of the listed ones
 *   [^abc]   one byte not in the list
 *   [a-z]    one byte inside the range (the bounds may be given reversed)
 *   \x       the byte x taken literally (also valid inside [...])
 *
 * An unterminated '[' class extends up to the end of the pattern.
 * When 'nocase' is non-zero literal bytes and ranges are compared case
 * insensitively.
 *
 * Returns 1 on match, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try to match the rest of the pattern at every position */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
            break;
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one byte of the string */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back on the last byte so that
                     * the common advance after the switch ends the pattern */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    /* Escaped byte. A backslash closing the pattern has
                     * nothing to escape and is handled below as a literal. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = stringmatchLower(start);
                        end = stringmatchLower(end);
                        c = stringmatchLower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (stringmatchLower(pattern[0]) ==
                            stringmatchLower(string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs one byte of the string to compare against */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (stringmatchLower(pattern[0]) !=
                    stringmatchLower(string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String consumed: only trailing '*' may be left in the pattern */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
582
583 static void redisLog(int level, const char *fmt, ...) {
584 va_list ap;
585 FILE *fp;
586
587 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
588 if (!fp) return;
589
590 va_start(ap, fmt);
591 if (level >= server.verbosity) {
592 char *c = ".-*";
593 char buf[64];
594 time_t now;
595
596 now = time(NULL);
597 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
598 fprintf(fp,"%s %c ",buf,c[level]);
599 vfprintf(fp, fmt, ap);
600 fprintf(fp,"\n");
601 fflush(fp);
602 }
603 va_end(ap);
604
605 if (server.logfile) fclose(fp);
606 }
607
608 /*====================== Hash table type implementation ==================== */
609
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
613
614 static int sdsDictKeyCompare(void *privdata, const void *key1,
615 const void *key2)
616 {
617 int l1,l2;
618 DICT_NOTUSED(privdata);
619
620 l1 = sdslen((sds)key1);
621 l2 = sdslen((sds)key2);
622 if (l1 != l2) return 0;
623 return memcmp(key1, key2, l1) == 0;
624 }
625
/* Dict destructor callback for keys / values that are redis objects:
 * releases the reference held by the hash table by calling decrRefCount()
 * on the entry. 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    decrRefCount(val);
}
632
633 static int dictSdsKeyCompare(void *privdata, const void *key1,
634 const void *key2)
635 {
636 const robj *o1 = key1, *o2 = key2;
637 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
638 }
639
640 static unsigned int dictSdsHash(const void *key) {
641 const robj *o = key;
642 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
643 }
644
/* Dict type whose keys are redis string objects and whose values are not
 * owned by the table: only the key is released when an entry is removed.
 * Used by initServer() for the sharing pool and for the per-database
 * 'expires' tables. */
static dictType setDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};
653
/* Dict type whose keys and values are both redis objects: both are released
 * with decrRefCount() when an entry is removed. Used by initServer() for the
 * main table of every database. */
static dictType hashDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};
662
663 /* ========================= Random utility functions ======================= */
664
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings. It is not clear whether it would be
 * possible to report this condition to the client, since the networking
 * layer itself is based on heap allocation for send buffers, so we simply
 * abort. At least the code will be simpler to read...
 *
 * 'msg' names the operation that failed; it is printed on standard error
 * followed by ": Out of memory". This function never returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);
    sleep(1);
    abort();
}
676
677 /* ====================== Redis server networking stuff ===================== */
678 static void closeTimedoutClients(void) {
679 redisClient *c;
680 listNode *ln;
681 time_t now = time(NULL);
682
683 listRewind(server.clients);
684 while ((ln = listYield(server.clients)) != NULL) {
685 c = listNodeValue(ln);
686 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
687 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
688 (now - c->lastinteraction > server.maxidletime)) {
689 redisLog(REDIS_DEBUG,"Closing idle client");
690 freeClient(c);
691 }
692 }
693 }
694
695 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
696 * we resize the hash table to save memory */
697 static void tryResizeHashTables(void) {
698 int j;
699
700 for (j = 0; j < server.dbnum; j++) {
701 long long size, used;
702
703 size = dictSlots(server.db[j].dict);
704 used = dictSize(server.db[j].dict);
705 if (size && used && size > REDIS_HT_MINSLOTS &&
706 (used*100/size < REDIS_HT_MINFILL)) {
707 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
708 dictResize(server.db[j].dict);
709 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
710 }
711 }
712 }
713
714 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
715 int j, loops = server.cronloops++;
716 REDIS_NOTUSED(eventLoop);
717 REDIS_NOTUSED(id);
718 REDIS_NOTUSED(clientData);
719
720 /* Update the global state with the amount of used memory */
721 server.usedmemory = zmalloc_used_memory();
722
723 /* Show some info about non-empty databases */
724 for (j = 0; j < server.dbnum; j++) {
725 long long size, used, vkeys;
726
727 size = dictSlots(server.db[j].dict);
728 used = dictSize(server.db[j].dict);
729 vkeys = dictSize(server.db[j].expires);
730 if (!(loops % 5) && used > 0) {
731 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
732 /* dictPrintStats(server.dict); */
733 }
734 }
735
736 /* We don't want to resize the hash tables while a bacground saving
737 * is in progress: the saving child is created using fork() that is
738 * implemented with a copy-on-write semantic in most modern systems, so
739 * if we resize the HT while there is the saving child at work actually
740 * a lot of memory movements in the parent will cause a lot of pages
741 * copied. */
742 if (!server.bgsaveinprogress) tryResizeHashTables();
743
744 /* Show information about connected clients */
745 if (!(loops % 5)) {
746 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
747 listLength(server.clients)-listLength(server.slaves),
748 listLength(server.slaves),
749 server.usedmemory,
750 dictSize(server.sharingpool));
751 }
752
753 /* Close connections of timedout clients */
754 if (server.maxidletime && !(loops % 10))
755 closeTimedoutClients();
756
757 /* Check if a background saving in progress terminated */
758 if (server.bgsaveinprogress) {
759 int statloc;
760 /* XXX: TODO handle the case of the saving child killed */
761 if (wait4(-1,&statloc,WNOHANG,NULL)) {
762 int exitcode = WEXITSTATUS(statloc);
763 int bysignal = WIFSIGNALED(statloc);
764
765 if (!bysignal && exitcode == 0) {
766 redisLog(REDIS_NOTICE,
767 "Background saving terminated with success");
768 server.dirty = 0;
769 server.lastsave = time(NULL);
770 } else if (!bysignal && exitcode != 0) {
771 redisLog(REDIS_WARNING, "Background saving error");
772 } else {
773 redisLog(REDIS_WARNING,
774 "Background saving terminated by signal");
775 }
776 server.bgsaveinprogress = 0;
777 server.bgsavechildpid = -1;
778 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
779 }
780 } else {
781 /* If there is not a background saving in progress check if
782 * we have to save now */
783 time_t now = time(NULL);
784 for (j = 0; j < server.saveparamslen; j++) {
785 struct saveparam *sp = server.saveparams+j;
786
787 if (server.dirty >= sp->changes &&
788 now-server.lastsave > sp->seconds) {
789 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
790 sp->changes, sp->seconds);
791 rdbSaveBackground(server.dbfilename);
792 break;
793 }
794 }
795 }
796
797 /* Try to expire a few timed out keys */
798 for (j = 0; j < server.dbnum; j++) {
799 redisDb *db = server.db+j;
800 int num = dictSize(db->expires);
801
802 if (num) {
803 time_t now = time(NULL);
804
805 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
806 num = REDIS_EXPIRELOOKUPS_PER_CRON;
807 while (num--) {
808 dictEntry *de;
809 time_t t;
810
811 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
812 t = (time_t) dictGetEntryVal(de);
813 if (now > t) {
814 deleteKey(db,dictGetEntryKey(de));
815 }
816 }
817 }
818 }
819
820 /* Check if we should connect to a MASTER */
821 if (server.replstate == REDIS_REPL_CONNECT) {
822 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
823 if (syncWithMaster() == REDIS_OK) {
824 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
825 }
826 }
827 return 1000;
828 }
829
830 static void createSharedObjects(void) {
831 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
832 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
833 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
834 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
835 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
836 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
837 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
838 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
839 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
840 /* no such key */
841 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
842 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
843 "-ERR Operation against a key holding the wrong kind of value\r\n"));
844 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
845 "-ERR no such key\r\n"));
846 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
847 "-ERR syntax error\r\n"));
848 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
849 "-ERR source and destination objects are the same\r\n"));
850 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
851 "-ERR index out of range\r\n"));
852 shared.space = createObject(REDIS_STRING,sdsnew(" "));
853 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
854 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
855 shared.select0 = createStringObject("select 0\r\n",10);
856 shared.select1 = createStringObject("select 1\r\n",10);
857 shared.select2 = createStringObject("select 2\r\n",10);
858 shared.select3 = createStringObject("select 3\r\n",10);
859 shared.select4 = createStringObject("select 4\r\n",10);
860 shared.select5 = createStringObject("select 5\r\n",10);
861 shared.select6 = createStringObject("select 6\r\n",10);
862 shared.select7 = createStringObject("select 7\r\n",10);
863 shared.select8 = createStringObject("select 8\r\n",10);
864 shared.select9 = createStringObject("select 9\r\n",10);
865 }
866
867 static void appendServerSaveParams(time_t seconds, int changes) {
868 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
869 if (server.saveparams == NULL) oom("appendServerSaveParams");
870 server.saveparams[server.saveparamslen].seconds = seconds;
871 server.saveparams[server.saveparamslen].changes = changes;
872 server.saveparamslen++;
873 }
874
/* Drop every automatic-save rule: frees server.saveparams and resets both
 * the pointer and the element count, leaving an empty rule set. */
static void ResetServerSaveParams() {
    zfree(server.saveparams);
    server.saveparams = NULL;
    server.saveparamslen = 0;
}
880
881 static void initServerConfig() {
882 server.dbnum = REDIS_DEFAULT_DBNUM;
883 server.port = REDIS_SERVERPORT;
884 server.verbosity = REDIS_DEBUG;
885 server.maxidletime = REDIS_MAXIDLETIME;
886 server.saveparams = NULL;
887 server.logfile = NULL; /* NULL = log on standard output */
888 server.bindaddr = NULL;
889 server.glueoutputbuf = 1;
890 server.daemonize = 0;
891 server.pidfile = "/var/run/redis.pid";
892 server.dbfilename = "dump.rdb";
893 server.requirepass = NULL;
894 server.shareobjects = 0;
895 server.maxclients = 0;
896 server.maxmemory = 0;
897 ResetServerSaveParams();
898
899 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
900 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
901 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
902 /* Replication related */
903 server.isslave = 0;
904 server.masterhost = NULL;
905 server.masterport = 6379;
906 server.master = NULL;
907 server.replstate = REDIS_REPL_NONE;
908 }
909
910 static void initServer() {
911 int j;
912
913 signal(SIGHUP, SIG_IGN);
914 signal(SIGPIPE, SIG_IGN);
915 setupSigSegvAction();
916
917 server.clients = listCreate();
918 server.slaves = listCreate();
919 server.monitors = listCreate();
920 server.objfreelist = listCreate();
921 createSharedObjects();
922 server.el = aeCreateEventLoop();
923 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
924 server.sharingpool = dictCreate(&setDictType,NULL);
925 server.sharingpoolsize = 1024;
926 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
927 oom("server initialization"); /* Fatal OOM */
928 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
929 if (server.fd == -1) {
930 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
931 exit(1);
932 }
933 for (j = 0; j < server.dbnum; j++) {
934 server.db[j].dict = dictCreate(&hashDictType,NULL);
935 server.db[j].expires = dictCreate(&setDictType,NULL);
936 server.db[j].id = j;
937 }
938 server.cronloops = 0;
939 server.bgsaveinprogress = 0;
940 server.bgsavechildpid = -1;
941 server.lastsave = time(NULL);
942 server.dirty = 0;
943 server.usedmemory = 0;
944 server.stat_numcommands = 0;
945 server.stat_numconnections = 0;
946 server.stat_starttime = time(NULL);
947 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
948 }
949
950 /* Empty the whole database */
951 static long long emptyDb() {
952 int j;
953 long long removed = 0;
954
955 for (j = 0; j < server.dbnum; j++) {
956 removed += dictSize(server.db[j].dict);
957 dictEmpty(server.db[j].dict);
958 dictEmpty(server.db[j].expires);
959 }
960 return removed;
961 }
962
/* Translate a "yes"/"no" string (compared ignoring case) into 1/0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
968
969 /* I agree, this is a very rudimental way to load a configuration...
970 will improve later if the config gets more complex */
971 static void loadServerConfig(char *filename) {
972 FILE *fp = fopen(filename,"r");
973 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
974 int linenum = 0;
975 sds line = NULL;
976
977 if (!fp) {
978 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
979 exit(1);
980 }
981 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
982 sds *argv;
983 int argc, j;
984
985 linenum++;
986 line = sdsnew(buf);
987 line = sdstrim(line," \t\r\n");
988
989 /* Skip comments and blank lines*/
990 if (line[0] == '#' || line[0] == '\0') {
991 sdsfree(line);
992 continue;
993 }
994
995 /* Split into arguments */
996 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
997 sdstolower(argv[0]);
998
999 /* Execute config directives */
1000 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1001 server.maxidletime = atoi(argv[1]);
1002 if (server.maxidletime < 0) {
1003 err = "Invalid timeout value"; goto loaderr;
1004 }
1005 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1006 server.port = atoi(argv[1]);
1007 if (server.port < 1 || server.port > 65535) {
1008 err = "Invalid port"; goto loaderr;
1009 }
1010 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1011 server.bindaddr = zstrdup(argv[1]);
1012 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1013 int seconds = atoi(argv[1]);
1014 int changes = atoi(argv[2]);
1015 if (seconds < 1 || changes < 0) {
1016 err = "Invalid save parameters"; goto loaderr;
1017 }
1018 appendServerSaveParams(seconds,changes);
1019 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1020 if (chdir(argv[1]) == -1) {
1021 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1022 argv[1], strerror(errno));
1023 exit(1);
1024 }
1025 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1026 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1027 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1028 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1029 else {
1030 err = "Invalid log level. Must be one of debug, notice, warning";
1031 goto loaderr;
1032 }
1033 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1034 FILE *fp;
1035
1036 server.logfile = zstrdup(argv[1]);
1037 if (!strcasecmp(server.logfile,"stdout")) {
1038 zfree(server.logfile);
1039 server.logfile = NULL;
1040 }
1041 if (server.logfile) {
1042 /* Test if we are able to open the file. The server will not
1043 * be able to abort just for this problem later... */
1044 fp = fopen(server.logfile,"a");
1045 if (fp == NULL) {
1046 err = sdscatprintf(sdsempty(),
1047 "Can't open the log file: %s", strerror(errno));
1048 goto loaderr;
1049 }
1050 fclose(fp);
1051 }
1052 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1053 server.dbnum = atoi(argv[1]);
1054 if (server.dbnum < 1) {
1055 err = "Invalid number of databases"; goto loaderr;
1056 }
1057 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1058 server.maxclients = atoi(argv[1]);
1059 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1060 server.maxmemory = atoi(argv[1]);
1061 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1062 server.masterhost = sdsnew(argv[1]);
1063 server.masterport = atoi(argv[2]);
1064 server.replstate = REDIS_REPL_CONNECT;
1065 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1066 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1067 err = "argument must be 'yes' or 'no'"; goto loaderr;
1068 }
1069 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1070 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1071 err = "argument must be 'yes' or 'no'"; goto loaderr;
1072 }
1073 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1074 server.sharingpoolsize = atoi(argv[1]);
1075 if (server.sharingpoolsize < 1) {
1076 err = "invalid object sharing pool size"; goto loaderr;
1077 }
1078 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1079 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1080 err = "argument must be 'yes' or 'no'"; goto loaderr;
1081 }
1082 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1083 server.requirepass = zstrdup(argv[1]);
1084 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1085 server.pidfile = zstrdup(argv[1]);
1086 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1087 server.dbfilename = zstrdup(argv[1]);
1088 } else {
1089 err = "Bad directive or wrong number of arguments"; goto loaderr;
1090 }
1091 for (j = 0; j < argc; j++)
1092 sdsfree(argv[j]);
1093 zfree(argv);
1094 sdsfree(line);
1095 }
1096 fclose(fp);
1097 return;
1098
1099 loaderr:
1100 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1101 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1102 fprintf(stderr, ">>> '%s'\n", line);
1103 fprintf(stderr, "%s\n", err);
1104 exit(1);
1105 }
1106
1107 static void freeClientArgv(redisClient *c) {
1108 int j;
1109
1110 for (j = 0; j < c->argc; j++)
1111 decrRefCount(c->argv[j]);
1112 c->argc = 0;
1113 }
1114
1115 static void freeClient(redisClient *c) {
1116 listNode *ln;
1117
1118 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1119 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1120 sdsfree(c->querybuf);
1121 listRelease(c->reply);
1122 freeClientArgv(c);
1123 close(c->fd);
1124 ln = listSearchKey(server.clients,c);
1125 assert(ln != NULL);
1126 listDelNode(server.clients,ln);
1127 if (c->flags & REDIS_SLAVE) {
1128 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1129 close(c->repldbfd);
1130 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1131 ln = listSearchKey(l,c);
1132 assert(ln != NULL);
1133 listDelNode(l,ln);
1134 }
1135 if (c->flags & REDIS_MASTER) {
1136 server.master = NULL;
1137 server.replstate = REDIS_REPL_CONNECT;
1138 }
1139 zfree(c->argv);
1140 zfree(c);
1141 }
1142
1143 static void glueReplyBuffersIfNeeded(redisClient *c) {
1144 int totlen = 0;
1145 listNode *ln;
1146 robj *o;
1147
1148 listRewind(c->reply);
1149 while((ln = listYield(c->reply))) {
1150 o = ln->value;
1151 totlen += sdslen(o->ptr);
1152 /* This optimization makes more sense if we don't have to copy
1153 * too much data */
1154 if (totlen > 1024) return;
1155 }
1156 if (totlen > 0) {
1157 char buf[1024];
1158 int copylen = 0;
1159
1160 listRewind(c->reply);
1161 while((ln = listYield(c->reply))) {
1162 o = ln->value;
1163 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1164 copylen += sdslen(o->ptr);
1165 listDelNode(c->reply,ln);
1166 }
1167 /* Now the output buffer is empty, add the new single element */
1168 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1169 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1170 }
1171 }
1172
1173 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1174 redisClient *c = privdata;
1175 int nwritten = 0, totwritten = 0, objlen;
1176 robj *o;
1177 REDIS_NOTUSED(el);
1178 REDIS_NOTUSED(mask);
1179
1180 if (server.glueoutputbuf && listLength(c->reply) > 1)
1181 glueReplyBuffersIfNeeded(c);
1182 while(listLength(c->reply)) {
1183 o = listNodeValue(listFirst(c->reply));
1184 objlen = sdslen(o->ptr);
1185
1186 if (objlen == 0) {
1187 listDelNode(c->reply,listFirst(c->reply));
1188 continue;
1189 }
1190
1191 if (c->flags & REDIS_MASTER) {
1192 /* Don't reply to a master */
1193 nwritten = objlen - c->sentlen;
1194 } else {
1195 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1196 if (nwritten <= 0) break;
1197 }
1198 c->sentlen += nwritten;
1199 totwritten += nwritten;
1200 /* If we fully sent the object on head go to the next one */
1201 if (c->sentlen == objlen) {
1202 listDelNode(c->reply,listFirst(c->reply));
1203 c->sentlen = 0;
1204 }
1205 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1206 * bytes, in a single threaded server it's a good idea to server
1207 * other clients as well, even if a very large request comes from
1208 * super fast link that is always able to accept data (in real world
1209 * terms think to 'KEYS *' against the loopback interfae) */
1210 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1211 }
1212 if (nwritten == -1) {
1213 if (errno == EAGAIN) {
1214 nwritten = 0;
1215 } else {
1216 redisLog(REDIS_DEBUG,
1217 "Error writing to client: %s", strerror(errno));
1218 freeClient(c);
1219 return;
1220 }
1221 }
1222 if (totwritten > 0) c->lastinteraction = time(NULL);
1223 if (listLength(c->reply) == 0) {
1224 c->sentlen = 0;
1225 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1226 }
1227 }
1228
1229 static struct redisCommand *lookupCommand(char *name) {
1230 int j = 0;
1231 while(cmdTable[j].name != NULL) {
1232 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1233 j++;
1234 }
1235 return NULL;
1236 }
1237
1238 /* resetClient prepare the client to process the next command */
1239 static void resetClient(redisClient *c) {
1240 freeClientArgv(c);
1241 c->bulklen = -1;
1242 }
1243
1244 /* If this function gets called we already read a whole
1245 * command, argments are in the client argv/argc fields.
1246 * processCommand() execute the command or prepare the
1247 * server for a bulk read from the client.
1248 *
1249 * If 1 is returned the client is still alive and valid and
1250 * and other operations can be performed by the caller. Otherwise
1251 * if 0 is returned the client was destroied (i.e. after QUIT). */
1252 static int processCommand(redisClient *c) {
1253 struct redisCommand *cmd;
1254 long long dirty;
1255
1256 /* Free some memory if needed (maxmemory setting) */
1257 if (server.maxmemory) freeMemoryIfNeeded();
1258
1259 /* The QUIT command is handled as a special case. Normal command
1260 * procs are unable to close the client connection safely */
1261 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1262 freeClient(c);
1263 return 0;
1264 }
1265 cmd = lookupCommand(c->argv[0]->ptr);
1266 if (!cmd) {
1267 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1268 resetClient(c);
1269 return 1;
1270 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1271 (c->argc < -cmd->arity)) {
1272 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1273 resetClient(c);
1274 return 1;
1275 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1276 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1277 resetClient(c);
1278 return 1;
1279 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1280 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1281
1282 decrRefCount(c->argv[c->argc-1]);
1283 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1284 c->argc--;
1285 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1286 resetClient(c);
1287 return 1;
1288 }
1289 c->argc--;
1290 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1291 /* It is possible that the bulk read is already in the
1292 * buffer. Check this condition and handle it accordingly */
1293 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1294 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1295 c->argc++;
1296 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1297 } else {
1298 return 1;
1299 }
1300 }
1301 /* Let's try to share objects on the command arguments vector */
1302 if (server.shareobjects) {
1303 int j;
1304 for(j = 1; j < c->argc; j++)
1305 c->argv[j] = tryObjectSharing(c->argv[j]);
1306 }
1307 /* Check if the user is authenticated */
1308 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1309 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1310 resetClient(c);
1311 return 1;
1312 }
1313
1314 /* Exec the command */
1315 dirty = server.dirty;
1316 cmd->proc(c);
1317 if (server.dirty-dirty != 0 && listLength(server.slaves))
1318 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1319 if (listLength(server.monitors))
1320 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1321 server.stat_numcommands++;
1322
1323 /* Prepare the client for the next command */
1324 if (c->flags & REDIS_CLOSE) {
1325 freeClient(c);
1326 return 0;
1327 }
1328 resetClient(c);
1329 return 1;
1330 }
1331
1332 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1333 listNode *ln;
1334 int outc = 0, j;
1335 robj **outv;
1336 /* (args*2)+1 is enough room for args, spaces, newlines */
1337 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1338
1339 if (argc <= REDIS_STATIC_ARGS) {
1340 outv = static_outv;
1341 } else {
1342 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1343 if (!outv) oom("replicationFeedSlaves");
1344 }
1345
1346 for (j = 0; j < argc; j++) {
1347 if (j != 0) outv[outc++] = shared.space;
1348 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1349 robj *lenobj;
1350
1351 lenobj = createObject(REDIS_STRING,
1352 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1353 lenobj->refcount = 0;
1354 outv[outc++] = lenobj;
1355 }
1356 outv[outc++] = argv[j];
1357 }
1358 outv[outc++] = shared.crlf;
1359
1360 /* Increment all the refcounts at start and decrement at end in order to
1361 * be sure to free objects if there is no slave in a replication state
1362 * able to be feed with commands */
1363 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1364 listRewind(slaves);
1365 while((ln = listYield(slaves))) {
1366 redisClient *slave = ln->value;
1367
1368 /* Don't feed slaves that are still waiting for BGSAVE to start */
1369 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1370
1371 /* Feed all the other slaves, MONITORs and so on */
1372 if (slave->slaveseldb != dictid) {
1373 robj *selectcmd;
1374
1375 switch(dictid) {
1376 case 0: selectcmd = shared.select0; break;
1377 case 1: selectcmd = shared.select1; break;
1378 case 2: selectcmd = shared.select2; break;
1379 case 3: selectcmd = shared.select3; break;
1380 case 4: selectcmd = shared.select4; break;
1381 case 5: selectcmd = shared.select5; break;
1382 case 6: selectcmd = shared.select6; break;
1383 case 7: selectcmd = shared.select7; break;
1384 case 8: selectcmd = shared.select8; break;
1385 case 9: selectcmd = shared.select9; break;
1386 default:
1387 selectcmd = createObject(REDIS_STRING,
1388 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1389 selectcmd->refcount = 0;
1390 break;
1391 }
1392 addReply(slave,selectcmd);
1393 slave->slaveseldb = dictid;
1394 }
1395 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1396 }
1397 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1398 if (outv != static_outv) zfree(outv);
1399 }
1400
1401 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1402 redisClient *c = (redisClient*) privdata;
1403 char buf[REDIS_IOBUF_LEN];
1404 int nread;
1405 REDIS_NOTUSED(el);
1406 REDIS_NOTUSED(mask);
1407
1408 nread = read(fd, buf, REDIS_IOBUF_LEN);
1409 if (nread == -1) {
1410 if (errno == EAGAIN) {
1411 nread = 0;
1412 } else {
1413 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1414 freeClient(c);
1415 return;
1416 }
1417 } else if (nread == 0) {
1418 redisLog(REDIS_DEBUG, "Client closed connection");
1419 freeClient(c);
1420 return;
1421 }
1422 if (nread) {
1423 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1424 c->lastinteraction = time(NULL);
1425 } else {
1426 return;
1427 }
1428
1429 again:
1430 if (c->bulklen == -1) {
1431 /* Read the first line of the query */
1432 char *p = strchr(c->querybuf,'\n');
1433 size_t querylen;
1434 if (p) {
1435 sds query, *argv;
1436 int argc, j;
1437
1438 query = c->querybuf;
1439 c->querybuf = sdsempty();
1440 querylen = 1+(p-(query));
1441 if (sdslen(query) > querylen) {
1442 /* leave data after the first line of the query in the buffer */
1443 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1444 }
1445 *p = '\0'; /* remove "\n" */
1446 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1447 sdsupdatelen(query);
1448
1449 /* Now we can split the query in arguments */
1450 if (sdslen(query) == 0) {
1451 /* Ignore empty query */
1452 sdsfree(query);
1453 return;
1454 }
1455 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1456 if (argv == NULL) oom("sdssplitlen");
1457 sdsfree(query);
1458
1459 if (c->argv) zfree(c->argv);
1460 c->argv = zmalloc(sizeof(robj*)*argc);
1461 if (c->argv == NULL) oom("allocating arguments list for client");
1462
1463 for (j = 0; j < argc; j++) {
1464 if (sdslen(argv[j])) {
1465 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1466 c->argc++;
1467 } else {
1468 sdsfree(argv[j]);
1469 }
1470 }
1471 zfree(argv);
1472 /* Execute the command. If the client is still valid
1473 * after processCommand() return and there is something
1474 * on the query buffer try to process the next command. */
1475 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1476 return;
1477 } else if (sdslen(c->querybuf) >= 1024*32) {
1478 redisLog(REDIS_DEBUG, "Client protocol error");
1479 freeClient(c);
1480 return;
1481 }
1482 } else {
1483 /* Bulk read handling. Note that if we are at this point
1484 the client already sent a command terminated with a newline,
1485 we are reading the bulk data that is actually the last
1486 argument of the command. */
1487 int qbl = sdslen(c->querybuf);
1488
1489 if (c->bulklen <= qbl) {
1490 /* Copy everything but the final CRLF as final argument */
1491 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1492 c->argc++;
1493 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1494 processCommand(c);
1495 return;
1496 }
1497 }
1498 }
1499
1500 static int selectDb(redisClient *c, int id) {
1501 if (id < 0 || id >= server.dbnum)
1502 return REDIS_ERR;
1503 c->db = &server.db[id];
1504 return REDIS_OK;
1505 }
1506
1507 static void *dupClientReplyValue(void *o) {
1508 incrRefCount((robj*)o);
1509 return 0;
1510 }
1511
1512 static redisClient *createClient(int fd) {
1513 redisClient *c = zmalloc(sizeof(*c));
1514
1515 anetNonBlock(NULL,fd);
1516 anetTcpNoDelay(NULL,fd);
1517 if (!c) return NULL;
1518 selectDb(c,0);
1519 c->fd = fd;
1520 c->querybuf = sdsempty();
1521 c->argc = 0;
1522 c->argv = NULL;
1523 c->bulklen = -1;
1524 c->sentlen = 0;
1525 c->flags = 0;
1526 c->lastinteraction = time(NULL);
1527 c->authenticated = 0;
1528 c->replstate = REDIS_REPL_NONE;
1529 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1530 listSetFreeMethod(c->reply,decrRefCount);
1531 listSetDupMethod(c->reply,dupClientReplyValue);
1532 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1533 readQueryFromClient, c, NULL) == AE_ERR) {
1534 freeClient(c);
1535 return NULL;
1536 }
1537 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1538 return c;
1539 }
1540
1541 static void addReply(redisClient *c, robj *obj) {
1542 if (listLength(c->reply) == 0 &&
1543 (c->replstate == REDIS_REPL_NONE ||
1544 c->replstate == REDIS_REPL_ONLINE) &&
1545 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1546 sendReplyToClient, c, NULL) == AE_ERR) return;
1547 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1548 incrRefCount(obj);
1549 }
1550
1551 static void addReplySds(redisClient *c, sds s) {
1552 robj *o = createObject(REDIS_STRING,s);
1553 addReply(c,o);
1554 decrRefCount(o);
1555 }
1556
1557 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1558 int cport, cfd;
1559 char cip[128];
1560 redisClient *c;
1561 REDIS_NOTUSED(el);
1562 REDIS_NOTUSED(mask);
1563 REDIS_NOTUSED(privdata);
1564
1565 cfd = anetAccept(server.neterr, fd, cip, &cport);
1566 if (cfd == AE_ERR) {
1567 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1568 return;
1569 }
1570 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1571 if ((c = createClient(cfd)) == NULL) {
1572 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1573 close(cfd); /* May be already closed, just ingore errors */
1574 return;
1575 }
1576 /* If maxclient directive is set and this is one client more... close the
1577 * connection. Note that we create the client instead to check before
1578 * for this condition, since now the socket is already set in nonblocking
1579 * mode and we can send an error for free using the Kernel I/O */
1580 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1581 char *err = "-ERR max number of clients reached\r\n";
1582
1583 /* That's a best effort error message, don't check write errors */
1584 (void) write(c->fd,err,strlen(err));
1585 freeClient(c);
1586 return;
1587 }
1588 server.stat_numconnections++;
1589 }
1590
1591 /* ======================= Redis objects implementation ===================== */
1592
1593 static robj *createObject(int type, void *ptr) {
1594 robj *o;
1595
1596 if (listLength(server.objfreelist)) {
1597 listNode *head = listFirst(server.objfreelist);
1598 o = listNodeValue(head);
1599 listDelNode(server.objfreelist,head);
1600 } else {
1601 o = zmalloc(sizeof(*o));
1602 }
1603 if (!o) oom("createObject");
1604 o->type = type;
1605 o->ptr = ptr;
1606 o->refcount = 1;
1607 return o;
1608 }
1609
1610 static robj *createStringObject(char *ptr, size_t len) {
1611 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1612 }
1613
1614 static robj *createListObject(void) {
1615 list *l = listCreate();
1616
1617 if (!l) oom("listCreate");
1618 listSetFreeMethod(l,decrRefCount);
1619 return createObject(REDIS_LIST,l);
1620 }
1621
1622 static robj *createSetObject(void) {
1623 dict *d = dictCreate(&setDictType,NULL);
1624 if (!d) oom("dictCreate");
1625 return createObject(REDIS_SET,d);
1626 }
1627
1628 static void freeStringObject(robj *o) {
1629 sdsfree(o->ptr);
1630 }
1631
1632 static void freeListObject(robj *o) {
1633 listRelease((list*) o->ptr);
1634 }
1635
1636 static void freeSetObject(robj *o) {
1637 dictRelease((dict*) o->ptr);
1638 }
1639
1640 static void freeHashObject(robj *o) {
1641 dictRelease((dict*) o->ptr);
1642 }
1643
1644 static void incrRefCount(robj *o) {
1645 o->refcount++;
1646 #ifdef DEBUG_REFCOUNT
1647 if (o->type == REDIS_STRING)
1648 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1649 #endif
1650 }
1651
1652 static void decrRefCount(void *obj) {
1653 robj *o = obj;
1654
1655 #ifdef DEBUG_REFCOUNT
1656 if (o->type == REDIS_STRING)
1657 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1658 #endif
1659 if (--(o->refcount) == 0) {
1660 switch(o->type) {
1661 case REDIS_STRING: freeStringObject(o); break;
1662 case REDIS_LIST: freeListObject(o); break;
1663 case REDIS_SET: freeSetObject(o); break;
1664 case REDIS_HASH: freeHashObject(o); break;
1665 default: assert(0 != 0); break;
1666 }
1667 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1668 !listAddNodeHead(server.objfreelist,o))
1669 zfree(o);
1670 }
1671 }
1672
1673 /* Try to share an object against the shared objects pool */
1674 static robj *tryObjectSharing(robj *o) {
1675 struct dictEntry *de;
1676 unsigned long c;
1677
1678 if (o == NULL || server.shareobjects == 0) return o;
1679
1680 assert(o->type == REDIS_STRING);
1681 de = dictFind(server.sharingpool,o);
1682 if (de) {
1683 robj *shared = dictGetEntryKey(de);
1684
1685 c = ((unsigned long) dictGetEntryVal(de))+1;
1686 dictGetEntryVal(de) = (void*) c;
1687 incrRefCount(shared);
1688 decrRefCount(o);
1689 return shared;
1690 } else {
1691 /* Here we are using a stream algorihtm: Every time an object is
1692 * shared we increment its count, everytime there is a miss we
1693 * recrement the counter of a random object. If this object reaches
1694 * zero we remove the object and put the current object instead. */
1695 if (dictSize(server.sharingpool) >=
1696 server.sharingpoolsize) {
1697 de = dictGetRandomKey(server.sharingpool);
1698 assert(de != NULL);
1699 c = ((unsigned long) dictGetEntryVal(de))-1;
1700 dictGetEntryVal(de) = (void*) c;
1701 if (c == 0) {
1702 dictDelete(server.sharingpool,de->key);
1703 }
1704 } else {
1705 c = 0; /* If the pool is empty we want to add this object */
1706 }
1707 if (c == 0) {
1708 int retval;
1709
1710 retval = dictAdd(server.sharingpool,o,(void*)1);
1711 assert(retval == DICT_OK);
1712 incrRefCount(o);
1713 }
1714 return o;
1715 }
1716 }
1717
1718 static robj *lookupKey(redisDb *db, robj *key) {
1719 dictEntry *de = dictFind(db->dict,key);
1720 return de ? dictGetEntryVal(de) : NULL;
1721 }
1722
1723 static robj *lookupKeyRead(redisDb *db, robj *key) {
1724 expireIfNeeded(db,key);
1725 return lookupKey(db,key);
1726 }
1727
1728 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1729 deleteIfVolatile(db,key);
1730 return lookupKey(db,key);
1731 }
1732
1733 static int deleteKey(redisDb *db, robj *key) {
1734 int retval;
1735
1736 /* We need to protect key from destruction: after the first dictDelete()
1737 * it may happen that 'key' is no longer valid if we don't increment
1738 * it's count. This may happen when we get the object reference directly
1739 * from the hash table with dictRandomKey() or dict iterators */
1740 incrRefCount(key);
1741 if (dictSize(db->expires)) dictDelete(db->expires,key);
1742 retval = dictDelete(db->dict,key);
1743 decrRefCount(key);
1744
1745 return retval == DICT_OK;
1746 }
1747
1748 /*============================ DB saving/loading ============================ */
1749
/* Write the one-byte 'type' to 'fp'. Returns 0 on success, -1 if the byte
 * could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
1754
/* Write the time 't' to 'fp' as a 4-byte signed integer in the byte order
 * of the host. Returns 0 on success, -1 if the write failed. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
1760
1761 /* check rdbLoadLen() comments for more info */
1762 static int rdbSaveLen(FILE *fp, uint32_t len) {
1763 unsigned char buf[2];
1764
1765 if (len < (1<<6)) {
1766 /* Save a 6 bit len */
1767 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1768 if (fwrite(buf,1,1,fp) == 0) return -1;
1769 } else if (len < (1<<14)) {
1770 /* Save a 14 bit len */
1771 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1772 buf[1] = len&0xFF;
1773 if (fwrite(buf,2,1,fp) == 0) return -1;
1774 } else {
1775 /* Save a 32 bit len */
1776 buf[0] = (REDIS_RDB_32BITLEN<<6);
1777 if (fwrite(buf,1,1,fp) == 0) return -1;
1778 len = htonl(len);
1779 if (fwrite(&len,4,1,fp) == 0) return -1;
1780 }
1781 return 0;
1782 }
1783
1784 /* String objects in the form "2391" "-100" without any space and with a
1785 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1786 * encoded as integers to save space */
1787 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1788 long long value;
1789 char *endptr, buf[32];
1790
1791 /* Check if it's possible to encode this value as a number */
1792 value = strtoll(s, &endptr, 10);
1793 if (endptr[0] != '\0') return 0;
1794 snprintf(buf,32,"%lld",value);
1795
1796 /* If the number converted back into a string is not identical
1797 * then it's not possible to encode the string as integer */
1798 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1799
1800 /* Finally check if it fits in our ranges */
1801 if (value >= -(1<<7) && value <= (1<<7)-1) {
1802 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1803 enc[1] = value&0xFF;
1804 return 2;
1805 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1806 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1807 enc[1] = value&0xFF;
1808 enc[2] = (value>>8)&0xFF;
1809 return 3;
1810 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1811 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1812 enc[1] = value&0xFF;
1813 enc[2] = (value>>8)&0xFF;
1814 enc[3] = (value>>16)&0xFF;
1815 enc[4] = (value>>24)&0xFF;
1816 return 5;
1817 } else {
1818 return 0;
1819 }
1820 }
1821
1822 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1823 unsigned int comprlen, outlen;
1824 unsigned char byte;
1825 void *out;
1826
1827 /* We require at least four bytes compression for this to be worth it */
1828 outlen = sdslen(obj->ptr)-4;
1829 if (outlen <= 0) return 0;
1830 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1831 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1832 if (comprlen == 0) {
1833 zfree(out);
1834 return 0;
1835 }
1836 /* Data compressed! Let's save it on disk */
1837 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1838 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1839 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1840 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1841 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1842 zfree(out);
1843 return comprlen;
1844
1845 writeerr:
1846 zfree(out);
1847 return -1;
1848 }
1849
1850 /* Save a string objet as [len][data] on disk. If the object is a string
1851 * representation of an integer value we try to safe it in a special form */
1852 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1853 size_t len = sdslen(obj->ptr);
1854 int enclen;
1855
1856 /* Try integer encoding */
1857 if (len <= 11) {
1858 unsigned char buf[5];
1859 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1860 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1861 return 0;
1862 }
1863 }
1864
1865 /* Try LZF compression - under 20 bytes it's unable to compress even
1866 * aaaaaaaaaaaaaaaaaa so skip it */
1867 if (1 && len > 20) {
1868 int retval;
1869
1870 retval = rdbSaveLzfStringObject(fp,obj);
1871 if (retval == -1) return -1;
1872 if (retval > 0) return 0;
1873 /* retval == 0 means data can't be compressed, save the old way */
1874 }
1875
1876 /* Store verbatim */
1877 if (rdbSaveLen(fp,len) == -1) return -1;
1878 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1879 return 0;
1880 }
1881
1882 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1883 static int rdbSave(char *filename) {
1884 dictIterator *di = NULL;
1885 dictEntry *de;
1886 FILE *fp;
1887 char tmpfile[256];
1888 int j;
1889 time_t now = time(NULL);
1890
1891 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1892 fp = fopen(tmpfile,"w");
1893 if (!fp) {
1894 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1895 return REDIS_ERR;
1896 }
1897 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1898 for (j = 0; j < server.dbnum; j++) {
1899 redisDb *db = server.db+j;
1900 dict *d = db->dict;
1901 if (dictSize(d) == 0) continue;
1902 di = dictGetIterator(d);
1903 if (!di) {
1904 fclose(fp);
1905 return REDIS_ERR;
1906 }
1907
1908 /* Write the SELECT DB opcode */
1909 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1910 if (rdbSaveLen(fp,j) == -1) goto werr;
1911
1912 /* Iterate this DB writing every entry */
1913 while((de = dictNext(di)) != NULL) {
1914 robj *key = dictGetEntryKey(de);
1915 robj *o = dictGetEntryVal(de);
1916 time_t expiretime = getExpire(db,key);
1917
1918 /* Save the expire time */
1919 if (expiretime != -1) {
1920 /* If this key is already expired skip it */
1921 if (expiretime < now) continue;
1922 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1923 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1924 }
1925 /* Save the key and associated value */
1926 if (rdbSaveType(fp,o->type) == -1) goto werr;
1927 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1928 if (o->type == REDIS_STRING) {
1929 /* Save a string value */
1930 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1931 } else if (o->type == REDIS_LIST) {
1932 /* Save a list value */
1933 list *list = o->ptr;
1934 listNode *ln;
1935
1936 listRewind(list);
1937 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1938 while((ln = listYield(list))) {
1939 robj *eleobj = listNodeValue(ln);
1940
1941 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1942 }
1943 } else if (o->type == REDIS_SET) {
1944 /* Save a set value */
1945 dict *set = o->ptr;
1946 dictIterator *di = dictGetIterator(set);
1947 dictEntry *de;
1948
1949 if (!set) oom("dictGetIteraotr");
1950 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1951 while((de = dictNext(di)) != NULL) {
1952 robj *eleobj = dictGetEntryKey(de);
1953
1954 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1955 }
1956 dictReleaseIterator(di);
1957 } else {
1958 assert(0 != 0);
1959 }
1960 }
1961 dictReleaseIterator(di);
1962 }
1963 /* EOF opcode */
1964 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1965
1966 /* Make sure data will not remain on the OS's output buffers */
1967 fflush(fp);
1968 fsync(fileno(fp));
1969 fclose(fp);
1970
1971 /* Use RENAME to make sure the DB file is changed atomically only
1972 * if the generate DB file is ok. */
1973 if (rename(tmpfile,filename) == -1) {
1974 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1975 unlink(tmpfile);
1976 return REDIS_ERR;
1977 }
1978 redisLog(REDIS_NOTICE,"DB saved on disk");
1979 server.dirty = 0;
1980 server.lastsave = time(NULL);
1981 return REDIS_OK;
1982
1983 werr:
1984 fclose(fp);
1985 unlink(tmpfile);
1986 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1987 if (di) dictReleaseIterator(di);
1988 return REDIS_ERR;
1989 }
1990
1991 static int rdbSaveBackground(char *filename) {
1992 pid_t childpid;
1993
1994 if (server.bgsaveinprogress) return REDIS_ERR;
1995 if ((childpid = fork()) == 0) {
1996 /* Child */
1997 close(server.fd);
1998 if (rdbSave(filename) == REDIS_OK) {
1999 exit(0);
2000 } else {
2001 exit(1);
2002 }
2003 } else {
2004 /* Parent */
2005 if (childpid == -1) {
2006 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2007 strerror(errno));
2008 return REDIS_ERR;
2009 }
2010 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2011 server.bgsaveinprogress = 1;
2012 server.bgsavechildpid = childpid;
2013 return REDIS_OK;
2014 }
2015 return REDIS_OK; /* unreached */
2016 }
2017
/* Read a single byte from 'fp' and return it as a value in the 0-255 range.
 * Returns -1 if the byte can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    return (fread(&opcode,1,1,fp) == 1) ? (int)opcode : -1;
}
2023
/* Read a 4 byte signed integer from 'fp', as stored in memory by the host
 * (no byte order conversion is done), and return it as a time_t.
 * Returns -1 if the four bytes can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t raw;

    if (fread(&raw,sizeof(raw),1,fp) != 1) return -1;
    return (time_t)raw;
}
2029
2030 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2031 * of this file for a description of how this are stored on disk.
2032 *
2033 * isencoded is set to 1 if the readed length is not actually a length but
2034 * an "encoding type", check the above comments for more info */
2035 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2036 unsigned char buf[2];
2037 uint32_t len;
2038
2039 if (isencoded) *isencoded = 0;
2040 if (rdbver == 0) {
2041 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2042 return ntohl(len);
2043 } else {
2044 int type;
2045
2046 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2047 type = (buf[0]&0xC0)>>6;
2048 if (type == REDIS_RDB_6BITLEN) {
2049 /* Read a 6 bit len */
2050 return buf[0]&0x3F;
2051 } else if (type == REDIS_RDB_ENCVAL) {
2052 /* Read a 6 bit len encoding type */
2053 if (isencoded) *isencoded = 1;
2054 return buf[0]&0x3F;
2055 } else if (type == REDIS_RDB_14BITLEN) {
2056 /* Read a 14 bit len */
2057 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2058 return ((buf[0]&0x3F)<<8)|buf[1];
2059 } else {
2060 /* Read a 32 bit len */
2061 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2062 return ntohl(len);
2063 }
2064 }
2065 }
2066
2067 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2068 unsigned char enc[4];
2069 long long val;
2070
2071 if (enctype == REDIS_RDB_ENC_INT8) {
2072 if (fread(enc,1,1,fp) == 0) return NULL;
2073 val = (signed char)enc[0];
2074 } else if (enctype == REDIS_RDB_ENC_INT16) {
2075 uint16_t v;
2076 if (fread(enc,2,1,fp) == 0) return NULL;
2077 v = enc[0]|(enc[1]<<8);
2078 val = (int16_t)v;
2079 } else if (enctype == REDIS_RDB_ENC_INT32) {
2080 uint32_t v;
2081 if (fread(enc,4,1,fp) == 0) return NULL;
2082 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2083 val = (int32_t)v;
2084 } else {
2085 val = 0; /* anti-warning */
2086 assert(0!=0);
2087 }
2088 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2089 }
2090
2091 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2092 unsigned int len, clen;
2093 unsigned char *c = NULL;
2094 sds val = NULL;
2095
2096 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2097 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2098 if ((c = zmalloc(clen)) == NULL) goto err;
2099 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2100 if (fread(c,clen,1,fp) == 0) goto err;
2101 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2102 zfree(c);
2103 return createObject(REDIS_STRING,val);
2104 err:
2105 zfree(c);
2106 sdsfree(val);
2107 return NULL;
2108 }
2109
2110 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2111 int isencoded;
2112 uint32_t len;
2113 sds val;
2114
2115 len = rdbLoadLen(fp,rdbver,&isencoded);
2116 if (isencoded) {
2117 switch(len) {
2118 case REDIS_RDB_ENC_INT8:
2119 case REDIS_RDB_ENC_INT16:
2120 case REDIS_RDB_ENC_INT32:
2121 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2122 case REDIS_RDB_ENC_LZF:
2123 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2124 default:
2125 assert(0!=0);
2126 }
2127 }
2128
2129 if (len == REDIS_RDB_LENERR) return NULL;
2130 val = sdsnewlen(NULL,len);
2131 if (len && fread(val,len,1,fp) == 0) {
2132 sdsfree(val);
2133 return NULL;
2134 }
2135 return tryObjectSharing(createObject(REDIS_STRING,val));
2136 }
2137
2138 static int rdbLoad(char *filename) {
2139 FILE *fp;
2140 robj *keyobj = NULL;
2141 uint32_t dbid;
2142 int type, retval, rdbver;
2143 dict *d = server.db[0].dict;
2144 redisDb *db = server.db+0;
2145 char buf[1024];
2146 time_t expiretime = -1, now = time(NULL);
2147
2148 fp = fopen(filename,"r");
2149 if (!fp) return REDIS_ERR;
2150 if (fread(buf,9,1,fp) == 0) goto eoferr;
2151 buf[9] = '\0';
2152 if (memcmp(buf,"REDIS",5) != 0) {
2153 fclose(fp);
2154 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2155 return REDIS_ERR;
2156 }
2157 rdbver = atoi(buf+5);
2158 if (rdbver > 1) {
2159 fclose(fp);
2160 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2161 return REDIS_ERR;
2162 }
2163 while(1) {
2164 robj *o;
2165
2166 /* Read type. */
2167 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2168 if (type == REDIS_EXPIRETIME) {
2169 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2170 /* We read the time so we need to read the object type again */
2171 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2172 }
2173 if (type == REDIS_EOF) break;
2174 /* Handle SELECT DB opcode as a special case */
2175 if (type == REDIS_SELECTDB) {
2176 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2177 goto eoferr;
2178 if (dbid >= (unsigned)server.dbnum) {
2179 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2180 exit(1);
2181 }
2182 db = server.db+dbid;
2183 d = db->dict;
2184 continue;
2185 }
2186 /* Read key */
2187 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2188
2189 if (type == REDIS_STRING) {
2190 /* Read string value */
2191 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2192 } else if (type == REDIS_LIST || type == REDIS_SET) {
2193 /* Read list/set value */
2194 uint32_t listlen;
2195
2196 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2197 goto eoferr;
2198 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2199 /* Load every single element of the list/set */
2200 while(listlen--) {
2201 robj *ele;
2202
2203 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2204 if (type == REDIS_LIST) {
2205 if (!listAddNodeTail((list*)o->ptr,ele))
2206 oom("listAddNodeTail");
2207 } else {
2208 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2209 oom("dictAdd");
2210 }
2211 }
2212 } else {
2213 assert(0 != 0);
2214 }
2215 /* Add the new object in the hash table */
2216 retval = dictAdd(d,keyobj,o);
2217 if (retval == DICT_ERR) {
2218 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2219 exit(1);
2220 }
2221 /* Set the expire time if needed */
2222 if (expiretime != -1) {
2223 setExpire(db,keyobj,expiretime);
2224 /* Delete this key if already expired */
2225 if (expiretime < now) deleteKey(db,keyobj);
2226 expiretime = -1;
2227 }
2228 keyobj = o = NULL;
2229 }
2230 fclose(fp);
2231 return REDIS_OK;
2232
2233 eoferr: /* unexpected end of file is handled here with a fatal exit */
2234 if (keyobj) decrRefCount(keyobj);
2235 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2236 exit(1);
2237 return REDIS_ERR; /* Just to avoid warning */
2238 }
2239
2240 /*================================== Commands =============================== */
2241
2242 static void authCommand(redisClient *c) {
2243 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2244 c->authenticated = 1;
2245 addReply(c,shared.ok);
2246 } else {
2247 c->authenticated = 0;
2248 addReply(c,shared.err);
2249 }
2250 }
2251
2252 static void pingCommand(redisClient *c) {
2253 addReply(c,shared.pong);
2254 }
2255
2256 static void echoCommand(redisClient *c) {
2257 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2258 (int)sdslen(c->argv[1]->ptr)));
2259 addReply(c,c->argv[1]);
2260 addReply(c,shared.crlf);
2261 }
2262
2263 /*=================================== Strings =============================== */
2264
2265 static void setGenericCommand(redisClient *c, int nx) {
2266 int retval;
2267
2268 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2269 if (retval == DICT_ERR) {
2270 if (!nx) {
2271 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2272 incrRefCount(c->argv[2]);
2273 } else {
2274 addReply(c,shared.czero);
2275 return;
2276 }
2277 } else {
2278 incrRefCount(c->argv[1]);
2279 incrRefCount(c->argv[2]);
2280 }
2281 server.dirty++;
2282 removeExpire(c->db,c->argv[1]);
2283 addReply(c, nx ? shared.cone : shared.ok);
2284 }
2285
2286 static void setCommand(redisClient *c) {
2287 setGenericCommand(c,0);
2288 }
2289
2290 static void setnxCommand(redisClient *c) {
2291 setGenericCommand(c,1);
2292 }
2293
2294 static void getCommand(redisClient *c) {
2295 robj *o = lookupKeyRead(c->db,c->argv[1]);
2296
2297 if (o == NULL) {
2298 addReply(c,shared.nullbulk);
2299 } else {
2300 if (o->type != REDIS_STRING) {
2301 addReply(c,shared.wrongtypeerr);
2302 } else {
2303 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2304 addReply(c,o);
2305 addReply(c,shared.crlf);
2306 }
2307 }
2308 }
2309
2310 static void getSetCommand(redisClient *c) {
2311 getCommand(c);
2312 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2313 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2314 } else {
2315 incrRefCount(c->argv[1]);
2316 }
2317 incrRefCount(c->argv[2]);
2318 server.dirty++;
2319 removeExpire(c->db,c->argv[1]);
2320 }
2321
2322 static void mgetCommand(redisClient *c) {
2323 int j;
2324
2325 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2326 for (j = 1; j < c->argc; j++) {
2327 robj *o = lookupKeyRead(c->db,c->argv[j]);
2328 if (o == NULL) {
2329 addReply(c,shared.nullbulk);
2330 } else {
2331 if (o->type != REDIS_STRING) {
2332 addReply(c,shared.nullbulk);
2333 } else {
2334 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2335 addReply(c,o);
2336 addReply(c,shared.crlf);
2337 }
2338 }
2339 }
2340 }
2341
2342 static void incrDecrCommand(redisClient *c, long long incr) {
2343 long long value;
2344 int retval;
2345 robj *o;
2346
2347 o = lookupKeyWrite(c->db,c->argv[1]);
2348 if (o == NULL) {
2349 value = 0;
2350 } else {
2351 if (o->type != REDIS_STRING) {
2352 value = 0;
2353 } else {
2354 char *eptr;
2355
2356 value = strtoll(o->ptr, &eptr, 10);
2357 }
2358 }
2359
2360 value += incr;
2361 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2362 retval = dictAdd(c->db->dict,c->argv[1],o);
2363 if (retval == DICT_ERR) {
2364 dictReplace(c->db->dict,c->argv[1],o);
2365 removeExpire(c->db,c->argv[1]);
2366 } else {
2367 incrRefCount(c->argv[1]);
2368 }
2369 server.dirty++;
2370 addReply(c,shared.colon);
2371 addReply(c,o);
2372 addReply(c,shared.crlf);
2373 }
2374
2375 static void incrCommand(redisClient *c) {
2376 incrDecrCommand(c,1);
2377 }
2378
2379 static void decrCommand(redisClient *c) {
2380 incrDecrCommand(c,-1);
2381 }
2382
2383 static void incrbyCommand(redisClient *c) {
2384 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2385 incrDecrCommand(c,incr);
2386 }
2387
2388 static void decrbyCommand(redisClient *c) {
2389 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2390 incrDecrCommand(c,-incr);
2391 }
2392
2393 /* ========================= Type agnostic commands ========================= */
2394
2395 static void delCommand(redisClient *c) {
2396 int deleted = 0, j;
2397
2398 for (j = 1; j < c->argc; j++) {
2399 if (deleteKey(c->db,c->argv[j])) {
2400 server.dirty++;
2401 deleted++;
2402 }
2403 }
2404 switch(deleted) {
2405 case 0:
2406 addReply(c,shared.czero);
2407 break;
2408 case 1:
2409 addReply(c,shared.cone);
2410 break;
2411 default:
2412 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2413 break;
2414 }
2415 }
2416
2417 static void existsCommand(redisClient *c) {
2418 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2419 }
2420
2421 static void selectCommand(redisClient *c) {
2422 int id = atoi(c->argv[1]->ptr);
2423
2424 if (selectDb(c,id) == REDIS_ERR) {
2425 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2426 } else {
2427 addReply(c,shared.ok);
2428 }
2429 }
2430
2431 static void randomkeyCommand(redisClient *c) {
2432 dictEntry *de;
2433
2434 while(1) {
2435 de = dictGetRandomKey(c->db->dict);
2436 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2437 }
2438 if (de == NULL) {
2439 addReply(c,shared.plus);
2440 addReply(c,shared.crlf);
2441 } else {
2442 addReply(c,shared.plus);
2443 addReply(c,dictGetEntryKey(de));
2444 addReply(c,shared.crlf);
2445 }
2446 }
2447
2448 static void keysCommand(redisClient *c) {
2449 dictIterator *di;
2450 dictEntry *de;
2451 sds pattern = c->argv[1]->ptr;
2452 int plen = sdslen(pattern);
2453 int numkeys = 0, keyslen = 0;
2454 robj *lenobj = createObject(REDIS_STRING,NULL);
2455
2456 di = dictGetIterator(c->db->dict);
2457 if (!di) oom("dictGetIterator");
2458 addReply(c,lenobj);
2459 decrRefCount(lenobj);
2460 while((de = dictNext(di)) != NULL) {
2461 robj *keyobj = dictGetEntryKey(de);
2462
2463 sds key = keyobj->ptr;
2464 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2465 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2466 if (expireIfNeeded(c->db,keyobj) == 0) {
2467 if (numkeys != 0)
2468 addReply(c,shared.space);
2469 addReply(c,keyobj);
2470 numkeys++;
2471 keyslen += sdslen(key);
2472 }
2473 }
2474 }
2475 dictReleaseIterator(di);
2476 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2477 addReply(c,shared.crlf);
2478 }
2479
2480 static void dbsizeCommand(redisClient *c) {
2481 addReplySds(c,
2482 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2483 }
2484
2485 static void lastsaveCommand(redisClient *c) {
2486 addReplySds(c,
2487 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2488 }
2489
2490 static void typeCommand(redisClient *c) {
2491 robj *o;
2492 char *type;
2493
2494 o = lookupKeyRead(c->db,c->argv[1]);
2495 if (o == NULL) {
2496 type = "+none";
2497 } else {
2498 switch(o->type) {
2499 case REDIS_STRING: type = "+string"; break;
2500 case REDIS_LIST: type = "+list"; break;
2501 case REDIS_SET: type = "+set"; break;
2502 default: type = "unknown"; break;
2503 }
2504 }
2505 addReplySds(c,sdsnew(type));
2506 addReply(c,shared.crlf);
2507 }
2508
2509 static void saveCommand(redisClient *c) {
2510 if (server.bgsaveinprogress) {
2511 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2512 return;
2513 }
2514 if (rdbSave(server.dbfilename) == REDIS_OK) {
2515 addReply(c,shared.ok);
2516 } else {
2517 addReply(c,shared.err);
2518 }
2519 }
2520
2521 static void bgsaveCommand(redisClient *c) {
2522 if (server.bgsaveinprogress) {
2523 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2524 return;
2525 }
2526 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2527 addReply(c,shared.ok);
2528 } else {
2529 addReply(c,shared.err);
2530 }
2531 }
2532
2533 static void shutdownCommand(redisClient *c) {
2534 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2535 if (server.bgsaveinprogress) {
2536 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2537 signal(SIGCHLD, SIG_IGN);
2538 kill(server.bgsavechildpid,SIGKILL);
2539 }
2540 if (rdbSave(server.dbfilename) == REDIS_OK) {
2541 if (server.daemonize)
2542 unlink(server.pidfile);
2543 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2544 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2545 exit(1);
2546 } else {
2547 signal(SIGCHLD, SIG_DFL);
2548 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2549 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2550 }
2551 }
2552
2553 static void renameGenericCommand(redisClient *c, int nx) {
2554 robj *o;
2555
2556 /* To use the same key as src and dst is probably an error */
2557 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2558 addReply(c,shared.sameobjecterr);
2559 return;
2560 }
2561
2562 o = lookupKeyWrite(c->db,c->argv[1]);
2563 if (o == NULL) {
2564 addReply(c,shared.nokeyerr);
2565 return;
2566 }
2567 incrRefCount(o);
2568 deleteIfVolatile(c->db,c->argv[2]);
2569 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2570 if (nx) {
2571 decrRefCount(o);
2572 addReply(c,shared.czero);
2573 return;
2574 }
2575 dictReplace(c->db->dict,c->argv[2],o);
2576 } else {
2577 incrRefCount(c->argv[2]);
2578 }
2579 deleteKey(c->db,c->argv[1]);
2580 server.dirty++;
2581 addReply(c,nx ? shared.cone : shared.ok);
2582 }
2583
2584 static void renameCommand(redisClient *c) {
2585 renameGenericCommand(c,0);
2586 }
2587
2588 static void renamenxCommand(redisClient *c) {
2589 renameGenericCommand(c,1);
2590 }
2591
2592 static void moveCommand(redisClient *c) {
2593 robj *o;
2594 redisDb *src, *dst;
2595 int srcid;
2596
2597 /* Obtain source and target DB pointers */
2598 src = c->db;
2599 srcid = c->db->id;
2600 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2601 addReply(c,shared.outofrangeerr);
2602 return;
2603 }
2604 dst = c->db;
2605 selectDb(c,srcid); /* Back to the source DB */
2606
2607 /* If the user is moving using as target the same
2608 * DB as the source DB it is probably an error. */
2609 if (src == dst) {
2610 addReply(c,shared.sameobjecterr);
2611 return;
2612 }
2613
2614 /* Check if the element exists and get a reference */
2615 o = lookupKeyWrite(c->db,c->argv[1]);
2616 if (!o) {
2617 addReply(c,shared.czero);
2618 return;
2619 }
2620
2621 /* Try to add the element to the target DB */
2622 deleteIfVolatile(dst,c->argv[1]);
2623 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2624 addReply(c,shared.czero);
2625 return;
2626 }
2627 incrRefCount(c->argv[1]);
2628 incrRefCount(o);
2629
2630 /* OK! key moved, free the entry in the source DB */
2631 deleteKey(src,c->argv[1]);
2632 server.dirty++;
2633 addReply(c,shared.cone);
2634 }
2635
2636 /* =================================== Lists ================================ */
2637 static void pushGenericCommand(redisClient *c, int where) {
2638 robj *lobj;
2639 list *list;
2640
2641 lobj = lookupKeyWrite(c->db,c->argv[1]);
2642 if (lobj == NULL) {
2643 lobj = createListObject();
2644 list = lobj->ptr;
2645 if (where == REDIS_HEAD) {
2646 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2647 } else {
2648 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2649 }
2650 dictAdd(c->db->dict,c->argv[1],lobj);
2651 incrRefCount(c->argv[1]);
2652 incrRefCount(c->argv[2]);
2653 } else {
2654 if (lobj->type != REDIS_LIST) {
2655 addReply(c,shared.wrongtypeerr);
2656 return;
2657 }
2658 list = lobj->ptr;
2659 if (where == REDIS_HEAD) {
2660 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2661 } else {
2662 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2663 }
2664 incrRefCount(c->argv[2]);
2665 }
2666 server.dirty++;
2667 addReply(c,shared.ok);
2668 }
2669
2670 static void lpushCommand(redisClient *c) {
2671 pushGenericCommand(c,REDIS_HEAD);
2672 }
2673
2674 static void rpushCommand(redisClient *c) {
2675 pushGenericCommand(c,REDIS_TAIL);
2676 }
2677
2678 static void llenCommand(redisClient *c) {
2679 robj *o;
2680 list *l;
2681
2682 o = lookupKeyRead(c->db,c->argv[1]);
2683 if (o == NULL) {
2684 addReply(c,shared.czero);
2685 return;
2686 } else {
2687 if (o->type != REDIS_LIST) {
2688 addReply(c,shared.wrongtypeerr);
2689 } else {
2690 l = o->ptr;
2691 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2692 }
2693 }
2694 }
2695
2696 static void lindexCommand(redisClient *c) {
2697 robj *o;
2698 int index = atoi(c->argv[2]->ptr);
2699
2700 o = lookupKeyRead(c->db,c->argv[1]);
2701 if (o == NULL) {
2702 addReply(c,shared.nullbulk);
2703 } else {
2704 if (o->type != REDIS_LIST) {
2705 addReply(c,shared.wrongtypeerr);
2706 } else {
2707 list *list = o->ptr;
2708 listNode *ln;
2709
2710 ln = listIndex(list, index);
2711 if (ln == NULL) {
2712 addReply(c,shared.nullbulk);
2713 } else {
2714 robj *ele = listNodeValue(ln);
2715 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2716 addReply(c,ele);
2717 addReply(c,shared.crlf);
2718 }
2719 }
2720 }
2721 }
2722
2723 static void lsetCommand(redisClient *c) {
2724 robj *o;
2725 int index = atoi(c->argv[2]->ptr);
2726
2727 o = lookupKeyWrite(c->db,c->argv[1]);
2728 if (o == NULL) {
2729 addReply(c,shared.nokeyerr);
2730 } else {
2731 if (o->type != REDIS_LIST) {
2732 addReply(c,shared.wrongtypeerr);
2733 } else {
2734 list *list = o->ptr;
2735 listNode *ln;
2736
2737 ln = listIndex(list, index);
2738 if (ln == NULL) {
2739 addReply(c,shared.outofrangeerr);
2740 } else {
2741 robj *ele = listNodeValue(ln);
2742
2743 decrRefCount(ele);
2744 listNodeValue(ln) = c->argv[3];
2745 incrRefCount(c->argv[3]);
2746 addReply(c,shared.ok);
2747 server.dirty++;
2748 }
2749 }
2750 }
2751 }
2752
2753 static void popGenericCommand(redisClient *c, int where) {
2754 robj *o;
2755
2756 o = lookupKeyWrite(c->db,c->argv[1]);
2757 if (o == NULL) {
2758 addReply(c,shared.nullbulk);
2759 } else {
2760 if (o->type != REDIS_LIST) {
2761 addReply(c,shared.wrongtypeerr);
2762 } else {
2763 list *list = o->ptr;
2764 listNode *ln;
2765
2766 if (where == REDIS_HEAD)
2767 ln = listFirst(list);
2768 else
2769 ln = listLast(list);
2770
2771 if (ln == NULL) {
2772 addReply(c,shared.nullbulk);
2773 } else {
2774 robj *ele = listNodeValue(ln);
2775 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2776 addReply(c,ele);
2777 addReply(c,shared.crlf);
2778 listDelNode(list,ln);
2779 server.dirty++;
2780 }
2781 }
2782 }
2783 }
2784
2785 static void lpopCommand(redisClient *c) {
2786 popGenericCommand(c,REDIS_HEAD);
2787 }
2788
2789 static void rpopCommand(redisClient *c) {
2790 popGenericCommand(c,REDIS_TAIL);
2791 }
2792
2793 static void lrangeCommand(redisClient *c) {
2794 robj *o;
2795 int start = atoi(c->argv[2]->ptr);
2796 int end = atoi(c->argv[3]->ptr);
2797
2798 o = lookupKeyRead(c->db,c->argv[1]);
2799 if (o == NULL) {
2800 addReply(c,shared.nullmultibulk);
2801 } else {
2802 if (o->type != REDIS_LIST) {
2803 addReply(c,shared.wrongtypeerr);
2804 } else {
2805 list *list = o->ptr;
2806 listNode *ln;
2807 int llen = listLength(list);
2808 int rangelen, j;
2809 robj *ele;
2810
2811 /* convert negative indexes */
2812 if (start < 0) start = llen+start;
2813 if (end < 0) end = llen+end;
2814 if (start < 0) start = 0;
2815 if (end < 0) end = 0;
2816
2817 /* indexes sanity checks */
2818 if (start > end || start >= llen) {
2819 /* Out of range start or start > end result in empty list */
2820 addReply(c,shared.emptymultibulk);
2821 return;
2822 }
2823 if (end >= llen) end = llen-1;
2824 rangelen = (end-start)+1;
2825
2826 /* Return the result in form of a multi-bulk reply */
2827 ln = listIndex(list, start);
2828 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2829 for (j = 0; j < rangelen; j++) {
2830 ele = listNodeValue(ln);
2831 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2832 addReply(c,ele);
2833 addReply(c,shared.crlf);
2834 ln = ln->next;
2835 }
2836 }
2837 }
2838 }
2839
2840 static void ltrimCommand(redisClient *c) {
2841 robj *o;
2842 int start = atoi(c->argv[2]->ptr);
2843 int end = atoi(c->argv[3]->ptr);
2844
2845 o = lookupKeyWrite(c->db,c->argv[1]);
2846 if (o == NULL) {
2847 addReply(c,shared.nokeyerr);
2848 } else {
2849 if (o->type != REDIS_LIST) {
2850 addReply(c,shared.wrongtypeerr);
2851 } else {
2852 list *list = o->ptr;
2853 listNode *ln;
2854 int llen = listLength(list);
2855 int j, ltrim, rtrim;
2856
2857 /* convert negative indexes */
2858 if (start < 0) start = llen+start;
2859 if (end < 0) end = llen+end;
2860 if (start < 0) start = 0;
2861 if (end < 0) end = 0;
2862
2863 /* indexes sanity checks */
2864 if (start > end || start >= llen) {
2865 /* Out of range start or start > end result in empty list */
2866 ltrim = llen;
2867 rtrim = 0;
2868 } else {
2869 if (end >= llen) end = llen-1;
2870 ltrim = start;
2871 rtrim = llen-end-1;
2872 }
2873
2874 /* Remove list elements to perform the trim */
2875 for (j = 0; j < ltrim; j++) {
2876 ln = listFirst(list);
2877 listDelNode(list,ln);
2878 }
2879 for (j = 0; j < rtrim; j++) {
2880 ln = listLast(list);
2881 listDelNode(list,ln);
2882 }
2883 addReply(c,shared.ok);
2884 server.dirty++;
2885 }
2886 }
2887 }
2888
2889 static void lremCommand(redisClient *c) {
2890 robj *o;
2891
2892 o = lookupKeyWrite(c->db,c->argv[1]);
2893 if (o == NULL) {
2894 addReply(c,shared.czero);
2895 } else {
2896 if (o->type != REDIS_LIST) {
2897 addReply(c,shared.wrongtypeerr);
2898 } else {
2899 list *list = o->ptr;
2900 listNode *ln, *next;
2901 int toremove = atoi(c->argv[2]->ptr);
2902 int removed = 0;
2903 int fromtail = 0;
2904
2905 if (toremove < 0) {
2906 toremove = -toremove;
2907 fromtail = 1;
2908 }
2909 ln = fromtail ? list->tail : list->head;
2910 while (ln) {
2911 robj *ele = listNodeValue(ln);
2912
2913 next = fromtail ? ln->prev : ln->next;
2914 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2915 listDelNode(list,ln);
2916 server.dirty++;
2917 removed++;
2918 if (toremove && removed == toremove) break;
2919 }
2920 ln = next;
2921 }
2922 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2923 }
2924 }
2925 }
2926
2927 /* ==================================== Sets ================================ */
2928
2929 static void saddCommand(redisClient *c) {
2930 robj *set;
2931
2932 set = lookupKeyWrite(c->db,c->argv[1]);
2933 if (set == NULL) {
2934 set = createSetObject();
2935 dictAdd(c->db->dict,c->argv[1],set);
2936 incrRefCount(c->argv[1]);
2937 } else {
2938 if (set->type != REDIS_SET) {
2939 addReply(c,shared.wrongtypeerr);
2940 return;
2941 }
2942 }
2943 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2944 incrRefCount(c->argv[2]);
2945 server.dirty++;
2946 addReply(c,shared.cone);
2947 } else {
2948 addReply(c,shared.czero);
2949 }
2950 }
2951
2952 static void sremCommand(redisClient *c) {
2953 robj *set;
2954
2955 set = lookupKeyWrite(c->db,c->argv[1]);
2956 if (set == NULL) {
2957 addReply(c,shared.czero);
2958 } else {
2959 if (set->type != REDIS_SET) {
2960 addReply(c,shared.wrongtypeerr);
2961 return;
2962 }
2963 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2964 server.dirty++;
2965 addReply(c,shared.cone);
2966 } else {
2967 addReply(c,shared.czero);
2968 }
2969 }
2970 }
2971
2972 static void smoveCommand(redisClient *c) {
2973 robj *srcset, *dstset;
2974
2975 srcset = lookupKeyWrite(c->db,c->argv[1]);
2976 dstset = lookupKeyWrite(c->db,c->argv[2]);
2977
2978 /* If the source key does not exist return 0, if it's of the wrong type
2979 * raise an error */
2980 if (srcset == NULL || srcset->type != REDIS_SET) {
2981 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2982 return;
2983 }
2984 /* Error if the destination key is not a set as well */
2985 if (dstset && dstset->type != REDIS_SET) {
2986 addReply(c,shared.wrongtypeerr);
2987 return;
2988 }
2989 /* Remove the element from the source set */
2990 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2991 /* Key not found in the src set! return zero */
2992 addReply(c,shared.czero);
2993 return;
2994 }
2995 server.dirty++;
2996 /* Add the element to the destination set */
2997 if (!dstset) {
2998 dstset = createSetObject();
2999 dictAdd(c->db->dict,c->argv[2],dstset);
3000 incrRefCount(c->argv[2]);
3001 }
3002 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3003 incrRefCount(c->argv[3]);
3004 addReply(c,shared.cone);
3005 }
3006
3007 static void sismemberCommand(redisClient *c) {
3008 robj *set;
3009
3010 set = lookupKeyRead(c->db,c->argv[1]);
3011 if (set == NULL) {
3012 addReply(c,shared.czero);
3013 } else {
3014 if (set->type != REDIS_SET) {
3015 addReply(c,shared.wrongtypeerr);
3016 return;
3017 }
3018 if (dictFind(set->ptr,c->argv[2]))
3019 addReply(c,shared.cone);
3020 else
3021 addReply(c,shared.czero);
3022 }
3023 }
3024
3025 static void scardCommand(redisClient *c) {
3026 robj *o;
3027 dict *s;
3028
3029 o = lookupKeyRead(c->db,c->argv[1]);
3030 if (o == NULL) {
3031 addReply(c,shared.czero);
3032 return;
3033 } else {
3034 if (o->type != REDIS_SET) {
3035 addReply(c,shared.wrongtypeerr);
3036 } else {
3037 s = o->ptr;
3038 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3039 dictSize(s)));
3040 }
3041 }
3042 }
3043
3044 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3045 dict **d1 = (void*) s1, **d2 = (void*) s2;
3046
3047 return dictSize(*d1)-dictSize(*d2);
3048 }
3049
3050 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3051 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3052 dictIterator *di;
3053 dictEntry *de;
3054 robj *lenobj = NULL, *dstset = NULL;
3055 int j, cardinality = 0;
3056
3057 if (!dv) oom("sinterGenericCommand");
3058 for (j = 0; j < setsnum; j++) {
3059 robj *setobj;
3060
3061 setobj = dstkey ?
3062 lookupKeyWrite(c->db,setskeys[j]) :
3063 lookupKeyRead(c->db,setskeys[j]);
3064 if (!setobj) {
3065 zfree(dv);
3066 if (dstkey) {
3067 deleteKey(c->db,dstkey);
3068 addReply(c,shared.ok);
3069 } else {
3070 addReply(c,shared.nullmultibulk);
3071 }
3072 return;
3073 }
3074 if (setobj->type != REDIS_SET) {
3075 zfree(dv);
3076 addReply(c,shared.wrongtypeerr);
3077 return;
3078 }
3079 dv[j] = setobj->ptr;
3080 }
3081 /* Sort sets from the smallest to largest, this will improve our
3082 * algorithm's performace */
3083 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3084
3085 /* The first thing we should output is the total number of elements...
3086 * since this is a multi-bulk write, but at this stage we don't know
3087 * the intersection set size, so we use a trick, append an empty object
3088 * to the output list and save the pointer to later modify it with the
3089 * right length */
3090 if (!dstkey) {
3091 lenobj = createObject(REDIS_STRING,NULL);
3092 addReply(c,lenobj);
3093 decrRefCount(lenobj);
3094 } else {
3095 /* If we have a target key where to store the resulting set
3096 * create this key with an empty set inside */
3097 dstset = createSetObject();
3098 }
3099
3100 /* Iterate all the elements of the first (smallest) set, and test
3101 * the element against all the other sets, if at least one set does
3102 * not include the element it is discarded */
3103 di = dictGetIterator(dv[0]);
3104 if (!di) oom("dictGetIterator");
3105
3106 while((de = dictNext(di)) != NULL) {
3107 robj *ele;
3108
3109 for (j = 1; j < setsnum; j++)
3110 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3111 if (j != setsnum)
3112 continue; /* at least one set does not contain the member */
3113 ele = dictGetEntryKey(de);
3114 if (!dstkey) {
3115 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3116 addReply(c,ele);
3117 addReply(c,shared.crlf);
3118 cardinality++;
3119 } else {
3120 dictAdd(dstset->ptr,ele,NULL);
3121 incrRefCount(ele);
3122 }
3123 }
3124 dictReleaseIterator(di);
3125
3126 if (dstkey) {
3127 /* Store the resulting set into the target */
3128 deleteKey(c->db,dstkey);
3129 dictAdd(c->db->dict,dstkey,dstset);
3130 incrRefCount(dstkey);
3131 }
3132
3133 if (!dstkey) {
3134 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3135 } else {
3136 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3137 dictSize((dict*)dstset->ptr)));
3138 server.dirty++;
3139 }
3140 zfree(dv);
3141 }
3142
3143 static void sinterCommand(redisClient *c) {
3144 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3145 }
3146
3147 static void sinterstoreCommand(redisClient *c) {
3148 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3149 }
3150
3151 #define REDIS_OP_UNION 0
3152 #define REDIS_OP_DIFF 1
3153
3154 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3155 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3156 dictIterator *di;
3157 dictEntry *de;
3158 robj *dstset = NULL;
3159 int j, cardinality = 0;
3160
3161 if (!dv) oom("sunionDiffGenericCommand");
3162 for (j = 0; j < setsnum; j++) {
3163 robj *setobj;
3164
3165 setobj = dstkey ?
3166 lookupKeyWrite(c->db,setskeys[j]) :
3167 lookupKeyRead(c->db,setskeys[j]);
3168 if (!setobj) {
3169 dv[j] = NULL;
3170 continue;
3171 }
3172 if (setobj->type != REDIS_SET) {
3173 zfree(dv);
3174 addReply(c,shared.wrongtypeerr);
3175 return;
3176 }
3177 dv[j] = setobj->ptr;
3178 }
3179
3180 /* We need a temp set object to store our union. If the dstkey
3181 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3182 * this set object will be the resulting object to set into the target key*/
3183 dstset = createSetObject();
3184
3185 /* Iterate all the elements of all the sets, add every element a single
3186 * time to the result set */
3187 for (j = 0; j < setsnum; j++) {
3188 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3189 if (!dv[j]) continue; /* non existing keys are like empty sets */
3190
3191 di = dictGetIterator(dv[j]);
3192 if (!di) oom("dictGetIterator");
3193
3194 while((de = dictNext(di)) != NULL) {
3195 robj *ele;
3196
3197 /* dictAdd will not add the same element multiple times */
3198 ele = dictGetEntryKey(de);
3199 if (op == REDIS_OP_UNION || j == 0) {
3200 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3201 incrRefCount(ele);
3202 cardinality++;
3203 }
3204 } else if (op == REDIS_OP_DIFF) {
3205 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3206 cardinality--;
3207 }
3208 }
3209 }
3210 dictReleaseIterator(di);
3211
3212 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3213 }
3214
3215 /* Output the content of the resulting set, if not in STORE mode */
3216 if (!dstkey) {
3217 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3218 di = dictGetIterator(dstset->ptr);
3219 if (!di) oom("dictGetIterator");
3220 while((de = dictNext(di)) != NULL) {
3221 robj *ele;
3222
3223 ele = dictGetEntryKey(de);
3224 addReplySds(c,sdscatprintf(sdsempty(),
3225 "$%d\r\n",sdslen(ele->ptr)));
3226 addReply(c,ele);
3227 addReply(c,shared.crlf);
3228 }
3229 dictReleaseIterator(di);
3230 } else {
3231 /* If we have a target key where to store the resulting set
3232 * create this key with the result set inside */
3233 deleteKey(c->db,dstkey);
3234 dictAdd(c->db->dict,dstkey,dstset);
3235 incrRefCount(dstkey);
3236 }
3237
3238 /* Cleanup */
3239 if (!dstkey) {
3240 decrRefCount(dstset);
3241 } else {
3242 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3243 dictSize((dict*)dstset->ptr)));
3244 server.dirty++;
3245 }
3246 zfree(dv);
3247 }
3248
3249 static void sunionCommand(redisClient *c) {
3250 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3251 }
3252
3253 static void sunionstoreCommand(redisClient *c) {
3254 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3255 }
3256
3257 static void sdiffCommand(redisClient *c) {
3258 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3259 }
3260
3261 static void sdiffstoreCommand(redisClient *c) {
3262 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3263 }
3264
3265 static void flushdbCommand(redisClient *c) {
3266 server.dirty += dictSize(c->db->dict);
3267 dictEmpty(c->db->dict);
3268 dictEmpty(c->db->expires);
3269 addReply(c,shared.ok);
3270 }
3271
3272 static void flushallCommand(redisClient *c) {
3273 server.dirty += emptyDb();
3274 addReply(c,shared.ok);
3275 rdbSave(server.dbfilename);
3276 server.dirty++;
3277 }
3278
3279 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3280 redisSortOperation *so = zmalloc(sizeof(*so));
3281 if (!so) oom("createSortOperation");
3282 so->type = type;
3283 so->pattern = pattern;
3284 return so;
3285 }
3286
3287 /* Return the value associated to the key with a name obtained
3288 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3289 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3290 char *p;
3291 sds spat, ssub;
3292 robj keyobj;
3293 int prefixlen, sublen, postfixlen;
3294 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3295 struct {
3296 long len;
3297 long free;
3298 char buf[REDIS_SORTKEY_MAX+1];
3299 } keyname;
3300
3301 spat = pattern->ptr;
3302 ssub = subst->ptr;
3303 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3304 p = strchr(spat,'*');
3305 if (!p) return NULL;
3306
3307 prefixlen = p-spat;
3308 sublen = sdslen(ssub);
3309 postfixlen = sdslen(spat)-(prefixlen+1);
3310 memcpy(keyname.buf,spat,prefixlen);
3311 memcpy(keyname.buf+prefixlen,ssub,sublen);
3312 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3313 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3314 keyname.len = prefixlen+sublen+postfixlen;
3315
3316 keyobj.refcount = 1;
3317 keyobj.type = REDIS_STRING;
3318 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3319
3320 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3321 return lookupKeyRead(db,&keyobj);
3322 }
3323
3324 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3325 * the additional parameter is not standard but a BSD-specific we have to
3326 * pass sorting parameters via the global 'server' structure */
3327 static int sortCompare(const void *s1, const void *s2) {
3328 const redisSortObject *so1 = s1, *so2 = s2;
3329 int cmp;
3330
3331 if (!server.sort_alpha) {
3332 /* Numeric sorting. Here it's trivial as we precomputed scores */
3333 if (so1->u.score > so2->u.score) {
3334 cmp = 1;
3335 } else if (so1->u.score < so2->u.score) {
3336 cmp = -1;
3337 } else {
3338 cmp = 0;
3339 }
3340 } else {
3341 /* Alphanumeric sorting */
3342 if (server.sort_bypattern) {
3343 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3344 /* At least one compare object is NULL */
3345 if (so1->u.cmpobj == so2->u.cmpobj)
3346 cmp = 0;
3347 else if (so1->u.cmpobj == NULL)
3348 cmp = -1;
3349 else
3350 cmp = 1;
3351 } else {
3352 /* We have both the objects, use strcoll */
3353 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3354 }
3355 } else {
3356 /* Compare elements directly */
3357 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3358 }
3359 }
3360 return server.sort_desc ? -cmp : cmp;
3361 }
3362
3363 /* The SORT command is the most complex command in Redis. Warning: this code
3364 * is optimized for speed and a bit less for readability */
3365 static void sortCommand(redisClient *c) {
3366 list *operations;
3367 int outputlen = 0;
3368 int desc = 0, alpha = 0;
3369 int limit_start = 0, limit_count = -1, start, end;
3370 int j, dontsort = 0, vectorlen;
3371 int getop = 0; /* GET operation counter */
3372 robj *sortval, *sortby = NULL;
3373 redisSortObject *vector; /* Resulting vector to sort */
3374
3375 /* Lookup the key to sort. It must be of the right types */
3376 sortval = lookupKeyRead(c->db,c->argv[1]);
3377 if (sortval == NULL) {
3378 addReply(c,shared.nokeyerr);
3379 return;
3380 }
3381 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3382 addReply(c,shared.wrongtypeerr);
3383 return;
3384 }
3385
3386 /* Create a list of operations to perform for every sorted element.
3387 * Operations can be GET/DEL/INCR/DECR */
3388 operations = listCreate();
3389 listSetFreeMethod(operations,zfree);
3390 j = 2;
3391
3392 /* Now we need to protect sortval incrementing its count, in the future
3393 * SORT may have options able to overwrite/delete keys during the sorting
3394 * and the sorted key itself may get destroied */
3395 incrRefCount(sortval);
3396
3397 /* The SORT command has an SQL-alike syntax, parse it */
3398 while(j < c->argc) {
3399 int leftargs = c->argc-j-1;
3400 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3401 desc = 0;
3402 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3403 desc = 1;
3404 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3405 alpha = 1;
3406 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3407 limit_start = atoi(c->argv[j+1]->ptr);
3408 limit_count = atoi(c->argv[j+2]->ptr);
3409 j+=2;
3410 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3411 sortby = c->argv[j+1];
3412 /* If the BY pattern does not contain '*', i.e. it is constant,
3413 * we don't need to sort nor to lookup the weight keys. */
3414 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3415 j++;
3416 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3417 listAddNodeTail(operations,createSortOperation(
3418 REDIS_SORT_GET,c->argv[j+1]));
3419 getop++;
3420 j++;
3421 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3422 listAddNodeTail(operations,createSortOperation(
3423 REDIS_SORT_DEL,c->argv[j+1]));
3424 j++;
3425 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3426 listAddNodeTail(operations,createSortOperation(
3427 REDIS_SORT_INCR,c->argv[j+1]));
3428 j++;
3429 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3430 listAddNodeTail(operations,createSortOperation(
3431 REDIS_SORT_DECR,c->argv[j+1]));
3432 j++;
3433 } else {
3434 decrRefCount(sortval);
3435 listRelease(operations);
3436 addReply(c,shared.syntaxerr);
3437 return;
3438 }
3439 j++;
3440 }
3441
3442 /* Load the sorting vector with all the objects to sort */
3443 vectorlen = (sortval->type == REDIS_LIST) ?
3444 listLength((list*)sortval->ptr) :
3445 dictSize((dict*)sortval->ptr);
3446 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3447 if (!vector) oom("allocating objects vector for SORT");
3448 j = 0;
3449 if (sortval->type == REDIS_LIST) {
3450 list *list = sortval->ptr;
3451 listNode *ln;
3452
3453 listRewind(list);
3454 while((ln = listYield(list))) {
3455 robj *ele = ln->value;
3456 vector[j].obj = ele;
3457 vector[j].u.score = 0;
3458 vector[j].u.cmpobj = NULL;
3459 j++;
3460 }
3461 } else {
3462 dict *set = sortval->ptr;
3463 dictIterator *di;
3464 dictEntry *setele;
3465
3466 di = dictGetIterator(set);
3467 if (!di) oom("dictGetIterator");
3468 while((setele = dictNext(di)) != NULL) {
3469 vector[j].obj = dictGetEntryKey(setele);
3470 vector[j].u.score = 0;
3471 vector[j].u.cmpobj = NULL;
3472 j++;
3473 }
3474 dictReleaseIterator(di);
3475 }
3476 assert(j == vectorlen);
3477
3478 /* Now it's time to load the right scores in the sorting vector */
3479 if (dontsort == 0) {
3480 for (j = 0; j < vectorlen; j++) {
3481 if (sortby) {
3482 robj *byval;
3483
3484 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3485 if (!byval || byval->type != REDIS_STRING) continue;
3486 if (alpha) {
3487 vector[j].u.cmpobj = byval;
3488 incrRefCount(byval);
3489 } else {
3490 vector[j].u.score = strtod(byval->ptr,NULL);
3491 }
3492 } else {
3493 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3494 }
3495 }
3496 }
3497
3498 /* We are ready to sort the vector... perform a bit of sanity check
3499 * on the LIMIT option too. We'll use a partial version of quicksort. */
3500 start = (limit_start < 0) ? 0 : limit_start;
3501 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3502 if (start >= vectorlen) {
3503 start = vectorlen-1;
3504 end = vectorlen-2;
3505 }
3506 if (end >= vectorlen) end = vectorlen-1;
3507
3508 if (dontsort == 0) {
3509 server.sort_desc = desc;
3510 server.sort_alpha = alpha;
3511 server.sort_bypattern = sortby ? 1 : 0;
3512 if (sortby && (start != 0 || end != vectorlen-1))
3513 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3514 else
3515 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3516 }
3517
3518 /* Send command output to the output buffer, performing the specified
3519 * GET/DEL/INCR/DECR operations if any. */
3520 outputlen = getop ? getop*(end-start+1) : end-start+1;
3521 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3522 for (j = start; j <= end; j++) {
3523 listNode *ln;
3524 if (!getop) {
3525 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3526 sdslen(vector[j].obj->ptr)));
3527 addReply(c,vector[j].obj);
3528 addReply(c,shared.crlf);
3529 }
3530 listRewind(operations);
3531 while((ln = listYield(operations))) {
3532 redisSortOperation *sop = ln->value;
3533 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3534 vector[j].obj);
3535
3536 if (sop->type == REDIS_SORT_GET) {
3537 if (!val || val->type != REDIS_STRING) {
3538 addReply(c,shared.nullbulk);
3539 } else {
3540 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3541 sdslen(val->ptr)));
3542 addReply(c,val);
3543 addReply(c,shared.crlf);
3544 }
3545 } else if (sop->type == REDIS_SORT_DEL) {
3546 /* TODO */
3547 }
3548 }
3549 }
3550
3551 /* Cleanup */
3552 decrRefCount(sortval);
3553 listRelease(operations);
3554 for (j = 0; j < vectorlen; j++) {
3555 if (sortby && alpha && vector[j].u.cmpobj)
3556 decrRefCount(vector[j].u.cmpobj);
3557 }
3558 zfree(vector);
3559 }
3560
3561 static void infoCommand(redisClient *c) {
3562 sds info;
3563 time_t uptime = time(NULL)-server.stat_starttime;
3564
3565 info = sdscatprintf(sdsempty(),
3566 "redis_version:%s\r\n"
3567 "uptime_in_seconds:%d\r\n"
3568 "uptime_in_days:%d\r\n"
3569 "connected_clients:%d\r\n"
3570 "connected_slaves:%d\r\n"
3571 "used_memory:%zu\r\n"
3572 "changes_since_last_save:%lld\r\n"
3573 "bgsave_in_progress:%d\r\n"
3574 "last_save_time:%d\r\n"
3575 "total_connections_received:%lld\r\n"
3576 "total_commands_processed:%lld\r\n"
3577 "role:%s\r\n"
3578 ,REDIS_VERSION,
3579 uptime,
3580 uptime/(3600*24),
3581 listLength(server.clients)-listLength(server.slaves),
3582 listLength(server.slaves),
3583 server.usedmemory,
3584 server.dirty,
3585 server.bgsaveinprogress,
3586 server.lastsave,
3587 server.stat_numconnections,
3588 server.stat_numcommands,
3589 server.masterhost == NULL ? "master" : "slave"
3590 );
3591 if (server.masterhost) {
3592 info = sdscatprintf(info,
3593 "master_host:%s\r\n"
3594 "master_port:%d\r\n"
3595 "master_link_status:%s\r\n"
3596 "master_last_io_seconds_ago:%d\r\n"
3597 ,server.masterhost,
3598 server.masterport,
3599 (server.replstate == REDIS_REPL_CONNECTED) ?
3600 "up" : "down",
3601 (int)(time(NULL)-server.master->lastinteraction)
3602 );
3603 }
3604 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3605 addReplySds(c,info);
3606 addReply(c,shared.crlf);
3607 }
3608
3609 static void monitorCommand(redisClient *c) {
3610 /* ignore MONITOR if aleady slave or in monitor mode */
3611 if (c->flags & REDIS_SLAVE) return;
3612
3613 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3614 c->slaveseldb = 0;
3615 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3616 addReply(c,shared.ok);
3617 }
3618
3619 /* ================================= Expire ================================= */
3620 static int removeExpire(redisDb *db, robj *key) {
3621 if (dictDelete(db->expires,key) == DICT_OK) {
3622 return 1;
3623 } else {
3624 return 0;
3625 }
3626 }
3627
3628 static int setExpire(redisDb *db, robj *key, time_t when) {
3629 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3630 return 0;
3631 } else {
3632 incrRefCount(key);
3633 return 1;
3634 }
3635 }
3636
3637 /* Return the expire time of the specified key, or -1 if no expire
3638 * is associated with this key (i.e. the key is non volatile) */
3639 static time_t getExpire(redisDb *db, robj *key) {
3640 dictEntry *de;
3641
3642 /* No expire? return ASAP */
3643 if (dictSize(db->expires) == 0 ||
3644 (de = dictFind(db->expires,key)) == NULL) return -1;
3645
3646 return (time_t) dictGetEntryVal(de);
3647 }
3648
3649 static int expireIfNeeded(redisDb *db, robj *key) {
3650 time_t when;
3651 dictEntry *de;
3652
3653 /* No expire? return ASAP */
3654 if (dictSize(db->expires) == 0 ||
3655 (de = dictFind(db->expires,key)) == NULL) return 0;
3656
3657 /* Lookup the expire */
3658 when = (time_t) dictGetEntryVal(de);
3659 if (time(NULL) <= when) return 0;
3660
3661 /* Delete the key */
3662 dictDelete(db->expires,key);
3663 return dictDelete(db->dict,key) == DICT_OK;
3664 }
3665
3666 static int deleteIfVolatile(redisDb *db, robj *key) {
3667 dictEntry *de;
3668
3669 /* No expire? return ASAP */
3670 if (dictSize(db->expires) == 0 ||
3671 (de = dictFind(db->expires,key)) == NULL) return 0;
3672
3673 /* Delete the key */
3674 server.dirty++;
3675 dictDelete(db->expires,key);
3676 return dictDelete(db->dict,key) == DICT_OK;
3677 }
3678
3679 static void expireCommand(redisClient *c) {
3680 dictEntry *de;
3681 int seconds = atoi(c->argv[2]->ptr);
3682
3683 de = dictFind(c->db->dict,c->argv[1]);
3684 if (de == NULL) {
3685 addReply(c,shared.czero);
3686 return;
3687 }
3688 if (seconds <= 0) {
3689 addReply(c, shared.czero);
3690 return;
3691 } else {
3692 time_t when = time(NULL)+seconds;
3693 if (setExpire(c->db,c->argv[1],when))
3694 addReply(c,shared.cone);
3695 else
3696 addReply(c,shared.czero);
3697 return;
3698 }
3699 }
3700
3701 static void ttlCommand(redisClient *c) {
3702 time_t expire;
3703 int ttl = -1;
3704
3705 expire = getExpire(c->db,c->argv[1]);
3706 if (expire != -1) {
3707 ttl = (int) (expire-time(NULL));
3708 if (ttl < 0) ttl = -1;
3709 }
3710 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3711 }
3712
3713 /* =============================== Replication ============================= */
3714
3715 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3716 ssize_t nwritten, ret = size;
3717 time_t start = time(NULL);
3718
3719 timeout++;
3720 while(size) {
3721 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3722 nwritten = write(fd,ptr,size);
3723 if (nwritten == -1) return -1;
3724 ptr += nwritten;
3725 size -= nwritten;
3726 }
3727 if ((time(NULL)-start) > timeout) {
3728 errno = ETIMEDOUT;
3729 return -1;
3730 }
3731 }
3732 return ret;
3733 }
3734
3735 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3736 ssize_t nread, totread = 0;
3737 time_t start = time(NULL);
3738
3739 timeout++;
3740 while(size) {
3741 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3742 nread = read(fd,ptr,size);
3743 if (nread == -1) return -1;
3744 ptr += nread;
3745 size -= nread;
3746 totread += nread;
3747 }
3748 if ((time(NULL)-start) > timeout) {
3749 errno = ETIMEDOUT;
3750 return -1;
3751 }
3752 }
3753 return totread;
3754 }
3755
/* Read a line from 'fd' one byte at a time into the buffer 'ptr' of
 * 'size' bytes, using syncRead() with the given 'timeout' for every byte.
 *
 * Reading stops at '\n' (not stored; a preceding '\r' is stripped) or when
 * the buffer is full. The buffer is always null terminated. Returns the
 * number of characters read before the newline (a stripped '\r' is still
 * counted), or -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* Without room for at least the terminator there is nothing to do. */
    if (size <= 0) return 0;
    *ptr = '\0';

    size--; /* Reserve one byte for the null terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* Account for the stored byte: the remaining space was never
             * decremented before, so a line longer than the buffer was
             * written past its end. */
            size--;
        }
    }
    return nread;
}
3776
3777 static void syncCommand(redisClient *c) {
3778 /* ignore SYNC if aleady slave or in monitor mode */
3779 if (c->flags & REDIS_SLAVE) return;
3780
3781 /* SYNC can't be issued when the server has pending data to send to
3782 * the client about already issued commands. We need a fresh reply
3783 * buffer registering the differences between the BGSAVE and the current
3784 * dataset, so that we can copy to other slaves if needed. */
3785 if (listLength(c->reply) != 0) {
3786 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3787 return;
3788 }
3789
3790 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3791 /* Here we need to check if there is a background saving operation
3792 * in progress, or if it is required to start one */
3793 if (server.bgsaveinprogress) {
3794 /* Ok a background save is in progress. Let's check if it is a good
3795 * one for replication, i.e. if there is another slave that is
3796 * registering differences since the server forked to save */
3797 redisClient *slave;
3798 listNode *ln;
3799
3800 listRewind(server.slaves);
3801 while((ln = listYield(server.slaves))) {
3802 slave = ln->value;
3803 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3804 }
3805 if (ln) {
3806 /* Perfect, the server is already registering differences for
3807 * another slave. Set the right state, and copy the buffer. */
3808 listRelease(c->reply);
3809 c->reply = listDup(slave->reply);
3810 if (!c->reply) oom("listDup copying slave reply list");
3811 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3812 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3813 } else {
3814 /* No way, we need to wait for the next BGSAVE in order to
3815 * register differences */
3816 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3817 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3818 }
3819 } else {
3820 /* Ok we don't have a BGSAVE in progress, let's start one */
3821 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3822 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3823 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3824 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3825 return;
3826 }
3827 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3828 }
3829 c->repldbfd = -1;
3830 c->flags |= REDIS_SLAVE;
3831 c->slaveseldb = 0;
3832 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3833 return;
3834 }
3835
3836 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3837 redisClient *slave = privdata;
3838 REDIS_NOTUSED(el);
3839 REDIS_NOTUSED(mask);
3840 char buf[REDIS_IOBUF_LEN];
3841 ssize_t nwritten, buflen;
3842
3843 if (slave->repldboff == 0) {
3844 /* Write the bulk write count before to transfer the DB. In theory here
3845 * we don't know how much room there is in the output buffer of the
3846 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3847 * operations) will never be smaller than the few bytes we need. */
3848 sds bulkcount;
3849
3850 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3851 slave->repldbsize);
3852 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3853 {
3854 sdsfree(bulkcount);
3855 freeClient(slave);
3856 return;
3857 }
3858 sdsfree(bulkcount);
3859 }
3860 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3861 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3862 if (buflen <= 0) {
3863 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3864 (buflen == 0) ? "premature EOF" : strerror(errno));
3865 freeClient(slave);
3866 return;
3867 }
3868 if ((nwritten = write(fd,buf,buflen)) == -1) {
3869 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3870 strerror(errno));
3871 freeClient(slave);
3872 return;
3873 }
3874 slave->repldboff += nwritten;
3875 if (slave->repldboff == slave->repldbsize) {
3876 close(slave->repldbfd);
3877 slave->repldbfd = -1;
3878 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3879 slave->replstate = REDIS_REPL_ONLINE;
3880 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3881 sendReplyToClient, slave, NULL) == AE_ERR) {
3882 freeClient(slave);
3883 return;
3884 }
3885 addReplySds(slave,sdsempty());
3886 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3887 }
3888 }
3889
3890 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3891 listNode *ln;
3892 int startbgsave = 0;
3893
3894 listRewind(server.slaves);
3895 while((ln = listYield(server.slaves))) {
3896 redisClient *slave = ln->value;
3897
3898 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3899 startbgsave = 1;
3900 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3901 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3902 struct redis_stat buf;
3903
3904 if (bgsaveerr != REDIS_OK) {
3905 freeClient(slave);
3906 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3907 continue;
3908 }
3909 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3910 redis_fstat(slave->repldbfd,&buf) == -1) {
3911 freeClient(slave);
3912 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3913 continue;
3914 }
3915 slave->repldboff = 0;
3916 slave->repldbsize = buf.st_size;
3917 slave->replstate = REDIS_REPL_SEND_BULK;
3918 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3919 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3920 freeClient(slave);
3921 continue;
3922 }
3923 }
3924 }
3925 if (startbgsave) {
3926 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3927 listRewind(server.slaves);
3928 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3929 while((ln = listYield(server.slaves))) {
3930 redisClient *slave = ln->value;
3931
3932 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3933 freeClient(slave);
3934 }
3935 }
3936 }
3937 }
3938
3939 static int syncWithMaster(void) {
3940 char buf[1024], tmpfile[256];
3941 int dumpsize;
3942 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3943 int dfd;
3944
3945 if (fd == -1) {
3946 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3947 strerror(errno));
3948 return REDIS_ERR;
3949 }
3950 /* Issue the SYNC command */
3951 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3952 close(fd);
3953 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3954 strerror(errno));
3955 return REDIS_ERR;
3956 }
3957 /* Read the bulk write count */
3958 if (syncReadLine(fd,buf,1024,3600) == -1) {
3959 close(fd);
3960 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3961 strerror(errno));
3962 return REDIS_ERR;
3963 }
3964 dumpsize = atoi(buf+1);
3965 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3966 /* Read the bulk write data on a temp file */
3967 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3968 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3969 if (dfd == -1) {
3970 close(fd);
3971 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3972 return REDIS_ERR;
3973 }
3974 while(dumpsize) {
3975 int nread, nwritten;
3976
3977 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3978 if (nread == -1) {
3979 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3980 strerror(errno));
3981 close(fd);
3982 close(dfd);
3983 return REDIS_ERR;
3984 }
3985 nwritten = write(dfd,buf,nread);
3986 if (nwritten == -1) {
3987 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3988 close(fd);
3989 close(dfd);
3990 return REDIS_ERR;
3991 }
3992 dumpsize -= nread;
3993 }
3994 close(dfd);
3995 if (rename(tmpfile,server.dbfilename) == -1) {
3996 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3997 unlink(tmpfile);
3998 close(fd);
3999 return REDIS_ERR;
4000 }
4001 emptyDb();
4002 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4003 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4004 close(fd);
4005 return REDIS_ERR;
4006 }
4007 server.master = createClient(fd);
4008 server.master->flags |= REDIS_MASTER;
4009 server.replstate = REDIS_REPL_CONNECTED;
4010 return REDIS_OK;
4011 }
4012
4013 static void slaveofCommand(redisClient *c) {
4014 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4015 !strcasecmp(c->argv[2]->ptr,"one")) {
4016 if (server.masterhost) {
4017 sdsfree(server.masterhost);
4018 server.masterhost = NULL;
4019 if (server.master) freeClient(server.master);
4020 server.replstate = REDIS_REPL_NONE;
4021 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4022 }
4023 } else {
4024 sdsfree(server.masterhost);
4025 server.masterhost = sdsdup(c->argv[1]->ptr);
4026 server.masterport = atoi(c->argv[2]->ptr);
4027 if (server.master) freeClient(server.master);
4028 server.replstate = REDIS_REPL_CONNECT;
4029 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4030 server.masterhost, server.masterport);
4031 }
4032 addReply(c,shared.ok);
4033 }
4034
4035 /* ============================ Maxmemory directive ======================== */
4036
4037 /* This function gets called when 'maxmemory' is set on the config file to limit
4038 * the max memory used by the server, and we are out of memory.
4039 * This function will try to, in order:
4040 *
4041 * - Free objects from the free list
4042 * - Try to remove keys with an EXPIRE set
4043 *
4044 * It is not possible to free enough memory to reach used-memory < maxmemory
4045 * the server will start refusing commands that will enlarge even more the
4046 * memory usage.
4047 */
4048 static void freeMemoryIfNeeded(void) {
4049 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4050 if (listLength(server.objfreelist)) {
4051 robj *o;
4052
4053 listNode *head = listFirst(server.objfreelist);
4054 o = listNodeValue(head);
4055 listDelNode(server.objfreelist,head);
4056 zfree(o);
4057 } else {
4058 int j, k, freed = 0;
4059
4060 for (j = 0; j < server.dbnum; j++) {
4061 int minttl = -1;
4062 robj *minkey = NULL;
4063 struct dictEntry *de;
4064
4065 if (dictSize(server.db[j].expires)) {
4066 freed = 1;
4067 /* From a sample of three keys drop the one nearest to
4068 * the natural expire */
4069 for (k = 0; k < 3; k++) {
4070 time_t t;
4071
4072 de = dictGetRandomKey(server.db[j].expires);
4073 t = (time_t) dictGetEntryVal(de);
4074 if (minttl == -1 || t < minttl) {
4075 minkey = dictGetEntryKey(de);
4076 minttl = t;
4077 }
4078 }
4079 deleteKey(server.db+j,minkey);
4080 }
4081 }
4082 if (!freed) return; /* nothing to free... */
4083 }
4084 }
4085 }
4086
4087 /* ================================= Debugging ============================== */
4088
4089 static void debugCommand(redisClient *c) {
4090 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4091 *((char*)-1) = 'x';
4092 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4093 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4094 robj *key, *val;
4095
4096 if (!de) {
4097 addReply(c,shared.nokeyerr);
4098 return;
4099 }
4100 key = dictGetEntryKey(de);
4101 val = dictGetEntryVal(de);
4102 addReplySds(c,sdscatprintf(sdsempty(),
4103 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4104 key, key->refcount, val, val->refcount));
4105 } else {
4106 addReplySds(c,sdsnew(
4107 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4108 }
4109 }
4110
4111 static struct redisFunctionSym symsTable[] = {
4112 {"freeStringObject", (unsigned long)freeStringObject},
4113 {"freeListObject", (unsigned long)freeListObject},
4114 {"freeSetObject", (unsigned long)freeSetObject},
4115 {"decrRefCount", (unsigned long)decrRefCount},
4116 {"createObject", (unsigned long)createObject},
4117 {"freeClient", (unsigned long)freeClient},
4118 {"rdbLoad", (unsigned long)rdbLoad},
4119 {"addReply", (unsigned long)addReply},
4120 {"addReplySds", (unsigned long)addReplySds},
4121 {"incrRefCount", (unsigned long)incrRefCount},
4122 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4123 {"createStringObject", (unsigned long)createStringObject},
4124 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4125 {"syncWithMaster", (unsigned long)syncWithMaster},
4126 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4127 {"removeExpire", (unsigned long)removeExpire},
4128 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4129 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4130 {"deleteKey", (unsigned long)deleteKey},
4131 {"getExpire", (unsigned long)getExpire},
4132 {"setExpire", (unsigned long)setExpire},
4133 {"updateSalvesWaitingBgsave", (unsigned long)updateSalvesWaitingBgsave},
4134 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4135 {"authCommand", (unsigned long)authCommand},
4136 {"pingCommand", (unsigned long)pingCommand},
4137 {"echoCommand", (unsigned long)echoCommand},
4138 {"setCommand", (unsigned long)setCommand},
4139 {"setnxCommand", (unsigned long)setnxCommand},
4140 {"getCommand", (unsigned long)getCommand},
4141 {"delCommand", (unsigned long)delCommand},
4142 {"existsCommand", (unsigned long)existsCommand},
4143 {"incrCommand", (unsigned long)incrCommand},
4144 {"decrCommand", (unsigned long)decrCommand},
4145 {"incrbyCommand", (unsigned long)incrbyCommand},
4146 {"decrbyCommand", (unsigned long)decrbyCommand},
4147 {"selectCommand", (unsigned long)selectCommand},
4148 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4149 {"keysCommand", (unsigned long)keysCommand},
4150 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4151 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4152 {"saveCommand", (unsigned long)saveCommand},
4153 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4154 {"shutdownCommand", (unsigned long)shutdownCommand},
4155 {"moveCommand", (unsigned long)moveCommand},
4156 {"renameCommand", (unsigned long)renameCommand},
4157 {"renamenxCommand", (unsigned long)renamenxCommand},
4158 {"lpushCommand", (unsigned long)lpushCommand},
4159 {"rpushCommand", (unsigned long)rpushCommand},
4160 {"lpopCommand", (unsigned long)lpopCommand},
4161 {"rpopCommand", (unsigned long)rpopCommand},
4162 {"llenCommand", (unsigned long)llenCommand},
4163 {"lindexCommand", (unsigned long)lindexCommand},
4164 {"lrangeCommand", (unsigned long)lrangeCommand},
4165 {"ltrimCommand", (unsigned long)ltrimCommand},
4166 {"typeCommand", (unsigned long)typeCommand},
4167 {"lsetCommand", (unsigned long)lsetCommand},
4168 {"saddCommand", (unsigned long)saddCommand},
4169 {"sremCommand", (unsigned long)sremCommand},
4170 {"smoveCommand", (unsigned long)smoveCommand},
4171 {"sismemberCommand", (unsigned long)sismemberCommand},
4172 {"scardCommand", (unsigned long)scardCommand},
4173 {"sinterCommand", (unsigned long)sinterCommand},
4174 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4175 {"sunionCommand", (unsigned long)sunionCommand},
4176 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4177 {"sdiffCommand", (unsigned long)sdiffCommand},
4178 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4179 {"syncCommand", (unsigned long)syncCommand},
4180 {"flushdbCommand", (unsigned long)flushdbCommand},
4181 {"flushallCommand", (unsigned long)flushallCommand},
4182 {"sortCommand", (unsigned long)sortCommand},
4183 {"lremCommand", (unsigned long)lremCommand},
4184 {"infoCommand", (unsigned long)infoCommand},
4185 {"mgetCommand", (unsigned long)mgetCommand},
4186 {"monitorCommand", (unsigned long)monitorCommand},
4187 {"expireCommand", (unsigned long)expireCommand},
4188 {"getSetCommand", (unsigned long)getSetCommand},
4189 {"ttlCommand", (unsigned long)ttlCommand},
4190 {"slaveofCommand", (unsigned long)slaveofCommand},
4191 {"debugCommand", (unsigned long)debugCommand},
4192 {"processCommand", (unsigned long)processCommand},
4193 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4194 {"segvHandler", (unsigned long)segvHandler},
4195 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4196 {NULL,0}
4197 };
4198
4199 /* This function try to convert a pointer into a function name. It's used in
4200 * oreder to provide a backtrace under segmentation fault that's able to
4201 * display functions declared as static (otherwise the backtrace is useless). */
4202 static char *findFuncName(void *pointer, unsigned long *offset){
4203 int i, ret = -1;
4204 unsigned long off, minoff = 0;
4205
4206 /* Try to match against the Symbol with the smallest offset */
4207 for (i=0; symsTable[i].pointer; i++) {
4208 unsigned long lp = (unsigned long) pointer;
4209
4210 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4211 off=lp-symsTable[i].pointer;
4212 if (ret < 0 || off < minoff) {
4213 minoff=off;
4214 ret=i;
4215 }
4216 }
4217 }
4218 if (ret == -1) return NULL;
4219 *offset = minoff;
4220 return symsTable[ret].name;
4221 }
4222
/* Return the instruction pointer saved into the signal context 'uc', that is
 * the address of the instruction that was executing when the signal was
 * delivered.
 *
 * The location of the register inside ucontext_t depends on both the
 * operating system and the CPU: the 32 bit x86 layouts read the 'eip'
 * register, the 64 bit ones the 'rip' register. On Linux x86_64 the
 * register is read only when the C library exposes REG_RIP as a macro.
 * When the platform is not a known one NULL is returned instead of failing
 * to compile. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__) && defined(__x86_64__)
    return (void*) uc->uc_mcontext.mc_rip;
#elif defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && defined(__x86_64__)
    return (void*) uc->uc_mcontext->__ss.__rip;
#elif defined(__APPLE__)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__x86_64__) && defined(REG_RIP) /* Linux, 64 bit */
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
#elif defined(__i386__) /* Linux, 32 bit */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#else /* Unknown platform: no way to fetch the program counter */
    (void) uc;
    return NULL;
#endif
}
4234
4235 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4236 void *trace[100];
4237 char **messages = NULL;
4238 int i, trace_size = 0;
4239 unsigned long offset=0;
4240 time_t uptime = time(NULL)-server.stat_starttime;
4241 ucontext_t *uc = (ucontext_t*) secret;
4242 REDIS_NOTUSED(info);
4243
4244 redisLog(REDIS_WARNING,
4245 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4246 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4247 "redis_version:%s; "
4248 "uptime_in_seconds:%d; "
4249 "connected_clients:%d; "
4250 "connected_slaves:%d; "
4251 "used_memory:%zu; "
4252 "changes_since_last_save:%lld; "
4253 "bgsave_in_progress:%d; "
4254 "last_save_time:%d; "
4255 "total_connections_received:%lld; "
4256 "total_commands_processed:%lld; "
4257 "role:%s;"
4258 ,REDIS_VERSION,
4259 uptime,
4260 listLength(server.clients)-listLength(server.slaves),
4261 listLength(server.slaves),
4262 server.usedmemory,
4263 server.dirty,
4264 server.bgsaveinprogress,
4265 server.lastsave,
4266 server.stat_numconnections,
4267 server.stat_numcommands,
4268 server.masterhost == NULL ? "master" : "slave"
4269 ));
4270
4271 trace_size = backtrace(trace, 100);
4272 /* overwrite sigaction with caller's address */
4273 trace[1] = getMcontextEip(uc);
4274 messages = backtrace_symbols(trace, trace_size);
4275
4276 for (i=0; i<trace_size; ++i) {
4277 char *fn = findFuncName(trace[i], &offset), *p;
4278
4279 p = strchr(messages[i],'+');
4280 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4281 redisLog(REDIS_WARNING,"%s", messages[i]);
4282 } else {
4283 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4284 }
4285 }
4286 free(messages);
4287 exit(0);
4288 }
4289
4290 static void setupSigSegvAction(void) {
4291 struct sigaction act;
4292
4293 sigemptyset (&act.sa_mask);
4294 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4295 * is used. Otherwise, sa_handler is used */
4296 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4297 act.sa_sigaction = segvHandler;
4298 sigaction (SIGSEGV, &act, NULL);
4299 sigaction (SIGBUS, &act, NULL);
4300 }
4301
4302 /* =================================== Main! ================================ */
4303
4304 #ifdef __linux__
/* Read the first line of /proc/sys/vm/overcommit_memory and return it
 * converted to an integer with atoi(). Returns -1 if the file can't be
 * opened or no line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
4318
4319 void linuxOvercommitMemoryWarning(void) {
4320 if (linuxOvercommitMemoryValue() == 0) {
4321 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4322 }
4323 }
4324 #endif /* __linux__ */
4325
4326 static void daemonize(void) {
4327 int fd;
4328 FILE *fp;
4329
4330 if (fork() != 0) exit(0); /* parent exits */
4331 setsid(); /* create a new session */
4332
4333 /* Every output goes to /dev/null. If Redis is daemonized but
4334 * the 'logfile' is set to 'stdout' in the configuration file
4335 * it will not log at all. */
4336 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4337 dup2(fd, STDIN_FILENO);
4338 dup2(fd, STDOUT_FILENO);
4339 dup2(fd, STDERR_FILENO);
4340 if (fd > STDERR_FILENO) close(fd);
4341 }
4342 /* Try to write the pid file */
4343 fp = fopen(server.pidfile,"w");
4344 if (fp) {
4345 fprintf(fp,"%d\n",getpid());
4346 fclose(fp);
4347 }
4348 }
4349
4350 int main(int argc, char **argv) {
4351 #ifdef __linux__
4352 linuxOvercommitMemoryWarning();
4353 #endif
4354
4355 initServerConfig();
4356 if (argc == 2) {
4357 ResetServerSaveParams();
4358 loadServerConfig(argv[1]);
4359 } else if (argc > 2) {
4360 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4361 exit(1);
4362 } else {
4363 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4364 }
4365 initServer();
4366 if (server.daemonize) daemonize();
4367 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4368 if (rdbLoad(server.dbfilename) == REDIS_OK)
4369 redisLog(REDIS_NOTICE,"DB loaded from disk");
4370 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4371 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4372 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4373 aeMain(server.el);
4374 aeDeleteEventLoop(server.el);
4375 return 0;
4376 }