]> git.saurik.com Git - redis.git/blob - redis.c
add compile options to debug
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #define __USE_POSIX199309
40 #include <signal.h>
41 #include <execinfo.h>
42 #include <ucontext.h>
43 #include <sys/wait.h>
44 #include <errno.h>
45 #include <assert.h>
46 #include <ctype.h>
47 #include <stdarg.h>
48 #include <inttypes.h>
49 #include <arpa/inet.h>
50 #include <sys/stat.h>
51 #include <fcntl.h>
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #include <limits.h>
55 #include <execinfo.h>
56
57 #include "redis.h"
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
66
67 /* Error codes */
68 #define REDIS_OK 0
69 #define REDIS_ERR -1
70
71 /* Static server configuration */
72 #define REDIS_SERVERPORT 6379 /* TCP port */
73 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
74 #define REDIS_IOBUF_LEN 1024
75 #define REDIS_LOADBUF_LEN 1024
76 #define REDIS_STATIC_ARGS 4
77 #define REDIS_DEFAULT_DBNUM 16
78 #define REDIS_CONFIGLINE_MAX 1024
79 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
80 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
81 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
82
83 /* Hash table parameters */
84 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
85 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
86
87 /* Command flags */
88 #define REDIS_CMD_BULK 1 /* Bulk write command */
89 #define REDIS_CMD_INLINE 2 /* Inline command */
90 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
91 this flag will return an error when the 'maxmemory' option is set in the
92 config file and the server is using more than maxmemory bytes of memory.
93 In short these commands are denied on low memory conditions. */
94 #define REDIS_CMD_DENYOOM 4
95
96 /* Object types */
97 #define REDIS_STRING 0
98 #define REDIS_LIST 1
99 #define REDIS_SET 2
100 #define REDIS_HASH 3
101
102 /* Object types only used for dumping to disk */
103 #define REDIS_EXPIRETIME 253
104 #define REDIS_SELECTDB 254
105 #define REDIS_EOF 255
106
107 /* Defines related to the dump file format. To store 32 bits lengths for short
108 * keys requires a lot of space, so we check the most significant 2 bits of
109 * the first byte to interpret the length:
110 *
111 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
112 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
113 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
114 * 11|000000 this means: specially encoded object will follow. The six bits
115 * number specify the kind of object that follows.
116 * See the REDIS_RDB_ENC_* defines.
117 *
118 * Lengths up to 63 are stored using a single byte, most DB keys, and many
119 * values, will fit inside. */
120 #define REDIS_RDB_6BITLEN 0
121 #define REDIS_RDB_14BITLEN 1
122 #define REDIS_RDB_32BITLEN 2
123 #define REDIS_RDB_ENCVAL 3
124 #define REDIS_RDB_LENERR UINT_MAX
125
126 /* When a length of a string object stored on disk has the first two bits
127 * set, the remaining six bits specify a special encoding for the object
128 * accordingly to the following defines: */
129 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
130 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
131 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
132 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
133
134 /* Client flags */
135 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
136 #define REDIS_SLAVE 2 /* This client is a slave server */
137 #define REDIS_MASTER 4 /* This client is a master server */
138 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
139
140 /* Slave replication state - slave side */
141 #define REDIS_REPL_NONE 0 /* No active replication */
142 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
143 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
144
145 /* Slave replication state - from the point of view of master
146 * Note that in SEND_BULK and ONLINE state the slave receives new updates
147 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
148 * to start the next background saving in order to send updates to it. */
149 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
150 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
151 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
152 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
153
154 /* List related stuff */
155 #define REDIS_HEAD 0
156 #define REDIS_TAIL 1
157
158 /* Sort operations */
159 #define REDIS_SORT_GET 0
160 #define REDIS_SORT_DEL 1
161 #define REDIS_SORT_INCR 2
162 #define REDIS_SORT_DECR 3
163 #define REDIS_SORT_ASC 4
164 #define REDIS_SORT_DESC 5
165 #define REDIS_SORTKEY_MAX 1024
166
167 /* Log levels */
168 #define REDIS_DEBUG 0
169 #define REDIS_NOTICE 1
170 #define REDIS_WARNING 2
171
172 /* Anti-warning macro... */
173 #define REDIS_NOTUSED(V) ((void) V)
/* Debug helper: formats a string through a NULL destination pointer.
 * NOTE(review): this looks like a deliberate way to crash the process
 * (e.g. to exercise crash handling) - confirm before "fixing" it.
 * The return statement is only reached if the write does not fault. */
int die() {
    char *nowhere = NULL;

    sprintf(nowhere, "gonner");
    return 0;
}
179
180
181 /*================================= Data types ============================== */
182
/* Generic Redis value: a small header (type tag + reference count) in front
 * of an opaque payload that can hold a string, a list or a set. */
typedef struct redisObject {
    void *ptr;      /* payload; how to read it depends on 'type' */
    int type;       /* object type tag (see the "Object types" defines) */
    int refcount;   /* number of live references to this object */
} robj;
189
190 typedef struct redisDb {
191 dict *dict;
192 dict *expires;
193 int id;
194 } redisDb;
195
196 /* With multiplexing we need to take per-clinet state.
197 * Clients are taken in a liked list. */
198 typedef struct redisClient {
199 int fd;
200 redisDb *db;
201 int dictid;
202 sds querybuf;
203 robj **argv;
204 int argc;
205 int bulklen; /* bulk read len. -1 if not in bulk read mode */
206 list *reply;
207 int sentlen;
208 time_t lastinteraction; /* time of the last interaction, used for timeout */
209 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
210 int slaveseldb; /* slave selected db, if this client is a slave */
211 int authenticated; /* when requirepass is non-NULL */
212 int replstate; /* replication state if this is a slave */
213 int repldbfd; /* replication DB file descriptor */
214 long repldboff; /* replication DB file offset */
215 off_t repldbsize; /* replication DB file size */
216 } redisClient;
217
/* One automatic-save rule: serverCron() starts a background save when at
 * least 'changes' modifications are pending and more than 'seconds' seconds
 * have passed since the last save. */
struct saveparam {
    time_t seconds; /* minimum age of the last save, in seconds */
    int changes;    /* minimum number of pending changes */
};
222
223 /* Global server state structure */
224 struct redisServer {
225 int port;
226 int fd;
227 redisDb *db;
228 dict *sharingpool;
229 unsigned int sharingpoolsize;
230 long long dirty; /* changes to DB from the last save */
231 list *clients;
232 list *slaves, *monitors;
233 char neterr[ANET_ERR_LEN];
234 aeEventLoop *el;
235 int cronloops; /* number of times the cron function run */
236 list *objfreelist; /* A list of freed objects to avoid malloc() */
237 time_t lastsave; /* Unix time of last save succeeede */
238 size_t usedmemory; /* Used memory in megabytes */
239 /* Fields used only for stats */
240 time_t stat_starttime; /* server start time */
241 long long stat_numcommands; /* number of processed commands */
242 long long stat_numconnections; /* number of connections received */
243 /* Configuration */
244 int verbosity;
245 int glueoutputbuf;
246 int maxidletime;
247 int dbnum;
248 int daemonize;
249 char *pidfile;
250 int bgsaveinprogress;
251 struct saveparam *saveparams;
252 int saveparamslen;
253 char *logfile;
254 char *bindaddr;
255 char *dbfilename;
256 char *requirepass;
257 int shareobjects;
258 /* Replication related */
259 int isslave;
260 char *masterhost;
261 int masterport;
262 redisClient *master; /* client that is master for this slave */
263 int replstate;
264 unsigned int maxclients;
265 unsigned int maxmemory;
266 /* Sort parameters - qsort_r() is only available under BSD so we
267 * have to take this state global, in order to pass it to sortCompare() */
268 int sort_desc;
269 int sort_alpha;
270 int sort_bypattern;
271 };
272
273 typedef void redisCommandProc(redisClient *c);
274 struct redisCommand {
275 char *name;
276 redisCommandProc *proc;
277 int arity;
278 int flags;
279 };
280
281 typedef struct _redisSortObject {
282 robj *obj;
283 union {
284 double score;
285 robj *cmpobj;
286 } u;
287 } redisSortObject;
288
289 typedef struct _redisSortOperation {
290 int type;
291 robj *pattern;
292 } redisSortOperation;
293
294 struct sharedObjectsStruct {
295 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
296 *colon, *nullbulk, *nullmultibulk,
297 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
298 *outofrangeerr, *plus,
299 *select0, *select1, *select2, *select3, *select4,
300 *select5, *select6, *select7, *select8, *select9;
301 } shared;
302
303 /*================================ Prototypes =============================== */
304
305 static void freeStringObject(robj *o);
306 static void freeListObject(robj *o);
307 static void freeSetObject(robj *o);
308 static void decrRefCount(void *o);
309 static robj *createObject(int type, void *ptr);
310 static void freeClient(redisClient *c);
311 static int rdbLoad(char *filename);
312 static void addReply(redisClient *c, robj *obj);
313 static void addReplySds(redisClient *c, sds s);
314 static void incrRefCount(robj *o);
315 static int rdbSaveBackground(char *filename);
316 static robj *createStringObject(char *ptr, size_t len);
317 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
318 static int syncWithMaster(void);
319 static robj *tryObjectSharing(robj *o);
320 static int removeExpire(redisDb *db, robj *key);
321 static int expireIfNeeded(redisDb *db, robj *key);
322 static int deleteIfVolatile(redisDb *db, robj *key);
323 static int deleteKey(redisDb *db, robj *key);
324 static time_t getExpire(redisDb *db, robj *key);
325 static int setExpire(redisDb *db, robj *key, time_t when);
326 static void updateSalvesWaitingBgsave(int bgsaveerr);
327 static void freeMemoryIfNeeded(void);
328
329 static void authCommand(redisClient *c);
330 static void pingCommand(redisClient *c);
331 static void echoCommand(redisClient *c);
332 static void setCommand(redisClient *c);
333 static void setnxCommand(redisClient *c);
334 static void getCommand(redisClient *c);
335 static void delCommand(redisClient *c);
336 static void existsCommand(redisClient *c);
337 static void incrCommand(redisClient *c);
338 static void decrCommand(redisClient *c);
339 static void incrbyCommand(redisClient *c);
340 static void decrbyCommand(redisClient *c);
341 static void selectCommand(redisClient *c);
342 static void randomkeyCommand(redisClient *c);
343 static void keysCommand(redisClient *c);
344 static void dbsizeCommand(redisClient *c);
345 static void lastsaveCommand(redisClient *c);
346 static void saveCommand(redisClient *c);
347 static void bgsaveCommand(redisClient *c);
348 static void shutdownCommand(redisClient *c);
349 static void moveCommand(redisClient *c);
350 static void renameCommand(redisClient *c);
351 static void renamenxCommand(redisClient *c);
352 static void lpushCommand(redisClient *c);
353 static void rpushCommand(redisClient *c);
354 static void lpopCommand(redisClient *c);
355 static void rpopCommand(redisClient *c);
356 static void llenCommand(redisClient *c);
357 static void lindexCommand(redisClient *c);
358 static void lrangeCommand(redisClient *c);
359 static void ltrimCommand(redisClient *c);
360 static void typeCommand(redisClient *c);
361 static void lsetCommand(redisClient *c);
362 static void saddCommand(redisClient *c);
363 static void sremCommand(redisClient *c);
364 static void smoveCommand(redisClient *c);
365 static void sismemberCommand(redisClient *c);
366 static void scardCommand(redisClient *c);
367 static void sinterCommand(redisClient *c);
368 static void sinterstoreCommand(redisClient *c);
369 static void sunionCommand(redisClient *c);
370 static void sunionstoreCommand(redisClient *c);
371 static void sdiffCommand(redisClient *c);
372 static void sdiffstoreCommand(redisClient *c);
373 static void syncCommand(redisClient *c);
374 static void flushdbCommand(redisClient *c);
375 static void flushallCommand(redisClient *c);
376 static void sortCommand(redisClient *c);
377 static void lremCommand(redisClient *c);
378 static void infoCommand(redisClient *c);
379 static void mgetCommand(redisClient *c);
380 static void monitorCommand(redisClient *c);
381 static void expireCommand(redisClient *c);
382 static void getSetCommand(redisClient *c);
383 static void ttlCommand(redisClient *c);
384 static void slaveofCommand(redisClient *c);
385 static void debugCommand(redisClient *c);
386
387 /*================================= Globals ================================= */
388
389 /* Global vars */
390 static struct redisServer server; /* server global state */
391 static struct redisCommand cmdTable[] = {
392 {"get",getCommand,2,REDIS_CMD_INLINE},
393 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
394 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
395 {"del",delCommand,-2,REDIS_CMD_INLINE},
396 {"exists",existsCommand,2,REDIS_CMD_INLINE},
397 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
398 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
399 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
400 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
401 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
402 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
403 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
404 {"llen",llenCommand,2,REDIS_CMD_INLINE},
405 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
406 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
407 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
408 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
409 {"lrem",lremCommand,4,REDIS_CMD_BULK},
410 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
411 {"srem",sremCommand,3,REDIS_CMD_BULK},
412 {"smove",smoveCommand,4,REDIS_CMD_BULK},
413 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
414 {"scard",scardCommand,2,REDIS_CMD_INLINE},
415 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
416 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
417 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
418 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
419 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
420 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
421 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
422 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
423 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
424 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
425 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
426 {"select",selectCommand,2,REDIS_CMD_INLINE},
427 {"move",moveCommand,3,REDIS_CMD_INLINE},
428 {"rename",renameCommand,3,REDIS_CMD_INLINE},
429 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
430 {"expire",expireCommand,3,REDIS_CMD_INLINE},
431 {"keys",keysCommand,2,REDIS_CMD_INLINE},
432 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
433 {"auth",authCommand,2,REDIS_CMD_INLINE},
434 {"ping",pingCommand,1,REDIS_CMD_INLINE},
435 {"echo",echoCommand,2,REDIS_CMD_BULK},
436 {"save",saveCommand,1,REDIS_CMD_INLINE},
437 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
438 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
439 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
440 {"type",typeCommand,2,REDIS_CMD_INLINE},
441 {"sync",syncCommand,1,REDIS_CMD_INLINE},
442 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
443 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
444 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
445 {"info",infoCommand,1,REDIS_CMD_INLINE},
446 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
447 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
448 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
449 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
450 {NULL,NULL,0,0}
451 };
452
453 /*============================ Utility functions ============================ */
454
/* Lower-case 'ch' for case-insensitive matching. Negative values (high bytes
 * on platforms where plain char is signed) are returned untouched, because
 * passing them to tolower() is undefined behavior. */
static int stringmatchFold(int ch) {
    return (ch >= 0) ? tolower(ch) : ch;
}

/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer has to be NUL terminated:
 * nothing is read beyond the given lengths.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte out of a set; [^abc] negates, [a-z] is a range,
 *            a backslash inside the brackets escapes the next byte
 *   \x       the byte x taken literally
 *
 * When 'nocase' is non-zero literals and ranges are compared case
 * insensitively. Returns 1 on match, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern at every remaining position. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one byte: an exhausted string can't
             * match, and string[0] must not be read in that case. */
            if (stringLen <= 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so the shared advance
                     * after the switch lands exactly on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = stringmatchFold(start);
                        end = stringmatchFold(end);
                        c = stringmatchFold(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (stringmatchFold(pattern[0]) == stringmatchFold(string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs one byte of input to compare against. */
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (stringmatchFold(pattern[0]) != stringmatchFold(string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Input consumed: only trailing '*' may remain in the pattern.
             * Bounded by patternLen so bytes after the pattern are ignored. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
577
578 void redisLog(int level, const char *fmt, ...)
579 {
580 va_list ap;
581 FILE *fp;
582
583 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
584 if (!fp) return;
585
586 va_start(ap, fmt);
587 if (level >= server.verbosity) {
588 char *c = ".-*";
589 char buf[64];
590 time_t now;
591
592 now = time(NULL);
593 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
594 fprintf(fp,"%s %c ",buf,c[level]);
595 vfprintf(fp, fmt, ap);
596 fprintf(fp,"\n");
597 fflush(fp);
598 }
599 va_end(ap);
600
601 if (server.logfile) fclose(fp);
602 }
603
604 /*====================== Hash table type implementation ==================== */
605
606 /* This is a hash table type that uses the SDS dynamic strings library as
607 * keys and redis objects as values (objects can hold SDS strings,
608 * lists, sets). */
609
610 static int sdsDictKeyCompare(void *privdata, const void *key1,
611 const void *key2)
612 {
613 int l1,l2;
614 DICT_NOTUSED(privdata);
615
616 l1 = sdslen((sds)key1);
617 l2 = sdslen((sds)key2);
618 if (l1 != l2) return 0;
619 return memcmp(key1, key2, l1) == 0;
620 }
621
/* Destructor callback for hash table entries holding Redis objects: drops
 * one reference from 'val' through decrRefCount(). 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
628
629 static int dictSdsKeyCompare(void *privdata, const void *key1,
630 const void *key2)
631 {
632 const robj *o1 = key1, *o2 = key2;
633 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
634 }
635
636 static unsigned int dictSdsHash(const void *key) {
637 const robj *o = key;
638 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
639 }
640
641 static dictType setDictType = {
642 dictSdsHash, /* hash function */
643 NULL, /* key dup */
644 NULL, /* val dup */
645 dictSdsKeyCompare, /* key compare */
646 dictRedisObjectDestructor, /* key destructor */
647 NULL /* val destructor */
648 };
649
650 static dictType hashDictType = {
651 dictSdsHash, /* hash function */
652 NULL, /* key dup */
653 NULL, /* val dup */
654 dictSdsKeyCompare, /* key compare */
655 dictRedisObjectDestructor, /* key destructor */
656 dictRedisObjectDestructor /* val destructor */
657 };
658
659 /* ========================= Random utility functions ======================= */
660
/* Out-of-memory policy: Redis does not try to recover when allocating an
 * object or a string fails. Reporting the condition to the client may not
 * even be possible, since the networking layer itself allocates its send
 * buffers on the heap, so the process simply aborts. It also keeps the
 * code simpler to read.
 *
 * 'msg' names the failing operation; it is printed on stderr before
 * abort() is called, so this function never returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n", msg);
    fflush(stderr);

    sleep(1);
    abort();
}
672
673 /* ====================== Redis server networking stuff ===================== */
674 void closeTimedoutClients(void) {
675 redisClient *c;
676 listNode *ln;
677 time_t now = time(NULL);
678
679 listRewind(server.clients);
680 while ((ln = listYield(server.clients)) != NULL) {
681 c = listNodeValue(ln);
682 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
683 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
684 (now - c->lastinteraction > server.maxidletime)) {
685 redisLog(REDIS_DEBUG,"Closing idle client");
686 freeClient(c);
687 }
688 }
689 }
690
691 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
692 * we resize the hash table to save memory */
693 void tryResizeHashTables(void) {
694 int j;
695
696 for (j = 0; j < server.dbnum; j++) {
697 long long size, used;
698
699 size = dictSlots(server.db[j].dict);
700 used = dictSize(server.db[j].dict);
701 if (size && used && size > REDIS_HT_MINSLOTS &&
702 (used*100/size < REDIS_HT_MINFILL)) {
703 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
704 dictResize(server.db[j].dict);
705 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
706 }
707 }
708 }
709
710 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
711 int j, loops = server.cronloops++;
712 REDIS_NOTUSED(eventLoop);
713 REDIS_NOTUSED(id);
714 REDIS_NOTUSED(clientData);
715
716 /* Update the global state with the amount of used memory */
717 server.usedmemory = zmalloc_used_memory();
718
719 /* Show some info about non-empty databases */
720 for (j = 0; j < server.dbnum; j++) {
721 long long size, used, vkeys;
722
723 size = dictSlots(server.db[j].dict);
724 used = dictSize(server.db[j].dict);
725 vkeys = dictSize(server.db[j].expires);
726 if (!(loops % 5) && used > 0) {
727 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
728 /* dictPrintStats(server.dict); */
729 }
730 }
731
732 /* We don't want to resize the hash tables while a bacground saving
733 * is in progress: the saving child is created using fork() that is
734 * implemented with a copy-on-write semantic in most modern systems, so
735 * if we resize the HT while there is the saving child at work actually
736 * a lot of memory movements in the parent will cause a lot of pages
737 * copied. */
738 if (!server.bgsaveinprogress) tryResizeHashTables();
739
740 /* Show information about connected clients */
741 if (!(loops % 5)) {
742 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
743 listLength(server.clients)-listLength(server.slaves),
744 listLength(server.slaves),
745 server.usedmemory,
746 dictSize(server.sharingpool));
747 }
748
749 /* Close connections of timedout clients */
750 if (server.maxidletime && !(loops % 10))
751 closeTimedoutClients();
752
753 /* Check if a background saving in progress terminated */
754 if (server.bgsaveinprogress) {
755 int statloc;
756 /* XXX: TODO handle the case of the saving child killed */
757 if (wait4(-1,&statloc,WNOHANG,NULL)) {
758 int exitcode = WEXITSTATUS(statloc);
759 if (exitcode == 0) {
760 redisLog(REDIS_NOTICE,
761 "Background saving terminated with success");
762 server.dirty = 0;
763 server.lastsave = time(NULL);
764 } else {
765 redisLog(REDIS_WARNING,
766 "Background saving error");
767 }
768 server.bgsaveinprogress = 0;
769 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
770 }
771 } else {
772 /* If there is not a background saving in progress check if
773 * we have to save now */
774 time_t now = time(NULL);
775 for (j = 0; j < server.saveparamslen; j++) {
776 struct saveparam *sp = server.saveparams+j;
777
778 if (server.dirty >= sp->changes &&
779 now-server.lastsave > sp->seconds) {
780 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
781 sp->changes, sp->seconds);
782 rdbSaveBackground(server.dbfilename);
783 break;
784 }
785 }
786 }
787
788 /* Try to expire a few timed out keys */
789 for (j = 0; j < server.dbnum; j++) {
790 redisDb *db = server.db+j;
791 int num = dictSize(db->expires);
792
793 if (num) {
794 time_t now = time(NULL);
795
796 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
797 num = REDIS_EXPIRELOOKUPS_PER_CRON;
798 while (num--) {
799 dictEntry *de;
800 time_t t;
801
802 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
803 t = (time_t) dictGetEntryVal(de);
804 if (now > t) {
805 deleteKey(db,dictGetEntryKey(de));
806 }
807 }
808 }
809 }
810
811 /* Check if we should connect to a MASTER */
812 if (server.replstate == REDIS_REPL_CONNECT) {
813 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
814 if (syncWithMaster() == REDIS_OK) {
815 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
816 }
817 }
818 return 1000;
819 }
820
821 static void createSharedObjects(void) {
822 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
823 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
824 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
825 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
826 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
827 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
828 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
829 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
830 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
831 /* no such key */
832 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
833 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
834 "-ERR Operation against a key holding the wrong kind of value\r\n"));
835 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
836 "-ERR no such key\r\n"));
837 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
838 "-ERR syntax error\r\n"));
839 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
840 "-ERR source and destination objects are the same\r\n"));
841 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
842 "-ERR index out of range\r\n"));
843 shared.space = createObject(REDIS_STRING,sdsnew(" "));
844 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
845 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
846 shared.select0 = createStringObject("select 0\r\n",10);
847 shared.select1 = createStringObject("select 1\r\n",10);
848 shared.select2 = createStringObject("select 2\r\n",10);
849 shared.select3 = createStringObject("select 3\r\n",10);
850 shared.select4 = createStringObject("select 4\r\n",10);
851 shared.select5 = createStringObject("select 5\r\n",10);
852 shared.select6 = createStringObject("select 6\r\n",10);
853 shared.select7 = createStringObject("select 7\r\n",10);
854 shared.select8 = createStringObject("select 8\r\n",10);
855 shared.select9 = createStringObject("select 9\r\n",10);
856 }
857
858 static void appendServerSaveParams(time_t seconds, int changes) {
859 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
860 if (server.saveparams == NULL) oom("appendServerSaveParams");
861 server.saveparams[server.saveparamslen].seconds = seconds;
862 server.saveparams[server.saveparamslen].changes = changes;
863 server.saveparamslen++;
864 }
865
866 static void ResetServerSaveParams() {
867 zfree(server.saveparams);
868 server.saveparams = NULL;
869 server.saveparamslen = 0;
870 }
871
872 static void initServerConfig() {
873 server.dbnum = REDIS_DEFAULT_DBNUM;
874 server.port = REDIS_SERVERPORT;
875 server.verbosity = REDIS_DEBUG;
876 server.maxidletime = REDIS_MAXIDLETIME;
877 server.saveparams = NULL;
878 server.logfile = NULL; /* NULL = log on standard output */
879 server.bindaddr = NULL;
880 server.glueoutputbuf = 1;
881 server.daemonize = 0;
882 server.pidfile = "/var/run/redis.pid";
883 server.dbfilename = "dump.rdb";
884 server.requirepass = NULL;
885 server.shareobjects = 0;
886 server.maxclients = 0;
887 server.maxmemory = 0;
888 ResetServerSaveParams();
889
890 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
891 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
892 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
893 /* Replication related */
894 server.isslave = 0;
895 server.masterhost = NULL;
896 server.masterport = 6379;
897 server.master = NULL;
898 server.replstate = REDIS_REPL_NONE;
899 }
900
901 static void initServer() {
902 int j;
903
904 signal(SIGHUP, SIG_IGN);
905 signal(SIGPIPE, SIG_IGN);
906
907 server.clients = listCreate();
908 server.slaves = listCreate();
909 server.monitors = listCreate();
910 server.objfreelist = listCreate();
911 createSharedObjects();
912 server.el = aeCreateEventLoop();
913 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
914 server.sharingpool = dictCreate(&setDictType,NULL);
915 server.sharingpoolsize = 1024;
916 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
917 oom("server initialization"); /* Fatal OOM */
918 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
919 if (server.fd == -1) {
920 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
921 exit(1);
922 }
923 for (j = 0; j < server.dbnum; j++) {
924 server.db[j].dict = dictCreate(&hashDictType,NULL);
925 server.db[j].expires = dictCreate(&setDictType,NULL);
926 server.db[j].id = j;
927 }
928 server.cronloops = 0;
929 server.bgsaveinprogress = 0;
930 server.lastsave = time(NULL);
931 server.dirty = 0;
932 server.usedmemory = 0;
933 server.stat_numcommands = 0;
934 server.stat_numconnections = 0;
935 server.stat_starttime = time(NULL);
936 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
937 }
938
939 /* Empty the whole database */
940 static long long emptyDb() {
941 int j;
942 long long removed = 0;
943
944 for (j = 0; j < server.dbnum; j++) {
945 removed += dictSize(server.db[j].dict);
946 dictEmpty(server.db[j].dict);
947 dictEmpty(server.db[j].expires);
948 }
949 return removed;
950 }
951
/* Translate a case-insensitive "yes" / "no" string into 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
957
958 /* I agree, this is a very rudimental way to load a configuration...
959 will improve later if the config gets more complex */
960 static void loadServerConfig(char *filename) {
961 FILE *fp = fopen(filename,"r");
962 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
963 int linenum = 0;
964 sds line = NULL;
965
966 if (!fp) {
967 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
968 exit(1);
969 }
970 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
971 sds *argv;
972 int argc, j;
973
974 linenum++;
975 line = sdsnew(buf);
976 line = sdstrim(line," \t\r\n");
977
978 /* Skip comments and blank lines*/
979 if (line[0] == '#' || line[0] == '\0') {
980 sdsfree(line);
981 continue;
982 }
983
984 /* Split into arguments */
985 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
986 sdstolower(argv[0]);
987
988 /* Execute config directives */
989 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
990 server.maxidletime = atoi(argv[1]);
991 if (server.maxidletime < 0) {
992 err = "Invalid timeout value"; goto loaderr;
993 }
994 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
995 server.port = atoi(argv[1]);
996 if (server.port < 1 || server.port > 65535) {
997 err = "Invalid port"; goto loaderr;
998 }
999 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1000 server.bindaddr = zstrdup(argv[1]);
1001 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1002 int seconds = atoi(argv[1]);
1003 int changes = atoi(argv[2]);
1004 if (seconds < 1 || changes < 0) {
1005 err = "Invalid save parameters"; goto loaderr;
1006 }
1007 appendServerSaveParams(seconds,changes);
1008 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1009 if (chdir(argv[1]) == -1) {
1010 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1011 argv[1], strerror(errno));
1012 exit(1);
1013 }
1014 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1015 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1016 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1017 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1018 else {
1019 err = "Invalid log level. Must be one of debug, notice, warning";
1020 goto loaderr;
1021 }
1022 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1023 FILE *fp;
1024
1025 server.logfile = zstrdup(argv[1]);
1026 if (!strcasecmp(server.logfile,"stdout")) {
1027 zfree(server.logfile);
1028 server.logfile = NULL;
1029 }
1030 if (server.logfile) {
1031 /* Test if we are able to open the file. The server will not
1032 * be able to abort just for this problem later... */
1033 fp = fopen(server.logfile,"a");
1034 if (fp == NULL) {
1035 err = sdscatprintf(sdsempty(),
1036 "Can't open the log file: %s", strerror(errno));
1037 goto loaderr;
1038 }
1039 fclose(fp);
1040 }
1041 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1042 server.dbnum = atoi(argv[1]);
1043 if (server.dbnum < 1) {
1044 err = "Invalid number of databases"; goto loaderr;
1045 }
1046 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1047 server.maxclients = atoi(argv[1]);
1048 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1049 server.maxmemory = atoi(argv[1]);
1050 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1051 server.masterhost = sdsnew(argv[1]);
1052 server.masterport = atoi(argv[2]);
1053 server.replstate = REDIS_REPL_CONNECT;
1054 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1055 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1056 err = "argument must be 'yes' or 'no'"; goto loaderr;
1057 }
1058 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1059 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1060 err = "argument must be 'yes' or 'no'"; goto loaderr;
1061 }
1062 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1063 server.sharingpoolsize = atoi(argv[1]);
1064 if (server.sharingpoolsize < 1) {
1065 err = "invalid object sharing pool size"; goto loaderr;
1066 }
1067 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1068 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1069 err = "argument must be 'yes' or 'no'"; goto loaderr;
1070 }
1071 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1072 server.requirepass = zstrdup(argv[1]);
1073 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1074 server.pidfile = zstrdup(argv[1]);
1075 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1076 server.dbfilename = zstrdup(argv[1]);
1077 } else {
1078 err = "Bad directive or wrong number of arguments"; goto loaderr;
1079 }
1080 for (j = 0; j < argc; j++)
1081 sdsfree(argv[j]);
1082 zfree(argv);
1083 sdsfree(line);
1084 }
1085 fclose(fp);
1086 return;
1087
1088 loaderr:
1089 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1090 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1091 fprintf(stderr, ">>> '%s'\n", line);
1092 fprintf(stderr, "%s\n", err);
1093 exit(1);
1094 }
1095
1096 static void freeClientArgv(redisClient *c) {
1097 int j;
1098
1099 for (j = 0; j < c->argc; j++)
1100 decrRefCount(c->argv[j]);
1101 c->argc = 0;
1102 }
1103
1104 static void freeClient(redisClient *c) {
1105 listNode *ln;
1106
1107 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1108 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1109 sdsfree(c->querybuf);
1110 listRelease(c->reply);
1111 freeClientArgv(c);
1112 close(c->fd);
1113 ln = listSearchKey(server.clients,c);
1114 assert(ln != NULL);
1115 listDelNode(server.clients,ln);
1116 if (c->flags & REDIS_SLAVE) {
1117 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1118 close(c->repldbfd);
1119 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1120 ln = listSearchKey(l,c);
1121 assert(ln != NULL);
1122 listDelNode(l,ln);
1123 }
1124 if (c->flags & REDIS_MASTER) {
1125 server.master = NULL;
1126 server.replstate = REDIS_REPL_CONNECT;
1127 }
1128 zfree(c->argv);
1129 zfree(c);
1130 }
1131
1132 static void glueReplyBuffersIfNeeded(redisClient *c) {
1133 int totlen = 0;
1134 listNode *ln;
1135 robj *o;
1136
1137 listRewind(c->reply);
1138 while((ln = listYield(c->reply))) {
1139 o = ln->value;
1140 totlen += sdslen(o->ptr);
1141 /* This optimization makes more sense if we don't have to copy
1142 * too much data */
1143 if (totlen > 1024) return;
1144 }
1145 if (totlen > 0) {
1146 char buf[1024];
1147 int copylen = 0;
1148
1149 listRewind(c->reply);
1150 while((ln = listYield(c->reply))) {
1151 o = ln->value;
1152 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1153 copylen += sdslen(o->ptr);
1154 listDelNode(c->reply,ln);
1155 }
1156 /* Now the output buffer is empty, add the new single element */
1157 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1158 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1159 }
1160 }
1161
1162 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1163 redisClient *c = privdata;
1164 int nwritten = 0, totwritten = 0, objlen;
1165 robj *o;
1166 REDIS_NOTUSED(el);
1167 REDIS_NOTUSED(mask);
1168
1169 if (server.glueoutputbuf && listLength(c->reply) > 1)
1170 glueReplyBuffersIfNeeded(c);
1171 while(listLength(c->reply)) {
1172 o = listNodeValue(listFirst(c->reply));
1173 objlen = sdslen(o->ptr);
1174
1175 if (objlen == 0) {
1176 listDelNode(c->reply,listFirst(c->reply));
1177 continue;
1178 }
1179
1180 if (c->flags & REDIS_MASTER) {
1181 nwritten = objlen - c->sentlen;
1182 } else {
1183 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1184 if (nwritten <= 0) break;
1185 }
1186 c->sentlen += nwritten;
1187 totwritten += nwritten;
1188 /* If we fully sent the object on head go to the next one */
1189 if (c->sentlen == objlen) {
1190 listDelNode(c->reply,listFirst(c->reply));
1191 c->sentlen = 0;
1192 }
1193 }
1194 if (nwritten == -1) {
1195 if (errno == EAGAIN) {
1196 nwritten = 0;
1197 } else {
1198 redisLog(REDIS_DEBUG,
1199 "Error writing to client: %s", strerror(errno));
1200 freeClient(c);
1201 return;
1202 }
1203 }
1204 if (totwritten > 0) c->lastinteraction = time(NULL);
1205 if (listLength(c->reply) == 0) {
1206 c->sentlen = 0;
1207 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1208 }
1209 }
1210
1211 static struct redisCommand *lookupCommand(char *name) {
1212 int j = 0;
1213 while(cmdTable[j].name != NULL) {
1214 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1215 j++;
1216 }
1217 return NULL;
1218 }
1219
1220 /* resetClient prepare the client to process the next command */
1221 static void resetClient(redisClient *c) {
1222 freeClientArgv(c);
1223 c->bulklen = -1;
1224 }
1225
1226 /* If this function gets called we already read a whole
1227 * command, argments are in the client argv/argc fields.
1228 * processCommand() execute the command or prepare the
1229 * server for a bulk read from the client.
1230 *
1231 * If 1 is returned the client is still alive and valid and
1232 * and other operations can be performed by the caller. Otherwise
1233 * if 0 is returned the client was destroied (i.e. after QUIT). */
1234 static int processCommand(redisClient *c) {
1235 struct redisCommand *cmd;
1236 long long dirty;
1237
1238 /* Free some memory if needed (maxmemory setting) */
1239 if (server.maxmemory) freeMemoryIfNeeded();
1240
1241 /* The QUIT command is handled as a special case. Normal command
1242 * procs are unable to close the client connection safely */
1243 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1244 freeClient(c);
1245 return 0;
1246 }
1247 cmd = lookupCommand(c->argv[0]->ptr);
1248 if (!cmd) {
1249 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1250 resetClient(c);
1251 return 1;
1252 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1253 (c->argc < -cmd->arity)) {
1254 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1255 resetClient(c);
1256 return 1;
1257 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1258 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1259 resetClient(c);
1260 return 1;
1261 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1262 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1263
1264 decrRefCount(c->argv[c->argc-1]);
1265 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1266 c->argc--;
1267 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1268 resetClient(c);
1269 return 1;
1270 }
1271 c->argc--;
1272 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1273 /* It is possible that the bulk read is already in the
1274 * buffer. Check this condition and handle it accordingly */
1275 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1276 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1277 c->argc++;
1278 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1279 } else {
1280 return 1;
1281 }
1282 }
1283 /* Let's try to share objects on the command arguments vector */
1284 if (server.shareobjects) {
1285 int j;
1286 for(j = 1; j < c->argc; j++)
1287 c->argv[j] = tryObjectSharing(c->argv[j]);
1288 }
1289 /* Check if the user is authenticated */
1290 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1291 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1292 resetClient(c);
1293 return 1;
1294 }
1295
1296 /* Exec the command */
1297 dirty = server.dirty;
1298 cmd->proc(c);
1299 if (server.dirty-dirty != 0 && listLength(server.slaves))
1300 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1301 if (listLength(server.monitors))
1302 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1303 server.stat_numcommands++;
1304
1305 /* Prepare the client for the next command */
1306 if (c->flags & REDIS_CLOSE) {
1307 freeClient(c);
1308 return 0;
1309 }
1310 resetClient(c);
1311 return 1;
1312 }
1313
1314 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1315 listNode *ln;
1316 int outc = 0, j;
1317 robj **outv;
1318 /* (args*2)+1 is enough room for args, spaces, newlines */
1319 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1320
1321 if (argc <= REDIS_STATIC_ARGS) {
1322 outv = static_outv;
1323 } else {
1324 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1325 if (!outv) oom("replicationFeedSlaves");
1326 }
1327
1328 for (j = 0; j < argc; j++) {
1329 if (j != 0) outv[outc++] = shared.space;
1330 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1331 robj *lenobj;
1332
1333 lenobj = createObject(REDIS_STRING,
1334 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1335 lenobj->refcount = 0;
1336 outv[outc++] = lenobj;
1337 }
1338 outv[outc++] = argv[j];
1339 }
1340 outv[outc++] = shared.crlf;
1341
1342 /* Increment all the refcounts at start and decrement at end in order to
1343 * be sure to free objects if there is no slave in a replication state
1344 * able to be feed with commands */
1345 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1346 listRewind(slaves);
1347 while((ln = listYield(slaves))) {
1348 redisClient *slave = ln->value;
1349
1350 /* Don't feed slaves that are still waiting for BGSAVE to start */
1351 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1352
1353 /* Feed all the other slaves, MONITORs and so on */
1354 if (slave->slaveseldb != dictid) {
1355 robj *selectcmd;
1356
1357 switch(dictid) {
1358 case 0: selectcmd = shared.select0; break;
1359 case 1: selectcmd = shared.select1; break;
1360 case 2: selectcmd = shared.select2; break;
1361 case 3: selectcmd = shared.select3; break;
1362 case 4: selectcmd = shared.select4; break;
1363 case 5: selectcmd = shared.select5; break;
1364 case 6: selectcmd = shared.select6; break;
1365 case 7: selectcmd = shared.select7; break;
1366 case 8: selectcmd = shared.select8; break;
1367 case 9: selectcmd = shared.select9; break;
1368 default:
1369 selectcmd = createObject(REDIS_STRING,
1370 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1371 selectcmd->refcount = 0;
1372 break;
1373 }
1374 addReply(slave,selectcmd);
1375 slave->slaveseldb = dictid;
1376 }
1377 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1378 }
1379 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1380 if (outv != static_outv) zfree(outv);
1381 }
1382
1383 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1384 redisClient *c = (redisClient*) privdata;
1385 char buf[REDIS_IOBUF_LEN];
1386 int nread;
1387 REDIS_NOTUSED(el);
1388 REDIS_NOTUSED(mask);
1389
1390 nread = read(fd, buf, REDIS_IOBUF_LEN);
1391 if (nread == -1) {
1392 if (errno == EAGAIN) {
1393 nread = 0;
1394 } else {
1395 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1396 freeClient(c);
1397 return;
1398 }
1399 } else if (nread == 0) {
1400 redisLog(REDIS_DEBUG, "Client closed connection");
1401 freeClient(c);
1402 return;
1403 }
1404 if (nread) {
1405 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1406 c->lastinteraction = time(NULL);
1407 } else {
1408 return;
1409 }
1410
1411 again:
1412 if (c->bulklen == -1) {
1413 /* Read the first line of the query */
1414 char *p = strchr(c->querybuf,'\n');
1415 size_t querylen;
1416 if (p) {
1417 sds query, *argv;
1418 int argc, j;
1419
1420 query = c->querybuf;
1421 c->querybuf = sdsempty();
1422 querylen = 1+(p-(query));
1423 if (sdslen(query) > querylen) {
1424 /* leave data after the first line of the query in the buffer */
1425 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1426 }
1427 *p = '\0'; /* remove "\n" */
1428 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1429 sdsupdatelen(query);
1430
1431 /* Now we can split the query in arguments */
1432 if (sdslen(query) == 0) {
1433 /* Ignore empty query */
1434 sdsfree(query);
1435 return;
1436 }
1437 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1438 if (argv == NULL) oom("sdssplitlen");
1439 sdsfree(query);
1440
1441 if (c->argv) zfree(c->argv);
1442 c->argv = zmalloc(sizeof(robj*)*argc);
1443 if (c->argv == NULL) oom("allocating arguments list for client");
1444
1445 for (j = 0; j < argc; j++) {
1446 if (sdslen(argv[j])) {
1447 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1448 c->argc++;
1449 } else {
1450 sdsfree(argv[j]);
1451 }
1452 }
1453 zfree(argv);
1454 /* Execute the command. If the client is still valid
1455 * after processCommand() return and there is something
1456 * on the query buffer try to process the next command. */
1457 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1458 return;
1459 } else if (sdslen(c->querybuf) >= 1024*32) {
1460 redisLog(REDIS_DEBUG, "Client protocol error");
1461 freeClient(c);
1462 return;
1463 }
1464 } else {
1465 /* Bulk read handling. Note that if we are at this point
1466 the client already sent a command terminated with a newline,
1467 we are reading the bulk data that is actually the last
1468 argument of the command. */
1469 int qbl = sdslen(c->querybuf);
1470
1471 if (c->bulklen <= qbl) {
1472 /* Copy everything but the final CRLF as final argument */
1473 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1474 c->argc++;
1475 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1476 processCommand(c);
1477 return;
1478 }
1479 }
1480 }
1481
1482 static int selectDb(redisClient *c, int id) {
1483 if (id < 0 || id >= server.dbnum)
1484 return REDIS_ERR;
1485 c->db = &server.db[id];
1486 return REDIS_OK;
1487 }
1488
1489 static void *dupClientReplyValue(void *o) {
1490 incrRefCount((robj*)o);
1491 return 0;
1492 }
1493
1494 static redisClient *createClient(int fd) {
1495 redisClient *c = zmalloc(sizeof(*c));
1496
1497 anetNonBlock(NULL,fd);
1498 anetTcpNoDelay(NULL,fd);
1499 if (!c) return NULL;
1500 selectDb(c,0);
1501 c->fd = fd;
1502 c->querybuf = sdsempty();
1503 c->argc = 0;
1504 c->argv = NULL;
1505 c->bulklen = -1;
1506 c->sentlen = 0;
1507 c->flags = 0;
1508 c->lastinteraction = time(NULL);
1509 c->authenticated = 0;
1510 c->replstate = REDIS_REPL_NONE;
1511 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1512 listSetFreeMethod(c->reply,decrRefCount);
1513 listSetDupMethod(c->reply,dupClientReplyValue);
1514 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1515 readQueryFromClient, c, NULL) == AE_ERR) {
1516 freeClient(c);
1517 return NULL;
1518 }
1519 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1520 return c;
1521 }
1522
1523 static void addReply(redisClient *c, robj *obj) {
1524 if (listLength(c->reply) == 0 &&
1525 (c->replstate == REDIS_REPL_NONE ||
1526 c->replstate == REDIS_REPL_ONLINE) &&
1527 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1528 sendReplyToClient, c, NULL) == AE_ERR) return;
1529 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1530 incrRefCount(obj);
1531 }
1532
1533 static void addReplySds(redisClient *c, sds s) {
1534 robj *o = createObject(REDIS_STRING,s);
1535 addReply(c,o);
1536 decrRefCount(o);
1537 }
1538
1539 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1540 int cport, cfd;
1541 char cip[128];
1542 redisClient *c;
1543 REDIS_NOTUSED(el);
1544 REDIS_NOTUSED(mask);
1545 REDIS_NOTUSED(privdata);
1546
1547 cfd = anetAccept(server.neterr, fd, cip, &cport);
1548 if (cfd == AE_ERR) {
1549 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1550 return;
1551 }
1552 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1553 if ((c = createClient(cfd)) == NULL) {
1554 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1555 close(cfd); /* May be already closed, just ingore errors */
1556 return;
1557 }
1558 /* If maxclient directive is set and this is one client more... close the
1559 * connection. Note that we create the client instead to check before
1560 * for this condition, since now the socket is already set in nonblocking
1561 * mode and we can send an error for free using the Kernel I/O */
1562 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1563 char *err = "-ERR max number of clients reached\r\n";
1564
1565 /* That's a best effort error message, don't check write errors */
1566 (void) write(c->fd,err,strlen(err));
1567 freeClient(c);
1568 return;
1569 }
1570 server.stat_numconnections++;
1571 }
1572
1573 /* ======================= Redis objects implementation ===================== */
1574
1575 static robj *createObject(int type, void *ptr) {
1576 robj *o;
1577
1578 if (listLength(server.objfreelist)) {
1579 listNode *head = listFirst(server.objfreelist);
1580 o = listNodeValue(head);
1581 listDelNode(server.objfreelist,head);
1582 } else {
1583 o = zmalloc(sizeof(*o));
1584 }
1585 if (!o) oom("createObject");
1586 o->type = type;
1587 o->ptr = ptr;
1588 o->refcount = 1;
1589 return o;
1590 }
1591
1592 static robj *createStringObject(char *ptr, size_t len) {
1593 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1594 }
1595
1596 static robj *createListObject(void) {
1597 list *l = listCreate();
1598
1599 if (!l) oom("listCreate");
1600 listSetFreeMethod(l,decrRefCount);
1601 return createObject(REDIS_LIST,l);
1602 }
1603
1604 static robj *createSetObject(void) {
1605 dict *d = dictCreate(&setDictType,NULL);
1606 if (!d) oom("dictCreate");
1607 die();
1608 return createObject(REDIS_SET,d);
1609 }
1610
1611 static void freeStringObject(robj *o) {
1612 sdsfree(o->ptr);
1613 }
1614
1615 static void freeListObject(robj *o) {
1616 listRelease((list*) o->ptr);
1617 }
1618
1619 static void freeSetObject(robj *o) {
1620 dictRelease((dict*) o->ptr);
1621 }
1622
1623 static void freeHashObject(robj *o) {
1624 dictRelease((dict*) o->ptr);
1625 }
1626
1627 static void incrRefCount(robj *o) {
1628 o->refcount++;
1629 #ifdef DEBUG_REFCOUNT
1630 if (o->type == REDIS_STRING)
1631 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1632 #endif
1633 }
1634
1635 static void decrRefCount(void *obj) {
1636 robj *o = obj;
1637
1638 #ifdef DEBUG_REFCOUNT
1639 if (o->type == REDIS_STRING)
1640 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1641 #endif
1642 if (--(o->refcount) == 0) {
1643 switch(o->type) {
1644 case REDIS_STRING: freeStringObject(o); break;
1645 case REDIS_LIST: freeListObject(o); break;
1646 case REDIS_SET: freeSetObject(o); break;
1647 case REDIS_HASH: freeHashObject(o); break;
1648 default: assert(0 != 0); break;
1649 }
1650 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1651 !listAddNodeHead(server.objfreelist,o))
1652 zfree(o);
1653 }
1654 }
1655
1656 /* Try to share an object against the shared objects pool */
1657 static robj *tryObjectSharing(robj *o) {
1658 struct dictEntry *de;
1659 unsigned long c;
1660
1661 if (o == NULL || server.shareobjects == 0) return o;
1662
1663 assert(o->type == REDIS_STRING);
1664 de = dictFind(server.sharingpool,o);
1665 if (de) {
1666 robj *shared = dictGetEntryKey(de);
1667
1668 c = ((unsigned long) dictGetEntryVal(de))+1;
1669 dictGetEntryVal(de) = (void*) c;
1670 incrRefCount(shared);
1671 decrRefCount(o);
1672 return shared;
1673 } else {
1674 /* Here we are using a stream algorihtm: Every time an object is
1675 * shared we increment its count, everytime there is a miss we
1676 * recrement the counter of a random object. If this object reaches
1677 * zero we remove the object and put the current object instead. */
1678 if (dictSize(server.sharingpool) >=
1679 server.sharingpoolsize) {
1680 de = dictGetRandomKey(server.sharingpool);
1681 assert(de != NULL);
1682 c = ((unsigned long) dictGetEntryVal(de))-1;
1683 dictGetEntryVal(de) = (void*) c;
1684 if (c == 0) {
1685 dictDelete(server.sharingpool,de->key);
1686 }
1687 } else {
1688 c = 0; /* If the pool is empty we want to add this object */
1689 }
1690 if (c == 0) {
1691 int retval;
1692
1693 retval = dictAdd(server.sharingpool,o,(void*)1);
1694 assert(retval == DICT_OK);
1695 incrRefCount(o);
1696 }
1697 return o;
1698 }
1699 }
1700
1701 static robj *lookupKey(redisDb *db, robj *key) {
1702 dictEntry *de = dictFind(db->dict,key);
1703 return de ? dictGetEntryVal(de) : NULL;
1704 }
1705
1706 static robj *lookupKeyRead(redisDb *db, robj *key) {
1707 expireIfNeeded(db,key);
1708 return lookupKey(db,key);
1709 }
1710
1711 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1712 deleteIfVolatile(db,key);
1713 return lookupKey(db,key);
1714 }
1715
1716 static int deleteKey(redisDb *db, robj *key) {
1717 int retval;
1718
1719 /* We need to protect key from destruction: after the first dictDelete()
1720 * it may happen that 'key' is no longer valid if we don't increment
1721 * it's count. This may happen when we get the object reference directly
1722 * from the hash table with dictRandomKey() or dict iterators */
1723 incrRefCount(key);
1724 if (dictSize(db->expires)) dictDelete(db->expires,key);
1725 retval = dictDelete(db->dict,key);
1726 decrRefCount(key);
1727
1728 return retval == DICT_OK;
1729 }
1730
1731 /*============================ DB saving/loading ============================ */
1732
/* Write the one byte type/opcode 'type' to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
1737
/* Write the time 't' to 'fp' as a 4 byte signed integer, in the byte
 * order of the host. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;
    size_t written = fwrite(&t32,sizeof(t32),1,fp);

    return (written == 0) ? -1 : 0;
}
1743
1744 /* check rdbLoadLen() comments for more info */
1745 static int rdbSaveLen(FILE *fp, uint32_t len) {
1746 unsigned char buf[2];
1747
1748 if (len < (1<<6)) {
1749 /* Save a 6 bit len */
1750 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1751 if (fwrite(buf,1,1,fp) == 0) return -1;
1752 } else if (len < (1<<14)) {
1753 /* Save a 14 bit len */
1754 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1755 buf[1] = len&0xFF;
1756 if (fwrite(buf,2,1,fp) == 0) return -1;
1757 } else {
1758 /* Save a 32 bit len */
1759 buf[0] = (REDIS_RDB_32BITLEN<<6);
1760 if (fwrite(buf,1,1,fp) == 0) return -1;
1761 len = htonl(len);
1762 if (fwrite(&len,4,1,fp) == 0) return -1;
1763 }
1764 return 0;
1765 }
1766
1767 /* String objects in the form "2391" "-100" without any space and with a
1768 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1769 * encoded as integers to save space */
1770 int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1771 long long value;
1772 char *endptr, buf[32];
1773
1774 /* Check if it's possible to encode this value as a number */
1775 value = strtoll(s, &endptr, 10);
1776 if (endptr[0] != '\0') return 0;
1777 snprintf(buf,32,"%lld",value);
1778
1779 /* If the number converted back into a string is not identical
1780 * then it's not possible to encode the string as integer */
1781 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1782
1783 /* Finally check if it fits in our ranges */
1784 if (value >= -(1<<7) && value <= (1<<7)-1) {
1785 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1786 enc[1] = value&0xFF;
1787 return 2;
1788 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1789 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1790 enc[1] = value&0xFF;
1791 enc[2] = (value>>8)&0xFF;
1792 return 3;
1793 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1794 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1795 enc[1] = value&0xFF;
1796 enc[2] = (value>>8)&0xFF;
1797 enc[3] = (value>>16)&0xFF;
1798 enc[4] = (value>>24)&0xFF;
1799 return 5;
1800 } else {
1801 return 0;
1802 }
1803 }
1804
1805 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1806 unsigned int comprlen, outlen;
1807 unsigned char byte;
1808 void *out;
1809
1810 /* We require at least four bytes compression for this to be worth it */
1811 outlen = sdslen(obj->ptr)-4;
1812 if (outlen <= 0) return 0;
1813 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1814 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1815 if (comprlen == 0) {
1816 zfree(out);
1817 return 0;
1818 }
1819 /* Data compressed! Let's save it on disk */
1820 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1821 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1822 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1823 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1824 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1825 zfree(out);
1826 return comprlen;
1827
1828 writeerr:
1829 zfree(out);
1830 return -1;
1831 }
1832
1833 /* Save a string objet as [len][data] on disk. If the object is a string
1834 * representation of an integer value we try to safe it in a special form */
1835 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1836 size_t len = sdslen(obj->ptr);
1837 int enclen;
1838
1839 /* Try integer encoding */
1840 if (len <= 11) {
1841 unsigned char buf[5];
1842 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1843 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1844 return 0;
1845 }
1846 }
1847
1848 /* Try LZF compression - under 20 bytes it's unable to compress even
1849 * aaaaaaaaaaaaaaaaaa so skip it */
1850 if (1 && len > 20) {
1851 int retval;
1852
1853 retval = rdbSaveLzfStringObject(fp,obj);
1854 if (retval == -1) return -1;
1855 if (retval > 0) return 0;
1856 /* retval == 0 means data can't be compressed, save the old way */
1857 }
1858
1859 /* Store verbatim */
1860 if (rdbSaveLen(fp,len) == -1) return -1;
1861 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1862 return 0;
1863 }
1864
1865 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1866 static int rdbSave(char *filename) {
1867 dictIterator *di = NULL;
1868 dictEntry *de;
1869 FILE *fp;
1870 char tmpfile[256];
1871 int j;
1872 time_t now = time(NULL);
1873
1874 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1875 fp = fopen(tmpfile,"w");
1876 if (!fp) {
1877 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1878 return REDIS_ERR;
1879 }
1880 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1881 for (j = 0; j < server.dbnum; j++) {
1882 redisDb *db = server.db+j;
1883 dict *d = db->dict;
1884 if (dictSize(d) == 0) continue;
1885 di = dictGetIterator(d);
1886 if (!di) {
1887 fclose(fp);
1888 return REDIS_ERR;
1889 }
1890
1891 /* Write the SELECT DB opcode */
1892 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1893 if (rdbSaveLen(fp,j) == -1) goto werr;
1894
1895 /* Iterate this DB writing every entry */
1896 while((de = dictNext(di)) != NULL) {
1897 robj *key = dictGetEntryKey(de);
1898 robj *o = dictGetEntryVal(de);
1899 time_t expiretime = getExpire(db,key);
1900
1901 /* Save the expire time */
1902 if (expiretime != -1) {
1903 /* If this key is already expired skip it */
1904 if (expiretime < now) continue;
1905 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1906 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1907 }
1908 /* Save the key and associated value */
1909 if (rdbSaveType(fp,o->type) == -1) goto werr;
1910 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1911 if (o->type == REDIS_STRING) {
1912 /* Save a string value */
1913 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1914 } else if (o->type == REDIS_LIST) {
1915 /* Save a list value */
1916 list *list = o->ptr;
1917 listNode *ln;
1918
1919 listRewind(list);
1920 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1921 while((ln = listYield(list))) {
1922 robj *eleobj = listNodeValue(ln);
1923
1924 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1925 }
1926 } else if (o->type == REDIS_SET) {
1927 /* Save a set value */
1928 dict *set = o->ptr;
1929 dictIterator *di = dictGetIterator(set);
1930 dictEntry *de;
1931
1932 if (!set) oom("dictGetIteraotr");
1933 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1934 while((de = dictNext(di)) != NULL) {
1935 robj *eleobj = dictGetEntryKey(de);
1936
1937 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1938 }
1939 dictReleaseIterator(di);
1940 } else {
1941 assert(0 != 0);
1942 }
1943 }
1944 dictReleaseIterator(di);
1945 }
1946 /* EOF opcode */
1947 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1948
1949 /* Make sure data will not remain on the OS's output buffers */
1950 fflush(fp);
1951 fsync(fileno(fp));
1952 fclose(fp);
1953
1954 /* Use RENAME to make sure the DB file is changed atomically only
1955 * if the generate DB file is ok. */
1956 if (rename(tmpfile,filename) == -1) {
1957 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1958 unlink(tmpfile);
1959 return REDIS_ERR;
1960 }
1961 redisLog(REDIS_NOTICE,"DB saved on disk");
1962 server.dirty = 0;
1963 server.lastsave = time(NULL);
1964 return REDIS_OK;
1965
1966 werr:
1967 fclose(fp);
1968 unlink(tmpfile);
1969 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1970 if (di) dictReleaseIterator(di);
1971 return REDIS_ERR;
1972 }
1973
1974 static int rdbSaveBackground(char *filename) {
1975 pid_t childpid;
1976
1977 if (server.bgsaveinprogress) return REDIS_ERR;
1978 if ((childpid = fork()) == 0) {
1979 /* Child */
1980 close(server.fd);
1981 if (rdbSave(filename) == REDIS_OK) {
1982 exit(0);
1983 } else {
1984 exit(1);
1985 }
1986 } else {
1987 /* Parent */
1988 if (childpid == -1) {
1989 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
1990 strerror(errno));
1991 return REDIS_ERR;
1992 }
1993 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1994 server.bgsaveinprogress = 1;
1995 return REDIS_OK;
1996 }
1997 return REDIS_OK; /* unreached */
1998 }
1999
/* Read a single type/opcode byte from 'fp'. Returns the byte value as a
 * non-negative int, or -1 if no byte could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    return (fread(&opcode,1,1,fp) == 0) ? -1 : opcode;
}
2005
/* Read a 4 byte time value from 'fp', stored as a raw int32_t in the
 * byte order of the machine doing the read. Returns the value converted
 * to time_t, or -1 on a short read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t) stamp;
}
2011
2012 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2013 * of this file for a description of how this are stored on disk.
2014 *
2015 * isencoded is set to 1 if the readed length is not actually a length but
2016 * an "encoding type", check the above comments for more info */
2017 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2018 unsigned char buf[2];
2019 uint32_t len;
2020
2021 if (isencoded) *isencoded = 0;
2022 if (rdbver == 0) {
2023 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2024 return ntohl(len);
2025 } else {
2026 int type;
2027
2028 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2029 type = (buf[0]&0xC0)>>6;
2030 if (type == REDIS_RDB_6BITLEN) {
2031 /* Read a 6 bit len */
2032 return buf[0]&0x3F;
2033 } else if (type == REDIS_RDB_ENCVAL) {
2034 /* Read a 6 bit len encoding type */
2035 if (isencoded) *isencoded = 1;
2036 return buf[0]&0x3F;
2037 } else if (type == REDIS_RDB_14BITLEN) {
2038 /* Read a 14 bit len */
2039 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2040 return ((buf[0]&0x3F)<<8)|buf[1];
2041 } else {
2042 /* Read a 32 bit len */
2043 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2044 return ntohl(len);
2045 }
2046 }
2047 }
2048
2049 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2050 unsigned char enc[4];
2051 long long val;
2052
2053 if (enctype == REDIS_RDB_ENC_INT8) {
2054 if (fread(enc,1,1,fp) == 0) return NULL;
2055 val = (signed char)enc[0];
2056 } else if (enctype == REDIS_RDB_ENC_INT16) {
2057 uint16_t v;
2058 if (fread(enc,2,1,fp) == 0) return NULL;
2059 v = enc[0]|(enc[1]<<8);
2060 val = (int16_t)v;
2061 } else if (enctype == REDIS_RDB_ENC_INT32) {
2062 uint32_t v;
2063 if (fread(enc,4,1,fp) == 0) return NULL;
2064 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2065 val = (int32_t)v;
2066 } else {
2067 val = 0; /* anti-warning */
2068 assert(0!=0);
2069 }
2070 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2071 }
2072
2073 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2074 unsigned int len, clen;
2075 unsigned char *c = NULL;
2076 sds val = NULL;
2077
2078 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2079 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2080 if ((c = zmalloc(clen)) == NULL) goto err;
2081 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2082 if (fread(c,clen,1,fp) == 0) goto err;
2083 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2084 zfree(c);
2085 return createObject(REDIS_STRING,val);
2086 err:
2087 zfree(c);
2088 sdsfree(val);
2089 return NULL;
2090 }
2091
2092 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2093 int isencoded;
2094 uint32_t len;
2095 sds val;
2096
2097 len = rdbLoadLen(fp,rdbver,&isencoded);
2098 if (isencoded) {
2099 switch(len) {
2100 case REDIS_RDB_ENC_INT8:
2101 case REDIS_RDB_ENC_INT16:
2102 case REDIS_RDB_ENC_INT32:
2103 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2104 case REDIS_RDB_ENC_LZF:
2105 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2106 default:
2107 assert(0!=0);
2108 }
2109 }
2110
2111 if (len == REDIS_RDB_LENERR) return NULL;
2112 val = sdsnewlen(NULL,len);
2113 if (len && fread(val,len,1,fp) == 0) {
2114 sdsfree(val);
2115 return NULL;
2116 }
2117 return tryObjectSharing(createObject(REDIS_STRING,val));
2118 }
2119
2120 static int rdbLoad(char *filename) {
2121 FILE *fp;
2122 robj *keyobj = NULL;
2123 uint32_t dbid;
2124 int type, retval, rdbver;
2125 dict *d = server.db[0].dict;
2126 redisDb *db = server.db+0;
2127 char buf[1024];
2128 time_t expiretime = -1, now = time(NULL);
2129
2130 fp = fopen(filename,"r");
2131 if (!fp) return REDIS_ERR;
2132 if (fread(buf,9,1,fp) == 0) goto eoferr;
2133 buf[9] = '\0';
2134 if (memcmp(buf,"REDIS",5) != 0) {
2135 fclose(fp);
2136 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2137 return REDIS_ERR;
2138 }
2139 rdbver = atoi(buf+5);
2140 if (rdbver > 1) {
2141 fclose(fp);
2142 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2143 return REDIS_ERR;
2144 }
2145 while(1) {
2146 robj *o;
2147
2148 /* Read type. */
2149 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2150 if (type == REDIS_EXPIRETIME) {
2151 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2152 /* We read the time so we need to read the object type again */
2153 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2154 }
2155 if (type == REDIS_EOF) break;
2156 /* Handle SELECT DB opcode as a special case */
2157 if (type == REDIS_SELECTDB) {
2158 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2159 goto eoferr;
2160 if (dbid >= (unsigned)server.dbnum) {
2161 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2162 exit(1);
2163 }
2164 db = server.db+dbid;
2165 d = db->dict;
2166 continue;
2167 }
2168 /* Read key */
2169 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2170
2171 if (type == REDIS_STRING) {
2172 /* Read string value */
2173 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2174 } else if (type == REDIS_LIST || type == REDIS_SET) {
2175 /* Read list/set value */
2176 uint32_t listlen;
2177
2178 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2179 goto eoferr;
2180 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2181 /* Load every single element of the list/set */
2182 while(listlen--) {
2183 robj *ele;
2184
2185 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2186 if (type == REDIS_LIST) {
2187 if (!listAddNodeTail((list*)o->ptr,ele))
2188 oom("listAddNodeTail");
2189 } else {
2190 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2191 oom("dictAdd");
2192 }
2193 }
2194 } else {
2195 assert(0 != 0);
2196 }
2197 /* Add the new object in the hash table */
2198 retval = dictAdd(d,keyobj,o);
2199 if (retval == DICT_ERR) {
2200 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2201 exit(1);
2202 }
2203 /* Set the expire time if needed */
2204 if (expiretime != -1) {
2205 setExpire(db,keyobj,expiretime);
2206 /* Delete this key if already expired */
2207 if (expiretime < now) deleteKey(db,keyobj);
2208 expiretime = -1;
2209 }
2210 keyobj = o = NULL;
2211 }
2212 fclose(fp);
2213 return REDIS_OK;
2214
2215 eoferr: /* unexpected end of file is handled here with a fatal exit */
2216 if (keyobj) decrRefCount(keyobj);
2217 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2218 exit(1);
2219 return REDIS_ERR; /* Just to avoid warning */
2220 }
2221
2222 /*================================== Commands =============================== */
2223
2224 static void authCommand(redisClient *c) {
2225 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2226 c->authenticated = 1;
2227 addReply(c,shared.ok);
2228 } else {
2229 c->authenticated = 0;
2230 addReply(c,shared.err);
2231 }
2232 }
2233
2234 static void pingCommand(redisClient *c) {
2235 addReply(c,shared.pong);
2236 }
2237
2238 static void echoCommand(redisClient *c) {
2239 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2240 (int)sdslen(c->argv[1]->ptr)));
2241 addReply(c,c->argv[1]);
2242 addReply(c,shared.crlf);
2243 }
2244
2245 /*=================================== Strings =============================== */
2246
2247 static void setGenericCommand(redisClient *c, int nx) {
2248 int retval;
2249
2250 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2251 if (retval == DICT_ERR) {
2252 if (!nx) {
2253 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2254 incrRefCount(c->argv[2]);
2255 } else {
2256 addReply(c,shared.czero);
2257 return;
2258 }
2259 } else {
2260 incrRefCount(c->argv[1]);
2261 incrRefCount(c->argv[2]);
2262 }
2263 server.dirty++;
2264 removeExpire(c->db,c->argv[1]);
2265 addReply(c, nx ? shared.cone : shared.ok);
2266 }
2267
2268 static void setCommand(redisClient *c) {
2269 setGenericCommand(c,0);
2270 }
2271
2272 static void setnxCommand(redisClient *c) {
2273 setGenericCommand(c,1);
2274 }
2275
2276 static void getCommand(redisClient *c) {
2277 robj *o = lookupKeyRead(c->db,c->argv[1]);
2278
2279 if (o == NULL) {
2280 addReply(c,shared.nullbulk);
2281 } else {
2282 if (o->type != REDIS_STRING) {
2283 addReply(c,shared.wrongtypeerr);
2284 } else {
2285 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2286 addReply(c,o);
2287 addReply(c,shared.crlf);
2288 }
2289 }
2290 }
2291
2292 static void getSetCommand(redisClient *c) {
2293 getCommand(c);
2294 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2295 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2296 } else {
2297 incrRefCount(c->argv[1]);
2298 }
2299 incrRefCount(c->argv[2]);
2300 server.dirty++;
2301 removeExpire(c->db,c->argv[1]);
2302 }
2303
2304 static void mgetCommand(redisClient *c) {
2305 int j;
2306
2307 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2308 for (j = 1; j < c->argc; j++) {
2309 robj *o = lookupKeyRead(c->db,c->argv[j]);
2310 if (o == NULL) {
2311 addReply(c,shared.nullbulk);
2312 } else {
2313 if (o->type != REDIS_STRING) {
2314 addReply(c,shared.nullbulk);
2315 } else {
2316 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2317 addReply(c,o);
2318 addReply(c,shared.crlf);
2319 }
2320 }
2321 }
2322 }
2323
2324 static void incrDecrCommand(redisClient *c, long long incr) {
2325 long long value;
2326 int retval;
2327 robj *o;
2328
2329 o = lookupKeyWrite(c->db,c->argv[1]);
2330 if (o == NULL) {
2331 value = 0;
2332 } else {
2333 if (o->type != REDIS_STRING) {
2334 value = 0;
2335 } else {
2336 char *eptr;
2337
2338 value = strtoll(o->ptr, &eptr, 10);
2339 }
2340 }
2341
2342 value += incr;
2343 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2344 retval = dictAdd(c->db->dict,c->argv[1],o);
2345 if (retval == DICT_ERR) {
2346 dictReplace(c->db->dict,c->argv[1],o);
2347 removeExpire(c->db,c->argv[1]);
2348 } else {
2349 incrRefCount(c->argv[1]);
2350 }
2351 server.dirty++;
2352 addReply(c,shared.colon);
2353 addReply(c,o);
2354 addReply(c,shared.crlf);
2355 }
2356
2357 static void incrCommand(redisClient *c) {
2358 incrDecrCommand(c,1);
2359 }
2360
2361 static void decrCommand(redisClient *c) {
2362 incrDecrCommand(c,-1);
2363 }
2364
2365 static void incrbyCommand(redisClient *c) {
2366 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2367 incrDecrCommand(c,incr);
2368 }
2369
2370 static void decrbyCommand(redisClient *c) {
2371 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2372 incrDecrCommand(c,-incr);
2373 }
2374
2375 /* ========================= Type agnostic commands ========================= */
2376
2377 static void delCommand(redisClient *c) {
2378 int deleted = 0, j;
2379
2380 for (j = 1; j < c->argc; j++) {
2381 if (deleteKey(c->db,c->argv[j])) {
2382 server.dirty++;
2383 deleted++;
2384 }
2385 }
2386 switch(deleted) {
2387 case 0:
2388 addReply(c,shared.czero);
2389 break;
2390 case 1:
2391 addReply(c,shared.cone);
2392 break;
2393 default:
2394 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2395 break;
2396 }
2397 }
2398
2399 static void existsCommand(redisClient *c) {
2400 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2401 }
2402
2403 static void selectCommand(redisClient *c) {
2404 int id = atoi(c->argv[1]->ptr);
2405
2406 if (selectDb(c,id) == REDIS_ERR) {
2407 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2408 } else {
2409 addReply(c,shared.ok);
2410 }
2411 }
2412
2413 static void randomkeyCommand(redisClient *c) {
2414 dictEntry *de;
2415
2416 while(1) {
2417 de = dictGetRandomKey(c->db->dict);
2418 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2419 }
2420 if (de == NULL) {
2421 addReply(c,shared.plus);
2422 addReply(c,shared.crlf);
2423 } else {
2424 addReply(c,shared.plus);
2425 addReply(c,dictGetEntryKey(de));
2426 addReply(c,shared.crlf);
2427 }
2428 }
2429
2430 static void keysCommand(redisClient *c) {
2431 dictIterator *di;
2432 dictEntry *de;
2433 sds pattern = c->argv[1]->ptr;
2434 int plen = sdslen(pattern);
2435 int numkeys = 0, keyslen = 0;
2436 robj *lenobj = createObject(REDIS_STRING,NULL);
2437
2438 di = dictGetIterator(c->db->dict);
2439 if (!di) oom("dictGetIterator");
2440 addReply(c,lenobj);
2441 decrRefCount(lenobj);
2442 while((de = dictNext(di)) != NULL) {
2443 robj *keyobj = dictGetEntryKey(de);
2444
2445 sds key = keyobj->ptr;
2446 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2447 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2448 if (expireIfNeeded(c->db,keyobj) == 0) {
2449 if (numkeys != 0)
2450 addReply(c,shared.space);
2451 addReply(c,keyobj);
2452 numkeys++;
2453 keyslen += sdslen(key);
2454 }
2455 }
2456 }
2457 dictReleaseIterator(di);
2458 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2459 addReply(c,shared.crlf);
2460 }
2461
2462 static void dbsizeCommand(redisClient *c) {
2463 addReplySds(c,
2464 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2465 }
2466
2467 static void lastsaveCommand(redisClient *c) {
2468 addReplySds(c,
2469 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2470 }
2471
2472 static void typeCommand(redisClient *c) {
2473 robj *o;
2474 char *type;
2475
2476 o = lookupKeyRead(c->db,c->argv[1]);
2477 if (o == NULL) {
2478 type = "+none";
2479 } else {
2480 switch(o->type) {
2481 case REDIS_STRING: type = "+string"; break;
2482 case REDIS_LIST: type = "+list"; break;
2483 case REDIS_SET: type = "+set"; break;
2484 default: type = "unknown"; break;
2485 }
2486 }
2487 addReplySds(c,sdsnew(type));
2488 addReply(c,shared.crlf);
2489 }
2490
2491 static void saveCommand(redisClient *c) {
2492 if (server.bgsaveinprogress) {
2493 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2494 return;
2495 }
2496 if (rdbSave(server.dbfilename) == REDIS_OK) {
2497 addReply(c,shared.ok);
2498 } else {
2499 addReply(c,shared.err);
2500 }
2501 }
2502
2503 static void bgsaveCommand(redisClient *c) {
2504 if (server.bgsaveinprogress) {
2505 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2506 return;
2507 }
2508 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2509 addReply(c,shared.ok);
2510 } else {
2511 addReply(c,shared.err);
2512 }
2513 }
2514
2515 static void shutdownCommand(redisClient *c) {
2516 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2517 /* XXX: TODO kill the child if there is a bgsave in progress */
2518 if (rdbSave(server.dbfilename) == REDIS_OK) {
2519 if (server.daemonize) {
2520 unlink(server.pidfile);
2521 }
2522 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2523 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2524 exit(1);
2525 } else {
2526 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2527 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2528 }
2529 }
2530
2531 static void renameGenericCommand(redisClient *c, int nx) {
2532 robj *o;
2533
2534 /* To use the same key as src and dst is probably an error */
2535 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2536 addReply(c,shared.sameobjecterr);
2537 return;
2538 }
2539
2540 o = lookupKeyWrite(c->db,c->argv[1]);
2541 if (o == NULL) {
2542 addReply(c,shared.nokeyerr);
2543 return;
2544 }
2545 incrRefCount(o);
2546 deleteIfVolatile(c->db,c->argv[2]);
2547 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2548 if (nx) {
2549 decrRefCount(o);
2550 addReply(c,shared.czero);
2551 return;
2552 }
2553 dictReplace(c->db->dict,c->argv[2],o);
2554 } else {
2555 incrRefCount(c->argv[2]);
2556 }
2557 deleteKey(c->db,c->argv[1]);
2558 server.dirty++;
2559 addReply(c,nx ? shared.cone : shared.ok);
2560 }
2561
2562 static void renameCommand(redisClient *c) {
2563 renameGenericCommand(c,0);
2564 }
2565
2566 static void renamenxCommand(redisClient *c) {
2567 renameGenericCommand(c,1);
2568 }
2569
2570 static void moveCommand(redisClient *c) {
2571 robj *o;
2572 redisDb *src, *dst;
2573 int srcid;
2574
2575 /* Obtain source and target DB pointers */
2576 src = c->db;
2577 srcid = c->db->id;
2578 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2579 addReply(c,shared.outofrangeerr);
2580 return;
2581 }
2582 dst = c->db;
2583 selectDb(c,srcid); /* Back to the source DB */
2584
2585 /* If the user is moving using as target the same
2586 * DB as the source DB it is probably an error. */
2587 if (src == dst) {
2588 addReply(c,shared.sameobjecterr);
2589 return;
2590 }
2591
2592 /* Check if the element exists and get a reference */
2593 o = lookupKeyWrite(c->db,c->argv[1]);
2594 if (!o) {
2595 addReply(c,shared.czero);
2596 return;
2597 }
2598
2599 /* Try to add the element to the target DB */
2600 deleteIfVolatile(dst,c->argv[1]);
2601 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2602 addReply(c,shared.czero);
2603 return;
2604 }
2605 incrRefCount(c->argv[1]);
2606 incrRefCount(o);
2607
2608 /* OK! key moved, free the entry in the source DB */
2609 deleteKey(src,c->argv[1]);
2610 server.dirty++;
2611 addReply(c,shared.cone);
2612 }
2613
2614 /* =================================== Lists ================================ */
2615 static void pushGenericCommand(redisClient *c, int where) {
2616 robj *lobj;
2617 list *list;
2618
2619 lobj = lookupKeyWrite(c->db,c->argv[1]);
2620 if (lobj == NULL) {
2621 lobj = createListObject();
2622 list = lobj->ptr;
2623 if (where == REDIS_HEAD) {
2624 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2625 } else {
2626 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2627 }
2628 dictAdd(c->db->dict,c->argv[1],lobj);
2629 incrRefCount(c->argv[1]);
2630 incrRefCount(c->argv[2]);
2631 } else {
2632 if (lobj->type != REDIS_LIST) {
2633 addReply(c,shared.wrongtypeerr);
2634 return;
2635 }
2636 list = lobj->ptr;
2637 if (where == REDIS_HEAD) {
2638 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2639 } else {
2640 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2641 }
2642 incrRefCount(c->argv[2]);
2643 }
2644 server.dirty++;
2645 addReply(c,shared.ok);
2646 }
2647
2648 static void lpushCommand(redisClient *c) {
2649 pushGenericCommand(c,REDIS_HEAD);
2650 }
2651
2652 static void rpushCommand(redisClient *c) {
2653 pushGenericCommand(c,REDIS_TAIL);
2654 }
2655
2656 static void llenCommand(redisClient *c) {
2657 robj *o;
2658 list *l;
2659
2660 o = lookupKeyRead(c->db,c->argv[1]);
2661 if (o == NULL) {
2662 addReply(c,shared.czero);
2663 return;
2664 } else {
2665 if (o->type != REDIS_LIST) {
2666 addReply(c,shared.wrongtypeerr);
2667 } else {
2668 l = o->ptr;
2669 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2670 }
2671 }
2672 }
2673
2674 static void lindexCommand(redisClient *c) {
2675 robj *o;
2676 int index = atoi(c->argv[2]->ptr);
2677
2678 o = lookupKeyRead(c->db,c->argv[1]);
2679 if (o == NULL) {
2680 addReply(c,shared.nullbulk);
2681 } else {
2682 if (o->type != REDIS_LIST) {
2683 addReply(c,shared.wrongtypeerr);
2684 } else {
2685 list *list = o->ptr;
2686 listNode *ln;
2687
2688 ln = listIndex(list, index);
2689 if (ln == NULL) {
2690 addReply(c,shared.nullbulk);
2691 } else {
2692 robj *ele = listNodeValue(ln);
2693 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2694 addReply(c,ele);
2695 addReply(c,shared.crlf);
2696 }
2697 }
2698 }
2699 }
2700
2701 static void lsetCommand(redisClient *c) {
2702 robj *o;
2703 int index = atoi(c->argv[2]->ptr);
2704
2705 o = lookupKeyWrite(c->db,c->argv[1]);
2706 if (o == NULL) {
2707 addReply(c,shared.nokeyerr);
2708 } else {
2709 if (o->type != REDIS_LIST) {
2710 addReply(c,shared.wrongtypeerr);
2711 } else {
2712 list *list = o->ptr;
2713 listNode *ln;
2714
2715 ln = listIndex(list, index);
2716 if (ln == NULL) {
2717 addReply(c,shared.outofrangeerr);
2718 } else {
2719 robj *ele = listNodeValue(ln);
2720
2721 decrRefCount(ele);
2722 listNodeValue(ln) = c->argv[3];
2723 incrRefCount(c->argv[3]);
2724 addReply(c,shared.ok);
2725 server.dirty++;
2726 }
2727 }
2728 }
2729 }
2730
2731 static void popGenericCommand(redisClient *c, int where) {
2732 robj *o;
2733
2734 o = lookupKeyWrite(c->db,c->argv[1]);
2735 if (o == NULL) {
2736 addReply(c,shared.nullbulk);
2737 } else {
2738 if (o->type != REDIS_LIST) {
2739 addReply(c,shared.wrongtypeerr);
2740 } else {
2741 list *list = o->ptr;
2742 listNode *ln;
2743
2744 if (where == REDIS_HEAD)
2745 ln = listFirst(list);
2746 else
2747 ln = listLast(list);
2748
2749 if (ln == NULL) {
2750 addReply(c,shared.nullbulk);
2751 } else {
2752 robj *ele = listNodeValue(ln);
2753 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2754 addReply(c,ele);
2755 addReply(c,shared.crlf);
2756 listDelNode(list,ln);
2757 server.dirty++;
2758 }
2759 }
2760 }
2761 }
2762
2763 static void lpopCommand(redisClient *c) {
2764 popGenericCommand(c,REDIS_HEAD);
2765 }
2766
2767 static void rpopCommand(redisClient *c) {
2768 popGenericCommand(c,REDIS_TAIL);
2769 }
2770
2771 static void lrangeCommand(redisClient *c) {
2772 robj *o;
2773 int start = atoi(c->argv[2]->ptr);
2774 int end = atoi(c->argv[3]->ptr);
2775
2776 o = lookupKeyRead(c->db,c->argv[1]);
2777 if (o == NULL) {
2778 addReply(c,shared.nullmultibulk);
2779 } else {
2780 if (o->type != REDIS_LIST) {
2781 addReply(c,shared.wrongtypeerr);
2782 } else {
2783 list *list = o->ptr;
2784 listNode *ln;
2785 int llen = listLength(list);
2786 int rangelen, j;
2787 robj *ele;
2788
2789 /* convert negative indexes */
2790 if (start < 0) start = llen+start;
2791 if (end < 0) end = llen+end;
2792 if (start < 0) start = 0;
2793 if (end < 0) end = 0;
2794
2795 /* indexes sanity checks */
2796 if (start > end || start >= llen) {
2797 /* Out of range start or start > end result in empty list */
2798 addReply(c,shared.emptymultibulk);
2799 return;
2800 }
2801 if (end >= llen) end = llen-1;
2802 rangelen = (end-start)+1;
2803
2804 /* Return the result in form of a multi-bulk reply */
2805 ln = listIndex(list, start);
2806 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2807 for (j = 0; j < rangelen; j++) {
2808 ele = listNodeValue(ln);
2809 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2810 addReply(c,ele);
2811 addReply(c,shared.crlf);
2812 ln = ln->next;
2813 }
2814 }
2815 }
2816 }
2817
2818 static void ltrimCommand(redisClient *c) {
2819 robj *o;
2820 int start = atoi(c->argv[2]->ptr);
2821 int end = atoi(c->argv[3]->ptr);
2822
2823 o = lookupKeyWrite(c->db,c->argv[1]);
2824 if (o == NULL) {
2825 addReply(c,shared.nokeyerr);
2826 } else {
2827 if (o->type != REDIS_LIST) {
2828 addReply(c,shared.wrongtypeerr);
2829 } else {
2830 list *list = o->ptr;
2831 listNode *ln;
2832 int llen = listLength(list);
2833 int j, ltrim, rtrim;
2834
2835 /* convert negative indexes */
2836 if (start < 0) start = llen+start;
2837 if (end < 0) end = llen+end;
2838 if (start < 0) start = 0;
2839 if (end < 0) end = 0;
2840
2841 /* indexes sanity checks */
2842 if (start > end || start >= llen) {
2843 /* Out of range start or start > end result in empty list */
2844 ltrim = llen;
2845 rtrim = 0;
2846 } else {
2847 if (end >= llen) end = llen-1;
2848 ltrim = start;
2849 rtrim = llen-end-1;
2850 }
2851
2852 /* Remove list elements to perform the trim */
2853 for (j = 0; j < ltrim; j++) {
2854 ln = listFirst(list);
2855 listDelNode(list,ln);
2856 }
2857 for (j = 0; j < rtrim; j++) {
2858 ln = listLast(list);
2859 listDelNode(list,ln);
2860 }
2861 addReply(c,shared.ok);
2862 server.dirty++;
2863 }
2864 }
2865 }
2866
2867 static void lremCommand(redisClient *c) {
2868 robj *o;
2869
2870 o = lookupKeyWrite(c->db,c->argv[1]);
2871 if (o == NULL) {
2872 addReply(c,shared.nokeyerr);
2873 } else {
2874 if (o->type != REDIS_LIST) {
2875 addReply(c,shared.wrongtypeerr);
2876 } else {
2877 list *list = o->ptr;
2878 listNode *ln, *next;
2879 int toremove = atoi(c->argv[2]->ptr);
2880 int removed = 0;
2881 int fromtail = 0;
2882
2883 if (toremove < 0) {
2884 toremove = -toremove;
2885 fromtail = 1;
2886 }
2887 ln = fromtail ? list->tail : list->head;
2888 while (ln) {
2889 robj *ele = listNodeValue(ln);
2890
2891 next = fromtail ? ln->prev : ln->next;
2892 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2893 listDelNode(list,ln);
2894 server.dirty++;
2895 removed++;
2896 if (toremove && removed == toremove) break;
2897 }
2898 ln = next;
2899 }
2900 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2901 }
2902 }
2903 }
2904
2905 /* ==================================== Sets ================================ */
2906
2907 static void saddCommand(redisClient *c) {
2908 robj *set;
2909
2910 set = lookupKeyWrite(c->db,c->argv[1]);
2911 if (set == NULL) {
2912 set = createSetObject();
2913 dictAdd(c->db->dict,c->argv[1],set);
2914 incrRefCount(c->argv[1]);
2915 } else {
2916 if (set->type != REDIS_SET) {
2917 addReply(c,shared.wrongtypeerr);
2918 return;
2919 }
2920 }
2921 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2922 incrRefCount(c->argv[2]);
2923 server.dirty++;
2924 addReply(c,shared.cone);
2925 } else {
2926 addReply(c,shared.czero);
2927 }
2928 }
2929
2930 static void sremCommand(redisClient *c) {
2931 robj *set;
2932
2933 set = lookupKeyWrite(c->db,c->argv[1]);
2934 if (set == NULL) {
2935 addReply(c,shared.czero);
2936 } else {
2937 if (set->type != REDIS_SET) {
2938 addReply(c,shared.wrongtypeerr);
2939 return;
2940 }
2941 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2942 server.dirty++;
2943 addReply(c,shared.cone);
2944 } else {
2945 addReply(c,shared.czero);
2946 }
2947 }
2948 }
2949
2950 static void smoveCommand(redisClient *c) {
2951 robj *srcset, *dstset;
2952
2953 srcset = lookupKeyWrite(c->db,c->argv[1]);
2954 dstset = lookupKeyWrite(c->db,c->argv[2]);
2955
2956 /* If the source key does not exist return 0, if it's of the wrong type
2957 * raise an error */
2958 if (srcset == NULL || srcset->type != REDIS_SET) {
2959 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2960 return;
2961 }
2962 /* Error if the destination key is not a set as well */
2963 if (dstset && dstset->type != REDIS_SET) {
2964 addReply(c,shared.wrongtypeerr);
2965 return;
2966 }
2967 /* Remove the element from the source set */
2968 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2969 /* Key not found in the src set! return zero */
2970 addReply(c,shared.czero);
2971 return;
2972 }
2973 server.dirty++;
2974 /* Add the element to the destination set */
2975 if (!dstset) {
2976 dstset = createSetObject();
2977 dictAdd(c->db->dict,c->argv[2],dstset);
2978 incrRefCount(c->argv[2]);
2979 }
2980 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
2981 incrRefCount(c->argv[3]);
2982 addReply(c,shared.cone);
2983 }
2984
2985 static void sismemberCommand(redisClient *c) {
2986 robj *set;
2987
2988 set = lookupKeyRead(c->db,c->argv[1]);
2989 if (set == NULL) {
2990 addReply(c,shared.czero);
2991 } else {
2992 if (set->type != REDIS_SET) {
2993 addReply(c,shared.wrongtypeerr);
2994 return;
2995 }
2996 if (dictFind(set->ptr,c->argv[2]))
2997 addReply(c,shared.cone);
2998 else
2999 addReply(c,shared.czero);
3000 }
3001 }
3002
3003 static void scardCommand(redisClient *c) {
3004 robj *o;
3005 dict *s;
3006
3007 o = lookupKeyRead(c->db,c->argv[1]);
3008 if (o == NULL) {
3009 addReply(c,shared.czero);
3010 return;
3011 } else {
3012 if (o->type != REDIS_SET) {
3013 addReply(c,shared.wrongtypeerr);
3014 } else {
3015 s = o->ptr;
3016 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3017 dictSize(s)));
3018 }
3019 }
3020 }
3021
3022 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3023 dict **d1 = (void*) s1, **d2 = (void*) s2;
3024
3025 return dictSize(*d1)-dictSize(*d2);
3026 }
3027
3028 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3029 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3030 dictIterator *di;
3031 dictEntry *de;
3032 robj *lenobj = NULL, *dstset = NULL;
3033 int j, cardinality = 0;
3034
3035 if (!dv) oom("sinterGenericCommand");
3036 for (j = 0; j < setsnum; j++) {
3037 robj *setobj;
3038
3039 setobj = dstkey ?
3040 lookupKeyWrite(c->db,setskeys[j]) :
3041 lookupKeyRead(c->db,setskeys[j]);
3042 if (!setobj) {
3043 zfree(dv);
3044 if (dstkey) {
3045 deleteKey(c->db,dstkey);
3046 addReply(c,shared.ok);
3047 } else {
3048 addReply(c,shared.nullmultibulk);
3049 }
3050 return;
3051 }
3052 if (setobj->type != REDIS_SET) {
3053 zfree(dv);
3054 addReply(c,shared.wrongtypeerr);
3055 return;
3056 }
3057 dv[j] = setobj->ptr;
3058 }
3059 /* Sort sets from the smallest to largest, this will improve our
3060 * algorithm's performace */
3061 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3062
3063 /* The first thing we should output is the total number of elements...
3064 * since this is a multi-bulk write, but at this stage we don't know
3065 * the intersection set size, so we use a trick, append an empty object
3066 * to the output list and save the pointer to later modify it with the
3067 * right length */
3068 if (!dstkey) {
3069 lenobj = createObject(REDIS_STRING,NULL);
3070 addReply(c,lenobj);
3071 decrRefCount(lenobj);
3072 } else {
3073 /* If we have a target key where to store the resulting set
3074 * create this key with an empty set inside */
3075 dstset = createSetObject();
3076 }
3077
3078 /* Iterate all the elements of the first (smallest) set, and test
3079 * the element against all the other sets, if at least one set does
3080 * not include the element it is discarded */
3081 di = dictGetIterator(dv[0]);
3082 if (!di) oom("dictGetIterator");
3083
3084 while((de = dictNext(di)) != NULL) {
3085 robj *ele;
3086
3087 for (j = 1; j < setsnum; j++)
3088 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3089 if (j != setsnum)
3090 continue; /* at least one set does not contain the member */
3091 ele = dictGetEntryKey(de);
3092 if (!dstkey) {
3093 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3094 addReply(c,ele);
3095 addReply(c,shared.crlf);
3096 cardinality++;
3097 } else {
3098 dictAdd(dstset->ptr,ele,NULL);
3099 incrRefCount(ele);
3100 }
3101 }
3102 dictReleaseIterator(di);
3103
3104 if (dstkey) {
3105 /* Store the resulting set into the target */
3106 deleteKey(c->db,dstkey);
3107 dictAdd(c->db->dict,dstkey,dstset);
3108 incrRefCount(dstkey);
3109 }
3110
3111 if (!dstkey) {
3112 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3113 } else {
3114 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3115 dictSize((dict*)dstset->ptr)));
3116 server.dirty++;
3117 }
3118 zfree(dv);
3119 }
3120
3121 static void sinterCommand(redisClient *c) {
3122 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3123 }
3124
3125 static void sinterstoreCommand(redisClient *c) {
3126 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3127 }
3128
3129 #define REDIS_OP_UNION 0 /* 'op' for sunionDiffGenericCommand(): add the members of every set */
3130 #define REDIS_OP_DIFF 1 /* 'op' for sunionDiffGenericCommand(): first set minus the members of the others */
3131
3132 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3133 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3134 dictIterator *di;
3135 dictEntry *de;
3136 robj *dstset = NULL;
3137 int j, cardinality = 0;
3138
3139 if (!dv) oom("sunionDiffGenericCommand");
3140 for (j = 0; j < setsnum; j++) {
3141 robj *setobj;
3142
3143 setobj = dstkey ?
3144 lookupKeyWrite(c->db,setskeys[j]) :
3145 lookupKeyRead(c->db,setskeys[j]);
3146 if (!setobj) {
3147 dv[j] = NULL;
3148 continue;
3149 }
3150 if (setobj->type != REDIS_SET) {
3151 zfree(dv);
3152 addReply(c,shared.wrongtypeerr);
3153 return;
3154 }
3155 dv[j] = setobj->ptr;
3156 }
3157
3158 /* We need a temp set object to store our union. If the dstkey
3159 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3160 * this set object will be the resulting object to set into the target key*/
3161 dstset = createSetObject();
3162
3163 /* Iterate all the elements of all the sets, add every element a single
3164 * time to the result set */
3165 for (j = 0; j < setsnum; j++) {
3166 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3167 if (!dv[j]) continue; /* non existing keys are like empty sets */
3168
3169 di = dictGetIterator(dv[j]);
3170 if (!di) oom("dictGetIterator");
3171
3172 while((de = dictNext(di)) != NULL) {
3173 robj *ele;
3174
3175 /* dictAdd will not add the same element multiple times */
3176 ele = dictGetEntryKey(de);
3177 if (op == REDIS_OP_UNION || j == 0) {
3178 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3179 incrRefCount(ele);
3180 cardinality++;
3181 }
3182 } else if (op == REDIS_OP_DIFF) {
3183 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3184 cardinality--;
3185 }
3186 }
3187 }
3188 dictReleaseIterator(di);
3189
3190 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3191 }
3192
3193 /* Output the content of the resulting set, if not in STORE mode */
3194 if (!dstkey) {
3195 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3196 di = dictGetIterator(dstset->ptr);
3197 if (!di) oom("dictGetIterator");
3198 while((de = dictNext(di)) != NULL) {
3199 robj *ele;
3200
3201 ele = dictGetEntryKey(de);
3202 addReplySds(c,sdscatprintf(sdsempty(),
3203 "$%d\r\n",sdslen(ele->ptr)));
3204 addReply(c,ele);
3205 addReply(c,shared.crlf);
3206 }
3207 dictReleaseIterator(di);
3208 } else {
3209 /* If we have a target key where to store the resulting set
3210 * create this key with the result set inside */
3211 deleteKey(c->db,dstkey);
3212 dictAdd(c->db->dict,dstkey,dstset);
3213 incrRefCount(dstkey);
3214 }
3215
3216 /* Cleanup */
3217 if (!dstkey) {
3218 decrRefCount(dstset);
3219 } else {
3220 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3221 dictSize((dict*)dstset->ptr)));
3222 server.dirty++;
3223 }
3224 zfree(dv);
3225 }
3226
3227 static void sunionCommand(redisClient *c) {
3228 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3229 }
3230
3231 static void sunionstoreCommand(redisClient *c) {
3232 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3233 }
3234
3235 static void sdiffCommand(redisClient *c) {
3236 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3237 }
3238
3239 static void sdiffstoreCommand(redisClient *c) {
3240 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3241 }
3242
3243 static void flushdbCommand(redisClient *c) {
3244 server.dirty += dictSize(c->db->dict);
3245 dictEmpty(c->db->dict);
3246 dictEmpty(c->db->expires);
3247 addReply(c,shared.ok);
3248 }
3249
3250 static void flushallCommand(redisClient *c) {
3251 server.dirty += emptyDb();
3252 addReply(c,shared.ok);
3253 rdbSave(server.dbfilename);
3254 server.dirty++;
3255 }
3256
3257 redisSortOperation *createSortOperation(int type, robj *pattern) {
3258 redisSortOperation *so = zmalloc(sizeof(*so));
3259 if (!so) oom("createSortOperation");
3260 so->type = type;
3261 so->pattern = pattern;
3262 return so;
3263 }
3264
3265 /* Return the value associated to the key with a name obtained
3266 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3267 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3268 char *p;
3269 sds spat, ssub;
3270 robj keyobj;
3271 int prefixlen, sublen, postfixlen;
3272 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3273 struct {
3274 long len;
3275 long free;
3276 char buf[REDIS_SORTKEY_MAX+1];
3277 } keyname;
3278
3279 spat = pattern->ptr;
3280 ssub = subst->ptr;
3281 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3282 p = strchr(spat,'*');
3283 if (!p) return NULL;
3284
3285 prefixlen = p-spat;
3286 sublen = sdslen(ssub);
3287 postfixlen = sdslen(spat)-(prefixlen+1);
3288 memcpy(keyname.buf,spat,prefixlen);
3289 memcpy(keyname.buf+prefixlen,ssub,sublen);
3290 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3291 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3292 keyname.len = prefixlen+sublen+postfixlen;
3293
3294 keyobj.refcount = 1;
3295 keyobj.type = REDIS_STRING;
3296 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3297
3298 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3299 return lookupKeyRead(db,&keyobj);
3300 }
3301
3302 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3303 * the additional parameter is not standard but a BSD-specific we have to
3304 * pass sorting parameters via the global 'server' structure */
3305 static int sortCompare(const void *s1, const void *s2) {
3306 const redisSortObject *so1 = s1, *so2 = s2;
3307 int cmp;
3308
3309 if (!server.sort_alpha) {
3310 /* Numeric sorting. Here it's trivial as we precomputed scores */
3311 if (so1->u.score > so2->u.score) {
3312 cmp = 1;
3313 } else if (so1->u.score < so2->u.score) {
3314 cmp = -1;
3315 } else {
3316 cmp = 0;
3317 }
3318 } else {
3319 /* Alphanumeric sorting */
3320 if (server.sort_bypattern) {
3321 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3322 /* At least one compare object is NULL */
3323 if (so1->u.cmpobj == so2->u.cmpobj)
3324 cmp = 0;
3325 else if (so1->u.cmpobj == NULL)
3326 cmp = -1;
3327 else
3328 cmp = 1;
3329 } else {
3330 /* We have both the objects, use strcoll */
3331 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3332 }
3333 } else {
3334 /* Compare elements directly */
3335 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3336 }
3337 }
3338 return server.sort_desc ? -cmp : cmp;
3339 }
3340
3341 /* The SORT command is the most complex command in Redis. Warning: this code
3342 * is optimized for speed and a bit less for readability */
3343 static void sortCommand(redisClient *c) {
3344 list *operations;
3345 int outputlen = 0;
3346 int desc = 0, alpha = 0;
3347 int limit_start = 0, limit_count = -1, start, end;
3348 int j, dontsort = 0, vectorlen;
3349 int getop = 0; /* GET operation counter */
3350 robj *sortval, *sortby = NULL;
3351 redisSortObject *vector; /* Resulting vector to sort */
3352
3353 /* Lookup the key to sort. It must be of the right types */
3354 sortval = lookupKeyRead(c->db,c->argv[1]);
3355 if (sortval == NULL) {
3356 addReply(c,shared.nokeyerr);
3357 return;
3358 }
3359 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3360 addReply(c,shared.wrongtypeerr);
3361 return;
3362 }
3363
3364 /* Create a list of operations to perform for every sorted element.
3365 * Operations can be GET/DEL/INCR/DECR */
3366 operations = listCreate();
3367 listSetFreeMethod(operations,zfree);
3368 j = 2;
3369
3370 /* Now we need to protect sortval incrementing its count, in the future
3371 * SORT may have options able to overwrite/delete keys during the sorting
3372 * and the sorted key itself may get destroied */
3373 incrRefCount(sortval);
3374
3375 /* The SORT command has an SQL-alike syntax, parse it */
3376 while(j < c->argc) {
3377 int leftargs = c->argc-j-1;
3378 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3379 desc = 0;
3380 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3381 desc = 1;
3382 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3383 alpha = 1;
3384 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3385 limit_start = atoi(c->argv[j+1]->ptr);
3386 limit_count = atoi(c->argv[j+2]->ptr);
3387 j+=2;
3388 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3389 sortby = c->argv[j+1];
3390 /* If the BY pattern does not contain '*', i.e. it is constant,
3391 * we don't need to sort nor to lookup the weight keys. */
3392 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3393 j++;
3394 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3395 listAddNodeTail(operations,createSortOperation(
3396 REDIS_SORT_GET,c->argv[j+1]));
3397 getop++;
3398 j++;
3399 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3400 listAddNodeTail(operations,createSortOperation(
3401 REDIS_SORT_DEL,c->argv[j+1]));
3402 j++;
3403 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3404 listAddNodeTail(operations,createSortOperation(
3405 REDIS_SORT_INCR,c->argv[j+1]));
3406 j++;
3407 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3408 listAddNodeTail(operations,createSortOperation(
3409 REDIS_SORT_DECR,c->argv[j+1]));
3410 j++;
3411 } else {
3412 decrRefCount(sortval);
3413 listRelease(operations);
3414 addReply(c,shared.syntaxerr);
3415 return;
3416 }
3417 j++;
3418 }
3419
3420 /* Load the sorting vector with all the objects to sort */
3421 vectorlen = (sortval->type == REDIS_LIST) ?
3422 listLength((list*)sortval->ptr) :
3423 dictSize((dict*)sortval->ptr);
3424 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3425 if (!vector) oom("allocating objects vector for SORT");
3426 j = 0;
3427 if (sortval->type == REDIS_LIST) {
3428 list *list = sortval->ptr;
3429 listNode *ln;
3430
3431 listRewind(list);
3432 while((ln = listYield(list))) {
3433 robj *ele = ln->value;
3434 vector[j].obj = ele;
3435 vector[j].u.score = 0;
3436 vector[j].u.cmpobj = NULL;
3437 j++;
3438 }
3439 } else {
3440 dict *set = sortval->ptr;
3441 dictIterator *di;
3442 dictEntry *setele;
3443
3444 di = dictGetIterator(set);
3445 if (!di) oom("dictGetIterator");
3446 while((setele = dictNext(di)) != NULL) {
3447 vector[j].obj = dictGetEntryKey(setele);
3448 vector[j].u.score = 0;
3449 vector[j].u.cmpobj = NULL;
3450 j++;
3451 }
3452 dictReleaseIterator(di);
3453 }
3454 assert(j == vectorlen);
3455
3456 /* Now it's time to load the right scores in the sorting vector */
3457 if (dontsort == 0) {
3458 for (j = 0; j < vectorlen; j++) {
3459 if (sortby) {
3460 robj *byval;
3461
3462 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3463 if (!byval || byval->type != REDIS_STRING) continue;
3464 if (alpha) {
3465 vector[j].u.cmpobj = byval;
3466 incrRefCount(byval);
3467 } else {
3468 vector[j].u.score = strtod(byval->ptr,NULL);
3469 }
3470 } else {
3471 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3472 }
3473 }
3474 }
3475
3476 /* We are ready to sort the vector... perform a bit of sanity check
3477 * on the LIMIT option too. We'll use a partial version of quicksort. */
3478 start = (limit_start < 0) ? 0 : limit_start;
3479 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3480 if (start >= vectorlen) {
3481 start = vectorlen-1;
3482 end = vectorlen-2;
3483 }
3484 if (end >= vectorlen) end = vectorlen-1;
3485
3486 if (dontsort == 0) {
3487 server.sort_desc = desc;
3488 server.sort_alpha = alpha;
3489 server.sort_bypattern = sortby ? 1 : 0;
3490 if (sortby && (start != 0 || end != vectorlen-1))
3491 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3492 else
3493 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3494 }
3495
3496 /* Send command output to the output buffer, performing the specified
3497 * GET/DEL/INCR/DECR operations if any. */
3498 outputlen = getop ? getop*(end-start+1) : end-start+1;
3499 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3500 for (j = start; j <= end; j++) {
3501 listNode *ln;
3502 if (!getop) {
3503 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3504 sdslen(vector[j].obj->ptr)));
3505 addReply(c,vector[j].obj);
3506 addReply(c,shared.crlf);
3507 }
3508 listRewind(operations);
3509 while((ln = listYield(operations))) {
3510 redisSortOperation *sop = ln->value;
3511 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3512 vector[j].obj);
3513
3514 if (sop->type == REDIS_SORT_GET) {
3515 if (!val || val->type != REDIS_STRING) {
3516 addReply(c,shared.nullbulk);
3517 } else {
3518 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3519 sdslen(val->ptr)));
3520 addReply(c,val);
3521 addReply(c,shared.crlf);
3522 }
3523 } else if (sop->type == REDIS_SORT_DEL) {
3524 /* TODO */
3525 }
3526 }
3527 }
3528
3529 /* Cleanup */
3530 decrRefCount(sortval);
3531 listRelease(operations);
3532 for (j = 0; j < vectorlen; j++) {
3533 if (sortby && alpha && vector[j].u.cmpobj)
3534 decrRefCount(vector[j].u.cmpobj);
3535 }
3536 zfree(vector);
3537 }
3538
3539 static void infoCommand(redisClient *c) {
3540 sds info;
3541 time_t uptime = time(NULL)-server.stat_starttime;
3542
3543 info = sdscatprintf(sdsempty(),
3544 "redis_version:%s\r\n"
3545 "uptime_in_seconds:%d\r\n"
3546 "uptime_in_days:%d\r\n"
3547 "connected_clients:%d\r\n"
3548 "connected_slaves:%d\r\n"
3549 "used_memory:%zu\r\n"
3550 "changes_since_last_save:%lld\r\n"
3551 "bgsave_in_progress:%d\r\n"
3552 "last_save_time:%d\r\n"
3553 "total_connections_received:%lld\r\n"
3554 "total_commands_processed:%lld\r\n"
3555 "role:%s\r\n"
3556 ,REDIS_VERSION,
3557 uptime,
3558 uptime/(3600*24),
3559 listLength(server.clients)-listLength(server.slaves),
3560 listLength(server.slaves),
3561 server.usedmemory,
3562 server.dirty,
3563 server.bgsaveinprogress,
3564 server.lastsave,
3565 server.stat_numconnections,
3566 server.stat_numcommands,
3567 server.masterhost == NULL ? "master" : "slave"
3568 );
3569 if (server.masterhost) {
3570 info = sdscatprintf(info,
3571 "master_host:%s\r\n"
3572 "master_port:%d\r\n"
3573 "master_link_status:%s\r\n"
3574 "master_last_io_seconds_ago:%d\r\n"
3575 ,server.masterhost,
3576 server.masterport,
3577 (server.replstate == REDIS_REPL_CONNECTED) ?
3578 "up" : "down",
3579 (int)(time(NULL)-server.master->lastinteraction)
3580 );
3581 }
3582 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3583 addReplySds(c,info);
3584 addReply(c,shared.crlf);
3585 }
3586
3587 static void monitorCommand(redisClient *c) {
3588 /* ignore MONITOR if aleady slave or in monitor mode */
3589 if (c->flags & REDIS_SLAVE) return;
3590
3591 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3592 c->slaveseldb = 0;
3593 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3594 addReply(c,shared.ok);
3595 }
3596
3597 /* ================================= Expire ================================= */
3598 static int removeExpire(redisDb *db, robj *key) {
3599 if (dictDelete(db->expires,key) == DICT_OK) {
3600 return 1;
3601 } else {
3602 return 0;
3603 }
3604 }
3605
3606 static int setExpire(redisDb *db, robj *key, time_t when) {
3607 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3608 return 0;
3609 } else {
3610 incrRefCount(key);
3611 return 1;
3612 }
3613 }
3614
3615 /* Return the expire time of the specified key, or -1 if no expire
3616 * is associated with this key (i.e. the key is non volatile) */
3617 static time_t getExpire(redisDb *db, robj *key) {
3618 dictEntry *de;
3619
3620 /* No expire? return ASAP */
3621 if (dictSize(db->expires) == 0 ||
3622 (de = dictFind(db->expires,key)) == NULL) return -1;
3623
3624 return (time_t) dictGetEntryVal(de);
3625 }
3626
3627 static int expireIfNeeded(redisDb *db, robj *key) {
3628 time_t when;
3629 dictEntry *de;
3630
3631 /* No expire? return ASAP */
3632 if (dictSize(db->expires) == 0 ||
3633 (de = dictFind(db->expires,key)) == NULL) return 0;
3634
3635 /* Lookup the expire */
3636 when = (time_t) dictGetEntryVal(de);
3637 if (time(NULL) <= when) return 0;
3638
3639 /* Delete the key */
3640 dictDelete(db->expires,key);
3641 return dictDelete(db->dict,key) == DICT_OK;
3642 }
3643
3644 static int deleteIfVolatile(redisDb *db, robj *key) {
3645 dictEntry *de;
3646
3647 /* No expire? return ASAP */
3648 if (dictSize(db->expires) == 0 ||
3649 (de = dictFind(db->expires,key)) == NULL) return 0;
3650
3651 /* Delete the key */
3652 server.dirty++;
3653 dictDelete(db->expires,key);
3654 return dictDelete(db->dict,key) == DICT_OK;
3655 }
3656
3657 static void expireCommand(redisClient *c) {
3658 dictEntry *de;
3659 int seconds = atoi(c->argv[2]->ptr);
3660
3661 de = dictFind(c->db->dict,c->argv[1]);
3662 if (de == NULL) {
3663 addReply(c,shared.czero);
3664 return;
3665 }
3666 if (seconds <= 0) {
3667 addReply(c, shared.czero);
3668 return;
3669 } else {
3670 time_t when = time(NULL)+seconds;
3671 if (setExpire(c->db,c->argv[1],when))
3672 addReply(c,shared.cone);
3673 else
3674 addReply(c,shared.czero);
3675 return;
3676 }
3677 }
3678
3679 static void ttlCommand(redisClient *c) {
3680 time_t expire;
3681 int ttl = -1;
3682
3683 expire = getExpire(c->db,c->argv[1]);
3684 if (expire != -1) {
3685 ttl = (int) (expire-time(NULL));
3686 if (ttl < 0) ttl = -1;
3687 }
3688 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3689 }
3690
3691 /* =============================== Replication ============================= */
3692
3693 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3694 ssize_t nwritten, ret = size;
3695 time_t start = time(NULL);
3696
3697 timeout++;
3698 while(size) {
3699 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3700 nwritten = write(fd,ptr,size);
3701 if (nwritten == -1) return -1;
3702 ptr += nwritten;
3703 size -= nwritten;
3704 }
3705 if ((time(NULL)-start) > timeout) {
3706 errno = ETIMEDOUT;
3707 return -1;
3708 }
3709 }
3710 return ret;
3711 }
3712
3713 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3714 ssize_t nread, totread = 0;
3715 time_t start = time(NULL);
3716
3717 timeout++;
3718 while(size) {
3719 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3720 nread = read(fd,ptr,size);
3721 if (nread == -1) return -1;
3722 ptr += nread;
3723 size -= nread;
3724 totread += nread;
3725 }
3726 if ((time(NULL)-start) > timeout) {
3727 errno = ETIMEDOUT;
3728 return -1;
3729 }
3730 }
3731 return totread;
3732 }
3733
/* Read a line from 'fd' one byte at a time (every byte is read with
 * syncRead() using 'timeout') and store it, null terminated, in the buffer
 * 'ptr' of 'size' bytes. The trailing "\n" or "\r\n" is not stored.
 *
 * Returns the number of bytes stored before the newline was seen (a
 * stripped '\r' is still counted), or -1 if syncRead() fails. When the line
 * does not fit, reading stops after size-1 bytes and the truncated, null
 * terminated content is returned.
 *
 * The remaining room is decremented at every stored byte: without that the
 * loop never ended on a long line and wrote past the end of the buffer. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0; /* no room, not even for the terminator */
    *ptr = '\0';
    size--; /* reserve the room for the null terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            size--;
        }
    }
    return nread;
}
3754
3755 static void syncCommand(redisClient *c) {
3756 /* ignore SYNC if aleady slave or in monitor mode */
3757 if (c->flags & REDIS_SLAVE) return;
3758
3759 /* SYNC can't be issued when the server has pending data to send to
3760 * the client about already issued commands. We need a fresh reply
3761 * buffer registering the differences between the BGSAVE and the current
3762 * dataset, so that we can copy to other slaves if needed. */
3763 if (listLength(c->reply) != 0) {
3764 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3765 return;
3766 }
3767
3768 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3769 /* Here we need to check if there is a background saving operation
3770 * in progress, or if it is required to start one */
3771 if (server.bgsaveinprogress) {
3772 /* Ok a background save is in progress. Let's check if it is a good
3773 * one for replication, i.e. if there is another slave that is
3774 * registering differences since the server forked to save */
3775 redisClient *slave;
3776 listNode *ln;
3777
3778 listRewind(server.slaves);
3779 while((ln = listYield(server.slaves))) {
3780 slave = ln->value;
3781 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3782 }
3783 if (ln) {
3784 /* Perfect, the server is already registering differences for
3785 * another slave. Set the right state, and copy the buffer. */
3786 listRelease(c->reply);
3787 c->reply = listDup(slave->reply);
3788 if (!c->reply) oom("listDup copying slave reply list");
3789 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3790 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3791 } else {
3792 /* No way, we need to wait for the next BGSAVE in order to
3793 * register differences */
3794 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3795 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3796 }
3797 } else {
3798 /* Ok we don't have a BGSAVE in progress, let's start one */
3799 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3800 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3801 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3802 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3803 return;
3804 }
3805 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3806 }
3807 c->repldbfd = -1;
3808 c->flags |= REDIS_SLAVE;
3809 c->slaveseldb = 0;
3810 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3811 return;
3812 }
3813
3814 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3815 redisClient *slave = privdata;
3816 REDIS_NOTUSED(el);
3817 REDIS_NOTUSED(mask);
3818 char buf[REDIS_IOBUF_LEN];
3819 ssize_t nwritten, buflen;
3820
3821 if (slave->repldboff == 0) {
3822 /* Write the bulk write count before to transfer the DB. In theory here
3823 * we don't know how much room there is in the output buffer of the
3824 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3825 * operations) will never be smaller than the few bytes we need. */
3826 sds bulkcount;
3827
3828 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3829 slave->repldbsize);
3830 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3831 {
3832 sdsfree(bulkcount);
3833 freeClient(slave);
3834 return;
3835 }
3836 sdsfree(bulkcount);
3837 }
3838 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3839 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3840 if (buflen <= 0) {
3841 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3842 (buflen == 0) ? "premature EOF" : strerror(errno));
3843 freeClient(slave);
3844 return;
3845 }
3846 if ((nwritten = write(fd,buf,buflen)) == -1) {
3847 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3848 strerror(errno));
3849 freeClient(slave);
3850 return;
3851 }
3852 slave->repldboff += nwritten;
3853 if (slave->repldboff == slave->repldbsize) {
3854 close(slave->repldbfd);
3855 slave->repldbfd = -1;
3856 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3857 slave->replstate = REDIS_REPL_ONLINE;
3858 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3859 sendReplyToClient, slave, NULL) == AE_ERR) {
3860 freeClient(slave);
3861 return;
3862 }
3863 addReplySds(slave,sdsempty());
3864 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3865 }
3866 }
3867
3868 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3869 listNode *ln;
3870 int startbgsave = 0;
3871
3872 listRewind(server.slaves);
3873 while((ln = listYield(server.slaves))) {
3874 redisClient *slave = ln->value;
3875
3876 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3877 startbgsave = 1;
3878 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3879 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3880 struct stat buf;
3881
3882 if (bgsaveerr != REDIS_OK) {
3883 freeClient(slave);
3884 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3885 continue;
3886 }
3887 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3888 fstat(slave->repldbfd,&buf) == -1) {
3889 freeClient(slave);
3890 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3891 continue;
3892 }
3893 slave->repldboff = 0;
3894 slave->repldbsize = buf.st_size;
3895 slave->replstate = REDIS_REPL_SEND_BULK;
3896 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3897 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3898 freeClient(slave);
3899 continue;
3900 }
3901 }
3902 }
3903 if (startbgsave) {
3904 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3905 listRewind(server.slaves);
3906 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3907 while((ln = listYield(server.slaves))) {
3908 redisClient *slave = ln->value;
3909
3910 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3911 freeClient(slave);
3912 }
3913 }
3914 }
3915 }
3916
3917 static int syncWithMaster(void) {
3918 char buf[1024], tmpfile[256];
3919 int dumpsize;
3920 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3921 int dfd;
3922
3923 if (fd == -1) {
3924 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3925 strerror(errno));
3926 return REDIS_ERR;
3927 }
3928 /* Issue the SYNC command */
3929 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3930 close(fd);
3931 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3932 strerror(errno));
3933 return REDIS_ERR;
3934 }
3935 /* Read the bulk write count */
3936 if (syncReadLine(fd,buf,1024,3600) == -1) {
3937 close(fd);
3938 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3939 strerror(errno));
3940 return REDIS_ERR;
3941 }
3942 dumpsize = atoi(buf+1);
3943 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3944 /* Read the bulk write data on a temp file */
3945 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3946 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3947 if (dfd == -1) {
3948 close(fd);
3949 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3950 return REDIS_ERR;
3951 }
3952 while(dumpsize) {
3953 int nread, nwritten;
3954
3955 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3956 if (nread == -1) {
3957 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3958 strerror(errno));
3959 close(fd);
3960 close(dfd);
3961 return REDIS_ERR;
3962 }
3963 nwritten = write(dfd,buf,nread);
3964 if (nwritten == -1) {
3965 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3966 close(fd);
3967 close(dfd);
3968 return REDIS_ERR;
3969 }
3970 dumpsize -= nread;
3971 }
3972 close(dfd);
3973 if (rename(tmpfile,server.dbfilename) == -1) {
3974 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3975 unlink(tmpfile);
3976 close(fd);
3977 return REDIS_ERR;
3978 }
3979 emptyDb();
3980 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3981 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3982 close(fd);
3983 return REDIS_ERR;
3984 }
3985 server.master = createClient(fd);
3986 server.master->flags |= REDIS_MASTER;
3987 server.replstate = REDIS_REPL_CONNECTED;
3988 return REDIS_OK;
3989 }
3990
3991 static void slaveofCommand(redisClient *c) {
3992 if (!strcasecmp(c->argv[1]->ptr,"no") &&
3993 !strcasecmp(c->argv[2]->ptr,"one")) {
3994 if (server.masterhost) {
3995 sdsfree(server.masterhost);
3996 server.masterhost = NULL;
3997 if (server.master) freeClient(server.master);
3998 server.replstate = REDIS_REPL_NONE;
3999 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4000 }
4001 } else {
4002 sdsfree(server.masterhost);
4003 server.masterhost = sdsdup(c->argv[1]->ptr);
4004 server.masterport = atoi(c->argv[2]->ptr);
4005 if (server.master) freeClient(server.master);
4006 server.replstate = REDIS_REPL_CONNECT;
4007 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4008 server.masterhost, server.masterport);
4009 }
4010 addReply(c,shared.ok);
4011 }
4012
4013 /* ============================ Maxmemory directive ======================== */
4014
4015 /* This function gets called when 'maxmemory' is set on the config file to limit
4016 * the max memory used by the server, and we are out of memory.
4017 * This function will try to, in order:
4018 *
4019 * - Free objects from the free list
4020 * - Try to remove keys with an EXPIRE set
4021 *
4022 * It is not possible to free enough memory to reach used-memory < maxmemory
4023 * the server will start refusing commands that will enlarge even more the
4024 * memory usage.
4025 */
4026 static void freeMemoryIfNeeded(void) {
4027 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4028 if (listLength(server.objfreelist)) {
4029 robj *o;
4030
4031 listNode *head = listFirst(server.objfreelist);
4032 o = listNodeValue(head);
4033 listDelNode(server.objfreelist,head);
4034 zfree(o);
4035 } else {
4036 int j, k, freed = 0;
4037
4038 for (j = 0; j < server.dbnum; j++) {
4039 int minttl = -1;
4040 robj *minkey = NULL;
4041 struct dictEntry *de;
4042
4043 if (dictSize(server.db[j].expires)) {
4044 freed = 1;
4045 /* From a sample of three keys drop the one nearest to
4046 * the natural expire */
4047 for (k = 0; k < 3; k++) {
4048 time_t t;
4049
4050 de = dictGetRandomKey(server.db[j].expires);
4051 t = (time_t) dictGetEntryVal(de);
4052 if (minttl == -1 || t < minttl) {
4053 minkey = dictGetEntryKey(de);
4054 minttl = t;
4055 }
4056 }
4057 deleteKey(server.db+j,minkey);
4058 }
4059 }
4060 if (!freed) return; /* nothing to free... */
4061 }
4062 }
4063 }
4064
4065 /* ================================= Debugging ============================== */
4066
4067 static void debugCommand(redisClient *c) {
4068 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4069 *((char*)-1) = 'x';
4070 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4071 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4072 robj *key, *val;
4073
4074 if (!de) {
4075 addReply(c,shared.nokeyerr);
4076 return;
4077 }
4078 key = dictGetEntryKey(de);
4079 val = dictGetEntryVal(de);
4080 addReplySds(c,sdscatprintf(sdsempty(),
4081 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4082 key, key->refcount, val, val->refcount));
4083 } else {
4084 addReplySds(c,sdsnew(
4085 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4086 }
4087 }
4088
4089 /* =================================== Main! ================================ */
4090
4091 #ifdef __linux__
/* Read the integer stored in /proc/sys/vm/overcommit_memory.
 *
 * Returns the non-negative value found at the start of the file, or -1 when
 * the file can't be opened or read, or when its content does not start with
 * a non-negative number that fits in an int. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64], *end;
    long value;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    /* strtol() instead of atoi(): unparsable content must be reported as an
     * error and not as 0, since 0 is a meaningful setting for the caller. */
    value = strtol(buf,&end,10);
    if (end == buf || value < 0 || value != (long)(int)value) return -1;
    return (int)value;
}
4105
4106 void linuxOvercommitMemoryWarning(void) {
4107 if (linuxOvercommitMemoryValue() == 0) {
4108 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4109 }
4110 }
4111 #endif /* __linux__ */
4112
4113 static void daemonize(void) {
4114 int fd;
4115 FILE *fp;
4116
4117 if (fork() != 0) exit(0); /* parent exits */
4118 setsid(); /* create a new session */
4119
4120 /* Every output goes to /dev/null. If Redis is daemonized but
4121 * the 'logfile' is set to 'stdout' in the configuration file
4122 * it will not log at all. */
4123 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4124 dup2(fd, STDIN_FILENO);
4125 dup2(fd, STDOUT_FILENO);
4126 dup2(fd, STDERR_FILENO);
4127 if (fd > STDERR_FILENO) close(fd);
4128 }
4129 /* Try to write the pid file */
4130 fp = fopen(server.pidfile,"w");
4131 if (fp) {
4132 fprintf(fp,"%d\n",getpid());
4133 fclose(fp);
4134 }
4135 }
4136
4137 static void segvHandler (int sig, siginfo_t *info, void *secret) {
4138
4139 void *trace[16];
4140 char **messages = (char **)NULL;
4141 int i, trace_size = 0;
4142 ucontext_t *uc = (ucontext_t *)secret;
4143
4144 /* Do something useful with siginfo_t */
4145 if (sig == SIGSEGV)
4146 printf("Got signal %d, faulty address is %p, from %p\n", sig, info->si_addr,
4147 (void *)uc->uc_mcontext.gregs[REG_EIP]);
4148 else
4149 printf("Got signal %d\n", sig);
4150
4151 trace_size = backtrace(trace, 16);
4152 /* overwrite sigaction with caller's address */
4153 trace[1] = (void *) uc->uc_mcontext.gregs[REG_EIP];
4154
4155 messages = backtrace_symbols(trace, trace_size);
4156 /* skip first stack frame (points here) */
4157 printf("[bt] Execution path:\n");
4158 for (i=1; i<trace_size; ++i)
4159 printf("[bt] %s\n", messages[i]);
4160
4161 exit(0);
4162 }
4163
4164 void setupSigAction(){
4165 struct sigaction act;
4166 sigemptyset (&act.sa_mask);
4167 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. Otherwise, sa_handler is used */
4168 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4169 act.sa_sigaction = segvHandler;
4170 sigaction (SIGSEGV, &act, NULL);
4171 }
4172
4173 int main(int argc, char **argv) {
4174 setupSigAction();
4175 #ifdef __linux__
4176 linuxOvercommitMemoryWarning();
4177 #endif
4178
4179 initServerConfig();
4180 if (argc == 2) {
4181 ResetServerSaveParams();
4182 loadServerConfig(argv[1]);
4183 } else if (argc > 2) {
4184 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4185 exit(1);
4186 } else {
4187 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4188 }
4189 initServer();
4190 if (server.daemonize) daemonize();
4191 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4192 if (rdbLoad(server.dbfilename) == REDIS_OK)
4193 redisLog(REDIS_NOTICE,"DB loaded from disk");
4194 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4195 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4196 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4197 aeMain(server.el);
4198 aeDeleteEventLoop(server.el);
4199 return 0;
4200 }