]> git.saurik.com Git - redis.git/blob - redis.c
853ea1cc604500ffc42beeb52fe3754873024576
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.001"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout, in seconds */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
/* Max number of random volatile keys sampled per DB on every serverCron()
 * run (roughly 100 keys/second per DB at one cron run per second). */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* Hash table parameters */
/* A table with more than the initial number of slots is shrunk when fewer
 * than REDIS_HT_MINFILL percent of its slots are in use (10%). */
#define REDIS_HT_MINFILL 10

/* Command flags */
/* The last argument is presumably sent as bulk data (e.g. SET, SISMEMBER,
 * ECHO); despite the historical name this is not limited to writes. */
#define REDIS_CMD_BULK 1
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4
100
/* Object types (stored in robj.type) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_HASH 3

/* Objects encoding (stored in robj.encoding) */
#define REDIS_ENCODING_RAW 0 /* Raw representation: ptr holds an sds string */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk. NOTE(review): presumably these
 * opcodes appear where an object type byte would, which is why they sit at
 * the top of the byte range — confirm in rdbLoad(). */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
115
/* Defines related to the dump file format. Storing a full 32 bit length for
 * every short key would waste a lot of space, so the two most significant
 * bits of the first byte select how the length is encoded:
 *
 * 00|000000 => the length is the remaining 6 bits of this byte
 * 01|000000 00000000 => the length is 14 bits: the 6 bits of this byte
 *                       followed by the 8 bits of the next byte
 * 10|000000 [32 bit integer] => a full 32 bit length follows
 * 11|000000 => a specially encoded object follows. The remaining six bits
 *              specify the kind of object that follows.
 *              See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0   /* two MSB = 00 */
#define REDIS_RDB_14BITLEN 1  /* two MSB = 01 */
#define REDIS_RDB_32BITLEN 2  /* two MSB = 10 */
#define REDIS_RDB_ENCVAL 3    /* two MSB = 11 */
/* NOTE(review): presumably the error sentinel returned when a length cannot
 * be read — confirm against the RDB loading code. */
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
142
/* Client flags (bitmask stored in redisClient.flags) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side (server.replstate) */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of the master
 * (redisClient.replstate). Note that in SEND_BULK and ONLINE state the slave
 * receives new updates in its output queue. In the WAIT_BGSAVE states
 * instead the server is waiting for the next background save before it can
 * send updates to the slave. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave end to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels; redisLog() prints messages whose level is >= server.verbosity */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: marks a variable or parameter as intentionally unused.
 * The argument is parenthesized so the whole expression is cast to void
 * (the old ((void) V) form broke on arguments such as 'a = b'). */
#define REDIS_NOTUSED(V) ((void)(V))
183
184
185 /*================================= Data types ============================== */
186
/* A redis object, that is a type able to hold a string / list / set.
 * Objects are reference counted (see incrRefCount() / decrRefCount()). */
typedef struct redisObject {
    void *ptr;                /* payload; an sds string when encoding is RAW */
    unsigned char type;       /* REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
    unsigned char encoding;   /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2]; /* unused; presumably explicit padding — TODO confirm */
    int refcount;
} robj;

/* One logical database; server.db holds server.dbnum of them. */
typedef struct redisDb {
    dict *dict;     /* the keyspace */
    dict *expires;  /* keys with an expire set; values are expire times (time_t) */
    int id;         /* presumably the index of this DB in server.db — TODO confirm */
} redisDb;
201
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;            /* currently selected database */
    int dictid;             /* presumably the id of the selected DB — TODO confirm */
    sds querybuf;
    robj **argv, **mbargv;  /* mbargv/mbargc presumably used while parsing multi bulk requests */
    int argc, mbargc;
    int bulklen;            /* bulk read len. -1 if not in bulk read mode */
    int multibulk;          /* multi bulk command format active */
    list *reply;
    int sentlen;            /* presumably bytes of the current reply already written */
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags;              /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb;         /* slave selected db, if this client is a slave */
    int authenticated;      /* when requirepass is non-NULL */
    int replstate;          /* replication state if this is a slave */
    int repldbfd;           /* replication DB file descriptor */
    long repldboff;         /* replication DB file offset */
    off_t repldbsize;       /* replication DB file size */
} redisClient;
224
/* A save point: serverCron() starts a background save when at least
 * 'changes' modifications happened (server.dirty) and more than 'seconds'
 * seconds elapsed since the last successful save (server.lastsave). */
struct saveparam {
    time_t seconds;
    int changes;
};
229
/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db;                /* array of dbnum databases */
    dict *sharingpool;          /* presumably the pool used by object sharing (shareobjects) */
    unsigned int sharingpoolsize;
    long long dirty;            /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops;              /* number of times the cron function ran */
    list *objfreelist;          /* A list of freed objects to avoid malloc() */
    time_t lastsave;            /* Unix time of last successful save */
    size_t usedmemory;          /* Used memory in bytes (zmalloc_used_memory()), refreshed by serverCron() */
    /* Fields used only for stats */
    time_t stat_starttime;      /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity;              /* minimum level printed by redisLog() */
    int glueoutputbuf;
    int maxidletime;            /* client idle timeout in seconds, 0 = disabled */
    int dbnum;                  /* number of databases in 'db' */
    int daemonize;
    char *pidfile;
    int bgsaveinprogress;       /* non-zero while a background save child runs */
    pid_t bgsavechildpid;       /* pid of that child, -1 when none */
    struct saveparam *saveparams; /* save points checked by serverCron() */
    int saveparamslen;
    char *logfile;              /* NULL = log on standard output */
    char *bindaddr;
    char *dbfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;              /* slave side REDIS_REPL_* state */
    unsigned int maxclients;
    unsigned long maxmemory;    /* presumably 0 = no limit — see REDIS_CMD_DENYOOM */
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
280
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (cmdTable). */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;  /* argument count including the command name ("get" is 2);
                 * negative values presumably mean "at least -arity" */
    int flags;  /* REDIS_CMD_* bitmask */
};
288
/* Name/address pair. NOTE(review): presumably used to symbolize addresses
 * in crash/debug output — TODO confirm. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};

/* An element handled by SORT together with the key it is sorted by. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;   /* numeric sort key */
        robj *cmpobj;   /* presumably the key used for ALPHA / BY sorting */
    } u;
} redisSortObject;

/* An operation requested on SORT's output. */
typedef struct _redisSortOperation {
    int type;       /* REDIS_SORT_GET, REDIS_SORT_DEL, REDIS_SORT_INCR or REDIS_SORT_DECR */
    robj *pattern;
} redisSortOperation;
306
/* Common protocol replies and fragments, created once by
 * createSharedObjects() so they can be reused instead of being allocated
 * for every reply. */
struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
315
316 /*================================ Prototypes =============================== */
317
318 static void freeStringObject(robj *o);
319 static void freeListObject(robj *o);
320 static void freeSetObject(robj *o);
321 static void decrRefCount(void *o);
322 static robj *createObject(int type, void *ptr);
323 static void freeClient(redisClient *c);
324 static int rdbLoad(char *filename);
325 static void addReply(redisClient *c, robj *obj);
326 static void addReplySds(redisClient *c, sds s);
327 static void incrRefCount(robj *o);
328 static int rdbSaveBackground(char *filename);
329 static robj *createStringObject(char *ptr, size_t len);
330 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
331 static int syncWithMaster(void);
332 static robj *tryObjectSharing(robj *o);
333 static int tryObjectEncoding(robj *o);
334 static robj *getDecodedObject(const robj *o);
335 static int removeExpire(redisDb *db, robj *key);
336 static int expireIfNeeded(redisDb *db, robj *key);
337 static int deleteIfVolatile(redisDb *db, robj *key);
338 static int deleteKey(redisDb *db, robj *key);
339 static time_t getExpire(redisDb *db, robj *key);
340 static int setExpire(redisDb *db, robj *key, time_t when);
341 static void updateSlavesWaitingBgsave(int bgsaveerr);
342 static void freeMemoryIfNeeded(void);
343 static int processCommand(redisClient *c);
344 static void setupSigSegvAction(void);
345 static void rdbRemoveTempFile(pid_t childpid);
346 static size_t stringObjectLen(robj *o);
347
348 static void authCommand(redisClient *c);
349 static void pingCommand(redisClient *c);
350 static void echoCommand(redisClient *c);
351 static void setCommand(redisClient *c);
352 static void setnxCommand(redisClient *c);
353 static void getCommand(redisClient *c);
354 static void delCommand(redisClient *c);
355 static void existsCommand(redisClient *c);
356 static void incrCommand(redisClient *c);
357 static void decrCommand(redisClient *c);
358 static void incrbyCommand(redisClient *c);
359 static void decrbyCommand(redisClient *c);
360 static void selectCommand(redisClient *c);
361 static void randomkeyCommand(redisClient *c);
362 static void keysCommand(redisClient *c);
363 static void dbsizeCommand(redisClient *c);
364 static void lastsaveCommand(redisClient *c);
365 static void saveCommand(redisClient *c);
366 static void bgsaveCommand(redisClient *c);
367 static void shutdownCommand(redisClient *c);
368 static void moveCommand(redisClient *c);
369 static void renameCommand(redisClient *c);
370 static void renamenxCommand(redisClient *c);
371 static void lpushCommand(redisClient *c);
372 static void rpushCommand(redisClient *c);
373 static void lpopCommand(redisClient *c);
374 static void rpopCommand(redisClient *c);
375 static void llenCommand(redisClient *c);
376 static void lindexCommand(redisClient *c);
377 static void lrangeCommand(redisClient *c);
378 static void ltrimCommand(redisClient *c);
379 static void typeCommand(redisClient *c);
380 static void lsetCommand(redisClient *c);
381 static void saddCommand(redisClient *c);
382 static void sremCommand(redisClient *c);
383 static void smoveCommand(redisClient *c);
384 static void sismemberCommand(redisClient *c);
385 static void scardCommand(redisClient *c);
386 static void spopCommand(redisClient *c);
387 static void sinterCommand(redisClient *c);
388 static void sinterstoreCommand(redisClient *c);
389 static void sunionCommand(redisClient *c);
390 static void sunionstoreCommand(redisClient *c);
391 static void sdiffCommand(redisClient *c);
392 static void sdiffstoreCommand(redisClient *c);
393 static void syncCommand(redisClient *c);
394 static void flushdbCommand(redisClient *c);
395 static void flushallCommand(redisClient *c);
396 static void sortCommand(redisClient *c);
397 static void lremCommand(redisClient *c);
398 static void infoCommand(redisClient *c);
399 static void mgetCommand(redisClient *c);
400 static void monitorCommand(redisClient *c);
401 static void expireCommand(redisClient *c);
402 static void getSetCommand(redisClient *c);
403 static void ttlCommand(redisClient *c);
404 static void slaveofCommand(redisClient *c);
405 static void debugCommand(redisClient *c);
406 /*================================= Globals ================================= */
407
/* Global vars */
static struct redisServer server; /* server global state */
/* The command table: {name, implementation, arity, flags}. Arity counts the
 * command name itself; negative arities presumably mean "at least -arity"
 * arguments. The table is terminated by an all-NULL sentinel entry. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    /* SMEMBERS key is served by SINTER applied to a single set */
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0} /* sentinel */
};
472 /*============================ Utility functions ============================ */
473
/* Glob-style pattern matching.
 *
 * Returns 1 if 'string' (stringLen bytes) matches 'pattern' (patternLen
 * bytes), 0 otherwise. Neither buffer needs to be NUL-terminated: no byte
 * past the given lengths is ever read. Supported syntax:
 *   *       any sequence of characters, including the empty one
 *   ?       exactly one character
 *   [abc]   one character of the set; [^abc] negates it; [a-z] is a range
 *   \x      the literal character x (also inside [...])
 * When 'nocase' is non-zero characters are compared case-insensitively. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse a run of consecutive '*' into a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing '*' matches whatever is left */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A character class always consumes one character: it can
             * never match an exhausted string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the outer
                     * pattern++ consumes the last pattern character. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Compare bytes as unsigned: tolower() requires it and
                     * it gives ranges a plain 0..255 ordering. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match: nothing left to compare against */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Trailing '*' can match the empty remainder. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
596
597 static void redisLog(int level, const char *fmt, ...) {
598 va_list ap;
599 FILE *fp;
600
601 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
602 if (!fp) return;
603
604 va_start(ap, fmt);
605 if (level >= server.verbosity) {
606 char *c = ".-*";
607 char buf[64];
608 time_t now;
609
610 now = time(NULL);
611 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
612 fprintf(fp,"%s %c ",buf,c[level]);
613 vfprintf(fp, fmt, ap);
614 fprintf(fp,"\n");
615 fflush(fp);
616 }
617 va_end(ap);
618
619 if (server.logfile) fclose(fp);
620 }
621
622 /*====================== Hash table type implementation ==================== */
623
/* These are hash table types that use the SDS dynamic strings library for
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
627
628 static int sdsDictKeyCompare(void *privdata, const void *key1,
629 const void *key2)
630 {
631 int l1,l2;
632 DICT_NOTUSED(privdata);
633
634 l1 = sdslen((sds)key1);
635 l2 = sdslen((sds)key2);
636 if (l1 != l2) return 0;
637 return memcmp(key1, key2, l1) == 0;
638 }
639
640 static void dictRedisObjectDestructor(void *privdata, void *val)
641 {
642 DICT_NOTUSED(privdata);
643
644 decrRefCount(val);
645 }
646
647 static int dictObjKeyCompare(void *privdata, const void *key1,
648 const void *key2)
649 {
650 const robj *o1 = key1, *o2 = key2;
651 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
652 }
653
654 static unsigned int dictObjHash(const void *key) {
655 const robj *o = key;
656 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
657 }
658
659 static int dictEncObjKeyCompare(void *privdata, const void *key1,
660 const void *key2)
661 {
662 const robj *o1 = key1, *o2 = key2;
663
664 if (o1->encoding == REDIS_ENCODING_RAW &&
665 o2->encoding == REDIS_ENCODING_RAW)
666 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
667 else {
668 robj *dec1, *dec2;
669 int cmp;
670
671 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
672 getDecodedObject(o1) : (robj*)o1;
673 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
674 getDecodedObject(o2) : (robj*)o2;
675 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
676 if (dec1 != o1) decrRefCount(dec1);
677 if (dec2 != o2) decrRefCount(dec2);
678 return cmp;
679 }
680 }
681
682 static unsigned int dictEncObjHash(const void *key) {
683 const robj *o = key;
684
685 if (o->encoding == REDIS_ENCODING_RAW)
686 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
687 else {
688 robj *dec = getDecodedObject(o);
689 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
690 decrRefCount(dec);
691 return hash;
692 }
693 }
694
/* Set type: members are redis objects that may be encoded, so hashing and
 * comparison go through their decoded string form. The dict releases its
 * reference to a member when the entry is freed; no value destructor is
 * installed (presumably set entries carry no value). */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};

/* Hash type: keys and values are both redis objects whose references are
 * released when the entry is freed. Keys are hashed and compared by their
 * sds payload, so they are assumed to be raw-encoded — TODO confirm. */
static dictType hashDictType = {
    dictObjHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictObjKeyCompare,         /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor  /* val destructor */
};
712
713 /* ========================= Random utility functions ======================= */
714
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear it would even be
 * possible to report the condition to the client, since the networking
 * layer itself relies on heap allocation for send buffers. So we simply
 * print "<msg>: Out of memory" on stderr and abort(). At least the code
 * is simpler to read... */
static void oom(const char *msg) {
    fputs(msg, stderr);
    fputs(": Out of memory\n", stderr);
    fflush(stderr);
    sleep(1);
    abort();
}
726
727 /* ====================== Redis server networking stuff ===================== */
728 static void closeTimedoutClients(void) {
729 redisClient *c;
730 listNode *ln;
731 time_t now = time(NULL);
732
733 listRewind(server.clients);
734 while ((ln = listYield(server.clients)) != NULL) {
735 c = listNodeValue(ln);
736 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
737 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
738 (now - c->lastinteraction > server.maxidletime)) {
739 redisLog(REDIS_DEBUG,"Closing idle client");
740 freeClient(c);
741 }
742 }
743 }
744
745 static int htNeedsResize(dict *dict) {
746 long long size, used;
747
748 size = dictSlots(dict);
749 used = dictSize(dict);
750 return (size && used && size > DICT_HT_INITIAL_SIZE &&
751 (used*100/size < REDIS_HT_MINFILL));
752 }
753
754 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
755 * we resize the hash table to save memory */
756 static void tryResizeHashTables(void) {
757 int j;
758
759 for (j = 0; j < server.dbnum; j++) {
760 if (htNeedsResize(server.db[j].dict)) {
761 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
762 dictResize(server.db[j].dict);
763 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
764 }
765 if (htNeedsResize(server.db[j].expires))
766 dictResize(server.db[j].expires);
767 }
768 }
769
770 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
771 int j, loops = server.cronloops++;
772 REDIS_NOTUSED(eventLoop);
773 REDIS_NOTUSED(id);
774 REDIS_NOTUSED(clientData);
775
776 /* Update the global state with the amount of used memory */
777 server.usedmemory = zmalloc_used_memory();
778
779 /* Show some info about non-empty databases */
780 for (j = 0; j < server.dbnum; j++) {
781 long long size, used, vkeys;
782
783 size = dictSlots(server.db[j].dict);
784 used = dictSize(server.db[j].dict);
785 vkeys = dictSize(server.db[j].expires);
786 if (!(loops % 5) && (used || vkeys)) {
787 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
788 /* dictPrintStats(server.dict); */
789 }
790 }
791
792 /* We don't want to resize the hash tables while a bacground saving
793 * is in progress: the saving child is created using fork() that is
794 * implemented with a copy-on-write semantic in most modern systems, so
795 * if we resize the HT while there is the saving child at work actually
796 * a lot of memory movements in the parent will cause a lot of pages
797 * copied. */
798 if (!server.bgsaveinprogress) tryResizeHashTables();
799
800 /* Show information about connected clients */
801 if (!(loops % 5)) {
802 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
803 listLength(server.clients)-listLength(server.slaves),
804 listLength(server.slaves),
805 server.usedmemory,
806 dictSize(server.sharingpool));
807 }
808
809 /* Close connections of timedout clients */
810 if (server.maxidletime && !(loops % 10))
811 closeTimedoutClients();
812
813 /* Check if a background saving in progress terminated */
814 if (server.bgsaveinprogress) {
815 int statloc;
816 if (wait4(-1,&statloc,WNOHANG,NULL)) {
817 int exitcode = WEXITSTATUS(statloc);
818 int bysignal = WIFSIGNALED(statloc);
819
820 if (!bysignal && exitcode == 0) {
821 redisLog(REDIS_NOTICE,
822 "Background saving terminated with success");
823 server.dirty = 0;
824 server.lastsave = time(NULL);
825 } else if (!bysignal && exitcode != 0) {
826 redisLog(REDIS_WARNING, "Background saving error");
827 } else {
828 redisLog(REDIS_WARNING,
829 "Background saving terminated by signal");
830 rdbRemoveTempFile(server.bgsavechildpid);
831 }
832 server.bgsaveinprogress = 0;
833 server.bgsavechildpid = -1;
834 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
835 }
836 } else {
837 /* If there is not a background saving in progress check if
838 * we have to save now */
839 time_t now = time(NULL);
840 for (j = 0; j < server.saveparamslen; j++) {
841 struct saveparam *sp = server.saveparams+j;
842
843 if (server.dirty >= sp->changes &&
844 now-server.lastsave > sp->seconds) {
845 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
846 sp->changes, sp->seconds);
847 rdbSaveBackground(server.dbfilename);
848 break;
849 }
850 }
851 }
852
853 /* Try to expire a few timed out keys */
854 for (j = 0; j < server.dbnum; j++) {
855 redisDb *db = server.db+j;
856 int num = dictSize(db->expires);
857
858 if (num) {
859 time_t now = time(NULL);
860
861 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
862 num = REDIS_EXPIRELOOKUPS_PER_CRON;
863 while (num--) {
864 dictEntry *de;
865 time_t t;
866
867 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
868 t = (time_t) dictGetEntryVal(de);
869 if (now > t) {
870 deleteKey(db,dictGetEntryKey(de));
871 }
872 }
873 }
874 }
875
876 /* Check if we should connect to a MASTER */
877 if (server.replstate == REDIS_REPL_CONNECT) {
878 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
879 if (syncWithMaster() == REDIS_OK) {
880 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
881 }
882 }
883 return 1000;
884 }
885
886 static void createSharedObjects(void) {
887 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
888 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
889 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
890 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
891 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
892 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
893 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
894 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
895 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
896 /* no such key */
897 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
898 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
899 "-ERR Operation against a key holding the wrong kind of value\r\n"));
900 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
901 "-ERR no such key\r\n"));
902 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
903 "-ERR syntax error\r\n"));
904 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
905 "-ERR source and destination objects are the same\r\n"));
906 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
907 "-ERR index out of range\r\n"));
908 shared.space = createObject(REDIS_STRING,sdsnew(" "));
909 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
910 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
911 shared.select0 = createStringObject("select 0\r\n",10);
912 shared.select1 = createStringObject("select 1\r\n",10);
913 shared.select2 = createStringObject("select 2\r\n",10);
914 shared.select3 = createStringObject("select 3\r\n",10);
915 shared.select4 = createStringObject("select 4\r\n",10);
916 shared.select5 = createStringObject("select 5\r\n",10);
917 shared.select6 = createStringObject("select 6\r\n",10);
918 shared.select7 = createStringObject("select 7\r\n",10);
919 shared.select8 = createStringObject("select 8\r\n",10);
920 shared.select9 = createStringObject("select 9\r\n",10);
921 }
922
923 static void appendServerSaveParams(time_t seconds, int changes) {
924 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
925 if (server.saveparams == NULL) oom("appendServerSaveParams");
926 server.saveparams[server.saveparamslen].seconds = seconds;
927 server.saveparams[server.saveparamslen].changes = changes;
928 server.saveparamslen++;
929 }
930
931 static void ResetServerSaveParams() {
932 zfree(server.saveparams);
933 server.saveparams = NULL;
934 server.saveparamslen = 0;
935 }
936
937 static void initServerConfig() {
938 server.dbnum = REDIS_DEFAULT_DBNUM;
939 server.port = REDIS_SERVERPORT;
940 server.verbosity = REDIS_DEBUG;
941 server.maxidletime = REDIS_MAXIDLETIME;
942 server.saveparams = NULL;
943 server.logfile = NULL; /* NULL = log on standard output */
944 server.bindaddr = NULL;
945 server.glueoutputbuf = 1;
946 server.daemonize = 0;
947 server.pidfile = "/var/run/redis.pid";
948 server.dbfilename = "dump.rdb";
949 server.requirepass = NULL;
950 server.shareobjects = 0;
951 server.sharingpoolsize = 1024;
952 server.maxclients = 0;
953 server.maxmemory = 0;
954 ResetServerSaveParams();
955
956 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
957 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
958 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
959 /* Replication related */
960 server.isslave = 0;
961 server.masterhost = NULL;
962 server.masterport = 6379;
963 server.master = NULL;
964 server.replstate = REDIS_REPL_NONE;
965 }
966
967 static void initServer() {
968 int j;
969
970 signal(SIGHUP, SIG_IGN);
971 signal(SIGPIPE, SIG_IGN);
972 setupSigSegvAction();
973
974 server.clients = listCreate();
975 server.slaves = listCreate();
976 server.monitors = listCreate();
977 server.objfreelist = listCreate();
978 createSharedObjects();
979 server.el = aeCreateEventLoop();
980 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
981 server.sharingpool = dictCreate(&setDictType,NULL);
982 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
983 oom("server initialization"); /* Fatal OOM */
984 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
985 if (server.fd == -1) {
986 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
987 exit(1);
988 }
989 for (j = 0; j < server.dbnum; j++) {
990 server.db[j].dict = dictCreate(&hashDictType,NULL);
991 server.db[j].expires = dictCreate(&setDictType,NULL);
992 server.db[j].id = j;
993 }
994 server.cronloops = 0;
995 server.bgsaveinprogress = 0;
996 server.bgsavechildpid = -1;
997 server.lastsave = time(NULL);
998 server.dirty = 0;
999 server.usedmemory = 0;
1000 server.stat_numcommands = 0;
1001 server.stat_numconnections = 0;
1002 server.stat_starttime = time(NULL);
1003 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1004 }
1005
1006 /* Empty the whole database */
1007 static long long emptyDb() {
1008 int j;
1009 long long removed = 0;
1010
1011 for (j = 0; j < server.dbnum; j++) {
1012 removed += dictSize(server.db[j].dict);
1013 dictEmpty(server.db[j].dict);
1014 dictEmpty(server.db[j].expires);
1015 }
1016 return removed;
1017 }
1018
/* Convert a configuration boolean to an int, ignoring case:
 * "yes" -> 1, "no" -> 0, anything else -> -1. */
static int yesnotoi(char *s) {
    int result = -1;

    if (strcasecmp(s,"yes") == 0)
        result = 1;
    else if (strcasecmp(s,"no") == 0)
        result = 0;
    return result;
}
1024
1025 /* I agree, this is a very rudimental way to load a configuration...
1026 will improve later if the config gets more complex */
1027 static void loadServerConfig(char *filename) {
1028 FILE *fp;
1029 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1030 int linenum = 0;
1031 sds line = NULL;
1032
1033 if (filename[0] == '-' && filename[1] == '\0')
1034 fp = stdin;
1035 else {
1036 if ((fp = fopen(filename,"r")) == NULL) {
1037 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1038 exit(1);
1039 }
1040 }
1041
1042 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1043 sds *argv;
1044 int argc, j;
1045
1046 linenum++;
1047 line = sdsnew(buf);
1048 line = sdstrim(line," \t\r\n");
1049
1050 /* Skip comments and blank lines*/
1051 if (line[0] == '#' || line[0] == '\0') {
1052 sdsfree(line);
1053 continue;
1054 }
1055
1056 /* Split into arguments */
1057 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1058 sdstolower(argv[0]);
1059
1060 /* Execute config directives */
1061 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1062 server.maxidletime = atoi(argv[1]);
1063 if (server.maxidletime < 0) {
1064 err = "Invalid timeout value"; goto loaderr;
1065 }
1066 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1067 server.port = atoi(argv[1]);
1068 if (server.port < 1 || server.port > 65535) {
1069 err = "Invalid port"; goto loaderr;
1070 }
1071 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1072 server.bindaddr = zstrdup(argv[1]);
1073 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1074 int seconds = atoi(argv[1]);
1075 int changes = atoi(argv[2]);
1076 if (seconds < 1 || changes < 0) {
1077 err = "Invalid save parameters"; goto loaderr;
1078 }
1079 appendServerSaveParams(seconds,changes);
1080 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1081 if (chdir(argv[1]) == -1) {
1082 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1083 argv[1], strerror(errno));
1084 exit(1);
1085 }
1086 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1087 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1088 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1089 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1090 else {
1091 err = "Invalid log level. Must be one of debug, notice, warning";
1092 goto loaderr;
1093 }
1094 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1095 FILE *logfp;
1096
1097 server.logfile = zstrdup(argv[1]);
1098 if (!strcasecmp(server.logfile,"stdout")) {
1099 zfree(server.logfile);
1100 server.logfile = NULL;
1101 }
1102 if (server.logfile) {
1103 /* Test if we are able to open the file. The server will not
1104 * be able to abort just for this problem later... */
1105 logfp = fopen(server.logfile,"a");
1106 if (logfp == NULL) {
1107 err = sdscatprintf(sdsempty(),
1108 "Can't open the log file: %s", strerror(errno));
1109 goto loaderr;
1110 }
1111 fclose(logfp);
1112 }
1113 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1114 server.dbnum = atoi(argv[1]);
1115 if (server.dbnum < 1) {
1116 err = "Invalid number of databases"; goto loaderr;
1117 }
1118 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1119 server.maxclients = atoi(argv[1]);
1120 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1121 server.maxmemory = strtoll(argv[1], NULL, 10);
1122 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1123 server.masterhost = sdsnew(argv[1]);
1124 server.masterport = atoi(argv[2]);
1125 server.replstate = REDIS_REPL_CONNECT;
1126 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1127 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1128 err = "argument must be 'yes' or 'no'"; goto loaderr;
1129 }
1130 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1131 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1132 err = "argument must be 'yes' or 'no'"; goto loaderr;
1133 }
1134 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1135 server.sharingpoolsize = atoi(argv[1]);
1136 if (server.sharingpoolsize < 1) {
1137 err = "invalid object sharing pool size"; goto loaderr;
1138 }
1139 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1140 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1141 err = "argument must be 'yes' or 'no'"; goto loaderr;
1142 }
1143 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1144 server.requirepass = zstrdup(argv[1]);
1145 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1146 server.pidfile = zstrdup(argv[1]);
1147 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1148 server.dbfilename = zstrdup(argv[1]);
1149 } else {
1150 err = "Bad directive or wrong number of arguments"; goto loaderr;
1151 }
1152 for (j = 0; j < argc; j++)
1153 sdsfree(argv[j]);
1154 zfree(argv);
1155 sdsfree(line);
1156 }
1157 if (fp != stdin) fclose(fp);
1158 return;
1159
1160 loaderr:
1161 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1162 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1163 fprintf(stderr, ">>> '%s'\n", line);
1164 fprintf(stderr, "%s\n", err);
1165 exit(1);
1166 }
1167
1168 static void freeClientArgv(redisClient *c) {
1169 int j;
1170
1171 for (j = 0; j < c->argc; j++)
1172 decrRefCount(c->argv[j]);
1173 for (j = 0; j < c->mbargc; j++)
1174 decrRefCount(c->mbargv[j]);
1175 c->argc = 0;
1176 c->mbargc = 0;
1177 }
1178
1179 static void freeClient(redisClient *c) {
1180 listNode *ln;
1181
1182 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1183 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1184 sdsfree(c->querybuf);
1185 listRelease(c->reply);
1186 freeClientArgv(c);
1187 close(c->fd);
1188 ln = listSearchKey(server.clients,c);
1189 assert(ln != NULL);
1190 listDelNode(server.clients,ln);
1191 if (c->flags & REDIS_SLAVE) {
1192 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1193 close(c->repldbfd);
1194 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1195 ln = listSearchKey(l,c);
1196 assert(ln != NULL);
1197 listDelNode(l,ln);
1198 }
1199 if (c->flags & REDIS_MASTER) {
1200 server.master = NULL;
1201 server.replstate = REDIS_REPL_CONNECT;
1202 }
1203 zfree(c->argv);
1204 zfree(c->mbargv);
1205 zfree(c);
1206 }
1207
1208 static void glueReplyBuffersIfNeeded(redisClient *c) {
1209 int totlen = 0;
1210 listNode *ln;
1211 robj *o;
1212
1213 listRewind(c->reply);
1214 while((ln = listYield(c->reply))) {
1215 o = ln->value;
1216 totlen += sdslen(o->ptr);
1217 /* This optimization makes more sense if we don't have to copy
1218 * too much data */
1219 if (totlen > 1024) return;
1220 }
1221 if (totlen > 0) {
1222 char buf[1024];
1223 int copylen = 0;
1224
1225 listRewind(c->reply);
1226 while((ln = listYield(c->reply))) {
1227 o = ln->value;
1228 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1229 copylen += sdslen(o->ptr);
1230 listDelNode(c->reply,ln);
1231 }
1232 /* Now the output buffer is empty, add the new single element */
1233 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1234 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1235 }
1236 }
1237
1238 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1239 redisClient *c = privdata;
1240 int nwritten = 0, totwritten = 0, objlen;
1241 robj *o;
1242 REDIS_NOTUSED(el);
1243 REDIS_NOTUSED(mask);
1244
1245 if (server.glueoutputbuf && listLength(c->reply) > 1)
1246 glueReplyBuffersIfNeeded(c);
1247 while(listLength(c->reply)) {
1248 o = listNodeValue(listFirst(c->reply));
1249 objlen = sdslen(o->ptr);
1250
1251 if (objlen == 0) {
1252 listDelNode(c->reply,listFirst(c->reply));
1253 continue;
1254 }
1255
1256 if (c->flags & REDIS_MASTER) {
1257 /* Don't reply to a master */
1258 nwritten = objlen - c->sentlen;
1259 } else {
1260 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1261 if (nwritten <= 0) break;
1262 }
1263 c->sentlen += nwritten;
1264 totwritten += nwritten;
1265 /* If we fully sent the object on head go to the next one */
1266 if (c->sentlen == objlen) {
1267 listDelNode(c->reply,listFirst(c->reply));
1268 c->sentlen = 0;
1269 }
1270 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1271 * bytes, in a single threaded server it's a good idea to server
1272 * other clients as well, even if a very large request comes from
1273 * super fast link that is always able to accept data (in real world
1274 * terms think to 'KEYS *' against the loopback interfae) */
1275 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1276 }
1277 if (nwritten == -1) {
1278 if (errno == EAGAIN) {
1279 nwritten = 0;
1280 } else {
1281 redisLog(REDIS_DEBUG,
1282 "Error writing to client: %s", strerror(errno));
1283 freeClient(c);
1284 return;
1285 }
1286 }
1287 if (totwritten > 0) c->lastinteraction = time(NULL);
1288 if (listLength(c->reply) == 0) {
1289 c->sentlen = 0;
1290 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1291 }
1292 }
1293
1294 static struct redisCommand *lookupCommand(char *name) {
1295 int j = 0;
1296 while(cmdTable[j].name != NULL) {
1297 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1298 j++;
1299 }
1300 return NULL;
1301 }
1302
1303 /* resetClient prepare the client to process the next command */
1304 static void resetClient(redisClient *c) {
1305 freeClientArgv(c);
1306 c->bulklen = -1;
1307 c->multibulk = 0;
1308 }
1309
1310 /* If this function gets called we already read a whole
1311 * command, argments are in the client argv/argc fields.
1312 * processCommand() execute the command or prepare the
1313 * server for a bulk read from the client.
1314 *
1315 * If 1 is returned the client is still alive and valid and
1316 * and other operations can be performed by the caller. Otherwise
1317 * if 0 is returned the client was destroied (i.e. after QUIT). */
1318 static int processCommand(redisClient *c) {
1319 struct redisCommand *cmd;
1320 long long dirty;
1321
1322 /* Free some memory if needed (maxmemory setting) */
1323 if (server.maxmemory) freeMemoryIfNeeded();
1324
1325 /* Handle the multi bulk command type. This is an alternative protocol
1326 * supported by Redis in order to receive commands that are composed of
1327 * multiple binary-safe "bulk" arguments. The latency of processing is
1328 * a bit higher but this allows things like multi-sets, so if this
1329 * protocol is used only for MSET and similar commands this is a big win. */
1330 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1331 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1332 if (c->multibulk <= 0) {
1333 resetClient(c);
1334 return 1;
1335 } else {
1336 decrRefCount(c->argv[c->argc-1]);
1337 c->argc--;
1338 return 1;
1339 }
1340 } else if (c->multibulk) {
1341 if (c->bulklen == -1) {
1342 if (((char*)c->argv[0]->ptr)[0] != '$') {
1343 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1344 resetClient(c);
1345 return 1;
1346 } else {
1347 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1348 decrRefCount(c->argv[0]);
1349 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1350 c->argc--;
1351 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1352 resetClient(c);
1353 return 1;
1354 }
1355 c->argc--;
1356 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1357 return 1;
1358 }
1359 } else {
1360 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1361 c->mbargv[c->mbargc] = c->argv[0];
1362 c->mbargc++;
1363 c->argc--;
1364 c->multibulk--;
1365 if (c->multibulk == 0) {
1366 robj **auxargv;
1367 int auxargc;
1368
1369 /* Here we need to swap the multi-bulk argc/argv with the
1370 * normal argc/argv of the client structure. */
1371 auxargv = c->argv;
1372 c->argv = c->mbargv;
1373 c->mbargv = auxargv;
1374
1375 auxargc = c->argc;
1376 c->argc = c->mbargc;
1377 c->mbargc = auxargc;
1378
1379 /* We need to set bulklen to something different than -1
1380 * in order for the code below to process the command without
1381 * to try to read the last argument of a bulk command as
1382 * a special argument. */
1383 c->bulklen = 0;
1384 /* continue below and process the command */
1385 } else {
1386 c->bulklen = -1;
1387 return 1;
1388 }
1389 }
1390 }
1391 /* -- end of multi bulk commands processing -- */
1392
1393 /* The QUIT command is handled as a special case. Normal command
1394 * procs are unable to close the client connection safely */
1395 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1396 freeClient(c);
1397 return 0;
1398 }
1399 cmd = lookupCommand(c->argv[0]->ptr);
1400 if (!cmd) {
1401 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1402 resetClient(c);
1403 return 1;
1404 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1405 (c->argc < -cmd->arity)) {
1406 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1407 resetClient(c);
1408 return 1;
1409 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1410 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1411 resetClient(c);
1412 return 1;
1413 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1414 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1415
1416 decrRefCount(c->argv[c->argc-1]);
1417 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1418 c->argc--;
1419 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1420 resetClient(c);
1421 return 1;
1422 }
1423 c->argc--;
1424 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1425 /* It is possible that the bulk read is already in the
1426 * buffer. Check this condition and handle it accordingly */
1427 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1428 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1429 c->argc++;
1430 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1431 } else {
1432 return 1;
1433 }
1434 }
1435 /* Let's try to share objects on the command arguments vector */
1436 if (server.shareobjects) {
1437 int j;
1438 for(j = 1; j < c->argc; j++)
1439 c->argv[j] = tryObjectSharing(c->argv[j]);
1440 }
1441 /* Let's try to encode the bulk object to save space. */
1442 if (cmd->flags & REDIS_CMD_BULK)
1443 tryObjectEncoding(c->argv[c->argc-1]);
1444
1445 /* Check if the user is authenticated */
1446 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1447 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1448 resetClient(c);
1449 return 1;
1450 }
1451
1452 /* Exec the command */
1453 dirty = server.dirty;
1454 cmd->proc(c);
1455 if (server.dirty-dirty != 0 && listLength(server.slaves))
1456 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1457 if (listLength(server.monitors))
1458 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1459 server.stat_numcommands++;
1460
1461 /* Prepare the client for the next command */
1462 if (c->flags & REDIS_CLOSE) {
1463 freeClient(c);
1464 return 0;
1465 }
1466 resetClient(c);
1467 return 1;
1468 }
1469
1470 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1471 listNode *ln;
1472 int outc = 0, j;
1473 robj **outv;
1474 /* (args*2)+1 is enough room for args, spaces, newlines */
1475 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1476
1477 if (argc <= REDIS_STATIC_ARGS) {
1478 outv = static_outv;
1479 } else {
1480 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1481 if (!outv) oom("replicationFeedSlaves");
1482 }
1483
1484 for (j = 0; j < argc; j++) {
1485 if (j != 0) outv[outc++] = shared.space;
1486 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1487 robj *lenobj;
1488
1489 lenobj = createObject(REDIS_STRING,
1490 sdscatprintf(sdsempty(),"%d\r\n",
1491 stringObjectLen(argv[j])));
1492 lenobj->refcount = 0;
1493 outv[outc++] = lenobj;
1494 }
1495 outv[outc++] = argv[j];
1496 }
1497 outv[outc++] = shared.crlf;
1498
1499 /* Increment all the refcounts at start and decrement at end in order to
1500 * be sure to free objects if there is no slave in a replication state
1501 * able to be feed with commands */
1502 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1503 listRewind(slaves);
1504 while((ln = listYield(slaves))) {
1505 redisClient *slave = ln->value;
1506
1507 /* Don't feed slaves that are still waiting for BGSAVE to start */
1508 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1509
1510 /* Feed all the other slaves, MONITORs and so on */
1511 if (slave->slaveseldb != dictid) {
1512 robj *selectcmd;
1513
1514 switch(dictid) {
1515 case 0: selectcmd = shared.select0; break;
1516 case 1: selectcmd = shared.select1; break;
1517 case 2: selectcmd = shared.select2; break;
1518 case 3: selectcmd = shared.select3; break;
1519 case 4: selectcmd = shared.select4; break;
1520 case 5: selectcmd = shared.select5; break;
1521 case 6: selectcmd = shared.select6; break;
1522 case 7: selectcmd = shared.select7; break;
1523 case 8: selectcmd = shared.select8; break;
1524 case 9: selectcmd = shared.select9; break;
1525 default:
1526 selectcmd = createObject(REDIS_STRING,
1527 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1528 selectcmd->refcount = 0;
1529 break;
1530 }
1531 addReply(slave,selectcmd);
1532 slave->slaveseldb = dictid;
1533 }
1534 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1535 }
1536 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1537 if (outv != static_outv) zfree(outv);
1538 }
1539
1540 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1541 redisClient *c = (redisClient*) privdata;
1542 char buf[REDIS_IOBUF_LEN];
1543 int nread;
1544 REDIS_NOTUSED(el);
1545 REDIS_NOTUSED(mask);
1546
1547 nread = read(fd, buf, REDIS_IOBUF_LEN);
1548 if (nread == -1) {
1549 if (errno == EAGAIN) {
1550 nread = 0;
1551 } else {
1552 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1553 freeClient(c);
1554 return;
1555 }
1556 } else if (nread == 0) {
1557 redisLog(REDIS_DEBUG, "Client closed connection");
1558 freeClient(c);
1559 return;
1560 }
1561 if (nread) {
1562 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1563 c->lastinteraction = time(NULL);
1564 } else {
1565 return;
1566 }
1567
1568 again:
1569 if (c->bulklen == -1) {
1570 /* Read the first line of the query */
1571 char *p = strchr(c->querybuf,'\n');
1572 size_t querylen;
1573
1574 if (p) {
1575 sds query, *argv;
1576 int argc, j;
1577
1578 query = c->querybuf;
1579 c->querybuf = sdsempty();
1580 querylen = 1+(p-(query));
1581 if (sdslen(query) > querylen) {
1582 /* leave data after the first line of the query in the buffer */
1583 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1584 }
1585 *p = '\0'; /* remove "\n" */
1586 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1587 sdsupdatelen(query);
1588
1589 /* Now we can split the query in arguments */
1590 if (sdslen(query) == 0) {
1591 /* Ignore empty query */
1592 sdsfree(query);
1593 return;
1594 }
1595 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1596 if (argv == NULL) oom("sdssplitlen");
1597 sdsfree(query);
1598
1599 if (c->argv) zfree(c->argv);
1600 c->argv = zmalloc(sizeof(robj*)*argc);
1601 if (c->argv == NULL) oom("allocating arguments list for client");
1602
1603 for (j = 0; j < argc; j++) {
1604 if (sdslen(argv[j])) {
1605 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1606 c->argc++;
1607 } else {
1608 sdsfree(argv[j]);
1609 }
1610 }
1611 zfree(argv);
1612 /* Execute the command. If the client is still valid
1613 * after processCommand() return and there is something
1614 * on the query buffer try to process the next command. */
1615 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1616 return;
1617 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1618 redisLog(REDIS_DEBUG, "Client protocol error");
1619 freeClient(c);
1620 return;
1621 }
1622 } else {
1623 /* Bulk read handling. Note that if we are at this point
1624 the client already sent a command terminated with a newline,
1625 we are reading the bulk data that is actually the last
1626 argument of the command. */
1627 int qbl = sdslen(c->querybuf);
1628
1629 if (c->bulklen <= qbl) {
1630 /* Copy everything but the final CRLF as final argument */
1631 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1632 c->argc++;
1633 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1634 processCommand(c);
1635 return;
1636 }
1637 }
1638 }
1639
1640 static int selectDb(redisClient *c, int id) {
1641 if (id < 0 || id >= server.dbnum)
1642 return REDIS_ERR;
1643 c->db = &server.db[id];
1644 return REDIS_OK;
1645 }
1646
1647 static void *dupClientReplyValue(void *o) {
1648 incrRefCount((robj*)o);
1649 return 0;
1650 }
1651
1652 static redisClient *createClient(int fd) {
1653 redisClient *c = zmalloc(sizeof(*c));
1654
1655 anetNonBlock(NULL,fd);
1656 anetTcpNoDelay(NULL,fd);
1657 if (!c) return NULL;
1658 selectDb(c,0);
1659 c->fd = fd;
1660 c->querybuf = sdsempty();
1661 c->argc = 0;
1662 c->argv = NULL;
1663 c->bulklen = -1;
1664 c->multibulk = 0;
1665 c->mbargc = 0;
1666 c->mbargv = NULL;
1667 c->sentlen = 0;
1668 c->flags = 0;
1669 c->lastinteraction = time(NULL);
1670 c->authenticated = 0;
1671 c->replstate = REDIS_REPL_NONE;
1672 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1673 listSetFreeMethod(c->reply,decrRefCount);
1674 listSetDupMethod(c->reply,dupClientReplyValue);
1675 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1676 readQueryFromClient, c, NULL) == AE_ERR) {
1677 freeClient(c);
1678 return NULL;
1679 }
1680 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1681 return c;
1682 }
1683
1684 static void addReply(redisClient *c, robj *obj) {
1685 if (listLength(c->reply) == 0 &&
1686 (c->replstate == REDIS_REPL_NONE ||
1687 c->replstate == REDIS_REPL_ONLINE) &&
1688 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1689 sendReplyToClient, c, NULL) == AE_ERR) return;
1690 if (obj->encoding != REDIS_ENCODING_RAW) {
1691 obj = getDecodedObject(obj);
1692 } else {
1693 incrRefCount(obj);
1694 }
1695 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1696 }
1697
1698 static void addReplySds(redisClient *c, sds s) {
1699 robj *o = createObject(REDIS_STRING,s);
1700 addReply(c,o);
1701 decrRefCount(o);
1702 }
1703
1704 static void addReplyBulkLen(redisClient *c, robj *obj) {
1705 size_t len;
1706
1707 if (obj->encoding == REDIS_ENCODING_RAW) {
1708 len = sdslen(obj->ptr);
1709 } else {
1710 long n = (long)obj->ptr;
1711
1712 len = 1;
1713 if (n < 0) {
1714 len++;
1715 n = -n;
1716 }
1717 while((n = n/10) != 0) {
1718 len++;
1719 }
1720 }
1721 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1722 }
1723
1724 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1725 int cport, cfd;
1726 char cip[128];
1727 redisClient *c;
1728 REDIS_NOTUSED(el);
1729 REDIS_NOTUSED(mask);
1730 REDIS_NOTUSED(privdata);
1731
1732 cfd = anetAccept(server.neterr, fd, cip, &cport);
1733 if (cfd == AE_ERR) {
1734 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1735 return;
1736 }
1737 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1738 if ((c = createClient(cfd)) == NULL) {
1739 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1740 close(cfd); /* May be already closed, just ingore errors */
1741 return;
1742 }
1743 /* If maxclient directive is set and this is one client more... close the
1744 * connection. Note that we create the client instead to check before
1745 * for this condition, since now the socket is already set in nonblocking
1746 * mode and we can send an error for free using the Kernel I/O */
1747 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1748 char *err = "-ERR max number of clients reached\r\n";
1749
1750 /* That's a best effort error message, don't check write errors */
1751 (void) write(c->fd,err,strlen(err));
1752 freeClient(c);
1753 return;
1754 }
1755 server.stat_numconnections++;
1756 }
1757
1758 /* ======================= Redis objects implementation ===================== */
1759
1760 static robj *createObject(int type, void *ptr) {
1761 robj *o;
1762
1763 if (listLength(server.objfreelist)) {
1764 listNode *head = listFirst(server.objfreelist);
1765 o = listNodeValue(head);
1766 listDelNode(server.objfreelist,head);
1767 } else {
1768 o = zmalloc(sizeof(*o));
1769 }
1770 if (!o) oom("createObject");
1771 o->type = type;
1772 o->encoding = REDIS_ENCODING_RAW;
1773 o->ptr = ptr;
1774 o->refcount = 1;
1775 return o;
1776 }
1777
1778 static robj *createStringObject(char *ptr, size_t len) {
1779 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1780 }
1781
1782 static robj *createListObject(void) {
1783 list *l = listCreate();
1784
1785 if (!l) oom("listCreate");
1786 listSetFreeMethod(l,decrRefCount);
1787 return createObject(REDIS_LIST,l);
1788 }
1789
1790 static robj *createSetObject(void) {
1791 dict *d = dictCreate(&setDictType,NULL);
1792 if (!d) oom("dictCreate");
1793 return createObject(REDIS_SET,d);
1794 }
1795
1796 static void freeStringObject(robj *o) {
1797 if (o->encoding == REDIS_ENCODING_RAW) {
1798 sdsfree(o->ptr);
1799 }
1800 }
1801
1802 static void freeListObject(robj *o) {
1803 listRelease((list*) o->ptr);
1804 }
1805
1806 static void freeSetObject(robj *o) {
1807 dictRelease((dict*) o->ptr);
1808 }
1809
1810 static void freeHashObject(robj *o) {
1811 dictRelease((dict*) o->ptr);
1812 }
1813
1814 static void incrRefCount(robj *o) {
1815 o->refcount++;
1816 #ifdef DEBUG_REFCOUNT
1817 if (o->type == REDIS_STRING)
1818 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1819 #endif
1820 }
1821
1822 static void decrRefCount(void *obj) {
1823 robj *o = obj;
1824
1825 #ifdef DEBUG_REFCOUNT
1826 if (o->type == REDIS_STRING)
1827 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1828 #endif
1829 if (--(o->refcount) == 0) {
1830 switch(o->type) {
1831 case REDIS_STRING: freeStringObject(o); break;
1832 case REDIS_LIST: freeListObject(o); break;
1833 case REDIS_SET: freeSetObject(o); break;
1834 case REDIS_HASH: freeHashObject(o); break;
1835 default: assert(0 != 0); break;
1836 }
1837 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1838 !listAddNodeHead(server.objfreelist,o))
1839 zfree(o);
1840 }
1841 }
1842
1843 static robj *lookupKey(redisDb *db, robj *key) {
1844 dictEntry *de = dictFind(db->dict,key);
1845 return de ? dictGetEntryVal(de) : NULL;
1846 }
1847
1848 static robj *lookupKeyRead(redisDb *db, robj *key) {
1849 expireIfNeeded(db,key);
1850 return lookupKey(db,key);
1851 }
1852
1853 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1854 deleteIfVolatile(db,key);
1855 return lookupKey(db,key);
1856 }
1857
1858 static int deleteKey(redisDb *db, robj *key) {
1859 int retval;
1860
1861 /* We need to protect key from destruction: after the first dictDelete()
1862 * it may happen that 'key' is no longer valid if we don't increment
1863 * it's count. This may happen when we get the object reference directly
1864 * from the hash table with dictRandomKey() or dict iterators */
1865 incrRefCount(key);
1866 if (dictSize(db->expires)) dictDelete(db->expires,key);
1867 retval = dictDelete(db->dict,key);
1868 decrRefCount(key);
1869
1870 return retval == DICT_OK;
1871 }
1872
/* Try to share an object against the shared objects pool
 * (server.sharingpool).
 *
 * If an equal string object is already pooled, its usage counter is
 * incremented, 'o' is released with decrRefCount() and the pooled object
 * is returned with an extra reference taken for the caller. Otherwise 'o'
 * itself is returned, possibly after being inserted into the pool.
 *
 * 'o' is returned unchanged when it is NULL or when server.shareobjects is
 * 0. Only REDIS_STRING objects are accepted (asserted). */
static robj *tryObjectSharing(robj *o) {
    struct dictEntry *de;
    unsigned long c;

    if (o == NULL || server.shareobjects == 0) return o;

    assert(o->type == REDIS_STRING);
    de = dictFind(server.sharingpool,o);
    if (de) {
        robj *shared = dictGetEntryKey(de);

        /* The dict value is not a pointer: it is used as a plain usage
         * counter stored in the void* slot. */
        c = ((unsigned long) dictGetEntryVal(de))+1;
        dictGetEntryVal(de) = (void*) c;
        incrRefCount(shared);
        decrRefCount(o);
        return shared;
    } else {
        /* Here we are using a stream algorithm: every time an object is
         * shared we increment its count, every time there is a miss we
         * decrement the counter of a random object. If this object reaches
         * zero we remove the object and put the current object instead. */
        if (dictSize(server.sharingpool) >=
                server.sharingpoolsize) {
            de = dictGetRandomKey(server.sharingpool);
            assert(de != NULL);
            c = ((unsigned long) dictGetEntryVal(de))-1;
            dictGetEntryVal(de) = (void*) c;
            if (c == 0) {
                dictDelete(server.sharingpool,de->key);
            }
        } else {
            c = 0; /* The pool is not full yet: always add this object */
        }
        if (c == 0) {
            int retval;

            retval = dictAdd(server.sharingpool,o,(void*)1);
            assert(retval == DICT_OK);
            /* The pool now holds its own reference to 'o' */
            incrRefCount(o);
        }
        return o;
    }
}
1917
1918 /* Check if the nul-terminated string 's' can be represented by a long
1919 * (that is, is a number that fits into long without any other space or
1920 * character before or after the digits).
1921 *
1922 * If so, the function returns REDIS_OK and *longval is set to the value
1923 * of the number. Otherwise REDIS_ERR is returned */
1924 static int isStringRepresentableAsLong(char *s, long *longval) {
1925 char buf[32], *endptr;
1926 long value;
1927 int slen;
1928
1929 value = strtol(s, &endptr, 10);
1930 if (endptr[0] != '\0') return REDIS_ERR;
1931 slen = snprintf(buf,32,"%ld",value);
1932
1933 /* If the number converted back into a string is not identical
1934 * then it's not possible to encode the string as integer */
1935 if (strlen(buf) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
1936 if (longval) *longval = value;
1937 return REDIS_OK;
1938 }
1939
1940 /* Try to encode a string object in order to save space */
1941 static int tryObjectEncoding(robj *o) {
1942 long value;
1943 sds s = o->ptr;
1944
1945 if (o->encoding != REDIS_ENCODING_RAW)
1946 return REDIS_ERR; /* Already encoded */
1947
1948 /* It's not save to encode shared objects: shared objects can be shared
1949 * everywhere in the "object space" of Redis. Encoded objects can only
1950 * appear as "values" (and not, for instance, as keys) */
1951 if (o->refcount > 1) return REDIS_ERR;
1952
1953 /* Currently we try to encode only strings */
1954 assert(o->type == REDIS_STRING);
1955
1956 /* Check if we can represent this string as a long integer */
1957 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
1958
1959 /* Ok, this object can be encoded */
1960 o->encoding = REDIS_ENCODING_INT;
1961 sdsfree(o->ptr);
1962 o->ptr = (void*) value;
1963 return REDIS_OK;
1964 }
1965
1966 /* Get a decoded version of an encoded object (returned as a new object) */
1967 static robj *getDecodedObject(const robj *o) {
1968 robj *dec;
1969
1970 assert(o->encoding != REDIS_ENCODING_RAW);
1971 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
1972 char buf[32];
1973
1974 snprintf(buf,32,"%ld",(long)o->ptr);
1975 dec = createStringObject(buf,strlen(buf));
1976 return dec;
1977 } else {
1978 assert(1 != 1);
1979 }
1980 }
1981
1982 static int compareStringObjects(robj *a, robj *b) {
1983 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
1984
1985 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
1986 return (long)a->ptr - (long)b->ptr;
1987 } else {
1988 int retval;
1989
1990 incrRefCount(a);
1991 incrRefCount(b);
1992 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
1993 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
1994 retval = sdscmp(a->ptr,b->ptr);
1995 decrRefCount(a);
1996 decrRefCount(b);
1997 return retval;
1998 }
1999 }
2000
2001 static size_t stringObjectLen(robj *o) {
2002 assert(o->type == REDIS_STRING);
2003 if (o->encoding == REDIS_ENCODING_RAW) {
2004 return sdslen(o->ptr);
2005 } else {
2006 char buf[32];
2007
2008 return snprintf(buf,32,"%ld",(long)o->ptr);
2009 }
2010 }
2011
2012 /*============================ DB saving/loading ============================ */
2013
/* Write a single type/opcode byte to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 1) ? 0 : -1;
}
2018
/* Write 't' truncated to a 32 bit signed integer, in host byte order
 * (4 bytes). Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    if (fwrite(&t32,sizeof(t32),1,fp) != 1) return -1;
    return 0;
}
2024
2025 /* check rdbLoadLen() comments for more info */
2026 static int rdbSaveLen(FILE *fp, uint32_t len) {
2027 unsigned char buf[2];
2028
2029 if (len < (1<<6)) {
2030 /* Save a 6 bit len */
2031 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2032 if (fwrite(buf,1,1,fp) == 0) return -1;
2033 } else if (len < (1<<14)) {
2034 /* Save a 14 bit len */
2035 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2036 buf[1] = len&0xFF;
2037 if (fwrite(buf,2,1,fp) == 0) return -1;
2038 } else {
2039 /* Save a 32 bit len */
2040 buf[0] = (REDIS_RDB_32BITLEN<<6);
2041 if (fwrite(buf,1,1,fp) == 0) return -1;
2042 len = htonl(len);
2043 if (fwrite(&len,4,1,fp) == 0) return -1;
2044 }
2045 return 0;
2046 }
2047
2048 /* String objects in the form "2391" "-100" without any space and with a
2049 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2050 * encoded as integers to save space */
2051 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2052 long long value;
2053 char *endptr, buf[32];
2054
2055 /* Check if it's possible to encode this value as a number */
2056 value = strtoll(s, &endptr, 10);
2057 if (endptr[0] != '\0') return 0;
2058 snprintf(buf,32,"%lld",value);
2059
2060 /* If the number converted back into a string is not identical
2061 * then it's not possible to encode the string as integer */
2062 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2063
2064 /* Finally check if it fits in our ranges */
2065 if (value >= -(1<<7) && value <= (1<<7)-1) {
2066 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2067 enc[1] = value&0xFF;
2068 return 2;
2069 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2070 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2071 enc[1] = value&0xFF;
2072 enc[2] = (value>>8)&0xFF;
2073 return 3;
2074 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2075 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2076 enc[1] = value&0xFF;
2077 enc[2] = (value>>8)&0xFF;
2078 enc[3] = (value>>16)&0xFF;
2079 enc[4] = (value>>24)&0xFF;
2080 return 5;
2081 } else {
2082 return 0;
2083 }
2084 }
2085
2086 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2087 unsigned int comprlen, outlen;
2088 unsigned char byte;
2089 void *out;
2090
2091 /* We require at least four bytes compression for this to be worth it */
2092 outlen = sdslen(obj->ptr)-4;
2093 if (outlen <= 0) return 0;
2094 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2095 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2096 if (comprlen == 0) {
2097 zfree(out);
2098 return 0;
2099 }
2100 /* Data compressed! Let's save it on disk */
2101 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2102 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2103 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2104 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2105 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2106 zfree(out);
2107 return comprlen;
2108
2109 writeerr:
2110 zfree(out);
2111 return -1;
2112 }
2113
2114 /* Save a string objet as [len][data] on disk. If the object is a string
2115 * representation of an integer value we try to safe it in a special form */
2116 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2117 size_t len;
2118 int enclen;
2119
2120 len = sdslen(obj->ptr);
2121
2122 /* Try integer encoding */
2123 if (len <= 11) {
2124 unsigned char buf[5];
2125 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2126 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2127 return 0;
2128 }
2129 }
2130
2131 /* Try LZF compression - under 20 bytes it's unable to compress even
2132 * aaaaaaaaaaaaaaaaaa so skip it */
2133 if (len > 20) {
2134 int retval;
2135
2136 retval = rdbSaveLzfStringObject(fp,obj);
2137 if (retval == -1) return -1;
2138 if (retval > 0) return 0;
2139 /* retval == 0 means data can't be compressed, save the old way */
2140 }
2141
2142 /* Store verbatim */
2143 if (rdbSaveLen(fp,len) == -1) return -1;
2144 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2145 return 0;
2146 }
2147
2148 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2149 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2150 int retval;
2151 robj *dec;
2152
2153 if (obj->encoding != REDIS_ENCODING_RAW) {
2154 dec = getDecodedObject(obj);
2155 retval = rdbSaveStringObjectRaw(fp,dec);
2156 decrRefCount(dec);
2157 return retval;
2158 } else {
2159 return rdbSaveStringObjectRaw(fp,obj);
2160 }
2161 }
2162
2163 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2164 static int rdbSave(char *filename) {
2165 dictIterator *di = NULL;
2166 dictEntry *de;
2167 FILE *fp;
2168 char tmpfile[256];
2169 int j;
2170 time_t now = time(NULL);
2171
2172 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2173 fp = fopen(tmpfile,"w");
2174 if (!fp) {
2175 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2176 return REDIS_ERR;
2177 }
2178 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2179 for (j = 0; j < server.dbnum; j++) {
2180 redisDb *db = server.db+j;
2181 dict *d = db->dict;
2182 if (dictSize(d) == 0) continue;
2183 di = dictGetIterator(d);
2184 if (!di) {
2185 fclose(fp);
2186 return REDIS_ERR;
2187 }
2188
2189 /* Write the SELECT DB opcode */
2190 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2191 if (rdbSaveLen(fp,j) == -1) goto werr;
2192
2193 /* Iterate this DB writing every entry */
2194 while((de = dictNext(di)) != NULL) {
2195 robj *key = dictGetEntryKey(de);
2196 robj *o = dictGetEntryVal(de);
2197 time_t expiretime = getExpire(db,key);
2198
2199 /* Save the expire time */
2200 if (expiretime != -1) {
2201 /* If this key is already expired skip it */
2202 if (expiretime < now) continue;
2203 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2204 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2205 }
2206 /* Save the key and associated value */
2207 if (rdbSaveType(fp,o->type) == -1) goto werr;
2208 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2209 if (o->type == REDIS_STRING) {
2210 /* Save a string value */
2211 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2212 } else if (o->type == REDIS_LIST) {
2213 /* Save a list value */
2214 list *list = o->ptr;
2215 listNode *ln;
2216
2217 listRewind(list);
2218 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2219 while((ln = listYield(list))) {
2220 robj *eleobj = listNodeValue(ln);
2221
2222 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2223 }
2224 } else if (o->type == REDIS_SET) {
2225 /* Save a set value */
2226 dict *set = o->ptr;
2227 dictIterator *di = dictGetIterator(set);
2228 dictEntry *de;
2229
2230 if (!set) oom("dictGetIteraotr");
2231 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2232 while((de = dictNext(di)) != NULL) {
2233 robj *eleobj = dictGetEntryKey(de);
2234
2235 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2236 }
2237 dictReleaseIterator(di);
2238 } else {
2239 assert(0 != 0);
2240 }
2241 }
2242 dictReleaseIterator(di);
2243 }
2244 /* EOF opcode */
2245 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2246
2247 /* Make sure data will not remain on the OS's output buffers */
2248 fflush(fp);
2249 fsync(fileno(fp));
2250 fclose(fp);
2251
2252 /* Use RENAME to make sure the DB file is changed atomically only
2253 * if the generate DB file is ok. */
2254 if (rename(tmpfile,filename) == -1) {
2255 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2256 unlink(tmpfile);
2257 return REDIS_ERR;
2258 }
2259 redisLog(REDIS_NOTICE,"DB saved on disk");
2260 server.dirty = 0;
2261 server.lastsave = time(NULL);
2262 return REDIS_OK;
2263
2264 werr:
2265 fclose(fp);
2266 unlink(tmpfile);
2267 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2268 if (di) dictReleaseIterator(di);
2269 return REDIS_ERR;
2270 }
2271
2272 static int rdbSaveBackground(char *filename) {
2273 pid_t childpid;
2274
2275 if (server.bgsaveinprogress) return REDIS_ERR;
2276 if ((childpid = fork()) == 0) {
2277 /* Child */
2278 close(server.fd);
2279 if (rdbSave(filename) == REDIS_OK) {
2280 exit(0);
2281 } else {
2282 exit(1);
2283 }
2284 } else {
2285 /* Parent */
2286 if (childpid == -1) {
2287 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2288 strerror(errno));
2289 return REDIS_ERR;
2290 }
2291 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2292 server.bgsaveinprogress = 1;
2293 server.bgsavechildpid = childpid;
2294 return REDIS_OK;
2295 }
2296 return REDIS_OK; /* unreached */
2297 }
2298
/* Remove the "temp-<pid>.rdb" file that the saving child 'childpid' may
 * have left behind. The result of unlink() is deliberately ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int)childpid);
    (void) unlink(path);
}
2305
/* Read one type/opcode byte. Returns its value (0-255), or -1 on EOF or
 * read error. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 1) ? type : -1;
}
2311
/* Read a 32 bit signed time value as written by rdbSaveTime() (host byte
 * order). Returns -1 on EOF or short read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    if (fread(&t32,sizeof(t32),1,fp) != 1) return -1;
    return (time_t) t32;
}
2317
2318 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2319 * of this file for a description of how this are stored on disk.
2320 *
2321 * isencoded is set to 1 if the readed length is not actually a length but
2322 * an "encoding type", check the above comments for more info */
2323 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2324 unsigned char buf[2];
2325 uint32_t len;
2326
2327 if (isencoded) *isencoded = 0;
2328 if (rdbver == 0) {
2329 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2330 return ntohl(len);
2331 } else {
2332 int type;
2333
2334 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2335 type = (buf[0]&0xC0)>>6;
2336 if (type == REDIS_RDB_6BITLEN) {
2337 /* Read a 6 bit len */
2338 return buf[0]&0x3F;
2339 } else if (type == REDIS_RDB_ENCVAL) {
2340 /* Read a 6 bit len encoding type */
2341 if (isencoded) *isencoded = 1;
2342 return buf[0]&0x3F;
2343 } else if (type == REDIS_RDB_14BITLEN) {
2344 /* Read a 14 bit len */
2345 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2346 return ((buf[0]&0x3F)<<8)|buf[1];
2347 } else {
2348 /* Read a 32 bit len */
2349 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2350 return ntohl(len);
2351 }
2352 }
2353 }
2354
2355 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2356 unsigned char enc[4];
2357 long long val;
2358
2359 if (enctype == REDIS_RDB_ENC_INT8) {
2360 if (fread(enc,1,1,fp) == 0) return NULL;
2361 val = (signed char)enc[0];
2362 } else if (enctype == REDIS_RDB_ENC_INT16) {
2363 uint16_t v;
2364 if (fread(enc,2,1,fp) == 0) return NULL;
2365 v = enc[0]|(enc[1]<<8);
2366 val = (int16_t)v;
2367 } else if (enctype == REDIS_RDB_ENC_INT32) {
2368 uint32_t v;
2369 if (fread(enc,4,1,fp) == 0) return NULL;
2370 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2371 val = (int32_t)v;
2372 } else {
2373 val = 0; /* anti-warning */
2374 assert(0!=0);
2375 }
2376 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2377 }
2378
2379 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2380 unsigned int len, clen;
2381 unsigned char *c = NULL;
2382 sds val = NULL;
2383
2384 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2385 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2386 if ((c = zmalloc(clen)) == NULL) goto err;
2387 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2388 if (fread(c,clen,1,fp) == 0) goto err;
2389 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2390 zfree(c);
2391 return createObject(REDIS_STRING,val);
2392 err:
2393 zfree(c);
2394 sdsfree(val);
2395 return NULL;
2396 }
2397
2398 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2399 int isencoded;
2400 uint32_t len;
2401 sds val;
2402
2403 len = rdbLoadLen(fp,rdbver,&isencoded);
2404 if (isencoded) {
2405 switch(len) {
2406 case REDIS_RDB_ENC_INT8:
2407 case REDIS_RDB_ENC_INT16:
2408 case REDIS_RDB_ENC_INT32:
2409 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2410 case REDIS_RDB_ENC_LZF:
2411 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2412 default:
2413 assert(0!=0);
2414 }
2415 }
2416
2417 if (len == REDIS_RDB_LENERR) return NULL;
2418 val = sdsnewlen(NULL,len);
2419 if (len && fread(val,len,1,fp) == 0) {
2420 sdsfree(val);
2421 return NULL;
2422 }
2423 return tryObjectSharing(createObject(REDIS_STRING,val));
2424 }
2425
/* Load the DB dump 'filename' into server.db[].
 *
 * Returns REDIS_ERR if the file can't be opened, does not start with the
 * "REDIS" signature, or has a format version greater than 1.
 *
 * Once the file is open, any read failure (including a truncated header),
 * a SELECTDB index >= server.dbnum, or a duplicated key is fatal and
 * terminates the process with exit(1).
 *
 * Keys whose expire time is already in the past are loaded and then deleted
 * right away. Returns REDIS_OK on success. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;     /* entries go to DB 0 until a SELECTDB */
    redisDb *db = server.db+0;
    char buf[1024];
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* Header: "REDIS" followed by a 4 character format version */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value. tryObjectEncoding() leaves objects that
             * came back shared (refcount > 1) unencoded. */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    if (!listAddNodeTail((list*)o->ptr,ele))
                        oom("listAddNodeTail");
                } else {
                    /* NOTE(review): dictAdd() also returns DICT_ERR when the
                     * member is already present (see setGenericCommand), so
                     * a duplicated set member in the file is reported here
                     * as an out of memory condition. */
                    if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
                        oom("dictAdd");
                }
            }
        } else {
            assert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            expiretime = -1;
        }
        /* Both objects now belong to the DB dict: forget our pointers so
         * the error path below does not release them. */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2529
2530 /*================================== Commands =============================== */
2531
2532 static void authCommand(redisClient *c) {
2533 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2534 c->authenticated = 1;
2535 addReply(c,shared.ok);
2536 } else {
2537 c->authenticated = 0;
2538 addReply(c,shared.err);
2539 }
2540 }
2541
2542 static void pingCommand(redisClient *c) {
2543 addReply(c,shared.pong);
2544 }
2545
2546 static void echoCommand(redisClient *c) {
2547 addReplyBulkLen(c,c->argv[1]);
2548 addReply(c,c->argv[1]);
2549 addReply(c,shared.crlf);
2550 }
2551
2552 /*=================================== Strings =============================== */
2553
2554 static void setGenericCommand(redisClient *c, int nx) {
2555 int retval;
2556
2557 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2558 if (retval == DICT_ERR) {
2559 if (!nx) {
2560 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2561 incrRefCount(c->argv[2]);
2562 } else {
2563 addReply(c,shared.czero);
2564 return;
2565 }
2566 } else {
2567 incrRefCount(c->argv[1]);
2568 incrRefCount(c->argv[2]);
2569 }
2570 server.dirty++;
2571 removeExpire(c->db,c->argv[1]);
2572 addReply(c, nx ? shared.cone : shared.ok);
2573 }
2574
2575 static void setCommand(redisClient *c) {
2576 setGenericCommand(c,0);
2577 }
2578
2579 static void setnxCommand(redisClient *c) {
2580 setGenericCommand(c,1);
2581 }
2582
2583 static void getCommand(redisClient *c) {
2584 robj *o = lookupKeyRead(c->db,c->argv[1]);
2585
2586 if (o == NULL) {
2587 addReply(c,shared.nullbulk);
2588 } else {
2589 if (o->type != REDIS_STRING) {
2590 addReply(c,shared.wrongtypeerr);
2591 } else {
2592 addReplyBulkLen(c,o);
2593 addReply(c,o);
2594 addReply(c,shared.crlf);
2595 }
2596 }
2597 }
2598
2599 static void getSetCommand(redisClient *c) {
2600 getCommand(c);
2601 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2602 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2603 } else {
2604 incrRefCount(c->argv[1]);
2605 }
2606 incrRefCount(c->argv[2]);
2607 server.dirty++;
2608 removeExpire(c->db,c->argv[1]);
2609 }
2610
2611 static void mgetCommand(redisClient *c) {
2612 int j;
2613
2614 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2615 for (j = 1; j < c->argc; j++) {
2616 robj *o = lookupKeyRead(c->db,c->argv[j]);
2617 if (o == NULL) {
2618 addReply(c,shared.nullbulk);
2619 } else {
2620 if (o->type != REDIS_STRING) {
2621 addReply(c,shared.nullbulk);
2622 } else {
2623 addReplyBulkLen(c,o);
2624 addReply(c,o);
2625 addReply(c,shared.crlf);
2626 }
2627 }
2628 }
2629 }
2630
2631 static void incrDecrCommand(redisClient *c, long long incr) {
2632 long long value;
2633 int retval;
2634 robj *o;
2635
2636 o = lookupKeyWrite(c->db,c->argv[1]);
2637 if (o == NULL) {
2638 value = 0;
2639 } else {
2640 if (o->type != REDIS_STRING) {
2641 value = 0;
2642 } else {
2643 char *eptr;
2644
2645 if (o->encoding == REDIS_ENCODING_RAW)
2646 value = strtoll(o->ptr, &eptr, 10);
2647 else if (o->encoding == REDIS_ENCODING_INT)
2648 value = (long)o->ptr;
2649 else
2650 assert(1 != 1);
2651 }
2652 }
2653
2654 value += incr;
2655 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2656 tryObjectEncoding(o);
2657 retval = dictAdd(c->db->dict,c->argv[1],o);
2658 if (retval == DICT_ERR) {
2659 dictReplace(c->db->dict,c->argv[1],o);
2660 removeExpire(c->db,c->argv[1]);
2661 } else {
2662 incrRefCount(c->argv[1]);
2663 }
2664 server.dirty++;
2665 addReply(c,shared.colon);
2666 addReply(c,o);
2667 addReply(c,shared.crlf);
2668 }
2669
2670 static void incrCommand(redisClient *c) {
2671 incrDecrCommand(c,1);
2672 }
2673
2674 static void decrCommand(redisClient *c) {
2675 incrDecrCommand(c,-1);
2676 }
2677
2678 static void incrbyCommand(redisClient *c) {
2679 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2680 incrDecrCommand(c,incr);
2681 }
2682
2683 static void decrbyCommand(redisClient *c) {
2684 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2685 incrDecrCommand(c,-incr);
2686 }
2687
2688 /* ========================= Type agnostic commands ========================= */
2689
2690 static void delCommand(redisClient *c) {
2691 int deleted = 0, j;
2692
2693 for (j = 1; j < c->argc; j++) {
2694 if (deleteKey(c->db,c->argv[j])) {
2695 server.dirty++;
2696 deleted++;
2697 }
2698 }
2699 switch(deleted) {
2700 case 0:
2701 addReply(c,shared.czero);
2702 break;
2703 case 1:
2704 addReply(c,shared.cone);
2705 break;
2706 default:
2707 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2708 break;
2709 }
2710 }
2711
2712 static void existsCommand(redisClient *c) {
2713 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2714 }
2715
2716 static void selectCommand(redisClient *c) {
2717 int id = atoi(c->argv[1]->ptr);
2718
2719 if (selectDb(c,id) == REDIS_ERR) {
2720 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2721 } else {
2722 addReply(c,shared.ok);
2723 }
2724 }
2725
2726 static void randomkeyCommand(redisClient *c) {
2727 dictEntry *de;
2728
2729 while(1) {
2730 de = dictGetRandomKey(c->db->dict);
2731 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2732 }
2733 if (de == NULL) {
2734 addReply(c,shared.plus);
2735 addReply(c,shared.crlf);
2736 } else {
2737 addReply(c,shared.plus);
2738 addReply(c,dictGetEntryKey(de));
2739 addReply(c,shared.crlf);
2740 }
2741 }
2742
2743 static void keysCommand(redisClient *c) {
2744 dictIterator *di;
2745 dictEntry *de;
2746 sds pattern = c->argv[1]->ptr;
2747 int plen = sdslen(pattern);
2748 int numkeys = 0, keyslen = 0;
2749 robj *lenobj = createObject(REDIS_STRING,NULL);
2750
2751 di = dictGetIterator(c->db->dict);
2752 if (!di) oom("dictGetIterator");
2753 addReply(c,lenobj);
2754 decrRefCount(lenobj);
2755 while((de = dictNext(di)) != NULL) {
2756 robj *keyobj = dictGetEntryKey(de);
2757
2758 sds key = keyobj->ptr;
2759 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2760 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2761 if (expireIfNeeded(c->db,keyobj) == 0) {
2762 if (numkeys != 0)
2763 addReply(c,shared.space);
2764 addReply(c,keyobj);
2765 numkeys++;
2766 keyslen += sdslen(key);
2767 }
2768 }
2769 }
2770 dictReleaseIterator(di);
2771 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2772 addReply(c,shared.crlf);
2773 }
2774
2775 static void dbsizeCommand(redisClient *c) {
2776 addReplySds(c,
2777 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2778 }
2779
2780 static void lastsaveCommand(redisClient *c) {
2781 addReplySds(c,
2782 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2783 }
2784
2785 static void typeCommand(redisClient *c) {
2786 robj *o;
2787 char *type;
2788
2789 o = lookupKeyRead(c->db,c->argv[1]);
2790 if (o == NULL) {
2791 type = "+none";
2792 } else {
2793 switch(o->type) {
2794 case REDIS_STRING: type = "+string"; break;
2795 case REDIS_LIST: type = "+list"; break;
2796 case REDIS_SET: type = "+set"; break;
2797 default: type = "unknown"; break;
2798 }
2799 }
2800 addReplySds(c,sdsnew(type));
2801 addReply(c,shared.crlf);
2802 }
2803
2804 static void saveCommand(redisClient *c) {
2805 if (server.bgsaveinprogress) {
2806 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2807 return;
2808 }
2809 if (rdbSave(server.dbfilename) == REDIS_OK) {
2810 addReply(c,shared.ok);
2811 } else {
2812 addReply(c,shared.err);
2813 }
2814 }
2815
2816 static void bgsaveCommand(redisClient *c) {
2817 if (server.bgsaveinprogress) {
2818 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2819 return;
2820 }
2821 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2822 addReply(c,shared.ok);
2823 } else {
2824 addReply(c,shared.err);
2825 }
2826 }
2827
2828 static void shutdownCommand(redisClient *c) {
2829 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2830 /* Kill the saving child if there is a background saving in progress.
2831 We want to avoid race conditions, for instance our saving child may
2832 overwrite the synchronous saving did by SHUTDOWN. */
2833 if (server.bgsaveinprogress) {
2834 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2835 kill(server.bgsavechildpid,SIGKILL);
2836 rdbRemoveTempFile(server.bgsavechildpid);
2837 }
2838 /* SYNC SAVE */
2839 if (rdbSave(server.dbfilename) == REDIS_OK) {
2840 if (server.daemonize)
2841 unlink(server.pidfile);
2842 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2843 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2844 exit(1);
2845 } else {
2846 /* Ooops.. error saving! The best we can do is to continue operating.
2847 * Note that if there was a background saving process, in the next
2848 * cron() Redis will be notified that the background saving aborted,
2849 * handling special stuff like slaves pending for synchronization... */
2850 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2851 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2852 }
2853 }
2854
2855 static void renameGenericCommand(redisClient *c, int nx) {
2856 robj *o;
2857
2858 /* To use the same key as src and dst is probably an error */
2859 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2860 addReply(c,shared.sameobjecterr);
2861 return;
2862 }
2863
2864 o = lookupKeyWrite(c->db,c->argv[1]);
2865 if (o == NULL) {
2866 addReply(c,shared.nokeyerr);
2867 return;
2868 }
2869 incrRefCount(o);
2870 deleteIfVolatile(c->db,c->argv[2]);
2871 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2872 if (nx) {
2873 decrRefCount(o);
2874 addReply(c,shared.czero);
2875 return;
2876 }
2877 dictReplace(c->db->dict,c->argv[2],o);
2878 } else {
2879 incrRefCount(c->argv[2]);
2880 }
2881 deleteKey(c->db,c->argv[1]);
2882 server.dirty++;
2883 addReply(c,nx ? shared.cone : shared.ok);
2884 }
2885
2886 static void renameCommand(redisClient *c) {
2887 renameGenericCommand(c,0);
2888 }
2889
2890 static void renamenxCommand(redisClient *c) {
2891 renameGenericCommand(c,1);
2892 }
2893
2894 static void moveCommand(redisClient *c) {
2895 robj *o;
2896 redisDb *src, *dst;
2897 int srcid;
2898
2899 /* Obtain source and target DB pointers */
2900 src = c->db;
2901 srcid = c->db->id;
2902 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2903 addReply(c,shared.outofrangeerr);
2904 return;
2905 }
2906 dst = c->db;
2907 selectDb(c,srcid); /* Back to the source DB */
2908
2909 /* If the user is moving using as target the same
2910 * DB as the source DB it is probably an error. */
2911 if (src == dst) {
2912 addReply(c,shared.sameobjecterr);
2913 return;
2914 }
2915
2916 /* Check if the element exists and get a reference */
2917 o = lookupKeyWrite(c->db,c->argv[1]);
2918 if (!o) {
2919 addReply(c,shared.czero);
2920 return;
2921 }
2922
2923 /* Try to add the element to the target DB */
2924 deleteIfVolatile(dst,c->argv[1]);
2925 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2926 addReply(c,shared.czero);
2927 return;
2928 }
2929 incrRefCount(c->argv[1]);
2930 incrRefCount(o);
2931
2932 /* OK! key moved, free the entry in the source DB */
2933 deleteKey(src,c->argv[1]);
2934 server.dirty++;
2935 addReply(c,shared.cone);
2936 }
2937
2938 /* =================================== Lists ================================ */
2939 static void pushGenericCommand(redisClient *c, int where) {
2940 robj *lobj;
2941 list *list;
2942
2943 lobj = lookupKeyWrite(c->db,c->argv[1]);
2944 if (lobj == NULL) {
2945 lobj = createListObject();
2946 list = lobj->ptr;
2947 if (where == REDIS_HEAD) {
2948 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2949 } else {
2950 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2951 }
2952 dictAdd(c->db->dict,c->argv[1],lobj);
2953 incrRefCount(c->argv[1]);
2954 incrRefCount(c->argv[2]);
2955 } else {
2956 if (lobj->type != REDIS_LIST) {
2957 addReply(c,shared.wrongtypeerr);
2958 return;
2959 }
2960 list = lobj->ptr;
2961 if (where == REDIS_HEAD) {
2962 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2963 } else {
2964 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2965 }
2966 incrRefCount(c->argv[2]);
2967 }
2968 server.dirty++;
2969 addReply(c,shared.ok);
2970 }
2971
2972 static void lpushCommand(redisClient *c) {
2973 pushGenericCommand(c,REDIS_HEAD);
2974 }
2975
2976 static void rpushCommand(redisClient *c) {
2977 pushGenericCommand(c,REDIS_TAIL);
2978 }
2979
2980 static void llenCommand(redisClient *c) {
2981 robj *o;
2982 list *l;
2983
2984 o = lookupKeyRead(c->db,c->argv[1]);
2985 if (o == NULL) {
2986 addReply(c,shared.czero);
2987 return;
2988 } else {
2989 if (o->type != REDIS_LIST) {
2990 addReply(c,shared.wrongtypeerr);
2991 } else {
2992 l = o->ptr;
2993 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2994 }
2995 }
2996 }
2997
2998 static void lindexCommand(redisClient *c) {
2999 robj *o;
3000 int index = atoi(c->argv[2]->ptr);
3001
3002 o = lookupKeyRead(c->db,c->argv[1]);
3003 if (o == NULL) {
3004 addReply(c,shared.nullbulk);
3005 } else {
3006 if (o->type != REDIS_LIST) {
3007 addReply(c,shared.wrongtypeerr);
3008 } else {
3009 list *list = o->ptr;
3010 listNode *ln;
3011
3012 ln = listIndex(list, index);
3013 if (ln == NULL) {
3014 addReply(c,shared.nullbulk);
3015 } else {
3016 robj *ele = listNodeValue(ln);
3017 addReplyBulkLen(c,ele);
3018 addReply(c,ele);
3019 addReply(c,shared.crlf);
3020 }
3021 }
3022 }
3023 }
3024
3025 static void lsetCommand(redisClient *c) {
3026 robj *o;
3027 int index = atoi(c->argv[2]->ptr);
3028
3029 o = lookupKeyWrite(c->db,c->argv[1]);
3030 if (o == NULL) {
3031 addReply(c,shared.nokeyerr);
3032 } else {
3033 if (o->type != REDIS_LIST) {
3034 addReply(c,shared.wrongtypeerr);
3035 } else {
3036 list *list = o->ptr;
3037 listNode *ln;
3038
3039 ln = listIndex(list, index);
3040 if (ln == NULL) {
3041 addReply(c,shared.outofrangeerr);
3042 } else {
3043 robj *ele = listNodeValue(ln);
3044
3045 decrRefCount(ele);
3046 listNodeValue(ln) = c->argv[3];
3047 incrRefCount(c->argv[3]);
3048 addReply(c,shared.ok);
3049 server.dirty++;
3050 }
3051 }
3052 }
3053 }
3054
3055 static void popGenericCommand(redisClient *c, int where) {
3056 robj *o;
3057
3058 o = lookupKeyWrite(c->db,c->argv[1]);
3059 if (o == NULL) {
3060 addReply(c,shared.nullbulk);
3061 } else {
3062 if (o->type != REDIS_LIST) {
3063 addReply(c,shared.wrongtypeerr);
3064 } else {
3065 list *list = o->ptr;
3066 listNode *ln;
3067
3068 if (where == REDIS_HEAD)
3069 ln = listFirst(list);
3070 else
3071 ln = listLast(list);
3072
3073 if (ln == NULL) {
3074 addReply(c,shared.nullbulk);
3075 } else {
3076 robj *ele = listNodeValue(ln);
3077 addReplyBulkLen(c,ele);
3078 addReply(c,ele);
3079 addReply(c,shared.crlf);
3080 listDelNode(list,ln);
3081 server.dirty++;
3082 }
3083 }
3084 }
3085 }
3086
3087 static void lpopCommand(redisClient *c) {
3088 popGenericCommand(c,REDIS_HEAD);
3089 }
3090
3091 static void rpopCommand(redisClient *c) {
3092 popGenericCommand(c,REDIS_TAIL);
3093 }
3094
3095 static void lrangeCommand(redisClient *c) {
3096 robj *o;
3097 int start = atoi(c->argv[2]->ptr);
3098 int end = atoi(c->argv[3]->ptr);
3099
3100 o = lookupKeyRead(c->db,c->argv[1]);
3101 if (o == NULL) {
3102 addReply(c,shared.nullmultibulk);
3103 } else {
3104 if (o->type != REDIS_LIST) {
3105 addReply(c,shared.wrongtypeerr);
3106 } else {
3107 list *list = o->ptr;
3108 listNode *ln;
3109 int llen = listLength(list);
3110 int rangelen, j;
3111 robj *ele;
3112
3113 /* convert negative indexes */
3114 if (start < 0) start = llen+start;
3115 if (end < 0) end = llen+end;
3116 if (start < 0) start = 0;
3117 if (end < 0) end = 0;
3118
3119 /* indexes sanity checks */
3120 if (start > end || start >= llen) {
3121 /* Out of range start or start > end result in empty list */
3122 addReply(c,shared.emptymultibulk);
3123 return;
3124 }
3125 if (end >= llen) end = llen-1;
3126 rangelen = (end-start)+1;
3127
3128 /* Return the result in form of a multi-bulk reply */
3129 ln = listIndex(list, start);
3130 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3131 for (j = 0; j < rangelen; j++) {
3132 ele = listNodeValue(ln);
3133 addReplyBulkLen(c,ele);
3134 addReply(c,ele);
3135 addReply(c,shared.crlf);
3136 ln = ln->next;
3137 }
3138 }
3139 }
3140 }
3141
3142 static void ltrimCommand(redisClient *c) {
3143 robj *o;
3144 int start = atoi(c->argv[2]->ptr);
3145 int end = atoi(c->argv[3]->ptr);
3146
3147 o = lookupKeyWrite(c->db,c->argv[1]);
3148 if (o == NULL) {
3149 addReply(c,shared.nokeyerr);
3150 } else {
3151 if (o->type != REDIS_LIST) {
3152 addReply(c,shared.wrongtypeerr);
3153 } else {
3154 list *list = o->ptr;
3155 listNode *ln;
3156 int llen = listLength(list);
3157 int j, ltrim, rtrim;
3158
3159 /* convert negative indexes */
3160 if (start < 0) start = llen+start;
3161 if (end < 0) end = llen+end;
3162 if (start < 0) start = 0;
3163 if (end < 0) end = 0;
3164
3165 /* indexes sanity checks */
3166 if (start > end || start >= llen) {
3167 /* Out of range start or start > end result in empty list */
3168 ltrim = llen;
3169 rtrim = 0;
3170 } else {
3171 if (end >= llen) end = llen-1;
3172 ltrim = start;
3173 rtrim = llen-end-1;
3174 }
3175
3176 /* Remove list elements to perform the trim */
3177 for (j = 0; j < ltrim; j++) {
3178 ln = listFirst(list);
3179 listDelNode(list,ln);
3180 }
3181 for (j = 0; j < rtrim; j++) {
3182 ln = listLast(list);
3183 listDelNode(list,ln);
3184 }
3185 server.dirty++;
3186 addReply(c,shared.ok);
3187 }
3188 }
3189 }
3190
3191 static void lremCommand(redisClient *c) {
3192 robj *o;
3193
3194 o = lookupKeyWrite(c->db,c->argv[1]);
3195 if (o == NULL) {
3196 addReply(c,shared.czero);
3197 } else {
3198 if (o->type != REDIS_LIST) {
3199 addReply(c,shared.wrongtypeerr);
3200 } else {
3201 list *list = o->ptr;
3202 listNode *ln, *next;
3203 int toremove = atoi(c->argv[2]->ptr);
3204 int removed = 0;
3205 int fromtail = 0;
3206
3207 if (toremove < 0) {
3208 toremove = -toremove;
3209 fromtail = 1;
3210 }
3211 ln = fromtail ? list->tail : list->head;
3212 while (ln) {
3213 robj *ele = listNodeValue(ln);
3214
3215 next = fromtail ? ln->prev : ln->next;
3216 if (compareStringObjects(ele,c->argv[3]) == 0) {
3217 listDelNode(list,ln);
3218 server.dirty++;
3219 removed++;
3220 if (toremove && removed == toremove) break;
3221 }
3222 ln = next;
3223 }
3224 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3225 }
3226 }
3227 }
3228
3229 /* ==================================== Sets ================================ */
3230
3231 static void saddCommand(redisClient *c) {
3232 robj *set;
3233
3234 set = lookupKeyWrite(c->db,c->argv[1]);
3235 if (set == NULL) {
3236 set = createSetObject();
3237 dictAdd(c->db->dict,c->argv[1],set);
3238 incrRefCount(c->argv[1]);
3239 } else {
3240 if (set->type != REDIS_SET) {
3241 addReply(c,shared.wrongtypeerr);
3242 return;
3243 }
3244 }
3245 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3246 incrRefCount(c->argv[2]);
3247 server.dirty++;
3248 addReply(c,shared.cone);
3249 } else {
3250 addReply(c,shared.czero);
3251 }
3252 }
3253
3254 static void sremCommand(redisClient *c) {
3255 robj *set;
3256
3257 set = lookupKeyWrite(c->db,c->argv[1]);
3258 if (set == NULL) {
3259 addReply(c,shared.czero);
3260 } else {
3261 if (set->type != REDIS_SET) {
3262 addReply(c,shared.wrongtypeerr);
3263 return;
3264 }
3265 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3266 server.dirty++;
3267 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3268 addReply(c,shared.cone);
3269 } else {
3270 addReply(c,shared.czero);
3271 }
3272 }
3273 }
3274
3275 static void smoveCommand(redisClient *c) {
3276 robj *srcset, *dstset;
3277
3278 srcset = lookupKeyWrite(c->db,c->argv[1]);
3279 dstset = lookupKeyWrite(c->db,c->argv[2]);
3280
3281 /* If the source key does not exist return 0, if it's of the wrong type
3282 * raise an error */
3283 if (srcset == NULL || srcset->type != REDIS_SET) {
3284 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3285 return;
3286 }
3287 /* Error if the destination key is not a set as well */
3288 if (dstset && dstset->type != REDIS_SET) {
3289 addReply(c,shared.wrongtypeerr);
3290 return;
3291 }
3292 /* Remove the element from the source set */
3293 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3294 /* Key not found in the src set! return zero */
3295 addReply(c,shared.czero);
3296 return;
3297 }
3298 server.dirty++;
3299 /* Add the element to the destination set */
3300 if (!dstset) {
3301 dstset = createSetObject();
3302 dictAdd(c->db->dict,c->argv[2],dstset);
3303 incrRefCount(c->argv[2]);
3304 }
3305 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3306 incrRefCount(c->argv[3]);
3307 addReply(c,shared.cone);
3308 }
3309
3310 static void sismemberCommand(redisClient *c) {
3311 robj *set;
3312
3313 set = lookupKeyRead(c->db,c->argv[1]);
3314 if (set == NULL) {
3315 addReply(c,shared.czero);
3316 } else {
3317 if (set->type != REDIS_SET) {
3318 addReply(c,shared.wrongtypeerr);
3319 return;
3320 }
3321 if (dictFind(set->ptr,c->argv[2]))
3322 addReply(c,shared.cone);
3323 else
3324 addReply(c,shared.czero);
3325 }
3326 }
3327
3328 static void scardCommand(redisClient *c) {
3329 robj *o;
3330 dict *s;
3331
3332 o = lookupKeyRead(c->db,c->argv[1]);
3333 if (o == NULL) {
3334 addReply(c,shared.czero);
3335 return;
3336 } else {
3337 if (o->type != REDIS_SET) {
3338 addReply(c,shared.wrongtypeerr);
3339 } else {
3340 s = o->ptr;
3341 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3342 dictSize(s)));
3343 }
3344 }
3345 }
3346
3347 static void spopCommand(redisClient *c) {
3348 robj *set;
3349 dictEntry *de;
3350
3351 set = lookupKeyWrite(c->db,c->argv[1]);
3352 if (set == NULL) {
3353 addReply(c,shared.nullbulk);
3354 } else {
3355 if (set->type != REDIS_SET) {
3356 addReply(c,shared.wrongtypeerr);
3357 return;
3358 }
3359 de = dictGetRandomKey(set->ptr);
3360 if (de == NULL) {
3361 addReply(c,shared.nullbulk);
3362 } else {
3363 robj *ele = dictGetEntryKey(de);
3364
3365 addReplyBulkLen(c,ele);
3366 addReply(c,ele);
3367 addReply(c,shared.crlf);
3368 dictDelete(set->ptr,ele);
3369 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3370 server.dirty++;
3371 }
3372 }
3373 }
3374
3375 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3376 dict **d1 = (void*) s1, **d2 = (void*) s2;
3377
3378 return dictSize(*d1)-dictSize(*d2);
3379 }
3380
3381 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3382 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3383 dictIterator *di;
3384 dictEntry *de;
3385 robj *lenobj = NULL, *dstset = NULL;
3386 int j, cardinality = 0;
3387
3388 if (!dv) oom("sinterGenericCommand");
3389 for (j = 0; j < setsnum; j++) {
3390 robj *setobj;
3391
3392 setobj = dstkey ?
3393 lookupKeyWrite(c->db,setskeys[j]) :
3394 lookupKeyRead(c->db,setskeys[j]);
3395 if (!setobj) {
3396 zfree(dv);
3397 if (dstkey) {
3398 deleteKey(c->db,dstkey);
3399 addReply(c,shared.ok);
3400 } else {
3401 addReply(c,shared.nullmultibulk);
3402 }
3403 return;
3404 }
3405 if (setobj->type != REDIS_SET) {
3406 zfree(dv);
3407 addReply(c,shared.wrongtypeerr);
3408 return;
3409 }
3410 dv[j] = setobj->ptr;
3411 }
3412 /* Sort sets from the smallest to largest, this will improve our
3413 * algorithm's performace */
3414 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3415
3416 /* The first thing we should output is the total number of elements...
3417 * since this is a multi-bulk write, but at this stage we don't know
3418 * the intersection set size, so we use a trick, append an empty object
3419 * to the output list and save the pointer to later modify it with the
3420 * right length */
3421 if (!dstkey) {
3422 lenobj = createObject(REDIS_STRING,NULL);
3423 addReply(c,lenobj);
3424 decrRefCount(lenobj);
3425 } else {
3426 /* If we have a target key where to store the resulting set
3427 * create this key with an empty set inside */
3428 dstset = createSetObject();
3429 }
3430
3431 /* Iterate all the elements of the first (smallest) set, and test
3432 * the element against all the other sets, if at least one set does
3433 * not include the element it is discarded */
3434 di = dictGetIterator(dv[0]);
3435 if (!di) oom("dictGetIterator");
3436
3437 while((de = dictNext(di)) != NULL) {
3438 robj *ele;
3439
3440 for (j = 1; j < setsnum; j++)
3441 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3442 if (j != setsnum)
3443 continue; /* at least one set does not contain the member */
3444 ele = dictGetEntryKey(de);
3445 if (!dstkey) {
3446 addReplyBulkLen(c,ele);
3447 addReply(c,ele);
3448 addReply(c,shared.crlf);
3449 cardinality++;
3450 } else {
3451 dictAdd(dstset->ptr,ele,NULL);
3452 incrRefCount(ele);
3453 }
3454 }
3455 dictReleaseIterator(di);
3456
3457 if (dstkey) {
3458 /* Store the resulting set into the target */
3459 deleteKey(c->db,dstkey);
3460 dictAdd(c->db->dict,dstkey,dstset);
3461 incrRefCount(dstkey);
3462 }
3463
3464 if (!dstkey) {
3465 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3466 } else {
3467 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3468 dictSize((dict*)dstset->ptr)));
3469 server.dirty++;
3470 }
3471 zfree(dv);
3472 }
3473
3474 static void sinterCommand(redisClient *c) {
3475 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3476 }
3477
3478 static void sinterstoreCommand(redisClient *c) {
3479 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3480 }
3481
/* Operation selectors for sunionDiffGenericCommand(). */
#define REDIS_OP_UNION 0    /* members found in at least one set */
#define REDIS_OP_DIFF  1    /* members of the first set not in any other */
3484
3485 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3486 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3487 dictIterator *di;
3488 dictEntry *de;
3489 robj *dstset = NULL;
3490 int j, cardinality = 0;
3491
3492 if (!dv) oom("sunionDiffGenericCommand");
3493 for (j = 0; j < setsnum; j++) {
3494 robj *setobj;
3495
3496 setobj = dstkey ?
3497 lookupKeyWrite(c->db,setskeys[j]) :
3498 lookupKeyRead(c->db,setskeys[j]);
3499 if (!setobj) {
3500 dv[j] = NULL;
3501 continue;
3502 }
3503 if (setobj->type != REDIS_SET) {
3504 zfree(dv);
3505 addReply(c,shared.wrongtypeerr);
3506 return;
3507 }
3508 dv[j] = setobj->ptr;
3509 }
3510
3511 /* We need a temp set object to store our union. If the dstkey
3512 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3513 * this set object will be the resulting object to set into the target key*/
3514 dstset = createSetObject();
3515
3516 /* Iterate all the elements of all the sets, add every element a single
3517 * time to the result set */
3518 for (j = 0; j < setsnum; j++) {
3519 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3520 if (!dv[j]) continue; /* non existing keys are like empty sets */
3521
3522 di = dictGetIterator(dv[j]);
3523 if (!di) oom("dictGetIterator");
3524
3525 while((de = dictNext(di)) != NULL) {
3526 robj *ele;
3527
3528 /* dictAdd will not add the same element multiple times */
3529 ele = dictGetEntryKey(de);
3530 if (op == REDIS_OP_UNION || j == 0) {
3531 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3532 incrRefCount(ele);
3533 cardinality++;
3534 }
3535 } else if (op == REDIS_OP_DIFF) {
3536 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3537 cardinality--;
3538 }
3539 }
3540 }
3541 dictReleaseIterator(di);
3542
3543 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3544 }
3545
3546 /* Output the content of the resulting set, if not in STORE mode */
3547 if (!dstkey) {
3548 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3549 di = dictGetIterator(dstset->ptr);
3550 if (!di) oom("dictGetIterator");
3551 while((de = dictNext(di)) != NULL) {
3552 robj *ele;
3553
3554 ele = dictGetEntryKey(de);
3555 addReplyBulkLen(c,ele);
3556 addReply(c,ele);
3557 addReply(c,shared.crlf);
3558 }
3559 dictReleaseIterator(di);
3560 } else {
3561 /* If we have a target key where to store the resulting set
3562 * create this key with the result set inside */
3563 deleteKey(c->db,dstkey);
3564 dictAdd(c->db->dict,dstkey,dstset);
3565 incrRefCount(dstkey);
3566 }
3567
3568 /* Cleanup */
3569 if (!dstkey) {
3570 decrRefCount(dstset);
3571 } else {
3572 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3573 dictSize((dict*)dstset->ptr)));
3574 server.dirty++;
3575 }
3576 zfree(dv);
3577 }
3578
3579 static void sunionCommand(redisClient *c) {
3580 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3581 }
3582
3583 static void sunionstoreCommand(redisClient *c) {
3584 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3585 }
3586
3587 static void sdiffCommand(redisClient *c) {
3588 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3589 }
3590
3591 static void sdiffstoreCommand(redisClient *c) {
3592 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3593 }
3594
3595 static void flushdbCommand(redisClient *c) {
3596 server.dirty += dictSize(c->db->dict);
3597 dictEmpty(c->db->dict);
3598 dictEmpty(c->db->expires);
3599 addReply(c,shared.ok);
3600 }
3601
3602 static void flushallCommand(redisClient *c) {
3603 server.dirty += emptyDb();
3604 addReply(c,shared.ok);
3605 rdbSave(server.dbfilename);
3606 server.dirty++;
3607 }
3608
3609 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3610 redisSortOperation *so = zmalloc(sizeof(*so));
3611 if (!so) oom("createSortOperation");
3612 so->type = type;
3613 so->pattern = pattern;
3614 return so;
3615 }
3616
3617 /* Return the value associated to the key with a name obtained
3618 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3619 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3620 char *p;
3621 sds spat, ssub;
3622 robj keyobj;
3623 int prefixlen, sublen, postfixlen;
3624 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3625 struct {
3626 int len;
3627 unsigned short free;
3628 unsigned short _len; /* not used here */
3629 char buf[REDIS_SORTKEY_MAX+1];
3630 } keyname;
3631
3632 if (subst->encoding == REDIS_ENCODING_RAW)
3633 incrRefCount(subst);
3634 else {
3635 subst = getDecodedObject(subst);
3636 }
3637
3638 spat = pattern->ptr;
3639 ssub = subst->ptr;
3640 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3641 p = strchr(spat,'*');
3642 if (!p) return NULL;
3643
3644 prefixlen = p-spat;
3645 sublen = sdslen(ssub);
3646 postfixlen = sdslen(spat)-(prefixlen+1);
3647 memcpy(keyname.buf,spat,prefixlen);
3648 memcpy(keyname.buf+prefixlen,ssub,sublen);
3649 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3650 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3651 keyname.len = prefixlen+sublen+postfixlen;
3652 keyname._len = USHRT_MAX;
3653
3654 keyobj.refcount = 1;
3655 keyobj.type = REDIS_STRING;
3656 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3657
3658 decrRefCount(subst);
3659
3660 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3661 return lookupKeyRead(db,&keyobj);
3662 }
3663
3664 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3665 * the additional parameter is not standard but a BSD-specific we have to
3666 * pass sorting parameters via the global 'server' structure */
3667 static int sortCompare(const void *s1, const void *s2) {
3668 const redisSortObject *so1 = s1, *so2 = s2;
3669 int cmp;
3670
3671 if (!server.sort_alpha) {
3672 /* Numeric sorting. Here it's trivial as we precomputed scores */
3673 if (so1->u.score > so2->u.score) {
3674 cmp = 1;
3675 } else if (so1->u.score < so2->u.score) {
3676 cmp = -1;
3677 } else {
3678 cmp = 0;
3679 }
3680 } else {
3681 /* Alphanumeric sorting */
3682 if (server.sort_bypattern) {
3683 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3684 /* At least one compare object is NULL */
3685 if (so1->u.cmpobj == so2->u.cmpobj)
3686 cmp = 0;
3687 else if (so1->u.cmpobj == NULL)
3688 cmp = -1;
3689 else
3690 cmp = 1;
3691 } else {
3692 /* We have both the objects, use strcoll */
3693 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3694 }
3695 } else {
3696 /* Compare elements directly */
3697 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
3698 so2->obj->encoding == REDIS_ENCODING_RAW) {
3699 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3700 } else {
3701 robj *dec1, *dec2;
3702
3703 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
3704 so1->obj : getDecodedObject(so1->obj);
3705 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
3706 so2->obj : getDecodedObject(so2->obj);
3707 cmp = strcoll(dec1->ptr,dec2->ptr);
3708 if (dec1 != so1->obj) decrRefCount(dec1);
3709 if (dec2 != so2->obj) decrRefCount(dec2);
3710 }
3711 }
3712 }
3713 return server.sort_desc ? -cmp : cmp;
3714 }
3715
3716 /* The SORT command is the most complex command in Redis. Warning: this code
3717 * is optimized for speed and a bit less for readability */
3718 static void sortCommand(redisClient *c) {
3719 list *operations;
3720 int outputlen = 0;
3721 int desc = 0, alpha = 0;
3722 int limit_start = 0, limit_count = -1, start, end;
3723 int j, dontsort = 0, vectorlen;
3724 int getop = 0; /* GET operation counter */
3725 robj *sortval, *sortby = NULL;
3726 redisSortObject *vector; /* Resulting vector to sort */
3727
3728 /* Lookup the key to sort. It must be of the right types */
3729 sortval = lookupKeyRead(c->db,c->argv[1]);
3730 if (sortval == NULL) {
3731 addReply(c,shared.nokeyerr);
3732 return;
3733 }
3734 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3735 addReply(c,shared.wrongtypeerr);
3736 return;
3737 }
3738
3739 /* Create a list of operations to perform for every sorted element.
3740 * Operations can be GET/DEL/INCR/DECR */
3741 operations = listCreate();
3742 listSetFreeMethod(operations,zfree);
3743 j = 2;
3744
3745 /* Now we need to protect sortval incrementing its count, in the future
3746 * SORT may have options able to overwrite/delete keys during the sorting
3747 * and the sorted key itself may get destroied */
3748 incrRefCount(sortval);
3749
3750 /* The SORT command has an SQL-alike syntax, parse it */
3751 while(j < c->argc) {
3752 int leftargs = c->argc-j-1;
3753 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3754 desc = 0;
3755 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3756 desc = 1;
3757 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3758 alpha = 1;
3759 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3760 limit_start = atoi(c->argv[j+1]->ptr);
3761 limit_count = atoi(c->argv[j+2]->ptr);
3762 j+=2;
3763 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3764 sortby = c->argv[j+1];
3765 /* If the BY pattern does not contain '*', i.e. it is constant,
3766 * we don't need to sort nor to lookup the weight keys. */
3767 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3768 j++;
3769 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3770 listAddNodeTail(operations,createSortOperation(
3771 REDIS_SORT_GET,c->argv[j+1]));
3772 getop++;
3773 j++;
3774 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3775 listAddNodeTail(operations,createSortOperation(
3776 REDIS_SORT_DEL,c->argv[j+1]));
3777 j++;
3778 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3779 listAddNodeTail(operations,createSortOperation(
3780 REDIS_SORT_INCR,c->argv[j+1]));
3781 j++;
3782 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3783 listAddNodeTail(operations,createSortOperation(
3784 REDIS_SORT_DECR,c->argv[j+1]));
3785 j++;
3786 } else {
3787 decrRefCount(sortval);
3788 listRelease(operations);
3789 addReply(c,shared.syntaxerr);
3790 return;
3791 }
3792 j++;
3793 }
3794
3795 /* Load the sorting vector with all the objects to sort */
3796 vectorlen = (sortval->type == REDIS_LIST) ?
3797 listLength((list*)sortval->ptr) :
3798 dictSize((dict*)sortval->ptr);
3799 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3800 if (!vector) oom("allocating objects vector for SORT");
3801 j = 0;
3802 if (sortval->type == REDIS_LIST) {
3803 list *list = sortval->ptr;
3804 listNode *ln;
3805
3806 listRewind(list);
3807 while((ln = listYield(list))) {
3808 robj *ele = ln->value;
3809 vector[j].obj = ele;
3810 vector[j].u.score = 0;
3811 vector[j].u.cmpobj = NULL;
3812 j++;
3813 }
3814 } else {
3815 dict *set = sortval->ptr;
3816 dictIterator *di;
3817 dictEntry *setele;
3818
3819 di = dictGetIterator(set);
3820 if (!di) oom("dictGetIterator");
3821 while((setele = dictNext(di)) != NULL) {
3822 vector[j].obj = dictGetEntryKey(setele);
3823 vector[j].u.score = 0;
3824 vector[j].u.cmpobj = NULL;
3825 j++;
3826 }
3827 dictReleaseIterator(di);
3828 }
3829 assert(j == vectorlen);
3830
3831 /* Now it's time to load the right scores in the sorting vector */
3832 if (dontsort == 0) {
3833 for (j = 0; j < vectorlen; j++) {
3834 if (sortby) {
3835 robj *byval;
3836
3837 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3838 if (!byval || byval->type != REDIS_STRING) continue;
3839 if (alpha) {
3840 if (byval->encoding == REDIS_ENCODING_RAW) {
3841 vector[j].u.cmpobj = byval;
3842 incrRefCount(byval);
3843 } else {
3844 vector[j].u.cmpobj = getDecodedObject(byval);
3845 }
3846 } else {
3847 if (byval->encoding == REDIS_ENCODING_RAW) {
3848 vector[j].u.score = strtod(byval->ptr,NULL);
3849 } else {
3850 if (byval->encoding == REDIS_ENCODING_INT)
3851 vector[j].u.score = (long)byval->ptr;
3852 else
3853 assert(1 != 1);
3854 }
3855 }
3856 } else {
3857 if (!alpha) {
3858 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
3859 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3860 else {
3861 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
3862 vector[j].u.score = (long) vector[j].obj->ptr;
3863 else
3864 assert(1 != 1);
3865 }
3866 }
3867 }
3868 }
3869 }
3870
3871 /* We are ready to sort the vector... perform a bit of sanity check
3872 * on the LIMIT option too. We'll use a partial version of quicksort. */
3873 start = (limit_start < 0) ? 0 : limit_start;
3874 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3875 if (start >= vectorlen) {
3876 start = vectorlen-1;
3877 end = vectorlen-2;
3878 }
3879 if (end >= vectorlen) end = vectorlen-1;
3880
3881 if (dontsort == 0) {
3882 server.sort_desc = desc;
3883 server.sort_alpha = alpha;
3884 server.sort_bypattern = sortby ? 1 : 0;
3885 if (sortby && (start != 0 || end != vectorlen-1))
3886 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3887 else
3888 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3889 }
3890
3891 /* Send command output to the output buffer, performing the specified
3892 * GET/DEL/INCR/DECR operations if any. */
3893 outputlen = getop ? getop*(end-start+1) : end-start+1;
3894 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3895 for (j = start; j <= end; j++) {
3896 listNode *ln;
3897 if (!getop) {
3898 addReplyBulkLen(c,vector[j].obj);
3899 addReply(c,vector[j].obj);
3900 addReply(c,shared.crlf);
3901 }
3902 listRewind(operations);
3903 while((ln = listYield(operations))) {
3904 redisSortOperation *sop = ln->value;
3905 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3906 vector[j].obj);
3907
3908 if (sop->type == REDIS_SORT_GET) {
3909 if (!val || val->type != REDIS_STRING) {
3910 addReply(c,shared.nullbulk);
3911 } else {
3912 addReplyBulkLen(c,val);
3913 addReply(c,val);
3914 addReply(c,shared.crlf);
3915 }
3916 } else if (sop->type == REDIS_SORT_DEL) {
3917 /* TODO */
3918 }
3919 }
3920 }
3921
3922 /* Cleanup */
3923 decrRefCount(sortval);
3924 listRelease(operations);
3925 for (j = 0; j < vectorlen; j++) {
3926 if (sortby && alpha && vector[j].u.cmpobj)
3927 decrRefCount(vector[j].u.cmpobj);
3928 }
3929 zfree(vector);
3930 }
3931
3932 static void infoCommand(redisClient *c) {
3933 sds info;
3934 time_t uptime = time(NULL)-server.stat_starttime;
3935 int j;
3936
3937 info = sdscatprintf(sdsempty(),
3938 "redis_version:%s\r\n"
3939 "uptime_in_seconds:%d\r\n"
3940 "uptime_in_days:%d\r\n"
3941 "connected_clients:%d\r\n"
3942 "connected_slaves:%d\r\n"
3943 "used_memory:%zu\r\n"
3944 "changes_since_last_save:%lld\r\n"
3945 "bgsave_in_progress:%d\r\n"
3946 "last_save_time:%d\r\n"
3947 "total_connections_received:%lld\r\n"
3948 "total_commands_processed:%lld\r\n"
3949 "role:%s\r\n"
3950 ,REDIS_VERSION,
3951 uptime,
3952 uptime/(3600*24),
3953 listLength(server.clients)-listLength(server.slaves),
3954 listLength(server.slaves),
3955 server.usedmemory,
3956 server.dirty,
3957 server.bgsaveinprogress,
3958 server.lastsave,
3959 server.stat_numconnections,
3960 server.stat_numcommands,
3961 server.masterhost == NULL ? "master" : "slave"
3962 );
3963 if (server.masterhost) {
3964 info = sdscatprintf(info,
3965 "master_host:%s\r\n"
3966 "master_port:%d\r\n"
3967 "master_link_status:%s\r\n"
3968 "master_last_io_seconds_ago:%d\r\n"
3969 ,server.masterhost,
3970 server.masterport,
3971 (server.replstate == REDIS_REPL_CONNECTED) ?
3972 "up" : "down",
3973 (int)(time(NULL)-server.master->lastinteraction)
3974 );
3975 }
3976 for (j = 0; j < server.dbnum; j++) {
3977 long long keys, vkeys;
3978
3979 keys = dictSize(server.db[j].dict);
3980 vkeys = dictSize(server.db[j].expires);
3981 if (keys || vkeys) {
3982 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
3983 j, keys, vkeys);
3984 }
3985 }
3986 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3987 addReplySds(c,info);
3988 addReply(c,shared.crlf);
3989 }
3990
3991 static void monitorCommand(redisClient *c) {
3992 /* ignore MONITOR if aleady slave or in monitor mode */
3993 if (c->flags & REDIS_SLAVE) return;
3994
3995 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3996 c->slaveseldb = 0;
3997 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3998 addReply(c,shared.ok);
3999 }
4000
4001 /* ================================= Expire ================================= */
4002 static int removeExpire(redisDb *db, robj *key) {
4003 if (dictDelete(db->expires,key) == DICT_OK) {
4004 return 1;
4005 } else {
4006 return 0;
4007 }
4008 }
4009
4010 static int setExpire(redisDb *db, robj *key, time_t when) {
4011 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4012 return 0;
4013 } else {
4014 incrRefCount(key);
4015 return 1;
4016 }
4017 }
4018
4019 /* Return the expire time of the specified key, or -1 if no expire
4020 * is associated with this key (i.e. the key is non volatile) */
4021 static time_t getExpire(redisDb *db, robj *key) {
4022 dictEntry *de;
4023
4024 /* No expire? return ASAP */
4025 if (dictSize(db->expires) == 0 ||
4026 (de = dictFind(db->expires,key)) == NULL) return -1;
4027
4028 return (time_t) dictGetEntryVal(de);
4029 }
4030
4031 static int expireIfNeeded(redisDb *db, robj *key) {
4032 time_t when;
4033 dictEntry *de;
4034
4035 /* No expire? return ASAP */
4036 if (dictSize(db->expires) == 0 ||
4037 (de = dictFind(db->expires,key)) == NULL) return 0;
4038
4039 /* Lookup the expire */
4040 when = (time_t) dictGetEntryVal(de);
4041 if (time(NULL) <= when) return 0;
4042
4043 /* Delete the key */
4044 dictDelete(db->expires,key);
4045 return dictDelete(db->dict,key) == DICT_OK;
4046 }
4047
4048 static int deleteIfVolatile(redisDb *db, robj *key) {
4049 dictEntry *de;
4050
4051 /* No expire? return ASAP */
4052 if (dictSize(db->expires) == 0 ||
4053 (de = dictFind(db->expires,key)) == NULL) return 0;
4054
4055 /* Delete the key */
4056 server.dirty++;
4057 dictDelete(db->expires,key);
4058 return dictDelete(db->dict,key) == DICT_OK;
4059 }
4060
4061 static void expireCommand(redisClient *c) {
4062 dictEntry *de;
4063 int seconds = atoi(c->argv[2]->ptr);
4064
4065 de = dictFind(c->db->dict,c->argv[1]);
4066 if (de == NULL) {
4067 addReply(c,shared.czero);
4068 return;
4069 }
4070 if (seconds <= 0) {
4071 addReply(c, shared.czero);
4072 return;
4073 } else {
4074 time_t when = time(NULL)+seconds;
4075 if (setExpire(c->db,c->argv[1],when)) {
4076 addReply(c,shared.cone);
4077 server.dirty++;
4078 } else {
4079 addReply(c,shared.czero);
4080 }
4081 return;
4082 }
4083 }
4084
4085 static void ttlCommand(redisClient *c) {
4086 time_t expire;
4087 int ttl = -1;
4088
4089 expire = getExpire(c->db,c->argv[1]);
4090 if (expire != -1) {
4091 ttl = (int) (expire-time(NULL));
4092 if (ttl < 0) ttl = -1;
4093 }
4094 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4095 }
4096
4097 /* =============================== Replication ============================= */
4098
4099 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4100 ssize_t nwritten, ret = size;
4101 time_t start = time(NULL);
4102
4103 timeout++;
4104 while(size) {
4105 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4106 nwritten = write(fd,ptr,size);
4107 if (nwritten == -1) return -1;
4108 ptr += nwritten;
4109 size -= nwritten;
4110 }
4111 if ((time(NULL)-start) > timeout) {
4112 errno = ETIMEDOUT;
4113 return -1;
4114 }
4115 }
4116 return ret;
4117 }
4118
4119 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4120 ssize_t nread, totread = 0;
4121 time_t start = time(NULL);
4122
4123 timeout++;
4124 while(size) {
4125 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4126 nread = read(fd,ptr,size);
4127 if (nread == -1) return -1;
4128 ptr += nread;
4129 size -= nread;
4130 totread += nread;
4131 }
4132 if ((time(NULL)-start) > timeout) {
4133 errno = ETIMEDOUT;
4134 return -1;
4135 }
4136 }
4137 return totread;
4138 }
4139
/* Read one line from 'fd' into 'ptr' (a buffer of 'size' bytes), one byte at
 * a time via syncRead(). The terminating "\n" (and a preceding "\r") is
 * stripped and the result is always NUL-terminated. At most size-1 characters
 * are stored; if the buffer fills before a newline, the partial line is
 * returned.
 * Returns the number of characters stored before the newline (a stripped
 * '\r' is still counted), or -1 on I/O error, timeout, or size <= 0
 * (errno = EINVAL). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';
    size--; /* reserve room for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        size--; /* consume buffer space: bounds the line to the buffer */
    }
    return nread;
}
4160
4161 static void syncCommand(redisClient *c) {
4162 /* ignore SYNC if aleady slave or in monitor mode */
4163 if (c->flags & REDIS_SLAVE) return;
4164
4165 /* SYNC can't be issued when the server has pending data to send to
4166 * the client about already issued commands. We need a fresh reply
4167 * buffer registering the differences between the BGSAVE and the current
4168 * dataset, so that we can copy to other slaves if needed. */
4169 if (listLength(c->reply) != 0) {
4170 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4171 return;
4172 }
4173
4174 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4175 /* Here we need to check if there is a background saving operation
4176 * in progress, or if it is required to start one */
4177 if (server.bgsaveinprogress) {
4178 /* Ok a background save is in progress. Let's check if it is a good
4179 * one for replication, i.e. if there is another slave that is
4180 * registering differences since the server forked to save */
4181 redisClient *slave;
4182 listNode *ln;
4183
4184 listRewind(server.slaves);
4185 while((ln = listYield(server.slaves))) {
4186 slave = ln->value;
4187 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4188 }
4189 if (ln) {
4190 /* Perfect, the server is already registering differences for
4191 * another slave. Set the right state, and copy the buffer. */
4192 listRelease(c->reply);
4193 c->reply = listDup(slave->reply);
4194 if (!c->reply) oom("listDup copying slave reply list");
4195 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4196 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4197 } else {
4198 /* No way, we need to wait for the next BGSAVE in order to
4199 * register differences */
4200 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4201 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4202 }
4203 } else {
4204 /* Ok we don't have a BGSAVE in progress, let's start one */
4205 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4206 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4207 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4208 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4209 return;
4210 }
4211 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4212 }
4213 c->repldbfd = -1;
4214 c->flags |= REDIS_SLAVE;
4215 c->slaveseldb = 0;
4216 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
4217 return;
4218 }
4219
4220 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4221 redisClient *slave = privdata;
4222 REDIS_NOTUSED(el);
4223 REDIS_NOTUSED(mask);
4224 char buf[REDIS_IOBUF_LEN];
4225 ssize_t nwritten, buflen;
4226
4227 if (slave->repldboff == 0) {
4228 /* Write the bulk write count before to transfer the DB. In theory here
4229 * we don't know how much room there is in the output buffer of the
4230 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4231 * operations) will never be smaller than the few bytes we need. */
4232 sds bulkcount;
4233
4234 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4235 slave->repldbsize);
4236 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4237 {
4238 sdsfree(bulkcount);
4239 freeClient(slave);
4240 return;
4241 }
4242 sdsfree(bulkcount);
4243 }
4244 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4245 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4246 if (buflen <= 0) {
4247 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4248 (buflen == 0) ? "premature EOF" : strerror(errno));
4249 freeClient(slave);
4250 return;
4251 }
4252 if ((nwritten = write(fd,buf,buflen)) == -1) {
4253 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4254 strerror(errno));
4255 freeClient(slave);
4256 return;
4257 }
4258 slave->repldboff += nwritten;
4259 if (slave->repldboff == slave->repldbsize) {
4260 close(slave->repldbfd);
4261 slave->repldbfd = -1;
4262 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4263 slave->replstate = REDIS_REPL_ONLINE;
4264 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4265 sendReplyToClient, slave, NULL) == AE_ERR) {
4266 freeClient(slave);
4267 return;
4268 }
4269 addReplySds(slave,sdsempty());
4270 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4271 }
4272 }
4273
4274 /* This function is called at the end of every backgrond saving.
4275 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4276 * otherwise REDIS_ERR is passed to the function.
4277 *
4278 * The goal of this function is to handle slaves waiting for a successful
4279 * background saving in order to perform non-blocking synchronization. */
4280 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4281 listNode *ln;
4282 int startbgsave = 0;
4283
4284 listRewind(server.slaves);
4285 while((ln = listYield(server.slaves))) {
4286 redisClient *slave = ln->value;
4287
4288 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4289 startbgsave = 1;
4290 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4291 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4292 struct redis_stat buf;
4293
4294 if (bgsaveerr != REDIS_OK) {
4295 freeClient(slave);
4296 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4297 continue;
4298 }
4299 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4300 redis_fstat(slave->repldbfd,&buf) == -1) {
4301 freeClient(slave);
4302 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4303 continue;
4304 }
4305 slave->repldboff = 0;
4306 slave->repldbsize = buf.st_size;
4307 slave->replstate = REDIS_REPL_SEND_BULK;
4308 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4309 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4310 freeClient(slave);
4311 continue;
4312 }
4313 }
4314 }
4315 if (startbgsave) {
4316 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4317 listRewind(server.slaves);
4318 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4319 while((ln = listYield(server.slaves))) {
4320 redisClient *slave = ln->value;
4321
4322 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4323 freeClient(slave);
4324 }
4325 }
4326 }
4327 }
4328
4329 static int syncWithMaster(void) {
4330 char buf[1024], tmpfile[256];
4331 int dumpsize;
4332 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4333 int dfd;
4334
4335 if (fd == -1) {
4336 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4337 strerror(errno));
4338 return REDIS_ERR;
4339 }
4340 /* Issue the SYNC command */
4341 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4342 close(fd);
4343 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4344 strerror(errno));
4345 return REDIS_ERR;
4346 }
4347 /* Read the bulk write count */
4348 if (syncReadLine(fd,buf,1024,3600) == -1) {
4349 close(fd);
4350 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4351 strerror(errno));
4352 return REDIS_ERR;
4353 }
4354 dumpsize = atoi(buf+1);
4355 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4356 /* Read the bulk write data on a temp file */
4357 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4358 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4359 if (dfd == -1) {
4360 close(fd);
4361 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4362 return REDIS_ERR;
4363 }
4364 while(dumpsize) {
4365 int nread, nwritten;
4366
4367 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4368 if (nread == -1) {
4369 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4370 strerror(errno));
4371 close(fd);
4372 close(dfd);
4373 return REDIS_ERR;
4374 }
4375 nwritten = write(dfd,buf,nread);
4376 if (nwritten == -1) {
4377 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4378 close(fd);
4379 close(dfd);
4380 return REDIS_ERR;
4381 }
4382 dumpsize -= nread;
4383 }
4384 close(dfd);
4385 if (rename(tmpfile,server.dbfilename) == -1) {
4386 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4387 unlink(tmpfile);
4388 close(fd);
4389 return REDIS_ERR;
4390 }
4391 emptyDb();
4392 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4393 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4394 close(fd);
4395 return REDIS_ERR;
4396 }
4397 server.master = createClient(fd);
4398 server.master->flags |= REDIS_MASTER;
4399 server.replstate = REDIS_REPL_CONNECTED;
4400 return REDIS_OK;
4401 }
4402
4403 static void slaveofCommand(redisClient *c) {
4404 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4405 !strcasecmp(c->argv[2]->ptr,"one")) {
4406 if (server.masterhost) {
4407 sdsfree(server.masterhost);
4408 server.masterhost = NULL;
4409 if (server.master) freeClient(server.master);
4410 server.replstate = REDIS_REPL_NONE;
4411 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4412 }
4413 } else {
4414 sdsfree(server.masterhost);
4415 server.masterhost = sdsdup(c->argv[1]->ptr);
4416 server.masterport = atoi(c->argv[2]->ptr);
4417 if (server.master) freeClient(server.master);
4418 server.replstate = REDIS_REPL_CONNECT;
4419 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4420 server.masterhost, server.masterport);
4421 }
4422 addReply(c,shared.ok);
4423 }
4424
4425 /* ============================ Maxmemory directive ======================== */
4426
4427 /* This function gets called when 'maxmemory' is set on the config file to limit
4428 * the max memory used by the server, and we are out of memory.
4429 * This function will try to, in order:
4430 *
4431 * - Free objects from the free list
4432 * - Try to remove keys with an EXPIRE set
4433 *
4434 * It is not possible to free enough memory to reach used-memory < maxmemory
4435 * the server will start refusing commands that will enlarge even more the
4436 * memory usage.
4437 */
4438 static void freeMemoryIfNeeded(void) {
4439 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4440 if (listLength(server.objfreelist)) {
4441 robj *o;
4442
4443 listNode *head = listFirst(server.objfreelist);
4444 o = listNodeValue(head);
4445 listDelNode(server.objfreelist,head);
4446 zfree(o);
4447 } else {
4448 int j, k, freed = 0;
4449
4450 for (j = 0; j < server.dbnum; j++) {
4451 int minttl = -1;
4452 robj *minkey = NULL;
4453 struct dictEntry *de;
4454
4455 if (dictSize(server.db[j].expires)) {
4456 freed = 1;
4457 /* From a sample of three keys drop the one nearest to
4458 * the natural expire */
4459 for (k = 0; k < 3; k++) {
4460 time_t t;
4461
4462 de = dictGetRandomKey(server.db[j].expires);
4463 t = (time_t) dictGetEntryVal(de);
4464 if (minttl == -1 || t < minttl) {
4465 minkey = dictGetEntryKey(de);
4466 minttl = t;
4467 }
4468 }
4469 deleteKey(server.db+j,minkey);
4470 }
4471 }
4472 if (!freed) return; /* nothing to free... */
4473 }
4474 }
4475 }
4476
4477 /* ================================= Debugging ============================== */
4478
4479 static void debugCommand(redisClient *c) {
4480 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4481 *((char*)-1) = 'x';
4482 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4483 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4484 robj *key, *val;
4485
4486 if (!de) {
4487 addReply(c,shared.nokeyerr);
4488 return;
4489 }
4490 key = dictGetEntryKey(de);
4491 val = dictGetEntryVal(de);
4492 addReplySds(c,sdscatprintf(sdsempty(),
4493 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4494 key, key->refcount, val, val->refcount, val->encoding));
4495 } else {
4496 addReplySds(c,sdsnew(
4497 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4498 }
4499 }
4500
4501 #ifdef HAVE_BACKTRACE
4502 static struct redisFunctionSym symsTable[] = {
4503 {"compareStringObjects", (unsigned long)compareStringObjects},
4504 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4505 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4506 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4507 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4508 {"freeStringObject", (unsigned long)freeStringObject},
4509 {"freeListObject", (unsigned long)freeListObject},
4510 {"freeSetObject", (unsigned long)freeSetObject},
4511 {"decrRefCount", (unsigned long)decrRefCount},
4512 {"createObject", (unsigned long)createObject},
4513 {"freeClient", (unsigned long)freeClient},
4514 {"rdbLoad", (unsigned long)rdbLoad},
4515 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4516 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4517 {"addReply", (unsigned long)addReply},
4518 {"addReplySds", (unsigned long)addReplySds},
4519 {"incrRefCount", (unsigned long)incrRefCount},
4520 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4521 {"createStringObject", (unsigned long)createStringObject},
4522 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4523 {"syncWithMaster", (unsigned long)syncWithMaster},
4524 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4525 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4526 {"getDecodedObject", (unsigned long)getDecodedObject},
4527 {"removeExpire", (unsigned long)removeExpire},
4528 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4529 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4530 {"deleteKey", (unsigned long)deleteKey},
4531 {"getExpire", (unsigned long)getExpire},
4532 {"setExpire", (unsigned long)setExpire},
4533 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4534 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4535 {"authCommand", (unsigned long)authCommand},
4536 {"pingCommand", (unsigned long)pingCommand},
4537 {"echoCommand", (unsigned long)echoCommand},
4538 {"setCommand", (unsigned long)setCommand},
4539 {"setnxCommand", (unsigned long)setnxCommand},
4540 {"getCommand", (unsigned long)getCommand},
4541 {"delCommand", (unsigned long)delCommand},
4542 {"existsCommand", (unsigned long)existsCommand},
4543 {"incrCommand", (unsigned long)incrCommand},
4544 {"decrCommand", (unsigned long)decrCommand},
4545 {"incrbyCommand", (unsigned long)incrbyCommand},
4546 {"decrbyCommand", (unsigned long)decrbyCommand},
4547 {"selectCommand", (unsigned long)selectCommand},
4548 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4549 {"keysCommand", (unsigned long)keysCommand},
4550 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4551 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4552 {"saveCommand", (unsigned long)saveCommand},
4553 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4554 {"shutdownCommand", (unsigned long)shutdownCommand},
4555 {"moveCommand", (unsigned long)moveCommand},
4556 {"renameCommand", (unsigned long)renameCommand},
4557 {"renamenxCommand", (unsigned long)renamenxCommand},
4558 {"lpushCommand", (unsigned long)lpushCommand},
4559 {"rpushCommand", (unsigned long)rpushCommand},
4560 {"lpopCommand", (unsigned long)lpopCommand},
4561 {"rpopCommand", (unsigned long)rpopCommand},
4562 {"llenCommand", (unsigned long)llenCommand},
4563 {"lindexCommand", (unsigned long)lindexCommand},
4564 {"lrangeCommand", (unsigned long)lrangeCommand},
4565 {"ltrimCommand", (unsigned long)ltrimCommand},
4566 {"typeCommand", (unsigned long)typeCommand},
4567 {"lsetCommand", (unsigned long)lsetCommand},
4568 {"saddCommand", (unsigned long)saddCommand},
4569 {"sremCommand", (unsigned long)sremCommand},
4570 {"smoveCommand", (unsigned long)smoveCommand},
4571 {"sismemberCommand", (unsigned long)sismemberCommand},
4572 {"scardCommand", (unsigned long)scardCommand},
4573 {"spopCommand", (unsigned long)spopCommand},
4574 {"sinterCommand", (unsigned long)sinterCommand},
4575 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4576 {"sunionCommand", (unsigned long)sunionCommand},
4577 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4578 {"sdiffCommand", (unsigned long)sdiffCommand},
4579 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4580 {"syncCommand", (unsigned long)syncCommand},
4581 {"flushdbCommand", (unsigned long)flushdbCommand},
4582 {"flushallCommand", (unsigned long)flushallCommand},
4583 {"sortCommand", (unsigned long)sortCommand},
4584 {"lremCommand", (unsigned long)lremCommand},
4585 {"infoCommand", (unsigned long)infoCommand},
4586 {"mgetCommand", (unsigned long)mgetCommand},
4587 {"monitorCommand", (unsigned long)monitorCommand},
4588 {"expireCommand", (unsigned long)expireCommand},
4589 {"getSetCommand", (unsigned long)getSetCommand},
4590 {"ttlCommand", (unsigned long)ttlCommand},
4591 {"slaveofCommand", (unsigned long)slaveofCommand},
4592 {"debugCommand", (unsigned long)debugCommand},
4593 {"processCommand", (unsigned long)processCommand},
4594 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4595 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4596 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4597 {NULL,0}
4598 };
4599
4600 /* This function try to convert a pointer into a function name. It's used in
4601 * oreder to provide a backtrace under segmentation fault that's able to
4602 * display functions declared as static (otherwise the backtrace is useless). */
4603 static char *findFuncName(void *pointer, unsigned long *offset){
4604 int i, ret = -1;
4605 unsigned long off, minoff = 0;
4606
4607 /* Try to match against the Symbol with the smallest offset */
4608 for (i=0; symsTable[i].pointer; i++) {
4609 unsigned long lp = (unsigned long) pointer;
4610
4611 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4612 off=lp-symsTable[i].pointer;
4613 if (ret < 0 || off < minoff) {
4614 minoff=off;
4615 ret=i;
4616 }
4617 }
4618 }
4619 if (ret == -1) return NULL;
4620 *offset = minoff;
4621 return symsTable[ret].name;
4622 }
4623
4624 static void *getMcontextEip(ucontext_t *uc) {
4625 #if defined(__FreeBSD__)
4626 return (void*) uc->uc_mcontext.mc_eip;
4627 #elif defined(__dietlibc__)
4628 return (void*) uc->uc_mcontext.eip;
4629 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4630 return (void*) uc->uc_mcontext->__ss.__eip;
4631 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4632 #ifdef _STRUCT_X86_THREAD_STATE64
4633 return (void*) uc->uc_mcontext->__ss.__rip;
4634 #else
4635 return (void*) uc->uc_mcontext->__ss.__eip;
4636 #endif
4637 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4638 return (void*) uc->uc_mcontext.gregs[REG_EIP];
4639 #elif defined(__ia64__) /* Linux IA64 */
4640 return (void*) uc->uc_mcontext.sc_ip;
4641 #else
4642 return NULL;
4643 #endif
4644 }
4645
4646 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4647 void *trace[100];
4648 char **messages = NULL;
4649 int i, trace_size = 0;
4650 unsigned long offset=0;
4651 time_t uptime = time(NULL)-server.stat_starttime;
4652 ucontext_t *uc = (ucontext_t*) secret;
4653 REDIS_NOTUSED(info);
4654
4655 redisLog(REDIS_WARNING,
4656 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4657 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4658 "redis_version:%s; "
4659 "uptime_in_seconds:%d; "
4660 "connected_clients:%d; "
4661 "connected_slaves:%d; "
4662 "used_memory:%zu; "
4663 "changes_since_last_save:%lld; "
4664 "bgsave_in_progress:%d; "
4665 "last_save_time:%d; "
4666 "total_connections_received:%lld; "
4667 "total_commands_processed:%lld; "
4668 "role:%s;"
4669 ,REDIS_VERSION,
4670 uptime,
4671 listLength(server.clients)-listLength(server.slaves),
4672 listLength(server.slaves),
4673 server.usedmemory,
4674 server.dirty,
4675 server.bgsaveinprogress,
4676 server.lastsave,
4677 server.stat_numconnections,
4678 server.stat_numcommands,
4679 server.masterhost == NULL ? "master" : "slave"
4680 ));
4681
4682 trace_size = backtrace(trace, 100);
4683 /* overwrite sigaction with caller's address */
4684 if (getMcontextEip(uc) != NULL) {
4685 trace[1] = getMcontextEip(uc);
4686 }
4687 messages = backtrace_symbols(trace, trace_size);
4688
4689 for (i=1; i<trace_size; ++i) {
4690 char *fn = findFuncName(trace[i], &offset), *p;
4691
4692 p = strchr(messages[i],'+');
4693 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4694 redisLog(REDIS_WARNING,"%s", messages[i]);
4695 } else {
4696 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4697 }
4698 }
4699 free(messages);
4700 exit(0);
4701 }
4702
4703 static void setupSigSegvAction(void) {
4704 struct sigaction act;
4705
4706 sigemptyset (&act.sa_mask);
4707 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4708 * is used. Otherwise, sa_handler is used */
4709 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4710 act.sa_sigaction = segvHandler;
4711 sigaction (SIGSEGV, &act, NULL);
4712 sigaction (SIGBUS, &act, NULL);
4713 sigaction (SIGFPE, &act, NULL);
4714 sigaction (SIGILL, &act, NULL);
4715 sigaction (SIGBUS, &act, NULL);
4716 return;
4717 }
4718 #else /* HAVE_BACKTRACE */
/* Without backtrace support there is no crash handler to install. */
static void setupSigSegvAction(void) {
    /* intentionally empty */
}
4721 #endif /* HAVE_BACKTRACE */
4722
4723 /* =================================== Main! ================================ */
4724
4725 #ifdef __linux__
/* Return the integer in /proc/sys/vm/overcommit_memory (parsed with atoi),
 * or -1 if the file cannot be opened or its first line cannot be read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *f = fopen("/proc/sys/vm/overcommit_memory","r");

    if (f == NULL) return -1;
    got = fgets(line,64,f);
    fclose(f);
    return (got == NULL) ? -1 : atoi(line);
}
4739
4740 void linuxOvercommitMemoryWarning(void) {
4741 if (linuxOvercommitMemoryValue() == 0) {
4742 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4743 }
4744 }
4745 #endif /* __linux__ */
4746
4747 static void daemonize(void) {
4748 int fd;
4749 FILE *fp;
4750
4751 if (fork() != 0) exit(0); /* parent exits */
4752 setsid(); /* create a new session */
4753
4754 /* Every output goes to /dev/null. If Redis is daemonized but
4755 * the 'logfile' is set to 'stdout' in the configuration file
4756 * it will not log at all. */
4757 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4758 dup2(fd, STDIN_FILENO);
4759 dup2(fd, STDOUT_FILENO);
4760 dup2(fd, STDERR_FILENO);
4761 if (fd > STDERR_FILENO) close(fd);
4762 }
4763 /* Try to write the pid file */
4764 fp = fopen(server.pidfile,"w");
4765 if (fp) {
4766 fprintf(fp,"%d\n",getpid());
4767 fclose(fp);
4768 }
4769 }
4770
4771 int main(int argc, char **argv) {
4772 initServerConfig();
4773 if (argc == 2) {
4774 ResetServerSaveParams();
4775 loadServerConfig(argv[1]);
4776 } else if (argc > 2) {
4777 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4778 exit(1);
4779 } else {
4780 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4781 }
4782 initServer();
4783 if (server.daemonize) daemonize();
4784 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4785 #ifdef __linux__
4786 linuxOvercommitMemoryWarning();
4787 #endif
4788 if (rdbLoad(server.dbfilename) == REDIS_OK)
4789 redisLog(REDIS_NOTICE,"DB loaded from disk");
4790 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4791 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4792 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4793 aeMain(server.el);
4794 aeDeleteEventLoop(server.el);
4795 return 0;
4796 }