]> git.saurik.com Git - redis.git/blob - redis.c
MSET fixed, was not able to replace keys already set for a stupid bug
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.001"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout, in seconds */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16 /* default number of databases (server.dbnum) */
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
/* Max number of random expire lookups done per DB on every serverCron()
 * run (serverCron is re-armed every 1000 ms). */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* Hash table parameters */
/* Minimal hash table fill, in percent: a table whose used/slots ratio falls
 * below this is shrunk by tryResizeHashTables(). */
#define REDIS_HT_MINFILL 10

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM needs a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied under low memory conditions. */
#define REDIS_CMD_DENYOOM 4
100
/* Object types (robj.type) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_HASH 3

/* Objects encoding (robj.encoding) */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys takes a lot of space, so we check the two most significant bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: a specially encoded object will follow. The six
 *           remaining bits specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. The two-bit prefixes above are the values of
 * the REDIS_RDB_*BITLEN / REDIS_RDB_ENCVAL defines below. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
/* NOTE(review): presumably the error sentinel returned when a length cannot
 * be decoded — confirm against the RDB loading code. */
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set (REDIS_RDB_ENCVAL), the remaining six bits specify a special encoding
 * for the object according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
142
/* Client flags (redisClient.flags, a bitmask) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side (server.replstate) */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master: serverCron() calls
                                syncWithMaster() while in this state */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * (redisClient.replstate).
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits for a bgsave to start before feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits for the bgsave to end before starting the bulk DB transfer */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() drops messages below server.verbosity and uses the
 * level as an index into its ".-*" marker string, so keep them 0..2. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: marks a parameter as intentionally unused. */
#define REDIS_NOTUSED(V) ((void) V)
183
184
185 /*================================= Data types ============================== */
186
/* A redis object, that is a type able to hold a string / list / set */
typedef struct redisObject {
    void *ptr;              /* payload; its meaning depends on type/encoding */
    unsigned char type;     /* REDIS_STRING, REDIS_LIST, REDIS_SET, REDIS_HASH */
    unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2]; /* unused bytes (presumably padding) */
    int refcount;           /* managed by incrRefCount()/decrRefCount() */
} robj;

/* A single database. */
typedef struct redisDb {
    dict *dict;    /* the key space */
    dict *expires; /* volatile keys -> expire time; the unix time is stored
                      directly in the entry value (see serverCron) */
    int id;        /* database number */
} redisDb;

/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
224
/* A save point: serverCron() starts a background save when at least
 * 'changes' writes happened and more than 'seconds' seconds elapsed since
 * the last successful save. */
struct saveparam {
    time_t seconds;
    int changes;
};

/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db;
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to the DB since the last successful save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function ran */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of the last successful save */
    size_t usedmemory; /* Used memory in bytes (from zmalloc_used_memory()) */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* minimum level logged by redisLog() */
    int glueoutputbuf;
    int maxidletime; /* client idle timeout in seconds, 0 = disabled */
    int dbnum;
    int daemonize;
    char *pidfile;
    int bgsaveinprogress;
    pid_t bgsavechildpid; /* -1 when no background save child is running */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile; /* NULL = log on standard output */
    char *bindaddr;
    char *dbfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory; /* see REDIS_CMD_DENYOOM */
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
280
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);

/* An entry of cmdTable. */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity; /* argument count including the command name; negative values
                  appear to mean "at least -arity" (e.g. del is -2) —
                  NOTE(review): confirm in processCommand() */
    int flags; /* bitmask of REDIS_CMD_* */
};

/* Name/address pair. NOTE(review): presumably a symbol table used to
 * resolve addresses when printing stack traces — confirm at its users. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};

/* Element sorted by SORT: the object plus the key it is ordered by. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;  /* numeric sort key */
        robj *cmpobj;  /* object compared in ALPHA/BY mode (hedged: confirm in sortCommand) */
    } u;
} redisSortObject;

/* A GET/DEL/INCR/DECR operation requested by SORT (type is REDIS_SORT_*). */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;

/* Preallocated objects built once by createSharedObjects(). */
struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
315
316 /*================================ Prototypes =============================== */
317
/* Forward declarations of static helpers defined later in this file. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(const robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);

/* Command implementations, referenced by cmdTable. Each receives the client
 * that issued the command. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
409
410 /*================================= Globals ================================= */
411
/* Global vars */
static struct redisServer server; /* server global state */
/* Command table: {name, implementation, arity, REDIS_CMD_* flags}.
 * The {NULL,...} entry terminates the table. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    /* SMEMBERS key is served as the intersection of a single set. */
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
478 /*============================ Utility functions ============================ */
479
/* Glob-style pattern matching.
 *
 * Matches 'string' (stringLen bytes) against 'pattern' (patternLen bytes).
 * Every read is bounded by the corresponding length, so neither buffer
 * needs to be NUL-terminated.
 *
 * Supported syntax:
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [abc]   one byte from the set; [^abc] negates it; [a-z] is a byte range
 *           (a reversed range such as [z-a] is accepted); \x escapes x
 *   \x      the literal byte x
 *
 * When 'nocase' is non-zero, literal and range comparisons go through
 * tolower(). Returns 1 on match, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* A run of '*' is equivalent to a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing '*' matches whatever is left */
            /* Try the rest of the pattern at every remaining position. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class consumes one byte: it can't match an exhausted string
             * (and string[0] must not be read). */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the pattern++
                     * after the switch leaves patternLen at exactly 0. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Compare as unsigned bytes: platform independent and
                     * always a valid tolower() argument. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal byte needs a byte to match against. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing '*'s may still match. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
602
603 static void redisLog(int level, const char *fmt, ...) {
604 va_list ap;
605 FILE *fp;
606
607 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
608 if (!fp) return;
609
610 va_start(ap, fmt);
611 if (level >= server.verbosity) {
612 char *c = ".-*";
613 char buf[64];
614 time_t now;
615
616 now = time(NULL);
617 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
618 fprintf(fp,"%s %c ",buf,c[level]);
619 vfprintf(fp, fmt, ap);
620 fprintf(fp,"\n");
621 fflush(fp);
622 }
623 va_end(ap);
624
625 if (server.logfile) fclose(fp);
626 }
627
628 /*====================== Hash table type implementation ==================== */
629
/* Hash table types that use the SDS dynamic strings library for keys and
 * Redis objects as values (objects can hold SDS strings, lists, sets), plus
 * variants whose keys are Redis objects themselves. */
633
634 static int sdsDictKeyCompare(void *privdata, const void *key1,
635 const void *key2)
636 {
637 int l1,l2;
638 DICT_NOTUSED(privdata);
639
640 l1 = sdslen((sds)key1);
641 l2 = sdslen((sds)key2);
642 if (l1 != l2) return 0;
643 return memcmp(key1, key2, l1) == 0;
644 }
645
/* Dict destructor for robj keys/values: drops one reference with
 * decrRefCount(); the object is freed only when nobody else holds it. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val); /* release the dict's reference */
}
652
653 static int dictObjKeyCompare(void *privdata, const void *key1,
654 const void *key2)
655 {
656 const robj *o1 = key1, *o2 = key2;
657 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
658 }
659
660 static unsigned int dictObjHash(const void *key) {
661 const robj *o = key;
662 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
663 }
664
665 static int dictEncObjKeyCompare(void *privdata, const void *key1,
666 const void *key2)
667 {
668 const robj *o1 = key1, *o2 = key2;
669
670 if (o1->encoding == REDIS_ENCODING_RAW &&
671 o2->encoding == REDIS_ENCODING_RAW)
672 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
673 else {
674 robj *dec1, *dec2;
675 int cmp;
676
677 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
678 getDecodedObject(o1) : (robj*)o1;
679 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
680 getDecodedObject(o2) : (robj*)o2;
681 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
682 if (dec1 != o1) decrRefCount(dec1);
683 if (dec2 != o2) decrRefCount(dec2);
684 return cmp;
685 }
686 }
687
688 static unsigned int dictEncObjHash(const void *key) {
689 const robj *o = key;
690
691 if (o->encoding == REDIS_ENCODING_RAW)
692 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
693 else {
694 robj *dec = getDecodedObject(o);
695 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
696 decrRefCount(dec);
697 return hash;
698 }
699 }
700
/* Dict type whose keys are robj that may be integer-encoded (hashed and
 * compared by their string form). Keys are released with decrRefCount()
 * on removal; there is no value destructor. NOTE(review): presumably used
 * for Set objects whose members live in the keys — confirm at its users. */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};

/* Dict type whose keys and values are robj. Keys must be raw sds strings
 * (dictObjHash/dictObjKeyCompare read ptr as sds). Both keys and values are
 * released with decrRefCount() on removal. */
static dictType hashDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};
718
719 /* ========================= Random utility functions ======================= */
720
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings. It is not clear whether reporting the
 * condition to the client would even be possible, since the networking
 * layer itself relies on heap allocation for send buffers. So we simply
 * abort, which also keeps the code simpler to read.
 *
 * Prints "<msg>: Out of memory" to stderr, sleeps one second, then calls
 * abort(). Never returns. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n", msg);
    fflush(out);
    sleep(1);
    abort();
}
732
733 /* ====================== Redis server networking stuff ===================== */
734 static void closeTimedoutClients(void) {
735 redisClient *c;
736 listNode *ln;
737 time_t now = time(NULL);
738
739 listRewind(server.clients);
740 while ((ln = listYield(server.clients)) != NULL) {
741 c = listNodeValue(ln);
742 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
743 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
744 (now - c->lastinteraction > server.maxidletime)) {
745 redisLog(REDIS_DEBUG,"Closing idle client");
746 freeClient(c);
747 }
748 }
749 }
750
751 static int htNeedsResize(dict *dict) {
752 long long size, used;
753
754 size = dictSlots(dict);
755 used = dictSize(dict);
756 return (size && used && size > DICT_HT_INITIAL_SIZE &&
757 (used*100/size < REDIS_HT_MINFILL));
758 }
759
760 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
761 * we resize the hash table to save memory */
762 static void tryResizeHashTables(void) {
763 int j;
764
765 for (j = 0; j < server.dbnum; j++) {
766 if (htNeedsResize(server.db[j].dict)) {
767 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
768 dictResize(server.db[j].dict);
769 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
770 }
771 if (htNeedsResize(server.db[j].expires))
772 dictResize(server.db[j].expires);
773 }
774 }
775
776 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
777 int j, loops = server.cronloops++;
778 REDIS_NOTUSED(eventLoop);
779 REDIS_NOTUSED(id);
780 REDIS_NOTUSED(clientData);
781
782 /* Update the global state with the amount of used memory */
783 server.usedmemory = zmalloc_used_memory();
784
785 /* Show some info about non-empty databases */
786 for (j = 0; j < server.dbnum; j++) {
787 long long size, used, vkeys;
788
789 size = dictSlots(server.db[j].dict);
790 used = dictSize(server.db[j].dict);
791 vkeys = dictSize(server.db[j].expires);
792 if (!(loops % 5) && (used || vkeys)) {
793 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
794 /* dictPrintStats(server.dict); */
795 }
796 }
797
798 /* We don't want to resize the hash tables while a bacground saving
799 * is in progress: the saving child is created using fork() that is
800 * implemented with a copy-on-write semantic in most modern systems, so
801 * if we resize the HT while there is the saving child at work actually
802 * a lot of memory movements in the parent will cause a lot of pages
803 * copied. */
804 if (!server.bgsaveinprogress) tryResizeHashTables();
805
806 /* Show information about connected clients */
807 if (!(loops % 5)) {
808 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
809 listLength(server.clients)-listLength(server.slaves),
810 listLength(server.slaves),
811 server.usedmemory,
812 dictSize(server.sharingpool));
813 }
814
815 /* Close connections of timedout clients */
816 if (server.maxidletime && !(loops % 10))
817 closeTimedoutClients();
818
819 /* Check if a background saving in progress terminated */
820 if (server.bgsaveinprogress) {
821 int statloc;
822 if (wait4(-1,&statloc,WNOHANG,NULL)) {
823 int exitcode = WEXITSTATUS(statloc);
824 int bysignal = WIFSIGNALED(statloc);
825
826 if (!bysignal && exitcode == 0) {
827 redisLog(REDIS_NOTICE,
828 "Background saving terminated with success");
829 server.dirty = 0;
830 server.lastsave = time(NULL);
831 } else if (!bysignal && exitcode != 0) {
832 redisLog(REDIS_WARNING, "Background saving error");
833 } else {
834 redisLog(REDIS_WARNING,
835 "Background saving terminated by signal");
836 rdbRemoveTempFile(server.bgsavechildpid);
837 }
838 server.bgsaveinprogress = 0;
839 server.bgsavechildpid = -1;
840 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
841 }
842 } else {
843 /* If there is not a background saving in progress check if
844 * we have to save now */
845 time_t now = time(NULL);
846 for (j = 0; j < server.saveparamslen; j++) {
847 struct saveparam *sp = server.saveparams+j;
848
849 if (server.dirty >= sp->changes &&
850 now-server.lastsave > sp->seconds) {
851 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
852 sp->changes, sp->seconds);
853 rdbSaveBackground(server.dbfilename);
854 break;
855 }
856 }
857 }
858
859 /* Try to expire a few timed out keys */
860 for (j = 0; j < server.dbnum; j++) {
861 redisDb *db = server.db+j;
862 int num = dictSize(db->expires);
863
864 if (num) {
865 time_t now = time(NULL);
866
867 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
868 num = REDIS_EXPIRELOOKUPS_PER_CRON;
869 while (num--) {
870 dictEntry *de;
871 time_t t;
872
873 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
874 t = (time_t) dictGetEntryVal(de);
875 if (now > t) {
876 deleteKey(db,dictGetEntryKey(de));
877 }
878 }
879 }
880 }
881
882 /* Check if we should connect to a MASTER */
883 if (server.replstate == REDIS_REPL_CONNECT) {
884 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
885 if (syncWithMaster() == REDIS_OK) {
886 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
887 }
888 }
889 return 1000;
890 }
891
892 static void createSharedObjects(void) {
893 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
894 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
895 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
896 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
897 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
898 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
899 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
900 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
901 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
902 /* no such key */
903 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
904 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
905 "-ERR Operation against a key holding the wrong kind of value\r\n"));
906 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
907 "-ERR no such key\r\n"));
908 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
909 "-ERR syntax error\r\n"));
910 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
911 "-ERR source and destination objects are the same\r\n"));
912 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
913 "-ERR index out of range\r\n"));
914 shared.space = createObject(REDIS_STRING,sdsnew(" "));
915 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
916 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
917 shared.select0 = createStringObject("select 0\r\n",10);
918 shared.select1 = createStringObject("select 1\r\n",10);
919 shared.select2 = createStringObject("select 2\r\n",10);
920 shared.select3 = createStringObject("select 3\r\n",10);
921 shared.select4 = createStringObject("select 4\r\n",10);
922 shared.select5 = createStringObject("select 5\r\n",10);
923 shared.select6 = createStringObject("select 6\r\n",10);
924 shared.select7 = createStringObject("select 7\r\n",10);
925 shared.select8 = createStringObject("select 8\r\n",10);
926 shared.select9 = createStringObject("select 9\r\n",10);
927 }
928
929 static void appendServerSaveParams(time_t seconds, int changes) {
930 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
931 if (server.saveparams == NULL) oom("appendServerSaveParams");
932 server.saveparams[server.saveparamslen].seconds = seconds;
933 server.saveparams[server.saveparamslen].changes = changes;
934 server.saveparamslen++;
935 }
936
937 static void ResetServerSaveParams() {
938 zfree(server.saveparams);
939 server.saveparams = NULL;
940 server.saveparamslen = 0;
941 }
942
943 static void initServerConfig() {
944 server.dbnum = REDIS_DEFAULT_DBNUM;
945 server.port = REDIS_SERVERPORT;
946 server.verbosity = REDIS_DEBUG;
947 server.maxidletime = REDIS_MAXIDLETIME;
948 server.saveparams = NULL;
949 server.logfile = NULL; /* NULL = log on standard output */
950 server.bindaddr = NULL;
951 server.glueoutputbuf = 1;
952 server.daemonize = 0;
953 server.pidfile = "/var/run/redis.pid";
954 server.dbfilename = "dump.rdb";
955 server.requirepass = NULL;
956 server.shareobjects = 0;
957 server.sharingpoolsize = 1024;
958 server.maxclients = 0;
959 server.maxmemory = 0;
960 ResetServerSaveParams();
961
962 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
963 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
964 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
965 /* Replication related */
966 server.isslave = 0;
967 server.masterhost = NULL;
968 server.masterport = 6379;
969 server.master = NULL;
970 server.replstate = REDIS_REPL_NONE;
971 }
972
973 static void initServer() {
974 int j;
975
976 signal(SIGHUP, SIG_IGN);
977 signal(SIGPIPE, SIG_IGN);
978 setupSigSegvAction();
979
980 server.clients = listCreate();
981 server.slaves = listCreate();
982 server.monitors = listCreate();
983 server.objfreelist = listCreate();
984 createSharedObjects();
985 server.el = aeCreateEventLoop();
986 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
987 server.sharingpool = dictCreate(&setDictType,NULL);
988 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
989 oom("server initialization"); /* Fatal OOM */
990 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
991 if (server.fd == -1) {
992 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
993 exit(1);
994 }
995 for (j = 0; j < server.dbnum; j++) {
996 server.db[j].dict = dictCreate(&hashDictType,NULL);
997 server.db[j].expires = dictCreate(&setDictType,NULL);
998 server.db[j].id = j;
999 }
1000 server.cronloops = 0;
1001 server.bgsaveinprogress = 0;
1002 server.bgsavechildpid = -1;
1003 server.lastsave = time(NULL);
1004 server.dirty = 0;
1005 server.usedmemory = 0;
1006 server.stat_numcommands = 0;
1007 server.stat_numconnections = 0;
1008 server.stat_starttime = time(NULL);
1009 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1010 }
1011
1012 /* Empty the whole database */
1013 static long long emptyDb() {
1014 int j;
1015 long long removed = 0;
1016
1017 for (j = 0; j < server.dbnum; j++) {
1018 removed += dictSize(server.db[j].dict);
1019 dictEmpty(server.db[j].dict);
1020 dictEmpty(server.db[j].expires);
1021 }
1022 return removed;
1023 }
1024
/* Map a case-insensitive "yes"/"no" string to 1/0.
 * Returns -1 for any other input. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1030
1031 /* I agree, this is a very rudimental way to load a configuration...
1032 will improve later if the config gets more complex */
1033 static void loadServerConfig(char *filename) {
1034 FILE *fp;
1035 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1036 int linenum = 0;
1037 sds line = NULL;
1038
1039 if (filename[0] == '-' && filename[1] == '\0')
1040 fp = stdin;
1041 else {
1042 if ((fp = fopen(filename,"r")) == NULL) {
1043 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1044 exit(1);
1045 }
1046 }
1047
1048 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1049 sds *argv;
1050 int argc, j;
1051
1052 linenum++;
1053 line = sdsnew(buf);
1054 line = sdstrim(line," \t\r\n");
1055
1056 /* Skip comments and blank lines*/
1057 if (line[0] == '#' || line[0] == '\0') {
1058 sdsfree(line);
1059 continue;
1060 }
1061
1062 /* Split into arguments */
1063 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1064 sdstolower(argv[0]);
1065
1066 /* Execute config directives */
1067 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1068 server.maxidletime = atoi(argv[1]);
1069 if (server.maxidletime < 0) {
1070 err = "Invalid timeout value"; goto loaderr;
1071 }
1072 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1073 server.port = atoi(argv[1]);
1074 if (server.port < 1 || server.port > 65535) {
1075 err = "Invalid port"; goto loaderr;
1076 }
1077 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1078 server.bindaddr = zstrdup(argv[1]);
1079 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1080 int seconds = atoi(argv[1]);
1081 int changes = atoi(argv[2]);
1082 if (seconds < 1 || changes < 0) {
1083 err = "Invalid save parameters"; goto loaderr;
1084 }
1085 appendServerSaveParams(seconds,changes);
1086 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1087 if (chdir(argv[1]) == -1) {
1088 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1089 argv[1], strerror(errno));
1090 exit(1);
1091 }
1092 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1093 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1094 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1095 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1096 else {
1097 err = "Invalid log level. Must be one of debug, notice, warning";
1098 goto loaderr;
1099 }
1100 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1101 FILE *logfp;
1102
1103 server.logfile = zstrdup(argv[1]);
1104 if (!strcasecmp(server.logfile,"stdout")) {
1105 zfree(server.logfile);
1106 server.logfile = NULL;
1107 }
1108 if (server.logfile) {
1109 /* Test if we are able to open the file. The server will not
1110 * be able to abort just for this problem later... */
1111 logfp = fopen(server.logfile,"a");
1112 if (logfp == NULL) {
1113 err = sdscatprintf(sdsempty(),
1114 "Can't open the log file: %s", strerror(errno));
1115 goto loaderr;
1116 }
1117 fclose(logfp);
1118 }
1119 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1120 server.dbnum = atoi(argv[1]);
1121 if (server.dbnum < 1) {
1122 err = "Invalid number of databases"; goto loaderr;
1123 }
1124 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1125 server.maxclients = atoi(argv[1]);
1126 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1127 server.maxmemory = strtoll(argv[1], NULL, 10);
1128 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1129 server.masterhost = sdsnew(argv[1]);
1130 server.masterport = atoi(argv[2]);
1131 server.replstate = REDIS_REPL_CONNECT;
1132 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1133 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1134 err = "argument must be 'yes' or 'no'"; goto loaderr;
1135 }
1136 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1137 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1138 err = "argument must be 'yes' or 'no'"; goto loaderr;
1139 }
1140 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1141 server.sharingpoolsize = atoi(argv[1]);
1142 if (server.sharingpoolsize < 1) {
1143 err = "invalid object sharing pool size"; goto loaderr;
1144 }
1145 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1146 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1147 err = "argument must be 'yes' or 'no'"; goto loaderr;
1148 }
1149 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1150 server.requirepass = zstrdup(argv[1]);
1151 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1152 server.pidfile = zstrdup(argv[1]);
1153 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1154 server.dbfilename = zstrdup(argv[1]);
1155 } else {
1156 err = "Bad directive or wrong number of arguments"; goto loaderr;
1157 }
1158 for (j = 0; j < argc; j++)
1159 sdsfree(argv[j]);
1160 zfree(argv);
1161 sdsfree(line);
1162 }
1163 if (fp != stdin) fclose(fp);
1164 return;
1165
1166 loaderr:
1167 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1168 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1169 fprintf(stderr, ">>> '%s'\n", line);
1170 fprintf(stderr, "%s\n", err);
1171 exit(1);
1172 }
1173
1174 static void freeClientArgv(redisClient *c) {
1175 int j;
1176
1177 for (j = 0; j < c->argc; j++)
1178 decrRefCount(c->argv[j]);
1179 for (j = 0; j < c->mbargc; j++)
1180 decrRefCount(c->mbargv[j]);
1181 c->argc = 0;
1182 c->mbargc = 0;
1183 }
1184
1185 static void freeClient(redisClient *c) {
1186 listNode *ln;
1187
1188 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1189 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1190 sdsfree(c->querybuf);
1191 listRelease(c->reply);
1192 freeClientArgv(c);
1193 close(c->fd);
1194 ln = listSearchKey(server.clients,c);
1195 assert(ln != NULL);
1196 listDelNode(server.clients,ln);
1197 if (c->flags & REDIS_SLAVE) {
1198 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1199 close(c->repldbfd);
1200 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1201 ln = listSearchKey(l,c);
1202 assert(ln != NULL);
1203 listDelNode(l,ln);
1204 }
1205 if (c->flags & REDIS_MASTER) {
1206 server.master = NULL;
1207 server.replstate = REDIS_REPL_CONNECT;
1208 }
1209 zfree(c->argv);
1210 zfree(c->mbargv);
1211 zfree(c);
1212 }
1213
1214 static void glueReplyBuffersIfNeeded(redisClient *c) {
1215 int totlen = 0;
1216 listNode *ln;
1217 robj *o;
1218
1219 listRewind(c->reply);
1220 while((ln = listYield(c->reply))) {
1221 o = ln->value;
1222 totlen += sdslen(o->ptr);
1223 /* This optimization makes more sense if we don't have to copy
1224 * too much data */
1225 if (totlen > 1024) return;
1226 }
1227 if (totlen > 0) {
1228 char buf[1024];
1229 int copylen = 0;
1230
1231 listRewind(c->reply);
1232 while((ln = listYield(c->reply))) {
1233 o = ln->value;
1234 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1235 copylen += sdslen(o->ptr);
1236 listDelNode(c->reply,ln);
1237 }
1238 /* Now the output buffer is empty, add the new single element */
1239 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1240 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1241 }
1242 }
1243
1244 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1245 redisClient *c = privdata;
1246 int nwritten = 0, totwritten = 0, objlen;
1247 robj *o;
1248 REDIS_NOTUSED(el);
1249 REDIS_NOTUSED(mask);
1250
1251 if (server.glueoutputbuf && listLength(c->reply) > 1)
1252 glueReplyBuffersIfNeeded(c);
1253 while(listLength(c->reply)) {
1254 o = listNodeValue(listFirst(c->reply));
1255 objlen = sdslen(o->ptr);
1256
1257 if (objlen == 0) {
1258 listDelNode(c->reply,listFirst(c->reply));
1259 continue;
1260 }
1261
1262 if (c->flags & REDIS_MASTER) {
1263 /* Don't reply to a master */
1264 nwritten = objlen - c->sentlen;
1265 } else {
1266 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1267 if (nwritten <= 0) break;
1268 }
1269 c->sentlen += nwritten;
1270 totwritten += nwritten;
1271 /* If we fully sent the object on head go to the next one */
1272 if (c->sentlen == objlen) {
1273 listDelNode(c->reply,listFirst(c->reply));
1274 c->sentlen = 0;
1275 }
1276 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1277 * bytes, in a single threaded server it's a good idea to server
1278 * other clients as well, even if a very large request comes from
1279 * super fast link that is always able to accept data (in real world
1280 * terms think to 'KEYS *' against the loopback interfae) */
1281 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1282 }
1283 if (nwritten == -1) {
1284 if (errno == EAGAIN) {
1285 nwritten = 0;
1286 } else {
1287 redisLog(REDIS_DEBUG,
1288 "Error writing to client: %s", strerror(errno));
1289 freeClient(c);
1290 return;
1291 }
1292 }
1293 if (totwritten > 0) c->lastinteraction = time(NULL);
1294 if (listLength(c->reply) == 0) {
1295 c->sentlen = 0;
1296 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1297 }
1298 }
1299
1300 static struct redisCommand *lookupCommand(char *name) {
1301 int j = 0;
1302 while(cmdTable[j].name != NULL) {
1303 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1304 j++;
1305 }
1306 return NULL;
1307 }
1308
1309 /* resetClient prepare the client to process the next command */
1310 static void resetClient(redisClient *c) {
1311 freeClientArgv(c);
1312 c->bulklen = -1;
1313 c->multibulk = 0;
1314 }
1315
1316 /* If this function gets called we already read a whole
1317 * command, argments are in the client argv/argc fields.
1318 * processCommand() execute the command or prepare the
1319 * server for a bulk read from the client.
1320 *
1321 * If 1 is returned the client is still alive and valid and
1322 * and other operations can be performed by the caller. Otherwise
1323 * if 0 is returned the client was destroied (i.e. after QUIT). */
1324 static int processCommand(redisClient *c) {
1325 struct redisCommand *cmd;
1326 long long dirty;
1327
1328 /* Free some memory if needed (maxmemory setting) */
1329 if (server.maxmemory) freeMemoryIfNeeded();
1330
1331 /* Handle the multi bulk command type. This is an alternative protocol
1332 * supported by Redis in order to receive commands that are composed of
1333 * multiple binary-safe "bulk" arguments. The latency of processing is
1334 * a bit higher but this allows things like multi-sets, so if this
1335 * protocol is used only for MSET and similar commands this is a big win. */
1336 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1337 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1338 if (c->multibulk <= 0) {
1339 resetClient(c);
1340 return 1;
1341 } else {
1342 decrRefCount(c->argv[c->argc-1]);
1343 c->argc--;
1344 return 1;
1345 }
1346 } else if (c->multibulk) {
1347 if (c->bulklen == -1) {
1348 if (((char*)c->argv[0]->ptr)[0] != '$') {
1349 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1350 resetClient(c);
1351 return 1;
1352 } else {
1353 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1354 decrRefCount(c->argv[0]);
1355 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1356 c->argc--;
1357 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1358 resetClient(c);
1359 return 1;
1360 }
1361 c->argc--;
1362 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1363 return 1;
1364 }
1365 } else {
1366 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1367 c->mbargv[c->mbargc] = c->argv[0];
1368 c->mbargc++;
1369 c->argc--;
1370 c->multibulk--;
1371 if (c->multibulk == 0) {
1372 robj **auxargv;
1373 int auxargc;
1374
1375 /* Here we need to swap the multi-bulk argc/argv with the
1376 * normal argc/argv of the client structure. */
1377 auxargv = c->argv;
1378 c->argv = c->mbargv;
1379 c->mbargv = auxargv;
1380
1381 auxargc = c->argc;
1382 c->argc = c->mbargc;
1383 c->mbargc = auxargc;
1384
1385 /* We need to set bulklen to something different than -1
1386 * in order for the code below to process the command without
1387 * to try to read the last argument of a bulk command as
1388 * a special argument. */
1389 c->bulklen = 0;
1390 /* continue below and process the command */
1391 } else {
1392 c->bulklen = -1;
1393 return 1;
1394 }
1395 }
1396 }
1397 /* -- end of multi bulk commands processing -- */
1398
1399 /* The QUIT command is handled as a special case. Normal command
1400 * procs are unable to close the client connection safely */
1401 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1402 freeClient(c);
1403 return 0;
1404 }
1405 cmd = lookupCommand(c->argv[0]->ptr);
1406 if (!cmd) {
1407 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1408 resetClient(c);
1409 return 1;
1410 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1411 (c->argc < -cmd->arity)) {
1412 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1413 resetClient(c);
1414 return 1;
1415 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1416 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1417 resetClient(c);
1418 return 1;
1419 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1420 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1421
1422 decrRefCount(c->argv[c->argc-1]);
1423 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1424 c->argc--;
1425 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1426 resetClient(c);
1427 return 1;
1428 }
1429 c->argc--;
1430 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1431 /* It is possible that the bulk read is already in the
1432 * buffer. Check this condition and handle it accordingly.
1433 * This is just a fast path, alternative to call processInputBuffer().
1434 * It's a good idea since the code is small and this condition
1435 * happens most of the times. */
1436 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1437 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1438 c->argc++;
1439 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1440 } else {
1441 return 1;
1442 }
1443 }
1444 /* Let's try to share objects on the command arguments vector */
1445 if (server.shareobjects) {
1446 int j;
1447 for(j = 1; j < c->argc; j++)
1448 c->argv[j] = tryObjectSharing(c->argv[j]);
1449 }
1450 /* Let's try to encode the bulk object to save space. */
1451 if (cmd->flags & REDIS_CMD_BULK)
1452 tryObjectEncoding(c->argv[c->argc-1]);
1453
1454 /* Check if the user is authenticated */
1455 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1456 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1457 resetClient(c);
1458 return 1;
1459 }
1460
1461 /* Exec the command */
1462 dirty = server.dirty;
1463 cmd->proc(c);
1464 if (server.dirty-dirty != 0 && listLength(server.slaves))
1465 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1466 if (listLength(server.monitors))
1467 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1468 server.stat_numcommands++;
1469
1470 /* Prepare the client for the next command */
1471 if (c->flags & REDIS_CLOSE) {
1472 freeClient(c);
1473 return 0;
1474 }
1475 resetClient(c);
1476 return 1;
1477 }
1478
1479 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1480 listNode *ln;
1481 int outc = 0, j;
1482 robj **outv;
1483 /* (args*2)+1 is enough room for args, spaces, newlines */
1484 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1485
1486 if (argc <= REDIS_STATIC_ARGS) {
1487 outv = static_outv;
1488 } else {
1489 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1490 if (!outv) oom("replicationFeedSlaves");
1491 }
1492
1493 for (j = 0; j < argc; j++) {
1494 if (j != 0) outv[outc++] = shared.space;
1495 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1496 robj *lenobj;
1497
1498 lenobj = createObject(REDIS_STRING,
1499 sdscatprintf(sdsempty(),"%d\r\n",
1500 stringObjectLen(argv[j])));
1501 lenobj->refcount = 0;
1502 outv[outc++] = lenobj;
1503 }
1504 outv[outc++] = argv[j];
1505 }
1506 outv[outc++] = shared.crlf;
1507
1508 /* Increment all the refcounts at start and decrement at end in order to
1509 * be sure to free objects if there is no slave in a replication state
1510 * able to be feed with commands */
1511 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1512 listRewind(slaves);
1513 while((ln = listYield(slaves))) {
1514 redisClient *slave = ln->value;
1515
1516 /* Don't feed slaves that are still waiting for BGSAVE to start */
1517 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1518
1519 /* Feed all the other slaves, MONITORs and so on */
1520 if (slave->slaveseldb != dictid) {
1521 robj *selectcmd;
1522
1523 switch(dictid) {
1524 case 0: selectcmd = shared.select0; break;
1525 case 1: selectcmd = shared.select1; break;
1526 case 2: selectcmd = shared.select2; break;
1527 case 3: selectcmd = shared.select3; break;
1528 case 4: selectcmd = shared.select4; break;
1529 case 5: selectcmd = shared.select5; break;
1530 case 6: selectcmd = shared.select6; break;
1531 case 7: selectcmd = shared.select7; break;
1532 case 8: selectcmd = shared.select8; break;
1533 case 9: selectcmd = shared.select9; break;
1534 default:
1535 selectcmd = createObject(REDIS_STRING,
1536 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1537 selectcmd->refcount = 0;
1538 break;
1539 }
1540 addReply(slave,selectcmd);
1541 slave->slaveseldb = dictid;
1542 }
1543 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1544 }
1545 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1546 if (outv != static_outv) zfree(outv);
1547 }
1548
1549 static void processInputBuffer(redisClient *c) {
1550 again:
1551 if (c->bulklen == -1) {
1552 /* Read the first line of the query */
1553 char *p = strchr(c->querybuf,'\n');
1554 size_t querylen;
1555
1556 if (p) {
1557 sds query, *argv;
1558 int argc, j;
1559
1560 query = c->querybuf;
1561 c->querybuf = sdsempty();
1562 querylen = 1+(p-(query));
1563 if (sdslen(query) > querylen) {
1564 /* leave data after the first line of the query in the buffer */
1565 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1566 }
1567 *p = '\0'; /* remove "\n" */
1568 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1569 sdsupdatelen(query);
1570
1571 /* Now we can split the query in arguments */
1572 if (sdslen(query) == 0) {
1573 /* Ignore empty query */
1574 sdsfree(query);
1575 return;
1576 }
1577 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1578 if (argv == NULL) oom("sdssplitlen");
1579 sdsfree(query);
1580
1581 if (c->argv) zfree(c->argv);
1582 c->argv = zmalloc(sizeof(robj*)*argc);
1583 if (c->argv == NULL) oom("allocating arguments list for client");
1584
1585 for (j = 0; j < argc; j++) {
1586 if (sdslen(argv[j])) {
1587 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1588 c->argc++;
1589 } else {
1590 sdsfree(argv[j]);
1591 }
1592 }
1593 zfree(argv);
1594 /* Execute the command. If the client is still valid
1595 * after processCommand() return and there is something
1596 * on the query buffer try to process the next command. */
1597 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1598 return;
1599 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1600 redisLog(REDIS_DEBUG, "Client protocol error");
1601 freeClient(c);
1602 return;
1603 }
1604 } else {
1605 /* Bulk read handling. Note that if we are at this point
1606 the client already sent a command terminated with a newline,
1607 we are reading the bulk data that is actually the last
1608 argument of the command. */
1609 int qbl = sdslen(c->querybuf);
1610
1611 if (c->bulklen <= qbl) {
1612 /* Copy everything but the final CRLF as final argument */
1613 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1614 c->argc++;
1615 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1616 /* Process the command. If the client is still valid after
1617 * the processing and there is more data in the buffer
1618 * try to parse it. */
1619 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1620 return;
1621 }
1622 }
1623 }
1624
1625 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1626 redisClient *c = (redisClient*) privdata;
1627 char buf[REDIS_IOBUF_LEN];
1628 int nread;
1629 REDIS_NOTUSED(el);
1630 REDIS_NOTUSED(mask);
1631
1632 nread = read(fd, buf, REDIS_IOBUF_LEN);
1633 if (nread == -1) {
1634 if (errno == EAGAIN) {
1635 nread = 0;
1636 } else {
1637 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1638 freeClient(c);
1639 return;
1640 }
1641 } else if (nread == 0) {
1642 redisLog(REDIS_DEBUG, "Client closed connection");
1643 freeClient(c);
1644 return;
1645 }
1646 if (nread) {
1647 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1648 c->lastinteraction = time(NULL);
1649 } else {
1650 return;
1651 }
1652 processInputBuffer(c);
1653 }
1654
1655 static int selectDb(redisClient *c, int id) {
1656 if (id < 0 || id >= server.dbnum)
1657 return REDIS_ERR;
1658 c->db = &server.db[id];
1659 return REDIS_OK;
1660 }
1661
1662 static void *dupClientReplyValue(void *o) {
1663 incrRefCount((robj*)o);
1664 return 0;
1665 }
1666
1667 static redisClient *createClient(int fd) {
1668 redisClient *c = zmalloc(sizeof(*c));
1669
1670 anetNonBlock(NULL,fd);
1671 anetTcpNoDelay(NULL,fd);
1672 if (!c) return NULL;
1673 selectDb(c,0);
1674 c->fd = fd;
1675 c->querybuf = sdsempty();
1676 c->argc = 0;
1677 c->argv = NULL;
1678 c->bulklen = -1;
1679 c->multibulk = 0;
1680 c->mbargc = 0;
1681 c->mbargv = NULL;
1682 c->sentlen = 0;
1683 c->flags = 0;
1684 c->lastinteraction = time(NULL);
1685 c->authenticated = 0;
1686 c->replstate = REDIS_REPL_NONE;
1687 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1688 listSetFreeMethod(c->reply,decrRefCount);
1689 listSetDupMethod(c->reply,dupClientReplyValue);
1690 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1691 readQueryFromClient, c, NULL) == AE_ERR) {
1692 freeClient(c);
1693 return NULL;
1694 }
1695 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1696 return c;
1697 }
1698
1699 static void addReply(redisClient *c, robj *obj) {
1700 if (listLength(c->reply) == 0 &&
1701 (c->replstate == REDIS_REPL_NONE ||
1702 c->replstate == REDIS_REPL_ONLINE) &&
1703 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1704 sendReplyToClient, c, NULL) == AE_ERR) return;
1705 if (obj->encoding != REDIS_ENCODING_RAW) {
1706 obj = getDecodedObject(obj);
1707 } else {
1708 incrRefCount(obj);
1709 }
1710 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1711 }
1712
1713 static void addReplySds(redisClient *c, sds s) {
1714 robj *o = createObject(REDIS_STRING,s);
1715 addReply(c,o);
1716 decrRefCount(o);
1717 }
1718
1719 static void addReplyBulkLen(redisClient *c, robj *obj) {
1720 size_t len;
1721
1722 if (obj->encoding == REDIS_ENCODING_RAW) {
1723 len = sdslen(obj->ptr);
1724 } else {
1725 long n = (long)obj->ptr;
1726
1727 len = 1;
1728 if (n < 0) {
1729 len++;
1730 n = -n;
1731 }
1732 while((n = n/10) != 0) {
1733 len++;
1734 }
1735 }
1736 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1737 }
1738
1739 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1740 int cport, cfd;
1741 char cip[128];
1742 redisClient *c;
1743 REDIS_NOTUSED(el);
1744 REDIS_NOTUSED(mask);
1745 REDIS_NOTUSED(privdata);
1746
1747 cfd = anetAccept(server.neterr, fd, cip, &cport);
1748 if (cfd == AE_ERR) {
1749 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1750 return;
1751 }
1752 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1753 if ((c = createClient(cfd)) == NULL) {
1754 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1755 close(cfd); /* May be already closed, just ingore errors */
1756 return;
1757 }
1758 /* If maxclient directive is set and this is one client more... close the
1759 * connection. Note that we create the client instead to check before
1760 * for this condition, since now the socket is already set in nonblocking
1761 * mode and we can send an error for free using the Kernel I/O */
1762 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1763 char *err = "-ERR max number of clients reached\r\n";
1764
1765 /* That's a best effort error message, don't check write errors */
1766 (void) write(c->fd,err,strlen(err));
1767 freeClient(c);
1768 return;
1769 }
1770 server.stat_numconnections++;
1771 }
1772
1773 /* ======================= Redis objects implementation ===================== */
1774
1775 static robj *createObject(int type, void *ptr) {
1776 robj *o;
1777
1778 if (listLength(server.objfreelist)) {
1779 listNode *head = listFirst(server.objfreelist);
1780 o = listNodeValue(head);
1781 listDelNode(server.objfreelist,head);
1782 } else {
1783 o = zmalloc(sizeof(*o));
1784 }
1785 if (!o) oom("createObject");
1786 o->type = type;
1787 o->encoding = REDIS_ENCODING_RAW;
1788 o->ptr = ptr;
1789 o->refcount = 1;
1790 return o;
1791 }
1792
1793 static robj *createStringObject(char *ptr, size_t len) {
1794 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1795 }
1796
1797 static robj *createListObject(void) {
1798 list *l = listCreate();
1799
1800 if (!l) oom("listCreate");
1801 listSetFreeMethod(l,decrRefCount);
1802 return createObject(REDIS_LIST,l);
1803 }
1804
1805 static robj *createSetObject(void) {
1806 dict *d = dictCreate(&setDictType,NULL);
1807 if (!d) oom("dictCreate");
1808 return createObject(REDIS_SET,d);
1809 }
1810
1811 static void freeStringObject(robj *o) {
1812 if (o->encoding == REDIS_ENCODING_RAW) {
1813 sdsfree(o->ptr);
1814 }
1815 }
1816
1817 static void freeListObject(robj *o) {
1818 listRelease((list*) o->ptr);
1819 }
1820
1821 static void freeSetObject(robj *o) {
1822 dictRelease((dict*) o->ptr);
1823 }
1824
1825 static void freeHashObject(robj *o) {
1826 dictRelease((dict*) o->ptr);
1827 }
1828
1829 static void incrRefCount(robj *o) {
1830 o->refcount++;
1831 #ifdef DEBUG_REFCOUNT
1832 if (o->type == REDIS_STRING)
1833 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1834 #endif
1835 }
1836
1837 static void decrRefCount(void *obj) {
1838 robj *o = obj;
1839
1840 #ifdef DEBUG_REFCOUNT
1841 if (o->type == REDIS_STRING)
1842 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1843 #endif
1844 if (--(o->refcount) == 0) {
1845 switch(o->type) {
1846 case REDIS_STRING: freeStringObject(o); break;
1847 case REDIS_LIST: freeListObject(o); break;
1848 case REDIS_SET: freeSetObject(o); break;
1849 case REDIS_HASH: freeHashObject(o); break;
1850 default: assert(0 != 0); break;
1851 }
1852 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1853 !listAddNodeHead(server.objfreelist,o))
1854 zfree(o);
1855 }
1856 }
1857
1858 static robj *lookupKey(redisDb *db, robj *key) {
1859 dictEntry *de = dictFind(db->dict,key);
1860 return de ? dictGetEntryVal(de) : NULL;
1861 }
1862
1863 static robj *lookupKeyRead(redisDb *db, robj *key) {
1864 expireIfNeeded(db,key);
1865 return lookupKey(db,key);
1866 }
1867
1868 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1869 deleteIfVolatile(db,key);
1870 return lookupKey(db,key);
1871 }
1872
1873 static int deleteKey(redisDb *db, robj *key) {
1874 int retval;
1875
1876 /* We need to protect key from destruction: after the first dictDelete()
1877 * it may happen that 'key' is no longer valid if we don't increment
1878 * it's count. This may happen when we get the object reference directly
1879 * from the hash table with dictRandomKey() or dict iterators */
1880 incrRefCount(key);
1881 if (dictSize(db->expires)) dictDelete(db->expires,key);
1882 retval = dictDelete(db->dict,key);
1883 decrRefCount(key);
1884
1885 return retval == DICT_OK;
1886 }
1887
1888 /* Try to share an object against the shared objects pool */
1889 static robj *tryObjectSharing(robj *o) {
1890 struct dictEntry *de;
1891 unsigned long c;
1892
1893 if (o == NULL || server.shareobjects == 0) return o;
1894
1895 assert(o->type == REDIS_STRING);
1896 de = dictFind(server.sharingpool,o);
1897 if (de) {
1898 robj *shared = dictGetEntryKey(de);
1899
1900 c = ((unsigned long) dictGetEntryVal(de))+1;
1901 dictGetEntryVal(de) = (void*) c;
1902 incrRefCount(shared);
1903 decrRefCount(o);
1904 return shared;
1905 } else {
1906 /* Here we are using a stream algorihtm: Every time an object is
1907 * shared we increment its count, everytime there is a miss we
1908 * recrement the counter of a random object. If this object reaches
1909 * zero we remove the object and put the current object instead. */
1910 if (dictSize(server.sharingpool) >=
1911 server.sharingpoolsize) {
1912 de = dictGetRandomKey(server.sharingpool);
1913 assert(de != NULL);
1914 c = ((unsigned long) dictGetEntryVal(de))-1;
1915 dictGetEntryVal(de) = (void*) c;
1916 if (c == 0) {
1917 dictDelete(server.sharingpool,de->key);
1918 }
1919 } else {
1920 c = 0; /* If the pool is empty we want to add this object */
1921 }
1922 if (c == 0) {
1923 int retval;
1924
1925 retval = dictAdd(server.sharingpool,o,(void*)1);
1926 assert(retval == DICT_OK);
1927 incrRefCount(o);
1928 }
1929 return o;
1930 }
1931 }
1932
1933 /* Check if the nul-terminated string 's' can be represented by a long
1934 * (that is, is a number that fits into long without any other space or
1935 * character before or after the digits).
1936 *
1937 * If so, the function returns REDIS_OK and *longval is set to the value
1938 * of the number. Otherwise REDIS_ERR is returned */
1939 static int isStringRepresentableAsLong(char *s, long *longval) {
1940 char buf[32], *endptr;
1941 long value;
1942 int slen;
1943
1944 value = strtol(s, &endptr, 10);
1945 if (endptr[0] != '\0') return REDIS_ERR;
1946 slen = snprintf(buf,32,"%ld",value);
1947
1948 /* If the number converted back into a string is not identical
1949 * then it's not possible to encode the string as integer */
1950 if (strlen(buf) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
1951 if (longval) *longval = value;
1952 return REDIS_OK;
1953 }
1954
1955 /* Try to encode a string object in order to save space */
1956 static int tryObjectEncoding(robj *o) {
1957 long value;
1958 sds s = o->ptr;
1959
1960 if (o->encoding != REDIS_ENCODING_RAW)
1961 return REDIS_ERR; /* Already encoded */
1962
1963 /* It's not save to encode shared objects: shared objects can be shared
1964 * everywhere in the "object space" of Redis. Encoded objects can only
1965 * appear as "values" (and not, for instance, as keys) */
1966 if (o->refcount > 1) return REDIS_ERR;
1967
1968 /* Currently we try to encode only strings */
1969 assert(o->type == REDIS_STRING);
1970
1971 /* Check if we can represent this string as a long integer */
1972 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
1973
1974 /* Ok, this object can be encoded */
1975 o->encoding = REDIS_ENCODING_INT;
1976 sdsfree(o->ptr);
1977 o->ptr = (void*) value;
1978 return REDIS_OK;
1979 }
1980
1981 /* Get a decoded version of an encoded object (returned as a new object) */
1982 static robj *getDecodedObject(const robj *o) {
1983 robj *dec;
1984
1985 assert(o->encoding != REDIS_ENCODING_RAW);
1986 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
1987 char buf[32];
1988
1989 snprintf(buf,32,"%ld",(long)o->ptr);
1990 dec = createStringObject(buf,strlen(buf));
1991 return dec;
1992 } else {
1993 assert(1 != 1);
1994 }
1995 }
1996
1997 static int compareStringObjects(robj *a, robj *b) {
1998 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
1999
2000 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2001 return (long)a->ptr - (long)b->ptr;
2002 } else {
2003 int retval;
2004
2005 incrRefCount(a);
2006 incrRefCount(b);
2007 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2008 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2009 retval = sdscmp(a->ptr,b->ptr);
2010 decrRefCount(a);
2011 decrRefCount(b);
2012 return retval;
2013 }
2014 }
2015
2016 static size_t stringObjectLen(robj *o) {
2017 assert(o->type == REDIS_STRING);
2018 if (o->encoding == REDIS_ENCODING_RAW) {
2019 return sdslen(o->ptr);
2020 } else {
2021 char buf[32];
2022
2023 return snprintf(buf,32,"%ld",(long)o->ptr);
2024 }
2025 }
2026
2027 /*============================ DB saving/loading ============================ */
2028
/* Write a single type/opcode byte. Returns 0 on success, -1 on error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 1) ? 0 : -1;
}
2033
/* Write 't' truncated to a 32 bit signed value, 4 bytes in host byte
 * order. Returns 0 on success, -1 on error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    if (fwrite(&t32,sizeof(t32),1,fp) != 1) return -1;
    return 0;
}
2039
2040 /* check rdbLoadLen() comments for more info */
2041 static int rdbSaveLen(FILE *fp, uint32_t len) {
2042 unsigned char buf[2];
2043
2044 if (len < (1<<6)) {
2045 /* Save a 6 bit len */
2046 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2047 if (fwrite(buf,1,1,fp) == 0) return -1;
2048 } else if (len < (1<<14)) {
2049 /* Save a 14 bit len */
2050 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2051 buf[1] = len&0xFF;
2052 if (fwrite(buf,2,1,fp) == 0) return -1;
2053 } else {
2054 /* Save a 32 bit len */
2055 buf[0] = (REDIS_RDB_32BITLEN<<6);
2056 if (fwrite(buf,1,1,fp) == 0) return -1;
2057 len = htonl(len);
2058 if (fwrite(&len,4,1,fp) == 0) return -1;
2059 }
2060 return 0;
2061 }
2062
2063 /* String objects in the form "2391" "-100" without any space and with a
2064 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2065 * encoded as integers to save space */
2066 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2067 long long value;
2068 char *endptr, buf[32];
2069
2070 /* Check if it's possible to encode this value as a number */
2071 value = strtoll(s, &endptr, 10);
2072 if (endptr[0] != '\0') return 0;
2073 snprintf(buf,32,"%lld",value);
2074
2075 /* If the number converted back into a string is not identical
2076 * then it's not possible to encode the string as integer */
2077 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2078
2079 /* Finally check if it fits in our ranges */
2080 if (value >= -(1<<7) && value <= (1<<7)-1) {
2081 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2082 enc[1] = value&0xFF;
2083 return 2;
2084 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2085 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2086 enc[1] = value&0xFF;
2087 enc[2] = (value>>8)&0xFF;
2088 return 3;
2089 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2090 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2091 enc[1] = value&0xFF;
2092 enc[2] = (value>>8)&0xFF;
2093 enc[3] = (value>>16)&0xFF;
2094 enc[4] = (value>>24)&0xFF;
2095 return 5;
2096 } else {
2097 return 0;
2098 }
2099 }
2100
2101 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2102 unsigned int comprlen, outlen;
2103 unsigned char byte;
2104 void *out;
2105
2106 /* We require at least four bytes compression for this to be worth it */
2107 outlen = sdslen(obj->ptr)-4;
2108 if (outlen <= 0) return 0;
2109 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2110 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2111 if (comprlen == 0) {
2112 zfree(out);
2113 return 0;
2114 }
2115 /* Data compressed! Let's save it on disk */
2116 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2117 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2118 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2119 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2120 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2121 zfree(out);
2122 return comprlen;
2123
2124 writeerr:
2125 zfree(out);
2126 return -1;
2127 }
2128
2129 /* Save a string objet as [len][data] on disk. If the object is a string
2130 * representation of an integer value we try to safe it in a special form */
2131 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2132 size_t len;
2133 int enclen;
2134
2135 len = sdslen(obj->ptr);
2136
2137 /* Try integer encoding */
2138 if (len <= 11) {
2139 unsigned char buf[5];
2140 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2141 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2142 return 0;
2143 }
2144 }
2145
2146 /* Try LZF compression - under 20 bytes it's unable to compress even
2147 * aaaaaaaaaaaaaaaaaa so skip it */
2148 if (len > 20) {
2149 int retval;
2150
2151 retval = rdbSaveLzfStringObject(fp,obj);
2152 if (retval == -1) return -1;
2153 if (retval > 0) return 0;
2154 /* retval == 0 means data can't be compressed, save the old way */
2155 }
2156
2157 /* Store verbatim */
2158 if (rdbSaveLen(fp,len) == -1) return -1;
2159 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2160 return 0;
2161 }
2162
2163 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2164 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2165 int retval;
2166 robj *dec;
2167
2168 if (obj->encoding != REDIS_ENCODING_RAW) {
2169 dec = getDecodedObject(obj);
2170 retval = rdbSaveStringObjectRaw(fp,dec);
2171 decrRefCount(dec);
2172 return retval;
2173 } else {
2174 return rdbSaveStringObjectRaw(fp,obj);
2175 }
2176 }
2177
2178 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2179 static int rdbSave(char *filename) {
2180 dictIterator *di = NULL;
2181 dictEntry *de;
2182 FILE *fp;
2183 char tmpfile[256];
2184 int j;
2185 time_t now = time(NULL);
2186
2187 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2188 fp = fopen(tmpfile,"w");
2189 if (!fp) {
2190 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2191 return REDIS_ERR;
2192 }
2193 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2194 for (j = 0; j < server.dbnum; j++) {
2195 redisDb *db = server.db+j;
2196 dict *d = db->dict;
2197 if (dictSize(d) == 0) continue;
2198 di = dictGetIterator(d);
2199 if (!di) {
2200 fclose(fp);
2201 return REDIS_ERR;
2202 }
2203
2204 /* Write the SELECT DB opcode */
2205 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2206 if (rdbSaveLen(fp,j) == -1) goto werr;
2207
2208 /* Iterate this DB writing every entry */
2209 while((de = dictNext(di)) != NULL) {
2210 robj *key = dictGetEntryKey(de);
2211 robj *o = dictGetEntryVal(de);
2212 time_t expiretime = getExpire(db,key);
2213
2214 /* Save the expire time */
2215 if (expiretime != -1) {
2216 /* If this key is already expired skip it */
2217 if (expiretime < now) continue;
2218 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2219 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2220 }
2221 /* Save the key and associated value */
2222 if (rdbSaveType(fp,o->type) == -1) goto werr;
2223 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2224 if (o->type == REDIS_STRING) {
2225 /* Save a string value */
2226 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2227 } else if (o->type == REDIS_LIST) {
2228 /* Save a list value */
2229 list *list = o->ptr;
2230 listNode *ln;
2231
2232 listRewind(list);
2233 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2234 while((ln = listYield(list))) {
2235 robj *eleobj = listNodeValue(ln);
2236
2237 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2238 }
2239 } else if (o->type == REDIS_SET) {
2240 /* Save a set value */
2241 dict *set = o->ptr;
2242 dictIterator *di = dictGetIterator(set);
2243 dictEntry *de;
2244
2245 if (!set) oom("dictGetIteraotr");
2246 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2247 while((de = dictNext(di)) != NULL) {
2248 robj *eleobj = dictGetEntryKey(de);
2249
2250 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2251 }
2252 dictReleaseIterator(di);
2253 } else {
2254 assert(0 != 0);
2255 }
2256 }
2257 dictReleaseIterator(di);
2258 }
2259 /* EOF opcode */
2260 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2261
2262 /* Make sure data will not remain on the OS's output buffers */
2263 fflush(fp);
2264 fsync(fileno(fp));
2265 fclose(fp);
2266
2267 /* Use RENAME to make sure the DB file is changed atomically only
2268 * if the generate DB file is ok. */
2269 if (rename(tmpfile,filename) == -1) {
2270 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2271 unlink(tmpfile);
2272 return REDIS_ERR;
2273 }
2274 redisLog(REDIS_NOTICE,"DB saved on disk");
2275 server.dirty = 0;
2276 server.lastsave = time(NULL);
2277 return REDIS_OK;
2278
2279 werr:
2280 fclose(fp);
2281 unlink(tmpfile);
2282 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2283 if (di) dictReleaseIterator(di);
2284 return REDIS_ERR;
2285 }
2286
2287 static int rdbSaveBackground(char *filename) {
2288 pid_t childpid;
2289
2290 if (server.bgsaveinprogress) return REDIS_ERR;
2291 if ((childpid = fork()) == 0) {
2292 /* Child */
2293 close(server.fd);
2294 if (rdbSave(filename) == REDIS_OK) {
2295 exit(0);
2296 } else {
2297 exit(1);
2298 }
2299 } else {
2300 /* Parent */
2301 if (childpid == -1) {
2302 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2303 strerror(errno));
2304 return REDIS_ERR;
2305 }
2306 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2307 server.bgsaveinprogress = 1;
2308 server.bgsavechildpid = childpid;
2309 return REDIS_OK;
2310 }
2311 return REDIS_OK; /* unreached */
2312 }
2313
/* Remove the "temp-<pid>.rdb" file that a save running in process
 * 'childpid' may have left behind. unlink() errors (e.g. the file does
 * not exist) are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2320
/* Read a single type/opcode byte. Returns its value (0-255), or -1 on EOF
 * or read error. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 1) ? type : -1;
}
2326
/* Read a 4-byte signed time value in host byte order (as written by
 * rdbSaveTime()). Returns -1 on EOF or read error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    if (fread(&t32,sizeof(t32),1,fp) != 1) return -1;
    return (time_t) t32;
}
2332
2333 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2334 * of this file for a description of how this are stored on disk.
2335 *
2336 * isencoded is set to 1 if the readed length is not actually a length but
2337 * an "encoding type", check the above comments for more info */
2338 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2339 unsigned char buf[2];
2340 uint32_t len;
2341
2342 if (isencoded) *isencoded = 0;
2343 if (rdbver == 0) {
2344 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2345 return ntohl(len);
2346 } else {
2347 int type;
2348
2349 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2350 type = (buf[0]&0xC0)>>6;
2351 if (type == REDIS_RDB_6BITLEN) {
2352 /* Read a 6 bit len */
2353 return buf[0]&0x3F;
2354 } else if (type == REDIS_RDB_ENCVAL) {
2355 /* Read a 6 bit len encoding type */
2356 if (isencoded) *isencoded = 1;
2357 return buf[0]&0x3F;
2358 } else if (type == REDIS_RDB_14BITLEN) {
2359 /* Read a 14 bit len */
2360 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2361 return ((buf[0]&0x3F)<<8)|buf[1];
2362 } else {
2363 /* Read a 32 bit len */
2364 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2365 return ntohl(len);
2366 }
2367 }
2368 }
2369
2370 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2371 unsigned char enc[4];
2372 long long val;
2373
2374 if (enctype == REDIS_RDB_ENC_INT8) {
2375 if (fread(enc,1,1,fp) == 0) return NULL;
2376 val = (signed char)enc[0];
2377 } else if (enctype == REDIS_RDB_ENC_INT16) {
2378 uint16_t v;
2379 if (fread(enc,2,1,fp) == 0) return NULL;
2380 v = enc[0]|(enc[1]<<8);
2381 val = (int16_t)v;
2382 } else if (enctype == REDIS_RDB_ENC_INT32) {
2383 uint32_t v;
2384 if (fread(enc,4,1,fp) == 0) return NULL;
2385 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2386 val = (int32_t)v;
2387 } else {
2388 val = 0; /* anti-warning */
2389 assert(0!=0);
2390 }
2391 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2392 }
2393
2394 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2395 unsigned int len, clen;
2396 unsigned char *c = NULL;
2397 sds val = NULL;
2398
2399 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2400 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2401 if ((c = zmalloc(clen)) == NULL) goto err;
2402 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2403 if (fread(c,clen,1,fp) == 0) goto err;
2404 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2405 zfree(c);
2406 return createObject(REDIS_STRING,val);
2407 err:
2408 zfree(c);
2409 sdsfree(val);
2410 return NULL;
2411 }
2412
2413 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2414 int isencoded;
2415 uint32_t len;
2416 sds val;
2417
2418 len = rdbLoadLen(fp,rdbver,&isencoded);
2419 if (isencoded) {
2420 switch(len) {
2421 case REDIS_RDB_ENC_INT8:
2422 case REDIS_RDB_ENC_INT16:
2423 case REDIS_RDB_ENC_INT32:
2424 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2425 case REDIS_RDB_ENC_LZF:
2426 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2427 default:
2428 assert(0!=0);
2429 }
2430 }
2431
2432 if (len == REDIS_RDB_LENERR) return NULL;
2433 val = sdsnewlen(NULL,len);
2434 if (len && fread(val,len,1,fp) == 0) {
2435 sdsfree(val);
2436 return NULL;
2437 }
2438 return tryObjectSharing(createObject(REDIS_STRING,val));
2439 }
2440
2441 static int rdbLoad(char *filename) {
2442 FILE *fp;
2443 robj *keyobj = NULL;
2444 uint32_t dbid;
2445 int type, retval, rdbver;
2446 dict *d = server.db[0].dict;
2447 redisDb *db = server.db+0;
2448 char buf[1024];
2449 time_t expiretime = -1, now = time(NULL);
2450
2451 fp = fopen(filename,"r");
2452 if (!fp) return REDIS_ERR;
2453 if (fread(buf,9,1,fp) == 0) goto eoferr;
2454 buf[9] = '\0';
2455 if (memcmp(buf,"REDIS",5) != 0) {
2456 fclose(fp);
2457 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2458 return REDIS_ERR;
2459 }
2460 rdbver = atoi(buf+5);
2461 if (rdbver > 1) {
2462 fclose(fp);
2463 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2464 return REDIS_ERR;
2465 }
2466 while(1) {
2467 robj *o;
2468
2469 /* Read type. */
2470 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2471 if (type == REDIS_EXPIRETIME) {
2472 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2473 /* We read the time so we need to read the object type again */
2474 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2475 }
2476 if (type == REDIS_EOF) break;
2477 /* Handle SELECT DB opcode as a special case */
2478 if (type == REDIS_SELECTDB) {
2479 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2480 goto eoferr;
2481 if (dbid >= (unsigned)server.dbnum) {
2482 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2483 exit(1);
2484 }
2485 db = server.db+dbid;
2486 d = db->dict;
2487 continue;
2488 }
2489 /* Read key */
2490 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2491
2492 if (type == REDIS_STRING) {
2493 /* Read string value */
2494 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2495 tryObjectEncoding(o);
2496 } else if (type == REDIS_LIST || type == REDIS_SET) {
2497 /* Read list/set value */
2498 uint32_t listlen;
2499
2500 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2501 goto eoferr;
2502 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2503 /* Load every single element of the list/set */
2504 while(listlen--) {
2505 robj *ele;
2506
2507 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2508 tryObjectEncoding(ele);
2509 if (type == REDIS_LIST) {
2510 if (!listAddNodeTail((list*)o->ptr,ele))
2511 oom("listAddNodeTail");
2512 } else {
2513 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2514 oom("dictAdd");
2515 }
2516 }
2517 } else {
2518 assert(0 != 0);
2519 }
2520 /* Add the new object in the hash table */
2521 retval = dictAdd(d,keyobj,o);
2522 if (retval == DICT_ERR) {
2523 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2524 exit(1);
2525 }
2526 /* Set the expire time if needed */
2527 if (expiretime != -1) {
2528 setExpire(db,keyobj,expiretime);
2529 /* Delete this key if already expired */
2530 if (expiretime < now) deleteKey(db,keyobj);
2531 expiretime = -1;
2532 }
2533 keyobj = o = NULL;
2534 }
2535 fclose(fp);
2536 return REDIS_OK;
2537
2538 eoferr: /* unexpected end of file is handled here with a fatal exit */
2539 if (keyobj) decrRefCount(keyobj);
2540 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2541 exit(1);
2542 return REDIS_ERR; /* Just to avoid warning */
2543 }
2544
2545 /*================================== Commands =============================== */
2546
2547 static void authCommand(redisClient *c) {
2548 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2549 c->authenticated = 1;
2550 addReply(c,shared.ok);
2551 } else {
2552 c->authenticated = 0;
2553 addReply(c,shared.err);
2554 }
2555 }
2556
2557 static void pingCommand(redisClient *c) {
2558 addReply(c,shared.pong);
2559 }
2560
2561 static void echoCommand(redisClient *c) {
2562 addReplyBulkLen(c,c->argv[1]);
2563 addReply(c,c->argv[1]);
2564 addReply(c,shared.crlf);
2565 }
2566
2567 /*=================================== Strings =============================== */
2568
2569 static void setGenericCommand(redisClient *c, int nx) {
2570 int retval;
2571
2572 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2573 if (retval == DICT_ERR) {
2574 if (!nx) {
2575 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2576 incrRefCount(c->argv[2]);
2577 } else {
2578 addReply(c,shared.czero);
2579 return;
2580 }
2581 } else {
2582 incrRefCount(c->argv[1]);
2583 incrRefCount(c->argv[2]);
2584 }
2585 server.dirty++;
2586 removeExpire(c->db,c->argv[1]);
2587 addReply(c, nx ? shared.cone : shared.ok);
2588 }
2589
2590 static void setCommand(redisClient *c) {
2591 setGenericCommand(c,0);
2592 }
2593
2594 static void setnxCommand(redisClient *c) {
2595 setGenericCommand(c,1);
2596 }
2597
2598 static void getCommand(redisClient *c) {
2599 robj *o = lookupKeyRead(c->db,c->argv[1]);
2600
2601 if (o == NULL) {
2602 addReply(c,shared.nullbulk);
2603 } else {
2604 if (o->type != REDIS_STRING) {
2605 addReply(c,shared.wrongtypeerr);
2606 } else {
2607 addReplyBulkLen(c,o);
2608 addReply(c,o);
2609 addReply(c,shared.crlf);
2610 }
2611 }
2612 }
2613
2614 static void getsetCommand(redisClient *c) {
2615 getCommand(c);
2616 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2617 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2618 } else {
2619 incrRefCount(c->argv[1]);
2620 }
2621 incrRefCount(c->argv[2]);
2622 server.dirty++;
2623 removeExpire(c->db,c->argv[1]);
2624 }
2625
2626 static void mgetCommand(redisClient *c) {
2627 int j;
2628
2629 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2630 for (j = 1; j < c->argc; j++) {
2631 robj *o = lookupKeyRead(c->db,c->argv[j]);
2632 if (o == NULL) {
2633 addReply(c,shared.nullbulk);
2634 } else {
2635 if (o->type != REDIS_STRING) {
2636 addReply(c,shared.nullbulk);
2637 } else {
2638 addReplyBulkLen(c,o);
2639 addReply(c,o);
2640 addReply(c,shared.crlf);
2641 }
2642 }
2643 }
2644 }
2645
2646 static void incrDecrCommand(redisClient *c, long long incr) {
2647 long long value;
2648 int retval;
2649 robj *o;
2650
2651 o = lookupKeyWrite(c->db,c->argv[1]);
2652 if (o == NULL) {
2653 value = 0;
2654 } else {
2655 if (o->type != REDIS_STRING) {
2656 value = 0;
2657 } else {
2658 char *eptr;
2659
2660 if (o->encoding == REDIS_ENCODING_RAW)
2661 value = strtoll(o->ptr, &eptr, 10);
2662 else if (o->encoding == REDIS_ENCODING_INT)
2663 value = (long)o->ptr;
2664 else
2665 assert(1 != 1);
2666 }
2667 }
2668
2669 value += incr;
2670 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2671 tryObjectEncoding(o);
2672 retval = dictAdd(c->db->dict,c->argv[1],o);
2673 if (retval == DICT_ERR) {
2674 dictReplace(c->db->dict,c->argv[1],o);
2675 removeExpire(c->db,c->argv[1]);
2676 } else {
2677 incrRefCount(c->argv[1]);
2678 }
2679 server.dirty++;
2680 addReply(c,shared.colon);
2681 addReply(c,o);
2682 addReply(c,shared.crlf);
2683 }
2684
2685 static void incrCommand(redisClient *c) {
2686 incrDecrCommand(c,1);
2687 }
2688
2689 static void decrCommand(redisClient *c) {
2690 incrDecrCommand(c,-1);
2691 }
2692
2693 static void incrbyCommand(redisClient *c) {
2694 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2695 incrDecrCommand(c,incr);
2696 }
2697
2698 static void decrbyCommand(redisClient *c) {
2699 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2700 incrDecrCommand(c,-incr);
2701 }
2702
2703 /* ========================= Type agnostic commands ========================= */
2704
2705 static void delCommand(redisClient *c) {
2706 int deleted = 0, j;
2707
2708 for (j = 1; j < c->argc; j++) {
2709 if (deleteKey(c->db,c->argv[j])) {
2710 server.dirty++;
2711 deleted++;
2712 }
2713 }
2714 switch(deleted) {
2715 case 0:
2716 addReply(c,shared.czero);
2717 break;
2718 case 1:
2719 addReply(c,shared.cone);
2720 break;
2721 default:
2722 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2723 break;
2724 }
2725 }
2726
2727 static void existsCommand(redisClient *c) {
2728 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2729 }
2730
2731 static void selectCommand(redisClient *c) {
2732 int id = atoi(c->argv[1]->ptr);
2733
2734 if (selectDb(c,id) == REDIS_ERR) {
2735 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2736 } else {
2737 addReply(c,shared.ok);
2738 }
2739 }
2740
2741 static void randomkeyCommand(redisClient *c) {
2742 dictEntry *de;
2743
2744 while(1) {
2745 de = dictGetRandomKey(c->db->dict);
2746 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2747 }
2748 if (de == NULL) {
2749 addReply(c,shared.plus);
2750 addReply(c,shared.crlf);
2751 } else {
2752 addReply(c,shared.plus);
2753 addReply(c,dictGetEntryKey(de));
2754 addReply(c,shared.crlf);
2755 }
2756 }
2757
2758 static void keysCommand(redisClient *c) {
2759 dictIterator *di;
2760 dictEntry *de;
2761 sds pattern = c->argv[1]->ptr;
2762 int plen = sdslen(pattern);
2763 int numkeys = 0, keyslen = 0;
2764 robj *lenobj = createObject(REDIS_STRING,NULL);
2765
2766 di = dictGetIterator(c->db->dict);
2767 if (!di) oom("dictGetIterator");
2768 addReply(c,lenobj);
2769 decrRefCount(lenobj);
2770 while((de = dictNext(di)) != NULL) {
2771 robj *keyobj = dictGetEntryKey(de);
2772
2773 sds key = keyobj->ptr;
2774 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2775 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2776 if (expireIfNeeded(c->db,keyobj) == 0) {
2777 if (numkeys != 0)
2778 addReply(c,shared.space);
2779 addReply(c,keyobj);
2780 numkeys++;
2781 keyslen += sdslen(key);
2782 }
2783 }
2784 }
2785 dictReleaseIterator(di);
2786 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2787 addReply(c,shared.crlf);
2788 }
2789
2790 static void dbsizeCommand(redisClient *c) {
2791 addReplySds(c,
2792 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2793 }
2794
2795 static void lastsaveCommand(redisClient *c) {
2796 addReplySds(c,
2797 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2798 }
2799
2800 static void typeCommand(redisClient *c) {
2801 robj *o;
2802 char *type;
2803
2804 o = lookupKeyRead(c->db,c->argv[1]);
2805 if (o == NULL) {
2806 type = "+none";
2807 } else {
2808 switch(o->type) {
2809 case REDIS_STRING: type = "+string"; break;
2810 case REDIS_LIST: type = "+list"; break;
2811 case REDIS_SET: type = "+set"; break;
2812 default: type = "unknown"; break;
2813 }
2814 }
2815 addReplySds(c,sdsnew(type));
2816 addReply(c,shared.crlf);
2817 }
2818
2819 static void saveCommand(redisClient *c) {
2820 if (server.bgsaveinprogress) {
2821 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2822 return;
2823 }
2824 if (rdbSave(server.dbfilename) == REDIS_OK) {
2825 addReply(c,shared.ok);
2826 } else {
2827 addReply(c,shared.err);
2828 }
2829 }
2830
2831 static void bgsaveCommand(redisClient *c) {
2832 if (server.bgsaveinprogress) {
2833 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2834 return;
2835 }
2836 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2837 addReply(c,shared.ok);
2838 } else {
2839 addReply(c,shared.err);
2840 }
2841 }
2842
2843 static void shutdownCommand(redisClient *c) {
2844 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2845 /* Kill the saving child if there is a background saving in progress.
2846 We want to avoid race conditions, for instance our saving child may
2847 overwrite the synchronous saving did by SHUTDOWN. */
2848 if (server.bgsaveinprogress) {
2849 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2850 kill(server.bgsavechildpid,SIGKILL);
2851 rdbRemoveTempFile(server.bgsavechildpid);
2852 }
2853 /* SYNC SAVE */
2854 if (rdbSave(server.dbfilename) == REDIS_OK) {
2855 if (server.daemonize)
2856 unlink(server.pidfile);
2857 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2858 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2859 exit(1);
2860 } else {
2861 /* Ooops.. error saving! The best we can do is to continue operating.
2862 * Note that if there was a background saving process, in the next
2863 * cron() Redis will be notified that the background saving aborted,
2864 * handling special stuff like slaves pending for synchronization... */
2865 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2866 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2867 }
2868 }
2869
2870 static void renameGenericCommand(redisClient *c, int nx) {
2871 robj *o;
2872
2873 /* To use the same key as src and dst is probably an error */
2874 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2875 addReply(c,shared.sameobjecterr);
2876 return;
2877 }
2878
2879 o = lookupKeyWrite(c->db,c->argv[1]);
2880 if (o == NULL) {
2881 addReply(c,shared.nokeyerr);
2882 return;
2883 }
2884 incrRefCount(o);
2885 deleteIfVolatile(c->db,c->argv[2]);
2886 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2887 if (nx) {
2888 decrRefCount(o);
2889 addReply(c,shared.czero);
2890 return;
2891 }
2892 dictReplace(c->db->dict,c->argv[2],o);
2893 } else {
2894 incrRefCount(c->argv[2]);
2895 }
2896 deleteKey(c->db,c->argv[1]);
2897 server.dirty++;
2898 addReply(c,nx ? shared.cone : shared.ok);
2899 }
2900
2901 static void renameCommand(redisClient *c) {
2902 renameGenericCommand(c,0);
2903 }
2904
2905 static void renamenxCommand(redisClient *c) {
2906 renameGenericCommand(c,1);
2907 }
2908
2909 static void moveCommand(redisClient *c) {
2910 robj *o;
2911 redisDb *src, *dst;
2912 int srcid;
2913
2914 /* Obtain source and target DB pointers */
2915 src = c->db;
2916 srcid = c->db->id;
2917 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2918 addReply(c,shared.outofrangeerr);
2919 return;
2920 }
2921 dst = c->db;
2922 selectDb(c,srcid); /* Back to the source DB */
2923
2924 /* If the user is moving using as target the same
2925 * DB as the source DB it is probably an error. */
2926 if (src == dst) {
2927 addReply(c,shared.sameobjecterr);
2928 return;
2929 }
2930
2931 /* Check if the element exists and get a reference */
2932 o = lookupKeyWrite(c->db,c->argv[1]);
2933 if (!o) {
2934 addReply(c,shared.czero);
2935 return;
2936 }
2937
2938 /* Try to add the element to the target DB */
2939 deleteIfVolatile(dst,c->argv[1]);
2940 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2941 addReply(c,shared.czero);
2942 return;
2943 }
2944 incrRefCount(c->argv[1]);
2945 incrRefCount(o);
2946
2947 /* OK! key moved, free the entry in the source DB */
2948 deleteKey(src,c->argv[1]);
2949 server.dirty++;
2950 addReply(c,shared.cone);
2951 }
2952
2953 /* =================================== Lists ================================ */
2954 static void pushGenericCommand(redisClient *c, int where) {
2955 robj *lobj;
2956 list *list;
2957
2958 lobj = lookupKeyWrite(c->db,c->argv[1]);
2959 if (lobj == NULL) {
2960 lobj = createListObject();
2961 list = lobj->ptr;
2962 if (where == REDIS_HEAD) {
2963 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2964 } else {
2965 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2966 }
2967 dictAdd(c->db->dict,c->argv[1],lobj);
2968 incrRefCount(c->argv[1]);
2969 incrRefCount(c->argv[2]);
2970 } else {
2971 if (lobj->type != REDIS_LIST) {
2972 addReply(c,shared.wrongtypeerr);
2973 return;
2974 }
2975 list = lobj->ptr;
2976 if (where == REDIS_HEAD) {
2977 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2978 } else {
2979 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2980 }
2981 incrRefCount(c->argv[2]);
2982 }
2983 server.dirty++;
2984 addReply(c,shared.ok);
2985 }
2986
2987 static void lpushCommand(redisClient *c) {
2988 pushGenericCommand(c,REDIS_HEAD);
2989 }
2990
2991 static void rpushCommand(redisClient *c) {
2992 pushGenericCommand(c,REDIS_TAIL);
2993 }
2994
2995 static void llenCommand(redisClient *c) {
2996 robj *o;
2997 list *l;
2998
2999 o = lookupKeyRead(c->db,c->argv[1]);
3000 if (o == NULL) {
3001 addReply(c,shared.czero);
3002 return;
3003 } else {
3004 if (o->type != REDIS_LIST) {
3005 addReply(c,shared.wrongtypeerr);
3006 } else {
3007 l = o->ptr;
3008 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3009 }
3010 }
3011 }
3012
3013 static void lindexCommand(redisClient *c) {
3014 robj *o;
3015 int index = atoi(c->argv[2]->ptr);
3016
3017 o = lookupKeyRead(c->db,c->argv[1]);
3018 if (o == NULL) {
3019 addReply(c,shared.nullbulk);
3020 } else {
3021 if (o->type != REDIS_LIST) {
3022 addReply(c,shared.wrongtypeerr);
3023 } else {
3024 list *list = o->ptr;
3025 listNode *ln;
3026
3027 ln = listIndex(list, index);
3028 if (ln == NULL) {
3029 addReply(c,shared.nullbulk);
3030 } else {
3031 robj *ele = listNodeValue(ln);
3032 addReplyBulkLen(c,ele);
3033 addReply(c,ele);
3034 addReply(c,shared.crlf);
3035 }
3036 }
3037 }
3038 }
3039
3040 static void lsetCommand(redisClient *c) {
3041 robj *o;
3042 int index = atoi(c->argv[2]->ptr);
3043
3044 o = lookupKeyWrite(c->db,c->argv[1]);
3045 if (o == NULL) {
3046 addReply(c,shared.nokeyerr);
3047 } else {
3048 if (o->type != REDIS_LIST) {
3049 addReply(c,shared.wrongtypeerr);
3050 } else {
3051 list *list = o->ptr;
3052 listNode *ln;
3053
3054 ln = listIndex(list, index);
3055 if (ln == NULL) {
3056 addReply(c,shared.outofrangeerr);
3057 } else {
3058 robj *ele = listNodeValue(ln);
3059
3060 decrRefCount(ele);
3061 listNodeValue(ln) = c->argv[3];
3062 incrRefCount(c->argv[3]);
3063 addReply(c,shared.ok);
3064 server.dirty++;
3065 }
3066 }
3067 }
3068 }
3069
3070 static void popGenericCommand(redisClient *c, int where) {
3071 robj *o;
3072
3073 o = lookupKeyWrite(c->db,c->argv[1]);
3074 if (o == NULL) {
3075 addReply(c,shared.nullbulk);
3076 } else {
3077 if (o->type != REDIS_LIST) {
3078 addReply(c,shared.wrongtypeerr);
3079 } else {
3080 list *list = o->ptr;
3081 listNode *ln;
3082
3083 if (where == REDIS_HEAD)
3084 ln = listFirst(list);
3085 else
3086 ln = listLast(list);
3087
3088 if (ln == NULL) {
3089 addReply(c,shared.nullbulk);
3090 } else {
3091 robj *ele = listNodeValue(ln);
3092 addReplyBulkLen(c,ele);
3093 addReply(c,ele);
3094 addReply(c,shared.crlf);
3095 listDelNode(list,ln);
3096 server.dirty++;
3097 }
3098 }
3099 }
3100 }
3101
3102 static void lpopCommand(redisClient *c) {
3103 popGenericCommand(c,REDIS_HEAD);
3104 }
3105
3106 static void rpopCommand(redisClient *c) {
3107 popGenericCommand(c,REDIS_TAIL);
3108 }
3109
3110 static void lrangeCommand(redisClient *c) {
3111 robj *o;
3112 int start = atoi(c->argv[2]->ptr);
3113 int end = atoi(c->argv[3]->ptr);
3114
3115 o = lookupKeyRead(c->db,c->argv[1]);
3116 if (o == NULL) {
3117 addReply(c,shared.nullmultibulk);
3118 } else {
3119 if (o->type != REDIS_LIST) {
3120 addReply(c,shared.wrongtypeerr);
3121 } else {
3122 list *list = o->ptr;
3123 listNode *ln;
3124 int llen = listLength(list);
3125 int rangelen, j;
3126 robj *ele;
3127
3128 /* convert negative indexes */
3129 if (start < 0) start = llen+start;
3130 if (end < 0) end = llen+end;
3131 if (start < 0) start = 0;
3132 if (end < 0) end = 0;
3133
3134 /* indexes sanity checks */
3135 if (start > end || start >= llen) {
3136 /* Out of range start or start > end result in empty list */
3137 addReply(c,shared.emptymultibulk);
3138 return;
3139 }
3140 if (end >= llen) end = llen-1;
3141 rangelen = (end-start)+1;
3142
3143 /* Return the result in form of a multi-bulk reply */
3144 ln = listIndex(list, start);
3145 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3146 for (j = 0; j < rangelen; j++) {
3147 ele = listNodeValue(ln);
3148 addReplyBulkLen(c,ele);
3149 addReply(c,ele);
3150 addReply(c,shared.crlf);
3151 ln = ln->next;
3152 }
3153 }
3154 }
3155 }
3156
3157 static void ltrimCommand(redisClient *c) {
3158 robj *o;
3159 int start = atoi(c->argv[2]->ptr);
3160 int end = atoi(c->argv[3]->ptr);
3161
3162 o = lookupKeyWrite(c->db,c->argv[1]);
3163 if (o == NULL) {
3164 addReply(c,shared.nokeyerr);
3165 } else {
3166 if (o->type != REDIS_LIST) {
3167 addReply(c,shared.wrongtypeerr);
3168 } else {
3169 list *list = o->ptr;
3170 listNode *ln;
3171 int llen = listLength(list);
3172 int j, ltrim, rtrim;
3173
3174 /* convert negative indexes */
3175 if (start < 0) start = llen+start;
3176 if (end < 0) end = llen+end;
3177 if (start < 0) start = 0;
3178 if (end < 0) end = 0;
3179
3180 /* indexes sanity checks */
3181 if (start > end || start >= llen) {
3182 /* Out of range start or start > end result in empty list */
3183 ltrim = llen;
3184 rtrim = 0;
3185 } else {
3186 if (end >= llen) end = llen-1;
3187 ltrim = start;
3188 rtrim = llen-end-1;
3189 }
3190
3191 /* Remove list elements to perform the trim */
3192 for (j = 0; j < ltrim; j++) {
3193 ln = listFirst(list);
3194 listDelNode(list,ln);
3195 }
3196 for (j = 0; j < rtrim; j++) {
3197 ln = listLast(list);
3198 listDelNode(list,ln);
3199 }
3200 server.dirty++;
3201 addReply(c,shared.ok);
3202 }
3203 }
3204 }
3205
3206 static void lremCommand(redisClient *c) {
3207 robj *o;
3208
3209 o = lookupKeyWrite(c->db,c->argv[1]);
3210 if (o == NULL) {
3211 addReply(c,shared.czero);
3212 } else {
3213 if (o->type != REDIS_LIST) {
3214 addReply(c,shared.wrongtypeerr);
3215 } else {
3216 list *list = o->ptr;
3217 listNode *ln, *next;
3218 int toremove = atoi(c->argv[2]->ptr);
3219 int removed = 0;
3220 int fromtail = 0;
3221
3222 if (toremove < 0) {
3223 toremove = -toremove;
3224 fromtail = 1;
3225 }
3226 ln = fromtail ? list->tail : list->head;
3227 while (ln) {
3228 robj *ele = listNodeValue(ln);
3229
3230 next = fromtail ? ln->prev : ln->next;
3231 if (compareStringObjects(ele,c->argv[3]) == 0) {
3232 listDelNode(list,ln);
3233 server.dirty++;
3234 removed++;
3235 if (toremove && removed == toremove) break;
3236 }
3237 ln = next;
3238 }
3239 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3240 }
3241 }
3242 }
3243
3244 /* ==================================== Sets ================================ */
3245
3246 static void saddCommand(redisClient *c) {
3247 robj *set;
3248
3249 set = lookupKeyWrite(c->db,c->argv[1]);
3250 if (set == NULL) {
3251 set = createSetObject();
3252 dictAdd(c->db->dict,c->argv[1],set);
3253 incrRefCount(c->argv[1]);
3254 } else {
3255 if (set->type != REDIS_SET) {
3256 addReply(c,shared.wrongtypeerr);
3257 return;
3258 }
3259 }
3260 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3261 incrRefCount(c->argv[2]);
3262 server.dirty++;
3263 addReply(c,shared.cone);
3264 } else {
3265 addReply(c,shared.czero);
3266 }
3267 }
3268
3269 static void sremCommand(redisClient *c) {
3270 robj *set;
3271
3272 set = lookupKeyWrite(c->db,c->argv[1]);
3273 if (set == NULL) {
3274 addReply(c,shared.czero);
3275 } else {
3276 if (set->type != REDIS_SET) {
3277 addReply(c,shared.wrongtypeerr);
3278 return;
3279 }
3280 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3281 server.dirty++;
3282 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3283 addReply(c,shared.cone);
3284 } else {
3285 addReply(c,shared.czero);
3286 }
3287 }
3288 }
3289
3290 static void smoveCommand(redisClient *c) {
3291 robj *srcset, *dstset;
3292
3293 srcset = lookupKeyWrite(c->db,c->argv[1]);
3294 dstset = lookupKeyWrite(c->db,c->argv[2]);
3295
3296 /* If the source key does not exist return 0, if it's of the wrong type
3297 * raise an error */
3298 if (srcset == NULL || srcset->type != REDIS_SET) {
3299 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3300 return;
3301 }
3302 /* Error if the destination key is not a set as well */
3303 if (dstset && dstset->type != REDIS_SET) {
3304 addReply(c,shared.wrongtypeerr);
3305 return;
3306 }
3307 /* Remove the element from the source set */
3308 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3309 /* Key not found in the src set! return zero */
3310 addReply(c,shared.czero);
3311 return;
3312 }
3313 server.dirty++;
3314 /* Add the element to the destination set */
3315 if (!dstset) {
3316 dstset = createSetObject();
3317 dictAdd(c->db->dict,c->argv[2],dstset);
3318 incrRefCount(c->argv[2]);
3319 }
3320 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3321 incrRefCount(c->argv[3]);
3322 addReply(c,shared.cone);
3323 }
3324
3325 static void sismemberCommand(redisClient *c) {
3326 robj *set;
3327
3328 set = lookupKeyRead(c->db,c->argv[1]);
3329 if (set == NULL) {
3330 addReply(c,shared.czero);
3331 } else {
3332 if (set->type != REDIS_SET) {
3333 addReply(c,shared.wrongtypeerr);
3334 return;
3335 }
3336 if (dictFind(set->ptr,c->argv[2]))
3337 addReply(c,shared.cone);
3338 else
3339 addReply(c,shared.czero);
3340 }
3341 }
3342
3343 static void scardCommand(redisClient *c) {
3344 robj *o;
3345 dict *s;
3346
3347 o = lookupKeyRead(c->db,c->argv[1]);
3348 if (o == NULL) {
3349 addReply(c,shared.czero);
3350 return;
3351 } else {
3352 if (o->type != REDIS_SET) {
3353 addReply(c,shared.wrongtypeerr);
3354 } else {
3355 s = o->ptr;
3356 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3357 dictSize(s)));
3358 }
3359 }
3360 }
3361
3362 static void spopCommand(redisClient *c) {
3363 robj *set;
3364 dictEntry *de;
3365
3366 set = lookupKeyWrite(c->db,c->argv[1]);
3367 if (set == NULL) {
3368 addReply(c,shared.nullbulk);
3369 } else {
3370 if (set->type != REDIS_SET) {
3371 addReply(c,shared.wrongtypeerr);
3372 return;
3373 }
3374 de = dictGetRandomKey(set->ptr);
3375 if (de == NULL) {
3376 addReply(c,shared.nullbulk);
3377 } else {
3378 robj *ele = dictGetEntryKey(de);
3379
3380 addReplyBulkLen(c,ele);
3381 addReply(c,ele);
3382 addReply(c,shared.crlf);
3383 dictDelete(set->ptr,ele);
3384 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3385 server.dirty++;
3386 }
3387 }
3388 }
3389
3390 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3391 dict **d1 = (void*) s1, **d2 = (void*) s2;
3392
3393 return dictSize(*d1)-dictSize(*d2);
3394 }
3395
3396 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3397 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3398 dictIterator *di;
3399 dictEntry *de;
3400 robj *lenobj = NULL, *dstset = NULL;
3401 int j, cardinality = 0;
3402
3403 if (!dv) oom("sinterGenericCommand");
3404 for (j = 0; j < setsnum; j++) {
3405 robj *setobj;
3406
3407 setobj = dstkey ?
3408 lookupKeyWrite(c->db,setskeys[j]) :
3409 lookupKeyRead(c->db,setskeys[j]);
3410 if (!setobj) {
3411 zfree(dv);
3412 if (dstkey) {
3413 deleteKey(c->db,dstkey);
3414 addReply(c,shared.ok);
3415 } else {
3416 addReply(c,shared.nullmultibulk);
3417 }
3418 return;
3419 }
3420 if (setobj->type != REDIS_SET) {
3421 zfree(dv);
3422 addReply(c,shared.wrongtypeerr);
3423 return;
3424 }
3425 dv[j] = setobj->ptr;
3426 }
3427 /* Sort sets from the smallest to largest, this will improve our
3428 * algorithm's performace */
3429 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3430
3431 /* The first thing we should output is the total number of elements...
3432 * since this is a multi-bulk write, but at this stage we don't know
3433 * the intersection set size, so we use a trick, append an empty object
3434 * to the output list and save the pointer to later modify it with the
3435 * right length */
3436 if (!dstkey) {
3437 lenobj = createObject(REDIS_STRING,NULL);
3438 addReply(c,lenobj);
3439 decrRefCount(lenobj);
3440 } else {
3441 /* If we have a target key where to store the resulting set
3442 * create this key with an empty set inside */
3443 dstset = createSetObject();
3444 }
3445
3446 /* Iterate all the elements of the first (smallest) set, and test
3447 * the element against all the other sets, if at least one set does
3448 * not include the element it is discarded */
3449 di = dictGetIterator(dv[0]);
3450 if (!di) oom("dictGetIterator");
3451
3452 while((de = dictNext(di)) != NULL) {
3453 robj *ele;
3454
3455 for (j = 1; j < setsnum; j++)
3456 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3457 if (j != setsnum)
3458 continue; /* at least one set does not contain the member */
3459 ele = dictGetEntryKey(de);
3460 if (!dstkey) {
3461 addReplyBulkLen(c,ele);
3462 addReply(c,ele);
3463 addReply(c,shared.crlf);
3464 cardinality++;
3465 } else {
3466 dictAdd(dstset->ptr,ele,NULL);
3467 incrRefCount(ele);
3468 }
3469 }
3470 dictReleaseIterator(di);
3471
3472 if (dstkey) {
3473 /* Store the resulting set into the target */
3474 deleteKey(c->db,dstkey);
3475 dictAdd(c->db->dict,dstkey,dstset);
3476 incrRefCount(dstkey);
3477 }
3478
3479 if (!dstkey) {
3480 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3481 } else {
3482 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3483 dictSize((dict*)dstset->ptr)));
3484 server.dirty++;
3485 }
3486 zfree(dv);
3487 }
3488
3489 static void sinterCommand(redisClient *c) {
3490 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3491 }
3492
3493 static void sinterstoreCommand(redisClient *c) {
3494 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3495 }
3496
3497 #define REDIS_OP_UNION 0
3498 #define REDIS_OP_DIFF 1
3499
3500 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3501 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3502 dictIterator *di;
3503 dictEntry *de;
3504 robj *dstset = NULL;
3505 int j, cardinality = 0;
3506
3507 if (!dv) oom("sunionDiffGenericCommand");
3508 for (j = 0; j < setsnum; j++) {
3509 robj *setobj;
3510
3511 setobj = dstkey ?
3512 lookupKeyWrite(c->db,setskeys[j]) :
3513 lookupKeyRead(c->db,setskeys[j]);
3514 if (!setobj) {
3515 dv[j] = NULL;
3516 continue;
3517 }
3518 if (setobj->type != REDIS_SET) {
3519 zfree(dv);
3520 addReply(c,shared.wrongtypeerr);
3521 return;
3522 }
3523 dv[j] = setobj->ptr;
3524 }
3525
3526 /* We need a temp set object to store our union. If the dstkey
3527 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3528 * this set object will be the resulting object to set into the target key*/
3529 dstset = createSetObject();
3530
3531 /* Iterate all the elements of all the sets, add every element a single
3532 * time to the result set */
3533 for (j = 0; j < setsnum; j++) {
3534 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3535 if (!dv[j]) continue; /* non existing keys are like empty sets */
3536
3537 di = dictGetIterator(dv[j]);
3538 if (!di) oom("dictGetIterator");
3539
3540 while((de = dictNext(di)) != NULL) {
3541 robj *ele;
3542
3543 /* dictAdd will not add the same element multiple times */
3544 ele = dictGetEntryKey(de);
3545 if (op == REDIS_OP_UNION || j == 0) {
3546 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3547 incrRefCount(ele);
3548 cardinality++;
3549 }
3550 } else if (op == REDIS_OP_DIFF) {
3551 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3552 cardinality--;
3553 }
3554 }
3555 }
3556 dictReleaseIterator(di);
3557
3558 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3559 }
3560
3561 /* Output the content of the resulting set, if not in STORE mode */
3562 if (!dstkey) {
3563 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3564 di = dictGetIterator(dstset->ptr);
3565 if (!di) oom("dictGetIterator");
3566 while((de = dictNext(di)) != NULL) {
3567 robj *ele;
3568
3569 ele = dictGetEntryKey(de);
3570 addReplyBulkLen(c,ele);
3571 addReply(c,ele);
3572 addReply(c,shared.crlf);
3573 }
3574 dictReleaseIterator(di);
3575 } else {
3576 /* If we have a target key where to store the resulting set
3577 * create this key with the result set inside */
3578 deleteKey(c->db,dstkey);
3579 dictAdd(c->db->dict,dstkey,dstset);
3580 incrRefCount(dstkey);
3581 }
3582
3583 /* Cleanup */
3584 if (!dstkey) {
3585 decrRefCount(dstset);
3586 } else {
3587 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3588 dictSize((dict*)dstset->ptr)));
3589 server.dirty++;
3590 }
3591 zfree(dv);
3592 }
3593
3594 static void sunionCommand(redisClient *c) {
3595 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3596 }
3597
3598 static void sunionstoreCommand(redisClient *c) {
3599 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3600 }
3601
3602 static void sdiffCommand(redisClient *c) {
3603 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3604 }
3605
3606 static void sdiffstoreCommand(redisClient *c) {
3607 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3608 }
3609
3610 static void flushdbCommand(redisClient *c) {
3611 server.dirty += dictSize(c->db->dict);
3612 dictEmpty(c->db->dict);
3613 dictEmpty(c->db->expires);
3614 addReply(c,shared.ok);
3615 }
3616
3617 static void flushallCommand(redisClient *c) {
3618 server.dirty += emptyDb();
3619 addReply(c,shared.ok);
3620 rdbSave(server.dbfilename);
3621 server.dirty++;
3622 }
3623
3624 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3625 redisSortOperation *so = zmalloc(sizeof(*so));
3626 if (!so) oom("createSortOperation");
3627 so->type = type;
3628 so->pattern = pattern;
3629 return so;
3630 }
3631
3632 /* Return the value associated to the key with a name obtained
3633 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3634 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3635 char *p;
3636 sds spat, ssub;
3637 robj keyobj;
3638 int prefixlen, sublen, postfixlen;
3639 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3640 struct {
3641 long len;
3642 long free;
3643 char buf[REDIS_SORTKEY_MAX+1];
3644 } keyname;
3645
3646 if (subst->encoding == REDIS_ENCODING_RAW)
3647 incrRefCount(subst);
3648 else {
3649 subst = getDecodedObject(subst);
3650 }
3651
3652 spat = pattern->ptr;
3653 ssub = subst->ptr;
3654 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3655 p = strchr(spat,'*');
3656 if (!p) return NULL;
3657
3658 prefixlen = p-spat;
3659 sublen = sdslen(ssub);
3660 postfixlen = sdslen(spat)-(prefixlen+1);
3661 memcpy(keyname.buf,spat,prefixlen);
3662 memcpy(keyname.buf+prefixlen,ssub,sublen);
3663 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3664 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3665 keyname.len = prefixlen+sublen+postfixlen;
3666
3667 keyobj.refcount = 1;
3668 keyobj.type = REDIS_STRING;
3669 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3670
3671 decrRefCount(subst);
3672
3673 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3674 return lookupKeyRead(db,&keyobj);
3675 }
3676
3677 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3678 * the additional parameter is not standard but a BSD-specific we have to
3679 * pass sorting parameters via the global 'server' structure */
3680 static int sortCompare(const void *s1, const void *s2) {
3681 const redisSortObject *so1 = s1, *so2 = s2;
3682 int cmp;
3683
3684 if (!server.sort_alpha) {
3685 /* Numeric sorting. Here it's trivial as we precomputed scores */
3686 if (so1->u.score > so2->u.score) {
3687 cmp = 1;
3688 } else if (so1->u.score < so2->u.score) {
3689 cmp = -1;
3690 } else {
3691 cmp = 0;
3692 }
3693 } else {
3694 /* Alphanumeric sorting */
3695 if (server.sort_bypattern) {
3696 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3697 /* At least one compare object is NULL */
3698 if (so1->u.cmpobj == so2->u.cmpobj)
3699 cmp = 0;
3700 else if (so1->u.cmpobj == NULL)
3701 cmp = -1;
3702 else
3703 cmp = 1;
3704 } else {
3705 /* We have both the objects, use strcoll */
3706 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3707 }
3708 } else {
3709 /* Compare elements directly */
3710 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
3711 so2->obj->encoding == REDIS_ENCODING_RAW) {
3712 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3713 } else {
3714 robj *dec1, *dec2;
3715
3716 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
3717 so1->obj : getDecodedObject(so1->obj);
3718 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
3719 so2->obj : getDecodedObject(so2->obj);
3720 cmp = strcoll(dec1->ptr,dec2->ptr);
3721 if (dec1 != so1->obj) decrRefCount(dec1);
3722 if (dec2 != so2->obj) decrRefCount(dec2);
3723 }
3724 }
3725 }
3726 return server.sort_desc ? -cmp : cmp;
3727 }
3728
3729 /* The SORT command is the most complex command in Redis. Warning: this code
3730 * is optimized for speed and a bit less for readability */
3731 static void sortCommand(redisClient *c) {
3732 list *operations;
3733 int outputlen = 0;
3734 int desc = 0, alpha = 0;
3735 int limit_start = 0, limit_count = -1, start, end;
3736 int j, dontsort = 0, vectorlen;
3737 int getop = 0; /* GET operation counter */
3738 robj *sortval, *sortby = NULL;
3739 redisSortObject *vector; /* Resulting vector to sort */
3740
3741 /* Lookup the key to sort. It must be of the right types */
3742 sortval = lookupKeyRead(c->db,c->argv[1]);
3743 if (sortval == NULL) {
3744 addReply(c,shared.nokeyerr);
3745 return;
3746 }
3747 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3748 addReply(c,shared.wrongtypeerr);
3749 return;
3750 }
3751
3752 /* Create a list of operations to perform for every sorted element.
3753 * Operations can be GET/DEL/INCR/DECR */
3754 operations = listCreate();
3755 listSetFreeMethod(operations,zfree);
3756 j = 2;
3757
3758 /* Now we need to protect sortval incrementing its count, in the future
3759 * SORT may have options able to overwrite/delete keys during the sorting
3760 * and the sorted key itself may get destroied */
3761 incrRefCount(sortval);
3762
3763 /* The SORT command has an SQL-alike syntax, parse it */
3764 while(j < c->argc) {
3765 int leftargs = c->argc-j-1;
3766 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3767 desc = 0;
3768 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3769 desc = 1;
3770 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3771 alpha = 1;
3772 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3773 limit_start = atoi(c->argv[j+1]->ptr);
3774 limit_count = atoi(c->argv[j+2]->ptr);
3775 j+=2;
3776 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3777 sortby = c->argv[j+1];
3778 /* If the BY pattern does not contain '*', i.e. it is constant,
3779 * we don't need to sort nor to lookup the weight keys. */
3780 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3781 j++;
3782 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3783 listAddNodeTail(operations,createSortOperation(
3784 REDIS_SORT_GET,c->argv[j+1]));
3785 getop++;
3786 j++;
3787 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3788 listAddNodeTail(operations,createSortOperation(
3789 REDIS_SORT_DEL,c->argv[j+1]));
3790 j++;
3791 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3792 listAddNodeTail(operations,createSortOperation(
3793 REDIS_SORT_INCR,c->argv[j+1]));
3794 j++;
3795 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3796 listAddNodeTail(operations,createSortOperation(
3797 REDIS_SORT_DECR,c->argv[j+1]));
3798 j++;
3799 } else {
3800 decrRefCount(sortval);
3801 listRelease(operations);
3802 addReply(c,shared.syntaxerr);
3803 return;
3804 }
3805 j++;
3806 }
3807
3808 /* Load the sorting vector with all the objects to sort */
3809 vectorlen = (sortval->type == REDIS_LIST) ?
3810 listLength((list*)sortval->ptr) :
3811 dictSize((dict*)sortval->ptr);
3812 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3813 if (!vector) oom("allocating objects vector for SORT");
3814 j = 0;
3815 if (sortval->type == REDIS_LIST) {
3816 list *list = sortval->ptr;
3817 listNode *ln;
3818
3819 listRewind(list);
3820 while((ln = listYield(list))) {
3821 robj *ele = ln->value;
3822 vector[j].obj = ele;
3823 vector[j].u.score = 0;
3824 vector[j].u.cmpobj = NULL;
3825 j++;
3826 }
3827 } else {
3828 dict *set = sortval->ptr;
3829 dictIterator *di;
3830 dictEntry *setele;
3831
3832 di = dictGetIterator(set);
3833 if (!di) oom("dictGetIterator");
3834 while((setele = dictNext(di)) != NULL) {
3835 vector[j].obj = dictGetEntryKey(setele);
3836 vector[j].u.score = 0;
3837 vector[j].u.cmpobj = NULL;
3838 j++;
3839 }
3840 dictReleaseIterator(di);
3841 }
3842 assert(j == vectorlen);
3843
3844 /* Now it's time to load the right scores in the sorting vector */
3845 if (dontsort == 0) {
3846 for (j = 0; j < vectorlen; j++) {
3847 if (sortby) {
3848 robj *byval;
3849
3850 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3851 if (!byval || byval->type != REDIS_STRING) continue;
3852 if (alpha) {
3853 if (byval->encoding == REDIS_ENCODING_RAW) {
3854 vector[j].u.cmpobj = byval;
3855 incrRefCount(byval);
3856 } else {
3857 vector[j].u.cmpobj = getDecodedObject(byval);
3858 }
3859 } else {
3860 if (byval->encoding == REDIS_ENCODING_RAW) {
3861 vector[j].u.score = strtod(byval->ptr,NULL);
3862 } else {
3863 if (byval->encoding == REDIS_ENCODING_INT) {
3864 vector[j].u.score = (long)byval->ptr;
3865 } else
3866 assert(1 != 1);
3867 }
3868 }
3869 } else {
3870 if (!alpha) {
3871 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
3872 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3873 else {
3874 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
3875 vector[j].u.score = (long) vector[j].obj->ptr;
3876 else
3877 assert(1 != 1);
3878 }
3879 }
3880 }
3881 }
3882 }
3883
3884 /* We are ready to sort the vector... perform a bit of sanity check
3885 * on the LIMIT option too. We'll use a partial version of quicksort. */
3886 start = (limit_start < 0) ? 0 : limit_start;
3887 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3888 if (start >= vectorlen) {
3889 start = vectorlen-1;
3890 end = vectorlen-2;
3891 }
3892 if (end >= vectorlen) end = vectorlen-1;
3893
3894 if (dontsort == 0) {
3895 server.sort_desc = desc;
3896 server.sort_alpha = alpha;
3897 server.sort_bypattern = sortby ? 1 : 0;
3898 if (sortby && (start != 0 || end != vectorlen-1))
3899 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3900 else
3901 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3902 }
3903
3904 /* Send command output to the output buffer, performing the specified
3905 * GET/DEL/INCR/DECR operations if any. */
3906 outputlen = getop ? getop*(end-start+1) : end-start+1;
3907 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3908 for (j = start; j <= end; j++) {
3909 listNode *ln;
3910 if (!getop) {
3911 addReplyBulkLen(c,vector[j].obj);
3912 addReply(c,vector[j].obj);
3913 addReply(c,shared.crlf);
3914 }
3915 listRewind(operations);
3916 while((ln = listYield(operations))) {
3917 redisSortOperation *sop = ln->value;
3918 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3919 vector[j].obj);
3920
3921 if (sop->type == REDIS_SORT_GET) {
3922 if (!val || val->type != REDIS_STRING) {
3923 addReply(c,shared.nullbulk);
3924 } else {
3925 addReplyBulkLen(c,val);
3926 addReply(c,val);
3927 addReply(c,shared.crlf);
3928 }
3929 } else if (sop->type == REDIS_SORT_DEL) {
3930 /* TODO */
3931 }
3932 }
3933 }
3934
3935 /* Cleanup */
3936 decrRefCount(sortval);
3937 listRelease(operations);
3938 for (j = 0; j < vectorlen; j++) {
3939 if (sortby && alpha && vector[j].u.cmpobj)
3940 decrRefCount(vector[j].u.cmpobj);
3941 }
3942 zfree(vector);
3943 }
3944
3945 static void infoCommand(redisClient *c) {
3946 sds info;
3947 time_t uptime = time(NULL)-server.stat_starttime;
3948 int j;
3949
3950 info = sdscatprintf(sdsempty(),
3951 "redis_version:%s\r\n"
3952 "arch_bits:%s\r\n"
3953 "uptime_in_seconds:%d\r\n"
3954 "uptime_in_days:%d\r\n"
3955 "connected_clients:%d\r\n"
3956 "connected_slaves:%d\r\n"
3957 "used_memory:%zu\r\n"
3958 "changes_since_last_save:%lld\r\n"
3959 "bgsave_in_progress:%d\r\n"
3960 "last_save_time:%d\r\n"
3961 "total_connections_received:%lld\r\n"
3962 "total_commands_processed:%lld\r\n"
3963 "role:%s\r\n"
3964 ,REDIS_VERSION,
3965 (sizeof(long) == 8) ? "64" : "32",
3966 uptime,
3967 uptime/(3600*24),
3968 listLength(server.clients)-listLength(server.slaves),
3969 listLength(server.slaves),
3970 server.usedmemory,
3971 server.dirty,
3972 server.bgsaveinprogress,
3973 server.lastsave,
3974 server.stat_numconnections,
3975 server.stat_numcommands,
3976 server.masterhost == NULL ? "master" : "slave"
3977 );
3978 if (server.masterhost) {
3979 info = sdscatprintf(info,
3980 "master_host:%s\r\n"
3981 "master_port:%d\r\n"
3982 "master_link_status:%s\r\n"
3983 "master_last_io_seconds_ago:%d\r\n"
3984 ,server.masterhost,
3985 server.masterport,
3986 (server.replstate == REDIS_REPL_CONNECTED) ?
3987 "up" : "down",
3988 (int)(time(NULL)-server.master->lastinteraction)
3989 );
3990 }
3991 for (j = 0; j < server.dbnum; j++) {
3992 long long keys, vkeys;
3993
3994 keys = dictSize(server.db[j].dict);
3995 vkeys = dictSize(server.db[j].expires);
3996 if (keys || vkeys) {
3997 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
3998 j, keys, vkeys);
3999 }
4000 }
4001 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4002 addReplySds(c,info);
4003 addReply(c,shared.crlf);
4004 }
4005
4006 static void monitorCommand(redisClient *c) {
4007 /* ignore MONITOR if aleady slave or in monitor mode */
4008 if (c->flags & REDIS_SLAVE) return;
4009
4010 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4011 c->slaveseldb = 0;
4012 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
4013 addReply(c,shared.ok);
4014 }
4015
4016 /* ================================= Expire ================================= */
4017 static int removeExpire(redisDb *db, robj *key) {
4018 if (dictDelete(db->expires,key) == DICT_OK) {
4019 return 1;
4020 } else {
4021 return 0;
4022 }
4023 }
4024
4025 static int setExpire(redisDb *db, robj *key, time_t when) {
4026 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4027 return 0;
4028 } else {
4029 incrRefCount(key);
4030 return 1;
4031 }
4032 }
4033
4034 /* Return the expire time of the specified key, or -1 if no expire
4035 * is associated with this key (i.e. the key is non volatile) */
4036 static time_t getExpire(redisDb *db, robj *key) {
4037 dictEntry *de;
4038
4039 /* No expire? return ASAP */
4040 if (dictSize(db->expires) == 0 ||
4041 (de = dictFind(db->expires,key)) == NULL) return -1;
4042
4043 return (time_t) dictGetEntryVal(de);
4044 }
4045
4046 static int expireIfNeeded(redisDb *db, robj *key) {
4047 time_t when;
4048 dictEntry *de;
4049
4050 /* No expire? return ASAP */
4051 if (dictSize(db->expires) == 0 ||
4052 (de = dictFind(db->expires,key)) == NULL) return 0;
4053
4054 /* Lookup the expire */
4055 when = (time_t) dictGetEntryVal(de);
4056 if (time(NULL) <= when) return 0;
4057
4058 /* Delete the key */
4059 dictDelete(db->expires,key);
4060 return dictDelete(db->dict,key) == DICT_OK;
4061 }
4062
4063 static int deleteIfVolatile(redisDb *db, robj *key) {
4064 dictEntry *de;
4065
4066 /* No expire? return ASAP */
4067 if (dictSize(db->expires) == 0 ||
4068 (de = dictFind(db->expires,key)) == NULL) return 0;
4069
4070 /* Delete the key */
4071 server.dirty++;
4072 dictDelete(db->expires,key);
4073 return dictDelete(db->dict,key) == DICT_OK;
4074 }
4075
4076 static void expireCommand(redisClient *c) {
4077 dictEntry *de;
4078 int seconds = atoi(c->argv[2]->ptr);
4079
4080 de = dictFind(c->db->dict,c->argv[1]);
4081 if (de == NULL) {
4082 addReply(c,shared.czero);
4083 return;
4084 }
4085 if (seconds <= 0) {
4086 addReply(c, shared.czero);
4087 return;
4088 } else {
4089 time_t when = time(NULL)+seconds;
4090 if (setExpire(c->db,c->argv[1],when)) {
4091 addReply(c,shared.cone);
4092 server.dirty++;
4093 } else {
4094 addReply(c,shared.czero);
4095 }
4096 return;
4097 }
4098 }
4099
4100 static void ttlCommand(redisClient *c) {
4101 time_t expire;
4102 int ttl = -1;
4103
4104 expire = getExpire(c->db,c->argv[1]);
4105 if (expire != -1) {
4106 ttl = (int) (expire-time(NULL));
4107 if (ttl < 0) ttl = -1;
4108 }
4109 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4110 }
4111
4112 static void msetGenericCommand(redisClient *c, int nx) {
4113 int j;
4114
4115 if ((c->argc % 2) == 0) {
4116 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4117 return;
4118 }
4119 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4120 * set nothing at all if at least one already key exists. */
4121 if (nx) {
4122 for (j = 1; j < c->argc; j += 2) {
4123 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4124 addReply(c, shared.czero);
4125 return;
4126 }
4127 }
4128 }
4129
4130 for (j = 1; j < c->argc; j += 2) {
4131 int retval;
4132
4133 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4134 if (retval == DICT_ERR) {
4135 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4136 incrRefCount(c->argv[j+1]);
4137 } else {
4138 incrRefCount(c->argv[j]);
4139 incrRefCount(c->argv[j+1]);
4140 }
4141 removeExpire(c->db,c->argv[j]);
4142 }
4143 server.dirty += (c->argc-1)/2;
4144 addReply(c, nx ? shared.cone : shared.ok);
4145 }
4146
4147 static void msetCommand(redisClient *c) {
4148 msetGenericCommand(c,0);
4149 }
4150
4151 static void msetnxCommand(redisClient *c) {
4152 msetGenericCommand(c,1);
4153 }
4154
4155 /* =============================== Replication ============================= */
4156
4157 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4158 ssize_t nwritten, ret = size;
4159 time_t start = time(NULL);
4160
4161 timeout++;
4162 while(size) {
4163 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4164 nwritten = write(fd,ptr,size);
4165 if (nwritten == -1) return -1;
4166 ptr += nwritten;
4167 size -= nwritten;
4168 }
4169 if ((time(NULL)-start) > timeout) {
4170 errno = ETIMEDOUT;
4171 return -1;
4172 }
4173 }
4174 return ret;
4175 }
4176
4177 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4178 ssize_t nread, totread = 0;
4179 time_t start = time(NULL);
4180
4181 timeout++;
4182 while(size) {
4183 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4184 nread = read(fd,ptr,size);
4185 if (nread == -1) return -1;
4186 ptr += nread;
4187 size -= nread;
4188 totread += nread;
4189 }
4190 if ((time(NULL)-start) > timeout) {
4191 errno = ETIMEDOUT;
4192 return -1;
4193 }
4194 }
4195 return totread;
4196 }
4197
/* Read one '\n'-terminated line from 'fd' into 'ptr', one byte at a time
 * via syncRead(). At most size-1 characters are stored and the buffer is
 * always NUL terminated. The '\n' is not stored, and a '\r' right before it
 * is replaced by the terminator.
 *
 * Returns the number of characters stored before the '\n' (a stripped '\r'
 * is still counted). If the buffer fills up before a '\n' arrives, the
 * truncated line is returned and the rest stays unread on the socket.
 * Returns -1 on read error or timeout, or if 'size' leaves no room even
 * for the terminator. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return -1;
    *ptr = '\0';
    size--; /* reserve room for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Without this the loop never stops on a long line and writes past
         * the end of the caller's buffer. */
        size--;
    }
    return nread;
}
4218
4219 static void syncCommand(redisClient *c) {
4220 /* ignore SYNC if aleady slave or in monitor mode */
4221 if (c->flags & REDIS_SLAVE) return;
4222
4223 /* SYNC can't be issued when the server has pending data to send to
4224 * the client about already issued commands. We need a fresh reply
4225 * buffer registering the differences between the BGSAVE and the current
4226 * dataset, so that we can copy to other slaves if needed. */
4227 if (listLength(c->reply) != 0) {
4228 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4229 return;
4230 }
4231
4232 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4233 /* Here we need to check if there is a background saving operation
4234 * in progress, or if it is required to start one */
4235 if (server.bgsaveinprogress) {
4236 /* Ok a background save is in progress. Let's check if it is a good
4237 * one for replication, i.e. if there is another slave that is
4238 * registering differences since the server forked to save */
4239 redisClient *slave;
4240 listNode *ln;
4241
4242 listRewind(server.slaves);
4243 while((ln = listYield(server.slaves))) {
4244 slave = ln->value;
4245 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4246 }
4247 if (ln) {
4248 /* Perfect, the server is already registering differences for
4249 * another slave. Set the right state, and copy the buffer. */
4250 listRelease(c->reply);
4251 c->reply = listDup(slave->reply);
4252 if (!c->reply) oom("listDup copying slave reply list");
4253 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4254 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4255 } else {
4256 /* No way, we need to wait for the next BGSAVE in order to
4257 * register differences */
4258 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4259 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4260 }
4261 } else {
4262 /* Ok we don't have a BGSAVE in progress, let's start one */
4263 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4264 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4265 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4266 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4267 return;
4268 }
4269 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4270 }
4271 c->repldbfd = -1;
4272 c->flags |= REDIS_SLAVE;
4273 c->slaveseldb = 0;
4274 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
4275 return;
4276 }
4277
4278 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4279 redisClient *slave = privdata;
4280 REDIS_NOTUSED(el);
4281 REDIS_NOTUSED(mask);
4282 char buf[REDIS_IOBUF_LEN];
4283 ssize_t nwritten, buflen;
4284
4285 if (slave->repldboff == 0) {
4286 /* Write the bulk write count before to transfer the DB. In theory here
4287 * we don't know how much room there is in the output buffer of the
4288 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4289 * operations) will never be smaller than the few bytes we need. */
4290 sds bulkcount;
4291
4292 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4293 slave->repldbsize);
4294 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4295 {
4296 sdsfree(bulkcount);
4297 freeClient(slave);
4298 return;
4299 }
4300 sdsfree(bulkcount);
4301 }
4302 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4303 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4304 if (buflen <= 0) {
4305 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4306 (buflen == 0) ? "premature EOF" : strerror(errno));
4307 freeClient(slave);
4308 return;
4309 }
4310 if ((nwritten = write(fd,buf,buflen)) == -1) {
4311 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4312 strerror(errno));
4313 freeClient(slave);
4314 return;
4315 }
4316 slave->repldboff += nwritten;
4317 if (slave->repldboff == slave->repldbsize) {
4318 close(slave->repldbfd);
4319 slave->repldbfd = -1;
4320 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4321 slave->replstate = REDIS_REPL_ONLINE;
4322 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4323 sendReplyToClient, slave, NULL) == AE_ERR) {
4324 freeClient(slave);
4325 return;
4326 }
4327 addReplySds(slave,sdsempty());
4328 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4329 }
4330 }
4331
4332 /* This function is called at the end of every backgrond saving.
4333 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4334 * otherwise REDIS_ERR is passed to the function.
4335 *
4336 * The goal of this function is to handle slaves waiting for a successful
4337 * background saving in order to perform non-blocking synchronization. */
4338 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4339 listNode *ln;
4340 int startbgsave = 0;
4341
4342 listRewind(server.slaves);
4343 while((ln = listYield(server.slaves))) {
4344 redisClient *slave = ln->value;
4345
4346 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4347 startbgsave = 1;
4348 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4349 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4350 struct redis_stat buf;
4351
4352 if (bgsaveerr != REDIS_OK) {
4353 freeClient(slave);
4354 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4355 continue;
4356 }
4357 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4358 redis_fstat(slave->repldbfd,&buf) == -1) {
4359 freeClient(slave);
4360 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4361 continue;
4362 }
4363 slave->repldboff = 0;
4364 slave->repldbsize = buf.st_size;
4365 slave->replstate = REDIS_REPL_SEND_BULK;
4366 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4367 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4368 freeClient(slave);
4369 continue;
4370 }
4371 }
4372 }
4373 if (startbgsave) {
4374 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4375 listRewind(server.slaves);
4376 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4377 while((ln = listYield(server.slaves))) {
4378 redisClient *slave = ln->value;
4379
4380 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4381 freeClient(slave);
4382 }
4383 }
4384 }
4385 }
4386
4387 static int syncWithMaster(void) {
4388 char buf[1024], tmpfile[256];
4389 int dumpsize;
4390 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4391 int dfd;
4392
4393 if (fd == -1) {
4394 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4395 strerror(errno));
4396 return REDIS_ERR;
4397 }
4398 /* Issue the SYNC command */
4399 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4400 close(fd);
4401 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4402 strerror(errno));
4403 return REDIS_ERR;
4404 }
4405 /* Read the bulk write count */
4406 if (syncReadLine(fd,buf,1024,3600) == -1) {
4407 close(fd);
4408 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4409 strerror(errno));
4410 return REDIS_ERR;
4411 }
4412 dumpsize = atoi(buf+1);
4413 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4414 /* Read the bulk write data on a temp file */
4415 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4416 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4417 if (dfd == -1) {
4418 close(fd);
4419 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4420 return REDIS_ERR;
4421 }
4422 while(dumpsize) {
4423 int nread, nwritten;
4424
4425 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4426 if (nread == -1) {
4427 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4428 strerror(errno));
4429 close(fd);
4430 close(dfd);
4431 return REDIS_ERR;
4432 }
4433 nwritten = write(dfd,buf,nread);
4434 if (nwritten == -1) {
4435 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4436 close(fd);
4437 close(dfd);
4438 return REDIS_ERR;
4439 }
4440 dumpsize -= nread;
4441 }
4442 close(dfd);
4443 if (rename(tmpfile,server.dbfilename) == -1) {
4444 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4445 unlink(tmpfile);
4446 close(fd);
4447 return REDIS_ERR;
4448 }
4449 emptyDb();
4450 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4451 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4452 close(fd);
4453 return REDIS_ERR;
4454 }
4455 server.master = createClient(fd);
4456 server.master->flags |= REDIS_MASTER;
4457 server.replstate = REDIS_REPL_CONNECTED;
4458 return REDIS_OK;
4459 }
4460
4461 static void slaveofCommand(redisClient *c) {
4462 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4463 !strcasecmp(c->argv[2]->ptr,"one")) {
4464 if (server.masterhost) {
4465 sdsfree(server.masterhost);
4466 server.masterhost = NULL;
4467 if (server.master) freeClient(server.master);
4468 server.replstate = REDIS_REPL_NONE;
4469 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4470 }
4471 } else {
4472 sdsfree(server.masterhost);
4473 server.masterhost = sdsdup(c->argv[1]->ptr);
4474 server.masterport = atoi(c->argv[2]->ptr);
4475 if (server.master) freeClient(server.master);
4476 server.replstate = REDIS_REPL_CONNECT;
4477 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4478 server.masterhost, server.masterport);
4479 }
4480 addReply(c,shared.ok);
4481 }
4482
4483 /* ============================ Maxmemory directive ======================== */
4484
4485 /* This function gets called when 'maxmemory' is set on the config file to limit
4486 * the max memory used by the server, and we are out of memory.
4487 * This function will try to, in order:
4488 *
4489 * - Free objects from the free list
4490 * - Try to remove keys with an EXPIRE set
4491 *
4492 * It is not possible to free enough memory to reach used-memory < maxmemory
4493 * the server will start refusing commands that will enlarge even more the
4494 * memory usage.
4495 */
4496 static void freeMemoryIfNeeded(void) {
4497 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4498 if (listLength(server.objfreelist)) {
4499 robj *o;
4500
4501 listNode *head = listFirst(server.objfreelist);
4502 o = listNodeValue(head);
4503 listDelNode(server.objfreelist,head);
4504 zfree(o);
4505 } else {
4506 int j, k, freed = 0;
4507
4508 for (j = 0; j < server.dbnum; j++) {
4509 int minttl = -1;
4510 robj *minkey = NULL;
4511 struct dictEntry *de;
4512
4513 if (dictSize(server.db[j].expires)) {
4514 freed = 1;
4515 /* From a sample of three keys drop the one nearest to
4516 * the natural expire */
4517 for (k = 0; k < 3; k++) {
4518 time_t t;
4519
4520 de = dictGetRandomKey(server.db[j].expires);
4521 t = (time_t) dictGetEntryVal(de);
4522 if (minttl == -1 || t < minttl) {
4523 minkey = dictGetEntryKey(de);
4524 minttl = t;
4525 }
4526 }
4527 deleteKey(server.db+j,minkey);
4528 }
4529 }
4530 if (!freed) return; /* nothing to free... */
4531 }
4532 }
4533 }
4534
4535 /* ================================= Debugging ============================== */
4536
4537 static void debugCommand(redisClient *c) {
4538 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4539 *((char*)-1) = 'x';
4540 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4541 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4542 robj *key, *val;
4543
4544 if (!de) {
4545 addReply(c,shared.nokeyerr);
4546 return;
4547 }
4548 key = dictGetEntryKey(de);
4549 val = dictGetEntryVal(de);
4550 addReplySds(c,sdscatprintf(sdsempty(),
4551 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4552 key, key->refcount, val, val->refcount, val->encoding));
4553 } else {
4554 addReplySds(c,sdsnew(
4555 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4556 }
4557 }
4558
4559 #ifdef HAVE_BACKTRACE
4560 static struct redisFunctionSym symsTable[] = {
4561 {"compareStringObjects", (unsigned long)compareStringObjects},
4562 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4563 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4564 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4565 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4566 {"freeStringObject", (unsigned long)freeStringObject},
4567 {"freeListObject", (unsigned long)freeListObject},
4568 {"freeSetObject", (unsigned long)freeSetObject},
4569 {"decrRefCount", (unsigned long)decrRefCount},
4570 {"createObject", (unsigned long)createObject},
4571 {"freeClient", (unsigned long)freeClient},
4572 {"rdbLoad", (unsigned long)rdbLoad},
4573 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4574 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4575 {"addReply", (unsigned long)addReply},
4576 {"addReplySds", (unsigned long)addReplySds},
4577 {"incrRefCount", (unsigned long)incrRefCount},
4578 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4579 {"createStringObject", (unsigned long)createStringObject},
4580 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4581 {"syncWithMaster", (unsigned long)syncWithMaster},
4582 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4583 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4584 {"getDecodedObject", (unsigned long)getDecodedObject},
4585 {"removeExpire", (unsigned long)removeExpire},
4586 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4587 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4588 {"deleteKey", (unsigned long)deleteKey},
4589 {"getExpire", (unsigned long)getExpire},
4590 {"setExpire", (unsigned long)setExpire},
4591 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4592 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4593 {"authCommand", (unsigned long)authCommand},
4594 {"pingCommand", (unsigned long)pingCommand},
4595 {"echoCommand", (unsigned long)echoCommand},
4596 {"setCommand", (unsigned long)setCommand},
4597 {"setnxCommand", (unsigned long)setnxCommand},
4598 {"getCommand", (unsigned long)getCommand},
4599 {"delCommand", (unsigned long)delCommand},
4600 {"existsCommand", (unsigned long)existsCommand},
4601 {"incrCommand", (unsigned long)incrCommand},
4602 {"decrCommand", (unsigned long)decrCommand},
4603 {"incrbyCommand", (unsigned long)incrbyCommand},
4604 {"decrbyCommand", (unsigned long)decrbyCommand},
4605 {"selectCommand", (unsigned long)selectCommand},
4606 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4607 {"keysCommand", (unsigned long)keysCommand},
4608 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4609 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4610 {"saveCommand", (unsigned long)saveCommand},
4611 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4612 {"shutdownCommand", (unsigned long)shutdownCommand},
4613 {"moveCommand", (unsigned long)moveCommand},
4614 {"renameCommand", (unsigned long)renameCommand},
4615 {"renamenxCommand", (unsigned long)renamenxCommand},
4616 {"lpushCommand", (unsigned long)lpushCommand},
4617 {"rpushCommand", (unsigned long)rpushCommand},
4618 {"lpopCommand", (unsigned long)lpopCommand},
4619 {"rpopCommand", (unsigned long)rpopCommand},
4620 {"llenCommand", (unsigned long)llenCommand},
4621 {"lindexCommand", (unsigned long)lindexCommand},
4622 {"lrangeCommand", (unsigned long)lrangeCommand},
4623 {"ltrimCommand", (unsigned long)ltrimCommand},
4624 {"typeCommand", (unsigned long)typeCommand},
4625 {"lsetCommand", (unsigned long)lsetCommand},
4626 {"saddCommand", (unsigned long)saddCommand},
4627 {"sremCommand", (unsigned long)sremCommand},
4628 {"smoveCommand", (unsigned long)smoveCommand},
4629 {"sismemberCommand", (unsigned long)sismemberCommand},
4630 {"scardCommand", (unsigned long)scardCommand},
4631 {"spopCommand", (unsigned long)spopCommand},
4632 {"sinterCommand", (unsigned long)sinterCommand},
4633 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4634 {"sunionCommand", (unsigned long)sunionCommand},
4635 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4636 {"sdiffCommand", (unsigned long)sdiffCommand},
4637 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4638 {"syncCommand", (unsigned long)syncCommand},
4639 {"flushdbCommand", (unsigned long)flushdbCommand},
4640 {"flushallCommand", (unsigned long)flushallCommand},
4641 {"sortCommand", (unsigned long)sortCommand},
4642 {"lremCommand", (unsigned long)lremCommand},
4643 {"infoCommand", (unsigned long)infoCommand},
4644 {"mgetCommand", (unsigned long)mgetCommand},
4645 {"monitorCommand", (unsigned long)monitorCommand},
4646 {"expireCommand", (unsigned long)expireCommand},
4647 {"getsetCommand", (unsigned long)getsetCommand},
4648 {"ttlCommand", (unsigned long)ttlCommand},
4649 {"slaveofCommand", (unsigned long)slaveofCommand},
4650 {"debugCommand", (unsigned long)debugCommand},
4651 {"processCommand", (unsigned long)processCommand},
4652 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4653 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4654 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4655 {"msetGenericCommand", (unsigned long)msetGenericCommand},
4656 {"msetCommand", (unsigned long)msetCommand},
4657 {"msetnxCommand", (unsigned long)msetnxCommand},
4658 {NULL,0}
4659 };
4660
4661 /* This function try to convert a pointer into a function name. It's used in
4662 * oreder to provide a backtrace under segmentation fault that's able to
4663 * display functions declared as static (otherwise the backtrace is useless). */
4664 static char *findFuncName(void *pointer, unsigned long *offset){
4665 int i, ret = -1;
4666 unsigned long off, minoff = 0;
4667
4668 /* Try to match against the Symbol with the smallest offset */
4669 for (i=0; symsTable[i].pointer; i++) {
4670 unsigned long lp = (unsigned long) pointer;
4671
4672 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4673 off=lp-symsTable[i].pointer;
4674 if (ret < 0 || off < minoff) {
4675 minoff=off;
4676 ret=i;
4677 }
4678 }
4679 }
4680 if (ret == -1) return NULL;
4681 *offset = minoff;
4682 return symsTable[ret].name;
4683 }
4684
/* Return the address of the instruction that was executing when the signal
 * described by 'uc' was delivered, or NULL on platforms not handled here.
 * On NULL, segvHandler() keeps the frame address reported by backtrace(). */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #ifdef _STRUCT_X86_THREAD_STATE64
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif (defined(__x86_64__) || defined(__X86_64__)) && defined(REG_RIP)
    /* Linux x86-64. Compilers predefine __x86_64__ (lowercase), so the old
     * test for __X86_64__ alone never matched. x86-64 has no REG_EIP
     * either: the program counter is REG_RIP. glibc only defines REG_RIP
     * when GNU extensions are enabled, so without it we fall through to
     * NULL instead of failing to compile. */
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
4706
4707 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4708 void *trace[100];
4709 char **messages = NULL;
4710 int i, trace_size = 0;
4711 unsigned long offset=0;
4712 time_t uptime = time(NULL)-server.stat_starttime;
4713 ucontext_t *uc = (ucontext_t*) secret;
4714 REDIS_NOTUSED(info);
4715
4716 redisLog(REDIS_WARNING,
4717 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4718 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4719 "redis_version:%s; "
4720 "uptime_in_seconds:%d; "
4721 "connected_clients:%d; "
4722 "connected_slaves:%d; "
4723 "used_memory:%zu; "
4724 "changes_since_last_save:%lld; "
4725 "bgsave_in_progress:%d; "
4726 "last_save_time:%d; "
4727 "total_connections_received:%lld; "
4728 "total_commands_processed:%lld; "
4729 "role:%s;"
4730 ,REDIS_VERSION,
4731 uptime,
4732 listLength(server.clients)-listLength(server.slaves),
4733 listLength(server.slaves),
4734 server.usedmemory,
4735 server.dirty,
4736 server.bgsaveinprogress,
4737 server.lastsave,
4738 server.stat_numconnections,
4739 server.stat_numcommands,
4740 server.masterhost == NULL ? "master" : "slave"
4741 ));
4742
4743 trace_size = backtrace(trace, 100);
4744 /* overwrite sigaction with caller's address */
4745 if (getMcontextEip(uc) != NULL) {
4746 trace[1] = getMcontextEip(uc);
4747 }
4748 messages = backtrace_symbols(trace, trace_size);
4749
4750 for (i=1; i<trace_size; ++i) {
4751 char *fn = findFuncName(trace[i], &offset), *p;
4752
4753 p = strchr(messages[i],'+');
4754 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4755 redisLog(REDIS_WARNING,"%s", messages[i]);
4756 } else {
4757 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4758 }
4759 }
4760 free(messages);
4761 exit(0);
4762 }
4763
4764 static void setupSigSegvAction(void) {
4765 struct sigaction act;
4766
4767 sigemptyset (&act.sa_mask);
4768 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4769 * is used. Otherwise, sa_handler is used */
4770 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4771 act.sa_sigaction = segvHandler;
4772 sigaction (SIGSEGV, &act, NULL);
4773 sigaction (SIGBUS, &act, NULL);
4774 sigaction (SIGFPE, &act, NULL);
4775 sigaction (SIGILL, &act, NULL);
4776 sigaction (SIGBUS, &act, NULL);
4777 return;
4778 }
4779 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without HAVE_BACKTRACE: no crash-report handler is installed,
     * and signal dispositions are left untouched here. */
}
4782 #endif /* HAVE_BACKTRACE */
4783
4784 /* =================================== Main! ================================ */
4785
4786 #ifdef __linux__
/* Return the value of /proc/sys/vm/overcommit_memory as parsed by atoi(), or
 * -1 if the file cannot be opened or its first line cannot be read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    got = fgets(line,sizeof(line),fp);
    fclose(fp);
    return (got != NULL) ? atoi(line) : -1;
}
4800
4801 void linuxOvercommitMemoryWarning(void) {
4802 if (linuxOvercommitMemoryValue() == 0) {
4803 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4804 }
4805 }
4806 #endif /* __linux__ */
4807
4808 static void daemonize(void) {
4809 int fd;
4810 FILE *fp;
4811
4812 if (fork() != 0) exit(0); /* parent exits */
4813 setsid(); /* create a new session */
4814
4815 /* Every output goes to /dev/null. If Redis is daemonized but
4816 * the 'logfile' is set to 'stdout' in the configuration file
4817 * it will not log at all. */
4818 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4819 dup2(fd, STDIN_FILENO);
4820 dup2(fd, STDOUT_FILENO);
4821 dup2(fd, STDERR_FILENO);
4822 if (fd > STDERR_FILENO) close(fd);
4823 }
4824 /* Try to write the pid file */
4825 fp = fopen(server.pidfile,"w");
4826 if (fp) {
4827 fprintf(fp,"%d\n",getpid());
4828 fclose(fp);
4829 }
4830 }
4831
4832 int main(int argc, char **argv) {
4833 initServerConfig();
4834 if (argc == 2) {
4835 ResetServerSaveParams();
4836 loadServerConfig(argv[1]);
4837 } else if (argc > 2) {
4838 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4839 exit(1);
4840 } else {
4841 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4842 }
4843 initServer();
4844 if (server.daemonize) daemonize();
4845 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4846 #ifdef __linux__
4847 linuxOvercommitMemoryWarning();
4848 #endif
4849 if (rdbLoad(server.dbfilename) == REDIS_OK)
4850 redisLog(REDIS_NOTICE,"DB loaded from disk");
4851 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4852 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4853 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4854 aeMain(server.el);
4855 aeDeleteEventLoop(server.el);
4856 return 0;
4857 }