]> git.saurik.com Git - redis.git/blob - redis.c
DEBUG OBJECT implemented
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this Redis release. */
#define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #include <signal.h>
40 #include <sys/wait.h>
41 #include <errno.h>
42 #include <assert.h>
43 #include <ctype.h>
44 #include <stdarg.h>
45 #include <inttypes.h>
46 #include <arpa/inet.h>
47 #include <sys/stat.h>
48 #include <fcntl.h>
49 #include <sys/time.h>
50 #include <sys/resource.h>
51 #include <limits.h>
52 #include <execinfo.h>
53
54 #include "ae.h" /* Event driven programming library */
55 #include "sds.h" /* Dynamic safe strings */
56 #include "anet.h" /* Networking the easy way */
57 #include "dict.h" /* Hash tables */
58 #include "adlist.h" /* Linked lists */
59 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
60 #include "lzf.h" /* LZF compression library */
61 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
62
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout, in seconds */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16 /* default number of databases (server.dbnum) */
#define REDIS_CONFIGLINE_MAX 1024 /* max length of a config file line */
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
/* Max number of keys with an expire sampled per DB on each serverCron() run;
 * only the sampled keys that are already expired get deleted. */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
#define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */

/* Command flags (bit flags, combined in redisCommand.flags) */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types (robj.type) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_HASH 3

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
102
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside.
 *
 * The four defines below are the values of those two MSB. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
/* NOTE(review): presumably returned in place of a length when it cannot be
 * read from the file; confirm against the length-loading code. */
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
129
/* Client flags (bit flags, combined in redisClient.flags) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side (server.replstate) */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * (redisClient.replstate). Note that in SEND_BULK and ONLINE state the slave
 * receives new updates in its output queue. In the WAIT_BGSAVE state instead
 * the server is waiting to start the next background saving in order to send
 * updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() prints a message only when its level is
 * >= server.verbosity, and uses the level to index its ".-*" marker string. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2
167
/* Anti-warning macro: evaluates V and discards the result, silencing
 * "unused variable/parameter" warnings. V is parenthesized so that any
 * expression, not only a plain identifier, can be passed safely. */
#define REDIS_NOTUSED(V) ((void) (V))
170
171 /*================================= Data types ============================== */
172
/* A redis object, that is a type able to hold a string / list / set.
 * 'type' is one of the REDIS_STRING / REDIS_LIST / REDIS_SET / REDIS_HASH
 * object types. For strings 'ptr' is used as an sds (see dictSdsHash()). */
typedef struct redisObject {
    void *ptr;
    int type;
    int refcount; /* reference count; presumably managed by incrRefCount()/decrRefCount() */
} robj;
179
/* One logical database (server.db is an array of server.dbnum of these). */
typedef struct redisDb {
    dict *dict;    /* keyspace: robj key -> robj value (hashDictType) */
    dict *expires; /* robj key -> expire time stored as a time_t in the value pointer (setDictType) */
    int id;        /* index of this database inside server.db */
} redisDb;
185
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv;
    int argc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
207
/* A save point: serverCron() starts a background save once more than
 * 'seconds' have passed since the last save and at least 'changes'
 * changes (server.dirty) have accumulated. */
struct saveparam {
    time_t seconds;
    int changes;
};
212
/* Global server state structure */
struct redisServer {
    int port; /* TCP port to listen on */
    int fd; /* listening socket, from anetTcpServer() */
    redisDb *db; /* array of server.dbnum databases */
    dict *sharingpool; /* pool used by object sharing (see tryObjectSharing()) */
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function ran */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of last successful save */
    size_t usedmemory; /* Used memory in bytes, as reported by zmalloc_used_memory() */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* minimum level logged by redisLog() */
    int glueoutputbuf;
    int maxidletime; /* client idle timeout in seconds, 0 = never */
    int dbnum;
    int daemonize;
    char *pidfile;
    int bgsaveinprogress;
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile; /* NULL = log on standard output */
    char *bindaddr;
    char *dbfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned int maxmemory; /* bytes; 0 disables the REDIS_CMD_DENYOOM checks */
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
262
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (cmdTable). */
struct redisCommand {
    char *name; /* command name */
    redisCommandProc *proc; /* implementation */
    int arity; /* number of arguments, command name included (PING is 1).
                  NOTE(review): negative values appear to mean "at least
                  -arity" (e.g. DEL, MGET) - confirm in the dispatcher. */
    int flags; /* REDIS_CMD_* flags */
};
270
/* An element handled by SORT together with the key it is sorted by.
 * NOTE(review): 'score' is presumably used for numeric sorting and 'cmpobj'
 * for ALPHA sorting (see server.sort_alpha / sort_bypattern) - confirm. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;

/* An extra operation requested by SORT: 'type' is presumably one of
 * REDIS_SORT_GET/DEL/INCR/DECR, applied to keys built from 'pattern'. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
283
/* Pre-built protocol fragments and replies, created once by
 * createSharedObjects(); 'shared' is the single global instance.
 * selectN holds the "select N\r\n" command for databases 0-9. */
struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
292
293 /*================================ Prototypes =============================== */
294
295 static void freeStringObject(robj *o);
296 static void freeListObject(robj *o);
297 static void freeSetObject(robj *o);
298 static void decrRefCount(void *o);
299 static robj *createObject(int type, void *ptr);
300 static void freeClient(redisClient *c);
301 static int rdbLoad(char *filename);
302 static void addReply(redisClient *c, robj *obj);
303 static void addReplySds(redisClient *c, sds s);
304 static void incrRefCount(robj *o);
305 static int rdbSaveBackground(char *filename);
306 static robj *createStringObject(char *ptr, size_t len);
307 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
308 static int syncWithMaster(void);
309 static robj *tryObjectSharing(robj *o);
310 static int removeExpire(redisDb *db, robj *key);
311 static int expireIfNeeded(redisDb *db, robj *key);
312 static int deleteIfVolatile(redisDb *db, robj *key);
313 static int deleteKey(redisDb *db, robj *key);
314 static time_t getExpire(redisDb *db, robj *key);
315 static int setExpire(redisDb *db, robj *key, time_t when);
316 static void updateSalvesWaitingBgsave(int bgsaveerr);
317 static void freeMemoryIfNeeded(void);
318
319 static void authCommand(redisClient *c);
320 static void pingCommand(redisClient *c);
321 static void echoCommand(redisClient *c);
322 static void setCommand(redisClient *c);
323 static void setnxCommand(redisClient *c);
324 static void getCommand(redisClient *c);
325 static void delCommand(redisClient *c);
326 static void existsCommand(redisClient *c);
327 static void incrCommand(redisClient *c);
328 static void decrCommand(redisClient *c);
329 static void incrbyCommand(redisClient *c);
330 static void decrbyCommand(redisClient *c);
331 static void selectCommand(redisClient *c);
332 static void randomkeyCommand(redisClient *c);
333 static void keysCommand(redisClient *c);
334 static void dbsizeCommand(redisClient *c);
335 static void lastsaveCommand(redisClient *c);
336 static void saveCommand(redisClient *c);
337 static void bgsaveCommand(redisClient *c);
338 static void shutdownCommand(redisClient *c);
339 static void moveCommand(redisClient *c);
340 static void renameCommand(redisClient *c);
341 static void renamenxCommand(redisClient *c);
342 static void lpushCommand(redisClient *c);
343 static void rpushCommand(redisClient *c);
344 static void lpopCommand(redisClient *c);
345 static void rpopCommand(redisClient *c);
346 static void llenCommand(redisClient *c);
347 static void lindexCommand(redisClient *c);
348 static void lrangeCommand(redisClient *c);
349 static void ltrimCommand(redisClient *c);
350 static void typeCommand(redisClient *c);
351 static void lsetCommand(redisClient *c);
352 static void saddCommand(redisClient *c);
353 static void sremCommand(redisClient *c);
354 static void smoveCommand(redisClient *c);
355 static void sismemberCommand(redisClient *c);
356 static void scardCommand(redisClient *c);
357 static void sinterCommand(redisClient *c);
358 static void sinterstoreCommand(redisClient *c);
359 static void sunionCommand(redisClient *c);
360 static void sunionstoreCommand(redisClient *c);
361 static void sdiffCommand(redisClient *c);
362 static void sdiffstoreCommand(redisClient *c);
363 static void syncCommand(redisClient *c);
364 static void flushdbCommand(redisClient *c);
365 static void flushallCommand(redisClient *c);
366 static void sortCommand(redisClient *c);
367 static void lremCommand(redisClient *c);
368 static void infoCommand(redisClient *c);
369 static void mgetCommand(redisClient *c);
370 static void monitorCommand(redisClient *c);
371 static void expireCommand(redisClient *c);
372 static void getSetCommand(redisClient *c);
373 static void ttlCommand(redisClient *c);
374 static void slaveofCommand(redisClient *c);
375 static void debugCommand(redisClient *c);
376
377 /*================================= Globals ================================= */
378
379 /* Global vars */
380 static struct redisServer server; /* server global state */
381 static struct redisCommand cmdTable[] = {
382 {"get",getCommand,2,REDIS_CMD_INLINE},
383 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
384 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
385 {"del",delCommand,-2,REDIS_CMD_INLINE},
386 {"exists",existsCommand,2,REDIS_CMD_INLINE},
387 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
388 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
389 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
390 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
391 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
392 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
393 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
394 {"llen",llenCommand,2,REDIS_CMD_INLINE},
395 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
396 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
397 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
398 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
399 {"lrem",lremCommand,4,REDIS_CMD_BULK},
400 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
401 {"srem",sremCommand,3,REDIS_CMD_BULK},
402 {"smove",smoveCommand,4,REDIS_CMD_BULK},
403 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
404 {"scard",scardCommand,2,REDIS_CMD_INLINE},
405 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
406 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
407 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
408 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
409 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
410 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
411 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
412 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
413 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
414 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
415 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
416 {"select",selectCommand,2,REDIS_CMD_INLINE},
417 {"move",moveCommand,3,REDIS_CMD_INLINE},
418 {"rename",renameCommand,3,REDIS_CMD_INLINE},
419 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
420 {"expire",expireCommand,3,REDIS_CMD_INLINE},
421 {"keys",keysCommand,2,REDIS_CMD_INLINE},
422 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
423 {"auth",authCommand,2,REDIS_CMD_INLINE},
424 {"ping",pingCommand,1,REDIS_CMD_INLINE},
425 {"echo",echoCommand,2,REDIS_CMD_BULK},
426 {"save",saveCommand,1,REDIS_CMD_INLINE},
427 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
428 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
429 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
430 {"type",typeCommand,2,REDIS_CMD_INLINE},
431 {"sync",syncCommand,1,REDIS_CMD_INLINE},
432 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
433 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
434 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
435 {"info",infoCommand,1,REDIS_CMD_INLINE},
436 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
437 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
438 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
439 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
440 {NULL,NULL,0,0}
441 };
442
443 /*============================ Utility functions ============================ */
444
/* Glob-style pattern matching.
 *
 * Returns 1 if the stringLen bytes at 'string' match the patternLen bytes
 * at 'pattern', 0 otherwise. Supported syntax:
 *   *       any sequence of characters, including the empty one
 *   ?       exactly one character
 *   [abc]   one character of the set; [^abc] negates it, [a-z] is a range
 *   \x      the character x taken literally
 * When 'nocase' is non-zero letters are compared case-insensitively.
 *
 * Every access is bounded by the explicit lengths, so neither buffer needs
 * to be NUL-terminated and a prefix of a longer buffer can be matched. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    /* Inside the loop both buffers have at least one byte left, so
     * pattern[0] and string[0] are always valid. */
    while (patternLen > 0 && stringLen > 0) {
        switch(pattern[0]) {
        case '*':
            /* Collapse a run of stars: "**" behaves like "*". */
            while (patternLen >= 2 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing star matches the rest */
            /* Try the rest of the pattern against every suffix. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                                   string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match = 0;

            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so the common advance
                     * after the switch leaves patternLen at 0. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Compare as unsigned bytes so ranges behave the same
                     * whether plain char is signed or not. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else if (!nocase) {
                    if (pattern[0] == string[0])
                        match = 1;
                } else {
                    if (tolower((unsigned char)pattern[0]) ==
                        tolower((unsigned char)string[0]))
                        match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
    }
    /* The string is exhausted: it still matches if all that remains of the
     * pattern is a run of stars (bounded by patternLen, not by a NUL). */
    if (stringLen == 0) {
        while (patternLen > 0 && pattern[0] == '*') {
            pattern++;
            patternLen--;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
567
568 void redisLog(int level, const char *fmt, ...)
569 {
570 va_list ap;
571 FILE *fp;
572
573 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
574 if (!fp) return;
575
576 va_start(ap, fmt);
577 if (level >= server.verbosity) {
578 char *c = ".-*";
579 char buf[64];
580 time_t now;
581
582 now = time(NULL);
583 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
584 fprintf(fp,"%s %c ",buf,c[level]);
585 vfprintf(fp, fmt, ap);
586 fprintf(fp,"\n");
587 fflush(fp);
588 }
589 va_end(ap);
590
591 if (server.logfile) fclose(fp);
592 }
593
594 /*====================== Hash table type implementation ==================== */
595
/* These are hash table types whose keys are Redis objects holding SDS
 * dynamic strings (hashed and compared by their SDS payload), and whose
 * values, when present, are Redis objects (which can hold SDS strings,
 * lists, sets). */
599
600 static int sdsDictKeyCompare(void *privdata, const void *key1,
601 const void *key2)
602 {
603 int l1,l2;
604 DICT_NOTUSED(privdata);
605
606 l1 = sdslen((sds)key1);
607 l2 = sdslen((sds)key2);
608 if (l1 != l2) return 0;
609 return memcmp(key1, key2, l1) == 0;
610 }
611
612 static void dictRedisObjectDestructor(void *privdata, void *val)
613 {
614 DICT_NOTUSED(privdata);
615
616 decrRefCount(val);
617 }
618
619 static int dictSdsKeyCompare(void *privdata, const void *key1,
620 const void *key2)
621 {
622 const robj *o1 = key1, *o2 = key2;
623 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
624 }
625
626 static unsigned int dictSdsHash(const void *key) {
627 const robj *o = key;
628 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
629 }
630
/* Dict type with robj string keys and no owned values: keys are hashed and
 * compared by their sds payload and released with decrRefCount() on removal.
 * Used for server.sharingpool and for each DB's expires table (whose values
 * are time_t expire times stored in the value pointer, so nothing to free). */
static dictType setDictType = {
    dictSdsHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictSdsKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};
639
/* Dict type for each DB's main keyspace (server.db[j].dict): keys are robj
 * strings hashed/compared by their sds payload, values are robj; both are
 * released with decrRefCount() on removal. */
static dictType hashDictType = {
    dictSdsHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictSdsKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor /* val destructor */
};
648
649 /* ========================= Random utility functions ======================= */
650
/* Fatal out-of-memory handler.
 *
 * Redis does not try to recover from allocation failures for objects or
 * strings: even reporting the error to a client would need heap memory for
 * the send buffers. So we print "<msg>: Out of memory" on stderr, pause for
 * a second (presumably so the message gets seen) and abort(). Never returns. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n", msg);
    fflush(out);
    sleep(1);
    abort();
}
662
663 /* ====================== Redis server networking stuff ===================== */
664 void closeTimedoutClients(void) {
665 redisClient *c;
666 listNode *ln;
667 time_t now = time(NULL);
668
669 listRewind(server.clients);
670 while ((ln = listYield(server.clients)) != NULL) {
671 c = listNodeValue(ln);
672 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
673 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
674 (now - c->lastinteraction > server.maxidletime)) {
675 redisLog(REDIS_DEBUG,"Closing idle client");
676 freeClient(c);
677 }
678 }
679 }
680
681 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
682 * we resize the hash table to save memory */
683 void tryResizeHashTables(void) {
684 int j;
685
686 for (j = 0; j < server.dbnum; j++) {
687 long long size, used;
688
689 size = dictSlots(server.db[j].dict);
690 used = dictSize(server.db[j].dict);
691 if (size && used && size > REDIS_HT_MINSLOTS &&
692 (used*100/size < REDIS_HT_MINFILL)) {
693 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
694 dictResize(server.db[j].dict);
695 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
696 }
697 }
698 }
699
700 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
701 int j, loops = server.cronloops++;
702 REDIS_NOTUSED(eventLoop);
703 REDIS_NOTUSED(id);
704 REDIS_NOTUSED(clientData);
705
706 /* Update the global state with the amount of used memory */
707 server.usedmemory = zmalloc_used_memory();
708
709 /* Show some info about non-empty databases */
710 for (j = 0; j < server.dbnum; j++) {
711 long long size, used, vkeys;
712
713 size = dictSlots(server.db[j].dict);
714 used = dictSize(server.db[j].dict);
715 vkeys = dictSize(server.db[j].expires);
716 if (!(loops % 5) && used > 0) {
717 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
718 /* dictPrintStats(server.dict); */
719 }
720 }
721
722 /* We don't want to resize the hash tables while a bacground saving
723 * is in progress: the saving child is created using fork() that is
724 * implemented with a copy-on-write semantic in most modern systems, so
725 * if we resize the HT while there is the saving child at work actually
726 * a lot of memory movements in the parent will cause a lot of pages
727 * copied. */
728 if (!server.bgsaveinprogress) tryResizeHashTables();
729
730 /* Show information about connected clients */
731 if (!(loops % 5)) {
732 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
733 listLength(server.clients)-listLength(server.slaves),
734 listLength(server.slaves),
735 server.usedmemory,
736 dictSize(server.sharingpool));
737 }
738
739 /* Close connections of timedout clients */
740 if (server.maxidletime && !(loops % 10))
741 closeTimedoutClients();
742
743 /* Check if a background saving in progress terminated */
744 if (server.bgsaveinprogress) {
745 int statloc;
746 /* XXX: TODO handle the case of the saving child killed */
747 if (wait4(-1,&statloc,WNOHANG,NULL)) {
748 int exitcode = WEXITSTATUS(statloc);
749 if (exitcode == 0) {
750 redisLog(REDIS_NOTICE,
751 "Background saving terminated with success");
752 server.dirty = 0;
753 server.lastsave = time(NULL);
754 } else {
755 redisLog(REDIS_WARNING,
756 "Background saving error");
757 }
758 server.bgsaveinprogress = 0;
759 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
760 }
761 } else {
762 /* If there is not a background saving in progress check if
763 * we have to save now */
764 time_t now = time(NULL);
765 for (j = 0; j < server.saveparamslen; j++) {
766 struct saveparam *sp = server.saveparams+j;
767
768 if (server.dirty >= sp->changes &&
769 now-server.lastsave > sp->seconds) {
770 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
771 sp->changes, sp->seconds);
772 rdbSaveBackground(server.dbfilename);
773 break;
774 }
775 }
776 }
777
778 /* Try to expire a few timed out keys */
779 for (j = 0; j < server.dbnum; j++) {
780 redisDb *db = server.db+j;
781 int num = dictSize(db->expires);
782
783 if (num) {
784 time_t now = time(NULL);
785
786 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
787 num = REDIS_EXPIRELOOKUPS_PER_CRON;
788 while (num--) {
789 dictEntry *de;
790 time_t t;
791
792 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
793 t = (time_t) dictGetEntryVal(de);
794 if (now > t) {
795 deleteKey(db,dictGetEntryKey(de));
796 }
797 }
798 }
799 }
800
801 /* Check if we should connect to a MASTER */
802 if (server.replstate == REDIS_REPL_CONNECT) {
803 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
804 if (syncWithMaster() == REDIS_OK) {
805 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
806 }
807 }
808 return 1000;
809 }
810
811 static void createSharedObjects(void) {
812 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
813 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
814 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
815 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
816 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
817 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
818 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
819 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
820 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
821 /* no such key */
822 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
823 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
824 "-ERR Operation against a key holding the wrong kind of value\r\n"));
825 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
826 "-ERR no such key\r\n"));
827 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
828 "-ERR syntax error\r\n"));
829 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
830 "-ERR source and destination objects are the same\r\n"));
831 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
832 "-ERR index out of range\r\n"));
833 shared.space = createObject(REDIS_STRING,sdsnew(" "));
834 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
835 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
836 shared.select0 = createStringObject("select 0\r\n",10);
837 shared.select1 = createStringObject("select 1\r\n",10);
838 shared.select2 = createStringObject("select 2\r\n",10);
839 shared.select3 = createStringObject("select 3\r\n",10);
840 shared.select4 = createStringObject("select 4\r\n",10);
841 shared.select5 = createStringObject("select 5\r\n",10);
842 shared.select6 = createStringObject("select 6\r\n",10);
843 shared.select7 = createStringObject("select 7\r\n",10);
844 shared.select8 = createStringObject("select 8\r\n",10);
845 shared.select9 = createStringObject("select 9\r\n",10);
846 }
847
848 static void appendServerSaveParams(time_t seconds, int changes) {
849 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
850 if (server.saveparams == NULL) oom("appendServerSaveParams");
851 server.saveparams[server.saveparamslen].seconds = seconds;
852 server.saveparams[server.saveparamslen].changes = changes;
853 server.saveparamslen++;
854 }
855
856 static void ResetServerSaveParams() {
857 zfree(server.saveparams);
858 server.saveparams = NULL;
859 server.saveparamslen = 0;
860 }
861
862 static void initServerConfig() {
863 server.dbnum = REDIS_DEFAULT_DBNUM;
864 server.port = REDIS_SERVERPORT;
865 server.verbosity = REDIS_DEBUG;
866 server.maxidletime = REDIS_MAXIDLETIME;
867 server.saveparams = NULL;
868 server.logfile = NULL; /* NULL = log on standard output */
869 server.bindaddr = NULL;
870 server.glueoutputbuf = 1;
871 server.daemonize = 0;
872 server.pidfile = "/var/run/redis.pid";
873 server.dbfilename = "dump.rdb";
874 server.requirepass = NULL;
875 server.shareobjects = 0;
876 server.maxclients = 0;
877 server.maxmemory = 0;
878 ResetServerSaveParams();
879
880 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
881 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
882 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
883 /* Replication related */
884 server.isslave = 0;
885 server.masterhost = NULL;
886 server.masterport = 6379;
887 server.master = NULL;
888 server.replstate = REDIS_REPL_NONE;
889 }
890
891 static void initServer() {
892 int j;
893
894 signal(SIGHUP, SIG_IGN);
895 signal(SIGPIPE, SIG_IGN);
896
897 server.clients = listCreate();
898 server.slaves = listCreate();
899 server.monitors = listCreate();
900 server.objfreelist = listCreate();
901 createSharedObjects();
902 server.el = aeCreateEventLoop();
903 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
904 server.sharingpool = dictCreate(&setDictType,NULL);
905 server.sharingpoolsize = 1024;
906 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
907 oom("server initialization"); /* Fatal OOM */
908 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
909 if (server.fd == -1) {
910 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
911 exit(1);
912 }
913 for (j = 0; j < server.dbnum; j++) {
914 server.db[j].dict = dictCreate(&hashDictType,NULL);
915 server.db[j].expires = dictCreate(&setDictType,NULL);
916 server.db[j].id = j;
917 }
918 server.cronloops = 0;
919 server.bgsaveinprogress = 0;
920 server.lastsave = time(NULL);
921 server.dirty = 0;
922 server.usedmemory = 0;
923 server.stat_numcommands = 0;
924 server.stat_numconnections = 0;
925 server.stat_starttime = time(NULL);
926 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
927 }
928
929 /* Empty the whole database */
930 static long long emptyDb() {
931 int j;
932 long long removed = 0;
933
934 for (j = 0; j < server.dbnum; j++) {
935 removed += dictSize(server.db[j].dict);
936 dictEmpty(server.db[j].dict);
937 dictEmpty(server.db[j].expires);
938 }
939 return removed;
940 }
941
/* Parse a boolean config argument, case-insensitively:
 * "yes" -> 1, "no" -> 0, anything else -> -1. */
static int yesnotoi(char *s) {
    int result = -1;

    if (strcasecmp(s,"yes") == 0)
        result = 1;
    else if (strcasecmp(s,"no") == 0)
        result = 0;
    return result;
}
947
948 /* I agree, this is a very rudimental way to load a configuration...
949 will improve later if the config gets more complex */
950 static void loadServerConfig(char *filename) {
951 FILE *fp = fopen(filename,"r");
952 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
953 int linenum = 0;
954 sds line = NULL;
955
956 if (!fp) {
957 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
958 exit(1);
959 }
960 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
961 sds *argv;
962 int argc, j;
963
964 linenum++;
965 line = sdsnew(buf);
966 line = sdstrim(line," \t\r\n");
967
968 /* Skip comments and blank lines*/
969 if (line[0] == '#' || line[0] == '\0') {
970 sdsfree(line);
971 continue;
972 }
973
974 /* Split into arguments */
975 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
976 sdstolower(argv[0]);
977
978 /* Execute config directives */
979 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
980 server.maxidletime = atoi(argv[1]);
981 if (server.maxidletime < 0) {
982 err = "Invalid timeout value"; goto loaderr;
983 }
984 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
985 server.port = atoi(argv[1]);
986 if (server.port < 1 || server.port > 65535) {
987 err = "Invalid port"; goto loaderr;
988 }
989 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
990 server.bindaddr = zstrdup(argv[1]);
991 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
992 int seconds = atoi(argv[1]);
993 int changes = atoi(argv[2]);
994 if (seconds < 1 || changes < 0) {
995 err = "Invalid save parameters"; goto loaderr;
996 }
997 appendServerSaveParams(seconds,changes);
998 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
999 if (chdir(argv[1]) == -1) {
1000 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1001 argv[1], strerror(errno));
1002 exit(1);
1003 }
1004 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1005 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1006 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1007 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1008 else {
1009 err = "Invalid log level. Must be one of debug, notice, warning";
1010 goto loaderr;
1011 }
1012 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1013 FILE *fp;
1014
1015 server.logfile = zstrdup(argv[1]);
1016 if (!strcasecmp(server.logfile,"stdout")) {
1017 zfree(server.logfile);
1018 server.logfile = NULL;
1019 }
1020 if (server.logfile) {
1021 /* Test if we are able to open the file. The server will not
1022 * be able to abort just for this problem later... */
1023 fp = fopen(server.logfile,"a");
1024 if (fp == NULL) {
1025 err = sdscatprintf(sdsempty(),
1026 "Can't open the log file: %s", strerror(errno));
1027 goto loaderr;
1028 }
1029 fclose(fp);
1030 }
1031 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1032 server.dbnum = atoi(argv[1]);
1033 if (server.dbnum < 1) {
1034 err = "Invalid number of databases"; goto loaderr;
1035 }
1036 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1037 server.maxclients = atoi(argv[1]);
1038 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1039 server.maxmemory = atoi(argv[1]);
1040 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1041 server.masterhost = sdsnew(argv[1]);
1042 server.masterport = atoi(argv[2]);
1043 server.replstate = REDIS_REPL_CONNECT;
1044 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1045 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1046 err = "argument must be 'yes' or 'no'"; goto loaderr;
1047 }
1048 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1049 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1050 err = "argument must be 'yes' or 'no'"; goto loaderr;
1051 }
1052 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1053 server.sharingpoolsize = atoi(argv[1]);
1054 if (server.sharingpoolsize < 1) {
1055 err = "invalid object sharing pool size"; goto loaderr;
1056 }
1057 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1058 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1059 err = "argument must be 'yes' or 'no'"; goto loaderr;
1060 }
1061 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1062 server.requirepass = zstrdup(argv[1]);
1063 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1064 server.pidfile = zstrdup(argv[1]);
1065 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1066 server.dbfilename = zstrdup(argv[1]);
1067 } else {
1068 err = "Bad directive or wrong number of arguments"; goto loaderr;
1069 }
1070 for (j = 0; j < argc; j++)
1071 sdsfree(argv[j]);
1072 zfree(argv);
1073 sdsfree(line);
1074 }
1075 fclose(fp);
1076 return;
1077
1078 loaderr:
1079 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1080 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1081 fprintf(stderr, ">>> '%s'\n", line);
1082 fprintf(stderr, "%s\n", err);
1083 exit(1);
1084 }
1085
1086 static void freeClientArgv(redisClient *c) {
1087 int j;
1088
1089 for (j = 0; j < c->argc; j++)
1090 decrRefCount(c->argv[j]);
1091 c->argc = 0;
1092 }
1093
1094 static void freeClient(redisClient *c) {
1095 listNode *ln;
1096
1097 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1098 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1099 sdsfree(c->querybuf);
1100 listRelease(c->reply);
1101 freeClientArgv(c);
1102 close(c->fd);
1103 ln = listSearchKey(server.clients,c);
1104 assert(ln != NULL);
1105 listDelNode(server.clients,ln);
1106 if (c->flags & REDIS_SLAVE) {
1107 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1108 close(c->repldbfd);
1109 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1110 ln = listSearchKey(l,c);
1111 assert(ln != NULL);
1112 listDelNode(l,ln);
1113 }
1114 if (c->flags & REDIS_MASTER) {
1115 server.master = NULL;
1116 server.replstate = REDIS_REPL_CONNECT;
1117 }
1118 zfree(c->argv);
1119 zfree(c);
1120 }
1121
1122 static void glueReplyBuffersIfNeeded(redisClient *c) {
1123 int totlen = 0;
1124 listNode *ln;
1125 robj *o;
1126
1127 listRewind(c->reply);
1128 while((ln = listYield(c->reply))) {
1129 o = ln->value;
1130 totlen += sdslen(o->ptr);
1131 /* This optimization makes more sense if we don't have to copy
1132 * too much data */
1133 if (totlen > 1024) return;
1134 }
1135 if (totlen > 0) {
1136 char buf[1024];
1137 int copylen = 0;
1138
1139 listRewind(c->reply);
1140 while((ln = listYield(c->reply))) {
1141 o = ln->value;
1142 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1143 copylen += sdslen(o->ptr);
1144 listDelNode(c->reply,ln);
1145 }
1146 /* Now the output buffer is empty, add the new single element */
1147 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1148 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1149 }
1150 }
1151
1152 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1153 redisClient *c = privdata;
1154 int nwritten = 0, totwritten = 0, objlen;
1155 robj *o;
1156 REDIS_NOTUSED(el);
1157 REDIS_NOTUSED(mask);
1158
1159 if (server.glueoutputbuf && listLength(c->reply) > 1)
1160 glueReplyBuffersIfNeeded(c);
1161 while(listLength(c->reply)) {
1162 o = listNodeValue(listFirst(c->reply));
1163 objlen = sdslen(o->ptr);
1164
1165 if (objlen == 0) {
1166 listDelNode(c->reply,listFirst(c->reply));
1167 continue;
1168 }
1169
1170 if (c->flags & REDIS_MASTER) {
1171 nwritten = objlen - c->sentlen;
1172 } else {
1173 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1174 if (nwritten <= 0) break;
1175 }
1176 c->sentlen += nwritten;
1177 totwritten += nwritten;
1178 /* If we fully sent the object on head go to the next one */
1179 if (c->sentlen == objlen) {
1180 listDelNode(c->reply,listFirst(c->reply));
1181 c->sentlen = 0;
1182 }
1183 }
1184 if (nwritten == -1) {
1185 if (errno == EAGAIN) {
1186 nwritten = 0;
1187 } else {
1188 redisLog(REDIS_DEBUG,
1189 "Error writing to client: %s", strerror(errno));
1190 freeClient(c);
1191 return;
1192 }
1193 }
1194 if (totwritten > 0) c->lastinteraction = time(NULL);
1195 if (listLength(c->reply) == 0) {
1196 c->sentlen = 0;
1197 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1198 }
1199 }
1200
1201 static struct redisCommand *lookupCommand(char *name) {
1202 int j = 0;
1203 while(cmdTable[j].name != NULL) {
1204 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1205 j++;
1206 }
1207 return NULL;
1208 }
1209
1210 /* resetClient prepare the client to process the next command */
1211 static void resetClient(redisClient *c) {
1212 freeClientArgv(c);
1213 c->bulklen = -1;
1214 }
1215
1216 /* If this function gets called we already read a whole
1217 * command, argments are in the client argv/argc fields.
1218 * processCommand() execute the command or prepare the
1219 * server for a bulk read from the client.
1220 *
1221 * If 1 is returned the client is still alive and valid and
1222 * and other operations can be performed by the caller. Otherwise
1223 * if 0 is returned the client was destroied (i.e. after QUIT). */
1224 static int processCommand(redisClient *c) {
1225 struct redisCommand *cmd;
1226 long long dirty;
1227
1228 /* Free some memory if needed (maxmemory setting) */
1229 if (server.maxmemory) freeMemoryIfNeeded();
1230
1231 /* The QUIT command is handled as a special case. Normal command
1232 * procs are unable to close the client connection safely */
1233 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1234 freeClient(c);
1235 return 0;
1236 }
1237 cmd = lookupCommand(c->argv[0]->ptr);
1238 if (!cmd) {
1239 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1240 resetClient(c);
1241 return 1;
1242 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1243 (c->argc < -cmd->arity)) {
1244 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1245 resetClient(c);
1246 return 1;
1247 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1248 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1249 resetClient(c);
1250 return 1;
1251 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1252 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1253
1254 decrRefCount(c->argv[c->argc-1]);
1255 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1256 c->argc--;
1257 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1258 resetClient(c);
1259 return 1;
1260 }
1261 c->argc--;
1262 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1263 /* It is possible that the bulk read is already in the
1264 * buffer. Check this condition and handle it accordingly */
1265 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1266 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1267 c->argc++;
1268 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1269 } else {
1270 return 1;
1271 }
1272 }
1273 /* Let's try to share objects on the command arguments vector */
1274 if (server.shareobjects) {
1275 int j;
1276 for(j = 1; j < c->argc; j++)
1277 c->argv[j] = tryObjectSharing(c->argv[j]);
1278 }
1279 /* Check if the user is authenticated */
1280 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1281 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1282 resetClient(c);
1283 return 1;
1284 }
1285
1286 /* Exec the command */
1287 dirty = server.dirty;
1288 cmd->proc(c);
1289 if (server.dirty-dirty != 0 && listLength(server.slaves))
1290 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1291 if (listLength(server.monitors))
1292 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1293 server.stat_numcommands++;
1294
1295 /* Prepare the client for the next command */
1296 if (c->flags & REDIS_CLOSE) {
1297 freeClient(c);
1298 return 0;
1299 }
1300 resetClient(c);
1301 return 1;
1302 }
1303
1304 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1305 listNode *ln;
1306 int outc = 0, j;
1307 robj **outv;
1308 /* (args*2)+1 is enough room for args, spaces, newlines */
1309 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1310
1311 if (argc <= REDIS_STATIC_ARGS) {
1312 outv = static_outv;
1313 } else {
1314 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1315 if (!outv) oom("replicationFeedSlaves");
1316 }
1317
1318 for (j = 0; j < argc; j++) {
1319 if (j != 0) outv[outc++] = shared.space;
1320 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1321 robj *lenobj;
1322
1323 lenobj = createObject(REDIS_STRING,
1324 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1325 lenobj->refcount = 0;
1326 outv[outc++] = lenobj;
1327 }
1328 outv[outc++] = argv[j];
1329 }
1330 outv[outc++] = shared.crlf;
1331
1332 /* Increment all the refcounts at start and decrement at end in order to
1333 * be sure to free objects if there is no slave in a replication state
1334 * able to be feed with commands */
1335 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1336 listRewind(slaves);
1337 while((ln = listYield(slaves))) {
1338 redisClient *slave = ln->value;
1339
1340 /* Don't feed slaves that are still waiting for BGSAVE to start */
1341 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1342
1343 /* Feed all the other slaves, MONITORs and so on */
1344 if (slave->slaveseldb != dictid) {
1345 robj *selectcmd;
1346
1347 switch(dictid) {
1348 case 0: selectcmd = shared.select0; break;
1349 case 1: selectcmd = shared.select1; break;
1350 case 2: selectcmd = shared.select2; break;
1351 case 3: selectcmd = shared.select3; break;
1352 case 4: selectcmd = shared.select4; break;
1353 case 5: selectcmd = shared.select5; break;
1354 case 6: selectcmd = shared.select6; break;
1355 case 7: selectcmd = shared.select7; break;
1356 case 8: selectcmd = shared.select8; break;
1357 case 9: selectcmd = shared.select9; break;
1358 default:
1359 selectcmd = createObject(REDIS_STRING,
1360 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1361 selectcmd->refcount = 0;
1362 break;
1363 }
1364 addReply(slave,selectcmd);
1365 slave->slaveseldb = dictid;
1366 }
1367 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1368 }
1369 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1370 if (outv != static_outv) zfree(outv);
1371 }
1372
1373 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1374 redisClient *c = (redisClient*) privdata;
1375 char buf[REDIS_IOBUF_LEN];
1376 int nread;
1377 REDIS_NOTUSED(el);
1378 REDIS_NOTUSED(mask);
1379
1380 nread = read(fd, buf, REDIS_IOBUF_LEN);
1381 if (nread == -1) {
1382 if (errno == EAGAIN) {
1383 nread = 0;
1384 } else {
1385 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1386 freeClient(c);
1387 return;
1388 }
1389 } else if (nread == 0) {
1390 redisLog(REDIS_DEBUG, "Client closed connection");
1391 freeClient(c);
1392 return;
1393 }
1394 if (nread) {
1395 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1396 c->lastinteraction = time(NULL);
1397 } else {
1398 return;
1399 }
1400
1401 again:
1402 if (c->bulklen == -1) {
1403 /* Read the first line of the query */
1404 char *p = strchr(c->querybuf,'\n');
1405 size_t querylen;
1406 if (p) {
1407 sds query, *argv;
1408 int argc, j;
1409
1410 query = c->querybuf;
1411 c->querybuf = sdsempty();
1412 querylen = 1+(p-(query));
1413 if (sdslen(query) > querylen) {
1414 /* leave data after the first line of the query in the buffer */
1415 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1416 }
1417 *p = '\0'; /* remove "\n" */
1418 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1419 sdsupdatelen(query);
1420
1421 /* Now we can split the query in arguments */
1422 if (sdslen(query) == 0) {
1423 /* Ignore empty query */
1424 sdsfree(query);
1425 return;
1426 }
1427 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1428 if (argv == NULL) oom("sdssplitlen");
1429 sdsfree(query);
1430
1431 if (c->argv) zfree(c->argv);
1432 c->argv = zmalloc(sizeof(robj*)*argc);
1433 if (c->argv == NULL) oom("allocating arguments list for client");
1434
1435 for (j = 0; j < argc; j++) {
1436 if (sdslen(argv[j])) {
1437 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1438 c->argc++;
1439 } else {
1440 sdsfree(argv[j]);
1441 }
1442 }
1443 zfree(argv);
1444 /* Execute the command. If the client is still valid
1445 * after processCommand() return and there is something
1446 * on the query buffer try to process the next command. */
1447 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1448 return;
1449 } else if (sdslen(c->querybuf) >= 1024*32) {
1450 redisLog(REDIS_DEBUG, "Client protocol error");
1451 freeClient(c);
1452 return;
1453 }
1454 } else {
1455 /* Bulk read handling. Note that if we are at this point
1456 the client already sent a command terminated with a newline,
1457 we are reading the bulk data that is actually the last
1458 argument of the command. */
1459 int qbl = sdslen(c->querybuf);
1460
1461 if (c->bulklen <= qbl) {
1462 /* Copy everything but the final CRLF as final argument */
1463 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1464 c->argc++;
1465 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1466 processCommand(c);
1467 return;
1468 }
1469 }
1470 }
1471
1472 static int selectDb(redisClient *c, int id) {
1473 if (id < 0 || id >= server.dbnum)
1474 return REDIS_ERR;
1475 c->db = &server.db[id];
1476 return REDIS_OK;
1477 }
1478
1479 static void *dupClientReplyValue(void *o) {
1480 incrRefCount((robj*)o);
1481 return 0;
1482 }
1483
1484 static redisClient *createClient(int fd) {
1485 redisClient *c = zmalloc(sizeof(*c));
1486
1487 anetNonBlock(NULL,fd);
1488 anetTcpNoDelay(NULL,fd);
1489 if (!c) return NULL;
1490 selectDb(c,0);
1491 c->fd = fd;
1492 c->querybuf = sdsempty();
1493 c->argc = 0;
1494 c->argv = NULL;
1495 c->bulklen = -1;
1496 c->sentlen = 0;
1497 c->flags = 0;
1498 c->lastinteraction = time(NULL);
1499 c->authenticated = 0;
1500 c->replstate = REDIS_REPL_NONE;
1501 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1502 listSetFreeMethod(c->reply,decrRefCount);
1503 listSetDupMethod(c->reply,dupClientReplyValue);
1504 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1505 readQueryFromClient, c, NULL) == AE_ERR) {
1506 freeClient(c);
1507 return NULL;
1508 }
1509 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1510 return c;
1511 }
1512
1513 static void addReply(redisClient *c, robj *obj) {
1514 if (listLength(c->reply) == 0 &&
1515 (c->replstate == REDIS_REPL_NONE ||
1516 c->replstate == REDIS_REPL_ONLINE) &&
1517 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1518 sendReplyToClient, c, NULL) == AE_ERR) return;
1519 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1520 incrRefCount(obj);
1521 }
1522
1523 static void addReplySds(redisClient *c, sds s) {
1524 robj *o = createObject(REDIS_STRING,s);
1525 addReply(c,o);
1526 decrRefCount(o);
1527 }
1528
1529 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1530 int cport, cfd;
1531 char cip[128];
1532 redisClient *c;
1533 REDIS_NOTUSED(el);
1534 REDIS_NOTUSED(mask);
1535 REDIS_NOTUSED(privdata);
1536
1537 cfd = anetAccept(server.neterr, fd, cip, &cport);
1538 if (cfd == AE_ERR) {
1539 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1540 return;
1541 }
1542 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1543 if ((c = createClient(cfd)) == NULL) {
1544 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1545 close(cfd); /* May be already closed, just ingore errors */
1546 return;
1547 }
1548 /* If maxclient directive is set and this is one client more... close the
1549 * connection. Note that we create the client instead to check before
1550 * for this condition, since now the socket is already set in nonblocking
1551 * mode and we can send an error for free using the Kernel I/O */
1552 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1553 char *err = "-ERR max number of clients reached\r\n";
1554
1555 /* That's a best effort error message, don't check write errors */
1556 (void) write(c->fd,err,strlen(err));
1557 freeClient(c);
1558 return;
1559 }
1560 server.stat_numconnections++;
1561 }
1562
1563 /* ======================= Redis objects implementation ===================== */
1564
1565 static robj *createObject(int type, void *ptr) {
1566 robj *o;
1567
1568 if (listLength(server.objfreelist)) {
1569 listNode *head = listFirst(server.objfreelist);
1570 o = listNodeValue(head);
1571 listDelNode(server.objfreelist,head);
1572 } else {
1573 o = zmalloc(sizeof(*o));
1574 }
1575 if (!o) oom("createObject");
1576 o->type = type;
1577 o->ptr = ptr;
1578 o->refcount = 1;
1579 return o;
1580 }
1581
1582 static robj *createStringObject(char *ptr, size_t len) {
1583 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1584 }
1585
1586 static robj *createListObject(void) {
1587 list *l = listCreate();
1588
1589 if (!l) oom("listCreate");
1590 listSetFreeMethod(l,decrRefCount);
1591 return createObject(REDIS_LIST,l);
1592 }
1593
1594 static robj *createSetObject(void) {
1595 dict *d = dictCreate(&setDictType,NULL);
1596 if (!d) oom("dictCreate");
1597 return createObject(REDIS_SET,d);
1598 }
1599
1600 static void freeStringObject(robj *o) {
1601 sdsfree(o->ptr);
1602 }
1603
1604 static void freeListObject(robj *o) {
1605 listRelease((list*) o->ptr);
1606 }
1607
1608 static void freeSetObject(robj *o) {
1609 dictRelease((dict*) o->ptr);
1610 }
1611
1612 static void freeHashObject(robj *o) {
1613 dictRelease((dict*) o->ptr);
1614 }
1615
1616 static void incrRefCount(robj *o) {
1617 o->refcount++;
1618 #ifdef DEBUG_REFCOUNT
1619 if (o->type == REDIS_STRING)
1620 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1621 #endif
1622 }
1623
1624 static void decrRefCount(void *obj) {
1625 robj *o = obj;
1626
1627 #ifdef DEBUG_REFCOUNT
1628 if (o->type == REDIS_STRING)
1629 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1630 #endif
1631 if (--(o->refcount) == 0) {
1632 switch(o->type) {
1633 case REDIS_STRING: freeStringObject(o); break;
1634 case REDIS_LIST: freeListObject(o); break;
1635 case REDIS_SET: freeSetObject(o); break;
1636 case REDIS_HASH: freeHashObject(o); break;
1637 default: assert(0 != 0); break;
1638 }
1639 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1640 !listAddNodeHead(server.objfreelist,o))
1641 zfree(o);
1642 }
1643 }
1644
1645 /* Try to share an object against the shared objects pool */
1646 static robj *tryObjectSharing(robj *o) {
1647 struct dictEntry *de;
1648 unsigned long c;
1649
1650 if (o == NULL || server.shareobjects == 0) return o;
1651
1652 assert(o->type == REDIS_STRING);
1653 de = dictFind(server.sharingpool,o);
1654 if (de) {
1655 robj *shared = dictGetEntryKey(de);
1656
1657 c = ((unsigned long) dictGetEntryVal(de))+1;
1658 dictGetEntryVal(de) = (void*) c;
1659 incrRefCount(shared);
1660 decrRefCount(o);
1661 return shared;
1662 } else {
1663 /* Here we are using a stream algorihtm: Every time an object is
1664 * shared we increment its count, everytime there is a miss we
1665 * recrement the counter of a random object. If this object reaches
1666 * zero we remove the object and put the current object instead. */
1667 if (dictSize(server.sharingpool) >=
1668 server.sharingpoolsize) {
1669 de = dictGetRandomKey(server.sharingpool);
1670 assert(de != NULL);
1671 c = ((unsigned long) dictGetEntryVal(de))-1;
1672 dictGetEntryVal(de) = (void*) c;
1673 if (c == 0) {
1674 dictDelete(server.sharingpool,de->key);
1675 }
1676 } else {
1677 c = 0; /* If the pool is empty we want to add this object */
1678 }
1679 if (c == 0) {
1680 int retval;
1681
1682 retval = dictAdd(server.sharingpool,o,(void*)1);
1683 assert(retval == DICT_OK);
1684 incrRefCount(o);
1685 }
1686 return o;
1687 }
1688 }
1689
1690 static robj *lookupKey(redisDb *db, robj *key) {
1691 dictEntry *de = dictFind(db->dict,key);
1692 return de ? dictGetEntryVal(de) : NULL;
1693 }
1694
1695 static robj *lookupKeyRead(redisDb *db, robj *key) {
1696 expireIfNeeded(db,key);
1697 return lookupKey(db,key);
1698 }
1699
1700 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1701 deleteIfVolatile(db,key);
1702 return lookupKey(db,key);
1703 }
1704
1705 static int deleteKey(redisDb *db, robj *key) {
1706 int retval;
1707
1708 /* We need to protect key from destruction: after the first dictDelete()
1709 * it may happen that 'key' is no longer valid if we don't increment
1710 * it's count. This may happen when we get the object reference directly
1711 * from the hash table with dictRandomKey() or dict iterators */
1712 incrRefCount(key);
1713 if (dictSize(db->expires)) dictDelete(db->expires,key);
1714 retval = dictDelete(db->dict,key);
1715 decrRefCount(key);
1716
1717 return retval == DICT_OK;
1718 }
1719
1720 /*============================ DB saving/loading ============================ */
1721
/* Write the single type/opcode byte 'type' to fp.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
1726
/* Write 't' truncated to a 32 bit signed integer, in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    if (fwrite(&t32,sizeof(t32),1,fp) != 1) return -1;
    return 0;
}
1732
1733 /* check rdbLoadLen() comments for more info */
1734 static int rdbSaveLen(FILE *fp, uint32_t len) {
1735 unsigned char buf[2];
1736
1737 if (len < (1<<6)) {
1738 /* Save a 6 bit len */
1739 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1740 if (fwrite(buf,1,1,fp) == 0) return -1;
1741 } else if (len < (1<<14)) {
1742 /* Save a 14 bit len */
1743 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1744 buf[1] = len&0xFF;
1745 if (fwrite(buf,2,1,fp) == 0) return -1;
1746 } else {
1747 /* Save a 32 bit len */
1748 buf[0] = (REDIS_RDB_32BITLEN<<6);
1749 if (fwrite(buf,1,1,fp) == 0) return -1;
1750 len = htonl(len);
1751 if (fwrite(&len,4,1,fp) == 0) return -1;
1752 }
1753 return 0;
1754 }
1755
1756 /* String objects in the form "2391" "-100" without any space and with a
1757 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1758 * encoded as integers to save space */
1759 int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1760 long long value;
1761 char *endptr, buf[32];
1762
1763 /* Check if it's possible to encode this value as a number */
1764 value = strtoll(s, &endptr, 10);
1765 if (endptr[0] != '\0') return 0;
1766 snprintf(buf,32,"%lld",value);
1767
1768 /* If the number converted back into a string is not identical
1769 * then it's not possible to encode the string as integer */
1770 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1771
1772 /* Finally check if it fits in our ranges */
1773 if (value >= -(1<<7) && value <= (1<<7)-1) {
1774 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1775 enc[1] = value&0xFF;
1776 return 2;
1777 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1778 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1779 enc[1] = value&0xFF;
1780 enc[2] = (value>>8)&0xFF;
1781 return 3;
1782 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1783 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1784 enc[1] = value&0xFF;
1785 enc[2] = (value>>8)&0xFF;
1786 enc[3] = (value>>16)&0xFF;
1787 enc[4] = (value>>24)&0xFF;
1788 return 5;
1789 } else {
1790 return 0;
1791 }
1792 }
1793
1794 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1795 unsigned int comprlen, outlen;
1796 unsigned char byte;
1797 void *out;
1798
1799 /* We require at least four bytes compression for this to be worth it */
1800 outlen = sdslen(obj->ptr)-4;
1801 if (outlen <= 0) return 0;
1802 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1803 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1804 if (comprlen == 0) {
1805 zfree(out);
1806 return 0;
1807 }
1808 /* Data compressed! Let's save it on disk */
1809 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1810 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1811 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1812 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1813 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1814 zfree(out);
1815 return comprlen;
1816
1817 writeerr:
1818 zfree(out);
1819 return -1;
1820 }
1821
1822 /* Save a string objet as [len][data] on disk. If the object is a string
1823 * representation of an integer value we try to safe it in a special form */
1824 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1825 size_t len = sdslen(obj->ptr);
1826 int enclen;
1827
1828 /* Try integer encoding */
1829 if (len <= 11) {
1830 unsigned char buf[5];
1831 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1832 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1833 return 0;
1834 }
1835 }
1836
1837 /* Try LZF compression - under 20 bytes it's unable to compress even
1838 * aaaaaaaaaaaaaaaaaa so skip it */
1839 if (1 && len > 20) {
1840 int retval;
1841
1842 retval = rdbSaveLzfStringObject(fp,obj);
1843 if (retval == -1) return -1;
1844 if (retval > 0) return 0;
1845 /* retval == 0 means data can't be compressed, save the old way */
1846 }
1847
1848 /* Store verbatim */
1849 if (rdbSaveLen(fp,len) == -1) return -1;
1850 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1851 return 0;
1852 }
1853
1854 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1855 static int rdbSave(char *filename) {
1856 dictIterator *di = NULL;
1857 dictEntry *de;
1858 FILE *fp;
1859 char tmpfile[256];
1860 int j;
1861 time_t now = time(NULL);
1862
1863 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1864 fp = fopen(tmpfile,"w");
1865 if (!fp) {
1866 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1867 return REDIS_ERR;
1868 }
1869 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1870 for (j = 0; j < server.dbnum; j++) {
1871 redisDb *db = server.db+j;
1872 dict *d = db->dict;
1873 if (dictSize(d) == 0) continue;
1874 di = dictGetIterator(d);
1875 if (!di) {
1876 fclose(fp);
1877 return REDIS_ERR;
1878 }
1879
1880 /* Write the SELECT DB opcode */
1881 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1882 if (rdbSaveLen(fp,j) == -1) goto werr;
1883
1884 /* Iterate this DB writing every entry */
1885 while((de = dictNext(di)) != NULL) {
1886 robj *key = dictGetEntryKey(de);
1887 robj *o = dictGetEntryVal(de);
1888 time_t expiretime = getExpire(db,key);
1889
1890 /* Save the expire time */
1891 if (expiretime != -1) {
1892 /* If this key is already expired skip it */
1893 if (expiretime < now) continue;
1894 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1895 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1896 }
1897 /* Save the key and associated value */
1898 if (rdbSaveType(fp,o->type) == -1) goto werr;
1899 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1900 if (o->type == REDIS_STRING) {
1901 /* Save a string value */
1902 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1903 } else if (o->type == REDIS_LIST) {
1904 /* Save a list value */
1905 list *list = o->ptr;
1906 listNode *ln;
1907
1908 listRewind(list);
1909 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1910 while((ln = listYield(list))) {
1911 robj *eleobj = listNodeValue(ln);
1912
1913 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1914 }
1915 } else if (o->type == REDIS_SET) {
1916 /* Save a set value */
1917 dict *set = o->ptr;
1918 dictIterator *di = dictGetIterator(set);
1919 dictEntry *de;
1920
1921 if (!set) oom("dictGetIteraotr");
1922 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1923 while((de = dictNext(di)) != NULL) {
1924 robj *eleobj = dictGetEntryKey(de);
1925
1926 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1927 }
1928 dictReleaseIterator(di);
1929 } else {
1930 assert(0 != 0);
1931 }
1932 }
1933 dictReleaseIterator(di);
1934 }
1935 /* EOF opcode */
1936 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1937
1938 /* Make sure data will not remain on the OS's output buffers */
1939 fflush(fp);
1940 fsync(fileno(fp));
1941 fclose(fp);
1942
1943 /* Use RENAME to make sure the DB file is changed atomically only
1944 * if the generate DB file is ok. */
1945 if (rename(tmpfile,filename) == -1) {
1946 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1947 unlink(tmpfile);
1948 return REDIS_ERR;
1949 }
1950 redisLog(REDIS_NOTICE,"DB saved on disk");
1951 server.dirty = 0;
1952 server.lastsave = time(NULL);
1953 return REDIS_OK;
1954
1955 werr:
1956 fclose(fp);
1957 unlink(tmpfile);
1958 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1959 if (di) dictReleaseIterator(di);
1960 return REDIS_ERR;
1961 }
1962
1963 static int rdbSaveBackground(char *filename) {
1964 pid_t childpid;
1965
1966 if (server.bgsaveinprogress) return REDIS_ERR;
1967 if ((childpid = fork()) == 0) {
1968 /* Child */
1969 close(server.fd);
1970 if (rdbSave(filename) == REDIS_OK) {
1971 exit(0);
1972 } else {
1973 exit(1);
1974 }
1975 } else {
1976 /* Parent */
1977 if (childpid == -1) {
1978 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
1979 strerror(errno));
1980 return REDIS_ERR;
1981 }
1982 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1983 server.bgsaveinprogress = 1;
1984 return REDIS_OK;
1985 }
1986 return REDIS_OK; /* unreached */
1987 }
1988
/* Read one type/opcode byte from 'fp'.
 * Returns the byte as a value in 0..255, or -1 on EOF or read error. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return fread(&byte,1,1,fp) == 1 ? byte : -1;
}
1994
/* Read a 4-byte signed time value from 'fp', stored in host byte order
 * (no ntohl() conversion is applied). Returns it widened to time_t, or -1
 * on EOF or short read. Note that -1 is also what a stored value of -1
 * decodes to. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t)stamp;
}
2000
2001 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2002 * of this file for a description of how this are stored on disk.
2003 *
2004 * isencoded is set to 1 if the readed length is not actually a length but
2005 * an "encoding type", check the above comments for more info */
2006 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2007 unsigned char buf[2];
2008 uint32_t len;
2009
2010 if (isencoded) *isencoded = 0;
2011 if (rdbver == 0) {
2012 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2013 return ntohl(len);
2014 } else {
2015 int type;
2016
2017 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2018 type = (buf[0]&0xC0)>>6;
2019 if (type == REDIS_RDB_6BITLEN) {
2020 /* Read a 6 bit len */
2021 return buf[0]&0x3F;
2022 } else if (type == REDIS_RDB_ENCVAL) {
2023 /* Read a 6 bit len encoding type */
2024 if (isencoded) *isencoded = 1;
2025 return buf[0]&0x3F;
2026 } else if (type == REDIS_RDB_14BITLEN) {
2027 /* Read a 14 bit len */
2028 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2029 return ((buf[0]&0x3F)<<8)|buf[1];
2030 } else {
2031 /* Read a 32 bit len */
2032 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2033 return ntohl(len);
2034 }
2035 }
2036 }
2037
2038 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2039 unsigned char enc[4];
2040 long long val;
2041
2042 if (enctype == REDIS_RDB_ENC_INT8) {
2043 if (fread(enc,1,1,fp) == 0) return NULL;
2044 val = (signed char)enc[0];
2045 } else if (enctype == REDIS_RDB_ENC_INT16) {
2046 uint16_t v;
2047 if (fread(enc,2,1,fp) == 0) return NULL;
2048 v = enc[0]|(enc[1]<<8);
2049 val = (int16_t)v;
2050 } else if (enctype == REDIS_RDB_ENC_INT32) {
2051 uint32_t v;
2052 if (fread(enc,4,1,fp) == 0) return NULL;
2053 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2054 val = (int32_t)v;
2055 } else {
2056 val = 0; /* anti-warning */
2057 assert(0!=0);
2058 }
2059 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2060 }
2061
2062 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2063 unsigned int len, clen;
2064 unsigned char *c = NULL;
2065 sds val = NULL;
2066
2067 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2068 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2069 if ((c = zmalloc(clen)) == NULL) goto err;
2070 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2071 if (fread(c,clen,1,fp) == 0) goto err;
2072 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2073 zfree(c);
2074 return createObject(REDIS_STRING,val);
2075 err:
2076 zfree(c);
2077 sdsfree(val);
2078 return NULL;
2079 }
2080
2081 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2082 int isencoded;
2083 uint32_t len;
2084 sds val;
2085
2086 len = rdbLoadLen(fp,rdbver,&isencoded);
2087 if (isencoded) {
2088 switch(len) {
2089 case REDIS_RDB_ENC_INT8:
2090 case REDIS_RDB_ENC_INT16:
2091 case REDIS_RDB_ENC_INT32:
2092 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2093 case REDIS_RDB_ENC_LZF:
2094 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2095 default:
2096 assert(0!=0);
2097 }
2098 }
2099
2100 if (len == REDIS_RDB_LENERR) return NULL;
2101 val = sdsnewlen(NULL,len);
2102 if (len && fread(val,len,1,fp) == 0) {
2103 sdsfree(val);
2104 return NULL;
2105 }
2106 return tryObjectSharing(createObject(REDIS_STRING,val));
2107 }
2108
2109 static int rdbLoad(char *filename) {
2110 FILE *fp;
2111 robj *keyobj = NULL;
2112 uint32_t dbid;
2113 int type, retval, rdbver;
2114 dict *d = server.db[0].dict;
2115 redisDb *db = server.db+0;
2116 char buf[1024];
2117 time_t expiretime = -1, now = time(NULL);
2118
2119 fp = fopen(filename,"r");
2120 if (!fp) return REDIS_ERR;
2121 if (fread(buf,9,1,fp) == 0) goto eoferr;
2122 buf[9] = '\0';
2123 if (memcmp(buf,"REDIS",5) != 0) {
2124 fclose(fp);
2125 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2126 return REDIS_ERR;
2127 }
2128 rdbver = atoi(buf+5);
2129 if (rdbver > 1) {
2130 fclose(fp);
2131 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2132 return REDIS_ERR;
2133 }
2134 while(1) {
2135 robj *o;
2136
2137 /* Read type. */
2138 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2139 if (type == REDIS_EXPIRETIME) {
2140 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2141 /* We read the time so we need to read the object type again */
2142 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2143 }
2144 if (type == REDIS_EOF) break;
2145 /* Handle SELECT DB opcode as a special case */
2146 if (type == REDIS_SELECTDB) {
2147 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2148 goto eoferr;
2149 if (dbid >= (unsigned)server.dbnum) {
2150 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2151 exit(1);
2152 }
2153 db = server.db+dbid;
2154 d = db->dict;
2155 continue;
2156 }
2157 /* Read key */
2158 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2159
2160 if (type == REDIS_STRING) {
2161 /* Read string value */
2162 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2163 } else if (type == REDIS_LIST || type == REDIS_SET) {
2164 /* Read list/set value */
2165 uint32_t listlen;
2166
2167 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2168 goto eoferr;
2169 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2170 /* Load every single element of the list/set */
2171 while(listlen--) {
2172 robj *ele;
2173
2174 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2175 if (type == REDIS_LIST) {
2176 if (!listAddNodeTail((list*)o->ptr,ele))
2177 oom("listAddNodeTail");
2178 } else {
2179 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2180 oom("dictAdd");
2181 }
2182 }
2183 } else {
2184 assert(0 != 0);
2185 }
2186 /* Add the new object in the hash table */
2187 retval = dictAdd(d,keyobj,o);
2188 if (retval == DICT_ERR) {
2189 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2190 exit(1);
2191 }
2192 /* Set the expire time if needed */
2193 if (expiretime != -1) {
2194 setExpire(db,keyobj,expiretime);
2195 /* Delete this key if already expired */
2196 if (expiretime < now) deleteKey(db,keyobj);
2197 expiretime = -1;
2198 }
2199 keyobj = o = NULL;
2200 }
2201 fclose(fp);
2202 return REDIS_OK;
2203
2204 eoferr: /* unexpected end of file is handled here with a fatal exit */
2205 if (keyobj) decrRefCount(keyobj);
2206 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2207 exit(1);
2208 return REDIS_ERR; /* Just to avoid warning */
2209 }
2210
2211 /*================================== Commands =============================== */
2212
2213 static void authCommand(redisClient *c) {
2214 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2215 c->authenticated = 1;
2216 addReply(c,shared.ok);
2217 } else {
2218 c->authenticated = 0;
2219 addReply(c,shared.err);
2220 }
2221 }
2222
2223 static void pingCommand(redisClient *c) {
2224 addReply(c,shared.pong);
2225 }
2226
2227 static void echoCommand(redisClient *c) {
2228 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2229 (int)sdslen(c->argv[1]->ptr)));
2230 addReply(c,c->argv[1]);
2231 addReply(c,shared.crlf);
2232 }
2233
2234 /*=================================== Strings =============================== */
2235
2236 static void setGenericCommand(redisClient *c, int nx) {
2237 int retval;
2238
2239 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2240 if (retval == DICT_ERR) {
2241 if (!nx) {
2242 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2243 incrRefCount(c->argv[2]);
2244 } else {
2245 addReply(c,shared.czero);
2246 return;
2247 }
2248 } else {
2249 incrRefCount(c->argv[1]);
2250 incrRefCount(c->argv[2]);
2251 }
2252 server.dirty++;
2253 removeExpire(c->db,c->argv[1]);
2254 addReply(c, nx ? shared.cone : shared.ok);
2255 }
2256
2257 static void setCommand(redisClient *c) {
2258 setGenericCommand(c,0);
2259 }
2260
2261 static void setnxCommand(redisClient *c) {
2262 setGenericCommand(c,1);
2263 }
2264
2265 static void getCommand(redisClient *c) {
2266 robj *o = lookupKeyRead(c->db,c->argv[1]);
2267
2268 if (o == NULL) {
2269 addReply(c,shared.nullbulk);
2270 } else {
2271 if (o->type != REDIS_STRING) {
2272 addReply(c,shared.wrongtypeerr);
2273 } else {
2274 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2275 addReply(c,o);
2276 addReply(c,shared.crlf);
2277 }
2278 }
2279 }
2280
2281 static void getSetCommand(redisClient *c) {
2282 getCommand(c);
2283 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2284 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2285 } else {
2286 incrRefCount(c->argv[1]);
2287 }
2288 incrRefCount(c->argv[2]);
2289 server.dirty++;
2290 removeExpire(c->db,c->argv[1]);
2291 }
2292
2293 static void mgetCommand(redisClient *c) {
2294 int j;
2295
2296 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2297 for (j = 1; j < c->argc; j++) {
2298 robj *o = lookupKeyRead(c->db,c->argv[j]);
2299 if (o == NULL) {
2300 addReply(c,shared.nullbulk);
2301 } else {
2302 if (o->type != REDIS_STRING) {
2303 addReply(c,shared.nullbulk);
2304 } else {
2305 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2306 addReply(c,o);
2307 addReply(c,shared.crlf);
2308 }
2309 }
2310 }
2311 }
2312
2313 static void incrDecrCommand(redisClient *c, long long incr) {
2314 long long value;
2315 int retval;
2316 robj *o;
2317
2318 o = lookupKeyWrite(c->db,c->argv[1]);
2319 if (o == NULL) {
2320 value = 0;
2321 } else {
2322 if (o->type != REDIS_STRING) {
2323 value = 0;
2324 } else {
2325 char *eptr;
2326
2327 value = strtoll(o->ptr, &eptr, 10);
2328 }
2329 }
2330
2331 value += incr;
2332 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2333 retval = dictAdd(c->db->dict,c->argv[1],o);
2334 if (retval == DICT_ERR) {
2335 dictReplace(c->db->dict,c->argv[1],o);
2336 removeExpire(c->db,c->argv[1]);
2337 } else {
2338 incrRefCount(c->argv[1]);
2339 }
2340 server.dirty++;
2341 addReply(c,shared.colon);
2342 addReply(c,o);
2343 addReply(c,shared.crlf);
2344 }
2345
2346 static void incrCommand(redisClient *c) {
2347 incrDecrCommand(c,1);
2348 }
2349
2350 static void decrCommand(redisClient *c) {
2351 incrDecrCommand(c,-1);
2352 }
2353
2354 static void incrbyCommand(redisClient *c) {
2355 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2356 incrDecrCommand(c,incr);
2357 }
2358
2359 static void decrbyCommand(redisClient *c) {
2360 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2361 incrDecrCommand(c,-incr);
2362 }
2363
2364 /* ========================= Type agnostic commands ========================= */
2365
2366 static void delCommand(redisClient *c) {
2367 int deleted = 0, j;
2368
2369 for (j = 1; j < c->argc; j++) {
2370 if (deleteKey(c->db,c->argv[j])) {
2371 server.dirty++;
2372 deleted++;
2373 }
2374 }
2375 switch(deleted) {
2376 case 0:
2377 addReply(c,shared.czero);
2378 break;
2379 case 1:
2380 addReply(c,shared.cone);
2381 break;
2382 default:
2383 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2384 break;
2385 }
2386 }
2387
2388 static void existsCommand(redisClient *c) {
2389 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2390 }
2391
2392 static void selectCommand(redisClient *c) {
2393 int id = atoi(c->argv[1]->ptr);
2394
2395 if (selectDb(c,id) == REDIS_ERR) {
2396 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2397 } else {
2398 addReply(c,shared.ok);
2399 }
2400 }
2401
2402 static void randomkeyCommand(redisClient *c) {
2403 dictEntry *de;
2404
2405 while(1) {
2406 de = dictGetRandomKey(c->db->dict);
2407 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2408 }
2409 if (de == NULL) {
2410 addReply(c,shared.plus);
2411 addReply(c,shared.crlf);
2412 } else {
2413 addReply(c,shared.plus);
2414 addReply(c,dictGetEntryKey(de));
2415 addReply(c,shared.crlf);
2416 }
2417 }
2418
2419 static void keysCommand(redisClient *c) {
2420 dictIterator *di;
2421 dictEntry *de;
2422 sds pattern = c->argv[1]->ptr;
2423 int plen = sdslen(pattern);
2424 int numkeys = 0, keyslen = 0;
2425 robj *lenobj = createObject(REDIS_STRING,NULL);
2426
2427 di = dictGetIterator(c->db->dict);
2428 if (!di) oom("dictGetIterator");
2429 addReply(c,lenobj);
2430 decrRefCount(lenobj);
2431 while((de = dictNext(di)) != NULL) {
2432 robj *keyobj = dictGetEntryKey(de);
2433
2434 sds key = keyobj->ptr;
2435 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2436 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2437 if (expireIfNeeded(c->db,keyobj) == 0) {
2438 if (numkeys != 0)
2439 addReply(c,shared.space);
2440 addReply(c,keyobj);
2441 numkeys++;
2442 keyslen += sdslen(key);
2443 }
2444 }
2445 }
2446 dictReleaseIterator(di);
2447 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2448 addReply(c,shared.crlf);
2449 }
2450
2451 static void dbsizeCommand(redisClient *c) {
2452 addReplySds(c,
2453 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2454 }
2455
2456 static void lastsaveCommand(redisClient *c) {
2457 addReplySds(c,
2458 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2459 }
2460
2461 static void typeCommand(redisClient *c) {
2462 robj *o;
2463 char *type;
2464
2465 o = lookupKeyRead(c->db,c->argv[1]);
2466 if (o == NULL) {
2467 type = "+none";
2468 } else {
2469 switch(o->type) {
2470 case REDIS_STRING: type = "+string"; break;
2471 case REDIS_LIST: type = "+list"; break;
2472 case REDIS_SET: type = "+set"; break;
2473 default: type = "unknown"; break;
2474 }
2475 }
2476 addReplySds(c,sdsnew(type));
2477 addReply(c,shared.crlf);
2478 }
2479
2480 static void saveCommand(redisClient *c) {
2481 if (server.bgsaveinprogress) {
2482 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2483 return;
2484 }
2485 if (rdbSave(server.dbfilename) == REDIS_OK) {
2486 addReply(c,shared.ok);
2487 } else {
2488 addReply(c,shared.err);
2489 }
2490 }
2491
2492 static void bgsaveCommand(redisClient *c) {
2493 if (server.bgsaveinprogress) {
2494 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2495 return;
2496 }
2497 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2498 addReply(c,shared.ok);
2499 } else {
2500 addReply(c,shared.err);
2501 }
2502 }
2503
2504 static void shutdownCommand(redisClient *c) {
2505 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2506 /* XXX: TODO kill the child if there is a bgsave in progress */
2507 if (rdbSave(server.dbfilename) == REDIS_OK) {
2508 if (server.daemonize) {
2509 unlink(server.pidfile);
2510 }
2511 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2512 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2513 exit(1);
2514 } else {
2515 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2516 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2517 }
2518 }
2519
2520 static void renameGenericCommand(redisClient *c, int nx) {
2521 robj *o;
2522
2523 /* To use the same key as src and dst is probably an error */
2524 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2525 addReply(c,shared.sameobjecterr);
2526 return;
2527 }
2528
2529 o = lookupKeyWrite(c->db,c->argv[1]);
2530 if (o == NULL) {
2531 addReply(c,shared.nokeyerr);
2532 return;
2533 }
2534 incrRefCount(o);
2535 deleteIfVolatile(c->db,c->argv[2]);
2536 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2537 if (nx) {
2538 decrRefCount(o);
2539 addReply(c,shared.czero);
2540 return;
2541 }
2542 dictReplace(c->db->dict,c->argv[2],o);
2543 } else {
2544 incrRefCount(c->argv[2]);
2545 }
2546 deleteKey(c->db,c->argv[1]);
2547 server.dirty++;
2548 addReply(c,nx ? shared.cone : shared.ok);
2549 }
2550
2551 static void renameCommand(redisClient *c) {
2552 renameGenericCommand(c,0);
2553 }
2554
2555 static void renamenxCommand(redisClient *c) {
2556 renameGenericCommand(c,1);
2557 }
2558
2559 static void moveCommand(redisClient *c) {
2560 robj *o;
2561 redisDb *src, *dst;
2562 int srcid;
2563
2564 /* Obtain source and target DB pointers */
2565 src = c->db;
2566 srcid = c->db->id;
2567 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2568 addReply(c,shared.outofrangeerr);
2569 return;
2570 }
2571 dst = c->db;
2572 selectDb(c,srcid); /* Back to the source DB */
2573
2574 /* If the user is moving using as target the same
2575 * DB as the source DB it is probably an error. */
2576 if (src == dst) {
2577 addReply(c,shared.sameobjecterr);
2578 return;
2579 }
2580
2581 /* Check if the element exists and get a reference */
2582 o = lookupKeyWrite(c->db,c->argv[1]);
2583 if (!o) {
2584 addReply(c,shared.czero);
2585 return;
2586 }
2587
2588 /* Try to add the element to the target DB */
2589 deleteIfVolatile(dst,c->argv[1]);
2590 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2591 addReply(c,shared.czero);
2592 return;
2593 }
2594 incrRefCount(c->argv[1]);
2595 incrRefCount(o);
2596
2597 /* OK! key moved, free the entry in the source DB */
2598 deleteKey(src,c->argv[1]);
2599 server.dirty++;
2600 addReply(c,shared.cone);
2601 }
2602
2603 /* =================================== Lists ================================ */
2604 static void pushGenericCommand(redisClient *c, int where) {
2605 robj *lobj;
2606 list *list;
2607
2608 lobj = lookupKeyWrite(c->db,c->argv[1]);
2609 if (lobj == NULL) {
2610 lobj = createListObject();
2611 list = lobj->ptr;
2612 if (where == REDIS_HEAD) {
2613 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2614 } else {
2615 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2616 }
2617 dictAdd(c->db->dict,c->argv[1],lobj);
2618 incrRefCount(c->argv[1]);
2619 incrRefCount(c->argv[2]);
2620 } else {
2621 if (lobj->type != REDIS_LIST) {
2622 addReply(c,shared.wrongtypeerr);
2623 return;
2624 }
2625 list = lobj->ptr;
2626 if (where == REDIS_HEAD) {
2627 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2628 } else {
2629 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2630 }
2631 incrRefCount(c->argv[2]);
2632 }
2633 server.dirty++;
2634 addReply(c,shared.ok);
2635 }
2636
2637 static void lpushCommand(redisClient *c) {
2638 pushGenericCommand(c,REDIS_HEAD);
2639 }
2640
2641 static void rpushCommand(redisClient *c) {
2642 pushGenericCommand(c,REDIS_TAIL);
2643 }
2644
2645 static void llenCommand(redisClient *c) {
2646 robj *o;
2647 list *l;
2648
2649 o = lookupKeyRead(c->db,c->argv[1]);
2650 if (o == NULL) {
2651 addReply(c,shared.czero);
2652 return;
2653 } else {
2654 if (o->type != REDIS_LIST) {
2655 addReply(c,shared.wrongtypeerr);
2656 } else {
2657 l = o->ptr;
2658 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2659 }
2660 }
2661 }
2662
2663 static void lindexCommand(redisClient *c) {
2664 robj *o;
2665 int index = atoi(c->argv[2]->ptr);
2666
2667 o = lookupKeyRead(c->db,c->argv[1]);
2668 if (o == NULL) {
2669 addReply(c,shared.nullbulk);
2670 } else {
2671 if (o->type != REDIS_LIST) {
2672 addReply(c,shared.wrongtypeerr);
2673 } else {
2674 list *list = o->ptr;
2675 listNode *ln;
2676
2677 ln = listIndex(list, index);
2678 if (ln == NULL) {
2679 addReply(c,shared.nullbulk);
2680 } else {
2681 robj *ele = listNodeValue(ln);
2682 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2683 addReply(c,ele);
2684 addReply(c,shared.crlf);
2685 }
2686 }
2687 }
2688 }
2689
2690 static void lsetCommand(redisClient *c) {
2691 robj *o;
2692 int index = atoi(c->argv[2]->ptr);
2693
2694 o = lookupKeyWrite(c->db,c->argv[1]);
2695 if (o == NULL) {
2696 addReply(c,shared.nokeyerr);
2697 } else {
2698 if (o->type != REDIS_LIST) {
2699 addReply(c,shared.wrongtypeerr);
2700 } else {
2701 list *list = o->ptr;
2702 listNode *ln;
2703
2704 ln = listIndex(list, index);
2705 if (ln == NULL) {
2706 addReply(c,shared.outofrangeerr);
2707 } else {
2708 robj *ele = listNodeValue(ln);
2709
2710 decrRefCount(ele);
2711 listNodeValue(ln) = c->argv[3];
2712 incrRefCount(c->argv[3]);
2713 addReply(c,shared.ok);
2714 server.dirty++;
2715 }
2716 }
2717 }
2718 }
2719
2720 static void popGenericCommand(redisClient *c, int where) {
2721 robj *o;
2722
2723 o = lookupKeyWrite(c->db,c->argv[1]);
2724 if (o == NULL) {
2725 addReply(c,shared.nullbulk);
2726 } else {
2727 if (o->type != REDIS_LIST) {
2728 addReply(c,shared.wrongtypeerr);
2729 } else {
2730 list *list = o->ptr;
2731 listNode *ln;
2732
2733 if (where == REDIS_HEAD)
2734 ln = listFirst(list);
2735 else
2736 ln = listLast(list);
2737
2738 if (ln == NULL) {
2739 addReply(c,shared.nullbulk);
2740 } else {
2741 robj *ele = listNodeValue(ln);
2742 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2743 addReply(c,ele);
2744 addReply(c,shared.crlf);
2745 listDelNode(list,ln);
2746 server.dirty++;
2747 }
2748 }
2749 }
2750 }
2751
2752 static void lpopCommand(redisClient *c) {
2753 popGenericCommand(c,REDIS_HEAD);
2754 }
2755
2756 static void rpopCommand(redisClient *c) {
2757 popGenericCommand(c,REDIS_TAIL);
2758 }
2759
2760 static void lrangeCommand(redisClient *c) {
2761 robj *o;
2762 int start = atoi(c->argv[2]->ptr);
2763 int end = atoi(c->argv[3]->ptr);
2764
2765 o = lookupKeyRead(c->db,c->argv[1]);
2766 if (o == NULL) {
2767 addReply(c,shared.nullmultibulk);
2768 } else {
2769 if (o->type != REDIS_LIST) {
2770 addReply(c,shared.wrongtypeerr);
2771 } else {
2772 list *list = o->ptr;
2773 listNode *ln;
2774 int llen = listLength(list);
2775 int rangelen, j;
2776 robj *ele;
2777
2778 /* convert negative indexes */
2779 if (start < 0) start = llen+start;
2780 if (end < 0) end = llen+end;
2781 if (start < 0) start = 0;
2782 if (end < 0) end = 0;
2783
2784 /* indexes sanity checks */
2785 if (start > end || start >= llen) {
2786 /* Out of range start or start > end result in empty list */
2787 addReply(c,shared.emptymultibulk);
2788 return;
2789 }
2790 if (end >= llen) end = llen-1;
2791 rangelen = (end-start)+1;
2792
2793 /* Return the result in form of a multi-bulk reply */
2794 ln = listIndex(list, start);
2795 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2796 for (j = 0; j < rangelen; j++) {
2797 ele = listNodeValue(ln);
2798 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2799 addReply(c,ele);
2800 addReply(c,shared.crlf);
2801 ln = ln->next;
2802 }
2803 }
2804 }
2805 }
2806
2807 static void ltrimCommand(redisClient *c) {
2808 robj *o;
2809 int start = atoi(c->argv[2]->ptr);
2810 int end = atoi(c->argv[3]->ptr);
2811
2812 o = lookupKeyWrite(c->db,c->argv[1]);
2813 if (o == NULL) {
2814 addReply(c,shared.nokeyerr);
2815 } else {
2816 if (o->type != REDIS_LIST) {
2817 addReply(c,shared.wrongtypeerr);
2818 } else {
2819 list *list = o->ptr;
2820 listNode *ln;
2821 int llen = listLength(list);
2822 int j, ltrim, rtrim;
2823
2824 /* convert negative indexes */
2825 if (start < 0) start = llen+start;
2826 if (end < 0) end = llen+end;
2827 if (start < 0) start = 0;
2828 if (end < 0) end = 0;
2829
2830 /* indexes sanity checks */
2831 if (start > end || start >= llen) {
2832 /* Out of range start or start > end result in empty list */
2833 ltrim = llen;
2834 rtrim = 0;
2835 } else {
2836 if (end >= llen) end = llen-1;
2837 ltrim = start;
2838 rtrim = llen-end-1;
2839 }
2840
2841 /* Remove list elements to perform the trim */
2842 for (j = 0; j < ltrim; j++) {
2843 ln = listFirst(list);
2844 listDelNode(list,ln);
2845 }
2846 for (j = 0; j < rtrim; j++) {
2847 ln = listLast(list);
2848 listDelNode(list,ln);
2849 }
2850 addReply(c,shared.ok);
2851 server.dirty++;
2852 }
2853 }
2854 }
2855
2856 static void lremCommand(redisClient *c) {
2857 robj *o;
2858
2859 o = lookupKeyWrite(c->db,c->argv[1]);
2860 if (o == NULL) {
2861 addReply(c,shared.nokeyerr);
2862 } else {
2863 if (o->type != REDIS_LIST) {
2864 addReply(c,shared.wrongtypeerr);
2865 } else {
2866 list *list = o->ptr;
2867 listNode *ln, *next;
2868 int toremove = atoi(c->argv[2]->ptr);
2869 int removed = 0;
2870 int fromtail = 0;
2871
2872 if (toremove < 0) {
2873 toremove = -toremove;
2874 fromtail = 1;
2875 }
2876 ln = fromtail ? list->tail : list->head;
2877 while (ln) {
2878 robj *ele = listNodeValue(ln);
2879
2880 next = fromtail ? ln->prev : ln->next;
2881 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2882 listDelNode(list,ln);
2883 server.dirty++;
2884 removed++;
2885 if (toremove && removed == toremove) break;
2886 }
2887 ln = next;
2888 }
2889 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2890 }
2891 }
2892 }
2893
2894 /* ==================================== Sets ================================ */
2895
2896 static void saddCommand(redisClient *c) {
2897 robj *set;
2898
2899 set = lookupKeyWrite(c->db,c->argv[1]);
2900 if (set == NULL) {
2901 set = createSetObject();
2902 dictAdd(c->db->dict,c->argv[1],set);
2903 incrRefCount(c->argv[1]);
2904 } else {
2905 if (set->type != REDIS_SET) {
2906 addReply(c,shared.wrongtypeerr);
2907 return;
2908 }
2909 }
2910 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2911 incrRefCount(c->argv[2]);
2912 server.dirty++;
2913 addReply(c,shared.cone);
2914 } else {
2915 addReply(c,shared.czero);
2916 }
2917 }
2918
2919 static void sremCommand(redisClient *c) {
2920 robj *set;
2921
2922 set = lookupKeyWrite(c->db,c->argv[1]);
2923 if (set == NULL) {
2924 addReply(c,shared.czero);
2925 } else {
2926 if (set->type != REDIS_SET) {
2927 addReply(c,shared.wrongtypeerr);
2928 return;
2929 }
2930 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2931 server.dirty++;
2932 addReply(c,shared.cone);
2933 } else {
2934 addReply(c,shared.czero);
2935 }
2936 }
2937 }
2938
2939 static void smoveCommand(redisClient *c) {
2940 robj *srcset, *dstset;
2941
2942 srcset = lookupKeyWrite(c->db,c->argv[1]);
2943 dstset = lookupKeyWrite(c->db,c->argv[2]);
2944
2945 /* If the source key does not exist return 0, if it's of the wrong type
2946 * raise an error */
2947 if (srcset == NULL || srcset->type != REDIS_SET) {
2948 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2949 return;
2950 }
2951 /* Error if the destination key is not a set as well */
2952 if (dstset && dstset->type != REDIS_SET) {
2953 addReply(c,shared.wrongtypeerr);
2954 return;
2955 }
2956 /* Remove the element from the source set */
2957 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2958 /* Key not found in the src set! return zero */
2959 addReply(c,shared.czero);
2960 return;
2961 }
2962 server.dirty++;
2963 /* Add the element to the destination set */
2964 if (!dstset) {
2965 dstset = createSetObject();
2966 dictAdd(c->db->dict,c->argv[2],dstset);
2967 incrRefCount(c->argv[2]);
2968 }
2969 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
2970 incrRefCount(c->argv[3]);
2971 addReply(c,shared.cone);
2972 }
2973
2974 static void sismemberCommand(redisClient *c) {
2975 robj *set;
2976
2977 set = lookupKeyRead(c->db,c->argv[1]);
2978 if (set == NULL) {
2979 addReply(c,shared.czero);
2980 } else {
2981 if (set->type != REDIS_SET) {
2982 addReply(c,shared.wrongtypeerr);
2983 return;
2984 }
2985 if (dictFind(set->ptr,c->argv[2]))
2986 addReply(c,shared.cone);
2987 else
2988 addReply(c,shared.czero);
2989 }
2990 }
2991
2992 static void scardCommand(redisClient *c) {
2993 robj *o;
2994 dict *s;
2995
2996 o = lookupKeyRead(c->db,c->argv[1]);
2997 if (o == NULL) {
2998 addReply(c,shared.czero);
2999 return;
3000 } else {
3001 if (o->type != REDIS_SET) {
3002 addReply(c,shared.wrongtypeerr);
3003 } else {
3004 s = o->ptr;
3005 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3006 dictSize(s)));
3007 }
3008 }
3009 }
3010
3011 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3012 dict **d1 = (void*) s1, **d2 = (void*) s2;
3013
3014 return dictSize(*d1)-dictSize(*d2);
3015 }
3016
3017 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3018 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3019 dictIterator *di;
3020 dictEntry *de;
3021 robj *lenobj = NULL, *dstset = NULL;
3022 int j, cardinality = 0;
3023
3024 if (!dv) oom("sinterGenericCommand");
3025 for (j = 0; j < setsnum; j++) {
3026 robj *setobj;
3027
3028 setobj = dstkey ?
3029 lookupKeyWrite(c->db,setskeys[j]) :
3030 lookupKeyRead(c->db,setskeys[j]);
3031 if (!setobj) {
3032 zfree(dv);
3033 if (dstkey) {
3034 deleteKey(c->db,dstkey);
3035 addReply(c,shared.ok);
3036 } else {
3037 addReply(c,shared.nullmultibulk);
3038 }
3039 return;
3040 }
3041 if (setobj->type != REDIS_SET) {
3042 zfree(dv);
3043 addReply(c,shared.wrongtypeerr);
3044 return;
3045 }
3046 dv[j] = setobj->ptr;
3047 }
3048 /* Sort sets from the smallest to largest, this will improve our
3049 * algorithm's performace */
3050 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3051
3052 /* The first thing we should output is the total number of elements...
3053 * since this is a multi-bulk write, but at this stage we don't know
3054 * the intersection set size, so we use a trick, append an empty object
3055 * to the output list and save the pointer to later modify it with the
3056 * right length */
3057 if (!dstkey) {
3058 lenobj = createObject(REDIS_STRING,NULL);
3059 addReply(c,lenobj);
3060 decrRefCount(lenobj);
3061 } else {
3062 /* If we have a target key where to store the resulting set
3063 * create this key with an empty set inside */
3064 dstset = createSetObject();
3065 }
3066
3067 /* Iterate all the elements of the first (smallest) set, and test
3068 * the element against all the other sets, if at least one set does
3069 * not include the element it is discarded */
3070 di = dictGetIterator(dv[0]);
3071 if (!di) oom("dictGetIterator");
3072
3073 while((de = dictNext(di)) != NULL) {
3074 robj *ele;
3075
3076 for (j = 1; j < setsnum; j++)
3077 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3078 if (j != setsnum)
3079 continue; /* at least one set does not contain the member */
3080 ele = dictGetEntryKey(de);
3081 if (!dstkey) {
3082 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3083 addReply(c,ele);
3084 addReply(c,shared.crlf);
3085 cardinality++;
3086 } else {
3087 dictAdd(dstset->ptr,ele,NULL);
3088 incrRefCount(ele);
3089 }
3090 }
3091 dictReleaseIterator(di);
3092
3093 if (dstkey) {
3094 /* Store the resulting set into the target */
3095 deleteKey(c->db,dstkey);
3096 dictAdd(c->db->dict,dstkey,dstset);
3097 incrRefCount(dstkey);
3098 }
3099
3100 if (!dstkey) {
3101 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3102 } else {
3103 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3104 dictSize((dict*)dstset->ptr)));
3105 server.dirty++;
3106 }
3107 zfree(dv);
3108 }
3109
3110 static void sinterCommand(redisClient *c) {
3111 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3112 }
3113
3114 static void sinterstoreCommand(redisClient *c) {
3115 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3116 }
3117
/* Operation selector for sunionDiffGenericCommand(). */
enum {
    REDIS_OP_UNION = 0, /* SUNION / SUNIONSTORE */
    REDIS_OP_DIFF = 1   /* SDIFF / SDIFFSTORE */
};
3120
3121 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3122 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3123 dictIterator *di;
3124 dictEntry *de;
3125 robj *dstset = NULL;
3126 int j, cardinality = 0;
3127
3128 if (!dv) oom("sunionDiffGenericCommand");
3129 for (j = 0; j < setsnum; j++) {
3130 robj *setobj;
3131
3132 setobj = dstkey ?
3133 lookupKeyWrite(c->db,setskeys[j]) :
3134 lookupKeyRead(c->db,setskeys[j]);
3135 if (!setobj) {
3136 dv[j] = NULL;
3137 continue;
3138 }
3139 if (setobj->type != REDIS_SET) {
3140 zfree(dv);
3141 addReply(c,shared.wrongtypeerr);
3142 return;
3143 }
3144 dv[j] = setobj->ptr;
3145 }
3146
3147 /* We need a temp set object to store our union. If the dstkey
3148 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3149 * this set object will be the resulting object to set into the target key*/
3150 dstset = createSetObject();
3151
3152 /* Iterate all the elements of all the sets, add every element a single
3153 * time to the result set */
3154 for (j = 0; j < setsnum; j++) {
3155 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3156 if (!dv[j]) continue; /* non existing keys are like empty sets */
3157
3158 di = dictGetIterator(dv[j]);
3159 if (!di) oom("dictGetIterator");
3160
3161 while((de = dictNext(di)) != NULL) {
3162 robj *ele;
3163
3164 /* dictAdd will not add the same element multiple times */
3165 ele = dictGetEntryKey(de);
3166 if (op == REDIS_OP_UNION || j == 0) {
3167 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3168 incrRefCount(ele);
3169 cardinality++;
3170 }
3171 } else if (op == REDIS_OP_DIFF) {
3172 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3173 cardinality--;
3174 }
3175 }
3176 }
3177 dictReleaseIterator(di);
3178
3179 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3180 }
3181
3182 /* Output the content of the resulting set, if not in STORE mode */
3183 if (!dstkey) {
3184 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3185 di = dictGetIterator(dstset->ptr);
3186 if (!di) oom("dictGetIterator");
3187 while((de = dictNext(di)) != NULL) {
3188 robj *ele;
3189
3190 ele = dictGetEntryKey(de);
3191 addReplySds(c,sdscatprintf(sdsempty(),
3192 "$%d\r\n",sdslen(ele->ptr)));
3193 addReply(c,ele);
3194 addReply(c,shared.crlf);
3195 }
3196 dictReleaseIterator(di);
3197 } else {
3198 /* If we have a target key where to store the resulting set
3199 * create this key with the result set inside */
3200 deleteKey(c->db,dstkey);
3201 dictAdd(c->db->dict,dstkey,dstset);
3202 incrRefCount(dstkey);
3203 }
3204
3205 /* Cleanup */
3206 if (!dstkey) {
3207 decrRefCount(dstset);
3208 } else {
3209 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3210 dictSize((dict*)dstset->ptr)));
3211 server.dirty++;
3212 }
3213 zfree(dv);
3214 }
3215
3216 static void sunionCommand(redisClient *c) {
3217 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3218 }
3219
3220 static void sunionstoreCommand(redisClient *c) {
3221 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3222 }
3223
3224 static void sdiffCommand(redisClient *c) {
3225 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3226 }
3227
3228 static void sdiffstoreCommand(redisClient *c) {
3229 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3230 }
3231
3232 static void flushdbCommand(redisClient *c) {
3233 server.dirty += dictSize(c->db->dict);
3234 dictEmpty(c->db->dict);
3235 dictEmpty(c->db->expires);
3236 addReply(c,shared.ok);
3237 }
3238
3239 static void flushallCommand(redisClient *c) {
3240 server.dirty += emptyDb();
3241 addReply(c,shared.ok);
3242 rdbSave(server.dbfilename);
3243 server.dirty++;
3244 }
3245
3246 redisSortOperation *createSortOperation(int type, robj *pattern) {
3247 redisSortOperation *so = zmalloc(sizeof(*so));
3248 if (!so) oom("createSortOperation");
3249 so->type = type;
3250 so->pattern = pattern;
3251 return so;
3252 }
3253
3254 /* Return the value associated to the key with a name obtained
3255 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3256 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3257 char *p;
3258 sds spat, ssub;
3259 robj keyobj;
3260 int prefixlen, sublen, postfixlen;
3261 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3262 struct {
3263 long len;
3264 long free;
3265 char buf[REDIS_SORTKEY_MAX+1];
3266 } keyname;
3267
3268 spat = pattern->ptr;
3269 ssub = subst->ptr;
3270 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3271 p = strchr(spat,'*');
3272 if (!p) return NULL;
3273
3274 prefixlen = p-spat;
3275 sublen = sdslen(ssub);
3276 postfixlen = sdslen(spat)-(prefixlen+1);
3277 memcpy(keyname.buf,spat,prefixlen);
3278 memcpy(keyname.buf+prefixlen,ssub,sublen);
3279 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3280 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3281 keyname.len = prefixlen+sublen+postfixlen;
3282
3283 keyobj.refcount = 1;
3284 keyobj.type = REDIS_STRING;
3285 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3286
3287 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3288 return lookupKeyRead(db,&keyobj);
3289 }
3290
3291 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3292 * the additional parameter is not standard but a BSD-specific we have to
3293 * pass sorting parameters via the global 'server' structure */
3294 static int sortCompare(const void *s1, const void *s2) {
3295 const redisSortObject *so1 = s1, *so2 = s2;
3296 int cmp;
3297
3298 if (!server.sort_alpha) {
3299 /* Numeric sorting. Here it's trivial as we precomputed scores */
3300 if (so1->u.score > so2->u.score) {
3301 cmp = 1;
3302 } else if (so1->u.score < so2->u.score) {
3303 cmp = -1;
3304 } else {
3305 cmp = 0;
3306 }
3307 } else {
3308 /* Alphanumeric sorting */
3309 if (server.sort_bypattern) {
3310 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3311 /* At least one compare object is NULL */
3312 if (so1->u.cmpobj == so2->u.cmpobj)
3313 cmp = 0;
3314 else if (so1->u.cmpobj == NULL)
3315 cmp = -1;
3316 else
3317 cmp = 1;
3318 } else {
3319 /* We have both the objects, use strcoll */
3320 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3321 }
3322 } else {
3323 /* Compare elements directly */
3324 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3325 }
3326 }
3327 return server.sort_desc ? -cmp : cmp;
3328 }
3329
3330 /* The SORT command is the most complex command in Redis. Warning: this code
3331 * is optimized for speed and a bit less for readability */
3332 static void sortCommand(redisClient *c) {
3333 list *operations;
3334 int outputlen = 0;
3335 int desc = 0, alpha = 0;
3336 int limit_start = 0, limit_count = -1, start, end;
3337 int j, dontsort = 0, vectorlen;
3338 int getop = 0; /* GET operation counter */
3339 robj *sortval, *sortby = NULL;
3340 redisSortObject *vector; /* Resulting vector to sort */
3341
3342 /* Lookup the key to sort. It must be of the right types */
3343 sortval = lookupKeyRead(c->db,c->argv[1]);
3344 if (sortval == NULL) {
3345 addReply(c,shared.nokeyerr);
3346 return;
3347 }
3348 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3349 addReply(c,shared.wrongtypeerr);
3350 return;
3351 }
3352
3353 /* Create a list of operations to perform for every sorted element.
3354 * Operations can be GET/DEL/INCR/DECR */
3355 operations = listCreate();
3356 listSetFreeMethod(operations,zfree);
3357 j = 2;
3358
3359 /* Now we need to protect sortval incrementing its count, in the future
3360 * SORT may have options able to overwrite/delete keys during the sorting
3361 * and the sorted key itself may get destroied */
3362 incrRefCount(sortval);
3363
3364 /* The SORT command has an SQL-alike syntax, parse it */
3365 while(j < c->argc) {
3366 int leftargs = c->argc-j-1;
3367 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3368 desc = 0;
3369 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3370 desc = 1;
3371 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3372 alpha = 1;
3373 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3374 limit_start = atoi(c->argv[j+1]->ptr);
3375 limit_count = atoi(c->argv[j+2]->ptr);
3376 j+=2;
3377 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3378 sortby = c->argv[j+1];
3379 /* If the BY pattern does not contain '*', i.e. it is constant,
3380 * we don't need to sort nor to lookup the weight keys. */
3381 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3382 j++;
3383 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3384 listAddNodeTail(operations,createSortOperation(
3385 REDIS_SORT_GET,c->argv[j+1]));
3386 getop++;
3387 j++;
3388 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3389 listAddNodeTail(operations,createSortOperation(
3390 REDIS_SORT_DEL,c->argv[j+1]));
3391 j++;
3392 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3393 listAddNodeTail(operations,createSortOperation(
3394 REDIS_SORT_INCR,c->argv[j+1]));
3395 j++;
3396 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3397 listAddNodeTail(operations,createSortOperation(
3398 REDIS_SORT_DECR,c->argv[j+1]));
3399 j++;
3400 } else {
3401 decrRefCount(sortval);
3402 listRelease(operations);
3403 addReply(c,shared.syntaxerr);
3404 return;
3405 }
3406 j++;
3407 }
3408
3409 /* Load the sorting vector with all the objects to sort */
3410 vectorlen = (sortval->type == REDIS_LIST) ?
3411 listLength((list*)sortval->ptr) :
3412 dictSize((dict*)sortval->ptr);
3413 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3414 if (!vector) oom("allocating objects vector for SORT");
3415 j = 0;
3416 if (sortval->type == REDIS_LIST) {
3417 list *list = sortval->ptr;
3418 listNode *ln;
3419
3420 listRewind(list);
3421 while((ln = listYield(list))) {
3422 robj *ele = ln->value;
3423 vector[j].obj = ele;
3424 vector[j].u.score = 0;
3425 vector[j].u.cmpobj = NULL;
3426 j++;
3427 }
3428 } else {
3429 dict *set = sortval->ptr;
3430 dictIterator *di;
3431 dictEntry *setele;
3432
3433 di = dictGetIterator(set);
3434 if (!di) oom("dictGetIterator");
3435 while((setele = dictNext(di)) != NULL) {
3436 vector[j].obj = dictGetEntryKey(setele);
3437 vector[j].u.score = 0;
3438 vector[j].u.cmpobj = NULL;
3439 j++;
3440 }
3441 dictReleaseIterator(di);
3442 }
3443 assert(j == vectorlen);
3444
3445 /* Now it's time to load the right scores in the sorting vector */
3446 if (dontsort == 0) {
3447 for (j = 0; j < vectorlen; j++) {
3448 if (sortby) {
3449 robj *byval;
3450
3451 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3452 if (!byval || byval->type != REDIS_STRING) continue;
3453 if (alpha) {
3454 vector[j].u.cmpobj = byval;
3455 incrRefCount(byval);
3456 } else {
3457 vector[j].u.score = strtod(byval->ptr,NULL);
3458 }
3459 } else {
3460 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3461 }
3462 }
3463 }
3464
3465 /* We are ready to sort the vector... perform a bit of sanity check
3466 * on the LIMIT option too. We'll use a partial version of quicksort. */
3467 start = (limit_start < 0) ? 0 : limit_start;
3468 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3469 if (start >= vectorlen) {
3470 start = vectorlen-1;
3471 end = vectorlen-2;
3472 }
3473 if (end >= vectorlen) end = vectorlen-1;
3474
3475 if (dontsort == 0) {
3476 server.sort_desc = desc;
3477 server.sort_alpha = alpha;
3478 server.sort_bypattern = sortby ? 1 : 0;
3479 if (sortby && (start != 0 || end != vectorlen-1))
3480 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3481 else
3482 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3483 }
3484
3485 /* Send command output to the output buffer, performing the specified
3486 * GET/DEL/INCR/DECR operations if any. */
3487 outputlen = getop ? getop*(end-start+1) : end-start+1;
3488 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3489 for (j = start; j <= end; j++) {
3490 listNode *ln;
3491 if (!getop) {
3492 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3493 sdslen(vector[j].obj->ptr)));
3494 addReply(c,vector[j].obj);
3495 addReply(c,shared.crlf);
3496 }
3497 listRewind(operations);
3498 while((ln = listYield(operations))) {
3499 redisSortOperation *sop = ln->value;
3500 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3501 vector[j].obj);
3502
3503 if (sop->type == REDIS_SORT_GET) {
3504 if (!val || val->type != REDIS_STRING) {
3505 addReply(c,shared.nullbulk);
3506 } else {
3507 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3508 sdslen(val->ptr)));
3509 addReply(c,val);
3510 addReply(c,shared.crlf);
3511 }
3512 } else if (sop->type == REDIS_SORT_DEL) {
3513 /* TODO */
3514 }
3515 }
3516 }
3517
3518 /* Cleanup */
3519 decrRefCount(sortval);
3520 listRelease(operations);
3521 for (j = 0; j < vectorlen; j++) {
3522 if (sortby && alpha && vector[j].u.cmpobj)
3523 decrRefCount(vector[j].u.cmpobj);
3524 }
3525 zfree(vector);
3526 }
3527
3528 static void infoCommand(redisClient *c) {
3529 sds info;
3530 time_t uptime = time(NULL)-server.stat_starttime;
3531
3532 info = sdscatprintf(sdsempty(),
3533 "redis_version:%s\r\n"
3534 "uptime_in_seconds:%d\r\n"
3535 "uptime_in_days:%d\r\n"
3536 "connected_clients:%d\r\n"
3537 "connected_slaves:%d\r\n"
3538 "used_memory:%zu\r\n"
3539 "changes_since_last_save:%lld\r\n"
3540 "bgsave_in_progress:%d\r\n"
3541 "last_save_time:%d\r\n"
3542 "total_connections_received:%lld\r\n"
3543 "total_commands_processed:%lld\r\n"
3544 "role:%s\r\n"
3545 ,REDIS_VERSION,
3546 uptime,
3547 uptime/(3600*24),
3548 listLength(server.clients)-listLength(server.slaves),
3549 listLength(server.slaves),
3550 server.usedmemory,
3551 server.dirty,
3552 server.bgsaveinprogress,
3553 server.lastsave,
3554 server.stat_numconnections,
3555 server.stat_numcommands,
3556 server.masterhost == NULL ? "master" : "slave"
3557 );
3558 if (server.masterhost) {
3559 info = sdscatprintf(info,
3560 "master_host:%s\r\n"
3561 "master_port:%d\r\n"
3562 "master_link_status:%s\r\n"
3563 "master_last_io_seconds_ago:%d\r\n"
3564 ,server.masterhost,
3565 server.masterport,
3566 (server.replstate == REDIS_REPL_CONNECTED) ?
3567 "up" : "down",
3568 (int)(time(NULL)-server.master->lastinteraction)
3569 );
3570 }
3571 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3572 addReplySds(c,info);
3573 addReply(c,shared.crlf);
3574 }
3575
3576 static void monitorCommand(redisClient *c) {
3577 /* ignore MONITOR if aleady slave or in monitor mode */
3578 if (c->flags & REDIS_SLAVE) return;
3579
3580 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3581 c->slaveseldb = 0;
3582 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3583 addReply(c,shared.ok);
3584 }
3585
3586 /* ================================= Expire ================================= */
3587 static int removeExpire(redisDb *db, robj *key) {
3588 if (dictDelete(db->expires,key) == DICT_OK) {
3589 return 1;
3590 } else {
3591 return 0;
3592 }
3593 }
3594
3595 static int setExpire(redisDb *db, robj *key, time_t when) {
3596 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3597 return 0;
3598 } else {
3599 incrRefCount(key);
3600 return 1;
3601 }
3602 }
3603
3604 /* Return the expire time of the specified key, or -1 if no expire
3605 * is associated with this key (i.e. the key is non volatile) */
3606 static time_t getExpire(redisDb *db, robj *key) {
3607 dictEntry *de;
3608
3609 /* No expire? return ASAP */
3610 if (dictSize(db->expires) == 0 ||
3611 (de = dictFind(db->expires,key)) == NULL) return -1;
3612
3613 return (time_t) dictGetEntryVal(de);
3614 }
3615
3616 static int expireIfNeeded(redisDb *db, robj *key) {
3617 time_t when;
3618 dictEntry *de;
3619
3620 /* No expire? return ASAP */
3621 if (dictSize(db->expires) == 0 ||
3622 (de = dictFind(db->expires,key)) == NULL) return 0;
3623
3624 /* Lookup the expire */
3625 when = (time_t) dictGetEntryVal(de);
3626 if (time(NULL) <= when) return 0;
3627
3628 /* Delete the key */
3629 dictDelete(db->expires,key);
3630 return dictDelete(db->dict,key) == DICT_OK;
3631 }
3632
3633 static int deleteIfVolatile(redisDb *db, robj *key) {
3634 dictEntry *de;
3635
3636 /* No expire? return ASAP */
3637 if (dictSize(db->expires) == 0 ||
3638 (de = dictFind(db->expires,key)) == NULL) return 0;
3639
3640 /* Delete the key */
3641 server.dirty++;
3642 dictDelete(db->expires,key);
3643 return dictDelete(db->dict,key) == DICT_OK;
3644 }
3645
3646 static void expireCommand(redisClient *c) {
3647 dictEntry *de;
3648 int seconds = atoi(c->argv[2]->ptr);
3649
3650 de = dictFind(c->db->dict,c->argv[1]);
3651 if (de == NULL) {
3652 addReply(c,shared.czero);
3653 return;
3654 }
3655 if (seconds <= 0) {
3656 addReply(c, shared.czero);
3657 return;
3658 } else {
3659 time_t when = time(NULL)+seconds;
3660 if (setExpire(c->db,c->argv[1],when))
3661 addReply(c,shared.cone);
3662 else
3663 addReply(c,shared.czero);
3664 return;
3665 }
3666 }
3667
3668 static void ttlCommand(redisClient *c) {
3669 time_t expire;
3670 int ttl = -1;
3671
3672 expire = getExpire(c->db,c->argv[1]);
3673 if (expire != -1) {
3674 ttl = (int) (expire-time(NULL));
3675 if (ttl < 0) ttl = -1;
3676 }
3677 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3678 }
3679
3680 /* =============================== Replication ============================= */
3681
3682 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3683 ssize_t nwritten, ret = size;
3684 time_t start = time(NULL);
3685
3686 timeout++;
3687 while(size) {
3688 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3689 nwritten = write(fd,ptr,size);
3690 if (nwritten == -1) return -1;
3691 ptr += nwritten;
3692 size -= nwritten;
3693 }
3694 if ((time(NULL)-start) > timeout) {
3695 errno = ETIMEDOUT;
3696 return -1;
3697 }
3698 }
3699 return ret;
3700 }
3701
3702 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3703 ssize_t nread, totread = 0;
3704 time_t start = time(NULL);
3705
3706 timeout++;
3707 while(size) {
3708 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3709 nread = read(fd,ptr,size);
3710 if (nread == -1) return -1;
3711 ptr += nread;
3712 size -= nread;
3713 totread += nread;
3714 }
3715 if ((time(NULL)-start) > timeout) {
3716 errno = ETIMEDOUT;
3717 return -1;
3718 }
3719 }
3720 return totread;
3721 }
3722
/* Read one '\n'-terminated line from 'fd' into the buffer 'ptr' of 'size'
 * bytes. The newline (and a '\r' right before it) is replaced by '\0', and
 * the buffer is always nul-terminated.
 *
 * Returns the number of characters stored before the newline, counting a
 * trailing '\r' that was cleared. If the buffer fills up before a newline
 * arrives, the truncated line's length is returned. Returns -1 on read error
 * or timeout (see syncRead), or with errno = EINVAL if 'size' leaves no room
 * for the terminator.
 *
 * Previously 'size' was never decremented inside the loop, so a line longer
 * than the buffer (e.g. from a misbehaving master) overflowed it. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';
    size--; /* reserve room for the terminating nul */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        size--;
    }
    return nread;
}
3743
3744 static void syncCommand(redisClient *c) {
3745 /* ignore SYNC if aleady slave or in monitor mode */
3746 if (c->flags & REDIS_SLAVE) return;
3747
3748 /* SYNC can't be issued when the server has pending data to send to
3749 * the client about already issued commands. We need a fresh reply
3750 * buffer registering the differences between the BGSAVE and the current
3751 * dataset, so that we can copy to other slaves if needed. */
3752 if (listLength(c->reply) != 0) {
3753 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3754 return;
3755 }
3756
3757 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3758 /* Here we need to check if there is a background saving operation
3759 * in progress, or if it is required to start one */
3760 if (server.bgsaveinprogress) {
3761 /* Ok a background save is in progress. Let's check if it is a good
3762 * one for replication, i.e. if there is another slave that is
3763 * registering differences since the server forked to save */
3764 redisClient *slave;
3765 listNode *ln;
3766
3767 listRewind(server.slaves);
3768 while((ln = listYield(server.slaves))) {
3769 slave = ln->value;
3770 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3771 }
3772 if (ln) {
3773 /* Perfect, the server is already registering differences for
3774 * another slave. Set the right state, and copy the buffer. */
3775 listRelease(c->reply);
3776 c->reply = listDup(slave->reply);
3777 if (!c->reply) oom("listDup copying slave reply list");
3778 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3779 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3780 } else {
3781 /* No way, we need to wait for the next BGSAVE in order to
3782 * register differences */
3783 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3784 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3785 }
3786 } else {
3787 /* Ok we don't have a BGSAVE in progress, let's start one */
3788 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3789 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3790 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3791 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3792 return;
3793 }
3794 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3795 }
3796 c->repldbfd = -1;
3797 c->flags |= REDIS_SLAVE;
3798 c->slaveseldb = 0;
3799 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3800 return;
3801 }
3802
/* Writable-event handler that streams the RDB dump to a slave in state
 * REDIS_REPL_SEND_BULK.
 *
 * On the first call (repldboff == 0) it writes the "$<size>\r\n" bulk
 * header. Each call then reads up to REDIS_IOBUF_LEN bytes of the dump from
 * offset repldboff and writes them to the slave, advancing repldboff by the
 * bytes actually written. Once the whole dump is sent, it:
 *   - closes the dump fd;
 *   - replaces this handler with sendReplyToClient;
 *   - marks the slave REDIS_REPL_ONLINE;
 *   - queues an empty reply (presumably to get the accumulated replies
 *     flushed — TODO confirm).
 * Any read/write failure frees the slave client. */
static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    char buf[REDIS_IOBUF_LEN];
    ssize_t nwritten, buflen;

    if (slave->repldboff == 0) {
        /* Write the bulk write count before transferring the DB. In theory
         * here we don't know how much room there is in the output buffer of
         * the socket, but in practice SO_SNDLOWAT (the minimum count for
         * output operations) will never be smaller than the few bytes we
         * need. */
        /* NOTE(review): if a later data write() returns 0 bytes, repldboff
         * stays 0 and the header would be sent again; in practice write()
         * on a writable socket with buflen > 0 does not return 0. */
        sds bulkcount;

        bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
            slave->repldbsize);
        if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
        {
            sdsfree(bulkcount);
            freeClient(slave);
            return;
        }
        sdsfree(bulkcount);
    }
    /* Resume from where the previous (possibly partial) write stopped */
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
    if (buflen <= 0) {
        redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    if ((nwritten = write(fd,buf,buflen)) == -1) {
        redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    /* Partial writes are fine: only the bytes actually sent are counted */
    slave->repldboff += nwritten;
    if (slave->repldboff == slave->repldbsize) {
        close(slave->repldbfd);
        slave->repldbfd = -1;
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        slave->replstate = REDIS_REPL_ONLINE;
        if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
            sendReplyToClient, slave, NULL) == AE_ERR) {
            freeClient(slave);
            return;
        }
        addReplySds(slave,sdsempty());
        redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
    }
}
3856
3857 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3858 listNode *ln;
3859 int startbgsave = 0;
3860
3861 listRewind(server.slaves);
3862 while((ln = listYield(server.slaves))) {
3863 redisClient *slave = ln->value;
3864
3865 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3866 startbgsave = 1;
3867 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3868 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3869 struct stat buf;
3870
3871 if (bgsaveerr != REDIS_OK) {
3872 freeClient(slave);
3873 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3874 continue;
3875 }
3876 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3877 fstat(slave->repldbfd,&buf) == -1) {
3878 freeClient(slave);
3879 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3880 continue;
3881 }
3882 slave->repldboff = 0;
3883 slave->repldbsize = buf.st_size;
3884 slave->replstate = REDIS_REPL_SEND_BULK;
3885 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3886 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3887 freeClient(slave);
3888 continue;
3889 }
3890 }
3891 }
3892 if (startbgsave) {
3893 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3894 listRewind(server.slaves);
3895 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3896 while((ln = listYield(server.slaves))) {
3897 redisClient *slave = ln->value;
3898
3899 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3900 freeClient(slave);
3901 }
3902 }
3903 }
3904 }
3905
3906 static int syncWithMaster(void) {
3907 char buf[1024], tmpfile[256];
3908 int dumpsize;
3909 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3910 int dfd;
3911
3912 if (fd == -1) {
3913 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3914 strerror(errno));
3915 return REDIS_ERR;
3916 }
3917 /* Issue the SYNC command */
3918 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3919 close(fd);
3920 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3921 strerror(errno));
3922 return REDIS_ERR;
3923 }
3924 /* Read the bulk write count */
3925 if (syncReadLine(fd,buf,1024,3600) == -1) {
3926 close(fd);
3927 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3928 strerror(errno));
3929 return REDIS_ERR;
3930 }
3931 dumpsize = atoi(buf+1);
3932 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3933 /* Read the bulk write data on a temp file */
3934 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3935 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3936 if (dfd == -1) {
3937 close(fd);
3938 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3939 return REDIS_ERR;
3940 }
3941 while(dumpsize) {
3942 int nread, nwritten;
3943
3944 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3945 if (nread == -1) {
3946 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3947 strerror(errno));
3948 close(fd);
3949 close(dfd);
3950 return REDIS_ERR;
3951 }
3952 nwritten = write(dfd,buf,nread);
3953 if (nwritten == -1) {
3954 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3955 close(fd);
3956 close(dfd);
3957 return REDIS_ERR;
3958 }
3959 dumpsize -= nread;
3960 }
3961 close(dfd);
3962 if (rename(tmpfile,server.dbfilename) == -1) {
3963 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3964 unlink(tmpfile);
3965 close(fd);
3966 return REDIS_ERR;
3967 }
3968 emptyDb();
3969 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3970 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3971 close(fd);
3972 return REDIS_ERR;
3973 }
3974 server.master = createClient(fd);
3975 server.master->flags |= REDIS_MASTER;
3976 server.replstate = REDIS_REPL_CONNECTED;
3977 return REDIS_OK;
3978 }
3979
3980 static void slaveofCommand(redisClient *c) {
3981 if (!strcasecmp(c->argv[1]->ptr,"no") &&
3982 !strcasecmp(c->argv[2]->ptr,"one")) {
3983 if (server.masterhost) {
3984 sdsfree(server.masterhost);
3985 server.masterhost = NULL;
3986 if (server.master) freeClient(server.master);
3987 server.replstate = REDIS_REPL_NONE;
3988 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
3989 }
3990 } else {
3991 sdsfree(server.masterhost);
3992 server.masterhost = sdsdup(c->argv[1]->ptr);
3993 server.masterport = atoi(c->argv[2]->ptr);
3994 if (server.master) freeClient(server.master);
3995 server.replstate = REDIS_REPL_CONNECT;
3996 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
3997 server.masterhost, server.masterport);
3998 }
3999 addReply(c,shared.ok);
4000 }
4001
4002 /* ============================ Maxmemory directive ======================== */
4003
4004 /* This function gets called when 'maxmemory' is set on the config file to limit
4005 * the max memory used by the server, and we are out of memory.
4006 * This function will try to, in order:
4007 *
4008 * - Free objects from the free list
4009 * - Try to remove keys with an EXPIRE set
4010 *
4011 * It is not possible to free enough memory to reach used-memory < maxmemory
4012 * the server will start refusing commands that will enlarge even more the
4013 * memory usage.
4014 */
4015 static void freeMemoryIfNeeded(void) {
4016 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4017 if (listLength(server.objfreelist)) {
4018 robj *o;
4019
4020 listNode *head = listFirst(server.objfreelist);
4021 o = listNodeValue(head);
4022 listDelNode(server.objfreelist,head);
4023 zfree(o);
4024 } else {
4025 int j, k, freed = 0;
4026
4027 for (j = 0; j < server.dbnum; j++) {
4028 int minttl = -1;
4029 robj *minkey = NULL;
4030 struct dictEntry *de;
4031
4032 if (dictSize(server.db[j].expires)) {
4033 freed = 1;
4034 /* From a sample of three keys drop the one nearest to
4035 * the natural expire */
4036 for (k = 0; k < 3; k++) {
4037 time_t t;
4038
4039 de = dictGetRandomKey(server.db[j].expires);
4040 t = (time_t) dictGetEntryVal(de);
4041 if (minttl == -1 || t < minttl) {
4042 minkey = dictGetEntryKey(de);
4043 minttl = t;
4044 }
4045 }
4046 deleteKey(server.db+j,minkey);
4047 }
4048 }
4049 if (!freed) return; /* nothing to free... */
4050 }
4051 }
4052 }
4053
4054 /* ================================= Debugging ============================== */
4055
4056 static void debugCommand(redisClient *c) {
4057 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4058 *((char*)-1) = 'x';
4059 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4060 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4061 robj *key, *val;
4062
4063 if (!de) {
4064 addReply(c,shared.nokeyerr);
4065 return;
4066 }
4067 key = dictGetEntryKey(de);
4068 val = dictGetEntryVal(de);
4069 addReplySds(c,sdscatprintf(sdsempty(),
4070 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4071 key, key->refcount, val, val->refcount));
4072 } else {
4073 addReplySds(c,sdsnew(
4074 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4075 }
4076 }
4077
4078 /* =================================== Main! ================================ */
4079
4080 #ifdef __linux__
/* Read the kernel's vm.overcommit_memory setting.
 *
 * Returns the integer found at the start of
 * /proc/sys/vm/overcommit_memory, or -1 if the file cannot be opened or
 * read, or does not start with a number. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long value;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    /* atoi() would turn unparsable content into 0, and 0 is exactly the
     * value linuxOvercommitMemoryWarning() warns about. strtol() lets us
     * tell "no number here" apart and report it as -1 (unknown). */
    errno = 0;
    value = strtol(buf,&end,10);
    if (end == buf || errno == ERANGE) return -1;
    return (int) value;
}
4094
4095 void linuxOvercommitMemoryWarning(void) {
4096 if (linuxOvercommitMemoryValue() == 0) {
4097 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4098 }
4099 }
4100 #endif /* __linux__ */
4101
4102 static void daemonize(void) {
4103 int fd;
4104 FILE *fp;
4105
4106 if (fork() != 0) exit(0); /* parent exits */
4107 setsid(); /* create a new session */
4108
4109 /* Every output goes to /dev/null. If Redis is daemonized but
4110 * the 'logfile' is set to 'stdout' in the configuration file
4111 * it will not log at all. */
4112 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4113 dup2(fd, STDIN_FILENO);
4114 dup2(fd, STDOUT_FILENO);
4115 dup2(fd, STDERR_FILENO);
4116 if (fd > STDERR_FILENO) close(fd);
4117 }
4118 /* Try to write the pid file */
4119 fp = fopen(server.pidfile,"w");
4120 if (fp) {
4121 fprintf(fp,"%d\n",getpid());
4122 fclose(fp);
4123 }
4124 }
4125
4126 int main(int argc, char **argv) {
4127 #ifdef __linux__
4128 linuxOvercommitMemoryWarning();
4129 #endif
4130
4131 initServerConfig();
4132 if (argc == 2) {
4133 ResetServerSaveParams();
4134 loadServerConfig(argv[1]);
4135 } else if (argc > 2) {
4136 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4137 exit(1);
4138 } else {
4139 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4140 }
4141 initServer();
4142 if (server.daemonize) daemonize();
4143 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4144 if (rdbLoad(server.dbfilename) == REDIS_OK)
4145 redisLog(REDIS_NOTICE,"DB loaded from disk");
4146 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4147 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4148 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4149 aeMain(server.el);
4150 aeDeleteEventLoop(server.el);
4151 return 0;
4152 }