]> git.saurik.com Git - redis.git/blob - redis.c
version incremented to 1.050 to distinguish from 1.001 stable and next stable version...
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
71 /* Error codes */
72 #define REDIS_OK 0
73 #define REDIS_ERR -1
74
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
88
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
91
92 /* Command flags */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
96 this flag will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short these commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
100
101 /* Object types */
102 #define REDIS_STRING 0
103 #define REDIS_LIST 1
104 #define REDIS_SET 2
105 #define REDIS_ZSET 3
106 #define REDIS_HASH 4
107
108 /* Objects encoding */
109 #define REDIS_ENCODING_RAW 0 /* Raw representation */
110 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111
112 /* Object types only used for dumping to disk */
113 #define REDIS_EXPIRETIME 253
114 #define REDIS_SELECTDB 254
115 #define REDIS_EOF 255
116
117 /* Defines related to the dump file format. Storing 32 bit lengths for short
118 * keys requires a lot of space, so we check the most significant 2 bits of
119 * the first byte to interpret the length:
120 *
121 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
122 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
123 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
124 * 11|000000 this means: specially encoded object will follow. The six bits
125 * number specifies the kind of object that follows.
126 * See the REDIS_RDB_ENC_* defines.
127 *
128 * Lengths up to 63 are stored using a single byte; most DB keys, and many
129 * values, will fit inside. */
130 #define REDIS_RDB_6BITLEN 0
131 #define REDIS_RDB_14BITLEN 1
132 #define REDIS_RDB_32BITLEN 2
133 #define REDIS_RDB_ENCVAL 3
134 #define REDIS_RDB_LENERR UINT_MAX
135
136 /* When a length of a string object stored on disk has the first two bits
137 * set, the remaining six bits specify a special encoding for the object
138 * according to the following defines: */
139 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
140 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
141 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
142 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
143
144 /* Client flags */
145 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
146 #define REDIS_SLAVE 2 /* This client is a slave server */
147 #define REDIS_MASTER 4 /* This client is a master server */
148 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149
150 /* Slave replication state - slave side */
151 #define REDIS_REPL_NONE 0 /* No active replication */
152 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
153 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154
155 /* Slave replication state - from the point of view of master
156 * Note that in SEND_BULK and ONLINE state the slave receives new updates
157 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
158 * to start the next background saving in order to send updates to it. */
159 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
160 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
161 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
162 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163
164 /* List related stuff */
165 #define REDIS_HEAD 0
166 #define REDIS_TAIL 1
167
168 /* Sort operations */
169 #define REDIS_SORT_GET 0
170 #define REDIS_SORT_DEL 1
171 #define REDIS_SORT_INCR 2
172 #define REDIS_SORT_DECR 3
173 #define REDIS_SORT_ASC 4
174 #define REDIS_SORT_DESC 5
175 #define REDIS_SORTKEY_MAX 1024
176
177 /* Log levels */
178 #define REDIS_DEBUG 0
179 #define REDIS_NOTICE 1
180 #define REDIS_WARNING 2
181
182 /* Anti-warning macro... */
183 #define REDIS_NOTUSED(V) ((void) V)
184
185
186 /*================================= Data types ============================== */
187
188 /* A redis object, that is a type able to hold a string / list / set.
 * 'type' and 'encoding' tell how 'ptr' has to be interpreted; the object is
 * shared by reference counting (see the incrRefCount()/decrRefCount()
 * prototypes in this file). */
189 typedef struct redisObject {
190 void *ptr; /* the value itself */
191 unsigned char type; /* presumably one of the REDIS_STRING/REDIS_LIST/... "Object types" -- confirm */
192 unsigned char encoding; /* presumably one of the REDIS_ENCODING_* defines -- confirm */
193 unsigned char notused[2]; /* unused bytes, as the name says */
194 int refcount; /* number of references to this object */
195 } robj;
196
/* One of the server databases (server.db is indexed by database number in
 * serverCron()). */
197 typedef struct redisDb {
198 dict *dict; /* the keys of this database */
199 dict *expires; /* keys with a timeout: serverCron() reads each entry value as a time_t */
200 int id; /* NOTE(review): presumably the database number -- confirm where it is set */
201 } redisDb;
202
203 /* With multiplexing we need to keep per-client state.
204 * Clients are kept in a linked list (server.clients). */
205 typedef struct redisClient {
206 int fd;
207 redisDb *db;
208 int dictid;
209 sds querybuf;
210 robj **argv, **mbargv;
211 int argc, mbargc;
212 int bulklen; /* bulk read len. -1 if not in bulk read mode */
213 int multibulk; /* multi bulk command format active */
214 list *reply;
215 int sentlen;
216 time_t lastinteraction; /* time of the last interaction, used for timeout */
217 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
218 int slaveseldb; /* slave selected db, if this client is a slave */
219 int authenticated; /* when requirepass is non-NULL */
220 int replstate; /* replication state if this is a slave */
221 int repldbfd; /* replication DB file descriptor */
222 long repldboff; /* replication DB file offset */
223 off_t repldbsize; /* replication DB file size */
224 } redisClient;
225
/* One automatic-save rule. serverCron() starts a background save when
 * server.dirty >= changes and more than 'seconds' seconds elapsed since
 * server.lastsave. */
226 struct saveparam {
227 time_t seconds; /* minimum age of the last save, in seconds */
228 int changes; /* minimum number of changes since the last save */
229 };
230
231 /* Global server state structure */
232 struct redisServer {
233 int port;
234 int fd;
235 redisDb *db; /* array of databases, indexed 0..dbnum-1 in serverCron() */
236 dict *sharingpool;
237 unsigned int sharingpoolsize;
238 long long dirty; /* changes to DB from the last save */
239 list *clients;
240 list *slaves, *monitors;
241 char neterr[ANET_ERR_LEN];
242 aeEventLoop *el;
243 int cronloops; /* number of times the cron function run */
244 list *objfreelist; /* A list of freed objects to avoid malloc() */
245 time_t lastsave; /* Unix time of the last successful save */
246 size_t usedmemory; /* Used memory in bytes, refreshed from zmalloc_used_memory() by serverCron() */
247 /* Fields used only for stats */
248 time_t stat_starttime; /* server start time */
249 long long stat_numcommands; /* number of processed commands */
250 long long stat_numconnections; /* number of connections received */
251 /* Configuration */
252 int verbosity; /* redisLog() drops messages whose level is below this */
253 int glueoutputbuf;
254 int maxidletime; /* idle client timeout in seconds; 0 disables the check in serverCron() */
255 int dbnum;
256 int daemonize;
257 char *pidfile;
258 int bgsaveinprogress; /* non-zero while a background save child is running */
259 pid_t bgsavechildpid; /* reset to -1 by serverCron() once the child is reaped */
260 struct saveparam *saveparams; /* array of saveparamslen automatic-save rules */
261 int saveparamslen;
262 char *logfile; /* NULL means redisLog() writes to stdout */
263 char *bindaddr;
264 char *dbfilename;
265 char *requirepass;
266 int shareobjects;
267 /* Replication related */
268 int isslave;
269 char *masterhost;
270 int masterport;
271 redisClient *master; /* client that is master for this slave */
272 int replstate;
273 unsigned int maxclients;
274 unsigned long maxmemory;
275 /* Sort parameters - qsort_r() is only available under BSD so we
276 * have to keep this state global, in order to pass it to sortCompare() */
277 int sort_desc;
278 int sort_alpha;
279 int sort_bypattern;
280 };
281
/* Signature shared by all the command implementations (the *Command()
 * functions declared in this file). */
282 typedef void redisCommandProc(redisClient *c);
/* One row of the command table (see cmdTable). */
283 struct redisCommand {
284 char *name; /* command name, lowercase in cmdTable */
285 redisCommandProc *proc; /* implementation */
286 int arity; /* NOTE(review): argument count; negative values presumably mean "at least" -- confirm in the dispatcher */
287 int flags; /* OR of the REDIS_CMD_* flags */
288 };
289
/* A (name, address) pair. NOTE(review): presumably used to map addresses to
 * function names for crash reports -- the user is outside this view. */
290 struct redisFunctionSym {
291 char *name;
292 unsigned long pointer;
293 };
294
/* An element being sorted together with its sort key: either a numeric
 * score or an object to compare. NOTE(review): which union member is active
 * presumably depends on the server.sort_* settings -- confirm in SORT. */
295 typedef struct _redisSortObject {
296 robj *obj;
297 union {
298 double score;
299 robj *cmpobj;
300 } u;
301 } redisSortObject;
302
/* An operation attached to a sort, with its pattern. */
303 typedef struct _redisSortOperation {
304 int type; /* presumably one of the REDIS_SORT_* "Sort operations" -- confirm */
305 robj *pattern;
306 } redisSortOperation;
307
/* Sorted set value: a dict plus a tree. NOTE(review): the 'tree' type is
 * not declared in the visible part of the file. */
308 typedef struct zset {
309 dict *dict;
310 tree *tree;
311 } zset;
312
/* Preallocated objects for frequently used replies and protocol fragments.
 * All the fields are filled once by createSharedObjects(). */
313 struct sharedObjectsStruct {
314 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
315 *colon, *nullbulk, *nullmultibulk,
316 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
317 *outofrangeerr, *plus,
318 *select0, *select1, *select2, *select3, *select4,
319 *select5, *select6, *select7, *select8, *select9;
320 } shared;
321
322 /*================================ Prototypes =============================== */
323
324 static void freeStringObject(robj *o);
325 static void freeListObject(robj *o);
326 static void freeSetObject(robj *o);
327 static void decrRefCount(void *o);
328 static robj *createObject(int type, void *ptr);
329 static void freeClient(redisClient *c);
330 static int rdbLoad(char *filename);
331 static void addReply(redisClient *c, robj *obj);
332 static void addReplySds(redisClient *c, sds s);
333 static void incrRefCount(robj *o);
334 static int rdbSaveBackground(char *filename);
335 static robj *createStringObject(char *ptr, size_t len);
336 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
337 static int syncWithMaster(void);
338 static robj *tryObjectSharing(robj *o);
339 static int tryObjectEncoding(robj *o);
340 static robj *getDecodedObject(const robj *o);
341 static int removeExpire(redisDb *db, robj *key);
342 static int expireIfNeeded(redisDb *db, robj *key);
343 static int deleteIfVolatile(redisDb *db, robj *key);
344 static int deleteKey(redisDb *db, robj *key);
345 static time_t getExpire(redisDb *db, robj *key);
346 static int setExpire(redisDb *db, robj *key, time_t when);
347 static void updateSlavesWaitingBgsave(int bgsaveerr);
348 static void freeMemoryIfNeeded(void);
349 static int processCommand(redisClient *c);
350 static void setupSigSegvAction(void);
351 static void rdbRemoveTempFile(pid_t childpid);
352 static size_t stringObjectLen(robj *o);
353 static void processInputBuffer(redisClient *c);
354
355 static void authCommand(redisClient *c);
356 static void pingCommand(redisClient *c);
357 static void echoCommand(redisClient *c);
358 static void setCommand(redisClient *c);
359 static void setnxCommand(redisClient *c);
360 static void getCommand(redisClient *c);
361 static void delCommand(redisClient *c);
362 static void existsCommand(redisClient *c);
363 static void incrCommand(redisClient *c);
364 static void decrCommand(redisClient *c);
365 static void incrbyCommand(redisClient *c);
366 static void decrbyCommand(redisClient *c);
367 static void selectCommand(redisClient *c);
368 static void randomkeyCommand(redisClient *c);
369 static void keysCommand(redisClient *c);
370 static void dbsizeCommand(redisClient *c);
371 static void lastsaveCommand(redisClient *c);
372 static void saveCommand(redisClient *c);
373 static void bgsaveCommand(redisClient *c);
374 static void shutdownCommand(redisClient *c);
375 static void moveCommand(redisClient *c);
376 static void renameCommand(redisClient *c);
377 static void renamenxCommand(redisClient *c);
378 static void lpushCommand(redisClient *c);
379 static void rpushCommand(redisClient *c);
380 static void lpopCommand(redisClient *c);
381 static void rpopCommand(redisClient *c);
382 static void llenCommand(redisClient *c);
383 static void lindexCommand(redisClient *c);
384 static void lrangeCommand(redisClient *c);
385 static void ltrimCommand(redisClient *c);
386 static void typeCommand(redisClient *c);
387 static void lsetCommand(redisClient *c);
388 static void saddCommand(redisClient *c);
389 static void sremCommand(redisClient *c);
390 static void smoveCommand(redisClient *c);
391 static void sismemberCommand(redisClient *c);
392 static void scardCommand(redisClient *c);
393 static void spopCommand(redisClient *c);
394 static void srandmemberCommand(redisClient *c);
395 static void sinterCommand(redisClient *c);
396 static void sinterstoreCommand(redisClient *c);
397 static void sunionCommand(redisClient *c);
398 static void sunionstoreCommand(redisClient *c);
399 static void sdiffCommand(redisClient *c);
400 static void sdiffstoreCommand(redisClient *c);
401 static void syncCommand(redisClient *c);
402 static void flushdbCommand(redisClient *c);
403 static void flushallCommand(redisClient *c);
404 static void sortCommand(redisClient *c);
405 static void lremCommand(redisClient *c);
406 static void infoCommand(redisClient *c);
407 static void mgetCommand(redisClient *c);
408 static void monitorCommand(redisClient *c);
409 static void expireCommand(redisClient *c);
410 static void getsetCommand(redisClient *c);
411 static void ttlCommand(redisClient *c);
412 static void slaveofCommand(redisClient *c);
413 static void debugCommand(redisClient *c);
414 static void msetCommand(redisClient *c);
415 static void msetnxCommand(redisClient *c);
416
417 /*================================= Globals ================================= */
418
419 /* Global vars */
420 static struct redisServer server; /* server global state */
/* The command table: one {name, proc, arity, flags} row per command (see
 * struct redisCommand), terminated by an all-NULL/zero row.
 * "smembers" intentionally shares sinterCommand with "sinter".
 * NOTE(review): negative arity presumably means "at least that many
 * arguments" -- confirm against the command dispatcher. */
421 static struct redisCommand cmdTable[] = {
422 {"get",getCommand,2,REDIS_CMD_INLINE},
423 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
424 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
425 {"del",delCommand,-2,REDIS_CMD_INLINE},
426 {"exists",existsCommand,2,REDIS_CMD_INLINE},
427 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
428 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
429 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
430 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
431 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
432 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
433 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
434 {"llen",llenCommand,2,REDIS_CMD_INLINE},
435 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
436 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
437 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
438 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
439 {"lrem",lremCommand,4,REDIS_CMD_BULK},
440 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
441 {"srem",sremCommand,3,REDIS_CMD_BULK},
442 {"smove",smoveCommand,4,REDIS_CMD_BULK},
443 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
444 {"scard",scardCommand,2,REDIS_CMD_INLINE},
445 {"spop",spopCommand,2,REDIS_CMD_INLINE},
446 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
447 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
448 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
449 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
450 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
451 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
452 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
453 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
454 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
455 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
456 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
457 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
458 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
459 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
460 {"select",selectCommand,2,REDIS_CMD_INLINE},
461 {"move",moveCommand,3,REDIS_CMD_INLINE},
462 {"rename",renameCommand,3,REDIS_CMD_INLINE},
463 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
464 {"expire",expireCommand,3,REDIS_CMD_INLINE},
465 {"keys",keysCommand,2,REDIS_CMD_INLINE},
466 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
467 {"auth",authCommand,2,REDIS_CMD_INLINE},
468 {"ping",pingCommand,1,REDIS_CMD_INLINE},
469 {"echo",echoCommand,2,REDIS_CMD_BULK},
470 {"save",saveCommand,1,REDIS_CMD_INLINE},
471 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
472 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
473 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
474 {"type",typeCommand,2,REDIS_CMD_INLINE},
475 {"sync",syncCommand,1,REDIS_CMD_INLINE},
476 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
477 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
478 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
479 {"info",infoCommand,1,REDIS_CMD_INLINE},
480 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
481 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
482 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
483 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
484 {NULL,NULL,0,0}
485 };
486 /*============================ Utility functions ============================ */
487
/* Glob-style pattern matching on length-delimited buffers.
 *
 * Supported syntax: '*' (any run of characters, possibly empty), '?' (any
 * single character), '[...]' character classes with '^' negation, 'a-z'
 * ranges and '\' escapes, and '\x' to match 'x' literally.
 *
 * pattern/patternLen: the glob and its length in bytes.
 * string/stringLen:   the subject and its length in bytes.
 * nocase:             non-zero for a case-insensitive comparison.
 *
 * Returns 1 on match, 0 otherwise.
 *
 * Neither buffer needs to be NUL terminated: no byte at or after
 * pattern[patternLen] / string[stringLen] is ever inspected, and a
 * construct that must consume a character never matches the end of the
 * string. Bytes are compared as unsigned char, which also keeps the
 * tolower() calls well defined for bytes >= 0x80. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*', looking only inside the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches everything left */
            /* Try to match the rest of the pattern at every position. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                        string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match = 0;

            /* A class always consumes exactly one character. */
            if (stringLen <= 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * advance after the switch lands on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped member, compared verbatim. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal always consumes exactly one character. */
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Only trailing '*' may still match the empty remainder. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
610
611 static void redisLog(int level, const char *fmt, ...) {
612 va_list ap;
613 FILE *fp;
614
615 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
616 if (!fp) return;
617
618 va_start(ap, fmt);
619 if (level >= server.verbosity) {
620 char *c = ".-*";
621 char buf[64];
622 time_t now;
623
624 now = time(NULL);
625 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
626 fprintf(fp,"%s %c ",buf,c[level]);
627 vfprintf(fp, fmt, ap);
628 fprintf(fp,"\n");
629 fflush(fp);
630 }
631 va_end(ap);
632
633 if (server.logfile) fclose(fp);
634 }
635
636 /*====================== Hash table type implementation ==================== */
637
638 /* This is a hash table type that uses the SDS dynamic strings library as
639 * keys and redis objects as values (objects can hold SDS strings,
640 * lists, sets). */
641
/* Dict value destructor: release a value that is a plain zmalloc()ed
 * buffer. 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    (void) privdata;
    zfree(val);
}
647
648 static int sdsDictKeyCompare(void *privdata, const void *key1,
649 const void *key2)
650 {
651 int l1,l2;
652 DICT_NOTUSED(privdata);
653
654 l1 = sdslen((sds)key1);
655 l2 = sdslen((sds)key2);
656 if (l1 != l2) return 0;
657 return memcmp(key1, key2, l1) == 0;
658 }
659
/* Dict key/value destructor for redis objects: drop the reference the dict
 * was holding. 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void) privdata;
    decrRefCount(val);
}
666
667 static int dictObjKeyCompare(void *privdata, const void *key1,
668 const void *key2)
669 {
670 const robj *o1 = key1, *o2 = key2;
671 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
672 }
673
674 static unsigned int dictObjHash(const void *key) {
675 const robj *o = key;
676 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
677 }
678
679 static int dictEncObjKeyCompare(void *privdata, const void *key1,
680 const void *key2)
681 {
682 const robj *o1 = key1, *o2 = key2;
683
684 if (o1->encoding == REDIS_ENCODING_RAW &&
685 o2->encoding == REDIS_ENCODING_RAW)
686 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
687 else {
688 robj *dec1, *dec2;
689 int cmp;
690
691 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
692 getDecodedObject(o1) : (robj*)o1;
693 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
694 getDecodedObject(o2) : (robj*)o2;
695 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
696 if (dec1 != o1) decrRefCount(dec1);
697 if (dec2 != o2) decrRefCount(dec2);
698 return cmp;
699 }
700 }
701
702 static unsigned int dictEncObjHash(const void *key) {
703 const robj *o = key;
704
705 if (o->encoding == REDIS_ENCODING_RAW)
706 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
707 else {
708 robj *dec = getDecodedObject(o);
709 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
710 decrRefCount(dec);
711 return hash;
712 }
713 }
714
/* Dict type whose keys are redis objects (hashed and compared through the
 * encoding-aware callbacks) and whose values are not owned by the dict.
 * NOTE(review): judging by the name this backs set objects -- confirm at
 * the dictCreate() call sites. */
715 static dictType setDictType = {
716 dictEncObjHash, /* hash function */
717 NULL, /* key dup */
718 NULL, /* val dup */
719 dictEncObjKeyCompare, /* key compare */
720 dictRedisObjectDestructor, /* key destructor */
721 NULL /* val destructor */
722 };
723
/* Same key handling as setDictType, but the values are zmalloc()ed buffers
 * released with zfree() by dictVanillaFree().
 * NOTE(review): judging by the name this backs the dict of a zset --
 * confirm at the dictCreate() call sites. */
724 static dictType zsetDictType = {
725 dictEncObjHash, /* hash function */
726 NULL, /* key dup */
727 NULL, /* val dup */
728 dictEncObjKeyCompare, /* key compare */
729 dictRedisObjectDestructor, /* key destructor */
730 dictVanillaFree /* val destructor */
731 };
732
/* Dict type whose keys and values are both redis objects: keys are hashed
 * and compared as raw sds strings (no decoding step), and both keys and
 * values lose one reference when an entry is destroyed. */
733 static dictType hashDictType = {
734 dictObjHash, /* hash function */
735 NULL, /* key dup */
736 NULL, /* val dup */
737 dictObjKeyCompare, /* key compare */
738 dictRedisObjectDestructor, /* key destructor */
739 dictRedisObjectDestructor /* val destructor */
740 };
741
742 /* ========================= Random utility functions ======================= */
743
/* Out of memory handler.
 *
 * Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear that the condition
 * could even be reported to the client, since the networking layer itself
 * uses heap allocated send buffers. So the policy is simply to print 'msg'
 * (usually the name of the failing caller) on stderr, wait one second and
 * abort(). This function never returns. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n", msg);
    fflush(out);
    sleep(1);
    abort();
}
755
756 /* ====================== Redis server networking stuff ===================== */
757 static void closeTimedoutClients(void) {
758 redisClient *c;
759 listNode *ln;
760 time_t now = time(NULL);
761
762 listRewind(server.clients);
763 while ((ln = listYield(server.clients)) != NULL) {
764 c = listNodeValue(ln);
765 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
766 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
767 (now - c->lastinteraction > server.maxidletime)) {
768 redisLog(REDIS_DEBUG,"Closing idle client");
769 freeClient(c);
770 }
771 }
772 }
773
774 static int htNeedsResize(dict *dict) {
775 long long size, used;
776
777 size = dictSlots(dict);
778 used = dictSize(dict);
779 return (size && used && size > DICT_HT_INITIAL_SIZE &&
780 (used*100/size < REDIS_HT_MINFILL));
781 }
782
783 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
784 * we resize the hash table to save memory */
785 static void tryResizeHashTables(void) {
786 int j;
787
788 for (j = 0; j < server.dbnum; j++) {
789 if (htNeedsResize(server.db[j].dict)) {
790 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
791 dictResize(server.db[j].dict);
792 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
793 }
794 if (htNeedsResize(server.db[j].expires))
795 dictResize(server.db[j].expires);
796 }
797 }
798
799 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
800 int j, loops = server.cronloops++;
801 REDIS_NOTUSED(eventLoop);
802 REDIS_NOTUSED(id);
803 REDIS_NOTUSED(clientData);
804
805 /* Update the global state with the amount of used memory */
806 server.usedmemory = zmalloc_used_memory();
807
808 /* Show some info about non-empty databases */
809 for (j = 0; j < server.dbnum; j++) {
810 long long size, used, vkeys;
811
812 size = dictSlots(server.db[j].dict);
813 used = dictSize(server.db[j].dict);
814 vkeys = dictSize(server.db[j].expires);
815 if (!(loops % 5) && (used || vkeys)) {
816 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
817 /* dictPrintStats(server.dict); */
818 }
819 }
820
821 /* We don't want to resize the hash tables while a bacground saving
822 * is in progress: the saving child is created using fork() that is
823 * implemented with a copy-on-write semantic in most modern systems, so
824 * if we resize the HT while there is the saving child at work actually
825 * a lot of memory movements in the parent will cause a lot of pages
826 * copied. */
827 if (!server.bgsaveinprogress) tryResizeHashTables();
828
829 /* Show information about connected clients */
830 if (!(loops % 5)) {
831 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
832 listLength(server.clients)-listLength(server.slaves),
833 listLength(server.slaves),
834 server.usedmemory,
835 dictSize(server.sharingpool));
836 }
837
838 /* Close connections of timedout clients */
839 if (server.maxidletime && !(loops % 10))
840 closeTimedoutClients();
841
842 /* Check if a background saving in progress terminated */
843 if (server.bgsaveinprogress) {
844 int statloc;
845 if (wait4(-1,&statloc,WNOHANG,NULL)) {
846 int exitcode = WEXITSTATUS(statloc);
847 int bysignal = WIFSIGNALED(statloc);
848
849 if (!bysignal && exitcode == 0) {
850 redisLog(REDIS_NOTICE,
851 "Background saving terminated with success");
852 server.dirty = 0;
853 server.lastsave = time(NULL);
854 } else if (!bysignal && exitcode != 0) {
855 redisLog(REDIS_WARNING, "Background saving error");
856 } else {
857 redisLog(REDIS_WARNING,
858 "Background saving terminated by signal");
859 rdbRemoveTempFile(server.bgsavechildpid);
860 }
861 server.bgsaveinprogress = 0;
862 server.bgsavechildpid = -1;
863 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
864 }
865 } else {
866 /* If there is not a background saving in progress check if
867 * we have to save now */
868 time_t now = time(NULL);
869 for (j = 0; j < server.saveparamslen; j++) {
870 struct saveparam *sp = server.saveparams+j;
871
872 if (server.dirty >= sp->changes &&
873 now-server.lastsave > sp->seconds) {
874 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
875 sp->changes, sp->seconds);
876 rdbSaveBackground(server.dbfilename);
877 break;
878 }
879 }
880 }
881
882 /* Try to expire a few timed out keys */
883 for (j = 0; j < server.dbnum; j++) {
884 redisDb *db = server.db+j;
885 int num = dictSize(db->expires);
886
887 if (num) {
888 time_t now = time(NULL);
889
890 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
891 num = REDIS_EXPIRELOOKUPS_PER_CRON;
892 while (num--) {
893 dictEntry *de;
894 time_t t;
895
896 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
897 t = (time_t) dictGetEntryVal(de);
898 if (now > t) {
899 deleteKey(db,dictGetEntryKey(de));
900 }
901 }
902 }
903 }
904
905 /* Check if we should connect to a MASTER */
906 if (server.replstate == REDIS_REPL_CONNECT) {
907 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
908 if (syncWithMaster() == REDIS_OK) {
909 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
910 }
911 }
912 return 1000;
913 }
914
915 static void createSharedObjects(void) {
916 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
917 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
918 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
919 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
920 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
921 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
922 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
923 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
924 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
925 /* no such key */
926 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
927 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
928 "-ERR Operation against a key holding the wrong kind of value\r\n"));
929 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
930 "-ERR no such key\r\n"));
931 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
932 "-ERR syntax error\r\n"));
933 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
934 "-ERR source and destination objects are the same\r\n"));
935 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
936 "-ERR index out of range\r\n"));
937 shared.space = createObject(REDIS_STRING,sdsnew(" "));
938 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
939 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
940 shared.select0 = createStringObject("select 0\r\n",10);
941 shared.select1 = createStringObject("select 1\r\n",10);
942 shared.select2 = createStringObject("select 2\r\n",10);
943 shared.select3 = createStringObject("select 3\r\n",10);
944 shared.select4 = createStringObject("select 4\r\n",10);
945 shared.select5 = createStringObject("select 5\r\n",10);
946 shared.select6 = createStringObject("select 6\r\n",10);
947 shared.select7 = createStringObject("select 7\r\n",10);
948 shared.select8 = createStringObject("select 8\r\n",10);
949 shared.select9 = createStringObject("select 9\r\n",10);
950 }
951
952 static void appendServerSaveParams(time_t seconds, int changes) {
953 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
954 if (server.saveparams == NULL) oom("appendServerSaveParams");
955 server.saveparams[server.saveparamslen].seconds = seconds;
956 server.saveparams[server.saveparamslen].changes = changes;
957 server.saveparamslen++;
958 }
959
960 static void ResetServerSaveParams() {
961 zfree(server.saveparams);
962 server.saveparams = NULL;
963 server.saveparamslen = 0;
964 }
965
966 static void initServerConfig() {
967 server.dbnum = REDIS_DEFAULT_DBNUM;
968 server.port = REDIS_SERVERPORT;
969 server.verbosity = REDIS_DEBUG;
970 server.maxidletime = REDIS_MAXIDLETIME;
971 server.saveparams = NULL;
972 server.logfile = NULL; /* NULL = log on standard output */
973 server.bindaddr = NULL;
974 server.glueoutputbuf = 1;
975 server.daemonize = 0;
976 server.pidfile = "/var/run/redis.pid";
977 server.dbfilename = "dump.rdb";
978 server.requirepass = NULL;
979 server.shareobjects = 0;
980 server.sharingpoolsize = 1024;
981 server.maxclients = 0;
982 server.maxmemory = 0;
983 ResetServerSaveParams();
984
985 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
986 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
987 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
988 /* Replication related */
989 server.isslave = 0;
990 server.masterhost = NULL;
991 server.masterport = 6379;
992 server.master = NULL;
993 server.replstate = REDIS_REPL_NONE;
994 }
995
996 static void initServer() {
997 int j;
998
999 signal(SIGHUP, SIG_IGN);
1000 signal(SIGPIPE, SIG_IGN);
1001 setupSigSegvAction();
1002
1003 server.clients = listCreate();
1004 server.slaves = listCreate();
1005 server.monitors = listCreate();
1006 server.objfreelist = listCreate();
1007 createSharedObjects();
1008 server.el = aeCreateEventLoop();
1009 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1010 server.sharingpool = dictCreate(&setDictType,NULL);
1011 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
1012 oom("server initialization"); /* Fatal OOM */
1013 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1014 if (server.fd == -1) {
1015 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1016 exit(1);
1017 }
1018 for (j = 0; j < server.dbnum; j++) {
1019 server.db[j].dict = dictCreate(&hashDictType,NULL);
1020 server.db[j].expires = dictCreate(&setDictType,NULL);
1021 server.db[j].id = j;
1022 }
1023 server.cronloops = 0;
1024 server.bgsaveinprogress = 0;
1025 server.bgsavechildpid = -1;
1026 server.lastsave = time(NULL);
1027 server.dirty = 0;
1028 server.usedmemory = 0;
1029 server.stat_numcommands = 0;
1030 server.stat_numconnections = 0;
1031 server.stat_starttime = time(NULL);
1032 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1033 }
1034
1035 /* Empty the whole database */
1036 static long long emptyDb() {
1037 int j;
1038 long long removed = 0;
1039
1040 for (j = 0; j < server.dbnum; j++) {
1041 removed += dictSize(server.db[j].dict);
1042 dictEmpty(server.db[j].dict);
1043 dictEmpty(server.db[j].expires);
1044 }
1045 return removed;
1046 }
1047
/* Translate a case insensitive "yes"/"no" string into 1/0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    return (strcasecmp(s,"no") == 0) ? 0 : -1;
}
1053
1054 /* I agree, this is a very rudimental way to load a configuration...
1055 will improve later if the config gets more complex */
1056 static void loadServerConfig(char *filename) {
1057 FILE *fp;
1058 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1059 int linenum = 0;
1060 sds line = NULL;
1061
1062 if (filename[0] == '-' && filename[1] == '\0')
1063 fp = stdin;
1064 else {
1065 if ((fp = fopen(filename,"r")) == NULL) {
1066 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1067 exit(1);
1068 }
1069 }
1070
1071 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1072 sds *argv;
1073 int argc, j;
1074
1075 linenum++;
1076 line = sdsnew(buf);
1077 line = sdstrim(line," \t\r\n");
1078
1079 /* Skip comments and blank lines*/
1080 if (line[0] == '#' || line[0] == '\0') {
1081 sdsfree(line);
1082 continue;
1083 }
1084
1085 /* Split into arguments */
1086 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1087 sdstolower(argv[0]);
1088
1089 /* Execute config directives */
1090 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1091 server.maxidletime = atoi(argv[1]);
1092 if (server.maxidletime < 0) {
1093 err = "Invalid timeout value"; goto loaderr;
1094 }
1095 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1096 server.port = atoi(argv[1]);
1097 if (server.port < 1 || server.port > 65535) {
1098 err = "Invalid port"; goto loaderr;
1099 }
1100 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1101 server.bindaddr = zstrdup(argv[1]);
1102 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1103 int seconds = atoi(argv[1]);
1104 int changes = atoi(argv[2]);
1105 if (seconds < 1 || changes < 0) {
1106 err = "Invalid save parameters"; goto loaderr;
1107 }
1108 appendServerSaveParams(seconds,changes);
1109 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1110 if (chdir(argv[1]) == -1) {
1111 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1112 argv[1], strerror(errno));
1113 exit(1);
1114 }
1115 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1116 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1117 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1118 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1119 else {
1120 err = "Invalid log level. Must be one of debug, notice, warning";
1121 goto loaderr;
1122 }
1123 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1124 FILE *logfp;
1125
1126 server.logfile = zstrdup(argv[1]);
1127 if (!strcasecmp(server.logfile,"stdout")) {
1128 zfree(server.logfile);
1129 server.logfile = NULL;
1130 }
1131 if (server.logfile) {
1132 /* Test if we are able to open the file. The server will not
1133 * be able to abort just for this problem later... */
1134 logfp = fopen(server.logfile,"a");
1135 if (logfp == NULL) {
1136 err = sdscatprintf(sdsempty(),
1137 "Can't open the log file: %s", strerror(errno));
1138 goto loaderr;
1139 }
1140 fclose(logfp);
1141 }
1142 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1143 server.dbnum = atoi(argv[1]);
1144 if (server.dbnum < 1) {
1145 err = "Invalid number of databases"; goto loaderr;
1146 }
1147 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1148 server.maxclients = atoi(argv[1]);
1149 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1150 server.maxmemory = strtoll(argv[1], NULL, 10);
1151 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1152 server.masterhost = sdsnew(argv[1]);
1153 server.masterport = atoi(argv[2]);
1154 server.replstate = REDIS_REPL_CONNECT;
1155 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1156 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1157 err = "argument must be 'yes' or 'no'"; goto loaderr;
1158 }
1159 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1160 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1161 err = "argument must be 'yes' or 'no'"; goto loaderr;
1162 }
1163 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1164 server.sharingpoolsize = atoi(argv[1]);
1165 if (server.sharingpoolsize < 1) {
1166 err = "invalid object sharing pool size"; goto loaderr;
1167 }
1168 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1169 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1170 err = "argument must be 'yes' or 'no'"; goto loaderr;
1171 }
1172 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1173 server.requirepass = zstrdup(argv[1]);
1174 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1175 server.pidfile = zstrdup(argv[1]);
1176 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1177 server.dbfilename = zstrdup(argv[1]);
1178 } else {
1179 err = "Bad directive or wrong number of arguments"; goto loaderr;
1180 }
1181 for (j = 0; j < argc; j++)
1182 sdsfree(argv[j]);
1183 zfree(argv);
1184 sdsfree(line);
1185 }
1186 if (fp != stdin) fclose(fp);
1187 return;
1188
1189 loaderr:
1190 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1191 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1192 fprintf(stderr, ">>> '%s'\n", line);
1193 fprintf(stderr, "%s\n", err);
1194 exit(1);
1195 }
1196
1197 static void freeClientArgv(redisClient *c) {
1198 int j;
1199
1200 for (j = 0; j < c->argc; j++)
1201 decrRefCount(c->argv[j]);
1202 for (j = 0; j < c->mbargc; j++)
1203 decrRefCount(c->mbargv[j]);
1204 c->argc = 0;
1205 c->mbargc = 0;
1206 }
1207
1208 static void freeClient(redisClient *c) {
1209 listNode *ln;
1210
1211 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1212 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1213 sdsfree(c->querybuf);
1214 listRelease(c->reply);
1215 freeClientArgv(c);
1216 close(c->fd);
1217 ln = listSearchKey(server.clients,c);
1218 assert(ln != NULL);
1219 listDelNode(server.clients,ln);
1220 if (c->flags & REDIS_SLAVE) {
1221 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1222 close(c->repldbfd);
1223 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1224 ln = listSearchKey(l,c);
1225 assert(ln != NULL);
1226 listDelNode(l,ln);
1227 }
1228 if (c->flags & REDIS_MASTER) {
1229 server.master = NULL;
1230 server.replstate = REDIS_REPL_CONNECT;
1231 }
1232 zfree(c->argv);
1233 zfree(c->mbargv);
1234 zfree(c);
1235 }
1236
1237 static void glueReplyBuffersIfNeeded(redisClient *c) {
1238 int totlen = 0;
1239 listNode *ln;
1240 robj *o;
1241
1242 listRewind(c->reply);
1243 while((ln = listYield(c->reply))) {
1244 o = ln->value;
1245 totlen += sdslen(o->ptr);
1246 /* This optimization makes more sense if we don't have to copy
1247 * too much data */
1248 if (totlen > 1024) return;
1249 }
1250 if (totlen > 0) {
1251 char buf[1024];
1252 int copylen = 0;
1253
1254 listRewind(c->reply);
1255 while((ln = listYield(c->reply))) {
1256 o = ln->value;
1257 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1258 copylen += sdslen(o->ptr);
1259 listDelNode(c->reply,ln);
1260 }
1261 /* Now the output buffer is empty, add the new single element */
1262 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1263 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1264 }
1265 }
1266
/* Writable-event handler: send to the client as much as possible of the
 * objects queued in c->reply.
 *
 * 'privdata' is the client. c->sentlen tracks how many bytes of the object
 * at the head of the list were already written, so a partially sent object
 * is resumed on the next call. On a write error other than EAGAIN the
 * client is freed. When the reply list becomes empty the writable event
 * is removed. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Optionally merge many small reply objects into a single one */
    if (server.glueoutputbuf && listLength(c->reply) > 1)
        glueReplyBuffersIfNeeded(c);
    while(listLength(c->reply)) {
        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Nothing to send for empty objects, just drop them */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master: account the rest of the object
             * as written without touching the socket */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            /* Error or nothing written: stop, -1 is handled after the loop */
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
         * bytes: in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * a super fast link that is always able to accept data (in real
         * world terms think of 'KEYS *' against the loopback interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            /* The socket can't accept more data now: not an error */
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    /* Only actual progress refreshes the last interaction time */
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        /* Everything was sent: stop listening for writable events */
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1322
1323 static struct redisCommand *lookupCommand(char *name) {
1324 int j = 0;
1325 while(cmdTable[j].name != NULL) {
1326 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1327 j++;
1328 }
1329 return NULL;
1330 }
1331
1332 /* resetClient prepare the client to process the next command */
1333 static void resetClient(redisClient *c) {
1334 freeClientArgv(c);
1335 c->bulklen = -1;
1336 c->multibulk = 0;
1337 }
1338
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * The function is also the state machine of the multi bulk protocol:
 * it is called once for the "*<count>" line, once for every "$<len>"
 * line and once for every bulk payload, and only runs the command when
 * the last argument was received.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
static int processCommand(redisClient *c) {
    struct redisCommand *cmd;
    long long dirty;

    /* Free some memory if needed (maxmemory setting) */
    if (server.maxmemory) freeMemoryIfNeeded();

    /* Handle the multi bulk command type. This is an alternative protocol
     * supported by Redis in order to receive commands that are composed of
     * multiple binary-safe "bulk" arguments. The latency of processing is
     * a bit higher but this allows things like multi-sets, so if this
     * protocol is used only for MSET and similar commands this is a big win. */
    if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
        /* "*<count>": remember how many bulk arguments will follow */
        c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
        if (c->multibulk <= 0) {
            resetClient(c);
            return 1;
        } else {
            /* Discard the "*<count>" argument itself */
            decrRefCount(c->argv[c->argc-1]);
            c->argc--;
            return 1;
        }
    } else if (c->multibulk) {
        if (c->bulklen == -1) {
            /* Expecting the "$<len>" line announcing the next argument */
            if (((char*)c->argv[0]->ptr)[0] != '$') {
                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
                resetClient(c);
                return 1;
            } else {
                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
                decrRefCount(c->argv[0]);
                /* Lengths above 1GB are refused */
                if (bulklen < 0 || bulklen > 1024*1024*1024) {
                    c->argc--;
                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
                    resetClient(c);
                    return 1;
                }
                c->argc--;
                c->bulklen = bulklen+2; /* add two bytes for CR+LF */
                return 1;
            }
        } else {
            /* A bulk payload was read into argv[0]: move it to the
             * multi bulk argument vector */
            c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
            c->mbargv[c->mbargc] = c->argv[0];
            c->mbargc++;
            c->argc--;
            c->multibulk--;
            if (c->multibulk == 0) {
                robj **auxargv;
                int auxargc;

                /* Here we need to swap the multi-bulk argc/argv with the
                 * normal argc/argv of the client structure. */
                auxargv = c->argv;
                c->argv = c->mbargv;
                c->mbargv = auxargv;

                auxargc = c->argc;
                c->argc = c->mbargc;
                c->mbargc = auxargc;

                /* We need to set bulklen to something different than -1
                 * in order for the code below to process the command without
                 * to try to read the last argument of a bulk command as
                 * a special argument. */
                c->bulklen = 0;
                /* continue below and process the command */
            } else {
                /* More arguments to come: wait for the next "$<len>" line */
                c->bulklen = -1;
                return 1;
            }
        }
    }
    /* -- end of multi bulk commands processing -- */

    /* The QUIT command is handled as a special case. Normal command
     * procs are unable to close the client connection safely */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        freeClient(c);
        return 0;
    }
    cmd = lookupCommand(c->argv[0]->ptr);
    if (!cmd) {
        addReplySds(c,sdsnew("-ERR unknown command\r\n"));
        resetClient(c);
        return 1;
    } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
               (c->argc < -cmd->arity)) {
        /* A positive arity is an exact argument count, a negative one
         * is a minimum (its absolute value) */
        addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
        resetClient(c);
        return 1;
    } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
        resetClient(c);
        return 1;
    } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
        /* Bulk command, payload not read yet: the last argument is the
         * length of the payload that follows the command line */
        int bulklen = atoi(c->argv[c->argc-1]->ptr);

        decrRefCount(c->argv[c->argc-1]);
        if (bulklen < 0 || bulklen > 1024*1024*1024) {
            c->argc--;
            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
            resetClient(c);
            return 1;
        }
        c->argc--;
        c->bulklen = bulklen+2; /* add two bytes for CR+LF */
        /* It is possible that the bulk read is already in the
         * buffer. Check this condition and handle it accordingly.
         * This is just a fast path, alternative to call processInputBuffer().
         * It's a good idea since the code is small and this condition
         * happens most of the times. */
        if ((signed)sdslen(c->querybuf) >= c->bulklen) {
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
        } else {
            /* Payload not fully received: return with bulklen set so the
             * bulk branch of processInputBuffer() completes the command */
            return 1;
        }
    }
    /* Let's try to share objects on the command arguments vector */
    if (server.shareobjects) {
        int j;
        for(j = 1; j < c->argc; j++)
            c->argv[j] = tryObjectSharing(c->argv[j]);
    }
    /* Let's try to encode the bulk object to save space. */
    if (cmd->flags & REDIS_CMD_BULK)
        tryObjectEncoding(c->argv[c->argc-1]);

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
        resetClient(c);
        return 1;
    }

    /* Exec the command */
    dirty = server.dirty;
    cmd->proc(c);
    /* Slaves only receive commands that changed server.dirty, while
     * monitors receive every executed command */
    if (server.dirty-dirty != 0 && listLength(server.slaves))
        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;

    /* Prepare the client for the next command */
    if (c->flags & REDIS_CLOSE) {
        freeClient(c);
        return 0;
    }
    resetClient(c);
    return 1;
}
1501
/* Send the command that was just executed to every client in 'slaves'
 * (the list of slaves or the list of monitors).
 *
 * The command is rebuilt as a sequence of objects: the arguments separated
 * by spaces and terminated by CRLF; for bulk commands the length of the
 * last argument followed by CRLF is inserted just before it. 'dictid' is
 * the database the command was executed against: a "select" command is
 * sent first to the clients whose selected database differs. */
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    listNode *ln;
    int outc = 0, j;
    robj **outv;
    /* (args*2)+1 is enough room for args, spaces, newlines */
    robj *static_outv[REDIS_STATIC_ARGS*2+1];

    /* Use the stack vector when it is large enough, the heap otherwise */
    if (argc <= REDIS_STATIC_ARGS) {
        outv = static_outv;
    } else {
        outv = zmalloc(sizeof(robj*)*(argc*2+1));
        if (!outv) oom("replicationFeedSlaves");
    }

    for (j = 0; j < argc; j++) {
        if (j != 0) outv[outc++] = shared.space;
        if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
            robj *lenobj;

            lenobj = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),"%d\r\n",
                stringObjectLen(argv[j])));
            /* Start from zero references: the increment/decrement pair
             * below is then what releases this temporary object */
            lenobj->refcount = 0;
            outv[outc++] = lenobj;
        }
        outv[outc++] = argv[j];
    }
    outv[outc++] = shared.crlf;

    /* Increment all the refcounts at start and decrement at end in order to
     * be sure to free objects if there is no slave in a replication state
     * able to be feed with commands */
    for (j = 0; j < outc; j++) incrRefCount(outv[j]);
    listRewind(slaves);
    while((ln = listYield(slaves))) {
        redisClient *slave = ln->value;

        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed all the other slaves, MONITORs and so on */
        if (slave->slaveseldb != dictid) {
            robj *selectcmd;

            /* Databases 0..9 have a preallocated shared "select" command */
            switch(dictid) {
            case 0: selectcmd = shared.select0; break;
            case 1: selectcmd = shared.select1; break;
            case 2: selectcmd = shared.select2; break;
            case 3: selectcmd = shared.select3; break;
            case 4: selectcmd = shared.select4; break;
            case 5: selectcmd = shared.select5; break;
            case 6: selectcmd = shared.select6; break;
            case 7: selectcmd = shared.select7; break;
            case 8: selectcmd = shared.select8; break;
            case 9: selectcmd = shared.select9; break;
            default:
                selectcmd = createObject(REDIS_STRING,
                    sdscatprintf(sdsempty(),"select %d\r\n",dictid));
                /* Zero references: the reply list will be the only owner */
                selectcmd->refcount = 0;
                break;
            }
            addReply(slave,selectcmd);
            slave->slaveseldb = dictid;
        }
        for (j = 0; j < outc; j++) addReply(slave,outv[j]);
    }
    for (j = 0; j < outc; j++) decrRefCount(outv[j]);
    if (outv != static_outv) zfree(outv);
}
1571
/* Parse the data accumulated in c->querybuf and run every complete
 * command found there.
 *
 * With c->bulklen == -1 the buffer is expected to contain a command line:
 * it is split on spaces into c->argv and handed to processCommand().
 * Otherwise a bulk payload of c->bulklen bytes (final CRLF included) is
 * expected: once it is fully available it is appended as last argument
 * and the command is processed. Incomplete input is left in the buffer.
 *
 * The client may have been freed when this function returns. */
static void processInputBuffer(redisClient *c) {
again:
    if (c->bulklen == -1) {
        /* Read the first line of the query */
        char *p = strchr(c->querybuf,'\n');
        size_t querylen;

        if (p) {
            sds query, *argv;
            int argc, j;

            query = c->querybuf;
            c->querybuf = sdsempty();
            querylen = 1+(p-(query));
            if (sdslen(query) > querylen) {
                /* leave data after the first line of the query in the buffer */
                c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
            }
            *p = '\0'; /* remove "\n" */
            /* NOTE(review): when the newline is the very first byte, p-1
             * points before the start of the string - confirm this is
             * harmless with the sds memory layout */
            if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
            sdsupdatelen(query);

            /* Now we can split the query in arguments */
            if (sdslen(query) == 0) {
                /* Ignore empty query */
                sdsfree(query);
                return;
            }
            argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
            if (argv == NULL) oom("sdssplitlen");
            sdsfree(query);

            /* Replace the old arguments vector with one able to hold
             * every token of this line */
            if (c->argv) zfree(c->argv);
            c->argv = zmalloc(sizeof(robj*)*argc);
            if (c->argv == NULL) oom("allocating arguments list for client");

            for (j = 0; j < argc; j++) {
                /* Empty tokens (consecutive spaces) are discarded */
                if (sdslen(argv[j])) {
                    c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
                    c->argc++;
                } else {
                    sdsfree(argv[j]);
                }
            }
            zfree(argv);
            /* Execute the command. If the client is still valid
             * after processCommand() return and there is something
             * on the query buffer try to process the next command. */
            if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
            return;
        } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
            /* No newline within the maximum request size: drop the client */
            redisLog(REDIS_DEBUG, "Client protocol error");
            freeClient(c);
            return;
        }
    } else {
        /* Bulk read handling. Note that if we are at this point
           the client already sent a command terminated with a newline,
           we are reading the bulk data that is actually the last
           argument of the command. */
        int qbl = sdslen(c->querybuf);

        if (c->bulklen <= qbl) {
            /* Copy everything but the final CRLF as final argument */
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
            /* Process the command. If the client is still valid after
             * the processing and there is more data in the buffer
             * try to parse it. */
            if (processCommand(c) && sdslen(c->querybuf)) goto again;
            return;
        }
    }
}
1647
1648 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1649 redisClient *c = (redisClient*) privdata;
1650 char buf[REDIS_IOBUF_LEN];
1651 int nread;
1652 REDIS_NOTUSED(el);
1653 REDIS_NOTUSED(mask);
1654
1655 nread = read(fd, buf, REDIS_IOBUF_LEN);
1656 if (nread == -1) {
1657 if (errno == EAGAIN) {
1658 nread = 0;
1659 } else {
1660 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1661 freeClient(c);
1662 return;
1663 }
1664 } else if (nread == 0) {
1665 redisLog(REDIS_DEBUG, "Client closed connection");
1666 freeClient(c);
1667 return;
1668 }
1669 if (nread) {
1670 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1671 c->lastinteraction = time(NULL);
1672 } else {
1673 return;
1674 }
1675 processInputBuffer(c);
1676 }
1677
1678 static int selectDb(redisClient *c, int id) {
1679 if (id < 0 || id >= server.dbnum)
1680 return REDIS_ERR;
1681 c->db = &server.db[id];
1682 return REDIS_OK;
1683 }
1684
1685 static void *dupClientReplyValue(void *o) {
1686 incrRefCount((robj*)o);
1687 return 0;
1688 }
1689
1690 static redisClient *createClient(int fd) {
1691 redisClient *c = zmalloc(sizeof(*c));
1692
1693 anetNonBlock(NULL,fd);
1694 anetTcpNoDelay(NULL,fd);
1695 if (!c) return NULL;
1696 selectDb(c,0);
1697 c->fd = fd;
1698 c->querybuf = sdsempty();
1699 c->argc = 0;
1700 c->argv = NULL;
1701 c->bulklen = -1;
1702 c->multibulk = 0;
1703 c->mbargc = 0;
1704 c->mbargv = NULL;
1705 c->sentlen = 0;
1706 c->flags = 0;
1707 c->lastinteraction = time(NULL);
1708 c->authenticated = 0;
1709 c->replstate = REDIS_REPL_NONE;
1710 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1711 listSetFreeMethod(c->reply,decrRefCount);
1712 listSetDupMethod(c->reply,dupClientReplyValue);
1713 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1714 readQueryFromClient, c, NULL) == AE_ERR) {
1715 freeClient(c);
1716 return NULL;
1717 }
1718 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1719 return c;
1720 }
1721
1722 static void addReply(redisClient *c, robj *obj) {
1723 if (listLength(c->reply) == 0 &&
1724 (c->replstate == REDIS_REPL_NONE ||
1725 c->replstate == REDIS_REPL_ONLINE) &&
1726 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1727 sendReplyToClient, c, NULL) == AE_ERR) return;
1728 if (obj->encoding != REDIS_ENCODING_RAW) {
1729 obj = getDecodedObject(obj);
1730 } else {
1731 incrRefCount(obj);
1732 }
1733 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1734 }
1735
1736 static void addReplySds(redisClient *c, sds s) {
1737 robj *o = createObject(REDIS_STRING,s);
1738 addReply(c,o);
1739 decrRefCount(o);
1740 }
1741
1742 static void addReplyBulkLen(redisClient *c, robj *obj) {
1743 size_t len;
1744
1745 if (obj->encoding == REDIS_ENCODING_RAW) {
1746 len = sdslen(obj->ptr);
1747 } else {
1748 long n = (long)obj->ptr;
1749
1750 len = 1;
1751 if (n < 0) {
1752 len++;
1753 n = -n;
1754 }
1755 while((n = n/10) != 0) {
1756 len++;
1757 }
1758 }
1759 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1760 }
1761
1762 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1763 int cport, cfd;
1764 char cip[128];
1765 redisClient *c;
1766 REDIS_NOTUSED(el);
1767 REDIS_NOTUSED(mask);
1768 REDIS_NOTUSED(privdata);
1769
1770 cfd = anetAccept(server.neterr, fd, cip, &cport);
1771 if (cfd == AE_ERR) {
1772 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1773 return;
1774 }
1775 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1776 if ((c = createClient(cfd)) == NULL) {
1777 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1778 close(cfd); /* May be already closed, just ingore errors */
1779 return;
1780 }
1781 /* If maxclient directive is set and this is one client more... close the
1782 * connection. Note that we create the client instead to check before
1783 * for this condition, since now the socket is already set in nonblocking
1784 * mode and we can send an error for free using the Kernel I/O */
1785 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1786 char *err = "-ERR max number of clients reached\r\n";
1787
1788 /* That's a best effort error message, don't check write errors */
1789 (void) write(c->fd,err,strlen(err));
1790 freeClient(c);
1791 return;
1792 }
1793 server.stat_numconnections++;
1794 }
1795
1796 /* ======================= Redis objects implementation ===================== */
1797
1798 static robj *createObject(int type, void *ptr) {
1799 robj *o;
1800
1801 if (listLength(server.objfreelist)) {
1802 listNode *head = listFirst(server.objfreelist);
1803 o = listNodeValue(head);
1804 listDelNode(server.objfreelist,head);
1805 } else {
1806 o = zmalloc(sizeof(*o));
1807 }
1808 if (!o) oom("createObject");
1809 o->type = type;
1810 o->encoding = REDIS_ENCODING_RAW;
1811 o->ptr = ptr;
1812 o->refcount = 1;
1813 return o;
1814 }
1815
1816 static robj *createStringObject(char *ptr, size_t len) {
1817 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1818 }
1819
1820 static robj *createListObject(void) {
1821 list *l = listCreate();
1822
1823 if (!l) oom("listCreate");
1824 listSetFreeMethod(l,decrRefCount);
1825 return createObject(REDIS_LIST,l);
1826 }
1827
1828 static robj *createSetObject(void) {
1829 dict *d = dictCreate(&setDictType,NULL);
1830 if (!d) oom("dictCreate");
1831 return createObject(REDIS_SET,d);
1832 }
1833
1834 static robj *createZsetObject(void) {
1835 dict *d = dictCreate(&zsetDictType,NULL);
1836 if (!d) oom("dictCreate");
1837 return createObject(REDIS_ZSET,d);
1838 }
1839
1840 static void freeStringObject(robj *o) {
1841 if (o->encoding == REDIS_ENCODING_RAW) {
1842 sdsfree(o->ptr);
1843 }
1844 }
1845
1846 static void freeListObject(robj *o) {
1847 listRelease((list*) o->ptr);
1848 }
1849
1850 static void freeSetObject(robj *o) {
1851 dictRelease((dict*) o->ptr);
1852 }
1853
1854 static void freeHashObject(robj *o) {
1855 dictRelease((dict*) o->ptr);
1856 }
1857
1858 static void incrRefCount(robj *o) {
1859 o->refcount++;
1860 #ifdef DEBUG_REFCOUNT
1861 if (o->type == REDIS_STRING)
1862 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1863 #endif
1864 }
1865
1866 static void decrRefCount(void *obj) {
1867 robj *o = obj;
1868
1869 #ifdef DEBUG_REFCOUNT
1870 if (o->type == REDIS_STRING)
1871 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1872 #endif
1873 if (--(o->refcount) == 0) {
1874 switch(o->type) {
1875 case REDIS_STRING: freeStringObject(o); break;
1876 case REDIS_LIST: freeListObject(o); break;
1877 case REDIS_SET: freeSetObject(o); break;
1878 case REDIS_HASH: freeHashObject(o); break;
1879 default: assert(0 != 0); break;
1880 }
1881 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1882 !listAddNodeHead(server.objfreelist,o))
1883 zfree(o);
1884 }
1885 }
1886
1887 static robj *lookupKey(redisDb *db, robj *key) {
1888 dictEntry *de = dictFind(db->dict,key);
1889 return de ? dictGetEntryVal(de) : NULL;
1890 }
1891
1892 static robj *lookupKeyRead(redisDb *db, robj *key) {
1893 expireIfNeeded(db,key);
1894 return lookupKey(db,key);
1895 }
1896
1897 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1898 deleteIfVolatile(db,key);
1899 return lookupKey(db,key);
1900 }
1901
1902 static int deleteKey(redisDb *db, robj *key) {
1903 int retval;
1904
1905 /* We need to protect key from destruction: after the first dictDelete()
1906 * it may happen that 'key' is no longer valid if we don't increment
1907 * it's count. This may happen when we get the object reference directly
1908 * from the hash table with dictRandomKey() or dict iterators */
1909 incrRefCount(key);
1910 if (dictSize(db->expires)) dictDelete(db->expires,key);
1911 retval = dictDelete(db->dict,key);
1912 decrRefCount(key);
1913
1914 return retval == DICT_OK;
1915 }
1916
1917 /* Try to share an object against the shared objects pool */
1918 static robj *tryObjectSharing(robj *o) {
1919 struct dictEntry *de;
1920 unsigned long c;
1921
1922 if (o == NULL || server.shareobjects == 0) return o;
1923
1924 assert(o->type == REDIS_STRING);
1925 de = dictFind(server.sharingpool,o);
1926 if (de) {
1927 robj *shared = dictGetEntryKey(de);
1928
1929 c = ((unsigned long) dictGetEntryVal(de))+1;
1930 dictGetEntryVal(de) = (void*) c;
1931 incrRefCount(shared);
1932 decrRefCount(o);
1933 return shared;
1934 } else {
1935 /* Here we are using a stream algorihtm: Every time an object is
1936 * shared we increment its count, everytime there is a miss we
1937 * recrement the counter of a random object. If this object reaches
1938 * zero we remove the object and put the current object instead. */
1939 if (dictSize(server.sharingpool) >=
1940 server.sharingpoolsize) {
1941 de = dictGetRandomKey(server.sharingpool);
1942 assert(de != NULL);
1943 c = ((unsigned long) dictGetEntryVal(de))-1;
1944 dictGetEntryVal(de) = (void*) c;
1945 if (c == 0) {
1946 dictDelete(server.sharingpool,de->key);
1947 }
1948 } else {
1949 c = 0; /* If the pool is empty we want to add this object */
1950 }
1951 if (c == 0) {
1952 int retval;
1953
1954 retval = dictAdd(server.sharingpool,o,(void*)1);
1955 assert(retval == DICT_OK);
1956 incrRefCount(o);
1957 }
1958 return o;
1959 }
1960 }
1961
1962 /* Check if the nul-terminated string 's' can be represented by a long
1963 * (that is, is a number that fits into long without any other space or
1964 * character before or after the digits).
1965 *
1966 * If so, the function returns REDIS_OK and *longval is set to the value
1967 * of the number. Otherwise REDIS_ERR is returned */
1968 static int isStringRepresentableAsLong(sds s, long *longval) {
1969 char buf[32], *endptr;
1970 long value;
1971 int slen;
1972
1973 value = strtol(s, &endptr, 10);
1974 if (endptr[0] != '\0') return REDIS_ERR;
1975 slen = snprintf(buf,32,"%ld",value);
1976
1977 /* If the number converted back into a string is not identical
1978 * then it's not possible to encode the string as integer */
1979 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
1980 if (longval) *longval = value;
1981 return REDIS_OK;
1982 }
1983
1984 /* Try to encode a string object in order to save space */
1985 static int tryObjectEncoding(robj *o) {
1986 long value;
1987 sds s = o->ptr;
1988
1989 if (o->encoding != REDIS_ENCODING_RAW)
1990 return REDIS_ERR; /* Already encoded */
1991
1992 /* It's not save to encode shared objects: shared objects can be shared
1993 * everywhere in the "object space" of Redis. Encoded objects can only
1994 * appear as "values" (and not, for instance, as keys) */
1995 if (o->refcount > 1) return REDIS_ERR;
1996
1997 /* Currently we try to encode only strings */
1998 assert(o->type == REDIS_STRING);
1999
2000 /* Check if we can represent this string as a long integer */
2001 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2002
2003 /* Ok, this object can be encoded */
2004 o->encoding = REDIS_ENCODING_INT;
2005 sdsfree(o->ptr);
2006 o->ptr = (void*) value;
2007 return REDIS_OK;
2008 }
2009
2010 /* Get a decoded version of an encoded object (returned as a new object) */
2011 static robj *getDecodedObject(const robj *o) {
2012 robj *dec;
2013
2014 assert(o->encoding != REDIS_ENCODING_RAW);
2015 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2016 char buf[32];
2017
2018 snprintf(buf,32,"%ld",(long)o->ptr);
2019 dec = createStringObject(buf,strlen(buf));
2020 return dec;
2021 } else {
2022 assert(1 != 1);
2023 }
2024 }
2025
2026 static int compareStringObjects(robj *a, robj *b) {
2027 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2028
2029 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2030 return (long)a->ptr - (long)b->ptr;
2031 } else {
2032 int retval;
2033
2034 incrRefCount(a);
2035 incrRefCount(b);
2036 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2037 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2038 retval = sdscmp(a->ptr,b->ptr);
2039 decrRefCount(a);
2040 decrRefCount(b);
2041 return retval;
2042 }
2043 }
2044
2045 static size_t stringObjectLen(robj *o) {
2046 assert(o->type == REDIS_STRING);
2047 if (o->encoding == REDIS_ENCODING_RAW) {
2048 return sdslen(o->ptr);
2049 } else {
2050 char buf[32];
2051
2052 return snprintf(buf,32,"%ld",(long)o->ptr);
2053 }
2054 }
2055
2056 /*============================ DB saving/loading ============================ */
2057
/* Write the one byte 'type' to 'fp'. Returns 0 on success, -1 when the
 * byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2062
/* Write the time 't' to 'fp' as a 4 byte signed integer in the byte order
 * of the running machine. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2068
2069 /* check rdbLoadLen() comments for more info */
2070 static int rdbSaveLen(FILE *fp, uint32_t len) {
2071 unsigned char buf[2];
2072
2073 if (len < (1<<6)) {
2074 /* Save a 6 bit len */
2075 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2076 if (fwrite(buf,1,1,fp) == 0) return -1;
2077 } else if (len < (1<<14)) {
2078 /* Save a 14 bit len */
2079 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2080 buf[1] = len&0xFF;
2081 if (fwrite(buf,2,1,fp) == 0) return -1;
2082 } else {
2083 /* Save a 32 bit len */
2084 buf[0] = (REDIS_RDB_32BITLEN<<6);
2085 if (fwrite(buf,1,1,fp) == 0) return -1;
2086 len = htonl(len);
2087 if (fwrite(&len,4,1,fp) == 0) return -1;
2088 }
2089 return 0;
2090 }
2091
2092 /* String objects in the form "2391" "-100" without any space and with a
2093 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2094 * encoded as integers to save space */
2095 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2096 long long value;
2097 char *endptr, buf[32];
2098
2099 /* Check if it's possible to encode this value as a number */
2100 value = strtoll(s, &endptr, 10);
2101 if (endptr[0] != '\0') return 0;
2102 snprintf(buf,32,"%lld",value);
2103
2104 /* If the number converted back into a string is not identical
2105 * then it's not possible to encode the string as integer */
2106 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2107
2108 /* Finally check if it fits in our ranges */
2109 if (value >= -(1<<7) && value <= (1<<7)-1) {
2110 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2111 enc[1] = value&0xFF;
2112 return 2;
2113 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2114 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2115 enc[1] = value&0xFF;
2116 enc[2] = (value>>8)&0xFF;
2117 return 3;
2118 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2119 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2120 enc[1] = value&0xFF;
2121 enc[2] = (value>>8)&0xFF;
2122 enc[3] = (value>>16)&0xFF;
2123 enc[4] = (value>>24)&0xFF;
2124 return 5;
2125 } else {
2126 return 0;
2127 }
2128 }
2129
2130 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2131 unsigned int comprlen, outlen;
2132 unsigned char byte;
2133 void *out;
2134
2135 /* We require at least four bytes compression for this to be worth it */
2136 outlen = sdslen(obj->ptr)-4;
2137 if (outlen <= 0) return 0;
2138 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2139 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2140 if (comprlen == 0) {
2141 zfree(out);
2142 return 0;
2143 }
2144 /* Data compressed! Let's save it on disk */
2145 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2146 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2147 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2148 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2149 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2150 zfree(out);
2151 return comprlen;
2152
2153 writeerr:
2154 zfree(out);
2155 return -1;
2156 }
2157
2158 /* Save a string objet as [len][data] on disk. If the object is a string
2159 * representation of an integer value we try to safe it in a special form */
2160 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2161 size_t len;
2162 int enclen;
2163
2164 len = sdslen(obj->ptr);
2165
2166 /* Try integer encoding */
2167 if (len <= 11) {
2168 unsigned char buf[5];
2169 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2170 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2171 return 0;
2172 }
2173 }
2174
2175 /* Try LZF compression - under 20 bytes it's unable to compress even
2176 * aaaaaaaaaaaaaaaaaa so skip it */
2177 if (len > 20) {
2178 int retval;
2179
2180 retval = rdbSaveLzfStringObject(fp,obj);
2181 if (retval == -1) return -1;
2182 if (retval > 0) return 0;
2183 /* retval == 0 means data can't be compressed, save the old way */
2184 }
2185
2186 /* Store verbatim */
2187 if (rdbSaveLen(fp,len) == -1) return -1;
2188 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2189 return 0;
2190 }
2191
2192 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2193 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2194 int retval;
2195 robj *dec;
2196
2197 if (obj->encoding != REDIS_ENCODING_RAW) {
2198 dec = getDecodedObject(obj);
2199 retval = rdbSaveStringObjectRaw(fp,dec);
2200 decrRefCount(dec);
2201 return retval;
2202 } else {
2203 return rdbSaveStringObjectRaw(fp,obj);
2204 }
2205 }
2206
2207 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2208 static int rdbSave(char *filename) {
2209 dictIterator *di = NULL;
2210 dictEntry *de;
2211 FILE *fp;
2212 char tmpfile[256];
2213 int j;
2214 time_t now = time(NULL);
2215
2216 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2217 fp = fopen(tmpfile,"w");
2218 if (!fp) {
2219 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2220 return REDIS_ERR;
2221 }
2222 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2223 for (j = 0; j < server.dbnum; j++) {
2224 redisDb *db = server.db+j;
2225 dict *d = db->dict;
2226 if (dictSize(d) == 0) continue;
2227 di = dictGetIterator(d);
2228 if (!di) {
2229 fclose(fp);
2230 return REDIS_ERR;
2231 }
2232
2233 /* Write the SELECT DB opcode */
2234 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2235 if (rdbSaveLen(fp,j) == -1) goto werr;
2236
2237 /* Iterate this DB writing every entry */
2238 while((de = dictNext(di)) != NULL) {
2239 robj *key = dictGetEntryKey(de);
2240 robj *o = dictGetEntryVal(de);
2241 time_t expiretime = getExpire(db,key);
2242
2243 /* Save the expire time */
2244 if (expiretime != -1) {
2245 /* If this key is already expired skip it */
2246 if (expiretime < now) continue;
2247 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2248 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2249 }
2250 /* Save the key and associated value */
2251 if (rdbSaveType(fp,o->type) == -1) goto werr;
2252 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2253 if (o->type == REDIS_STRING) {
2254 /* Save a string value */
2255 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2256 } else if (o->type == REDIS_LIST) {
2257 /* Save a list value */
2258 list *list = o->ptr;
2259 listNode *ln;
2260
2261 listRewind(list);
2262 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2263 while((ln = listYield(list))) {
2264 robj *eleobj = listNodeValue(ln);
2265
2266 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2267 }
2268 } else if (o->type == REDIS_SET) {
2269 /* Save a set value */
2270 dict *set = o->ptr;
2271 dictIterator *di = dictGetIterator(set);
2272 dictEntry *de;
2273
2274 if (!set) oom("dictGetIteraotr");
2275 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2276 while((de = dictNext(di)) != NULL) {
2277 robj *eleobj = dictGetEntryKey(de);
2278
2279 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2280 }
2281 dictReleaseIterator(di);
2282 } else {
2283 assert(0 != 0);
2284 }
2285 }
2286 dictReleaseIterator(di);
2287 }
2288 /* EOF opcode */
2289 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2290
2291 /* Make sure data will not remain on the OS's output buffers */
2292 fflush(fp);
2293 fsync(fileno(fp));
2294 fclose(fp);
2295
2296 /* Use RENAME to make sure the DB file is changed atomically only
2297 * if the generate DB file is ok. */
2298 if (rename(tmpfile,filename) == -1) {
2299 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2300 unlink(tmpfile);
2301 return REDIS_ERR;
2302 }
2303 redisLog(REDIS_NOTICE,"DB saved on disk");
2304 server.dirty = 0;
2305 server.lastsave = time(NULL);
2306 return REDIS_OK;
2307
2308 werr:
2309 fclose(fp);
2310 unlink(tmpfile);
2311 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2312 if (di) dictReleaseIterator(di);
2313 return REDIS_ERR;
2314 }
2315
2316 static int rdbSaveBackground(char *filename) {
2317 pid_t childpid;
2318
2319 if (server.bgsaveinprogress) return REDIS_ERR;
2320 if ((childpid = fork()) == 0) {
2321 /* Child */
2322 close(server.fd);
2323 if (rdbSave(filename) == REDIS_OK) {
2324 exit(0);
2325 } else {
2326 exit(1);
2327 }
2328 } else {
2329 /* Parent */
2330 if (childpid == -1) {
2331 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2332 strerror(errno));
2333 return REDIS_ERR;
2334 }
2335 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2336 server.bgsaveinprogress = 1;
2337 server.bgsavechildpid = childpid;
2338 return REDIS_OK;
2339 }
2340 return REDIS_OK; /* unreached */
2341 }
2342
/* Remove the temporary dump file named after the process id 'childpid'
 * (the name is built the same way rdbSave() builds it). */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,256,"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2349
/* Read a one byte type from 'fp'. Returns the byte value (0-255), or -1
 * when nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
2355
/* Read a time stored by rdbSaveTime(): a 4 byte signed integer in the
 * byte order of the running machine. Returns -1 when the four bytes
 * could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) != 0) return (time_t) stamp;
    return -1;
}
2361
2362 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2363 * of this file for a description of how this are stored on disk.
2364 *
2365 * isencoded is set to 1 if the readed length is not actually a length but
2366 * an "encoding type", check the above comments for more info */
2367 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2368 unsigned char buf[2];
2369 uint32_t len;
2370
2371 if (isencoded) *isencoded = 0;
2372 if (rdbver == 0) {
2373 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2374 return ntohl(len);
2375 } else {
2376 int type;
2377
2378 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2379 type = (buf[0]&0xC0)>>6;
2380 if (type == REDIS_RDB_6BITLEN) {
2381 /* Read a 6 bit len */
2382 return buf[0]&0x3F;
2383 } else if (type == REDIS_RDB_ENCVAL) {
2384 /* Read a 6 bit len encoding type */
2385 if (isencoded) *isencoded = 1;
2386 return buf[0]&0x3F;
2387 } else if (type == REDIS_RDB_14BITLEN) {
2388 /* Read a 14 bit len */
2389 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2390 return ((buf[0]&0x3F)<<8)|buf[1];
2391 } else {
2392 /* Read a 32 bit len */
2393 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2394 return ntohl(len);
2395 }
2396 }
2397 }
2398
2399 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2400 unsigned char enc[4];
2401 long long val;
2402
2403 if (enctype == REDIS_RDB_ENC_INT8) {
2404 if (fread(enc,1,1,fp) == 0) return NULL;
2405 val = (signed char)enc[0];
2406 } else if (enctype == REDIS_RDB_ENC_INT16) {
2407 uint16_t v;
2408 if (fread(enc,2,1,fp) == 0) return NULL;
2409 v = enc[0]|(enc[1]<<8);
2410 val = (int16_t)v;
2411 } else if (enctype == REDIS_RDB_ENC_INT32) {
2412 uint32_t v;
2413 if (fread(enc,4,1,fp) == 0) return NULL;
2414 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2415 val = (int32_t)v;
2416 } else {
2417 val = 0; /* anti-warning */
2418 assert(0!=0);
2419 }
2420 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2421 }
2422
2423 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2424 unsigned int len, clen;
2425 unsigned char *c = NULL;
2426 sds val = NULL;
2427
2428 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2429 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2430 if ((c = zmalloc(clen)) == NULL) goto err;
2431 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2432 if (fread(c,clen,1,fp) == 0) goto err;
2433 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2434 zfree(c);
2435 return createObject(REDIS_STRING,val);
2436 err:
2437 zfree(c);
2438 sdsfree(val);
2439 return NULL;
2440 }
2441
2442 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2443 int isencoded;
2444 uint32_t len;
2445 sds val;
2446
2447 len = rdbLoadLen(fp,rdbver,&isencoded);
2448 if (isencoded) {
2449 switch(len) {
2450 case REDIS_RDB_ENC_INT8:
2451 case REDIS_RDB_ENC_INT16:
2452 case REDIS_RDB_ENC_INT32:
2453 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2454 case REDIS_RDB_ENC_LZF:
2455 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2456 default:
2457 assert(0!=0);
2458 }
2459 }
2460
2461 if (len == REDIS_RDB_LENERR) return NULL;
2462 val = sdsnewlen(NULL,len);
2463 if (len && fread(val,len,1,fp) == 0) {
2464 sdsfree(val);
2465 return NULL;
2466 }
2467 return tryObjectSharing(createObject(REDIS_STRING,val));
2468 }
2469
2470 static int rdbLoad(char *filename) {
2471 FILE *fp;
2472 robj *keyobj = NULL;
2473 uint32_t dbid;
2474 int type, retval, rdbver;
2475 dict *d = server.db[0].dict;
2476 redisDb *db = server.db+0;
2477 char buf[1024];
2478 time_t expiretime = -1, now = time(NULL);
2479
2480 fp = fopen(filename,"r");
2481 if (!fp) return REDIS_ERR;
2482 if (fread(buf,9,1,fp) == 0) goto eoferr;
2483 buf[9] = '\0';
2484 if (memcmp(buf,"REDIS",5) != 0) {
2485 fclose(fp);
2486 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2487 return REDIS_ERR;
2488 }
2489 rdbver = atoi(buf+5);
2490 if (rdbver > 1) {
2491 fclose(fp);
2492 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2493 return REDIS_ERR;
2494 }
2495 while(1) {
2496 robj *o;
2497
2498 /* Read type. */
2499 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2500 if (type == REDIS_EXPIRETIME) {
2501 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2502 /* We read the time so we need to read the object type again */
2503 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2504 }
2505 if (type == REDIS_EOF) break;
2506 /* Handle SELECT DB opcode as a special case */
2507 if (type == REDIS_SELECTDB) {
2508 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2509 goto eoferr;
2510 if (dbid >= (unsigned)server.dbnum) {
2511 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2512 exit(1);
2513 }
2514 db = server.db+dbid;
2515 d = db->dict;
2516 continue;
2517 }
2518 /* Read key */
2519 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2520
2521 if (type == REDIS_STRING) {
2522 /* Read string value */
2523 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2524 tryObjectEncoding(o);
2525 } else if (type == REDIS_LIST || type == REDIS_SET) {
2526 /* Read list/set value */
2527 uint32_t listlen;
2528
2529 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2530 goto eoferr;
2531 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2532 /* Load every single element of the list/set */
2533 while(listlen--) {
2534 robj *ele;
2535
2536 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2537 tryObjectEncoding(ele);
2538 if (type == REDIS_LIST) {
2539 if (!listAddNodeTail((list*)o->ptr,ele))
2540 oom("listAddNodeTail");
2541 } else {
2542 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2543 oom("dictAdd");
2544 }
2545 }
2546 } else {
2547 assert(0 != 0);
2548 }
2549 /* Add the new object in the hash table */
2550 retval = dictAdd(d,keyobj,o);
2551 if (retval == DICT_ERR) {
2552 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2553 exit(1);
2554 }
2555 /* Set the expire time if needed */
2556 if (expiretime != -1) {
2557 setExpire(db,keyobj,expiretime);
2558 /* Delete this key if already expired */
2559 if (expiretime < now) deleteKey(db,keyobj);
2560 expiretime = -1;
2561 }
2562 keyobj = o = NULL;
2563 }
2564 fclose(fp);
2565 return REDIS_OK;
2566
2567 eoferr: /* unexpected end of file is handled here with a fatal exit */
2568 if (keyobj) decrRefCount(keyobj);
2569 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2570 exit(1);
2571 return REDIS_ERR; /* Just to avoid warning */
2572 }
2573
2574 /*================================== Commands =============================== */
2575
2576 static void authCommand(redisClient *c) {
2577 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2578 c->authenticated = 1;
2579 addReply(c,shared.ok);
2580 } else {
2581 c->authenticated = 0;
2582 addReply(c,shared.err);
2583 }
2584 }
2585
2586 static void pingCommand(redisClient *c) {
2587 addReply(c,shared.pong);
2588 }
2589
2590 static void echoCommand(redisClient *c) {
2591 addReplyBulkLen(c,c->argv[1]);
2592 addReply(c,c->argv[1]);
2593 addReply(c,shared.crlf);
2594 }
2595
2596 /*=================================== Strings =============================== */
2597
2598 static void setGenericCommand(redisClient *c, int nx) {
2599 int retval;
2600
2601 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2602 if (retval == DICT_ERR) {
2603 if (!nx) {
2604 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2605 incrRefCount(c->argv[2]);
2606 } else {
2607 addReply(c,shared.czero);
2608 return;
2609 }
2610 } else {
2611 incrRefCount(c->argv[1]);
2612 incrRefCount(c->argv[2]);
2613 }
2614 server.dirty++;
2615 removeExpire(c->db,c->argv[1]);
2616 addReply(c, nx ? shared.cone : shared.ok);
2617 }
2618
2619 static void setCommand(redisClient *c) {
2620 setGenericCommand(c,0);
2621 }
2622
2623 static void setnxCommand(redisClient *c) {
2624 setGenericCommand(c,1);
2625 }
2626
2627 static void getCommand(redisClient *c) {
2628 robj *o = lookupKeyRead(c->db,c->argv[1]);
2629
2630 if (o == NULL) {
2631 addReply(c,shared.nullbulk);
2632 } else {
2633 if (o->type != REDIS_STRING) {
2634 addReply(c,shared.wrongtypeerr);
2635 } else {
2636 addReplyBulkLen(c,o);
2637 addReply(c,o);
2638 addReply(c,shared.crlf);
2639 }
2640 }
2641 }
2642
2643 static void getsetCommand(redisClient *c) {
2644 getCommand(c);
2645 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2646 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2647 } else {
2648 incrRefCount(c->argv[1]);
2649 }
2650 incrRefCount(c->argv[2]);
2651 server.dirty++;
2652 removeExpire(c->db,c->argv[1]);
2653 }
2654
2655 static void mgetCommand(redisClient *c) {
2656 int j;
2657
2658 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2659 for (j = 1; j < c->argc; j++) {
2660 robj *o = lookupKeyRead(c->db,c->argv[j]);
2661 if (o == NULL) {
2662 addReply(c,shared.nullbulk);
2663 } else {
2664 if (o->type != REDIS_STRING) {
2665 addReply(c,shared.nullbulk);
2666 } else {
2667 addReplyBulkLen(c,o);
2668 addReply(c,o);
2669 addReply(c,shared.crlf);
2670 }
2671 }
2672 }
2673 }
2674
2675 static void incrDecrCommand(redisClient *c, long long incr) {
2676 long long value;
2677 int retval;
2678 robj *o;
2679
2680 o = lookupKeyWrite(c->db,c->argv[1]);
2681 if (o == NULL) {
2682 value = 0;
2683 } else {
2684 if (o->type != REDIS_STRING) {
2685 value = 0;
2686 } else {
2687 char *eptr;
2688
2689 if (o->encoding == REDIS_ENCODING_RAW)
2690 value = strtoll(o->ptr, &eptr, 10);
2691 else if (o->encoding == REDIS_ENCODING_INT)
2692 value = (long)o->ptr;
2693 else
2694 assert(1 != 1);
2695 }
2696 }
2697
2698 value += incr;
2699 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2700 tryObjectEncoding(o);
2701 retval = dictAdd(c->db->dict,c->argv[1],o);
2702 if (retval == DICT_ERR) {
2703 dictReplace(c->db->dict,c->argv[1],o);
2704 removeExpire(c->db,c->argv[1]);
2705 } else {
2706 incrRefCount(c->argv[1]);
2707 }
2708 server.dirty++;
2709 addReply(c,shared.colon);
2710 addReply(c,o);
2711 addReply(c,shared.crlf);
2712 }
2713
2714 static void incrCommand(redisClient *c) {
2715 incrDecrCommand(c,1);
2716 }
2717
2718 static void decrCommand(redisClient *c) {
2719 incrDecrCommand(c,-1);
2720 }
2721
2722 static void incrbyCommand(redisClient *c) {
2723 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2724 incrDecrCommand(c,incr);
2725 }
2726
2727 static void decrbyCommand(redisClient *c) {
2728 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2729 incrDecrCommand(c,-incr);
2730 }
2731
2732 /* ========================= Type agnostic commands ========================= */
2733
2734 static void delCommand(redisClient *c) {
2735 int deleted = 0, j;
2736
2737 for (j = 1; j < c->argc; j++) {
2738 if (deleteKey(c->db,c->argv[j])) {
2739 server.dirty++;
2740 deleted++;
2741 }
2742 }
2743 switch(deleted) {
2744 case 0:
2745 addReply(c,shared.czero);
2746 break;
2747 case 1:
2748 addReply(c,shared.cone);
2749 break;
2750 default:
2751 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2752 break;
2753 }
2754 }
2755
2756 static void existsCommand(redisClient *c) {
2757 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2758 }
2759
2760 static void selectCommand(redisClient *c) {
2761 int id = atoi(c->argv[1]->ptr);
2762
2763 if (selectDb(c,id) == REDIS_ERR) {
2764 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2765 } else {
2766 addReply(c,shared.ok);
2767 }
2768 }
2769
2770 static void randomkeyCommand(redisClient *c) {
2771 dictEntry *de;
2772
2773 while(1) {
2774 de = dictGetRandomKey(c->db->dict);
2775 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2776 }
2777 if (de == NULL) {
2778 addReply(c,shared.plus);
2779 addReply(c,shared.crlf);
2780 } else {
2781 addReply(c,shared.plus);
2782 addReply(c,dictGetEntryKey(de));
2783 addReply(c,shared.crlf);
2784 }
2785 }
2786
2787 static void keysCommand(redisClient *c) {
2788 dictIterator *di;
2789 dictEntry *de;
2790 sds pattern = c->argv[1]->ptr;
2791 int plen = sdslen(pattern);
2792 int numkeys = 0, keyslen = 0;
2793 robj *lenobj = createObject(REDIS_STRING,NULL);
2794
2795 di = dictGetIterator(c->db->dict);
2796 if (!di) oom("dictGetIterator");
2797 addReply(c,lenobj);
2798 decrRefCount(lenobj);
2799 while((de = dictNext(di)) != NULL) {
2800 robj *keyobj = dictGetEntryKey(de);
2801
2802 sds key = keyobj->ptr;
2803 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2804 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2805 if (expireIfNeeded(c->db,keyobj) == 0) {
2806 if (numkeys != 0)
2807 addReply(c,shared.space);
2808 addReply(c,keyobj);
2809 numkeys++;
2810 keyslen += sdslen(key);
2811 }
2812 }
2813 }
2814 dictReleaseIterator(di);
2815 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2816 addReply(c,shared.crlf);
2817 }
2818
2819 static void dbsizeCommand(redisClient *c) {
2820 addReplySds(c,
2821 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2822 }
2823
2824 static void lastsaveCommand(redisClient *c) {
2825 addReplySds(c,
2826 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2827 }
2828
2829 static void typeCommand(redisClient *c) {
2830 robj *o;
2831 char *type;
2832
2833 o = lookupKeyRead(c->db,c->argv[1]);
2834 if (o == NULL) {
2835 type = "+none";
2836 } else {
2837 switch(o->type) {
2838 case REDIS_STRING: type = "+string"; break;
2839 case REDIS_LIST: type = "+list"; break;
2840 case REDIS_SET: type = "+set"; break;
2841 default: type = "unknown"; break;
2842 }
2843 }
2844 addReplySds(c,sdsnew(type));
2845 addReply(c,shared.crlf);
2846 }
2847
2848 static void saveCommand(redisClient *c) {
2849 if (server.bgsaveinprogress) {
2850 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2851 return;
2852 }
2853 if (rdbSave(server.dbfilename) == REDIS_OK) {
2854 addReply(c,shared.ok);
2855 } else {
2856 addReply(c,shared.err);
2857 }
2858 }
2859
2860 static void bgsaveCommand(redisClient *c) {
2861 if (server.bgsaveinprogress) {
2862 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2863 return;
2864 }
2865 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2866 addReply(c,shared.ok);
2867 } else {
2868 addReply(c,shared.err);
2869 }
2870 }
2871
2872 static void shutdownCommand(redisClient *c) {
2873 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2874 /* Kill the saving child if there is a background saving in progress.
2875 We want to avoid race conditions, for instance our saving child may
2876 overwrite the synchronous saving did by SHUTDOWN. */
2877 if (server.bgsaveinprogress) {
2878 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2879 kill(server.bgsavechildpid,SIGKILL);
2880 rdbRemoveTempFile(server.bgsavechildpid);
2881 }
2882 /* SYNC SAVE */
2883 if (rdbSave(server.dbfilename) == REDIS_OK) {
2884 if (server.daemonize)
2885 unlink(server.pidfile);
2886 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2887 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2888 exit(1);
2889 } else {
2890 /* Ooops.. error saving! The best we can do is to continue operating.
2891 * Note that if there was a background saving process, in the next
2892 * cron() Redis will be notified that the background saving aborted,
2893 * handling special stuff like slaves pending for synchronization... */
2894 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2895 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2896 }
2897 }
2898
2899 static void renameGenericCommand(redisClient *c, int nx) {
2900 robj *o;
2901
2902 /* To use the same key as src and dst is probably an error */
2903 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2904 addReply(c,shared.sameobjecterr);
2905 return;
2906 }
2907
2908 o = lookupKeyWrite(c->db,c->argv[1]);
2909 if (o == NULL) {
2910 addReply(c,shared.nokeyerr);
2911 return;
2912 }
2913 incrRefCount(o);
2914 deleteIfVolatile(c->db,c->argv[2]);
2915 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2916 if (nx) {
2917 decrRefCount(o);
2918 addReply(c,shared.czero);
2919 return;
2920 }
2921 dictReplace(c->db->dict,c->argv[2],o);
2922 } else {
2923 incrRefCount(c->argv[2]);
2924 }
2925 deleteKey(c->db,c->argv[1]);
2926 server.dirty++;
2927 addReply(c,nx ? shared.cone : shared.ok);
2928 }
2929
2930 static void renameCommand(redisClient *c) {
2931 renameGenericCommand(c,0);
2932 }
2933
2934 static void renamenxCommand(redisClient *c) {
2935 renameGenericCommand(c,1);
2936 }
2937
2938 static void moveCommand(redisClient *c) {
2939 robj *o;
2940 redisDb *src, *dst;
2941 int srcid;
2942
2943 /* Obtain source and target DB pointers */
2944 src = c->db;
2945 srcid = c->db->id;
2946 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2947 addReply(c,shared.outofrangeerr);
2948 return;
2949 }
2950 dst = c->db;
2951 selectDb(c,srcid); /* Back to the source DB */
2952
2953 /* If the user is moving using as target the same
2954 * DB as the source DB it is probably an error. */
2955 if (src == dst) {
2956 addReply(c,shared.sameobjecterr);
2957 return;
2958 }
2959
2960 /* Check if the element exists and get a reference */
2961 o = lookupKeyWrite(c->db,c->argv[1]);
2962 if (!o) {
2963 addReply(c,shared.czero);
2964 return;
2965 }
2966
2967 /* Try to add the element to the target DB */
2968 deleteIfVolatile(dst,c->argv[1]);
2969 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2970 addReply(c,shared.czero);
2971 return;
2972 }
2973 incrRefCount(c->argv[1]);
2974 incrRefCount(o);
2975
2976 /* OK! key moved, free the entry in the source DB */
2977 deleteKey(src,c->argv[1]);
2978 server.dirty++;
2979 addReply(c,shared.cone);
2980 }
2981
2982 /* =================================== Lists ================================ */
2983 static void pushGenericCommand(redisClient *c, int where) {
2984 robj *lobj;
2985 list *list;
2986
2987 lobj = lookupKeyWrite(c->db,c->argv[1]);
2988 if (lobj == NULL) {
2989 lobj = createListObject();
2990 list = lobj->ptr;
2991 if (where == REDIS_HEAD) {
2992 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2993 } else {
2994 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2995 }
2996 dictAdd(c->db->dict,c->argv[1],lobj);
2997 incrRefCount(c->argv[1]);
2998 incrRefCount(c->argv[2]);
2999 } else {
3000 if (lobj->type != REDIS_LIST) {
3001 addReply(c,shared.wrongtypeerr);
3002 return;
3003 }
3004 list = lobj->ptr;
3005 if (where == REDIS_HEAD) {
3006 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
3007 } else {
3008 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
3009 }
3010 incrRefCount(c->argv[2]);
3011 }
3012 server.dirty++;
3013 addReply(c,shared.ok);
3014 }
3015
3016 static void lpushCommand(redisClient *c) {
3017 pushGenericCommand(c,REDIS_HEAD);
3018 }
3019
3020 static void rpushCommand(redisClient *c) {
3021 pushGenericCommand(c,REDIS_TAIL);
3022 }
3023
3024 static void llenCommand(redisClient *c) {
3025 robj *o;
3026 list *l;
3027
3028 o = lookupKeyRead(c->db,c->argv[1]);
3029 if (o == NULL) {
3030 addReply(c,shared.czero);
3031 return;
3032 } else {
3033 if (o->type != REDIS_LIST) {
3034 addReply(c,shared.wrongtypeerr);
3035 } else {
3036 l = o->ptr;
3037 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3038 }
3039 }
3040 }
3041
3042 static void lindexCommand(redisClient *c) {
3043 robj *o;
3044 int index = atoi(c->argv[2]->ptr);
3045
3046 o = lookupKeyRead(c->db,c->argv[1]);
3047 if (o == NULL) {
3048 addReply(c,shared.nullbulk);
3049 } else {
3050 if (o->type != REDIS_LIST) {
3051 addReply(c,shared.wrongtypeerr);
3052 } else {
3053 list *list = o->ptr;
3054 listNode *ln;
3055
3056 ln = listIndex(list, index);
3057 if (ln == NULL) {
3058 addReply(c,shared.nullbulk);
3059 } else {
3060 robj *ele = listNodeValue(ln);
3061 addReplyBulkLen(c,ele);
3062 addReply(c,ele);
3063 addReply(c,shared.crlf);
3064 }
3065 }
3066 }
3067 }
3068
3069 static void lsetCommand(redisClient *c) {
3070 robj *o;
3071 int index = atoi(c->argv[2]->ptr);
3072
3073 o = lookupKeyWrite(c->db,c->argv[1]);
3074 if (o == NULL) {
3075 addReply(c,shared.nokeyerr);
3076 } else {
3077 if (o->type != REDIS_LIST) {
3078 addReply(c,shared.wrongtypeerr);
3079 } else {
3080 list *list = o->ptr;
3081 listNode *ln;
3082
3083 ln = listIndex(list, index);
3084 if (ln == NULL) {
3085 addReply(c,shared.outofrangeerr);
3086 } else {
3087 robj *ele = listNodeValue(ln);
3088
3089 decrRefCount(ele);
3090 listNodeValue(ln) = c->argv[3];
3091 incrRefCount(c->argv[3]);
3092 addReply(c,shared.ok);
3093 server.dirty++;
3094 }
3095 }
3096 }
3097 }
3098
3099 static void popGenericCommand(redisClient *c, int where) {
3100 robj *o;
3101
3102 o = lookupKeyWrite(c->db,c->argv[1]);
3103 if (o == NULL) {
3104 addReply(c,shared.nullbulk);
3105 } else {
3106 if (o->type != REDIS_LIST) {
3107 addReply(c,shared.wrongtypeerr);
3108 } else {
3109 list *list = o->ptr;
3110 listNode *ln;
3111
3112 if (where == REDIS_HEAD)
3113 ln = listFirst(list);
3114 else
3115 ln = listLast(list);
3116
3117 if (ln == NULL) {
3118 addReply(c,shared.nullbulk);
3119 } else {
3120 robj *ele = listNodeValue(ln);
3121 addReplyBulkLen(c,ele);
3122 addReply(c,ele);
3123 addReply(c,shared.crlf);
3124 listDelNode(list,ln);
3125 server.dirty++;
3126 }
3127 }
3128 }
3129 }
3130
3131 static void lpopCommand(redisClient *c) {
3132 popGenericCommand(c,REDIS_HEAD);
3133 }
3134
3135 static void rpopCommand(redisClient *c) {
3136 popGenericCommand(c,REDIS_TAIL);
3137 }
3138
3139 static void lrangeCommand(redisClient *c) {
3140 robj *o;
3141 int start = atoi(c->argv[2]->ptr);
3142 int end = atoi(c->argv[3]->ptr);
3143
3144 o = lookupKeyRead(c->db,c->argv[1]);
3145 if (o == NULL) {
3146 addReply(c,shared.nullmultibulk);
3147 } else {
3148 if (o->type != REDIS_LIST) {
3149 addReply(c,shared.wrongtypeerr);
3150 } else {
3151 list *list = o->ptr;
3152 listNode *ln;
3153 int llen = listLength(list);
3154 int rangelen, j;
3155 robj *ele;
3156
3157 /* convert negative indexes */
3158 if (start < 0) start = llen+start;
3159 if (end < 0) end = llen+end;
3160 if (start < 0) start = 0;
3161 if (end < 0) end = 0;
3162
3163 /* indexes sanity checks */
3164 if (start > end || start >= llen) {
3165 /* Out of range start or start > end result in empty list */
3166 addReply(c,shared.emptymultibulk);
3167 return;
3168 }
3169 if (end >= llen) end = llen-1;
3170 rangelen = (end-start)+1;
3171
3172 /* Return the result in form of a multi-bulk reply */
3173 ln = listIndex(list, start);
3174 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3175 for (j = 0; j < rangelen; j++) {
3176 ele = listNodeValue(ln);
3177 addReplyBulkLen(c,ele);
3178 addReply(c,ele);
3179 addReply(c,shared.crlf);
3180 ln = ln->next;
3181 }
3182 }
3183 }
3184 }
3185
3186 static void ltrimCommand(redisClient *c) {
3187 robj *o;
3188 int start = atoi(c->argv[2]->ptr);
3189 int end = atoi(c->argv[3]->ptr);
3190
3191 o = lookupKeyWrite(c->db,c->argv[1]);
3192 if (o == NULL) {
3193 addReply(c,shared.nokeyerr);
3194 } else {
3195 if (o->type != REDIS_LIST) {
3196 addReply(c,shared.wrongtypeerr);
3197 } else {
3198 list *list = o->ptr;
3199 listNode *ln;
3200 int llen = listLength(list);
3201 int j, ltrim, rtrim;
3202
3203 /* convert negative indexes */
3204 if (start < 0) start = llen+start;
3205 if (end < 0) end = llen+end;
3206 if (start < 0) start = 0;
3207 if (end < 0) end = 0;
3208
3209 /* indexes sanity checks */
3210 if (start > end || start >= llen) {
3211 /* Out of range start or start > end result in empty list */
3212 ltrim = llen;
3213 rtrim = 0;
3214 } else {
3215 if (end >= llen) end = llen-1;
3216 ltrim = start;
3217 rtrim = llen-end-1;
3218 }
3219
3220 /* Remove list elements to perform the trim */
3221 for (j = 0; j < ltrim; j++) {
3222 ln = listFirst(list);
3223 listDelNode(list,ln);
3224 }
3225 for (j = 0; j < rtrim; j++) {
3226 ln = listLast(list);
3227 listDelNode(list,ln);
3228 }
3229 server.dirty++;
3230 addReply(c,shared.ok);
3231 }
3232 }
3233 }
3234
3235 static void lremCommand(redisClient *c) {
3236 robj *o;
3237
3238 o = lookupKeyWrite(c->db,c->argv[1]);
3239 if (o == NULL) {
3240 addReply(c,shared.czero);
3241 } else {
3242 if (o->type != REDIS_LIST) {
3243 addReply(c,shared.wrongtypeerr);
3244 } else {
3245 list *list = o->ptr;
3246 listNode *ln, *next;
3247 int toremove = atoi(c->argv[2]->ptr);
3248 int removed = 0;
3249 int fromtail = 0;
3250
3251 if (toremove < 0) {
3252 toremove = -toremove;
3253 fromtail = 1;
3254 }
3255 ln = fromtail ? list->tail : list->head;
3256 while (ln) {
3257 robj *ele = listNodeValue(ln);
3258
3259 next = fromtail ? ln->prev : ln->next;
3260 if (compareStringObjects(ele,c->argv[3]) == 0) {
3261 listDelNode(list,ln);
3262 server.dirty++;
3263 removed++;
3264 if (toremove && removed == toremove) break;
3265 }
3266 ln = next;
3267 }
3268 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3269 }
3270 }
3271 }
3272
3273 /* ==================================== Sets ================================ */
3274
3275 static void saddCommand(redisClient *c) {
3276 robj *set;
3277
3278 set = lookupKeyWrite(c->db,c->argv[1]);
3279 if (set == NULL) {
3280 set = createSetObject();
3281 dictAdd(c->db->dict,c->argv[1],set);
3282 incrRefCount(c->argv[1]);
3283 } else {
3284 if (set->type != REDIS_SET) {
3285 addReply(c,shared.wrongtypeerr);
3286 return;
3287 }
3288 }
3289 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3290 incrRefCount(c->argv[2]);
3291 server.dirty++;
3292 addReply(c,shared.cone);
3293 } else {
3294 addReply(c,shared.czero);
3295 }
3296 }
3297
3298 static void sremCommand(redisClient *c) {
3299 robj *set;
3300
3301 set = lookupKeyWrite(c->db,c->argv[1]);
3302 if (set == NULL) {
3303 addReply(c,shared.czero);
3304 } else {
3305 if (set->type != REDIS_SET) {
3306 addReply(c,shared.wrongtypeerr);
3307 return;
3308 }
3309 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3310 server.dirty++;
3311 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3312 addReply(c,shared.cone);
3313 } else {
3314 addReply(c,shared.czero);
3315 }
3316 }
3317 }
3318
3319 static void smoveCommand(redisClient *c) {
3320 robj *srcset, *dstset;
3321
3322 srcset = lookupKeyWrite(c->db,c->argv[1]);
3323 dstset = lookupKeyWrite(c->db,c->argv[2]);
3324
3325 /* If the source key does not exist return 0, if it's of the wrong type
3326 * raise an error */
3327 if (srcset == NULL || srcset->type != REDIS_SET) {
3328 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3329 return;
3330 }
3331 /* Error if the destination key is not a set as well */
3332 if (dstset && dstset->type != REDIS_SET) {
3333 addReply(c,shared.wrongtypeerr);
3334 return;
3335 }
3336 /* Remove the element from the source set */
3337 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3338 /* Key not found in the src set! return zero */
3339 addReply(c,shared.czero);
3340 return;
3341 }
3342 server.dirty++;
3343 /* Add the element to the destination set */
3344 if (!dstset) {
3345 dstset = createSetObject();
3346 dictAdd(c->db->dict,c->argv[2],dstset);
3347 incrRefCount(c->argv[2]);
3348 }
3349 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3350 incrRefCount(c->argv[3]);
3351 addReply(c,shared.cone);
3352 }
3353
3354 static void sismemberCommand(redisClient *c) {
3355 robj *set;
3356
3357 set = lookupKeyRead(c->db,c->argv[1]);
3358 if (set == NULL) {
3359 addReply(c,shared.czero);
3360 } else {
3361 if (set->type != REDIS_SET) {
3362 addReply(c,shared.wrongtypeerr);
3363 return;
3364 }
3365 if (dictFind(set->ptr,c->argv[2]))
3366 addReply(c,shared.cone);
3367 else
3368 addReply(c,shared.czero);
3369 }
3370 }
3371
3372 static void scardCommand(redisClient *c) {
3373 robj *o;
3374 dict *s;
3375
3376 o = lookupKeyRead(c->db,c->argv[1]);
3377 if (o == NULL) {
3378 addReply(c,shared.czero);
3379 return;
3380 } else {
3381 if (o->type != REDIS_SET) {
3382 addReply(c,shared.wrongtypeerr);
3383 } else {
3384 s = o->ptr;
3385 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3386 dictSize(s)));
3387 }
3388 }
3389 }
3390
3391 static void spopCommand(redisClient *c) {
3392 robj *set;
3393 dictEntry *de;
3394
3395 set = lookupKeyWrite(c->db,c->argv[1]);
3396 if (set == NULL) {
3397 addReply(c,shared.nullbulk);
3398 } else {
3399 if (set->type != REDIS_SET) {
3400 addReply(c,shared.wrongtypeerr);
3401 return;
3402 }
3403 de = dictGetRandomKey(set->ptr);
3404 if (de == NULL) {
3405 addReply(c,shared.nullbulk);
3406 } else {
3407 robj *ele = dictGetEntryKey(de);
3408
3409 addReplyBulkLen(c,ele);
3410 addReply(c,ele);
3411 addReply(c,shared.crlf);
3412 dictDelete(set->ptr,ele);
3413 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3414 server.dirty++;
3415 }
3416 }
3417 }
3418
3419 static void srandmemberCommand(redisClient *c) {
3420 robj *set;
3421 dictEntry *de;
3422
3423 set = lookupKeyRead(c->db,c->argv[1]);
3424 if (set == NULL) {
3425 addReply(c,shared.nullbulk);
3426 } else {
3427 if (set->type != REDIS_SET) {
3428 addReply(c,shared.wrongtypeerr);
3429 return;
3430 }
3431 de = dictGetRandomKey(set->ptr);
3432 if (de == NULL) {
3433 addReply(c,shared.nullbulk);
3434 } else {
3435 robj *ele = dictGetEntryKey(de);
3436
3437 addReplyBulkLen(c,ele);
3438 addReply(c,ele);
3439 addReply(c,shared.crlf);
3440 }
3441 }
3442 }
3443
3444 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3445 dict **d1 = (void*) s1, **d2 = (void*) s2;
3446
3447 return dictSize(*d1)-dictSize(*d2);
3448 }
3449
3450 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3451 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3452 dictIterator *di;
3453 dictEntry *de;
3454 robj *lenobj = NULL, *dstset = NULL;
3455 int j, cardinality = 0;
3456
3457 if (!dv) oom("sinterGenericCommand");
3458 for (j = 0; j < setsnum; j++) {
3459 robj *setobj;
3460
3461 setobj = dstkey ?
3462 lookupKeyWrite(c->db,setskeys[j]) :
3463 lookupKeyRead(c->db,setskeys[j]);
3464 if (!setobj) {
3465 zfree(dv);
3466 if (dstkey) {
3467 deleteKey(c->db,dstkey);
3468 addReply(c,shared.ok);
3469 } else {
3470 addReply(c,shared.nullmultibulk);
3471 }
3472 return;
3473 }
3474 if (setobj->type != REDIS_SET) {
3475 zfree(dv);
3476 addReply(c,shared.wrongtypeerr);
3477 return;
3478 }
3479 dv[j] = setobj->ptr;
3480 }
3481 /* Sort sets from the smallest to largest, this will improve our
3482 * algorithm's performace */
3483 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3484
3485 /* The first thing we should output is the total number of elements...
3486 * since this is a multi-bulk write, but at this stage we don't know
3487 * the intersection set size, so we use a trick, append an empty object
3488 * to the output list and save the pointer to later modify it with the
3489 * right length */
3490 if (!dstkey) {
3491 lenobj = createObject(REDIS_STRING,NULL);
3492 addReply(c,lenobj);
3493 decrRefCount(lenobj);
3494 } else {
3495 /* If we have a target key where to store the resulting set
3496 * create this key with an empty set inside */
3497 dstset = createSetObject();
3498 }
3499
3500 /* Iterate all the elements of the first (smallest) set, and test
3501 * the element against all the other sets, if at least one set does
3502 * not include the element it is discarded */
3503 di = dictGetIterator(dv[0]);
3504 if (!di) oom("dictGetIterator");
3505
3506 while((de = dictNext(di)) != NULL) {
3507 robj *ele;
3508
3509 for (j = 1; j < setsnum; j++)
3510 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3511 if (j != setsnum)
3512 continue; /* at least one set does not contain the member */
3513 ele = dictGetEntryKey(de);
3514 if (!dstkey) {
3515 addReplyBulkLen(c,ele);
3516 addReply(c,ele);
3517 addReply(c,shared.crlf);
3518 cardinality++;
3519 } else {
3520 dictAdd(dstset->ptr,ele,NULL);
3521 incrRefCount(ele);
3522 }
3523 }
3524 dictReleaseIterator(di);
3525
3526 if (dstkey) {
3527 /* Store the resulting set into the target */
3528 deleteKey(c->db,dstkey);
3529 dictAdd(c->db->dict,dstkey,dstset);
3530 incrRefCount(dstkey);
3531 }
3532
3533 if (!dstkey) {
3534 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3535 } else {
3536 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3537 dictSize((dict*)dstset->ptr)));
3538 server.dirty++;
3539 }
3540 zfree(dv);
3541 }
3542
3543 static void sinterCommand(redisClient *c) {
3544 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3545 }
3546
3547 static void sinterstoreCommand(redisClient *c) {
3548 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3549 }
3550
3551 #define REDIS_OP_UNION 0
3552 #define REDIS_OP_DIFF 1
3553
3554 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3555 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3556 dictIterator *di;
3557 dictEntry *de;
3558 robj *dstset = NULL;
3559 int j, cardinality = 0;
3560
3561 if (!dv) oom("sunionDiffGenericCommand");
3562 for (j = 0; j < setsnum; j++) {
3563 robj *setobj;
3564
3565 setobj = dstkey ?
3566 lookupKeyWrite(c->db,setskeys[j]) :
3567 lookupKeyRead(c->db,setskeys[j]);
3568 if (!setobj) {
3569 dv[j] = NULL;
3570 continue;
3571 }
3572 if (setobj->type != REDIS_SET) {
3573 zfree(dv);
3574 addReply(c,shared.wrongtypeerr);
3575 return;
3576 }
3577 dv[j] = setobj->ptr;
3578 }
3579
3580 /* We need a temp set object to store our union. If the dstkey
3581 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3582 * this set object will be the resulting object to set into the target key*/
3583 dstset = createSetObject();
3584
3585 /* Iterate all the elements of all the sets, add every element a single
3586 * time to the result set */
3587 for (j = 0; j < setsnum; j++) {
3588 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3589 if (!dv[j]) continue; /* non existing keys are like empty sets */
3590
3591 di = dictGetIterator(dv[j]);
3592 if (!di) oom("dictGetIterator");
3593
3594 while((de = dictNext(di)) != NULL) {
3595 robj *ele;
3596
3597 /* dictAdd will not add the same element multiple times */
3598 ele = dictGetEntryKey(de);
3599 if (op == REDIS_OP_UNION || j == 0) {
3600 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3601 incrRefCount(ele);
3602 cardinality++;
3603 }
3604 } else if (op == REDIS_OP_DIFF) {
3605 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3606 cardinality--;
3607 }
3608 }
3609 }
3610 dictReleaseIterator(di);
3611
3612 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3613 }
3614
3615 /* Output the content of the resulting set, if not in STORE mode */
3616 if (!dstkey) {
3617 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3618 di = dictGetIterator(dstset->ptr);
3619 if (!di) oom("dictGetIterator");
3620 while((de = dictNext(di)) != NULL) {
3621 robj *ele;
3622
3623 ele = dictGetEntryKey(de);
3624 addReplyBulkLen(c,ele);
3625 addReply(c,ele);
3626 addReply(c,shared.crlf);
3627 }
3628 dictReleaseIterator(di);
3629 } else {
3630 /* If we have a target key where to store the resulting set
3631 * create this key with the result set inside */
3632 deleteKey(c->db,dstkey);
3633 dictAdd(c->db->dict,dstkey,dstset);
3634 incrRefCount(dstkey);
3635 }
3636
3637 /* Cleanup */
3638 if (!dstkey) {
3639 decrRefCount(dstset);
3640 } else {
3641 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3642 dictSize((dict*)dstset->ptr)));
3643 server.dirty++;
3644 }
3645 zfree(dv);
3646 }
3647
3648 static void sunionCommand(redisClient *c) {
3649 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3650 }
3651
3652 static void sunionstoreCommand(redisClient *c) {
3653 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3654 }
3655
3656 static void sdiffCommand(redisClient *c) {
3657 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3658 }
3659
3660 static void sdiffstoreCommand(redisClient *c) {
3661 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3662 }
3663
3664 static void flushdbCommand(redisClient *c) {
3665 server.dirty += dictSize(c->db->dict);
3666 dictEmpty(c->db->dict);
3667 dictEmpty(c->db->expires);
3668 addReply(c,shared.ok);
3669 }
3670
3671 static void flushallCommand(redisClient *c) {
3672 server.dirty += emptyDb();
3673 addReply(c,shared.ok);
3674 rdbSave(server.dbfilename);
3675 server.dirty++;
3676 }
3677
3678 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3679 redisSortOperation *so = zmalloc(sizeof(*so));
3680 if (!so) oom("createSortOperation");
3681 so->type = type;
3682 so->pattern = pattern;
3683 return so;
3684 }
3685
3686 /* Return the value associated to the key with a name obtained
3687 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3688 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3689 char *p;
3690 sds spat, ssub;
3691 robj keyobj;
3692 int prefixlen, sublen, postfixlen;
3693 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3694 struct {
3695 long len;
3696 long free;
3697 char buf[REDIS_SORTKEY_MAX+1];
3698 } keyname;
3699
3700 if (subst->encoding == REDIS_ENCODING_RAW)
3701 incrRefCount(subst);
3702 else {
3703 subst = getDecodedObject(subst);
3704 }
3705
3706 spat = pattern->ptr;
3707 ssub = subst->ptr;
3708 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3709 p = strchr(spat,'*');
3710 if (!p) return NULL;
3711
3712 prefixlen = p-spat;
3713 sublen = sdslen(ssub);
3714 postfixlen = sdslen(spat)-(prefixlen+1);
3715 memcpy(keyname.buf,spat,prefixlen);
3716 memcpy(keyname.buf+prefixlen,ssub,sublen);
3717 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3718 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3719 keyname.len = prefixlen+sublen+postfixlen;
3720
3721 keyobj.refcount = 1;
3722 keyobj.type = REDIS_STRING;
3723 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3724
3725 decrRefCount(subst);
3726
3727 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3728 return lookupKeyRead(db,&keyobj);
3729 }
3730
3731 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3732 * the additional parameter is not standard but a BSD-specific we have to
3733 * pass sorting parameters via the global 'server' structure */
3734 static int sortCompare(const void *s1, const void *s2) {
3735 const redisSortObject *so1 = s1, *so2 = s2;
3736 int cmp;
3737
3738 if (!server.sort_alpha) {
3739 /* Numeric sorting. Here it's trivial as we precomputed scores */
3740 if (so1->u.score > so2->u.score) {
3741 cmp = 1;
3742 } else if (so1->u.score < so2->u.score) {
3743 cmp = -1;
3744 } else {
3745 cmp = 0;
3746 }
3747 } else {
3748 /* Alphanumeric sorting */
3749 if (server.sort_bypattern) {
3750 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3751 /* At least one compare object is NULL */
3752 if (so1->u.cmpobj == so2->u.cmpobj)
3753 cmp = 0;
3754 else if (so1->u.cmpobj == NULL)
3755 cmp = -1;
3756 else
3757 cmp = 1;
3758 } else {
3759 /* We have both the objects, use strcoll */
3760 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3761 }
3762 } else {
3763 /* Compare elements directly */
3764 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
3765 so2->obj->encoding == REDIS_ENCODING_RAW) {
3766 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3767 } else {
3768 robj *dec1, *dec2;
3769
3770 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
3771 so1->obj : getDecodedObject(so1->obj);
3772 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
3773 so2->obj : getDecodedObject(so2->obj);
3774 cmp = strcoll(dec1->ptr,dec2->ptr);
3775 if (dec1 != so1->obj) decrRefCount(dec1);
3776 if (dec2 != so2->obj) decrRefCount(dec2);
3777 }
3778 }
3779 }
3780 return server.sort_desc ? -cmp : cmp;
3781 }
3782
3783 /* The SORT command is the most complex command in Redis. Warning: this code
3784 * is optimized for speed and a bit less for readability */
3785 static void sortCommand(redisClient *c) {
3786 list *operations;
3787 int outputlen = 0;
3788 int desc = 0, alpha = 0;
3789 int limit_start = 0, limit_count = -1, start, end;
3790 int j, dontsort = 0, vectorlen;
3791 int getop = 0; /* GET operation counter */
3792 robj *sortval, *sortby = NULL;
3793 redisSortObject *vector; /* Resulting vector to sort */
3794
3795 /* Lookup the key to sort. It must be of the right types */
3796 sortval = lookupKeyRead(c->db,c->argv[1]);
3797 if (sortval == NULL) {
3798 addReply(c,shared.nokeyerr);
3799 return;
3800 }
3801 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3802 addReply(c,shared.wrongtypeerr);
3803 return;
3804 }
3805
3806 /* Create a list of operations to perform for every sorted element.
3807 * Operations can be GET/DEL/INCR/DECR */
3808 operations = listCreate();
3809 listSetFreeMethod(operations,zfree);
3810 j = 2;
3811
3812 /* Now we need to protect sortval incrementing its count, in the future
3813 * SORT may have options able to overwrite/delete keys during the sorting
3814 * and the sorted key itself may get destroied */
3815 incrRefCount(sortval);
3816
3817 /* The SORT command has an SQL-alike syntax, parse it */
3818 while(j < c->argc) {
3819 int leftargs = c->argc-j-1;
3820 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3821 desc = 0;
3822 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3823 desc = 1;
3824 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3825 alpha = 1;
3826 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3827 limit_start = atoi(c->argv[j+1]->ptr);
3828 limit_count = atoi(c->argv[j+2]->ptr);
3829 j+=2;
3830 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3831 sortby = c->argv[j+1];
3832 /* If the BY pattern does not contain '*', i.e. it is constant,
3833 * we don't need to sort nor to lookup the weight keys. */
3834 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3835 j++;
3836 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3837 listAddNodeTail(operations,createSortOperation(
3838 REDIS_SORT_GET,c->argv[j+1]));
3839 getop++;
3840 j++;
3841 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3842 listAddNodeTail(operations,createSortOperation(
3843 REDIS_SORT_DEL,c->argv[j+1]));
3844 j++;
3845 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3846 listAddNodeTail(operations,createSortOperation(
3847 REDIS_SORT_INCR,c->argv[j+1]));
3848 j++;
3849 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3850 listAddNodeTail(operations,createSortOperation(
3851 REDIS_SORT_DECR,c->argv[j+1]));
3852 j++;
3853 } else {
3854 decrRefCount(sortval);
3855 listRelease(operations);
3856 addReply(c,shared.syntaxerr);
3857 return;
3858 }
3859 j++;
3860 }
3861
3862 /* Load the sorting vector with all the objects to sort */
3863 vectorlen = (sortval->type == REDIS_LIST) ?
3864 listLength((list*)sortval->ptr) :
3865 dictSize((dict*)sortval->ptr);
3866 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3867 if (!vector) oom("allocating objects vector for SORT");
3868 j = 0;
3869 if (sortval->type == REDIS_LIST) {
3870 list *list = sortval->ptr;
3871 listNode *ln;
3872
3873 listRewind(list);
3874 while((ln = listYield(list))) {
3875 robj *ele = ln->value;
3876 vector[j].obj = ele;
3877 vector[j].u.score = 0;
3878 vector[j].u.cmpobj = NULL;
3879 j++;
3880 }
3881 } else {
3882 dict *set = sortval->ptr;
3883 dictIterator *di;
3884 dictEntry *setele;
3885
3886 di = dictGetIterator(set);
3887 if (!di) oom("dictGetIterator");
3888 while((setele = dictNext(di)) != NULL) {
3889 vector[j].obj = dictGetEntryKey(setele);
3890 vector[j].u.score = 0;
3891 vector[j].u.cmpobj = NULL;
3892 j++;
3893 }
3894 dictReleaseIterator(di);
3895 }
3896 assert(j == vectorlen);
3897
3898 /* Now it's time to load the right scores in the sorting vector */
3899 if (dontsort == 0) {
3900 for (j = 0; j < vectorlen; j++) {
3901 if (sortby) {
3902 robj *byval;
3903
3904 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3905 if (!byval || byval->type != REDIS_STRING) continue;
3906 if (alpha) {
3907 if (byval->encoding == REDIS_ENCODING_RAW) {
3908 vector[j].u.cmpobj = byval;
3909 incrRefCount(byval);
3910 } else {
3911 vector[j].u.cmpobj = getDecodedObject(byval);
3912 }
3913 } else {
3914 if (byval->encoding == REDIS_ENCODING_RAW) {
3915 vector[j].u.score = strtod(byval->ptr,NULL);
3916 } else {
3917 if (byval->encoding == REDIS_ENCODING_INT) {
3918 vector[j].u.score = (long)byval->ptr;
3919 } else
3920 assert(1 != 1);
3921 }
3922 }
3923 } else {
3924 if (!alpha) {
3925 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
3926 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3927 else {
3928 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
3929 vector[j].u.score = (long) vector[j].obj->ptr;
3930 else
3931 assert(1 != 1);
3932 }
3933 }
3934 }
3935 }
3936 }
3937
3938 /* We are ready to sort the vector... perform a bit of sanity check
3939 * on the LIMIT option too. We'll use a partial version of quicksort. */
3940 start = (limit_start < 0) ? 0 : limit_start;
3941 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3942 if (start >= vectorlen) {
3943 start = vectorlen-1;
3944 end = vectorlen-2;
3945 }
3946 if (end >= vectorlen) end = vectorlen-1;
3947
3948 if (dontsort == 0) {
3949 server.sort_desc = desc;
3950 server.sort_alpha = alpha;
3951 server.sort_bypattern = sortby ? 1 : 0;
3952 if (sortby && (start != 0 || end != vectorlen-1))
3953 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3954 else
3955 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3956 }
3957
3958 /* Send command output to the output buffer, performing the specified
3959 * GET/DEL/INCR/DECR operations if any. */
3960 outputlen = getop ? getop*(end-start+1) : end-start+1;
3961 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3962 for (j = start; j <= end; j++) {
3963 listNode *ln;
3964 if (!getop) {
3965 addReplyBulkLen(c,vector[j].obj);
3966 addReply(c,vector[j].obj);
3967 addReply(c,shared.crlf);
3968 }
3969 listRewind(operations);
3970 while((ln = listYield(operations))) {
3971 redisSortOperation *sop = ln->value;
3972 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3973 vector[j].obj);
3974
3975 if (sop->type == REDIS_SORT_GET) {
3976 if (!val || val->type != REDIS_STRING) {
3977 addReply(c,shared.nullbulk);
3978 } else {
3979 addReplyBulkLen(c,val);
3980 addReply(c,val);
3981 addReply(c,shared.crlf);
3982 }
3983 } else if (sop->type == REDIS_SORT_DEL) {
3984 /* TODO */
3985 }
3986 }
3987 }
3988
3989 /* Cleanup */
3990 decrRefCount(sortval);
3991 listRelease(operations);
3992 for (j = 0; j < vectorlen; j++) {
3993 if (sortby && alpha && vector[j].u.cmpobj)
3994 decrRefCount(vector[j].u.cmpobj);
3995 }
3996 zfree(vector);
3997 }
3998
3999 static void infoCommand(redisClient *c) {
4000 sds info;
4001 time_t uptime = time(NULL)-server.stat_starttime;
4002 int j;
4003
4004 info = sdscatprintf(sdsempty(),
4005 "redis_version:%s\r\n"
4006 "arch_bits:%s\r\n"
4007 "uptime_in_seconds:%d\r\n"
4008 "uptime_in_days:%d\r\n"
4009 "connected_clients:%d\r\n"
4010 "connected_slaves:%d\r\n"
4011 "used_memory:%zu\r\n"
4012 "changes_since_last_save:%lld\r\n"
4013 "bgsave_in_progress:%d\r\n"
4014 "last_save_time:%d\r\n"
4015 "total_connections_received:%lld\r\n"
4016 "total_commands_processed:%lld\r\n"
4017 "role:%s\r\n"
4018 ,REDIS_VERSION,
4019 (sizeof(long) == 8) ? "64" : "32",
4020 uptime,
4021 uptime/(3600*24),
4022 listLength(server.clients)-listLength(server.slaves),
4023 listLength(server.slaves),
4024 server.usedmemory,
4025 server.dirty,
4026 server.bgsaveinprogress,
4027 server.lastsave,
4028 server.stat_numconnections,
4029 server.stat_numcommands,
4030 server.masterhost == NULL ? "master" : "slave"
4031 );
4032 if (server.masterhost) {
4033 info = sdscatprintf(info,
4034 "master_host:%s\r\n"
4035 "master_port:%d\r\n"
4036 "master_link_status:%s\r\n"
4037 "master_last_io_seconds_ago:%d\r\n"
4038 ,server.masterhost,
4039 server.masterport,
4040 (server.replstate == REDIS_REPL_CONNECTED) ?
4041 "up" : "down",
4042 (int)(time(NULL)-server.master->lastinteraction)
4043 );
4044 }
4045 for (j = 0; j < server.dbnum; j++) {
4046 long long keys, vkeys;
4047
4048 keys = dictSize(server.db[j].dict);
4049 vkeys = dictSize(server.db[j].expires);
4050 if (keys || vkeys) {
4051 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4052 j, keys, vkeys);
4053 }
4054 }
4055 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4056 addReplySds(c,info);
4057 addReply(c,shared.crlf);
4058 }
4059
4060 static void monitorCommand(redisClient *c) {
4061 /* ignore MONITOR if aleady slave or in monitor mode */
4062 if (c->flags & REDIS_SLAVE) return;
4063
4064 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4065 c->slaveseldb = 0;
4066 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
4067 addReply(c,shared.ok);
4068 }
4069
4070 /* ================================= Expire ================================= */
4071 static int removeExpire(redisDb *db, robj *key) {
4072 if (dictDelete(db->expires,key) == DICT_OK) {
4073 return 1;
4074 } else {
4075 return 0;
4076 }
4077 }
4078
4079 static int setExpire(redisDb *db, robj *key, time_t when) {
4080 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4081 return 0;
4082 } else {
4083 incrRefCount(key);
4084 return 1;
4085 }
4086 }
4087
4088 /* Return the expire time of the specified key, or -1 if no expire
4089 * is associated with this key (i.e. the key is non volatile) */
4090 static time_t getExpire(redisDb *db, robj *key) {
4091 dictEntry *de;
4092
4093 /* No expire? return ASAP */
4094 if (dictSize(db->expires) == 0 ||
4095 (de = dictFind(db->expires,key)) == NULL) return -1;
4096
4097 return (time_t) dictGetEntryVal(de);
4098 }
4099
4100 static int expireIfNeeded(redisDb *db, robj *key) {
4101 time_t when;
4102 dictEntry *de;
4103
4104 /* No expire? return ASAP */
4105 if (dictSize(db->expires) == 0 ||
4106 (de = dictFind(db->expires,key)) == NULL) return 0;
4107
4108 /* Lookup the expire */
4109 when = (time_t) dictGetEntryVal(de);
4110 if (time(NULL) <= when) return 0;
4111
4112 /* Delete the key */
4113 dictDelete(db->expires,key);
4114 return dictDelete(db->dict,key) == DICT_OK;
4115 }
4116
4117 static int deleteIfVolatile(redisDb *db, robj *key) {
4118 dictEntry *de;
4119
4120 /* No expire? return ASAP */
4121 if (dictSize(db->expires) == 0 ||
4122 (de = dictFind(db->expires,key)) == NULL) return 0;
4123
4124 /* Delete the key */
4125 server.dirty++;
4126 dictDelete(db->expires,key);
4127 return dictDelete(db->dict,key) == DICT_OK;
4128 }
4129
4130 static void expireCommand(redisClient *c) {
4131 dictEntry *de;
4132 int seconds = atoi(c->argv[2]->ptr);
4133
4134 de = dictFind(c->db->dict,c->argv[1]);
4135 if (de == NULL) {
4136 addReply(c,shared.czero);
4137 return;
4138 }
4139 if (seconds <= 0) {
4140 addReply(c, shared.czero);
4141 return;
4142 } else {
4143 time_t when = time(NULL)+seconds;
4144 if (setExpire(c->db,c->argv[1],when)) {
4145 addReply(c,shared.cone);
4146 server.dirty++;
4147 } else {
4148 addReply(c,shared.czero);
4149 }
4150 return;
4151 }
4152 }
4153
4154 static void ttlCommand(redisClient *c) {
4155 time_t expire;
4156 int ttl = -1;
4157
4158 expire = getExpire(c->db,c->argv[1]);
4159 if (expire != -1) {
4160 ttl = (int) (expire-time(NULL));
4161 if (ttl < 0) ttl = -1;
4162 }
4163 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4164 }
4165
4166 static void msetGenericCommand(redisClient *c, int nx) {
4167 int j;
4168
4169 if ((c->argc % 2) == 0) {
4170 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4171 return;
4172 }
4173 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4174 * set nothing at all if at least one already key exists. */
4175 if (nx) {
4176 for (j = 1; j < c->argc; j += 2) {
4177 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4178 addReply(c, shared.czero);
4179 return;
4180 }
4181 }
4182 }
4183
4184 for (j = 1; j < c->argc; j += 2) {
4185 int retval;
4186
4187 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4188 if (retval == DICT_ERR) {
4189 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4190 incrRefCount(c->argv[j+1]);
4191 } else {
4192 incrRefCount(c->argv[j]);
4193 incrRefCount(c->argv[j+1]);
4194 }
4195 removeExpire(c->db,c->argv[j]);
4196 }
4197 server.dirty += (c->argc-1)/2;
4198 addReply(c, nx ? shared.cone : shared.ok);
4199 }
4200
4201 static void msetCommand(redisClient *c) {
4202 msetGenericCommand(c,0);
4203 }
4204
4205 static void msetnxCommand(redisClient *c) {
4206 msetGenericCommand(c,1);
4207 }
4208
4209 /* =============================== Replication ============================= */
4210
4211 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4212 ssize_t nwritten, ret = size;
4213 time_t start = time(NULL);
4214
4215 timeout++;
4216 while(size) {
4217 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4218 nwritten = write(fd,ptr,size);
4219 if (nwritten == -1) return -1;
4220 ptr += nwritten;
4221 size -= nwritten;
4222 }
4223 if ((time(NULL)-start) > timeout) {
4224 errno = ETIMEDOUT;
4225 return -1;
4226 }
4227 }
4228 return ret;
4229 }
4230
4231 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4232 ssize_t nread, totread = 0;
4233 time_t start = time(NULL);
4234
4235 timeout++;
4236 while(size) {
4237 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4238 nread = read(fd,ptr,size);
4239 if (nread == -1) return -1;
4240 ptr += nread;
4241 size -= nread;
4242 totread += nread;
4243 }
4244 if ((time(NULL)-start) > timeout) {
4245 errno = ETIMEDOUT;
4246 return -1;
4247 }
4248 }
4249 return totread;
4250 }
4251
/* Read a line from 'fd' one byte at a time into 'ptr', a buffer of 'size'
 * bytes. Reading stops at the first '\n'; the newline is not stored and a
 * trailing '\r' is stripped. The buffer is kept null terminated after every
 * stored byte.
 *
 * At most size-1 bytes are stored: if the line is longer, reading stops and
 * what was read so far is returned without consuming the rest of the line.
 *
 * 'timeout' is applied to each single byte read (see syncRead()).
 * Returns the number of bytes read before the newline (a stripped '\r' is
 * still counted), or -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* reserve room for the null terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One byte of the buffer consumed: without this the loop never
         * stops on long lines and writes past the end of the buffer. */
        size--;
    }
    return nread;
}
4272
4273 static void syncCommand(redisClient *c) {
4274 /* ignore SYNC if aleady slave or in monitor mode */
4275 if (c->flags & REDIS_SLAVE) return;
4276
4277 /* SYNC can't be issued when the server has pending data to send to
4278 * the client about already issued commands. We need a fresh reply
4279 * buffer registering the differences between the BGSAVE and the current
4280 * dataset, so that we can copy to other slaves if needed. */
4281 if (listLength(c->reply) != 0) {
4282 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4283 return;
4284 }
4285
4286 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4287 /* Here we need to check if there is a background saving operation
4288 * in progress, or if it is required to start one */
4289 if (server.bgsaveinprogress) {
4290 /* Ok a background save is in progress. Let's check if it is a good
4291 * one for replication, i.e. if there is another slave that is
4292 * registering differences since the server forked to save */
4293 redisClient *slave;
4294 listNode *ln;
4295
4296 listRewind(server.slaves);
4297 while((ln = listYield(server.slaves))) {
4298 slave = ln->value;
4299 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4300 }
4301 if (ln) {
4302 /* Perfect, the server is already registering differences for
4303 * another slave. Set the right state, and copy the buffer. */
4304 listRelease(c->reply);
4305 c->reply = listDup(slave->reply);
4306 if (!c->reply) oom("listDup copying slave reply list");
4307 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4308 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4309 } else {
4310 /* No way, we need to wait for the next BGSAVE in order to
4311 * register differences */
4312 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4313 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4314 }
4315 } else {
4316 /* Ok we don't have a BGSAVE in progress, let's start one */
4317 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4318 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4319 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4320 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4321 return;
4322 }
4323 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4324 }
4325 c->repldbfd = -1;
4326 c->flags |= REDIS_SLAVE;
4327 c->slaveseldb = 0;
4328 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
4329 return;
4330 }
4331
4332 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4333 redisClient *slave = privdata;
4334 REDIS_NOTUSED(el);
4335 REDIS_NOTUSED(mask);
4336 char buf[REDIS_IOBUF_LEN];
4337 ssize_t nwritten, buflen;
4338
4339 if (slave->repldboff == 0) {
4340 /* Write the bulk write count before to transfer the DB. In theory here
4341 * we don't know how much room there is in the output buffer of the
4342 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4343 * operations) will never be smaller than the few bytes we need. */
4344 sds bulkcount;
4345
4346 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4347 slave->repldbsize);
4348 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4349 {
4350 sdsfree(bulkcount);
4351 freeClient(slave);
4352 return;
4353 }
4354 sdsfree(bulkcount);
4355 }
4356 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4357 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4358 if (buflen <= 0) {
4359 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4360 (buflen == 0) ? "premature EOF" : strerror(errno));
4361 freeClient(slave);
4362 return;
4363 }
4364 if ((nwritten = write(fd,buf,buflen)) == -1) {
4365 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4366 strerror(errno));
4367 freeClient(slave);
4368 return;
4369 }
4370 slave->repldboff += nwritten;
4371 if (slave->repldboff == slave->repldbsize) {
4372 close(slave->repldbfd);
4373 slave->repldbfd = -1;
4374 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4375 slave->replstate = REDIS_REPL_ONLINE;
4376 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4377 sendReplyToClient, slave, NULL) == AE_ERR) {
4378 freeClient(slave);
4379 return;
4380 }
4381 addReplySds(slave,sdsempty());
4382 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4383 }
4384 }
4385
4386 /* This function is called at the end of every backgrond saving.
4387 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4388 * otherwise REDIS_ERR is passed to the function.
4389 *
4390 * The goal of this function is to handle slaves waiting for a successful
4391 * background saving in order to perform non-blocking synchronization. */
4392 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4393 listNode *ln;
4394 int startbgsave = 0;
4395
4396 listRewind(server.slaves);
4397 while((ln = listYield(server.slaves))) {
4398 redisClient *slave = ln->value;
4399
4400 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4401 startbgsave = 1;
4402 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4403 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4404 struct redis_stat buf;
4405
4406 if (bgsaveerr != REDIS_OK) {
4407 freeClient(slave);
4408 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4409 continue;
4410 }
4411 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4412 redis_fstat(slave->repldbfd,&buf) == -1) {
4413 freeClient(slave);
4414 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4415 continue;
4416 }
4417 slave->repldboff = 0;
4418 slave->repldbsize = buf.st_size;
4419 slave->replstate = REDIS_REPL_SEND_BULK;
4420 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4421 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4422 freeClient(slave);
4423 continue;
4424 }
4425 }
4426 }
4427 if (startbgsave) {
4428 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4429 listRewind(server.slaves);
4430 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4431 while((ln = listYield(server.slaves))) {
4432 redisClient *slave = ln->value;
4433
4434 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4435 freeClient(slave);
4436 }
4437 }
4438 }
4439 }
4440
4441 static int syncWithMaster(void) {
4442 char buf[1024], tmpfile[256];
4443 int dumpsize;
4444 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4445 int dfd;
4446
4447 if (fd == -1) {
4448 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4449 strerror(errno));
4450 return REDIS_ERR;
4451 }
4452 /* Issue the SYNC command */
4453 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4454 close(fd);
4455 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4456 strerror(errno));
4457 return REDIS_ERR;
4458 }
4459 /* Read the bulk write count */
4460 if (syncReadLine(fd,buf,1024,3600) == -1) {
4461 close(fd);
4462 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4463 strerror(errno));
4464 return REDIS_ERR;
4465 }
4466 dumpsize = atoi(buf+1);
4467 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4468 /* Read the bulk write data on a temp file */
4469 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4470 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4471 if (dfd == -1) {
4472 close(fd);
4473 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4474 return REDIS_ERR;
4475 }
4476 while(dumpsize) {
4477 int nread, nwritten;
4478
4479 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4480 if (nread == -1) {
4481 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4482 strerror(errno));
4483 close(fd);
4484 close(dfd);
4485 return REDIS_ERR;
4486 }
4487 nwritten = write(dfd,buf,nread);
4488 if (nwritten == -1) {
4489 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4490 close(fd);
4491 close(dfd);
4492 return REDIS_ERR;
4493 }
4494 dumpsize -= nread;
4495 }
4496 close(dfd);
4497 if (rename(tmpfile,server.dbfilename) == -1) {
4498 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4499 unlink(tmpfile);
4500 close(fd);
4501 return REDIS_ERR;
4502 }
4503 emptyDb();
4504 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4505 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4506 close(fd);
4507 return REDIS_ERR;
4508 }
4509 server.master = createClient(fd);
4510 server.master->flags |= REDIS_MASTER;
4511 server.replstate = REDIS_REPL_CONNECTED;
4512 return REDIS_OK;
4513 }
4514
4515 static void slaveofCommand(redisClient *c) {
4516 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4517 !strcasecmp(c->argv[2]->ptr,"one")) {
4518 if (server.masterhost) {
4519 sdsfree(server.masterhost);
4520 server.masterhost = NULL;
4521 if (server.master) freeClient(server.master);
4522 server.replstate = REDIS_REPL_NONE;
4523 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4524 }
4525 } else {
4526 sdsfree(server.masterhost);
4527 server.masterhost = sdsdup(c->argv[1]->ptr);
4528 server.masterport = atoi(c->argv[2]->ptr);
4529 if (server.master) freeClient(server.master);
4530 server.replstate = REDIS_REPL_CONNECT;
4531 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4532 server.masterhost, server.masterport);
4533 }
4534 addReply(c,shared.ok);
4535 }
4536
4537 /* ============================ Maxmemory directive ======================== */
4538
4539 /* This function gets called when 'maxmemory' is set on the config file to limit
4540 * the max memory used by the server, and we are out of memory.
4541 * This function will try to, in order:
4542 *
4543 * - Free objects from the free list
4544 * - Try to remove keys with an EXPIRE set
4545 *
4546 * It is not possible to free enough memory to reach used-memory < maxmemory
4547 * the server will start refusing commands that will enlarge even more the
4548 * memory usage.
4549 */
4550 static void freeMemoryIfNeeded(void) {
4551 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4552 if (listLength(server.objfreelist)) {
4553 robj *o;
4554
4555 listNode *head = listFirst(server.objfreelist);
4556 o = listNodeValue(head);
4557 listDelNode(server.objfreelist,head);
4558 zfree(o);
4559 } else {
4560 int j, k, freed = 0;
4561
4562 for (j = 0; j < server.dbnum; j++) {
4563 int minttl = -1;
4564 robj *minkey = NULL;
4565 struct dictEntry *de;
4566
4567 if (dictSize(server.db[j].expires)) {
4568 freed = 1;
4569 /* From a sample of three keys drop the one nearest to
4570 * the natural expire */
4571 for (k = 0; k < 3; k++) {
4572 time_t t;
4573
4574 de = dictGetRandomKey(server.db[j].expires);
4575 t = (time_t) dictGetEntryVal(de);
4576 if (minttl == -1 || t < minttl) {
4577 minkey = dictGetEntryKey(de);
4578 minttl = t;
4579 }
4580 }
4581 deleteKey(server.db+j,minkey);
4582 }
4583 }
4584 if (!freed) return; /* nothing to free... */
4585 }
4586 }
4587 }
4588
4589 /* ================================= Debugging ============================== */
4590
4591 static void debugCommand(redisClient *c) {
4592 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4593 *((char*)-1) = 'x';
4594 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4595 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4596 robj *key, *val;
4597
4598 if (!de) {
4599 addReply(c,shared.nokeyerr);
4600 return;
4601 }
4602 key = dictGetEntryKey(de);
4603 val = dictGetEntryVal(de);
4604 addReplySds(c,sdscatprintf(sdsempty(),
4605 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4606 key, key->refcount, val, val->refcount, val->encoding));
4607 } else {
4608 addReplySds(c,sdsnew(
4609 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4610 }
4611 }
4612
4613 #ifdef HAVE_BACKTRACE
4614 static struct redisFunctionSym symsTable[] = {
4615 {"compareStringObjects", (unsigned long)compareStringObjects},
4616 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4617 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4618 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4619 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4620 {"freeStringObject", (unsigned long)freeStringObject},
4621 {"freeListObject", (unsigned long)freeListObject},
4622 {"freeSetObject", (unsigned long)freeSetObject},
4623 {"decrRefCount", (unsigned long)decrRefCount},
4624 {"createObject", (unsigned long)createObject},
4625 {"freeClient", (unsigned long)freeClient},
4626 {"rdbLoad", (unsigned long)rdbLoad},
4627 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4628 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4629 {"addReply", (unsigned long)addReply},
4630 {"addReplySds", (unsigned long)addReplySds},
4631 {"incrRefCount", (unsigned long)incrRefCount},
4632 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4633 {"createStringObject", (unsigned long)createStringObject},
4634 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4635 {"syncWithMaster", (unsigned long)syncWithMaster},
4636 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4637 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4638 {"getDecodedObject", (unsigned long)getDecodedObject},
4639 {"removeExpire", (unsigned long)removeExpire},
4640 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4641 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4642 {"deleteKey", (unsigned long)deleteKey},
4643 {"getExpire", (unsigned long)getExpire},
4644 {"setExpire", (unsigned long)setExpire},
4645 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4646 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4647 {"authCommand", (unsigned long)authCommand},
4648 {"pingCommand", (unsigned long)pingCommand},
4649 {"echoCommand", (unsigned long)echoCommand},
4650 {"setCommand", (unsigned long)setCommand},
4651 {"setnxCommand", (unsigned long)setnxCommand},
4652 {"getCommand", (unsigned long)getCommand},
4653 {"delCommand", (unsigned long)delCommand},
4654 {"existsCommand", (unsigned long)existsCommand},
4655 {"incrCommand", (unsigned long)incrCommand},
4656 {"decrCommand", (unsigned long)decrCommand},
4657 {"incrbyCommand", (unsigned long)incrbyCommand},
4658 {"decrbyCommand", (unsigned long)decrbyCommand},
4659 {"selectCommand", (unsigned long)selectCommand},
4660 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4661 {"keysCommand", (unsigned long)keysCommand},
4662 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4663 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4664 {"saveCommand", (unsigned long)saveCommand},
4665 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4666 {"shutdownCommand", (unsigned long)shutdownCommand},
4667 {"moveCommand", (unsigned long)moveCommand},
4668 {"renameCommand", (unsigned long)renameCommand},
4669 {"renamenxCommand", (unsigned long)renamenxCommand},
4670 {"lpushCommand", (unsigned long)lpushCommand},
4671 {"rpushCommand", (unsigned long)rpushCommand},
4672 {"lpopCommand", (unsigned long)lpopCommand},
4673 {"rpopCommand", (unsigned long)rpopCommand},
4674 {"llenCommand", (unsigned long)llenCommand},
4675 {"lindexCommand", (unsigned long)lindexCommand},
4676 {"lrangeCommand", (unsigned long)lrangeCommand},
4677 {"ltrimCommand", (unsigned long)ltrimCommand},
4678 {"typeCommand", (unsigned long)typeCommand},
4679 {"lsetCommand", (unsigned long)lsetCommand},
4680 {"saddCommand", (unsigned long)saddCommand},
4681 {"sremCommand", (unsigned long)sremCommand},
4682 {"smoveCommand", (unsigned long)smoveCommand},
4683 {"sismemberCommand", (unsigned long)sismemberCommand},
4684 {"scardCommand", (unsigned long)scardCommand},
4685 {"spopCommand", (unsigned long)spopCommand},
4686 {"srandmemberCommand", (unsigned long)srandmemberCommand},
4687 {"sinterCommand", (unsigned long)sinterCommand},
4688 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4689 {"sunionCommand", (unsigned long)sunionCommand},
4690 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4691 {"sdiffCommand", (unsigned long)sdiffCommand},
4692 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4693 {"syncCommand", (unsigned long)syncCommand},
4694 {"flushdbCommand", (unsigned long)flushdbCommand},
4695 {"flushallCommand", (unsigned long)flushallCommand},
4696 {"sortCommand", (unsigned long)sortCommand},
4697 {"lremCommand", (unsigned long)lremCommand},
4698 {"infoCommand", (unsigned long)infoCommand},
4699 {"mgetCommand", (unsigned long)mgetCommand},
4700 {"monitorCommand", (unsigned long)monitorCommand},
4701 {"expireCommand", (unsigned long)expireCommand},
4702 {"getsetCommand", (unsigned long)getsetCommand},
4703 {"ttlCommand", (unsigned long)ttlCommand},
4704 {"slaveofCommand", (unsigned long)slaveofCommand},
4705 {"debugCommand", (unsigned long)debugCommand},
4706 {"processCommand", (unsigned long)processCommand},
4707 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4708 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4709 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4710 {"msetGenericCommand", (unsigned long)msetGenericCommand},
4711 {"msetCommand", (unsigned long)msetCommand},
4712 {"msetnxCommand", (unsigned long)msetnxCommand},
4713 {NULL,0}
4714 };
4715
4716 /* This function try to convert a pointer into a function name. It's used in
4717 * oreder to provide a backtrace under segmentation fault that's able to
4718 * display functions declared as static (otherwise the backtrace is useless). */
4719 static char *findFuncName(void *pointer, unsigned long *offset){
4720 int i, ret = -1;
4721 unsigned long off, minoff = 0;
4722
4723 /* Try to match against the Symbol with the smallest offset */
4724 for (i=0; symsTable[i].pointer; i++) {
4725 unsigned long lp = (unsigned long) pointer;
4726
4727 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4728 off=lp-symsTable[i].pointer;
4729 if (ret < 0 || off < minoff) {
4730 minoff=off;
4731 ret=i;
4732 }
4733 }
4734 }
4735 if (ret == -1) return NULL;
4736 *offset = minoff;
4737 return symsTable[ret].name;
4738 }
4739
4740 static void *getMcontextEip(ucontext_t *uc) {
4741 #if defined(__FreeBSD__)
4742 return (void*) uc->uc_mcontext.mc_eip;
4743 #elif defined(__dietlibc__)
4744 return (void*) uc->uc_mcontext.eip;
4745 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4746 return (void*) uc->uc_mcontext->__ss.__eip;
4747 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4748 #ifdef _STRUCT_X86_THREAD_STATE64
4749 return (void*) uc->uc_mcontext->__ss.__rip;
4750 #else
4751 return (void*) uc->uc_mcontext->__ss.__eip;
4752 #endif
4753 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4754 return (void*) uc->uc_mcontext.gregs[REG_EIP];
4755 #elif defined(__ia64__) /* Linux IA64 */
4756 return (void*) uc->uc_mcontext.sc_ip;
4757 #else
4758 return NULL;
4759 #endif
4760 }
4761
4762 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4763 void *trace[100];
4764 char **messages = NULL;
4765 int i, trace_size = 0;
4766 unsigned long offset=0;
4767 time_t uptime = time(NULL)-server.stat_starttime;
4768 ucontext_t *uc = (ucontext_t*) secret;
4769 REDIS_NOTUSED(info);
4770
4771 redisLog(REDIS_WARNING,
4772 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4773 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4774 "redis_version:%s; "
4775 "uptime_in_seconds:%d; "
4776 "connected_clients:%d; "
4777 "connected_slaves:%d; "
4778 "used_memory:%zu; "
4779 "changes_since_last_save:%lld; "
4780 "bgsave_in_progress:%d; "
4781 "last_save_time:%d; "
4782 "total_connections_received:%lld; "
4783 "total_commands_processed:%lld; "
4784 "role:%s;"
4785 ,REDIS_VERSION,
4786 uptime,
4787 listLength(server.clients)-listLength(server.slaves),
4788 listLength(server.slaves),
4789 server.usedmemory,
4790 server.dirty,
4791 server.bgsaveinprogress,
4792 server.lastsave,
4793 server.stat_numconnections,
4794 server.stat_numcommands,
4795 server.masterhost == NULL ? "master" : "slave"
4796 ));
4797
4798 trace_size = backtrace(trace, 100);
4799 /* overwrite sigaction with caller's address */
4800 if (getMcontextEip(uc) != NULL) {
4801 trace[1] = getMcontextEip(uc);
4802 }
4803 messages = backtrace_symbols(trace, trace_size);
4804
4805 for (i=1; i<trace_size; ++i) {
4806 char *fn = findFuncName(trace[i], &offset), *p;
4807
4808 p = strchr(messages[i],'+');
4809 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4810 redisLog(REDIS_WARNING,"%s", messages[i]);
4811 } else {
4812 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4813 }
4814 }
4815 free(messages);
4816 exit(0);
4817 }
4818
4819 static void setupSigSegvAction(void) {
4820 struct sigaction act;
4821
4822 sigemptyset (&act.sa_mask);
4823 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4824 * is used. Otherwise, sa_handler is used */
4825 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4826 act.sa_sigaction = segvHandler;
4827 sigaction (SIGSEGV, &act, NULL);
4828 sigaction (SIGBUS, &act, NULL);
4829 sigaction (SIGFPE, &act, NULL);
4830 sigaction (SIGILL, &act, NULL);
4831 sigaction (SIGBUS, &act, NULL);
4832 return;
4833 }
4834 #else /* HAVE_BACKTRACE */
/* Without backtrace support no crash handler is installed: this stub only
 * keeps the call site unconditional. */
static void setupSigSegvAction(void)
{
    /* nothing to do */
}
4837 #endif /* HAVE_BACKTRACE */
4838
4839 /* =================================== Main! ================================ */
4840
4841 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its content converted
 * with atoi(). Returns -1 if the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,64,fp) != NULL) value = atoi(line);
    fclose(fp);

    return value;
}
4855
4856 void linuxOvercommitMemoryWarning(void) {
4857 if (linuxOvercommitMemoryValue() == 0) {
4858 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4859 }
4860 }
4861 #endif /* __linux__ */
4862
4863 static void daemonize(void) {
4864 int fd;
4865 FILE *fp;
4866
4867 if (fork() != 0) exit(0); /* parent exits */
4868 setsid(); /* create a new session */
4869
4870 /* Every output goes to /dev/null. If Redis is daemonized but
4871 * the 'logfile' is set to 'stdout' in the configuration file
4872 * it will not log at all. */
4873 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4874 dup2(fd, STDIN_FILENO);
4875 dup2(fd, STDOUT_FILENO);
4876 dup2(fd, STDERR_FILENO);
4877 if (fd > STDERR_FILENO) close(fd);
4878 }
4879 /* Try to write the pid file */
4880 fp = fopen(server.pidfile,"w");
4881 if (fp) {
4882 fprintf(fp,"%d\n",getpid());
4883 fclose(fp);
4884 }
4885 }
4886
4887 int main(int argc, char **argv) {
4888 initServerConfig();
4889 if (argc == 2) {
4890 ResetServerSaveParams();
4891 loadServerConfig(argv[1]);
4892 } else if (argc > 2) {
4893 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4894 exit(1);
4895 } else {
4896 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4897 }
4898 initServer();
4899 if (server.daemonize) daemonize();
4900 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4901 #ifdef __linux__
4902 linuxOvercommitMemoryWarning();
4903 #endif
4904 if (rdbLoad(server.dbfilename) == REDIS_OK)
4905 redisLog(REDIS_NOTICE,"DB loaded from disk");
4906 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4907 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4908 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4909 aeMain(server.el);
4910 aeDeleteEventLoop(server.el);
4911 return 0;
4912 }