]> git.saurik.com Git - redis.git/blob - redis.c
74986528c3573e690dfd158a852d506794ea5249
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.001"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
71 /* Error codes */
72 #define REDIS_OK 0
73 #define REDIS_ERR -1
74
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout, in seconds */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16 /* default number of databases (server.dbnum) */
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* max random expire lookups per DB on each serverCron() call */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
88
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10%: sparser tables are resized by tryResizeHashTables() */
91
92 /* Command flags */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
96 this flag will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short these commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
100
101 /* Object types */
102 #define REDIS_STRING 0
103 #define REDIS_LIST 1
104 #define REDIS_SET 2
105 #define REDIS_HASH 3
106
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
110
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
115
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpret the length:
119 *
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bits
124 * number specify the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
126 *
127 * Lengths up to 63 are stored using a single byte, most DB keys, and many
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
134
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining six bits specify a special encoding for the object
137 * accordingly to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
142
143 /* Client flags (bit mask stored in redisClient.flags) */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
148
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
153
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits for bgsave to end, then starts the bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
162
163 /* List related stuff */
164 #define REDIS_HEAD 0
165 #define REDIS_TAIL 1
166
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
175
176 /* Log levels. redisLog() drops messages whose level is below
 * server.verbosity and uses the level as an index into the ".-*" marker
 * string, so these values must stay 0, 1, 2. */
177 #define REDIS_DEBUG 0 /* logged with the '.' marker */
178 #define REDIS_NOTICE 1 /* logged with the '-' marker */
179 #define REDIS_WARNING 2 /* logged with the '*' marker */
180
181 /* Anti-warning macro: evaluates V as a void expression so that an unused
 * parameter does not trigger a compiler warning. */
182 #define REDIS_NOTUSED(V) ((void) V)
183
184
185 /*================================= Data types ============================== */
186
187 /* A redis object, that is a type able to hold a string / list / set.
 * Objects are reference counted through the 'refcount' field. */
188 typedef struct redisObject {
189 void *ptr; /* payload; read as an sds string for raw string objects */
190 unsigned char type; /* object type, e.g. REDIS_STRING as passed to createObject() */
191 unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
192 unsigned char notused[2]; /* presumably padding - TODO confirm */
193 int refcount; /* reference count */
194 } robj;
195
/* One logical database. serverCron() samples 'expires' and deletes from the
 * database the keys whose stored time is in the past. */
196 typedef struct redisDb {
197 dict *dict; /* presumably the main key -> value table - TODO confirm */
198 dict *expires; /* keys with a timeout; the entry value is read as a time_t */
199 int id; /* presumably the database index - TODO confirm */
200 } redisDb;
201
202 /* With multiplexing we need to keep per-client state.
203 * Clients are kept in a linked list. */
204 typedef struct redisClient {
205 int fd;
206 redisDb *db;
207 int dictid;
208 sds querybuf;
209 robj **argv, **mbargv;
210 int argc, mbargc;
211 int bulklen; /* bulk read len. -1 if not in bulk read mode */
212 int multibulk; /* multi bulk command format active */
213 list *reply;
214 int sentlen;
215 time_t lastinteraction; /* time of the last interaction, used for timeout */
216 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
217 int slaveseldb; /* slave selected db, if this client is a slave */
218 int authenticated; /* when requirepass is non-NULL */
219 int replstate; /* replication state if this is a slave */
220 int repldbfd; /* replication DB file descriptor */
221 long repldboff; /* replication DB file offset */
222 off_t repldbsize; /* replication DB file size */
223 } redisClient;
224
/* One automatic-save rule. serverCron() starts a background save when
 * server.dirty >= changes and more than 'seconds' seconds have passed since
 * server.lastsave. */
225 struct saveparam {
226 time_t seconds; /* minimum time elapsed since the last save */
227 int changes; /* minimum number of changes since the last save */
228 };
229
230 /* Global server state structure */
231 struct redisServer {
232 int port; /* TCP port, defaults to REDIS_SERVERPORT */
233 int fd;
234 redisDb *db; /* array of 'dbnum' databases */
235 dict *sharingpool;
236 unsigned int sharingpoolsize;
237 long long dirty; /* changes to DB from the last save */
238 list *clients;
239 list *slaves, *monitors;
240 char neterr[ANET_ERR_LEN];
241 aeEventLoop *el;
242 int cronloops; /* number of times the cron function run */
243 list *objfreelist; /* A list of freed objects to avoid malloc() */
244 time_t lastsave; /* Unix time of last successful save */
245 size_t usedmemory; /* Used memory in bytes, refreshed by serverCron() */
246 /* Fields used only for stats */
247 time_t stat_starttime; /* server start time */
248 long long stat_numcommands; /* number of processed commands */
249 long long stat_numconnections; /* number of connections received */
250 /* Configuration */
251 int verbosity; /* minimum log level that redisLog() emits */
252 int glueoutputbuf;
253 int maxidletime; /* client idle timeout in seconds, 0 disables the check */
254 int dbnum; /* number of databases in 'db' */
255 int daemonize;
256 char *pidfile;
257 int bgsaveinprogress; /* non-zero while serverCron() must wait for a bgsave child */
258 pid_t bgsavechildpid; /* reset to -1 once the bgsave child has been reaped */
259 struct saveparam *saveparams; /* array of automatic-save rules */
260 int saveparamslen; /* number of entries in 'saveparams' */
261 char *logfile; /* NULL = log on standard output */
262 char *bindaddr;
263 char *dbfilename;
264 char *requirepass;
265 int shareobjects;
266 /* Replication related */
267 int isslave;
268 char *masterhost;
269 int masterport;
270 redisClient *master; /* client that is master for this slave */
271 int replstate; /* slave side state; REDIS_REPL_CONNECT makes serverCron() sync with the master */
272 unsigned int maxclients;
273 unsigned long maxmemory;
274 /* Sort parameters - qsort_r() is only available under BSD so we
275 * have to take this state global, in order to pass it to sortCompare() */
276 int sort_desc;
277 int sort_alpha;
278 int sort_bypattern;
279 };
280
/* A command handler receives the client that issued the command. */
281 typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (see cmdTable). */
282 struct redisCommand {
283 char *name; /* command name, lower case in cmdTable */
284 redisCommandProc *proc; /* handler function */
285 int arity; /* argument count; cmdTable also uses negative values - presumably "at least N", TODO confirm in the command dispatcher */
286 int flags; /* REDIS_CMD_* bit mask */
287 };
288
/* Pairs a name with an address stored as an integer.
 * NOTE(review): presumably used to resolve function names when printing a
 * backtrace (see HAVE_BACKTRACE) - confirm against the users of this type. */
289 struct redisFunctionSym {
290 char *name;
291 unsigned long pointer;
292 };
293
/* An element being sorted together with its sort key: either a numeric
 * score or another object to compare against.
 * NOTE(review): which union member is active presumably depends on the
 * server.sort_alpha / sort_bypattern settings - confirm in sortCommand(). */
294 typedef struct _redisSortObject {
295 robj *obj;
296 union {
297 double score;
298 robj *cmpobj;
299 } u;
300 } redisSortObject;
301
/* One operation attached to a sort, with the pattern it applies to. */
302 typedef struct _redisSortOperation {
303 int type; /* presumably one of the REDIS_SORT_* operation codes - TODO confirm */
304 robj *pattern;
305 } redisSortOperation;
306
/* Pre-built string objects (protocol replies, error messages and
 * "select N" commands) allocated once by createSharedObjects() and kept in
 * the global 'shared' instance. */
307 struct sharedObjectsStruct {
308 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
309 *colon, *nullbulk, *nullmultibulk,
310 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
311 *outofrangeerr, *plus,
312 *select0, *select1, *select2, *select3, *select4,
313 *select5, *select6, *select7, *select8, *select9;
314 } shared;
315
316 /*================================ Prototypes =============================== */
317
/* Forward declarations of file-local helpers that are used before their
 * definition. */
318 static void freeStringObject(robj *o);
319 static void freeListObject(robj *o);
320 static void freeSetObject(robj *o);
321 static void decrRefCount(void *o);
322 static robj *createObject(int type, void *ptr);
323 static void freeClient(redisClient *c);
324 static int rdbLoad(char *filename);
325 static void addReply(redisClient *c, robj *obj);
326 static void addReplySds(redisClient *c, sds s);
327 static void incrRefCount(robj *o);
328 static int rdbSaveBackground(char *filename);
329 static robj *createStringObject(char *ptr, size_t len);
330 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
331 static int syncWithMaster(void);
332 static robj *tryObjectSharing(robj *o);
333 static int tryObjectEncoding(robj *o);
334 static robj *getDecodedObject(const robj *o);
335 static int removeExpire(redisDb *db, robj *key);
336 static int expireIfNeeded(redisDb *db, robj *key);
337 static int deleteIfVolatile(redisDb *db, robj *key);
338 static int deleteKey(redisDb *db, robj *key);
339 static time_t getExpire(redisDb *db, robj *key);
340 static int setExpire(redisDb *db, robj *key, time_t when);
341 static void updateSlavesWaitingBgsave(int bgsaveerr);
342 static void freeMemoryIfNeeded(void);
343 static int processCommand(redisClient *c);
344 static void setupSigSegvAction(void);
345 static void rdbRemoveTempFile(pid_t childpid);
346 static size_t stringObjectLen(robj *o);
347 static void processInputBuffer(redisClient *c);
348
/* Command handlers. They all share the redisCommandProc signature and are
 * referenced from cmdTable. */
349 static void authCommand(redisClient *c);
350 static void pingCommand(redisClient *c);
351 static void echoCommand(redisClient *c);
352 static void setCommand(redisClient *c);
353 static void setnxCommand(redisClient *c);
354 static void getCommand(redisClient *c);
355 static void delCommand(redisClient *c);
356 static void existsCommand(redisClient *c);
357 static void incrCommand(redisClient *c);
358 static void decrCommand(redisClient *c);
359 static void incrbyCommand(redisClient *c);
360 static void decrbyCommand(redisClient *c);
361 static void selectCommand(redisClient *c);
362 static void randomkeyCommand(redisClient *c);
363 static void keysCommand(redisClient *c);
364 static void dbsizeCommand(redisClient *c);
365 static void lastsaveCommand(redisClient *c);
366 static void saveCommand(redisClient *c);
367 static void bgsaveCommand(redisClient *c);
368 static void shutdownCommand(redisClient *c);
369 static void moveCommand(redisClient *c);
370 static void renameCommand(redisClient *c);
371 static void renamenxCommand(redisClient *c);
372 static void lpushCommand(redisClient *c);
373 static void rpushCommand(redisClient *c);
374 static void lpopCommand(redisClient *c);
375 static void rpopCommand(redisClient *c);
376 static void llenCommand(redisClient *c);
377 static void lindexCommand(redisClient *c);
378 static void lrangeCommand(redisClient *c);
379 static void ltrimCommand(redisClient *c);
380 static void typeCommand(redisClient *c);
381 static void lsetCommand(redisClient *c);
382 static void saddCommand(redisClient *c);
383 static void sremCommand(redisClient *c);
384 static void smoveCommand(redisClient *c);
385 static void sismemberCommand(redisClient *c);
386 static void scardCommand(redisClient *c);
387 static void spopCommand(redisClient *c);
388 static void sinterCommand(redisClient *c);
389 static void sinterstoreCommand(redisClient *c);
390 static void sunionCommand(redisClient *c);
391 static void sunionstoreCommand(redisClient *c);
392 static void sdiffCommand(redisClient *c);
393 static void sdiffstoreCommand(redisClient *c);
394 static void syncCommand(redisClient *c);
395 static void flushdbCommand(redisClient *c);
396 static void flushallCommand(redisClient *c);
397 static void sortCommand(redisClient *c);
398 static void lremCommand(redisClient *c);
399 static void infoCommand(redisClient *c);
400 static void mgetCommand(redisClient *c);
401 static void monitorCommand(redisClient *c);
402 static void expireCommand(redisClient *c);
403 static void getsetCommand(redisClient *c);
404 static void ttlCommand(redisClient *c);
405 static void slaveofCommand(redisClient *c);
406 static void debugCommand(redisClient *c);
407 static void msetCommand(redisClient *c);
408 static void msetnxCommand(redisClient *c);
409
410 /*================================= Globals ================================= */
411
412 /* Global vars */
413 static struct redisServer server; /* server global state */
/* Command table. Each entry is {name, proc, arity, flags} as declared in
 * struct redisCommand; flags combine REDIS_CMD_BULK or REDIS_CMD_INLINE with
 * the optional REDIS_CMD_DENYOOM. The all-NULL entry marks the end of the
 * table. Note that "smembers" is served by sinterCommand.
 * NOTE(review): negative arity values presumably mean "at least that many
 * arguments" - confirm in the command lookup/dispatch code. */
414 static struct redisCommand cmdTable[] = {
415 {"get",getCommand,2,REDIS_CMD_INLINE},
416 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
417 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
418 {"del",delCommand,-2,REDIS_CMD_INLINE},
419 {"exists",existsCommand,2,REDIS_CMD_INLINE},
420 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
421 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
422 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
423 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
424 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
425 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
426 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
427 {"llen",llenCommand,2,REDIS_CMD_INLINE},
428 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
429 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
430 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
431 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
432 {"lrem",lremCommand,4,REDIS_CMD_BULK},
433 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
434 {"srem",sremCommand,3,REDIS_CMD_BULK},
435 {"smove",smoveCommand,4,REDIS_CMD_BULK},
436 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
437 {"scard",scardCommand,2,REDIS_CMD_INLINE},
438 {"spop",spopCommand,2,REDIS_CMD_INLINE},
439 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
440 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
441 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
442 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
443 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
444 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
445 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
446 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
447 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
448 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
449 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
450 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
451 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
452 {"select",selectCommand,2,REDIS_CMD_INLINE},
453 {"move",moveCommand,3,REDIS_CMD_INLINE},
454 {"rename",renameCommand,3,REDIS_CMD_INLINE},
455 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
456 {"expire",expireCommand,3,REDIS_CMD_INLINE},
457 {"keys",keysCommand,2,REDIS_CMD_INLINE},
458 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
459 {"auth",authCommand,2,REDIS_CMD_INLINE},
460 {"ping",pingCommand,1,REDIS_CMD_INLINE},
461 {"echo",echoCommand,2,REDIS_CMD_BULK},
462 {"save",saveCommand,1,REDIS_CMD_INLINE},
463 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
464 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
465 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
466 {"type",typeCommand,2,REDIS_CMD_INLINE},
467 {"sync",syncCommand,1,REDIS_CMD_INLINE},
468 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
469 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
470 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
471 {"info",infoCommand,1,REDIS_CMD_INLINE},
472 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
473 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
474 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
475 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
476 {NULL,NULL,0,0}
477 };
478 /*============================ Utility functions ============================ */
479
/* Case folding helper for stringmatchlen(). tolower() is only defined for
 * values representable as unsigned char (or EOF), so negative values coming
 * from a signed 'char' are returned unchanged instead of being passed on. */
static int stringmatchFoldCase(int ch) {
    return ch >= 0 ? tolower(ch) : ch;
}

/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be
 * NUL-terminated: no byte at or beyond the given lengths is ever read.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte out of the set; [^abc] negates, [a-z] is a range,
 *            a backslash inside the set escapes the next byte
 *   \x       the byte x taken literally
 *
 * When 'nocase' is non-zero, literal bytes and ranges are compared after
 * case folding (escaped bytes inside a [...] set are always compared
 * exactly).
 *
 * Returns 1 on match, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern at every remaining position. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes one byte of the string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the common
                     * pattern++ after the switch lands on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = stringmatchFoldCase(start);
                        end = stringmatchFoldCase(end);
                        c = stringmatchFoldCase(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (stringmatchFoldCase(pattern[0]) ==
                            stringmatchFoldCase(string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* A trailing backslash is matched as a literal backslash. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (stringmatchFoldCase(pattern[0]) !=
                    stringmatchFoldCase(string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is exhausted: only trailing stars may remain. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
602
603 static void redisLog(int level, const char *fmt, ...) {
604 va_list ap;
605 FILE *fp;
606
607 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
608 if (!fp) return;
609
610 va_start(ap, fmt);
611 if (level >= server.verbosity) {
612 char *c = ".-*";
613 char buf[64];
614 time_t now;
615
616 now = time(NULL);
617 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
618 fprintf(fp,"%s %c ",buf,c[level]);
619 vfprintf(fp, fmt, ap);
620 fprintf(fp,"\n");
621 fflush(fp);
622 }
623 va_end(ap);
624
625 if (server.logfile) fclose(fp);
626 }
627
628 /*====================== Hash table type implementation ==================== */
629
630 /* This is a hash table type that uses the SDS dynamic strings library as
631 * keys and redis objects as values (objects can hold SDS strings,
632 * lists, sets). */
633
634 static int sdsDictKeyCompare(void *privdata, const void *key1,
635 const void *key2)
636 {
637 int l1,l2;
638 DICT_NOTUSED(privdata);
639
640 l1 = sdslen((sds)key1);
641 l2 = sdslen((sds)key2);
642 if (l1 != l2) return 0;
643 return memcmp(key1, key2, l1) == 0;
644 }
645
/* Destructor callback for dictionary keys/values that are redis objects:
 * drops one reference from 'val'. 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
652
653 static int dictObjKeyCompare(void *privdata, const void *key1,
654 const void *key2)
655 {
656 const robj *o1 = key1, *o2 = key2;
657 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
658 }
659
660 static unsigned int dictObjHash(const void *key) {
661 const robj *o = key;
662 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
663 }
664
665 static int dictEncObjKeyCompare(void *privdata, const void *key1,
666 const void *key2)
667 {
668 const robj *o1 = key1, *o2 = key2;
669
670 if (o1->encoding == REDIS_ENCODING_RAW &&
671 o2->encoding == REDIS_ENCODING_RAW)
672 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
673 else {
674 robj *dec1, *dec2;
675 int cmp;
676
677 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
678 getDecodedObject(o1) : (robj*)o1;
679 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
680 getDecodedObject(o2) : (robj*)o2;
681 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
682 if (dec1 != o1) decrRefCount(dec1);
683 if (dec2 != o2) decrRefCount(dec2);
684 return cmp;
685 }
686 }
687
688 static unsigned int dictEncObjHash(const void *key) {
689 const robj *o = key;
690
691 if (o->encoding == REDIS_ENCODING_RAW)
692 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
693 else {
694 robj *dec = getDecodedObject(o);
695 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
696 decrRefCount(dec);
697 return hash;
698 }
699 }
700
/* Dictionary type whose keys are redis objects that may be non-raw encoded
 * (hashed and compared through the dictEncObj* callbacks). Keys are released
 * with decrRefCount(); values have no destructor. */
701 static dictType setDictType = {
702 dictEncObjHash, /* hash function */
703 NULL, /* key dup */
704 NULL, /* val dup */
705 dictEncObjKeyCompare, /* key compare */
706 dictRedisObjectDestructor, /* key destructor */
707 NULL /* val destructor */
708 };
709
/* Dictionary type whose keys and values are both redis objects. Keys are
 * hashed and compared as the sds strings they point to; both keys and values
 * are released with decrRefCount(). */
710 static dictType hashDictType = {
711 dictObjHash, /* hash function */
712 NULL, /* key dup */
713 NULL, /* val dup */
714 dictObjKeyCompare, /* key compare */
715 dictRedisObjectDestructor, /* key destructor */
716 dictRedisObjectDestructor /* val destructor */
717 };
718
719 /* ========================= Random utility functions ======================= */
720
/* Out of memory policy: Redis does not try to recover when an allocation
 * for an object or a string fails. It is not even clear that the condition
 * could be reported to the client, since the networking layer allocates its
 * send buffers on the heap as well. So the failure is reported on stderr,
 * tagged with 'msg', and the process aborts after a one second pause. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
732
733 /* ====================== Redis server networking stuff ===================== */
734 static void closeTimedoutClients(void) {
735 redisClient *c;
736 listNode *ln;
737 time_t now = time(NULL);
738
739 listRewind(server.clients);
740 while ((ln = listYield(server.clients)) != NULL) {
741 c = listNodeValue(ln);
742 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
743 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
744 (now - c->lastinteraction > server.maxidletime)) {
745 redisLog(REDIS_DEBUG,"Closing idle client");
746 freeClient(c);
747 }
748 }
749 }
750
751 static int htNeedsResize(dict *dict) {
752 long long size, used;
753
754 size = dictSlots(dict);
755 used = dictSize(dict);
756 return (size && used && size > DICT_HT_INITIAL_SIZE &&
757 (used*100/size < REDIS_HT_MINFILL));
758 }
759
760 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
761 * we resize the hash table to save memory */
762 static void tryResizeHashTables(void) {
763 int j;
764
765 for (j = 0; j < server.dbnum; j++) {
766 if (htNeedsResize(server.db[j].dict)) {
767 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
768 dictResize(server.db[j].dict);
769 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
770 }
771 if (htNeedsResize(server.db[j].expires))
772 dictResize(server.db[j].expires);
773 }
774 }
775
776 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
777 int j, loops = server.cronloops++;
778 REDIS_NOTUSED(eventLoop);
779 REDIS_NOTUSED(id);
780 REDIS_NOTUSED(clientData);
781
782 /* Update the global state with the amount of used memory */
783 server.usedmemory = zmalloc_used_memory();
784
785 /* Show some info about non-empty databases */
786 for (j = 0; j < server.dbnum; j++) {
787 long long size, used, vkeys;
788
789 size = dictSlots(server.db[j].dict);
790 used = dictSize(server.db[j].dict);
791 vkeys = dictSize(server.db[j].expires);
792 if (!(loops % 5) && (used || vkeys)) {
793 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
794 /* dictPrintStats(server.dict); */
795 }
796 }
797
798 /* We don't want to resize the hash tables while a bacground saving
799 * is in progress: the saving child is created using fork() that is
800 * implemented with a copy-on-write semantic in most modern systems, so
801 * if we resize the HT while there is the saving child at work actually
802 * a lot of memory movements in the parent will cause a lot of pages
803 * copied. */
804 if (!server.bgsaveinprogress) tryResizeHashTables();
805
806 /* Show information about connected clients */
807 if (!(loops % 5)) {
808 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
809 listLength(server.clients)-listLength(server.slaves),
810 listLength(server.slaves),
811 server.usedmemory,
812 dictSize(server.sharingpool));
813 }
814
815 /* Close connections of timedout clients */
816 if (server.maxidletime && !(loops % 10))
817 closeTimedoutClients();
818
819 /* Check if a background saving in progress terminated */
820 if (server.bgsaveinprogress) {
821 int statloc;
822 if (wait4(-1,&statloc,WNOHANG,NULL)) {
823 int exitcode = WEXITSTATUS(statloc);
824 int bysignal = WIFSIGNALED(statloc);
825
826 if (!bysignal && exitcode == 0) {
827 redisLog(REDIS_NOTICE,
828 "Background saving terminated with success");
829 server.dirty = 0;
830 server.lastsave = time(NULL);
831 } else if (!bysignal && exitcode != 0) {
832 redisLog(REDIS_WARNING, "Background saving error");
833 } else {
834 redisLog(REDIS_WARNING,
835 "Background saving terminated by signal");
836 rdbRemoveTempFile(server.bgsavechildpid);
837 }
838 server.bgsaveinprogress = 0;
839 server.bgsavechildpid = -1;
840 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
841 }
842 } else {
843 /* If there is not a background saving in progress check if
844 * we have to save now */
845 time_t now = time(NULL);
846 for (j = 0; j < server.saveparamslen; j++) {
847 struct saveparam *sp = server.saveparams+j;
848
849 if (server.dirty >= sp->changes &&
850 now-server.lastsave > sp->seconds) {
851 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
852 sp->changes, sp->seconds);
853 rdbSaveBackground(server.dbfilename);
854 break;
855 }
856 }
857 }
858
859 /* Try to expire a few timed out keys */
860 for (j = 0; j < server.dbnum; j++) {
861 redisDb *db = server.db+j;
862 int num = dictSize(db->expires);
863
864 if (num) {
865 time_t now = time(NULL);
866
867 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
868 num = REDIS_EXPIRELOOKUPS_PER_CRON;
869 while (num--) {
870 dictEntry *de;
871 time_t t;
872
873 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
874 t = (time_t) dictGetEntryVal(de);
875 if (now > t) {
876 deleteKey(db,dictGetEntryKey(de));
877 }
878 }
879 }
880 }
881
882 /* Check if we should connect to a MASTER */
883 if (server.replstate == REDIS_REPL_CONNECT) {
884 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
885 if (syncWithMaster() == REDIS_OK) {
886 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
887 }
888 }
889 return 1000;
890 }
891
892 static void createSharedObjects(void) {
893 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
894 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
895 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
896 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
897 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
898 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
899 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
900 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
901 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
902 /* no such key */
903 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
904 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
905 "-ERR Operation against a key holding the wrong kind of value\r\n"));
906 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
907 "-ERR no such key\r\n"));
908 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
909 "-ERR syntax error\r\n"));
910 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
911 "-ERR source and destination objects are the same\r\n"));
912 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
913 "-ERR index out of range\r\n"));
914 shared.space = createObject(REDIS_STRING,sdsnew(" "));
915 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
916 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
917 shared.select0 = createStringObject("select 0\r\n",10);
918 shared.select1 = createStringObject("select 1\r\n",10);
919 shared.select2 = createStringObject("select 2\r\n",10);
920 shared.select3 = createStringObject("select 3\r\n",10);
921 shared.select4 = createStringObject("select 4\r\n",10);
922 shared.select5 = createStringObject("select 5\r\n",10);
923 shared.select6 = createStringObject("select 6\r\n",10);
924 shared.select7 = createStringObject("select 7\r\n",10);
925 shared.select8 = createStringObject("select 8\r\n",10);
926 shared.select9 = createStringObject("select 9\r\n",10);
927 }
928
929 static void appendServerSaveParams(time_t seconds, int changes) {
930 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
931 if (server.saveparams == NULL) oom("appendServerSaveParams");
932 server.saveparams[server.saveparamslen].seconds = seconds;
933 server.saveparams[server.saveparamslen].changes = changes;
934 server.saveparamslen++;
935 }
936
937 static void ResetServerSaveParams() {
938 zfree(server.saveparams);
939 server.saveparams = NULL;
940 server.saveparamslen = 0;
941 }
942
943 static void initServerConfig() {
944 server.dbnum = REDIS_DEFAULT_DBNUM;
945 server.port = REDIS_SERVERPORT;
946 server.verbosity = REDIS_DEBUG;
947 server.maxidletime = REDIS_MAXIDLETIME;
948 server.saveparams = NULL;
949 server.logfile = NULL; /* NULL = log on standard output */
950 server.bindaddr = NULL;
951 server.glueoutputbuf = 1;
952 server.daemonize = 0;
953 server.pidfile = "/var/run/redis.pid";
954 server.dbfilename = "dump.rdb";
955 server.requirepass = NULL;
956 server.shareobjects = 0;
957 server.sharingpoolsize = 1024;
958 server.maxclients = 0;
959 server.maxmemory = 0;
960 ResetServerSaveParams();
961
962 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
963 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
964 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
965 /* Replication related */
966 server.isslave = 0;
967 server.masterhost = NULL;
968 server.masterport = 6379;
969 server.master = NULL;
970 server.replstate = REDIS_REPL_NONE;
971 }
972
973 static void initServer() {
974 int j;
975
976 signal(SIGHUP, SIG_IGN);
977 signal(SIGPIPE, SIG_IGN);
978 setupSigSegvAction();
979
980 server.clients = listCreate();
981 server.slaves = listCreate();
982 server.monitors = listCreate();
983 server.objfreelist = listCreate();
984 createSharedObjects();
985 server.el = aeCreateEventLoop();
986 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
987 server.sharingpool = dictCreate(&setDictType,NULL);
988 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
989 oom("server initialization"); /* Fatal OOM */
990 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
991 if (server.fd == -1) {
992 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
993 exit(1);
994 }
995 for (j = 0; j < server.dbnum; j++) {
996 server.db[j].dict = dictCreate(&hashDictType,NULL);
997 server.db[j].expires = dictCreate(&setDictType,NULL);
998 server.db[j].id = j;
999 }
1000 server.cronloops = 0;
1001 server.bgsaveinprogress = 0;
1002 server.bgsavechildpid = -1;
1003 server.lastsave = time(NULL);
1004 server.dirty = 0;
1005 server.usedmemory = 0;
1006 server.stat_numcommands = 0;
1007 server.stat_numconnections = 0;
1008 server.stat_starttime = time(NULL);
1009 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1010 }
1011
1012 /* Empty the whole database */
1013 static long long emptyDb() {
1014 int j;
1015 long long removed = 0;
1016
1017 for (j = 0; j < server.dbnum; j++) {
1018 removed += dictSize(server.db[j].dict);
1019 dictEmpty(server.db[j].dict);
1020 dictEmpty(server.db[j].expires);
1021 }
1022 return removed;
1023 }
1024
/* Convert a "yes"/"no" string (compared case insensitively) into 1/0.
 * Any other string returns -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    return (strcasecmp(s,"no") == 0) ? 0 : -1;
}
1030
1031 /* I agree, this is a very rudimental way to load a configuration...
1032 will improve later if the config gets more complex */
1033 static void loadServerConfig(char *filename) {
1034 FILE *fp;
1035 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1036 int linenum = 0;
1037 sds line = NULL;
1038
1039 if (filename[0] == '-' && filename[1] == '\0')
1040 fp = stdin;
1041 else {
1042 if ((fp = fopen(filename,"r")) == NULL) {
1043 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1044 exit(1);
1045 }
1046 }
1047
1048 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1049 sds *argv;
1050 int argc, j;
1051
1052 linenum++;
1053 line = sdsnew(buf);
1054 line = sdstrim(line," \t\r\n");
1055
1056 /* Skip comments and blank lines*/
1057 if (line[0] == '#' || line[0] == '\0') {
1058 sdsfree(line);
1059 continue;
1060 }
1061
1062 /* Split into arguments */
1063 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1064 sdstolower(argv[0]);
1065
1066 /* Execute config directives */
1067 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1068 server.maxidletime = atoi(argv[1]);
1069 if (server.maxidletime < 0) {
1070 err = "Invalid timeout value"; goto loaderr;
1071 }
1072 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1073 server.port = atoi(argv[1]);
1074 if (server.port < 1 || server.port > 65535) {
1075 err = "Invalid port"; goto loaderr;
1076 }
1077 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1078 server.bindaddr = zstrdup(argv[1]);
1079 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1080 int seconds = atoi(argv[1]);
1081 int changes = atoi(argv[2]);
1082 if (seconds < 1 || changes < 0) {
1083 err = "Invalid save parameters"; goto loaderr;
1084 }
1085 appendServerSaveParams(seconds,changes);
1086 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1087 if (chdir(argv[1]) == -1) {
1088 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1089 argv[1], strerror(errno));
1090 exit(1);
1091 }
1092 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1093 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1094 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1095 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1096 else {
1097 err = "Invalid log level. Must be one of debug, notice, warning";
1098 goto loaderr;
1099 }
1100 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1101 FILE *logfp;
1102
1103 server.logfile = zstrdup(argv[1]);
1104 if (!strcasecmp(server.logfile,"stdout")) {
1105 zfree(server.logfile);
1106 server.logfile = NULL;
1107 }
1108 if (server.logfile) {
1109 /* Test if we are able to open the file. The server will not
1110 * be able to abort just for this problem later... */
1111 logfp = fopen(server.logfile,"a");
1112 if (logfp == NULL) {
1113 err = sdscatprintf(sdsempty(),
1114 "Can't open the log file: %s", strerror(errno));
1115 goto loaderr;
1116 }
1117 fclose(logfp);
1118 }
1119 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1120 server.dbnum = atoi(argv[1]);
1121 if (server.dbnum < 1) {
1122 err = "Invalid number of databases"; goto loaderr;
1123 }
1124 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1125 server.maxclients = atoi(argv[1]);
1126 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1127 server.maxmemory = strtoll(argv[1], NULL, 10);
1128 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1129 server.masterhost = sdsnew(argv[1]);
1130 server.masterport = atoi(argv[2]);
1131 server.replstate = REDIS_REPL_CONNECT;
1132 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1133 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1134 err = "argument must be 'yes' or 'no'"; goto loaderr;
1135 }
1136 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1137 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1138 err = "argument must be 'yes' or 'no'"; goto loaderr;
1139 }
1140 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1141 server.sharingpoolsize = atoi(argv[1]);
1142 if (server.sharingpoolsize < 1) {
1143 err = "invalid object sharing pool size"; goto loaderr;
1144 }
1145 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1146 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1147 err = "argument must be 'yes' or 'no'"; goto loaderr;
1148 }
1149 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1150 server.requirepass = zstrdup(argv[1]);
1151 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1152 server.pidfile = zstrdup(argv[1]);
1153 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1154 server.dbfilename = zstrdup(argv[1]);
1155 } else {
1156 err = "Bad directive or wrong number of arguments"; goto loaderr;
1157 }
1158 for (j = 0; j < argc; j++)
1159 sdsfree(argv[j]);
1160 zfree(argv);
1161 sdsfree(line);
1162 }
1163 if (fp != stdin) fclose(fp);
1164 return;
1165
1166 loaderr:
1167 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1168 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1169 fprintf(stderr, ">>> '%s'\n", line);
1170 fprintf(stderr, "%s\n", err);
1171 exit(1);
1172 }
1173
1174 static void freeClientArgv(redisClient *c) {
1175 int j;
1176
1177 for (j = 0; j < c->argc; j++)
1178 decrRefCount(c->argv[j]);
1179 for (j = 0; j < c->mbargc; j++)
1180 decrRefCount(c->mbargv[j]);
1181 c->argc = 0;
1182 c->mbargc = 0;
1183 }
1184
1185 static void freeClient(redisClient *c) {
1186 listNode *ln;
1187
1188 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1189 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1190 sdsfree(c->querybuf);
1191 listRelease(c->reply);
1192 freeClientArgv(c);
1193 close(c->fd);
1194 ln = listSearchKey(server.clients,c);
1195 assert(ln != NULL);
1196 listDelNode(server.clients,ln);
1197 if (c->flags & REDIS_SLAVE) {
1198 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1199 close(c->repldbfd);
1200 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1201 ln = listSearchKey(l,c);
1202 assert(ln != NULL);
1203 listDelNode(l,ln);
1204 }
1205 if (c->flags & REDIS_MASTER) {
1206 server.master = NULL;
1207 server.replstate = REDIS_REPL_CONNECT;
1208 }
1209 zfree(c->argv);
1210 zfree(c->mbargv);
1211 zfree(c);
1212 }
1213
1214 static void glueReplyBuffersIfNeeded(redisClient *c) {
1215 int totlen = 0;
1216 listNode *ln;
1217 robj *o;
1218
1219 listRewind(c->reply);
1220 while((ln = listYield(c->reply))) {
1221 o = ln->value;
1222 totlen += sdslen(o->ptr);
1223 /* This optimization makes more sense if we don't have to copy
1224 * too much data */
1225 if (totlen > 1024) return;
1226 }
1227 if (totlen > 0) {
1228 char buf[1024];
1229 int copylen = 0;
1230
1231 listRewind(c->reply);
1232 while((ln = listYield(c->reply))) {
1233 o = ln->value;
1234 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1235 copylen += sdslen(o->ptr);
1236 listDelNode(c->reply,ln);
1237 }
1238 /* Now the output buffer is empty, add the new single element */
1239 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1240 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1241 }
1242 }
1243
1244 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1245 redisClient *c = privdata;
1246 int nwritten = 0, totwritten = 0, objlen;
1247 robj *o;
1248 REDIS_NOTUSED(el);
1249 REDIS_NOTUSED(mask);
1250
1251 if (server.glueoutputbuf && listLength(c->reply) > 1)
1252 glueReplyBuffersIfNeeded(c);
1253 while(listLength(c->reply)) {
1254 o = listNodeValue(listFirst(c->reply));
1255 objlen = sdslen(o->ptr);
1256
1257 if (objlen == 0) {
1258 listDelNode(c->reply,listFirst(c->reply));
1259 continue;
1260 }
1261
1262 if (c->flags & REDIS_MASTER) {
1263 /* Don't reply to a master */
1264 nwritten = objlen - c->sentlen;
1265 } else {
1266 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1267 if (nwritten <= 0) break;
1268 }
1269 c->sentlen += nwritten;
1270 totwritten += nwritten;
1271 /* If we fully sent the object on head go to the next one */
1272 if (c->sentlen == objlen) {
1273 listDelNode(c->reply,listFirst(c->reply));
1274 c->sentlen = 0;
1275 }
1276 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1277 * bytes, in a single threaded server it's a good idea to server
1278 * other clients as well, even if a very large request comes from
1279 * super fast link that is always able to accept data (in real world
1280 * terms think to 'KEYS *' against the loopback interfae) */
1281 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1282 }
1283 if (nwritten == -1) {
1284 if (errno == EAGAIN) {
1285 nwritten = 0;
1286 } else {
1287 redisLog(REDIS_DEBUG,
1288 "Error writing to client: %s", strerror(errno));
1289 freeClient(c);
1290 return;
1291 }
1292 }
1293 if (totwritten > 0) c->lastinteraction = time(NULL);
1294 if (listLength(c->reply) == 0) {
1295 c->sentlen = 0;
1296 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1297 }
1298 }
1299
1300 static struct redisCommand *lookupCommand(char *name) {
1301 int j = 0;
1302 while(cmdTable[j].name != NULL) {
1303 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1304 j++;
1305 }
1306 return NULL;
1307 }
1308
1309 /* resetClient prepare the client to process the next command */
1310 static void resetClient(redisClient *c) {
1311 freeClientArgv(c);
1312 c->bulklen = -1;
1313 c->multibulk = 0;
1314 }
1315
1316 /* If this function gets called we already read a whole
1317 * command, argments are in the client argv/argc fields.
1318 * processCommand() execute the command or prepare the
1319 * server for a bulk read from the client.
1320 *
1321 * If 1 is returned the client is still alive and valid and
1322 * and other operations can be performed by the caller. Otherwise
1323 * if 0 is returned the client was destroied (i.e. after QUIT). */
1324 static int processCommand(redisClient *c) {
1325 struct redisCommand *cmd;
1326 long long dirty;
1327
1328 /* Free some memory if needed (maxmemory setting) */
1329 if (server.maxmemory) freeMemoryIfNeeded();
1330
1331 /* Handle the multi bulk command type. This is an alternative protocol
1332 * supported by Redis in order to receive commands that are composed of
1333 * multiple binary-safe "bulk" arguments. The latency of processing is
1334 * a bit higher but this allows things like multi-sets, so if this
1335 * protocol is used only for MSET and similar commands this is a big win. */
1336 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1337 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1338 if (c->multibulk <= 0) {
1339 resetClient(c);
1340 return 1;
1341 } else {
1342 decrRefCount(c->argv[c->argc-1]);
1343 c->argc--;
1344 return 1;
1345 }
1346 } else if (c->multibulk) {
1347 if (c->bulklen == -1) {
1348 if (((char*)c->argv[0]->ptr)[0] != '$') {
1349 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1350 resetClient(c);
1351 return 1;
1352 } else {
1353 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1354 decrRefCount(c->argv[0]);
1355 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1356 c->argc--;
1357 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1358 resetClient(c);
1359 return 1;
1360 }
1361 c->argc--;
1362 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1363 /*
1364 if (sdslen(c->querybuf) > 0)
1365 processInputBuffer(c);
1366 */
1367 return 1;
1368 }
1369 } else {
1370 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1371 c->mbargv[c->mbargc] = c->argv[0];
1372 c->mbargc++;
1373 c->argc--;
1374 c->multibulk--;
1375 if (c->multibulk == 0) {
1376 robj **auxargv;
1377 int auxargc;
1378
1379 /* Here we need to swap the multi-bulk argc/argv with the
1380 * normal argc/argv of the client structure. */
1381 auxargv = c->argv;
1382 c->argv = c->mbargv;
1383 c->mbargv = auxargv;
1384
1385 auxargc = c->argc;
1386 c->argc = c->mbargc;
1387 c->mbargc = auxargc;
1388
1389 /* We need to set bulklen to something different than -1
1390 * in order for the code below to process the command without
1391 * to try to read the last argument of a bulk command as
1392 * a special argument. */
1393 c->bulklen = 0;
1394 /* continue below and process the command */
1395 } else {
1396 c->bulklen = -1;
1397 /*
1398 if (sdslen(c->querybuf) > 0)
1399 processInputBuffer(c);
1400 */
1401 return 1;
1402 }
1403 }
1404 }
1405 /* -- end of multi bulk commands processing -- */
1406
1407 /* The QUIT command is handled as a special case. Normal command
1408 * procs are unable to close the client connection safely */
1409 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1410 freeClient(c);
1411 return 0;
1412 }
1413 cmd = lookupCommand(c->argv[0]->ptr);
1414 if (!cmd) {
1415 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1416 resetClient(c);
1417 return 1;
1418 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1419 (c->argc < -cmd->arity)) {
1420 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1421 resetClient(c);
1422 return 1;
1423 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1424 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1425 resetClient(c);
1426 return 1;
1427 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1428 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1429
1430 decrRefCount(c->argv[c->argc-1]);
1431 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1432 c->argc--;
1433 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1434 resetClient(c);
1435 return 1;
1436 }
1437 c->argc--;
1438 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1439 /* It is possible that the bulk read is already in the
1440 * buffer. Check this condition and handle it accordingly */
1441 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1442 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1443 c->argc++;
1444 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1445 } else {
1446 return 1;
1447 }
1448 }
1449 /* Let's try to share objects on the command arguments vector */
1450 if (server.shareobjects) {
1451 int j;
1452 for(j = 1; j < c->argc; j++)
1453 c->argv[j] = tryObjectSharing(c->argv[j]);
1454 }
1455 /* Let's try to encode the bulk object to save space. */
1456 if (cmd->flags & REDIS_CMD_BULK)
1457 tryObjectEncoding(c->argv[c->argc-1]);
1458
1459 /* Check if the user is authenticated */
1460 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1461 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1462 resetClient(c);
1463 return 1;
1464 }
1465
1466 /* Exec the command */
1467 dirty = server.dirty;
1468 cmd->proc(c);
1469 if (server.dirty-dirty != 0 && listLength(server.slaves))
1470 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1471 if (listLength(server.monitors))
1472 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1473 server.stat_numcommands++;
1474
1475 /* Prepare the client for the next command */
1476 if (c->flags & REDIS_CLOSE) {
1477 freeClient(c);
1478 return 0;
1479 }
1480 resetClient(c);
1481 return 1;
1482 }
1483
1484 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1485 listNode *ln;
1486 int outc = 0, j;
1487 robj **outv;
1488 /* (args*2)+1 is enough room for args, spaces, newlines */
1489 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1490
1491 if (argc <= REDIS_STATIC_ARGS) {
1492 outv = static_outv;
1493 } else {
1494 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1495 if (!outv) oom("replicationFeedSlaves");
1496 }
1497
1498 for (j = 0; j < argc; j++) {
1499 if (j != 0) outv[outc++] = shared.space;
1500 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1501 robj *lenobj;
1502
1503 lenobj = createObject(REDIS_STRING,
1504 sdscatprintf(sdsempty(),"%d\r\n",
1505 stringObjectLen(argv[j])));
1506 lenobj->refcount = 0;
1507 outv[outc++] = lenobj;
1508 }
1509 outv[outc++] = argv[j];
1510 }
1511 outv[outc++] = shared.crlf;
1512
1513 /* Increment all the refcounts at start and decrement at end in order to
1514 * be sure to free objects if there is no slave in a replication state
1515 * able to be feed with commands */
1516 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1517 listRewind(slaves);
1518 while((ln = listYield(slaves))) {
1519 redisClient *slave = ln->value;
1520
1521 /* Don't feed slaves that are still waiting for BGSAVE to start */
1522 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1523
1524 /* Feed all the other slaves, MONITORs and so on */
1525 if (slave->slaveseldb != dictid) {
1526 robj *selectcmd;
1527
1528 switch(dictid) {
1529 case 0: selectcmd = shared.select0; break;
1530 case 1: selectcmd = shared.select1; break;
1531 case 2: selectcmd = shared.select2; break;
1532 case 3: selectcmd = shared.select3; break;
1533 case 4: selectcmd = shared.select4; break;
1534 case 5: selectcmd = shared.select5; break;
1535 case 6: selectcmd = shared.select6; break;
1536 case 7: selectcmd = shared.select7; break;
1537 case 8: selectcmd = shared.select8; break;
1538 case 9: selectcmd = shared.select9; break;
1539 default:
1540 selectcmd = createObject(REDIS_STRING,
1541 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1542 selectcmd->refcount = 0;
1543 break;
1544 }
1545 addReply(slave,selectcmd);
1546 slave->slaveseldb = dictid;
1547 }
1548 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1549 }
1550 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1551 if (outv != static_outv) zfree(outv);
1552 }
1553
1554 static void processInputBuffer(redisClient *c) {
1555 again:
1556 if (c->bulklen == -1) {
1557 /* Read the first line of the query */
1558 char *p = strchr(c->querybuf,'\n');
1559 size_t querylen;
1560
1561 if (p) {
1562 sds query, *argv;
1563 int argc, j;
1564
1565 query = c->querybuf;
1566 c->querybuf = sdsempty();
1567 querylen = 1+(p-(query));
1568 if (sdslen(query) > querylen) {
1569 /* leave data after the first line of the query in the buffer */
1570 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1571 }
1572 *p = '\0'; /* remove "\n" */
1573 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1574 sdsupdatelen(query);
1575
1576 /* Now we can split the query in arguments */
1577 if (sdslen(query) == 0) {
1578 /* Ignore empty query */
1579 sdsfree(query);
1580 return;
1581 }
1582 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1583 if (argv == NULL) oom("sdssplitlen");
1584 sdsfree(query);
1585
1586 if (c->argv) zfree(c->argv);
1587 c->argv = zmalloc(sizeof(robj*)*argc);
1588 if (c->argv == NULL) oom("allocating arguments list for client");
1589
1590 for (j = 0; j < argc; j++) {
1591 if (sdslen(argv[j])) {
1592 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1593 c->argc++;
1594 } else {
1595 sdsfree(argv[j]);
1596 }
1597 }
1598 zfree(argv);
1599 /* Execute the command. If the client is still valid
1600 * after processCommand() return and there is something
1601 * on the query buffer try to process the next command. */
1602 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1603 return;
1604 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1605 redisLog(REDIS_DEBUG, "Client protocol error");
1606 freeClient(c);
1607 return;
1608 }
1609 } else {
1610 /* Bulk read handling. Note that if we are at this point
1611 the client already sent a command terminated with a newline,
1612 we are reading the bulk data that is actually the last
1613 argument of the command. */
1614 int qbl = sdslen(c->querybuf);
1615
1616 if (c->bulklen <= qbl) {
1617 /* Copy everything but the final CRLF as final argument */
1618 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1619 c->argc++;
1620 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1621 /* Process the command. If the client is still valid after
1622 * the processing and there is more data in the buffer
1623 * try to parse it. */
1624 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1625 return;
1626 }
1627 }
1628 }
1629
1630 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1631 redisClient *c = (redisClient*) privdata;
1632 char buf[REDIS_IOBUF_LEN];
1633 int nread;
1634 REDIS_NOTUSED(el);
1635 REDIS_NOTUSED(mask);
1636
1637 nread = read(fd, buf, REDIS_IOBUF_LEN);
1638 if (nread == -1) {
1639 if (errno == EAGAIN) {
1640 nread = 0;
1641 } else {
1642 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1643 freeClient(c);
1644 return;
1645 }
1646 } else if (nread == 0) {
1647 redisLog(REDIS_DEBUG, "Client closed connection");
1648 freeClient(c);
1649 return;
1650 }
1651 if (nread) {
1652 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1653 c->lastinteraction = time(NULL);
1654 } else {
1655 return;
1656 }
1657 processInputBuffer(c);
1658 }
1659
1660 static int selectDb(redisClient *c, int id) {
1661 if (id < 0 || id >= server.dbnum)
1662 return REDIS_ERR;
1663 c->db = &server.db[id];
1664 return REDIS_OK;
1665 }
1666
1667 static void *dupClientReplyValue(void *o) {
1668 incrRefCount((robj*)o);
1669 return 0;
1670 }
1671
1672 static redisClient *createClient(int fd) {
1673 redisClient *c = zmalloc(sizeof(*c));
1674
1675 anetNonBlock(NULL,fd);
1676 anetTcpNoDelay(NULL,fd);
1677 if (!c) return NULL;
1678 selectDb(c,0);
1679 c->fd = fd;
1680 c->querybuf = sdsempty();
1681 c->argc = 0;
1682 c->argv = NULL;
1683 c->bulklen = -1;
1684 c->multibulk = 0;
1685 c->mbargc = 0;
1686 c->mbargv = NULL;
1687 c->sentlen = 0;
1688 c->flags = 0;
1689 c->lastinteraction = time(NULL);
1690 c->authenticated = 0;
1691 c->replstate = REDIS_REPL_NONE;
1692 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1693 listSetFreeMethod(c->reply,decrRefCount);
1694 listSetDupMethod(c->reply,dupClientReplyValue);
1695 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1696 readQueryFromClient, c, NULL) == AE_ERR) {
1697 freeClient(c);
1698 return NULL;
1699 }
1700 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1701 return c;
1702 }
1703
1704 static void addReply(redisClient *c, robj *obj) {
1705 if (listLength(c->reply) == 0 &&
1706 (c->replstate == REDIS_REPL_NONE ||
1707 c->replstate == REDIS_REPL_ONLINE) &&
1708 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1709 sendReplyToClient, c, NULL) == AE_ERR) return;
1710 if (obj->encoding != REDIS_ENCODING_RAW) {
1711 obj = getDecodedObject(obj);
1712 } else {
1713 incrRefCount(obj);
1714 }
1715 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1716 }
1717
1718 static void addReplySds(redisClient *c, sds s) {
1719 robj *o = createObject(REDIS_STRING,s);
1720 addReply(c,o);
1721 decrRefCount(o);
1722 }
1723
1724 static void addReplyBulkLen(redisClient *c, robj *obj) {
1725 size_t len;
1726
1727 if (obj->encoding == REDIS_ENCODING_RAW) {
1728 len = sdslen(obj->ptr);
1729 } else {
1730 long n = (long)obj->ptr;
1731
1732 len = 1;
1733 if (n < 0) {
1734 len++;
1735 n = -n;
1736 }
1737 while((n = n/10) != 0) {
1738 len++;
1739 }
1740 }
1741 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1742 }
1743
1744 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1745 int cport, cfd;
1746 char cip[128];
1747 redisClient *c;
1748 REDIS_NOTUSED(el);
1749 REDIS_NOTUSED(mask);
1750 REDIS_NOTUSED(privdata);
1751
1752 cfd = anetAccept(server.neterr, fd, cip, &cport);
1753 if (cfd == AE_ERR) {
1754 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1755 return;
1756 }
1757 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1758 if ((c = createClient(cfd)) == NULL) {
1759 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1760 close(cfd); /* May be already closed, just ingore errors */
1761 return;
1762 }
1763 /* If maxclient directive is set and this is one client more... close the
1764 * connection. Note that we create the client instead to check before
1765 * for this condition, since now the socket is already set in nonblocking
1766 * mode and we can send an error for free using the Kernel I/O */
1767 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1768 char *err = "-ERR max number of clients reached\r\n";
1769
1770 /* That's a best effort error message, don't check write errors */
1771 (void) write(c->fd,err,strlen(err));
1772 freeClient(c);
1773 return;
1774 }
1775 server.stat_numconnections++;
1776 }
1777
1778 /* ======================= Redis objects implementation ===================== */
1779
1780 static robj *createObject(int type, void *ptr) {
1781 robj *o;
1782
1783 if (listLength(server.objfreelist)) {
1784 listNode *head = listFirst(server.objfreelist);
1785 o = listNodeValue(head);
1786 listDelNode(server.objfreelist,head);
1787 } else {
1788 o = zmalloc(sizeof(*o));
1789 }
1790 if (!o) oom("createObject");
1791 o->type = type;
1792 o->encoding = REDIS_ENCODING_RAW;
1793 o->ptr = ptr;
1794 o->refcount = 1;
1795 return o;
1796 }
1797
1798 static robj *createStringObject(char *ptr, size_t len) {
1799 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1800 }
1801
1802 static robj *createListObject(void) {
1803 list *l = listCreate();
1804
1805 if (!l) oom("listCreate");
1806 listSetFreeMethod(l,decrRefCount);
1807 return createObject(REDIS_LIST,l);
1808 }
1809
1810 static robj *createSetObject(void) {
1811 dict *d = dictCreate(&setDictType,NULL);
1812 if (!d) oom("dictCreate");
1813 return createObject(REDIS_SET,d);
1814 }
1815
1816 static void freeStringObject(robj *o) {
1817 if (o->encoding == REDIS_ENCODING_RAW) {
1818 sdsfree(o->ptr);
1819 }
1820 }
1821
1822 static void freeListObject(robj *o) {
1823 listRelease((list*) o->ptr);
1824 }
1825
1826 static void freeSetObject(robj *o) {
1827 dictRelease((dict*) o->ptr);
1828 }
1829
1830 static void freeHashObject(robj *o) {
1831 dictRelease((dict*) o->ptr);
1832 }
1833
1834 static void incrRefCount(robj *o) {
1835 o->refcount++;
1836 #ifdef DEBUG_REFCOUNT
1837 if (o->type == REDIS_STRING)
1838 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1839 #endif
1840 }
1841
1842 static void decrRefCount(void *obj) {
1843 robj *o = obj;
1844
1845 #ifdef DEBUG_REFCOUNT
1846 if (o->type == REDIS_STRING)
1847 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1848 #endif
1849 if (--(o->refcount) == 0) {
1850 switch(o->type) {
1851 case REDIS_STRING: freeStringObject(o); break;
1852 case REDIS_LIST: freeListObject(o); break;
1853 case REDIS_SET: freeSetObject(o); break;
1854 case REDIS_HASH: freeHashObject(o); break;
1855 default: assert(0 != 0); break;
1856 }
1857 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1858 !listAddNodeHead(server.objfreelist,o))
1859 zfree(o);
1860 }
1861 }
1862
1863 static robj *lookupKey(redisDb *db, robj *key) {
1864 dictEntry *de = dictFind(db->dict,key);
1865 return de ? dictGetEntryVal(de) : NULL;
1866 }
1867
1868 static robj *lookupKeyRead(redisDb *db, robj *key) {
1869 expireIfNeeded(db,key);
1870 return lookupKey(db,key);
1871 }
1872
1873 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1874 deleteIfVolatile(db,key);
1875 return lookupKey(db,key);
1876 }
1877
1878 static int deleteKey(redisDb *db, robj *key) {
1879 int retval;
1880
1881 /* We need to protect key from destruction: after the first dictDelete()
1882 * it may happen that 'key' is no longer valid if we don't increment
1883 * it's count. This may happen when we get the object reference directly
1884 * from the hash table with dictRandomKey() or dict iterators */
1885 incrRefCount(key);
1886 if (dictSize(db->expires)) dictDelete(db->expires,key);
1887 retval = dictDelete(db->dict,key);
1888 decrRefCount(key);
1889
1890 return retval == DICT_OK;
1891 }
1892
1893 /* Try to share an object against the shared objects pool */
1894 static robj *tryObjectSharing(robj *o) {
1895 struct dictEntry *de;
1896 unsigned long c;
1897
1898 if (o == NULL || server.shareobjects == 0) return o;
1899
1900 assert(o->type == REDIS_STRING);
1901 de = dictFind(server.sharingpool,o);
1902 if (de) {
1903 robj *shared = dictGetEntryKey(de);
1904
1905 c = ((unsigned long) dictGetEntryVal(de))+1;
1906 dictGetEntryVal(de) = (void*) c;
1907 incrRefCount(shared);
1908 decrRefCount(o);
1909 return shared;
1910 } else {
1911 /* Here we are using a stream algorihtm: Every time an object is
1912 * shared we increment its count, everytime there is a miss we
1913 * recrement the counter of a random object. If this object reaches
1914 * zero we remove the object and put the current object instead. */
1915 if (dictSize(server.sharingpool) >=
1916 server.sharingpoolsize) {
1917 de = dictGetRandomKey(server.sharingpool);
1918 assert(de != NULL);
1919 c = ((unsigned long) dictGetEntryVal(de))-1;
1920 dictGetEntryVal(de) = (void*) c;
1921 if (c == 0) {
1922 dictDelete(server.sharingpool,de->key);
1923 }
1924 } else {
1925 c = 0; /* If the pool is empty we want to add this object */
1926 }
1927 if (c == 0) {
1928 int retval;
1929
1930 retval = dictAdd(server.sharingpool,o,(void*)1);
1931 assert(retval == DICT_OK);
1932 incrRefCount(o);
1933 }
1934 return o;
1935 }
1936 }
1937
1938 /* Check if the nul-terminated string 's' can be represented by a long
1939 * (that is, is a number that fits into long without any other space or
1940 * character before or after the digits).
1941 *
1942 * If so, the function returns REDIS_OK and *longval is set to the value
1943 * of the number. Otherwise REDIS_ERR is returned */
1944 static int isStringRepresentableAsLong(char *s, long *longval) {
1945 char buf[32], *endptr;
1946 long value;
1947 int slen;
1948
1949 value = strtol(s, &endptr, 10);
1950 if (endptr[0] != '\0') return REDIS_ERR;
1951 slen = snprintf(buf,32,"%ld",value);
1952
1953 /* If the number converted back into a string is not identical
1954 * then it's not possible to encode the string as integer */
1955 if (strlen(buf) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
1956 if (longval) *longval = value;
1957 return REDIS_OK;
1958 }
1959
1960 /* Try to encode a string object in order to save space */
1961 static int tryObjectEncoding(robj *o) {
1962 long value;
1963 sds s = o->ptr;
1964
1965 if (o->encoding != REDIS_ENCODING_RAW)
1966 return REDIS_ERR; /* Already encoded */
1967
1968 /* It's not save to encode shared objects: shared objects can be shared
1969 * everywhere in the "object space" of Redis. Encoded objects can only
1970 * appear as "values" (and not, for instance, as keys) */
1971 if (o->refcount > 1) return REDIS_ERR;
1972
1973 /* Currently we try to encode only strings */
1974 assert(o->type == REDIS_STRING);
1975
1976 /* Check if we can represent this string as a long integer */
1977 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
1978
1979 /* Ok, this object can be encoded */
1980 o->encoding = REDIS_ENCODING_INT;
1981 sdsfree(o->ptr);
1982 o->ptr = (void*) value;
1983 return REDIS_OK;
1984 }
1985
1986 /* Get a decoded version of an encoded object (returned as a new object) */
1987 static robj *getDecodedObject(const robj *o) {
1988 robj *dec;
1989
1990 assert(o->encoding != REDIS_ENCODING_RAW);
1991 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
1992 char buf[32];
1993
1994 snprintf(buf,32,"%ld",(long)o->ptr);
1995 dec = createStringObject(buf,strlen(buf));
1996 return dec;
1997 } else {
1998 assert(1 != 1);
1999 }
2000 }
2001
2002 static int compareStringObjects(robj *a, robj *b) {
2003 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2004
2005 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2006 return (long)a->ptr - (long)b->ptr;
2007 } else {
2008 int retval;
2009
2010 incrRefCount(a);
2011 incrRefCount(b);
2012 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2013 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2014 retval = sdscmp(a->ptr,b->ptr);
2015 decrRefCount(a);
2016 decrRefCount(b);
2017 return retval;
2018 }
2019 }
2020
2021 static size_t stringObjectLen(robj *o) {
2022 assert(o->type == REDIS_STRING);
2023 if (o->encoding == REDIS_ENCODING_RAW) {
2024 return sdslen(o->ptr);
2025 } else {
2026 char buf[32];
2027
2028 return snprintf(buf,32,"%ld",(long)o->ptr);
2029 }
2030 }
2031
2032 /*============================ DB saving/loading ============================ */
2033
/* Write the one byte 'type' opcode to 'fp'.
 * Returns 0 on success, -1 if nothing could be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2038
/* Write 't', narrowed to a 32 bit signed integer, to 'fp' as 4 bytes in
 * host byte order. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return (fwrite(&t32,4,1,fp) != 0) ? 0 : -1;
}
2044
2045 /* check rdbLoadLen() comments for more info */
2046 static int rdbSaveLen(FILE *fp, uint32_t len) {
2047 unsigned char buf[2];
2048
2049 if (len < (1<<6)) {
2050 /* Save a 6 bit len */
2051 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2052 if (fwrite(buf,1,1,fp) == 0) return -1;
2053 } else if (len < (1<<14)) {
2054 /* Save a 14 bit len */
2055 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2056 buf[1] = len&0xFF;
2057 if (fwrite(buf,2,1,fp) == 0) return -1;
2058 } else {
2059 /* Save a 32 bit len */
2060 buf[0] = (REDIS_RDB_32BITLEN<<6);
2061 if (fwrite(buf,1,1,fp) == 0) return -1;
2062 len = htonl(len);
2063 if (fwrite(&len,4,1,fp) == 0) return -1;
2064 }
2065 return 0;
2066 }
2067
2068 /* String objects in the form "2391" "-100" without any space and with a
2069 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2070 * encoded as integers to save space */
2071 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2072 long long value;
2073 char *endptr, buf[32];
2074
2075 /* Check if it's possible to encode this value as a number */
2076 value = strtoll(s, &endptr, 10);
2077 if (endptr[0] != '\0') return 0;
2078 snprintf(buf,32,"%lld",value);
2079
2080 /* If the number converted back into a string is not identical
2081 * then it's not possible to encode the string as integer */
2082 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2083
2084 /* Finally check if it fits in our ranges */
2085 if (value >= -(1<<7) && value <= (1<<7)-1) {
2086 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2087 enc[1] = value&0xFF;
2088 return 2;
2089 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2090 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2091 enc[1] = value&0xFF;
2092 enc[2] = (value>>8)&0xFF;
2093 return 3;
2094 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2095 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2096 enc[1] = value&0xFF;
2097 enc[2] = (value>>8)&0xFF;
2098 enc[3] = (value>>16)&0xFF;
2099 enc[4] = (value>>24)&0xFF;
2100 return 5;
2101 } else {
2102 return 0;
2103 }
2104 }
2105
2106 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2107 unsigned int comprlen, outlen;
2108 unsigned char byte;
2109 void *out;
2110
2111 /* We require at least four bytes compression for this to be worth it */
2112 outlen = sdslen(obj->ptr)-4;
2113 if (outlen <= 0) return 0;
2114 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2115 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2116 if (comprlen == 0) {
2117 zfree(out);
2118 return 0;
2119 }
2120 /* Data compressed! Let's save it on disk */
2121 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2122 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2123 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2124 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2125 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2126 zfree(out);
2127 return comprlen;
2128
2129 writeerr:
2130 zfree(out);
2131 return -1;
2132 }
2133
2134 /* Save a string objet as [len][data] on disk. If the object is a string
2135 * representation of an integer value we try to safe it in a special form */
2136 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2137 size_t len;
2138 int enclen;
2139
2140 len = sdslen(obj->ptr);
2141
2142 /* Try integer encoding */
2143 if (len <= 11) {
2144 unsigned char buf[5];
2145 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2146 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2147 return 0;
2148 }
2149 }
2150
2151 /* Try LZF compression - under 20 bytes it's unable to compress even
2152 * aaaaaaaaaaaaaaaaaa so skip it */
2153 if (len > 20) {
2154 int retval;
2155
2156 retval = rdbSaveLzfStringObject(fp,obj);
2157 if (retval == -1) return -1;
2158 if (retval > 0) return 0;
2159 /* retval == 0 means data can't be compressed, save the old way */
2160 }
2161
2162 /* Store verbatim */
2163 if (rdbSaveLen(fp,len) == -1) return -1;
2164 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2165 return 0;
2166 }
2167
2168 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2169 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2170 int retval;
2171 robj *dec;
2172
2173 if (obj->encoding != REDIS_ENCODING_RAW) {
2174 dec = getDecodedObject(obj);
2175 retval = rdbSaveStringObjectRaw(fp,dec);
2176 decrRefCount(dec);
2177 return retval;
2178 } else {
2179 return rdbSaveStringObjectRaw(fp,obj);
2180 }
2181 }
2182
2183 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2184 static int rdbSave(char *filename) {
2185 dictIterator *di = NULL;
2186 dictEntry *de;
2187 FILE *fp;
2188 char tmpfile[256];
2189 int j;
2190 time_t now = time(NULL);
2191
2192 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2193 fp = fopen(tmpfile,"w");
2194 if (!fp) {
2195 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2196 return REDIS_ERR;
2197 }
2198 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2199 for (j = 0; j < server.dbnum; j++) {
2200 redisDb *db = server.db+j;
2201 dict *d = db->dict;
2202 if (dictSize(d) == 0) continue;
2203 di = dictGetIterator(d);
2204 if (!di) {
2205 fclose(fp);
2206 return REDIS_ERR;
2207 }
2208
2209 /* Write the SELECT DB opcode */
2210 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2211 if (rdbSaveLen(fp,j) == -1) goto werr;
2212
2213 /* Iterate this DB writing every entry */
2214 while((de = dictNext(di)) != NULL) {
2215 robj *key = dictGetEntryKey(de);
2216 robj *o = dictGetEntryVal(de);
2217 time_t expiretime = getExpire(db,key);
2218
2219 /* Save the expire time */
2220 if (expiretime != -1) {
2221 /* If this key is already expired skip it */
2222 if (expiretime < now) continue;
2223 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2224 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2225 }
2226 /* Save the key and associated value */
2227 if (rdbSaveType(fp,o->type) == -1) goto werr;
2228 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2229 if (o->type == REDIS_STRING) {
2230 /* Save a string value */
2231 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2232 } else if (o->type == REDIS_LIST) {
2233 /* Save a list value */
2234 list *list = o->ptr;
2235 listNode *ln;
2236
2237 listRewind(list);
2238 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2239 while((ln = listYield(list))) {
2240 robj *eleobj = listNodeValue(ln);
2241
2242 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2243 }
2244 } else if (o->type == REDIS_SET) {
2245 /* Save a set value */
2246 dict *set = o->ptr;
2247 dictIterator *di = dictGetIterator(set);
2248 dictEntry *de;
2249
2250 if (!set) oom("dictGetIteraotr");
2251 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2252 while((de = dictNext(di)) != NULL) {
2253 robj *eleobj = dictGetEntryKey(de);
2254
2255 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2256 }
2257 dictReleaseIterator(di);
2258 } else {
2259 assert(0 != 0);
2260 }
2261 }
2262 dictReleaseIterator(di);
2263 }
2264 /* EOF opcode */
2265 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2266
2267 /* Make sure data will not remain on the OS's output buffers */
2268 fflush(fp);
2269 fsync(fileno(fp));
2270 fclose(fp);
2271
2272 /* Use RENAME to make sure the DB file is changed atomically only
2273 * if the generate DB file is ok. */
2274 if (rename(tmpfile,filename) == -1) {
2275 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2276 unlink(tmpfile);
2277 return REDIS_ERR;
2278 }
2279 redisLog(REDIS_NOTICE,"DB saved on disk");
2280 server.dirty = 0;
2281 server.lastsave = time(NULL);
2282 return REDIS_OK;
2283
2284 werr:
2285 fclose(fp);
2286 unlink(tmpfile);
2287 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2288 if (di) dictReleaseIterator(di);
2289 return REDIS_ERR;
2290 }
2291
2292 static int rdbSaveBackground(char *filename) {
2293 pid_t childpid;
2294
2295 if (server.bgsaveinprogress) return REDIS_ERR;
2296 if ((childpid = fork()) == 0) {
2297 /* Child */
2298 close(server.fd);
2299 if (rdbSave(filename) == REDIS_OK) {
2300 exit(0);
2301 } else {
2302 exit(1);
2303 }
2304 } else {
2305 /* Parent */
2306 if (childpid == -1) {
2307 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2308 strerror(errno));
2309 return REDIS_ERR;
2310 }
2311 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2312 server.bgsaveinprogress = 1;
2313 server.bgsavechildpid = childpid;
2314 return REDIS_OK;
2315 }
2316 return REDIS_OK; /* unreached */
2317 }
2318
/* Remove the temp file "temp-<childpid>.rdb" from the current directory.
 * The result of unlink() is ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];
    int pid = (int) childpid;

    snprintf(path,sizeof(path),"temp-%d.rdb",pid);
    unlink(path);
}
2325
/* Read a one byte type/opcode from 'fp'.
 * Returns the byte as a non negative int, or -1 if it can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
2331
/* Read a time stored as a 4 byte signed integer in host byte order.
 * Returns the time, or -1 if the 4 bytes can't be read (so a stored value
 * of -1 can't be told apart from an error). */
static time_t rdbLoadTime(FILE *fp) {
    int32_t raw;
    size_t got = fread(&raw,4,1,fp);

    if (got == 0) return -1;
    return (time_t) raw;
}
2337
2338 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2339 * of this file for a description of how this are stored on disk.
2340 *
2341 * isencoded is set to 1 if the readed length is not actually a length but
2342 * an "encoding type", check the above comments for more info */
2343 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2344 unsigned char buf[2];
2345 uint32_t len;
2346
2347 if (isencoded) *isencoded = 0;
2348 if (rdbver == 0) {
2349 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2350 return ntohl(len);
2351 } else {
2352 int type;
2353
2354 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2355 type = (buf[0]&0xC0)>>6;
2356 if (type == REDIS_RDB_6BITLEN) {
2357 /* Read a 6 bit len */
2358 return buf[0]&0x3F;
2359 } else if (type == REDIS_RDB_ENCVAL) {
2360 /* Read a 6 bit len encoding type */
2361 if (isencoded) *isencoded = 1;
2362 return buf[0]&0x3F;
2363 } else if (type == REDIS_RDB_14BITLEN) {
2364 /* Read a 14 bit len */
2365 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2366 return ((buf[0]&0x3F)<<8)|buf[1];
2367 } else {
2368 /* Read a 32 bit len */
2369 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2370 return ntohl(len);
2371 }
2372 }
2373 }
2374
2375 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2376 unsigned char enc[4];
2377 long long val;
2378
2379 if (enctype == REDIS_RDB_ENC_INT8) {
2380 if (fread(enc,1,1,fp) == 0) return NULL;
2381 val = (signed char)enc[0];
2382 } else if (enctype == REDIS_RDB_ENC_INT16) {
2383 uint16_t v;
2384 if (fread(enc,2,1,fp) == 0) return NULL;
2385 v = enc[0]|(enc[1]<<8);
2386 val = (int16_t)v;
2387 } else if (enctype == REDIS_RDB_ENC_INT32) {
2388 uint32_t v;
2389 if (fread(enc,4,1,fp) == 0) return NULL;
2390 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2391 val = (int32_t)v;
2392 } else {
2393 val = 0; /* anti-warning */
2394 assert(0!=0);
2395 }
2396 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2397 }
2398
2399 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2400 unsigned int len, clen;
2401 unsigned char *c = NULL;
2402 sds val = NULL;
2403
2404 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2405 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2406 if ((c = zmalloc(clen)) == NULL) goto err;
2407 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2408 if (fread(c,clen,1,fp) == 0) goto err;
2409 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2410 zfree(c);
2411 return createObject(REDIS_STRING,val);
2412 err:
2413 zfree(c);
2414 sdsfree(val);
2415 return NULL;
2416 }
2417
2418 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2419 int isencoded;
2420 uint32_t len;
2421 sds val;
2422
2423 len = rdbLoadLen(fp,rdbver,&isencoded);
2424 if (isencoded) {
2425 switch(len) {
2426 case REDIS_RDB_ENC_INT8:
2427 case REDIS_RDB_ENC_INT16:
2428 case REDIS_RDB_ENC_INT32:
2429 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2430 case REDIS_RDB_ENC_LZF:
2431 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2432 default:
2433 assert(0!=0);
2434 }
2435 }
2436
2437 if (len == REDIS_RDB_LENERR) return NULL;
2438 val = sdsnewlen(NULL,len);
2439 if (len && fread(val,len,1,fp) == 0) {
2440 sdsfree(val);
2441 return NULL;
2442 }
2443 return tryObjectSharing(createObject(REDIS_STRING,val));
2444 }
2445
2446 static int rdbLoad(char *filename) {
2447 FILE *fp;
2448 robj *keyobj = NULL;
2449 uint32_t dbid;
2450 int type, retval, rdbver;
2451 dict *d = server.db[0].dict;
2452 redisDb *db = server.db+0;
2453 char buf[1024];
2454 time_t expiretime = -1, now = time(NULL);
2455
2456 fp = fopen(filename,"r");
2457 if (!fp) return REDIS_ERR;
2458 if (fread(buf,9,1,fp) == 0) goto eoferr;
2459 buf[9] = '\0';
2460 if (memcmp(buf,"REDIS",5) != 0) {
2461 fclose(fp);
2462 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2463 return REDIS_ERR;
2464 }
2465 rdbver = atoi(buf+5);
2466 if (rdbver > 1) {
2467 fclose(fp);
2468 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2469 return REDIS_ERR;
2470 }
2471 while(1) {
2472 robj *o;
2473
2474 /* Read type. */
2475 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2476 if (type == REDIS_EXPIRETIME) {
2477 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2478 /* We read the time so we need to read the object type again */
2479 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2480 }
2481 if (type == REDIS_EOF) break;
2482 /* Handle SELECT DB opcode as a special case */
2483 if (type == REDIS_SELECTDB) {
2484 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2485 goto eoferr;
2486 if (dbid >= (unsigned)server.dbnum) {
2487 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2488 exit(1);
2489 }
2490 db = server.db+dbid;
2491 d = db->dict;
2492 continue;
2493 }
2494 /* Read key */
2495 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2496
2497 if (type == REDIS_STRING) {
2498 /* Read string value */
2499 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2500 tryObjectEncoding(o);
2501 } else if (type == REDIS_LIST || type == REDIS_SET) {
2502 /* Read list/set value */
2503 uint32_t listlen;
2504
2505 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2506 goto eoferr;
2507 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2508 /* Load every single element of the list/set */
2509 while(listlen--) {
2510 robj *ele;
2511
2512 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2513 tryObjectEncoding(ele);
2514 if (type == REDIS_LIST) {
2515 if (!listAddNodeTail((list*)o->ptr,ele))
2516 oom("listAddNodeTail");
2517 } else {
2518 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2519 oom("dictAdd");
2520 }
2521 }
2522 } else {
2523 assert(0 != 0);
2524 }
2525 /* Add the new object in the hash table */
2526 retval = dictAdd(d,keyobj,o);
2527 if (retval == DICT_ERR) {
2528 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2529 exit(1);
2530 }
2531 /* Set the expire time if needed */
2532 if (expiretime != -1) {
2533 setExpire(db,keyobj,expiretime);
2534 /* Delete this key if already expired */
2535 if (expiretime < now) deleteKey(db,keyobj);
2536 expiretime = -1;
2537 }
2538 keyobj = o = NULL;
2539 }
2540 fclose(fp);
2541 return REDIS_OK;
2542
2543 eoferr: /* unexpected end of file is handled here with a fatal exit */
2544 if (keyobj) decrRefCount(keyobj);
2545 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2546 exit(1);
2547 return REDIS_ERR; /* Just to avoid warning */
2548 }
2549
2550 /*================================== Commands =============================== */
2551
2552 static void authCommand(redisClient *c) {
2553 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2554 c->authenticated = 1;
2555 addReply(c,shared.ok);
2556 } else {
2557 c->authenticated = 0;
2558 addReply(c,shared.err);
2559 }
2560 }
2561
2562 static void pingCommand(redisClient *c) {
2563 addReply(c,shared.pong);
2564 }
2565
2566 static void echoCommand(redisClient *c) {
2567 addReplyBulkLen(c,c->argv[1]);
2568 addReply(c,c->argv[1]);
2569 addReply(c,shared.crlf);
2570 }
2571
2572 /*=================================== Strings =============================== */
2573
2574 static void setGenericCommand(redisClient *c, int nx) {
2575 int retval;
2576
2577 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2578 if (retval == DICT_ERR) {
2579 if (!nx) {
2580 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2581 incrRefCount(c->argv[2]);
2582 } else {
2583 addReply(c,shared.czero);
2584 return;
2585 }
2586 } else {
2587 incrRefCount(c->argv[1]);
2588 incrRefCount(c->argv[2]);
2589 }
2590 server.dirty++;
2591 removeExpire(c->db,c->argv[1]);
2592 addReply(c, nx ? shared.cone : shared.ok);
2593 }
2594
2595 static void setCommand(redisClient *c) {
2596 setGenericCommand(c,0);
2597 }
2598
2599 static void setnxCommand(redisClient *c) {
2600 setGenericCommand(c,1);
2601 }
2602
2603 static void getCommand(redisClient *c) {
2604 robj *o = lookupKeyRead(c->db,c->argv[1]);
2605
2606 if (o == NULL) {
2607 addReply(c,shared.nullbulk);
2608 } else {
2609 if (o->type != REDIS_STRING) {
2610 addReply(c,shared.wrongtypeerr);
2611 } else {
2612 addReplyBulkLen(c,o);
2613 addReply(c,o);
2614 addReply(c,shared.crlf);
2615 }
2616 }
2617 }
2618
2619 static void getsetCommand(redisClient *c) {
2620 getCommand(c);
2621 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2622 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2623 } else {
2624 incrRefCount(c->argv[1]);
2625 }
2626 incrRefCount(c->argv[2]);
2627 server.dirty++;
2628 removeExpire(c->db,c->argv[1]);
2629 }
2630
2631 static void mgetCommand(redisClient *c) {
2632 int j;
2633
2634 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2635 for (j = 1; j < c->argc; j++) {
2636 robj *o = lookupKeyRead(c->db,c->argv[j]);
2637 if (o == NULL) {
2638 addReply(c,shared.nullbulk);
2639 } else {
2640 if (o->type != REDIS_STRING) {
2641 addReply(c,shared.nullbulk);
2642 } else {
2643 addReplyBulkLen(c,o);
2644 addReply(c,o);
2645 addReply(c,shared.crlf);
2646 }
2647 }
2648 }
2649 }
2650
2651 static void incrDecrCommand(redisClient *c, long long incr) {
2652 long long value;
2653 int retval;
2654 robj *o;
2655
2656 o = lookupKeyWrite(c->db,c->argv[1]);
2657 if (o == NULL) {
2658 value = 0;
2659 } else {
2660 if (o->type != REDIS_STRING) {
2661 value = 0;
2662 } else {
2663 char *eptr;
2664
2665 if (o->encoding == REDIS_ENCODING_RAW)
2666 value = strtoll(o->ptr, &eptr, 10);
2667 else if (o->encoding == REDIS_ENCODING_INT)
2668 value = (long)o->ptr;
2669 else
2670 assert(1 != 1);
2671 }
2672 }
2673
2674 value += incr;
2675 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2676 tryObjectEncoding(o);
2677 retval = dictAdd(c->db->dict,c->argv[1],o);
2678 if (retval == DICT_ERR) {
2679 dictReplace(c->db->dict,c->argv[1],o);
2680 removeExpire(c->db,c->argv[1]);
2681 } else {
2682 incrRefCount(c->argv[1]);
2683 }
2684 server.dirty++;
2685 addReply(c,shared.colon);
2686 addReply(c,o);
2687 addReply(c,shared.crlf);
2688 }
2689
2690 static void incrCommand(redisClient *c) {
2691 incrDecrCommand(c,1);
2692 }
2693
2694 static void decrCommand(redisClient *c) {
2695 incrDecrCommand(c,-1);
2696 }
2697
2698 static void incrbyCommand(redisClient *c) {
2699 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2700 incrDecrCommand(c,incr);
2701 }
2702
2703 static void decrbyCommand(redisClient *c) {
2704 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2705 incrDecrCommand(c,-incr);
2706 }
2707
2708 /* ========================= Type agnostic commands ========================= */
2709
2710 static void delCommand(redisClient *c) {
2711 int deleted = 0, j;
2712
2713 for (j = 1; j < c->argc; j++) {
2714 if (deleteKey(c->db,c->argv[j])) {
2715 server.dirty++;
2716 deleted++;
2717 }
2718 }
2719 switch(deleted) {
2720 case 0:
2721 addReply(c,shared.czero);
2722 break;
2723 case 1:
2724 addReply(c,shared.cone);
2725 break;
2726 default:
2727 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2728 break;
2729 }
2730 }
2731
2732 static void existsCommand(redisClient *c) {
2733 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2734 }
2735
2736 static void selectCommand(redisClient *c) {
2737 int id = atoi(c->argv[1]->ptr);
2738
2739 if (selectDb(c,id) == REDIS_ERR) {
2740 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2741 } else {
2742 addReply(c,shared.ok);
2743 }
2744 }
2745
2746 static void randomkeyCommand(redisClient *c) {
2747 dictEntry *de;
2748
2749 while(1) {
2750 de = dictGetRandomKey(c->db->dict);
2751 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2752 }
2753 if (de == NULL) {
2754 addReply(c,shared.plus);
2755 addReply(c,shared.crlf);
2756 } else {
2757 addReply(c,shared.plus);
2758 addReply(c,dictGetEntryKey(de));
2759 addReply(c,shared.crlf);
2760 }
2761 }
2762
2763 static void keysCommand(redisClient *c) {
2764 dictIterator *di;
2765 dictEntry *de;
2766 sds pattern = c->argv[1]->ptr;
2767 int plen = sdslen(pattern);
2768 int numkeys = 0, keyslen = 0;
2769 robj *lenobj = createObject(REDIS_STRING,NULL);
2770
2771 di = dictGetIterator(c->db->dict);
2772 if (!di) oom("dictGetIterator");
2773 addReply(c,lenobj);
2774 decrRefCount(lenobj);
2775 while((de = dictNext(di)) != NULL) {
2776 robj *keyobj = dictGetEntryKey(de);
2777
2778 sds key = keyobj->ptr;
2779 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2780 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2781 if (expireIfNeeded(c->db,keyobj) == 0) {
2782 if (numkeys != 0)
2783 addReply(c,shared.space);
2784 addReply(c,keyobj);
2785 numkeys++;
2786 keyslen += sdslen(key);
2787 }
2788 }
2789 }
2790 dictReleaseIterator(di);
2791 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2792 addReply(c,shared.crlf);
2793 }
2794
2795 static void dbsizeCommand(redisClient *c) {
2796 addReplySds(c,
2797 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2798 }
2799
2800 static void lastsaveCommand(redisClient *c) {
2801 addReplySds(c,
2802 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2803 }
2804
2805 static void typeCommand(redisClient *c) {
2806 robj *o;
2807 char *type;
2808
2809 o = lookupKeyRead(c->db,c->argv[1]);
2810 if (o == NULL) {
2811 type = "+none";
2812 } else {
2813 switch(o->type) {
2814 case REDIS_STRING: type = "+string"; break;
2815 case REDIS_LIST: type = "+list"; break;
2816 case REDIS_SET: type = "+set"; break;
2817 default: type = "unknown"; break;
2818 }
2819 }
2820 addReplySds(c,sdsnew(type));
2821 addReply(c,shared.crlf);
2822 }
2823
2824 static void saveCommand(redisClient *c) {
2825 if (server.bgsaveinprogress) {
2826 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2827 return;
2828 }
2829 if (rdbSave(server.dbfilename) == REDIS_OK) {
2830 addReply(c,shared.ok);
2831 } else {
2832 addReply(c,shared.err);
2833 }
2834 }
2835
2836 static void bgsaveCommand(redisClient *c) {
2837 if (server.bgsaveinprogress) {
2838 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2839 return;
2840 }
2841 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2842 addReply(c,shared.ok);
2843 } else {
2844 addReply(c,shared.err);
2845 }
2846 }
2847
2848 static void shutdownCommand(redisClient *c) {
2849 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2850 /* Kill the saving child if there is a background saving in progress.
2851 We want to avoid race conditions, for instance our saving child may
2852 overwrite the synchronous saving did by SHUTDOWN. */
2853 if (server.bgsaveinprogress) {
2854 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2855 kill(server.bgsavechildpid,SIGKILL);
2856 rdbRemoveTempFile(server.bgsavechildpid);
2857 }
2858 /* SYNC SAVE */
2859 if (rdbSave(server.dbfilename) == REDIS_OK) {
2860 if (server.daemonize)
2861 unlink(server.pidfile);
2862 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2863 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2864 exit(1);
2865 } else {
2866 /* Ooops.. error saving! The best we can do is to continue operating.
2867 * Note that if there was a background saving process, in the next
2868 * cron() Redis will be notified that the background saving aborted,
2869 * handling special stuff like slaves pending for synchronization... */
2870 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2871 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2872 }
2873 }
2874
2875 static void renameGenericCommand(redisClient *c, int nx) {
2876 robj *o;
2877
2878 /* To use the same key as src and dst is probably an error */
2879 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2880 addReply(c,shared.sameobjecterr);
2881 return;
2882 }
2883
2884 o = lookupKeyWrite(c->db,c->argv[1]);
2885 if (o == NULL) {
2886 addReply(c,shared.nokeyerr);
2887 return;
2888 }
2889 incrRefCount(o);
2890 deleteIfVolatile(c->db,c->argv[2]);
2891 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2892 if (nx) {
2893 decrRefCount(o);
2894 addReply(c,shared.czero);
2895 return;
2896 }
2897 dictReplace(c->db->dict,c->argv[2],o);
2898 } else {
2899 incrRefCount(c->argv[2]);
2900 }
2901 deleteKey(c->db,c->argv[1]);
2902 server.dirty++;
2903 addReply(c,nx ? shared.cone : shared.ok);
2904 }
2905
2906 static void renameCommand(redisClient *c) {
2907 renameGenericCommand(c,0);
2908 }
2909
2910 static void renamenxCommand(redisClient *c) {
2911 renameGenericCommand(c,1);
2912 }
2913
2914 static void moveCommand(redisClient *c) {
2915 robj *o;
2916 redisDb *src, *dst;
2917 int srcid;
2918
2919 /* Obtain source and target DB pointers */
2920 src = c->db;
2921 srcid = c->db->id;
2922 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2923 addReply(c,shared.outofrangeerr);
2924 return;
2925 }
2926 dst = c->db;
2927 selectDb(c,srcid); /* Back to the source DB */
2928
2929 /* If the user is moving using as target the same
2930 * DB as the source DB it is probably an error. */
2931 if (src == dst) {
2932 addReply(c,shared.sameobjecterr);
2933 return;
2934 }
2935
2936 /* Check if the element exists and get a reference */
2937 o = lookupKeyWrite(c->db,c->argv[1]);
2938 if (!o) {
2939 addReply(c,shared.czero);
2940 return;
2941 }
2942
2943 /* Try to add the element to the target DB */
2944 deleteIfVolatile(dst,c->argv[1]);
2945 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2946 addReply(c,shared.czero);
2947 return;
2948 }
2949 incrRefCount(c->argv[1]);
2950 incrRefCount(o);
2951
2952 /* OK! key moved, free the entry in the source DB */
2953 deleteKey(src,c->argv[1]);
2954 server.dirty++;
2955 addReply(c,shared.cone);
2956 }
2957
2958 /* =================================== Lists ================================ */
2959 static void pushGenericCommand(redisClient *c, int where) {
2960 robj *lobj;
2961 list *list;
2962
2963 lobj = lookupKeyWrite(c->db,c->argv[1]);
2964 if (lobj == NULL) {
2965 lobj = createListObject();
2966 list = lobj->ptr;
2967 if (where == REDIS_HEAD) {
2968 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2969 } else {
2970 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2971 }
2972 dictAdd(c->db->dict,c->argv[1],lobj);
2973 incrRefCount(c->argv[1]);
2974 incrRefCount(c->argv[2]);
2975 } else {
2976 if (lobj->type != REDIS_LIST) {
2977 addReply(c,shared.wrongtypeerr);
2978 return;
2979 }
2980 list = lobj->ptr;
2981 if (where == REDIS_HEAD) {
2982 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2983 } else {
2984 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2985 }
2986 incrRefCount(c->argv[2]);
2987 }
2988 server.dirty++;
2989 addReply(c,shared.ok);
2990 }
2991
2992 static void lpushCommand(redisClient *c) {
2993 pushGenericCommand(c,REDIS_HEAD);
2994 }
2995
2996 static void rpushCommand(redisClient *c) {
2997 pushGenericCommand(c,REDIS_TAIL);
2998 }
2999
3000 static void llenCommand(redisClient *c) {
3001 robj *o;
3002 list *l;
3003
3004 o = lookupKeyRead(c->db,c->argv[1]);
3005 if (o == NULL) {
3006 addReply(c,shared.czero);
3007 return;
3008 } else {
3009 if (o->type != REDIS_LIST) {
3010 addReply(c,shared.wrongtypeerr);
3011 } else {
3012 l = o->ptr;
3013 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3014 }
3015 }
3016 }
3017
3018 static void lindexCommand(redisClient *c) {
3019 robj *o;
3020 int index = atoi(c->argv[2]->ptr);
3021
3022 o = lookupKeyRead(c->db,c->argv[1]);
3023 if (o == NULL) {
3024 addReply(c,shared.nullbulk);
3025 } else {
3026 if (o->type != REDIS_LIST) {
3027 addReply(c,shared.wrongtypeerr);
3028 } else {
3029 list *list = o->ptr;
3030 listNode *ln;
3031
3032 ln = listIndex(list, index);
3033 if (ln == NULL) {
3034 addReply(c,shared.nullbulk);
3035 } else {
3036 robj *ele = listNodeValue(ln);
3037 addReplyBulkLen(c,ele);
3038 addReply(c,ele);
3039 addReply(c,shared.crlf);
3040 }
3041 }
3042 }
3043 }
3044
3045 static void lsetCommand(redisClient *c) {
3046 robj *o;
3047 int index = atoi(c->argv[2]->ptr);
3048
3049 o = lookupKeyWrite(c->db,c->argv[1]);
3050 if (o == NULL) {
3051 addReply(c,shared.nokeyerr);
3052 } else {
3053 if (o->type != REDIS_LIST) {
3054 addReply(c,shared.wrongtypeerr);
3055 } else {
3056 list *list = o->ptr;
3057 listNode *ln;
3058
3059 ln = listIndex(list, index);
3060 if (ln == NULL) {
3061 addReply(c,shared.outofrangeerr);
3062 } else {
3063 robj *ele = listNodeValue(ln);
3064
3065 decrRefCount(ele);
3066 listNodeValue(ln) = c->argv[3];
3067 incrRefCount(c->argv[3]);
3068 addReply(c,shared.ok);
3069 server.dirty++;
3070 }
3071 }
3072 }
3073 }
3074
3075 static void popGenericCommand(redisClient *c, int where) {
3076 robj *o;
3077
3078 o = lookupKeyWrite(c->db,c->argv[1]);
3079 if (o == NULL) {
3080 addReply(c,shared.nullbulk);
3081 } else {
3082 if (o->type != REDIS_LIST) {
3083 addReply(c,shared.wrongtypeerr);
3084 } else {
3085 list *list = o->ptr;
3086 listNode *ln;
3087
3088 if (where == REDIS_HEAD)
3089 ln = listFirst(list);
3090 else
3091 ln = listLast(list);
3092
3093 if (ln == NULL) {
3094 addReply(c,shared.nullbulk);
3095 } else {
3096 robj *ele = listNodeValue(ln);
3097 addReplyBulkLen(c,ele);
3098 addReply(c,ele);
3099 addReply(c,shared.crlf);
3100 listDelNode(list,ln);
3101 server.dirty++;
3102 }
3103 }
3104 }
3105 }
3106
3107 static void lpopCommand(redisClient *c) {
3108 popGenericCommand(c,REDIS_HEAD);
3109 }
3110
3111 static void rpopCommand(redisClient *c) {
3112 popGenericCommand(c,REDIS_TAIL);
3113 }
3114
3115 static void lrangeCommand(redisClient *c) {
3116 robj *o;
3117 int start = atoi(c->argv[2]->ptr);
3118 int end = atoi(c->argv[3]->ptr);
3119
3120 o = lookupKeyRead(c->db,c->argv[1]);
3121 if (o == NULL) {
3122 addReply(c,shared.nullmultibulk);
3123 } else {
3124 if (o->type != REDIS_LIST) {
3125 addReply(c,shared.wrongtypeerr);
3126 } else {
3127 list *list = o->ptr;
3128 listNode *ln;
3129 int llen = listLength(list);
3130 int rangelen, j;
3131 robj *ele;
3132
3133 /* convert negative indexes */
3134 if (start < 0) start = llen+start;
3135 if (end < 0) end = llen+end;
3136 if (start < 0) start = 0;
3137 if (end < 0) end = 0;
3138
3139 /* indexes sanity checks */
3140 if (start > end || start >= llen) {
3141 /* Out of range start or start > end result in empty list */
3142 addReply(c,shared.emptymultibulk);
3143 return;
3144 }
3145 if (end >= llen) end = llen-1;
3146 rangelen = (end-start)+1;
3147
3148 /* Return the result in form of a multi-bulk reply */
3149 ln = listIndex(list, start);
3150 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3151 for (j = 0; j < rangelen; j++) {
3152 ele = listNodeValue(ln);
3153 addReplyBulkLen(c,ele);
3154 addReply(c,ele);
3155 addReply(c,shared.crlf);
3156 ln = ln->next;
3157 }
3158 }
3159 }
3160 }
3161
3162 static void ltrimCommand(redisClient *c) {
3163 robj *o;
3164 int start = atoi(c->argv[2]->ptr);
3165 int end = atoi(c->argv[3]->ptr);
3166
3167 o = lookupKeyWrite(c->db,c->argv[1]);
3168 if (o == NULL) {
3169 addReply(c,shared.nokeyerr);
3170 } else {
3171 if (o->type != REDIS_LIST) {
3172 addReply(c,shared.wrongtypeerr);
3173 } else {
3174 list *list = o->ptr;
3175 listNode *ln;
3176 int llen = listLength(list);
3177 int j, ltrim, rtrim;
3178
3179 /* convert negative indexes */
3180 if (start < 0) start = llen+start;
3181 if (end < 0) end = llen+end;
3182 if (start < 0) start = 0;
3183 if (end < 0) end = 0;
3184
3185 /* indexes sanity checks */
3186 if (start > end || start >= llen) {
3187 /* Out of range start or start > end result in empty list */
3188 ltrim = llen;
3189 rtrim = 0;
3190 } else {
3191 if (end >= llen) end = llen-1;
3192 ltrim = start;
3193 rtrim = llen-end-1;
3194 }
3195
3196 /* Remove list elements to perform the trim */
3197 for (j = 0; j < ltrim; j++) {
3198 ln = listFirst(list);
3199 listDelNode(list,ln);
3200 }
3201 for (j = 0; j < rtrim; j++) {
3202 ln = listLast(list);
3203 listDelNode(list,ln);
3204 }
3205 server.dirty++;
3206 addReply(c,shared.ok);
3207 }
3208 }
3209 }
3210
3211 static void lremCommand(redisClient *c) {
3212 robj *o;
3213
3214 o = lookupKeyWrite(c->db,c->argv[1]);
3215 if (o == NULL) {
3216 addReply(c,shared.czero);
3217 } else {
3218 if (o->type != REDIS_LIST) {
3219 addReply(c,shared.wrongtypeerr);
3220 } else {
3221 list *list = o->ptr;
3222 listNode *ln, *next;
3223 int toremove = atoi(c->argv[2]->ptr);
3224 int removed = 0;
3225 int fromtail = 0;
3226
3227 if (toremove < 0) {
3228 toremove = -toremove;
3229 fromtail = 1;
3230 }
3231 ln = fromtail ? list->tail : list->head;
3232 while (ln) {
3233 robj *ele = listNodeValue(ln);
3234
3235 next = fromtail ? ln->prev : ln->next;
3236 if (compareStringObjects(ele,c->argv[3]) == 0) {
3237 listDelNode(list,ln);
3238 server.dirty++;
3239 removed++;
3240 if (toremove && removed == toremove) break;
3241 }
3242 ln = next;
3243 }
3244 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3245 }
3246 }
3247 }
3248
3249 /* ==================================== Sets ================================ */
3250
3251 static void saddCommand(redisClient *c) {
3252 robj *set;
3253
3254 set = lookupKeyWrite(c->db,c->argv[1]);
3255 if (set == NULL) {
3256 set = createSetObject();
3257 dictAdd(c->db->dict,c->argv[1],set);
3258 incrRefCount(c->argv[1]);
3259 } else {
3260 if (set->type != REDIS_SET) {
3261 addReply(c,shared.wrongtypeerr);
3262 return;
3263 }
3264 }
3265 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3266 incrRefCount(c->argv[2]);
3267 server.dirty++;
3268 addReply(c,shared.cone);
3269 } else {
3270 addReply(c,shared.czero);
3271 }
3272 }
3273
3274 static void sremCommand(redisClient *c) {
3275 robj *set;
3276
3277 set = lookupKeyWrite(c->db,c->argv[1]);
3278 if (set == NULL) {
3279 addReply(c,shared.czero);
3280 } else {
3281 if (set->type != REDIS_SET) {
3282 addReply(c,shared.wrongtypeerr);
3283 return;
3284 }
3285 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3286 server.dirty++;
3287 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3288 addReply(c,shared.cone);
3289 } else {
3290 addReply(c,shared.czero);
3291 }
3292 }
3293 }
3294
3295 static void smoveCommand(redisClient *c) {
3296 robj *srcset, *dstset;
3297
3298 srcset = lookupKeyWrite(c->db,c->argv[1]);
3299 dstset = lookupKeyWrite(c->db,c->argv[2]);
3300
3301 /* If the source key does not exist return 0, if it's of the wrong type
3302 * raise an error */
3303 if (srcset == NULL || srcset->type != REDIS_SET) {
3304 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3305 return;
3306 }
3307 /* Error if the destination key is not a set as well */
3308 if (dstset && dstset->type != REDIS_SET) {
3309 addReply(c,shared.wrongtypeerr);
3310 return;
3311 }
3312 /* Remove the element from the source set */
3313 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3314 /* Key not found in the src set! return zero */
3315 addReply(c,shared.czero);
3316 return;
3317 }
3318 server.dirty++;
3319 /* Add the element to the destination set */
3320 if (!dstset) {
3321 dstset = createSetObject();
3322 dictAdd(c->db->dict,c->argv[2],dstset);
3323 incrRefCount(c->argv[2]);
3324 }
3325 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3326 incrRefCount(c->argv[3]);
3327 addReply(c,shared.cone);
3328 }
3329
3330 static void sismemberCommand(redisClient *c) {
3331 robj *set;
3332
3333 set = lookupKeyRead(c->db,c->argv[1]);
3334 if (set == NULL) {
3335 addReply(c,shared.czero);
3336 } else {
3337 if (set->type != REDIS_SET) {
3338 addReply(c,shared.wrongtypeerr);
3339 return;
3340 }
3341 if (dictFind(set->ptr,c->argv[2]))
3342 addReply(c,shared.cone);
3343 else
3344 addReply(c,shared.czero);
3345 }
3346 }
3347
3348 static void scardCommand(redisClient *c) {
3349 robj *o;
3350 dict *s;
3351
3352 o = lookupKeyRead(c->db,c->argv[1]);
3353 if (o == NULL) {
3354 addReply(c,shared.czero);
3355 return;
3356 } else {
3357 if (o->type != REDIS_SET) {
3358 addReply(c,shared.wrongtypeerr);
3359 } else {
3360 s = o->ptr;
3361 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3362 dictSize(s)));
3363 }
3364 }
3365 }
3366
3367 static void spopCommand(redisClient *c) {
3368 robj *set;
3369 dictEntry *de;
3370
3371 set = lookupKeyWrite(c->db,c->argv[1]);
3372 if (set == NULL) {
3373 addReply(c,shared.nullbulk);
3374 } else {
3375 if (set->type != REDIS_SET) {
3376 addReply(c,shared.wrongtypeerr);
3377 return;
3378 }
3379 de = dictGetRandomKey(set->ptr);
3380 if (de == NULL) {
3381 addReply(c,shared.nullbulk);
3382 } else {
3383 robj *ele = dictGetEntryKey(de);
3384
3385 addReplyBulkLen(c,ele);
3386 addReply(c,ele);
3387 addReply(c,shared.crlf);
3388 dictDelete(set->ptr,ele);
3389 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3390 server.dirty++;
3391 }
3392 }
3393 }
3394
3395 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3396 dict **d1 = (void*) s1, **d2 = (void*) s2;
3397
3398 return dictSize(*d1)-dictSize(*d2);
3399 }
3400
3401 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3402 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3403 dictIterator *di;
3404 dictEntry *de;
3405 robj *lenobj = NULL, *dstset = NULL;
3406 int j, cardinality = 0;
3407
3408 if (!dv) oom("sinterGenericCommand");
3409 for (j = 0; j < setsnum; j++) {
3410 robj *setobj;
3411
3412 setobj = dstkey ?
3413 lookupKeyWrite(c->db,setskeys[j]) :
3414 lookupKeyRead(c->db,setskeys[j]);
3415 if (!setobj) {
3416 zfree(dv);
3417 if (dstkey) {
3418 deleteKey(c->db,dstkey);
3419 addReply(c,shared.ok);
3420 } else {
3421 addReply(c,shared.nullmultibulk);
3422 }
3423 return;
3424 }
3425 if (setobj->type != REDIS_SET) {
3426 zfree(dv);
3427 addReply(c,shared.wrongtypeerr);
3428 return;
3429 }
3430 dv[j] = setobj->ptr;
3431 }
3432 /* Sort sets from the smallest to largest, this will improve our
3433 * algorithm's performace */
3434 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3435
3436 /* The first thing we should output is the total number of elements...
3437 * since this is a multi-bulk write, but at this stage we don't know
3438 * the intersection set size, so we use a trick, append an empty object
3439 * to the output list and save the pointer to later modify it with the
3440 * right length */
3441 if (!dstkey) {
3442 lenobj = createObject(REDIS_STRING,NULL);
3443 addReply(c,lenobj);
3444 decrRefCount(lenobj);
3445 } else {
3446 /* If we have a target key where to store the resulting set
3447 * create this key with an empty set inside */
3448 dstset = createSetObject();
3449 }
3450
3451 /* Iterate all the elements of the first (smallest) set, and test
3452 * the element against all the other sets, if at least one set does
3453 * not include the element it is discarded */
3454 di = dictGetIterator(dv[0]);
3455 if (!di) oom("dictGetIterator");
3456
3457 while((de = dictNext(di)) != NULL) {
3458 robj *ele;
3459
3460 for (j = 1; j < setsnum; j++)
3461 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3462 if (j != setsnum)
3463 continue; /* at least one set does not contain the member */
3464 ele = dictGetEntryKey(de);
3465 if (!dstkey) {
3466 addReplyBulkLen(c,ele);
3467 addReply(c,ele);
3468 addReply(c,shared.crlf);
3469 cardinality++;
3470 } else {
3471 dictAdd(dstset->ptr,ele,NULL);
3472 incrRefCount(ele);
3473 }
3474 }
3475 dictReleaseIterator(di);
3476
3477 if (dstkey) {
3478 /* Store the resulting set into the target */
3479 deleteKey(c->db,dstkey);
3480 dictAdd(c->db->dict,dstkey,dstset);
3481 incrRefCount(dstkey);
3482 }
3483
3484 if (!dstkey) {
3485 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3486 } else {
3487 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3488 dictSize((dict*)dstset->ptr)));
3489 server.dirty++;
3490 }
3491 zfree(dv);
3492 }
3493
3494 static void sinterCommand(redisClient *c) {
3495 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3496 }
3497
3498 static void sinterstoreCommand(redisClient *c) {
3499 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3500 }
3501
3502 #define REDIS_OP_UNION 0
3503 #define REDIS_OP_DIFF 1
3504
3505 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3506 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3507 dictIterator *di;
3508 dictEntry *de;
3509 robj *dstset = NULL;
3510 int j, cardinality = 0;
3511
3512 if (!dv) oom("sunionDiffGenericCommand");
3513 for (j = 0; j < setsnum; j++) {
3514 robj *setobj;
3515
3516 setobj = dstkey ?
3517 lookupKeyWrite(c->db,setskeys[j]) :
3518 lookupKeyRead(c->db,setskeys[j]);
3519 if (!setobj) {
3520 dv[j] = NULL;
3521 continue;
3522 }
3523 if (setobj->type != REDIS_SET) {
3524 zfree(dv);
3525 addReply(c,shared.wrongtypeerr);
3526 return;
3527 }
3528 dv[j] = setobj->ptr;
3529 }
3530
3531 /* We need a temp set object to store our union. If the dstkey
3532 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3533 * this set object will be the resulting object to set into the target key*/
3534 dstset = createSetObject();
3535
3536 /* Iterate all the elements of all the sets, add every element a single
3537 * time to the result set */
3538 for (j = 0; j < setsnum; j++) {
3539 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3540 if (!dv[j]) continue; /* non existing keys are like empty sets */
3541
3542 di = dictGetIterator(dv[j]);
3543 if (!di) oom("dictGetIterator");
3544
3545 while((de = dictNext(di)) != NULL) {
3546 robj *ele;
3547
3548 /* dictAdd will not add the same element multiple times */
3549 ele = dictGetEntryKey(de);
3550 if (op == REDIS_OP_UNION || j == 0) {
3551 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3552 incrRefCount(ele);
3553 cardinality++;
3554 }
3555 } else if (op == REDIS_OP_DIFF) {
3556 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3557 cardinality--;
3558 }
3559 }
3560 }
3561 dictReleaseIterator(di);
3562
3563 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3564 }
3565
3566 /* Output the content of the resulting set, if not in STORE mode */
3567 if (!dstkey) {
3568 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3569 di = dictGetIterator(dstset->ptr);
3570 if (!di) oom("dictGetIterator");
3571 while((de = dictNext(di)) != NULL) {
3572 robj *ele;
3573
3574 ele = dictGetEntryKey(de);
3575 addReplyBulkLen(c,ele);
3576 addReply(c,ele);
3577 addReply(c,shared.crlf);
3578 }
3579 dictReleaseIterator(di);
3580 } else {
3581 /* If we have a target key where to store the resulting set
3582 * create this key with the result set inside */
3583 deleteKey(c->db,dstkey);
3584 dictAdd(c->db->dict,dstkey,dstset);
3585 incrRefCount(dstkey);
3586 }
3587
3588 /* Cleanup */
3589 if (!dstkey) {
3590 decrRefCount(dstset);
3591 } else {
3592 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3593 dictSize((dict*)dstset->ptr)));
3594 server.dirty++;
3595 }
3596 zfree(dv);
3597 }
3598
3599 static void sunionCommand(redisClient *c) {
3600 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3601 }
3602
3603 static void sunionstoreCommand(redisClient *c) {
3604 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3605 }
3606
3607 static void sdiffCommand(redisClient *c) {
3608 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3609 }
3610
3611 static void sdiffstoreCommand(redisClient *c) {
3612 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3613 }
3614
3615 static void flushdbCommand(redisClient *c) {
3616 server.dirty += dictSize(c->db->dict);
3617 dictEmpty(c->db->dict);
3618 dictEmpty(c->db->expires);
3619 addReply(c,shared.ok);
3620 }
3621
3622 static void flushallCommand(redisClient *c) {
3623 server.dirty += emptyDb();
3624 addReply(c,shared.ok);
3625 rdbSave(server.dbfilename);
3626 server.dirty++;
3627 }
3628
3629 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3630 redisSortOperation *so = zmalloc(sizeof(*so));
3631 if (!so) oom("createSortOperation");
3632 so->type = type;
3633 so->pattern = pattern;
3634 return so;
3635 }
3636
3637 /* Return the value associated to the key with a name obtained
3638 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3639 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3640 char *p;
3641 sds spat, ssub;
3642 robj keyobj;
3643 int prefixlen, sublen, postfixlen;
3644 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3645 struct {
3646 long len;
3647 long free;
3648 char buf[REDIS_SORTKEY_MAX+1];
3649 } keyname;
3650
3651 if (subst->encoding == REDIS_ENCODING_RAW)
3652 incrRefCount(subst);
3653 else {
3654 subst = getDecodedObject(subst);
3655 }
3656
3657 spat = pattern->ptr;
3658 ssub = subst->ptr;
3659 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3660 p = strchr(spat,'*');
3661 if (!p) return NULL;
3662
3663 prefixlen = p-spat;
3664 sublen = sdslen(ssub);
3665 postfixlen = sdslen(spat)-(prefixlen+1);
3666 memcpy(keyname.buf,spat,prefixlen);
3667 memcpy(keyname.buf+prefixlen,ssub,sublen);
3668 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3669 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3670 keyname.len = prefixlen+sublen+postfixlen;
3671
3672 keyobj.refcount = 1;
3673 keyobj.type = REDIS_STRING;
3674 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3675
3676 decrRefCount(subst);
3677
3678 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3679 return lookupKeyRead(db,&keyobj);
3680 }
3681
3682 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3683 * the additional parameter is not standard but a BSD-specific we have to
3684 * pass sorting parameters via the global 'server' structure */
3685 static int sortCompare(const void *s1, const void *s2) {
3686 const redisSortObject *so1 = s1, *so2 = s2;
3687 int cmp;
3688
3689 if (!server.sort_alpha) {
3690 /* Numeric sorting. Here it's trivial as we precomputed scores */
3691 if (so1->u.score > so2->u.score) {
3692 cmp = 1;
3693 } else if (so1->u.score < so2->u.score) {
3694 cmp = -1;
3695 } else {
3696 cmp = 0;
3697 }
3698 } else {
3699 /* Alphanumeric sorting */
3700 if (server.sort_bypattern) {
3701 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3702 /* At least one compare object is NULL */
3703 if (so1->u.cmpobj == so2->u.cmpobj)
3704 cmp = 0;
3705 else if (so1->u.cmpobj == NULL)
3706 cmp = -1;
3707 else
3708 cmp = 1;
3709 } else {
3710 /* We have both the objects, use strcoll */
3711 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3712 }
3713 } else {
3714 /* Compare elements directly */
3715 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
3716 so2->obj->encoding == REDIS_ENCODING_RAW) {
3717 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3718 } else {
3719 robj *dec1, *dec2;
3720
3721 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
3722 so1->obj : getDecodedObject(so1->obj);
3723 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
3724 so2->obj : getDecodedObject(so2->obj);
3725 cmp = strcoll(dec1->ptr,dec2->ptr);
3726 if (dec1 != so1->obj) decrRefCount(dec1);
3727 if (dec2 != so2->obj) decrRefCount(dec2);
3728 }
3729 }
3730 }
3731 return server.sort_desc ? -cmp : cmp;
3732 }
3733
3734 /* The SORT command is the most complex command in Redis. Warning: this code
3735 * is optimized for speed and a bit less for readability */
3736 static void sortCommand(redisClient *c) {
3737 list *operations;
3738 int outputlen = 0;
3739 int desc = 0, alpha = 0;
3740 int limit_start = 0, limit_count = -1, start, end;
3741 int j, dontsort = 0, vectorlen;
3742 int getop = 0; /* GET operation counter */
3743 robj *sortval, *sortby = NULL;
3744 redisSortObject *vector; /* Resulting vector to sort */
3745
3746 /* Lookup the key to sort. It must be of the right types */
3747 sortval = lookupKeyRead(c->db,c->argv[1]);
3748 if (sortval == NULL) {
3749 addReply(c,shared.nokeyerr);
3750 return;
3751 }
3752 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3753 addReply(c,shared.wrongtypeerr);
3754 return;
3755 }
3756
3757 /* Create a list of operations to perform for every sorted element.
3758 * Operations can be GET/DEL/INCR/DECR */
3759 operations = listCreate();
3760 listSetFreeMethod(operations,zfree);
3761 j = 2;
3762
3763 /* Now we need to protect sortval incrementing its count, in the future
3764 * SORT may have options able to overwrite/delete keys during the sorting
3765 * and the sorted key itself may get destroied */
3766 incrRefCount(sortval);
3767
3768 /* The SORT command has an SQL-alike syntax, parse it */
3769 while(j < c->argc) {
3770 int leftargs = c->argc-j-1;
3771 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3772 desc = 0;
3773 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3774 desc = 1;
3775 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3776 alpha = 1;
3777 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3778 limit_start = atoi(c->argv[j+1]->ptr);
3779 limit_count = atoi(c->argv[j+2]->ptr);
3780 j+=2;
3781 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3782 sortby = c->argv[j+1];
3783 /* If the BY pattern does not contain '*', i.e. it is constant,
3784 * we don't need to sort nor to lookup the weight keys. */
3785 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3786 j++;
3787 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3788 listAddNodeTail(operations,createSortOperation(
3789 REDIS_SORT_GET,c->argv[j+1]));
3790 getop++;
3791 j++;
3792 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3793 listAddNodeTail(operations,createSortOperation(
3794 REDIS_SORT_DEL,c->argv[j+1]));
3795 j++;
3796 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3797 listAddNodeTail(operations,createSortOperation(
3798 REDIS_SORT_INCR,c->argv[j+1]));
3799 j++;
3800 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3801 listAddNodeTail(operations,createSortOperation(
3802 REDIS_SORT_DECR,c->argv[j+1]));
3803 j++;
3804 } else {
3805 decrRefCount(sortval);
3806 listRelease(operations);
3807 addReply(c,shared.syntaxerr);
3808 return;
3809 }
3810 j++;
3811 }
3812
3813 /* Load the sorting vector with all the objects to sort */
3814 vectorlen = (sortval->type == REDIS_LIST) ?
3815 listLength((list*)sortval->ptr) :
3816 dictSize((dict*)sortval->ptr);
3817 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3818 if (!vector) oom("allocating objects vector for SORT");
3819 j = 0;
3820 if (sortval->type == REDIS_LIST) {
3821 list *list = sortval->ptr;
3822 listNode *ln;
3823
3824 listRewind(list);
3825 while((ln = listYield(list))) {
3826 robj *ele = ln->value;
3827 vector[j].obj = ele;
3828 vector[j].u.score = 0;
3829 vector[j].u.cmpobj = NULL;
3830 j++;
3831 }
3832 } else {
3833 dict *set = sortval->ptr;
3834 dictIterator *di;
3835 dictEntry *setele;
3836
3837 di = dictGetIterator(set);
3838 if (!di) oom("dictGetIterator");
3839 while((setele = dictNext(di)) != NULL) {
3840 vector[j].obj = dictGetEntryKey(setele);
3841 vector[j].u.score = 0;
3842 vector[j].u.cmpobj = NULL;
3843 j++;
3844 }
3845 dictReleaseIterator(di);
3846 }
3847 assert(j == vectorlen);
3848
3849 /* Now it's time to load the right scores in the sorting vector */
3850 if (dontsort == 0) {
3851 for (j = 0; j < vectorlen; j++) {
3852 if (sortby) {
3853 robj *byval;
3854
3855 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3856 if (!byval || byval->type != REDIS_STRING) continue;
3857 if (alpha) {
3858 if (byval->encoding == REDIS_ENCODING_RAW) {
3859 vector[j].u.cmpobj = byval;
3860 incrRefCount(byval);
3861 } else {
3862 vector[j].u.cmpobj = getDecodedObject(byval);
3863 }
3864 } else {
3865 if (byval->encoding == REDIS_ENCODING_RAW) {
3866 vector[j].u.score = strtod(byval->ptr,NULL);
3867 } else {
3868 if (byval->encoding == REDIS_ENCODING_INT) {
3869 vector[j].u.score = (long)byval->ptr;
3870 } else
3871 assert(1 != 1);
3872 }
3873 }
3874 } else {
3875 if (!alpha) {
3876 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
3877 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3878 else {
3879 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
3880 vector[j].u.score = (long) vector[j].obj->ptr;
3881 else
3882 assert(1 != 1);
3883 }
3884 }
3885 }
3886 }
3887 }
3888
3889 /* We are ready to sort the vector... perform a bit of sanity check
3890 * on the LIMIT option too. We'll use a partial version of quicksort. */
3891 start = (limit_start < 0) ? 0 : limit_start;
3892 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3893 if (start >= vectorlen) {
3894 start = vectorlen-1;
3895 end = vectorlen-2;
3896 }
3897 if (end >= vectorlen) end = vectorlen-1;
3898
3899 if (dontsort == 0) {
3900 server.sort_desc = desc;
3901 server.sort_alpha = alpha;
3902 server.sort_bypattern = sortby ? 1 : 0;
3903 if (sortby && (start != 0 || end != vectorlen-1))
3904 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3905 else
3906 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3907 }
3908
3909 /* Send command output to the output buffer, performing the specified
3910 * GET/DEL/INCR/DECR operations if any. */
3911 outputlen = getop ? getop*(end-start+1) : end-start+1;
3912 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3913 for (j = start; j <= end; j++) {
3914 listNode *ln;
3915 if (!getop) {
3916 addReplyBulkLen(c,vector[j].obj);
3917 addReply(c,vector[j].obj);
3918 addReply(c,shared.crlf);
3919 }
3920 listRewind(operations);
3921 while((ln = listYield(operations))) {
3922 redisSortOperation *sop = ln->value;
3923 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3924 vector[j].obj);
3925
3926 if (sop->type == REDIS_SORT_GET) {
3927 if (!val || val->type != REDIS_STRING) {
3928 addReply(c,shared.nullbulk);
3929 } else {
3930 addReplyBulkLen(c,val);
3931 addReply(c,val);
3932 addReply(c,shared.crlf);
3933 }
3934 } else if (sop->type == REDIS_SORT_DEL) {
3935 /* TODO */
3936 }
3937 }
3938 }
3939
3940 /* Cleanup */
3941 decrRefCount(sortval);
3942 listRelease(operations);
3943 for (j = 0; j < vectorlen; j++) {
3944 if (sortby && alpha && vector[j].u.cmpobj)
3945 decrRefCount(vector[j].u.cmpobj);
3946 }
3947 zfree(vector);
3948 }
3949
3950 static void infoCommand(redisClient *c) {
3951 sds info;
3952 time_t uptime = time(NULL)-server.stat_starttime;
3953 int j;
3954
3955 info = sdscatprintf(sdsempty(),
3956 "redis_version:%s\r\n"
3957 "arch_bits:%s\r\n"
3958 "uptime_in_seconds:%d\r\n"
3959 "uptime_in_days:%d\r\n"
3960 "connected_clients:%d\r\n"
3961 "connected_slaves:%d\r\n"
3962 "used_memory:%zu\r\n"
3963 "changes_since_last_save:%lld\r\n"
3964 "bgsave_in_progress:%d\r\n"
3965 "last_save_time:%d\r\n"
3966 "total_connections_received:%lld\r\n"
3967 "total_commands_processed:%lld\r\n"
3968 "role:%s\r\n"
3969 ,REDIS_VERSION,
3970 (sizeof(long) == 8) ? "64" : "32",
3971 uptime,
3972 uptime/(3600*24),
3973 listLength(server.clients)-listLength(server.slaves),
3974 listLength(server.slaves),
3975 server.usedmemory,
3976 server.dirty,
3977 server.bgsaveinprogress,
3978 server.lastsave,
3979 server.stat_numconnections,
3980 server.stat_numcommands,
3981 server.masterhost == NULL ? "master" : "slave"
3982 );
3983 if (server.masterhost) {
3984 info = sdscatprintf(info,
3985 "master_host:%s\r\n"
3986 "master_port:%d\r\n"
3987 "master_link_status:%s\r\n"
3988 "master_last_io_seconds_ago:%d\r\n"
3989 ,server.masterhost,
3990 server.masterport,
3991 (server.replstate == REDIS_REPL_CONNECTED) ?
3992 "up" : "down",
3993 (int)(time(NULL)-server.master->lastinteraction)
3994 );
3995 }
3996 for (j = 0; j < server.dbnum; j++) {
3997 long long keys, vkeys;
3998
3999 keys = dictSize(server.db[j].dict);
4000 vkeys = dictSize(server.db[j].expires);
4001 if (keys || vkeys) {
4002 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4003 j, keys, vkeys);
4004 }
4005 }
4006 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4007 addReplySds(c,info);
4008 addReply(c,shared.crlf);
4009 }
4010
4011 static void monitorCommand(redisClient *c) {
4012 /* ignore MONITOR if aleady slave or in monitor mode */
4013 if (c->flags & REDIS_SLAVE) return;
4014
4015 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4016 c->slaveseldb = 0;
4017 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
4018 addReply(c,shared.ok);
4019 }
4020
4021 /* ================================= Expire ================================= */
4022 static int removeExpire(redisDb *db, robj *key) {
4023 if (dictDelete(db->expires,key) == DICT_OK) {
4024 return 1;
4025 } else {
4026 return 0;
4027 }
4028 }
4029
4030 static int setExpire(redisDb *db, robj *key, time_t when) {
4031 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4032 return 0;
4033 } else {
4034 incrRefCount(key);
4035 return 1;
4036 }
4037 }
4038
4039 /* Return the expire time of the specified key, or -1 if no expire
4040 * is associated with this key (i.e. the key is non volatile) */
4041 static time_t getExpire(redisDb *db, robj *key) {
4042 dictEntry *de;
4043
4044 /* No expire? return ASAP */
4045 if (dictSize(db->expires) == 0 ||
4046 (de = dictFind(db->expires,key)) == NULL) return -1;
4047
4048 return (time_t) dictGetEntryVal(de);
4049 }
4050
4051 static int expireIfNeeded(redisDb *db, robj *key) {
4052 time_t when;
4053 dictEntry *de;
4054
4055 /* No expire? return ASAP */
4056 if (dictSize(db->expires) == 0 ||
4057 (de = dictFind(db->expires,key)) == NULL) return 0;
4058
4059 /* Lookup the expire */
4060 when = (time_t) dictGetEntryVal(de);
4061 if (time(NULL) <= when) return 0;
4062
4063 /* Delete the key */
4064 dictDelete(db->expires,key);
4065 return dictDelete(db->dict,key) == DICT_OK;
4066 }
4067
4068 static int deleteIfVolatile(redisDb *db, robj *key) {
4069 dictEntry *de;
4070
4071 /* No expire? return ASAP */
4072 if (dictSize(db->expires) == 0 ||
4073 (de = dictFind(db->expires,key)) == NULL) return 0;
4074
4075 /* Delete the key */
4076 server.dirty++;
4077 dictDelete(db->expires,key);
4078 return dictDelete(db->dict,key) == DICT_OK;
4079 }
4080
4081 static void expireCommand(redisClient *c) {
4082 dictEntry *de;
4083 int seconds = atoi(c->argv[2]->ptr);
4084
4085 de = dictFind(c->db->dict,c->argv[1]);
4086 if (de == NULL) {
4087 addReply(c,shared.czero);
4088 return;
4089 }
4090 if (seconds <= 0) {
4091 addReply(c, shared.czero);
4092 return;
4093 } else {
4094 time_t when = time(NULL)+seconds;
4095 if (setExpire(c->db,c->argv[1],when)) {
4096 addReply(c,shared.cone);
4097 server.dirty++;
4098 } else {
4099 addReply(c,shared.czero);
4100 }
4101 return;
4102 }
4103 }
4104
4105 static void ttlCommand(redisClient *c) {
4106 time_t expire;
4107 int ttl = -1;
4108
4109 expire = getExpire(c->db,c->argv[1]);
4110 if (expire != -1) {
4111 ttl = (int) (expire-time(NULL));
4112 if (ttl < 0) ttl = -1;
4113 }
4114 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4115 }
4116
4117 static void msetGenericCommand(redisClient *c, int nx) {
4118 int j;
4119
4120 if ((c->argc % 2) == 0) {
4121 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4122 return;
4123 }
4124 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4125 * set nothing at all if at least one already key exists. */
4126 if (nx) {
4127 for (j = 1; j < c->argc; j += 2) {
4128 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4129 addReply(c, shared.czero);
4130 return;
4131 }
4132 }
4133 }
4134
4135 for (j = 1; j < c->argc; j += 2) {
4136 dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4137 incrRefCount(c->argv[j]);
4138 incrRefCount(c->argv[j+1]);
4139 removeExpire(c->db,c->argv[j]);
4140 }
4141 server.dirty += (c->argc-1)/2;
4142 addReply(c, nx ? shared.cone : shared.ok);
4143 }
4144
4145 static void msetCommand(redisClient *c) {
4146 msetGenericCommand(c,0);
4147 }
4148
4149 static void msetnxCommand(redisClient *c) {
4150 msetGenericCommand(c,1);
4151 }
4152
4153 /* =============================== Replication ============================= */
4154
4155 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4156 ssize_t nwritten, ret = size;
4157 time_t start = time(NULL);
4158
4159 timeout++;
4160 while(size) {
4161 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4162 nwritten = write(fd,ptr,size);
4163 if (nwritten == -1) return -1;
4164 ptr += nwritten;
4165 size -= nwritten;
4166 }
4167 if ((time(NULL)-start) > timeout) {
4168 errno = ETIMEDOUT;
4169 return -1;
4170 }
4171 }
4172 return ret;
4173 }
4174
4175 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4176 ssize_t nread, totread = 0;
4177 time_t start = time(NULL);
4178
4179 timeout++;
4180 while(size) {
4181 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4182 nread = read(fd,ptr,size);
4183 if (nread == -1) return -1;
4184 ptr += nread;
4185 size -= nread;
4186 totread += nread;
4187 }
4188 if ((time(NULL)-start) > timeout) {
4189 errno = ETIMEDOUT;
4190 return -1;
4191 }
4192 }
4193 return totread;
4194 }
4195
/* Read a line from 'fd' into 'ptr', one byte at a time through syncRead().
 *
 * 'size' is the capacity of the buffer at 'ptr', terminator included: at
 * most size-1 characters are stored and the result is always
 * NUL-terminated. Reading stops at the first '\n', which is not stored; a
 * '\r' right before it is replaced with a NUL byte (but still counted in
 * the return value). When the buffer fills up before a newline is seen the
 * characters read so far are returned.
 *
 * Returns the number of characters stored, or -1 when syncRead() fails or
 * 'size' is not positive (errno set to EINVAL). 'timeout' is passed
 * unchanged to every syncRead() call. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';
    size--; /* reserve room for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* One slot consumed: without this the loop never ends on a
             * long line and writes past the end of the buffer. */
            size--;
        }
    }
    return nread;
}
4216
4217 static void syncCommand(redisClient *c) {
4218 /* ignore SYNC if aleady slave or in monitor mode */
4219 if (c->flags & REDIS_SLAVE) return;
4220
4221 /* SYNC can't be issued when the server has pending data to send to
4222 * the client about already issued commands. We need a fresh reply
4223 * buffer registering the differences between the BGSAVE and the current
4224 * dataset, so that we can copy to other slaves if needed. */
4225 if (listLength(c->reply) != 0) {
4226 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4227 return;
4228 }
4229
4230 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4231 /* Here we need to check if there is a background saving operation
4232 * in progress, or if it is required to start one */
4233 if (server.bgsaveinprogress) {
4234 /* Ok a background save is in progress. Let's check if it is a good
4235 * one for replication, i.e. if there is another slave that is
4236 * registering differences since the server forked to save */
4237 redisClient *slave;
4238 listNode *ln;
4239
4240 listRewind(server.slaves);
4241 while((ln = listYield(server.slaves))) {
4242 slave = ln->value;
4243 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4244 }
4245 if (ln) {
4246 /* Perfect, the server is already registering differences for
4247 * another slave. Set the right state, and copy the buffer. */
4248 listRelease(c->reply);
4249 c->reply = listDup(slave->reply);
4250 if (!c->reply) oom("listDup copying slave reply list");
4251 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4252 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4253 } else {
4254 /* No way, we need to wait for the next BGSAVE in order to
4255 * register differences */
4256 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4257 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4258 }
4259 } else {
4260 /* Ok we don't have a BGSAVE in progress, let's start one */
4261 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4262 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4263 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4264 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4265 return;
4266 }
4267 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4268 }
4269 c->repldbfd = -1;
4270 c->flags |= REDIS_SLAVE;
4271 c->slaveseldb = 0;
4272 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
4273 return;
4274 }
4275
4276 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4277 redisClient *slave = privdata;
4278 REDIS_NOTUSED(el);
4279 REDIS_NOTUSED(mask);
4280 char buf[REDIS_IOBUF_LEN];
4281 ssize_t nwritten, buflen;
4282
4283 if (slave->repldboff == 0) {
4284 /* Write the bulk write count before to transfer the DB. In theory here
4285 * we don't know how much room there is in the output buffer of the
4286 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4287 * operations) will never be smaller than the few bytes we need. */
4288 sds bulkcount;
4289
4290 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4291 slave->repldbsize);
4292 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4293 {
4294 sdsfree(bulkcount);
4295 freeClient(slave);
4296 return;
4297 }
4298 sdsfree(bulkcount);
4299 }
4300 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4301 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4302 if (buflen <= 0) {
4303 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4304 (buflen == 0) ? "premature EOF" : strerror(errno));
4305 freeClient(slave);
4306 return;
4307 }
4308 if ((nwritten = write(fd,buf,buflen)) == -1) {
4309 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4310 strerror(errno));
4311 freeClient(slave);
4312 return;
4313 }
4314 slave->repldboff += nwritten;
4315 if (slave->repldboff == slave->repldbsize) {
4316 close(slave->repldbfd);
4317 slave->repldbfd = -1;
4318 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4319 slave->replstate = REDIS_REPL_ONLINE;
4320 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4321 sendReplyToClient, slave, NULL) == AE_ERR) {
4322 freeClient(slave);
4323 return;
4324 }
4325 addReplySds(slave,sdsempty());
4326 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4327 }
4328 }
4329
4330 /* This function is called at the end of every backgrond saving.
4331 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4332 * otherwise REDIS_ERR is passed to the function.
4333 *
4334 * The goal of this function is to handle slaves waiting for a successful
4335 * background saving in order to perform non-blocking synchronization. */
4336 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4337 listNode *ln;
4338 int startbgsave = 0;
4339
4340 listRewind(server.slaves);
4341 while((ln = listYield(server.slaves))) {
4342 redisClient *slave = ln->value;
4343
4344 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4345 startbgsave = 1;
4346 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4347 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4348 struct redis_stat buf;
4349
4350 if (bgsaveerr != REDIS_OK) {
4351 freeClient(slave);
4352 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4353 continue;
4354 }
4355 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4356 redis_fstat(slave->repldbfd,&buf) == -1) {
4357 freeClient(slave);
4358 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4359 continue;
4360 }
4361 slave->repldboff = 0;
4362 slave->repldbsize = buf.st_size;
4363 slave->replstate = REDIS_REPL_SEND_BULK;
4364 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4365 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4366 freeClient(slave);
4367 continue;
4368 }
4369 }
4370 }
4371 if (startbgsave) {
4372 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4373 listRewind(server.slaves);
4374 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4375 while((ln = listYield(server.slaves))) {
4376 redisClient *slave = ln->value;
4377
4378 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4379 freeClient(slave);
4380 }
4381 }
4382 }
4383 }
4384
4385 static int syncWithMaster(void) {
4386 char buf[1024], tmpfile[256];
4387 int dumpsize;
4388 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4389 int dfd;
4390
4391 if (fd == -1) {
4392 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4393 strerror(errno));
4394 return REDIS_ERR;
4395 }
4396 /* Issue the SYNC command */
4397 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4398 close(fd);
4399 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4400 strerror(errno));
4401 return REDIS_ERR;
4402 }
4403 /* Read the bulk write count */
4404 if (syncReadLine(fd,buf,1024,3600) == -1) {
4405 close(fd);
4406 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4407 strerror(errno));
4408 return REDIS_ERR;
4409 }
4410 dumpsize = atoi(buf+1);
4411 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4412 /* Read the bulk write data on a temp file */
4413 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4414 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4415 if (dfd == -1) {
4416 close(fd);
4417 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4418 return REDIS_ERR;
4419 }
4420 while(dumpsize) {
4421 int nread, nwritten;
4422
4423 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4424 if (nread == -1) {
4425 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4426 strerror(errno));
4427 close(fd);
4428 close(dfd);
4429 return REDIS_ERR;
4430 }
4431 nwritten = write(dfd,buf,nread);
4432 if (nwritten == -1) {
4433 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4434 close(fd);
4435 close(dfd);
4436 return REDIS_ERR;
4437 }
4438 dumpsize -= nread;
4439 }
4440 close(dfd);
4441 if (rename(tmpfile,server.dbfilename) == -1) {
4442 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4443 unlink(tmpfile);
4444 close(fd);
4445 return REDIS_ERR;
4446 }
4447 emptyDb();
4448 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4449 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4450 close(fd);
4451 return REDIS_ERR;
4452 }
4453 server.master = createClient(fd);
4454 server.master->flags |= REDIS_MASTER;
4455 server.replstate = REDIS_REPL_CONNECTED;
4456 return REDIS_OK;
4457 }
4458
4459 static void slaveofCommand(redisClient *c) {
4460 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4461 !strcasecmp(c->argv[2]->ptr,"one")) {
4462 if (server.masterhost) {
4463 sdsfree(server.masterhost);
4464 server.masterhost = NULL;
4465 if (server.master) freeClient(server.master);
4466 server.replstate = REDIS_REPL_NONE;
4467 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4468 }
4469 } else {
4470 sdsfree(server.masterhost);
4471 server.masterhost = sdsdup(c->argv[1]->ptr);
4472 server.masterport = atoi(c->argv[2]->ptr);
4473 if (server.master) freeClient(server.master);
4474 server.replstate = REDIS_REPL_CONNECT;
4475 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4476 server.masterhost, server.masterport);
4477 }
4478 addReply(c,shared.ok);
4479 }
4480
4481 /* ============================ Maxmemory directive ======================== */
4482
4483 /* This function gets called when 'maxmemory' is set on the config file to limit
4484 * the max memory used by the server, and we are out of memory.
4485 * This function will try to, in order:
4486 *
4487 * - Free objects from the free list
4488 * - Try to remove keys with an EXPIRE set
4489 *
4490 * It is not possible to free enough memory to reach used-memory < maxmemory
4491 * the server will start refusing commands that will enlarge even more the
4492 * memory usage.
4493 */
4494 static void freeMemoryIfNeeded(void) {
4495 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4496 if (listLength(server.objfreelist)) {
4497 robj *o;
4498
4499 listNode *head = listFirst(server.objfreelist);
4500 o = listNodeValue(head);
4501 listDelNode(server.objfreelist,head);
4502 zfree(o);
4503 } else {
4504 int j, k, freed = 0;
4505
4506 for (j = 0; j < server.dbnum; j++) {
4507 int minttl = -1;
4508 robj *minkey = NULL;
4509 struct dictEntry *de;
4510
4511 if (dictSize(server.db[j].expires)) {
4512 freed = 1;
4513 /* From a sample of three keys drop the one nearest to
4514 * the natural expire */
4515 for (k = 0; k < 3; k++) {
4516 time_t t;
4517
4518 de = dictGetRandomKey(server.db[j].expires);
4519 t = (time_t) dictGetEntryVal(de);
4520 if (minttl == -1 || t < minttl) {
4521 minkey = dictGetEntryKey(de);
4522 minttl = t;
4523 }
4524 }
4525 deleteKey(server.db+j,minkey);
4526 }
4527 }
4528 if (!freed) return; /* nothing to free... */
4529 }
4530 }
4531 }
4532
4533 /* ================================= Debugging ============================== */
4534
4535 static void debugCommand(redisClient *c) {
4536 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4537 *((char*)-1) = 'x';
4538 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4539 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4540 robj *key, *val;
4541
4542 if (!de) {
4543 addReply(c,shared.nokeyerr);
4544 return;
4545 }
4546 key = dictGetEntryKey(de);
4547 val = dictGetEntryVal(de);
4548 addReplySds(c,sdscatprintf(sdsempty(),
4549 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4550 key, key->refcount, val, val->refcount, val->encoding));
4551 } else {
4552 addReplySds(c,sdsnew(
4553 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4554 }
4555 }
4556
4557 #ifdef HAVE_BACKTRACE
4558 static struct redisFunctionSym symsTable[] = {
4559 {"compareStringObjects", (unsigned long)compareStringObjects},
4560 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4561 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4562 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4563 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4564 {"freeStringObject", (unsigned long)freeStringObject},
4565 {"freeListObject", (unsigned long)freeListObject},
4566 {"freeSetObject", (unsigned long)freeSetObject},
4567 {"decrRefCount", (unsigned long)decrRefCount},
4568 {"createObject", (unsigned long)createObject},
4569 {"freeClient", (unsigned long)freeClient},
4570 {"rdbLoad", (unsigned long)rdbLoad},
4571 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4572 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4573 {"addReply", (unsigned long)addReply},
4574 {"addReplySds", (unsigned long)addReplySds},
4575 {"incrRefCount", (unsigned long)incrRefCount},
4576 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4577 {"createStringObject", (unsigned long)createStringObject},
4578 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4579 {"syncWithMaster", (unsigned long)syncWithMaster},
4580 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4581 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4582 {"getDecodedObject", (unsigned long)getDecodedObject},
4583 {"removeExpire", (unsigned long)removeExpire},
4584 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4585 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4586 {"deleteKey", (unsigned long)deleteKey},
4587 {"getExpire", (unsigned long)getExpire},
4588 {"setExpire", (unsigned long)setExpire},
4589 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4590 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4591 {"authCommand", (unsigned long)authCommand},
4592 {"pingCommand", (unsigned long)pingCommand},
4593 {"echoCommand", (unsigned long)echoCommand},
4594 {"setCommand", (unsigned long)setCommand},
4595 {"setnxCommand", (unsigned long)setnxCommand},
4596 {"getCommand", (unsigned long)getCommand},
4597 {"delCommand", (unsigned long)delCommand},
4598 {"existsCommand", (unsigned long)existsCommand},
4599 {"incrCommand", (unsigned long)incrCommand},
4600 {"decrCommand", (unsigned long)decrCommand},
4601 {"incrbyCommand", (unsigned long)incrbyCommand},
4602 {"decrbyCommand", (unsigned long)decrbyCommand},
4603 {"selectCommand", (unsigned long)selectCommand},
4604 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4605 {"keysCommand", (unsigned long)keysCommand},
4606 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4607 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4608 {"saveCommand", (unsigned long)saveCommand},
4609 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4610 {"shutdownCommand", (unsigned long)shutdownCommand},
4611 {"moveCommand", (unsigned long)moveCommand},
4612 {"renameCommand", (unsigned long)renameCommand},
4613 {"renamenxCommand", (unsigned long)renamenxCommand},
4614 {"lpushCommand", (unsigned long)lpushCommand},
4615 {"rpushCommand", (unsigned long)rpushCommand},
4616 {"lpopCommand", (unsigned long)lpopCommand},
4617 {"rpopCommand", (unsigned long)rpopCommand},
4618 {"llenCommand", (unsigned long)llenCommand},
4619 {"lindexCommand", (unsigned long)lindexCommand},
4620 {"lrangeCommand", (unsigned long)lrangeCommand},
4621 {"ltrimCommand", (unsigned long)ltrimCommand},
4622 {"typeCommand", (unsigned long)typeCommand},
4623 {"lsetCommand", (unsigned long)lsetCommand},
4624 {"saddCommand", (unsigned long)saddCommand},
4625 {"sremCommand", (unsigned long)sremCommand},
4626 {"smoveCommand", (unsigned long)smoveCommand},
4627 {"sismemberCommand", (unsigned long)sismemberCommand},
4628 {"scardCommand", (unsigned long)scardCommand},
4629 {"spopCommand", (unsigned long)spopCommand},
4630 {"sinterCommand", (unsigned long)sinterCommand},
4631 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4632 {"sunionCommand", (unsigned long)sunionCommand},
4633 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4634 {"sdiffCommand", (unsigned long)sdiffCommand},
4635 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4636 {"syncCommand", (unsigned long)syncCommand},
4637 {"flushdbCommand", (unsigned long)flushdbCommand},
4638 {"flushallCommand", (unsigned long)flushallCommand},
4639 {"sortCommand", (unsigned long)sortCommand},
4640 {"lremCommand", (unsigned long)lremCommand},
4641 {"infoCommand", (unsigned long)infoCommand},
4642 {"mgetCommand", (unsigned long)mgetCommand},
4643 {"monitorCommand", (unsigned long)monitorCommand},
4644 {"expireCommand", (unsigned long)expireCommand},
4645 {"getsetCommand", (unsigned long)getsetCommand},
4646 {"ttlCommand", (unsigned long)ttlCommand},
4647 {"slaveofCommand", (unsigned long)slaveofCommand},
4648 {"debugCommand", (unsigned long)debugCommand},
4649 {"processCommand", (unsigned long)processCommand},
4650 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4651 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4652 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4653 {"msetGenericCommand", (unsigned long)msetGenericCommand},
4654 {"msetCommand", (unsigned long)msetCommand},
4655 {"msetnxCommand", (unsigned long)msetnxCommand},
4656 {NULL,0}
4657 };
4658
4659 /* This function try to convert a pointer into a function name. It's used in
4660 * oreder to provide a backtrace under segmentation fault that's able to
4661 * display functions declared as static (otherwise the backtrace is useless). */
4662 static char *findFuncName(void *pointer, unsigned long *offset){
4663 int i, ret = -1;
4664 unsigned long off, minoff = 0;
4665
4666 /* Try to match against the Symbol with the smallest offset */
4667 for (i=0; symsTable[i].pointer; i++) {
4668 unsigned long lp = (unsigned long) pointer;
4669
4670 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4671 off=lp-symsTable[i].pointer;
4672 if (ret < 0 || off < minoff) {
4673 minoff=off;
4674 ret=i;
4675 }
4676 }
4677 }
4678 if (ret == -1) return NULL;
4679 *offset = minoff;
4680 return symsTable[ret].name;
4681 }
4682
/* Return the instruction pointer saved in the signal context 'uc', i.e. the
 * address that was executing when the signal was delivered, or NULL on
 * platforms this function does not know about.
 *
 * The field holding the value depends on the OS and CPU, hence the
 * preprocessor ladder. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #ifdef _STRUCT_X86_THREAD_STATE64
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__x86_64__) || defined(__X86_64__) /* Linux x86-64 */
    /* Compilers define __x86_64__ (lowercase), so the old test for
     * __X86_64__ alone never matched. On this ABI the instruction pointer
     * is RIP, stored at index 16 of gregs (REG_RIP in <sys/ucontext.h>);
     * the number is used so that _GNU_SOURCE is not required. */
    return (void*) uc->uc_mcontext.gregs[16];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
4704
4705 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4706 void *trace[100];
4707 char **messages = NULL;
4708 int i, trace_size = 0;
4709 unsigned long offset=0;
4710 time_t uptime = time(NULL)-server.stat_starttime;
4711 ucontext_t *uc = (ucontext_t*) secret;
4712 REDIS_NOTUSED(info);
4713
4714 redisLog(REDIS_WARNING,
4715 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4716 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4717 "redis_version:%s; "
4718 "uptime_in_seconds:%d; "
4719 "connected_clients:%d; "
4720 "connected_slaves:%d; "
4721 "used_memory:%zu; "
4722 "changes_since_last_save:%lld; "
4723 "bgsave_in_progress:%d; "
4724 "last_save_time:%d; "
4725 "total_connections_received:%lld; "
4726 "total_commands_processed:%lld; "
4727 "role:%s;"
4728 ,REDIS_VERSION,
4729 uptime,
4730 listLength(server.clients)-listLength(server.slaves),
4731 listLength(server.slaves),
4732 server.usedmemory,
4733 server.dirty,
4734 server.bgsaveinprogress,
4735 server.lastsave,
4736 server.stat_numconnections,
4737 server.stat_numcommands,
4738 server.masterhost == NULL ? "master" : "slave"
4739 ));
4740
4741 trace_size = backtrace(trace, 100);
4742 /* overwrite sigaction with caller's address */
4743 if (getMcontextEip(uc) != NULL) {
4744 trace[1] = getMcontextEip(uc);
4745 }
4746 messages = backtrace_symbols(trace, trace_size);
4747
4748 for (i=1; i<trace_size; ++i) {
4749 char *fn = findFuncName(trace[i], &offset), *p;
4750
4751 p = strchr(messages[i],'+');
4752 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4753 redisLog(REDIS_WARNING,"%s", messages[i]);
4754 } else {
4755 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4756 }
4757 }
4758 free(messages);
4759 exit(0);
4760 }
4761
4762 static void setupSigSegvAction(void) {
4763 struct sigaction act;
4764
4765 sigemptyset (&act.sa_mask);
4766 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4767 * is used. Otherwise, sa_handler is used */
4768 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4769 act.sa_sigaction = segvHandler;
4770 sigaction (SIGSEGV, &act, NULL);
4771 sigaction (SIGBUS, &act, NULL);
4772 sigaction (SIGFPE, &act, NULL);
4773 sigaction (SIGILL, &act, NULL);
4774 sigaction (SIGBUS, &act, NULL);
4775 return;
4776 }
4777 #else /* HAVE_BACKTRACE */
/* Built without HAVE_BACKTRACE: no crash handler is installed, this is a
 * no-op kept so that callers need no #ifdef. */
static void setupSigSegvAction(void) {
    return;
}
4780 #endif /* HAVE_BACKTRACE */
4781
4782 /* =================================== Main! ================================ */
4783
4784 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its content converted with
 * atoi(), or -1 when the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,64,fp) != NULL) value = atoi(line);
    fclose(fp);
    return value;
}
4798
4799 void linuxOvercommitMemoryWarning(void) {
4800 if (linuxOvercommitMemoryValue() == 0) {
4801 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4802 }
4803 }
4804 #endif /* __linux__ */
4805
4806 static void daemonize(void) {
4807 int fd;
4808 FILE *fp;
4809
4810 if (fork() != 0) exit(0); /* parent exits */
4811 setsid(); /* create a new session */
4812
4813 /* Every output goes to /dev/null. If Redis is daemonized but
4814 * the 'logfile' is set to 'stdout' in the configuration file
4815 * it will not log at all. */
4816 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4817 dup2(fd, STDIN_FILENO);
4818 dup2(fd, STDOUT_FILENO);
4819 dup2(fd, STDERR_FILENO);
4820 if (fd > STDERR_FILENO) close(fd);
4821 }
4822 /* Try to write the pid file */
4823 fp = fopen(server.pidfile,"w");
4824 if (fp) {
4825 fprintf(fp,"%d\n",getpid());
4826 fclose(fp);
4827 }
4828 }
4829
4830 int main(int argc, char **argv) {
4831 initServerConfig();
4832 if (argc == 2) {
4833 ResetServerSaveParams();
4834 loadServerConfig(argv[1]);
4835 } else if (argc > 2) {
4836 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4837 exit(1);
4838 } else {
4839 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4840 }
4841 initServer();
4842 if (server.daemonize) daemonize();
4843 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4844 #ifdef __linux__
4845 linuxOvercommitMemoryWarning();
4846 #endif
4847 if (rdbLoad(server.dbfilename) == REDIS_OK)
4848 redisLog(REDIS_NOTICE,"DB loaded from disk");
4849 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4850 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4851 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4852 aeMain(server.el);
4853 aeDeleteEventLoop(server.el);
4854 return 0;
4855 }