]> git.saurik.com Git - redis.git/blob - redis.c
01ab9158ef584d68c488f996cef7e7091a04bd8e
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Server version string. */
#define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #define __USE_POSIX199309
40 #include <signal.h>
41 #include <execinfo.h>
42 #include <ucontext.h>
43 #include <sys/wait.h>
44 #include <errno.h>
45 #include <assert.h>
46 #include <ctype.h>
47 #include <stdarg.h>
48 #include <inttypes.h>
49 #include <arpa/inet.h>
50 #include <sys/stat.h>
51 #include <fcntl.h>
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #include <limits.h>
55 #include <execinfo.h>
56
57 #include "redis.h"
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
66
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR (-1)

/* Static server configuration */
#define REDIS_SERVERPORT 6379               /* TCP port */
#define REDIS_MAXIDLETIME (60*5)            /* default client timeout, seconds */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000       /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60              /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100    /* max keys sampled for expiry per
                                             * DB on each serverCron() run */

/* Hash table parameters */
#define REDIS_HT_MINFILL 10                 /* Minimal hash table fill 10% */
#define REDIS_HT_MINSLOTS 16384             /* Never resize the HT under this */

/* Command flags */
#define REDIS_CMD_BULK 1                    /* Bulk write command */
#define REDIS_CMD_INLINE 2                  /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
 * this flag will return an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short, these commands are denied in low memory conditions. */
#define REDIS_CMD_DENYOOM 4
95
/* Object types: values stored in the 'type' field of a redisObject */
#define REDIS_STRING 0
#define REDIS_LIST   1
#define REDIS_SET    2
#define REDIS_HASH   3

/* Record types that only appear in the on-disk dump, never as object types */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB   254
#define REDIS_EOF        255
106
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys would waste a lot of space, so the two most significant bits of the
 * first byte select how the length is encoded:
 *
 * 00|000000 => the length is the remaining 6 bits of this byte
 * 01|000000 00000000 => 14 bit length: 6 bits of this byte + the next byte
 * 10|000000 [32 bit integer] => a full 32 bit length follows
 * 11|000000 => a specially encoded object follows. The remaining six bits
 *              specify the kind of object; see the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, so most DB keys, and
 * many values, fit inside it. */
#define REDIS_RDB_6BITLEN 0     /* selector 00 */
#define REDIS_RDB_14BITLEN 1    /* selector 01 */
#define REDIS_RDB_32BITLEN 2    /* selector 10 */
#define REDIS_RDB_ENCVAL 3      /* selector 11 */
#define REDIS_RDB_LENERR UINT_MAX /* NOTE(review): presumably the error value
                                   * of the length reader -- confirm */

/* When a length of a string object stored on disk has the first two bits
 * set (REDIS_RDB_ENCVAL), the remaining six bits specify a special encoding
 * for the object according to the following defines: */
#define REDIS_RDB_ENC_INT8 0    /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1   /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2   /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3     /* string compressed with LZF */
133
/* Client flags (bit mask stored in redisClient.flags) */
#define REDIS_CLOSE   1     /* This client connection should be closed ASAP */
#define REDIS_SLAVE   2     /* This client is a slave server */
#define REDIS_MASTER  4     /* This client is a master server */
#define REDIS_MONITOR 8     /* This client is a slave monitor, see MONITOR */

/* Replication state as seen by the slave */
#define REDIS_REPL_NONE      0  /* No active replication */
#define REDIS_REPL_CONNECT   1  /* Must connect to master */
#define REDIS_REPL_CONNECTED 2  /* Connected to master */

/* Replication state of a slave as seen by the master.
 * In the SEND_BULK and ONLINE states the slave already receives new updates
 * in its output queue; in WAIT_BGSAVE_START the master is instead waiting
 * for the next background save to begin before it can feed the slave. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END   4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK         5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE            6 /* bulk DB already transmitted, receive updates */
153
/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels, in increasing severity; redisLog() drops messages whose level
 * is below server.verbosity */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: mark a parameter or variable as intentionally unused.
 * The argument is parenthesized so any expression expands safely. */
#define REDIS_NOTUSED(V) ((void)(V))
174
175
176 /*================================= Data types ============================== */
177
/* A Redis object: a type tag plus a reference-counted pointer to the value
 * (a string, list or set). */
typedef struct redisObject {
    void *ptr;      /* the value; for REDIS_STRING objects an sds string */
    int type;       /* REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
    int refcount;   /* managed by incrRefCount() / decrRefCount() */
} robj;
184
185 typedef struct redisDb {
186 dict *dict;
187 dict *expires;
188 int id;
189 } redisDb;
190
191 /* With multiplexing we need to take per-clinet state.
192 * Clients are taken in a liked list. */
193 typedef struct redisClient {
194 int fd;
195 redisDb *db;
196 int dictid;
197 sds querybuf;
198 robj **argv;
199 int argc;
200 int bulklen; /* bulk read len. -1 if not in bulk read mode */
201 list *reply;
202 int sentlen;
203 time_t lastinteraction; /* time of the last interaction, used for timeout */
204 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
205 int slaveseldb; /* slave selected db, if this client is a slave */
206 int authenticated; /* when requirepass is non-NULL */
207 int replstate; /* replication state if this is a slave */
208 int repldbfd; /* replication DB file descriptor */
209 long repldboff; /* replication DB file offset */
210 off_t repldbsize; /* replication DB file size */
211 } redisClient;
212
/* One automatic save rule: save when at least 'changes' writes happened and
 * more than 'seconds' seconds elapsed since the last save. */
struct saveparam {
    time_t seconds;
    int changes;
};
217
218 /* Global server state structure */
219 struct redisServer {
220 int port;
221 int fd;
222 redisDb *db;
223 dict *sharingpool;
224 unsigned int sharingpoolsize;
225 long long dirty; /* changes to DB from the last save */
226 list *clients;
227 list *slaves, *monitors;
228 char neterr[ANET_ERR_LEN];
229 aeEventLoop *el;
230 int cronloops; /* number of times the cron function run */
231 list *objfreelist; /* A list of freed objects to avoid malloc() */
232 time_t lastsave; /* Unix time of last save succeeede */
233 size_t usedmemory; /* Used memory in megabytes */
234 /* Fields used only for stats */
235 time_t stat_starttime; /* server start time */
236 long long stat_numcommands; /* number of processed commands */
237 long long stat_numconnections; /* number of connections received */
238 /* Configuration */
239 int verbosity;
240 int glueoutputbuf;
241 int maxidletime;
242 int dbnum;
243 int daemonize;
244 char *pidfile;
245 int bgsaveinprogress;
246 struct saveparam *saveparams;
247 int saveparamslen;
248 char *logfile;
249 char *bindaddr;
250 char *dbfilename;
251 char *requirepass;
252 int shareobjects;
253 /* Replication related */
254 int isslave;
255 char *masterhost;
256 int masterport;
257 redisClient *master; /* client that is master for this slave */
258 int replstate;
259 unsigned int maxclients;
260 unsigned int maxmemory;
261 /* Sort parameters - qsort_r() is only available under BSD so we
262 * have to take this state global, in order to pass it to sortCompare() */
263 int sort_desc;
264 int sort_alpha;
265 int sort_bypattern;
266 };
267
268 typedef void redisCommandProc(redisClient *c);
269 struct redisCommand {
270 char *name;
271 redisCommandProc *proc;
272 int arity;
273 int flags;
274 };
275
276 typedef struct _redisSortObject {
277 robj *obj;
278 union {
279 double score;
280 robj *cmpobj;
281 } u;
282 } redisSortObject;
283
284 typedef struct _redisSortOperation {
285 int type;
286 robj *pattern;
287 } redisSortOperation;
288
289 struct sharedObjectsStruct {
290 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
291 *colon, *nullbulk, *nullmultibulk,
292 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
293 *outofrangeerr, *plus,
294 *select0, *select1, *select2, *select3, *select4,
295 *select5, *select6, *select7, *select8, *select9;
296 } shared;
297
298 /*================================ Prototypes =============================== */
299
300 static void freeStringObject(robj *o);
301 static void freeListObject(robj *o);
302 static void freeSetObject(robj *o);
303 static void decrRefCount(void *o);
304 static robj *createObject(int type, void *ptr);
305 static void freeClient(redisClient *c);
306 static int rdbLoad(char *filename);
307 static void addReply(redisClient *c, robj *obj);
308 static void addReplySds(redisClient *c, sds s);
309 static void incrRefCount(robj *o);
310 static int rdbSaveBackground(char *filename);
311 static robj *createStringObject(char *ptr, size_t len);
312 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
313 static int syncWithMaster(void);
314 static robj *tryObjectSharing(robj *o);
315 static int removeExpire(redisDb *db, robj *key);
316 static int expireIfNeeded(redisDb *db, robj *key);
317 static int deleteIfVolatile(redisDb *db, robj *key);
318 static int deleteKey(redisDb *db, robj *key);
319 static time_t getExpire(redisDb *db, robj *key);
320 static int setExpire(redisDb *db, robj *key, time_t when);
321 static void updateSalvesWaitingBgsave(int bgsaveerr);
322 static void freeMemoryIfNeeded(void);
323
324 static void authCommand(redisClient *c);
325 static void pingCommand(redisClient *c);
326 static void echoCommand(redisClient *c);
327 static void setCommand(redisClient *c);
328 static void setnxCommand(redisClient *c);
329 static void getCommand(redisClient *c);
330 static void delCommand(redisClient *c);
331 static void existsCommand(redisClient *c);
332 static void incrCommand(redisClient *c);
333 static void decrCommand(redisClient *c);
334 static void incrbyCommand(redisClient *c);
335 static void decrbyCommand(redisClient *c);
336 static void selectCommand(redisClient *c);
337 static void randomkeyCommand(redisClient *c);
338 static void keysCommand(redisClient *c);
339 static void dbsizeCommand(redisClient *c);
340 static void lastsaveCommand(redisClient *c);
341 static void saveCommand(redisClient *c);
342 static void bgsaveCommand(redisClient *c);
343 static void shutdownCommand(redisClient *c);
344 static void moveCommand(redisClient *c);
345 static void renameCommand(redisClient *c);
346 static void renamenxCommand(redisClient *c);
347 static void lpushCommand(redisClient *c);
348 static void rpushCommand(redisClient *c);
349 static void lpopCommand(redisClient *c);
350 static void rpopCommand(redisClient *c);
351 static void llenCommand(redisClient *c);
352 static void lindexCommand(redisClient *c);
353 static void lrangeCommand(redisClient *c);
354 static void ltrimCommand(redisClient *c);
355 static void typeCommand(redisClient *c);
356 static void lsetCommand(redisClient *c);
357 static void saddCommand(redisClient *c);
358 static void sremCommand(redisClient *c);
359 static void smoveCommand(redisClient *c);
360 static void sismemberCommand(redisClient *c);
361 static void scardCommand(redisClient *c);
362 static void sinterCommand(redisClient *c);
363 static void sinterstoreCommand(redisClient *c);
364 static void sunionCommand(redisClient *c);
365 static void sunionstoreCommand(redisClient *c);
366 static void sdiffCommand(redisClient *c);
367 static void sdiffstoreCommand(redisClient *c);
368 static void syncCommand(redisClient *c);
369 static void flushdbCommand(redisClient *c);
370 static void flushallCommand(redisClient *c);
371 static void sortCommand(redisClient *c);
372 static void lremCommand(redisClient *c);
373 static void infoCommand(redisClient *c);
374 static void mgetCommand(redisClient *c);
375 static void monitorCommand(redisClient *c);
376 static void expireCommand(redisClient *c);
377 static void getSetCommand(redisClient *c);
378 static void ttlCommand(redisClient *c);
379 static void slaveofCommand(redisClient *c);
380 static void debugCommand(redisClient *c);
381
382 /*================================= Globals ================================= */
383
384 /* Global vars */
385 static struct redisServer server; /* server global state */
386 static struct redisCommand cmdTable[] = {
387 {"get",getCommand,2,REDIS_CMD_INLINE},
388 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
389 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
390 {"del",delCommand,-2,REDIS_CMD_INLINE},
391 {"exists",existsCommand,2,REDIS_CMD_INLINE},
392 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
393 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
394 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
395 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
396 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
397 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
398 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
399 {"llen",llenCommand,2,REDIS_CMD_INLINE},
400 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
401 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
402 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
403 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
404 {"lrem",lremCommand,4,REDIS_CMD_BULK},
405 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
406 {"srem",sremCommand,3,REDIS_CMD_BULK},
407 {"smove",smoveCommand,4,REDIS_CMD_BULK},
408 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
409 {"scard",scardCommand,2,REDIS_CMD_INLINE},
410 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
411 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
412 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
413 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
414 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
415 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
416 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
417 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
418 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
419 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
420 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
421 {"select",selectCommand,2,REDIS_CMD_INLINE},
422 {"move",moveCommand,3,REDIS_CMD_INLINE},
423 {"rename",renameCommand,3,REDIS_CMD_INLINE},
424 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
425 {"expire",expireCommand,3,REDIS_CMD_INLINE},
426 {"keys",keysCommand,2,REDIS_CMD_INLINE},
427 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
428 {"auth",authCommand,2,REDIS_CMD_INLINE},
429 {"ping",pingCommand,1,REDIS_CMD_INLINE},
430 {"echo",echoCommand,2,REDIS_CMD_BULK},
431 {"save",saveCommand,1,REDIS_CMD_INLINE},
432 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
433 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
434 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
435 {"type",typeCommand,2,REDIS_CMD_INLINE},
436 {"sync",syncCommand,1,REDIS_CMD_INLINE},
437 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
438 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
439 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
440 {"info",infoCommand,1,REDIS_CMD_INLINE},
441 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
442 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
443 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
444 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
445 {NULL,NULL,0,0}
446 };
447
448 /*============================ Utility functions ============================ */
449
/* Map one byte for comparison: converted through unsigned char first
 * (passing a negative char to tolower() is undefined), then lowercased when
 * nocase is set. */
static int stringmatchFoldChar(char ch, int nocase) {
    int c = (unsigned char)ch;
    return nocase ? tolower(c) : c;
}

/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Neither buffer has to be
 * NUL-terminated: no byte beyond the given lengths is read.
 *
 * Syntax:
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [abc]   one byte from the set; [^abc] negates it; [a-z] is a byte range
 *           (the endpoints may be given in either order); \x inside a class
 *           matches x literally. An unterminated class ends at the end of
 *           the pattern.
 *   \x      the byte x literally
 * When nocase is non-zero, comparisons ignore case via tolower(); an escaped
 * byte inside a class is always compared exactly. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of stars: "a**b" behaves like "a*b". */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing star matches whatever is left */
            /* Try the rest of the pattern at every remaining offset. The
             * rest starts with an element that consumes at least one byte,
             * so an empty remainder can never match. */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            if (stringLen == 0)
                return 0; /* a class always consumes one byte */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back onto the last byte so
                     * the pattern++ after the switch ends exactly at the
                     * end of the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else if (stringmatchFoldChar(pattern[0], nocase) ==
                           stringmatchFoldChar(string[0], nocase)) {
                    match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (stringmatchFoldChar(pattern[0], nocase) !=
                stringmatchFoldChar(string[0], nocase))
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Only trailing stars can still match an exhausted string. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
572
573 void redisLog(int level, const char *fmt, ...)
574 {
575 va_list ap;
576 FILE *fp;
577
578 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
579 if (!fp) return;
580
581 va_start(ap, fmt);
582 if (level >= server.verbosity) {
583 char *c = ".-*";
584 char buf[64];
585 time_t now;
586
587 now = time(NULL);
588 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
589 fprintf(fp,"%s %c ",buf,c[level]);
590 vfprintf(fp, fmt, ap);
591 fprintf(fp,"\n");
592 fflush(fp);
593 }
594 va_end(ap);
595
596 if (server.logfile) fclose(fp);
597 }
598
599 /*====================== Hash table type implementation ==================== */
600
/* This is a hash table type that uses SDS dynamic strings (wrapped in
 * Redis objects) as keys and Redis objects as values (objects can hold
 * SDS strings, lists, sets). */
604
605 static int sdsDictKeyCompare(void *privdata, const void *key1,
606 const void *key2)
607 {
608 int l1,l2;
609 DICT_NOTUSED(privdata);
610
611 l1 = sdslen((sds)key1);
612 l2 = sdslen((sds)key2);
613 if (l1 != l2) return 0;
614 return memcmp(key1, key2, l1) == 0;
615 }
616
/* dictType destructor for keys/values that are redisObjects: releases the
 * reference held by the dict through decrRefCount(). */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
623
624 static int dictSdsKeyCompare(void *privdata, const void *key1,
625 const void *key2)
626 {
627 const robj *o1 = key1, *o2 = key2;
628 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
629 }
630
631 static unsigned int dictSdsHash(const void *key) {
632 const robj *o = key;
633 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
634 }
635
636 static dictType setDictType = {
637 dictSdsHash, /* hash function */
638 NULL, /* key dup */
639 NULL, /* val dup */
640 dictSdsKeyCompare, /* key compare */
641 dictRedisObjectDestructor, /* key destructor */
642 NULL /* val destructor */
643 };
644
645 static dictType hashDictType = {
646 dictSdsHash, /* hash function */
647 NULL, /* key dup */
648 NULL, /* val dup */
649 dictSdsKeyCompare, /* key compare */
650 dictRedisObjectDestructor, /* key destructor */
651 dictRedisObjectDestructor /* val destructor */
652 };
653
654 /* ========================= Random utility functions ======================= */
655
/* Out-of-memory handler. Redis generally does not try to recover when an
 * object or string allocation fails: it is not clear the condition could
 * even be reported to the client, since the networking layer itself
 * allocates its send buffers on the heap. So it simply aborts, which also
 * keeps the calling code simpler to read.
 *
 * msg: short description of the failed allocation, printed to stderr.
 * Never returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr); /* make sure the message is out before we die */
    sleep(1);       /* NOTE(review): pause before abort(); purpose not shown here */
    abort();
}
667
668 /* ====================== Redis server networking stuff ===================== */
669 void closeTimedoutClients(void) {
670 redisClient *c;
671 listNode *ln;
672 time_t now = time(NULL);
673
674 listRewind(server.clients);
675 while ((ln = listYield(server.clients)) != NULL) {
676 c = listNodeValue(ln);
677 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
678 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
679 (now - c->lastinteraction > server.maxidletime)) {
680 redisLog(REDIS_DEBUG,"Closing idle client");
681 freeClient(c);
682 }
683 }
684 }
685
686 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
687 * we resize the hash table to save memory */
688 void tryResizeHashTables(void) {
689 int j;
690
691 for (j = 0; j < server.dbnum; j++) {
692 long long size, used;
693
694 size = dictSlots(server.db[j].dict);
695 used = dictSize(server.db[j].dict);
696 if (size && used && size > REDIS_HT_MINSLOTS &&
697 (used*100/size < REDIS_HT_MINFILL)) {
698 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
699 dictResize(server.db[j].dict);
700 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
701 }
702 }
703 }
704
705 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
706 int j, loops = server.cronloops++;
707 REDIS_NOTUSED(eventLoop);
708 REDIS_NOTUSED(id);
709 REDIS_NOTUSED(clientData);
710
711 /* Update the global state with the amount of used memory */
712 server.usedmemory = zmalloc_used_memory();
713
714 /* Show some info about non-empty databases */
715 for (j = 0; j < server.dbnum; j++) {
716 long long size, used, vkeys;
717
718 size = dictSlots(server.db[j].dict);
719 used = dictSize(server.db[j].dict);
720 vkeys = dictSize(server.db[j].expires);
721 if (!(loops % 5) && used > 0) {
722 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
723 /* dictPrintStats(server.dict); */
724 }
725 }
726
727 /* We don't want to resize the hash tables while a bacground saving
728 * is in progress: the saving child is created using fork() that is
729 * implemented with a copy-on-write semantic in most modern systems, so
730 * if we resize the HT while there is the saving child at work actually
731 * a lot of memory movements in the parent will cause a lot of pages
732 * copied. */
733 if (!server.bgsaveinprogress) tryResizeHashTables();
734
735 /* Show information about connected clients */
736 if (!(loops % 5)) {
737 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
738 listLength(server.clients)-listLength(server.slaves),
739 listLength(server.slaves),
740 server.usedmemory,
741 dictSize(server.sharingpool));
742 }
743
744 /* Close connections of timedout clients */
745 if (server.maxidletime && !(loops % 10))
746 closeTimedoutClients();
747
748 /* Check if a background saving in progress terminated */
749 if (server.bgsaveinprogress) {
750 int statloc;
751 /* XXX: TODO handle the case of the saving child killed */
752 if (wait4(-1,&statloc,WNOHANG,NULL)) {
753 int exitcode = WEXITSTATUS(statloc);
754 if (exitcode == 0) {
755 redisLog(REDIS_NOTICE,
756 "Background saving terminated with success");
757 server.dirty = 0;
758 server.lastsave = time(NULL);
759 } else {
760 redisLog(REDIS_WARNING,
761 "Background saving error");
762 }
763 server.bgsaveinprogress = 0;
764 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
765 }
766 } else {
767 /* If there is not a background saving in progress check if
768 * we have to save now */
769 time_t now = time(NULL);
770 for (j = 0; j < server.saveparamslen; j++) {
771 struct saveparam *sp = server.saveparams+j;
772
773 if (server.dirty >= sp->changes &&
774 now-server.lastsave > sp->seconds) {
775 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
776 sp->changes, sp->seconds);
777 rdbSaveBackground(server.dbfilename);
778 break;
779 }
780 }
781 }
782
783 /* Try to expire a few timed out keys */
784 for (j = 0; j < server.dbnum; j++) {
785 redisDb *db = server.db+j;
786 int num = dictSize(db->expires);
787
788 if (num) {
789 time_t now = time(NULL);
790
791 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
792 num = REDIS_EXPIRELOOKUPS_PER_CRON;
793 while (num--) {
794 dictEntry *de;
795 time_t t;
796
797 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
798 t = (time_t) dictGetEntryVal(de);
799 if (now > t) {
800 deleteKey(db,dictGetEntryKey(de));
801 }
802 }
803 }
804 }
805
806 /* Check if we should connect to a MASTER */
807 if (server.replstate == REDIS_REPL_CONNECT) {
808 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
809 if (syncWithMaster() == REDIS_OK) {
810 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
811 }
812 }
813 return 1000;
814 }
815
816 static void createSharedObjects(void) {
817 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
818 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
819 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
820 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
821 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
822 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
823 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
824 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
825 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
826 /* no such key */
827 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
828 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
829 "-ERR Operation against a key holding the wrong kind of value\r\n"));
830 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
831 "-ERR no such key\r\n"));
832 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
833 "-ERR syntax error\r\n"));
834 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
835 "-ERR source and destination objects are the same\r\n"));
836 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
837 "-ERR index out of range\r\n"));
838 shared.space = createObject(REDIS_STRING,sdsnew(" "));
839 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
840 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
841 shared.select0 = createStringObject("select 0\r\n",10);
842 shared.select1 = createStringObject("select 1\r\n",10);
843 shared.select2 = createStringObject("select 2\r\n",10);
844 shared.select3 = createStringObject("select 3\r\n",10);
845 shared.select4 = createStringObject("select 4\r\n",10);
846 shared.select5 = createStringObject("select 5\r\n",10);
847 shared.select6 = createStringObject("select 6\r\n",10);
848 shared.select7 = createStringObject("select 7\r\n",10);
849 shared.select8 = createStringObject("select 8\r\n",10);
850 shared.select9 = createStringObject("select 9\r\n",10);
851 }
852
853 static void appendServerSaveParams(time_t seconds, int changes) {
854 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
855 if (server.saveparams == NULL) oom("appendServerSaveParams");
856 server.saveparams[server.saveparamslen].seconds = seconds;
857 server.saveparams[server.saveparamslen].changes = changes;
858 server.saveparamslen++;
859 }
860
861 static void ResetServerSaveParams() {
862 zfree(server.saveparams);
863 server.saveparams = NULL;
864 server.saveparamslen = 0;
865 }
866
867 static void initServerConfig() {
868 server.dbnum = REDIS_DEFAULT_DBNUM;
869 server.port = REDIS_SERVERPORT;
870 server.verbosity = REDIS_DEBUG;
871 server.maxidletime = REDIS_MAXIDLETIME;
872 server.saveparams = NULL;
873 server.logfile = NULL; /* NULL = log on standard output */
874 server.bindaddr = NULL;
875 server.glueoutputbuf = 1;
876 server.daemonize = 0;
877 server.pidfile = "/var/run/redis.pid";
878 server.dbfilename = "dump.rdb";
879 server.requirepass = NULL;
880 server.shareobjects = 0;
881 server.maxclients = 0;
882 server.maxmemory = 0;
883 ResetServerSaveParams();
884
885 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
886 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
887 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
888 /* Replication related */
889 server.isslave = 0;
890 server.masterhost = NULL;
891 server.masterport = 6379;
892 server.master = NULL;
893 server.replstate = REDIS_REPL_NONE;
894 }
895
896 static void initServer() {
897 int j;
898
899 signal(SIGHUP, SIG_IGN);
900 signal(SIGPIPE, SIG_IGN);
901
902 server.clients = listCreate();
903 server.slaves = listCreate();
904 server.monitors = listCreate();
905 server.objfreelist = listCreate();
906 createSharedObjects();
907 server.el = aeCreateEventLoop();
908 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
909 server.sharingpool = dictCreate(&setDictType,NULL);
910 server.sharingpoolsize = 1024;
911 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
912 oom("server initialization"); /* Fatal OOM */
913 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
914 if (server.fd == -1) {
915 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
916 exit(1);
917 }
918 for (j = 0; j < server.dbnum; j++) {
919 server.db[j].dict = dictCreate(&hashDictType,NULL);
920 server.db[j].expires = dictCreate(&setDictType,NULL);
921 server.db[j].id = j;
922 }
923 server.cronloops = 0;
924 server.bgsaveinprogress = 0;
925 server.lastsave = time(NULL);
926 server.dirty = 0;
927 server.usedmemory = 0;
928 server.stat_numcommands = 0;
929 server.stat_numconnections = 0;
930 server.stat_starttime = time(NULL);
931 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
932 }
933
934 /* Empty the whole database */
935 static long long emptyDb() {
936 int j;
937 long long removed = 0;
938
939 for (j = 0; j < server.dbnum; j++) {
940 removed += dictSize(server.db[j].dict);
941 dictEmpty(server.db[j].dict);
942 dictEmpty(server.db[j].expires);
943 }
944 return removed;
945 }
946
/* Map a boolean config argument to an int, case-insensitively:
 * "yes" -> 1, "no" -> 0, anything else -> -1 (caller reports the error). */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
952
953 /* I agree, this is a very rudimental way to load a configuration...
954 will improve later if the config gets more complex */
955 static void loadServerConfig(char *filename) {
956 FILE *fp = fopen(filename,"r");
957 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
958 int linenum = 0;
959 sds line = NULL;
960
961 if (!fp) {
962 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
963 exit(1);
964 }
965 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
966 sds *argv;
967 int argc, j;
968
969 linenum++;
970 line = sdsnew(buf);
971 line = sdstrim(line," \t\r\n");
972
973 /* Skip comments and blank lines*/
974 if (line[0] == '#' || line[0] == '\0') {
975 sdsfree(line);
976 continue;
977 }
978
979 /* Split into arguments */
980 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
981 sdstolower(argv[0]);
982
983 /* Execute config directives */
984 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
985 server.maxidletime = atoi(argv[1]);
986 if (server.maxidletime < 0) {
987 err = "Invalid timeout value"; goto loaderr;
988 }
989 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
990 server.port = atoi(argv[1]);
991 if (server.port < 1 || server.port > 65535) {
992 err = "Invalid port"; goto loaderr;
993 }
994 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
995 server.bindaddr = zstrdup(argv[1]);
996 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
997 int seconds = atoi(argv[1]);
998 int changes = atoi(argv[2]);
999 if (seconds < 1 || changes < 0) {
1000 err = "Invalid save parameters"; goto loaderr;
1001 }
1002 appendServerSaveParams(seconds,changes);
1003 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1004 if (chdir(argv[1]) == -1) {
1005 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1006 argv[1], strerror(errno));
1007 exit(1);
1008 }
1009 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1010 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1011 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1012 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1013 else {
1014 err = "Invalid log level. Must be one of debug, notice, warning";
1015 goto loaderr;
1016 }
1017 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1018 FILE *fp;
1019
1020 server.logfile = zstrdup(argv[1]);
1021 if (!strcasecmp(server.logfile,"stdout")) {
1022 zfree(server.logfile);
1023 server.logfile = NULL;
1024 }
1025 if (server.logfile) {
1026 /* Test if we are able to open the file. The server will not
1027 * be able to abort just for this problem later... */
1028 fp = fopen(server.logfile,"a");
1029 if (fp == NULL) {
1030 err = sdscatprintf(sdsempty(),
1031 "Can't open the log file: %s", strerror(errno));
1032 goto loaderr;
1033 }
1034 fclose(fp);
1035 }
1036 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1037 server.dbnum = atoi(argv[1]);
1038 if (server.dbnum < 1) {
1039 err = "Invalid number of databases"; goto loaderr;
1040 }
1041 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1042 server.maxclients = atoi(argv[1]);
1043 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1044 server.maxmemory = atoi(argv[1]);
1045 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1046 server.masterhost = sdsnew(argv[1]);
1047 server.masterport = atoi(argv[2]);
1048 server.replstate = REDIS_REPL_CONNECT;
1049 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1050 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1051 err = "argument must be 'yes' or 'no'"; goto loaderr;
1052 }
1053 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1054 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1055 err = "argument must be 'yes' or 'no'"; goto loaderr;
1056 }
1057 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1058 server.sharingpoolsize = atoi(argv[1]);
1059 if (server.sharingpoolsize < 1) {
1060 err = "invalid object sharing pool size"; goto loaderr;
1061 }
1062 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1063 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1064 err = "argument must be 'yes' or 'no'"; goto loaderr;
1065 }
1066 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1067 server.requirepass = zstrdup(argv[1]);
1068 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1069 server.pidfile = zstrdup(argv[1]);
1070 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1071 server.dbfilename = zstrdup(argv[1]);
1072 } else {
1073 err = "Bad directive or wrong number of arguments"; goto loaderr;
1074 }
1075 for (j = 0; j < argc; j++)
1076 sdsfree(argv[j]);
1077 zfree(argv);
1078 sdsfree(line);
1079 }
1080 fclose(fp);
1081 return;
1082
1083 loaderr:
1084 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1085 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1086 fprintf(stderr, ">>> '%s'\n", line);
1087 fprintf(stderr, "%s\n", err);
1088 exit(1);
1089 }
1090
1091 static void freeClientArgv(redisClient *c) {
1092 int j;
1093
1094 for (j = 0; j < c->argc; j++)
1095 decrRefCount(c->argv[j]);
1096 c->argc = 0;
1097 }
1098
1099 static void freeClient(redisClient *c) {
1100 listNode *ln;
1101
1102 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1103 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1104 sdsfree(c->querybuf);
1105 listRelease(c->reply);
1106 freeClientArgv(c);
1107 close(c->fd);
1108 ln = listSearchKey(server.clients,c);
1109 assert(ln != NULL);
1110 listDelNode(server.clients,ln);
1111 if (c->flags & REDIS_SLAVE) {
1112 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1113 close(c->repldbfd);
1114 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1115 ln = listSearchKey(l,c);
1116 assert(ln != NULL);
1117 listDelNode(l,ln);
1118 }
1119 if (c->flags & REDIS_MASTER) {
1120 server.master = NULL;
1121 server.replstate = REDIS_REPL_CONNECT;
1122 }
1123 zfree(c->argv);
1124 zfree(c);
1125 }
1126
1127 static void glueReplyBuffersIfNeeded(redisClient *c) {
1128 int totlen = 0;
1129 listNode *ln;
1130 robj *o;
1131
1132 listRewind(c->reply);
1133 while((ln = listYield(c->reply))) {
1134 o = ln->value;
1135 totlen += sdslen(o->ptr);
1136 /* This optimization makes more sense if we don't have to copy
1137 * too much data */
1138 if (totlen > 1024) return;
1139 }
1140 if (totlen > 0) {
1141 char buf[1024];
1142 int copylen = 0;
1143
1144 listRewind(c->reply);
1145 while((ln = listYield(c->reply))) {
1146 o = ln->value;
1147 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1148 copylen += sdslen(o->ptr);
1149 listDelNode(c->reply,ln);
1150 }
1151 /* Now the output buffer is empty, add the new single element */
1152 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1153 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1154 }
1155 }
1156
1157 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1158 redisClient *c = privdata;
1159 int nwritten = 0, totwritten = 0, objlen;
1160 robj *o;
1161 REDIS_NOTUSED(el);
1162 REDIS_NOTUSED(mask);
1163
1164 if (server.glueoutputbuf && listLength(c->reply) > 1)
1165 glueReplyBuffersIfNeeded(c);
1166 while(listLength(c->reply)) {
1167 o = listNodeValue(listFirst(c->reply));
1168 objlen = sdslen(o->ptr);
1169
1170 if (objlen == 0) {
1171 listDelNode(c->reply,listFirst(c->reply));
1172 continue;
1173 }
1174
1175 if (c->flags & REDIS_MASTER) {
1176 nwritten = objlen - c->sentlen;
1177 } else {
1178 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1179 if (nwritten <= 0) break;
1180 }
1181 c->sentlen += nwritten;
1182 totwritten += nwritten;
1183 /* If we fully sent the object on head go to the next one */
1184 if (c->sentlen == objlen) {
1185 listDelNode(c->reply,listFirst(c->reply));
1186 c->sentlen = 0;
1187 }
1188 }
1189 if (nwritten == -1) {
1190 if (errno == EAGAIN) {
1191 nwritten = 0;
1192 } else {
1193 redisLog(REDIS_DEBUG,
1194 "Error writing to client: %s", strerror(errno));
1195 freeClient(c);
1196 return;
1197 }
1198 }
1199 if (totwritten > 0) c->lastinteraction = time(NULL);
1200 if (listLength(c->reply) == 0) {
1201 c->sentlen = 0;
1202 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1203 }
1204 }
1205
1206 static struct redisCommand *lookupCommand(char *name) {
1207 int j = 0;
1208 while(cmdTable[j].name != NULL) {
1209 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1210 j++;
1211 }
1212 return NULL;
1213 }
1214
1215 /* resetClient prepare the client to process the next command */
1216 static void resetClient(redisClient *c) {
1217 freeClientArgv(c);
1218 c->bulklen = -1;
1219 }
1220
1221 /* If this function gets called we already read a whole
1222 * command, argments are in the client argv/argc fields.
1223 * processCommand() execute the command or prepare the
1224 * server for a bulk read from the client.
1225 *
1226 * If 1 is returned the client is still alive and valid and
1227 * and other operations can be performed by the caller. Otherwise
1228 * if 0 is returned the client was destroied (i.e. after QUIT). */
1229 static int processCommand(redisClient *c) {
1230 struct redisCommand *cmd;
1231 long long dirty;
1232
1233 /* Free some memory if needed (maxmemory setting) */
1234 if (server.maxmemory) freeMemoryIfNeeded();
1235
1236 /* The QUIT command is handled as a special case. Normal command
1237 * procs are unable to close the client connection safely */
1238 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1239 freeClient(c);
1240 return 0;
1241 }
1242 cmd = lookupCommand(c->argv[0]->ptr);
1243 if (!cmd) {
1244 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1245 resetClient(c);
1246 return 1;
1247 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1248 (c->argc < -cmd->arity)) {
1249 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1250 resetClient(c);
1251 return 1;
1252 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1253 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1254 resetClient(c);
1255 return 1;
1256 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1257 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1258
1259 decrRefCount(c->argv[c->argc-1]);
1260 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1261 c->argc--;
1262 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1263 resetClient(c);
1264 return 1;
1265 }
1266 c->argc--;
1267 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1268 /* It is possible that the bulk read is already in the
1269 * buffer. Check this condition and handle it accordingly */
1270 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1271 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1272 c->argc++;
1273 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1274 } else {
1275 return 1;
1276 }
1277 }
1278 /* Let's try to share objects on the command arguments vector */
1279 if (server.shareobjects) {
1280 int j;
1281 for(j = 1; j < c->argc; j++)
1282 c->argv[j] = tryObjectSharing(c->argv[j]);
1283 }
1284 /* Check if the user is authenticated */
1285 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1286 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1287 resetClient(c);
1288 return 1;
1289 }
1290
1291 /* Exec the command */
1292 dirty = server.dirty;
1293 cmd->proc(c);
1294 if (server.dirty-dirty != 0 && listLength(server.slaves))
1295 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1296 if (listLength(server.monitors))
1297 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1298 server.stat_numcommands++;
1299
1300 /* Prepare the client for the next command */
1301 if (c->flags & REDIS_CLOSE) {
1302 freeClient(c);
1303 return 0;
1304 }
1305 resetClient(c);
1306 return 1;
1307 }
1308
1309 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1310 listNode *ln;
1311 int outc = 0, j;
1312 robj **outv;
1313 /* (args*2)+1 is enough room for args, spaces, newlines */
1314 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1315
1316 if (argc <= REDIS_STATIC_ARGS) {
1317 outv = static_outv;
1318 } else {
1319 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1320 if (!outv) oom("replicationFeedSlaves");
1321 }
1322
1323 for (j = 0; j < argc; j++) {
1324 if (j != 0) outv[outc++] = shared.space;
1325 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1326 robj *lenobj;
1327
1328 lenobj = createObject(REDIS_STRING,
1329 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1330 lenobj->refcount = 0;
1331 outv[outc++] = lenobj;
1332 }
1333 outv[outc++] = argv[j];
1334 }
1335 outv[outc++] = shared.crlf;
1336
1337 /* Increment all the refcounts at start and decrement at end in order to
1338 * be sure to free objects if there is no slave in a replication state
1339 * able to be feed with commands */
1340 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1341 listRewind(slaves);
1342 while((ln = listYield(slaves))) {
1343 redisClient *slave = ln->value;
1344
1345 /* Don't feed slaves that are still waiting for BGSAVE to start */
1346 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1347
1348 /* Feed all the other slaves, MONITORs and so on */
1349 if (slave->slaveseldb != dictid) {
1350 robj *selectcmd;
1351
1352 switch(dictid) {
1353 case 0: selectcmd = shared.select0; break;
1354 case 1: selectcmd = shared.select1; break;
1355 case 2: selectcmd = shared.select2; break;
1356 case 3: selectcmd = shared.select3; break;
1357 case 4: selectcmd = shared.select4; break;
1358 case 5: selectcmd = shared.select5; break;
1359 case 6: selectcmd = shared.select6; break;
1360 case 7: selectcmd = shared.select7; break;
1361 case 8: selectcmd = shared.select8; break;
1362 case 9: selectcmd = shared.select9; break;
1363 default:
1364 selectcmd = createObject(REDIS_STRING,
1365 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1366 selectcmd->refcount = 0;
1367 break;
1368 }
1369 addReply(slave,selectcmd);
1370 slave->slaveseldb = dictid;
1371 }
1372 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1373 }
1374 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1375 if (outv != static_outv) zfree(outv);
1376 }
1377
1378 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1379 redisClient *c = (redisClient*) privdata;
1380 char buf[REDIS_IOBUF_LEN];
1381 int nread;
1382 REDIS_NOTUSED(el);
1383 REDIS_NOTUSED(mask);
1384
1385 nread = read(fd, buf, REDIS_IOBUF_LEN);
1386 if (nread == -1) {
1387 if (errno == EAGAIN) {
1388 nread = 0;
1389 } else {
1390 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1391 freeClient(c);
1392 return;
1393 }
1394 } else if (nread == 0) {
1395 redisLog(REDIS_DEBUG, "Client closed connection");
1396 freeClient(c);
1397 return;
1398 }
1399 if (nread) {
1400 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1401 c->lastinteraction = time(NULL);
1402 } else {
1403 return;
1404 }
1405
1406 again:
1407 if (c->bulklen == -1) {
1408 /* Read the first line of the query */
1409 char *p = strchr(c->querybuf,'\n');
1410 size_t querylen;
1411 if (p) {
1412 sds query, *argv;
1413 int argc, j;
1414
1415 query = c->querybuf;
1416 c->querybuf = sdsempty();
1417 querylen = 1+(p-(query));
1418 if (sdslen(query) > querylen) {
1419 /* leave data after the first line of the query in the buffer */
1420 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1421 }
1422 *p = '\0'; /* remove "\n" */
1423 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1424 sdsupdatelen(query);
1425
1426 /* Now we can split the query in arguments */
1427 if (sdslen(query) == 0) {
1428 /* Ignore empty query */
1429 sdsfree(query);
1430 return;
1431 }
1432 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1433 if (argv == NULL) oom("sdssplitlen");
1434 sdsfree(query);
1435
1436 if (c->argv) zfree(c->argv);
1437 c->argv = zmalloc(sizeof(robj*)*argc);
1438 if (c->argv == NULL) oom("allocating arguments list for client");
1439
1440 for (j = 0; j < argc; j++) {
1441 if (sdslen(argv[j])) {
1442 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1443 c->argc++;
1444 } else {
1445 sdsfree(argv[j]);
1446 }
1447 }
1448 zfree(argv);
1449 /* Execute the command. If the client is still valid
1450 * after processCommand() return and there is something
1451 * on the query buffer try to process the next command. */
1452 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1453 return;
1454 } else if (sdslen(c->querybuf) >= 1024*32) {
1455 redisLog(REDIS_DEBUG, "Client protocol error");
1456 freeClient(c);
1457 return;
1458 }
1459 } else {
1460 /* Bulk read handling. Note that if we are at this point
1461 the client already sent a command terminated with a newline,
1462 we are reading the bulk data that is actually the last
1463 argument of the command. */
1464 int qbl = sdslen(c->querybuf);
1465
1466 if (c->bulklen <= qbl) {
1467 /* Copy everything but the final CRLF as final argument */
1468 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1469 c->argc++;
1470 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1471 processCommand(c);
1472 return;
1473 }
1474 }
1475 }
1476
1477 static int selectDb(redisClient *c, int id) {
1478 if (id < 0 || id >= server.dbnum)
1479 return REDIS_ERR;
1480 c->db = &server.db[id];
1481 return REDIS_OK;
1482 }
1483
1484 static void *dupClientReplyValue(void *o) {
1485 incrRefCount((robj*)o);
1486 return 0;
1487 }
1488
1489 static redisClient *createClient(int fd) {
1490 redisClient *c = zmalloc(sizeof(*c));
1491
1492 anetNonBlock(NULL,fd);
1493 anetTcpNoDelay(NULL,fd);
1494 if (!c) return NULL;
1495 selectDb(c,0);
1496 c->fd = fd;
1497 c->querybuf = sdsempty();
1498 c->argc = 0;
1499 c->argv = NULL;
1500 c->bulklen = -1;
1501 c->sentlen = 0;
1502 c->flags = 0;
1503 c->lastinteraction = time(NULL);
1504 c->authenticated = 0;
1505 c->replstate = REDIS_REPL_NONE;
1506 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1507 listSetFreeMethod(c->reply,decrRefCount);
1508 listSetDupMethod(c->reply,dupClientReplyValue);
1509 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1510 readQueryFromClient, c, NULL) == AE_ERR) {
1511 freeClient(c);
1512 return NULL;
1513 }
1514 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1515 return c;
1516 }
1517
1518 static void addReply(redisClient *c, robj *obj) {
1519 if (listLength(c->reply) == 0 &&
1520 (c->replstate == REDIS_REPL_NONE ||
1521 c->replstate == REDIS_REPL_ONLINE) &&
1522 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1523 sendReplyToClient, c, NULL) == AE_ERR) return;
1524 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1525 incrRefCount(obj);
1526 }
1527
1528 static void addReplySds(redisClient *c, sds s) {
1529 robj *o = createObject(REDIS_STRING,s);
1530 addReply(c,o);
1531 decrRefCount(o);
1532 }
1533
1534 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1535 int cport, cfd;
1536 char cip[128];
1537 redisClient *c;
1538 REDIS_NOTUSED(el);
1539 REDIS_NOTUSED(mask);
1540 REDIS_NOTUSED(privdata);
1541
1542 cfd = anetAccept(server.neterr, fd, cip, &cport);
1543 if (cfd == AE_ERR) {
1544 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1545 return;
1546 }
1547 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1548 if ((c = createClient(cfd)) == NULL) {
1549 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1550 close(cfd); /* May be already closed, just ingore errors */
1551 return;
1552 }
1553 /* If maxclient directive is set and this is one client more... close the
1554 * connection. Note that we create the client instead to check before
1555 * for this condition, since now the socket is already set in nonblocking
1556 * mode and we can send an error for free using the Kernel I/O */
1557 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1558 char *err = "-ERR max number of clients reached\r\n";
1559
1560 /* That's a best effort error message, don't check write errors */
1561 (void) write(c->fd,err,strlen(err));
1562 freeClient(c);
1563 return;
1564 }
1565 server.stat_numconnections++;
1566 }
1567
1568 /* ======================= Redis objects implementation ===================== */
1569
1570 static robj *createObject(int type, void *ptr) {
1571 robj *o;
1572
1573 if (listLength(server.objfreelist)) {
1574 listNode *head = listFirst(server.objfreelist);
1575 o = listNodeValue(head);
1576 listDelNode(server.objfreelist,head);
1577 } else {
1578 o = zmalloc(sizeof(*o));
1579 }
1580 if (!o) oom("createObject");
1581 o->type = type;
1582 o->ptr = ptr;
1583 o->refcount = 1;
1584 return o;
1585 }
1586
1587 static robj *createStringObject(char *ptr, size_t len) {
1588 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1589 }
1590
1591 static robj *createListObject(void) {
1592 list *l = listCreate();
1593
1594 if (!l) oom("listCreate");
1595 listSetFreeMethod(l,decrRefCount);
1596 return createObject(REDIS_LIST,l);
1597 }
1598
1599 static robj *createSetObject(void) {
1600 dict *d = dictCreate(&setDictType,NULL);
1601 if (!d) oom("dictCreate");
1602 return createObject(REDIS_SET,d);
1603 }
1604
1605 static void freeStringObject(robj *o) {
1606 sdsfree(o->ptr);
1607 }
1608
1609 static void freeListObject(robj *o) {
1610 listRelease((list*) o->ptr);
1611 }
1612
1613 static void freeSetObject(robj *o) {
1614 dictRelease((dict*) o->ptr);
1615 }
1616
1617 static void freeHashObject(robj *o) {
1618 dictRelease((dict*) o->ptr);
1619 }
1620
1621 static void incrRefCount(robj *o) {
1622 o->refcount++;
1623 #ifdef DEBUG_REFCOUNT
1624 if (o->type == REDIS_STRING)
1625 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1626 #endif
1627 }
1628
1629 static void decrRefCount(void *obj) {
1630 robj *o = obj;
1631
1632 #ifdef DEBUG_REFCOUNT
1633 if (o->type == REDIS_STRING)
1634 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1635 #endif
1636 if (--(o->refcount) == 0) {
1637 switch(o->type) {
1638 case REDIS_STRING: freeStringObject(o); break;
1639 case REDIS_LIST: freeListObject(o); break;
1640 case REDIS_SET: freeSetObject(o); break;
1641 case REDIS_HASH: freeHashObject(o); break;
1642 default: assert(0 != 0); break;
1643 }
1644 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1645 !listAddNodeHead(server.objfreelist,o))
1646 zfree(o);
1647 }
1648 }
1649
1650 /* Try to share an object against the shared objects pool */
1651 static robj *tryObjectSharing(robj *o) {
1652 struct dictEntry *de;
1653 unsigned long c;
1654
1655 if (o == NULL || server.shareobjects == 0) return o;
1656
1657 assert(o->type == REDIS_STRING);
1658 de = dictFind(server.sharingpool,o);
1659 if (de) {
1660 robj *shared = dictGetEntryKey(de);
1661
1662 c = ((unsigned long) dictGetEntryVal(de))+1;
1663 dictGetEntryVal(de) = (void*) c;
1664 incrRefCount(shared);
1665 decrRefCount(o);
1666 return shared;
1667 } else {
1668 /* Here we are using a stream algorihtm: Every time an object is
1669 * shared we increment its count, everytime there is a miss we
1670 * recrement the counter of a random object. If this object reaches
1671 * zero we remove the object and put the current object instead. */
1672 if (dictSize(server.sharingpool) >=
1673 server.sharingpoolsize) {
1674 de = dictGetRandomKey(server.sharingpool);
1675 assert(de != NULL);
1676 c = ((unsigned long) dictGetEntryVal(de))-1;
1677 dictGetEntryVal(de) = (void*) c;
1678 if (c == 0) {
1679 dictDelete(server.sharingpool,de->key);
1680 }
1681 } else {
1682 c = 0; /* If the pool is empty we want to add this object */
1683 }
1684 if (c == 0) {
1685 int retval;
1686
1687 retval = dictAdd(server.sharingpool,o,(void*)1);
1688 assert(retval == DICT_OK);
1689 incrRefCount(o);
1690 }
1691 return o;
1692 }
1693 }
1694
1695 static robj *lookupKey(redisDb *db, robj *key) {
1696 dictEntry *de = dictFind(db->dict,key);
1697 return de ? dictGetEntryVal(de) : NULL;
1698 }
1699
1700 static robj *lookupKeyRead(redisDb *db, robj *key) {
1701 expireIfNeeded(db,key);
1702 return lookupKey(db,key);
1703 }
1704
1705 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1706 deleteIfVolatile(db,key);
1707 return lookupKey(db,key);
1708 }
1709
1710 static int deleteKey(redisDb *db, robj *key) {
1711 int retval;
1712
1713 /* We need to protect key from destruction: after the first dictDelete()
1714 * it may happen that 'key' is no longer valid if we don't increment
1715 * it's count. This may happen when we get the object reference directly
1716 * from the hash table with dictRandomKey() or dict iterators */
1717 incrRefCount(key);
1718 if (dictSize(db->expires)) dictDelete(db->expires,key);
1719 retval = dictDelete(db->dict,key);
1720 decrRefCount(key);
1721
1722 return retval == DICT_OK;
1723 }
1724
1725 /*============================ DB saving/loading ============================ */
1726
/* Write the one-byte opcode/type 'type'. Returns 0 on success, -1 on error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
1731
/* Write 't' truncated to a 32-bit signed integer, in host byte order.
 * Returns 0 on success, -1 on error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t secs = (int32_t) t;

    return (fwrite(&secs,4,1,fp) == 0) ? -1 : 0;
}
1737
1738 /* check rdbLoadLen() comments for more info */
1739 static int rdbSaveLen(FILE *fp, uint32_t len) {
1740 unsigned char buf[2];
1741
1742 if (len < (1<<6)) {
1743 /* Save a 6 bit len */
1744 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1745 if (fwrite(buf,1,1,fp) == 0) return -1;
1746 } else if (len < (1<<14)) {
1747 /* Save a 14 bit len */
1748 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1749 buf[1] = len&0xFF;
1750 if (fwrite(buf,2,1,fp) == 0) return -1;
1751 } else {
1752 /* Save a 32 bit len */
1753 buf[0] = (REDIS_RDB_32BITLEN<<6);
1754 if (fwrite(buf,1,1,fp) == 0) return -1;
1755 len = htonl(len);
1756 if (fwrite(&len,4,1,fp) == 0) return -1;
1757 }
1758 return 0;
1759 }
1760
1761 /* String objects in the form "2391" "-100" without any space and with a
1762 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1763 * encoded as integers to save space */
1764 int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1765 long long value;
1766 char *endptr, buf[32];
1767
1768 /* Check if it's possible to encode this value as a number */
1769 value = strtoll(s, &endptr, 10);
1770 if (endptr[0] != '\0') return 0;
1771 snprintf(buf,32,"%lld",value);
1772
1773 /* If the number converted back into a string is not identical
1774 * then it's not possible to encode the string as integer */
1775 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1776
1777 /* Finally check if it fits in our ranges */
1778 if (value >= -(1<<7) && value <= (1<<7)-1) {
1779 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1780 enc[1] = value&0xFF;
1781 return 2;
1782 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1783 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1784 enc[1] = value&0xFF;
1785 enc[2] = (value>>8)&0xFF;
1786 return 3;
1787 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1788 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1789 enc[1] = value&0xFF;
1790 enc[2] = (value>>8)&0xFF;
1791 enc[3] = (value>>16)&0xFF;
1792 enc[4] = (value>>24)&0xFF;
1793 return 5;
1794 } else {
1795 return 0;
1796 }
1797 }
1798
1799 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1800 unsigned int comprlen, outlen;
1801 unsigned char byte;
1802 void *out;
1803
1804 /* We require at least four bytes compression for this to be worth it */
1805 outlen = sdslen(obj->ptr)-4;
1806 if (outlen <= 0) return 0;
1807 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1808 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1809 if (comprlen == 0) {
1810 zfree(out);
1811 return 0;
1812 }
1813 /* Data compressed! Let's save it on disk */
1814 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1815 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1816 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1817 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1818 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1819 zfree(out);
1820 return comprlen;
1821
1822 writeerr:
1823 zfree(out);
1824 return -1;
1825 }
1826
1827 /* Save a string objet as [len][data] on disk. If the object is a string
1828 * representation of an integer value we try to safe it in a special form */
1829 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1830 size_t len = sdslen(obj->ptr);
1831 int enclen;
1832
1833 /* Try integer encoding */
1834 if (len <= 11) {
1835 unsigned char buf[5];
1836 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1837 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1838 return 0;
1839 }
1840 }
1841
1842 /* Try LZF compression - under 20 bytes it's unable to compress even
1843 * aaaaaaaaaaaaaaaaaa so skip it */
1844 if (1 && len > 20) {
1845 int retval;
1846
1847 retval = rdbSaveLzfStringObject(fp,obj);
1848 if (retval == -1) return -1;
1849 if (retval > 0) return 0;
1850 /* retval == 0 means data can't be compressed, save the old way */
1851 }
1852
1853 /* Store verbatim */
1854 if (rdbSaveLen(fp,len) == -1) return -1;
1855 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1856 return 0;
1857 }
1858
1859 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1860 static int rdbSave(char *filename) {
1861 dictIterator *di = NULL;
1862 dictEntry *de;
1863 FILE *fp;
1864 char tmpfile[256];
1865 int j;
1866 time_t now = time(NULL);
1867
1868 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1869 fp = fopen(tmpfile,"w");
1870 if (!fp) {
1871 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1872 return REDIS_ERR;
1873 }
1874 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1875 for (j = 0; j < server.dbnum; j++) {
1876 redisDb *db = server.db+j;
1877 dict *d = db->dict;
1878 if (dictSize(d) == 0) continue;
1879 di = dictGetIterator(d);
1880 if (!di) {
1881 fclose(fp);
1882 return REDIS_ERR;
1883 }
1884
1885 /* Write the SELECT DB opcode */
1886 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1887 if (rdbSaveLen(fp,j) == -1) goto werr;
1888
1889 /* Iterate this DB writing every entry */
1890 while((de = dictNext(di)) != NULL) {
1891 robj *key = dictGetEntryKey(de);
1892 robj *o = dictGetEntryVal(de);
1893 time_t expiretime = getExpire(db,key);
1894
1895 /* Save the expire time */
1896 if (expiretime != -1) {
1897 /* If this key is already expired skip it */
1898 if (expiretime < now) continue;
1899 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1900 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1901 }
1902 /* Save the key and associated value */
1903 if (rdbSaveType(fp,o->type) == -1) goto werr;
1904 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1905 if (o->type == REDIS_STRING) {
1906 /* Save a string value */
1907 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1908 } else if (o->type == REDIS_LIST) {
1909 /* Save a list value */
1910 list *list = o->ptr;
1911 listNode *ln;
1912
1913 listRewind(list);
1914 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1915 while((ln = listYield(list))) {
1916 robj *eleobj = listNodeValue(ln);
1917
1918 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1919 }
1920 } else if (o->type == REDIS_SET) {
1921 /* Save a set value */
1922 dict *set = o->ptr;
1923 dictIterator *di = dictGetIterator(set);
1924 dictEntry *de;
1925
1926 if (!set) oom("dictGetIteraotr");
1927 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1928 while((de = dictNext(di)) != NULL) {
1929 robj *eleobj = dictGetEntryKey(de);
1930
1931 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1932 }
1933 dictReleaseIterator(di);
1934 } else {
1935 assert(0 != 0);
1936 }
1937 }
1938 dictReleaseIterator(di);
1939 }
1940 /* EOF opcode */
1941 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1942
1943 /* Make sure data will not remain on the OS's output buffers */
1944 fflush(fp);
1945 fsync(fileno(fp));
1946 fclose(fp);
1947
1948 /* Use RENAME to make sure the DB file is changed atomically only
1949 * if the generate DB file is ok. */
1950 if (rename(tmpfile,filename) == -1) {
1951 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1952 unlink(tmpfile);
1953 return REDIS_ERR;
1954 }
1955 redisLog(REDIS_NOTICE,"DB saved on disk");
1956 server.dirty = 0;
1957 server.lastsave = time(NULL);
1958 return REDIS_OK;
1959
1960 werr:
1961 fclose(fp);
1962 unlink(tmpfile);
1963 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1964 if (di) dictReleaseIterator(di);
1965 return REDIS_ERR;
1966 }
1967
1968 static int rdbSaveBackground(char *filename) {
1969 pid_t childpid;
1970
1971 if (server.bgsaveinprogress) return REDIS_ERR;
1972 if ((childpid = fork()) == 0) {
1973 /* Child */
1974 close(server.fd);
1975 if (rdbSave(filename) == REDIS_OK) {
1976 exit(0);
1977 } else {
1978 exit(1);
1979 }
1980 } else {
1981 /* Parent */
1982 if (childpid == -1) {
1983 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
1984 strerror(errno));
1985 return REDIS_ERR;
1986 }
1987 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1988 server.bgsaveinprogress = 1;
1989 return REDIS_OK;
1990 }
1991 return REDIS_OK; /* unreached */
1992 }
1993
/* Read one type/opcode byte from the dump.
 * Returns the byte value (0..255), or -1 on EOF or read error. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
1999
/* Read a 4 byte timestamp stored in host byte order.
 * Returns it as time_t, or -1 on EOF or read error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t secs;

    if (fread(&secs,4,1,fp) != 1) return -1;
    return (time_t)secs;
}
2005
2006 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2007 * of this file for a description of how this are stored on disk.
2008 *
2009 * isencoded is set to 1 if the readed length is not actually a length but
2010 * an "encoding type", check the above comments for more info */
2011 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2012 unsigned char buf[2];
2013 uint32_t len;
2014
2015 if (isencoded) *isencoded = 0;
2016 if (rdbver == 0) {
2017 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2018 return ntohl(len);
2019 } else {
2020 int type;
2021
2022 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2023 type = (buf[0]&0xC0)>>6;
2024 if (type == REDIS_RDB_6BITLEN) {
2025 /* Read a 6 bit len */
2026 return buf[0]&0x3F;
2027 } else if (type == REDIS_RDB_ENCVAL) {
2028 /* Read a 6 bit len encoding type */
2029 if (isencoded) *isencoded = 1;
2030 return buf[0]&0x3F;
2031 } else if (type == REDIS_RDB_14BITLEN) {
2032 /* Read a 14 bit len */
2033 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2034 return ((buf[0]&0x3F)<<8)|buf[1];
2035 } else {
2036 /* Read a 32 bit len */
2037 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2038 return ntohl(len);
2039 }
2040 }
2041 }
2042
2043 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2044 unsigned char enc[4];
2045 long long val;
2046
2047 if (enctype == REDIS_RDB_ENC_INT8) {
2048 if (fread(enc,1,1,fp) == 0) return NULL;
2049 val = (signed char)enc[0];
2050 } else if (enctype == REDIS_RDB_ENC_INT16) {
2051 uint16_t v;
2052 if (fread(enc,2,1,fp) == 0) return NULL;
2053 v = enc[0]|(enc[1]<<8);
2054 val = (int16_t)v;
2055 } else if (enctype == REDIS_RDB_ENC_INT32) {
2056 uint32_t v;
2057 if (fread(enc,4,1,fp) == 0) return NULL;
2058 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2059 val = (int32_t)v;
2060 } else {
2061 val = 0; /* anti-warning */
2062 assert(0!=0);
2063 }
2064 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2065 }
2066
2067 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2068 unsigned int len, clen;
2069 unsigned char *c = NULL;
2070 sds val = NULL;
2071
2072 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2073 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2074 if ((c = zmalloc(clen)) == NULL) goto err;
2075 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2076 if (fread(c,clen,1,fp) == 0) goto err;
2077 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2078 zfree(c);
2079 return createObject(REDIS_STRING,val);
2080 err:
2081 zfree(c);
2082 sdsfree(val);
2083 return NULL;
2084 }
2085
2086 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2087 int isencoded;
2088 uint32_t len;
2089 sds val;
2090
2091 len = rdbLoadLen(fp,rdbver,&isencoded);
2092 if (isencoded) {
2093 switch(len) {
2094 case REDIS_RDB_ENC_INT8:
2095 case REDIS_RDB_ENC_INT16:
2096 case REDIS_RDB_ENC_INT32:
2097 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2098 case REDIS_RDB_ENC_LZF:
2099 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2100 default:
2101 assert(0!=0);
2102 }
2103 }
2104
2105 if (len == REDIS_RDB_LENERR) return NULL;
2106 val = sdsnewlen(NULL,len);
2107 if (len && fread(val,len,1,fp) == 0) {
2108 sdsfree(val);
2109 return NULL;
2110 }
2111 return tryObjectSharing(createObject(REDIS_STRING,val));
2112 }
2113
2114 static int rdbLoad(char *filename) {
2115 FILE *fp;
2116 robj *keyobj = NULL;
2117 uint32_t dbid;
2118 int type, retval, rdbver;
2119 dict *d = server.db[0].dict;
2120 redisDb *db = server.db+0;
2121 char buf[1024];
2122 time_t expiretime = -1, now = time(NULL);
2123
2124 fp = fopen(filename,"r");
2125 if (!fp) return REDIS_ERR;
2126 if (fread(buf,9,1,fp) == 0) goto eoferr;
2127 buf[9] = '\0';
2128 if (memcmp(buf,"REDIS",5) != 0) {
2129 fclose(fp);
2130 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2131 return REDIS_ERR;
2132 }
2133 rdbver = atoi(buf+5);
2134 if (rdbver > 1) {
2135 fclose(fp);
2136 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2137 return REDIS_ERR;
2138 }
2139 while(1) {
2140 robj *o;
2141
2142 /* Read type. */
2143 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2144 if (type == REDIS_EXPIRETIME) {
2145 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2146 /* We read the time so we need to read the object type again */
2147 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2148 }
2149 if (type == REDIS_EOF) break;
2150 /* Handle SELECT DB opcode as a special case */
2151 if (type == REDIS_SELECTDB) {
2152 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2153 goto eoferr;
2154 if (dbid >= (unsigned)server.dbnum) {
2155 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2156 exit(1);
2157 }
2158 db = server.db+dbid;
2159 d = db->dict;
2160 continue;
2161 }
2162 /* Read key */
2163 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2164
2165 if (type == REDIS_STRING) {
2166 /* Read string value */
2167 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2168 } else if (type == REDIS_LIST || type == REDIS_SET) {
2169 /* Read list/set value */
2170 uint32_t listlen;
2171
2172 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2173 goto eoferr;
2174 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2175 /* Load every single element of the list/set */
2176 while(listlen--) {
2177 robj *ele;
2178
2179 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2180 if (type == REDIS_LIST) {
2181 if (!listAddNodeTail((list*)o->ptr,ele))
2182 oom("listAddNodeTail");
2183 } else {
2184 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2185 oom("dictAdd");
2186 }
2187 }
2188 } else {
2189 assert(0 != 0);
2190 }
2191 /* Add the new object in the hash table */
2192 retval = dictAdd(d,keyobj,o);
2193 if (retval == DICT_ERR) {
2194 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2195 exit(1);
2196 }
2197 /* Set the expire time if needed */
2198 if (expiretime != -1) {
2199 setExpire(db,keyobj,expiretime);
2200 /* Delete this key if already expired */
2201 if (expiretime < now) deleteKey(db,keyobj);
2202 expiretime = -1;
2203 }
2204 keyobj = o = NULL;
2205 }
2206 fclose(fp);
2207 return REDIS_OK;
2208
2209 eoferr: /* unexpected end of file is handled here with a fatal exit */
2210 if (keyobj) decrRefCount(keyobj);
2211 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2212 exit(1);
2213 return REDIS_ERR; /* Just to avoid warning */
2214 }
2215
2216 /*================================== Commands =============================== */
2217
2218 static void authCommand(redisClient *c) {
2219 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2220 c->authenticated = 1;
2221 addReply(c,shared.ok);
2222 } else {
2223 c->authenticated = 0;
2224 addReply(c,shared.err);
2225 }
2226 }
2227
2228 static void pingCommand(redisClient *c) {
2229 addReply(c,shared.pong);
2230 }
2231
2232 static void echoCommand(redisClient *c) {
2233 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2234 (int)sdslen(c->argv[1]->ptr)));
2235 addReply(c,c->argv[1]);
2236 addReply(c,shared.crlf);
2237 }
2238
2239 /*=================================== Strings =============================== */
2240
2241 static void setGenericCommand(redisClient *c, int nx) {
2242 int retval;
2243
2244 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2245 if (retval == DICT_ERR) {
2246 if (!nx) {
2247 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2248 incrRefCount(c->argv[2]);
2249 } else {
2250 addReply(c,shared.czero);
2251 return;
2252 }
2253 } else {
2254 incrRefCount(c->argv[1]);
2255 incrRefCount(c->argv[2]);
2256 }
2257 server.dirty++;
2258 removeExpire(c->db,c->argv[1]);
2259 addReply(c, nx ? shared.cone : shared.ok);
2260 }
2261
2262 static void setCommand(redisClient *c) {
2263 setGenericCommand(c,0);
2264 }
2265
2266 static void setnxCommand(redisClient *c) {
2267 setGenericCommand(c,1);
2268 }
2269
2270 static void getCommand(redisClient *c) {
2271 robj *o = lookupKeyRead(c->db,c->argv[1]);
2272
2273 if (o == NULL) {
2274 addReply(c,shared.nullbulk);
2275 } else {
2276 if (o->type != REDIS_STRING) {
2277 addReply(c,shared.wrongtypeerr);
2278 } else {
2279 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2280 addReply(c,o);
2281 addReply(c,shared.crlf);
2282 }
2283 }
2284 }
2285
2286 static void getSetCommand(redisClient *c) {
2287 getCommand(c);
2288 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2289 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2290 } else {
2291 incrRefCount(c->argv[1]);
2292 }
2293 incrRefCount(c->argv[2]);
2294 server.dirty++;
2295 removeExpire(c->db,c->argv[1]);
2296 }
2297
2298 static void mgetCommand(redisClient *c) {
2299 int j;
2300
2301 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2302 for (j = 1; j < c->argc; j++) {
2303 robj *o = lookupKeyRead(c->db,c->argv[j]);
2304 if (o == NULL) {
2305 addReply(c,shared.nullbulk);
2306 } else {
2307 if (o->type != REDIS_STRING) {
2308 addReply(c,shared.nullbulk);
2309 } else {
2310 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2311 addReply(c,o);
2312 addReply(c,shared.crlf);
2313 }
2314 }
2315 }
2316 }
2317
2318 static void incrDecrCommand(redisClient *c, long long incr) {
2319 long long value;
2320 int retval;
2321 robj *o;
2322
2323 o = lookupKeyWrite(c->db,c->argv[1]);
2324 if (o == NULL) {
2325 value = 0;
2326 } else {
2327 if (o->type != REDIS_STRING) {
2328 value = 0;
2329 } else {
2330 char *eptr;
2331
2332 value = strtoll(o->ptr, &eptr, 10);
2333 }
2334 }
2335
2336 value += incr;
2337 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2338 retval = dictAdd(c->db->dict,c->argv[1],o);
2339 if (retval == DICT_ERR) {
2340 dictReplace(c->db->dict,c->argv[1],o);
2341 removeExpire(c->db,c->argv[1]);
2342 } else {
2343 incrRefCount(c->argv[1]);
2344 }
2345 server.dirty++;
2346 addReply(c,shared.colon);
2347 addReply(c,o);
2348 addReply(c,shared.crlf);
2349 }
2350
2351 static void incrCommand(redisClient *c) {
2352 incrDecrCommand(c,1);
2353 }
2354
2355 static void decrCommand(redisClient *c) {
2356 incrDecrCommand(c,-1);
2357 }
2358
2359 static void incrbyCommand(redisClient *c) {
2360 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2361 incrDecrCommand(c,incr);
2362 }
2363
2364 static void decrbyCommand(redisClient *c) {
2365 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2366 incrDecrCommand(c,-incr);
2367 }
2368
2369 /* ========================= Type agnostic commands ========================= */
2370
2371 static void delCommand(redisClient *c) {
2372 int deleted = 0, j;
2373
2374 for (j = 1; j < c->argc; j++) {
2375 if (deleteKey(c->db,c->argv[j])) {
2376 server.dirty++;
2377 deleted++;
2378 }
2379 }
2380 switch(deleted) {
2381 case 0:
2382 addReply(c,shared.czero);
2383 break;
2384 case 1:
2385 addReply(c,shared.cone);
2386 break;
2387 default:
2388 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2389 break;
2390 }
2391 }
2392
2393 static void existsCommand(redisClient *c) {
2394 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2395 }
2396
2397 static void selectCommand(redisClient *c) {
2398 int id = atoi(c->argv[1]->ptr);
2399
2400 if (selectDb(c,id) == REDIS_ERR) {
2401 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2402 } else {
2403 addReply(c,shared.ok);
2404 }
2405 }
2406
2407 static void randomkeyCommand(redisClient *c) {
2408 dictEntry *de;
2409
2410 while(1) {
2411 de = dictGetRandomKey(c->db->dict);
2412 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2413 }
2414 if (de == NULL) {
2415 addReply(c,shared.plus);
2416 addReply(c,shared.crlf);
2417 } else {
2418 addReply(c,shared.plus);
2419 addReply(c,dictGetEntryKey(de));
2420 addReply(c,shared.crlf);
2421 }
2422 }
2423
2424 static void keysCommand(redisClient *c) {
2425 dictIterator *di;
2426 dictEntry *de;
2427 sds pattern = c->argv[1]->ptr;
2428 int plen = sdslen(pattern);
2429 int numkeys = 0, keyslen = 0;
2430 robj *lenobj = createObject(REDIS_STRING,NULL);
2431
2432 di = dictGetIterator(c->db->dict);
2433 if (!di) oom("dictGetIterator");
2434 addReply(c,lenobj);
2435 decrRefCount(lenobj);
2436 while((de = dictNext(di)) != NULL) {
2437 robj *keyobj = dictGetEntryKey(de);
2438
2439 sds key = keyobj->ptr;
2440 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2441 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2442 if (expireIfNeeded(c->db,keyobj) == 0) {
2443 if (numkeys != 0)
2444 addReply(c,shared.space);
2445 addReply(c,keyobj);
2446 numkeys++;
2447 keyslen += sdslen(key);
2448 }
2449 }
2450 }
2451 dictReleaseIterator(di);
2452 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2453 addReply(c,shared.crlf);
2454 }
2455
2456 static void dbsizeCommand(redisClient *c) {
2457 addReplySds(c,
2458 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2459 }
2460
2461 static void lastsaveCommand(redisClient *c) {
2462 addReplySds(c,
2463 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2464 }
2465
2466 static void typeCommand(redisClient *c) {
2467 robj *o;
2468 char *type;
2469
2470 o = lookupKeyRead(c->db,c->argv[1]);
2471 if (o == NULL) {
2472 type = "+none";
2473 } else {
2474 switch(o->type) {
2475 case REDIS_STRING: type = "+string"; break;
2476 case REDIS_LIST: type = "+list"; break;
2477 case REDIS_SET: type = "+set"; break;
2478 default: type = "unknown"; break;
2479 }
2480 }
2481 addReplySds(c,sdsnew(type));
2482 addReply(c,shared.crlf);
2483 }
2484
2485 static void saveCommand(redisClient *c) {
2486 if (server.bgsaveinprogress) {
2487 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2488 return;
2489 }
2490 if (rdbSave(server.dbfilename) == REDIS_OK) {
2491 addReply(c,shared.ok);
2492 } else {
2493 addReply(c,shared.err);
2494 }
2495 }
2496
2497 static void bgsaveCommand(redisClient *c) {
2498 if (server.bgsaveinprogress) {
2499 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2500 return;
2501 }
2502 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2503 addReply(c,shared.ok);
2504 } else {
2505 addReply(c,shared.err);
2506 }
2507 }
2508
2509 static void shutdownCommand(redisClient *c) {
2510 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2511 /* XXX: TODO kill the child if there is a bgsave in progress */
2512 if (rdbSave(server.dbfilename) == REDIS_OK) {
2513 if (server.daemonize) {
2514 unlink(server.pidfile);
2515 }
2516 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2517 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2518 exit(1);
2519 } else {
2520 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2521 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2522 }
2523 }
2524
2525 static void renameGenericCommand(redisClient *c, int nx) {
2526 robj *o;
2527
2528 /* To use the same key as src and dst is probably an error */
2529 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2530 addReply(c,shared.sameobjecterr);
2531 return;
2532 }
2533
2534 o = lookupKeyWrite(c->db,c->argv[1]);
2535 if (o == NULL) {
2536 addReply(c,shared.nokeyerr);
2537 return;
2538 }
2539 incrRefCount(o);
2540 deleteIfVolatile(c->db,c->argv[2]);
2541 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2542 if (nx) {
2543 decrRefCount(o);
2544 addReply(c,shared.czero);
2545 return;
2546 }
2547 dictReplace(c->db->dict,c->argv[2],o);
2548 } else {
2549 incrRefCount(c->argv[2]);
2550 }
2551 deleteKey(c->db,c->argv[1]);
2552 server.dirty++;
2553 addReply(c,nx ? shared.cone : shared.ok);
2554 }
2555
2556 static void renameCommand(redisClient *c) {
2557 renameGenericCommand(c,0);
2558 }
2559
2560 static void renamenxCommand(redisClient *c) {
2561 renameGenericCommand(c,1);
2562 }
2563
2564 static void moveCommand(redisClient *c) {
2565 robj *o;
2566 redisDb *src, *dst;
2567 int srcid;
2568
2569 /* Obtain source and target DB pointers */
2570 src = c->db;
2571 srcid = c->db->id;
2572 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2573 addReply(c,shared.outofrangeerr);
2574 return;
2575 }
2576 dst = c->db;
2577 selectDb(c,srcid); /* Back to the source DB */
2578
2579 /* If the user is moving using as target the same
2580 * DB as the source DB it is probably an error. */
2581 if (src == dst) {
2582 addReply(c,shared.sameobjecterr);
2583 return;
2584 }
2585
2586 /* Check if the element exists and get a reference */
2587 o = lookupKeyWrite(c->db,c->argv[1]);
2588 if (!o) {
2589 addReply(c,shared.czero);
2590 return;
2591 }
2592
2593 /* Try to add the element to the target DB */
2594 deleteIfVolatile(dst,c->argv[1]);
2595 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2596 addReply(c,shared.czero);
2597 return;
2598 }
2599 incrRefCount(c->argv[1]);
2600 incrRefCount(o);
2601
2602 /* OK! key moved, free the entry in the source DB */
2603 deleteKey(src,c->argv[1]);
2604 server.dirty++;
2605 addReply(c,shared.cone);
2606 }
2607
2608 /* =================================== Lists ================================ */
2609 static void pushGenericCommand(redisClient *c, int where) {
2610 robj *lobj;
2611 list *list;
2612
2613 lobj = lookupKeyWrite(c->db,c->argv[1]);
2614 if (lobj == NULL) {
2615 lobj = createListObject();
2616 list = lobj->ptr;
2617 if (where == REDIS_HEAD) {
2618 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2619 } else {
2620 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2621 }
2622 dictAdd(c->db->dict,c->argv[1],lobj);
2623 incrRefCount(c->argv[1]);
2624 incrRefCount(c->argv[2]);
2625 } else {
2626 if (lobj->type != REDIS_LIST) {
2627 addReply(c,shared.wrongtypeerr);
2628 return;
2629 }
2630 list = lobj->ptr;
2631 if (where == REDIS_HEAD) {
2632 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2633 } else {
2634 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2635 }
2636 incrRefCount(c->argv[2]);
2637 }
2638 server.dirty++;
2639 addReply(c,shared.ok);
2640 }
2641
2642 static void lpushCommand(redisClient *c) {
2643 pushGenericCommand(c,REDIS_HEAD);
2644 }
2645
2646 static void rpushCommand(redisClient *c) {
2647 pushGenericCommand(c,REDIS_TAIL);
2648 }
2649
2650 static void llenCommand(redisClient *c) {
2651 robj *o;
2652 list *l;
2653
2654 o = lookupKeyRead(c->db,c->argv[1]);
2655 if (o == NULL) {
2656 addReply(c,shared.czero);
2657 return;
2658 } else {
2659 if (o->type != REDIS_LIST) {
2660 addReply(c,shared.wrongtypeerr);
2661 } else {
2662 l = o->ptr;
2663 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2664 }
2665 }
2666 }
2667
2668 static void lindexCommand(redisClient *c) {
2669 robj *o;
2670 int index = atoi(c->argv[2]->ptr);
2671
2672 o = lookupKeyRead(c->db,c->argv[1]);
2673 if (o == NULL) {
2674 addReply(c,shared.nullbulk);
2675 } else {
2676 if (o->type != REDIS_LIST) {
2677 addReply(c,shared.wrongtypeerr);
2678 } else {
2679 list *list = o->ptr;
2680 listNode *ln;
2681
2682 ln = listIndex(list, index);
2683 if (ln == NULL) {
2684 addReply(c,shared.nullbulk);
2685 } else {
2686 robj *ele = listNodeValue(ln);
2687 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2688 addReply(c,ele);
2689 addReply(c,shared.crlf);
2690 }
2691 }
2692 }
2693 }
2694
2695 static void lsetCommand(redisClient *c) {
2696 robj *o;
2697 int index = atoi(c->argv[2]->ptr);
2698
2699 o = lookupKeyWrite(c->db,c->argv[1]);
2700 if (o == NULL) {
2701 addReply(c,shared.nokeyerr);
2702 } else {
2703 if (o->type != REDIS_LIST) {
2704 addReply(c,shared.wrongtypeerr);
2705 } else {
2706 list *list = o->ptr;
2707 listNode *ln;
2708
2709 ln = listIndex(list, index);
2710 if (ln == NULL) {
2711 addReply(c,shared.outofrangeerr);
2712 } else {
2713 robj *ele = listNodeValue(ln);
2714
2715 decrRefCount(ele);
2716 listNodeValue(ln) = c->argv[3];
2717 incrRefCount(c->argv[3]);
2718 addReply(c,shared.ok);
2719 server.dirty++;
2720 }
2721 }
2722 }
2723 }
2724
2725 static void popGenericCommand(redisClient *c, int where) {
2726 robj *o;
2727
2728 o = lookupKeyWrite(c->db,c->argv[1]);
2729 if (o == NULL) {
2730 addReply(c,shared.nullbulk);
2731 } else {
2732 if (o->type != REDIS_LIST) {
2733 addReply(c,shared.wrongtypeerr);
2734 } else {
2735 list *list = o->ptr;
2736 listNode *ln;
2737
2738 if (where == REDIS_HEAD)
2739 ln = listFirst(list);
2740 else
2741 ln = listLast(list);
2742
2743 if (ln == NULL) {
2744 addReply(c,shared.nullbulk);
2745 } else {
2746 robj *ele = listNodeValue(ln);
2747 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2748 addReply(c,ele);
2749 addReply(c,shared.crlf);
2750 listDelNode(list,ln);
2751 server.dirty++;
2752 }
2753 }
2754 }
2755 }
2756
2757 static void lpopCommand(redisClient *c) {
2758 popGenericCommand(c,REDIS_HEAD);
2759 }
2760
2761 static void rpopCommand(redisClient *c) {
2762 popGenericCommand(c,REDIS_TAIL);
2763 }
2764
2765 static void lrangeCommand(redisClient *c) {
2766 robj *o;
2767 int start = atoi(c->argv[2]->ptr);
2768 int end = atoi(c->argv[3]->ptr);
2769
2770 o = lookupKeyRead(c->db,c->argv[1]);
2771 if (o == NULL) {
2772 addReply(c,shared.nullmultibulk);
2773 } else {
2774 if (o->type != REDIS_LIST) {
2775 addReply(c,shared.wrongtypeerr);
2776 } else {
2777 list *list = o->ptr;
2778 listNode *ln;
2779 int llen = listLength(list);
2780 int rangelen, j;
2781 robj *ele;
2782
2783 /* convert negative indexes */
2784 if (start < 0) start = llen+start;
2785 if (end < 0) end = llen+end;
2786 if (start < 0) start = 0;
2787 if (end < 0) end = 0;
2788
2789 /* indexes sanity checks */
2790 if (start > end || start >= llen) {
2791 /* Out of range start or start > end result in empty list */
2792 addReply(c,shared.emptymultibulk);
2793 return;
2794 }
2795 if (end >= llen) end = llen-1;
2796 rangelen = (end-start)+1;
2797
2798 /* Return the result in form of a multi-bulk reply */
2799 ln = listIndex(list, start);
2800 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2801 for (j = 0; j < rangelen; j++) {
2802 ele = listNodeValue(ln);
2803 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2804 addReply(c,ele);
2805 addReply(c,shared.crlf);
2806 ln = ln->next;
2807 }
2808 }
2809 }
2810 }
2811
2812 static void ltrimCommand(redisClient *c) {
2813 robj *o;
2814 int start = atoi(c->argv[2]->ptr);
2815 int end = atoi(c->argv[3]->ptr);
2816
2817 o = lookupKeyWrite(c->db,c->argv[1]);
2818 if (o == NULL) {
2819 addReply(c,shared.nokeyerr);
2820 } else {
2821 if (o->type != REDIS_LIST) {
2822 addReply(c,shared.wrongtypeerr);
2823 } else {
2824 list *list = o->ptr;
2825 listNode *ln;
2826 int llen = listLength(list);
2827 int j, ltrim, rtrim;
2828
2829 /* convert negative indexes */
2830 if (start < 0) start = llen+start;
2831 if (end < 0) end = llen+end;
2832 if (start < 0) start = 0;
2833 if (end < 0) end = 0;
2834
2835 /* indexes sanity checks */
2836 if (start > end || start >= llen) {
2837 /* Out of range start or start > end result in empty list */
2838 ltrim = llen;
2839 rtrim = 0;
2840 } else {
2841 if (end >= llen) end = llen-1;
2842 ltrim = start;
2843 rtrim = llen-end-1;
2844 }
2845
2846 /* Remove list elements to perform the trim */
2847 for (j = 0; j < ltrim; j++) {
2848 ln = listFirst(list);
2849 listDelNode(list,ln);
2850 }
2851 for (j = 0; j < rtrim; j++) {
2852 ln = listLast(list);
2853 listDelNode(list,ln);
2854 }
2855 addReply(c,shared.ok);
2856 server.dirty++;
2857 }
2858 }
2859 }
2860
2861 static void lremCommand(redisClient *c) {
2862 robj *o;
2863
2864 o = lookupKeyWrite(c->db,c->argv[1]);
2865 if (o == NULL) {
2866 addReply(c,shared.nokeyerr);
2867 } else {
2868 if (o->type != REDIS_LIST) {
2869 addReply(c,shared.wrongtypeerr);
2870 } else {
2871 list *list = o->ptr;
2872 listNode *ln, *next;
2873 int toremove = atoi(c->argv[2]->ptr);
2874 int removed = 0;
2875 int fromtail = 0;
2876
2877 if (toremove < 0) {
2878 toremove = -toremove;
2879 fromtail = 1;
2880 }
2881 ln = fromtail ? list->tail : list->head;
2882 while (ln) {
2883 robj *ele = listNodeValue(ln);
2884
2885 next = fromtail ? ln->prev : ln->next;
2886 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2887 listDelNode(list,ln);
2888 server.dirty++;
2889 removed++;
2890 if (toremove && removed == toremove) break;
2891 }
2892 ln = next;
2893 }
2894 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2895 }
2896 }
2897 }
2898
2899 /* ==================================== Sets ================================ */
2900
2901 static void saddCommand(redisClient *c) {
2902 robj *set;
2903
2904 set = lookupKeyWrite(c->db,c->argv[1]);
2905 if (set == NULL) {
2906 set = createSetObject();
2907 dictAdd(c->db->dict,c->argv[1],set);
2908 incrRefCount(c->argv[1]);
2909 } else {
2910 if (set->type != REDIS_SET) {
2911 addReply(c,shared.wrongtypeerr);
2912 return;
2913 }
2914 }
2915 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2916 incrRefCount(c->argv[2]);
2917 server.dirty++;
2918 addReply(c,shared.cone);
2919 } else {
2920 addReply(c,shared.czero);
2921 }
2922 }
2923
2924 static void sremCommand(redisClient *c) {
2925 robj *set;
2926
2927 set = lookupKeyWrite(c->db,c->argv[1]);
2928 if (set == NULL) {
2929 addReply(c,shared.czero);
2930 } else {
2931 if (set->type != REDIS_SET) {
2932 addReply(c,shared.wrongtypeerr);
2933 return;
2934 }
2935 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2936 server.dirty++;
2937 addReply(c,shared.cone);
2938 } else {
2939 addReply(c,shared.czero);
2940 }
2941 }
2942 }
2943
2944 static void smoveCommand(redisClient *c) {
2945 robj *srcset, *dstset;
2946
2947 srcset = lookupKeyWrite(c->db,c->argv[1]);
2948 dstset = lookupKeyWrite(c->db,c->argv[2]);
2949
2950 /* If the source key does not exist return 0, if it's of the wrong type
2951 * raise an error */
2952 if (srcset == NULL || srcset->type != REDIS_SET) {
2953 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2954 return;
2955 }
2956 /* Error if the destination key is not a set as well */
2957 if (dstset && dstset->type != REDIS_SET) {
2958 addReply(c,shared.wrongtypeerr);
2959 return;
2960 }
2961 /* Remove the element from the source set */
2962 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2963 /* Key not found in the src set! return zero */
2964 addReply(c,shared.czero);
2965 return;
2966 }
2967 server.dirty++;
2968 /* Add the element to the destination set */
2969 if (!dstset) {
2970 dstset = createSetObject();
2971 dictAdd(c->db->dict,c->argv[2],dstset);
2972 incrRefCount(c->argv[2]);
2973 }
2974 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
2975 incrRefCount(c->argv[3]);
2976 addReply(c,shared.cone);
2977 }
2978
2979 static void sismemberCommand(redisClient *c) {
2980 robj *set;
2981
2982 set = lookupKeyRead(c->db,c->argv[1]);
2983 if (set == NULL) {
2984 addReply(c,shared.czero);
2985 } else {
2986 if (set->type != REDIS_SET) {
2987 addReply(c,shared.wrongtypeerr);
2988 return;
2989 }
2990 if (dictFind(set->ptr,c->argv[2]))
2991 addReply(c,shared.cone);
2992 else
2993 addReply(c,shared.czero);
2994 }
2995 }
2996
2997 static void scardCommand(redisClient *c) {
2998 robj *o;
2999 dict *s;
3000
3001 o = lookupKeyRead(c->db,c->argv[1]);
3002 if (o == NULL) {
3003 addReply(c,shared.czero);
3004 return;
3005 } else {
3006 if (o->type != REDIS_SET) {
3007 addReply(c,shared.wrongtypeerr);
3008 } else {
3009 s = o->ptr;
3010 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3011 dictSize(s)));
3012 }
3013 }
3014 }
3015
3016 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3017 dict **d1 = (void*) s1, **d2 = (void*) s2;
3018
3019 return dictSize(*d1)-dictSize(*d2);
3020 }
3021
3022 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3023 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3024 dictIterator *di;
3025 dictEntry *de;
3026 robj *lenobj = NULL, *dstset = NULL;
3027 int j, cardinality = 0;
3028
3029 if (!dv) oom("sinterGenericCommand");
3030 for (j = 0; j < setsnum; j++) {
3031 robj *setobj;
3032
3033 setobj = dstkey ?
3034 lookupKeyWrite(c->db,setskeys[j]) :
3035 lookupKeyRead(c->db,setskeys[j]);
3036 if (!setobj) {
3037 zfree(dv);
3038 if (dstkey) {
3039 deleteKey(c->db,dstkey);
3040 addReply(c,shared.ok);
3041 } else {
3042 addReply(c,shared.nullmultibulk);
3043 }
3044 return;
3045 }
3046 if (setobj->type != REDIS_SET) {
3047 zfree(dv);
3048 addReply(c,shared.wrongtypeerr);
3049 return;
3050 }
3051 dv[j] = setobj->ptr;
3052 }
3053 /* Sort sets from the smallest to largest, this will improve our
3054 * algorithm's performace */
3055 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3056
3057 /* The first thing we should output is the total number of elements...
3058 * since this is a multi-bulk write, but at this stage we don't know
3059 * the intersection set size, so we use a trick, append an empty object
3060 * to the output list and save the pointer to later modify it with the
3061 * right length */
3062 if (!dstkey) {
3063 lenobj = createObject(REDIS_STRING,NULL);
3064 addReply(c,lenobj);
3065 decrRefCount(lenobj);
3066 } else {
3067 /* If we have a target key where to store the resulting set
3068 * create this key with an empty set inside */
3069 dstset = createSetObject();
3070 }
3071
3072 /* Iterate all the elements of the first (smallest) set, and test
3073 * the element against all the other sets, if at least one set does
3074 * not include the element it is discarded */
3075 di = dictGetIterator(dv[0]);
3076 if (!di) oom("dictGetIterator");
3077
3078 while((de = dictNext(di)) != NULL) {
3079 robj *ele;
3080
3081 for (j = 1; j < setsnum; j++)
3082 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3083 if (j != setsnum)
3084 continue; /* at least one set does not contain the member */
3085 ele = dictGetEntryKey(de);
3086 if (!dstkey) {
3087 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3088 addReply(c,ele);
3089 addReply(c,shared.crlf);
3090 cardinality++;
3091 } else {
3092 dictAdd(dstset->ptr,ele,NULL);
3093 incrRefCount(ele);
3094 }
3095 }
3096 dictReleaseIterator(di);
3097
3098 if (dstkey) {
3099 /* Store the resulting set into the target */
3100 deleteKey(c->db,dstkey);
3101 dictAdd(c->db->dict,dstkey,dstset);
3102 incrRefCount(dstkey);
3103 }
3104
3105 if (!dstkey) {
3106 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3107 } else {
3108 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3109 dictSize((dict*)dstset->ptr)));
3110 server.dirty++;
3111 }
3112 zfree(dv);
3113 }
3114
3115 static void sinterCommand(redisClient *c) {
3116 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3117 }
3118
3119 static void sinterstoreCommand(redisClient *c) {
3120 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3121 }
3122
/* Set operations selectable in sunionDiffGenericCommand(). */
#define REDIS_OP_UNION 0    /* members of any of the sets */
#define REDIS_OP_DIFF 1     /* members of the first set missing from the rest */
3125
3126 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3127 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3128 dictIterator *di;
3129 dictEntry *de;
3130 robj *dstset = NULL;
3131 int j, cardinality = 0;
3132
3133 if (!dv) oom("sunionDiffGenericCommand");
3134 for (j = 0; j < setsnum; j++) {
3135 robj *setobj;
3136
3137 setobj = dstkey ?
3138 lookupKeyWrite(c->db,setskeys[j]) :
3139 lookupKeyRead(c->db,setskeys[j]);
3140 if (!setobj) {
3141 dv[j] = NULL;
3142 continue;
3143 }
3144 if (setobj->type != REDIS_SET) {
3145 zfree(dv);
3146 addReply(c,shared.wrongtypeerr);
3147 return;
3148 }
3149 dv[j] = setobj->ptr;
3150 }
3151
3152 /* We need a temp set object to store our union. If the dstkey
3153 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3154 * this set object will be the resulting object to set into the target key*/
3155 dstset = createSetObject();
3156
3157 /* Iterate all the elements of all the sets, add every element a single
3158 * time to the result set */
3159 for (j = 0; j < setsnum; j++) {
3160 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3161 if (!dv[j]) continue; /* non existing keys are like empty sets */
3162
3163 di = dictGetIterator(dv[j]);
3164 if (!di) oom("dictGetIterator");
3165
3166 while((de = dictNext(di)) != NULL) {
3167 robj *ele;
3168
3169 /* dictAdd will not add the same element multiple times */
3170 ele = dictGetEntryKey(de);
3171 if (op == REDIS_OP_UNION || j == 0) {
3172 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3173 incrRefCount(ele);
3174 cardinality++;
3175 }
3176 } else if (op == REDIS_OP_DIFF) {
3177 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3178 cardinality--;
3179 }
3180 }
3181 }
3182 dictReleaseIterator(di);
3183
3184 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3185 }
3186
3187 /* Output the content of the resulting set, if not in STORE mode */
3188 if (!dstkey) {
3189 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3190 di = dictGetIterator(dstset->ptr);
3191 if (!di) oom("dictGetIterator");
3192 while((de = dictNext(di)) != NULL) {
3193 robj *ele;
3194
3195 ele = dictGetEntryKey(de);
3196 addReplySds(c,sdscatprintf(sdsempty(),
3197 "$%d\r\n",sdslen(ele->ptr)));
3198 addReply(c,ele);
3199 addReply(c,shared.crlf);
3200 }
3201 dictReleaseIterator(di);
3202 } else {
3203 /* If we have a target key where to store the resulting set
3204 * create this key with the result set inside */
3205 deleteKey(c->db,dstkey);
3206 dictAdd(c->db->dict,dstkey,dstset);
3207 incrRefCount(dstkey);
3208 }
3209
3210 /* Cleanup */
3211 if (!dstkey) {
3212 decrRefCount(dstset);
3213 } else {
3214 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3215 dictSize((dict*)dstset->ptr)));
3216 server.dirty++;
3217 }
3218 zfree(dv);
3219 }
3220
3221 static void sunionCommand(redisClient *c) {
3222 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3223 }
3224
3225 static void sunionstoreCommand(redisClient *c) {
3226 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3227 }
3228
3229 static void sdiffCommand(redisClient *c) {
3230 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3231 }
3232
3233 static void sdiffstoreCommand(redisClient *c) {
3234 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3235 }
3236
3237 static void flushdbCommand(redisClient *c) {
3238 server.dirty += dictSize(c->db->dict);
3239 dictEmpty(c->db->dict);
3240 dictEmpty(c->db->expires);
3241 addReply(c,shared.ok);
3242 }
3243
3244 static void flushallCommand(redisClient *c) {
3245 server.dirty += emptyDb();
3246 addReply(c,shared.ok);
3247 rdbSave(server.dbfilename);
3248 server.dirty++;
3249 }
3250
3251 redisSortOperation *createSortOperation(int type, robj *pattern) {
3252 redisSortOperation *so = zmalloc(sizeof(*so));
3253 if (!so) oom("createSortOperation");
3254 so->type = type;
3255 so->pattern = pattern;
3256 return so;
3257 }
3258
3259 /* Return the value associated to the key with a name obtained
3260 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3261 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3262 char *p;
3263 sds spat, ssub;
3264 robj keyobj;
3265 int prefixlen, sublen, postfixlen;
3266 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3267 struct {
3268 long len;
3269 long free;
3270 char buf[REDIS_SORTKEY_MAX+1];
3271 } keyname;
3272
3273 spat = pattern->ptr;
3274 ssub = subst->ptr;
3275 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3276 p = strchr(spat,'*');
3277 if (!p) return NULL;
3278
3279 prefixlen = p-spat;
3280 sublen = sdslen(ssub);
3281 postfixlen = sdslen(spat)-(prefixlen+1);
3282 memcpy(keyname.buf,spat,prefixlen);
3283 memcpy(keyname.buf+prefixlen,ssub,sublen);
3284 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3285 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3286 keyname.len = prefixlen+sublen+postfixlen;
3287
3288 keyobj.refcount = 1;
3289 keyobj.type = REDIS_STRING;
3290 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3291
3292 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3293 return lookupKeyRead(db,&keyobj);
3294 }
3295
3296 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3297 * the additional parameter is not standard but a BSD-specific we have to
3298 * pass sorting parameters via the global 'server' structure */
3299 static int sortCompare(const void *s1, const void *s2) {
3300 const redisSortObject *so1 = s1, *so2 = s2;
3301 int cmp;
3302
3303 if (!server.sort_alpha) {
3304 /* Numeric sorting. Here it's trivial as we precomputed scores */
3305 if (so1->u.score > so2->u.score) {
3306 cmp = 1;
3307 } else if (so1->u.score < so2->u.score) {
3308 cmp = -1;
3309 } else {
3310 cmp = 0;
3311 }
3312 } else {
3313 /* Alphanumeric sorting */
3314 if (server.sort_bypattern) {
3315 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3316 /* At least one compare object is NULL */
3317 if (so1->u.cmpobj == so2->u.cmpobj)
3318 cmp = 0;
3319 else if (so1->u.cmpobj == NULL)
3320 cmp = -1;
3321 else
3322 cmp = 1;
3323 } else {
3324 /* We have both the objects, use strcoll */
3325 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3326 }
3327 } else {
3328 /* Compare elements directly */
3329 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3330 }
3331 }
3332 return server.sort_desc ? -cmp : cmp;
3333 }
3334
3335 /* The SORT command is the most complex command in Redis. Warning: this code
3336 * is optimized for speed and a bit less for readability */
3337 static void sortCommand(redisClient *c) {
3338 list *operations;
3339 int outputlen = 0;
3340 int desc = 0, alpha = 0;
3341 int limit_start = 0, limit_count = -1, start, end;
3342 int j, dontsort = 0, vectorlen;
3343 int getop = 0; /* GET operation counter */
3344 robj *sortval, *sortby = NULL;
3345 redisSortObject *vector; /* Resulting vector to sort */
3346
3347 /* Lookup the key to sort. It must be of the right types */
3348 sortval = lookupKeyRead(c->db,c->argv[1]);
3349 if (sortval == NULL) {
3350 addReply(c,shared.nokeyerr);
3351 return;
3352 }
3353 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3354 addReply(c,shared.wrongtypeerr);
3355 return;
3356 }
3357
3358 /* Create a list of operations to perform for every sorted element.
3359 * Operations can be GET/DEL/INCR/DECR */
3360 operations = listCreate();
3361 listSetFreeMethod(operations,zfree);
3362 j = 2;
3363
3364 /* Now we need to protect sortval incrementing its count, in the future
3365 * SORT may have options able to overwrite/delete keys during the sorting
3366 * and the sorted key itself may get destroied */
3367 incrRefCount(sortval);
3368
3369 /* The SORT command has an SQL-alike syntax, parse it */
3370 while(j < c->argc) {
3371 int leftargs = c->argc-j-1;
3372 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3373 desc = 0;
3374 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3375 desc = 1;
3376 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3377 alpha = 1;
3378 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3379 limit_start = atoi(c->argv[j+1]->ptr);
3380 limit_count = atoi(c->argv[j+2]->ptr);
3381 j+=2;
3382 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3383 sortby = c->argv[j+1];
3384 /* If the BY pattern does not contain '*', i.e. it is constant,
3385 * we don't need to sort nor to lookup the weight keys. */
3386 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3387 j++;
3388 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3389 listAddNodeTail(operations,createSortOperation(
3390 REDIS_SORT_GET,c->argv[j+1]));
3391 getop++;
3392 j++;
3393 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3394 listAddNodeTail(operations,createSortOperation(
3395 REDIS_SORT_DEL,c->argv[j+1]));
3396 j++;
3397 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3398 listAddNodeTail(operations,createSortOperation(
3399 REDIS_SORT_INCR,c->argv[j+1]));
3400 j++;
3401 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3402 listAddNodeTail(operations,createSortOperation(
3403 REDIS_SORT_DECR,c->argv[j+1]));
3404 j++;
3405 } else {
3406 decrRefCount(sortval);
3407 listRelease(operations);
3408 addReply(c,shared.syntaxerr);
3409 return;
3410 }
3411 j++;
3412 }
3413
3414 /* Load the sorting vector with all the objects to sort */
3415 vectorlen = (sortval->type == REDIS_LIST) ?
3416 listLength((list*)sortval->ptr) :
3417 dictSize((dict*)sortval->ptr);
3418 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3419 if (!vector) oom("allocating objects vector for SORT");
3420 j = 0;
3421 if (sortval->type == REDIS_LIST) {
3422 list *list = sortval->ptr;
3423 listNode *ln;
3424
3425 listRewind(list);
3426 while((ln = listYield(list))) {
3427 robj *ele = ln->value;
3428 vector[j].obj = ele;
3429 vector[j].u.score = 0;
3430 vector[j].u.cmpobj = NULL;
3431 j++;
3432 }
3433 } else {
3434 dict *set = sortval->ptr;
3435 dictIterator *di;
3436 dictEntry *setele;
3437
3438 di = dictGetIterator(set);
3439 if (!di) oom("dictGetIterator");
3440 while((setele = dictNext(di)) != NULL) {
3441 vector[j].obj = dictGetEntryKey(setele);
3442 vector[j].u.score = 0;
3443 vector[j].u.cmpobj = NULL;
3444 j++;
3445 }
3446 dictReleaseIterator(di);
3447 }
3448 assert(j == vectorlen);
3449
3450 /* Now it's time to load the right scores in the sorting vector */
3451 if (dontsort == 0) {
3452 for (j = 0; j < vectorlen; j++) {
3453 if (sortby) {
3454 robj *byval;
3455
3456 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3457 if (!byval || byval->type != REDIS_STRING) continue;
3458 if (alpha) {
3459 vector[j].u.cmpobj = byval;
3460 incrRefCount(byval);
3461 } else {
3462 vector[j].u.score = strtod(byval->ptr,NULL);
3463 }
3464 } else {
3465 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3466 }
3467 }
3468 }
3469
3470 /* We are ready to sort the vector... perform a bit of sanity check
3471 * on the LIMIT option too. We'll use a partial version of quicksort. */
3472 start = (limit_start < 0) ? 0 : limit_start;
3473 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3474 if (start >= vectorlen) {
3475 start = vectorlen-1;
3476 end = vectorlen-2;
3477 }
3478 if (end >= vectorlen) end = vectorlen-1;
3479
3480 if (dontsort == 0) {
3481 server.sort_desc = desc;
3482 server.sort_alpha = alpha;
3483 server.sort_bypattern = sortby ? 1 : 0;
3484 if (sortby && (start != 0 || end != vectorlen-1))
3485 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3486 else
3487 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3488 }
3489
3490 /* Send command output to the output buffer, performing the specified
3491 * GET/DEL/INCR/DECR operations if any. */
3492 outputlen = getop ? getop*(end-start+1) : end-start+1;
3493 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3494 for (j = start; j <= end; j++) {
3495 listNode *ln;
3496 if (!getop) {
3497 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3498 sdslen(vector[j].obj->ptr)));
3499 addReply(c,vector[j].obj);
3500 addReply(c,shared.crlf);
3501 }
3502 listRewind(operations);
3503 while((ln = listYield(operations))) {
3504 redisSortOperation *sop = ln->value;
3505 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3506 vector[j].obj);
3507
3508 if (sop->type == REDIS_SORT_GET) {
3509 if (!val || val->type != REDIS_STRING) {
3510 addReply(c,shared.nullbulk);
3511 } else {
3512 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3513 sdslen(val->ptr)));
3514 addReply(c,val);
3515 addReply(c,shared.crlf);
3516 }
3517 } else if (sop->type == REDIS_SORT_DEL) {
3518 /* TODO */
3519 }
3520 }
3521 }
3522
3523 /* Cleanup */
3524 decrRefCount(sortval);
3525 listRelease(operations);
3526 for (j = 0; j < vectorlen; j++) {
3527 if (sortby && alpha && vector[j].u.cmpobj)
3528 decrRefCount(vector[j].u.cmpobj);
3529 }
3530 zfree(vector);
3531 }
3532
3533 static void infoCommand(redisClient *c) {
3534 sds info;
3535 time_t uptime = time(NULL)-server.stat_starttime;
3536
3537 info = sdscatprintf(sdsempty(),
3538 "redis_version:%s\r\n"
3539 "uptime_in_seconds:%d\r\n"
3540 "uptime_in_days:%d\r\n"
3541 "connected_clients:%d\r\n"
3542 "connected_slaves:%d\r\n"
3543 "used_memory:%zu\r\n"
3544 "changes_since_last_save:%lld\r\n"
3545 "bgsave_in_progress:%d\r\n"
3546 "last_save_time:%d\r\n"
3547 "total_connections_received:%lld\r\n"
3548 "total_commands_processed:%lld\r\n"
3549 "role:%s\r\n"
3550 ,REDIS_VERSION,
3551 uptime,
3552 uptime/(3600*24),
3553 listLength(server.clients)-listLength(server.slaves),
3554 listLength(server.slaves),
3555 server.usedmemory,
3556 server.dirty,
3557 server.bgsaveinprogress,
3558 server.lastsave,
3559 server.stat_numconnections,
3560 server.stat_numcommands,
3561 server.masterhost == NULL ? "master" : "slave"
3562 );
3563 if (server.masterhost) {
3564 info = sdscatprintf(info,
3565 "master_host:%s\r\n"
3566 "master_port:%d\r\n"
3567 "master_link_status:%s\r\n"
3568 "master_last_io_seconds_ago:%d\r\n"
3569 ,server.masterhost,
3570 server.masterport,
3571 (server.replstate == REDIS_REPL_CONNECTED) ?
3572 "up" : "down",
3573 (int)(time(NULL)-server.master->lastinteraction)
3574 );
3575 }
3576 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3577 addReplySds(c,info);
3578 addReply(c,shared.crlf);
3579 }
3580
3581 static void monitorCommand(redisClient *c) {
3582 /* ignore MONITOR if aleady slave or in monitor mode */
3583 if (c->flags & REDIS_SLAVE) return;
3584
3585 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3586 c->slaveseldb = 0;
3587 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3588 addReply(c,shared.ok);
3589 }
3590
3591 /* ================================= Expire ================================= */
3592 static int removeExpire(redisDb *db, robj *key) {
3593 if (dictDelete(db->expires,key) == DICT_OK) {
3594 return 1;
3595 } else {
3596 return 0;
3597 }
3598 }
3599
3600 static int setExpire(redisDb *db, robj *key, time_t when) {
3601 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3602 return 0;
3603 } else {
3604 incrRefCount(key);
3605 return 1;
3606 }
3607 }
3608
3609 /* Return the expire time of the specified key, or -1 if no expire
3610 * is associated with this key (i.e. the key is non volatile) */
3611 static time_t getExpire(redisDb *db, robj *key) {
3612 dictEntry *de;
3613
3614 /* No expire? return ASAP */
3615 if (dictSize(db->expires) == 0 ||
3616 (de = dictFind(db->expires,key)) == NULL) return -1;
3617
3618 return (time_t) dictGetEntryVal(de);
3619 }
3620
3621 static int expireIfNeeded(redisDb *db, robj *key) {
3622 time_t when;
3623 dictEntry *de;
3624
3625 /* No expire? return ASAP */
3626 if (dictSize(db->expires) == 0 ||
3627 (de = dictFind(db->expires,key)) == NULL) return 0;
3628
3629 /* Lookup the expire */
3630 when = (time_t) dictGetEntryVal(de);
3631 if (time(NULL) <= when) return 0;
3632
3633 /* Delete the key */
3634 dictDelete(db->expires,key);
3635 return dictDelete(db->dict,key) == DICT_OK;
3636 }
3637
3638 static int deleteIfVolatile(redisDb *db, robj *key) {
3639 dictEntry *de;
3640
3641 /* No expire? return ASAP */
3642 if (dictSize(db->expires) == 0 ||
3643 (de = dictFind(db->expires,key)) == NULL) return 0;
3644
3645 /* Delete the key */
3646 server.dirty++;
3647 dictDelete(db->expires,key);
3648 return dictDelete(db->dict,key) == DICT_OK;
3649 }
3650
3651 static void expireCommand(redisClient *c) {
3652 dictEntry *de;
3653 int seconds = atoi(c->argv[2]->ptr);
3654
3655 de = dictFind(c->db->dict,c->argv[1]);
3656 if (de == NULL) {
3657 addReply(c,shared.czero);
3658 return;
3659 }
3660 if (seconds <= 0) {
3661 addReply(c, shared.czero);
3662 return;
3663 } else {
3664 time_t when = time(NULL)+seconds;
3665 if (setExpire(c->db,c->argv[1],when))
3666 addReply(c,shared.cone);
3667 else
3668 addReply(c,shared.czero);
3669 return;
3670 }
3671 }
3672
3673 static void ttlCommand(redisClient *c) {
3674 time_t expire;
3675 int ttl = -1;
3676
3677 expire = getExpire(c->db,c->argv[1]);
3678 if (expire != -1) {
3679 ttl = (int) (expire-time(NULL));
3680 if (ttl < 0) ttl = -1;
3681 }
3682 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3683 }
3684
3685 /* =============================== Replication ============================= */
3686
3687 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3688 ssize_t nwritten, ret = size;
3689 time_t start = time(NULL);
3690
3691 timeout++;
3692 while(size) {
3693 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3694 nwritten = write(fd,ptr,size);
3695 if (nwritten == -1) return -1;
3696 ptr += nwritten;
3697 size -= nwritten;
3698 }
3699 if ((time(NULL)-start) > timeout) {
3700 errno = ETIMEDOUT;
3701 return -1;
3702 }
3703 }
3704 return ret;
3705 }
3706
3707 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3708 ssize_t nread, totread = 0;
3709 time_t start = time(NULL);
3710
3711 timeout++;
3712 while(size) {
3713 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3714 nread = read(fd,ptr,size);
3715 if (nread == -1) return -1;
3716 ptr += nread;
3717 size -= nread;
3718 totread += nread;
3719 }
3720 if ((time(NULL)-start) > timeout) {
3721 errno = ETIMEDOUT;
3722 return -1;
3723 }
3724 }
3725 return totread;
3726 }
3727
/* Read one '\n'-terminated line from 'fd' into the 'size'-byte buffer 'ptr',
 * one byte at a time through syncRead().
 *
 * The newline is not stored, and a trailing '\r' is stripped. The buffer is
 * always NUL-terminated once at least one byte has been read. At most size-1
 * characters are stored: if the line is longer, the function returns after
 * filling the buffer.
 *
 * Returns the number of characters stored (counted before '\r' stripping),
 * or -1 on read error or timeout.
 *
 * 'size' must be decremented for every stored byte. Otherwise the loop never
 * stops on length, and a line longer than the buffer overflows it. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* reserve room for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        size--;
    }
    return nread;
}
3748
3749 static void syncCommand(redisClient *c) {
3750 /* ignore SYNC if aleady slave or in monitor mode */
3751 if (c->flags & REDIS_SLAVE) return;
3752
3753 /* SYNC can't be issued when the server has pending data to send to
3754 * the client about already issued commands. We need a fresh reply
3755 * buffer registering the differences between the BGSAVE and the current
3756 * dataset, so that we can copy to other slaves if needed. */
3757 if (listLength(c->reply) != 0) {
3758 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3759 return;
3760 }
3761
3762 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3763 /* Here we need to check if there is a background saving operation
3764 * in progress, or if it is required to start one */
3765 if (server.bgsaveinprogress) {
3766 /* Ok a background save is in progress. Let's check if it is a good
3767 * one for replication, i.e. if there is another slave that is
3768 * registering differences since the server forked to save */
3769 redisClient *slave;
3770 listNode *ln;
3771
3772 listRewind(server.slaves);
3773 while((ln = listYield(server.slaves))) {
3774 slave = ln->value;
3775 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3776 }
3777 if (ln) {
3778 /* Perfect, the server is already registering differences for
3779 * another slave. Set the right state, and copy the buffer. */
3780 listRelease(c->reply);
3781 c->reply = listDup(slave->reply);
3782 if (!c->reply) oom("listDup copying slave reply list");
3783 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3784 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3785 } else {
3786 /* No way, we need to wait for the next BGSAVE in order to
3787 * register differences */
3788 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3789 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3790 }
3791 } else {
3792 /* Ok we don't have a BGSAVE in progress, let's start one */
3793 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3794 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3795 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3796 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3797 return;
3798 }
3799 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3800 }
3801 c->repldbfd = -1;
3802 c->flags |= REDIS_SLAVE;
3803 c->slaveseldb = 0;
3804 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3805 return;
3806 }
3807
3808 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3809 redisClient *slave = privdata;
3810 REDIS_NOTUSED(el);
3811 REDIS_NOTUSED(mask);
3812 char buf[REDIS_IOBUF_LEN];
3813 ssize_t nwritten, buflen;
3814
3815 if (slave->repldboff == 0) {
3816 /* Write the bulk write count before to transfer the DB. In theory here
3817 * we don't know how much room there is in the output buffer of the
3818 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3819 * operations) will never be smaller than the few bytes we need. */
3820 sds bulkcount;
3821
3822 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3823 slave->repldbsize);
3824 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3825 {
3826 sdsfree(bulkcount);
3827 freeClient(slave);
3828 return;
3829 }
3830 sdsfree(bulkcount);
3831 }
3832 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3833 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3834 if (buflen <= 0) {
3835 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3836 (buflen == 0) ? "premature EOF" : strerror(errno));
3837 freeClient(slave);
3838 return;
3839 }
3840 if ((nwritten = write(fd,buf,buflen)) == -1) {
3841 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3842 strerror(errno));
3843 freeClient(slave);
3844 return;
3845 }
3846 slave->repldboff += nwritten;
3847 if (slave->repldboff == slave->repldbsize) {
3848 close(slave->repldbfd);
3849 slave->repldbfd = -1;
3850 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3851 slave->replstate = REDIS_REPL_ONLINE;
3852 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3853 sendReplyToClient, slave, NULL) == AE_ERR) {
3854 freeClient(slave);
3855 return;
3856 }
3857 addReplySds(slave,sdsempty());
3858 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3859 }
3860 }
3861
3862 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3863 listNode *ln;
3864 int startbgsave = 0;
3865
3866 listRewind(server.slaves);
3867 while((ln = listYield(server.slaves))) {
3868 redisClient *slave = ln->value;
3869
3870 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3871 startbgsave = 1;
3872 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3873 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3874 struct stat buf;
3875
3876 if (bgsaveerr != REDIS_OK) {
3877 freeClient(slave);
3878 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3879 continue;
3880 }
3881 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3882 fstat(slave->repldbfd,&buf) == -1) {
3883 freeClient(slave);
3884 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3885 continue;
3886 }
3887 slave->repldboff = 0;
3888 slave->repldbsize = buf.st_size;
3889 slave->replstate = REDIS_REPL_SEND_BULK;
3890 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3891 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3892 freeClient(slave);
3893 continue;
3894 }
3895 }
3896 }
3897 if (startbgsave) {
3898 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3899 listRewind(server.slaves);
3900 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3901 while((ln = listYield(server.slaves))) {
3902 redisClient *slave = ln->value;
3903
3904 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3905 freeClient(slave);
3906 }
3907 }
3908 }
3909 }
3910
3911 static int syncWithMaster(void) {
3912 char buf[1024], tmpfile[256];
3913 int dumpsize;
3914 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3915 int dfd;
3916
3917 if (fd == -1) {
3918 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3919 strerror(errno));
3920 return REDIS_ERR;
3921 }
3922 /* Issue the SYNC command */
3923 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3924 close(fd);
3925 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3926 strerror(errno));
3927 return REDIS_ERR;
3928 }
3929 /* Read the bulk write count */
3930 if (syncReadLine(fd,buf,1024,3600) == -1) {
3931 close(fd);
3932 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3933 strerror(errno));
3934 return REDIS_ERR;
3935 }
3936 dumpsize = atoi(buf+1);
3937 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3938 /* Read the bulk write data on a temp file */
3939 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3940 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3941 if (dfd == -1) {
3942 close(fd);
3943 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3944 return REDIS_ERR;
3945 }
3946 while(dumpsize) {
3947 int nread, nwritten;
3948
3949 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3950 if (nread == -1) {
3951 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3952 strerror(errno));
3953 close(fd);
3954 close(dfd);
3955 return REDIS_ERR;
3956 }
3957 nwritten = write(dfd,buf,nread);
3958 if (nwritten == -1) {
3959 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3960 close(fd);
3961 close(dfd);
3962 return REDIS_ERR;
3963 }
3964 dumpsize -= nread;
3965 }
3966 close(dfd);
3967 if (rename(tmpfile,server.dbfilename) == -1) {
3968 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3969 unlink(tmpfile);
3970 close(fd);
3971 return REDIS_ERR;
3972 }
3973 emptyDb();
3974 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3975 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3976 close(fd);
3977 return REDIS_ERR;
3978 }
3979 server.master = createClient(fd);
3980 server.master->flags |= REDIS_MASTER;
3981 server.replstate = REDIS_REPL_CONNECTED;
3982 return REDIS_OK;
3983 }
3984
3985 static void slaveofCommand(redisClient *c) {
3986 if (!strcasecmp(c->argv[1]->ptr,"no") &&
3987 !strcasecmp(c->argv[2]->ptr,"one")) {
3988 if (server.masterhost) {
3989 sdsfree(server.masterhost);
3990 server.masterhost = NULL;
3991 if (server.master) freeClient(server.master);
3992 server.replstate = REDIS_REPL_NONE;
3993 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
3994 }
3995 } else {
3996 sdsfree(server.masterhost);
3997 server.masterhost = sdsdup(c->argv[1]->ptr);
3998 server.masterport = atoi(c->argv[2]->ptr);
3999 if (server.master) freeClient(server.master);
4000 server.replstate = REDIS_REPL_CONNECT;
4001 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4002 server.masterhost, server.masterport);
4003 }
4004 addReply(c,shared.ok);
4005 }
4006
4007 /* ============================ Maxmemory directive ======================== */
4008
4009 /* This function gets called when 'maxmemory' is set on the config file to limit
4010 * the max memory used by the server, and we are out of memory.
4011 * This function will try to, in order:
4012 *
4013 * - Free objects from the free list
4014 * - Try to remove keys with an EXPIRE set
4015 *
4016 * It is not possible to free enough memory to reach used-memory < maxmemory
4017 * the server will start refusing commands that will enlarge even more the
4018 * memory usage.
4019 */
4020 static void freeMemoryIfNeeded(void) {
4021 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4022 if (listLength(server.objfreelist)) {
4023 robj *o;
4024
4025 listNode *head = listFirst(server.objfreelist);
4026 o = listNodeValue(head);
4027 listDelNode(server.objfreelist,head);
4028 zfree(o);
4029 } else {
4030 int j, k, freed = 0;
4031
4032 for (j = 0; j < server.dbnum; j++) {
4033 int minttl = -1;
4034 robj *minkey = NULL;
4035 struct dictEntry *de;
4036
4037 if (dictSize(server.db[j].expires)) {
4038 freed = 1;
4039 /* From a sample of three keys drop the one nearest to
4040 * the natural expire */
4041 for (k = 0; k < 3; k++) {
4042 time_t t;
4043
4044 de = dictGetRandomKey(server.db[j].expires);
4045 t = (time_t) dictGetEntryVal(de);
4046 if (minttl == -1 || t < minttl) {
4047 minkey = dictGetEntryKey(de);
4048 minttl = t;
4049 }
4050 }
4051 deleteKey(server.db+j,minkey);
4052 }
4053 }
4054 if (!freed) return; /* nothing to free... */
4055 }
4056 }
4057 }
4058
4059 /* ================================= Debugging ============================== */
4060
4061 static void debugCommand(redisClient *c) {
4062 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4063 *((char*)-1) = 'x';
4064 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4065 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4066 robj *key, *val;
4067
4068 if (!de) {
4069 addReply(c,shared.nokeyerr);
4070 return;
4071 }
4072 key = dictGetEntryKey(de);
4073 val = dictGetEntryVal(de);
4074 addReplySds(c,sdscatprintf(sdsempty(),
4075 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4076 key, key->refcount, val, val->refcount));
4077 } else {
4078 addReplySds(c,sdsnew(
4079 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4080 }
4081 }
4082
4083 /* =================================== Main! ================================ */
4084
4085 #ifdef __linux__
/* Return the integer value stored in /proc/sys/vm/overcommit_memory, or -1
 * when the file cannot be opened, cannot be read, or does not begin with a
 * number. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long val;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    /* strtol instead of atoi: atoi reports unparsable text as 0, which
     * linuxOvercommitMemoryWarning() would misreport as overcommit == 0. */
    val = strtol(buf,&end,10);
    if (end == buf) return -1;
    return (int) val;
}
4099
4100 void linuxOvercommitMemoryWarning(void) {
4101 if (linuxOvercommitMemoryValue() == 0) {
4102 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4103 }
4104 }
4105 #endif /* __linux__ */
4106
4107 static void daemonize(void) {
4108 int fd;
4109 FILE *fp;
4110
4111 if (fork() != 0) exit(0); /* parent exits */
4112 setsid(); /* create a new session */
4113
4114 /* Every output goes to /dev/null. If Redis is daemonized but
4115 * the 'logfile' is set to 'stdout' in the configuration file
4116 * it will not log at all. */
4117 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4118 dup2(fd, STDIN_FILENO);
4119 dup2(fd, STDOUT_FILENO);
4120 dup2(fd, STDERR_FILENO);
4121 if (fd > STDERR_FILENO) close(fd);
4122 }
4123 /* Try to write the pid file */
4124 fp = fopen(server.pidfile,"w");
4125 if (fp) {
4126 fprintf(fp,"%d\n",getpid());
4127 fclose(fp);
4128 }
4129 }
4130
4131 static void segvHandler (int sig, siginfo_t *info, void *secret) {
4132
4133 void *trace[16];
4134 char **messages = (char **)NULL;
4135 int i, trace_size = 0;
4136 ucontext_t *uc = (ucontext_t *)secret;
4137
4138 /* Do something useful with siginfo_t */
4139 if (sig == SIGSEGV)
4140 printf("Got signal %d, faulty address is %p, from %p\n", sig, info->si_addr,
4141 (void *)uc->uc_mcontext.gregs[REG_EIP]);
4142 else
4143 printf("Got signal %d\n", sig);
4144
4145 trace_size = backtrace(trace, 16);
4146 /* overwrite sigaction with caller's address */
4147 trace[1] = (void *) uc->uc_mcontext.gregs[REG_EIP];
4148
4149 messages = backtrace_symbols(trace, trace_size);
4150 /* skip first stack frame (points here) */
4151 printf("[bt] Execution path:\n");
4152 for (i=1; i<trace_size; ++i)
4153 printf("[bt] %s\n", messages[i]);
4154
4155 exit(0);
4156 }
4157
4158 void setupSigAction(){
4159 struct sigaction act;
4160 sigemptyset (&act.sa_mask);
4161 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. Otherwise, sa_handler is used */
4162 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4163 act.sa_sigaction = segvHandler;
4164 sigaction (SIGSEGV, &act, NULL);
4165 }
4166
4167 int main(int argc, char **argv) {
4168 setupSigAction();
4169 #ifdef __linux__
4170 linuxOvercommitMemoryWarning();
4171 #endif
4172
4173 initServerConfig();
4174 if (argc == 2) {
4175 ResetServerSaveParams();
4176 loadServerConfig(argv[1]);
4177 } else if (argc > 2) {
4178 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4179 exit(1);
4180 } else {
4181 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4182 }
4183 initServer();
4184 if (server.daemonize) daemonize();
4185 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4186 if (rdbLoad(server.dbfilename) == REDIS_OK)
4187 redisLog(REDIS_NOTICE,"DB loaded from disk");
4188 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4189 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4190 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4191 aeMain(server.el);
4192 aeDeleteEventLoop(server.el);
4193 return 0;
4194 }