]> git.saurik.com Git - redis.git/blob - redis.c
010dfb8fe53541b30dd42c77ab667b6b5e839e62
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.000"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
71 /* Error codes */
72 #define REDIS_OK 0
73 #define REDIS_ERR -1
74
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
88
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
91
92 /* Command flags */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
96 this flags will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short this commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
100
101 /* Object types */
102 #define REDIS_STRING 0
103 #define REDIS_LIST 1
104 #define REDIS_SET 2
105 #define REDIS_HASH 3
106
107 /* Objects encoding */
108 #define REDIS_ENCODING_RAW 0 /* Raw representation */
109 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
110
111 /* Object types only used for dumping to disk */
112 #define REDIS_EXPIRETIME 253
113 #define REDIS_SELECTDB 254
114 #define REDIS_EOF 255
115
116 /* Defines related to the dump file format. To store 32 bits lengths for short
117 * keys requires a lot of space, so we check the most significant 2 bits of
118 * the first byte to interpreter the length:
119 *
120 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
121 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
122 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
123 * 11|000000 this means: specially encoded object will follow. The six bits
124 * number specify the kind of object that follows.
125 * See the REDIS_RDB_ENC_* defines.
126 *
127 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
128 * values, will fit inside. */
129 #define REDIS_RDB_6BITLEN 0
130 #define REDIS_RDB_14BITLEN 1
131 #define REDIS_RDB_32BITLEN 2
132 #define REDIS_RDB_ENCVAL 3
133 #define REDIS_RDB_LENERR UINT_MAX
134
135 /* When a length of a string object stored on disk has the first two bits
136 * set, the remaining two bits specify a special encoding for the object
137 * accordingly to the following defines: */
138 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
139 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
140 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
141 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
142
143 /* Client flags */
144 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
145 #define REDIS_SLAVE 2 /* This client is a slave server */
146 #define REDIS_MASTER 4 /* This client is a master server */
147 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
148
149 /* Slave replication state - slave side */
150 #define REDIS_REPL_NONE 0 /* No active replication */
151 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
152 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
153
154 /* Slave replication state - from the point of view of master
155 * Note that in SEND_BULK and ONLINE state the slave receives new updates
156 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
157 * to start the next background saving in order to send updates to it. */
158 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
159 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
160 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
161 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
162
163 /* List related stuff */
164 #define REDIS_HEAD 0
165 #define REDIS_TAIL 1
166
167 /* Sort operations */
168 #define REDIS_SORT_GET 0
169 #define REDIS_SORT_DEL 1
170 #define REDIS_SORT_INCR 2
171 #define REDIS_SORT_DECR 3
172 #define REDIS_SORT_ASC 4
173 #define REDIS_SORT_DESC 5
174 #define REDIS_SORTKEY_MAX 1024
175
176 /* Log levels */
177 #define REDIS_DEBUG 0
178 #define REDIS_NOTICE 1
179 #define REDIS_WARNING 2
180
181 /* Anti-warning macro... */
182 #define REDIS_NOTUSED(V) ((void) V)
183
184
185 /*================================= Data types ============================== */
186
187 /* A redis object, that is a type able to hold a string / list / set */
188 typedef struct redisObject {
189 void *ptr;
190 unsigned char type;
191 unsigned char encoding;
192 unsigned char notused[2];
193 int refcount;
194 } robj;
195
196 typedef struct redisDb {
197 dict *dict;
198 dict *expires;
199 int id;
200 } redisDb;
201
202 /* With multiplexing we need to take per-clinet state.
203 * Clients are taken in a liked list. */
204 typedef struct redisClient {
205 int fd;
206 redisDb *db;
207 int dictid;
208 sds querybuf;
209 robj **argv;
210 int argc;
211 int bulklen; /* bulk read len. -1 if not in bulk read mode */
212 list *reply;
213 int sentlen;
214 time_t lastinteraction; /* time of the last interaction, used for timeout */
215 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
216 int slaveseldb; /* slave selected db, if this client is a slave */
217 int authenticated; /* when requirepass is non-NULL */
218 int replstate; /* replication state if this is a slave */
219 int repldbfd; /* replication DB file descriptor */
220 long repldboff; /* replication DB file offset */
221 off_t repldbsize; /* replication DB file size */
222 } redisClient;
223
224 struct saveparam {
225 time_t seconds;
226 int changes;
227 };
228
229 /* Global server state structure */
230 struct redisServer {
231 int port;
232 int fd;
233 redisDb *db;
234 dict *sharingpool;
235 unsigned int sharingpoolsize;
236 long long dirty; /* changes to DB from the last save */
237 list *clients;
238 list *slaves, *monitors;
239 char neterr[ANET_ERR_LEN];
240 aeEventLoop *el;
241 int cronloops; /* number of times the cron function run */
242 list *objfreelist; /* A list of freed objects to avoid malloc() */
243 time_t lastsave; /* Unix time of last save succeeede */
244 size_t usedmemory; /* Used memory in megabytes */
245 /* Fields used only for stats */
246 time_t stat_starttime; /* server start time */
247 long long stat_numcommands; /* number of processed commands */
248 long long stat_numconnections; /* number of connections received */
249 /* Configuration */
250 int verbosity;
251 int glueoutputbuf;
252 int maxidletime;
253 int dbnum;
254 int daemonize;
255 char *pidfile;
256 int bgsaveinprogress;
257 pid_t bgsavechildpid;
258 struct saveparam *saveparams;
259 int saveparamslen;
260 char *logfile;
261 char *bindaddr;
262 char *dbfilename;
263 char *requirepass;
264 int shareobjects;
265 /* Replication related */
266 int isslave;
267 char *masterhost;
268 int masterport;
269 redisClient *master; /* client that is master for this slave */
270 int replstate;
271 unsigned int maxclients;
272 unsigned int maxmemory;
273 /* Sort parameters - qsort_r() is only available under BSD so we
274 * have to take this state global, in order to pass it to sortCompare() */
275 int sort_desc;
276 int sort_alpha;
277 int sort_bypattern;
278 };
279
280 typedef void redisCommandProc(redisClient *c);
281 struct redisCommand {
282 char *name;
283 redisCommandProc *proc;
284 int arity;
285 int flags;
286 };
287
288 struct redisFunctionSym {
289 char *name;
290 unsigned long pointer;
291 };
292
293 typedef struct _redisSortObject {
294 robj *obj;
295 union {
296 double score;
297 robj *cmpobj;
298 } u;
299 } redisSortObject;
300
301 typedef struct _redisSortOperation {
302 int type;
303 robj *pattern;
304 } redisSortOperation;
305
306 struct sharedObjectsStruct {
307 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
308 *colon, *nullbulk, *nullmultibulk,
309 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
310 *outofrangeerr, *plus,
311 *select0, *select1, *select2, *select3, *select4,
312 *select5, *select6, *select7, *select8, *select9;
313 } shared;
314
315 /*================================ Prototypes =============================== */
316
317 static void freeStringObject(robj *o);
318 static void freeListObject(robj *o);
319 static void freeSetObject(robj *o);
320 static void decrRefCount(void *o);
321 static robj *createObject(int type, void *ptr);
322 static void freeClient(redisClient *c);
323 static int rdbLoad(char *filename);
324 static void addReply(redisClient *c, robj *obj);
325 static void addReplySds(redisClient *c, sds s);
326 static void incrRefCount(robj *o);
327 static int rdbSaveBackground(char *filename);
328 static robj *createStringObject(char *ptr, size_t len);
329 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
330 static int syncWithMaster(void);
331 static robj *tryObjectSharing(robj *o);
332 static int tryObjectEncoding(robj *o);
333 static robj *getDecodedObject(const robj *o);
334 static int removeExpire(redisDb *db, robj *key);
335 static int expireIfNeeded(redisDb *db, robj *key);
336 static int deleteIfVolatile(redisDb *db, robj *key);
337 static int deleteKey(redisDb *db, robj *key);
338 static time_t getExpire(redisDb *db, robj *key);
339 static int setExpire(redisDb *db, robj *key, time_t when);
340 static void updateSlavesWaitingBgsave(int bgsaveerr);
341 static void freeMemoryIfNeeded(void);
342 static int processCommand(redisClient *c);
343 static void setupSigSegvAction(void);
344 static void rdbRemoveTempFile(pid_t childpid);
345
346 static void authCommand(redisClient *c);
347 static void pingCommand(redisClient *c);
348 static void echoCommand(redisClient *c);
349 static void setCommand(redisClient *c);
350 static void setnxCommand(redisClient *c);
351 static void getCommand(redisClient *c);
352 static void delCommand(redisClient *c);
353 static void existsCommand(redisClient *c);
354 static void incrCommand(redisClient *c);
355 static void decrCommand(redisClient *c);
356 static void incrbyCommand(redisClient *c);
357 static void decrbyCommand(redisClient *c);
358 static void selectCommand(redisClient *c);
359 static void randomkeyCommand(redisClient *c);
360 static void keysCommand(redisClient *c);
361 static void dbsizeCommand(redisClient *c);
362 static void lastsaveCommand(redisClient *c);
363 static void saveCommand(redisClient *c);
364 static void bgsaveCommand(redisClient *c);
365 static void shutdownCommand(redisClient *c);
366 static void moveCommand(redisClient *c);
367 static void renameCommand(redisClient *c);
368 static void renamenxCommand(redisClient *c);
369 static void lpushCommand(redisClient *c);
370 static void rpushCommand(redisClient *c);
371 static void lpopCommand(redisClient *c);
372 static void rpopCommand(redisClient *c);
373 static void llenCommand(redisClient *c);
374 static void lindexCommand(redisClient *c);
375 static void lrangeCommand(redisClient *c);
376 static void ltrimCommand(redisClient *c);
377 static void typeCommand(redisClient *c);
378 static void lsetCommand(redisClient *c);
379 static void saddCommand(redisClient *c);
380 static void sremCommand(redisClient *c);
381 static void smoveCommand(redisClient *c);
382 static void sismemberCommand(redisClient *c);
383 static void scardCommand(redisClient *c);
384 static void spopCommand(redisClient *c);
385 static void sinterCommand(redisClient *c);
386 static void sinterstoreCommand(redisClient *c);
387 static void sunionCommand(redisClient *c);
388 static void sunionstoreCommand(redisClient *c);
389 static void sdiffCommand(redisClient *c);
390 static void sdiffstoreCommand(redisClient *c);
391 static void syncCommand(redisClient *c);
392 static void flushdbCommand(redisClient *c);
393 static void flushallCommand(redisClient *c);
394 static void sortCommand(redisClient *c);
395 static void lremCommand(redisClient *c);
396 static void infoCommand(redisClient *c);
397 static void mgetCommand(redisClient *c);
398 static void monitorCommand(redisClient *c);
399 static void expireCommand(redisClient *c);
400 static void getSetCommand(redisClient *c);
401 static void ttlCommand(redisClient *c);
402 static void slaveofCommand(redisClient *c);
403 static void debugCommand(redisClient *c);
404 /*================================= Globals ================================= */
405
406 /* Global vars */
407 static struct redisServer server; /* server global state */
408 static struct redisCommand cmdTable[] = {
409 {"get",getCommand,2,REDIS_CMD_INLINE},
410 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
411 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
412 {"del",delCommand,-2,REDIS_CMD_INLINE},
413 {"exists",existsCommand,2,REDIS_CMD_INLINE},
414 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
415 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
416 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
417 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
418 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
419 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
420 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
421 {"llen",llenCommand,2,REDIS_CMD_INLINE},
422 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
423 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
424 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
425 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
426 {"lrem",lremCommand,4,REDIS_CMD_BULK},
427 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
428 {"srem",sremCommand,3,REDIS_CMD_BULK},
429 {"smove",smoveCommand,4,REDIS_CMD_BULK},
430 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
431 {"scard",scardCommand,2,REDIS_CMD_INLINE},
432 {"spop",spopCommand,2,REDIS_CMD_INLINE},
433 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
434 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
435 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
436 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
437 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
438 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
439 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
440 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
441 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
442 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
443 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
444 {"select",selectCommand,2,REDIS_CMD_INLINE},
445 {"move",moveCommand,3,REDIS_CMD_INLINE},
446 {"rename",renameCommand,3,REDIS_CMD_INLINE},
447 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
448 {"expire",expireCommand,3,REDIS_CMD_INLINE},
449 {"keys",keysCommand,2,REDIS_CMD_INLINE},
450 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
451 {"auth",authCommand,2,REDIS_CMD_INLINE},
452 {"ping",pingCommand,1,REDIS_CMD_INLINE},
453 {"echo",echoCommand,2,REDIS_CMD_BULK},
454 {"save",saveCommand,1,REDIS_CMD_INLINE},
455 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
456 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
457 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
458 {"type",typeCommand,2,REDIS_CMD_INLINE},
459 {"sync",syncCommand,1,REDIS_CMD_INLINE},
460 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
461 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
462 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
463 {"info",infoCommand,1,REDIS_CMD_INLINE},
464 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
465 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
466 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
467 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
468 {NULL,NULL,0,0}
469 };
470 /*============================ Utility functions ============================ */
471
472 /* Glob-style pattern matching. */
473 int stringmatchlen(const char *pattern, int patternLen,
474 const char *string, int stringLen, int nocase)
475 {
476 while(patternLen) {
477 switch(pattern[0]) {
478 case '*':
479 while (pattern[1] == '*') {
480 pattern++;
481 patternLen--;
482 }
483 if (patternLen == 1)
484 return 1; /* match */
485 while(stringLen) {
486 if (stringmatchlen(pattern+1, patternLen-1,
487 string, stringLen, nocase))
488 return 1; /* match */
489 string++;
490 stringLen--;
491 }
492 return 0; /* no match */
493 break;
494 case '?':
495 if (stringLen == 0)
496 return 0; /* no match */
497 string++;
498 stringLen--;
499 break;
500 case '[':
501 {
502 int not, match;
503
504 pattern++;
505 patternLen--;
506 not = pattern[0] == '^';
507 if (not) {
508 pattern++;
509 patternLen--;
510 }
511 match = 0;
512 while(1) {
513 if (pattern[0] == '\\') {
514 pattern++;
515 patternLen--;
516 if (pattern[0] == string[0])
517 match = 1;
518 } else if (pattern[0] == ']') {
519 break;
520 } else if (patternLen == 0) {
521 pattern--;
522 patternLen++;
523 break;
524 } else if (pattern[1] == '-' && patternLen >= 3) {
525 int start = pattern[0];
526 int end = pattern[2];
527 int c = string[0];
528 if (start > end) {
529 int t = start;
530 start = end;
531 end = t;
532 }
533 if (nocase) {
534 start = tolower(start);
535 end = tolower(end);
536 c = tolower(c);
537 }
538 pattern += 2;
539 patternLen -= 2;
540 if (c >= start && c <= end)
541 match = 1;
542 } else {
543 if (!nocase) {
544 if (pattern[0] == string[0])
545 match = 1;
546 } else {
547 if (tolower((int)pattern[0]) == tolower((int)string[0]))
548 match = 1;
549 }
550 }
551 pattern++;
552 patternLen--;
553 }
554 if (not)
555 match = !match;
556 if (!match)
557 return 0; /* no match */
558 string++;
559 stringLen--;
560 break;
561 }
562 case '\\':
563 if (patternLen >= 2) {
564 pattern++;
565 patternLen--;
566 }
567 /* fall through */
568 default:
569 if (!nocase) {
570 if (pattern[0] != string[0])
571 return 0; /* no match */
572 } else {
573 if (tolower((int)pattern[0]) != tolower((int)string[0]))
574 return 0; /* no match */
575 }
576 string++;
577 stringLen--;
578 break;
579 }
580 pattern++;
581 patternLen--;
582 if (stringLen == 0) {
583 while(*pattern == '*') {
584 pattern++;
585 patternLen--;
586 }
587 break;
588 }
589 }
590 if (patternLen == 0 && stringLen == 0)
591 return 1;
592 return 0;
593 }
594
595 static void redisLog(int level, const char *fmt, ...) {
596 va_list ap;
597 FILE *fp;
598
599 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
600 if (!fp) return;
601
602 va_start(ap, fmt);
603 if (level >= server.verbosity) {
604 char *c = ".-*";
605 char buf[64];
606 time_t now;
607
608 now = time(NULL);
609 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
610 fprintf(fp,"%s %c ",buf,c[level]);
611 vfprintf(fp, fmt, ap);
612 fprintf(fp,"\n");
613 fflush(fp);
614 }
615 va_end(ap);
616
617 if (server.logfile) fclose(fp);
618 }
619
620 /*====================== Hash table type implementation ==================== */
621
622 /* This is an hash table type that uses the SDS dynamic strings libary as
623 * keys and radis objects as values (objects can hold SDS strings,
624 * lists, sets). */
625
626 static int sdsDictKeyCompare(void *privdata, const void *key1,
627 const void *key2)
628 {
629 int l1,l2;
630 DICT_NOTUSED(privdata);
631
632 l1 = sdslen((sds)key1);
633 l2 = sdslen((sds)key2);
634 if (l1 != l2) return 0;
635 return memcmp(key1, key2, l1) == 0;
636 }
637
638 static void dictRedisObjectDestructor(void *privdata, void *val)
639 {
640 DICT_NOTUSED(privdata);
641
642 decrRefCount(val);
643 }
644
645 static int dictObjKeyCompare(void *privdata, const void *key1,
646 const void *key2)
647 {
648 const robj *o1 = key1, *o2 = key2;
649 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
650 }
651
652 static unsigned int dictObjHash(const void *key) {
653 const robj *o = key;
654 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
655 }
656
657 static int dictEncObjKeyCompare(void *privdata, const void *key1,
658 const void *key2)
659 {
660 const robj *o1 = key1, *o2 = key2;
661
662 if (o1->encoding == REDIS_ENCODING_RAW &&
663 o2->encoding == REDIS_ENCODING_RAW)
664 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
665 else {
666 robj *dec1, *dec2;
667 int cmp;
668
669 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
670 getDecodedObject(o1) : (robj*)o1;
671 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
672 getDecodedObject(o2) : (robj*)o2;
673 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
674 if (dec1 != o1) decrRefCount(dec1);
675 if (dec2 != o2) decrRefCount(dec2);
676 return cmp;
677 }
678 }
679
680 static unsigned int dictEncObjHash(const void *key) {
681 const robj *o = key;
682
683 if (o->encoding == REDIS_ENCODING_RAW)
684 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
685 else {
686 robj *dec = getDecodedObject(o);
687 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
688 decrRefCount(dec);
689 return hash;
690 }
691 }
692
693 static dictType setDictType = {
694 dictEncObjHash, /* hash function */
695 NULL, /* key dup */
696 NULL, /* val dup */
697 dictEncObjKeyCompare, /* key compare */
698 dictRedisObjectDestructor, /* key destructor */
699 NULL /* val destructor */
700 };
701
702 static dictType hashDictType = {
703 dictObjHash, /* hash function */
704 NULL, /* key dup */
705 NULL, /* val dup */
706 dictObjKeyCompare, /* key compare */
707 dictRedisObjectDestructor, /* key destructor */
708 dictRedisObjectDestructor /* val destructor */
709 };
710
711 /* ========================= Random utility functions ======================= */
712
713 /* Redis generally does not try to recover from out of memory conditions
714 * when allocating objects or strings, it is not clear if it will be possible
715 * to report this condition to the client since the networking layer itself
716 * is based on heap allocation for send buffers, so we simply abort.
717 * At least the code will be simpler to read... */
718 static void oom(const char *msg) {
719 fprintf(stderr, "%s: Out of memory\n",msg);
720 fflush(stderr);
721 sleep(1);
722 abort();
723 }
724
725 /* ====================== Redis server networking stuff ===================== */
726 static void closeTimedoutClients(void) {
727 redisClient *c;
728 listNode *ln;
729 time_t now = time(NULL);
730
731 listRewind(server.clients);
732 while ((ln = listYield(server.clients)) != NULL) {
733 c = listNodeValue(ln);
734 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
735 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
736 (now - c->lastinteraction > server.maxidletime)) {
737 redisLog(REDIS_DEBUG,"Closing idle client");
738 freeClient(c);
739 }
740 }
741 }
742
743 static int htNeedsResize(dict *dict) {
744 long long size, used;
745
746 size = dictSlots(dict);
747 used = dictSize(dict);
748 return (size && used && size > DICT_HT_INITIAL_SIZE &&
749 (used*100/size < REDIS_HT_MINFILL));
750 }
751
752 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
753 * we resize the hash table to save memory */
754 static void tryResizeHashTables(void) {
755 int j;
756
757 for (j = 0; j < server.dbnum; j++) {
758 if (htNeedsResize(server.db[j].dict)) {
759 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
760 dictResize(server.db[j].dict);
761 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
762 }
763 if (htNeedsResize(server.db[j].expires))
764 dictResize(server.db[j].expires);
765 }
766 }
767
768 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
769 int j, loops = server.cronloops++;
770 REDIS_NOTUSED(eventLoop);
771 REDIS_NOTUSED(id);
772 REDIS_NOTUSED(clientData);
773
774 /* Update the global state with the amount of used memory */
775 server.usedmemory = zmalloc_used_memory();
776
777 /* Show some info about non-empty databases */
778 for (j = 0; j < server.dbnum; j++) {
779 long long size, used, vkeys;
780
781 size = dictSlots(server.db[j].dict);
782 used = dictSize(server.db[j].dict);
783 vkeys = dictSize(server.db[j].expires);
784 if (!(loops % 5) && (used || vkeys)) {
785 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
786 /* dictPrintStats(server.dict); */
787 }
788 }
789
790 /* We don't want to resize the hash tables while a bacground saving
791 * is in progress: the saving child is created using fork() that is
792 * implemented with a copy-on-write semantic in most modern systems, so
793 * if we resize the HT while there is the saving child at work actually
794 * a lot of memory movements in the parent will cause a lot of pages
795 * copied. */
796 if (!server.bgsaveinprogress) tryResizeHashTables();
797
798 /* Show information about connected clients */
799 if (!(loops % 5)) {
800 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
801 listLength(server.clients)-listLength(server.slaves),
802 listLength(server.slaves),
803 server.usedmemory,
804 dictSize(server.sharingpool));
805 }
806
807 /* Close connections of timedout clients */
808 if (server.maxidletime && !(loops % 10))
809 closeTimedoutClients();
810
811 /* Check if a background saving in progress terminated */
812 if (server.bgsaveinprogress) {
813 int statloc;
814 if (wait4(-1,&statloc,WNOHANG,NULL)) {
815 int exitcode = WEXITSTATUS(statloc);
816 int bysignal = WIFSIGNALED(statloc);
817
818 if (!bysignal && exitcode == 0) {
819 redisLog(REDIS_NOTICE,
820 "Background saving terminated with success");
821 server.dirty = 0;
822 server.lastsave = time(NULL);
823 } else if (!bysignal && exitcode != 0) {
824 redisLog(REDIS_WARNING, "Background saving error");
825 } else {
826 redisLog(REDIS_WARNING,
827 "Background saving terminated by signal");
828 rdbRemoveTempFile(server.bgsavechildpid);
829 }
830 server.bgsaveinprogress = 0;
831 server.bgsavechildpid = -1;
832 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
833 }
834 } else {
835 /* If there is not a background saving in progress check if
836 * we have to save now */
837 time_t now = time(NULL);
838 for (j = 0; j < server.saveparamslen; j++) {
839 struct saveparam *sp = server.saveparams+j;
840
841 if (server.dirty >= sp->changes &&
842 now-server.lastsave > sp->seconds) {
843 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
844 sp->changes, sp->seconds);
845 rdbSaveBackground(server.dbfilename);
846 break;
847 }
848 }
849 }
850
851 /* Try to expire a few timed out keys */
852 for (j = 0; j < server.dbnum; j++) {
853 redisDb *db = server.db+j;
854 int num = dictSize(db->expires);
855
856 if (num) {
857 time_t now = time(NULL);
858
859 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
860 num = REDIS_EXPIRELOOKUPS_PER_CRON;
861 while (num--) {
862 dictEntry *de;
863 time_t t;
864
865 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
866 t = (time_t) dictGetEntryVal(de);
867 if (now > t) {
868 deleteKey(db,dictGetEntryKey(de));
869 }
870 }
871 }
872 }
873
874 /* Check if we should connect to a MASTER */
875 if (server.replstate == REDIS_REPL_CONNECT) {
876 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
877 if (syncWithMaster() == REDIS_OK) {
878 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
879 }
880 }
881 return 1000;
882 }
883
884 static void createSharedObjects(void) {
885 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
886 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
887 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
888 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
889 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
890 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
891 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
892 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
893 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
894 /* no such key */
895 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
896 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
897 "-ERR Operation against a key holding the wrong kind of value\r\n"));
898 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
899 "-ERR no such key\r\n"));
900 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
901 "-ERR syntax error\r\n"));
902 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
903 "-ERR source and destination objects are the same\r\n"));
904 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
905 "-ERR index out of range\r\n"));
906 shared.space = createObject(REDIS_STRING,sdsnew(" "));
907 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
908 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
909 shared.select0 = createStringObject("select 0\r\n",10);
910 shared.select1 = createStringObject("select 1\r\n",10);
911 shared.select2 = createStringObject("select 2\r\n",10);
912 shared.select3 = createStringObject("select 3\r\n",10);
913 shared.select4 = createStringObject("select 4\r\n",10);
914 shared.select5 = createStringObject("select 5\r\n",10);
915 shared.select6 = createStringObject("select 6\r\n",10);
916 shared.select7 = createStringObject("select 7\r\n",10);
917 shared.select8 = createStringObject("select 8\r\n",10);
918 shared.select9 = createStringObject("select 9\r\n",10);
919 }
920
921 static void appendServerSaveParams(time_t seconds, int changes) {
922 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
923 if (server.saveparams == NULL) oom("appendServerSaveParams");
924 server.saveparams[server.saveparamslen].seconds = seconds;
925 server.saveparams[server.saveparamslen].changes = changes;
926 server.saveparamslen++;
927 }
928
929 static void ResetServerSaveParams() {
930 zfree(server.saveparams);
931 server.saveparams = NULL;
932 server.saveparamslen = 0;
933 }
934
935 static void initServerConfig() {
936 server.dbnum = REDIS_DEFAULT_DBNUM;
937 server.port = REDIS_SERVERPORT;
938 server.verbosity = REDIS_DEBUG;
939 server.maxidletime = REDIS_MAXIDLETIME;
940 server.saveparams = NULL;
941 server.logfile = NULL; /* NULL = log on standard output */
942 server.bindaddr = NULL;
943 server.glueoutputbuf = 1;
944 server.daemonize = 0;
945 server.pidfile = "/var/run/redis.pid";
946 server.dbfilename = "dump.rdb";
947 server.requirepass = NULL;
948 server.shareobjects = 0;
949 server.sharingpoolsize = 1024;
950 server.maxclients = 0;
951 server.maxmemory = 0;
952 ResetServerSaveParams();
953
954 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
955 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
956 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
957 /* Replication related */
958 server.isslave = 0;
959 server.masterhost = NULL;
960 server.masterport = 6379;
961 server.master = NULL;
962 server.replstate = REDIS_REPL_NONE;
963 }
964
965 static void initServer() {
966 int j;
967
968 signal(SIGHUP, SIG_IGN);
969 signal(SIGPIPE, SIG_IGN);
970 setupSigSegvAction();
971
972 server.clients = listCreate();
973 server.slaves = listCreate();
974 server.monitors = listCreate();
975 server.objfreelist = listCreate();
976 createSharedObjects();
977 server.el = aeCreateEventLoop();
978 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
979 server.sharingpool = dictCreate(&setDictType,NULL);
980 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
981 oom("server initialization"); /* Fatal OOM */
982 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
983 if (server.fd == -1) {
984 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
985 exit(1);
986 }
987 for (j = 0; j < server.dbnum; j++) {
988 server.db[j].dict = dictCreate(&hashDictType,NULL);
989 server.db[j].expires = dictCreate(&setDictType,NULL);
990 server.db[j].id = j;
991 }
992 server.cronloops = 0;
993 server.bgsaveinprogress = 0;
994 server.bgsavechildpid = -1;
995 server.lastsave = time(NULL);
996 server.dirty = 0;
997 server.usedmemory = 0;
998 server.stat_numcommands = 0;
999 server.stat_numconnections = 0;
1000 server.stat_starttime = time(NULL);
1001 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1002 }
1003
1004 /* Empty the whole database */
1005 static long long emptyDb() {
1006 int j;
1007 long long removed = 0;
1008
1009 for (j = 0; j < server.dbnum; j++) {
1010 removed += dictSize(server.db[j].dict);
1011 dictEmpty(server.db[j].dict);
1012 dictEmpty(server.db[j].expires);
1013 }
1014 return removed;
1015 }
1016
1017 static int yesnotoi(char *s) {
1018 if (!strcasecmp(s,"yes")) return 1;
1019 else if (!strcasecmp(s,"no")) return 0;
1020 else return -1;
1021 }
1022
1023 /* I agree, this is a very rudimental way to load a configuration...
1024 will improve later if the config gets more complex */
1025 static void loadServerConfig(char *filename) {
1026 FILE *fp;
1027 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1028 int linenum = 0;
1029 sds line = NULL;
1030
1031 if (filename[0] == '-' && filename[1] == '\0')
1032 fp = stdin;
1033 else {
1034 if ((fp = fopen(filename,"r")) == NULL) {
1035 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1036 exit(1);
1037 }
1038 }
1039
1040 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1041 sds *argv;
1042 int argc, j;
1043
1044 linenum++;
1045 line = sdsnew(buf);
1046 line = sdstrim(line," \t\r\n");
1047
1048 /* Skip comments and blank lines*/
1049 if (line[0] == '#' || line[0] == '\0') {
1050 sdsfree(line);
1051 continue;
1052 }
1053
1054 /* Split into arguments */
1055 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1056 sdstolower(argv[0]);
1057
1058 /* Execute config directives */
1059 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1060 server.maxidletime = atoi(argv[1]);
1061 if (server.maxidletime < 0) {
1062 err = "Invalid timeout value"; goto loaderr;
1063 }
1064 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1065 server.port = atoi(argv[1]);
1066 if (server.port < 1 || server.port > 65535) {
1067 err = "Invalid port"; goto loaderr;
1068 }
1069 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1070 server.bindaddr = zstrdup(argv[1]);
1071 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1072 int seconds = atoi(argv[1]);
1073 int changes = atoi(argv[2]);
1074 if (seconds < 1 || changes < 0) {
1075 err = "Invalid save parameters"; goto loaderr;
1076 }
1077 appendServerSaveParams(seconds,changes);
1078 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1079 if (chdir(argv[1]) == -1) {
1080 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1081 argv[1], strerror(errno));
1082 exit(1);
1083 }
1084 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1085 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1086 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1087 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1088 else {
1089 err = "Invalid log level. Must be one of debug, notice, warning";
1090 goto loaderr;
1091 }
1092 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1093 FILE *logfp;
1094
1095 server.logfile = zstrdup(argv[1]);
1096 if (!strcasecmp(server.logfile,"stdout")) {
1097 zfree(server.logfile);
1098 server.logfile = NULL;
1099 }
1100 if (server.logfile) {
1101 /* Test if we are able to open the file. The server will not
1102 * be able to abort just for this problem later... */
1103 logfp = fopen(server.logfile,"a");
1104 if (logfp == NULL) {
1105 err = sdscatprintf(sdsempty(),
1106 "Can't open the log file: %s", strerror(errno));
1107 goto loaderr;
1108 }
1109 fclose(logfp);
1110 }
1111 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1112 server.dbnum = atoi(argv[1]);
1113 if (server.dbnum < 1) {
1114 err = "Invalid number of databases"; goto loaderr;
1115 }
1116 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1117 server.maxclients = atoi(argv[1]);
1118 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1119 server.maxmemory = atoi(argv[1]);
1120 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1121 server.masterhost = sdsnew(argv[1]);
1122 server.masterport = atoi(argv[2]);
1123 server.replstate = REDIS_REPL_CONNECT;
1124 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1125 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1126 err = "argument must be 'yes' or 'no'"; goto loaderr;
1127 }
1128 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1129 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1130 err = "argument must be 'yes' or 'no'"; goto loaderr;
1131 }
1132 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1133 server.sharingpoolsize = atoi(argv[1]);
1134 if (server.sharingpoolsize < 1) {
1135 err = "invalid object sharing pool size"; goto loaderr;
1136 }
1137 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1138 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1139 err = "argument must be 'yes' or 'no'"; goto loaderr;
1140 }
1141 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1142 server.requirepass = zstrdup(argv[1]);
1143 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1144 server.pidfile = zstrdup(argv[1]);
1145 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1146 server.dbfilename = zstrdup(argv[1]);
1147 } else {
1148 err = "Bad directive or wrong number of arguments"; goto loaderr;
1149 }
1150 for (j = 0; j < argc; j++)
1151 sdsfree(argv[j]);
1152 zfree(argv);
1153 sdsfree(line);
1154 }
1155 if (fp != stdin) fclose(fp);
1156 return;
1157
1158 loaderr:
1159 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1160 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1161 fprintf(stderr, ">>> '%s'\n", line);
1162 fprintf(stderr, "%s\n", err);
1163 exit(1);
1164 }
1165
1166 static void freeClientArgv(redisClient *c) {
1167 int j;
1168
1169 for (j = 0; j < c->argc; j++)
1170 decrRefCount(c->argv[j]);
1171 c->argc = 0;
1172 }
1173
1174 static void freeClient(redisClient *c) {
1175 listNode *ln;
1176
1177 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1178 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1179 sdsfree(c->querybuf);
1180 listRelease(c->reply);
1181 freeClientArgv(c);
1182 close(c->fd);
1183 ln = listSearchKey(server.clients,c);
1184 assert(ln != NULL);
1185 listDelNode(server.clients,ln);
1186 if (c->flags & REDIS_SLAVE) {
1187 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1188 close(c->repldbfd);
1189 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1190 ln = listSearchKey(l,c);
1191 assert(ln != NULL);
1192 listDelNode(l,ln);
1193 }
1194 if (c->flags & REDIS_MASTER) {
1195 server.master = NULL;
1196 server.replstate = REDIS_REPL_CONNECT;
1197 }
1198 zfree(c->argv);
1199 zfree(c);
1200 }
1201
1202 static void glueReplyBuffersIfNeeded(redisClient *c) {
1203 int totlen = 0;
1204 listNode *ln;
1205 robj *o;
1206
1207 listRewind(c->reply);
1208 while((ln = listYield(c->reply))) {
1209 o = ln->value;
1210 totlen += sdslen(o->ptr);
1211 /* This optimization makes more sense if we don't have to copy
1212 * too much data */
1213 if (totlen > 1024) return;
1214 }
1215 if (totlen > 0) {
1216 char buf[1024];
1217 int copylen = 0;
1218
1219 listRewind(c->reply);
1220 while((ln = listYield(c->reply))) {
1221 o = ln->value;
1222 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1223 copylen += sdslen(o->ptr);
1224 listDelNode(c->reply,ln);
1225 }
1226 /* Now the output buffer is empty, add the new single element */
1227 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1228 if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
1229 }
1230 }
1231
1232 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1233 redisClient *c = privdata;
1234 int nwritten = 0, totwritten = 0, objlen;
1235 robj *o;
1236 REDIS_NOTUSED(el);
1237 REDIS_NOTUSED(mask);
1238
1239 if (server.glueoutputbuf && listLength(c->reply) > 1)
1240 glueReplyBuffersIfNeeded(c);
1241 while(listLength(c->reply)) {
1242 o = listNodeValue(listFirst(c->reply));
1243 objlen = sdslen(o->ptr);
1244
1245 if (objlen == 0) {
1246 listDelNode(c->reply,listFirst(c->reply));
1247 continue;
1248 }
1249
1250 if (c->flags & REDIS_MASTER) {
1251 /* Don't reply to a master */
1252 nwritten = objlen - c->sentlen;
1253 } else {
1254 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1255 if (nwritten <= 0) break;
1256 }
1257 c->sentlen += nwritten;
1258 totwritten += nwritten;
1259 /* If we fully sent the object on head go to the next one */
1260 if (c->sentlen == objlen) {
1261 listDelNode(c->reply,listFirst(c->reply));
1262 c->sentlen = 0;
1263 }
1264 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1265 * bytes, in a single threaded server it's a good idea to server
1266 * other clients as well, even if a very large request comes from
1267 * super fast link that is always able to accept data (in real world
1268 * terms think to 'KEYS *' against the loopback interfae) */
1269 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1270 }
1271 if (nwritten == -1) {
1272 if (errno == EAGAIN) {
1273 nwritten = 0;
1274 } else {
1275 redisLog(REDIS_DEBUG,
1276 "Error writing to client: %s", strerror(errno));
1277 freeClient(c);
1278 return;
1279 }
1280 }
1281 if (totwritten > 0) c->lastinteraction = time(NULL);
1282 if (listLength(c->reply) == 0) {
1283 c->sentlen = 0;
1284 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1285 }
1286 }
1287
1288 static struct redisCommand *lookupCommand(char *name) {
1289 int j = 0;
1290 while(cmdTable[j].name != NULL) {
1291 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1292 j++;
1293 }
1294 return NULL;
1295 }
1296
1297 /* resetClient prepare the client to process the next command */
1298 static void resetClient(redisClient *c) {
1299 freeClientArgv(c);
1300 c->bulklen = -1;
1301 }
1302
1303 /* If this function gets called we already read a whole
1304 * command, argments are in the client argv/argc fields.
1305 * processCommand() execute the command or prepare the
1306 * server for a bulk read from the client.
1307 *
1308 * If 1 is returned the client is still alive and valid and
1309 * and other operations can be performed by the caller. Otherwise
1310 * if 0 is returned the client was destroied (i.e. after QUIT). */
1311 static int processCommand(redisClient *c) {
1312 struct redisCommand *cmd;
1313 long long dirty;
1314
1315 /* Free some memory if needed (maxmemory setting) */
1316 if (server.maxmemory) freeMemoryIfNeeded();
1317
1318 /* The QUIT command is handled as a special case. Normal command
1319 * procs are unable to close the client connection safely */
1320 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1321 freeClient(c);
1322 return 0;
1323 }
1324 cmd = lookupCommand(c->argv[0]->ptr);
1325 if (!cmd) {
1326 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1327 resetClient(c);
1328 return 1;
1329 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1330 (c->argc < -cmd->arity)) {
1331 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1332 resetClient(c);
1333 return 1;
1334 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1335 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1336 resetClient(c);
1337 return 1;
1338 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1339 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1340
1341 decrRefCount(c->argv[c->argc-1]);
1342 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1343 c->argc--;
1344 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1345 resetClient(c);
1346 return 1;
1347 }
1348 c->argc--;
1349 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1350 /* It is possible that the bulk read is already in the
1351 * buffer. Check this condition and handle it accordingly */
1352 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1353 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1354 c->argc++;
1355 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1356 } else {
1357 return 1;
1358 }
1359 }
1360 /* Let's try to share objects on the command arguments vector */
1361 if (server.shareobjects) {
1362 int j;
1363 for(j = 1; j < c->argc; j++)
1364 c->argv[j] = tryObjectSharing(c->argv[j]);
1365 }
1366 /* Let's try to encode the bulk object to save space. */
1367 if (cmd->flags & REDIS_CMD_BULK)
1368 tryObjectEncoding(c->argv[c->argc-1]);
1369
1370 /* Check if the user is authenticated */
1371 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1372 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1373 resetClient(c);
1374 return 1;
1375 }
1376
1377 /* Exec the command */
1378 dirty = server.dirty;
1379 cmd->proc(c);
1380 if (server.dirty-dirty != 0 && listLength(server.slaves))
1381 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1382 if (listLength(server.monitors))
1383 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1384 server.stat_numcommands++;
1385
1386 /* Prepare the client for the next command */
1387 if (c->flags & REDIS_CLOSE) {
1388 freeClient(c);
1389 return 0;
1390 }
1391 resetClient(c);
1392 return 1;
1393 }
1394
1395 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1396 listNode *ln;
1397 int outc = 0, j;
1398 robj **outv;
1399 /* (args*2)+1 is enough room for args, spaces, newlines */
1400 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1401
1402 if (argc <= REDIS_STATIC_ARGS) {
1403 outv = static_outv;
1404 } else {
1405 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1406 if (!outv) oom("replicationFeedSlaves");
1407 }
1408
1409 for (j = 0; j < argc; j++) {
1410 if (j != 0) outv[outc++] = shared.space;
1411 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1412 robj *lenobj;
1413
1414 lenobj = createObject(REDIS_STRING,
1415 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1416 lenobj->refcount = 0;
1417 outv[outc++] = lenobj;
1418 }
1419 outv[outc++] = argv[j];
1420 }
1421 outv[outc++] = shared.crlf;
1422
1423 /* Increment all the refcounts at start and decrement at end in order to
1424 * be sure to free objects if there is no slave in a replication state
1425 * able to be feed with commands */
1426 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1427 listRewind(slaves);
1428 while((ln = listYield(slaves))) {
1429 redisClient *slave = ln->value;
1430
1431 /* Don't feed slaves that are still waiting for BGSAVE to start */
1432 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1433
1434 /* Feed all the other slaves, MONITORs and so on */
1435 if (slave->slaveseldb != dictid) {
1436 robj *selectcmd;
1437
1438 switch(dictid) {
1439 case 0: selectcmd = shared.select0; break;
1440 case 1: selectcmd = shared.select1; break;
1441 case 2: selectcmd = shared.select2; break;
1442 case 3: selectcmd = shared.select3; break;
1443 case 4: selectcmd = shared.select4; break;
1444 case 5: selectcmd = shared.select5; break;
1445 case 6: selectcmd = shared.select6; break;
1446 case 7: selectcmd = shared.select7; break;
1447 case 8: selectcmd = shared.select8; break;
1448 case 9: selectcmd = shared.select9; break;
1449 default:
1450 selectcmd = createObject(REDIS_STRING,
1451 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1452 selectcmd->refcount = 0;
1453 break;
1454 }
1455 addReply(slave,selectcmd);
1456 slave->slaveseldb = dictid;
1457 }
1458 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1459 }
1460 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1461 if (outv != static_outv) zfree(outv);
1462 }
1463
1464 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1465 redisClient *c = (redisClient*) privdata;
1466 char buf[REDIS_IOBUF_LEN];
1467 int nread;
1468 REDIS_NOTUSED(el);
1469 REDIS_NOTUSED(mask);
1470
1471 nread = read(fd, buf, REDIS_IOBUF_LEN);
1472 if (nread == -1) {
1473 if (errno == EAGAIN) {
1474 nread = 0;
1475 } else {
1476 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1477 freeClient(c);
1478 return;
1479 }
1480 } else if (nread == 0) {
1481 redisLog(REDIS_DEBUG, "Client closed connection");
1482 freeClient(c);
1483 return;
1484 }
1485 if (nread) {
1486 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1487 c->lastinteraction = time(NULL);
1488 } else {
1489 return;
1490 }
1491
1492 again:
1493 if (c->bulklen == -1) {
1494 /* Read the first line of the query */
1495 char *p = strchr(c->querybuf,'\n');
1496 size_t querylen;
1497
1498 if (p) {
1499 sds query, *argv;
1500 int argc, j;
1501
1502 query = c->querybuf;
1503 c->querybuf = sdsempty();
1504 querylen = 1+(p-(query));
1505 if (sdslen(query) > querylen) {
1506 /* leave data after the first line of the query in the buffer */
1507 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1508 }
1509 *p = '\0'; /* remove "\n" */
1510 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1511 sdsupdatelen(query);
1512
1513 /* Now we can split the query in arguments */
1514 if (sdslen(query) == 0) {
1515 /* Ignore empty query */
1516 sdsfree(query);
1517 return;
1518 }
1519 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1520 if (argv == NULL) oom("sdssplitlen");
1521 sdsfree(query);
1522
1523 if (c->argv) zfree(c->argv);
1524 c->argv = zmalloc(sizeof(robj*)*argc);
1525 if (c->argv == NULL) oom("allocating arguments list for client");
1526
1527 for (j = 0; j < argc; j++) {
1528 if (sdslen(argv[j])) {
1529 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1530 c->argc++;
1531 } else {
1532 sdsfree(argv[j]);
1533 }
1534 }
1535 zfree(argv);
1536 /* Execute the command. If the client is still valid
1537 * after processCommand() return and there is something
1538 * on the query buffer try to process the next command. */
1539 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1540 return;
1541 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1542 redisLog(REDIS_DEBUG, "Client protocol error");
1543 freeClient(c);
1544 return;
1545 }
1546 } else {
1547 /* Bulk read handling. Note that if we are at this point
1548 the client already sent a command terminated with a newline,
1549 we are reading the bulk data that is actually the last
1550 argument of the command. */
1551 int qbl = sdslen(c->querybuf);
1552
1553 if (c->bulklen <= qbl) {
1554 /* Copy everything but the final CRLF as final argument */
1555 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1556 c->argc++;
1557 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1558 processCommand(c);
1559 return;
1560 }
1561 }
1562 }
1563
1564 static int selectDb(redisClient *c, int id) {
1565 if (id < 0 || id >= server.dbnum)
1566 return REDIS_ERR;
1567 c->db = &server.db[id];
1568 return REDIS_OK;
1569 }
1570
1571 static void *dupClientReplyValue(void *o) {
1572 incrRefCount((robj*)o);
1573 return 0;
1574 }
1575
1576 static redisClient *createClient(int fd) {
1577 redisClient *c = zmalloc(sizeof(*c));
1578
1579 anetNonBlock(NULL,fd);
1580 anetTcpNoDelay(NULL,fd);
1581 if (!c) return NULL;
1582 selectDb(c,0);
1583 c->fd = fd;
1584 c->querybuf = sdsempty();
1585 c->argc = 0;
1586 c->argv = NULL;
1587 c->bulklen = -1;
1588 c->sentlen = 0;
1589 c->flags = 0;
1590 c->lastinteraction = time(NULL);
1591 c->authenticated = 0;
1592 c->replstate = REDIS_REPL_NONE;
1593 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1594 listSetFreeMethod(c->reply,decrRefCount);
1595 listSetDupMethod(c->reply,dupClientReplyValue);
1596 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1597 readQueryFromClient, c, NULL) == AE_ERR) {
1598 freeClient(c);
1599 return NULL;
1600 }
1601 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1602 return c;
1603 }
1604
1605 static void addReply(redisClient *c, robj *obj) {
1606 if (listLength(c->reply) == 0 &&
1607 (c->replstate == REDIS_REPL_NONE ||
1608 c->replstate == REDIS_REPL_ONLINE) &&
1609 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1610 sendReplyToClient, c, NULL) == AE_ERR) return;
1611 if (obj->encoding != REDIS_ENCODING_RAW) {
1612 obj = getDecodedObject(obj);
1613 } else {
1614 incrRefCount(obj);
1615 }
1616 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1617 }
1618
1619 static void addReplySds(redisClient *c, sds s) {
1620 robj *o = createObject(REDIS_STRING,s);
1621 addReply(c,o);
1622 decrRefCount(o);
1623 }
1624
1625 static void addReplyBulkLen(redisClient *c, robj *obj) {
1626 size_t len;
1627
1628 if (obj->encoding == REDIS_ENCODING_RAW) {
1629 len = sdslen(obj->ptr);
1630 } else {
1631 long n = (long)obj->ptr;
1632
1633 len = 1;
1634 if (n < 0) {
1635 len++;
1636 n = -n;
1637 }
1638 while((n = n/10) != 0) {
1639 len++;
1640 }
1641 }
1642 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1643 }
1644
1645 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1646 int cport, cfd;
1647 char cip[128];
1648 redisClient *c;
1649 REDIS_NOTUSED(el);
1650 REDIS_NOTUSED(mask);
1651 REDIS_NOTUSED(privdata);
1652
1653 cfd = anetAccept(server.neterr, fd, cip, &cport);
1654 if (cfd == AE_ERR) {
1655 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1656 return;
1657 }
1658 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1659 if ((c = createClient(cfd)) == NULL) {
1660 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1661 close(cfd); /* May be already closed, just ingore errors */
1662 return;
1663 }
1664 /* If maxclient directive is set and this is one client more... close the
1665 * connection. Note that we create the client instead to check before
1666 * for this condition, since now the socket is already set in nonblocking
1667 * mode and we can send an error for free using the Kernel I/O */
1668 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1669 char *err = "-ERR max number of clients reached\r\n";
1670
1671 /* That's a best effort error message, don't check write errors */
1672 (void) write(c->fd,err,strlen(err));
1673 freeClient(c);
1674 return;
1675 }
1676 server.stat_numconnections++;
1677 }
1678
1679 /* ======================= Redis objects implementation ===================== */
1680
1681 static robj *createObject(int type, void *ptr) {
1682 robj *o;
1683
1684 if (listLength(server.objfreelist)) {
1685 listNode *head = listFirst(server.objfreelist);
1686 o = listNodeValue(head);
1687 listDelNode(server.objfreelist,head);
1688 } else {
1689 o = zmalloc(sizeof(*o));
1690 }
1691 if (!o) oom("createObject");
1692 o->type = type;
1693 o->encoding = REDIS_ENCODING_RAW;
1694 o->ptr = ptr;
1695 o->refcount = 1;
1696 return o;
1697 }
1698
1699 static robj *createStringObject(char *ptr, size_t len) {
1700 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1701 }
1702
1703 static robj *createListObject(void) {
1704 list *l = listCreate();
1705
1706 if (!l) oom("listCreate");
1707 listSetFreeMethod(l,decrRefCount);
1708 return createObject(REDIS_LIST,l);
1709 }
1710
1711 static robj *createSetObject(void) {
1712 dict *d = dictCreate(&setDictType,NULL);
1713 if (!d) oom("dictCreate");
1714 return createObject(REDIS_SET,d);
1715 }
1716
1717 static void freeStringObject(robj *o) {
1718 if (o->encoding == REDIS_ENCODING_RAW) {
1719 sdsfree(o->ptr);
1720 }
1721 }
1722
1723 static void freeListObject(robj *o) {
1724 listRelease((list*) o->ptr);
1725 }
1726
1727 static void freeSetObject(robj *o) {
1728 dictRelease((dict*) o->ptr);
1729 }
1730
1731 static void freeHashObject(robj *o) {
1732 dictRelease((dict*) o->ptr);
1733 }
1734
1735 static void incrRefCount(robj *o) {
1736 o->refcount++;
1737 #ifdef DEBUG_REFCOUNT
1738 if (o->type == REDIS_STRING)
1739 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1740 #endif
1741 }
1742
1743 static void decrRefCount(void *obj) {
1744 robj *o = obj;
1745
1746 #ifdef DEBUG_REFCOUNT
1747 if (o->type == REDIS_STRING)
1748 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1749 #endif
1750 if (--(o->refcount) == 0) {
1751 switch(o->type) {
1752 case REDIS_STRING: freeStringObject(o); break;
1753 case REDIS_LIST: freeListObject(o); break;
1754 case REDIS_SET: freeSetObject(o); break;
1755 case REDIS_HASH: freeHashObject(o); break;
1756 default: assert(0 != 0); break;
1757 }
1758 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1759 !listAddNodeHead(server.objfreelist,o))
1760 zfree(o);
1761 }
1762 }
1763
1764 static robj *lookupKey(redisDb *db, robj *key) {
1765 dictEntry *de = dictFind(db->dict,key);
1766 return de ? dictGetEntryVal(de) : NULL;
1767 }
1768
1769 static robj *lookupKeyRead(redisDb *db, robj *key) {
1770 expireIfNeeded(db,key);
1771 return lookupKey(db,key);
1772 }
1773
1774 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1775 deleteIfVolatile(db,key);
1776 return lookupKey(db,key);
1777 }
1778
1779 static int deleteKey(redisDb *db, robj *key) {
1780 int retval;
1781
1782 /* We need to protect key from destruction: after the first dictDelete()
1783 * it may happen that 'key' is no longer valid if we don't increment
1784 * it's count. This may happen when we get the object reference directly
1785 * from the hash table with dictRandomKey() or dict iterators */
1786 incrRefCount(key);
1787 if (dictSize(db->expires)) dictDelete(db->expires,key);
1788 retval = dictDelete(db->dict,key);
1789 decrRefCount(key);
1790
1791 return retval == DICT_OK;
1792 }
1793
1794 /* Try to share an object against the shared objects pool */
1795 static robj *tryObjectSharing(robj *o) {
1796 struct dictEntry *de;
1797 unsigned long c;
1798
1799 if (o == NULL || server.shareobjects == 0) return o;
1800
1801 assert(o->type == REDIS_STRING);
1802 de = dictFind(server.sharingpool,o);
1803 if (de) {
1804 robj *shared = dictGetEntryKey(de);
1805
1806 c = ((unsigned long) dictGetEntryVal(de))+1;
1807 dictGetEntryVal(de) = (void*) c;
1808 incrRefCount(shared);
1809 decrRefCount(o);
1810 return shared;
1811 } else {
1812 /* Here we are using a stream algorihtm: Every time an object is
1813 * shared we increment its count, everytime there is a miss we
1814 * recrement the counter of a random object. If this object reaches
1815 * zero we remove the object and put the current object instead. */
1816 if (dictSize(server.sharingpool) >=
1817 server.sharingpoolsize) {
1818 de = dictGetRandomKey(server.sharingpool);
1819 assert(de != NULL);
1820 c = ((unsigned long) dictGetEntryVal(de))-1;
1821 dictGetEntryVal(de) = (void*) c;
1822 if (c == 0) {
1823 dictDelete(server.sharingpool,de->key);
1824 }
1825 } else {
1826 c = 0; /* If the pool is empty we want to add this object */
1827 }
1828 if (c == 0) {
1829 int retval;
1830
1831 retval = dictAdd(server.sharingpool,o,(void*)1);
1832 assert(retval == DICT_OK);
1833 incrRefCount(o);
1834 }
1835 return o;
1836 }
1837 }
1838
1839 /* Try to encode a string object in order to save space */
1840 static int tryObjectEncoding(robj *o) {
1841 long value;
1842 char *endptr, buf[32];
1843 sds s = o->ptr;
1844
1845 if (o->encoding != REDIS_ENCODING_RAW)
1846 return REDIS_ERR; /* Already encoded */
1847
1848 /* It's not save to encode shared objects: shared objects can be shared
1849 * everywhere in the "object space" of Redis. Encoded objects can only
1850 * appear as "values" (and not, for instance, as keys) */
1851 if (o->refcount > 1) return REDIS_ERR;
1852
1853 /* Currently we try to encode only strings */
1854 assert(o->type == REDIS_STRING);
1855
1856 /* Check if it's possible to encode this value as a long. We are assuming
1857 * that sizeof(long) = sizeof(void) in all the supported archs. */
1858 value = strtol(s, &endptr, 10);
1859 if (endptr[0] != '\0') return REDIS_ERR;
1860 snprintf(buf,32,"%ld",value);
1861
1862 /* If the number converted back into a string is not identical
1863 * then it's not possible to encode the string as integer */
1864 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return REDIS_ERR;
1865
1866 /* Ok, this object can be encoded */
1867 o->encoding = REDIS_ENCODING_INT;
1868 sdsfree(o->ptr);
1869 o->ptr = (void*) value;
1870 return REDIS_OK;
1871 }
1872
1873 /* Get a decoded version of an encoded object (returned as a new object) */
1874 static robj *getDecodedObject(const robj *o) {
1875 robj *dec;
1876
1877 assert(o->encoding != REDIS_ENCODING_RAW);
1878 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
1879 char buf[32];
1880
1881 snprintf(buf,32,"%ld",(long)o->ptr);
1882 dec = createStringObject(buf,strlen(buf));
1883 return dec;
1884 } else {
1885 assert(1 != 1);
1886 }
1887 }
1888
1889 /*============================ DB saving/loading ============================ */
1890
1891 static int rdbSaveType(FILE *fp, unsigned char type) {
1892 if (fwrite(&type,1,1,fp) == 0) return -1;
1893 return 0;
1894 }
1895
1896 static int rdbSaveTime(FILE *fp, time_t t) {
1897 int32_t t32 = (int32_t) t;
1898 if (fwrite(&t32,4,1,fp) == 0) return -1;
1899 return 0;
1900 }
1901
1902 /* check rdbLoadLen() comments for more info */
1903 static int rdbSaveLen(FILE *fp, uint32_t len) {
1904 unsigned char buf[2];
1905
1906 if (len < (1<<6)) {
1907 /* Save a 6 bit len */
1908 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1909 if (fwrite(buf,1,1,fp) == 0) return -1;
1910 } else if (len < (1<<14)) {
1911 /* Save a 14 bit len */
1912 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1913 buf[1] = len&0xFF;
1914 if (fwrite(buf,2,1,fp) == 0) return -1;
1915 } else {
1916 /* Save a 32 bit len */
1917 buf[0] = (REDIS_RDB_32BITLEN<<6);
1918 if (fwrite(buf,1,1,fp) == 0) return -1;
1919 len = htonl(len);
1920 if (fwrite(&len,4,1,fp) == 0) return -1;
1921 }
1922 return 0;
1923 }
1924
1925 /* String objects in the form "2391" "-100" without any space and with a
1926 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1927 * encoded as integers to save space */
1928 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1929 long long value;
1930 char *endptr, buf[32];
1931
1932 /* Check if it's possible to encode this value as a number */
1933 value = strtoll(s, &endptr, 10);
1934 if (endptr[0] != '\0') return 0;
1935 snprintf(buf,32,"%lld",value);
1936
1937 /* If the number converted back into a string is not identical
1938 * then it's not possible to encode the string as integer */
1939 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1940
1941 /* Finally check if it fits in our ranges */
1942 if (value >= -(1<<7) && value <= (1<<7)-1) {
1943 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1944 enc[1] = value&0xFF;
1945 return 2;
1946 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1947 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1948 enc[1] = value&0xFF;
1949 enc[2] = (value>>8)&0xFF;
1950 return 3;
1951 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1952 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1953 enc[1] = value&0xFF;
1954 enc[2] = (value>>8)&0xFF;
1955 enc[3] = (value>>16)&0xFF;
1956 enc[4] = (value>>24)&0xFF;
1957 return 5;
1958 } else {
1959 return 0;
1960 }
1961 }
1962
1963 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1964 unsigned int comprlen, outlen;
1965 unsigned char byte;
1966 void *out;
1967
1968 /* We require at least four bytes compression for this to be worth it */
1969 outlen = sdslen(obj->ptr)-4;
1970 if (outlen <= 0) return 0;
1971 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1972 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1973 if (comprlen == 0) {
1974 zfree(out);
1975 return 0;
1976 }
1977 /* Data compressed! Let's save it on disk */
1978 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1979 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1980 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1981 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1982 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1983 zfree(out);
1984 return comprlen;
1985
1986 writeerr:
1987 zfree(out);
1988 return -1;
1989 }
1990
1991 /* Save a string objet as [len][data] on disk. If the object is a string
1992 * representation of an integer value we try to safe it in a special form */
1993 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
1994 size_t len;
1995 int enclen;
1996
1997 len = sdslen(obj->ptr);
1998
1999 /* Try integer encoding */
2000 if (len <= 11) {
2001 unsigned char buf[5];
2002 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2003 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2004 return 0;
2005 }
2006 }
2007
2008 /* Try LZF compression - under 20 bytes it's unable to compress even
2009 * aaaaaaaaaaaaaaaaaa so skip it */
2010 if (len > 20) {
2011 int retval;
2012
2013 retval = rdbSaveLzfStringObject(fp,obj);
2014 if (retval == -1) return -1;
2015 if (retval > 0) return 0;
2016 /* retval == 0 means data can't be compressed, save the old way */
2017 }
2018
2019 /* Store verbatim */
2020 if (rdbSaveLen(fp,len) == -1) return -1;
2021 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2022 return 0;
2023 }
2024
2025 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2026 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2027 int retval;
2028 robj *dec;
2029
2030 if (obj->encoding != REDIS_ENCODING_RAW) {
2031 dec = getDecodedObject(obj);
2032 retval = rdbSaveStringObjectRaw(fp,dec);
2033 decrRefCount(dec);
2034 return retval;
2035 } else {
2036 return rdbSaveStringObjectRaw(fp,obj);
2037 }
2038 }
2039
2040 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2041 static int rdbSave(char *filename) {
2042 dictIterator *di = NULL;
2043 dictEntry *de;
2044 FILE *fp;
2045 char tmpfile[256];
2046 int j;
2047 time_t now = time(NULL);
2048
2049 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2050 fp = fopen(tmpfile,"w");
2051 if (!fp) {
2052 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2053 return REDIS_ERR;
2054 }
2055 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2056 for (j = 0; j < server.dbnum; j++) {
2057 redisDb *db = server.db+j;
2058 dict *d = db->dict;
2059 if (dictSize(d) == 0) continue;
2060 di = dictGetIterator(d);
2061 if (!di) {
2062 fclose(fp);
2063 return REDIS_ERR;
2064 }
2065
2066 /* Write the SELECT DB opcode */
2067 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2068 if (rdbSaveLen(fp,j) == -1) goto werr;
2069
2070 /* Iterate this DB writing every entry */
2071 while((de = dictNext(di)) != NULL) {
2072 robj *key = dictGetEntryKey(de);
2073 robj *o = dictGetEntryVal(de);
2074 time_t expiretime = getExpire(db,key);
2075
2076 /* Save the expire time */
2077 if (expiretime != -1) {
2078 /* If this key is already expired skip it */
2079 if (expiretime < now) continue;
2080 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2081 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2082 }
2083 /* Save the key and associated value */
2084 if (rdbSaveType(fp,o->type) == -1) goto werr;
2085 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2086 if (o->type == REDIS_STRING) {
2087 /* Save a string value */
2088 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2089 } else if (o->type == REDIS_LIST) {
2090 /* Save a list value */
2091 list *list = o->ptr;
2092 listNode *ln;
2093
2094 listRewind(list);
2095 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2096 while((ln = listYield(list))) {
2097 robj *eleobj = listNodeValue(ln);
2098
2099 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2100 }
2101 } else if (o->type == REDIS_SET) {
2102 /* Save a set value */
2103 dict *set = o->ptr;
2104 dictIterator *di = dictGetIterator(set);
2105 dictEntry *de;
2106
2107 if (!set) oom("dictGetIteraotr");
2108 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2109 while((de = dictNext(di)) != NULL) {
2110 robj *eleobj = dictGetEntryKey(de);
2111
2112 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2113 }
2114 dictReleaseIterator(di);
2115 } else {
2116 assert(0 != 0);
2117 }
2118 }
2119 dictReleaseIterator(di);
2120 }
2121 /* EOF opcode */
2122 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2123
2124 /* Make sure data will not remain on the OS's output buffers */
2125 fflush(fp);
2126 fsync(fileno(fp));
2127 fclose(fp);
2128
2129 /* Use RENAME to make sure the DB file is changed atomically only
2130 * if the generate DB file is ok. */
2131 if (rename(tmpfile,filename) == -1) {
2132 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2133 unlink(tmpfile);
2134 return REDIS_ERR;
2135 }
2136 redisLog(REDIS_NOTICE,"DB saved on disk");
2137 server.dirty = 0;
2138 server.lastsave = time(NULL);
2139 return REDIS_OK;
2140
2141 werr:
2142 fclose(fp);
2143 unlink(tmpfile);
2144 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2145 if (di) dictReleaseIterator(di);
2146 return REDIS_ERR;
2147 }
2148
2149 static int rdbSaveBackground(char *filename) {
2150 pid_t childpid;
2151
2152 if (server.bgsaveinprogress) return REDIS_ERR;
2153 if ((childpid = fork()) == 0) {
2154 /* Child */
2155 close(server.fd);
2156 if (rdbSave(filename) == REDIS_OK) {
2157 exit(0);
2158 } else {
2159 exit(1);
2160 }
2161 } else {
2162 /* Parent */
2163 if (childpid == -1) {
2164 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2165 strerror(errno));
2166 return REDIS_ERR;
2167 }
2168 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2169 server.bgsaveinprogress = 1;
2170 server.bgsavechildpid = childpid;
2171 return REDIS_OK;
2172 }
2173 return REDIS_OK; /* unreached */
2174 }
2175
2176 static void rdbRemoveTempFile(pid_t childpid) {
2177 char tmpfile[256];
2178
2179 snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
2180 unlink(tmpfile);
2181 }
2182
2183 static int rdbLoadType(FILE *fp) {
2184 unsigned char type;
2185 if (fread(&type,1,1,fp) == 0) return -1;
2186 return type;
2187 }
2188
2189 static time_t rdbLoadTime(FILE *fp) {
2190 int32_t t32;
2191 if (fread(&t32,4,1,fp) == 0) return -1;
2192 return (time_t) t32;
2193 }
2194
2195 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2196 * of this file for a description of how this are stored on disk.
2197 *
2198 * isencoded is set to 1 if the readed length is not actually a length but
2199 * an "encoding type", check the above comments for more info */
2200 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2201 unsigned char buf[2];
2202 uint32_t len;
2203
2204 if (isencoded) *isencoded = 0;
2205 if (rdbver == 0) {
2206 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2207 return ntohl(len);
2208 } else {
2209 int type;
2210
2211 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2212 type = (buf[0]&0xC0)>>6;
2213 if (type == REDIS_RDB_6BITLEN) {
2214 /* Read a 6 bit len */
2215 return buf[0]&0x3F;
2216 } else if (type == REDIS_RDB_ENCVAL) {
2217 /* Read a 6 bit len encoding type */
2218 if (isencoded) *isencoded = 1;
2219 return buf[0]&0x3F;
2220 } else if (type == REDIS_RDB_14BITLEN) {
2221 /* Read a 14 bit len */
2222 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2223 return ((buf[0]&0x3F)<<8)|buf[1];
2224 } else {
2225 /* Read a 32 bit len */
2226 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2227 return ntohl(len);
2228 }
2229 }
2230 }
2231
2232 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2233 unsigned char enc[4];
2234 long long val;
2235
2236 if (enctype == REDIS_RDB_ENC_INT8) {
2237 if (fread(enc,1,1,fp) == 0) return NULL;
2238 val = (signed char)enc[0];
2239 } else if (enctype == REDIS_RDB_ENC_INT16) {
2240 uint16_t v;
2241 if (fread(enc,2,1,fp) == 0) return NULL;
2242 v = enc[0]|(enc[1]<<8);
2243 val = (int16_t)v;
2244 } else if (enctype == REDIS_RDB_ENC_INT32) {
2245 uint32_t v;
2246 if (fread(enc,4,1,fp) == 0) return NULL;
2247 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2248 val = (int32_t)v;
2249 } else {
2250 val = 0; /* anti-warning */
2251 assert(0!=0);
2252 }
2253 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2254 }
2255
2256 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2257 unsigned int len, clen;
2258 unsigned char *c = NULL;
2259 sds val = NULL;
2260
2261 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2262 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2263 if ((c = zmalloc(clen)) == NULL) goto err;
2264 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2265 if (fread(c,clen,1,fp) == 0) goto err;
2266 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2267 zfree(c);
2268 return createObject(REDIS_STRING,val);
2269 err:
2270 zfree(c);
2271 sdsfree(val);
2272 return NULL;
2273 }
2274
2275 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2276 int isencoded;
2277 uint32_t len;
2278 sds val;
2279
2280 len = rdbLoadLen(fp,rdbver,&isencoded);
2281 if (isencoded) {
2282 switch(len) {
2283 case REDIS_RDB_ENC_INT8:
2284 case REDIS_RDB_ENC_INT16:
2285 case REDIS_RDB_ENC_INT32:
2286 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2287 case REDIS_RDB_ENC_LZF:
2288 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2289 default:
2290 assert(0!=0);
2291 }
2292 }
2293
2294 if (len == REDIS_RDB_LENERR) return NULL;
2295 val = sdsnewlen(NULL,len);
2296 if (len && fread(val,len,1,fp) == 0) {
2297 sdsfree(val);
2298 return NULL;
2299 }
2300 return tryObjectSharing(createObject(REDIS_STRING,val));
2301 }
2302
2303 static int rdbLoad(char *filename) {
2304 FILE *fp;
2305 robj *keyobj = NULL;
2306 uint32_t dbid;
2307 int type, retval, rdbver;
2308 dict *d = server.db[0].dict;
2309 redisDb *db = server.db+0;
2310 char buf[1024];
2311 time_t expiretime = -1, now = time(NULL);
2312
2313 fp = fopen(filename,"r");
2314 if (!fp) return REDIS_ERR;
2315 if (fread(buf,9,1,fp) == 0) goto eoferr;
2316 buf[9] = '\0';
2317 if (memcmp(buf,"REDIS",5) != 0) {
2318 fclose(fp);
2319 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2320 return REDIS_ERR;
2321 }
2322 rdbver = atoi(buf+5);
2323 if (rdbver > 1) {
2324 fclose(fp);
2325 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2326 return REDIS_ERR;
2327 }
2328 while(1) {
2329 robj *o;
2330
2331 /* Read type. */
2332 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2333 if (type == REDIS_EXPIRETIME) {
2334 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2335 /* We read the time so we need to read the object type again */
2336 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2337 }
2338 if (type == REDIS_EOF) break;
2339 /* Handle SELECT DB opcode as a special case */
2340 if (type == REDIS_SELECTDB) {
2341 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2342 goto eoferr;
2343 if (dbid >= (unsigned)server.dbnum) {
2344 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2345 exit(1);
2346 }
2347 db = server.db+dbid;
2348 d = db->dict;
2349 continue;
2350 }
2351 /* Read key */
2352 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2353
2354 if (type == REDIS_STRING) {
2355 /* Read string value */
2356 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2357 tryObjectEncoding(o);
2358 } else if (type == REDIS_LIST || type == REDIS_SET) {
2359 /* Read list/set value */
2360 uint32_t listlen;
2361
2362 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2363 goto eoferr;
2364 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2365 /* Load every single element of the list/set */
2366 while(listlen--) {
2367 robj *ele;
2368
2369 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2370 tryObjectEncoding(ele);
2371 if (type == REDIS_LIST) {
2372 if (!listAddNodeTail((list*)o->ptr,ele))
2373 oom("listAddNodeTail");
2374 } else {
2375 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2376 oom("dictAdd");
2377 }
2378 }
2379 } else {
2380 assert(0 != 0);
2381 }
2382 /* Add the new object in the hash table */
2383 retval = dictAdd(d,keyobj,o);
2384 if (retval == DICT_ERR) {
2385 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2386 exit(1);
2387 }
2388 /* Set the expire time if needed */
2389 if (expiretime != -1) {
2390 setExpire(db,keyobj,expiretime);
2391 /* Delete this key if already expired */
2392 if (expiretime < now) deleteKey(db,keyobj);
2393 expiretime = -1;
2394 }
2395 keyobj = o = NULL;
2396 }
2397 fclose(fp);
2398 return REDIS_OK;
2399
2400 eoferr: /* unexpected end of file is handled here with a fatal exit */
2401 if (keyobj) decrRefCount(keyobj);
2402 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2403 exit(1);
2404 return REDIS_ERR; /* Just to avoid warning */
2405 }
2406
2407 /*================================== Commands =============================== */
2408
2409 static void authCommand(redisClient *c) {
2410 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2411 c->authenticated = 1;
2412 addReply(c,shared.ok);
2413 } else {
2414 c->authenticated = 0;
2415 addReply(c,shared.err);
2416 }
2417 }
2418
2419 static void pingCommand(redisClient *c) {
2420 addReply(c,shared.pong);
2421 }
2422
2423 static void echoCommand(redisClient *c) {
2424 addReplyBulkLen(c,c->argv[1]);
2425 addReply(c,c->argv[1]);
2426 addReply(c,shared.crlf);
2427 }
2428
2429 /*=================================== Strings =============================== */
2430
2431 static void setGenericCommand(redisClient *c, int nx) {
2432 int retval;
2433
2434 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2435 if (retval == DICT_ERR) {
2436 if (!nx) {
2437 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2438 incrRefCount(c->argv[2]);
2439 } else {
2440 addReply(c,shared.czero);
2441 return;
2442 }
2443 } else {
2444 incrRefCount(c->argv[1]);
2445 incrRefCount(c->argv[2]);
2446 }
2447 server.dirty++;
2448 removeExpire(c->db,c->argv[1]);
2449 addReply(c, nx ? shared.cone : shared.ok);
2450 }
2451
2452 static void setCommand(redisClient *c) {
2453 setGenericCommand(c,0);
2454 }
2455
2456 static void setnxCommand(redisClient *c) {
2457 setGenericCommand(c,1);
2458 }
2459
2460 static void getCommand(redisClient *c) {
2461 robj *o = lookupKeyRead(c->db,c->argv[1]);
2462
2463 if (o == NULL) {
2464 addReply(c,shared.nullbulk);
2465 } else {
2466 if (o->type != REDIS_STRING) {
2467 addReply(c,shared.wrongtypeerr);
2468 } else {
2469 addReplyBulkLen(c,o);
2470 addReply(c,o);
2471 addReply(c,shared.crlf);
2472 }
2473 }
2474 }
2475
2476 static void getSetCommand(redisClient *c) {
2477 getCommand(c);
2478 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2479 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2480 } else {
2481 incrRefCount(c->argv[1]);
2482 }
2483 incrRefCount(c->argv[2]);
2484 server.dirty++;
2485 removeExpire(c->db,c->argv[1]);
2486 }
2487
2488 static void mgetCommand(redisClient *c) {
2489 int j;
2490
2491 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2492 for (j = 1; j < c->argc; j++) {
2493 robj *o = lookupKeyRead(c->db,c->argv[j]);
2494 if (o == NULL) {
2495 addReply(c,shared.nullbulk);
2496 } else {
2497 if (o->type != REDIS_STRING) {
2498 addReply(c,shared.nullbulk);
2499 } else {
2500 addReplyBulkLen(c,o);
2501 addReply(c,o);
2502 addReply(c,shared.crlf);
2503 }
2504 }
2505 }
2506 }
2507
2508 static void incrDecrCommand(redisClient *c, long long incr) {
2509 long long value;
2510 int retval;
2511 robj *o;
2512
2513 o = lookupKeyWrite(c->db,c->argv[1]);
2514 if (o == NULL) {
2515 value = 0;
2516 } else {
2517 if (o->type != REDIS_STRING) {
2518 value = 0;
2519 } else {
2520 char *eptr;
2521
2522 if (o->encoding == REDIS_ENCODING_RAW)
2523 value = strtoll(o->ptr, &eptr, 10);
2524 else if (o->encoding == REDIS_ENCODING_INT)
2525 value = (long)o->ptr;
2526 else
2527 assert(1 != 1);
2528 }
2529 }
2530
2531 value += incr;
2532 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2533 tryObjectEncoding(o);
2534 retval = dictAdd(c->db->dict,c->argv[1],o);
2535 if (retval == DICT_ERR) {
2536 dictReplace(c->db->dict,c->argv[1],o);
2537 removeExpire(c->db,c->argv[1]);
2538 } else {
2539 incrRefCount(c->argv[1]);
2540 }
2541 server.dirty++;
2542 addReply(c,shared.colon);
2543 addReply(c,o);
2544 addReply(c,shared.crlf);
2545 }
2546
2547 static void incrCommand(redisClient *c) {
2548 incrDecrCommand(c,1);
2549 }
2550
2551 static void decrCommand(redisClient *c) {
2552 incrDecrCommand(c,-1);
2553 }
2554
2555 static void incrbyCommand(redisClient *c) {
2556 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2557 incrDecrCommand(c,incr);
2558 }
2559
2560 static void decrbyCommand(redisClient *c) {
2561 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2562 incrDecrCommand(c,-incr);
2563 }
2564
2565 /* ========================= Type agnostic commands ========================= */
2566
2567 static void delCommand(redisClient *c) {
2568 int deleted = 0, j;
2569
2570 for (j = 1; j < c->argc; j++) {
2571 if (deleteKey(c->db,c->argv[j])) {
2572 server.dirty++;
2573 deleted++;
2574 }
2575 }
2576 switch(deleted) {
2577 case 0:
2578 addReply(c,shared.czero);
2579 break;
2580 case 1:
2581 addReply(c,shared.cone);
2582 break;
2583 default:
2584 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2585 break;
2586 }
2587 }
2588
2589 static void existsCommand(redisClient *c) {
2590 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2591 }
2592
2593 static void selectCommand(redisClient *c) {
2594 int id = atoi(c->argv[1]->ptr);
2595
2596 if (selectDb(c,id) == REDIS_ERR) {
2597 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2598 } else {
2599 addReply(c,shared.ok);
2600 }
2601 }
2602
2603 static void randomkeyCommand(redisClient *c) {
2604 dictEntry *de;
2605
2606 while(1) {
2607 de = dictGetRandomKey(c->db->dict);
2608 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2609 }
2610 if (de == NULL) {
2611 addReply(c,shared.plus);
2612 addReply(c,shared.crlf);
2613 } else {
2614 addReply(c,shared.plus);
2615 addReply(c,dictGetEntryKey(de));
2616 addReply(c,shared.crlf);
2617 }
2618 }
2619
2620 static void keysCommand(redisClient *c) {
2621 dictIterator *di;
2622 dictEntry *de;
2623 sds pattern = c->argv[1]->ptr;
2624 int plen = sdslen(pattern);
2625 int numkeys = 0, keyslen = 0;
2626 robj *lenobj = createObject(REDIS_STRING,NULL);
2627
2628 di = dictGetIterator(c->db->dict);
2629 if (!di) oom("dictGetIterator");
2630 addReply(c,lenobj);
2631 decrRefCount(lenobj);
2632 while((de = dictNext(di)) != NULL) {
2633 robj *keyobj = dictGetEntryKey(de);
2634
2635 sds key = keyobj->ptr;
2636 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2637 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2638 if (expireIfNeeded(c->db,keyobj) == 0) {
2639 if (numkeys != 0)
2640 addReply(c,shared.space);
2641 addReply(c,keyobj);
2642 numkeys++;
2643 keyslen += sdslen(key);
2644 }
2645 }
2646 }
2647 dictReleaseIterator(di);
2648 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2649 addReply(c,shared.crlf);
2650 }
2651
2652 static void dbsizeCommand(redisClient *c) {
2653 addReplySds(c,
2654 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2655 }
2656
2657 static void lastsaveCommand(redisClient *c) {
2658 addReplySds(c,
2659 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2660 }
2661
2662 static void typeCommand(redisClient *c) {
2663 robj *o;
2664 char *type;
2665
2666 o = lookupKeyRead(c->db,c->argv[1]);
2667 if (o == NULL) {
2668 type = "+none";
2669 } else {
2670 switch(o->type) {
2671 case REDIS_STRING: type = "+string"; break;
2672 case REDIS_LIST: type = "+list"; break;
2673 case REDIS_SET: type = "+set"; break;
2674 default: type = "unknown"; break;
2675 }
2676 }
2677 addReplySds(c,sdsnew(type));
2678 addReply(c,shared.crlf);
2679 }
2680
2681 static void saveCommand(redisClient *c) {
2682 if (server.bgsaveinprogress) {
2683 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2684 return;
2685 }
2686 if (rdbSave(server.dbfilename) == REDIS_OK) {
2687 addReply(c,shared.ok);
2688 } else {
2689 addReply(c,shared.err);
2690 }
2691 }
2692
2693 static void bgsaveCommand(redisClient *c) {
2694 if (server.bgsaveinprogress) {
2695 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2696 return;
2697 }
2698 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2699 addReply(c,shared.ok);
2700 } else {
2701 addReply(c,shared.err);
2702 }
2703 }
2704
2705 static void shutdownCommand(redisClient *c) {
2706 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2707 /* Kill the saving child if there is a background saving in progress.
2708 We want to avoid race conditions, for instance our saving child may
2709 overwrite the synchronous saving did by SHUTDOWN. */
2710 if (server.bgsaveinprogress) {
2711 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2712 kill(server.bgsavechildpid,SIGKILL);
2713 rdbRemoveTempFile(server.bgsavechildpid);
2714 }
2715 /* SYNC SAVE */
2716 if (rdbSave(server.dbfilename) == REDIS_OK) {
2717 if (server.daemonize)
2718 unlink(server.pidfile);
2719 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2720 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2721 exit(1);
2722 } else {
2723 /* Ooops.. error saving! The best we can do is to continue operating.
2724 * Note that if there was a background saving process, in the next
2725 * cron() Redis will be notified that the background saving aborted,
2726 * handling special stuff like slaves pending for synchronization... */
2727 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2728 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2729 }
2730 }
2731
2732 static void renameGenericCommand(redisClient *c, int nx) {
2733 robj *o;
2734
2735 /* To use the same key as src and dst is probably an error */
2736 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2737 addReply(c,shared.sameobjecterr);
2738 return;
2739 }
2740
2741 o = lookupKeyWrite(c->db,c->argv[1]);
2742 if (o == NULL) {
2743 addReply(c,shared.nokeyerr);
2744 return;
2745 }
2746 incrRefCount(o);
2747 deleteIfVolatile(c->db,c->argv[2]);
2748 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2749 if (nx) {
2750 decrRefCount(o);
2751 addReply(c,shared.czero);
2752 return;
2753 }
2754 dictReplace(c->db->dict,c->argv[2],o);
2755 } else {
2756 incrRefCount(c->argv[2]);
2757 }
2758 deleteKey(c->db,c->argv[1]);
2759 server.dirty++;
2760 addReply(c,nx ? shared.cone : shared.ok);
2761 }
2762
2763 static void renameCommand(redisClient *c) {
2764 renameGenericCommand(c,0);
2765 }
2766
2767 static void renamenxCommand(redisClient *c) {
2768 renameGenericCommand(c,1);
2769 }
2770
2771 static void moveCommand(redisClient *c) {
2772 robj *o;
2773 redisDb *src, *dst;
2774 int srcid;
2775
2776 /* Obtain source and target DB pointers */
2777 src = c->db;
2778 srcid = c->db->id;
2779 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2780 addReply(c,shared.outofrangeerr);
2781 return;
2782 }
2783 dst = c->db;
2784 selectDb(c,srcid); /* Back to the source DB */
2785
2786 /* If the user is moving using as target the same
2787 * DB as the source DB it is probably an error. */
2788 if (src == dst) {
2789 addReply(c,shared.sameobjecterr);
2790 return;
2791 }
2792
2793 /* Check if the element exists and get a reference */
2794 o = lookupKeyWrite(c->db,c->argv[1]);
2795 if (!o) {
2796 addReply(c,shared.czero);
2797 return;
2798 }
2799
2800 /* Try to add the element to the target DB */
2801 deleteIfVolatile(dst,c->argv[1]);
2802 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2803 addReply(c,shared.czero);
2804 return;
2805 }
2806 incrRefCount(c->argv[1]);
2807 incrRefCount(o);
2808
2809 /* OK! key moved, free the entry in the source DB */
2810 deleteKey(src,c->argv[1]);
2811 server.dirty++;
2812 addReply(c,shared.cone);
2813 }
2814
2815 /* =================================== Lists ================================ */
2816 static void pushGenericCommand(redisClient *c, int where) {
2817 robj *lobj;
2818 list *list;
2819
2820 lobj = lookupKeyWrite(c->db,c->argv[1]);
2821 if (lobj == NULL) {
2822 lobj = createListObject();
2823 list = lobj->ptr;
2824 if (where == REDIS_HEAD) {
2825 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2826 } else {
2827 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2828 }
2829 dictAdd(c->db->dict,c->argv[1],lobj);
2830 incrRefCount(c->argv[1]);
2831 incrRefCount(c->argv[2]);
2832 } else {
2833 if (lobj->type != REDIS_LIST) {
2834 addReply(c,shared.wrongtypeerr);
2835 return;
2836 }
2837 list = lobj->ptr;
2838 if (where == REDIS_HEAD) {
2839 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2840 } else {
2841 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2842 }
2843 incrRefCount(c->argv[2]);
2844 }
2845 server.dirty++;
2846 addReply(c,shared.ok);
2847 }
2848
2849 static void lpushCommand(redisClient *c) {
2850 pushGenericCommand(c,REDIS_HEAD);
2851 }
2852
2853 static void rpushCommand(redisClient *c) {
2854 pushGenericCommand(c,REDIS_TAIL);
2855 }
2856
2857 static void llenCommand(redisClient *c) {
2858 robj *o;
2859 list *l;
2860
2861 o = lookupKeyRead(c->db,c->argv[1]);
2862 if (o == NULL) {
2863 addReply(c,shared.czero);
2864 return;
2865 } else {
2866 if (o->type != REDIS_LIST) {
2867 addReply(c,shared.wrongtypeerr);
2868 } else {
2869 l = o->ptr;
2870 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2871 }
2872 }
2873 }
2874
2875 static void lindexCommand(redisClient *c) {
2876 robj *o;
2877 int index = atoi(c->argv[2]->ptr);
2878
2879 o = lookupKeyRead(c->db,c->argv[1]);
2880 if (o == NULL) {
2881 addReply(c,shared.nullbulk);
2882 } else {
2883 if (o->type != REDIS_LIST) {
2884 addReply(c,shared.wrongtypeerr);
2885 } else {
2886 list *list = o->ptr;
2887 listNode *ln;
2888
2889 ln = listIndex(list, index);
2890 if (ln == NULL) {
2891 addReply(c,shared.nullbulk);
2892 } else {
2893 robj *ele = listNodeValue(ln);
2894 addReplyBulkLen(c,ele);
2895 addReply(c,ele);
2896 addReply(c,shared.crlf);
2897 }
2898 }
2899 }
2900 }
2901
2902 static void lsetCommand(redisClient *c) {
2903 robj *o;
2904 int index = atoi(c->argv[2]->ptr);
2905
2906 o = lookupKeyWrite(c->db,c->argv[1]);
2907 if (o == NULL) {
2908 addReply(c,shared.nokeyerr);
2909 } else {
2910 if (o->type != REDIS_LIST) {
2911 addReply(c,shared.wrongtypeerr);
2912 } else {
2913 list *list = o->ptr;
2914 listNode *ln;
2915
2916 ln = listIndex(list, index);
2917 if (ln == NULL) {
2918 addReply(c,shared.outofrangeerr);
2919 } else {
2920 robj *ele = listNodeValue(ln);
2921
2922 decrRefCount(ele);
2923 listNodeValue(ln) = c->argv[3];
2924 incrRefCount(c->argv[3]);
2925 addReply(c,shared.ok);
2926 server.dirty++;
2927 }
2928 }
2929 }
2930 }
2931
2932 static void popGenericCommand(redisClient *c, int where) {
2933 robj *o;
2934
2935 o = lookupKeyWrite(c->db,c->argv[1]);
2936 if (o == NULL) {
2937 addReply(c,shared.nullbulk);
2938 } else {
2939 if (o->type != REDIS_LIST) {
2940 addReply(c,shared.wrongtypeerr);
2941 } else {
2942 list *list = o->ptr;
2943 listNode *ln;
2944
2945 if (where == REDIS_HEAD)
2946 ln = listFirst(list);
2947 else
2948 ln = listLast(list);
2949
2950 if (ln == NULL) {
2951 addReply(c,shared.nullbulk);
2952 } else {
2953 robj *ele = listNodeValue(ln);
2954 addReplyBulkLen(c,ele);
2955 addReply(c,ele);
2956 addReply(c,shared.crlf);
2957 listDelNode(list,ln);
2958 server.dirty++;
2959 }
2960 }
2961 }
2962 }
2963
2964 static void lpopCommand(redisClient *c) {
2965 popGenericCommand(c,REDIS_HEAD);
2966 }
2967
2968 static void rpopCommand(redisClient *c) {
2969 popGenericCommand(c,REDIS_TAIL);
2970 }
2971
2972 static void lrangeCommand(redisClient *c) {
2973 robj *o;
2974 int start = atoi(c->argv[2]->ptr);
2975 int end = atoi(c->argv[3]->ptr);
2976
2977 o = lookupKeyRead(c->db,c->argv[1]);
2978 if (o == NULL) {
2979 addReply(c,shared.nullmultibulk);
2980 } else {
2981 if (o->type != REDIS_LIST) {
2982 addReply(c,shared.wrongtypeerr);
2983 } else {
2984 list *list = o->ptr;
2985 listNode *ln;
2986 int llen = listLength(list);
2987 int rangelen, j;
2988 robj *ele;
2989
2990 /* convert negative indexes */
2991 if (start < 0) start = llen+start;
2992 if (end < 0) end = llen+end;
2993 if (start < 0) start = 0;
2994 if (end < 0) end = 0;
2995
2996 /* indexes sanity checks */
2997 if (start > end || start >= llen) {
2998 /* Out of range start or start > end result in empty list */
2999 addReply(c,shared.emptymultibulk);
3000 return;
3001 }
3002 if (end >= llen) end = llen-1;
3003 rangelen = (end-start)+1;
3004
3005 /* Return the result in form of a multi-bulk reply */
3006 ln = listIndex(list, start);
3007 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3008 for (j = 0; j < rangelen; j++) {
3009 ele = listNodeValue(ln);
3010 addReplyBulkLen(c,ele);
3011 addReply(c,ele);
3012 addReply(c,shared.crlf);
3013 ln = ln->next;
3014 }
3015 }
3016 }
3017 }
3018
3019 static void ltrimCommand(redisClient *c) {
3020 robj *o;
3021 int start = atoi(c->argv[2]->ptr);
3022 int end = atoi(c->argv[3]->ptr);
3023
3024 o = lookupKeyWrite(c->db,c->argv[1]);
3025 if (o == NULL) {
3026 addReply(c,shared.nokeyerr);
3027 } else {
3028 if (o->type != REDIS_LIST) {
3029 addReply(c,shared.wrongtypeerr);
3030 } else {
3031 list *list = o->ptr;
3032 listNode *ln;
3033 int llen = listLength(list);
3034 int j, ltrim, rtrim;
3035
3036 /* convert negative indexes */
3037 if (start < 0) start = llen+start;
3038 if (end < 0) end = llen+end;
3039 if (start < 0) start = 0;
3040 if (end < 0) end = 0;
3041
3042 /* indexes sanity checks */
3043 if (start > end || start >= llen) {
3044 /* Out of range start or start > end result in empty list */
3045 ltrim = llen;
3046 rtrim = 0;
3047 } else {
3048 if (end >= llen) end = llen-1;
3049 ltrim = start;
3050 rtrim = llen-end-1;
3051 }
3052
3053 /* Remove list elements to perform the trim */
3054 for (j = 0; j < ltrim; j++) {
3055 ln = listFirst(list);
3056 listDelNode(list,ln);
3057 }
3058 for (j = 0; j < rtrim; j++) {
3059 ln = listLast(list);
3060 listDelNode(list,ln);
3061 }
3062 server.dirty++;
3063 addReply(c,shared.ok);
3064 }
3065 }
3066 }
3067
3068 static void lremCommand(redisClient *c) {
3069 robj *o;
3070
3071 o = lookupKeyWrite(c->db,c->argv[1]);
3072 if (o == NULL) {
3073 addReply(c,shared.czero);
3074 } else {
3075 if (o->type != REDIS_LIST) {
3076 addReply(c,shared.wrongtypeerr);
3077 } else {
3078 list *list = o->ptr;
3079 listNode *ln, *next;
3080 int toremove = atoi(c->argv[2]->ptr);
3081 int removed = 0;
3082 int fromtail = 0;
3083
3084 if (toremove < 0) {
3085 toremove = -toremove;
3086 fromtail = 1;
3087 }
3088 ln = fromtail ? list->tail : list->head;
3089 while (ln) {
3090 robj *ele = listNodeValue(ln);
3091
3092 next = fromtail ? ln->prev : ln->next;
3093 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
3094 listDelNode(list,ln);
3095 server.dirty++;
3096 removed++;
3097 if (toremove && removed == toremove) break;
3098 }
3099 ln = next;
3100 }
3101 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3102 }
3103 }
3104 }
3105
3106 /* ==================================== Sets ================================ */
3107
3108 static void saddCommand(redisClient *c) {
3109 robj *set;
3110
3111 set = lookupKeyWrite(c->db,c->argv[1]);
3112 if (set == NULL) {
3113 set = createSetObject();
3114 dictAdd(c->db->dict,c->argv[1],set);
3115 incrRefCount(c->argv[1]);
3116 } else {
3117 if (set->type != REDIS_SET) {
3118 addReply(c,shared.wrongtypeerr);
3119 return;
3120 }
3121 }
3122 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3123 incrRefCount(c->argv[2]);
3124 server.dirty++;
3125 addReply(c,shared.cone);
3126 } else {
3127 addReply(c,shared.czero);
3128 }
3129 }
3130
3131 static void sremCommand(redisClient *c) {
3132 robj *set;
3133
3134 set = lookupKeyWrite(c->db,c->argv[1]);
3135 if (set == NULL) {
3136 addReply(c,shared.czero);
3137 } else {
3138 if (set->type != REDIS_SET) {
3139 addReply(c,shared.wrongtypeerr);
3140 return;
3141 }
3142 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3143 server.dirty++;
3144 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3145 addReply(c,shared.cone);
3146 } else {
3147 addReply(c,shared.czero);
3148 }
3149 }
3150 }
3151
3152 static void smoveCommand(redisClient *c) {
3153 robj *srcset, *dstset;
3154
3155 srcset = lookupKeyWrite(c->db,c->argv[1]);
3156 dstset = lookupKeyWrite(c->db,c->argv[2]);
3157
3158 /* If the source key does not exist return 0, if it's of the wrong type
3159 * raise an error */
3160 if (srcset == NULL || srcset->type != REDIS_SET) {
3161 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3162 return;
3163 }
3164 /* Error if the destination key is not a set as well */
3165 if (dstset && dstset->type != REDIS_SET) {
3166 addReply(c,shared.wrongtypeerr);
3167 return;
3168 }
3169 /* Remove the element from the source set */
3170 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3171 /* Key not found in the src set! return zero */
3172 addReply(c,shared.czero);
3173 return;
3174 }
3175 server.dirty++;
3176 /* Add the element to the destination set */
3177 if (!dstset) {
3178 dstset = createSetObject();
3179 dictAdd(c->db->dict,c->argv[2],dstset);
3180 incrRefCount(c->argv[2]);
3181 }
3182 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3183 incrRefCount(c->argv[3]);
3184 addReply(c,shared.cone);
3185 }
3186
3187 static void sismemberCommand(redisClient *c) {
3188 robj *set;
3189
3190 set = lookupKeyRead(c->db,c->argv[1]);
3191 if (set == NULL) {
3192 addReply(c,shared.czero);
3193 } else {
3194 if (set->type != REDIS_SET) {
3195 addReply(c,shared.wrongtypeerr);
3196 return;
3197 }
3198 if (dictFind(set->ptr,c->argv[2]))
3199 addReply(c,shared.cone);
3200 else
3201 addReply(c,shared.czero);
3202 }
3203 }
3204
3205 static void scardCommand(redisClient *c) {
3206 robj *o;
3207 dict *s;
3208
3209 o = lookupKeyRead(c->db,c->argv[1]);
3210 if (o == NULL) {
3211 addReply(c,shared.czero);
3212 return;
3213 } else {
3214 if (o->type != REDIS_SET) {
3215 addReply(c,shared.wrongtypeerr);
3216 } else {
3217 s = o->ptr;
3218 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3219 dictSize(s)));
3220 }
3221 }
3222 }
3223
3224 static void spopCommand(redisClient *c) {
3225 robj *set;
3226 dictEntry *de;
3227
3228 set = lookupKeyWrite(c->db,c->argv[1]);
3229 if (set == NULL) {
3230 addReply(c,shared.nullbulk);
3231 } else {
3232 if (set->type != REDIS_SET) {
3233 addReply(c,shared.wrongtypeerr);
3234 return;
3235 }
3236 de = dictGetRandomKey(set->ptr);
3237 if (de == NULL) {
3238 addReply(c,shared.nullbulk);
3239 } else {
3240 robj *ele = dictGetEntryKey(de);
3241
3242 addReplyBulkLen(c,ele);
3243 addReply(c,ele);
3244 addReply(c,shared.crlf);
3245 dictDelete(set->ptr,ele);
3246 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3247 server.dirty++;
3248 }
3249 }
3250 }
3251
3252 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3253 dict **d1 = (void*) s1, **d2 = (void*) s2;
3254
3255 return dictSize(*d1)-dictSize(*d2);
3256 }
3257
3258 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3259 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3260 dictIterator *di;
3261 dictEntry *de;
3262 robj *lenobj = NULL, *dstset = NULL;
3263 int j, cardinality = 0;
3264
3265 if (!dv) oom("sinterGenericCommand");
3266 for (j = 0; j < setsnum; j++) {
3267 robj *setobj;
3268
3269 setobj = dstkey ?
3270 lookupKeyWrite(c->db,setskeys[j]) :
3271 lookupKeyRead(c->db,setskeys[j]);
3272 if (!setobj) {
3273 zfree(dv);
3274 if (dstkey) {
3275 deleteKey(c->db,dstkey);
3276 addReply(c,shared.ok);
3277 } else {
3278 addReply(c,shared.nullmultibulk);
3279 }
3280 return;
3281 }
3282 if (setobj->type != REDIS_SET) {
3283 zfree(dv);
3284 addReply(c,shared.wrongtypeerr);
3285 return;
3286 }
3287 dv[j] = setobj->ptr;
3288 }
3289 /* Sort sets from the smallest to largest, this will improve our
3290 * algorithm's performace */
3291 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3292
3293 /* The first thing we should output is the total number of elements...
3294 * since this is a multi-bulk write, but at this stage we don't know
3295 * the intersection set size, so we use a trick, append an empty object
3296 * to the output list and save the pointer to later modify it with the
3297 * right length */
3298 if (!dstkey) {
3299 lenobj = createObject(REDIS_STRING,NULL);
3300 addReply(c,lenobj);
3301 decrRefCount(lenobj);
3302 } else {
3303 /* If we have a target key where to store the resulting set
3304 * create this key with an empty set inside */
3305 dstset = createSetObject();
3306 }
3307
3308 /* Iterate all the elements of the first (smallest) set, and test
3309 * the element against all the other sets, if at least one set does
3310 * not include the element it is discarded */
3311 di = dictGetIterator(dv[0]);
3312 if (!di) oom("dictGetIterator");
3313
3314 while((de = dictNext(di)) != NULL) {
3315 robj *ele;
3316
3317 for (j = 1; j < setsnum; j++)
3318 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3319 if (j != setsnum)
3320 continue; /* at least one set does not contain the member */
3321 ele = dictGetEntryKey(de);
3322 if (!dstkey) {
3323 addReplyBulkLen(c,ele);
3324 addReply(c,ele);
3325 addReply(c,shared.crlf);
3326 cardinality++;
3327 } else {
3328 dictAdd(dstset->ptr,ele,NULL);
3329 incrRefCount(ele);
3330 }
3331 }
3332 dictReleaseIterator(di);
3333
3334 if (dstkey) {
3335 /* Store the resulting set into the target */
3336 deleteKey(c->db,dstkey);
3337 dictAdd(c->db->dict,dstkey,dstset);
3338 incrRefCount(dstkey);
3339 }
3340
3341 if (!dstkey) {
3342 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3343 } else {
3344 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3345 dictSize((dict*)dstset->ptr)));
3346 server.dirty++;
3347 }
3348 zfree(dv);
3349 }
3350
3351 static void sinterCommand(redisClient *c) {
3352 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3353 }
3354
3355 static void sinterstoreCommand(redisClient *c) {
3356 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3357 }
3358
3359 #define REDIS_OP_UNION 0
3360 #define REDIS_OP_DIFF 1
3361
3362 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3363 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3364 dictIterator *di;
3365 dictEntry *de;
3366 robj *dstset = NULL;
3367 int j, cardinality = 0;
3368
3369 if (!dv) oom("sunionDiffGenericCommand");
3370 for (j = 0; j < setsnum; j++) {
3371 robj *setobj;
3372
3373 setobj = dstkey ?
3374 lookupKeyWrite(c->db,setskeys[j]) :
3375 lookupKeyRead(c->db,setskeys[j]);
3376 if (!setobj) {
3377 dv[j] = NULL;
3378 continue;
3379 }
3380 if (setobj->type != REDIS_SET) {
3381 zfree(dv);
3382 addReply(c,shared.wrongtypeerr);
3383 return;
3384 }
3385 dv[j] = setobj->ptr;
3386 }
3387
3388 /* We need a temp set object to store our union. If the dstkey
3389 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3390 * this set object will be the resulting object to set into the target key*/
3391 dstset = createSetObject();
3392
3393 /* Iterate all the elements of all the sets, add every element a single
3394 * time to the result set */
3395 for (j = 0; j < setsnum; j++) {
3396 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3397 if (!dv[j]) continue; /* non existing keys are like empty sets */
3398
3399 di = dictGetIterator(dv[j]);
3400 if (!di) oom("dictGetIterator");
3401
3402 while((de = dictNext(di)) != NULL) {
3403 robj *ele;
3404
3405 /* dictAdd will not add the same element multiple times */
3406 ele = dictGetEntryKey(de);
3407 if (op == REDIS_OP_UNION || j == 0) {
3408 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3409 incrRefCount(ele);
3410 cardinality++;
3411 }
3412 } else if (op == REDIS_OP_DIFF) {
3413 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3414 cardinality--;
3415 }
3416 }
3417 }
3418 dictReleaseIterator(di);
3419
3420 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3421 }
3422
3423 /* Output the content of the resulting set, if not in STORE mode */
3424 if (!dstkey) {
3425 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3426 di = dictGetIterator(dstset->ptr);
3427 if (!di) oom("dictGetIterator");
3428 while((de = dictNext(di)) != NULL) {
3429 robj *ele;
3430
3431 ele = dictGetEntryKey(de);
3432 addReplyBulkLen(c,ele);
3433 addReply(c,ele);
3434 addReply(c,shared.crlf);
3435 }
3436 dictReleaseIterator(di);
3437 } else {
3438 /* If we have a target key where to store the resulting set
3439 * create this key with the result set inside */
3440 deleteKey(c->db,dstkey);
3441 dictAdd(c->db->dict,dstkey,dstset);
3442 incrRefCount(dstkey);
3443 }
3444
3445 /* Cleanup */
3446 if (!dstkey) {
3447 decrRefCount(dstset);
3448 } else {
3449 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3450 dictSize((dict*)dstset->ptr)));
3451 server.dirty++;
3452 }
3453 zfree(dv);
3454 }
3455
3456 static void sunionCommand(redisClient *c) {
3457 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3458 }
3459
3460 static void sunionstoreCommand(redisClient *c) {
3461 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3462 }
3463
3464 static void sdiffCommand(redisClient *c) {
3465 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3466 }
3467
3468 static void sdiffstoreCommand(redisClient *c) {
3469 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3470 }
3471
3472 static void flushdbCommand(redisClient *c) {
3473 server.dirty += dictSize(c->db->dict);
3474 dictEmpty(c->db->dict);
3475 dictEmpty(c->db->expires);
3476 addReply(c,shared.ok);
3477 }
3478
3479 static void flushallCommand(redisClient *c) {
3480 server.dirty += emptyDb();
3481 addReply(c,shared.ok);
3482 rdbSave(server.dbfilename);
3483 server.dirty++;
3484 }
3485
3486 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3487 redisSortOperation *so = zmalloc(sizeof(*so));
3488 if (!so) oom("createSortOperation");
3489 so->type = type;
3490 so->pattern = pattern;
3491 return so;
3492 }
3493
3494 /* Return the value associated to the key with a name obtained
3495 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3496 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3497 char *p;
3498 sds spat, ssub;
3499 robj keyobj;
3500 int prefixlen, sublen, postfixlen;
3501 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3502 struct {
3503 long len;
3504 long free;
3505 char buf[REDIS_SORTKEY_MAX+1];
3506 } keyname;
3507
3508 if (subst->encoding == REDIS_ENCODING_RAW)
3509 incrRefCount(subst);
3510 else {
3511 subst = getDecodedObject(subst);
3512 }
3513
3514 spat = pattern->ptr;
3515 ssub = subst->ptr;
3516 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3517 p = strchr(spat,'*');
3518 if (!p) return NULL;
3519
3520 prefixlen = p-spat;
3521 sublen = sdslen(ssub);
3522 postfixlen = sdslen(spat)-(prefixlen+1);
3523 memcpy(keyname.buf,spat,prefixlen);
3524 memcpy(keyname.buf+prefixlen,ssub,sublen);
3525 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3526 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3527 keyname.len = prefixlen+sublen+postfixlen;
3528
3529 keyobj.refcount = 1;
3530 keyobj.type = REDIS_STRING;
3531 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3532
3533 decrRefCount(subst);
3534
3535 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3536 return lookupKeyRead(db,&keyobj);
3537 }
3538
3539 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3540 * the additional parameter is not standard but a BSD-specific we have to
3541 * pass sorting parameters via the global 'server' structure */
3542 static int sortCompare(const void *s1, const void *s2) {
3543 const redisSortObject *so1 = s1, *so2 = s2;
3544 int cmp;
3545
3546 if (!server.sort_alpha) {
3547 /* Numeric sorting. Here it's trivial as we precomputed scores */
3548 if (so1->u.score > so2->u.score) {
3549 cmp = 1;
3550 } else if (so1->u.score < so2->u.score) {
3551 cmp = -1;
3552 } else {
3553 cmp = 0;
3554 }
3555 } else {
3556 /* Alphanumeric sorting */
3557 if (server.sort_bypattern) {
3558 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3559 /* At least one compare object is NULL */
3560 if (so1->u.cmpobj == so2->u.cmpobj)
3561 cmp = 0;
3562 else if (so1->u.cmpobj == NULL)
3563 cmp = -1;
3564 else
3565 cmp = 1;
3566 } else {
3567 /* We have both the objects, use strcoll */
3568 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3569 }
3570 } else {
3571 /* Compare elements directly */
3572 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
3573 so2->obj->encoding == REDIS_ENCODING_RAW) {
3574 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3575 } else {
3576 robj *dec1, *dec2;
3577
3578 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
3579 so1->obj : getDecodedObject(so1->obj);
3580 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
3581 so2->obj : getDecodedObject(so2->obj);
3582 cmp = strcoll(dec1->ptr,dec2->ptr);
3583 if (dec1 != so1->obj) decrRefCount(dec1);
3584 if (dec2 != so2->obj) decrRefCount(dec2);
3585 }
3586 }
3587 }
3588 return server.sort_desc ? -cmp : cmp;
3589 }
3590
3591 /* The SORT command is the most complex command in Redis. Warning: this code
3592 * is optimized for speed and a bit less for readability */
3593 static void sortCommand(redisClient *c) {
3594 list *operations;
3595 int outputlen = 0;
3596 int desc = 0, alpha = 0;
3597 int limit_start = 0, limit_count = -1, start, end;
3598 int j, dontsort = 0, vectorlen;
3599 int getop = 0; /* GET operation counter */
3600 robj *sortval, *sortby = NULL;
3601 redisSortObject *vector; /* Resulting vector to sort */
3602
3603 /* Lookup the key to sort. It must be of the right types */
3604 sortval = lookupKeyRead(c->db,c->argv[1]);
3605 if (sortval == NULL) {
3606 addReply(c,shared.nokeyerr);
3607 return;
3608 }
3609 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3610 addReply(c,shared.wrongtypeerr);
3611 return;
3612 }
3613
3614 /* Create a list of operations to perform for every sorted element.
3615 * Operations can be GET/DEL/INCR/DECR */
3616 operations = listCreate();
3617 listSetFreeMethod(operations,zfree);
3618 j = 2;
3619
3620 /* Now we need to protect sortval incrementing its count, in the future
3621 * SORT may have options able to overwrite/delete keys during the sorting
3622 * and the sorted key itself may get destroied */
3623 incrRefCount(sortval);
3624
3625 /* The SORT command has an SQL-alike syntax, parse it */
3626 while(j < c->argc) {
3627 int leftargs = c->argc-j-1;
3628 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3629 desc = 0;
3630 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3631 desc = 1;
3632 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3633 alpha = 1;
3634 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3635 limit_start = atoi(c->argv[j+1]->ptr);
3636 limit_count = atoi(c->argv[j+2]->ptr);
3637 j+=2;
3638 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3639 sortby = c->argv[j+1];
3640 /* If the BY pattern does not contain '*', i.e. it is constant,
3641 * we don't need to sort nor to lookup the weight keys. */
3642 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3643 j++;
3644 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3645 listAddNodeTail(operations,createSortOperation(
3646 REDIS_SORT_GET,c->argv[j+1]));
3647 getop++;
3648 j++;
3649 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3650 listAddNodeTail(operations,createSortOperation(
3651 REDIS_SORT_DEL,c->argv[j+1]));
3652 j++;
3653 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3654 listAddNodeTail(operations,createSortOperation(
3655 REDIS_SORT_INCR,c->argv[j+1]));
3656 j++;
3657 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3658 listAddNodeTail(operations,createSortOperation(
3659 REDIS_SORT_DECR,c->argv[j+1]));
3660 j++;
3661 } else {
3662 decrRefCount(sortval);
3663 listRelease(operations);
3664 addReply(c,shared.syntaxerr);
3665 return;
3666 }
3667 j++;
3668 }
3669
3670 /* Load the sorting vector with all the objects to sort */
3671 vectorlen = (sortval->type == REDIS_LIST) ?
3672 listLength((list*)sortval->ptr) :
3673 dictSize((dict*)sortval->ptr);
3674 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3675 if (!vector) oom("allocating objects vector for SORT");
3676 j = 0;
3677 if (sortval->type == REDIS_LIST) {
3678 list *list = sortval->ptr;
3679 listNode *ln;
3680
3681 listRewind(list);
3682 while((ln = listYield(list))) {
3683 robj *ele = ln->value;
3684 vector[j].obj = ele;
3685 vector[j].u.score = 0;
3686 vector[j].u.cmpobj = NULL;
3687 j++;
3688 }
3689 } else {
3690 dict *set = sortval->ptr;
3691 dictIterator *di;
3692 dictEntry *setele;
3693
3694 di = dictGetIterator(set);
3695 if (!di) oom("dictGetIterator");
3696 while((setele = dictNext(di)) != NULL) {
3697 vector[j].obj = dictGetEntryKey(setele);
3698 vector[j].u.score = 0;
3699 vector[j].u.cmpobj = NULL;
3700 j++;
3701 }
3702 dictReleaseIterator(di);
3703 }
3704 assert(j == vectorlen);
3705
3706 /* Now it's time to load the right scores in the sorting vector */
3707 if (dontsort == 0) {
3708 for (j = 0; j < vectorlen; j++) {
3709 if (sortby) {
3710 robj *byval;
3711
3712 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3713 if (!byval || byval->type != REDIS_STRING) continue;
3714 if (alpha) {
3715 if (byval->encoding == REDIS_ENCODING_RAW) {
3716 vector[j].u.cmpobj = byval;
3717 incrRefCount(byval);
3718 } else {
3719 vector[j].u.cmpobj = getDecodedObject(byval);
3720 }
3721 } else {
3722 if (byval->encoding == REDIS_ENCODING_RAW) {
3723 vector[j].u.score = strtod(byval->ptr,NULL);
3724 } else {
3725 if (byval->encoding == REDIS_ENCODING_INT)
3726 vector[j].u.score = (long)byval->ptr;
3727 else
3728 assert(1 != 1);
3729 }
3730 }
3731 } else {
3732 if (!alpha) {
3733 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
3734 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3735 else {
3736 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
3737 vector[j].u.score = (long) vector[j].obj->ptr;
3738 else
3739 assert(1 != 1);
3740 }
3741 }
3742 }
3743 }
3744 }
3745
3746 /* We are ready to sort the vector... perform a bit of sanity check
3747 * on the LIMIT option too. We'll use a partial version of quicksort. */
3748 start = (limit_start < 0) ? 0 : limit_start;
3749 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3750 if (start >= vectorlen) {
3751 start = vectorlen-1;
3752 end = vectorlen-2;
3753 }
3754 if (end >= vectorlen) end = vectorlen-1;
3755
3756 if (dontsort == 0) {
3757 server.sort_desc = desc;
3758 server.sort_alpha = alpha;
3759 server.sort_bypattern = sortby ? 1 : 0;
3760 if (sortby && (start != 0 || end != vectorlen-1))
3761 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3762 else
3763 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3764 }
3765
3766 /* Send command output to the output buffer, performing the specified
3767 * GET/DEL/INCR/DECR operations if any. */
3768 outputlen = getop ? getop*(end-start+1) : end-start+1;
3769 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3770 for (j = start; j <= end; j++) {
3771 listNode *ln;
3772 if (!getop) {
3773 addReplyBulkLen(c,vector[j].obj);
3774 addReply(c,vector[j].obj);
3775 addReply(c,shared.crlf);
3776 }
3777 listRewind(operations);
3778 while((ln = listYield(operations))) {
3779 redisSortOperation *sop = ln->value;
3780 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3781 vector[j].obj);
3782
3783 if (sop->type == REDIS_SORT_GET) {
3784 if (!val || val->type != REDIS_STRING) {
3785 addReply(c,shared.nullbulk);
3786 } else {
3787 addReplyBulkLen(c,val);
3788 addReply(c,val);
3789 addReply(c,shared.crlf);
3790 }
3791 } else if (sop->type == REDIS_SORT_DEL) {
3792 /* TODO */
3793 }
3794 }
3795 }
3796
3797 /* Cleanup */
3798 decrRefCount(sortval);
3799 listRelease(operations);
3800 for (j = 0; j < vectorlen; j++) {
3801 if (sortby && alpha && vector[j].u.cmpobj)
3802 decrRefCount(vector[j].u.cmpobj);
3803 }
3804 zfree(vector);
3805 }
3806
3807 static void infoCommand(redisClient *c) {
3808 sds info;
3809 time_t uptime = time(NULL)-server.stat_starttime;
3810 int j;
3811
3812 info = sdscatprintf(sdsempty(),
3813 "redis_version:%s\r\n"
3814 "uptime_in_seconds:%d\r\n"
3815 "uptime_in_days:%d\r\n"
3816 "connected_clients:%d\r\n"
3817 "connected_slaves:%d\r\n"
3818 "used_memory:%zu\r\n"
3819 "changes_since_last_save:%lld\r\n"
3820 "bgsave_in_progress:%d\r\n"
3821 "last_save_time:%d\r\n"
3822 "total_connections_received:%lld\r\n"
3823 "total_commands_processed:%lld\r\n"
3824 "role:%s\r\n"
3825 ,REDIS_VERSION,
3826 uptime,
3827 uptime/(3600*24),
3828 listLength(server.clients)-listLength(server.slaves),
3829 listLength(server.slaves),
3830 server.usedmemory,
3831 server.dirty,
3832 server.bgsaveinprogress,
3833 server.lastsave,
3834 server.stat_numconnections,
3835 server.stat_numcommands,
3836 server.masterhost == NULL ? "master" : "slave"
3837 );
3838 if (server.masterhost) {
3839 info = sdscatprintf(info,
3840 "master_host:%s\r\n"
3841 "master_port:%d\r\n"
3842 "master_link_status:%s\r\n"
3843 "master_last_io_seconds_ago:%d\r\n"
3844 ,server.masterhost,
3845 server.masterport,
3846 (server.replstate == REDIS_REPL_CONNECTED) ?
3847 "up" : "down",
3848 (int)(time(NULL)-server.master->lastinteraction)
3849 );
3850 }
3851 for (j = 0; j < server.dbnum; j++) {
3852 long long keys, vkeys;
3853
3854 keys = dictSize(server.db[j].dict);
3855 vkeys = dictSize(server.db[j].expires);
3856 if (keys || vkeys) {
3857 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
3858 j, keys, vkeys);
3859 }
3860 }
3861 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3862 addReplySds(c,info);
3863 addReply(c,shared.crlf);
3864 }
3865
3866 static void monitorCommand(redisClient *c) {
3867 /* ignore MONITOR if aleady slave or in monitor mode */
3868 if (c->flags & REDIS_SLAVE) return;
3869
3870 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3871 c->slaveseldb = 0;
3872 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3873 addReply(c,shared.ok);
3874 }
3875
3876 /* ================================= Expire ================================= */
3877 static int removeExpire(redisDb *db, robj *key) {
3878 if (dictDelete(db->expires,key) == DICT_OK) {
3879 return 1;
3880 } else {
3881 return 0;
3882 }
3883 }
3884
3885 static int setExpire(redisDb *db, robj *key, time_t when) {
3886 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3887 return 0;
3888 } else {
3889 incrRefCount(key);
3890 return 1;
3891 }
3892 }
3893
3894 /* Return the expire time of the specified key, or -1 if no expire
3895 * is associated with this key (i.e. the key is non volatile) */
3896 static time_t getExpire(redisDb *db, robj *key) {
3897 dictEntry *de;
3898
3899 /* No expire? return ASAP */
3900 if (dictSize(db->expires) == 0 ||
3901 (de = dictFind(db->expires,key)) == NULL) return -1;
3902
3903 return (time_t) dictGetEntryVal(de);
3904 }
3905
3906 static int expireIfNeeded(redisDb *db, robj *key) {
3907 time_t when;
3908 dictEntry *de;
3909
3910 /* No expire? return ASAP */
3911 if (dictSize(db->expires) == 0 ||
3912 (de = dictFind(db->expires,key)) == NULL) return 0;
3913
3914 /* Lookup the expire */
3915 when = (time_t) dictGetEntryVal(de);
3916 if (time(NULL) <= when) return 0;
3917
3918 /* Delete the key */
3919 dictDelete(db->expires,key);
3920 return dictDelete(db->dict,key) == DICT_OK;
3921 }
3922
3923 static int deleteIfVolatile(redisDb *db, robj *key) {
3924 dictEntry *de;
3925
3926 /* No expire? return ASAP */
3927 if (dictSize(db->expires) == 0 ||
3928 (de = dictFind(db->expires,key)) == NULL) return 0;
3929
3930 /* Delete the key */
3931 server.dirty++;
3932 dictDelete(db->expires,key);
3933 return dictDelete(db->dict,key) == DICT_OK;
3934 }
3935
3936 static void expireCommand(redisClient *c) {
3937 dictEntry *de;
3938 int seconds = atoi(c->argv[2]->ptr);
3939
3940 de = dictFind(c->db->dict,c->argv[1]);
3941 if (de == NULL) {
3942 addReply(c,shared.czero);
3943 return;
3944 }
3945 if (seconds <= 0) {
3946 addReply(c, shared.czero);
3947 return;
3948 } else {
3949 time_t when = time(NULL)+seconds;
3950 if (setExpire(c->db,c->argv[1],when)) {
3951 addReply(c,shared.cone);
3952 server.dirty++;
3953 } else {
3954 addReply(c,shared.czero);
3955 }
3956 return;
3957 }
3958 }
3959
3960 static void ttlCommand(redisClient *c) {
3961 time_t expire;
3962 int ttl = -1;
3963
3964 expire = getExpire(c->db,c->argv[1]);
3965 if (expire != -1) {
3966 ttl = (int) (expire-time(NULL));
3967 if (ttl < 0) ttl = -1;
3968 }
3969 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3970 }
3971
3972 /* =============================== Replication ============================= */
3973
3974 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3975 ssize_t nwritten, ret = size;
3976 time_t start = time(NULL);
3977
3978 timeout++;
3979 while(size) {
3980 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3981 nwritten = write(fd,ptr,size);
3982 if (nwritten == -1) return -1;
3983 ptr += nwritten;
3984 size -= nwritten;
3985 }
3986 if ((time(NULL)-start) > timeout) {
3987 errno = ETIMEDOUT;
3988 return -1;
3989 }
3990 }
3991 return ret;
3992 }
3993
3994 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3995 ssize_t nread, totread = 0;
3996 time_t start = time(NULL);
3997
3998 timeout++;
3999 while(size) {
4000 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4001 nread = read(fd,ptr,size);
4002 if (nread == -1) return -1;
4003 ptr += nread;
4004 size -= nread;
4005 totread += nread;
4006 }
4007 if ((time(NULL)-start) > timeout) {
4008 errno = ETIMEDOUT;
4009 return -1;
4010 }
4011 }
4012 return totread;
4013 }
4014
4015 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
4016 ssize_t nread = 0;
4017
4018 size--;
4019 while(size) {
4020 char c;
4021
4022 if (syncRead(fd,&c,1,timeout) == -1) return -1;
4023 if (c == '\n') {
4024 *ptr = '\0';
4025 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
4026 return nread;
4027 } else {
4028 *ptr++ = c;
4029 *ptr = '\0';
4030 nread++;
4031 }
4032 }
4033 return nread;
4034 }
4035
4036 static void syncCommand(redisClient *c) {
4037 /* ignore SYNC if aleady slave or in monitor mode */
4038 if (c->flags & REDIS_SLAVE) return;
4039
4040 /* SYNC can't be issued when the server has pending data to send to
4041 * the client about already issued commands. We need a fresh reply
4042 * buffer registering the differences between the BGSAVE and the current
4043 * dataset, so that we can copy to other slaves if needed. */
4044 if (listLength(c->reply) != 0) {
4045 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4046 return;
4047 }
4048
4049 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4050 /* Here we need to check if there is a background saving operation
4051 * in progress, or if it is required to start one */
4052 if (server.bgsaveinprogress) {
4053 /* Ok a background save is in progress. Let's check if it is a good
4054 * one for replication, i.e. if there is another slave that is
4055 * registering differences since the server forked to save */
4056 redisClient *slave;
4057 listNode *ln;
4058
4059 listRewind(server.slaves);
4060 while((ln = listYield(server.slaves))) {
4061 slave = ln->value;
4062 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4063 }
4064 if (ln) {
4065 /* Perfect, the server is already registering differences for
4066 * another slave. Set the right state, and copy the buffer. */
4067 listRelease(c->reply);
4068 c->reply = listDup(slave->reply);
4069 if (!c->reply) oom("listDup copying slave reply list");
4070 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4071 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4072 } else {
4073 /* No way, we need to wait for the next BGSAVE in order to
4074 * register differences */
4075 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4076 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4077 }
4078 } else {
4079 /* Ok we don't have a BGSAVE in progress, let's start one */
4080 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4081 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4082 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4083 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4084 return;
4085 }
4086 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4087 }
4088 c->repldbfd = -1;
4089 c->flags |= REDIS_SLAVE;
4090 c->slaveseldb = 0;
4091 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
4092 return;
4093 }
4094
4095 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4096 redisClient *slave = privdata;
4097 REDIS_NOTUSED(el);
4098 REDIS_NOTUSED(mask);
4099 char buf[REDIS_IOBUF_LEN];
4100 ssize_t nwritten, buflen;
4101
4102 if (slave->repldboff == 0) {
4103 /* Write the bulk write count before to transfer the DB. In theory here
4104 * we don't know how much room there is in the output buffer of the
4105 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4106 * operations) will never be smaller than the few bytes we need. */
4107 sds bulkcount;
4108
4109 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4110 slave->repldbsize);
4111 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4112 {
4113 sdsfree(bulkcount);
4114 freeClient(slave);
4115 return;
4116 }
4117 sdsfree(bulkcount);
4118 }
4119 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4120 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4121 if (buflen <= 0) {
4122 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4123 (buflen == 0) ? "premature EOF" : strerror(errno));
4124 freeClient(slave);
4125 return;
4126 }
4127 if ((nwritten = write(fd,buf,buflen)) == -1) {
4128 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4129 strerror(errno));
4130 freeClient(slave);
4131 return;
4132 }
4133 slave->repldboff += nwritten;
4134 if (slave->repldboff == slave->repldbsize) {
4135 close(slave->repldbfd);
4136 slave->repldbfd = -1;
4137 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4138 slave->replstate = REDIS_REPL_ONLINE;
4139 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4140 sendReplyToClient, slave, NULL) == AE_ERR) {
4141 freeClient(slave);
4142 return;
4143 }
4144 addReplySds(slave,sdsempty());
4145 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4146 }
4147 }
4148
4149 /* This function is called at the end of every backgrond saving.
4150 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4151 * otherwise REDIS_ERR is passed to the function.
4152 *
4153 * The goal of this function is to handle slaves waiting for a successful
4154 * background saving in order to perform non-blocking synchronization. */
4155 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4156 listNode *ln;
4157 int startbgsave = 0;
4158
4159 listRewind(server.slaves);
4160 while((ln = listYield(server.slaves))) {
4161 redisClient *slave = ln->value;
4162
4163 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4164 startbgsave = 1;
4165 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4166 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4167 struct redis_stat buf;
4168
4169 if (bgsaveerr != REDIS_OK) {
4170 freeClient(slave);
4171 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4172 continue;
4173 }
4174 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4175 redis_fstat(slave->repldbfd,&buf) == -1) {
4176 freeClient(slave);
4177 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4178 continue;
4179 }
4180 slave->repldboff = 0;
4181 slave->repldbsize = buf.st_size;
4182 slave->replstate = REDIS_REPL_SEND_BULK;
4183 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4184 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4185 freeClient(slave);
4186 continue;
4187 }
4188 }
4189 }
4190 if (startbgsave) {
4191 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4192 listRewind(server.slaves);
4193 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4194 while((ln = listYield(server.slaves))) {
4195 redisClient *slave = ln->value;
4196
4197 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4198 freeClient(slave);
4199 }
4200 }
4201 }
4202 }
4203
4204 static int syncWithMaster(void) {
4205 char buf[1024], tmpfile[256];
4206 int dumpsize;
4207 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4208 int dfd;
4209
4210 if (fd == -1) {
4211 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4212 strerror(errno));
4213 return REDIS_ERR;
4214 }
4215 /* Issue the SYNC command */
4216 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4217 close(fd);
4218 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4219 strerror(errno));
4220 return REDIS_ERR;
4221 }
4222 /* Read the bulk write count */
4223 if (syncReadLine(fd,buf,1024,3600) == -1) {
4224 close(fd);
4225 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4226 strerror(errno));
4227 return REDIS_ERR;
4228 }
4229 dumpsize = atoi(buf+1);
4230 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4231 /* Read the bulk write data on a temp file */
4232 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4233 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4234 if (dfd == -1) {
4235 close(fd);
4236 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4237 return REDIS_ERR;
4238 }
4239 while(dumpsize) {
4240 int nread, nwritten;
4241
4242 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4243 if (nread == -1) {
4244 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4245 strerror(errno));
4246 close(fd);
4247 close(dfd);
4248 return REDIS_ERR;
4249 }
4250 nwritten = write(dfd,buf,nread);
4251 if (nwritten == -1) {
4252 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4253 close(fd);
4254 close(dfd);
4255 return REDIS_ERR;
4256 }
4257 dumpsize -= nread;
4258 }
4259 close(dfd);
4260 if (rename(tmpfile,server.dbfilename) == -1) {
4261 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4262 unlink(tmpfile);
4263 close(fd);
4264 return REDIS_ERR;
4265 }
4266 emptyDb();
4267 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4268 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4269 close(fd);
4270 return REDIS_ERR;
4271 }
4272 server.master = createClient(fd);
4273 server.master->flags |= REDIS_MASTER;
4274 server.replstate = REDIS_REPL_CONNECTED;
4275 return REDIS_OK;
4276 }
4277
4278 static void slaveofCommand(redisClient *c) {
4279 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4280 !strcasecmp(c->argv[2]->ptr,"one")) {
4281 if (server.masterhost) {
4282 sdsfree(server.masterhost);
4283 server.masterhost = NULL;
4284 if (server.master) freeClient(server.master);
4285 server.replstate = REDIS_REPL_NONE;
4286 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4287 }
4288 } else {
4289 sdsfree(server.masterhost);
4290 server.masterhost = sdsdup(c->argv[1]->ptr);
4291 server.masterport = atoi(c->argv[2]->ptr);
4292 if (server.master) freeClient(server.master);
4293 server.replstate = REDIS_REPL_CONNECT;
4294 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4295 server.masterhost, server.masterport);
4296 }
4297 addReply(c,shared.ok);
4298 }
4299
4300 /* ============================ Maxmemory directive ======================== */
4301
4302 /* This function gets called when 'maxmemory' is set on the config file to limit
4303 * the max memory used by the server, and we are out of memory.
4304 * This function will try to, in order:
4305 *
4306 * - Free objects from the free list
4307 * - Try to remove keys with an EXPIRE set
4308 *
4309 * It is not possible to free enough memory to reach used-memory < maxmemory
4310 * the server will start refusing commands that will enlarge even more the
4311 * memory usage.
4312 */
4313 static void freeMemoryIfNeeded(void) {
4314 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4315 if (listLength(server.objfreelist)) {
4316 robj *o;
4317
4318 listNode *head = listFirst(server.objfreelist);
4319 o = listNodeValue(head);
4320 listDelNode(server.objfreelist,head);
4321 zfree(o);
4322 } else {
4323 int j, k, freed = 0;
4324
4325 for (j = 0; j < server.dbnum; j++) {
4326 int minttl = -1;
4327 robj *minkey = NULL;
4328 struct dictEntry *de;
4329
4330 if (dictSize(server.db[j].expires)) {
4331 freed = 1;
4332 /* From a sample of three keys drop the one nearest to
4333 * the natural expire */
4334 for (k = 0; k < 3; k++) {
4335 time_t t;
4336
4337 de = dictGetRandomKey(server.db[j].expires);
4338 t = (time_t) dictGetEntryVal(de);
4339 if (minttl == -1 || t < minttl) {
4340 minkey = dictGetEntryKey(de);
4341 minttl = t;
4342 }
4343 }
4344 deleteKey(server.db+j,minkey);
4345 }
4346 }
4347 if (!freed) return; /* nothing to free... */
4348 }
4349 }
4350 }
4351
4352 /* ================================= Debugging ============================== */
4353
4354 static void debugCommand(redisClient *c) {
4355 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4356 *((char*)-1) = 'x';
4357 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4358 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4359 robj *key, *val;
4360
4361 if (!de) {
4362 addReply(c,shared.nokeyerr);
4363 return;
4364 }
4365 key = dictGetEntryKey(de);
4366 val = dictGetEntryVal(de);
4367 addReplySds(c,sdscatprintf(sdsempty(),
4368 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4369 key, key->refcount, val, val->refcount, val->encoding));
4370 } else {
4371 addReplySds(c,sdsnew(
4372 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4373 }
4374 }
4375
4376 #ifdef HAVE_BACKTRACE
4377 static struct redisFunctionSym symsTable[] = {
4378 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4379 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4380 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4381 {"freeStringObject", (unsigned long)freeStringObject},
4382 {"freeListObject", (unsigned long)freeListObject},
4383 {"freeSetObject", (unsigned long)freeSetObject},
4384 {"decrRefCount", (unsigned long)decrRefCount},
4385 {"createObject", (unsigned long)createObject},
4386 {"freeClient", (unsigned long)freeClient},
4387 {"rdbLoad", (unsigned long)rdbLoad},
4388 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4389 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4390 {"addReply", (unsigned long)addReply},
4391 {"addReplySds", (unsigned long)addReplySds},
4392 {"incrRefCount", (unsigned long)incrRefCount},
4393 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4394 {"createStringObject", (unsigned long)createStringObject},
4395 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4396 {"syncWithMaster", (unsigned long)syncWithMaster},
4397 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4398 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4399 {"getDecodedObject", (unsigned long)getDecodedObject},
4400 {"removeExpire", (unsigned long)removeExpire},
4401 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4402 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4403 {"deleteKey", (unsigned long)deleteKey},
4404 {"getExpire", (unsigned long)getExpire},
4405 {"setExpire", (unsigned long)setExpire},
4406 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4407 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4408 {"authCommand", (unsigned long)authCommand},
4409 {"pingCommand", (unsigned long)pingCommand},
4410 {"echoCommand", (unsigned long)echoCommand},
4411 {"setCommand", (unsigned long)setCommand},
4412 {"setnxCommand", (unsigned long)setnxCommand},
4413 {"getCommand", (unsigned long)getCommand},
4414 {"delCommand", (unsigned long)delCommand},
4415 {"existsCommand", (unsigned long)existsCommand},
4416 {"incrCommand", (unsigned long)incrCommand},
4417 {"decrCommand", (unsigned long)decrCommand},
4418 {"incrbyCommand", (unsigned long)incrbyCommand},
4419 {"decrbyCommand", (unsigned long)decrbyCommand},
4420 {"selectCommand", (unsigned long)selectCommand},
4421 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4422 {"keysCommand", (unsigned long)keysCommand},
4423 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4424 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4425 {"saveCommand", (unsigned long)saveCommand},
4426 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4427 {"shutdownCommand", (unsigned long)shutdownCommand},
4428 {"moveCommand", (unsigned long)moveCommand},
4429 {"renameCommand", (unsigned long)renameCommand},
4430 {"renamenxCommand", (unsigned long)renamenxCommand},
4431 {"lpushCommand", (unsigned long)lpushCommand},
4432 {"rpushCommand", (unsigned long)rpushCommand},
4433 {"lpopCommand", (unsigned long)lpopCommand},
4434 {"rpopCommand", (unsigned long)rpopCommand},
4435 {"llenCommand", (unsigned long)llenCommand},
4436 {"lindexCommand", (unsigned long)lindexCommand},
4437 {"lrangeCommand", (unsigned long)lrangeCommand},
4438 {"ltrimCommand", (unsigned long)ltrimCommand},
4439 {"typeCommand", (unsigned long)typeCommand},
4440 {"lsetCommand", (unsigned long)lsetCommand},
4441 {"saddCommand", (unsigned long)saddCommand},
4442 {"sremCommand", (unsigned long)sremCommand},
4443 {"smoveCommand", (unsigned long)smoveCommand},
4444 {"sismemberCommand", (unsigned long)sismemberCommand},
4445 {"scardCommand", (unsigned long)scardCommand},
4446 {"spopCommand", (unsigned long)spopCommand},
4447 {"sinterCommand", (unsigned long)sinterCommand},
4448 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4449 {"sunionCommand", (unsigned long)sunionCommand},
4450 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4451 {"sdiffCommand", (unsigned long)sdiffCommand},
4452 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4453 {"syncCommand", (unsigned long)syncCommand},
4454 {"flushdbCommand", (unsigned long)flushdbCommand},
4455 {"flushallCommand", (unsigned long)flushallCommand},
4456 {"sortCommand", (unsigned long)sortCommand},
4457 {"lremCommand", (unsigned long)lremCommand},
4458 {"infoCommand", (unsigned long)infoCommand},
4459 {"mgetCommand", (unsigned long)mgetCommand},
4460 {"monitorCommand", (unsigned long)monitorCommand},
4461 {"expireCommand", (unsigned long)expireCommand},
4462 {"getSetCommand", (unsigned long)getSetCommand},
4463 {"ttlCommand", (unsigned long)ttlCommand},
4464 {"slaveofCommand", (unsigned long)slaveofCommand},
4465 {"debugCommand", (unsigned long)debugCommand},
4466 {"processCommand", (unsigned long)processCommand},
4467 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4468 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4469 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4470 {NULL,0}
4471 };
4472
4473 /* This function try to convert a pointer into a function name. It's used in
4474 * oreder to provide a backtrace under segmentation fault that's able to
4475 * display functions declared as static (otherwise the backtrace is useless). */
4476 static char *findFuncName(void *pointer, unsigned long *offset){
4477 int i, ret = -1;
4478 unsigned long off, minoff = 0;
4479
4480 /* Try to match against the Symbol with the smallest offset */
4481 for (i=0; symsTable[i].pointer; i++) {
4482 unsigned long lp = (unsigned long) pointer;
4483
4484 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4485 off=lp-symsTable[i].pointer;
4486 if (ret < 0 || off < minoff) {
4487 minoff=off;
4488 ret=i;
4489 }
4490 }
4491 }
4492 if (ret == -1) return NULL;
4493 *offset = minoff;
4494 return symsTable[ret].name;
4495 }
4496
4497 static void *getMcontextEip(ucontext_t *uc) {
4498 #if defined(__FreeBSD__)
4499 return (void*) uc->uc_mcontext.mc_eip;
4500 #elif defined(__dietlibc__)
4501 return (void*) uc->uc_mcontext.eip;
4502 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
4503 return (void*) uc->uc_mcontext->__ss.__eip;
4504 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
4505 #ifdef _STRUCT_X86_THREAD_STATE64
4506 return (void*) uc->uc_mcontext->__ss.__rip;
4507 #else
4508 return (void*) uc->uc_mcontext->__ss.__eip;
4509 #endif
4510 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
4511 return (void*) uc->uc_mcontext.gregs[REG_EIP];
4512 #elif defined(__ia64__) /* Linux IA64 */
4513 return (void*) uc->uc_mcontext.sc_ip;
4514 #else
4515 return NULL;
4516 #endif
4517 }
4518
4519 static void segvHandler(int sig, siginfo_t *info, void *secret) {
4520 void *trace[100];
4521 char **messages = NULL;
4522 int i, trace_size = 0;
4523 unsigned long offset=0;
4524 time_t uptime = time(NULL)-server.stat_starttime;
4525 ucontext_t *uc = (ucontext_t*) secret;
4526 REDIS_NOTUSED(info);
4527
4528 redisLog(REDIS_WARNING,
4529 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
4530 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
4531 "redis_version:%s; "
4532 "uptime_in_seconds:%d; "
4533 "connected_clients:%d; "
4534 "connected_slaves:%d; "
4535 "used_memory:%zu; "
4536 "changes_since_last_save:%lld; "
4537 "bgsave_in_progress:%d; "
4538 "last_save_time:%d; "
4539 "total_connections_received:%lld; "
4540 "total_commands_processed:%lld; "
4541 "role:%s;"
4542 ,REDIS_VERSION,
4543 uptime,
4544 listLength(server.clients)-listLength(server.slaves),
4545 listLength(server.slaves),
4546 server.usedmemory,
4547 server.dirty,
4548 server.bgsaveinprogress,
4549 server.lastsave,
4550 server.stat_numconnections,
4551 server.stat_numcommands,
4552 server.masterhost == NULL ? "master" : "slave"
4553 ));
4554
4555 trace_size = backtrace(trace, 100);
4556 /* overwrite sigaction with caller's address */
4557 if (getMcontextEip(uc) != NULL) {
4558 trace[1] = getMcontextEip(uc);
4559 }
4560 messages = backtrace_symbols(trace, trace_size);
4561
4562 for (i=1; i<trace_size; ++i) {
4563 char *fn = findFuncName(trace[i], &offset), *p;
4564
4565 p = strchr(messages[i],'+');
4566 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
4567 redisLog(REDIS_WARNING,"%s", messages[i]);
4568 } else {
4569 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
4570 }
4571 }
4572 free(messages);
4573 exit(0);
4574 }
4575
4576 static void setupSigSegvAction(void) {
4577 struct sigaction act;
4578
4579 sigemptyset (&act.sa_mask);
4580 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
4581 * is used. Otherwise, sa_handler is used */
4582 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4583 act.sa_sigaction = segvHandler;
4584 sigaction (SIGSEGV, &act, NULL);
4585 sigaction (SIGBUS, &act, NULL);
4586 sigaction (SIGFPE, &act, NULL);
4587 sigaction (SIGILL, &act, NULL);
4588 sigaction (SIGBUS, &act, NULL);
4589 return;
4590 }
4591 #else /* HAVE_BACKTRACE */
4592 static void setupSigSegvAction(void) {
4593 }
4594 #endif /* HAVE_BACKTRACE */
4595
4596 /* =================================== Main! ================================ */
4597
4598 #ifdef __linux__
4599 int linuxOvercommitMemoryValue(void) {
4600 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
4601 char buf[64];
4602
4603 if (!fp) return -1;
4604 if (fgets(buf,64,fp) == NULL) {
4605 fclose(fp);
4606 return -1;
4607 }
4608 fclose(fp);
4609
4610 return atoi(buf);
4611 }
4612
4613 void linuxOvercommitMemoryWarning(void) {
4614 if (linuxOvercommitMemoryValue() == 0) {
4615 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
4616 }
4617 }
4618 #endif /* __linux__ */
4619
4620 static void daemonize(void) {
4621 int fd;
4622 FILE *fp;
4623
4624 if (fork() != 0) exit(0); /* parent exits */
4625 setsid(); /* create a new session */
4626
4627 /* Every output goes to /dev/null. If Redis is daemonized but
4628 * the 'logfile' is set to 'stdout' in the configuration file
4629 * it will not log at all. */
4630 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4631 dup2(fd, STDIN_FILENO);
4632 dup2(fd, STDOUT_FILENO);
4633 dup2(fd, STDERR_FILENO);
4634 if (fd > STDERR_FILENO) close(fd);
4635 }
4636 /* Try to write the pid file */
4637 fp = fopen(server.pidfile,"w");
4638 if (fp) {
4639 fprintf(fp,"%d\n",getpid());
4640 fclose(fp);
4641 }
4642 }
4643
4644 int main(int argc, char **argv) {
4645 initServerConfig();
4646 if (argc == 2) {
4647 ResetServerSaveParams();
4648 loadServerConfig(argv[1]);
4649 } else if (argc > 2) {
4650 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4651 exit(1);
4652 } else {
4653 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4654 }
4655 initServer();
4656 if (server.daemonize) daemonize();
4657 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4658 #ifdef __linux__
4659 linuxOvercommitMemoryWarning();
4660 #endif
4661 if (rdbLoad(server.dbfilename) == REDIS_OK)
4662 redisLog(REDIS_NOTICE,"DB loaded from disk");
4663 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4664 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4665 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4666 aeMain(server.el);
4667 aeDeleteEventLoop(server.el);
4668 return 0;
4669 }