]> git.saurik.com Git - redis.git/blob - redis.c
Kill the background saving process before performing SHUTDOWN to avoid races
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #include <signal.h>
40 #include <sys/wait.h>
41 #include <errno.h>
42 #include <assert.h>
43 #include <ctype.h>
44 #include <stdarg.h>
45 #include <inttypes.h>
46 #include <arpa/inet.h>
47 #include <sys/stat.h>
48 #include <fcntl.h>
49 #include <sys/time.h>
50 #include <sys/resource.h>
51 #include <limits.h>
52 #include <execinfo.h>
53
54 #include "ae.h" /* Event driven programming library */
55 #include "sds.h" /* Dynamic safe strings */
56 #include "anet.h" /* Networking the easy way */
57 #include "dict.h" /* Hash tables */
58 #include "adlist.h" /* Linked lists */
59 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
60 #include "lzf.h" /* LZF compression library */
61 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
62
63 #include "config.h"
64
65 /* Error codes */
66 #define REDIS_OK 0
67 #define REDIS_ERR -1
68
69 /* Static server configuration */
70 #define REDIS_SERVERPORT 6379 /* TCP port */
71 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
72 #define REDIS_IOBUF_LEN 1024
73 #define REDIS_LOADBUF_LEN 1024
74 #define REDIS_STATIC_ARGS 4
75 #define REDIS_DEFAULT_DBNUM 16
76 #define REDIS_CONFIGLINE_MAX 1024
77 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
78 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
79 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
80
81 /* Hash table parameters */
82 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
83 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
84
85 /* Command flags */
86 #define REDIS_CMD_BULK 1 /* Bulk write command */
87 #define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
92 #define REDIS_CMD_DENYOOM 4
93
94 /* Object types */
95 #define REDIS_STRING 0
96 #define REDIS_LIST 1
97 #define REDIS_SET 2
98 #define REDIS_HASH 3
99
100 /* Object types only used for dumping to disk */
101 #define REDIS_EXPIRETIME 253
102 #define REDIS_SELECTDB 254
103 #define REDIS_EOF 255
104
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: a specially encoded object will follow. The six bits
 *           number specifies the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
118 #define REDIS_RDB_6BITLEN 0
119 #define REDIS_RDB_14BITLEN 1
120 #define REDIS_RDB_32BITLEN 2
121 #define REDIS_RDB_ENCVAL 3
122 #define REDIS_RDB_LENERR UINT_MAX
123
/* When the length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
127 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
128 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
129 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
130 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
131
132 /* Client flags */
133 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
134 #define REDIS_SLAVE 2 /* This client is a slave server */
135 #define REDIS_MASTER 4 /* This client is a master server */
136 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
137
138 /* Slave replication state - slave side */
139 #define REDIS_REPL_NONE 0 /* No active replication */
140 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
141 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
142
143 /* Slave replication state - from the point of view of master
144 * Note that in SEND_BULK and ONLINE state the slave receives new updates
145 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
146 * to start the next background saving in order to send updates to it. */
147 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
148 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
149 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
150 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
151
152 /* List related stuff */
153 #define REDIS_HEAD 0
154 #define REDIS_TAIL 1
155
156 /* Sort operations */
157 #define REDIS_SORT_GET 0
158 #define REDIS_SORT_DEL 1
159 #define REDIS_SORT_INCR 2
160 #define REDIS_SORT_DECR 3
161 #define REDIS_SORT_ASC 4
162 #define REDIS_SORT_DESC 5
163 #define REDIS_SORTKEY_MAX 1024
164
165 /* Log levels */
166 #define REDIS_DEBUG 0
167 #define REDIS_NOTICE 1
168 #define REDIS_WARNING 2
169
170 /* Anti-warning macro... */
171 #define REDIS_NOTUSED(V) ((void) V)
172
173 /*================================= Data types ============================== */
174
/* Generic Redis value. 'type' holds one of the REDIS_STRING / REDIS_LIST /
 * REDIS_SET / REDIS_HASH object types and tells how 'ptr' has to be
 * interpreted. */
typedef struct redisObject {
    void *ptr;      /* type-dependent payload (e.g. an sds for strings) */
    int type;       /* object type, see the "Object types" defines */
    int refcount;   /* reference counter; presumably maintained by
                     * incrRefCount()/decrRefCount() - confirm there */
} robj;
181
182 typedef struct redisDb {
183 dict *dict;
184 dict *expires;
185 int id;
186 } redisDb;
187
188 /* With multiplexing we need to take per-clinet state.
189 * Clients are taken in a liked list. */
190 typedef struct redisClient {
191 int fd;
192 redisDb *db;
193 int dictid;
194 sds querybuf;
195 robj **argv;
196 int argc;
197 int bulklen; /* bulk read len. -1 if not in bulk read mode */
198 list *reply;
199 int sentlen;
200 time_t lastinteraction; /* time of the last interaction, used for timeout */
201 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
202 int slaveseldb; /* slave selected db, if this client is a slave */
203 int authenticated; /* when requirepass is non-NULL */
204 int replstate; /* replication state if this is a slave */
205 int repldbfd; /* replication DB file descriptor */
206 long repldboff; /* replication DB file offset */
207 off_t repldbsize; /* replication DB file size */
208 } redisClient;
209
/* A save point: serverCron() starts a background save once at least
 * 'changes' modifications have accumulated and more than 'seconds' seconds
 * have passed since the last successful save. */
struct saveparam {
    time_t seconds;     /* minimum age of the last save */
    int changes;        /* minimum number of changes (server.dirty) */
};
214
215 /* Global server state structure */
216 struct redisServer {
217 int port;
218 int fd;
219 redisDb *db;
220 dict *sharingpool;
221 unsigned int sharingpoolsize;
222 long long dirty; /* changes to DB from the last save */
223 list *clients;
224 list *slaves, *monitors;
225 char neterr[ANET_ERR_LEN];
226 aeEventLoop *el;
227 int cronloops; /* number of times the cron function run */
228 list *objfreelist; /* A list of freed objects to avoid malloc() */
229 time_t lastsave; /* Unix time of last save succeeede */
230 size_t usedmemory; /* Used memory in megabytes */
231 /* Fields used only for stats */
232 time_t stat_starttime; /* server start time */
233 long long stat_numcommands; /* number of processed commands */
234 long long stat_numconnections; /* number of connections received */
235 /* Configuration */
236 int verbosity;
237 int glueoutputbuf;
238 int maxidletime;
239 int dbnum;
240 int daemonize;
241 char *pidfile;
242 int bgsaveinprogress;
243 pid_t bgsavechildpid;
244 struct saveparam *saveparams;
245 int saveparamslen;
246 char *logfile;
247 char *bindaddr;
248 char *dbfilename;
249 char *requirepass;
250 int shareobjects;
251 /* Replication related */
252 int isslave;
253 char *masterhost;
254 int masterport;
255 redisClient *master; /* client that is master for this slave */
256 int replstate;
257 unsigned int maxclients;
258 unsigned int maxmemory;
259 /* Sort parameters - qsort_r() is only available under BSD so we
260 * have to take this state global, in order to pass it to sortCompare() */
261 int sort_desc;
262 int sort_alpha;
263 int sort_bypattern;
264 };
265
266 typedef void redisCommandProc(redisClient *c);
267 struct redisCommand {
268 char *name;
269 redisCommandProc *proc;
270 int arity;
271 int flags;
272 };
273
274 typedef struct _redisSortObject {
275 robj *obj;
276 union {
277 double score;
278 robj *cmpobj;
279 } u;
280 } redisSortObject;
281
282 typedef struct _redisSortOperation {
283 int type;
284 robj *pattern;
285 } redisSortOperation;
286
287 struct sharedObjectsStruct {
288 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
289 *colon, *nullbulk, *nullmultibulk,
290 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
291 *outofrangeerr, *plus,
292 *select0, *select1, *select2, *select3, *select4,
293 *select5, *select6, *select7, *select8, *select9;
294 } shared;
295
296 /*================================ Prototypes =============================== */
297
298 static void freeStringObject(robj *o);
299 static void freeListObject(robj *o);
300 static void freeSetObject(robj *o);
301 static void decrRefCount(void *o);
302 static robj *createObject(int type, void *ptr);
303 static void freeClient(redisClient *c);
304 static int rdbLoad(char *filename);
305 static void addReply(redisClient *c, robj *obj);
306 static void addReplySds(redisClient *c, sds s);
307 static void incrRefCount(robj *o);
308 static int rdbSaveBackground(char *filename);
309 static robj *createStringObject(char *ptr, size_t len);
310 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
311 static int syncWithMaster(void);
312 static robj *tryObjectSharing(robj *o);
313 static int removeExpire(redisDb *db, robj *key);
314 static int expireIfNeeded(redisDb *db, robj *key);
315 static int deleteIfVolatile(redisDb *db, robj *key);
316 static int deleteKey(redisDb *db, robj *key);
317 static time_t getExpire(redisDb *db, robj *key);
318 static int setExpire(redisDb *db, robj *key, time_t when);
319 static void updateSalvesWaitingBgsave(int bgsaveerr);
320 static void freeMemoryIfNeeded(void);
321
322 static void authCommand(redisClient *c);
323 static void pingCommand(redisClient *c);
324 static void echoCommand(redisClient *c);
325 static void setCommand(redisClient *c);
326 static void setnxCommand(redisClient *c);
327 static void getCommand(redisClient *c);
328 static void delCommand(redisClient *c);
329 static void existsCommand(redisClient *c);
330 static void incrCommand(redisClient *c);
331 static void decrCommand(redisClient *c);
332 static void incrbyCommand(redisClient *c);
333 static void decrbyCommand(redisClient *c);
334 static void selectCommand(redisClient *c);
335 static void randomkeyCommand(redisClient *c);
336 static void keysCommand(redisClient *c);
337 static void dbsizeCommand(redisClient *c);
338 static void lastsaveCommand(redisClient *c);
339 static void saveCommand(redisClient *c);
340 static void bgsaveCommand(redisClient *c);
341 static void shutdownCommand(redisClient *c);
342 static void moveCommand(redisClient *c);
343 static void renameCommand(redisClient *c);
344 static void renamenxCommand(redisClient *c);
345 static void lpushCommand(redisClient *c);
346 static void rpushCommand(redisClient *c);
347 static void lpopCommand(redisClient *c);
348 static void rpopCommand(redisClient *c);
349 static void llenCommand(redisClient *c);
350 static void lindexCommand(redisClient *c);
351 static void lrangeCommand(redisClient *c);
352 static void ltrimCommand(redisClient *c);
353 static void typeCommand(redisClient *c);
354 static void lsetCommand(redisClient *c);
355 static void saddCommand(redisClient *c);
356 static void sremCommand(redisClient *c);
357 static void smoveCommand(redisClient *c);
358 static void sismemberCommand(redisClient *c);
359 static void scardCommand(redisClient *c);
360 static void sinterCommand(redisClient *c);
361 static void sinterstoreCommand(redisClient *c);
362 static void sunionCommand(redisClient *c);
363 static void sunionstoreCommand(redisClient *c);
364 static void sdiffCommand(redisClient *c);
365 static void sdiffstoreCommand(redisClient *c);
366 static void syncCommand(redisClient *c);
367 static void flushdbCommand(redisClient *c);
368 static void flushallCommand(redisClient *c);
369 static void sortCommand(redisClient *c);
370 static void lremCommand(redisClient *c);
371 static void infoCommand(redisClient *c);
372 static void mgetCommand(redisClient *c);
373 static void monitorCommand(redisClient *c);
374 static void expireCommand(redisClient *c);
375 static void getSetCommand(redisClient *c);
376 static void ttlCommand(redisClient *c);
377 static void slaveofCommand(redisClient *c);
378 static void debugCommand(redisClient *c);
379
380 /*================================= Globals ================================= */
381
382 /* Global vars */
383 static struct redisServer server; /* server global state */
384 static struct redisCommand cmdTable[] = {
385 {"get",getCommand,2,REDIS_CMD_INLINE},
386 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
387 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
388 {"del",delCommand,-2,REDIS_CMD_INLINE},
389 {"exists",existsCommand,2,REDIS_CMD_INLINE},
390 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
391 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
392 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
393 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
394 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
395 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
396 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
397 {"llen",llenCommand,2,REDIS_CMD_INLINE},
398 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
399 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
400 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
401 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
402 {"lrem",lremCommand,4,REDIS_CMD_BULK},
403 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
404 {"srem",sremCommand,3,REDIS_CMD_BULK},
405 {"smove",smoveCommand,4,REDIS_CMD_BULK},
406 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
407 {"scard",scardCommand,2,REDIS_CMD_INLINE},
408 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
409 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
410 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
411 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
412 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
413 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
414 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
415 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
416 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
417 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
418 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
419 {"select",selectCommand,2,REDIS_CMD_INLINE},
420 {"move",moveCommand,3,REDIS_CMD_INLINE},
421 {"rename",renameCommand,3,REDIS_CMD_INLINE},
422 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
423 {"expire",expireCommand,3,REDIS_CMD_INLINE},
424 {"keys",keysCommand,2,REDIS_CMD_INLINE},
425 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
426 {"auth",authCommand,2,REDIS_CMD_INLINE},
427 {"ping",pingCommand,1,REDIS_CMD_INLINE},
428 {"echo",echoCommand,2,REDIS_CMD_BULK},
429 {"save",saveCommand,1,REDIS_CMD_INLINE},
430 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
431 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
432 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
433 {"type",typeCommand,2,REDIS_CMD_INLINE},
434 {"sync",syncCommand,1,REDIS_CMD_INLINE},
435 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
436 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
437 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
438 {"info",infoCommand,1,REDIS_CMD_INLINE},
439 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
440 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
441 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
442 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
443 {NULL,NULL,0,0}
444 };
445
446 /*============================ Utility functions ============================ */
447
/* Glob-style pattern matching over length-delimited buffers.
 *
 * Supported syntax:
 *   *       any sequence of characters, the empty one included
 *   ?       exactly one character
 *   [abc]   one character out of a set; [^abc] negates the set, [a-z] is a
 *           range and \x inside the brackets stands for a literal x
 *   \x      the literal character x
 *
 * 'pattern' / 'patternLen' and 'string' / 'stringLen' are pointer + length
 * pairs: for non-negative lengths neither buffer is read past its length,
 * so the inputs do not need to be NUL terminated. When 'nocase' is non-zero
 * literals and ranges are compared through tolower() (an escaped character
 * inside brackets is always compared as-is).
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        if (stringLen == 0) {
            /* The subject is exhausted: what is left of the pattern can
             * only match if it is made of '*' alone. The skip is bounded by
             * patternLen so bytes after the pattern are never inspected. */
            while (patternLen > 0 && pattern[0] == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
        /* From here on stringLen > 0, so string[0] is always readable. */
        switch(pattern[0]) {
        case '*':
            /* Collapse a run of stars into the last one */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing star: match */
            /* Try to match the rest of the pattern at every offset */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * advance after the switch lands exactly on the end
                     * of the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* A trailing backslash is matched literally */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
570
571 void redisLog(int level, const char *fmt, ...)
572 {
573 va_list ap;
574 FILE *fp;
575
576 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
577 if (!fp) return;
578
579 va_start(ap, fmt);
580 if (level >= server.verbosity) {
581 char *c = ".-*";
582 char buf[64];
583 time_t now;
584
585 now = time(NULL);
586 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
587 fprintf(fp,"%s %c ",buf,c[level]);
588 vfprintf(fp, fmt, ap);
589 fprintf(fp,"\n");
590 fflush(fp);
591 }
592 va_end(ap);
593
594 if (server.logfile) fclose(fp);
595 }
596
597 /*====================== Hash table type implementation ==================== */
598
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
602
603 static int sdsDictKeyCompare(void *privdata, const void *key1,
604 const void *key2)
605 {
606 int l1,l2;
607 DICT_NOTUSED(privdata);
608
609 l1 = sdslen((sds)key1);
610 l2 = sdslen((sds)key2);
611 if (l1 != l2) return 0;
612 return memcmp(key1, key2, l1) == 0;
613 }
614
/* Dict destructor callback: releases the reference the table holds on the
 * object 'val' by calling decrRefCount(). 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
621
622 static int dictSdsKeyCompare(void *privdata, const void *key1,
623 const void *key2)
624 {
625 const robj *o1 = key1, *o2 = key2;
626 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
627 }
628
629 static unsigned int dictSdsHash(const void *key) {
630 const robj *o = key;
631 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
632 }
633
634 static dictType setDictType = {
635 dictSdsHash, /* hash function */
636 NULL, /* key dup */
637 NULL, /* val dup */
638 dictSdsKeyCompare, /* key compare */
639 dictRedisObjectDestructor, /* key destructor */
640 NULL /* val destructor */
641 };
642
643 static dictType hashDictType = {
644 dictSdsHash, /* hash function */
645 NULL, /* key dup */
646 NULL, /* val dup */
647 dictSdsKeyCompare, /* key compare */
648 dictRedisObjectDestructor, /* key destructor */
649 dictRedisObjectDestructor /* val destructor */
650 };
651
652 /* ========================= Random utility functions ======================= */
653
/* Fatal out of memory handler.
 *
 * Redis does not try to recover when the allocation of an object or string
 * fails: it is not even clear that the error could be reported to the
 * client, as the networking layer itself allocates its send buffers on the
 * heap. So the condition is reported on standard error, prefixed by 'msg',
 * and after a one second pause the process is aborted. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);

    sleep(1);
    abort();
}
665
666 /* ====================== Redis server networking stuff ===================== */
667 void closeTimedoutClients(void) {
668 redisClient *c;
669 listNode *ln;
670 time_t now = time(NULL);
671
672 listRewind(server.clients);
673 while ((ln = listYield(server.clients)) != NULL) {
674 c = listNodeValue(ln);
675 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
676 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
677 (now - c->lastinteraction > server.maxidletime)) {
678 redisLog(REDIS_DEBUG,"Closing idle client");
679 freeClient(c);
680 }
681 }
682 }
683
684 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
685 * we resize the hash table to save memory */
686 void tryResizeHashTables(void) {
687 int j;
688
689 for (j = 0; j < server.dbnum; j++) {
690 long long size, used;
691
692 size = dictSlots(server.db[j].dict);
693 used = dictSize(server.db[j].dict);
694 if (size && used && size > REDIS_HT_MINSLOTS &&
695 (used*100/size < REDIS_HT_MINFILL)) {
696 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
697 dictResize(server.db[j].dict);
698 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
699 }
700 }
701 }
702
703 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
704 int j, loops = server.cronloops++;
705 REDIS_NOTUSED(eventLoop);
706 REDIS_NOTUSED(id);
707 REDIS_NOTUSED(clientData);
708
709 /* Update the global state with the amount of used memory */
710 server.usedmemory = zmalloc_used_memory();
711
712 /* Show some info about non-empty databases */
713 for (j = 0; j < server.dbnum; j++) {
714 long long size, used, vkeys;
715
716 size = dictSlots(server.db[j].dict);
717 used = dictSize(server.db[j].dict);
718 vkeys = dictSize(server.db[j].expires);
719 if (!(loops % 5) && used > 0) {
720 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
721 /* dictPrintStats(server.dict); */
722 }
723 }
724
725 /* We don't want to resize the hash tables while a bacground saving
726 * is in progress: the saving child is created using fork() that is
727 * implemented with a copy-on-write semantic in most modern systems, so
728 * if we resize the HT while there is the saving child at work actually
729 * a lot of memory movements in the parent will cause a lot of pages
730 * copied. */
731 if (!server.bgsaveinprogress) tryResizeHashTables();
732
733 /* Show information about connected clients */
734 if (!(loops % 5)) {
735 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
736 listLength(server.clients)-listLength(server.slaves),
737 listLength(server.slaves),
738 server.usedmemory,
739 dictSize(server.sharingpool));
740 }
741
742 /* Close connections of timedout clients */
743 if (server.maxidletime && !(loops % 10))
744 closeTimedoutClients();
745
746 /* Check if a background saving in progress terminated */
747 if (server.bgsaveinprogress) {
748 int statloc;
749 /* XXX: TODO handle the case of the saving child killed */
750 if (wait4(-1,&statloc,WNOHANG,NULL)) {
751 int exitcode = WEXITSTATUS(statloc);
752 int bysignal = WIFSIGNALED(statloc);
753
754 if (!bysignal && exitcode == 0) {
755 redisLog(REDIS_NOTICE,
756 "Background saving terminated with success");
757 server.dirty = 0;
758 server.lastsave = time(NULL);
759 } else if (!bysignal && exitcode != 0) {
760 redisLog(REDIS_WARNING, "Background saving error");
761 } else {
762 redisLog(REDIS_WARNING,
763 "Background saving terminated by signal");
764 }
765 server.bgsaveinprogress = 0;
766 server.bgsavechildpid = -1;
767 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
768 }
769 } else {
770 /* If there is not a background saving in progress check if
771 * we have to save now */
772 time_t now = time(NULL);
773 for (j = 0; j < server.saveparamslen; j++) {
774 struct saveparam *sp = server.saveparams+j;
775
776 if (server.dirty >= sp->changes &&
777 now-server.lastsave > sp->seconds) {
778 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
779 sp->changes, sp->seconds);
780 rdbSaveBackground(server.dbfilename);
781 break;
782 }
783 }
784 }
785
786 /* Try to expire a few timed out keys */
787 for (j = 0; j < server.dbnum; j++) {
788 redisDb *db = server.db+j;
789 int num = dictSize(db->expires);
790
791 if (num) {
792 time_t now = time(NULL);
793
794 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
795 num = REDIS_EXPIRELOOKUPS_PER_CRON;
796 while (num--) {
797 dictEntry *de;
798 time_t t;
799
800 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
801 t = (time_t) dictGetEntryVal(de);
802 if (now > t) {
803 deleteKey(db,dictGetEntryKey(de));
804 }
805 }
806 }
807 }
808
809 /* Check if we should connect to a MASTER */
810 if (server.replstate == REDIS_REPL_CONNECT) {
811 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
812 if (syncWithMaster() == REDIS_OK) {
813 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
814 }
815 }
816 return 1000;
817 }
818
819 static void createSharedObjects(void) {
820 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
821 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
822 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
823 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
824 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
825 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
826 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
827 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
828 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
829 /* no such key */
830 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
831 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
832 "-ERR Operation against a key holding the wrong kind of value\r\n"));
833 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
834 "-ERR no such key\r\n"));
835 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
836 "-ERR syntax error\r\n"));
837 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
838 "-ERR source and destination objects are the same\r\n"));
839 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
840 "-ERR index out of range\r\n"));
841 shared.space = createObject(REDIS_STRING,sdsnew(" "));
842 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
843 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
844 shared.select0 = createStringObject("select 0\r\n",10);
845 shared.select1 = createStringObject("select 1\r\n",10);
846 shared.select2 = createStringObject("select 2\r\n",10);
847 shared.select3 = createStringObject("select 3\r\n",10);
848 shared.select4 = createStringObject("select 4\r\n",10);
849 shared.select5 = createStringObject("select 5\r\n",10);
850 shared.select6 = createStringObject("select 6\r\n",10);
851 shared.select7 = createStringObject("select 7\r\n",10);
852 shared.select8 = createStringObject("select 8\r\n",10);
853 shared.select9 = createStringObject("select 9\r\n",10);
854 }
855
856 static void appendServerSaveParams(time_t seconds, int changes) {
857 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
858 if (server.saveparams == NULL) oom("appendServerSaveParams");
859 server.saveparams[server.saveparamslen].seconds = seconds;
860 server.saveparams[server.saveparamslen].changes = changes;
861 server.saveparamslen++;
862 }
863
864 static void ResetServerSaveParams() {
865 zfree(server.saveparams);
866 server.saveparams = NULL;
867 server.saveparamslen = 0;
868 }
869
870 static void initServerConfig() {
871 server.dbnum = REDIS_DEFAULT_DBNUM;
872 server.port = REDIS_SERVERPORT;
873 server.verbosity = REDIS_DEBUG;
874 server.maxidletime = REDIS_MAXIDLETIME;
875 server.saveparams = NULL;
876 server.logfile = NULL; /* NULL = log on standard output */
877 server.bindaddr = NULL;
878 server.glueoutputbuf = 1;
879 server.daemonize = 0;
880 server.pidfile = "/var/run/redis.pid";
881 server.dbfilename = "dump.rdb";
882 server.requirepass = NULL;
883 server.shareobjects = 0;
884 server.maxclients = 0;
885 server.maxmemory = 0;
886 ResetServerSaveParams();
887
888 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
889 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
890 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
891 /* Replication related */
892 server.isslave = 0;
893 server.masterhost = NULL;
894 server.masterport = 6379;
895 server.master = NULL;
896 server.replstate = REDIS_REPL_NONE;
897 }
898
899 static void initServer() {
900 int j;
901
902 signal(SIGHUP, SIG_IGN);
903 signal(SIGPIPE, SIG_IGN);
904
905 server.clients = listCreate();
906 server.slaves = listCreate();
907 server.monitors = listCreate();
908 server.objfreelist = listCreate();
909 createSharedObjects();
910 server.el = aeCreateEventLoop();
911 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
912 server.sharingpool = dictCreate(&setDictType,NULL);
913 server.sharingpoolsize = 1024;
914 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
915 oom("server initialization"); /* Fatal OOM */
916 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
917 if (server.fd == -1) {
918 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
919 exit(1);
920 }
921 for (j = 0; j < server.dbnum; j++) {
922 server.db[j].dict = dictCreate(&hashDictType,NULL);
923 server.db[j].expires = dictCreate(&setDictType,NULL);
924 server.db[j].id = j;
925 }
926 server.cronloops = 0;
927 server.bgsaveinprogress = 0;
928 server.bgsavechildpid = -1;
929 server.lastsave = time(NULL);
930 server.dirty = 0;
931 server.usedmemory = 0;
932 server.stat_numcommands = 0;
933 server.stat_numconnections = 0;
934 server.stat_starttime = time(NULL);
935 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
936 }
937
938 /* Empty the whole database */
939 static long long emptyDb() {
940 int j;
941 long long removed = 0;
942
943 for (j = 0; j < server.dbnum; j++) {
944 removed += dictSize(server.db[j].dict);
945 dictEmpty(server.db[j].dict);
946 dictEmpty(server.db[j].expires);
947 }
948 return removed;
949 }
950
/* Convert a case-insensitive "yes" / "no" string into 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    return (strcasecmp(s,"no") == 0) ? 0 : -1;
}
956
957 /* I agree, this is a very rudimental way to load a configuration...
958 will improve later if the config gets more complex */
959 static void loadServerConfig(char *filename) {
960 FILE *fp = fopen(filename,"r");
961 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
962 int linenum = 0;
963 sds line = NULL;
964
965 if (!fp) {
966 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
967 exit(1);
968 }
969 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
970 sds *argv;
971 int argc, j;
972
973 linenum++;
974 line = sdsnew(buf);
975 line = sdstrim(line," \t\r\n");
976
977 /* Skip comments and blank lines*/
978 if (line[0] == '#' || line[0] == '\0') {
979 sdsfree(line);
980 continue;
981 }
982
983 /* Split into arguments */
984 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
985 sdstolower(argv[0]);
986
987 /* Execute config directives */
988 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
989 server.maxidletime = atoi(argv[1]);
990 if (server.maxidletime < 0) {
991 err = "Invalid timeout value"; goto loaderr;
992 }
993 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
994 server.port = atoi(argv[1]);
995 if (server.port < 1 || server.port > 65535) {
996 err = "Invalid port"; goto loaderr;
997 }
998 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
999 server.bindaddr = zstrdup(argv[1]);
1000 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1001 int seconds = atoi(argv[1]);
1002 int changes = atoi(argv[2]);
1003 if (seconds < 1 || changes < 0) {
1004 err = "Invalid save parameters"; goto loaderr;
1005 }
1006 appendServerSaveParams(seconds,changes);
1007 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1008 if (chdir(argv[1]) == -1) {
1009 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1010 argv[1], strerror(errno));
1011 exit(1);
1012 }
1013 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1014 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1015 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1016 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1017 else {
1018 err = "Invalid log level. Must be one of debug, notice, warning";
1019 goto loaderr;
1020 }
1021 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1022 FILE *fp;
1023
1024 server.logfile = zstrdup(argv[1]);
1025 if (!strcasecmp(server.logfile,"stdout")) {
1026 zfree(server.logfile);
1027 server.logfile = NULL;
1028 }
1029 if (server.logfile) {
1030 /* Test if we are able to open the file. The server will not
1031 * be able to abort just for this problem later... */
1032 fp = fopen(server.logfile,"a");
1033 if (fp == NULL) {
1034 err = sdscatprintf(sdsempty(),
1035 "Can't open the log file: %s", strerror(errno));
1036 goto loaderr;
1037 }
1038 fclose(fp);
1039 }
1040 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1041 server.dbnum = atoi(argv[1]);
1042 if (server.dbnum < 1) {
1043 err = "Invalid number of databases"; goto loaderr;
1044 }
1045 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1046 server.maxclients = atoi(argv[1]);
1047 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1048 server.maxmemory = atoi(argv[1]);
1049 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1050 server.masterhost = sdsnew(argv[1]);
1051 server.masterport = atoi(argv[2]);
1052 server.replstate = REDIS_REPL_CONNECT;
1053 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1054 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1055 err = "argument must be 'yes' or 'no'"; goto loaderr;
1056 }
1057 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1058 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1059 err = "argument must be 'yes' or 'no'"; goto loaderr;
1060 }
1061 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1062 server.sharingpoolsize = atoi(argv[1]);
1063 if (server.sharingpoolsize < 1) {
1064 err = "invalid object sharing pool size"; goto loaderr;
1065 }
1066 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1067 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1068 err = "argument must be 'yes' or 'no'"; goto loaderr;
1069 }
1070 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1071 server.requirepass = zstrdup(argv[1]);
1072 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1073 server.pidfile = zstrdup(argv[1]);
1074 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1075 server.dbfilename = zstrdup(argv[1]);
1076 } else {
1077 err = "Bad directive or wrong number of arguments"; goto loaderr;
1078 }
1079 for (j = 0; j < argc; j++)
1080 sdsfree(argv[j]);
1081 zfree(argv);
1082 sdsfree(line);
1083 }
1084 fclose(fp);
1085 return;
1086
1087 loaderr:
1088 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1089 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1090 fprintf(stderr, ">>> '%s'\n", line);
1091 fprintf(stderr, "%s\n", err);
1092 exit(1);
1093 }
1094
1095 static void freeClientArgv(redisClient *c) {
1096 int j;
1097
1098 for (j = 0; j < c->argc; j++)
1099 decrRefCount(c->argv[j]);
1100 c->argc = 0;
1101 }
1102
1103 static void freeClient(redisClient *c) {
1104 listNode *ln;
1105
1106 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1107 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1108 sdsfree(c->querybuf);
1109 listRelease(c->reply);
1110 freeClientArgv(c);
1111 close(c->fd);
1112 ln = listSearchKey(server.clients,c);
1113 assert(ln != NULL);
1114 listDelNode(server.clients,ln);
1115 if (c->flags & REDIS_SLAVE) {
1116 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1117 close(c->repldbfd);
1118 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1119 ln = listSearchKey(l,c);
1120 assert(ln != NULL);
1121 listDelNode(l,ln);
1122 }
1123 if (c->flags & REDIS_MASTER) {
1124 server.master = NULL;
1125 server.replstate = REDIS_REPL_CONNECT;
1126 }
1127 zfree(c->argv);
1128 zfree(c);
1129 }
1130
1131 static void glueReplyBuffersIfNeeded(redisClient *c) {
1132 int totlen = 0;
1133 listNode *ln;
1134 robj *o;
1135
1136 listRewind(c->reply);
1137 while((ln = listYield(c->reply))) {
1138 o = ln->value;
1139 totlen += sdslen(o->ptr);
1140 /* This optimization makes more sense if we don't have to copy
1141 * too much data */
1142 if (totlen > 1024) return;
1143 }
1144 if (totlen > 0) {
1145 char buf[1024];
1146 int copylen = 0;
1147
1148 listRewind(c->reply);
1149 while((ln = listYield(c->reply))) {
1150 o = ln->value;
1151 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1152 copylen += sdslen(o->ptr);
1153 listDelNode(c->reply,ln);
1154 }
1155 /* Now the output buffer is empty, add the new single element */
1156 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1157 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1158 }
1159 }
1160
1161 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1162 redisClient *c = privdata;
1163 int nwritten = 0, totwritten = 0, objlen;
1164 robj *o;
1165 REDIS_NOTUSED(el);
1166 REDIS_NOTUSED(mask);
1167
1168 if (server.glueoutputbuf && listLength(c->reply) > 1)
1169 glueReplyBuffersIfNeeded(c);
1170 while(listLength(c->reply)) {
1171 o = listNodeValue(listFirst(c->reply));
1172 objlen = sdslen(o->ptr);
1173
1174 if (objlen == 0) {
1175 listDelNode(c->reply,listFirst(c->reply));
1176 continue;
1177 }
1178
1179 if (c->flags & REDIS_MASTER) {
1180 nwritten = objlen - c->sentlen;
1181 } else {
1182 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1183 if (nwritten <= 0) break;
1184 }
1185 c->sentlen += nwritten;
1186 totwritten += nwritten;
1187 /* If we fully sent the object on head go to the next one */
1188 if (c->sentlen == objlen) {
1189 listDelNode(c->reply,listFirst(c->reply));
1190 c->sentlen = 0;
1191 }
1192 }
1193 if (nwritten == -1) {
1194 if (errno == EAGAIN) {
1195 nwritten = 0;
1196 } else {
1197 redisLog(REDIS_DEBUG,
1198 "Error writing to client: %s", strerror(errno));
1199 freeClient(c);
1200 return;
1201 }
1202 }
1203 if (totwritten > 0) c->lastinteraction = time(NULL);
1204 if (listLength(c->reply) == 0) {
1205 c->sentlen = 0;
1206 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1207 }
1208 }
1209
1210 static struct redisCommand *lookupCommand(char *name) {
1211 int j = 0;
1212 while(cmdTable[j].name != NULL) {
1213 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1214 j++;
1215 }
1216 return NULL;
1217 }
1218
1219 /* resetClient prepare the client to process the next command */
1220 static void resetClient(redisClient *c) {
1221 freeClientArgv(c);
1222 c->bulklen = -1;
1223 }
1224
1225 /* If this function gets called we already read a whole
1226 * command, argments are in the client argv/argc fields.
1227 * processCommand() execute the command or prepare the
1228 * server for a bulk read from the client.
1229 *
1230 * If 1 is returned the client is still alive and valid and
1231 * and other operations can be performed by the caller. Otherwise
1232 * if 0 is returned the client was destroied (i.e. after QUIT). */
1233 static int processCommand(redisClient *c) {
1234 struct redisCommand *cmd;
1235 long long dirty;
1236
1237 /* Free some memory if needed (maxmemory setting) */
1238 if (server.maxmemory) freeMemoryIfNeeded();
1239
1240 /* The QUIT command is handled as a special case. Normal command
1241 * procs are unable to close the client connection safely */
1242 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1243 freeClient(c);
1244 return 0;
1245 }
1246 cmd = lookupCommand(c->argv[0]->ptr);
1247 if (!cmd) {
1248 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1249 resetClient(c);
1250 return 1;
1251 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1252 (c->argc < -cmd->arity)) {
1253 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1254 resetClient(c);
1255 return 1;
1256 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1257 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1258 resetClient(c);
1259 return 1;
1260 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1261 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1262
1263 decrRefCount(c->argv[c->argc-1]);
1264 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1265 c->argc--;
1266 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1267 resetClient(c);
1268 return 1;
1269 }
1270 c->argc--;
1271 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1272 /* It is possible that the bulk read is already in the
1273 * buffer. Check this condition and handle it accordingly */
1274 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1275 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1276 c->argc++;
1277 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1278 } else {
1279 return 1;
1280 }
1281 }
1282 /* Let's try to share objects on the command arguments vector */
1283 if (server.shareobjects) {
1284 int j;
1285 for(j = 1; j < c->argc; j++)
1286 c->argv[j] = tryObjectSharing(c->argv[j]);
1287 }
1288 /* Check if the user is authenticated */
1289 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1290 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1291 resetClient(c);
1292 return 1;
1293 }
1294
1295 /* Exec the command */
1296 dirty = server.dirty;
1297 cmd->proc(c);
1298 if (server.dirty-dirty != 0 && listLength(server.slaves))
1299 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1300 if (listLength(server.monitors))
1301 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1302 server.stat_numcommands++;
1303
1304 /* Prepare the client for the next command */
1305 if (c->flags & REDIS_CLOSE) {
1306 freeClient(c);
1307 return 0;
1308 }
1309 resetClient(c);
1310 return 1;
1311 }
1312
1313 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1314 listNode *ln;
1315 int outc = 0, j;
1316 robj **outv;
1317 /* (args*2)+1 is enough room for args, spaces, newlines */
1318 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1319
1320 if (argc <= REDIS_STATIC_ARGS) {
1321 outv = static_outv;
1322 } else {
1323 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1324 if (!outv) oom("replicationFeedSlaves");
1325 }
1326
1327 for (j = 0; j < argc; j++) {
1328 if (j != 0) outv[outc++] = shared.space;
1329 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1330 robj *lenobj;
1331
1332 lenobj = createObject(REDIS_STRING,
1333 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1334 lenobj->refcount = 0;
1335 outv[outc++] = lenobj;
1336 }
1337 outv[outc++] = argv[j];
1338 }
1339 outv[outc++] = shared.crlf;
1340
1341 /* Increment all the refcounts at start and decrement at end in order to
1342 * be sure to free objects if there is no slave in a replication state
1343 * able to be feed with commands */
1344 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1345 listRewind(slaves);
1346 while((ln = listYield(slaves))) {
1347 redisClient *slave = ln->value;
1348
1349 /* Don't feed slaves that are still waiting for BGSAVE to start */
1350 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1351
1352 /* Feed all the other slaves, MONITORs and so on */
1353 if (slave->slaveseldb != dictid) {
1354 robj *selectcmd;
1355
1356 switch(dictid) {
1357 case 0: selectcmd = shared.select0; break;
1358 case 1: selectcmd = shared.select1; break;
1359 case 2: selectcmd = shared.select2; break;
1360 case 3: selectcmd = shared.select3; break;
1361 case 4: selectcmd = shared.select4; break;
1362 case 5: selectcmd = shared.select5; break;
1363 case 6: selectcmd = shared.select6; break;
1364 case 7: selectcmd = shared.select7; break;
1365 case 8: selectcmd = shared.select8; break;
1366 case 9: selectcmd = shared.select9; break;
1367 default:
1368 selectcmd = createObject(REDIS_STRING,
1369 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1370 selectcmd->refcount = 0;
1371 break;
1372 }
1373 addReply(slave,selectcmd);
1374 slave->slaveseldb = dictid;
1375 }
1376 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1377 }
1378 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1379 if (outv != static_outv) zfree(outv);
1380 }
1381
1382 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1383 redisClient *c = (redisClient*) privdata;
1384 char buf[REDIS_IOBUF_LEN];
1385 int nread;
1386 REDIS_NOTUSED(el);
1387 REDIS_NOTUSED(mask);
1388
1389 nread = read(fd, buf, REDIS_IOBUF_LEN);
1390 if (nread == -1) {
1391 if (errno == EAGAIN) {
1392 nread = 0;
1393 } else {
1394 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1395 freeClient(c);
1396 return;
1397 }
1398 } else if (nread == 0) {
1399 redisLog(REDIS_DEBUG, "Client closed connection");
1400 freeClient(c);
1401 return;
1402 }
1403 if (nread) {
1404 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1405 c->lastinteraction = time(NULL);
1406 } else {
1407 return;
1408 }
1409
1410 again:
1411 if (c->bulklen == -1) {
1412 /* Read the first line of the query */
1413 char *p = strchr(c->querybuf,'\n');
1414 size_t querylen;
1415 if (p) {
1416 sds query, *argv;
1417 int argc, j;
1418
1419 query = c->querybuf;
1420 c->querybuf = sdsempty();
1421 querylen = 1+(p-(query));
1422 if (sdslen(query) > querylen) {
1423 /* leave data after the first line of the query in the buffer */
1424 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1425 }
1426 *p = '\0'; /* remove "\n" */
1427 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1428 sdsupdatelen(query);
1429
1430 /* Now we can split the query in arguments */
1431 if (sdslen(query) == 0) {
1432 /* Ignore empty query */
1433 sdsfree(query);
1434 return;
1435 }
1436 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1437 if (argv == NULL) oom("sdssplitlen");
1438 sdsfree(query);
1439
1440 if (c->argv) zfree(c->argv);
1441 c->argv = zmalloc(sizeof(robj*)*argc);
1442 if (c->argv == NULL) oom("allocating arguments list for client");
1443
1444 for (j = 0; j < argc; j++) {
1445 if (sdslen(argv[j])) {
1446 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1447 c->argc++;
1448 } else {
1449 sdsfree(argv[j]);
1450 }
1451 }
1452 zfree(argv);
1453 /* Execute the command. If the client is still valid
1454 * after processCommand() return and there is something
1455 * on the query buffer try to process the next command. */
1456 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1457 return;
1458 } else if (sdslen(c->querybuf) >= 1024*32) {
1459 redisLog(REDIS_DEBUG, "Client protocol error");
1460 freeClient(c);
1461 return;
1462 }
1463 } else {
1464 /* Bulk read handling. Note that if we are at this point
1465 the client already sent a command terminated with a newline,
1466 we are reading the bulk data that is actually the last
1467 argument of the command. */
1468 int qbl = sdslen(c->querybuf);
1469
1470 if (c->bulklen <= qbl) {
1471 /* Copy everything but the final CRLF as final argument */
1472 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1473 c->argc++;
1474 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1475 processCommand(c);
1476 return;
1477 }
1478 }
1479 }
1480
1481 static int selectDb(redisClient *c, int id) {
1482 if (id < 0 || id >= server.dbnum)
1483 return REDIS_ERR;
1484 c->db = &server.db[id];
1485 return REDIS_OK;
1486 }
1487
1488 static void *dupClientReplyValue(void *o) {
1489 incrRefCount((robj*)o);
1490 return 0;
1491 }
1492
1493 static redisClient *createClient(int fd) {
1494 redisClient *c = zmalloc(sizeof(*c));
1495
1496 anetNonBlock(NULL,fd);
1497 anetTcpNoDelay(NULL,fd);
1498 if (!c) return NULL;
1499 selectDb(c,0);
1500 c->fd = fd;
1501 c->querybuf = sdsempty();
1502 c->argc = 0;
1503 c->argv = NULL;
1504 c->bulklen = -1;
1505 c->sentlen = 0;
1506 c->flags = 0;
1507 c->lastinteraction = time(NULL);
1508 c->authenticated = 0;
1509 c->replstate = REDIS_REPL_NONE;
1510 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1511 listSetFreeMethod(c->reply,decrRefCount);
1512 listSetDupMethod(c->reply,dupClientReplyValue);
1513 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1514 readQueryFromClient, c, NULL) == AE_ERR) {
1515 freeClient(c);
1516 return NULL;
1517 }
1518 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1519 return c;
1520 }
1521
1522 static void addReply(redisClient *c, robj *obj) {
1523 if (listLength(c->reply) == 0 &&
1524 (c->replstate == REDIS_REPL_NONE ||
1525 c->replstate == REDIS_REPL_ONLINE) &&
1526 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1527 sendReplyToClient, c, NULL) == AE_ERR) return;
1528 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1529 incrRefCount(obj);
1530 }
1531
1532 static void addReplySds(redisClient *c, sds s) {
1533 robj *o = createObject(REDIS_STRING,s);
1534 addReply(c,o);
1535 decrRefCount(o);
1536 }
1537
1538 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1539 int cport, cfd;
1540 char cip[128];
1541 redisClient *c;
1542 REDIS_NOTUSED(el);
1543 REDIS_NOTUSED(mask);
1544 REDIS_NOTUSED(privdata);
1545
1546 cfd = anetAccept(server.neterr, fd, cip, &cport);
1547 if (cfd == AE_ERR) {
1548 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1549 return;
1550 }
1551 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1552 if ((c = createClient(cfd)) == NULL) {
1553 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1554 close(cfd); /* May be already closed, just ingore errors */
1555 return;
1556 }
1557 /* If maxclient directive is set and this is one client more... close the
1558 * connection. Note that we create the client instead to check before
1559 * for this condition, since now the socket is already set in nonblocking
1560 * mode and we can send an error for free using the Kernel I/O */
1561 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1562 char *err = "-ERR max number of clients reached\r\n";
1563
1564 /* That's a best effort error message, don't check write errors */
1565 (void) write(c->fd,err,strlen(err));
1566 freeClient(c);
1567 return;
1568 }
1569 server.stat_numconnections++;
1570 }
1571
1572 /* ======================= Redis objects implementation ===================== */
1573
1574 static robj *createObject(int type, void *ptr) {
1575 robj *o;
1576
1577 if (listLength(server.objfreelist)) {
1578 listNode *head = listFirst(server.objfreelist);
1579 o = listNodeValue(head);
1580 listDelNode(server.objfreelist,head);
1581 } else {
1582 o = zmalloc(sizeof(*o));
1583 }
1584 if (!o) oom("createObject");
1585 o->type = type;
1586 o->ptr = ptr;
1587 o->refcount = 1;
1588 return o;
1589 }
1590
1591 static robj *createStringObject(char *ptr, size_t len) {
1592 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1593 }
1594
1595 static robj *createListObject(void) {
1596 list *l = listCreate();
1597
1598 if (!l) oom("listCreate");
1599 listSetFreeMethod(l,decrRefCount);
1600 return createObject(REDIS_LIST,l);
1601 }
1602
1603 static robj *createSetObject(void) {
1604 dict *d = dictCreate(&setDictType,NULL);
1605 if (!d) oom("dictCreate");
1606 return createObject(REDIS_SET,d);
1607 }
1608
1609 static void freeStringObject(robj *o) {
1610 sdsfree(o->ptr);
1611 }
1612
1613 static void freeListObject(robj *o) {
1614 listRelease((list*) o->ptr);
1615 }
1616
1617 static void freeSetObject(robj *o) {
1618 dictRelease((dict*) o->ptr);
1619 }
1620
1621 static void freeHashObject(robj *o) {
1622 dictRelease((dict*) o->ptr);
1623 }
1624
1625 static void incrRefCount(robj *o) {
1626 o->refcount++;
1627 #ifdef DEBUG_REFCOUNT
1628 if (o->type == REDIS_STRING)
1629 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1630 #endif
1631 }
1632
1633 static void decrRefCount(void *obj) {
1634 robj *o = obj;
1635
1636 #ifdef DEBUG_REFCOUNT
1637 if (o->type == REDIS_STRING)
1638 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1639 #endif
1640 if (--(o->refcount) == 0) {
1641 switch(o->type) {
1642 case REDIS_STRING: freeStringObject(o); break;
1643 case REDIS_LIST: freeListObject(o); break;
1644 case REDIS_SET: freeSetObject(o); break;
1645 case REDIS_HASH: freeHashObject(o); break;
1646 default: assert(0 != 0); break;
1647 }
1648 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1649 !listAddNodeHead(server.objfreelist,o))
1650 zfree(o);
1651 }
1652 }
1653
1654 /* Try to share an object against the shared objects pool */
1655 static robj *tryObjectSharing(robj *o) {
1656 struct dictEntry *de;
1657 unsigned long c;
1658
1659 if (o == NULL || server.shareobjects == 0) return o;
1660
1661 assert(o->type == REDIS_STRING);
1662 de = dictFind(server.sharingpool,o);
1663 if (de) {
1664 robj *shared = dictGetEntryKey(de);
1665
1666 c = ((unsigned long) dictGetEntryVal(de))+1;
1667 dictGetEntryVal(de) = (void*) c;
1668 incrRefCount(shared);
1669 decrRefCount(o);
1670 return shared;
1671 } else {
1672 /* Here we are using a stream algorihtm: Every time an object is
1673 * shared we increment its count, everytime there is a miss we
1674 * recrement the counter of a random object. If this object reaches
1675 * zero we remove the object and put the current object instead. */
1676 if (dictSize(server.sharingpool) >=
1677 server.sharingpoolsize) {
1678 de = dictGetRandomKey(server.sharingpool);
1679 assert(de != NULL);
1680 c = ((unsigned long) dictGetEntryVal(de))-1;
1681 dictGetEntryVal(de) = (void*) c;
1682 if (c == 0) {
1683 dictDelete(server.sharingpool,de->key);
1684 }
1685 } else {
1686 c = 0; /* If the pool is empty we want to add this object */
1687 }
1688 if (c == 0) {
1689 int retval;
1690
1691 retval = dictAdd(server.sharingpool,o,(void*)1);
1692 assert(retval == DICT_OK);
1693 incrRefCount(o);
1694 }
1695 return o;
1696 }
1697 }
1698
1699 static robj *lookupKey(redisDb *db, robj *key) {
1700 dictEntry *de = dictFind(db->dict,key);
1701 return de ? dictGetEntryVal(de) : NULL;
1702 }
1703
1704 static robj *lookupKeyRead(redisDb *db, robj *key) {
1705 expireIfNeeded(db,key);
1706 return lookupKey(db,key);
1707 }
1708
1709 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1710 deleteIfVolatile(db,key);
1711 return lookupKey(db,key);
1712 }
1713
1714 static int deleteKey(redisDb *db, robj *key) {
1715 int retval;
1716
1717 /* We need to protect key from destruction: after the first dictDelete()
1718 * it may happen that 'key' is no longer valid if we don't increment
1719 * it's count. This may happen when we get the object reference directly
1720 * from the hash table with dictRandomKey() or dict iterators */
1721 incrRefCount(key);
1722 if (dictSize(db->expires)) dictDelete(db->expires,key);
1723 retval = dictDelete(db->dict,key);
1724 decrRefCount(key);
1725
1726 return retval == DICT_OK;
1727 }
1728
1729 /*============================ DB saving/loading ============================ */
1730
/* Write the one-byte 'type' to 'fp'.
 * Returns 0 on success, -1 if fwrite() reports that nothing was written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
1735
/* Write the time 't' to 'fp' as a 4-byte signed integer, using the byte
 * order of the host (the value is converted to int32_t first).
 * Returns 0 on success, -1 if fwrite() reports that nothing was written. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
1741
1742 /* check rdbLoadLen() comments for more info */
1743 static int rdbSaveLen(FILE *fp, uint32_t len) {
1744 unsigned char buf[2];
1745
1746 if (len < (1<<6)) {
1747 /* Save a 6 bit len */
1748 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1749 if (fwrite(buf,1,1,fp) == 0) return -1;
1750 } else if (len < (1<<14)) {
1751 /* Save a 14 bit len */
1752 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1753 buf[1] = len&0xFF;
1754 if (fwrite(buf,2,1,fp) == 0) return -1;
1755 } else {
1756 /* Save a 32 bit len */
1757 buf[0] = (REDIS_RDB_32BITLEN<<6);
1758 if (fwrite(buf,1,1,fp) == 0) return -1;
1759 len = htonl(len);
1760 if (fwrite(&len,4,1,fp) == 0) return -1;
1761 }
1762 return 0;
1763 }
1764
1765 /* String objects in the form "2391" "-100" without any space and with a
1766 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1767 * encoded as integers to save space */
1768 int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1769 long long value;
1770 char *endptr, buf[32];
1771
1772 /* Check if it's possible to encode this value as a number */
1773 value = strtoll(s, &endptr, 10);
1774 if (endptr[0] != '\0') return 0;
1775 snprintf(buf,32,"%lld",value);
1776
1777 /* If the number converted back into a string is not identical
1778 * then it's not possible to encode the string as integer */
1779 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1780
1781 /* Finally check if it fits in our ranges */
1782 if (value >= -(1<<7) && value <= (1<<7)-1) {
1783 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1784 enc[1] = value&0xFF;
1785 return 2;
1786 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1787 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1788 enc[1] = value&0xFF;
1789 enc[2] = (value>>8)&0xFF;
1790 return 3;
1791 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1792 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1793 enc[1] = value&0xFF;
1794 enc[2] = (value>>8)&0xFF;
1795 enc[3] = (value>>16)&0xFF;
1796 enc[4] = (value>>24)&0xFF;
1797 return 5;
1798 } else {
1799 return 0;
1800 }
1801 }
1802
1803 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1804 unsigned int comprlen, outlen;
1805 unsigned char byte;
1806 void *out;
1807
1808 /* We require at least four bytes compression for this to be worth it */
1809 outlen = sdslen(obj->ptr)-4;
1810 if (outlen <= 0) return 0;
1811 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1812 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1813 if (comprlen == 0) {
1814 zfree(out);
1815 return 0;
1816 }
1817 /* Data compressed! Let's save it on disk */
1818 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1819 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1820 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1821 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1822 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1823 zfree(out);
1824 return comprlen;
1825
1826 writeerr:
1827 zfree(out);
1828 return -1;
1829 }
1830
1831 /* Save a string objet as [len][data] on disk. If the object is a string
1832 * representation of an integer value we try to safe it in a special form */
1833 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1834 size_t len = sdslen(obj->ptr);
1835 int enclen;
1836
1837 /* Try integer encoding */
1838 if (len <= 11) {
1839 unsigned char buf[5];
1840 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1841 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1842 return 0;
1843 }
1844 }
1845
1846 /* Try LZF compression - under 20 bytes it's unable to compress even
1847 * aaaaaaaaaaaaaaaaaa so skip it */
1848 if (1 && len > 20) {
1849 int retval;
1850
1851 retval = rdbSaveLzfStringObject(fp,obj);
1852 if (retval == -1) return -1;
1853 if (retval > 0) return 0;
1854 /* retval == 0 means data can't be compressed, save the old way */
1855 }
1856
1857 /* Store verbatim */
1858 if (rdbSaveLen(fp,len) == -1) return -1;
1859 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1860 return 0;
1861 }
1862
1863 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1864 static int rdbSave(char *filename) {
1865 dictIterator *di = NULL;
1866 dictEntry *de;
1867 FILE *fp;
1868 char tmpfile[256];
1869 int j;
1870 time_t now = time(NULL);
1871
1872 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1873 fp = fopen(tmpfile,"w");
1874 if (!fp) {
1875 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1876 return REDIS_ERR;
1877 }
1878 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1879 for (j = 0; j < server.dbnum; j++) {
1880 redisDb *db = server.db+j;
1881 dict *d = db->dict;
1882 if (dictSize(d) == 0) continue;
1883 di = dictGetIterator(d);
1884 if (!di) {
1885 fclose(fp);
1886 return REDIS_ERR;
1887 }
1888
1889 /* Write the SELECT DB opcode */
1890 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1891 if (rdbSaveLen(fp,j) == -1) goto werr;
1892
1893 /* Iterate this DB writing every entry */
1894 while((de = dictNext(di)) != NULL) {
1895 robj *key = dictGetEntryKey(de);
1896 robj *o = dictGetEntryVal(de);
1897 time_t expiretime = getExpire(db,key);
1898
1899 /* Save the expire time */
1900 if (expiretime != -1) {
1901 /* If this key is already expired skip it */
1902 if (expiretime < now) continue;
1903 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1904 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1905 }
1906 /* Save the key and associated value */
1907 if (rdbSaveType(fp,o->type) == -1) goto werr;
1908 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1909 if (o->type == REDIS_STRING) {
1910 /* Save a string value */
1911 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1912 } else if (o->type == REDIS_LIST) {
1913 /* Save a list value */
1914 list *list = o->ptr;
1915 listNode *ln;
1916
1917 listRewind(list);
1918 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1919 while((ln = listYield(list))) {
1920 robj *eleobj = listNodeValue(ln);
1921
1922 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1923 }
1924 } else if (o->type == REDIS_SET) {
1925 /* Save a set value */
1926 dict *set = o->ptr;
1927 dictIterator *di = dictGetIterator(set);
1928 dictEntry *de;
1929
1930 if (!set) oom("dictGetIteraotr");
1931 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1932 while((de = dictNext(di)) != NULL) {
1933 robj *eleobj = dictGetEntryKey(de);
1934
1935 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1936 }
1937 dictReleaseIterator(di);
1938 } else {
1939 assert(0 != 0);
1940 }
1941 }
1942 dictReleaseIterator(di);
1943 }
1944 /* EOF opcode */
1945 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1946
1947 /* Make sure data will not remain on the OS's output buffers */
1948 fflush(fp);
1949 fsync(fileno(fp));
1950 fclose(fp);
1951
1952 /* Use RENAME to make sure the DB file is changed atomically only
1953 * if the generate DB file is ok. */
1954 if (rename(tmpfile,filename) == -1) {
1955 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1956 unlink(tmpfile);
1957 return REDIS_ERR;
1958 }
1959 redisLog(REDIS_NOTICE,"DB saved on disk");
1960 server.dirty = 0;
1961 server.lastsave = time(NULL);
1962 return REDIS_OK;
1963
1964 werr:
1965 fclose(fp);
1966 unlink(tmpfile);
1967 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1968 if (di) dictReleaseIterator(di);
1969 return REDIS_ERR;
1970 }
1971
1972 static int rdbSaveBackground(char *filename) {
1973 pid_t childpid;
1974
1975 if (server.bgsaveinprogress) return REDIS_ERR;
1976 if ((childpid = fork()) == 0) {
1977 /* Child */
1978 close(server.fd);
1979 if (rdbSave(filename) == REDIS_OK) {
1980 exit(0);
1981 } else {
1982 exit(1);
1983 }
1984 } else {
1985 /* Parent */
1986 if (childpid == -1) {
1987 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
1988 strerror(errno));
1989 return REDIS_ERR;
1990 }
1991 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1992 server.bgsaveinprogress = 1;
1993 server.bgsavechildpid = childpid;
1994 return REDIS_OK;
1995 }
1996 return REDIS_OK; /* unreached */
1997 }
1998
/* Read a single type byte from 'fp'. Returns the byte value (0-255), or -1
 * when no byte could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 1) ? (int)byte : -1;
}
2004
/* Read a 4 byte time value from 'fp', stored as a raw int32_t in the byte
 * order of the host. Returns -1 when the four bytes can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t raw;

    if (fread(&raw,4,1,fp) != 1) return -1;
    return (time_t) raw;
}
2010
2011 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2012 * of this file for a description of how this are stored on disk.
2013 *
2014 * isencoded is set to 1 if the readed length is not actually a length but
2015 * an "encoding type", check the above comments for more info */
2016 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2017 unsigned char buf[2];
2018 uint32_t len;
2019
2020 if (isencoded) *isencoded = 0;
2021 if (rdbver == 0) {
2022 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2023 return ntohl(len);
2024 } else {
2025 int type;
2026
2027 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2028 type = (buf[0]&0xC0)>>6;
2029 if (type == REDIS_RDB_6BITLEN) {
2030 /* Read a 6 bit len */
2031 return buf[0]&0x3F;
2032 } else if (type == REDIS_RDB_ENCVAL) {
2033 /* Read a 6 bit len encoding type */
2034 if (isencoded) *isencoded = 1;
2035 return buf[0]&0x3F;
2036 } else if (type == REDIS_RDB_14BITLEN) {
2037 /* Read a 14 bit len */
2038 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2039 return ((buf[0]&0x3F)<<8)|buf[1];
2040 } else {
2041 /* Read a 32 bit len */
2042 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2043 return ntohl(len);
2044 }
2045 }
2046 }
2047
2048 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2049 unsigned char enc[4];
2050 long long val;
2051
2052 if (enctype == REDIS_RDB_ENC_INT8) {
2053 if (fread(enc,1,1,fp) == 0) return NULL;
2054 val = (signed char)enc[0];
2055 } else if (enctype == REDIS_RDB_ENC_INT16) {
2056 uint16_t v;
2057 if (fread(enc,2,1,fp) == 0) return NULL;
2058 v = enc[0]|(enc[1]<<8);
2059 val = (int16_t)v;
2060 } else if (enctype == REDIS_RDB_ENC_INT32) {
2061 uint32_t v;
2062 if (fread(enc,4,1,fp) == 0) return NULL;
2063 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2064 val = (int32_t)v;
2065 } else {
2066 val = 0; /* anti-warning */
2067 assert(0!=0);
2068 }
2069 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2070 }
2071
2072 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2073 unsigned int len, clen;
2074 unsigned char *c = NULL;
2075 sds val = NULL;
2076
2077 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2078 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2079 if ((c = zmalloc(clen)) == NULL) goto err;
2080 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2081 if (fread(c,clen,1,fp) == 0) goto err;
2082 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2083 zfree(c);
2084 return createObject(REDIS_STRING,val);
2085 err:
2086 zfree(c);
2087 sdsfree(val);
2088 return NULL;
2089 }
2090
2091 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2092 int isencoded;
2093 uint32_t len;
2094 sds val;
2095
2096 len = rdbLoadLen(fp,rdbver,&isencoded);
2097 if (isencoded) {
2098 switch(len) {
2099 case REDIS_RDB_ENC_INT8:
2100 case REDIS_RDB_ENC_INT16:
2101 case REDIS_RDB_ENC_INT32:
2102 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2103 case REDIS_RDB_ENC_LZF:
2104 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2105 default:
2106 assert(0!=0);
2107 }
2108 }
2109
2110 if (len == REDIS_RDB_LENERR) return NULL;
2111 val = sdsnewlen(NULL,len);
2112 if (len && fread(val,len,1,fp) == 0) {
2113 sdsfree(val);
2114 return NULL;
2115 }
2116 return tryObjectSharing(createObject(REDIS_STRING,val));
2117 }
2118
2119 static int rdbLoad(char *filename) {
2120 FILE *fp;
2121 robj *keyobj = NULL;
2122 uint32_t dbid;
2123 int type, retval, rdbver;
2124 dict *d = server.db[0].dict;
2125 redisDb *db = server.db+0;
2126 char buf[1024];
2127 time_t expiretime = -1, now = time(NULL);
2128
2129 fp = fopen(filename,"r");
2130 if (!fp) return REDIS_ERR;
2131 if (fread(buf,9,1,fp) == 0) goto eoferr;
2132 buf[9] = '\0';
2133 if (memcmp(buf,"REDIS",5) != 0) {
2134 fclose(fp);
2135 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2136 return REDIS_ERR;
2137 }
2138 rdbver = atoi(buf+5);
2139 if (rdbver > 1) {
2140 fclose(fp);
2141 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2142 return REDIS_ERR;
2143 }
2144 while(1) {
2145 robj *o;
2146
2147 /* Read type. */
2148 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2149 if (type == REDIS_EXPIRETIME) {
2150 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2151 /* We read the time so we need to read the object type again */
2152 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2153 }
2154 if (type == REDIS_EOF) break;
2155 /* Handle SELECT DB opcode as a special case */
2156 if (type == REDIS_SELECTDB) {
2157 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2158 goto eoferr;
2159 if (dbid >= (unsigned)server.dbnum) {
2160 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2161 exit(1);
2162 }
2163 db = server.db+dbid;
2164 d = db->dict;
2165 continue;
2166 }
2167 /* Read key */
2168 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2169
2170 if (type == REDIS_STRING) {
2171 /* Read string value */
2172 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2173 } else if (type == REDIS_LIST || type == REDIS_SET) {
2174 /* Read list/set value */
2175 uint32_t listlen;
2176
2177 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2178 goto eoferr;
2179 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2180 /* Load every single element of the list/set */
2181 while(listlen--) {
2182 robj *ele;
2183
2184 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2185 if (type == REDIS_LIST) {
2186 if (!listAddNodeTail((list*)o->ptr,ele))
2187 oom("listAddNodeTail");
2188 } else {
2189 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2190 oom("dictAdd");
2191 }
2192 }
2193 } else {
2194 assert(0 != 0);
2195 }
2196 /* Add the new object in the hash table */
2197 retval = dictAdd(d,keyobj,o);
2198 if (retval == DICT_ERR) {
2199 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2200 exit(1);
2201 }
2202 /* Set the expire time if needed */
2203 if (expiretime != -1) {
2204 setExpire(db,keyobj,expiretime);
2205 /* Delete this key if already expired */
2206 if (expiretime < now) deleteKey(db,keyobj);
2207 expiretime = -1;
2208 }
2209 keyobj = o = NULL;
2210 }
2211 fclose(fp);
2212 return REDIS_OK;
2213
2214 eoferr: /* unexpected end of file is handled here with a fatal exit */
2215 if (keyobj) decrRefCount(keyobj);
2216 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2217 exit(1);
2218 return REDIS_ERR; /* Just to avoid warning */
2219 }
2220
2221 /*================================== Commands =============================== */
2222
2223 static void authCommand(redisClient *c) {
2224 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2225 c->authenticated = 1;
2226 addReply(c,shared.ok);
2227 } else {
2228 c->authenticated = 0;
2229 addReply(c,shared.err);
2230 }
2231 }
2232
2233 static void pingCommand(redisClient *c) {
2234 addReply(c,shared.pong);
2235 }
2236
2237 static void echoCommand(redisClient *c) {
2238 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2239 (int)sdslen(c->argv[1]->ptr)));
2240 addReply(c,c->argv[1]);
2241 addReply(c,shared.crlf);
2242 }
2243
2244 /*=================================== Strings =============================== */
2245
2246 static void setGenericCommand(redisClient *c, int nx) {
2247 int retval;
2248
2249 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2250 if (retval == DICT_ERR) {
2251 if (!nx) {
2252 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2253 incrRefCount(c->argv[2]);
2254 } else {
2255 addReply(c,shared.czero);
2256 return;
2257 }
2258 } else {
2259 incrRefCount(c->argv[1]);
2260 incrRefCount(c->argv[2]);
2261 }
2262 server.dirty++;
2263 removeExpire(c->db,c->argv[1]);
2264 addReply(c, nx ? shared.cone : shared.ok);
2265 }
2266
2267 static void setCommand(redisClient *c) {
2268 setGenericCommand(c,0);
2269 }
2270
2271 static void setnxCommand(redisClient *c) {
2272 setGenericCommand(c,1);
2273 }
2274
2275 static void getCommand(redisClient *c) {
2276 robj *o = lookupKeyRead(c->db,c->argv[1]);
2277
2278 if (o == NULL) {
2279 addReply(c,shared.nullbulk);
2280 } else {
2281 if (o->type != REDIS_STRING) {
2282 addReply(c,shared.wrongtypeerr);
2283 } else {
2284 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2285 addReply(c,o);
2286 addReply(c,shared.crlf);
2287 }
2288 }
2289 }
2290
2291 static void getSetCommand(redisClient *c) {
2292 getCommand(c);
2293 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2294 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2295 } else {
2296 incrRefCount(c->argv[1]);
2297 }
2298 incrRefCount(c->argv[2]);
2299 server.dirty++;
2300 removeExpire(c->db,c->argv[1]);
2301 }
2302
2303 static void mgetCommand(redisClient *c) {
2304 int j;
2305
2306 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2307 for (j = 1; j < c->argc; j++) {
2308 robj *o = lookupKeyRead(c->db,c->argv[j]);
2309 if (o == NULL) {
2310 addReply(c,shared.nullbulk);
2311 } else {
2312 if (o->type != REDIS_STRING) {
2313 addReply(c,shared.nullbulk);
2314 } else {
2315 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2316 addReply(c,o);
2317 addReply(c,shared.crlf);
2318 }
2319 }
2320 }
2321 }
2322
2323 static void incrDecrCommand(redisClient *c, long long incr) {
2324 long long value;
2325 int retval;
2326 robj *o;
2327
2328 o = lookupKeyWrite(c->db,c->argv[1]);
2329 if (o == NULL) {
2330 value = 0;
2331 } else {
2332 if (o->type != REDIS_STRING) {
2333 value = 0;
2334 } else {
2335 char *eptr;
2336
2337 value = strtoll(o->ptr, &eptr, 10);
2338 }
2339 }
2340
2341 value += incr;
2342 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2343 retval = dictAdd(c->db->dict,c->argv[1],o);
2344 if (retval == DICT_ERR) {
2345 dictReplace(c->db->dict,c->argv[1],o);
2346 removeExpire(c->db,c->argv[1]);
2347 } else {
2348 incrRefCount(c->argv[1]);
2349 }
2350 server.dirty++;
2351 addReply(c,shared.colon);
2352 addReply(c,o);
2353 addReply(c,shared.crlf);
2354 }
2355
2356 static void incrCommand(redisClient *c) {
2357 incrDecrCommand(c,1);
2358 }
2359
2360 static void decrCommand(redisClient *c) {
2361 incrDecrCommand(c,-1);
2362 }
2363
2364 static void incrbyCommand(redisClient *c) {
2365 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2366 incrDecrCommand(c,incr);
2367 }
2368
2369 static void decrbyCommand(redisClient *c) {
2370 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2371 incrDecrCommand(c,-incr);
2372 }
2373
2374 /* ========================= Type agnostic commands ========================= */
2375
2376 static void delCommand(redisClient *c) {
2377 int deleted = 0, j;
2378
2379 for (j = 1; j < c->argc; j++) {
2380 if (deleteKey(c->db,c->argv[j])) {
2381 server.dirty++;
2382 deleted++;
2383 }
2384 }
2385 switch(deleted) {
2386 case 0:
2387 addReply(c,shared.czero);
2388 break;
2389 case 1:
2390 addReply(c,shared.cone);
2391 break;
2392 default:
2393 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2394 break;
2395 }
2396 }
2397
2398 static void existsCommand(redisClient *c) {
2399 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2400 }
2401
2402 static void selectCommand(redisClient *c) {
2403 int id = atoi(c->argv[1]->ptr);
2404
2405 if (selectDb(c,id) == REDIS_ERR) {
2406 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2407 } else {
2408 addReply(c,shared.ok);
2409 }
2410 }
2411
2412 static void randomkeyCommand(redisClient *c) {
2413 dictEntry *de;
2414
2415 while(1) {
2416 de = dictGetRandomKey(c->db->dict);
2417 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2418 }
2419 if (de == NULL) {
2420 addReply(c,shared.plus);
2421 addReply(c,shared.crlf);
2422 } else {
2423 addReply(c,shared.plus);
2424 addReply(c,dictGetEntryKey(de));
2425 addReply(c,shared.crlf);
2426 }
2427 }
2428
2429 static void keysCommand(redisClient *c) {
2430 dictIterator *di;
2431 dictEntry *de;
2432 sds pattern = c->argv[1]->ptr;
2433 int plen = sdslen(pattern);
2434 int numkeys = 0, keyslen = 0;
2435 robj *lenobj = createObject(REDIS_STRING,NULL);
2436
2437 di = dictGetIterator(c->db->dict);
2438 if (!di) oom("dictGetIterator");
2439 addReply(c,lenobj);
2440 decrRefCount(lenobj);
2441 while((de = dictNext(di)) != NULL) {
2442 robj *keyobj = dictGetEntryKey(de);
2443
2444 sds key = keyobj->ptr;
2445 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2446 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2447 if (expireIfNeeded(c->db,keyobj) == 0) {
2448 if (numkeys != 0)
2449 addReply(c,shared.space);
2450 addReply(c,keyobj);
2451 numkeys++;
2452 keyslen += sdslen(key);
2453 }
2454 }
2455 }
2456 dictReleaseIterator(di);
2457 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2458 addReply(c,shared.crlf);
2459 }
2460
2461 static void dbsizeCommand(redisClient *c) {
2462 addReplySds(c,
2463 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2464 }
2465
2466 static void lastsaveCommand(redisClient *c) {
2467 addReplySds(c,
2468 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2469 }
2470
2471 static void typeCommand(redisClient *c) {
2472 robj *o;
2473 char *type;
2474
2475 o = lookupKeyRead(c->db,c->argv[1]);
2476 if (o == NULL) {
2477 type = "+none";
2478 } else {
2479 switch(o->type) {
2480 case REDIS_STRING: type = "+string"; break;
2481 case REDIS_LIST: type = "+list"; break;
2482 case REDIS_SET: type = "+set"; break;
2483 default: type = "unknown"; break;
2484 }
2485 }
2486 addReplySds(c,sdsnew(type));
2487 addReply(c,shared.crlf);
2488 }
2489
2490 static void saveCommand(redisClient *c) {
2491 if (server.bgsaveinprogress) {
2492 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2493 return;
2494 }
2495 if (rdbSave(server.dbfilename) == REDIS_OK) {
2496 addReply(c,shared.ok);
2497 } else {
2498 addReply(c,shared.err);
2499 }
2500 }
2501
2502 static void bgsaveCommand(redisClient *c) {
2503 if (server.bgsaveinprogress) {
2504 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2505 return;
2506 }
2507 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2508 addReply(c,shared.ok);
2509 } else {
2510 addReply(c,shared.err);
2511 }
2512 }
2513
2514 static void shutdownCommand(redisClient *c) {
2515 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2516 if (server.bgsaveinprogress) {
2517 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2518 kill(server.bgsavechildpid,SIGKILL);
2519 }
2520 if (rdbSave(server.dbfilename) == REDIS_OK) {
2521 if (server.daemonize)
2522 unlink(server.pidfile);
2523 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2524 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2525 exit(1);
2526 } else {
2527 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2528 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2529 }
2530 }
2531
2532 static void renameGenericCommand(redisClient *c, int nx) {
2533 robj *o;
2534
2535 /* To use the same key as src and dst is probably an error */
2536 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2537 addReply(c,shared.sameobjecterr);
2538 return;
2539 }
2540
2541 o = lookupKeyWrite(c->db,c->argv[1]);
2542 if (o == NULL) {
2543 addReply(c,shared.nokeyerr);
2544 return;
2545 }
2546 incrRefCount(o);
2547 deleteIfVolatile(c->db,c->argv[2]);
2548 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2549 if (nx) {
2550 decrRefCount(o);
2551 addReply(c,shared.czero);
2552 return;
2553 }
2554 dictReplace(c->db->dict,c->argv[2],o);
2555 } else {
2556 incrRefCount(c->argv[2]);
2557 }
2558 deleteKey(c->db,c->argv[1]);
2559 server.dirty++;
2560 addReply(c,nx ? shared.cone : shared.ok);
2561 }
2562
2563 static void renameCommand(redisClient *c) {
2564 renameGenericCommand(c,0);
2565 }
2566
2567 static void renamenxCommand(redisClient *c) {
2568 renameGenericCommand(c,1);
2569 }
2570
2571 static void moveCommand(redisClient *c) {
2572 robj *o;
2573 redisDb *src, *dst;
2574 int srcid;
2575
2576 /* Obtain source and target DB pointers */
2577 src = c->db;
2578 srcid = c->db->id;
2579 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2580 addReply(c,shared.outofrangeerr);
2581 return;
2582 }
2583 dst = c->db;
2584 selectDb(c,srcid); /* Back to the source DB */
2585
2586 /* If the user is moving using as target the same
2587 * DB as the source DB it is probably an error. */
2588 if (src == dst) {
2589 addReply(c,shared.sameobjecterr);
2590 return;
2591 }
2592
2593 /* Check if the element exists and get a reference */
2594 o = lookupKeyWrite(c->db,c->argv[1]);
2595 if (!o) {
2596 addReply(c,shared.czero);
2597 return;
2598 }
2599
2600 /* Try to add the element to the target DB */
2601 deleteIfVolatile(dst,c->argv[1]);
2602 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2603 addReply(c,shared.czero);
2604 return;
2605 }
2606 incrRefCount(c->argv[1]);
2607 incrRefCount(o);
2608
2609 /* OK! key moved, free the entry in the source DB */
2610 deleteKey(src,c->argv[1]);
2611 server.dirty++;
2612 addReply(c,shared.cone);
2613 }
2614
2615 /* =================================== Lists ================================ */
2616 static void pushGenericCommand(redisClient *c, int where) {
2617 robj *lobj;
2618 list *list;
2619
2620 lobj = lookupKeyWrite(c->db,c->argv[1]);
2621 if (lobj == NULL) {
2622 lobj = createListObject();
2623 list = lobj->ptr;
2624 if (where == REDIS_HEAD) {
2625 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2626 } else {
2627 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2628 }
2629 dictAdd(c->db->dict,c->argv[1],lobj);
2630 incrRefCount(c->argv[1]);
2631 incrRefCount(c->argv[2]);
2632 } else {
2633 if (lobj->type != REDIS_LIST) {
2634 addReply(c,shared.wrongtypeerr);
2635 return;
2636 }
2637 list = lobj->ptr;
2638 if (where == REDIS_HEAD) {
2639 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2640 } else {
2641 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2642 }
2643 incrRefCount(c->argv[2]);
2644 }
2645 server.dirty++;
2646 addReply(c,shared.ok);
2647 }
2648
2649 static void lpushCommand(redisClient *c) {
2650 pushGenericCommand(c,REDIS_HEAD);
2651 }
2652
2653 static void rpushCommand(redisClient *c) {
2654 pushGenericCommand(c,REDIS_TAIL);
2655 }
2656
2657 static void llenCommand(redisClient *c) {
2658 robj *o;
2659 list *l;
2660
2661 o = lookupKeyRead(c->db,c->argv[1]);
2662 if (o == NULL) {
2663 addReply(c,shared.czero);
2664 return;
2665 } else {
2666 if (o->type != REDIS_LIST) {
2667 addReply(c,shared.wrongtypeerr);
2668 } else {
2669 l = o->ptr;
2670 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2671 }
2672 }
2673 }
2674
2675 static void lindexCommand(redisClient *c) {
2676 robj *o;
2677 int index = atoi(c->argv[2]->ptr);
2678
2679 o = lookupKeyRead(c->db,c->argv[1]);
2680 if (o == NULL) {
2681 addReply(c,shared.nullbulk);
2682 } else {
2683 if (o->type != REDIS_LIST) {
2684 addReply(c,shared.wrongtypeerr);
2685 } else {
2686 list *list = o->ptr;
2687 listNode *ln;
2688
2689 ln = listIndex(list, index);
2690 if (ln == NULL) {
2691 addReply(c,shared.nullbulk);
2692 } else {
2693 robj *ele = listNodeValue(ln);
2694 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2695 addReply(c,ele);
2696 addReply(c,shared.crlf);
2697 }
2698 }
2699 }
2700 }
2701
2702 static void lsetCommand(redisClient *c) {
2703 robj *o;
2704 int index = atoi(c->argv[2]->ptr);
2705
2706 o = lookupKeyWrite(c->db,c->argv[1]);
2707 if (o == NULL) {
2708 addReply(c,shared.nokeyerr);
2709 } else {
2710 if (o->type != REDIS_LIST) {
2711 addReply(c,shared.wrongtypeerr);
2712 } else {
2713 list *list = o->ptr;
2714 listNode *ln;
2715
2716 ln = listIndex(list, index);
2717 if (ln == NULL) {
2718 addReply(c,shared.outofrangeerr);
2719 } else {
2720 robj *ele = listNodeValue(ln);
2721
2722 decrRefCount(ele);
2723 listNodeValue(ln) = c->argv[3];
2724 incrRefCount(c->argv[3]);
2725 addReply(c,shared.ok);
2726 server.dirty++;
2727 }
2728 }
2729 }
2730 }
2731
2732 static void popGenericCommand(redisClient *c, int where) {
2733 robj *o;
2734
2735 o = lookupKeyWrite(c->db,c->argv[1]);
2736 if (o == NULL) {
2737 addReply(c,shared.nullbulk);
2738 } else {
2739 if (o->type != REDIS_LIST) {
2740 addReply(c,shared.wrongtypeerr);
2741 } else {
2742 list *list = o->ptr;
2743 listNode *ln;
2744
2745 if (where == REDIS_HEAD)
2746 ln = listFirst(list);
2747 else
2748 ln = listLast(list);
2749
2750 if (ln == NULL) {
2751 addReply(c,shared.nullbulk);
2752 } else {
2753 robj *ele = listNodeValue(ln);
2754 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2755 addReply(c,ele);
2756 addReply(c,shared.crlf);
2757 listDelNode(list,ln);
2758 server.dirty++;
2759 }
2760 }
2761 }
2762 }
2763
2764 static void lpopCommand(redisClient *c) {
2765 popGenericCommand(c,REDIS_HEAD);
2766 }
2767
2768 static void rpopCommand(redisClient *c) {
2769 popGenericCommand(c,REDIS_TAIL);
2770 }
2771
2772 static void lrangeCommand(redisClient *c) {
2773 robj *o;
2774 int start = atoi(c->argv[2]->ptr);
2775 int end = atoi(c->argv[3]->ptr);
2776
2777 o = lookupKeyRead(c->db,c->argv[1]);
2778 if (o == NULL) {
2779 addReply(c,shared.nullmultibulk);
2780 } else {
2781 if (o->type != REDIS_LIST) {
2782 addReply(c,shared.wrongtypeerr);
2783 } else {
2784 list *list = o->ptr;
2785 listNode *ln;
2786 int llen = listLength(list);
2787 int rangelen, j;
2788 robj *ele;
2789
2790 /* convert negative indexes */
2791 if (start < 0) start = llen+start;
2792 if (end < 0) end = llen+end;
2793 if (start < 0) start = 0;
2794 if (end < 0) end = 0;
2795
2796 /* indexes sanity checks */
2797 if (start > end || start >= llen) {
2798 /* Out of range start or start > end result in empty list */
2799 addReply(c,shared.emptymultibulk);
2800 return;
2801 }
2802 if (end >= llen) end = llen-1;
2803 rangelen = (end-start)+1;
2804
2805 /* Return the result in form of a multi-bulk reply */
2806 ln = listIndex(list, start);
2807 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2808 for (j = 0; j < rangelen; j++) {
2809 ele = listNodeValue(ln);
2810 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2811 addReply(c,ele);
2812 addReply(c,shared.crlf);
2813 ln = ln->next;
2814 }
2815 }
2816 }
2817 }
2818
2819 static void ltrimCommand(redisClient *c) {
2820 robj *o;
2821 int start = atoi(c->argv[2]->ptr);
2822 int end = atoi(c->argv[3]->ptr);
2823
2824 o = lookupKeyWrite(c->db,c->argv[1]);
2825 if (o == NULL) {
2826 addReply(c,shared.nokeyerr);
2827 } else {
2828 if (o->type != REDIS_LIST) {
2829 addReply(c,shared.wrongtypeerr);
2830 } else {
2831 list *list = o->ptr;
2832 listNode *ln;
2833 int llen = listLength(list);
2834 int j, ltrim, rtrim;
2835
2836 /* convert negative indexes */
2837 if (start < 0) start = llen+start;
2838 if (end < 0) end = llen+end;
2839 if (start < 0) start = 0;
2840 if (end < 0) end = 0;
2841
2842 /* indexes sanity checks */
2843 if (start > end || start >= llen) {
2844 /* Out of range start or start > end result in empty list */
2845 ltrim = llen;
2846 rtrim = 0;
2847 } else {
2848 if (end >= llen) end = llen-1;
2849 ltrim = start;
2850 rtrim = llen-end-1;
2851 }
2852
2853 /* Remove list elements to perform the trim */
2854 for (j = 0; j < ltrim; j++) {
2855 ln = listFirst(list);
2856 listDelNode(list,ln);
2857 }
2858 for (j = 0; j < rtrim; j++) {
2859 ln = listLast(list);
2860 listDelNode(list,ln);
2861 }
2862 addReply(c,shared.ok);
2863 server.dirty++;
2864 }
2865 }
2866 }
2867
2868 static void lremCommand(redisClient *c) {
2869 robj *o;
2870
2871 o = lookupKeyWrite(c->db,c->argv[1]);
2872 if (o == NULL) {
2873 addReply(c,shared.czero);
2874 } else {
2875 if (o->type != REDIS_LIST) {
2876 addReply(c,shared.wrongtypeerr);
2877 } else {
2878 list *list = o->ptr;
2879 listNode *ln, *next;
2880 int toremove = atoi(c->argv[2]->ptr);
2881 int removed = 0;
2882 int fromtail = 0;
2883
2884 if (toremove < 0) {
2885 toremove = -toremove;
2886 fromtail = 1;
2887 }
2888 ln = fromtail ? list->tail : list->head;
2889 while (ln) {
2890 robj *ele = listNodeValue(ln);
2891
2892 next = fromtail ? ln->prev : ln->next;
2893 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2894 listDelNode(list,ln);
2895 server.dirty++;
2896 removed++;
2897 if (toremove && removed == toremove) break;
2898 }
2899 ln = next;
2900 }
2901 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2902 }
2903 }
2904 }
2905
2906 /* ==================================== Sets ================================ */
2907
2908 static void saddCommand(redisClient *c) {
2909 robj *set;
2910
2911 set = lookupKeyWrite(c->db,c->argv[1]);
2912 if (set == NULL) {
2913 set = createSetObject();
2914 dictAdd(c->db->dict,c->argv[1],set);
2915 incrRefCount(c->argv[1]);
2916 } else {
2917 if (set->type != REDIS_SET) {
2918 addReply(c,shared.wrongtypeerr);
2919 return;
2920 }
2921 }
2922 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2923 incrRefCount(c->argv[2]);
2924 server.dirty++;
2925 addReply(c,shared.cone);
2926 } else {
2927 addReply(c,shared.czero);
2928 }
2929 }
2930
2931 static void sremCommand(redisClient *c) {
2932 robj *set;
2933
2934 set = lookupKeyWrite(c->db,c->argv[1]);
2935 if (set == NULL) {
2936 addReply(c,shared.czero);
2937 } else {
2938 if (set->type != REDIS_SET) {
2939 addReply(c,shared.wrongtypeerr);
2940 return;
2941 }
2942 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2943 server.dirty++;
2944 addReply(c,shared.cone);
2945 } else {
2946 addReply(c,shared.czero);
2947 }
2948 }
2949 }
2950
2951 static void smoveCommand(redisClient *c) {
2952 robj *srcset, *dstset;
2953
2954 srcset = lookupKeyWrite(c->db,c->argv[1]);
2955 dstset = lookupKeyWrite(c->db,c->argv[2]);
2956
2957 /* If the source key does not exist return 0, if it's of the wrong type
2958 * raise an error */
2959 if (srcset == NULL || srcset->type != REDIS_SET) {
2960 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2961 return;
2962 }
2963 /* Error if the destination key is not a set as well */
2964 if (dstset && dstset->type != REDIS_SET) {
2965 addReply(c,shared.wrongtypeerr);
2966 return;
2967 }
2968 /* Remove the element from the source set */
2969 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2970 /* Key not found in the src set! return zero */
2971 addReply(c,shared.czero);
2972 return;
2973 }
2974 server.dirty++;
2975 /* Add the element to the destination set */
2976 if (!dstset) {
2977 dstset = createSetObject();
2978 dictAdd(c->db->dict,c->argv[2],dstset);
2979 incrRefCount(c->argv[2]);
2980 }
2981 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
2982 incrRefCount(c->argv[3]);
2983 addReply(c,shared.cone);
2984 }
2985
2986 static void sismemberCommand(redisClient *c) {
2987 robj *set;
2988
2989 set = lookupKeyRead(c->db,c->argv[1]);
2990 if (set == NULL) {
2991 addReply(c,shared.czero);
2992 } else {
2993 if (set->type != REDIS_SET) {
2994 addReply(c,shared.wrongtypeerr);
2995 return;
2996 }
2997 if (dictFind(set->ptr,c->argv[2]))
2998 addReply(c,shared.cone);
2999 else
3000 addReply(c,shared.czero);
3001 }
3002 }
3003
3004 static void scardCommand(redisClient *c) {
3005 robj *o;
3006 dict *s;
3007
3008 o = lookupKeyRead(c->db,c->argv[1]);
3009 if (o == NULL) {
3010 addReply(c,shared.czero);
3011 return;
3012 } else {
3013 if (o->type != REDIS_SET) {
3014 addReply(c,shared.wrongtypeerr);
3015 } else {
3016 s = o->ptr;
3017 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3018 dictSize(s)));
3019 }
3020 }
3021 }
3022
3023 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3024 dict **d1 = (void*) s1, **d2 = (void*) s2;
3025
3026 return dictSize(*d1)-dictSize(*d2);
3027 }
3028
3029 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3030 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3031 dictIterator *di;
3032 dictEntry *de;
3033 robj *lenobj = NULL, *dstset = NULL;
3034 int j, cardinality = 0;
3035
3036 if (!dv) oom("sinterGenericCommand");
3037 for (j = 0; j < setsnum; j++) {
3038 robj *setobj;
3039
3040 setobj = dstkey ?
3041 lookupKeyWrite(c->db,setskeys[j]) :
3042 lookupKeyRead(c->db,setskeys[j]);
3043 if (!setobj) {
3044 zfree(dv);
3045 if (dstkey) {
3046 deleteKey(c->db,dstkey);
3047 addReply(c,shared.ok);
3048 } else {
3049 addReply(c,shared.nullmultibulk);
3050 }
3051 return;
3052 }
3053 if (setobj->type != REDIS_SET) {
3054 zfree(dv);
3055 addReply(c,shared.wrongtypeerr);
3056 return;
3057 }
3058 dv[j] = setobj->ptr;
3059 }
3060 /* Sort sets from the smallest to largest, this will improve our
3061 * algorithm's performace */
3062 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3063
3064 /* The first thing we should output is the total number of elements...
3065 * since this is a multi-bulk write, but at this stage we don't know
3066 * the intersection set size, so we use a trick, append an empty object
3067 * to the output list and save the pointer to later modify it with the
3068 * right length */
3069 if (!dstkey) {
3070 lenobj = createObject(REDIS_STRING,NULL);
3071 addReply(c,lenobj);
3072 decrRefCount(lenobj);
3073 } else {
3074 /* If we have a target key where to store the resulting set
3075 * create this key with an empty set inside */
3076 dstset = createSetObject();
3077 }
3078
3079 /* Iterate all the elements of the first (smallest) set, and test
3080 * the element against all the other sets, if at least one set does
3081 * not include the element it is discarded */
3082 di = dictGetIterator(dv[0]);
3083 if (!di) oom("dictGetIterator");
3084
3085 while((de = dictNext(di)) != NULL) {
3086 robj *ele;
3087
3088 for (j = 1; j < setsnum; j++)
3089 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3090 if (j != setsnum)
3091 continue; /* at least one set does not contain the member */
3092 ele = dictGetEntryKey(de);
3093 if (!dstkey) {
3094 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3095 addReply(c,ele);
3096 addReply(c,shared.crlf);
3097 cardinality++;
3098 } else {
3099 dictAdd(dstset->ptr,ele,NULL);
3100 incrRefCount(ele);
3101 }
3102 }
3103 dictReleaseIterator(di);
3104
3105 if (dstkey) {
3106 /* Store the resulting set into the target */
3107 deleteKey(c->db,dstkey);
3108 dictAdd(c->db->dict,dstkey,dstset);
3109 incrRefCount(dstkey);
3110 }
3111
3112 if (!dstkey) {
3113 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3114 } else {
3115 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3116 dictSize((dict*)dstset->ptr)));
3117 server.dirty++;
3118 }
3119 zfree(dv);
3120 }
3121
3122 static void sinterCommand(redisClient *c) {
3123 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3124 }
3125
3126 static void sinterstoreCommand(redisClient *c) {
3127 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3128 }
3129
3130 #define REDIS_OP_UNION 0
3131 #define REDIS_OP_DIFF 1
3132
3133 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3134 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3135 dictIterator *di;
3136 dictEntry *de;
3137 robj *dstset = NULL;
3138 int j, cardinality = 0;
3139
3140 if (!dv) oom("sunionDiffGenericCommand");
3141 for (j = 0; j < setsnum; j++) {
3142 robj *setobj;
3143
3144 setobj = dstkey ?
3145 lookupKeyWrite(c->db,setskeys[j]) :
3146 lookupKeyRead(c->db,setskeys[j]);
3147 if (!setobj) {
3148 dv[j] = NULL;
3149 continue;
3150 }
3151 if (setobj->type != REDIS_SET) {
3152 zfree(dv);
3153 addReply(c,shared.wrongtypeerr);
3154 return;
3155 }
3156 dv[j] = setobj->ptr;
3157 }
3158
3159 /* We need a temp set object to store our union. If the dstkey
3160 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3161 * this set object will be the resulting object to set into the target key*/
3162 dstset = createSetObject();
3163
3164 /* Iterate all the elements of all the sets, add every element a single
3165 * time to the result set */
3166 for (j = 0; j < setsnum; j++) {
3167 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3168 if (!dv[j]) continue; /* non existing keys are like empty sets */
3169
3170 di = dictGetIterator(dv[j]);
3171 if (!di) oom("dictGetIterator");
3172
3173 while((de = dictNext(di)) != NULL) {
3174 robj *ele;
3175
3176 /* dictAdd will not add the same element multiple times */
3177 ele = dictGetEntryKey(de);
3178 if (op == REDIS_OP_UNION || j == 0) {
3179 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3180 incrRefCount(ele);
3181 cardinality++;
3182 }
3183 } else if (op == REDIS_OP_DIFF) {
3184 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3185 cardinality--;
3186 }
3187 }
3188 }
3189 dictReleaseIterator(di);
3190
3191 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3192 }
3193
3194 /* Output the content of the resulting set, if not in STORE mode */
3195 if (!dstkey) {
3196 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3197 di = dictGetIterator(dstset->ptr);
3198 if (!di) oom("dictGetIterator");
3199 while((de = dictNext(di)) != NULL) {
3200 robj *ele;
3201
3202 ele = dictGetEntryKey(de);
3203 addReplySds(c,sdscatprintf(sdsempty(),
3204 "$%d\r\n",sdslen(ele->ptr)));
3205 addReply(c,ele);
3206 addReply(c,shared.crlf);
3207 }
3208 dictReleaseIterator(di);
3209 } else {
3210 /* If we have a target key where to store the resulting set
3211 * create this key with the result set inside */
3212 deleteKey(c->db,dstkey);
3213 dictAdd(c->db->dict,dstkey,dstset);
3214 incrRefCount(dstkey);
3215 }
3216
3217 /* Cleanup */
3218 if (!dstkey) {
3219 decrRefCount(dstset);
3220 } else {
3221 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3222 dictSize((dict*)dstset->ptr)));
3223 server.dirty++;
3224 }
3225 zfree(dv);
3226 }
3227
3228 static void sunionCommand(redisClient *c) {
3229 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3230 }
3231
3232 static void sunionstoreCommand(redisClient *c) {
3233 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3234 }
3235
3236 static void sdiffCommand(redisClient *c) {
3237 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3238 }
3239
3240 static void sdiffstoreCommand(redisClient *c) {
3241 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3242 }
3243
3244 static void flushdbCommand(redisClient *c) {
3245 server.dirty += dictSize(c->db->dict);
3246 dictEmpty(c->db->dict);
3247 dictEmpty(c->db->expires);
3248 addReply(c,shared.ok);
3249 }
3250
/* FLUSHALL: empty the dataset through emptyDb(), reply OK and then call
 * rdbSave() on the configured dump file. */
static void flushallCommand(redisClient *c) {
    /* Whatever emptyDb() returns is added to the dirty counter
     * (presumably the number of removed keys -- TODO confirm). */
    server.dirty += emptyDb();
    addReply(c,shared.ok);
    /* NOTE(review): the return value of rdbSave() is ignored, so a failed
     * save is not reported to the client. */
    rdbSave(server.dbfilename);
    /* NOTE(review): rdbSave() presumably resets server.dirty; the increment
     * below looks intended to leave the counter changed after the save so
     * the command is still seen as a write -- confirm against the caller. */
    server.dirty++;
}
3257
3258 redisSortOperation *createSortOperation(int type, robj *pattern) {
3259 redisSortOperation *so = zmalloc(sizeof(*so));
3260 if (!so) oom("createSortOperation");
3261 so->type = type;
3262 so->pattern = pattern;
3263 return so;
3264 }
3265
3266 /* Return the value associated to the key with a name obtained
3267 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3268 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3269 char *p;
3270 sds spat, ssub;
3271 robj keyobj;
3272 int prefixlen, sublen, postfixlen;
3273 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3274 struct {
3275 long len;
3276 long free;
3277 char buf[REDIS_SORTKEY_MAX+1];
3278 } keyname;
3279
3280 spat = pattern->ptr;
3281 ssub = subst->ptr;
3282 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3283 p = strchr(spat,'*');
3284 if (!p) return NULL;
3285
3286 prefixlen = p-spat;
3287 sublen = sdslen(ssub);
3288 postfixlen = sdslen(spat)-(prefixlen+1);
3289 memcpy(keyname.buf,spat,prefixlen);
3290 memcpy(keyname.buf+prefixlen,ssub,sublen);
3291 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3292 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3293 keyname.len = prefixlen+sublen+postfixlen;
3294
3295 keyobj.refcount = 1;
3296 keyobj.type = REDIS_STRING;
3297 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3298
3299 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3300 return lookupKeyRead(db,&keyobj);
3301 }
3302
3303 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3304 * the additional parameter is not standard but a BSD-specific we have to
3305 * pass sorting parameters via the global 'server' structure */
3306 static int sortCompare(const void *s1, const void *s2) {
3307 const redisSortObject *so1 = s1, *so2 = s2;
3308 int cmp;
3309
3310 if (!server.sort_alpha) {
3311 /* Numeric sorting. Here it's trivial as we precomputed scores */
3312 if (so1->u.score > so2->u.score) {
3313 cmp = 1;
3314 } else if (so1->u.score < so2->u.score) {
3315 cmp = -1;
3316 } else {
3317 cmp = 0;
3318 }
3319 } else {
3320 /* Alphanumeric sorting */
3321 if (server.sort_bypattern) {
3322 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3323 /* At least one compare object is NULL */
3324 if (so1->u.cmpobj == so2->u.cmpobj)
3325 cmp = 0;
3326 else if (so1->u.cmpobj == NULL)
3327 cmp = -1;
3328 else
3329 cmp = 1;
3330 } else {
3331 /* We have both the objects, use strcoll */
3332 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3333 }
3334 } else {
3335 /* Compare elements directly */
3336 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3337 }
3338 }
3339 return server.sort_desc ? -cmp : cmp;
3340 }
3341
3342 /* The SORT command is the most complex command in Redis. Warning: this code
3343 * is optimized for speed and a bit less for readability */
3344 static void sortCommand(redisClient *c) {
3345 list *operations;
3346 int outputlen = 0;
3347 int desc = 0, alpha = 0;
3348 int limit_start = 0, limit_count = -1, start, end;
3349 int j, dontsort = 0, vectorlen;
3350 int getop = 0; /* GET operation counter */
3351 robj *sortval, *sortby = NULL;
3352 redisSortObject *vector; /* Resulting vector to sort */
3353
3354 /* Lookup the key to sort. It must be of the right types */
3355 sortval = lookupKeyRead(c->db,c->argv[1]);
3356 if (sortval == NULL) {
3357 addReply(c,shared.nokeyerr);
3358 return;
3359 }
3360 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3361 addReply(c,shared.wrongtypeerr);
3362 return;
3363 }
3364
3365 /* Create a list of operations to perform for every sorted element.
3366 * Operations can be GET/DEL/INCR/DECR */
3367 operations = listCreate();
3368 listSetFreeMethod(operations,zfree);
3369 j = 2;
3370
3371 /* Now we need to protect sortval incrementing its count, in the future
3372 * SORT may have options able to overwrite/delete keys during the sorting
3373 * and the sorted key itself may get destroied */
3374 incrRefCount(sortval);
3375
3376 /* The SORT command has an SQL-alike syntax, parse it */
3377 while(j < c->argc) {
3378 int leftargs = c->argc-j-1;
3379 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3380 desc = 0;
3381 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3382 desc = 1;
3383 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3384 alpha = 1;
3385 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3386 limit_start = atoi(c->argv[j+1]->ptr);
3387 limit_count = atoi(c->argv[j+2]->ptr);
3388 j+=2;
3389 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3390 sortby = c->argv[j+1];
3391 /* If the BY pattern does not contain '*', i.e. it is constant,
3392 * we don't need to sort nor to lookup the weight keys. */
3393 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3394 j++;
3395 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3396 listAddNodeTail(operations,createSortOperation(
3397 REDIS_SORT_GET,c->argv[j+1]));
3398 getop++;
3399 j++;
3400 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3401 listAddNodeTail(operations,createSortOperation(
3402 REDIS_SORT_DEL,c->argv[j+1]));
3403 j++;
3404 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3405 listAddNodeTail(operations,createSortOperation(
3406 REDIS_SORT_INCR,c->argv[j+1]));
3407 j++;
3408 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3409 listAddNodeTail(operations,createSortOperation(
3410 REDIS_SORT_DECR,c->argv[j+1]));
3411 j++;
3412 } else {
3413 decrRefCount(sortval);
3414 listRelease(operations);
3415 addReply(c,shared.syntaxerr);
3416 return;
3417 }
3418 j++;
3419 }
3420
3421 /* Load the sorting vector with all the objects to sort */
3422 vectorlen = (sortval->type == REDIS_LIST) ?
3423 listLength((list*)sortval->ptr) :
3424 dictSize((dict*)sortval->ptr);
3425 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3426 if (!vector) oom("allocating objects vector for SORT");
3427 j = 0;
3428 if (sortval->type == REDIS_LIST) {
3429 list *list = sortval->ptr;
3430 listNode *ln;
3431
3432 listRewind(list);
3433 while((ln = listYield(list))) {
3434 robj *ele = ln->value;
3435 vector[j].obj = ele;
3436 vector[j].u.score = 0;
3437 vector[j].u.cmpobj = NULL;
3438 j++;
3439 }
3440 } else {
3441 dict *set = sortval->ptr;
3442 dictIterator *di;
3443 dictEntry *setele;
3444
3445 di = dictGetIterator(set);
3446 if (!di) oom("dictGetIterator");
3447 while((setele = dictNext(di)) != NULL) {
3448 vector[j].obj = dictGetEntryKey(setele);
3449 vector[j].u.score = 0;
3450 vector[j].u.cmpobj = NULL;
3451 j++;
3452 }
3453 dictReleaseIterator(di);
3454 }
3455 assert(j == vectorlen);
3456
3457 /* Now it's time to load the right scores in the sorting vector */
3458 if (dontsort == 0) {
3459 for (j = 0; j < vectorlen; j++) {
3460 if (sortby) {
3461 robj *byval;
3462
3463 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3464 if (!byval || byval->type != REDIS_STRING) continue;
3465 if (alpha) {
3466 vector[j].u.cmpobj = byval;
3467 incrRefCount(byval);
3468 } else {
3469 vector[j].u.score = strtod(byval->ptr,NULL);
3470 }
3471 } else {
3472 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3473 }
3474 }
3475 }
3476
3477 /* We are ready to sort the vector... perform a bit of sanity check
3478 * on the LIMIT option too. We'll use a partial version of quicksort. */
3479 start = (limit_start < 0) ? 0 : limit_start;
3480 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3481 if (start >= vectorlen) {
3482 start = vectorlen-1;
3483 end = vectorlen-2;
3484 }
3485 if (end >= vectorlen) end = vectorlen-1;
3486
3487 if (dontsort == 0) {
3488 server.sort_desc = desc;
3489 server.sort_alpha = alpha;
3490 server.sort_bypattern = sortby ? 1 : 0;
3491 if (sortby && (start != 0 || end != vectorlen-1))
3492 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3493 else
3494 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3495 }
3496
3497 /* Send command output to the output buffer, performing the specified
3498 * GET/DEL/INCR/DECR operations if any. */
3499 outputlen = getop ? getop*(end-start+1) : end-start+1;
3500 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3501 for (j = start; j <= end; j++) {
3502 listNode *ln;
3503 if (!getop) {
3504 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3505 sdslen(vector[j].obj->ptr)));
3506 addReply(c,vector[j].obj);
3507 addReply(c,shared.crlf);
3508 }
3509 listRewind(operations);
3510 while((ln = listYield(operations))) {
3511 redisSortOperation *sop = ln->value;
3512 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3513 vector[j].obj);
3514
3515 if (sop->type == REDIS_SORT_GET) {
3516 if (!val || val->type != REDIS_STRING) {
3517 addReply(c,shared.nullbulk);
3518 } else {
3519 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3520 sdslen(val->ptr)));
3521 addReply(c,val);
3522 addReply(c,shared.crlf);
3523 }
3524 } else if (sop->type == REDIS_SORT_DEL) {
3525 /* TODO */
3526 }
3527 }
3528 }
3529
3530 /* Cleanup */
3531 decrRefCount(sortval);
3532 listRelease(operations);
3533 for (j = 0; j < vectorlen; j++) {
3534 if (sortby && alpha && vector[j].u.cmpobj)
3535 decrRefCount(vector[j].u.cmpobj);
3536 }
3537 zfree(vector);
3538 }
3539
3540 static void infoCommand(redisClient *c) {
3541 sds info;
3542 time_t uptime = time(NULL)-server.stat_starttime;
3543
3544 info = sdscatprintf(sdsempty(),
3545 "redis_version:%s\r\n"
3546 "uptime_in_seconds:%d\r\n"
3547 "uptime_in_days:%d\r\n"
3548 "connected_clients:%d\r\n"
3549 "connected_slaves:%d\r\n"
3550 "used_memory:%zu\r\n"
3551 "changes_since_last_save:%lld\r\n"
3552 "bgsave_in_progress:%d\r\n"
3553 "last_save_time:%d\r\n"
3554 "total_connections_received:%lld\r\n"
3555 "total_commands_processed:%lld\r\n"
3556 "role:%s\r\n"
3557 ,REDIS_VERSION,
3558 uptime,
3559 uptime/(3600*24),
3560 listLength(server.clients)-listLength(server.slaves),
3561 listLength(server.slaves),
3562 server.usedmemory,
3563 server.dirty,
3564 server.bgsaveinprogress,
3565 server.lastsave,
3566 server.stat_numconnections,
3567 server.stat_numcommands,
3568 server.masterhost == NULL ? "master" : "slave"
3569 );
3570 if (server.masterhost) {
3571 info = sdscatprintf(info,
3572 "master_host:%s\r\n"
3573 "master_port:%d\r\n"
3574 "master_link_status:%s\r\n"
3575 "master_last_io_seconds_ago:%d\r\n"
3576 ,server.masterhost,
3577 server.masterport,
3578 (server.replstate == REDIS_REPL_CONNECTED) ?
3579 "up" : "down",
3580 (int)(time(NULL)-server.master->lastinteraction)
3581 );
3582 }
3583 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3584 addReplySds(c,info);
3585 addReply(c,shared.crlf);
3586 }
3587
3588 static void monitorCommand(redisClient *c) {
3589 /* ignore MONITOR if aleady slave or in monitor mode */
3590 if (c->flags & REDIS_SLAVE) return;
3591
3592 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3593 c->slaveseldb = 0;
3594 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3595 addReply(c,shared.ok);
3596 }
3597
3598 /* ================================= Expire ================================= */
3599 static int removeExpire(redisDb *db, robj *key) {
3600 if (dictDelete(db->expires,key) == DICT_OK) {
3601 return 1;
3602 } else {
3603 return 0;
3604 }
3605 }
3606
3607 static int setExpire(redisDb *db, robj *key, time_t when) {
3608 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3609 return 0;
3610 } else {
3611 incrRefCount(key);
3612 return 1;
3613 }
3614 }
3615
3616 /* Return the expire time of the specified key, or -1 if no expire
3617 * is associated with this key (i.e. the key is non volatile) */
3618 static time_t getExpire(redisDb *db, robj *key) {
3619 dictEntry *de;
3620
3621 /* No expire? return ASAP */
3622 if (dictSize(db->expires) == 0 ||
3623 (de = dictFind(db->expires,key)) == NULL) return -1;
3624
3625 return (time_t) dictGetEntryVal(de);
3626 }
3627
3628 static int expireIfNeeded(redisDb *db, robj *key) {
3629 time_t when;
3630 dictEntry *de;
3631
3632 /* No expire? return ASAP */
3633 if (dictSize(db->expires) == 0 ||
3634 (de = dictFind(db->expires,key)) == NULL) return 0;
3635
3636 /* Lookup the expire */
3637 when = (time_t) dictGetEntryVal(de);
3638 if (time(NULL) <= when) return 0;
3639
3640 /* Delete the key */
3641 dictDelete(db->expires,key);
3642 return dictDelete(db->dict,key) == DICT_OK;
3643 }
3644
3645 static int deleteIfVolatile(redisDb *db, robj *key) {
3646 dictEntry *de;
3647
3648 /* No expire? return ASAP */
3649 if (dictSize(db->expires) == 0 ||
3650 (de = dictFind(db->expires,key)) == NULL) return 0;
3651
3652 /* Delete the key */
3653 server.dirty++;
3654 dictDelete(db->expires,key);
3655 return dictDelete(db->dict,key) == DICT_OK;
3656 }
3657
3658 static void expireCommand(redisClient *c) {
3659 dictEntry *de;
3660 int seconds = atoi(c->argv[2]->ptr);
3661
3662 de = dictFind(c->db->dict,c->argv[1]);
3663 if (de == NULL) {
3664 addReply(c,shared.czero);
3665 return;
3666 }
3667 if (seconds <= 0) {
3668 addReply(c, shared.czero);
3669 return;
3670 } else {
3671 time_t when = time(NULL)+seconds;
3672 if (setExpire(c->db,c->argv[1],when))
3673 addReply(c,shared.cone);
3674 else
3675 addReply(c,shared.czero);
3676 return;
3677 }
3678 }
3679
3680 static void ttlCommand(redisClient *c) {
3681 time_t expire;
3682 int ttl = -1;
3683
3684 expire = getExpire(c->db,c->argv[1]);
3685 if (expire != -1) {
3686 ttl = (int) (expire-time(NULL));
3687 if (ttl < 0) ttl = -1;
3688 }
3689 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3690 }
3691
3692 /* =============================== Replication ============================= */
3693
3694 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3695 ssize_t nwritten, ret = size;
3696 time_t start = time(NULL);
3697
3698 timeout++;
3699 while(size) {
3700 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3701 nwritten = write(fd,ptr,size);
3702 if (nwritten == -1) return -1;
3703 ptr += nwritten;
3704 size -= nwritten;
3705 }
3706 if ((time(NULL)-start) > timeout) {
3707 errno = ETIMEDOUT;
3708 return -1;
3709 }
3710 }
3711 return ret;
3712 }
3713
3714 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3715 ssize_t nread, totread = 0;
3716 time_t start = time(NULL);
3717
3718 timeout++;
3719 while(size) {
3720 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3721 nread = read(fd,ptr,size);
3722 if (nread == -1) return -1;
3723 ptr += nread;
3724 size -= nread;
3725 totread += nread;
3726 }
3727 if ((time(NULL)-start) > timeout) {
3728 errno = ETIMEDOUT;
3729 return -1;
3730 }
3731 }
3732 return totread;
3733 }
3734
/* Read a line from 'fd' one byte at a time into the 'size' bytes buffer
 * 'ptr'. The terminating "\n" (and a "\r" right before it) is not stored;
 * the buffer is null terminated after every stored byte.
 *
 * At most 'size'-1 bytes are stored: reading stops when the buffer is full
 * even if no newline was seen. Returns the number of bytes read before the
 * newline, or -1 if syncRead() fails. 'timeout' is applied to every single
 * byte read. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* reserve room for the null terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One slot of the buffer has been consumed. Without this decrement
         * the loop condition never changes and a long line overflows the
         * caller's buffer. */
        size--;
    }
    return nread;
}
3755
3756 static void syncCommand(redisClient *c) {
3757 /* ignore SYNC if aleady slave or in monitor mode */
3758 if (c->flags & REDIS_SLAVE) return;
3759
3760 /* SYNC can't be issued when the server has pending data to send to
3761 * the client about already issued commands. We need a fresh reply
3762 * buffer registering the differences between the BGSAVE and the current
3763 * dataset, so that we can copy to other slaves if needed. */
3764 if (listLength(c->reply) != 0) {
3765 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3766 return;
3767 }
3768
3769 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3770 /* Here we need to check if there is a background saving operation
3771 * in progress, or if it is required to start one */
3772 if (server.bgsaveinprogress) {
3773 /* Ok a background save is in progress. Let's check if it is a good
3774 * one for replication, i.e. if there is another slave that is
3775 * registering differences since the server forked to save */
3776 redisClient *slave;
3777 listNode *ln;
3778
3779 listRewind(server.slaves);
3780 while((ln = listYield(server.slaves))) {
3781 slave = ln->value;
3782 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3783 }
3784 if (ln) {
3785 /* Perfect, the server is already registering differences for
3786 * another slave. Set the right state, and copy the buffer. */
3787 listRelease(c->reply);
3788 c->reply = listDup(slave->reply);
3789 if (!c->reply) oom("listDup copying slave reply list");
3790 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3791 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3792 } else {
3793 /* No way, we need to wait for the next BGSAVE in order to
3794 * register differences */
3795 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3796 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3797 }
3798 } else {
3799 /* Ok we don't have a BGSAVE in progress, let's start one */
3800 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3801 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3802 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3803 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3804 return;
3805 }
3806 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3807 }
3808 c->repldbfd = -1;
3809 c->flags |= REDIS_SLAVE;
3810 c->slaveseldb = 0;
3811 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3812 return;
3813 }
3814
3815 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3816 redisClient *slave = privdata;
3817 REDIS_NOTUSED(el);
3818 REDIS_NOTUSED(mask);
3819 char buf[REDIS_IOBUF_LEN];
3820 ssize_t nwritten, buflen;
3821
3822 if (slave->repldboff == 0) {
3823 /* Write the bulk write count before to transfer the DB. In theory here
3824 * we don't know how much room there is in the output buffer of the
3825 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3826 * operations) will never be smaller than the few bytes we need. */
3827 sds bulkcount;
3828
3829 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3830 slave->repldbsize);
3831 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3832 {
3833 sdsfree(bulkcount);
3834 freeClient(slave);
3835 return;
3836 }
3837 sdsfree(bulkcount);
3838 }
3839 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3840 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3841 if (buflen <= 0) {
3842 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3843 (buflen == 0) ? "premature EOF" : strerror(errno));
3844 freeClient(slave);
3845 return;
3846 }
3847 if ((nwritten = write(fd,buf,buflen)) == -1) {
3848 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3849 strerror(errno));
3850 freeClient(slave);
3851 return;
3852 }
3853 slave->repldboff += nwritten;
3854 if (slave->repldboff == slave->repldbsize) {
3855 close(slave->repldbfd);
3856 slave->repldbfd = -1;
3857 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3858 slave->replstate = REDIS_REPL_ONLINE;
3859 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3860 sendReplyToClient, slave, NULL) == AE_ERR) {
3861 freeClient(slave);
3862 return;
3863 }
3864 addReplySds(slave,sdsempty());
3865 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3866 }
3867 }
3868
3869 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3870 listNode *ln;
3871 int startbgsave = 0;
3872
3873 listRewind(server.slaves);
3874 while((ln = listYield(server.slaves))) {
3875 redisClient *slave = ln->value;
3876
3877 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3878 startbgsave = 1;
3879 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3880 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3881 struct redis_stat buf;
3882
3883 if (bgsaveerr != REDIS_OK) {
3884 freeClient(slave);
3885 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3886 continue;
3887 }
3888 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3889 redis_fstat(slave->repldbfd,&buf) == -1) {
3890 freeClient(slave);
3891 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3892 continue;
3893 }
3894 slave->repldboff = 0;
3895 slave->repldbsize = buf.st_size;
3896 slave->replstate = REDIS_REPL_SEND_BULK;
3897 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3898 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3899 freeClient(slave);
3900 continue;
3901 }
3902 }
3903 }
3904 if (startbgsave) {
3905 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3906 listRewind(server.slaves);
3907 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3908 while((ln = listYield(server.slaves))) {
3909 redisClient *slave = ln->value;
3910
3911 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3912 freeClient(slave);
3913 }
3914 }
3915 }
3916 }
3917
3918 static int syncWithMaster(void) {
3919 char buf[1024], tmpfile[256];
3920 int dumpsize;
3921 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3922 int dfd;
3923
3924 if (fd == -1) {
3925 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3926 strerror(errno));
3927 return REDIS_ERR;
3928 }
3929 /* Issue the SYNC command */
3930 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3931 close(fd);
3932 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3933 strerror(errno));
3934 return REDIS_ERR;
3935 }
3936 /* Read the bulk write count */
3937 if (syncReadLine(fd,buf,1024,3600) == -1) {
3938 close(fd);
3939 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3940 strerror(errno));
3941 return REDIS_ERR;
3942 }
3943 dumpsize = atoi(buf+1);
3944 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3945 /* Read the bulk write data on a temp file */
3946 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3947 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3948 if (dfd == -1) {
3949 close(fd);
3950 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3951 return REDIS_ERR;
3952 }
3953 while(dumpsize) {
3954 int nread, nwritten;
3955
3956 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3957 if (nread == -1) {
3958 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3959 strerror(errno));
3960 close(fd);
3961 close(dfd);
3962 return REDIS_ERR;
3963 }
3964 nwritten = write(dfd,buf,nread);
3965 if (nwritten == -1) {
3966 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3967 close(fd);
3968 close(dfd);
3969 return REDIS_ERR;
3970 }
3971 dumpsize -= nread;
3972 }
3973 close(dfd);
3974 if (rename(tmpfile,server.dbfilename) == -1) {
3975 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3976 unlink(tmpfile);
3977 close(fd);
3978 return REDIS_ERR;
3979 }
3980 emptyDb();
3981 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3982 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3983 close(fd);
3984 return REDIS_ERR;
3985 }
3986 server.master = createClient(fd);
3987 server.master->flags |= REDIS_MASTER;
3988 server.replstate = REDIS_REPL_CONNECTED;
3989 return REDIS_OK;
3990 }
3991
3992 static void slaveofCommand(redisClient *c) {
3993 if (!strcasecmp(c->argv[1]->ptr,"no") &&
3994 !strcasecmp(c->argv[2]->ptr,"one")) {
3995 if (server.masterhost) {
3996 sdsfree(server.masterhost);
3997 server.masterhost = NULL;
3998 if (server.master) freeClient(server.master);
3999 server.replstate = REDIS_REPL_NONE;
4000 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4001 }
4002 } else {
4003 sdsfree(server.masterhost);
4004 server.masterhost = sdsdup(c->argv[1]->ptr);
4005 server.masterport = atoi(c->argv[2]->ptr);
4006 if (server.master) freeClient(server.master);
4007 server.replstate = REDIS_REPL_CONNECT;
4008 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4009 server.masterhost, server.masterport);
4010 }
4011 addReply(c,shared.ok);
4012 }
4013
4014 /* ============================ Maxmemory directive ======================== */
4015
4016 /* This function gets called when 'maxmemory' is set on the config file to limit
4017 * the max memory used by the server, and we are out of memory.
4018 * This function will try to, in order:
4019 *
4020 * - Free objects from the free list
4021 * - Try to remove keys with an EXPIRE set
4022 *
4023 * It is not possible to free enough memory to reach used-memory < maxmemory
4024 * the server will start refusing commands that will enlarge even more the
4025 * memory usage.
4026 */
4027 static void freeMemoryIfNeeded(void) {
4028 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4029 if (listLength(server.objfreelist)) {
4030 robj *o;
4031
4032 listNode *head = listFirst(server.objfreelist);
4033 o = listNodeValue(head);
4034 listDelNode(server.objfreelist,head);
4035 zfree(o);
4036 } else {
4037 int j, k, freed = 0;
4038
4039 for (j = 0; j < server.dbnum; j++) {
4040 int minttl = -1;
4041 robj *minkey = NULL;
4042 struct dictEntry *de;
4043
4044 if (dictSize(server.db[j].expires)) {
4045 freed = 1;
4046 /* From a sample of three keys drop the one nearest to
4047 * the natural expire */
4048 for (k = 0; k < 3; k++) {
4049 time_t t;
4050
4051 de = dictGetRandomKey(server.db[j].expires);
4052 t = (time_t) dictGetEntryVal(de);
4053 if (minttl == -1 || t < minttl) {
4054 minkey = dictGetEntryKey(de);
4055 minttl = t;
4056 }
4057 }
4058 deleteKey(server.db+j,minkey);
4059 }
4060 }
4061 if (!freed) return; /* nothing to free... */
4062 }
4063 }
4064 }
4065
4066 /* ================================= Debugging ============================== */
4067
4068 static void debugCommand(redisClient *c) {
4069 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4070 *((char*)-1) = 'x';
4071 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4072 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4073 robj *key, *val;
4074
4075 if (!de) {
4076 addReply(c,shared.nokeyerr);
4077 return;
4078 }
4079 key = dictGetEntryKey(de);
4080 val = dictGetEntryVal(de);
4081 addReplySds(c,sdscatprintf(sdsempty(),
4082 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4083 key, key->refcount, val, val->refcount));
4084 } else {
4085 addReplySds(c,sdsnew(
4086 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4087 }
4088 }
4089
4090 /* =================================== Main! ================================ */
4091
4092 #ifdef __linux__
/* Read the value stored in /proc/sys/vm/overcommit_memory.
 *
 * Returns the first line of that file converted with atoi(), or -1 when
 * the file cannot be opened or no line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return value;
    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
4106
4107 void linuxOvercommitMemoryWarning(void) {
4108 if (linuxOvercommitMemoryValue() == 0) {
4109 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4110 }
4111 }
4112 #endif /* __linux__ */
4113
4114 static void daemonize(void) {
4115 int fd;
4116 FILE *fp;
4117
4118 if (fork() != 0) exit(0); /* parent exits */
4119 setsid(); /* create a new session */
4120
4121 /* Every output goes to /dev/null. If Redis is daemonized but
4122 * the 'logfile' is set to 'stdout' in the configuration file
4123 * it will not log at all. */
4124 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4125 dup2(fd, STDIN_FILENO);
4126 dup2(fd, STDOUT_FILENO);
4127 dup2(fd, STDERR_FILENO);
4128 if (fd > STDERR_FILENO) close(fd);
4129 }
4130 /* Try to write the pid file */
4131 fp = fopen(server.pidfile,"w");
4132 if (fp) {
4133 fprintf(fp,"%d\n",getpid());
4134 fclose(fp);
4135 }
4136 }
4137
4138 int main(int argc, char **argv) {
4139 #ifdef __linux__
4140 linuxOvercommitMemoryWarning();
4141 #endif
4142
4143 initServerConfig();
4144 if (argc == 2) {
4145 ResetServerSaveParams();
4146 loadServerConfig(argv[1]);
4147 } else if (argc > 2) {
4148 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4149 exit(1);
4150 } else {
4151 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4152 }
4153 initServer();
4154 if (server.daemonize) daemonize();
4155 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4156 if (rdbLoad(server.dbfilename) == REDIS_OK)
4157 redisLog(REDIS_NOTICE,"DB loaded from disk");
4158 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4159 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4160 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4161 aeMain(server.el);
4162 aeDeleteEventLoop(server.el);
4163 return 0;
4164 }