]> git.saurik.com Git - redis.git/blob - redis.c
ac98269670261863c786c9aa0a1c492662ac022d
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #define __USE_POSIX199309
40 #include <signal.h>
41 #include <execinfo.h>
42 #include <ucontext.h>
43 #include <sys/wait.h>
44 #include <errno.h>
45 #include <assert.h>
46 #include <ctype.h>
47 #include <stdarg.h>
48 #include <inttypes.h>
49 #include <arpa/inet.h>
50 #include <sys/stat.h>
51 #include <fcntl.h>
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #include <limits.h>
55 #include <execinfo.h>
56
57 #include "redis.h"
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
66
67 #include "config.h"
68
/* Error codes */
#define REDIS_OK                0
#define REDIS_ERR               -1

/* Static server configuration */
#define REDIS_SERVERPORT        6379    /* TCP port */
#define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */
#define REDIS_IOBUF_LEN         1024
#define REDIS_LOADBUF_LEN       1024
#define REDIS_STATIC_ARGS       4
#define REDIS_DEFAULT_DBNUM     16
#define REDIS_CONFIGLINE_MAX    1024
#define REDIS_OBJFREELIST_MAX   1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE  (1024*1024) /* max bytes in inline command */

/* Hash table parameters */
#define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK          1       /* Bulk write command */
#define REDIS_CMD_INLINE        2       /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM       4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_HASH 3

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys would require a lot of space, so we check the two most significant
 * bits of the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 =>  01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3         /* string compressed with FASTLZ */

/* Client flags */
#define REDIS_CLOSE 1       /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2       /* This client is a slave server */
#define REDIS_MASTER 4      /* This client is a master server */
#define REDIS_MONITOR 8      /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0   /* No active replication */
#define REDIS_REPL_CONNECT 1    /* Must connect to master */
#define REDIS_REPL_CONNECTED 2  /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to end, then starts bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: evaluates V and discards the result. The argument is
 * parenthesized so that any expression can be passed safely. */
#define REDIS_NOTUSED(V) ((void)(V))
177
178
179 /*================================= Data types ============================== */
180
/* Generic, reference counted Redis value. 'type' is one of the REDIS_STRING,
 * REDIS_LIST, REDIS_SET, REDIS_HASH object types and tells how the opaque
 * 'ptr' payload has to be interpreted. */
typedef struct redisObject {
    void *ptr;      /* type dependent payload */
    int type;       /* one of the "Object types" defines */
    int refcount;   /* number of references held on this object */
} robj;
187
188 typedef struct redisDb {
189 dict *dict;
190 dict *expires;
191 int id;
192 } redisDb;
193
194 /* With multiplexing we need to take per-clinet state.
195 * Clients are taken in a liked list. */
196 typedef struct redisClient {
197 int fd;
198 redisDb *db;
199 int dictid;
200 sds querybuf;
201 robj **argv;
202 int argc;
203 int bulklen; /* bulk read len. -1 if not in bulk read mode */
204 list *reply;
205 int sentlen;
206 time_t lastinteraction; /* time of the last interaction, used for timeout */
207 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
208 int slaveseldb; /* slave selected db, if this client is a slave */
209 int authenticated; /* when requirepass is non-NULL */
210 int replstate; /* replication state if this is a slave */
211 int repldbfd; /* replication DB file descriptor */
212 long repldboff; /* replication DB file offset */
213 off_t repldbsize; /* replication DB file size */
214 } redisClient;
215
/* One automatic save rule: a background save is triggered when at least
 * 'changes' modifications were performed and more than 'seconds' seconds
 * elapsed since the last successful save. */
struct saveparam {
    time_t seconds;     /* minimum age of the last save, in seconds */
    int changes;        /* minimum number of changes */
};
220
221 /* Global server state structure */
222 struct redisServer {
223 int port;
224 int fd;
225 redisDb *db;
226 dict *sharingpool;
227 unsigned int sharingpoolsize;
228 long long dirty; /* changes to DB from the last save */
229 list *clients;
230 list *slaves, *monitors;
231 char neterr[ANET_ERR_LEN];
232 aeEventLoop *el;
233 int cronloops; /* number of times the cron function run */
234 list *objfreelist; /* A list of freed objects to avoid malloc() */
235 time_t lastsave; /* Unix time of last save succeeede */
236 size_t usedmemory; /* Used memory in megabytes */
237 /* Fields used only for stats */
238 time_t stat_starttime; /* server start time */
239 long long stat_numcommands; /* number of processed commands */
240 long long stat_numconnections; /* number of connections received */
241 /* Configuration */
242 int verbosity;
243 int glueoutputbuf;
244 int maxidletime;
245 int dbnum;
246 int daemonize;
247 char *pidfile;
248 int bgsaveinprogress;
249 pid_t bgsavechildpid;
250 struct saveparam *saveparams;
251 int saveparamslen;
252 char *logfile;
253 char *bindaddr;
254 char *dbfilename;
255 char *requirepass;
256 int shareobjects;
257 /* Replication related */
258 int isslave;
259 char *masterhost;
260 int masterport;
261 redisClient *master; /* client that is master for this slave */
262 int replstate;
263 unsigned int maxclients;
264 unsigned int maxmemory;
265 /* Sort parameters - qsort_r() is only available under BSD so we
266 * have to take this state global, in order to pass it to sortCompare() */
267 int sort_desc;
268 int sort_alpha;
269 int sort_bypattern;
270 };
271
272 typedef void redisCommandProc(redisClient *c);
273 struct redisCommand {
274 char *name;
275 redisCommandProc *proc;
276 int arity;
277 int flags;
278 };
279
/* Pairs a function name with a numeric value stored in 'pointer'. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
284
285 typedef struct _redisSortObject {
286 robj *obj;
287 union {
288 double score;
289 robj *cmpobj;
290 } u;
291 } redisSortObject;
292
293 typedef struct _redisSortOperation {
294 int type;
295 robj *pattern;
296 } redisSortOperation;
297
298 struct sharedObjectsStruct {
299 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
300 *colon, *nullbulk, *nullmultibulk,
301 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
302 *outofrangeerr, *plus,
303 *select0, *select1, *select2, *select3, *select4,
304 *select5, *select6, *select7, *select8, *select9;
305 } shared;
306
307 /*================================ Prototypes =============================== */
308
309 static void freeStringObject(robj *o);
310 static void freeListObject(robj *o);
311 static void freeSetObject(robj *o);
312 static void decrRefCount(void *o);
313 static robj *createObject(int type, void *ptr);
314 static void freeClient(redisClient *c);
315 static int rdbLoad(char *filename);
316 static void addReply(redisClient *c, robj *obj);
317 static void addReplySds(redisClient *c, sds s);
318 static void incrRefCount(robj *o);
319 static int rdbSaveBackground(char *filename);
320 static robj *createStringObject(char *ptr, size_t len);
321 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
322 static int syncWithMaster(void);
323 static robj *tryObjectSharing(robj *o);
324 static int removeExpire(redisDb *db, robj *key);
325 static int expireIfNeeded(redisDb *db, robj *key);
326 static int deleteIfVolatile(redisDb *db, robj *key);
327 static int deleteKey(redisDb *db, robj *key);
328 static time_t getExpire(redisDb *db, robj *key);
329 static int setExpire(redisDb *db, robj *key, time_t when);
330 static void updateSalvesWaitingBgsave(int bgsaveerr);
331 static void freeMemoryIfNeeded(void);
332 static int processCommand(redisClient *c);
333 static void setupSigSegvAction(void);
334
335 static void authCommand(redisClient *c);
336 static void pingCommand(redisClient *c);
337 static void echoCommand(redisClient *c);
338 static void setCommand(redisClient *c);
339 static void setnxCommand(redisClient *c);
340 static void getCommand(redisClient *c);
341 static void delCommand(redisClient *c);
342 static void existsCommand(redisClient *c);
343 static void incrCommand(redisClient *c);
344 static void decrCommand(redisClient *c);
345 static void incrbyCommand(redisClient *c);
346 static void decrbyCommand(redisClient *c);
347 static void selectCommand(redisClient *c);
348 static void randomkeyCommand(redisClient *c);
349 static void keysCommand(redisClient *c);
350 static void dbsizeCommand(redisClient *c);
351 static void lastsaveCommand(redisClient *c);
352 static void saveCommand(redisClient *c);
353 static void bgsaveCommand(redisClient *c);
354 static void shutdownCommand(redisClient *c);
355 static void moveCommand(redisClient *c);
356 static void renameCommand(redisClient *c);
357 static void renamenxCommand(redisClient *c);
358 static void lpushCommand(redisClient *c);
359 static void rpushCommand(redisClient *c);
360 static void lpopCommand(redisClient *c);
361 static void rpopCommand(redisClient *c);
362 static void llenCommand(redisClient *c);
363 static void lindexCommand(redisClient *c);
364 static void lrangeCommand(redisClient *c);
365 static void ltrimCommand(redisClient *c);
366 static void typeCommand(redisClient *c);
367 static void lsetCommand(redisClient *c);
368 static void saddCommand(redisClient *c);
369 static void sremCommand(redisClient *c);
370 static void smoveCommand(redisClient *c);
371 static void sismemberCommand(redisClient *c);
372 static void scardCommand(redisClient *c);
373 static void spopCommand(redisClient *c);
374 static void sinterCommand(redisClient *c);
375 static void sinterstoreCommand(redisClient *c);
376 static void sunionCommand(redisClient *c);
377 static void sunionstoreCommand(redisClient *c);
378 static void sdiffCommand(redisClient *c);
379 static void sdiffstoreCommand(redisClient *c);
380 static void syncCommand(redisClient *c);
381 static void flushdbCommand(redisClient *c);
382 static void flushallCommand(redisClient *c);
383 static void sortCommand(redisClient *c);
384 static void lremCommand(redisClient *c);
385 static void infoCommand(redisClient *c);
386 static void mgetCommand(redisClient *c);
387 static void monitorCommand(redisClient *c);
388 static void expireCommand(redisClient *c);
389 static void getSetCommand(redisClient *c);
390 static void ttlCommand(redisClient *c);
391 static void slaveofCommand(redisClient *c);
392 static void debugCommand(redisClient *c);
393 /*================================= Globals ================================= */
394
395 /* Global vars */
396 static struct redisServer server; /* server global state */
397 static struct redisCommand cmdTable[] = {
398 {"get",getCommand,2,REDIS_CMD_INLINE},
399 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
400 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
401 {"del",delCommand,-2,REDIS_CMD_INLINE},
402 {"exists",existsCommand,2,REDIS_CMD_INLINE},
403 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
404 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
405 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
406 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
407 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
408 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
409 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
410 {"llen",llenCommand,2,REDIS_CMD_INLINE},
411 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
412 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
413 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
414 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
415 {"lrem",lremCommand,4,REDIS_CMD_BULK},
416 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
417 {"srem",sremCommand,3,REDIS_CMD_BULK},
418 {"smove",smoveCommand,4,REDIS_CMD_BULK},
419 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
420 {"scard",scardCommand,2,REDIS_CMD_INLINE},
421 {"spop",spopCommand,2,REDIS_CMD_INLINE},
422 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
423 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
424 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
425 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
426 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
427 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
428 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
429 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
430 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
431 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
432 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
433 {"select",selectCommand,2,REDIS_CMD_INLINE},
434 {"move",moveCommand,3,REDIS_CMD_INLINE},
435 {"rename",renameCommand,3,REDIS_CMD_INLINE},
436 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
437 {"expire",expireCommand,3,REDIS_CMD_INLINE},
438 {"keys",keysCommand,2,REDIS_CMD_INLINE},
439 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
440 {"auth",authCommand,2,REDIS_CMD_INLINE},
441 {"ping",pingCommand,1,REDIS_CMD_INLINE},
442 {"echo",echoCommand,2,REDIS_CMD_BULK},
443 {"save",saveCommand,1,REDIS_CMD_INLINE},
444 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
445 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
446 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
447 {"type",typeCommand,2,REDIS_CMD_INLINE},
448 {"sync",syncCommand,1,REDIS_CMD_INLINE},
449 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
450 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
451 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
452 {"info",infoCommand,1,REDIS_CMD_INLINE},
453 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
454 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
455 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
456 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
457 {NULL,NULL,0,0}
458 };
459 /*============================ Utility functions ============================ */
460
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Supported syntax:
 *
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [...]    one byte out of a set; "a-z" style ranges, a leading '^' to
 *            negate the set and '\' to escape a byte are supported
 *   \x       the byte x taken literally
 *
 * When 'nocase' is non zero the comparison is case insensitive.
 * Returns 1 on match, 0 otherwise.
 *
 * Both buffers are accessed only inside the given lengths, so they are not
 * required to be NUL terminated. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of '*'. pattern[1] is looked at only when it
             * is still part of the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes one byte of the string: without this
             * check string[0] would be read past the end, and a negated
             * set would match the empty string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the common
                     * advance after the switch lands on the pattern end.
                     * Checked first so that no byte past the pattern is
                     * ever inspected. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal byte can't match an exhausted string. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is over: only trailing '*' can still match.
             * Bounded by patternLen so that it never runs past the
             * pattern nor drives the length negative. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
583
584 static void redisLog(int level, const char *fmt, ...) {
585 va_list ap;
586 FILE *fp;
587
588 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
589 if (!fp) return;
590
591 va_start(ap, fmt);
592 if (level >= server.verbosity) {
593 char *c = ".-*";
594 char buf[64];
595 time_t now;
596
597 now = time(NULL);
598 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
599 fprintf(fp,"%s %c ",buf,c[level]);
600 vfprintf(fp, fmt, ap);
601 fprintf(fp,"\n");
602 fflush(fp);
603 }
604 va_end(ap);
605
606 if (server.logfile) fclose(fp);
607 }
608
609 /*====================== Hash table type implementation ==================== */
610
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
614
615 static int sdsDictKeyCompare(void *privdata, const void *key1,
616 const void *key2)
617 {
618 int l1,l2;
619 DICT_NOTUSED(privdata);
620
621 l1 = sdslen((sds)key1);
622 l2 = sdslen((sds)key2);
623 if (l1 != l2) return 0;
624 return memcmp(key1, key2, l1) == 0;
625 }
626
/* Hash table destructor callback: releases the reference held on 'val' by
 * calling decrRefCount(). 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
633
634 static int dictSdsKeyCompare(void *privdata, const void *key1,
635 const void *key2)
636 {
637 const robj *o1 = key1, *o2 = key2;
638 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
639 }
640
641 static unsigned int dictSdsHash(const void *key) {
642 const robj *o = key;
643 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
644 }
645
646 static dictType setDictType = {
647 dictSdsHash, /* hash function */
648 NULL, /* key dup */
649 NULL, /* val dup */
650 dictSdsKeyCompare, /* key compare */
651 dictRedisObjectDestructor, /* key destructor */
652 NULL /* val destructor */
653 };
654
655 static dictType hashDictType = {
656 dictSdsHash, /* hash function */
657 NULL, /* key dup */
658 NULL, /* val dup */
659 dictSdsKeyCompare, /* key compare */
660 dictRedisObjectDestructor, /* key destructor */
661 dictRedisObjectDestructor /* val destructor */
662 };
663
664 /* ========================= Random utility functions ======================= */
665
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear whether the condition
 * could even be reported to the client, since the networking layer itself
 * relies on heap allocation for the send buffers. So the error is printed
 * on standard error, prefixed by 'msg', and after a one second pause the
 * process is aborted. At least the code will be simpler to read... */
static void oom(const char *msg) {
    FILE *err = stderr;

    fprintf(err, "%s: Out of memory\n",msg);
    fflush(err);
    sleep(1);
    abort();
}
677
678 /* ====================== Redis server networking stuff ===================== */
679 static void closeTimedoutClients(void) {
680 redisClient *c;
681 listNode *ln;
682 time_t now = time(NULL);
683
684 listRewind(server.clients);
685 while ((ln = listYield(server.clients)) != NULL) {
686 c = listNodeValue(ln);
687 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
688 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
689 (now - c->lastinteraction > server.maxidletime)) {
690 redisLog(REDIS_DEBUG,"Closing idle client");
691 freeClient(c);
692 }
693 }
694 }
695
696 static int htNeedsResize(dict *dict) {
697 long long size, used;
698
699 size = dictSlots(dict);
700 used = dictSize(dict);
701 return (size && used && size > DICT_HT_INITIAL_SIZE &&
702 (used*100/size < REDIS_HT_MINFILL));
703 }
704
705 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
706 * we resize the hash table to save memory */
707 static void tryResizeHashTables(void) {
708 int j;
709
710 for (j = 0; j < server.dbnum; j++) {
711 if (htNeedsResize(server.db[j].dict)) {
712 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
713 dictResize(server.db[j].dict);
714 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
715 }
716 if (htNeedsResize(server.db[j].expires))
717 dictResize(server.db[j].expires);
718 }
719 }
720
721 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
722 int j, loops = server.cronloops++;
723 REDIS_NOTUSED(eventLoop);
724 REDIS_NOTUSED(id);
725 REDIS_NOTUSED(clientData);
726
727 /* Update the global state with the amount of used memory */
728 server.usedmemory = zmalloc_used_memory();
729
730 /* Show some info about non-empty databases */
731 for (j = 0; j < server.dbnum; j++) {
732 long long size, used, vkeys;
733
734 size = dictSlots(server.db[j].dict);
735 used = dictSize(server.db[j].dict);
736 vkeys = dictSize(server.db[j].expires);
737 if (!(loops % 5) && used > 0) {
738 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
739 /* dictPrintStats(server.dict); */
740 }
741 }
742
743 /* We don't want to resize the hash tables while a bacground saving
744 * is in progress: the saving child is created using fork() that is
745 * implemented with a copy-on-write semantic in most modern systems, so
746 * if we resize the HT while there is the saving child at work actually
747 * a lot of memory movements in the parent will cause a lot of pages
748 * copied. */
749 if (!server.bgsaveinprogress) tryResizeHashTables();
750
751 /* Show information about connected clients */
752 if (!(loops % 5)) {
753 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
754 listLength(server.clients)-listLength(server.slaves),
755 listLength(server.slaves),
756 server.usedmemory,
757 dictSize(server.sharingpool));
758 }
759
760 /* Close connections of timedout clients */
761 if (server.maxidletime && !(loops % 10))
762 closeTimedoutClients();
763
764 /* Check if a background saving in progress terminated */
765 if (server.bgsaveinprogress) {
766 int statloc;
767 if (wait4(-1,&statloc,WNOHANG,NULL)) {
768 int exitcode = WEXITSTATUS(statloc);
769 int bysignal = WIFSIGNALED(statloc);
770
771 if (!bysignal && exitcode == 0) {
772 redisLog(REDIS_NOTICE,
773 "Background saving terminated with success");
774 server.dirty = 0;
775 server.lastsave = time(NULL);
776 } else if (!bysignal && exitcode != 0) {
777 redisLog(REDIS_WARNING, "Background saving error");
778 } else {
779 redisLog(REDIS_WARNING,
780 "Background saving terminated by signal");
781 }
782 server.bgsaveinprogress = 0;
783 server.bgsavechildpid = -1;
784 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
785 }
786 } else {
787 /* If there is not a background saving in progress check if
788 * we have to save now */
789 time_t now = time(NULL);
790 for (j = 0; j < server.saveparamslen; j++) {
791 struct saveparam *sp = server.saveparams+j;
792
793 if (server.dirty >= sp->changes &&
794 now-server.lastsave > sp->seconds) {
795 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
796 sp->changes, sp->seconds);
797 rdbSaveBackground(server.dbfilename);
798 break;
799 }
800 }
801 }
802
803 /* Try to expire a few timed out keys */
804 for (j = 0; j < server.dbnum; j++) {
805 redisDb *db = server.db+j;
806 int num = dictSize(db->expires);
807
808 if (num) {
809 time_t now = time(NULL);
810
811 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
812 num = REDIS_EXPIRELOOKUPS_PER_CRON;
813 while (num--) {
814 dictEntry *de;
815 time_t t;
816
817 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
818 t = (time_t) dictGetEntryVal(de);
819 if (now > t) {
820 deleteKey(db,dictGetEntryKey(de));
821 }
822 }
823 }
824 }
825
826 /* Check if we should connect to a MASTER */
827 if (server.replstate == REDIS_REPL_CONNECT) {
828 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
829 if (syncWithMaster() == REDIS_OK) {
830 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
831 }
832 }
833 return 1000;
834 }
835
836 static void createSharedObjects(void) {
837 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
838 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
839 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
840 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
841 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
842 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
843 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
844 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
845 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
846 /* no such key */
847 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
848 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
849 "-ERR Operation against a key holding the wrong kind of value\r\n"));
850 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
851 "-ERR no such key\r\n"));
852 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
853 "-ERR syntax error\r\n"));
854 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
855 "-ERR source and destination objects are the same\r\n"));
856 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
857 "-ERR index out of range\r\n"));
858 shared.space = createObject(REDIS_STRING,sdsnew(" "));
859 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
860 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
861 shared.select0 = createStringObject("select 0\r\n",10);
862 shared.select1 = createStringObject("select 1\r\n",10);
863 shared.select2 = createStringObject("select 2\r\n",10);
864 shared.select3 = createStringObject("select 3\r\n",10);
865 shared.select4 = createStringObject("select 4\r\n",10);
866 shared.select5 = createStringObject("select 5\r\n",10);
867 shared.select6 = createStringObject("select 6\r\n",10);
868 shared.select7 = createStringObject("select 7\r\n",10);
869 shared.select8 = createStringObject("select 8\r\n",10);
870 shared.select9 = createStringObject("select 9\r\n",10);
871 }
872
873 static void appendServerSaveParams(time_t seconds, int changes) {
874 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
875 if (server.saveparams == NULL) oom("appendServerSaveParams");
876 server.saveparams[server.saveparamslen].seconds = seconds;
877 server.saveparams[server.saveparamslen].changes = changes;
878 server.saveparamslen++;
879 }
880
881 static void ResetServerSaveParams() {
882 zfree(server.saveparams);
883 server.saveparams = NULL;
884 server.saveparamslen = 0;
885 }
886
887 static void initServerConfig() {
888 server.dbnum = REDIS_DEFAULT_DBNUM;
889 server.port = REDIS_SERVERPORT;
890 server.verbosity = REDIS_DEBUG;
891 server.maxidletime = REDIS_MAXIDLETIME;
892 server.saveparams = NULL;
893 server.logfile = NULL; /* NULL = log on standard output */
894 server.bindaddr = NULL;
895 server.glueoutputbuf = 1;
896 server.daemonize = 0;
897 server.pidfile = "/var/run/redis.pid";
898 server.dbfilename = "dump.rdb";
899 server.requirepass = NULL;
900 server.shareobjects = 0;
901 server.maxclients = 0;
902 server.maxmemory = 0;
903 ResetServerSaveParams();
904
905 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
906 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
907 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
908 /* Replication related */
909 server.isslave = 0;
910 server.masterhost = NULL;
911 server.masterport = 6379;
912 server.master = NULL;
913 server.replstate = REDIS_REPL_NONE;
914 }
915
916 static void initServer() {
917 int j;
918
919 signal(SIGHUP, SIG_IGN);
920 signal(SIGPIPE, SIG_IGN);
921 setupSigSegvAction();
922
923 server.clients = listCreate();
924 server.slaves = listCreate();
925 server.monitors = listCreate();
926 server.objfreelist = listCreate();
927 createSharedObjects();
928 server.el = aeCreateEventLoop();
929 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
930 server.sharingpool = dictCreate(&setDictType,NULL);
931 server.sharingpoolsize = 1024;
932 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
933 oom("server initialization"); /* Fatal OOM */
934 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
935 if (server.fd == -1) {
936 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
937 exit(1);
938 }
939 for (j = 0; j < server.dbnum; j++) {
940 server.db[j].dict = dictCreate(&hashDictType,NULL);
941 server.db[j].expires = dictCreate(&setDictType,NULL);
942 server.db[j].id = j;
943 }
944 server.cronloops = 0;
945 server.bgsaveinprogress = 0;
946 server.bgsavechildpid = -1;
947 server.lastsave = time(NULL);
948 server.dirty = 0;
949 server.usedmemory = 0;
950 server.stat_numcommands = 0;
951 server.stat_numconnections = 0;
952 server.stat_starttime = time(NULL);
953 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
954 }
955
956 /* Empty the whole database */
957 static long long emptyDb() {
958 int j;
959 long long removed = 0;
960
961 for (j = 0; j < server.dbnum; j++) {
962 removed += dictSize(server.db[j].dict);
963 dictEmpty(server.db[j].dict);
964 dictEmpty(server.db[j].expires);
965 }
966 return removed;
967 }
968
/* Translate a "yes"/"no" string (compared ignoring case) into 1/0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    return (strcasecmp(s,"no") == 0) ? 0 : -1;
}
974
975 /* I agree, this is a very rudimental way to load a configuration...
976 will improve later if the config gets more complex */
977 static void loadServerConfig(char *filename) {
978 FILE *fp = fopen(filename,"r");
979 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
980 int linenum = 0;
981 sds line = NULL;
982
983 if (!fp) {
984 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
985 exit(1);
986 }
987 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
988 sds *argv;
989 int argc, j;
990
991 linenum++;
992 line = sdsnew(buf);
993 line = sdstrim(line," \t\r\n");
994
995 /* Skip comments and blank lines*/
996 if (line[0] == '#' || line[0] == '\0') {
997 sdsfree(line);
998 continue;
999 }
1000
1001 /* Split into arguments */
1002 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1003 sdstolower(argv[0]);
1004
1005 /* Execute config directives */
1006 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1007 server.maxidletime = atoi(argv[1]);
1008 if (server.maxidletime < 0) {
1009 err = "Invalid timeout value"; goto loaderr;
1010 }
1011 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1012 server.port = atoi(argv[1]);
1013 if (server.port < 1 || server.port > 65535) {
1014 err = "Invalid port"; goto loaderr;
1015 }
1016 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1017 server.bindaddr = zstrdup(argv[1]);
1018 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1019 int seconds = atoi(argv[1]);
1020 int changes = atoi(argv[2]);
1021 if (seconds < 1 || changes < 0) {
1022 err = "Invalid save parameters"; goto loaderr;
1023 }
1024 appendServerSaveParams(seconds,changes);
1025 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1026 if (chdir(argv[1]) == -1) {
1027 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1028 argv[1], strerror(errno));
1029 exit(1);
1030 }
1031 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1032 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1033 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1034 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1035 else {
1036 err = "Invalid log level. Must be one of debug, notice, warning";
1037 goto loaderr;
1038 }
1039 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1040 FILE *fp;
1041
1042 server.logfile = zstrdup(argv[1]);
1043 if (!strcasecmp(server.logfile,"stdout")) {
1044 zfree(server.logfile);
1045 server.logfile = NULL;
1046 }
1047 if (server.logfile) {
1048 /* Test if we are able to open the file. The server will not
1049 * be able to abort just for this problem later... */
1050 fp = fopen(server.logfile,"a");
1051 if (fp == NULL) {
1052 err = sdscatprintf(sdsempty(),
1053 "Can't open the log file: %s", strerror(errno));
1054 goto loaderr;
1055 }
1056 fclose(fp);
1057 }
1058 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1059 server.dbnum = atoi(argv[1]);
1060 if (server.dbnum < 1) {
1061 err = "Invalid number of databases"; goto loaderr;
1062 }
1063 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1064 server.maxclients = atoi(argv[1]);
1065 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1066 server.maxmemory = atoi(argv[1]);
1067 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1068 server.masterhost = sdsnew(argv[1]);
1069 server.masterport = atoi(argv[2]);
1070 server.replstate = REDIS_REPL_CONNECT;
1071 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1072 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1073 err = "argument must be 'yes' or 'no'"; goto loaderr;
1074 }
1075 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1076 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1077 err = "argument must be 'yes' or 'no'"; goto loaderr;
1078 }
1079 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1080 server.sharingpoolsize = atoi(argv[1]);
1081 if (server.sharingpoolsize < 1) {
1082 err = "invalid object sharing pool size"; goto loaderr;
1083 }
1084 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1085 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1086 err = "argument must be 'yes' or 'no'"; goto loaderr;
1087 }
1088 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1089 server.requirepass = zstrdup(argv[1]);
1090 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1091 server.pidfile = zstrdup(argv[1]);
1092 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1093 server.dbfilename = zstrdup(argv[1]);
1094 } else {
1095 err = "Bad directive or wrong number of arguments"; goto loaderr;
1096 }
1097 for (j = 0; j < argc; j++)
1098 sdsfree(argv[j]);
1099 zfree(argv);
1100 sdsfree(line);
1101 }
1102 fclose(fp);
1103 return;
1104
1105 loaderr:
1106 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1107 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1108 fprintf(stderr, ">>> '%s'\n", line);
1109 fprintf(stderr, "%s\n", err);
1110 exit(1);
1111 }
1112
1113 static void freeClientArgv(redisClient *c) {
1114 int j;
1115
1116 for (j = 0; j < c->argc; j++)
1117 decrRefCount(c->argv[j]);
1118 c->argc = 0;
1119 }
1120
1121 static void freeClient(redisClient *c) {
1122 listNode *ln;
1123
1124 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1125 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1126 sdsfree(c->querybuf);
1127 listRelease(c->reply);
1128 freeClientArgv(c);
1129 close(c->fd);
1130 ln = listSearchKey(server.clients,c);
1131 assert(ln != NULL);
1132 listDelNode(server.clients,ln);
1133 if (c->flags & REDIS_SLAVE) {
1134 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1135 close(c->repldbfd);
1136 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1137 ln = listSearchKey(l,c);
1138 assert(ln != NULL);
1139 listDelNode(l,ln);
1140 }
1141 if (c->flags & REDIS_MASTER) {
1142 server.master = NULL;
1143 server.replstate = REDIS_REPL_CONNECT;
1144 }
1145 zfree(c->argv);
1146 zfree(c);
1147 }
1148
1149 static void glueReplyBuffersIfNeeded(redisClient *c) {
1150 int totlen = 0;
1151 listNode *ln;
1152 robj *o;
1153
1154 listRewind(c->reply);
1155 while((ln = listYield(c->reply))) {
1156 o = ln->value;
1157 totlen += sdslen(o->ptr);
1158 /* This optimization makes more sense if we don't have to copy
1159 * too much data */
1160 if (totlen > 1024) return;
1161 }
1162 if (totlen > 0) {
1163 char buf[1024];
1164 int copylen = 0;
1165
1166 listRewind(c->reply);
1167 while((ln = listYield(c->reply))) {
1168 o = ln->value;
1169 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1170 copylen += sdslen(o->ptr);
1171 listDelNode(c->reply,ln);
1172 }
1173 /* Now the output buffer is empty, add the new single element */
1174 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1175 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1176 }
1177 }
1178
1179 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1180 redisClient *c = privdata;
1181 int nwritten = 0, totwritten = 0, objlen;
1182 robj *o;
1183 REDIS_NOTUSED(el);
1184 REDIS_NOTUSED(mask);
1185
1186 if (server.glueoutputbuf && listLength(c->reply) > 1)
1187 glueReplyBuffersIfNeeded(c);
1188 while(listLength(c->reply)) {
1189 o = listNodeValue(listFirst(c->reply));
1190 objlen = sdslen(o->ptr);
1191
1192 if (objlen == 0) {
1193 listDelNode(c->reply,listFirst(c->reply));
1194 continue;
1195 }
1196
1197 if (c->flags & REDIS_MASTER) {
1198 /* Don't reply to a master */
1199 nwritten = objlen - c->sentlen;
1200 } else {
1201 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1202 if (nwritten <= 0) break;
1203 }
1204 c->sentlen += nwritten;
1205 totwritten += nwritten;
1206 /* If we fully sent the object on head go to the next one */
1207 if (c->sentlen == objlen) {
1208 listDelNode(c->reply,listFirst(c->reply));
1209 c->sentlen = 0;
1210 }
1211 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1212 * bytes, in a single threaded server it's a good idea to server
1213 * other clients as well, even if a very large request comes from
1214 * super fast link that is always able to accept data (in real world
1215 * terms think to 'KEYS *' against the loopback interfae) */
1216 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1217 }
1218 if (nwritten == -1) {
1219 if (errno == EAGAIN) {
1220 nwritten = 0;
1221 } else {
1222 redisLog(REDIS_DEBUG,
1223 "Error writing to client: %s", strerror(errno));
1224 freeClient(c);
1225 return;
1226 }
1227 }
1228 if (totwritten > 0) c->lastinteraction = time(NULL);
1229 if (listLength(c->reply) == 0) {
1230 c->sentlen = 0;
1231 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1232 }
1233 }
1234
1235 static struct redisCommand *lookupCommand(char *name) {
1236 int j = 0;
1237 while(cmdTable[j].name != NULL) {
1238 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1239 j++;
1240 }
1241 return NULL;
1242 }
1243
1244 /* resetClient prepare the client to process the next command */
1245 static void resetClient(redisClient *c) {
1246 freeClientArgv(c);
1247 c->bulklen = -1;
1248 }
1249
1250 /* If this function gets called we already read a whole
1251 * command, argments are in the client argv/argc fields.
1252 * processCommand() execute the command or prepare the
1253 * server for a bulk read from the client.
1254 *
1255 * If 1 is returned the client is still alive and valid and
1256 * and other operations can be performed by the caller. Otherwise
1257 * if 0 is returned the client was destroied (i.e. after QUIT). */
1258 static int processCommand(redisClient *c) {
1259 struct redisCommand *cmd;
1260 long long dirty;
1261
1262 /* Free some memory if needed (maxmemory setting) */
1263 if (server.maxmemory) freeMemoryIfNeeded();
1264
1265 /* The QUIT command is handled as a special case. Normal command
1266 * procs are unable to close the client connection safely */
1267 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1268 freeClient(c);
1269 return 0;
1270 }
1271 cmd = lookupCommand(c->argv[0]->ptr);
1272 if (!cmd) {
1273 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1274 resetClient(c);
1275 return 1;
1276 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1277 (c->argc < -cmd->arity)) {
1278 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1279 resetClient(c);
1280 return 1;
1281 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1282 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1283 resetClient(c);
1284 return 1;
1285 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1286 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1287
1288 decrRefCount(c->argv[c->argc-1]);
1289 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1290 c->argc--;
1291 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1292 resetClient(c);
1293 return 1;
1294 }
1295 c->argc--;
1296 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1297 /* It is possible that the bulk read is already in the
1298 * buffer. Check this condition and handle it accordingly */
1299 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1300 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1301 c->argc++;
1302 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1303 } else {
1304 return 1;
1305 }
1306 }
1307 /* Let's try to share objects on the command arguments vector */
1308 if (server.shareobjects) {
1309 int j;
1310 for(j = 1; j < c->argc; j++)
1311 c->argv[j] = tryObjectSharing(c->argv[j]);
1312 }
1313 /* Check if the user is authenticated */
1314 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1315 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1316 resetClient(c);
1317 return 1;
1318 }
1319
1320 /* Exec the command */
1321 dirty = server.dirty;
1322 cmd->proc(c);
1323 if (server.dirty-dirty != 0 && listLength(server.slaves))
1324 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1325 if (listLength(server.monitors))
1326 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1327 server.stat_numcommands++;
1328
1329 /* Prepare the client for the next command */
1330 if (c->flags & REDIS_CLOSE) {
1331 freeClient(c);
1332 return 0;
1333 }
1334 resetClient(c);
1335 return 1;
1336 }
1337
1338 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1339 listNode *ln;
1340 int outc = 0, j;
1341 robj **outv;
1342 /* (args*2)+1 is enough room for args, spaces, newlines */
1343 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1344
1345 if (argc <= REDIS_STATIC_ARGS) {
1346 outv = static_outv;
1347 } else {
1348 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1349 if (!outv) oom("replicationFeedSlaves");
1350 }
1351
1352 for (j = 0; j < argc; j++) {
1353 if (j != 0) outv[outc++] = shared.space;
1354 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1355 robj *lenobj;
1356
1357 lenobj = createObject(REDIS_STRING,
1358 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1359 lenobj->refcount = 0;
1360 outv[outc++] = lenobj;
1361 }
1362 outv[outc++] = argv[j];
1363 }
1364 outv[outc++] = shared.crlf;
1365
1366 /* Increment all the refcounts at start and decrement at end in order to
1367 * be sure to free objects if there is no slave in a replication state
1368 * able to be feed with commands */
1369 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1370 listRewind(slaves);
1371 while((ln = listYield(slaves))) {
1372 redisClient *slave = ln->value;
1373
1374 /* Don't feed slaves that are still waiting for BGSAVE to start */
1375 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1376
1377 /* Feed all the other slaves, MONITORs and so on */
1378 if (slave->slaveseldb != dictid) {
1379 robj *selectcmd;
1380
1381 switch(dictid) {
1382 case 0: selectcmd = shared.select0; break;
1383 case 1: selectcmd = shared.select1; break;
1384 case 2: selectcmd = shared.select2; break;
1385 case 3: selectcmd = shared.select3; break;
1386 case 4: selectcmd = shared.select4; break;
1387 case 5: selectcmd = shared.select5; break;
1388 case 6: selectcmd = shared.select6; break;
1389 case 7: selectcmd = shared.select7; break;
1390 case 8: selectcmd = shared.select8; break;
1391 case 9: selectcmd = shared.select9; break;
1392 default:
1393 selectcmd = createObject(REDIS_STRING,
1394 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1395 selectcmd->refcount = 0;
1396 break;
1397 }
1398 addReply(slave,selectcmd);
1399 slave->slaveseldb = dictid;
1400 }
1401 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1402 }
1403 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1404 if (outv != static_outv) zfree(outv);
1405 }
1406
1407 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1408 redisClient *c = (redisClient*) privdata;
1409 char buf[REDIS_IOBUF_LEN];
1410 int nread;
1411 REDIS_NOTUSED(el);
1412 REDIS_NOTUSED(mask);
1413
1414 nread = read(fd, buf, REDIS_IOBUF_LEN);
1415 if (nread == -1) {
1416 if (errno == EAGAIN) {
1417 nread = 0;
1418 } else {
1419 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1420 freeClient(c);
1421 return;
1422 }
1423 } else if (nread == 0) {
1424 redisLog(REDIS_DEBUG, "Client closed connection");
1425 freeClient(c);
1426 return;
1427 }
1428 if (nread) {
1429 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1430 c->lastinteraction = time(NULL);
1431 } else {
1432 return;
1433 }
1434
1435 again:
1436 if (c->bulklen == -1) {
1437 /* Read the first line of the query */
1438 char *p = strchr(c->querybuf,'\n');
1439 size_t querylen;
1440
1441 if (p) {
1442 sds query, *argv;
1443 int argc, j;
1444
1445 query = c->querybuf;
1446 c->querybuf = sdsempty();
1447 querylen = 1+(p-(query));
1448 if (sdslen(query) > querylen) {
1449 /* leave data after the first line of the query in the buffer */
1450 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1451 }
1452 *p = '\0'; /* remove "\n" */
1453 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1454 sdsupdatelen(query);
1455
1456 /* Now we can split the query in arguments */
1457 if (sdslen(query) == 0) {
1458 /* Ignore empty query */
1459 sdsfree(query);
1460 return;
1461 }
1462 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1463 if (argv == NULL) oom("sdssplitlen");
1464 sdsfree(query);
1465
1466 if (c->argv) zfree(c->argv);
1467 c->argv = zmalloc(sizeof(robj*)*argc);
1468 if (c->argv == NULL) oom("allocating arguments list for client");
1469
1470 for (j = 0; j < argc; j++) {
1471 if (sdslen(argv[j])) {
1472 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1473 c->argc++;
1474 } else {
1475 sdsfree(argv[j]);
1476 }
1477 }
1478 zfree(argv);
1479 /* Execute the command. If the client is still valid
1480 * after processCommand() return and there is something
1481 * on the query buffer try to process the next command. */
1482 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1483 return;
1484 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1485 redisLog(REDIS_DEBUG, "Client protocol error");
1486 freeClient(c);
1487 return;
1488 }
1489 } else {
1490 /* Bulk read handling. Note that if we are at this point
1491 the client already sent a command terminated with a newline,
1492 we are reading the bulk data that is actually the last
1493 argument of the command. */
1494 int qbl = sdslen(c->querybuf);
1495
1496 if (c->bulklen <= qbl) {
1497 /* Copy everything but the final CRLF as final argument */
1498 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1499 c->argc++;
1500 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1501 processCommand(c);
1502 return;
1503 }
1504 }
1505 }
1506
1507 static int selectDb(redisClient *c, int id) {
1508 if (id < 0 || id >= server.dbnum)
1509 return REDIS_ERR;
1510 c->db = &server.db[id];
1511 return REDIS_OK;
1512 }
1513
1514 static void *dupClientReplyValue(void *o) {
1515 incrRefCount((robj*)o);
1516 return 0;
1517 }
1518
1519 static redisClient *createClient(int fd) {
1520 redisClient *c = zmalloc(sizeof(*c));
1521
1522 anetNonBlock(NULL,fd);
1523 anetTcpNoDelay(NULL,fd);
1524 if (!c) return NULL;
1525 selectDb(c,0);
1526 c->fd = fd;
1527 c->querybuf = sdsempty();
1528 c->argc = 0;
1529 c->argv = NULL;
1530 c->bulklen = -1;
1531 c->sentlen = 0;
1532 c->flags = 0;
1533 c->lastinteraction = time(NULL);
1534 c->authenticated = 0;
1535 c->replstate = REDIS_REPL_NONE;
1536 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1537 listSetFreeMethod(c->reply,decrRefCount);
1538 listSetDupMethod(c->reply,dupClientReplyValue);
1539 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1540 readQueryFromClient, c, NULL) == AE_ERR) {
1541 freeClient(c);
1542 return NULL;
1543 }
1544 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1545 return c;
1546 }
1547
1548 static void addReply(redisClient *c, robj *obj) {
1549 if (listLength(c->reply) == 0 &&
1550 (c->replstate == REDIS_REPL_NONE ||
1551 c->replstate == REDIS_REPL_ONLINE) &&
1552 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1553 sendReplyToClient, c, NULL) == AE_ERR) return;
1554 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1555 incrRefCount(obj);
1556 }
1557
1558 static void addReplySds(redisClient *c, sds s) {
1559 robj *o = createObject(REDIS_STRING,s);
1560 addReply(c,o);
1561 decrRefCount(o);
1562 }
1563
1564 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1565 int cport, cfd;
1566 char cip[128];
1567 redisClient *c;
1568 REDIS_NOTUSED(el);
1569 REDIS_NOTUSED(mask);
1570 REDIS_NOTUSED(privdata);
1571
1572 cfd = anetAccept(server.neterr, fd, cip, &cport);
1573 if (cfd == AE_ERR) {
1574 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1575 return;
1576 }
1577 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1578 if ((c = createClient(cfd)) == NULL) {
1579 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1580 close(cfd); /* May be already closed, just ingore errors */
1581 return;
1582 }
1583 /* If maxclient directive is set and this is one client more... close the
1584 * connection. Note that we create the client instead to check before
1585 * for this condition, since now the socket is already set in nonblocking
1586 * mode and we can send an error for free using the Kernel I/O */
1587 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1588 char *err = "-ERR max number of clients reached\r\n";
1589
1590 /* That's a best effort error message, don't check write errors */
1591 (void) write(c->fd,err,strlen(err));
1592 freeClient(c);
1593 return;
1594 }
1595 server.stat_numconnections++;
1596 }
1597
1598 /* ======================= Redis objects implementation ===================== */
1599
1600 static robj *createObject(int type, void *ptr) {
1601 robj *o;
1602
1603 if (listLength(server.objfreelist)) {
1604 listNode *head = listFirst(server.objfreelist);
1605 o = listNodeValue(head);
1606 listDelNode(server.objfreelist,head);
1607 } else {
1608 o = zmalloc(sizeof(*o));
1609 }
1610 if (!o) oom("createObject");
1611 o->type = type;
1612 o->ptr = ptr;
1613 o->refcount = 1;
1614 return o;
1615 }
1616
1617 static robj *createStringObject(char *ptr, size_t len) {
1618 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1619 }
1620
1621 static robj *createListObject(void) {
1622 list *l = listCreate();
1623
1624 if (!l) oom("listCreate");
1625 listSetFreeMethod(l,decrRefCount);
1626 return createObject(REDIS_LIST,l);
1627 }
1628
1629 static robj *createSetObject(void) {
1630 dict *d = dictCreate(&setDictType,NULL);
1631 if (!d) oom("dictCreate");
1632 return createObject(REDIS_SET,d);
1633 }
1634
1635 static void freeStringObject(robj *o) {
1636 sdsfree(o->ptr);
1637 }
1638
1639 static void freeListObject(robj *o) {
1640 listRelease((list*) o->ptr);
1641 }
1642
1643 static void freeSetObject(robj *o) {
1644 dictRelease((dict*) o->ptr);
1645 }
1646
1647 static void freeHashObject(robj *o) {
1648 dictRelease((dict*) o->ptr);
1649 }
1650
1651 static void incrRefCount(robj *o) {
1652 o->refcount++;
1653 #ifdef DEBUG_REFCOUNT
1654 if (o->type == REDIS_STRING)
1655 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1656 #endif
1657 }
1658
1659 static void decrRefCount(void *obj) {
1660 robj *o = obj;
1661
1662 #ifdef DEBUG_REFCOUNT
1663 if (o->type == REDIS_STRING)
1664 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1665 #endif
1666 if (--(o->refcount) == 0) {
1667 switch(o->type) {
1668 case REDIS_STRING: freeStringObject(o); break;
1669 case REDIS_LIST: freeListObject(o); break;
1670 case REDIS_SET: freeSetObject(o); break;
1671 case REDIS_HASH: freeHashObject(o); break;
1672 default: assert(0 != 0); break;
1673 }
1674 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1675 !listAddNodeHead(server.objfreelist,o))
1676 zfree(o);
1677 }
1678 }
1679
1680 /* Try to share an object against the shared objects pool */
1681 static robj *tryObjectSharing(robj *o) {
1682 struct dictEntry *de;
1683 unsigned long c;
1684
1685 if (o == NULL || server.shareobjects == 0) return o;
1686
1687 assert(o->type == REDIS_STRING);
1688 de = dictFind(server.sharingpool,o);
1689 if (de) {
1690 robj *shared = dictGetEntryKey(de);
1691
1692 c = ((unsigned long) dictGetEntryVal(de))+1;
1693 dictGetEntryVal(de) = (void*) c;
1694 incrRefCount(shared);
1695 decrRefCount(o);
1696 return shared;
1697 } else {
1698 /* Here we are using a stream algorihtm: Every time an object is
1699 * shared we increment its count, everytime there is a miss we
1700 * recrement the counter of a random object. If this object reaches
1701 * zero we remove the object and put the current object instead. */
1702 if (dictSize(server.sharingpool) >=
1703 server.sharingpoolsize) {
1704 de = dictGetRandomKey(server.sharingpool);
1705 assert(de != NULL);
1706 c = ((unsigned long) dictGetEntryVal(de))-1;
1707 dictGetEntryVal(de) = (void*) c;
1708 if (c == 0) {
1709 dictDelete(server.sharingpool,de->key);
1710 }
1711 } else {
1712 c = 0; /* If the pool is empty we want to add this object */
1713 }
1714 if (c == 0) {
1715 int retval;
1716
1717 retval = dictAdd(server.sharingpool,o,(void*)1);
1718 assert(retval == DICT_OK);
1719 incrRefCount(o);
1720 }
1721 return o;
1722 }
1723 }
1724
1725 static robj *lookupKey(redisDb *db, robj *key) {
1726 dictEntry *de = dictFind(db->dict,key);
1727 return de ? dictGetEntryVal(de) : NULL;
1728 }
1729
1730 static robj *lookupKeyRead(redisDb *db, robj *key) {
1731 expireIfNeeded(db,key);
1732 return lookupKey(db,key);
1733 }
1734
1735 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1736 deleteIfVolatile(db,key);
1737 return lookupKey(db,key);
1738 }
1739
1740 static int deleteKey(redisDb *db, robj *key) {
1741 int retval;
1742
1743 /* We need to protect key from destruction: after the first dictDelete()
1744 * it may happen that 'key' is no longer valid if we don't increment
1745 * it's count. This may happen when we get the object reference directly
1746 * from the hash table with dictRandomKey() or dict iterators */
1747 incrRefCount(key);
1748 if (dictSize(db->expires)) dictDelete(db->expires,key);
1749 retval = dictDelete(db->dict,key);
1750 decrRefCount(key);
1751
1752 return retval == DICT_OK;
1753 }
1754
1755 /*============================ DB saving/loading ============================ */
1756
/* Write the one-byte 'type' to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
1761
/* Write the time 't' to 'fp' as a 32 bit signed integer (the value is
 * converted to int32_t and its four bytes are written as they are in
 * memory). Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
1767
1768 /* check rdbLoadLen() comments for more info */
1769 static int rdbSaveLen(FILE *fp, uint32_t len) {
1770 unsigned char buf[2];
1771
1772 if (len < (1<<6)) {
1773 /* Save a 6 bit len */
1774 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1775 if (fwrite(buf,1,1,fp) == 0) return -1;
1776 } else if (len < (1<<14)) {
1777 /* Save a 14 bit len */
1778 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1779 buf[1] = len&0xFF;
1780 if (fwrite(buf,2,1,fp) == 0) return -1;
1781 } else {
1782 /* Save a 32 bit len */
1783 buf[0] = (REDIS_RDB_32BITLEN<<6);
1784 if (fwrite(buf,1,1,fp) == 0) return -1;
1785 len = htonl(len);
1786 if (fwrite(&len,4,1,fp) == 0) return -1;
1787 }
1788 return 0;
1789 }
1790
1791 /* String objects in the form "2391" "-100" without any space and with a
1792 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1793 * encoded as integers to save space */
1794 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1795 long long value;
1796 char *endptr, buf[32];
1797
1798 /* Check if it's possible to encode this value as a number */
1799 value = strtoll(s, &endptr, 10);
1800 if (endptr[0] != '\0') return 0;
1801 snprintf(buf,32,"%lld",value);
1802
1803 /* If the number converted back into a string is not identical
1804 * then it's not possible to encode the string as integer */
1805 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1806
1807 /* Finally check if it fits in our ranges */
1808 if (value >= -(1<<7) && value <= (1<<7)-1) {
1809 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1810 enc[1] = value&0xFF;
1811 return 2;
1812 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1813 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1814 enc[1] = value&0xFF;
1815 enc[2] = (value>>8)&0xFF;
1816 return 3;
1817 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1818 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1819 enc[1] = value&0xFF;
1820 enc[2] = (value>>8)&0xFF;
1821 enc[3] = (value>>16)&0xFF;
1822 enc[4] = (value>>24)&0xFF;
1823 return 5;
1824 } else {
1825 return 0;
1826 }
1827 }
1828
1829 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1830 unsigned int comprlen, outlen;
1831 unsigned char byte;
1832 void *out;
1833
1834 /* We require at least four bytes compression for this to be worth it */
1835 outlen = sdslen(obj->ptr)-4;
1836 if (outlen <= 0) return 0;
1837 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1838 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1839 if (comprlen == 0) {
1840 zfree(out);
1841 return 0;
1842 }
1843 /* Data compressed! Let's save it on disk */
1844 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1845 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1846 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1847 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1848 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1849 zfree(out);
1850 return comprlen;
1851
1852 writeerr:
1853 zfree(out);
1854 return -1;
1855 }
1856
1857 /* Save a string objet as [len][data] on disk. If the object is a string
1858 * representation of an integer value we try to safe it in a special form */
1859 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1860 size_t len = sdslen(obj->ptr);
1861 int enclen;
1862
1863 /* Try integer encoding */
1864 if (len <= 11) {
1865 unsigned char buf[5];
1866 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1867 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1868 return 0;
1869 }
1870 }
1871
1872 /* Try LZF compression - under 20 bytes it's unable to compress even
1873 * aaaaaaaaaaaaaaaaaa so skip it */
1874 if (1 && len > 20) {
1875 int retval;
1876
1877 retval = rdbSaveLzfStringObject(fp,obj);
1878 if (retval == -1) return -1;
1879 if (retval > 0) return 0;
1880 /* retval == 0 means data can't be compressed, save the old way */
1881 }
1882
1883 /* Store verbatim */
1884 if (rdbSaveLen(fp,len) == -1) return -1;
1885 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1886 return 0;
1887 }
1888
1889 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1890 static int rdbSave(char *filename) {
1891 dictIterator *di = NULL;
1892 dictEntry *de;
1893 FILE *fp;
1894 char tmpfile[256];
1895 int j;
1896 time_t now = time(NULL);
1897
1898 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1899 fp = fopen(tmpfile,"w");
1900 if (!fp) {
1901 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1902 return REDIS_ERR;
1903 }
1904 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1905 for (j = 0; j < server.dbnum; j++) {
1906 redisDb *db = server.db+j;
1907 dict *d = db->dict;
1908 if (dictSize(d) == 0) continue;
1909 di = dictGetIterator(d);
1910 if (!di) {
1911 fclose(fp);
1912 return REDIS_ERR;
1913 }
1914
1915 /* Write the SELECT DB opcode */
1916 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1917 if (rdbSaveLen(fp,j) == -1) goto werr;
1918
1919 /* Iterate this DB writing every entry */
1920 while((de = dictNext(di)) != NULL) {
1921 robj *key = dictGetEntryKey(de);
1922 robj *o = dictGetEntryVal(de);
1923 time_t expiretime = getExpire(db,key);
1924
1925 /* Save the expire time */
1926 if (expiretime != -1) {
1927 /* If this key is already expired skip it */
1928 if (expiretime < now) continue;
1929 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1930 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1931 }
1932 /* Save the key and associated value */
1933 if (rdbSaveType(fp,o->type) == -1) goto werr;
1934 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1935 if (o->type == REDIS_STRING) {
1936 /* Save a string value */
1937 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1938 } else if (o->type == REDIS_LIST) {
1939 /* Save a list value */
1940 list *list = o->ptr;
1941 listNode *ln;
1942
1943 listRewind(list);
1944 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1945 while((ln = listYield(list))) {
1946 robj *eleobj = listNodeValue(ln);
1947
1948 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1949 }
1950 } else if (o->type == REDIS_SET) {
1951 /* Save a set value */
1952 dict *set = o->ptr;
1953 dictIterator *di = dictGetIterator(set);
1954 dictEntry *de;
1955
1956 if (!set) oom("dictGetIteraotr");
1957 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1958 while((de = dictNext(di)) != NULL) {
1959 robj *eleobj = dictGetEntryKey(de);
1960
1961 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1962 }
1963 dictReleaseIterator(di);
1964 } else {
1965 assert(0 != 0);
1966 }
1967 }
1968 dictReleaseIterator(di);
1969 }
1970 /* EOF opcode */
1971 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1972
1973 /* Make sure data will not remain on the OS's output buffers */
1974 fflush(fp);
1975 fsync(fileno(fp));
1976 fclose(fp);
1977
1978 /* Use RENAME to make sure the DB file is changed atomically only
1979 * if the generate DB file is ok. */
1980 if (rename(tmpfile,filename) == -1) {
1981 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1982 unlink(tmpfile);
1983 return REDIS_ERR;
1984 }
1985 redisLog(REDIS_NOTICE,"DB saved on disk");
1986 server.dirty = 0;
1987 server.lastsave = time(NULL);
1988 return REDIS_OK;
1989
1990 werr:
1991 fclose(fp);
1992 unlink(tmpfile);
1993 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1994 if (di) dictReleaseIterator(di);
1995 return REDIS_ERR;
1996 }
1997
1998 static int rdbSaveBackground(char *filename) {
1999 pid_t childpid;
2000
2001 if (server.bgsaveinprogress) return REDIS_ERR;
2002 if ((childpid = fork()) == 0) {
2003 /* Child */
2004 close(server.fd);
2005 if (rdbSave(filename) == REDIS_OK) {
2006 exit(0);
2007 } else {
2008 exit(1);
2009 }
2010 } else {
2011 /* Parent */
2012 if (childpid == -1) {
2013 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2014 strerror(errno));
2015 return REDIS_ERR;
2016 }
2017 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2018 server.bgsaveinprogress = 1;
2019 server.bgsavechildpid = childpid;
2020 return REDIS_OK;
2021 }
2022 return REDIS_OK; /* unreached */
2023 }
2024
/* Read a single byte type/opcode from 'fp'.
 * Returns the byte value (0-255), or -1 if the byte can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    return (fread(&opcode,1,1,fp) == 0) ? -1 : opcode;
}
2030
/* Read a 4 byte time value from 'fp', in the byte order of the running
 * machine, and return it as a time_t. Returns -1 if the value can't be
 * read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t) stamp;
}
2036
2037 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2038 * of this file for a description of how this are stored on disk.
2039 *
2040 * isencoded is set to 1 if the readed length is not actually a length but
2041 * an "encoding type", check the above comments for more info */
2042 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2043 unsigned char buf[2];
2044 uint32_t len;
2045
2046 if (isencoded) *isencoded = 0;
2047 if (rdbver == 0) {
2048 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2049 return ntohl(len);
2050 } else {
2051 int type;
2052
2053 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2054 type = (buf[0]&0xC0)>>6;
2055 if (type == REDIS_RDB_6BITLEN) {
2056 /* Read a 6 bit len */
2057 return buf[0]&0x3F;
2058 } else if (type == REDIS_RDB_ENCVAL) {
2059 /* Read a 6 bit len encoding type */
2060 if (isencoded) *isencoded = 1;
2061 return buf[0]&0x3F;
2062 } else if (type == REDIS_RDB_14BITLEN) {
2063 /* Read a 14 bit len */
2064 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2065 return ((buf[0]&0x3F)<<8)|buf[1];
2066 } else {
2067 /* Read a 32 bit len */
2068 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2069 return ntohl(len);
2070 }
2071 }
2072 }
2073
2074 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2075 unsigned char enc[4];
2076 long long val;
2077
2078 if (enctype == REDIS_RDB_ENC_INT8) {
2079 if (fread(enc,1,1,fp) == 0) return NULL;
2080 val = (signed char)enc[0];
2081 } else if (enctype == REDIS_RDB_ENC_INT16) {
2082 uint16_t v;
2083 if (fread(enc,2,1,fp) == 0) return NULL;
2084 v = enc[0]|(enc[1]<<8);
2085 val = (int16_t)v;
2086 } else if (enctype == REDIS_RDB_ENC_INT32) {
2087 uint32_t v;
2088 if (fread(enc,4,1,fp) == 0) return NULL;
2089 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2090 val = (int32_t)v;
2091 } else {
2092 val = 0; /* anti-warning */
2093 assert(0!=0);
2094 }
2095 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2096 }
2097
2098 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2099 unsigned int len, clen;
2100 unsigned char *c = NULL;
2101 sds val = NULL;
2102
2103 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2104 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2105 if ((c = zmalloc(clen)) == NULL) goto err;
2106 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2107 if (fread(c,clen,1,fp) == 0) goto err;
2108 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2109 zfree(c);
2110 return createObject(REDIS_STRING,val);
2111 err:
2112 zfree(c);
2113 sdsfree(val);
2114 return NULL;
2115 }
2116
2117 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2118 int isencoded;
2119 uint32_t len;
2120 sds val;
2121
2122 len = rdbLoadLen(fp,rdbver,&isencoded);
2123 if (isencoded) {
2124 switch(len) {
2125 case REDIS_RDB_ENC_INT8:
2126 case REDIS_RDB_ENC_INT16:
2127 case REDIS_RDB_ENC_INT32:
2128 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2129 case REDIS_RDB_ENC_LZF:
2130 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2131 default:
2132 assert(0!=0);
2133 }
2134 }
2135
2136 if (len == REDIS_RDB_LENERR) return NULL;
2137 val = sdsnewlen(NULL,len);
2138 if (len && fread(val,len,1,fp) == 0) {
2139 sdsfree(val);
2140 return NULL;
2141 }
2142 return tryObjectSharing(createObject(REDIS_STRING,val));
2143 }
2144
/* Load the dataset stored in the RDB file 'filename' into the server
 * databases.
 *
 * Returns REDIS_ERR if the file can't be opened, if it does not start with
 * the "REDIS" signature or if its format version is greater than 1, and
 * REDIS_OK once the EOF opcode is reached.
 *
 * Several conditions are fatal and terminate the process with exit(1)
 * instead of returning: a short read (or a failed object load), a SELECT DB
 * opcode referencing a database index >= server.dbnum, and a duplicated
 * key. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    /* Keys are loaded into DB 0 until a SELECT DB opcode is found */
    dict *d = server.db[0].dict;
    redisDb *db = server.db+0;
    char buf[1024];
    /* expiretime is -1 when no expire is pending for the next key */
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* The header is 9 bytes: the "REDIS" signature followed by a 4 digit
     * format version */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            /* The expire time is remembered and applied to the key that
             * follows */
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: a length followed by that many string
             * objects */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                if (type == REDIS_LIST) {
                    if (!listAddNodeTail((list*)o->ptr,ele))
                        oom("listAddNodeTail");
                } else {
                    if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
                        oom("dictAdd");
                }
            }
        } else {
            /* Unknown value type */
            assert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            expiretime = -1;
        }
        /* The objects now belong to the dataset: forget them so the error
         * path does not release the key */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2246
2247 /*================================== Commands =============================== */
2248
2249 static void authCommand(redisClient *c) {
2250 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2251 c->authenticated = 1;
2252 addReply(c,shared.ok);
2253 } else {
2254 c->authenticated = 0;
2255 addReply(c,shared.err);
2256 }
2257 }
2258
2259 static void pingCommand(redisClient *c) {
2260 addReply(c,shared.pong);
2261 }
2262
2263 static void echoCommand(redisClient *c) {
2264 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2265 (int)sdslen(c->argv[1]->ptr)));
2266 addReply(c,c->argv[1]);
2267 addReply(c,shared.crlf);
2268 }
2269
2270 /*=================================== Strings =============================== */
2271
2272 static void setGenericCommand(redisClient *c, int nx) {
2273 int retval;
2274
2275 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2276 if (retval == DICT_ERR) {
2277 if (!nx) {
2278 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2279 incrRefCount(c->argv[2]);
2280 } else {
2281 addReply(c,shared.czero);
2282 return;
2283 }
2284 } else {
2285 incrRefCount(c->argv[1]);
2286 incrRefCount(c->argv[2]);
2287 }
2288 server.dirty++;
2289 removeExpire(c->db,c->argv[1]);
2290 addReply(c, nx ? shared.cone : shared.ok);
2291 }
2292
2293 static void setCommand(redisClient *c) {
2294 setGenericCommand(c,0);
2295 }
2296
2297 static void setnxCommand(redisClient *c) {
2298 setGenericCommand(c,1);
2299 }
2300
2301 static void getCommand(redisClient *c) {
2302 robj *o = lookupKeyRead(c->db,c->argv[1]);
2303
2304 if (o == NULL) {
2305 addReply(c,shared.nullbulk);
2306 } else {
2307 if (o->type != REDIS_STRING) {
2308 addReply(c,shared.wrongtypeerr);
2309 } else {
2310 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2311 addReply(c,o);
2312 addReply(c,shared.crlf);
2313 }
2314 }
2315 }
2316
2317 static void getSetCommand(redisClient *c) {
2318 getCommand(c);
2319 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2320 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2321 } else {
2322 incrRefCount(c->argv[1]);
2323 }
2324 incrRefCount(c->argv[2]);
2325 server.dirty++;
2326 removeExpire(c->db,c->argv[1]);
2327 }
2328
2329 static void mgetCommand(redisClient *c) {
2330 int j;
2331
2332 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2333 for (j = 1; j < c->argc; j++) {
2334 robj *o = lookupKeyRead(c->db,c->argv[j]);
2335 if (o == NULL) {
2336 addReply(c,shared.nullbulk);
2337 } else {
2338 if (o->type != REDIS_STRING) {
2339 addReply(c,shared.nullbulk);
2340 } else {
2341 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2342 addReply(c,o);
2343 addReply(c,shared.crlf);
2344 }
2345 }
2346 }
2347 }
2348
2349 static void incrDecrCommand(redisClient *c, long long incr) {
2350 long long value;
2351 int retval;
2352 robj *o;
2353
2354 o = lookupKeyWrite(c->db,c->argv[1]);
2355 if (o == NULL) {
2356 value = 0;
2357 } else {
2358 if (o->type != REDIS_STRING) {
2359 value = 0;
2360 } else {
2361 char *eptr;
2362
2363 value = strtoll(o->ptr, &eptr, 10);
2364 }
2365 }
2366
2367 value += incr;
2368 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2369 retval = dictAdd(c->db->dict,c->argv[1],o);
2370 if (retval == DICT_ERR) {
2371 dictReplace(c->db->dict,c->argv[1],o);
2372 removeExpire(c->db,c->argv[1]);
2373 } else {
2374 incrRefCount(c->argv[1]);
2375 }
2376 server.dirty++;
2377 addReply(c,shared.colon);
2378 addReply(c,o);
2379 addReply(c,shared.crlf);
2380 }
2381
2382 static void incrCommand(redisClient *c) {
2383 incrDecrCommand(c,1);
2384 }
2385
2386 static void decrCommand(redisClient *c) {
2387 incrDecrCommand(c,-1);
2388 }
2389
2390 static void incrbyCommand(redisClient *c) {
2391 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2392 incrDecrCommand(c,incr);
2393 }
2394
2395 static void decrbyCommand(redisClient *c) {
2396 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2397 incrDecrCommand(c,-incr);
2398 }
2399
2400 /* ========================= Type agnostic commands ========================= */
2401
2402 static void delCommand(redisClient *c) {
2403 int deleted = 0, j;
2404
2405 for (j = 1; j < c->argc; j++) {
2406 if (deleteKey(c->db,c->argv[j])) {
2407 server.dirty++;
2408 deleted++;
2409 }
2410 }
2411 switch(deleted) {
2412 case 0:
2413 addReply(c,shared.czero);
2414 break;
2415 case 1:
2416 addReply(c,shared.cone);
2417 break;
2418 default:
2419 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2420 break;
2421 }
2422 }
2423
2424 static void existsCommand(redisClient *c) {
2425 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2426 }
2427
2428 static void selectCommand(redisClient *c) {
2429 int id = atoi(c->argv[1]->ptr);
2430
2431 if (selectDb(c,id) == REDIS_ERR) {
2432 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2433 } else {
2434 addReply(c,shared.ok);
2435 }
2436 }
2437
2438 static void randomkeyCommand(redisClient *c) {
2439 dictEntry *de;
2440
2441 while(1) {
2442 de = dictGetRandomKey(c->db->dict);
2443 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2444 }
2445 if (de == NULL) {
2446 addReply(c,shared.plus);
2447 addReply(c,shared.crlf);
2448 } else {
2449 addReply(c,shared.plus);
2450 addReply(c,dictGetEntryKey(de));
2451 addReply(c,shared.crlf);
2452 }
2453 }
2454
2455 static void keysCommand(redisClient *c) {
2456 dictIterator *di;
2457 dictEntry *de;
2458 sds pattern = c->argv[1]->ptr;
2459 int plen = sdslen(pattern);
2460 int numkeys = 0, keyslen = 0;
2461 robj *lenobj = createObject(REDIS_STRING,NULL);
2462
2463 di = dictGetIterator(c->db->dict);
2464 if (!di) oom("dictGetIterator");
2465 addReply(c,lenobj);
2466 decrRefCount(lenobj);
2467 while((de = dictNext(di)) != NULL) {
2468 robj *keyobj = dictGetEntryKey(de);
2469
2470 sds key = keyobj->ptr;
2471 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2472 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2473 if (expireIfNeeded(c->db,keyobj) == 0) {
2474 if (numkeys != 0)
2475 addReply(c,shared.space);
2476 addReply(c,keyobj);
2477 numkeys++;
2478 keyslen += sdslen(key);
2479 }
2480 }
2481 }
2482 dictReleaseIterator(di);
2483 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2484 addReply(c,shared.crlf);
2485 }
2486
2487 static void dbsizeCommand(redisClient *c) {
2488 addReplySds(c,
2489 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2490 }
2491
2492 static void lastsaveCommand(redisClient *c) {
2493 addReplySds(c,
2494 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2495 }
2496
2497 static void typeCommand(redisClient *c) {
2498 robj *o;
2499 char *type;
2500
2501 o = lookupKeyRead(c->db,c->argv[1]);
2502 if (o == NULL) {
2503 type = "+none";
2504 } else {
2505 switch(o->type) {
2506 case REDIS_STRING: type = "+string"; break;
2507 case REDIS_LIST: type = "+list"; break;
2508 case REDIS_SET: type = "+set"; break;
2509 default: type = "unknown"; break;
2510 }
2511 }
2512 addReplySds(c,sdsnew(type));
2513 addReply(c,shared.crlf);
2514 }
2515
2516 static void saveCommand(redisClient *c) {
2517 if (server.bgsaveinprogress) {
2518 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2519 return;
2520 }
2521 if (rdbSave(server.dbfilename) == REDIS_OK) {
2522 addReply(c,shared.ok);
2523 } else {
2524 addReply(c,shared.err);
2525 }
2526 }
2527
2528 static void bgsaveCommand(redisClient *c) {
2529 if (server.bgsaveinprogress) {
2530 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2531 return;
2532 }
2533 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2534 addReply(c,shared.ok);
2535 } else {
2536 addReply(c,shared.err);
2537 }
2538 }
2539
2540 static void shutdownCommand(redisClient *c) {
2541 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2542 if (server.bgsaveinprogress) {
2543 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2544 signal(SIGCHLD, SIG_IGN);
2545 kill(server.bgsavechildpid,SIGKILL);
2546 }
2547 if (rdbSave(server.dbfilename) == REDIS_OK) {
2548 if (server.daemonize)
2549 unlink(server.pidfile);
2550 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2551 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2552 exit(1);
2553 } else {
2554 signal(SIGCHLD, SIG_DFL);
2555 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2556 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2557 }
2558 }
2559
2560 static void renameGenericCommand(redisClient *c, int nx) {
2561 robj *o;
2562
2563 /* To use the same key as src and dst is probably an error */
2564 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2565 addReply(c,shared.sameobjecterr);
2566 return;
2567 }
2568
2569 o = lookupKeyWrite(c->db,c->argv[1]);
2570 if (o == NULL) {
2571 addReply(c,shared.nokeyerr);
2572 return;
2573 }
2574 incrRefCount(o);
2575 deleteIfVolatile(c->db,c->argv[2]);
2576 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2577 if (nx) {
2578 decrRefCount(o);
2579 addReply(c,shared.czero);
2580 return;
2581 }
2582 dictReplace(c->db->dict,c->argv[2],o);
2583 } else {
2584 incrRefCount(c->argv[2]);
2585 }
2586 deleteKey(c->db,c->argv[1]);
2587 server.dirty++;
2588 addReply(c,nx ? shared.cone : shared.ok);
2589 }
2590
2591 static void renameCommand(redisClient *c) {
2592 renameGenericCommand(c,0);
2593 }
2594
2595 static void renamenxCommand(redisClient *c) {
2596 renameGenericCommand(c,1);
2597 }
2598
2599 static void moveCommand(redisClient *c) {
2600 robj *o;
2601 redisDb *src, *dst;
2602 int srcid;
2603
2604 /* Obtain source and target DB pointers */
2605 src = c->db;
2606 srcid = c->db->id;
2607 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2608 addReply(c,shared.outofrangeerr);
2609 return;
2610 }
2611 dst = c->db;
2612 selectDb(c,srcid); /* Back to the source DB */
2613
2614 /* If the user is moving using as target the same
2615 * DB as the source DB it is probably an error. */
2616 if (src == dst) {
2617 addReply(c,shared.sameobjecterr);
2618 return;
2619 }
2620
2621 /* Check if the element exists and get a reference */
2622 o = lookupKeyWrite(c->db,c->argv[1]);
2623 if (!o) {
2624 addReply(c,shared.czero);
2625 return;
2626 }
2627
2628 /* Try to add the element to the target DB */
2629 deleteIfVolatile(dst,c->argv[1]);
2630 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2631 addReply(c,shared.czero);
2632 return;
2633 }
2634 incrRefCount(c->argv[1]);
2635 incrRefCount(o);
2636
2637 /* OK! key moved, free the entry in the source DB */
2638 deleteKey(src,c->argv[1]);
2639 server.dirty++;
2640 addReply(c,shared.cone);
2641 }
2642
2643 /* =================================== Lists ================================ */
2644 static void pushGenericCommand(redisClient *c, int where) {
2645 robj *lobj;
2646 list *list;
2647
2648 lobj = lookupKeyWrite(c->db,c->argv[1]);
2649 if (lobj == NULL) {
2650 lobj = createListObject();
2651 list = lobj->ptr;
2652 if (where == REDIS_HEAD) {
2653 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2654 } else {
2655 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2656 }
2657 dictAdd(c->db->dict,c->argv[1],lobj);
2658 incrRefCount(c->argv[1]);
2659 incrRefCount(c->argv[2]);
2660 } else {
2661 if (lobj->type != REDIS_LIST) {
2662 addReply(c,shared.wrongtypeerr);
2663 return;
2664 }
2665 list = lobj->ptr;
2666 if (where == REDIS_HEAD) {
2667 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2668 } else {
2669 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2670 }
2671 incrRefCount(c->argv[2]);
2672 }
2673 server.dirty++;
2674 addReply(c,shared.ok);
2675 }
2676
2677 static void lpushCommand(redisClient *c) {
2678 pushGenericCommand(c,REDIS_HEAD);
2679 }
2680
2681 static void rpushCommand(redisClient *c) {
2682 pushGenericCommand(c,REDIS_TAIL);
2683 }
2684
2685 static void llenCommand(redisClient *c) {
2686 robj *o;
2687 list *l;
2688
2689 o = lookupKeyRead(c->db,c->argv[1]);
2690 if (o == NULL) {
2691 addReply(c,shared.czero);
2692 return;
2693 } else {
2694 if (o->type != REDIS_LIST) {
2695 addReply(c,shared.wrongtypeerr);
2696 } else {
2697 l = o->ptr;
2698 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2699 }
2700 }
2701 }
2702
2703 static void lindexCommand(redisClient *c) {
2704 robj *o;
2705 int index = atoi(c->argv[2]->ptr);
2706
2707 o = lookupKeyRead(c->db,c->argv[1]);
2708 if (o == NULL) {
2709 addReply(c,shared.nullbulk);
2710 } else {
2711 if (o->type != REDIS_LIST) {
2712 addReply(c,shared.wrongtypeerr);
2713 } else {
2714 list *list = o->ptr;
2715 listNode *ln;
2716
2717 ln = listIndex(list, index);
2718 if (ln == NULL) {
2719 addReply(c,shared.nullbulk);
2720 } else {
2721 robj *ele = listNodeValue(ln);
2722 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2723 addReply(c,ele);
2724 addReply(c,shared.crlf);
2725 }
2726 }
2727 }
2728 }
2729
2730 static void lsetCommand(redisClient *c) {
2731 robj *o;
2732 int index = atoi(c->argv[2]->ptr);
2733
2734 o = lookupKeyWrite(c->db,c->argv[1]);
2735 if (o == NULL) {
2736 addReply(c,shared.nokeyerr);
2737 } else {
2738 if (o->type != REDIS_LIST) {
2739 addReply(c,shared.wrongtypeerr);
2740 } else {
2741 list *list = o->ptr;
2742 listNode *ln;
2743
2744 ln = listIndex(list, index);
2745 if (ln == NULL) {
2746 addReply(c,shared.outofrangeerr);
2747 } else {
2748 robj *ele = listNodeValue(ln);
2749
2750 decrRefCount(ele);
2751 listNodeValue(ln) = c->argv[3];
2752 incrRefCount(c->argv[3]);
2753 addReply(c,shared.ok);
2754 server.dirty++;
2755 }
2756 }
2757 }
2758 }
2759
2760 static void popGenericCommand(redisClient *c, int where) {
2761 robj *o;
2762
2763 o = lookupKeyWrite(c->db,c->argv[1]);
2764 if (o == NULL) {
2765 addReply(c,shared.nullbulk);
2766 } else {
2767 if (o->type != REDIS_LIST) {
2768 addReply(c,shared.wrongtypeerr);
2769 } else {
2770 list *list = o->ptr;
2771 listNode *ln;
2772
2773 if (where == REDIS_HEAD)
2774 ln = listFirst(list);
2775 else
2776 ln = listLast(list);
2777
2778 if (ln == NULL) {
2779 addReply(c,shared.nullbulk);
2780 } else {
2781 robj *ele = listNodeValue(ln);
2782 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2783 addReply(c,ele);
2784 addReply(c,shared.crlf);
2785 listDelNode(list,ln);
2786 server.dirty++;
2787 }
2788 }
2789 }
2790 }
2791
2792 static void lpopCommand(redisClient *c) {
2793 popGenericCommand(c,REDIS_HEAD);
2794 }
2795
2796 static void rpopCommand(redisClient *c) {
2797 popGenericCommand(c,REDIS_TAIL);
2798 }
2799
2800 static void lrangeCommand(redisClient *c) {
2801 robj *o;
2802 int start = atoi(c->argv[2]->ptr);
2803 int end = atoi(c->argv[3]->ptr);
2804
2805 o = lookupKeyRead(c->db,c->argv[1]);
2806 if (o == NULL) {
2807 addReply(c,shared.nullmultibulk);
2808 } else {
2809 if (o->type != REDIS_LIST) {
2810 addReply(c,shared.wrongtypeerr);
2811 } else {
2812 list *list = o->ptr;
2813 listNode *ln;
2814 int llen = listLength(list);
2815 int rangelen, j;
2816 robj *ele;
2817
2818 /* convert negative indexes */
2819 if (start < 0) start = llen+start;
2820 if (end < 0) end = llen+end;
2821 if (start < 0) start = 0;
2822 if (end < 0) end = 0;
2823
2824 /* indexes sanity checks */
2825 if (start > end || start >= llen) {
2826 /* Out of range start or start > end result in empty list */
2827 addReply(c,shared.emptymultibulk);
2828 return;
2829 }
2830 if (end >= llen) end = llen-1;
2831 rangelen = (end-start)+1;
2832
2833 /* Return the result in form of a multi-bulk reply */
2834 ln = listIndex(list, start);
2835 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2836 for (j = 0; j < rangelen; j++) {
2837 ele = listNodeValue(ln);
2838 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2839 addReply(c,ele);
2840 addReply(c,shared.crlf);
2841 ln = ln->next;
2842 }
2843 }
2844 }
2845 }
2846
2847 static void ltrimCommand(redisClient *c) {
2848 robj *o;
2849 int start = atoi(c->argv[2]->ptr);
2850 int end = atoi(c->argv[3]->ptr);
2851
2852 o = lookupKeyWrite(c->db,c->argv[1]);
2853 if (o == NULL) {
2854 addReply(c,shared.nokeyerr);
2855 } else {
2856 if (o->type != REDIS_LIST) {
2857 addReply(c,shared.wrongtypeerr);
2858 } else {
2859 list *list = o->ptr;
2860 listNode *ln;
2861 int llen = listLength(list);
2862 int j, ltrim, rtrim;
2863
2864 /* convert negative indexes */
2865 if (start < 0) start = llen+start;
2866 if (end < 0) end = llen+end;
2867 if (start < 0) start = 0;
2868 if (end < 0) end = 0;
2869
2870 /* indexes sanity checks */
2871 if (start > end || start >= llen) {
2872 /* Out of range start or start > end result in empty list */
2873 ltrim = llen;
2874 rtrim = 0;
2875 } else {
2876 if (end >= llen) end = llen-1;
2877 ltrim = start;
2878 rtrim = llen-end-1;
2879 }
2880
2881 /* Remove list elements to perform the trim */
2882 for (j = 0; j < ltrim; j++) {
2883 ln = listFirst(list);
2884 listDelNode(list,ln);
2885 }
2886 for (j = 0; j < rtrim; j++) {
2887 ln = listLast(list);
2888 listDelNode(list,ln);
2889 }
2890 addReply(c,shared.ok);
2891 server.dirty++;
2892 }
2893 }
2894 }
2895
2896 static void lremCommand(redisClient *c) {
2897 robj *o;
2898
2899 o = lookupKeyWrite(c->db,c->argv[1]);
2900 if (o == NULL) {
2901 addReply(c,shared.czero);
2902 } else {
2903 if (o->type != REDIS_LIST) {
2904 addReply(c,shared.wrongtypeerr);
2905 } else {
2906 list *list = o->ptr;
2907 listNode *ln, *next;
2908 int toremove = atoi(c->argv[2]->ptr);
2909 int removed = 0;
2910 int fromtail = 0;
2911
2912 if (toremove < 0) {
2913 toremove = -toremove;
2914 fromtail = 1;
2915 }
2916 ln = fromtail ? list->tail : list->head;
2917 while (ln) {
2918 robj *ele = listNodeValue(ln);
2919
2920 next = fromtail ? ln->prev : ln->next;
2921 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2922 listDelNode(list,ln);
2923 server.dirty++;
2924 removed++;
2925 if (toremove && removed == toremove) break;
2926 }
2927 ln = next;
2928 }
2929 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2930 }
2931 }
2932 }
2933
2934 /* ==================================== Sets ================================ */
2935
2936 static void saddCommand(redisClient *c) {
2937 robj *set;
2938
2939 set = lookupKeyWrite(c->db,c->argv[1]);
2940 if (set == NULL) {
2941 set = createSetObject();
2942 dictAdd(c->db->dict,c->argv[1],set);
2943 incrRefCount(c->argv[1]);
2944 } else {
2945 if (set->type != REDIS_SET) {
2946 addReply(c,shared.wrongtypeerr);
2947 return;
2948 }
2949 }
2950 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2951 incrRefCount(c->argv[2]);
2952 server.dirty++;
2953 addReply(c,shared.cone);
2954 } else {
2955 addReply(c,shared.czero);
2956 }
2957 }
2958
2959 static void sremCommand(redisClient *c) {
2960 robj *set;
2961
2962 set = lookupKeyWrite(c->db,c->argv[1]);
2963 if (set == NULL) {
2964 addReply(c,shared.czero);
2965 } else {
2966 if (set->type != REDIS_SET) {
2967 addReply(c,shared.wrongtypeerr);
2968 return;
2969 }
2970 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2971 server.dirty++;
2972 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
2973 addReply(c,shared.cone);
2974 } else {
2975 addReply(c,shared.czero);
2976 }
2977 }
2978 }
2979
2980 static void smoveCommand(redisClient *c) {
2981 robj *srcset, *dstset;
2982
2983 srcset = lookupKeyWrite(c->db,c->argv[1]);
2984 dstset = lookupKeyWrite(c->db,c->argv[2]);
2985
2986 /* If the source key does not exist return 0, if it's of the wrong type
2987 * raise an error */
2988 if (srcset == NULL || srcset->type != REDIS_SET) {
2989 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2990 return;
2991 }
2992 /* Error if the destination key is not a set as well */
2993 if (dstset && dstset->type != REDIS_SET) {
2994 addReply(c,shared.wrongtypeerr);
2995 return;
2996 }
2997 /* Remove the element from the source set */
2998 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2999 /* Key not found in the src set! return zero */
3000 addReply(c,shared.czero);
3001 return;
3002 }
3003 server.dirty++;
3004 /* Add the element to the destination set */
3005 if (!dstset) {
3006 dstset = createSetObject();
3007 dictAdd(c->db->dict,c->argv[2],dstset);
3008 incrRefCount(c->argv[2]);
3009 }
3010 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3011 incrRefCount(c->argv[3]);
3012 addReply(c,shared.cone);
3013 }
3014
3015 static void sismemberCommand(redisClient *c) {
3016 robj *set;
3017
3018 set = lookupKeyRead(c->db,c->argv[1]);
3019 if (set == NULL) {
3020 addReply(c,shared.czero);
3021 } else {
3022 if (set->type != REDIS_SET) {
3023 addReply(c,shared.wrongtypeerr);
3024 return;
3025 }
3026 if (dictFind(set->ptr,c->argv[2]))
3027 addReply(c,shared.cone);
3028 else
3029 addReply(c,shared.czero);
3030 }
3031 }
3032
3033 static void scardCommand(redisClient *c) {
3034 robj *o;
3035 dict *s;
3036
3037 o = lookupKeyRead(c->db,c->argv[1]);
3038 if (o == NULL) {
3039 addReply(c,shared.czero);
3040 return;
3041 } else {
3042 if (o->type != REDIS_SET) {
3043 addReply(c,shared.wrongtypeerr);
3044 } else {
3045 s = o->ptr;
3046 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3047 dictSize(s)));
3048 }
3049 }
3050 }
3051
3052 static void spopCommand(redisClient *c) {
3053 robj *set;
3054 dictEntry *de;
3055
3056 set = lookupKeyWrite(c->db,c->argv[1]);
3057 if (set == NULL) {
3058 addReply(c,shared.nullbulk);
3059 } else {
3060 if (set->type != REDIS_SET) {
3061 addReply(c,shared.wrongtypeerr);
3062 return;
3063 }
3064 de = dictGetRandomKey(set->ptr);
3065 if (de == NULL) {
3066 addReply(c,shared.nullbulk);
3067 } else {
3068 robj *ele = dictGetEntryKey(de);
3069
3070 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3071 addReply(c,ele);
3072 addReply(c,shared.crlf);
3073 dictDelete(set->ptr,ele);
3074 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3075 server.dirty++;
3076 }
3077 }
3078 }
3079
3080 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3081 dict **d1 = (void*) s1, **d2 = (void*) s2;
3082
3083 return dictSize(*d1)-dictSize(*d2);
3084 }
3085
3086 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3087 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3088 dictIterator *di;
3089 dictEntry *de;
3090 robj *lenobj = NULL, *dstset = NULL;
3091 int j, cardinality = 0;
3092
3093 if (!dv) oom("sinterGenericCommand");
3094 for (j = 0; j < setsnum; j++) {
3095 robj *setobj;
3096
3097 setobj = dstkey ?
3098 lookupKeyWrite(c->db,setskeys[j]) :
3099 lookupKeyRead(c->db,setskeys[j]);
3100 if (!setobj) {
3101 zfree(dv);
3102 if (dstkey) {
3103 deleteKey(c->db,dstkey);
3104 addReply(c,shared.ok);
3105 } else {
3106 addReply(c,shared.nullmultibulk);
3107 }
3108 return;
3109 }
3110 if (setobj->type != REDIS_SET) {
3111 zfree(dv);
3112 addReply(c,shared.wrongtypeerr);
3113 return;
3114 }
3115 dv[j] = setobj->ptr;
3116 }
3117 /* Sort sets from the smallest to largest, this will improve our
3118 * algorithm's performace */
3119 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3120
3121 /* The first thing we should output is the total number of elements...
3122 * since this is a multi-bulk write, but at this stage we don't know
3123 * the intersection set size, so we use a trick, append an empty object
3124 * to the output list and save the pointer to later modify it with the
3125 * right length */
3126 if (!dstkey) {
3127 lenobj = createObject(REDIS_STRING,NULL);
3128 addReply(c,lenobj);
3129 decrRefCount(lenobj);
3130 } else {
3131 /* If we have a target key where to store the resulting set
3132 * create this key with an empty set inside */
3133 dstset = createSetObject();
3134 }
3135
3136 /* Iterate all the elements of the first (smallest) set, and test
3137 * the element against all the other sets, if at least one set does
3138 * not include the element it is discarded */
3139 di = dictGetIterator(dv[0]);
3140 if (!di) oom("dictGetIterator");
3141
3142 while((de = dictNext(di)) != NULL) {
3143 robj *ele;
3144
3145 for (j = 1; j < setsnum; j++)
3146 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3147 if (j != setsnum)
3148 continue; /* at least one set does not contain the member */
3149 ele = dictGetEntryKey(de);
3150 if (!dstkey) {
3151 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3152 addReply(c,ele);
3153 addReply(c,shared.crlf);
3154 cardinality++;
3155 } else {
3156 dictAdd(dstset->ptr,ele,NULL);
3157 incrRefCount(ele);
3158 }
3159 }
3160 dictReleaseIterator(di);
3161
3162 if (dstkey) {
3163 /* Store the resulting set into the target */
3164 deleteKey(c->db,dstkey);
3165 dictAdd(c->db->dict,dstkey,dstset);
3166 incrRefCount(dstkey);
3167 }
3168
3169 if (!dstkey) {
3170 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3171 } else {
3172 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3173 dictSize((dict*)dstset->ptr)));
3174 server.dirty++;
3175 }
3176 zfree(dv);
3177 }
3178
3179 static void sinterCommand(redisClient *c) {
3180 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3181 }
3182
3183 static void sinterstoreCommand(redisClient *c) {
3184 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3185 }
3186
3187 #define REDIS_OP_UNION 0
3188 #define REDIS_OP_DIFF 1
3189
3190 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3191 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3192 dictIterator *di;
3193 dictEntry *de;
3194 robj *dstset = NULL;
3195 int j, cardinality = 0;
3196
3197 if (!dv) oom("sunionDiffGenericCommand");
3198 for (j = 0; j < setsnum; j++) {
3199 robj *setobj;
3200
3201 setobj = dstkey ?
3202 lookupKeyWrite(c->db,setskeys[j]) :
3203 lookupKeyRead(c->db,setskeys[j]);
3204 if (!setobj) {
3205 dv[j] = NULL;
3206 continue;
3207 }
3208 if (setobj->type != REDIS_SET) {
3209 zfree(dv);
3210 addReply(c,shared.wrongtypeerr);
3211 return;
3212 }
3213 dv[j] = setobj->ptr;
3214 }
3215
3216 /* We need a temp set object to store our union. If the dstkey
3217 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3218 * this set object will be the resulting object to set into the target key*/
3219 dstset = createSetObject();
3220
3221 /* Iterate all the elements of all the sets, add every element a single
3222 * time to the result set */
3223 for (j = 0; j < setsnum; j++) {
3224 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3225 if (!dv[j]) continue; /* non existing keys are like empty sets */
3226
3227 di = dictGetIterator(dv[j]);
3228 if (!di) oom("dictGetIterator");
3229
3230 while((de = dictNext(di)) != NULL) {
3231 robj *ele;
3232
3233 /* dictAdd will not add the same element multiple times */
3234 ele = dictGetEntryKey(de);
3235 if (op == REDIS_OP_UNION || j == 0) {
3236 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3237 incrRefCount(ele);
3238 cardinality++;
3239 }
3240 } else if (op == REDIS_OP_DIFF) {
3241 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3242 cardinality--;
3243 }
3244 }
3245 }
3246 dictReleaseIterator(di);
3247
3248 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3249 }
3250
3251 /* Output the content of the resulting set, if not in STORE mode */
3252 if (!dstkey) {
3253 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3254 di = dictGetIterator(dstset->ptr);
3255 if (!di) oom("dictGetIterator");
3256 while((de = dictNext(di)) != NULL) {
3257 robj *ele;
3258
3259 ele = dictGetEntryKey(de);
3260 addReplySds(c,sdscatprintf(sdsempty(),
3261 "$%d\r\n",sdslen(ele->ptr)));
3262 addReply(c,ele);
3263 addReply(c,shared.crlf);
3264 }
3265 dictReleaseIterator(di);
3266 } else {
3267 /* If we have a target key where to store the resulting set
3268 * create this key with the result set inside */
3269 deleteKey(c->db,dstkey);
3270 dictAdd(c->db->dict,dstkey,dstset);
3271 incrRefCount(dstkey);
3272 }
3273
3274 /* Cleanup */
3275 if (!dstkey) {
3276 decrRefCount(dstset);
3277 } else {
3278 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3279 dictSize((dict*)dstset->ptr)));
3280 server.dirty++;
3281 }
3282 zfree(dv);
3283 }
3284
3285 static void sunionCommand(redisClient *c) {
3286 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3287 }
3288
3289 static void sunionstoreCommand(redisClient *c) {
3290 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3291 }
3292
3293 static void sdiffCommand(redisClient *c) {
3294 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3295 }
3296
3297 static void sdiffstoreCommand(redisClient *c) {
3298 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3299 }
3300
3301 static void flushdbCommand(redisClient *c) {
3302 server.dirty += dictSize(c->db->dict);
3303 dictEmpty(c->db->dict);
3304 dictEmpty(c->db->expires);
3305 addReply(c,shared.ok);
3306 }
3307
3308 static void flushallCommand(redisClient *c) {
3309 server.dirty += emptyDb();
3310 addReply(c,shared.ok);
3311 rdbSave(server.dbfilename);
3312 server.dirty++;
3313 }
3314
3315 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3316 redisSortOperation *so = zmalloc(sizeof(*so));
3317 if (!so) oom("createSortOperation");
3318 so->type = type;
3319 so->pattern = pattern;
3320 return so;
3321 }
3322
3323 /* Return the value associated to the key with a name obtained
3324 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3325 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3326 char *p;
3327 sds spat, ssub;
3328 robj keyobj;
3329 int prefixlen, sublen, postfixlen;
3330 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3331 struct {
3332 long len;
3333 long free;
3334 char buf[REDIS_SORTKEY_MAX+1];
3335 } keyname;
3336
3337 spat = pattern->ptr;
3338 ssub = subst->ptr;
3339 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3340 p = strchr(spat,'*');
3341 if (!p) return NULL;
3342
3343 prefixlen = p-spat;
3344 sublen = sdslen(ssub);
3345 postfixlen = sdslen(spat)-(prefixlen+1);
3346 memcpy(keyname.buf,spat,prefixlen);
3347 memcpy(keyname.buf+prefixlen,ssub,sublen);
3348 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3349 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3350 keyname.len = prefixlen+sublen+postfixlen;
3351
3352 keyobj.refcount = 1;
3353 keyobj.type = REDIS_STRING;
3354 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3355
3356 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3357 return lookupKeyRead(db,&keyobj);
3358 }
3359
3360 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3361 * the additional parameter is not standard but a BSD-specific we have to
3362 * pass sorting parameters via the global 'server' structure */
3363 static int sortCompare(const void *s1, const void *s2) {
3364 const redisSortObject *so1 = s1, *so2 = s2;
3365 int cmp;
3366
3367 if (!server.sort_alpha) {
3368 /* Numeric sorting. Here it's trivial as we precomputed scores */
3369 if (so1->u.score > so2->u.score) {
3370 cmp = 1;
3371 } else if (so1->u.score < so2->u.score) {
3372 cmp = -1;
3373 } else {
3374 cmp = 0;
3375 }
3376 } else {
3377 /* Alphanumeric sorting */
3378 if (server.sort_bypattern) {
3379 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3380 /* At least one compare object is NULL */
3381 if (so1->u.cmpobj == so2->u.cmpobj)
3382 cmp = 0;
3383 else if (so1->u.cmpobj == NULL)
3384 cmp = -1;
3385 else
3386 cmp = 1;
3387 } else {
3388 /* We have both the objects, use strcoll */
3389 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3390 }
3391 } else {
3392 /* Compare elements directly */
3393 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3394 }
3395 }
3396 return server.sort_desc ? -cmp : cmp;
3397 }
3398
3399 /* The SORT command is the most complex command in Redis. Warning: this code
3400 * is optimized for speed and a bit less for readability */
3401 static void sortCommand(redisClient *c) {
3402 list *operations;
3403 int outputlen = 0;
3404 int desc = 0, alpha = 0;
3405 int limit_start = 0, limit_count = -1, start, end;
3406 int j, dontsort = 0, vectorlen;
3407 int getop = 0; /* GET operation counter */
3408 robj *sortval, *sortby = NULL;
3409 redisSortObject *vector; /* Resulting vector to sort */
3410
3411 /* Lookup the key to sort. It must be of the right types */
3412 sortval = lookupKeyRead(c->db,c->argv[1]);
3413 if (sortval == NULL) {
3414 addReply(c,shared.nokeyerr);
3415 return;
3416 }
3417 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3418 addReply(c,shared.wrongtypeerr);
3419 return;
3420 }
3421
3422 /* Create a list of operations to perform for every sorted element.
3423 * Operations can be GET/DEL/INCR/DECR */
3424 operations = listCreate();
3425 listSetFreeMethod(operations,zfree);
3426 j = 2;
3427
3428 /* Now we need to protect sortval incrementing its count, in the future
3429 * SORT may have options able to overwrite/delete keys during the sorting
3430 * and the sorted key itself may get destroied */
3431 incrRefCount(sortval);
3432
3433 /* The SORT command has an SQL-alike syntax, parse it */
3434 while(j < c->argc) {
3435 int leftargs = c->argc-j-1;
3436 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3437 desc = 0;
3438 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3439 desc = 1;
3440 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3441 alpha = 1;
3442 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3443 limit_start = atoi(c->argv[j+1]->ptr);
3444 limit_count = atoi(c->argv[j+2]->ptr);
3445 j+=2;
3446 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3447 sortby = c->argv[j+1];
3448 /* If the BY pattern does not contain '*', i.e. it is constant,
3449 * we don't need to sort nor to lookup the weight keys. */
3450 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3451 j++;
3452 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3453 listAddNodeTail(operations,createSortOperation(
3454 REDIS_SORT_GET,c->argv[j+1]));
3455 getop++;
3456 j++;
3457 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3458 listAddNodeTail(operations,createSortOperation(
3459 REDIS_SORT_DEL,c->argv[j+1]));
3460 j++;
3461 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3462 listAddNodeTail(operations,createSortOperation(
3463 REDIS_SORT_INCR,c->argv[j+1]));
3464 j++;
3465 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3466 listAddNodeTail(operations,createSortOperation(
3467 REDIS_SORT_DECR,c->argv[j+1]));
3468 j++;
3469 } else {
3470 decrRefCount(sortval);
3471 listRelease(operations);
3472 addReply(c,shared.syntaxerr);
3473 return;
3474 }
3475 j++;
3476 }
3477
3478 /* Load the sorting vector with all the objects to sort */
3479 vectorlen = (sortval->type == REDIS_LIST) ?
3480 listLength((list*)sortval->ptr) :
3481 dictSize((dict*)sortval->ptr);
3482 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3483 if (!vector) oom("allocating objects vector for SORT");
3484 j = 0;
3485 if (sortval->type == REDIS_LIST) {
3486 list *list = sortval->ptr;
3487 listNode *ln;
3488
3489 listRewind(list);
3490 while((ln = listYield(list))) {
3491 robj *ele = ln->value;
3492 vector[j].obj = ele;
3493 vector[j].u.score = 0;
3494 vector[j].u.cmpobj = NULL;
3495 j++;
3496 }
3497 } else {
3498 dict *set = sortval->ptr;
3499 dictIterator *di;
3500 dictEntry *setele;
3501
3502 di = dictGetIterator(set);
3503 if (!di) oom("dictGetIterator");
3504 while((setele = dictNext(di)) != NULL) {
3505 vector[j].obj = dictGetEntryKey(setele);
3506 vector[j].u.score = 0;
3507 vector[j].u.cmpobj = NULL;
3508 j++;
3509 }
3510 dictReleaseIterator(di);
3511 }
3512 assert(j == vectorlen);
3513
3514 /* Now it's time to load the right scores in the sorting vector */
3515 if (dontsort == 0) {
3516 for (j = 0; j < vectorlen; j++) {
3517 if (sortby) {
3518 robj *byval;
3519
3520 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3521 if (!byval || byval->type != REDIS_STRING) continue;
3522 if (alpha) {
3523 vector[j].u.cmpobj = byval;
3524 incrRefCount(byval);
3525 } else {
3526 vector[j].u.score = strtod(byval->ptr,NULL);
3527 }
3528 } else {
3529 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3530 }
3531 }
3532 }
3533
3534 /* We are ready to sort the vector... perform a bit of sanity check
3535 * on the LIMIT option too. We'll use a partial version of quicksort. */
3536 start = (limit_start < 0) ? 0 : limit_start;
3537 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3538 if (start >= vectorlen) {
3539 start = vectorlen-1;
3540 end = vectorlen-2;
3541 }
3542 if (end >= vectorlen) end = vectorlen-1;
3543
3544 if (dontsort == 0) {
3545 server.sort_desc = desc;
3546 server.sort_alpha = alpha;
3547 server.sort_bypattern = sortby ? 1 : 0;
3548 if (sortby && (start != 0 || end != vectorlen-1))
3549 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3550 else
3551 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3552 }
3553
3554 /* Send command output to the output buffer, performing the specified
3555 * GET/DEL/INCR/DECR operations if any. */
3556 outputlen = getop ? getop*(end-start+1) : end-start+1;
3557 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3558 for (j = start; j <= end; j++) {
3559 listNode *ln;
3560 if (!getop) {
3561 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3562 sdslen(vector[j].obj->ptr)));
3563 addReply(c,vector[j].obj);
3564 addReply(c,shared.crlf);
3565 }
3566 listRewind(operations);
3567 while((ln = listYield(operations))) {
3568 redisSortOperation *sop = ln->value;
3569 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3570 vector[j].obj);
3571
3572 if (sop->type == REDIS_SORT_GET) {
3573 if (!val || val->type != REDIS_STRING) {
3574 addReply(c,shared.nullbulk);
3575 } else {
3576 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3577 sdslen(val->ptr)));
3578 addReply(c,val);
3579 addReply(c,shared.crlf);
3580 }
3581 } else if (sop->type == REDIS_SORT_DEL) {
3582 /* TODO */
3583 }
3584 }
3585 }
3586
3587 /* Cleanup */
3588 decrRefCount(sortval);
3589 listRelease(operations);
3590 for (j = 0; j < vectorlen; j++) {
3591 if (sortby && alpha && vector[j].u.cmpobj)
3592 decrRefCount(vector[j].u.cmpobj);
3593 }
3594 zfree(vector);
3595 }
3596
3597 static void infoCommand(redisClient *c) {
3598 sds info;
3599 time_t uptime = time(NULL)-server.stat_starttime;
3600
3601 info = sdscatprintf(sdsempty(),
3602 "redis_version:%s\r\n"
3603 "uptime_in_seconds:%d\r\n"
3604 "uptime_in_days:%d\r\n"
3605 "connected_clients:%d\r\n"
3606 "connected_slaves:%d\r\n"
3607 "used_memory:%zu\r\n"
3608 "changes_since_last_save:%lld\r\n"
3609 "bgsave_in_progress:%d\r\n"
3610 "last_save_time:%d\r\n"
3611 "total_connections_received:%lld\r\n"
3612 "total_commands_processed:%lld\r\n"
3613 "role:%s\r\n"
3614 ,REDIS_VERSION,
3615 uptime,
3616 uptime/(3600*24),
3617 listLength(server.clients)-listLength(server.slaves),
3618 listLength(server.slaves),
3619 server.usedmemory,
3620 server.dirty,
3621 server.bgsaveinprogress,
3622 server.lastsave,
3623 server.stat_numconnections,
3624 server.stat_numcommands,
3625 server.masterhost == NULL ? "master" : "slave"
3626 );
3627 if (server.masterhost) {
3628 info = sdscatprintf(info,
3629 "master_host:%s\r\n"
3630 "master_port:%d\r\n"
3631 "master_link_status:%s\r\n"
3632 "master_last_io_seconds_ago:%d\r\n"
3633 ,server.masterhost,
3634 server.masterport,
3635 (server.replstate == REDIS_REPL_CONNECTED) ?
3636 "up" : "down",
3637 (int)(time(NULL)-server.master->lastinteraction)
3638 );
3639 }
3640 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3641 addReplySds(c,info);
3642 addReply(c,shared.crlf);
3643 }
3644
3645 static void monitorCommand(redisClient *c) {
3646 /* ignore MONITOR if aleady slave or in monitor mode */
3647 if (c->flags & REDIS_SLAVE) return;
3648
3649 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3650 c->slaveseldb = 0;
3651 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3652 addReply(c,shared.ok);
3653 }
3654
3655 /* ================================= Expire ================================= */
3656 static int removeExpire(redisDb *db, robj *key) {
3657 if (dictDelete(db->expires,key) == DICT_OK) {
3658 return 1;
3659 } else {
3660 return 0;
3661 }
3662 }
3663
3664 static int setExpire(redisDb *db, robj *key, time_t when) {
3665 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3666 return 0;
3667 } else {
3668 incrRefCount(key);
3669 return 1;
3670 }
3671 }
3672
3673 /* Return the expire time of the specified key, or -1 if no expire
3674 * is associated with this key (i.e. the key is non volatile) */
3675 static time_t getExpire(redisDb *db, robj *key) {
3676 dictEntry *de;
3677
3678 /* No expire? return ASAP */
3679 if (dictSize(db->expires) == 0 ||
3680 (de = dictFind(db->expires,key)) == NULL) return -1;
3681
3682 return (time_t) dictGetEntryVal(de);
3683 }
3684
3685 static int expireIfNeeded(redisDb *db, robj *key) {
3686 time_t when;
3687 dictEntry *de;
3688
3689 /* No expire? return ASAP */
3690 if (dictSize(db->expires) == 0 ||
3691 (de = dictFind(db->expires,key)) == NULL) return 0;
3692
3693 /* Lookup the expire */
3694 when = (time_t) dictGetEntryVal(de);
3695 if (time(NULL) <= when) return 0;
3696
3697 /* Delete the key */
3698 dictDelete(db->expires,key);
3699 return dictDelete(db->dict,key) == DICT_OK;
3700 }
3701
3702 static int deleteIfVolatile(redisDb *db, robj *key) {
3703 dictEntry *de;
3704
3705 /* No expire? return ASAP */
3706 if (dictSize(db->expires) == 0 ||
3707 (de = dictFind(db->expires,key)) == NULL) return 0;
3708
3709 /* Delete the key */
3710 server.dirty++;
3711 dictDelete(db->expires,key);
3712 return dictDelete(db->dict,key) == DICT_OK;
3713 }
3714
3715 static void expireCommand(redisClient *c) {
3716 dictEntry *de;
3717 int seconds = atoi(c->argv[2]->ptr);
3718
3719 de = dictFind(c->db->dict,c->argv[1]);
3720 if (de == NULL) {
3721 addReply(c,shared.czero);
3722 return;
3723 }
3724 if (seconds <= 0) {
3725 addReply(c, shared.czero);
3726 return;
3727 } else {
3728 time_t when = time(NULL)+seconds;
3729 if (setExpire(c->db,c->argv[1],when))
3730 addReply(c,shared.cone);
3731 else
3732 addReply(c,shared.czero);
3733 return;
3734 }
3735 }
3736
3737 static void ttlCommand(redisClient *c) {
3738 time_t expire;
3739 int ttl = -1;
3740
3741 expire = getExpire(c->db,c->argv[1]);
3742 if (expire != -1) {
3743 ttl = (int) (expire-time(NULL));
3744 if (ttl < 0) ttl = -1;
3745 }
3746 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3747 }
3748
3749 /* =============================== Replication ============================= */
3750
3751 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3752 ssize_t nwritten, ret = size;
3753 time_t start = time(NULL);
3754
3755 timeout++;
3756 while(size) {
3757 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3758 nwritten = write(fd,ptr,size);
3759 if (nwritten == -1) return -1;
3760 ptr += nwritten;
3761 size -= nwritten;
3762 }
3763 if ((time(NULL)-start) > timeout) {
3764 errno = ETIMEDOUT;
3765 return -1;
3766 }
3767 }
3768 return ret;
3769 }
3770
3771 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3772 ssize_t nread, totread = 0;
3773 time_t start = time(NULL);
3774
3775 timeout++;
3776 while(size) {
3777 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3778 nread = read(fd,ptr,size);
3779 if (nread == -1) return -1;
3780 ptr += nread;
3781 size -= nread;
3782 totread += nread;
3783 }
3784 if ((time(NULL)-start) > timeout) {
3785 errno = ETIMEDOUT;
3786 return -1;
3787 }
3788 }
3789 return totread;
3790 }
3791
/* Read a line from 'fd' one byte at a time into the buffer 'ptr' of 'size'
 * bytes, stopping at the first '\n'. The newline, and a '\r' immediately
 * preceding it, are not stored; the buffer is null terminated after every
 * stored byte.
 *
 * At most 'size'-1 characters are stored; if no newline shows up within
 * that many bytes the function returns what was read so far. 'timeout' is
 * passed to syncRead() for every single byte.
 *
 * Returns the number of characters stored, or -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* reserve room for the null terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the stored byte: without this the loop never
         * terminates on a long line and writes past the buffer */
        size--;
    }
    return nread;
}
3812
3813 static void syncCommand(redisClient *c) {
3814 /* ignore SYNC if aleady slave or in monitor mode */
3815 if (c->flags & REDIS_SLAVE) return;
3816
3817 /* SYNC can't be issued when the server has pending data to send to
3818 * the client about already issued commands. We need a fresh reply
3819 * buffer registering the differences between the BGSAVE and the current
3820 * dataset, so that we can copy to other slaves if needed. */
3821 if (listLength(c->reply) != 0) {
3822 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3823 return;
3824 }
3825
3826 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3827 /* Here we need to check if there is a background saving operation
3828 * in progress, or if it is required to start one */
3829 if (server.bgsaveinprogress) {
3830 /* Ok a background save is in progress. Let's check if it is a good
3831 * one for replication, i.e. if there is another slave that is
3832 * registering differences since the server forked to save */
3833 redisClient *slave;
3834 listNode *ln;
3835
3836 listRewind(server.slaves);
3837 while((ln = listYield(server.slaves))) {
3838 slave = ln->value;
3839 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3840 }
3841 if (ln) {
3842 /* Perfect, the server is already registering differences for
3843 * another slave. Set the right state, and copy the buffer. */
3844 listRelease(c->reply);
3845 c->reply = listDup(slave->reply);
3846 if (!c->reply) oom("listDup copying slave reply list");
3847 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3848 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3849 } else {
3850 /* No way, we need to wait for the next BGSAVE in order to
3851 * register differences */
3852 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3853 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3854 }
3855 } else {
3856 /* Ok we don't have a BGSAVE in progress, let's start one */
3857 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3858 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3859 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3860 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3861 return;
3862 }
3863 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3864 }
3865 c->repldbfd = -1;
3866 c->flags |= REDIS_SLAVE;
3867 c->slaveseldb = 0;
3868 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3869 return;
3870 }
3871
3872 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3873 redisClient *slave = privdata;
3874 REDIS_NOTUSED(el);
3875 REDIS_NOTUSED(mask);
3876 char buf[REDIS_IOBUF_LEN];
3877 ssize_t nwritten, buflen;
3878
3879 if (slave->repldboff == 0) {
3880 /* Write the bulk write count before to transfer the DB. In theory here
3881 * we don't know how much room there is in the output buffer of the
3882 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3883 * operations) will never be smaller than the few bytes we need. */
3884 sds bulkcount;
3885
3886 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3887 slave->repldbsize);
3888 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3889 {
3890 sdsfree(bulkcount);
3891 freeClient(slave);
3892 return;
3893 }
3894 sdsfree(bulkcount);
3895 }
3896 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3897 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3898 if (buflen <= 0) {
3899 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3900 (buflen == 0) ? "premature EOF" : strerror(errno));
3901 freeClient(slave);
3902 return;
3903 }
3904 if ((nwritten = write(fd,buf,buflen)) == -1) {
3905 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3906 strerror(errno));
3907 freeClient(slave);
3908 return;
3909 }
3910 slave->repldboff += nwritten;
3911 if (slave->repldboff == slave->repldbsize) {
3912 close(slave->repldbfd);
3913 slave->repldbfd = -1;
3914 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3915 slave->replstate = REDIS_REPL_ONLINE;
3916 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3917 sendReplyToClient, slave, NULL) == AE_ERR) {
3918 freeClient(slave);
3919 return;
3920 }
3921 addReplySds(slave,sdsempty());
3922 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3923 }
3924 }
3925
3926 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3927 listNode *ln;
3928 int startbgsave = 0;
3929
3930 listRewind(server.slaves);
3931 while((ln = listYield(server.slaves))) {
3932 redisClient *slave = ln->value;
3933
3934 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3935 startbgsave = 1;
3936 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3937 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3938 struct redis_stat buf;
3939
3940 if (bgsaveerr != REDIS_OK) {
3941 freeClient(slave);
3942 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3943 continue;
3944 }
3945 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3946 redis_fstat(slave->repldbfd,&buf) == -1) {
3947 freeClient(slave);
3948 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3949 continue;
3950 }
3951 slave->repldboff = 0;
3952 slave->repldbsize = buf.st_size;
3953 slave->replstate = REDIS_REPL_SEND_BULK;
3954 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3955 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3956 freeClient(slave);
3957 continue;
3958 }
3959 }
3960 }
3961 if (startbgsave) {
3962 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3963 listRewind(server.slaves);
3964 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3965 while((ln = listYield(server.slaves))) {
3966 redisClient *slave = ln->value;
3967
3968 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3969 freeClient(slave);
3970 }
3971 }
3972 }
3973 }
3974
3975 static int syncWithMaster(void) {
3976 char buf[1024], tmpfile[256];
3977 int dumpsize;
3978 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3979 int dfd;
3980
3981 if (fd == -1) {
3982 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3983 strerror(errno));
3984 return REDIS_ERR;
3985 }
3986 /* Issue the SYNC command */
3987 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3988 close(fd);
3989 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3990 strerror(errno));
3991 return REDIS_ERR;
3992 }
3993 /* Read the bulk write count */
3994 if (syncReadLine(fd,buf,1024,3600) == -1) {
3995 close(fd);
3996 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3997 strerror(errno));
3998 return REDIS_ERR;
3999 }
4000 dumpsize = atoi(buf+1);
4001 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4002 /* Read the bulk write data on a temp file */
4003 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4004 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4005 if (dfd == -1) {
4006 close(fd);
4007 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4008 return REDIS_ERR;
4009 }
4010 while(dumpsize) {
4011 int nread, nwritten;
4012
4013 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4014 if (nread == -1) {
4015 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4016 strerror(errno));
4017 close(fd);
4018 close(dfd);
4019 return REDIS_ERR;
4020 }
4021 nwritten = write(dfd,buf,nread);
4022 if (nwritten == -1) {
4023 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4024 close(fd);
4025 close(dfd);
4026 return REDIS_ERR;
4027 }
4028 dumpsize -= nread;
4029 }
4030 close(dfd);
4031 if (rename(tmpfile,server.dbfilename) == -1) {
4032 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4033 unlink(tmpfile);
4034 close(fd);
4035 return REDIS_ERR;
4036 }
4037 emptyDb();
4038 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4039 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4040 close(fd);
4041 return REDIS_ERR;
4042 }
4043 server.master = createClient(fd);
4044 server.master->flags |= REDIS_MASTER;
4045 server.replstate = REDIS_REPL_CONNECTED;
4046 return REDIS_OK;
4047 }
4048
4049 static void slaveofCommand(redisClient *c) {
4050 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4051 !strcasecmp(c->argv[2]->ptr,"one")) {
4052 if (server.masterhost) {
4053 sdsfree(server.masterhost);
4054 server.masterhost = NULL;
4055 if (server.master) freeClient(server.master);
4056 server.replstate = REDIS_REPL_NONE;
4057 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4058 }
4059 } else {
4060 sdsfree(server.masterhost);
4061 server.masterhost = sdsdup(c->argv[1]->ptr);
4062 server.masterport = atoi(c->argv[2]->ptr);
4063 if (server.master) freeClient(server.master);
4064 server.replstate = REDIS_REPL_CONNECT;
4065 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4066 server.masterhost, server.masterport);
4067 }
4068 addReply(c,shared.ok);
4069 }
4070
4071 /* ============================ Maxmemory directive ======================== */
4072
4073 /* This function gets called when 'maxmemory' is set on the config file to limit
4074 * the max memory used by the server, and we are out of memory.
4075 * This function will try to, in order:
4076 *
4077 * - Free objects from the free list
4078 * - Try to remove keys with an EXPIRE set
4079 *
4080 * It is not possible to free enough memory to reach used-memory < maxmemory
4081 * the server will start refusing commands that will enlarge even more the
4082 * memory usage.
4083 */
4084 static void freeMemoryIfNeeded(void) {
4085 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4086 if (listLength(server.objfreelist)) {
4087 robj *o;
4088
4089 listNode *head = listFirst(server.objfreelist);
4090 o = listNodeValue(head);
4091 listDelNode(server.objfreelist,head);
4092 zfree(o);
4093 } else {
4094 int j, k, freed = 0;
4095
4096 for (j = 0; j < server.dbnum; j++) {
4097 int minttl = -1;
4098 robj *minkey = NULL;
4099 struct dictEntry *de;
4100
4101 if (dictSize(server.db[j].expires)) {
4102 freed = 1;
4103 /* From a sample of three keys drop the one nearest to
4104 * the natural expire */
4105 for (k = 0; k < 3; k++) {
4106 time_t t;
4107
4108 de = dictGetRandomKey(server.db[j].expires);
4109 t = (time_t) dictGetEntryVal(de);
4110 if (minttl == -1 || t < minttl) {
4111 minkey = dictGetEntryKey(de);
4112 minttl = t;
4113 }
4114 }
4115 deleteKey(server.db+j,minkey);
4116 }
4117 }
4118 if (!freed) return; /* nothing to free... */
4119 }
4120 }
4121 }
4122
4123 /* ================================= Debugging ============================== */
4124
4125 static void debugCommand(redisClient *c) {
4126 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4127 *((char*)-1) = 'x';
4128 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4129 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4130 robj *key, *val;
4131
4132 if (!de) {
4133 addReply(c,shared.nokeyerr);
4134 return;
4135 }
4136 key = dictGetEntryKey(de);
4137 val = dictGetEntryVal(de);
4138 addReplySds(c,sdscatprintf(sdsempty(),
4139 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4140 key, key->refcount, val, val->refcount));
4141 } else {
4142 addReplySds(c,sdsnew(
4143 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4144 }
4145 }
4146
4147 #ifdef HAVE_BACKTRACE
4148 static struct redisFunctionSym symsTable[] = {
4149 {"freeStringObject", (unsigned long)freeStringObject},
4150 {"freeListObject", (unsigned long)freeListObject},
4151 {"freeSetObject", (unsigned long)freeSetObject},
4152 {"decrRefCount", (unsigned long)decrRefCount},
4153 {"createObject", (unsigned long)createObject},
4154 {"freeClient", (unsigned long)freeClient},
4155 {"rdbLoad", (unsigned long)rdbLoad},
4156 {"addReply", (unsigned long)addReply},
4157 {"addReplySds", (unsigned long)addReplySds},
4158 {"incrRefCount", (unsigned long)incrRefCount},
4159 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4160 {"createStringObject", (unsigned long)createStringObject},
4161 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4162 {"syncWithMaster", (unsigned long)syncWithMaster},
4163 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4164 {"removeExpire", (unsigned long)removeExpire},
4165 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4166 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4167 {"deleteKey", (unsigned long)deleteKey},
4168 {"getExpire", (unsigned long)getExpire},
4169 {"setExpire", (unsigned long)setExpire},
4170 {"updateSalvesWaitingBgsave", (unsigned long)updateSalvesWaitingBgsave},
4171 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4172 {"authCommand", (unsigned long)authCommand},
4173 {"pingCommand", (unsigned long)pingCommand},
4174 {"echoCommand", (unsigned long)echoCommand},
4175 {"setCommand", (unsigned long)setCommand},
4176 {"setnxCommand", (unsigned long)setnxCommand},
4177 {"getCommand", (unsigned long)getCommand},
4178 {"delCommand", (unsigned long)delCommand},
4179 {"existsCommand", (unsigned long)existsCommand},
4180 {"incrCommand", (unsigned long)incrCommand},
4181 {"decrCommand", (unsigned long)decrCommand},
4182 {"incrbyCommand", (unsigned long)incrbyCommand},
4183 {"decrbyCommand", (unsigned long)decrbyCommand},
4184 {"selectCommand", (unsigned long)selectCommand},
4185 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4186 {"keysCommand", (unsigned long)keysCommand},
4187 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4188 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4189 {"saveCommand", (unsigned long)saveCommand},
4190 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4191 {"shutdownCommand", (unsigned long)shutdownCommand},
4192 {"moveCommand", (unsigned long)moveCommand},
4193 {"renameCommand", (unsigned long)renameCommand},
4194 {"renamenxCommand", (unsigned long)renamenxCommand},
4195 {"lpushCommand", (unsigned long)lpushCommand},
4196 {"rpushCommand", (unsigned long)rpushCommand},
4197 {"lpopCommand", (unsigned long)lpopCommand},
4198 {"rpopCommand", (unsigned long)rpopCommand},
4199 {"llenCommand", (unsigned long)llenCommand},
4200 {"lindexCommand", (unsigned long)lindexCommand},
4201 {"lrangeCommand", (unsigned long)lrangeCommand},
4202 {"ltrimCommand", (unsigned long)ltrimCommand},
4203 {"typeCommand", (unsigned long)typeCommand},
4204 {"lsetCommand", (unsigned long)lsetCommand},
4205 {"saddCommand", (unsigned long)saddCommand},
4206 {"sremCommand", (unsigned long)sremCommand},
4207 {"smoveCommand", (unsigned long)smoveCommand},
4208 {"sismemberCommand", (unsigned long)sismemberCommand},
4209 {"scardCommand", (unsigned long)scardCommand},
4210 {"spopCommand", (unsigned long)spopCommand},
4211 {"sinterCommand", (unsigned long)sinterCommand},
4212 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4213 {"sunionCommand", (unsigned long)sunionCommand},
4214 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4215 {"sdiffCommand", (unsigned long)sdiffCommand},
4216 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4217 {"syncCommand", (unsigned long)syncCommand},
4218 {"flushdbCommand", (unsigned long)flushdbCommand},
4219 {"flushallCommand", (unsigned long)flushallCommand},
4220 {"sortCommand", (unsigned long)sortCommand},
4221 {"lremCommand", (unsigned long)lremCommand},
4222 {"infoCommand", (unsigned long)infoCommand},
4223 {"mgetCommand", (unsigned long)mgetCommand},
4224 {"monitorCommand", (unsigned long)monitorCommand},
4225 {"expireCommand", (unsigned long)expireCommand},
4226 {"getSetCommand", (unsigned long)getSetCommand},
4227 {"ttlCommand", (unsigned long)ttlCommand},
4228 {"slaveofCommand", (unsigned long)slaveofCommand},
4229 {"debugCommand", (unsigned long)debugCommand},
4230 {"processCommand", (unsigned long)processCommand},
4231 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4232 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4233 {NULL,0}
4234 };
4235
4236 /* This function try to convert a pointer into a function name. It's used in
4237 * oreder to provide a backtrace under segmentation fault that's able to
4238 * display functions declared as static (otherwise the backtrace is useless). */
4239 static char *findFuncName(void *pointer, unsigned long *offset){
4240 int i, ret = -1;
4241 unsigned long off, minoff = 0;
4242
4243 /* Try to match against the Symbol with the smallest offset */
4244 for (i=0; symsTable[i].pointer; i++) {
4245 unsigned long lp = (unsigned long) pointer;
4246
4247 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4248 off=lp-symsTable[i].pointer;
4249 if (ret < 0 || off < minoff) {
4250 minoff=off;
4251 ret=i;
4252 }
4253 }
4254 }
4255 if (ret == -1) return NULL;
4256 *offset = minoff;
4257 return symsTable[ret].name;
4258 }
4259
4260 static void *getMcontextEip(ucontext_t *uc) {
4261 #if defined(__FreeBSD__)
4262 return (void*) uc->uc_mcontext.mc_eip;
4263 #elif defined(__dietlibc__)
4264 return (void*) uc->uc_mcontext.eip;
4265 #elif defined(__APPLE__)
4266 return (void*) uc->uc_mcontext->__ss.__eip;
4267 #else /* Linux */
4268 return (void*) uc->uc_mcontext.gregs[REG_EIP];
4269 #endif
4270 }
4271
4272 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4273 void *trace[100];
4274 char **messages = NULL;
4275 int i, trace_size = 0;
4276 unsigned long offset=0;
4277 time_t uptime = time(NULL)-server.stat_starttime;
4278 ucontext_t *uc = (ucontext_t*) secret;
4279 REDIS_NOTUSED(info);
4280
4281 redisLog(REDIS_WARNING,
4282 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4283 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4284 "redis_version:%s; "
4285 "uptime_in_seconds:%d; "
4286 "connected_clients:%d; "
4287 "connected_slaves:%d; "
4288 "used_memory:%zu; "
4289 "changes_since_last_save:%lld; "
4290 "bgsave_in_progress:%d; "
4291 "last_save_time:%d; "
4292 "total_connections_received:%lld; "
4293 "total_commands_processed:%lld; "
4294 "role:%s;"
4295 ,REDIS_VERSION,
4296 uptime,
4297 listLength(server.clients)-listLength(server.slaves),
4298 listLength(server.slaves),
4299 server.usedmemory,
4300 server.dirty,
4301 server.bgsaveinprogress,
4302 server.lastsave,
4303 server.stat_numconnections,
4304 server.stat_numcommands,
4305 server.masterhost == NULL ? "master" : "slave"
4306 ));
4307
4308 trace_size = backtrace(trace, 100);
4309 /* overwrite sigaction with caller's address */
4310 trace[1] = getMcontextEip(uc);
4311 messages = backtrace_symbols(trace, trace_size);
4312
4313 for (i=1; i<trace_size; ++i) {
4314 char *fn = findFuncName(trace[i], &offset), *p;
4315
4316 p = strchr(messages[i],'+');
4317 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4318 redisLog(REDIS_WARNING,"%s", messages[i]);
4319 } else {
4320 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4321 }
4322 }
4323 free(messages);
4324 exit(0);
4325 }
4326
4327 static void setupSigSegvAction(void) {
4328 struct sigaction act;
4329
4330 sigemptyset (&act.sa_mask);
4331 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4332 * is used. Otherwise, sa_handler is used */
4333 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4334 act.sa_sigaction = segvHandler;
4335 sigaction (SIGSEGV, &act, NULL);
4336 sigaction (SIGBUS, &act, NULL);
4337 sigaction (SIGFPE, &act, NULL);
4338 sigaction (SIGILL, &act, NULL);
4339 sigaction (SIGBUS, &act, NULL);
4340 return;
4341 }
4342 #else /* HAVE_BACKTRACE */
/* Variant compiled when HAVE_BACKTRACE is not defined: it does nothing. */
static void setupSigSegvAction(void) {
    return;
}
4345 #endif /* HAVE_BACKTRACE */
4346
4347 /* =================================== Main! ================================ */
4348
4349 #ifdef __linux__
/* Read the first line of /proc/sys/vm/overcommit_memory and return it
 * converted with atoi(). Returns -1 when the file can't be opened or no
 * line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return value;
    if (fgets(line,64,procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
4363
4364 void linuxOvercommitMemoryWarning(void) {
4365 if (linuxOvercommitMemoryValue() == 0) {
4366 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4367 }
4368 }
4369 #endif /* __linux__ */
4370
4371 static void daemonize(void) {
4372 int fd;
4373 FILE *fp;
4374
4375 if (fork() != 0) exit(0); /* parent exits */
4376 setsid(); /* create a new session */
4377
4378 /* Every output goes to /dev/null. If Redis is daemonized but
4379 * the 'logfile' is set to 'stdout' in the configuration file
4380 * it will not log at all. */
4381 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4382 dup2(fd, STDIN_FILENO);
4383 dup2(fd, STDOUT_FILENO);
4384 dup2(fd, STDERR_FILENO);
4385 if (fd > STDERR_FILENO) close(fd);
4386 }
4387 /* Try to write the pid file */
4388 fp = fopen(server.pidfile,"w");
4389 if (fp) {
4390 fprintf(fp,"%d\n",getpid());
4391 fclose(fp);
4392 }
4393 }
4394
4395 int main(int argc, char **argv) {
4396 #ifdef __linux__
4397 linuxOvercommitMemoryWarning();
4398 #endif
4399
4400 initServerConfig();
4401 if (argc == 2) {
4402 ResetServerSaveParams();
4403 loadServerConfig(argv[1]);
4404 } else if (argc > 2) {
4405 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4406 exit(1);
4407 } else {
4408 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4409 }
4410 initServer();
4411 if (server.daemonize) daemonize();
4412 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4413 if (rdbLoad(server.dbfilename) == REDIS_OK)
4414 redisLog(REDIS_NOTICE,"DB loaded from disk");
4415 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4416 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4417 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4418 aeMain(server.el);
4419 aeDeleteEventLoop(server.el);
4420 return 0;
4421 }