]> git.saurik.com Git - redis.git/blob - redis.c
65e62172b3e4b149872117aa2b545bdccaa43661
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
71 /* Error codes */
72 #define REDIS_OK 0
73 #define REDIS_ERR -1
74
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
88
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
91
92 /* Command flags */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
96 this flags will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short this commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
100
101 /* Object types */
102 #define REDIS_STRING 0
103 #define REDIS_LIST 1
104 #define REDIS_SET 2
105 #define REDIS_ZSET 3
106 #define REDIS_HASH 4
107
108 /* Objects encoding */
109 #define REDIS_ENCODING_RAW 0 /* Raw representation */
110 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111
112 /* Object types only used for dumping to disk */
113 #define REDIS_EXPIRETIME 253
114 #define REDIS_SELECTDB 254
115 #define REDIS_EOF 255
116
117 /* Defines related to the dump file format. To store 32 bits lengths for short
118 * keys requires a lot of space, so we check the most significant 2 bits of
119 * the first byte to interpreter the length:
120 *
121 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
122 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
123 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
124 * 11|000000 this means: specially encoded object will follow. The six bits
125 * number specify the kind of object that follows.
126 * See the REDIS_RDB_ENC_* defines.
127 *
128 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
129 * values, will fit inside. */
130 #define REDIS_RDB_6BITLEN 0
131 #define REDIS_RDB_14BITLEN 1
132 #define REDIS_RDB_32BITLEN 2
133 #define REDIS_RDB_ENCVAL 3
134 #define REDIS_RDB_LENERR UINT_MAX
135
136 /* When a length of a string object stored on disk has the first two bits
137 * set, the remaining two bits specify a special encoding for the object
138 * accordingly to the following defines: */
139 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
140 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
141 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
142 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
143
144 /* Client flags */
145 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
146 #define REDIS_SLAVE 2 /* This client is a slave server */
147 #define REDIS_MASTER 4 /* This client is a master server */
148 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149
150 /* Slave replication state - slave side */
151 #define REDIS_REPL_NONE 0 /* No active replication */
152 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
153 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154
155 /* Slave replication state - from the point of view of master
156 * Note that in SEND_BULK and ONLINE state the slave receives new updates
157 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
158 * to start the next background saving in order to send updates to it. */
159 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
160 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
161 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
162 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163
164 /* List related stuff */
165 #define REDIS_HEAD 0
166 #define REDIS_TAIL 1
167
168 /* Sort operations */
169 #define REDIS_SORT_GET 0
170 #define REDIS_SORT_DEL 1
171 #define REDIS_SORT_INCR 2
172 #define REDIS_SORT_DECR 3
173 #define REDIS_SORT_ASC 4
174 #define REDIS_SORT_DESC 5
175 #define REDIS_SORTKEY_MAX 1024
176
177 /* Log levels */
178 #define REDIS_DEBUG 0
179 #define REDIS_NOTICE 1
180 #define REDIS_WARNING 2
181
182 /* Anti-warning macro... */
183 #define REDIS_NOTUSED(V) ((void) V)
184
185 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
186 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
187
188 /*================================= Data types ============================== */
189
190 /* A redis object, that is a type able to hold a string / list / set */
191 typedef struct redisObject {
192 void *ptr;
193 unsigned char type;
194 unsigned char encoding;
195 unsigned char notused[2];
196 int refcount;
197 } robj;
198
199 typedef struct redisDb {
200 dict *dict;
201 dict *expires;
202 int id;
203 } redisDb;
204
205 /* With multiplexing we need to take per-clinet state.
206 * Clients are taken in a liked list. */
207 typedef struct redisClient {
208 int fd;
209 redisDb *db;
210 int dictid;
211 sds querybuf;
212 robj **argv, **mbargv;
213 int argc, mbargc;
214 int bulklen; /* bulk read len. -1 if not in bulk read mode */
215 int multibulk; /* multi bulk command format active */
216 list *reply;
217 int sentlen;
218 time_t lastinteraction; /* time of the last interaction, used for timeout */
219 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
220 int slaveseldb; /* slave selected db, if this client is a slave */
221 int authenticated; /* when requirepass is non-NULL */
222 int replstate; /* replication state if this is a slave */
223 int repldbfd; /* replication DB file descriptor */
224 long repldboff; /* replication DB file offset */
225 off_t repldbsize; /* replication DB file size */
226 } redisClient;
227
228 struct saveparam {
229 time_t seconds;
230 int changes;
231 };
232
233 /* Global server state structure */
234 struct redisServer {
235 int port;
236 int fd;
237 redisDb *db;
238 dict *sharingpool;
239 unsigned int sharingpoolsize;
240 long long dirty; /* changes to DB from the last save */
241 list *clients;
242 list *slaves, *monitors;
243 char neterr[ANET_ERR_LEN];
244 aeEventLoop *el;
245 int cronloops; /* number of times the cron function run */
246 list *objfreelist; /* A list of freed objects to avoid malloc() */
247 time_t lastsave; /* Unix time of last save succeeede */
248 size_t usedmemory; /* Used memory in megabytes */
249 /* Fields used only for stats */
250 time_t stat_starttime; /* server start time */
251 long long stat_numcommands; /* number of processed commands */
252 long long stat_numconnections; /* number of connections received */
253 /* Configuration */
254 int verbosity;
255 int glueoutputbuf;
256 int maxidletime;
257 int dbnum;
258 int daemonize;
259 char *pidfile;
260 int bgsaveinprogress;
261 pid_t bgsavechildpid;
262 struct saveparam *saveparams;
263 int saveparamslen;
264 char *logfile;
265 char *bindaddr;
266 char *dbfilename;
267 char *requirepass;
268 int shareobjects;
269 /* Replication related */
270 int isslave;
271 char *masterhost;
272 int masterport;
273 redisClient *master; /* client that is master for this slave */
274 int replstate;
275 unsigned int maxclients;
276 unsigned long maxmemory;
277 /* Sort parameters - qsort_r() is only available under BSD so we
278 * have to take this state global, in order to pass it to sortCompare() */
279 int sort_desc;
280 int sort_alpha;
281 int sort_bypattern;
282 };
283
284 typedef void redisCommandProc(redisClient *c);
285 struct redisCommand {
286 char *name;
287 redisCommandProc *proc;
288 int arity;
289 int flags;
290 };
291
292 struct redisFunctionSym {
293 char *name;
294 unsigned long pointer;
295 };
296
297 typedef struct _redisSortObject {
298 robj *obj;
299 union {
300 double score;
301 robj *cmpobj;
302 } u;
303 } redisSortObject;
304
305 typedef struct _redisSortOperation {
306 int type;
307 robj *pattern;
308 } redisSortOperation;
309
310 /* ZSETs use a specialized version of Skiplists */
311
312 typedef struct zskiplistNode {
313 struct zskiplistNode **forward;
314 double score;
315 robj *obj;
316 } zskiplistNode;
317
318 typedef struct zskiplist {
319 struct zskiplistNode *header;
320 long length;
321 int level;
322 } zskiplist;
323
324 typedef struct zset {
325 dict *dict;
326 zskiplist *zsl;
327 } zset;
328
329 /* Our shared "common" objects */
330
331 struct sharedObjectsStruct {
332 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
333 *colon, *nullbulk, *nullmultibulk,
334 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
335 *outofrangeerr, *plus,
336 *select0, *select1, *select2, *select3, *select4,
337 *select5, *select6, *select7, *select8, *select9;
338 } shared;
339
340 /*================================ Prototypes =============================== */
341
342 static void freeStringObject(robj *o);
343 static void freeListObject(robj *o);
344 static void freeSetObject(robj *o);
345 static void decrRefCount(void *o);
346 static robj *createObject(int type, void *ptr);
347 static void freeClient(redisClient *c);
348 static int rdbLoad(char *filename);
349 static void addReply(redisClient *c, robj *obj);
350 static void addReplySds(redisClient *c, sds s);
351 static void incrRefCount(robj *o);
352 static int rdbSaveBackground(char *filename);
353 static robj *createStringObject(char *ptr, size_t len);
354 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
355 static int syncWithMaster(void);
356 static robj *tryObjectSharing(robj *o);
357 static int tryObjectEncoding(robj *o);
358 static robj *getDecodedObject(const robj *o);
359 static int removeExpire(redisDb *db, robj *key);
360 static int expireIfNeeded(redisDb *db, robj *key);
361 static int deleteIfVolatile(redisDb *db, robj *key);
362 static int deleteKey(redisDb *db, robj *key);
363 static time_t getExpire(redisDb *db, robj *key);
364 static int setExpire(redisDb *db, robj *key, time_t when);
365 static void updateSlavesWaitingBgsave(int bgsaveerr);
366 static void freeMemoryIfNeeded(void);
367 static int processCommand(redisClient *c);
368 static void setupSigSegvAction(void);
369 static void rdbRemoveTempFile(pid_t childpid);
370 static size_t stringObjectLen(robj *o);
371 static void processInputBuffer(redisClient *c);
372 static zskiplist *zslCreate(void);
373 static void zslFree(zskiplist *zsl);
374
375 static void authCommand(redisClient *c);
376 static void pingCommand(redisClient *c);
377 static void echoCommand(redisClient *c);
378 static void setCommand(redisClient *c);
379 static void setnxCommand(redisClient *c);
380 static void getCommand(redisClient *c);
381 static void delCommand(redisClient *c);
382 static void existsCommand(redisClient *c);
383 static void incrCommand(redisClient *c);
384 static void decrCommand(redisClient *c);
385 static void incrbyCommand(redisClient *c);
386 static void decrbyCommand(redisClient *c);
387 static void selectCommand(redisClient *c);
388 static void randomkeyCommand(redisClient *c);
389 static void keysCommand(redisClient *c);
390 static void dbsizeCommand(redisClient *c);
391 static void lastsaveCommand(redisClient *c);
392 static void saveCommand(redisClient *c);
393 static void bgsaveCommand(redisClient *c);
394 static void shutdownCommand(redisClient *c);
395 static void moveCommand(redisClient *c);
396 static void renameCommand(redisClient *c);
397 static void renamenxCommand(redisClient *c);
398 static void lpushCommand(redisClient *c);
399 static void rpushCommand(redisClient *c);
400 static void lpopCommand(redisClient *c);
401 static void rpopCommand(redisClient *c);
402 static void llenCommand(redisClient *c);
403 static void lindexCommand(redisClient *c);
404 static void lrangeCommand(redisClient *c);
405 static void ltrimCommand(redisClient *c);
406 static void typeCommand(redisClient *c);
407 static void lsetCommand(redisClient *c);
408 static void saddCommand(redisClient *c);
409 static void sremCommand(redisClient *c);
410 static void smoveCommand(redisClient *c);
411 static void sismemberCommand(redisClient *c);
412 static void scardCommand(redisClient *c);
413 static void spopCommand(redisClient *c);
414 static void srandmemberCommand(redisClient *c);
415 static void sinterCommand(redisClient *c);
416 static void sinterstoreCommand(redisClient *c);
417 static void sunionCommand(redisClient *c);
418 static void sunionstoreCommand(redisClient *c);
419 static void sdiffCommand(redisClient *c);
420 static void sdiffstoreCommand(redisClient *c);
421 static void syncCommand(redisClient *c);
422 static void flushdbCommand(redisClient *c);
423 static void flushallCommand(redisClient *c);
424 static void sortCommand(redisClient *c);
425 static void lremCommand(redisClient *c);
426 static void infoCommand(redisClient *c);
427 static void mgetCommand(redisClient *c);
428 static void monitorCommand(redisClient *c);
429 static void expireCommand(redisClient *c);
430 static void getsetCommand(redisClient *c);
431 static void ttlCommand(redisClient *c);
432 static void slaveofCommand(redisClient *c);
433 static void debugCommand(redisClient *c);
434 static void msetCommand(redisClient *c);
435 static void msetnxCommand(redisClient *c);
436 static void zaddCommand(redisClient *c);
437 static void zrangeCommand(redisClient *c);
438
439 /*================================= Globals ================================= */
440
441 /* Global vars */
442 static struct redisServer server; /* server global state */
443 static struct redisCommand cmdTable[] = {
444 {"get",getCommand,2,REDIS_CMD_INLINE},
445 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
446 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
447 {"del",delCommand,-2,REDIS_CMD_INLINE},
448 {"exists",existsCommand,2,REDIS_CMD_INLINE},
449 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
450 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
451 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
452 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
453 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
454 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
455 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
456 {"llen",llenCommand,2,REDIS_CMD_INLINE},
457 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
458 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
459 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
460 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
461 {"lrem",lremCommand,4,REDIS_CMD_BULK},
462 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
463 {"srem",sremCommand,3,REDIS_CMD_BULK},
464 {"smove",smoveCommand,4,REDIS_CMD_BULK},
465 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
466 {"scard",scardCommand,2,REDIS_CMD_INLINE},
467 {"spop",spopCommand,2,REDIS_CMD_INLINE},
468 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
469 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
470 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
471 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
472 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
473 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
474 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
475 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
476 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
477 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
478 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
479 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
480 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
481 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
482 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
483 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
484 {"select",selectCommand,2,REDIS_CMD_INLINE},
485 {"move",moveCommand,3,REDIS_CMD_INLINE},
486 {"rename",renameCommand,3,REDIS_CMD_INLINE},
487 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
488 {"expire",expireCommand,3,REDIS_CMD_INLINE},
489 {"keys",keysCommand,2,REDIS_CMD_INLINE},
490 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
491 {"auth",authCommand,2,REDIS_CMD_INLINE},
492 {"ping",pingCommand,1,REDIS_CMD_INLINE},
493 {"echo",echoCommand,2,REDIS_CMD_BULK},
494 {"save",saveCommand,1,REDIS_CMD_INLINE},
495 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
496 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
497 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
498 {"type",typeCommand,2,REDIS_CMD_INLINE},
499 {"sync",syncCommand,1,REDIS_CMD_INLINE},
500 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
501 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
502 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
503 {"info",infoCommand,1,REDIS_CMD_INLINE},
504 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
505 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
506 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
507 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
508 {NULL,NULL,0,0}
509 };
510 /*============================ Utility functions ============================ */
511
512 /* Glob-style pattern matching. */
513 int stringmatchlen(const char *pattern, int patternLen,
514 const char *string, int stringLen, int nocase)
515 {
516 while(patternLen) {
517 switch(pattern[0]) {
518 case '*':
519 while (pattern[1] == '*') {
520 pattern++;
521 patternLen--;
522 }
523 if (patternLen == 1)
524 return 1; /* match */
525 while(stringLen) {
526 if (stringmatchlen(pattern+1, patternLen-1,
527 string, stringLen, nocase))
528 return 1; /* match */
529 string++;
530 stringLen--;
531 }
532 return 0; /* no match */
533 break;
534 case '?':
535 if (stringLen == 0)
536 return 0; /* no match */
537 string++;
538 stringLen--;
539 break;
540 case '[':
541 {
542 int not, match;
543
544 pattern++;
545 patternLen--;
546 not = pattern[0] == '^';
547 if (not) {
548 pattern++;
549 patternLen--;
550 }
551 match = 0;
552 while(1) {
553 if (pattern[0] == '\\') {
554 pattern++;
555 patternLen--;
556 if (pattern[0] == string[0])
557 match = 1;
558 } else if (pattern[0] == ']') {
559 break;
560 } else if (patternLen == 0) {
561 pattern--;
562 patternLen++;
563 break;
564 } else if (pattern[1] == '-' && patternLen >= 3) {
565 int start = pattern[0];
566 int end = pattern[2];
567 int c = string[0];
568 if (start > end) {
569 int t = start;
570 start = end;
571 end = t;
572 }
573 if (nocase) {
574 start = tolower(start);
575 end = tolower(end);
576 c = tolower(c);
577 }
578 pattern += 2;
579 patternLen -= 2;
580 if (c >= start && c <= end)
581 match = 1;
582 } else {
583 if (!nocase) {
584 if (pattern[0] == string[0])
585 match = 1;
586 } else {
587 if (tolower((int)pattern[0]) == tolower((int)string[0]))
588 match = 1;
589 }
590 }
591 pattern++;
592 patternLen--;
593 }
594 if (not)
595 match = !match;
596 if (!match)
597 return 0; /* no match */
598 string++;
599 stringLen--;
600 break;
601 }
602 case '\\':
603 if (patternLen >= 2) {
604 pattern++;
605 patternLen--;
606 }
607 /* fall through */
608 default:
609 if (!nocase) {
610 if (pattern[0] != string[0])
611 return 0; /* no match */
612 } else {
613 if (tolower((int)pattern[0]) != tolower((int)string[0]))
614 return 0; /* no match */
615 }
616 string++;
617 stringLen--;
618 break;
619 }
620 pattern++;
621 patternLen--;
622 if (stringLen == 0) {
623 while(*pattern == '*') {
624 pattern++;
625 patternLen--;
626 }
627 break;
628 }
629 }
630 if (patternLen == 0 && stringLen == 0)
631 return 1;
632 return 0;
633 }
634
635 static void redisLog(int level, const char *fmt, ...) {
636 va_list ap;
637 FILE *fp;
638
639 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
640 if (!fp) return;
641
642 va_start(ap, fmt);
643 if (level >= server.verbosity) {
644 char *c = ".-*";
645 char buf[64];
646 time_t now;
647
648 now = time(NULL);
649 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
650 fprintf(fp,"%s %c ",buf,c[level]);
651 vfprintf(fp, fmt, ap);
652 fprintf(fp,"\n");
653 fflush(fp);
654 }
655 va_end(ap);
656
657 if (server.logfile) fclose(fp);
658 }
659
660 /*====================== Hash table type implementation ==================== */
661
662 /* This is an hash table type that uses the SDS dynamic strings libary as
663 * keys and radis objects as values (objects can hold SDS strings,
664 * lists, sets). */
665
666 static void dictVanillaFree(void *privdata, void *val)
667 {
668 DICT_NOTUSED(privdata);
669 zfree(val);
670 }
671
672 static int sdsDictKeyCompare(void *privdata, const void *key1,
673 const void *key2)
674 {
675 int l1,l2;
676 DICT_NOTUSED(privdata);
677
678 l1 = sdslen((sds)key1);
679 l2 = sdslen((sds)key2);
680 if (l1 != l2) return 0;
681 return memcmp(key1, key2, l1) == 0;
682 }
683
684 static void dictRedisObjectDestructor(void *privdata, void *val)
685 {
686 DICT_NOTUSED(privdata);
687
688 decrRefCount(val);
689 }
690
691 static int dictObjKeyCompare(void *privdata, const void *key1,
692 const void *key2)
693 {
694 const robj *o1 = key1, *o2 = key2;
695 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
696 }
697
698 static unsigned int dictObjHash(const void *key) {
699 const robj *o = key;
700 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
701 }
702
703 static int dictEncObjKeyCompare(void *privdata, const void *key1,
704 const void *key2)
705 {
706 const robj *o1 = key1, *o2 = key2;
707
708 if (o1->encoding == REDIS_ENCODING_RAW &&
709 o2->encoding == REDIS_ENCODING_RAW)
710 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
711 else {
712 robj *dec1, *dec2;
713 int cmp;
714
715 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
716 getDecodedObject(o1) : (robj*)o1;
717 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
718 getDecodedObject(o2) : (robj*)o2;
719 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
720 if (dec1 != o1) decrRefCount(dec1);
721 if (dec2 != o2) decrRefCount(dec2);
722 return cmp;
723 }
724 }
725
726 static unsigned int dictEncObjHash(const void *key) {
727 const robj *o = key;
728
729 if (o->encoding == REDIS_ENCODING_RAW)
730 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
731 else {
732 robj *dec = getDecodedObject(o);
733 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
734 decrRefCount(dec);
735 return hash;
736 }
737 }
738
739 static dictType setDictType = {
740 dictEncObjHash, /* hash function */
741 NULL, /* key dup */
742 NULL, /* val dup */
743 dictEncObjKeyCompare, /* key compare */
744 dictRedisObjectDestructor, /* key destructor */
745 NULL /* val destructor */
746 };
747
748 static dictType zsetDictType = {
749 dictEncObjHash, /* hash function */
750 NULL, /* key dup */
751 NULL, /* val dup */
752 dictEncObjKeyCompare, /* key compare */
753 dictRedisObjectDestructor, /* key destructor */
754 dictVanillaFree /* val destructor */
755 };
756
757 static dictType hashDictType = {
758 dictObjHash, /* hash function */
759 NULL, /* key dup */
760 NULL, /* val dup */
761 dictObjKeyCompare, /* key compare */
762 dictRedisObjectDestructor, /* key destructor */
763 dictRedisObjectDestructor /* val destructor */
764 };
765
766 /* ========================= Random utility functions ======================= */
767
768 /* Redis generally does not try to recover from out of memory conditions
769 * when allocating objects or strings, it is not clear if it will be possible
770 * to report this condition to the client since the networking layer itself
771 * is based on heap allocation for send buffers, so we simply abort.
772 * At least the code will be simpler to read... */
773 static void oom(const char *msg) {
774 fprintf(stderr, "%s: Out of memory\n",msg);
775 fflush(stderr);
776 sleep(1);
777 abort();
778 }
779
780 /* ====================== Redis server networking stuff ===================== */
781 static void closeTimedoutClients(void) {
782 redisClient *c;
783 listNode *ln;
784 time_t now = time(NULL);
785
786 listRewind(server.clients);
787 while ((ln = listYield(server.clients)) != NULL) {
788 c = listNodeValue(ln);
789 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
790 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
791 (now - c->lastinteraction > server.maxidletime)) {
792 redisLog(REDIS_DEBUG,"Closing idle client");
793 freeClient(c);
794 }
795 }
796 }
797
798 static int htNeedsResize(dict *dict) {
799 long long size, used;
800
801 size = dictSlots(dict);
802 used = dictSize(dict);
803 return (size && used && size > DICT_HT_INITIAL_SIZE &&
804 (used*100/size < REDIS_HT_MINFILL));
805 }
806
807 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
808 * we resize the hash table to save memory */
809 static void tryResizeHashTables(void) {
810 int j;
811
812 for (j = 0; j < server.dbnum; j++) {
813 if (htNeedsResize(server.db[j].dict)) {
814 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
815 dictResize(server.db[j].dict);
816 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
817 }
818 if (htNeedsResize(server.db[j].expires))
819 dictResize(server.db[j].expires);
820 }
821 }
822
823 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
824 int j, loops = server.cronloops++;
825 REDIS_NOTUSED(eventLoop);
826 REDIS_NOTUSED(id);
827 REDIS_NOTUSED(clientData);
828
829 /* Update the global state with the amount of used memory */
830 server.usedmemory = zmalloc_used_memory();
831
832 /* Show some info about non-empty databases */
833 for (j = 0; j < server.dbnum; j++) {
834 long long size, used, vkeys;
835
836 size = dictSlots(server.db[j].dict);
837 used = dictSize(server.db[j].dict);
838 vkeys = dictSize(server.db[j].expires);
839 if (!(loops % 5) && (used || vkeys)) {
840 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
841 /* dictPrintStats(server.dict); */
842 }
843 }
844
845 /* We don't want to resize the hash tables while a bacground saving
846 * is in progress: the saving child is created using fork() that is
847 * implemented with a copy-on-write semantic in most modern systems, so
848 * if we resize the HT while there is the saving child at work actually
849 * a lot of memory movements in the parent will cause a lot of pages
850 * copied. */
851 if (!server.bgsaveinprogress) tryResizeHashTables();
852
853 /* Show information about connected clients */
854 if (!(loops % 5)) {
855 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
856 listLength(server.clients)-listLength(server.slaves),
857 listLength(server.slaves),
858 server.usedmemory,
859 dictSize(server.sharingpool));
860 }
861
862 /* Close connections of timedout clients */
863 if (server.maxidletime && !(loops % 10))
864 closeTimedoutClients();
865
866 /* Check if a background saving in progress terminated */
867 if (server.bgsaveinprogress) {
868 int statloc;
869 if (wait4(-1,&statloc,WNOHANG,NULL)) {
870 int exitcode = WEXITSTATUS(statloc);
871 int bysignal = WIFSIGNALED(statloc);
872
873 if (!bysignal && exitcode == 0) {
874 redisLog(REDIS_NOTICE,
875 "Background saving terminated with success");
876 server.dirty = 0;
877 server.lastsave = time(NULL);
878 } else if (!bysignal && exitcode != 0) {
879 redisLog(REDIS_WARNING, "Background saving error");
880 } else {
881 redisLog(REDIS_WARNING,
882 "Background saving terminated by signal");
883 rdbRemoveTempFile(server.bgsavechildpid);
884 }
885 server.bgsaveinprogress = 0;
886 server.bgsavechildpid = -1;
887 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
888 }
889 } else {
890 /* If there is not a background saving in progress check if
891 * we have to save now */
892 time_t now = time(NULL);
893 for (j = 0; j < server.saveparamslen; j++) {
894 struct saveparam *sp = server.saveparams+j;
895
896 if (server.dirty >= sp->changes &&
897 now-server.lastsave > sp->seconds) {
898 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
899 sp->changes, sp->seconds);
900 rdbSaveBackground(server.dbfilename);
901 break;
902 }
903 }
904 }
905
906 /* Try to expire a few timed out keys */
907 for (j = 0; j < server.dbnum; j++) {
908 redisDb *db = server.db+j;
909 int num = dictSize(db->expires);
910
911 if (num) {
912 time_t now = time(NULL);
913
914 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
915 num = REDIS_EXPIRELOOKUPS_PER_CRON;
916 while (num--) {
917 dictEntry *de;
918 time_t t;
919
920 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
921 t = (time_t) dictGetEntryVal(de);
922 if (now > t) {
923 deleteKey(db,dictGetEntryKey(de));
924 }
925 }
926 }
927 }
928
929 /* Check if we should connect to a MASTER */
930 if (server.replstate == REDIS_REPL_CONNECT) {
931 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
932 if (syncWithMaster() == REDIS_OK) {
933 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
934 }
935 }
936 return 1000;
937 }
938
939 static void createSharedObjects(void) {
940 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
941 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
942 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
943 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
944 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
945 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
946 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
947 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
948 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
949 /* no such key */
950 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
951 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
952 "-ERR Operation against a key holding the wrong kind of value\r\n"));
953 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
954 "-ERR no such key\r\n"));
955 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
956 "-ERR syntax error\r\n"));
957 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
958 "-ERR source and destination objects are the same\r\n"));
959 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
960 "-ERR index out of range\r\n"));
961 shared.space = createObject(REDIS_STRING,sdsnew(" "));
962 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
963 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
964 shared.select0 = createStringObject("select 0\r\n",10);
965 shared.select1 = createStringObject("select 1\r\n",10);
966 shared.select2 = createStringObject("select 2\r\n",10);
967 shared.select3 = createStringObject("select 3\r\n",10);
968 shared.select4 = createStringObject("select 4\r\n",10);
969 shared.select5 = createStringObject("select 5\r\n",10);
970 shared.select6 = createStringObject("select 6\r\n",10);
971 shared.select7 = createStringObject("select 7\r\n",10);
972 shared.select8 = createStringObject("select 8\r\n",10);
973 shared.select9 = createStringObject("select 9\r\n",10);
974 }
975
976 static void appendServerSaveParams(time_t seconds, int changes) {
977 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
978 server.saveparams[server.saveparamslen].seconds = seconds;
979 server.saveparams[server.saveparamslen].changes = changes;
980 server.saveparamslen++;
981 }
982
983 static void ResetServerSaveParams() {
984 zfree(server.saveparams);
985 server.saveparams = NULL;
986 server.saveparamslen = 0;
987 }
988
989 static void initServerConfig() {
990 server.dbnum = REDIS_DEFAULT_DBNUM;
991 server.port = REDIS_SERVERPORT;
992 server.verbosity = REDIS_DEBUG;
993 server.maxidletime = REDIS_MAXIDLETIME;
994 server.saveparams = NULL;
995 server.logfile = NULL; /* NULL = log on standard output */
996 server.bindaddr = NULL;
997 server.glueoutputbuf = 1;
998 server.daemonize = 0;
999 server.pidfile = "/var/run/redis.pid";
1000 server.dbfilename = "dump.rdb";
1001 server.requirepass = NULL;
1002 server.shareobjects = 0;
1003 server.sharingpoolsize = 1024;
1004 server.maxclients = 0;
1005 server.maxmemory = 0;
1006 ResetServerSaveParams();
1007
1008 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1009 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1010 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1011 /* Replication related */
1012 server.isslave = 0;
1013 server.masterhost = NULL;
1014 server.masterport = 6379;
1015 server.master = NULL;
1016 server.replstate = REDIS_REPL_NONE;
1017 }
1018
1019 static void initServer() {
1020 int j;
1021
1022 signal(SIGHUP, SIG_IGN);
1023 signal(SIGPIPE, SIG_IGN);
1024 setupSigSegvAction();
1025
1026 server.clients = listCreate();
1027 server.slaves = listCreate();
1028 server.monitors = listCreate();
1029 server.objfreelist = listCreate();
1030 createSharedObjects();
1031 server.el = aeCreateEventLoop();
1032 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1033 server.sharingpool = dictCreate(&setDictType,NULL);
1034 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1035 if (server.fd == -1) {
1036 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1037 exit(1);
1038 }
1039 for (j = 0; j < server.dbnum; j++) {
1040 server.db[j].dict = dictCreate(&hashDictType,NULL);
1041 server.db[j].expires = dictCreate(&setDictType,NULL);
1042 server.db[j].id = j;
1043 }
1044 server.cronloops = 0;
1045 server.bgsaveinprogress = 0;
1046 server.bgsavechildpid = -1;
1047 server.lastsave = time(NULL);
1048 server.dirty = 0;
1049 server.usedmemory = 0;
1050 server.stat_numcommands = 0;
1051 server.stat_numconnections = 0;
1052 server.stat_starttime = time(NULL);
1053 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1054 }
1055
1056 /* Empty the whole database */
1057 static long long emptyDb() {
1058 int j;
1059 long long removed = 0;
1060
1061 for (j = 0; j < server.dbnum; j++) {
1062 removed += dictSize(server.db[j].dict);
1063 dictEmpty(server.db[j].dict);
1064 dictEmpty(server.db[j].expires);
1065 }
1066 return removed;
1067 }
1068
1069 static int yesnotoi(char *s) {
1070 if (!strcasecmp(s,"yes")) return 1;
1071 else if (!strcasecmp(s,"no")) return 0;
1072 else return -1;
1073 }
1074
1075 /* I agree, this is a very rudimental way to load a configuration...
1076 will improve later if the config gets more complex */
1077 static void loadServerConfig(char *filename) {
1078 FILE *fp;
1079 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1080 int linenum = 0;
1081 sds line = NULL;
1082
1083 if (filename[0] == '-' && filename[1] == '\0')
1084 fp = stdin;
1085 else {
1086 if ((fp = fopen(filename,"r")) == NULL) {
1087 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1088 exit(1);
1089 }
1090 }
1091
1092 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1093 sds *argv;
1094 int argc, j;
1095
1096 linenum++;
1097 line = sdsnew(buf);
1098 line = sdstrim(line," \t\r\n");
1099
1100 /* Skip comments and blank lines*/
1101 if (line[0] == '#' || line[0] == '\0') {
1102 sdsfree(line);
1103 continue;
1104 }
1105
1106 /* Split into arguments */
1107 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1108 sdstolower(argv[0]);
1109
1110 /* Execute config directives */
1111 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1112 server.maxidletime = atoi(argv[1]);
1113 if (server.maxidletime < 0) {
1114 err = "Invalid timeout value"; goto loaderr;
1115 }
1116 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1117 server.port = atoi(argv[1]);
1118 if (server.port < 1 || server.port > 65535) {
1119 err = "Invalid port"; goto loaderr;
1120 }
1121 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1122 server.bindaddr = zstrdup(argv[1]);
1123 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1124 int seconds = atoi(argv[1]);
1125 int changes = atoi(argv[2]);
1126 if (seconds < 1 || changes < 0) {
1127 err = "Invalid save parameters"; goto loaderr;
1128 }
1129 appendServerSaveParams(seconds,changes);
1130 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1131 if (chdir(argv[1]) == -1) {
1132 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1133 argv[1], strerror(errno));
1134 exit(1);
1135 }
1136 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1137 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1138 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1139 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1140 else {
1141 err = "Invalid log level. Must be one of debug, notice, warning";
1142 goto loaderr;
1143 }
1144 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1145 FILE *logfp;
1146
1147 server.logfile = zstrdup(argv[1]);
1148 if (!strcasecmp(server.logfile,"stdout")) {
1149 zfree(server.logfile);
1150 server.logfile = NULL;
1151 }
1152 if (server.logfile) {
1153 /* Test if we are able to open the file. The server will not
1154 * be able to abort just for this problem later... */
1155 logfp = fopen(server.logfile,"a");
1156 if (logfp == NULL) {
1157 err = sdscatprintf(sdsempty(),
1158 "Can't open the log file: %s", strerror(errno));
1159 goto loaderr;
1160 }
1161 fclose(logfp);
1162 }
1163 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1164 server.dbnum = atoi(argv[1]);
1165 if (server.dbnum < 1) {
1166 err = "Invalid number of databases"; goto loaderr;
1167 }
1168 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1169 server.maxclients = atoi(argv[1]);
1170 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1171 server.maxmemory = strtoll(argv[1], NULL, 10);
1172 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1173 server.masterhost = sdsnew(argv[1]);
1174 server.masterport = atoi(argv[2]);
1175 server.replstate = REDIS_REPL_CONNECT;
1176 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1177 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1178 err = "argument must be 'yes' or 'no'"; goto loaderr;
1179 }
1180 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1181 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1182 err = "argument must be 'yes' or 'no'"; goto loaderr;
1183 }
1184 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1185 server.sharingpoolsize = atoi(argv[1]);
1186 if (server.sharingpoolsize < 1) {
1187 err = "invalid object sharing pool size"; goto loaderr;
1188 }
1189 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1190 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1191 err = "argument must be 'yes' or 'no'"; goto loaderr;
1192 }
1193 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1194 server.requirepass = zstrdup(argv[1]);
1195 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1196 server.pidfile = zstrdup(argv[1]);
1197 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1198 server.dbfilename = zstrdup(argv[1]);
1199 } else {
1200 err = "Bad directive or wrong number of arguments"; goto loaderr;
1201 }
1202 for (j = 0; j < argc; j++)
1203 sdsfree(argv[j]);
1204 zfree(argv);
1205 sdsfree(line);
1206 }
1207 if (fp != stdin) fclose(fp);
1208 return;
1209
1210 loaderr:
1211 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1212 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1213 fprintf(stderr, ">>> '%s'\n", line);
1214 fprintf(stderr, "%s\n", err);
1215 exit(1);
1216 }
1217
1218 static void freeClientArgv(redisClient *c) {
1219 int j;
1220
1221 for (j = 0; j < c->argc; j++)
1222 decrRefCount(c->argv[j]);
1223 for (j = 0; j < c->mbargc; j++)
1224 decrRefCount(c->mbargv[j]);
1225 c->argc = 0;
1226 c->mbargc = 0;
1227 }
1228
1229 static void freeClient(redisClient *c) {
1230 listNode *ln;
1231
1232 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1233 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1234 sdsfree(c->querybuf);
1235 listRelease(c->reply);
1236 freeClientArgv(c);
1237 close(c->fd);
1238 ln = listSearchKey(server.clients,c);
1239 assert(ln != NULL);
1240 listDelNode(server.clients,ln);
1241 if (c->flags & REDIS_SLAVE) {
1242 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1243 close(c->repldbfd);
1244 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1245 ln = listSearchKey(l,c);
1246 assert(ln != NULL);
1247 listDelNode(l,ln);
1248 }
1249 if (c->flags & REDIS_MASTER) {
1250 server.master = NULL;
1251 server.replstate = REDIS_REPL_CONNECT;
1252 }
1253 zfree(c->argv);
1254 zfree(c->mbargv);
1255 zfree(c);
1256 }
1257
1258 static void glueReplyBuffersIfNeeded(redisClient *c) {
1259 int totlen = 0;
1260 listNode *ln;
1261 robj *o;
1262
1263 listRewind(c->reply);
1264 while((ln = listYield(c->reply))) {
1265 o = ln->value;
1266 totlen += sdslen(o->ptr);
1267 /* This optimization makes more sense if we don't have to copy
1268 * too much data */
1269 if (totlen > 1024) return;
1270 }
1271 if (totlen > 0) {
1272 char buf[1024];
1273 int copylen = 0;
1274
1275 listRewind(c->reply);
1276 while((ln = listYield(c->reply))) {
1277 o = ln->value;
1278 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1279 copylen += sdslen(o->ptr);
1280 listDelNode(c->reply,ln);
1281 }
1282 /* Now the output buffer is empty, add the new single element */
1283 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1284 listAddNodeTail(c->reply,o);
1285 }
1286 }
1287
1288 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1289 redisClient *c = privdata;
1290 int nwritten = 0, totwritten = 0, objlen;
1291 robj *o;
1292 REDIS_NOTUSED(el);
1293 REDIS_NOTUSED(mask);
1294
1295 if (server.glueoutputbuf && listLength(c->reply) > 1)
1296 glueReplyBuffersIfNeeded(c);
1297 while(listLength(c->reply)) {
1298 o = listNodeValue(listFirst(c->reply));
1299 objlen = sdslen(o->ptr);
1300
1301 if (objlen == 0) {
1302 listDelNode(c->reply,listFirst(c->reply));
1303 continue;
1304 }
1305
1306 if (c->flags & REDIS_MASTER) {
1307 /* Don't reply to a master */
1308 nwritten = objlen - c->sentlen;
1309 } else {
1310 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1311 if (nwritten <= 0) break;
1312 }
1313 c->sentlen += nwritten;
1314 totwritten += nwritten;
1315 /* If we fully sent the object on head go to the next one */
1316 if (c->sentlen == objlen) {
1317 listDelNode(c->reply,listFirst(c->reply));
1318 c->sentlen = 0;
1319 }
1320 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1321 * bytes, in a single threaded server it's a good idea to server
1322 * other clients as well, even if a very large request comes from
1323 * super fast link that is always able to accept data (in real world
1324 * terms think to 'KEYS *' against the loopback interfae) */
1325 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1326 }
1327 if (nwritten == -1) {
1328 if (errno == EAGAIN) {
1329 nwritten = 0;
1330 } else {
1331 redisLog(REDIS_DEBUG,
1332 "Error writing to client: %s", strerror(errno));
1333 freeClient(c);
1334 return;
1335 }
1336 }
1337 if (totwritten > 0) c->lastinteraction = time(NULL);
1338 if (listLength(c->reply) == 0) {
1339 c->sentlen = 0;
1340 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1341 }
1342 }
1343
1344 static struct redisCommand *lookupCommand(char *name) {
1345 int j = 0;
1346 while(cmdTable[j].name != NULL) {
1347 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1348 j++;
1349 }
1350 return NULL;
1351 }
1352
1353 /* resetClient prepare the client to process the next command */
1354 static void resetClient(redisClient *c) {
1355 freeClientArgv(c);
1356 c->bulklen = -1;
1357 c->multibulk = 0;
1358 }
1359
1360 /* If this function gets called we already read a whole
1361 * command, argments are in the client argv/argc fields.
1362 * processCommand() execute the command or prepare the
1363 * server for a bulk read from the client.
1364 *
1365 * If 1 is returned the client is still alive and valid and
1366 * and other operations can be performed by the caller. Otherwise
1367 * if 0 is returned the client was destroied (i.e. after QUIT). */
1368 static int processCommand(redisClient *c) {
1369 struct redisCommand *cmd;
1370 long long dirty;
1371
1372 /* Free some memory if needed (maxmemory setting) */
1373 if (server.maxmemory) freeMemoryIfNeeded();
1374
1375 /* Handle the multi bulk command type. This is an alternative protocol
1376 * supported by Redis in order to receive commands that are composed of
1377 * multiple binary-safe "bulk" arguments. The latency of processing is
1378 * a bit higher but this allows things like multi-sets, so if this
1379 * protocol is used only for MSET and similar commands this is a big win. */
1380 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1381 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1382 if (c->multibulk <= 0) {
1383 resetClient(c);
1384 return 1;
1385 } else {
1386 decrRefCount(c->argv[c->argc-1]);
1387 c->argc--;
1388 return 1;
1389 }
1390 } else if (c->multibulk) {
1391 if (c->bulklen == -1) {
1392 if (((char*)c->argv[0]->ptr)[0] != '$') {
1393 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1394 resetClient(c);
1395 return 1;
1396 } else {
1397 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1398 decrRefCount(c->argv[0]);
1399 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1400 c->argc--;
1401 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1402 resetClient(c);
1403 return 1;
1404 }
1405 c->argc--;
1406 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1407 return 1;
1408 }
1409 } else {
1410 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1411 c->mbargv[c->mbargc] = c->argv[0];
1412 c->mbargc++;
1413 c->argc--;
1414 c->multibulk--;
1415 if (c->multibulk == 0) {
1416 robj **auxargv;
1417 int auxargc;
1418
1419 /* Here we need to swap the multi-bulk argc/argv with the
1420 * normal argc/argv of the client structure. */
1421 auxargv = c->argv;
1422 c->argv = c->mbargv;
1423 c->mbargv = auxargv;
1424
1425 auxargc = c->argc;
1426 c->argc = c->mbargc;
1427 c->mbargc = auxargc;
1428
1429 /* We need to set bulklen to something different than -1
1430 * in order for the code below to process the command without
1431 * to try to read the last argument of a bulk command as
1432 * a special argument. */
1433 c->bulklen = 0;
1434 /* continue below and process the command */
1435 } else {
1436 c->bulklen = -1;
1437 return 1;
1438 }
1439 }
1440 }
1441 /* -- end of multi bulk commands processing -- */
1442
1443 /* The QUIT command is handled as a special case. Normal command
1444 * procs are unable to close the client connection safely */
1445 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1446 freeClient(c);
1447 return 0;
1448 }
1449 cmd = lookupCommand(c->argv[0]->ptr);
1450 if (!cmd) {
1451 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1452 resetClient(c);
1453 return 1;
1454 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1455 (c->argc < -cmd->arity)) {
1456 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1457 resetClient(c);
1458 return 1;
1459 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1460 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1461 resetClient(c);
1462 return 1;
1463 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1464 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1465
1466 decrRefCount(c->argv[c->argc-1]);
1467 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1468 c->argc--;
1469 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1470 resetClient(c);
1471 return 1;
1472 }
1473 c->argc--;
1474 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1475 /* It is possible that the bulk read is already in the
1476 * buffer. Check this condition and handle it accordingly.
1477 * This is just a fast path, alternative to call processInputBuffer().
1478 * It's a good idea since the code is small and this condition
1479 * happens most of the times. */
1480 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1481 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1482 c->argc++;
1483 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1484 } else {
1485 return 1;
1486 }
1487 }
1488 /* Let's try to share objects on the command arguments vector */
1489 if (server.shareobjects) {
1490 int j;
1491 for(j = 1; j < c->argc; j++)
1492 c->argv[j] = tryObjectSharing(c->argv[j]);
1493 }
1494 /* Let's try to encode the bulk object to save space. */
1495 if (cmd->flags & REDIS_CMD_BULK)
1496 tryObjectEncoding(c->argv[c->argc-1]);
1497
1498 /* Check if the user is authenticated */
1499 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1500 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1501 resetClient(c);
1502 return 1;
1503 }
1504
1505 /* Exec the command */
1506 dirty = server.dirty;
1507 cmd->proc(c);
1508 if (server.dirty-dirty != 0 && listLength(server.slaves))
1509 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1510 if (listLength(server.monitors))
1511 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1512 server.stat_numcommands++;
1513
1514 /* Prepare the client for the next command */
1515 if (c->flags & REDIS_CLOSE) {
1516 freeClient(c);
1517 return 0;
1518 }
1519 resetClient(c);
1520 return 1;
1521 }
1522
1523 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1524 listNode *ln;
1525 int outc = 0, j;
1526 robj **outv;
1527 /* (args*2)+1 is enough room for args, spaces, newlines */
1528 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1529
1530 if (argc <= REDIS_STATIC_ARGS) {
1531 outv = static_outv;
1532 } else {
1533 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1534 }
1535
1536 for (j = 0; j < argc; j++) {
1537 if (j != 0) outv[outc++] = shared.space;
1538 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1539 robj *lenobj;
1540
1541 lenobj = createObject(REDIS_STRING,
1542 sdscatprintf(sdsempty(),"%d\r\n",
1543 stringObjectLen(argv[j])));
1544 lenobj->refcount = 0;
1545 outv[outc++] = lenobj;
1546 }
1547 outv[outc++] = argv[j];
1548 }
1549 outv[outc++] = shared.crlf;
1550
1551 /* Increment all the refcounts at start and decrement at end in order to
1552 * be sure to free objects if there is no slave in a replication state
1553 * able to be feed with commands */
1554 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1555 listRewind(slaves);
1556 while((ln = listYield(slaves))) {
1557 redisClient *slave = ln->value;
1558
1559 /* Don't feed slaves that are still waiting for BGSAVE to start */
1560 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1561
1562 /* Feed all the other slaves, MONITORs and so on */
1563 if (slave->slaveseldb != dictid) {
1564 robj *selectcmd;
1565
1566 switch(dictid) {
1567 case 0: selectcmd = shared.select0; break;
1568 case 1: selectcmd = shared.select1; break;
1569 case 2: selectcmd = shared.select2; break;
1570 case 3: selectcmd = shared.select3; break;
1571 case 4: selectcmd = shared.select4; break;
1572 case 5: selectcmd = shared.select5; break;
1573 case 6: selectcmd = shared.select6; break;
1574 case 7: selectcmd = shared.select7; break;
1575 case 8: selectcmd = shared.select8; break;
1576 case 9: selectcmd = shared.select9; break;
1577 default:
1578 selectcmd = createObject(REDIS_STRING,
1579 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1580 selectcmd->refcount = 0;
1581 break;
1582 }
1583 addReply(slave,selectcmd);
1584 slave->slaveseldb = dictid;
1585 }
1586 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1587 }
1588 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1589 if (outv != static_outv) zfree(outv);
1590 }
1591
1592 static void processInputBuffer(redisClient *c) {
1593 again:
1594 if (c->bulklen == -1) {
1595 /* Read the first line of the query */
1596 char *p = strchr(c->querybuf,'\n');
1597 size_t querylen;
1598
1599 if (p) {
1600 sds query, *argv;
1601 int argc, j;
1602
1603 query = c->querybuf;
1604 c->querybuf = sdsempty();
1605 querylen = 1+(p-(query));
1606 if (sdslen(query) > querylen) {
1607 /* leave data after the first line of the query in the buffer */
1608 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1609 }
1610 *p = '\0'; /* remove "\n" */
1611 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1612 sdsupdatelen(query);
1613
1614 /* Now we can split the query in arguments */
1615 if (sdslen(query) == 0) {
1616 /* Ignore empty query */
1617 sdsfree(query);
1618 return;
1619 }
1620 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1621 sdsfree(query);
1622
1623 if (c->argv) zfree(c->argv);
1624 c->argv = zmalloc(sizeof(robj*)*argc);
1625
1626 for (j = 0; j < argc; j++) {
1627 if (sdslen(argv[j])) {
1628 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1629 c->argc++;
1630 } else {
1631 sdsfree(argv[j]);
1632 }
1633 }
1634 zfree(argv);
1635 /* Execute the command. If the client is still valid
1636 * after processCommand() return and there is something
1637 * on the query buffer try to process the next command. */
1638 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1639 return;
1640 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1641 redisLog(REDIS_DEBUG, "Client protocol error");
1642 freeClient(c);
1643 return;
1644 }
1645 } else {
1646 /* Bulk read handling. Note that if we are at this point
1647 the client already sent a command terminated with a newline,
1648 we are reading the bulk data that is actually the last
1649 argument of the command. */
1650 int qbl = sdslen(c->querybuf);
1651
1652 if (c->bulklen <= qbl) {
1653 /* Copy everything but the final CRLF as final argument */
1654 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1655 c->argc++;
1656 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1657 /* Process the command. If the client is still valid after
1658 * the processing and there is more data in the buffer
1659 * try to parse it. */
1660 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1661 return;
1662 }
1663 }
1664 }
1665
1666 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1667 redisClient *c = (redisClient*) privdata;
1668 char buf[REDIS_IOBUF_LEN];
1669 int nread;
1670 REDIS_NOTUSED(el);
1671 REDIS_NOTUSED(mask);
1672
1673 nread = read(fd, buf, REDIS_IOBUF_LEN);
1674 if (nread == -1) {
1675 if (errno == EAGAIN) {
1676 nread = 0;
1677 } else {
1678 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1679 freeClient(c);
1680 return;
1681 }
1682 } else if (nread == 0) {
1683 redisLog(REDIS_DEBUG, "Client closed connection");
1684 freeClient(c);
1685 return;
1686 }
1687 if (nread) {
1688 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1689 c->lastinteraction = time(NULL);
1690 } else {
1691 return;
1692 }
1693 processInputBuffer(c);
1694 }
1695
1696 static int selectDb(redisClient *c, int id) {
1697 if (id < 0 || id >= server.dbnum)
1698 return REDIS_ERR;
1699 c->db = &server.db[id];
1700 return REDIS_OK;
1701 }
1702
1703 static void *dupClientReplyValue(void *o) {
1704 incrRefCount((robj*)o);
1705 return 0;
1706 }
1707
1708 static redisClient *createClient(int fd) {
1709 redisClient *c = zmalloc(sizeof(*c));
1710
1711 anetNonBlock(NULL,fd);
1712 anetTcpNoDelay(NULL,fd);
1713 if (!c) return NULL;
1714 selectDb(c,0);
1715 c->fd = fd;
1716 c->querybuf = sdsempty();
1717 c->argc = 0;
1718 c->argv = NULL;
1719 c->bulklen = -1;
1720 c->multibulk = 0;
1721 c->mbargc = 0;
1722 c->mbargv = NULL;
1723 c->sentlen = 0;
1724 c->flags = 0;
1725 c->lastinteraction = time(NULL);
1726 c->authenticated = 0;
1727 c->replstate = REDIS_REPL_NONE;
1728 c->reply = listCreate();
1729 listSetFreeMethod(c->reply,decrRefCount);
1730 listSetDupMethod(c->reply,dupClientReplyValue);
1731 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1732 readQueryFromClient, c, NULL) == AE_ERR) {
1733 freeClient(c);
1734 return NULL;
1735 }
1736 listAddNodeTail(server.clients,c);
1737 return c;
1738 }
1739
1740 static void addReply(redisClient *c, robj *obj) {
1741 if (listLength(c->reply) == 0 &&
1742 (c->replstate == REDIS_REPL_NONE ||
1743 c->replstate == REDIS_REPL_ONLINE) &&
1744 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1745 sendReplyToClient, c, NULL) == AE_ERR) return;
1746 if (obj->encoding != REDIS_ENCODING_RAW) {
1747 obj = getDecodedObject(obj);
1748 } else {
1749 incrRefCount(obj);
1750 }
1751 listAddNodeTail(c->reply,obj);
1752 }
1753
1754 static void addReplySds(redisClient *c, sds s) {
1755 robj *o = createObject(REDIS_STRING,s);
1756 addReply(c,o);
1757 decrRefCount(o);
1758 }
1759
1760 static void addReplyBulkLen(redisClient *c, robj *obj) {
1761 size_t len;
1762
1763 if (obj->encoding == REDIS_ENCODING_RAW) {
1764 len = sdslen(obj->ptr);
1765 } else {
1766 long n = (long)obj->ptr;
1767
1768 len = 1;
1769 if (n < 0) {
1770 len++;
1771 n = -n;
1772 }
1773 while((n = n/10) != 0) {
1774 len++;
1775 }
1776 }
1777 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1778 }
1779
1780 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1781 int cport, cfd;
1782 char cip[128];
1783 redisClient *c;
1784 REDIS_NOTUSED(el);
1785 REDIS_NOTUSED(mask);
1786 REDIS_NOTUSED(privdata);
1787
1788 cfd = anetAccept(server.neterr, fd, cip, &cport);
1789 if (cfd == AE_ERR) {
1790 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1791 return;
1792 }
1793 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1794 if ((c = createClient(cfd)) == NULL) {
1795 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1796 close(cfd); /* May be already closed, just ingore errors */
1797 return;
1798 }
1799 /* If maxclient directive is set and this is one client more... close the
1800 * connection. Note that we create the client instead to check before
1801 * for this condition, since now the socket is already set in nonblocking
1802 * mode and we can send an error for free using the Kernel I/O */
1803 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1804 char *err = "-ERR max number of clients reached\r\n";
1805
1806 /* That's a best effort error message, don't check write errors */
1807 (void) write(c->fd,err,strlen(err));
1808 freeClient(c);
1809 return;
1810 }
1811 server.stat_numconnections++;
1812 }
1813
1814 /* ======================= Redis objects implementation ===================== */
1815
1816 static robj *createObject(int type, void *ptr) {
1817 robj *o;
1818
1819 if (listLength(server.objfreelist)) {
1820 listNode *head = listFirst(server.objfreelist);
1821 o = listNodeValue(head);
1822 listDelNode(server.objfreelist,head);
1823 } else {
1824 o = zmalloc(sizeof(*o));
1825 }
1826 o->type = type;
1827 o->encoding = REDIS_ENCODING_RAW;
1828 o->ptr = ptr;
1829 o->refcount = 1;
1830 return o;
1831 }
1832
1833 static robj *createStringObject(char *ptr, size_t len) {
1834 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1835 }
1836
1837 static robj *createListObject(void) {
1838 list *l = listCreate();
1839
1840 listSetFreeMethod(l,decrRefCount);
1841 return createObject(REDIS_LIST,l);
1842 }
1843
1844 static robj *createSetObject(void) {
1845 dict *d = dictCreate(&setDictType,NULL);
1846 return createObject(REDIS_SET,d);
1847 }
1848
1849 static robj *createZsetObject(void) {
1850 zset *zs = zmalloc(sizeof(*zs));
1851
1852 zs->dict = dictCreate(&zsetDictType,NULL);
1853 zs->zsl = zslCreate();
1854 return createObject(REDIS_ZSET,zs);
1855 }
1856
1857 static void freeStringObject(robj *o) {
1858 if (o->encoding == REDIS_ENCODING_RAW) {
1859 sdsfree(o->ptr);
1860 }
1861 }
1862
1863 static void freeListObject(robj *o) {
1864 listRelease((list*) o->ptr);
1865 }
1866
1867 static void freeSetObject(robj *o) {
1868 dictRelease((dict*) o->ptr);
1869 }
1870
1871 static void freeZsetObject(robj *o) {
1872 zset *zs = o->ptr;
1873
1874 dictRelease(zs->dict);
1875 zslFree(zs->zsl);
1876 zfree(zs);
1877 }
1878
1879 static void freeHashObject(robj *o) {
1880 dictRelease((dict*) o->ptr);
1881 }
1882
1883 static void incrRefCount(robj *o) {
1884 o->refcount++;
1885 #ifdef DEBUG_REFCOUNT
1886 if (o->type == REDIS_STRING)
1887 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1888 #endif
1889 }
1890
1891 static void decrRefCount(void *obj) {
1892 robj *o = obj;
1893
1894 #ifdef DEBUG_REFCOUNT
1895 if (o->type == REDIS_STRING)
1896 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1897 #endif
1898 if (--(o->refcount) == 0) {
1899 switch(o->type) {
1900 case REDIS_STRING: freeStringObject(o); break;
1901 case REDIS_LIST: freeListObject(o); break;
1902 case REDIS_SET: freeSetObject(o); break;
1903 case REDIS_ZSET: freeZsetObject(o); break;
1904 case REDIS_HASH: freeHashObject(o); break;
1905 default: assert(0 != 0); break;
1906 }
1907 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1908 !listAddNodeHead(server.objfreelist,o))
1909 zfree(o);
1910 }
1911 }
1912
1913 static robj *lookupKey(redisDb *db, robj *key) {
1914 dictEntry *de = dictFind(db->dict,key);
1915 return de ? dictGetEntryVal(de) : NULL;
1916 }
1917
1918 static robj *lookupKeyRead(redisDb *db, robj *key) {
1919 expireIfNeeded(db,key);
1920 return lookupKey(db,key);
1921 }
1922
1923 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1924 deleteIfVolatile(db,key);
1925 return lookupKey(db,key);
1926 }
1927
1928 static int deleteKey(redisDb *db, robj *key) {
1929 int retval;
1930
1931 /* We need to protect key from destruction: after the first dictDelete()
1932 * it may happen that 'key' is no longer valid if we don't increment
1933 * it's count. This may happen when we get the object reference directly
1934 * from the hash table with dictRandomKey() or dict iterators */
1935 incrRefCount(key);
1936 if (dictSize(db->expires)) dictDelete(db->expires,key);
1937 retval = dictDelete(db->dict,key);
1938 decrRefCount(key);
1939
1940 return retval == DICT_OK;
1941 }
1942
1943 /* Try to share an object against the shared objects pool */
1944 static robj *tryObjectSharing(robj *o) {
1945 struct dictEntry *de;
1946 unsigned long c;
1947
1948 if (o == NULL || server.shareobjects == 0) return o;
1949
1950 assert(o->type == REDIS_STRING);
1951 de = dictFind(server.sharingpool,o);
1952 if (de) {
1953 robj *shared = dictGetEntryKey(de);
1954
1955 c = ((unsigned long) dictGetEntryVal(de))+1;
1956 dictGetEntryVal(de) = (void*) c;
1957 incrRefCount(shared);
1958 decrRefCount(o);
1959 return shared;
1960 } else {
1961 /* Here we are using a stream algorihtm: Every time an object is
1962 * shared we increment its count, everytime there is a miss we
1963 * recrement the counter of a random object. If this object reaches
1964 * zero we remove the object and put the current object instead. */
1965 if (dictSize(server.sharingpool) >=
1966 server.sharingpoolsize) {
1967 de = dictGetRandomKey(server.sharingpool);
1968 assert(de != NULL);
1969 c = ((unsigned long) dictGetEntryVal(de))-1;
1970 dictGetEntryVal(de) = (void*) c;
1971 if (c == 0) {
1972 dictDelete(server.sharingpool,de->key);
1973 }
1974 } else {
1975 c = 0; /* If the pool is empty we want to add this object */
1976 }
1977 if (c == 0) {
1978 int retval;
1979
1980 retval = dictAdd(server.sharingpool,o,(void*)1);
1981 assert(retval == DICT_OK);
1982 incrRefCount(o);
1983 }
1984 return o;
1985 }
1986 }
1987
1988 /* Check if the nul-terminated string 's' can be represented by a long
1989 * (that is, is a number that fits into long without any other space or
1990 * character before or after the digits).
1991 *
1992 * If so, the function returns REDIS_OK and *longval is set to the value
1993 * of the number. Otherwise REDIS_ERR is returned */
1994 static int isStringRepresentableAsLong(sds s, long *longval) {
1995 char buf[32], *endptr;
1996 long value;
1997 int slen;
1998
1999 value = strtol(s, &endptr, 10);
2000 if (endptr[0] != '\0') return REDIS_ERR;
2001 slen = snprintf(buf,32,"%ld",value);
2002
2003 /* If the number converted back into a string is not identical
2004 * then it's not possible to encode the string as integer */
2005 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2006 if (longval) *longval = value;
2007 return REDIS_OK;
2008 }
2009
2010 /* Try to encode a string object in order to save space */
2011 static int tryObjectEncoding(robj *o) {
2012 long value;
2013 sds s = o->ptr;
2014
2015 if (o->encoding != REDIS_ENCODING_RAW)
2016 return REDIS_ERR; /* Already encoded */
2017
2018 /* It's not save to encode shared objects: shared objects can be shared
2019 * everywhere in the "object space" of Redis. Encoded objects can only
2020 * appear as "values" (and not, for instance, as keys) */
2021 if (o->refcount > 1) return REDIS_ERR;
2022
2023 /* Currently we try to encode only strings */
2024 assert(o->type == REDIS_STRING);
2025
2026 /* Check if we can represent this string as a long integer */
2027 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2028
2029 /* Ok, this object can be encoded */
2030 o->encoding = REDIS_ENCODING_INT;
2031 sdsfree(o->ptr);
2032 o->ptr = (void*) value;
2033 return REDIS_OK;
2034 }
2035
2036 /* Get a decoded version of an encoded object (returned as a new object) */
2037 static robj *getDecodedObject(const robj *o) {
2038 robj *dec;
2039
2040 assert(o->encoding != REDIS_ENCODING_RAW);
2041 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2042 char buf[32];
2043
2044 snprintf(buf,32,"%ld",(long)o->ptr);
2045 dec = createStringObject(buf,strlen(buf));
2046 return dec;
2047 } else {
2048 assert(1 != 1);
2049 }
2050 }
2051
2052 static int compareStringObjects(robj *a, robj *b) {
2053 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2054
2055 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2056 return (long)a->ptr - (long)b->ptr;
2057 } else {
2058 int retval;
2059
2060 incrRefCount(a);
2061 incrRefCount(b);
2062 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2063 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2064 retval = sdscmp(a->ptr,b->ptr);
2065 decrRefCount(a);
2066 decrRefCount(b);
2067 return retval;
2068 }
2069 }
2070
2071 static size_t stringObjectLen(robj *o) {
2072 assert(o->type == REDIS_STRING);
2073 if (o->encoding == REDIS_ENCODING_RAW) {
2074 return sdslen(o->ptr);
2075 } else {
2076 char buf[32];
2077
2078 return snprintf(buf,32,"%ld",(long)o->ptr);
2079 }
2080 }
2081
2082 /*============================ DB saving/loading ============================ */
2083
2084 static int rdbSaveType(FILE *fp, unsigned char type) {
2085 if (fwrite(&type,1,1,fp) == 0) return -1;
2086 return 0;
2087 }
2088
2089 static int rdbSaveTime(FILE *fp, time_t t) {
2090 int32_t t32 = (int32_t) t;
2091 if (fwrite(&t32,4,1,fp) == 0) return -1;
2092 return 0;
2093 }
2094
2095 /* check rdbLoadLen() comments for more info */
2096 static int rdbSaveLen(FILE *fp, uint32_t len) {
2097 unsigned char buf[2];
2098
2099 if (len < (1<<6)) {
2100 /* Save a 6 bit len */
2101 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2102 if (fwrite(buf,1,1,fp) == 0) return -1;
2103 } else if (len < (1<<14)) {
2104 /* Save a 14 bit len */
2105 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2106 buf[1] = len&0xFF;
2107 if (fwrite(buf,2,1,fp) == 0) return -1;
2108 } else {
2109 /* Save a 32 bit len */
2110 buf[0] = (REDIS_RDB_32BITLEN<<6);
2111 if (fwrite(buf,1,1,fp) == 0) return -1;
2112 len = htonl(len);
2113 if (fwrite(&len,4,1,fp) == 0) return -1;
2114 }
2115 return 0;
2116 }
2117
2118 /* String objects in the form "2391" "-100" without any space and with a
2119 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2120 * encoded as integers to save space */
2121 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2122 long long value;
2123 char *endptr, buf[32];
2124
2125 /* Check if it's possible to encode this value as a number */
2126 value = strtoll(s, &endptr, 10);
2127 if (endptr[0] != '\0') return 0;
2128 snprintf(buf,32,"%lld",value);
2129
2130 /* If the number converted back into a string is not identical
2131 * then it's not possible to encode the string as integer */
2132 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2133
2134 /* Finally check if it fits in our ranges */
2135 if (value >= -(1<<7) && value <= (1<<7)-1) {
2136 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2137 enc[1] = value&0xFF;
2138 return 2;
2139 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2140 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2141 enc[1] = value&0xFF;
2142 enc[2] = (value>>8)&0xFF;
2143 return 3;
2144 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2145 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2146 enc[1] = value&0xFF;
2147 enc[2] = (value>>8)&0xFF;
2148 enc[3] = (value>>16)&0xFF;
2149 enc[4] = (value>>24)&0xFF;
2150 return 5;
2151 } else {
2152 return 0;
2153 }
2154 }
2155
2156 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2157 unsigned int comprlen, outlen;
2158 unsigned char byte;
2159 void *out;
2160
2161 /* We require at least four bytes compression for this to be worth it */
2162 outlen = sdslen(obj->ptr)-4;
2163 if (outlen <= 0) return 0;
2164 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2165 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2166 if (comprlen == 0) {
2167 zfree(out);
2168 return 0;
2169 }
2170 /* Data compressed! Let's save it on disk */
2171 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2172 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2173 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2174 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2175 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2176 zfree(out);
2177 return comprlen;
2178
2179 writeerr:
2180 zfree(out);
2181 return -1;
2182 }
2183
2184 /* Save a string objet as [len][data] on disk. If the object is a string
2185 * representation of an integer value we try to safe it in a special form */
2186 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2187 size_t len;
2188 int enclen;
2189
2190 len = sdslen(obj->ptr);
2191
2192 /* Try integer encoding */
2193 if (len <= 11) {
2194 unsigned char buf[5];
2195 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2196 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2197 return 0;
2198 }
2199 }
2200
2201 /* Try LZF compression - under 20 bytes it's unable to compress even
2202 * aaaaaaaaaaaaaaaaaa so skip it */
2203 if (len > 20) {
2204 int retval;
2205
2206 retval = rdbSaveLzfStringObject(fp,obj);
2207 if (retval == -1) return -1;
2208 if (retval > 0) return 0;
2209 /* retval == 0 means data can't be compressed, save the old way */
2210 }
2211
2212 /* Store verbatim */
2213 if (rdbSaveLen(fp,len) == -1) return -1;
2214 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2215 return 0;
2216 }
2217
2218 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2219 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2220 int retval;
2221 robj *dec;
2222
2223 if (obj->encoding != REDIS_ENCODING_RAW) {
2224 dec = getDecodedObject(obj);
2225 retval = rdbSaveStringObjectRaw(fp,dec);
2226 decrRefCount(dec);
2227 return retval;
2228 } else {
2229 return rdbSaveStringObjectRaw(fp,obj);
2230 }
2231 }
2232
2233 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2234 static int rdbSave(char *filename) {
2235 dictIterator *di = NULL;
2236 dictEntry *de;
2237 FILE *fp;
2238 char tmpfile[256];
2239 int j;
2240 time_t now = time(NULL);
2241
2242 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2243 fp = fopen(tmpfile,"w");
2244 if (!fp) {
2245 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2246 return REDIS_ERR;
2247 }
2248 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2249 for (j = 0; j < server.dbnum; j++) {
2250 redisDb *db = server.db+j;
2251 dict *d = db->dict;
2252 if (dictSize(d) == 0) continue;
2253 di = dictGetIterator(d);
2254 if (!di) {
2255 fclose(fp);
2256 return REDIS_ERR;
2257 }
2258
2259 /* Write the SELECT DB opcode */
2260 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2261 if (rdbSaveLen(fp,j) == -1) goto werr;
2262
2263 /* Iterate this DB writing every entry */
2264 while((de = dictNext(di)) != NULL) {
2265 robj *key = dictGetEntryKey(de);
2266 robj *o = dictGetEntryVal(de);
2267 time_t expiretime = getExpire(db,key);
2268
2269 /* Save the expire time */
2270 if (expiretime != -1) {
2271 /* If this key is already expired skip it */
2272 if (expiretime < now) continue;
2273 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2274 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2275 }
2276 /* Save the key and associated value */
2277 if (rdbSaveType(fp,o->type) == -1) goto werr;
2278 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2279 if (o->type == REDIS_STRING) {
2280 /* Save a string value */
2281 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2282 } else if (o->type == REDIS_LIST) {
2283 /* Save a list value */
2284 list *list = o->ptr;
2285 listNode *ln;
2286
2287 listRewind(list);
2288 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2289 while((ln = listYield(list))) {
2290 robj *eleobj = listNodeValue(ln);
2291
2292 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2293 }
2294 } else if (o->type == REDIS_SET) {
2295 /* Save a set value */
2296 dict *set = o->ptr;
2297 dictIterator *di = dictGetIterator(set);
2298 dictEntry *de;
2299
2300 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2301 while((de = dictNext(di)) != NULL) {
2302 robj *eleobj = dictGetEntryKey(de);
2303
2304 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2305 }
2306 dictReleaseIterator(di);
2307 } else {
2308 assert(0 != 0);
2309 }
2310 }
2311 dictReleaseIterator(di);
2312 }
2313 /* EOF opcode */
2314 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2315
2316 /* Make sure data will not remain on the OS's output buffers */
2317 fflush(fp);
2318 fsync(fileno(fp));
2319 fclose(fp);
2320
2321 /* Use RENAME to make sure the DB file is changed atomically only
2322 * if the generate DB file is ok. */
2323 if (rename(tmpfile,filename) == -1) {
2324 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2325 unlink(tmpfile);
2326 return REDIS_ERR;
2327 }
2328 redisLog(REDIS_NOTICE,"DB saved on disk");
2329 server.dirty = 0;
2330 server.lastsave = time(NULL);
2331 return REDIS_OK;
2332
2333 werr:
2334 fclose(fp);
2335 unlink(tmpfile);
2336 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2337 if (di) dictReleaseIterator(di);
2338 return REDIS_ERR;
2339 }
2340
2341 static int rdbSaveBackground(char *filename) {
2342 pid_t childpid;
2343
2344 if (server.bgsaveinprogress) return REDIS_ERR;
2345 if ((childpid = fork()) == 0) {
2346 /* Child */
2347 close(server.fd);
2348 if (rdbSave(filename) == REDIS_OK) {
2349 exit(0);
2350 } else {
2351 exit(1);
2352 }
2353 } else {
2354 /* Parent */
2355 if (childpid == -1) {
2356 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2357 strerror(errno));
2358 return REDIS_ERR;
2359 }
2360 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2361 server.bgsaveinprogress = 1;
2362 server.bgsavechildpid = childpid;
2363 return REDIS_OK;
2364 }
2365 return REDIS_OK; /* unreached */
2366 }
2367
2368 static void rdbRemoveTempFile(pid_t childpid) {
2369 char tmpfile[256];
2370
2371 snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
2372 unlink(tmpfile);
2373 }
2374
2375 static int rdbLoadType(FILE *fp) {
2376 unsigned char type;
2377 if (fread(&type,1,1,fp) == 0) return -1;
2378 return type;
2379 }
2380
2381 static time_t rdbLoadTime(FILE *fp) {
2382 int32_t t32;
2383 if (fread(&t32,4,1,fp) == 0) return -1;
2384 return (time_t) t32;
2385 }
2386
2387 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2388 * of this file for a description of how this are stored on disk.
2389 *
2390 * isencoded is set to 1 if the readed length is not actually a length but
2391 * an "encoding type", check the above comments for more info */
2392 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2393 unsigned char buf[2];
2394 uint32_t len;
2395
2396 if (isencoded) *isencoded = 0;
2397 if (rdbver == 0) {
2398 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2399 return ntohl(len);
2400 } else {
2401 int type;
2402
2403 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2404 type = (buf[0]&0xC0)>>6;
2405 if (type == REDIS_RDB_6BITLEN) {
2406 /* Read a 6 bit len */
2407 return buf[0]&0x3F;
2408 } else if (type == REDIS_RDB_ENCVAL) {
2409 /* Read a 6 bit len encoding type */
2410 if (isencoded) *isencoded = 1;
2411 return buf[0]&0x3F;
2412 } else if (type == REDIS_RDB_14BITLEN) {
2413 /* Read a 14 bit len */
2414 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2415 return ((buf[0]&0x3F)<<8)|buf[1];
2416 } else {
2417 /* Read a 32 bit len */
2418 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2419 return ntohl(len);
2420 }
2421 }
2422 }
2423
2424 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2425 unsigned char enc[4];
2426 long long val;
2427
2428 if (enctype == REDIS_RDB_ENC_INT8) {
2429 if (fread(enc,1,1,fp) == 0) return NULL;
2430 val = (signed char)enc[0];
2431 } else if (enctype == REDIS_RDB_ENC_INT16) {
2432 uint16_t v;
2433 if (fread(enc,2,1,fp) == 0) return NULL;
2434 v = enc[0]|(enc[1]<<8);
2435 val = (int16_t)v;
2436 } else if (enctype == REDIS_RDB_ENC_INT32) {
2437 uint32_t v;
2438 if (fread(enc,4,1,fp) == 0) return NULL;
2439 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2440 val = (int32_t)v;
2441 } else {
2442 val = 0; /* anti-warning */
2443 assert(0!=0);
2444 }
2445 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2446 }
2447
2448 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2449 unsigned int len, clen;
2450 unsigned char *c = NULL;
2451 sds val = NULL;
2452
2453 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2454 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2455 if ((c = zmalloc(clen)) == NULL) goto err;
2456 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2457 if (fread(c,clen,1,fp) == 0) goto err;
2458 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2459 zfree(c);
2460 return createObject(REDIS_STRING,val);
2461 err:
2462 zfree(c);
2463 sdsfree(val);
2464 return NULL;
2465 }
2466
2467 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2468 int isencoded;
2469 uint32_t len;
2470 sds val;
2471
2472 len = rdbLoadLen(fp,rdbver,&isencoded);
2473 if (isencoded) {
2474 switch(len) {
2475 case REDIS_RDB_ENC_INT8:
2476 case REDIS_RDB_ENC_INT16:
2477 case REDIS_RDB_ENC_INT32:
2478 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2479 case REDIS_RDB_ENC_LZF:
2480 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2481 default:
2482 assert(0!=0);
2483 }
2484 }
2485
2486 if (len == REDIS_RDB_LENERR) return NULL;
2487 val = sdsnewlen(NULL,len);
2488 if (len && fread(val,len,1,fp) == 0) {
2489 sdsfree(val);
2490 return NULL;
2491 }
2492 return tryObjectSharing(createObject(REDIS_STRING,val));
2493 }
2494
2495 static int rdbLoad(char *filename) {
2496 FILE *fp;
2497 robj *keyobj = NULL;
2498 uint32_t dbid;
2499 int type, retval, rdbver;
2500 dict *d = server.db[0].dict;
2501 redisDb *db = server.db+0;
2502 char buf[1024];
2503 time_t expiretime = -1, now = time(NULL);
2504
2505 fp = fopen(filename,"r");
2506 if (!fp) return REDIS_ERR;
2507 if (fread(buf,9,1,fp) == 0) goto eoferr;
2508 buf[9] = '\0';
2509 if (memcmp(buf,"REDIS",5) != 0) {
2510 fclose(fp);
2511 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2512 return REDIS_ERR;
2513 }
2514 rdbver = atoi(buf+5);
2515 if (rdbver > 1) {
2516 fclose(fp);
2517 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2518 return REDIS_ERR;
2519 }
2520 while(1) {
2521 robj *o;
2522
2523 /* Read type. */
2524 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2525 if (type == REDIS_EXPIRETIME) {
2526 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2527 /* We read the time so we need to read the object type again */
2528 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2529 }
2530 if (type == REDIS_EOF) break;
2531 /* Handle SELECT DB opcode as a special case */
2532 if (type == REDIS_SELECTDB) {
2533 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2534 goto eoferr;
2535 if (dbid >= (unsigned)server.dbnum) {
2536 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2537 exit(1);
2538 }
2539 db = server.db+dbid;
2540 d = db->dict;
2541 continue;
2542 }
2543 /* Read key */
2544 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2545
2546 if (type == REDIS_STRING) {
2547 /* Read string value */
2548 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2549 tryObjectEncoding(o);
2550 } else if (type == REDIS_LIST || type == REDIS_SET) {
2551 /* Read list/set value */
2552 uint32_t listlen;
2553
2554 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2555 goto eoferr;
2556 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2557 /* Load every single element of the list/set */
2558 while(listlen--) {
2559 robj *ele;
2560
2561 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2562 tryObjectEncoding(ele);
2563 if (type == REDIS_LIST) {
2564 listAddNodeTail((list*)o->ptr,ele);
2565 } else {
2566 dictAdd((dict*)o->ptr,ele,NULL);
2567 }
2568 }
2569 } else {
2570 assert(0 != 0);
2571 }
2572 /* Add the new object in the hash table */
2573 retval = dictAdd(d,keyobj,o);
2574 if (retval == DICT_ERR) {
2575 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2576 exit(1);
2577 }
2578 /* Set the expire time if needed */
2579 if (expiretime != -1) {
2580 setExpire(db,keyobj,expiretime);
2581 /* Delete this key if already expired */
2582 if (expiretime < now) deleteKey(db,keyobj);
2583 expiretime = -1;
2584 }
2585 keyobj = o = NULL;
2586 }
2587 fclose(fp);
2588 return REDIS_OK;
2589
2590 eoferr: /* unexpected end of file is handled here with a fatal exit */
2591 if (keyobj) decrRefCount(keyobj);
2592 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2593 exit(1);
2594 return REDIS_ERR; /* Just to avoid warning */
2595 }
2596
2597 /*================================== Commands =============================== */
2598
2599 static void authCommand(redisClient *c) {
2600 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2601 c->authenticated = 1;
2602 addReply(c,shared.ok);
2603 } else {
2604 c->authenticated = 0;
2605 addReply(c,shared.err);
2606 }
2607 }
2608
2609 static void pingCommand(redisClient *c) {
2610 addReply(c,shared.pong);
2611 }
2612
2613 static void echoCommand(redisClient *c) {
2614 addReplyBulkLen(c,c->argv[1]);
2615 addReply(c,c->argv[1]);
2616 addReply(c,shared.crlf);
2617 }
2618
2619 /*=================================== Strings =============================== */
2620
2621 static void setGenericCommand(redisClient *c, int nx) {
2622 int retval;
2623
2624 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2625 if (retval == DICT_ERR) {
2626 if (!nx) {
2627 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2628 incrRefCount(c->argv[2]);
2629 } else {
2630 addReply(c,shared.czero);
2631 return;
2632 }
2633 } else {
2634 incrRefCount(c->argv[1]);
2635 incrRefCount(c->argv[2]);
2636 }
2637 server.dirty++;
2638 removeExpire(c->db,c->argv[1]);
2639 addReply(c, nx ? shared.cone : shared.ok);
2640 }
2641
2642 static void setCommand(redisClient *c) {
2643 setGenericCommand(c,0);
2644 }
2645
2646 static void setnxCommand(redisClient *c) {
2647 setGenericCommand(c,1);
2648 }
2649
2650 static void getCommand(redisClient *c) {
2651 robj *o = lookupKeyRead(c->db,c->argv[1]);
2652
2653 if (o == NULL) {
2654 addReply(c,shared.nullbulk);
2655 } else {
2656 if (o->type != REDIS_STRING) {
2657 addReply(c,shared.wrongtypeerr);
2658 } else {
2659 addReplyBulkLen(c,o);
2660 addReply(c,o);
2661 addReply(c,shared.crlf);
2662 }
2663 }
2664 }
2665
2666 static void getsetCommand(redisClient *c) {
2667 getCommand(c);
2668 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2669 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2670 } else {
2671 incrRefCount(c->argv[1]);
2672 }
2673 incrRefCount(c->argv[2]);
2674 server.dirty++;
2675 removeExpire(c->db,c->argv[1]);
2676 }
2677
2678 static void mgetCommand(redisClient *c) {
2679 int j;
2680
2681 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2682 for (j = 1; j < c->argc; j++) {
2683 robj *o = lookupKeyRead(c->db,c->argv[j]);
2684 if (o == NULL) {
2685 addReply(c,shared.nullbulk);
2686 } else {
2687 if (o->type != REDIS_STRING) {
2688 addReply(c,shared.nullbulk);
2689 } else {
2690 addReplyBulkLen(c,o);
2691 addReply(c,o);
2692 addReply(c,shared.crlf);
2693 }
2694 }
2695 }
2696 }
2697
2698 static void incrDecrCommand(redisClient *c, long long incr) {
2699 long long value;
2700 int retval;
2701 robj *o;
2702
2703 o = lookupKeyWrite(c->db,c->argv[1]);
2704 if (o == NULL) {
2705 value = 0;
2706 } else {
2707 if (o->type != REDIS_STRING) {
2708 value = 0;
2709 } else {
2710 char *eptr;
2711
2712 if (o->encoding == REDIS_ENCODING_RAW)
2713 value = strtoll(o->ptr, &eptr, 10);
2714 else if (o->encoding == REDIS_ENCODING_INT)
2715 value = (long)o->ptr;
2716 else
2717 assert(1 != 1);
2718 }
2719 }
2720
2721 value += incr;
2722 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2723 tryObjectEncoding(o);
2724 retval = dictAdd(c->db->dict,c->argv[1],o);
2725 if (retval == DICT_ERR) {
2726 dictReplace(c->db->dict,c->argv[1],o);
2727 removeExpire(c->db,c->argv[1]);
2728 } else {
2729 incrRefCount(c->argv[1]);
2730 }
2731 server.dirty++;
2732 addReply(c,shared.colon);
2733 addReply(c,o);
2734 addReply(c,shared.crlf);
2735 }
2736
2737 static void incrCommand(redisClient *c) {
2738 incrDecrCommand(c,1);
2739 }
2740
2741 static void decrCommand(redisClient *c) {
2742 incrDecrCommand(c,-1);
2743 }
2744
2745 static void incrbyCommand(redisClient *c) {
2746 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2747 incrDecrCommand(c,incr);
2748 }
2749
2750 static void decrbyCommand(redisClient *c) {
2751 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2752 incrDecrCommand(c,-incr);
2753 }
2754
2755 /* ========================= Type agnostic commands ========================= */
2756
2757 static void delCommand(redisClient *c) {
2758 int deleted = 0, j;
2759
2760 for (j = 1; j < c->argc; j++) {
2761 if (deleteKey(c->db,c->argv[j])) {
2762 server.dirty++;
2763 deleted++;
2764 }
2765 }
2766 switch(deleted) {
2767 case 0:
2768 addReply(c,shared.czero);
2769 break;
2770 case 1:
2771 addReply(c,shared.cone);
2772 break;
2773 default:
2774 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2775 break;
2776 }
2777 }
2778
2779 static void existsCommand(redisClient *c) {
2780 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2781 }
2782
2783 static void selectCommand(redisClient *c) {
2784 int id = atoi(c->argv[1]->ptr);
2785
2786 if (selectDb(c,id) == REDIS_ERR) {
2787 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2788 } else {
2789 addReply(c,shared.ok);
2790 }
2791 }
2792
2793 static void randomkeyCommand(redisClient *c) {
2794 dictEntry *de;
2795
2796 while(1) {
2797 de = dictGetRandomKey(c->db->dict);
2798 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2799 }
2800 if (de == NULL) {
2801 addReply(c,shared.plus);
2802 addReply(c,shared.crlf);
2803 } else {
2804 addReply(c,shared.plus);
2805 addReply(c,dictGetEntryKey(de));
2806 addReply(c,shared.crlf);
2807 }
2808 }
2809
2810 static void keysCommand(redisClient *c) {
2811 dictIterator *di;
2812 dictEntry *de;
2813 sds pattern = c->argv[1]->ptr;
2814 int plen = sdslen(pattern);
2815 int numkeys = 0, keyslen = 0;
2816 robj *lenobj = createObject(REDIS_STRING,NULL);
2817
2818 di = dictGetIterator(c->db->dict);
2819 addReply(c,lenobj);
2820 decrRefCount(lenobj);
2821 while((de = dictNext(di)) != NULL) {
2822 robj *keyobj = dictGetEntryKey(de);
2823
2824 sds key = keyobj->ptr;
2825 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2826 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2827 if (expireIfNeeded(c->db,keyobj) == 0) {
2828 if (numkeys != 0)
2829 addReply(c,shared.space);
2830 addReply(c,keyobj);
2831 numkeys++;
2832 keyslen += sdslen(key);
2833 }
2834 }
2835 }
2836 dictReleaseIterator(di);
2837 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2838 addReply(c,shared.crlf);
2839 }
2840
2841 static void dbsizeCommand(redisClient *c) {
2842 addReplySds(c,
2843 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2844 }
2845
2846 static void lastsaveCommand(redisClient *c) {
2847 addReplySds(c,
2848 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2849 }
2850
2851 static void typeCommand(redisClient *c) {
2852 robj *o;
2853 char *type;
2854
2855 o = lookupKeyRead(c->db,c->argv[1]);
2856 if (o == NULL) {
2857 type = "+none";
2858 } else {
2859 switch(o->type) {
2860 case REDIS_STRING: type = "+string"; break;
2861 case REDIS_LIST: type = "+list"; break;
2862 case REDIS_SET: type = "+set"; break;
2863 default: type = "unknown"; break;
2864 }
2865 }
2866 addReplySds(c,sdsnew(type));
2867 addReply(c,shared.crlf);
2868 }
2869
2870 static void saveCommand(redisClient *c) {
2871 if (server.bgsaveinprogress) {
2872 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2873 return;
2874 }
2875 if (rdbSave(server.dbfilename) == REDIS_OK) {
2876 addReply(c,shared.ok);
2877 } else {
2878 addReply(c,shared.err);
2879 }
2880 }
2881
2882 static void bgsaveCommand(redisClient *c) {
2883 if (server.bgsaveinprogress) {
2884 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2885 return;
2886 }
2887 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2888 addReply(c,shared.ok);
2889 } else {
2890 addReply(c,shared.err);
2891 }
2892 }
2893
2894 static void shutdownCommand(redisClient *c) {
2895 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2896 /* Kill the saving child if there is a background saving in progress.
2897 We want to avoid race conditions, for instance our saving child may
2898 overwrite the synchronous saving did by SHUTDOWN. */
2899 if (server.bgsaveinprogress) {
2900 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2901 kill(server.bgsavechildpid,SIGKILL);
2902 rdbRemoveTempFile(server.bgsavechildpid);
2903 }
2904 /* SYNC SAVE */
2905 if (rdbSave(server.dbfilename) == REDIS_OK) {
2906 if (server.daemonize)
2907 unlink(server.pidfile);
2908 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2909 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2910 exit(1);
2911 } else {
2912 /* Ooops.. error saving! The best we can do is to continue operating.
2913 * Note that if there was a background saving process, in the next
2914 * cron() Redis will be notified that the background saving aborted,
2915 * handling special stuff like slaves pending for synchronization... */
2916 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2917 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2918 }
2919 }
2920
2921 static void renameGenericCommand(redisClient *c, int nx) {
2922 robj *o;
2923
2924 /* To use the same key as src and dst is probably an error */
2925 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2926 addReply(c,shared.sameobjecterr);
2927 return;
2928 }
2929
2930 o = lookupKeyWrite(c->db,c->argv[1]);
2931 if (o == NULL) {
2932 addReply(c,shared.nokeyerr);
2933 return;
2934 }
2935 incrRefCount(o);
2936 deleteIfVolatile(c->db,c->argv[2]);
2937 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2938 if (nx) {
2939 decrRefCount(o);
2940 addReply(c,shared.czero);
2941 return;
2942 }
2943 dictReplace(c->db->dict,c->argv[2],o);
2944 } else {
2945 incrRefCount(c->argv[2]);
2946 }
2947 deleteKey(c->db,c->argv[1]);
2948 server.dirty++;
2949 addReply(c,nx ? shared.cone : shared.ok);
2950 }
2951
2952 static void renameCommand(redisClient *c) {
2953 renameGenericCommand(c,0);
2954 }
2955
2956 static void renamenxCommand(redisClient *c) {
2957 renameGenericCommand(c,1);
2958 }
2959
2960 static void moveCommand(redisClient *c) {
2961 robj *o;
2962 redisDb *src, *dst;
2963 int srcid;
2964
2965 /* Obtain source and target DB pointers */
2966 src = c->db;
2967 srcid = c->db->id;
2968 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2969 addReply(c,shared.outofrangeerr);
2970 return;
2971 }
2972 dst = c->db;
2973 selectDb(c,srcid); /* Back to the source DB */
2974
2975 /* If the user is moving using as target the same
2976 * DB as the source DB it is probably an error. */
2977 if (src == dst) {
2978 addReply(c,shared.sameobjecterr);
2979 return;
2980 }
2981
2982 /* Check if the element exists and get a reference */
2983 o = lookupKeyWrite(c->db,c->argv[1]);
2984 if (!o) {
2985 addReply(c,shared.czero);
2986 return;
2987 }
2988
2989 /* Try to add the element to the target DB */
2990 deleteIfVolatile(dst,c->argv[1]);
2991 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2992 addReply(c,shared.czero);
2993 return;
2994 }
2995 incrRefCount(c->argv[1]);
2996 incrRefCount(o);
2997
2998 /* OK! key moved, free the entry in the source DB */
2999 deleteKey(src,c->argv[1]);
3000 server.dirty++;
3001 addReply(c,shared.cone);
3002 }
3003
3004 /* =================================== Lists ================================ */
3005 static void pushGenericCommand(redisClient *c, int where) {
3006 robj *lobj;
3007 list *list;
3008
3009 lobj = lookupKeyWrite(c->db,c->argv[1]);
3010 if (lobj == NULL) {
3011 lobj = createListObject();
3012 list = lobj->ptr;
3013 if (where == REDIS_HEAD) {
3014 listAddNodeHead(list,c->argv[2]);
3015 } else {
3016 listAddNodeTail(list,c->argv[2]);
3017 }
3018 dictAdd(c->db->dict,c->argv[1],lobj);
3019 incrRefCount(c->argv[1]);
3020 incrRefCount(c->argv[2]);
3021 } else {
3022 if (lobj->type != REDIS_LIST) {
3023 addReply(c,shared.wrongtypeerr);
3024 return;
3025 }
3026 list = lobj->ptr;
3027 if (where == REDIS_HEAD) {
3028 listAddNodeHead(list,c->argv[2]);
3029 } else {
3030 listAddNodeTail(list,c->argv[2]);
3031 }
3032 incrRefCount(c->argv[2]);
3033 }
3034 server.dirty++;
3035 addReply(c,shared.ok);
3036 }
3037
3038 static void lpushCommand(redisClient *c) {
3039 pushGenericCommand(c,REDIS_HEAD);
3040 }
3041
3042 static void rpushCommand(redisClient *c) {
3043 pushGenericCommand(c,REDIS_TAIL);
3044 }
3045
3046 static void llenCommand(redisClient *c) {
3047 robj *o;
3048 list *l;
3049
3050 o = lookupKeyRead(c->db,c->argv[1]);
3051 if (o == NULL) {
3052 addReply(c,shared.czero);
3053 return;
3054 } else {
3055 if (o->type != REDIS_LIST) {
3056 addReply(c,shared.wrongtypeerr);
3057 } else {
3058 l = o->ptr;
3059 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3060 }
3061 }
3062 }
3063
3064 static void lindexCommand(redisClient *c) {
3065 robj *o;
3066 int index = atoi(c->argv[2]->ptr);
3067
3068 o = lookupKeyRead(c->db,c->argv[1]);
3069 if (o == NULL) {
3070 addReply(c,shared.nullbulk);
3071 } else {
3072 if (o->type != REDIS_LIST) {
3073 addReply(c,shared.wrongtypeerr);
3074 } else {
3075 list *list = o->ptr;
3076 listNode *ln;
3077
3078 ln = listIndex(list, index);
3079 if (ln == NULL) {
3080 addReply(c,shared.nullbulk);
3081 } else {
3082 robj *ele = listNodeValue(ln);
3083 addReplyBulkLen(c,ele);
3084 addReply(c,ele);
3085 addReply(c,shared.crlf);
3086 }
3087 }
3088 }
3089 }
3090
3091 static void lsetCommand(redisClient *c) {
3092 robj *o;
3093 int index = atoi(c->argv[2]->ptr);
3094
3095 o = lookupKeyWrite(c->db,c->argv[1]);
3096 if (o == NULL) {
3097 addReply(c,shared.nokeyerr);
3098 } else {
3099 if (o->type != REDIS_LIST) {
3100 addReply(c,shared.wrongtypeerr);
3101 } else {
3102 list *list = o->ptr;
3103 listNode *ln;
3104
3105 ln = listIndex(list, index);
3106 if (ln == NULL) {
3107 addReply(c,shared.outofrangeerr);
3108 } else {
3109 robj *ele = listNodeValue(ln);
3110
3111 decrRefCount(ele);
3112 listNodeValue(ln) = c->argv[3];
3113 incrRefCount(c->argv[3]);
3114 addReply(c,shared.ok);
3115 server.dirty++;
3116 }
3117 }
3118 }
3119 }
3120
3121 static void popGenericCommand(redisClient *c, int where) {
3122 robj *o;
3123
3124 o = lookupKeyWrite(c->db,c->argv[1]);
3125 if (o == NULL) {
3126 addReply(c,shared.nullbulk);
3127 } else {
3128 if (o->type != REDIS_LIST) {
3129 addReply(c,shared.wrongtypeerr);
3130 } else {
3131 list *list = o->ptr;
3132 listNode *ln;
3133
3134 if (where == REDIS_HEAD)
3135 ln = listFirst(list);
3136 else
3137 ln = listLast(list);
3138
3139 if (ln == NULL) {
3140 addReply(c,shared.nullbulk);
3141 } else {
3142 robj *ele = listNodeValue(ln);
3143 addReplyBulkLen(c,ele);
3144 addReply(c,ele);
3145 addReply(c,shared.crlf);
3146 listDelNode(list,ln);
3147 server.dirty++;
3148 }
3149 }
3150 }
3151 }
3152
3153 static void lpopCommand(redisClient *c) {
3154 popGenericCommand(c,REDIS_HEAD);
3155 }
3156
3157 static void rpopCommand(redisClient *c) {
3158 popGenericCommand(c,REDIS_TAIL);
3159 }
3160
3161 static void lrangeCommand(redisClient *c) {
3162 robj *o;
3163 int start = atoi(c->argv[2]->ptr);
3164 int end = atoi(c->argv[3]->ptr);
3165
3166 o = lookupKeyRead(c->db,c->argv[1]);
3167 if (o == NULL) {
3168 addReply(c,shared.nullmultibulk);
3169 } else {
3170 if (o->type != REDIS_LIST) {
3171 addReply(c,shared.wrongtypeerr);
3172 } else {
3173 list *list = o->ptr;
3174 listNode *ln;
3175 int llen = listLength(list);
3176 int rangelen, j;
3177 robj *ele;
3178
3179 /* convert negative indexes */
3180 if (start < 0) start = llen+start;
3181 if (end < 0) end = llen+end;
3182 if (start < 0) start = 0;
3183 if (end < 0) end = 0;
3184
3185 /* indexes sanity checks */
3186 if (start > end || start >= llen) {
3187 /* Out of range start or start > end result in empty list */
3188 addReply(c,shared.emptymultibulk);
3189 return;
3190 }
3191 if (end >= llen) end = llen-1;
3192 rangelen = (end-start)+1;
3193
3194 /* Return the result in form of a multi-bulk reply */
3195 ln = listIndex(list, start);
3196 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3197 for (j = 0; j < rangelen; j++) {
3198 ele = listNodeValue(ln);
3199 addReplyBulkLen(c,ele);
3200 addReply(c,ele);
3201 addReply(c,shared.crlf);
3202 ln = ln->next;
3203 }
3204 }
3205 }
3206 }
3207
3208 static void ltrimCommand(redisClient *c) {
3209 robj *o;
3210 int start = atoi(c->argv[2]->ptr);
3211 int end = atoi(c->argv[3]->ptr);
3212
3213 o = lookupKeyWrite(c->db,c->argv[1]);
3214 if (o == NULL) {
3215 addReply(c,shared.nokeyerr);
3216 } else {
3217 if (o->type != REDIS_LIST) {
3218 addReply(c,shared.wrongtypeerr);
3219 } else {
3220 list *list = o->ptr;
3221 listNode *ln;
3222 int llen = listLength(list);
3223 int j, ltrim, rtrim;
3224
3225 /* convert negative indexes */
3226 if (start < 0) start = llen+start;
3227 if (end < 0) end = llen+end;
3228 if (start < 0) start = 0;
3229 if (end < 0) end = 0;
3230
3231 /* indexes sanity checks */
3232 if (start > end || start >= llen) {
3233 /* Out of range start or start > end result in empty list */
3234 ltrim = llen;
3235 rtrim = 0;
3236 } else {
3237 if (end >= llen) end = llen-1;
3238 ltrim = start;
3239 rtrim = llen-end-1;
3240 }
3241
3242 /* Remove list elements to perform the trim */
3243 for (j = 0; j < ltrim; j++) {
3244 ln = listFirst(list);
3245 listDelNode(list,ln);
3246 }
3247 for (j = 0; j < rtrim; j++) {
3248 ln = listLast(list);
3249 listDelNode(list,ln);
3250 }
3251 server.dirty++;
3252 addReply(c,shared.ok);
3253 }
3254 }
3255 }
3256
3257 static void lremCommand(redisClient *c) {
3258 robj *o;
3259
3260 o = lookupKeyWrite(c->db,c->argv[1]);
3261 if (o == NULL) {
3262 addReply(c,shared.czero);
3263 } else {
3264 if (o->type != REDIS_LIST) {
3265 addReply(c,shared.wrongtypeerr);
3266 } else {
3267 list *list = o->ptr;
3268 listNode *ln, *next;
3269 int toremove = atoi(c->argv[2]->ptr);
3270 int removed = 0;
3271 int fromtail = 0;
3272
3273 if (toremove < 0) {
3274 toremove = -toremove;
3275 fromtail = 1;
3276 }
3277 ln = fromtail ? list->tail : list->head;
3278 while (ln) {
3279 robj *ele = listNodeValue(ln);
3280
3281 next = fromtail ? ln->prev : ln->next;
3282 if (compareStringObjects(ele,c->argv[3]) == 0) {
3283 listDelNode(list,ln);
3284 server.dirty++;
3285 removed++;
3286 if (toremove && removed == toremove) break;
3287 }
3288 ln = next;
3289 }
3290 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3291 }
3292 }
3293 }
3294
3295 /* ==================================== Sets ================================ */
3296
3297 static void saddCommand(redisClient *c) {
3298 robj *set;
3299
3300 set = lookupKeyWrite(c->db,c->argv[1]);
3301 if (set == NULL) {
3302 set = createSetObject();
3303 dictAdd(c->db->dict,c->argv[1],set);
3304 incrRefCount(c->argv[1]);
3305 } else {
3306 if (set->type != REDIS_SET) {
3307 addReply(c,shared.wrongtypeerr);
3308 return;
3309 }
3310 }
3311 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3312 incrRefCount(c->argv[2]);
3313 server.dirty++;
3314 addReply(c,shared.cone);
3315 } else {
3316 addReply(c,shared.czero);
3317 }
3318 }
3319
3320 static void sremCommand(redisClient *c) {
3321 robj *set;
3322
3323 set = lookupKeyWrite(c->db,c->argv[1]);
3324 if (set == NULL) {
3325 addReply(c,shared.czero);
3326 } else {
3327 if (set->type != REDIS_SET) {
3328 addReply(c,shared.wrongtypeerr);
3329 return;
3330 }
3331 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3332 server.dirty++;
3333 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3334 addReply(c,shared.cone);
3335 } else {
3336 addReply(c,shared.czero);
3337 }
3338 }
3339 }
3340
3341 static void smoveCommand(redisClient *c) {
3342 robj *srcset, *dstset;
3343
3344 srcset = lookupKeyWrite(c->db,c->argv[1]);
3345 dstset = lookupKeyWrite(c->db,c->argv[2]);
3346
3347 /* If the source key does not exist return 0, if it's of the wrong type
3348 * raise an error */
3349 if (srcset == NULL || srcset->type != REDIS_SET) {
3350 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3351 return;
3352 }
3353 /* Error if the destination key is not a set as well */
3354 if (dstset && dstset->type != REDIS_SET) {
3355 addReply(c,shared.wrongtypeerr);
3356 return;
3357 }
3358 /* Remove the element from the source set */
3359 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3360 /* Key not found in the src set! return zero */
3361 addReply(c,shared.czero);
3362 return;
3363 }
3364 server.dirty++;
3365 /* Add the element to the destination set */
3366 if (!dstset) {
3367 dstset = createSetObject();
3368 dictAdd(c->db->dict,c->argv[2],dstset);
3369 incrRefCount(c->argv[2]);
3370 }
3371 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3372 incrRefCount(c->argv[3]);
3373 addReply(c,shared.cone);
3374 }
3375
3376 static void sismemberCommand(redisClient *c) {
3377 robj *set;
3378
3379 set = lookupKeyRead(c->db,c->argv[1]);
3380 if (set == NULL) {
3381 addReply(c,shared.czero);
3382 } else {
3383 if (set->type != REDIS_SET) {
3384 addReply(c,shared.wrongtypeerr);
3385 return;
3386 }
3387 if (dictFind(set->ptr,c->argv[2]))
3388 addReply(c,shared.cone);
3389 else
3390 addReply(c,shared.czero);
3391 }
3392 }
3393
3394 static void scardCommand(redisClient *c) {
3395 robj *o;
3396 dict *s;
3397
3398 o = lookupKeyRead(c->db,c->argv[1]);
3399 if (o == NULL) {
3400 addReply(c,shared.czero);
3401 return;
3402 } else {
3403 if (o->type != REDIS_SET) {
3404 addReply(c,shared.wrongtypeerr);
3405 } else {
3406 s = o->ptr;
3407 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3408 dictSize(s)));
3409 }
3410 }
3411 }
3412
3413 static void spopCommand(redisClient *c) {
3414 robj *set;
3415 dictEntry *de;
3416
3417 set = lookupKeyWrite(c->db,c->argv[1]);
3418 if (set == NULL) {
3419 addReply(c,shared.nullbulk);
3420 } else {
3421 if (set->type != REDIS_SET) {
3422 addReply(c,shared.wrongtypeerr);
3423 return;
3424 }
3425 de = dictGetRandomKey(set->ptr);
3426 if (de == NULL) {
3427 addReply(c,shared.nullbulk);
3428 } else {
3429 robj *ele = dictGetEntryKey(de);
3430
3431 addReplyBulkLen(c,ele);
3432 addReply(c,ele);
3433 addReply(c,shared.crlf);
3434 dictDelete(set->ptr,ele);
3435 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3436 server.dirty++;
3437 }
3438 }
3439 }
3440
3441 static void srandmemberCommand(redisClient *c) {
3442 robj *set;
3443 dictEntry *de;
3444
3445 set = lookupKeyRead(c->db,c->argv[1]);
3446 if (set == NULL) {
3447 addReply(c,shared.nullbulk);
3448 } else {
3449 if (set->type != REDIS_SET) {
3450 addReply(c,shared.wrongtypeerr);
3451 return;
3452 }
3453 de = dictGetRandomKey(set->ptr);
3454 if (de == NULL) {
3455 addReply(c,shared.nullbulk);
3456 } else {
3457 robj *ele = dictGetEntryKey(de);
3458
3459 addReplyBulkLen(c,ele);
3460 addReply(c,ele);
3461 addReply(c,shared.crlf);
3462 }
3463 }
3464 }
3465
3466 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3467 dict **d1 = (void*) s1, **d2 = (void*) s2;
3468
3469 return dictSize(*d1)-dictSize(*d2);
3470 }
3471
3472 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3473 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3474 dictIterator *di;
3475 dictEntry *de;
3476 robj *lenobj = NULL, *dstset = NULL;
3477 int j, cardinality = 0;
3478
3479 for (j = 0; j < setsnum; j++) {
3480 robj *setobj;
3481
3482 setobj = dstkey ?
3483 lookupKeyWrite(c->db,setskeys[j]) :
3484 lookupKeyRead(c->db,setskeys[j]);
3485 if (!setobj) {
3486 zfree(dv);
3487 if (dstkey) {
3488 deleteKey(c->db,dstkey);
3489 addReply(c,shared.ok);
3490 } else {
3491 addReply(c,shared.nullmultibulk);
3492 }
3493 return;
3494 }
3495 if (setobj->type != REDIS_SET) {
3496 zfree(dv);
3497 addReply(c,shared.wrongtypeerr);
3498 return;
3499 }
3500 dv[j] = setobj->ptr;
3501 }
3502 /* Sort sets from the smallest to largest, this will improve our
3503 * algorithm's performace */
3504 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3505
3506 /* The first thing we should output is the total number of elements...
3507 * since this is a multi-bulk write, but at this stage we don't know
3508 * the intersection set size, so we use a trick, append an empty object
3509 * to the output list and save the pointer to later modify it with the
3510 * right length */
3511 if (!dstkey) {
3512 lenobj = createObject(REDIS_STRING,NULL);
3513 addReply(c,lenobj);
3514 decrRefCount(lenobj);
3515 } else {
3516 /* If we have a target key where to store the resulting set
3517 * create this key with an empty set inside */
3518 dstset = createSetObject();
3519 }
3520
3521 /* Iterate all the elements of the first (smallest) set, and test
3522 * the element against all the other sets, if at least one set does
3523 * not include the element it is discarded */
3524 di = dictGetIterator(dv[0]);
3525
3526 while((de = dictNext(di)) != NULL) {
3527 robj *ele;
3528
3529 for (j = 1; j < setsnum; j++)
3530 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3531 if (j != setsnum)
3532 continue; /* at least one set does not contain the member */
3533 ele = dictGetEntryKey(de);
3534 if (!dstkey) {
3535 addReplyBulkLen(c,ele);
3536 addReply(c,ele);
3537 addReply(c,shared.crlf);
3538 cardinality++;
3539 } else {
3540 dictAdd(dstset->ptr,ele,NULL);
3541 incrRefCount(ele);
3542 }
3543 }
3544 dictReleaseIterator(di);
3545
3546 if (dstkey) {
3547 /* Store the resulting set into the target */
3548 deleteKey(c->db,dstkey);
3549 dictAdd(c->db->dict,dstkey,dstset);
3550 incrRefCount(dstkey);
3551 }
3552
3553 if (!dstkey) {
3554 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3555 } else {
3556 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3557 dictSize((dict*)dstset->ptr)));
3558 server.dirty++;
3559 }
3560 zfree(dv);
3561 }
3562
3563 static void sinterCommand(redisClient *c) {
3564 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3565 }
3566
3567 static void sinterstoreCommand(redisClient *c) {
3568 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3569 }
3570
3571 #define REDIS_OP_UNION 0
3572 #define REDIS_OP_DIFF 1
3573
3574 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3575 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3576 dictIterator *di;
3577 dictEntry *de;
3578 robj *dstset = NULL;
3579 int j, cardinality = 0;
3580
3581 for (j = 0; j < setsnum; j++) {
3582 robj *setobj;
3583
3584 setobj = dstkey ?
3585 lookupKeyWrite(c->db,setskeys[j]) :
3586 lookupKeyRead(c->db,setskeys[j]);
3587 if (!setobj) {
3588 dv[j] = NULL;
3589 continue;
3590 }
3591 if (setobj->type != REDIS_SET) {
3592 zfree(dv);
3593 addReply(c,shared.wrongtypeerr);
3594 return;
3595 }
3596 dv[j] = setobj->ptr;
3597 }
3598
3599 /* We need a temp set object to store our union. If the dstkey
3600 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3601 * this set object will be the resulting object to set into the target key*/
3602 dstset = createSetObject();
3603
3604 /* Iterate all the elements of all the sets, add every element a single
3605 * time to the result set */
3606 for (j = 0; j < setsnum; j++) {
3607 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3608 if (!dv[j]) continue; /* non existing keys are like empty sets */
3609
3610 di = dictGetIterator(dv[j]);
3611
3612 while((de = dictNext(di)) != NULL) {
3613 robj *ele;
3614
3615 /* dictAdd will not add the same element multiple times */
3616 ele = dictGetEntryKey(de);
3617 if (op == REDIS_OP_UNION || j == 0) {
3618 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3619 incrRefCount(ele);
3620 cardinality++;
3621 }
3622 } else if (op == REDIS_OP_DIFF) {
3623 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3624 cardinality--;
3625 }
3626 }
3627 }
3628 dictReleaseIterator(di);
3629
3630 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3631 }
3632
3633 /* Output the content of the resulting set, if not in STORE mode */
3634 if (!dstkey) {
3635 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3636 di = dictGetIterator(dstset->ptr);
3637 while((de = dictNext(di)) != NULL) {
3638 robj *ele;
3639
3640 ele = dictGetEntryKey(de);
3641 addReplyBulkLen(c,ele);
3642 addReply(c,ele);
3643 addReply(c,shared.crlf);
3644 }
3645 dictReleaseIterator(di);
3646 } else {
3647 /* If we have a target key where to store the resulting set
3648 * create this key with the result set inside */
3649 deleteKey(c->db,dstkey);
3650 dictAdd(c->db->dict,dstkey,dstset);
3651 incrRefCount(dstkey);
3652 }
3653
3654 /* Cleanup */
3655 if (!dstkey) {
3656 decrRefCount(dstset);
3657 } else {
3658 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3659 dictSize((dict*)dstset->ptr)));
3660 server.dirty++;
3661 }
3662 zfree(dv);
3663 }
3664
3665 static void sunionCommand(redisClient *c) {
3666 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3667 }
3668
3669 static void sunionstoreCommand(redisClient *c) {
3670 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3671 }
3672
3673 static void sdiffCommand(redisClient *c) {
3674 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3675 }
3676
3677 static void sdiffstoreCommand(redisClient *c) {
3678 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3679 }
3680
3681 /* ==================================== ZSets =============================== */
3682
3683 /* ZSETs are ordered sets using two data structures to hold the same elements
3684 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3685 * data structure.
3686 *
3687 * The elements are added to an hash table mapping Redis objects to scores.
3688 * At the same time the elements are added to a skip list mapping scores
3689 * to Redis objects (so objects are sorted by scores in this "view"). */
3690
3691 /* This skiplist implementation is almost a C translation of the original
3692 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3693 * Alternative to Balanced Trees", modified in three ways:
3694 * a) this implementation allows for repeated values.
3695 * b) the comparison is not just by key (our 'score') but by satellite data.
3696 * c) there is a back pointer, so it's a doubly linked list with the back
3697 * pointers being only at "level 1". This allows to traverse the list
3698 * from tail to head, useful for ZREVRANGE. */
3699
3700 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3701 zskiplistNode *zn = zmalloc(sizeof(*zn));
3702
3703 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3704 zn->score = score;
3705 zn->obj = obj;
3706 return zn;
3707 }
3708
3709 static zskiplist *zslCreate(void) {
3710 int j;
3711 zskiplist *zsl;
3712
3713 zsl = zmalloc(sizeof(*zsl));
3714 zsl->level = 1;
3715 zsl->length = 0;
3716 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3717 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3718 zsl->header->forward[j] = NULL;
3719 return zsl;
3720 }
3721
3722 static void zslFreeNode(zskiplistNode *node) {
3723 decrRefCount(node->obj);
3724 zfree(node);
3725 }
3726
3727 static void zslFree(zskiplist *zsl) {
3728 zskiplistNode *node = zsl->header->forward[1], *next;
3729
3730 while(node) {
3731 next = node->forward[1];
3732 zslFreeNode(node);
3733 node = next;
3734 }
3735 }
3736
3737 static int zslRandomLevel(void) {
3738 int level = 1;
3739 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3740 level += 1;
3741 return level;
3742 }
3743
3744 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3745 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3746 int i, level;
3747
3748 x = zsl->header;
3749 for (i = zsl->level-1; i >= 0; i--) {
3750 while (x->forward[i] && x->forward[i]->score < score)
3751 x = x->forward[i];
3752 update[i] = x;
3753 }
3754 x = x->forward[1];
3755 /* we assume the key is not already inside, since we allow duplicated
3756 * scores, and the re-insertion of score and redis object should never
3757 * happpen since the caller of zslInsert() should test in the hash table
3758 * if the element is already inside or not. */
3759 level = zslRandomLevel();
3760 if (level > zsl->level) {
3761 for (i = zsl->level; i < level; i++)
3762 update[i] = zsl->header;
3763 zsl->level = level;
3764 }
3765 x = zslCreateNode(level,score,obj);
3766 for (i = 0; i < level; i++) {
3767 x->forward[i] = update[i]->forward[i];
3768 update[i]->forward[i] = x;
3769 }
3770 zsl->length++;
3771 }
3772
3773 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3774 return 1;
3775 }
3776
3777 /* The actual Z-commands implementations */
3778
3779 static void zaddCommand(redisClient *c) {
3780 robj *zsetobj;
3781 zset *zs;
3782 double *score;
3783
3784 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3785 if (zsetobj == NULL) {
3786 zsetobj = createZsetObject();
3787 dictAdd(c->db->dict,c->argv[1],zsetobj);
3788 incrRefCount(c->argv[1]);
3789 } else {
3790 if (zsetobj->type != REDIS_ZSET) {
3791 addReply(c,shared.wrongtypeerr);
3792 return;
3793 }
3794 }
3795 score = zmalloc(sizeof(double));
3796 *score = strtod(c->argv[2]->ptr,NULL);
3797 zs = zsetobj->ptr;
3798 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3799 /* case 1: New element */
3800 incrRefCount(c->argv[3]); /* added to hash */
3801 zslInsert(zs->zsl,*score,c->argv[3]);
3802 incrRefCount(c->argv[3]); /* added to skiplist */
3803 server.dirty++;
3804 addReply(c,shared.cone);
3805 } else {
3806 dictEntry *de;
3807 double *oldscore;
3808
3809 /* case 2: Score update operation */
3810 de = dictFind(zs->dict,c->argv[3]);
3811 assert(de != NULL);
3812 oldscore = dictGetEntryVal(de);
3813 if (*score != *oldscore) {
3814 int deleted;
3815
3816 deleted = zslDelete(zs->zsl,*score,c->argv[3]);
3817 assert(deleted != 0);
3818 zslInsert(zs->zsl,*score,c->argv[3]);
3819 incrRefCount(c->argv[3]);
3820 server.dirty++;
3821 }
3822 addReply(c,shared.czero);
3823 }
3824 }
3825
3826 static void zrangeCommand(redisClient *c) {
3827 robj *o;
3828 int start = atoi(c->argv[2]->ptr);
3829 int end = atoi(c->argv[3]->ptr);
3830
3831 o = lookupKeyRead(c->db,c->argv[1]);
3832 if (o == NULL) {
3833 addReply(c,shared.nullmultibulk);
3834 } else {
3835 if (o->type != REDIS_ZSET) {
3836 addReply(c,shared.wrongtypeerr);
3837 } else {
3838 zset *zsetobj = o->ptr;
3839 zskiplist *zsl = zsetobj->zsl;
3840 zskiplistNode *ln;
3841
3842 int llen = zsl->length;
3843 int rangelen, j;
3844 robj *ele;
3845
3846 /* convert negative indexes */
3847 if (start < 0) start = llen+start;
3848 if (end < 0) end = llen+end;
3849 if (start < 0) start = 0;
3850 if (end < 0) end = 0;
3851
3852 /* indexes sanity checks */
3853 if (start > end || start >= llen) {
3854 /* Out of range start or start > end result in empty list */
3855 addReply(c,shared.emptymultibulk);
3856 return;
3857 }
3858 if (end >= llen) end = llen-1;
3859 rangelen = (end-start)+1;
3860
3861 /* Return the result in form of a multi-bulk reply */
3862 ln = zsl->header->forward[0];
3863 while (start--)
3864 ln = ln->forward[0];
3865
3866 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3867 for (j = 0; j < rangelen; j++) {
3868 ele = ln->obj;
3869 addReplyBulkLen(c,ele);
3870 addReply(c,ele);
3871 addReply(c,shared.crlf);
3872 ln = ln->forward[0];
3873 }
3874 }
3875 }
3876 }
3877
3878 /* ========================= Non type-specific commands ==================== */
3879
3880 static void flushdbCommand(redisClient *c) {
3881 server.dirty += dictSize(c->db->dict);
3882 dictEmpty(c->db->dict);
3883 dictEmpty(c->db->expires);
3884 addReply(c,shared.ok);
3885 }
3886
3887 static void flushallCommand(redisClient *c) {
3888 server.dirty += emptyDb();
3889 addReply(c,shared.ok);
3890 rdbSave(server.dbfilename);
3891 server.dirty++;
3892 }
3893
3894 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3895 redisSortOperation *so = zmalloc(sizeof(*so));
3896 so->type = type;
3897 so->pattern = pattern;
3898 return so;
3899 }
3900
3901 /* Return the value associated to the key with a name obtained
3902 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3903 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3904 char *p;
3905 sds spat, ssub;
3906 robj keyobj;
3907 int prefixlen, sublen, postfixlen;
3908 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3909 struct {
3910 long len;
3911 long free;
3912 char buf[REDIS_SORTKEY_MAX+1];
3913 } keyname;
3914
3915 if (subst->encoding == REDIS_ENCODING_RAW)
3916 incrRefCount(subst);
3917 else {
3918 subst = getDecodedObject(subst);
3919 }
3920
3921 spat = pattern->ptr;
3922 ssub = subst->ptr;
3923 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3924 p = strchr(spat,'*');
3925 if (!p) return NULL;
3926
3927 prefixlen = p-spat;
3928 sublen = sdslen(ssub);
3929 postfixlen = sdslen(spat)-(prefixlen+1);
3930 memcpy(keyname.buf,spat,prefixlen);
3931 memcpy(keyname.buf+prefixlen,ssub,sublen);
3932 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3933 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3934 keyname.len = prefixlen+sublen+postfixlen;
3935
3936 keyobj.refcount = 1;
3937 keyobj.type = REDIS_STRING;
3938 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3939
3940 decrRefCount(subst);
3941
3942 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3943 return lookupKeyRead(db,&keyobj);
3944 }
3945
3946 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3947 * the additional parameter is not standard but a BSD-specific we have to
3948 * pass sorting parameters via the global 'server' structure */
3949 static int sortCompare(const void *s1, const void *s2) {
3950 const redisSortObject *so1 = s1, *so2 = s2;
3951 int cmp;
3952
3953 if (!server.sort_alpha) {
3954 /* Numeric sorting. Here it's trivial as we precomputed scores */
3955 if (so1->u.score > so2->u.score) {
3956 cmp = 1;
3957 } else if (so1->u.score < so2->u.score) {
3958 cmp = -1;
3959 } else {
3960 cmp = 0;
3961 }
3962 } else {
3963 /* Alphanumeric sorting */
3964 if (server.sort_bypattern) {
3965 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3966 /* At least one compare object is NULL */
3967 if (so1->u.cmpobj == so2->u.cmpobj)
3968 cmp = 0;
3969 else if (so1->u.cmpobj == NULL)
3970 cmp = -1;
3971 else
3972 cmp = 1;
3973 } else {
3974 /* We have both the objects, use strcoll */
3975 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3976 }
3977 } else {
3978 /* Compare elements directly */
3979 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
3980 so2->obj->encoding == REDIS_ENCODING_RAW) {
3981 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3982 } else {
3983 robj *dec1, *dec2;
3984
3985 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
3986 so1->obj : getDecodedObject(so1->obj);
3987 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
3988 so2->obj : getDecodedObject(so2->obj);
3989 cmp = strcoll(dec1->ptr,dec2->ptr);
3990 if (dec1 != so1->obj) decrRefCount(dec1);
3991 if (dec2 != so2->obj) decrRefCount(dec2);
3992 }
3993 }
3994 }
3995 return server.sort_desc ? -cmp : cmp;
3996 }
3997
3998 /* The SORT command is the most complex command in Redis. Warning: this code
3999 * is optimized for speed and a bit less for readability */
4000 static void sortCommand(redisClient *c) {
4001 list *operations;
4002 int outputlen = 0;
4003 int desc = 0, alpha = 0;
4004 int limit_start = 0, limit_count = -1, start, end;
4005 int j, dontsort = 0, vectorlen;
4006 int getop = 0; /* GET operation counter */
4007 robj *sortval, *sortby = NULL;
4008 redisSortObject *vector; /* Resulting vector to sort */
4009
4010 /* Lookup the key to sort. It must be of the right types */
4011 sortval = lookupKeyRead(c->db,c->argv[1]);
4012 if (sortval == NULL) {
4013 addReply(c,shared.nokeyerr);
4014 return;
4015 }
4016 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4017 addReply(c,shared.wrongtypeerr);
4018 return;
4019 }
4020
4021 /* Create a list of operations to perform for every sorted element.
4022 * Operations can be GET/DEL/INCR/DECR */
4023 operations = listCreate();
4024 listSetFreeMethod(operations,zfree);
4025 j = 2;
4026
4027 /* Now we need to protect sortval incrementing its count, in the future
4028 * SORT may have options able to overwrite/delete keys during the sorting
4029 * and the sorted key itself may get destroied */
4030 incrRefCount(sortval);
4031
4032 /* The SORT command has an SQL-alike syntax, parse it */
4033 while(j < c->argc) {
4034 int leftargs = c->argc-j-1;
4035 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4036 desc = 0;
4037 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4038 desc = 1;
4039 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4040 alpha = 1;
4041 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4042 limit_start = atoi(c->argv[j+1]->ptr);
4043 limit_count = atoi(c->argv[j+2]->ptr);
4044 j+=2;
4045 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4046 sortby = c->argv[j+1];
4047 /* If the BY pattern does not contain '*', i.e. it is constant,
4048 * we don't need to sort nor to lookup the weight keys. */
4049 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4050 j++;
4051 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4052 listAddNodeTail(operations,createSortOperation(
4053 REDIS_SORT_GET,c->argv[j+1]));
4054 getop++;
4055 j++;
4056 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4057 listAddNodeTail(operations,createSortOperation(
4058 REDIS_SORT_DEL,c->argv[j+1]));
4059 j++;
4060 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4061 listAddNodeTail(operations,createSortOperation(
4062 REDIS_SORT_INCR,c->argv[j+1]));
4063 j++;
4064 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4065 listAddNodeTail(operations,createSortOperation(
4066 REDIS_SORT_DECR,c->argv[j+1]));
4067 j++;
4068 } else {
4069 decrRefCount(sortval);
4070 listRelease(operations);
4071 addReply(c,shared.syntaxerr);
4072 return;
4073 }
4074 j++;
4075 }
4076
4077 /* Load the sorting vector with all the objects to sort */
4078 vectorlen = (sortval->type == REDIS_LIST) ?
4079 listLength((list*)sortval->ptr) :
4080 dictSize((dict*)sortval->ptr);
4081 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4082 j = 0;
4083 if (sortval->type == REDIS_LIST) {
4084 list *list = sortval->ptr;
4085 listNode *ln;
4086
4087 listRewind(list);
4088 while((ln = listYield(list))) {
4089 robj *ele = ln->value;
4090 vector[j].obj = ele;
4091 vector[j].u.score = 0;
4092 vector[j].u.cmpobj = NULL;
4093 j++;
4094 }
4095 } else {
4096 dict *set = sortval->ptr;
4097 dictIterator *di;
4098 dictEntry *setele;
4099
4100 di = dictGetIterator(set);
4101 while((setele = dictNext(di)) != NULL) {
4102 vector[j].obj = dictGetEntryKey(setele);
4103 vector[j].u.score = 0;
4104 vector[j].u.cmpobj = NULL;
4105 j++;
4106 }
4107 dictReleaseIterator(di);
4108 }
4109 assert(j == vectorlen);
4110
4111 /* Now it's time to load the right scores in the sorting vector */
4112 if (dontsort == 0) {
4113 for (j = 0; j < vectorlen; j++) {
4114 if (sortby) {
4115 robj *byval;
4116
4117 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4118 if (!byval || byval->type != REDIS_STRING) continue;
4119 if (alpha) {
4120 if (byval->encoding == REDIS_ENCODING_RAW) {
4121 vector[j].u.cmpobj = byval;
4122 incrRefCount(byval);
4123 } else {
4124 vector[j].u.cmpobj = getDecodedObject(byval);
4125 }
4126 } else {
4127 if (byval->encoding == REDIS_ENCODING_RAW) {
4128 vector[j].u.score = strtod(byval->ptr,NULL);
4129 } else {
4130 if (byval->encoding == REDIS_ENCODING_INT) {
4131 vector[j].u.score = (long)byval->ptr;
4132 } else
4133 assert(1 != 1);
4134 }
4135 }
4136 } else {
4137 if (!alpha) {
4138 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4139 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4140 else {
4141 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4142 vector[j].u.score = (long) vector[j].obj->ptr;
4143 else
4144 assert(1 != 1);
4145 }
4146 }
4147 }
4148 }
4149 }
4150
4151 /* We are ready to sort the vector... perform a bit of sanity check
4152 * on the LIMIT option too. We'll use a partial version of quicksort. */
4153 start = (limit_start < 0) ? 0 : limit_start;
4154 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4155 if (start >= vectorlen) {
4156 start = vectorlen-1;
4157 end = vectorlen-2;
4158 }
4159 if (end >= vectorlen) end = vectorlen-1;
4160
4161 if (dontsort == 0) {
4162 server.sort_desc = desc;
4163 server.sort_alpha = alpha;
4164 server.sort_bypattern = sortby ? 1 : 0;
4165 if (sortby && (start != 0 || end != vectorlen-1))
4166 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4167 else
4168 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4169 }
4170
4171 /* Send command output to the output buffer, performing the specified
4172 * GET/DEL/INCR/DECR operations if any. */
4173 outputlen = getop ? getop*(end-start+1) : end-start+1;
4174 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4175 for (j = start; j <= end; j++) {
4176 listNode *ln;
4177 if (!getop) {
4178 addReplyBulkLen(c,vector[j].obj);
4179 addReply(c,vector[j].obj);
4180 addReply(c,shared.crlf);
4181 }
4182 listRewind(operations);
4183 while((ln = listYield(operations))) {
4184 redisSortOperation *sop = ln->value;
4185 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4186 vector[j].obj);
4187
4188 if (sop->type == REDIS_SORT_GET) {
4189 if (!val || val->type != REDIS_STRING) {
4190 addReply(c,shared.nullbulk);
4191 } else {
4192 addReplyBulkLen(c,val);
4193 addReply(c,val);
4194 addReply(c,shared.crlf);
4195 }
4196 } else if (sop->type == REDIS_SORT_DEL) {
4197 /* TODO */
4198 }
4199 }
4200 }
4201
4202 /* Cleanup */
4203 decrRefCount(sortval);
4204 listRelease(operations);
4205 for (j = 0; j < vectorlen; j++) {
4206 if (sortby && alpha && vector[j].u.cmpobj)
4207 decrRefCount(vector[j].u.cmpobj);
4208 }
4209 zfree(vector);
4210 }
4211
4212 static void infoCommand(redisClient *c) {
4213 sds info;
4214 time_t uptime = time(NULL)-server.stat_starttime;
4215 int j;
4216
4217 info = sdscatprintf(sdsempty(),
4218 "redis_version:%s\r\n"
4219 "arch_bits:%s\r\n"
4220 "uptime_in_seconds:%d\r\n"
4221 "uptime_in_days:%d\r\n"
4222 "connected_clients:%d\r\n"
4223 "connected_slaves:%d\r\n"
4224 "used_memory:%zu\r\n"
4225 "changes_since_last_save:%lld\r\n"
4226 "bgsave_in_progress:%d\r\n"
4227 "last_save_time:%d\r\n"
4228 "total_connections_received:%lld\r\n"
4229 "total_commands_processed:%lld\r\n"
4230 "role:%s\r\n"
4231 ,REDIS_VERSION,
4232 (sizeof(long) == 8) ? "64" : "32",
4233 uptime,
4234 uptime/(3600*24),
4235 listLength(server.clients)-listLength(server.slaves),
4236 listLength(server.slaves),
4237 server.usedmemory,
4238 server.dirty,
4239 server.bgsaveinprogress,
4240 server.lastsave,
4241 server.stat_numconnections,
4242 server.stat_numcommands,
4243 server.masterhost == NULL ? "master" : "slave"
4244 );
4245 if (server.masterhost) {
4246 info = sdscatprintf(info,
4247 "master_host:%s\r\n"
4248 "master_port:%d\r\n"
4249 "master_link_status:%s\r\n"
4250 "master_last_io_seconds_ago:%d\r\n"
4251 ,server.masterhost,
4252 server.masterport,
4253 (server.replstate == REDIS_REPL_CONNECTED) ?
4254 "up" : "down",
4255 (int)(time(NULL)-server.master->lastinteraction)
4256 );
4257 }
4258 for (j = 0; j < server.dbnum; j++) {
4259 long long keys, vkeys;
4260
4261 keys = dictSize(server.db[j].dict);
4262 vkeys = dictSize(server.db[j].expires);
4263 if (keys || vkeys) {
4264 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4265 j, keys, vkeys);
4266 }
4267 }
4268 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4269 addReplySds(c,info);
4270 addReply(c,shared.crlf);
4271 }
4272
4273 static void monitorCommand(redisClient *c) {
4274 /* ignore MONITOR if aleady slave or in monitor mode */
4275 if (c->flags & REDIS_SLAVE) return;
4276
4277 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4278 c->slaveseldb = 0;
4279 listAddNodeTail(server.monitors,c);
4280 addReply(c,shared.ok);
4281 }
4282
4283 /* ================================= Expire ================================= */
4284 static int removeExpire(redisDb *db, robj *key) {
4285 if (dictDelete(db->expires,key) == DICT_OK) {
4286 return 1;
4287 } else {
4288 return 0;
4289 }
4290 }
4291
4292 static int setExpire(redisDb *db, robj *key, time_t when) {
4293 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4294 return 0;
4295 } else {
4296 incrRefCount(key);
4297 return 1;
4298 }
4299 }
4300
4301 /* Return the expire time of the specified key, or -1 if no expire
4302 * is associated with this key (i.e. the key is non volatile) */
4303 static time_t getExpire(redisDb *db, robj *key) {
4304 dictEntry *de;
4305
4306 /* No expire? return ASAP */
4307 if (dictSize(db->expires) == 0 ||
4308 (de = dictFind(db->expires,key)) == NULL) return -1;
4309
4310 return (time_t) dictGetEntryVal(de);
4311 }
4312
4313 static int expireIfNeeded(redisDb *db, robj *key) {
4314 time_t when;
4315 dictEntry *de;
4316
4317 /* No expire? return ASAP */
4318 if (dictSize(db->expires) == 0 ||
4319 (de = dictFind(db->expires,key)) == NULL) return 0;
4320
4321 /* Lookup the expire */
4322 when = (time_t) dictGetEntryVal(de);
4323 if (time(NULL) <= when) return 0;
4324
4325 /* Delete the key */
4326 dictDelete(db->expires,key);
4327 return dictDelete(db->dict,key) == DICT_OK;
4328 }
4329
4330 static int deleteIfVolatile(redisDb *db, robj *key) {
4331 dictEntry *de;
4332
4333 /* No expire? return ASAP */
4334 if (dictSize(db->expires) == 0 ||
4335 (de = dictFind(db->expires,key)) == NULL) return 0;
4336
4337 /* Delete the key */
4338 server.dirty++;
4339 dictDelete(db->expires,key);
4340 return dictDelete(db->dict,key) == DICT_OK;
4341 }
4342
4343 static void expireCommand(redisClient *c) {
4344 dictEntry *de;
4345 int seconds = atoi(c->argv[2]->ptr);
4346
4347 de = dictFind(c->db->dict,c->argv[1]);
4348 if (de == NULL) {
4349 addReply(c,shared.czero);
4350 return;
4351 }
4352 if (seconds <= 0) {
4353 addReply(c, shared.czero);
4354 return;
4355 } else {
4356 time_t when = time(NULL)+seconds;
4357 if (setExpire(c->db,c->argv[1],when)) {
4358 addReply(c,shared.cone);
4359 server.dirty++;
4360 } else {
4361 addReply(c,shared.czero);
4362 }
4363 return;
4364 }
4365 }
4366
4367 static void ttlCommand(redisClient *c) {
4368 time_t expire;
4369 int ttl = -1;
4370
4371 expire = getExpire(c->db,c->argv[1]);
4372 if (expire != -1) {
4373 ttl = (int) (expire-time(NULL));
4374 if (ttl < 0) ttl = -1;
4375 }
4376 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4377 }
4378
4379 static void msetGenericCommand(redisClient *c, int nx) {
4380 int j;
4381
4382 if ((c->argc % 2) == 0) {
4383 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4384 return;
4385 }
4386 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4387 * set nothing at all if at least one already key exists. */
4388 if (nx) {
4389 for (j = 1; j < c->argc; j += 2) {
4390 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4391 addReply(c, shared.czero);
4392 return;
4393 }
4394 }
4395 }
4396
4397 for (j = 1; j < c->argc; j += 2) {
4398 int retval;
4399
4400 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4401 if (retval == DICT_ERR) {
4402 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4403 incrRefCount(c->argv[j+1]);
4404 } else {
4405 incrRefCount(c->argv[j]);
4406 incrRefCount(c->argv[j+1]);
4407 }
4408 removeExpire(c->db,c->argv[j]);
4409 }
4410 server.dirty += (c->argc-1)/2;
4411 addReply(c, nx ? shared.cone : shared.ok);
4412 }
4413
4414 static void msetCommand(redisClient *c) {
4415 msetGenericCommand(c,0);
4416 }
4417
4418 static void msetnxCommand(redisClient *c) {
4419 msetGenericCommand(c,1);
4420 }
4421
4422 /* =============================== Replication ============================= */
4423
4424 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4425 ssize_t nwritten, ret = size;
4426 time_t start = time(NULL);
4427
4428 timeout++;
4429 while(size) {
4430 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4431 nwritten = write(fd,ptr,size);
4432 if (nwritten == -1) return -1;
4433 ptr += nwritten;
4434 size -= nwritten;
4435 }
4436 if ((time(NULL)-start) > timeout) {
4437 errno = ETIMEDOUT;
4438 return -1;
4439 }
4440 }
4441 return ret;
4442 }
4443
4444 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4445 ssize_t nread, totread = 0;
4446 time_t start = time(NULL);
4447
4448 timeout++;
4449 while(size) {
4450 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4451 nread = read(fd,ptr,size);
4452 if (nread == -1) return -1;
4453 ptr += nread;
4454 size -= nread;
4455 totread += nread;
4456 }
4457 if ((time(NULL)-start) > timeout) {
4458 errno = ETIMEDOUT;
4459 return -1;
4460 }
4461 }
4462 return totread;
4463 }
4464
4465 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
4466 ssize_t nread = 0;
4467
4468 size--;
4469 while(size) {
4470 char c;
4471
4472 if (syncRead(fd,&c,1,timeout) == -1) return -1;
4473 if (c == '\n') {
4474 *ptr = '\0';
4475 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
4476 return nread;
4477 } else {
4478 *ptr++ = c;
4479 *ptr = '\0';
4480 nread++;
4481 }
4482 }
4483 return nread;
4484 }
4485
4486 static void syncCommand(redisClient *c) {
4487 /* ignore SYNC if aleady slave or in monitor mode */
4488 if (c->flags & REDIS_SLAVE) return;
4489
4490 /* SYNC can't be issued when the server has pending data to send to
4491 * the client about already issued commands. We need a fresh reply
4492 * buffer registering the differences between the BGSAVE and the current
4493 * dataset, so that we can copy to other slaves if needed. */
4494 if (listLength(c->reply) != 0) {
4495 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4496 return;
4497 }
4498
4499 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4500 /* Here we need to check if there is a background saving operation
4501 * in progress, or if it is required to start one */
4502 if (server.bgsaveinprogress) {
4503 /* Ok a background save is in progress. Let's check if it is a good
4504 * one for replication, i.e. if there is another slave that is
4505 * registering differences since the server forked to save */
4506 redisClient *slave;
4507 listNode *ln;
4508
4509 listRewind(server.slaves);
4510 while((ln = listYield(server.slaves))) {
4511 slave = ln->value;
4512 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4513 }
4514 if (ln) {
4515 /* Perfect, the server is already registering differences for
4516 * another slave. Set the right state, and copy the buffer. */
4517 listRelease(c->reply);
4518 c->reply = listDup(slave->reply);
4519 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4520 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4521 } else {
4522 /* No way, we need to wait for the next BGSAVE in order to
4523 * register differences */
4524 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4525 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4526 }
4527 } else {
4528 /* Ok we don't have a BGSAVE in progress, let's start one */
4529 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4530 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4531 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4532 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4533 return;
4534 }
4535 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4536 }
4537 c->repldbfd = -1;
4538 c->flags |= REDIS_SLAVE;
4539 c->slaveseldb = 0;
4540 listAddNodeTail(server.slaves,c);
4541 return;
4542 }
4543
4544 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4545 redisClient *slave = privdata;
4546 REDIS_NOTUSED(el);
4547 REDIS_NOTUSED(mask);
4548 char buf[REDIS_IOBUF_LEN];
4549 ssize_t nwritten, buflen;
4550
4551 if (slave->repldboff == 0) {
4552 /* Write the bulk write count before to transfer the DB. In theory here
4553 * we don't know how much room there is in the output buffer of the
4554 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4555 * operations) will never be smaller than the few bytes we need. */
4556 sds bulkcount;
4557
4558 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4559 slave->repldbsize);
4560 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4561 {
4562 sdsfree(bulkcount);
4563 freeClient(slave);
4564 return;
4565 }
4566 sdsfree(bulkcount);
4567 }
4568 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4569 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4570 if (buflen <= 0) {
4571 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4572 (buflen == 0) ? "premature EOF" : strerror(errno));
4573 freeClient(slave);
4574 return;
4575 }
4576 if ((nwritten = write(fd,buf,buflen)) == -1) {
4577 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4578 strerror(errno));
4579 freeClient(slave);
4580 return;
4581 }
4582 slave->repldboff += nwritten;
4583 if (slave->repldboff == slave->repldbsize) {
4584 close(slave->repldbfd);
4585 slave->repldbfd = -1;
4586 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4587 slave->replstate = REDIS_REPL_ONLINE;
4588 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4589 sendReplyToClient, slave, NULL) == AE_ERR) {
4590 freeClient(slave);
4591 return;
4592 }
4593 addReplySds(slave,sdsempty());
4594 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4595 }
4596 }
4597
4598 /* This function is called at the end of every backgrond saving.
4599 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4600 * otherwise REDIS_ERR is passed to the function.
4601 *
4602 * The goal of this function is to handle slaves waiting for a successful
4603 * background saving in order to perform non-blocking synchronization. */
4604 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4605 listNode *ln;
4606 int startbgsave = 0;
4607
4608 listRewind(server.slaves);
4609 while((ln = listYield(server.slaves))) {
4610 redisClient *slave = ln->value;
4611
4612 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4613 startbgsave = 1;
4614 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4615 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4616 struct redis_stat buf;
4617
4618 if (bgsaveerr != REDIS_OK) {
4619 freeClient(slave);
4620 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4621 continue;
4622 }
4623 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4624 redis_fstat(slave->repldbfd,&buf) == -1) {
4625 freeClient(slave);
4626 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4627 continue;
4628 }
4629 slave->repldboff = 0;
4630 slave->repldbsize = buf.st_size;
4631 slave->replstate = REDIS_REPL_SEND_BULK;
4632 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4633 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4634 freeClient(slave);
4635 continue;
4636 }
4637 }
4638 }
4639 if (startbgsave) {
4640 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4641 listRewind(server.slaves);
4642 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4643 while((ln = listYield(server.slaves))) {
4644 redisClient *slave = ln->value;
4645
4646 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4647 freeClient(slave);
4648 }
4649 }
4650 }
4651 }
4652
4653 static int syncWithMaster(void) {
4654 char buf[1024], tmpfile[256];
4655 int dumpsize;
4656 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4657 int dfd;
4658
4659 if (fd == -1) {
4660 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4661 strerror(errno));
4662 return REDIS_ERR;
4663 }
4664 /* Issue the SYNC command */
4665 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4666 close(fd);
4667 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4668 strerror(errno));
4669 return REDIS_ERR;
4670 }
4671 /* Read the bulk write count */
4672 if (syncReadLine(fd,buf,1024,3600) == -1) {
4673 close(fd);
4674 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4675 strerror(errno));
4676 return REDIS_ERR;
4677 }
4678 dumpsize = atoi(buf+1);
4679 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4680 /* Read the bulk write data on a temp file */
4681 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4682 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4683 if (dfd == -1) {
4684 close(fd);
4685 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4686 return REDIS_ERR;
4687 }
4688 while(dumpsize) {
4689 int nread, nwritten;
4690
4691 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4692 if (nread == -1) {
4693 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4694 strerror(errno));
4695 close(fd);
4696 close(dfd);
4697 return REDIS_ERR;
4698 }
4699 nwritten = write(dfd,buf,nread);
4700 if (nwritten == -1) {
4701 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4702 close(fd);
4703 close(dfd);
4704 return REDIS_ERR;
4705 }
4706 dumpsize -= nread;
4707 }
4708 close(dfd);
4709 if (rename(tmpfile,server.dbfilename) == -1) {
4710 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4711 unlink(tmpfile);
4712 close(fd);
4713 return REDIS_ERR;
4714 }
4715 emptyDb();
4716 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4717 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4718 close(fd);
4719 return REDIS_ERR;
4720 }
4721 server.master = createClient(fd);
4722 server.master->flags |= REDIS_MASTER;
4723 server.replstate = REDIS_REPL_CONNECTED;
4724 return REDIS_OK;
4725 }
4726
4727 static void slaveofCommand(redisClient *c) {
4728 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4729 !strcasecmp(c->argv[2]->ptr,"one")) {
4730 if (server.masterhost) {
4731 sdsfree(server.masterhost);
4732 server.masterhost = NULL;
4733 if (server.master) freeClient(server.master);
4734 server.replstate = REDIS_REPL_NONE;
4735 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4736 }
4737 } else {
4738 sdsfree(server.masterhost);
4739 server.masterhost = sdsdup(c->argv[1]->ptr);
4740 server.masterport = atoi(c->argv[2]->ptr);
4741 if (server.master) freeClient(server.master);
4742 server.replstate = REDIS_REPL_CONNECT;
4743 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4744 server.masterhost, server.masterport);
4745 }
4746 addReply(c,shared.ok);
4747 }
4748
4749 /* ============================ Maxmemory directive ======================== */
4750
4751 /* This function gets called when 'maxmemory' is set on the config file to limit
4752 * the max memory used by the server, and we are out of memory.
4753 * This function will try to, in order:
4754 *
4755 * - Free objects from the free list
4756 * - Try to remove keys with an EXPIRE set
4757 *
4758 * It is not possible to free enough memory to reach used-memory < maxmemory
4759 * the server will start refusing commands that will enlarge even more the
4760 * memory usage.
4761 */
4762 static void freeMemoryIfNeeded(void) {
4763 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4764 if (listLength(server.objfreelist)) {
4765 robj *o;
4766
4767 listNode *head = listFirst(server.objfreelist);
4768 o = listNodeValue(head);
4769 listDelNode(server.objfreelist,head);
4770 zfree(o);
4771 } else {
4772 int j, k, freed = 0;
4773
4774 for (j = 0; j < server.dbnum; j++) {
4775 int minttl = -1;
4776 robj *minkey = NULL;
4777 struct dictEntry *de;
4778
4779 if (dictSize(server.db[j].expires)) {
4780 freed = 1;
4781 /* From a sample of three keys drop the one nearest to
4782 * the natural expire */
4783 for (k = 0; k < 3; k++) {
4784 time_t t;
4785
4786 de = dictGetRandomKey(server.db[j].expires);
4787 t = (time_t) dictGetEntryVal(de);
4788 if (minttl == -1 || t < minttl) {
4789 minkey = dictGetEntryKey(de);
4790 minttl = t;
4791 }
4792 }
4793 deleteKey(server.db+j,minkey);
4794 }
4795 }
4796 if (!freed) return; /* nothing to free... */
4797 }
4798 }
4799 }
4800
4801 /* ================================= Debugging ============================== */
4802
4803 static void debugCommand(redisClient *c) {
4804 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4805 *((char*)-1) = 'x';
4806 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4807 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4808 robj *key, *val;
4809
4810 if (!de) {
4811 addReply(c,shared.nokeyerr);
4812 return;
4813 }
4814 key = dictGetEntryKey(de);
4815 val = dictGetEntryVal(de);
4816 addReplySds(c,sdscatprintf(sdsempty(),
4817 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4818 key, key->refcount, val, val->refcount, val->encoding));
4819 } else {
4820 addReplySds(c,sdsnew(
4821 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4822 }
4823 }
4824
4825 #ifdef HAVE_BACKTRACE
4826 static struct redisFunctionSym symsTable[] = {
4827 {"compareStringObjects", (unsigned long)compareStringObjects},
4828 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4829 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4830 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4831 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4832 {"freeStringObject", (unsigned long)freeStringObject},
4833 {"freeListObject", (unsigned long)freeListObject},
4834 {"freeSetObject", (unsigned long)freeSetObject},
4835 {"decrRefCount", (unsigned long)decrRefCount},
4836 {"createObject", (unsigned long)createObject},
4837 {"freeClient", (unsigned long)freeClient},
4838 {"rdbLoad", (unsigned long)rdbLoad},
4839 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4840 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4841 {"addReply", (unsigned long)addReply},
4842 {"addReplySds", (unsigned long)addReplySds},
4843 {"incrRefCount", (unsigned long)incrRefCount},
4844 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4845 {"createStringObject", (unsigned long)createStringObject},
4846 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4847 {"syncWithMaster", (unsigned long)syncWithMaster},
4848 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4849 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4850 {"getDecodedObject", (unsigned long)getDecodedObject},
4851 {"removeExpire", (unsigned long)removeExpire},
4852 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4853 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4854 {"deleteKey", (unsigned long)deleteKey},
4855 {"getExpire", (unsigned long)getExpire},
4856 {"setExpire", (unsigned long)setExpire},
4857 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4858 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4859 {"authCommand", (unsigned long)authCommand},
4860 {"pingCommand", (unsigned long)pingCommand},
4861 {"echoCommand", (unsigned long)echoCommand},
4862 {"setCommand", (unsigned long)setCommand},
4863 {"setnxCommand", (unsigned long)setnxCommand},
4864 {"getCommand", (unsigned long)getCommand},
4865 {"delCommand", (unsigned long)delCommand},
4866 {"existsCommand", (unsigned long)existsCommand},
4867 {"incrCommand", (unsigned long)incrCommand},
4868 {"decrCommand", (unsigned long)decrCommand},
4869 {"incrbyCommand", (unsigned long)incrbyCommand},
4870 {"decrbyCommand", (unsigned long)decrbyCommand},
4871 {"selectCommand", (unsigned long)selectCommand},
4872 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4873 {"keysCommand", (unsigned long)keysCommand},
4874 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4875 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4876 {"saveCommand", (unsigned long)saveCommand},
4877 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4878 {"shutdownCommand", (unsigned long)shutdownCommand},
4879 {"moveCommand", (unsigned long)moveCommand},
4880 {"renameCommand", (unsigned long)renameCommand},
4881 {"renamenxCommand", (unsigned long)renamenxCommand},
4882 {"lpushCommand", (unsigned long)lpushCommand},
4883 {"rpushCommand", (unsigned long)rpushCommand},
4884 {"lpopCommand", (unsigned long)lpopCommand},
4885 {"rpopCommand", (unsigned long)rpopCommand},
4886 {"llenCommand", (unsigned long)llenCommand},
4887 {"lindexCommand", (unsigned long)lindexCommand},
4888 {"lrangeCommand", (unsigned long)lrangeCommand},
4889 {"ltrimCommand", (unsigned long)ltrimCommand},
4890 {"typeCommand", (unsigned long)typeCommand},
4891 {"lsetCommand", (unsigned long)lsetCommand},
4892 {"saddCommand", (unsigned long)saddCommand},
4893 {"sremCommand", (unsigned long)sremCommand},
4894 {"smoveCommand", (unsigned long)smoveCommand},
4895 {"sismemberCommand", (unsigned long)sismemberCommand},
4896 {"scardCommand", (unsigned long)scardCommand},
4897 {"spopCommand", (unsigned long)spopCommand},
4898 {"srandmemberCommand", (unsigned long)srandmemberCommand},
4899 {"sinterCommand", (unsigned long)sinterCommand},
4900 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4901 {"sunionCommand", (unsigned long)sunionCommand},
4902 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4903 {"sdiffCommand", (unsigned long)sdiffCommand},
4904 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4905 {"syncCommand", (unsigned long)syncCommand},
4906 {"flushdbCommand", (unsigned long)flushdbCommand},
4907 {"flushallCommand", (unsigned long)flushallCommand},
4908 {"sortCommand", (unsigned long)sortCommand},
4909 {"lremCommand", (unsigned long)lremCommand},
4910 {"infoCommand", (unsigned long)infoCommand},
4911 {"mgetCommand", (unsigned long)mgetCommand},
4912 {"monitorCommand", (unsigned long)monitorCommand},
4913 {"expireCommand", (unsigned long)expireCommand},
4914 {"getsetCommand", (unsigned long)getsetCommand},
4915 {"ttlCommand", (unsigned long)ttlCommand},
4916 {"slaveofCommand", (unsigned long)slaveofCommand},
4917 {"debugCommand", (unsigned long)debugCommand},
4918 {"processCommand", (unsigned long)processCommand},
4919 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4920 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4921 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4922 {"msetGenericCommand", (unsigned long)msetGenericCommand},
4923 {"msetCommand", (unsigned long)msetCommand},
4924 {"msetnxCommand", (unsigned long)msetnxCommand},
4925 {"zslCreateNode", (unsigned long)zslCreateNode},
4926 {"zslCreate", (unsigned long)zslCreate},
4927 {"zslFreeNode",(unsigned long)zslFreeNode},
4928 {"zslFree",(unsigned long)zslFree},
4929 {"zslRandomLevel",(unsigned long)zslRandomLevel},
4930 {"zslInsert",(unsigned long)zslInsert},
4931 {"zslDelete",(unsigned long)zslDelete},
4932 {"createZsetObject",(unsigned long)createZsetObject},
4933 {"zaddCommand",(unsigned long)zaddCommand},
4934 {"zrangeCommand",(unsigned long)zrangeCommand},
4935 {NULL,0}
4936 };
4937
4938 /* This function try to convert a pointer into a function name. It's used in
4939 * oreder to provide a backtrace under segmentation fault that's able to
4940 * display functions declared as static (otherwise the backtrace is useless). */
4941 static char *findFuncName(void *pointer, unsigned long *offset){
4942 int i, ret = -1;
4943 unsigned long off, minoff = 0;
4944
4945 /* Try to match against the Symbol with the smallest offset */
4946 for (i=0; symsTable[i].pointer; i++) {
4947 unsigned long lp = (unsigned long) pointer;
4948
4949 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4950 off=lp-symsTable[i].pointer;
4951 if (ret < 0 || off < minoff) {
4952 minoff=off;
4953 ret=i;
4954 }
4955 }
4956 }
4957 if (ret == -1) return NULL;
4958 *offset = minoff;
4959 return symsTable[ret].name;
4960 }
4961
4962 static void *getMcontextEip(ucontext_t *uc) {
4963 #if defined(__FreeBSD__)
4964 return (void*) uc->uc_mcontext.mc_eip;
4965 #elif defined(__dietlibc__)
4966 return (void*) uc->uc_mcontext.eip;
4967 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4968 return (void*) uc->uc_mcontext->__ss.__eip;
4969 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4970 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
4971 return (void*) uc->uc_mcontext->__ss.__rip;
4972 #else
4973 return (void*) uc->uc_mcontext->__ss.__eip;
4974 #endif
4975 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4976 return (void*) uc->uc_mcontext.gregs[REG_EIP];
4977 #elif defined(__ia64__) /* Linux IA64 */
4978 return (void*) uc->uc_mcontext.sc_ip;
4979 #else
4980 return NULL;
4981 #endif
4982 }
4983
4984 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4985 void *trace[100];
4986 char **messages = NULL;
4987 int i, trace_size = 0;
4988 unsigned long offset=0;
4989 time_t uptime = time(NULL)-server.stat_starttime;
4990 ucontext_t *uc = (ucontext_t*) secret;
4991 REDIS_NOTUSED(info);
4992
4993 redisLog(REDIS_WARNING,
4994 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4995 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4996 "redis_version:%s; "
4997 "uptime_in_seconds:%d; "
4998 "connected_clients:%d; "
4999 "connected_slaves:%d; "
5000 "used_memory:%zu; "
5001 "changes_since_last_save:%lld; "
5002 "bgsave_in_progress:%d; "
5003 "last_save_time:%d; "
5004 "total_connections_received:%lld; "
5005 "total_commands_processed:%lld; "
5006 "role:%s;"
5007 ,REDIS_VERSION,
5008 uptime,
5009 listLength(server.clients)-listLength(server.slaves),
5010 listLength(server.slaves),
5011 server.usedmemory,
5012 server.dirty,
5013 server.bgsaveinprogress,
5014 server.lastsave,
5015 server.stat_numconnections,
5016 server.stat_numcommands,
5017 server.masterhost == NULL ? "master" : "slave"
5018 ));
5019
5020 trace_size = backtrace(trace, 100);
5021 /* overwrite sigaction with caller's address */
5022 if (getMcontextEip(uc) != NULL) {
5023 trace[1] = getMcontextEip(uc);
5024 }
5025 messages = backtrace_symbols(trace, trace_size);
5026
5027 for (i=1; i<trace_size; ++i) {
5028 char *fn = findFuncName(trace[i], &offset), *p;
5029
5030 p = strchr(messages[i],'+');
5031 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5032 redisLog(REDIS_WARNING,"%s", messages[i]);
5033 } else {
5034 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5035 }
5036 }
5037 free(messages);
5038 exit(0);
5039 }
5040
5041 static void setupSigSegvAction(void) {
5042 struct sigaction act;
5043
5044 sigemptyset (&act.sa_mask);
5045 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5046 * is used. Otherwise, sa_handler is used */
5047 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5048 act.sa_sigaction = segvHandler;
5049 sigaction (SIGSEGV, &act, NULL);
5050 sigaction (SIGBUS, &act, NULL);
5051 sigaction (SIGFPE, &act, NULL);
5052 sigaction (SIGILL, &act, NULL);
5053 sigaction (SIGBUS, &act, NULL);
5054 return;
5055 }
5056 #else /* HAVE_BACKTRACE */
5057 static void setupSigSegvAction(void) {
5058 }
5059 #endif /* HAVE_BACKTRACE */
5060
5061 /* =================================== Main! ================================ */
5062
5063 #ifdef __linux__
5064 int linuxOvercommitMemoryValue(void) {
5065 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
5066 char buf[64];
5067
5068 if (!fp) return -1;
5069 if (fgets(buf,64,fp) == NULL) {
5070 fclose(fp);
5071 return -1;
5072 }
5073 fclose(fp);
5074
5075 return atoi(buf);
5076 }
5077
5078 void linuxOvercommitMemoryWarning(void) {
5079 if (linuxOvercommitMemoryValue() == 0) {
5080 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5081 }
5082 }
5083 #endif /* __linux__ */
5084
5085 static void daemonize(void) {
5086 int fd;
5087 FILE *fp;
5088
5089 if (fork() != 0) exit(0); /* parent exits */
5090 setsid(); /* create a new session */
5091
5092 /* Every output goes to /dev/null. If Redis is daemonized but
5093 * the 'logfile' is set to 'stdout' in the configuration file
5094 * it will not log at all. */
5095 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5096 dup2(fd, STDIN_FILENO);
5097 dup2(fd, STDOUT_FILENO);
5098 dup2(fd, STDERR_FILENO);
5099 if (fd > STDERR_FILENO) close(fd);
5100 }
5101 /* Try to write the pid file */
5102 fp = fopen(server.pidfile,"w");
5103 if (fp) {
5104 fprintf(fp,"%d\n",getpid());
5105 fclose(fp);
5106 }
5107 }
5108
5109 int main(int argc, char **argv) {
5110 initServerConfig();
5111 if (argc == 2) {
5112 ResetServerSaveParams();
5113 loadServerConfig(argv[1]);
5114 } else if (argc > 2) {
5115 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5116 exit(1);
5117 } else {
5118 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5119 }
5120 initServer();
5121 if (server.daemonize) daemonize();
5122 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5123 #ifdef __linux__
5124 linuxOvercommitMemoryWarning();
5125 #endif
5126 if (rdbLoad(server.dbfilename) == REDIS_OK)
5127 redisLog(REDIS_NOTICE,"DB loaded from disk");
5128 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5129 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5130 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5131 aeMain(server.el);
5132 aeDeleteEventLoop(server.el);
5133 return 0;
5134 }