]> git.saurik.com Git - redis.git/blob - redis.c
Fixed compilation in mac os x snow leopard when compiling a 32 bit binary.
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Server version string. */
#define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
/* Error codes: generic status values returned by internal functions. */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout (seconds) */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* max volatile keys sampled per DB on each serverCron() run */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill, in percent (10%) */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4
100
/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
/* Presumably the sentinel returned by the length loader on read error. */
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
143
/* Client flags (bit mask stored in redisClient.flags) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels, in increasing severity. redisLog() drops messages whose level
 * is below server.verbosity and uses the level as an index into its
 * ".-*" marker string, so the values must stay 0..2. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: marks a parameter as intentionally unused. */
#define REDIS_NOTUSED(V) ((void) V)
184
185
186 /*================================= Data types ============================== */
187
/* A redis object, that is a type able to hold a string / list / set.
 * `type` is one of the REDIS_STRING..REDIS_HASH values, `encoding` one of
 * REDIS_ENCODING_*. `refcount` is managed by incrRefCount()/decrRefCount(). */
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char notused[2];
    int refcount;
} robj;
196
/* One logical database: the keyspace plus a parallel dict of expire times
 * (serverCron reads the expires values as time_t). */
typedef struct redisDb {
    dict *dict;
    dict *expires;
    int id;
} redisDb;
202
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;                /* database this client operates on (presumably set by SELECT) */
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;      /* mbargv/mbargc: presumably the multi-bulk argument vector being collected -- confirm */
    int argc, mbargc;
    int bulklen;                /* bulk read len. -1 if not in bulk read mode */
    int multibulk;              /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction;     /* time of the last interaction, used for timeout */
    int flags;                  /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb;             /* slave selected db, if this client is a slave */
    int authenticated;          /* when requirepass is non-NULL */
    int replstate;              /* replication state if this is a slave */
    int repldbfd;               /* replication DB file descriptor */
    long repldboff;             /* replication DB file offset */
    off_t repldbsize;           /* replication DB file size */
} redisClient;
225
/* A "save point": serverCron starts a background save when at least
 * `changes` writes happened and more than `seconds` elapsed since the
 * last save. */
struct saveparam {
    time_t seconds;
    int changes;
};
230
/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db;                    /* array of `dbnum` databases */
    dict *sharingpool;              /* presumably the pool used by tryObjectSharing() */
    unsigned int sharingpoolsize;
    long long dirty;                /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops;                  /* number of times the cron function run */
    list *objfreelist;              /* A list of freed objects to avoid malloc() */
    time_t lastsave;                /* Unix time of last successful save */
    size_t usedmemory;              /* Used memory in bytes, from zmalloc_used_memory() */
    /* Fields used only for stats */
    time_t stat_starttime;          /* server start time */
    long long stat_numcommands;     /* number of processed commands */
    long long stat_numconnections;  /* number of connections received */
    /* Configuration */
    int verbosity;                  /* minimum log level redisLog() emits */
    int glueoutputbuf;
    int maxidletime;                /* client idle timeout in seconds, 0 = disabled */
    int dbnum;
    int daemonize;
    char *pidfile;
    int bgsaveinprogress;
    pid_t bgsavechildpid;
    struct saveparam *saveparams;   /* array of saveparamslen save points */
    int saveparamslen;
    char *logfile;                  /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master;            /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
281
/* Signature of every command implementation. */
typedef void redisCommandProc(redisClient *c);
/* An entry of cmdTable: name, implementation, arity (argument count
 * including the command name) and REDIS_CMD_* flags. */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    int flags;
};
289
/* Name/address pair; presumably a symbol table entry used to resolve
 * function names in debug output (e.g. backtraces) -- confirm at use site. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
294
/* An element being sorted by SORT, together with its sort key: a numeric
 * score, or an object compared by value (presumably for ALPHA/BY sorting). */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;
302
/* A SORT sub-operation: `type` is one of REDIS_SORT_GET/DEL/INCR/DECR and
 * `pattern` the key pattern it applies to. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
307
/* Sorted set representation (work in progress: only the dict part exists). */
typedef struct zset {
    dict *dict;
    /* tree *tree; */
} zset;
312
/* Preallocated objects for common protocol replies and "select N" commands,
 * created once by createSharedObjects(). */
struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
321
322 /*================================ Prototypes =============================== */
323
/* Object, reply, persistence and replication helpers. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(const robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);

/* Command implementations, dispatched through cmdTable. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
416
417 /*================================= Globals ================================= */
418
/* Global vars */
static struct redisServer server; /* server global state */

/* Command table: {name, implementation, arity, flags}, terminated by an
 * all-NULL entry. Arity counts the command name itself (e.g. "ping" is 1,
 * "get" is 2). A negative arity presumably means "at least -arity
 * arguments" (used by variadic commands such as del and mget) -- confirm in
 * processCommand. Flags are REDIS_CMD_* bits. Note that "smembers" reuses
 * sinterCommand (the intersection of a single set). */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
486 /*============================ Utility functions ============================ */
487
/* Compare one pattern byte with one subject byte, optionally ignoring case.
 * Bytes go through unsigned char before tolower() because passing a negative
 * char value to <ctype.h> functions is undefined behavior. */
static int stringmatchCharEqual(char p, char s, int nocase) {
    if (!nocase) return p == s;
    return tolower((unsigned char)p) == tolower((unsigned char)s);
}

/* Glob-style pattern matching.
 *
 * Matches `string` (stringLen bytes) against `pattern` (patternLen bytes).
 * Returns 1 on match, 0 otherwise. Both buffers are only read within their
 * given lengths, so neither needs to be NUL-terminated.
 *
 * Supported syntax:
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [abc]   one byte from the set; [^abc] negates it; [a-z] is a range whose
 *           bounds may be given in either order; \x escapes x inside a set
 *   \x      the literal byte x
 * When nocase is non-zero, comparisons ignore case (via tolower()). */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse a run of '*' into a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            /* Try the rest of the pattern against every remaining suffix. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                                   string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match = 0;

            /* A class (negated or not) must consume exactly one byte. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back onto the last pattern
                     * byte so the increment after the switch ends at 0. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (stringmatchCharEqual(pattern[0], string[0], nocase))
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else if (stringmatchCharEqual(pattern[0], string[0], nocase)) {
                    match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0 ||
                !stringmatchCharEqual(pattern[0], string[0], nocase))
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Subject exhausted: only trailing '*' may still match. */
            while (patternLen > 0 && pattern[0] == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
610
611 static void redisLog(int level, const char *fmt, ...) {
612 va_list ap;
613 FILE *fp;
614
615 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
616 if (!fp) return;
617
618 va_start(ap, fmt);
619 if (level >= server.verbosity) {
620 char *c = ".-*";
621 char buf[64];
622 time_t now;
623
624 now = time(NULL);
625 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
626 fprintf(fp,"%s %c ",buf,c[level]);
627 vfprintf(fp, fmt, ap);
628 fprintf(fp,"\n");
629 fflush(fp);
630 }
631 va_end(ap);
632
633 if (server.logfile) fclose(fp);
634 }
635
636 /*====================== Hash table type implementation ==================== */
637
/* These are hash table types that use the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */

/* Disabled: a value destructor that simply zfree()s the value. Its only
 * user is the (also disabled) zsetDictType. */
#if 0
static void dictVanillaFree(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    zfree(val);
}
#endif
649
650 static int sdsDictKeyCompare(void *privdata, const void *key1,
651 const void *key2)
652 {
653 int l1,l2;
654 DICT_NOTUSED(privdata);
655
656 l1 = sdslen((sds)key1);
657 l2 = sdslen((sds)key2);
658 if (l1 != l2) return 0;
659 return memcmp(key1, key2, l1) == 0;
660 }
661
662 static void dictRedisObjectDestructor(void *privdata, void *val)
663 {
664 DICT_NOTUSED(privdata);
665
666 decrRefCount(val);
667 }
668
669 static int dictObjKeyCompare(void *privdata, const void *key1,
670 const void *key2)
671 {
672 const robj *o1 = key1, *o2 = key2;
673 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
674 }
675
676 static unsigned int dictObjHash(const void *key) {
677 const robj *o = key;
678 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
679 }
680
681 static int dictEncObjKeyCompare(void *privdata, const void *key1,
682 const void *key2)
683 {
684 const robj *o1 = key1, *o2 = key2;
685
686 if (o1->encoding == REDIS_ENCODING_RAW &&
687 o2->encoding == REDIS_ENCODING_RAW)
688 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
689 else {
690 robj *dec1, *dec2;
691 int cmp;
692
693 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
694 getDecodedObject(o1) : (robj*)o1;
695 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
696 getDecodedObject(o2) : (robj*)o2;
697 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
698 if (dec1 != o1) decrRefCount(dec1);
699 if (dec2 != o2) decrRefCount(dec2);
700 return cmp;
701 }
702 }
703
704 static unsigned int dictEncObjHash(const void *key) {
705 const robj *o = key;
706
707 if (o->encoding == REDIS_ENCODING_RAW)
708 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
709 else {
710 robj *dec = getDecodedObject(o);
711 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
712 decrRefCount(dec);
713 return hash;
714 }
715 }
716
/* Dictionary type for sets: members are redis objects hashed and compared by
 * their decoded string form (so INT- and RAW-encoded members with the same
 * text collide). Members are released with decrRefCount(); values are not
 * freed (val destructor is NULL). */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};
725
/* Disabled: planned dictionary type for sorted sets; values would be freed
 * with dictVanillaFree (itself compiled out). */
#if 0
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor */
};
#endif
736
/* Dictionary type keyed by redis objects holding raw sds strings
 * (dictObjHash/dictObjKeyCompare read o->ptr as sds directly); both keys and
 * values are released with decrRefCount(). Presumably used for the DB
 * keyspace -- confirm at the dictCreate() call sites. */
static dictType hashDictType = {
    dictObjHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictObjKeyCompare,         /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor  /* val destructor */
};
745
746 /* ========================= Random utility functions ======================= */
747
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear it would even be
 * possible to report the condition to the client, since the networking layer
 * itself relies on heap allocation for send buffers. So we simply abort;
 * at least the code stays simpler to read.
 *
 * Prints "<msg>: Out of memory" to stderr, waits one second (presumably so
 * the message gets out before the process dies) and calls abort(). */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
759
760 /* ====================== Redis server networking stuff ===================== */
761 static void closeTimedoutClients(void) {
762 redisClient *c;
763 listNode *ln;
764 time_t now = time(NULL);
765
766 listRewind(server.clients);
767 while ((ln = listYield(server.clients)) != NULL) {
768 c = listNodeValue(ln);
769 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
770 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
771 (now - c->lastinteraction > server.maxidletime)) {
772 redisLog(REDIS_DEBUG,"Closing idle client");
773 freeClient(c);
774 }
775 }
776 }
777
778 static int htNeedsResize(dict *dict) {
779 long long size, used;
780
781 size = dictSlots(dict);
782 used = dictSize(dict);
783 return (size && used && size > DICT_HT_INITIAL_SIZE &&
784 (used*100/size < REDIS_HT_MINFILL));
785 }
786
787 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
788 * we resize the hash table to save memory */
789 static void tryResizeHashTables(void) {
790 int j;
791
792 for (j = 0; j < server.dbnum; j++) {
793 if (htNeedsResize(server.db[j].dict)) {
794 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
795 dictResize(server.db[j].dict);
796 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
797 }
798 if (htNeedsResize(server.db[j].expires))
799 dictResize(server.db[j].expires);
800 }
801 }
802
803 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
804 int j, loops = server.cronloops++;
805 REDIS_NOTUSED(eventLoop);
806 REDIS_NOTUSED(id);
807 REDIS_NOTUSED(clientData);
808
809 /* Update the global state with the amount of used memory */
810 server.usedmemory = zmalloc_used_memory();
811
812 /* Show some info about non-empty databases */
813 for (j = 0; j < server.dbnum; j++) {
814 long long size, used, vkeys;
815
816 size = dictSlots(server.db[j].dict);
817 used = dictSize(server.db[j].dict);
818 vkeys = dictSize(server.db[j].expires);
819 if (!(loops % 5) && (used || vkeys)) {
820 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
821 /* dictPrintStats(server.dict); */
822 }
823 }
824
825 /* We don't want to resize the hash tables while a bacground saving
826 * is in progress: the saving child is created using fork() that is
827 * implemented with a copy-on-write semantic in most modern systems, so
828 * if we resize the HT while there is the saving child at work actually
829 * a lot of memory movements in the parent will cause a lot of pages
830 * copied. */
831 if (!server.bgsaveinprogress) tryResizeHashTables();
832
833 /* Show information about connected clients */
834 if (!(loops % 5)) {
835 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
836 listLength(server.clients)-listLength(server.slaves),
837 listLength(server.slaves),
838 server.usedmemory,
839 dictSize(server.sharingpool));
840 }
841
842 /* Close connections of timedout clients */
843 if (server.maxidletime && !(loops % 10))
844 closeTimedoutClients();
845
846 /* Check if a background saving in progress terminated */
847 if (server.bgsaveinprogress) {
848 int statloc;
849 if (wait4(-1,&statloc,WNOHANG,NULL)) {
850 int exitcode = WEXITSTATUS(statloc);
851 int bysignal = WIFSIGNALED(statloc);
852
853 if (!bysignal && exitcode == 0) {
854 redisLog(REDIS_NOTICE,
855 "Background saving terminated with success");
856 server.dirty = 0;
857 server.lastsave = time(NULL);
858 } else if (!bysignal && exitcode != 0) {
859 redisLog(REDIS_WARNING, "Background saving error");
860 } else {
861 redisLog(REDIS_WARNING,
862 "Background saving terminated by signal");
863 rdbRemoveTempFile(server.bgsavechildpid);
864 }
865 server.bgsaveinprogress = 0;
866 server.bgsavechildpid = -1;
867 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
868 }
869 } else {
870 /* If there is not a background saving in progress check if
871 * we have to save now */
872 time_t now = time(NULL);
873 for (j = 0; j < server.saveparamslen; j++) {
874 struct saveparam *sp = server.saveparams+j;
875
876 if (server.dirty >= sp->changes &&
877 now-server.lastsave > sp->seconds) {
878 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
879 sp->changes, sp->seconds);
880 rdbSaveBackground(server.dbfilename);
881 break;
882 }
883 }
884 }
885
886 /* Try to expire a few timed out keys */
887 for (j = 0; j < server.dbnum; j++) {
888 redisDb *db = server.db+j;
889 int num = dictSize(db->expires);
890
891 if (num) {
892 time_t now = time(NULL);
893
894 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
895 num = REDIS_EXPIRELOOKUPS_PER_CRON;
896 while (num--) {
897 dictEntry *de;
898 time_t t;
899
900 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
901 t = (time_t) dictGetEntryVal(de);
902 if (now > t) {
903 deleteKey(db,dictGetEntryKey(de));
904 }
905 }
906 }
907 }
908
909 /* Check if we should connect to a MASTER */
910 if (server.replstate == REDIS_REPL_CONNECT) {
911 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
912 if (syncWithMaster() == REDIS_OK) {
913 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
914 }
915 }
916 return 1000;
917 }
918
919 static void createSharedObjects(void) {
920 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
921 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
922 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
923 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
924 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
925 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
926 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
927 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
928 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
929 /* no such key */
930 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
931 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
932 "-ERR Operation against a key holding the wrong kind of value\r\n"));
933 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
934 "-ERR no such key\r\n"));
935 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
936 "-ERR syntax error\r\n"));
937 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
938 "-ERR source and destination objects are the same\r\n"));
939 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
940 "-ERR index out of range\r\n"));
941 shared.space = createObject(REDIS_STRING,sdsnew(" "));
942 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
943 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
944 shared.select0 = createStringObject("select 0\r\n",10);
945 shared.select1 = createStringObject("select 1\r\n",10);
946 shared.select2 = createStringObject("select 2\r\n",10);
947 shared.select3 = createStringObject("select 3\r\n",10);
948 shared.select4 = createStringObject("select 4\r\n",10);
949 shared.select5 = createStringObject("select 5\r\n",10);
950 shared.select6 = createStringObject("select 6\r\n",10);
951 shared.select7 = createStringObject("select 7\r\n",10);
952 shared.select8 = createStringObject("select 8\r\n",10);
953 shared.select9 = createStringObject("select 9\r\n",10);
954 }
955
956 static void appendServerSaveParams(time_t seconds, int changes) {
957 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
958 if (server.saveparams == NULL) oom("appendServerSaveParams");
959 server.saveparams[server.saveparamslen].seconds = seconds;
960 server.saveparams[server.saveparamslen].changes = changes;
961 server.saveparamslen++;
962 }
963
964 static void ResetServerSaveParams() {
965 zfree(server.saveparams);
966 server.saveparams = NULL;
967 server.saveparamslen = 0;
968 }
969
970 static void initServerConfig() {
971 server.dbnum = REDIS_DEFAULT_DBNUM;
972 server.port = REDIS_SERVERPORT;
973 server.verbosity = REDIS_DEBUG;
974 server.maxidletime = REDIS_MAXIDLETIME;
975 server.saveparams = NULL;
976 server.logfile = NULL; /* NULL = log on standard output */
977 server.bindaddr = NULL;
978 server.glueoutputbuf = 1;
979 server.daemonize = 0;
980 server.pidfile = "/var/run/redis.pid";
981 server.dbfilename = "dump.rdb";
982 server.requirepass = NULL;
983 server.shareobjects = 0;
984 server.sharingpoolsize = 1024;
985 server.maxclients = 0;
986 server.maxmemory = 0;
987 ResetServerSaveParams();
988
989 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
990 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
991 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
992 /* Replication related */
993 server.isslave = 0;
994 server.masterhost = NULL;
995 server.masterport = 6379;
996 server.master = NULL;
997 server.replstate = REDIS_REPL_NONE;
998 }
999
1000 static void initServer() {
1001 int j;
1002
1003 signal(SIGHUP, SIG_IGN);
1004 signal(SIGPIPE, SIG_IGN);
1005 setupSigSegvAction();
1006
1007 server.clients = listCreate();
1008 server.slaves = listCreate();
1009 server.monitors = listCreate();
1010 server.objfreelist = listCreate();
1011 createSharedObjects();
1012 server.el = aeCreateEventLoop();
1013 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1014 server.sharingpool = dictCreate(&setDictType,NULL);
1015 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
1016 oom("server initialization"); /* Fatal OOM */
1017 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1018 if (server.fd == -1) {
1019 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1020 exit(1);
1021 }
1022 for (j = 0; j < server.dbnum; j++) {
1023 server.db[j].dict = dictCreate(&hashDictType,NULL);
1024 server.db[j].expires = dictCreate(&setDictType,NULL);
1025 server.db[j].id = j;
1026 }
1027 server.cronloops = 0;
1028 server.bgsaveinprogress = 0;
1029 server.bgsavechildpid = -1;
1030 server.lastsave = time(NULL);
1031 server.dirty = 0;
1032 server.usedmemory = 0;
1033 server.stat_numcommands = 0;
1034 server.stat_numconnections = 0;
1035 server.stat_starttime = time(NULL);
1036 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1037 }
1038
1039 /* Empty the whole database */
1040 static long long emptyDb() {
1041 int j;
1042 long long removed = 0;
1043
1044 for (j = 0; j < server.dbnum; j++) {
1045 removed += dictSize(server.db[j].dict);
1046 dictEmpty(server.db[j].dict);
1047 dictEmpty(server.db[j].expires);
1048 }
1049 return removed;
1050 }
1051
/* Parse a boolean config value, case-insensitively.
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1057
1058 /* I agree, this is a very rudimental way to load a configuration...
1059 will improve later if the config gets more complex */
1060 static void loadServerConfig(char *filename) {
1061 FILE *fp;
1062 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1063 int linenum = 0;
1064 sds line = NULL;
1065
1066 if (filename[0] == '-' && filename[1] == '\0')
1067 fp = stdin;
1068 else {
1069 if ((fp = fopen(filename,"r")) == NULL) {
1070 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1071 exit(1);
1072 }
1073 }
1074
1075 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1076 sds *argv;
1077 int argc, j;
1078
1079 linenum++;
1080 line = sdsnew(buf);
1081 line = sdstrim(line," \t\r\n");
1082
1083 /* Skip comments and blank lines*/
1084 if (line[0] == '#' || line[0] == '\0') {
1085 sdsfree(line);
1086 continue;
1087 }
1088
1089 /* Split into arguments */
1090 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1091 sdstolower(argv[0]);
1092
1093 /* Execute config directives */
1094 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1095 server.maxidletime = atoi(argv[1]);
1096 if (server.maxidletime < 0) {
1097 err = "Invalid timeout value"; goto loaderr;
1098 }
1099 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1100 server.port = atoi(argv[1]);
1101 if (server.port < 1 || server.port > 65535) {
1102 err = "Invalid port"; goto loaderr;
1103 }
1104 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1105 server.bindaddr = zstrdup(argv[1]);
1106 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1107 int seconds = atoi(argv[1]);
1108 int changes = atoi(argv[2]);
1109 if (seconds < 1 || changes < 0) {
1110 err = "Invalid save parameters"; goto loaderr;
1111 }
1112 appendServerSaveParams(seconds,changes);
1113 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1114 if (chdir(argv[1]) == -1) {
1115 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1116 argv[1], strerror(errno));
1117 exit(1);
1118 }
1119 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1120 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1121 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1122 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1123 else {
1124 err = "Invalid log level. Must be one of debug, notice, warning";
1125 goto loaderr;
1126 }
1127 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1128 FILE *logfp;
1129
1130 server.logfile = zstrdup(argv[1]);
1131 if (!strcasecmp(server.logfile,"stdout")) {
1132 zfree(server.logfile);
1133 server.logfile = NULL;
1134 }
1135 if (server.logfile) {
1136 /* Test if we are able to open the file. The server will not
1137 * be able to abort just for this problem later... */
1138 logfp = fopen(server.logfile,"a");
1139 if (logfp == NULL) {
1140 err = sdscatprintf(sdsempty(),
1141 "Can't open the log file: %s", strerror(errno));
1142 goto loaderr;
1143 }
1144 fclose(logfp);
1145 }
1146 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1147 server.dbnum = atoi(argv[1]);
1148 if (server.dbnum < 1) {
1149 err = "Invalid number of databases"; goto loaderr;
1150 }
1151 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1152 server.maxclients = atoi(argv[1]);
1153 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1154 server.maxmemory = strtoll(argv[1], NULL, 10);
1155 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1156 server.masterhost = sdsnew(argv[1]);
1157 server.masterport = atoi(argv[2]);
1158 server.replstate = REDIS_REPL_CONNECT;
1159 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1160 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1161 err = "argument must be 'yes' or 'no'"; goto loaderr;
1162 }
1163 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1164 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1165 err = "argument must be 'yes' or 'no'"; goto loaderr;
1166 }
1167 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1168 server.sharingpoolsize = atoi(argv[1]);
1169 if (server.sharingpoolsize < 1) {
1170 err = "invalid object sharing pool size"; goto loaderr;
1171 }
1172 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1173 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1174 err = "argument must be 'yes' or 'no'"; goto loaderr;
1175 }
1176 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1177 server.requirepass = zstrdup(argv[1]);
1178 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1179 server.pidfile = zstrdup(argv[1]);
1180 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1181 server.dbfilename = zstrdup(argv[1]);
1182 } else {
1183 err = "Bad directive or wrong number of arguments"; goto loaderr;
1184 }
1185 for (j = 0; j < argc; j++)
1186 sdsfree(argv[j]);
1187 zfree(argv);
1188 sdsfree(line);
1189 }
1190 if (fp != stdin) fclose(fp);
1191 return;
1192
1193 loaderr:
1194 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1195 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1196 fprintf(stderr, ">>> '%s'\n", line);
1197 fprintf(stderr, "%s\n", err);
1198 exit(1);
1199 }
1200
1201 static void freeClientArgv(redisClient *c) {
1202 int j;
1203
1204 for (j = 0; j < c->argc; j++)
1205 decrRefCount(c->argv[j]);
1206 for (j = 0; j < c->mbargc; j++)
1207 decrRefCount(c->mbargv[j]);
1208 c->argc = 0;
1209 c->mbargc = 0;
1210 }
1211
1212 static void freeClient(redisClient *c) {
1213 listNode *ln;
1214
1215 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1216 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1217 sdsfree(c->querybuf);
1218 listRelease(c->reply);
1219 freeClientArgv(c);
1220 close(c->fd);
1221 ln = listSearchKey(server.clients,c);
1222 assert(ln != NULL);
1223 listDelNode(server.clients,ln);
1224 if (c->flags & REDIS_SLAVE) {
1225 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1226 close(c->repldbfd);
1227 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1228 ln = listSearchKey(l,c);
1229 assert(ln != NULL);
1230 listDelNode(l,ln);
1231 }
1232 if (c->flags & REDIS_MASTER) {
1233 server.master = NULL;
1234 server.replstate = REDIS_REPL_CONNECT;
1235 }
1236 zfree(c->argv);
1237 zfree(c->mbargv);
1238 zfree(c);
1239 }
1240
1241 static void glueReplyBuffersIfNeeded(redisClient *c) {
1242 int totlen = 0;
1243 listNode *ln;
1244 robj *o;
1245
1246 listRewind(c->reply);
1247 while((ln = listYield(c->reply))) {
1248 o = ln->value;
1249 totlen += sdslen(o->ptr);
1250 /* This optimization makes more sense if we don't have to copy
1251 * too much data */
1252 if (totlen > 1024) return;
1253 }
1254 if (totlen > 0) {
1255 char buf[1024];
1256 int copylen = 0;
1257
1258 listRewind(c->reply);
1259 while((ln = listYield(c->reply))) {
1260 o = ln->value;
1261 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1262 copylen += sdslen(o->ptr);
1263 listDelNode(c->reply,ln);
1264 }
1265 /* Now the output buffer is empty, add the new single element */
1266 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1267 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1268 }
1269 }
1270
1271 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1272 redisClient *c = privdata;
1273 int nwritten = 0, totwritten = 0, objlen;
1274 robj *o;
1275 REDIS_NOTUSED(el);
1276 REDIS_NOTUSED(mask);
1277
1278 if (server.glueoutputbuf && listLength(c->reply) > 1)
1279 glueReplyBuffersIfNeeded(c);
1280 while(listLength(c->reply)) {
1281 o = listNodeValue(listFirst(c->reply));
1282 objlen = sdslen(o->ptr);
1283
1284 if (objlen == 0) {
1285 listDelNode(c->reply,listFirst(c->reply));
1286 continue;
1287 }
1288
1289 if (c->flags & REDIS_MASTER) {
1290 /* Don't reply to a master */
1291 nwritten = objlen - c->sentlen;
1292 } else {
1293 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1294 if (nwritten <= 0) break;
1295 }
1296 c->sentlen += nwritten;
1297 totwritten += nwritten;
1298 /* If we fully sent the object on head go to the next one */
1299 if (c->sentlen == objlen) {
1300 listDelNode(c->reply,listFirst(c->reply));
1301 c->sentlen = 0;
1302 }
1303 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1304 * bytes, in a single threaded server it's a good idea to server
1305 * other clients as well, even if a very large request comes from
1306 * super fast link that is always able to accept data (in real world
1307 * terms think to 'KEYS *' against the loopback interfae) */
1308 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1309 }
1310 if (nwritten == -1) {
1311 if (errno == EAGAIN) {
1312 nwritten = 0;
1313 } else {
1314 redisLog(REDIS_DEBUG,
1315 "Error writing to client: %s", strerror(errno));
1316 freeClient(c);
1317 return;
1318 }
1319 }
1320 if (totwritten > 0) c->lastinteraction = time(NULL);
1321 if (listLength(c->reply) == 0) {
1322 c->sentlen = 0;
1323 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1324 }
1325 }
1326
1327 static struct redisCommand *lookupCommand(char *name) {
1328 int j = 0;
1329 while(cmdTable[j].name != NULL) {
1330 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1331 j++;
1332 }
1333 return NULL;
1334 }
1335
1336 /* resetClient prepare the client to process the next command */
1337 static void resetClient(redisClient *c) {
1338 freeClientArgv(c);
1339 c->bulklen = -1;
1340 c->multibulk = 0;
1341 }
1342
1343 /* If this function gets called we already read a whole
1344 * command, argments are in the client argv/argc fields.
1345 * processCommand() execute the command or prepare the
1346 * server for a bulk read from the client.
1347 *
1348 * If 1 is returned the client is still alive and valid and
1349 * and other operations can be performed by the caller. Otherwise
1350 * if 0 is returned the client was destroied (i.e. after QUIT). */
1351 static int processCommand(redisClient *c) {
1352 struct redisCommand *cmd;
1353 long long dirty;
1354
1355 /* Free some memory if needed (maxmemory setting) */
1356 if (server.maxmemory) freeMemoryIfNeeded();
1357
1358 /* Handle the multi bulk command type. This is an alternative protocol
1359 * supported by Redis in order to receive commands that are composed of
1360 * multiple binary-safe "bulk" arguments. The latency of processing is
1361 * a bit higher but this allows things like multi-sets, so if this
1362 * protocol is used only for MSET and similar commands this is a big win. */
1363 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1364 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1365 if (c->multibulk <= 0) {
1366 resetClient(c);
1367 return 1;
1368 } else {
1369 decrRefCount(c->argv[c->argc-1]);
1370 c->argc--;
1371 return 1;
1372 }
1373 } else if (c->multibulk) {
1374 if (c->bulklen == -1) {
1375 if (((char*)c->argv[0]->ptr)[0] != '$') {
1376 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1377 resetClient(c);
1378 return 1;
1379 } else {
1380 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1381 decrRefCount(c->argv[0]);
1382 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1383 c->argc--;
1384 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1385 resetClient(c);
1386 return 1;
1387 }
1388 c->argc--;
1389 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1390 return 1;
1391 }
1392 } else {
1393 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1394 c->mbargv[c->mbargc] = c->argv[0];
1395 c->mbargc++;
1396 c->argc--;
1397 c->multibulk--;
1398 if (c->multibulk == 0) {
1399 robj **auxargv;
1400 int auxargc;
1401
1402 /* Here we need to swap the multi-bulk argc/argv with the
1403 * normal argc/argv of the client structure. */
1404 auxargv = c->argv;
1405 c->argv = c->mbargv;
1406 c->mbargv = auxargv;
1407
1408 auxargc = c->argc;
1409 c->argc = c->mbargc;
1410 c->mbargc = auxargc;
1411
1412 /* We need to set bulklen to something different than -1
1413 * in order for the code below to process the command without
1414 * to try to read the last argument of a bulk command as
1415 * a special argument. */
1416 c->bulklen = 0;
1417 /* continue below and process the command */
1418 } else {
1419 c->bulklen = -1;
1420 return 1;
1421 }
1422 }
1423 }
1424 /* -- end of multi bulk commands processing -- */
1425
1426 /* The QUIT command is handled as a special case. Normal command
1427 * procs are unable to close the client connection safely */
1428 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1429 freeClient(c);
1430 return 0;
1431 }
1432 cmd = lookupCommand(c->argv[0]->ptr);
1433 if (!cmd) {
1434 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1435 resetClient(c);
1436 return 1;
1437 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1438 (c->argc < -cmd->arity)) {
1439 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1440 resetClient(c);
1441 return 1;
1442 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1443 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1444 resetClient(c);
1445 return 1;
1446 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1447 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1448
1449 decrRefCount(c->argv[c->argc-1]);
1450 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1451 c->argc--;
1452 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1453 resetClient(c);
1454 return 1;
1455 }
1456 c->argc--;
1457 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1458 /* It is possible that the bulk read is already in the
1459 * buffer. Check this condition and handle it accordingly.
1460 * This is just a fast path, alternative to call processInputBuffer().
1461 * It's a good idea since the code is small and this condition
1462 * happens most of the times. */
1463 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1464 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1465 c->argc++;
1466 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1467 } else {
1468 return 1;
1469 }
1470 }
1471 /* Let's try to share objects on the command arguments vector */
1472 if (server.shareobjects) {
1473 int j;
1474 for(j = 1; j < c->argc; j++)
1475 c->argv[j] = tryObjectSharing(c->argv[j]);
1476 }
1477 /* Let's try to encode the bulk object to save space. */
1478 if (cmd->flags & REDIS_CMD_BULK)
1479 tryObjectEncoding(c->argv[c->argc-1]);
1480
1481 /* Check if the user is authenticated */
1482 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1483 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1484 resetClient(c);
1485 return 1;
1486 }
1487
1488 /* Exec the command */
1489 dirty = server.dirty;
1490 cmd->proc(c);
1491 if (server.dirty-dirty != 0 && listLength(server.slaves))
1492 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1493 if (listLength(server.monitors))
1494 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1495 server.stat_numcommands++;
1496
1497 /* Prepare the client for the next command */
1498 if (c->flags & REDIS_CLOSE) {
1499 freeClient(c);
1500 return 0;
1501 }
1502 resetClient(c);
1503 return 1;
1504 }
1505
1506 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1507 listNode *ln;
1508 int outc = 0, j;
1509 robj **outv;
1510 /* (args*2)+1 is enough room for args, spaces, newlines */
1511 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1512
1513 if (argc <= REDIS_STATIC_ARGS) {
1514 outv = static_outv;
1515 } else {
1516 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1517 if (!outv) oom("replicationFeedSlaves");
1518 }
1519
1520 for (j = 0; j < argc; j++) {
1521 if (j != 0) outv[outc++] = shared.space;
1522 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1523 robj *lenobj;
1524
1525 lenobj = createObject(REDIS_STRING,
1526 sdscatprintf(sdsempty(),"%d\r\n",
1527 stringObjectLen(argv[j])));
1528 lenobj->refcount = 0;
1529 outv[outc++] = lenobj;
1530 }
1531 outv[outc++] = argv[j];
1532 }
1533 outv[outc++] = shared.crlf;
1534
1535 /* Increment all the refcounts at start and decrement at end in order to
1536 * be sure to free objects if there is no slave in a replication state
1537 * able to be feed with commands */
1538 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1539 listRewind(slaves);
1540 while((ln = listYield(slaves))) {
1541 redisClient *slave = ln->value;
1542
1543 /* Don't feed slaves that are still waiting for BGSAVE to start */
1544 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1545
1546 /* Feed all the other slaves, MONITORs and so on */
1547 if (slave->slaveseldb != dictid) {
1548 robj *selectcmd;
1549
1550 switch(dictid) {
1551 case 0: selectcmd = shared.select0; break;
1552 case 1: selectcmd = shared.select1; break;
1553 case 2: selectcmd = shared.select2; break;
1554 case 3: selectcmd = shared.select3; break;
1555 case 4: selectcmd = shared.select4; break;
1556 case 5: selectcmd = shared.select5; break;
1557 case 6: selectcmd = shared.select6; break;
1558 case 7: selectcmd = shared.select7; break;
1559 case 8: selectcmd = shared.select8; break;
1560 case 9: selectcmd = shared.select9; break;
1561 default:
1562 selectcmd = createObject(REDIS_STRING,
1563 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1564 selectcmd->refcount = 0;
1565 break;
1566 }
1567 addReply(slave,selectcmd);
1568 slave->slaveseldb = dictid;
1569 }
1570 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1571 }
1572 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1573 if (outv != static_outv) zfree(outv);
1574 }
1575
1576 static void processInputBuffer(redisClient *c) {
1577 again:
1578 if (c->bulklen == -1) {
1579 /* Read the first line of the query */
1580 char *p = strchr(c->querybuf,'\n');
1581 size_t querylen;
1582
1583 if (p) {
1584 sds query, *argv;
1585 int argc, j;
1586
1587 query = c->querybuf;
1588 c->querybuf = sdsempty();
1589 querylen = 1+(p-(query));
1590 if (sdslen(query) > querylen) {
1591 /* leave data after the first line of the query in the buffer */
1592 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1593 }
1594 *p = '\0'; /* remove "\n" */
1595 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1596 sdsupdatelen(query);
1597
1598 /* Now we can split the query in arguments */
1599 if (sdslen(query) == 0) {
1600 /* Ignore empty query */
1601 sdsfree(query);
1602 return;
1603 }
1604 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1605 if (argv == NULL) oom("sdssplitlen");
1606 sdsfree(query);
1607
1608 if (c->argv) zfree(c->argv);
1609 c->argv = zmalloc(sizeof(robj*)*argc);
1610 if (c->argv == NULL) oom("allocating arguments list for client");
1611
1612 for (j = 0; j < argc; j++) {
1613 if (sdslen(argv[j])) {
1614 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1615 c->argc++;
1616 } else {
1617 sdsfree(argv[j]);
1618 }
1619 }
1620 zfree(argv);
1621 /* Execute the command. If the client is still valid
1622 * after processCommand() return and there is something
1623 * on the query buffer try to process the next command. */
1624 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1625 return;
1626 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1627 redisLog(REDIS_DEBUG, "Client protocol error");
1628 freeClient(c);
1629 return;
1630 }
1631 } else {
1632 /* Bulk read handling. Note that if we are at this point
1633 the client already sent a command terminated with a newline,
1634 we are reading the bulk data that is actually the last
1635 argument of the command. */
1636 int qbl = sdslen(c->querybuf);
1637
1638 if (c->bulklen <= qbl) {
1639 /* Copy everything but the final CRLF as final argument */
1640 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1641 c->argc++;
1642 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1643 /* Process the command. If the client is still valid after
1644 * the processing and there is more data in the buffer
1645 * try to parse it. */
1646 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1647 return;
1648 }
1649 }
1650 }
1651
1652 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1653 redisClient *c = (redisClient*) privdata;
1654 char buf[REDIS_IOBUF_LEN];
1655 int nread;
1656 REDIS_NOTUSED(el);
1657 REDIS_NOTUSED(mask);
1658
1659 nread = read(fd, buf, REDIS_IOBUF_LEN);
1660 if (nread == -1) {
1661 if (errno == EAGAIN) {
1662 nread = 0;
1663 } else {
1664 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1665 freeClient(c);
1666 return;
1667 }
1668 } else if (nread == 0) {
1669 redisLog(REDIS_DEBUG, "Client closed connection");
1670 freeClient(c);
1671 return;
1672 }
1673 if (nread) {
1674 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1675 c->lastinteraction = time(NULL);
1676 } else {
1677 return;
1678 }
1679 processInputBuffer(c);
1680 }
1681
1682 static int selectDb(redisClient *c, int id) {
1683 if (id < 0 || id >= server.dbnum)
1684 return REDIS_ERR;
1685 c->db = &server.db[id];
1686 return REDIS_OK;
1687 }
1688
1689 static void *dupClientReplyValue(void *o) {
1690 incrRefCount((robj*)o);
1691 return 0;
1692 }
1693
1694 static redisClient *createClient(int fd) {
1695 redisClient *c = zmalloc(sizeof(*c));
1696
1697 anetNonBlock(NULL,fd);
1698 anetTcpNoDelay(NULL,fd);
1699 if (!c) return NULL;
1700 selectDb(c,0);
1701 c->fd = fd;
1702 c->querybuf = sdsempty();
1703 c->argc = 0;
1704 c->argv = NULL;
1705 c->bulklen = -1;
1706 c->multibulk = 0;
1707 c->mbargc = 0;
1708 c->mbargv = NULL;
1709 c->sentlen = 0;
1710 c->flags = 0;
1711 c->lastinteraction = time(NULL);
1712 c->authenticated = 0;
1713 c->replstate = REDIS_REPL_NONE;
1714 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1715 listSetFreeMethod(c->reply,decrRefCount);
1716 listSetDupMethod(c->reply,dupClientReplyValue);
1717 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1718 readQueryFromClient, c, NULL) == AE_ERR) {
1719 freeClient(c);
1720 return NULL;
1721 }
1722 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1723 return c;
1724 }
1725
1726 static void addReply(redisClient *c, robj *obj) {
1727 if (listLength(c->reply) == 0 &&
1728 (c->replstate == REDIS_REPL_NONE ||
1729 c->replstate == REDIS_REPL_ONLINE) &&
1730 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1731 sendReplyToClient, c, NULL) == AE_ERR) return;
1732 if (obj->encoding != REDIS_ENCODING_RAW) {
1733 obj = getDecodedObject(obj);
1734 } else {
1735 incrRefCount(obj);
1736 }
1737 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1738 }
1739
1740 static void addReplySds(redisClient *c, sds s) {
1741 robj *o = createObject(REDIS_STRING,s);
1742 addReply(c,o);
1743 decrRefCount(o);
1744 }
1745
1746 static void addReplyBulkLen(redisClient *c, robj *obj) {
1747 size_t len;
1748
1749 if (obj->encoding == REDIS_ENCODING_RAW) {
1750 len = sdslen(obj->ptr);
1751 } else {
1752 long n = (long)obj->ptr;
1753
1754 len = 1;
1755 if (n < 0) {
1756 len++;
1757 n = -n;
1758 }
1759 while((n = n/10) != 0) {
1760 len++;
1761 }
1762 }
1763 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1764 }
1765
1766 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1767 int cport, cfd;
1768 char cip[128];
1769 redisClient *c;
1770 REDIS_NOTUSED(el);
1771 REDIS_NOTUSED(mask);
1772 REDIS_NOTUSED(privdata);
1773
1774 cfd = anetAccept(server.neterr, fd, cip, &cport);
1775 if (cfd == AE_ERR) {
1776 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1777 return;
1778 }
1779 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1780 if ((c = createClient(cfd)) == NULL) {
1781 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1782 close(cfd); /* May be already closed, just ingore errors */
1783 return;
1784 }
1785 /* If maxclient directive is set and this is one client more... close the
1786 * connection. Note that we create the client instead to check before
1787 * for this condition, since now the socket is already set in nonblocking
1788 * mode and we can send an error for free using the Kernel I/O */
1789 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1790 char *err = "-ERR max number of clients reached\r\n";
1791
1792 /* That's a best effort error message, don't check write errors */
1793 (void) write(c->fd,err,strlen(err));
1794 freeClient(c);
1795 return;
1796 }
1797 server.stat_numconnections++;
1798 }
1799
1800 /* ======================= Redis objects implementation ===================== */
1801
1802 static robj *createObject(int type, void *ptr) {
1803 robj *o;
1804
1805 if (listLength(server.objfreelist)) {
1806 listNode *head = listFirst(server.objfreelist);
1807 o = listNodeValue(head);
1808 listDelNode(server.objfreelist,head);
1809 } else {
1810 o = zmalloc(sizeof(*o));
1811 }
1812 if (!o) oom("createObject");
1813 o->type = type;
1814 o->encoding = REDIS_ENCODING_RAW;
1815 o->ptr = ptr;
1816 o->refcount = 1;
1817 return o;
1818 }
1819
1820 static robj *createStringObject(char *ptr, size_t len) {
1821 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1822 }
1823
1824 static robj *createListObject(void) {
1825 list *l = listCreate();
1826
1827 if (!l) oom("listCreate");
1828 listSetFreeMethod(l,decrRefCount);
1829 return createObject(REDIS_LIST,l);
1830 }
1831
1832 static robj *createSetObject(void) {
1833 dict *d = dictCreate(&setDictType,NULL);
1834 if (!d) oom("dictCreate");
1835 return createObject(REDIS_SET,d);
1836 }
1837
#if 0
/* Sorted set constructor. Compiled out (#if 0), so it is not used. */
static robj *createZsetObject(void) {
    dict *zs = dictCreate(&zsetDictType,NULL);

    if (zs == NULL) oom("dictCreate");
    return createObject(REDIS_ZSET,zs);
}
#endif
1845
1846 static void freeStringObject(robj *o) {
1847 if (o->encoding == REDIS_ENCODING_RAW) {
1848 sdsfree(o->ptr);
1849 }
1850 }
1851
1852 static void freeListObject(robj *o) {
1853 listRelease((list*) o->ptr);
1854 }
1855
1856 static void freeSetObject(robj *o) {
1857 dictRelease((dict*) o->ptr);
1858 }
1859
1860 static void freeHashObject(robj *o) {
1861 dictRelease((dict*) o->ptr);
1862 }
1863
1864 static void incrRefCount(robj *o) {
1865 o->refcount++;
1866 #ifdef DEBUG_REFCOUNT
1867 if (o->type == REDIS_STRING)
1868 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1869 #endif
1870 }
1871
1872 static void decrRefCount(void *obj) {
1873 robj *o = obj;
1874
1875 #ifdef DEBUG_REFCOUNT
1876 if (o->type == REDIS_STRING)
1877 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1878 #endif
1879 if (--(o->refcount) == 0) {
1880 switch(o->type) {
1881 case REDIS_STRING: freeStringObject(o); break;
1882 case REDIS_LIST: freeListObject(o); break;
1883 case REDIS_SET: freeSetObject(o); break;
1884 case REDIS_HASH: freeHashObject(o); break;
1885 default: assert(0 != 0); break;
1886 }
1887 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1888 !listAddNodeHead(server.objfreelist,o))
1889 zfree(o);
1890 }
1891 }
1892
1893 static robj *lookupKey(redisDb *db, robj *key) {
1894 dictEntry *de = dictFind(db->dict,key);
1895 return de ? dictGetEntryVal(de) : NULL;
1896 }
1897
1898 static robj *lookupKeyRead(redisDb *db, robj *key) {
1899 expireIfNeeded(db,key);
1900 return lookupKey(db,key);
1901 }
1902
1903 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1904 deleteIfVolatile(db,key);
1905 return lookupKey(db,key);
1906 }
1907
1908 static int deleteKey(redisDb *db, robj *key) {
1909 int retval;
1910
1911 /* We need to protect key from destruction: after the first dictDelete()
1912 * it may happen that 'key' is no longer valid if we don't increment
1913 * it's count. This may happen when we get the object reference directly
1914 * from the hash table with dictRandomKey() or dict iterators */
1915 incrRefCount(key);
1916 if (dictSize(db->expires)) dictDelete(db->expires,key);
1917 retval = dictDelete(db->dict,key);
1918 decrRefCount(key);
1919
1920 return retval == DICT_OK;
1921 }
1922
1923 /* Try to share an object against the shared objects pool */
1924 static robj *tryObjectSharing(robj *o) {
1925 struct dictEntry *de;
1926 unsigned long c;
1927
1928 if (o == NULL || server.shareobjects == 0) return o;
1929
1930 assert(o->type == REDIS_STRING);
1931 de = dictFind(server.sharingpool,o);
1932 if (de) {
1933 robj *shared = dictGetEntryKey(de);
1934
1935 c = ((unsigned long) dictGetEntryVal(de))+1;
1936 dictGetEntryVal(de) = (void*) c;
1937 incrRefCount(shared);
1938 decrRefCount(o);
1939 return shared;
1940 } else {
1941 /* Here we are using a stream algorihtm: Every time an object is
1942 * shared we increment its count, everytime there is a miss we
1943 * recrement the counter of a random object. If this object reaches
1944 * zero we remove the object and put the current object instead. */
1945 if (dictSize(server.sharingpool) >=
1946 server.sharingpoolsize) {
1947 de = dictGetRandomKey(server.sharingpool);
1948 assert(de != NULL);
1949 c = ((unsigned long) dictGetEntryVal(de))-1;
1950 dictGetEntryVal(de) = (void*) c;
1951 if (c == 0) {
1952 dictDelete(server.sharingpool,de->key);
1953 }
1954 } else {
1955 c = 0; /* If the pool is empty we want to add this object */
1956 }
1957 if (c == 0) {
1958 int retval;
1959
1960 retval = dictAdd(server.sharingpool,o,(void*)1);
1961 assert(retval == DICT_OK);
1962 incrRefCount(o);
1963 }
1964 return o;
1965 }
1966 }
1967
1968 /* Check if the nul-terminated string 's' can be represented by a long
1969 * (that is, is a number that fits into long without any other space or
1970 * character before or after the digits).
1971 *
1972 * If so, the function returns REDIS_OK and *longval is set to the value
1973 * of the number. Otherwise REDIS_ERR is returned */
1974 static int isStringRepresentableAsLong(sds s, long *longval) {
1975 char buf[32], *endptr;
1976 long value;
1977 int slen;
1978
1979 value = strtol(s, &endptr, 10);
1980 if (endptr[0] != '\0') return REDIS_ERR;
1981 slen = snprintf(buf,32,"%ld",value);
1982
1983 /* If the number converted back into a string is not identical
1984 * then it's not possible to encode the string as integer */
1985 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
1986 if (longval) *longval = value;
1987 return REDIS_OK;
1988 }
1989
1990 /* Try to encode a string object in order to save space */
1991 static int tryObjectEncoding(robj *o) {
1992 long value;
1993 sds s = o->ptr;
1994
1995 if (o->encoding != REDIS_ENCODING_RAW)
1996 return REDIS_ERR; /* Already encoded */
1997
1998 /* It's not save to encode shared objects: shared objects can be shared
1999 * everywhere in the "object space" of Redis. Encoded objects can only
2000 * appear as "values" (and not, for instance, as keys) */
2001 if (o->refcount > 1) return REDIS_ERR;
2002
2003 /* Currently we try to encode only strings */
2004 assert(o->type == REDIS_STRING);
2005
2006 /* Check if we can represent this string as a long integer */
2007 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2008
2009 /* Ok, this object can be encoded */
2010 o->encoding = REDIS_ENCODING_INT;
2011 sdsfree(o->ptr);
2012 o->ptr = (void*) value;
2013 return REDIS_OK;
2014 }
2015
2016 /* Get a decoded version of an encoded object (returned as a new object) */
2017 static robj *getDecodedObject(const robj *o) {
2018 robj *dec;
2019
2020 assert(o->encoding != REDIS_ENCODING_RAW);
2021 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2022 char buf[32];
2023
2024 snprintf(buf,32,"%ld",(long)o->ptr);
2025 dec = createStringObject(buf,strlen(buf));
2026 return dec;
2027 } else {
2028 assert(1 != 1);
2029 }
2030 }
2031
2032 static int compareStringObjects(robj *a, robj *b) {
2033 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2034
2035 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2036 return (long)a->ptr - (long)b->ptr;
2037 } else {
2038 int retval;
2039
2040 incrRefCount(a);
2041 incrRefCount(b);
2042 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2043 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2044 retval = sdscmp(a->ptr,b->ptr);
2045 decrRefCount(a);
2046 decrRefCount(b);
2047 return retval;
2048 }
2049 }
2050
2051 static size_t stringObjectLen(robj *o) {
2052 assert(o->type == REDIS_STRING);
2053 if (o->encoding == REDIS_ENCODING_RAW) {
2054 return sdslen(o->ptr);
2055 } else {
2056 char buf[32];
2057
2058 return snprintf(buf,32,"%ld",(long)o->ptr);
2059 }
2060 }
2061
2062 /*============================ DB saving/loading ============================ */
2063
/* Write the single type/opcode byte 'type' to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2068
/* Write 't' truncated to a 32 bit signed integer, as 4 bytes in host byte
 * order. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    if (fwrite(&t32,4,1,fp) != 0) return 0;
    return -1;
}
2074
2075 /* check rdbLoadLen() comments for more info */
2076 static int rdbSaveLen(FILE *fp, uint32_t len) {
2077 unsigned char buf[2];
2078
2079 if (len < (1<<6)) {
2080 /* Save a 6 bit len */
2081 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2082 if (fwrite(buf,1,1,fp) == 0) return -1;
2083 } else if (len < (1<<14)) {
2084 /* Save a 14 bit len */
2085 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2086 buf[1] = len&0xFF;
2087 if (fwrite(buf,2,1,fp) == 0) return -1;
2088 } else {
2089 /* Save a 32 bit len */
2090 buf[0] = (REDIS_RDB_32BITLEN<<6);
2091 if (fwrite(buf,1,1,fp) == 0) return -1;
2092 len = htonl(len);
2093 if (fwrite(&len,4,1,fp) == 0) return -1;
2094 }
2095 return 0;
2096 }
2097
2098 /* String objects in the form "2391" "-100" without any space and with a
2099 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2100 * encoded as integers to save space */
2101 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2102 long long value;
2103 char *endptr, buf[32];
2104
2105 /* Check if it's possible to encode this value as a number */
2106 value = strtoll(s, &endptr, 10);
2107 if (endptr[0] != '\0') return 0;
2108 snprintf(buf,32,"%lld",value);
2109
2110 /* If the number converted back into a string is not identical
2111 * then it's not possible to encode the string as integer */
2112 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2113
2114 /* Finally check if it fits in our ranges */
2115 if (value >= -(1<<7) && value <= (1<<7)-1) {
2116 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2117 enc[1] = value&0xFF;
2118 return 2;
2119 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2120 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2121 enc[1] = value&0xFF;
2122 enc[2] = (value>>8)&0xFF;
2123 return 3;
2124 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2125 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2126 enc[1] = value&0xFF;
2127 enc[2] = (value>>8)&0xFF;
2128 enc[3] = (value>>16)&0xFF;
2129 enc[4] = (value>>24)&0xFF;
2130 return 5;
2131 } else {
2132 return 0;
2133 }
2134 }
2135
2136 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2137 unsigned int comprlen, outlen;
2138 unsigned char byte;
2139 void *out;
2140
2141 /* We require at least four bytes compression for this to be worth it */
2142 outlen = sdslen(obj->ptr)-4;
2143 if (outlen <= 0) return 0;
2144 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2145 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2146 if (comprlen == 0) {
2147 zfree(out);
2148 return 0;
2149 }
2150 /* Data compressed! Let's save it on disk */
2151 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2152 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2153 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2154 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2155 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2156 zfree(out);
2157 return comprlen;
2158
2159 writeerr:
2160 zfree(out);
2161 return -1;
2162 }
2163
2164 /* Save a string objet as [len][data] on disk. If the object is a string
2165 * representation of an integer value we try to safe it in a special form */
2166 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2167 size_t len;
2168 int enclen;
2169
2170 len = sdslen(obj->ptr);
2171
2172 /* Try integer encoding */
2173 if (len <= 11) {
2174 unsigned char buf[5];
2175 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2176 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2177 return 0;
2178 }
2179 }
2180
2181 /* Try LZF compression - under 20 bytes it's unable to compress even
2182 * aaaaaaaaaaaaaaaaaa so skip it */
2183 if (len > 20) {
2184 int retval;
2185
2186 retval = rdbSaveLzfStringObject(fp,obj);
2187 if (retval == -1) return -1;
2188 if (retval > 0) return 0;
2189 /* retval == 0 means data can't be compressed, save the old way */
2190 }
2191
2192 /* Store verbatim */
2193 if (rdbSaveLen(fp,len) == -1) return -1;
2194 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2195 return 0;
2196 }
2197
2198 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2199 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2200 int retval;
2201 robj *dec;
2202
2203 if (obj->encoding != REDIS_ENCODING_RAW) {
2204 dec = getDecodedObject(obj);
2205 retval = rdbSaveStringObjectRaw(fp,dec);
2206 decrRefCount(dec);
2207 return retval;
2208 } else {
2209 return rdbSaveStringObjectRaw(fp,obj);
2210 }
2211 }
2212
2213 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2214 static int rdbSave(char *filename) {
2215 dictIterator *di = NULL;
2216 dictEntry *de;
2217 FILE *fp;
2218 char tmpfile[256];
2219 int j;
2220 time_t now = time(NULL);
2221
2222 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2223 fp = fopen(tmpfile,"w");
2224 if (!fp) {
2225 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2226 return REDIS_ERR;
2227 }
2228 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2229 for (j = 0; j < server.dbnum; j++) {
2230 redisDb *db = server.db+j;
2231 dict *d = db->dict;
2232 if (dictSize(d) == 0) continue;
2233 di = dictGetIterator(d);
2234 if (!di) {
2235 fclose(fp);
2236 return REDIS_ERR;
2237 }
2238
2239 /* Write the SELECT DB opcode */
2240 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2241 if (rdbSaveLen(fp,j) == -1) goto werr;
2242
2243 /* Iterate this DB writing every entry */
2244 while((de = dictNext(di)) != NULL) {
2245 robj *key = dictGetEntryKey(de);
2246 robj *o = dictGetEntryVal(de);
2247 time_t expiretime = getExpire(db,key);
2248
2249 /* Save the expire time */
2250 if (expiretime != -1) {
2251 /* If this key is already expired skip it */
2252 if (expiretime < now) continue;
2253 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2254 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2255 }
2256 /* Save the key and associated value */
2257 if (rdbSaveType(fp,o->type) == -1) goto werr;
2258 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2259 if (o->type == REDIS_STRING) {
2260 /* Save a string value */
2261 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2262 } else if (o->type == REDIS_LIST) {
2263 /* Save a list value */
2264 list *list = o->ptr;
2265 listNode *ln;
2266
2267 listRewind(list);
2268 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2269 while((ln = listYield(list))) {
2270 robj *eleobj = listNodeValue(ln);
2271
2272 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2273 }
2274 } else if (o->type == REDIS_SET) {
2275 /* Save a set value */
2276 dict *set = o->ptr;
2277 dictIterator *di = dictGetIterator(set);
2278 dictEntry *de;
2279
2280 if (!set) oom("dictGetIteraotr");
2281 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2282 while((de = dictNext(di)) != NULL) {
2283 robj *eleobj = dictGetEntryKey(de);
2284
2285 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2286 }
2287 dictReleaseIterator(di);
2288 } else {
2289 assert(0 != 0);
2290 }
2291 }
2292 dictReleaseIterator(di);
2293 }
2294 /* EOF opcode */
2295 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2296
2297 /* Make sure data will not remain on the OS's output buffers */
2298 fflush(fp);
2299 fsync(fileno(fp));
2300 fclose(fp);
2301
2302 /* Use RENAME to make sure the DB file is changed atomically only
2303 * if the generate DB file is ok. */
2304 if (rename(tmpfile,filename) == -1) {
2305 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2306 unlink(tmpfile);
2307 return REDIS_ERR;
2308 }
2309 redisLog(REDIS_NOTICE,"DB saved on disk");
2310 server.dirty = 0;
2311 server.lastsave = time(NULL);
2312 return REDIS_OK;
2313
2314 werr:
2315 fclose(fp);
2316 unlink(tmpfile);
2317 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2318 if (di) dictReleaseIterator(di);
2319 return REDIS_ERR;
2320 }
2321
2322 static int rdbSaveBackground(char *filename) {
2323 pid_t childpid;
2324
2325 if (server.bgsaveinprogress) return REDIS_ERR;
2326 if ((childpid = fork()) == 0) {
2327 /* Child */
2328 close(server.fd);
2329 if (rdbSave(filename) == REDIS_OK) {
2330 exit(0);
2331 } else {
2332 exit(1);
2333 }
2334 } else {
2335 /* Parent */
2336 if (childpid == -1) {
2337 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2338 strerror(errno));
2339 return REDIS_ERR;
2340 }
2341 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2342 server.bgsaveinprogress = 1;
2343 server.bgsavechildpid = childpid;
2344 return REDIS_OK;
2345 }
2346 return REDIS_OK; /* unreached */
2347 }
2348
/* Remove "temp-<childpid>.rdb", the temporary dump a (possibly killed)
 * background save child may have left behind. Errors are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2355
/* Read a one-byte type/opcode from 'fp'. Returns it as a value in 0..255,
 * or -1 on EOF or read error. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) != 0) ? type : -1;
}
2361
/* Read a 32 bit signed time value written by rdbSaveTime() (4 bytes, host
 * byte order). Returns -1 on EOF or read error. A stored value of -1 is
 * therefore indistinguishable from an error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    if (fread(&t32,4,1,fp) != 0) return (time_t) t32;
    return -1;
}
2367
2368 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2369 * of this file for a description of how this are stored on disk.
2370 *
2371 * isencoded is set to 1 if the readed length is not actually a length but
2372 * an "encoding type", check the above comments for more info */
2373 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2374 unsigned char buf[2];
2375 uint32_t len;
2376
2377 if (isencoded) *isencoded = 0;
2378 if (rdbver == 0) {
2379 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2380 return ntohl(len);
2381 } else {
2382 int type;
2383
2384 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2385 type = (buf[0]&0xC0)>>6;
2386 if (type == REDIS_RDB_6BITLEN) {
2387 /* Read a 6 bit len */
2388 return buf[0]&0x3F;
2389 } else if (type == REDIS_RDB_ENCVAL) {
2390 /* Read a 6 bit len encoding type */
2391 if (isencoded) *isencoded = 1;
2392 return buf[0]&0x3F;
2393 } else if (type == REDIS_RDB_14BITLEN) {
2394 /* Read a 14 bit len */
2395 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2396 return ((buf[0]&0x3F)<<8)|buf[1];
2397 } else {
2398 /* Read a 32 bit len */
2399 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2400 return ntohl(len);
2401 }
2402 }
2403 }
2404
2405 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2406 unsigned char enc[4];
2407 long long val;
2408
2409 if (enctype == REDIS_RDB_ENC_INT8) {
2410 if (fread(enc,1,1,fp) == 0) return NULL;
2411 val = (signed char)enc[0];
2412 } else if (enctype == REDIS_RDB_ENC_INT16) {
2413 uint16_t v;
2414 if (fread(enc,2,1,fp) == 0) return NULL;
2415 v = enc[0]|(enc[1]<<8);
2416 val = (int16_t)v;
2417 } else if (enctype == REDIS_RDB_ENC_INT32) {
2418 uint32_t v;
2419 if (fread(enc,4,1,fp) == 0) return NULL;
2420 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2421 val = (int32_t)v;
2422 } else {
2423 val = 0; /* anti-warning */
2424 assert(0!=0);
2425 }
2426 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2427 }
2428
2429 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2430 unsigned int len, clen;
2431 unsigned char *c = NULL;
2432 sds val = NULL;
2433
2434 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2435 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2436 if ((c = zmalloc(clen)) == NULL) goto err;
2437 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2438 if (fread(c,clen,1,fp) == 0) goto err;
2439 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2440 zfree(c);
2441 return createObject(REDIS_STRING,val);
2442 err:
2443 zfree(c);
2444 sdsfree(val);
2445 return NULL;
2446 }
2447
2448 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2449 int isencoded;
2450 uint32_t len;
2451 sds val;
2452
2453 len = rdbLoadLen(fp,rdbver,&isencoded);
2454 if (isencoded) {
2455 switch(len) {
2456 case REDIS_RDB_ENC_INT8:
2457 case REDIS_RDB_ENC_INT16:
2458 case REDIS_RDB_ENC_INT32:
2459 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2460 case REDIS_RDB_ENC_LZF:
2461 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2462 default:
2463 assert(0!=0);
2464 }
2465 }
2466
2467 if (len == REDIS_RDB_LENERR) return NULL;
2468 val = sdsnewlen(NULL,len);
2469 if (len && fread(val,len,1,fp) == 0) {
2470 sdsfree(val);
2471 return NULL;
2472 }
2473 return tryObjectSharing(createObject(REDIS_STRING,val));
2474 }
2475
/* Load the dataset from the RDB file 'filename' into server.db.
 *
 * Returns REDIS_ERR if the file can't be opened, doesn't start with the
 * "REDIS" signature, or uses an RDB version greater than 1. Returns
 * REDIS_OK once the EOF opcode has been read.
 *
 * The following are treated as unrecoverable: they are logged and the
 * process exits with status 1.
 *   - a short read or load failure (including a truncated header);
 *   - a SELECTDB id >= server.dbnum;
 *   - a duplicated key. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;
    redisDb *db = server.db+0;
    char buf[1024];
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* 9 byte header: "REDIS" followed by the version digits (rdbSave()
     * writes "REDIS0001"). */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        /* An optional EXPIRETIME opcode precedes the entry it applies to. */
        if (type == REDIS_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            /* Following entries are loaded into this DB. */
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: [element count][element]... */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    if (!listAddNodeTail((list*)o->ptr,ele))
                        oom("listAddNodeTail");
                } else {
                    /* NOTE(review): dictAdd() also returns DICT_ERR for an
                     * already present member, which is reported as OOM. */
                    if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
                        oom("dictAdd");
                }
            }
        } else {
            assert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            /* The expire only applies to this entry. */
            expiretime = -1;
        }
        /* Key and value now belong to the DB: don't release them on a
         * later error. */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2579
2580 /*================================== Commands =============================== */
2581
2582 static void authCommand(redisClient *c) {
2583 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2584 c->authenticated = 1;
2585 addReply(c,shared.ok);
2586 } else {
2587 c->authenticated = 0;
2588 addReply(c,shared.err);
2589 }
2590 }
2591
2592 static void pingCommand(redisClient *c) {
2593 addReply(c,shared.pong);
2594 }
2595
2596 static void echoCommand(redisClient *c) {
2597 addReplyBulkLen(c,c->argv[1]);
2598 addReply(c,c->argv[1]);
2599 addReply(c,shared.crlf);
2600 }
2601
2602 /*=================================== Strings =============================== */
2603
2604 static void setGenericCommand(redisClient *c, int nx) {
2605 int retval;
2606
2607 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2608 if (retval == DICT_ERR) {
2609 if (!nx) {
2610 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2611 incrRefCount(c->argv[2]);
2612 } else {
2613 addReply(c,shared.czero);
2614 return;
2615 }
2616 } else {
2617 incrRefCount(c->argv[1]);
2618 incrRefCount(c->argv[2]);
2619 }
2620 server.dirty++;
2621 removeExpire(c->db,c->argv[1]);
2622 addReply(c, nx ? shared.cone : shared.ok);
2623 }
2624
2625 static void setCommand(redisClient *c) {
2626 setGenericCommand(c,0);
2627 }
2628
2629 static void setnxCommand(redisClient *c) {
2630 setGenericCommand(c,1);
2631 }
2632
2633 static void getCommand(redisClient *c) {
2634 robj *o = lookupKeyRead(c->db,c->argv[1]);
2635
2636 if (o == NULL) {
2637 addReply(c,shared.nullbulk);
2638 } else {
2639 if (o->type != REDIS_STRING) {
2640 addReply(c,shared.wrongtypeerr);
2641 } else {
2642 addReplyBulkLen(c,o);
2643 addReply(c,o);
2644 addReply(c,shared.crlf);
2645 }
2646 }
2647 }
2648
2649 static void getsetCommand(redisClient *c) {
2650 getCommand(c);
2651 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2652 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2653 } else {
2654 incrRefCount(c->argv[1]);
2655 }
2656 incrRefCount(c->argv[2]);
2657 server.dirty++;
2658 removeExpire(c->db,c->argv[1]);
2659 }
2660
2661 static void mgetCommand(redisClient *c) {
2662 int j;
2663
2664 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2665 for (j = 1; j < c->argc; j++) {
2666 robj *o = lookupKeyRead(c->db,c->argv[j]);
2667 if (o == NULL) {
2668 addReply(c,shared.nullbulk);
2669 } else {
2670 if (o->type != REDIS_STRING) {
2671 addReply(c,shared.nullbulk);
2672 } else {
2673 addReplyBulkLen(c,o);
2674 addReply(c,o);
2675 addReply(c,shared.crlf);
2676 }
2677 }
2678 }
2679 }
2680
2681 static void incrDecrCommand(redisClient *c, long long incr) {
2682 long long value;
2683 int retval;
2684 robj *o;
2685
2686 o = lookupKeyWrite(c->db,c->argv[1]);
2687 if (o == NULL) {
2688 value = 0;
2689 } else {
2690 if (o->type != REDIS_STRING) {
2691 value = 0;
2692 } else {
2693 char *eptr;
2694
2695 if (o->encoding == REDIS_ENCODING_RAW)
2696 value = strtoll(o->ptr, &eptr, 10);
2697 else if (o->encoding == REDIS_ENCODING_INT)
2698 value = (long)o->ptr;
2699 else
2700 assert(1 != 1);
2701 }
2702 }
2703
2704 value += incr;
2705 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2706 tryObjectEncoding(o);
2707 retval = dictAdd(c->db->dict,c->argv[1],o);
2708 if (retval == DICT_ERR) {
2709 dictReplace(c->db->dict,c->argv[1],o);
2710 removeExpire(c->db,c->argv[1]);
2711 } else {
2712 incrRefCount(c->argv[1]);
2713 }
2714 server.dirty++;
2715 addReply(c,shared.colon);
2716 addReply(c,o);
2717 addReply(c,shared.crlf);
2718 }
2719
2720 static void incrCommand(redisClient *c) {
2721 incrDecrCommand(c,1);
2722 }
2723
2724 static void decrCommand(redisClient *c) {
2725 incrDecrCommand(c,-1);
2726 }
2727
2728 static void incrbyCommand(redisClient *c) {
2729 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2730 incrDecrCommand(c,incr);
2731 }
2732
2733 static void decrbyCommand(redisClient *c) {
2734 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2735 incrDecrCommand(c,-incr);
2736 }
2737
2738 /* ========================= Type agnostic commands ========================= */
2739
2740 static void delCommand(redisClient *c) {
2741 int deleted = 0, j;
2742
2743 for (j = 1; j < c->argc; j++) {
2744 if (deleteKey(c->db,c->argv[j])) {
2745 server.dirty++;
2746 deleted++;
2747 }
2748 }
2749 switch(deleted) {
2750 case 0:
2751 addReply(c,shared.czero);
2752 break;
2753 case 1:
2754 addReply(c,shared.cone);
2755 break;
2756 default:
2757 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2758 break;
2759 }
2760 }
2761
2762 static void existsCommand(redisClient *c) {
2763 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2764 }
2765
2766 static void selectCommand(redisClient *c) {
2767 int id = atoi(c->argv[1]->ptr);
2768
2769 if (selectDb(c,id) == REDIS_ERR) {
2770 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2771 } else {
2772 addReply(c,shared.ok);
2773 }
2774 }
2775
2776 static void randomkeyCommand(redisClient *c) {
2777 dictEntry *de;
2778
2779 while(1) {
2780 de = dictGetRandomKey(c->db->dict);
2781 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2782 }
2783 if (de == NULL) {
2784 addReply(c,shared.plus);
2785 addReply(c,shared.crlf);
2786 } else {
2787 addReply(c,shared.plus);
2788 addReply(c,dictGetEntryKey(de));
2789 addReply(c,shared.crlf);
2790 }
2791 }
2792
2793 static void keysCommand(redisClient *c) {
2794 dictIterator *di;
2795 dictEntry *de;
2796 sds pattern = c->argv[1]->ptr;
2797 int plen = sdslen(pattern);
2798 int numkeys = 0, keyslen = 0;
2799 robj *lenobj = createObject(REDIS_STRING,NULL);
2800
2801 di = dictGetIterator(c->db->dict);
2802 if (!di) oom("dictGetIterator");
2803 addReply(c,lenobj);
2804 decrRefCount(lenobj);
2805 while((de = dictNext(di)) != NULL) {
2806 robj *keyobj = dictGetEntryKey(de);
2807
2808 sds key = keyobj->ptr;
2809 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2810 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2811 if (expireIfNeeded(c->db,keyobj) == 0) {
2812 if (numkeys != 0)
2813 addReply(c,shared.space);
2814 addReply(c,keyobj);
2815 numkeys++;
2816 keyslen += sdslen(key);
2817 }
2818 }
2819 }
2820 dictReleaseIterator(di);
2821 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2822 addReply(c,shared.crlf);
2823 }
2824
2825 static void dbsizeCommand(redisClient *c) {
2826 addReplySds(c,
2827 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2828 }
2829
2830 static void lastsaveCommand(redisClient *c) {
2831 addReplySds(c,
2832 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2833 }
2834
2835 static void typeCommand(redisClient *c) {
2836 robj *o;
2837 char *type;
2838
2839 o = lookupKeyRead(c->db,c->argv[1]);
2840 if (o == NULL) {
2841 type = "+none";
2842 } else {
2843 switch(o->type) {
2844 case REDIS_STRING: type = "+string"; break;
2845 case REDIS_LIST: type = "+list"; break;
2846 case REDIS_SET: type = "+set"; break;
2847 default: type = "unknown"; break;
2848 }
2849 }
2850 addReplySds(c,sdsnew(type));
2851 addReply(c,shared.crlf);
2852 }
2853
2854 static void saveCommand(redisClient *c) {
2855 if (server.bgsaveinprogress) {
2856 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2857 return;
2858 }
2859 if (rdbSave(server.dbfilename) == REDIS_OK) {
2860 addReply(c,shared.ok);
2861 } else {
2862 addReply(c,shared.err);
2863 }
2864 }
2865
2866 static void bgsaveCommand(redisClient *c) {
2867 if (server.bgsaveinprogress) {
2868 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2869 return;
2870 }
2871 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2872 addReply(c,shared.ok);
2873 } else {
2874 addReply(c,shared.err);
2875 }
2876 }
2877
2878 static void shutdownCommand(redisClient *c) {
2879 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2880 /* Kill the saving child if there is a background saving in progress.
2881 We want to avoid race conditions, for instance our saving child may
2882 overwrite the synchronous saving did by SHUTDOWN. */
2883 if (server.bgsaveinprogress) {
2884 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2885 kill(server.bgsavechildpid,SIGKILL);
2886 rdbRemoveTempFile(server.bgsavechildpid);
2887 }
2888 /* SYNC SAVE */
2889 if (rdbSave(server.dbfilename) == REDIS_OK) {
2890 if (server.daemonize)
2891 unlink(server.pidfile);
2892 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2893 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2894 exit(1);
2895 } else {
2896 /* Ooops.. error saving! The best we can do is to continue operating.
2897 * Note that if there was a background saving process, in the next
2898 * cron() Redis will be notified that the background saving aborted,
2899 * handling special stuff like slaves pending for synchronization... */
2900 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2901 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2902 }
2903 }
2904
2905 static void renameGenericCommand(redisClient *c, int nx) {
2906 robj *o;
2907
2908 /* To use the same key as src and dst is probably an error */
2909 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2910 addReply(c,shared.sameobjecterr);
2911 return;
2912 }
2913
2914 o = lookupKeyWrite(c->db,c->argv[1]);
2915 if (o == NULL) {
2916 addReply(c,shared.nokeyerr);
2917 return;
2918 }
2919 incrRefCount(o);
2920 deleteIfVolatile(c->db,c->argv[2]);
2921 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2922 if (nx) {
2923 decrRefCount(o);
2924 addReply(c,shared.czero);
2925 return;
2926 }
2927 dictReplace(c->db->dict,c->argv[2],o);
2928 } else {
2929 incrRefCount(c->argv[2]);
2930 }
2931 deleteKey(c->db,c->argv[1]);
2932 server.dirty++;
2933 addReply(c,nx ? shared.cone : shared.ok);
2934 }
2935
2936 static void renameCommand(redisClient *c) {
2937 renameGenericCommand(c,0);
2938 }
2939
2940 static void renamenxCommand(redisClient *c) {
2941 renameGenericCommand(c,1);
2942 }
2943
2944 static void moveCommand(redisClient *c) {
2945 robj *o;
2946 redisDb *src, *dst;
2947 int srcid;
2948
2949 /* Obtain source and target DB pointers */
2950 src = c->db;
2951 srcid = c->db->id;
2952 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2953 addReply(c,shared.outofrangeerr);
2954 return;
2955 }
2956 dst = c->db;
2957 selectDb(c,srcid); /* Back to the source DB */
2958
2959 /* If the user is moving using as target the same
2960 * DB as the source DB it is probably an error. */
2961 if (src == dst) {
2962 addReply(c,shared.sameobjecterr);
2963 return;
2964 }
2965
2966 /* Check if the element exists and get a reference */
2967 o = lookupKeyWrite(c->db,c->argv[1]);
2968 if (!o) {
2969 addReply(c,shared.czero);
2970 return;
2971 }
2972
2973 /* Try to add the element to the target DB */
2974 deleteIfVolatile(dst,c->argv[1]);
2975 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2976 addReply(c,shared.czero);
2977 return;
2978 }
2979 incrRefCount(c->argv[1]);
2980 incrRefCount(o);
2981
2982 /* OK! key moved, free the entry in the source DB */
2983 deleteKey(src,c->argv[1]);
2984 server.dirty++;
2985 addReply(c,shared.cone);
2986 }
2987
2988 /* =================================== Lists ================================ */
2989 static void pushGenericCommand(redisClient *c, int where) {
2990 robj *lobj;
2991 list *list;
2992
2993 lobj = lookupKeyWrite(c->db,c->argv[1]);
2994 if (lobj == NULL) {
2995 lobj = createListObject();
2996 list = lobj->ptr;
2997 if (where == REDIS_HEAD) {
2998 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2999 } else {
3000 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
3001 }
3002 dictAdd(c->db->dict,c->argv[1],lobj);
3003 incrRefCount(c->argv[1]);
3004 incrRefCount(c->argv[2]);
3005 } else {
3006 if (lobj->type != REDIS_LIST) {
3007 addReply(c,shared.wrongtypeerr);
3008 return;
3009 }
3010 list = lobj->ptr;
3011 if (where == REDIS_HEAD) {
3012 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
3013 } else {
3014 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
3015 }
3016 incrRefCount(c->argv[2]);
3017 }
3018 server.dirty++;
3019 addReply(c,shared.ok);
3020 }
3021
3022 static void lpushCommand(redisClient *c) {
3023 pushGenericCommand(c,REDIS_HEAD);
3024 }
3025
3026 static void rpushCommand(redisClient *c) {
3027 pushGenericCommand(c,REDIS_TAIL);
3028 }
3029
3030 static void llenCommand(redisClient *c) {
3031 robj *o;
3032 list *l;
3033
3034 o = lookupKeyRead(c->db,c->argv[1]);
3035 if (o == NULL) {
3036 addReply(c,shared.czero);
3037 return;
3038 } else {
3039 if (o->type != REDIS_LIST) {
3040 addReply(c,shared.wrongtypeerr);
3041 } else {
3042 l = o->ptr;
3043 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3044 }
3045 }
3046 }
3047
3048 static void lindexCommand(redisClient *c) {
3049 robj *o;
3050 int index = atoi(c->argv[2]->ptr);
3051
3052 o = lookupKeyRead(c->db,c->argv[1]);
3053 if (o == NULL) {
3054 addReply(c,shared.nullbulk);
3055 } else {
3056 if (o->type != REDIS_LIST) {
3057 addReply(c,shared.wrongtypeerr);
3058 } else {
3059 list *list = o->ptr;
3060 listNode *ln;
3061
3062 ln = listIndex(list, index);
3063 if (ln == NULL) {
3064 addReply(c,shared.nullbulk);
3065 } else {
3066 robj *ele = listNodeValue(ln);
3067 addReplyBulkLen(c,ele);
3068 addReply(c,ele);
3069 addReply(c,shared.crlf);
3070 }
3071 }
3072 }
3073 }
3074
3075 static void lsetCommand(redisClient *c) {
3076 robj *o;
3077 int index = atoi(c->argv[2]->ptr);
3078
3079 o = lookupKeyWrite(c->db,c->argv[1]);
3080 if (o == NULL) {
3081 addReply(c,shared.nokeyerr);
3082 } else {
3083 if (o->type != REDIS_LIST) {
3084 addReply(c,shared.wrongtypeerr);
3085 } else {
3086 list *list = o->ptr;
3087 listNode *ln;
3088
3089 ln = listIndex(list, index);
3090 if (ln == NULL) {
3091 addReply(c,shared.outofrangeerr);
3092 } else {
3093 robj *ele = listNodeValue(ln);
3094
3095 decrRefCount(ele);
3096 listNodeValue(ln) = c->argv[3];
3097 incrRefCount(c->argv[3]);
3098 addReply(c,shared.ok);
3099 server.dirty++;
3100 }
3101 }
3102 }
3103 }
3104
3105 static void popGenericCommand(redisClient *c, int where) {
3106 robj *o;
3107
3108 o = lookupKeyWrite(c->db,c->argv[1]);
3109 if (o == NULL) {
3110 addReply(c,shared.nullbulk);
3111 } else {
3112 if (o->type != REDIS_LIST) {
3113 addReply(c,shared.wrongtypeerr);
3114 } else {
3115 list *list = o->ptr;
3116 listNode *ln;
3117
3118 if (where == REDIS_HEAD)
3119 ln = listFirst(list);
3120 else
3121 ln = listLast(list);
3122
3123 if (ln == NULL) {
3124 addReply(c,shared.nullbulk);
3125 } else {
3126 robj *ele = listNodeValue(ln);
3127 addReplyBulkLen(c,ele);
3128 addReply(c,ele);
3129 addReply(c,shared.crlf);
3130 listDelNode(list,ln);
3131 server.dirty++;
3132 }
3133 }
3134 }
3135 }
3136
3137 static void lpopCommand(redisClient *c) {
3138 popGenericCommand(c,REDIS_HEAD);
3139 }
3140
3141 static void rpopCommand(redisClient *c) {
3142 popGenericCommand(c,REDIS_TAIL);
3143 }
3144
3145 static void lrangeCommand(redisClient *c) {
3146 robj *o;
3147 int start = atoi(c->argv[2]->ptr);
3148 int end = atoi(c->argv[3]->ptr);
3149
3150 o = lookupKeyRead(c->db,c->argv[1]);
3151 if (o == NULL) {
3152 addReply(c,shared.nullmultibulk);
3153 } else {
3154 if (o->type != REDIS_LIST) {
3155 addReply(c,shared.wrongtypeerr);
3156 } else {
3157 list *list = o->ptr;
3158 listNode *ln;
3159 int llen = listLength(list);
3160 int rangelen, j;
3161 robj *ele;
3162
3163 /* convert negative indexes */
3164 if (start < 0) start = llen+start;
3165 if (end < 0) end = llen+end;
3166 if (start < 0) start = 0;
3167 if (end < 0) end = 0;
3168
3169 /* indexes sanity checks */
3170 if (start > end || start >= llen) {
3171 /* Out of range start or start > end result in empty list */
3172 addReply(c,shared.emptymultibulk);
3173 return;
3174 }
3175 if (end >= llen) end = llen-1;
3176 rangelen = (end-start)+1;
3177
3178 /* Return the result in form of a multi-bulk reply */
3179 ln = listIndex(list, start);
3180 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3181 for (j = 0; j < rangelen; j++) {
3182 ele = listNodeValue(ln);
3183 addReplyBulkLen(c,ele);
3184 addReply(c,ele);
3185 addReply(c,shared.crlf);
3186 ln = ln->next;
3187 }
3188 }
3189 }
3190 }
3191
3192 static void ltrimCommand(redisClient *c) {
3193 robj *o;
3194 int start = atoi(c->argv[2]->ptr);
3195 int end = atoi(c->argv[3]->ptr);
3196
3197 o = lookupKeyWrite(c->db,c->argv[1]);
3198 if (o == NULL) {
3199 addReply(c,shared.nokeyerr);
3200 } else {
3201 if (o->type != REDIS_LIST) {
3202 addReply(c,shared.wrongtypeerr);
3203 } else {
3204 list *list = o->ptr;
3205 listNode *ln;
3206 int llen = listLength(list);
3207 int j, ltrim, rtrim;
3208
3209 /* convert negative indexes */
3210 if (start < 0) start = llen+start;
3211 if (end < 0) end = llen+end;
3212 if (start < 0) start = 0;
3213 if (end < 0) end = 0;
3214
3215 /* indexes sanity checks */
3216 if (start > end || start >= llen) {
3217 /* Out of range start or start > end result in empty list */
3218 ltrim = llen;
3219 rtrim = 0;
3220 } else {
3221 if (end >= llen) end = llen-1;
3222 ltrim = start;
3223 rtrim = llen-end-1;
3224 }
3225
3226 /* Remove list elements to perform the trim */
3227 for (j = 0; j < ltrim; j++) {
3228 ln = listFirst(list);
3229 listDelNode(list,ln);
3230 }
3231 for (j = 0; j < rtrim; j++) {
3232 ln = listLast(list);
3233 listDelNode(list,ln);
3234 }
3235 server.dirty++;
3236 addReply(c,shared.ok);
3237 }
3238 }
3239 }
3240
3241 static void lremCommand(redisClient *c) {
3242 robj *o;
3243
3244 o = lookupKeyWrite(c->db,c->argv[1]);
3245 if (o == NULL) {
3246 addReply(c,shared.czero);
3247 } else {
3248 if (o->type != REDIS_LIST) {
3249 addReply(c,shared.wrongtypeerr);
3250 } else {
3251 list *list = o->ptr;
3252 listNode *ln, *next;
3253 int toremove = atoi(c->argv[2]->ptr);
3254 int removed = 0;
3255 int fromtail = 0;
3256
3257 if (toremove < 0) {
3258 toremove = -toremove;
3259 fromtail = 1;
3260 }
3261 ln = fromtail ? list->tail : list->head;
3262 while (ln) {
3263 robj *ele = listNodeValue(ln);
3264
3265 next = fromtail ? ln->prev : ln->next;
3266 if (compareStringObjects(ele,c->argv[3]) == 0) {
3267 listDelNode(list,ln);
3268 server.dirty++;
3269 removed++;
3270 if (toremove && removed == toremove) break;
3271 }
3272 ln = next;
3273 }
3274 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3275 }
3276 }
3277 }
3278
3279 /* ==================================== Sets ================================ */
3280
3281 static void saddCommand(redisClient *c) {
3282 robj *set;
3283
3284 set = lookupKeyWrite(c->db,c->argv[1]);
3285 if (set == NULL) {
3286 set = createSetObject();
3287 dictAdd(c->db->dict,c->argv[1],set);
3288 incrRefCount(c->argv[1]);
3289 } else {
3290 if (set->type != REDIS_SET) {
3291 addReply(c,shared.wrongtypeerr);
3292 return;
3293 }
3294 }
3295 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3296 incrRefCount(c->argv[2]);
3297 server.dirty++;
3298 addReply(c,shared.cone);
3299 } else {
3300 addReply(c,shared.czero);
3301 }
3302 }
3303
3304 static void sremCommand(redisClient *c) {
3305 robj *set;
3306
3307 set = lookupKeyWrite(c->db,c->argv[1]);
3308 if (set == NULL) {
3309 addReply(c,shared.czero);
3310 } else {
3311 if (set->type != REDIS_SET) {
3312 addReply(c,shared.wrongtypeerr);
3313 return;
3314 }
3315 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3316 server.dirty++;
3317 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3318 addReply(c,shared.cone);
3319 } else {
3320 addReply(c,shared.czero);
3321 }
3322 }
3323 }
3324
3325 static void smoveCommand(redisClient *c) {
3326 robj *srcset, *dstset;
3327
3328 srcset = lookupKeyWrite(c->db,c->argv[1]);
3329 dstset = lookupKeyWrite(c->db,c->argv[2]);
3330
3331 /* If the source key does not exist return 0, if it's of the wrong type
3332 * raise an error */
3333 if (srcset == NULL || srcset->type != REDIS_SET) {
3334 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3335 return;
3336 }
3337 /* Error if the destination key is not a set as well */
3338 if (dstset && dstset->type != REDIS_SET) {
3339 addReply(c,shared.wrongtypeerr);
3340 return;
3341 }
3342 /* Remove the element from the source set */
3343 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3344 /* Key not found in the src set! return zero */
3345 addReply(c,shared.czero);
3346 return;
3347 }
3348 server.dirty++;
3349 /* Add the element to the destination set */
3350 if (!dstset) {
3351 dstset = createSetObject();
3352 dictAdd(c->db->dict,c->argv[2],dstset);
3353 incrRefCount(c->argv[2]);
3354 }
3355 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3356 incrRefCount(c->argv[3]);
3357 addReply(c,shared.cone);
3358 }
3359
3360 static void sismemberCommand(redisClient *c) {
3361 robj *set;
3362
3363 set = lookupKeyRead(c->db,c->argv[1]);
3364 if (set == NULL) {
3365 addReply(c,shared.czero);
3366 } else {
3367 if (set->type != REDIS_SET) {
3368 addReply(c,shared.wrongtypeerr);
3369 return;
3370 }
3371 if (dictFind(set->ptr,c->argv[2]))
3372 addReply(c,shared.cone);
3373 else
3374 addReply(c,shared.czero);
3375 }
3376 }
3377
3378 static void scardCommand(redisClient *c) {
3379 robj *o;
3380 dict *s;
3381
3382 o = lookupKeyRead(c->db,c->argv[1]);
3383 if (o == NULL) {
3384 addReply(c,shared.czero);
3385 return;
3386 } else {
3387 if (o->type != REDIS_SET) {
3388 addReply(c,shared.wrongtypeerr);
3389 } else {
3390 s = o->ptr;
3391 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3392 dictSize(s)));
3393 }
3394 }
3395 }
3396
3397 static void spopCommand(redisClient *c) {
3398 robj *set;
3399 dictEntry *de;
3400
3401 set = lookupKeyWrite(c->db,c->argv[1]);
3402 if (set == NULL) {
3403 addReply(c,shared.nullbulk);
3404 } else {
3405 if (set->type != REDIS_SET) {
3406 addReply(c,shared.wrongtypeerr);
3407 return;
3408 }
3409 de = dictGetRandomKey(set->ptr);
3410 if (de == NULL) {
3411 addReply(c,shared.nullbulk);
3412 } else {
3413 robj *ele = dictGetEntryKey(de);
3414
3415 addReplyBulkLen(c,ele);
3416 addReply(c,ele);
3417 addReply(c,shared.crlf);
3418 dictDelete(set->ptr,ele);
3419 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3420 server.dirty++;
3421 }
3422 }
3423 }
3424
3425 static void srandmemberCommand(redisClient *c) {
3426 robj *set;
3427 dictEntry *de;
3428
3429 set = lookupKeyRead(c->db,c->argv[1]);
3430 if (set == NULL) {
3431 addReply(c,shared.nullbulk);
3432 } else {
3433 if (set->type != REDIS_SET) {
3434 addReply(c,shared.wrongtypeerr);
3435 return;
3436 }
3437 de = dictGetRandomKey(set->ptr);
3438 if (de == NULL) {
3439 addReply(c,shared.nullbulk);
3440 } else {
3441 robj *ele = dictGetEntryKey(de);
3442
3443 addReplyBulkLen(c,ele);
3444 addReply(c,ele);
3445 addReply(c,shared.crlf);
3446 }
3447 }
3448 }
3449
3450 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3451 dict **d1 = (void*) s1, **d2 = (void*) s2;
3452
3453 return dictSize(*d1)-dictSize(*d2);
3454 }
3455
3456 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3457 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3458 dictIterator *di;
3459 dictEntry *de;
3460 robj *lenobj = NULL, *dstset = NULL;
3461 int j, cardinality = 0;
3462
3463 if (!dv) oom("sinterGenericCommand");
3464 for (j = 0; j < setsnum; j++) {
3465 robj *setobj;
3466
3467 setobj = dstkey ?
3468 lookupKeyWrite(c->db,setskeys[j]) :
3469 lookupKeyRead(c->db,setskeys[j]);
3470 if (!setobj) {
3471 zfree(dv);
3472 if (dstkey) {
3473 deleteKey(c->db,dstkey);
3474 addReply(c,shared.ok);
3475 } else {
3476 addReply(c,shared.nullmultibulk);
3477 }
3478 return;
3479 }
3480 if (setobj->type != REDIS_SET) {
3481 zfree(dv);
3482 addReply(c,shared.wrongtypeerr);
3483 return;
3484 }
3485 dv[j] = setobj->ptr;
3486 }
3487 /* Sort sets from the smallest to largest, this will improve our
3488 * algorithm's performace */
3489 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3490
3491 /* The first thing we should output is the total number of elements...
3492 * since this is a multi-bulk write, but at this stage we don't know
3493 * the intersection set size, so we use a trick, append an empty object
3494 * to the output list and save the pointer to later modify it with the
3495 * right length */
3496 if (!dstkey) {
3497 lenobj = createObject(REDIS_STRING,NULL);
3498 addReply(c,lenobj);
3499 decrRefCount(lenobj);
3500 } else {
3501 /* If we have a target key where to store the resulting set
3502 * create this key with an empty set inside */
3503 dstset = createSetObject();
3504 }
3505
3506 /* Iterate all the elements of the first (smallest) set, and test
3507 * the element against all the other sets, if at least one set does
3508 * not include the element it is discarded */
3509 di = dictGetIterator(dv[0]);
3510 if (!di) oom("dictGetIterator");
3511
3512 while((de = dictNext(di)) != NULL) {
3513 robj *ele;
3514
3515 for (j = 1; j < setsnum; j++)
3516 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3517 if (j != setsnum)
3518 continue; /* at least one set does not contain the member */
3519 ele = dictGetEntryKey(de);
3520 if (!dstkey) {
3521 addReplyBulkLen(c,ele);
3522 addReply(c,ele);
3523 addReply(c,shared.crlf);
3524 cardinality++;
3525 } else {
3526 dictAdd(dstset->ptr,ele,NULL);
3527 incrRefCount(ele);
3528 }
3529 }
3530 dictReleaseIterator(di);
3531
3532 if (dstkey) {
3533 /* Store the resulting set into the target */
3534 deleteKey(c->db,dstkey);
3535 dictAdd(c->db->dict,dstkey,dstset);
3536 incrRefCount(dstkey);
3537 }
3538
3539 if (!dstkey) {
3540 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3541 } else {
3542 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3543 dictSize((dict*)dstset->ptr)));
3544 server.dirty++;
3545 }
3546 zfree(dv);
3547 }
3548
3549 static void sinterCommand(redisClient *c) {
3550 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3551 }
3552
3553 static void sinterstoreCommand(redisClient *c) {
3554 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3555 }
3556
/* Operation selector passed as 'op' to sunionDiffGenericCommand() */
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
3559
/* Shared implementation of SUNION, SUNIONSTORE, SDIFF and SDIFFSTORE.
 *
 * 'op' is REDIS_OP_UNION or REDIS_OP_DIFF. Missing keys are treated as
 * empty sets; a non-set value is a type error.
 *
 * For REDIS_OP_DIFF the result is the first set minus all the others.
 *
 * When 'dstkey' is NULL the result is sent as a multi-bulk reply.
 * Otherwise it is stored at 'dstkey' (replacing its previous value) and
 * the cardinality of the stored set is sent as an integer reply. */
static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
    dict **dv = zmalloc(sizeof(dict*)*setsnum);
    dictIterator *di;
    dictEntry *de;
    robj *dstset = NULL;
    /* Running size of the result set, kept in sync with dstset */
    int j, cardinality = 0;

    if (!dv) oom("sunionDiffGenericCommand");
    for (j = 0; j < setsnum; j++) {
        robj *setobj;

        setobj = dstkey ?
                    lookupKeyWrite(c->db,setskeys[j]) :
                    lookupKeyRead(c->db,setskeys[j]);
        if (!setobj) {
            /* NULL marks a missing key, handled as an empty set below */
            dv[j] = NULL;
            continue;
        }
        if (setobj->type != REDIS_SET) {
            zfree(dv);
            addReply(c,shared.wrongtypeerr);
            return;
        }
        dv[j] = setobj->ptr;
    }

    /* We need a temp set object to store our result. If the dstkey
     * is not NULL (that is, we are inside a SUNIONSTORE/SDIFFSTORE
     * operation) then this set object will be stored into the target key */
    dstset = createSetObject();

    /* Iterate all the elements of all the sets. For UNION every element is
     * added once; for DIFF the first set is added and the elements of the
     * following sets are removed */
    for (j = 0; j < setsnum; j++) {
        if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
        if (!dv[j]) continue; /* non existing keys are like empty sets */

        di = dictGetIterator(dv[j]);
        if (!di) oom("dictGetIterator");

        while((de = dictNext(di)) != NULL) {
            robj *ele;

            /* dictAdd will not add the same element multiple times */
            ele = dictGetEntryKey(de);
            if (op == REDIS_OP_UNION || j == 0) {
                if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
                    incrRefCount(ele);
                    cardinality++;
                }
            } else if (op == REDIS_OP_DIFF) {
                if (dictDelete(dstset->ptr,ele) == DICT_OK) {
                    cardinality--;
                }
            }
        }
        dictReleaseIterator(di);

        if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
    }

    /* Output the content of the resulting set, if not in STORE mode */
    if (!dstkey) {
        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
        di = dictGetIterator(dstset->ptr);
        if (!di) oom("dictGetIterator");
        while((de = dictNext(di)) != NULL) {
            robj *ele;

            ele = dictGetEntryKey(de);
            addReplyBulkLen(c,ele);
            addReply(c,ele);
            addReply(c,shared.crlf);
        }
        dictReleaseIterator(di);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with the result set inside */
        deleteKey(c->db,dstkey);
        dictAdd(c->db->dict,dstkey,dstset);
        incrRefCount(dstkey);
    }

    /* Cleanup: in non-STORE mode the temp set is released; in STORE mode
     * it is now owned by the keyspace entry added above */
    if (!dstkey) {
        decrRefCount(dstset);
    } else {
        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
            dictSize((dict*)dstset->ptr)));
        server.dirty++;
    }
    zfree(dv);
}
3653
3654 static void sunionCommand(redisClient *c) {
3655 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3656 }
3657
3658 static void sunionstoreCommand(redisClient *c) {
3659 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3660 }
3661
3662 static void sdiffCommand(redisClient *c) {
3663 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3664 }
3665
3666 static void sdiffstoreCommand(redisClient *c) {
3667 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3668 }
3669
3670 static void flushdbCommand(redisClient *c) {
3671 server.dirty += dictSize(c->db->dict);
3672 dictEmpty(c->db->dict);
3673 dictEmpty(c->db->expires);
3674 addReply(c,shared.ok);
3675 }
3676
3677 static void flushallCommand(redisClient *c) {
3678 server.dirty += emptyDb();
3679 addReply(c,shared.ok);
3680 rdbSave(server.dbfilename);
3681 server.dirty++;
3682 }
3683
3684 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3685 redisSortOperation *so = zmalloc(sizeof(*so));
3686 if (!so) oom("createSortOperation");
3687 so->type = type;
3688 so->pattern = pattern;
3689 return so;
3690 }
3691
3692 /* Return the value associated to the key with a name obtained
3693 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3694 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3695 char *p;
3696 sds spat, ssub;
3697 robj keyobj;
3698 int prefixlen, sublen, postfixlen;
3699 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3700 struct {
3701 long len;
3702 long free;
3703 char buf[REDIS_SORTKEY_MAX+1];
3704 } keyname;
3705
3706 if (subst->encoding == REDIS_ENCODING_RAW)
3707 incrRefCount(subst);
3708 else {
3709 subst = getDecodedObject(subst);
3710 }
3711
3712 spat = pattern->ptr;
3713 ssub = subst->ptr;
3714 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3715 p = strchr(spat,'*');
3716 if (!p) return NULL;
3717
3718 prefixlen = p-spat;
3719 sublen = sdslen(ssub);
3720 postfixlen = sdslen(spat)-(prefixlen+1);
3721 memcpy(keyname.buf,spat,prefixlen);
3722 memcpy(keyname.buf+prefixlen,ssub,sublen);
3723 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3724 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3725 keyname.len = prefixlen+sublen+postfixlen;
3726
3727 keyobj.refcount = 1;
3728 keyobj.type = REDIS_STRING;
3729 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3730
3731 decrRefCount(subst);
3732
3733 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3734 return lookupKeyRead(db,&keyobj);
3735 }
3736
3737 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3738 * the additional parameter is not standard but a BSD-specific we have to
3739 * pass sorting parameters via the global 'server' structure */
3740 static int sortCompare(const void *s1, const void *s2) {
3741 const redisSortObject *so1 = s1, *so2 = s2;
3742 int cmp;
3743
3744 if (!server.sort_alpha) {
3745 /* Numeric sorting. Here it's trivial as we precomputed scores */
3746 if (so1->u.score > so2->u.score) {
3747 cmp = 1;
3748 } else if (so1->u.score < so2->u.score) {
3749 cmp = -1;
3750 } else {
3751 cmp = 0;
3752 }
3753 } else {
3754 /* Alphanumeric sorting */
3755 if (server.sort_bypattern) {
3756 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3757 /* At least one compare object is NULL */
3758 if (so1->u.cmpobj == so2->u.cmpobj)
3759 cmp = 0;
3760 else if (so1->u.cmpobj == NULL)
3761 cmp = -1;
3762 else
3763 cmp = 1;
3764 } else {
3765 /* We have both the objects, use strcoll */
3766 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3767 }
3768 } else {
3769 /* Compare elements directly */
3770 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
3771 so2->obj->encoding == REDIS_ENCODING_RAW) {
3772 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3773 } else {
3774 robj *dec1, *dec2;
3775
3776 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
3777 so1->obj : getDecodedObject(so1->obj);
3778 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
3779 so2->obj : getDecodedObject(so2->obj);
3780 cmp = strcoll(dec1->ptr,dec2->ptr);
3781 if (dec1 != so1->obj) decrRefCount(dec1);
3782 if (dec2 != so2->obj) decrRefCount(dec2);
3783 }
3784 }
3785 }
3786 return server.sort_desc ? -cmp : cmp;
3787 }
3788
3789 /* The SORT command is the most complex command in Redis. Warning: this code
3790 * is optimized for speed and a bit less for readability */
3791 static void sortCommand(redisClient *c) {
3792 list *operations;
3793 int outputlen = 0;
3794 int desc = 0, alpha = 0;
3795 int limit_start = 0, limit_count = -1, start, end;
3796 int j, dontsort = 0, vectorlen;
3797 int getop = 0; /* GET operation counter */
3798 robj *sortval, *sortby = NULL;
3799 redisSortObject *vector; /* Resulting vector to sort */
3800
3801 /* Lookup the key to sort. It must be of the right types */
3802 sortval = lookupKeyRead(c->db,c->argv[1]);
3803 if (sortval == NULL) {
3804 addReply(c,shared.nokeyerr);
3805 return;
3806 }
3807 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3808 addReply(c,shared.wrongtypeerr);
3809 return;
3810 }
3811
3812 /* Create a list of operations to perform for every sorted element.
3813 * Operations can be GET/DEL/INCR/DECR */
3814 operations = listCreate();
3815 listSetFreeMethod(operations,zfree);
3816 j = 2;
3817
3818 /* Now we need to protect sortval incrementing its count, in the future
3819 * SORT may have options able to overwrite/delete keys during the sorting
3820 * and the sorted key itself may get destroied */
3821 incrRefCount(sortval);
3822
3823 /* The SORT command has an SQL-alike syntax, parse it */
3824 while(j < c->argc) {
3825 int leftargs = c->argc-j-1;
3826 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3827 desc = 0;
3828 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3829 desc = 1;
3830 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3831 alpha = 1;
3832 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3833 limit_start = atoi(c->argv[j+1]->ptr);
3834 limit_count = atoi(c->argv[j+2]->ptr);
3835 j+=2;
3836 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3837 sortby = c->argv[j+1];
3838 /* If the BY pattern does not contain '*', i.e. it is constant,
3839 * we don't need to sort nor to lookup the weight keys. */
3840 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3841 j++;
3842 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3843 listAddNodeTail(operations,createSortOperation(
3844 REDIS_SORT_GET,c->argv[j+1]));
3845 getop++;
3846 j++;
3847 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3848 listAddNodeTail(operations,createSortOperation(
3849 REDIS_SORT_DEL,c->argv[j+1]));
3850 j++;
3851 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3852 listAddNodeTail(operations,createSortOperation(
3853 REDIS_SORT_INCR,c->argv[j+1]));
3854 j++;
3855 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3856 listAddNodeTail(operations,createSortOperation(
3857 REDIS_SORT_DECR,c->argv[j+1]));
3858 j++;
3859 } else {
3860 decrRefCount(sortval);
3861 listRelease(operations);
3862 addReply(c,shared.syntaxerr);
3863 return;
3864 }
3865 j++;
3866 }
3867
3868 /* Load the sorting vector with all the objects to sort */
3869 vectorlen = (sortval->type == REDIS_LIST) ?
3870 listLength((list*)sortval->ptr) :
3871 dictSize((dict*)sortval->ptr);
3872 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3873 if (!vector) oom("allocating objects vector for SORT");
3874 j = 0;
3875 if (sortval->type == REDIS_LIST) {
3876 list *list = sortval->ptr;
3877 listNode *ln;
3878
3879 listRewind(list);
3880 while((ln = listYield(list))) {
3881 robj *ele = ln->value;
3882 vector[j].obj = ele;
3883 vector[j].u.score = 0;
3884 vector[j].u.cmpobj = NULL;
3885 j++;
3886 }
3887 } else {
3888 dict *set = sortval->ptr;
3889 dictIterator *di;
3890 dictEntry *setele;
3891
3892 di = dictGetIterator(set);
3893 if (!di) oom("dictGetIterator");
3894 while((setele = dictNext(di)) != NULL) {
3895 vector[j].obj = dictGetEntryKey(setele);
3896 vector[j].u.score = 0;
3897 vector[j].u.cmpobj = NULL;
3898 j++;
3899 }
3900 dictReleaseIterator(di);
3901 }
3902 assert(j == vectorlen);
3903
3904 /* Now it's time to load the right scores in the sorting vector */
3905 if (dontsort == 0) {
3906 for (j = 0; j < vectorlen; j++) {
3907 if (sortby) {
3908 robj *byval;
3909
3910 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3911 if (!byval || byval->type != REDIS_STRING) continue;
3912 if (alpha) {
3913 if (byval->encoding == REDIS_ENCODING_RAW) {
3914 vector[j].u.cmpobj = byval;
3915 incrRefCount(byval);
3916 } else {
3917 vector[j].u.cmpobj = getDecodedObject(byval);
3918 }
3919 } else {
3920 if (byval->encoding == REDIS_ENCODING_RAW) {
3921 vector[j].u.score = strtod(byval->ptr,NULL);
3922 } else {
3923 if (byval->encoding == REDIS_ENCODING_INT) {
3924 vector[j].u.score = (long)byval->ptr;
3925 } else
3926 assert(1 != 1);
3927 }
3928 }
3929 } else {
3930 if (!alpha) {
3931 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
3932 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3933 else {
3934 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
3935 vector[j].u.score = (long) vector[j].obj->ptr;
3936 else
3937 assert(1 != 1);
3938 }
3939 }
3940 }
3941 }
3942 }
3943
3944 /* We are ready to sort the vector... perform a bit of sanity check
3945 * on the LIMIT option too. We'll use a partial version of quicksort. */
3946 start = (limit_start < 0) ? 0 : limit_start;
3947 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3948 if (start >= vectorlen) {
3949 start = vectorlen-1;
3950 end = vectorlen-2;
3951 }
3952 if (end >= vectorlen) end = vectorlen-1;
3953
3954 if (dontsort == 0) {
3955 server.sort_desc = desc;
3956 server.sort_alpha = alpha;
3957 server.sort_bypattern = sortby ? 1 : 0;
3958 if (sortby && (start != 0 || end != vectorlen-1))
3959 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3960 else
3961 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3962 }
3963
3964 /* Send command output to the output buffer, performing the specified
3965 * GET/DEL/INCR/DECR operations if any. */
3966 outputlen = getop ? getop*(end-start+1) : end-start+1;
3967 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3968 for (j = start; j <= end; j++) {
3969 listNode *ln;
3970 if (!getop) {
3971 addReplyBulkLen(c,vector[j].obj);
3972 addReply(c,vector[j].obj);
3973 addReply(c,shared.crlf);
3974 }
3975 listRewind(operations);
3976 while((ln = listYield(operations))) {
3977 redisSortOperation *sop = ln->value;
3978 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3979 vector[j].obj);
3980
3981 if (sop->type == REDIS_SORT_GET) {
3982 if (!val || val->type != REDIS_STRING) {
3983 addReply(c,shared.nullbulk);
3984 } else {
3985 addReplyBulkLen(c,val);
3986 addReply(c,val);
3987 addReply(c,shared.crlf);
3988 }
3989 } else if (sop->type == REDIS_SORT_DEL) {
3990 /* TODO */
3991 }
3992 }
3993 }
3994
3995 /* Cleanup */
3996 decrRefCount(sortval);
3997 listRelease(operations);
3998 for (j = 0; j < vectorlen; j++) {
3999 if (sortby && alpha && vector[j].u.cmpobj)
4000 decrRefCount(vector[j].u.cmpobj);
4001 }
4002 zfree(vector);
4003 }
4004
4005 static void infoCommand(redisClient *c) {
4006 sds info;
4007 time_t uptime = time(NULL)-server.stat_starttime;
4008 int j;
4009
4010 info = sdscatprintf(sdsempty(),
4011 "redis_version:%s\r\n"
4012 "arch_bits:%s\r\n"
4013 "uptime_in_seconds:%d\r\n"
4014 "uptime_in_days:%d\r\n"
4015 "connected_clients:%d\r\n"
4016 "connected_slaves:%d\r\n"
4017 "used_memory:%zu\r\n"
4018 "changes_since_last_save:%lld\r\n"
4019 "bgsave_in_progress:%d\r\n"
4020 "last_save_time:%d\r\n"
4021 "total_connections_received:%lld\r\n"
4022 "total_commands_processed:%lld\r\n"
4023 "role:%s\r\n"
4024 ,REDIS_VERSION,
4025 (sizeof(long) == 8) ? "64" : "32",
4026 uptime,
4027 uptime/(3600*24),
4028 listLength(server.clients)-listLength(server.slaves),
4029 listLength(server.slaves),
4030 server.usedmemory,
4031 server.dirty,
4032 server.bgsaveinprogress,
4033 server.lastsave,
4034 server.stat_numconnections,
4035 server.stat_numcommands,
4036 server.masterhost == NULL ? "master" : "slave"
4037 );
4038 if (server.masterhost) {
4039 info = sdscatprintf(info,
4040 "master_host:%s\r\n"
4041 "master_port:%d\r\n"
4042 "master_link_status:%s\r\n"
4043 "master_last_io_seconds_ago:%d\r\n"
4044 ,server.masterhost,
4045 server.masterport,
4046 (server.replstate == REDIS_REPL_CONNECTED) ?
4047 "up" : "down",
4048 (int)(time(NULL)-server.master->lastinteraction)
4049 );
4050 }
4051 for (j = 0; j < server.dbnum; j++) {
4052 long long keys, vkeys;
4053
4054 keys = dictSize(server.db[j].dict);
4055 vkeys = dictSize(server.db[j].expires);
4056 if (keys || vkeys) {
4057 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4058 j, keys, vkeys);
4059 }
4060 }
4061 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4062 addReplySds(c,info);
4063 addReply(c,shared.crlf);
4064 }
4065
4066 static void monitorCommand(redisClient *c) {
4067 /* ignore MONITOR if aleady slave or in monitor mode */
4068 if (c->flags & REDIS_SLAVE) return;
4069
4070 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4071 c->slaveseldb = 0;
4072 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
4073 addReply(c,shared.ok);
4074 }
4075
4076 /* ================================= Expire ================================= */
4077 static int removeExpire(redisDb *db, robj *key) {
4078 if (dictDelete(db->expires,key) == DICT_OK) {
4079 return 1;
4080 } else {
4081 return 0;
4082 }
4083 }
4084
4085 static int setExpire(redisDb *db, robj *key, time_t when) {
4086 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4087 return 0;
4088 } else {
4089 incrRefCount(key);
4090 return 1;
4091 }
4092 }
4093
4094 /* Return the expire time of the specified key, or -1 if no expire
4095 * is associated with this key (i.e. the key is non volatile) */
4096 static time_t getExpire(redisDb *db, robj *key) {
4097 dictEntry *de;
4098
4099 /* No expire? return ASAP */
4100 if (dictSize(db->expires) == 0 ||
4101 (de = dictFind(db->expires,key)) == NULL) return -1;
4102
4103 return (time_t) dictGetEntryVal(de);
4104 }
4105
4106 static int expireIfNeeded(redisDb *db, robj *key) {
4107 time_t when;
4108 dictEntry *de;
4109
4110 /* No expire? return ASAP */
4111 if (dictSize(db->expires) == 0 ||
4112 (de = dictFind(db->expires,key)) == NULL) return 0;
4113
4114 /* Lookup the expire */
4115 when = (time_t) dictGetEntryVal(de);
4116 if (time(NULL) <= when) return 0;
4117
4118 /* Delete the key */
4119 dictDelete(db->expires,key);
4120 return dictDelete(db->dict,key) == DICT_OK;
4121 }
4122
4123 static int deleteIfVolatile(redisDb *db, robj *key) {
4124 dictEntry *de;
4125
4126 /* No expire? return ASAP */
4127 if (dictSize(db->expires) == 0 ||
4128 (de = dictFind(db->expires,key)) == NULL) return 0;
4129
4130 /* Delete the key */
4131 server.dirty++;
4132 dictDelete(db->expires,key);
4133 return dictDelete(db->dict,key) == DICT_OK;
4134 }
4135
4136 static void expireCommand(redisClient *c) {
4137 dictEntry *de;
4138 int seconds = atoi(c->argv[2]->ptr);
4139
4140 de = dictFind(c->db->dict,c->argv[1]);
4141 if (de == NULL) {
4142 addReply(c,shared.czero);
4143 return;
4144 }
4145 if (seconds <= 0) {
4146 addReply(c, shared.czero);
4147 return;
4148 } else {
4149 time_t when = time(NULL)+seconds;
4150 if (setExpire(c->db,c->argv[1],when)) {
4151 addReply(c,shared.cone);
4152 server.dirty++;
4153 } else {
4154 addReply(c,shared.czero);
4155 }
4156 return;
4157 }
4158 }
4159
4160 static void ttlCommand(redisClient *c) {
4161 time_t expire;
4162 int ttl = -1;
4163
4164 expire = getExpire(c->db,c->argv[1]);
4165 if (expire != -1) {
4166 ttl = (int) (expire-time(NULL));
4167 if (ttl < 0) ttl = -1;
4168 }
4169 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4170 }
4171
4172 static void msetGenericCommand(redisClient *c, int nx) {
4173 int j;
4174
4175 if ((c->argc % 2) == 0) {
4176 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4177 return;
4178 }
4179 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4180 * set nothing at all if at least one already key exists. */
4181 if (nx) {
4182 for (j = 1; j < c->argc; j += 2) {
4183 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4184 addReply(c, shared.czero);
4185 return;
4186 }
4187 }
4188 }
4189
4190 for (j = 1; j < c->argc; j += 2) {
4191 int retval;
4192
4193 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4194 if (retval == DICT_ERR) {
4195 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4196 incrRefCount(c->argv[j+1]);
4197 } else {
4198 incrRefCount(c->argv[j]);
4199 incrRefCount(c->argv[j+1]);
4200 }
4201 removeExpire(c->db,c->argv[j]);
4202 }
4203 server.dirty += (c->argc-1)/2;
4204 addReply(c, nx ? shared.cone : shared.ok);
4205 }
4206
4207 static void msetCommand(redisClient *c) {
4208 msetGenericCommand(c,0);
4209 }
4210
4211 static void msetnxCommand(redisClient *c) {
4212 msetGenericCommand(c,1);
4213 }
4214
4215 /* =============================== Replication ============================= */
4216
4217 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4218 ssize_t nwritten, ret = size;
4219 time_t start = time(NULL);
4220
4221 timeout++;
4222 while(size) {
4223 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4224 nwritten = write(fd,ptr,size);
4225 if (nwritten == -1) return -1;
4226 ptr += nwritten;
4227 size -= nwritten;
4228 }
4229 if ((time(NULL)-start) > timeout) {
4230 errno = ETIMEDOUT;
4231 return -1;
4232 }
4233 }
4234 return ret;
4235 }
4236
4237 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4238 ssize_t nread, totread = 0;
4239 time_t start = time(NULL);
4240
4241 timeout++;
4242 while(size) {
4243 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4244 nread = read(fd,ptr,size);
4245 if (nread == -1) return -1;
4246 ptr += nread;
4247 size -= nread;
4248 totread += nread;
4249 }
4250 if ((time(NULL)-start) > timeout) {
4251 errno = ETIMEDOUT;
4252 return -1;
4253 }
4254 }
4255 return totread;
4256 }
4257
/* Read one '\n'-terminated line from 'fd' into 'ptr', byte by byte via
 * syncRead(). 'size' is the full capacity of the buffer at 'ptr',
 * including the terminating nul byte.
 *
 * The '\n' is not stored and a trailing '\r' is overwritten with a nul.
 * The result is always nul-terminated. If the buffer fills up before a
 * newline arrives, the truncated line is returned and the rest of the
 * line is left unread.
 *
 * Returns the number of bytes stored (a stripped '\r' is still counted),
 * or -1 on I/O error/timeout, or when 'size' is not positive
 * (errno = EINVAL). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';
    size--; /* reserve room for the nul terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Account for the stored byte; otherwise a long line would run
         * past the end of the buffer. */
        size--;
    }
    return nread;
}
4278
4279 static void syncCommand(redisClient *c) {
4280 /* ignore SYNC if aleady slave or in monitor mode */
4281 if (c->flags & REDIS_SLAVE) return;
4282
4283 /* SYNC can't be issued when the server has pending data to send to
4284 * the client about already issued commands. We need a fresh reply
4285 * buffer registering the differences between the BGSAVE and the current
4286 * dataset, so that we can copy to other slaves if needed. */
4287 if (listLength(c->reply) != 0) {
4288 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4289 return;
4290 }
4291
4292 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4293 /* Here we need to check if there is a background saving operation
4294 * in progress, or if it is required to start one */
4295 if (server.bgsaveinprogress) {
4296 /* Ok a background save is in progress. Let's check if it is a good
4297 * one for replication, i.e. if there is another slave that is
4298 * registering differences since the server forked to save */
4299 redisClient *slave;
4300 listNode *ln;
4301
4302 listRewind(server.slaves);
4303 while((ln = listYield(server.slaves))) {
4304 slave = ln->value;
4305 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4306 }
4307 if (ln) {
4308 /* Perfect, the server is already registering differences for
4309 * another slave. Set the right state, and copy the buffer. */
4310 listRelease(c->reply);
4311 c->reply = listDup(slave->reply);
4312 if (!c->reply) oom("listDup copying slave reply list");
4313 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4314 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4315 } else {
4316 /* No way, we need to wait for the next BGSAVE in order to
4317 * register differences */
4318 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4319 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4320 }
4321 } else {
4322 /* Ok we don't have a BGSAVE in progress, let's start one */
4323 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4324 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4325 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4326 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4327 return;
4328 }
4329 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4330 }
4331 c->repldbfd = -1;
4332 c->flags |= REDIS_SLAVE;
4333 c->slaveseldb = 0;
4334 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
4335 return;
4336 }
4337
4338 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4339 redisClient *slave = privdata;
4340 REDIS_NOTUSED(el);
4341 REDIS_NOTUSED(mask);
4342 char buf[REDIS_IOBUF_LEN];
4343 ssize_t nwritten, buflen;
4344
4345 if (slave->repldboff == 0) {
4346 /* Write the bulk write count before to transfer the DB. In theory here
4347 * we don't know how much room there is in the output buffer of the
4348 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4349 * operations) will never be smaller than the few bytes we need. */
4350 sds bulkcount;
4351
4352 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4353 slave->repldbsize);
4354 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4355 {
4356 sdsfree(bulkcount);
4357 freeClient(slave);
4358 return;
4359 }
4360 sdsfree(bulkcount);
4361 }
4362 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4363 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4364 if (buflen <= 0) {
4365 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4366 (buflen == 0) ? "premature EOF" : strerror(errno));
4367 freeClient(slave);
4368 return;
4369 }
4370 if ((nwritten = write(fd,buf,buflen)) == -1) {
4371 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4372 strerror(errno));
4373 freeClient(slave);
4374 return;
4375 }
4376 slave->repldboff += nwritten;
4377 if (slave->repldboff == slave->repldbsize) {
4378 close(slave->repldbfd);
4379 slave->repldbfd = -1;
4380 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4381 slave->replstate = REDIS_REPL_ONLINE;
4382 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4383 sendReplyToClient, slave, NULL) == AE_ERR) {
4384 freeClient(slave);
4385 return;
4386 }
4387 addReplySds(slave,sdsempty());
4388 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4389 }
4390 }
4391
4392 /* This function is called at the end of every backgrond saving.
4393 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4394 * otherwise REDIS_ERR is passed to the function.
4395 *
4396 * The goal of this function is to handle slaves waiting for a successful
4397 * background saving in order to perform non-blocking synchronization. */
4398 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4399 listNode *ln;
4400 int startbgsave = 0;
4401
4402 listRewind(server.slaves);
4403 while((ln = listYield(server.slaves))) {
4404 redisClient *slave = ln->value;
4405
4406 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4407 startbgsave = 1;
4408 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4409 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4410 struct redis_stat buf;
4411
4412 if (bgsaveerr != REDIS_OK) {
4413 freeClient(slave);
4414 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4415 continue;
4416 }
4417 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4418 redis_fstat(slave->repldbfd,&buf) == -1) {
4419 freeClient(slave);
4420 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4421 continue;
4422 }
4423 slave->repldboff = 0;
4424 slave->repldbsize = buf.st_size;
4425 slave->replstate = REDIS_REPL_SEND_BULK;
4426 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4427 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4428 freeClient(slave);
4429 continue;
4430 }
4431 }
4432 }
4433 if (startbgsave) {
4434 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4435 listRewind(server.slaves);
4436 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4437 while((ln = listYield(server.slaves))) {
4438 redisClient *slave = ln->value;
4439
4440 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4441 freeClient(slave);
4442 }
4443 }
4444 }
4445 }
4446
4447 static int syncWithMaster(void) {
4448 char buf[1024], tmpfile[256];
4449 int dumpsize;
4450 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4451 int dfd;
4452
4453 if (fd == -1) {
4454 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4455 strerror(errno));
4456 return REDIS_ERR;
4457 }
4458 /* Issue the SYNC command */
4459 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4460 close(fd);
4461 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4462 strerror(errno));
4463 return REDIS_ERR;
4464 }
4465 /* Read the bulk write count */
4466 if (syncReadLine(fd,buf,1024,3600) == -1) {
4467 close(fd);
4468 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4469 strerror(errno));
4470 return REDIS_ERR;
4471 }
4472 dumpsize = atoi(buf+1);
4473 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4474 /* Read the bulk write data on a temp file */
4475 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4476 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4477 if (dfd == -1) {
4478 close(fd);
4479 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4480 return REDIS_ERR;
4481 }
4482 while(dumpsize) {
4483 int nread, nwritten;
4484
4485 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4486 if (nread == -1) {
4487 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4488 strerror(errno));
4489 close(fd);
4490 close(dfd);
4491 return REDIS_ERR;
4492 }
4493 nwritten = write(dfd,buf,nread);
4494 if (nwritten == -1) {
4495 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4496 close(fd);
4497 close(dfd);
4498 return REDIS_ERR;
4499 }
4500 dumpsize -= nread;
4501 }
4502 close(dfd);
4503 if (rename(tmpfile,server.dbfilename) == -1) {
4504 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4505 unlink(tmpfile);
4506 close(fd);
4507 return REDIS_ERR;
4508 }
4509 emptyDb();
4510 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4511 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4512 close(fd);
4513 return REDIS_ERR;
4514 }
4515 server.master = createClient(fd);
4516 server.master->flags |= REDIS_MASTER;
4517 server.replstate = REDIS_REPL_CONNECTED;
4518 return REDIS_OK;
4519 }
4520
4521 static void slaveofCommand(redisClient *c) {
4522 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4523 !strcasecmp(c->argv[2]->ptr,"one")) {
4524 if (server.masterhost) {
4525 sdsfree(server.masterhost);
4526 server.masterhost = NULL;
4527 if (server.master) freeClient(server.master);
4528 server.replstate = REDIS_REPL_NONE;
4529 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4530 }
4531 } else {
4532 sdsfree(server.masterhost);
4533 server.masterhost = sdsdup(c->argv[1]->ptr);
4534 server.masterport = atoi(c->argv[2]->ptr);
4535 if (server.master) freeClient(server.master);
4536 server.replstate = REDIS_REPL_CONNECT;
4537 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4538 server.masterhost, server.masterport);
4539 }
4540 addReply(c,shared.ok);
4541 }
4542
4543 /* ============================ Maxmemory directive ======================== */
4544
4545 /* This function gets called when 'maxmemory' is set on the config file to limit
4546 * the max memory used by the server, and we are out of memory.
4547 * This function will try to, in order:
4548 *
4549 * - Free objects from the free list
4550 * - Try to remove keys with an EXPIRE set
4551 *
4552 * It is not possible to free enough memory to reach used-memory < maxmemory
4553 * the server will start refusing commands that will enlarge even more the
4554 * memory usage.
4555 */
4556 static void freeMemoryIfNeeded(void) {
4557 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4558 if (listLength(server.objfreelist)) {
4559 robj *o;
4560
4561 listNode *head = listFirst(server.objfreelist);
4562 o = listNodeValue(head);
4563 listDelNode(server.objfreelist,head);
4564 zfree(o);
4565 } else {
4566 int j, k, freed = 0;
4567
4568 for (j = 0; j < server.dbnum; j++) {
4569 int minttl = -1;
4570 robj *minkey = NULL;
4571 struct dictEntry *de;
4572
4573 if (dictSize(server.db[j].expires)) {
4574 freed = 1;
4575 /* From a sample of three keys drop the one nearest to
4576 * the natural expire */
4577 for (k = 0; k < 3; k++) {
4578 time_t t;
4579
4580 de = dictGetRandomKey(server.db[j].expires);
4581 t = (time_t) dictGetEntryVal(de);
4582 if (minttl == -1 || t < minttl) {
4583 minkey = dictGetEntryKey(de);
4584 minttl = t;
4585 }
4586 }
4587 deleteKey(server.db+j,minkey);
4588 }
4589 }
4590 if (!freed) return; /* nothing to free... */
4591 }
4592 }
4593 }
4594
4595 /* ================================= Debugging ============================== */
4596
4597 static void debugCommand(redisClient *c) {
4598 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4599 *((char*)-1) = 'x';
4600 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4601 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4602 robj *key, *val;
4603
4604 if (!de) {
4605 addReply(c,shared.nokeyerr);
4606 return;
4607 }
4608 key = dictGetEntryKey(de);
4609 val = dictGetEntryVal(de);
4610 addReplySds(c,sdscatprintf(sdsempty(),
4611 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4612 key, key->refcount, val, val->refcount, val->encoding));
4613 } else {
4614 addReplySds(c,sdsnew(
4615 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4616 }
4617 }
4618
4619 #ifdef HAVE_BACKTRACE
4620 static struct redisFunctionSym symsTable[] = {
4621 {"compareStringObjects", (unsigned long)compareStringObjects},
4622 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4623 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4624 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4625 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4626 {"freeStringObject", (unsigned long)freeStringObject},
4627 {"freeListObject", (unsigned long)freeListObject},
4628 {"freeSetObject", (unsigned long)freeSetObject},
4629 {"decrRefCount", (unsigned long)decrRefCount},
4630 {"createObject", (unsigned long)createObject},
4631 {"freeClient", (unsigned long)freeClient},
4632 {"rdbLoad", (unsigned long)rdbLoad},
4633 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4634 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4635 {"addReply", (unsigned long)addReply},
4636 {"addReplySds", (unsigned long)addReplySds},
4637 {"incrRefCount", (unsigned long)incrRefCount},
4638 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4639 {"createStringObject", (unsigned long)createStringObject},
4640 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4641 {"syncWithMaster", (unsigned long)syncWithMaster},
4642 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4643 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4644 {"getDecodedObject", (unsigned long)getDecodedObject},
4645 {"removeExpire", (unsigned long)removeExpire},
4646 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4647 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4648 {"deleteKey", (unsigned long)deleteKey},
4649 {"getExpire", (unsigned long)getExpire},
4650 {"setExpire", (unsigned long)setExpire},
4651 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4652 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4653 {"authCommand", (unsigned long)authCommand},
4654 {"pingCommand", (unsigned long)pingCommand},
4655 {"echoCommand", (unsigned long)echoCommand},
4656 {"setCommand", (unsigned long)setCommand},
4657 {"setnxCommand", (unsigned long)setnxCommand},
4658 {"getCommand", (unsigned long)getCommand},
4659 {"delCommand", (unsigned long)delCommand},
4660 {"existsCommand", (unsigned long)existsCommand},
4661 {"incrCommand", (unsigned long)incrCommand},
4662 {"decrCommand", (unsigned long)decrCommand},
4663 {"incrbyCommand", (unsigned long)incrbyCommand},
4664 {"decrbyCommand", (unsigned long)decrbyCommand},
4665 {"selectCommand", (unsigned long)selectCommand},
4666 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4667 {"keysCommand", (unsigned long)keysCommand},
4668 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4669 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4670 {"saveCommand", (unsigned long)saveCommand},
4671 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4672 {"shutdownCommand", (unsigned long)shutdownCommand},
4673 {"moveCommand", (unsigned long)moveCommand},
4674 {"renameCommand", (unsigned long)renameCommand},
4675 {"renamenxCommand", (unsigned long)renamenxCommand},
4676 {"lpushCommand", (unsigned long)lpushCommand},
4677 {"rpushCommand", (unsigned long)rpushCommand},
4678 {"lpopCommand", (unsigned long)lpopCommand},
4679 {"rpopCommand", (unsigned long)rpopCommand},
4680 {"llenCommand", (unsigned long)llenCommand},
4681 {"lindexCommand", (unsigned long)lindexCommand},
4682 {"lrangeCommand", (unsigned long)lrangeCommand},
4683 {"ltrimCommand", (unsigned long)ltrimCommand},
4684 {"typeCommand", (unsigned long)typeCommand},
4685 {"lsetCommand", (unsigned long)lsetCommand},
4686 {"saddCommand", (unsigned long)saddCommand},
4687 {"sremCommand", (unsigned long)sremCommand},
4688 {"smoveCommand", (unsigned long)smoveCommand},
4689 {"sismemberCommand", (unsigned long)sismemberCommand},
4690 {"scardCommand", (unsigned long)scardCommand},
4691 {"spopCommand", (unsigned long)spopCommand},
4692 {"srandmemberCommand", (unsigned long)srandmemberCommand},
4693 {"sinterCommand", (unsigned long)sinterCommand},
4694 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4695 {"sunionCommand", (unsigned long)sunionCommand},
4696 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4697 {"sdiffCommand", (unsigned long)sdiffCommand},
4698 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4699 {"syncCommand", (unsigned long)syncCommand},
4700 {"flushdbCommand", (unsigned long)flushdbCommand},
4701 {"flushallCommand", (unsigned long)flushallCommand},
4702 {"sortCommand", (unsigned long)sortCommand},
4703 {"lremCommand", (unsigned long)lremCommand},
4704 {"infoCommand", (unsigned long)infoCommand},
4705 {"mgetCommand", (unsigned long)mgetCommand},
4706 {"monitorCommand", (unsigned long)monitorCommand},
4707 {"expireCommand", (unsigned long)expireCommand},
4708 {"getsetCommand", (unsigned long)getsetCommand},
4709 {"ttlCommand", (unsigned long)ttlCommand},
4710 {"slaveofCommand", (unsigned long)slaveofCommand},
4711 {"debugCommand", (unsigned long)debugCommand},
4712 {"processCommand", (unsigned long)processCommand},
4713 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4714 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4715 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4716 {"msetGenericCommand", (unsigned long)msetGenericCommand},
4717 {"msetCommand", (unsigned long)msetCommand},
4718 {"msetnxCommand", (unsigned long)msetnxCommand},
4719 {NULL,0}
4720 };
4721
4722 /* This function try to convert a pointer into a function name. It's used in
4723 * oreder to provide a backtrace under segmentation fault that's able to
4724 * display functions declared as static (otherwise the backtrace is useless). */
4725 static char *findFuncName(void *pointer, unsigned long *offset){
4726 int i, ret = -1;
4727 unsigned long off, minoff = 0;
4728
4729 /* Try to match against the Symbol with the smallest offset */
4730 for (i=0; symsTable[i].pointer; i++) {
4731 unsigned long lp = (unsigned long) pointer;
4732
4733 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4734 off=lp-symsTable[i].pointer;
4735 if (ret < 0 || off < minoff) {
4736 minoff=off;
4737 ret=i;
4738 }
4739 }
4740 }
4741 if (ret == -1) return NULL;
4742 *offset = minoff;
4743 return symsTable[ret].name;
4744 }
4745
4746 static void *getMcontextEip(ucontext_t *uc) {
4747 #if defined(__FreeBSD__)
4748 return (void*) uc->uc_mcontext.mc_eip;
4749 #elif defined(__dietlibc__)
4750 return (void*) uc->uc_mcontext.eip;
4751 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4752 return (void*) uc->uc_mcontext->__ss.__eip;
4753 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4754 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
4755 return (void*) uc->uc_mcontext->__ss.__rip;
4756 #else
4757 return (void*) uc->uc_mcontext->__ss.__eip;
4758 #endif
4759 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4760 return (void*) uc->uc_mcontext.gregs[REG_EIP];
4761 #elif defined(__ia64__) /* Linux IA64 */
4762 return (void*) uc->uc_mcontext.sc_ip;
4763 #else
4764 return NULL;
4765 #endif
4766 }
4767
4768 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4769 void *trace[100];
4770 char **messages = NULL;
4771 int i, trace_size = 0;
4772 unsigned long offset=0;
4773 time_t uptime = time(NULL)-server.stat_starttime;
4774 ucontext_t *uc = (ucontext_t*) secret;
4775 REDIS_NOTUSED(info);
4776
4777 redisLog(REDIS_WARNING,
4778 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4779 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4780 "redis_version:%s; "
4781 "uptime_in_seconds:%d; "
4782 "connected_clients:%d; "
4783 "connected_slaves:%d; "
4784 "used_memory:%zu; "
4785 "changes_since_last_save:%lld; "
4786 "bgsave_in_progress:%d; "
4787 "last_save_time:%d; "
4788 "total_connections_received:%lld; "
4789 "total_commands_processed:%lld; "
4790 "role:%s;"
4791 ,REDIS_VERSION,
4792 uptime,
4793 listLength(server.clients)-listLength(server.slaves),
4794 listLength(server.slaves),
4795 server.usedmemory,
4796 server.dirty,
4797 server.bgsaveinprogress,
4798 server.lastsave,
4799 server.stat_numconnections,
4800 server.stat_numcommands,
4801 server.masterhost == NULL ? "master" : "slave"
4802 ));
4803
4804 trace_size = backtrace(trace, 100);
4805 /* overwrite sigaction with caller's address */
4806 if (getMcontextEip(uc) != NULL) {
4807 trace[1] = getMcontextEip(uc);
4808 }
4809 messages = backtrace_symbols(trace, trace_size);
4810
4811 for (i=1; i<trace_size; ++i) {
4812 char *fn = findFuncName(trace[i], &offset), *p;
4813
4814 p = strchr(messages[i],'+');
4815 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4816 redisLog(REDIS_WARNING,"%s", messages[i]);
4817 } else {
4818 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4819 }
4820 }
4821 free(messages);
4822 exit(0);
4823 }
4824
4825 static void setupSigSegvAction(void) {
4826 struct sigaction act;
4827
4828 sigemptyset (&act.sa_mask);
4829 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4830 * is used. Otherwise, sa_handler is used */
4831 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4832 act.sa_sigaction = segvHandler;
4833 sigaction (SIGSEGV, &act, NULL);
4834 sigaction (SIGBUS, &act, NULL);
4835 sigaction (SIGFPE, &act, NULL);
4836 sigaction (SIGILL, &act, NULL);
4837 sigaction (SIGBUS, &act, NULL);
4838 return;
4839 }
4840 #else /* HAVE_BACKTRACE */
/* Without backtrace() support there is no crash report: leave the default
 * signal dispositions in place. */
static void setupSigSegvAction(void) {
    return;
}
4843 #endif /* HAVE_BACKTRACE */
4844
4845 /* =================================== Main! ================================ */
4846
4847 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return atoi() of its first line,
 * or -1 when the file cannot be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char buf[64];
    char *line;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    line = fgets(buf,64,fp);
    fclose(fp);
    return (line == NULL) ? -1 : atoi(buf);
}
4861
4862 void linuxOvercommitMemoryWarning(void) {
4863 if (linuxOvercommitMemoryValue() == 0) {
4864 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4865 }
4866 }
4867 #endif /* __linux__ */
4868
4869 static void daemonize(void) {
4870 int fd;
4871 FILE *fp;
4872
4873 if (fork() != 0) exit(0); /* parent exits */
4874 setsid(); /* create a new session */
4875
4876 /* Every output goes to /dev/null. If Redis is daemonized but
4877 * the 'logfile' is set to 'stdout' in the configuration file
4878 * it will not log at all. */
4879 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4880 dup2(fd, STDIN_FILENO);
4881 dup2(fd, STDOUT_FILENO);
4882 dup2(fd, STDERR_FILENO);
4883 if (fd > STDERR_FILENO) close(fd);
4884 }
4885 /* Try to write the pid file */
4886 fp = fopen(server.pidfile,"w");
4887 if (fp) {
4888 fprintf(fp,"%d\n",getpid());
4889 fclose(fp);
4890 }
4891 }
4892
4893 int main(int argc, char **argv) {
4894 initServerConfig();
4895 if (argc == 2) {
4896 ResetServerSaveParams();
4897 loadServerConfig(argv[1]);
4898 } else if (argc > 2) {
4899 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4900 exit(1);
4901 } else {
4902 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4903 }
4904 initServer();
4905 if (server.daemonize) daemonize();
4906 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4907 #ifdef __linux__
4908 linuxOvercommitMemoryWarning();
4909 #endif
4910 if (rdbLoad(server.dbfilename) == REDIS_OK)
4911 redisLog(REDIS_NOTICE,"DB loaded from disk");
4912 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4913 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4914 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4915 aeMain(server.el);
4916 aeDeleteEventLoop(server.el);
4917 return 0;
4918 }