]> git.saurik.com Git - redis.git/blob - redis.c
2761541c95236c388cff19e8a0181a4769e078e3
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #include <signal.h>
40 #include <sys/wait.h>
41 #include <errno.h>
42 #include <assert.h>
43 #include <ctype.h>
44 #include <stdarg.h>
45 #include <inttypes.h>
46 #include <arpa/inet.h>
47 #include <sys/stat.h>
48 #include <fcntl.h>
49 #include <sys/time.h>
50 #include <sys/resource.h>
51 #include <limits.h>
52
53 #include "ae.h" /* Event driven programming library */
54 #include "sds.h" /* Dynamic safe strings */
55 #include "anet.h" /* Networking the easy way */
56 #include "dict.h" /* Hash tables */
57 #include "adlist.h" /* Linked lists */
58 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
59 #include "lzf.h" /* LZF compression library */
60 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
61
62 /* Error codes */
63 #define REDIS_OK 0
64 #define REDIS_ERR -1
65
66 /* Static server configuration */
67 #define REDIS_SERVERPORT 6379 /* TCP port */
68 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
69 #define REDIS_IOBUF_LEN 1024
70 #define REDIS_LOADBUF_LEN 1024
71 #define REDIS_STATIC_ARGS 4
72 #define REDIS_DEFAULT_DBNUM 16
73 #define REDIS_CONFIGLINE_MAX 1024
74 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
75 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
76 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
77
78 /* Hash table parameters */
79 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
80 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
81
82 /* Command flags */
83 #define REDIS_CMD_BULK 1 /* Bulk write command */
84 #define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied in low-memory conditions. */
89 #define REDIS_CMD_DENYOOM 4
90
91 /* Object types */
92 #define REDIS_STRING 0
93 #define REDIS_LIST 1
94 #define REDIS_SET 2
95 #define REDIS_HASH 3
96
97 /* Object types only used for dumping to disk */
98 #define REDIS_EXPIRETIME 253
99 #define REDIS_SELECTDB 254
100 #define REDIS_EOF 255
101
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the two most significant bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specifies the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
115 #define REDIS_RDB_6BITLEN 0
116 #define REDIS_RDB_14BITLEN 1
117 #define REDIS_RDB_32BITLEN 2
118 #define REDIS_RDB_ENCVAL 3
119 #define REDIS_RDB_LENERR UINT_MAX
120
/* When the length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
124 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
125 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
126 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
127 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
128
129 /* Client flags */
130 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
131 #define REDIS_SLAVE 2 /* This client is a slave server */
132 #define REDIS_MASTER 4 /* This client is a master server */
133 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
134
135 /* Slave replication state - slave side */
136 #define REDIS_REPL_NONE 0 /* No active replication */
137 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
138 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
139
140 /* Slave replication state - from the point of view of master
141 * Note that in SEND_BULK and ONLINE state the slave receives new updates
142 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
143 * to start the next background saving in order to send updates to it. */
144 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
145 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
146 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
147 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
148
149 /* List related stuff */
150 #define REDIS_HEAD 0
151 #define REDIS_TAIL 1
152
153 /* Sort operations */
154 #define REDIS_SORT_GET 0
155 #define REDIS_SORT_DEL 1
156 #define REDIS_SORT_INCR 2
157 #define REDIS_SORT_DECR 3
158 #define REDIS_SORT_ASC 4
159 #define REDIS_SORT_DESC 5
160 #define REDIS_SORTKEY_MAX 1024
161
162 /* Log levels */
163 #define REDIS_DEBUG 0
164 #define REDIS_NOTICE 1
165 #define REDIS_WARNING 2
166
167 /* Anti-warning macro... */
168 #define REDIS_NOTUSED(V) ((void) V)
169
170 /*================================= Data types ============================== */
171
/* A Redis object: a typed, reference-counted wrapper around a value
 * (string / list / set). */
typedef struct redisObject {
    void *ptr;      /* payload; how to interpret it depends on 'type' */
    int type;       /* object type, e.g. REDIS_STRING, REDIS_LIST, REDIS_SET */
    int refcount;   /* reference count; presumably maintained through
                     * incrRefCount()/decrRefCount() - confirm */
} robj;
178
179 typedef struct redisDb {
180 dict *dict;
181 dict *expires;
182 int id;
183 } redisDb;
184
185 /* With multiplexing we need to take per-clinet state.
186 * Clients are taken in a liked list. */
187 typedef struct redisClient {
188 int fd;
189 redisDb *db;
190 int dictid;
191 sds querybuf;
192 robj **argv;
193 int argc;
194 int bulklen; /* bulk read len. -1 if not in bulk read mode */
195 list *reply;
196 int sentlen;
197 time_t lastinteraction; /* time of the last interaction, used for timeout */
198 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
199 int slaveseldb; /* slave selected db, if this client is a slave */
200 int authenticated; /* when requirepass is non-NULL */
201 int replstate; /* replication state if this is a slave */
202 int repldbfd; /* replication DB file descriptor */
203 long repldboff; /* replication DB file offset */
204 off_t repldbsize; /* replication DB file size */
205 } redisClient;
206
/* One automatic-save rule: serverCron() starts a background save when at
 * least 'changes' modifications happened and more than 'seconds' seconds
 * elapsed since the last successful save. */
struct saveparam {
    time_t seconds; /* minimum age of the last save, in seconds */
    int changes;    /* minimum number of changes (server.dirty) */
};
211
212 /* Global server state structure */
213 struct redisServer {
214 int port;
215 int fd;
216 redisDb *db;
217 dict *sharingpool;
218 unsigned int sharingpoolsize;
219 long long dirty; /* changes to DB from the last save */
220 list *clients;
221 list *slaves, *monitors;
222 char neterr[ANET_ERR_LEN];
223 aeEventLoop *el;
224 int cronloops; /* number of times the cron function run */
225 list *objfreelist; /* A list of freed objects to avoid malloc() */
226 time_t lastsave; /* Unix time of last save succeeede */
227 size_t usedmemory; /* Used memory in megabytes */
228 /* Fields used only for stats */
229 time_t stat_starttime; /* server start time */
230 long long stat_numcommands; /* number of processed commands */
231 long long stat_numconnections; /* number of connections received */
232 /* Configuration */
233 int verbosity;
234 int glueoutputbuf;
235 int maxidletime;
236 int dbnum;
237 int daemonize;
238 char *pidfile;
239 int bgsaveinprogress;
240 struct saveparam *saveparams;
241 int saveparamslen;
242 char *logfile;
243 char *bindaddr;
244 char *dbfilename;
245 char *requirepass;
246 int shareobjects;
247 /* Replication related */
248 int isslave;
249 char *masterhost;
250 int masterport;
251 redisClient *master; /* client that is master for this slave */
252 int replstate;
253 unsigned int maxclients;
254 unsigned int maxmemory;
255 /* Sort parameters - qsort_r() is only available under BSD so we
256 * have to take this state global, in order to pass it to sortCompare() */
257 int sort_desc;
258 int sort_alpha;
259 int sort_bypattern;
260 };
261
262 typedef void redisCommandProc(redisClient *c);
263 struct redisCommand {
264 char *name;
265 redisCommandProc *proc;
266 int arity;
267 int flags;
268 };
269
270 typedef struct _redisSortObject {
271 robj *obj;
272 union {
273 double score;
274 robj *cmpobj;
275 } u;
276 } redisSortObject;
277
278 typedef struct _redisSortOperation {
279 int type;
280 robj *pattern;
281 } redisSortOperation;
282
283 struct sharedObjectsStruct {
284 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
285 *colon, *nullbulk, *nullmultibulk,
286 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
287 *outofrangeerr, *plus,
288 *select0, *select1, *select2, *select3, *select4,
289 *select5, *select6, *select7, *select8, *select9;
290 } shared;
291
292 /*================================ Prototypes =============================== */
293
294 static void freeStringObject(robj *o);
295 static void freeListObject(robj *o);
296 static void freeSetObject(robj *o);
297 static void decrRefCount(void *o);
298 static robj *createObject(int type, void *ptr);
299 static void freeClient(redisClient *c);
300 static int rdbLoad(char *filename);
301 static void addReply(redisClient *c, robj *obj);
302 static void addReplySds(redisClient *c, sds s);
303 static void incrRefCount(robj *o);
304 static int rdbSaveBackground(char *filename);
305 static robj *createStringObject(char *ptr, size_t len);
306 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
307 static int syncWithMaster(void);
308 static robj *tryObjectSharing(robj *o);
309 static int removeExpire(redisDb *db, robj *key);
310 static int expireIfNeeded(redisDb *db, robj *key);
311 static int deleteIfVolatile(redisDb *db, robj *key);
312 static int deleteKey(redisDb *db, robj *key);
313 static time_t getExpire(redisDb *db, robj *key);
314 static int setExpire(redisDb *db, robj *key, time_t when);
315 static void updateSalvesWaitingBgsave(int bgsaveerr);
316 static void freeMemoryIfNeeded(void);
317
318 static void authCommand(redisClient *c);
319 static void pingCommand(redisClient *c);
320 static void echoCommand(redisClient *c);
321 static void setCommand(redisClient *c);
322 static void setnxCommand(redisClient *c);
323 static void getCommand(redisClient *c);
324 static void delCommand(redisClient *c);
325 static void existsCommand(redisClient *c);
326 static void incrCommand(redisClient *c);
327 static void decrCommand(redisClient *c);
328 static void incrbyCommand(redisClient *c);
329 static void decrbyCommand(redisClient *c);
330 static void selectCommand(redisClient *c);
331 static void randomkeyCommand(redisClient *c);
332 static void keysCommand(redisClient *c);
333 static void dbsizeCommand(redisClient *c);
334 static void lastsaveCommand(redisClient *c);
335 static void saveCommand(redisClient *c);
336 static void bgsaveCommand(redisClient *c);
337 static void shutdownCommand(redisClient *c);
338 static void moveCommand(redisClient *c);
339 static void renameCommand(redisClient *c);
340 static void renamenxCommand(redisClient *c);
341 static void lpushCommand(redisClient *c);
342 static void rpushCommand(redisClient *c);
343 static void lpopCommand(redisClient *c);
344 static void rpopCommand(redisClient *c);
345 static void llenCommand(redisClient *c);
346 static void lindexCommand(redisClient *c);
347 static void lrangeCommand(redisClient *c);
348 static void ltrimCommand(redisClient *c);
349 static void typeCommand(redisClient *c);
350 static void lsetCommand(redisClient *c);
351 static void saddCommand(redisClient *c);
352 static void sremCommand(redisClient *c);
353 static void smoveCommand(redisClient *c);
354 static void sismemberCommand(redisClient *c);
355 static void scardCommand(redisClient *c);
356 static void sinterCommand(redisClient *c);
357 static void sinterstoreCommand(redisClient *c);
358 static void sunionCommand(redisClient *c);
359 static void sunionstoreCommand(redisClient *c);
360 static void sdiffCommand(redisClient *c);
361 static void sdiffstoreCommand(redisClient *c);
362 static void syncCommand(redisClient *c);
363 static void flushdbCommand(redisClient *c);
364 static void flushallCommand(redisClient *c);
365 static void sortCommand(redisClient *c);
366 static void lremCommand(redisClient *c);
367 static void infoCommand(redisClient *c);
368 static void mgetCommand(redisClient *c);
369 static void monitorCommand(redisClient *c);
370 static void expireCommand(redisClient *c);
371 static void getSetCommand(redisClient *c);
372 static void ttlCommand(redisClient *c);
373 static void slaveofCommand(redisClient *c);
374
375 /*================================= Globals ================================= */
376
377 /* Global vars */
378 static struct redisServer server; /* server global state */
379 static struct redisCommand cmdTable[] = {
380 {"get",getCommand,2,REDIS_CMD_INLINE},
381 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
382 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
383 {"del",delCommand,-2,REDIS_CMD_INLINE},
384 {"exists",existsCommand,2,REDIS_CMD_INLINE},
385 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
386 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
387 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
388 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
389 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
390 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
391 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
392 {"llen",llenCommand,2,REDIS_CMD_INLINE},
393 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
394 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
395 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
396 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
397 {"lrem",lremCommand,4,REDIS_CMD_BULK},
398 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
399 {"srem",sremCommand,3,REDIS_CMD_BULK},
400 {"smove",smoveCommand,4,REDIS_CMD_BULK},
401 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
402 {"scard",scardCommand,2,REDIS_CMD_INLINE},
403 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
404 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
405 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
406 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
407 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
408 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
409 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
410 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
411 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
412 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
413 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
414 {"select",selectCommand,2,REDIS_CMD_INLINE},
415 {"move",moveCommand,3,REDIS_CMD_INLINE},
416 {"rename",renameCommand,3,REDIS_CMD_INLINE},
417 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
418 {"expire",expireCommand,3,REDIS_CMD_INLINE},
419 {"keys",keysCommand,2,REDIS_CMD_INLINE},
420 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
421 {"auth",authCommand,2,REDIS_CMD_INLINE},
422 {"ping",pingCommand,1,REDIS_CMD_INLINE},
423 {"echo",echoCommand,2,REDIS_CMD_BULK},
424 {"save",saveCommand,1,REDIS_CMD_INLINE},
425 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
426 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
427 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
428 {"type",typeCommand,2,REDIS_CMD_INLINE},
429 {"sync",syncCommand,1,REDIS_CMD_INLINE},
430 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
431 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
432 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
433 {"info",infoCommand,1,REDIS_CMD_INLINE},
434 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
435 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
436 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
437 {NULL,NULL,0,0}
438 };
439
440 /*============================ Utility functions ============================ */
441
/* Glob-style pattern matching on length-delimited buffers.
 *
 * Supported syntax:
 *   *       matches any sequence of characters (including none)
 *   ?       matches exactly one character
 *   [abc]   matches one character of the set; [^abc] negates the set,
 *           [a-z] is a range, and \x inside the set takes x literally
 *   \x      matches the character x literally
 *
 * pattern/patternLen: the pattern and its length in bytes.
 * string/stringLen:   the subject and its length in bytes.
 * nocase:             when non-zero, literals and ranges are compared
 *                     case-insensitively (escaped characters inside a set
 *                     are always compared exactly).
 *
 * Returns 1 when the whole string matches the whole pattern, 0 otherwise.
 *
 * Neither buffer has to be NUL terminated: no byte at or past
 * pattern[patternLen] or string[stringLen] is ever read. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen) {
        switch (pattern[0]) {
        case '*':
            /* A run of '*' is equivalent to a single one. Only peek at
             * pattern[1] when it is still inside the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            /* Try to match the rest of the pattern against every suffix
             * of the string. */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A character class always consumes one character. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * pattern++ after the switch lands on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped member, always compared exactly. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Range. Work on unsigned char values so that bytes
                     * >= 0x80 order sanely and are valid for tolower(). */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* Take the next pattern character literally, if there is one. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is exhausted: only trailing '*' may remain. */
            while (patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
564
565 void redisLog(int level, const char *fmt, ...)
566 {
567 va_list ap;
568 FILE *fp;
569
570 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
571 if (!fp) return;
572
573 va_start(ap, fmt);
574 if (level >= server.verbosity) {
575 char *c = ".-*";
576 char buf[64];
577 time_t now;
578
579 now = time(NULL);
580 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
581 fprintf(fp,"%s %c ",buf,c[level]);
582 vfprintf(fp, fmt, ap);
583 fprintf(fp,"\n");
584 fflush(fp);
585 }
586 va_end(ap);
587
588 if (server.logfile) fclose(fp);
589 }
590
591 /*====================== Hash table type implementation ==================== */
592
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
596
597 static int sdsDictKeyCompare(void *privdata, const void *key1,
598 const void *key2)
599 {
600 int l1,l2;
601 DICT_NOTUSED(privdata);
602
603 l1 = sdslen((sds)key1);
604 l2 = sdslen((sds)key2);
605 if (l1 != l2) return 0;
606 return memcmp(key1, key2, l1) == 0;
607 }
608
/* Dictionary destructor for keys/values that are redis objects: drops one
 * reference from 'val' through decrRefCount(). 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
615
616 static int dictSdsKeyCompare(void *privdata, const void *key1,
617 const void *key2)
618 {
619 const robj *o1 = key1, *o2 = key2;
620 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
621 }
622
623 static unsigned int dictSdsHash(const void *key) {
624 const robj *o = key;
625 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
626 }
627
628 static dictType setDictType = {
629 dictSdsHash, /* hash function */
630 NULL, /* key dup */
631 NULL, /* val dup */
632 dictSdsKeyCompare, /* key compare */
633 dictRedisObjectDestructor, /* key destructor */
634 NULL /* val destructor */
635 };
636
637 static dictType hashDictType = {
638 dictSdsHash, /* hash function */
639 NULL, /* key dup */
640 NULL, /* val dup */
641 dictSdsKeyCompare, /* key compare */
642 dictRedisObjectDestructor, /* key destructor */
643 dictRedisObjectDestructor /* val destructor */
644 };
645
646 /* ========================= Random utility functions ======================= */
647
/* Out of memory handler.
 *
 * Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear whether the condition
 * could even be reported to the client, since the networking layer itself
 * relies on heap allocation for its send buffers. So we simply report the
 * failing context on standard error and abort.
 *
 * msg: short description of what was being allocated. Never returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);
    sleep(1);
    abort();
}
659
660 /* ====================== Redis server networking stuff ===================== */
661 void closeTimedoutClients(void) {
662 redisClient *c;
663 listNode *ln;
664 time_t now = time(NULL);
665
666 listRewind(server.clients);
667 while ((ln = listYield(server.clients)) != NULL) {
668 c = listNodeValue(ln);
669 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
670 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
671 (now - c->lastinteraction > server.maxidletime)) {
672 redisLog(REDIS_DEBUG,"Closing idle client");
673 freeClient(c);
674 }
675 }
676 }
677
678 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
679 * we resize the hash table to save memory */
680 void tryResizeHashTables(void) {
681 int j;
682
683 for (j = 0; j < server.dbnum; j++) {
684 long long size, used;
685
686 size = dictSlots(server.db[j].dict);
687 used = dictSize(server.db[j].dict);
688 if (size && used && size > REDIS_HT_MINSLOTS &&
689 (used*100/size < REDIS_HT_MINFILL)) {
690 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
691 dictResize(server.db[j].dict);
692 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
693 }
694 }
695 }
696
697 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
698 int j, loops = server.cronloops++;
699 REDIS_NOTUSED(eventLoop);
700 REDIS_NOTUSED(id);
701 REDIS_NOTUSED(clientData);
702
703 /* Update the global state with the amount of used memory */
704 server.usedmemory = zmalloc_used_memory();
705
706 /* Show some info about non-empty databases */
707 for (j = 0; j < server.dbnum; j++) {
708 long long size, used, vkeys;
709
710 size = dictSlots(server.db[j].dict);
711 used = dictSize(server.db[j].dict);
712 vkeys = dictSize(server.db[j].expires);
713 if (!(loops % 5) && used > 0) {
714 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
715 /* dictPrintStats(server.dict); */
716 }
717 }
718
719 /* We don't want to resize the hash tables while a bacground saving
720 * is in progress: the saving child is created using fork() that is
721 * implemented with a copy-on-write semantic in most modern systems, so
722 * if we resize the HT while there is the saving child at work actually
723 * a lot of memory movements in the parent will cause a lot of pages
724 * copied. */
725 if (!server.bgsaveinprogress) tryResizeHashTables();
726
727 /* Show information about connected clients */
728 if (!(loops % 5)) {
729 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
730 listLength(server.clients)-listLength(server.slaves),
731 listLength(server.slaves),
732 server.usedmemory,
733 dictSize(server.sharingpool));
734 }
735
736 /* Close connections of timedout clients */
737 if (server.maxidletime && !(loops % 10))
738 closeTimedoutClients();
739
740 /* Check if a background saving in progress terminated */
741 if (server.bgsaveinprogress) {
742 int statloc;
743 /* XXX: TODO handle the case of the saving child killed */
744 if (wait4(-1,&statloc,WNOHANG,NULL)) {
745 int exitcode = WEXITSTATUS(statloc);
746 if (exitcode == 0) {
747 redisLog(REDIS_NOTICE,
748 "Background saving terminated with success");
749 server.dirty = 0;
750 server.lastsave = time(NULL);
751 } else {
752 redisLog(REDIS_WARNING,
753 "Background saving error");
754 }
755 server.bgsaveinprogress = 0;
756 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
757 }
758 } else {
759 /* If there is not a background saving in progress check if
760 * we have to save now */
761 time_t now = time(NULL);
762 for (j = 0; j < server.saveparamslen; j++) {
763 struct saveparam *sp = server.saveparams+j;
764
765 if (server.dirty >= sp->changes &&
766 now-server.lastsave > sp->seconds) {
767 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
768 sp->changes, sp->seconds);
769 rdbSaveBackground(server.dbfilename);
770 break;
771 }
772 }
773 }
774
775 /* Try to expire a few timed out keys */
776 for (j = 0; j < server.dbnum; j++) {
777 redisDb *db = server.db+j;
778 int num = dictSize(db->expires);
779
780 if (num) {
781 time_t now = time(NULL);
782
783 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
784 num = REDIS_EXPIRELOOKUPS_PER_CRON;
785 while (num--) {
786 dictEntry *de;
787 time_t t;
788
789 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
790 t = (time_t) dictGetEntryVal(de);
791 if (now > t) {
792 deleteKey(db,dictGetEntryKey(de));
793 }
794 }
795 }
796 }
797
798 /* Check if we should connect to a MASTER */
799 if (server.replstate == REDIS_REPL_CONNECT) {
800 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
801 if (syncWithMaster() == REDIS_OK) {
802 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
803 }
804 }
805 return 1000;
806 }
807
808 static void createSharedObjects(void) {
809 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
810 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
811 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
812 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
813 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
814 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
815 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
816 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
817 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
818 /* no such key */
819 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
820 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
821 "-ERR Operation against a key holding the wrong kind of value\r\n"));
822 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
823 "-ERR no such key\r\n"));
824 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
825 "-ERR syntax error\r\n"));
826 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
827 "-ERR source and destination objects are the same\r\n"));
828 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
829 "-ERR index out of range\r\n"));
830 shared.space = createObject(REDIS_STRING,sdsnew(" "));
831 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
832 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
833 shared.select0 = createStringObject("select 0\r\n",10);
834 shared.select1 = createStringObject("select 1\r\n",10);
835 shared.select2 = createStringObject("select 2\r\n",10);
836 shared.select3 = createStringObject("select 3\r\n",10);
837 shared.select4 = createStringObject("select 4\r\n",10);
838 shared.select5 = createStringObject("select 5\r\n",10);
839 shared.select6 = createStringObject("select 6\r\n",10);
840 shared.select7 = createStringObject("select 7\r\n",10);
841 shared.select8 = createStringObject("select 8\r\n",10);
842 shared.select9 = createStringObject("select 9\r\n",10);
843 }
844
845 static void appendServerSaveParams(time_t seconds, int changes) {
846 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
847 if (server.saveparams == NULL) oom("appendServerSaveParams");
848 server.saveparams[server.saveparamslen].seconds = seconds;
849 server.saveparams[server.saveparamslen].changes = changes;
850 server.saveparamslen++;
851 }
852
853 static void ResetServerSaveParams() {
854 zfree(server.saveparams);
855 server.saveparams = NULL;
856 server.saveparamslen = 0;
857 }
858
859 static void initServerConfig() {
860 server.dbnum = REDIS_DEFAULT_DBNUM;
861 server.port = REDIS_SERVERPORT;
862 server.verbosity = REDIS_DEBUG;
863 server.maxidletime = REDIS_MAXIDLETIME;
864 server.saveparams = NULL;
865 server.logfile = NULL; /* NULL = log on standard output */
866 server.bindaddr = NULL;
867 server.glueoutputbuf = 1;
868 server.daemonize = 0;
869 server.pidfile = "/var/run/redis.pid";
870 server.dbfilename = "dump.rdb";
871 server.requirepass = NULL;
872 server.shareobjects = 0;
873 server.maxclients = 0;
874 server.maxmemory = 0;
875 ResetServerSaveParams();
876
877 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
878 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
879 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
880 /* Replication related */
881 server.isslave = 0;
882 server.masterhost = NULL;
883 server.masterport = 6379;
884 server.master = NULL;
885 server.replstate = REDIS_REPL_NONE;
886 }
887
888 static void initServer() {
889 int j;
890
891 signal(SIGHUP, SIG_IGN);
892 signal(SIGPIPE, SIG_IGN);
893
894 server.clients = listCreate();
895 server.slaves = listCreate();
896 server.monitors = listCreate();
897 server.objfreelist = listCreate();
898 createSharedObjects();
899 server.el = aeCreateEventLoop();
900 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
901 server.sharingpool = dictCreate(&setDictType,NULL);
902 server.sharingpoolsize = 1024;
903 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
904 oom("server initialization"); /* Fatal OOM */
905 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
906 if (server.fd == -1) {
907 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
908 exit(1);
909 }
910 for (j = 0; j < server.dbnum; j++) {
911 server.db[j].dict = dictCreate(&hashDictType,NULL);
912 server.db[j].expires = dictCreate(&setDictType,NULL);
913 server.db[j].id = j;
914 }
915 server.cronloops = 0;
916 server.bgsaveinprogress = 0;
917 server.lastsave = time(NULL);
918 server.dirty = 0;
919 server.usedmemory = 0;
920 server.stat_numcommands = 0;
921 server.stat_numconnections = 0;
922 server.stat_starttime = time(NULL);
923 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
924 }
925
926 /* Empty the whole database */
927 static long long emptyDb() {
928 int j;
929 long long removed = 0;
930
931 for (j = 0; j < server.dbnum; j++) {
932 removed += dictSize(server.db[j].dict);
933 dictEmpty(server.db[j].dict);
934 dictEmpty(server.db[j].expires);
935 }
936 return removed;
937 }
938
/* Convert a case insensitive "yes" / "no" string into 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
944
945 /* I agree, this is a very rudimental way to load a configuration...
946 will improve later if the config gets more complex */
947 static void loadServerConfig(char *filename) {
948 FILE *fp = fopen(filename,"r");
949 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
950 int linenum = 0;
951 sds line = NULL;
952
953 if (!fp) {
954 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
955 exit(1);
956 }
957 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
958 sds *argv;
959 int argc, j;
960
961 linenum++;
962 line = sdsnew(buf);
963 line = sdstrim(line," \t\r\n");
964
965 /* Skip comments and blank lines*/
966 if (line[0] == '#' || line[0] == '\0') {
967 sdsfree(line);
968 continue;
969 }
970
971 /* Split into arguments */
972 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
973 sdstolower(argv[0]);
974
975 /* Execute config directives */
976 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
977 server.maxidletime = atoi(argv[1]);
978 if (server.maxidletime < 0) {
979 err = "Invalid timeout value"; goto loaderr;
980 }
981 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
982 server.port = atoi(argv[1]);
983 if (server.port < 1 || server.port > 65535) {
984 err = "Invalid port"; goto loaderr;
985 }
986 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
987 server.bindaddr = zstrdup(argv[1]);
988 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
989 int seconds = atoi(argv[1]);
990 int changes = atoi(argv[2]);
991 if (seconds < 1 || changes < 0) {
992 err = "Invalid save parameters"; goto loaderr;
993 }
994 appendServerSaveParams(seconds,changes);
995 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
996 if (chdir(argv[1]) == -1) {
997 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
998 argv[1], strerror(errno));
999 exit(1);
1000 }
1001 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1002 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1003 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1004 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1005 else {
1006 err = "Invalid log level. Must be one of debug, notice, warning";
1007 goto loaderr;
1008 }
1009 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1010 FILE *fp;
1011
1012 server.logfile = zstrdup(argv[1]);
1013 if (!strcasecmp(server.logfile,"stdout")) {
1014 zfree(server.logfile);
1015 server.logfile = NULL;
1016 }
1017 if (server.logfile) {
1018 /* Test if we are able to open the file. The server will not
1019 * be able to abort just for this problem later... */
1020 fp = fopen(server.logfile,"a");
1021 if (fp == NULL) {
1022 err = sdscatprintf(sdsempty(),
1023 "Can't open the log file: %s", strerror(errno));
1024 goto loaderr;
1025 }
1026 fclose(fp);
1027 }
1028 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1029 server.dbnum = atoi(argv[1]);
1030 if (server.dbnum < 1) {
1031 err = "Invalid number of databases"; goto loaderr;
1032 }
1033 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1034 server.maxclients = atoi(argv[1]);
1035 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1036 server.maxmemory = atoi(argv[1]);
1037 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1038 server.masterhost = sdsnew(argv[1]);
1039 server.masterport = atoi(argv[2]);
1040 server.replstate = REDIS_REPL_CONNECT;
1041 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1042 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1043 err = "argument must be 'yes' or 'no'"; goto loaderr;
1044 }
1045 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1046 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1047 err = "argument must be 'yes' or 'no'"; goto loaderr;
1048 }
1049 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1050 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1051 err = "argument must be 'yes' or 'no'"; goto loaderr;
1052 }
1053 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1054 server.requirepass = zstrdup(argv[1]);
1055 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1056 server.pidfile = zstrdup(argv[1]);
1057 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1058 server.dbfilename = zstrdup(argv[1]);
1059 } else {
1060 err = "Bad directive or wrong number of arguments"; goto loaderr;
1061 }
1062 for (j = 0; j < argc; j++)
1063 sdsfree(argv[j]);
1064 zfree(argv);
1065 sdsfree(line);
1066 }
1067 fclose(fp);
1068 return;
1069
1070 loaderr:
1071 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1072 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1073 fprintf(stderr, ">>> '%s'\n", line);
1074 fprintf(stderr, "%s\n", err);
1075 exit(1);
1076 }
1077
1078 static void freeClientArgv(redisClient *c) {
1079 int j;
1080
1081 for (j = 0; j < c->argc; j++)
1082 decrRefCount(c->argv[j]);
1083 c->argc = 0;
1084 }
1085
1086 static void freeClient(redisClient *c) {
1087 listNode *ln;
1088
1089 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1090 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1091 sdsfree(c->querybuf);
1092 listRelease(c->reply);
1093 freeClientArgv(c);
1094 close(c->fd);
1095 ln = listSearchKey(server.clients,c);
1096 assert(ln != NULL);
1097 listDelNode(server.clients,ln);
1098 if (c->flags & REDIS_SLAVE) {
1099 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1100 close(c->repldbfd);
1101 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1102 ln = listSearchKey(l,c);
1103 assert(ln != NULL);
1104 listDelNode(l,ln);
1105 }
1106 if (c->flags & REDIS_MASTER) {
1107 server.master = NULL;
1108 server.replstate = REDIS_REPL_CONNECT;
1109 }
1110 zfree(c->argv);
1111 zfree(c);
1112 }
1113
1114 static void glueReplyBuffersIfNeeded(redisClient *c) {
1115 int totlen = 0;
1116 listNode *ln;
1117 robj *o;
1118
1119 listRewind(c->reply);
1120 while((ln = listYield(c->reply))) {
1121 o = ln->value;
1122 totlen += sdslen(o->ptr);
1123 /* This optimization makes more sense if we don't have to copy
1124 * too much data */
1125 if (totlen > 1024) return;
1126 }
1127 if (totlen > 0) {
1128 char buf[1024];
1129 int copylen = 0;
1130
1131 listRewind(c->reply);
1132 while((ln = listYield(c->reply))) {
1133 o = ln->value;
1134 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1135 copylen += sdslen(o->ptr);
1136 listDelNode(c->reply,ln);
1137 }
1138 /* Now the output buffer is empty, add the new single element */
1139 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1140 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1141 }
1142 }
1143
1144 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1145 redisClient *c = privdata;
1146 int nwritten = 0, totwritten = 0, objlen;
1147 robj *o;
1148 REDIS_NOTUSED(el);
1149 REDIS_NOTUSED(mask);
1150
1151 if (server.glueoutputbuf && listLength(c->reply) > 1)
1152 glueReplyBuffersIfNeeded(c);
1153 while(listLength(c->reply)) {
1154 o = listNodeValue(listFirst(c->reply));
1155 objlen = sdslen(o->ptr);
1156
1157 if (objlen == 0) {
1158 listDelNode(c->reply,listFirst(c->reply));
1159 continue;
1160 }
1161
1162 if (c->flags & REDIS_MASTER) {
1163 nwritten = objlen - c->sentlen;
1164 } else {
1165 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1166 if (nwritten <= 0) break;
1167 }
1168 c->sentlen += nwritten;
1169 totwritten += nwritten;
1170 /* If we fully sent the object on head go to the next one */
1171 if (c->sentlen == objlen) {
1172 listDelNode(c->reply,listFirst(c->reply));
1173 c->sentlen = 0;
1174 }
1175 }
1176 if (nwritten == -1) {
1177 if (errno == EAGAIN) {
1178 nwritten = 0;
1179 } else {
1180 redisLog(REDIS_DEBUG,
1181 "Error writing to client: %s", strerror(errno));
1182 freeClient(c);
1183 return;
1184 }
1185 }
1186 if (totwritten > 0) c->lastinteraction = time(NULL);
1187 if (listLength(c->reply) == 0) {
1188 c->sentlen = 0;
1189 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1190 }
1191 }
1192
1193 static struct redisCommand *lookupCommand(char *name) {
1194 int j = 0;
1195 while(cmdTable[j].name != NULL) {
1196 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1197 j++;
1198 }
1199 return NULL;
1200 }
1201
1202 /* resetClient prepare the client to process the next command */
1203 static void resetClient(redisClient *c) {
1204 freeClientArgv(c);
1205 c->bulklen = -1;
1206 }
1207
1208 /* If this function gets called we already read a whole
1209 * command, argments are in the client argv/argc fields.
1210 * processCommand() execute the command or prepare the
1211 * server for a bulk read from the client.
1212 *
1213 * If 1 is returned the client is still alive and valid and
1214 * and other operations can be performed by the caller. Otherwise
1215 * if 0 is returned the client was destroied (i.e. after QUIT). */
1216 static int processCommand(redisClient *c) {
1217 struct redisCommand *cmd;
1218 long long dirty;
1219
1220 /* Free some memory if needed (maxmemory setting) */
1221 if (server.maxmemory) freeMemoryIfNeeded();
1222
1223 /* The QUIT command is handled as a special case. Normal command
1224 * procs are unable to close the client connection safely */
1225 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1226 freeClient(c);
1227 return 0;
1228 }
1229 cmd = lookupCommand(c->argv[0]->ptr);
1230 if (!cmd) {
1231 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1232 resetClient(c);
1233 return 1;
1234 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1235 (c->argc < -cmd->arity)) {
1236 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1237 resetClient(c);
1238 return 1;
1239 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1240 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1241 resetClient(c);
1242 return 1;
1243 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1244 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1245
1246 decrRefCount(c->argv[c->argc-1]);
1247 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1248 c->argc--;
1249 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1250 resetClient(c);
1251 return 1;
1252 }
1253 c->argc--;
1254 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1255 /* It is possible that the bulk read is already in the
1256 * buffer. Check this condition and handle it accordingly */
1257 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1258 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1259 c->argc++;
1260 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1261 } else {
1262 return 1;
1263 }
1264 }
1265 /* Let's try to share objects on the command arguments vector */
1266 if (server.shareobjects) {
1267 int j;
1268 for(j = 1; j < c->argc; j++)
1269 c->argv[j] = tryObjectSharing(c->argv[j]);
1270 }
1271 /* Check if the user is authenticated */
1272 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1273 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1274 resetClient(c);
1275 return 1;
1276 }
1277
1278 /* Exec the command */
1279 dirty = server.dirty;
1280 cmd->proc(c);
1281 if (server.dirty-dirty != 0 && listLength(server.slaves))
1282 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1283 if (listLength(server.monitors))
1284 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1285 server.stat_numcommands++;
1286
1287 /* Prepare the client for the next command */
1288 if (c->flags & REDIS_CLOSE) {
1289 freeClient(c);
1290 return 0;
1291 }
1292 resetClient(c);
1293 return 1;
1294 }
1295
1296 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1297 listNode *ln;
1298 int outc = 0, j;
1299 robj **outv;
1300 /* (args*2)+1 is enough room for args, spaces, newlines */
1301 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1302
1303 if (argc <= REDIS_STATIC_ARGS) {
1304 outv = static_outv;
1305 } else {
1306 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1307 if (!outv) oom("replicationFeedSlaves");
1308 }
1309
1310 for (j = 0; j < argc; j++) {
1311 if (j != 0) outv[outc++] = shared.space;
1312 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1313 robj *lenobj;
1314
1315 lenobj = createObject(REDIS_STRING,
1316 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1317 lenobj->refcount = 0;
1318 outv[outc++] = lenobj;
1319 }
1320 outv[outc++] = argv[j];
1321 }
1322 outv[outc++] = shared.crlf;
1323
1324 /* Increment all the refcounts at start and decrement at end in order to
1325 * be sure to free objects if there is no slave in a replication state
1326 * able to be feed with commands */
1327 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1328 listRewind(slaves);
1329 while((ln = listYield(slaves))) {
1330 redisClient *slave = ln->value;
1331
1332 /* Don't feed slaves that are still waiting for BGSAVE to start */
1333 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1334
1335 /* Feed all the other slaves, MONITORs and so on */
1336 if (slave->slaveseldb != dictid) {
1337 robj *selectcmd;
1338
1339 switch(dictid) {
1340 case 0: selectcmd = shared.select0; break;
1341 case 1: selectcmd = shared.select1; break;
1342 case 2: selectcmd = shared.select2; break;
1343 case 3: selectcmd = shared.select3; break;
1344 case 4: selectcmd = shared.select4; break;
1345 case 5: selectcmd = shared.select5; break;
1346 case 6: selectcmd = shared.select6; break;
1347 case 7: selectcmd = shared.select7; break;
1348 case 8: selectcmd = shared.select8; break;
1349 case 9: selectcmd = shared.select9; break;
1350 default:
1351 selectcmd = createObject(REDIS_STRING,
1352 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1353 selectcmd->refcount = 0;
1354 break;
1355 }
1356 addReply(slave,selectcmd);
1357 slave->slaveseldb = dictid;
1358 }
1359 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1360 }
1361 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1362 if (outv != static_outv) zfree(outv);
1363 }
1364
1365 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1366 redisClient *c = (redisClient*) privdata;
1367 char buf[REDIS_IOBUF_LEN];
1368 int nread;
1369 REDIS_NOTUSED(el);
1370 REDIS_NOTUSED(mask);
1371
1372 nread = read(fd, buf, REDIS_IOBUF_LEN);
1373 if (nread == -1) {
1374 if (errno == EAGAIN) {
1375 nread = 0;
1376 } else {
1377 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1378 freeClient(c);
1379 return;
1380 }
1381 } else if (nread == 0) {
1382 redisLog(REDIS_DEBUG, "Client closed connection");
1383 freeClient(c);
1384 return;
1385 }
1386 if (nread) {
1387 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1388 c->lastinteraction = time(NULL);
1389 } else {
1390 return;
1391 }
1392
1393 again:
1394 if (c->bulklen == -1) {
1395 /* Read the first line of the query */
1396 char *p = strchr(c->querybuf,'\n');
1397 size_t querylen;
1398 if (p) {
1399 sds query, *argv;
1400 int argc, j;
1401
1402 query = c->querybuf;
1403 c->querybuf = sdsempty();
1404 querylen = 1+(p-(query));
1405 if (sdslen(query) > querylen) {
1406 /* leave data after the first line of the query in the buffer */
1407 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1408 }
1409 *p = '\0'; /* remove "\n" */
1410 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1411 sdsupdatelen(query);
1412
1413 /* Now we can split the query in arguments */
1414 if (sdslen(query) == 0) {
1415 /* Ignore empty query */
1416 sdsfree(query);
1417 return;
1418 }
1419 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1420 if (argv == NULL) oom("sdssplitlen");
1421 sdsfree(query);
1422
1423 if (c->argv) zfree(c->argv);
1424 c->argv = zmalloc(sizeof(robj*)*argc);
1425 if (c->argv == NULL) oom("allocating arguments list for client");
1426
1427 for (j = 0; j < argc; j++) {
1428 if (sdslen(argv[j])) {
1429 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1430 c->argc++;
1431 } else {
1432 sdsfree(argv[j]);
1433 }
1434 }
1435 zfree(argv);
1436 /* Execute the command. If the client is still valid
1437 * after processCommand() return and there is something
1438 * on the query buffer try to process the next command. */
1439 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1440 return;
1441 } else if (sdslen(c->querybuf) >= 1024*32) {
1442 redisLog(REDIS_DEBUG, "Client protocol error");
1443 freeClient(c);
1444 return;
1445 }
1446 } else {
1447 /* Bulk read handling. Note that if we are at this point
1448 the client already sent a command terminated with a newline,
1449 we are reading the bulk data that is actually the last
1450 argument of the command. */
1451 int qbl = sdslen(c->querybuf);
1452
1453 if (c->bulklen <= qbl) {
1454 /* Copy everything but the final CRLF as final argument */
1455 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1456 c->argc++;
1457 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1458 processCommand(c);
1459 return;
1460 }
1461 }
1462 }
1463
1464 static int selectDb(redisClient *c, int id) {
1465 if (id < 0 || id >= server.dbnum)
1466 return REDIS_ERR;
1467 c->db = &server.db[id];
1468 return REDIS_OK;
1469 }
1470
1471 static void *dupClientReplyValue(void *o) {
1472 incrRefCount((robj*)o);
1473 return 0;
1474 }
1475
1476 static redisClient *createClient(int fd) {
1477 redisClient *c = zmalloc(sizeof(*c));
1478
1479 anetNonBlock(NULL,fd);
1480 anetTcpNoDelay(NULL,fd);
1481 if (!c) return NULL;
1482 selectDb(c,0);
1483 c->fd = fd;
1484 c->querybuf = sdsempty();
1485 c->argc = 0;
1486 c->argv = NULL;
1487 c->bulklen = -1;
1488 c->sentlen = 0;
1489 c->flags = 0;
1490 c->lastinteraction = time(NULL);
1491 c->authenticated = 0;
1492 c->replstate = REDIS_REPL_NONE;
1493 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1494 listSetFreeMethod(c->reply,decrRefCount);
1495 listSetDupMethod(c->reply,dupClientReplyValue);
1496 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1497 readQueryFromClient, c, NULL) == AE_ERR) {
1498 freeClient(c);
1499 return NULL;
1500 }
1501 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1502 return c;
1503 }
1504
1505 static void addReply(redisClient *c, robj *obj) {
1506 if (listLength(c->reply) == 0 &&
1507 (c->replstate == REDIS_REPL_NONE ||
1508 c->replstate == REDIS_REPL_ONLINE) &&
1509 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1510 sendReplyToClient, c, NULL) == AE_ERR) return;
1511 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1512 incrRefCount(obj);
1513 }
1514
1515 static void addReplySds(redisClient *c, sds s) {
1516 robj *o = createObject(REDIS_STRING,s);
1517 addReply(c,o);
1518 decrRefCount(o);
1519 }
1520
1521 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1522 int cport, cfd;
1523 char cip[128];
1524 redisClient *c;
1525 REDIS_NOTUSED(el);
1526 REDIS_NOTUSED(mask);
1527 REDIS_NOTUSED(privdata);
1528
1529 cfd = anetAccept(server.neterr, fd, cip, &cport);
1530 if (cfd == AE_ERR) {
1531 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1532 return;
1533 }
1534 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1535 if ((c = createClient(cfd)) == NULL) {
1536 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1537 close(cfd); /* May be already closed, just ingore errors */
1538 return;
1539 }
1540 /* If maxclient directive is set and this is one client more... close the
1541 * connection. Note that we create the client instead to check before
1542 * for this condition, since now the socket is already set in nonblocking
1543 * mode and we can send an error for free using the Kernel I/O */
1544 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1545 char *err = "-ERR max number of clients reached\r\n";
1546
1547 /* That's a best effort error message, don't check write errors */
1548 (void) write(c->fd,err,strlen(err));
1549 freeClient(c);
1550 return;
1551 }
1552 server.stat_numconnections++;
1553 }
1554
1555 /* ======================= Redis objects implementation ===================== */
1556
1557 static robj *createObject(int type, void *ptr) {
1558 robj *o;
1559
1560 if (listLength(server.objfreelist)) {
1561 listNode *head = listFirst(server.objfreelist);
1562 o = listNodeValue(head);
1563 listDelNode(server.objfreelist,head);
1564 } else {
1565 o = zmalloc(sizeof(*o));
1566 }
1567 if (!o) oom("createObject");
1568 o->type = type;
1569 o->ptr = ptr;
1570 o->refcount = 1;
1571 return o;
1572 }
1573
1574 static robj *createStringObject(char *ptr, size_t len) {
1575 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1576 }
1577
1578 static robj *createListObject(void) {
1579 list *l = listCreate();
1580
1581 if (!l) oom("listCreate");
1582 listSetFreeMethod(l,decrRefCount);
1583 return createObject(REDIS_LIST,l);
1584 }
1585
1586 static robj *createSetObject(void) {
1587 dict *d = dictCreate(&setDictType,NULL);
1588 if (!d) oom("dictCreate");
1589 return createObject(REDIS_SET,d);
1590 }
1591
1592 static void freeStringObject(robj *o) {
1593 sdsfree(o->ptr);
1594 }
1595
1596 static void freeListObject(robj *o) {
1597 listRelease((list*) o->ptr);
1598 }
1599
1600 static void freeSetObject(robj *o) {
1601 dictRelease((dict*) o->ptr);
1602 }
1603
1604 static void freeHashObject(robj *o) {
1605 dictRelease((dict*) o->ptr);
1606 }
1607
1608 static void incrRefCount(robj *o) {
1609 o->refcount++;
1610 #ifdef DEBUG_REFCOUNT
1611 if (o->type == REDIS_STRING)
1612 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1613 #endif
1614 }
1615
1616 static void decrRefCount(void *obj) {
1617 robj *o = obj;
1618
1619 #ifdef DEBUG_REFCOUNT
1620 if (o->type == REDIS_STRING)
1621 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1622 #endif
1623 if (--(o->refcount) == 0) {
1624 switch(o->type) {
1625 case REDIS_STRING: freeStringObject(o); break;
1626 case REDIS_LIST: freeListObject(o); break;
1627 case REDIS_SET: freeSetObject(o); break;
1628 case REDIS_HASH: freeHashObject(o); break;
1629 default: assert(0 != 0); break;
1630 }
1631 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1632 !listAddNodeHead(server.objfreelist,o))
1633 zfree(o);
1634 }
1635 }
1636
1637 /* Try to share an object against the shared objects pool */
1638 static robj *tryObjectSharing(robj *o) {
1639 struct dictEntry *de;
1640 unsigned long c;
1641
1642 if (o == NULL || server.shareobjects == 0) return o;
1643
1644 assert(o->type == REDIS_STRING);
1645 de = dictFind(server.sharingpool,o);
1646 if (de) {
1647 robj *shared = dictGetEntryKey(de);
1648
1649 c = ((unsigned long) dictGetEntryVal(de))+1;
1650 dictGetEntryVal(de) = (void*) c;
1651 incrRefCount(shared);
1652 decrRefCount(o);
1653 return shared;
1654 } else {
1655 /* Here we are using a stream algorihtm: Every time an object is
1656 * shared we increment its count, everytime there is a miss we
1657 * recrement the counter of a random object. If this object reaches
1658 * zero we remove the object and put the current object instead. */
1659 if (dictSize(server.sharingpool) >=
1660 server.sharingpoolsize) {
1661 de = dictGetRandomKey(server.sharingpool);
1662 assert(de != NULL);
1663 c = ((unsigned long) dictGetEntryVal(de))-1;
1664 dictGetEntryVal(de) = (void*) c;
1665 if (c == 0) {
1666 dictDelete(server.sharingpool,de->key);
1667 }
1668 } else {
1669 c = 0; /* If the pool is empty we want to add this object */
1670 }
1671 if (c == 0) {
1672 int retval;
1673
1674 retval = dictAdd(server.sharingpool,o,(void*)1);
1675 assert(retval == DICT_OK);
1676 incrRefCount(o);
1677 }
1678 return o;
1679 }
1680 }
1681
1682 static robj *lookupKey(redisDb *db, robj *key) {
1683 dictEntry *de = dictFind(db->dict,key);
1684 return de ? dictGetEntryVal(de) : NULL;
1685 }
1686
1687 static robj *lookupKeyRead(redisDb *db, robj *key) {
1688 expireIfNeeded(db,key);
1689 return lookupKey(db,key);
1690 }
1691
1692 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1693 deleteIfVolatile(db,key);
1694 return lookupKey(db,key);
1695 }
1696
1697 static int deleteKey(redisDb *db, robj *key) {
1698 int retval;
1699
1700 /* We need to protect key from destruction: after the first dictDelete()
1701 * it may happen that 'key' is no longer valid if we don't increment
1702 * it's count. This may happen when we get the object reference directly
1703 * from the hash table with dictRandomKey() or dict iterators */
1704 incrRefCount(key);
1705 if (dictSize(db->expires)) dictDelete(db->expires,key);
1706 retval = dictDelete(db->dict,key);
1707 decrRefCount(key);
1708
1709 return retval == DICT_OK;
1710 }
1711
1712 /*============================ DB saving/loading ============================ */
1713
/* Write the one byte 'type' opcode on 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
1718
/* Write the time 't' on 'fp' as a 32 bit signed integer in host byte
 * order. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
1724
1725 /* check rdbLoadLen() comments for more info */
1726 static int rdbSaveLen(FILE *fp, uint32_t len) {
1727 unsigned char buf[2];
1728
1729 if (len < (1<<6)) {
1730 /* Save a 6 bit len */
1731 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1732 if (fwrite(buf,1,1,fp) == 0) return -1;
1733 } else if (len < (1<<14)) {
1734 /* Save a 14 bit len */
1735 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1736 buf[1] = len&0xFF;
1737 if (fwrite(buf,2,1,fp) == 0) return -1;
1738 } else {
1739 /* Save a 32 bit len */
1740 buf[0] = (REDIS_RDB_32BITLEN<<6);
1741 if (fwrite(buf,1,1,fp) == 0) return -1;
1742 len = htonl(len);
1743 if (fwrite(&len,4,1,fp) == 0) return -1;
1744 }
1745 return 0;
1746 }
1747
1748 /* String objects in the form "2391" "-100" without any space and with a
1749 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1750 * encoded as integers to save space */
1751 int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1752 long long value;
1753 char *endptr, buf[32];
1754
1755 /* Check if it's possible to encode this value as a number */
1756 value = strtoll(s, &endptr, 10);
1757 if (endptr[0] != '\0') return 0;
1758 snprintf(buf,32,"%lld",value);
1759
1760 /* If the number converted back into a string is not identical
1761 * then it's not possible to encode the string as integer */
1762 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1763
1764 /* Finally check if it fits in our ranges */
1765 if (value >= -(1<<7) && value <= (1<<7)-1) {
1766 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1767 enc[1] = value&0xFF;
1768 return 2;
1769 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1770 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1771 enc[1] = value&0xFF;
1772 enc[2] = (value>>8)&0xFF;
1773 return 3;
1774 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1775 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1776 enc[1] = value&0xFF;
1777 enc[2] = (value>>8)&0xFF;
1778 enc[3] = (value>>16)&0xFF;
1779 enc[4] = (value>>24)&0xFF;
1780 return 5;
1781 } else {
1782 return 0;
1783 }
1784 }
1785
1786 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1787 unsigned int comprlen, outlen;
1788 unsigned char byte;
1789 void *out;
1790
1791 /* We require at least four bytes compression for this to be worth it */
1792 outlen = sdslen(obj->ptr)-4;
1793 if (outlen <= 0) return 0;
1794 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1795 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1796 if (comprlen == 0) {
1797 zfree(out);
1798 return 0;
1799 }
1800 /* Data compressed! Let's save it on disk */
1801 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1802 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1803 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1804 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1805 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1806 zfree(out);
1807 return comprlen;
1808
1809 writeerr:
1810 zfree(out);
1811 return -1;
1812 }
1813
1814 /* Save a string objet as [len][data] on disk. If the object is a string
1815 * representation of an integer value we try to safe it in a special form */
1816 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1817 size_t len = sdslen(obj->ptr);
1818 int enclen;
1819
1820 /* Try integer encoding */
1821 if (len <= 11) {
1822 unsigned char buf[5];
1823 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1824 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1825 return 0;
1826 }
1827 }
1828
1829 /* Try LZF compression - under 20 bytes it's unable to compress even
1830 * aaaaaaaaaaaaaaaaaa so skip it */
1831 if (1 && len > 20) {
1832 int retval;
1833
1834 retval = rdbSaveLzfStringObject(fp,obj);
1835 if (retval == -1) return -1;
1836 if (retval > 0) return 0;
1837 /* retval == 0 means data can't be compressed, save the old way */
1838 }
1839
1840 /* Store verbatim */
1841 if (rdbSaveLen(fp,len) == -1) return -1;
1842 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1843 return 0;
1844 }
1845
1846 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1847 static int rdbSave(char *filename) {
1848 dictIterator *di = NULL;
1849 dictEntry *de;
1850 FILE *fp;
1851 char tmpfile[256];
1852 int j;
1853 time_t now = time(NULL);
1854
1855 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1856 fp = fopen(tmpfile,"w");
1857 if (!fp) {
1858 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1859 return REDIS_ERR;
1860 }
1861 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1862 for (j = 0; j < server.dbnum; j++) {
1863 redisDb *db = server.db+j;
1864 dict *d = db->dict;
1865 if (dictSize(d) == 0) continue;
1866 di = dictGetIterator(d);
1867 if (!di) {
1868 fclose(fp);
1869 return REDIS_ERR;
1870 }
1871
1872 /* Write the SELECT DB opcode */
1873 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1874 if (rdbSaveLen(fp,j) == -1) goto werr;
1875
1876 /* Iterate this DB writing every entry */
1877 while((de = dictNext(di)) != NULL) {
1878 robj *key = dictGetEntryKey(de);
1879 robj *o = dictGetEntryVal(de);
1880 time_t expiretime = getExpire(db,key);
1881
1882 /* Save the expire time */
1883 if (expiretime != -1) {
1884 /* If this key is already expired skip it */
1885 if (expiretime < now) continue;
1886 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1887 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1888 }
1889 /* Save the key and associated value */
1890 if (rdbSaveType(fp,o->type) == -1) goto werr;
1891 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1892 if (o->type == REDIS_STRING) {
1893 /* Save a string value */
1894 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1895 } else if (o->type == REDIS_LIST) {
1896 /* Save a list value */
1897 list *list = o->ptr;
1898 listNode *ln;
1899
1900 listRewind(list);
1901 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1902 while((ln = listYield(list))) {
1903 robj *eleobj = listNodeValue(ln);
1904
1905 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1906 }
1907 } else if (o->type == REDIS_SET) {
1908 /* Save a set value */
1909 dict *set = o->ptr;
1910 dictIterator *di = dictGetIterator(set);
1911 dictEntry *de;
1912
1913 if (!set) oom("dictGetIteraotr");
1914 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1915 while((de = dictNext(di)) != NULL) {
1916 robj *eleobj = dictGetEntryKey(de);
1917
1918 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1919 }
1920 dictReleaseIterator(di);
1921 } else {
1922 assert(0 != 0);
1923 }
1924 }
1925 dictReleaseIterator(di);
1926 }
1927 /* EOF opcode */
1928 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1929
1930 /* Make sure data will not remain on the OS's output buffers */
1931 fflush(fp);
1932 fsync(fileno(fp));
1933 fclose(fp);
1934
1935 /* Use RENAME to make sure the DB file is changed atomically only
1936 * if the generate DB file is ok. */
1937 if (rename(tmpfile,filename) == -1) {
1938 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1939 unlink(tmpfile);
1940 return REDIS_ERR;
1941 }
1942 redisLog(REDIS_NOTICE,"DB saved on disk");
1943 server.dirty = 0;
1944 server.lastsave = time(NULL);
1945 return REDIS_OK;
1946
1947 werr:
1948 fclose(fp);
1949 unlink(tmpfile);
1950 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1951 if (di) dictReleaseIterator(di);
1952 return REDIS_ERR;
1953 }
1954
1955 static int rdbSaveBackground(char *filename) {
1956 pid_t childpid;
1957
1958 if (server.bgsaveinprogress) return REDIS_ERR;
1959 if ((childpid = fork()) == 0) {
1960 /* Child */
1961 close(server.fd);
1962 if (rdbSave(filename) == REDIS_OK) {
1963 exit(0);
1964 } else {
1965 exit(1);
1966 }
1967 } else {
1968 /* Parent */
1969 if (childpid == -1) {
1970 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
1971 strerror(errno));
1972 return REDIS_ERR;
1973 }
1974 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1975 server.bgsaveinprogress = 1;
1976 return REDIS_OK;
1977 }
1978 return REDIS_OK; /* unreached */
1979 }
1980
/* Read a single type/opcode byte from 'fp'.
 * Returns the byte value (0..255), or -1 if nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;
    size_t got = fread(&byte,1,1,fp);

    return got ? (int)byte : -1;
}
1986
/* Read a 4 byte time value (stored as a raw int32_t) from 'fp'.
 * Returns the value converted to time_t, or -1 on short read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t secs;

    if (fread(&secs,sizeof(secs),1,fp) != 1) return -1;
    return (time_t)secs;
}
1992
1993 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
1994 * of this file for a description of how this are stored on disk.
1995 *
1996 * isencoded is set to 1 if the readed length is not actually a length but
1997 * an "encoding type", check the above comments for more info */
1998 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
1999 unsigned char buf[2];
2000 uint32_t len;
2001
2002 if (isencoded) *isencoded = 0;
2003 if (rdbver == 0) {
2004 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2005 return ntohl(len);
2006 } else {
2007 int type;
2008
2009 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2010 type = (buf[0]&0xC0)>>6;
2011 if (type == REDIS_RDB_6BITLEN) {
2012 /* Read a 6 bit len */
2013 return buf[0]&0x3F;
2014 } else if (type == REDIS_RDB_ENCVAL) {
2015 /* Read a 6 bit len encoding type */
2016 if (isencoded) *isencoded = 1;
2017 return buf[0]&0x3F;
2018 } else if (type == REDIS_RDB_14BITLEN) {
2019 /* Read a 14 bit len */
2020 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2021 return ((buf[0]&0x3F)<<8)|buf[1];
2022 } else {
2023 /* Read a 32 bit len */
2024 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2025 return ntohl(len);
2026 }
2027 }
2028 }
2029
2030 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2031 unsigned char enc[4];
2032 long long val;
2033
2034 if (enctype == REDIS_RDB_ENC_INT8) {
2035 if (fread(enc,1,1,fp) == 0) return NULL;
2036 val = (signed char)enc[0];
2037 } else if (enctype == REDIS_RDB_ENC_INT16) {
2038 uint16_t v;
2039 if (fread(enc,2,1,fp) == 0) return NULL;
2040 v = enc[0]|(enc[1]<<8);
2041 val = (int16_t)v;
2042 } else if (enctype == REDIS_RDB_ENC_INT32) {
2043 uint32_t v;
2044 if (fread(enc,4,1,fp) == 0) return NULL;
2045 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2046 val = (int32_t)v;
2047 } else {
2048 val = 0; /* anti-warning */
2049 assert(0!=0);
2050 }
2051 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2052 }
2053
2054 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2055 unsigned int len, clen;
2056 unsigned char *c = NULL;
2057 sds val = NULL;
2058
2059 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2060 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2061 if ((c = zmalloc(clen)) == NULL) goto err;
2062 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2063 if (fread(c,clen,1,fp) == 0) goto err;
2064 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2065 zfree(c);
2066 return createObject(REDIS_STRING,val);
2067 err:
2068 zfree(c);
2069 sdsfree(val);
2070 return NULL;
2071 }
2072
2073 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2074 int isencoded;
2075 uint32_t len;
2076 sds val;
2077
2078 len = rdbLoadLen(fp,rdbver,&isencoded);
2079 if (isencoded) {
2080 switch(len) {
2081 case REDIS_RDB_ENC_INT8:
2082 case REDIS_RDB_ENC_INT16:
2083 case REDIS_RDB_ENC_INT32:
2084 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2085 case REDIS_RDB_ENC_LZF:
2086 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2087 default:
2088 assert(0!=0);
2089 }
2090 }
2091
2092 if (len == REDIS_RDB_LENERR) return NULL;
2093 val = sdsnewlen(NULL,len);
2094 if (len && fread(val,len,1,fp) == 0) {
2095 sdsfree(val);
2096 return NULL;
2097 }
2098 return tryObjectSharing(createObject(REDIS_STRING,val));
2099 }
2100
2101 static int rdbLoad(char *filename) {
2102 FILE *fp;
2103 robj *keyobj = NULL;
2104 uint32_t dbid;
2105 int type, retval, rdbver;
2106 dict *d = server.db[0].dict;
2107 redisDb *db = server.db+0;
2108 char buf[1024];
2109 time_t expiretime = -1, now = time(NULL);
2110
2111 fp = fopen(filename,"r");
2112 if (!fp) return REDIS_ERR;
2113 if (fread(buf,9,1,fp) == 0) goto eoferr;
2114 buf[9] = '\0';
2115 if (memcmp(buf,"REDIS",5) != 0) {
2116 fclose(fp);
2117 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2118 return REDIS_ERR;
2119 }
2120 rdbver = atoi(buf+5);
2121 if (rdbver > 1) {
2122 fclose(fp);
2123 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2124 return REDIS_ERR;
2125 }
2126 while(1) {
2127 robj *o;
2128
2129 /* Read type. */
2130 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2131 if (type == REDIS_EXPIRETIME) {
2132 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2133 /* We read the time so we need to read the object type again */
2134 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2135 }
2136 if (type == REDIS_EOF) break;
2137 /* Handle SELECT DB opcode as a special case */
2138 if (type == REDIS_SELECTDB) {
2139 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2140 goto eoferr;
2141 if (dbid >= (unsigned)server.dbnum) {
2142 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2143 exit(1);
2144 }
2145 db = server.db+dbid;
2146 d = db->dict;
2147 continue;
2148 }
2149 /* Read key */
2150 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2151
2152 if (type == REDIS_STRING) {
2153 /* Read string value */
2154 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2155 } else if (type == REDIS_LIST || type == REDIS_SET) {
2156 /* Read list/set value */
2157 uint32_t listlen;
2158
2159 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2160 goto eoferr;
2161 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2162 /* Load every single element of the list/set */
2163 while(listlen--) {
2164 robj *ele;
2165
2166 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2167 if (type == REDIS_LIST) {
2168 if (!listAddNodeTail((list*)o->ptr,ele))
2169 oom("listAddNodeTail");
2170 } else {
2171 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2172 oom("dictAdd");
2173 }
2174 }
2175 } else {
2176 assert(0 != 0);
2177 }
2178 /* Add the new object in the hash table */
2179 retval = dictAdd(d,keyobj,o);
2180 if (retval == DICT_ERR) {
2181 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2182 exit(1);
2183 }
2184 /* Set the expire time if needed */
2185 if (expiretime != -1) {
2186 setExpire(db,keyobj,expiretime);
2187 /* Delete this key if already expired */
2188 if (expiretime < now) deleteKey(db,keyobj);
2189 expiretime = -1;
2190 }
2191 keyobj = o = NULL;
2192 }
2193 fclose(fp);
2194 return REDIS_OK;
2195
2196 eoferr: /* unexpected end of file is handled here with a fatal exit */
2197 if (keyobj) decrRefCount(keyobj);
2198 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2199 exit(1);
2200 return REDIS_ERR; /* Just to avoid warning */
2201 }
2202
2203 /*================================== Commands =============================== */
2204
2205 static void authCommand(redisClient *c) {
2206 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2207 c->authenticated = 1;
2208 addReply(c,shared.ok);
2209 } else {
2210 c->authenticated = 0;
2211 addReply(c,shared.err);
2212 }
2213 }
2214
2215 static void pingCommand(redisClient *c) {
2216 addReply(c,shared.pong);
2217 }
2218
2219 static void echoCommand(redisClient *c) {
2220 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2221 (int)sdslen(c->argv[1]->ptr)));
2222 addReply(c,c->argv[1]);
2223 addReply(c,shared.crlf);
2224 }
2225
2226 /*=================================== Strings =============================== */
2227
2228 static void setGenericCommand(redisClient *c, int nx) {
2229 int retval;
2230
2231 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2232 if (retval == DICT_ERR) {
2233 if (!nx) {
2234 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2235 incrRefCount(c->argv[2]);
2236 } else {
2237 addReply(c,shared.czero);
2238 return;
2239 }
2240 } else {
2241 incrRefCount(c->argv[1]);
2242 incrRefCount(c->argv[2]);
2243 }
2244 server.dirty++;
2245 removeExpire(c->db,c->argv[1]);
2246 addReply(c, nx ? shared.cone : shared.ok);
2247 }
2248
2249 static void setCommand(redisClient *c) {
2250 setGenericCommand(c,0);
2251 }
2252
2253 static void setnxCommand(redisClient *c) {
2254 setGenericCommand(c,1);
2255 }
2256
2257 static void getCommand(redisClient *c) {
2258 robj *o = lookupKeyRead(c->db,c->argv[1]);
2259
2260 if (o == NULL) {
2261 addReply(c,shared.nullbulk);
2262 } else {
2263 if (o->type != REDIS_STRING) {
2264 addReply(c,shared.wrongtypeerr);
2265 } else {
2266 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2267 addReply(c,o);
2268 addReply(c,shared.crlf);
2269 }
2270 }
2271 }
2272
2273 static void getSetCommand(redisClient *c) {
2274 getCommand(c);
2275 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2276 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2277 } else {
2278 incrRefCount(c->argv[1]);
2279 }
2280 incrRefCount(c->argv[2]);
2281 server.dirty++;
2282 removeExpire(c->db,c->argv[1]);
2283 }
2284
2285 static void mgetCommand(redisClient *c) {
2286 int j;
2287
2288 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2289 for (j = 1; j < c->argc; j++) {
2290 robj *o = lookupKeyRead(c->db,c->argv[j]);
2291 if (o == NULL) {
2292 addReply(c,shared.nullbulk);
2293 } else {
2294 if (o->type != REDIS_STRING) {
2295 addReply(c,shared.nullbulk);
2296 } else {
2297 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2298 addReply(c,o);
2299 addReply(c,shared.crlf);
2300 }
2301 }
2302 }
2303 }
2304
2305 static void incrDecrCommand(redisClient *c, long long incr) {
2306 long long value;
2307 int retval;
2308 robj *o;
2309
2310 o = lookupKeyWrite(c->db,c->argv[1]);
2311 if (o == NULL) {
2312 value = 0;
2313 } else {
2314 if (o->type != REDIS_STRING) {
2315 value = 0;
2316 } else {
2317 char *eptr;
2318
2319 value = strtoll(o->ptr, &eptr, 10);
2320 }
2321 }
2322
2323 value += incr;
2324 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2325 retval = dictAdd(c->db->dict,c->argv[1],o);
2326 if (retval == DICT_ERR) {
2327 dictReplace(c->db->dict,c->argv[1],o);
2328 removeExpire(c->db,c->argv[1]);
2329 } else {
2330 incrRefCount(c->argv[1]);
2331 }
2332 server.dirty++;
2333 addReply(c,shared.colon);
2334 addReply(c,o);
2335 addReply(c,shared.crlf);
2336 }
2337
2338 static void incrCommand(redisClient *c) {
2339 incrDecrCommand(c,1);
2340 }
2341
2342 static void decrCommand(redisClient *c) {
2343 incrDecrCommand(c,-1);
2344 }
2345
2346 static void incrbyCommand(redisClient *c) {
2347 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2348 incrDecrCommand(c,incr);
2349 }
2350
2351 static void decrbyCommand(redisClient *c) {
2352 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2353 incrDecrCommand(c,-incr);
2354 }
2355
2356 /* ========================= Type agnostic commands ========================= */
2357
2358 static void delCommand(redisClient *c) {
2359 int deleted = 0, j;
2360
2361 for (j = 1; j < c->argc; j++) {
2362 if (deleteKey(c->db,c->argv[j])) {
2363 server.dirty++;
2364 deleted++;
2365 }
2366 }
2367 switch(deleted) {
2368 case 0:
2369 addReply(c,shared.czero);
2370 break;
2371 case 1:
2372 addReply(c,shared.cone);
2373 break;
2374 default:
2375 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2376 break;
2377 }
2378 }
2379
2380 static void existsCommand(redisClient *c) {
2381 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2382 }
2383
2384 static void selectCommand(redisClient *c) {
2385 int id = atoi(c->argv[1]->ptr);
2386
2387 if (selectDb(c,id) == REDIS_ERR) {
2388 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2389 } else {
2390 addReply(c,shared.ok);
2391 }
2392 }
2393
2394 static void randomkeyCommand(redisClient *c) {
2395 dictEntry *de;
2396
2397 while(1) {
2398 de = dictGetRandomKey(c->db->dict);
2399 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2400 }
2401 if (de == NULL) {
2402 addReply(c,shared.plus);
2403 addReply(c,shared.crlf);
2404 } else {
2405 addReply(c,shared.plus);
2406 addReply(c,dictGetEntryKey(de));
2407 addReply(c,shared.crlf);
2408 }
2409 }
2410
2411 static void keysCommand(redisClient *c) {
2412 dictIterator *di;
2413 dictEntry *de;
2414 sds pattern = c->argv[1]->ptr;
2415 int plen = sdslen(pattern);
2416 int numkeys = 0, keyslen = 0;
2417 robj *lenobj = createObject(REDIS_STRING,NULL);
2418
2419 di = dictGetIterator(c->db->dict);
2420 if (!di) oom("dictGetIterator");
2421 addReply(c,lenobj);
2422 decrRefCount(lenobj);
2423 while((de = dictNext(di)) != NULL) {
2424 robj *keyobj = dictGetEntryKey(de);
2425
2426 sds key = keyobj->ptr;
2427 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2428 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2429 if (expireIfNeeded(c->db,keyobj) == 0) {
2430 if (numkeys != 0)
2431 addReply(c,shared.space);
2432 addReply(c,keyobj);
2433 numkeys++;
2434 keyslen += sdslen(key);
2435 }
2436 }
2437 }
2438 dictReleaseIterator(di);
2439 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2440 addReply(c,shared.crlf);
2441 }
2442
2443 static void dbsizeCommand(redisClient *c) {
2444 addReplySds(c,
2445 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2446 }
2447
2448 static void lastsaveCommand(redisClient *c) {
2449 addReplySds(c,
2450 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2451 }
2452
2453 static void typeCommand(redisClient *c) {
2454 robj *o;
2455 char *type;
2456
2457 o = lookupKeyRead(c->db,c->argv[1]);
2458 if (o == NULL) {
2459 type = "+none";
2460 } else {
2461 switch(o->type) {
2462 case REDIS_STRING: type = "+string"; break;
2463 case REDIS_LIST: type = "+list"; break;
2464 case REDIS_SET: type = "+set"; break;
2465 default: type = "unknown"; break;
2466 }
2467 }
2468 addReplySds(c,sdsnew(type));
2469 addReply(c,shared.crlf);
2470 }
2471
2472 static void saveCommand(redisClient *c) {
2473 if (server.bgsaveinprogress) {
2474 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2475 return;
2476 }
2477 if (rdbSave(server.dbfilename) == REDIS_OK) {
2478 addReply(c,shared.ok);
2479 } else {
2480 addReply(c,shared.err);
2481 }
2482 }
2483
2484 static void bgsaveCommand(redisClient *c) {
2485 if (server.bgsaveinprogress) {
2486 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2487 return;
2488 }
2489 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2490 addReply(c,shared.ok);
2491 } else {
2492 addReply(c,shared.err);
2493 }
2494 }
2495
2496 static void shutdownCommand(redisClient *c) {
2497 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2498 /* XXX: TODO kill the child if there is a bgsave in progress */
2499 if (rdbSave(server.dbfilename) == REDIS_OK) {
2500 if (server.daemonize) {
2501 unlink(server.pidfile);
2502 }
2503 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2504 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2505 exit(1);
2506 } else {
2507 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2508 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2509 }
2510 }
2511
2512 static void renameGenericCommand(redisClient *c, int nx) {
2513 robj *o;
2514
2515 /* To use the same key as src and dst is probably an error */
2516 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2517 addReply(c,shared.sameobjecterr);
2518 return;
2519 }
2520
2521 o = lookupKeyWrite(c->db,c->argv[1]);
2522 if (o == NULL) {
2523 addReply(c,shared.nokeyerr);
2524 return;
2525 }
2526 incrRefCount(o);
2527 deleteIfVolatile(c->db,c->argv[2]);
2528 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2529 if (nx) {
2530 decrRefCount(o);
2531 addReply(c,shared.czero);
2532 return;
2533 }
2534 dictReplace(c->db->dict,c->argv[2],o);
2535 } else {
2536 incrRefCount(c->argv[2]);
2537 }
2538 deleteKey(c->db,c->argv[1]);
2539 server.dirty++;
2540 addReply(c,nx ? shared.cone : shared.ok);
2541 }
2542
2543 static void renameCommand(redisClient *c) {
2544 renameGenericCommand(c,0);
2545 }
2546
2547 static void renamenxCommand(redisClient *c) {
2548 renameGenericCommand(c,1);
2549 }
2550
2551 static void moveCommand(redisClient *c) {
2552 robj *o;
2553 redisDb *src, *dst;
2554 int srcid;
2555
2556 /* Obtain source and target DB pointers */
2557 src = c->db;
2558 srcid = c->db->id;
2559 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2560 addReply(c,shared.outofrangeerr);
2561 return;
2562 }
2563 dst = c->db;
2564 selectDb(c,srcid); /* Back to the source DB */
2565
2566 /* If the user is moving using as target the same
2567 * DB as the source DB it is probably an error. */
2568 if (src == dst) {
2569 addReply(c,shared.sameobjecterr);
2570 return;
2571 }
2572
2573 /* Check if the element exists and get a reference */
2574 o = lookupKeyWrite(c->db,c->argv[1]);
2575 if (!o) {
2576 addReply(c,shared.czero);
2577 return;
2578 }
2579
2580 /* Try to add the element to the target DB */
2581 deleteIfVolatile(dst,c->argv[1]);
2582 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2583 addReply(c,shared.czero);
2584 return;
2585 }
2586 incrRefCount(c->argv[1]);
2587 incrRefCount(o);
2588
2589 /* OK! key moved, free the entry in the source DB */
2590 deleteKey(src,c->argv[1]);
2591 server.dirty++;
2592 addReply(c,shared.cone);
2593 }
2594
2595 /* =================================== Lists ================================ */
2596 static void pushGenericCommand(redisClient *c, int where) {
2597 robj *lobj;
2598 list *list;
2599
2600 lobj = lookupKeyWrite(c->db,c->argv[1]);
2601 if (lobj == NULL) {
2602 lobj = createListObject();
2603 list = lobj->ptr;
2604 if (where == REDIS_HEAD) {
2605 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2606 } else {
2607 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2608 }
2609 dictAdd(c->db->dict,c->argv[1],lobj);
2610 incrRefCount(c->argv[1]);
2611 incrRefCount(c->argv[2]);
2612 } else {
2613 if (lobj->type != REDIS_LIST) {
2614 addReply(c,shared.wrongtypeerr);
2615 return;
2616 }
2617 list = lobj->ptr;
2618 if (where == REDIS_HEAD) {
2619 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2620 } else {
2621 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2622 }
2623 incrRefCount(c->argv[2]);
2624 }
2625 server.dirty++;
2626 addReply(c,shared.ok);
2627 }
2628
2629 static void lpushCommand(redisClient *c) {
2630 pushGenericCommand(c,REDIS_HEAD);
2631 }
2632
2633 static void rpushCommand(redisClient *c) {
2634 pushGenericCommand(c,REDIS_TAIL);
2635 }
2636
2637 static void llenCommand(redisClient *c) {
2638 robj *o;
2639 list *l;
2640
2641 o = lookupKeyRead(c->db,c->argv[1]);
2642 if (o == NULL) {
2643 addReply(c,shared.czero);
2644 return;
2645 } else {
2646 if (o->type != REDIS_LIST) {
2647 addReply(c,shared.wrongtypeerr);
2648 } else {
2649 l = o->ptr;
2650 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2651 }
2652 }
2653 }
2654
2655 static void lindexCommand(redisClient *c) {
2656 robj *o;
2657 int index = atoi(c->argv[2]->ptr);
2658
2659 o = lookupKeyRead(c->db,c->argv[1]);
2660 if (o == NULL) {
2661 addReply(c,shared.nullbulk);
2662 } else {
2663 if (o->type != REDIS_LIST) {
2664 addReply(c,shared.wrongtypeerr);
2665 } else {
2666 list *list = o->ptr;
2667 listNode *ln;
2668
2669 ln = listIndex(list, index);
2670 if (ln == NULL) {
2671 addReply(c,shared.nullbulk);
2672 } else {
2673 robj *ele = listNodeValue(ln);
2674 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2675 addReply(c,ele);
2676 addReply(c,shared.crlf);
2677 }
2678 }
2679 }
2680 }
2681
2682 static void lsetCommand(redisClient *c) {
2683 robj *o;
2684 int index = atoi(c->argv[2]->ptr);
2685
2686 o = lookupKeyWrite(c->db,c->argv[1]);
2687 if (o == NULL) {
2688 addReply(c,shared.nokeyerr);
2689 } else {
2690 if (o->type != REDIS_LIST) {
2691 addReply(c,shared.wrongtypeerr);
2692 } else {
2693 list *list = o->ptr;
2694 listNode *ln;
2695
2696 ln = listIndex(list, index);
2697 if (ln == NULL) {
2698 addReply(c,shared.outofrangeerr);
2699 } else {
2700 robj *ele = listNodeValue(ln);
2701
2702 decrRefCount(ele);
2703 listNodeValue(ln) = c->argv[3];
2704 incrRefCount(c->argv[3]);
2705 addReply(c,shared.ok);
2706 server.dirty++;
2707 }
2708 }
2709 }
2710 }
2711
2712 static void popGenericCommand(redisClient *c, int where) {
2713 robj *o;
2714
2715 o = lookupKeyWrite(c->db,c->argv[1]);
2716 if (o == NULL) {
2717 addReply(c,shared.nullbulk);
2718 } else {
2719 if (o->type != REDIS_LIST) {
2720 addReply(c,shared.wrongtypeerr);
2721 } else {
2722 list *list = o->ptr;
2723 listNode *ln;
2724
2725 if (where == REDIS_HEAD)
2726 ln = listFirst(list);
2727 else
2728 ln = listLast(list);
2729
2730 if (ln == NULL) {
2731 addReply(c,shared.nullbulk);
2732 } else {
2733 robj *ele = listNodeValue(ln);
2734 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2735 addReply(c,ele);
2736 addReply(c,shared.crlf);
2737 listDelNode(list,ln);
2738 server.dirty++;
2739 }
2740 }
2741 }
2742 }
2743
2744 static void lpopCommand(redisClient *c) {
2745 popGenericCommand(c,REDIS_HEAD);
2746 }
2747
2748 static void rpopCommand(redisClient *c) {
2749 popGenericCommand(c,REDIS_TAIL);
2750 }
2751
2752 static void lrangeCommand(redisClient *c) {
2753 robj *o;
2754 int start = atoi(c->argv[2]->ptr);
2755 int end = atoi(c->argv[3]->ptr);
2756
2757 o = lookupKeyRead(c->db,c->argv[1]);
2758 if (o == NULL) {
2759 addReply(c,shared.nullmultibulk);
2760 } else {
2761 if (o->type != REDIS_LIST) {
2762 addReply(c,shared.wrongtypeerr);
2763 } else {
2764 list *list = o->ptr;
2765 listNode *ln;
2766 int llen = listLength(list);
2767 int rangelen, j;
2768 robj *ele;
2769
2770 /* convert negative indexes */
2771 if (start < 0) start = llen+start;
2772 if (end < 0) end = llen+end;
2773 if (start < 0) start = 0;
2774 if (end < 0) end = 0;
2775
2776 /* indexes sanity checks */
2777 if (start > end || start >= llen) {
2778 /* Out of range start or start > end result in empty list */
2779 addReply(c,shared.emptymultibulk);
2780 return;
2781 }
2782 if (end >= llen) end = llen-1;
2783 rangelen = (end-start)+1;
2784
2785 /* Return the result in form of a multi-bulk reply */
2786 ln = listIndex(list, start);
2787 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2788 for (j = 0; j < rangelen; j++) {
2789 ele = listNodeValue(ln);
2790 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2791 addReply(c,ele);
2792 addReply(c,shared.crlf);
2793 ln = ln->next;
2794 }
2795 }
2796 }
2797 }
2798
2799 static void ltrimCommand(redisClient *c) {
2800 robj *o;
2801 int start = atoi(c->argv[2]->ptr);
2802 int end = atoi(c->argv[3]->ptr);
2803
2804 o = lookupKeyWrite(c->db,c->argv[1]);
2805 if (o == NULL) {
2806 addReply(c,shared.nokeyerr);
2807 } else {
2808 if (o->type != REDIS_LIST) {
2809 addReply(c,shared.wrongtypeerr);
2810 } else {
2811 list *list = o->ptr;
2812 listNode *ln;
2813 int llen = listLength(list);
2814 int j, ltrim, rtrim;
2815
2816 /* convert negative indexes */
2817 if (start < 0) start = llen+start;
2818 if (end < 0) end = llen+end;
2819 if (start < 0) start = 0;
2820 if (end < 0) end = 0;
2821
2822 /* indexes sanity checks */
2823 if (start > end || start >= llen) {
2824 /* Out of range start or start > end result in empty list */
2825 ltrim = llen;
2826 rtrim = 0;
2827 } else {
2828 if (end >= llen) end = llen-1;
2829 ltrim = start;
2830 rtrim = llen-end-1;
2831 }
2832
2833 /* Remove list elements to perform the trim */
2834 for (j = 0; j < ltrim; j++) {
2835 ln = listFirst(list);
2836 listDelNode(list,ln);
2837 }
2838 for (j = 0; j < rtrim; j++) {
2839 ln = listLast(list);
2840 listDelNode(list,ln);
2841 }
2842 addReply(c,shared.ok);
2843 server.dirty++;
2844 }
2845 }
2846 }
2847
2848 static void lremCommand(redisClient *c) {
2849 robj *o;
2850
2851 o = lookupKeyWrite(c->db,c->argv[1]);
2852 if (o == NULL) {
2853 addReply(c,shared.nokeyerr);
2854 } else {
2855 if (o->type != REDIS_LIST) {
2856 addReply(c,shared.wrongtypeerr);
2857 } else {
2858 list *list = o->ptr;
2859 listNode *ln, *next;
2860 int toremove = atoi(c->argv[2]->ptr);
2861 int removed = 0;
2862 int fromtail = 0;
2863
2864 if (toremove < 0) {
2865 toremove = -toremove;
2866 fromtail = 1;
2867 }
2868 ln = fromtail ? list->tail : list->head;
2869 while (ln) {
2870 robj *ele = listNodeValue(ln);
2871
2872 next = fromtail ? ln->prev : ln->next;
2873 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2874 listDelNode(list,ln);
2875 server.dirty++;
2876 removed++;
2877 if (toremove && removed == toremove) break;
2878 }
2879 ln = next;
2880 }
2881 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2882 }
2883 }
2884 }
2885
2886 /* ==================================== Sets ================================ */
2887
2888 static void saddCommand(redisClient *c) {
2889 robj *set;
2890
2891 set = lookupKeyWrite(c->db,c->argv[1]);
2892 if (set == NULL) {
2893 set = createSetObject();
2894 dictAdd(c->db->dict,c->argv[1],set);
2895 incrRefCount(c->argv[1]);
2896 } else {
2897 if (set->type != REDIS_SET) {
2898 addReply(c,shared.wrongtypeerr);
2899 return;
2900 }
2901 }
2902 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2903 incrRefCount(c->argv[2]);
2904 server.dirty++;
2905 addReply(c,shared.cone);
2906 } else {
2907 addReply(c,shared.czero);
2908 }
2909 }
2910
2911 static void sremCommand(redisClient *c) {
2912 robj *set;
2913
2914 set = lookupKeyWrite(c->db,c->argv[1]);
2915 if (set == NULL) {
2916 addReply(c,shared.czero);
2917 } else {
2918 if (set->type != REDIS_SET) {
2919 addReply(c,shared.wrongtypeerr);
2920 return;
2921 }
2922 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2923 server.dirty++;
2924 addReply(c,shared.cone);
2925 } else {
2926 addReply(c,shared.czero);
2927 }
2928 }
2929 }
2930
2931 static void smoveCommand(redisClient *c) {
2932 robj *srcset, *dstset;
2933
2934 srcset = lookupKeyWrite(c->db,c->argv[1]);
2935 dstset = lookupKeyWrite(c->db,c->argv[2]);
2936
2937 /* If the source key does not exist return 0, if it's of the wrong type
2938 * raise an error */
2939 if (srcset == NULL || srcset->type != REDIS_SET) {
2940 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2941 return;
2942 }
2943 /* Error if the destination key is not a set as well */
2944 if (dstset && dstset->type != REDIS_SET) {
2945 addReply(c,shared.wrongtypeerr);
2946 return;
2947 }
2948 /* Remove the element from the source set */
2949 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2950 /* Key not found in the src set! return zero */
2951 addReply(c,shared.czero);
2952 return;
2953 }
2954 server.dirty++;
2955 /* Add the element to the destination set */
2956 if (!dstset) {
2957 dstset = createSetObject();
2958 dictAdd(c->db->dict,c->argv[2],dstset);
2959 incrRefCount(c->argv[2]);
2960 }
2961 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
2962 incrRefCount(c->argv[3]);
2963 addReply(c,shared.cone);
2964 }
2965
2966 static void sismemberCommand(redisClient *c) {
2967 robj *set;
2968
2969 set = lookupKeyRead(c->db,c->argv[1]);
2970 if (set == NULL) {
2971 addReply(c,shared.czero);
2972 } else {
2973 if (set->type != REDIS_SET) {
2974 addReply(c,shared.wrongtypeerr);
2975 return;
2976 }
2977 if (dictFind(set->ptr,c->argv[2]))
2978 addReply(c,shared.cone);
2979 else
2980 addReply(c,shared.czero);
2981 }
2982 }
2983
2984 static void scardCommand(redisClient *c) {
2985 robj *o;
2986 dict *s;
2987
2988 o = lookupKeyRead(c->db,c->argv[1]);
2989 if (o == NULL) {
2990 addReply(c,shared.czero);
2991 return;
2992 } else {
2993 if (o->type != REDIS_SET) {
2994 addReply(c,shared.wrongtypeerr);
2995 } else {
2996 s = o->ptr;
2997 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
2998 dictSize(s)));
2999 }
3000 }
3001 }
3002
3003 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3004 dict **d1 = (void*) s1, **d2 = (void*) s2;
3005
3006 return dictSize(*d1)-dictSize(*d2);
3007 }
3008
3009 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3010 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3011 dictIterator *di;
3012 dictEntry *de;
3013 robj *lenobj = NULL, *dstset = NULL;
3014 int j, cardinality = 0;
3015
3016 if (!dv) oom("sinterGenericCommand");
3017 for (j = 0; j < setsnum; j++) {
3018 robj *setobj;
3019
3020 setobj = dstkey ?
3021 lookupKeyWrite(c->db,setskeys[j]) :
3022 lookupKeyRead(c->db,setskeys[j]);
3023 if (!setobj) {
3024 zfree(dv);
3025 if (dstkey) {
3026 deleteKey(c->db,dstkey);
3027 addReply(c,shared.ok);
3028 } else {
3029 addReply(c,shared.nullmultibulk);
3030 }
3031 return;
3032 }
3033 if (setobj->type != REDIS_SET) {
3034 zfree(dv);
3035 addReply(c,shared.wrongtypeerr);
3036 return;
3037 }
3038 dv[j] = setobj->ptr;
3039 }
3040 /* Sort sets from the smallest to largest, this will improve our
3041 * algorithm's performace */
3042 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3043
3044 /* The first thing we should output is the total number of elements...
3045 * since this is a multi-bulk write, but at this stage we don't know
3046 * the intersection set size, so we use a trick, append an empty object
3047 * to the output list and save the pointer to later modify it with the
3048 * right length */
3049 if (!dstkey) {
3050 lenobj = createObject(REDIS_STRING,NULL);
3051 addReply(c,lenobj);
3052 decrRefCount(lenobj);
3053 } else {
3054 /* If we have a target key where to store the resulting set
3055 * create this key with an empty set inside */
3056 dstset = createSetObject();
3057 }
3058
3059 /* Iterate all the elements of the first (smallest) set, and test
3060 * the element against all the other sets, if at least one set does
3061 * not include the element it is discarded */
3062 di = dictGetIterator(dv[0]);
3063 if (!di) oom("dictGetIterator");
3064
3065 while((de = dictNext(di)) != NULL) {
3066 robj *ele;
3067
3068 for (j = 1; j < setsnum; j++)
3069 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3070 if (j != setsnum)
3071 continue; /* at least one set does not contain the member */
3072 ele = dictGetEntryKey(de);
3073 if (!dstkey) {
3074 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3075 addReply(c,ele);
3076 addReply(c,shared.crlf);
3077 cardinality++;
3078 } else {
3079 dictAdd(dstset->ptr,ele,NULL);
3080 incrRefCount(ele);
3081 }
3082 }
3083 dictReleaseIterator(di);
3084
3085 if (dstkey) {
3086 /* Store the resulting set into the target */
3087 deleteKey(c->db,dstkey);
3088 dictAdd(c->db->dict,dstkey,dstset);
3089 incrRefCount(dstkey);
3090 }
3091
3092 if (!dstkey) {
3093 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3094 } else {
3095 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3096 dictSize((dict*)dstset->ptr)));
3097 server.dirty++;
3098 }
3099 zfree(dv);
3100 }
3101
3102 static void sinterCommand(redisClient *c) {
3103 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3104 }
3105
3106 static void sinterstoreCommand(redisClient *c) {
3107 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3108 }
3109
/* Operation selectors passed as the 'op' argument of
 * sunionDiffGenericCommand(). */
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
3112
3113 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3114 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3115 dictIterator *di;
3116 dictEntry *de;
3117 robj *dstset = NULL;
3118 int j, cardinality = 0;
3119
3120 if (!dv) oom("sunionDiffGenericCommand");
3121 for (j = 0; j < setsnum; j++) {
3122 robj *setobj;
3123
3124 setobj = dstkey ?
3125 lookupKeyWrite(c->db,setskeys[j]) :
3126 lookupKeyRead(c->db,setskeys[j]);
3127 if (!setobj) {
3128 dv[j] = NULL;
3129 continue;
3130 }
3131 if (setobj->type != REDIS_SET) {
3132 zfree(dv);
3133 addReply(c,shared.wrongtypeerr);
3134 return;
3135 }
3136 dv[j] = setobj->ptr;
3137 }
3138
3139 /* We need a temp set object to store our union. If the dstkey
3140 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3141 * this set object will be the resulting object to set into the target key*/
3142 dstset = createSetObject();
3143
3144 /* Iterate all the elements of all the sets, add every element a single
3145 * time to the result set */
3146 for (j = 0; j < setsnum; j++) {
3147 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3148 if (!dv[j]) continue; /* non existing keys are like empty sets */
3149
3150 di = dictGetIterator(dv[j]);
3151 if (!di) oom("dictGetIterator");
3152
3153 while((de = dictNext(di)) != NULL) {
3154 robj *ele;
3155
3156 /* dictAdd will not add the same element multiple times */
3157 ele = dictGetEntryKey(de);
3158 if (op == REDIS_OP_UNION || j == 0) {
3159 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3160 incrRefCount(ele);
3161 cardinality++;
3162 }
3163 } else if (op == REDIS_OP_DIFF) {
3164 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3165 cardinality--;
3166 }
3167 }
3168 }
3169 dictReleaseIterator(di);
3170
3171 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3172 }
3173
3174 /* Output the content of the resulting set, if not in STORE mode */
3175 if (!dstkey) {
3176 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3177 di = dictGetIterator(dstset->ptr);
3178 if (!di) oom("dictGetIterator");
3179 while((de = dictNext(di)) != NULL) {
3180 robj *ele;
3181
3182 ele = dictGetEntryKey(de);
3183 addReplySds(c,sdscatprintf(sdsempty(),
3184 "$%d\r\n",sdslen(ele->ptr)));
3185 addReply(c,ele);
3186 addReply(c,shared.crlf);
3187 }
3188 dictReleaseIterator(di);
3189 } else {
3190 /* If we have a target key where to store the resulting set
3191 * create this key with the result set inside */
3192 deleteKey(c->db,dstkey);
3193 dictAdd(c->db->dict,dstkey,dstset);
3194 incrRefCount(dstkey);
3195 }
3196
3197 /* Cleanup */
3198 if (!dstkey) {
3199 decrRefCount(dstset);
3200 } else {
3201 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3202 dictSize((dict*)dstset->ptr)));
3203 server.dirty++;
3204 }
3205 zfree(dv);
3206 }
3207
3208 static void sunionCommand(redisClient *c) {
3209 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3210 }
3211
3212 static void sunionstoreCommand(redisClient *c) {
3213 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3214 }
3215
3216 static void sdiffCommand(redisClient *c) {
3217 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3218 }
3219
3220 static void sdiffstoreCommand(redisClient *c) {
3221 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3222 }
3223
3224 static void flushdbCommand(redisClient *c) {
3225 server.dirty += dictSize(c->db->dict);
3226 dictEmpty(c->db->dict);
3227 dictEmpty(c->db->expires);
3228 addReply(c,shared.ok);
3229 }
3230
3231 static void flushallCommand(redisClient *c) {
3232 server.dirty += emptyDb();
3233 addReply(c,shared.ok);
3234 rdbSave(server.dbfilename);
3235 server.dirty++;
3236 }
3237
3238 redisSortOperation *createSortOperation(int type, robj *pattern) {
3239 redisSortOperation *so = zmalloc(sizeof(*so));
3240 if (!so) oom("createSortOperation");
3241 so->type = type;
3242 so->pattern = pattern;
3243 return so;
3244 }
3245
3246 /* Return the value associated to the key with a name obtained
3247 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3248 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3249 char *p;
3250 sds spat, ssub;
3251 robj keyobj;
3252 int prefixlen, sublen, postfixlen;
3253 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3254 struct {
3255 long len;
3256 long free;
3257 char buf[REDIS_SORTKEY_MAX+1];
3258 } keyname;
3259
3260 spat = pattern->ptr;
3261 ssub = subst->ptr;
3262 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3263 p = strchr(spat,'*');
3264 if (!p) return NULL;
3265
3266 prefixlen = p-spat;
3267 sublen = sdslen(ssub);
3268 postfixlen = sdslen(spat)-(prefixlen+1);
3269 memcpy(keyname.buf,spat,prefixlen);
3270 memcpy(keyname.buf+prefixlen,ssub,sublen);
3271 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3272 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3273 keyname.len = prefixlen+sublen+postfixlen;
3274
3275 keyobj.refcount = 1;
3276 keyobj.type = REDIS_STRING;
3277 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3278
3279 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3280 return lookupKeyRead(db,&keyobj);
3281 }
3282
3283 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3284 * the additional parameter is not standard but a BSD-specific we have to
3285 * pass sorting parameters via the global 'server' structure */
3286 static int sortCompare(const void *s1, const void *s2) {
3287 const redisSortObject *so1 = s1, *so2 = s2;
3288 int cmp;
3289
3290 if (!server.sort_alpha) {
3291 /* Numeric sorting. Here it's trivial as we precomputed scores */
3292 if (so1->u.score > so2->u.score) {
3293 cmp = 1;
3294 } else if (so1->u.score < so2->u.score) {
3295 cmp = -1;
3296 } else {
3297 cmp = 0;
3298 }
3299 } else {
3300 /* Alphanumeric sorting */
3301 if (server.sort_bypattern) {
3302 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3303 /* At least one compare object is NULL */
3304 if (so1->u.cmpobj == so2->u.cmpobj)
3305 cmp = 0;
3306 else if (so1->u.cmpobj == NULL)
3307 cmp = -1;
3308 else
3309 cmp = 1;
3310 } else {
3311 /* We have both the objects, use strcoll */
3312 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3313 }
3314 } else {
3315 /* Compare elements directly */
3316 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3317 }
3318 }
3319 return server.sort_desc ? -cmp : cmp;
3320 }
3321
3322 /* The SORT command is the most complex command in Redis. Warning: this code
3323 * is optimized for speed and a bit less for readability */
3324 static void sortCommand(redisClient *c) {
3325 list *operations;
3326 int outputlen = 0;
3327 int desc = 0, alpha = 0;
3328 int limit_start = 0, limit_count = -1, start, end;
3329 int j, dontsort = 0, vectorlen;
3330 int getop = 0; /* GET operation counter */
3331 robj *sortval, *sortby = NULL;
3332 redisSortObject *vector; /* Resulting vector to sort */
3333
3334 /* Lookup the key to sort. It must be of the right types */
3335 sortval = lookupKeyRead(c->db,c->argv[1]);
3336 if (sortval == NULL) {
3337 addReply(c,shared.nokeyerr);
3338 return;
3339 }
3340 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3341 addReply(c,shared.wrongtypeerr);
3342 return;
3343 }
3344
3345 /* Create a list of operations to perform for every sorted element.
3346 * Operations can be GET/DEL/INCR/DECR */
3347 operations = listCreate();
3348 listSetFreeMethod(operations,zfree);
3349 j = 2;
3350
3351 /* Now we need to protect sortval incrementing its count, in the future
3352 * SORT may have options able to overwrite/delete keys during the sorting
3353 * and the sorted key itself may get destroied */
3354 incrRefCount(sortval);
3355
3356 /* The SORT command has an SQL-alike syntax, parse it */
3357 while(j < c->argc) {
3358 int leftargs = c->argc-j-1;
3359 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3360 desc = 0;
3361 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3362 desc = 1;
3363 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3364 alpha = 1;
3365 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3366 limit_start = atoi(c->argv[j+1]->ptr);
3367 limit_count = atoi(c->argv[j+2]->ptr);
3368 j+=2;
3369 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3370 sortby = c->argv[j+1];
3371 /* If the BY pattern does not contain '*', i.e. it is constant,
3372 * we don't need to sort nor to lookup the weight keys. */
3373 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3374 j++;
3375 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3376 listAddNodeTail(operations,createSortOperation(
3377 REDIS_SORT_GET,c->argv[j+1]));
3378 getop++;
3379 j++;
3380 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3381 listAddNodeTail(operations,createSortOperation(
3382 REDIS_SORT_DEL,c->argv[j+1]));
3383 j++;
3384 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3385 listAddNodeTail(operations,createSortOperation(
3386 REDIS_SORT_INCR,c->argv[j+1]));
3387 j++;
3388 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3389 listAddNodeTail(operations,createSortOperation(
3390 REDIS_SORT_DECR,c->argv[j+1]));
3391 j++;
3392 } else {
3393 decrRefCount(sortval);
3394 listRelease(operations);
3395 addReply(c,shared.syntaxerr);
3396 return;
3397 }
3398 j++;
3399 }
3400
3401 /* Load the sorting vector with all the objects to sort */
3402 vectorlen = (sortval->type == REDIS_LIST) ?
3403 listLength((list*)sortval->ptr) :
3404 dictSize((dict*)sortval->ptr);
3405 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3406 if (!vector) oom("allocating objects vector for SORT");
3407 j = 0;
3408 if (sortval->type == REDIS_LIST) {
3409 list *list = sortval->ptr;
3410 listNode *ln;
3411
3412 listRewind(list);
3413 while((ln = listYield(list))) {
3414 robj *ele = ln->value;
3415 vector[j].obj = ele;
3416 vector[j].u.score = 0;
3417 vector[j].u.cmpobj = NULL;
3418 j++;
3419 }
3420 } else {
3421 dict *set = sortval->ptr;
3422 dictIterator *di;
3423 dictEntry *setele;
3424
3425 di = dictGetIterator(set);
3426 if (!di) oom("dictGetIterator");
3427 while((setele = dictNext(di)) != NULL) {
3428 vector[j].obj = dictGetEntryKey(setele);
3429 vector[j].u.score = 0;
3430 vector[j].u.cmpobj = NULL;
3431 j++;
3432 }
3433 dictReleaseIterator(di);
3434 }
3435 assert(j == vectorlen);
3436
3437 /* Now it's time to load the right scores in the sorting vector */
3438 if (dontsort == 0) {
3439 for (j = 0; j < vectorlen; j++) {
3440 if (sortby) {
3441 robj *byval;
3442
3443 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3444 if (!byval || byval->type != REDIS_STRING) continue;
3445 if (alpha) {
3446 vector[j].u.cmpobj = byval;
3447 incrRefCount(byval);
3448 } else {
3449 vector[j].u.score = strtod(byval->ptr,NULL);
3450 }
3451 } else {
3452 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3453 }
3454 }
3455 }
3456
3457 /* We are ready to sort the vector... perform a bit of sanity check
3458 * on the LIMIT option too. We'll use a partial version of quicksort. */
3459 start = (limit_start < 0) ? 0 : limit_start;
3460 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3461 if (start >= vectorlen) {
3462 start = vectorlen-1;
3463 end = vectorlen-2;
3464 }
3465 if (end >= vectorlen) end = vectorlen-1;
3466
3467 if (dontsort == 0) {
3468 server.sort_desc = desc;
3469 server.sort_alpha = alpha;
3470 server.sort_bypattern = sortby ? 1 : 0;
3471 if (sortby && (start != 0 || end != vectorlen-1))
3472 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3473 else
3474 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3475 }
3476
3477 /* Send command output to the output buffer, performing the specified
3478 * GET/DEL/INCR/DECR operations if any. */
3479 outputlen = getop ? getop*(end-start+1) : end-start+1;
3480 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3481 for (j = start; j <= end; j++) {
3482 listNode *ln;
3483 if (!getop) {
3484 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3485 sdslen(vector[j].obj->ptr)));
3486 addReply(c,vector[j].obj);
3487 addReply(c,shared.crlf);
3488 }
3489 listRewind(operations);
3490 while((ln = listYield(operations))) {
3491 redisSortOperation *sop = ln->value;
3492 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3493 vector[j].obj);
3494
3495 if (sop->type == REDIS_SORT_GET) {
3496 if (!val || val->type != REDIS_STRING) {
3497 addReply(c,shared.nullbulk);
3498 } else {
3499 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3500 sdslen(val->ptr)));
3501 addReply(c,val);
3502 addReply(c,shared.crlf);
3503 }
3504 } else if (sop->type == REDIS_SORT_DEL) {
3505 /* TODO */
3506 }
3507 }
3508 }
3509
3510 /* Cleanup */
3511 decrRefCount(sortval);
3512 listRelease(operations);
3513 for (j = 0; j < vectorlen; j++) {
3514 if (sortby && alpha && vector[j].u.cmpobj)
3515 decrRefCount(vector[j].u.cmpobj);
3516 }
3517 zfree(vector);
3518 }
3519
3520 static void infoCommand(redisClient *c) {
3521 sds info;
3522 time_t uptime = time(NULL)-server.stat_starttime;
3523
3524 info = sdscatprintf(sdsempty(),
3525 "redis_version:%s\r\n"
3526 "uptime_in_seconds:%d\r\n"
3527 "uptime_in_days:%d\r\n"
3528 "connected_clients:%d\r\n"
3529 "connected_slaves:%d\r\n"
3530 "used_memory:%zu\r\n"
3531 "changes_since_last_save:%lld\r\n"
3532 "bgsave_in_progress:%d\r\n"
3533 "last_save_time:%d\r\n"
3534 "total_connections_received:%lld\r\n"
3535 "total_commands_processed:%lld\r\n"
3536 "role:%s\r\n"
3537 ,REDIS_VERSION,
3538 uptime,
3539 uptime/(3600*24),
3540 listLength(server.clients)-listLength(server.slaves),
3541 listLength(server.slaves),
3542 server.usedmemory,
3543 server.dirty,
3544 server.bgsaveinprogress,
3545 server.lastsave,
3546 server.stat_numconnections,
3547 server.stat_numcommands,
3548 server.masterhost == NULL ? "master" : "slave"
3549 );
3550 if (server.masterhost) {
3551 info = sdscatprintf(info,
3552 "master_host:%s\r\n"
3553 "master_port:%d\r\n"
3554 "master_link_status:%s\r\n"
3555 "master_last_io_seconds_ago:%d\r\n"
3556 ,server.masterhost,
3557 server.masterport,
3558 (server.replstate == REDIS_REPL_CONNECTED) ?
3559 "up" : "down",
3560 (int)(time(NULL)-server.master->lastinteraction)
3561 );
3562 }
3563 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3564 addReplySds(c,info);
3565 addReply(c,shared.crlf);
3566 }
3567
3568 static void monitorCommand(redisClient *c) {
3569 /* ignore MONITOR if aleady slave or in monitor mode */
3570 if (c->flags & REDIS_SLAVE) return;
3571
3572 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3573 c->slaveseldb = 0;
3574 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3575 addReply(c,shared.ok);
3576 }
3577
3578 /* ================================= Expire ================================= */
3579 static int removeExpire(redisDb *db, robj *key) {
3580 if (dictDelete(db->expires,key) == DICT_OK) {
3581 return 1;
3582 } else {
3583 return 0;
3584 }
3585 }
3586
3587 static int setExpire(redisDb *db, robj *key, time_t when) {
3588 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3589 return 0;
3590 } else {
3591 incrRefCount(key);
3592 return 1;
3593 }
3594 }
3595
3596 /* Return the expire time of the specified key, or -1 if no expire
3597 * is associated with this key (i.e. the key is non volatile) */
3598 static time_t getExpire(redisDb *db, robj *key) {
3599 dictEntry *de;
3600
3601 /* No expire? return ASAP */
3602 if (dictSize(db->expires) == 0 ||
3603 (de = dictFind(db->expires,key)) == NULL) return -1;
3604
3605 return (time_t) dictGetEntryVal(de);
3606 }
3607
3608 static int expireIfNeeded(redisDb *db, robj *key) {
3609 time_t when;
3610 dictEntry *de;
3611
3612 /* No expire? return ASAP */
3613 if (dictSize(db->expires) == 0 ||
3614 (de = dictFind(db->expires,key)) == NULL) return 0;
3615
3616 /* Lookup the expire */
3617 when = (time_t) dictGetEntryVal(de);
3618 if (time(NULL) <= when) return 0;
3619
3620 /* Delete the key */
3621 dictDelete(db->expires,key);
3622 return dictDelete(db->dict,key) == DICT_OK;
3623 }
3624
3625 static int deleteIfVolatile(redisDb *db, robj *key) {
3626 dictEntry *de;
3627
3628 /* No expire? return ASAP */
3629 if (dictSize(db->expires) == 0 ||
3630 (de = dictFind(db->expires,key)) == NULL) return 0;
3631
3632 /* Delete the key */
3633 server.dirty++;
3634 dictDelete(db->expires,key);
3635 return dictDelete(db->dict,key) == DICT_OK;
3636 }
3637
3638 static void expireCommand(redisClient *c) {
3639 dictEntry *de;
3640 int seconds = atoi(c->argv[2]->ptr);
3641
3642 de = dictFind(c->db->dict,c->argv[1]);
3643 if (de == NULL) {
3644 addReply(c,shared.czero);
3645 return;
3646 }
3647 if (seconds <= 0) {
3648 addReply(c, shared.czero);
3649 return;
3650 } else {
3651 time_t when = time(NULL)+seconds;
3652 if (setExpire(c->db,c->argv[1],when))
3653 addReply(c,shared.cone);
3654 else
3655 addReply(c,shared.czero);
3656 return;
3657 }
3658 }
3659
3660 static void ttlCommand(redisClient *c) {
3661 time_t expire;
3662 int ttl = -1;
3663
3664 expire = getExpire(c->db,c->argv[1]);
3665 if (expire != -1) {
3666 ttl = (int) (expire-time(NULL));
3667 if (ttl < 0) ttl = -1;
3668 }
3669 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3670 }
3671
3672 /* =============================== Replication ============================= */
3673
3674 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3675 ssize_t nwritten, ret = size;
3676 time_t start = time(NULL);
3677
3678 timeout++;
3679 while(size) {
3680 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3681 nwritten = write(fd,ptr,size);
3682 if (nwritten == -1) return -1;
3683 ptr += nwritten;
3684 size -= nwritten;
3685 }
3686 if ((time(NULL)-start) > timeout) {
3687 errno = ETIMEDOUT;
3688 return -1;
3689 }
3690 }
3691 return ret;
3692 }
3693
3694 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3695 ssize_t nread, totread = 0;
3696 time_t start = time(NULL);
3697
3698 timeout++;
3699 while(size) {
3700 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3701 nread = read(fd,ptr,size);
3702 if (nread == -1) return -1;
3703 ptr += nread;
3704 size -= nread;
3705 totread += nread;
3706 }
3707 if ((time(NULL)-start) > timeout) {
3708 errno = ETIMEDOUT;
3709 return -1;
3710 }
3711 }
3712 return totread;
3713 }
3714
/* Read a line from 'fd' into the buffer 'ptr' of 'size' bytes, one
 * character at a time through syncRead() (each call gets the full
 * 'timeout').
 *
 * The newline, and a carriage return right before it, are not stored; the
 * buffer is always null terminated. At most 'size'-1 characters are
 * stored: when the buffer fills up before a newline is seen the function
 * returns what it has read so far.
 *
 * Returns the number of characters read before the newline, or -1 if
 * syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* room for the null term */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One slot of the buffer is gone: without this decrement a line
         * longer than the buffer would be written past its end. */
        size--;
    }
    return nread;
}
3735
3736 static void syncCommand(redisClient *c) {
3737 /* ignore SYNC if aleady slave or in monitor mode */
3738 if (c->flags & REDIS_SLAVE) return;
3739
3740 /* SYNC can't be issued when the server has pending data to send to
3741 * the client about already issued commands. We need a fresh reply
3742 * buffer registering the differences between the BGSAVE and the current
3743 * dataset, so that we can copy to other slaves if needed. */
3744 if (listLength(c->reply) != 0) {
3745 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3746 return;
3747 }
3748
3749 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3750 /* Here we need to check if there is a background saving operation
3751 * in progress, or if it is required to start one */
3752 if (server.bgsaveinprogress) {
3753 /* Ok a background save is in progress. Let's check if it is a good
3754 * one for replication, i.e. if there is another slave that is
3755 * registering differences since the server forked to save */
3756 redisClient *slave;
3757 listNode *ln;
3758
3759 listRewind(server.slaves);
3760 while((ln = listYield(server.slaves))) {
3761 slave = ln->value;
3762 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3763 }
3764 if (ln) {
3765 /* Perfect, the server is already registering differences for
3766 * another slave. Set the right state, and copy the buffer. */
3767 listRelease(c->reply);
3768 c->reply = listDup(slave->reply);
3769 if (!c->reply) oom("listDup copying slave reply list");
3770 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3771 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3772 } else {
3773 /* No way, we need to wait for the next BGSAVE in order to
3774 * register differences */
3775 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3776 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3777 }
3778 } else {
3779 /* Ok we don't have a BGSAVE in progress, let's start one */
3780 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3781 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3782 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3783 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3784 return;
3785 }
3786 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3787 }
3788 c->repldbfd = -1;
3789 c->flags |= REDIS_SLAVE;
3790 c->slaveseldb = 0;
3791 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3792 return;
3793 }
3794
3795 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3796 redisClient *slave = privdata;
3797 REDIS_NOTUSED(el);
3798 REDIS_NOTUSED(mask);
3799 char buf[REDIS_IOBUF_LEN];
3800 ssize_t nwritten, buflen;
3801
3802 if (slave->repldboff == 0) {
3803 /* Write the bulk write count before to transfer the DB. In theory here
3804 * we don't know how much room there is in the output buffer of the
3805 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3806 * operations) will never be smaller than the few bytes we need. */
3807 sds bulkcount;
3808
3809 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3810 slave->repldbsize);
3811 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3812 {
3813 sdsfree(bulkcount);
3814 freeClient(slave);
3815 return;
3816 }
3817 sdsfree(bulkcount);
3818 }
3819 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3820 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3821 if (buflen <= 0) {
3822 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3823 (buflen == 0) ? "premature EOF" : strerror(errno));
3824 freeClient(slave);
3825 return;
3826 }
3827 if ((nwritten = write(fd,buf,buflen)) == -1) {
3828 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3829 strerror(errno));
3830 freeClient(slave);
3831 return;
3832 }
3833 slave->repldboff += nwritten;
3834 if (slave->repldboff == slave->repldbsize) {
3835 close(slave->repldbfd);
3836 slave->repldbfd = -1;
3837 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3838 slave->replstate = REDIS_REPL_ONLINE;
3839 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3840 sendReplyToClient, slave, NULL) == AE_ERR) {
3841 freeClient(slave);
3842 return;
3843 }
3844 addReplySds(slave,sdsempty());
3845 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3846 }
3847 }
3848
3849 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3850 listNode *ln;
3851 int startbgsave = 0;
3852
3853 listRewind(server.slaves);
3854 while((ln = listYield(server.slaves))) {
3855 redisClient *slave = ln->value;
3856
3857 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3858 startbgsave = 1;
3859 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3860 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3861 struct stat buf;
3862
3863 if (bgsaveerr != REDIS_OK) {
3864 freeClient(slave);
3865 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3866 continue;
3867 }
3868 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3869 fstat(slave->repldbfd,&buf) == -1) {
3870 freeClient(slave);
3871 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3872 continue;
3873 }
3874 slave->repldboff = 0;
3875 slave->repldbsize = buf.st_size;
3876 slave->replstate = REDIS_REPL_SEND_BULK;
3877 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3878 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3879 freeClient(slave);
3880 continue;
3881 }
3882 }
3883 }
3884 if (startbgsave) {
3885 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3886 listRewind(server.slaves);
3887 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3888 while((ln = listYield(server.slaves))) {
3889 redisClient *slave = ln->value;
3890
3891 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3892 freeClient(slave);
3893 }
3894 }
3895 }
3896 }
3897
3898 static int syncWithMaster(void) {
3899 char buf[1024], tmpfile[256];
3900 int dumpsize;
3901 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3902 int dfd;
3903
3904 if (fd == -1) {
3905 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3906 strerror(errno));
3907 return REDIS_ERR;
3908 }
3909 /* Issue the SYNC command */
3910 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3911 close(fd);
3912 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3913 strerror(errno));
3914 return REDIS_ERR;
3915 }
3916 /* Read the bulk write count */
3917 if (syncReadLine(fd,buf,1024,3600) == -1) {
3918 close(fd);
3919 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3920 strerror(errno));
3921 return REDIS_ERR;
3922 }
3923 dumpsize = atoi(buf+1);
3924 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3925 /* Read the bulk write data on a temp file */
3926 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3927 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3928 if (dfd == -1) {
3929 close(fd);
3930 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3931 return REDIS_ERR;
3932 }
3933 while(dumpsize) {
3934 int nread, nwritten;
3935
3936 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3937 if (nread == -1) {
3938 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3939 strerror(errno));
3940 close(fd);
3941 close(dfd);
3942 return REDIS_ERR;
3943 }
3944 nwritten = write(dfd,buf,nread);
3945 if (nwritten == -1) {
3946 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3947 close(fd);
3948 close(dfd);
3949 return REDIS_ERR;
3950 }
3951 dumpsize -= nread;
3952 }
3953 close(dfd);
3954 if (rename(tmpfile,server.dbfilename) == -1) {
3955 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3956 unlink(tmpfile);
3957 close(fd);
3958 return REDIS_ERR;
3959 }
3960 emptyDb();
3961 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3962 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3963 close(fd);
3964 return REDIS_ERR;
3965 }
3966 server.master = createClient(fd);
3967 server.master->flags |= REDIS_MASTER;
3968 server.replstate = REDIS_REPL_CONNECTED;
3969 return REDIS_OK;
3970 }
3971
3972 static void slaveofCommand(redisClient *c) {
3973 if (!strcasecmp(c->argv[1]->ptr,"no") &&
3974 !strcasecmp(c->argv[2]->ptr,"one")) {
3975 if (server.masterhost) {
3976 sdsfree(server.masterhost);
3977 server.masterhost = NULL;
3978 if (server.master) freeClient(server.master);
3979 server.replstate = REDIS_REPL_NONE;
3980 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
3981 }
3982 } else {
3983 sdsfree(server.masterhost);
3984 server.masterhost = sdsdup(c->argv[1]->ptr);
3985 server.masterport = atoi(c->argv[2]->ptr);
3986 if (server.master) freeClient(server.master);
3987 server.replstate = REDIS_REPL_CONNECT;
3988 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
3989 server.masterhost, server.masterport);
3990 }
3991 addReply(c,shared.ok);
3992 }
3993
3994 /* ============================ Maxmemory directive ======================== */
3995
3996 /* This function gets called when 'maxmemory' is set on the config file to limit
3997 * the max memory used by the server, and we are out of memory.
3998 * This function will try to, in order:
3999 *
4000 * - Free objects from the free list
4001 * - Try to remove keys with an EXPIRE set
4002 *
4003 * It is not possible to free enough memory to reach used-memory < maxmemory
4004 * the server will start refusing commands that will enlarge even more the
4005 * memory usage.
4006 */
4007 static void freeMemoryIfNeeded(void) {
4008 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4009 if (listLength(server.objfreelist)) {
4010 robj *o;
4011
4012 listNode *head = listFirst(server.objfreelist);
4013 o = listNodeValue(head);
4014 listDelNode(server.objfreelist,head);
4015 zfree(o);
4016 } else {
4017 int j, k, freed = 0;
4018
4019 for (j = 0; j < server.dbnum; j++) {
4020 int minttl = -1;
4021 robj *minkey = NULL;
4022 struct dictEntry *de;
4023
4024 if (dictSize(server.db[j].expires)) {
4025 freed = 1;
4026 /* From a sample of three keys drop the one nearest to
4027 * the natural expire */
4028 for (k = 0; k < 3; k++) {
4029 time_t t;
4030
4031 de = dictGetRandomKey(server.db[j].expires);
4032 t = (time_t) dictGetEntryVal(de);
4033 if (minttl == -1 || t < minttl) {
4034 minkey = dictGetEntryKey(de);
4035 minttl = t;
4036 }
4037 }
4038 deleteKey(server.db+j,minkey);
4039 }
4040 }
4041 if (!freed) return; /* nothing to free... */
4042 }
4043 }
4044 }
4045
4046 /* =================================== Main! ================================ */
4047
4048 #ifdef __linux__
/* Return the integer found in the first line of
 * /proc/sys/vm/overcommit_memory, or -1 if the file can't be opened or
 * read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    got = fgets(line,64,fp);
    fclose(fp);
    if (got == NULL) return -1;

    return atoi(line);
}
4062
4063 void linuxOvercommitMemoryWarning(void) {
4064 if (linuxOvercommitMemoryValue() == 0) {
4065 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4066 }
4067 }
4068 #endif /* __linux__ */
4069
4070 static void daemonize(void) {
4071 int fd;
4072 FILE *fp;
4073
4074 if (fork() != 0) exit(0); /* parent exits */
4075 setsid(); /* create a new session */
4076
4077 /* Every output goes to /dev/null. If Redis is daemonized but
4078 * the 'logfile' is set to 'stdout' in the configuration file
4079 * it will not log at all. */
4080 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4081 dup2(fd, STDIN_FILENO);
4082 dup2(fd, STDOUT_FILENO);
4083 dup2(fd, STDERR_FILENO);
4084 if (fd > STDERR_FILENO) close(fd);
4085 }
4086 /* Try to write the pid file */
4087 fp = fopen(server.pidfile,"w");
4088 if (fp) {
4089 fprintf(fp,"%d\n",getpid());
4090 fclose(fp);
4091 }
4092 }
4093
4094 int main(int argc, char **argv) {
4095 #ifdef __linux__
4096 linuxOvercommitMemoryWarning();
4097 #endif
4098
4099 initServerConfig();
4100 if (argc == 2) {
4101 ResetServerSaveParams();
4102 loadServerConfig(argv[1]);
4103 } else if (argc > 2) {
4104 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4105 exit(1);
4106 } else {
4107 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4108 }
4109 initServer();
4110 if (server.daemonize) daemonize();
4111 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4112 if (rdbLoad(server.dbfilename) == REDIS_OK)
4113 redisLog(REDIS_NOTICE,"DB loaded from disk");
4114 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4115 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4116 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4117 aeMain(server.el);
4118 aeDeleteEventLoop(server.el);
4119 return 0;
4120 }