]> git.saurik.com Git - redis.git/blob - redis.c
3a9cfc60e3ff6fdd052666c8ad3ce86140a5d957
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this Redis server release. */
#define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #include <signal.h>
40 #include <sys/wait.h>
41 #include <errno.h>
42 #include <assert.h>
43 #include <ctype.h>
44 #include <stdarg.h>
45 #include <inttypes.h>
46 #include <arpa/inet.h>
47 #include <sys/stat.h>
48 #include <fcntl.h>
49 #include <sys/time.h>
50 #include <sys/resource.h>
51 #include <limits.h>
52 #include <execinfo.h>
53
54 #include "ae.h" /* Event driven programming library */
55 #include "sds.h" /* Dynamic safe strings */
56 #include "anet.h" /* Networking the easy way */
57 #include "dict.h" /* Hash tables */
58 #include "adlist.h" /* Linked lists */
59 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
60 #include "lzf.h" /* LZF compression library */
61 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
62
/* Return codes of internal functions */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379            /* default TCP port */
#define REDIS_MAXIDLETIME (60*5)         /* default client timeout, in seconds */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16           /* default number of databases */
#define REDIS_CONFIGLINE_MAX 1024        /* config file line buffer size */
#define REDIS_OBJFREELIST_MAX 1000000    /* max number of freed objects to cache */
#define REDIS_MAX_SYNC_TIME 60           /* a slave can't take longer to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* expire candidates sampled per DB per cron run */

/* Hash table shrinking parameters (see tryResizeHashTables()) */
#define REDIS_HT_MINFILL 10              /* shrink a table that is under 10% full... */
#define REDIS_HT_MINSLOTS 16384          /* ...but only if it has more slots than this */

/* Command flags: a bit mask stored in each cmdTable entry */
#define REDIS_CMD_BULK 1                 /* bulk (write) command */
#define REDIS_CMD_INLINE 2               /* inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: every command carrying this
 * flag is refused with an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short, these commands are denied in low-memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_HASH 3

/* Extra type opcodes that only ever appear in the dump file */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
102
/* Length encoding used by the dump (RDB) file format. Storing a full 32 bit
 * length for every short key would waste a lot of space, so the two most
 * significant bits of the first byte select how the length is encoded:
 *
 * 00|000000 => the length is the remaining 6 bits of this byte
 * 01|000000 00000000 => the length is 14 bits: the 6 bits of this byte
 *                       followed by the 8 bits of the next byte
 * 10|000000 [32 bit integer] => a full 32 bit length follows
 * 11|000000 => a specially encoded object follows. The remaining six bits
 *              specify the kind of object; see the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored in a single byte: most DB keys, and many
 * values, fit in it. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
/* Sentinel for "no valid length" (presumably returned by the RDB length
 * loader on error — confirm against rdbLoad). */
#define REDIS_RDB_LENERR UINT_MAX

/* When the first two bits of a string object's length byte are 11
 * (REDIS_RDB_ENCVAL), the remaining six bits select one of these special
 * encodings: */
#define REDIS_RDB_ENC_INT8 0  /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3   /* string compressed with LZF */
129
/* Client flags: bit mask kept in redisClient.flags */
#define REDIS_CLOSE 1     /* close this client connection ASAP */
#define REDIS_SLAVE 2     /* the client is a slave server */
#define REDIS_MASTER 4    /* the client is the master we replicate from */
#define REDIS_MONITOR 8   /* the client is a slave monitor, see MONITOR */

/* Replication state, slave side (server.replstate) */
#define REDIS_REPL_NONE 0        /* no active replication */
#define REDIS_REPL_CONNECT 1     /* we must connect to the master */
#define REDIS_REPL_CONNECTED 2   /* connected to the master */

/* Replication state of a slave as seen by the master (redisClient.replstate).
 * In the SEND_BULK and ONLINE states the slave receives new updates in its
 * output queue; in WAIT_BGSAVE_START the master is instead waiting for the
 * next background save to start before it can feed the slave. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* waiting for a BGSAVE to start */
#define REDIS_REPL_WAIT_BGSAVE_END 4   /* waiting for the BGSAVE to end, then send the DB */
#define REDIS_REPL_SEND_BULK 5         /* the DB dump is being transmitted */
#define REDIS_REPL_ONLINE 6            /* dump transmitted; receiving live updates */

/* List ends */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* SORT operations and ordering */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels, from most verbose to most severe (see redisLog()) */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Silences "unused variable / parameter" compiler warnings. */
#define REDIS_NOTUSED(V) ((void) V)
170
171 /*================================= Data types ============================== */
172
173 /* A redis object, that is a type able to hold a string / list / set */
174 typedef struct redisObject {
175 void *ptr;
176 int type;
177 int refcount;
178 } robj;
179
180 typedef struct redisDb {
181 dict *dict;
182 dict *expires;
183 int id;
184 } redisDb;
185
186 /* With multiplexing we need to take per-clinet state.
187 * Clients are taken in a liked list. */
188 typedef struct redisClient {
189 int fd;
190 redisDb *db;
191 int dictid;
192 sds querybuf;
193 robj **argv;
194 int argc;
195 int bulklen; /* bulk read len. -1 if not in bulk read mode */
196 list *reply;
197 int sentlen;
198 time_t lastinteraction; /* time of the last interaction, used for timeout */
199 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
200 int slaveseldb; /* slave selected db, if this client is a slave */
201 int authenticated; /* when requirepass is non-NULL */
202 int replstate; /* replication state if this is a slave */
203 int repldbfd; /* replication DB file descriptor */
204 long repldboff; /* replication DB file offset */
205 off_t repldbsize; /* replication DB file size */
206 } redisClient;
207
208 struct saveparam {
209 time_t seconds;
210 int changes;
211 };
212
213 /* Global server state structure */
214 struct redisServer {
215 int port;
216 int fd;
217 redisDb *db;
218 dict *sharingpool;
219 unsigned int sharingpoolsize;
220 long long dirty; /* changes to DB from the last save */
221 list *clients;
222 list *slaves, *monitors;
223 char neterr[ANET_ERR_LEN];
224 aeEventLoop *el;
225 int cronloops; /* number of times the cron function run */
226 list *objfreelist; /* A list of freed objects to avoid malloc() */
227 time_t lastsave; /* Unix time of last save succeeede */
228 size_t usedmemory; /* Used memory in megabytes */
229 /* Fields used only for stats */
230 time_t stat_starttime; /* server start time */
231 long long stat_numcommands; /* number of processed commands */
232 long long stat_numconnections; /* number of connections received */
233 /* Configuration */
234 int verbosity;
235 int glueoutputbuf;
236 int maxidletime;
237 int dbnum;
238 int daemonize;
239 char *pidfile;
240 int bgsaveinprogress;
241 struct saveparam *saveparams;
242 int saveparamslen;
243 char *logfile;
244 char *bindaddr;
245 char *dbfilename;
246 char *requirepass;
247 int shareobjects;
248 /* Replication related */
249 int isslave;
250 char *masterhost;
251 int masterport;
252 redisClient *master; /* client that is master for this slave */
253 int replstate;
254 unsigned int maxclients;
255 unsigned int maxmemory;
256 /* Sort parameters - qsort_r() is only available under BSD so we
257 * have to take this state global, in order to pass it to sortCompare() */
258 int sort_desc;
259 int sort_alpha;
260 int sort_bypattern;
261 };
262
263 typedef void redisCommandProc(redisClient *c);
264 struct redisCommand {
265 char *name;
266 redisCommandProc *proc;
267 int arity;
268 int flags;
269 };
270
271 typedef struct _redisSortObject {
272 robj *obj;
273 union {
274 double score;
275 robj *cmpobj;
276 } u;
277 } redisSortObject;
278
279 typedef struct _redisSortOperation {
280 int type;
281 robj *pattern;
282 } redisSortOperation;
283
284 struct sharedObjectsStruct {
285 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
286 *colon, *nullbulk, *nullmultibulk,
287 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
288 *outofrangeerr, *plus,
289 *select0, *select1, *select2, *select3, *select4,
290 *select5, *select6, *select7, *select8, *select9;
291 } shared;
292
293 /*================================ Prototypes =============================== */
294
295 static void freeStringObject(robj *o);
296 static void freeListObject(robj *o);
297 static void freeSetObject(robj *o);
298 static void decrRefCount(void *o);
299 static robj *createObject(int type, void *ptr);
300 static void freeClient(redisClient *c);
301 static int rdbLoad(char *filename);
302 static void addReply(redisClient *c, robj *obj);
303 static void addReplySds(redisClient *c, sds s);
304 static void incrRefCount(robj *o);
305 static int rdbSaveBackground(char *filename);
306 static robj *createStringObject(char *ptr, size_t len);
307 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
308 static int syncWithMaster(void);
309 static robj *tryObjectSharing(robj *o);
310 static int removeExpire(redisDb *db, robj *key);
311 static int expireIfNeeded(redisDb *db, robj *key);
312 static int deleteIfVolatile(redisDb *db, robj *key);
313 static int deleteKey(redisDb *db, robj *key);
314 static time_t getExpire(redisDb *db, robj *key);
315 static int setExpire(redisDb *db, robj *key, time_t when);
316 static void updateSalvesWaitingBgsave(int bgsaveerr);
317 static void freeMemoryIfNeeded(void);
318 static void onSigsegv(int sig);
319
320 static void authCommand(redisClient *c);
321 static void pingCommand(redisClient *c);
322 static void echoCommand(redisClient *c);
323 static void setCommand(redisClient *c);
324 static void setnxCommand(redisClient *c);
325 static void getCommand(redisClient *c);
326 static void delCommand(redisClient *c);
327 static void existsCommand(redisClient *c);
328 static void incrCommand(redisClient *c);
329 static void decrCommand(redisClient *c);
330 static void incrbyCommand(redisClient *c);
331 static void decrbyCommand(redisClient *c);
332 static void selectCommand(redisClient *c);
333 static void randomkeyCommand(redisClient *c);
334 static void keysCommand(redisClient *c);
335 static void dbsizeCommand(redisClient *c);
336 static void lastsaveCommand(redisClient *c);
337 static void saveCommand(redisClient *c);
338 static void bgsaveCommand(redisClient *c);
339 static void shutdownCommand(redisClient *c);
340 static void moveCommand(redisClient *c);
341 static void renameCommand(redisClient *c);
342 static void renamenxCommand(redisClient *c);
343 static void lpushCommand(redisClient *c);
344 static void rpushCommand(redisClient *c);
345 static void lpopCommand(redisClient *c);
346 static void rpopCommand(redisClient *c);
347 static void llenCommand(redisClient *c);
348 static void lindexCommand(redisClient *c);
349 static void lrangeCommand(redisClient *c);
350 static void ltrimCommand(redisClient *c);
351 static void typeCommand(redisClient *c);
352 static void lsetCommand(redisClient *c);
353 static void saddCommand(redisClient *c);
354 static void sremCommand(redisClient *c);
355 static void smoveCommand(redisClient *c);
356 static void sismemberCommand(redisClient *c);
357 static void scardCommand(redisClient *c);
358 static void sinterCommand(redisClient *c);
359 static void sinterstoreCommand(redisClient *c);
360 static void sunionCommand(redisClient *c);
361 static void sunionstoreCommand(redisClient *c);
362 static void sdiffCommand(redisClient *c);
363 static void sdiffstoreCommand(redisClient *c);
364 static void syncCommand(redisClient *c);
365 static void flushdbCommand(redisClient *c);
366 static void flushallCommand(redisClient *c);
367 static void sortCommand(redisClient *c);
368 static void lremCommand(redisClient *c);
369 static void infoCommand(redisClient *c);
370 static void mgetCommand(redisClient *c);
371 static void monitorCommand(redisClient *c);
372 static void expireCommand(redisClient *c);
373 static void getSetCommand(redisClient *c);
374 static void ttlCommand(redisClient *c);
375 static void slaveofCommand(redisClient *c);
376 static void debugCommand(redisClient *c);
377
378 /*================================= Globals ================================= */
379
380 /* Global vars */
381 static struct redisServer server; /* server global state */
382 static struct redisCommand cmdTable[] = {
383 {"get",getCommand,2,REDIS_CMD_INLINE},
384 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
385 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
386 {"del",delCommand,-2,REDIS_CMD_INLINE},
387 {"exists",existsCommand,2,REDIS_CMD_INLINE},
388 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
389 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
390 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
391 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
392 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
393 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
394 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
395 {"llen",llenCommand,2,REDIS_CMD_INLINE},
396 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
397 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
398 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
399 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
400 {"lrem",lremCommand,4,REDIS_CMD_BULK},
401 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
402 {"srem",sremCommand,3,REDIS_CMD_BULK},
403 {"smove",smoveCommand,4,REDIS_CMD_BULK},
404 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
405 {"scard",scardCommand,2,REDIS_CMD_INLINE},
406 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
407 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
408 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
409 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
410 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
411 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
412 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
413 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
414 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
415 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
416 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
417 {"select",selectCommand,2,REDIS_CMD_INLINE},
418 {"move",moveCommand,3,REDIS_CMD_INLINE},
419 {"rename",renameCommand,3,REDIS_CMD_INLINE},
420 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
421 {"expire",expireCommand,3,REDIS_CMD_INLINE},
422 {"keys",keysCommand,2,REDIS_CMD_INLINE},
423 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
424 {"auth",authCommand,2,REDIS_CMD_INLINE},
425 {"ping",pingCommand,1,REDIS_CMD_INLINE},
426 {"echo",echoCommand,2,REDIS_CMD_BULK},
427 {"save",saveCommand,1,REDIS_CMD_INLINE},
428 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
429 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
430 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
431 {"type",typeCommand,2,REDIS_CMD_INLINE},
432 {"sync",syncCommand,1,REDIS_CMD_INLINE},
433 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
434 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
435 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
436 {"info",infoCommand,1,REDIS_CMD_INLINE},
437 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
438 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
439 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
440 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
441 {NULL,NULL,0,0}
442 };
443
444 /*============================ Utility functions ============================ */
445
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Neither buffer has to be NUL
 * terminated: no byte past the given lengths is read.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte from the set; [^abc] negates the set; [a-z] is an
 *            inclusive byte range (the endpoints may be given in either
 *            order); \x inside a set matches x literally. An unterminated
 *            set uses the bytes up to the end of the pattern.
 *   \x       matches x literally
 *
 * When 'nocase' is non-zero, bytes are compared after tolower() (escaped
 * bytes inside a set are still compared exactly). Bytes are compared as
 * unsigned char values. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* A run of '*' is equivalent to a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches everything left */
            /* The rest of the pattern starts with a byte-consuming token,
             * so it can only match a non-empty suffix of the string. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                                   string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match = 0;

            if (stringLen == 0)
                return 0; /* a set always consumes one byte */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back onto the last pattern
                     * byte so the advance after the switch leaves
                     * patternLen at exactly 0. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else if (!nocase) {
                    if (pattern[0] == string[0])
                        match = 1;
                } else {
                    if (tolower((unsigned char)pattern[0]) ==
                        tolower((unsigned char)string[0]))
                        match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Trailing '*'s can still match the empty remainder. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
568
569 void redisLog(int level, const char *fmt, ...)
570 {
571 va_list ap;
572 FILE *fp;
573
574 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
575 if (!fp) return;
576
577 va_start(ap, fmt);
578 if (level >= server.verbosity) {
579 char *c = ".-*";
580 char buf[64];
581 time_t now;
582
583 now = time(NULL);
584 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
585 fprintf(fp,"%s %c ",buf,c[level]);
586 vfprintf(fp, fmt, ap);
587 fprintf(fp,"\n");
588 fflush(fp);
589 }
590 va_end(ap);
591
592 if (server.logfile) fclose(fp);
593 }
594
595 /*====================== Hash table type implementation ==================== */
596
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */
600
601 static int sdsDictKeyCompare(void *privdata, const void *key1,
602 const void *key2)
603 {
604 int l1,l2;
605 DICT_NOTUSED(privdata);
606
607 l1 = sdslen((sds)key1);
608 l2 = sdslen((sds)key2);
609 if (l1 != l2) return 0;
610 return memcmp(key1, key2, l1) == 0;
611 }
612
/* Dict key/value destructor for robj entries: releases one reference to
 * the object through decrRefCount(). 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    void *object = val;

    DICT_NOTUSED(privdata);
    decrRefCount(object);
}
619
620 static int dictSdsKeyCompare(void *privdata, const void *key1,
621 const void *key2)
622 {
623 const robj *o1 = key1, *o2 = key2;
624 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
625 }
626
627 static unsigned int dictSdsHash(const void *key) {
628 const robj *o = key;
629 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
630 }
631
632 static dictType setDictType = {
633 dictSdsHash, /* hash function */
634 NULL, /* key dup */
635 NULL, /* val dup */
636 dictSdsKeyCompare, /* key compare */
637 dictRedisObjectDestructor, /* key destructor */
638 NULL /* val destructor */
639 };
640
641 static dictType hashDictType = {
642 dictSdsHash, /* hash function */
643 NULL, /* key dup */
644 NULL, /* val dup */
645 dictSdsKeyCompare, /* key compare */
646 dictRedisObjectDestructor, /* key destructor */
647 dictRedisObjectDestructor /* val destructor */
648 };
649
650 /* ========================= Random utility functions ======================= */
651
/* Fatal out-of-memory handler.
 *
 * Redis does not try to recover from out-of-memory conditions when
 * allocating objects or strings: the networking layer itself relies on
 * heap-allocated send buffers, so the error could not reliably be reported
 * to the client anyway. We print "<msg>: Out of memory" to stderr, pause
 * for one second and abort(). This also keeps the rest of the code simpler. */
static void oom(const char *msg) {
    fputs(msg, stderr);
    fputs(": Out of memory\n", stderr);
    fflush(stderr);
    sleep(1);
    abort();
}
663
664 /* ====================== Redis server networking stuff ===================== */
665 void closeTimedoutClients(void) {
666 redisClient *c;
667 listNode *ln;
668 time_t now = time(NULL);
669
670 listRewind(server.clients);
671 while ((ln = listYield(server.clients)) != NULL) {
672 c = listNodeValue(ln);
673 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
674 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
675 (now - c->lastinteraction > server.maxidletime)) {
676 redisLog(REDIS_DEBUG,"Closing idle client");
677 freeClient(c);
678 }
679 }
680 }
681
682 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
683 * we resize the hash table to save memory */
684 void tryResizeHashTables(void) {
685 int j;
686
687 for (j = 0; j < server.dbnum; j++) {
688 long long size, used;
689
690 size = dictSlots(server.db[j].dict);
691 used = dictSize(server.db[j].dict);
692 if (size && used && size > REDIS_HT_MINSLOTS &&
693 (used*100/size < REDIS_HT_MINFILL)) {
694 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
695 dictResize(server.db[j].dict);
696 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
697 }
698 }
699 }
700
701 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
702 int j, loops = server.cronloops++;
703 REDIS_NOTUSED(eventLoop);
704 REDIS_NOTUSED(id);
705 REDIS_NOTUSED(clientData);
706
707 /* Update the global state with the amount of used memory */
708 server.usedmemory = zmalloc_used_memory();
709
710 /* Show some info about non-empty databases */
711 for (j = 0; j < server.dbnum; j++) {
712 long long size, used, vkeys;
713
714 size = dictSlots(server.db[j].dict);
715 used = dictSize(server.db[j].dict);
716 vkeys = dictSize(server.db[j].expires);
717 if (!(loops % 5) && used > 0) {
718 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
719 /* dictPrintStats(server.dict); */
720 }
721 }
722
723 /* We don't want to resize the hash tables while a bacground saving
724 * is in progress: the saving child is created using fork() that is
725 * implemented with a copy-on-write semantic in most modern systems, so
726 * if we resize the HT while there is the saving child at work actually
727 * a lot of memory movements in the parent will cause a lot of pages
728 * copied. */
729 if (!server.bgsaveinprogress) tryResizeHashTables();
730
731 /* Show information about connected clients */
732 if (!(loops % 5)) {
733 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
734 listLength(server.clients)-listLength(server.slaves),
735 listLength(server.slaves),
736 server.usedmemory,
737 dictSize(server.sharingpool));
738 }
739
740 /* Close connections of timedout clients */
741 if (server.maxidletime && !(loops % 10))
742 closeTimedoutClients();
743
744 /* Check if a background saving in progress terminated */
745 if (server.bgsaveinprogress) {
746 int statloc;
747 /* XXX: TODO handle the case of the saving child killed */
748 if (wait4(-1,&statloc,WNOHANG,NULL)) {
749 int exitcode = WEXITSTATUS(statloc);
750 if (exitcode == 0) {
751 redisLog(REDIS_NOTICE,
752 "Background saving terminated with success");
753 server.dirty = 0;
754 server.lastsave = time(NULL);
755 } else {
756 redisLog(REDIS_WARNING,
757 "Background saving error");
758 }
759 server.bgsaveinprogress = 0;
760 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
761 }
762 } else {
763 /* If there is not a background saving in progress check if
764 * we have to save now */
765 time_t now = time(NULL);
766 for (j = 0; j < server.saveparamslen; j++) {
767 struct saveparam *sp = server.saveparams+j;
768
769 if (server.dirty >= sp->changes &&
770 now-server.lastsave > sp->seconds) {
771 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
772 sp->changes, sp->seconds);
773 rdbSaveBackground(server.dbfilename);
774 break;
775 }
776 }
777 }
778
779 /* Try to expire a few timed out keys */
780 for (j = 0; j < server.dbnum; j++) {
781 redisDb *db = server.db+j;
782 int num = dictSize(db->expires);
783
784 if (num) {
785 time_t now = time(NULL);
786
787 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
788 num = REDIS_EXPIRELOOKUPS_PER_CRON;
789 while (num--) {
790 dictEntry *de;
791 time_t t;
792
793 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
794 t = (time_t) dictGetEntryVal(de);
795 if (now > t) {
796 deleteKey(db,dictGetEntryKey(de));
797 }
798 }
799 }
800 }
801
802 /* Check if we should connect to a MASTER */
803 if (server.replstate == REDIS_REPL_CONNECT) {
804 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
805 if (syncWithMaster() == REDIS_OK) {
806 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
807 }
808 }
809 return 1000;
810 }
811
812 static void createSharedObjects(void) {
813 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
814 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
815 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
816 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
817 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
818 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
819 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
820 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
821 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
822 /* no such key */
823 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
824 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
825 "-ERR Operation against a key holding the wrong kind of value\r\n"));
826 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
827 "-ERR no such key\r\n"));
828 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
829 "-ERR syntax error\r\n"));
830 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
831 "-ERR source and destination objects are the same\r\n"));
832 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
833 "-ERR index out of range\r\n"));
834 shared.space = createObject(REDIS_STRING,sdsnew(" "));
835 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
836 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
837 shared.select0 = createStringObject("select 0\r\n",10);
838 shared.select1 = createStringObject("select 1\r\n",10);
839 shared.select2 = createStringObject("select 2\r\n",10);
840 shared.select3 = createStringObject("select 3\r\n",10);
841 shared.select4 = createStringObject("select 4\r\n",10);
842 shared.select5 = createStringObject("select 5\r\n",10);
843 shared.select6 = createStringObject("select 6\r\n",10);
844 shared.select7 = createStringObject("select 7\r\n",10);
845 shared.select8 = createStringObject("select 8\r\n",10);
846 shared.select9 = createStringObject("select 9\r\n",10);
847 }
848
849 static void appendServerSaveParams(time_t seconds, int changes) {
850 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
851 if (server.saveparams == NULL) oom("appendServerSaveParams");
852 server.saveparams[server.saveparamslen].seconds = seconds;
853 server.saveparams[server.saveparamslen].changes = changes;
854 server.saveparamslen++;
855 }
856
857 static void ResetServerSaveParams() {
858 zfree(server.saveparams);
859 server.saveparams = NULL;
860 server.saveparamslen = 0;
861 }
862
863 static void initServerConfig() {
864 server.dbnum = REDIS_DEFAULT_DBNUM;
865 server.port = REDIS_SERVERPORT;
866 server.verbosity = REDIS_DEBUG;
867 server.maxidletime = REDIS_MAXIDLETIME;
868 server.saveparams = NULL;
869 server.logfile = NULL; /* NULL = log on standard output */
870 server.bindaddr = NULL;
871 server.glueoutputbuf = 1;
872 server.daemonize = 0;
873 server.pidfile = "/var/run/redis.pid";
874 server.dbfilename = "dump.rdb";
875 server.requirepass = NULL;
876 server.shareobjects = 0;
877 server.maxclients = 0;
878 server.maxmemory = 0;
879 ResetServerSaveParams();
880
881 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
882 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
883 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
884 /* Replication related */
885 server.isslave = 0;
886 server.masterhost = NULL;
887 server.masterport = 6379;
888 server.master = NULL;
889 server.replstate = REDIS_REPL_NONE;
890 }
891
892 static void initServer() {
893 int j;
894
895 signal(SIGHUP, SIG_IGN);
896 signal(SIGPIPE, SIG_IGN);
897 signal(SIGSEGV, onSigsegv);
898 signal(SIGBUS, onSigsegv);
899
900 server.clients = listCreate();
901 server.slaves = listCreate();
902 server.monitors = listCreate();
903 server.objfreelist = listCreate();
904 createSharedObjects();
905 server.el = aeCreateEventLoop();
906 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
907 server.sharingpool = dictCreate(&setDictType,NULL);
908 server.sharingpoolsize = 1024;
909 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
910 oom("server initialization"); /* Fatal OOM */
911 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
912 if (server.fd == -1) {
913 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
914 exit(1);
915 }
916 for (j = 0; j < server.dbnum; j++) {
917 server.db[j].dict = dictCreate(&hashDictType,NULL);
918 server.db[j].expires = dictCreate(&setDictType,NULL);
919 server.db[j].id = j;
920 }
921 server.cronloops = 0;
922 server.bgsaveinprogress = 0;
923 server.lastsave = time(NULL);
924 server.dirty = 0;
925 server.usedmemory = 0;
926 server.stat_numcommands = 0;
927 server.stat_numconnections = 0;
928 server.stat_starttime = time(NULL);
929 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
930 }
931
932 /* Empty the whole database */
933 static long long emptyDb() {
934 int j;
935 long long removed = 0;
936
937 for (j = 0; j < server.dbnum; j++) {
938 removed += dictSize(server.db[j].dict);
939 dictEmpty(server.db[j].dict);
940 dictEmpty(server.db[j].expires);
941 }
942 return removed;
943 }
944
/* Parse a configuration boolean, case-insensitively.
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
950
951 /* I agree, this is a very rudimental way to load a configuration...
952 will improve later if the config gets more complex */
953 static void loadServerConfig(char *filename) {
954 FILE *fp = fopen(filename,"r");
955 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
956 int linenum = 0;
957 sds line = NULL;
958
959 if (!fp) {
960 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
961 exit(1);
962 }
963 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
964 sds *argv;
965 int argc, j;
966
967 linenum++;
968 line = sdsnew(buf);
969 line = sdstrim(line," \t\r\n");
970
971 /* Skip comments and blank lines*/
972 if (line[0] == '#' || line[0] == '\0') {
973 sdsfree(line);
974 continue;
975 }
976
977 /* Split into arguments */
978 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
979 sdstolower(argv[0]);
980
981 /* Execute config directives */
982 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
983 server.maxidletime = atoi(argv[1]);
984 if (server.maxidletime < 0) {
985 err = "Invalid timeout value"; goto loaderr;
986 }
987 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
988 server.port = atoi(argv[1]);
989 if (server.port < 1 || server.port > 65535) {
990 err = "Invalid port"; goto loaderr;
991 }
992 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
993 server.bindaddr = zstrdup(argv[1]);
994 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
995 int seconds = atoi(argv[1]);
996 int changes = atoi(argv[2]);
997 if (seconds < 1 || changes < 0) {
998 err = "Invalid save parameters"; goto loaderr;
999 }
1000 appendServerSaveParams(seconds,changes);
1001 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1002 if (chdir(argv[1]) == -1) {
1003 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1004 argv[1], strerror(errno));
1005 exit(1);
1006 }
1007 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1008 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1009 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1010 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1011 else {
1012 err = "Invalid log level. Must be one of debug, notice, warning";
1013 goto loaderr;
1014 }
1015 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1016 FILE *fp;
1017
1018 server.logfile = zstrdup(argv[1]);
1019 if (!strcasecmp(server.logfile,"stdout")) {
1020 zfree(server.logfile);
1021 server.logfile = NULL;
1022 }
1023 if (server.logfile) {
1024 /* Test if we are able to open the file. The server will not
1025 * be able to abort just for this problem later... */
1026 fp = fopen(server.logfile,"a");
1027 if (fp == NULL) {
1028 err = sdscatprintf(sdsempty(),
1029 "Can't open the log file: %s", strerror(errno));
1030 goto loaderr;
1031 }
1032 fclose(fp);
1033 }
1034 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1035 server.dbnum = atoi(argv[1]);
1036 if (server.dbnum < 1) {
1037 err = "Invalid number of databases"; goto loaderr;
1038 }
1039 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1040 server.maxclients = atoi(argv[1]);
1041 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1042 server.maxmemory = atoi(argv[1]);
1043 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1044 server.masterhost = sdsnew(argv[1]);
1045 server.masterport = atoi(argv[2]);
1046 server.replstate = REDIS_REPL_CONNECT;
1047 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1048 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1049 err = "argument must be 'yes' or 'no'"; goto loaderr;
1050 }
1051 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1052 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1053 err = "argument must be 'yes' or 'no'"; goto loaderr;
1054 }
1055 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1056 server.sharingpoolsize = atoi(argv[1]);
1057 if (server.sharingpoolsize < 1) {
1058 err = "invalid object sharing pool size"; goto loaderr;
1059 }
1060 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1061 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1062 err = "argument must be 'yes' or 'no'"; goto loaderr;
1063 }
1064 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1065 server.requirepass = zstrdup(argv[1]);
1066 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1067 server.pidfile = zstrdup(argv[1]);
1068 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1069 server.dbfilename = zstrdup(argv[1]);
1070 } else {
1071 err = "Bad directive or wrong number of arguments"; goto loaderr;
1072 }
1073 for (j = 0; j < argc; j++)
1074 sdsfree(argv[j]);
1075 zfree(argv);
1076 sdsfree(line);
1077 }
1078 fclose(fp);
1079 return;
1080
1081 loaderr:
1082 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1083 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1084 fprintf(stderr, ">>> '%s'\n", line);
1085 fprintf(stderr, "%s\n", err);
1086 exit(1);
1087 }
1088
1089 static void freeClientArgv(redisClient *c) {
1090 int j;
1091
1092 for (j = 0; j < c->argc; j++)
1093 decrRefCount(c->argv[j]);
1094 c->argc = 0;
1095 }
1096
1097 static void freeClient(redisClient *c) {
1098 listNode *ln;
1099
1100 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1101 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1102 sdsfree(c->querybuf);
1103 listRelease(c->reply);
1104 freeClientArgv(c);
1105 close(c->fd);
1106 ln = listSearchKey(server.clients,c);
1107 assert(ln != NULL);
1108 listDelNode(server.clients,ln);
1109 if (c->flags & REDIS_SLAVE) {
1110 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1111 close(c->repldbfd);
1112 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1113 ln = listSearchKey(l,c);
1114 assert(ln != NULL);
1115 listDelNode(l,ln);
1116 }
1117 if (c->flags & REDIS_MASTER) {
1118 server.master = NULL;
1119 server.replstate = REDIS_REPL_CONNECT;
1120 }
1121 zfree(c->argv);
1122 zfree(c);
1123 }
1124
1125 static void glueReplyBuffersIfNeeded(redisClient *c) {
1126 int totlen = 0;
1127 listNode *ln;
1128 robj *o;
1129
1130 listRewind(c->reply);
1131 while((ln = listYield(c->reply))) {
1132 o = ln->value;
1133 totlen += sdslen(o->ptr);
1134 /* This optimization makes more sense if we don't have to copy
1135 * too much data */
1136 if (totlen > 1024) return;
1137 }
1138 if (totlen > 0) {
1139 char buf[1024];
1140 int copylen = 0;
1141
1142 listRewind(c->reply);
1143 while((ln = listYield(c->reply))) {
1144 o = ln->value;
1145 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1146 copylen += sdslen(o->ptr);
1147 listDelNode(c->reply,ln);
1148 }
1149 /* Now the output buffer is empty, add the new single element */
1150 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1151 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1152 }
1153 }
1154
1155 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1156 redisClient *c = privdata;
1157 int nwritten = 0, totwritten = 0, objlen;
1158 robj *o;
1159 REDIS_NOTUSED(el);
1160 REDIS_NOTUSED(mask);
1161
1162 if (server.glueoutputbuf && listLength(c->reply) > 1)
1163 glueReplyBuffersIfNeeded(c);
1164 while(listLength(c->reply)) {
1165 o = listNodeValue(listFirst(c->reply));
1166 objlen = sdslen(o->ptr);
1167
1168 if (objlen == 0) {
1169 listDelNode(c->reply,listFirst(c->reply));
1170 continue;
1171 }
1172
1173 if (c->flags & REDIS_MASTER) {
1174 nwritten = objlen - c->sentlen;
1175 } else {
1176 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1177 if (nwritten <= 0) break;
1178 }
1179 c->sentlen += nwritten;
1180 totwritten += nwritten;
1181 /* If we fully sent the object on head go to the next one */
1182 if (c->sentlen == objlen) {
1183 listDelNode(c->reply,listFirst(c->reply));
1184 c->sentlen = 0;
1185 }
1186 }
1187 if (nwritten == -1) {
1188 if (errno == EAGAIN) {
1189 nwritten = 0;
1190 } else {
1191 redisLog(REDIS_DEBUG,
1192 "Error writing to client: %s", strerror(errno));
1193 freeClient(c);
1194 return;
1195 }
1196 }
1197 if (totwritten > 0) c->lastinteraction = time(NULL);
1198 if (listLength(c->reply) == 0) {
1199 c->sentlen = 0;
1200 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1201 }
1202 }
1203
1204 static struct redisCommand *lookupCommand(char *name) {
1205 int j = 0;
1206 while(cmdTable[j].name != NULL) {
1207 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1208 j++;
1209 }
1210 return NULL;
1211 }
1212
1213 /* resetClient prepare the client to process the next command */
1214 static void resetClient(redisClient *c) {
1215 freeClientArgv(c);
1216 c->bulklen = -1;
1217 }
1218
1219 /* If this function gets called we already read a whole
1220 * command, argments are in the client argv/argc fields.
1221 * processCommand() execute the command or prepare the
1222 * server for a bulk read from the client.
1223 *
1224 * If 1 is returned the client is still alive and valid and
1225 * and other operations can be performed by the caller. Otherwise
1226 * if 0 is returned the client was destroied (i.e. after QUIT). */
1227 static int processCommand(redisClient *c) {
1228 struct redisCommand *cmd;
1229 long long dirty;
1230
1231 /* Free some memory if needed (maxmemory setting) */
1232 if (server.maxmemory) freeMemoryIfNeeded();
1233
1234 /* The QUIT command is handled as a special case. Normal command
1235 * procs are unable to close the client connection safely */
1236 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1237 freeClient(c);
1238 return 0;
1239 }
1240 cmd = lookupCommand(c->argv[0]->ptr);
1241 if (!cmd) {
1242 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1243 resetClient(c);
1244 return 1;
1245 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1246 (c->argc < -cmd->arity)) {
1247 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1248 resetClient(c);
1249 return 1;
1250 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1251 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1252 resetClient(c);
1253 return 1;
1254 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1255 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1256
1257 decrRefCount(c->argv[c->argc-1]);
1258 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1259 c->argc--;
1260 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1261 resetClient(c);
1262 return 1;
1263 }
1264 c->argc--;
1265 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1266 /* It is possible that the bulk read is already in the
1267 * buffer. Check this condition and handle it accordingly */
1268 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1269 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1270 c->argc++;
1271 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1272 } else {
1273 return 1;
1274 }
1275 }
1276 /* Let's try to share objects on the command arguments vector */
1277 if (server.shareobjects) {
1278 int j;
1279 for(j = 1; j < c->argc; j++)
1280 c->argv[j] = tryObjectSharing(c->argv[j]);
1281 }
1282 /* Check if the user is authenticated */
1283 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1284 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1285 resetClient(c);
1286 return 1;
1287 }
1288
1289 /* Exec the command */
1290 dirty = server.dirty;
1291 cmd->proc(c);
1292 if (server.dirty-dirty != 0 && listLength(server.slaves))
1293 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1294 if (listLength(server.monitors))
1295 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1296 server.stat_numcommands++;
1297
1298 /* Prepare the client for the next command */
1299 if (c->flags & REDIS_CLOSE) {
1300 freeClient(c);
1301 return 0;
1302 }
1303 resetClient(c);
1304 return 1;
1305 }
1306
1307 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1308 listNode *ln;
1309 int outc = 0, j;
1310 robj **outv;
1311 /* (args*2)+1 is enough room for args, spaces, newlines */
1312 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1313
1314 if (argc <= REDIS_STATIC_ARGS) {
1315 outv = static_outv;
1316 } else {
1317 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1318 if (!outv) oom("replicationFeedSlaves");
1319 }
1320
1321 for (j = 0; j < argc; j++) {
1322 if (j != 0) outv[outc++] = shared.space;
1323 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1324 robj *lenobj;
1325
1326 lenobj = createObject(REDIS_STRING,
1327 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1328 lenobj->refcount = 0;
1329 outv[outc++] = lenobj;
1330 }
1331 outv[outc++] = argv[j];
1332 }
1333 outv[outc++] = shared.crlf;
1334
1335 /* Increment all the refcounts at start and decrement at end in order to
1336 * be sure to free objects if there is no slave in a replication state
1337 * able to be feed with commands */
1338 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1339 listRewind(slaves);
1340 while((ln = listYield(slaves))) {
1341 redisClient *slave = ln->value;
1342
1343 /* Don't feed slaves that are still waiting for BGSAVE to start */
1344 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1345
1346 /* Feed all the other slaves, MONITORs and so on */
1347 if (slave->slaveseldb != dictid) {
1348 robj *selectcmd;
1349
1350 switch(dictid) {
1351 case 0: selectcmd = shared.select0; break;
1352 case 1: selectcmd = shared.select1; break;
1353 case 2: selectcmd = shared.select2; break;
1354 case 3: selectcmd = shared.select3; break;
1355 case 4: selectcmd = shared.select4; break;
1356 case 5: selectcmd = shared.select5; break;
1357 case 6: selectcmd = shared.select6; break;
1358 case 7: selectcmd = shared.select7; break;
1359 case 8: selectcmd = shared.select8; break;
1360 case 9: selectcmd = shared.select9; break;
1361 default:
1362 selectcmd = createObject(REDIS_STRING,
1363 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1364 selectcmd->refcount = 0;
1365 break;
1366 }
1367 addReply(slave,selectcmd);
1368 slave->slaveseldb = dictid;
1369 }
1370 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1371 }
1372 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1373 if (outv != static_outv) zfree(outv);
1374 }
1375
1376 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1377 redisClient *c = (redisClient*) privdata;
1378 char buf[REDIS_IOBUF_LEN];
1379 int nread;
1380 REDIS_NOTUSED(el);
1381 REDIS_NOTUSED(mask);
1382
1383 nread = read(fd, buf, REDIS_IOBUF_LEN);
1384 if (nread == -1) {
1385 if (errno == EAGAIN) {
1386 nread = 0;
1387 } else {
1388 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1389 freeClient(c);
1390 return;
1391 }
1392 } else if (nread == 0) {
1393 redisLog(REDIS_DEBUG, "Client closed connection");
1394 freeClient(c);
1395 return;
1396 }
1397 if (nread) {
1398 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1399 c->lastinteraction = time(NULL);
1400 } else {
1401 return;
1402 }
1403
1404 again:
1405 if (c->bulklen == -1) {
1406 /* Read the first line of the query */
1407 char *p = strchr(c->querybuf,'\n');
1408 size_t querylen;
1409 if (p) {
1410 sds query, *argv;
1411 int argc, j;
1412
1413 query = c->querybuf;
1414 c->querybuf = sdsempty();
1415 querylen = 1+(p-(query));
1416 if (sdslen(query) > querylen) {
1417 /* leave data after the first line of the query in the buffer */
1418 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1419 }
1420 *p = '\0'; /* remove "\n" */
1421 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1422 sdsupdatelen(query);
1423
1424 /* Now we can split the query in arguments */
1425 if (sdslen(query) == 0) {
1426 /* Ignore empty query */
1427 sdsfree(query);
1428 return;
1429 }
1430 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1431 if (argv == NULL) oom("sdssplitlen");
1432 sdsfree(query);
1433
1434 if (c->argv) zfree(c->argv);
1435 c->argv = zmalloc(sizeof(robj*)*argc);
1436 if (c->argv == NULL) oom("allocating arguments list for client");
1437
1438 for (j = 0; j < argc; j++) {
1439 if (sdslen(argv[j])) {
1440 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1441 c->argc++;
1442 } else {
1443 sdsfree(argv[j]);
1444 }
1445 }
1446 zfree(argv);
1447 /* Execute the command. If the client is still valid
1448 * after processCommand() return and there is something
1449 * on the query buffer try to process the next command. */
1450 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1451 return;
1452 } else if (sdslen(c->querybuf) >= 1024*32) {
1453 redisLog(REDIS_DEBUG, "Client protocol error");
1454 freeClient(c);
1455 return;
1456 }
1457 } else {
1458 /* Bulk read handling. Note that if we are at this point
1459 the client already sent a command terminated with a newline,
1460 we are reading the bulk data that is actually the last
1461 argument of the command. */
1462 int qbl = sdslen(c->querybuf);
1463
1464 if (c->bulklen <= qbl) {
1465 /* Copy everything but the final CRLF as final argument */
1466 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1467 c->argc++;
1468 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1469 processCommand(c);
1470 return;
1471 }
1472 }
1473 }
1474
1475 static int selectDb(redisClient *c, int id) {
1476 if (id < 0 || id >= server.dbnum)
1477 return REDIS_ERR;
1478 c->db = &server.db[id];
1479 return REDIS_OK;
1480 }
1481
1482 static void *dupClientReplyValue(void *o) {
1483 incrRefCount((robj*)o);
1484 return 0;
1485 }
1486
1487 static redisClient *createClient(int fd) {
1488 redisClient *c = zmalloc(sizeof(*c));
1489
1490 anetNonBlock(NULL,fd);
1491 anetTcpNoDelay(NULL,fd);
1492 if (!c) return NULL;
1493 selectDb(c,0);
1494 c->fd = fd;
1495 c->querybuf = sdsempty();
1496 c->argc = 0;
1497 c->argv = NULL;
1498 c->bulklen = -1;
1499 c->sentlen = 0;
1500 c->flags = 0;
1501 c->lastinteraction = time(NULL);
1502 c->authenticated = 0;
1503 c->replstate = REDIS_REPL_NONE;
1504 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1505 listSetFreeMethod(c->reply,decrRefCount);
1506 listSetDupMethod(c->reply,dupClientReplyValue);
1507 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1508 readQueryFromClient, c, NULL) == AE_ERR) {
1509 freeClient(c);
1510 return NULL;
1511 }
1512 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1513 return c;
1514 }
1515
1516 static void addReply(redisClient *c, robj *obj) {
1517 if (listLength(c->reply) == 0 &&
1518 (c->replstate == REDIS_REPL_NONE ||
1519 c->replstate == REDIS_REPL_ONLINE) &&
1520 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1521 sendReplyToClient, c, NULL) == AE_ERR) return;
1522 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1523 incrRefCount(obj);
1524 }
1525
1526 static void addReplySds(redisClient *c, sds s) {
1527 robj *o = createObject(REDIS_STRING,s);
1528 addReply(c,o);
1529 decrRefCount(o);
1530 }
1531
1532 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1533 int cport, cfd;
1534 char cip[128];
1535 redisClient *c;
1536 REDIS_NOTUSED(el);
1537 REDIS_NOTUSED(mask);
1538 REDIS_NOTUSED(privdata);
1539
1540 cfd = anetAccept(server.neterr, fd, cip, &cport);
1541 if (cfd == AE_ERR) {
1542 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1543 return;
1544 }
1545 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1546 if ((c = createClient(cfd)) == NULL) {
1547 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1548 close(cfd); /* May be already closed, just ingore errors */
1549 return;
1550 }
1551 /* If maxclient directive is set and this is one client more... close the
1552 * connection. Note that we create the client instead to check before
1553 * for this condition, since now the socket is already set in nonblocking
1554 * mode and we can send an error for free using the Kernel I/O */
1555 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1556 char *err = "-ERR max number of clients reached\r\n";
1557
1558 /* That's a best effort error message, don't check write errors */
1559 (void) write(c->fd,err,strlen(err));
1560 freeClient(c);
1561 return;
1562 }
1563 server.stat_numconnections++;
1564 }
1565
1566 /* ======================= Redis objects implementation ===================== */
1567
1568 static robj *createObject(int type, void *ptr) {
1569 robj *o;
1570
1571 if (listLength(server.objfreelist)) {
1572 listNode *head = listFirst(server.objfreelist);
1573 o = listNodeValue(head);
1574 listDelNode(server.objfreelist,head);
1575 } else {
1576 o = zmalloc(sizeof(*o));
1577 }
1578 if (!o) oom("createObject");
1579 o->type = type;
1580 o->ptr = ptr;
1581 o->refcount = 1;
1582 return o;
1583 }
1584
1585 static robj *createStringObject(char *ptr, size_t len) {
1586 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1587 }
1588
1589 static robj *createListObject(void) {
1590 list *l = listCreate();
1591
1592 if (!l) oom("listCreate");
1593 listSetFreeMethod(l,decrRefCount);
1594 return createObject(REDIS_LIST,l);
1595 }
1596
1597 static robj *createSetObject(void) {
1598 dict *d = dictCreate(&setDictType,NULL);
1599 if (!d) oom("dictCreate");
1600 return createObject(REDIS_SET,d);
1601 }
1602
1603 static void freeStringObject(robj *o) {
1604 sdsfree(o->ptr);
1605 }
1606
1607 static void freeListObject(robj *o) {
1608 listRelease((list*) o->ptr);
1609 }
1610
1611 static void freeSetObject(robj *o) {
1612 dictRelease((dict*) o->ptr);
1613 }
1614
1615 static void freeHashObject(robj *o) {
1616 dictRelease((dict*) o->ptr);
1617 }
1618
1619 static void incrRefCount(robj *o) {
1620 o->refcount++;
1621 #ifdef DEBUG_REFCOUNT
1622 if (o->type == REDIS_STRING)
1623 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1624 #endif
1625 }
1626
1627 static void decrRefCount(void *obj) {
1628 robj *o = obj;
1629
1630 #ifdef DEBUG_REFCOUNT
1631 if (o->type == REDIS_STRING)
1632 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1633 #endif
1634 if (--(o->refcount) == 0) {
1635 switch(o->type) {
1636 case REDIS_STRING: freeStringObject(o); break;
1637 case REDIS_LIST: freeListObject(o); break;
1638 case REDIS_SET: freeSetObject(o); break;
1639 case REDIS_HASH: freeHashObject(o); break;
1640 default: assert(0 != 0); break;
1641 }
1642 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1643 !listAddNodeHead(server.objfreelist,o))
1644 zfree(o);
1645 }
1646 }
1647
1648 /* Try to share an object against the shared objects pool */
1649 static robj *tryObjectSharing(robj *o) {
1650 struct dictEntry *de;
1651 unsigned long c;
1652
1653 if (o == NULL || server.shareobjects == 0) return o;
1654
1655 assert(o->type == REDIS_STRING);
1656 de = dictFind(server.sharingpool,o);
1657 if (de) {
1658 robj *shared = dictGetEntryKey(de);
1659
1660 c = ((unsigned long) dictGetEntryVal(de))+1;
1661 dictGetEntryVal(de) = (void*) c;
1662 incrRefCount(shared);
1663 decrRefCount(o);
1664 return shared;
1665 } else {
1666 /* Here we are using a stream algorihtm: Every time an object is
1667 * shared we increment its count, everytime there is a miss we
1668 * recrement the counter of a random object. If this object reaches
1669 * zero we remove the object and put the current object instead. */
1670 if (dictSize(server.sharingpool) >=
1671 server.sharingpoolsize) {
1672 de = dictGetRandomKey(server.sharingpool);
1673 assert(de != NULL);
1674 c = ((unsigned long) dictGetEntryVal(de))-1;
1675 dictGetEntryVal(de) = (void*) c;
1676 if (c == 0) {
1677 dictDelete(server.sharingpool,de->key);
1678 }
1679 } else {
1680 c = 0; /* If the pool is empty we want to add this object */
1681 }
1682 if (c == 0) {
1683 int retval;
1684
1685 retval = dictAdd(server.sharingpool,o,(void*)1);
1686 assert(retval == DICT_OK);
1687 incrRefCount(o);
1688 }
1689 return o;
1690 }
1691 }
1692
1693 static robj *lookupKey(redisDb *db, robj *key) {
1694 dictEntry *de = dictFind(db->dict,key);
1695 return de ? dictGetEntryVal(de) : NULL;
1696 }
1697
1698 static robj *lookupKeyRead(redisDb *db, robj *key) {
1699 expireIfNeeded(db,key);
1700 return lookupKey(db,key);
1701 }
1702
1703 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1704 deleteIfVolatile(db,key);
1705 return lookupKey(db,key);
1706 }
1707
1708 static int deleteKey(redisDb *db, robj *key) {
1709 int retval;
1710
1711 /* We need to protect key from destruction: after the first dictDelete()
1712 * it may happen that 'key' is no longer valid if we don't increment
1713 * it's count. This may happen when we get the object reference directly
1714 * from the hash table with dictRandomKey() or dict iterators */
1715 incrRefCount(key);
1716 if (dictSize(db->expires)) dictDelete(db->expires,key);
1717 retval = dictDelete(db->dict,key);
1718 decrRefCount(key);
1719
1720 return retval == DICT_OK;
1721 }
1722
1723 /*============================ DB saving/loading ============================ */
1724
/* Write the single type/opcode byte 'type' to 'fp'.
 * Returns 0 on success, -1 if the write failed. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
1729
/* Write 't', truncated to a 32-bit signed integer, as 4 raw bytes in host
 * byte order. Returns 0 on success, -1 if the write failed. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return (fwrite(&t32,4,1,fp) == 0) ? -1 : 0;
}
1735
1736 /* check rdbLoadLen() comments for more info */
1737 static int rdbSaveLen(FILE *fp, uint32_t len) {
1738 unsigned char buf[2];
1739
1740 if (len < (1<<6)) {
1741 /* Save a 6 bit len */
1742 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1743 if (fwrite(buf,1,1,fp) == 0) return -1;
1744 } else if (len < (1<<14)) {
1745 /* Save a 14 bit len */
1746 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1747 buf[1] = len&0xFF;
1748 if (fwrite(buf,2,1,fp) == 0) return -1;
1749 } else {
1750 /* Save a 32 bit len */
1751 buf[0] = (REDIS_RDB_32BITLEN<<6);
1752 if (fwrite(buf,1,1,fp) == 0) return -1;
1753 len = htonl(len);
1754 if (fwrite(&len,4,1,fp) == 0) return -1;
1755 }
1756 return 0;
1757 }
1758
1759 /* String objects in the form "2391" "-100" without any space and with a
1760 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1761 * encoded as integers to save space */
1762 int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1763 long long value;
1764 char *endptr, buf[32];
1765
1766 /* Check if it's possible to encode this value as a number */
1767 value = strtoll(s, &endptr, 10);
1768 if (endptr[0] != '\0') return 0;
1769 snprintf(buf,32,"%lld",value);
1770
1771 /* If the number converted back into a string is not identical
1772 * then it's not possible to encode the string as integer */
1773 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1774
1775 /* Finally check if it fits in our ranges */
1776 if (value >= -(1<<7) && value <= (1<<7)-1) {
1777 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1778 enc[1] = value&0xFF;
1779 return 2;
1780 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1781 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1782 enc[1] = value&0xFF;
1783 enc[2] = (value>>8)&0xFF;
1784 return 3;
1785 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1786 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1787 enc[1] = value&0xFF;
1788 enc[2] = (value>>8)&0xFF;
1789 enc[3] = (value>>16)&0xFF;
1790 enc[4] = (value>>24)&0xFF;
1791 return 5;
1792 } else {
1793 return 0;
1794 }
1795 }
1796
1797 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1798 unsigned int comprlen, outlen;
1799 unsigned char byte;
1800 void *out;
1801
1802 /* We require at least four bytes compression for this to be worth it */
1803 outlen = sdslen(obj->ptr)-4;
1804 if (outlen <= 0) return 0;
1805 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1806 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1807 if (comprlen == 0) {
1808 zfree(out);
1809 return 0;
1810 }
1811 /* Data compressed! Let's save it on disk */
1812 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1813 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1814 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1815 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1816 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1817 zfree(out);
1818 return comprlen;
1819
1820 writeerr:
1821 zfree(out);
1822 return -1;
1823 }
1824
1825 /* Save a string objet as [len][data] on disk. If the object is a string
1826 * representation of an integer value we try to safe it in a special form */
1827 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1828 size_t len = sdslen(obj->ptr);
1829 int enclen;
1830
1831 /* Try integer encoding */
1832 if (len <= 11) {
1833 unsigned char buf[5];
1834 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1835 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1836 return 0;
1837 }
1838 }
1839
1840 /* Try LZF compression - under 20 bytes it's unable to compress even
1841 * aaaaaaaaaaaaaaaaaa so skip it */
1842 if (1 && len > 20) {
1843 int retval;
1844
1845 retval = rdbSaveLzfStringObject(fp,obj);
1846 if (retval == -1) return -1;
1847 if (retval > 0) return 0;
1848 /* retval == 0 means data can't be compressed, save the old way */
1849 }
1850
1851 /* Store verbatim */
1852 if (rdbSaveLen(fp,len) == -1) return -1;
1853 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1854 return 0;
1855 }
1856
1857 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1858 static int rdbSave(char *filename) {
1859 dictIterator *di = NULL;
1860 dictEntry *de;
1861 FILE *fp;
1862 char tmpfile[256];
1863 int j;
1864 time_t now = time(NULL);
1865
1866 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1867 fp = fopen(tmpfile,"w");
1868 if (!fp) {
1869 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1870 return REDIS_ERR;
1871 }
1872 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1873 for (j = 0; j < server.dbnum; j++) {
1874 redisDb *db = server.db+j;
1875 dict *d = db->dict;
1876 if (dictSize(d) == 0) continue;
1877 di = dictGetIterator(d);
1878 if (!di) {
1879 fclose(fp);
1880 return REDIS_ERR;
1881 }
1882
1883 /* Write the SELECT DB opcode */
1884 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1885 if (rdbSaveLen(fp,j) == -1) goto werr;
1886
1887 /* Iterate this DB writing every entry */
1888 while((de = dictNext(di)) != NULL) {
1889 robj *key = dictGetEntryKey(de);
1890 robj *o = dictGetEntryVal(de);
1891 time_t expiretime = getExpire(db,key);
1892
1893 /* Save the expire time */
1894 if (expiretime != -1) {
1895 /* If this key is already expired skip it */
1896 if (expiretime < now) continue;
1897 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1898 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1899 }
1900 /* Save the key and associated value */
1901 if (rdbSaveType(fp,o->type) == -1) goto werr;
1902 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1903 if (o->type == REDIS_STRING) {
1904 /* Save a string value */
1905 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1906 } else if (o->type == REDIS_LIST) {
1907 /* Save a list value */
1908 list *list = o->ptr;
1909 listNode *ln;
1910
1911 listRewind(list);
1912 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1913 while((ln = listYield(list))) {
1914 robj *eleobj = listNodeValue(ln);
1915
1916 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1917 }
1918 } else if (o->type == REDIS_SET) {
1919 /* Save a set value */
1920 dict *set = o->ptr;
1921 dictIterator *di = dictGetIterator(set);
1922 dictEntry *de;
1923
1924 if (!set) oom("dictGetIteraotr");
1925 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1926 while((de = dictNext(di)) != NULL) {
1927 robj *eleobj = dictGetEntryKey(de);
1928
1929 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1930 }
1931 dictReleaseIterator(di);
1932 } else {
1933 assert(0 != 0);
1934 }
1935 }
1936 dictReleaseIterator(di);
1937 }
1938 /* EOF opcode */
1939 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1940
1941 /* Make sure data will not remain on the OS's output buffers */
1942 fflush(fp);
1943 fsync(fileno(fp));
1944 fclose(fp);
1945
1946 /* Use RENAME to make sure the DB file is changed atomically only
1947 * if the generate DB file is ok. */
1948 if (rename(tmpfile,filename) == -1) {
1949 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1950 unlink(tmpfile);
1951 return REDIS_ERR;
1952 }
1953 redisLog(REDIS_NOTICE,"DB saved on disk");
1954 server.dirty = 0;
1955 server.lastsave = time(NULL);
1956 return REDIS_OK;
1957
1958 werr:
1959 fclose(fp);
1960 unlink(tmpfile);
1961 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1962 if (di) dictReleaseIterator(di);
1963 return REDIS_ERR;
1964 }
1965
1966 static int rdbSaveBackground(char *filename) {
1967 pid_t childpid;
1968
1969 if (server.bgsaveinprogress) return REDIS_ERR;
1970 if ((childpid = fork()) == 0) {
1971 /* Child */
1972 close(server.fd);
1973 if (rdbSave(filename) == REDIS_OK) {
1974 exit(0);
1975 } else {
1976 exit(1);
1977 }
1978 } else {
1979 /* Parent */
1980 if (childpid == -1) {
1981 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
1982 strerror(errno));
1983 return REDIS_ERR;
1984 }
1985 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1986 server.bgsaveinprogress = 1;
1987 return REDIS_OK;
1988 }
1989 return REDIS_OK; /* unreached */
1990 }
1991
/* Read one opcode/type byte from 'fp'.
 * Returns the byte value (0-255), or -1 on short read or end of file. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return fread(&byte,1,1,fp) ? byte : -1;
}
1997
/* Read a 32 bit timestamp stored as raw bytes in native byte order.
 * Returns it widened to time_t, or -1 on short read or end of file. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t)stamp;
}
2003
2004 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2005 * of this file for a description of how this are stored on disk.
2006 *
2007 * isencoded is set to 1 if the readed length is not actually a length but
2008 * an "encoding type", check the above comments for more info */
2009 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2010 unsigned char buf[2];
2011 uint32_t len;
2012
2013 if (isencoded) *isencoded = 0;
2014 if (rdbver == 0) {
2015 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2016 return ntohl(len);
2017 } else {
2018 int type;
2019
2020 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2021 type = (buf[0]&0xC0)>>6;
2022 if (type == REDIS_RDB_6BITLEN) {
2023 /* Read a 6 bit len */
2024 return buf[0]&0x3F;
2025 } else if (type == REDIS_RDB_ENCVAL) {
2026 /* Read a 6 bit len encoding type */
2027 if (isencoded) *isencoded = 1;
2028 return buf[0]&0x3F;
2029 } else if (type == REDIS_RDB_14BITLEN) {
2030 /* Read a 14 bit len */
2031 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2032 return ((buf[0]&0x3F)<<8)|buf[1];
2033 } else {
2034 /* Read a 32 bit len */
2035 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2036 return ntohl(len);
2037 }
2038 }
2039 }
2040
2041 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2042 unsigned char enc[4];
2043 long long val;
2044
2045 if (enctype == REDIS_RDB_ENC_INT8) {
2046 if (fread(enc,1,1,fp) == 0) return NULL;
2047 val = (signed char)enc[0];
2048 } else if (enctype == REDIS_RDB_ENC_INT16) {
2049 uint16_t v;
2050 if (fread(enc,2,1,fp) == 0) return NULL;
2051 v = enc[0]|(enc[1]<<8);
2052 val = (int16_t)v;
2053 } else if (enctype == REDIS_RDB_ENC_INT32) {
2054 uint32_t v;
2055 if (fread(enc,4,1,fp) == 0) return NULL;
2056 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2057 val = (int32_t)v;
2058 } else {
2059 val = 0; /* anti-warning */
2060 assert(0!=0);
2061 }
2062 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2063 }
2064
2065 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2066 unsigned int len, clen;
2067 unsigned char *c = NULL;
2068 sds val = NULL;
2069
2070 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2071 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2072 if ((c = zmalloc(clen)) == NULL) goto err;
2073 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2074 if (fread(c,clen,1,fp) == 0) goto err;
2075 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2076 zfree(c);
2077 return createObject(REDIS_STRING,val);
2078 err:
2079 zfree(c);
2080 sdsfree(val);
2081 return NULL;
2082 }
2083
2084 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2085 int isencoded;
2086 uint32_t len;
2087 sds val;
2088
2089 len = rdbLoadLen(fp,rdbver,&isencoded);
2090 if (isencoded) {
2091 switch(len) {
2092 case REDIS_RDB_ENC_INT8:
2093 case REDIS_RDB_ENC_INT16:
2094 case REDIS_RDB_ENC_INT32:
2095 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2096 case REDIS_RDB_ENC_LZF:
2097 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2098 default:
2099 assert(0!=0);
2100 }
2101 }
2102
2103 if (len == REDIS_RDB_LENERR) return NULL;
2104 val = sdsnewlen(NULL,len);
2105 if (len && fread(val,len,1,fp) == 0) {
2106 sdsfree(val);
2107 return NULL;
2108 }
2109 return tryObjectSharing(createObject(REDIS_STRING,val));
2110 }
2111
2112 static int rdbLoad(char *filename) {
2113 FILE *fp;
2114 robj *keyobj = NULL;
2115 uint32_t dbid;
2116 int type, retval, rdbver;
2117 dict *d = server.db[0].dict;
2118 redisDb *db = server.db+0;
2119 char buf[1024];
2120 time_t expiretime = -1, now = time(NULL);
2121
2122 fp = fopen(filename,"r");
2123 if (!fp) return REDIS_ERR;
2124 if (fread(buf,9,1,fp) == 0) goto eoferr;
2125 buf[9] = '\0';
2126 if (memcmp(buf,"REDIS",5) != 0) {
2127 fclose(fp);
2128 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2129 return REDIS_ERR;
2130 }
2131 rdbver = atoi(buf+5);
2132 if (rdbver > 1) {
2133 fclose(fp);
2134 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2135 return REDIS_ERR;
2136 }
2137 while(1) {
2138 robj *o;
2139
2140 /* Read type. */
2141 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2142 if (type == REDIS_EXPIRETIME) {
2143 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2144 /* We read the time so we need to read the object type again */
2145 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2146 }
2147 if (type == REDIS_EOF) break;
2148 /* Handle SELECT DB opcode as a special case */
2149 if (type == REDIS_SELECTDB) {
2150 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2151 goto eoferr;
2152 if (dbid >= (unsigned)server.dbnum) {
2153 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2154 exit(1);
2155 }
2156 db = server.db+dbid;
2157 d = db->dict;
2158 continue;
2159 }
2160 /* Read key */
2161 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2162
2163 if (type == REDIS_STRING) {
2164 /* Read string value */
2165 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2166 } else if (type == REDIS_LIST || type == REDIS_SET) {
2167 /* Read list/set value */
2168 uint32_t listlen;
2169
2170 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2171 goto eoferr;
2172 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2173 /* Load every single element of the list/set */
2174 while(listlen--) {
2175 robj *ele;
2176
2177 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2178 if (type == REDIS_LIST) {
2179 if (!listAddNodeTail((list*)o->ptr,ele))
2180 oom("listAddNodeTail");
2181 } else {
2182 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2183 oom("dictAdd");
2184 }
2185 }
2186 } else {
2187 assert(0 != 0);
2188 }
2189 /* Add the new object in the hash table */
2190 retval = dictAdd(d,keyobj,o);
2191 if (retval == DICT_ERR) {
2192 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2193 exit(1);
2194 }
2195 /* Set the expire time if needed */
2196 if (expiretime != -1) {
2197 setExpire(db,keyobj,expiretime);
2198 /* Delete this key if already expired */
2199 if (expiretime < now) deleteKey(db,keyobj);
2200 expiretime = -1;
2201 }
2202 keyobj = o = NULL;
2203 }
2204 fclose(fp);
2205 return REDIS_OK;
2206
2207 eoferr: /* unexpected end of file is handled here with a fatal exit */
2208 if (keyobj) decrRefCount(keyobj);
2209 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2210 exit(1);
2211 return REDIS_ERR; /* Just to avoid warning */
2212 }
2213
2214 /*================================== Commands =============================== */
2215
2216 static void authCommand(redisClient *c) {
2217 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2218 c->authenticated = 1;
2219 addReply(c,shared.ok);
2220 } else {
2221 c->authenticated = 0;
2222 addReply(c,shared.err);
2223 }
2224 }
2225
2226 static void pingCommand(redisClient *c) {
2227 addReply(c,shared.pong);
2228 }
2229
2230 static void echoCommand(redisClient *c) {
2231 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2232 (int)sdslen(c->argv[1]->ptr)));
2233 addReply(c,c->argv[1]);
2234 addReply(c,shared.crlf);
2235 }
2236
2237 /*=================================== Strings =============================== */
2238
2239 static void setGenericCommand(redisClient *c, int nx) {
2240 int retval;
2241
2242 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2243 if (retval == DICT_ERR) {
2244 if (!nx) {
2245 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2246 incrRefCount(c->argv[2]);
2247 } else {
2248 addReply(c,shared.czero);
2249 return;
2250 }
2251 } else {
2252 incrRefCount(c->argv[1]);
2253 incrRefCount(c->argv[2]);
2254 }
2255 server.dirty++;
2256 removeExpire(c->db,c->argv[1]);
2257 addReply(c, nx ? shared.cone : shared.ok);
2258 }
2259
2260 static void setCommand(redisClient *c) {
2261 setGenericCommand(c,0);
2262 }
2263
2264 static void setnxCommand(redisClient *c) {
2265 setGenericCommand(c,1);
2266 }
2267
2268 static void getCommand(redisClient *c) {
2269 robj *o = lookupKeyRead(c->db,c->argv[1]);
2270
2271 if (o == NULL) {
2272 addReply(c,shared.nullbulk);
2273 } else {
2274 if (o->type != REDIS_STRING) {
2275 addReply(c,shared.wrongtypeerr);
2276 } else {
2277 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2278 addReply(c,o);
2279 addReply(c,shared.crlf);
2280 }
2281 }
2282 }
2283
2284 static void getSetCommand(redisClient *c) {
2285 getCommand(c);
2286 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2287 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2288 } else {
2289 incrRefCount(c->argv[1]);
2290 }
2291 incrRefCount(c->argv[2]);
2292 server.dirty++;
2293 removeExpire(c->db,c->argv[1]);
2294 }
2295
2296 static void mgetCommand(redisClient *c) {
2297 int j;
2298
2299 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2300 for (j = 1; j < c->argc; j++) {
2301 robj *o = lookupKeyRead(c->db,c->argv[j]);
2302 if (o == NULL) {
2303 addReply(c,shared.nullbulk);
2304 } else {
2305 if (o->type != REDIS_STRING) {
2306 addReply(c,shared.nullbulk);
2307 } else {
2308 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2309 addReply(c,o);
2310 addReply(c,shared.crlf);
2311 }
2312 }
2313 }
2314 }
2315
2316 static void incrDecrCommand(redisClient *c, long long incr) {
2317 long long value;
2318 int retval;
2319 robj *o;
2320
2321 o = lookupKeyWrite(c->db,c->argv[1]);
2322 if (o == NULL) {
2323 value = 0;
2324 } else {
2325 if (o->type != REDIS_STRING) {
2326 value = 0;
2327 } else {
2328 char *eptr;
2329
2330 value = strtoll(o->ptr, &eptr, 10);
2331 }
2332 }
2333
2334 value += incr;
2335 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2336 retval = dictAdd(c->db->dict,c->argv[1],o);
2337 if (retval == DICT_ERR) {
2338 dictReplace(c->db->dict,c->argv[1],o);
2339 removeExpire(c->db,c->argv[1]);
2340 } else {
2341 incrRefCount(c->argv[1]);
2342 }
2343 server.dirty++;
2344 addReply(c,shared.colon);
2345 addReply(c,o);
2346 addReply(c,shared.crlf);
2347 }
2348
2349 static void incrCommand(redisClient *c) {
2350 incrDecrCommand(c,1);
2351 }
2352
2353 static void decrCommand(redisClient *c) {
2354 incrDecrCommand(c,-1);
2355 }
2356
2357 static void incrbyCommand(redisClient *c) {
2358 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2359 incrDecrCommand(c,incr);
2360 }
2361
2362 static void decrbyCommand(redisClient *c) {
2363 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2364 incrDecrCommand(c,-incr);
2365 }
2366
2367 /* ========================= Type agnostic commands ========================= */
2368
2369 static void delCommand(redisClient *c) {
2370 int deleted = 0, j;
2371
2372 for (j = 1; j < c->argc; j++) {
2373 if (deleteKey(c->db,c->argv[j])) {
2374 server.dirty++;
2375 deleted++;
2376 }
2377 }
2378 switch(deleted) {
2379 case 0:
2380 addReply(c,shared.czero);
2381 break;
2382 case 1:
2383 addReply(c,shared.cone);
2384 break;
2385 default:
2386 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2387 break;
2388 }
2389 }
2390
2391 static void existsCommand(redisClient *c) {
2392 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2393 }
2394
2395 static void selectCommand(redisClient *c) {
2396 int id = atoi(c->argv[1]->ptr);
2397
2398 if (selectDb(c,id) == REDIS_ERR) {
2399 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2400 } else {
2401 addReply(c,shared.ok);
2402 }
2403 }
2404
2405 static void randomkeyCommand(redisClient *c) {
2406 dictEntry *de;
2407
2408 while(1) {
2409 de = dictGetRandomKey(c->db->dict);
2410 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2411 }
2412 if (de == NULL) {
2413 addReply(c,shared.plus);
2414 addReply(c,shared.crlf);
2415 } else {
2416 addReply(c,shared.plus);
2417 addReply(c,dictGetEntryKey(de));
2418 addReply(c,shared.crlf);
2419 }
2420 }
2421
2422 static void keysCommand(redisClient *c) {
2423 dictIterator *di;
2424 dictEntry *de;
2425 sds pattern = c->argv[1]->ptr;
2426 int plen = sdslen(pattern);
2427 int numkeys = 0, keyslen = 0;
2428 robj *lenobj = createObject(REDIS_STRING,NULL);
2429
2430 di = dictGetIterator(c->db->dict);
2431 if (!di) oom("dictGetIterator");
2432 addReply(c,lenobj);
2433 decrRefCount(lenobj);
2434 while((de = dictNext(di)) != NULL) {
2435 robj *keyobj = dictGetEntryKey(de);
2436
2437 sds key = keyobj->ptr;
2438 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2439 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2440 if (expireIfNeeded(c->db,keyobj) == 0) {
2441 if (numkeys != 0)
2442 addReply(c,shared.space);
2443 addReply(c,keyobj);
2444 numkeys++;
2445 keyslen += sdslen(key);
2446 }
2447 }
2448 }
2449 dictReleaseIterator(di);
2450 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2451 addReply(c,shared.crlf);
2452 }
2453
2454 static void dbsizeCommand(redisClient *c) {
2455 addReplySds(c,
2456 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2457 }
2458
2459 static void lastsaveCommand(redisClient *c) {
2460 addReplySds(c,
2461 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2462 }
2463
2464 static void typeCommand(redisClient *c) {
2465 robj *o;
2466 char *type;
2467
2468 o = lookupKeyRead(c->db,c->argv[1]);
2469 if (o == NULL) {
2470 type = "+none";
2471 } else {
2472 switch(o->type) {
2473 case REDIS_STRING: type = "+string"; break;
2474 case REDIS_LIST: type = "+list"; break;
2475 case REDIS_SET: type = "+set"; break;
2476 default: type = "unknown"; break;
2477 }
2478 }
2479 addReplySds(c,sdsnew(type));
2480 addReply(c,shared.crlf);
2481 }
2482
2483 static void saveCommand(redisClient *c) {
2484 if (server.bgsaveinprogress) {
2485 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2486 return;
2487 }
2488 if (rdbSave(server.dbfilename) == REDIS_OK) {
2489 addReply(c,shared.ok);
2490 } else {
2491 addReply(c,shared.err);
2492 }
2493 }
2494
2495 static void bgsaveCommand(redisClient *c) {
2496 if (server.bgsaveinprogress) {
2497 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2498 return;
2499 }
2500 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2501 addReply(c,shared.ok);
2502 } else {
2503 addReply(c,shared.err);
2504 }
2505 }
2506
2507 static void shutdownCommand(redisClient *c) {
2508 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2509 /* XXX: TODO kill the child if there is a bgsave in progress */
2510 if (rdbSave(server.dbfilename) == REDIS_OK) {
2511 if (server.daemonize) {
2512 unlink(server.pidfile);
2513 }
2514 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2515 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2516 exit(1);
2517 } else {
2518 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2519 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2520 }
2521 }
2522
2523 static void renameGenericCommand(redisClient *c, int nx) {
2524 robj *o;
2525
2526 /* To use the same key as src and dst is probably an error */
2527 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2528 addReply(c,shared.sameobjecterr);
2529 return;
2530 }
2531
2532 o = lookupKeyWrite(c->db,c->argv[1]);
2533 if (o == NULL) {
2534 addReply(c,shared.nokeyerr);
2535 return;
2536 }
2537 incrRefCount(o);
2538 deleteIfVolatile(c->db,c->argv[2]);
2539 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2540 if (nx) {
2541 decrRefCount(o);
2542 addReply(c,shared.czero);
2543 return;
2544 }
2545 dictReplace(c->db->dict,c->argv[2],o);
2546 } else {
2547 incrRefCount(c->argv[2]);
2548 }
2549 deleteKey(c->db,c->argv[1]);
2550 server.dirty++;
2551 addReply(c,nx ? shared.cone : shared.ok);
2552 }
2553
2554 static void renameCommand(redisClient *c) {
2555 renameGenericCommand(c,0);
2556 }
2557
2558 static void renamenxCommand(redisClient *c) {
2559 renameGenericCommand(c,1);
2560 }
2561
2562 static void moveCommand(redisClient *c) {
2563 robj *o;
2564 redisDb *src, *dst;
2565 int srcid;
2566
2567 /* Obtain source and target DB pointers */
2568 src = c->db;
2569 srcid = c->db->id;
2570 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2571 addReply(c,shared.outofrangeerr);
2572 return;
2573 }
2574 dst = c->db;
2575 selectDb(c,srcid); /* Back to the source DB */
2576
2577 /* If the user is moving using as target the same
2578 * DB as the source DB it is probably an error. */
2579 if (src == dst) {
2580 addReply(c,shared.sameobjecterr);
2581 return;
2582 }
2583
2584 /* Check if the element exists and get a reference */
2585 o = lookupKeyWrite(c->db,c->argv[1]);
2586 if (!o) {
2587 addReply(c,shared.czero);
2588 return;
2589 }
2590
2591 /* Try to add the element to the target DB */
2592 deleteIfVolatile(dst,c->argv[1]);
2593 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2594 addReply(c,shared.czero);
2595 return;
2596 }
2597 incrRefCount(c->argv[1]);
2598 incrRefCount(o);
2599
2600 /* OK! key moved, free the entry in the source DB */
2601 deleteKey(src,c->argv[1]);
2602 server.dirty++;
2603 addReply(c,shared.cone);
2604 }
2605
2606 /* =================================== Lists ================================ */
2607 static void pushGenericCommand(redisClient *c, int where) {
2608 robj *lobj;
2609 list *list;
2610
2611 lobj = lookupKeyWrite(c->db,c->argv[1]);
2612 if (lobj == NULL) {
2613 lobj = createListObject();
2614 list = lobj->ptr;
2615 if (where == REDIS_HEAD) {
2616 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2617 } else {
2618 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2619 }
2620 dictAdd(c->db->dict,c->argv[1],lobj);
2621 incrRefCount(c->argv[1]);
2622 incrRefCount(c->argv[2]);
2623 } else {
2624 if (lobj->type != REDIS_LIST) {
2625 addReply(c,shared.wrongtypeerr);
2626 return;
2627 }
2628 list = lobj->ptr;
2629 if (where == REDIS_HEAD) {
2630 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2631 } else {
2632 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2633 }
2634 incrRefCount(c->argv[2]);
2635 }
2636 server.dirty++;
2637 addReply(c,shared.ok);
2638 }
2639
2640 static void lpushCommand(redisClient *c) {
2641 pushGenericCommand(c,REDIS_HEAD);
2642 }
2643
2644 static void rpushCommand(redisClient *c) {
2645 pushGenericCommand(c,REDIS_TAIL);
2646 }
2647
2648 static void llenCommand(redisClient *c) {
2649 robj *o;
2650 list *l;
2651
2652 o = lookupKeyRead(c->db,c->argv[1]);
2653 if (o == NULL) {
2654 addReply(c,shared.czero);
2655 return;
2656 } else {
2657 if (o->type != REDIS_LIST) {
2658 addReply(c,shared.wrongtypeerr);
2659 } else {
2660 l = o->ptr;
2661 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2662 }
2663 }
2664 }
2665
2666 static void lindexCommand(redisClient *c) {
2667 robj *o;
2668 int index = atoi(c->argv[2]->ptr);
2669
2670 o = lookupKeyRead(c->db,c->argv[1]);
2671 if (o == NULL) {
2672 addReply(c,shared.nullbulk);
2673 } else {
2674 if (o->type != REDIS_LIST) {
2675 addReply(c,shared.wrongtypeerr);
2676 } else {
2677 list *list = o->ptr;
2678 listNode *ln;
2679
2680 ln = listIndex(list, index);
2681 if (ln == NULL) {
2682 addReply(c,shared.nullbulk);
2683 } else {
2684 robj *ele = listNodeValue(ln);
2685 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2686 addReply(c,ele);
2687 addReply(c,shared.crlf);
2688 }
2689 }
2690 }
2691 }
2692
2693 static void lsetCommand(redisClient *c) {
2694 robj *o;
2695 int index = atoi(c->argv[2]->ptr);
2696
2697 o = lookupKeyWrite(c->db,c->argv[1]);
2698 if (o == NULL) {
2699 addReply(c,shared.nokeyerr);
2700 } else {
2701 if (o->type != REDIS_LIST) {
2702 addReply(c,shared.wrongtypeerr);
2703 } else {
2704 list *list = o->ptr;
2705 listNode *ln;
2706
2707 ln = listIndex(list, index);
2708 if (ln == NULL) {
2709 addReply(c,shared.outofrangeerr);
2710 } else {
2711 robj *ele = listNodeValue(ln);
2712
2713 decrRefCount(ele);
2714 listNodeValue(ln) = c->argv[3];
2715 incrRefCount(c->argv[3]);
2716 addReply(c,shared.ok);
2717 server.dirty++;
2718 }
2719 }
2720 }
2721 }
2722
2723 static void popGenericCommand(redisClient *c, int where) {
2724 robj *o;
2725
2726 o = lookupKeyWrite(c->db,c->argv[1]);
2727 if (o == NULL) {
2728 addReply(c,shared.nullbulk);
2729 } else {
2730 if (o->type != REDIS_LIST) {
2731 addReply(c,shared.wrongtypeerr);
2732 } else {
2733 list *list = o->ptr;
2734 listNode *ln;
2735
2736 if (where == REDIS_HEAD)
2737 ln = listFirst(list);
2738 else
2739 ln = listLast(list);
2740
2741 if (ln == NULL) {
2742 addReply(c,shared.nullbulk);
2743 } else {
2744 robj *ele = listNodeValue(ln);
2745 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2746 addReply(c,ele);
2747 addReply(c,shared.crlf);
2748 listDelNode(list,ln);
2749 server.dirty++;
2750 }
2751 }
2752 }
2753 }
2754
2755 static void lpopCommand(redisClient *c) {
2756 popGenericCommand(c,REDIS_HEAD);
2757 }
2758
2759 static void rpopCommand(redisClient *c) {
2760 popGenericCommand(c,REDIS_TAIL);
2761 }
2762
2763 static void lrangeCommand(redisClient *c) {
2764 robj *o;
2765 int start = atoi(c->argv[2]->ptr);
2766 int end = atoi(c->argv[3]->ptr);
2767
2768 o = lookupKeyRead(c->db,c->argv[1]);
2769 if (o == NULL) {
2770 addReply(c,shared.nullmultibulk);
2771 } else {
2772 if (o->type != REDIS_LIST) {
2773 addReply(c,shared.wrongtypeerr);
2774 } else {
2775 list *list = o->ptr;
2776 listNode *ln;
2777 int llen = listLength(list);
2778 int rangelen, j;
2779 robj *ele;
2780
2781 /* convert negative indexes */
2782 if (start < 0) start = llen+start;
2783 if (end < 0) end = llen+end;
2784 if (start < 0) start = 0;
2785 if (end < 0) end = 0;
2786
2787 /* indexes sanity checks */
2788 if (start > end || start >= llen) {
2789 /* Out of range start or start > end result in empty list */
2790 addReply(c,shared.emptymultibulk);
2791 return;
2792 }
2793 if (end >= llen) end = llen-1;
2794 rangelen = (end-start)+1;
2795
2796 /* Return the result in form of a multi-bulk reply */
2797 ln = listIndex(list, start);
2798 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2799 for (j = 0; j < rangelen; j++) {
2800 ele = listNodeValue(ln);
2801 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2802 addReply(c,ele);
2803 addReply(c,shared.crlf);
2804 ln = ln->next;
2805 }
2806 }
2807 }
2808 }
2809
2810 static void ltrimCommand(redisClient *c) {
2811 robj *o;
2812 int start = atoi(c->argv[2]->ptr);
2813 int end = atoi(c->argv[3]->ptr);
2814
2815 o = lookupKeyWrite(c->db,c->argv[1]);
2816 if (o == NULL) {
2817 addReply(c,shared.nokeyerr);
2818 } else {
2819 if (o->type != REDIS_LIST) {
2820 addReply(c,shared.wrongtypeerr);
2821 } else {
2822 list *list = o->ptr;
2823 listNode *ln;
2824 int llen = listLength(list);
2825 int j, ltrim, rtrim;
2826
2827 /* convert negative indexes */
2828 if (start < 0) start = llen+start;
2829 if (end < 0) end = llen+end;
2830 if (start < 0) start = 0;
2831 if (end < 0) end = 0;
2832
2833 /* indexes sanity checks */
2834 if (start > end || start >= llen) {
2835 /* Out of range start or start > end result in empty list */
2836 ltrim = llen;
2837 rtrim = 0;
2838 } else {
2839 if (end >= llen) end = llen-1;
2840 ltrim = start;
2841 rtrim = llen-end-1;
2842 }
2843
2844 /* Remove list elements to perform the trim */
2845 for (j = 0; j < ltrim; j++) {
2846 ln = listFirst(list);
2847 listDelNode(list,ln);
2848 }
2849 for (j = 0; j < rtrim; j++) {
2850 ln = listLast(list);
2851 listDelNode(list,ln);
2852 }
2853 addReply(c,shared.ok);
2854 server.dirty++;
2855 }
2856 }
2857 }
2858
2859 static void lremCommand(redisClient *c) {
2860 robj *o;
2861
2862 o = lookupKeyWrite(c->db,c->argv[1]);
2863 if (o == NULL) {
2864 addReply(c,shared.nokeyerr);
2865 } else {
2866 if (o->type != REDIS_LIST) {
2867 addReply(c,shared.wrongtypeerr);
2868 } else {
2869 list *list = o->ptr;
2870 listNode *ln, *next;
2871 int toremove = atoi(c->argv[2]->ptr);
2872 int removed = 0;
2873 int fromtail = 0;
2874
2875 if (toremove < 0) {
2876 toremove = -toremove;
2877 fromtail = 1;
2878 }
2879 ln = fromtail ? list->tail : list->head;
2880 while (ln) {
2881 robj *ele = listNodeValue(ln);
2882
2883 next = fromtail ? ln->prev : ln->next;
2884 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2885 listDelNode(list,ln);
2886 server.dirty++;
2887 removed++;
2888 if (toremove && removed == toremove) break;
2889 }
2890 ln = next;
2891 }
2892 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2893 }
2894 }
2895 }
2896
2897 /* ==================================== Sets ================================ */
2898
2899 static void saddCommand(redisClient *c) {
2900 robj *set;
2901
2902 set = lookupKeyWrite(c->db,c->argv[1]);
2903 if (set == NULL) {
2904 set = createSetObject();
2905 dictAdd(c->db->dict,c->argv[1],set);
2906 incrRefCount(c->argv[1]);
2907 } else {
2908 if (set->type != REDIS_SET) {
2909 addReply(c,shared.wrongtypeerr);
2910 return;
2911 }
2912 }
2913 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2914 incrRefCount(c->argv[2]);
2915 server.dirty++;
2916 addReply(c,shared.cone);
2917 } else {
2918 addReply(c,shared.czero);
2919 }
2920 }
2921
2922 static void sremCommand(redisClient *c) {
2923 robj *set;
2924
2925 set = lookupKeyWrite(c->db,c->argv[1]);
2926 if (set == NULL) {
2927 addReply(c,shared.czero);
2928 } else {
2929 if (set->type != REDIS_SET) {
2930 addReply(c,shared.wrongtypeerr);
2931 return;
2932 }
2933 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2934 server.dirty++;
2935 addReply(c,shared.cone);
2936 } else {
2937 addReply(c,shared.czero);
2938 }
2939 }
2940 }
2941
2942 static void smoveCommand(redisClient *c) {
2943 robj *srcset, *dstset;
2944
2945 srcset = lookupKeyWrite(c->db,c->argv[1]);
2946 dstset = lookupKeyWrite(c->db,c->argv[2]);
2947
2948 /* If the source key does not exist return 0, if it's of the wrong type
2949 * raise an error */
2950 if (srcset == NULL || srcset->type != REDIS_SET) {
2951 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2952 return;
2953 }
2954 /* Error if the destination key is not a set as well */
2955 if (dstset && dstset->type != REDIS_SET) {
2956 addReply(c,shared.wrongtypeerr);
2957 return;
2958 }
2959 /* Remove the element from the source set */
2960 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2961 /* Key not found in the src set! return zero */
2962 addReply(c,shared.czero);
2963 return;
2964 }
2965 server.dirty++;
2966 /* Add the element to the destination set */
2967 if (!dstset) {
2968 dstset = createSetObject();
2969 dictAdd(c->db->dict,c->argv[2],dstset);
2970 incrRefCount(c->argv[2]);
2971 }
2972 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
2973 incrRefCount(c->argv[3]);
2974 addReply(c,shared.cone);
2975 }
2976
2977 static void sismemberCommand(redisClient *c) {
2978 robj *set;
2979
2980 set = lookupKeyRead(c->db,c->argv[1]);
2981 if (set == NULL) {
2982 addReply(c,shared.czero);
2983 } else {
2984 if (set->type != REDIS_SET) {
2985 addReply(c,shared.wrongtypeerr);
2986 return;
2987 }
2988 if (dictFind(set->ptr,c->argv[2]))
2989 addReply(c,shared.cone);
2990 else
2991 addReply(c,shared.czero);
2992 }
2993 }
2994
2995 static void scardCommand(redisClient *c) {
2996 robj *o;
2997 dict *s;
2998
2999 o = lookupKeyRead(c->db,c->argv[1]);
3000 if (o == NULL) {
3001 addReply(c,shared.czero);
3002 return;
3003 } else {
3004 if (o->type != REDIS_SET) {
3005 addReply(c,shared.wrongtypeerr);
3006 } else {
3007 s = o->ptr;
3008 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3009 dictSize(s)));
3010 }
3011 }
3012 }
3013
3014 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3015 dict **d1 = (void*) s1, **d2 = (void*) s2;
3016
3017 return dictSize(*d1)-dictSize(*d2);
3018 }
3019
3020 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3021 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3022 dictIterator *di;
3023 dictEntry *de;
3024 robj *lenobj = NULL, *dstset = NULL;
3025 int j, cardinality = 0;
3026
3027 if (!dv) oom("sinterGenericCommand");
3028 for (j = 0; j < setsnum; j++) {
3029 robj *setobj;
3030
3031 setobj = dstkey ?
3032 lookupKeyWrite(c->db,setskeys[j]) :
3033 lookupKeyRead(c->db,setskeys[j]);
3034 if (!setobj) {
3035 zfree(dv);
3036 if (dstkey) {
3037 deleteKey(c->db,dstkey);
3038 addReply(c,shared.ok);
3039 } else {
3040 addReply(c,shared.nullmultibulk);
3041 }
3042 return;
3043 }
3044 if (setobj->type != REDIS_SET) {
3045 zfree(dv);
3046 addReply(c,shared.wrongtypeerr);
3047 return;
3048 }
3049 dv[j] = setobj->ptr;
3050 }
3051 /* Sort sets from the smallest to largest, this will improve our
3052 * algorithm's performace */
3053 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3054
3055 /* The first thing we should output is the total number of elements...
3056 * since this is a multi-bulk write, but at this stage we don't know
3057 * the intersection set size, so we use a trick, append an empty object
3058 * to the output list and save the pointer to later modify it with the
3059 * right length */
3060 if (!dstkey) {
3061 lenobj = createObject(REDIS_STRING,NULL);
3062 addReply(c,lenobj);
3063 decrRefCount(lenobj);
3064 } else {
3065 /* If we have a target key where to store the resulting set
3066 * create this key with an empty set inside */
3067 dstset = createSetObject();
3068 }
3069
3070 /* Iterate all the elements of the first (smallest) set, and test
3071 * the element against all the other sets, if at least one set does
3072 * not include the element it is discarded */
3073 di = dictGetIterator(dv[0]);
3074 if (!di) oom("dictGetIterator");
3075
3076 while((de = dictNext(di)) != NULL) {
3077 robj *ele;
3078
3079 for (j = 1; j < setsnum; j++)
3080 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3081 if (j != setsnum)
3082 continue; /* at least one set does not contain the member */
3083 ele = dictGetEntryKey(de);
3084 if (!dstkey) {
3085 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3086 addReply(c,ele);
3087 addReply(c,shared.crlf);
3088 cardinality++;
3089 } else {
3090 dictAdd(dstset->ptr,ele,NULL);
3091 incrRefCount(ele);
3092 }
3093 }
3094 dictReleaseIterator(di);
3095
3096 if (dstkey) {
3097 /* Store the resulting set into the target */
3098 deleteKey(c->db,dstkey);
3099 dictAdd(c->db->dict,dstkey,dstset);
3100 incrRefCount(dstkey);
3101 }
3102
3103 if (!dstkey) {
3104 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3105 } else {
3106 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3107 dictSize((dict*)dstset->ptr)));
3108 server.dirty++;
3109 }
3110 zfree(dv);
3111 }
3112
3113 static void sinterCommand(redisClient *c) {
3114 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3115 }
3116
3117 static void sinterstoreCommand(redisClient *c) {
3118 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3119 }
3120
/* Set operations implemented by sunionDiffGenericCommand() */
enum {
    REDIS_OP_UNION = 0,
    REDIS_OP_DIFF = 1
};
3123
3124 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3125 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3126 dictIterator *di;
3127 dictEntry *de;
3128 robj *dstset = NULL;
3129 int j, cardinality = 0;
3130
3131 if (!dv) oom("sunionDiffGenericCommand");
3132 for (j = 0; j < setsnum; j++) {
3133 robj *setobj;
3134
3135 setobj = dstkey ?
3136 lookupKeyWrite(c->db,setskeys[j]) :
3137 lookupKeyRead(c->db,setskeys[j]);
3138 if (!setobj) {
3139 dv[j] = NULL;
3140 continue;
3141 }
3142 if (setobj->type != REDIS_SET) {
3143 zfree(dv);
3144 addReply(c,shared.wrongtypeerr);
3145 return;
3146 }
3147 dv[j] = setobj->ptr;
3148 }
3149
3150 /* We need a temp set object to store our union. If the dstkey
3151 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3152 * this set object will be the resulting object to set into the target key*/
3153 dstset = createSetObject();
3154
3155 /* Iterate all the elements of all the sets, add every element a single
3156 * time to the result set */
3157 for (j = 0; j < setsnum; j++) {
3158 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3159 if (!dv[j]) continue; /* non existing keys are like empty sets */
3160
3161 di = dictGetIterator(dv[j]);
3162 if (!di) oom("dictGetIterator");
3163
3164 while((de = dictNext(di)) != NULL) {
3165 robj *ele;
3166
3167 /* dictAdd will not add the same element multiple times */
3168 ele = dictGetEntryKey(de);
3169 if (op == REDIS_OP_UNION || j == 0) {
3170 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3171 incrRefCount(ele);
3172 cardinality++;
3173 }
3174 } else if (op == REDIS_OP_DIFF) {
3175 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3176 cardinality--;
3177 }
3178 }
3179 }
3180 dictReleaseIterator(di);
3181
3182 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3183 }
3184
3185 /* Output the content of the resulting set, if not in STORE mode */
3186 if (!dstkey) {
3187 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3188 di = dictGetIterator(dstset->ptr);
3189 if (!di) oom("dictGetIterator");
3190 while((de = dictNext(di)) != NULL) {
3191 robj *ele;
3192
3193 ele = dictGetEntryKey(de);
3194 addReplySds(c,sdscatprintf(sdsempty(),
3195 "$%d\r\n",sdslen(ele->ptr)));
3196 addReply(c,ele);
3197 addReply(c,shared.crlf);
3198 }
3199 dictReleaseIterator(di);
3200 } else {
3201 /* If we have a target key where to store the resulting set
3202 * create this key with the result set inside */
3203 deleteKey(c->db,dstkey);
3204 dictAdd(c->db->dict,dstkey,dstset);
3205 incrRefCount(dstkey);
3206 }
3207
3208 /* Cleanup */
3209 if (!dstkey) {
3210 decrRefCount(dstset);
3211 } else {
3212 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3213 dictSize((dict*)dstset->ptr)));
3214 server.dirty++;
3215 }
3216 zfree(dv);
3217 }
3218
3219 static void sunionCommand(redisClient *c) {
3220 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3221 }
3222
3223 static void sunionstoreCommand(redisClient *c) {
3224 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3225 }
3226
3227 static void sdiffCommand(redisClient *c) {
3228 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3229 }
3230
3231 static void sdiffstoreCommand(redisClient *c) {
3232 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3233 }
3234
3235 static void flushdbCommand(redisClient *c) {
3236 server.dirty += dictSize(c->db->dict);
3237 dictEmpty(c->db->dict);
3238 dictEmpty(c->db->expires);
3239 addReply(c,shared.ok);
3240 }
3241
3242 static void flushallCommand(redisClient *c) {
3243 server.dirty += emptyDb();
3244 addReply(c,shared.ok);
3245 rdbSave(server.dbfilename);
3246 server.dirty++;
3247 }
3248
3249 redisSortOperation *createSortOperation(int type, robj *pattern) {
3250 redisSortOperation *so = zmalloc(sizeof(*so));
3251 if (!so) oom("createSortOperation");
3252 so->type = type;
3253 so->pattern = pattern;
3254 return so;
3255 }
3256
3257 /* Return the value associated to the key with a name obtained
3258 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3259 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3260 char *p;
3261 sds spat, ssub;
3262 robj keyobj;
3263 int prefixlen, sublen, postfixlen;
3264 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3265 struct {
3266 long len;
3267 long free;
3268 char buf[REDIS_SORTKEY_MAX+1];
3269 } keyname;
3270
3271 spat = pattern->ptr;
3272 ssub = subst->ptr;
3273 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3274 p = strchr(spat,'*');
3275 if (!p) return NULL;
3276
3277 prefixlen = p-spat;
3278 sublen = sdslen(ssub);
3279 postfixlen = sdslen(spat)-(prefixlen+1);
3280 memcpy(keyname.buf,spat,prefixlen);
3281 memcpy(keyname.buf+prefixlen,ssub,sublen);
3282 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3283 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3284 keyname.len = prefixlen+sublen+postfixlen;
3285
3286 keyobj.refcount = 1;
3287 keyobj.type = REDIS_STRING;
3288 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3289
3290 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3291 return lookupKeyRead(db,&keyobj);
3292 }
3293
3294 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3295 * the additional parameter is not standard but a BSD-specific we have to
3296 * pass sorting parameters via the global 'server' structure */
3297 static int sortCompare(const void *s1, const void *s2) {
3298 const redisSortObject *so1 = s1, *so2 = s2;
3299 int cmp;
3300
3301 if (!server.sort_alpha) {
3302 /* Numeric sorting. Here it's trivial as we precomputed scores */
3303 if (so1->u.score > so2->u.score) {
3304 cmp = 1;
3305 } else if (so1->u.score < so2->u.score) {
3306 cmp = -1;
3307 } else {
3308 cmp = 0;
3309 }
3310 } else {
3311 /* Alphanumeric sorting */
3312 if (server.sort_bypattern) {
3313 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3314 /* At least one compare object is NULL */
3315 if (so1->u.cmpobj == so2->u.cmpobj)
3316 cmp = 0;
3317 else if (so1->u.cmpobj == NULL)
3318 cmp = -1;
3319 else
3320 cmp = 1;
3321 } else {
3322 /* We have both the objects, use strcoll */
3323 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3324 }
3325 } else {
3326 /* Compare elements directly */
3327 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3328 }
3329 }
3330 return server.sort_desc ? -cmp : cmp;
3331 }
3332
3333 /* The SORT command is the most complex command in Redis. Warning: this code
3334 * is optimized for speed and a bit less for readability */
3335 static void sortCommand(redisClient *c) {
3336 list *operations;
3337 int outputlen = 0;
3338 int desc = 0, alpha = 0;
3339 int limit_start = 0, limit_count = -1, start, end;
3340 int j, dontsort = 0, vectorlen;
3341 int getop = 0; /* GET operation counter */
3342 robj *sortval, *sortby = NULL;
3343 redisSortObject *vector; /* Resulting vector to sort */
3344
3345 /* Lookup the key to sort. It must be of the right types */
3346 sortval = lookupKeyRead(c->db,c->argv[1]);
3347 if (sortval == NULL) {
3348 addReply(c,shared.nokeyerr);
3349 return;
3350 }
3351 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3352 addReply(c,shared.wrongtypeerr);
3353 return;
3354 }
3355
3356 /* Create a list of operations to perform for every sorted element.
3357 * Operations can be GET/DEL/INCR/DECR */
3358 operations = listCreate();
3359 listSetFreeMethod(operations,zfree);
3360 j = 2;
3361
3362 /* Now we need to protect sortval incrementing its count, in the future
3363 * SORT may have options able to overwrite/delete keys during the sorting
3364 * and the sorted key itself may get destroied */
3365 incrRefCount(sortval);
3366
3367 /* The SORT command has an SQL-alike syntax, parse it */
3368 while(j < c->argc) {
3369 int leftargs = c->argc-j-1;
3370 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3371 desc = 0;
3372 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3373 desc = 1;
3374 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3375 alpha = 1;
3376 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3377 limit_start = atoi(c->argv[j+1]->ptr);
3378 limit_count = atoi(c->argv[j+2]->ptr);
3379 j+=2;
3380 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3381 sortby = c->argv[j+1];
3382 /* If the BY pattern does not contain '*', i.e. it is constant,
3383 * we don't need to sort nor to lookup the weight keys. */
3384 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3385 j++;
3386 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3387 listAddNodeTail(operations,createSortOperation(
3388 REDIS_SORT_GET,c->argv[j+1]));
3389 getop++;
3390 j++;
3391 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3392 listAddNodeTail(operations,createSortOperation(
3393 REDIS_SORT_DEL,c->argv[j+1]));
3394 j++;
3395 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3396 listAddNodeTail(operations,createSortOperation(
3397 REDIS_SORT_INCR,c->argv[j+1]));
3398 j++;
3399 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3400 listAddNodeTail(operations,createSortOperation(
3401 REDIS_SORT_DECR,c->argv[j+1]));
3402 j++;
3403 } else {
3404 decrRefCount(sortval);
3405 listRelease(operations);
3406 addReply(c,shared.syntaxerr);
3407 return;
3408 }
3409 j++;
3410 }
3411
3412 /* Load the sorting vector with all the objects to sort */
3413 vectorlen = (sortval->type == REDIS_LIST) ?
3414 listLength((list*)sortval->ptr) :
3415 dictSize((dict*)sortval->ptr);
3416 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3417 if (!vector) oom("allocating objects vector for SORT");
3418 j = 0;
3419 if (sortval->type == REDIS_LIST) {
3420 list *list = sortval->ptr;
3421 listNode *ln;
3422
3423 listRewind(list);
3424 while((ln = listYield(list))) {
3425 robj *ele = ln->value;
3426 vector[j].obj = ele;
3427 vector[j].u.score = 0;
3428 vector[j].u.cmpobj = NULL;
3429 j++;
3430 }
3431 } else {
3432 dict *set = sortval->ptr;
3433 dictIterator *di;
3434 dictEntry *setele;
3435
3436 di = dictGetIterator(set);
3437 if (!di) oom("dictGetIterator");
3438 while((setele = dictNext(di)) != NULL) {
3439 vector[j].obj = dictGetEntryKey(setele);
3440 vector[j].u.score = 0;
3441 vector[j].u.cmpobj = NULL;
3442 j++;
3443 }
3444 dictReleaseIterator(di);
3445 }
3446 assert(j == vectorlen);
3447
3448 /* Now it's time to load the right scores in the sorting vector */
3449 if (dontsort == 0) {
3450 for (j = 0; j < vectorlen; j++) {
3451 if (sortby) {
3452 robj *byval;
3453
3454 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3455 if (!byval || byval->type != REDIS_STRING) continue;
3456 if (alpha) {
3457 vector[j].u.cmpobj = byval;
3458 incrRefCount(byval);
3459 } else {
3460 vector[j].u.score = strtod(byval->ptr,NULL);
3461 }
3462 } else {
3463 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3464 }
3465 }
3466 }
3467
3468 /* We are ready to sort the vector... perform a bit of sanity check
3469 * on the LIMIT option too. We'll use a partial version of quicksort. */
3470 start = (limit_start < 0) ? 0 : limit_start;
3471 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3472 if (start >= vectorlen) {
3473 start = vectorlen-1;
3474 end = vectorlen-2;
3475 }
3476 if (end >= vectorlen) end = vectorlen-1;
3477
3478 if (dontsort == 0) {
3479 server.sort_desc = desc;
3480 server.sort_alpha = alpha;
3481 server.sort_bypattern = sortby ? 1 : 0;
3482 if (sortby && (start != 0 || end != vectorlen-1))
3483 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3484 else
3485 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3486 }
3487
3488 /* Send command output to the output buffer, performing the specified
3489 * GET/DEL/INCR/DECR operations if any. */
3490 outputlen = getop ? getop*(end-start+1) : end-start+1;
3491 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3492 for (j = start; j <= end; j++) {
3493 listNode *ln;
3494 if (!getop) {
3495 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3496 sdslen(vector[j].obj->ptr)));
3497 addReply(c,vector[j].obj);
3498 addReply(c,shared.crlf);
3499 }
3500 listRewind(operations);
3501 while((ln = listYield(operations))) {
3502 redisSortOperation *sop = ln->value;
3503 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3504 vector[j].obj);
3505
3506 if (sop->type == REDIS_SORT_GET) {
3507 if (!val || val->type != REDIS_STRING) {
3508 addReply(c,shared.nullbulk);
3509 } else {
3510 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3511 sdslen(val->ptr)));
3512 addReply(c,val);
3513 addReply(c,shared.crlf);
3514 }
3515 } else if (sop->type == REDIS_SORT_DEL) {
3516 /* TODO */
3517 }
3518 }
3519 }
3520
3521 /* Cleanup */
3522 decrRefCount(sortval);
3523 listRelease(operations);
3524 for (j = 0; j < vectorlen; j++) {
3525 if (sortby && alpha && vector[j].u.cmpobj)
3526 decrRefCount(vector[j].u.cmpobj);
3527 }
3528 zfree(vector);
3529 }
3530
3531 static void infoCommand(redisClient *c) {
3532 sds info;
3533 time_t uptime = time(NULL)-server.stat_starttime;
3534
3535 info = sdscatprintf(sdsempty(),
3536 "redis_version:%s\r\n"
3537 "uptime_in_seconds:%d\r\n"
3538 "uptime_in_days:%d\r\n"
3539 "connected_clients:%d\r\n"
3540 "connected_slaves:%d\r\n"
3541 "used_memory:%zu\r\n"
3542 "changes_since_last_save:%lld\r\n"
3543 "bgsave_in_progress:%d\r\n"
3544 "last_save_time:%d\r\n"
3545 "total_connections_received:%lld\r\n"
3546 "total_commands_processed:%lld\r\n"
3547 "role:%s\r\n"
3548 ,REDIS_VERSION,
3549 uptime,
3550 uptime/(3600*24),
3551 listLength(server.clients)-listLength(server.slaves),
3552 listLength(server.slaves),
3553 server.usedmemory,
3554 server.dirty,
3555 server.bgsaveinprogress,
3556 server.lastsave,
3557 server.stat_numconnections,
3558 server.stat_numcommands,
3559 server.masterhost == NULL ? "master" : "slave"
3560 );
3561 if (server.masterhost) {
3562 info = sdscatprintf(info,
3563 "master_host:%s\r\n"
3564 "master_port:%d\r\n"
3565 "master_link_status:%s\r\n"
3566 "master_last_io_seconds_ago:%d\r\n"
3567 ,server.masterhost,
3568 server.masterport,
3569 (server.replstate == REDIS_REPL_CONNECTED) ?
3570 "up" : "down",
3571 (int)(time(NULL)-server.master->lastinteraction)
3572 );
3573 }
3574 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3575 addReplySds(c,info);
3576 addReply(c,shared.crlf);
3577 }
3578
3579 static void monitorCommand(redisClient *c) {
3580 /* ignore MONITOR if aleady slave or in monitor mode */
3581 if (c->flags & REDIS_SLAVE) return;
3582
3583 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3584 c->slaveseldb = 0;
3585 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3586 addReply(c,shared.ok);
3587 }
3588
3589 /* ================================= Expire ================================= */
3590 static int removeExpire(redisDb *db, robj *key) {
3591 if (dictDelete(db->expires,key) == DICT_OK) {
3592 return 1;
3593 } else {
3594 return 0;
3595 }
3596 }
3597
3598 static int setExpire(redisDb *db, robj *key, time_t when) {
3599 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3600 return 0;
3601 } else {
3602 incrRefCount(key);
3603 return 1;
3604 }
3605 }
3606
3607 /* Return the expire time of the specified key, or -1 if no expire
3608 * is associated with this key (i.e. the key is non volatile) */
3609 static time_t getExpire(redisDb *db, robj *key) {
3610 dictEntry *de;
3611
3612 /* No expire? return ASAP */
3613 if (dictSize(db->expires) == 0 ||
3614 (de = dictFind(db->expires,key)) == NULL) return -1;
3615
3616 return (time_t) dictGetEntryVal(de);
3617 }
3618
3619 static int expireIfNeeded(redisDb *db, robj *key) {
3620 time_t when;
3621 dictEntry *de;
3622
3623 /* No expire? return ASAP */
3624 if (dictSize(db->expires) == 0 ||
3625 (de = dictFind(db->expires,key)) == NULL) return 0;
3626
3627 /* Lookup the expire */
3628 when = (time_t) dictGetEntryVal(de);
3629 if (time(NULL) <= when) return 0;
3630
3631 /* Delete the key */
3632 dictDelete(db->expires,key);
3633 return dictDelete(db->dict,key) == DICT_OK;
3634 }
3635
3636 static int deleteIfVolatile(redisDb *db, robj *key) {
3637 dictEntry *de;
3638
3639 /* No expire? return ASAP */
3640 if (dictSize(db->expires) == 0 ||
3641 (de = dictFind(db->expires,key)) == NULL) return 0;
3642
3643 /* Delete the key */
3644 server.dirty++;
3645 dictDelete(db->expires,key);
3646 return dictDelete(db->dict,key) == DICT_OK;
3647 }
3648
3649 static void expireCommand(redisClient *c) {
3650 dictEntry *de;
3651 int seconds = atoi(c->argv[2]->ptr);
3652
3653 de = dictFind(c->db->dict,c->argv[1]);
3654 if (de == NULL) {
3655 addReply(c,shared.czero);
3656 return;
3657 }
3658 if (seconds <= 0) {
3659 addReply(c, shared.czero);
3660 return;
3661 } else {
3662 time_t when = time(NULL)+seconds;
3663 if (setExpire(c->db,c->argv[1],when))
3664 addReply(c,shared.cone);
3665 else
3666 addReply(c,shared.czero);
3667 return;
3668 }
3669 }
3670
3671 static void ttlCommand(redisClient *c) {
3672 time_t expire;
3673 int ttl = -1;
3674
3675 expire = getExpire(c->db,c->argv[1]);
3676 if (expire != -1) {
3677 ttl = (int) (expire-time(NULL));
3678 if (ttl < 0) ttl = -1;
3679 }
3680 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3681 }
3682
3683 /* =============================== Replication ============================= */
3684
3685 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3686 ssize_t nwritten, ret = size;
3687 time_t start = time(NULL);
3688
3689 timeout++;
3690 while(size) {
3691 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3692 nwritten = write(fd,ptr,size);
3693 if (nwritten == -1) return -1;
3694 ptr += nwritten;
3695 size -= nwritten;
3696 }
3697 if ((time(NULL)-start) > timeout) {
3698 errno = ETIMEDOUT;
3699 return -1;
3700 }
3701 }
3702 return ret;
3703 }
3704
3705 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3706 ssize_t nread, totread = 0;
3707 time_t start = time(NULL);
3708
3709 timeout++;
3710 while(size) {
3711 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3712 nread = read(fd,ptr,size);
3713 if (nread == -1) return -1;
3714 ptr += nread;
3715 size -= nread;
3716 totread += nread;
3717 }
3718 if ((time(NULL)-start) > timeout) {
3719 errno = ETIMEDOUT;
3720 return -1;
3721 }
3722 }
3723 return totread;
3724 }
3725
/* Read a "\n"-terminated line from 'fd' into the buffer 'ptr' of 'size'
 * bytes, one byte at a time via syncRead().
 * The "\n" (and a "\r" right before it) is stripped, and the buffer is
 * always NUL terminated. At most size-1 characters are stored; a longer
 * line is truncated and its remaining bytes are left unread.
 * Returns the number of characters stored (a stripped trailing "\r" is
 * still counted), or -1 on error or when 'size' is not positive.
 *
 * The loop previously never decremented 'size', so a line longer than the
 * buffer overflowed it. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return -1;
    size--; /* reserve room for the terminating NUL */
    *ptr = '\0';
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        size--;
    }
    return nread;
}
3746
3747 static void syncCommand(redisClient *c) {
3748 /* ignore SYNC if aleady slave or in monitor mode */
3749 if (c->flags & REDIS_SLAVE) return;
3750
3751 /* SYNC can't be issued when the server has pending data to send to
3752 * the client about already issued commands. We need a fresh reply
3753 * buffer registering the differences between the BGSAVE and the current
3754 * dataset, so that we can copy to other slaves if needed. */
3755 if (listLength(c->reply) != 0) {
3756 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3757 return;
3758 }
3759
3760 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3761 /* Here we need to check if there is a background saving operation
3762 * in progress, or if it is required to start one */
3763 if (server.bgsaveinprogress) {
3764 /* Ok a background save is in progress. Let's check if it is a good
3765 * one for replication, i.e. if there is another slave that is
3766 * registering differences since the server forked to save */
3767 redisClient *slave;
3768 listNode *ln;
3769
3770 listRewind(server.slaves);
3771 while((ln = listYield(server.slaves))) {
3772 slave = ln->value;
3773 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3774 }
3775 if (ln) {
3776 /* Perfect, the server is already registering differences for
3777 * another slave. Set the right state, and copy the buffer. */
3778 listRelease(c->reply);
3779 c->reply = listDup(slave->reply);
3780 if (!c->reply) oom("listDup copying slave reply list");
3781 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3782 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3783 } else {
3784 /* No way, we need to wait for the next BGSAVE in order to
3785 * register differences */
3786 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3787 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3788 }
3789 } else {
3790 /* Ok we don't have a BGSAVE in progress, let's start one */
3791 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3792 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3793 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3794 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3795 return;
3796 }
3797 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3798 }
3799 c->repldbfd = -1;
3800 c->flags |= REDIS_SLAVE;
3801 c->slaveseldb = 0;
3802 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3803 return;
3804 }
3805
3806 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3807 redisClient *slave = privdata;
3808 REDIS_NOTUSED(el);
3809 REDIS_NOTUSED(mask);
3810 char buf[REDIS_IOBUF_LEN];
3811 ssize_t nwritten, buflen;
3812
3813 if (slave->repldboff == 0) {
3814 /* Write the bulk write count before to transfer the DB. In theory here
3815 * we don't know how much room there is in the output buffer of the
3816 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3817 * operations) will never be smaller than the few bytes we need. */
3818 sds bulkcount;
3819
3820 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3821 slave->repldbsize);
3822 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3823 {
3824 sdsfree(bulkcount);
3825 freeClient(slave);
3826 return;
3827 }
3828 sdsfree(bulkcount);
3829 }
3830 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3831 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3832 if (buflen <= 0) {
3833 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3834 (buflen == 0) ? "premature EOF" : strerror(errno));
3835 freeClient(slave);
3836 return;
3837 }
3838 if ((nwritten = write(fd,buf,buflen)) == -1) {
3839 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3840 strerror(errno));
3841 freeClient(slave);
3842 return;
3843 }
3844 slave->repldboff += nwritten;
3845 if (slave->repldboff == slave->repldbsize) {
3846 close(slave->repldbfd);
3847 slave->repldbfd = -1;
3848 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3849 slave->replstate = REDIS_REPL_ONLINE;
3850 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3851 sendReplyToClient, slave, NULL) == AE_ERR) {
3852 freeClient(slave);
3853 return;
3854 }
3855 addReplySds(slave,sdsempty());
3856 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3857 }
3858 }
3859
3860 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3861 listNode *ln;
3862 int startbgsave = 0;
3863
3864 listRewind(server.slaves);
3865 while((ln = listYield(server.slaves))) {
3866 redisClient *slave = ln->value;
3867
3868 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3869 startbgsave = 1;
3870 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3871 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3872 struct stat buf;
3873
3874 if (bgsaveerr != REDIS_OK) {
3875 freeClient(slave);
3876 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3877 continue;
3878 }
3879 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3880 fstat(slave->repldbfd,&buf) == -1) {
3881 freeClient(slave);
3882 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3883 continue;
3884 }
3885 slave->repldboff = 0;
3886 slave->repldbsize = buf.st_size;
3887 slave->replstate = REDIS_REPL_SEND_BULK;
3888 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3889 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3890 freeClient(slave);
3891 continue;
3892 }
3893 }
3894 }
3895 if (startbgsave) {
3896 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3897 listRewind(server.slaves);
3898 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3899 while((ln = listYield(server.slaves))) {
3900 redisClient *slave = ln->value;
3901
3902 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3903 freeClient(slave);
3904 }
3905 }
3906 }
3907 }
3908
3909 static int syncWithMaster(void) {
3910 char buf[1024], tmpfile[256];
3911 int dumpsize;
3912 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3913 int dfd;
3914
3915 if (fd == -1) {
3916 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3917 strerror(errno));
3918 return REDIS_ERR;
3919 }
3920 /* Issue the SYNC command */
3921 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3922 close(fd);
3923 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3924 strerror(errno));
3925 return REDIS_ERR;
3926 }
3927 /* Read the bulk write count */
3928 if (syncReadLine(fd,buf,1024,3600) == -1) {
3929 close(fd);
3930 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3931 strerror(errno));
3932 return REDIS_ERR;
3933 }
3934 dumpsize = atoi(buf+1);
3935 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3936 /* Read the bulk write data on a temp file */
3937 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3938 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3939 if (dfd == -1) {
3940 close(fd);
3941 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3942 return REDIS_ERR;
3943 }
3944 while(dumpsize) {
3945 int nread, nwritten;
3946
3947 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3948 if (nread == -1) {
3949 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3950 strerror(errno));
3951 close(fd);
3952 close(dfd);
3953 return REDIS_ERR;
3954 }
3955 nwritten = write(dfd,buf,nread);
3956 if (nwritten == -1) {
3957 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3958 close(fd);
3959 close(dfd);
3960 return REDIS_ERR;
3961 }
3962 dumpsize -= nread;
3963 }
3964 close(dfd);
3965 if (rename(tmpfile,server.dbfilename) == -1) {
3966 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3967 unlink(tmpfile);
3968 close(fd);
3969 return REDIS_ERR;
3970 }
3971 emptyDb();
3972 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3973 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3974 close(fd);
3975 return REDIS_ERR;
3976 }
3977 server.master = createClient(fd);
3978 server.master->flags |= REDIS_MASTER;
3979 server.replstate = REDIS_REPL_CONNECTED;
3980 return REDIS_OK;
3981 }
3982
3983 static void slaveofCommand(redisClient *c) {
3984 if (!strcasecmp(c->argv[1]->ptr,"no") &&
3985 !strcasecmp(c->argv[2]->ptr,"one")) {
3986 if (server.masterhost) {
3987 sdsfree(server.masterhost);
3988 server.masterhost = NULL;
3989 if (server.master) freeClient(server.master);
3990 server.replstate = REDIS_REPL_NONE;
3991 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
3992 }
3993 } else {
3994 sdsfree(server.masterhost);
3995 server.masterhost = sdsdup(c->argv[1]->ptr);
3996 server.masterport = atoi(c->argv[2]->ptr);
3997 if (server.master) freeClient(server.master);
3998 server.replstate = REDIS_REPL_CONNECT;
3999 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4000 server.masterhost, server.masterport);
4001 }
4002 addReply(c,shared.ok);
4003 }
4004
4005 /* ============================ Maxmemory directive ======================== */
4006
4007 /* This function gets called when 'maxmemory' is set on the config file to limit
4008 * the max memory used by the server, and we are out of memory.
4009 * This function will try to, in order:
4010 *
4011 * - Free objects from the free list
4012 * - Try to remove keys with an EXPIRE set
4013 *
4014 * It is not possible to free enough memory to reach used-memory < maxmemory
4015 * the server will start refusing commands that will enlarge even more the
4016 * memory usage.
4017 */
4018 static void freeMemoryIfNeeded(void) {
4019 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4020 if (listLength(server.objfreelist)) {
4021 robj *o;
4022
4023 listNode *head = listFirst(server.objfreelist);
4024 o = listNodeValue(head);
4025 listDelNode(server.objfreelist,head);
4026 zfree(o);
4027 } else {
4028 int j, k, freed = 0;
4029
4030 for (j = 0; j < server.dbnum; j++) {
4031 int minttl = -1;
4032 robj *minkey = NULL;
4033 struct dictEntry *de;
4034
4035 if (dictSize(server.db[j].expires)) {
4036 freed = 1;
4037 /* From a sample of three keys drop the one nearest to
4038 * the natural expire */
4039 for (k = 0; k < 3; k++) {
4040 time_t t;
4041
4042 de = dictGetRandomKey(server.db[j].expires);
4043 t = (time_t) dictGetEntryVal(de);
4044 if (minttl == -1 || t < minttl) {
4045 minkey = dictGetEntryKey(de);
4046 minttl = t;
4047 }
4048 }
4049 deleteKey(server.db+j,minkey);
4050 }
4051 }
4052 if (!freed) return; /* nothing to free... */
4053 }
4054 }
4055 }
4056
4057 /* ================================= Debugging ============================== */
4058
4059 static void debugCommand(redisClient *c) {
4060 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4061 *((char*)-1) = 'x';
4062 } else {
4063 addReplySds(c,sdsnew("-ERR Syntax error, try DEBUG SEGFAULT\r\n"));
4064 }
4065 }
4066
4067 static void onSigsegv(int sig) {
4068 void *trace[25];
4069 int n = backtrace(trace, 25);
4070 char **symbols = backtrace_symbols(trace, n);
4071
4072 redisLog(REDIS_WARNING,"Got %s!!! Redis crashed, backtrace:",
4073 sig == SIGSEGV ? "SIGSEGV" : "SIGBUS");
4074 for (int i = 0; i < n; i++)
4075 redisLog(REDIS_WARNING,symbols[i]);
4076 exit(1);
4077 }
4078
4079 /* =================================== Main! ================================ */
4080
4081 #ifdef __linux__
/* Return the integer in /proc/sys/vm/overcommit_memory (parsed with atoi),
 * or -1 when the file cannot be opened or nothing can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *f = fopen("/proc/sys/vm/overcommit_memory","r");

    if (f == NULL) return -1;
    got = fgets(line,sizeof(line),f);
    fclose(f);
    return (got == NULL) ? -1 : atoi(line);
}
4095
4096 void linuxOvercommitMemoryWarning(void) {
4097 if (linuxOvercommitMemoryValue() == 0) {
4098 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4099 }
4100 }
4101 #endif /* __linux__ */
4102
4103 static void daemonize(void) {
4104 int fd;
4105 FILE *fp;
4106
4107 if (fork() != 0) exit(0); /* parent exits */
4108 setsid(); /* create a new session */
4109
4110 /* Every output goes to /dev/null. If Redis is daemonized but
4111 * the 'logfile' is set to 'stdout' in the configuration file
4112 * it will not log at all. */
4113 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4114 dup2(fd, STDIN_FILENO);
4115 dup2(fd, STDOUT_FILENO);
4116 dup2(fd, STDERR_FILENO);
4117 if (fd > STDERR_FILENO) close(fd);
4118 }
4119 /* Try to write the pid file */
4120 fp = fopen(server.pidfile,"w");
4121 if (fp) {
4122 fprintf(fp,"%d\n",getpid());
4123 fclose(fp);
4124 }
4125 }
4126
4127 int main(int argc, char **argv) {
4128 #ifdef __linux__
4129 linuxOvercommitMemoryWarning();
4130 #endif
4131
4132 initServerConfig();
4133 if (argc == 2) {
4134 ResetServerSaveParams();
4135 loadServerConfig(argv[1]);
4136 } else if (argc > 2) {
4137 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4138 exit(1);
4139 } else {
4140 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4141 }
4142 initServer();
4143 if (server.daemonize) daemonize();
4144 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4145 if (rdbLoad(server.dbfilename) == REDIS_OK)
4146 redisLog(REDIS_NOTICE,"DB loaded from disk");
4147 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4148 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4149 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4150 aeMain(server.el);
4151 aeDeleteEventLoop(server.el);
4152 return 0;
4153 }