]> git.saurik.com Git - redis.git/blob - redis.c
benchmark now outputs the right command line to shorten the TIME_WAIT interval on...
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.001"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
71 /* Error codes */
72 #define REDIS_OK 0
73 #define REDIS_ERR -1
74
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
88
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
91
92 /* Command flags */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
96 this flags will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short this commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
100
101 /* Object types */
102 #define REDIS_STRING 0
103 #define REDIS_LIST 1
104 #define REDIS_SET 2
105 #define REDIS_HASH 3
106
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
110
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
115
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpreter the length:
119 *
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bits
124 * number specify the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
126 *
127 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
134
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining two bits specify a special encoding for the object
137 * accordingly to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
142
143 /* Client flags */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
148
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
153
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
162
163 /* List related stuff */
164 #define REDIS_HEAD 0
165 #define REDIS_TAIL 1
166
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
175
176 /* Log levels */
177 #define REDIS_DEBUG 0
178 #define REDIS_NOTICE 1
179 #define REDIS_WARNING 2
180
181 /* Anti-warning macro... */
182 #define REDIS_NOTUSED(V) ((void) V)
183
184
185 /*================================= Data types ============================== */
186
187 /* A redis object, that is a type able to hold a string / list / set */
188 typedef struct redisObject {
189 void *ptr;
190 unsigned char type;
191 unsigned char encoding;
192 unsigned char notused[2];
193 int refcount;
194 } robj;
195
196 typedef struct redisDb {
197 dict *dict;
198 dict *expires;
199 int id;
200 } redisDb;
201
202 /* With multiplexing we need to take per-clinet state.
203 * Clients are taken in a liked list. */
204 typedef struct redisClient {
205 int fd;
206 redisDb *db;
207 int dictid;
208 sds querybuf;
209 robj **argv;
210 int argc;
211 int bulklen; /* bulk read len. -1 if not in bulk read mode */
212 list *reply;
213 int sentlen;
214 time_t lastinteraction; /* time of the last interaction, used for timeout */
215 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
216 int slaveseldb; /* slave selected db, if this client is a slave */
217 int authenticated; /* when requirepass is non-NULL */
218 int replstate; /* replication state if this is a slave */
219 int repldbfd; /* replication DB file descriptor */
220 long repldboff; /* replication DB file offset */
221 off_t repldbsize; /* replication DB file size */
222 } redisClient;
223
224 struct saveparam {
225 time_t seconds;
226 int changes;
227 };
228
229 /* Global server state structure */
230 struct redisServer {
231 int port;
232 int fd;
233 redisDb *db;
234 dict *sharingpool;
235 unsigned int sharingpoolsize;
236 long long dirty; /* changes to DB from the last save */
237 list *clients;
238 list *slaves, *monitors;
239 char neterr[ANET_ERR_LEN];
240 aeEventLoop *el;
241 int cronloops; /* number of times the cron function run */
242 list *objfreelist; /* A list of freed objects to avoid malloc() */
243 time_t lastsave; /* Unix time of last save succeeede */
244 size_t usedmemory; /* Used memory in megabytes */
245 /* Fields used only for stats */
246 time_t stat_starttime; /* server start time */
247 long long stat_numcommands; /* number of processed commands */
248 long long stat_numconnections; /* number of connections received */
249 /* Configuration */
250 int verbosity;
251 int glueoutputbuf;
252 int maxidletime;
253 int dbnum;
254 int daemonize;
255 char *pidfile;
256 int bgsaveinprogress;
257 pid_t bgsavechildpid;
258 struct saveparam *saveparams;
259 int saveparamslen;
260 char *logfile;
261 char *bindaddr;
262 char *dbfilename;
263 char *requirepass;
264 int shareobjects;
265 /* Replication related */
266 int isslave;
267 char *masterhost;
268 int masterport;
269 redisClient *master; /* client that is master for this slave */
270 int replstate;
271 unsigned int maxclients;
272 unsigned long maxmemory;
273 /* Sort parameters - qsort_r() is only available under BSD so we
274 * have to take this state global, in order to pass it to sortCompare() */
275 int sort_desc;
276 int sort_alpha;
277 int sort_bypattern;
278 };
279
280 typedef void redisCommandProc(redisClient *c);
281 struct redisCommand {
282 char *name;
283 redisCommandProc *proc;
284 int arity;
285 int flags;
286 };
287
288 struct redisFunctionSym {
289 char *name;
290 unsigned long pointer;
291 };
292
293 typedef struct _redisSortObject {
294 robj *obj;
295 union {
296 double score;
297 robj *cmpobj;
298 } u;
299 } redisSortObject;
300
301 typedef struct _redisSortOperation {
302 int type;
303 robj *pattern;
304 } redisSortOperation;
305
306 struct sharedObjectsStruct {
307 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
308 *colon, *nullbulk, *nullmultibulk,
309 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
310 *outofrangeerr, *plus,
311 *select0, *select1, *select2, *select3, *select4,
312 *select5, *select6, *select7, *select8, *select9;
313 } shared;
314
315 /*================================ Prototypes =============================== */
316
317 static void freeStringObject(robj *o);
318 static void freeListObject(robj *o);
319 static void freeSetObject(robj *o);
320 static void decrRefCount(void *o);
321 static robj *createObject(int type, void *ptr);
322 static void freeClient(redisClient *c);
323 static int rdbLoad(char *filename);
324 static void addReply(redisClient *c, robj *obj);
325 static void addReplySds(redisClient *c, sds s);
326 static void incrRefCount(robj *o);
327 static int rdbSaveBackground(char *filename);
328 static robj *createStringObject(char *ptr, size_t len);
329 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
330 static int syncWithMaster(void);
331 static robj *tryObjectSharing(robj *o);
332 static int tryObjectEncoding(robj *o);
333 static robj *getDecodedObject(const robj *o);
334 static int removeExpire(redisDb *db, robj *key);
335 static int expireIfNeeded(redisDb *db, robj *key);
336 static int deleteIfVolatile(redisDb *db, robj *key);
337 static int deleteKey(redisDb *db, robj *key);
338 static time_t getExpire(redisDb *db, robj *key);
339 static int setExpire(redisDb *db, robj *key, time_t when);
340 static void updateSlavesWaitingBgsave(int bgsaveerr);
341 static void freeMemoryIfNeeded(void);
342 static int processCommand(redisClient *c);
343 static void setupSigSegvAction(void);
344 static void rdbRemoveTempFile(pid_t childpid);
345 static size_t stringObjectLen(robj *o);
346
347 static void authCommand(redisClient *c);
348 static void pingCommand(redisClient *c);
349 static void echoCommand(redisClient *c);
350 static void setCommand(redisClient *c);
351 static void setnxCommand(redisClient *c);
352 static void getCommand(redisClient *c);
353 static void delCommand(redisClient *c);
354 static void existsCommand(redisClient *c);
355 static void incrCommand(redisClient *c);
356 static void decrCommand(redisClient *c);
357 static void incrbyCommand(redisClient *c);
358 static void decrbyCommand(redisClient *c);
359 static void selectCommand(redisClient *c);
360 static void randomkeyCommand(redisClient *c);
361 static void keysCommand(redisClient *c);
362 static void dbsizeCommand(redisClient *c);
363 static void lastsaveCommand(redisClient *c);
364 static void saveCommand(redisClient *c);
365 static void bgsaveCommand(redisClient *c);
366 static void shutdownCommand(redisClient *c);
367 static void moveCommand(redisClient *c);
368 static void renameCommand(redisClient *c);
369 static void renamenxCommand(redisClient *c);
370 static void lpushCommand(redisClient *c);
371 static void rpushCommand(redisClient *c);
372 static void lpopCommand(redisClient *c);
373 static void rpopCommand(redisClient *c);
374 static void llenCommand(redisClient *c);
375 static void lindexCommand(redisClient *c);
376 static void lrangeCommand(redisClient *c);
377 static void ltrimCommand(redisClient *c);
378 static void typeCommand(redisClient *c);
379 static void lsetCommand(redisClient *c);
380 static void saddCommand(redisClient *c);
381 static void sremCommand(redisClient *c);
382 static void smoveCommand(redisClient *c);
383 static void sismemberCommand(redisClient *c);
384 static void scardCommand(redisClient *c);
385 static void spopCommand(redisClient *c);
386 static void sinterCommand(redisClient *c);
387 static void sinterstoreCommand(redisClient *c);
388 static void sunionCommand(redisClient *c);
389 static void sunionstoreCommand(redisClient *c);
390 static void sdiffCommand(redisClient *c);
391 static void sdiffstoreCommand(redisClient *c);
392 static void syncCommand(redisClient *c);
393 static void flushdbCommand(redisClient *c);
394 static void flushallCommand(redisClient *c);
395 static void sortCommand(redisClient *c);
396 static void lremCommand(redisClient *c);
397 static void infoCommand(redisClient *c);
398 static void mgetCommand(redisClient *c);
399 static void monitorCommand(redisClient *c);
400 static void expireCommand(redisClient *c);
401 static void getSetCommand(redisClient *c);
402 static void ttlCommand(redisClient *c);
403 static void slaveofCommand(redisClient *c);
404 static void debugCommand(redisClient *c);
405 /*================================= Globals ================================= */
406
407 /* Global vars */
408 static struct redisServer server; /* server global state */
409 static struct redisCommand cmdTable[] = {
410 {"get",getCommand,2,REDIS_CMD_INLINE},
411 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
412 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
413 {"del",delCommand,-2,REDIS_CMD_INLINE},
414 {"exists",existsCommand,2,REDIS_CMD_INLINE},
415 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
416 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
417 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
418 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
419 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
420 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
421 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
422 {"llen",llenCommand,2,REDIS_CMD_INLINE},
423 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
424 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
425 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
426 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
427 {"lrem",lremCommand,4,REDIS_CMD_BULK},
428 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
429 {"srem",sremCommand,3,REDIS_CMD_BULK},
430 {"smove",smoveCommand,4,REDIS_CMD_BULK},
431 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
432 {"scard",scardCommand,2,REDIS_CMD_INLINE},
433 {"spop",spopCommand,2,REDIS_CMD_INLINE},
434 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
435 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
436 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
437 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
438 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
439 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
440 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
441 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
442 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
443 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
444 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
445 {"select",selectCommand,2,REDIS_CMD_INLINE},
446 {"move",moveCommand,3,REDIS_CMD_INLINE},
447 {"rename",renameCommand,3,REDIS_CMD_INLINE},
448 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
449 {"expire",expireCommand,3,REDIS_CMD_INLINE},
450 {"keys",keysCommand,2,REDIS_CMD_INLINE},
451 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
452 {"auth",authCommand,2,REDIS_CMD_INLINE},
453 {"ping",pingCommand,1,REDIS_CMD_INLINE},
454 {"echo",echoCommand,2,REDIS_CMD_BULK},
455 {"save",saveCommand,1,REDIS_CMD_INLINE},
456 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
457 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
458 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
459 {"type",typeCommand,2,REDIS_CMD_INLINE},
460 {"sync",syncCommand,1,REDIS_CMD_INLINE},
461 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
462 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
463 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
464 {"info",infoCommand,1,REDIS_CMD_INLINE},
465 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
466 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
467 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
468 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
469 {NULL,NULL,0,0}
470 };
471 /*============================ Utility functions ============================ */
472
473 /* Glob-style pattern matching. */
474 int stringmatchlen(const char *pattern, int patternLen,
475 const char *string, int stringLen, int nocase)
476 {
477 while(patternLen) {
478 switch(pattern[0]) {
479 case '*':
480 while (pattern[1] == '*') {
481 pattern++;
482 patternLen--;
483 }
484 if (patternLen == 1)
485 return 1; /* match */
486 while(stringLen) {
487 if (stringmatchlen(pattern+1, patternLen-1,
488 string, stringLen, nocase))
489 return 1; /* match */
490 string++;
491 stringLen--;
492 }
493 return 0; /* no match */
494 break;
495 case '?':
496 if (stringLen == 0)
497 return 0; /* no match */
498 string++;
499 stringLen--;
500 break;
501 case '[':
502 {
503 int not, match;
504
505 pattern++;
506 patternLen--;
507 not = pattern[0] == '^';
508 if (not) {
509 pattern++;
510 patternLen--;
511 }
512 match = 0;
513 while(1) {
514 if (pattern[0] == '\\') {
515 pattern++;
516 patternLen--;
517 if (pattern[0] == string[0])
518 match = 1;
519 } else if (pattern[0] == ']') {
520 break;
521 } else if (patternLen == 0) {
522 pattern--;
523 patternLen++;
524 break;
525 } else if (pattern[1] == '-' && patternLen >= 3) {
526 int start = pattern[0];
527 int end = pattern[2];
528 int c = string[0];
529 if (start > end) {
530 int t = start;
531 start = end;
532 end = t;
533 }
534 if (nocase) {
535 start = tolower(start);
536 end = tolower(end);
537 c = tolower(c);
538 }
539 pattern += 2;
540 patternLen -= 2;
541 if (c >= start && c <= end)
542 match = 1;
543 } else {
544 if (!nocase) {
545 if (pattern[0] == string[0])
546 match = 1;
547 } else {
548 if (tolower((int)pattern[0]) == tolower((int)string[0]))
549 match = 1;
550 }
551 }
552 pattern++;
553 patternLen--;
554 }
555 if (not)
556 match = !match;
557 if (!match)
558 return 0; /* no match */
559 string++;
560 stringLen--;
561 break;
562 }
563 case '\\':
564 if (patternLen >= 2) {
565 pattern++;
566 patternLen--;
567 }
568 /* fall through */
569 default:
570 if (!nocase) {
571 if (pattern[0] != string[0])
572 return 0; /* no match */
573 } else {
574 if (tolower((int)pattern[0]) != tolower((int)string[0]))
575 return 0; /* no match */
576 }
577 string++;
578 stringLen--;
579 break;
580 }
581 pattern++;
582 patternLen--;
583 if (stringLen == 0) {
584 while(*pattern == '*') {
585 pattern++;
586 patternLen--;
587 }
588 break;
589 }
590 }
591 if (patternLen == 0 && stringLen == 0)
592 return 1;
593 return 0;
594 }
595
596 static void redisLog(int level, const char *fmt, ...) {
597 va_list ap;
598 FILE *fp;
599
600 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
601 if (!fp) return;
602
603 va_start(ap, fmt);
604 if (level >= server.verbosity) {
605 char *c = ".-*";
606 char buf[64];
607 time_t now;
608
609 now = time(NULL);
610 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
611 fprintf(fp,"%s %c ",buf,c[level]);
612 vfprintf(fp, fmt, ap);
613 fprintf(fp,"\n");
614 fflush(fp);
615 }
616 va_end(ap);
617
618 if (server.logfile) fclose(fp);
619 }
620
621 /*====================== Hash table type implementation ==================== */
622
623 /* This is an hash table type that uses the SDS dynamic strings libary as
624 * keys and radis objects as values (objects can hold SDS strings,
625 * lists, sets). */
626
627 static int sdsDictKeyCompare(void *privdata, const void *key1,
628 const void *key2)
629 {
630 int l1,l2;
631 DICT_NOTUSED(privdata);
632
633 l1 = sdslen((sds)key1);
634 l2 = sdslen((sds)key2);
635 if (l1 != l2) return 0;
636 return memcmp(key1, key2, l1) == 0;
637 }
638
639 static void dictRedisObjectDestructor(void *privdata, void *val)
640 {
641 DICT_NOTUSED(privdata);
642
643 decrRefCount(val);
644 }
645
646 static int dictObjKeyCompare(void *privdata, const void *key1,
647 const void *key2)
648 {
649 const robj *o1 = key1, *o2 = key2;
650 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
651 }
652
653 static unsigned int dictObjHash(const void *key) {
654 const robj *o = key;
655 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
656 }
657
658 static int dictEncObjKeyCompare(void *privdata, const void *key1,
659 const void *key2)
660 {
661 const robj *o1 = key1, *o2 = key2;
662
663 if (o1->encoding == REDIS_ENCODING_RAW &&
664 o2->encoding == REDIS_ENCODING_RAW)
665 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
666 else {
667 robj *dec1, *dec2;
668 int cmp;
669
670 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
671 getDecodedObject(o1) : (robj*)o1;
672 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
673 getDecodedObject(o2) : (robj*)o2;
674 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
675 if (dec1 != o1) decrRefCount(dec1);
676 if (dec2 != o2) decrRefCount(dec2);
677 return cmp;
678 }
679 }
680
681 static unsigned int dictEncObjHash(const void *key) {
682 const robj *o = key;
683
684 if (o->encoding == REDIS_ENCODING_RAW)
685 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
686 else {
687 robj *dec = getDecodedObject(o);
688 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
689 decrRefCount(dec);
690 return hash;
691 }
692 }
693
694 static dictType setDictType = {
695 dictEncObjHash, /* hash function */
696 NULL, /* key dup */
697 NULL, /* val dup */
698 dictEncObjKeyCompare, /* key compare */
699 dictRedisObjectDestructor, /* key destructor */
700 NULL /* val destructor */
701 };
702
703 static dictType hashDictType = {
704 dictObjHash, /* hash function */
705 NULL, /* key dup */
706 NULL, /* val dup */
707 dictObjKeyCompare, /* key compare */
708 dictRedisObjectDestructor, /* key destructor */
709 dictRedisObjectDestructor /* val destructor */
710 };
711
712 /* ========================= Random utility functions ======================= */
713
714 /* Redis generally does not try to recover from out of memory conditions
715 * when allocating objects or strings, it is not clear if it will be possible
716 * to report this condition to the client since the networking layer itself
717 * is based on heap allocation for send buffers, so we simply abort.
718 * At least the code will be simpler to read... */
719 static void oom(const char *msg) {
720 fprintf(stderr, "%s: Out of memory\n",msg);
721 fflush(stderr);
722 sleep(1);
723 abort();
724 }
725
726 /* ====================== Redis server networking stuff ===================== */
727 static void closeTimedoutClients(void) {
728 redisClient *c;
729 listNode *ln;
730 time_t now = time(NULL);
731
732 listRewind(server.clients);
733 while ((ln = listYield(server.clients)) != NULL) {
734 c = listNodeValue(ln);
735 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
736 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
737 (now - c->lastinteraction > server.maxidletime)) {
738 redisLog(REDIS_DEBUG,"Closing idle client");
739 freeClient(c);
740 }
741 }
742 }
743
744 static int htNeedsResize(dict *dict) {
745 long long size, used;
746
747 size = dictSlots(dict);
748 used = dictSize(dict);
749 return (size && used && size > DICT_HT_INITIAL_SIZE &&
750 (used*100/size < REDIS_HT_MINFILL));
751 }
752
753 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
754 * we resize the hash table to save memory */
755 static void tryResizeHashTables(void) {
756 int j;
757
758 for (j = 0; j < server.dbnum; j++) {
759 if (htNeedsResize(server.db[j].dict)) {
760 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
761 dictResize(server.db[j].dict);
762 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
763 }
764 if (htNeedsResize(server.db[j].expires))
765 dictResize(server.db[j].expires);
766 }
767 }
768
769 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
770 int j, loops = server.cronloops++;
771 REDIS_NOTUSED(eventLoop);
772 REDIS_NOTUSED(id);
773 REDIS_NOTUSED(clientData);
774
775 /* Update the global state with the amount of used memory */
776 server.usedmemory = zmalloc_used_memory();
777
778 /* Show some info about non-empty databases */
779 for (j = 0; j < server.dbnum; j++) {
780 long long size, used, vkeys;
781
782 size = dictSlots(server.db[j].dict);
783 used = dictSize(server.db[j].dict);
784 vkeys = dictSize(server.db[j].expires);
785 if (!(loops % 5) && (used || vkeys)) {
786 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
787 /* dictPrintStats(server.dict); */
788 }
789 }
790
791 /* We don't want to resize the hash tables while a bacground saving
792 * is in progress: the saving child is created using fork() that is
793 * implemented with a copy-on-write semantic in most modern systems, so
794 * if we resize the HT while there is the saving child at work actually
795 * a lot of memory movements in the parent will cause a lot of pages
796 * copied. */
797 if (!server.bgsaveinprogress) tryResizeHashTables();
798
799 /* Show information about connected clients */
800 if (!(loops % 5)) {
801 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
802 listLength(server.clients)-listLength(server.slaves),
803 listLength(server.slaves),
804 server.usedmemory,
805 dictSize(server.sharingpool));
806 }
807
808 /* Close connections of timedout clients */
809 if (server.maxidletime && !(loops % 10))
810 closeTimedoutClients();
811
812 /* Check if a background saving in progress terminated */
813 if (server.bgsaveinprogress) {
814 int statloc;
815 if (wait4(-1,&statloc,WNOHANG,NULL)) {
816 int exitcode = WEXITSTATUS(statloc);
817 int bysignal = WIFSIGNALED(statloc);
818
819 if (!bysignal && exitcode == 0) {
820 redisLog(REDIS_NOTICE,
821 "Background saving terminated with success");
822 server.dirty = 0;
823 server.lastsave = time(NULL);
824 } else if (!bysignal && exitcode != 0) {
825 redisLog(REDIS_WARNING, "Background saving error");
826 } else {
827 redisLog(REDIS_WARNING,
828 "Background saving terminated by signal");
829 rdbRemoveTempFile(server.bgsavechildpid);
830 }
831 server.bgsaveinprogress = 0;
832 server.bgsavechildpid = -1;
833 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
834 }
835 } else {
836 /* If there is not a background saving in progress check if
837 * we have to save now */
838 time_t now = time(NULL);
839 for (j = 0; j < server.saveparamslen; j++) {
840 struct saveparam *sp = server.saveparams+j;
841
842 if (server.dirty >= sp->changes &&
843 now-server.lastsave > sp->seconds) {
844 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
845 sp->changes, sp->seconds);
846 rdbSaveBackground(server.dbfilename);
847 break;
848 }
849 }
850 }
851
852 /* Try to expire a few timed out keys */
853 for (j = 0; j < server.dbnum; j++) {
854 redisDb *db = server.db+j;
855 int num = dictSize(db->expires);
856
857 if (num) {
858 time_t now = time(NULL);
859
860 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
861 num = REDIS_EXPIRELOOKUPS_PER_CRON;
862 while (num--) {
863 dictEntry *de;
864 time_t t;
865
866 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
867 t = (time_t) dictGetEntryVal(de);
868 if (now > t) {
869 deleteKey(db,dictGetEntryKey(de));
870 }
871 }
872 }
873 }
874
875 /* Check if we should connect to a MASTER */
876 if (server.replstate == REDIS_REPL_CONNECT) {
877 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
878 if (syncWithMaster() == REDIS_OK) {
879 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
880 }
881 }
882 return 1000;
883 }
884
885 static void createSharedObjects(void) {
886 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
887 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
888 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
889 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
890 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
891 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
892 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
893 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
894 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
895 /* no such key */
896 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
897 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
898 "-ERR Operation against a key holding the wrong kind of value\r\n"));
899 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
900 "-ERR no such key\r\n"));
901 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
902 "-ERR syntax error\r\n"));
903 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
904 "-ERR source and destination objects are the same\r\n"));
905 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
906 "-ERR index out of range\r\n"));
907 shared.space = createObject(REDIS_STRING,sdsnew(" "));
908 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
909 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
910 shared.select0 = createStringObject("select 0\r\n",10);
911 shared.select1 = createStringObject("select 1\r\n",10);
912 shared.select2 = createStringObject("select 2\r\n",10);
913 shared.select3 = createStringObject("select 3\r\n",10);
914 shared.select4 = createStringObject("select 4\r\n",10);
915 shared.select5 = createStringObject("select 5\r\n",10);
916 shared.select6 = createStringObject("select 6\r\n",10);
917 shared.select7 = createStringObject("select 7\r\n",10);
918 shared.select8 = createStringObject("select 8\r\n",10);
919 shared.select9 = createStringObject("select 9\r\n",10);
920 }
921
922 static void appendServerSaveParams(time_t seconds, int changes) {
923 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
924 if (server.saveparams == NULL) oom("appendServerSaveParams");
925 server.saveparams[server.saveparamslen].seconds = seconds;
926 server.saveparams[server.saveparamslen].changes = changes;
927 server.saveparamslen++;
928 }
929
930 static void ResetServerSaveParams() {
931 zfree(server.saveparams);
932 server.saveparams = NULL;
933 server.saveparamslen = 0;
934 }
935
936 static void initServerConfig() {
937 server.dbnum = REDIS_DEFAULT_DBNUM;
938 server.port = REDIS_SERVERPORT;
939 server.verbosity = REDIS_DEBUG;
940 server.maxidletime = REDIS_MAXIDLETIME;
941 server.saveparams = NULL;
942 server.logfile = NULL; /* NULL = log on standard output */
943 server.bindaddr = NULL;
944 server.glueoutputbuf = 1;
945 server.daemonize = 0;
946 server.pidfile = "/var/run/redis.pid";
947 server.dbfilename = "dump.rdb";
948 server.requirepass = NULL;
949 server.shareobjects = 0;
950 server.sharingpoolsize = 1024;
951 server.maxclients = 0;
952 server.maxmemory = 0;
953 ResetServerSaveParams();
954
955 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
956 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
957 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
958 /* Replication related */
959 server.isslave = 0;
960 server.masterhost = NULL;
961 server.masterport = 6379;
962 server.master = NULL;
963 server.replstate = REDIS_REPL_NONE;
964 }
965
966 static void initServer() {
967 int j;
968
969 signal(SIGHUP, SIG_IGN);
970 signal(SIGPIPE, SIG_IGN);
971 setupSigSegvAction();
972
973 server.clients = listCreate();
974 server.slaves = listCreate();
975 server.monitors = listCreate();
976 server.objfreelist = listCreate();
977 createSharedObjects();
978 server.el = aeCreateEventLoop();
979 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
980 server.sharingpool = dictCreate(&setDictType,NULL);
981 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
982 oom("server initialization"); /* Fatal OOM */
983 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
984 if (server.fd == -1) {
985 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
986 exit(1);
987 }
988 for (j = 0; j < server.dbnum; j++) {
989 server.db[j].dict = dictCreate(&hashDictType,NULL);
990 server.db[j].expires = dictCreate(&setDictType,NULL);
991 server.db[j].id = j;
992 }
993 server.cronloops = 0;
994 server.bgsaveinprogress = 0;
995 server.bgsavechildpid = -1;
996 server.lastsave = time(NULL);
997 server.dirty = 0;
998 server.usedmemory = 0;
999 server.stat_numcommands = 0;
1000 server.stat_numconnections = 0;
1001 server.stat_starttime = time(NULL);
1002 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1003 }
1004
1005 /* Empty the whole database */
1006 static long long emptyDb() {
1007 int j;
1008 long long removed = 0;
1009
1010 for (j = 0; j < server.dbnum; j++) {
1011 removed += dictSize(server.db[j].dict);
1012 dictEmpty(server.db[j].dict);
1013 dictEmpty(server.db[j].expires);
1014 }
1015 return removed;
1016 }
1017
1018 static int yesnotoi(char *s) {
1019 if (!strcasecmp(s,"yes")) return 1;
1020 else if (!strcasecmp(s,"no")) return 0;
1021 else return -1;
1022 }
1023
1024 /* I agree, this is a very rudimental way to load a configuration...
1025 will improve later if the config gets more complex */
1026 static void loadServerConfig(char *filename) {
1027 FILE *fp;
1028 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1029 int linenum = 0;
1030 sds line = NULL;
1031
1032 if (filename[0] == '-' && filename[1] == '\0')
1033 fp = stdin;
1034 else {
1035 if ((fp = fopen(filename,"r")) == NULL) {
1036 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1037 exit(1);
1038 }
1039 }
1040
1041 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1042 sds *argv;
1043 int argc, j;
1044
1045 linenum++;
1046 line = sdsnew(buf);
1047 line = sdstrim(line," \t\r\n");
1048
1049 /* Skip comments and blank lines*/
1050 if (line[0] == '#' || line[0] == '\0') {
1051 sdsfree(line);
1052 continue;
1053 }
1054
1055 /* Split into arguments */
1056 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1057 sdstolower(argv[0]);
1058
1059 /* Execute config directives */
1060 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1061 server.maxidletime = atoi(argv[1]);
1062 if (server.maxidletime < 0) {
1063 err = "Invalid timeout value"; goto loaderr;
1064 }
1065 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1066 server.port = atoi(argv[1]);
1067 if (server.port < 1 || server.port > 65535) {
1068 err = "Invalid port"; goto loaderr;
1069 }
1070 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1071 server.bindaddr = zstrdup(argv[1]);
1072 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1073 int seconds = atoi(argv[1]);
1074 int changes = atoi(argv[2]);
1075 if (seconds < 1 || changes < 0) {
1076 err = "Invalid save parameters"; goto loaderr;
1077 }
1078 appendServerSaveParams(seconds,changes);
1079 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1080 if (chdir(argv[1]) == -1) {
1081 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1082 argv[1], strerror(errno));
1083 exit(1);
1084 }
1085 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1086 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1087 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1088 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1089 else {
1090 err = "Invalid log level. Must be one of debug, notice, warning";
1091 goto loaderr;
1092 }
1093 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1094 FILE *logfp;
1095
1096 server.logfile = zstrdup(argv[1]);
1097 if (!strcasecmp(server.logfile,"stdout")) {
1098 zfree(server.logfile);
1099 server.logfile = NULL;
1100 }
1101 if (server.logfile) {
1102 /* Test if we are able to open the file. The server will not
1103 * be able to abort just for this problem later... */
1104 logfp = fopen(server.logfile,"a");
1105 if (logfp == NULL) {
1106 err = sdscatprintf(sdsempty(),
1107 "Can't open the log file: %s", strerror(errno));
1108 goto loaderr;
1109 }
1110 fclose(logfp);
1111 }
1112 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1113 server.dbnum = atoi(argv[1]);
1114 if (server.dbnum < 1) {
1115 err = "Invalid number of databases"; goto loaderr;
1116 }
1117 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1118 server.maxclients = atoi(argv[1]);
1119 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1120 server.maxmemory = strtoll(argv[1], NULL, 10);
1121 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1122 server.masterhost = sdsnew(argv[1]);
1123 server.masterport = atoi(argv[2]);
1124 server.replstate = REDIS_REPL_CONNECT;
1125 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1126 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1127 err = "argument must be 'yes' or 'no'"; goto loaderr;
1128 }
1129 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1130 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1131 err = "argument must be 'yes' or 'no'"; goto loaderr;
1132 }
1133 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1134 server.sharingpoolsize = atoi(argv[1]);
1135 if (server.sharingpoolsize < 1) {
1136 err = "invalid object sharing pool size"; goto loaderr;
1137 }
1138 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1139 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1140 err = "argument must be 'yes' or 'no'"; goto loaderr;
1141 }
1142 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1143 server.requirepass = zstrdup(argv[1]);
1144 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1145 server.pidfile = zstrdup(argv[1]);
1146 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1147 server.dbfilename = zstrdup(argv[1]);
1148 } else {
1149 err = "Bad directive or wrong number of arguments"; goto loaderr;
1150 }
1151 for (j = 0; j < argc; j++)
1152 sdsfree(argv[j]);
1153 zfree(argv);
1154 sdsfree(line);
1155 }
1156 if (fp != stdin) fclose(fp);
1157 return;
1158
1159 loaderr:
1160 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1161 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1162 fprintf(stderr, ">>> '%s'\n", line);
1163 fprintf(stderr, "%s\n", err);
1164 exit(1);
1165 }
1166
1167 static void freeClientArgv(redisClient *c) {
1168 int j;
1169
1170 for (j = 0; j < c->argc; j++)
1171 decrRefCount(c->argv[j]);
1172 c->argc = 0;
1173 }
1174
1175 static void freeClient(redisClient *c) {
1176 listNode *ln;
1177
1178 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1179 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1180 sdsfree(c->querybuf);
1181 listRelease(c->reply);
1182 freeClientArgv(c);
1183 close(c->fd);
1184 ln = listSearchKey(server.clients,c);
1185 assert(ln != NULL);
1186 listDelNode(server.clients,ln);
1187 if (c->flags & REDIS_SLAVE) {
1188 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1189 close(c->repldbfd);
1190 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1191 ln = listSearchKey(l,c);
1192 assert(ln != NULL);
1193 listDelNode(l,ln);
1194 }
1195 if (c->flags & REDIS_MASTER) {
1196 server.master = NULL;
1197 server.replstate = REDIS_REPL_CONNECT;
1198 }
1199 zfree(c->argv);
1200 zfree(c);
1201 }
1202
1203 static void glueReplyBuffersIfNeeded(redisClient *c) {
1204 int totlen = 0;
1205 listNode *ln;
1206 robj *o;
1207
1208 listRewind(c->reply);
1209 while((ln = listYield(c->reply))) {
1210 o = ln->value;
1211 totlen += sdslen(o->ptr);
1212 /* This optimization makes more sense if we don't have to copy
1213 * too much data */
1214 if (totlen > 1024) return;
1215 }
1216 if (totlen > 0) {
1217 char buf[1024];
1218 int copylen = 0;
1219
1220 listRewind(c->reply);
1221 while((ln = listYield(c->reply))) {
1222 o = ln->value;
1223 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1224 copylen += sdslen(o->ptr);
1225 listDelNode(c->reply,ln);
1226 }
1227 /* Now the output buffer is empty, add the new single element */
1228 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1229 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1230 }
1231 }
1232
1233 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1234 redisClient *c = privdata;
1235 int nwritten = 0, totwritten = 0, objlen;
1236 robj *o;
1237 REDIS_NOTUSED(el);
1238 REDIS_NOTUSED(mask);
1239
1240 if (server.glueoutputbuf && listLength(c->reply) > 1)
1241 glueReplyBuffersIfNeeded(c);
1242 while(listLength(c->reply)) {
1243 o = listNodeValue(listFirst(c->reply));
1244 objlen = sdslen(o->ptr);
1245
1246 if (objlen == 0) {
1247 listDelNode(c->reply,listFirst(c->reply));
1248 continue;
1249 }
1250
1251 if (c->flags & REDIS_MASTER) {
1252 /* Don't reply to a master */
1253 nwritten = objlen - c->sentlen;
1254 } else {
1255 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1256 if (nwritten <= 0) break;
1257 }
1258 c->sentlen += nwritten;
1259 totwritten += nwritten;
1260 /* If we fully sent the object on head go to the next one */
1261 if (c->sentlen == objlen) {
1262 listDelNode(c->reply,listFirst(c->reply));
1263 c->sentlen = 0;
1264 }
1265 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1266 * bytes, in a single threaded server it's a good idea to server
1267 * other clients as well, even if a very large request comes from
1268 * super fast link that is always able to accept data (in real world
1269 * terms think to 'KEYS *' against the loopback interfae) */
1270 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1271 }
1272 if (nwritten == -1) {
1273 if (errno == EAGAIN) {
1274 nwritten = 0;
1275 } else {
1276 redisLog(REDIS_DEBUG,
1277 "Error writing to client: %s", strerror(errno));
1278 freeClient(c);
1279 return;
1280 }
1281 }
1282 if (totwritten > 0) c->lastinteraction = time(NULL);
1283 if (listLength(c->reply) == 0) {
1284 c->sentlen = 0;
1285 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1286 }
1287 }
1288
1289 static struct redisCommand *lookupCommand(char *name) {
1290 int j = 0;
1291 while(cmdTable[j].name != NULL) {
1292 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1293 j++;
1294 }
1295 return NULL;
1296 }
1297
1298 /* resetClient prepare the client to process the next command */
1299 static void resetClient(redisClient *c) {
1300 freeClientArgv(c);
1301 c->bulklen = -1;
1302 }
1303
1304 /* If this function gets called we already read a whole
1305 * command, argments are in the client argv/argc fields.
1306 * processCommand() execute the command or prepare the
1307 * server for a bulk read from the client.
1308 *
1309 * If 1 is returned the client is still alive and valid and
1310 * and other operations can be performed by the caller. Otherwise
1311 * if 0 is returned the client was destroied (i.e. after QUIT). */
1312 static int processCommand(redisClient *c) {
1313 struct redisCommand *cmd;
1314 long long dirty;
1315
1316 /* Free some memory if needed (maxmemory setting) */
1317 if (server.maxmemory) freeMemoryIfNeeded();
1318
1319 /* The QUIT command is handled as a special case. Normal command
1320 * procs are unable to close the client connection safely */
1321 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1322 freeClient(c);
1323 return 0;
1324 }
1325 cmd = lookupCommand(c->argv[0]->ptr);
1326 if (!cmd) {
1327 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1328 resetClient(c);
1329 return 1;
1330 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1331 (c->argc < -cmd->arity)) {
1332 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1333 resetClient(c);
1334 return 1;
1335 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1336 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1337 resetClient(c);
1338 return 1;
1339 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1340 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1341
1342 decrRefCount(c->argv[c->argc-1]);
1343 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1344 c->argc--;
1345 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1346 resetClient(c);
1347 return 1;
1348 }
1349 c->argc--;
1350 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1351 /* It is possible that the bulk read is already in the
1352 * buffer. Check this condition and handle it accordingly */
1353 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1354 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1355 c->argc++;
1356 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1357 } else {
1358 return 1;
1359 }
1360 }
1361 /* Let's try to share objects on the command arguments vector */
1362 if (server.shareobjects) {
1363 int j;
1364 for(j = 1; j < c->argc; j++)
1365 c->argv[j] = tryObjectSharing(c->argv[j]);
1366 }
1367 /* Let's try to encode the bulk object to save space. */
1368 if (cmd->flags & REDIS_CMD_BULK)
1369 tryObjectEncoding(c->argv[c->argc-1]);
1370
1371 /* Check if the user is authenticated */
1372 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1373 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1374 resetClient(c);
1375 return 1;
1376 }
1377
1378 /* Exec the command */
1379 dirty = server.dirty;
1380 cmd->proc(c);
1381 if (server.dirty-dirty != 0 && listLength(server.slaves))
1382 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1383 if (listLength(server.monitors))
1384 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1385 server.stat_numcommands++;
1386
1387 /* Prepare the client for the next command */
1388 if (c->flags & REDIS_CLOSE) {
1389 freeClient(c);
1390 return 0;
1391 }
1392 resetClient(c);
1393 return 1;
1394 }
1395
1396 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1397 listNode *ln;
1398 int outc = 0, j;
1399 robj **outv;
1400 /* (args*2)+1 is enough room for args, spaces, newlines */
1401 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1402
1403 if (argc <= REDIS_STATIC_ARGS) {
1404 outv = static_outv;
1405 } else {
1406 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1407 if (!outv) oom("replicationFeedSlaves");
1408 }
1409
1410 for (j = 0; j < argc; j++) {
1411 if (j != 0) outv[outc++] = shared.space;
1412 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1413 robj *lenobj;
1414
1415 lenobj = createObject(REDIS_STRING,
1416 sdscatprintf(sdsempty(),"%d\r\n",
1417 stringObjectLen(argv[j])));
1418 lenobj->refcount = 0;
1419 outv[outc++] = lenobj;
1420 }
1421 outv[outc++] = argv[j];
1422 }
1423 outv[outc++] = shared.crlf;
1424
1425 /* Increment all the refcounts at start and decrement at end in order to
1426 * be sure to free objects if there is no slave in a replication state
1427 * able to be feed with commands */
1428 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1429 listRewind(slaves);
1430 while((ln = listYield(slaves))) {
1431 redisClient *slave = ln->value;
1432
1433 /* Don't feed slaves that are still waiting for BGSAVE to start */
1434 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1435
1436 /* Feed all the other slaves, MONITORs and so on */
1437 if (slave->slaveseldb != dictid) {
1438 robj *selectcmd;
1439
1440 switch(dictid) {
1441 case 0: selectcmd = shared.select0; break;
1442 case 1: selectcmd = shared.select1; break;
1443 case 2: selectcmd = shared.select2; break;
1444 case 3: selectcmd = shared.select3; break;
1445 case 4: selectcmd = shared.select4; break;
1446 case 5: selectcmd = shared.select5; break;
1447 case 6: selectcmd = shared.select6; break;
1448 case 7: selectcmd = shared.select7; break;
1449 case 8: selectcmd = shared.select8; break;
1450 case 9: selectcmd = shared.select9; break;
1451 default:
1452 selectcmd = createObject(REDIS_STRING,
1453 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1454 selectcmd->refcount = 0;
1455 break;
1456 }
1457 addReply(slave,selectcmd);
1458 slave->slaveseldb = dictid;
1459 }
1460 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1461 }
1462 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1463 if (outv != static_outv) zfree(outv);
1464 }
1465
1466 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1467 redisClient *c = (redisClient*) privdata;
1468 char buf[REDIS_IOBUF_LEN];
1469 int nread;
1470 REDIS_NOTUSED(el);
1471 REDIS_NOTUSED(mask);
1472
1473 nread = read(fd, buf, REDIS_IOBUF_LEN);
1474 if (nread == -1) {
1475 if (errno == EAGAIN) {
1476 nread = 0;
1477 } else {
1478 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1479 freeClient(c);
1480 return;
1481 }
1482 } else if (nread == 0) {
1483 redisLog(REDIS_DEBUG, "Client closed connection");
1484 freeClient(c);
1485 return;
1486 }
1487 if (nread) {
1488 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1489 c->lastinteraction = time(NULL);
1490 } else {
1491 return;
1492 }
1493
1494 again:
1495 if (c->bulklen == -1) {
1496 /* Read the first line of the query */
1497 char *p = strchr(c->querybuf,'\n');
1498 size_t querylen;
1499
1500 if (p) {
1501 sds query, *argv;
1502 int argc, j;
1503
1504 query = c->querybuf;
1505 c->querybuf = sdsempty();
1506 querylen = 1+(p-(query));
1507 if (sdslen(query) > querylen) {
1508 /* leave data after the first line of the query in the buffer */
1509 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1510 }
1511 *p = '\0'; /* remove "\n" */
1512 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1513 sdsupdatelen(query);
1514
1515 /* Now we can split the query in arguments */
1516 if (sdslen(query) == 0) {
1517 /* Ignore empty query */
1518 sdsfree(query);
1519 return;
1520 }
1521 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1522 if (argv == NULL) oom("sdssplitlen");
1523 sdsfree(query);
1524
1525 if (c->argv) zfree(c->argv);
1526 c->argv = zmalloc(sizeof(robj*)*argc);
1527 if (c->argv == NULL) oom("allocating arguments list for client");
1528
1529 for (j = 0; j < argc; j++) {
1530 if (sdslen(argv[j])) {
1531 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1532 c->argc++;
1533 } else {
1534 sdsfree(argv[j]);
1535 }
1536 }
1537 zfree(argv);
1538 /* Execute the command. If the client is still valid
1539 * after processCommand() return and there is something
1540 * on the query buffer try to process the next command. */
1541 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1542 return;
1543 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1544 redisLog(REDIS_DEBUG, "Client protocol error");
1545 freeClient(c);
1546 return;
1547 }
1548 } else {
1549 /* Bulk read handling. Note that if we are at this point
1550 the client already sent a command terminated with a newline,
1551 we are reading the bulk data that is actually the last
1552 argument of the command. */
1553 int qbl = sdslen(c->querybuf);
1554
1555 if (c->bulklen <= qbl) {
1556 /* Copy everything but the final CRLF as final argument */
1557 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1558 c->argc++;
1559 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1560 processCommand(c);
1561 return;
1562 }
1563 }
1564 }
1565
1566 static int selectDb(redisClient *c, int id) {
1567 if (id < 0 || id >= server.dbnum)
1568 return REDIS_ERR;
1569 c->db = &server.db[id];
1570 return REDIS_OK;
1571 }
1572
1573 static void *dupClientReplyValue(void *o) {
1574 incrRefCount((robj*)o);
1575 return 0;
1576 }
1577
1578 static redisClient *createClient(int fd) {
1579 redisClient *c = zmalloc(sizeof(*c));
1580
1581 anetNonBlock(NULL,fd);
1582 anetTcpNoDelay(NULL,fd);
1583 if (!c) return NULL;
1584 selectDb(c,0);
1585 c->fd = fd;
1586 c->querybuf = sdsempty();
1587 c->argc = 0;
1588 c->argv = NULL;
1589 c->bulklen = -1;
1590 c->sentlen = 0;
1591 c->flags = 0;
1592 c->lastinteraction = time(NULL);
1593 c->authenticated = 0;
1594 c->replstate = REDIS_REPL_NONE;
1595 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1596 listSetFreeMethod(c->reply,decrRefCount);
1597 listSetDupMethod(c->reply,dupClientReplyValue);
1598 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1599 readQueryFromClient, c, NULL) == AE_ERR) {
1600 freeClient(c);
1601 return NULL;
1602 }
1603 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1604 return c;
1605 }
1606
1607 static void addReply(redisClient *c, robj *obj) {
1608 if (listLength(c->reply) == 0 &&
1609 (c->replstate == REDIS_REPL_NONE ||
1610 c->replstate == REDIS_REPL_ONLINE) &&
1611 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1612 sendReplyToClient, c, NULL) == AE_ERR) return;
1613 if (obj->encoding != REDIS_ENCODING_RAW) {
1614 obj = getDecodedObject(obj);
1615 } else {
1616 incrRefCount(obj);
1617 }
1618 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1619 }
1620
1621 static void addReplySds(redisClient *c, sds s) {
1622 robj *o = createObject(REDIS_STRING,s);
1623 addReply(c,o);
1624 decrRefCount(o);
1625 }
1626
1627 static void addReplyBulkLen(redisClient *c, robj *obj) {
1628 size_t len;
1629
1630 if (obj->encoding == REDIS_ENCODING_RAW) {
1631 len = sdslen(obj->ptr);
1632 } else {
1633 long n = (long)obj->ptr;
1634
1635 len = 1;
1636 if (n < 0) {
1637 len++;
1638 n = -n;
1639 }
1640 while((n = n/10) != 0) {
1641 len++;
1642 }
1643 }
1644 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1645 }
1646
1647 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1648 int cport, cfd;
1649 char cip[128];
1650 redisClient *c;
1651 REDIS_NOTUSED(el);
1652 REDIS_NOTUSED(mask);
1653 REDIS_NOTUSED(privdata);
1654
1655 cfd = anetAccept(server.neterr, fd, cip, &cport);
1656 if (cfd == AE_ERR) {
1657 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1658 return;
1659 }
1660 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1661 if ((c = createClient(cfd)) == NULL) {
1662 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1663 close(cfd); /* May be already closed, just ingore errors */
1664 return;
1665 }
1666 /* If maxclient directive is set and this is one client more... close the
1667 * connection. Note that we create the client instead to check before
1668 * for this condition, since now the socket is already set in nonblocking
1669 * mode and we can send an error for free using the Kernel I/O */
1670 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1671 char *err = "-ERR max number of clients reached\r\n";
1672
1673 /* That's a best effort error message, don't check write errors */
1674 (void) write(c->fd,err,strlen(err));
1675 freeClient(c);
1676 return;
1677 }
1678 server.stat_numconnections++;
1679 }
1680
1681 /* ======================= Redis objects implementation ===================== */
1682
1683 static robj *createObject(int type, void *ptr) {
1684 robj *o;
1685
1686 if (listLength(server.objfreelist)) {
1687 listNode *head = listFirst(server.objfreelist);
1688 o = listNodeValue(head);
1689 listDelNode(server.objfreelist,head);
1690 } else {
1691 o = zmalloc(sizeof(*o));
1692 }
1693 if (!o) oom("createObject");
1694 o->type = type;
1695 o->encoding = REDIS_ENCODING_RAW;
1696 o->ptr = ptr;
1697 o->refcount = 1;
1698 return o;
1699 }
1700
1701 static robj *createStringObject(char *ptr, size_t len) {
1702 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1703 }
1704
1705 static robj *createListObject(void) {
1706 list *l = listCreate();
1707
1708 if (!l) oom("listCreate");
1709 listSetFreeMethod(l,decrRefCount);
1710 return createObject(REDIS_LIST,l);
1711 }
1712
1713 static robj *createSetObject(void) {
1714 dict *d = dictCreate(&setDictType,NULL);
1715 if (!d) oom("dictCreate");
1716 return createObject(REDIS_SET,d);
1717 }
1718
1719 static void freeStringObject(robj *o) {
1720 if (o->encoding == REDIS_ENCODING_RAW) {
1721 sdsfree(o->ptr);
1722 }
1723 }
1724
1725 static void freeListObject(robj *o) {
1726 listRelease((list*) o->ptr);
1727 }
1728
1729 static void freeSetObject(robj *o) {
1730 dictRelease((dict*) o->ptr);
1731 }
1732
1733 static void freeHashObject(robj *o) {
1734 dictRelease((dict*) o->ptr);
1735 }
1736
1737 static void incrRefCount(robj *o) {
1738 o->refcount++;
1739 #ifdef DEBUG_REFCOUNT
1740 if (o->type == REDIS_STRING)
1741 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1742 #endif
1743 }
1744
1745 static void decrRefCount(void *obj) {
1746 robj *o = obj;
1747
1748 #ifdef DEBUG_REFCOUNT
1749 if (o->type == REDIS_STRING)
1750 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1751 #endif
1752 if (--(o->refcount) == 0) {
1753 switch(o->type) {
1754 case REDIS_STRING: freeStringObject(o); break;
1755 case REDIS_LIST: freeListObject(o); break;
1756 case REDIS_SET: freeSetObject(o); break;
1757 case REDIS_HASH: freeHashObject(o); break;
1758 default: assert(0 != 0); break;
1759 }
1760 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1761 !listAddNodeHead(server.objfreelist,o))
1762 zfree(o);
1763 }
1764 }
1765
1766 static robj *lookupKey(redisDb *db, robj *key) {
1767 dictEntry *de = dictFind(db->dict,key);
1768 return de ? dictGetEntryVal(de) : NULL;
1769 }
1770
1771 static robj *lookupKeyRead(redisDb *db, robj *key) {
1772 expireIfNeeded(db,key);
1773 return lookupKey(db,key);
1774 }
1775
1776 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1777 deleteIfVolatile(db,key);
1778 return lookupKey(db,key);
1779 }
1780
1781 static int deleteKey(redisDb *db, robj *key) {
1782 int retval;
1783
1784 /* We need to protect key from destruction: after the first dictDelete()
1785 * it may happen that 'key' is no longer valid if we don't increment
1786 * it's count. This may happen when we get the object reference directly
1787 * from the hash table with dictRandomKey() or dict iterators */
1788 incrRefCount(key);
1789 if (dictSize(db->expires)) dictDelete(db->expires,key);
1790 retval = dictDelete(db->dict,key);
1791 decrRefCount(key);
1792
1793 return retval == DICT_OK;
1794 }
1795
1796 /* Try to share an object against the shared objects pool */
1797 static robj *tryObjectSharing(robj *o) {
1798 struct dictEntry *de;
1799 unsigned long c;
1800
1801 if (o == NULL || server.shareobjects == 0) return o;
1802
1803 assert(o->type == REDIS_STRING);
1804 de = dictFind(server.sharingpool,o);
1805 if (de) {
1806 robj *shared = dictGetEntryKey(de);
1807
1808 c = ((unsigned long) dictGetEntryVal(de))+1;
1809 dictGetEntryVal(de) = (void*) c;
1810 incrRefCount(shared);
1811 decrRefCount(o);
1812 return shared;
1813 } else {
1814 /* Here we are using a stream algorihtm: Every time an object is
1815 * shared we increment its count, everytime there is a miss we
1816 * recrement the counter of a random object. If this object reaches
1817 * zero we remove the object and put the current object instead. */
1818 if (dictSize(server.sharingpool) >=
1819 server.sharingpoolsize) {
1820 de = dictGetRandomKey(server.sharingpool);
1821 assert(de != NULL);
1822 c = ((unsigned long) dictGetEntryVal(de))-1;
1823 dictGetEntryVal(de) = (void*) c;
1824 if (c == 0) {
1825 dictDelete(server.sharingpool,de->key);
1826 }
1827 } else {
1828 c = 0; /* If the pool is empty we want to add this object */
1829 }
1830 if (c == 0) {
1831 int retval;
1832
1833 retval = dictAdd(server.sharingpool,o,(void*)1);
1834 assert(retval == DICT_OK);
1835 incrRefCount(o);
1836 }
1837 return o;
1838 }
1839 }
1840
1841 /* Check if the nul-terminated string 's' can be represented by a long
1842 * (that is, is a number that fits into long without any other space or
1843 * character before or after the digits).
1844 *
1845 * If so, the function returns REDIS_OK and *longval is set to the value
1846 * of the number. Otherwise REDIS_ERR is returned */
1847 static int isStringRepresentableAsLong(char *s, long *longval) {
1848 char buf[32], *endptr;
1849 long value;
1850 int slen;
1851
1852 value = strtol(s, &endptr, 10);
1853 if (endptr[0] != '\0') return REDIS_ERR;
1854 slen = snprintf(buf,32,"%ld",value);
1855
1856 /* If the number converted back into a string is not identical
1857 * then it's not possible to encode the string as integer */
1858 if (strlen(buf) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
1859 if (longval) *longval = value;
1860 return REDIS_OK;
1861 }
1862
1863 /* Try to encode a string object in order to save space */
1864 static int tryObjectEncoding(robj *o) {
1865 long value;
1866 sds s = o->ptr;
1867
1868 if (o->encoding != REDIS_ENCODING_RAW)
1869 return REDIS_ERR; /* Already encoded */
1870
1871 /* It's not save to encode shared objects: shared objects can be shared
1872 * everywhere in the "object space" of Redis. Encoded objects can only
1873 * appear as "values" (and not, for instance, as keys) */
1874 if (o->refcount > 1) return REDIS_ERR;
1875
1876 /* Currently we try to encode only strings */
1877 assert(o->type == REDIS_STRING);
1878
1879 /* Check if we can represent this string as a long integer */
1880 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
1881
1882 /* Ok, this object can be encoded */
1883 o->encoding = REDIS_ENCODING_INT;
1884 sdsfree(o->ptr);
1885 o->ptr = (void*) value;
1886 return REDIS_OK;
1887 }
1888
1889 /* Get a decoded version of an encoded object (returned as a new object) */
1890 static robj *getDecodedObject(const robj *o) {
1891 robj *dec;
1892
1893 assert(o->encoding != REDIS_ENCODING_RAW);
1894 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
1895 char buf[32];
1896
1897 snprintf(buf,32,"%ld",(long)o->ptr);
1898 dec = createStringObject(buf,strlen(buf));
1899 return dec;
1900 } else {
1901 assert(1 != 1);
1902 }
1903 }
1904
1905 static int compareStringObjects(robj *a, robj *b) {
1906 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
1907
1908 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
1909 return (long)a->ptr - (long)b->ptr;
1910 } else {
1911 int retval;
1912
1913 incrRefCount(a);
1914 incrRefCount(b);
1915 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
1916 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
1917 retval = sdscmp(a->ptr,b->ptr);
1918 decrRefCount(a);
1919 decrRefCount(b);
1920 return retval;
1921 }
1922 }
1923
1924 static size_t stringObjectLen(robj *o) {
1925 assert(o->type == REDIS_STRING);
1926 if (o->encoding == REDIS_ENCODING_RAW) {
1927 return sdslen(o->ptr);
1928 } else {
1929 char buf[32];
1930
1931 return snprintf(buf,32,"%ld",(long)o->ptr);
1932 }
1933 }
1934
1935 /*============================ DB saving/loading ============================ */
1936
1937 static int rdbSaveType(FILE *fp, unsigned char type) {
1938 if (fwrite(&type,1,1,fp) == 0) return -1;
1939 return 0;
1940 }
1941
1942 static int rdbSaveTime(FILE *fp, time_t t) {
1943 int32_t t32 = (int32_t) t;
1944 if (fwrite(&t32,4,1,fp) == 0) return -1;
1945 return 0;
1946 }
1947
1948 /* check rdbLoadLen() comments for more info */
1949 static int rdbSaveLen(FILE *fp, uint32_t len) {
1950 unsigned char buf[2];
1951
1952 if (len < (1<<6)) {
1953 /* Save a 6 bit len */
1954 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1955 if (fwrite(buf,1,1,fp) == 0) return -1;
1956 } else if (len < (1<<14)) {
1957 /* Save a 14 bit len */
1958 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1959 buf[1] = len&0xFF;
1960 if (fwrite(buf,2,1,fp) == 0) return -1;
1961 } else {
1962 /* Save a 32 bit len */
1963 buf[0] = (REDIS_RDB_32BITLEN<<6);
1964 if (fwrite(buf,1,1,fp) == 0) return -1;
1965 len = htonl(len);
1966 if (fwrite(&len,4,1,fp) == 0) return -1;
1967 }
1968 return 0;
1969 }
1970
1971 /* String objects in the form "2391" "-100" without any space and with a
1972 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1973 * encoded as integers to save space */
1974 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1975 long long value;
1976 char *endptr, buf[32];
1977
1978 /* Check if it's possible to encode this value as a number */
1979 value = strtoll(s, &endptr, 10);
1980 if (endptr[0] != '\0') return 0;
1981 snprintf(buf,32,"%lld",value);
1982
1983 /* If the number converted back into a string is not identical
1984 * then it's not possible to encode the string as integer */
1985 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1986
1987 /* Finally check if it fits in our ranges */
1988 if (value >= -(1<<7) && value <= (1<<7)-1) {
1989 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1990 enc[1] = value&0xFF;
1991 return 2;
1992 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1993 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1994 enc[1] = value&0xFF;
1995 enc[2] = (value>>8)&0xFF;
1996 return 3;
1997 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1998 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1999 enc[1] = value&0xFF;
2000 enc[2] = (value>>8)&0xFF;
2001 enc[3] = (value>>16)&0xFF;
2002 enc[4] = (value>>24)&0xFF;
2003 return 5;
2004 } else {
2005 return 0;
2006 }
2007 }
2008
2009 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2010 unsigned int comprlen, outlen;
2011 unsigned char byte;
2012 void *out;
2013
2014 /* We require at least four bytes compression for this to be worth it */
2015 outlen = sdslen(obj->ptr)-4;
2016 if (outlen <= 0) return 0;
2017 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2018 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2019 if (comprlen == 0) {
2020 zfree(out);
2021 return 0;
2022 }
2023 /* Data compressed! Let's save it on disk */
2024 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2025 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2026 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2027 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2028 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2029 zfree(out);
2030 return comprlen;
2031
2032 writeerr:
2033 zfree(out);
2034 return -1;
2035 }
2036
2037 /* Save a string objet as [len][data] on disk. If the object is a string
2038 * representation of an integer value we try to safe it in a special form */
2039 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2040 size_t len;
2041 int enclen;
2042
2043 len = sdslen(obj->ptr);
2044
2045 /* Try integer encoding */
2046 if (len <= 11) {
2047 unsigned char buf[5];
2048 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2049 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2050 return 0;
2051 }
2052 }
2053
2054 /* Try LZF compression - under 20 bytes it's unable to compress even
2055 * aaaaaaaaaaaaaaaaaa so skip it */
2056 if (len > 20) {
2057 int retval;
2058
2059 retval = rdbSaveLzfStringObject(fp,obj);
2060 if (retval == -1) return -1;
2061 if (retval > 0) return 0;
2062 /* retval == 0 means data can't be compressed, save the old way */
2063 }
2064
2065 /* Store verbatim */
2066 if (rdbSaveLen(fp,len) == -1) return -1;
2067 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2068 return 0;
2069 }
2070
2071 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2072 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2073 int retval;
2074 robj *dec;
2075
2076 if (obj->encoding != REDIS_ENCODING_RAW) {
2077 dec = getDecodedObject(obj);
2078 retval = rdbSaveStringObjectRaw(fp,dec);
2079 decrRefCount(dec);
2080 return retval;
2081 } else {
2082 return rdbSaveStringObjectRaw(fp,obj);
2083 }
2084 }
2085
2086 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2087 static int rdbSave(char *filename) {
2088 dictIterator *di = NULL;
2089 dictEntry *de;
2090 FILE *fp;
2091 char tmpfile[256];
2092 int j;
2093 time_t now = time(NULL);
2094
2095 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2096 fp = fopen(tmpfile,"w");
2097 if (!fp) {
2098 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2099 return REDIS_ERR;
2100 }
2101 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2102 for (j = 0; j < server.dbnum; j++) {
2103 redisDb *db = server.db+j;
2104 dict *d = db->dict;
2105 if (dictSize(d) == 0) continue;
2106 di = dictGetIterator(d);
2107 if (!di) {
2108 fclose(fp);
2109 return REDIS_ERR;
2110 }
2111
2112 /* Write the SELECT DB opcode */
2113 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2114 if (rdbSaveLen(fp,j) == -1) goto werr;
2115
2116 /* Iterate this DB writing every entry */
2117 while((de = dictNext(di)) != NULL) {
2118 robj *key = dictGetEntryKey(de);
2119 robj *o = dictGetEntryVal(de);
2120 time_t expiretime = getExpire(db,key);
2121
2122 /* Save the expire time */
2123 if (expiretime != -1) {
2124 /* If this key is already expired skip it */
2125 if (expiretime < now) continue;
2126 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2127 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2128 }
2129 /* Save the key and associated value */
2130 if (rdbSaveType(fp,o->type) == -1) goto werr;
2131 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2132 if (o->type == REDIS_STRING) {
2133 /* Save a string value */
2134 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2135 } else if (o->type == REDIS_LIST) {
2136 /* Save a list value */
2137 list *list = o->ptr;
2138 listNode *ln;
2139
2140 listRewind(list);
2141 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2142 while((ln = listYield(list))) {
2143 robj *eleobj = listNodeValue(ln);
2144
2145 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2146 }
2147 } else if (o->type == REDIS_SET) {
2148 /* Save a set value */
2149 dict *set = o->ptr;
2150 dictIterator *di = dictGetIterator(set);
2151 dictEntry *de;
2152
2153 if (!set) oom("dictGetIteraotr");
2154 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2155 while((de = dictNext(di)) != NULL) {
2156 robj *eleobj = dictGetEntryKey(de);
2157
2158 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2159 }
2160 dictReleaseIterator(di);
2161 } else {
2162 assert(0 != 0);
2163 }
2164 }
2165 dictReleaseIterator(di);
2166 }
2167 /* EOF opcode */
2168 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2169
2170 /* Make sure data will not remain on the OS's output buffers */
2171 fflush(fp);
2172 fsync(fileno(fp));
2173 fclose(fp);
2174
2175 /* Use RENAME to make sure the DB file is changed atomically only
2176 * if the generate DB file is ok. */
2177 if (rename(tmpfile,filename) == -1) {
2178 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2179 unlink(tmpfile);
2180 return REDIS_ERR;
2181 }
2182 redisLog(REDIS_NOTICE,"DB saved on disk");
2183 server.dirty = 0;
2184 server.lastsave = time(NULL);
2185 return REDIS_OK;
2186
2187 werr:
2188 fclose(fp);
2189 unlink(tmpfile);
2190 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2191 if (di) dictReleaseIterator(di);
2192 return REDIS_ERR;
2193 }
2194
2195 static int rdbSaveBackground(char *filename) {
2196 pid_t childpid;
2197
2198 if (server.bgsaveinprogress) return REDIS_ERR;
2199 if ((childpid = fork()) == 0) {
2200 /* Child */
2201 close(server.fd);
2202 if (rdbSave(filename) == REDIS_OK) {
2203 exit(0);
2204 } else {
2205 exit(1);
2206 }
2207 } else {
2208 /* Parent */
2209 if (childpid == -1) {
2210 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2211 strerror(errno));
2212 return REDIS_ERR;
2213 }
2214 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2215 server.bgsaveinprogress = 1;
2216 server.bgsavechildpid = childpid;
2217 return REDIS_OK;
2218 }
2219 return REDIS_OK; /* unreached */
2220 }
2221
2222 static void rdbRemoveTempFile(pid_t childpid) {
2223 char tmpfile[256];
2224
2225 snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
2226 unlink(tmpfile);
2227 }
2228
2229 static int rdbLoadType(FILE *fp) {
2230 unsigned char type;
2231 if (fread(&type,1,1,fp) == 0) return -1;
2232 return type;
2233 }
2234
2235 static time_t rdbLoadTime(FILE *fp) {
2236 int32_t t32;
2237 if (fread(&t32,4,1,fp) == 0) return -1;
2238 return (time_t) t32;
2239 }
2240
2241 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2242 * of this file for a description of how this are stored on disk.
2243 *
2244 * isencoded is set to 1 if the readed length is not actually a length but
2245 * an "encoding type", check the above comments for more info */
2246 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2247 unsigned char buf[2];
2248 uint32_t len;
2249
2250 if (isencoded) *isencoded = 0;
2251 if (rdbver == 0) {
2252 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2253 return ntohl(len);
2254 } else {
2255 int type;
2256
2257 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2258 type = (buf[0]&0xC0)>>6;
2259 if (type == REDIS_RDB_6BITLEN) {
2260 /* Read a 6 bit len */
2261 return buf[0]&0x3F;
2262 } else if (type == REDIS_RDB_ENCVAL) {
2263 /* Read a 6 bit len encoding type */
2264 if (isencoded) *isencoded = 1;
2265 return buf[0]&0x3F;
2266 } else if (type == REDIS_RDB_14BITLEN) {
2267 /* Read a 14 bit len */
2268 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2269 return ((buf[0]&0x3F)<<8)|buf[1];
2270 } else {
2271 /* Read a 32 bit len */
2272 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2273 return ntohl(len);
2274 }
2275 }
2276 }
2277
2278 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2279 unsigned char enc[4];
2280 long long val;
2281
2282 if (enctype == REDIS_RDB_ENC_INT8) {
2283 if (fread(enc,1,1,fp) == 0) return NULL;
2284 val = (signed char)enc[0];
2285 } else if (enctype == REDIS_RDB_ENC_INT16) {
2286 uint16_t v;
2287 if (fread(enc,2,1,fp) == 0) return NULL;
2288 v = enc[0]|(enc[1]<<8);
2289 val = (int16_t)v;
2290 } else if (enctype == REDIS_RDB_ENC_INT32) {
2291 uint32_t v;
2292 if (fread(enc,4,1,fp) == 0) return NULL;
2293 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2294 val = (int32_t)v;
2295 } else {
2296 val = 0; /* anti-warning */
2297 assert(0!=0);
2298 }
2299 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2300 }
2301
2302 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2303 unsigned int len, clen;
2304 unsigned char *c = NULL;
2305 sds val = NULL;
2306
2307 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2308 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2309 if ((c = zmalloc(clen)) == NULL) goto err;
2310 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2311 if (fread(c,clen,1,fp) == 0) goto err;
2312 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2313 zfree(c);
2314 return createObject(REDIS_STRING,val);
2315 err:
2316 zfree(c);
2317 sdsfree(val);
2318 return NULL;
2319 }
2320
2321 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2322 int isencoded;
2323 uint32_t len;
2324 sds val;
2325
2326 len = rdbLoadLen(fp,rdbver,&isencoded);
2327 if (isencoded) {
2328 switch(len) {
2329 case REDIS_RDB_ENC_INT8:
2330 case REDIS_RDB_ENC_INT16:
2331 case REDIS_RDB_ENC_INT32:
2332 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2333 case REDIS_RDB_ENC_LZF:
2334 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2335 default:
2336 assert(0!=0);
2337 }
2338 }
2339
2340 if (len == REDIS_RDB_LENERR) return NULL;
2341 val = sdsnewlen(NULL,len);
2342 if (len && fread(val,len,1,fp) == 0) {
2343 sdsfree(val);
2344 return NULL;
2345 }
2346 return tryObjectSharing(createObject(REDIS_STRING,val));
2347 }
2348
2349 static int rdbLoad(char *filename) {
2350 FILE *fp;
2351 robj *keyobj = NULL;
2352 uint32_t dbid;
2353 int type, retval, rdbver;
2354 dict *d = server.db[0].dict;
2355 redisDb *db = server.db+0;
2356 char buf[1024];
2357 time_t expiretime = -1, now = time(NULL);
2358
2359 fp = fopen(filename,"r");
2360 if (!fp) return REDIS_ERR;
2361 if (fread(buf,9,1,fp) == 0) goto eoferr;
2362 buf[9] = '\0';
2363 if (memcmp(buf,"REDIS",5) != 0) {
2364 fclose(fp);
2365 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2366 return REDIS_ERR;
2367 }
2368 rdbver = atoi(buf+5);
2369 if (rdbver > 1) {
2370 fclose(fp);
2371 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2372 return REDIS_ERR;
2373 }
2374 while(1) {
2375 robj *o;
2376
2377 /* Read type. */
2378 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2379 if (type == REDIS_EXPIRETIME) {
2380 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2381 /* We read the time so we need to read the object type again */
2382 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2383 }
2384 if (type == REDIS_EOF) break;
2385 /* Handle SELECT DB opcode as a special case */
2386 if (type == REDIS_SELECTDB) {
2387 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2388 goto eoferr;
2389 if (dbid >= (unsigned)server.dbnum) {
2390 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2391 exit(1);
2392 }
2393 db = server.db+dbid;
2394 d = db->dict;
2395 continue;
2396 }
2397 /* Read key */
2398 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2399
2400 if (type == REDIS_STRING) {
2401 /* Read string value */
2402 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2403 tryObjectEncoding(o);
2404 } else if (type == REDIS_LIST || type == REDIS_SET) {
2405 /* Read list/set value */
2406 uint32_t listlen;
2407
2408 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2409 goto eoferr;
2410 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2411 /* Load every single element of the list/set */
2412 while(listlen--) {
2413 robj *ele;
2414
2415 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2416 tryObjectEncoding(ele);
2417 if (type == REDIS_LIST) {
2418 if (!listAddNodeTail((list*)o->ptr,ele))
2419 oom("listAddNodeTail");
2420 } else {
2421 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2422 oom("dictAdd");
2423 }
2424 }
2425 } else {
2426 assert(0 != 0);
2427 }
2428 /* Add the new object in the hash table */
2429 retval = dictAdd(d,keyobj,o);
2430 if (retval == DICT_ERR) {
2431 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2432 exit(1);
2433 }
2434 /* Set the expire time if needed */
2435 if (expiretime != -1) {
2436 setExpire(db,keyobj,expiretime);
2437 /* Delete this key if already expired */
2438 if (expiretime < now) deleteKey(db,keyobj);
2439 expiretime = -1;
2440 }
2441 keyobj = o = NULL;
2442 }
2443 fclose(fp);
2444 return REDIS_OK;
2445
2446 eoferr: /* unexpected end of file is handled here with a fatal exit */
2447 if (keyobj) decrRefCount(keyobj);
2448 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2449 exit(1);
2450 return REDIS_ERR; /* Just to avoid warning */
2451 }
2452
2453 /*================================== Commands =============================== */
2454
2455 static void authCommand(redisClient *c) {
2456 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2457 c->authenticated = 1;
2458 addReply(c,shared.ok);
2459 } else {
2460 c->authenticated = 0;
2461 addReply(c,shared.err);
2462 }
2463 }
2464
2465 static void pingCommand(redisClient *c) {
2466 addReply(c,shared.pong);
2467 }
2468
2469 static void echoCommand(redisClient *c) {
2470 addReplyBulkLen(c,c->argv[1]);
2471 addReply(c,c->argv[1]);
2472 addReply(c,shared.crlf);
2473 }
2474
2475 /*=================================== Strings =============================== */
2476
2477 static void setGenericCommand(redisClient *c, int nx) {
2478 int retval;
2479
2480 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2481 if (retval == DICT_ERR) {
2482 if (!nx) {
2483 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2484 incrRefCount(c->argv[2]);
2485 } else {
2486 addReply(c,shared.czero);
2487 return;
2488 }
2489 } else {
2490 incrRefCount(c->argv[1]);
2491 incrRefCount(c->argv[2]);
2492 }
2493 server.dirty++;
2494 removeExpire(c->db,c->argv[1]);
2495 addReply(c, nx ? shared.cone : shared.ok);
2496 }
2497
2498 static void setCommand(redisClient *c) {
2499 setGenericCommand(c,0);
2500 }
2501
2502 static void setnxCommand(redisClient *c) {
2503 setGenericCommand(c,1);
2504 }
2505
2506 static void getCommand(redisClient *c) {
2507 robj *o = lookupKeyRead(c->db,c->argv[1]);
2508
2509 if (o == NULL) {
2510 addReply(c,shared.nullbulk);
2511 } else {
2512 if (o->type != REDIS_STRING) {
2513 addReply(c,shared.wrongtypeerr);
2514 } else {
2515 addReplyBulkLen(c,o);
2516 addReply(c,o);
2517 addReply(c,shared.crlf);
2518 }
2519 }
2520 }
2521
2522 static void getSetCommand(redisClient *c) {
2523 getCommand(c);
2524 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2525 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2526 } else {
2527 incrRefCount(c->argv[1]);
2528 }
2529 incrRefCount(c->argv[2]);
2530 server.dirty++;
2531 removeExpire(c->db,c->argv[1]);
2532 }
2533
2534 static void mgetCommand(redisClient *c) {
2535 int j;
2536
2537 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2538 for (j = 1; j < c->argc; j++) {
2539 robj *o = lookupKeyRead(c->db,c->argv[j]);
2540 if (o == NULL) {
2541 addReply(c,shared.nullbulk);
2542 } else {
2543 if (o->type != REDIS_STRING) {
2544 addReply(c,shared.nullbulk);
2545 } else {
2546 addReplyBulkLen(c,o);
2547 addReply(c,o);
2548 addReply(c,shared.crlf);
2549 }
2550 }
2551 }
2552 }
2553
2554 static void incrDecrCommand(redisClient *c, long long incr) {
2555 long long value;
2556 int retval;
2557 robj *o;
2558
2559 o = lookupKeyWrite(c->db,c->argv[1]);
2560 if (o == NULL) {
2561 value = 0;
2562 } else {
2563 if (o->type != REDIS_STRING) {
2564 value = 0;
2565 } else {
2566 char *eptr;
2567
2568 if (o->encoding == REDIS_ENCODING_RAW)
2569 value = strtoll(o->ptr, &eptr, 10);
2570 else if (o->encoding == REDIS_ENCODING_INT)
2571 value = (long)o->ptr;
2572 else
2573 assert(1 != 1);
2574 }
2575 }
2576
2577 value += incr;
2578 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2579 tryObjectEncoding(o);
2580 retval = dictAdd(c->db->dict,c->argv[1],o);
2581 if (retval == DICT_ERR) {
2582 dictReplace(c->db->dict,c->argv[1],o);
2583 removeExpire(c->db,c->argv[1]);
2584 } else {
2585 incrRefCount(c->argv[1]);
2586 }
2587 server.dirty++;
2588 addReply(c,shared.colon);
2589 addReply(c,o);
2590 addReply(c,shared.crlf);
2591 }
2592
2593 static void incrCommand(redisClient *c) {
2594 incrDecrCommand(c,1);
2595 }
2596
2597 static void decrCommand(redisClient *c) {
2598 incrDecrCommand(c,-1);
2599 }
2600
2601 static void incrbyCommand(redisClient *c) {
2602 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2603 incrDecrCommand(c,incr);
2604 }
2605
2606 static void decrbyCommand(redisClient *c) {
2607 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2608 incrDecrCommand(c,-incr);
2609 }
2610
2611 /* ========================= Type agnostic commands ========================= */
2612
2613 static void delCommand(redisClient *c) {
2614 int deleted = 0, j;
2615
2616 for (j = 1; j < c->argc; j++) {
2617 if (deleteKey(c->db,c->argv[j])) {
2618 server.dirty++;
2619 deleted++;
2620 }
2621 }
2622 switch(deleted) {
2623 case 0:
2624 addReply(c,shared.czero);
2625 break;
2626 case 1:
2627 addReply(c,shared.cone);
2628 break;
2629 default:
2630 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2631 break;
2632 }
2633 }
2634
2635 static void existsCommand(redisClient *c) {
2636 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2637 }
2638
2639 static void selectCommand(redisClient *c) {
2640 int id = atoi(c->argv[1]->ptr);
2641
2642 if (selectDb(c,id) == REDIS_ERR) {
2643 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2644 } else {
2645 addReply(c,shared.ok);
2646 }
2647 }
2648
2649 static void randomkeyCommand(redisClient *c) {
2650 dictEntry *de;
2651
2652 while(1) {
2653 de = dictGetRandomKey(c->db->dict);
2654 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2655 }
2656 if (de == NULL) {
2657 addReply(c,shared.plus);
2658 addReply(c,shared.crlf);
2659 } else {
2660 addReply(c,shared.plus);
2661 addReply(c,dictGetEntryKey(de));
2662 addReply(c,shared.crlf);
2663 }
2664 }
2665
2666 static void keysCommand(redisClient *c) {
2667 dictIterator *di;
2668 dictEntry *de;
2669 sds pattern = c->argv[1]->ptr;
2670 int plen = sdslen(pattern);
2671 int numkeys = 0, keyslen = 0;
2672 robj *lenobj = createObject(REDIS_STRING,NULL);
2673
2674 di = dictGetIterator(c->db->dict);
2675 if (!di) oom("dictGetIterator");
2676 addReply(c,lenobj);
2677 decrRefCount(lenobj);
2678 while((de = dictNext(di)) != NULL) {
2679 robj *keyobj = dictGetEntryKey(de);
2680
2681 sds key = keyobj->ptr;
2682 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2683 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2684 if (expireIfNeeded(c->db,keyobj) == 0) {
2685 if (numkeys != 0)
2686 addReply(c,shared.space);
2687 addReply(c,keyobj);
2688 numkeys++;
2689 keyslen += sdslen(key);
2690 }
2691 }
2692 }
2693 dictReleaseIterator(di);
2694 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2695 addReply(c,shared.crlf);
2696 }
2697
2698 static void dbsizeCommand(redisClient *c) {
2699 addReplySds(c,
2700 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2701 }
2702
2703 static void lastsaveCommand(redisClient *c) {
2704 addReplySds(c,
2705 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2706 }
2707
2708 static void typeCommand(redisClient *c) {
2709 robj *o;
2710 char *type;
2711
2712 o = lookupKeyRead(c->db,c->argv[1]);
2713 if (o == NULL) {
2714 type = "+none";
2715 } else {
2716 switch(o->type) {
2717 case REDIS_STRING: type = "+string"; break;
2718 case REDIS_LIST: type = "+list"; break;
2719 case REDIS_SET: type = "+set"; break;
2720 default: type = "unknown"; break;
2721 }
2722 }
2723 addReplySds(c,sdsnew(type));
2724 addReply(c,shared.crlf);
2725 }
2726
2727 static void saveCommand(redisClient *c) {
2728 if (server.bgsaveinprogress) {
2729 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2730 return;
2731 }
2732 if (rdbSave(server.dbfilename) == REDIS_OK) {
2733 addReply(c,shared.ok);
2734 } else {
2735 addReply(c,shared.err);
2736 }
2737 }
2738
2739 static void bgsaveCommand(redisClient *c) {
2740 if (server.bgsaveinprogress) {
2741 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2742 return;
2743 }
2744 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2745 addReply(c,shared.ok);
2746 } else {
2747 addReply(c,shared.err);
2748 }
2749 }
2750
2751 static void shutdownCommand(redisClient *c) {
2752 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2753 /* Kill the saving child if there is a background saving in progress.
2754 We want to avoid race conditions, for instance our saving child may
2755 overwrite the synchronous saving did by SHUTDOWN. */
2756 if (server.bgsaveinprogress) {
2757 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2758 kill(server.bgsavechildpid,SIGKILL);
2759 rdbRemoveTempFile(server.bgsavechildpid);
2760 }
2761 /* SYNC SAVE */
2762 if (rdbSave(server.dbfilename) == REDIS_OK) {
2763 if (server.daemonize)
2764 unlink(server.pidfile);
2765 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2766 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2767 exit(1);
2768 } else {
2769 /* Ooops.. error saving! The best we can do is to continue operating.
2770 * Note that if there was a background saving process, in the next
2771 * cron() Redis will be notified that the background saving aborted,
2772 * handling special stuff like slaves pending for synchronization... */
2773 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2774 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2775 }
2776 }
2777
2778 static void renameGenericCommand(redisClient *c, int nx) {
2779 robj *o;
2780
2781 /* To use the same key as src and dst is probably an error */
2782 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2783 addReply(c,shared.sameobjecterr);
2784 return;
2785 }
2786
2787 o = lookupKeyWrite(c->db,c->argv[1]);
2788 if (o == NULL) {
2789 addReply(c,shared.nokeyerr);
2790 return;
2791 }
2792 incrRefCount(o);
2793 deleteIfVolatile(c->db,c->argv[2]);
2794 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2795 if (nx) {
2796 decrRefCount(o);
2797 addReply(c,shared.czero);
2798 return;
2799 }
2800 dictReplace(c->db->dict,c->argv[2],o);
2801 } else {
2802 incrRefCount(c->argv[2]);
2803 }
2804 deleteKey(c->db,c->argv[1]);
2805 server.dirty++;
2806 addReply(c,nx ? shared.cone : shared.ok);
2807 }
2808
2809 static void renameCommand(redisClient *c) {
2810 renameGenericCommand(c,0);
2811 }
2812
2813 static void renamenxCommand(redisClient *c) {
2814 renameGenericCommand(c,1);
2815 }
2816
2817 static void moveCommand(redisClient *c) {
2818 robj *o;
2819 redisDb *src, *dst;
2820 int srcid;
2821
2822 /* Obtain source and target DB pointers */
2823 src = c->db;
2824 srcid = c->db->id;
2825 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2826 addReply(c,shared.outofrangeerr);
2827 return;
2828 }
2829 dst = c->db;
2830 selectDb(c,srcid); /* Back to the source DB */
2831
2832 /* If the user is moving using as target the same
2833 * DB as the source DB it is probably an error. */
2834 if (src == dst) {
2835 addReply(c,shared.sameobjecterr);
2836 return;
2837 }
2838
2839 /* Check if the element exists and get a reference */
2840 o = lookupKeyWrite(c->db,c->argv[1]);
2841 if (!o) {
2842 addReply(c,shared.czero);
2843 return;
2844 }
2845
2846 /* Try to add the element to the target DB */
2847 deleteIfVolatile(dst,c->argv[1]);
2848 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2849 addReply(c,shared.czero);
2850 return;
2851 }
2852 incrRefCount(c->argv[1]);
2853 incrRefCount(o);
2854
2855 /* OK! key moved, free the entry in the source DB */
2856 deleteKey(src,c->argv[1]);
2857 server.dirty++;
2858 addReply(c,shared.cone);
2859 }
2860
2861 /* =================================== Lists ================================ */
2862 static void pushGenericCommand(redisClient *c, int where) {
2863 robj *lobj;
2864 list *list;
2865
2866 lobj = lookupKeyWrite(c->db,c->argv[1]);
2867 if (lobj == NULL) {
2868 lobj = createListObject();
2869 list = lobj->ptr;
2870 if (where == REDIS_HEAD) {
2871 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2872 } else {
2873 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2874 }
2875 dictAdd(c->db->dict,c->argv[1],lobj);
2876 incrRefCount(c->argv[1]);
2877 incrRefCount(c->argv[2]);
2878 } else {
2879 if (lobj->type != REDIS_LIST) {
2880 addReply(c,shared.wrongtypeerr);
2881 return;
2882 }
2883 list = lobj->ptr;
2884 if (where == REDIS_HEAD) {
2885 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2886 } else {
2887 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2888 }
2889 incrRefCount(c->argv[2]);
2890 }
2891 server.dirty++;
2892 addReply(c,shared.ok);
2893 }
2894
2895 static void lpushCommand(redisClient *c) {
2896 pushGenericCommand(c,REDIS_HEAD);
2897 }
2898
2899 static void rpushCommand(redisClient *c) {
2900 pushGenericCommand(c,REDIS_TAIL);
2901 }
2902
2903 static void llenCommand(redisClient *c) {
2904 robj *o;
2905 list *l;
2906
2907 o = lookupKeyRead(c->db,c->argv[1]);
2908 if (o == NULL) {
2909 addReply(c,shared.czero);
2910 return;
2911 } else {
2912 if (o->type != REDIS_LIST) {
2913 addReply(c,shared.wrongtypeerr);
2914 } else {
2915 l = o->ptr;
2916 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2917 }
2918 }
2919 }
2920
2921 static void lindexCommand(redisClient *c) {
2922 robj *o;
2923 int index = atoi(c->argv[2]->ptr);
2924
2925 o = lookupKeyRead(c->db,c->argv[1]);
2926 if (o == NULL) {
2927 addReply(c,shared.nullbulk);
2928 } else {
2929 if (o->type != REDIS_LIST) {
2930 addReply(c,shared.wrongtypeerr);
2931 } else {
2932 list *list = o->ptr;
2933 listNode *ln;
2934
2935 ln = listIndex(list, index);
2936 if (ln == NULL) {
2937 addReply(c,shared.nullbulk);
2938 } else {
2939 robj *ele = listNodeValue(ln);
2940 addReplyBulkLen(c,ele);
2941 addReply(c,ele);
2942 addReply(c,shared.crlf);
2943 }
2944 }
2945 }
2946 }
2947
2948 static void lsetCommand(redisClient *c) {
2949 robj *o;
2950 int index = atoi(c->argv[2]->ptr);
2951
2952 o = lookupKeyWrite(c->db,c->argv[1]);
2953 if (o == NULL) {
2954 addReply(c,shared.nokeyerr);
2955 } else {
2956 if (o->type != REDIS_LIST) {
2957 addReply(c,shared.wrongtypeerr);
2958 } else {
2959 list *list = o->ptr;
2960 listNode *ln;
2961
2962 ln = listIndex(list, index);
2963 if (ln == NULL) {
2964 addReply(c,shared.outofrangeerr);
2965 } else {
2966 robj *ele = listNodeValue(ln);
2967
2968 decrRefCount(ele);
2969 listNodeValue(ln) = c->argv[3];
2970 incrRefCount(c->argv[3]);
2971 addReply(c,shared.ok);
2972 server.dirty++;
2973 }
2974 }
2975 }
2976 }
2977
2978 static void popGenericCommand(redisClient *c, int where) {
2979 robj *o;
2980
2981 o = lookupKeyWrite(c->db,c->argv[1]);
2982 if (o == NULL) {
2983 addReply(c,shared.nullbulk);
2984 } else {
2985 if (o->type != REDIS_LIST) {
2986 addReply(c,shared.wrongtypeerr);
2987 } else {
2988 list *list = o->ptr;
2989 listNode *ln;
2990
2991 if (where == REDIS_HEAD)
2992 ln = listFirst(list);
2993 else
2994 ln = listLast(list);
2995
2996 if (ln == NULL) {
2997 addReply(c,shared.nullbulk);
2998 } else {
2999 robj *ele = listNodeValue(ln);
3000 addReplyBulkLen(c,ele);
3001 addReply(c,ele);
3002 addReply(c,shared.crlf);
3003 listDelNode(list,ln);
3004 server.dirty++;
3005 }
3006 }
3007 }
3008 }
3009
3010 static void lpopCommand(redisClient *c) {
3011 popGenericCommand(c,REDIS_HEAD);
3012 }
3013
3014 static void rpopCommand(redisClient *c) {
3015 popGenericCommand(c,REDIS_TAIL);
3016 }
3017
3018 static void lrangeCommand(redisClient *c) {
3019 robj *o;
3020 int start = atoi(c->argv[2]->ptr);
3021 int end = atoi(c->argv[3]->ptr);
3022
3023 o = lookupKeyRead(c->db,c->argv[1]);
3024 if (o == NULL) {
3025 addReply(c,shared.nullmultibulk);
3026 } else {
3027 if (o->type != REDIS_LIST) {
3028 addReply(c,shared.wrongtypeerr);
3029 } else {
3030 list *list = o->ptr;
3031 listNode *ln;
3032 int llen = listLength(list);
3033 int rangelen, j;
3034 robj *ele;
3035
3036 /* convert negative indexes */
3037 if (start < 0) start = llen+start;
3038 if (end < 0) end = llen+end;
3039 if (start < 0) start = 0;
3040 if (end < 0) end = 0;
3041
3042 /* indexes sanity checks */
3043 if (start > end || start >= llen) {
3044 /* Out of range start or start > end result in empty list */
3045 addReply(c,shared.emptymultibulk);
3046 return;
3047 }
3048 if (end >= llen) end = llen-1;
3049 rangelen = (end-start)+1;
3050
3051 /* Return the result in form of a multi-bulk reply */
3052 ln = listIndex(list, start);
3053 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3054 for (j = 0; j < rangelen; j++) {
3055 ele = listNodeValue(ln);
3056 addReplyBulkLen(c,ele);
3057 addReply(c,ele);
3058 addReply(c,shared.crlf);
3059 ln = ln->next;
3060 }
3061 }
3062 }
3063 }
3064
3065 static void ltrimCommand(redisClient *c) {
3066 robj *o;
3067 int start = atoi(c->argv[2]->ptr);
3068 int end = atoi(c->argv[3]->ptr);
3069
3070 o = lookupKeyWrite(c->db,c->argv[1]);
3071 if (o == NULL) {
3072 addReply(c,shared.nokeyerr);
3073 } else {
3074 if (o->type != REDIS_LIST) {
3075 addReply(c,shared.wrongtypeerr);
3076 } else {
3077 list *list = o->ptr;
3078 listNode *ln;
3079 int llen = listLength(list);
3080 int j, ltrim, rtrim;
3081
3082 /* convert negative indexes */
3083 if (start < 0) start = llen+start;
3084 if (end < 0) end = llen+end;
3085 if (start < 0) start = 0;
3086 if (end < 0) end = 0;
3087
3088 /* indexes sanity checks */
3089 if (start > end || start >= llen) {
3090 /* Out of range start or start > end result in empty list */
3091 ltrim = llen;
3092 rtrim = 0;
3093 } else {
3094 if (end >= llen) end = llen-1;
3095 ltrim = start;
3096 rtrim = llen-end-1;
3097 }
3098
3099 /* Remove list elements to perform the trim */
3100 for (j = 0; j < ltrim; j++) {
3101 ln = listFirst(list);
3102 listDelNode(list,ln);
3103 }
3104 for (j = 0; j < rtrim; j++) {
3105 ln = listLast(list);
3106 listDelNode(list,ln);
3107 }
3108 server.dirty++;
3109 addReply(c,shared.ok);
3110 }
3111 }
3112 }
3113
3114 static void lremCommand(redisClient *c) {
3115 robj *o;
3116
3117 o = lookupKeyWrite(c->db,c->argv[1]);
3118 if (o == NULL) {
3119 addReply(c,shared.czero);
3120 } else {
3121 if (o->type != REDIS_LIST) {
3122 addReply(c,shared.wrongtypeerr);
3123 } else {
3124 list *list = o->ptr;
3125 listNode *ln, *next;
3126 int toremove = atoi(c->argv[2]->ptr);
3127 int removed = 0;
3128 int fromtail = 0;
3129
3130 if (toremove < 0) {
3131 toremove = -toremove;
3132 fromtail = 1;
3133 }
3134 ln = fromtail ? list->tail : list->head;
3135 while (ln) {
3136 robj *ele = listNodeValue(ln);
3137
3138 next = fromtail ? ln->prev : ln->next;
3139 if (compareStringObjects(ele,c->argv[3]) == 0) {
3140 listDelNode(list,ln);
3141 server.dirty++;
3142 removed++;
3143 if (toremove && removed == toremove) break;
3144 }
3145 ln = next;
3146 }
3147 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3148 }
3149 }
3150 }
3151
3152 /* ==================================== Sets ================================ */
3153
3154 static void saddCommand(redisClient *c) {
3155 robj *set;
3156
3157 set = lookupKeyWrite(c->db,c->argv[1]);
3158 if (set == NULL) {
3159 set = createSetObject();
3160 dictAdd(c->db->dict,c->argv[1],set);
3161 incrRefCount(c->argv[1]);
3162 } else {
3163 if (set->type != REDIS_SET) {
3164 addReply(c,shared.wrongtypeerr);
3165 return;
3166 }
3167 }
3168 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3169 incrRefCount(c->argv[2]);
3170 server.dirty++;
3171 addReply(c,shared.cone);
3172 } else {
3173 addReply(c,shared.czero);
3174 }
3175 }
3176
3177 static void sremCommand(redisClient *c) {
3178 robj *set;
3179
3180 set = lookupKeyWrite(c->db,c->argv[1]);
3181 if (set == NULL) {
3182 addReply(c,shared.czero);
3183 } else {
3184 if (set->type != REDIS_SET) {
3185 addReply(c,shared.wrongtypeerr);
3186 return;
3187 }
3188 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3189 server.dirty++;
3190 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3191 addReply(c,shared.cone);
3192 } else {
3193 addReply(c,shared.czero);
3194 }
3195 }
3196 }
3197
3198 static void smoveCommand(redisClient *c) {
3199 robj *srcset, *dstset;
3200
3201 srcset = lookupKeyWrite(c->db,c->argv[1]);
3202 dstset = lookupKeyWrite(c->db,c->argv[2]);
3203
3204 /* If the source key does not exist return 0, if it's of the wrong type
3205 * raise an error */
3206 if (srcset == NULL || srcset->type != REDIS_SET) {
3207 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3208 return;
3209 }
3210 /* Error if the destination key is not a set as well */
3211 if (dstset && dstset->type != REDIS_SET) {
3212 addReply(c,shared.wrongtypeerr);
3213 return;
3214 }
3215 /* Remove the element from the source set */
3216 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3217 /* Key not found in the src set! return zero */
3218 addReply(c,shared.czero);
3219 return;
3220 }
3221 server.dirty++;
3222 /* Add the element to the destination set */
3223 if (!dstset) {
3224 dstset = createSetObject();
3225 dictAdd(c->db->dict,c->argv[2],dstset);
3226 incrRefCount(c->argv[2]);
3227 }
3228 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3229 incrRefCount(c->argv[3]);
3230 addReply(c,shared.cone);
3231 }
3232
3233 static void sismemberCommand(redisClient *c) {
3234 robj *set;
3235
3236 set = lookupKeyRead(c->db,c->argv[1]);
3237 if (set == NULL) {
3238 addReply(c,shared.czero);
3239 } else {
3240 if (set->type != REDIS_SET) {
3241 addReply(c,shared.wrongtypeerr);
3242 return;
3243 }
3244 if (dictFind(set->ptr,c->argv[2]))
3245 addReply(c,shared.cone);
3246 else
3247 addReply(c,shared.czero);
3248 }
3249 }
3250
3251 static void scardCommand(redisClient *c) {
3252 robj *o;
3253 dict *s;
3254
3255 o = lookupKeyRead(c->db,c->argv[1]);
3256 if (o == NULL) {
3257 addReply(c,shared.czero);
3258 return;
3259 } else {
3260 if (o->type != REDIS_SET) {
3261 addReply(c,shared.wrongtypeerr);
3262 } else {
3263 s = o->ptr;
3264 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3265 dictSize(s)));
3266 }
3267 }
3268 }
3269
3270 static void spopCommand(redisClient *c) {
3271 robj *set;
3272 dictEntry *de;
3273
3274 set = lookupKeyWrite(c->db,c->argv[1]);
3275 if (set == NULL) {
3276 addReply(c,shared.nullbulk);
3277 } else {
3278 if (set->type != REDIS_SET) {
3279 addReply(c,shared.wrongtypeerr);
3280 return;
3281 }
3282 de = dictGetRandomKey(set->ptr);
3283 if (de == NULL) {
3284 addReply(c,shared.nullbulk);
3285 } else {
3286 robj *ele = dictGetEntryKey(de);
3287
3288 addReplyBulkLen(c,ele);
3289 addReply(c,ele);
3290 addReply(c,shared.crlf);
3291 dictDelete(set->ptr,ele);
3292 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3293 server.dirty++;
3294 }
3295 }
3296 }
3297
3298 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3299 dict **d1 = (void*) s1, **d2 = (void*) s2;
3300
3301 return dictSize(*d1)-dictSize(*d2);
3302 }
3303
3304 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3305 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3306 dictIterator *di;
3307 dictEntry *de;
3308 robj *lenobj = NULL, *dstset = NULL;
3309 int j, cardinality = 0;
3310
3311 if (!dv) oom("sinterGenericCommand");
3312 for (j = 0; j < setsnum; j++) {
3313 robj *setobj;
3314
3315 setobj = dstkey ?
3316 lookupKeyWrite(c->db,setskeys[j]) :
3317 lookupKeyRead(c->db,setskeys[j]);
3318 if (!setobj) {
3319 zfree(dv);
3320 if (dstkey) {
3321 deleteKey(c->db,dstkey);
3322 addReply(c,shared.ok);
3323 } else {
3324 addReply(c,shared.nullmultibulk);
3325 }
3326 return;
3327 }
3328 if (setobj->type != REDIS_SET) {
3329 zfree(dv);
3330 addReply(c,shared.wrongtypeerr);
3331 return;
3332 }
3333 dv[j] = setobj->ptr;
3334 }
3335 /* Sort sets from the smallest to largest, this will improve our
3336 * algorithm's performace */
3337 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3338
3339 /* The first thing we should output is the total number of elements...
3340 * since this is a multi-bulk write, but at this stage we don't know
3341 * the intersection set size, so we use a trick, append an empty object
3342 * to the output list and save the pointer to later modify it with the
3343 * right length */
3344 if (!dstkey) {
3345 lenobj = createObject(REDIS_STRING,NULL);
3346 addReply(c,lenobj);
3347 decrRefCount(lenobj);
3348 } else {
3349 /* If we have a target key where to store the resulting set
3350 * create this key with an empty set inside */
3351 dstset = createSetObject();
3352 }
3353
3354 /* Iterate all the elements of the first (smallest) set, and test
3355 * the element against all the other sets, if at least one set does
3356 * not include the element it is discarded */
3357 di = dictGetIterator(dv[0]);
3358 if (!di) oom("dictGetIterator");
3359
3360 while((de = dictNext(di)) != NULL) {
3361 robj *ele;
3362
3363 for (j = 1; j < setsnum; j++)
3364 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3365 if (j != setsnum)
3366 continue; /* at least one set does not contain the member */
3367 ele = dictGetEntryKey(de);
3368 if (!dstkey) {
3369 addReplyBulkLen(c,ele);
3370 addReply(c,ele);
3371 addReply(c,shared.crlf);
3372 cardinality++;
3373 } else {
3374 dictAdd(dstset->ptr,ele,NULL);
3375 incrRefCount(ele);
3376 }
3377 }
3378 dictReleaseIterator(di);
3379
3380 if (dstkey) {
3381 /* Store the resulting set into the target */
3382 deleteKey(c->db,dstkey);
3383 dictAdd(c->db->dict,dstkey,dstset);
3384 incrRefCount(dstkey);
3385 }
3386
3387 if (!dstkey) {
3388 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3389 } else {
3390 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3391 dictSize((dict*)dstset->ptr)));
3392 server.dirty++;
3393 }
3394 zfree(dv);
3395 }
3396
3397 static void sinterCommand(redisClient *c) {
3398 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3399 }
3400
3401 static void sinterstoreCommand(redisClient *c) {
3402 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3403 }
3404
3405 #define REDIS_OP_UNION 0
3406 #define REDIS_OP_DIFF 1
3407
3408 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3409 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3410 dictIterator *di;
3411 dictEntry *de;
3412 robj *dstset = NULL;
3413 int j, cardinality = 0;
3414
3415 if (!dv) oom("sunionDiffGenericCommand");
3416 for (j = 0; j < setsnum; j++) {
3417 robj *setobj;
3418
3419 setobj = dstkey ?
3420 lookupKeyWrite(c->db,setskeys[j]) :
3421 lookupKeyRead(c->db,setskeys[j]);
3422 if (!setobj) {
3423 dv[j] = NULL;
3424 continue;
3425 }
3426 if (setobj->type != REDIS_SET) {
3427 zfree(dv);
3428 addReply(c,shared.wrongtypeerr);
3429 return;
3430 }
3431 dv[j] = setobj->ptr;
3432 }
3433
3434 /* We need a temp set object to store our union. If the dstkey
3435 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3436 * this set object will be the resulting object to set into the target key*/
3437 dstset = createSetObject();
3438
3439 /* Iterate all the elements of all the sets, add every element a single
3440 * time to the result set */
3441 for (j = 0; j < setsnum; j++) {
3442 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3443 if (!dv[j]) continue; /* non existing keys are like empty sets */
3444
3445 di = dictGetIterator(dv[j]);
3446 if (!di) oom("dictGetIterator");
3447
3448 while((de = dictNext(di)) != NULL) {
3449 robj *ele;
3450
3451 /* dictAdd will not add the same element multiple times */
3452 ele = dictGetEntryKey(de);
3453 if (op == REDIS_OP_UNION || j == 0) {
3454 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3455 incrRefCount(ele);
3456 cardinality++;
3457 }
3458 } else if (op == REDIS_OP_DIFF) {
3459 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3460 cardinality--;
3461 }
3462 }
3463 }
3464 dictReleaseIterator(di);
3465
3466 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3467 }
3468
3469 /* Output the content of the resulting set, if not in STORE mode */
3470 if (!dstkey) {
3471 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3472 di = dictGetIterator(dstset->ptr);
3473 if (!di) oom("dictGetIterator");
3474 while((de = dictNext(di)) != NULL) {
3475 robj *ele;
3476
3477 ele = dictGetEntryKey(de);
3478 addReplyBulkLen(c,ele);
3479 addReply(c,ele);
3480 addReply(c,shared.crlf);
3481 }
3482 dictReleaseIterator(di);
3483 } else {
3484 /* If we have a target key where to store the resulting set
3485 * create this key with the result set inside */
3486 deleteKey(c->db,dstkey);
3487 dictAdd(c->db->dict,dstkey,dstset);
3488 incrRefCount(dstkey);
3489 }
3490
3491 /* Cleanup */
3492 if (!dstkey) {
3493 decrRefCount(dstset);
3494 } else {
3495 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3496 dictSize((dict*)dstset->ptr)));
3497 server.dirty++;
3498 }
3499 zfree(dv);
3500 }
3501
3502 static void sunionCommand(redisClient *c) {
3503 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3504 }
3505
3506 static void sunionstoreCommand(redisClient *c) {
3507 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3508 }
3509
3510 static void sdiffCommand(redisClient *c) {
3511 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3512 }
3513
3514 static void sdiffstoreCommand(redisClient *c) {
3515 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3516 }
3517
3518 static void flushdbCommand(redisClient *c) {
3519 server.dirty += dictSize(c->db->dict);
3520 dictEmpty(c->db->dict);
3521 dictEmpty(c->db->expires);
3522 addReply(c,shared.ok);
3523 }
3524
3525 static void flushallCommand(redisClient *c) {
3526 server.dirty += emptyDb();
3527 addReply(c,shared.ok);
3528 rdbSave(server.dbfilename);
3529 server.dirty++;
3530 }
3531
3532 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3533 redisSortOperation *so = zmalloc(sizeof(*so));
3534 if (!so) oom("createSortOperation");
3535 so->type = type;
3536 so->pattern = pattern;
3537 return so;
3538 }
3539
3540 /* Return the value associated to the key with a name obtained
3541 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3542 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3543 char *p;
3544 sds spat, ssub;
3545 robj keyobj;
3546 int prefixlen, sublen, postfixlen;
3547 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3548 struct {
3549 int len;
3550 unsigned short free;
3551 unsigned short _len; /* not used here */
3552 char buf[REDIS_SORTKEY_MAX+1];
3553 } keyname;
3554
3555 if (subst->encoding == REDIS_ENCODING_RAW)
3556 incrRefCount(subst);
3557 else {
3558 subst = getDecodedObject(subst);
3559 }
3560
3561 spat = pattern->ptr;
3562 ssub = subst->ptr;
3563 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3564 p = strchr(spat,'*');
3565 if (!p) return NULL;
3566
3567 prefixlen = p-spat;
3568 sublen = sdslen(ssub);
3569 postfixlen = sdslen(spat)-(prefixlen+1);
3570 memcpy(keyname.buf,spat,prefixlen);
3571 memcpy(keyname.buf+prefixlen,ssub,sublen);
3572 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3573 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3574 keyname.len = prefixlen+sublen+postfixlen;
3575 keyname._len = USHRT_MAX;
3576
3577 keyobj.refcount = 1;
3578 keyobj.type = REDIS_STRING;
3579 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3580
3581 decrRefCount(subst);
3582
3583 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3584 return lookupKeyRead(db,&keyobj);
3585 }
3586
3587 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3588 * the additional parameter is not standard but a BSD-specific we have to
3589 * pass sorting parameters via the global 'server' structure */
3590 static int sortCompare(const void *s1, const void *s2) {
3591 const redisSortObject *so1 = s1, *so2 = s2;
3592 int cmp;
3593
3594 if (!server.sort_alpha) {
3595 /* Numeric sorting. Here it's trivial as we precomputed scores */
3596 if (so1->u.score > so2->u.score) {
3597 cmp = 1;
3598 } else if (so1->u.score < so2->u.score) {
3599 cmp = -1;
3600 } else {
3601 cmp = 0;
3602 }
3603 } else {
3604 /* Alphanumeric sorting */
3605 if (server.sort_bypattern) {
3606 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3607 /* At least one compare object is NULL */
3608 if (so1->u.cmpobj == so2->u.cmpobj)
3609 cmp = 0;
3610 else if (so1->u.cmpobj == NULL)
3611 cmp = -1;
3612 else
3613 cmp = 1;
3614 } else {
3615 /* We have both the objects, use strcoll */
3616 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3617 }
3618 } else {
3619 /* Compare elements directly */
3620 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
3621 so2->obj->encoding == REDIS_ENCODING_RAW) {
3622 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3623 } else {
3624 robj *dec1, *dec2;
3625
3626 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
3627 so1->obj : getDecodedObject(so1->obj);
3628 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
3629 so2->obj : getDecodedObject(so2->obj);
3630 cmp = strcoll(dec1->ptr,dec2->ptr);
3631 if (dec1 != so1->obj) decrRefCount(dec1);
3632 if (dec2 != so2->obj) decrRefCount(dec2);
3633 }
3634 }
3635 }
3636 return server.sort_desc ? -cmp : cmp;
3637 }
3638
3639 /* The SORT command is the most complex command in Redis. Warning: this code
3640 * is optimized for speed and a bit less for readability */
3641 static void sortCommand(redisClient *c) {
3642 list *operations;
3643 int outputlen = 0;
3644 int desc = 0, alpha = 0;
3645 int limit_start = 0, limit_count = -1, start, end;
3646 int j, dontsort = 0, vectorlen;
3647 int getop = 0; /* GET operation counter */
3648 robj *sortval, *sortby = NULL;
3649 redisSortObject *vector; /* Resulting vector to sort */
3650
3651 /* Lookup the key to sort. It must be of the right types */
3652 sortval = lookupKeyRead(c->db,c->argv[1]);
3653 if (sortval == NULL) {
3654 addReply(c,shared.nokeyerr);
3655 return;
3656 }
3657 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3658 addReply(c,shared.wrongtypeerr);
3659 return;
3660 }
3661
3662 /* Create a list of operations to perform for every sorted element.
3663 * Operations can be GET/DEL/INCR/DECR */
3664 operations = listCreate();
3665 listSetFreeMethod(operations,zfree);
3666 j = 2;
3667
3668 /* Now we need to protect sortval incrementing its count, in the future
3669 * SORT may have options able to overwrite/delete keys during the sorting
3670 * and the sorted key itself may get destroied */
3671 incrRefCount(sortval);
3672
3673 /* The SORT command has an SQL-alike syntax, parse it */
3674 while(j < c->argc) {
3675 int leftargs = c->argc-j-1;
3676 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3677 desc = 0;
3678 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3679 desc = 1;
3680 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3681 alpha = 1;
3682 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3683 limit_start = atoi(c->argv[j+1]->ptr);
3684 limit_count = atoi(c->argv[j+2]->ptr);
3685 j+=2;
3686 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3687 sortby = c->argv[j+1];
3688 /* If the BY pattern does not contain '*', i.e. it is constant,
3689 * we don't need to sort nor to lookup the weight keys. */
3690 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3691 j++;
3692 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3693 listAddNodeTail(operations,createSortOperation(
3694 REDIS_SORT_GET,c->argv[j+1]));
3695 getop++;
3696 j++;
3697 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3698 listAddNodeTail(operations,createSortOperation(
3699 REDIS_SORT_DEL,c->argv[j+1]));
3700 j++;
3701 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3702 listAddNodeTail(operations,createSortOperation(
3703 REDIS_SORT_INCR,c->argv[j+1]));
3704 j++;
3705 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3706 listAddNodeTail(operations,createSortOperation(
3707 REDIS_SORT_DECR,c->argv[j+1]));
3708 j++;
3709 } else {
3710 decrRefCount(sortval);
3711 listRelease(operations);
3712 addReply(c,shared.syntaxerr);
3713 return;
3714 }
3715 j++;
3716 }
3717
3718 /* Load the sorting vector with all the objects to sort */
3719 vectorlen = (sortval->type == REDIS_LIST) ?
3720 listLength((list*)sortval->ptr) :
3721 dictSize((dict*)sortval->ptr);
3722 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3723 if (!vector) oom("allocating objects vector for SORT");
3724 j = 0;
3725 if (sortval->type == REDIS_LIST) {
3726 list *list = sortval->ptr;
3727 listNode *ln;
3728
3729 listRewind(list);
3730 while((ln = listYield(list))) {
3731 robj *ele = ln->value;
3732 vector[j].obj = ele;
3733 vector[j].u.score = 0;
3734 vector[j].u.cmpobj = NULL;
3735 j++;
3736 }
3737 } else {
3738 dict *set = sortval->ptr;
3739 dictIterator *di;
3740 dictEntry *setele;
3741
3742 di = dictGetIterator(set);
3743 if (!di) oom("dictGetIterator");
3744 while((setele = dictNext(di)) != NULL) {
3745 vector[j].obj = dictGetEntryKey(setele);
3746 vector[j].u.score = 0;
3747 vector[j].u.cmpobj = NULL;
3748 j++;
3749 }
3750 dictReleaseIterator(di);
3751 }
3752 assert(j == vectorlen);
3753
3754 /* Now it's time to load the right scores in the sorting vector */
3755 if (dontsort == 0) {
3756 for (j = 0; j < vectorlen; j++) {
3757 if (sortby) {
3758 robj *byval;
3759
3760 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3761 if (!byval || byval->type != REDIS_STRING) continue;
3762 if (alpha) {
3763 if (byval->encoding == REDIS_ENCODING_RAW) {
3764 vector[j].u.cmpobj = byval;
3765 incrRefCount(byval);
3766 } else {
3767 vector[j].u.cmpobj = getDecodedObject(byval);
3768 }
3769 } else {
3770 if (byval->encoding == REDIS_ENCODING_RAW) {
3771 vector[j].u.score = strtod(byval->ptr,NULL);
3772 } else {
3773 if (byval->encoding == REDIS_ENCODING_INT)
3774 vector[j].u.score = (long)byval->ptr;
3775 else
3776 assert(1 != 1);
3777 }
3778 }
3779 } else {
3780 if (!alpha) {
3781 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
3782 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3783 else {
3784 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
3785 vector[j].u.score = (long) vector[j].obj->ptr;
3786 else
3787 assert(1 != 1);
3788 }
3789 }
3790 }
3791 }
3792 }
3793
3794 /* We are ready to sort the vector... perform a bit of sanity check
3795 * on the LIMIT option too. We'll use a partial version of quicksort. */
3796 start = (limit_start < 0) ? 0 : limit_start;
3797 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3798 if (start >= vectorlen) {
3799 start = vectorlen-1;
3800 end = vectorlen-2;
3801 }
3802 if (end >= vectorlen) end = vectorlen-1;
3803
3804 if (dontsort == 0) {
3805 server.sort_desc = desc;
3806 server.sort_alpha = alpha;
3807 server.sort_bypattern = sortby ? 1 : 0;
3808 if (sortby && (start != 0 || end != vectorlen-1))
3809 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3810 else
3811 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3812 }
3813
3814 /* Send command output to the output buffer, performing the specified
3815 * GET/DEL/INCR/DECR operations if any. */
3816 outputlen = getop ? getop*(end-start+1) : end-start+1;
3817 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3818 for (j = start; j <= end; j++) {
3819 listNode *ln;
3820 if (!getop) {
3821 addReplyBulkLen(c,vector[j].obj);
3822 addReply(c,vector[j].obj);
3823 addReply(c,shared.crlf);
3824 }
3825 listRewind(operations);
3826 while((ln = listYield(operations))) {
3827 redisSortOperation *sop = ln->value;
3828 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3829 vector[j].obj);
3830
3831 if (sop->type == REDIS_SORT_GET) {
3832 if (!val || val->type != REDIS_STRING) {
3833 addReply(c,shared.nullbulk);
3834 } else {
3835 addReplyBulkLen(c,val);
3836 addReply(c,val);
3837 addReply(c,shared.crlf);
3838 }
3839 } else if (sop->type == REDIS_SORT_DEL) {
3840 /* TODO */
3841 }
3842 }
3843 }
3844
3845 /* Cleanup */
3846 decrRefCount(sortval);
3847 listRelease(operations);
3848 for (j = 0; j < vectorlen; j++) {
3849 if (sortby && alpha && vector[j].u.cmpobj)
3850 decrRefCount(vector[j].u.cmpobj);
3851 }
3852 zfree(vector);
3853 }
3854
3855 static void infoCommand(redisClient *c) {
3856 sds info;
3857 time_t uptime = time(NULL)-server.stat_starttime;
3858 int j;
3859
3860 info = sdscatprintf(sdsempty(),
3861 "redis_version:%s\r\n"
3862 "uptime_in_seconds:%d\r\n"
3863 "uptime_in_days:%d\r\n"
3864 "connected_clients:%d\r\n"
3865 "connected_slaves:%d\r\n"
3866 "used_memory:%zu\r\n"
3867 "changes_since_last_save:%lld\r\n"
3868 "bgsave_in_progress:%d\r\n"
3869 "last_save_time:%d\r\n"
3870 "total_connections_received:%lld\r\n"
3871 "total_commands_processed:%lld\r\n"
3872 "role:%s\r\n"
3873 ,REDIS_VERSION,
3874 uptime,
3875 uptime/(3600*24),
3876 listLength(server.clients)-listLength(server.slaves),
3877 listLength(server.slaves),
3878 server.usedmemory,
3879 server.dirty,
3880 server.bgsaveinprogress,
3881 server.lastsave,
3882 server.stat_numconnections,
3883 server.stat_numcommands,
3884 server.masterhost == NULL ? "master" : "slave"
3885 );
3886 if (server.masterhost) {
3887 info = sdscatprintf(info,
3888 "master_host:%s\r\n"
3889 "master_port:%d\r\n"
3890 "master_link_status:%s\r\n"
3891 "master_last_io_seconds_ago:%d\r\n"
3892 ,server.masterhost,
3893 server.masterport,
3894 (server.replstate == REDIS_REPL_CONNECTED) ?
3895 "up" : "down",
3896 (int)(time(NULL)-server.master->lastinteraction)
3897 );
3898 }
3899 for (j = 0; j < server.dbnum; j++) {
3900 long long keys, vkeys;
3901
3902 keys = dictSize(server.db[j].dict);
3903 vkeys = dictSize(server.db[j].expires);
3904 if (keys || vkeys) {
3905 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
3906 j, keys, vkeys);
3907 }
3908 }
3909 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3910 addReplySds(c,info);
3911 addReply(c,shared.crlf);
3912 }
3913
3914 static void monitorCommand(redisClient *c) {
3915 /* ignore MONITOR if aleady slave or in monitor mode */
3916 if (c->flags & REDIS_SLAVE) return;
3917
3918 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3919 c->slaveseldb = 0;
3920 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3921 addReply(c,shared.ok);
3922 }
3923
3924 /* ================================= Expire ================================= */
3925 static int removeExpire(redisDb *db, robj *key) {
3926 if (dictDelete(db->expires,key) == DICT_OK) {
3927 return 1;
3928 } else {
3929 return 0;
3930 }
3931 }
3932
3933 static int setExpire(redisDb *db, robj *key, time_t when) {
3934 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3935 return 0;
3936 } else {
3937 incrRefCount(key);
3938 return 1;
3939 }
3940 }
3941
3942 /* Return the expire time of the specified key, or -1 if no expire
3943 * is associated with this key (i.e. the key is non volatile) */
3944 static time_t getExpire(redisDb *db, robj *key) {
3945 dictEntry *de;
3946
3947 /* No expire? return ASAP */
3948 if (dictSize(db->expires) == 0 ||
3949 (de = dictFind(db->expires,key)) == NULL) return -1;
3950
3951 return (time_t) dictGetEntryVal(de);
3952 }
3953
3954 static int expireIfNeeded(redisDb *db, robj *key) {
3955 time_t when;
3956 dictEntry *de;
3957
3958 /* No expire? return ASAP */
3959 if (dictSize(db->expires) == 0 ||
3960 (de = dictFind(db->expires,key)) == NULL) return 0;
3961
3962 /* Lookup the expire */
3963 when = (time_t) dictGetEntryVal(de);
3964 if (time(NULL) <= when) return 0;
3965
3966 /* Delete the key */
3967 dictDelete(db->expires,key);
3968 return dictDelete(db->dict,key) == DICT_OK;
3969 }
3970
3971 static int deleteIfVolatile(redisDb *db, robj *key) {
3972 dictEntry *de;
3973
3974 /* No expire? return ASAP */
3975 if (dictSize(db->expires) == 0 ||
3976 (de = dictFind(db->expires,key)) == NULL) return 0;
3977
3978 /* Delete the key */
3979 server.dirty++;
3980 dictDelete(db->expires,key);
3981 return dictDelete(db->dict,key) == DICT_OK;
3982 }
3983
3984 static void expireCommand(redisClient *c) {
3985 dictEntry *de;
3986 int seconds = atoi(c->argv[2]->ptr);
3987
3988 de = dictFind(c->db->dict,c->argv[1]);
3989 if (de == NULL) {
3990 addReply(c,shared.czero);
3991 return;
3992 }
3993 if (seconds <= 0) {
3994 addReply(c, shared.czero);
3995 return;
3996 } else {
3997 time_t when = time(NULL)+seconds;
3998 if (setExpire(c->db,c->argv[1],when)) {
3999 addReply(c,shared.cone);
4000 server.dirty++;
4001 } else {
4002 addReply(c,shared.czero);
4003 }
4004 return;
4005 }
4006 }
4007
4008 static void ttlCommand(redisClient *c) {
4009 time_t expire;
4010 int ttl = -1;
4011
4012 expire = getExpire(c->db,c->argv[1]);
4013 if (expire != -1) {
4014 ttl = (int) (expire-time(NULL));
4015 if (ttl < 0) ttl = -1;
4016 }
4017 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4018 }
4019
4020 /* =============================== Replication ============================= */
4021
4022 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4023 ssize_t nwritten, ret = size;
4024 time_t start = time(NULL);
4025
4026 timeout++;
4027 while(size) {
4028 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4029 nwritten = write(fd,ptr,size);
4030 if (nwritten == -1) return -1;
4031 ptr += nwritten;
4032 size -= nwritten;
4033 }
4034 if ((time(NULL)-start) > timeout) {
4035 errno = ETIMEDOUT;
4036 return -1;
4037 }
4038 }
4039 return ret;
4040 }
4041
4042 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4043 ssize_t nread, totread = 0;
4044 time_t start = time(NULL);
4045
4046 timeout++;
4047 while(size) {
4048 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4049 nread = read(fd,ptr,size);
4050 if (nread == -1) return -1;
4051 ptr += nread;
4052 size -= nread;
4053 totread += nread;
4054 }
4055 if ((time(NULL)-start) > timeout) {
4056 errno = ETIMEDOUT;
4057 return -1;
4058 }
4059 }
4060 return totread;
4061 }
4062
4063 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
4064 ssize_t nread = 0;
4065
4066 size--;
4067 while(size) {
4068 char c;
4069
4070 if (syncRead(fd,&c,1,timeout) == -1) return -1;
4071 if (c == '\n') {
4072 *ptr = '\0';
4073 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
4074 return nread;
4075 } else {
4076 *ptr++ = c;
4077 *ptr = '\0';
4078 nread++;
4079 }
4080 }
4081 return nread;
4082 }
4083
4084 static void syncCommand(redisClient *c) {
4085 /* ignore SYNC if aleady slave or in monitor mode */
4086 if (c->flags & REDIS_SLAVE) return;
4087
4088 /* SYNC can't be issued when the server has pending data to send to
4089 * the client about already issued commands. We need a fresh reply
4090 * buffer registering the differences between the BGSAVE and the current
4091 * dataset, so that we can copy to other slaves if needed. */
4092 if (listLength(c->reply) != 0) {
4093 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4094 return;
4095 }
4096
4097 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4098 /* Here we need to check if there is a background saving operation
4099 * in progress, or if it is required to start one */
4100 if (server.bgsaveinprogress) {
4101 /* Ok a background save is in progress. Let's check if it is a good
4102 * one for replication, i.e. if there is another slave that is
4103 * registering differences since the server forked to save */
4104 redisClient *slave;
4105 listNode *ln;
4106
4107 listRewind(server.slaves);
4108 while((ln = listYield(server.slaves))) {
4109 slave = ln->value;
4110 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4111 }
4112 if (ln) {
4113 /* Perfect, the server is already registering differences for
4114 * another slave. Set the right state, and copy the buffer. */
4115 listRelease(c->reply);
4116 c->reply = listDup(slave->reply);
4117 if (!c->reply) oom("listDup copying slave reply list");
4118 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4119 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4120 } else {
4121 /* No way, we need to wait for the next BGSAVE in order to
4122 * register differences */
4123 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4124 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4125 }
4126 } else {
4127 /* Ok we don't have a BGSAVE in progress, let's start one */
4128 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4129 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4130 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4131 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4132 return;
4133 }
4134 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4135 }
4136 c->repldbfd = -1;
4137 c->flags |= REDIS_SLAVE;
4138 c->slaveseldb = 0;
4139 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
4140 return;
4141 }
4142
4143 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4144 redisClient *slave = privdata;
4145 REDIS_NOTUSED(el);
4146 REDIS_NOTUSED(mask);
4147 char buf[REDIS_IOBUF_LEN];
4148 ssize_t nwritten, buflen;
4149
4150 if (slave->repldboff == 0) {
4151 /* Write the bulk write count before to transfer the DB. In theory here
4152 * we don't know how much room there is in the output buffer of the
4153 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4154 * operations) will never be smaller than the few bytes we need. */
4155 sds bulkcount;
4156
4157 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4158 slave->repldbsize);
4159 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4160 {
4161 sdsfree(bulkcount);
4162 freeClient(slave);
4163 return;
4164 }
4165 sdsfree(bulkcount);
4166 }
4167 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4168 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4169 if (buflen <= 0) {
4170 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4171 (buflen == 0) ? "premature EOF" : strerror(errno));
4172 freeClient(slave);
4173 return;
4174 }
4175 if ((nwritten = write(fd,buf,buflen)) == -1) {
4176 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4177 strerror(errno));
4178 freeClient(slave);
4179 return;
4180 }
4181 slave->repldboff += nwritten;
4182 if (slave->repldboff == slave->repldbsize) {
4183 close(slave->repldbfd);
4184 slave->repldbfd = -1;
4185 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4186 slave->replstate = REDIS_REPL_ONLINE;
4187 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4188 sendReplyToClient, slave, NULL) == AE_ERR) {
4189 freeClient(slave);
4190 return;
4191 }
4192 addReplySds(slave,sdsempty());
4193 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4194 }
4195 }
4196
4197 /* This function is called at the end of every backgrond saving.
4198 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4199 * otherwise REDIS_ERR is passed to the function.
4200 *
4201 * The goal of this function is to handle slaves waiting for a successful
4202 * background saving in order to perform non-blocking synchronization. */
4203 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4204 listNode *ln;
4205 int startbgsave = 0;
4206
4207 listRewind(server.slaves);
4208 while((ln = listYield(server.slaves))) {
4209 redisClient *slave = ln->value;
4210
4211 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4212 startbgsave = 1;
4213 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4214 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4215 struct redis_stat buf;
4216
4217 if (bgsaveerr != REDIS_OK) {
4218 freeClient(slave);
4219 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4220 continue;
4221 }
4222 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4223 redis_fstat(slave->repldbfd,&buf) == -1) {
4224 freeClient(slave);
4225 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4226 continue;
4227 }
4228 slave->repldboff = 0;
4229 slave->repldbsize = buf.st_size;
4230 slave->replstate = REDIS_REPL_SEND_BULK;
4231 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4232 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4233 freeClient(slave);
4234 continue;
4235 }
4236 }
4237 }
4238 if (startbgsave) {
4239 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4240 listRewind(server.slaves);
4241 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4242 while((ln = listYield(server.slaves))) {
4243 redisClient *slave = ln->value;
4244
4245 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4246 freeClient(slave);
4247 }
4248 }
4249 }
4250 }
4251
4252 static int syncWithMaster(void) {
4253 char buf[1024], tmpfile[256];
4254 int dumpsize;
4255 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4256 int dfd;
4257
4258 if (fd == -1) {
4259 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4260 strerror(errno));
4261 return REDIS_ERR;
4262 }
4263 /* Issue the SYNC command */
4264 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4265 close(fd);
4266 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4267 strerror(errno));
4268 return REDIS_ERR;
4269 }
4270 /* Read the bulk write count */
4271 if (syncReadLine(fd,buf,1024,3600) == -1) {
4272 close(fd);
4273 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4274 strerror(errno));
4275 return REDIS_ERR;
4276 }
4277 dumpsize = atoi(buf+1);
4278 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4279 /* Read the bulk write data on a temp file */
4280 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4281 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4282 if (dfd == -1) {
4283 close(fd);
4284 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4285 return REDIS_ERR;
4286 }
4287 while(dumpsize) {
4288 int nread, nwritten;
4289
4290 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4291 if (nread == -1) {
4292 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4293 strerror(errno));
4294 close(fd);
4295 close(dfd);
4296 return REDIS_ERR;
4297 }
4298 nwritten = write(dfd,buf,nread);
4299 if (nwritten == -1) {
4300 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4301 close(fd);
4302 close(dfd);
4303 return REDIS_ERR;
4304 }
4305 dumpsize -= nread;
4306 }
4307 close(dfd);
4308 if (rename(tmpfile,server.dbfilename) == -1) {
4309 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4310 unlink(tmpfile);
4311 close(fd);
4312 return REDIS_ERR;
4313 }
4314 emptyDb();
4315 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4316 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4317 close(fd);
4318 return REDIS_ERR;
4319 }
4320 server.master = createClient(fd);
4321 server.master->flags |= REDIS_MASTER;
4322 server.replstate = REDIS_REPL_CONNECTED;
4323 return REDIS_OK;
4324 }
4325
4326 static void slaveofCommand(redisClient *c) {
4327 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4328 !strcasecmp(c->argv[2]->ptr,"one")) {
4329 if (server.masterhost) {
4330 sdsfree(server.masterhost);
4331 server.masterhost = NULL;
4332 if (server.master) freeClient(server.master);
4333 server.replstate = REDIS_REPL_NONE;
4334 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4335 }
4336 } else {
4337 sdsfree(server.masterhost);
4338 server.masterhost = sdsdup(c->argv[1]->ptr);
4339 server.masterport = atoi(c->argv[2]->ptr);
4340 if (server.master) freeClient(server.master);
4341 server.replstate = REDIS_REPL_CONNECT;
4342 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4343 server.masterhost, server.masterport);
4344 }
4345 addReply(c,shared.ok);
4346 }
4347
4348 /* ============================ Maxmemory directive ======================== */
4349
4350 /* This function gets called when 'maxmemory' is set on the config file to limit
4351 * the max memory used by the server, and we are out of memory.
4352 * This function will try to, in order:
4353 *
4354 * - Free objects from the free list
4355 * - Try to remove keys with an EXPIRE set
4356 *
4357 * It is not possible to free enough memory to reach used-memory < maxmemory
4358 * the server will start refusing commands that will enlarge even more the
4359 * memory usage.
4360 */
4361 static void freeMemoryIfNeeded(void) {
4362 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4363 if (listLength(server.objfreelist)) {
4364 robj *o;
4365
4366 listNode *head = listFirst(server.objfreelist);
4367 o = listNodeValue(head);
4368 listDelNode(server.objfreelist,head);
4369 zfree(o);
4370 } else {
4371 int j, k, freed = 0;
4372
4373 for (j = 0; j < server.dbnum; j++) {
4374 int minttl = -1;
4375 robj *minkey = NULL;
4376 struct dictEntry *de;
4377
4378 if (dictSize(server.db[j].expires)) {
4379 freed = 1;
4380 /* From a sample of three keys drop the one nearest to
4381 * the natural expire */
4382 for (k = 0; k < 3; k++) {
4383 time_t t;
4384
4385 de = dictGetRandomKey(server.db[j].expires);
4386 t = (time_t) dictGetEntryVal(de);
4387 if (minttl == -1 || t < minttl) {
4388 minkey = dictGetEntryKey(de);
4389 minttl = t;
4390 }
4391 }
4392 deleteKey(server.db+j,minkey);
4393 }
4394 }
4395 if (!freed) return; /* nothing to free... */
4396 }
4397 }
4398 }
4399
4400 /* ================================= Debugging ============================== */
4401
4402 static void debugCommand(redisClient *c) {
4403 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4404 *((char*)-1) = 'x';
4405 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4406 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4407 robj *key, *val;
4408
4409 if (!de) {
4410 addReply(c,shared.nokeyerr);
4411 return;
4412 }
4413 key = dictGetEntryKey(de);
4414 val = dictGetEntryVal(de);
4415 addReplySds(c,sdscatprintf(sdsempty(),
4416 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4417 key, key->refcount, val, val->refcount, val->encoding));
4418 } else {
4419 addReplySds(c,sdsnew(
4420 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4421 }
4422 }
4423
4424 #ifdef HAVE_BACKTRACE
4425 static struct redisFunctionSym symsTable[] = {
4426 {"compareStringObjects", (unsigned long)compareStringObjects},
4427 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4428 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4429 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4430 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4431 {"freeStringObject", (unsigned long)freeStringObject},
4432 {"freeListObject", (unsigned long)freeListObject},
4433 {"freeSetObject", (unsigned long)freeSetObject},
4434 {"decrRefCount", (unsigned long)decrRefCount},
4435 {"createObject", (unsigned long)createObject},
4436 {"freeClient", (unsigned long)freeClient},
4437 {"rdbLoad", (unsigned long)rdbLoad},
4438 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4439 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4440 {"addReply", (unsigned long)addReply},
4441 {"addReplySds", (unsigned long)addReplySds},
4442 {"incrRefCount", (unsigned long)incrRefCount},
4443 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4444 {"createStringObject", (unsigned long)createStringObject},
4445 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4446 {"syncWithMaster", (unsigned long)syncWithMaster},
4447 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4448 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4449 {"getDecodedObject", (unsigned long)getDecodedObject},
4450 {"removeExpire", (unsigned long)removeExpire},
4451 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4452 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4453 {"deleteKey", (unsigned long)deleteKey},
4454 {"getExpire", (unsigned long)getExpire},
4455 {"setExpire", (unsigned long)setExpire},
4456 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4457 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4458 {"authCommand", (unsigned long)authCommand},
4459 {"pingCommand", (unsigned long)pingCommand},
4460 {"echoCommand", (unsigned long)echoCommand},
4461 {"setCommand", (unsigned long)setCommand},
4462 {"setnxCommand", (unsigned long)setnxCommand},
4463 {"getCommand", (unsigned long)getCommand},
4464 {"delCommand", (unsigned long)delCommand},
4465 {"existsCommand", (unsigned long)existsCommand},
4466 {"incrCommand", (unsigned long)incrCommand},
4467 {"decrCommand", (unsigned long)decrCommand},
4468 {"incrbyCommand", (unsigned long)incrbyCommand},
4469 {"decrbyCommand", (unsigned long)decrbyCommand},
4470 {"selectCommand", (unsigned long)selectCommand},
4471 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4472 {"keysCommand", (unsigned long)keysCommand},
4473 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4474 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4475 {"saveCommand", (unsigned long)saveCommand},
4476 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4477 {"shutdownCommand", (unsigned long)shutdownCommand},
4478 {"moveCommand", (unsigned long)moveCommand},
4479 {"renameCommand", (unsigned long)renameCommand},
4480 {"renamenxCommand", (unsigned long)renamenxCommand},
4481 {"lpushCommand", (unsigned long)lpushCommand},
4482 {"rpushCommand", (unsigned long)rpushCommand},
4483 {"lpopCommand", (unsigned long)lpopCommand},
4484 {"rpopCommand", (unsigned long)rpopCommand},
4485 {"llenCommand", (unsigned long)llenCommand},
4486 {"lindexCommand", (unsigned long)lindexCommand},
4487 {"lrangeCommand", (unsigned long)lrangeCommand},
4488 {"ltrimCommand", (unsigned long)ltrimCommand},
4489 {"typeCommand", (unsigned long)typeCommand},
4490 {"lsetCommand", (unsigned long)lsetCommand},
4491 {"saddCommand", (unsigned long)saddCommand},
4492 {"sremCommand", (unsigned long)sremCommand},
4493 {"smoveCommand", (unsigned long)smoveCommand},
4494 {"sismemberCommand", (unsigned long)sismemberCommand},
4495 {"scardCommand", (unsigned long)scardCommand},
4496 {"spopCommand", (unsigned long)spopCommand},
4497 {"sinterCommand", (unsigned long)sinterCommand},
4498 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4499 {"sunionCommand", (unsigned long)sunionCommand},
4500 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4501 {"sdiffCommand", (unsigned long)sdiffCommand},
4502 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4503 {"syncCommand", (unsigned long)syncCommand},
4504 {"flushdbCommand", (unsigned long)flushdbCommand},
4505 {"flushallCommand", (unsigned long)flushallCommand},
4506 {"sortCommand", (unsigned long)sortCommand},
4507 {"lremCommand", (unsigned long)lremCommand},
4508 {"infoCommand", (unsigned long)infoCommand},
4509 {"mgetCommand", (unsigned long)mgetCommand},
4510 {"monitorCommand", (unsigned long)monitorCommand},
4511 {"expireCommand", (unsigned long)expireCommand},
4512 {"getSetCommand", (unsigned long)getSetCommand},
4513 {"ttlCommand", (unsigned long)ttlCommand},
4514 {"slaveofCommand", (unsigned long)slaveofCommand},
4515 {"debugCommand", (unsigned long)debugCommand},
4516 {"processCommand", (unsigned long)processCommand},
4517 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4518 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4519 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4520 {NULL,0}
4521 };
4522
4523 /* This function try to convert a pointer into a function name. It's used in
4524 * oreder to provide a backtrace under segmentation fault that's able to
4525 * display functions declared as static (otherwise the backtrace is useless). */
4526 static char *findFuncName(void *pointer, unsigned long *offset){
4527 int i, ret = -1;
4528 unsigned long off, minoff = 0;
4529
4530 /* Try to match against the Symbol with the smallest offset */
4531 for (i=0; symsTable[i].pointer; i++) {
4532 unsigned long lp = (unsigned long) pointer;
4533
4534 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4535 off=lp-symsTable[i].pointer;
4536 if (ret < 0 || off < minoff) {
4537 minoff=off;
4538 ret=i;
4539 }
4540 }
4541 }
4542 if (ret == -1) return NULL;
4543 *offset = minoff;
4544 return symsTable[ret].name;
4545 }
4546
4547 static void *getMcontextEip(ucontext_t *uc) {
4548 #if defined(__FreeBSD__)
4549 return (void*) uc->uc_mcontext.mc_eip;
4550 #elif defined(__dietlibc__)
4551 return (void*) uc->uc_mcontext.eip;
4552 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4553 return (void*) uc->uc_mcontext->__ss.__eip;
4554 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4555 #ifdef _STRUCT_X86_THREAD_STATE64
4556 return (void*) uc->uc_mcontext->__ss.__rip;
4557 #else
4558 return (void*) uc->uc_mcontext->__ss.__eip;
4559 #endif
4560 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4561 return (void*) uc->uc_mcontext.gregs[REG_EIP];
4562 #elif defined(__ia64__) /* Linux IA64 */
4563 return (void*) uc->uc_mcontext.sc_ip;
4564 #else
4565 return NULL;
4566 #endif
4567 }
4568
4569 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4570 void *trace[100];
4571 char **messages = NULL;
4572 int i, trace_size = 0;
4573 unsigned long offset=0;
4574 time_t uptime = time(NULL)-server.stat_starttime;
4575 ucontext_t *uc = (ucontext_t*) secret;
4576 REDIS_NOTUSED(info);
4577
4578 redisLog(REDIS_WARNING,
4579 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4580 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4581 "redis_version:%s; "
4582 "uptime_in_seconds:%d; "
4583 "connected_clients:%d; "
4584 "connected_slaves:%d; "
4585 "used_memory:%zu; "
4586 "changes_since_last_save:%lld; "
4587 "bgsave_in_progress:%d; "
4588 "last_save_time:%d; "
4589 "total_connections_received:%lld; "
4590 "total_commands_processed:%lld; "
4591 "role:%s;"
4592 ,REDIS_VERSION,
4593 uptime,
4594 listLength(server.clients)-listLength(server.slaves),
4595 listLength(server.slaves),
4596 server.usedmemory,
4597 server.dirty,
4598 server.bgsaveinprogress,
4599 server.lastsave,
4600 server.stat_numconnections,
4601 server.stat_numcommands,
4602 server.masterhost == NULL ? "master" : "slave"
4603 ));
4604
4605 trace_size = backtrace(trace, 100);
4606 /* overwrite sigaction with caller's address */
4607 if (getMcontextEip(uc) != NULL) {
4608 trace[1] = getMcontextEip(uc);
4609 }
4610 messages = backtrace_symbols(trace, trace_size);
4611
4612 for (i=1; i<trace_size; ++i) {
4613 char *fn = findFuncName(trace[i], &offset), *p;
4614
4615 p = strchr(messages[i],'+');
4616 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4617 redisLog(REDIS_WARNING,"%s", messages[i]);
4618 } else {
4619 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4620 }
4621 }
4622 free(messages);
4623 exit(0);
4624 }
4625
4626 static void setupSigSegvAction(void) {
4627 struct sigaction act;
4628
4629 sigemptyset (&act.sa_mask);
4630 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4631 * is used. Otherwise, sa_handler is used */
4632 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4633 act.sa_sigaction = segvHandler;
4634 sigaction (SIGSEGV, &act, NULL);
4635 sigaction (SIGBUS, &act, NULL);
4636 sigaction (SIGFPE, &act, NULL);
4637 sigaction (SIGILL, &act, NULL);
4638 sigaction (SIGBUS, &act, NULL);
4639 return;
4640 }
4641 #else /* HAVE_BACKTRACE */
4642 static void setupSigSegvAction(void) {
4643 }
4644 #endif /* HAVE_BACKTRACE */
4645
4646 /* =================================== Main! ================================ */
4647
4648 #ifdef __linux__
4649 int linuxOvercommitMemoryValue(void) {
4650 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
4651 char buf[64];
4652
4653 if (!fp) return -1;
4654 if (fgets(buf,64,fp) == NULL) {
4655 fclose(fp);
4656 return -1;
4657 }
4658 fclose(fp);
4659
4660 return atoi(buf);
4661 }
4662
4663 void linuxOvercommitMemoryWarning(void) {
4664 if (linuxOvercommitMemoryValue() == 0) {
4665 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4666 }
4667 }
4668 #endif /* __linux__ */
4669
4670 static void daemonize(void) {
4671 int fd;
4672 FILE *fp;
4673
4674 if (fork() != 0) exit(0); /* parent exits */
4675 setsid(); /* create a new session */
4676
4677 /* Every output goes to /dev/null. If Redis is daemonized but
4678 * the 'logfile' is set to 'stdout' in the configuration file
4679 * it will not log at all. */
4680 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4681 dup2(fd, STDIN_FILENO);
4682 dup2(fd, STDOUT_FILENO);
4683 dup2(fd, STDERR_FILENO);
4684 if (fd > STDERR_FILENO) close(fd);
4685 }
4686 /* Try to write the pid file */
4687 fp = fopen(server.pidfile,"w");
4688 if (fp) {
4689 fprintf(fp,"%d\n",getpid());
4690 fclose(fp);
4691 }
4692 }
4693
4694 int main(int argc, char **argv) {
4695 initServerConfig();
4696 if (argc == 2) {
4697 ResetServerSaveParams();
4698 loadServerConfig(argv[1]);
4699 } else if (argc > 2) {
4700 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4701 exit(1);
4702 } else {
4703 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4704 }
4705 initServer();
4706 if (server.daemonize) daemonize();
4707 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4708 #ifdef __linux__
4709 linuxOvercommitMemoryWarning();
4710 #endif
4711 if (rdbLoad(server.dbfilename) == REDIS_OK)
4712 redisLog(REDIS_NOTICE,"DB loaded from disk");
4713 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4714 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4715 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4716 aeMain(server.el);
4717 aeDeleteEventLoop(server.el);
4718 return 0;
4719 }