]> git.saurik.com Git - redis.git/blob - redis.c
521c7aa0238973a1dccd302745baa212247713b9
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this Redis source tree. */
#define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #define __USE_POSIX199309
40 #include <signal.h>
41 #include <execinfo.h>
42 #include <ucontext.h>
43 #include <sys/wait.h>
44 #include <errno.h>
45 #include <assert.h>
46 #include <ctype.h>
47 #include <stdarg.h>
48 #include <inttypes.h>
49 #include <arpa/inet.h>
50 #include <sys/stat.h>
51 #include <fcntl.h>
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #include <limits.h>
55 #include <execinfo.h>
56
57 #include "redis.h"
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
66
/* Error codes: generic success / failure return values. */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* default TCP port, see initServerConfig() */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout, in seconds */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16 /* default number of databases */
#define REDIS_CONFIGLINE_MAX 1024 /* max length of a config file line buffer */
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
/* Upper bound on the random keys sampled per database on every serverCron()
 * run when looking for expired keys. */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */

/* Hash table parameters, used by tryResizeHashTables() */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
#define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
86
/* Command flags (bitmask stored in the 'flags' field of struct redisCommand) */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types (stored in the 'type' field of robj) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_HASH 3

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
106
/* Defines related to the dump file format. Storing a 32 bit length for short
 * keys would waste a lot of space, so we check the two most significant bits
 * of the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specifies the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When the length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
133
/* Client flags (bitmask stored in redisClient.flags) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of the master.
 * Note that in the SEND_BULK and ONLINE states the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE states instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
153
/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() emits a message only when its level is >= the
 * configured verbosity, and indexes the ".-*" marker string with the level. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: marks a parameter as intentionally unused. */
#define REDIS_NOTUSED(V) ((void) V)
174
175
176 /*================================= Data types ============================== */
177
/* A redis object, that is a type able to hold a string / list / set.
 * 'type' is one of the REDIS_STRING / REDIS_LIST / REDIS_SET / REDIS_HASH
 * constants and 'ptr' points to the type-specific payload (an sds string for
 * REDIS_STRING objects, as created in createSharedObjects()). */
typedef struct redisObject {
    void *ptr;
    int type;
    int refcount; /* managed via incrRefCount() / decrRefCount() */
} robj;
184
/* One logical database. initServer() creates server.dbnum of these. */
typedef struct redisDb {
    dict *dict; /* the keyspace, created with hashDictType */
    dict *expires; /* keys with a timeout; the entry value holds the expire time_t */
    int id; /* index of this database inside server.db[] */
} redisDb;
190
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv;
    int argc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
212
/* One automatic-save rule: serverCron() starts a background save when at
 * least 'changes' modifications are pending and more than 'seconds' seconds
 * have passed since the last successful save. */
struct saveparam {
    time_t seconds;
    int changes;
};
217
/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db; /* array of 'dbnum' databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function run */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of the last successful save */
    size_t usedmemory; /* Used memory in bytes, refreshed by serverCron() */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* minimum log level that redisLog() emits */
    int glueoutputbuf;
    int maxidletime; /* client idle timeout in seconds, 0 disables the check */
    int dbnum;
    int daemonize;
    char *pidfile;
    int bgsaveinprogress;
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile; /* NULL = log on standard output */
    char *bindaddr;
    char *dbfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned int maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
267
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (see cmdTable). */
struct redisCommand {
    char *name; /* command name as listed in cmdTable */
    redisCommandProc *proc; /* implementation */
    int arity; /* expected argument count; cmdTable also uses negative values
                * - NOTE(review): presumably "at least -arity", confirm
                * against the command dispatcher */
    int flags; /* REDIS_CMD_* bitmask */
};
275
/* Element of the vector sorted by SORT: the object itself plus its sort key,
 * either a numeric score or another object to compare by.
 * NOTE(review): which union member is active is decided by code outside this
 * chunk (presumably from the server.sort_* flags) - confirm in sortCommand. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;
283
/* A SORT sub-operation: 'type' is one of the REDIS_SORT_* constants and
 * 'pattern' is the key pattern it applies to. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
288
/* Pre-built string objects for the most common replies and protocol
 * fragments. They are all created once by createSharedObjects(). */
struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
297
298 /*================================ Prototypes =============================== */
299
/* Forward declarations of helpers that are used before their definition. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSalvesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);

/* Command implementations: every one has the redisCommandProc signature and
 * is referenced from cmdTable. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void getSetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
/* Called by initServer() right after the SIGHUP/SIGPIPE handlers are set. */
static void setupSigSegvAction();
382 /*================================= Globals ================================= */
383
/* Global vars */
static struct redisServer server; /* server global state */
/* Command table. Each row is {name, implementation, arity, flags} as laid out
 * in struct redisCommand; the table is terminated by an all-NULL/zero row.
 * NOTE(review): negative arity values presumably mean "at least -arity
 * arguments" - confirm against the command lookup/dispatch code.
 * Note that "smembers" deliberately reuses sinterCommand. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
447
448 /*============================ Utility functions ============================ */
449
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be NUL
 * terminated: no byte at or past the given lengths is ever read.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte out of a set; [^abc] negates it, [a-z] is a range and
 *            a backslash inside the set takes the next byte literally
 *   \x       the byte x taken literally
 *
 * When 'nocase' is non zero, literals and ranges are compared after
 * tolower(). Returns 1 on match, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern at every remaining position. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes one byte: an empty string can't match,
             * and string[0] must not be read in that case. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the increment
                     * after the switch leaves us exactly at the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    /* Escaped byte. A trailing lone backslash is handled
                     * as a plain literal by the last branch instead. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs one byte of input to compare against. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Input exhausted: only trailing stars may still match. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
572
573 void redisLog(int level, const char *fmt, ...)
574 {
575 va_list ap;
576 FILE *fp;
577
578 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
579 if (!fp) return;
580
581 va_start(ap, fmt);
582 if (level >= server.verbosity) {
583 char *c = ".-*";
584 char buf[64];
585 time_t now;
586
587 now = time(NULL);
588 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
589 fprintf(fp,"%s %c ",buf,c[level]);
590 vfprintf(fp, fmt, ap);
591 fprintf(fp,"\n");
592 fflush(fp);
593 }
594 va_end(ap);
595
596 if (server.logfile) fclose(fp);
597 }
598
599 /*====================== Hash table type implementation ==================== */
600
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
604
605 static int sdsDictKeyCompare(void *privdata, const void *key1,
606 const void *key2)
607 {
608 int l1,l2;
609 DICT_NOTUSED(privdata);
610
611 l1 = sdslen((sds)key1);
612 l2 = sdslen((sds)key2);
613 if (l1 != l2) return 0;
614 return memcmp(key1, key2, l1) == 0;
615 }
616
/* Dict destructor callback: drops one reference from the redis object stored
 * as key or value. 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
623
624 static int dictSdsKeyCompare(void *privdata, const void *key1,
625 const void *key2)
626 {
627 const robj *o1 = key1, *o2 = key2;
628 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
629 }
630
631 static unsigned int dictSdsHash(const void *key) {
632 const robj *o = key;
633 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
634 }
635
/* Dict type whose keys are redis objects and whose values are not owned by
 * the table (no value destructor). initServer() uses it for the sharing pool
 * and for the per-database 'expires' tables. */
static dictType setDictType = {
    dictSdsHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictSdsKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};
644
/* Dict type whose keys and values are both redis objects released through
 * decrRefCount(). initServer() uses it for the main keyspace of every
 * database. */
static dictType hashDictType = {
    dictSdsHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictSdsKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor /* val destructor */
};
653
654 /* ========================= Random utility functions ======================= */
655
/* Out of memory handler. Redis does not try to recover from a failed
 * allocation: the networking layer needs heap memory for its send buffers as
 * well, so the error could not even be reported to the client reliably.
 * Print "<msg>: Out of memory" on stderr, wait one second and abort. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
667
668 /* ====================== Redis server networking stuff ===================== */
669 void closeTimedoutClients(void) {
670 redisClient *c;
671 listNode *ln;
672 time_t now = time(NULL);
673
674 listRewind(server.clients);
675 while ((ln = listYield(server.clients)) != NULL) {
676 c = listNodeValue(ln);
677 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
678 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
679 (now - c->lastinteraction > server.maxidletime)) {
680 redisLog(REDIS_DEBUG,"Closing idle client");
681 freeClient(c);
682 }
683 }
684 }
685
686 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
687 * we resize the hash table to save memory */
688 void tryResizeHashTables(void) {
689 int j;
690
691 for (j = 0; j < server.dbnum; j++) {
692 long long size, used;
693
694 size = dictSlots(server.db[j].dict);
695 used = dictSize(server.db[j].dict);
696 if (size && used && size > REDIS_HT_MINSLOTS &&
697 (used*100/size < REDIS_HT_MINFILL)) {
698 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
699 dictResize(server.db[j].dict);
700 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
701 }
702 }
703 }
704
705 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
706 int j, loops = server.cronloops++;
707 REDIS_NOTUSED(eventLoop);
708 REDIS_NOTUSED(id);
709 REDIS_NOTUSED(clientData);
710
711 /* Update the global state with the amount of used memory */
712 server.usedmemory = zmalloc_used_memory();
713
714 /* Show some info about non-empty databases */
715 for (j = 0; j < server.dbnum; j++) {
716 long long size, used, vkeys;
717
718 size = dictSlots(server.db[j].dict);
719 used = dictSize(server.db[j].dict);
720 vkeys = dictSize(server.db[j].expires);
721 if (!(loops % 5) && used > 0) {
722 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
723 /* dictPrintStats(server.dict); */
724 }
725 }
726
727 /* We don't want to resize the hash tables while a bacground saving
728 * is in progress: the saving child is created using fork() that is
729 * implemented with a copy-on-write semantic in most modern systems, so
730 * if we resize the HT while there is the saving child at work actually
731 * a lot of memory movements in the parent will cause a lot of pages
732 * copied. */
733 if (!server.bgsaveinprogress) tryResizeHashTables();
734
735 /* Show information about connected clients */
736 if (!(loops % 5)) {
737 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
738 listLength(server.clients)-listLength(server.slaves),
739 listLength(server.slaves),
740 server.usedmemory,
741 dictSize(server.sharingpool));
742 }
743
744 /* Close connections of timedout clients */
745 if (server.maxidletime && !(loops % 10))
746 closeTimedoutClients();
747
748 /* Check if a background saving in progress terminated */
749 if (server.bgsaveinprogress) {
750 int statloc;
751 /* XXX: TODO handle the case of the saving child killed */
752 if (wait4(-1,&statloc,WNOHANG,NULL)) {
753 int exitcode = WEXITSTATUS(statloc);
754 if (exitcode == 0) {
755 redisLog(REDIS_NOTICE,
756 "Background saving terminated with success");
757 server.dirty = 0;
758 server.lastsave = time(NULL);
759 } else {
760 redisLog(REDIS_WARNING,
761 "Background saving error");
762 }
763 server.bgsaveinprogress = 0;
764 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
765 }
766 } else {
767 /* If there is not a background saving in progress check if
768 * we have to save now */
769 time_t now = time(NULL);
770 for (j = 0; j < server.saveparamslen; j++) {
771 struct saveparam *sp = server.saveparams+j;
772
773 if (server.dirty >= sp->changes &&
774 now-server.lastsave > sp->seconds) {
775 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
776 sp->changes, sp->seconds);
777 rdbSaveBackground(server.dbfilename);
778 break;
779 }
780 }
781 }
782
783 /* Try to expire a few timed out keys */
784 for (j = 0; j < server.dbnum; j++) {
785 redisDb *db = server.db+j;
786 int num = dictSize(db->expires);
787
788 if (num) {
789 time_t now = time(NULL);
790
791 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
792 num = REDIS_EXPIRELOOKUPS_PER_CRON;
793 while (num--) {
794 dictEntry *de;
795 time_t t;
796
797 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
798 t = (time_t) dictGetEntryVal(de);
799 if (now > t) {
800 deleteKey(db,dictGetEntryKey(de));
801 }
802 }
803 }
804 }
805
806 /* Check if we should connect to a MASTER */
807 if (server.replstate == REDIS_REPL_CONNECT) {
808 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
809 if (syncWithMaster() == REDIS_OK) {
810 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
811 }
812 }
813 return 1000;
814 }
815
816 static void createSharedObjects(void) {
817 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
818 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
819 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
820 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
821 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
822 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
823 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
824 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
825 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
826 /* no such key */
827 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
828 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
829 "-ERR Operation against a key holding the wrong kind of value\r\n"));
830 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
831 "-ERR no such key\r\n"));
832 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
833 "-ERR syntax error\r\n"));
834 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
835 "-ERR source and destination objects are the same\r\n"));
836 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
837 "-ERR index out of range\r\n"));
838 shared.space = createObject(REDIS_STRING,sdsnew(" "));
839 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
840 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
841 shared.select0 = createStringObject("select 0\r\n",10);
842 shared.select1 = createStringObject("select 1\r\n",10);
843 shared.select2 = createStringObject("select 2\r\n",10);
844 shared.select3 = createStringObject("select 3\r\n",10);
845 shared.select4 = createStringObject("select 4\r\n",10);
846 shared.select5 = createStringObject("select 5\r\n",10);
847 shared.select6 = createStringObject("select 6\r\n",10);
848 shared.select7 = createStringObject("select 7\r\n",10);
849 shared.select8 = createStringObject("select 8\r\n",10);
850 shared.select9 = createStringObject("select 9\r\n",10);
851 }
852
853 static void appendServerSaveParams(time_t seconds, int changes) {
854 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
855 if (server.saveparams == NULL) oom("appendServerSaveParams");
856 server.saveparams[server.saveparamslen].seconds = seconds;
857 server.saveparams[server.saveparamslen].changes = changes;
858 server.saveparamslen++;
859 }
860
861 static void ResetServerSaveParams() {
862 zfree(server.saveparams);
863 server.saveparams = NULL;
864 server.saveparamslen = 0;
865 }
866
867 static void initServerConfig() {
868 server.dbnum = REDIS_DEFAULT_DBNUM;
869 server.port = REDIS_SERVERPORT;
870 server.verbosity = REDIS_DEBUG;
871 server.maxidletime = REDIS_MAXIDLETIME;
872 server.saveparams = NULL;
873 server.logfile = NULL; /* NULL = log on standard output */
874 server.bindaddr = NULL;
875 server.glueoutputbuf = 1;
876 server.daemonize = 0;
877 server.pidfile = "/var/run/redis.pid";
878 server.dbfilename = "dump.rdb";
879 server.requirepass = NULL;
880 server.shareobjects = 0;
881 server.maxclients = 0;
882 server.maxmemory = 0;
883 ResetServerSaveParams();
884
885 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
886 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
887 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
888 /* Replication related */
889 server.isslave = 0;
890 server.masterhost = NULL;
891 server.masterport = 6379;
892 server.master = NULL;
893 server.replstate = REDIS_REPL_NONE;
894 }
895
896 static void initServer() {
897 int j;
898
899 signal(SIGHUP, SIG_IGN);
900 signal(SIGPIPE, SIG_IGN);
901 setupSigSegvAction();
902
903 server.clients = listCreate();
904 server.slaves = listCreate();
905 server.monitors = listCreate();
906 server.objfreelist = listCreate();
907 createSharedObjects();
908 server.el = aeCreateEventLoop();
909 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
910 server.sharingpool = dictCreate(&setDictType,NULL);
911 server.sharingpoolsize = 1024;
912 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
913 oom("server initialization"); /* Fatal OOM */
914 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
915 if (server.fd == -1) {
916 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
917 exit(1);
918 }
919 for (j = 0; j < server.dbnum; j++) {
920 server.db[j].dict = dictCreate(&hashDictType,NULL);
921 server.db[j].expires = dictCreate(&setDictType,NULL);
922 server.db[j].id = j;
923 }
924 server.cronloops = 0;
925 server.bgsaveinprogress = 0;
926 server.lastsave = time(NULL);
927 server.dirty = 0;
928 server.usedmemory = 0;
929 server.stat_numcommands = 0;
930 server.stat_numconnections = 0;
931 server.stat_starttime = time(NULL);
932 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
933 }
934
935 /* Empty the whole database */
936 static long long emptyDb() {
937 int j;
938 long long removed = 0;
939
940 for (j = 0; j < server.dbnum; j++) {
941 removed += dictSize(server.db[j].dict);
942 dictEmpty(server.db[j].dict);
943 dictEmpty(server.db[j].expires);
944 }
945 return removed;
946 }
947
/* Convert a "yes"/"no" string, compared ignoring case, into 1/0.
 * Anything else is reported as -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
953
954 /* I agree, this is a very rudimental way to load a configuration...
955 will improve later if the config gets more complex */
956 static void loadServerConfig(char *filename) {
957 FILE *fp = fopen(filename,"r");
958 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
959 int linenum = 0;
960 sds line = NULL;
961
962 if (!fp) {
963 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
964 exit(1);
965 }
966 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
967 sds *argv;
968 int argc, j;
969
970 linenum++;
971 line = sdsnew(buf);
972 line = sdstrim(line," \t\r\n");
973
974 /* Skip comments and blank lines*/
975 if (line[0] == '#' || line[0] == '\0') {
976 sdsfree(line);
977 continue;
978 }
979
980 /* Split into arguments */
981 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
982 sdstolower(argv[0]);
983
984 /* Execute config directives */
985 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
986 server.maxidletime = atoi(argv[1]);
987 if (server.maxidletime < 0) {
988 err = "Invalid timeout value"; goto loaderr;
989 }
990 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
991 server.port = atoi(argv[1]);
992 if (server.port < 1 || server.port > 65535) {
993 err = "Invalid port"; goto loaderr;
994 }
995 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
996 server.bindaddr = zstrdup(argv[1]);
997 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
998 int seconds = atoi(argv[1]);
999 int changes = atoi(argv[2]);
1000 if (seconds < 1 || changes < 0) {
1001 err = "Invalid save parameters"; goto loaderr;
1002 }
1003 appendServerSaveParams(seconds,changes);
1004 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1005 if (chdir(argv[1]) == -1) {
1006 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1007 argv[1], strerror(errno));
1008 exit(1);
1009 }
1010 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1011 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1012 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1013 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1014 else {
1015 err = "Invalid log level. Must be one of debug, notice, warning";
1016 goto loaderr;
1017 }
1018 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1019 FILE *fp;
1020
1021 server.logfile = zstrdup(argv[1]);
1022 if (!strcasecmp(server.logfile,"stdout")) {
1023 zfree(server.logfile);
1024 server.logfile = NULL;
1025 }
1026 if (server.logfile) {
1027 /* Test if we are able to open the file. The server will not
1028 * be able to abort just for this problem later... */
1029 fp = fopen(server.logfile,"a");
1030 if (fp == NULL) {
1031 err = sdscatprintf(sdsempty(),
1032 "Can't open the log file: %s", strerror(errno));
1033 goto loaderr;
1034 }
1035 fclose(fp);
1036 }
1037 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1038 server.dbnum = atoi(argv[1]);
1039 if (server.dbnum < 1) {
1040 err = "Invalid number of databases"; goto loaderr;
1041 }
1042 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1043 server.maxclients = atoi(argv[1]);
1044 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1045 server.maxmemory = atoi(argv[1]);
1046 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1047 server.masterhost = sdsnew(argv[1]);
1048 server.masterport = atoi(argv[2]);
1049 server.replstate = REDIS_REPL_CONNECT;
1050 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1051 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1052 err = "argument must be 'yes' or 'no'"; goto loaderr;
1053 }
1054 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1055 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1056 err = "argument must be 'yes' or 'no'"; goto loaderr;
1057 }
1058 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1059 server.sharingpoolsize = atoi(argv[1]);
1060 if (server.sharingpoolsize < 1) {
1061 err = "invalid object sharing pool size"; goto loaderr;
1062 }
1063 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1064 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1065 err = "argument must be 'yes' or 'no'"; goto loaderr;
1066 }
1067 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1068 server.requirepass = zstrdup(argv[1]);
1069 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1070 server.pidfile = zstrdup(argv[1]);
1071 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1072 server.dbfilename = zstrdup(argv[1]);
1073 } else {
1074 err = "Bad directive or wrong number of arguments"; goto loaderr;
1075 }
1076 for (j = 0; j < argc; j++)
1077 sdsfree(argv[j]);
1078 zfree(argv);
1079 sdsfree(line);
1080 }
1081 fclose(fp);
1082 return;
1083
1084 loaderr:
1085 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1086 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1087 fprintf(stderr, ">>> '%s'\n", line);
1088 fprintf(stderr, "%s\n", err);
1089 exit(1);
1090 }
1091
1092 static void freeClientArgv(redisClient *c) {
1093 int j;
1094
1095 for (j = 0; j < c->argc; j++)
1096 decrRefCount(c->argv[j]);
1097 c->argc = 0;
1098 }
1099
1100 static void freeClient(redisClient *c) {
1101 listNode *ln;
1102
1103 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1104 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1105 sdsfree(c->querybuf);
1106 listRelease(c->reply);
1107 freeClientArgv(c);
1108 close(c->fd);
1109 ln = listSearchKey(server.clients,c);
1110 assert(ln != NULL);
1111 listDelNode(server.clients,ln);
1112 if (c->flags & REDIS_SLAVE) {
1113 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1114 close(c->repldbfd);
1115 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1116 ln = listSearchKey(l,c);
1117 assert(ln != NULL);
1118 listDelNode(l,ln);
1119 }
1120 if (c->flags & REDIS_MASTER) {
1121 server.master = NULL;
1122 server.replstate = REDIS_REPL_CONNECT;
1123 }
1124 zfree(c->argv);
1125 zfree(c);
1126 }
1127
1128 static void glueReplyBuffersIfNeeded(redisClient *c) {
1129 int totlen = 0;
1130 listNode *ln;
1131 robj *o;
1132
1133 listRewind(c->reply);
1134 while((ln = listYield(c->reply))) {
1135 o = ln->value;
1136 totlen += sdslen(o->ptr);
1137 /* This optimization makes more sense if we don't have to copy
1138 * too much data */
1139 if (totlen > 1024) return;
1140 }
1141 if (totlen > 0) {
1142 char buf[1024];
1143 int copylen = 0;
1144
1145 listRewind(c->reply);
1146 while((ln = listYield(c->reply))) {
1147 o = ln->value;
1148 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1149 copylen += sdslen(o->ptr);
1150 listDelNode(c->reply,ln);
1151 }
1152 /* Now the output buffer is empty, add the new single element */
1153 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1154 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1155 }
1156 }
1157
1158 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1159 redisClient *c = privdata;
1160 int nwritten = 0, totwritten = 0, objlen;
1161 robj *o;
1162 REDIS_NOTUSED(el);
1163 REDIS_NOTUSED(mask);
1164
1165 if (server.glueoutputbuf && listLength(c->reply) > 1)
1166 glueReplyBuffersIfNeeded(c);
1167 while(listLength(c->reply)) {
1168 o = listNodeValue(listFirst(c->reply));
1169 objlen = sdslen(o->ptr);
1170
1171 if (objlen == 0) {
1172 listDelNode(c->reply,listFirst(c->reply));
1173 continue;
1174 }
1175
1176 if (c->flags & REDIS_MASTER) {
1177 nwritten = objlen - c->sentlen;
1178 } else {
1179 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1180 if (nwritten <= 0) break;
1181 }
1182 c->sentlen += nwritten;
1183 totwritten += nwritten;
1184 /* If we fully sent the object on head go to the next one */
1185 if (c->sentlen == objlen) {
1186 listDelNode(c->reply,listFirst(c->reply));
1187 c->sentlen = 0;
1188 }
1189 }
1190 if (nwritten == -1) {
1191 if (errno == EAGAIN) {
1192 nwritten = 0;
1193 } else {
1194 redisLog(REDIS_DEBUG,
1195 "Error writing to client: %s", strerror(errno));
1196 freeClient(c);
1197 return;
1198 }
1199 }
1200 if (totwritten > 0) c->lastinteraction = time(NULL);
1201 if (listLength(c->reply) == 0) {
1202 c->sentlen = 0;
1203 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1204 }
1205 }
1206
1207 static struct redisCommand *lookupCommand(char *name) {
1208 int j = 0;
1209 while(cmdTable[j].name != NULL) {
1210 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1211 j++;
1212 }
1213 return NULL;
1214 }
1215
1216 /* resetClient prepare the client to process the next command */
1217 static void resetClient(redisClient *c) {
1218 freeClientArgv(c);
1219 c->bulklen = -1;
1220 }
1221
1222 /* If this function gets called we already read a whole
1223 * command, argments are in the client argv/argc fields.
1224 * processCommand() execute the command or prepare the
1225 * server for a bulk read from the client.
1226 *
1227 * If 1 is returned the client is still alive and valid and
1228 * and other operations can be performed by the caller. Otherwise
1229 * if 0 is returned the client was destroied (i.e. after QUIT). */
1230 static int processCommand(redisClient *c) {
1231 struct redisCommand *cmd;
1232 long long dirty;
1233
1234 /* Free some memory if needed (maxmemory setting) */
1235 if (server.maxmemory) freeMemoryIfNeeded();
1236
1237 /* The QUIT command is handled as a special case. Normal command
1238 * procs are unable to close the client connection safely */
1239 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1240 freeClient(c);
1241 return 0;
1242 }
1243 cmd = lookupCommand(c->argv[0]->ptr);
1244 if (!cmd) {
1245 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1246 resetClient(c);
1247 return 1;
1248 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1249 (c->argc < -cmd->arity)) {
1250 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1251 resetClient(c);
1252 return 1;
1253 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1254 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1255 resetClient(c);
1256 return 1;
1257 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1258 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1259
1260 decrRefCount(c->argv[c->argc-1]);
1261 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1262 c->argc--;
1263 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1264 resetClient(c);
1265 return 1;
1266 }
1267 c->argc--;
1268 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1269 /* It is possible that the bulk read is already in the
1270 * buffer. Check this condition and handle it accordingly */
1271 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1272 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1273 c->argc++;
1274 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1275 } else {
1276 return 1;
1277 }
1278 }
1279 /* Let's try to share objects on the command arguments vector */
1280 if (server.shareobjects) {
1281 int j;
1282 for(j = 1; j < c->argc; j++)
1283 c->argv[j] = tryObjectSharing(c->argv[j]);
1284 }
1285 /* Check if the user is authenticated */
1286 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1287 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1288 resetClient(c);
1289 return 1;
1290 }
1291
1292 /* Exec the command */
1293 dirty = server.dirty;
1294 cmd->proc(c);
1295 if (server.dirty-dirty != 0 && listLength(server.slaves))
1296 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1297 if (listLength(server.monitors))
1298 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1299 server.stat_numcommands++;
1300
1301 /* Prepare the client for the next command */
1302 if (c->flags & REDIS_CLOSE) {
1303 freeClient(c);
1304 return 0;
1305 }
1306 resetClient(c);
1307 return 1;
1308 }
1309
1310 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1311 listNode *ln;
1312 int outc = 0, j;
1313 robj **outv;
1314 /* (args*2)+1 is enough room for args, spaces, newlines */
1315 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1316
1317 if (argc <= REDIS_STATIC_ARGS) {
1318 outv = static_outv;
1319 } else {
1320 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1321 if (!outv) oom("replicationFeedSlaves");
1322 }
1323
1324 for (j = 0; j < argc; j++) {
1325 if (j != 0) outv[outc++] = shared.space;
1326 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1327 robj *lenobj;
1328
1329 lenobj = createObject(REDIS_STRING,
1330 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1331 lenobj->refcount = 0;
1332 outv[outc++] = lenobj;
1333 }
1334 outv[outc++] = argv[j];
1335 }
1336 outv[outc++] = shared.crlf;
1337
1338 /* Increment all the refcounts at start and decrement at end in order to
1339 * be sure to free objects if there is no slave in a replication state
1340 * able to be feed with commands */
1341 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1342 listRewind(slaves);
1343 while((ln = listYield(slaves))) {
1344 redisClient *slave = ln->value;
1345
1346 /* Don't feed slaves that are still waiting for BGSAVE to start */
1347 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1348
1349 /* Feed all the other slaves, MONITORs and so on */
1350 if (slave->slaveseldb != dictid) {
1351 robj *selectcmd;
1352
1353 switch(dictid) {
1354 case 0: selectcmd = shared.select0; break;
1355 case 1: selectcmd = shared.select1; break;
1356 case 2: selectcmd = shared.select2; break;
1357 case 3: selectcmd = shared.select3; break;
1358 case 4: selectcmd = shared.select4; break;
1359 case 5: selectcmd = shared.select5; break;
1360 case 6: selectcmd = shared.select6; break;
1361 case 7: selectcmd = shared.select7; break;
1362 case 8: selectcmd = shared.select8; break;
1363 case 9: selectcmd = shared.select9; break;
1364 default:
1365 selectcmd = createObject(REDIS_STRING,
1366 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1367 selectcmd->refcount = 0;
1368 break;
1369 }
1370 addReply(slave,selectcmd);
1371 slave->slaveseldb = dictid;
1372 }
1373 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1374 }
1375 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1376 if (outv != static_outv) zfree(outv);
1377 }
1378
1379 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1380 redisClient *c = (redisClient*) privdata;
1381 char buf[REDIS_IOBUF_LEN];
1382 int nread;
1383 REDIS_NOTUSED(el);
1384 REDIS_NOTUSED(mask);
1385
1386 nread = read(fd, buf, REDIS_IOBUF_LEN);
1387 if (nread == -1) {
1388 if (errno == EAGAIN) {
1389 nread = 0;
1390 } else {
1391 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1392 freeClient(c);
1393 return;
1394 }
1395 } else if (nread == 0) {
1396 redisLog(REDIS_DEBUG, "Client closed connection");
1397 freeClient(c);
1398 return;
1399 }
1400 if (nread) {
1401 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1402 c->lastinteraction = time(NULL);
1403 } else {
1404 return;
1405 }
1406
1407 again:
1408 if (c->bulklen == -1) {
1409 /* Read the first line of the query */
1410 char *p = strchr(c->querybuf,'\n');
1411 size_t querylen;
1412 if (p) {
1413 sds query, *argv;
1414 int argc, j;
1415
1416 query = c->querybuf;
1417 c->querybuf = sdsempty();
1418 querylen = 1+(p-(query));
1419 if (sdslen(query) > querylen) {
1420 /* leave data after the first line of the query in the buffer */
1421 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1422 }
1423 *p = '\0'; /* remove "\n" */
1424 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1425 sdsupdatelen(query);
1426
1427 /* Now we can split the query in arguments */
1428 if (sdslen(query) == 0) {
1429 /* Ignore empty query */
1430 sdsfree(query);
1431 return;
1432 }
1433 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1434 if (argv == NULL) oom("sdssplitlen");
1435 sdsfree(query);
1436
1437 if (c->argv) zfree(c->argv);
1438 c->argv = zmalloc(sizeof(robj*)*argc);
1439 if (c->argv == NULL) oom("allocating arguments list for client");
1440
1441 for (j = 0; j < argc; j++) {
1442 if (sdslen(argv[j])) {
1443 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1444 c->argc++;
1445 } else {
1446 sdsfree(argv[j]);
1447 }
1448 }
1449 zfree(argv);
1450 /* Execute the command. If the client is still valid
1451 * after processCommand() return and there is something
1452 * on the query buffer try to process the next command. */
1453 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1454 return;
1455 } else if (sdslen(c->querybuf) >= 1024*32) {
1456 redisLog(REDIS_DEBUG, "Client protocol error");
1457 freeClient(c);
1458 return;
1459 }
1460 } else {
1461 /* Bulk read handling. Note that if we are at this point
1462 the client already sent a command terminated with a newline,
1463 we are reading the bulk data that is actually the last
1464 argument of the command. */
1465 int qbl = sdslen(c->querybuf);
1466
1467 if (c->bulklen <= qbl) {
1468 /* Copy everything but the final CRLF as final argument */
1469 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1470 c->argc++;
1471 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1472 processCommand(c);
1473 return;
1474 }
1475 }
1476 }
1477
1478 static int selectDb(redisClient *c, int id) {
1479 if (id < 0 || id >= server.dbnum)
1480 return REDIS_ERR;
1481 c->db = &server.db[id];
1482 return REDIS_OK;
1483 }
1484
1485 static void *dupClientReplyValue(void *o) {
1486 incrRefCount((robj*)o);
1487 return 0;
1488 }
1489
1490 static redisClient *createClient(int fd) {
1491 redisClient *c = zmalloc(sizeof(*c));
1492
1493 anetNonBlock(NULL,fd);
1494 anetTcpNoDelay(NULL,fd);
1495 if (!c) return NULL;
1496 selectDb(c,0);
1497 c->fd = fd;
1498 c->querybuf = sdsempty();
1499 c->argc = 0;
1500 c->argv = NULL;
1501 c->bulklen = -1;
1502 c->sentlen = 0;
1503 c->flags = 0;
1504 c->lastinteraction = time(NULL);
1505 c->authenticated = 0;
1506 c->replstate = REDIS_REPL_NONE;
1507 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1508 listSetFreeMethod(c->reply,decrRefCount);
1509 listSetDupMethod(c->reply,dupClientReplyValue);
1510 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1511 readQueryFromClient, c, NULL) == AE_ERR) {
1512 freeClient(c);
1513 return NULL;
1514 }
1515 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1516 return c;
1517 }
1518
1519 static void addReply(redisClient *c, robj *obj) {
1520 if (listLength(c->reply) == 0 &&
1521 (c->replstate == REDIS_REPL_NONE ||
1522 c->replstate == REDIS_REPL_ONLINE) &&
1523 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1524 sendReplyToClient, c, NULL) == AE_ERR) return;
1525 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1526 incrRefCount(obj);
1527 }
1528
1529 static void addReplySds(redisClient *c, sds s) {
1530 robj *o = createObject(REDIS_STRING,s);
1531 addReply(c,o);
1532 decrRefCount(o);
1533 }
1534
1535 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1536 int cport, cfd;
1537 char cip[128];
1538 redisClient *c;
1539 REDIS_NOTUSED(el);
1540 REDIS_NOTUSED(mask);
1541 REDIS_NOTUSED(privdata);
1542
1543 cfd = anetAccept(server.neterr, fd, cip, &cport);
1544 if (cfd == AE_ERR) {
1545 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1546 return;
1547 }
1548 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1549 if ((c = createClient(cfd)) == NULL) {
1550 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1551 close(cfd); /* May be already closed, just ingore errors */
1552 return;
1553 }
1554 /* If maxclient directive is set and this is one client more... close the
1555 * connection. Note that we create the client instead to check before
1556 * for this condition, since now the socket is already set in nonblocking
1557 * mode and we can send an error for free using the Kernel I/O */
1558 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1559 char *err = "-ERR max number of clients reached\r\n";
1560
1561 /* That's a best effort error message, don't check write errors */
1562 (void) write(c->fd,err,strlen(err));
1563 freeClient(c);
1564 return;
1565 }
1566 server.stat_numconnections++;
1567 }
1568
1569 /* ======================= Redis objects implementation ===================== */
1570
1571 static robj *createObject(int type, void *ptr) {
1572 robj *o;
1573
1574 if (listLength(server.objfreelist)) {
1575 listNode *head = listFirst(server.objfreelist);
1576 o = listNodeValue(head);
1577 listDelNode(server.objfreelist,head);
1578 } else {
1579 o = zmalloc(sizeof(*o));
1580 }
1581 if (!o) oom("createObject");
1582 o->type = type;
1583 o->ptr = ptr;
1584 o->refcount = 1;
1585 return o;
1586 }
1587
1588 static robj *createStringObject(char *ptr, size_t len) {
1589 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1590 }
1591
1592 static robj *createListObject(void) {
1593 list *l = listCreate();
1594
1595 if (!l) oom("listCreate");
1596 listSetFreeMethod(l,decrRefCount);
1597 return createObject(REDIS_LIST,l);
1598 }
1599
1600 static robj *createSetObject(void) {
1601 dict *d = dictCreate(&setDictType,NULL);
1602 if (!d) oom("dictCreate");
1603 return createObject(REDIS_SET,d);
1604 }
1605
1606 static void freeStringObject(robj *o) {
1607 sdsfree(o->ptr);
1608 }
1609
1610 static void freeListObject(robj *o) {
1611 listRelease((list*) o->ptr);
1612 }
1613
1614 static void freeSetObject(robj *o) {
1615 dictRelease((dict*) o->ptr);
1616 }
1617
1618 static void freeHashObject(robj *o) {
1619 dictRelease((dict*) o->ptr);
1620 }
1621
1622 static void incrRefCount(robj *o) {
1623 o->refcount++;
1624 #ifdef DEBUG_REFCOUNT
1625 if (o->type == REDIS_STRING)
1626 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1627 #endif
1628 }
1629
1630 static void decrRefCount(void *obj) {
1631 robj *o = obj;
1632
1633 #ifdef DEBUG_REFCOUNT
1634 if (o->type == REDIS_STRING)
1635 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1636 #endif
1637 if (--(o->refcount) == 0) {
1638 switch(o->type) {
1639 case REDIS_STRING: freeStringObject(o); break;
1640 case REDIS_LIST: freeListObject(o); break;
1641 case REDIS_SET: freeSetObject(o); break;
1642 case REDIS_HASH: freeHashObject(o); break;
1643 default: assert(0 != 0); break;
1644 }
1645 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1646 !listAddNodeHead(server.objfreelist,o))
1647 zfree(o);
1648 }
1649 }
1650
1651 /* Try to share an object against the shared objects pool */
1652 static robj *tryObjectSharing(robj *o) {
1653 struct dictEntry *de;
1654 unsigned long c;
1655
1656 if (o == NULL || server.shareobjects == 0) return o;
1657
1658 assert(o->type == REDIS_STRING);
1659 de = dictFind(server.sharingpool,o);
1660 if (de) {
1661 robj *shared = dictGetEntryKey(de);
1662
1663 c = ((unsigned long) dictGetEntryVal(de))+1;
1664 dictGetEntryVal(de) = (void*) c;
1665 incrRefCount(shared);
1666 decrRefCount(o);
1667 return shared;
1668 } else {
1669 /* Here we are using a stream algorihtm: Every time an object is
1670 * shared we increment its count, everytime there is a miss we
1671 * recrement the counter of a random object. If this object reaches
1672 * zero we remove the object and put the current object instead. */
1673 if (dictSize(server.sharingpool) >=
1674 server.sharingpoolsize) {
1675 de = dictGetRandomKey(server.sharingpool);
1676 assert(de != NULL);
1677 c = ((unsigned long) dictGetEntryVal(de))-1;
1678 dictGetEntryVal(de) = (void*) c;
1679 if (c == 0) {
1680 dictDelete(server.sharingpool,de->key);
1681 }
1682 } else {
1683 c = 0; /* If the pool is empty we want to add this object */
1684 }
1685 if (c == 0) {
1686 int retval;
1687
1688 retval = dictAdd(server.sharingpool,o,(void*)1);
1689 assert(retval == DICT_OK);
1690 incrRefCount(o);
1691 }
1692 return o;
1693 }
1694 }
1695
1696 static robj *lookupKey(redisDb *db, robj *key) {
1697 dictEntry *de = dictFind(db->dict,key);
1698 return de ? dictGetEntryVal(de) : NULL;
1699 }
1700
1701 static robj *lookupKeyRead(redisDb *db, robj *key) {
1702 expireIfNeeded(db,key);
1703 return lookupKey(db,key);
1704 }
1705
1706 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1707 deleteIfVolatile(db,key);
1708 return lookupKey(db,key);
1709 }
1710
1711 static int deleteKey(redisDb *db, robj *key) {
1712 int retval;
1713
1714 /* We need to protect key from destruction: after the first dictDelete()
1715 * it may happen that 'key' is no longer valid if we don't increment
1716 * it's count. This may happen when we get the object reference directly
1717 * from the hash table with dictRandomKey() or dict iterators */
1718 incrRefCount(key);
1719 if (dictSize(db->expires)) dictDelete(db->expires,key);
1720 retval = dictDelete(db->dict,key);
1721 decrRefCount(key);
1722
1723 return retval == DICT_OK;
1724 }
1725
1726 /*============================ DB saving/loading ============================ */
1727
/* Write the one byte type/opcode 'type' to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
1732
/* Write the time 't' to 'fp' as a 4 byte signed integer, in the byte
 * order of the host. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;
    size_t written = fwrite(&t32,sizeof(t32),1,fp);

    return (written == 0) ? -1 : 0;
}
1738
1739 /* check rdbLoadLen() comments for more info */
1740 static int rdbSaveLen(FILE *fp, uint32_t len) {
1741 unsigned char buf[2];
1742
1743 if (len < (1<<6)) {
1744 /* Save a 6 bit len */
1745 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1746 if (fwrite(buf,1,1,fp) == 0) return -1;
1747 } else if (len < (1<<14)) {
1748 /* Save a 14 bit len */
1749 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1750 buf[1] = len&0xFF;
1751 if (fwrite(buf,2,1,fp) == 0) return -1;
1752 } else {
1753 /* Save a 32 bit len */
1754 buf[0] = (REDIS_RDB_32BITLEN<<6);
1755 if (fwrite(buf,1,1,fp) == 0) return -1;
1756 len = htonl(len);
1757 if (fwrite(&len,4,1,fp) == 0) return -1;
1758 }
1759 return 0;
1760 }
1761
1762 /* String objects in the form "2391" "-100" without any space and with a
1763 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1764 * encoded as integers to save space */
1765 int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1766 long long value;
1767 char *endptr, buf[32];
1768
1769 /* Check if it's possible to encode this value as a number */
1770 value = strtoll(s, &endptr, 10);
1771 if (endptr[0] != '\0') return 0;
1772 snprintf(buf,32,"%lld",value);
1773
1774 /* If the number converted back into a string is not identical
1775 * then it's not possible to encode the string as integer */
1776 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1777
1778 /* Finally check if it fits in our ranges */
1779 if (value >= -(1<<7) && value <= (1<<7)-1) {
1780 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1781 enc[1] = value&0xFF;
1782 return 2;
1783 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1784 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1785 enc[1] = value&0xFF;
1786 enc[2] = (value>>8)&0xFF;
1787 return 3;
1788 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1789 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1790 enc[1] = value&0xFF;
1791 enc[2] = (value>>8)&0xFF;
1792 enc[3] = (value>>16)&0xFF;
1793 enc[4] = (value>>24)&0xFF;
1794 return 5;
1795 } else {
1796 return 0;
1797 }
1798 }
1799
1800 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1801 unsigned int comprlen, outlen;
1802 unsigned char byte;
1803 void *out;
1804
1805 /* We require at least four bytes compression for this to be worth it */
1806 outlen = sdslen(obj->ptr)-4;
1807 if (outlen <= 0) return 0;
1808 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1809 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1810 if (comprlen == 0) {
1811 zfree(out);
1812 return 0;
1813 }
1814 /* Data compressed! Let's save it on disk */
1815 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1816 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1817 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1818 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1819 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1820 zfree(out);
1821 return comprlen;
1822
1823 writeerr:
1824 zfree(out);
1825 return -1;
1826 }
1827
1828 /* Save a string objet as [len][data] on disk. If the object is a string
1829 * representation of an integer value we try to safe it in a special form */
1830 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1831 size_t len = sdslen(obj->ptr);
1832 int enclen;
1833
1834 /* Try integer encoding */
1835 if (len <= 11) {
1836 unsigned char buf[5];
1837 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1838 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1839 return 0;
1840 }
1841 }
1842
1843 /* Try LZF compression - under 20 bytes it's unable to compress even
1844 * aaaaaaaaaaaaaaaaaa so skip it */
1845 if (1 && len > 20) {
1846 int retval;
1847
1848 retval = rdbSaveLzfStringObject(fp,obj);
1849 if (retval == -1) return -1;
1850 if (retval > 0) return 0;
1851 /* retval == 0 means data can't be compressed, save the old way */
1852 }
1853
1854 /* Store verbatim */
1855 if (rdbSaveLen(fp,len) == -1) return -1;
1856 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1857 return 0;
1858 }
1859
1860 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1861 static int rdbSave(char *filename) {
1862 dictIterator *di = NULL;
1863 dictEntry *de;
1864 FILE *fp;
1865 char tmpfile[256];
1866 int j;
1867 time_t now = time(NULL);
1868
1869 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1870 fp = fopen(tmpfile,"w");
1871 if (!fp) {
1872 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1873 return REDIS_ERR;
1874 }
1875 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1876 for (j = 0; j < server.dbnum; j++) {
1877 redisDb *db = server.db+j;
1878 dict *d = db->dict;
1879 if (dictSize(d) == 0) continue;
1880 di = dictGetIterator(d);
1881 if (!di) {
1882 fclose(fp);
1883 return REDIS_ERR;
1884 }
1885
1886 /* Write the SELECT DB opcode */
1887 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1888 if (rdbSaveLen(fp,j) == -1) goto werr;
1889
1890 /* Iterate this DB writing every entry */
1891 while((de = dictNext(di)) != NULL) {
1892 robj *key = dictGetEntryKey(de);
1893 robj *o = dictGetEntryVal(de);
1894 time_t expiretime = getExpire(db,key);
1895
1896 /* Save the expire time */
1897 if (expiretime != -1) {
1898 /* If this key is already expired skip it */
1899 if (expiretime < now) continue;
1900 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1901 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1902 }
1903 /* Save the key and associated value */
1904 if (rdbSaveType(fp,o->type) == -1) goto werr;
1905 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1906 if (o->type == REDIS_STRING) {
1907 /* Save a string value */
1908 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1909 } else if (o->type == REDIS_LIST) {
1910 /* Save a list value */
1911 list *list = o->ptr;
1912 listNode *ln;
1913
1914 listRewind(list);
1915 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1916 while((ln = listYield(list))) {
1917 robj *eleobj = listNodeValue(ln);
1918
1919 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1920 }
1921 } else if (o->type == REDIS_SET) {
1922 /* Save a set value */
1923 dict *set = o->ptr;
1924 dictIterator *di = dictGetIterator(set);
1925 dictEntry *de;
1926
1927 if (!set) oom("dictGetIteraotr");
1928 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1929 while((de = dictNext(di)) != NULL) {
1930 robj *eleobj = dictGetEntryKey(de);
1931
1932 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1933 }
1934 dictReleaseIterator(di);
1935 } else {
1936 assert(0 != 0);
1937 }
1938 }
1939 dictReleaseIterator(di);
1940 }
1941 /* EOF opcode */
1942 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1943
1944 /* Make sure data will not remain on the OS's output buffers */
1945 fflush(fp);
1946 fsync(fileno(fp));
1947 fclose(fp);
1948
1949 /* Use RENAME to make sure the DB file is changed atomically only
1950 * if the generate DB file is ok. */
1951 if (rename(tmpfile,filename) == -1) {
1952 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1953 unlink(tmpfile);
1954 return REDIS_ERR;
1955 }
1956 redisLog(REDIS_NOTICE,"DB saved on disk");
1957 server.dirty = 0;
1958 server.lastsave = time(NULL);
1959 return REDIS_OK;
1960
1961 werr:
1962 fclose(fp);
1963 unlink(tmpfile);
1964 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1965 if (di) dictReleaseIterator(di);
1966 return REDIS_ERR;
1967 }
1968
1969 static int rdbSaveBackground(char *filename) {
1970 pid_t childpid;
1971
1972 if (server.bgsaveinprogress) return REDIS_ERR;
1973 if ((childpid = fork()) == 0) {
1974 /* Child */
1975 close(server.fd);
1976 if (rdbSave(filename) == REDIS_OK) {
1977 exit(0);
1978 } else {
1979 exit(1);
1980 }
1981 } else {
1982 /* Parent */
1983 if (childpid == -1) {
1984 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
1985 strerror(errno));
1986 return REDIS_ERR;
1987 }
1988 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1989 server.bgsaveinprogress = 1;
1990 return REDIS_OK;
1991 }
1992 return REDIS_OK; /* unreached */
1993 }
1994
/* Read a single byte from 'fp' and return it as a non negative int.
 * Returns -1 if the byte can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 1) ? (int)byte : -1;
}
2000
/* Read a 4 byte time value from 'fp', exactly as stored (no byte order
 * conversion is performed), and return it as a time_t.
 * Returns -1 if the four bytes can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t) stamp;
}
2006
2007 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2008 * of this file for a description of how this are stored on disk.
2009 *
2010 * isencoded is set to 1 if the readed length is not actually a length but
2011 * an "encoding type", check the above comments for more info */
2012 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2013 unsigned char buf[2];
2014 uint32_t len;
2015
2016 if (isencoded) *isencoded = 0;
2017 if (rdbver == 0) {
2018 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2019 return ntohl(len);
2020 } else {
2021 int type;
2022
2023 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2024 type = (buf[0]&0xC0)>>6;
2025 if (type == REDIS_RDB_6BITLEN) {
2026 /* Read a 6 bit len */
2027 return buf[0]&0x3F;
2028 } else if (type == REDIS_RDB_ENCVAL) {
2029 /* Read a 6 bit len encoding type */
2030 if (isencoded) *isencoded = 1;
2031 return buf[0]&0x3F;
2032 } else if (type == REDIS_RDB_14BITLEN) {
2033 /* Read a 14 bit len */
2034 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2035 return ((buf[0]&0x3F)<<8)|buf[1];
2036 } else {
2037 /* Read a 32 bit len */
2038 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2039 return ntohl(len);
2040 }
2041 }
2042 }
2043
2044 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2045 unsigned char enc[4];
2046 long long val;
2047
2048 if (enctype == REDIS_RDB_ENC_INT8) {
2049 if (fread(enc,1,1,fp) == 0) return NULL;
2050 val = (signed char)enc[0];
2051 } else if (enctype == REDIS_RDB_ENC_INT16) {
2052 uint16_t v;
2053 if (fread(enc,2,1,fp) == 0) return NULL;
2054 v = enc[0]|(enc[1]<<8);
2055 val = (int16_t)v;
2056 } else if (enctype == REDIS_RDB_ENC_INT32) {
2057 uint32_t v;
2058 if (fread(enc,4,1,fp) == 0) return NULL;
2059 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2060 val = (int32_t)v;
2061 } else {
2062 val = 0; /* anti-warning */
2063 assert(0!=0);
2064 }
2065 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2066 }
2067
2068 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2069 unsigned int len, clen;
2070 unsigned char *c = NULL;
2071 sds val = NULL;
2072
2073 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2074 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2075 if ((c = zmalloc(clen)) == NULL) goto err;
2076 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2077 if (fread(c,clen,1,fp) == 0) goto err;
2078 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2079 zfree(c);
2080 return createObject(REDIS_STRING,val);
2081 err:
2082 zfree(c);
2083 sdsfree(val);
2084 return NULL;
2085 }
2086
2087 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2088 int isencoded;
2089 uint32_t len;
2090 sds val;
2091
2092 len = rdbLoadLen(fp,rdbver,&isencoded);
2093 if (isencoded) {
2094 switch(len) {
2095 case REDIS_RDB_ENC_INT8:
2096 case REDIS_RDB_ENC_INT16:
2097 case REDIS_RDB_ENC_INT32:
2098 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2099 case REDIS_RDB_ENC_LZF:
2100 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2101 default:
2102 assert(0!=0);
2103 }
2104 }
2105
2106 if (len == REDIS_RDB_LENERR) return NULL;
2107 val = sdsnewlen(NULL,len);
2108 if (len && fread(val,len,1,fp) == 0) {
2109 sdsfree(val);
2110 return NULL;
2111 }
2112 return tryObjectSharing(createObject(REDIS_STRING,val));
2113 }
2114
2115 static int rdbLoad(char *filename) {
2116 FILE *fp;
2117 robj *keyobj = NULL;
2118 uint32_t dbid;
2119 int type, retval, rdbver;
2120 dict *d = server.db[0].dict;
2121 redisDb *db = server.db+0;
2122 char buf[1024];
2123 time_t expiretime = -1, now = time(NULL);
2124
2125 fp = fopen(filename,"r");
2126 if (!fp) return REDIS_ERR;
2127 if (fread(buf,9,1,fp) == 0) goto eoferr;
2128 buf[9] = '\0';
2129 if (memcmp(buf,"REDIS",5) != 0) {
2130 fclose(fp);
2131 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2132 return REDIS_ERR;
2133 }
2134 rdbver = atoi(buf+5);
2135 if (rdbver > 1) {
2136 fclose(fp);
2137 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2138 return REDIS_ERR;
2139 }
2140 while(1) {
2141 robj *o;
2142
2143 /* Read type. */
2144 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2145 if (type == REDIS_EXPIRETIME) {
2146 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2147 /* We read the time so we need to read the object type again */
2148 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2149 }
2150 if (type == REDIS_EOF) break;
2151 /* Handle SELECT DB opcode as a special case */
2152 if (type == REDIS_SELECTDB) {
2153 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2154 goto eoferr;
2155 if (dbid >= (unsigned)server.dbnum) {
2156 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2157 exit(1);
2158 }
2159 db = server.db+dbid;
2160 d = db->dict;
2161 continue;
2162 }
2163 /* Read key */
2164 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2165
2166 if (type == REDIS_STRING) {
2167 /* Read string value */
2168 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2169 } else if (type == REDIS_LIST || type == REDIS_SET) {
2170 /* Read list/set value */
2171 uint32_t listlen;
2172
2173 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2174 goto eoferr;
2175 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2176 /* Load every single element of the list/set */
2177 while(listlen--) {
2178 robj *ele;
2179
2180 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2181 if (type == REDIS_LIST) {
2182 if (!listAddNodeTail((list*)o->ptr,ele))
2183 oom("listAddNodeTail");
2184 } else {
2185 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2186 oom("dictAdd");
2187 }
2188 }
2189 } else {
2190 assert(0 != 0);
2191 }
2192 /* Add the new object in the hash table */
2193 retval = dictAdd(d,keyobj,o);
2194 if (retval == DICT_ERR) {
2195 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2196 exit(1);
2197 }
2198 /* Set the expire time if needed */
2199 if (expiretime != -1) {
2200 setExpire(db,keyobj,expiretime);
2201 /* Delete this key if already expired */
2202 if (expiretime < now) deleteKey(db,keyobj);
2203 expiretime = -1;
2204 }
2205 keyobj = o = NULL;
2206 }
2207 fclose(fp);
2208 return REDIS_OK;
2209
2210 eoferr: /* unexpected end of file is handled here with a fatal exit */
2211 if (keyobj) decrRefCount(keyobj);
2212 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2213 exit(1);
2214 return REDIS_ERR; /* Just to avoid warning */
2215 }
2216
2217 /*================================== Commands =============================== */
2218
2219 static void authCommand(redisClient *c) {
2220 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2221 c->authenticated = 1;
2222 addReply(c,shared.ok);
2223 } else {
2224 c->authenticated = 0;
2225 addReply(c,shared.err);
2226 }
2227 }
2228
2229 static void pingCommand(redisClient *c) {
2230 addReply(c,shared.pong);
2231 }
2232
2233 static void echoCommand(redisClient *c) {
2234 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2235 (int)sdslen(c->argv[1]->ptr)));
2236 addReply(c,c->argv[1]);
2237 addReply(c,shared.crlf);
2238 }
2239
2240 /*=================================== Strings =============================== */
2241
2242 static void setGenericCommand(redisClient *c, int nx) {
2243 int retval;
2244
2245 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2246 if (retval == DICT_ERR) {
2247 if (!nx) {
2248 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2249 incrRefCount(c->argv[2]);
2250 } else {
2251 addReply(c,shared.czero);
2252 return;
2253 }
2254 } else {
2255 incrRefCount(c->argv[1]);
2256 incrRefCount(c->argv[2]);
2257 }
2258 server.dirty++;
2259 removeExpire(c->db,c->argv[1]);
2260 addReply(c, nx ? shared.cone : shared.ok);
2261 }
2262
2263 static void setCommand(redisClient *c) {
2264 setGenericCommand(c,0);
2265 }
2266
2267 static void setnxCommand(redisClient *c) {
2268 setGenericCommand(c,1);
2269 }
2270
2271 static void getCommand(redisClient *c) {
2272 robj *o = lookupKeyRead(c->db,c->argv[1]);
2273
2274 if (o == NULL) {
2275 addReply(c,shared.nullbulk);
2276 } else {
2277 if (o->type != REDIS_STRING) {
2278 addReply(c,shared.wrongtypeerr);
2279 } else {
2280 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2281 addReply(c,o);
2282 addReply(c,shared.crlf);
2283 }
2284 }
2285 }
2286
2287 static void getSetCommand(redisClient *c) {
2288 getCommand(c);
2289 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2290 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2291 } else {
2292 incrRefCount(c->argv[1]);
2293 }
2294 incrRefCount(c->argv[2]);
2295 server.dirty++;
2296 removeExpire(c->db,c->argv[1]);
2297 }
2298
2299 static void mgetCommand(redisClient *c) {
2300 int j;
2301
2302 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2303 for (j = 1; j < c->argc; j++) {
2304 robj *o = lookupKeyRead(c->db,c->argv[j]);
2305 if (o == NULL) {
2306 addReply(c,shared.nullbulk);
2307 } else {
2308 if (o->type != REDIS_STRING) {
2309 addReply(c,shared.nullbulk);
2310 } else {
2311 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2312 addReply(c,o);
2313 addReply(c,shared.crlf);
2314 }
2315 }
2316 }
2317 }
2318
2319 static void incrDecrCommand(redisClient *c, long long incr) {
2320 long long value;
2321 int retval;
2322 robj *o;
2323
2324 o = lookupKeyWrite(c->db,c->argv[1]);
2325 if (o == NULL) {
2326 value = 0;
2327 } else {
2328 if (o->type != REDIS_STRING) {
2329 value = 0;
2330 } else {
2331 char *eptr;
2332
2333 value = strtoll(o->ptr, &eptr, 10);
2334 }
2335 }
2336
2337 value += incr;
2338 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2339 retval = dictAdd(c->db->dict,c->argv[1],o);
2340 if (retval == DICT_ERR) {
2341 dictReplace(c->db->dict,c->argv[1],o);
2342 removeExpire(c->db,c->argv[1]);
2343 } else {
2344 incrRefCount(c->argv[1]);
2345 }
2346 server.dirty++;
2347 addReply(c,shared.colon);
2348 addReply(c,o);
2349 addReply(c,shared.crlf);
2350 }
2351
2352 static void incrCommand(redisClient *c) {
2353 incrDecrCommand(c,1);
2354 }
2355
2356 static void decrCommand(redisClient *c) {
2357 incrDecrCommand(c,-1);
2358 }
2359
2360 static void incrbyCommand(redisClient *c) {
2361 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2362 incrDecrCommand(c,incr);
2363 }
2364
2365 static void decrbyCommand(redisClient *c) {
2366 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2367 incrDecrCommand(c,-incr);
2368 }
2369
2370 /* ========================= Type agnostic commands ========================= */
2371
2372 static void delCommand(redisClient *c) {
2373 int deleted = 0, j;
2374
2375 for (j = 1; j < c->argc; j++) {
2376 if (deleteKey(c->db,c->argv[j])) {
2377 server.dirty++;
2378 deleted++;
2379 }
2380 }
2381 switch(deleted) {
2382 case 0:
2383 addReply(c,shared.czero);
2384 break;
2385 case 1:
2386 addReply(c,shared.cone);
2387 break;
2388 default:
2389 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2390 break;
2391 }
2392 }
2393
2394 static void existsCommand(redisClient *c) {
2395 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2396 }
2397
2398 static void selectCommand(redisClient *c) {
2399 int id = atoi(c->argv[1]->ptr);
2400
2401 if (selectDb(c,id) == REDIS_ERR) {
2402 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2403 } else {
2404 addReply(c,shared.ok);
2405 }
2406 }
2407
2408 static void randomkeyCommand(redisClient *c) {
2409 dictEntry *de;
2410
2411 while(1) {
2412 de = dictGetRandomKey(c->db->dict);
2413 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2414 }
2415 if (de == NULL) {
2416 addReply(c,shared.plus);
2417 addReply(c,shared.crlf);
2418 } else {
2419 addReply(c,shared.plus);
2420 addReply(c,dictGetEntryKey(de));
2421 addReply(c,shared.crlf);
2422 }
2423 }
2424
2425 static void keysCommand(redisClient *c) {
2426 dictIterator *di;
2427 dictEntry *de;
2428 sds pattern = c->argv[1]->ptr;
2429 int plen = sdslen(pattern);
2430 int numkeys = 0, keyslen = 0;
2431 robj *lenobj = createObject(REDIS_STRING,NULL);
2432
2433 di = dictGetIterator(c->db->dict);
2434 if (!di) oom("dictGetIterator");
2435 addReply(c,lenobj);
2436 decrRefCount(lenobj);
2437 while((de = dictNext(di)) != NULL) {
2438 robj *keyobj = dictGetEntryKey(de);
2439
2440 sds key = keyobj->ptr;
2441 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2442 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2443 if (expireIfNeeded(c->db,keyobj) == 0) {
2444 if (numkeys != 0)
2445 addReply(c,shared.space);
2446 addReply(c,keyobj);
2447 numkeys++;
2448 keyslen += sdslen(key);
2449 }
2450 }
2451 }
2452 dictReleaseIterator(di);
2453 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2454 addReply(c,shared.crlf);
2455 }
2456
2457 static void dbsizeCommand(redisClient *c) {
2458 addReplySds(c,
2459 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2460 }
2461
2462 static void lastsaveCommand(redisClient *c) {
2463 addReplySds(c,
2464 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2465 }
2466
2467 static void typeCommand(redisClient *c) {
2468 robj *o;
2469 char *type;
2470
2471 o = lookupKeyRead(c->db,c->argv[1]);
2472 if (o == NULL) {
2473 type = "+none";
2474 } else {
2475 switch(o->type) {
2476 case REDIS_STRING: type = "+string"; break;
2477 case REDIS_LIST: type = "+list"; break;
2478 case REDIS_SET: type = "+set"; break;
2479 default: type = "unknown"; break;
2480 }
2481 }
2482 addReplySds(c,sdsnew(type));
2483 addReply(c,shared.crlf);
2484 }
2485
2486 static void saveCommand(redisClient *c) {
2487 if (server.bgsaveinprogress) {
2488 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2489 return;
2490 }
2491 if (rdbSave(server.dbfilename) == REDIS_OK) {
2492 addReply(c,shared.ok);
2493 } else {
2494 addReply(c,shared.err);
2495 }
2496 }
2497
2498 static void bgsaveCommand(redisClient *c) {
2499 if (server.bgsaveinprogress) {
2500 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2501 return;
2502 }
2503 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2504 addReply(c,shared.ok);
2505 } else {
2506 addReply(c,shared.err);
2507 }
2508 }
2509
2510 static void shutdownCommand(redisClient *c) {
2511 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2512 /* XXX: TODO kill the child if there is a bgsave in progress */
2513 if (rdbSave(server.dbfilename) == REDIS_OK) {
2514 if (server.daemonize) {
2515 unlink(server.pidfile);
2516 }
2517 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2518 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2519 exit(1);
2520 } else {
2521 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2522 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2523 }
2524 }
2525
2526 static void renameGenericCommand(redisClient *c, int nx) {
2527 robj *o;
2528
2529 /* To use the same key as src and dst is probably an error */
2530 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2531 addReply(c,shared.sameobjecterr);
2532 return;
2533 }
2534
2535 o = lookupKeyWrite(c->db,c->argv[1]);
2536 if (o == NULL) {
2537 addReply(c,shared.nokeyerr);
2538 return;
2539 }
2540 incrRefCount(o);
2541 deleteIfVolatile(c->db,c->argv[2]);
2542 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2543 if (nx) {
2544 decrRefCount(o);
2545 addReply(c,shared.czero);
2546 return;
2547 }
2548 dictReplace(c->db->dict,c->argv[2],o);
2549 } else {
2550 incrRefCount(c->argv[2]);
2551 }
2552 deleteKey(c->db,c->argv[1]);
2553 server.dirty++;
2554 addReply(c,nx ? shared.cone : shared.ok);
2555 }
2556
2557 static void renameCommand(redisClient *c) {
2558 renameGenericCommand(c,0);
2559 }
2560
2561 static void renamenxCommand(redisClient *c) {
2562 renameGenericCommand(c,1);
2563 }
2564
2565 static void moveCommand(redisClient *c) {
2566 robj *o;
2567 redisDb *src, *dst;
2568 int srcid;
2569
2570 /* Obtain source and target DB pointers */
2571 src = c->db;
2572 srcid = c->db->id;
2573 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2574 addReply(c,shared.outofrangeerr);
2575 return;
2576 }
2577 dst = c->db;
2578 selectDb(c,srcid); /* Back to the source DB */
2579
2580 /* If the user is moving using as target the same
2581 * DB as the source DB it is probably an error. */
2582 if (src == dst) {
2583 addReply(c,shared.sameobjecterr);
2584 return;
2585 }
2586
2587 /* Check if the element exists and get a reference */
2588 o = lookupKeyWrite(c->db,c->argv[1]);
2589 if (!o) {
2590 addReply(c,shared.czero);
2591 return;
2592 }
2593
2594 /* Try to add the element to the target DB */
2595 deleteIfVolatile(dst,c->argv[1]);
2596 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2597 addReply(c,shared.czero);
2598 return;
2599 }
2600 incrRefCount(c->argv[1]);
2601 incrRefCount(o);
2602
2603 /* OK! key moved, free the entry in the source DB */
2604 deleteKey(src,c->argv[1]);
2605 server.dirty++;
2606 addReply(c,shared.cone);
2607 }
2608
2609 /* =================================== Lists ================================ */
2610 static void pushGenericCommand(redisClient *c, int where) {
2611 robj *lobj;
2612 list *list;
2613
2614 lobj = lookupKeyWrite(c->db,c->argv[1]);
2615 if (lobj == NULL) {
2616 lobj = createListObject();
2617 list = lobj->ptr;
2618 if (where == REDIS_HEAD) {
2619 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2620 } else {
2621 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2622 }
2623 dictAdd(c->db->dict,c->argv[1],lobj);
2624 incrRefCount(c->argv[1]);
2625 incrRefCount(c->argv[2]);
2626 } else {
2627 if (lobj->type != REDIS_LIST) {
2628 addReply(c,shared.wrongtypeerr);
2629 return;
2630 }
2631 list = lobj->ptr;
2632 if (where == REDIS_HEAD) {
2633 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2634 } else {
2635 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2636 }
2637 incrRefCount(c->argv[2]);
2638 }
2639 server.dirty++;
2640 addReply(c,shared.ok);
2641 }
2642
2643 static void lpushCommand(redisClient *c) {
2644 pushGenericCommand(c,REDIS_HEAD);
2645 }
2646
2647 static void rpushCommand(redisClient *c) {
2648 pushGenericCommand(c,REDIS_TAIL);
2649 }
2650
2651 static void llenCommand(redisClient *c) {
2652 robj *o;
2653 list *l;
2654
2655 o = lookupKeyRead(c->db,c->argv[1]);
2656 if (o == NULL) {
2657 addReply(c,shared.czero);
2658 return;
2659 } else {
2660 if (o->type != REDIS_LIST) {
2661 addReply(c,shared.wrongtypeerr);
2662 } else {
2663 l = o->ptr;
2664 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2665 }
2666 }
2667 }
2668
2669 static void lindexCommand(redisClient *c) {
2670 robj *o;
2671 int index = atoi(c->argv[2]->ptr);
2672
2673 o = lookupKeyRead(c->db,c->argv[1]);
2674 if (o == NULL) {
2675 addReply(c,shared.nullbulk);
2676 } else {
2677 if (o->type != REDIS_LIST) {
2678 addReply(c,shared.wrongtypeerr);
2679 } else {
2680 list *list = o->ptr;
2681 listNode *ln;
2682
2683 ln = listIndex(list, index);
2684 if (ln == NULL) {
2685 addReply(c,shared.nullbulk);
2686 } else {
2687 robj *ele = listNodeValue(ln);
2688 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2689 addReply(c,ele);
2690 addReply(c,shared.crlf);
2691 }
2692 }
2693 }
2694 }
2695
2696 static void lsetCommand(redisClient *c) {
2697 robj *o;
2698 int index = atoi(c->argv[2]->ptr);
2699
2700 o = lookupKeyWrite(c->db,c->argv[1]);
2701 if (o == NULL) {
2702 addReply(c,shared.nokeyerr);
2703 } else {
2704 if (o->type != REDIS_LIST) {
2705 addReply(c,shared.wrongtypeerr);
2706 } else {
2707 list *list = o->ptr;
2708 listNode *ln;
2709
2710 ln = listIndex(list, index);
2711 if (ln == NULL) {
2712 addReply(c,shared.outofrangeerr);
2713 } else {
2714 robj *ele = listNodeValue(ln);
2715
2716 decrRefCount(ele);
2717 listNodeValue(ln) = c->argv[3];
2718 incrRefCount(c->argv[3]);
2719 addReply(c,shared.ok);
2720 server.dirty++;
2721 }
2722 }
2723 }
2724 }
2725
2726 static void popGenericCommand(redisClient *c, int where) {
2727 robj *o;
2728
2729 o = lookupKeyWrite(c->db,c->argv[1]);
2730 if (o == NULL) {
2731 addReply(c,shared.nullbulk);
2732 } else {
2733 if (o->type != REDIS_LIST) {
2734 addReply(c,shared.wrongtypeerr);
2735 } else {
2736 list *list = o->ptr;
2737 listNode *ln;
2738
2739 if (where == REDIS_HEAD)
2740 ln = listFirst(list);
2741 else
2742 ln = listLast(list);
2743
2744 if (ln == NULL) {
2745 addReply(c,shared.nullbulk);
2746 } else {
2747 robj *ele = listNodeValue(ln);
2748 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2749 addReply(c,ele);
2750 addReply(c,shared.crlf);
2751 listDelNode(list,ln);
2752 server.dirty++;
2753 }
2754 }
2755 }
2756 }
2757
2758 static void lpopCommand(redisClient *c) {
2759 popGenericCommand(c,REDIS_HEAD);
2760 }
2761
2762 static void rpopCommand(redisClient *c) {
2763 popGenericCommand(c,REDIS_TAIL);
2764 }
2765
2766 static void lrangeCommand(redisClient *c) {
2767 robj *o;
2768 int start = atoi(c->argv[2]->ptr);
2769 int end = atoi(c->argv[3]->ptr);
2770
2771 o = lookupKeyRead(c->db,c->argv[1]);
2772 if (o == NULL) {
2773 addReply(c,shared.nullmultibulk);
2774 } else {
2775 if (o->type != REDIS_LIST) {
2776 addReply(c,shared.wrongtypeerr);
2777 } else {
2778 list *list = o->ptr;
2779 listNode *ln;
2780 int llen = listLength(list);
2781 int rangelen, j;
2782 robj *ele;
2783
2784 /* convert negative indexes */
2785 if (start < 0) start = llen+start;
2786 if (end < 0) end = llen+end;
2787 if (start < 0) start = 0;
2788 if (end < 0) end = 0;
2789
2790 /* indexes sanity checks */
2791 if (start > end || start >= llen) {
2792 /* Out of range start or start > end result in empty list */
2793 addReply(c,shared.emptymultibulk);
2794 return;
2795 }
2796 if (end >= llen) end = llen-1;
2797 rangelen = (end-start)+1;
2798
2799 /* Return the result in form of a multi-bulk reply */
2800 ln = listIndex(list, start);
2801 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2802 for (j = 0; j < rangelen; j++) {
2803 ele = listNodeValue(ln);
2804 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2805 addReply(c,ele);
2806 addReply(c,shared.crlf);
2807 ln = ln->next;
2808 }
2809 }
2810 }
2811 }
2812
2813 static void ltrimCommand(redisClient *c) {
2814 robj *o;
2815 int start = atoi(c->argv[2]->ptr);
2816 int end = atoi(c->argv[3]->ptr);
2817
2818 o = lookupKeyWrite(c->db,c->argv[1]);
2819 if (o == NULL) {
2820 addReply(c,shared.nokeyerr);
2821 } else {
2822 if (o->type != REDIS_LIST) {
2823 addReply(c,shared.wrongtypeerr);
2824 } else {
2825 list *list = o->ptr;
2826 listNode *ln;
2827 int llen = listLength(list);
2828 int j, ltrim, rtrim;
2829
2830 /* convert negative indexes */
2831 if (start < 0) start = llen+start;
2832 if (end < 0) end = llen+end;
2833 if (start < 0) start = 0;
2834 if (end < 0) end = 0;
2835
2836 /* indexes sanity checks */
2837 if (start > end || start >= llen) {
2838 /* Out of range start or start > end result in empty list */
2839 ltrim = llen;
2840 rtrim = 0;
2841 } else {
2842 if (end >= llen) end = llen-1;
2843 ltrim = start;
2844 rtrim = llen-end-1;
2845 }
2846
2847 /* Remove list elements to perform the trim */
2848 for (j = 0; j < ltrim; j++) {
2849 ln = listFirst(list);
2850 listDelNode(list,ln);
2851 }
2852 for (j = 0; j < rtrim; j++) {
2853 ln = listLast(list);
2854 listDelNode(list,ln);
2855 }
2856 addReply(c,shared.ok);
2857 server.dirty++;
2858 }
2859 }
2860 }
2861
2862 static void lremCommand(redisClient *c) {
2863 robj *o;
2864
2865 o = lookupKeyWrite(c->db,c->argv[1]);
2866 if (o == NULL) {
2867 addReply(c,shared.nokeyerr);
2868 } else {
2869 if (o->type != REDIS_LIST) {
2870 addReply(c,shared.wrongtypeerr);
2871 } else {
2872 list *list = o->ptr;
2873 listNode *ln, *next;
2874 int toremove = atoi(c->argv[2]->ptr);
2875 int removed = 0;
2876 int fromtail = 0;
2877
2878 if (toremove < 0) {
2879 toremove = -toremove;
2880 fromtail = 1;
2881 }
2882 ln = fromtail ? list->tail : list->head;
2883 while (ln) {
2884 robj *ele = listNodeValue(ln);
2885
2886 next = fromtail ? ln->prev : ln->next;
2887 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2888 listDelNode(list,ln);
2889 server.dirty++;
2890 removed++;
2891 if (toremove && removed == toremove) break;
2892 }
2893 ln = next;
2894 }
2895 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2896 }
2897 }
2898 }
2899
2900 /* ==================================== Sets ================================ */
2901
2902 static void saddCommand(redisClient *c) {
2903 robj *set;
2904
2905 set = lookupKeyWrite(c->db,c->argv[1]);
2906 if (set == NULL) {
2907 set = createSetObject();
2908 dictAdd(c->db->dict,c->argv[1],set);
2909 incrRefCount(c->argv[1]);
2910 } else {
2911 if (set->type != REDIS_SET) {
2912 addReply(c,shared.wrongtypeerr);
2913 return;
2914 }
2915 }
2916 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2917 incrRefCount(c->argv[2]);
2918 server.dirty++;
2919 addReply(c,shared.cone);
2920 } else {
2921 addReply(c,shared.czero);
2922 }
2923 }
2924
2925 static void sremCommand(redisClient *c) {
2926 robj *set;
2927
2928 set = lookupKeyWrite(c->db,c->argv[1]);
2929 if (set == NULL) {
2930 addReply(c,shared.czero);
2931 } else {
2932 if (set->type != REDIS_SET) {
2933 addReply(c,shared.wrongtypeerr);
2934 return;
2935 }
2936 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2937 server.dirty++;
2938 addReply(c,shared.cone);
2939 } else {
2940 addReply(c,shared.czero);
2941 }
2942 }
2943 }
2944
2945 static void smoveCommand(redisClient *c) {
2946 robj *srcset, *dstset;
2947
2948 srcset = lookupKeyWrite(c->db,c->argv[1]);
2949 dstset = lookupKeyWrite(c->db,c->argv[2]);
2950
2951 /* If the source key does not exist return 0, if it's of the wrong type
2952 * raise an error */
2953 if (srcset == NULL || srcset->type != REDIS_SET) {
2954 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2955 return;
2956 }
2957 /* Error if the destination key is not a set as well */
2958 if (dstset && dstset->type != REDIS_SET) {
2959 addReply(c,shared.wrongtypeerr);
2960 return;
2961 }
2962 /* Remove the element from the source set */
2963 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2964 /* Key not found in the src set! return zero */
2965 addReply(c,shared.czero);
2966 return;
2967 }
2968 server.dirty++;
2969 /* Add the element to the destination set */
2970 if (!dstset) {
2971 dstset = createSetObject();
2972 dictAdd(c->db->dict,c->argv[2],dstset);
2973 incrRefCount(c->argv[2]);
2974 }
2975 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
2976 incrRefCount(c->argv[3]);
2977 addReply(c,shared.cone);
2978 }
2979
2980 static void sismemberCommand(redisClient *c) {
2981 robj *set;
2982
2983 set = lookupKeyRead(c->db,c->argv[1]);
2984 if (set == NULL) {
2985 addReply(c,shared.czero);
2986 } else {
2987 if (set->type != REDIS_SET) {
2988 addReply(c,shared.wrongtypeerr);
2989 return;
2990 }
2991 if (dictFind(set->ptr,c->argv[2]))
2992 addReply(c,shared.cone);
2993 else
2994 addReply(c,shared.czero);
2995 }
2996 }
2997
2998 static void scardCommand(redisClient *c) {
2999 robj *o;
3000 dict *s;
3001
3002 o = lookupKeyRead(c->db,c->argv[1]);
3003 if (o == NULL) {
3004 addReply(c,shared.czero);
3005 return;
3006 } else {
3007 if (o->type != REDIS_SET) {
3008 addReply(c,shared.wrongtypeerr);
3009 } else {
3010 s = o->ptr;
3011 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3012 dictSize(s)));
3013 }
3014 }
3015 }
3016
3017 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3018 dict **d1 = (void*) s1, **d2 = (void*) s2;
3019
3020 return dictSize(*d1)-dictSize(*d2);
3021 }
3022
3023 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3024 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3025 dictIterator *di;
3026 dictEntry *de;
3027 robj *lenobj = NULL, *dstset = NULL;
3028 int j, cardinality = 0;
3029
3030 if (!dv) oom("sinterGenericCommand");
3031 for (j = 0; j < setsnum; j++) {
3032 robj *setobj;
3033
3034 setobj = dstkey ?
3035 lookupKeyWrite(c->db,setskeys[j]) :
3036 lookupKeyRead(c->db,setskeys[j]);
3037 if (!setobj) {
3038 zfree(dv);
3039 if (dstkey) {
3040 deleteKey(c->db,dstkey);
3041 addReply(c,shared.ok);
3042 } else {
3043 addReply(c,shared.nullmultibulk);
3044 }
3045 return;
3046 }
3047 if (setobj->type != REDIS_SET) {
3048 zfree(dv);
3049 addReply(c,shared.wrongtypeerr);
3050 return;
3051 }
3052 dv[j] = setobj->ptr;
3053 }
3054 /* Sort sets from the smallest to largest, this will improve our
3055 * algorithm's performace */
3056 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3057
3058 /* The first thing we should output is the total number of elements...
3059 * since this is a multi-bulk write, but at this stage we don't know
3060 * the intersection set size, so we use a trick, append an empty object
3061 * to the output list and save the pointer to later modify it with the
3062 * right length */
3063 if (!dstkey) {
3064 lenobj = createObject(REDIS_STRING,NULL);
3065 addReply(c,lenobj);
3066 decrRefCount(lenobj);
3067 } else {
3068 /* If we have a target key where to store the resulting set
3069 * create this key with an empty set inside */
3070 dstset = createSetObject();
3071 }
3072
3073 /* Iterate all the elements of the first (smallest) set, and test
3074 * the element against all the other sets, if at least one set does
3075 * not include the element it is discarded */
3076 di = dictGetIterator(dv[0]);
3077 if (!di) oom("dictGetIterator");
3078
3079 while((de = dictNext(di)) != NULL) {
3080 robj *ele;
3081
3082 for (j = 1; j < setsnum; j++)
3083 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3084 if (j != setsnum)
3085 continue; /* at least one set does not contain the member */
3086 ele = dictGetEntryKey(de);
3087 if (!dstkey) {
3088 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3089 addReply(c,ele);
3090 addReply(c,shared.crlf);
3091 cardinality++;
3092 } else {
3093 dictAdd(dstset->ptr,ele,NULL);
3094 incrRefCount(ele);
3095 }
3096 }
3097 dictReleaseIterator(di);
3098
3099 if (dstkey) {
3100 /* Store the resulting set into the target */
3101 deleteKey(c->db,dstkey);
3102 dictAdd(c->db->dict,dstkey,dstset);
3103 incrRefCount(dstkey);
3104 }
3105
3106 if (!dstkey) {
3107 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3108 } else {
3109 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3110 dictSize((dict*)dstset->ptr)));
3111 server.dirty++;
3112 }
3113 zfree(dv);
3114 }
3115
3116 static void sinterCommand(redisClient *c) {
3117 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3118 }
3119
3120 static void sinterstoreCommand(redisClient *c) {
3121 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3122 }
3123
3124 #define REDIS_OP_UNION 0
3125 #define REDIS_OP_DIFF 1
3126
3127 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3128 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3129 dictIterator *di;
3130 dictEntry *de;
3131 robj *dstset = NULL;
3132 int j, cardinality = 0;
3133
3134 if (!dv) oom("sunionDiffGenericCommand");
3135 for (j = 0; j < setsnum; j++) {
3136 robj *setobj;
3137
3138 setobj = dstkey ?
3139 lookupKeyWrite(c->db,setskeys[j]) :
3140 lookupKeyRead(c->db,setskeys[j]);
3141 if (!setobj) {
3142 dv[j] = NULL;
3143 continue;
3144 }
3145 if (setobj->type != REDIS_SET) {
3146 zfree(dv);
3147 addReply(c,shared.wrongtypeerr);
3148 return;
3149 }
3150 dv[j] = setobj->ptr;
3151 }
3152
3153 /* We need a temp set object to store our union. If the dstkey
3154 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3155 * this set object will be the resulting object to set into the target key*/
3156 dstset = createSetObject();
3157
3158 /* Iterate all the elements of all the sets, add every element a single
3159 * time to the result set */
3160 for (j = 0; j < setsnum; j++) {
3161 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3162 if (!dv[j]) continue; /* non existing keys are like empty sets */
3163
3164 di = dictGetIterator(dv[j]);
3165 if (!di) oom("dictGetIterator");
3166
3167 while((de = dictNext(di)) != NULL) {
3168 robj *ele;
3169
3170 /* dictAdd will not add the same element multiple times */
3171 ele = dictGetEntryKey(de);
3172 if (op == REDIS_OP_UNION || j == 0) {
3173 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3174 incrRefCount(ele);
3175 cardinality++;
3176 }
3177 } else if (op == REDIS_OP_DIFF) {
3178 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3179 cardinality--;
3180 }
3181 }
3182 }
3183 dictReleaseIterator(di);
3184
3185 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3186 }
3187
3188 /* Output the content of the resulting set, if not in STORE mode */
3189 if (!dstkey) {
3190 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3191 di = dictGetIterator(dstset->ptr);
3192 if (!di) oom("dictGetIterator");
3193 while((de = dictNext(di)) != NULL) {
3194 robj *ele;
3195
3196 ele = dictGetEntryKey(de);
3197 addReplySds(c,sdscatprintf(sdsempty(),
3198 "$%d\r\n",sdslen(ele->ptr)));
3199 addReply(c,ele);
3200 addReply(c,shared.crlf);
3201 }
3202 dictReleaseIterator(di);
3203 } else {
3204 /* If we have a target key where to store the resulting set
3205 * create this key with the result set inside */
3206 deleteKey(c->db,dstkey);
3207 dictAdd(c->db->dict,dstkey,dstset);
3208 incrRefCount(dstkey);
3209 }
3210
3211 /* Cleanup */
3212 if (!dstkey) {
3213 decrRefCount(dstset);
3214 } else {
3215 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3216 dictSize((dict*)dstset->ptr)));
3217 server.dirty++;
3218 }
3219 zfree(dv);
3220 }
3221
3222 static void sunionCommand(redisClient *c) {
3223 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3224 }
3225
3226 static void sunionstoreCommand(redisClient *c) {
3227 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3228 }
3229
3230 static void sdiffCommand(redisClient *c) {
3231 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3232 }
3233
3234 static void sdiffstoreCommand(redisClient *c) {
3235 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3236 }
3237
3238 static void flushdbCommand(redisClient *c) {
3239 server.dirty += dictSize(c->db->dict);
3240 dictEmpty(c->db->dict);
3241 dictEmpty(c->db->expires);
3242 addReply(c,shared.ok);
3243 }
3244
3245 static void flushallCommand(redisClient *c) {
3246 server.dirty += emptyDb();
3247 addReply(c,shared.ok);
3248 rdbSave(server.dbfilename);
3249 server.dirty++;
3250 }
3251
3252 redisSortOperation *createSortOperation(int type, robj *pattern) {
3253 redisSortOperation *so = zmalloc(sizeof(*so));
3254 if (!so) oom("createSortOperation");
3255 so->type = type;
3256 so->pattern = pattern;
3257 return so;
3258 }
3259
3260 /* Return the value associated to the key with a name obtained
3261 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3262 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3263 char *p;
3264 sds spat, ssub;
3265 robj keyobj;
3266 int prefixlen, sublen, postfixlen;
3267 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3268 struct {
3269 long len;
3270 long free;
3271 char buf[REDIS_SORTKEY_MAX+1];
3272 } keyname;
3273
3274 spat = pattern->ptr;
3275 ssub = subst->ptr;
3276 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3277 p = strchr(spat,'*');
3278 if (!p) return NULL;
3279
3280 prefixlen = p-spat;
3281 sublen = sdslen(ssub);
3282 postfixlen = sdslen(spat)-(prefixlen+1);
3283 memcpy(keyname.buf,spat,prefixlen);
3284 memcpy(keyname.buf+prefixlen,ssub,sublen);
3285 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3286 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3287 keyname.len = prefixlen+sublen+postfixlen;
3288
3289 keyobj.refcount = 1;
3290 keyobj.type = REDIS_STRING;
3291 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3292
3293 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3294 return lookupKeyRead(db,&keyobj);
3295 }
3296
3297 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3298 * the additional parameter is not standard but a BSD-specific we have to
3299 * pass sorting parameters via the global 'server' structure */
3300 static int sortCompare(const void *s1, const void *s2) {
3301 const redisSortObject *so1 = s1, *so2 = s2;
3302 int cmp;
3303
3304 if (!server.sort_alpha) {
3305 /* Numeric sorting. Here it's trivial as we precomputed scores */
3306 if (so1->u.score > so2->u.score) {
3307 cmp = 1;
3308 } else if (so1->u.score < so2->u.score) {
3309 cmp = -1;
3310 } else {
3311 cmp = 0;
3312 }
3313 } else {
3314 /* Alphanumeric sorting */
3315 if (server.sort_bypattern) {
3316 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3317 /* At least one compare object is NULL */
3318 if (so1->u.cmpobj == so2->u.cmpobj)
3319 cmp = 0;
3320 else if (so1->u.cmpobj == NULL)
3321 cmp = -1;
3322 else
3323 cmp = 1;
3324 } else {
3325 /* We have both the objects, use strcoll */
3326 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3327 }
3328 } else {
3329 /* Compare elements directly */
3330 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3331 }
3332 }
3333 return server.sort_desc ? -cmp : cmp;
3334 }
3335
3336 /* The SORT command is the most complex command in Redis. Warning: this code
3337 * is optimized for speed and a bit less for readability */
3338 static void sortCommand(redisClient *c) {
3339 list *operations;
3340 int outputlen = 0;
3341 int desc = 0, alpha = 0;
3342 int limit_start = 0, limit_count = -1, start, end;
3343 int j, dontsort = 0, vectorlen;
3344 int getop = 0; /* GET operation counter */
3345 robj *sortval, *sortby = NULL;
3346 redisSortObject *vector; /* Resulting vector to sort */
3347
3348 /* Lookup the key to sort. It must be of the right types */
3349 sortval = lookupKeyRead(c->db,c->argv[1]);
3350 if (sortval == NULL) {
3351 addReply(c,shared.nokeyerr);
3352 return;
3353 }
3354 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3355 addReply(c,shared.wrongtypeerr);
3356 return;
3357 }
3358
3359 /* Create a list of operations to perform for every sorted element.
3360 * Operations can be GET/DEL/INCR/DECR */
3361 operations = listCreate();
3362 listSetFreeMethod(operations,zfree);
3363 j = 2;
3364
3365 /* Now we need to protect sortval incrementing its count, in the future
3366 * SORT may have options able to overwrite/delete keys during the sorting
3367 * and the sorted key itself may get destroied */
3368 incrRefCount(sortval);
3369
3370 /* The SORT command has an SQL-alike syntax, parse it */
3371 while(j < c->argc) {
3372 int leftargs = c->argc-j-1;
3373 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3374 desc = 0;
3375 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3376 desc = 1;
3377 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3378 alpha = 1;
3379 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3380 limit_start = atoi(c->argv[j+1]->ptr);
3381 limit_count = atoi(c->argv[j+2]->ptr);
3382 j+=2;
3383 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3384 sortby = c->argv[j+1];
3385 /* If the BY pattern does not contain '*', i.e. it is constant,
3386 * we don't need to sort nor to lookup the weight keys. */
3387 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3388 j++;
3389 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3390 listAddNodeTail(operations,createSortOperation(
3391 REDIS_SORT_GET,c->argv[j+1]));
3392 getop++;
3393 j++;
3394 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3395 listAddNodeTail(operations,createSortOperation(
3396 REDIS_SORT_DEL,c->argv[j+1]));
3397 j++;
3398 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3399 listAddNodeTail(operations,createSortOperation(
3400 REDIS_SORT_INCR,c->argv[j+1]));
3401 j++;
3402 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3403 listAddNodeTail(operations,createSortOperation(
3404 REDIS_SORT_DECR,c->argv[j+1]));
3405 j++;
3406 } else {
3407 decrRefCount(sortval);
3408 listRelease(operations);
3409 addReply(c,shared.syntaxerr);
3410 return;
3411 }
3412 j++;
3413 }
3414
3415 /* Load the sorting vector with all the objects to sort */
3416 vectorlen = (sortval->type == REDIS_LIST) ?
3417 listLength((list*)sortval->ptr) :
3418 dictSize((dict*)sortval->ptr);
3419 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3420 if (!vector) oom("allocating objects vector for SORT");
3421 j = 0;
3422 if (sortval->type == REDIS_LIST) {
3423 list *list = sortval->ptr;
3424 listNode *ln;
3425
3426 listRewind(list);
3427 while((ln = listYield(list))) {
3428 robj *ele = ln->value;
3429 vector[j].obj = ele;
3430 vector[j].u.score = 0;
3431 vector[j].u.cmpobj = NULL;
3432 j++;
3433 }
3434 } else {
3435 dict *set = sortval->ptr;
3436 dictIterator *di;
3437 dictEntry *setele;
3438
3439 di = dictGetIterator(set);
3440 if (!di) oom("dictGetIterator");
3441 while((setele = dictNext(di)) != NULL) {
3442 vector[j].obj = dictGetEntryKey(setele);
3443 vector[j].u.score = 0;
3444 vector[j].u.cmpobj = NULL;
3445 j++;
3446 }
3447 dictReleaseIterator(di);
3448 }
3449 assert(j == vectorlen);
3450
3451 /* Now it's time to load the right scores in the sorting vector */
3452 if (dontsort == 0) {
3453 for (j = 0; j < vectorlen; j++) {
3454 if (sortby) {
3455 robj *byval;
3456
3457 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3458 if (!byval || byval->type != REDIS_STRING) continue;
3459 if (alpha) {
3460 vector[j].u.cmpobj = byval;
3461 incrRefCount(byval);
3462 } else {
3463 vector[j].u.score = strtod(byval->ptr,NULL);
3464 }
3465 } else {
3466 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3467 }
3468 }
3469 }
3470
3471 /* We are ready to sort the vector... perform a bit of sanity check
3472 * on the LIMIT option too. We'll use a partial version of quicksort. */
3473 start = (limit_start < 0) ? 0 : limit_start;
3474 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3475 if (start >= vectorlen) {
3476 start = vectorlen-1;
3477 end = vectorlen-2;
3478 }
3479 if (end >= vectorlen) end = vectorlen-1;
3480
3481 if (dontsort == 0) {
3482 server.sort_desc = desc;
3483 server.sort_alpha = alpha;
3484 server.sort_bypattern = sortby ? 1 : 0;
3485 if (sortby && (start != 0 || end != vectorlen-1))
3486 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3487 else
3488 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3489 }
3490
3491 /* Send command output to the output buffer, performing the specified
3492 * GET/DEL/INCR/DECR operations if any. */
3493 outputlen = getop ? getop*(end-start+1) : end-start+1;
3494 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3495 for (j = start; j <= end; j++) {
3496 listNode *ln;
3497 if (!getop) {
3498 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3499 sdslen(vector[j].obj->ptr)));
3500 addReply(c,vector[j].obj);
3501 addReply(c,shared.crlf);
3502 }
3503 listRewind(operations);
3504 while((ln = listYield(operations))) {
3505 redisSortOperation *sop = ln->value;
3506 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3507 vector[j].obj);
3508
3509 if (sop->type == REDIS_SORT_GET) {
3510 if (!val || val->type != REDIS_STRING) {
3511 addReply(c,shared.nullbulk);
3512 } else {
3513 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3514 sdslen(val->ptr)));
3515 addReply(c,val);
3516 addReply(c,shared.crlf);
3517 }
3518 } else if (sop->type == REDIS_SORT_DEL) {
3519 /* TODO */
3520 }
3521 }
3522 }
3523
3524 /* Cleanup */
3525 decrRefCount(sortval);
3526 listRelease(operations);
3527 for (j = 0; j < vectorlen; j++) {
3528 if (sortby && alpha && vector[j].u.cmpobj)
3529 decrRefCount(vector[j].u.cmpobj);
3530 }
3531 zfree(vector);
3532 }
3533
3534 static void infoCommand(redisClient *c) {
3535 sds info;
3536 time_t uptime = time(NULL)-server.stat_starttime;
3537
3538 info = sdscatprintf(sdsempty(),
3539 "redis_version:%s\r\n"
3540 "uptime_in_seconds:%d\r\n"
3541 "uptime_in_days:%d\r\n"
3542 "connected_clients:%d\r\n"
3543 "connected_slaves:%d\r\n"
3544 "used_memory:%zu\r\n"
3545 "changes_since_last_save:%lld\r\n"
3546 "bgsave_in_progress:%d\r\n"
3547 "last_save_time:%d\r\n"
3548 "total_connections_received:%lld\r\n"
3549 "total_commands_processed:%lld\r\n"
3550 "role:%s\r\n"
3551 ,REDIS_VERSION,
3552 uptime,
3553 uptime/(3600*24),
3554 listLength(server.clients)-listLength(server.slaves),
3555 listLength(server.slaves),
3556 server.usedmemory,
3557 server.dirty,
3558 server.bgsaveinprogress,
3559 server.lastsave,
3560 server.stat_numconnections,
3561 server.stat_numcommands,
3562 server.masterhost == NULL ? "master" : "slave"
3563 );
3564 if (server.masterhost) {
3565 info = sdscatprintf(info,
3566 "master_host:%s\r\n"
3567 "master_port:%d\r\n"
3568 "master_link_status:%s\r\n"
3569 "master_last_io_seconds_ago:%d\r\n"
3570 ,server.masterhost,
3571 server.masterport,
3572 (server.replstate == REDIS_REPL_CONNECTED) ?
3573 "up" : "down",
3574 (int)(time(NULL)-server.master->lastinteraction)
3575 );
3576 }
3577 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3578 addReplySds(c,info);
3579 addReply(c,shared.crlf);
3580 }
3581
3582 static void monitorCommand(redisClient *c) {
3583 /* ignore MONITOR if aleady slave or in monitor mode */
3584 if (c->flags & REDIS_SLAVE) return;
3585
3586 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3587 c->slaveseldb = 0;
3588 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3589 addReply(c,shared.ok);
3590 }
3591
3592 /* ================================= Expire ================================= */
3593 static int removeExpire(redisDb *db, robj *key) {
3594 if (dictDelete(db->expires,key) == DICT_OK) {
3595 return 1;
3596 } else {
3597 return 0;
3598 }
3599 }
3600
3601 static int setExpire(redisDb *db, robj *key, time_t when) {
3602 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3603 return 0;
3604 } else {
3605 incrRefCount(key);
3606 return 1;
3607 }
3608 }
3609
3610 /* Return the expire time of the specified key, or -1 if no expire
3611 * is associated with this key (i.e. the key is non volatile) */
3612 static time_t getExpire(redisDb *db, robj *key) {
3613 dictEntry *de;
3614
3615 /* No expire? return ASAP */
3616 if (dictSize(db->expires) == 0 ||
3617 (de = dictFind(db->expires,key)) == NULL) return -1;
3618
3619 return (time_t) dictGetEntryVal(de);
3620 }
3621
3622 static int expireIfNeeded(redisDb *db, robj *key) {
3623 time_t when;
3624 dictEntry *de;
3625
3626 /* No expire? return ASAP */
3627 if (dictSize(db->expires) == 0 ||
3628 (de = dictFind(db->expires,key)) == NULL) return 0;
3629
3630 /* Lookup the expire */
3631 when = (time_t) dictGetEntryVal(de);
3632 if (time(NULL) <= when) return 0;
3633
3634 /* Delete the key */
3635 dictDelete(db->expires,key);
3636 return dictDelete(db->dict,key) == DICT_OK;
3637 }
3638
3639 static int deleteIfVolatile(redisDb *db, robj *key) {
3640 dictEntry *de;
3641
3642 /* No expire? return ASAP */
3643 if (dictSize(db->expires) == 0 ||
3644 (de = dictFind(db->expires,key)) == NULL) return 0;
3645
3646 /* Delete the key */
3647 server.dirty++;
3648 dictDelete(db->expires,key);
3649 return dictDelete(db->dict,key) == DICT_OK;
3650 }
3651
3652 static void expireCommand(redisClient *c) {
3653 dictEntry *de;
3654 int seconds = atoi(c->argv[2]->ptr);
3655
3656 de = dictFind(c->db->dict,c->argv[1]);
3657 if (de == NULL) {
3658 addReply(c,shared.czero);
3659 return;
3660 }
3661 if (seconds <= 0) {
3662 addReply(c, shared.czero);
3663 return;
3664 } else {
3665 time_t when = time(NULL)+seconds;
3666 if (setExpire(c->db,c->argv[1],when))
3667 addReply(c,shared.cone);
3668 else
3669 addReply(c,shared.czero);
3670 return;
3671 }
3672 }
3673
3674 static void ttlCommand(redisClient *c) {
3675 time_t expire;
3676 int ttl = -1;
3677
3678 expire = getExpire(c->db,c->argv[1]);
3679 if (expire != -1) {
3680 ttl = (int) (expire-time(NULL));
3681 if (ttl < 0) ttl = -1;
3682 }
3683 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3684 }
3685
3686 /* =============================== Replication ============================= */
3687
3688 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3689 ssize_t nwritten, ret = size;
3690 time_t start = time(NULL);
3691
3692 timeout++;
3693 while(size) {
3694 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3695 nwritten = write(fd,ptr,size);
3696 if (nwritten == -1) return -1;
3697 ptr += nwritten;
3698 size -= nwritten;
3699 }
3700 if ((time(NULL)-start) > timeout) {
3701 errno = ETIMEDOUT;
3702 return -1;
3703 }
3704 }
3705 return ret;
3706 }
3707
3708 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3709 ssize_t nread, totread = 0;
3710 time_t start = time(NULL);
3711
3712 timeout++;
3713 while(size) {
3714 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3715 nread = read(fd,ptr,size);
3716 if (nread == -1) return -1;
3717 ptr += nread;
3718 size -= nread;
3719 totread += nread;
3720 }
3721 if ((time(NULL)-start) > timeout) {
3722 errno = ETIMEDOUT;
3723 return -1;
3724 }
3725 }
3726 return totread;
3727 }
3728
/* Reads a line from 'fd' one byte at a time (via syncRead) into 'ptr', a
 * buffer of 'size' bytes. The terminating "\n" is not stored, and a
 * trailing "\r" is stripped. The buffer is always null terminated.
 *
 * Returns the number of characters stored before the newline was seen
 * (including a stripped "\r", if any), or -1 if syncRead() fails. If no
 * newline shows up within size-1 characters the function stops and returns
 * what it has. The loop used to never decrement 'size', so a long line
 * would be written past the end of the caller's buffer. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* room for the null terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            size--;
        }
    }
    return nread;
}
3749
3750 static void syncCommand(redisClient *c) {
3751 /* ignore SYNC if aleady slave or in monitor mode */
3752 if (c->flags & REDIS_SLAVE) return;
3753
3754 /* SYNC can't be issued when the server has pending data to send to
3755 * the client about already issued commands. We need a fresh reply
3756 * buffer registering the differences between the BGSAVE and the current
3757 * dataset, so that we can copy to other slaves if needed. */
3758 if (listLength(c->reply) != 0) {
3759 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3760 return;
3761 }
3762
3763 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3764 /* Here we need to check if there is a background saving operation
3765 * in progress, or if it is required to start one */
3766 if (server.bgsaveinprogress) {
3767 /* Ok a background save is in progress. Let's check if it is a good
3768 * one for replication, i.e. if there is another slave that is
3769 * registering differences since the server forked to save */
3770 redisClient *slave;
3771 listNode *ln;
3772
3773 listRewind(server.slaves);
3774 while((ln = listYield(server.slaves))) {
3775 slave = ln->value;
3776 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3777 }
3778 if (ln) {
3779 /* Perfect, the server is already registering differences for
3780 * another slave. Set the right state, and copy the buffer. */
3781 listRelease(c->reply);
3782 c->reply = listDup(slave->reply);
3783 if (!c->reply) oom("listDup copying slave reply list");
3784 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3785 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3786 } else {
3787 /* No way, we need to wait for the next BGSAVE in order to
3788 * register differences */
3789 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3790 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3791 }
3792 } else {
3793 /* Ok we don't have a BGSAVE in progress, let's start one */
3794 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3795 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3796 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3797 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3798 return;
3799 }
3800 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3801 }
3802 c->repldbfd = -1;
3803 c->flags |= REDIS_SLAVE;
3804 c->slaveseldb = 0;
3805 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3806 return;
3807 }
3808
3809 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3810 redisClient *slave = privdata;
3811 REDIS_NOTUSED(el);
3812 REDIS_NOTUSED(mask);
3813 char buf[REDIS_IOBUF_LEN];
3814 ssize_t nwritten, buflen;
3815
3816 if (slave->repldboff == 0) {
3817 /* Write the bulk write count before to transfer the DB. In theory here
3818 * we don't know how much room there is in the output buffer of the
3819 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3820 * operations) will never be smaller than the few bytes we need. */
3821 sds bulkcount;
3822
3823 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3824 slave->repldbsize);
3825 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3826 {
3827 sdsfree(bulkcount);
3828 freeClient(slave);
3829 return;
3830 }
3831 sdsfree(bulkcount);
3832 }
3833 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3834 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3835 if (buflen <= 0) {
3836 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3837 (buflen == 0) ? "premature EOF" : strerror(errno));
3838 freeClient(slave);
3839 return;
3840 }
3841 if ((nwritten = write(fd,buf,buflen)) == -1) {
3842 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3843 strerror(errno));
3844 freeClient(slave);
3845 return;
3846 }
3847 slave->repldboff += nwritten;
3848 if (slave->repldboff == slave->repldbsize) {
3849 close(slave->repldbfd);
3850 slave->repldbfd = -1;
3851 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3852 slave->replstate = REDIS_REPL_ONLINE;
3853 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3854 sendReplyToClient, slave, NULL) == AE_ERR) {
3855 freeClient(slave);
3856 return;
3857 }
3858 addReplySds(slave,sdsempty());
3859 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3860 }
3861 }
3862
3863 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3864 listNode *ln;
3865 int startbgsave = 0;
3866
3867 listRewind(server.slaves);
3868 while((ln = listYield(server.slaves))) {
3869 redisClient *slave = ln->value;
3870
3871 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3872 startbgsave = 1;
3873 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3874 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3875 struct stat buf;
3876
3877 if (bgsaveerr != REDIS_OK) {
3878 freeClient(slave);
3879 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3880 continue;
3881 }
3882 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3883 fstat(slave->repldbfd,&buf) == -1) {
3884 freeClient(slave);
3885 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3886 continue;
3887 }
3888 slave->repldboff = 0;
3889 slave->repldbsize = buf.st_size;
3890 slave->replstate = REDIS_REPL_SEND_BULK;
3891 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3892 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3893 freeClient(slave);
3894 continue;
3895 }
3896 }
3897 }
3898 if (startbgsave) {
3899 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3900 listRewind(server.slaves);
3901 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3902 while((ln = listYield(server.slaves))) {
3903 redisClient *slave = ln->value;
3904
3905 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3906 freeClient(slave);
3907 }
3908 }
3909 }
3910 }
3911
3912 static int syncWithMaster(void) {
3913 char buf[1024], tmpfile[256];
3914 int dumpsize;
3915 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3916 int dfd;
3917
3918 if (fd == -1) {
3919 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3920 strerror(errno));
3921 return REDIS_ERR;
3922 }
3923 /* Issue the SYNC command */
3924 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3925 close(fd);
3926 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3927 strerror(errno));
3928 return REDIS_ERR;
3929 }
3930 /* Read the bulk write count */
3931 if (syncReadLine(fd,buf,1024,3600) == -1) {
3932 close(fd);
3933 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3934 strerror(errno));
3935 return REDIS_ERR;
3936 }
3937 dumpsize = atoi(buf+1);
3938 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3939 /* Read the bulk write data on a temp file */
3940 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3941 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3942 if (dfd == -1) {
3943 close(fd);
3944 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3945 return REDIS_ERR;
3946 }
3947 while(dumpsize) {
3948 int nread, nwritten;
3949
3950 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3951 if (nread == -1) {
3952 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3953 strerror(errno));
3954 close(fd);
3955 close(dfd);
3956 return REDIS_ERR;
3957 }
3958 nwritten = write(dfd,buf,nread);
3959 if (nwritten == -1) {
3960 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3961 close(fd);
3962 close(dfd);
3963 return REDIS_ERR;
3964 }
3965 dumpsize -= nread;
3966 }
3967 close(dfd);
3968 if (rename(tmpfile,server.dbfilename) == -1) {
3969 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3970 unlink(tmpfile);
3971 close(fd);
3972 return REDIS_ERR;
3973 }
3974 emptyDb();
3975 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3976 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3977 close(fd);
3978 return REDIS_ERR;
3979 }
3980 server.master = createClient(fd);
3981 server.master->flags |= REDIS_MASTER;
3982 server.replstate = REDIS_REPL_CONNECTED;
3983 return REDIS_OK;
3984 }
3985
3986 static void slaveofCommand(redisClient *c) {
3987 if (!strcasecmp(c->argv[1]->ptr,"no") &&
3988 !strcasecmp(c->argv[2]->ptr,"one")) {
3989 if (server.masterhost) {
3990 sdsfree(server.masterhost);
3991 server.masterhost = NULL;
3992 if (server.master) freeClient(server.master);
3993 server.replstate = REDIS_REPL_NONE;
3994 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
3995 }
3996 } else {
3997 sdsfree(server.masterhost);
3998 server.masterhost = sdsdup(c->argv[1]->ptr);
3999 server.masterport = atoi(c->argv[2]->ptr);
4000 if (server.master) freeClient(server.master);
4001 server.replstate = REDIS_REPL_CONNECT;
4002 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4003 server.masterhost, server.masterport);
4004 }
4005 addReply(c,shared.ok);
4006 }
4007
4008 /* ============================ Maxmemory directive ======================== */
4009
4010 /* This function gets called when 'maxmemory' is set on the config file to limit
4011 * the max memory used by the server, and we are out of memory.
4012 * This function will try to, in order:
4013 *
4014 * - Free objects from the free list
4015 * - Try to remove keys with an EXPIRE set
4016 *
4017 * It is not possible to free enough memory to reach used-memory < maxmemory
4018 * the server will start refusing commands that will enlarge even more the
4019 * memory usage.
4020 */
4021 static void freeMemoryIfNeeded(void) {
4022 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4023 if (listLength(server.objfreelist)) {
4024 robj *o;
4025
4026 listNode *head = listFirst(server.objfreelist);
4027 o = listNodeValue(head);
4028 listDelNode(server.objfreelist,head);
4029 zfree(o);
4030 } else {
4031 int j, k, freed = 0;
4032
4033 for (j = 0; j < server.dbnum; j++) {
4034 int minttl = -1;
4035 robj *minkey = NULL;
4036 struct dictEntry *de;
4037
4038 if (dictSize(server.db[j].expires)) {
4039 freed = 1;
4040 /* From a sample of three keys drop the one nearest to
4041 * the natural expire */
4042 for (k = 0; k < 3; k++) {
4043 time_t t;
4044
4045 de = dictGetRandomKey(server.db[j].expires);
4046 t = (time_t) dictGetEntryVal(de);
4047 if (minttl == -1 || t < minttl) {
4048 minkey = dictGetEntryKey(de);
4049 minttl = t;
4050 }
4051 }
4052 deleteKey(server.db+j,minkey);
4053 }
4054 }
4055 if (!freed) return; /* nothing to free... */
4056 }
4057 }
4058 }
4059
4060 /* ================================= Debugging ============================== */
4061
4062 static void debugCommand(redisClient *c) {
4063 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4064 *((char*)-1) = 'x';
4065 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4066 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4067 robj *key, *val;
4068
4069 if (!de) {
4070 addReply(c,shared.nokeyerr);
4071 return;
4072 }
4073 key = dictGetEntryKey(de);
4074 val = dictGetEntryVal(de);
4075 addReplySds(c,sdscatprintf(sdsempty(),
4076 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4077 key, key->refcount, val, val->refcount));
4078 } else {
4079 addReplySds(c,sdsnew(
4080 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4081 }
4082 }
4083
4084 static void segvHandler (int sig, siginfo_t *info, void *secret) {
4085
4086 void *trace[100];
4087 char **messages = (char **)NULL;
4088 int i, trace_size = 0;
4089 ucontext_t *uc = (ucontext_t *)secret;
4090
4091 redisLog(REDIS_DEBUG, "Segmentation fault -%d- Redis %s", sig, REDIS_VERSION );
4092 redisLog(REDIS_DEBUG,"EIP %p", (void *)uc->uc_mcontext.gregs[REG_EIP]);
4093 redisLog(REDIS_DEBUG,"EAX %p, EBX %p, ECX %p, EDX %p", (void *)uc->uc_mcontext.gregs[REG_EAX], (void *)uc->uc_mcontext.gregs[REG_EBX], (void *)uc->uc_mcontext.gregs[REG_ECX], (void *)uc->uc_mcontext.gregs[REG_EDX]);
4094
4095
4096 trace_size = backtrace(trace, 100);
4097 /* overwrite sigaction with caller's address */
4098 trace[1] = (void *) uc->uc_mcontext.gregs[REG_EIP];
4099
4100 messages = backtrace_symbols(trace, trace_size);
4101
4102 for (i=1; i<trace_size; ++i)
4103 redisLog(REDIS_DEBUG,"[bt] %s", messages[i]);
4104
4105 free(messages);
4106 exit(0);
4107 }
4108
4109 void setupSigSegvAction(){
4110 struct sigaction act;
4111 sigemptyset (&act.sa_mask);
4112 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. Otherwise, sa_handler is used */
4113 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4114 act.sa_sigaction = segvHandler;
4115 sigaction (SIGSEGV, &act, NULL);
4116 }
4117 /* =================================== Main! ================================ */
4118
4119 #ifdef __linux__
/* Read the first line of /proc/sys/vm/overcommit_memory and return it
 * converted with atoi(). Returns -1 if the file cannot be opened or no
 * line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
4133
4134 void linuxOvercommitMemoryWarning(void) {
4135 if (linuxOvercommitMemoryValue() == 0) {
4136 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4137 }
4138 }
4139 #endif /* __linux__ */
4140
4141 static void daemonize(void) {
4142 int fd;
4143 FILE *fp;
4144
4145 if (fork() != 0) exit(0); /* parent exits */
4146 setsid(); /* create a new session */
4147
4148 /* Every output goes to /dev/null. If Redis is daemonized but
4149 * the 'logfile' is set to 'stdout' in the configuration file
4150 * it will not log at all. */
4151 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4152 dup2(fd, STDIN_FILENO);
4153 dup2(fd, STDOUT_FILENO);
4154 dup2(fd, STDERR_FILENO);
4155 if (fd > STDERR_FILENO) close(fd);
4156 }
4157 /* Try to write the pid file */
4158 fp = fopen(server.pidfile,"w");
4159 if (fp) {
4160 fprintf(fp,"%d\n",getpid());
4161 fclose(fp);
4162 }
4163 }
4164
4165 int main(int argc, char **argv) {
4166 #ifdef __linux__
4167 linuxOvercommitMemoryWarning();
4168 #endif
4169
4170 initServerConfig();
4171 if (argc == 2) {
4172 ResetServerSaveParams();
4173 loadServerConfig(argv[1]);
4174 } else if (argc > 2) {
4175 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4176 exit(1);
4177 } else {
4178 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4179 }
4180 initServer();
4181 if (server.daemonize) daemonize();
4182 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4183 if (rdbLoad(server.dbfilename) == REDIS_OK)
4184 redisLog(REDIS_NOTICE,"DB loaded from disk");
4185 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4186 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4187 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4188 aeMain(server.el);
4189 aeDeleteEventLoop(server.el);
4190 return 0;
4191 }