]> git.saurik.com Git - redis.git/blob - redis.c
9882dc41ca73781dc1f677395e0ff7f48b840c67
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
71 /* Error codes returned by internal functions */
72 #define REDIS_OK 0
73 #define REDIS_ERR -1
74
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout, in seconds */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* sample at most 100 volatile keys per DB on each cron run */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
88
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
91
92 /* Command flags (bitmask stored in redisCommand.flags) */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
96 this flag will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short, these commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
100
101 /* Object types (values of redisObject.type) */
102 #define REDIS_STRING 0
103 #define REDIS_LIST 1
104 #define REDIS_SET 2
105 #define REDIS_ZSET 3
106 #define REDIS_HASH 4
107
108 /* Objects encoding (values of redisObject.encoding) */
109 #define REDIS_ENCODING_RAW 0 /* Raw representation */
110 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111
112 /* Object types only used for dumping to disk */
113 #define REDIS_EXPIRETIME 253
114 #define REDIS_SELECTDB 254
115 #define REDIS_EOF 255
116
117 /* Defines related to the dump file format. Storing 32 bit lengths for short
118 * keys requires a lot of space, so we check the most significant 2 bits of
119 * the first byte to interpret the length:
120 *
121 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
122 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
123 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
124 * 11|000000 this means: specially encoded object will follow. The six bits
125 * number specifies the kind of object that follows.
126 * See the REDIS_RDB_ENC_* defines.
127 *
128 * Lengths up to 63 are stored using a single byte; most DB keys, and many
129 * values, will fit inside. */
130 #define REDIS_RDB_6BITLEN 0
131 #define REDIS_RDB_14BITLEN 1
132 #define REDIS_RDB_32BITLEN 2
133 #define REDIS_RDB_ENCVAL 3
134 #define REDIS_RDB_LENERR UINT_MAX
135
136 /* When a length of a string object stored on disk has the first two bits
137 * set, the remaining six bits specify a special encoding for the object
138 * according to the following defines: */
139 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
140 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
141 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
142 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
143
144 /* Client flags (bitmask stored in redisClient.flags) */
145 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
146 #define REDIS_SLAVE 2 /* This client is a slave server */
147 #define REDIS_MASTER 4 /* This client is a master server */
148 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149
150 /* Slave replication state - slave side */
151 #define REDIS_REPL_NONE 0 /* No active replication */
152 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
153 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154
155 /* Slave replication state - from the point of view of master.
156 * Note that in SEND_BULK and ONLINE state the slave receives new updates
157 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
158 * to start the next background saving in order to send updates to it. */
159 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
160 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
161 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
162 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163
164 /* List related stuff */
165 #define REDIS_HEAD 0
166 #define REDIS_TAIL 1
167
168 /* Sort operations */
169 #define REDIS_SORT_GET 0
170 #define REDIS_SORT_DEL 1
171 #define REDIS_SORT_INCR 2
172 #define REDIS_SORT_DECR 3
173 #define REDIS_SORT_ASC 4
174 #define REDIS_SORT_DESC 5
175 #define REDIS_SORTKEY_MAX 1024
176
177 /* Log levels: redisLog() drops messages whose level is below
 * server.verbosity. */
178 #define REDIS_DEBUG 0
179 #define REDIS_NOTICE 1
180 #define REDIS_WARNING 2
181
182 /* Anti-warning macro: evaluates V as a void expression so that unused
 * parameters do not trigger compiler warnings. */
183 #define REDIS_NOTUSED(V) ((void) V)
184
185 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
186 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
187
188 /*================================= Data types ============================== */
189
190 /* A redis object: a reference counted container whose 'type' is one of the
 * REDIS_STRING / REDIS_LIST / REDIS_SET / REDIS_ZSET / REDIS_HASH defines. */
191 typedef struct redisObject {
192 void *ptr; /* payload; its meaning depends on type and encoding */
193 unsigned char type; /* one of the "Object types" defines */
194 unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
195 unsigned char notused[2]; /* presumably padding - TODO confirm */
196 int refcount; /* managed via incrRefCount() / decrRefCount() */
197 } robj;
198
/* One logical database. serverCron() reports dictSize(dict) as the number of
 * keys and dictSize(expires) as the number of volatile keys. */
199 typedef struct redisDb {
200 dict *dict; /* the keys of this DB */
201 dict *expires; /* volatile keys; serverCron() reads the entry value as a time_t */
202 int id; /* presumably the index of this DB in server.db - TODO confirm */
203 } redisDb;
204
205 /* With multiplexing we need to keep per-client state.
206 * Clients are kept in a linked list (server.clients). */
207 typedef struct redisClient {
208 int fd;
209 redisDb *db;
210 int dictid;
211 sds querybuf;
212 robj **argv, **mbargv;
213 int argc, mbargc;
214 int bulklen; /* bulk read len. -1 if not in bulk read mode */
215 int multibulk; /* multi bulk command format active */
216 list *reply;
217 int sentlen;
218 time_t lastinteraction; /* time of the last interaction, used for timeout */
219 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
220 int slaveseldb; /* slave selected db, if this client is a slave */
221 int authenticated; /* when requirepass is non-NULL */
222 int replstate; /* replication state if this is a slave */
223 int repldbfd; /* replication DB file descriptor */
224 long repldboff; /* replication DB file offset */
225 off_t repldbsize; /* replication DB file size */
226 } redisClient;
227
/* One automatic save rule: serverCron() starts a background save when
 * server.dirty >= changes and more than 'seconds' seconds have passed since
 * server.lastsave. */
228 struct saveparam {
229 time_t seconds;
230 int changes;
231 };
232
233 /* Global server state structure (single instance: 'server') */
234 struct redisServer {
235 int port;
236 int fd;
237 redisDb *db; /* array of 'dbnum' databases */
238 dict *sharingpool;
239 unsigned int sharingpoolsize;
240 long long dirty; /* changes to DB from the last save */
241 list *clients;
242 list *slaves, *monitors;
243 char neterr[ANET_ERR_LEN];
244 aeEventLoop *el;
245 int cronloops; /* number of times the cron function run */
246 list *objfreelist; /* A list of freed objects to avoid malloc() */
247 time_t lastsave; /* Unix time of last successful save */
248 size_t usedmemory; /* Used memory in bytes, refreshed by serverCron() */
249 /* Fields used only for stats */
250 time_t stat_starttime; /* server start time */
251 long long stat_numcommands; /* number of processed commands */
252 long long stat_numconnections; /* number of connections received */
253 /* Configuration */
254 int verbosity; /* minimum log level emitted by redisLog() */
255 int glueoutputbuf;
256 int maxidletime; /* idle client timeout; 0 disables the check in serverCron() */
257 int dbnum;
258 int daemonize;
259 char *pidfile;
260 int bgsaveinprogress;
261 pid_t bgsavechildpid;
262 struct saveparam *saveparams;
263 int saveparamslen;
264 char *logfile; /* NULL means log to stdout */
265 char *bindaddr;
266 char *dbfilename;
267 char *requirepass;
268 int shareobjects;
269 /* Replication related */
270 int isslave;
271 char *masterhost;
272 int masterport;
273 redisClient *master; /* client that is master for this slave */
274 int replstate;
275 unsigned int maxclients;
276 unsigned long maxmemory;
277 /* Sort parameters - qsort_r() is only available under BSD so we
278 * have to keep this state global, in order to pass it to sortCompare() */
279 int sort_desc;
280 int sort_alpha;
281 int sort_bypattern;
282 };
283
/* A command implementation receives the client that issued the command. */
284 typedef void redisCommandProc(redisClient *c);
/* One row of the command table (see cmdTable). */
285 struct redisCommand {
286 char *name; /* command name as written in cmdTable */
287 redisCommandProc *proc; /* implementation */
288 int arity; /* argument count; negative values appear in cmdTable, presumably
 * meaning "at least N" - TODO confirm where arity is checked */
289 int flags; /* bitmask of REDIS_CMD_* */
290 };
291
/* Name / address pair for a function. NOTE(review): presumably used to
 * resolve symbols when printing a backtrace - confirm against its users. */
292 struct redisFunctionSym {
293 char *name;
294 unsigned long pointer;
295 };
296
/* Element handled by SORT: the object itself plus the value it is compared
 * by, stored either as a double or as another object. */
297 typedef struct _redisSortObject {
298 robj *obj;
299 union {
300 double score;
301 robj *cmpobj;
302 } u;
303 } redisSortObject;
304
/* An operation attached to SORT. 'type' is presumably one of the
 * REDIS_SORT_* defines - TODO confirm against sortCommand(). */
305 typedef struct _redisSortOperation {
306 int type;
307 robj *pattern;
308 } redisSortOperation;
309
310 /* ZSETs use a specialized version of Skiplists */
311
/* A skiplist node: a score, the associated object and an array of forward
 * pointers. */
312 typedef struct zskiplistNode {
313 struct zskiplistNode **forward;
314 double score;
315 robj *obj;
316 } zskiplistNode;
317
/* Skiplist handle: header node, element count and a level (bounded by
 * ZSKIPLIST_MAXLEVEL, presumably the highest level in use - TODO confirm). */
318 typedef struct zskiplist {
319 struct zskiplistNode *header;
320 long length;
321 int level;
322 } zskiplist;
323
/* A sorted set pairs a dictionary with a skiplist. */
324 typedef struct zset {
325 dict *dict;
326 zskiplist *zsl;
327 } zset;
328
329 /* Our shared "common" objects: preallocated protocol fragments and error
 * replies, all created once by createSharedObjects(). */
330
331 struct sharedObjectsStruct {
332 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
333 *colon, *nullbulk, *nullmultibulk,
334 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
335 *outofrangeerr, *plus,
336 *select0, *select1, *select2, *select3, *select4,
337 *select5, *select6, *select7, *select8, *select9;
338 } shared;
339
340 /*================================ Prototypes =============================== */
341
/* Forward declarations of file-local helpers that are used before their
 * definition. */
342 static void freeStringObject(robj *o);
343 static void freeListObject(robj *o);
344 static void freeSetObject(robj *o);
345 static void decrRefCount(void *o);
346 static robj *createObject(int type, void *ptr);
347 static void freeClient(redisClient *c);
348 static int rdbLoad(char *filename);
349 static void addReply(redisClient *c, robj *obj);
350 static void addReplySds(redisClient *c, sds s);
351 static void incrRefCount(robj *o);
352 static int rdbSaveBackground(char *filename);
353 static robj *createStringObject(char *ptr, size_t len);
354 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
355 static int syncWithMaster(void);
356 static robj *tryObjectSharing(robj *o);
357 static int tryObjectEncoding(robj *o);
358 static robj *getDecodedObject(const robj *o);
359 static int removeExpire(redisDb *db, robj *key);
360 static int expireIfNeeded(redisDb *db, robj *key);
361 static int deleteIfVolatile(redisDb *db, robj *key);
362 static int deleteKey(redisDb *db, robj *key);
363 static time_t getExpire(redisDb *db, robj *key);
364 static int setExpire(redisDb *db, robj *key, time_t when);
365 static void updateSlavesWaitingBgsave(int bgsaveerr);
366 static void freeMemoryIfNeeded(void);
367 static int processCommand(redisClient *c);
368 static void setupSigSegvAction(void);
369 static void rdbRemoveTempFile(pid_t childpid);
370 static size_t stringObjectLen(robj *o);
371 static void processInputBuffer(redisClient *c);
372 static zskiplist *zslCreate(void);
373
/* Command implementations. Every one has the redisCommandProc signature and
 * is referenced from cmdTable. */
374 static void authCommand(redisClient *c);
375 static void pingCommand(redisClient *c);
376 static void echoCommand(redisClient *c);
377 static void setCommand(redisClient *c);
378 static void setnxCommand(redisClient *c);
379 static void getCommand(redisClient *c);
380 static void delCommand(redisClient *c);
381 static void existsCommand(redisClient *c);
382 static void incrCommand(redisClient *c);
383 static void decrCommand(redisClient *c);
384 static void incrbyCommand(redisClient *c);
385 static void decrbyCommand(redisClient *c);
386 static void selectCommand(redisClient *c);
387 static void randomkeyCommand(redisClient *c);
388 static void keysCommand(redisClient *c);
389 static void dbsizeCommand(redisClient *c);
390 static void lastsaveCommand(redisClient *c);
391 static void saveCommand(redisClient *c);
392 static void bgsaveCommand(redisClient *c);
393 static void shutdownCommand(redisClient *c);
394 static void moveCommand(redisClient *c);
395 static void renameCommand(redisClient *c);
396 static void renamenxCommand(redisClient *c);
397 static void lpushCommand(redisClient *c);
398 static void rpushCommand(redisClient *c);
399 static void lpopCommand(redisClient *c);
400 static void rpopCommand(redisClient *c);
401 static void llenCommand(redisClient *c);
402 static void lindexCommand(redisClient *c);
403 static void lrangeCommand(redisClient *c);
404 static void ltrimCommand(redisClient *c);
405 static void typeCommand(redisClient *c);
406 static void lsetCommand(redisClient *c);
407 static void saddCommand(redisClient *c);
408 static void sremCommand(redisClient *c);
409 static void smoveCommand(redisClient *c);
410 static void sismemberCommand(redisClient *c);
411 static void scardCommand(redisClient *c);
412 static void spopCommand(redisClient *c);
413 static void srandmemberCommand(redisClient *c);
414 static void sinterCommand(redisClient *c);
415 static void sinterstoreCommand(redisClient *c);
416 static void sunionCommand(redisClient *c);
417 static void sunionstoreCommand(redisClient *c);
418 static void sdiffCommand(redisClient *c);
419 static void sdiffstoreCommand(redisClient *c);
420 static void syncCommand(redisClient *c);
421 static void flushdbCommand(redisClient *c);
422 static void flushallCommand(redisClient *c);
423 static void sortCommand(redisClient *c);
424 static void lremCommand(redisClient *c);
425 static void infoCommand(redisClient *c);
426 static void mgetCommand(redisClient *c);
427 static void monitorCommand(redisClient *c);
428 static void expireCommand(redisClient *c);
429 static void getsetCommand(redisClient *c);
430 static void ttlCommand(redisClient *c);
431 static void slaveofCommand(redisClient *c);
432 static void debugCommand(redisClient *c);
433 static void msetCommand(redisClient *c);
434 static void msetnxCommand(redisClient *c);
435
436 /*================================= Globals ================================= */
437
438 /* Global vars */
439 static struct redisServer server; /* server global state */
/* Command table. Each row is {name, implementation, arity, flags} as laid
 * out in struct redisCommand; the list is terminated by an all-NULL/zero row.
 * Note that "smembers" is served by sinterCommand. */
440 static struct redisCommand cmdTable[] = {
441 {"get",getCommand,2,REDIS_CMD_INLINE},
442 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
443 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
444 {"del",delCommand,-2,REDIS_CMD_INLINE},
445 {"exists",existsCommand,2,REDIS_CMD_INLINE},
446 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
447 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
448 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
449 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
450 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
451 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
452 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
453 {"llen",llenCommand,2,REDIS_CMD_INLINE},
454 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
455 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
456 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
457 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
458 {"lrem",lremCommand,4,REDIS_CMD_BULK},
459 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
460 {"srem",sremCommand,3,REDIS_CMD_BULK},
461 {"smove",smoveCommand,4,REDIS_CMD_BULK},
462 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
463 {"scard",scardCommand,2,REDIS_CMD_INLINE},
464 {"spop",spopCommand,2,REDIS_CMD_INLINE},
465 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
466 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
467 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
468 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
469 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
470 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
471 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
472 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
473 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
474 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
475 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
476 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
477 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
478 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
479 {"select",selectCommand,2,REDIS_CMD_INLINE},
480 {"move",moveCommand,3,REDIS_CMD_INLINE},
481 {"rename",renameCommand,3,REDIS_CMD_INLINE},
482 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
483 {"expire",expireCommand,3,REDIS_CMD_INLINE},
484 {"keys",keysCommand,2,REDIS_CMD_INLINE},
485 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
486 {"auth",authCommand,2,REDIS_CMD_INLINE},
487 {"ping",pingCommand,1,REDIS_CMD_INLINE},
488 {"echo",echoCommand,2,REDIS_CMD_BULK},
489 {"save",saveCommand,1,REDIS_CMD_INLINE},
490 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
491 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
492 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
493 {"type",typeCommand,2,REDIS_CMD_INLINE},
494 {"sync",syncCommand,1,REDIS_CMD_INLINE},
495 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
496 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
497 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
498 {"info",infoCommand,1,REDIS_CMD_INLINE},
499 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
500 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
501 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
502 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
503 {NULL,NULL,0,0}
504 };
505 /*============================ Utility functions ============================ */
506
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be NUL terminated:
 * the function never reads the pattern past 'patternLen' nor the string past
 * 'stringLen'.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte out of a set; [^abc] negates the set, [a-z] is a range,
 *            a backslash inside the set escapes the following byte
 *   \x       the byte x taken literally
 *
 * When 'nocase' is non-zero, literal bytes and ranges are compared case
 * insensitively. Returns 1 on match, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing '*' matches whatever is left */
            /* Try the rest of the pattern at every remaining position. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one byte: it can't match nothing. */
            if (stringLen <= 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen <= 0) {
                    /* Unterminated class: step back so that the common
                     * advance after the switch lands exactly on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Work on unsigned byte values: tolower() is undefined
                     * for negative arguments other than EOF. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen <= 0)
                return 0; /* a literal needs a byte to compare with */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing '*' may remain in the
             * pattern for the match to succeed. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
629
630 static void redisLog(int level, const char *fmt, ...) {
631 va_list ap;
632 FILE *fp;
633
634 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
635 if (!fp) return;
636
637 va_start(ap, fmt);
638 if (level >= server.verbosity) {
639 char *c = ".-*";
640 char buf[64];
641 time_t now;
642
643 now = time(NULL);
644 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
645 fprintf(fp,"%s %c ",buf,c[level]);
646 vfprintf(fp, fmt, ap);
647 fprintf(fp,"\n");
648 fflush(fp);
649 }
650 va_end(ap);
651
652 if (server.logfile) fclose(fp);
653 }
654
655 /*====================== Hash table type implementation ==================== */
656
657 /* This is a hash table type that uses the SDS dynamic strings library as
658 * keys and redis objects as values (objects can hold SDS strings,
659 * lists, sets). */
660
/* Value destructor for dictionaries whose values are plain zmalloc()ed
 * buffers: hands 'val' to zfree(). 'privdata' is ignored. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
666
667 static int sdsDictKeyCompare(void *privdata, const void *key1,
668 const void *key2)
669 {
670 int l1,l2;
671 DICT_NOTUSED(privdata);
672
673 l1 = sdslen((sds)key1);
674 l2 = sdslen((sds)key2);
675 if (l1 != l2) return 0;
676 return memcmp(key1, key2, l1) == 0;
677 }
678
/* Key/value destructor for dictionaries holding redis objects: drops one
 * reference from 'val'. 'privdata' is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
685
686 static int dictObjKeyCompare(void *privdata, const void *key1,
687 const void *key2)
688 {
689 const robj *o1 = key1, *o2 = key2;
690 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
691 }
692
693 static unsigned int dictObjHash(const void *key) {
694 const robj *o = key;
695 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
696 }
697
698 static int dictEncObjKeyCompare(void *privdata, const void *key1,
699 const void *key2)
700 {
701 const robj *o1 = key1, *o2 = key2;
702
703 if (o1->encoding == REDIS_ENCODING_RAW &&
704 o2->encoding == REDIS_ENCODING_RAW)
705 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
706 else {
707 robj *dec1, *dec2;
708 int cmp;
709
710 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
711 getDecodedObject(o1) : (robj*)o1;
712 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
713 getDecodedObject(o2) : (robj*)o2;
714 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
715 if (dec1 != o1) decrRefCount(dec1);
716 if (dec2 != o2) decrRefCount(dec2);
717 return cmp;
718 }
719 }
720
721 static unsigned int dictEncObjHash(const void *key) {
722 const robj *o = key;
723
724 if (o->encoding == REDIS_ENCODING_RAW)
725 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
726 else {
727 robj *dec = getDecodedObject(o);
728 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
729 decrRefCount(dec);
730 return hash;
731 }
732 }
733
/* Dictionary type used for sets: keys are redis objects that may be stored
 * in a non raw encoding (hashed and compared through their decoded form) and
 * are released with decrRefCount(); values have no destructor. */
734 static dictType setDictType = {
735 dictEncObjHash, /* hash function */
736 NULL, /* key dup */
737 NULL, /* val dup */
738 dictEncObjKeyCompare, /* key compare */
739 dictRedisObjectDestructor, /* key destructor */
740 NULL /* val destructor */
741 };
742
/* Dictionary type used for sorted sets: same key handling as setDictType,
 * but values are heap buffers released with zfree() (presumably the element
 * score - TODO confirm against the ZSET code). */
743 static dictType zsetDictType = {
744 dictEncObjHash, /* hash function */
745 NULL, /* key dup */
746 NULL, /* val dup */
747 dictEncObjKeyCompare, /* key compare */
748 dictRedisObjectDestructor, /* key destructor */
749 dictVanillaFree /* val destructor */
750 };
751
/* Dictionary type whose keys and values are both redis objects released with
 * decrRefCount(). Keys are hashed and compared directly as sds strings, with
 * no decoding step. */
752 static dictType hashDictType = {
753 dictObjHash, /* hash function */
754 NULL, /* key dup */
755 NULL, /* val dup */
756 dictObjKeyCompare, /* key compare */
757 dictRedisObjectDestructor, /* key destructor */
758 dictRedisObjectDestructor /* val destructor */
759 };
760
761 /* ========================= Random utility functions ======================= */
762
/* Out of memory handler.
 *
 * Redis does not try to recover when it fails to allocate objects or
 * strings: the networking layer itself needs heap memory for its send
 * buffers, so it is unclear whether the condition could even be reported to
 * the client. The handler prints "<msg>: Out of memory" on stderr, waits one
 * second and aborts the process. It never returns. */
static void oom(const char *msg)
{
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
774
775 /* ====================== Redis server networking stuff ===================== */
776 static void closeTimedoutClients(void) {
777 redisClient *c;
778 listNode *ln;
779 time_t now = time(NULL);
780
781 listRewind(server.clients);
782 while ((ln = listYield(server.clients)) != NULL) {
783 c = listNodeValue(ln);
784 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
785 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
786 (now - c->lastinteraction > server.maxidletime)) {
787 redisLog(REDIS_DEBUG,"Closing idle client");
788 freeClient(c);
789 }
790 }
791 }
792
793 static int htNeedsResize(dict *dict) {
794 long long size, used;
795
796 size = dictSlots(dict);
797 used = dictSize(dict);
798 return (size && used && size > DICT_HT_INITIAL_SIZE &&
799 (used*100/size < REDIS_HT_MINFILL));
800 }
801
802 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
803 * we resize the hash table to save memory */
804 static void tryResizeHashTables(void) {
805 int j;
806
807 for (j = 0; j < server.dbnum; j++) {
808 if (htNeedsResize(server.db[j].dict)) {
809 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
810 dictResize(server.db[j].dict);
811 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
812 }
813 if (htNeedsResize(server.db[j].expires))
814 dictResize(server.db[j].expires);
815 }
816 }
817
818 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
819 int j, loops = server.cronloops++;
820 REDIS_NOTUSED(eventLoop);
821 REDIS_NOTUSED(id);
822 REDIS_NOTUSED(clientData);
823
824 /* Update the global state with the amount of used memory */
825 server.usedmemory = zmalloc_used_memory();
826
827 /* Show some info about non-empty databases */
828 for (j = 0; j < server.dbnum; j++) {
829 long long size, used, vkeys;
830
831 size = dictSlots(server.db[j].dict);
832 used = dictSize(server.db[j].dict);
833 vkeys = dictSize(server.db[j].expires);
834 if (!(loops % 5) && (used || vkeys)) {
835 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
836 /* dictPrintStats(server.dict); */
837 }
838 }
839
840 /* We don't want to resize the hash tables while a bacground saving
841 * is in progress: the saving child is created using fork() that is
842 * implemented with a copy-on-write semantic in most modern systems, so
843 * if we resize the HT while there is the saving child at work actually
844 * a lot of memory movements in the parent will cause a lot of pages
845 * copied. */
846 if (!server.bgsaveinprogress) tryResizeHashTables();
847
848 /* Show information about connected clients */
849 if (!(loops % 5)) {
850 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
851 listLength(server.clients)-listLength(server.slaves),
852 listLength(server.slaves),
853 server.usedmemory,
854 dictSize(server.sharingpool));
855 }
856
857 /* Close connections of timedout clients */
858 if (server.maxidletime && !(loops % 10))
859 closeTimedoutClients();
860
861 /* Check if a background saving in progress terminated */
862 if (server.bgsaveinprogress) {
863 int statloc;
864 if (wait4(-1,&statloc,WNOHANG,NULL)) {
865 int exitcode = WEXITSTATUS(statloc);
866 int bysignal = WIFSIGNALED(statloc);
867
868 if (!bysignal && exitcode == 0) {
869 redisLog(REDIS_NOTICE,
870 "Background saving terminated with success");
871 server.dirty = 0;
872 server.lastsave = time(NULL);
873 } else if (!bysignal && exitcode != 0) {
874 redisLog(REDIS_WARNING, "Background saving error");
875 } else {
876 redisLog(REDIS_WARNING,
877 "Background saving terminated by signal");
878 rdbRemoveTempFile(server.bgsavechildpid);
879 }
880 server.bgsaveinprogress = 0;
881 server.bgsavechildpid = -1;
882 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
883 }
884 } else {
885 /* If there is not a background saving in progress check if
886 * we have to save now */
887 time_t now = time(NULL);
888 for (j = 0; j < server.saveparamslen; j++) {
889 struct saveparam *sp = server.saveparams+j;
890
891 if (server.dirty >= sp->changes &&
892 now-server.lastsave > sp->seconds) {
893 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
894 sp->changes, sp->seconds);
895 rdbSaveBackground(server.dbfilename);
896 break;
897 }
898 }
899 }
900
901 /* Try to expire a few timed out keys */
902 for (j = 0; j < server.dbnum; j++) {
903 redisDb *db = server.db+j;
904 int num = dictSize(db->expires);
905
906 if (num) {
907 time_t now = time(NULL);
908
909 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
910 num = REDIS_EXPIRELOOKUPS_PER_CRON;
911 while (num--) {
912 dictEntry *de;
913 time_t t;
914
915 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
916 t = (time_t) dictGetEntryVal(de);
917 if (now > t) {
918 deleteKey(db,dictGetEntryKey(de));
919 }
920 }
921 }
922 }
923
924 /* Check if we should connect to a MASTER */
925 if (server.replstate == REDIS_REPL_CONNECT) {
926 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
927 if (syncWithMaster() == REDIS_OK) {
928 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
929 }
930 }
931 return 1000;
932 }
933
934 static void createSharedObjects(void) {
935 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
936 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
937 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
938 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
939 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
940 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
941 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
942 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
943 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
944 /* no such key */
945 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
946 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
947 "-ERR Operation against a key holding the wrong kind of value\r\n"));
948 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
949 "-ERR no such key\r\n"));
950 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
951 "-ERR syntax error\r\n"));
952 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
953 "-ERR source and destination objects are the same\r\n"));
954 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
955 "-ERR index out of range\r\n"));
956 shared.space = createObject(REDIS_STRING,sdsnew(" "));
957 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
958 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
959 shared.select0 = createStringObject("select 0\r\n",10);
960 shared.select1 = createStringObject("select 1\r\n",10);
961 shared.select2 = createStringObject("select 2\r\n",10);
962 shared.select3 = createStringObject("select 3\r\n",10);
963 shared.select4 = createStringObject("select 4\r\n",10);
964 shared.select5 = createStringObject("select 5\r\n",10);
965 shared.select6 = createStringObject("select 6\r\n",10);
966 shared.select7 = createStringObject("select 7\r\n",10);
967 shared.select8 = createStringObject("select 8\r\n",10);
968 shared.select9 = createStringObject("select 9\r\n",10);
969 }
970
971 static void appendServerSaveParams(time_t seconds, int changes) {
972 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
973 server.saveparams[server.saveparamslen].seconds = seconds;
974 server.saveparams[server.saveparamslen].changes = changes;
975 server.saveparamslen++;
976 }
977
978 static void ResetServerSaveParams() {
979 zfree(server.saveparams);
980 server.saveparams = NULL;
981 server.saveparamslen = 0;
982 }
983
984 static void initServerConfig() {
985 server.dbnum = REDIS_DEFAULT_DBNUM;
986 server.port = REDIS_SERVERPORT;
987 server.verbosity = REDIS_DEBUG;
988 server.maxidletime = REDIS_MAXIDLETIME;
989 server.saveparams = NULL;
990 server.logfile = NULL; /* NULL = log on standard output */
991 server.bindaddr = NULL;
992 server.glueoutputbuf = 1;
993 server.daemonize = 0;
994 server.pidfile = "/var/run/redis.pid";
995 server.dbfilename = "dump.rdb";
996 server.requirepass = NULL;
997 server.shareobjects = 0;
998 server.sharingpoolsize = 1024;
999 server.maxclients = 0;
1000 server.maxmemory = 0;
1001 ResetServerSaveParams();
1002
1003 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1004 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1005 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1006 /* Replication related */
1007 server.isslave = 0;
1008 server.masterhost = NULL;
1009 server.masterport = 6379;
1010 server.master = NULL;
1011 server.replstate = REDIS_REPL_NONE;
1012 }
1013
1014 static void initServer() {
1015 int j;
1016
1017 signal(SIGHUP, SIG_IGN);
1018 signal(SIGPIPE, SIG_IGN);
1019 setupSigSegvAction();
1020
1021 server.clients = listCreate();
1022 server.slaves = listCreate();
1023 server.monitors = listCreate();
1024 server.objfreelist = listCreate();
1025 createSharedObjects();
1026 server.el = aeCreateEventLoop();
1027 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1028 server.sharingpool = dictCreate(&setDictType,NULL);
1029 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1030 if (server.fd == -1) {
1031 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1032 exit(1);
1033 }
1034 for (j = 0; j < server.dbnum; j++) {
1035 server.db[j].dict = dictCreate(&hashDictType,NULL);
1036 server.db[j].expires = dictCreate(&setDictType,NULL);
1037 server.db[j].id = j;
1038 }
1039 server.cronloops = 0;
1040 server.bgsaveinprogress = 0;
1041 server.bgsavechildpid = -1;
1042 server.lastsave = time(NULL);
1043 server.dirty = 0;
1044 server.usedmemory = 0;
1045 server.stat_numcommands = 0;
1046 server.stat_numconnections = 0;
1047 server.stat_starttime = time(NULL);
1048 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1049 }
1050
1051 /* Empty the whole database */
1052 static long long emptyDb() {
1053 int j;
1054 long long removed = 0;
1055
1056 for (j = 0; j < server.dbnum; j++) {
1057 removed += dictSize(server.db[j].dict);
1058 dictEmpty(server.db[j].dict);
1059 dictEmpty(server.db[j].expires);
1060 }
1061 return removed;
1062 }
1063
/* Map a case-insensitive "yes" / "no" string to 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1069
/* Load the server configuration from 'filename', or from standard input
 * when 'filename' is exactly "-".
 *
 * The parser is intentionally rudimentary (to be improved if the config
 * gets more complex): each line is trimmed, blank lines and lines starting
 * with '#' are skipped, and the rest is split on single spaces. The first
 * token selects the directive, which must come with the exact number of
 * arguments checked below; recognized directives update fields of the
 * global 'server' structure ("dir" changes the working directory instead).
 *
 * Errors are fatal: a failure to open the file or to chdir logs a warning
 * and exits with status 1; any invalid directive jumps to 'loaderr', which
 * prints the offending line number and text on stderr and exits with
 * status 1. The function therefore only returns on success. */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    /* "-" means: read the configuration from standard input */
    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    /* NOTE(review): a physical line longer than REDIS_CONFIGLINE_MAX is
     * returned by fgets() in several pieces, each parsed (and counted) as
     * its own line. */
    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines */
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            /* The save point is appended to the ones already configured.
             * NOTE(review): unless the caller resets the list first, the
             * defaults installed by initServerConfig() stay active too. */
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* "stdout" (any case) is mapped to NULL, the value used by
             * initServerConfig() to mean "log on standard output". */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            /* No range check here: the value is stored as parsed by atoi() */
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            /* Record the master address and ask for a connection attempt */
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        /* Directive applied: release the per-line allocations */
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

    /* Fatal error path: 'line' still holds the trimmed offending line and
     * 'err' the reason. Nothing is freed since the process exits. */
loaderr:
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1212
1213 static void freeClientArgv(redisClient *c) {
1214 int j;
1215
1216 for (j = 0; j < c->argc; j++)
1217 decrRefCount(c->argv[j]);
1218 for (j = 0; j < c->mbargc; j++)
1219 decrRefCount(c->mbargv[j]);
1220 c->argc = 0;
1221 c->mbargc = 0;
1222 }
1223
1224 static void freeClient(redisClient *c) {
1225 listNode *ln;
1226
1227 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1228 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1229 sdsfree(c->querybuf);
1230 listRelease(c->reply);
1231 freeClientArgv(c);
1232 close(c->fd);
1233 ln = listSearchKey(server.clients,c);
1234 assert(ln != NULL);
1235 listDelNode(server.clients,ln);
1236 if (c->flags & REDIS_SLAVE) {
1237 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1238 close(c->repldbfd);
1239 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1240 ln = listSearchKey(l,c);
1241 assert(ln != NULL);
1242 listDelNode(l,ln);
1243 }
1244 if (c->flags & REDIS_MASTER) {
1245 server.master = NULL;
1246 server.replstate = REDIS_REPL_CONNECT;
1247 }
1248 zfree(c->argv);
1249 zfree(c->mbargv);
1250 zfree(c);
1251 }
1252
1253 static void glueReplyBuffersIfNeeded(redisClient *c) {
1254 int totlen = 0;
1255 listNode *ln;
1256 robj *o;
1257
1258 listRewind(c->reply);
1259 while((ln = listYield(c->reply))) {
1260 o = ln->value;
1261 totlen += sdslen(o->ptr);
1262 /* This optimization makes more sense if we don't have to copy
1263 * too much data */
1264 if (totlen > 1024) return;
1265 }
1266 if (totlen > 0) {
1267 char buf[1024];
1268 int copylen = 0;
1269
1270 listRewind(c->reply);
1271 while((ln = listYield(c->reply))) {
1272 o = ln->value;
1273 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1274 copylen += sdslen(o->ptr);
1275 listDelNode(c->reply,ln);
1276 }
1277 /* Now the output buffer is empty, add the new single element */
1278 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1279 listAddNodeTail(c->reply,o);
1280 }
1281 }
1282
/* Writable-event handler: send as much of the client's pending reply list
 * as the socket accepts.
 *
 * 'privdata' is the redisClient; 'el' and 'mask' are unused. c->sentlen
 * tracks how many bytes of the object at the head of c->reply were already
 * written, so a partially sent object is resumed on the next call.
 *
 * The client is freed on a write error other than EAGAIN. When the reply
 * list becomes empty the writable event is removed. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Optionally merge many small reply objects into a single one */
    if (server.glueoutputbuf && listLength(c->reply) > 1)
        glueReplyBuffersIfNeeded(c);
    while(listLength(c->reply)) {
        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Empty objects carry nothing to send: just drop them */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master: pretend the rest of the object
             * was written so that it gets discarded below */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            /* 0 or -1: stop here, the error case is examined after the loop */
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
         * bytes: in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * a super fast link that is always able to accept data (in real
         * world terms think of 'KEYS *' against the loopback interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    /* nwritten is -1 only if the last write() of the loop failed */
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            /* Socket buffer full: retry on the next writable event */
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        /* Nothing left to send: stop listening for writable events */
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1338
1339 static struct redisCommand *lookupCommand(char *name) {
1340 int j = 0;
1341 while(cmdTable[j].name != NULL) {
1342 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1343 j++;
1344 }
1345 return NULL;
1346 }
1347
1348 /* resetClient prepare the client to process the next command */
1349 static void resetClient(redisClient *c) {
1350 freeClientArgv(c);
1351 c->bulklen = -1;
1352 c->multibulk = 0;
1353 }
1354
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * The function is also the state machine of the multi-bulk protocol:
 * c->multibulk is the number of arguments still to receive and c->bulklen
 * is -1 while a new line is expected, or the number of bytes (CRLF
 * included) of the bulk payload to read.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
static int processCommand(redisClient *c) {
    struct redisCommand *cmd;
    long long dirty;

    /* Free some memory if needed (maxmemory setting) */
    if (server.maxmemory) freeMemoryIfNeeded();

    /* Handle the multi bulk command type. This is an alternative protocol
     * supported by Redis in order to receive commands that are composed of
     * multiple binary-safe "bulk" arguments. The latency of processing is
     * a bit higher but this allows things like multi-sets, so if this
     * protocol is used only for MSET and similar commands this is a big win. */
    if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
        /* "*<count>" line: start of a multi-bulk request */
        c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
        if (c->multibulk <= 0) {
            resetClient(c);
            return 1;
        } else {
            /* Discard the "*<count>" token and wait for the arguments */
            decrRefCount(c->argv[c->argc-1]);
            c->argc--;
            return 1;
        }
    } else if (c->multibulk) {
        if (c->bulklen == -1) {
            /* Expecting the "$<len>" line that introduces an argument */
            if (((char*)c->argv[0]->ptr)[0] != '$') {
                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
                resetClient(c);
                return 1;
            } else {
                /* NOTE(review): only argv[0] is released and argc is
                 * decremented once, so this path looks like it assumes the
                 * line holds a single token -- confirm what happens with
                 * input such as "$3 foo". */
                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
                decrRefCount(c->argv[0]);
                if (bulklen < 0 || bulklen > 1024*1024*1024) {
                    c->argc--;
                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
                    resetClient(c);
                    return 1;
                }
                c->argc--;
                c->bulklen = bulklen+2; /* add two bytes for CR+LF */
                return 1;
            }
        } else {
            /* A bulk payload was read into argv[0]: move it at the end
             * of the multi-bulk vector */
            c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
            c->mbargv[c->mbargc] = c->argv[0];
            c->mbargc++;
            c->argc--;
            c->multibulk--;
            if (c->multibulk == 0) {
                robj **auxargv;
                int auxargc;

                /* Here we need to swap the multi-bulk argc/argv with the
                 * normal argc/argv of the client structure. */
                auxargv = c->argv;
                c->argv = c->mbargv;
                c->mbargv = auxargv;

                auxargc = c->argc;
                c->argc = c->mbargc;
                c->mbargc = auxargc;

                /* We need to set bulklen to something different than -1
                 * in order for the code below to process the command
                 * without trying to read the last argument of a bulk
                 * command as a special argument. */
                c->bulklen = 0;
                /* continue below and process the command */
            } else {
                /* More arguments to come: expect another "$<len>" line */
                c->bulklen = -1;
                return 1;
            }
        }
    }
    /* -- end of multi bulk commands processing -- */

    /* The QUIT command is handled as a special case. Normal command
     * procs are unable to close the client connection safely */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        freeClient(c);
        return 0;
    }
    cmd = lookupCommand(c->argv[0]->ptr);
    if (!cmd) {
        addReplySds(c,sdsnew("-ERR unknown command\r\n"));
        resetClient(c);
        return 1;
    } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
               (c->argc < -cmd->arity)) {
        /* Positive arity: exact argument count. Negative arity: at least
         * -arity arguments. */
        addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
        resetClient(c);
        return 1;
    } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
        resetClient(c);
        return 1;
    } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
        /* Old-style bulk command: the last token of the line is the
         * length of the payload that follows */
        int bulklen = atoi(c->argv[c->argc-1]->ptr);

        decrRefCount(c->argv[c->argc-1]);
        if (bulklen < 0 || bulklen > 1024*1024*1024) {
            c->argc--;
            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
            resetClient(c);
            return 1;
        }
        c->argc--;
        c->bulklen = bulklen+2; /* add two bytes for CR+LF */
        /* It is possible that the bulk read is already in the
         * buffer. Check this condition and handle it accordingly.
         * This is just a fast path, alternative to call processInputBuffer().
         * It's a good idea since the code is small and this condition
         * happens most of the times. */
        if ((signed)sdslen(c->querybuf) >= c->bulklen) {
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
        } else {
            /* Payload not fully received yet: the caller will retry once
             * more data is available */
            return 1;
        }
    }
    /* Let's try to share objects on the command arguments vector */
    if (server.shareobjects) {
        int j;
        for(j = 1; j < c->argc; j++)
            c->argv[j] = tryObjectSharing(c->argv[j]);
    }
    /* Let's try to encode the bulk object to save space. */
    if (cmd->flags & REDIS_CMD_BULK)
        tryObjectEncoding(c->argv[c->argc-1]);

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
        resetClient(c);
        return 1;
    }

    /* Exec the command. Slaves are fed only when the command changed
     * server.dirty; monitors are fed with every executed command. */
    dirty = server.dirty;
    cmd->proc(c);
    if (server.dirty-dirty != 0 && listLength(server.slaves))
        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;

    /* Prepare the client for the next command */
    if (c->flags & REDIS_CLOSE) {
        freeClient(c);
        return 0;
    }
    resetClient(c);
    return 1;
}
1517
1518 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1519 listNode *ln;
1520 int outc = 0, j;
1521 robj **outv;
1522 /* (args*2)+1 is enough room for args, spaces, newlines */
1523 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1524
1525 if (argc <= REDIS_STATIC_ARGS) {
1526 outv = static_outv;
1527 } else {
1528 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1529 }
1530
1531 for (j = 0; j < argc; j++) {
1532 if (j != 0) outv[outc++] = shared.space;
1533 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1534 robj *lenobj;
1535
1536 lenobj = createObject(REDIS_STRING,
1537 sdscatprintf(sdsempty(),"%d\r\n",
1538 stringObjectLen(argv[j])));
1539 lenobj->refcount = 0;
1540 outv[outc++] = lenobj;
1541 }
1542 outv[outc++] = argv[j];
1543 }
1544 outv[outc++] = shared.crlf;
1545
1546 /* Increment all the refcounts at start and decrement at end in order to
1547 * be sure to free objects if there is no slave in a replication state
1548 * able to be feed with commands */
1549 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1550 listRewind(slaves);
1551 while((ln = listYield(slaves))) {
1552 redisClient *slave = ln->value;
1553
1554 /* Don't feed slaves that are still waiting for BGSAVE to start */
1555 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1556
1557 /* Feed all the other slaves, MONITORs and so on */
1558 if (slave->slaveseldb != dictid) {
1559 robj *selectcmd;
1560
1561 switch(dictid) {
1562 case 0: selectcmd = shared.select0; break;
1563 case 1: selectcmd = shared.select1; break;
1564 case 2: selectcmd = shared.select2; break;
1565 case 3: selectcmd = shared.select3; break;
1566 case 4: selectcmd = shared.select4; break;
1567 case 5: selectcmd = shared.select5; break;
1568 case 6: selectcmd = shared.select6; break;
1569 case 7: selectcmd = shared.select7; break;
1570 case 8: selectcmd = shared.select8; break;
1571 case 9: selectcmd = shared.select9; break;
1572 default:
1573 selectcmd = createObject(REDIS_STRING,
1574 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1575 selectcmd->refcount = 0;
1576 break;
1577 }
1578 addReply(slave,selectcmd);
1579 slave->slaveseldb = dictid;
1580 }
1581 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1582 }
1583 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1584 if (outv != static_outv) zfree(outv);
1585 }
1586
1587 static void processInputBuffer(redisClient *c) {
1588 again:
1589 if (c->bulklen == -1) {
1590 /* Read the first line of the query */
1591 char *p = strchr(c->querybuf,'\n');
1592 size_t querylen;
1593
1594 if (p) {
1595 sds query, *argv;
1596 int argc, j;
1597
1598 query = c->querybuf;
1599 c->querybuf = sdsempty();
1600 querylen = 1+(p-(query));
1601 if (sdslen(query) > querylen) {
1602 /* leave data after the first line of the query in the buffer */
1603 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1604 }
1605 *p = '\0'; /* remove "\n" */
1606 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1607 sdsupdatelen(query);
1608
1609 /* Now we can split the query in arguments */
1610 if (sdslen(query) == 0) {
1611 /* Ignore empty query */
1612 sdsfree(query);
1613 return;
1614 }
1615 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1616 sdsfree(query);
1617
1618 if (c->argv) zfree(c->argv);
1619 c->argv = zmalloc(sizeof(robj*)*argc);
1620
1621 for (j = 0; j < argc; j++) {
1622 if (sdslen(argv[j])) {
1623 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1624 c->argc++;
1625 } else {
1626 sdsfree(argv[j]);
1627 }
1628 }
1629 zfree(argv);
1630 /* Execute the command. If the client is still valid
1631 * after processCommand() return and there is something
1632 * on the query buffer try to process the next command. */
1633 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1634 return;
1635 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1636 redisLog(REDIS_DEBUG, "Client protocol error");
1637 freeClient(c);
1638 return;
1639 }
1640 } else {
1641 /* Bulk read handling. Note that if we are at this point
1642 the client already sent a command terminated with a newline,
1643 we are reading the bulk data that is actually the last
1644 argument of the command. */
1645 int qbl = sdslen(c->querybuf);
1646
1647 if (c->bulklen <= qbl) {
1648 /* Copy everything but the final CRLF as final argument */
1649 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1650 c->argc++;
1651 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1652 /* Process the command. If the client is still valid after
1653 * the processing and there is more data in the buffer
1654 * try to parse it. */
1655 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1656 return;
1657 }
1658 }
1659 }
1660
1661 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1662 redisClient *c = (redisClient*) privdata;
1663 char buf[REDIS_IOBUF_LEN];
1664 int nread;
1665 REDIS_NOTUSED(el);
1666 REDIS_NOTUSED(mask);
1667
1668 nread = read(fd, buf, REDIS_IOBUF_LEN);
1669 if (nread == -1) {
1670 if (errno == EAGAIN) {
1671 nread = 0;
1672 } else {
1673 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1674 freeClient(c);
1675 return;
1676 }
1677 } else if (nread == 0) {
1678 redisLog(REDIS_DEBUG, "Client closed connection");
1679 freeClient(c);
1680 return;
1681 }
1682 if (nread) {
1683 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1684 c->lastinteraction = time(NULL);
1685 } else {
1686 return;
1687 }
1688 processInputBuffer(c);
1689 }
1690
1691 static int selectDb(redisClient *c, int id) {
1692 if (id < 0 || id >= server.dbnum)
1693 return REDIS_ERR;
1694 c->db = &server.db[id];
1695 return REDIS_OK;
1696 }
1697
1698 static void *dupClientReplyValue(void *o) {
1699 incrRefCount((robj*)o);
1700 return 0;
1701 }
1702
1703 static redisClient *createClient(int fd) {
1704 redisClient *c = zmalloc(sizeof(*c));
1705
1706 anetNonBlock(NULL,fd);
1707 anetTcpNoDelay(NULL,fd);
1708 if (!c) return NULL;
1709 selectDb(c,0);
1710 c->fd = fd;
1711 c->querybuf = sdsempty();
1712 c->argc = 0;
1713 c->argv = NULL;
1714 c->bulklen = -1;
1715 c->multibulk = 0;
1716 c->mbargc = 0;
1717 c->mbargv = NULL;
1718 c->sentlen = 0;
1719 c->flags = 0;
1720 c->lastinteraction = time(NULL);
1721 c->authenticated = 0;
1722 c->replstate = REDIS_REPL_NONE;
1723 c->reply = listCreate();
1724 listSetFreeMethod(c->reply,decrRefCount);
1725 listSetDupMethod(c->reply,dupClientReplyValue);
1726 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1727 readQueryFromClient, c, NULL) == AE_ERR) {
1728 freeClient(c);
1729 return NULL;
1730 }
1731 listAddNodeTail(server.clients,c);
1732 return c;
1733 }
1734
1735 static void addReply(redisClient *c, robj *obj) {
1736 if (listLength(c->reply) == 0 &&
1737 (c->replstate == REDIS_REPL_NONE ||
1738 c->replstate == REDIS_REPL_ONLINE) &&
1739 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1740 sendReplyToClient, c, NULL) == AE_ERR) return;
1741 if (obj->encoding != REDIS_ENCODING_RAW) {
1742 obj = getDecodedObject(obj);
1743 } else {
1744 incrRefCount(obj);
1745 }
1746 listAddNodeTail(c->reply,obj);
1747 }
1748
1749 static void addReplySds(redisClient *c, sds s) {
1750 robj *o = createObject(REDIS_STRING,s);
1751 addReply(c,o);
1752 decrRefCount(o);
1753 }
1754
1755 static void addReplyBulkLen(redisClient *c, robj *obj) {
1756 size_t len;
1757
1758 if (obj->encoding == REDIS_ENCODING_RAW) {
1759 len = sdslen(obj->ptr);
1760 } else {
1761 long n = (long)obj->ptr;
1762
1763 len = 1;
1764 if (n < 0) {
1765 len++;
1766 n = -n;
1767 }
1768 while((n = n/10) != 0) {
1769 len++;
1770 }
1771 }
1772 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1773 }
1774
1775 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1776 int cport, cfd;
1777 char cip[128];
1778 redisClient *c;
1779 REDIS_NOTUSED(el);
1780 REDIS_NOTUSED(mask);
1781 REDIS_NOTUSED(privdata);
1782
1783 cfd = anetAccept(server.neterr, fd, cip, &cport);
1784 if (cfd == AE_ERR) {
1785 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1786 return;
1787 }
1788 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1789 if ((c = createClient(cfd)) == NULL) {
1790 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1791 close(cfd); /* May be already closed, just ingore errors */
1792 return;
1793 }
1794 /* If maxclient directive is set and this is one client more... close the
1795 * connection. Note that we create the client instead to check before
1796 * for this condition, since now the socket is already set in nonblocking
1797 * mode and we can send an error for free using the Kernel I/O */
1798 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1799 char *err = "-ERR max number of clients reached\r\n";
1800
1801 /* That's a best effort error message, don't check write errors */
1802 (void) write(c->fd,err,strlen(err));
1803 freeClient(c);
1804 return;
1805 }
1806 server.stat_numconnections++;
1807 }
1808
1809 /* ======================= Redis objects implementation ===================== */
1810
1811 static robj *createObject(int type, void *ptr) {
1812 robj *o;
1813
1814 if (listLength(server.objfreelist)) {
1815 listNode *head = listFirst(server.objfreelist);
1816 o = listNodeValue(head);
1817 listDelNode(server.objfreelist,head);
1818 } else {
1819 o = zmalloc(sizeof(*o));
1820 }
1821 o->type = type;
1822 o->encoding = REDIS_ENCODING_RAW;
1823 o->ptr = ptr;
1824 o->refcount = 1;
1825 return o;
1826 }
1827
1828 static robj *createStringObject(char *ptr, size_t len) {
1829 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1830 }
1831
1832 static robj *createListObject(void) {
1833 list *l = listCreate();
1834
1835 listSetFreeMethod(l,decrRefCount);
1836 return createObject(REDIS_LIST,l);
1837 }
1838
1839 static robj *createSetObject(void) {
1840 dict *d = dictCreate(&setDictType,NULL);
1841 return createObject(REDIS_SET,d);
1842 }
1843
1844 static robj *createZsetObject(void) {
1845 zset *zs = zmalloc(sizeof(*zs));
1846
1847 zs->dict = dictCreate(&zsetDictType,NULL);
1848 zs->zsl = zslCreate();
1849 return createObject(REDIS_ZSET,zs);
1850 }
1851
1852 static void freeStringObject(robj *o) {
1853 if (o->encoding == REDIS_ENCODING_RAW) {
1854 sdsfree(o->ptr);
1855 }
1856 }
1857
1858 static void freeListObject(robj *o) {
1859 listRelease((list*) o->ptr);
1860 }
1861
1862 static void freeSetObject(robj *o) {
1863 dictRelease((dict*) o->ptr);
1864 }
1865
1866 static void freeHashObject(robj *o) {
1867 dictRelease((dict*) o->ptr);
1868 }
1869
1870 static void incrRefCount(robj *o) {
1871 o->refcount++;
1872 #ifdef DEBUG_REFCOUNT
1873 if (o->type == REDIS_STRING)
1874 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1875 #endif
1876 }
1877
1878 static void decrRefCount(void *obj) {
1879 robj *o = obj;
1880
1881 #ifdef DEBUG_REFCOUNT
1882 if (o->type == REDIS_STRING)
1883 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1884 #endif
1885 if (--(o->refcount) == 0) {
1886 switch(o->type) {
1887 case REDIS_STRING: freeStringObject(o); break;
1888 case REDIS_LIST: freeListObject(o); break;
1889 case REDIS_SET: freeSetObject(o); break;
1890 case REDIS_HASH: freeHashObject(o); break;
1891 default: assert(0 != 0); break;
1892 }
1893 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1894 !listAddNodeHead(server.objfreelist,o))
1895 zfree(o);
1896 }
1897 }
1898
1899 static robj *lookupKey(redisDb *db, robj *key) {
1900 dictEntry *de = dictFind(db->dict,key);
1901 return de ? dictGetEntryVal(de) : NULL;
1902 }
1903
1904 static robj *lookupKeyRead(redisDb *db, robj *key) {
1905 expireIfNeeded(db,key);
1906 return lookupKey(db,key);
1907 }
1908
1909 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1910 deleteIfVolatile(db,key);
1911 return lookupKey(db,key);
1912 }
1913
1914 static int deleteKey(redisDb *db, robj *key) {
1915 int retval;
1916
1917 /* We need to protect key from destruction: after the first dictDelete()
1918 * it may happen that 'key' is no longer valid if we don't increment
1919 * it's count. This may happen when we get the object reference directly
1920 * from the hash table with dictRandomKey() or dict iterators */
1921 incrRefCount(key);
1922 if (dictSize(db->expires)) dictDelete(db->expires,key);
1923 retval = dictDelete(db->dict,key);
1924 decrRefCount(key);
1925
1926 return retval == DICT_OK;
1927 }
1928
1929 /* Try to share an object against the shared objects pool */
1930 static robj *tryObjectSharing(robj *o) {
1931 struct dictEntry *de;
1932 unsigned long c;
1933
1934 if (o == NULL || server.shareobjects == 0) return o;
1935
1936 assert(o->type == REDIS_STRING);
1937 de = dictFind(server.sharingpool,o);
1938 if (de) {
1939 robj *shared = dictGetEntryKey(de);
1940
1941 c = ((unsigned long) dictGetEntryVal(de))+1;
1942 dictGetEntryVal(de) = (void*) c;
1943 incrRefCount(shared);
1944 decrRefCount(o);
1945 return shared;
1946 } else {
1947 /* Here we are using a stream algorihtm: Every time an object is
1948 * shared we increment its count, everytime there is a miss we
1949 * recrement the counter of a random object. If this object reaches
1950 * zero we remove the object and put the current object instead. */
1951 if (dictSize(server.sharingpool) >=
1952 server.sharingpoolsize) {
1953 de = dictGetRandomKey(server.sharingpool);
1954 assert(de != NULL);
1955 c = ((unsigned long) dictGetEntryVal(de))-1;
1956 dictGetEntryVal(de) = (void*) c;
1957 if (c == 0) {
1958 dictDelete(server.sharingpool,de->key);
1959 }
1960 } else {
1961 c = 0; /* If the pool is empty we want to add this object */
1962 }
1963 if (c == 0) {
1964 int retval;
1965
1966 retval = dictAdd(server.sharingpool,o,(void*)1);
1967 assert(retval == DICT_OK);
1968 incrRefCount(o);
1969 }
1970 return o;
1971 }
1972 }
1973
1974 /* Check if the nul-terminated string 's' can be represented by a long
1975 * (that is, is a number that fits into long without any other space or
1976 * character before or after the digits).
1977 *
1978 * If so, the function returns REDIS_OK and *longval is set to the value
1979 * of the number. Otherwise REDIS_ERR is returned */
1980 static int isStringRepresentableAsLong(sds s, long *longval) {
1981 char buf[32], *endptr;
1982 long value;
1983 int slen;
1984
1985 value = strtol(s, &endptr, 10);
1986 if (endptr[0] != '\0') return REDIS_ERR;
1987 slen = snprintf(buf,32,"%ld",value);
1988
1989 /* If the number converted back into a string is not identical
1990 * then it's not possible to encode the string as integer */
1991 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
1992 if (longval) *longval = value;
1993 return REDIS_OK;
1994 }
1995
1996 /* Try to encode a string object in order to save space */
1997 static int tryObjectEncoding(robj *o) {
1998 long value;
1999 sds s = o->ptr;
2000
2001 if (o->encoding != REDIS_ENCODING_RAW)
2002 return REDIS_ERR; /* Already encoded */
2003
2004 /* It's not save to encode shared objects: shared objects can be shared
2005 * everywhere in the "object space" of Redis. Encoded objects can only
2006 * appear as "values" (and not, for instance, as keys) */
2007 if (o->refcount > 1) return REDIS_ERR;
2008
2009 /* Currently we try to encode only strings */
2010 assert(o->type == REDIS_STRING);
2011
2012 /* Check if we can represent this string as a long integer */
2013 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2014
2015 /* Ok, this object can be encoded */
2016 o->encoding = REDIS_ENCODING_INT;
2017 sdsfree(o->ptr);
2018 o->ptr = (void*) value;
2019 return REDIS_OK;
2020 }
2021
2022 /* Get a decoded version of an encoded object (returned as a new object) */
2023 static robj *getDecodedObject(const robj *o) {
2024 robj *dec;
2025
2026 assert(o->encoding != REDIS_ENCODING_RAW);
2027 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2028 char buf[32];
2029
2030 snprintf(buf,32,"%ld",(long)o->ptr);
2031 dec = createStringObject(buf,strlen(buf));
2032 return dec;
2033 } else {
2034 assert(1 != 1);
2035 }
2036 }
2037
2038 static int compareStringObjects(robj *a, robj *b) {
2039 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2040
2041 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2042 return (long)a->ptr - (long)b->ptr;
2043 } else {
2044 int retval;
2045
2046 incrRefCount(a);
2047 incrRefCount(b);
2048 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2049 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2050 retval = sdscmp(a->ptr,b->ptr);
2051 decrRefCount(a);
2052 decrRefCount(b);
2053 return retval;
2054 }
2055 }
2056
2057 static size_t stringObjectLen(robj *o) {
2058 assert(o->type == REDIS_STRING);
2059 if (o->encoding == REDIS_ENCODING_RAW) {
2060 return sdslen(o->ptr);
2061 } else {
2062 char buf[32];
2063
2064 return snprintf(buf,32,"%ld",(long)o->ptr);
2065 }
2066 }
2067
2068 /*============================ DB saving/loading ============================ */
2069
/* Write the one byte 'type' to 'fp'. Returns 0 on success, -1 if nothing
 * could be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2074
/* Write 't' to 'fp' as a 4 byte int32_t in host byte order. Returns 0 on
 * success, -1 if nothing could be written. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2080
2081 /* check rdbLoadLen() comments for more info */
2082 static int rdbSaveLen(FILE *fp, uint32_t len) {
2083 unsigned char buf[2];
2084
2085 if (len < (1<<6)) {
2086 /* Save a 6 bit len */
2087 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2088 if (fwrite(buf,1,1,fp) == 0) return -1;
2089 } else if (len < (1<<14)) {
2090 /* Save a 14 bit len */
2091 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2092 buf[1] = len&0xFF;
2093 if (fwrite(buf,2,1,fp) == 0) return -1;
2094 } else {
2095 /* Save a 32 bit len */
2096 buf[0] = (REDIS_RDB_32BITLEN<<6);
2097 if (fwrite(buf,1,1,fp) == 0) return -1;
2098 len = htonl(len);
2099 if (fwrite(&len,4,1,fp) == 0) return -1;
2100 }
2101 return 0;
2102 }
2103
2104 /* String objects in the form "2391" "-100" without any space and with a
2105 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2106 * encoded as integers to save space */
2107 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2108 long long value;
2109 char *endptr, buf[32];
2110
2111 /* Check if it's possible to encode this value as a number */
2112 value = strtoll(s, &endptr, 10);
2113 if (endptr[0] != '\0') return 0;
2114 snprintf(buf,32,"%lld",value);
2115
2116 /* If the number converted back into a string is not identical
2117 * then it's not possible to encode the string as integer */
2118 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2119
2120 /* Finally check if it fits in our ranges */
2121 if (value >= -(1<<7) && value <= (1<<7)-1) {
2122 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2123 enc[1] = value&0xFF;
2124 return 2;
2125 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2126 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2127 enc[1] = value&0xFF;
2128 enc[2] = (value>>8)&0xFF;
2129 return 3;
2130 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2131 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2132 enc[1] = value&0xFF;
2133 enc[2] = (value>>8)&0xFF;
2134 enc[3] = (value>>16)&0xFF;
2135 enc[4] = (value>>24)&0xFF;
2136 return 5;
2137 } else {
2138 return 0;
2139 }
2140 }
2141
2142 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2143 unsigned int comprlen, outlen;
2144 unsigned char byte;
2145 void *out;
2146
2147 /* We require at least four bytes compression for this to be worth it */
2148 outlen = sdslen(obj->ptr)-4;
2149 if (outlen <= 0) return 0;
2150 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2151 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2152 if (comprlen == 0) {
2153 zfree(out);
2154 return 0;
2155 }
2156 /* Data compressed! Let's save it on disk */
2157 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2158 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2159 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2160 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2161 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2162 zfree(out);
2163 return comprlen;
2164
2165 writeerr:
2166 zfree(out);
2167 return -1;
2168 }
2169
2170 /* Save a string objet as [len][data] on disk. If the object is a string
2171 * representation of an integer value we try to safe it in a special form */
2172 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2173 size_t len;
2174 int enclen;
2175
2176 len = sdslen(obj->ptr);
2177
2178 /* Try integer encoding */
2179 if (len <= 11) {
2180 unsigned char buf[5];
2181 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2182 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2183 return 0;
2184 }
2185 }
2186
2187 /* Try LZF compression - under 20 bytes it's unable to compress even
2188 * aaaaaaaaaaaaaaaaaa so skip it */
2189 if (len > 20) {
2190 int retval;
2191
2192 retval = rdbSaveLzfStringObject(fp,obj);
2193 if (retval == -1) return -1;
2194 if (retval > 0) return 0;
2195 /* retval == 0 means data can't be compressed, save the old way */
2196 }
2197
2198 /* Store verbatim */
2199 if (rdbSaveLen(fp,len) == -1) return -1;
2200 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2201 return 0;
2202 }
2203
2204 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2205 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2206 int retval;
2207 robj *dec;
2208
2209 if (obj->encoding != REDIS_ENCODING_RAW) {
2210 dec = getDecodedObject(obj);
2211 retval = rdbSaveStringObjectRaw(fp,dec);
2212 decrRefCount(dec);
2213 return retval;
2214 } else {
2215 return rdbSaveStringObjectRaw(fp,obj);
2216 }
2217 }
2218
2219 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2220 static int rdbSave(char *filename) {
2221 dictIterator *di = NULL;
2222 dictEntry *de;
2223 FILE *fp;
2224 char tmpfile[256];
2225 int j;
2226 time_t now = time(NULL);
2227
2228 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2229 fp = fopen(tmpfile,"w");
2230 if (!fp) {
2231 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2232 return REDIS_ERR;
2233 }
2234 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2235 for (j = 0; j < server.dbnum; j++) {
2236 redisDb *db = server.db+j;
2237 dict *d = db->dict;
2238 if (dictSize(d) == 0) continue;
2239 di = dictGetIterator(d);
2240 if (!di) {
2241 fclose(fp);
2242 return REDIS_ERR;
2243 }
2244
2245 /* Write the SELECT DB opcode */
2246 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2247 if (rdbSaveLen(fp,j) == -1) goto werr;
2248
2249 /* Iterate this DB writing every entry */
2250 while((de = dictNext(di)) != NULL) {
2251 robj *key = dictGetEntryKey(de);
2252 robj *o = dictGetEntryVal(de);
2253 time_t expiretime = getExpire(db,key);
2254
2255 /* Save the expire time */
2256 if (expiretime != -1) {
2257 /* If this key is already expired skip it */
2258 if (expiretime < now) continue;
2259 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2260 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2261 }
2262 /* Save the key and associated value */
2263 if (rdbSaveType(fp,o->type) == -1) goto werr;
2264 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2265 if (o->type == REDIS_STRING) {
2266 /* Save a string value */
2267 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2268 } else if (o->type == REDIS_LIST) {
2269 /* Save a list value */
2270 list *list = o->ptr;
2271 listNode *ln;
2272
2273 listRewind(list);
2274 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2275 while((ln = listYield(list))) {
2276 robj *eleobj = listNodeValue(ln);
2277
2278 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2279 }
2280 } else if (o->type == REDIS_SET) {
2281 /* Save a set value */
2282 dict *set = o->ptr;
2283 dictIterator *di = dictGetIterator(set);
2284 dictEntry *de;
2285
2286 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2287 while((de = dictNext(di)) != NULL) {
2288 robj *eleobj = dictGetEntryKey(de);
2289
2290 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2291 }
2292 dictReleaseIterator(di);
2293 } else {
2294 assert(0 != 0);
2295 }
2296 }
2297 dictReleaseIterator(di);
2298 }
2299 /* EOF opcode */
2300 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2301
2302 /* Make sure data will not remain on the OS's output buffers */
2303 fflush(fp);
2304 fsync(fileno(fp));
2305 fclose(fp);
2306
2307 /* Use RENAME to make sure the DB file is changed atomically only
2308 * if the generate DB file is ok. */
2309 if (rename(tmpfile,filename) == -1) {
2310 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2311 unlink(tmpfile);
2312 return REDIS_ERR;
2313 }
2314 redisLog(REDIS_NOTICE,"DB saved on disk");
2315 server.dirty = 0;
2316 server.lastsave = time(NULL);
2317 return REDIS_OK;
2318
2319 werr:
2320 fclose(fp);
2321 unlink(tmpfile);
2322 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2323 if (di) dictReleaseIterator(di);
2324 return REDIS_ERR;
2325 }
2326
2327 static int rdbSaveBackground(char *filename) {
2328 pid_t childpid;
2329
2330 if (server.bgsaveinprogress) return REDIS_ERR;
2331 if ((childpid = fork()) == 0) {
2332 /* Child */
2333 close(server.fd);
2334 if (rdbSave(filename) == REDIS_OK) {
2335 exit(0);
2336 } else {
2337 exit(1);
2338 }
2339 } else {
2340 /* Parent */
2341 if (childpid == -1) {
2342 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2343 strerror(errno));
2344 return REDIS_ERR;
2345 }
2346 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2347 server.bgsaveinprogress = 1;
2348 server.bgsavechildpid = childpid;
2349 return REDIS_OK;
2350 }
2351 return REDIS_OK; /* unreached */
2352 }
2353
/* Remove the temporary dump file "temp-<childpid>.rdb", i.e. the name
 * rdbSave() uses when running in the process with that pid. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2360
/* Read a one byte type from 'fp'. Returns the byte as a non negative int,
 * or -1 if it could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
2366
/* Read a 4 byte int32_t (host byte order) from 'fp' and return it as a
 * time_t, or -1 if it could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) != 0) return (time_t) stamp;
    return -1;
}
2372
2373 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2374 * of this file for a description of how this are stored on disk.
2375 *
2376 * isencoded is set to 1 if the readed length is not actually a length but
2377 * an "encoding type", check the above comments for more info */
2378 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2379 unsigned char buf[2];
2380 uint32_t len;
2381
2382 if (isencoded) *isencoded = 0;
2383 if (rdbver == 0) {
2384 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2385 return ntohl(len);
2386 } else {
2387 int type;
2388
2389 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2390 type = (buf[0]&0xC0)>>6;
2391 if (type == REDIS_RDB_6BITLEN) {
2392 /* Read a 6 bit len */
2393 return buf[0]&0x3F;
2394 } else if (type == REDIS_RDB_ENCVAL) {
2395 /* Read a 6 bit len encoding type */
2396 if (isencoded) *isencoded = 1;
2397 return buf[0]&0x3F;
2398 } else if (type == REDIS_RDB_14BITLEN) {
2399 /* Read a 14 bit len */
2400 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2401 return ((buf[0]&0x3F)<<8)|buf[1];
2402 } else {
2403 /* Read a 32 bit len */
2404 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2405 return ntohl(len);
2406 }
2407 }
2408 }
2409
2410 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2411 unsigned char enc[4];
2412 long long val;
2413
2414 if (enctype == REDIS_RDB_ENC_INT8) {
2415 if (fread(enc,1,1,fp) == 0) return NULL;
2416 val = (signed char)enc[0];
2417 } else if (enctype == REDIS_RDB_ENC_INT16) {
2418 uint16_t v;
2419 if (fread(enc,2,1,fp) == 0) return NULL;
2420 v = enc[0]|(enc[1]<<8);
2421 val = (int16_t)v;
2422 } else if (enctype == REDIS_RDB_ENC_INT32) {
2423 uint32_t v;
2424 if (fread(enc,4,1,fp) == 0) return NULL;
2425 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2426 val = (int32_t)v;
2427 } else {
2428 val = 0; /* anti-warning */
2429 assert(0!=0);
2430 }
2431 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2432 }
2433
2434 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2435 unsigned int len, clen;
2436 unsigned char *c = NULL;
2437 sds val = NULL;
2438
2439 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2440 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2441 if ((c = zmalloc(clen)) == NULL) goto err;
2442 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2443 if (fread(c,clen,1,fp) == 0) goto err;
2444 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2445 zfree(c);
2446 return createObject(REDIS_STRING,val);
2447 err:
2448 zfree(c);
2449 sdsfree(val);
2450 return NULL;
2451 }
2452
2453 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2454 int isencoded;
2455 uint32_t len;
2456 sds val;
2457
2458 len = rdbLoadLen(fp,rdbver,&isencoded);
2459 if (isencoded) {
2460 switch(len) {
2461 case REDIS_RDB_ENC_INT8:
2462 case REDIS_RDB_ENC_INT16:
2463 case REDIS_RDB_ENC_INT32:
2464 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2465 case REDIS_RDB_ENC_LZF:
2466 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2467 default:
2468 assert(0!=0);
2469 }
2470 }
2471
2472 if (len == REDIS_RDB_LENERR) return NULL;
2473 val = sdsnewlen(NULL,len);
2474 if (len && fread(val,len,1,fp) == 0) {
2475 sdsfree(val);
2476 return NULL;
2477 }
2478 return tryObjectSharing(createObject(REDIS_STRING,val));
2479 }
2480
/* Load the dataset stored in 'filename' into the server databases.
 *
 * Returns REDIS_ERR if the file can't be opened, if it does not start with
 * the "REDIS" signature, or if its version is greater than 1. Returns
 * REDIS_OK once the EOF opcode is read. A short read, a duplicated key or
 * a SELECT DB opcode with an out of range id make the process exit(1). */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;
    redisDb *db = server.db+0;
    char buf[1024];
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* The header is 9 bytes: "REDIS" followed by the version digits */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            /* The expire time is remembered and applied to the key that
             * follows, see the end of the loop body. */
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            /* Following keys are loaded into the selected DB */
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    listAddNodeTail((list*)o->ptr,ele);
                } else {
                    dictAdd((dict*)o->ptr,ele,NULL);
                }
            }
        } else {
            assert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            expiretime = -1;
        }
        /* Both objects are now referenced by the dict (or already deleted):
         * forget them so the error path does not release the key. */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2582
2583 /*================================== Commands =============================== */
2584
2585 static void authCommand(redisClient *c) {
2586 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2587 c->authenticated = 1;
2588 addReply(c,shared.ok);
2589 } else {
2590 c->authenticated = 0;
2591 addReply(c,shared.err);
2592 }
2593 }
2594
/* PING: reply with the shared pong object. */
static void pingCommand(redisClient *c) {
    addReply(c,shared.pong);
}
2598
2599 static void echoCommand(redisClient *c) {
2600 addReplyBulkLen(c,c->argv[1]);
2601 addReply(c,c->argv[1]);
2602 addReply(c,shared.crlf);
2603 }
2604
2605 /*=================================== Strings =============================== */
2606
2607 static void setGenericCommand(redisClient *c, int nx) {
2608 int retval;
2609
2610 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2611 if (retval == DICT_ERR) {
2612 if (!nx) {
2613 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2614 incrRefCount(c->argv[2]);
2615 } else {
2616 addReply(c,shared.czero);
2617 return;
2618 }
2619 } else {
2620 incrRefCount(c->argv[1]);
2621 incrRefCount(c->argv[2]);
2622 }
2623 server.dirty++;
2624 removeExpire(c->db,c->argv[1]);
2625 addReply(c, nx ? shared.cone : shared.ok);
2626 }
2627
/* SET: setGenericCommand() with nx = 0 (overwrite existing keys). */
static void setCommand(redisClient *c) {
    setGenericCommand(c,0);
}
2631
/* SETNX: setGenericCommand() with nx = 1 (set only if the key is new). */
static void setnxCommand(redisClient *c) {
    setGenericCommand(c,1);
}
2635
2636 static void getCommand(redisClient *c) {
2637 robj *o = lookupKeyRead(c->db,c->argv[1]);
2638
2639 if (o == NULL) {
2640 addReply(c,shared.nullbulk);
2641 } else {
2642 if (o->type != REDIS_STRING) {
2643 addReply(c,shared.wrongtypeerr);
2644 } else {
2645 addReplyBulkLen(c,o);
2646 addReply(c,o);
2647 addReply(c,shared.crlf);
2648 }
2649 }
2650 }
2651
2652 static void getsetCommand(redisClient *c) {
2653 getCommand(c);
2654 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2655 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2656 } else {
2657 incrRefCount(c->argv[1]);
2658 }
2659 incrRefCount(c->argv[2]);
2660 server.dirty++;
2661 removeExpire(c->db,c->argv[1]);
2662 }
2663
2664 static void mgetCommand(redisClient *c) {
2665 int j;
2666
2667 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2668 for (j = 1; j < c->argc; j++) {
2669 robj *o = lookupKeyRead(c->db,c->argv[j]);
2670 if (o == NULL) {
2671 addReply(c,shared.nullbulk);
2672 } else {
2673 if (o->type != REDIS_STRING) {
2674 addReply(c,shared.nullbulk);
2675 } else {
2676 addReplyBulkLen(c,o);
2677 addReply(c,o);
2678 addReply(c,shared.crlf);
2679 }
2680 }
2681 }
2682 }
2683
2684 static void incrDecrCommand(redisClient *c, long long incr) {
2685 long long value;
2686 int retval;
2687 robj *o;
2688
2689 o = lookupKeyWrite(c->db,c->argv[1]);
2690 if (o == NULL) {
2691 value = 0;
2692 } else {
2693 if (o->type != REDIS_STRING) {
2694 value = 0;
2695 } else {
2696 char *eptr;
2697
2698 if (o->encoding == REDIS_ENCODING_RAW)
2699 value = strtoll(o->ptr, &eptr, 10);
2700 else if (o->encoding == REDIS_ENCODING_INT)
2701 value = (long)o->ptr;
2702 else
2703 assert(1 != 1);
2704 }
2705 }
2706
2707 value += incr;
2708 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2709 tryObjectEncoding(o);
2710 retval = dictAdd(c->db->dict,c->argv[1],o);
2711 if (retval == DICT_ERR) {
2712 dictReplace(c->db->dict,c->argv[1],o);
2713 removeExpire(c->db,c->argv[1]);
2714 } else {
2715 incrRefCount(c->argv[1]);
2716 }
2717 server.dirty++;
2718 addReply(c,shared.colon);
2719 addReply(c,o);
2720 addReply(c,shared.crlf);
2721 }
2722
/* INCR <key>: incrDecrCommand() with an increment of 1. */
static void incrCommand(redisClient *c) {
    incrDecrCommand(c,1);
}
2726
/* DECR <key>: incrDecrCommand() with an increment of -1. */
static void decrCommand(redisClient *c) {
    incrDecrCommand(c,-1);
}
2730
2731 static void incrbyCommand(redisClient *c) {
2732 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2733 incrDecrCommand(c,incr);
2734 }
2735
2736 static void decrbyCommand(redisClient *c) {
2737 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2738 incrDecrCommand(c,-incr);
2739 }
2740
2741 /* ========================= Type agnostic commands ========================= */
2742
2743 static void delCommand(redisClient *c) {
2744 int deleted = 0, j;
2745
2746 for (j = 1; j < c->argc; j++) {
2747 if (deleteKey(c->db,c->argv[j])) {
2748 server.dirty++;
2749 deleted++;
2750 }
2751 }
2752 switch(deleted) {
2753 case 0:
2754 addReply(c,shared.czero);
2755 break;
2756 case 1:
2757 addReply(c,shared.cone);
2758 break;
2759 default:
2760 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2761 break;
2762 }
2763 }
2764
2765 static void existsCommand(redisClient *c) {
2766 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2767 }
2768
2769 static void selectCommand(redisClient *c) {
2770 int id = atoi(c->argv[1]->ptr);
2771
2772 if (selectDb(c,id) == REDIS_ERR) {
2773 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2774 } else {
2775 addReply(c,shared.ok);
2776 }
2777 }
2778
2779 static void randomkeyCommand(redisClient *c) {
2780 dictEntry *de;
2781
2782 while(1) {
2783 de = dictGetRandomKey(c->db->dict);
2784 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2785 }
2786 if (de == NULL) {
2787 addReply(c,shared.plus);
2788 addReply(c,shared.crlf);
2789 } else {
2790 addReply(c,shared.plus);
2791 addReply(c,dictGetEntryKey(de));
2792 addReply(c,shared.crlf);
2793 }
2794 }
2795
2796 static void keysCommand(redisClient *c) {
2797 dictIterator *di;
2798 dictEntry *de;
2799 sds pattern = c->argv[1]->ptr;
2800 int plen = sdslen(pattern);
2801 int numkeys = 0, keyslen = 0;
2802 robj *lenobj = createObject(REDIS_STRING,NULL);
2803
2804 di = dictGetIterator(c->db->dict);
2805 addReply(c,lenobj);
2806 decrRefCount(lenobj);
2807 while((de = dictNext(di)) != NULL) {
2808 robj *keyobj = dictGetEntryKey(de);
2809
2810 sds key = keyobj->ptr;
2811 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2812 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2813 if (expireIfNeeded(c->db,keyobj) == 0) {
2814 if (numkeys != 0)
2815 addReply(c,shared.space);
2816 addReply(c,keyobj);
2817 numkeys++;
2818 keyslen += sdslen(key);
2819 }
2820 }
2821 }
2822 dictReleaseIterator(di);
2823 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2824 addReply(c,shared.crlf);
2825 }
2826
2827 static void dbsizeCommand(redisClient *c) {
2828 addReplySds(c,
2829 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2830 }
2831
2832 static void lastsaveCommand(redisClient *c) {
2833 addReplySds(c,
2834 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2835 }
2836
2837 static void typeCommand(redisClient *c) {
2838 robj *o;
2839 char *type;
2840
2841 o = lookupKeyRead(c->db,c->argv[1]);
2842 if (o == NULL) {
2843 type = "+none";
2844 } else {
2845 switch(o->type) {
2846 case REDIS_STRING: type = "+string"; break;
2847 case REDIS_LIST: type = "+list"; break;
2848 case REDIS_SET: type = "+set"; break;
2849 default: type = "unknown"; break;
2850 }
2851 }
2852 addReplySds(c,sdsnew(type));
2853 addReply(c,shared.crlf);
2854 }
2855
2856 static void saveCommand(redisClient *c) {
2857 if (server.bgsaveinprogress) {
2858 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2859 return;
2860 }
2861 if (rdbSave(server.dbfilename) == REDIS_OK) {
2862 addReply(c,shared.ok);
2863 } else {
2864 addReply(c,shared.err);
2865 }
2866 }
2867
2868 static void bgsaveCommand(redisClient *c) {
2869 if (server.bgsaveinprogress) {
2870 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2871 return;
2872 }
2873 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2874 addReply(c,shared.ok);
2875 } else {
2876 addReply(c,shared.err);
2877 }
2878 }
2879
2880 static void shutdownCommand(redisClient *c) {
2881 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2882 /* Kill the saving child if there is a background saving in progress.
2883 We want to avoid race conditions, for instance our saving child may
2884 overwrite the synchronous saving did by SHUTDOWN. */
2885 if (server.bgsaveinprogress) {
2886 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2887 kill(server.bgsavechildpid,SIGKILL);
2888 rdbRemoveTempFile(server.bgsavechildpid);
2889 }
2890 /* SYNC SAVE */
2891 if (rdbSave(server.dbfilename) == REDIS_OK) {
2892 if (server.daemonize)
2893 unlink(server.pidfile);
2894 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2895 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2896 exit(1);
2897 } else {
2898 /* Ooops.. error saving! The best we can do is to continue operating.
2899 * Note that if there was a background saving process, in the next
2900 * cron() Redis will be notified that the background saving aborted,
2901 * handling special stuff like slaves pending for synchronization... */
2902 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2903 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2904 }
2905 }
2906
2907 static void renameGenericCommand(redisClient *c, int nx) {
2908 robj *o;
2909
2910 /* To use the same key as src and dst is probably an error */
2911 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2912 addReply(c,shared.sameobjecterr);
2913 return;
2914 }
2915
2916 o = lookupKeyWrite(c->db,c->argv[1]);
2917 if (o == NULL) {
2918 addReply(c,shared.nokeyerr);
2919 return;
2920 }
2921 incrRefCount(o);
2922 deleteIfVolatile(c->db,c->argv[2]);
2923 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2924 if (nx) {
2925 decrRefCount(o);
2926 addReply(c,shared.czero);
2927 return;
2928 }
2929 dictReplace(c->db->dict,c->argv[2],o);
2930 } else {
2931 incrRefCount(c->argv[2]);
2932 }
2933 deleteKey(c->db,c->argv[1]);
2934 server.dirty++;
2935 addReply(c,nx ? shared.cone : shared.ok);
2936 }
2937
/* RENAME: renameGenericCommand() with nx = 0 (overwrite the destination). */
static void renameCommand(redisClient *c) {
    renameGenericCommand(c,0);
}
2941
/* RENAMENX: renameGenericCommand() with nx = 1 (fail if the destination
 * key already exists). */
static void renamenxCommand(redisClient *c) {
    renameGenericCommand(c,1);
}
2945
2946 static void moveCommand(redisClient *c) {
2947 robj *o;
2948 redisDb *src, *dst;
2949 int srcid;
2950
2951 /* Obtain source and target DB pointers */
2952 src = c->db;
2953 srcid = c->db->id;
2954 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2955 addReply(c,shared.outofrangeerr);
2956 return;
2957 }
2958 dst = c->db;
2959 selectDb(c,srcid); /* Back to the source DB */
2960
2961 /* If the user is moving using as target the same
2962 * DB as the source DB it is probably an error. */
2963 if (src == dst) {
2964 addReply(c,shared.sameobjecterr);
2965 return;
2966 }
2967
2968 /* Check if the element exists and get a reference */
2969 o = lookupKeyWrite(c->db,c->argv[1]);
2970 if (!o) {
2971 addReply(c,shared.czero);
2972 return;
2973 }
2974
2975 /* Try to add the element to the target DB */
2976 deleteIfVolatile(dst,c->argv[1]);
2977 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2978 addReply(c,shared.czero);
2979 return;
2980 }
2981 incrRefCount(c->argv[1]);
2982 incrRefCount(o);
2983
2984 /* OK! key moved, free the entry in the source DB */
2985 deleteKey(src,c->argv[1]);
2986 server.dirty++;
2987 addReply(c,shared.cone);
2988 }
2989
2990 /* =================================== Lists ================================ */
2991 static void pushGenericCommand(redisClient *c, int where) {
2992 robj *lobj;
2993 list *list;
2994
2995 lobj = lookupKeyWrite(c->db,c->argv[1]);
2996 if (lobj == NULL) {
2997 lobj = createListObject();
2998 list = lobj->ptr;
2999 if (where == REDIS_HEAD) {
3000 listAddNodeHead(list,c->argv[2]);
3001 } else {
3002 listAddNodeTail(list,c->argv[2]);
3003 }
3004 dictAdd(c->db->dict,c->argv[1],lobj);
3005 incrRefCount(c->argv[1]);
3006 incrRefCount(c->argv[2]);
3007 } else {
3008 if (lobj->type != REDIS_LIST) {
3009 addReply(c,shared.wrongtypeerr);
3010 return;
3011 }
3012 list = lobj->ptr;
3013 if (where == REDIS_HEAD) {
3014 listAddNodeHead(list,c->argv[2]);
3015 } else {
3016 listAddNodeTail(list,c->argv[2]);
3017 }
3018 incrRefCount(c->argv[2]);
3019 }
3020 server.dirty++;
3021 addReply(c,shared.ok);
3022 }
3023
3024 static void lpushCommand(redisClient *c) {
3025 pushGenericCommand(c,REDIS_HEAD);
3026 }
3027
3028 static void rpushCommand(redisClient *c) {
3029 pushGenericCommand(c,REDIS_TAIL);
3030 }
3031
3032 static void llenCommand(redisClient *c) {
3033 robj *o;
3034 list *l;
3035
3036 o = lookupKeyRead(c->db,c->argv[1]);
3037 if (o == NULL) {
3038 addReply(c,shared.czero);
3039 return;
3040 } else {
3041 if (o->type != REDIS_LIST) {
3042 addReply(c,shared.wrongtypeerr);
3043 } else {
3044 l = o->ptr;
3045 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3046 }
3047 }
3048 }
3049
3050 static void lindexCommand(redisClient *c) {
3051 robj *o;
3052 int index = atoi(c->argv[2]->ptr);
3053
3054 o = lookupKeyRead(c->db,c->argv[1]);
3055 if (o == NULL) {
3056 addReply(c,shared.nullbulk);
3057 } else {
3058 if (o->type != REDIS_LIST) {
3059 addReply(c,shared.wrongtypeerr);
3060 } else {
3061 list *list = o->ptr;
3062 listNode *ln;
3063
3064 ln = listIndex(list, index);
3065 if (ln == NULL) {
3066 addReply(c,shared.nullbulk);
3067 } else {
3068 robj *ele = listNodeValue(ln);
3069 addReplyBulkLen(c,ele);
3070 addReply(c,ele);
3071 addReply(c,shared.crlf);
3072 }
3073 }
3074 }
3075 }
3076
3077 static void lsetCommand(redisClient *c) {
3078 robj *o;
3079 int index = atoi(c->argv[2]->ptr);
3080
3081 o = lookupKeyWrite(c->db,c->argv[1]);
3082 if (o == NULL) {
3083 addReply(c,shared.nokeyerr);
3084 } else {
3085 if (o->type != REDIS_LIST) {
3086 addReply(c,shared.wrongtypeerr);
3087 } else {
3088 list *list = o->ptr;
3089 listNode *ln;
3090
3091 ln = listIndex(list, index);
3092 if (ln == NULL) {
3093 addReply(c,shared.outofrangeerr);
3094 } else {
3095 robj *ele = listNodeValue(ln);
3096
3097 decrRefCount(ele);
3098 listNodeValue(ln) = c->argv[3];
3099 incrRefCount(c->argv[3]);
3100 addReply(c,shared.ok);
3101 server.dirty++;
3102 }
3103 }
3104 }
3105 }
3106
3107 static void popGenericCommand(redisClient *c, int where) {
3108 robj *o;
3109
3110 o = lookupKeyWrite(c->db,c->argv[1]);
3111 if (o == NULL) {
3112 addReply(c,shared.nullbulk);
3113 } else {
3114 if (o->type != REDIS_LIST) {
3115 addReply(c,shared.wrongtypeerr);
3116 } else {
3117 list *list = o->ptr;
3118 listNode *ln;
3119
3120 if (where == REDIS_HEAD)
3121 ln = listFirst(list);
3122 else
3123 ln = listLast(list);
3124
3125 if (ln == NULL) {
3126 addReply(c,shared.nullbulk);
3127 } else {
3128 robj *ele = listNodeValue(ln);
3129 addReplyBulkLen(c,ele);
3130 addReply(c,ele);
3131 addReply(c,shared.crlf);
3132 listDelNode(list,ln);
3133 server.dirty++;
3134 }
3135 }
3136 }
3137 }
3138
3139 static void lpopCommand(redisClient *c) {
3140 popGenericCommand(c,REDIS_HEAD);
3141 }
3142
3143 static void rpopCommand(redisClient *c) {
3144 popGenericCommand(c,REDIS_TAIL);
3145 }
3146
3147 static void lrangeCommand(redisClient *c) {
3148 robj *o;
3149 int start = atoi(c->argv[2]->ptr);
3150 int end = atoi(c->argv[3]->ptr);
3151
3152 o = lookupKeyRead(c->db,c->argv[1]);
3153 if (o == NULL) {
3154 addReply(c,shared.nullmultibulk);
3155 } else {
3156 if (o->type != REDIS_LIST) {
3157 addReply(c,shared.wrongtypeerr);
3158 } else {
3159 list *list = o->ptr;
3160 listNode *ln;
3161 int llen = listLength(list);
3162 int rangelen, j;
3163 robj *ele;
3164
3165 /* convert negative indexes */
3166 if (start < 0) start = llen+start;
3167 if (end < 0) end = llen+end;
3168 if (start < 0) start = 0;
3169 if (end < 0) end = 0;
3170
3171 /* indexes sanity checks */
3172 if (start > end || start >= llen) {
3173 /* Out of range start or start > end result in empty list */
3174 addReply(c,shared.emptymultibulk);
3175 return;
3176 }
3177 if (end >= llen) end = llen-1;
3178 rangelen = (end-start)+1;
3179
3180 /* Return the result in form of a multi-bulk reply */
3181 ln = listIndex(list, start);
3182 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3183 for (j = 0; j < rangelen; j++) {
3184 ele = listNodeValue(ln);
3185 addReplyBulkLen(c,ele);
3186 addReply(c,ele);
3187 addReply(c,shared.crlf);
3188 ln = ln->next;
3189 }
3190 }
3191 }
3192 }
3193
3194 static void ltrimCommand(redisClient *c) {
3195 robj *o;
3196 int start = atoi(c->argv[2]->ptr);
3197 int end = atoi(c->argv[3]->ptr);
3198
3199 o = lookupKeyWrite(c->db,c->argv[1]);
3200 if (o == NULL) {
3201 addReply(c,shared.nokeyerr);
3202 } else {
3203 if (o->type != REDIS_LIST) {
3204 addReply(c,shared.wrongtypeerr);
3205 } else {
3206 list *list = o->ptr;
3207 listNode *ln;
3208 int llen = listLength(list);
3209 int j, ltrim, rtrim;
3210
3211 /* convert negative indexes */
3212 if (start < 0) start = llen+start;
3213 if (end < 0) end = llen+end;
3214 if (start < 0) start = 0;
3215 if (end < 0) end = 0;
3216
3217 /* indexes sanity checks */
3218 if (start > end || start >= llen) {
3219 /* Out of range start or start > end result in empty list */
3220 ltrim = llen;
3221 rtrim = 0;
3222 } else {
3223 if (end >= llen) end = llen-1;
3224 ltrim = start;
3225 rtrim = llen-end-1;
3226 }
3227
3228 /* Remove list elements to perform the trim */
3229 for (j = 0; j < ltrim; j++) {
3230 ln = listFirst(list);
3231 listDelNode(list,ln);
3232 }
3233 for (j = 0; j < rtrim; j++) {
3234 ln = listLast(list);
3235 listDelNode(list,ln);
3236 }
3237 server.dirty++;
3238 addReply(c,shared.ok);
3239 }
3240 }
3241 }
3242
3243 static void lremCommand(redisClient *c) {
3244 robj *o;
3245
3246 o = lookupKeyWrite(c->db,c->argv[1]);
3247 if (o == NULL) {
3248 addReply(c,shared.czero);
3249 } else {
3250 if (o->type != REDIS_LIST) {
3251 addReply(c,shared.wrongtypeerr);
3252 } else {
3253 list *list = o->ptr;
3254 listNode *ln, *next;
3255 int toremove = atoi(c->argv[2]->ptr);
3256 int removed = 0;
3257 int fromtail = 0;
3258
3259 if (toremove < 0) {
3260 toremove = -toremove;
3261 fromtail = 1;
3262 }
3263 ln = fromtail ? list->tail : list->head;
3264 while (ln) {
3265 robj *ele = listNodeValue(ln);
3266
3267 next = fromtail ? ln->prev : ln->next;
3268 if (compareStringObjects(ele,c->argv[3]) == 0) {
3269 listDelNode(list,ln);
3270 server.dirty++;
3271 removed++;
3272 if (toremove && removed == toremove) break;
3273 }
3274 ln = next;
3275 }
3276 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3277 }
3278 }
3279 }
3280
3281 /* ==================================== Sets ================================ */
3282
3283 static void saddCommand(redisClient *c) {
3284 robj *set;
3285
3286 set = lookupKeyWrite(c->db,c->argv[1]);
3287 if (set == NULL) {
3288 set = createSetObject();
3289 dictAdd(c->db->dict,c->argv[1],set);
3290 incrRefCount(c->argv[1]);
3291 } else {
3292 if (set->type != REDIS_SET) {
3293 addReply(c,shared.wrongtypeerr);
3294 return;
3295 }
3296 }
3297 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3298 incrRefCount(c->argv[2]);
3299 server.dirty++;
3300 addReply(c,shared.cone);
3301 } else {
3302 addReply(c,shared.czero);
3303 }
3304 }
3305
3306 static void sremCommand(redisClient *c) {
3307 robj *set;
3308
3309 set = lookupKeyWrite(c->db,c->argv[1]);
3310 if (set == NULL) {
3311 addReply(c,shared.czero);
3312 } else {
3313 if (set->type != REDIS_SET) {
3314 addReply(c,shared.wrongtypeerr);
3315 return;
3316 }
3317 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3318 server.dirty++;
3319 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3320 addReply(c,shared.cone);
3321 } else {
3322 addReply(c,shared.czero);
3323 }
3324 }
3325 }
3326
3327 static void smoveCommand(redisClient *c) {
3328 robj *srcset, *dstset;
3329
3330 srcset = lookupKeyWrite(c->db,c->argv[1]);
3331 dstset = lookupKeyWrite(c->db,c->argv[2]);
3332
3333 /* If the source key does not exist return 0, if it's of the wrong type
3334 * raise an error */
3335 if (srcset == NULL || srcset->type != REDIS_SET) {
3336 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3337 return;
3338 }
3339 /* Error if the destination key is not a set as well */
3340 if (dstset && dstset->type != REDIS_SET) {
3341 addReply(c,shared.wrongtypeerr);
3342 return;
3343 }
3344 /* Remove the element from the source set */
3345 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3346 /* Key not found in the src set! return zero */
3347 addReply(c,shared.czero);
3348 return;
3349 }
3350 server.dirty++;
3351 /* Add the element to the destination set */
3352 if (!dstset) {
3353 dstset = createSetObject();
3354 dictAdd(c->db->dict,c->argv[2],dstset);
3355 incrRefCount(c->argv[2]);
3356 }
3357 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3358 incrRefCount(c->argv[3]);
3359 addReply(c,shared.cone);
3360 }
3361
3362 static void sismemberCommand(redisClient *c) {
3363 robj *set;
3364
3365 set = lookupKeyRead(c->db,c->argv[1]);
3366 if (set == NULL) {
3367 addReply(c,shared.czero);
3368 } else {
3369 if (set->type != REDIS_SET) {
3370 addReply(c,shared.wrongtypeerr);
3371 return;
3372 }
3373 if (dictFind(set->ptr,c->argv[2]))
3374 addReply(c,shared.cone);
3375 else
3376 addReply(c,shared.czero);
3377 }
3378 }
3379
3380 static void scardCommand(redisClient *c) {
3381 robj *o;
3382 dict *s;
3383
3384 o = lookupKeyRead(c->db,c->argv[1]);
3385 if (o == NULL) {
3386 addReply(c,shared.czero);
3387 return;
3388 } else {
3389 if (o->type != REDIS_SET) {
3390 addReply(c,shared.wrongtypeerr);
3391 } else {
3392 s = o->ptr;
3393 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3394 dictSize(s)));
3395 }
3396 }
3397 }
3398
3399 static void spopCommand(redisClient *c) {
3400 robj *set;
3401 dictEntry *de;
3402
3403 set = lookupKeyWrite(c->db,c->argv[1]);
3404 if (set == NULL) {
3405 addReply(c,shared.nullbulk);
3406 } else {
3407 if (set->type != REDIS_SET) {
3408 addReply(c,shared.wrongtypeerr);
3409 return;
3410 }
3411 de = dictGetRandomKey(set->ptr);
3412 if (de == NULL) {
3413 addReply(c,shared.nullbulk);
3414 } else {
3415 robj *ele = dictGetEntryKey(de);
3416
3417 addReplyBulkLen(c,ele);
3418 addReply(c,ele);
3419 addReply(c,shared.crlf);
3420 dictDelete(set->ptr,ele);
3421 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3422 server.dirty++;
3423 }
3424 }
3425 }
3426
3427 static void srandmemberCommand(redisClient *c) {
3428 robj *set;
3429 dictEntry *de;
3430
3431 set = lookupKeyRead(c->db,c->argv[1]);
3432 if (set == NULL) {
3433 addReply(c,shared.nullbulk);
3434 } else {
3435 if (set->type != REDIS_SET) {
3436 addReply(c,shared.wrongtypeerr);
3437 return;
3438 }
3439 de = dictGetRandomKey(set->ptr);
3440 if (de == NULL) {
3441 addReply(c,shared.nullbulk);
3442 } else {
3443 robj *ele = dictGetEntryKey(de);
3444
3445 addReplyBulkLen(c,ele);
3446 addReply(c,ele);
3447 addReply(c,shared.crlf);
3448 }
3449 }
3450 }
3451
3452 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3453 dict **d1 = (void*) s1, **d2 = (void*) s2;
3454
3455 return dictSize(*d1)-dictSize(*d2);
3456 }
3457
3458 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3459 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3460 dictIterator *di;
3461 dictEntry *de;
3462 robj *lenobj = NULL, *dstset = NULL;
3463 int j, cardinality = 0;
3464
3465 for (j = 0; j < setsnum; j++) {
3466 robj *setobj;
3467
3468 setobj = dstkey ?
3469 lookupKeyWrite(c->db,setskeys[j]) :
3470 lookupKeyRead(c->db,setskeys[j]);
3471 if (!setobj) {
3472 zfree(dv);
3473 if (dstkey) {
3474 deleteKey(c->db,dstkey);
3475 addReply(c,shared.ok);
3476 } else {
3477 addReply(c,shared.nullmultibulk);
3478 }
3479 return;
3480 }
3481 if (setobj->type != REDIS_SET) {
3482 zfree(dv);
3483 addReply(c,shared.wrongtypeerr);
3484 return;
3485 }
3486 dv[j] = setobj->ptr;
3487 }
3488 /* Sort sets from the smallest to largest, this will improve our
3489 * algorithm's performace */
3490 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3491
3492 /* The first thing we should output is the total number of elements...
3493 * since this is a multi-bulk write, but at this stage we don't know
3494 * the intersection set size, so we use a trick, append an empty object
3495 * to the output list and save the pointer to later modify it with the
3496 * right length */
3497 if (!dstkey) {
3498 lenobj = createObject(REDIS_STRING,NULL);
3499 addReply(c,lenobj);
3500 decrRefCount(lenobj);
3501 } else {
3502 /* If we have a target key where to store the resulting set
3503 * create this key with an empty set inside */
3504 dstset = createSetObject();
3505 }
3506
3507 /* Iterate all the elements of the first (smallest) set, and test
3508 * the element against all the other sets, if at least one set does
3509 * not include the element it is discarded */
3510 di = dictGetIterator(dv[0]);
3511
3512 while((de = dictNext(di)) != NULL) {
3513 robj *ele;
3514
3515 for (j = 1; j < setsnum; j++)
3516 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3517 if (j != setsnum)
3518 continue; /* at least one set does not contain the member */
3519 ele = dictGetEntryKey(de);
3520 if (!dstkey) {
3521 addReplyBulkLen(c,ele);
3522 addReply(c,ele);
3523 addReply(c,shared.crlf);
3524 cardinality++;
3525 } else {
3526 dictAdd(dstset->ptr,ele,NULL);
3527 incrRefCount(ele);
3528 }
3529 }
3530 dictReleaseIterator(di);
3531
3532 if (dstkey) {
3533 /* Store the resulting set into the target */
3534 deleteKey(c->db,dstkey);
3535 dictAdd(c->db->dict,dstkey,dstset);
3536 incrRefCount(dstkey);
3537 }
3538
3539 if (!dstkey) {
3540 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3541 } else {
3542 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3543 dictSize((dict*)dstset->ptr)));
3544 server.dirty++;
3545 }
3546 zfree(dv);
3547 }
3548
3549 static void sinterCommand(redisClient *c) {
3550 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3551 }
3552
3553 static void sinterstoreCommand(redisClient *c) {
3554 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3555 }
3556
3557 #define REDIS_OP_UNION 0
3558 #define REDIS_OP_DIFF 1
3559
3560 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3561 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3562 dictIterator *di;
3563 dictEntry *de;
3564 robj *dstset = NULL;
3565 int j, cardinality = 0;
3566
3567 for (j = 0; j < setsnum; j++) {
3568 robj *setobj;
3569
3570 setobj = dstkey ?
3571 lookupKeyWrite(c->db,setskeys[j]) :
3572 lookupKeyRead(c->db,setskeys[j]);
3573 if (!setobj) {
3574 dv[j] = NULL;
3575 continue;
3576 }
3577 if (setobj->type != REDIS_SET) {
3578 zfree(dv);
3579 addReply(c,shared.wrongtypeerr);
3580 return;
3581 }
3582 dv[j] = setobj->ptr;
3583 }
3584
3585 /* We need a temp set object to store our union. If the dstkey
3586 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3587 * this set object will be the resulting object to set into the target key*/
3588 dstset = createSetObject();
3589
3590 /* Iterate all the elements of all the sets, add every element a single
3591 * time to the result set */
3592 for (j = 0; j < setsnum; j++) {
3593 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3594 if (!dv[j]) continue; /* non existing keys are like empty sets */
3595
3596 di = dictGetIterator(dv[j]);
3597
3598 while((de = dictNext(di)) != NULL) {
3599 robj *ele;
3600
3601 /* dictAdd will not add the same element multiple times */
3602 ele = dictGetEntryKey(de);
3603 if (op == REDIS_OP_UNION || j == 0) {
3604 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3605 incrRefCount(ele);
3606 cardinality++;
3607 }
3608 } else if (op == REDIS_OP_DIFF) {
3609 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3610 cardinality--;
3611 }
3612 }
3613 }
3614 dictReleaseIterator(di);
3615
3616 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3617 }
3618
3619 /* Output the content of the resulting set, if not in STORE mode */
3620 if (!dstkey) {
3621 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3622 di = dictGetIterator(dstset->ptr);
3623 while((de = dictNext(di)) != NULL) {
3624 robj *ele;
3625
3626 ele = dictGetEntryKey(de);
3627 addReplyBulkLen(c,ele);
3628 addReply(c,ele);
3629 addReply(c,shared.crlf);
3630 }
3631 dictReleaseIterator(di);
3632 } else {
3633 /* If we have a target key where to store the resulting set
3634 * create this key with the result set inside */
3635 deleteKey(c->db,dstkey);
3636 dictAdd(c->db->dict,dstkey,dstset);
3637 incrRefCount(dstkey);
3638 }
3639
3640 /* Cleanup */
3641 if (!dstkey) {
3642 decrRefCount(dstset);
3643 } else {
3644 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3645 dictSize((dict*)dstset->ptr)));
3646 server.dirty++;
3647 }
3648 zfree(dv);
3649 }
3650
3651 static void sunionCommand(redisClient *c) {
3652 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3653 }
3654
3655 static void sunionstoreCommand(redisClient *c) {
3656 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3657 }
3658
3659 static void sdiffCommand(redisClient *c) {
3660 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3661 }
3662
3663 static void sdiffstoreCommand(redisClient *c) {
3664 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3665 }
3666
3667 /* ==================================== ZSets =============================== */
3668
3669 /* ZSETs are ordered sets using two data structures to hold the same elements
3670 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3671 * data structure.
3672 *
3673 * The elements are added to an hash table mapping Redis objects to scores.
3674 * At the same time the elements are added to a skip list mapping scores
3675 * to Redis objects (so objects are sorted by scores in this "view"). */
3676
3677 /* This skiplist implementation is almost a C translation of the original
3678 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3679 * Alternative to Balanced Trees", modified in three ways:
3680 * a) this implementation allows for repeated values.
3681 * b) the comparison is not just by key (our 'score') but by satellite data.
3682 * c) there is a back pointer, so it's a doubly linked list with the back
3683 * pointers being only at "level 1". This allows to traverse the list
3684 * from tail to head, useful for ZREVRANGE. */
3685
3686 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3687 zskiplistNode *zn = zmalloc(sizeof(*zn));
3688
3689 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3690 zn->score = score;
3691 zn->obj = obj;
3692 return zn;
3693 }
3694
3695 static zskiplist *zslCreate(void) {
3696 int j;
3697 zskiplist *zsl;
3698
3699 zsl = zmalloc(sizeof(*zsl));
3700 zsl->level = 1;
3701 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3702 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3703 zsl->header->forward[j] = NULL;
3704 return zsl;
3705 }
3706
3707 static int zslRandomLevel(void) {
3708 int level = 1;
3709 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3710 level += 1;
3711 return level;
3712 }
3713
3714 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3715 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3716 int i, level;
3717
3718 x = zsl->header;
3719 for (i = zsl->level-1; i >= 0; i--) {
3720 while (x->forward[i]->score < score)
3721 x = x->forward[i];
3722 update[i] = x;
3723 }
3724 x = x->forward[1];
3725 /* we assume the key is not already inside, since we allow duplicated
3726 * scores, and the re-insertion of score and redis object should never
3727 * happpen since the caller of zslInsert() should test in the hash table
3728 * if the element is already inside or not. */
3729 level = zslRandomLevel();
3730 if (level > zsl->level) {
3731 for (i = zsl->level; i < level; i++)
3732 update[i] = zsl->header;
3733 zsl->level = level;
3734 }
3735 x = zslCreateNode(level,score,obj);
3736 for (i = 0; i < level; i++) {
3737 x->forward[i] = update[i]->forward[i];
3738 update[i]->forward[i] = x;
3739 }
3740 }
3741
3742 /* ========================= Non type-specific commands ==================== */
3743
3744 static void flushdbCommand(redisClient *c) {
3745 server.dirty += dictSize(c->db->dict);
3746 dictEmpty(c->db->dict);
3747 dictEmpty(c->db->expires);
3748 addReply(c,shared.ok);
3749 }
3750
3751 static void flushallCommand(redisClient *c) {
3752 server.dirty += emptyDb();
3753 addReply(c,shared.ok);
3754 rdbSave(server.dbfilename);
3755 server.dirty++;
3756 }
3757
3758 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3759 redisSortOperation *so = zmalloc(sizeof(*so));
3760 so->type = type;
3761 so->pattern = pattern;
3762 return so;
3763 }
3764
3765 /* Return the value associated to the key with a name obtained
3766 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3767 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3768 char *p;
3769 sds spat, ssub;
3770 robj keyobj;
3771 int prefixlen, sublen, postfixlen;
3772 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3773 struct {
3774 long len;
3775 long free;
3776 char buf[REDIS_SORTKEY_MAX+1];
3777 } keyname;
3778
3779 if (subst->encoding == REDIS_ENCODING_RAW)
3780 incrRefCount(subst);
3781 else {
3782 subst = getDecodedObject(subst);
3783 }
3784
3785 spat = pattern->ptr;
3786 ssub = subst->ptr;
3787 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3788 p = strchr(spat,'*');
3789 if (!p) return NULL;
3790
3791 prefixlen = p-spat;
3792 sublen = sdslen(ssub);
3793 postfixlen = sdslen(spat)-(prefixlen+1);
3794 memcpy(keyname.buf,spat,prefixlen);
3795 memcpy(keyname.buf+prefixlen,ssub,sublen);
3796 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3797 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3798 keyname.len = prefixlen+sublen+postfixlen;
3799
3800 keyobj.refcount = 1;
3801 keyobj.type = REDIS_STRING;
3802 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3803
3804 decrRefCount(subst);
3805
3806 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3807 return lookupKeyRead(db,&keyobj);
3808 }
3809
3810 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3811 * the additional parameter is not standard but a BSD-specific we have to
3812 * pass sorting parameters via the global 'server' structure */
3813 static int sortCompare(const void *s1, const void *s2) {
3814 const redisSortObject *so1 = s1, *so2 = s2;
3815 int cmp;
3816
3817 if (!server.sort_alpha) {
3818 /* Numeric sorting. Here it's trivial as we precomputed scores */
3819 if (so1->u.score > so2->u.score) {
3820 cmp = 1;
3821 } else if (so1->u.score < so2->u.score) {
3822 cmp = -1;
3823 } else {
3824 cmp = 0;
3825 }
3826 } else {
3827 /* Alphanumeric sorting */
3828 if (server.sort_bypattern) {
3829 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3830 /* At least one compare object is NULL */
3831 if (so1->u.cmpobj == so2->u.cmpobj)
3832 cmp = 0;
3833 else if (so1->u.cmpobj == NULL)
3834 cmp = -1;
3835 else
3836 cmp = 1;
3837 } else {
3838 /* We have both the objects, use strcoll */
3839 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3840 }
3841 } else {
3842 /* Compare elements directly */
3843 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
3844 so2->obj->encoding == REDIS_ENCODING_RAW) {
3845 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3846 } else {
3847 robj *dec1, *dec2;
3848
3849 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
3850 so1->obj : getDecodedObject(so1->obj);
3851 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
3852 so2->obj : getDecodedObject(so2->obj);
3853 cmp = strcoll(dec1->ptr,dec2->ptr);
3854 if (dec1 != so1->obj) decrRefCount(dec1);
3855 if (dec2 != so2->obj) decrRefCount(dec2);
3856 }
3857 }
3858 }
3859 return server.sort_desc ? -cmp : cmp;
3860 }
3861
3862 /* The SORT command is the most complex command in Redis. Warning: this code
3863 * is optimized for speed and a bit less for readability */
3864 static void sortCommand(redisClient *c) {
3865 list *operations;
3866 int outputlen = 0;
3867 int desc = 0, alpha = 0;
3868 int limit_start = 0, limit_count = -1, start, end;
3869 int j, dontsort = 0, vectorlen;
3870 int getop = 0; /* GET operation counter */
3871 robj *sortval, *sortby = NULL;
3872 redisSortObject *vector; /* Resulting vector to sort */
3873
3874 /* Lookup the key to sort. It must be of the right types */
3875 sortval = lookupKeyRead(c->db,c->argv[1]);
3876 if (sortval == NULL) {
3877 addReply(c,shared.nokeyerr);
3878 return;
3879 }
3880 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3881 addReply(c,shared.wrongtypeerr);
3882 return;
3883 }
3884
3885 /* Create a list of operations to perform for every sorted element.
3886 * Operations can be GET/DEL/INCR/DECR */
3887 operations = listCreate();
3888 listSetFreeMethod(operations,zfree);
3889 j = 2;
3890
3891 /* Now we need to protect sortval incrementing its count, in the future
3892 * SORT may have options able to overwrite/delete keys during the sorting
3893 * and the sorted key itself may get destroied */
3894 incrRefCount(sortval);
3895
3896 /* The SORT command has an SQL-alike syntax, parse it */
3897 while(j < c->argc) {
3898 int leftargs = c->argc-j-1;
3899 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3900 desc = 0;
3901 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3902 desc = 1;
3903 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3904 alpha = 1;
3905 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3906 limit_start = atoi(c->argv[j+1]->ptr);
3907 limit_count = atoi(c->argv[j+2]->ptr);
3908 j+=2;
3909 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3910 sortby = c->argv[j+1];
3911 /* If the BY pattern does not contain '*', i.e. it is constant,
3912 * we don't need to sort nor to lookup the weight keys. */
3913 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3914 j++;
3915 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3916 listAddNodeTail(operations,createSortOperation(
3917 REDIS_SORT_GET,c->argv[j+1]));
3918 getop++;
3919 j++;
3920 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3921 listAddNodeTail(operations,createSortOperation(
3922 REDIS_SORT_DEL,c->argv[j+1]));
3923 j++;
3924 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3925 listAddNodeTail(operations,createSortOperation(
3926 REDIS_SORT_INCR,c->argv[j+1]));
3927 j++;
3928 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3929 listAddNodeTail(operations,createSortOperation(
3930 REDIS_SORT_DECR,c->argv[j+1]));
3931 j++;
3932 } else {
3933 decrRefCount(sortval);
3934 listRelease(operations);
3935 addReply(c,shared.syntaxerr);
3936 return;
3937 }
3938 j++;
3939 }
3940
3941 /* Load the sorting vector with all the objects to sort */
3942 vectorlen = (sortval->type == REDIS_LIST) ?
3943 listLength((list*)sortval->ptr) :
3944 dictSize((dict*)sortval->ptr);
3945 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3946 j = 0;
3947 if (sortval->type == REDIS_LIST) {
3948 list *list = sortval->ptr;
3949 listNode *ln;
3950
3951 listRewind(list);
3952 while((ln = listYield(list))) {
3953 robj *ele = ln->value;
3954 vector[j].obj = ele;
3955 vector[j].u.score = 0;
3956 vector[j].u.cmpobj = NULL;
3957 j++;
3958 }
3959 } else {
3960 dict *set = sortval->ptr;
3961 dictIterator *di;
3962 dictEntry *setele;
3963
3964 di = dictGetIterator(set);
3965 while((setele = dictNext(di)) != NULL) {
3966 vector[j].obj = dictGetEntryKey(setele);
3967 vector[j].u.score = 0;
3968 vector[j].u.cmpobj = NULL;
3969 j++;
3970 }
3971 dictReleaseIterator(di);
3972 }
3973 assert(j == vectorlen);
3974
3975 /* Now it's time to load the right scores in the sorting vector */
3976 if (dontsort == 0) {
3977 for (j = 0; j < vectorlen; j++) {
3978 if (sortby) {
3979 robj *byval;
3980
3981 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3982 if (!byval || byval->type != REDIS_STRING) continue;
3983 if (alpha) {
3984 if (byval->encoding == REDIS_ENCODING_RAW) {
3985 vector[j].u.cmpobj = byval;
3986 incrRefCount(byval);
3987 } else {
3988 vector[j].u.cmpobj = getDecodedObject(byval);
3989 }
3990 } else {
3991 if (byval->encoding == REDIS_ENCODING_RAW) {
3992 vector[j].u.score = strtod(byval->ptr,NULL);
3993 } else {
3994 if (byval->encoding == REDIS_ENCODING_INT) {
3995 vector[j].u.score = (long)byval->ptr;
3996 } else
3997 assert(1 != 1);
3998 }
3999 }
4000 } else {
4001 if (!alpha) {
4002 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4003 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4004 else {
4005 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4006 vector[j].u.score = (long) vector[j].obj->ptr;
4007 else
4008 assert(1 != 1);
4009 }
4010 }
4011 }
4012 }
4013 }
4014
4015 /* We are ready to sort the vector... perform a bit of sanity check
4016 * on the LIMIT option too. We'll use a partial version of quicksort. */
4017 start = (limit_start < 0) ? 0 : limit_start;
4018 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4019 if (start >= vectorlen) {
4020 start = vectorlen-1;
4021 end = vectorlen-2;
4022 }
4023 if (end >= vectorlen) end = vectorlen-1;
4024
4025 if (dontsort == 0) {
4026 server.sort_desc = desc;
4027 server.sort_alpha = alpha;
4028 server.sort_bypattern = sortby ? 1 : 0;
4029 if (sortby && (start != 0 || end != vectorlen-1))
4030 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4031 else
4032 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4033 }
4034
4035 /* Send command output to the output buffer, performing the specified
4036 * GET/DEL/INCR/DECR operations if any. */
4037 outputlen = getop ? getop*(end-start+1) : end-start+1;
4038 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4039 for (j = start; j <= end; j++) {
4040 listNode *ln;
4041 if (!getop) {
4042 addReplyBulkLen(c,vector[j].obj);
4043 addReply(c,vector[j].obj);
4044 addReply(c,shared.crlf);
4045 }
4046 listRewind(operations);
4047 while((ln = listYield(operations))) {
4048 redisSortOperation *sop = ln->value;
4049 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4050 vector[j].obj);
4051
4052 if (sop->type == REDIS_SORT_GET) {
4053 if (!val || val->type != REDIS_STRING) {
4054 addReply(c,shared.nullbulk);
4055 } else {
4056 addReplyBulkLen(c,val);
4057 addReply(c,val);
4058 addReply(c,shared.crlf);
4059 }
4060 } else if (sop->type == REDIS_SORT_DEL) {
4061 /* TODO */
4062 }
4063 }
4064 }
4065
4066 /* Cleanup */
4067 decrRefCount(sortval);
4068 listRelease(operations);
4069 for (j = 0; j < vectorlen; j++) {
4070 if (sortby && alpha && vector[j].u.cmpobj)
4071 decrRefCount(vector[j].u.cmpobj);
4072 }
4073 zfree(vector);
4074 }
4075
4076 static void infoCommand(redisClient *c) {
4077 sds info;
4078 time_t uptime = time(NULL)-server.stat_starttime;
4079 int j;
4080
4081 info = sdscatprintf(sdsempty(),
4082 "redis_version:%s\r\n"
4083 "arch_bits:%s\r\n"
4084 "uptime_in_seconds:%d\r\n"
4085 "uptime_in_days:%d\r\n"
4086 "connected_clients:%d\r\n"
4087 "connected_slaves:%d\r\n"
4088 "used_memory:%zu\r\n"
4089 "changes_since_last_save:%lld\r\n"
4090 "bgsave_in_progress:%d\r\n"
4091 "last_save_time:%d\r\n"
4092 "total_connections_received:%lld\r\n"
4093 "total_commands_processed:%lld\r\n"
4094 "role:%s\r\n"
4095 ,REDIS_VERSION,
4096 (sizeof(long) == 8) ? "64" : "32",
4097 uptime,
4098 uptime/(3600*24),
4099 listLength(server.clients)-listLength(server.slaves),
4100 listLength(server.slaves),
4101 server.usedmemory,
4102 server.dirty,
4103 server.bgsaveinprogress,
4104 server.lastsave,
4105 server.stat_numconnections,
4106 server.stat_numcommands,
4107 server.masterhost == NULL ? "master" : "slave"
4108 );
4109 if (server.masterhost) {
4110 info = sdscatprintf(info,
4111 "master_host:%s\r\n"
4112 "master_port:%d\r\n"
4113 "master_link_status:%s\r\n"
4114 "master_last_io_seconds_ago:%d\r\n"
4115 ,server.masterhost,
4116 server.masterport,
4117 (server.replstate == REDIS_REPL_CONNECTED) ?
4118 "up" : "down",
4119 (int)(time(NULL)-server.master->lastinteraction)
4120 );
4121 }
4122 for (j = 0; j < server.dbnum; j++) {
4123 long long keys, vkeys;
4124
4125 keys = dictSize(server.db[j].dict);
4126 vkeys = dictSize(server.db[j].expires);
4127 if (keys || vkeys) {
4128 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4129 j, keys, vkeys);
4130 }
4131 }
4132 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4133 addReplySds(c,info);
4134 addReply(c,shared.crlf);
4135 }
4136
4137 static void monitorCommand(redisClient *c) {
4138 /* ignore MONITOR if aleady slave or in monitor mode */
4139 if (c->flags & REDIS_SLAVE) return;
4140
4141 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4142 c->slaveseldb = 0;
4143 listAddNodeTail(server.monitors,c);
4144 addReply(c,shared.ok);
4145 }
4146
4147 /* ================================= Expire ================================= */
4148 static int removeExpire(redisDb *db, robj *key) {
4149 if (dictDelete(db->expires,key) == DICT_OK) {
4150 return 1;
4151 } else {
4152 return 0;
4153 }
4154 }
4155
4156 static int setExpire(redisDb *db, robj *key, time_t when) {
4157 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4158 return 0;
4159 } else {
4160 incrRefCount(key);
4161 return 1;
4162 }
4163 }
4164
4165 /* Return the expire time of the specified key, or -1 if no expire
4166 * is associated with this key (i.e. the key is non volatile) */
4167 static time_t getExpire(redisDb *db, robj *key) {
4168 dictEntry *de;
4169
4170 /* No expire? return ASAP */
4171 if (dictSize(db->expires) == 0 ||
4172 (de = dictFind(db->expires,key)) == NULL) return -1;
4173
4174 return (time_t) dictGetEntryVal(de);
4175 }
4176
4177 static int expireIfNeeded(redisDb *db, robj *key) {
4178 time_t when;
4179 dictEntry *de;
4180
4181 /* No expire? return ASAP */
4182 if (dictSize(db->expires) == 0 ||
4183 (de = dictFind(db->expires,key)) == NULL) return 0;
4184
4185 /* Lookup the expire */
4186 when = (time_t) dictGetEntryVal(de);
4187 if (time(NULL) <= when) return 0;
4188
4189 /* Delete the key */
4190 dictDelete(db->expires,key);
4191 return dictDelete(db->dict,key) == DICT_OK;
4192 }
4193
4194 static int deleteIfVolatile(redisDb *db, robj *key) {
4195 dictEntry *de;
4196
4197 /* No expire? return ASAP */
4198 if (dictSize(db->expires) == 0 ||
4199 (de = dictFind(db->expires,key)) == NULL) return 0;
4200
4201 /* Delete the key */
4202 server.dirty++;
4203 dictDelete(db->expires,key);
4204 return dictDelete(db->dict,key) == DICT_OK;
4205 }
4206
4207 static void expireCommand(redisClient *c) {
4208 dictEntry *de;
4209 int seconds = atoi(c->argv[2]->ptr);
4210
4211 de = dictFind(c->db->dict,c->argv[1]);
4212 if (de == NULL) {
4213 addReply(c,shared.czero);
4214 return;
4215 }
4216 if (seconds <= 0) {
4217 addReply(c, shared.czero);
4218 return;
4219 } else {
4220 time_t when = time(NULL)+seconds;
4221 if (setExpire(c->db,c->argv[1],when)) {
4222 addReply(c,shared.cone);
4223 server.dirty++;
4224 } else {
4225 addReply(c,shared.czero);
4226 }
4227 return;
4228 }
4229 }
4230
4231 static void ttlCommand(redisClient *c) {
4232 time_t expire;
4233 int ttl = -1;
4234
4235 expire = getExpire(c->db,c->argv[1]);
4236 if (expire != -1) {
4237 ttl = (int) (expire-time(NULL));
4238 if (ttl < 0) ttl = -1;
4239 }
4240 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4241 }
4242
4243 static void msetGenericCommand(redisClient *c, int nx) {
4244 int j;
4245
4246 if ((c->argc % 2) == 0) {
4247 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4248 return;
4249 }
4250 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4251 * set nothing at all if at least one already key exists. */
4252 if (nx) {
4253 for (j = 1; j < c->argc; j += 2) {
4254 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4255 addReply(c, shared.czero);
4256 return;
4257 }
4258 }
4259 }
4260
4261 for (j = 1; j < c->argc; j += 2) {
4262 int retval;
4263
4264 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4265 if (retval == DICT_ERR) {
4266 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4267 incrRefCount(c->argv[j+1]);
4268 } else {
4269 incrRefCount(c->argv[j]);
4270 incrRefCount(c->argv[j+1]);
4271 }
4272 removeExpire(c->db,c->argv[j]);
4273 }
4274 server.dirty += (c->argc-1)/2;
4275 addReply(c, nx ? shared.cone : shared.ok);
4276 }
4277
4278 static void msetCommand(redisClient *c) {
4279 msetGenericCommand(c,0);
4280 }
4281
4282 static void msetnxCommand(redisClient *c) {
4283 msetGenericCommand(c,1);
4284 }
4285
4286 /* =============================== Replication ============================= */
4287
4288 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4289 ssize_t nwritten, ret = size;
4290 time_t start = time(NULL);
4291
4292 timeout++;
4293 while(size) {
4294 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4295 nwritten = write(fd,ptr,size);
4296 if (nwritten == -1) return -1;
4297 ptr += nwritten;
4298 size -= nwritten;
4299 }
4300 if ((time(NULL)-start) > timeout) {
4301 errno = ETIMEDOUT;
4302 return -1;
4303 }
4304 }
4305 return ret;
4306 }
4307
4308 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4309 ssize_t nread, totread = 0;
4310 time_t start = time(NULL);
4311
4312 timeout++;
4313 while(size) {
4314 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4315 nread = read(fd,ptr,size);
4316 if (nread == -1) return -1;
4317 ptr += nread;
4318 size -= nread;
4319 totread += nread;
4320 }
4321 if ((time(NULL)-start) > timeout) {
4322 errno = ETIMEDOUT;
4323 return -1;
4324 }
4325 }
4326 return totread;
4327 }
4328
/* Read a line from 'fd' one byte at a time into 'ptr', a buffer of 'size'
 * bytes, stopping at '\n' or when the buffer is full.
 *
 * The result is always NUL terminated; the newline is not stored and a
 * trailing '\r' is replaced by the terminator. 'timeout' (seconds) applies
 * to every single-byte read. Returns the number of bytes stored before the
 * newline (a stripped '\r' is still counted), or -1 if syncRead() fails.
 * Returns 0 without reading when 'size' leaves no room for data. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* A non-positive size cannot even hold the terminator. */
    if (size <= 0) return 0;
    *ptr = '\0';

    size--; /* reserve one byte for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the stored byte: without this the loop never stops on
         * an over-long line and writes past the end of the buffer. */
        size--;
    }
    return nread;
}
4349
4350 static void syncCommand(redisClient *c) {
4351 /* ignore SYNC if aleady slave or in monitor mode */
4352 if (c->flags & REDIS_SLAVE) return;
4353
4354 /* SYNC can't be issued when the server has pending data to send to
4355 * the client about already issued commands. We need a fresh reply
4356 * buffer registering the differences between the BGSAVE and the current
4357 * dataset, so that we can copy to other slaves if needed. */
4358 if (listLength(c->reply) != 0) {
4359 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4360 return;
4361 }
4362
4363 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4364 /* Here we need to check if there is a background saving operation
4365 * in progress, or if it is required to start one */
4366 if (server.bgsaveinprogress) {
4367 /* Ok a background save is in progress. Let's check if it is a good
4368 * one for replication, i.e. if there is another slave that is
4369 * registering differences since the server forked to save */
4370 redisClient *slave;
4371 listNode *ln;
4372
4373 listRewind(server.slaves);
4374 while((ln = listYield(server.slaves))) {
4375 slave = ln->value;
4376 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4377 }
4378 if (ln) {
4379 /* Perfect, the server is already registering differences for
4380 * another slave. Set the right state, and copy the buffer. */
4381 listRelease(c->reply);
4382 c->reply = listDup(slave->reply);
4383 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4384 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4385 } else {
4386 /* No way, we need to wait for the next BGSAVE in order to
4387 * register differences */
4388 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4389 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4390 }
4391 } else {
4392 /* Ok we don't have a BGSAVE in progress, let's start one */
4393 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4394 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4395 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4396 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4397 return;
4398 }
4399 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4400 }
4401 c->repldbfd = -1;
4402 c->flags |= REDIS_SLAVE;
4403 c->slaveseldb = 0;
4404 listAddNodeTail(server.slaves,c);
4405 return;
4406 }
4407
4408 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4409 redisClient *slave = privdata;
4410 REDIS_NOTUSED(el);
4411 REDIS_NOTUSED(mask);
4412 char buf[REDIS_IOBUF_LEN];
4413 ssize_t nwritten, buflen;
4414
4415 if (slave->repldboff == 0) {
4416 /* Write the bulk write count before to transfer the DB. In theory here
4417 * we don't know how much room there is in the output buffer of the
4418 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4419 * operations) will never be smaller than the few bytes we need. */
4420 sds bulkcount;
4421
4422 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4423 slave->repldbsize);
4424 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4425 {
4426 sdsfree(bulkcount);
4427 freeClient(slave);
4428 return;
4429 }
4430 sdsfree(bulkcount);
4431 }
4432 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4433 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4434 if (buflen <= 0) {
4435 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4436 (buflen == 0) ? "premature EOF" : strerror(errno));
4437 freeClient(slave);
4438 return;
4439 }
4440 if ((nwritten = write(fd,buf,buflen)) == -1) {
4441 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4442 strerror(errno));
4443 freeClient(slave);
4444 return;
4445 }
4446 slave->repldboff += nwritten;
4447 if (slave->repldboff == slave->repldbsize) {
4448 close(slave->repldbfd);
4449 slave->repldbfd = -1;
4450 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4451 slave->replstate = REDIS_REPL_ONLINE;
4452 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4453 sendReplyToClient, slave, NULL) == AE_ERR) {
4454 freeClient(slave);
4455 return;
4456 }
4457 addReplySds(slave,sdsempty());
4458 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4459 }
4460 }
4461
4462 /* This function is called at the end of every backgrond saving.
4463 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4464 * otherwise REDIS_ERR is passed to the function.
4465 *
4466 * The goal of this function is to handle slaves waiting for a successful
4467 * background saving in order to perform non-blocking synchronization. */
4468 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4469 listNode *ln;
4470 int startbgsave = 0;
4471
4472 listRewind(server.slaves);
4473 while((ln = listYield(server.slaves))) {
4474 redisClient *slave = ln->value;
4475
4476 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4477 startbgsave = 1;
4478 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4479 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4480 struct redis_stat buf;
4481
4482 if (bgsaveerr != REDIS_OK) {
4483 freeClient(slave);
4484 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4485 continue;
4486 }
4487 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4488 redis_fstat(slave->repldbfd,&buf) == -1) {
4489 freeClient(slave);
4490 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4491 continue;
4492 }
4493 slave->repldboff = 0;
4494 slave->repldbsize = buf.st_size;
4495 slave->replstate = REDIS_REPL_SEND_BULK;
4496 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4497 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4498 freeClient(slave);
4499 continue;
4500 }
4501 }
4502 }
4503 if (startbgsave) {
4504 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4505 listRewind(server.slaves);
4506 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4507 while((ln = listYield(server.slaves))) {
4508 redisClient *slave = ln->value;
4509
4510 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4511 freeClient(slave);
4512 }
4513 }
4514 }
4515 }
4516
4517 static int syncWithMaster(void) {
4518 char buf[1024], tmpfile[256];
4519 int dumpsize;
4520 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4521 int dfd;
4522
4523 if (fd == -1) {
4524 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4525 strerror(errno));
4526 return REDIS_ERR;
4527 }
4528 /* Issue the SYNC command */
4529 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4530 close(fd);
4531 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4532 strerror(errno));
4533 return REDIS_ERR;
4534 }
4535 /* Read the bulk write count */
4536 if (syncReadLine(fd,buf,1024,3600) == -1) {
4537 close(fd);
4538 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4539 strerror(errno));
4540 return REDIS_ERR;
4541 }
4542 dumpsize = atoi(buf+1);
4543 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4544 /* Read the bulk write data on a temp file */
4545 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4546 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4547 if (dfd == -1) {
4548 close(fd);
4549 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4550 return REDIS_ERR;
4551 }
4552 while(dumpsize) {
4553 int nread, nwritten;
4554
4555 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4556 if (nread == -1) {
4557 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4558 strerror(errno));
4559 close(fd);
4560 close(dfd);
4561 return REDIS_ERR;
4562 }
4563 nwritten = write(dfd,buf,nread);
4564 if (nwritten == -1) {
4565 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4566 close(fd);
4567 close(dfd);
4568 return REDIS_ERR;
4569 }
4570 dumpsize -= nread;
4571 }
4572 close(dfd);
4573 if (rename(tmpfile,server.dbfilename) == -1) {
4574 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4575 unlink(tmpfile);
4576 close(fd);
4577 return REDIS_ERR;
4578 }
4579 emptyDb();
4580 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4581 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4582 close(fd);
4583 return REDIS_ERR;
4584 }
4585 server.master = createClient(fd);
4586 server.master->flags |= REDIS_MASTER;
4587 server.replstate = REDIS_REPL_CONNECTED;
4588 return REDIS_OK;
4589 }
4590
4591 static void slaveofCommand(redisClient *c) {
4592 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4593 !strcasecmp(c->argv[2]->ptr,"one")) {
4594 if (server.masterhost) {
4595 sdsfree(server.masterhost);
4596 server.masterhost = NULL;
4597 if (server.master) freeClient(server.master);
4598 server.replstate = REDIS_REPL_NONE;
4599 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4600 }
4601 } else {
4602 sdsfree(server.masterhost);
4603 server.masterhost = sdsdup(c->argv[1]->ptr);
4604 server.masterport = atoi(c->argv[2]->ptr);
4605 if (server.master) freeClient(server.master);
4606 server.replstate = REDIS_REPL_CONNECT;
4607 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4608 server.masterhost, server.masterport);
4609 }
4610 addReply(c,shared.ok);
4611 }
4612
4613 /* ============================ Maxmemory directive ======================== */
4614
4615 /* This function gets called when 'maxmemory' is set on the config file to limit
4616 * the max memory used by the server, and we are out of memory.
4617 * This function will try to, in order:
4618 *
4619 * - Free objects from the free list
4620 * - Try to remove keys with an EXPIRE set
4621 *
4622 * It is not possible to free enough memory to reach used-memory < maxmemory
4623 * the server will start refusing commands that will enlarge even more the
4624 * memory usage.
4625 */
4626 static void freeMemoryIfNeeded(void) {
4627 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4628 if (listLength(server.objfreelist)) {
4629 robj *o;
4630
4631 listNode *head = listFirst(server.objfreelist);
4632 o = listNodeValue(head);
4633 listDelNode(server.objfreelist,head);
4634 zfree(o);
4635 } else {
4636 int j, k, freed = 0;
4637
4638 for (j = 0; j < server.dbnum; j++) {
4639 int minttl = -1;
4640 robj *minkey = NULL;
4641 struct dictEntry *de;
4642
4643 if (dictSize(server.db[j].expires)) {
4644 freed = 1;
4645 /* From a sample of three keys drop the one nearest to
4646 * the natural expire */
4647 for (k = 0; k < 3; k++) {
4648 time_t t;
4649
4650 de = dictGetRandomKey(server.db[j].expires);
4651 t = (time_t) dictGetEntryVal(de);
4652 if (minttl == -1 || t < minttl) {
4653 minkey = dictGetEntryKey(de);
4654 minttl = t;
4655 }
4656 }
4657 deleteKey(server.db+j,minkey);
4658 }
4659 }
4660 if (!freed) return; /* nothing to free... */
4661 }
4662 }
4663 }
4664
4665 /* ================================= Debugging ============================== */
4666
4667 static void debugCommand(redisClient *c) {
4668 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4669 *((char*)-1) = 'x';
4670 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4671 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4672 robj *key, *val;
4673
4674 if (!de) {
4675 addReply(c,shared.nokeyerr);
4676 return;
4677 }
4678 key = dictGetEntryKey(de);
4679 val = dictGetEntryVal(de);
4680 addReplySds(c,sdscatprintf(sdsempty(),
4681 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4682 key, key->refcount, val, val->refcount, val->encoding));
4683 } else {
4684 addReplySds(c,sdsnew(
4685 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4686 }
4687 }
4688
4689 #ifdef HAVE_BACKTRACE
4690 static struct redisFunctionSym symsTable[] = {
4691 {"compareStringObjects", (unsigned long)compareStringObjects},
4692 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4693 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4694 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4695 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4696 {"freeStringObject", (unsigned long)freeStringObject},
4697 {"freeListObject", (unsigned long)freeListObject},
4698 {"freeSetObject", (unsigned long)freeSetObject},
4699 {"decrRefCount", (unsigned long)decrRefCount},
4700 {"createObject", (unsigned long)createObject},
4701 {"freeClient", (unsigned long)freeClient},
4702 {"rdbLoad", (unsigned long)rdbLoad},
4703 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4704 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4705 {"addReply", (unsigned long)addReply},
4706 {"addReplySds", (unsigned long)addReplySds},
4707 {"incrRefCount", (unsigned long)incrRefCount},
4708 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4709 {"createStringObject", (unsigned long)createStringObject},
4710 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4711 {"syncWithMaster", (unsigned long)syncWithMaster},
4712 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4713 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4714 {"getDecodedObject", (unsigned long)getDecodedObject},
4715 {"removeExpire", (unsigned long)removeExpire},
4716 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4717 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4718 {"deleteKey", (unsigned long)deleteKey},
4719 {"getExpire", (unsigned long)getExpire},
4720 {"setExpire", (unsigned long)setExpire},
4721 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4722 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4723 {"authCommand", (unsigned long)authCommand},
4724 {"pingCommand", (unsigned long)pingCommand},
4725 {"echoCommand", (unsigned long)echoCommand},
4726 {"setCommand", (unsigned long)setCommand},
4727 {"setnxCommand", (unsigned long)setnxCommand},
4728 {"getCommand", (unsigned long)getCommand},
4729 {"delCommand", (unsigned long)delCommand},
4730 {"existsCommand", (unsigned long)existsCommand},
4731 {"incrCommand", (unsigned long)incrCommand},
4732 {"decrCommand", (unsigned long)decrCommand},
4733 {"incrbyCommand", (unsigned long)incrbyCommand},
4734 {"decrbyCommand", (unsigned long)decrbyCommand},
4735 {"selectCommand", (unsigned long)selectCommand},
4736 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4737 {"keysCommand", (unsigned long)keysCommand},
4738 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4739 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4740 {"saveCommand", (unsigned long)saveCommand},
4741 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4742 {"shutdownCommand", (unsigned long)shutdownCommand},
4743 {"moveCommand", (unsigned long)moveCommand},
4744 {"renameCommand", (unsigned long)renameCommand},
4745 {"renamenxCommand", (unsigned long)renamenxCommand},
4746 {"lpushCommand", (unsigned long)lpushCommand},
4747 {"rpushCommand", (unsigned long)rpushCommand},
4748 {"lpopCommand", (unsigned long)lpopCommand},
4749 {"rpopCommand", (unsigned long)rpopCommand},
4750 {"llenCommand", (unsigned long)llenCommand},
4751 {"lindexCommand", (unsigned long)lindexCommand},
4752 {"lrangeCommand", (unsigned long)lrangeCommand},
4753 {"ltrimCommand", (unsigned long)ltrimCommand},
4754 {"typeCommand", (unsigned long)typeCommand},
4755 {"lsetCommand", (unsigned long)lsetCommand},
4756 {"saddCommand", (unsigned long)saddCommand},
4757 {"sremCommand", (unsigned long)sremCommand},
4758 {"smoveCommand", (unsigned long)smoveCommand},
4759 {"sismemberCommand", (unsigned long)sismemberCommand},
4760 {"scardCommand", (unsigned long)scardCommand},
4761 {"spopCommand", (unsigned long)spopCommand},
4762 {"srandmemberCommand", (unsigned long)srandmemberCommand},
4763 {"sinterCommand", (unsigned long)sinterCommand},
4764 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4765 {"sunionCommand", (unsigned long)sunionCommand},
4766 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4767 {"sdiffCommand", (unsigned long)sdiffCommand},
4768 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4769 {"syncCommand", (unsigned long)syncCommand},
4770 {"flushdbCommand", (unsigned long)flushdbCommand},
4771 {"flushallCommand", (unsigned long)flushallCommand},
4772 {"sortCommand", (unsigned long)sortCommand},
4773 {"lremCommand", (unsigned long)lremCommand},
4774 {"infoCommand", (unsigned long)infoCommand},
4775 {"mgetCommand", (unsigned long)mgetCommand},
4776 {"monitorCommand", (unsigned long)monitorCommand},
4777 {"expireCommand", (unsigned long)expireCommand},
4778 {"getsetCommand", (unsigned long)getsetCommand},
4779 {"ttlCommand", (unsigned long)ttlCommand},
4780 {"slaveofCommand", (unsigned long)slaveofCommand},
4781 {"debugCommand", (unsigned long)debugCommand},
4782 {"processCommand", (unsigned long)processCommand},
4783 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4784 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4785 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4786 {"msetGenericCommand", (unsigned long)msetGenericCommand},
4787 {"msetCommand", (unsigned long)msetCommand},
4788 {"msetnxCommand", (unsigned long)msetnxCommand},
4789 {NULL,0}
4790 };
4791
4792 /* This function try to convert a pointer into a function name. It's used in
4793 * oreder to provide a backtrace under segmentation fault that's able to
4794 * display functions declared as static (otherwise the backtrace is useless). */
4795 static char *findFuncName(void *pointer, unsigned long *offset){
4796 int i, ret = -1;
4797 unsigned long off, minoff = 0;
4798
4799 /* Try to match against the Symbol with the smallest offset */
4800 for (i=0; symsTable[i].pointer; i++) {
4801 unsigned long lp = (unsigned long) pointer;
4802
4803 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4804 off=lp-symsTable[i].pointer;
4805 if (ret < 0 || off < minoff) {
4806 minoff=off;
4807 ret=i;
4808 }
4809 }
4810 }
4811 if (ret == -1) return NULL;
4812 *offset = minoff;
4813 return symsTable[ret].name;
4814 }
4815
4816 static void *getMcontextEip(ucontext_t *uc) {
4817 #if defined(__FreeBSD__)
4818 return (void*) uc->uc_mcontext.mc_eip;
4819 #elif defined(__dietlibc__)
4820 return (void*) uc->uc_mcontext.eip;
4821 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4822 return (void*) uc->uc_mcontext->__ss.__eip;
4823 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4824 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
4825 return (void*) uc->uc_mcontext->__ss.__rip;
4826 #else
4827 return (void*) uc->uc_mcontext->__ss.__eip;
4828 #endif
4829 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4830 return (void*) uc->uc_mcontext.gregs[REG_EIP];
4831 #elif defined(__ia64__) /* Linux IA64 */
4832 return (void*) uc->uc_mcontext.sc_ip;
4833 #else
4834 return NULL;
4835 #endif
4836 }
4837
4838 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4839 void *trace[100];
4840 char **messages = NULL;
4841 int i, trace_size = 0;
4842 unsigned long offset=0;
4843 time_t uptime = time(NULL)-server.stat_starttime;
4844 ucontext_t *uc = (ucontext_t*) secret;
4845 REDIS_NOTUSED(info);
4846
4847 redisLog(REDIS_WARNING,
4848 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4849 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4850 "redis_version:%s; "
4851 "uptime_in_seconds:%d; "
4852 "connected_clients:%d; "
4853 "connected_slaves:%d; "
4854 "used_memory:%zu; "
4855 "changes_since_last_save:%lld; "
4856 "bgsave_in_progress:%d; "
4857 "last_save_time:%d; "
4858 "total_connections_received:%lld; "
4859 "total_commands_processed:%lld; "
4860 "role:%s;"
4861 ,REDIS_VERSION,
4862 uptime,
4863 listLength(server.clients)-listLength(server.slaves),
4864 listLength(server.slaves),
4865 server.usedmemory,
4866 server.dirty,
4867 server.bgsaveinprogress,
4868 server.lastsave,
4869 server.stat_numconnections,
4870 server.stat_numcommands,
4871 server.masterhost == NULL ? "master" : "slave"
4872 ));
4873
4874 trace_size = backtrace(trace, 100);
4875 /* overwrite sigaction with caller's address */
4876 if (getMcontextEip(uc) != NULL) {
4877 trace[1] = getMcontextEip(uc);
4878 }
4879 messages = backtrace_symbols(trace, trace_size);
4880
4881 for (i=1; i<trace_size; ++i) {
4882 char *fn = findFuncName(trace[i], &offset), *p;
4883
4884 p = strchr(messages[i],'+');
4885 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4886 redisLog(REDIS_WARNING,"%s", messages[i]);
4887 } else {
4888 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4889 }
4890 }
4891 free(messages);
4892 exit(0);
4893 }
4894
4895 static void setupSigSegvAction(void) {
4896 struct sigaction act;
4897
4898 sigemptyset (&act.sa_mask);
4899 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4900 * is used. Otherwise, sa_handler is used */
4901 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4902 act.sa_sigaction = segvHandler;
4903 sigaction (SIGSEGV, &act, NULL);
4904 sigaction (SIGBUS, &act, NULL);
4905 sigaction (SIGFPE, &act, NULL);
4906 sigaction (SIGILL, &act, NULL);
4907 sigaction (SIGBUS, &act, NULL);
4908 return;
4909 }
4910 #else /* HAVE_BACKTRACE */
/* Variant compiled when HAVE_BACKTRACE is not defined: no crash handler is
 * installed. */
static void setupSigSegvAction(void) {
    return;
}
4913 #endif /* HAVE_BACKTRACE */
4914
4915 /* =================================== Main! ================================ */
4916
4917 #ifdef __linux__
/* Read the first line of /proc/sys/vm/overcommit_memory and return it
 * converted with atoi(). Returns -1 when the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *proc = fopen("/proc/sys/vm/overcommit_memory","r");

    if (proc == NULL) return -1;
    if (fgets(line,64,proc) != NULL) value = atoi(line);
    fclose(proc);
    return value;
}
4931
4932 void linuxOvercommitMemoryWarning(void) {
4933 if (linuxOvercommitMemoryValue() == 0) {
4934 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4935 }
4936 }
4937 #endif /* __linux__ */
4938
4939 static void daemonize(void) {
4940 int fd;
4941 FILE *fp;
4942
4943 if (fork() != 0) exit(0); /* parent exits */
4944 setsid(); /* create a new session */
4945
4946 /* Every output goes to /dev/null. If Redis is daemonized but
4947 * the 'logfile' is set to 'stdout' in the configuration file
4948 * it will not log at all. */
4949 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4950 dup2(fd, STDIN_FILENO);
4951 dup2(fd, STDOUT_FILENO);
4952 dup2(fd, STDERR_FILENO);
4953 if (fd > STDERR_FILENO) close(fd);
4954 }
4955 /* Try to write the pid file */
4956 fp = fopen(server.pidfile,"w");
4957 if (fp) {
4958 fprintf(fp,"%d\n",getpid());
4959 fclose(fp);
4960 }
4961 }
4962
4963 int main(int argc, char **argv) {
4964 initServerConfig();
4965 if (argc == 2) {
4966 ResetServerSaveParams();
4967 loadServerConfig(argv[1]);
4968 } else if (argc > 2) {
4969 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4970 exit(1);
4971 } else {
4972 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4973 }
4974 initServer();
4975 if (server.daemonize) daemonize();
4976 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4977 #ifdef __linux__
4978 linuxOvercommitMemoryWarning();
4979 #endif
4980 if (rdbLoad(server.dbfilename) == REDIS_OK)
4981 redisLog(REDIS_NOTICE,"DB loaded from disk");
4982 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4983 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4984 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4985 aeMain(server.el);
4986 aeDeleteEventLoop(server.el);
4987 return 0;
4988 }