]> git.saurik.com Git - redis.git/blob - redis.c
TODO updated
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this Redis server release. */
#define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #include <signal.h>
40 #include <sys/wait.h>
41 #include <errno.h>
42 #include <assert.h>
43 #include <ctype.h>
44 #include <stdarg.h>
45 #include <inttypes.h>
46 #include <arpa/inet.h>
47 #include <sys/stat.h>
48 #include <fcntl.h>
49 #include <sys/time.h>
50 #include <sys/resource.h>
51 #include <limits.h>
52 #include <execinfo.h>
53
54 #include "ae.h" /* Event driven programming library */
55 #include "sds.h" /* Dynamic safe strings */
56 #include "anet.h" /* Networking the easy way */
57 #include "dict.h" /* Hash tables */
58 #include "adlist.h" /* Linked lists */
59 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
60 #include "lzf.h" /* LZF compression library */
61 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
62
63 #include "config.h"
64
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379               /* TCP port */
#define REDIS_MAXIDLETIME (60*5)            /* default client timeout, seconds */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000       /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60              /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100    /* volatile keys sampled per DB on
                                               each serverCron() run */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)

/* Hash table parameters */
#define REDIS_HT_MINFILL 10         /* Minimal hash table fill 10% */
#define REDIS_HT_MINSLOTS 16384     /* Never resize the HT under this */

/* Command flags */
#define REDIS_CMD_BULK 1            /* Bulk write command */
#define REDIS_CMD_INLINE 2          /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
 * this flag will return an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_HASH 3

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys would waste a lot of space, so the two most significant bits of the
 * first byte select how to interpret the length:
 *
 * 00|000000 => the len is the remaining 6 bits of this byte
 * 01|000000 00000000 => the len is 14 bits: 6 bits + 8 bits of the next byte
 * 10|000000 [32 bit integer] => a full 32 bit len follows
 * 11|000000 => a specially encoded object follows. The remaining six bits
 *              specify the kind of object, see the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte: most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set (REDIS_RDB_ENCVAL), the remaining six bits specify a special encoding
 * for the object according to the following defines: */
#define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3         /* string compressed with LZF */

/* Client flags */
#define REDIS_CLOSE 1       /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2       /* This client is a slave server */
#define REDIS_MASTER 4      /* This client is a master server */
#define REDIS_MONITOR 8     /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0       /* No active replication */
#define REDIS_REPL_CONNECT 1    /* Must connect to master */
#define REDIS_REPL_CONNECTED 2  /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3  /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4    /* master waits for bgsave to end before
                                           starting bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5          /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6             /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: marks V as intentionally unused. The argument is
 * parenthesized so that expressions, not just identifiers, work. */
#define REDIS_NOTUSED(V) ((void) (V))
173
174 /*================================= Data types ============================== */
175
176 /* A redis object, that is a type able to hold a string / list / set */
177 typedef struct redisObject {
178 void *ptr;
179 int type;
180 int refcount;
181 } robj;
182
183 typedef struct redisDb {
184 dict *dict;
185 dict *expires;
186 int id;
187 } redisDb;
188
189 /* With multiplexing we need to take per-clinet state.
190 * Clients are taken in a liked list. */
191 typedef struct redisClient {
192 int fd;
193 redisDb *db;
194 int dictid;
195 sds querybuf;
196 robj **argv;
197 int argc;
198 int bulklen; /* bulk read len. -1 if not in bulk read mode */
199 list *reply;
200 int sentlen;
201 time_t lastinteraction; /* time of the last interaction, used for timeout */
202 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
203 int slaveseldb; /* slave selected db, if this client is a slave */
204 int authenticated; /* when requirepass is non-NULL */
205 int replstate; /* replication state if this is a slave */
206 int repldbfd; /* replication DB file descriptor */
207 long repldboff; /* replication DB file offset */
208 off_t repldbsize; /* replication DB file size */
209 } redisClient;
210
211 struct saveparam {
212 time_t seconds;
213 int changes;
214 };
215
216 /* Global server state structure */
217 struct redisServer {
218 int port;
219 int fd;
220 redisDb *db;
221 dict *sharingpool;
222 unsigned int sharingpoolsize;
223 long long dirty; /* changes to DB from the last save */
224 list *clients;
225 list *slaves, *monitors;
226 char neterr[ANET_ERR_LEN];
227 aeEventLoop *el;
228 int cronloops; /* number of times the cron function run */
229 list *objfreelist; /* A list of freed objects to avoid malloc() */
230 time_t lastsave; /* Unix time of last save succeeede */
231 size_t usedmemory; /* Used memory in megabytes */
232 /* Fields used only for stats */
233 time_t stat_starttime; /* server start time */
234 long long stat_numcommands; /* number of processed commands */
235 long long stat_numconnections; /* number of connections received */
236 /* Configuration */
237 int verbosity;
238 int glueoutputbuf;
239 int maxidletime;
240 int dbnum;
241 int daemonize;
242 char *pidfile;
243 int bgsaveinprogress;
244 pid_t bgsavechildpid;
245 struct saveparam *saveparams;
246 int saveparamslen;
247 char *logfile;
248 char *bindaddr;
249 char *dbfilename;
250 char *requirepass;
251 int shareobjects;
252 /* Replication related */
253 int isslave;
254 char *masterhost;
255 int masterport;
256 redisClient *master; /* client that is master for this slave */
257 int replstate;
258 unsigned int maxclients;
259 unsigned int maxmemory;
260 /* Sort parameters - qsort_r() is only available under BSD so we
261 * have to take this state global, in order to pass it to sortCompare() */
262 int sort_desc;
263 int sort_alpha;
264 int sort_bypattern;
265 };
266
267 typedef void redisCommandProc(redisClient *c);
268 struct redisCommand {
269 char *name;
270 redisCommandProc *proc;
271 int arity;
272 int flags;
273 };
274
275 typedef struct _redisSortObject {
276 robj *obj;
277 union {
278 double score;
279 robj *cmpobj;
280 } u;
281 } redisSortObject;
282
283 typedef struct _redisSortOperation {
284 int type;
285 robj *pattern;
286 } redisSortOperation;
287
288 struct sharedObjectsStruct {
289 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
290 *colon, *nullbulk, *nullmultibulk,
291 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
292 *outofrangeerr, *plus,
293 *select0, *select1, *select2, *select3, *select4,
294 *select5, *select6, *select7, *select8, *select9;
295 } shared;
296
297 /*================================ Prototypes =============================== */
298
299 static void freeStringObject(robj *o);
300 static void freeListObject(robj *o);
301 static void freeSetObject(robj *o);
302 static void decrRefCount(void *o);
303 static robj *createObject(int type, void *ptr);
304 static void freeClient(redisClient *c);
305 static int rdbLoad(char *filename);
306 static void addReply(redisClient *c, robj *obj);
307 static void addReplySds(redisClient *c, sds s);
308 static void incrRefCount(robj *o);
309 static int rdbSaveBackground(char *filename);
310 static robj *createStringObject(char *ptr, size_t len);
311 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
312 static int syncWithMaster(void);
313 static robj *tryObjectSharing(robj *o);
314 static int removeExpire(redisDb *db, robj *key);
315 static int expireIfNeeded(redisDb *db, robj *key);
316 static int deleteIfVolatile(redisDb *db, robj *key);
317 static int deleteKey(redisDb *db, robj *key);
318 static time_t getExpire(redisDb *db, robj *key);
319 static int setExpire(redisDb *db, robj *key, time_t when);
320 static void updateSalvesWaitingBgsave(int bgsaveerr);
321 static void freeMemoryIfNeeded(void);
322
323 static void authCommand(redisClient *c);
324 static void pingCommand(redisClient *c);
325 static void echoCommand(redisClient *c);
326 static void setCommand(redisClient *c);
327 static void setnxCommand(redisClient *c);
328 static void getCommand(redisClient *c);
329 static void delCommand(redisClient *c);
330 static void existsCommand(redisClient *c);
331 static void incrCommand(redisClient *c);
332 static void decrCommand(redisClient *c);
333 static void incrbyCommand(redisClient *c);
334 static void decrbyCommand(redisClient *c);
335 static void selectCommand(redisClient *c);
336 static void randomkeyCommand(redisClient *c);
337 static void keysCommand(redisClient *c);
338 static void dbsizeCommand(redisClient *c);
339 static void lastsaveCommand(redisClient *c);
340 static void saveCommand(redisClient *c);
341 static void bgsaveCommand(redisClient *c);
342 static void shutdownCommand(redisClient *c);
343 static void moveCommand(redisClient *c);
344 static void renameCommand(redisClient *c);
345 static void renamenxCommand(redisClient *c);
346 static void lpushCommand(redisClient *c);
347 static void rpushCommand(redisClient *c);
348 static void lpopCommand(redisClient *c);
349 static void rpopCommand(redisClient *c);
350 static void llenCommand(redisClient *c);
351 static void lindexCommand(redisClient *c);
352 static void lrangeCommand(redisClient *c);
353 static void ltrimCommand(redisClient *c);
354 static void typeCommand(redisClient *c);
355 static void lsetCommand(redisClient *c);
356 static void saddCommand(redisClient *c);
357 static void sremCommand(redisClient *c);
358 static void smoveCommand(redisClient *c);
359 static void sismemberCommand(redisClient *c);
360 static void scardCommand(redisClient *c);
361 static void sinterCommand(redisClient *c);
362 static void sinterstoreCommand(redisClient *c);
363 static void sunionCommand(redisClient *c);
364 static void sunionstoreCommand(redisClient *c);
365 static void sdiffCommand(redisClient *c);
366 static void sdiffstoreCommand(redisClient *c);
367 static void syncCommand(redisClient *c);
368 static void flushdbCommand(redisClient *c);
369 static void flushallCommand(redisClient *c);
370 static void sortCommand(redisClient *c);
371 static void lremCommand(redisClient *c);
372 static void infoCommand(redisClient *c);
373 static void mgetCommand(redisClient *c);
374 static void monitorCommand(redisClient *c);
375 static void expireCommand(redisClient *c);
376 static void getSetCommand(redisClient *c);
377 static void ttlCommand(redisClient *c);
378 static void slaveofCommand(redisClient *c);
379 static void debugCommand(redisClient *c);
380
381 /*================================= Globals ================================= */
382
383 /* Global vars */
384 static struct redisServer server; /* server global state */
385 static struct redisCommand cmdTable[] = {
386 {"get",getCommand,2,REDIS_CMD_INLINE},
387 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
388 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
389 {"del",delCommand,-2,REDIS_CMD_INLINE},
390 {"exists",existsCommand,2,REDIS_CMD_INLINE},
391 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
392 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
393 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
394 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
395 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
396 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
397 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
398 {"llen",llenCommand,2,REDIS_CMD_INLINE},
399 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
400 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
401 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
402 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
403 {"lrem",lremCommand,4,REDIS_CMD_BULK},
404 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
405 {"srem",sremCommand,3,REDIS_CMD_BULK},
406 {"smove",smoveCommand,4,REDIS_CMD_BULK},
407 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
408 {"scard",scardCommand,2,REDIS_CMD_INLINE},
409 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
410 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
411 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
412 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
413 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
414 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
415 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
416 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
417 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
418 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
419 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
420 {"select",selectCommand,2,REDIS_CMD_INLINE},
421 {"move",moveCommand,3,REDIS_CMD_INLINE},
422 {"rename",renameCommand,3,REDIS_CMD_INLINE},
423 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
424 {"expire",expireCommand,3,REDIS_CMD_INLINE},
425 {"keys",keysCommand,2,REDIS_CMD_INLINE},
426 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
427 {"auth",authCommand,2,REDIS_CMD_INLINE},
428 {"ping",pingCommand,1,REDIS_CMD_INLINE},
429 {"echo",echoCommand,2,REDIS_CMD_BULK},
430 {"save",saveCommand,1,REDIS_CMD_INLINE},
431 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
432 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
433 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
434 {"type",typeCommand,2,REDIS_CMD_INLINE},
435 {"sync",syncCommand,1,REDIS_CMD_INLINE},
436 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
437 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
438 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
439 {"info",infoCommand,1,REDIS_CMD_INLINE},
440 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
441 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
442 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
443 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
444 {NULL,NULL,0,0}
445 };
446
447 /*============================ Utility functions ============================ */
448
/* Glob-style pattern matching.
 *
 * Returns 1 when the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Every access is bounded by the
 * explicit lengths, so neither buffer has to be NUL-terminated.
 *
 * Supported syntax:
 *   *       any sequence of characters, including the empty one
 *   ?       exactly one character
 *   [...]   one character from the set; "[^...]" negates the set, "a-z"
 *           inside the brackets is a range and "\x" escapes x
 *   \x      the literal character x
 *
 * When 'nocase' is non-zero characters are compared case-insensitively. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* A run of '*' is equivalent to a single one. Check the length
             * before peeking at pattern[1] so we never read past the end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            /* Try to match the rest of the pattern at every suffix. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* Like '?', a character class has to consume one character. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back onto the last pattern
                     * byte so the shared pattern++ below ends the outer loop. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Work on unsigned char values: bytes >= 0x80 must not
                     * become negative bounds or negative tolower() args. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match: a literal needs one input character */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Input exhausted: only trailing '*' can still match. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
571
572 void redisLog(int level, const char *fmt, ...)
573 {
574 va_list ap;
575 FILE *fp;
576
577 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
578 if (!fp) return;
579
580 va_start(ap, fmt);
581 if (level >= server.verbosity) {
582 char *c = ".-*";
583 char buf[64];
584 time_t now;
585
586 now = time(NULL);
587 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
588 fprintf(fp,"%s %c ",buf,c[level]);
589 vfprintf(fp, fmt, ap);
590 fprintf(fp,"\n");
591 fflush(fp);
592 }
593 va_end(ap);
594
595 if (server.logfile) fclose(fp);
596 }
597
598 /*====================== Hash table type implementation ==================== */
599
/* Hash table types used by the server. Keys are redis objects wrapping SDS
 * dynamic strings; values, where present, are redis objects (which can hold
 * SDS strings, lists, sets). */
603
604 static int sdsDictKeyCompare(void *privdata, const void *key1,
605 const void *key2)
606 {
607 int l1,l2;
608 DICT_NOTUSED(privdata);
609
610 l1 = sdslen((sds)key1);
611 l2 = sdslen((sds)key2);
612 if (l1 != l2) return 0;
613 return memcmp(key1, key2, l1) == 0;
614 }
615
/* dict destructor callback: release one reference to the redis object
 * 'val' via decrRefCount(). 'privdata' is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
622
623 static int dictSdsKeyCompare(void *privdata, const void *key1,
624 const void *key2)
625 {
626 const robj *o1 = key1, *o2 = key2;
627 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
628 }
629
630 static unsigned int dictSdsHash(const void *key) {
631 const robj *o = key;
632 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
633 }
634
635 static dictType setDictType = {
636 dictSdsHash, /* hash function */
637 NULL, /* key dup */
638 NULL, /* val dup */
639 dictSdsKeyCompare, /* key compare */
640 dictRedisObjectDestructor, /* key destructor */
641 NULL /* val destructor */
642 };
643
644 static dictType hashDictType = {
645 dictSdsHash, /* hash function */
646 NULL, /* key dup */
647 NULL, /* val dup */
648 dictSdsKeyCompare, /* key compare */
649 dictRedisObjectDestructor, /* key destructor */
650 dictRedisObjectDestructor /* val destructor */
651 };
652
653 /* ========================= Random utility functions ======================= */
654
/* Fatal out-of-memory handler.
 *
 * Redis does not attempt to recover when allocating objects or strings
 * fails: the networking layer itself relies on heap-allocated send buffers,
 * so even reporting the failure to a client may be impossible. Simply
 * aborting keeps the rest of the code simple.
 *
 * Prints "<msg>: Out of memory" on stderr, flushes it, sleeps one second
 * (presumably so the message gets out first) and calls abort(). */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n", msg);
    (void) fflush(stderr);
    (void) sleep(1);
    abort();
}
666
667 /* ====================== Redis server networking stuff ===================== */
668 void closeTimedoutClients(void) {
669 redisClient *c;
670 listNode *ln;
671 time_t now = time(NULL);
672
673 listRewind(server.clients);
674 while ((ln = listYield(server.clients)) != NULL) {
675 c = listNodeValue(ln);
676 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
677 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
678 (now - c->lastinteraction > server.maxidletime)) {
679 redisLog(REDIS_DEBUG,"Closing idle client");
680 freeClient(c);
681 }
682 }
683 }
684
685 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
686 * we resize the hash table to save memory */
687 void tryResizeHashTables(void) {
688 int j;
689
690 for (j = 0; j < server.dbnum; j++) {
691 long long size, used;
692
693 size = dictSlots(server.db[j].dict);
694 used = dictSize(server.db[j].dict);
695 if (size && used && size > REDIS_HT_MINSLOTS &&
696 (used*100/size < REDIS_HT_MINFILL)) {
697 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
698 dictResize(server.db[j].dict);
699 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
700 }
701 }
702 }
703
704 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
705 int j, loops = server.cronloops++;
706 REDIS_NOTUSED(eventLoop);
707 REDIS_NOTUSED(id);
708 REDIS_NOTUSED(clientData);
709
710 /* Update the global state with the amount of used memory */
711 server.usedmemory = zmalloc_used_memory();
712
713 /* Show some info about non-empty databases */
714 for (j = 0; j < server.dbnum; j++) {
715 long long size, used, vkeys;
716
717 size = dictSlots(server.db[j].dict);
718 used = dictSize(server.db[j].dict);
719 vkeys = dictSize(server.db[j].expires);
720 if (!(loops % 5) && used > 0) {
721 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
722 /* dictPrintStats(server.dict); */
723 }
724 }
725
726 /* We don't want to resize the hash tables while a bacground saving
727 * is in progress: the saving child is created using fork() that is
728 * implemented with a copy-on-write semantic in most modern systems, so
729 * if we resize the HT while there is the saving child at work actually
730 * a lot of memory movements in the parent will cause a lot of pages
731 * copied. */
732 if (!server.bgsaveinprogress) tryResizeHashTables();
733
734 /* Show information about connected clients */
735 if (!(loops % 5)) {
736 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
737 listLength(server.clients)-listLength(server.slaves),
738 listLength(server.slaves),
739 server.usedmemory,
740 dictSize(server.sharingpool));
741 }
742
743 /* Close connections of timedout clients */
744 if (server.maxidletime && !(loops % 10))
745 closeTimedoutClients();
746
747 /* Check if a background saving in progress terminated */
748 if (server.bgsaveinprogress) {
749 int statloc;
750 /* XXX: TODO handle the case of the saving child killed */
751 if (wait4(-1,&statloc,WNOHANG,NULL)) {
752 int exitcode = WEXITSTATUS(statloc);
753 int bysignal = WIFSIGNALED(statloc);
754
755 if (!bysignal && exitcode == 0) {
756 redisLog(REDIS_NOTICE,
757 "Background saving terminated with success");
758 server.dirty = 0;
759 server.lastsave = time(NULL);
760 } else if (!bysignal && exitcode != 0) {
761 redisLog(REDIS_WARNING, "Background saving error");
762 } else {
763 redisLog(REDIS_WARNING,
764 "Background saving terminated by signal");
765 }
766 server.bgsaveinprogress = 0;
767 server.bgsavechildpid = -1;
768 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
769 }
770 } else {
771 /* If there is not a background saving in progress check if
772 * we have to save now */
773 time_t now = time(NULL);
774 for (j = 0; j < server.saveparamslen; j++) {
775 struct saveparam *sp = server.saveparams+j;
776
777 if (server.dirty >= sp->changes &&
778 now-server.lastsave > sp->seconds) {
779 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
780 sp->changes, sp->seconds);
781 rdbSaveBackground(server.dbfilename);
782 break;
783 }
784 }
785 }
786
787 /* Try to expire a few timed out keys */
788 for (j = 0; j < server.dbnum; j++) {
789 redisDb *db = server.db+j;
790 int num = dictSize(db->expires);
791
792 if (num) {
793 time_t now = time(NULL);
794
795 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
796 num = REDIS_EXPIRELOOKUPS_PER_CRON;
797 while (num--) {
798 dictEntry *de;
799 time_t t;
800
801 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
802 t = (time_t) dictGetEntryVal(de);
803 if (now > t) {
804 deleteKey(db,dictGetEntryKey(de));
805 }
806 }
807 }
808 }
809
810 /* Check if we should connect to a MASTER */
811 if (server.replstate == REDIS_REPL_CONNECT) {
812 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
813 if (syncWithMaster() == REDIS_OK) {
814 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
815 }
816 }
817 return 1000;
818 }
819
820 static void createSharedObjects(void) {
821 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
822 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
823 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
824 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
825 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
826 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
827 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
828 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
829 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
830 /* no such key */
831 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
832 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
833 "-ERR Operation against a key holding the wrong kind of value\r\n"));
834 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
835 "-ERR no such key\r\n"));
836 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
837 "-ERR syntax error\r\n"));
838 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
839 "-ERR source and destination objects are the same\r\n"));
840 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
841 "-ERR index out of range\r\n"));
842 shared.space = createObject(REDIS_STRING,sdsnew(" "));
843 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
844 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
845 shared.select0 = createStringObject("select 0\r\n",10);
846 shared.select1 = createStringObject("select 1\r\n",10);
847 shared.select2 = createStringObject("select 2\r\n",10);
848 shared.select3 = createStringObject("select 3\r\n",10);
849 shared.select4 = createStringObject("select 4\r\n",10);
850 shared.select5 = createStringObject("select 5\r\n",10);
851 shared.select6 = createStringObject("select 6\r\n",10);
852 shared.select7 = createStringObject("select 7\r\n",10);
853 shared.select8 = createStringObject("select 8\r\n",10);
854 shared.select9 = createStringObject("select 9\r\n",10);
855 }
856
857 static void appendServerSaveParams(time_t seconds, int changes) {
858 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
859 if (server.saveparams == NULL) oom("appendServerSaveParams");
860 server.saveparams[server.saveparamslen].seconds = seconds;
861 server.saveparams[server.saveparamslen].changes = changes;
862 server.saveparamslen++;
863 }
864
865 static void ResetServerSaveParams() {
866 zfree(server.saveparams);
867 server.saveparams = NULL;
868 server.saveparamslen = 0;
869 }
870
871 static void initServerConfig() {
872 server.dbnum = REDIS_DEFAULT_DBNUM;
873 server.port = REDIS_SERVERPORT;
874 server.verbosity = REDIS_DEBUG;
875 server.maxidletime = REDIS_MAXIDLETIME;
876 server.saveparams = NULL;
877 server.logfile = NULL; /* NULL = log on standard output */
878 server.bindaddr = NULL;
879 server.glueoutputbuf = 1;
880 server.daemonize = 0;
881 server.pidfile = "/var/run/redis.pid";
882 server.dbfilename = "dump.rdb";
883 server.requirepass = NULL;
884 server.shareobjects = 0;
885 server.maxclients = 0;
886 server.maxmemory = 0;
887 ResetServerSaveParams();
888
889 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
890 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
891 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
892 /* Replication related */
893 server.isslave = 0;
894 server.masterhost = NULL;
895 server.masterport = 6379;
896 server.master = NULL;
897 server.replstate = REDIS_REPL_NONE;
898 }
899
900 static void initServer() {
901 int j;
902
903 signal(SIGHUP, SIG_IGN);
904 signal(SIGPIPE, SIG_IGN);
905
906 server.clients = listCreate();
907 server.slaves = listCreate();
908 server.monitors = listCreate();
909 server.objfreelist = listCreate();
910 createSharedObjects();
911 server.el = aeCreateEventLoop();
912 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
913 server.sharingpool = dictCreate(&setDictType,NULL);
914 server.sharingpoolsize = 1024;
915 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
916 oom("server initialization"); /* Fatal OOM */
917 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
918 if (server.fd == -1) {
919 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
920 exit(1);
921 }
922 for (j = 0; j < server.dbnum; j++) {
923 server.db[j].dict = dictCreate(&hashDictType,NULL);
924 server.db[j].expires = dictCreate(&setDictType,NULL);
925 server.db[j].id = j;
926 }
927 server.cronloops = 0;
928 server.bgsaveinprogress = 0;
929 server.bgsavechildpid = -1;
930 server.lastsave = time(NULL);
931 server.dirty = 0;
932 server.usedmemory = 0;
933 server.stat_numcommands = 0;
934 server.stat_numconnections = 0;
935 server.stat_starttime = time(NULL);
936 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
937 }
938
939 /* Empty the whole database */
940 static long long emptyDb() {
941 int j;
942 long long removed = 0;
943
944 for (j = 0; j < server.dbnum; j++) {
945 removed += dictSize(server.db[j].dict);
946 dictEmpty(server.db[j].dict);
947 dictEmpty(server.db[j].expires);
948 }
949 return removed;
950 }
951
952 static int yesnotoi(char *s) {
953 if (!strcasecmp(s,"yes")) return 1;
954 else if (!strcasecmp(s,"no")) return 0;
955 else return -1;
956 }
957
958 /* I agree, this is a very rudimental way to load a configuration...
959 will improve later if the config gets more complex */
960 static void loadServerConfig(char *filename) {
961 FILE *fp = fopen(filename,"r");
962 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
963 int linenum = 0;
964 sds line = NULL;
965
966 if (!fp) {
967 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
968 exit(1);
969 }
970 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
971 sds *argv;
972 int argc, j;
973
974 linenum++;
975 line = sdsnew(buf);
976 line = sdstrim(line," \t\r\n");
977
978 /* Skip comments and blank lines*/
979 if (line[0] == '#' || line[0] == '\0') {
980 sdsfree(line);
981 continue;
982 }
983
984 /* Split into arguments */
985 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
986 sdstolower(argv[0]);
987
988 /* Execute config directives */
989 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
990 server.maxidletime = atoi(argv[1]);
991 if (server.maxidletime < 0) {
992 err = "Invalid timeout value"; goto loaderr;
993 }
994 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
995 server.port = atoi(argv[1]);
996 if (server.port < 1 || server.port > 65535) {
997 err = "Invalid port"; goto loaderr;
998 }
999 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1000 server.bindaddr = zstrdup(argv[1]);
1001 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1002 int seconds = atoi(argv[1]);
1003 int changes = atoi(argv[2]);
1004 if (seconds < 1 || changes < 0) {
1005 err = "Invalid save parameters"; goto loaderr;
1006 }
1007 appendServerSaveParams(seconds,changes);
1008 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1009 if (chdir(argv[1]) == -1) {
1010 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1011 argv[1], strerror(errno));
1012 exit(1);
1013 }
1014 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1015 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1016 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1017 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1018 else {
1019 err = "Invalid log level. Must be one of debug, notice, warning";
1020 goto loaderr;
1021 }
1022 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1023 FILE *fp;
1024
1025 server.logfile = zstrdup(argv[1]);
1026 if (!strcasecmp(server.logfile,"stdout")) {
1027 zfree(server.logfile);
1028 server.logfile = NULL;
1029 }
1030 if (server.logfile) {
1031 /* Test if we are able to open the file. The server will not
1032 * be able to abort just for this problem later... */
1033 fp = fopen(server.logfile,"a");
1034 if (fp == NULL) {
1035 err = sdscatprintf(sdsempty(),
1036 "Can't open the log file: %s", strerror(errno));
1037 goto loaderr;
1038 }
1039 fclose(fp);
1040 }
1041 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1042 server.dbnum = atoi(argv[1]);
1043 if (server.dbnum < 1) {
1044 err = "Invalid number of databases"; goto loaderr;
1045 }
1046 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1047 server.maxclients = atoi(argv[1]);
1048 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1049 server.maxmemory = atoi(argv[1]);
1050 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1051 server.masterhost = sdsnew(argv[1]);
1052 server.masterport = atoi(argv[2]);
1053 server.replstate = REDIS_REPL_CONNECT;
1054 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1055 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1056 err = "argument must be 'yes' or 'no'"; goto loaderr;
1057 }
1058 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1059 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1060 err = "argument must be 'yes' or 'no'"; goto loaderr;
1061 }
1062 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1063 server.sharingpoolsize = atoi(argv[1]);
1064 if (server.sharingpoolsize < 1) {
1065 err = "invalid object sharing pool size"; goto loaderr;
1066 }
1067 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1068 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1069 err = "argument must be 'yes' or 'no'"; goto loaderr;
1070 }
1071 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1072 server.requirepass = zstrdup(argv[1]);
1073 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1074 server.pidfile = zstrdup(argv[1]);
1075 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1076 server.dbfilename = zstrdup(argv[1]);
1077 } else {
1078 err = "Bad directive or wrong number of arguments"; goto loaderr;
1079 }
1080 for (j = 0; j < argc; j++)
1081 sdsfree(argv[j]);
1082 zfree(argv);
1083 sdsfree(line);
1084 }
1085 fclose(fp);
1086 return;
1087
1088 loaderr:
1089 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1090 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1091 fprintf(stderr, ">>> '%s'\n", line);
1092 fprintf(stderr, "%s\n", err);
1093 exit(1);
1094 }
1095
1096 static void freeClientArgv(redisClient *c) {
1097 int j;
1098
1099 for (j = 0; j < c->argc; j++)
1100 decrRefCount(c->argv[j]);
1101 c->argc = 0;
1102 }
1103
1104 static void freeClient(redisClient *c) {
1105 listNode *ln;
1106
1107 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1108 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1109 sdsfree(c->querybuf);
1110 listRelease(c->reply);
1111 freeClientArgv(c);
1112 close(c->fd);
1113 ln = listSearchKey(server.clients,c);
1114 assert(ln != NULL);
1115 listDelNode(server.clients,ln);
1116 if (c->flags & REDIS_SLAVE) {
1117 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1118 close(c->repldbfd);
1119 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1120 ln = listSearchKey(l,c);
1121 assert(ln != NULL);
1122 listDelNode(l,ln);
1123 }
1124 if (c->flags & REDIS_MASTER) {
1125 server.master = NULL;
1126 server.replstate = REDIS_REPL_CONNECT;
1127 }
1128 zfree(c->argv);
1129 zfree(c);
1130 }
1131
1132 static void glueReplyBuffersIfNeeded(redisClient *c) {
1133 int totlen = 0;
1134 listNode *ln;
1135 robj *o;
1136
1137 listRewind(c->reply);
1138 while((ln = listYield(c->reply))) {
1139 o = ln->value;
1140 totlen += sdslen(o->ptr);
1141 /* This optimization makes more sense if we don't have to copy
1142 * too much data */
1143 if (totlen > 1024) return;
1144 }
1145 if (totlen > 0) {
1146 char buf[1024];
1147 int copylen = 0;
1148
1149 listRewind(c->reply);
1150 while((ln = listYield(c->reply))) {
1151 o = ln->value;
1152 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1153 copylen += sdslen(o->ptr);
1154 listDelNode(c->reply,ln);
1155 }
1156 /* Now the output buffer is empty, add the new single element */
1157 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1158 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1159 }
1160 }
1161
1162 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1163 redisClient *c = privdata;
1164 int nwritten = 0, totwritten = 0, objlen;
1165 robj *o;
1166 REDIS_NOTUSED(el);
1167 REDIS_NOTUSED(mask);
1168
1169 if (server.glueoutputbuf && listLength(c->reply) > 1)
1170 glueReplyBuffersIfNeeded(c);
1171 while(listLength(c->reply)) {
1172 o = listNodeValue(listFirst(c->reply));
1173 objlen = sdslen(o->ptr);
1174
1175 if (objlen == 0) {
1176 listDelNode(c->reply,listFirst(c->reply));
1177 continue;
1178 }
1179
1180 if (c->flags & REDIS_MASTER) {
1181 /* Don't reply to a master */
1182 nwritten = objlen - c->sentlen;
1183 } else {
1184 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1185 if (nwritten <= 0) break;
1186 }
1187 c->sentlen += nwritten;
1188 totwritten += nwritten;
1189 /* If we fully sent the object on head go to the next one */
1190 if (c->sentlen == objlen) {
1191 listDelNode(c->reply,listFirst(c->reply));
1192 c->sentlen = 0;
1193 }
1194 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1195 * bytes, in a single threaded server it's a good idea to server
1196 * other clients as well, even if a very large request comes from
1197 * super fast link that is always able to accept data (in real world
1198 * terms think to 'KEYS *' against the loopback interfae) */
1199 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1200 }
1201 if (nwritten == -1) {
1202 if (errno == EAGAIN) {
1203 nwritten = 0;
1204 } else {
1205 redisLog(REDIS_DEBUG,
1206 "Error writing to client: %s", strerror(errno));
1207 freeClient(c);
1208 return;
1209 }
1210 }
1211 if (totwritten > 0) c->lastinteraction = time(NULL);
1212 if (listLength(c->reply) == 0) {
1213 c->sentlen = 0;
1214 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1215 }
1216 }
1217
1218 static struct redisCommand *lookupCommand(char *name) {
1219 int j = 0;
1220 while(cmdTable[j].name != NULL) {
1221 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1222 j++;
1223 }
1224 return NULL;
1225 }
1226
1227 /* resetClient prepare the client to process the next command */
1228 static void resetClient(redisClient *c) {
1229 freeClientArgv(c);
1230 c->bulklen = -1;
1231 }
1232
1233 /* If this function gets called we already read a whole
1234 * command, argments are in the client argv/argc fields.
1235 * processCommand() execute the command or prepare the
1236 * server for a bulk read from the client.
1237 *
1238 * If 1 is returned the client is still alive and valid and
1239 * and other operations can be performed by the caller. Otherwise
1240 * if 0 is returned the client was destroied (i.e. after QUIT). */
1241 static int processCommand(redisClient *c) {
1242 struct redisCommand *cmd;
1243 long long dirty;
1244
1245 /* Free some memory if needed (maxmemory setting) */
1246 if (server.maxmemory) freeMemoryIfNeeded();
1247
1248 /* The QUIT command is handled as a special case. Normal command
1249 * procs are unable to close the client connection safely */
1250 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1251 freeClient(c);
1252 return 0;
1253 }
1254 cmd = lookupCommand(c->argv[0]->ptr);
1255 if (!cmd) {
1256 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1257 resetClient(c);
1258 return 1;
1259 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1260 (c->argc < -cmd->arity)) {
1261 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1262 resetClient(c);
1263 return 1;
1264 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1265 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1266 resetClient(c);
1267 return 1;
1268 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1269 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1270
1271 decrRefCount(c->argv[c->argc-1]);
1272 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1273 c->argc--;
1274 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1275 resetClient(c);
1276 return 1;
1277 }
1278 c->argc--;
1279 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1280 /* It is possible that the bulk read is already in the
1281 * buffer. Check this condition and handle it accordingly */
1282 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1283 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1284 c->argc++;
1285 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1286 } else {
1287 return 1;
1288 }
1289 }
1290 /* Let's try to share objects on the command arguments vector */
1291 if (server.shareobjects) {
1292 int j;
1293 for(j = 1; j < c->argc; j++)
1294 c->argv[j] = tryObjectSharing(c->argv[j]);
1295 }
1296 /* Check if the user is authenticated */
1297 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1298 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1299 resetClient(c);
1300 return 1;
1301 }
1302
1303 /* Exec the command */
1304 dirty = server.dirty;
1305 cmd->proc(c);
1306 if (server.dirty-dirty != 0 && listLength(server.slaves))
1307 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1308 if (listLength(server.monitors))
1309 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1310 server.stat_numcommands++;
1311
1312 /* Prepare the client for the next command */
1313 if (c->flags & REDIS_CLOSE) {
1314 freeClient(c);
1315 return 0;
1316 }
1317 resetClient(c);
1318 return 1;
1319 }
1320
1321 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1322 listNode *ln;
1323 int outc = 0, j;
1324 robj **outv;
1325 /* (args*2)+1 is enough room for args, spaces, newlines */
1326 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1327
1328 if (argc <= REDIS_STATIC_ARGS) {
1329 outv = static_outv;
1330 } else {
1331 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1332 if (!outv) oom("replicationFeedSlaves");
1333 }
1334
1335 for (j = 0; j < argc; j++) {
1336 if (j != 0) outv[outc++] = shared.space;
1337 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1338 robj *lenobj;
1339
1340 lenobj = createObject(REDIS_STRING,
1341 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1342 lenobj->refcount = 0;
1343 outv[outc++] = lenobj;
1344 }
1345 outv[outc++] = argv[j];
1346 }
1347 outv[outc++] = shared.crlf;
1348
1349 /* Increment all the refcounts at start and decrement at end in order to
1350 * be sure to free objects if there is no slave in a replication state
1351 * able to be feed with commands */
1352 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1353 listRewind(slaves);
1354 while((ln = listYield(slaves))) {
1355 redisClient *slave = ln->value;
1356
1357 /* Don't feed slaves that are still waiting for BGSAVE to start */
1358 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1359
1360 /* Feed all the other slaves, MONITORs and so on */
1361 if (slave->slaveseldb != dictid) {
1362 robj *selectcmd;
1363
1364 switch(dictid) {
1365 case 0: selectcmd = shared.select0; break;
1366 case 1: selectcmd = shared.select1; break;
1367 case 2: selectcmd = shared.select2; break;
1368 case 3: selectcmd = shared.select3; break;
1369 case 4: selectcmd = shared.select4; break;
1370 case 5: selectcmd = shared.select5; break;
1371 case 6: selectcmd = shared.select6; break;
1372 case 7: selectcmd = shared.select7; break;
1373 case 8: selectcmd = shared.select8; break;
1374 case 9: selectcmd = shared.select9; break;
1375 default:
1376 selectcmd = createObject(REDIS_STRING,
1377 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1378 selectcmd->refcount = 0;
1379 break;
1380 }
1381 addReply(slave,selectcmd);
1382 slave->slaveseldb = dictid;
1383 }
1384 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1385 }
1386 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1387 if (outv != static_outv) zfree(outv);
1388 }
1389
1390 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1391 redisClient *c = (redisClient*) privdata;
1392 char buf[REDIS_IOBUF_LEN];
1393 int nread;
1394 REDIS_NOTUSED(el);
1395 REDIS_NOTUSED(mask);
1396
1397 nread = read(fd, buf, REDIS_IOBUF_LEN);
1398 if (nread == -1) {
1399 if (errno == EAGAIN) {
1400 nread = 0;
1401 } else {
1402 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1403 freeClient(c);
1404 return;
1405 }
1406 } else if (nread == 0) {
1407 redisLog(REDIS_DEBUG, "Client closed connection");
1408 freeClient(c);
1409 return;
1410 }
1411 if (nread) {
1412 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1413 c->lastinteraction = time(NULL);
1414 } else {
1415 return;
1416 }
1417
1418 again:
1419 if (c->bulklen == -1) {
1420 /* Read the first line of the query */
1421 char *p = strchr(c->querybuf,'\n');
1422 size_t querylen;
1423 if (p) {
1424 sds query, *argv;
1425 int argc, j;
1426
1427 query = c->querybuf;
1428 c->querybuf = sdsempty();
1429 querylen = 1+(p-(query));
1430 if (sdslen(query) > querylen) {
1431 /* leave data after the first line of the query in the buffer */
1432 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1433 }
1434 *p = '\0'; /* remove "\n" */
1435 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1436 sdsupdatelen(query);
1437
1438 /* Now we can split the query in arguments */
1439 if (sdslen(query) == 0) {
1440 /* Ignore empty query */
1441 sdsfree(query);
1442 return;
1443 }
1444 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1445 if (argv == NULL) oom("sdssplitlen");
1446 sdsfree(query);
1447
1448 if (c->argv) zfree(c->argv);
1449 c->argv = zmalloc(sizeof(robj*)*argc);
1450 if (c->argv == NULL) oom("allocating arguments list for client");
1451
1452 for (j = 0; j < argc; j++) {
1453 if (sdslen(argv[j])) {
1454 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1455 c->argc++;
1456 } else {
1457 sdsfree(argv[j]);
1458 }
1459 }
1460 zfree(argv);
1461 /* Execute the command. If the client is still valid
1462 * after processCommand() return and there is something
1463 * on the query buffer try to process the next command. */
1464 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1465 return;
1466 } else if (sdslen(c->querybuf) >= 1024*32) {
1467 redisLog(REDIS_DEBUG, "Client protocol error");
1468 freeClient(c);
1469 return;
1470 }
1471 } else {
1472 /* Bulk read handling. Note that if we are at this point
1473 the client already sent a command terminated with a newline,
1474 we are reading the bulk data that is actually the last
1475 argument of the command. */
1476 int qbl = sdslen(c->querybuf);
1477
1478 if (c->bulklen <= qbl) {
1479 /* Copy everything but the final CRLF as final argument */
1480 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1481 c->argc++;
1482 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1483 processCommand(c);
1484 return;
1485 }
1486 }
1487 }
1488
1489 static int selectDb(redisClient *c, int id) {
1490 if (id < 0 || id >= server.dbnum)
1491 return REDIS_ERR;
1492 c->db = &server.db[id];
1493 return REDIS_OK;
1494 }
1495
1496 static void *dupClientReplyValue(void *o) {
1497 incrRefCount((robj*)o);
1498 return 0;
1499 }
1500
1501 static redisClient *createClient(int fd) {
1502 redisClient *c = zmalloc(sizeof(*c));
1503
1504 anetNonBlock(NULL,fd);
1505 anetTcpNoDelay(NULL,fd);
1506 if (!c) return NULL;
1507 selectDb(c,0);
1508 c->fd = fd;
1509 c->querybuf = sdsempty();
1510 c->argc = 0;
1511 c->argv = NULL;
1512 c->bulklen = -1;
1513 c->sentlen = 0;
1514 c->flags = 0;
1515 c->lastinteraction = time(NULL);
1516 c->authenticated = 0;
1517 c->replstate = REDIS_REPL_NONE;
1518 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1519 listSetFreeMethod(c->reply,decrRefCount);
1520 listSetDupMethod(c->reply,dupClientReplyValue);
1521 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1522 readQueryFromClient, c, NULL) == AE_ERR) {
1523 freeClient(c);
1524 return NULL;
1525 }
1526 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1527 return c;
1528 }
1529
1530 static void addReply(redisClient *c, robj *obj) {
1531 if (listLength(c->reply) == 0 &&
1532 (c->replstate == REDIS_REPL_NONE ||
1533 c->replstate == REDIS_REPL_ONLINE) &&
1534 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1535 sendReplyToClient, c, NULL) == AE_ERR) return;
1536 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1537 incrRefCount(obj);
1538 }
1539
1540 static void addReplySds(redisClient *c, sds s) {
1541 robj *o = createObject(REDIS_STRING,s);
1542 addReply(c,o);
1543 decrRefCount(o);
1544 }
1545
1546 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1547 int cport, cfd;
1548 char cip[128];
1549 redisClient *c;
1550 REDIS_NOTUSED(el);
1551 REDIS_NOTUSED(mask);
1552 REDIS_NOTUSED(privdata);
1553
1554 cfd = anetAccept(server.neterr, fd, cip, &cport);
1555 if (cfd == AE_ERR) {
1556 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1557 return;
1558 }
1559 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1560 if ((c = createClient(cfd)) == NULL) {
1561 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1562 close(cfd); /* May be already closed, just ingore errors */
1563 return;
1564 }
1565 /* If maxclient directive is set and this is one client more... close the
1566 * connection. Note that we create the client instead to check before
1567 * for this condition, since now the socket is already set in nonblocking
1568 * mode and we can send an error for free using the Kernel I/O */
1569 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1570 char *err = "-ERR max number of clients reached\r\n";
1571
1572 /* That's a best effort error message, don't check write errors */
1573 (void) write(c->fd,err,strlen(err));
1574 freeClient(c);
1575 return;
1576 }
1577 server.stat_numconnections++;
1578 }
1579
1580 /* ======================= Redis objects implementation ===================== */
1581
1582 static robj *createObject(int type, void *ptr) {
1583 robj *o;
1584
1585 if (listLength(server.objfreelist)) {
1586 listNode *head = listFirst(server.objfreelist);
1587 o = listNodeValue(head);
1588 listDelNode(server.objfreelist,head);
1589 } else {
1590 o = zmalloc(sizeof(*o));
1591 }
1592 if (!o) oom("createObject");
1593 o->type = type;
1594 o->ptr = ptr;
1595 o->refcount = 1;
1596 return o;
1597 }
1598
1599 static robj *createStringObject(char *ptr, size_t len) {
1600 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1601 }
1602
1603 static robj *createListObject(void) {
1604 list *l = listCreate();
1605
1606 if (!l) oom("listCreate");
1607 listSetFreeMethod(l,decrRefCount);
1608 return createObject(REDIS_LIST,l);
1609 }
1610
1611 static robj *createSetObject(void) {
1612 dict *d = dictCreate(&setDictType,NULL);
1613 if (!d) oom("dictCreate");
1614 return createObject(REDIS_SET,d);
1615 }
1616
1617 static void freeStringObject(robj *o) {
1618 sdsfree(o->ptr);
1619 }
1620
1621 static void freeListObject(robj *o) {
1622 listRelease((list*) o->ptr);
1623 }
1624
1625 static void freeSetObject(robj *o) {
1626 dictRelease((dict*) o->ptr);
1627 }
1628
1629 static void freeHashObject(robj *o) {
1630 dictRelease((dict*) o->ptr);
1631 }
1632
1633 static void incrRefCount(robj *o) {
1634 o->refcount++;
1635 #ifdef DEBUG_REFCOUNT
1636 if (o->type == REDIS_STRING)
1637 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1638 #endif
1639 }
1640
1641 static void decrRefCount(void *obj) {
1642 robj *o = obj;
1643
1644 #ifdef DEBUG_REFCOUNT
1645 if (o->type == REDIS_STRING)
1646 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1647 #endif
1648 if (--(o->refcount) == 0) {
1649 switch(o->type) {
1650 case REDIS_STRING: freeStringObject(o); break;
1651 case REDIS_LIST: freeListObject(o); break;
1652 case REDIS_SET: freeSetObject(o); break;
1653 case REDIS_HASH: freeHashObject(o); break;
1654 default: assert(0 != 0); break;
1655 }
1656 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1657 !listAddNodeHead(server.objfreelist,o))
1658 zfree(o);
1659 }
1660 }
1661
1662 /* Try to share an object against the shared objects pool */
1663 static robj *tryObjectSharing(robj *o) {
1664 struct dictEntry *de;
1665 unsigned long c;
1666
1667 if (o == NULL || server.shareobjects == 0) return o;
1668
1669 assert(o->type == REDIS_STRING);
1670 de = dictFind(server.sharingpool,o);
1671 if (de) {
1672 robj *shared = dictGetEntryKey(de);
1673
1674 c = ((unsigned long) dictGetEntryVal(de))+1;
1675 dictGetEntryVal(de) = (void*) c;
1676 incrRefCount(shared);
1677 decrRefCount(o);
1678 return shared;
1679 } else {
1680 /* Here we are using a stream algorihtm: Every time an object is
1681 * shared we increment its count, everytime there is a miss we
1682 * recrement the counter of a random object. If this object reaches
1683 * zero we remove the object and put the current object instead. */
1684 if (dictSize(server.sharingpool) >=
1685 server.sharingpoolsize) {
1686 de = dictGetRandomKey(server.sharingpool);
1687 assert(de != NULL);
1688 c = ((unsigned long) dictGetEntryVal(de))-1;
1689 dictGetEntryVal(de) = (void*) c;
1690 if (c == 0) {
1691 dictDelete(server.sharingpool,de->key);
1692 }
1693 } else {
1694 c = 0; /* If the pool is empty we want to add this object */
1695 }
1696 if (c == 0) {
1697 int retval;
1698
1699 retval = dictAdd(server.sharingpool,o,(void*)1);
1700 assert(retval == DICT_OK);
1701 incrRefCount(o);
1702 }
1703 return o;
1704 }
1705 }
1706
1707 static robj *lookupKey(redisDb *db, robj *key) {
1708 dictEntry *de = dictFind(db->dict,key);
1709 return de ? dictGetEntryVal(de) : NULL;
1710 }
1711
1712 static robj *lookupKeyRead(redisDb *db, robj *key) {
1713 expireIfNeeded(db,key);
1714 return lookupKey(db,key);
1715 }
1716
1717 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1718 deleteIfVolatile(db,key);
1719 return lookupKey(db,key);
1720 }
1721
1722 static int deleteKey(redisDb *db, robj *key) {
1723 int retval;
1724
1725 /* We need to protect key from destruction: after the first dictDelete()
1726 * it may happen that 'key' is no longer valid if we don't increment
1727 * it's count. This may happen when we get the object reference directly
1728 * from the hash table with dictRandomKey() or dict iterators */
1729 incrRefCount(key);
1730 if (dictSize(db->expires)) dictDelete(db->expires,key);
1731 retval = dictDelete(db->dict,key);
1732 decrRefCount(key);
1733
1734 return retval == DICT_OK;
1735 }
1736
1737 /*============================ DB saving/loading ============================ */
1738
1739 static int rdbSaveType(FILE *fp, unsigned char type) {
1740 if (fwrite(&type,1,1,fp) == 0) return -1;
1741 return 0;
1742 }
1743
1744 static int rdbSaveTime(FILE *fp, time_t t) {
1745 int32_t t32 = (int32_t) t;
1746 if (fwrite(&t32,4,1,fp) == 0) return -1;
1747 return 0;
1748 }
1749
1750 /* check rdbLoadLen() comments for more info */
1751 static int rdbSaveLen(FILE *fp, uint32_t len) {
1752 unsigned char buf[2];
1753
1754 if (len < (1<<6)) {
1755 /* Save a 6 bit len */
1756 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1757 if (fwrite(buf,1,1,fp) == 0) return -1;
1758 } else if (len < (1<<14)) {
1759 /* Save a 14 bit len */
1760 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1761 buf[1] = len&0xFF;
1762 if (fwrite(buf,2,1,fp) == 0) return -1;
1763 } else {
1764 /* Save a 32 bit len */
1765 buf[0] = (REDIS_RDB_32BITLEN<<6);
1766 if (fwrite(buf,1,1,fp) == 0) return -1;
1767 len = htonl(len);
1768 if (fwrite(&len,4,1,fp) == 0) return -1;
1769 }
1770 return 0;
1771 }
1772
1773 /* String objects in the form "2391" "-100" without any space and with a
1774 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1775 * encoded as integers to save space */
1776 int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1777 long long value;
1778 char *endptr, buf[32];
1779
1780 /* Check if it's possible to encode this value as a number */
1781 value = strtoll(s, &endptr, 10);
1782 if (endptr[0] != '\0') return 0;
1783 snprintf(buf,32,"%lld",value);
1784
1785 /* If the number converted back into a string is not identical
1786 * then it's not possible to encode the string as integer */
1787 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1788
1789 /* Finally check if it fits in our ranges */
1790 if (value >= -(1<<7) && value <= (1<<7)-1) {
1791 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1792 enc[1] = value&0xFF;
1793 return 2;
1794 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1795 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1796 enc[1] = value&0xFF;
1797 enc[2] = (value>>8)&0xFF;
1798 return 3;
1799 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1800 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1801 enc[1] = value&0xFF;
1802 enc[2] = (value>>8)&0xFF;
1803 enc[3] = (value>>16)&0xFF;
1804 enc[4] = (value>>24)&0xFF;
1805 return 5;
1806 } else {
1807 return 0;
1808 }
1809 }
1810
1811 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1812 unsigned int comprlen, outlen;
1813 unsigned char byte;
1814 void *out;
1815
1816 /* We require at least four bytes compression for this to be worth it */
1817 outlen = sdslen(obj->ptr)-4;
1818 if (outlen <= 0) return 0;
1819 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1820 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1821 if (comprlen == 0) {
1822 zfree(out);
1823 return 0;
1824 }
1825 /* Data compressed! Let's save it on disk */
1826 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1827 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1828 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1829 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1830 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1831 zfree(out);
1832 return comprlen;
1833
1834 writeerr:
1835 zfree(out);
1836 return -1;
1837 }
1838
1839 /* Save a string objet as [len][data] on disk. If the object is a string
1840 * representation of an integer value we try to safe it in a special form */
1841 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1842 size_t len = sdslen(obj->ptr);
1843 int enclen;
1844
1845 /* Try integer encoding */
1846 if (len <= 11) {
1847 unsigned char buf[5];
1848 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1849 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1850 return 0;
1851 }
1852 }
1853
1854 /* Try LZF compression - under 20 bytes it's unable to compress even
1855 * aaaaaaaaaaaaaaaaaa so skip it */
1856 if (1 && len > 20) {
1857 int retval;
1858
1859 retval = rdbSaveLzfStringObject(fp,obj);
1860 if (retval == -1) return -1;
1861 if (retval > 0) return 0;
1862 /* retval == 0 means data can't be compressed, save the old way */
1863 }
1864
1865 /* Store verbatim */
1866 if (rdbSaveLen(fp,len) == -1) return -1;
1867 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1868 return 0;
1869 }
1870
1871 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1872 static int rdbSave(char *filename) {
1873 dictIterator *di = NULL;
1874 dictEntry *de;
1875 FILE *fp;
1876 char tmpfile[256];
1877 int j;
1878 time_t now = time(NULL);
1879
1880 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1881 fp = fopen(tmpfile,"w");
1882 if (!fp) {
1883 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1884 return REDIS_ERR;
1885 }
1886 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1887 for (j = 0; j < server.dbnum; j++) {
1888 redisDb *db = server.db+j;
1889 dict *d = db->dict;
1890 if (dictSize(d) == 0) continue;
1891 di = dictGetIterator(d);
1892 if (!di) {
1893 fclose(fp);
1894 return REDIS_ERR;
1895 }
1896
1897 /* Write the SELECT DB opcode */
1898 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1899 if (rdbSaveLen(fp,j) == -1) goto werr;
1900
1901 /* Iterate this DB writing every entry */
1902 while((de = dictNext(di)) != NULL) {
1903 robj *key = dictGetEntryKey(de);
1904 robj *o = dictGetEntryVal(de);
1905 time_t expiretime = getExpire(db,key);
1906
1907 /* Save the expire time */
1908 if (expiretime != -1) {
1909 /* If this key is already expired skip it */
1910 if (expiretime < now) continue;
1911 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1912 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1913 }
1914 /* Save the key and associated value */
1915 if (rdbSaveType(fp,o->type) == -1) goto werr;
1916 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1917 if (o->type == REDIS_STRING) {
1918 /* Save a string value */
1919 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1920 } else if (o->type == REDIS_LIST) {
1921 /* Save a list value */
1922 list *list = o->ptr;
1923 listNode *ln;
1924
1925 listRewind(list);
1926 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1927 while((ln = listYield(list))) {
1928 robj *eleobj = listNodeValue(ln);
1929
1930 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1931 }
1932 } else if (o->type == REDIS_SET) {
1933 /* Save a set value */
1934 dict *set = o->ptr;
1935 dictIterator *di = dictGetIterator(set);
1936 dictEntry *de;
1937
1938 if (!set) oom("dictGetIteraotr");
1939 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1940 while((de = dictNext(di)) != NULL) {
1941 robj *eleobj = dictGetEntryKey(de);
1942
1943 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1944 }
1945 dictReleaseIterator(di);
1946 } else {
1947 assert(0 != 0);
1948 }
1949 }
1950 dictReleaseIterator(di);
1951 }
1952 /* EOF opcode */
1953 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1954
1955 /* Make sure data will not remain on the OS's output buffers */
1956 fflush(fp);
1957 fsync(fileno(fp));
1958 fclose(fp);
1959
1960 /* Use RENAME to make sure the DB file is changed atomically only
1961 * if the generate DB file is ok. */
1962 if (rename(tmpfile,filename) == -1) {
1963 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1964 unlink(tmpfile);
1965 return REDIS_ERR;
1966 }
1967 redisLog(REDIS_NOTICE,"DB saved on disk");
1968 server.dirty = 0;
1969 server.lastsave = time(NULL);
1970 return REDIS_OK;
1971
1972 werr:
1973 fclose(fp);
1974 unlink(tmpfile);
1975 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1976 if (di) dictReleaseIterator(di);
1977 return REDIS_ERR;
1978 }
1979
1980 static int rdbSaveBackground(char *filename) {
1981 pid_t childpid;
1982
1983 if (server.bgsaveinprogress) return REDIS_ERR;
1984 if ((childpid = fork()) == 0) {
1985 /* Child */
1986 close(server.fd);
1987 if (rdbSave(filename) == REDIS_OK) {
1988 exit(0);
1989 } else {
1990 exit(1);
1991 }
1992 } else {
1993 /* Parent */
1994 if (childpid == -1) {
1995 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
1996 strerror(errno));
1997 return REDIS_ERR;
1998 }
1999 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2000 server.bgsaveinprogress = 1;
2001 server.bgsavechildpid = childpid;
2002 return REDIS_OK;
2003 }
2004 return REDIS_OK; /* unreached */
2005 }
2006
/* Read one type/opcode byte from 'fp'.
 * Returns its value (0-255), or -1 on short read / EOF. */
static int rdbLoadType(FILE *fp) {
    unsigned char t;

    return (fread(&t,1,1,fp) == 1) ? t : -1;
}
2012
/* Read a 4 byte signed time value, stored in host byte order (no
 * byte swapping is done), from 'fp'.
 * Returns it widened to time_t, or -1 on short read / EOF. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t raw;

    if (fread(&raw,4,1,fp) != 1) return -1;
    return (time_t)raw;
}
2018
2019 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2020 * of this file for a description of how this are stored on disk.
2021 *
2022 * isencoded is set to 1 if the readed length is not actually a length but
2023 * an "encoding type", check the above comments for more info */
2024 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2025 unsigned char buf[2];
2026 uint32_t len;
2027
2028 if (isencoded) *isencoded = 0;
2029 if (rdbver == 0) {
2030 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2031 return ntohl(len);
2032 } else {
2033 int type;
2034
2035 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2036 type = (buf[0]&0xC0)>>6;
2037 if (type == REDIS_RDB_6BITLEN) {
2038 /* Read a 6 bit len */
2039 return buf[0]&0x3F;
2040 } else if (type == REDIS_RDB_ENCVAL) {
2041 /* Read a 6 bit len encoding type */
2042 if (isencoded) *isencoded = 1;
2043 return buf[0]&0x3F;
2044 } else if (type == REDIS_RDB_14BITLEN) {
2045 /* Read a 14 bit len */
2046 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2047 return ((buf[0]&0x3F)<<8)|buf[1];
2048 } else {
2049 /* Read a 32 bit len */
2050 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2051 return ntohl(len);
2052 }
2053 }
2054 }
2055
2056 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2057 unsigned char enc[4];
2058 long long val;
2059
2060 if (enctype == REDIS_RDB_ENC_INT8) {
2061 if (fread(enc,1,1,fp) == 0) return NULL;
2062 val = (signed char)enc[0];
2063 } else if (enctype == REDIS_RDB_ENC_INT16) {
2064 uint16_t v;
2065 if (fread(enc,2,1,fp) == 0) return NULL;
2066 v = enc[0]|(enc[1]<<8);
2067 val = (int16_t)v;
2068 } else if (enctype == REDIS_RDB_ENC_INT32) {
2069 uint32_t v;
2070 if (fread(enc,4,1,fp) == 0) return NULL;
2071 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2072 val = (int32_t)v;
2073 } else {
2074 val = 0; /* anti-warning */
2075 assert(0!=0);
2076 }
2077 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2078 }
2079
2080 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2081 unsigned int len, clen;
2082 unsigned char *c = NULL;
2083 sds val = NULL;
2084
2085 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2086 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2087 if ((c = zmalloc(clen)) == NULL) goto err;
2088 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2089 if (fread(c,clen,1,fp) == 0) goto err;
2090 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2091 zfree(c);
2092 return createObject(REDIS_STRING,val);
2093 err:
2094 zfree(c);
2095 sdsfree(val);
2096 return NULL;
2097 }
2098
2099 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2100 int isencoded;
2101 uint32_t len;
2102 sds val;
2103
2104 len = rdbLoadLen(fp,rdbver,&isencoded);
2105 if (isencoded) {
2106 switch(len) {
2107 case REDIS_RDB_ENC_INT8:
2108 case REDIS_RDB_ENC_INT16:
2109 case REDIS_RDB_ENC_INT32:
2110 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2111 case REDIS_RDB_ENC_LZF:
2112 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2113 default:
2114 assert(0!=0);
2115 }
2116 }
2117
2118 if (len == REDIS_RDB_LENERR) return NULL;
2119 val = sdsnewlen(NULL,len);
2120 if (len && fread(val,len,1,fp) == 0) {
2121 sdsfree(val);
2122 return NULL;
2123 }
2124 return tryObjectSharing(createObject(REDIS_STRING,val));
2125 }
2126
2127 static int rdbLoad(char *filename) {
2128 FILE *fp;
2129 robj *keyobj = NULL;
2130 uint32_t dbid;
2131 int type, retval, rdbver;
2132 dict *d = server.db[0].dict;
2133 redisDb *db = server.db+0;
2134 char buf[1024];
2135 time_t expiretime = -1, now = time(NULL);
2136
2137 fp = fopen(filename,"r");
2138 if (!fp) return REDIS_ERR;
2139 if (fread(buf,9,1,fp) == 0) goto eoferr;
2140 buf[9] = '\0';
2141 if (memcmp(buf,"REDIS",5) != 0) {
2142 fclose(fp);
2143 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2144 return REDIS_ERR;
2145 }
2146 rdbver = atoi(buf+5);
2147 if (rdbver > 1) {
2148 fclose(fp);
2149 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2150 return REDIS_ERR;
2151 }
2152 while(1) {
2153 robj *o;
2154
2155 /* Read type. */
2156 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2157 if (type == REDIS_EXPIRETIME) {
2158 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2159 /* We read the time so we need to read the object type again */
2160 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2161 }
2162 if (type == REDIS_EOF) break;
2163 /* Handle SELECT DB opcode as a special case */
2164 if (type == REDIS_SELECTDB) {
2165 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2166 goto eoferr;
2167 if (dbid >= (unsigned)server.dbnum) {
2168 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2169 exit(1);
2170 }
2171 db = server.db+dbid;
2172 d = db->dict;
2173 continue;
2174 }
2175 /* Read key */
2176 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2177
2178 if (type == REDIS_STRING) {
2179 /* Read string value */
2180 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2181 } else if (type == REDIS_LIST || type == REDIS_SET) {
2182 /* Read list/set value */
2183 uint32_t listlen;
2184
2185 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2186 goto eoferr;
2187 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2188 /* Load every single element of the list/set */
2189 while(listlen--) {
2190 robj *ele;
2191
2192 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2193 if (type == REDIS_LIST) {
2194 if (!listAddNodeTail((list*)o->ptr,ele))
2195 oom("listAddNodeTail");
2196 } else {
2197 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2198 oom("dictAdd");
2199 }
2200 }
2201 } else {
2202 assert(0 != 0);
2203 }
2204 /* Add the new object in the hash table */
2205 retval = dictAdd(d,keyobj,o);
2206 if (retval == DICT_ERR) {
2207 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2208 exit(1);
2209 }
2210 /* Set the expire time if needed */
2211 if (expiretime != -1) {
2212 setExpire(db,keyobj,expiretime);
2213 /* Delete this key if already expired */
2214 if (expiretime < now) deleteKey(db,keyobj);
2215 expiretime = -1;
2216 }
2217 keyobj = o = NULL;
2218 }
2219 fclose(fp);
2220 return REDIS_OK;
2221
2222 eoferr: /* unexpected end of file is handled here with a fatal exit */
2223 if (keyobj) decrRefCount(keyobj);
2224 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2225 exit(1);
2226 return REDIS_ERR; /* Just to avoid warning */
2227 }
2228
2229 /*================================== Commands =============================== */
2230
2231 static void authCommand(redisClient *c) {
2232 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2233 c->authenticated = 1;
2234 addReply(c,shared.ok);
2235 } else {
2236 c->authenticated = 0;
2237 addReply(c,shared.err);
2238 }
2239 }
2240
2241 static void pingCommand(redisClient *c) {
2242 addReply(c,shared.pong);
2243 }
2244
2245 static void echoCommand(redisClient *c) {
2246 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2247 (int)sdslen(c->argv[1]->ptr)));
2248 addReply(c,c->argv[1]);
2249 addReply(c,shared.crlf);
2250 }
2251
2252 /*=================================== Strings =============================== */
2253
2254 static void setGenericCommand(redisClient *c, int nx) {
2255 int retval;
2256
2257 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2258 if (retval == DICT_ERR) {
2259 if (!nx) {
2260 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2261 incrRefCount(c->argv[2]);
2262 } else {
2263 addReply(c,shared.czero);
2264 return;
2265 }
2266 } else {
2267 incrRefCount(c->argv[1]);
2268 incrRefCount(c->argv[2]);
2269 }
2270 server.dirty++;
2271 removeExpire(c->db,c->argv[1]);
2272 addReply(c, nx ? shared.cone : shared.ok);
2273 }
2274
2275 static void setCommand(redisClient *c) {
2276 setGenericCommand(c,0);
2277 }
2278
2279 static void setnxCommand(redisClient *c) {
2280 setGenericCommand(c,1);
2281 }
2282
2283 static void getCommand(redisClient *c) {
2284 robj *o = lookupKeyRead(c->db,c->argv[1]);
2285
2286 if (o == NULL) {
2287 addReply(c,shared.nullbulk);
2288 } else {
2289 if (o->type != REDIS_STRING) {
2290 addReply(c,shared.wrongtypeerr);
2291 } else {
2292 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2293 addReply(c,o);
2294 addReply(c,shared.crlf);
2295 }
2296 }
2297 }
2298
2299 static void getSetCommand(redisClient *c) {
2300 getCommand(c);
2301 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2302 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2303 } else {
2304 incrRefCount(c->argv[1]);
2305 }
2306 incrRefCount(c->argv[2]);
2307 server.dirty++;
2308 removeExpire(c->db,c->argv[1]);
2309 }
2310
2311 static void mgetCommand(redisClient *c) {
2312 int j;
2313
2314 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2315 for (j = 1; j < c->argc; j++) {
2316 robj *o = lookupKeyRead(c->db,c->argv[j]);
2317 if (o == NULL) {
2318 addReply(c,shared.nullbulk);
2319 } else {
2320 if (o->type != REDIS_STRING) {
2321 addReply(c,shared.nullbulk);
2322 } else {
2323 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2324 addReply(c,o);
2325 addReply(c,shared.crlf);
2326 }
2327 }
2328 }
2329 }
2330
2331 static void incrDecrCommand(redisClient *c, long long incr) {
2332 long long value;
2333 int retval;
2334 robj *o;
2335
2336 o = lookupKeyWrite(c->db,c->argv[1]);
2337 if (o == NULL) {
2338 value = 0;
2339 } else {
2340 if (o->type != REDIS_STRING) {
2341 value = 0;
2342 } else {
2343 char *eptr;
2344
2345 value = strtoll(o->ptr, &eptr, 10);
2346 }
2347 }
2348
2349 value += incr;
2350 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2351 retval = dictAdd(c->db->dict,c->argv[1],o);
2352 if (retval == DICT_ERR) {
2353 dictReplace(c->db->dict,c->argv[1],o);
2354 removeExpire(c->db,c->argv[1]);
2355 } else {
2356 incrRefCount(c->argv[1]);
2357 }
2358 server.dirty++;
2359 addReply(c,shared.colon);
2360 addReply(c,o);
2361 addReply(c,shared.crlf);
2362 }
2363
2364 static void incrCommand(redisClient *c) {
2365 incrDecrCommand(c,1);
2366 }
2367
2368 static void decrCommand(redisClient *c) {
2369 incrDecrCommand(c,-1);
2370 }
2371
2372 static void incrbyCommand(redisClient *c) {
2373 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2374 incrDecrCommand(c,incr);
2375 }
2376
2377 static void decrbyCommand(redisClient *c) {
2378 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2379 incrDecrCommand(c,-incr);
2380 }
2381
2382 /* ========================= Type agnostic commands ========================= */
2383
2384 static void delCommand(redisClient *c) {
2385 int deleted = 0, j;
2386
2387 for (j = 1; j < c->argc; j++) {
2388 if (deleteKey(c->db,c->argv[j])) {
2389 server.dirty++;
2390 deleted++;
2391 }
2392 }
2393 switch(deleted) {
2394 case 0:
2395 addReply(c,shared.czero);
2396 break;
2397 case 1:
2398 addReply(c,shared.cone);
2399 break;
2400 default:
2401 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2402 break;
2403 }
2404 }
2405
2406 static void existsCommand(redisClient *c) {
2407 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2408 }
2409
2410 static void selectCommand(redisClient *c) {
2411 int id = atoi(c->argv[1]->ptr);
2412
2413 if (selectDb(c,id) == REDIS_ERR) {
2414 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2415 } else {
2416 addReply(c,shared.ok);
2417 }
2418 }
2419
2420 static void randomkeyCommand(redisClient *c) {
2421 dictEntry *de;
2422
2423 while(1) {
2424 de = dictGetRandomKey(c->db->dict);
2425 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2426 }
2427 if (de == NULL) {
2428 addReply(c,shared.plus);
2429 addReply(c,shared.crlf);
2430 } else {
2431 addReply(c,shared.plus);
2432 addReply(c,dictGetEntryKey(de));
2433 addReply(c,shared.crlf);
2434 }
2435 }
2436
2437 static void keysCommand(redisClient *c) {
2438 dictIterator *di;
2439 dictEntry *de;
2440 sds pattern = c->argv[1]->ptr;
2441 int plen = sdslen(pattern);
2442 int numkeys = 0, keyslen = 0;
2443 robj *lenobj = createObject(REDIS_STRING,NULL);
2444
2445 di = dictGetIterator(c->db->dict);
2446 if (!di) oom("dictGetIterator");
2447 addReply(c,lenobj);
2448 decrRefCount(lenobj);
2449 while((de = dictNext(di)) != NULL) {
2450 robj *keyobj = dictGetEntryKey(de);
2451
2452 sds key = keyobj->ptr;
2453 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2454 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2455 if (expireIfNeeded(c->db,keyobj) == 0) {
2456 if (numkeys != 0)
2457 addReply(c,shared.space);
2458 addReply(c,keyobj);
2459 numkeys++;
2460 keyslen += sdslen(key);
2461 }
2462 }
2463 }
2464 dictReleaseIterator(di);
2465 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2466 addReply(c,shared.crlf);
2467 }
2468
2469 static void dbsizeCommand(redisClient *c) {
2470 addReplySds(c,
2471 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2472 }
2473
2474 static void lastsaveCommand(redisClient *c) {
2475 addReplySds(c,
2476 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2477 }
2478
2479 static void typeCommand(redisClient *c) {
2480 robj *o;
2481 char *type;
2482
2483 o = lookupKeyRead(c->db,c->argv[1]);
2484 if (o == NULL) {
2485 type = "+none";
2486 } else {
2487 switch(o->type) {
2488 case REDIS_STRING: type = "+string"; break;
2489 case REDIS_LIST: type = "+list"; break;
2490 case REDIS_SET: type = "+set"; break;
2491 default: type = "unknown"; break;
2492 }
2493 }
2494 addReplySds(c,sdsnew(type));
2495 addReply(c,shared.crlf);
2496 }
2497
2498 static void saveCommand(redisClient *c) {
2499 if (server.bgsaveinprogress) {
2500 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2501 return;
2502 }
2503 if (rdbSave(server.dbfilename) == REDIS_OK) {
2504 addReply(c,shared.ok);
2505 } else {
2506 addReply(c,shared.err);
2507 }
2508 }
2509
2510 static void bgsaveCommand(redisClient *c) {
2511 if (server.bgsaveinprogress) {
2512 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2513 return;
2514 }
2515 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2516 addReply(c,shared.ok);
2517 } else {
2518 addReply(c,shared.err);
2519 }
2520 }
2521
2522 static void shutdownCommand(redisClient *c) {
2523 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2524 if (server.bgsaveinprogress) {
2525 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2526 signal(SIGCHLD, SIG_IGN);
2527 kill(server.bgsavechildpid,SIGKILL);
2528 }
2529 if (rdbSave(server.dbfilename) == REDIS_OK) {
2530 if (server.daemonize)
2531 unlink(server.pidfile);
2532 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2533 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2534 exit(1);
2535 } else {
2536 signal(SIGCHLD, SIG_DFL);
2537 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2538 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2539 }
2540 }
2541
2542 static void renameGenericCommand(redisClient *c, int nx) {
2543 robj *o;
2544
2545 /* To use the same key as src and dst is probably an error */
2546 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2547 addReply(c,shared.sameobjecterr);
2548 return;
2549 }
2550
2551 o = lookupKeyWrite(c->db,c->argv[1]);
2552 if (o == NULL) {
2553 addReply(c,shared.nokeyerr);
2554 return;
2555 }
2556 incrRefCount(o);
2557 deleteIfVolatile(c->db,c->argv[2]);
2558 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2559 if (nx) {
2560 decrRefCount(o);
2561 addReply(c,shared.czero);
2562 return;
2563 }
2564 dictReplace(c->db->dict,c->argv[2],o);
2565 } else {
2566 incrRefCount(c->argv[2]);
2567 }
2568 deleteKey(c->db,c->argv[1]);
2569 server.dirty++;
2570 addReply(c,nx ? shared.cone : shared.ok);
2571 }
2572
2573 static void renameCommand(redisClient *c) {
2574 renameGenericCommand(c,0);
2575 }
2576
2577 static void renamenxCommand(redisClient *c) {
2578 renameGenericCommand(c,1);
2579 }
2580
2581 static void moveCommand(redisClient *c) {
2582 robj *o;
2583 redisDb *src, *dst;
2584 int srcid;
2585
2586 /* Obtain source and target DB pointers */
2587 src = c->db;
2588 srcid = c->db->id;
2589 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2590 addReply(c,shared.outofrangeerr);
2591 return;
2592 }
2593 dst = c->db;
2594 selectDb(c,srcid); /* Back to the source DB */
2595
2596 /* If the user is moving using as target the same
2597 * DB as the source DB it is probably an error. */
2598 if (src == dst) {
2599 addReply(c,shared.sameobjecterr);
2600 return;
2601 }
2602
2603 /* Check if the element exists and get a reference */
2604 o = lookupKeyWrite(c->db,c->argv[1]);
2605 if (!o) {
2606 addReply(c,shared.czero);
2607 return;
2608 }
2609
2610 /* Try to add the element to the target DB */
2611 deleteIfVolatile(dst,c->argv[1]);
2612 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2613 addReply(c,shared.czero);
2614 return;
2615 }
2616 incrRefCount(c->argv[1]);
2617 incrRefCount(o);
2618
2619 /* OK! key moved, free the entry in the source DB */
2620 deleteKey(src,c->argv[1]);
2621 server.dirty++;
2622 addReply(c,shared.cone);
2623 }
2624
2625 /* =================================== Lists ================================ */
2626 static void pushGenericCommand(redisClient *c, int where) {
2627 robj *lobj;
2628 list *list;
2629
2630 lobj = lookupKeyWrite(c->db,c->argv[1]);
2631 if (lobj == NULL) {
2632 lobj = createListObject();
2633 list = lobj->ptr;
2634 if (where == REDIS_HEAD) {
2635 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2636 } else {
2637 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2638 }
2639 dictAdd(c->db->dict,c->argv[1],lobj);
2640 incrRefCount(c->argv[1]);
2641 incrRefCount(c->argv[2]);
2642 } else {
2643 if (lobj->type != REDIS_LIST) {
2644 addReply(c,shared.wrongtypeerr);
2645 return;
2646 }
2647 list = lobj->ptr;
2648 if (where == REDIS_HEAD) {
2649 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2650 } else {
2651 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2652 }
2653 incrRefCount(c->argv[2]);
2654 }
2655 server.dirty++;
2656 addReply(c,shared.ok);
2657 }
2658
2659 static void lpushCommand(redisClient *c) {
2660 pushGenericCommand(c,REDIS_HEAD);
2661 }
2662
2663 static void rpushCommand(redisClient *c) {
2664 pushGenericCommand(c,REDIS_TAIL);
2665 }
2666
2667 static void llenCommand(redisClient *c) {
2668 robj *o;
2669 list *l;
2670
2671 o = lookupKeyRead(c->db,c->argv[1]);
2672 if (o == NULL) {
2673 addReply(c,shared.czero);
2674 return;
2675 } else {
2676 if (o->type != REDIS_LIST) {
2677 addReply(c,shared.wrongtypeerr);
2678 } else {
2679 l = o->ptr;
2680 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2681 }
2682 }
2683 }
2684
2685 static void lindexCommand(redisClient *c) {
2686 robj *o;
2687 int index = atoi(c->argv[2]->ptr);
2688
2689 o = lookupKeyRead(c->db,c->argv[1]);
2690 if (o == NULL) {
2691 addReply(c,shared.nullbulk);
2692 } else {
2693 if (o->type != REDIS_LIST) {
2694 addReply(c,shared.wrongtypeerr);
2695 } else {
2696 list *list = o->ptr;
2697 listNode *ln;
2698
2699 ln = listIndex(list, index);
2700 if (ln == NULL) {
2701 addReply(c,shared.nullbulk);
2702 } else {
2703 robj *ele = listNodeValue(ln);
2704 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2705 addReply(c,ele);
2706 addReply(c,shared.crlf);
2707 }
2708 }
2709 }
2710 }
2711
2712 static void lsetCommand(redisClient *c) {
2713 robj *o;
2714 int index = atoi(c->argv[2]->ptr);
2715
2716 o = lookupKeyWrite(c->db,c->argv[1]);
2717 if (o == NULL) {
2718 addReply(c,shared.nokeyerr);
2719 } else {
2720 if (o->type != REDIS_LIST) {
2721 addReply(c,shared.wrongtypeerr);
2722 } else {
2723 list *list = o->ptr;
2724 listNode *ln;
2725
2726 ln = listIndex(list, index);
2727 if (ln == NULL) {
2728 addReply(c,shared.outofrangeerr);
2729 } else {
2730 robj *ele = listNodeValue(ln);
2731
2732 decrRefCount(ele);
2733 listNodeValue(ln) = c->argv[3];
2734 incrRefCount(c->argv[3]);
2735 addReply(c,shared.ok);
2736 server.dirty++;
2737 }
2738 }
2739 }
2740 }
2741
2742 static void popGenericCommand(redisClient *c, int where) {
2743 robj *o;
2744
2745 o = lookupKeyWrite(c->db,c->argv[1]);
2746 if (o == NULL) {
2747 addReply(c,shared.nullbulk);
2748 } else {
2749 if (o->type != REDIS_LIST) {
2750 addReply(c,shared.wrongtypeerr);
2751 } else {
2752 list *list = o->ptr;
2753 listNode *ln;
2754
2755 if (where == REDIS_HEAD)
2756 ln = listFirst(list);
2757 else
2758 ln = listLast(list);
2759
2760 if (ln == NULL) {
2761 addReply(c,shared.nullbulk);
2762 } else {
2763 robj *ele = listNodeValue(ln);
2764 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2765 addReply(c,ele);
2766 addReply(c,shared.crlf);
2767 listDelNode(list,ln);
2768 server.dirty++;
2769 }
2770 }
2771 }
2772 }
2773
2774 static void lpopCommand(redisClient *c) {
2775 popGenericCommand(c,REDIS_HEAD);
2776 }
2777
2778 static void rpopCommand(redisClient *c) {
2779 popGenericCommand(c,REDIS_TAIL);
2780 }
2781
2782 static void lrangeCommand(redisClient *c) {
2783 robj *o;
2784 int start = atoi(c->argv[2]->ptr);
2785 int end = atoi(c->argv[3]->ptr);
2786
2787 o = lookupKeyRead(c->db,c->argv[1]);
2788 if (o == NULL) {
2789 addReply(c,shared.nullmultibulk);
2790 } else {
2791 if (o->type != REDIS_LIST) {
2792 addReply(c,shared.wrongtypeerr);
2793 } else {
2794 list *list = o->ptr;
2795 listNode *ln;
2796 int llen = listLength(list);
2797 int rangelen, j;
2798 robj *ele;
2799
2800 /* convert negative indexes */
2801 if (start < 0) start = llen+start;
2802 if (end < 0) end = llen+end;
2803 if (start < 0) start = 0;
2804 if (end < 0) end = 0;
2805
2806 /* indexes sanity checks */
2807 if (start > end || start >= llen) {
2808 /* Out of range start or start > end result in empty list */
2809 addReply(c,shared.emptymultibulk);
2810 return;
2811 }
2812 if (end >= llen) end = llen-1;
2813 rangelen = (end-start)+1;
2814
2815 /* Return the result in form of a multi-bulk reply */
2816 ln = listIndex(list, start);
2817 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2818 for (j = 0; j < rangelen; j++) {
2819 ele = listNodeValue(ln);
2820 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2821 addReply(c,ele);
2822 addReply(c,shared.crlf);
2823 ln = ln->next;
2824 }
2825 }
2826 }
2827 }
2828
2829 static void ltrimCommand(redisClient *c) {
2830 robj *o;
2831 int start = atoi(c->argv[2]->ptr);
2832 int end = atoi(c->argv[3]->ptr);
2833
2834 o = lookupKeyWrite(c->db,c->argv[1]);
2835 if (o == NULL) {
2836 addReply(c,shared.nokeyerr);
2837 } else {
2838 if (o->type != REDIS_LIST) {
2839 addReply(c,shared.wrongtypeerr);
2840 } else {
2841 list *list = o->ptr;
2842 listNode *ln;
2843 int llen = listLength(list);
2844 int j, ltrim, rtrim;
2845
2846 /* convert negative indexes */
2847 if (start < 0) start = llen+start;
2848 if (end < 0) end = llen+end;
2849 if (start < 0) start = 0;
2850 if (end < 0) end = 0;
2851
2852 /* indexes sanity checks */
2853 if (start > end || start >= llen) {
2854 /* Out of range start or start > end result in empty list */
2855 ltrim = llen;
2856 rtrim = 0;
2857 } else {
2858 if (end >= llen) end = llen-1;
2859 ltrim = start;
2860 rtrim = llen-end-1;
2861 }
2862
2863 /* Remove list elements to perform the trim */
2864 for (j = 0; j < ltrim; j++) {
2865 ln = listFirst(list);
2866 listDelNode(list,ln);
2867 }
2868 for (j = 0; j < rtrim; j++) {
2869 ln = listLast(list);
2870 listDelNode(list,ln);
2871 }
2872 addReply(c,shared.ok);
2873 server.dirty++;
2874 }
2875 }
2876 }
2877
2878 static void lremCommand(redisClient *c) {
2879 robj *o;
2880
2881 o = lookupKeyWrite(c->db,c->argv[1]);
2882 if (o == NULL) {
2883 addReply(c,shared.czero);
2884 } else {
2885 if (o->type != REDIS_LIST) {
2886 addReply(c,shared.wrongtypeerr);
2887 } else {
2888 list *list = o->ptr;
2889 listNode *ln, *next;
2890 int toremove = atoi(c->argv[2]->ptr);
2891 int removed = 0;
2892 int fromtail = 0;
2893
2894 if (toremove < 0) {
2895 toremove = -toremove;
2896 fromtail = 1;
2897 }
2898 ln = fromtail ? list->tail : list->head;
2899 while (ln) {
2900 robj *ele = listNodeValue(ln);
2901
2902 next = fromtail ? ln->prev : ln->next;
2903 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2904 listDelNode(list,ln);
2905 server.dirty++;
2906 removed++;
2907 if (toremove && removed == toremove) break;
2908 }
2909 ln = next;
2910 }
2911 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2912 }
2913 }
2914 }
2915
2916 /* ==================================== Sets ================================ */
2917
2918 static void saddCommand(redisClient *c) {
2919 robj *set;
2920
2921 set = lookupKeyWrite(c->db,c->argv[1]);
2922 if (set == NULL) {
2923 set = createSetObject();
2924 dictAdd(c->db->dict,c->argv[1],set);
2925 incrRefCount(c->argv[1]);
2926 } else {
2927 if (set->type != REDIS_SET) {
2928 addReply(c,shared.wrongtypeerr);
2929 return;
2930 }
2931 }
2932 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2933 incrRefCount(c->argv[2]);
2934 server.dirty++;
2935 addReply(c,shared.cone);
2936 } else {
2937 addReply(c,shared.czero);
2938 }
2939 }
2940
2941 static void sremCommand(redisClient *c) {
2942 robj *set;
2943
2944 set = lookupKeyWrite(c->db,c->argv[1]);
2945 if (set == NULL) {
2946 addReply(c,shared.czero);
2947 } else {
2948 if (set->type != REDIS_SET) {
2949 addReply(c,shared.wrongtypeerr);
2950 return;
2951 }
2952 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2953 server.dirty++;
2954 addReply(c,shared.cone);
2955 } else {
2956 addReply(c,shared.czero);
2957 }
2958 }
2959 }
2960
2961 static void smoveCommand(redisClient *c) {
2962 robj *srcset, *dstset;
2963
2964 srcset = lookupKeyWrite(c->db,c->argv[1]);
2965 dstset = lookupKeyWrite(c->db,c->argv[2]);
2966
2967 /* If the source key does not exist return 0, if it's of the wrong type
2968 * raise an error */
2969 if (srcset == NULL || srcset->type != REDIS_SET) {
2970 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2971 return;
2972 }
2973 /* Error if the destination key is not a set as well */
2974 if (dstset && dstset->type != REDIS_SET) {
2975 addReply(c,shared.wrongtypeerr);
2976 return;
2977 }
2978 /* Remove the element from the source set */
2979 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2980 /* Key not found in the src set! return zero */
2981 addReply(c,shared.czero);
2982 return;
2983 }
2984 server.dirty++;
2985 /* Add the element to the destination set */
2986 if (!dstset) {
2987 dstset = createSetObject();
2988 dictAdd(c->db->dict,c->argv[2],dstset);
2989 incrRefCount(c->argv[2]);
2990 }
2991 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
2992 incrRefCount(c->argv[3]);
2993 addReply(c,shared.cone);
2994 }
2995
2996 static void sismemberCommand(redisClient *c) {
2997 robj *set;
2998
2999 set = lookupKeyRead(c->db,c->argv[1]);
3000 if (set == NULL) {
3001 addReply(c,shared.czero);
3002 } else {
3003 if (set->type != REDIS_SET) {
3004 addReply(c,shared.wrongtypeerr);
3005 return;
3006 }
3007 if (dictFind(set->ptr,c->argv[2]))
3008 addReply(c,shared.cone);
3009 else
3010 addReply(c,shared.czero);
3011 }
3012 }
3013
3014 static void scardCommand(redisClient *c) {
3015 robj *o;
3016 dict *s;
3017
3018 o = lookupKeyRead(c->db,c->argv[1]);
3019 if (o == NULL) {
3020 addReply(c,shared.czero);
3021 return;
3022 } else {
3023 if (o->type != REDIS_SET) {
3024 addReply(c,shared.wrongtypeerr);
3025 } else {
3026 s = o->ptr;
3027 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3028 dictSize(s)));
3029 }
3030 }
3031 }
3032
3033 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3034 dict **d1 = (void*) s1, **d2 = (void*) s2;
3035
3036 return dictSize(*d1)-dictSize(*d2);
3037 }
3038
3039 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3040 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3041 dictIterator *di;
3042 dictEntry *de;
3043 robj *lenobj = NULL, *dstset = NULL;
3044 int j, cardinality = 0;
3045
3046 if (!dv) oom("sinterGenericCommand");
3047 for (j = 0; j < setsnum; j++) {
3048 robj *setobj;
3049
3050 setobj = dstkey ?
3051 lookupKeyWrite(c->db,setskeys[j]) :
3052 lookupKeyRead(c->db,setskeys[j]);
3053 if (!setobj) {
3054 zfree(dv);
3055 if (dstkey) {
3056 deleteKey(c->db,dstkey);
3057 addReply(c,shared.ok);
3058 } else {
3059 addReply(c,shared.nullmultibulk);
3060 }
3061 return;
3062 }
3063 if (setobj->type != REDIS_SET) {
3064 zfree(dv);
3065 addReply(c,shared.wrongtypeerr);
3066 return;
3067 }
3068 dv[j] = setobj->ptr;
3069 }
3070 /* Sort sets from the smallest to largest, this will improve our
3071 * algorithm's performace */
3072 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3073
3074 /* The first thing we should output is the total number of elements...
3075 * since this is a multi-bulk write, but at this stage we don't know
3076 * the intersection set size, so we use a trick, append an empty object
3077 * to the output list and save the pointer to later modify it with the
3078 * right length */
3079 if (!dstkey) {
3080 lenobj = createObject(REDIS_STRING,NULL);
3081 addReply(c,lenobj);
3082 decrRefCount(lenobj);
3083 } else {
3084 /* If we have a target key where to store the resulting set
3085 * create this key with an empty set inside */
3086 dstset = createSetObject();
3087 }
3088
3089 /* Iterate all the elements of the first (smallest) set, and test
3090 * the element against all the other sets, if at least one set does
3091 * not include the element it is discarded */
3092 di = dictGetIterator(dv[0]);
3093 if (!di) oom("dictGetIterator");
3094
3095 while((de = dictNext(di)) != NULL) {
3096 robj *ele;
3097
3098 for (j = 1; j < setsnum; j++)
3099 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3100 if (j != setsnum)
3101 continue; /* at least one set does not contain the member */
3102 ele = dictGetEntryKey(de);
3103 if (!dstkey) {
3104 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3105 addReply(c,ele);
3106 addReply(c,shared.crlf);
3107 cardinality++;
3108 } else {
3109 dictAdd(dstset->ptr,ele,NULL);
3110 incrRefCount(ele);
3111 }
3112 }
3113 dictReleaseIterator(di);
3114
3115 if (dstkey) {
3116 /* Store the resulting set into the target */
3117 deleteKey(c->db,dstkey);
3118 dictAdd(c->db->dict,dstkey,dstset);
3119 incrRefCount(dstkey);
3120 }
3121
3122 if (!dstkey) {
3123 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3124 } else {
3125 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3126 dictSize((dict*)dstset->ptr)));
3127 server.dirty++;
3128 }
3129 zfree(dv);
3130 }
3131
3132 static void sinterCommand(redisClient *c) {
3133 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3134 }
3135
3136 static void sinterstoreCommand(redisClient *c) {
3137 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3138 }
3139
3140 #define REDIS_OP_UNION 0
3141 #define REDIS_OP_DIFF 1
3142
3143 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3144 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3145 dictIterator *di;
3146 dictEntry *de;
3147 robj *dstset = NULL;
3148 int j, cardinality = 0;
3149
3150 if (!dv) oom("sunionDiffGenericCommand");
3151 for (j = 0; j < setsnum; j++) {
3152 robj *setobj;
3153
3154 setobj = dstkey ?
3155 lookupKeyWrite(c->db,setskeys[j]) :
3156 lookupKeyRead(c->db,setskeys[j]);
3157 if (!setobj) {
3158 dv[j] = NULL;
3159 continue;
3160 }
3161 if (setobj->type != REDIS_SET) {
3162 zfree(dv);
3163 addReply(c,shared.wrongtypeerr);
3164 return;
3165 }
3166 dv[j] = setobj->ptr;
3167 }
3168
3169 /* We need a temp set object to store our union. If the dstkey
3170 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3171 * this set object will be the resulting object to set into the target key*/
3172 dstset = createSetObject();
3173
3174 /* Iterate all the elements of all the sets, add every element a single
3175 * time to the result set */
3176 for (j = 0; j < setsnum; j++) {
3177 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3178 if (!dv[j]) continue; /* non existing keys are like empty sets */
3179
3180 di = dictGetIterator(dv[j]);
3181 if (!di) oom("dictGetIterator");
3182
3183 while((de = dictNext(di)) != NULL) {
3184 robj *ele;
3185
3186 /* dictAdd will not add the same element multiple times */
3187 ele = dictGetEntryKey(de);
3188 if (op == REDIS_OP_UNION || j == 0) {
3189 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3190 incrRefCount(ele);
3191 cardinality++;
3192 }
3193 } else if (op == REDIS_OP_DIFF) {
3194 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3195 cardinality--;
3196 }
3197 }
3198 }
3199 dictReleaseIterator(di);
3200
3201 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3202 }
3203
3204 /* Output the content of the resulting set, if not in STORE mode */
3205 if (!dstkey) {
3206 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3207 di = dictGetIterator(dstset->ptr);
3208 if (!di) oom("dictGetIterator");
3209 while((de = dictNext(di)) != NULL) {
3210 robj *ele;
3211
3212 ele = dictGetEntryKey(de);
3213 addReplySds(c,sdscatprintf(sdsempty(),
3214 "$%d\r\n",sdslen(ele->ptr)));
3215 addReply(c,ele);
3216 addReply(c,shared.crlf);
3217 }
3218 dictReleaseIterator(di);
3219 } else {
3220 /* If we have a target key where to store the resulting set
3221 * create this key with the result set inside */
3222 deleteKey(c->db,dstkey);
3223 dictAdd(c->db->dict,dstkey,dstset);
3224 incrRefCount(dstkey);
3225 }
3226
3227 /* Cleanup */
3228 if (!dstkey) {
3229 decrRefCount(dstset);
3230 } else {
3231 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3232 dictSize((dict*)dstset->ptr)));
3233 server.dirty++;
3234 }
3235 zfree(dv);
3236 }
3237
3238 static void sunionCommand(redisClient *c) {
3239 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3240 }
3241
3242 static void sunionstoreCommand(redisClient *c) {
3243 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3244 }
3245
3246 static void sdiffCommand(redisClient *c) {
3247 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3248 }
3249
3250 static void sdiffstoreCommand(redisClient *c) {
3251 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3252 }
3253
3254 static void flushdbCommand(redisClient *c) {
3255 server.dirty += dictSize(c->db->dict);
3256 dictEmpty(c->db->dict);
3257 dictEmpty(c->db->expires);
3258 addReply(c,shared.ok);
3259 }
3260
3261 static void flushallCommand(redisClient *c) {
3262 server.dirty += emptyDb();
3263 addReply(c,shared.ok);
3264 rdbSave(server.dbfilename);
3265 server.dirty++;
3266 }
3267
3268 redisSortOperation *createSortOperation(int type, robj *pattern) {
3269 redisSortOperation *so = zmalloc(sizeof(*so));
3270 if (!so) oom("createSortOperation");
3271 so->type = type;
3272 so->pattern = pattern;
3273 return so;
3274 }
3275
3276 /* Return the value associated to the key with a name obtained
3277 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3278 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3279 char *p;
3280 sds spat, ssub;
3281 robj keyobj;
3282 int prefixlen, sublen, postfixlen;
3283 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3284 struct {
3285 long len;
3286 long free;
3287 char buf[REDIS_SORTKEY_MAX+1];
3288 } keyname;
3289
3290 spat = pattern->ptr;
3291 ssub = subst->ptr;
3292 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3293 p = strchr(spat,'*');
3294 if (!p) return NULL;
3295
3296 prefixlen = p-spat;
3297 sublen = sdslen(ssub);
3298 postfixlen = sdslen(spat)-(prefixlen+1);
3299 memcpy(keyname.buf,spat,prefixlen);
3300 memcpy(keyname.buf+prefixlen,ssub,sublen);
3301 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3302 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3303 keyname.len = prefixlen+sublen+postfixlen;
3304
3305 keyobj.refcount = 1;
3306 keyobj.type = REDIS_STRING;
3307 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3308
3309 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3310 return lookupKeyRead(db,&keyobj);
3311 }
3312
3313 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3314 * the additional parameter is not standard but a BSD-specific we have to
3315 * pass sorting parameters via the global 'server' structure */
3316 static int sortCompare(const void *s1, const void *s2) {
3317 const redisSortObject *so1 = s1, *so2 = s2;
3318 int cmp;
3319
3320 if (!server.sort_alpha) {
3321 /* Numeric sorting. Here it's trivial as we precomputed scores */
3322 if (so1->u.score > so2->u.score) {
3323 cmp = 1;
3324 } else if (so1->u.score < so2->u.score) {
3325 cmp = -1;
3326 } else {
3327 cmp = 0;
3328 }
3329 } else {
3330 /* Alphanumeric sorting */
3331 if (server.sort_bypattern) {
3332 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3333 /* At least one compare object is NULL */
3334 if (so1->u.cmpobj == so2->u.cmpobj)
3335 cmp = 0;
3336 else if (so1->u.cmpobj == NULL)
3337 cmp = -1;
3338 else
3339 cmp = 1;
3340 } else {
3341 /* We have both the objects, use strcoll */
3342 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3343 }
3344 } else {
3345 /* Compare elements directly */
3346 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3347 }
3348 }
3349 return server.sort_desc ? -cmp : cmp;
3350 }
3351
3352 /* The SORT command is the most complex command in Redis. Warning: this code
3353 * is optimized for speed and a bit less for readability */
3354 static void sortCommand(redisClient *c) {
3355 list *operations;
3356 int outputlen = 0;
3357 int desc = 0, alpha = 0;
3358 int limit_start = 0, limit_count = -1, start, end;
3359 int j, dontsort = 0, vectorlen;
3360 int getop = 0; /* GET operation counter */
3361 robj *sortval, *sortby = NULL;
3362 redisSortObject *vector; /* Resulting vector to sort */
3363
3364 /* Lookup the key to sort. It must be of the right types */
3365 sortval = lookupKeyRead(c->db,c->argv[1]);
3366 if (sortval == NULL) {
3367 addReply(c,shared.nokeyerr);
3368 return;
3369 }
3370 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3371 addReply(c,shared.wrongtypeerr);
3372 return;
3373 }
3374
3375 /* Create a list of operations to perform for every sorted element.
3376 * Operations can be GET/DEL/INCR/DECR */
3377 operations = listCreate();
3378 listSetFreeMethod(operations,zfree);
3379 j = 2;
3380
3381 /* Now we need to protect sortval incrementing its count, in the future
3382 * SORT may have options able to overwrite/delete keys during the sorting
3383 * and the sorted key itself may get destroied */
3384 incrRefCount(sortval);
3385
3386 /* The SORT command has an SQL-alike syntax, parse it */
3387 while(j < c->argc) {
3388 int leftargs = c->argc-j-1;
3389 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3390 desc = 0;
3391 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3392 desc = 1;
3393 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3394 alpha = 1;
3395 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3396 limit_start = atoi(c->argv[j+1]->ptr);
3397 limit_count = atoi(c->argv[j+2]->ptr);
3398 j+=2;
3399 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3400 sortby = c->argv[j+1];
3401 /* If the BY pattern does not contain '*', i.e. it is constant,
3402 * we don't need to sort nor to lookup the weight keys. */
3403 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3404 j++;
3405 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3406 listAddNodeTail(operations,createSortOperation(
3407 REDIS_SORT_GET,c->argv[j+1]));
3408 getop++;
3409 j++;
3410 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3411 listAddNodeTail(operations,createSortOperation(
3412 REDIS_SORT_DEL,c->argv[j+1]));
3413 j++;
3414 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3415 listAddNodeTail(operations,createSortOperation(
3416 REDIS_SORT_INCR,c->argv[j+1]));
3417 j++;
3418 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3419 listAddNodeTail(operations,createSortOperation(
3420 REDIS_SORT_DECR,c->argv[j+1]));
3421 j++;
3422 } else {
3423 decrRefCount(sortval);
3424 listRelease(operations);
3425 addReply(c,shared.syntaxerr);
3426 return;
3427 }
3428 j++;
3429 }
3430
3431 /* Load the sorting vector with all the objects to sort */
3432 vectorlen = (sortval->type == REDIS_LIST) ?
3433 listLength((list*)sortval->ptr) :
3434 dictSize((dict*)sortval->ptr);
3435 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3436 if (!vector) oom("allocating objects vector for SORT");
3437 j = 0;
3438 if (sortval->type == REDIS_LIST) {
3439 list *list = sortval->ptr;
3440 listNode *ln;
3441
3442 listRewind(list);
3443 while((ln = listYield(list))) {
3444 robj *ele = ln->value;
3445 vector[j].obj = ele;
3446 vector[j].u.score = 0;
3447 vector[j].u.cmpobj = NULL;
3448 j++;
3449 }
3450 } else {
3451 dict *set = sortval->ptr;
3452 dictIterator *di;
3453 dictEntry *setele;
3454
3455 di = dictGetIterator(set);
3456 if (!di) oom("dictGetIterator");
3457 while((setele = dictNext(di)) != NULL) {
3458 vector[j].obj = dictGetEntryKey(setele);
3459 vector[j].u.score = 0;
3460 vector[j].u.cmpobj = NULL;
3461 j++;
3462 }
3463 dictReleaseIterator(di);
3464 }
3465 assert(j == vectorlen);
3466
3467 /* Now it's time to load the right scores in the sorting vector */
3468 if (dontsort == 0) {
3469 for (j = 0; j < vectorlen; j++) {
3470 if (sortby) {
3471 robj *byval;
3472
3473 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3474 if (!byval || byval->type != REDIS_STRING) continue;
3475 if (alpha) {
3476 vector[j].u.cmpobj = byval;
3477 incrRefCount(byval);
3478 } else {
3479 vector[j].u.score = strtod(byval->ptr,NULL);
3480 }
3481 } else {
3482 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3483 }
3484 }
3485 }
3486
3487 /* We are ready to sort the vector... perform a bit of sanity check
3488 * on the LIMIT option too. We'll use a partial version of quicksort. */
3489 start = (limit_start < 0) ? 0 : limit_start;
3490 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3491 if (start >= vectorlen) {
3492 start = vectorlen-1;
3493 end = vectorlen-2;
3494 }
3495 if (end >= vectorlen) end = vectorlen-1;
3496
3497 if (dontsort == 0) {
3498 server.sort_desc = desc;
3499 server.sort_alpha = alpha;
3500 server.sort_bypattern = sortby ? 1 : 0;
3501 if (sortby && (start != 0 || end != vectorlen-1))
3502 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3503 else
3504 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3505 }
3506
3507 /* Send command output to the output buffer, performing the specified
3508 * GET/DEL/INCR/DECR operations if any. */
3509 outputlen = getop ? getop*(end-start+1) : end-start+1;
3510 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3511 for (j = start; j <= end; j++) {
3512 listNode *ln;
3513 if (!getop) {
3514 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3515 sdslen(vector[j].obj->ptr)));
3516 addReply(c,vector[j].obj);
3517 addReply(c,shared.crlf);
3518 }
3519 listRewind(operations);
3520 while((ln = listYield(operations))) {
3521 redisSortOperation *sop = ln->value;
3522 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3523 vector[j].obj);
3524
3525 if (sop->type == REDIS_SORT_GET) {
3526 if (!val || val->type != REDIS_STRING) {
3527 addReply(c,shared.nullbulk);
3528 } else {
3529 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3530 sdslen(val->ptr)));
3531 addReply(c,val);
3532 addReply(c,shared.crlf);
3533 }
3534 } else if (sop->type == REDIS_SORT_DEL) {
3535 /* TODO */
3536 }
3537 }
3538 }
3539
3540 /* Cleanup */
3541 decrRefCount(sortval);
3542 listRelease(operations);
3543 for (j = 0; j < vectorlen; j++) {
3544 if (sortby && alpha && vector[j].u.cmpobj)
3545 decrRefCount(vector[j].u.cmpobj);
3546 }
3547 zfree(vector);
3548 }
3549
3550 static void infoCommand(redisClient *c) {
3551 sds info;
3552 time_t uptime = time(NULL)-server.stat_starttime;
3553
3554 info = sdscatprintf(sdsempty(),
3555 "redis_version:%s\r\n"
3556 "uptime_in_seconds:%d\r\n"
3557 "uptime_in_days:%d\r\n"
3558 "connected_clients:%d\r\n"
3559 "connected_slaves:%d\r\n"
3560 "used_memory:%zu\r\n"
3561 "changes_since_last_save:%lld\r\n"
3562 "bgsave_in_progress:%d\r\n"
3563 "last_save_time:%d\r\n"
3564 "total_connections_received:%lld\r\n"
3565 "total_commands_processed:%lld\r\n"
3566 "role:%s\r\n"
3567 ,REDIS_VERSION,
3568 uptime,
3569 uptime/(3600*24),
3570 listLength(server.clients)-listLength(server.slaves),
3571 listLength(server.slaves),
3572 server.usedmemory,
3573 server.dirty,
3574 server.bgsaveinprogress,
3575 server.lastsave,
3576 server.stat_numconnections,
3577 server.stat_numcommands,
3578 server.masterhost == NULL ? "master" : "slave"
3579 );
3580 if (server.masterhost) {
3581 info = sdscatprintf(info,
3582 "master_host:%s\r\n"
3583 "master_port:%d\r\n"
3584 "master_link_status:%s\r\n"
3585 "master_last_io_seconds_ago:%d\r\n"
3586 ,server.masterhost,
3587 server.masterport,
3588 (server.replstate == REDIS_REPL_CONNECTED) ?
3589 "up" : "down",
3590 (int)(time(NULL)-server.master->lastinteraction)
3591 );
3592 }
3593 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3594 addReplySds(c,info);
3595 addReply(c,shared.crlf);
3596 }
3597
3598 static void monitorCommand(redisClient *c) {
3599 /* ignore MONITOR if aleady slave or in monitor mode */
3600 if (c->flags & REDIS_SLAVE) return;
3601
3602 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3603 c->slaveseldb = 0;
3604 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3605 addReply(c,shared.ok);
3606 }
3607
3608 /* ================================= Expire ================================= */
3609 static int removeExpire(redisDb *db, robj *key) {
3610 if (dictDelete(db->expires,key) == DICT_OK) {
3611 return 1;
3612 } else {
3613 return 0;
3614 }
3615 }
3616
3617 static int setExpire(redisDb *db, robj *key, time_t when) {
3618 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3619 return 0;
3620 } else {
3621 incrRefCount(key);
3622 return 1;
3623 }
3624 }
3625
3626 /* Return the expire time of the specified key, or -1 if no expire
3627 * is associated with this key (i.e. the key is non volatile) */
3628 static time_t getExpire(redisDb *db, robj *key) {
3629 dictEntry *de;
3630
3631 /* No expire? return ASAP */
3632 if (dictSize(db->expires) == 0 ||
3633 (de = dictFind(db->expires,key)) == NULL) return -1;
3634
3635 return (time_t) dictGetEntryVal(de);
3636 }
3637
3638 static int expireIfNeeded(redisDb *db, robj *key) {
3639 time_t when;
3640 dictEntry *de;
3641
3642 /* No expire? return ASAP */
3643 if (dictSize(db->expires) == 0 ||
3644 (de = dictFind(db->expires,key)) == NULL) return 0;
3645
3646 /* Lookup the expire */
3647 when = (time_t) dictGetEntryVal(de);
3648 if (time(NULL) <= when) return 0;
3649
3650 /* Delete the key */
3651 dictDelete(db->expires,key);
3652 return dictDelete(db->dict,key) == DICT_OK;
3653 }
3654
3655 static int deleteIfVolatile(redisDb *db, robj *key) {
3656 dictEntry *de;
3657
3658 /* No expire? return ASAP */
3659 if (dictSize(db->expires) == 0 ||
3660 (de = dictFind(db->expires,key)) == NULL) return 0;
3661
3662 /* Delete the key */
3663 server.dirty++;
3664 dictDelete(db->expires,key);
3665 return dictDelete(db->dict,key) == DICT_OK;
3666 }
3667
3668 static void expireCommand(redisClient *c) {
3669 dictEntry *de;
3670 int seconds = atoi(c->argv[2]->ptr);
3671
3672 de = dictFind(c->db->dict,c->argv[1]);
3673 if (de == NULL) {
3674 addReply(c,shared.czero);
3675 return;
3676 }
3677 if (seconds <= 0) {
3678 addReply(c, shared.czero);
3679 return;
3680 } else {
3681 time_t when = time(NULL)+seconds;
3682 if (setExpire(c->db,c->argv[1],when))
3683 addReply(c,shared.cone);
3684 else
3685 addReply(c,shared.czero);
3686 return;
3687 }
3688 }
3689
3690 static void ttlCommand(redisClient *c) {
3691 time_t expire;
3692 int ttl = -1;
3693
3694 expire = getExpire(c->db,c->argv[1]);
3695 if (expire != -1) {
3696 ttl = (int) (expire-time(NULL));
3697 if (ttl < 0) ttl = -1;
3698 }
3699 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3700 }
3701
3702 /* =============================== Replication ============================= */
3703
3704 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3705 ssize_t nwritten, ret = size;
3706 time_t start = time(NULL);
3707
3708 timeout++;
3709 while(size) {
3710 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3711 nwritten = write(fd,ptr,size);
3712 if (nwritten == -1) return -1;
3713 ptr += nwritten;
3714 size -= nwritten;
3715 }
3716 if ((time(NULL)-start) > timeout) {
3717 errno = ETIMEDOUT;
3718 return -1;
3719 }
3720 }
3721 return ret;
3722 }
3723
3724 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3725 ssize_t nread, totread = 0;
3726 time_t start = time(NULL);
3727
3728 timeout++;
3729 while(size) {
3730 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3731 nread = read(fd,ptr,size);
3732 if (nread == -1) return -1;
3733 ptr += nread;
3734 size -= nread;
3735 totread += nread;
3736 }
3737 if ((time(NULL)-start) > timeout) {
3738 errno = ETIMEDOUT;
3739 return -1;
3740 }
3741 }
3742 return totread;
3743 }
3744
/* Read one '\n'-terminated line from 'fd' into 'ptr', one byte at a time via
 * syncRead().
 *
 * 'size' is the size of the destination buffer:
 * - At most size-1 characters are stored and the buffer is always
 *   NUL-terminated.
 * - The '\n' is not stored, and a '\r' right before it is replaced by NUL.
 * - If the buffer fills up before a newline arrives, the truncated line
 *   read so far is returned.
 *
 * Returns the number of characters stored before the newline (a stripped
 * '\r' is still counted). Returns -1 on read error or timeout (see
 * syncRead()), or with errno = EINVAL when 'size' leaves no room for the
 * terminator.
 *
 * The loop used to never decrement 'size', so it was unbounded: a line
 * longer than the buffer was written past its end. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    size--; /* reserve room for the NUL terminator */
    *ptr = '\0';
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        size--; /* one buffer slot consumed */
    }
    return nread;
}
3765
3766 static void syncCommand(redisClient *c) {
3767 /* ignore SYNC if aleady slave or in monitor mode */
3768 if (c->flags & REDIS_SLAVE) return;
3769
3770 /* SYNC can't be issued when the server has pending data to send to
3771 * the client about already issued commands. We need a fresh reply
3772 * buffer registering the differences between the BGSAVE and the current
3773 * dataset, so that we can copy to other slaves if needed. */
3774 if (listLength(c->reply) != 0) {
3775 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3776 return;
3777 }
3778
3779 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3780 /* Here we need to check if there is a background saving operation
3781 * in progress, or if it is required to start one */
3782 if (server.bgsaveinprogress) {
3783 /* Ok a background save is in progress. Let's check if it is a good
3784 * one for replication, i.e. if there is another slave that is
3785 * registering differences since the server forked to save */
3786 redisClient *slave;
3787 listNode *ln;
3788
3789 listRewind(server.slaves);
3790 while((ln = listYield(server.slaves))) {
3791 slave = ln->value;
3792 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3793 }
3794 if (ln) {
3795 /* Perfect, the server is already registering differences for
3796 * another slave. Set the right state, and copy the buffer. */
3797 listRelease(c->reply);
3798 c->reply = listDup(slave->reply);
3799 if (!c->reply) oom("listDup copying slave reply list");
3800 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3801 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3802 } else {
3803 /* No way, we need to wait for the next BGSAVE in order to
3804 * register differences */
3805 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3806 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3807 }
3808 } else {
3809 /* Ok we don't have a BGSAVE in progress, let's start one */
3810 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3811 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3812 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3813 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3814 return;
3815 }
3816 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3817 }
3818 c->repldbfd = -1;
3819 c->flags |= REDIS_SLAVE;
3820 c->slaveseldb = 0;
3821 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3822 return;
3823 }
3824
3825 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3826 redisClient *slave = privdata;
3827 REDIS_NOTUSED(el);
3828 REDIS_NOTUSED(mask);
3829 char buf[REDIS_IOBUF_LEN];
3830 ssize_t nwritten, buflen;
3831
3832 if (slave->repldboff == 0) {
3833 /* Write the bulk write count before to transfer the DB. In theory here
3834 * we don't know how much room there is in the output buffer of the
3835 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3836 * operations) will never be smaller than the few bytes we need. */
3837 sds bulkcount;
3838
3839 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3840 slave->repldbsize);
3841 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3842 {
3843 sdsfree(bulkcount);
3844 freeClient(slave);
3845 return;
3846 }
3847 sdsfree(bulkcount);
3848 }
3849 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3850 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3851 if (buflen <= 0) {
3852 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3853 (buflen == 0) ? "premature EOF" : strerror(errno));
3854 freeClient(slave);
3855 return;
3856 }
3857 if ((nwritten = write(fd,buf,buflen)) == -1) {
3858 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3859 strerror(errno));
3860 freeClient(slave);
3861 return;
3862 }
3863 slave->repldboff += nwritten;
3864 if (slave->repldboff == slave->repldbsize) {
3865 close(slave->repldbfd);
3866 slave->repldbfd = -1;
3867 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3868 slave->replstate = REDIS_REPL_ONLINE;
3869 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3870 sendReplyToClient, slave, NULL) == AE_ERR) {
3871 freeClient(slave);
3872 return;
3873 }
3874 addReplySds(slave,sdsempty());
3875 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3876 }
3877 }
3878
3879 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3880 listNode *ln;
3881 int startbgsave = 0;
3882
3883 listRewind(server.slaves);
3884 while((ln = listYield(server.slaves))) {
3885 redisClient *slave = ln->value;
3886
3887 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3888 startbgsave = 1;
3889 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3890 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3891 struct redis_stat buf;
3892
3893 if (bgsaveerr != REDIS_OK) {
3894 freeClient(slave);
3895 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3896 continue;
3897 }
3898 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3899 redis_fstat(slave->repldbfd,&buf) == -1) {
3900 freeClient(slave);
3901 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3902 continue;
3903 }
3904 slave->repldboff = 0;
3905 slave->repldbsize = buf.st_size;
3906 slave->replstate = REDIS_REPL_SEND_BULK;
3907 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3908 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3909 freeClient(slave);
3910 continue;
3911 }
3912 }
3913 }
3914 if (startbgsave) {
3915 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3916 listRewind(server.slaves);
3917 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3918 while((ln = listYield(server.slaves))) {
3919 redisClient *slave = ln->value;
3920
3921 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3922 freeClient(slave);
3923 }
3924 }
3925 }
3926 }
3927
3928 static int syncWithMaster(void) {
3929 char buf[1024], tmpfile[256];
3930 int dumpsize;
3931 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3932 int dfd;
3933
3934 if (fd == -1) {
3935 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3936 strerror(errno));
3937 return REDIS_ERR;
3938 }
3939 /* Issue the SYNC command */
3940 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3941 close(fd);
3942 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3943 strerror(errno));
3944 return REDIS_ERR;
3945 }
3946 /* Read the bulk write count */
3947 if (syncReadLine(fd,buf,1024,3600) == -1) {
3948 close(fd);
3949 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3950 strerror(errno));
3951 return REDIS_ERR;
3952 }
3953 dumpsize = atoi(buf+1);
3954 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3955 /* Read the bulk write data on a temp file */
3956 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3957 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3958 if (dfd == -1) {
3959 close(fd);
3960 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3961 return REDIS_ERR;
3962 }
3963 while(dumpsize) {
3964 int nread, nwritten;
3965
3966 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3967 if (nread == -1) {
3968 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3969 strerror(errno));
3970 close(fd);
3971 close(dfd);
3972 return REDIS_ERR;
3973 }
3974 nwritten = write(dfd,buf,nread);
3975 if (nwritten == -1) {
3976 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3977 close(fd);
3978 close(dfd);
3979 return REDIS_ERR;
3980 }
3981 dumpsize -= nread;
3982 }
3983 close(dfd);
3984 if (rename(tmpfile,server.dbfilename) == -1) {
3985 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3986 unlink(tmpfile);
3987 close(fd);
3988 return REDIS_ERR;
3989 }
3990 emptyDb();
3991 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3992 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3993 close(fd);
3994 return REDIS_ERR;
3995 }
3996 server.master = createClient(fd);
3997 server.master->flags |= REDIS_MASTER;
3998 server.replstate = REDIS_REPL_CONNECTED;
3999 return REDIS_OK;
4000 }
4001
4002 static void slaveofCommand(redisClient *c) {
4003 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4004 !strcasecmp(c->argv[2]->ptr,"one")) {
4005 if (server.masterhost) {
4006 sdsfree(server.masterhost);
4007 server.masterhost = NULL;
4008 if (server.master) freeClient(server.master);
4009 server.replstate = REDIS_REPL_NONE;
4010 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4011 }
4012 } else {
4013 sdsfree(server.masterhost);
4014 server.masterhost = sdsdup(c->argv[1]->ptr);
4015 server.masterport = atoi(c->argv[2]->ptr);
4016 if (server.master) freeClient(server.master);
4017 server.replstate = REDIS_REPL_CONNECT;
4018 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4019 server.masterhost, server.masterport);
4020 }
4021 addReply(c,shared.ok);
4022 }
4023
4024 /* ============================ Maxmemory directive ======================== */
4025
4026 /* This function gets called when 'maxmemory' is set on the config file to limit
4027 * the max memory used by the server, and we are out of memory.
4028 * This function will try to, in order:
4029 *
4030 * - Free objects from the free list
4031 * - Try to remove keys with an EXPIRE set
4032 *
4033 * It is not possible to free enough memory to reach used-memory < maxmemory
4034 * the server will start refusing commands that will enlarge even more the
4035 * memory usage.
4036 */
4037 static void freeMemoryIfNeeded(void) {
4038 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4039 if (listLength(server.objfreelist)) {
4040 robj *o;
4041
4042 listNode *head = listFirst(server.objfreelist);
4043 o = listNodeValue(head);
4044 listDelNode(server.objfreelist,head);
4045 zfree(o);
4046 } else {
4047 int j, k, freed = 0;
4048
4049 for (j = 0; j < server.dbnum; j++) {
4050 int minttl = -1;
4051 robj *minkey = NULL;
4052 struct dictEntry *de;
4053
4054 if (dictSize(server.db[j].expires)) {
4055 freed = 1;
4056 /* From a sample of three keys drop the one nearest to
4057 * the natural expire */
4058 for (k = 0; k < 3; k++) {
4059 time_t t;
4060
4061 de = dictGetRandomKey(server.db[j].expires);
4062 t = (time_t) dictGetEntryVal(de);
4063 if (minttl == -1 || t < minttl) {
4064 minkey = dictGetEntryKey(de);
4065 minttl = t;
4066 }
4067 }
4068 deleteKey(server.db+j,minkey);
4069 }
4070 }
4071 if (!freed) return; /* nothing to free... */
4072 }
4073 }
4074 }
4075
4076 /* ================================= Debugging ============================== */
4077
4078 static void debugCommand(redisClient *c) {
4079 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4080 *((char*)-1) = 'x';
4081 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4082 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4083 robj *key, *val;
4084
4085 if (!de) {
4086 addReply(c,shared.nokeyerr);
4087 return;
4088 }
4089 key = dictGetEntryKey(de);
4090 val = dictGetEntryVal(de);
4091 addReplySds(c,sdscatprintf(sdsempty(),
4092 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4093 key, key->refcount, val, val->refcount));
4094 } else {
4095 addReplySds(c,sdsnew(
4096 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4097 }
4098 }
4099
4100 /* =================================== Main! ================================ */
4101
4102 #ifdef __linux__
/* Read the kernel's vm.overcommit_memory setting.
 * Returns the integer parsed (via atoi) from the first line of
 * /proc/sys/vm/overcommit_memory, or -1 if the file cannot be opened or
 * read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    FILE *f;
    int got_line;

    f = fopen("/proc/sys/vm/overcommit_memory","r");
    if (f == NULL) return -1;

    got_line = (fgets(line,sizeof(line),f) != NULL);
    fclose(f);

    return got_line ? atoi(line) : -1;
}
4116
4117 void linuxOvercommitMemoryWarning(void) {
4118 if (linuxOvercommitMemoryValue() == 0) {
4119 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4120 }
4121 }
4122 #endif /* __linux__ */
4123
4124 static void daemonize(void) {
4125 int fd;
4126 FILE *fp;
4127
4128 if (fork() != 0) exit(0); /* parent exits */
4129 setsid(); /* create a new session */
4130
4131 /* Every output goes to /dev/null. If Redis is daemonized but
4132 * the 'logfile' is set to 'stdout' in the configuration file
4133 * it will not log at all. */
4134 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4135 dup2(fd, STDIN_FILENO);
4136 dup2(fd, STDOUT_FILENO);
4137 dup2(fd, STDERR_FILENO);
4138 if (fd > STDERR_FILENO) close(fd);
4139 }
4140 /* Try to write the pid file */
4141 fp = fopen(server.pidfile,"w");
4142 if (fp) {
4143 fprintf(fp,"%d\n",getpid());
4144 fclose(fp);
4145 }
4146 }
4147
4148 int main(int argc, char **argv) {
4149 #ifdef __linux__
4150 linuxOvercommitMemoryWarning();
4151 #endif
4152
4153 initServerConfig();
4154 if (argc == 2) {
4155 ResetServerSaveParams();
4156 loadServerConfig(argv[1]);
4157 } else if (argc > 2) {
4158 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4159 exit(1);
4160 } else {
4161 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4162 }
4163 initServer();
4164 if (server.daemonize) daemonize();
4165 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4166 if (rdbLoad(server.dbfilename) == REDIS_OK)
4167 redisLog(REDIS_NOTICE,"DB loaded from disk");
4168 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4169 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4170 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4171 aeMain(server.el);
4172 aeDeleteEventLoop(server.el);
4173 return 0;
4174 }