]> git.saurik.com Git - redis.git/blob - redis.c
LREM now returns :0 for non existing keys
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #include <signal.h>
40 #include <sys/wait.h>
41 #include <errno.h>
42 #include <assert.h>
43 #include <ctype.h>
44 #include <stdarg.h>
45 #include <inttypes.h>
46 #include <arpa/inet.h>
47 #include <sys/stat.h>
48 #include <fcntl.h>
49 #include <sys/time.h>
50 #include <sys/resource.h>
51 #include <limits.h>
52 #include <execinfo.h>
53
54 #include "ae.h" /* Event driven programming library */
55 #include "sds.h" /* Dynamic safe strings */
56 #include "anet.h" /* Networking the easy way */
57 #include "dict.h" /* Hash tables */
58 #include "adlist.h" /* Linked lists */
59 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
60 #include "lzf.h" /* LZF compression library */
61 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
62
63 #include "config.h"
64
65 /* Error codes */
66 #define REDIS_OK 0
67 #define REDIS_ERR -1
68
69 /* Static server configuration */
70 #define REDIS_SERVERPORT 6379 /* TCP port */
71 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
72 #define REDIS_IOBUF_LEN 1024
73 #define REDIS_LOADBUF_LEN 1024
74 #define REDIS_STATIC_ARGS 4
75 #define REDIS_DEFAULT_DBNUM 16
76 #define REDIS_CONFIGLINE_MAX 1024
77 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
78 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
79 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
80
81 /* Hash table parameters */
82 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
83 #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
84
85 /* Command flags */
86 #define REDIS_CMD_BULK 1 /* Bulk write command */
87 #define REDIS_CMD_INLINE 2 /* Inline command */
88 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
89 this flag will return an error when the 'maxmemory' option is set in the
90 config file and the server is using more than maxmemory bytes of memory.
91 In short these commands are denied on low memory conditions. */
92 #define REDIS_CMD_DENYOOM 4
93
94 /* Object types */
95 #define REDIS_STRING 0
96 #define REDIS_LIST 1
97 #define REDIS_SET 2
98 #define REDIS_HASH 3
99
100 /* Object types only used for dumping to disk */
101 #define REDIS_EXPIRETIME 253
102 #define REDIS_SELECTDB 254
103 #define REDIS_EOF 255
104
105 /* Defines related to the dump file format. Storing 32 bit lengths for short
106 * keys requires a lot of space, so we check the most significant 2 bits of
107 * the first byte to interpret the length:
108 *
109 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
110 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
111 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
112 * 11|000000 this means: specially encoded object will follow. The six bits
113 * number specifies the kind of object that follows.
114 * See the REDIS_RDB_ENC_* defines.
115 *
116 * Lengths up to 63 are stored using a single byte, most DB keys, and many
117 * values, will fit inside. */
118 #define REDIS_RDB_6BITLEN 0
119 #define REDIS_RDB_14BITLEN 1
120 #define REDIS_RDB_32BITLEN 2
121 #define REDIS_RDB_ENCVAL 3
122 #define REDIS_RDB_LENERR UINT_MAX
123
124 /* When a length of a string object stored on disk has the first two bits
125 * set, the remaining six bits specify a special encoding for the object
126 * according to the following defines: */
127 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
128 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
129 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
130 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
131
132 /* Client flags */
133 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
134 #define REDIS_SLAVE 2 /* This client is a slave server */
135 #define REDIS_MASTER 4 /* This client is a master server */
136 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
137
138 /* Slave replication state - slave side */
139 #define REDIS_REPL_NONE 0 /* No active replication */
140 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
141 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
142
143 /* Slave replication state - from the point of view of master
144 * Note that in SEND_BULK and ONLINE state the slave receives new updates
145 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
146 * to start the next background saving in order to send updates to it. */
147 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
148 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
149 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
150 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
151
152 /* List related stuff */
153 #define REDIS_HEAD 0
154 #define REDIS_TAIL 1
155
156 /* Sort operations */
157 #define REDIS_SORT_GET 0
158 #define REDIS_SORT_DEL 1
159 #define REDIS_SORT_INCR 2
160 #define REDIS_SORT_DECR 3
161 #define REDIS_SORT_ASC 4
162 #define REDIS_SORT_DESC 5
163 #define REDIS_SORTKEY_MAX 1024
164
165 /* Log levels */
166 #define REDIS_DEBUG 0
167 #define REDIS_NOTICE 1
168 #define REDIS_WARNING 2
169
170 /* Anti-warning macro... */
171 #define REDIS_NOTUSED(V) ((void) V)
172
173 /*================================= Data types ============================== */
174
/* A redis object: a reference counted, typed container for a value
 * (string / list / set). */
typedef struct redisObject {
    void *ptr;      /* the value itself; how to read it depends on 'type' */
    int type;       /* one of the REDIS_STRING / REDIS_LIST / ... object types */
    int refcount;   /* number of references, see incrRefCount()/decrRefCount() */
} robj;
181
182 typedef struct redisDb {
183 dict *dict;
184 dict *expires;
185 int id;
186 } redisDb;
187
188 /* With multiplexing we need to take per-clinet state.
189 * Clients are taken in a liked list. */
190 typedef struct redisClient {
191 int fd;
192 redisDb *db;
193 int dictid;
194 sds querybuf;
195 robj **argv;
196 int argc;
197 int bulklen; /* bulk read len. -1 if not in bulk read mode */
198 list *reply;
199 int sentlen;
200 time_t lastinteraction; /* time of the last interaction, used for timeout */
201 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
202 int slaveseldb; /* slave selected db, if this client is a slave */
203 int authenticated; /* when requirepass is non-NULL */
204 int replstate; /* replication state if this is a slave */
205 int repldbfd; /* replication DB file descriptor */
206 long repldboff; /* replication DB file offset */
207 off_t repldbsize; /* replication DB file size */
208 } redisClient;
209
/* One automatic save rule: serverCron() starts a background save when at
 * least 'changes' modifications are pending and more than 'seconds' seconds
 * passed since the last successful save. */
struct saveparam {
    time_t seconds;
    int changes;
};
214
215 /* Global server state structure */
216 struct redisServer {
217 int port;
218 int fd;
219 redisDb *db;
220 dict *sharingpool;
221 unsigned int sharingpoolsize;
222 long long dirty; /* changes to DB from the last save */
223 list *clients;
224 list *slaves, *monitors;
225 char neterr[ANET_ERR_LEN];
226 aeEventLoop *el;
227 int cronloops; /* number of times the cron function run */
228 list *objfreelist; /* A list of freed objects to avoid malloc() */
229 time_t lastsave; /* Unix time of last save succeeede */
230 size_t usedmemory; /* Used memory in megabytes */
231 /* Fields used only for stats */
232 time_t stat_starttime; /* server start time */
233 long long stat_numcommands; /* number of processed commands */
234 long long stat_numconnections; /* number of connections received */
235 /* Configuration */
236 int verbosity;
237 int glueoutputbuf;
238 int maxidletime;
239 int dbnum;
240 int daemonize;
241 char *pidfile;
242 int bgsaveinprogress;
243 struct saveparam *saveparams;
244 int saveparamslen;
245 char *logfile;
246 char *bindaddr;
247 char *dbfilename;
248 char *requirepass;
249 int shareobjects;
250 /* Replication related */
251 int isslave;
252 char *masterhost;
253 int masterport;
254 redisClient *master; /* client that is master for this slave */
255 int replstate;
256 unsigned int maxclients;
257 unsigned int maxmemory;
258 /* Sort parameters - qsort_r() is only available under BSD so we
259 * have to take this state global, in order to pass it to sortCompare() */
260 int sort_desc;
261 int sort_alpha;
262 int sort_bypattern;
263 };
264
265 typedef void redisCommandProc(redisClient *c);
266 struct redisCommand {
267 char *name;
268 redisCommandProc *proc;
269 int arity;
270 int flags;
271 };
272
273 typedef struct _redisSortObject {
274 robj *obj;
275 union {
276 double score;
277 robj *cmpobj;
278 } u;
279 } redisSortObject;
280
281 typedef struct _redisSortOperation {
282 int type;
283 robj *pattern;
284 } redisSortOperation;
285
286 struct sharedObjectsStruct {
287 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
288 *colon, *nullbulk, *nullmultibulk,
289 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
290 *outofrangeerr, *plus,
291 *select0, *select1, *select2, *select3, *select4,
292 *select5, *select6, *select7, *select8, *select9;
293 } shared;
294
295 /*================================ Prototypes =============================== */
296
297 static void freeStringObject(robj *o);
298 static void freeListObject(robj *o);
299 static void freeSetObject(robj *o);
300 static void decrRefCount(void *o);
301 static robj *createObject(int type, void *ptr);
302 static void freeClient(redisClient *c);
303 static int rdbLoad(char *filename);
304 static void addReply(redisClient *c, robj *obj);
305 static void addReplySds(redisClient *c, sds s);
306 static void incrRefCount(robj *o);
307 static int rdbSaveBackground(char *filename);
308 static robj *createStringObject(char *ptr, size_t len);
309 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
310 static int syncWithMaster(void);
311 static robj *tryObjectSharing(robj *o);
312 static int removeExpire(redisDb *db, robj *key);
313 static int expireIfNeeded(redisDb *db, robj *key);
314 static int deleteIfVolatile(redisDb *db, robj *key);
315 static int deleteKey(redisDb *db, robj *key);
316 static time_t getExpire(redisDb *db, robj *key);
317 static int setExpire(redisDb *db, robj *key, time_t when);
318 static void updateSalvesWaitingBgsave(int bgsaveerr);
319 static void freeMemoryIfNeeded(void);
320
321 static void authCommand(redisClient *c);
322 static void pingCommand(redisClient *c);
323 static void echoCommand(redisClient *c);
324 static void setCommand(redisClient *c);
325 static void setnxCommand(redisClient *c);
326 static void getCommand(redisClient *c);
327 static void delCommand(redisClient *c);
328 static void existsCommand(redisClient *c);
329 static void incrCommand(redisClient *c);
330 static void decrCommand(redisClient *c);
331 static void incrbyCommand(redisClient *c);
332 static void decrbyCommand(redisClient *c);
333 static void selectCommand(redisClient *c);
334 static void randomkeyCommand(redisClient *c);
335 static void keysCommand(redisClient *c);
336 static void dbsizeCommand(redisClient *c);
337 static void lastsaveCommand(redisClient *c);
338 static void saveCommand(redisClient *c);
339 static void bgsaveCommand(redisClient *c);
340 static void shutdownCommand(redisClient *c);
341 static void moveCommand(redisClient *c);
342 static void renameCommand(redisClient *c);
343 static void renamenxCommand(redisClient *c);
344 static void lpushCommand(redisClient *c);
345 static void rpushCommand(redisClient *c);
346 static void lpopCommand(redisClient *c);
347 static void rpopCommand(redisClient *c);
348 static void llenCommand(redisClient *c);
349 static void lindexCommand(redisClient *c);
350 static void lrangeCommand(redisClient *c);
351 static void ltrimCommand(redisClient *c);
352 static void typeCommand(redisClient *c);
353 static void lsetCommand(redisClient *c);
354 static void saddCommand(redisClient *c);
355 static void sremCommand(redisClient *c);
356 static void smoveCommand(redisClient *c);
357 static void sismemberCommand(redisClient *c);
358 static void scardCommand(redisClient *c);
359 static void sinterCommand(redisClient *c);
360 static void sinterstoreCommand(redisClient *c);
361 static void sunionCommand(redisClient *c);
362 static void sunionstoreCommand(redisClient *c);
363 static void sdiffCommand(redisClient *c);
364 static void sdiffstoreCommand(redisClient *c);
365 static void syncCommand(redisClient *c);
366 static void flushdbCommand(redisClient *c);
367 static void flushallCommand(redisClient *c);
368 static void sortCommand(redisClient *c);
369 static void lremCommand(redisClient *c);
370 static void infoCommand(redisClient *c);
371 static void mgetCommand(redisClient *c);
372 static void monitorCommand(redisClient *c);
373 static void expireCommand(redisClient *c);
374 static void getSetCommand(redisClient *c);
375 static void ttlCommand(redisClient *c);
376 static void slaveofCommand(redisClient *c);
377 static void debugCommand(redisClient *c);
378
379 /*================================= Globals ================================= */
380
381 /* Global vars */
382 static struct redisServer server; /* server global state */
383 static struct redisCommand cmdTable[] = {
384 {"get",getCommand,2,REDIS_CMD_INLINE},
385 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
386 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
387 {"del",delCommand,-2,REDIS_CMD_INLINE},
388 {"exists",existsCommand,2,REDIS_CMD_INLINE},
389 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
390 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
391 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
392 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
393 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
394 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
395 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
396 {"llen",llenCommand,2,REDIS_CMD_INLINE},
397 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
398 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
399 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
400 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
401 {"lrem",lremCommand,4,REDIS_CMD_BULK},
402 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
403 {"srem",sremCommand,3,REDIS_CMD_BULK},
404 {"smove",smoveCommand,4,REDIS_CMD_BULK},
405 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
406 {"scard",scardCommand,2,REDIS_CMD_INLINE},
407 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
408 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
409 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
410 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
411 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
412 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
413 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
414 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
415 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
416 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
417 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
418 {"select",selectCommand,2,REDIS_CMD_INLINE},
419 {"move",moveCommand,3,REDIS_CMD_INLINE},
420 {"rename",renameCommand,3,REDIS_CMD_INLINE},
421 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
422 {"expire",expireCommand,3,REDIS_CMD_INLINE},
423 {"keys",keysCommand,2,REDIS_CMD_INLINE},
424 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
425 {"auth",authCommand,2,REDIS_CMD_INLINE},
426 {"ping",pingCommand,1,REDIS_CMD_INLINE},
427 {"echo",echoCommand,2,REDIS_CMD_BULK},
428 {"save",saveCommand,1,REDIS_CMD_INLINE},
429 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
430 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
431 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
432 {"type",typeCommand,2,REDIS_CMD_INLINE},
433 {"sync",syncCommand,1,REDIS_CMD_INLINE},
434 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
435 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
436 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
437 {"info",infoCommand,1,REDIS_CMD_INLINE},
438 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
439 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
440 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
441 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
442 {NULL,NULL,0,0}
443 };
444
445 /*============================ Utility functions ============================ */
446
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be NUL terminated:
 * bytes at or past the given lengths are never read.
 *
 * Supported syntax:
 *   *       any sequence of bytes, the empty one included
 *   ?       exactly one byte
 *   [abc]   one byte out of a set; [^abc] negates the set, [a-z] is a range
 *           and \x inside the brackets stands for the byte x itself
 *   \x      the byte x taken literally
 *
 * When 'nocase' is non-zero, literals and ranges are compared ignoring case
 * (a byte escaped inside a set is always compared exactly).
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* A run of stars is the same as a single star. Only look at
             * pattern[1] when it is still inside the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing star: matches whatever is left */
            /* Try to match the rest of the pattern at every position. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes one byte: with nothing left to consume
             * there is no match (and string[0] must not be read). */
            if (stringLen <= 0)
                return 0;
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen <= 0) {
                    /* Unterminated set: step back so that the common
                     * advance after the switch lands on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        /* tolower() is only defined for unsigned char
                         * values (and EOF). */
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs a byte to compare against. */
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is over: only stars may remain in the pattern. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
569
570 void redisLog(int level, const char *fmt, ...)
571 {
572 va_list ap;
573 FILE *fp;
574
575 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
576 if (!fp) return;
577
578 va_start(ap, fmt);
579 if (level >= server.verbosity) {
580 char *c = ".-*";
581 char buf[64];
582 time_t now;
583
584 now = time(NULL);
585 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
586 fprintf(fp,"%s %c ",buf,c[level]);
587 vfprintf(fp, fmt, ap);
588 fprintf(fp,"\n");
589 fflush(fp);
590 }
591 va_end(ap);
592
593 if (server.logfile) fclose(fp);
594 }
595
596 /*====================== Hash table type implementation ==================== */
597
598 /* This is a hash table type that uses the SDS dynamic strings library as
599 * keys and redis objects as values (objects can hold SDS strings,
600 * lists, sets). */
601
602 static int sdsDictKeyCompare(void *privdata, const void *key1,
603 const void *key2)
604 {
605 int l1,l2;
606 DICT_NOTUSED(privdata);
607
608 l1 = sdslen((sds)key1);
609 l2 = sdslen((sds)key2);
610 if (l1 != l2) return 0;
611 return memcmp(key1, key2, l1) == 0;
612 }
613
/* Destructor callback used by the dict types below: drops one reference to
 * the redis object stored in the dict. 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
620
621 static int dictSdsKeyCompare(void *privdata, const void *key1,
622 const void *key2)
623 {
624 const robj *o1 = key1, *o2 = key2;
625 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
626 }
627
628 static unsigned int dictSdsHash(const void *key) {
629 const robj *o = key;
630 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
631 }
632
633 static dictType setDictType = {
634 dictSdsHash, /* hash function */
635 NULL, /* key dup */
636 NULL, /* val dup */
637 dictSdsKeyCompare, /* key compare */
638 dictRedisObjectDestructor, /* key destructor */
639 NULL /* val destructor */
640 };
641
642 static dictType hashDictType = {
643 dictSdsHash, /* hash function */
644 NULL, /* key dup */
645 NULL, /* val dup */
646 dictSdsKeyCompare, /* key compare */
647 dictRedisObjectDestructor, /* key destructor */
648 dictRedisObjectDestructor /* val destructor */
649 };
650
651 /* ========================= Random utility functions ======================= */
652
/* Out of memory handler.
 *
 * Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear whether the condition
 * could even be reported to the client, since the networking layer itself
 * uses heap allocated send buffers. So we print 'msg' (the name of the
 * failing operation) on standard error, wait one second and abort.
 * This function never returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);

    sleep(1);
    abort();
}
664
665 /* ====================== Redis server networking stuff ===================== */
666 void closeTimedoutClients(void) {
667 redisClient *c;
668 listNode *ln;
669 time_t now = time(NULL);
670
671 listRewind(server.clients);
672 while ((ln = listYield(server.clients)) != NULL) {
673 c = listNodeValue(ln);
674 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
675 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
676 (now - c->lastinteraction > server.maxidletime)) {
677 redisLog(REDIS_DEBUG,"Closing idle client");
678 freeClient(c);
679 }
680 }
681 }
682
683 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
684 * we resize the hash table to save memory */
685 void tryResizeHashTables(void) {
686 int j;
687
688 for (j = 0; j < server.dbnum; j++) {
689 long long size, used;
690
691 size = dictSlots(server.db[j].dict);
692 used = dictSize(server.db[j].dict);
693 if (size && used && size > REDIS_HT_MINSLOTS &&
694 (used*100/size < REDIS_HT_MINFILL)) {
695 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
696 dictResize(server.db[j].dict);
697 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
698 }
699 }
700 }
701
702 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
703 int j, loops = server.cronloops++;
704 REDIS_NOTUSED(eventLoop);
705 REDIS_NOTUSED(id);
706 REDIS_NOTUSED(clientData);
707
708 /* Update the global state with the amount of used memory */
709 server.usedmemory = zmalloc_used_memory();
710
711 /* Show some info about non-empty databases */
712 for (j = 0; j < server.dbnum; j++) {
713 long long size, used, vkeys;
714
715 size = dictSlots(server.db[j].dict);
716 used = dictSize(server.db[j].dict);
717 vkeys = dictSize(server.db[j].expires);
718 if (!(loops % 5) && used > 0) {
719 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
720 /* dictPrintStats(server.dict); */
721 }
722 }
723
724 /* We don't want to resize the hash tables while a bacground saving
725 * is in progress: the saving child is created using fork() that is
726 * implemented with a copy-on-write semantic in most modern systems, so
727 * if we resize the HT while there is the saving child at work actually
728 * a lot of memory movements in the parent will cause a lot of pages
729 * copied. */
730 if (!server.bgsaveinprogress) tryResizeHashTables();
731
732 /* Show information about connected clients */
733 if (!(loops % 5)) {
734 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
735 listLength(server.clients)-listLength(server.slaves),
736 listLength(server.slaves),
737 server.usedmemory,
738 dictSize(server.sharingpool));
739 }
740
741 /* Close connections of timedout clients */
742 if (server.maxidletime && !(loops % 10))
743 closeTimedoutClients();
744
745 /* Check if a background saving in progress terminated */
746 if (server.bgsaveinprogress) {
747 int statloc;
748 /* XXX: TODO handle the case of the saving child killed */
749 if (wait4(-1,&statloc,WNOHANG,NULL)) {
750 int exitcode = WEXITSTATUS(statloc);
751 if (exitcode == 0) {
752 redisLog(REDIS_NOTICE,
753 "Background saving terminated with success");
754 server.dirty = 0;
755 server.lastsave = time(NULL);
756 } else {
757 redisLog(REDIS_WARNING,
758 "Background saving error");
759 }
760 server.bgsaveinprogress = 0;
761 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
762 }
763 } else {
764 /* If there is not a background saving in progress check if
765 * we have to save now */
766 time_t now = time(NULL);
767 for (j = 0; j < server.saveparamslen; j++) {
768 struct saveparam *sp = server.saveparams+j;
769
770 if (server.dirty >= sp->changes &&
771 now-server.lastsave > sp->seconds) {
772 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
773 sp->changes, sp->seconds);
774 rdbSaveBackground(server.dbfilename);
775 break;
776 }
777 }
778 }
779
780 /* Try to expire a few timed out keys */
781 for (j = 0; j < server.dbnum; j++) {
782 redisDb *db = server.db+j;
783 int num = dictSize(db->expires);
784
785 if (num) {
786 time_t now = time(NULL);
787
788 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
789 num = REDIS_EXPIRELOOKUPS_PER_CRON;
790 while (num--) {
791 dictEntry *de;
792 time_t t;
793
794 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
795 t = (time_t) dictGetEntryVal(de);
796 if (now > t) {
797 deleteKey(db,dictGetEntryKey(de));
798 }
799 }
800 }
801 }
802
803 /* Check if we should connect to a MASTER */
804 if (server.replstate == REDIS_REPL_CONNECT) {
805 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
806 if (syncWithMaster() == REDIS_OK) {
807 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
808 }
809 }
810 return 1000;
811 }
812
813 static void createSharedObjects(void) {
814 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
815 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
816 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
817 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
818 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
819 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
820 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
821 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
822 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
823 /* no such key */
824 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
825 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
826 "-ERR Operation against a key holding the wrong kind of value\r\n"));
827 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
828 "-ERR no such key\r\n"));
829 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
830 "-ERR syntax error\r\n"));
831 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
832 "-ERR source and destination objects are the same\r\n"));
833 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
834 "-ERR index out of range\r\n"));
835 shared.space = createObject(REDIS_STRING,sdsnew(" "));
836 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
837 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
838 shared.select0 = createStringObject("select 0\r\n",10);
839 shared.select1 = createStringObject("select 1\r\n",10);
840 shared.select2 = createStringObject("select 2\r\n",10);
841 shared.select3 = createStringObject("select 3\r\n",10);
842 shared.select4 = createStringObject("select 4\r\n",10);
843 shared.select5 = createStringObject("select 5\r\n",10);
844 shared.select6 = createStringObject("select 6\r\n",10);
845 shared.select7 = createStringObject("select 7\r\n",10);
846 shared.select8 = createStringObject("select 8\r\n",10);
847 shared.select9 = createStringObject("select 9\r\n",10);
848 }
849
850 static void appendServerSaveParams(time_t seconds, int changes) {
851 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
852 if (server.saveparams == NULL) oom("appendServerSaveParams");
853 server.saveparams[server.saveparamslen].seconds = seconds;
854 server.saveparams[server.saveparamslen].changes = changes;
855 server.saveparamslen++;
856 }
857
858 static void ResetServerSaveParams() {
859 zfree(server.saveparams);
860 server.saveparams = NULL;
861 server.saveparamslen = 0;
862 }
863
864 static void initServerConfig() {
865 server.dbnum = REDIS_DEFAULT_DBNUM;
866 server.port = REDIS_SERVERPORT;
867 server.verbosity = REDIS_DEBUG;
868 server.maxidletime = REDIS_MAXIDLETIME;
869 server.saveparams = NULL;
870 server.logfile = NULL; /* NULL = log on standard output */
871 server.bindaddr = NULL;
872 server.glueoutputbuf = 1;
873 server.daemonize = 0;
874 server.pidfile = "/var/run/redis.pid";
875 server.dbfilename = "dump.rdb";
876 server.requirepass = NULL;
877 server.shareobjects = 0;
878 server.maxclients = 0;
879 server.maxmemory = 0;
880 ResetServerSaveParams();
881
882 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
883 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
884 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
885 /* Replication related */
886 server.isslave = 0;
887 server.masterhost = NULL;
888 server.masterport = 6379;
889 server.master = NULL;
890 server.replstate = REDIS_REPL_NONE;
891 }
892
893 static void initServer() {
894 int j;
895
896 signal(SIGHUP, SIG_IGN);
897 signal(SIGPIPE, SIG_IGN);
898
899 server.clients = listCreate();
900 server.slaves = listCreate();
901 server.monitors = listCreate();
902 server.objfreelist = listCreate();
903 createSharedObjects();
904 server.el = aeCreateEventLoop();
905 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
906 server.sharingpool = dictCreate(&setDictType,NULL);
907 server.sharingpoolsize = 1024;
908 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
909 oom("server initialization"); /* Fatal OOM */
910 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
911 if (server.fd == -1) {
912 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
913 exit(1);
914 }
915 for (j = 0; j < server.dbnum; j++) {
916 server.db[j].dict = dictCreate(&hashDictType,NULL);
917 server.db[j].expires = dictCreate(&setDictType,NULL);
918 server.db[j].id = j;
919 }
920 server.cronloops = 0;
921 server.bgsaveinprogress = 0;
922 server.lastsave = time(NULL);
923 server.dirty = 0;
924 server.usedmemory = 0;
925 server.stat_numcommands = 0;
926 server.stat_numconnections = 0;
927 server.stat_starttime = time(NULL);
928 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
929 }
930
931 /* Empty the whole database */
932 static long long emptyDb() {
933 int j;
934 long long removed = 0;
935
936 for (j = 0; j < server.dbnum; j++) {
937 removed += dictSize(server.db[j].dict);
938 dictEmpty(server.db[j].dict);
939 dictEmpty(server.db[j].expires);
940 }
941 return removed;
942 }
943
/* Translate the words "yes" / "no" (compared ignoring case) into 1 / 0.
 * Every other string is reported as -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
949
950 /* I agree, this is a very rudimental way to load a configuration...
951 will improve later if the config gets more complex */
952 static void loadServerConfig(char *filename) {
953 FILE *fp = fopen(filename,"r");
954 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
955 int linenum = 0;
956 sds line = NULL;
957
958 if (!fp) {
959 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
960 exit(1);
961 }
962 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
963 sds *argv;
964 int argc, j;
965
966 linenum++;
967 line = sdsnew(buf);
968 line = sdstrim(line," \t\r\n");
969
970 /* Skip comments and blank lines*/
971 if (line[0] == '#' || line[0] == '\0') {
972 sdsfree(line);
973 continue;
974 }
975
976 /* Split into arguments */
977 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
978 sdstolower(argv[0]);
979
980 /* Execute config directives */
981 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
982 server.maxidletime = atoi(argv[1]);
983 if (server.maxidletime < 0) {
984 err = "Invalid timeout value"; goto loaderr;
985 }
986 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
987 server.port = atoi(argv[1]);
988 if (server.port < 1 || server.port > 65535) {
989 err = "Invalid port"; goto loaderr;
990 }
991 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
992 server.bindaddr = zstrdup(argv[1]);
993 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
994 int seconds = atoi(argv[1]);
995 int changes = atoi(argv[2]);
996 if (seconds < 1 || changes < 0) {
997 err = "Invalid save parameters"; goto loaderr;
998 }
999 appendServerSaveParams(seconds,changes);
1000 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1001 if (chdir(argv[1]) == -1) {
1002 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1003 argv[1], strerror(errno));
1004 exit(1);
1005 }
1006 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1007 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1008 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1009 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1010 else {
1011 err = "Invalid log level. Must be one of debug, notice, warning";
1012 goto loaderr;
1013 }
1014 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1015 FILE *fp;
1016
1017 server.logfile = zstrdup(argv[1]);
1018 if (!strcasecmp(server.logfile,"stdout")) {
1019 zfree(server.logfile);
1020 server.logfile = NULL;
1021 }
1022 if (server.logfile) {
1023 /* Test if we are able to open the file. The server will not
1024 * be able to abort just for this problem later... */
1025 fp = fopen(server.logfile,"a");
1026 if (fp == NULL) {
1027 err = sdscatprintf(sdsempty(),
1028 "Can't open the log file: %s", strerror(errno));
1029 goto loaderr;
1030 }
1031 fclose(fp);
1032 }
1033 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1034 server.dbnum = atoi(argv[1]);
1035 if (server.dbnum < 1) {
1036 err = "Invalid number of databases"; goto loaderr;
1037 }
1038 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1039 server.maxclients = atoi(argv[1]);
1040 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1041 server.maxmemory = atoi(argv[1]);
1042 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1043 server.masterhost = sdsnew(argv[1]);
1044 server.masterport = atoi(argv[2]);
1045 server.replstate = REDIS_REPL_CONNECT;
1046 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1047 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1048 err = "argument must be 'yes' or 'no'"; goto loaderr;
1049 }
1050 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1051 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1052 err = "argument must be 'yes' or 'no'"; goto loaderr;
1053 }
1054 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1055 server.sharingpoolsize = atoi(argv[1]);
1056 if (server.sharingpoolsize < 1) {
1057 err = "invalid object sharing pool size"; goto loaderr;
1058 }
1059 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1060 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1061 err = "argument must be 'yes' or 'no'"; goto loaderr;
1062 }
1063 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1064 server.requirepass = zstrdup(argv[1]);
1065 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1066 server.pidfile = zstrdup(argv[1]);
1067 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1068 server.dbfilename = zstrdup(argv[1]);
1069 } else {
1070 err = "Bad directive or wrong number of arguments"; goto loaderr;
1071 }
1072 for (j = 0; j < argc; j++)
1073 sdsfree(argv[j]);
1074 zfree(argv);
1075 sdsfree(line);
1076 }
1077 fclose(fp);
1078 return;
1079
1080 loaderr:
1081 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1082 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1083 fprintf(stderr, ">>> '%s'\n", line);
1084 fprintf(stderr, "%s\n", err);
1085 exit(1);
1086 }
1087
1088 static void freeClientArgv(redisClient *c) {
1089 int j;
1090
1091 for (j = 0; j < c->argc; j++)
1092 decrRefCount(c->argv[j]);
1093 c->argc = 0;
1094 }
1095
1096 static void freeClient(redisClient *c) {
1097 listNode *ln;
1098
1099 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1100 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1101 sdsfree(c->querybuf);
1102 listRelease(c->reply);
1103 freeClientArgv(c);
1104 close(c->fd);
1105 ln = listSearchKey(server.clients,c);
1106 assert(ln != NULL);
1107 listDelNode(server.clients,ln);
1108 if (c->flags & REDIS_SLAVE) {
1109 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1110 close(c->repldbfd);
1111 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1112 ln = listSearchKey(l,c);
1113 assert(ln != NULL);
1114 listDelNode(l,ln);
1115 }
1116 if (c->flags & REDIS_MASTER) {
1117 server.master = NULL;
1118 server.replstate = REDIS_REPL_CONNECT;
1119 }
1120 zfree(c->argv);
1121 zfree(c);
1122 }
1123
1124 static void glueReplyBuffersIfNeeded(redisClient *c) {
1125 int totlen = 0;
1126 listNode *ln;
1127 robj *o;
1128
1129 listRewind(c->reply);
1130 while((ln = listYield(c->reply))) {
1131 o = ln->value;
1132 totlen += sdslen(o->ptr);
1133 /* This optimization makes more sense if we don't have to copy
1134 * too much data */
1135 if (totlen > 1024) return;
1136 }
1137 if (totlen > 0) {
1138 char buf[1024];
1139 int copylen = 0;
1140
1141 listRewind(c->reply);
1142 while((ln = listYield(c->reply))) {
1143 o = ln->value;
1144 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1145 copylen += sdslen(o->ptr);
1146 listDelNode(c->reply,ln);
1147 }
1148 /* Now the output buffer is empty, add the new single element */
1149 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1150 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1151 }
1152 }
1153
1154 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1155 redisClient *c = privdata;
1156 int nwritten = 0, totwritten = 0, objlen;
1157 robj *o;
1158 REDIS_NOTUSED(el);
1159 REDIS_NOTUSED(mask);
1160
1161 if (server.glueoutputbuf && listLength(c->reply) > 1)
1162 glueReplyBuffersIfNeeded(c);
1163 while(listLength(c->reply)) {
1164 o = listNodeValue(listFirst(c->reply));
1165 objlen = sdslen(o->ptr);
1166
1167 if (objlen == 0) {
1168 listDelNode(c->reply,listFirst(c->reply));
1169 continue;
1170 }
1171
1172 if (c->flags & REDIS_MASTER) {
1173 nwritten = objlen - c->sentlen;
1174 } else {
1175 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1176 if (nwritten <= 0) break;
1177 }
1178 c->sentlen += nwritten;
1179 totwritten += nwritten;
1180 /* If we fully sent the object on head go to the next one */
1181 if (c->sentlen == objlen) {
1182 listDelNode(c->reply,listFirst(c->reply));
1183 c->sentlen = 0;
1184 }
1185 }
1186 if (nwritten == -1) {
1187 if (errno == EAGAIN) {
1188 nwritten = 0;
1189 } else {
1190 redisLog(REDIS_DEBUG,
1191 "Error writing to client: %s", strerror(errno));
1192 freeClient(c);
1193 return;
1194 }
1195 }
1196 if (totwritten > 0) c->lastinteraction = time(NULL);
1197 if (listLength(c->reply) == 0) {
1198 c->sentlen = 0;
1199 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1200 }
1201 }
1202
1203 static struct redisCommand *lookupCommand(char *name) {
1204 int j = 0;
1205 while(cmdTable[j].name != NULL) {
1206 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1207 j++;
1208 }
1209 return NULL;
1210 }
1211
1212 /* resetClient prepare the client to process the next command */
1213 static void resetClient(redisClient *c) {
1214 freeClientArgv(c);
1215 c->bulklen = -1;
1216 }
1217
1218 /* If this function gets called we already read a whole
1219 * command, argments are in the client argv/argc fields.
1220 * processCommand() execute the command or prepare the
1221 * server for a bulk read from the client.
1222 *
1223 * If 1 is returned the client is still alive and valid and
1224 * and other operations can be performed by the caller. Otherwise
1225 * if 0 is returned the client was destroied (i.e. after QUIT). */
1226 static int processCommand(redisClient *c) {
1227 struct redisCommand *cmd;
1228 long long dirty;
1229
1230 /* Free some memory if needed (maxmemory setting) */
1231 if (server.maxmemory) freeMemoryIfNeeded();
1232
1233 /* The QUIT command is handled as a special case. Normal command
1234 * procs are unable to close the client connection safely */
1235 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1236 freeClient(c);
1237 return 0;
1238 }
1239 cmd = lookupCommand(c->argv[0]->ptr);
1240 if (!cmd) {
1241 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1242 resetClient(c);
1243 return 1;
1244 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1245 (c->argc < -cmd->arity)) {
1246 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1247 resetClient(c);
1248 return 1;
1249 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1250 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1251 resetClient(c);
1252 return 1;
1253 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1254 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1255
1256 decrRefCount(c->argv[c->argc-1]);
1257 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1258 c->argc--;
1259 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1260 resetClient(c);
1261 return 1;
1262 }
1263 c->argc--;
1264 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1265 /* It is possible that the bulk read is already in the
1266 * buffer. Check this condition and handle it accordingly */
1267 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1268 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1269 c->argc++;
1270 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1271 } else {
1272 return 1;
1273 }
1274 }
1275 /* Let's try to share objects on the command arguments vector */
1276 if (server.shareobjects) {
1277 int j;
1278 for(j = 1; j < c->argc; j++)
1279 c->argv[j] = tryObjectSharing(c->argv[j]);
1280 }
1281 /* Check if the user is authenticated */
1282 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1283 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1284 resetClient(c);
1285 return 1;
1286 }
1287
1288 /* Exec the command */
1289 dirty = server.dirty;
1290 cmd->proc(c);
1291 if (server.dirty-dirty != 0 && listLength(server.slaves))
1292 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1293 if (listLength(server.monitors))
1294 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1295 server.stat_numcommands++;
1296
1297 /* Prepare the client for the next command */
1298 if (c->flags & REDIS_CLOSE) {
1299 freeClient(c);
1300 return 0;
1301 }
1302 resetClient(c);
1303 return 1;
1304 }
1305
1306 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1307 listNode *ln;
1308 int outc = 0, j;
1309 robj **outv;
1310 /* (args*2)+1 is enough room for args, spaces, newlines */
1311 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1312
1313 if (argc <= REDIS_STATIC_ARGS) {
1314 outv = static_outv;
1315 } else {
1316 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1317 if (!outv) oom("replicationFeedSlaves");
1318 }
1319
1320 for (j = 0; j < argc; j++) {
1321 if (j != 0) outv[outc++] = shared.space;
1322 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1323 robj *lenobj;
1324
1325 lenobj = createObject(REDIS_STRING,
1326 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1327 lenobj->refcount = 0;
1328 outv[outc++] = lenobj;
1329 }
1330 outv[outc++] = argv[j];
1331 }
1332 outv[outc++] = shared.crlf;
1333
1334 /* Increment all the refcounts at start and decrement at end in order to
1335 * be sure to free objects if there is no slave in a replication state
1336 * able to be feed with commands */
1337 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1338 listRewind(slaves);
1339 while((ln = listYield(slaves))) {
1340 redisClient *slave = ln->value;
1341
1342 /* Don't feed slaves that are still waiting for BGSAVE to start */
1343 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1344
1345 /* Feed all the other slaves, MONITORs and so on */
1346 if (slave->slaveseldb != dictid) {
1347 robj *selectcmd;
1348
1349 switch(dictid) {
1350 case 0: selectcmd = shared.select0; break;
1351 case 1: selectcmd = shared.select1; break;
1352 case 2: selectcmd = shared.select2; break;
1353 case 3: selectcmd = shared.select3; break;
1354 case 4: selectcmd = shared.select4; break;
1355 case 5: selectcmd = shared.select5; break;
1356 case 6: selectcmd = shared.select6; break;
1357 case 7: selectcmd = shared.select7; break;
1358 case 8: selectcmd = shared.select8; break;
1359 case 9: selectcmd = shared.select9; break;
1360 default:
1361 selectcmd = createObject(REDIS_STRING,
1362 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1363 selectcmd->refcount = 0;
1364 break;
1365 }
1366 addReply(slave,selectcmd);
1367 slave->slaveseldb = dictid;
1368 }
1369 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1370 }
1371 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1372 if (outv != static_outv) zfree(outv);
1373 }
1374
1375 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1376 redisClient *c = (redisClient*) privdata;
1377 char buf[REDIS_IOBUF_LEN];
1378 int nread;
1379 REDIS_NOTUSED(el);
1380 REDIS_NOTUSED(mask);
1381
1382 nread = read(fd, buf, REDIS_IOBUF_LEN);
1383 if (nread == -1) {
1384 if (errno == EAGAIN) {
1385 nread = 0;
1386 } else {
1387 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1388 freeClient(c);
1389 return;
1390 }
1391 } else if (nread == 0) {
1392 redisLog(REDIS_DEBUG, "Client closed connection");
1393 freeClient(c);
1394 return;
1395 }
1396 if (nread) {
1397 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1398 c->lastinteraction = time(NULL);
1399 } else {
1400 return;
1401 }
1402
1403 again:
1404 if (c->bulklen == -1) {
1405 /* Read the first line of the query */
1406 char *p = strchr(c->querybuf,'\n');
1407 size_t querylen;
1408 if (p) {
1409 sds query, *argv;
1410 int argc, j;
1411
1412 query = c->querybuf;
1413 c->querybuf = sdsempty();
1414 querylen = 1+(p-(query));
1415 if (sdslen(query) > querylen) {
1416 /* leave data after the first line of the query in the buffer */
1417 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1418 }
1419 *p = '\0'; /* remove "\n" */
1420 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1421 sdsupdatelen(query);
1422
1423 /* Now we can split the query in arguments */
1424 if (sdslen(query) == 0) {
1425 /* Ignore empty query */
1426 sdsfree(query);
1427 return;
1428 }
1429 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1430 if (argv == NULL) oom("sdssplitlen");
1431 sdsfree(query);
1432
1433 if (c->argv) zfree(c->argv);
1434 c->argv = zmalloc(sizeof(robj*)*argc);
1435 if (c->argv == NULL) oom("allocating arguments list for client");
1436
1437 for (j = 0; j < argc; j++) {
1438 if (sdslen(argv[j])) {
1439 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1440 c->argc++;
1441 } else {
1442 sdsfree(argv[j]);
1443 }
1444 }
1445 zfree(argv);
1446 /* Execute the command. If the client is still valid
1447 * after processCommand() return and there is something
1448 * on the query buffer try to process the next command. */
1449 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1450 return;
1451 } else if (sdslen(c->querybuf) >= 1024*32) {
1452 redisLog(REDIS_DEBUG, "Client protocol error");
1453 freeClient(c);
1454 return;
1455 }
1456 } else {
1457 /* Bulk read handling. Note that if we are at this point
1458 the client already sent a command terminated with a newline,
1459 we are reading the bulk data that is actually the last
1460 argument of the command. */
1461 int qbl = sdslen(c->querybuf);
1462
1463 if (c->bulklen <= qbl) {
1464 /* Copy everything but the final CRLF as final argument */
1465 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1466 c->argc++;
1467 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1468 processCommand(c);
1469 return;
1470 }
1471 }
1472 }
1473
1474 static int selectDb(redisClient *c, int id) {
1475 if (id < 0 || id >= server.dbnum)
1476 return REDIS_ERR;
1477 c->db = &server.db[id];
1478 return REDIS_OK;
1479 }
1480
1481 static void *dupClientReplyValue(void *o) {
1482 incrRefCount((robj*)o);
1483 return 0;
1484 }
1485
1486 static redisClient *createClient(int fd) {
1487 redisClient *c = zmalloc(sizeof(*c));
1488
1489 anetNonBlock(NULL,fd);
1490 anetTcpNoDelay(NULL,fd);
1491 if (!c) return NULL;
1492 selectDb(c,0);
1493 c->fd = fd;
1494 c->querybuf = sdsempty();
1495 c->argc = 0;
1496 c->argv = NULL;
1497 c->bulklen = -1;
1498 c->sentlen = 0;
1499 c->flags = 0;
1500 c->lastinteraction = time(NULL);
1501 c->authenticated = 0;
1502 c->replstate = REDIS_REPL_NONE;
1503 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1504 listSetFreeMethod(c->reply,decrRefCount);
1505 listSetDupMethod(c->reply,dupClientReplyValue);
1506 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1507 readQueryFromClient, c, NULL) == AE_ERR) {
1508 freeClient(c);
1509 return NULL;
1510 }
1511 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1512 return c;
1513 }
1514
1515 static void addReply(redisClient *c, robj *obj) {
1516 if (listLength(c->reply) == 0 &&
1517 (c->replstate == REDIS_REPL_NONE ||
1518 c->replstate == REDIS_REPL_ONLINE) &&
1519 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1520 sendReplyToClient, c, NULL) == AE_ERR) return;
1521 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1522 incrRefCount(obj);
1523 }
1524
1525 static void addReplySds(redisClient *c, sds s) {
1526 robj *o = createObject(REDIS_STRING,s);
1527 addReply(c,o);
1528 decrRefCount(o);
1529 }
1530
1531 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1532 int cport, cfd;
1533 char cip[128];
1534 redisClient *c;
1535 REDIS_NOTUSED(el);
1536 REDIS_NOTUSED(mask);
1537 REDIS_NOTUSED(privdata);
1538
1539 cfd = anetAccept(server.neterr, fd, cip, &cport);
1540 if (cfd == AE_ERR) {
1541 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1542 return;
1543 }
1544 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1545 if ((c = createClient(cfd)) == NULL) {
1546 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1547 close(cfd); /* May be already closed, just ingore errors */
1548 return;
1549 }
1550 /* If maxclient directive is set and this is one client more... close the
1551 * connection. Note that we create the client instead to check before
1552 * for this condition, since now the socket is already set in nonblocking
1553 * mode and we can send an error for free using the Kernel I/O */
1554 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1555 char *err = "-ERR max number of clients reached\r\n";
1556
1557 /* That's a best effort error message, don't check write errors */
1558 (void) write(c->fd,err,strlen(err));
1559 freeClient(c);
1560 return;
1561 }
1562 server.stat_numconnections++;
1563 }
1564
1565 /* ======================= Redis objects implementation ===================== */
1566
1567 static robj *createObject(int type, void *ptr) {
1568 robj *o;
1569
1570 if (listLength(server.objfreelist)) {
1571 listNode *head = listFirst(server.objfreelist);
1572 o = listNodeValue(head);
1573 listDelNode(server.objfreelist,head);
1574 } else {
1575 o = zmalloc(sizeof(*o));
1576 }
1577 if (!o) oom("createObject");
1578 o->type = type;
1579 o->ptr = ptr;
1580 o->refcount = 1;
1581 return o;
1582 }
1583
1584 static robj *createStringObject(char *ptr, size_t len) {
1585 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1586 }
1587
1588 static robj *createListObject(void) {
1589 list *l = listCreate();
1590
1591 if (!l) oom("listCreate");
1592 listSetFreeMethod(l,decrRefCount);
1593 return createObject(REDIS_LIST,l);
1594 }
1595
1596 static robj *createSetObject(void) {
1597 dict *d = dictCreate(&setDictType,NULL);
1598 if (!d) oom("dictCreate");
1599 return createObject(REDIS_SET,d);
1600 }
1601
1602 static void freeStringObject(robj *o) {
1603 sdsfree(o->ptr);
1604 }
1605
1606 static void freeListObject(robj *o) {
1607 listRelease((list*) o->ptr);
1608 }
1609
1610 static void freeSetObject(robj *o) {
1611 dictRelease((dict*) o->ptr);
1612 }
1613
1614 static void freeHashObject(robj *o) {
1615 dictRelease((dict*) o->ptr);
1616 }
1617
1618 static void incrRefCount(robj *o) {
1619 o->refcount++;
1620 #ifdef DEBUG_REFCOUNT
1621 if (o->type == REDIS_STRING)
1622 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1623 #endif
1624 }
1625
1626 static void decrRefCount(void *obj) {
1627 robj *o = obj;
1628
1629 #ifdef DEBUG_REFCOUNT
1630 if (o->type == REDIS_STRING)
1631 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1632 #endif
1633 if (--(o->refcount) == 0) {
1634 switch(o->type) {
1635 case REDIS_STRING: freeStringObject(o); break;
1636 case REDIS_LIST: freeListObject(o); break;
1637 case REDIS_SET: freeSetObject(o); break;
1638 case REDIS_HASH: freeHashObject(o); break;
1639 default: assert(0 != 0); break;
1640 }
1641 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1642 !listAddNodeHead(server.objfreelist,o))
1643 zfree(o);
1644 }
1645 }
1646
1647 /* Try to share an object against the shared objects pool */
1648 static robj *tryObjectSharing(robj *o) {
1649 struct dictEntry *de;
1650 unsigned long c;
1651
1652 if (o == NULL || server.shareobjects == 0) return o;
1653
1654 assert(o->type == REDIS_STRING);
1655 de = dictFind(server.sharingpool,o);
1656 if (de) {
1657 robj *shared = dictGetEntryKey(de);
1658
1659 c = ((unsigned long) dictGetEntryVal(de))+1;
1660 dictGetEntryVal(de) = (void*) c;
1661 incrRefCount(shared);
1662 decrRefCount(o);
1663 return shared;
1664 } else {
1665 /* Here we are using a stream algorihtm: Every time an object is
1666 * shared we increment its count, everytime there is a miss we
1667 * recrement the counter of a random object. If this object reaches
1668 * zero we remove the object and put the current object instead. */
1669 if (dictSize(server.sharingpool) >=
1670 server.sharingpoolsize) {
1671 de = dictGetRandomKey(server.sharingpool);
1672 assert(de != NULL);
1673 c = ((unsigned long) dictGetEntryVal(de))-1;
1674 dictGetEntryVal(de) = (void*) c;
1675 if (c == 0) {
1676 dictDelete(server.sharingpool,de->key);
1677 }
1678 } else {
1679 c = 0; /* If the pool is empty we want to add this object */
1680 }
1681 if (c == 0) {
1682 int retval;
1683
1684 retval = dictAdd(server.sharingpool,o,(void*)1);
1685 assert(retval == DICT_OK);
1686 incrRefCount(o);
1687 }
1688 return o;
1689 }
1690 }
1691
1692 static robj *lookupKey(redisDb *db, robj *key) {
1693 dictEntry *de = dictFind(db->dict,key);
1694 return de ? dictGetEntryVal(de) : NULL;
1695 }
1696
1697 static robj *lookupKeyRead(redisDb *db, robj *key) {
1698 expireIfNeeded(db,key);
1699 return lookupKey(db,key);
1700 }
1701
1702 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1703 deleteIfVolatile(db,key);
1704 return lookupKey(db,key);
1705 }
1706
1707 static int deleteKey(redisDb *db, robj *key) {
1708 int retval;
1709
1710 /* We need to protect key from destruction: after the first dictDelete()
1711 * it may happen that 'key' is no longer valid if we don't increment
1712 * it's count. This may happen when we get the object reference directly
1713 * from the hash table with dictRandomKey() or dict iterators */
1714 incrRefCount(key);
1715 if (dictSize(db->expires)) dictDelete(db->expires,key);
1716 retval = dictDelete(db->dict,key);
1717 decrRefCount(key);
1718
1719 return retval == DICT_OK;
1720 }
1721
1722 /*============================ DB saving/loading ============================ */
1723
/* Write the single byte 'type' on 'fp'. Returns 0 on success, -1 if the
 * write failed. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
1728
/* Write the time 't' on 'fp' as a 32 bit signed integer, in the byte
 * order of the host. Returns 0 on success, -1 if the write failed. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
1734
1735 /* check rdbLoadLen() comments for more info */
1736 static int rdbSaveLen(FILE *fp, uint32_t len) {
1737 unsigned char buf[2];
1738
1739 if (len < (1<<6)) {
1740 /* Save a 6 bit len */
1741 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1742 if (fwrite(buf,1,1,fp) == 0) return -1;
1743 } else if (len < (1<<14)) {
1744 /* Save a 14 bit len */
1745 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1746 buf[1] = len&0xFF;
1747 if (fwrite(buf,2,1,fp) == 0) return -1;
1748 } else {
1749 /* Save a 32 bit len */
1750 buf[0] = (REDIS_RDB_32BITLEN<<6);
1751 if (fwrite(buf,1,1,fp) == 0) return -1;
1752 len = htonl(len);
1753 if (fwrite(&len,4,1,fp) == 0) return -1;
1754 }
1755 return 0;
1756 }
1757
1758 /* String objects in the form "2391" "-100" without any space and with a
1759 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1760 * encoded as integers to save space */
1761 int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1762 long long value;
1763 char *endptr, buf[32];
1764
1765 /* Check if it's possible to encode this value as a number */
1766 value = strtoll(s, &endptr, 10);
1767 if (endptr[0] != '\0') return 0;
1768 snprintf(buf,32,"%lld",value);
1769
1770 /* If the number converted back into a string is not identical
1771 * then it's not possible to encode the string as integer */
1772 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1773
1774 /* Finally check if it fits in our ranges */
1775 if (value >= -(1<<7) && value <= (1<<7)-1) {
1776 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1777 enc[1] = value&0xFF;
1778 return 2;
1779 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1780 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1781 enc[1] = value&0xFF;
1782 enc[2] = (value>>8)&0xFF;
1783 return 3;
1784 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1785 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1786 enc[1] = value&0xFF;
1787 enc[2] = (value>>8)&0xFF;
1788 enc[3] = (value>>16)&0xFF;
1789 enc[4] = (value>>24)&0xFF;
1790 return 5;
1791 } else {
1792 return 0;
1793 }
1794 }
1795
1796 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1797 unsigned int comprlen, outlen;
1798 unsigned char byte;
1799 void *out;
1800
1801 /* We require at least four bytes compression for this to be worth it */
1802 outlen = sdslen(obj->ptr)-4;
1803 if (outlen <= 0) return 0;
1804 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1805 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1806 if (comprlen == 0) {
1807 zfree(out);
1808 return 0;
1809 }
1810 /* Data compressed! Let's save it on disk */
1811 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1812 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1813 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1814 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1815 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1816 zfree(out);
1817 return comprlen;
1818
1819 writeerr:
1820 zfree(out);
1821 return -1;
1822 }
1823
1824 /* Save a string objet as [len][data] on disk. If the object is a string
1825 * representation of an integer value we try to safe it in a special form */
1826 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1827 size_t len = sdslen(obj->ptr);
1828 int enclen;
1829
1830 /* Try integer encoding */
1831 if (len <= 11) {
1832 unsigned char buf[5];
1833 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1834 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1835 return 0;
1836 }
1837 }
1838
1839 /* Try LZF compression - under 20 bytes it's unable to compress even
1840 * aaaaaaaaaaaaaaaaaa so skip it */
1841 if (1 && len > 20) {
1842 int retval;
1843
1844 retval = rdbSaveLzfStringObject(fp,obj);
1845 if (retval == -1) return -1;
1846 if (retval > 0) return 0;
1847 /* retval == 0 means data can't be compressed, save the old way */
1848 }
1849
1850 /* Store verbatim */
1851 if (rdbSaveLen(fp,len) == -1) return -1;
1852 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1853 return 0;
1854 }
1855
1856 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1857 static int rdbSave(char *filename) {
1858 dictIterator *di = NULL;
1859 dictEntry *de;
1860 FILE *fp;
1861 char tmpfile[256];
1862 int j;
1863 time_t now = time(NULL);
1864
1865 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1866 fp = fopen(tmpfile,"w");
1867 if (!fp) {
1868 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1869 return REDIS_ERR;
1870 }
1871 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1872 for (j = 0; j < server.dbnum; j++) {
1873 redisDb *db = server.db+j;
1874 dict *d = db->dict;
1875 if (dictSize(d) == 0) continue;
1876 di = dictGetIterator(d);
1877 if (!di) {
1878 fclose(fp);
1879 return REDIS_ERR;
1880 }
1881
1882 /* Write the SELECT DB opcode */
1883 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1884 if (rdbSaveLen(fp,j) == -1) goto werr;
1885
1886 /* Iterate this DB writing every entry */
1887 while((de = dictNext(di)) != NULL) {
1888 robj *key = dictGetEntryKey(de);
1889 robj *o = dictGetEntryVal(de);
1890 time_t expiretime = getExpire(db,key);
1891
1892 /* Save the expire time */
1893 if (expiretime != -1) {
1894 /* If this key is already expired skip it */
1895 if (expiretime < now) continue;
1896 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1897 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1898 }
1899 /* Save the key and associated value */
1900 if (rdbSaveType(fp,o->type) == -1) goto werr;
1901 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1902 if (o->type == REDIS_STRING) {
1903 /* Save a string value */
1904 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1905 } else if (o->type == REDIS_LIST) {
1906 /* Save a list value */
1907 list *list = o->ptr;
1908 listNode *ln;
1909
1910 listRewind(list);
1911 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1912 while((ln = listYield(list))) {
1913 robj *eleobj = listNodeValue(ln);
1914
1915 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1916 }
1917 } else if (o->type == REDIS_SET) {
1918 /* Save a set value */
1919 dict *set = o->ptr;
1920 dictIterator *di = dictGetIterator(set);
1921 dictEntry *de;
1922
1923 if (!set) oom("dictGetIteraotr");
1924 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1925 while((de = dictNext(di)) != NULL) {
1926 robj *eleobj = dictGetEntryKey(de);
1927
1928 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1929 }
1930 dictReleaseIterator(di);
1931 } else {
1932 assert(0 != 0);
1933 }
1934 }
1935 dictReleaseIterator(di);
1936 }
1937 /* EOF opcode */
1938 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1939
1940 /* Make sure data will not remain on the OS's output buffers */
1941 fflush(fp);
1942 fsync(fileno(fp));
1943 fclose(fp);
1944
1945 /* Use RENAME to make sure the DB file is changed atomically only
1946 * if the generate DB file is ok. */
1947 if (rename(tmpfile,filename) == -1) {
1948 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1949 unlink(tmpfile);
1950 return REDIS_ERR;
1951 }
1952 redisLog(REDIS_NOTICE,"DB saved on disk");
1953 server.dirty = 0;
1954 server.lastsave = time(NULL);
1955 return REDIS_OK;
1956
1957 werr:
1958 fclose(fp);
1959 unlink(tmpfile);
1960 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1961 if (di) dictReleaseIterator(di);
1962 return REDIS_ERR;
1963 }
1964
1965 static int rdbSaveBackground(char *filename) {
1966 pid_t childpid;
1967
1968 if (server.bgsaveinprogress) return REDIS_ERR;
1969 if ((childpid = fork()) == 0) {
1970 /* Child */
1971 close(server.fd);
1972 if (rdbSave(filename) == REDIS_OK) {
1973 exit(0);
1974 } else {
1975 exit(1);
1976 }
1977 } else {
1978 /* Parent */
1979 if (childpid == -1) {
1980 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
1981 strerror(errno));
1982 return REDIS_ERR;
1983 }
1984 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1985 server.bgsaveinprogress = 1;
1986 return REDIS_OK;
1987 }
1988 return REDIS_OK; /* unreached */
1989 }
1990
/* Read a one-byte type/opcode from 'fp'. Returns the byte value (0-255),
 * or -1 if the byte could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    if (fread(&opcode,sizeof(opcode),1,fp) != 1) return -1;
    return opcode;
}
1996
/* Read a 4-byte timestamp from 'fp', stored as a raw int32_t in host byte
 * order. Returns it as a time_t, or -1 if the read failed. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t)stamp;
}
2002
2003 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2004 * of this file for a description of how this are stored on disk.
2005 *
2006 * isencoded is set to 1 if the readed length is not actually a length but
2007 * an "encoding type", check the above comments for more info */
2008 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2009 unsigned char buf[2];
2010 uint32_t len;
2011
2012 if (isencoded) *isencoded = 0;
2013 if (rdbver == 0) {
2014 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2015 return ntohl(len);
2016 } else {
2017 int type;
2018
2019 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2020 type = (buf[0]&0xC0)>>6;
2021 if (type == REDIS_RDB_6BITLEN) {
2022 /* Read a 6 bit len */
2023 return buf[0]&0x3F;
2024 } else if (type == REDIS_RDB_ENCVAL) {
2025 /* Read a 6 bit len encoding type */
2026 if (isencoded) *isencoded = 1;
2027 return buf[0]&0x3F;
2028 } else if (type == REDIS_RDB_14BITLEN) {
2029 /* Read a 14 bit len */
2030 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2031 return ((buf[0]&0x3F)<<8)|buf[1];
2032 } else {
2033 /* Read a 32 bit len */
2034 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2035 return ntohl(len);
2036 }
2037 }
2038 }
2039
2040 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2041 unsigned char enc[4];
2042 long long val;
2043
2044 if (enctype == REDIS_RDB_ENC_INT8) {
2045 if (fread(enc,1,1,fp) == 0) return NULL;
2046 val = (signed char)enc[0];
2047 } else if (enctype == REDIS_RDB_ENC_INT16) {
2048 uint16_t v;
2049 if (fread(enc,2,1,fp) == 0) return NULL;
2050 v = enc[0]|(enc[1]<<8);
2051 val = (int16_t)v;
2052 } else if (enctype == REDIS_RDB_ENC_INT32) {
2053 uint32_t v;
2054 if (fread(enc,4,1,fp) == 0) return NULL;
2055 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2056 val = (int32_t)v;
2057 } else {
2058 val = 0; /* anti-warning */
2059 assert(0!=0);
2060 }
2061 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2062 }
2063
2064 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2065 unsigned int len, clen;
2066 unsigned char *c = NULL;
2067 sds val = NULL;
2068
2069 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2070 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2071 if ((c = zmalloc(clen)) == NULL) goto err;
2072 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2073 if (fread(c,clen,1,fp) == 0) goto err;
2074 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2075 zfree(c);
2076 return createObject(REDIS_STRING,val);
2077 err:
2078 zfree(c);
2079 sdsfree(val);
2080 return NULL;
2081 }
2082
2083 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2084 int isencoded;
2085 uint32_t len;
2086 sds val;
2087
2088 len = rdbLoadLen(fp,rdbver,&isencoded);
2089 if (isencoded) {
2090 switch(len) {
2091 case REDIS_RDB_ENC_INT8:
2092 case REDIS_RDB_ENC_INT16:
2093 case REDIS_RDB_ENC_INT32:
2094 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2095 case REDIS_RDB_ENC_LZF:
2096 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2097 default:
2098 assert(0!=0);
2099 }
2100 }
2101
2102 if (len == REDIS_RDB_LENERR) return NULL;
2103 val = sdsnewlen(NULL,len);
2104 if (len && fread(val,len,1,fp) == 0) {
2105 sdsfree(val);
2106 return NULL;
2107 }
2108 return tryObjectSharing(createObject(REDIS_STRING,val));
2109 }
2110
2111 static int rdbLoad(char *filename) {
2112 FILE *fp;
2113 robj *keyobj = NULL;
2114 uint32_t dbid;
2115 int type, retval, rdbver;
2116 dict *d = server.db[0].dict;
2117 redisDb *db = server.db+0;
2118 char buf[1024];
2119 time_t expiretime = -1, now = time(NULL);
2120
2121 fp = fopen(filename,"r");
2122 if (!fp) return REDIS_ERR;
2123 if (fread(buf,9,1,fp) == 0) goto eoferr;
2124 buf[9] = '\0';
2125 if (memcmp(buf,"REDIS",5) != 0) {
2126 fclose(fp);
2127 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2128 return REDIS_ERR;
2129 }
2130 rdbver = atoi(buf+5);
2131 if (rdbver > 1) {
2132 fclose(fp);
2133 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2134 return REDIS_ERR;
2135 }
2136 while(1) {
2137 robj *o;
2138
2139 /* Read type. */
2140 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2141 if (type == REDIS_EXPIRETIME) {
2142 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2143 /* We read the time so we need to read the object type again */
2144 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2145 }
2146 if (type == REDIS_EOF) break;
2147 /* Handle SELECT DB opcode as a special case */
2148 if (type == REDIS_SELECTDB) {
2149 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2150 goto eoferr;
2151 if (dbid >= (unsigned)server.dbnum) {
2152 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2153 exit(1);
2154 }
2155 db = server.db+dbid;
2156 d = db->dict;
2157 continue;
2158 }
2159 /* Read key */
2160 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2161
2162 if (type == REDIS_STRING) {
2163 /* Read string value */
2164 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2165 } else if (type == REDIS_LIST || type == REDIS_SET) {
2166 /* Read list/set value */
2167 uint32_t listlen;
2168
2169 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2170 goto eoferr;
2171 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2172 /* Load every single element of the list/set */
2173 while(listlen--) {
2174 robj *ele;
2175
2176 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2177 if (type == REDIS_LIST) {
2178 if (!listAddNodeTail((list*)o->ptr,ele))
2179 oom("listAddNodeTail");
2180 } else {
2181 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2182 oom("dictAdd");
2183 }
2184 }
2185 } else {
2186 assert(0 != 0);
2187 }
2188 /* Add the new object in the hash table */
2189 retval = dictAdd(d,keyobj,o);
2190 if (retval == DICT_ERR) {
2191 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2192 exit(1);
2193 }
2194 /* Set the expire time if needed */
2195 if (expiretime != -1) {
2196 setExpire(db,keyobj,expiretime);
2197 /* Delete this key if already expired */
2198 if (expiretime < now) deleteKey(db,keyobj);
2199 expiretime = -1;
2200 }
2201 keyobj = o = NULL;
2202 }
2203 fclose(fp);
2204 return REDIS_OK;
2205
2206 eoferr: /* unexpected end of file is handled here with a fatal exit */
2207 if (keyobj) decrRefCount(keyobj);
2208 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2209 exit(1);
2210 return REDIS_ERR; /* Just to avoid warning */
2211 }
2212
2213 /*================================== Commands =============================== */
2214
2215 static void authCommand(redisClient *c) {
2216 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2217 c->authenticated = 1;
2218 addReply(c,shared.ok);
2219 } else {
2220 c->authenticated = 0;
2221 addReply(c,shared.err);
2222 }
2223 }
2224
2225 static void pingCommand(redisClient *c) {
2226 addReply(c,shared.pong);
2227 }
2228
2229 static void echoCommand(redisClient *c) {
2230 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2231 (int)sdslen(c->argv[1]->ptr)));
2232 addReply(c,c->argv[1]);
2233 addReply(c,shared.crlf);
2234 }
2235
2236 /*=================================== Strings =============================== */
2237
2238 static void setGenericCommand(redisClient *c, int nx) {
2239 int retval;
2240
2241 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2242 if (retval == DICT_ERR) {
2243 if (!nx) {
2244 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2245 incrRefCount(c->argv[2]);
2246 } else {
2247 addReply(c,shared.czero);
2248 return;
2249 }
2250 } else {
2251 incrRefCount(c->argv[1]);
2252 incrRefCount(c->argv[2]);
2253 }
2254 server.dirty++;
2255 removeExpire(c->db,c->argv[1]);
2256 addReply(c, nx ? shared.cone : shared.ok);
2257 }
2258
2259 static void setCommand(redisClient *c) {
2260 setGenericCommand(c,0);
2261 }
2262
2263 static void setnxCommand(redisClient *c) {
2264 setGenericCommand(c,1);
2265 }
2266
2267 static void getCommand(redisClient *c) {
2268 robj *o = lookupKeyRead(c->db,c->argv[1]);
2269
2270 if (o == NULL) {
2271 addReply(c,shared.nullbulk);
2272 } else {
2273 if (o->type != REDIS_STRING) {
2274 addReply(c,shared.wrongtypeerr);
2275 } else {
2276 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2277 addReply(c,o);
2278 addReply(c,shared.crlf);
2279 }
2280 }
2281 }
2282
2283 static void getSetCommand(redisClient *c) {
2284 getCommand(c);
2285 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2286 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2287 } else {
2288 incrRefCount(c->argv[1]);
2289 }
2290 incrRefCount(c->argv[2]);
2291 server.dirty++;
2292 removeExpire(c->db,c->argv[1]);
2293 }
2294
2295 static void mgetCommand(redisClient *c) {
2296 int j;
2297
2298 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2299 for (j = 1; j < c->argc; j++) {
2300 robj *o = lookupKeyRead(c->db,c->argv[j]);
2301 if (o == NULL) {
2302 addReply(c,shared.nullbulk);
2303 } else {
2304 if (o->type != REDIS_STRING) {
2305 addReply(c,shared.nullbulk);
2306 } else {
2307 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2308 addReply(c,o);
2309 addReply(c,shared.crlf);
2310 }
2311 }
2312 }
2313 }
2314
2315 static void incrDecrCommand(redisClient *c, long long incr) {
2316 long long value;
2317 int retval;
2318 robj *o;
2319
2320 o = lookupKeyWrite(c->db,c->argv[1]);
2321 if (o == NULL) {
2322 value = 0;
2323 } else {
2324 if (o->type != REDIS_STRING) {
2325 value = 0;
2326 } else {
2327 char *eptr;
2328
2329 value = strtoll(o->ptr, &eptr, 10);
2330 }
2331 }
2332
2333 value += incr;
2334 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2335 retval = dictAdd(c->db->dict,c->argv[1],o);
2336 if (retval == DICT_ERR) {
2337 dictReplace(c->db->dict,c->argv[1],o);
2338 removeExpire(c->db,c->argv[1]);
2339 } else {
2340 incrRefCount(c->argv[1]);
2341 }
2342 server.dirty++;
2343 addReply(c,shared.colon);
2344 addReply(c,o);
2345 addReply(c,shared.crlf);
2346 }
2347
2348 static void incrCommand(redisClient *c) {
2349 incrDecrCommand(c,1);
2350 }
2351
2352 static void decrCommand(redisClient *c) {
2353 incrDecrCommand(c,-1);
2354 }
2355
2356 static void incrbyCommand(redisClient *c) {
2357 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2358 incrDecrCommand(c,incr);
2359 }
2360
2361 static void decrbyCommand(redisClient *c) {
2362 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2363 incrDecrCommand(c,-incr);
2364 }
2365
2366 /* ========================= Type agnostic commands ========================= */
2367
2368 static void delCommand(redisClient *c) {
2369 int deleted = 0, j;
2370
2371 for (j = 1; j < c->argc; j++) {
2372 if (deleteKey(c->db,c->argv[j])) {
2373 server.dirty++;
2374 deleted++;
2375 }
2376 }
2377 switch(deleted) {
2378 case 0:
2379 addReply(c,shared.czero);
2380 break;
2381 case 1:
2382 addReply(c,shared.cone);
2383 break;
2384 default:
2385 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2386 break;
2387 }
2388 }
2389
2390 static void existsCommand(redisClient *c) {
2391 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2392 }
2393
2394 static void selectCommand(redisClient *c) {
2395 int id = atoi(c->argv[1]->ptr);
2396
2397 if (selectDb(c,id) == REDIS_ERR) {
2398 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2399 } else {
2400 addReply(c,shared.ok);
2401 }
2402 }
2403
2404 static void randomkeyCommand(redisClient *c) {
2405 dictEntry *de;
2406
2407 while(1) {
2408 de = dictGetRandomKey(c->db->dict);
2409 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2410 }
2411 if (de == NULL) {
2412 addReply(c,shared.plus);
2413 addReply(c,shared.crlf);
2414 } else {
2415 addReply(c,shared.plus);
2416 addReply(c,dictGetEntryKey(de));
2417 addReply(c,shared.crlf);
2418 }
2419 }
2420
2421 static void keysCommand(redisClient *c) {
2422 dictIterator *di;
2423 dictEntry *de;
2424 sds pattern = c->argv[1]->ptr;
2425 int plen = sdslen(pattern);
2426 int numkeys = 0, keyslen = 0;
2427 robj *lenobj = createObject(REDIS_STRING,NULL);
2428
2429 di = dictGetIterator(c->db->dict);
2430 if (!di) oom("dictGetIterator");
2431 addReply(c,lenobj);
2432 decrRefCount(lenobj);
2433 while((de = dictNext(di)) != NULL) {
2434 robj *keyobj = dictGetEntryKey(de);
2435
2436 sds key = keyobj->ptr;
2437 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2438 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2439 if (expireIfNeeded(c->db,keyobj) == 0) {
2440 if (numkeys != 0)
2441 addReply(c,shared.space);
2442 addReply(c,keyobj);
2443 numkeys++;
2444 keyslen += sdslen(key);
2445 }
2446 }
2447 }
2448 dictReleaseIterator(di);
2449 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2450 addReply(c,shared.crlf);
2451 }
2452
2453 static void dbsizeCommand(redisClient *c) {
2454 addReplySds(c,
2455 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2456 }
2457
2458 static void lastsaveCommand(redisClient *c) {
2459 addReplySds(c,
2460 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2461 }
2462
2463 static void typeCommand(redisClient *c) {
2464 robj *o;
2465 char *type;
2466
2467 o = lookupKeyRead(c->db,c->argv[1]);
2468 if (o == NULL) {
2469 type = "+none";
2470 } else {
2471 switch(o->type) {
2472 case REDIS_STRING: type = "+string"; break;
2473 case REDIS_LIST: type = "+list"; break;
2474 case REDIS_SET: type = "+set"; break;
2475 default: type = "unknown"; break;
2476 }
2477 }
2478 addReplySds(c,sdsnew(type));
2479 addReply(c,shared.crlf);
2480 }
2481
2482 static void saveCommand(redisClient *c) {
2483 if (server.bgsaveinprogress) {
2484 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2485 return;
2486 }
2487 if (rdbSave(server.dbfilename) == REDIS_OK) {
2488 addReply(c,shared.ok);
2489 } else {
2490 addReply(c,shared.err);
2491 }
2492 }
2493
2494 static void bgsaveCommand(redisClient *c) {
2495 if (server.bgsaveinprogress) {
2496 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2497 return;
2498 }
2499 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2500 addReply(c,shared.ok);
2501 } else {
2502 addReply(c,shared.err);
2503 }
2504 }
2505
2506 static void shutdownCommand(redisClient *c) {
2507 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2508 /* XXX: TODO kill the child if there is a bgsave in progress */
2509 if (rdbSave(server.dbfilename) == REDIS_OK) {
2510 if (server.daemonize) {
2511 unlink(server.pidfile);
2512 }
2513 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2514 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2515 exit(1);
2516 } else {
2517 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2518 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2519 }
2520 }
2521
2522 static void renameGenericCommand(redisClient *c, int nx) {
2523 robj *o;
2524
2525 /* To use the same key as src and dst is probably an error */
2526 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2527 addReply(c,shared.sameobjecterr);
2528 return;
2529 }
2530
2531 o = lookupKeyWrite(c->db,c->argv[1]);
2532 if (o == NULL) {
2533 addReply(c,shared.nokeyerr);
2534 return;
2535 }
2536 incrRefCount(o);
2537 deleteIfVolatile(c->db,c->argv[2]);
2538 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2539 if (nx) {
2540 decrRefCount(o);
2541 addReply(c,shared.czero);
2542 return;
2543 }
2544 dictReplace(c->db->dict,c->argv[2],o);
2545 } else {
2546 incrRefCount(c->argv[2]);
2547 }
2548 deleteKey(c->db,c->argv[1]);
2549 server.dirty++;
2550 addReply(c,nx ? shared.cone : shared.ok);
2551 }
2552
2553 static void renameCommand(redisClient *c) {
2554 renameGenericCommand(c,0);
2555 }
2556
2557 static void renamenxCommand(redisClient *c) {
2558 renameGenericCommand(c,1);
2559 }
2560
2561 static void moveCommand(redisClient *c) {
2562 robj *o;
2563 redisDb *src, *dst;
2564 int srcid;
2565
2566 /* Obtain source and target DB pointers */
2567 src = c->db;
2568 srcid = c->db->id;
2569 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2570 addReply(c,shared.outofrangeerr);
2571 return;
2572 }
2573 dst = c->db;
2574 selectDb(c,srcid); /* Back to the source DB */
2575
2576 /* If the user is moving using as target the same
2577 * DB as the source DB it is probably an error. */
2578 if (src == dst) {
2579 addReply(c,shared.sameobjecterr);
2580 return;
2581 }
2582
2583 /* Check if the element exists and get a reference */
2584 o = lookupKeyWrite(c->db,c->argv[1]);
2585 if (!o) {
2586 addReply(c,shared.czero);
2587 return;
2588 }
2589
2590 /* Try to add the element to the target DB */
2591 deleteIfVolatile(dst,c->argv[1]);
2592 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2593 addReply(c,shared.czero);
2594 return;
2595 }
2596 incrRefCount(c->argv[1]);
2597 incrRefCount(o);
2598
2599 /* OK! key moved, free the entry in the source DB */
2600 deleteKey(src,c->argv[1]);
2601 server.dirty++;
2602 addReply(c,shared.cone);
2603 }
2604
2605 /* =================================== Lists ================================ */
2606 static void pushGenericCommand(redisClient *c, int where) {
2607 robj *lobj;
2608 list *list;
2609
2610 lobj = lookupKeyWrite(c->db,c->argv[1]);
2611 if (lobj == NULL) {
2612 lobj = createListObject();
2613 list = lobj->ptr;
2614 if (where == REDIS_HEAD) {
2615 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2616 } else {
2617 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2618 }
2619 dictAdd(c->db->dict,c->argv[1],lobj);
2620 incrRefCount(c->argv[1]);
2621 incrRefCount(c->argv[2]);
2622 } else {
2623 if (lobj->type != REDIS_LIST) {
2624 addReply(c,shared.wrongtypeerr);
2625 return;
2626 }
2627 list = lobj->ptr;
2628 if (where == REDIS_HEAD) {
2629 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2630 } else {
2631 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2632 }
2633 incrRefCount(c->argv[2]);
2634 }
2635 server.dirty++;
2636 addReply(c,shared.ok);
2637 }
2638
2639 static void lpushCommand(redisClient *c) {
2640 pushGenericCommand(c,REDIS_HEAD);
2641 }
2642
2643 static void rpushCommand(redisClient *c) {
2644 pushGenericCommand(c,REDIS_TAIL);
2645 }
2646
2647 static void llenCommand(redisClient *c) {
2648 robj *o;
2649 list *l;
2650
2651 o = lookupKeyRead(c->db,c->argv[1]);
2652 if (o == NULL) {
2653 addReply(c,shared.czero);
2654 return;
2655 } else {
2656 if (o->type != REDIS_LIST) {
2657 addReply(c,shared.wrongtypeerr);
2658 } else {
2659 l = o->ptr;
2660 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2661 }
2662 }
2663 }
2664
2665 static void lindexCommand(redisClient *c) {
2666 robj *o;
2667 int index = atoi(c->argv[2]->ptr);
2668
2669 o = lookupKeyRead(c->db,c->argv[1]);
2670 if (o == NULL) {
2671 addReply(c,shared.nullbulk);
2672 } else {
2673 if (o->type != REDIS_LIST) {
2674 addReply(c,shared.wrongtypeerr);
2675 } else {
2676 list *list = o->ptr;
2677 listNode *ln;
2678
2679 ln = listIndex(list, index);
2680 if (ln == NULL) {
2681 addReply(c,shared.nullbulk);
2682 } else {
2683 robj *ele = listNodeValue(ln);
2684 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2685 addReply(c,ele);
2686 addReply(c,shared.crlf);
2687 }
2688 }
2689 }
2690 }
2691
2692 static void lsetCommand(redisClient *c) {
2693 robj *o;
2694 int index = atoi(c->argv[2]->ptr);
2695
2696 o = lookupKeyWrite(c->db,c->argv[1]);
2697 if (o == NULL) {
2698 addReply(c,shared.nokeyerr);
2699 } else {
2700 if (o->type != REDIS_LIST) {
2701 addReply(c,shared.wrongtypeerr);
2702 } else {
2703 list *list = o->ptr;
2704 listNode *ln;
2705
2706 ln = listIndex(list, index);
2707 if (ln == NULL) {
2708 addReply(c,shared.outofrangeerr);
2709 } else {
2710 robj *ele = listNodeValue(ln);
2711
2712 decrRefCount(ele);
2713 listNodeValue(ln) = c->argv[3];
2714 incrRefCount(c->argv[3]);
2715 addReply(c,shared.ok);
2716 server.dirty++;
2717 }
2718 }
2719 }
2720 }
2721
2722 static void popGenericCommand(redisClient *c, int where) {
2723 robj *o;
2724
2725 o = lookupKeyWrite(c->db,c->argv[1]);
2726 if (o == NULL) {
2727 addReply(c,shared.nullbulk);
2728 } else {
2729 if (o->type != REDIS_LIST) {
2730 addReply(c,shared.wrongtypeerr);
2731 } else {
2732 list *list = o->ptr;
2733 listNode *ln;
2734
2735 if (where == REDIS_HEAD)
2736 ln = listFirst(list);
2737 else
2738 ln = listLast(list);
2739
2740 if (ln == NULL) {
2741 addReply(c,shared.nullbulk);
2742 } else {
2743 robj *ele = listNodeValue(ln);
2744 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2745 addReply(c,ele);
2746 addReply(c,shared.crlf);
2747 listDelNode(list,ln);
2748 server.dirty++;
2749 }
2750 }
2751 }
2752 }
2753
2754 static void lpopCommand(redisClient *c) {
2755 popGenericCommand(c,REDIS_HEAD);
2756 }
2757
2758 static void rpopCommand(redisClient *c) {
2759 popGenericCommand(c,REDIS_TAIL);
2760 }
2761
2762 static void lrangeCommand(redisClient *c) {
2763 robj *o;
2764 int start = atoi(c->argv[2]->ptr);
2765 int end = atoi(c->argv[3]->ptr);
2766
2767 o = lookupKeyRead(c->db,c->argv[1]);
2768 if (o == NULL) {
2769 addReply(c,shared.nullmultibulk);
2770 } else {
2771 if (o->type != REDIS_LIST) {
2772 addReply(c,shared.wrongtypeerr);
2773 } else {
2774 list *list = o->ptr;
2775 listNode *ln;
2776 int llen = listLength(list);
2777 int rangelen, j;
2778 robj *ele;
2779
2780 /* convert negative indexes */
2781 if (start < 0) start = llen+start;
2782 if (end < 0) end = llen+end;
2783 if (start < 0) start = 0;
2784 if (end < 0) end = 0;
2785
2786 /* indexes sanity checks */
2787 if (start > end || start >= llen) {
2788 /* Out of range start or start > end result in empty list */
2789 addReply(c,shared.emptymultibulk);
2790 return;
2791 }
2792 if (end >= llen) end = llen-1;
2793 rangelen = (end-start)+1;
2794
2795 /* Return the result in form of a multi-bulk reply */
2796 ln = listIndex(list, start);
2797 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2798 for (j = 0; j < rangelen; j++) {
2799 ele = listNodeValue(ln);
2800 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2801 addReply(c,ele);
2802 addReply(c,shared.crlf);
2803 ln = ln->next;
2804 }
2805 }
2806 }
2807 }
2808
2809 static void ltrimCommand(redisClient *c) {
2810 robj *o;
2811 int start = atoi(c->argv[2]->ptr);
2812 int end = atoi(c->argv[3]->ptr);
2813
2814 o = lookupKeyWrite(c->db,c->argv[1]);
2815 if (o == NULL) {
2816 addReply(c,shared.nokeyerr);
2817 } else {
2818 if (o->type != REDIS_LIST) {
2819 addReply(c,shared.wrongtypeerr);
2820 } else {
2821 list *list = o->ptr;
2822 listNode *ln;
2823 int llen = listLength(list);
2824 int j, ltrim, rtrim;
2825
2826 /* convert negative indexes */
2827 if (start < 0) start = llen+start;
2828 if (end < 0) end = llen+end;
2829 if (start < 0) start = 0;
2830 if (end < 0) end = 0;
2831
2832 /* indexes sanity checks */
2833 if (start > end || start >= llen) {
2834 /* Out of range start or start > end result in empty list */
2835 ltrim = llen;
2836 rtrim = 0;
2837 } else {
2838 if (end >= llen) end = llen-1;
2839 ltrim = start;
2840 rtrim = llen-end-1;
2841 }
2842
2843 /* Remove list elements to perform the trim */
2844 for (j = 0; j < ltrim; j++) {
2845 ln = listFirst(list);
2846 listDelNode(list,ln);
2847 }
2848 for (j = 0; j < rtrim; j++) {
2849 ln = listLast(list);
2850 listDelNode(list,ln);
2851 }
2852 addReply(c,shared.ok);
2853 server.dirty++;
2854 }
2855 }
2856 }
2857
2858 static void lremCommand(redisClient *c) {
2859 robj *o;
2860
2861 o = lookupKeyWrite(c->db,c->argv[1]);
2862 if (o == NULL) {
2863 addReply(c,shared.czero);
2864 } else {
2865 if (o->type != REDIS_LIST) {
2866 addReply(c,shared.wrongtypeerr);
2867 } else {
2868 list *list = o->ptr;
2869 listNode *ln, *next;
2870 int toremove = atoi(c->argv[2]->ptr);
2871 int removed = 0;
2872 int fromtail = 0;
2873
2874 if (toremove < 0) {
2875 toremove = -toremove;
2876 fromtail = 1;
2877 }
2878 ln = fromtail ? list->tail : list->head;
2879 while (ln) {
2880 robj *ele = listNodeValue(ln);
2881
2882 next = fromtail ? ln->prev : ln->next;
2883 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2884 listDelNode(list,ln);
2885 server.dirty++;
2886 removed++;
2887 if (toremove && removed == toremove) break;
2888 }
2889 ln = next;
2890 }
2891 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2892 }
2893 }
2894 }
2895
2896 /* ==================================== Sets ================================ */
2897
2898 static void saddCommand(redisClient *c) {
2899 robj *set;
2900
2901 set = lookupKeyWrite(c->db,c->argv[1]);
2902 if (set == NULL) {
2903 set = createSetObject();
2904 dictAdd(c->db->dict,c->argv[1],set);
2905 incrRefCount(c->argv[1]);
2906 } else {
2907 if (set->type != REDIS_SET) {
2908 addReply(c,shared.wrongtypeerr);
2909 return;
2910 }
2911 }
2912 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2913 incrRefCount(c->argv[2]);
2914 server.dirty++;
2915 addReply(c,shared.cone);
2916 } else {
2917 addReply(c,shared.czero);
2918 }
2919 }
2920
2921 static void sremCommand(redisClient *c) {
2922 robj *set;
2923
2924 set = lookupKeyWrite(c->db,c->argv[1]);
2925 if (set == NULL) {
2926 addReply(c,shared.czero);
2927 } else {
2928 if (set->type != REDIS_SET) {
2929 addReply(c,shared.wrongtypeerr);
2930 return;
2931 }
2932 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2933 server.dirty++;
2934 addReply(c,shared.cone);
2935 } else {
2936 addReply(c,shared.czero);
2937 }
2938 }
2939 }
2940
2941 static void smoveCommand(redisClient *c) {
2942 robj *srcset, *dstset;
2943
2944 srcset = lookupKeyWrite(c->db,c->argv[1]);
2945 dstset = lookupKeyWrite(c->db,c->argv[2]);
2946
2947 /* If the source key does not exist return 0, if it's of the wrong type
2948 * raise an error */
2949 if (srcset == NULL || srcset->type != REDIS_SET) {
2950 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
2951 return;
2952 }
2953 /* Error if the destination key is not a set as well */
2954 if (dstset && dstset->type != REDIS_SET) {
2955 addReply(c,shared.wrongtypeerr);
2956 return;
2957 }
2958 /* Remove the element from the source set */
2959 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
2960 /* Key not found in the src set! return zero */
2961 addReply(c,shared.czero);
2962 return;
2963 }
2964 server.dirty++;
2965 /* Add the element to the destination set */
2966 if (!dstset) {
2967 dstset = createSetObject();
2968 dictAdd(c->db->dict,c->argv[2],dstset);
2969 incrRefCount(c->argv[2]);
2970 }
2971 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
2972 incrRefCount(c->argv[3]);
2973 addReply(c,shared.cone);
2974 }
2975
2976 static void sismemberCommand(redisClient *c) {
2977 robj *set;
2978
2979 set = lookupKeyRead(c->db,c->argv[1]);
2980 if (set == NULL) {
2981 addReply(c,shared.czero);
2982 } else {
2983 if (set->type != REDIS_SET) {
2984 addReply(c,shared.wrongtypeerr);
2985 return;
2986 }
2987 if (dictFind(set->ptr,c->argv[2]))
2988 addReply(c,shared.cone);
2989 else
2990 addReply(c,shared.czero);
2991 }
2992 }
2993
2994 static void scardCommand(redisClient *c) {
2995 robj *o;
2996 dict *s;
2997
2998 o = lookupKeyRead(c->db,c->argv[1]);
2999 if (o == NULL) {
3000 addReply(c,shared.czero);
3001 return;
3002 } else {
3003 if (o->type != REDIS_SET) {
3004 addReply(c,shared.wrongtypeerr);
3005 } else {
3006 s = o->ptr;
3007 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3008 dictSize(s)));
3009 }
3010 }
3011 }
3012
3013 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3014 dict **d1 = (void*) s1, **d2 = (void*) s2;
3015
3016 return dictSize(*d1)-dictSize(*d2);
3017 }
3018
3019 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3020 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3021 dictIterator *di;
3022 dictEntry *de;
3023 robj *lenobj = NULL, *dstset = NULL;
3024 int j, cardinality = 0;
3025
3026 if (!dv) oom("sinterGenericCommand");
3027 for (j = 0; j < setsnum; j++) {
3028 robj *setobj;
3029
3030 setobj = dstkey ?
3031 lookupKeyWrite(c->db,setskeys[j]) :
3032 lookupKeyRead(c->db,setskeys[j]);
3033 if (!setobj) {
3034 zfree(dv);
3035 if (dstkey) {
3036 deleteKey(c->db,dstkey);
3037 addReply(c,shared.ok);
3038 } else {
3039 addReply(c,shared.nullmultibulk);
3040 }
3041 return;
3042 }
3043 if (setobj->type != REDIS_SET) {
3044 zfree(dv);
3045 addReply(c,shared.wrongtypeerr);
3046 return;
3047 }
3048 dv[j] = setobj->ptr;
3049 }
3050 /* Sort sets from the smallest to largest, this will improve our
3051 * algorithm's performace */
3052 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3053
3054 /* The first thing we should output is the total number of elements...
3055 * since this is a multi-bulk write, but at this stage we don't know
3056 * the intersection set size, so we use a trick, append an empty object
3057 * to the output list and save the pointer to later modify it with the
3058 * right length */
3059 if (!dstkey) {
3060 lenobj = createObject(REDIS_STRING,NULL);
3061 addReply(c,lenobj);
3062 decrRefCount(lenobj);
3063 } else {
3064 /* If we have a target key where to store the resulting set
3065 * create this key with an empty set inside */
3066 dstset = createSetObject();
3067 }
3068
3069 /* Iterate all the elements of the first (smallest) set, and test
3070 * the element against all the other sets, if at least one set does
3071 * not include the element it is discarded */
3072 di = dictGetIterator(dv[0]);
3073 if (!di) oom("dictGetIterator");
3074
3075 while((de = dictNext(di)) != NULL) {
3076 robj *ele;
3077
3078 for (j = 1; j < setsnum; j++)
3079 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3080 if (j != setsnum)
3081 continue; /* at least one set does not contain the member */
3082 ele = dictGetEntryKey(de);
3083 if (!dstkey) {
3084 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3085 addReply(c,ele);
3086 addReply(c,shared.crlf);
3087 cardinality++;
3088 } else {
3089 dictAdd(dstset->ptr,ele,NULL);
3090 incrRefCount(ele);
3091 }
3092 }
3093 dictReleaseIterator(di);
3094
3095 if (dstkey) {
3096 /* Store the resulting set into the target */
3097 deleteKey(c->db,dstkey);
3098 dictAdd(c->db->dict,dstkey,dstset);
3099 incrRefCount(dstkey);
3100 }
3101
3102 if (!dstkey) {
3103 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3104 } else {
3105 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3106 dictSize((dict*)dstset->ptr)));
3107 server.dirty++;
3108 }
3109 zfree(dv);
3110 }
3111
3112 static void sinterCommand(redisClient *c) {
3113 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3114 }
3115
3116 static void sinterstoreCommand(redisClient *c) {
3117 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3118 }
3119
3120 #define REDIS_OP_UNION 0
3121 #define REDIS_OP_DIFF 1
3122
3123 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3124 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3125 dictIterator *di;
3126 dictEntry *de;
3127 robj *dstset = NULL;
3128 int j, cardinality = 0;
3129
3130 if (!dv) oom("sunionDiffGenericCommand");
3131 for (j = 0; j < setsnum; j++) {
3132 robj *setobj;
3133
3134 setobj = dstkey ?
3135 lookupKeyWrite(c->db,setskeys[j]) :
3136 lookupKeyRead(c->db,setskeys[j]);
3137 if (!setobj) {
3138 dv[j] = NULL;
3139 continue;
3140 }
3141 if (setobj->type != REDIS_SET) {
3142 zfree(dv);
3143 addReply(c,shared.wrongtypeerr);
3144 return;
3145 }
3146 dv[j] = setobj->ptr;
3147 }
3148
3149 /* We need a temp set object to store our union. If the dstkey
3150 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3151 * this set object will be the resulting object to set into the target key*/
3152 dstset = createSetObject();
3153
3154 /* Iterate all the elements of all the sets, add every element a single
3155 * time to the result set */
3156 for (j = 0; j < setsnum; j++) {
3157 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3158 if (!dv[j]) continue; /* non existing keys are like empty sets */
3159
3160 di = dictGetIterator(dv[j]);
3161 if (!di) oom("dictGetIterator");
3162
3163 while((de = dictNext(di)) != NULL) {
3164 robj *ele;
3165
3166 /* dictAdd will not add the same element multiple times */
3167 ele = dictGetEntryKey(de);
3168 if (op == REDIS_OP_UNION || j == 0) {
3169 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3170 incrRefCount(ele);
3171 cardinality++;
3172 }
3173 } else if (op == REDIS_OP_DIFF) {
3174 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3175 cardinality--;
3176 }
3177 }
3178 }
3179 dictReleaseIterator(di);
3180
3181 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3182 }
3183
3184 /* Output the content of the resulting set, if not in STORE mode */
3185 if (!dstkey) {
3186 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3187 di = dictGetIterator(dstset->ptr);
3188 if (!di) oom("dictGetIterator");
3189 while((de = dictNext(di)) != NULL) {
3190 robj *ele;
3191
3192 ele = dictGetEntryKey(de);
3193 addReplySds(c,sdscatprintf(sdsempty(),
3194 "$%d\r\n",sdslen(ele->ptr)));
3195 addReply(c,ele);
3196 addReply(c,shared.crlf);
3197 }
3198 dictReleaseIterator(di);
3199 } else {
3200 /* If we have a target key where to store the resulting set
3201 * create this key with the result set inside */
3202 deleteKey(c->db,dstkey);
3203 dictAdd(c->db->dict,dstkey,dstset);
3204 incrRefCount(dstkey);
3205 }
3206
3207 /* Cleanup */
3208 if (!dstkey) {
3209 decrRefCount(dstset);
3210 } else {
3211 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3212 dictSize((dict*)dstset->ptr)));
3213 server.dirty++;
3214 }
3215 zfree(dv);
3216 }
3217
3218 static void sunionCommand(redisClient *c) {
3219 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3220 }
3221
3222 static void sunionstoreCommand(redisClient *c) {
3223 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3224 }
3225
3226 static void sdiffCommand(redisClient *c) {
3227 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3228 }
3229
3230 static void sdiffstoreCommand(redisClient *c) {
3231 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3232 }
3233
3234 static void flushdbCommand(redisClient *c) {
3235 server.dirty += dictSize(c->db->dict);
3236 dictEmpty(c->db->dict);
3237 dictEmpty(c->db->expires);
3238 addReply(c,shared.ok);
3239 }
3240
3241 static void flushallCommand(redisClient *c) {
3242 server.dirty += emptyDb();
3243 addReply(c,shared.ok);
3244 rdbSave(server.dbfilename);
3245 server.dirty++;
3246 }
3247
3248 redisSortOperation *createSortOperation(int type, robj *pattern) {
3249 redisSortOperation *so = zmalloc(sizeof(*so));
3250 if (!so) oom("createSortOperation");
3251 so->type = type;
3252 so->pattern = pattern;
3253 return so;
3254 }
3255
3256 /* Return the value associated to the key with a name obtained
3257 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3258 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3259 char *p;
3260 sds spat, ssub;
3261 robj keyobj;
3262 int prefixlen, sublen, postfixlen;
3263 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3264 struct {
3265 long len;
3266 long free;
3267 char buf[REDIS_SORTKEY_MAX+1];
3268 } keyname;
3269
3270 spat = pattern->ptr;
3271 ssub = subst->ptr;
3272 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3273 p = strchr(spat,'*');
3274 if (!p) return NULL;
3275
3276 prefixlen = p-spat;
3277 sublen = sdslen(ssub);
3278 postfixlen = sdslen(spat)-(prefixlen+1);
3279 memcpy(keyname.buf,spat,prefixlen);
3280 memcpy(keyname.buf+prefixlen,ssub,sublen);
3281 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3282 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3283 keyname.len = prefixlen+sublen+postfixlen;
3284
3285 keyobj.refcount = 1;
3286 keyobj.type = REDIS_STRING;
3287 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3288
3289 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3290 return lookupKeyRead(db,&keyobj);
3291 }
3292
3293 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3294 * the additional parameter is not standard but a BSD-specific we have to
3295 * pass sorting parameters via the global 'server' structure */
3296 static int sortCompare(const void *s1, const void *s2) {
3297 const redisSortObject *so1 = s1, *so2 = s2;
3298 int cmp;
3299
3300 if (!server.sort_alpha) {
3301 /* Numeric sorting. Here it's trivial as we precomputed scores */
3302 if (so1->u.score > so2->u.score) {
3303 cmp = 1;
3304 } else if (so1->u.score < so2->u.score) {
3305 cmp = -1;
3306 } else {
3307 cmp = 0;
3308 }
3309 } else {
3310 /* Alphanumeric sorting */
3311 if (server.sort_bypattern) {
3312 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3313 /* At least one compare object is NULL */
3314 if (so1->u.cmpobj == so2->u.cmpobj)
3315 cmp = 0;
3316 else if (so1->u.cmpobj == NULL)
3317 cmp = -1;
3318 else
3319 cmp = 1;
3320 } else {
3321 /* We have both the objects, use strcoll */
3322 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3323 }
3324 } else {
3325 /* Compare elements directly */
3326 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3327 }
3328 }
3329 return server.sort_desc ? -cmp : cmp;
3330 }
3331
3332 /* The SORT command is the most complex command in Redis. Warning: this code
3333 * is optimized for speed and a bit less for readability */
3334 static void sortCommand(redisClient *c) {
3335 list *operations;
3336 int outputlen = 0;
3337 int desc = 0, alpha = 0;
3338 int limit_start = 0, limit_count = -1, start, end;
3339 int j, dontsort = 0, vectorlen;
3340 int getop = 0; /* GET operation counter */
3341 robj *sortval, *sortby = NULL;
3342 redisSortObject *vector; /* Resulting vector to sort */
3343
3344 /* Lookup the key to sort. It must be of the right types */
3345 sortval = lookupKeyRead(c->db,c->argv[1]);
3346 if (sortval == NULL) {
3347 addReply(c,shared.nokeyerr);
3348 return;
3349 }
3350 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3351 addReply(c,shared.wrongtypeerr);
3352 return;
3353 }
3354
3355 /* Create a list of operations to perform for every sorted element.
3356 * Operations can be GET/DEL/INCR/DECR */
3357 operations = listCreate();
3358 listSetFreeMethod(operations,zfree);
3359 j = 2;
3360
3361 /* Now we need to protect sortval incrementing its count, in the future
3362 * SORT may have options able to overwrite/delete keys during the sorting
3363 * and the sorted key itself may get destroied */
3364 incrRefCount(sortval);
3365
3366 /* The SORT command has an SQL-alike syntax, parse it */
3367 while(j < c->argc) {
3368 int leftargs = c->argc-j-1;
3369 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3370 desc = 0;
3371 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3372 desc = 1;
3373 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3374 alpha = 1;
3375 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3376 limit_start = atoi(c->argv[j+1]->ptr);
3377 limit_count = atoi(c->argv[j+2]->ptr);
3378 j+=2;
3379 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3380 sortby = c->argv[j+1];
3381 /* If the BY pattern does not contain '*', i.e. it is constant,
3382 * we don't need to sort nor to lookup the weight keys. */
3383 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3384 j++;
3385 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3386 listAddNodeTail(operations,createSortOperation(
3387 REDIS_SORT_GET,c->argv[j+1]));
3388 getop++;
3389 j++;
3390 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3391 listAddNodeTail(operations,createSortOperation(
3392 REDIS_SORT_DEL,c->argv[j+1]));
3393 j++;
3394 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3395 listAddNodeTail(operations,createSortOperation(
3396 REDIS_SORT_INCR,c->argv[j+1]));
3397 j++;
3398 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3399 listAddNodeTail(operations,createSortOperation(
3400 REDIS_SORT_DECR,c->argv[j+1]));
3401 j++;
3402 } else {
3403 decrRefCount(sortval);
3404 listRelease(operations);
3405 addReply(c,shared.syntaxerr);
3406 return;
3407 }
3408 j++;
3409 }
3410
3411 /* Load the sorting vector with all the objects to sort */
3412 vectorlen = (sortval->type == REDIS_LIST) ?
3413 listLength((list*)sortval->ptr) :
3414 dictSize((dict*)sortval->ptr);
3415 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3416 if (!vector) oom("allocating objects vector for SORT");
3417 j = 0;
3418 if (sortval->type == REDIS_LIST) {
3419 list *list = sortval->ptr;
3420 listNode *ln;
3421
3422 listRewind(list);
3423 while((ln = listYield(list))) {
3424 robj *ele = ln->value;
3425 vector[j].obj = ele;
3426 vector[j].u.score = 0;
3427 vector[j].u.cmpobj = NULL;
3428 j++;
3429 }
3430 } else {
3431 dict *set = sortval->ptr;
3432 dictIterator *di;
3433 dictEntry *setele;
3434
3435 di = dictGetIterator(set);
3436 if (!di) oom("dictGetIterator");
3437 while((setele = dictNext(di)) != NULL) {
3438 vector[j].obj = dictGetEntryKey(setele);
3439 vector[j].u.score = 0;
3440 vector[j].u.cmpobj = NULL;
3441 j++;
3442 }
3443 dictReleaseIterator(di);
3444 }
3445 assert(j == vectorlen);
3446
3447 /* Now it's time to load the right scores in the sorting vector */
3448 if (dontsort == 0) {
3449 for (j = 0; j < vectorlen; j++) {
3450 if (sortby) {
3451 robj *byval;
3452
3453 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3454 if (!byval || byval->type != REDIS_STRING) continue;
3455 if (alpha) {
3456 vector[j].u.cmpobj = byval;
3457 incrRefCount(byval);
3458 } else {
3459 vector[j].u.score = strtod(byval->ptr,NULL);
3460 }
3461 } else {
3462 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3463 }
3464 }
3465 }
3466
3467 /* We are ready to sort the vector... perform a bit of sanity check
3468 * on the LIMIT option too. We'll use a partial version of quicksort. */
3469 start = (limit_start < 0) ? 0 : limit_start;
3470 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3471 if (start >= vectorlen) {
3472 start = vectorlen-1;
3473 end = vectorlen-2;
3474 }
3475 if (end >= vectorlen) end = vectorlen-1;
3476
3477 if (dontsort == 0) {
3478 server.sort_desc = desc;
3479 server.sort_alpha = alpha;
3480 server.sort_bypattern = sortby ? 1 : 0;
3481 if (sortby && (start != 0 || end != vectorlen-1))
3482 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3483 else
3484 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3485 }
3486
3487 /* Send command output to the output buffer, performing the specified
3488 * GET/DEL/INCR/DECR operations if any. */
3489 outputlen = getop ? getop*(end-start+1) : end-start+1;
3490 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3491 for (j = start; j <= end; j++) {
3492 listNode *ln;
3493 if (!getop) {
3494 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3495 sdslen(vector[j].obj->ptr)));
3496 addReply(c,vector[j].obj);
3497 addReply(c,shared.crlf);
3498 }
3499 listRewind(operations);
3500 while((ln = listYield(operations))) {
3501 redisSortOperation *sop = ln->value;
3502 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3503 vector[j].obj);
3504
3505 if (sop->type == REDIS_SORT_GET) {
3506 if (!val || val->type != REDIS_STRING) {
3507 addReply(c,shared.nullbulk);
3508 } else {
3509 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3510 sdslen(val->ptr)));
3511 addReply(c,val);
3512 addReply(c,shared.crlf);
3513 }
3514 } else if (sop->type == REDIS_SORT_DEL) {
3515 /* TODO */
3516 }
3517 }
3518 }
3519
3520 /* Cleanup */
3521 decrRefCount(sortval);
3522 listRelease(operations);
3523 for (j = 0; j < vectorlen; j++) {
3524 if (sortby && alpha && vector[j].u.cmpobj)
3525 decrRefCount(vector[j].u.cmpobj);
3526 }
3527 zfree(vector);
3528 }
3529
3530 static void infoCommand(redisClient *c) {
3531 sds info;
3532 time_t uptime = time(NULL)-server.stat_starttime;
3533
3534 info = sdscatprintf(sdsempty(),
3535 "redis_version:%s\r\n"
3536 "uptime_in_seconds:%d\r\n"
3537 "uptime_in_days:%d\r\n"
3538 "connected_clients:%d\r\n"
3539 "connected_slaves:%d\r\n"
3540 "used_memory:%zu\r\n"
3541 "changes_since_last_save:%lld\r\n"
3542 "bgsave_in_progress:%d\r\n"
3543 "last_save_time:%d\r\n"
3544 "total_connections_received:%lld\r\n"
3545 "total_commands_processed:%lld\r\n"
3546 "role:%s\r\n"
3547 ,REDIS_VERSION,
3548 uptime,
3549 uptime/(3600*24),
3550 listLength(server.clients)-listLength(server.slaves),
3551 listLength(server.slaves),
3552 server.usedmemory,
3553 server.dirty,
3554 server.bgsaveinprogress,
3555 server.lastsave,
3556 server.stat_numconnections,
3557 server.stat_numcommands,
3558 server.masterhost == NULL ? "master" : "slave"
3559 );
3560 if (server.masterhost) {
3561 info = sdscatprintf(info,
3562 "master_host:%s\r\n"
3563 "master_port:%d\r\n"
3564 "master_link_status:%s\r\n"
3565 "master_last_io_seconds_ago:%d\r\n"
3566 ,server.masterhost,
3567 server.masterport,
3568 (server.replstate == REDIS_REPL_CONNECTED) ?
3569 "up" : "down",
3570 (int)(time(NULL)-server.master->lastinteraction)
3571 );
3572 }
3573 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3574 addReplySds(c,info);
3575 addReply(c,shared.crlf);
3576 }
3577
3578 static void monitorCommand(redisClient *c) {
3579 /* ignore MONITOR if aleady slave or in monitor mode */
3580 if (c->flags & REDIS_SLAVE) return;
3581
3582 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3583 c->slaveseldb = 0;
3584 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3585 addReply(c,shared.ok);
3586 }
3587
3588 /* ================================= Expire ================================= */
3589 static int removeExpire(redisDb *db, robj *key) {
3590 if (dictDelete(db->expires,key) == DICT_OK) {
3591 return 1;
3592 } else {
3593 return 0;
3594 }
3595 }
3596
3597 static int setExpire(redisDb *db, robj *key, time_t when) {
3598 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3599 return 0;
3600 } else {
3601 incrRefCount(key);
3602 return 1;
3603 }
3604 }
3605
3606 /* Return the expire time of the specified key, or -1 if no expire
3607 * is associated with this key (i.e. the key is non volatile) */
3608 static time_t getExpire(redisDb *db, robj *key) {
3609 dictEntry *de;
3610
3611 /* No expire? return ASAP */
3612 if (dictSize(db->expires) == 0 ||
3613 (de = dictFind(db->expires,key)) == NULL) return -1;
3614
3615 return (time_t) dictGetEntryVal(de);
3616 }
3617
3618 static int expireIfNeeded(redisDb *db, robj *key) {
3619 time_t when;
3620 dictEntry *de;
3621
3622 /* No expire? return ASAP */
3623 if (dictSize(db->expires) == 0 ||
3624 (de = dictFind(db->expires,key)) == NULL) return 0;
3625
3626 /* Lookup the expire */
3627 when = (time_t) dictGetEntryVal(de);
3628 if (time(NULL) <= when) return 0;
3629
3630 /* Delete the key */
3631 dictDelete(db->expires,key);
3632 return dictDelete(db->dict,key) == DICT_OK;
3633 }
3634
3635 static int deleteIfVolatile(redisDb *db, robj *key) {
3636 dictEntry *de;
3637
3638 /* No expire? return ASAP */
3639 if (dictSize(db->expires) == 0 ||
3640 (de = dictFind(db->expires,key)) == NULL) return 0;
3641
3642 /* Delete the key */
3643 server.dirty++;
3644 dictDelete(db->expires,key);
3645 return dictDelete(db->dict,key) == DICT_OK;
3646 }
3647
3648 static void expireCommand(redisClient *c) {
3649 dictEntry *de;
3650 int seconds = atoi(c->argv[2]->ptr);
3651
3652 de = dictFind(c->db->dict,c->argv[1]);
3653 if (de == NULL) {
3654 addReply(c,shared.czero);
3655 return;
3656 }
3657 if (seconds <= 0) {
3658 addReply(c, shared.czero);
3659 return;
3660 } else {
3661 time_t when = time(NULL)+seconds;
3662 if (setExpire(c->db,c->argv[1],when))
3663 addReply(c,shared.cone);
3664 else
3665 addReply(c,shared.czero);
3666 return;
3667 }
3668 }
3669
3670 static void ttlCommand(redisClient *c) {
3671 time_t expire;
3672 int ttl = -1;
3673
3674 expire = getExpire(c->db,c->argv[1]);
3675 if (expire != -1) {
3676 ttl = (int) (expire-time(NULL));
3677 if (ttl < 0) ttl = -1;
3678 }
3679 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3680 }
3681
3682 /* =============================== Replication ============================= */
3683
3684 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3685 ssize_t nwritten, ret = size;
3686 time_t start = time(NULL);
3687
3688 timeout++;
3689 while(size) {
3690 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3691 nwritten = write(fd,ptr,size);
3692 if (nwritten == -1) return -1;
3693 ptr += nwritten;
3694 size -= nwritten;
3695 }
3696 if ((time(NULL)-start) > timeout) {
3697 errno = ETIMEDOUT;
3698 return -1;
3699 }
3700 }
3701 return ret;
3702 }
3703
3704 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3705 ssize_t nread, totread = 0;
3706 time_t start = time(NULL);
3707
3708 timeout++;
3709 while(size) {
3710 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3711 nread = read(fd,ptr,size);
3712 if (nread == -1) return -1;
3713 ptr += nread;
3714 size -= nread;
3715 totread += nread;
3716 }
3717 if ((time(NULL)-start) > timeout) {
3718 errno = ETIMEDOUT;
3719 return -1;
3720 }
3721 }
3722 return totread;
3723 }
3724
/* Read a line from 'fd' one byte at a time into 'ptr', a buffer of 'size'
 * bytes. Reading stops at the first '\n'; the newline is not stored and a
 * trailing '\r' is stripped. The result is always NUL terminated.
 *
 * At most size-1 characters are stored: the remaining room is decremented
 * for every stored byte so that an overlong line can never write past the
 * end of the caller's buffer (in that case the line is truncated and the
 * number of bytes stored is returned).
 *
 * Returns the number of characters stored before the newline, or -1 if
 * syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* reserve room for the terminating NUL */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        size--;
    }
    return nread;
}
3745
3746 static void syncCommand(redisClient *c) {
3747 /* ignore SYNC if aleady slave or in monitor mode */
3748 if (c->flags & REDIS_SLAVE) return;
3749
3750 /* SYNC can't be issued when the server has pending data to send to
3751 * the client about already issued commands. We need a fresh reply
3752 * buffer registering the differences between the BGSAVE and the current
3753 * dataset, so that we can copy to other slaves if needed. */
3754 if (listLength(c->reply) != 0) {
3755 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3756 return;
3757 }
3758
3759 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3760 /* Here we need to check if there is a background saving operation
3761 * in progress, or if it is required to start one */
3762 if (server.bgsaveinprogress) {
3763 /* Ok a background save is in progress. Let's check if it is a good
3764 * one for replication, i.e. if there is another slave that is
3765 * registering differences since the server forked to save */
3766 redisClient *slave;
3767 listNode *ln;
3768
3769 listRewind(server.slaves);
3770 while((ln = listYield(server.slaves))) {
3771 slave = ln->value;
3772 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3773 }
3774 if (ln) {
3775 /* Perfect, the server is already registering differences for
3776 * another slave. Set the right state, and copy the buffer. */
3777 listRelease(c->reply);
3778 c->reply = listDup(slave->reply);
3779 if (!c->reply) oom("listDup copying slave reply list");
3780 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3781 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3782 } else {
3783 /* No way, we need to wait for the next BGSAVE in order to
3784 * register differences */
3785 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3786 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3787 }
3788 } else {
3789 /* Ok we don't have a BGSAVE in progress, let's start one */
3790 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3791 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3792 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3793 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3794 return;
3795 }
3796 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3797 }
3798 c->repldbfd = -1;
3799 c->flags |= REDIS_SLAVE;
3800 c->slaveseldb = 0;
3801 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3802 return;
3803 }
3804
3805 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3806 redisClient *slave = privdata;
3807 REDIS_NOTUSED(el);
3808 REDIS_NOTUSED(mask);
3809 char buf[REDIS_IOBUF_LEN];
3810 ssize_t nwritten, buflen;
3811
3812 if (slave->repldboff == 0) {
3813 /* Write the bulk write count before to transfer the DB. In theory here
3814 * we don't know how much room there is in the output buffer of the
3815 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3816 * operations) will never be smaller than the few bytes we need. */
3817 sds bulkcount;
3818
3819 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3820 slave->repldbsize);
3821 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3822 {
3823 sdsfree(bulkcount);
3824 freeClient(slave);
3825 return;
3826 }
3827 sdsfree(bulkcount);
3828 }
3829 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3830 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3831 if (buflen <= 0) {
3832 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3833 (buflen == 0) ? "premature EOF" : strerror(errno));
3834 freeClient(slave);
3835 return;
3836 }
3837 if ((nwritten = write(fd,buf,buflen)) == -1) {
3838 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3839 strerror(errno));
3840 freeClient(slave);
3841 return;
3842 }
3843 slave->repldboff += nwritten;
3844 if (slave->repldboff == slave->repldbsize) {
3845 close(slave->repldbfd);
3846 slave->repldbfd = -1;
3847 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3848 slave->replstate = REDIS_REPL_ONLINE;
3849 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3850 sendReplyToClient, slave, NULL) == AE_ERR) {
3851 freeClient(slave);
3852 return;
3853 }
3854 addReplySds(slave,sdsempty());
3855 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3856 }
3857 }
3858
3859 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3860 listNode *ln;
3861 int startbgsave = 0;
3862
3863 listRewind(server.slaves);
3864 while((ln = listYield(server.slaves))) {
3865 redisClient *slave = ln->value;
3866
3867 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3868 startbgsave = 1;
3869 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3870 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3871 struct redis_stat buf;
3872
3873 if (bgsaveerr != REDIS_OK) {
3874 freeClient(slave);
3875 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3876 continue;
3877 }
3878 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3879 redis_fstat(slave->repldbfd,&buf) == -1) {
3880 freeClient(slave);
3881 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3882 continue;
3883 }
3884 slave->repldboff = 0;
3885 slave->repldbsize = buf.st_size;
3886 slave->replstate = REDIS_REPL_SEND_BULK;
3887 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3888 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3889 freeClient(slave);
3890 continue;
3891 }
3892 }
3893 }
3894 if (startbgsave) {
3895 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3896 listRewind(server.slaves);
3897 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3898 while((ln = listYield(server.slaves))) {
3899 redisClient *slave = ln->value;
3900
3901 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3902 freeClient(slave);
3903 }
3904 }
3905 }
3906 }
3907
3908 static int syncWithMaster(void) {
3909 char buf[1024], tmpfile[256];
3910 int dumpsize;
3911 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3912 int dfd;
3913
3914 if (fd == -1) {
3915 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3916 strerror(errno));
3917 return REDIS_ERR;
3918 }
3919 /* Issue the SYNC command */
3920 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3921 close(fd);
3922 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3923 strerror(errno));
3924 return REDIS_ERR;
3925 }
3926 /* Read the bulk write count */
3927 if (syncReadLine(fd,buf,1024,3600) == -1) {
3928 close(fd);
3929 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3930 strerror(errno));
3931 return REDIS_ERR;
3932 }
3933 dumpsize = atoi(buf+1);
3934 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3935 /* Read the bulk write data on a temp file */
3936 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3937 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3938 if (dfd == -1) {
3939 close(fd);
3940 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3941 return REDIS_ERR;
3942 }
3943 while(dumpsize) {
3944 int nread, nwritten;
3945
3946 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3947 if (nread == -1) {
3948 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3949 strerror(errno));
3950 close(fd);
3951 close(dfd);
3952 return REDIS_ERR;
3953 }
3954 nwritten = write(dfd,buf,nread);
3955 if (nwritten == -1) {
3956 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3957 close(fd);
3958 close(dfd);
3959 return REDIS_ERR;
3960 }
3961 dumpsize -= nread;
3962 }
3963 close(dfd);
3964 if (rename(tmpfile,server.dbfilename) == -1) {
3965 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3966 unlink(tmpfile);
3967 close(fd);
3968 return REDIS_ERR;
3969 }
3970 emptyDb();
3971 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3972 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3973 close(fd);
3974 return REDIS_ERR;
3975 }
3976 server.master = createClient(fd);
3977 server.master->flags |= REDIS_MASTER;
3978 server.replstate = REDIS_REPL_CONNECTED;
3979 return REDIS_OK;
3980 }
3981
3982 static void slaveofCommand(redisClient *c) {
3983 if (!strcasecmp(c->argv[1]->ptr,"no") &&
3984 !strcasecmp(c->argv[2]->ptr,"one")) {
3985 if (server.masterhost) {
3986 sdsfree(server.masterhost);
3987 server.masterhost = NULL;
3988 if (server.master) freeClient(server.master);
3989 server.replstate = REDIS_REPL_NONE;
3990 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
3991 }
3992 } else {
3993 sdsfree(server.masterhost);
3994 server.masterhost = sdsdup(c->argv[1]->ptr);
3995 server.masterport = atoi(c->argv[2]->ptr);
3996 if (server.master) freeClient(server.master);
3997 server.replstate = REDIS_REPL_CONNECT;
3998 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
3999 server.masterhost, server.masterport);
4000 }
4001 addReply(c,shared.ok);
4002 }
4003
4004 /* ============================ Maxmemory directive ======================== */
4005
4006 /* This function gets called when 'maxmemory' is set on the config file to limit
4007 * the max memory used by the server, and we are out of memory.
4008 * This function will try to, in order:
4009 *
4010 * - Free objects from the free list
4011 * - Try to remove keys with an EXPIRE set
4012 *
4013 * It is not possible to free enough memory to reach used-memory < maxmemory
4014 * the server will start refusing commands that will enlarge even more the
4015 * memory usage.
4016 */
4017 static void freeMemoryIfNeeded(void) {
4018 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4019 if (listLength(server.objfreelist)) {
4020 robj *o;
4021
4022 listNode *head = listFirst(server.objfreelist);
4023 o = listNodeValue(head);
4024 listDelNode(server.objfreelist,head);
4025 zfree(o);
4026 } else {
4027 int j, k, freed = 0;
4028
4029 for (j = 0; j < server.dbnum; j++) {
4030 int minttl = -1;
4031 robj *minkey = NULL;
4032 struct dictEntry *de;
4033
4034 if (dictSize(server.db[j].expires)) {
4035 freed = 1;
4036 /* From a sample of three keys drop the one nearest to
4037 * the natural expire */
4038 for (k = 0; k < 3; k++) {
4039 time_t t;
4040
4041 de = dictGetRandomKey(server.db[j].expires);
4042 t = (time_t) dictGetEntryVal(de);
4043 if (minttl == -1 || t < minttl) {
4044 minkey = dictGetEntryKey(de);
4045 minttl = t;
4046 }
4047 }
4048 deleteKey(server.db+j,minkey);
4049 }
4050 }
4051 if (!freed) return; /* nothing to free... */
4052 }
4053 }
4054 }
4055
4056 /* ================================= Debugging ============================== */
4057
4058 static void debugCommand(redisClient *c) {
4059 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4060 *((char*)-1) = 'x';
4061 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4062 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4063 robj *key, *val;
4064
4065 if (!de) {
4066 addReply(c,shared.nokeyerr);
4067 return;
4068 }
4069 key = dictGetEntryKey(de);
4070 val = dictGetEntryVal(de);
4071 addReplySds(c,sdscatprintf(sdsempty(),
4072 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4073 key, key->refcount, val, val->refcount));
4074 } else {
4075 addReplySds(c,sdsnew(
4076 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4077 }
4078 }
4079
4080 /* =================================== Main! ================================ */
4081
4082 #ifdef __linux__
/* Read the integer stored in /proc/sys/vm/overcommit_memory.
 *
 * Returns the parsed value, or -1 when the file can't be opened or read,
 * or when its content does not start with a number that fits in an int.
 * (atoi() would silently turn unparsable content into 0, which callers
 * could not tell apart from a real 0 setting.) */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long val;
    int gotline;

    if (!fp) return -1;
    gotline = (fgets(buf,sizeof(buf),fp) != NULL);
    fclose(fp);
    if (!gotline) return -1;

    errno = 0;
    val = strtol(buf,&end,10);
    if (end == buf || errno == ERANGE) return -1; /* no digits / overflow */
    if ((long)(int)val != val) return -1;         /* does not fit an int */
    return (int)val;
}
4096
4097 void linuxOvercommitMemoryWarning(void) {
4098 if (linuxOvercommitMemoryValue() == 0) {
4099 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4100 }
4101 }
4102 #endif /* __linux__ */
4103
4104 static void daemonize(void) {
4105 int fd;
4106 FILE *fp;
4107
4108 if (fork() != 0) exit(0); /* parent exits */
4109 setsid(); /* create a new session */
4110
4111 /* Every output goes to /dev/null. If Redis is daemonized but
4112 * the 'logfile' is set to 'stdout' in the configuration file
4113 * it will not log at all. */
4114 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4115 dup2(fd, STDIN_FILENO);
4116 dup2(fd, STDOUT_FILENO);
4117 dup2(fd, STDERR_FILENO);
4118 if (fd > STDERR_FILENO) close(fd);
4119 }
4120 /* Try to write the pid file */
4121 fp = fopen(server.pidfile,"w");
4122 if (fp) {
4123 fprintf(fp,"%d\n",getpid());
4124 fclose(fp);
4125 }
4126 }
4127
4128 int main(int argc, char **argv) {
4129 #ifdef __linux__
4130 linuxOvercommitMemoryWarning();
4131 #endif
4132
4133 initServerConfig();
4134 if (argc == 2) {
4135 ResetServerSaveParams();
4136 loadServerConfig(argv[1]);
4137 } else if (argc > 2) {
4138 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4139 exit(1);
4140 } else {
4141 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4142 }
4143 initServer();
4144 if (server.daemonize) daemonize();
4145 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4146 if (rdbLoad(server.dbfilename) == REDIS_OK)
4147 redisLog(REDIS_NOTICE,"DB loaded from disk");
4148 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4149 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4150 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4151 aeMain(server.el);
4152 aeDeleteEventLoop(server.el);
4153 return 0;
4154 }