]> git.saurik.com Git - redis.git/blob - redis.c
31402663251a85867bf066776cd5336443d3c466
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.001"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
71 /* Error codes */
72 #define REDIS_OK 0
73 #define REDIS_ERR -1
74
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
88
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
91
92 /* Command flags */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
96 this flag will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short these commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
100
101 /* Object types */
102 #define REDIS_STRING 0
103 #define REDIS_LIST 1
104 #define REDIS_SET 2
105 #define REDIS_HASH 3
106
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
110
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
115
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpret the length:
119 *
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bits
124 * number specify the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
126 *
127 * Lengths up to 63 are stored using a single byte, most DB keys, and many
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
134
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining six bits specify a special encoding for the object
137 * according to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
142
143 /* Client flags */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
148
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
153
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
162
163 /* List related stuff */
164 #define REDIS_HEAD 0
165 #define REDIS_TAIL 1
166
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
175
176 /* Log levels */
177 #define REDIS_DEBUG 0
178 #define REDIS_NOTICE 1
179 #define REDIS_WARNING 2
180
181 /* Anti-warning macro... */
182 #define REDIS_NOTUSED(V) ((void) V)
183
184
185 /*================================= Data types ============================== */
186
187 /* A redis object, that is a type able to hold a string / list / set */
/* The payload is reached through 'ptr'; 'type' and 'encoding' say how it
 * has to be interpreted. */
188 typedef struct redisObject {
189 void *ptr; /* payload; the dict callbacks in this file cast it to sds for raw strings */
190 unsigned char type; /* presumably one of the REDIS_STRING/LIST/SET/HASH object types -- confirm */
191 unsigned char encoding; /* compared against REDIS_ENCODING_RAW by the encoding-aware dict callbacks */
192 unsigned char notused[2]; /* not used, as the name says (presumably padding -- confirm) */
193 int refcount; /* reference counter, see incrRefCount() / decrRefCount() */
194 } robj;
195
/* One database of the server (server.db is an array of server.dbnum of them). */
196 typedef struct redisDb {
197 dict *dict; /* the keys of this DB (reported as "keys" by serverCron) */
198 dict *expires; /* volatile keys; serverCron reads each entry value as a time_t expire time */
199 int id; /* presumably the DB number -- confirm against the code that fills it */
200 } redisDb;
201
202 /* With multiplexing we need to take per-client state.
203 * Clients are taken in a linked list. */
204 typedef struct redisClient {
205 int fd;
206 redisDb *db;
207 int dictid;
208 sds querybuf;
209 robj **argv, **mbargv;
210 int argc, mbargc;
211 int bulklen; /* bulk read len. -1 if not in bulk read mode */
212 int multibulk; /* multi bulk command format active */
213 list *reply;
214 int sentlen;
215 time_t lastinteraction; /* time of the last interaction, used for timeout */
216 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
217 int slaveseldb; /* slave selected db, if this client is a slave */
218 int authenticated; /* when requirepass is non-NULL */
219 int replstate; /* replication state if this is a slave */
220 int repldbfd; /* replication DB file descriptor */
221 long repldboff; /* replication DB file offset */
222 off_t repldbsize; /* replication DB file size */
223 } redisClient;
224
/* One automatic save rule: serverCron starts a background save when
 * server.dirty >= changes and more than 'seconds' seconds elapsed since
 * server.lastsave. */
225 struct saveparam {
226 time_t seconds;
227 int changes;
228 };
229
230 /* Global server state structure */
231 struct redisServer {
232 int port;
233 int fd;
234 redisDb *db; /* array of 'dbnum' databases */
235 dict *sharingpool;
236 unsigned int sharingpoolsize;
237 long long dirty; /* changes to DB from the last save */
238 list *clients;
239 list *slaves, *monitors;
240 char neterr[ANET_ERR_LEN];
241 aeEventLoop *el;
242 int cronloops; /* number of times the cron function run */
243 list *objfreelist; /* A list of freed objects to avoid malloc() */
244 time_t lastsave; /* Unix time of last succeeded save */
245 size_t usedmemory; /* Used memory in bytes, refreshed by serverCron from zmalloc_used_memory() */
246 /* Fields used only for stats */
247 time_t stat_starttime; /* server start time */
248 long long stat_numcommands; /* number of processed commands */
249 long long stat_numconnections; /* number of connections received */
250 /* Configuration */
251 int verbosity; /* redisLog drops messages whose level is below this */
252 int glueoutputbuf;
253 int maxidletime; /* client idle timeout in seconds; 0 disables the check in serverCron */
254 int dbnum; /* number of entries in 'db' */
255 int daemonize;
256 char *pidfile;
257 int bgsaveinprogress; /* non zero while a background save child is running */
258 pid_t bgsavechildpid; /* reset to -1 by serverCron once the child is reaped */
259 struct saveparam *saveparams; /* array of 'saveparamslen' save rules */
260 int saveparamslen;
261 char *logfile; /* NULL = log on standard output */
262 char *bindaddr;
263 char *dbfilename;
264 char *requirepass;
265 int shareobjects;
266 /* Replication related */
267 int isslave;
268 char *masterhost;
269 int masterport;
270 redisClient *master; /* client that is master for this slave */
271 int replstate; /* serverCron tries syncWithMaster() while this is REDIS_REPL_CONNECT */
272 unsigned int maxclients;
273 unsigned long maxmemory;
274 /* Sort parameters - qsort_r() is only available under BSD so we
275 * have to take this state global, in order to pass it to sortCompare() */
276 int sort_desc;
277 int sort_alpha;
278 int sort_bypattern;
279 };
280
/* Signature shared by every command implementation. */
281 typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (see cmdTable). */
282 struct redisCommand {
283 char *name; /* command name, lower case in cmdTable */
284 redisCommandProc *proc; /* function implementing the command */
285 int arity; /* argument count; NOTE(review): negative values presumably mean "at least N" -- confirm in the dispatcher */
286 int flags; /* OR of REDIS_CMD_* flags */
287 };
288
/* Pairs a function name with an address stored as an integer.
 * NOTE(review): presumably used to resolve symbols when dumping a
 * backtrace -- confirm against the users of this struct. */
289 struct redisFunctionSym {
290 char *name;
291 unsigned long pointer;
292 };
293
/* Element handled by SORT: the object plus either a numeric score or an
 * object to compare by (only one member of the union is valid at a time).
 * NOTE(review): which member is used presumably depends on the
 * server.sort_* settings -- confirm in sortCompare(). */
294 typedef struct _redisSortObject {
295 robj *obj;
296 union {
297 double score;
298 robj *cmpobj;
299 } u;
300 } redisSortObject;
301
/* An operation attached to a SORT call together with its pattern. */
302 typedef struct _redisSortOperation {
303 int type; /* presumably one of the REDIS_SORT_* operations -- confirm */
304 robj *pattern;
305 } redisSortOperation;
306
/* Objects created once by createSharedObjects(): canned protocol replies,
 * error messages and the "select N" commands for DB 0-9. */
307 struct sharedObjectsStruct {
308 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
309 *colon, *nullbulk, *nullmultibulk,
310 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
311 *outofrangeerr, *plus,
312 *select0, *select1, *select2, *select3, *select4,
313 *select5, *select6, *select7, *select8, *select9;
314 } shared;
315
316 /*================================ Prototypes =============================== */
317
318 static void freeStringObject(robj *o);
319 static void freeListObject(robj *o);
320 static void freeSetObject(robj *o);
321 static void decrRefCount(void *o);
322 static robj *createObject(int type, void *ptr);
323 static void freeClient(redisClient *c);
324 static int rdbLoad(char *filename);
325 static void addReply(redisClient *c, robj *obj);
326 static void addReplySds(redisClient *c, sds s);
327 static void incrRefCount(robj *o);
328 static int rdbSaveBackground(char *filename);
329 static robj *createStringObject(char *ptr, size_t len);
330 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
331 static int syncWithMaster(void);
332 static robj *tryObjectSharing(robj *o);
333 static int tryObjectEncoding(robj *o);
334 static robj *getDecodedObject(const robj *o);
335 static int removeExpire(redisDb *db, robj *key);
336 static int expireIfNeeded(redisDb *db, robj *key);
337 static int deleteIfVolatile(redisDb *db, robj *key);
338 static int deleteKey(redisDb *db, robj *key);
339 static time_t getExpire(redisDb *db, robj *key);
340 static int setExpire(redisDb *db, robj *key, time_t when);
341 static void updateSlavesWaitingBgsave(int bgsaveerr);
342 static void freeMemoryIfNeeded(void);
343 static int processCommand(redisClient *c);
344 static void setupSigSegvAction(void);
345 static void rdbRemoveTempFile(pid_t childpid);
346 static size_t stringObjectLen(robj *o);
347 static void processInputBuffer(redisClient *c);
348
349 static void authCommand(redisClient *c);
350 static void pingCommand(redisClient *c);
351 static void echoCommand(redisClient *c);
352 static void setCommand(redisClient *c);
353 static void setnxCommand(redisClient *c);
354 static void getCommand(redisClient *c);
355 static void delCommand(redisClient *c);
356 static void existsCommand(redisClient *c);
357 static void incrCommand(redisClient *c);
358 static void decrCommand(redisClient *c);
359 static void incrbyCommand(redisClient *c);
360 static void decrbyCommand(redisClient *c);
361 static void selectCommand(redisClient *c);
362 static void randomkeyCommand(redisClient *c);
363 static void keysCommand(redisClient *c);
364 static void dbsizeCommand(redisClient *c);
365 static void lastsaveCommand(redisClient *c);
366 static void saveCommand(redisClient *c);
367 static void bgsaveCommand(redisClient *c);
368 static void shutdownCommand(redisClient *c);
369 static void moveCommand(redisClient *c);
370 static void renameCommand(redisClient *c);
371 static void renamenxCommand(redisClient *c);
372 static void lpushCommand(redisClient *c);
373 static void rpushCommand(redisClient *c);
374 static void lpopCommand(redisClient *c);
375 static void rpopCommand(redisClient *c);
376 static void llenCommand(redisClient *c);
377 static void lindexCommand(redisClient *c);
378 static void lrangeCommand(redisClient *c);
379 static void ltrimCommand(redisClient *c);
380 static void typeCommand(redisClient *c);
381 static void lsetCommand(redisClient *c);
382 static void saddCommand(redisClient *c);
383 static void sremCommand(redisClient *c);
384 static void smoveCommand(redisClient *c);
385 static void sismemberCommand(redisClient *c);
386 static void scardCommand(redisClient *c);
387 static void spopCommand(redisClient *c);
388 static void srandmemberCommand(redisClient *c);
389 static void sinterCommand(redisClient *c);
390 static void sinterstoreCommand(redisClient *c);
391 static void sunionCommand(redisClient *c);
392 static void sunionstoreCommand(redisClient *c);
393 static void sdiffCommand(redisClient *c);
394 static void sdiffstoreCommand(redisClient *c);
395 static void syncCommand(redisClient *c);
396 static void flushdbCommand(redisClient *c);
397 static void flushallCommand(redisClient *c);
398 static void sortCommand(redisClient *c);
399 static void lremCommand(redisClient *c);
400 static void infoCommand(redisClient *c);
401 static void mgetCommand(redisClient *c);
402 static void monitorCommand(redisClient *c);
403 static void expireCommand(redisClient *c);
404 static void getsetCommand(redisClient *c);
405 static void ttlCommand(redisClient *c);
406 static void slaveofCommand(redisClient *c);
407 static void debugCommand(redisClient *c);
408 static void msetCommand(redisClient *c);
409 static void msetnxCommand(redisClient *c);
410
411 /*================================= Globals ================================= */
412
413 /* Global vars */
414 static struct redisServer server; /* server global state */
/* Command table. Every row is {name, implementation, arity, flags} as
 * declared by struct redisCommand; flags is an OR of REDIS_CMD_BULK,
 * REDIS_CMD_INLINE and REDIS_CMD_DENYOOM. The table is terminated by an
 * all-NULL/zero row. Note that "smembers" is served by sinterCommand.
 * NOTE(review): negative arity presumably means "at least that many
 * arguments" -- confirm in the command dispatcher. */
415 static struct redisCommand cmdTable[] = {
416 {"get",getCommand,2,REDIS_CMD_INLINE},
417 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
418 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
419 {"del",delCommand,-2,REDIS_CMD_INLINE},
420 {"exists",existsCommand,2,REDIS_CMD_INLINE},
421 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
422 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
423 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
424 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
425 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
426 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
427 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
428 {"llen",llenCommand,2,REDIS_CMD_INLINE},
429 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
430 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
431 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
432 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
433 {"lrem",lremCommand,4,REDIS_CMD_BULK},
434 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
435 {"srem",sremCommand,3,REDIS_CMD_BULK},
436 {"smove",smoveCommand,4,REDIS_CMD_BULK},
437 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
438 {"scard",scardCommand,2,REDIS_CMD_INLINE},
439 {"spop",spopCommand,2,REDIS_CMD_INLINE},
440 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
441 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
442 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
443 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
444 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
445 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
446 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
447 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
448 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
449 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
450 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
451 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
452 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
453 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
454 {"select",selectCommand,2,REDIS_CMD_INLINE},
455 {"move",moveCommand,3,REDIS_CMD_INLINE},
456 {"rename",renameCommand,3,REDIS_CMD_INLINE},
457 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
458 {"expire",expireCommand,3,REDIS_CMD_INLINE},
459 {"keys",keysCommand,2,REDIS_CMD_INLINE},
460 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
461 {"auth",authCommand,2,REDIS_CMD_INLINE},
462 {"ping",pingCommand,1,REDIS_CMD_INLINE},
463 {"echo",echoCommand,2,REDIS_CMD_BULK},
464 {"save",saveCommand,1,REDIS_CMD_INLINE},
465 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
466 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
467 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
468 {"type",typeCommand,2,REDIS_CMD_INLINE},
469 {"sync",syncCommand,1,REDIS_CMD_INLINE},
470 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
471 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
472 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
473 {"info",infoCommand,1,REDIS_CMD_INLINE},
474 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
475 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
476 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
477 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
478 {NULL,NULL,0,0}
479 };
480 /*============================ Utility functions ============================ */
481
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Only bytes inside the given lengths are
 * read, so neither buffer has to be NUL terminated.
 *
 * Supported syntax:
 *   '*'      any sequence of bytes, the empty one included
 *   '?'      exactly one byte
 *   [abc]    one byte out of a set; a leading '^' negates the set, a-z is
 *            a range, a backslash takes the next byte of the set literally
 *   \x       the byte x taken literally
 *
 * If 'nocase' is non zero literals, set members and ranges are compared
 * through tolower(); a backslash-escaped byte inside a set is always
 * compared exactly. Range bounds are compared as unsigned bytes.
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern at every remaining position. */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes one byte: nothing to consume means no
             * match, even for a negated set. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back one byte so that the
                     * shared advance after the switch lands exactly on
                     * the end of the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        /* tolower() is only defined for unsigned char
                         * values (and EOF). */
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs one byte of input to compare against. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Input exhausted: trailing '*' still match the empty rest. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
604
605 static void redisLog(int level, const char *fmt, ...) {
606 va_list ap;
607 FILE *fp;
608
609 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
610 if (!fp) return;
611
612 va_start(ap, fmt);
613 if (level >= server.verbosity) {
614 char *c = ".-*";
615 char buf[64];
616 time_t now;
617
618 now = time(NULL);
619 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
620 fprintf(fp,"%s %c ",buf,c[level]);
621 vfprintf(fp, fmt, ap);
622 fprintf(fp,"\n");
623 fflush(fp);
624 }
625 va_end(ap);
626
627 if (server.logfile) fclose(fp);
628 }
629
630 /*====================== Hash table type implementation ==================== */
631
632 /* This is a hash table type that uses the SDS dynamic strings library as
633 * keys and redis objects as values (objects can hold SDS strings,
634 * lists, sets). */
635
636 static int sdsDictKeyCompare(void *privdata, const void *key1,
637 const void *key2)
638 {
639 int l1,l2;
640 DICT_NOTUSED(privdata);
641
642 l1 = sdslen((sds)key1);
643 l2 = sdslen((sds)key2);
644 if (l1 != l2) return 0;
645 return memcmp(key1, key2, l1) == 0;
646 }
647
/* Destructor callback for dictionary keys/values that are redis objects:
 * hands the stored object to decrRefCount(). 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
654
655 static int dictObjKeyCompare(void *privdata, const void *key1,
656 const void *key2)
657 {
658 const robj *o1 = key1, *o2 = key2;
659 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
660 }
661
662 static unsigned int dictObjHash(const void *key) {
663 const robj *o = key;
664 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
665 }
666
667 static int dictEncObjKeyCompare(void *privdata, const void *key1,
668 const void *key2)
669 {
670 const robj *o1 = key1, *o2 = key2;
671
672 if (o1->encoding == REDIS_ENCODING_RAW &&
673 o2->encoding == REDIS_ENCODING_RAW)
674 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
675 else {
676 robj *dec1, *dec2;
677 int cmp;
678
679 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
680 getDecodedObject(o1) : (robj*)o1;
681 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
682 getDecodedObject(o2) : (robj*)o2;
683 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
684 if (dec1 != o1) decrRefCount(dec1);
685 if (dec2 != o2) decrRefCount(dec2);
686 return cmp;
687 }
688 }
689
690 static unsigned int dictEncObjHash(const void *key) {
691 const robj *o = key;
692
693 if (o->encoding == REDIS_ENCODING_RAW)
694 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
695 else {
696 robj *dec = getDecodedObject(o);
697 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
698 decrRefCount(dec);
699 return hash;
700 }
701 }
702
/* Dict type using the encoding-aware hash/compare callbacks. Keys are
 * released through dictRedisObjectDestructor(); there is no value
 * destructor (presumably because sets store no values -- confirm). */
703 static dictType setDictType = {
704 dictEncObjHash, /* hash function */
705 NULL, /* key dup */
706 NULL, /* val dup */
707 dictEncObjKeyCompare, /* key compare */
708 dictRedisObjectDestructor, /* key destructor */
709 NULL /* val destructor */
710 };
711
/* Dict type whose keys are redis objects holding raw sds strings (the
 * callbacks read o->ptr as sds without looking at the encoding). Both keys
 * and values are released through dictRedisObjectDestructor(). */
712 static dictType hashDictType = {
713 dictObjHash, /* hash function */
714 NULL, /* key dup */
715 NULL, /* val dup */
716 dictObjKeyCompare, /* key compare */
717 dictRedisObjectDestructor, /* key destructor */
718 dictRedisObjectDestructor /* val destructor */
719 };
720
721 /* ========================= Random utility functions ======================= */
722
/* Out of memory handler.
 *
 * Redis does not try to recover when an allocation fails: even reporting
 * the error to the client would need heap memory for the send buffers, so
 * the process simply aborts, which also keeps the code simpler to read.
 *
 * msg: text printed before ": Out of memory" on stderr (callers in this
 *      file pass the name of the failing function). The function waits
 *      one second and then calls abort(); it never returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n", msg);
    fflush(stderr);

    sleep(1);
    abort();
}
734
735 /* ====================== Redis server networking stuff ===================== */
736 static void closeTimedoutClients(void) {
737 redisClient *c;
738 listNode *ln;
739 time_t now = time(NULL);
740
741 listRewind(server.clients);
742 while ((ln = listYield(server.clients)) != NULL) {
743 c = listNodeValue(ln);
744 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
745 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
746 (now - c->lastinteraction > server.maxidletime)) {
747 redisLog(REDIS_DEBUG,"Closing idle client");
748 freeClient(c);
749 }
750 }
751 }
752
753 static int htNeedsResize(dict *dict) {
754 long long size, used;
755
756 size = dictSlots(dict);
757 used = dictSize(dict);
758 return (size && used && size > DICT_HT_INITIAL_SIZE &&
759 (used*100/size < REDIS_HT_MINFILL));
760 }
761
762 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
763 * we resize the hash table to save memory */
764 static void tryResizeHashTables(void) {
765 int j;
766
767 for (j = 0; j < server.dbnum; j++) {
768 if (htNeedsResize(server.db[j].dict)) {
769 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
770 dictResize(server.db[j].dict);
771 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
772 }
773 if (htNeedsResize(server.db[j].expires))
774 dictResize(server.db[j].expires);
775 }
776 }
777
778 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
779 int j, loops = server.cronloops++;
780 REDIS_NOTUSED(eventLoop);
781 REDIS_NOTUSED(id);
782 REDIS_NOTUSED(clientData);
783
784 /* Update the global state with the amount of used memory */
785 server.usedmemory = zmalloc_used_memory();
786
787 /* Show some info about non-empty databases */
788 for (j = 0; j < server.dbnum; j++) {
789 long long size, used, vkeys;
790
791 size = dictSlots(server.db[j].dict);
792 used = dictSize(server.db[j].dict);
793 vkeys = dictSize(server.db[j].expires);
794 if (!(loops % 5) && (used || vkeys)) {
795 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
796 /* dictPrintStats(server.dict); */
797 }
798 }
799
800 /* We don't want to resize the hash tables while a bacground saving
801 * is in progress: the saving child is created using fork() that is
802 * implemented with a copy-on-write semantic in most modern systems, so
803 * if we resize the HT while there is the saving child at work actually
804 * a lot of memory movements in the parent will cause a lot of pages
805 * copied. */
806 if (!server.bgsaveinprogress) tryResizeHashTables();
807
808 /* Show information about connected clients */
809 if (!(loops % 5)) {
810 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
811 listLength(server.clients)-listLength(server.slaves),
812 listLength(server.slaves),
813 server.usedmemory,
814 dictSize(server.sharingpool));
815 }
816
817 /* Close connections of timedout clients */
818 if (server.maxidletime && !(loops % 10))
819 closeTimedoutClients();
820
821 /* Check if a background saving in progress terminated */
822 if (server.bgsaveinprogress) {
823 int statloc;
824 if (wait4(-1,&statloc,WNOHANG,NULL)) {
825 int exitcode = WEXITSTATUS(statloc);
826 int bysignal = WIFSIGNALED(statloc);
827
828 if (!bysignal && exitcode == 0) {
829 redisLog(REDIS_NOTICE,
830 "Background saving terminated with success");
831 server.dirty = 0;
832 server.lastsave = time(NULL);
833 } else if (!bysignal && exitcode != 0) {
834 redisLog(REDIS_WARNING, "Background saving error");
835 } else {
836 redisLog(REDIS_WARNING,
837 "Background saving terminated by signal");
838 rdbRemoveTempFile(server.bgsavechildpid);
839 }
840 server.bgsaveinprogress = 0;
841 server.bgsavechildpid = -1;
842 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
843 }
844 } else {
845 /* If there is not a background saving in progress check if
846 * we have to save now */
847 time_t now = time(NULL);
848 for (j = 0; j < server.saveparamslen; j++) {
849 struct saveparam *sp = server.saveparams+j;
850
851 if (server.dirty >= sp->changes &&
852 now-server.lastsave > sp->seconds) {
853 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
854 sp->changes, sp->seconds);
855 rdbSaveBackground(server.dbfilename);
856 break;
857 }
858 }
859 }
860
861 /* Try to expire a few timed out keys */
862 for (j = 0; j < server.dbnum; j++) {
863 redisDb *db = server.db+j;
864 int num = dictSize(db->expires);
865
866 if (num) {
867 time_t now = time(NULL);
868
869 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
870 num = REDIS_EXPIRELOOKUPS_PER_CRON;
871 while (num--) {
872 dictEntry *de;
873 time_t t;
874
875 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
876 t = (time_t) dictGetEntryVal(de);
877 if (now > t) {
878 deleteKey(db,dictGetEntryKey(de));
879 }
880 }
881 }
882 }
883
884 /* Check if we should connect to a MASTER */
885 if (server.replstate == REDIS_REPL_CONNECT) {
886 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
887 if (syncWithMaster() == REDIS_OK) {
888 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
889 }
890 }
891 return 1000;
892 }
893
894 static void createSharedObjects(void) {
895 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
896 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
897 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
898 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
899 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
900 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
901 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
902 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
903 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
904 /* no such key */
905 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
906 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
907 "-ERR Operation against a key holding the wrong kind of value\r\n"));
908 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
909 "-ERR no such key\r\n"));
910 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
911 "-ERR syntax error\r\n"));
912 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
913 "-ERR source and destination objects are the same\r\n"));
914 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
915 "-ERR index out of range\r\n"));
916 shared.space = createObject(REDIS_STRING,sdsnew(" "));
917 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
918 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
919 shared.select0 = createStringObject("select 0\r\n",10);
920 shared.select1 = createStringObject("select 1\r\n",10);
921 shared.select2 = createStringObject("select 2\r\n",10);
922 shared.select3 = createStringObject("select 3\r\n",10);
923 shared.select4 = createStringObject("select 4\r\n",10);
924 shared.select5 = createStringObject("select 5\r\n",10);
925 shared.select6 = createStringObject("select 6\r\n",10);
926 shared.select7 = createStringObject("select 7\r\n",10);
927 shared.select8 = createStringObject("select 8\r\n",10);
928 shared.select9 = createStringObject("select 9\r\n",10);
929 }
930
931 static void appendServerSaveParams(time_t seconds, int changes) {
932 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
933 if (server.saveparams == NULL) oom("appendServerSaveParams");
934 server.saveparams[server.saveparamslen].seconds = seconds;
935 server.saveparams[server.saveparamslen].changes = changes;
936 server.saveparamslen++;
937 }
938
939 static void ResetServerSaveParams() {
940 zfree(server.saveparams);
941 server.saveparams = NULL;
942 server.saveparamslen = 0;
943 }
944
945 static void initServerConfig() {
946 server.dbnum = REDIS_DEFAULT_DBNUM;
947 server.port = REDIS_SERVERPORT;
948 server.verbosity = REDIS_DEBUG;
949 server.maxidletime = REDIS_MAXIDLETIME;
950 server.saveparams = NULL;
951 server.logfile = NULL; /* NULL = log on standard output */
952 server.bindaddr = NULL;
953 server.glueoutputbuf = 1;
954 server.daemonize = 0;
955 server.pidfile = "/var/run/redis.pid";
956 server.dbfilename = "dump.rdb";
957 server.requirepass = NULL;
958 server.shareobjects = 0;
959 server.sharingpoolsize = 1024;
960 server.maxclients = 0;
961 server.maxmemory = 0;
962 ResetServerSaveParams();
963
964 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
965 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
966 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
967 /* Replication related */
968 server.isslave = 0;
969 server.masterhost = NULL;
970 server.masterport = 6379;
971 server.master = NULL;
972 server.replstate = REDIS_REPL_NONE;
973 }
974
975 static void initServer() {
976 int j;
977
978 signal(SIGHUP, SIG_IGN);
979 signal(SIGPIPE, SIG_IGN);
980 setupSigSegvAction();
981
982 server.clients = listCreate();
983 server.slaves = listCreate();
984 server.monitors = listCreate();
985 server.objfreelist = listCreate();
986 createSharedObjects();
987 server.el = aeCreateEventLoop();
988 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
989 server.sharingpool = dictCreate(&setDictType,NULL);
990 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
991 oom("server initialization"); /* Fatal OOM */
992 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
993 if (server.fd == -1) {
994 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
995 exit(1);
996 }
997 for (j = 0; j < server.dbnum; j++) {
998 server.db[j].dict = dictCreate(&hashDictType,NULL);
999 server.db[j].expires = dictCreate(&setDictType,NULL);
1000 server.db[j].id = j;
1001 }
1002 server.cronloops = 0;
1003 server.bgsaveinprogress = 0;
1004 server.bgsavechildpid = -1;
1005 server.lastsave = time(NULL);
1006 server.dirty = 0;
1007 server.usedmemory = 0;
1008 server.stat_numcommands = 0;
1009 server.stat_numconnections = 0;
1010 server.stat_starttime = time(NULL);
1011 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1012 }
1013
1014 /* Empty the whole database */
1015 static long long emptyDb() {
1016 int j;
1017 long long removed = 0;
1018
1019 for (j = 0; j < server.dbnum; j++) {
1020 removed += dictSize(server.db[j].dict);
1021 dictEmpty(server.db[j].dict);
1022 dictEmpty(server.db[j].expires);
1023 }
1024 return removed;
1025 }
1026
/* Translate a "yes"/"no" string (case insensitive) into 1/0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1032
1033 /* I agree, this is a very rudimental way to load a configuration...
1034 will improve later if the config gets more complex */
1035 static void loadServerConfig(char *filename) {
1036 FILE *fp;
1037 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1038 int linenum = 0;
1039 sds line = NULL;
1040
1041 if (filename[0] == '-' && filename[1] == '\0')
1042 fp = stdin;
1043 else {
1044 if ((fp = fopen(filename,"r")) == NULL) {
1045 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1046 exit(1);
1047 }
1048 }
1049
1050 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1051 sds *argv;
1052 int argc, j;
1053
1054 linenum++;
1055 line = sdsnew(buf);
1056 line = sdstrim(line," \t\r\n");
1057
1058 /* Skip comments and blank lines*/
1059 if (line[0] == '#' || line[0] == '\0') {
1060 sdsfree(line);
1061 continue;
1062 }
1063
1064 /* Split into arguments */
1065 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1066 sdstolower(argv[0]);
1067
1068 /* Execute config directives */
1069 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1070 server.maxidletime = atoi(argv[1]);
1071 if (server.maxidletime < 0) {
1072 err = "Invalid timeout value"; goto loaderr;
1073 }
1074 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1075 server.port = atoi(argv[1]);
1076 if (server.port < 1 || server.port > 65535) {
1077 err = "Invalid port"; goto loaderr;
1078 }
1079 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1080 server.bindaddr = zstrdup(argv[1]);
1081 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1082 int seconds = atoi(argv[1]);
1083 int changes = atoi(argv[2]);
1084 if (seconds < 1 || changes < 0) {
1085 err = "Invalid save parameters"; goto loaderr;
1086 }
1087 appendServerSaveParams(seconds,changes);
1088 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1089 if (chdir(argv[1]) == -1) {
1090 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1091 argv[1], strerror(errno));
1092 exit(1);
1093 }
1094 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1095 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1096 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1097 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1098 else {
1099 err = "Invalid log level. Must be one of debug, notice, warning";
1100 goto loaderr;
1101 }
1102 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1103 FILE *logfp;
1104
1105 server.logfile = zstrdup(argv[1]);
1106 if (!strcasecmp(server.logfile,"stdout")) {
1107 zfree(server.logfile);
1108 server.logfile = NULL;
1109 }
1110 if (server.logfile) {
1111 /* Test if we are able to open the file. The server will not
1112 * be able to abort just for this problem later... */
1113 logfp = fopen(server.logfile,"a");
1114 if (logfp == NULL) {
1115 err = sdscatprintf(sdsempty(),
1116 "Can't open the log file: %s", strerror(errno));
1117 goto loaderr;
1118 }
1119 fclose(logfp);
1120 }
1121 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1122 server.dbnum = atoi(argv[1]);
1123 if (server.dbnum < 1) {
1124 err = "Invalid number of databases"; goto loaderr;
1125 }
1126 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1127 server.maxclients = atoi(argv[1]);
1128 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1129 server.maxmemory = strtoll(argv[1], NULL, 10);
1130 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1131 server.masterhost = sdsnew(argv[1]);
1132 server.masterport = atoi(argv[2]);
1133 server.replstate = REDIS_REPL_CONNECT;
1134 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1135 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1136 err = "argument must be 'yes' or 'no'"; goto loaderr;
1137 }
1138 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1139 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1140 err = "argument must be 'yes' or 'no'"; goto loaderr;
1141 }
1142 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1143 server.sharingpoolsize = atoi(argv[1]);
1144 if (server.sharingpoolsize < 1) {
1145 err = "invalid object sharing pool size"; goto loaderr;
1146 }
1147 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1148 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1149 err = "argument must be 'yes' or 'no'"; goto loaderr;
1150 }
1151 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1152 server.requirepass = zstrdup(argv[1]);
1153 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1154 server.pidfile = zstrdup(argv[1]);
1155 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1156 server.dbfilename = zstrdup(argv[1]);
1157 } else {
1158 err = "Bad directive or wrong number of arguments"; goto loaderr;
1159 }
1160 for (j = 0; j < argc; j++)
1161 sdsfree(argv[j]);
1162 zfree(argv);
1163 sdsfree(line);
1164 }
1165 if (fp != stdin) fclose(fp);
1166 return;
1167
1168 loaderr:
1169 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1170 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1171 fprintf(stderr, ">>> '%s'\n", line);
1172 fprintf(stderr, "%s\n", err);
1173 exit(1);
1174 }
1175
1176 static void freeClientArgv(redisClient *c) {
1177 int j;
1178
1179 for (j = 0; j < c->argc; j++)
1180 decrRefCount(c->argv[j]);
1181 for (j = 0; j < c->mbargc; j++)
1182 decrRefCount(c->mbargv[j]);
1183 c->argc = 0;
1184 c->mbargc = 0;
1185 }
1186
1187 static void freeClient(redisClient *c) {
1188 listNode *ln;
1189
1190 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1191 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1192 sdsfree(c->querybuf);
1193 listRelease(c->reply);
1194 freeClientArgv(c);
1195 close(c->fd);
1196 ln = listSearchKey(server.clients,c);
1197 assert(ln != NULL);
1198 listDelNode(server.clients,ln);
1199 if (c->flags & REDIS_SLAVE) {
1200 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1201 close(c->repldbfd);
1202 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1203 ln = listSearchKey(l,c);
1204 assert(ln != NULL);
1205 listDelNode(l,ln);
1206 }
1207 if (c->flags & REDIS_MASTER) {
1208 server.master = NULL;
1209 server.replstate = REDIS_REPL_CONNECT;
1210 }
1211 zfree(c->argv);
1212 zfree(c->mbargv);
1213 zfree(c);
1214 }
1215
1216 static void glueReplyBuffersIfNeeded(redisClient *c) {
1217 int totlen = 0;
1218 listNode *ln;
1219 robj *o;
1220
1221 listRewind(c->reply);
1222 while((ln = listYield(c->reply))) {
1223 o = ln->value;
1224 totlen += sdslen(o->ptr);
1225 /* This optimization makes more sense if we don't have to copy
1226 * too much data */
1227 if (totlen > 1024) return;
1228 }
1229 if (totlen > 0) {
1230 char buf[1024];
1231 int copylen = 0;
1232
1233 listRewind(c->reply);
1234 while((ln = listYield(c->reply))) {
1235 o = ln->value;
1236 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1237 copylen += sdslen(o->ptr);
1238 listDelNode(c->reply,ln);
1239 }
1240 /* Now the output buffer is empty, add the new single element */
1241 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1242 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1243 }
1244 }
1245
1246 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1247 redisClient *c = privdata;
1248 int nwritten = 0, totwritten = 0, objlen;
1249 robj *o;
1250 REDIS_NOTUSED(el);
1251 REDIS_NOTUSED(mask);
1252
1253 if (server.glueoutputbuf && listLength(c->reply) > 1)
1254 glueReplyBuffersIfNeeded(c);
1255 while(listLength(c->reply)) {
1256 o = listNodeValue(listFirst(c->reply));
1257 objlen = sdslen(o->ptr);
1258
1259 if (objlen == 0) {
1260 listDelNode(c->reply,listFirst(c->reply));
1261 continue;
1262 }
1263
1264 if (c->flags & REDIS_MASTER) {
1265 /* Don't reply to a master */
1266 nwritten = objlen - c->sentlen;
1267 } else {
1268 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1269 if (nwritten <= 0) break;
1270 }
1271 c->sentlen += nwritten;
1272 totwritten += nwritten;
1273 /* If we fully sent the object on head go to the next one */
1274 if (c->sentlen == objlen) {
1275 listDelNode(c->reply,listFirst(c->reply));
1276 c->sentlen = 0;
1277 }
1278 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1279 * bytes, in a single threaded server it's a good idea to server
1280 * other clients as well, even if a very large request comes from
1281 * super fast link that is always able to accept data (in real world
1282 * terms think to 'KEYS *' against the loopback interfae) */
1283 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1284 }
1285 if (nwritten == -1) {
1286 if (errno == EAGAIN) {
1287 nwritten = 0;
1288 } else {
1289 redisLog(REDIS_DEBUG,
1290 "Error writing to client: %s", strerror(errno));
1291 freeClient(c);
1292 return;
1293 }
1294 }
1295 if (totwritten > 0) c->lastinteraction = time(NULL);
1296 if (listLength(c->reply) == 0) {
1297 c->sentlen = 0;
1298 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1299 }
1300 }
1301
1302 static struct redisCommand *lookupCommand(char *name) {
1303 int j = 0;
1304 while(cmdTable[j].name != NULL) {
1305 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1306 j++;
1307 }
1308 return NULL;
1309 }
1310
1311 /* resetClient prepare the client to process the next command */
1312 static void resetClient(redisClient *c) {
1313 freeClientArgv(c);
1314 c->bulklen = -1;
1315 c->multibulk = 0;
1316 }
1317
1318 /* If this function gets called we already read a whole
1319 * command, argments are in the client argv/argc fields.
1320 * processCommand() execute the command or prepare the
1321 * server for a bulk read from the client.
1322 *
1323 * If 1 is returned the client is still alive and valid and
1324 * and other operations can be performed by the caller. Otherwise
1325 * if 0 is returned the client was destroied (i.e. after QUIT). */
1326 static int processCommand(redisClient *c) {
1327 struct redisCommand *cmd;
1328 long long dirty;
1329
1330 /* Free some memory if needed (maxmemory setting) */
1331 if (server.maxmemory) freeMemoryIfNeeded();
1332
1333 /* Handle the multi bulk command type. This is an alternative protocol
1334 * supported by Redis in order to receive commands that are composed of
1335 * multiple binary-safe "bulk" arguments. The latency of processing is
1336 * a bit higher but this allows things like multi-sets, so if this
1337 * protocol is used only for MSET and similar commands this is a big win. */
1338 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1339 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1340 if (c->multibulk <= 0) {
1341 resetClient(c);
1342 return 1;
1343 } else {
1344 decrRefCount(c->argv[c->argc-1]);
1345 c->argc--;
1346 return 1;
1347 }
1348 } else if (c->multibulk) {
1349 if (c->bulklen == -1) {
1350 if (((char*)c->argv[0]->ptr)[0] != '$') {
1351 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1352 resetClient(c);
1353 return 1;
1354 } else {
1355 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1356 decrRefCount(c->argv[0]);
1357 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1358 c->argc--;
1359 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1360 resetClient(c);
1361 return 1;
1362 }
1363 c->argc--;
1364 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1365 return 1;
1366 }
1367 } else {
1368 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1369 c->mbargv[c->mbargc] = c->argv[0];
1370 c->mbargc++;
1371 c->argc--;
1372 c->multibulk--;
1373 if (c->multibulk == 0) {
1374 robj **auxargv;
1375 int auxargc;
1376
1377 /* Here we need to swap the multi-bulk argc/argv with the
1378 * normal argc/argv of the client structure. */
1379 auxargv = c->argv;
1380 c->argv = c->mbargv;
1381 c->mbargv = auxargv;
1382
1383 auxargc = c->argc;
1384 c->argc = c->mbargc;
1385 c->mbargc = auxargc;
1386
1387 /* We need to set bulklen to something different than -1
1388 * in order for the code below to process the command without
1389 * to try to read the last argument of a bulk command as
1390 * a special argument. */
1391 c->bulklen = 0;
1392 /* continue below and process the command */
1393 } else {
1394 c->bulklen = -1;
1395 return 1;
1396 }
1397 }
1398 }
1399 /* -- end of multi bulk commands processing -- */
1400
1401 /* The QUIT command is handled as a special case. Normal command
1402 * procs are unable to close the client connection safely */
1403 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1404 freeClient(c);
1405 return 0;
1406 }
1407 cmd = lookupCommand(c->argv[0]->ptr);
1408 if (!cmd) {
1409 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1410 resetClient(c);
1411 return 1;
1412 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1413 (c->argc < -cmd->arity)) {
1414 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1415 resetClient(c);
1416 return 1;
1417 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1418 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1419 resetClient(c);
1420 return 1;
1421 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1422 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1423
1424 decrRefCount(c->argv[c->argc-1]);
1425 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1426 c->argc--;
1427 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1428 resetClient(c);
1429 return 1;
1430 }
1431 c->argc--;
1432 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1433 /* It is possible that the bulk read is already in the
1434 * buffer. Check this condition and handle it accordingly.
1435 * This is just a fast path, alternative to call processInputBuffer().
1436 * It's a good idea since the code is small and this condition
1437 * happens most of the times. */
1438 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1439 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1440 c->argc++;
1441 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1442 } else {
1443 return 1;
1444 }
1445 }
1446 /* Let's try to share objects on the command arguments vector */
1447 if (server.shareobjects) {
1448 int j;
1449 for(j = 1; j < c->argc; j++)
1450 c->argv[j] = tryObjectSharing(c->argv[j]);
1451 }
1452 /* Let's try to encode the bulk object to save space. */
1453 if (cmd->flags & REDIS_CMD_BULK)
1454 tryObjectEncoding(c->argv[c->argc-1]);
1455
1456 /* Check if the user is authenticated */
1457 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1458 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1459 resetClient(c);
1460 return 1;
1461 }
1462
1463 /* Exec the command */
1464 dirty = server.dirty;
1465 cmd->proc(c);
1466 if (server.dirty-dirty != 0 && listLength(server.slaves))
1467 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1468 if (listLength(server.monitors))
1469 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1470 server.stat_numcommands++;
1471
1472 /* Prepare the client for the next command */
1473 if (c->flags & REDIS_CLOSE) {
1474 freeClient(c);
1475 return 0;
1476 }
1477 resetClient(c);
1478 return 1;
1479 }
1480
1481 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1482 listNode *ln;
1483 int outc = 0, j;
1484 robj **outv;
1485 /* (args*2)+1 is enough room for args, spaces, newlines */
1486 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1487
1488 if (argc <= REDIS_STATIC_ARGS) {
1489 outv = static_outv;
1490 } else {
1491 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1492 if (!outv) oom("replicationFeedSlaves");
1493 }
1494
1495 for (j = 0; j < argc; j++) {
1496 if (j != 0) outv[outc++] = shared.space;
1497 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1498 robj *lenobj;
1499
1500 lenobj = createObject(REDIS_STRING,
1501 sdscatprintf(sdsempty(),"%d\r\n",
1502 stringObjectLen(argv[j])));
1503 lenobj->refcount = 0;
1504 outv[outc++] = lenobj;
1505 }
1506 outv[outc++] = argv[j];
1507 }
1508 outv[outc++] = shared.crlf;
1509
1510 /* Increment all the refcounts at start and decrement at end in order to
1511 * be sure to free objects if there is no slave in a replication state
1512 * able to be feed with commands */
1513 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1514 listRewind(slaves);
1515 while((ln = listYield(slaves))) {
1516 redisClient *slave = ln->value;
1517
1518 /* Don't feed slaves that are still waiting for BGSAVE to start */
1519 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1520
1521 /* Feed all the other slaves, MONITORs and so on */
1522 if (slave->slaveseldb != dictid) {
1523 robj *selectcmd;
1524
1525 switch(dictid) {
1526 case 0: selectcmd = shared.select0; break;
1527 case 1: selectcmd = shared.select1; break;
1528 case 2: selectcmd = shared.select2; break;
1529 case 3: selectcmd = shared.select3; break;
1530 case 4: selectcmd = shared.select4; break;
1531 case 5: selectcmd = shared.select5; break;
1532 case 6: selectcmd = shared.select6; break;
1533 case 7: selectcmd = shared.select7; break;
1534 case 8: selectcmd = shared.select8; break;
1535 case 9: selectcmd = shared.select9; break;
1536 default:
1537 selectcmd = createObject(REDIS_STRING,
1538 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1539 selectcmd->refcount = 0;
1540 break;
1541 }
1542 addReply(slave,selectcmd);
1543 slave->slaveseldb = dictid;
1544 }
1545 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1546 }
1547 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1548 if (outv != static_outv) zfree(outv);
1549 }
1550
1551 static void processInputBuffer(redisClient *c) {
1552 again:
1553 if (c->bulklen == -1) {
1554 /* Read the first line of the query */
1555 char *p = strchr(c->querybuf,'\n');
1556 size_t querylen;
1557
1558 if (p) {
1559 sds query, *argv;
1560 int argc, j;
1561
1562 query = c->querybuf;
1563 c->querybuf = sdsempty();
1564 querylen = 1+(p-(query));
1565 if (sdslen(query) > querylen) {
1566 /* leave data after the first line of the query in the buffer */
1567 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1568 }
1569 *p = '\0'; /* remove "\n" */
1570 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1571 sdsupdatelen(query);
1572
1573 /* Now we can split the query in arguments */
1574 if (sdslen(query) == 0) {
1575 /* Ignore empty query */
1576 sdsfree(query);
1577 return;
1578 }
1579 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1580 if (argv == NULL) oom("sdssplitlen");
1581 sdsfree(query);
1582
1583 if (c->argv) zfree(c->argv);
1584 c->argv = zmalloc(sizeof(robj*)*argc);
1585 if (c->argv == NULL) oom("allocating arguments list for client");
1586
1587 for (j = 0; j < argc; j++) {
1588 if (sdslen(argv[j])) {
1589 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1590 c->argc++;
1591 } else {
1592 sdsfree(argv[j]);
1593 }
1594 }
1595 zfree(argv);
1596 /* Execute the command. If the client is still valid
1597 * after processCommand() return and there is something
1598 * on the query buffer try to process the next command. */
1599 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1600 return;
1601 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1602 redisLog(REDIS_DEBUG, "Client protocol error");
1603 freeClient(c);
1604 return;
1605 }
1606 } else {
1607 /* Bulk read handling. Note that if we are at this point
1608 the client already sent a command terminated with a newline,
1609 we are reading the bulk data that is actually the last
1610 argument of the command. */
1611 int qbl = sdslen(c->querybuf);
1612
1613 if (c->bulklen <= qbl) {
1614 /* Copy everything but the final CRLF as final argument */
1615 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1616 c->argc++;
1617 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1618 /* Process the command. If the client is still valid after
1619 * the processing and there is more data in the buffer
1620 * try to parse it. */
1621 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1622 return;
1623 }
1624 }
1625 }
1626
1627 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1628 redisClient *c = (redisClient*) privdata;
1629 char buf[REDIS_IOBUF_LEN];
1630 int nread;
1631 REDIS_NOTUSED(el);
1632 REDIS_NOTUSED(mask);
1633
1634 nread = read(fd, buf, REDIS_IOBUF_LEN);
1635 if (nread == -1) {
1636 if (errno == EAGAIN) {
1637 nread = 0;
1638 } else {
1639 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1640 freeClient(c);
1641 return;
1642 }
1643 } else if (nread == 0) {
1644 redisLog(REDIS_DEBUG, "Client closed connection");
1645 freeClient(c);
1646 return;
1647 }
1648 if (nread) {
1649 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1650 c->lastinteraction = time(NULL);
1651 } else {
1652 return;
1653 }
1654 processInputBuffer(c);
1655 }
1656
1657 static int selectDb(redisClient *c, int id) {
1658 if (id < 0 || id >= server.dbnum)
1659 return REDIS_ERR;
1660 c->db = &server.db[id];
1661 return REDIS_OK;
1662 }
1663
1664 static void *dupClientReplyValue(void *o) {
1665 incrRefCount((robj*)o);
1666 return 0;
1667 }
1668
1669 static redisClient *createClient(int fd) {
1670 redisClient *c = zmalloc(sizeof(*c));
1671
1672 anetNonBlock(NULL,fd);
1673 anetTcpNoDelay(NULL,fd);
1674 if (!c) return NULL;
1675 selectDb(c,0);
1676 c->fd = fd;
1677 c->querybuf = sdsempty();
1678 c->argc = 0;
1679 c->argv = NULL;
1680 c->bulklen = -1;
1681 c->multibulk = 0;
1682 c->mbargc = 0;
1683 c->mbargv = NULL;
1684 c->sentlen = 0;
1685 c->flags = 0;
1686 c->lastinteraction = time(NULL);
1687 c->authenticated = 0;
1688 c->replstate = REDIS_REPL_NONE;
1689 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1690 listSetFreeMethod(c->reply,decrRefCount);
1691 listSetDupMethod(c->reply,dupClientReplyValue);
1692 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1693 readQueryFromClient, c, NULL) == AE_ERR) {
1694 freeClient(c);
1695 return NULL;
1696 }
1697 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1698 return c;
1699 }
1700
1701 static void addReply(redisClient *c, robj *obj) {
1702 if (listLength(c->reply) == 0 &&
1703 (c->replstate == REDIS_REPL_NONE ||
1704 c->replstate == REDIS_REPL_ONLINE) &&
1705 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1706 sendReplyToClient, c, NULL) == AE_ERR) return;
1707 if (obj->encoding != REDIS_ENCODING_RAW) {
1708 obj = getDecodedObject(obj);
1709 } else {
1710 incrRefCount(obj);
1711 }
1712 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1713 }
1714
1715 static void addReplySds(redisClient *c, sds s) {
1716 robj *o = createObject(REDIS_STRING,s);
1717 addReply(c,o);
1718 decrRefCount(o);
1719 }
1720
1721 static void addReplyBulkLen(redisClient *c, robj *obj) {
1722 size_t len;
1723
1724 if (obj->encoding == REDIS_ENCODING_RAW) {
1725 len = sdslen(obj->ptr);
1726 } else {
1727 long n = (long)obj->ptr;
1728
1729 len = 1;
1730 if (n < 0) {
1731 len++;
1732 n = -n;
1733 }
1734 while((n = n/10) != 0) {
1735 len++;
1736 }
1737 }
1738 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1739 }
1740
1741 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1742 int cport, cfd;
1743 char cip[128];
1744 redisClient *c;
1745 REDIS_NOTUSED(el);
1746 REDIS_NOTUSED(mask);
1747 REDIS_NOTUSED(privdata);
1748
1749 cfd = anetAccept(server.neterr, fd, cip, &cport);
1750 if (cfd == AE_ERR) {
1751 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1752 return;
1753 }
1754 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1755 if ((c = createClient(cfd)) == NULL) {
1756 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1757 close(cfd); /* May be already closed, just ingore errors */
1758 return;
1759 }
1760 /* If maxclient directive is set and this is one client more... close the
1761 * connection. Note that we create the client instead to check before
1762 * for this condition, since now the socket is already set in nonblocking
1763 * mode and we can send an error for free using the Kernel I/O */
1764 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1765 char *err = "-ERR max number of clients reached\r\n";
1766
1767 /* That's a best effort error message, don't check write errors */
1768 (void) write(c->fd,err,strlen(err));
1769 freeClient(c);
1770 return;
1771 }
1772 server.stat_numconnections++;
1773 }
1774
1775 /* ======================= Redis objects implementation ===================== */
1776
1777 static robj *createObject(int type, void *ptr) {
1778 robj *o;
1779
1780 if (listLength(server.objfreelist)) {
1781 listNode *head = listFirst(server.objfreelist);
1782 o = listNodeValue(head);
1783 listDelNode(server.objfreelist,head);
1784 } else {
1785 o = zmalloc(sizeof(*o));
1786 }
1787 if (!o) oom("createObject");
1788 o->type = type;
1789 o->encoding = REDIS_ENCODING_RAW;
1790 o->ptr = ptr;
1791 o->refcount = 1;
1792 return o;
1793 }
1794
1795 static robj *createStringObject(char *ptr, size_t len) {
1796 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1797 }
1798
1799 static robj *createListObject(void) {
1800 list *l = listCreate();
1801
1802 if (!l) oom("listCreate");
1803 listSetFreeMethod(l,decrRefCount);
1804 return createObject(REDIS_LIST,l);
1805 }
1806
1807 static robj *createSetObject(void) {
1808 dict *d = dictCreate(&setDictType,NULL);
1809 if (!d) oom("dictCreate");
1810 return createObject(REDIS_SET,d);
1811 }
1812
1813 static void freeStringObject(robj *o) {
1814 if (o->encoding == REDIS_ENCODING_RAW) {
1815 sdsfree(o->ptr);
1816 }
1817 }
1818
1819 static void freeListObject(robj *o) {
1820 listRelease((list*) o->ptr);
1821 }
1822
1823 static void freeSetObject(robj *o) {
1824 dictRelease((dict*) o->ptr);
1825 }
1826
1827 static void freeHashObject(robj *o) {
1828 dictRelease((dict*) o->ptr);
1829 }
1830
1831 static void incrRefCount(robj *o) {
1832 o->refcount++;
1833 #ifdef DEBUG_REFCOUNT
1834 if (o->type == REDIS_STRING)
1835 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1836 #endif
1837 }
1838
1839 static void decrRefCount(void *obj) {
1840 robj *o = obj;
1841
1842 #ifdef DEBUG_REFCOUNT
1843 if (o->type == REDIS_STRING)
1844 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1845 #endif
1846 if (--(o->refcount) == 0) {
1847 switch(o->type) {
1848 case REDIS_STRING: freeStringObject(o); break;
1849 case REDIS_LIST: freeListObject(o); break;
1850 case REDIS_SET: freeSetObject(o); break;
1851 case REDIS_HASH: freeHashObject(o); break;
1852 default: assert(0 != 0); break;
1853 }
1854 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1855 !listAddNodeHead(server.objfreelist,o))
1856 zfree(o);
1857 }
1858 }
1859
1860 static robj *lookupKey(redisDb *db, robj *key) {
1861 dictEntry *de = dictFind(db->dict,key);
1862 return de ? dictGetEntryVal(de) : NULL;
1863 }
1864
1865 static robj *lookupKeyRead(redisDb *db, robj *key) {
1866 expireIfNeeded(db,key);
1867 return lookupKey(db,key);
1868 }
1869
1870 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1871 deleteIfVolatile(db,key);
1872 return lookupKey(db,key);
1873 }
1874
1875 static int deleteKey(redisDb *db, robj *key) {
1876 int retval;
1877
1878 /* We need to protect key from destruction: after the first dictDelete()
1879 * it may happen that 'key' is no longer valid if we don't increment
1880 * it's count. This may happen when we get the object reference directly
1881 * from the hash table with dictRandomKey() or dict iterators */
1882 incrRefCount(key);
1883 if (dictSize(db->expires)) dictDelete(db->expires,key);
1884 retval = dictDelete(db->dict,key);
1885 decrRefCount(key);
1886
1887 return retval == DICT_OK;
1888 }
1889
1890 /* Try to share an object against the shared objects pool */
1891 static robj *tryObjectSharing(robj *o) {
1892 struct dictEntry *de;
1893 unsigned long c;
1894
1895 if (o == NULL || server.shareobjects == 0) return o;
1896
1897 assert(o->type == REDIS_STRING);
1898 de = dictFind(server.sharingpool,o);
1899 if (de) {
1900 robj *shared = dictGetEntryKey(de);
1901
1902 c = ((unsigned long) dictGetEntryVal(de))+1;
1903 dictGetEntryVal(de) = (void*) c;
1904 incrRefCount(shared);
1905 decrRefCount(o);
1906 return shared;
1907 } else {
1908 /* Here we are using a stream algorihtm: Every time an object is
1909 * shared we increment its count, everytime there is a miss we
1910 * recrement the counter of a random object. If this object reaches
1911 * zero we remove the object and put the current object instead. */
1912 if (dictSize(server.sharingpool) >=
1913 server.sharingpoolsize) {
1914 de = dictGetRandomKey(server.sharingpool);
1915 assert(de != NULL);
1916 c = ((unsigned long) dictGetEntryVal(de))-1;
1917 dictGetEntryVal(de) = (void*) c;
1918 if (c == 0) {
1919 dictDelete(server.sharingpool,de->key);
1920 }
1921 } else {
1922 c = 0; /* If the pool is empty we want to add this object */
1923 }
1924 if (c == 0) {
1925 int retval;
1926
1927 retval = dictAdd(server.sharingpool,o,(void*)1);
1928 assert(retval == DICT_OK);
1929 incrRefCount(o);
1930 }
1931 return o;
1932 }
1933 }
1934
1935 /* Check if the nul-terminated string 's' can be represented by a long
1936 * (that is, is a number that fits into long without any other space or
1937 * character before or after the digits).
1938 *
1939 * If so, the function returns REDIS_OK and *longval is set to the value
1940 * of the number. Otherwise REDIS_ERR is returned */
1941 static int isStringRepresentableAsLong(sds s, long *longval) {
1942 char buf[32], *endptr;
1943 long value;
1944 int slen;
1945
1946 value = strtol(s, &endptr, 10);
1947 if (endptr[0] != '\0') return REDIS_ERR;
1948 slen = snprintf(buf,32,"%ld",value);
1949
1950 /* If the number converted back into a string is not identical
1951 * then it's not possible to encode the string as integer */
1952 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
1953 if (longval) *longval = value;
1954 return REDIS_OK;
1955 }
1956
1957 /* Try to encode a string object in order to save space */
1958 static int tryObjectEncoding(robj *o) {
1959 long value;
1960 sds s = o->ptr;
1961
1962 if (o->encoding != REDIS_ENCODING_RAW)
1963 return REDIS_ERR; /* Already encoded */
1964
1965 /* It's not save to encode shared objects: shared objects can be shared
1966 * everywhere in the "object space" of Redis. Encoded objects can only
1967 * appear as "values" (and not, for instance, as keys) */
1968 if (o->refcount > 1) return REDIS_ERR;
1969
1970 /* Currently we try to encode only strings */
1971 assert(o->type == REDIS_STRING);
1972
1973 /* Check if we can represent this string as a long integer */
1974 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
1975
1976 /* Ok, this object can be encoded */
1977 o->encoding = REDIS_ENCODING_INT;
1978 sdsfree(o->ptr);
1979 o->ptr = (void*) value;
1980 return REDIS_OK;
1981 }
1982
1983 /* Get a decoded version of an encoded object (returned as a new object) */
1984 static robj *getDecodedObject(const robj *o) {
1985 robj *dec;
1986
1987 assert(o->encoding != REDIS_ENCODING_RAW);
1988 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
1989 char buf[32];
1990
1991 snprintf(buf,32,"%ld",(long)o->ptr);
1992 dec = createStringObject(buf,strlen(buf));
1993 return dec;
1994 } else {
1995 assert(1 != 1);
1996 }
1997 }
1998
1999 static int compareStringObjects(robj *a, robj *b) {
2000 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2001
2002 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2003 return (long)a->ptr - (long)b->ptr;
2004 } else {
2005 int retval;
2006
2007 incrRefCount(a);
2008 incrRefCount(b);
2009 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2010 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2011 retval = sdscmp(a->ptr,b->ptr);
2012 decrRefCount(a);
2013 decrRefCount(b);
2014 return retval;
2015 }
2016 }
2017
2018 static size_t stringObjectLen(robj *o) {
2019 assert(o->type == REDIS_STRING);
2020 if (o->encoding == REDIS_ENCODING_RAW) {
2021 return sdslen(o->ptr);
2022 } else {
2023 char buf[32];
2024
2025 return snprintf(buf,32,"%ld",(long)o->ptr);
2026 }
2027 }
2028
2029 /*============================ DB saving/loading ============================ */
2030
/* Write the one byte 'type' to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2035
/* Write the time 't' to 'fp' as a 32 bit signed integer in host byte
 * order. Returns 0 on success, -1 if the value could not be written. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;
    size_t written = fwrite(&stamp,4,1,fp);

    return (written == 0) ? -1 : 0;
}
2041
2042 /* check rdbLoadLen() comments for more info */
2043 static int rdbSaveLen(FILE *fp, uint32_t len) {
2044 unsigned char buf[2];
2045
2046 if (len < (1<<6)) {
2047 /* Save a 6 bit len */
2048 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2049 if (fwrite(buf,1,1,fp) == 0) return -1;
2050 } else if (len < (1<<14)) {
2051 /* Save a 14 bit len */
2052 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2053 buf[1] = len&0xFF;
2054 if (fwrite(buf,2,1,fp) == 0) return -1;
2055 } else {
2056 /* Save a 32 bit len */
2057 buf[0] = (REDIS_RDB_32BITLEN<<6);
2058 if (fwrite(buf,1,1,fp) == 0) return -1;
2059 len = htonl(len);
2060 if (fwrite(&len,4,1,fp) == 0) return -1;
2061 }
2062 return 0;
2063 }
2064
2065 /* String objects in the form "2391" "-100" without any space and with a
2066 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2067 * encoded as integers to save space */
2068 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2069 long long value;
2070 char *endptr, buf[32];
2071
2072 /* Check if it's possible to encode this value as a number */
2073 value = strtoll(s, &endptr, 10);
2074 if (endptr[0] != '\0') return 0;
2075 snprintf(buf,32,"%lld",value);
2076
2077 /* If the number converted back into a string is not identical
2078 * then it's not possible to encode the string as integer */
2079 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2080
2081 /* Finally check if it fits in our ranges */
2082 if (value >= -(1<<7) && value <= (1<<7)-1) {
2083 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2084 enc[1] = value&0xFF;
2085 return 2;
2086 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2087 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2088 enc[1] = value&0xFF;
2089 enc[2] = (value>>8)&0xFF;
2090 return 3;
2091 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2092 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2093 enc[1] = value&0xFF;
2094 enc[2] = (value>>8)&0xFF;
2095 enc[3] = (value>>16)&0xFF;
2096 enc[4] = (value>>24)&0xFF;
2097 return 5;
2098 } else {
2099 return 0;
2100 }
2101 }
2102
2103 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2104 unsigned int comprlen, outlen;
2105 unsigned char byte;
2106 void *out;
2107
2108 /* We require at least four bytes compression for this to be worth it */
2109 outlen = sdslen(obj->ptr)-4;
2110 if (outlen <= 0) return 0;
2111 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2112 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2113 if (comprlen == 0) {
2114 zfree(out);
2115 return 0;
2116 }
2117 /* Data compressed! Let's save it on disk */
2118 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2119 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2120 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2121 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2122 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2123 zfree(out);
2124 return comprlen;
2125
2126 writeerr:
2127 zfree(out);
2128 return -1;
2129 }
2130
2131 /* Save a string objet as [len][data] on disk. If the object is a string
2132 * representation of an integer value we try to safe it in a special form */
2133 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2134 size_t len;
2135 int enclen;
2136
2137 len = sdslen(obj->ptr);
2138
2139 /* Try integer encoding */
2140 if (len <= 11) {
2141 unsigned char buf[5];
2142 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2143 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2144 return 0;
2145 }
2146 }
2147
2148 /* Try LZF compression - under 20 bytes it's unable to compress even
2149 * aaaaaaaaaaaaaaaaaa so skip it */
2150 if (len > 20) {
2151 int retval;
2152
2153 retval = rdbSaveLzfStringObject(fp,obj);
2154 if (retval == -1) return -1;
2155 if (retval > 0) return 0;
2156 /* retval == 0 means data can't be compressed, save the old way */
2157 }
2158
2159 /* Store verbatim */
2160 if (rdbSaveLen(fp,len) == -1) return -1;
2161 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2162 return 0;
2163 }
2164
2165 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2166 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2167 int retval;
2168 robj *dec;
2169
2170 if (obj->encoding != REDIS_ENCODING_RAW) {
2171 dec = getDecodedObject(obj);
2172 retval = rdbSaveStringObjectRaw(fp,dec);
2173 decrRefCount(dec);
2174 return retval;
2175 } else {
2176 return rdbSaveStringObjectRaw(fp,obj);
2177 }
2178 }
2179
2180 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2181 static int rdbSave(char *filename) {
2182 dictIterator *di = NULL;
2183 dictEntry *de;
2184 FILE *fp;
2185 char tmpfile[256];
2186 int j;
2187 time_t now = time(NULL);
2188
2189 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2190 fp = fopen(tmpfile,"w");
2191 if (!fp) {
2192 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2193 return REDIS_ERR;
2194 }
2195 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2196 for (j = 0; j < server.dbnum; j++) {
2197 redisDb *db = server.db+j;
2198 dict *d = db->dict;
2199 if (dictSize(d) == 0) continue;
2200 di = dictGetIterator(d);
2201 if (!di) {
2202 fclose(fp);
2203 return REDIS_ERR;
2204 }
2205
2206 /* Write the SELECT DB opcode */
2207 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2208 if (rdbSaveLen(fp,j) == -1) goto werr;
2209
2210 /* Iterate this DB writing every entry */
2211 while((de = dictNext(di)) != NULL) {
2212 robj *key = dictGetEntryKey(de);
2213 robj *o = dictGetEntryVal(de);
2214 time_t expiretime = getExpire(db,key);
2215
2216 /* Save the expire time */
2217 if (expiretime != -1) {
2218 /* If this key is already expired skip it */
2219 if (expiretime < now) continue;
2220 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2221 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2222 }
2223 /* Save the key and associated value */
2224 if (rdbSaveType(fp,o->type) == -1) goto werr;
2225 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2226 if (o->type == REDIS_STRING) {
2227 /* Save a string value */
2228 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2229 } else if (o->type == REDIS_LIST) {
2230 /* Save a list value */
2231 list *list = o->ptr;
2232 listNode *ln;
2233
2234 listRewind(list);
2235 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2236 while((ln = listYield(list))) {
2237 robj *eleobj = listNodeValue(ln);
2238
2239 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2240 }
2241 } else if (o->type == REDIS_SET) {
2242 /* Save a set value */
2243 dict *set = o->ptr;
2244 dictIterator *di = dictGetIterator(set);
2245 dictEntry *de;
2246
2247 if (!set) oom("dictGetIteraotr");
2248 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2249 while((de = dictNext(di)) != NULL) {
2250 robj *eleobj = dictGetEntryKey(de);
2251
2252 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2253 }
2254 dictReleaseIterator(di);
2255 } else {
2256 assert(0 != 0);
2257 }
2258 }
2259 dictReleaseIterator(di);
2260 }
2261 /* EOF opcode */
2262 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2263
2264 /* Make sure data will not remain on the OS's output buffers */
2265 fflush(fp);
2266 fsync(fileno(fp));
2267 fclose(fp);
2268
2269 /* Use RENAME to make sure the DB file is changed atomically only
2270 * if the generate DB file is ok. */
2271 if (rename(tmpfile,filename) == -1) {
2272 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2273 unlink(tmpfile);
2274 return REDIS_ERR;
2275 }
2276 redisLog(REDIS_NOTICE,"DB saved on disk");
2277 server.dirty = 0;
2278 server.lastsave = time(NULL);
2279 return REDIS_OK;
2280
2281 werr:
2282 fclose(fp);
2283 unlink(tmpfile);
2284 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2285 if (di) dictReleaseIterator(di);
2286 return REDIS_ERR;
2287 }
2288
2289 static int rdbSaveBackground(char *filename) {
2290 pid_t childpid;
2291
2292 if (server.bgsaveinprogress) return REDIS_ERR;
2293 if ((childpid = fork()) == 0) {
2294 /* Child */
2295 close(server.fd);
2296 if (rdbSave(filename) == REDIS_OK) {
2297 exit(0);
2298 } else {
2299 exit(1);
2300 }
2301 } else {
2302 /* Parent */
2303 if (childpid == -1) {
2304 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2305 strerror(errno));
2306 return REDIS_ERR;
2307 }
2308 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2309 server.bgsaveinprogress = 1;
2310 server.bgsavechildpid = childpid;
2311 return REDIS_OK;
2312 }
2313 return REDIS_OK; /* unreached */
2314 }
2315
/* Remove the temporary dump file named after the process id 'childpid'
 * ("temp-<pid>.rdb"). The result of unlink() is ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];
    int id = (int) childpid;

    snprintf(path,256,"temp-%d.rdb", id);
    unlink(path);
}
2322
/* Read one byte from 'fp' and return it as a value in the range 0-255.
 * Returns -1 if the byte can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;
    size_t nread = fread(&byte,1,1,fp);

    return (nread == 0) ? -1 : byte;
}
2328
/* Read a time stored as a 32 bit signed integer in host byte order.
 * Returns -1 if the four bytes can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;
    size_t nread = fread(&stamp,4,1,fp);

    if (nread == 0) return -1;
    return (time_t) stamp;
}
2334
2335 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2336 * of this file for a description of how this are stored on disk.
2337 *
2338 * isencoded is set to 1 if the readed length is not actually a length but
2339 * an "encoding type", check the above comments for more info */
2340 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2341 unsigned char buf[2];
2342 uint32_t len;
2343
2344 if (isencoded) *isencoded = 0;
2345 if (rdbver == 0) {
2346 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2347 return ntohl(len);
2348 } else {
2349 int type;
2350
2351 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2352 type = (buf[0]&0xC0)>>6;
2353 if (type == REDIS_RDB_6BITLEN) {
2354 /* Read a 6 bit len */
2355 return buf[0]&0x3F;
2356 } else if (type == REDIS_RDB_ENCVAL) {
2357 /* Read a 6 bit len encoding type */
2358 if (isencoded) *isencoded = 1;
2359 return buf[0]&0x3F;
2360 } else if (type == REDIS_RDB_14BITLEN) {
2361 /* Read a 14 bit len */
2362 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2363 return ((buf[0]&0x3F)<<8)|buf[1];
2364 } else {
2365 /* Read a 32 bit len */
2366 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2367 return ntohl(len);
2368 }
2369 }
2370 }
2371
2372 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2373 unsigned char enc[4];
2374 long long val;
2375
2376 if (enctype == REDIS_RDB_ENC_INT8) {
2377 if (fread(enc,1,1,fp) == 0) return NULL;
2378 val = (signed char)enc[0];
2379 } else if (enctype == REDIS_RDB_ENC_INT16) {
2380 uint16_t v;
2381 if (fread(enc,2,1,fp) == 0) return NULL;
2382 v = enc[0]|(enc[1]<<8);
2383 val = (int16_t)v;
2384 } else if (enctype == REDIS_RDB_ENC_INT32) {
2385 uint32_t v;
2386 if (fread(enc,4,1,fp) == 0) return NULL;
2387 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2388 val = (int32_t)v;
2389 } else {
2390 val = 0; /* anti-warning */
2391 assert(0!=0);
2392 }
2393 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2394 }
2395
2396 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2397 unsigned int len, clen;
2398 unsigned char *c = NULL;
2399 sds val = NULL;
2400
2401 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2402 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2403 if ((c = zmalloc(clen)) == NULL) goto err;
2404 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2405 if (fread(c,clen,1,fp) == 0) goto err;
2406 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2407 zfree(c);
2408 return createObject(REDIS_STRING,val);
2409 err:
2410 zfree(c);
2411 sdsfree(val);
2412 return NULL;
2413 }
2414
2415 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2416 int isencoded;
2417 uint32_t len;
2418 sds val;
2419
2420 len = rdbLoadLen(fp,rdbver,&isencoded);
2421 if (isencoded) {
2422 switch(len) {
2423 case REDIS_RDB_ENC_INT8:
2424 case REDIS_RDB_ENC_INT16:
2425 case REDIS_RDB_ENC_INT32:
2426 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2427 case REDIS_RDB_ENC_LZF:
2428 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2429 default:
2430 assert(0!=0);
2431 }
2432 }
2433
2434 if (len == REDIS_RDB_LENERR) return NULL;
2435 val = sdsnewlen(NULL,len);
2436 if (len && fread(val,len,1,fp) == 0) {
2437 sdsfree(val);
2438 return NULL;
2439 }
2440 return tryObjectSharing(createObject(REDIS_STRING,val));
2441 }
2442
2443 static int rdbLoad(char *filename) {
2444 FILE *fp;
2445 robj *keyobj = NULL;
2446 uint32_t dbid;
2447 int type, retval, rdbver;
2448 dict *d = server.db[0].dict;
2449 redisDb *db = server.db+0;
2450 char buf[1024];
2451 time_t expiretime = -1, now = time(NULL);
2452
2453 fp = fopen(filename,"r");
2454 if (!fp) return REDIS_ERR;
2455 if (fread(buf,9,1,fp) == 0) goto eoferr;
2456 buf[9] = '\0';
2457 if (memcmp(buf,"REDIS",5) != 0) {
2458 fclose(fp);
2459 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2460 return REDIS_ERR;
2461 }
2462 rdbver = atoi(buf+5);
2463 if (rdbver > 1) {
2464 fclose(fp);
2465 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2466 return REDIS_ERR;
2467 }
2468 while(1) {
2469 robj *o;
2470
2471 /* Read type. */
2472 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2473 if (type == REDIS_EXPIRETIME) {
2474 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2475 /* We read the time so we need to read the object type again */
2476 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2477 }
2478 if (type == REDIS_EOF) break;
2479 /* Handle SELECT DB opcode as a special case */
2480 if (type == REDIS_SELECTDB) {
2481 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2482 goto eoferr;
2483 if (dbid >= (unsigned)server.dbnum) {
2484 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2485 exit(1);
2486 }
2487 db = server.db+dbid;
2488 d = db->dict;
2489 continue;
2490 }
2491 /* Read key */
2492 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2493
2494 if (type == REDIS_STRING) {
2495 /* Read string value */
2496 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2497 tryObjectEncoding(o);
2498 } else if (type == REDIS_LIST || type == REDIS_SET) {
2499 /* Read list/set value */
2500 uint32_t listlen;
2501
2502 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2503 goto eoferr;
2504 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2505 /* Load every single element of the list/set */
2506 while(listlen--) {
2507 robj *ele;
2508
2509 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2510 tryObjectEncoding(ele);
2511 if (type == REDIS_LIST) {
2512 if (!listAddNodeTail((list*)o->ptr,ele))
2513 oom("listAddNodeTail");
2514 } else {
2515 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2516 oom("dictAdd");
2517 }
2518 }
2519 } else {
2520 assert(0 != 0);
2521 }
2522 /* Add the new object in the hash table */
2523 retval = dictAdd(d,keyobj,o);
2524 if (retval == DICT_ERR) {
2525 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2526 exit(1);
2527 }
2528 /* Set the expire time if needed */
2529 if (expiretime != -1) {
2530 setExpire(db,keyobj,expiretime);
2531 /* Delete this key if already expired */
2532 if (expiretime < now) deleteKey(db,keyobj);
2533 expiretime = -1;
2534 }
2535 keyobj = o = NULL;
2536 }
2537 fclose(fp);
2538 return REDIS_OK;
2539
2540 eoferr: /* unexpected end of file is handled here with a fatal exit */
2541 if (keyobj) decrRefCount(keyobj);
2542 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2543 exit(1);
2544 return REDIS_ERR; /* Just to avoid warning */
2545 }
2546
2547 /*================================== Commands =============================== */
2548
2549 static void authCommand(redisClient *c) {
2550 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2551 c->authenticated = 1;
2552 addReply(c,shared.ok);
2553 } else {
2554 c->authenticated = 0;
2555 addReply(c,shared.err);
2556 }
2557 }
2558
2559 static void pingCommand(redisClient *c) {
2560 addReply(c,shared.pong);
2561 }
2562
2563 static void echoCommand(redisClient *c) {
2564 addReplyBulkLen(c,c->argv[1]);
2565 addReply(c,c->argv[1]);
2566 addReply(c,shared.crlf);
2567 }
2568
2569 /*=================================== Strings =============================== */
2570
2571 static void setGenericCommand(redisClient *c, int nx) {
2572 int retval;
2573
2574 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2575 if (retval == DICT_ERR) {
2576 if (!nx) {
2577 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2578 incrRefCount(c->argv[2]);
2579 } else {
2580 addReply(c,shared.czero);
2581 return;
2582 }
2583 } else {
2584 incrRefCount(c->argv[1]);
2585 incrRefCount(c->argv[2]);
2586 }
2587 server.dirty++;
2588 removeExpire(c->db,c->argv[1]);
2589 addReply(c, nx ? shared.cone : shared.ok);
2590 }
2591
2592 static void setCommand(redisClient *c) {
2593 setGenericCommand(c,0);
2594 }
2595
2596 static void setnxCommand(redisClient *c) {
2597 setGenericCommand(c,1);
2598 }
2599
2600 static void getCommand(redisClient *c) {
2601 robj *o = lookupKeyRead(c->db,c->argv[1]);
2602
2603 if (o == NULL) {
2604 addReply(c,shared.nullbulk);
2605 } else {
2606 if (o->type != REDIS_STRING) {
2607 addReply(c,shared.wrongtypeerr);
2608 } else {
2609 addReplyBulkLen(c,o);
2610 addReply(c,o);
2611 addReply(c,shared.crlf);
2612 }
2613 }
2614 }
2615
2616 static void getsetCommand(redisClient *c) {
2617 getCommand(c);
2618 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2619 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2620 } else {
2621 incrRefCount(c->argv[1]);
2622 }
2623 incrRefCount(c->argv[2]);
2624 server.dirty++;
2625 removeExpire(c->db,c->argv[1]);
2626 }
2627
2628 static void mgetCommand(redisClient *c) {
2629 int j;
2630
2631 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2632 for (j = 1; j < c->argc; j++) {
2633 robj *o = lookupKeyRead(c->db,c->argv[j]);
2634 if (o == NULL) {
2635 addReply(c,shared.nullbulk);
2636 } else {
2637 if (o->type != REDIS_STRING) {
2638 addReply(c,shared.nullbulk);
2639 } else {
2640 addReplyBulkLen(c,o);
2641 addReply(c,o);
2642 addReply(c,shared.crlf);
2643 }
2644 }
2645 }
2646 }
2647
2648 static void incrDecrCommand(redisClient *c, long long incr) {
2649 long long value;
2650 int retval;
2651 robj *o;
2652
2653 o = lookupKeyWrite(c->db,c->argv[1]);
2654 if (o == NULL) {
2655 value = 0;
2656 } else {
2657 if (o->type != REDIS_STRING) {
2658 value = 0;
2659 } else {
2660 char *eptr;
2661
2662 if (o->encoding == REDIS_ENCODING_RAW)
2663 value = strtoll(o->ptr, &eptr, 10);
2664 else if (o->encoding == REDIS_ENCODING_INT)
2665 value = (long)o->ptr;
2666 else
2667 assert(1 != 1);
2668 }
2669 }
2670
2671 value += incr;
2672 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2673 tryObjectEncoding(o);
2674 retval = dictAdd(c->db->dict,c->argv[1],o);
2675 if (retval == DICT_ERR) {
2676 dictReplace(c->db->dict,c->argv[1],o);
2677 removeExpire(c->db,c->argv[1]);
2678 } else {
2679 incrRefCount(c->argv[1]);
2680 }
2681 server.dirty++;
2682 addReply(c,shared.colon);
2683 addReply(c,o);
2684 addReply(c,shared.crlf);
2685 }
2686
2687 static void incrCommand(redisClient *c) {
2688 incrDecrCommand(c,1);
2689 }
2690
2691 static void decrCommand(redisClient *c) {
2692 incrDecrCommand(c,-1);
2693 }
2694
2695 static void incrbyCommand(redisClient *c) {
2696 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2697 incrDecrCommand(c,incr);
2698 }
2699
2700 static void decrbyCommand(redisClient *c) {
2701 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2702 incrDecrCommand(c,-incr);
2703 }
2704
2705 /* ========================= Type agnostic commands ========================= */
2706
2707 static void delCommand(redisClient *c) {
2708 int deleted = 0, j;
2709
2710 for (j = 1; j < c->argc; j++) {
2711 if (deleteKey(c->db,c->argv[j])) {
2712 server.dirty++;
2713 deleted++;
2714 }
2715 }
2716 switch(deleted) {
2717 case 0:
2718 addReply(c,shared.czero);
2719 break;
2720 case 1:
2721 addReply(c,shared.cone);
2722 break;
2723 default:
2724 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2725 break;
2726 }
2727 }
2728
2729 static void existsCommand(redisClient *c) {
2730 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2731 }
2732
2733 static void selectCommand(redisClient *c) {
2734 int id = atoi(c->argv[1]->ptr);
2735
2736 if (selectDb(c,id) == REDIS_ERR) {
2737 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2738 } else {
2739 addReply(c,shared.ok);
2740 }
2741 }
2742
2743 static void randomkeyCommand(redisClient *c) {
2744 dictEntry *de;
2745
2746 while(1) {
2747 de = dictGetRandomKey(c->db->dict);
2748 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2749 }
2750 if (de == NULL) {
2751 addReply(c,shared.plus);
2752 addReply(c,shared.crlf);
2753 } else {
2754 addReply(c,shared.plus);
2755 addReply(c,dictGetEntryKey(de));
2756 addReply(c,shared.crlf);
2757 }
2758 }
2759
2760 static void keysCommand(redisClient *c) {
2761 dictIterator *di;
2762 dictEntry *de;
2763 sds pattern = c->argv[1]->ptr;
2764 int plen = sdslen(pattern);
2765 int numkeys = 0, keyslen = 0;
2766 robj *lenobj = createObject(REDIS_STRING,NULL);
2767
2768 di = dictGetIterator(c->db->dict);
2769 if (!di) oom("dictGetIterator");
2770 addReply(c,lenobj);
2771 decrRefCount(lenobj);
2772 while((de = dictNext(di)) != NULL) {
2773 robj *keyobj = dictGetEntryKey(de);
2774
2775 sds key = keyobj->ptr;
2776 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2777 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2778 if (expireIfNeeded(c->db,keyobj) == 0) {
2779 if (numkeys != 0)
2780 addReply(c,shared.space);
2781 addReply(c,keyobj);
2782 numkeys++;
2783 keyslen += sdslen(key);
2784 }
2785 }
2786 }
2787 dictReleaseIterator(di);
2788 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2789 addReply(c,shared.crlf);
2790 }
2791
2792 static void dbsizeCommand(redisClient *c) {
2793 addReplySds(c,
2794 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2795 }
2796
2797 static void lastsaveCommand(redisClient *c) {
2798 addReplySds(c,
2799 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2800 }
2801
2802 static void typeCommand(redisClient *c) {
2803 robj *o;
2804 char *type;
2805
2806 o = lookupKeyRead(c->db,c->argv[1]);
2807 if (o == NULL) {
2808 type = "+none";
2809 } else {
2810 switch(o->type) {
2811 case REDIS_STRING: type = "+string"; break;
2812 case REDIS_LIST: type = "+list"; break;
2813 case REDIS_SET: type = "+set"; break;
2814 default: type = "unknown"; break;
2815 }
2816 }
2817 addReplySds(c,sdsnew(type));
2818 addReply(c,shared.crlf);
2819 }
2820
2821 static void saveCommand(redisClient *c) {
2822 if (server.bgsaveinprogress) {
2823 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2824 return;
2825 }
2826 if (rdbSave(server.dbfilename) == REDIS_OK) {
2827 addReply(c,shared.ok);
2828 } else {
2829 addReply(c,shared.err);
2830 }
2831 }
2832
2833 static void bgsaveCommand(redisClient *c) {
2834 if (server.bgsaveinprogress) {
2835 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2836 return;
2837 }
2838 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2839 addReply(c,shared.ok);
2840 } else {
2841 addReply(c,shared.err);
2842 }
2843 }
2844
2845 static void shutdownCommand(redisClient *c) {
2846 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2847 /* Kill the saving child if there is a background saving in progress.
2848 We want to avoid race conditions, for instance our saving child may
2849 overwrite the synchronous saving did by SHUTDOWN. */
2850 if (server.bgsaveinprogress) {
2851 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2852 kill(server.bgsavechildpid,SIGKILL);
2853 rdbRemoveTempFile(server.bgsavechildpid);
2854 }
2855 /* SYNC SAVE */
2856 if (rdbSave(server.dbfilename) == REDIS_OK) {
2857 if (server.daemonize)
2858 unlink(server.pidfile);
2859 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2860 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2861 exit(1);
2862 } else {
2863 /* Ooops.. error saving! The best we can do is to continue operating.
2864 * Note that if there was a background saving process, in the next
2865 * cron() Redis will be notified that the background saving aborted,
2866 * handling special stuff like slaves pending for synchronization... */
2867 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2868 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2869 }
2870 }
2871
2872 static void renameGenericCommand(redisClient *c, int nx) {
2873 robj *o;
2874
2875 /* To use the same key as src and dst is probably an error */
2876 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2877 addReply(c,shared.sameobjecterr);
2878 return;
2879 }
2880
2881 o = lookupKeyWrite(c->db,c->argv[1]);
2882 if (o == NULL) {
2883 addReply(c,shared.nokeyerr);
2884 return;
2885 }
2886 incrRefCount(o);
2887 deleteIfVolatile(c->db,c->argv[2]);
2888 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2889 if (nx) {
2890 decrRefCount(o);
2891 addReply(c,shared.czero);
2892 return;
2893 }
2894 dictReplace(c->db->dict,c->argv[2],o);
2895 } else {
2896 incrRefCount(c->argv[2]);
2897 }
2898 deleteKey(c->db,c->argv[1]);
2899 server.dirty++;
2900 addReply(c,nx ? shared.cone : shared.ok);
2901 }
2902
2903 static void renameCommand(redisClient *c) {
2904 renameGenericCommand(c,0);
2905 }
2906
2907 static void renamenxCommand(redisClient *c) {
2908 renameGenericCommand(c,1);
2909 }
2910
2911 static void moveCommand(redisClient *c) {
2912 robj *o;
2913 redisDb *src, *dst;
2914 int srcid;
2915
2916 /* Obtain source and target DB pointers */
2917 src = c->db;
2918 srcid = c->db->id;
2919 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2920 addReply(c,shared.outofrangeerr);
2921 return;
2922 }
2923 dst = c->db;
2924 selectDb(c,srcid); /* Back to the source DB */
2925
2926 /* If the user is moving using as target the same
2927 * DB as the source DB it is probably an error. */
2928 if (src == dst) {
2929 addReply(c,shared.sameobjecterr);
2930 return;
2931 }
2932
2933 /* Check if the element exists and get a reference */
2934 o = lookupKeyWrite(c->db,c->argv[1]);
2935 if (!o) {
2936 addReply(c,shared.czero);
2937 return;
2938 }
2939
2940 /* Try to add the element to the target DB */
2941 deleteIfVolatile(dst,c->argv[1]);
2942 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2943 addReply(c,shared.czero);
2944 return;
2945 }
2946 incrRefCount(c->argv[1]);
2947 incrRefCount(o);
2948
2949 /* OK! key moved, free the entry in the source DB */
2950 deleteKey(src,c->argv[1]);
2951 server.dirty++;
2952 addReply(c,shared.cone);
2953 }
2954
2955 /* =================================== Lists ================================ */
2956 static void pushGenericCommand(redisClient *c, int where) {
2957 robj *lobj;
2958 list *list;
2959
2960 lobj = lookupKeyWrite(c->db,c->argv[1]);
2961 if (lobj == NULL) {
2962 lobj = createListObject();
2963 list = lobj->ptr;
2964 if (where == REDIS_HEAD) {
2965 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2966 } else {
2967 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2968 }
2969 dictAdd(c->db->dict,c->argv[1],lobj);
2970 incrRefCount(c->argv[1]);
2971 incrRefCount(c->argv[2]);
2972 } else {
2973 if (lobj->type != REDIS_LIST) {
2974 addReply(c,shared.wrongtypeerr);
2975 return;
2976 }
2977 list = lobj->ptr;
2978 if (where == REDIS_HEAD) {
2979 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2980 } else {
2981 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2982 }
2983 incrRefCount(c->argv[2]);
2984 }
2985 server.dirty++;
2986 addReply(c,shared.ok);
2987 }
2988
2989 static void lpushCommand(redisClient *c) {
2990 pushGenericCommand(c,REDIS_HEAD);
2991 }
2992
2993 static void rpushCommand(redisClient *c) {
2994 pushGenericCommand(c,REDIS_TAIL);
2995 }
2996
2997 static void llenCommand(redisClient *c) {
2998 robj *o;
2999 list *l;
3000
3001 o = lookupKeyRead(c->db,c->argv[1]);
3002 if (o == NULL) {
3003 addReply(c,shared.czero);
3004 return;
3005 } else {
3006 if (o->type != REDIS_LIST) {
3007 addReply(c,shared.wrongtypeerr);
3008 } else {
3009 l = o->ptr;
3010 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3011 }
3012 }
3013 }
3014
3015 static void lindexCommand(redisClient *c) {
3016 robj *o;
3017 int index = atoi(c->argv[2]->ptr);
3018
3019 o = lookupKeyRead(c->db,c->argv[1]);
3020 if (o == NULL) {
3021 addReply(c,shared.nullbulk);
3022 } else {
3023 if (o->type != REDIS_LIST) {
3024 addReply(c,shared.wrongtypeerr);
3025 } else {
3026 list *list = o->ptr;
3027 listNode *ln;
3028
3029 ln = listIndex(list, index);
3030 if (ln == NULL) {
3031 addReply(c,shared.nullbulk);
3032 } else {
3033 robj *ele = listNodeValue(ln);
3034 addReplyBulkLen(c,ele);
3035 addReply(c,ele);
3036 addReply(c,shared.crlf);
3037 }
3038 }
3039 }
3040 }
3041
3042 static void lsetCommand(redisClient *c) {
3043 robj *o;
3044 int index = atoi(c->argv[2]->ptr);
3045
3046 o = lookupKeyWrite(c->db,c->argv[1]);
3047 if (o == NULL) {
3048 addReply(c,shared.nokeyerr);
3049 } else {
3050 if (o->type != REDIS_LIST) {
3051 addReply(c,shared.wrongtypeerr);
3052 } else {
3053 list *list = o->ptr;
3054 listNode *ln;
3055
3056 ln = listIndex(list, index);
3057 if (ln == NULL) {
3058 addReply(c,shared.outofrangeerr);
3059 } else {
3060 robj *ele = listNodeValue(ln);
3061
3062 decrRefCount(ele);
3063 listNodeValue(ln) = c->argv[3];
3064 incrRefCount(c->argv[3]);
3065 addReply(c,shared.ok);
3066 server.dirty++;
3067 }
3068 }
3069 }
3070 }
3071
3072 static void popGenericCommand(redisClient *c, int where) {
3073 robj *o;
3074
3075 o = lookupKeyWrite(c->db,c->argv[1]);
3076 if (o == NULL) {
3077 addReply(c,shared.nullbulk);
3078 } else {
3079 if (o->type != REDIS_LIST) {
3080 addReply(c,shared.wrongtypeerr);
3081 } else {
3082 list *list = o->ptr;
3083 listNode *ln;
3084
3085 if (where == REDIS_HEAD)
3086 ln = listFirst(list);
3087 else
3088 ln = listLast(list);
3089
3090 if (ln == NULL) {
3091 addReply(c,shared.nullbulk);
3092 } else {
3093 robj *ele = listNodeValue(ln);
3094 addReplyBulkLen(c,ele);
3095 addReply(c,ele);
3096 addReply(c,shared.crlf);
3097 listDelNode(list,ln);
3098 server.dirty++;
3099 }
3100 }
3101 }
3102 }
3103
3104 static void lpopCommand(redisClient *c) {
3105 popGenericCommand(c,REDIS_HEAD);
3106 }
3107
3108 static void rpopCommand(redisClient *c) {
3109 popGenericCommand(c,REDIS_TAIL);
3110 }
3111
3112 static void lrangeCommand(redisClient *c) {
3113 robj *o;
3114 int start = atoi(c->argv[2]->ptr);
3115 int end = atoi(c->argv[3]->ptr);
3116
3117 o = lookupKeyRead(c->db,c->argv[1]);
3118 if (o == NULL) {
3119 addReply(c,shared.nullmultibulk);
3120 } else {
3121 if (o->type != REDIS_LIST) {
3122 addReply(c,shared.wrongtypeerr);
3123 } else {
3124 list *list = o->ptr;
3125 listNode *ln;
3126 int llen = listLength(list);
3127 int rangelen, j;
3128 robj *ele;
3129
3130 /* convert negative indexes */
3131 if (start < 0) start = llen+start;
3132 if (end < 0) end = llen+end;
3133 if (start < 0) start = 0;
3134 if (end < 0) end = 0;
3135
3136 /* indexes sanity checks */
3137 if (start > end || start >= llen) {
3138 /* Out of range start or start > end result in empty list */
3139 addReply(c,shared.emptymultibulk);
3140 return;
3141 }
3142 if (end >= llen) end = llen-1;
3143 rangelen = (end-start)+1;
3144
3145 /* Return the result in form of a multi-bulk reply */
3146 ln = listIndex(list, start);
3147 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3148 for (j = 0; j < rangelen; j++) {
3149 ele = listNodeValue(ln);
3150 addReplyBulkLen(c,ele);
3151 addReply(c,ele);
3152 addReply(c,shared.crlf);
3153 ln = ln->next;
3154 }
3155 }
3156 }
3157 }
3158
3159 static void ltrimCommand(redisClient *c) {
3160 robj *o;
3161 int start = atoi(c->argv[2]->ptr);
3162 int end = atoi(c->argv[3]->ptr);
3163
3164 o = lookupKeyWrite(c->db,c->argv[1]);
3165 if (o == NULL) {
3166 addReply(c,shared.nokeyerr);
3167 } else {
3168 if (o->type != REDIS_LIST) {
3169 addReply(c,shared.wrongtypeerr);
3170 } else {
3171 list *list = o->ptr;
3172 listNode *ln;
3173 int llen = listLength(list);
3174 int j, ltrim, rtrim;
3175
3176 /* convert negative indexes */
3177 if (start < 0) start = llen+start;
3178 if (end < 0) end = llen+end;
3179 if (start < 0) start = 0;
3180 if (end < 0) end = 0;
3181
3182 /* indexes sanity checks */
3183 if (start > end || start >= llen) {
3184 /* Out of range start or start > end result in empty list */
3185 ltrim = llen;
3186 rtrim = 0;
3187 } else {
3188 if (end >= llen) end = llen-1;
3189 ltrim = start;
3190 rtrim = llen-end-1;
3191 }
3192
3193 /* Remove list elements to perform the trim */
3194 for (j = 0; j < ltrim; j++) {
3195 ln = listFirst(list);
3196 listDelNode(list,ln);
3197 }
3198 for (j = 0; j < rtrim; j++) {
3199 ln = listLast(list);
3200 listDelNode(list,ln);
3201 }
3202 server.dirty++;
3203 addReply(c,shared.ok);
3204 }
3205 }
3206 }
3207
3208 static void lremCommand(redisClient *c) {
3209 robj *o;
3210
3211 o = lookupKeyWrite(c->db,c->argv[1]);
3212 if (o == NULL) {
3213 addReply(c,shared.czero);
3214 } else {
3215 if (o->type != REDIS_LIST) {
3216 addReply(c,shared.wrongtypeerr);
3217 } else {
3218 list *list = o->ptr;
3219 listNode *ln, *next;
3220 int toremove = atoi(c->argv[2]->ptr);
3221 int removed = 0;
3222 int fromtail = 0;
3223
3224 if (toremove < 0) {
3225 toremove = -toremove;
3226 fromtail = 1;
3227 }
3228 ln = fromtail ? list->tail : list->head;
3229 while (ln) {
3230 robj *ele = listNodeValue(ln);
3231
3232 next = fromtail ? ln->prev : ln->next;
3233 if (compareStringObjects(ele,c->argv[3]) == 0) {
3234 listDelNode(list,ln);
3235 server.dirty++;
3236 removed++;
3237 if (toremove && removed == toremove) break;
3238 }
3239 ln = next;
3240 }
3241 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3242 }
3243 }
3244 }
3245
3246 /* ==================================== Sets ================================ */
3247
3248 static void saddCommand(redisClient *c) {
3249 robj *set;
3250
3251 set = lookupKeyWrite(c->db,c->argv[1]);
3252 if (set == NULL) {
3253 set = createSetObject();
3254 dictAdd(c->db->dict,c->argv[1],set);
3255 incrRefCount(c->argv[1]);
3256 } else {
3257 if (set->type != REDIS_SET) {
3258 addReply(c,shared.wrongtypeerr);
3259 return;
3260 }
3261 }
3262 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3263 incrRefCount(c->argv[2]);
3264 server.dirty++;
3265 addReply(c,shared.cone);
3266 } else {
3267 addReply(c,shared.czero);
3268 }
3269 }
3270
3271 static void sremCommand(redisClient *c) {
3272 robj *set;
3273
3274 set = lookupKeyWrite(c->db,c->argv[1]);
3275 if (set == NULL) {
3276 addReply(c,shared.czero);
3277 } else {
3278 if (set->type != REDIS_SET) {
3279 addReply(c,shared.wrongtypeerr);
3280 return;
3281 }
3282 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3283 server.dirty++;
3284 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3285 addReply(c,shared.cone);
3286 } else {
3287 addReply(c,shared.czero);
3288 }
3289 }
3290 }
3291
3292 static void smoveCommand(redisClient *c) {
3293 robj *srcset, *dstset;
3294
3295 srcset = lookupKeyWrite(c->db,c->argv[1]);
3296 dstset = lookupKeyWrite(c->db,c->argv[2]);
3297
3298 /* If the source key does not exist return 0, if it's of the wrong type
3299 * raise an error */
3300 if (srcset == NULL || srcset->type != REDIS_SET) {
3301 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3302 return;
3303 }
3304 /* Error if the destination key is not a set as well */
3305 if (dstset && dstset->type != REDIS_SET) {
3306 addReply(c,shared.wrongtypeerr);
3307 return;
3308 }
3309 /* Remove the element from the source set */
3310 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3311 /* Key not found in the src set! return zero */
3312 addReply(c,shared.czero);
3313 return;
3314 }
3315 server.dirty++;
3316 /* Add the element to the destination set */
3317 if (!dstset) {
3318 dstset = createSetObject();
3319 dictAdd(c->db->dict,c->argv[2],dstset);
3320 incrRefCount(c->argv[2]);
3321 }
3322 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3323 incrRefCount(c->argv[3]);
3324 addReply(c,shared.cone);
3325 }
3326
3327 static void sismemberCommand(redisClient *c) {
3328 robj *set;
3329
3330 set = lookupKeyRead(c->db,c->argv[1]);
3331 if (set == NULL) {
3332 addReply(c,shared.czero);
3333 } else {
3334 if (set->type != REDIS_SET) {
3335 addReply(c,shared.wrongtypeerr);
3336 return;
3337 }
3338 if (dictFind(set->ptr,c->argv[2]))
3339 addReply(c,shared.cone);
3340 else
3341 addReply(c,shared.czero);
3342 }
3343 }
3344
3345 static void scardCommand(redisClient *c) {
3346 robj *o;
3347 dict *s;
3348
3349 o = lookupKeyRead(c->db,c->argv[1]);
3350 if (o == NULL) {
3351 addReply(c,shared.czero);
3352 return;
3353 } else {
3354 if (o->type != REDIS_SET) {
3355 addReply(c,shared.wrongtypeerr);
3356 } else {
3357 s = o->ptr;
3358 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3359 dictSize(s)));
3360 }
3361 }
3362 }
3363
3364 static void spopCommand(redisClient *c) {
3365 robj *set;
3366 dictEntry *de;
3367
3368 set = lookupKeyWrite(c->db,c->argv[1]);
3369 if (set == NULL) {
3370 addReply(c,shared.nullbulk);
3371 } else {
3372 if (set->type != REDIS_SET) {
3373 addReply(c,shared.wrongtypeerr);
3374 return;
3375 }
3376 de = dictGetRandomKey(set->ptr);
3377 if (de == NULL) {
3378 addReply(c,shared.nullbulk);
3379 } else {
3380 robj *ele = dictGetEntryKey(de);
3381
3382 addReplyBulkLen(c,ele);
3383 addReply(c,ele);
3384 addReply(c,shared.crlf);
3385 dictDelete(set->ptr,ele);
3386 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3387 server.dirty++;
3388 }
3389 }
3390 }
3391
3392 static void srandmemberCommand(redisClient *c) {
3393 robj *set;
3394 dictEntry *de;
3395
3396 set = lookupKeyRead(c->db,c->argv[1]);
3397 if (set == NULL) {
3398 addReply(c,shared.nullbulk);
3399 } else {
3400 if (set->type != REDIS_SET) {
3401 addReply(c,shared.wrongtypeerr);
3402 return;
3403 }
3404 de = dictGetRandomKey(set->ptr);
3405 if (de == NULL) {
3406 addReply(c,shared.nullbulk);
3407 } else {
3408 robj *ele = dictGetEntryKey(de);
3409
3410 addReplyBulkLen(c,ele);
3411 addReply(c,ele);
3412 addReply(c,shared.crlf);
3413 }
3414 }
3415 }
3416
3417 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3418 dict **d1 = (void*) s1, **d2 = (void*) s2;
3419
3420 return dictSize(*d1)-dictSize(*d2);
3421 }
3422
3423 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3424 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3425 dictIterator *di;
3426 dictEntry *de;
3427 robj *lenobj = NULL, *dstset = NULL;
3428 int j, cardinality = 0;
3429
3430 if (!dv) oom("sinterGenericCommand");
3431 for (j = 0; j < setsnum; j++) {
3432 robj *setobj;
3433
3434 setobj = dstkey ?
3435 lookupKeyWrite(c->db,setskeys[j]) :
3436 lookupKeyRead(c->db,setskeys[j]);
3437 if (!setobj) {
3438 zfree(dv);
3439 if (dstkey) {
3440 deleteKey(c->db,dstkey);
3441 addReply(c,shared.ok);
3442 } else {
3443 addReply(c,shared.nullmultibulk);
3444 }
3445 return;
3446 }
3447 if (setobj->type != REDIS_SET) {
3448 zfree(dv);
3449 addReply(c,shared.wrongtypeerr);
3450 return;
3451 }
3452 dv[j] = setobj->ptr;
3453 }
3454 /* Sort sets from the smallest to largest, this will improve our
3455 * algorithm's performace */
3456 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3457
3458 /* The first thing we should output is the total number of elements...
3459 * since this is a multi-bulk write, but at this stage we don't know
3460 * the intersection set size, so we use a trick, append an empty object
3461 * to the output list and save the pointer to later modify it with the
3462 * right length */
3463 if (!dstkey) {
3464 lenobj = createObject(REDIS_STRING,NULL);
3465 addReply(c,lenobj);
3466 decrRefCount(lenobj);
3467 } else {
3468 /* If we have a target key where to store the resulting set
3469 * create this key with an empty set inside */
3470 dstset = createSetObject();
3471 }
3472
3473 /* Iterate all the elements of the first (smallest) set, and test
3474 * the element against all the other sets, if at least one set does
3475 * not include the element it is discarded */
3476 di = dictGetIterator(dv[0]);
3477 if (!di) oom("dictGetIterator");
3478
3479 while((de = dictNext(di)) != NULL) {
3480 robj *ele;
3481
3482 for (j = 1; j < setsnum; j++)
3483 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3484 if (j != setsnum)
3485 continue; /* at least one set does not contain the member */
3486 ele = dictGetEntryKey(de);
3487 if (!dstkey) {
3488 addReplyBulkLen(c,ele);
3489 addReply(c,ele);
3490 addReply(c,shared.crlf);
3491 cardinality++;
3492 } else {
3493 dictAdd(dstset->ptr,ele,NULL);
3494 incrRefCount(ele);
3495 }
3496 }
3497 dictReleaseIterator(di);
3498
3499 if (dstkey) {
3500 /* Store the resulting set into the target */
3501 deleteKey(c->db,dstkey);
3502 dictAdd(c->db->dict,dstkey,dstset);
3503 incrRefCount(dstkey);
3504 }
3505
3506 if (!dstkey) {
3507 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3508 } else {
3509 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3510 dictSize((dict*)dstset->ptr)));
3511 server.dirty++;
3512 }
3513 zfree(dv);
3514 }
3515
3516 static void sinterCommand(redisClient *c) {
3517 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3518 }
3519
3520 static void sinterstoreCommand(redisClient *c) {
3521 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3522 }
3523
3524 #define REDIS_OP_UNION 0
3525 #define REDIS_OP_DIFF 1
3526
3527 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3528 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3529 dictIterator *di;
3530 dictEntry *de;
3531 robj *dstset = NULL;
3532 int j, cardinality = 0;
3533
3534 if (!dv) oom("sunionDiffGenericCommand");
3535 for (j = 0; j < setsnum; j++) {
3536 robj *setobj;
3537
3538 setobj = dstkey ?
3539 lookupKeyWrite(c->db,setskeys[j]) :
3540 lookupKeyRead(c->db,setskeys[j]);
3541 if (!setobj) {
3542 dv[j] = NULL;
3543 continue;
3544 }
3545 if (setobj->type != REDIS_SET) {
3546 zfree(dv);
3547 addReply(c,shared.wrongtypeerr);
3548 return;
3549 }
3550 dv[j] = setobj->ptr;
3551 }
3552
3553 /* We need a temp set object to store our union. If the dstkey
3554 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3555 * this set object will be the resulting object to set into the target key*/
3556 dstset = createSetObject();
3557
3558 /* Iterate all the elements of all the sets, add every element a single
3559 * time to the result set */
3560 for (j = 0; j < setsnum; j++) {
3561 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3562 if (!dv[j]) continue; /* non existing keys are like empty sets */
3563
3564 di = dictGetIterator(dv[j]);
3565 if (!di) oom("dictGetIterator");
3566
3567 while((de = dictNext(di)) != NULL) {
3568 robj *ele;
3569
3570 /* dictAdd will not add the same element multiple times */
3571 ele = dictGetEntryKey(de);
3572 if (op == REDIS_OP_UNION || j == 0) {
3573 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3574 incrRefCount(ele);
3575 cardinality++;
3576 }
3577 } else if (op == REDIS_OP_DIFF) {
3578 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3579 cardinality--;
3580 }
3581 }
3582 }
3583 dictReleaseIterator(di);
3584
3585 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3586 }
3587
3588 /* Output the content of the resulting set, if not in STORE mode */
3589 if (!dstkey) {
3590 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3591 di = dictGetIterator(dstset->ptr);
3592 if (!di) oom("dictGetIterator");
3593 while((de = dictNext(di)) != NULL) {
3594 robj *ele;
3595
3596 ele = dictGetEntryKey(de);
3597 addReplyBulkLen(c,ele);
3598 addReply(c,ele);
3599 addReply(c,shared.crlf);
3600 }
3601 dictReleaseIterator(di);
3602 } else {
3603 /* If we have a target key where to store the resulting set
3604 * create this key with the result set inside */
3605 deleteKey(c->db,dstkey);
3606 dictAdd(c->db->dict,dstkey,dstset);
3607 incrRefCount(dstkey);
3608 }
3609
3610 /* Cleanup */
3611 if (!dstkey) {
3612 decrRefCount(dstset);
3613 } else {
3614 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3615 dictSize((dict*)dstset->ptr)));
3616 server.dirty++;
3617 }
3618 zfree(dv);
3619 }
3620
3621 static void sunionCommand(redisClient *c) {
3622 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3623 }
3624
3625 static void sunionstoreCommand(redisClient *c) {
3626 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3627 }
3628
3629 static void sdiffCommand(redisClient *c) {
3630 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3631 }
3632
3633 static void sdiffstoreCommand(redisClient *c) {
3634 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3635 }
3636
3637 static void flushdbCommand(redisClient *c) {
3638 server.dirty += dictSize(c->db->dict);
3639 dictEmpty(c->db->dict);
3640 dictEmpty(c->db->expires);
3641 addReply(c,shared.ok);
3642 }
3643
3644 static void flushallCommand(redisClient *c) {
3645 server.dirty += emptyDb();
3646 addReply(c,shared.ok);
3647 rdbSave(server.dbfilename);
3648 server.dirty++;
3649 }
3650
3651 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3652 redisSortOperation *so = zmalloc(sizeof(*so));
3653 if (!so) oom("createSortOperation");
3654 so->type = type;
3655 so->pattern = pattern;
3656 return so;
3657 }
3658
3659 /* Return the value associated to the key with a name obtained
3660 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3661 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3662 char *p;
3663 sds spat, ssub;
3664 robj keyobj;
3665 int prefixlen, sublen, postfixlen;
3666 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3667 struct {
3668 long len;
3669 long free;
3670 char buf[REDIS_SORTKEY_MAX+1];
3671 } keyname;
3672
3673 if (subst->encoding == REDIS_ENCODING_RAW)
3674 incrRefCount(subst);
3675 else {
3676 subst = getDecodedObject(subst);
3677 }
3678
3679 spat = pattern->ptr;
3680 ssub = subst->ptr;
3681 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3682 p = strchr(spat,'*');
3683 if (!p) return NULL;
3684
3685 prefixlen = p-spat;
3686 sublen = sdslen(ssub);
3687 postfixlen = sdslen(spat)-(prefixlen+1);
3688 memcpy(keyname.buf,spat,prefixlen);
3689 memcpy(keyname.buf+prefixlen,ssub,sublen);
3690 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3691 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3692 keyname.len = prefixlen+sublen+postfixlen;
3693
3694 keyobj.refcount = 1;
3695 keyobj.type = REDIS_STRING;
3696 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3697
3698 decrRefCount(subst);
3699
3700 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3701 return lookupKeyRead(db,&keyobj);
3702 }
3703
3704 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3705 * the additional parameter is not standard but a BSD-specific we have to
3706 * pass sorting parameters via the global 'server' structure */
3707 static int sortCompare(const void *s1, const void *s2) {
3708 const redisSortObject *so1 = s1, *so2 = s2;
3709 int cmp;
3710
3711 if (!server.sort_alpha) {
3712 /* Numeric sorting. Here it's trivial as we precomputed scores */
3713 if (so1->u.score > so2->u.score) {
3714 cmp = 1;
3715 } else if (so1->u.score < so2->u.score) {
3716 cmp = -1;
3717 } else {
3718 cmp = 0;
3719 }
3720 } else {
3721 /* Alphanumeric sorting */
3722 if (server.sort_bypattern) {
3723 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3724 /* At least one compare object is NULL */
3725 if (so1->u.cmpobj == so2->u.cmpobj)
3726 cmp = 0;
3727 else if (so1->u.cmpobj == NULL)
3728 cmp = -1;
3729 else
3730 cmp = 1;
3731 } else {
3732 /* We have both the objects, use strcoll */
3733 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3734 }
3735 } else {
3736 /* Compare elements directly */
3737 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
3738 so2->obj->encoding == REDIS_ENCODING_RAW) {
3739 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3740 } else {
3741 robj *dec1, *dec2;
3742
3743 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
3744 so1->obj : getDecodedObject(so1->obj);
3745 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
3746 so2->obj : getDecodedObject(so2->obj);
3747 cmp = strcoll(dec1->ptr,dec2->ptr);
3748 if (dec1 != so1->obj) decrRefCount(dec1);
3749 if (dec2 != so2->obj) decrRefCount(dec2);
3750 }
3751 }
3752 }
3753 return server.sort_desc ? -cmp : cmp;
3754 }
3755
3756 /* The SORT command is the most complex command in Redis. Warning: this code
3757 * is optimized for speed and a bit less for readability */
3758 static void sortCommand(redisClient *c) {
3759 list *operations;
3760 int outputlen = 0;
3761 int desc = 0, alpha = 0;
3762 int limit_start = 0, limit_count = -1, start, end;
3763 int j, dontsort = 0, vectorlen;
3764 int getop = 0; /* GET operation counter */
3765 robj *sortval, *sortby = NULL;
3766 redisSortObject *vector; /* Resulting vector to sort */
3767
3768 /* Lookup the key to sort. It must be of the right types */
3769 sortval = lookupKeyRead(c->db,c->argv[1]);
3770 if (sortval == NULL) {
3771 addReply(c,shared.nokeyerr);
3772 return;
3773 }
3774 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3775 addReply(c,shared.wrongtypeerr);
3776 return;
3777 }
3778
3779 /* Create a list of operations to perform for every sorted element.
3780 * Operations can be GET/DEL/INCR/DECR */
3781 operations = listCreate();
3782 listSetFreeMethod(operations,zfree);
3783 j = 2;
3784
3785 /* Now we need to protect sortval incrementing its count, in the future
3786 * SORT may have options able to overwrite/delete keys during the sorting
3787 * and the sorted key itself may get destroied */
3788 incrRefCount(sortval);
3789
3790 /* The SORT command has an SQL-alike syntax, parse it */
3791 while(j < c->argc) {
3792 int leftargs = c->argc-j-1;
3793 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3794 desc = 0;
3795 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3796 desc = 1;
3797 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3798 alpha = 1;
3799 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3800 limit_start = atoi(c->argv[j+1]->ptr);
3801 limit_count = atoi(c->argv[j+2]->ptr);
3802 j+=2;
3803 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3804 sortby = c->argv[j+1];
3805 /* If the BY pattern does not contain '*', i.e. it is constant,
3806 * we don't need to sort nor to lookup the weight keys. */
3807 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3808 j++;
3809 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3810 listAddNodeTail(operations,createSortOperation(
3811 REDIS_SORT_GET,c->argv[j+1]));
3812 getop++;
3813 j++;
3814 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3815 listAddNodeTail(operations,createSortOperation(
3816 REDIS_SORT_DEL,c->argv[j+1]));
3817 j++;
3818 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3819 listAddNodeTail(operations,createSortOperation(
3820 REDIS_SORT_INCR,c->argv[j+1]));
3821 j++;
3822 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3823 listAddNodeTail(operations,createSortOperation(
3824 REDIS_SORT_DECR,c->argv[j+1]));
3825 j++;
3826 } else {
3827 decrRefCount(sortval);
3828 listRelease(operations);
3829 addReply(c,shared.syntaxerr);
3830 return;
3831 }
3832 j++;
3833 }
3834
3835 /* Load the sorting vector with all the objects to sort */
3836 vectorlen = (sortval->type == REDIS_LIST) ?
3837 listLength((list*)sortval->ptr) :
3838 dictSize((dict*)sortval->ptr);
3839 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3840 if (!vector) oom("allocating objects vector for SORT");
3841 j = 0;
3842 if (sortval->type == REDIS_LIST) {
3843 list *list = sortval->ptr;
3844 listNode *ln;
3845
3846 listRewind(list);
3847 while((ln = listYield(list))) {
3848 robj *ele = ln->value;
3849 vector[j].obj = ele;
3850 vector[j].u.score = 0;
3851 vector[j].u.cmpobj = NULL;
3852 j++;
3853 }
3854 } else {
3855 dict *set = sortval->ptr;
3856 dictIterator *di;
3857 dictEntry *setele;
3858
3859 di = dictGetIterator(set);
3860 if (!di) oom("dictGetIterator");
3861 while((setele = dictNext(di)) != NULL) {
3862 vector[j].obj = dictGetEntryKey(setele);
3863 vector[j].u.score = 0;
3864 vector[j].u.cmpobj = NULL;
3865 j++;
3866 }
3867 dictReleaseIterator(di);
3868 }
3869 assert(j == vectorlen);
3870
3871 /* Now it's time to load the right scores in the sorting vector */
3872 if (dontsort == 0) {
3873 for (j = 0; j < vectorlen; j++) {
3874 if (sortby) {
3875 robj *byval;
3876
3877 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3878 if (!byval || byval->type != REDIS_STRING) continue;
3879 if (alpha) {
3880 if (byval->encoding == REDIS_ENCODING_RAW) {
3881 vector[j].u.cmpobj = byval;
3882 incrRefCount(byval);
3883 } else {
3884 vector[j].u.cmpobj = getDecodedObject(byval);
3885 }
3886 } else {
3887 if (byval->encoding == REDIS_ENCODING_RAW) {
3888 vector[j].u.score = strtod(byval->ptr,NULL);
3889 } else {
3890 if (byval->encoding == REDIS_ENCODING_INT) {
3891 vector[j].u.score = (long)byval->ptr;
3892 } else
3893 assert(1 != 1);
3894 }
3895 }
3896 } else {
3897 if (!alpha) {
3898 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
3899 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3900 else {
3901 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
3902 vector[j].u.score = (long) vector[j].obj->ptr;
3903 else
3904 assert(1 != 1);
3905 }
3906 }
3907 }
3908 }
3909 }
3910
3911 /* We are ready to sort the vector... perform a bit of sanity check
3912 * on the LIMIT option too. We'll use a partial version of quicksort. */
3913 start = (limit_start < 0) ? 0 : limit_start;
3914 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3915 if (start >= vectorlen) {
3916 start = vectorlen-1;
3917 end = vectorlen-2;
3918 }
3919 if (end >= vectorlen) end = vectorlen-1;
3920
3921 if (dontsort == 0) {
3922 server.sort_desc = desc;
3923 server.sort_alpha = alpha;
3924 server.sort_bypattern = sortby ? 1 : 0;
3925 if (sortby && (start != 0 || end != vectorlen-1))
3926 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3927 else
3928 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3929 }
3930
3931 /* Send command output to the output buffer, performing the specified
3932 * GET/DEL/INCR/DECR operations if any. */
3933 outputlen = getop ? getop*(end-start+1) : end-start+1;
3934 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3935 for (j = start; j <= end; j++) {
3936 listNode *ln;
3937 if (!getop) {
3938 addReplyBulkLen(c,vector[j].obj);
3939 addReply(c,vector[j].obj);
3940 addReply(c,shared.crlf);
3941 }
3942 listRewind(operations);
3943 while((ln = listYield(operations))) {
3944 redisSortOperation *sop = ln->value;
3945 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3946 vector[j].obj);
3947
3948 if (sop->type == REDIS_SORT_GET) {
3949 if (!val || val->type != REDIS_STRING) {
3950 addReply(c,shared.nullbulk);
3951 } else {
3952 addReplyBulkLen(c,val);
3953 addReply(c,val);
3954 addReply(c,shared.crlf);
3955 }
3956 } else if (sop->type == REDIS_SORT_DEL) {
3957 /* TODO */
3958 }
3959 }
3960 }
3961
3962 /* Cleanup */
3963 decrRefCount(sortval);
3964 listRelease(operations);
3965 for (j = 0; j < vectorlen; j++) {
3966 if (sortby && alpha && vector[j].u.cmpobj)
3967 decrRefCount(vector[j].u.cmpobj);
3968 }
3969 zfree(vector);
3970 }
3971
3972 static void infoCommand(redisClient *c) {
3973 sds info;
3974 time_t uptime = time(NULL)-server.stat_starttime;
3975 int j;
3976
3977 info = sdscatprintf(sdsempty(),
3978 "redis_version:%s\r\n"
3979 "arch_bits:%s\r\n"
3980 "uptime_in_seconds:%d\r\n"
3981 "uptime_in_days:%d\r\n"
3982 "connected_clients:%d\r\n"
3983 "connected_slaves:%d\r\n"
3984 "used_memory:%zu\r\n"
3985 "changes_since_last_save:%lld\r\n"
3986 "bgsave_in_progress:%d\r\n"
3987 "last_save_time:%d\r\n"
3988 "total_connections_received:%lld\r\n"
3989 "total_commands_processed:%lld\r\n"
3990 "role:%s\r\n"
3991 ,REDIS_VERSION,
3992 (sizeof(long) == 8) ? "64" : "32",
3993 uptime,
3994 uptime/(3600*24),
3995 listLength(server.clients)-listLength(server.slaves),
3996 listLength(server.slaves),
3997 server.usedmemory,
3998 server.dirty,
3999 server.bgsaveinprogress,
4000 server.lastsave,
4001 server.stat_numconnections,
4002 server.stat_numcommands,
4003 server.masterhost == NULL ? "master" : "slave"
4004 );
4005 if (server.masterhost) {
4006 info = sdscatprintf(info,
4007 "master_host:%s\r\n"
4008 "master_port:%d\r\n"
4009 "master_link_status:%s\r\n"
4010 "master_last_io_seconds_ago:%d\r\n"
4011 ,server.masterhost,
4012 server.masterport,
4013 (server.replstate == REDIS_REPL_CONNECTED) ?
4014 "up" : "down",
4015 (int)(time(NULL)-server.master->lastinteraction)
4016 );
4017 }
4018 for (j = 0; j < server.dbnum; j++) {
4019 long long keys, vkeys;
4020
4021 keys = dictSize(server.db[j].dict);
4022 vkeys = dictSize(server.db[j].expires);
4023 if (keys || vkeys) {
4024 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4025 j, keys, vkeys);
4026 }
4027 }
4028 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4029 addReplySds(c,info);
4030 addReply(c,shared.crlf);
4031 }
4032
4033 static void monitorCommand(redisClient *c) {
4034 /* ignore MONITOR if aleady slave or in monitor mode */
4035 if (c->flags & REDIS_SLAVE) return;
4036
4037 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4038 c->slaveseldb = 0;
4039 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
4040 addReply(c,shared.ok);
4041 }
4042
4043 /* ================================= Expire ================================= */
4044 static int removeExpire(redisDb *db, robj *key) {
4045 if (dictDelete(db->expires,key) == DICT_OK) {
4046 return 1;
4047 } else {
4048 return 0;
4049 }
4050 }
4051
4052 static int setExpire(redisDb *db, robj *key, time_t when) {
4053 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4054 return 0;
4055 } else {
4056 incrRefCount(key);
4057 return 1;
4058 }
4059 }
4060
4061 /* Return the expire time of the specified key, or -1 if no expire
4062 * is associated with this key (i.e. the key is non volatile) */
4063 static time_t getExpire(redisDb *db, robj *key) {
4064 dictEntry *de;
4065
4066 /* No expire? return ASAP */
4067 if (dictSize(db->expires) == 0 ||
4068 (de = dictFind(db->expires,key)) == NULL) return -1;
4069
4070 return (time_t) dictGetEntryVal(de);
4071 }
4072
4073 static int expireIfNeeded(redisDb *db, robj *key) {
4074 time_t when;
4075 dictEntry *de;
4076
4077 /* No expire? return ASAP */
4078 if (dictSize(db->expires) == 0 ||
4079 (de = dictFind(db->expires,key)) == NULL) return 0;
4080
4081 /* Lookup the expire */
4082 when = (time_t) dictGetEntryVal(de);
4083 if (time(NULL) <= when) return 0;
4084
4085 /* Delete the key */
4086 dictDelete(db->expires,key);
4087 return dictDelete(db->dict,key) == DICT_OK;
4088 }
4089
4090 static int deleteIfVolatile(redisDb *db, robj *key) {
4091 dictEntry *de;
4092
4093 /* No expire? return ASAP */
4094 if (dictSize(db->expires) == 0 ||
4095 (de = dictFind(db->expires,key)) == NULL) return 0;
4096
4097 /* Delete the key */
4098 server.dirty++;
4099 dictDelete(db->expires,key);
4100 return dictDelete(db->dict,key) == DICT_OK;
4101 }
4102
4103 static void expireCommand(redisClient *c) {
4104 dictEntry *de;
4105 int seconds = atoi(c->argv[2]->ptr);
4106
4107 de = dictFind(c->db->dict,c->argv[1]);
4108 if (de == NULL) {
4109 addReply(c,shared.czero);
4110 return;
4111 }
4112 if (seconds <= 0) {
4113 addReply(c, shared.czero);
4114 return;
4115 } else {
4116 time_t when = time(NULL)+seconds;
4117 if (setExpire(c->db,c->argv[1],when)) {
4118 addReply(c,shared.cone);
4119 server.dirty++;
4120 } else {
4121 addReply(c,shared.czero);
4122 }
4123 return;
4124 }
4125 }
4126
4127 static void ttlCommand(redisClient *c) {
4128 time_t expire;
4129 int ttl = -1;
4130
4131 expire = getExpire(c->db,c->argv[1]);
4132 if (expire != -1) {
4133 ttl = (int) (expire-time(NULL));
4134 if (ttl < 0) ttl = -1;
4135 }
4136 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4137 }
4138
4139 static void msetGenericCommand(redisClient *c, int nx) {
4140 int j;
4141
4142 if ((c->argc % 2) == 0) {
4143 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4144 return;
4145 }
4146 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4147 * set nothing at all if at least one already key exists. */
4148 if (nx) {
4149 for (j = 1; j < c->argc; j += 2) {
4150 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4151 addReply(c, shared.czero);
4152 return;
4153 }
4154 }
4155 }
4156
4157 for (j = 1; j < c->argc; j += 2) {
4158 int retval;
4159
4160 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4161 if (retval == DICT_ERR) {
4162 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4163 incrRefCount(c->argv[j+1]);
4164 } else {
4165 incrRefCount(c->argv[j]);
4166 incrRefCount(c->argv[j+1]);
4167 }
4168 removeExpire(c->db,c->argv[j]);
4169 }
4170 server.dirty += (c->argc-1)/2;
4171 addReply(c, nx ? shared.cone : shared.ok);
4172 }
4173
4174 static void msetCommand(redisClient *c) {
4175 msetGenericCommand(c,0);
4176 }
4177
4178 static void msetnxCommand(redisClient *c) {
4179 msetGenericCommand(c,1);
4180 }
4181
4182 /* =============================== Replication ============================= */
4183
4184 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4185 ssize_t nwritten, ret = size;
4186 time_t start = time(NULL);
4187
4188 timeout++;
4189 while(size) {
4190 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4191 nwritten = write(fd,ptr,size);
4192 if (nwritten == -1) return -1;
4193 ptr += nwritten;
4194 size -= nwritten;
4195 }
4196 if ((time(NULL)-start) > timeout) {
4197 errno = ETIMEDOUT;
4198 return -1;
4199 }
4200 }
4201 return ret;
4202 }
4203
4204 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4205 ssize_t nread, totread = 0;
4206 time_t start = time(NULL);
4207
4208 timeout++;
4209 while(size) {
4210 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4211 nread = read(fd,ptr,size);
4212 if (nread == -1) return -1;
4213 ptr += nread;
4214 size -= nread;
4215 totread += nread;
4216 }
4217 if ((time(NULL)-start) > timeout) {
4218 errno = ETIMEDOUT;
4219 return -1;
4220 }
4221 }
4222 return totread;
4223 }
4224
/* Read a line from 'fd' one byte at a time into 'ptr', a buffer of 'size'
 * bytes. Reading stops at '\n' or when size-1 characters have been stored;
 * the buffer is always left NUL terminated. A '\r' immediately before the
 * '\n' is overwritten with the terminator.
 *
 * Returns the number of characters stored before the newline (a trailing
 * '\r' is still counted), or -1 if syncRead() fails or 'size' is not
 * positive (errno set to EINVAL in the latter case). 'timeout' is applied
 * by syncRead() to every single byte read. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';
    size--; /* reserve room for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One slot consumed: without this the loop never stops on lines
         * longer than the buffer and writes past its end. */
        size--;
    }
    return nread;
}
4245
4246 static void syncCommand(redisClient *c) {
4247 /* ignore SYNC if aleady slave or in monitor mode */
4248 if (c->flags & REDIS_SLAVE) return;
4249
4250 /* SYNC can't be issued when the server has pending data to send to
4251 * the client about already issued commands. We need a fresh reply
4252 * buffer registering the differences between the BGSAVE and the current
4253 * dataset, so that we can copy to other slaves if needed. */
4254 if (listLength(c->reply) != 0) {
4255 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4256 return;
4257 }
4258
4259 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4260 /* Here we need to check if there is a background saving operation
4261 * in progress, or if it is required to start one */
4262 if (server.bgsaveinprogress) {
4263 /* Ok a background save is in progress. Let's check if it is a good
4264 * one for replication, i.e. if there is another slave that is
4265 * registering differences since the server forked to save */
4266 redisClient *slave;
4267 listNode *ln;
4268
4269 listRewind(server.slaves);
4270 while((ln = listYield(server.slaves))) {
4271 slave = ln->value;
4272 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4273 }
4274 if (ln) {
4275 /* Perfect, the server is already registering differences for
4276 * another slave. Set the right state, and copy the buffer. */
4277 listRelease(c->reply);
4278 c->reply = listDup(slave->reply);
4279 if (!c->reply) oom("listDup copying slave reply list");
4280 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4281 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4282 } else {
4283 /* No way, we need to wait for the next BGSAVE in order to
4284 * register differences */
4285 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4286 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4287 }
4288 } else {
4289 /* Ok we don't have a BGSAVE in progress, let's start one */
4290 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4291 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4292 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4293 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4294 return;
4295 }
4296 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4297 }
4298 c->repldbfd = -1;
4299 c->flags |= REDIS_SLAVE;
4300 c->slaveseldb = 0;
4301 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
4302 return;
4303 }
4304
4305 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4306 redisClient *slave = privdata;
4307 REDIS_NOTUSED(el);
4308 REDIS_NOTUSED(mask);
4309 char buf[REDIS_IOBUF_LEN];
4310 ssize_t nwritten, buflen;
4311
4312 if (slave->repldboff == 0) {
4313 /* Write the bulk write count before to transfer the DB. In theory here
4314 * we don't know how much room there is in the output buffer of the
4315 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4316 * operations) will never be smaller than the few bytes we need. */
4317 sds bulkcount;
4318
4319 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4320 slave->repldbsize);
4321 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4322 {
4323 sdsfree(bulkcount);
4324 freeClient(slave);
4325 return;
4326 }
4327 sdsfree(bulkcount);
4328 }
4329 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4330 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4331 if (buflen <= 0) {
4332 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4333 (buflen == 0) ? "premature EOF" : strerror(errno));
4334 freeClient(slave);
4335 return;
4336 }
4337 if ((nwritten = write(fd,buf,buflen)) == -1) {
4338 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4339 strerror(errno));
4340 freeClient(slave);
4341 return;
4342 }
4343 slave->repldboff += nwritten;
4344 if (slave->repldboff == slave->repldbsize) {
4345 close(slave->repldbfd);
4346 slave->repldbfd = -1;
4347 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4348 slave->replstate = REDIS_REPL_ONLINE;
4349 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4350 sendReplyToClient, slave, NULL) == AE_ERR) {
4351 freeClient(slave);
4352 return;
4353 }
4354 addReplySds(slave,sdsempty());
4355 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4356 }
4357 }
4358
4359 /* This function is called at the end of every backgrond saving.
4360 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4361 * otherwise REDIS_ERR is passed to the function.
4362 *
4363 * The goal of this function is to handle slaves waiting for a successful
4364 * background saving in order to perform non-blocking synchronization. */
4365 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4366 listNode *ln;
4367 int startbgsave = 0;
4368
4369 listRewind(server.slaves);
4370 while((ln = listYield(server.slaves))) {
4371 redisClient *slave = ln->value;
4372
4373 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4374 startbgsave = 1;
4375 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4376 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4377 struct redis_stat buf;
4378
4379 if (bgsaveerr != REDIS_OK) {
4380 freeClient(slave);
4381 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4382 continue;
4383 }
4384 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4385 redis_fstat(slave->repldbfd,&buf) == -1) {
4386 freeClient(slave);
4387 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4388 continue;
4389 }
4390 slave->repldboff = 0;
4391 slave->repldbsize = buf.st_size;
4392 slave->replstate = REDIS_REPL_SEND_BULK;
4393 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4394 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4395 freeClient(slave);
4396 continue;
4397 }
4398 }
4399 }
4400 if (startbgsave) {
4401 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4402 listRewind(server.slaves);
4403 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4404 while((ln = listYield(server.slaves))) {
4405 redisClient *slave = ln->value;
4406
4407 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4408 freeClient(slave);
4409 }
4410 }
4411 }
4412 }
4413
4414 static int syncWithMaster(void) {
4415 char buf[1024], tmpfile[256];
4416 int dumpsize;
4417 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4418 int dfd;
4419
4420 if (fd == -1) {
4421 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4422 strerror(errno));
4423 return REDIS_ERR;
4424 }
4425 /* Issue the SYNC command */
4426 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4427 close(fd);
4428 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4429 strerror(errno));
4430 return REDIS_ERR;
4431 }
4432 /* Read the bulk write count */
4433 if (syncReadLine(fd,buf,1024,3600) == -1) {
4434 close(fd);
4435 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4436 strerror(errno));
4437 return REDIS_ERR;
4438 }
4439 dumpsize = atoi(buf+1);
4440 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4441 /* Read the bulk write data on a temp file */
4442 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4443 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4444 if (dfd == -1) {
4445 close(fd);
4446 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4447 return REDIS_ERR;
4448 }
4449 while(dumpsize) {
4450 int nread, nwritten;
4451
4452 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4453 if (nread == -1) {
4454 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4455 strerror(errno));
4456 close(fd);
4457 close(dfd);
4458 return REDIS_ERR;
4459 }
4460 nwritten = write(dfd,buf,nread);
4461 if (nwritten == -1) {
4462 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4463 close(fd);
4464 close(dfd);
4465 return REDIS_ERR;
4466 }
4467 dumpsize -= nread;
4468 }
4469 close(dfd);
4470 if (rename(tmpfile,server.dbfilename) == -1) {
4471 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4472 unlink(tmpfile);
4473 close(fd);
4474 return REDIS_ERR;
4475 }
4476 emptyDb();
4477 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4478 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4479 close(fd);
4480 return REDIS_ERR;
4481 }
4482 server.master = createClient(fd);
4483 server.master->flags |= REDIS_MASTER;
4484 server.replstate = REDIS_REPL_CONNECTED;
4485 return REDIS_OK;
4486 }
4487
4488 static void slaveofCommand(redisClient *c) {
4489 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4490 !strcasecmp(c->argv[2]->ptr,"one")) {
4491 if (server.masterhost) {
4492 sdsfree(server.masterhost);
4493 server.masterhost = NULL;
4494 if (server.master) freeClient(server.master);
4495 server.replstate = REDIS_REPL_NONE;
4496 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4497 }
4498 } else {
4499 sdsfree(server.masterhost);
4500 server.masterhost = sdsdup(c->argv[1]->ptr);
4501 server.masterport = atoi(c->argv[2]->ptr);
4502 if (server.master) freeClient(server.master);
4503 server.replstate = REDIS_REPL_CONNECT;
4504 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4505 server.masterhost, server.masterport);
4506 }
4507 addReply(c,shared.ok);
4508 }
4509
4510 /* ============================ Maxmemory directive ======================== */
4511
4512 /* This function gets called when 'maxmemory' is set on the config file to limit
4513 * the max memory used by the server, and we are out of memory.
4514 * This function will try to, in order:
4515 *
4516 * - Free objects from the free list
4517 * - Try to remove keys with an EXPIRE set
4518 *
4519 * It is not possible to free enough memory to reach used-memory < maxmemory
4520 * the server will start refusing commands that will enlarge even more the
4521 * memory usage.
4522 */
4523 static void freeMemoryIfNeeded(void) {
4524 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4525 if (listLength(server.objfreelist)) {
4526 robj *o;
4527
4528 listNode *head = listFirst(server.objfreelist);
4529 o = listNodeValue(head);
4530 listDelNode(server.objfreelist,head);
4531 zfree(o);
4532 } else {
4533 int j, k, freed = 0;
4534
4535 for (j = 0; j < server.dbnum; j++) {
4536 int minttl = -1;
4537 robj *minkey = NULL;
4538 struct dictEntry *de;
4539
4540 if (dictSize(server.db[j].expires)) {
4541 freed = 1;
4542 /* From a sample of three keys drop the one nearest to
4543 * the natural expire */
4544 for (k = 0; k < 3; k++) {
4545 time_t t;
4546
4547 de = dictGetRandomKey(server.db[j].expires);
4548 t = (time_t) dictGetEntryVal(de);
4549 if (minttl == -1 || t < minttl) {
4550 minkey = dictGetEntryKey(de);
4551 minttl = t;
4552 }
4553 }
4554 deleteKey(server.db+j,minkey);
4555 }
4556 }
4557 if (!freed) return; /* nothing to free... */
4558 }
4559 }
4560 }
4561
4562 /* ================================= Debugging ============================== */
4563
4564 static void debugCommand(redisClient *c) {
4565 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4566 *((char*)-1) = 'x';
4567 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4568 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4569 robj *key, *val;
4570
4571 if (!de) {
4572 addReply(c,shared.nokeyerr);
4573 return;
4574 }
4575 key = dictGetEntryKey(de);
4576 val = dictGetEntryVal(de);
4577 addReplySds(c,sdscatprintf(sdsempty(),
4578 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4579 key, key->refcount, val, val->refcount, val->encoding));
4580 } else {
4581 addReplySds(c,sdsnew(
4582 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4583 }
4584 }
4585
4586 #ifdef HAVE_BACKTRACE
4587 static struct redisFunctionSym symsTable[] = {
4588 {"compareStringObjects", (unsigned long)compareStringObjects},
4589 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4590 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4591 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4592 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4593 {"freeStringObject", (unsigned long)freeStringObject},
4594 {"freeListObject", (unsigned long)freeListObject},
4595 {"freeSetObject", (unsigned long)freeSetObject},
4596 {"decrRefCount", (unsigned long)decrRefCount},
4597 {"createObject", (unsigned long)createObject},
4598 {"freeClient", (unsigned long)freeClient},
4599 {"rdbLoad", (unsigned long)rdbLoad},
4600 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4601 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4602 {"addReply", (unsigned long)addReply},
4603 {"addReplySds", (unsigned long)addReplySds},
4604 {"incrRefCount", (unsigned long)incrRefCount},
4605 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4606 {"createStringObject", (unsigned long)createStringObject},
4607 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4608 {"syncWithMaster", (unsigned long)syncWithMaster},
4609 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4610 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4611 {"getDecodedObject", (unsigned long)getDecodedObject},
4612 {"removeExpire", (unsigned long)removeExpire},
4613 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4614 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4615 {"deleteKey", (unsigned long)deleteKey},
4616 {"getExpire", (unsigned long)getExpire},
4617 {"setExpire", (unsigned long)setExpire},
4618 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4619 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4620 {"authCommand", (unsigned long)authCommand},
4621 {"pingCommand", (unsigned long)pingCommand},
4622 {"echoCommand", (unsigned long)echoCommand},
4623 {"setCommand", (unsigned long)setCommand},
4624 {"setnxCommand", (unsigned long)setnxCommand},
4625 {"getCommand", (unsigned long)getCommand},
4626 {"delCommand", (unsigned long)delCommand},
4627 {"existsCommand", (unsigned long)existsCommand},
4628 {"incrCommand", (unsigned long)incrCommand},
4629 {"decrCommand", (unsigned long)decrCommand},
4630 {"incrbyCommand", (unsigned long)incrbyCommand},
4631 {"decrbyCommand", (unsigned long)decrbyCommand},
4632 {"selectCommand", (unsigned long)selectCommand},
4633 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4634 {"keysCommand", (unsigned long)keysCommand},
4635 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4636 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4637 {"saveCommand", (unsigned long)saveCommand},
4638 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4639 {"shutdownCommand", (unsigned long)shutdownCommand},
4640 {"moveCommand", (unsigned long)moveCommand},
4641 {"renameCommand", (unsigned long)renameCommand},
4642 {"renamenxCommand", (unsigned long)renamenxCommand},
4643 {"lpushCommand", (unsigned long)lpushCommand},
4644 {"rpushCommand", (unsigned long)rpushCommand},
4645 {"lpopCommand", (unsigned long)lpopCommand},
4646 {"rpopCommand", (unsigned long)rpopCommand},
4647 {"llenCommand", (unsigned long)llenCommand},
4648 {"lindexCommand", (unsigned long)lindexCommand},
4649 {"lrangeCommand", (unsigned long)lrangeCommand},
4650 {"ltrimCommand", (unsigned long)ltrimCommand},
4651 {"typeCommand", (unsigned long)typeCommand},
4652 {"lsetCommand", (unsigned long)lsetCommand},
4653 {"saddCommand", (unsigned long)saddCommand},
4654 {"sremCommand", (unsigned long)sremCommand},
4655 {"smoveCommand", (unsigned long)smoveCommand},
4656 {"sismemberCommand", (unsigned long)sismemberCommand},
4657 {"scardCommand", (unsigned long)scardCommand},
4658 {"spopCommand", (unsigned long)spopCommand},
4659 {"srandmemberCommand", (unsigned long)srandmemberCommand},
4660 {"sinterCommand", (unsigned long)sinterCommand},
4661 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4662 {"sunionCommand", (unsigned long)sunionCommand},
4663 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4664 {"sdiffCommand", (unsigned long)sdiffCommand},
4665 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4666 {"syncCommand", (unsigned long)syncCommand},
4667 {"flushdbCommand", (unsigned long)flushdbCommand},
4668 {"flushallCommand", (unsigned long)flushallCommand},
4669 {"sortCommand", (unsigned long)sortCommand},
4670 {"lremCommand", (unsigned long)lremCommand},
4671 {"infoCommand", (unsigned long)infoCommand},
4672 {"mgetCommand", (unsigned long)mgetCommand},
4673 {"monitorCommand", (unsigned long)monitorCommand},
4674 {"expireCommand", (unsigned long)expireCommand},
4675 {"getsetCommand", (unsigned long)getsetCommand},
4676 {"ttlCommand", (unsigned long)ttlCommand},
4677 {"slaveofCommand", (unsigned long)slaveofCommand},
4678 {"debugCommand", (unsigned long)debugCommand},
4679 {"processCommand", (unsigned long)processCommand},
4680 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4681 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4682 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4683 {"msetGenericCommand", (unsigned long)msetGenericCommand},
4684 {"msetCommand", (unsigned long)msetCommand},
4685 {"msetnxCommand", (unsigned long)msetnxCommand},
4686 {NULL,0}
4687 };
4688
4689 /* This function try to convert a pointer into a function name. It's used in
4690 * oreder to provide a backtrace under segmentation fault that's able to
4691 * display functions declared as static (otherwise the backtrace is useless). */
4692 static char *findFuncName(void *pointer, unsigned long *offset){
4693 int i, ret = -1;
4694 unsigned long off, minoff = 0;
4695
4696 /* Try to match against the Symbol with the smallest offset */
4697 for (i=0; symsTable[i].pointer; i++) {
4698 unsigned long lp = (unsigned long) pointer;
4699
4700 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4701 off=lp-symsTable[i].pointer;
4702 if (ret < 0 || off < minoff) {
4703 minoff=off;
4704 ret=i;
4705 }
4706 }
4707 }
4708 if (ret == -1) return NULL;
4709 *offset = minoff;
4710 return symsTable[ret].name;
4711 }
4712
/* Return the instruction pointer saved in the signal context 'uc', i.e. the
 * address that was executing when the signal was raised, or NULL on
 * platforms this function does not know how to inspect.
 *
 * Linux i386 and x86-64 are handled separately: compilers define
 * __x86_64__ (lowercase), and the 64 bit register set has no REG_EIP
 * entry, the instruction pointer being stored at index 16 (REG_RIP in
 * glibc's <sys/ucontext.h>) of gregs. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #ifdef _STRUCT_X86_THREAD_STATE64
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86, 32 bit */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__x86_64__) || defined(__X86_64__) /* Linux x86, 64 bit */
    return (void*) uc->uc_mcontext.gregs[16]; /* REG_RIP */
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
4734
4735 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4736 void *trace[100];
4737 char **messages = NULL;
4738 int i, trace_size = 0;
4739 unsigned long offset=0;
4740 time_t uptime = time(NULL)-server.stat_starttime;
4741 ucontext_t *uc = (ucontext_t*) secret;
4742 REDIS_NOTUSED(info);
4743
4744 redisLog(REDIS_WARNING,
4745 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4746 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4747 "redis_version:%s; "
4748 "uptime_in_seconds:%d; "
4749 "connected_clients:%d; "
4750 "connected_slaves:%d; "
4751 "used_memory:%zu; "
4752 "changes_since_last_save:%lld; "
4753 "bgsave_in_progress:%d; "
4754 "last_save_time:%d; "
4755 "total_connections_received:%lld; "
4756 "total_commands_processed:%lld; "
4757 "role:%s;"
4758 ,REDIS_VERSION,
4759 uptime,
4760 listLength(server.clients)-listLength(server.slaves),
4761 listLength(server.slaves),
4762 server.usedmemory,
4763 server.dirty,
4764 server.bgsaveinprogress,
4765 server.lastsave,
4766 server.stat_numconnections,
4767 server.stat_numcommands,
4768 server.masterhost == NULL ? "master" : "slave"
4769 ));
4770
4771 trace_size = backtrace(trace, 100);
4772 /* overwrite sigaction with caller's address */
4773 if (getMcontextEip(uc) != NULL) {
4774 trace[1] = getMcontextEip(uc);
4775 }
4776 messages = backtrace_symbols(trace, trace_size);
4777
4778 for (i=1; i<trace_size; ++i) {
4779 char *fn = findFuncName(trace[i], &offset), *p;
4780
4781 p = strchr(messages[i],'+');
4782 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4783 redisLog(REDIS_WARNING,"%s", messages[i]);
4784 } else {
4785 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4786 }
4787 }
4788 free(messages);
4789 exit(0);
4790 }
4791
4792 static void setupSigSegvAction(void) {
4793 struct sigaction act;
4794
4795 sigemptyset (&act.sa_mask);
4796 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4797 * is used. Otherwise, sa_handler is used */
4798 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4799 act.sa_sigaction = segvHandler;
4800 sigaction (SIGSEGV, &act, NULL);
4801 sigaction (SIGBUS, &act, NULL);
4802 sigaction (SIGFPE, &act, NULL);
4803 sigaction (SIGILL, &act, NULL);
4804 sigaction (SIGBUS, &act, NULL);
4805 return;
4806 }
4807 #else /* HAVE_BACKTRACE */
/* Built without HAVE_BACKTRACE: no signal handler is installed. */
static void setupSigSegvAction(void) {
    return;
}
4810 #endif /* HAVE_BACKTRACE */
4811
4812 /* =================================== Main! ================================ */
4813
4814 #ifdef __linux__
/* Read the first line of /proc/sys/vm/overcommit_memory and return it
 * converted with atoi(). Returns -1 if the file cannot be opened or no
 * line can be read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,sizeof(line),fp) != NULL) value = atoi(line);
    fclose(fp);
    return value;
}
4828
4829 void linuxOvercommitMemoryWarning(void) {
4830 if (linuxOvercommitMemoryValue() == 0) {
4831 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4832 }
4833 }
4834 #endif /* __linux__ */
4835
4836 static void daemonize(void) {
4837 int fd;
4838 FILE *fp;
4839
4840 if (fork() != 0) exit(0); /* parent exits */
4841 setsid(); /* create a new session */
4842
4843 /* Every output goes to /dev/null. If Redis is daemonized but
4844 * the 'logfile' is set to 'stdout' in the configuration file
4845 * it will not log at all. */
4846 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4847 dup2(fd, STDIN_FILENO);
4848 dup2(fd, STDOUT_FILENO);
4849 dup2(fd, STDERR_FILENO);
4850 if (fd > STDERR_FILENO) close(fd);
4851 }
4852 /* Try to write the pid file */
4853 fp = fopen(server.pidfile,"w");
4854 if (fp) {
4855 fprintf(fp,"%d\n",getpid());
4856 fclose(fp);
4857 }
4858 }
4859
4860 int main(int argc, char **argv) {
4861 initServerConfig();
4862 if (argc == 2) {
4863 ResetServerSaveParams();
4864 loadServerConfig(argv[1]);
4865 } else if (argc > 2) {
4866 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4867 exit(1);
4868 } else {
4869 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4870 }
4871 initServer();
4872 if (server.daemonize) daemonize();
4873 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4874 #ifdef __linux__
4875 linuxOvercommitMemoryWarning();
4876 #endif
4877 if (rdbLoad(server.dbfilename) == REDIS_OK)
4878 redisLog(REDIS_NOTICE,"DB loaded from disk");
4879 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4880 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4881 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4882 aeMain(server.el);
4883 aeDeleteEventLoop(server.el);
4884 return 0;
4885 }