]> git.saurik.com Git - redis.git/blob - redis.c
some work on ZADD against existing element (score update), still broken...
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
71 /* Error codes */
72 #define REDIS_OK 0
73 #define REDIS_ERR -1
74
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
88
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
91
92 /* Command flags */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
96 this flags will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short this commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
100
101 /* Object types */
102 #define REDIS_STRING 0
103 #define REDIS_LIST 1
104 #define REDIS_SET 2
105 #define REDIS_ZSET 3
106 #define REDIS_HASH 4
107
108 /* Objects encoding */
109 #define REDIS_ENCODING_RAW 0 /* Raw representation */
110 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111
112 /* Object types only used for dumping to disk */
113 #define REDIS_EXPIRETIME 253
114 #define REDIS_SELECTDB 254
115 #define REDIS_EOF 255
116
117 /* Defines related to the dump file format. To store 32 bits lengths for short
118 * keys requires a lot of space, so we check the most significant 2 bits of
119 * the first byte to interpreter the length:
120 *
121 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
122 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
123 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
124 * 11|000000 this means: specially encoded object will follow. The six bits
125 * number specify the kind of object that follows.
126 * See the REDIS_RDB_ENC_* defines.
127 *
128 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
129 * values, will fit inside. */
130 #define REDIS_RDB_6BITLEN 0
131 #define REDIS_RDB_14BITLEN 1
132 #define REDIS_RDB_32BITLEN 2
133 #define REDIS_RDB_ENCVAL 3
134 #define REDIS_RDB_LENERR UINT_MAX
135
136 /* When a length of a string object stored on disk has the first two bits
137 * set, the remaining two bits specify a special encoding for the object
138 * accordingly to the following defines: */
139 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
140 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
141 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
142 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
143
144 /* Client flags */
145 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
146 #define REDIS_SLAVE 2 /* This client is a slave server */
147 #define REDIS_MASTER 4 /* This client is a master server */
148 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149
150 /* Slave replication state - slave side */
151 #define REDIS_REPL_NONE 0 /* No active replication */
152 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
153 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154
155 /* Slave replication state - from the point of view of master
156 * Note that in SEND_BULK and ONLINE state the slave receives new updates
157 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
158 * to start the next background saving in order to send updates to it. */
159 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
160 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
161 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
162 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163
164 /* List related stuff */
165 #define REDIS_HEAD 0
166 #define REDIS_TAIL 1
167
168 /* Sort operations */
169 #define REDIS_SORT_GET 0
170 #define REDIS_SORT_DEL 1
171 #define REDIS_SORT_INCR 2
172 #define REDIS_SORT_DECR 3
173 #define REDIS_SORT_ASC 4
174 #define REDIS_SORT_DESC 5
175 #define REDIS_SORTKEY_MAX 1024
176
177 /* Log levels */
178 #define REDIS_DEBUG 0
179 #define REDIS_NOTICE 1
180 #define REDIS_WARNING 2
181
182 /* Anti-warning macro... */
183 #define REDIS_NOTUSED(V) ((void) V)
184
185 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
186 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
187
188 /*================================= Data types ============================== */
189
190 /* A redis object, that is a type able to hold a string / list / set */
191 typedef struct redisObject {
192 void *ptr;
193 unsigned char type;
194 unsigned char encoding;
195 unsigned char notused[2];
196 int refcount;
197 } robj;
198
199 typedef struct redisDb {
200 dict *dict;
201 dict *expires;
202 int id;
203 } redisDb;
204
205 /* With multiplexing we need to take per-clinet state.
206 * Clients are taken in a liked list. */
207 typedef struct redisClient {
208 int fd;
209 redisDb *db;
210 int dictid;
211 sds querybuf;
212 robj **argv, **mbargv;
213 int argc, mbargc;
214 int bulklen; /* bulk read len. -1 if not in bulk read mode */
215 int multibulk; /* multi bulk command format active */
216 list *reply;
217 int sentlen;
218 time_t lastinteraction; /* time of the last interaction, used for timeout */
219 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
220 int slaveseldb; /* slave selected db, if this client is a slave */
221 int authenticated; /* when requirepass is non-NULL */
222 int replstate; /* replication state if this is a slave */
223 int repldbfd; /* replication DB file descriptor */
224 long repldboff; /* replication DB file offset */
225 off_t repldbsize; /* replication DB file size */
226 } redisClient;
227
228 struct saveparam {
229 time_t seconds;
230 int changes;
231 };
232
233 /* Global server state structure */
234 struct redisServer {
235 int port;
236 int fd;
237 redisDb *db;
238 dict *sharingpool;
239 unsigned int sharingpoolsize;
240 long long dirty; /* changes to DB from the last save */
241 list *clients;
242 list *slaves, *monitors;
243 char neterr[ANET_ERR_LEN];
244 aeEventLoop *el;
245 int cronloops; /* number of times the cron function run */
246 list *objfreelist; /* A list of freed objects to avoid malloc() */
247 time_t lastsave; /* Unix time of last save succeeede */
248 size_t usedmemory; /* Used memory in megabytes */
249 /* Fields used only for stats */
250 time_t stat_starttime; /* server start time */
251 long long stat_numcommands; /* number of processed commands */
252 long long stat_numconnections; /* number of connections received */
253 /* Configuration */
254 int verbosity;
255 int glueoutputbuf;
256 int maxidletime;
257 int dbnum;
258 int daemonize;
259 char *pidfile;
260 int bgsaveinprogress;
261 pid_t bgsavechildpid;
262 struct saveparam *saveparams;
263 int saveparamslen;
264 char *logfile;
265 char *bindaddr;
266 char *dbfilename;
267 char *requirepass;
268 int shareobjects;
269 /* Replication related */
270 int isslave;
271 char *masterhost;
272 int masterport;
273 redisClient *master; /* client that is master for this slave */
274 int replstate;
275 unsigned int maxclients;
276 unsigned long maxmemory;
277 /* Sort parameters - qsort_r() is only available under BSD so we
278 * have to take this state global, in order to pass it to sortCompare() */
279 int sort_desc;
280 int sort_alpha;
281 int sort_bypattern;
282 };
283
284 typedef void redisCommandProc(redisClient *c);
285 struct redisCommand {
286 char *name;
287 redisCommandProc *proc;
288 int arity;
289 int flags;
290 };
291
292 struct redisFunctionSym {
293 char *name;
294 unsigned long pointer;
295 };
296
297 typedef struct _redisSortObject {
298 robj *obj;
299 union {
300 double score;
301 robj *cmpobj;
302 } u;
303 } redisSortObject;
304
305 typedef struct _redisSortOperation {
306 int type;
307 robj *pattern;
308 } redisSortOperation;
309
310 /* ZSETs use a specialized version of Skiplists */
311
312 typedef struct zskiplistNode {
313 struct zskiplistNode **forward;
314 double score;
315 robj *obj;
316 } zskiplistNode;
317
318 typedef struct zskiplist {
319 struct zskiplistNode *header;
320 long length;
321 int level;
322 } zskiplist;
323
324 typedef struct zset {
325 dict *dict;
326 zskiplist *zsl;
327 } zset;
328
329 /* Our shared "common" objects */
330
331 struct sharedObjectsStruct {
332 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
333 *colon, *nullbulk, *nullmultibulk,
334 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
335 *outofrangeerr, *plus,
336 *select0, *select1, *select2, *select3, *select4,
337 *select5, *select6, *select7, *select8, *select9;
338 } shared;
339
340 /*================================ Prototypes =============================== */
341
342 static void freeStringObject(robj *o);
343 static void freeListObject(robj *o);
344 static void freeSetObject(robj *o);
345 static void decrRefCount(void *o);
346 static robj *createObject(int type, void *ptr);
347 static void freeClient(redisClient *c);
348 static int rdbLoad(char *filename);
349 static void addReply(redisClient *c, robj *obj);
350 static void addReplySds(redisClient *c, sds s);
351 static void incrRefCount(robj *o);
352 static int rdbSaveBackground(char *filename);
353 static robj *createStringObject(char *ptr, size_t len);
354 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
355 static int syncWithMaster(void);
356 static robj *tryObjectSharing(robj *o);
357 static int tryObjectEncoding(robj *o);
358 static robj *getDecodedObject(const robj *o);
359 static int removeExpire(redisDb *db, robj *key);
360 static int expireIfNeeded(redisDb *db, robj *key);
361 static int deleteIfVolatile(redisDb *db, robj *key);
362 static int deleteKey(redisDb *db, robj *key);
363 static time_t getExpire(redisDb *db, robj *key);
364 static int setExpire(redisDb *db, robj *key, time_t when);
365 static void updateSlavesWaitingBgsave(int bgsaveerr);
366 static void freeMemoryIfNeeded(void);
367 static int processCommand(redisClient *c);
368 static void setupSigSegvAction(void);
369 static void rdbRemoveTempFile(pid_t childpid);
370 static size_t stringObjectLen(robj *o);
371 static void processInputBuffer(redisClient *c);
372 static zskiplist *zslCreate(void);
373 static void zslFree(zskiplist *zsl);
374
375 static void authCommand(redisClient *c);
376 static void pingCommand(redisClient *c);
377 static void echoCommand(redisClient *c);
378 static void setCommand(redisClient *c);
379 static void setnxCommand(redisClient *c);
380 static void getCommand(redisClient *c);
381 static void delCommand(redisClient *c);
382 static void existsCommand(redisClient *c);
383 static void incrCommand(redisClient *c);
384 static void decrCommand(redisClient *c);
385 static void incrbyCommand(redisClient *c);
386 static void decrbyCommand(redisClient *c);
387 static void selectCommand(redisClient *c);
388 static void randomkeyCommand(redisClient *c);
389 static void keysCommand(redisClient *c);
390 static void dbsizeCommand(redisClient *c);
391 static void lastsaveCommand(redisClient *c);
392 static void saveCommand(redisClient *c);
393 static void bgsaveCommand(redisClient *c);
394 static void shutdownCommand(redisClient *c);
395 static void moveCommand(redisClient *c);
396 static void renameCommand(redisClient *c);
397 static void renamenxCommand(redisClient *c);
398 static void lpushCommand(redisClient *c);
399 static void rpushCommand(redisClient *c);
400 static void lpopCommand(redisClient *c);
401 static void rpopCommand(redisClient *c);
402 static void llenCommand(redisClient *c);
403 static void lindexCommand(redisClient *c);
404 static void lrangeCommand(redisClient *c);
405 static void ltrimCommand(redisClient *c);
406 static void typeCommand(redisClient *c);
407 static void lsetCommand(redisClient *c);
408 static void saddCommand(redisClient *c);
409 static void sremCommand(redisClient *c);
410 static void smoveCommand(redisClient *c);
411 static void sismemberCommand(redisClient *c);
412 static void scardCommand(redisClient *c);
413 static void spopCommand(redisClient *c);
414 static void srandmemberCommand(redisClient *c);
415 static void sinterCommand(redisClient *c);
416 static void sinterstoreCommand(redisClient *c);
417 static void sunionCommand(redisClient *c);
418 static void sunionstoreCommand(redisClient *c);
419 static void sdiffCommand(redisClient *c);
420 static void sdiffstoreCommand(redisClient *c);
421 static void syncCommand(redisClient *c);
422 static void flushdbCommand(redisClient *c);
423 static void flushallCommand(redisClient *c);
424 static void sortCommand(redisClient *c);
425 static void lremCommand(redisClient *c);
426 static void infoCommand(redisClient *c);
427 static void mgetCommand(redisClient *c);
428 static void monitorCommand(redisClient *c);
429 static void expireCommand(redisClient *c);
430 static void getsetCommand(redisClient *c);
431 static void ttlCommand(redisClient *c);
432 static void slaveofCommand(redisClient *c);
433 static void debugCommand(redisClient *c);
434 static void msetCommand(redisClient *c);
435 static void msetnxCommand(redisClient *c);
436 static void zaddCommand(redisClient *c);
437 static void zrangeCommand(redisClient *c);
438 static void zlenCommand(redisClient *c);
439
440 /*================================= Globals ================================= */
441
442 /* Global vars */
443 static struct redisServer server; /* server global state */
444 static struct redisCommand cmdTable[] = {
445 {"get",getCommand,2,REDIS_CMD_INLINE},
446 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
447 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
448 {"del",delCommand,-2,REDIS_CMD_INLINE},
449 {"exists",existsCommand,2,REDIS_CMD_INLINE},
450 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
451 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
452 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
453 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
454 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
455 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
456 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
457 {"llen",llenCommand,2,REDIS_CMD_INLINE},
458 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
459 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
460 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
461 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
462 {"lrem",lremCommand,4,REDIS_CMD_BULK},
463 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
464 {"srem",sremCommand,3,REDIS_CMD_BULK},
465 {"smove",smoveCommand,4,REDIS_CMD_BULK},
466 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
467 {"scard",scardCommand,2,REDIS_CMD_INLINE},
468 {"spop",spopCommand,2,REDIS_CMD_INLINE},
469 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
470 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
471 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
472 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
473 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
474 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
475 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
476 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
477 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
478 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
479 {"zlen",zlenCommand,2,REDIS_CMD_INLINE},
480 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
481 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
482 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
483 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
484 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
485 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
486 {"select",selectCommand,2,REDIS_CMD_INLINE},
487 {"move",moveCommand,3,REDIS_CMD_INLINE},
488 {"rename",renameCommand,3,REDIS_CMD_INLINE},
489 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
490 {"expire",expireCommand,3,REDIS_CMD_INLINE},
491 {"keys",keysCommand,2,REDIS_CMD_INLINE},
492 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
493 {"auth",authCommand,2,REDIS_CMD_INLINE},
494 {"ping",pingCommand,1,REDIS_CMD_INLINE},
495 {"echo",echoCommand,2,REDIS_CMD_BULK},
496 {"save",saveCommand,1,REDIS_CMD_INLINE},
497 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
498 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
499 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
500 {"type",typeCommand,2,REDIS_CMD_INLINE},
501 {"sync",syncCommand,1,REDIS_CMD_INLINE},
502 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
503 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
504 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
505 {"info",infoCommand,1,REDIS_CMD_INLINE},
506 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
507 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
508 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
509 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
510 {NULL,NULL,0,0}
511 };
512 /*============================ Utility functions ============================ */
513
514 /* Glob-style pattern matching. */
515 int stringmatchlen(const char *pattern, int patternLen,
516 const char *string, int stringLen, int nocase)
517 {
518 while(patternLen) {
519 switch(pattern[0]) {
520 case '*':
521 while (pattern[1] == '*') {
522 pattern++;
523 patternLen--;
524 }
525 if (patternLen == 1)
526 return 1; /* match */
527 while(stringLen) {
528 if (stringmatchlen(pattern+1, patternLen-1,
529 string, stringLen, nocase))
530 return 1; /* match */
531 string++;
532 stringLen--;
533 }
534 return 0; /* no match */
535 break;
536 case '?':
537 if (stringLen == 0)
538 return 0; /* no match */
539 string++;
540 stringLen--;
541 break;
542 case '[':
543 {
544 int not, match;
545
546 pattern++;
547 patternLen--;
548 not = pattern[0] == '^';
549 if (not) {
550 pattern++;
551 patternLen--;
552 }
553 match = 0;
554 while(1) {
555 if (pattern[0] == '\\') {
556 pattern++;
557 patternLen--;
558 if (pattern[0] == string[0])
559 match = 1;
560 } else if (pattern[0] == ']') {
561 break;
562 } else if (patternLen == 0) {
563 pattern--;
564 patternLen++;
565 break;
566 } else if (pattern[1] == '-' && patternLen >= 3) {
567 int start = pattern[0];
568 int end = pattern[2];
569 int c = string[0];
570 if (start > end) {
571 int t = start;
572 start = end;
573 end = t;
574 }
575 if (nocase) {
576 start = tolower(start);
577 end = tolower(end);
578 c = tolower(c);
579 }
580 pattern += 2;
581 patternLen -= 2;
582 if (c >= start && c <= end)
583 match = 1;
584 } else {
585 if (!nocase) {
586 if (pattern[0] == string[0])
587 match = 1;
588 } else {
589 if (tolower((int)pattern[0]) == tolower((int)string[0]))
590 match = 1;
591 }
592 }
593 pattern++;
594 patternLen--;
595 }
596 if (not)
597 match = !match;
598 if (!match)
599 return 0; /* no match */
600 string++;
601 stringLen--;
602 break;
603 }
604 case '\\':
605 if (patternLen >= 2) {
606 pattern++;
607 patternLen--;
608 }
609 /* fall through */
610 default:
611 if (!nocase) {
612 if (pattern[0] != string[0])
613 return 0; /* no match */
614 } else {
615 if (tolower((int)pattern[0]) != tolower((int)string[0]))
616 return 0; /* no match */
617 }
618 string++;
619 stringLen--;
620 break;
621 }
622 pattern++;
623 patternLen--;
624 if (stringLen == 0) {
625 while(*pattern == '*') {
626 pattern++;
627 patternLen--;
628 }
629 break;
630 }
631 }
632 if (patternLen == 0 && stringLen == 0)
633 return 1;
634 return 0;
635 }
636
637 static void redisLog(int level, const char *fmt, ...) {
638 va_list ap;
639 FILE *fp;
640
641 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
642 if (!fp) return;
643
644 va_start(ap, fmt);
645 if (level >= server.verbosity) {
646 char *c = ".-*";
647 char buf[64];
648 time_t now;
649
650 now = time(NULL);
651 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
652 fprintf(fp,"%s %c ",buf,c[level]);
653 vfprintf(fp, fmt, ap);
654 fprintf(fp,"\n");
655 fflush(fp);
656 }
657 va_end(ap);
658
659 if (server.logfile) fclose(fp);
660 }
661
662 /*====================== Hash table type implementation ==================== */
663
664 /* This is an hash table type that uses the SDS dynamic strings libary as
665 * keys and radis objects as values (objects can hold SDS strings,
666 * lists, sets). */
667
668 static void dictVanillaFree(void *privdata, void *val)
669 {
670 DICT_NOTUSED(privdata);
671 zfree(val);
672 }
673
674 static int sdsDictKeyCompare(void *privdata, const void *key1,
675 const void *key2)
676 {
677 int l1,l2;
678 DICT_NOTUSED(privdata);
679
680 l1 = sdslen((sds)key1);
681 l2 = sdslen((sds)key2);
682 if (l1 != l2) return 0;
683 return memcmp(key1, key2, l1) == 0;
684 }
685
686 static void dictRedisObjectDestructor(void *privdata, void *val)
687 {
688 DICT_NOTUSED(privdata);
689
690 decrRefCount(val);
691 }
692
693 static int dictObjKeyCompare(void *privdata, const void *key1,
694 const void *key2)
695 {
696 const robj *o1 = key1, *o2 = key2;
697 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
698 }
699
700 static unsigned int dictObjHash(const void *key) {
701 const robj *o = key;
702 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
703 }
704
705 static int dictEncObjKeyCompare(void *privdata, const void *key1,
706 const void *key2)
707 {
708 const robj *o1 = key1, *o2 = key2;
709
710 if (o1->encoding == REDIS_ENCODING_RAW &&
711 o2->encoding == REDIS_ENCODING_RAW)
712 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
713 else {
714 robj *dec1, *dec2;
715 int cmp;
716
717 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
718 getDecodedObject(o1) : (robj*)o1;
719 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
720 getDecodedObject(o2) : (robj*)o2;
721 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
722 if (dec1 != o1) decrRefCount(dec1);
723 if (dec2 != o2) decrRefCount(dec2);
724 return cmp;
725 }
726 }
727
728 static unsigned int dictEncObjHash(const void *key) {
729 const robj *o = key;
730
731 if (o->encoding == REDIS_ENCODING_RAW)
732 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
733 else {
734 robj *dec = getDecodedObject(o);
735 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
736 decrRefCount(dec);
737 return hash;
738 }
739 }
740
741 static dictType setDictType = {
742 dictEncObjHash, /* hash function */
743 NULL, /* key dup */
744 NULL, /* val dup */
745 dictEncObjKeyCompare, /* key compare */
746 dictRedisObjectDestructor, /* key destructor */
747 NULL /* val destructor */
748 };
749
750 static dictType zsetDictType = {
751 dictEncObjHash, /* hash function */
752 NULL, /* key dup */
753 NULL, /* val dup */
754 dictEncObjKeyCompare, /* key compare */
755 dictRedisObjectDestructor, /* key destructor */
756 dictVanillaFree /* val destructor */
757 };
758
759 static dictType hashDictType = {
760 dictObjHash, /* hash function */
761 NULL, /* key dup */
762 NULL, /* val dup */
763 dictObjKeyCompare, /* key compare */
764 dictRedisObjectDestructor, /* key destructor */
765 dictRedisObjectDestructor /* val destructor */
766 };
767
768 /* ========================= Random utility functions ======================= */
769
770 /* Redis generally does not try to recover from out of memory conditions
771 * when allocating objects or strings, it is not clear if it will be possible
772 * to report this condition to the client since the networking layer itself
773 * is based on heap allocation for send buffers, so we simply abort.
774 * At least the code will be simpler to read... */
775 static void oom(const char *msg) {
776 fprintf(stderr, "%s: Out of memory\n",msg);
777 fflush(stderr);
778 sleep(1);
779 abort();
780 }
781
782 /* ====================== Redis server networking stuff ===================== */
783 static void closeTimedoutClients(void) {
784 redisClient *c;
785 listNode *ln;
786 time_t now = time(NULL);
787
788 listRewind(server.clients);
789 while ((ln = listYield(server.clients)) != NULL) {
790 c = listNodeValue(ln);
791 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
792 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
793 (now - c->lastinteraction > server.maxidletime)) {
794 redisLog(REDIS_DEBUG,"Closing idle client");
795 freeClient(c);
796 }
797 }
798 }
799
800 static int htNeedsResize(dict *dict) {
801 long long size, used;
802
803 size = dictSlots(dict);
804 used = dictSize(dict);
805 return (size && used && size > DICT_HT_INITIAL_SIZE &&
806 (used*100/size < REDIS_HT_MINFILL));
807 }
808
809 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
810 * we resize the hash table to save memory */
811 static void tryResizeHashTables(void) {
812 int j;
813
814 for (j = 0; j < server.dbnum; j++) {
815 if (htNeedsResize(server.db[j].dict)) {
816 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
817 dictResize(server.db[j].dict);
818 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
819 }
820 if (htNeedsResize(server.db[j].expires))
821 dictResize(server.db[j].expires);
822 }
823 }
824
825 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
826 int j, loops = server.cronloops++;
827 REDIS_NOTUSED(eventLoop);
828 REDIS_NOTUSED(id);
829 REDIS_NOTUSED(clientData);
830
831 /* Update the global state with the amount of used memory */
832 server.usedmemory = zmalloc_used_memory();
833
834 /* Show some info about non-empty databases */
835 for (j = 0; j < server.dbnum; j++) {
836 long long size, used, vkeys;
837
838 size = dictSlots(server.db[j].dict);
839 used = dictSize(server.db[j].dict);
840 vkeys = dictSize(server.db[j].expires);
841 if (!(loops % 5) && (used || vkeys)) {
842 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
843 /* dictPrintStats(server.dict); */
844 }
845 }
846
847 /* We don't want to resize the hash tables while a bacground saving
848 * is in progress: the saving child is created using fork() that is
849 * implemented with a copy-on-write semantic in most modern systems, so
850 * if we resize the HT while there is the saving child at work actually
851 * a lot of memory movements in the parent will cause a lot of pages
852 * copied. */
853 if (!server.bgsaveinprogress) tryResizeHashTables();
854
855 /* Show information about connected clients */
856 if (!(loops % 5)) {
857 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
858 listLength(server.clients)-listLength(server.slaves),
859 listLength(server.slaves),
860 server.usedmemory,
861 dictSize(server.sharingpool));
862 }
863
864 /* Close connections of timedout clients */
865 if (server.maxidletime && !(loops % 10))
866 closeTimedoutClients();
867
868 /* Check if a background saving in progress terminated */
869 if (server.bgsaveinprogress) {
870 int statloc;
871 if (wait4(-1,&statloc,WNOHANG,NULL)) {
872 int exitcode = WEXITSTATUS(statloc);
873 int bysignal = WIFSIGNALED(statloc);
874
875 if (!bysignal && exitcode == 0) {
876 redisLog(REDIS_NOTICE,
877 "Background saving terminated with success");
878 server.dirty = 0;
879 server.lastsave = time(NULL);
880 } else if (!bysignal && exitcode != 0) {
881 redisLog(REDIS_WARNING, "Background saving error");
882 } else {
883 redisLog(REDIS_WARNING,
884 "Background saving terminated by signal");
885 rdbRemoveTempFile(server.bgsavechildpid);
886 }
887 server.bgsaveinprogress = 0;
888 server.bgsavechildpid = -1;
889 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
890 }
891 } else {
892 /* If there is not a background saving in progress check if
893 * we have to save now */
894 time_t now = time(NULL);
895 for (j = 0; j < server.saveparamslen; j++) {
896 struct saveparam *sp = server.saveparams+j;
897
898 if (server.dirty >= sp->changes &&
899 now-server.lastsave > sp->seconds) {
900 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
901 sp->changes, sp->seconds);
902 rdbSaveBackground(server.dbfilename);
903 break;
904 }
905 }
906 }
907
908 /* Try to expire a few timed out keys */
909 for (j = 0; j < server.dbnum; j++) {
910 redisDb *db = server.db+j;
911 int num = dictSize(db->expires);
912
913 if (num) {
914 time_t now = time(NULL);
915
916 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
917 num = REDIS_EXPIRELOOKUPS_PER_CRON;
918 while (num--) {
919 dictEntry *de;
920 time_t t;
921
922 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
923 t = (time_t) dictGetEntryVal(de);
924 if (now > t) {
925 deleteKey(db,dictGetEntryKey(de));
926 }
927 }
928 }
929 }
930
931 /* Check if we should connect to a MASTER */
932 if (server.replstate == REDIS_REPL_CONNECT) {
933 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
934 if (syncWithMaster() == REDIS_OK) {
935 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
936 }
937 }
938 return 1000;
939 }
940
941 static void createSharedObjects(void) {
942 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
943 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
944 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
945 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
946 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
947 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
948 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
949 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
950 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
951 /* no such key */
952 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
953 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
954 "-ERR Operation against a key holding the wrong kind of value\r\n"));
955 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
956 "-ERR no such key\r\n"));
957 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
958 "-ERR syntax error\r\n"));
959 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
960 "-ERR source and destination objects are the same\r\n"));
961 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
962 "-ERR index out of range\r\n"));
963 shared.space = createObject(REDIS_STRING,sdsnew(" "));
964 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
965 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
966 shared.select0 = createStringObject("select 0\r\n",10);
967 shared.select1 = createStringObject("select 1\r\n",10);
968 shared.select2 = createStringObject("select 2\r\n",10);
969 shared.select3 = createStringObject("select 3\r\n",10);
970 shared.select4 = createStringObject("select 4\r\n",10);
971 shared.select5 = createStringObject("select 5\r\n",10);
972 shared.select6 = createStringObject("select 6\r\n",10);
973 shared.select7 = createStringObject("select 7\r\n",10);
974 shared.select8 = createStringObject("select 8\r\n",10);
975 shared.select9 = createStringObject("select 9\r\n",10);
976 }
977
978 static void appendServerSaveParams(time_t seconds, int changes) {
979 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
980 server.saveparams[server.saveparamslen].seconds = seconds;
981 server.saveparams[server.saveparamslen].changes = changes;
982 server.saveparamslen++;
983 }
984
985 static void ResetServerSaveParams() {
986 zfree(server.saveparams);
987 server.saveparams = NULL;
988 server.saveparamslen = 0;
989 }
990
991 static void initServerConfig() {
992 server.dbnum = REDIS_DEFAULT_DBNUM;
993 server.port = REDIS_SERVERPORT;
994 server.verbosity = REDIS_DEBUG;
995 server.maxidletime = REDIS_MAXIDLETIME;
996 server.saveparams = NULL;
997 server.logfile = NULL; /* NULL = log on standard output */
998 server.bindaddr = NULL;
999 server.glueoutputbuf = 1;
1000 server.daemonize = 0;
1001 server.pidfile = "/var/run/redis.pid";
1002 server.dbfilename = "dump.rdb";
1003 server.requirepass = NULL;
1004 server.shareobjects = 0;
1005 server.sharingpoolsize = 1024;
1006 server.maxclients = 0;
1007 server.maxmemory = 0;
1008 ResetServerSaveParams();
1009
1010 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1011 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1012 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1013 /* Replication related */
1014 server.isslave = 0;
1015 server.masterhost = NULL;
1016 server.masterport = 6379;
1017 server.master = NULL;
1018 server.replstate = REDIS_REPL_NONE;
1019 }
1020
1021 static void initServer() {
1022 int j;
1023
1024 signal(SIGHUP, SIG_IGN);
1025 signal(SIGPIPE, SIG_IGN);
1026 setupSigSegvAction();
1027
1028 server.clients = listCreate();
1029 server.slaves = listCreate();
1030 server.monitors = listCreate();
1031 server.objfreelist = listCreate();
1032 createSharedObjects();
1033 server.el = aeCreateEventLoop();
1034 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1035 server.sharingpool = dictCreate(&setDictType,NULL);
1036 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1037 if (server.fd == -1) {
1038 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1039 exit(1);
1040 }
1041 for (j = 0; j < server.dbnum; j++) {
1042 server.db[j].dict = dictCreate(&hashDictType,NULL);
1043 server.db[j].expires = dictCreate(&setDictType,NULL);
1044 server.db[j].id = j;
1045 }
1046 server.cronloops = 0;
1047 server.bgsaveinprogress = 0;
1048 server.bgsavechildpid = -1;
1049 server.lastsave = time(NULL);
1050 server.dirty = 0;
1051 server.usedmemory = 0;
1052 server.stat_numcommands = 0;
1053 server.stat_numconnections = 0;
1054 server.stat_starttime = time(NULL);
1055 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1056 }
1057
1058 /* Empty the whole database */
1059 static long long emptyDb() {
1060 int j;
1061 long long removed = 0;
1062
1063 for (j = 0; j < server.dbnum; j++) {
1064 removed += dictSize(server.db[j].dict);
1065 dictEmpty(server.db[j].dict);
1066 dictEmpty(server.db[j].expires);
1067 }
1068 return removed;
1069 }
1070
1071 static int yesnotoi(char *s) {
1072 if (!strcasecmp(s,"yes")) return 1;
1073 else if (!strcasecmp(s,"no")) return 0;
1074 else return -1;
1075 }
1076
1077 /* I agree, this is a very rudimental way to load a configuration...
1078 will improve later if the config gets more complex */
1079 static void loadServerConfig(char *filename) {
1080 FILE *fp;
1081 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1082 int linenum = 0;
1083 sds line = NULL;
1084
1085 if (filename[0] == '-' && filename[1] == '\0')
1086 fp = stdin;
1087 else {
1088 if ((fp = fopen(filename,"r")) == NULL) {
1089 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1090 exit(1);
1091 }
1092 }
1093
1094 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1095 sds *argv;
1096 int argc, j;
1097
1098 linenum++;
1099 line = sdsnew(buf);
1100 line = sdstrim(line," \t\r\n");
1101
1102 /* Skip comments and blank lines*/
1103 if (line[0] == '#' || line[0] == '\0') {
1104 sdsfree(line);
1105 continue;
1106 }
1107
1108 /* Split into arguments */
1109 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1110 sdstolower(argv[0]);
1111
1112 /* Execute config directives */
1113 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1114 server.maxidletime = atoi(argv[1]);
1115 if (server.maxidletime < 0) {
1116 err = "Invalid timeout value"; goto loaderr;
1117 }
1118 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1119 server.port = atoi(argv[1]);
1120 if (server.port < 1 || server.port > 65535) {
1121 err = "Invalid port"; goto loaderr;
1122 }
1123 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1124 server.bindaddr = zstrdup(argv[1]);
1125 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1126 int seconds = atoi(argv[1]);
1127 int changes = atoi(argv[2]);
1128 if (seconds < 1 || changes < 0) {
1129 err = "Invalid save parameters"; goto loaderr;
1130 }
1131 appendServerSaveParams(seconds,changes);
1132 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1133 if (chdir(argv[1]) == -1) {
1134 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1135 argv[1], strerror(errno));
1136 exit(1);
1137 }
1138 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1139 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1140 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1141 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1142 else {
1143 err = "Invalid log level. Must be one of debug, notice, warning";
1144 goto loaderr;
1145 }
1146 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1147 FILE *logfp;
1148
1149 server.logfile = zstrdup(argv[1]);
1150 if (!strcasecmp(server.logfile,"stdout")) {
1151 zfree(server.logfile);
1152 server.logfile = NULL;
1153 }
1154 if (server.logfile) {
1155 /* Test if we are able to open the file. The server will not
1156 * be able to abort just for this problem later... */
1157 logfp = fopen(server.logfile,"a");
1158 if (logfp == NULL) {
1159 err = sdscatprintf(sdsempty(),
1160 "Can't open the log file: %s", strerror(errno));
1161 goto loaderr;
1162 }
1163 fclose(logfp);
1164 }
1165 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1166 server.dbnum = atoi(argv[1]);
1167 if (server.dbnum < 1) {
1168 err = "Invalid number of databases"; goto loaderr;
1169 }
1170 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1171 server.maxclients = atoi(argv[1]);
1172 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1173 server.maxmemory = strtoll(argv[1], NULL, 10);
1174 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1175 server.masterhost = sdsnew(argv[1]);
1176 server.masterport = atoi(argv[2]);
1177 server.replstate = REDIS_REPL_CONNECT;
1178 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1179 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1180 err = "argument must be 'yes' or 'no'"; goto loaderr;
1181 }
1182 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1183 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1184 err = "argument must be 'yes' or 'no'"; goto loaderr;
1185 }
1186 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1187 server.sharingpoolsize = atoi(argv[1]);
1188 if (server.sharingpoolsize < 1) {
1189 err = "invalid object sharing pool size"; goto loaderr;
1190 }
1191 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1192 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1193 err = "argument must be 'yes' or 'no'"; goto loaderr;
1194 }
1195 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1196 server.requirepass = zstrdup(argv[1]);
1197 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1198 server.pidfile = zstrdup(argv[1]);
1199 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1200 server.dbfilename = zstrdup(argv[1]);
1201 } else {
1202 err = "Bad directive or wrong number of arguments"; goto loaderr;
1203 }
1204 for (j = 0; j < argc; j++)
1205 sdsfree(argv[j]);
1206 zfree(argv);
1207 sdsfree(line);
1208 }
1209 if (fp != stdin) fclose(fp);
1210 return;
1211
1212 loaderr:
1213 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1214 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1215 fprintf(stderr, ">>> '%s'\n", line);
1216 fprintf(stderr, "%s\n", err);
1217 exit(1);
1218 }
1219
1220 static void freeClientArgv(redisClient *c) {
1221 int j;
1222
1223 for (j = 0; j < c->argc; j++)
1224 decrRefCount(c->argv[j]);
1225 for (j = 0; j < c->mbargc; j++)
1226 decrRefCount(c->mbargv[j]);
1227 c->argc = 0;
1228 c->mbargc = 0;
1229 }
1230
1231 static void freeClient(redisClient *c) {
1232 listNode *ln;
1233
1234 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1235 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1236 sdsfree(c->querybuf);
1237 listRelease(c->reply);
1238 freeClientArgv(c);
1239 close(c->fd);
1240 ln = listSearchKey(server.clients,c);
1241 assert(ln != NULL);
1242 listDelNode(server.clients,ln);
1243 if (c->flags & REDIS_SLAVE) {
1244 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1245 close(c->repldbfd);
1246 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1247 ln = listSearchKey(l,c);
1248 assert(ln != NULL);
1249 listDelNode(l,ln);
1250 }
1251 if (c->flags & REDIS_MASTER) {
1252 server.master = NULL;
1253 server.replstate = REDIS_REPL_CONNECT;
1254 }
1255 zfree(c->argv);
1256 zfree(c->mbargv);
1257 zfree(c);
1258 }
1259
1260 static void glueReplyBuffersIfNeeded(redisClient *c) {
1261 int totlen = 0;
1262 listNode *ln;
1263 robj *o;
1264
1265 listRewind(c->reply);
1266 while((ln = listYield(c->reply))) {
1267 o = ln->value;
1268 totlen += sdslen(o->ptr);
1269 /* This optimization makes more sense if we don't have to copy
1270 * too much data */
1271 if (totlen > 1024) return;
1272 }
1273 if (totlen > 0) {
1274 char buf[1024];
1275 int copylen = 0;
1276
1277 listRewind(c->reply);
1278 while((ln = listYield(c->reply))) {
1279 o = ln->value;
1280 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1281 copylen += sdslen(o->ptr);
1282 listDelNode(c->reply,ln);
1283 }
1284 /* Now the output buffer is empty, add the new single element */
1285 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1286 listAddNodeTail(c->reply,o);
1287 }
1288 }
1289
1290 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1291 redisClient *c = privdata;
1292 int nwritten = 0, totwritten = 0, objlen;
1293 robj *o;
1294 REDIS_NOTUSED(el);
1295 REDIS_NOTUSED(mask);
1296
1297 if (server.glueoutputbuf && listLength(c->reply) > 1)
1298 glueReplyBuffersIfNeeded(c);
1299 while(listLength(c->reply)) {
1300 o = listNodeValue(listFirst(c->reply));
1301 objlen = sdslen(o->ptr);
1302
1303 if (objlen == 0) {
1304 listDelNode(c->reply,listFirst(c->reply));
1305 continue;
1306 }
1307
1308 if (c->flags & REDIS_MASTER) {
1309 /* Don't reply to a master */
1310 nwritten = objlen - c->sentlen;
1311 } else {
1312 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1313 if (nwritten <= 0) break;
1314 }
1315 c->sentlen += nwritten;
1316 totwritten += nwritten;
1317 /* If we fully sent the object on head go to the next one */
1318 if (c->sentlen == objlen) {
1319 listDelNode(c->reply,listFirst(c->reply));
1320 c->sentlen = 0;
1321 }
1322 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1323 * bytes, in a single threaded server it's a good idea to server
1324 * other clients as well, even if a very large request comes from
1325 * super fast link that is always able to accept data (in real world
1326 * terms think to 'KEYS *' against the loopback interfae) */
1327 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1328 }
1329 if (nwritten == -1) {
1330 if (errno == EAGAIN) {
1331 nwritten = 0;
1332 } else {
1333 redisLog(REDIS_DEBUG,
1334 "Error writing to client: %s", strerror(errno));
1335 freeClient(c);
1336 return;
1337 }
1338 }
1339 if (totwritten > 0) c->lastinteraction = time(NULL);
1340 if (listLength(c->reply) == 0) {
1341 c->sentlen = 0;
1342 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1343 }
1344 }
1345
1346 static struct redisCommand *lookupCommand(char *name) {
1347 int j = 0;
1348 while(cmdTable[j].name != NULL) {
1349 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1350 j++;
1351 }
1352 return NULL;
1353 }
1354
1355 /* resetClient prepare the client to process the next command */
1356 static void resetClient(redisClient *c) {
1357 freeClientArgv(c);
1358 c->bulklen = -1;
1359 c->multibulk = 0;
1360 }
1361
1362 /* If this function gets called we already read a whole
1363 * command, argments are in the client argv/argc fields.
1364 * processCommand() execute the command or prepare the
1365 * server for a bulk read from the client.
1366 *
1367 * If 1 is returned the client is still alive and valid and
1368 * and other operations can be performed by the caller. Otherwise
1369 * if 0 is returned the client was destroied (i.e. after QUIT). */
1370 static int processCommand(redisClient *c) {
1371 struct redisCommand *cmd;
1372 long long dirty;
1373
1374 /* Free some memory if needed (maxmemory setting) */
1375 if (server.maxmemory) freeMemoryIfNeeded();
1376
1377 /* Handle the multi bulk command type. This is an alternative protocol
1378 * supported by Redis in order to receive commands that are composed of
1379 * multiple binary-safe "bulk" arguments. The latency of processing is
1380 * a bit higher but this allows things like multi-sets, so if this
1381 * protocol is used only for MSET and similar commands this is a big win. */
1382 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1383 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1384 if (c->multibulk <= 0) {
1385 resetClient(c);
1386 return 1;
1387 } else {
1388 decrRefCount(c->argv[c->argc-1]);
1389 c->argc--;
1390 return 1;
1391 }
1392 } else if (c->multibulk) {
1393 if (c->bulklen == -1) {
1394 if (((char*)c->argv[0]->ptr)[0] != '$') {
1395 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1396 resetClient(c);
1397 return 1;
1398 } else {
1399 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1400 decrRefCount(c->argv[0]);
1401 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1402 c->argc--;
1403 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1404 resetClient(c);
1405 return 1;
1406 }
1407 c->argc--;
1408 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1409 return 1;
1410 }
1411 } else {
1412 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1413 c->mbargv[c->mbargc] = c->argv[0];
1414 c->mbargc++;
1415 c->argc--;
1416 c->multibulk--;
1417 if (c->multibulk == 0) {
1418 robj **auxargv;
1419 int auxargc;
1420
1421 /* Here we need to swap the multi-bulk argc/argv with the
1422 * normal argc/argv of the client structure. */
1423 auxargv = c->argv;
1424 c->argv = c->mbargv;
1425 c->mbargv = auxargv;
1426
1427 auxargc = c->argc;
1428 c->argc = c->mbargc;
1429 c->mbargc = auxargc;
1430
1431 /* We need to set bulklen to something different than -1
1432 * in order for the code below to process the command without
1433 * to try to read the last argument of a bulk command as
1434 * a special argument. */
1435 c->bulklen = 0;
1436 /* continue below and process the command */
1437 } else {
1438 c->bulklen = -1;
1439 return 1;
1440 }
1441 }
1442 }
1443 /* -- end of multi bulk commands processing -- */
1444
1445 /* The QUIT command is handled as a special case. Normal command
1446 * procs are unable to close the client connection safely */
1447 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1448 freeClient(c);
1449 return 0;
1450 }
1451 cmd = lookupCommand(c->argv[0]->ptr);
1452 if (!cmd) {
1453 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1454 resetClient(c);
1455 return 1;
1456 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1457 (c->argc < -cmd->arity)) {
1458 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1459 resetClient(c);
1460 return 1;
1461 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1462 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1463 resetClient(c);
1464 return 1;
1465 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1466 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1467
1468 decrRefCount(c->argv[c->argc-1]);
1469 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1470 c->argc--;
1471 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1472 resetClient(c);
1473 return 1;
1474 }
1475 c->argc--;
1476 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1477 /* It is possible that the bulk read is already in the
1478 * buffer. Check this condition and handle it accordingly.
1479 * This is just a fast path, alternative to call processInputBuffer().
1480 * It's a good idea since the code is small and this condition
1481 * happens most of the times. */
1482 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1483 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1484 c->argc++;
1485 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1486 } else {
1487 return 1;
1488 }
1489 }
1490 /* Let's try to share objects on the command arguments vector */
1491 if (server.shareobjects) {
1492 int j;
1493 for(j = 1; j < c->argc; j++)
1494 c->argv[j] = tryObjectSharing(c->argv[j]);
1495 }
1496 /* Let's try to encode the bulk object to save space. */
1497 if (cmd->flags & REDIS_CMD_BULK)
1498 tryObjectEncoding(c->argv[c->argc-1]);
1499
1500 /* Check if the user is authenticated */
1501 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1502 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1503 resetClient(c);
1504 return 1;
1505 }
1506
1507 /* Exec the command */
1508 dirty = server.dirty;
1509 cmd->proc(c);
1510 if (server.dirty-dirty != 0 && listLength(server.slaves))
1511 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1512 if (listLength(server.monitors))
1513 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1514 server.stat_numcommands++;
1515
1516 /* Prepare the client for the next command */
1517 if (c->flags & REDIS_CLOSE) {
1518 freeClient(c);
1519 return 0;
1520 }
1521 resetClient(c);
1522 return 1;
1523 }
1524
1525 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1526 listNode *ln;
1527 int outc = 0, j;
1528 robj **outv;
1529 /* (args*2)+1 is enough room for args, spaces, newlines */
1530 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1531
1532 if (argc <= REDIS_STATIC_ARGS) {
1533 outv = static_outv;
1534 } else {
1535 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1536 }
1537
1538 for (j = 0; j < argc; j++) {
1539 if (j != 0) outv[outc++] = shared.space;
1540 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1541 robj *lenobj;
1542
1543 lenobj = createObject(REDIS_STRING,
1544 sdscatprintf(sdsempty(),"%d\r\n",
1545 stringObjectLen(argv[j])));
1546 lenobj->refcount = 0;
1547 outv[outc++] = lenobj;
1548 }
1549 outv[outc++] = argv[j];
1550 }
1551 outv[outc++] = shared.crlf;
1552
1553 /* Increment all the refcounts at start and decrement at end in order to
1554 * be sure to free objects if there is no slave in a replication state
1555 * able to be feed with commands */
1556 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1557 listRewind(slaves);
1558 while((ln = listYield(slaves))) {
1559 redisClient *slave = ln->value;
1560
1561 /* Don't feed slaves that are still waiting for BGSAVE to start */
1562 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1563
1564 /* Feed all the other slaves, MONITORs and so on */
1565 if (slave->slaveseldb != dictid) {
1566 robj *selectcmd;
1567
1568 switch(dictid) {
1569 case 0: selectcmd = shared.select0; break;
1570 case 1: selectcmd = shared.select1; break;
1571 case 2: selectcmd = shared.select2; break;
1572 case 3: selectcmd = shared.select3; break;
1573 case 4: selectcmd = shared.select4; break;
1574 case 5: selectcmd = shared.select5; break;
1575 case 6: selectcmd = shared.select6; break;
1576 case 7: selectcmd = shared.select7; break;
1577 case 8: selectcmd = shared.select8; break;
1578 case 9: selectcmd = shared.select9; break;
1579 default:
1580 selectcmd = createObject(REDIS_STRING,
1581 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1582 selectcmd->refcount = 0;
1583 break;
1584 }
1585 addReply(slave,selectcmd);
1586 slave->slaveseldb = dictid;
1587 }
1588 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1589 }
1590 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1591 if (outv != static_outv) zfree(outv);
1592 }
1593
1594 static void processInputBuffer(redisClient *c) {
1595 again:
1596 if (c->bulklen == -1) {
1597 /* Read the first line of the query */
1598 char *p = strchr(c->querybuf,'\n');
1599 size_t querylen;
1600
1601 if (p) {
1602 sds query, *argv;
1603 int argc, j;
1604
1605 query = c->querybuf;
1606 c->querybuf = sdsempty();
1607 querylen = 1+(p-(query));
1608 if (sdslen(query) > querylen) {
1609 /* leave data after the first line of the query in the buffer */
1610 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1611 }
1612 *p = '\0'; /* remove "\n" */
1613 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1614 sdsupdatelen(query);
1615
1616 /* Now we can split the query in arguments */
1617 if (sdslen(query) == 0) {
1618 /* Ignore empty query */
1619 sdsfree(query);
1620 return;
1621 }
1622 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1623 sdsfree(query);
1624
1625 if (c->argv) zfree(c->argv);
1626 c->argv = zmalloc(sizeof(robj*)*argc);
1627
1628 for (j = 0; j < argc; j++) {
1629 if (sdslen(argv[j])) {
1630 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1631 c->argc++;
1632 } else {
1633 sdsfree(argv[j]);
1634 }
1635 }
1636 zfree(argv);
1637 /* Execute the command. If the client is still valid
1638 * after processCommand() return and there is something
1639 * on the query buffer try to process the next command. */
1640 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1641 return;
1642 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1643 redisLog(REDIS_DEBUG, "Client protocol error");
1644 freeClient(c);
1645 return;
1646 }
1647 } else {
1648 /* Bulk read handling. Note that if we are at this point
1649 the client already sent a command terminated with a newline,
1650 we are reading the bulk data that is actually the last
1651 argument of the command. */
1652 int qbl = sdslen(c->querybuf);
1653
1654 if (c->bulklen <= qbl) {
1655 /* Copy everything but the final CRLF as final argument */
1656 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1657 c->argc++;
1658 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1659 /* Process the command. If the client is still valid after
1660 * the processing and there is more data in the buffer
1661 * try to parse it. */
1662 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1663 return;
1664 }
1665 }
1666 }
1667
1668 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1669 redisClient *c = (redisClient*) privdata;
1670 char buf[REDIS_IOBUF_LEN];
1671 int nread;
1672 REDIS_NOTUSED(el);
1673 REDIS_NOTUSED(mask);
1674
1675 nread = read(fd, buf, REDIS_IOBUF_LEN);
1676 if (nread == -1) {
1677 if (errno == EAGAIN) {
1678 nread = 0;
1679 } else {
1680 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1681 freeClient(c);
1682 return;
1683 }
1684 } else if (nread == 0) {
1685 redisLog(REDIS_DEBUG, "Client closed connection");
1686 freeClient(c);
1687 return;
1688 }
1689 if (nread) {
1690 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1691 c->lastinteraction = time(NULL);
1692 } else {
1693 return;
1694 }
1695 processInputBuffer(c);
1696 }
1697
1698 static int selectDb(redisClient *c, int id) {
1699 if (id < 0 || id >= server.dbnum)
1700 return REDIS_ERR;
1701 c->db = &server.db[id];
1702 return REDIS_OK;
1703 }
1704
1705 static void *dupClientReplyValue(void *o) {
1706 incrRefCount((robj*)o);
1707 return 0;
1708 }
1709
1710 static redisClient *createClient(int fd) {
1711 redisClient *c = zmalloc(sizeof(*c));
1712
1713 anetNonBlock(NULL,fd);
1714 anetTcpNoDelay(NULL,fd);
1715 if (!c) return NULL;
1716 selectDb(c,0);
1717 c->fd = fd;
1718 c->querybuf = sdsempty();
1719 c->argc = 0;
1720 c->argv = NULL;
1721 c->bulklen = -1;
1722 c->multibulk = 0;
1723 c->mbargc = 0;
1724 c->mbargv = NULL;
1725 c->sentlen = 0;
1726 c->flags = 0;
1727 c->lastinteraction = time(NULL);
1728 c->authenticated = 0;
1729 c->replstate = REDIS_REPL_NONE;
1730 c->reply = listCreate();
1731 listSetFreeMethod(c->reply,decrRefCount);
1732 listSetDupMethod(c->reply,dupClientReplyValue);
1733 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1734 readQueryFromClient, c, NULL) == AE_ERR) {
1735 freeClient(c);
1736 return NULL;
1737 }
1738 listAddNodeTail(server.clients,c);
1739 return c;
1740 }
1741
1742 static void addReply(redisClient *c, robj *obj) {
1743 if (listLength(c->reply) == 0 &&
1744 (c->replstate == REDIS_REPL_NONE ||
1745 c->replstate == REDIS_REPL_ONLINE) &&
1746 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1747 sendReplyToClient, c, NULL) == AE_ERR) return;
1748 if (obj->encoding != REDIS_ENCODING_RAW) {
1749 obj = getDecodedObject(obj);
1750 } else {
1751 incrRefCount(obj);
1752 }
1753 listAddNodeTail(c->reply,obj);
1754 }
1755
1756 static void addReplySds(redisClient *c, sds s) {
1757 robj *o = createObject(REDIS_STRING,s);
1758 addReply(c,o);
1759 decrRefCount(o);
1760 }
1761
1762 static void addReplyBulkLen(redisClient *c, robj *obj) {
1763 size_t len;
1764
1765 if (obj->encoding == REDIS_ENCODING_RAW) {
1766 len = sdslen(obj->ptr);
1767 } else {
1768 long n = (long)obj->ptr;
1769
1770 len = 1;
1771 if (n < 0) {
1772 len++;
1773 n = -n;
1774 }
1775 while((n = n/10) != 0) {
1776 len++;
1777 }
1778 }
1779 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1780 }
1781
1782 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1783 int cport, cfd;
1784 char cip[128];
1785 redisClient *c;
1786 REDIS_NOTUSED(el);
1787 REDIS_NOTUSED(mask);
1788 REDIS_NOTUSED(privdata);
1789
1790 cfd = anetAccept(server.neterr, fd, cip, &cport);
1791 if (cfd == AE_ERR) {
1792 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1793 return;
1794 }
1795 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1796 if ((c = createClient(cfd)) == NULL) {
1797 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1798 close(cfd); /* May be already closed, just ingore errors */
1799 return;
1800 }
1801 /* If maxclient directive is set and this is one client more... close the
1802 * connection. Note that we create the client instead to check before
1803 * for this condition, since now the socket is already set in nonblocking
1804 * mode and we can send an error for free using the Kernel I/O */
1805 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1806 char *err = "-ERR max number of clients reached\r\n";
1807
1808 /* That's a best effort error message, don't check write errors */
1809 (void) write(c->fd,err,strlen(err));
1810 freeClient(c);
1811 return;
1812 }
1813 server.stat_numconnections++;
1814 }
1815
1816 /* ======================= Redis objects implementation ===================== */
1817
1818 static robj *createObject(int type, void *ptr) {
1819 robj *o;
1820
1821 if (listLength(server.objfreelist)) {
1822 listNode *head = listFirst(server.objfreelist);
1823 o = listNodeValue(head);
1824 listDelNode(server.objfreelist,head);
1825 } else {
1826 o = zmalloc(sizeof(*o));
1827 }
1828 o->type = type;
1829 o->encoding = REDIS_ENCODING_RAW;
1830 o->ptr = ptr;
1831 o->refcount = 1;
1832 return o;
1833 }
1834
1835 static robj *createStringObject(char *ptr, size_t len) {
1836 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1837 }
1838
1839 static robj *createListObject(void) {
1840 list *l = listCreate();
1841
1842 listSetFreeMethod(l,decrRefCount);
1843 return createObject(REDIS_LIST,l);
1844 }
1845
1846 static robj *createSetObject(void) {
1847 dict *d = dictCreate(&setDictType,NULL);
1848 return createObject(REDIS_SET,d);
1849 }
1850
1851 static robj *createZsetObject(void) {
1852 zset *zs = zmalloc(sizeof(*zs));
1853
1854 zs->dict = dictCreate(&zsetDictType,NULL);
1855 zs->zsl = zslCreate();
1856 return createObject(REDIS_ZSET,zs);
1857 }
1858
1859 static void freeStringObject(robj *o) {
1860 if (o->encoding == REDIS_ENCODING_RAW) {
1861 sdsfree(o->ptr);
1862 }
1863 }
1864
1865 static void freeListObject(robj *o) {
1866 listRelease((list*) o->ptr);
1867 }
1868
1869 static void freeSetObject(robj *o) {
1870 dictRelease((dict*) o->ptr);
1871 }
1872
1873 static void freeZsetObject(robj *o) {
1874 zset *zs = o->ptr;
1875
1876 dictRelease(zs->dict);
1877 zslFree(zs->zsl);
1878 zfree(zs);
1879 }
1880
1881 static void freeHashObject(robj *o) {
1882 dictRelease((dict*) o->ptr);
1883 }
1884
1885 static void incrRefCount(robj *o) {
1886 o->refcount++;
1887 #ifdef DEBUG_REFCOUNT
1888 if (o->type == REDIS_STRING)
1889 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1890 #endif
1891 }
1892
1893 static void decrRefCount(void *obj) {
1894 robj *o = obj;
1895
1896 #ifdef DEBUG_REFCOUNT
1897 if (o->type == REDIS_STRING)
1898 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1899 #endif
1900 if (--(o->refcount) == 0) {
1901 switch(o->type) {
1902 case REDIS_STRING: freeStringObject(o); break;
1903 case REDIS_LIST: freeListObject(o); break;
1904 case REDIS_SET: freeSetObject(o); break;
1905 case REDIS_ZSET: freeZsetObject(o); break;
1906 case REDIS_HASH: freeHashObject(o); break;
1907 default: assert(0 != 0); break;
1908 }
1909 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1910 !listAddNodeHead(server.objfreelist,o))
1911 zfree(o);
1912 }
1913 }
1914
1915 static robj *lookupKey(redisDb *db, robj *key) {
1916 dictEntry *de = dictFind(db->dict,key);
1917 return de ? dictGetEntryVal(de) : NULL;
1918 }
1919
1920 static robj *lookupKeyRead(redisDb *db, robj *key) {
1921 expireIfNeeded(db,key);
1922 return lookupKey(db,key);
1923 }
1924
1925 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1926 deleteIfVolatile(db,key);
1927 return lookupKey(db,key);
1928 }
1929
1930 static int deleteKey(redisDb *db, robj *key) {
1931 int retval;
1932
1933 /* We need to protect key from destruction: after the first dictDelete()
1934 * it may happen that 'key' is no longer valid if we don't increment
1935 * it's count. This may happen when we get the object reference directly
1936 * from the hash table with dictRandomKey() or dict iterators */
1937 incrRefCount(key);
1938 if (dictSize(db->expires)) dictDelete(db->expires,key);
1939 retval = dictDelete(db->dict,key);
1940 decrRefCount(key);
1941
1942 return retval == DICT_OK;
1943 }
1944
1945 /* Try to share an object against the shared objects pool */
1946 static robj *tryObjectSharing(robj *o) {
1947 struct dictEntry *de;
1948 unsigned long c;
1949
1950 if (o == NULL || server.shareobjects == 0) return o;
1951
1952 assert(o->type == REDIS_STRING);
1953 de = dictFind(server.sharingpool,o);
1954 if (de) {
1955 robj *shared = dictGetEntryKey(de);
1956
1957 c = ((unsigned long) dictGetEntryVal(de))+1;
1958 dictGetEntryVal(de) = (void*) c;
1959 incrRefCount(shared);
1960 decrRefCount(o);
1961 return shared;
1962 } else {
1963 /* Here we are using a stream algorihtm: Every time an object is
1964 * shared we increment its count, everytime there is a miss we
1965 * recrement the counter of a random object. If this object reaches
1966 * zero we remove the object and put the current object instead. */
1967 if (dictSize(server.sharingpool) >=
1968 server.sharingpoolsize) {
1969 de = dictGetRandomKey(server.sharingpool);
1970 assert(de != NULL);
1971 c = ((unsigned long) dictGetEntryVal(de))-1;
1972 dictGetEntryVal(de) = (void*) c;
1973 if (c == 0) {
1974 dictDelete(server.sharingpool,de->key);
1975 }
1976 } else {
1977 c = 0; /* If the pool is empty we want to add this object */
1978 }
1979 if (c == 0) {
1980 int retval;
1981
1982 retval = dictAdd(server.sharingpool,o,(void*)1);
1983 assert(retval == DICT_OK);
1984 incrRefCount(o);
1985 }
1986 return o;
1987 }
1988 }
1989
1990 /* Check if the nul-terminated string 's' can be represented by a long
1991 * (that is, is a number that fits into long without any other space or
1992 * character before or after the digits).
1993 *
1994 * If so, the function returns REDIS_OK and *longval is set to the value
1995 * of the number. Otherwise REDIS_ERR is returned */
1996 static int isStringRepresentableAsLong(sds s, long *longval) {
1997 char buf[32], *endptr;
1998 long value;
1999 int slen;
2000
2001 value = strtol(s, &endptr, 10);
2002 if (endptr[0] != '\0') return REDIS_ERR;
2003 slen = snprintf(buf,32,"%ld",value);
2004
2005 /* If the number converted back into a string is not identical
2006 * then it's not possible to encode the string as integer */
2007 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2008 if (longval) *longval = value;
2009 return REDIS_OK;
2010 }
2011
2012 /* Try to encode a string object in order to save space */
2013 static int tryObjectEncoding(robj *o) {
2014 long value;
2015 sds s = o->ptr;
2016
2017 if (o->encoding != REDIS_ENCODING_RAW)
2018 return REDIS_ERR; /* Already encoded */
2019
2020 /* It's not save to encode shared objects: shared objects can be shared
2021 * everywhere in the "object space" of Redis. Encoded objects can only
2022 * appear as "values" (and not, for instance, as keys) */
2023 if (o->refcount > 1) return REDIS_ERR;
2024
2025 /* Currently we try to encode only strings */
2026 assert(o->type == REDIS_STRING);
2027
2028 /* Check if we can represent this string as a long integer */
2029 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2030
2031 /* Ok, this object can be encoded */
2032 o->encoding = REDIS_ENCODING_INT;
2033 sdsfree(o->ptr);
2034 o->ptr = (void*) value;
2035 return REDIS_OK;
2036 }
2037
2038 /* Get a decoded version of an encoded object (returned as a new object) */
2039 static robj *getDecodedObject(const robj *o) {
2040 robj *dec;
2041
2042 assert(o->encoding != REDIS_ENCODING_RAW);
2043 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2044 char buf[32];
2045
2046 snprintf(buf,32,"%ld",(long)o->ptr);
2047 dec = createStringObject(buf,strlen(buf));
2048 return dec;
2049 } else {
2050 assert(1 != 1);
2051 }
2052 }
2053
2054 static int compareStringObjects(robj *a, robj *b) {
2055 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2056
2057 if (a == b) return 0;
2058 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2059 return (long)a->ptr - (long)b->ptr;
2060 } else {
2061 int retval;
2062
2063 incrRefCount(a);
2064 incrRefCount(b);
2065 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2066 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2067 retval = sdscmp(a->ptr,b->ptr);
2068 decrRefCount(a);
2069 decrRefCount(b);
2070 return retval;
2071 }
2072 }
2073
2074 static size_t stringObjectLen(robj *o) {
2075 assert(o->type == REDIS_STRING);
2076 if (o->encoding == REDIS_ENCODING_RAW) {
2077 return sdslen(o->ptr);
2078 } else {
2079 char buf[32];
2080
2081 return snprintf(buf,32,"%ld",(long)o->ptr);
2082 }
2083 }
2084
2085 /*============================ DB saving/loading ============================ */
2086
2087 static int rdbSaveType(FILE *fp, unsigned char type) {
2088 if (fwrite(&type,1,1,fp) == 0) return -1;
2089 return 0;
2090 }
2091
2092 static int rdbSaveTime(FILE *fp, time_t t) {
2093 int32_t t32 = (int32_t) t;
2094 if (fwrite(&t32,4,1,fp) == 0) return -1;
2095 return 0;
2096 }
2097
2098 /* check rdbLoadLen() comments for more info */
2099 static int rdbSaveLen(FILE *fp, uint32_t len) {
2100 unsigned char buf[2];
2101
2102 if (len < (1<<6)) {
2103 /* Save a 6 bit len */
2104 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2105 if (fwrite(buf,1,1,fp) == 0) return -1;
2106 } else if (len < (1<<14)) {
2107 /* Save a 14 bit len */
2108 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2109 buf[1] = len&0xFF;
2110 if (fwrite(buf,2,1,fp) == 0) return -1;
2111 } else {
2112 /* Save a 32 bit len */
2113 buf[0] = (REDIS_RDB_32BITLEN<<6);
2114 if (fwrite(buf,1,1,fp) == 0) return -1;
2115 len = htonl(len);
2116 if (fwrite(&len,4,1,fp) == 0) return -1;
2117 }
2118 return 0;
2119 }
2120
2121 /* String objects in the form "2391" "-100" without any space and with a
2122 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2123 * encoded as integers to save space */
2124 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2125 long long value;
2126 char *endptr, buf[32];
2127
2128 /* Check if it's possible to encode this value as a number */
2129 value = strtoll(s, &endptr, 10);
2130 if (endptr[0] != '\0') return 0;
2131 snprintf(buf,32,"%lld",value);
2132
2133 /* If the number converted back into a string is not identical
2134 * then it's not possible to encode the string as integer */
2135 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2136
2137 /* Finally check if it fits in our ranges */
2138 if (value >= -(1<<7) && value <= (1<<7)-1) {
2139 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2140 enc[1] = value&0xFF;
2141 return 2;
2142 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2143 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2144 enc[1] = value&0xFF;
2145 enc[2] = (value>>8)&0xFF;
2146 return 3;
2147 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2148 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2149 enc[1] = value&0xFF;
2150 enc[2] = (value>>8)&0xFF;
2151 enc[3] = (value>>16)&0xFF;
2152 enc[4] = (value>>24)&0xFF;
2153 return 5;
2154 } else {
2155 return 0;
2156 }
2157 }
2158
2159 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2160 unsigned int comprlen, outlen;
2161 unsigned char byte;
2162 void *out;
2163
2164 /* We require at least four bytes compression for this to be worth it */
2165 outlen = sdslen(obj->ptr)-4;
2166 if (outlen <= 0) return 0;
2167 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2168 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2169 if (comprlen == 0) {
2170 zfree(out);
2171 return 0;
2172 }
2173 /* Data compressed! Let's save it on disk */
2174 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2175 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2176 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2177 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2178 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2179 zfree(out);
2180 return comprlen;
2181
2182 writeerr:
2183 zfree(out);
2184 return -1;
2185 }
2186
2187 /* Save a string objet as [len][data] on disk. If the object is a string
2188 * representation of an integer value we try to safe it in a special form */
2189 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2190 size_t len;
2191 int enclen;
2192
2193 len = sdslen(obj->ptr);
2194
2195 /* Try integer encoding */
2196 if (len <= 11) {
2197 unsigned char buf[5];
2198 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2199 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2200 return 0;
2201 }
2202 }
2203
2204 /* Try LZF compression - under 20 bytes it's unable to compress even
2205 * aaaaaaaaaaaaaaaaaa so skip it */
2206 if (len > 20) {
2207 int retval;
2208
2209 retval = rdbSaveLzfStringObject(fp,obj);
2210 if (retval == -1) return -1;
2211 if (retval > 0) return 0;
2212 /* retval == 0 means data can't be compressed, save the old way */
2213 }
2214
2215 /* Store verbatim */
2216 if (rdbSaveLen(fp,len) == -1) return -1;
2217 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2218 return 0;
2219 }
2220
2221 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2222 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2223 int retval;
2224 robj *dec;
2225
2226 if (obj->encoding != REDIS_ENCODING_RAW) {
2227 dec = getDecodedObject(obj);
2228 retval = rdbSaveStringObjectRaw(fp,dec);
2229 decrRefCount(dec);
2230 return retval;
2231 } else {
2232 return rdbSaveStringObjectRaw(fp,obj);
2233 }
2234 }
2235
2236 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2237 static int rdbSave(char *filename) {
2238 dictIterator *di = NULL;
2239 dictEntry *de;
2240 FILE *fp;
2241 char tmpfile[256];
2242 int j;
2243 time_t now = time(NULL);
2244
2245 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2246 fp = fopen(tmpfile,"w");
2247 if (!fp) {
2248 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2249 return REDIS_ERR;
2250 }
2251 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2252 for (j = 0; j < server.dbnum; j++) {
2253 redisDb *db = server.db+j;
2254 dict *d = db->dict;
2255 if (dictSize(d) == 0) continue;
2256 di = dictGetIterator(d);
2257 if (!di) {
2258 fclose(fp);
2259 return REDIS_ERR;
2260 }
2261
2262 /* Write the SELECT DB opcode */
2263 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2264 if (rdbSaveLen(fp,j) == -1) goto werr;
2265
2266 /* Iterate this DB writing every entry */
2267 while((de = dictNext(di)) != NULL) {
2268 robj *key = dictGetEntryKey(de);
2269 robj *o = dictGetEntryVal(de);
2270 time_t expiretime = getExpire(db,key);
2271
2272 /* Save the expire time */
2273 if (expiretime != -1) {
2274 /* If this key is already expired skip it */
2275 if (expiretime < now) continue;
2276 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2277 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2278 }
2279 /* Save the key and associated value */
2280 if (rdbSaveType(fp,o->type) == -1) goto werr;
2281 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2282 if (o->type == REDIS_STRING) {
2283 /* Save a string value */
2284 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2285 } else if (o->type == REDIS_LIST) {
2286 /* Save a list value */
2287 list *list = o->ptr;
2288 listNode *ln;
2289
2290 listRewind(list);
2291 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2292 while((ln = listYield(list))) {
2293 robj *eleobj = listNodeValue(ln);
2294
2295 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2296 }
2297 } else if (o->type == REDIS_SET) {
2298 /* Save a set value */
2299 dict *set = o->ptr;
2300 dictIterator *di = dictGetIterator(set);
2301 dictEntry *de;
2302
2303 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2304 while((de = dictNext(di)) != NULL) {
2305 robj *eleobj = dictGetEntryKey(de);
2306
2307 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2308 }
2309 dictReleaseIterator(di);
2310 } else {
2311 assert(0 != 0);
2312 }
2313 }
2314 dictReleaseIterator(di);
2315 }
2316 /* EOF opcode */
2317 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2318
2319 /* Make sure data will not remain on the OS's output buffers */
2320 fflush(fp);
2321 fsync(fileno(fp));
2322 fclose(fp);
2323
2324 /* Use RENAME to make sure the DB file is changed atomically only
2325 * if the generate DB file is ok. */
2326 if (rename(tmpfile,filename) == -1) {
2327 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2328 unlink(tmpfile);
2329 return REDIS_ERR;
2330 }
2331 redisLog(REDIS_NOTICE,"DB saved on disk");
2332 server.dirty = 0;
2333 server.lastsave = time(NULL);
2334 return REDIS_OK;
2335
2336 werr:
2337 fclose(fp);
2338 unlink(tmpfile);
2339 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2340 if (di) dictReleaseIterator(di);
2341 return REDIS_ERR;
2342 }
2343
2344 static int rdbSaveBackground(char *filename) {
2345 pid_t childpid;
2346
2347 if (server.bgsaveinprogress) return REDIS_ERR;
2348 if ((childpid = fork()) == 0) {
2349 /* Child */
2350 close(server.fd);
2351 if (rdbSave(filename) == REDIS_OK) {
2352 exit(0);
2353 } else {
2354 exit(1);
2355 }
2356 } else {
2357 /* Parent */
2358 if (childpid == -1) {
2359 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2360 strerror(errno));
2361 return REDIS_ERR;
2362 }
2363 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2364 server.bgsaveinprogress = 1;
2365 server.bgsavechildpid = childpid;
2366 return REDIS_OK;
2367 }
2368 return REDIS_OK; /* unreached */
2369 }
2370
2371 static void rdbRemoveTempFile(pid_t childpid) {
2372 char tmpfile[256];
2373
2374 snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
2375 unlink(tmpfile);
2376 }
2377
2378 static int rdbLoadType(FILE *fp) {
2379 unsigned char type;
2380 if (fread(&type,1,1,fp) == 0) return -1;
2381 return type;
2382 }
2383
2384 static time_t rdbLoadTime(FILE *fp) {
2385 int32_t t32;
2386 if (fread(&t32,4,1,fp) == 0) return -1;
2387 return (time_t) t32;
2388 }
2389
2390 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2391 * of this file for a description of how this are stored on disk.
2392 *
2393 * isencoded is set to 1 if the readed length is not actually a length but
2394 * an "encoding type", check the above comments for more info */
2395 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2396 unsigned char buf[2];
2397 uint32_t len;
2398
2399 if (isencoded) *isencoded = 0;
2400 if (rdbver == 0) {
2401 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2402 return ntohl(len);
2403 } else {
2404 int type;
2405
2406 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2407 type = (buf[0]&0xC0)>>6;
2408 if (type == REDIS_RDB_6BITLEN) {
2409 /* Read a 6 bit len */
2410 return buf[0]&0x3F;
2411 } else if (type == REDIS_RDB_ENCVAL) {
2412 /* Read a 6 bit len encoding type */
2413 if (isencoded) *isencoded = 1;
2414 return buf[0]&0x3F;
2415 } else if (type == REDIS_RDB_14BITLEN) {
2416 /* Read a 14 bit len */
2417 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2418 return ((buf[0]&0x3F)<<8)|buf[1];
2419 } else {
2420 /* Read a 32 bit len */
2421 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2422 return ntohl(len);
2423 }
2424 }
2425 }
2426
2427 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2428 unsigned char enc[4];
2429 long long val;
2430
2431 if (enctype == REDIS_RDB_ENC_INT8) {
2432 if (fread(enc,1,1,fp) == 0) return NULL;
2433 val = (signed char)enc[0];
2434 } else if (enctype == REDIS_RDB_ENC_INT16) {
2435 uint16_t v;
2436 if (fread(enc,2,1,fp) == 0) return NULL;
2437 v = enc[0]|(enc[1]<<8);
2438 val = (int16_t)v;
2439 } else if (enctype == REDIS_RDB_ENC_INT32) {
2440 uint32_t v;
2441 if (fread(enc,4,1,fp) == 0) return NULL;
2442 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2443 val = (int32_t)v;
2444 } else {
2445 val = 0; /* anti-warning */
2446 assert(0!=0);
2447 }
2448 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2449 }
2450
2451 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2452 unsigned int len, clen;
2453 unsigned char *c = NULL;
2454 sds val = NULL;
2455
2456 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2457 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2458 if ((c = zmalloc(clen)) == NULL) goto err;
2459 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2460 if (fread(c,clen,1,fp) == 0) goto err;
2461 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2462 zfree(c);
2463 return createObject(REDIS_STRING,val);
2464 err:
2465 zfree(c);
2466 sdsfree(val);
2467 return NULL;
2468 }
2469
2470 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2471 int isencoded;
2472 uint32_t len;
2473 sds val;
2474
2475 len = rdbLoadLen(fp,rdbver,&isencoded);
2476 if (isencoded) {
2477 switch(len) {
2478 case REDIS_RDB_ENC_INT8:
2479 case REDIS_RDB_ENC_INT16:
2480 case REDIS_RDB_ENC_INT32:
2481 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2482 case REDIS_RDB_ENC_LZF:
2483 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2484 default:
2485 assert(0!=0);
2486 }
2487 }
2488
2489 if (len == REDIS_RDB_LENERR) return NULL;
2490 val = sdsnewlen(NULL,len);
2491 if (len && fread(val,len,1,fp) == 0) {
2492 sdsfree(val);
2493 return NULL;
2494 }
2495 return tryObjectSharing(createObject(REDIS_STRING,val));
2496 }
2497
2498 static int rdbLoad(char *filename) {
2499 FILE *fp;
2500 robj *keyobj = NULL;
2501 uint32_t dbid;
2502 int type, retval, rdbver;
2503 dict *d = server.db[0].dict;
2504 redisDb *db = server.db+0;
2505 char buf[1024];
2506 time_t expiretime = -1, now = time(NULL);
2507
2508 fp = fopen(filename,"r");
2509 if (!fp) return REDIS_ERR;
2510 if (fread(buf,9,1,fp) == 0) goto eoferr;
2511 buf[9] = '\0';
2512 if (memcmp(buf,"REDIS",5) != 0) {
2513 fclose(fp);
2514 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2515 return REDIS_ERR;
2516 }
2517 rdbver = atoi(buf+5);
2518 if (rdbver > 1) {
2519 fclose(fp);
2520 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2521 return REDIS_ERR;
2522 }
2523 while(1) {
2524 robj *o;
2525
2526 /* Read type. */
2527 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2528 if (type == REDIS_EXPIRETIME) {
2529 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2530 /* We read the time so we need to read the object type again */
2531 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2532 }
2533 if (type == REDIS_EOF) break;
2534 /* Handle SELECT DB opcode as a special case */
2535 if (type == REDIS_SELECTDB) {
2536 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2537 goto eoferr;
2538 if (dbid >= (unsigned)server.dbnum) {
2539 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2540 exit(1);
2541 }
2542 db = server.db+dbid;
2543 d = db->dict;
2544 continue;
2545 }
2546 /* Read key */
2547 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2548
2549 if (type == REDIS_STRING) {
2550 /* Read string value */
2551 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2552 tryObjectEncoding(o);
2553 } else if (type == REDIS_LIST || type == REDIS_SET) {
2554 /* Read list/set value */
2555 uint32_t listlen;
2556
2557 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2558 goto eoferr;
2559 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2560 /* Load every single element of the list/set */
2561 while(listlen--) {
2562 robj *ele;
2563
2564 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2565 tryObjectEncoding(ele);
2566 if (type == REDIS_LIST) {
2567 listAddNodeTail((list*)o->ptr,ele);
2568 } else {
2569 dictAdd((dict*)o->ptr,ele,NULL);
2570 }
2571 }
2572 } else {
2573 assert(0 != 0);
2574 }
2575 /* Add the new object in the hash table */
2576 retval = dictAdd(d,keyobj,o);
2577 if (retval == DICT_ERR) {
2578 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2579 exit(1);
2580 }
2581 /* Set the expire time if needed */
2582 if (expiretime != -1) {
2583 setExpire(db,keyobj,expiretime);
2584 /* Delete this key if already expired */
2585 if (expiretime < now) deleteKey(db,keyobj);
2586 expiretime = -1;
2587 }
2588 keyobj = o = NULL;
2589 }
2590 fclose(fp);
2591 return REDIS_OK;
2592
2593 eoferr: /* unexpected end of file is handled here with a fatal exit */
2594 if (keyobj) decrRefCount(keyobj);
2595 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2596 exit(1);
2597 return REDIS_ERR; /* Just to avoid warning */
2598 }
2599
2600 /*================================== Commands =============================== */
2601
2602 static void authCommand(redisClient *c) {
2603 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2604 c->authenticated = 1;
2605 addReply(c,shared.ok);
2606 } else {
2607 c->authenticated = 0;
2608 addReply(c,shared.err);
2609 }
2610 }
2611
2612 static void pingCommand(redisClient *c) {
2613 addReply(c,shared.pong);
2614 }
2615
2616 static void echoCommand(redisClient *c) {
2617 addReplyBulkLen(c,c->argv[1]);
2618 addReply(c,c->argv[1]);
2619 addReply(c,shared.crlf);
2620 }
2621
2622 /*=================================== Strings =============================== */
2623
2624 static void setGenericCommand(redisClient *c, int nx) {
2625 int retval;
2626
2627 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2628 if (retval == DICT_ERR) {
2629 if (!nx) {
2630 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2631 incrRefCount(c->argv[2]);
2632 } else {
2633 addReply(c,shared.czero);
2634 return;
2635 }
2636 } else {
2637 incrRefCount(c->argv[1]);
2638 incrRefCount(c->argv[2]);
2639 }
2640 server.dirty++;
2641 removeExpire(c->db,c->argv[1]);
2642 addReply(c, nx ? shared.cone : shared.ok);
2643 }
2644
2645 static void setCommand(redisClient *c) {
2646 setGenericCommand(c,0);
2647 }
2648
2649 static void setnxCommand(redisClient *c) {
2650 setGenericCommand(c,1);
2651 }
2652
2653 static void getCommand(redisClient *c) {
2654 robj *o = lookupKeyRead(c->db,c->argv[1]);
2655
2656 if (o == NULL) {
2657 addReply(c,shared.nullbulk);
2658 } else {
2659 if (o->type != REDIS_STRING) {
2660 addReply(c,shared.wrongtypeerr);
2661 } else {
2662 addReplyBulkLen(c,o);
2663 addReply(c,o);
2664 addReply(c,shared.crlf);
2665 }
2666 }
2667 }
2668
2669 static void getsetCommand(redisClient *c) {
2670 getCommand(c);
2671 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2672 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2673 } else {
2674 incrRefCount(c->argv[1]);
2675 }
2676 incrRefCount(c->argv[2]);
2677 server.dirty++;
2678 removeExpire(c->db,c->argv[1]);
2679 }
2680
2681 static void mgetCommand(redisClient *c) {
2682 int j;
2683
2684 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2685 for (j = 1; j < c->argc; j++) {
2686 robj *o = lookupKeyRead(c->db,c->argv[j]);
2687 if (o == NULL) {
2688 addReply(c,shared.nullbulk);
2689 } else {
2690 if (o->type != REDIS_STRING) {
2691 addReply(c,shared.nullbulk);
2692 } else {
2693 addReplyBulkLen(c,o);
2694 addReply(c,o);
2695 addReply(c,shared.crlf);
2696 }
2697 }
2698 }
2699 }
2700
2701 static void incrDecrCommand(redisClient *c, long long incr) {
2702 long long value;
2703 int retval;
2704 robj *o;
2705
2706 o = lookupKeyWrite(c->db,c->argv[1]);
2707 if (o == NULL) {
2708 value = 0;
2709 } else {
2710 if (o->type != REDIS_STRING) {
2711 value = 0;
2712 } else {
2713 char *eptr;
2714
2715 if (o->encoding == REDIS_ENCODING_RAW)
2716 value = strtoll(o->ptr, &eptr, 10);
2717 else if (o->encoding == REDIS_ENCODING_INT)
2718 value = (long)o->ptr;
2719 else
2720 assert(1 != 1);
2721 }
2722 }
2723
2724 value += incr;
2725 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2726 tryObjectEncoding(o);
2727 retval = dictAdd(c->db->dict,c->argv[1],o);
2728 if (retval == DICT_ERR) {
2729 dictReplace(c->db->dict,c->argv[1],o);
2730 removeExpire(c->db,c->argv[1]);
2731 } else {
2732 incrRefCount(c->argv[1]);
2733 }
2734 server.dirty++;
2735 addReply(c,shared.colon);
2736 addReply(c,o);
2737 addReply(c,shared.crlf);
2738 }
2739
2740 static void incrCommand(redisClient *c) {
2741 incrDecrCommand(c,1);
2742 }
2743
2744 static void decrCommand(redisClient *c) {
2745 incrDecrCommand(c,-1);
2746 }
2747
2748 static void incrbyCommand(redisClient *c) {
2749 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2750 incrDecrCommand(c,incr);
2751 }
2752
2753 static void decrbyCommand(redisClient *c) {
2754 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2755 incrDecrCommand(c,-incr);
2756 }
2757
2758 /* ========================= Type agnostic commands ========================= */
2759
2760 static void delCommand(redisClient *c) {
2761 int deleted = 0, j;
2762
2763 for (j = 1; j < c->argc; j++) {
2764 if (deleteKey(c->db,c->argv[j])) {
2765 server.dirty++;
2766 deleted++;
2767 }
2768 }
2769 switch(deleted) {
2770 case 0:
2771 addReply(c,shared.czero);
2772 break;
2773 case 1:
2774 addReply(c,shared.cone);
2775 break;
2776 default:
2777 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2778 break;
2779 }
2780 }
2781
2782 static void existsCommand(redisClient *c) {
2783 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2784 }
2785
2786 static void selectCommand(redisClient *c) {
2787 int id = atoi(c->argv[1]->ptr);
2788
2789 if (selectDb(c,id) == REDIS_ERR) {
2790 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2791 } else {
2792 addReply(c,shared.ok);
2793 }
2794 }
2795
2796 static void randomkeyCommand(redisClient *c) {
2797 dictEntry *de;
2798
2799 while(1) {
2800 de = dictGetRandomKey(c->db->dict);
2801 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2802 }
2803 if (de == NULL) {
2804 addReply(c,shared.plus);
2805 addReply(c,shared.crlf);
2806 } else {
2807 addReply(c,shared.plus);
2808 addReply(c,dictGetEntryKey(de));
2809 addReply(c,shared.crlf);
2810 }
2811 }
2812
2813 static void keysCommand(redisClient *c) {
2814 dictIterator *di;
2815 dictEntry *de;
2816 sds pattern = c->argv[1]->ptr;
2817 int plen = sdslen(pattern);
2818 int numkeys = 0, keyslen = 0;
2819 robj *lenobj = createObject(REDIS_STRING,NULL);
2820
2821 di = dictGetIterator(c->db->dict);
2822 addReply(c,lenobj);
2823 decrRefCount(lenobj);
2824 while((de = dictNext(di)) != NULL) {
2825 robj *keyobj = dictGetEntryKey(de);
2826
2827 sds key = keyobj->ptr;
2828 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2829 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2830 if (expireIfNeeded(c->db,keyobj) == 0) {
2831 if (numkeys != 0)
2832 addReply(c,shared.space);
2833 addReply(c,keyobj);
2834 numkeys++;
2835 keyslen += sdslen(key);
2836 }
2837 }
2838 }
2839 dictReleaseIterator(di);
2840 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2841 addReply(c,shared.crlf);
2842 }
2843
2844 static void dbsizeCommand(redisClient *c) {
2845 addReplySds(c,
2846 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2847 }
2848
2849 static void lastsaveCommand(redisClient *c) {
2850 addReplySds(c,
2851 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2852 }
2853
2854 static void typeCommand(redisClient *c) {
2855 robj *o;
2856 char *type;
2857
2858 o = lookupKeyRead(c->db,c->argv[1]);
2859 if (o == NULL) {
2860 type = "+none";
2861 } else {
2862 switch(o->type) {
2863 case REDIS_STRING: type = "+string"; break;
2864 case REDIS_LIST: type = "+list"; break;
2865 case REDIS_SET: type = "+set"; break;
2866 default: type = "unknown"; break;
2867 }
2868 }
2869 addReplySds(c,sdsnew(type));
2870 addReply(c,shared.crlf);
2871 }
2872
2873 static void saveCommand(redisClient *c) {
2874 if (server.bgsaveinprogress) {
2875 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2876 return;
2877 }
2878 if (rdbSave(server.dbfilename) == REDIS_OK) {
2879 addReply(c,shared.ok);
2880 } else {
2881 addReply(c,shared.err);
2882 }
2883 }
2884
2885 static void bgsaveCommand(redisClient *c) {
2886 if (server.bgsaveinprogress) {
2887 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2888 return;
2889 }
2890 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2891 addReply(c,shared.ok);
2892 } else {
2893 addReply(c,shared.err);
2894 }
2895 }
2896
2897 static void shutdownCommand(redisClient *c) {
2898 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2899 /* Kill the saving child if there is a background saving in progress.
2900 We want to avoid race conditions, for instance our saving child may
2901 overwrite the synchronous saving did by SHUTDOWN. */
2902 if (server.bgsaveinprogress) {
2903 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2904 kill(server.bgsavechildpid,SIGKILL);
2905 rdbRemoveTempFile(server.bgsavechildpid);
2906 }
2907 /* SYNC SAVE */
2908 if (rdbSave(server.dbfilename) == REDIS_OK) {
2909 if (server.daemonize)
2910 unlink(server.pidfile);
2911 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2912 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2913 exit(1);
2914 } else {
2915 /* Ooops.. error saving! The best we can do is to continue operating.
2916 * Note that if there was a background saving process, in the next
2917 * cron() Redis will be notified that the background saving aborted,
2918 * handling special stuff like slaves pending for synchronization... */
2919 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2920 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2921 }
2922 }
2923
2924 static void renameGenericCommand(redisClient *c, int nx) {
2925 robj *o;
2926
2927 /* To use the same key as src and dst is probably an error */
2928 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2929 addReply(c,shared.sameobjecterr);
2930 return;
2931 }
2932
2933 o = lookupKeyWrite(c->db,c->argv[1]);
2934 if (o == NULL) {
2935 addReply(c,shared.nokeyerr);
2936 return;
2937 }
2938 incrRefCount(o);
2939 deleteIfVolatile(c->db,c->argv[2]);
2940 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2941 if (nx) {
2942 decrRefCount(o);
2943 addReply(c,shared.czero);
2944 return;
2945 }
2946 dictReplace(c->db->dict,c->argv[2],o);
2947 } else {
2948 incrRefCount(c->argv[2]);
2949 }
2950 deleteKey(c->db,c->argv[1]);
2951 server.dirty++;
2952 addReply(c,nx ? shared.cone : shared.ok);
2953 }
2954
2955 static void renameCommand(redisClient *c) {
2956 renameGenericCommand(c,0);
2957 }
2958
2959 static void renamenxCommand(redisClient *c) {
2960 renameGenericCommand(c,1);
2961 }
2962
2963 static void moveCommand(redisClient *c) {
2964 robj *o;
2965 redisDb *src, *dst;
2966 int srcid;
2967
2968 /* Obtain source and target DB pointers */
2969 src = c->db;
2970 srcid = c->db->id;
2971 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2972 addReply(c,shared.outofrangeerr);
2973 return;
2974 }
2975 dst = c->db;
2976 selectDb(c,srcid); /* Back to the source DB */
2977
2978 /* If the user is moving using as target the same
2979 * DB as the source DB it is probably an error. */
2980 if (src == dst) {
2981 addReply(c,shared.sameobjecterr);
2982 return;
2983 }
2984
2985 /* Check if the element exists and get a reference */
2986 o = lookupKeyWrite(c->db,c->argv[1]);
2987 if (!o) {
2988 addReply(c,shared.czero);
2989 return;
2990 }
2991
2992 /* Try to add the element to the target DB */
2993 deleteIfVolatile(dst,c->argv[1]);
2994 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2995 addReply(c,shared.czero);
2996 return;
2997 }
2998 incrRefCount(c->argv[1]);
2999 incrRefCount(o);
3000
3001 /* OK! key moved, free the entry in the source DB */
3002 deleteKey(src,c->argv[1]);
3003 server.dirty++;
3004 addReply(c,shared.cone);
3005 }
3006
3007 /* =================================== Lists ================================ */
3008 static void pushGenericCommand(redisClient *c, int where) {
3009 robj *lobj;
3010 list *list;
3011
3012 lobj = lookupKeyWrite(c->db,c->argv[1]);
3013 if (lobj == NULL) {
3014 lobj = createListObject();
3015 list = lobj->ptr;
3016 if (where == REDIS_HEAD) {
3017 listAddNodeHead(list,c->argv[2]);
3018 } else {
3019 listAddNodeTail(list,c->argv[2]);
3020 }
3021 dictAdd(c->db->dict,c->argv[1],lobj);
3022 incrRefCount(c->argv[1]);
3023 incrRefCount(c->argv[2]);
3024 } else {
3025 if (lobj->type != REDIS_LIST) {
3026 addReply(c,shared.wrongtypeerr);
3027 return;
3028 }
3029 list = lobj->ptr;
3030 if (where == REDIS_HEAD) {
3031 listAddNodeHead(list,c->argv[2]);
3032 } else {
3033 listAddNodeTail(list,c->argv[2]);
3034 }
3035 incrRefCount(c->argv[2]);
3036 }
3037 server.dirty++;
3038 addReply(c,shared.ok);
3039 }
3040
3041 static void lpushCommand(redisClient *c) {
3042 pushGenericCommand(c,REDIS_HEAD);
3043 }
3044
3045 static void rpushCommand(redisClient *c) {
3046 pushGenericCommand(c,REDIS_TAIL);
3047 }
3048
3049 static void llenCommand(redisClient *c) {
3050 robj *o;
3051 list *l;
3052
3053 o = lookupKeyRead(c->db,c->argv[1]);
3054 if (o == NULL) {
3055 addReply(c,shared.czero);
3056 return;
3057 } else {
3058 if (o->type != REDIS_LIST) {
3059 addReply(c,shared.wrongtypeerr);
3060 } else {
3061 l = o->ptr;
3062 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3063 }
3064 }
3065 }
3066
3067 static void lindexCommand(redisClient *c) {
3068 robj *o;
3069 int index = atoi(c->argv[2]->ptr);
3070
3071 o = lookupKeyRead(c->db,c->argv[1]);
3072 if (o == NULL) {
3073 addReply(c,shared.nullbulk);
3074 } else {
3075 if (o->type != REDIS_LIST) {
3076 addReply(c,shared.wrongtypeerr);
3077 } else {
3078 list *list = o->ptr;
3079 listNode *ln;
3080
3081 ln = listIndex(list, index);
3082 if (ln == NULL) {
3083 addReply(c,shared.nullbulk);
3084 } else {
3085 robj *ele = listNodeValue(ln);
3086 addReplyBulkLen(c,ele);
3087 addReply(c,ele);
3088 addReply(c,shared.crlf);
3089 }
3090 }
3091 }
3092 }
3093
3094 static void lsetCommand(redisClient *c) {
3095 robj *o;
3096 int index = atoi(c->argv[2]->ptr);
3097
3098 o = lookupKeyWrite(c->db,c->argv[1]);
3099 if (o == NULL) {
3100 addReply(c,shared.nokeyerr);
3101 } else {
3102 if (o->type != REDIS_LIST) {
3103 addReply(c,shared.wrongtypeerr);
3104 } else {
3105 list *list = o->ptr;
3106 listNode *ln;
3107
3108 ln = listIndex(list, index);
3109 if (ln == NULL) {
3110 addReply(c,shared.outofrangeerr);
3111 } else {
3112 robj *ele = listNodeValue(ln);
3113
3114 decrRefCount(ele);
3115 listNodeValue(ln) = c->argv[3];
3116 incrRefCount(c->argv[3]);
3117 addReply(c,shared.ok);
3118 server.dirty++;
3119 }
3120 }
3121 }
3122 }
3123
3124 static void popGenericCommand(redisClient *c, int where) {
3125 robj *o;
3126
3127 o = lookupKeyWrite(c->db,c->argv[1]);
3128 if (o == NULL) {
3129 addReply(c,shared.nullbulk);
3130 } else {
3131 if (o->type != REDIS_LIST) {
3132 addReply(c,shared.wrongtypeerr);
3133 } else {
3134 list *list = o->ptr;
3135 listNode *ln;
3136
3137 if (where == REDIS_HEAD)
3138 ln = listFirst(list);
3139 else
3140 ln = listLast(list);
3141
3142 if (ln == NULL) {
3143 addReply(c,shared.nullbulk);
3144 } else {
3145 robj *ele = listNodeValue(ln);
3146 addReplyBulkLen(c,ele);
3147 addReply(c,ele);
3148 addReply(c,shared.crlf);
3149 listDelNode(list,ln);
3150 server.dirty++;
3151 }
3152 }
3153 }
3154 }
3155
3156 static void lpopCommand(redisClient *c) {
3157 popGenericCommand(c,REDIS_HEAD);
3158 }
3159
3160 static void rpopCommand(redisClient *c) {
3161 popGenericCommand(c,REDIS_TAIL);
3162 }
3163
3164 static void lrangeCommand(redisClient *c) {
3165 robj *o;
3166 int start = atoi(c->argv[2]->ptr);
3167 int end = atoi(c->argv[3]->ptr);
3168
3169 o = lookupKeyRead(c->db,c->argv[1]);
3170 if (o == NULL) {
3171 addReply(c,shared.nullmultibulk);
3172 } else {
3173 if (o->type != REDIS_LIST) {
3174 addReply(c,shared.wrongtypeerr);
3175 } else {
3176 list *list = o->ptr;
3177 listNode *ln;
3178 int llen = listLength(list);
3179 int rangelen, j;
3180 robj *ele;
3181
3182 /* convert negative indexes */
3183 if (start < 0) start = llen+start;
3184 if (end < 0) end = llen+end;
3185 if (start < 0) start = 0;
3186 if (end < 0) end = 0;
3187
3188 /* indexes sanity checks */
3189 if (start > end || start >= llen) {
3190 /* Out of range start or start > end result in empty list */
3191 addReply(c,shared.emptymultibulk);
3192 return;
3193 }
3194 if (end >= llen) end = llen-1;
3195 rangelen = (end-start)+1;
3196
3197 /* Return the result in form of a multi-bulk reply */
3198 ln = listIndex(list, start);
3199 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3200 for (j = 0; j < rangelen; j++) {
3201 ele = listNodeValue(ln);
3202 addReplyBulkLen(c,ele);
3203 addReply(c,ele);
3204 addReply(c,shared.crlf);
3205 ln = ln->next;
3206 }
3207 }
3208 }
3209 }
3210
3211 static void ltrimCommand(redisClient *c) {
3212 robj *o;
3213 int start = atoi(c->argv[2]->ptr);
3214 int end = atoi(c->argv[3]->ptr);
3215
3216 o = lookupKeyWrite(c->db,c->argv[1]);
3217 if (o == NULL) {
3218 addReply(c,shared.nokeyerr);
3219 } else {
3220 if (o->type != REDIS_LIST) {
3221 addReply(c,shared.wrongtypeerr);
3222 } else {
3223 list *list = o->ptr;
3224 listNode *ln;
3225 int llen = listLength(list);
3226 int j, ltrim, rtrim;
3227
3228 /* convert negative indexes */
3229 if (start < 0) start = llen+start;
3230 if (end < 0) end = llen+end;
3231 if (start < 0) start = 0;
3232 if (end < 0) end = 0;
3233
3234 /* indexes sanity checks */
3235 if (start > end || start >= llen) {
3236 /* Out of range start or start > end result in empty list */
3237 ltrim = llen;
3238 rtrim = 0;
3239 } else {
3240 if (end >= llen) end = llen-1;
3241 ltrim = start;
3242 rtrim = llen-end-1;
3243 }
3244
3245 /* Remove list elements to perform the trim */
3246 for (j = 0; j < ltrim; j++) {
3247 ln = listFirst(list);
3248 listDelNode(list,ln);
3249 }
3250 for (j = 0; j < rtrim; j++) {
3251 ln = listLast(list);
3252 listDelNode(list,ln);
3253 }
3254 server.dirty++;
3255 addReply(c,shared.ok);
3256 }
3257 }
3258 }
3259
3260 static void lremCommand(redisClient *c) {
3261 robj *o;
3262
3263 o = lookupKeyWrite(c->db,c->argv[1]);
3264 if (o == NULL) {
3265 addReply(c,shared.czero);
3266 } else {
3267 if (o->type != REDIS_LIST) {
3268 addReply(c,shared.wrongtypeerr);
3269 } else {
3270 list *list = o->ptr;
3271 listNode *ln, *next;
3272 int toremove = atoi(c->argv[2]->ptr);
3273 int removed = 0;
3274 int fromtail = 0;
3275
3276 if (toremove < 0) {
3277 toremove = -toremove;
3278 fromtail = 1;
3279 }
3280 ln = fromtail ? list->tail : list->head;
3281 while (ln) {
3282 robj *ele = listNodeValue(ln);
3283
3284 next = fromtail ? ln->prev : ln->next;
3285 if (compareStringObjects(ele,c->argv[3]) == 0) {
3286 listDelNode(list,ln);
3287 server.dirty++;
3288 removed++;
3289 if (toremove && removed == toremove) break;
3290 }
3291 ln = next;
3292 }
3293 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3294 }
3295 }
3296 }
3297
3298 /* ==================================== Sets ================================ */
3299
3300 static void saddCommand(redisClient *c) {
3301 robj *set;
3302
3303 set = lookupKeyWrite(c->db,c->argv[1]);
3304 if (set == NULL) {
3305 set = createSetObject();
3306 dictAdd(c->db->dict,c->argv[1],set);
3307 incrRefCount(c->argv[1]);
3308 } else {
3309 if (set->type != REDIS_SET) {
3310 addReply(c,shared.wrongtypeerr);
3311 return;
3312 }
3313 }
3314 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3315 incrRefCount(c->argv[2]);
3316 server.dirty++;
3317 addReply(c,shared.cone);
3318 } else {
3319 addReply(c,shared.czero);
3320 }
3321 }
3322
3323 static void sremCommand(redisClient *c) {
3324 robj *set;
3325
3326 set = lookupKeyWrite(c->db,c->argv[1]);
3327 if (set == NULL) {
3328 addReply(c,shared.czero);
3329 } else {
3330 if (set->type != REDIS_SET) {
3331 addReply(c,shared.wrongtypeerr);
3332 return;
3333 }
3334 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3335 server.dirty++;
3336 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3337 addReply(c,shared.cone);
3338 } else {
3339 addReply(c,shared.czero);
3340 }
3341 }
3342 }
3343
3344 static void smoveCommand(redisClient *c) {
3345 robj *srcset, *dstset;
3346
3347 srcset = lookupKeyWrite(c->db,c->argv[1]);
3348 dstset = lookupKeyWrite(c->db,c->argv[2]);
3349
3350 /* If the source key does not exist return 0, if it's of the wrong type
3351 * raise an error */
3352 if (srcset == NULL || srcset->type != REDIS_SET) {
3353 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3354 return;
3355 }
3356 /* Error if the destination key is not a set as well */
3357 if (dstset && dstset->type != REDIS_SET) {
3358 addReply(c,shared.wrongtypeerr);
3359 return;
3360 }
3361 /* Remove the element from the source set */
3362 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3363 /* Key not found in the src set! return zero */
3364 addReply(c,shared.czero);
3365 return;
3366 }
3367 server.dirty++;
3368 /* Add the element to the destination set */
3369 if (!dstset) {
3370 dstset = createSetObject();
3371 dictAdd(c->db->dict,c->argv[2],dstset);
3372 incrRefCount(c->argv[2]);
3373 }
3374 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3375 incrRefCount(c->argv[3]);
3376 addReply(c,shared.cone);
3377 }
3378
3379 static void sismemberCommand(redisClient *c) {
3380 robj *set;
3381
3382 set = lookupKeyRead(c->db,c->argv[1]);
3383 if (set == NULL) {
3384 addReply(c,shared.czero);
3385 } else {
3386 if (set->type != REDIS_SET) {
3387 addReply(c,shared.wrongtypeerr);
3388 return;
3389 }
3390 if (dictFind(set->ptr,c->argv[2]))
3391 addReply(c,shared.cone);
3392 else
3393 addReply(c,shared.czero);
3394 }
3395 }
3396
3397 static void scardCommand(redisClient *c) {
3398 robj *o;
3399 dict *s;
3400
3401 o = lookupKeyRead(c->db,c->argv[1]);
3402 if (o == NULL) {
3403 addReply(c,shared.czero);
3404 return;
3405 } else {
3406 if (o->type != REDIS_SET) {
3407 addReply(c,shared.wrongtypeerr);
3408 } else {
3409 s = o->ptr;
3410 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3411 dictSize(s)));
3412 }
3413 }
3414 }
3415
3416 static void spopCommand(redisClient *c) {
3417 robj *set;
3418 dictEntry *de;
3419
3420 set = lookupKeyWrite(c->db,c->argv[1]);
3421 if (set == NULL) {
3422 addReply(c,shared.nullbulk);
3423 } else {
3424 if (set->type != REDIS_SET) {
3425 addReply(c,shared.wrongtypeerr);
3426 return;
3427 }
3428 de = dictGetRandomKey(set->ptr);
3429 if (de == NULL) {
3430 addReply(c,shared.nullbulk);
3431 } else {
3432 robj *ele = dictGetEntryKey(de);
3433
3434 addReplyBulkLen(c,ele);
3435 addReply(c,ele);
3436 addReply(c,shared.crlf);
3437 dictDelete(set->ptr,ele);
3438 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3439 server.dirty++;
3440 }
3441 }
3442 }
3443
3444 static void srandmemberCommand(redisClient *c) {
3445 robj *set;
3446 dictEntry *de;
3447
3448 set = lookupKeyRead(c->db,c->argv[1]);
3449 if (set == NULL) {
3450 addReply(c,shared.nullbulk);
3451 } else {
3452 if (set->type != REDIS_SET) {
3453 addReply(c,shared.wrongtypeerr);
3454 return;
3455 }
3456 de = dictGetRandomKey(set->ptr);
3457 if (de == NULL) {
3458 addReply(c,shared.nullbulk);
3459 } else {
3460 robj *ele = dictGetEntryKey(de);
3461
3462 addReplyBulkLen(c,ele);
3463 addReply(c,ele);
3464 addReply(c,shared.crlf);
3465 }
3466 }
3467 }
3468
3469 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3470 dict **d1 = (void*) s1, **d2 = (void*) s2;
3471
3472 return dictSize(*d1)-dictSize(*d2);
3473 }
3474
3475 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3476 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3477 dictIterator *di;
3478 dictEntry *de;
3479 robj *lenobj = NULL, *dstset = NULL;
3480 int j, cardinality = 0;
3481
3482 for (j = 0; j < setsnum; j++) {
3483 robj *setobj;
3484
3485 setobj = dstkey ?
3486 lookupKeyWrite(c->db,setskeys[j]) :
3487 lookupKeyRead(c->db,setskeys[j]);
3488 if (!setobj) {
3489 zfree(dv);
3490 if (dstkey) {
3491 deleteKey(c->db,dstkey);
3492 addReply(c,shared.ok);
3493 } else {
3494 addReply(c,shared.nullmultibulk);
3495 }
3496 return;
3497 }
3498 if (setobj->type != REDIS_SET) {
3499 zfree(dv);
3500 addReply(c,shared.wrongtypeerr);
3501 return;
3502 }
3503 dv[j] = setobj->ptr;
3504 }
3505 /* Sort sets from the smallest to largest, this will improve our
3506 * algorithm's performace */
3507 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3508
3509 /* The first thing we should output is the total number of elements...
3510 * since this is a multi-bulk write, but at this stage we don't know
3511 * the intersection set size, so we use a trick, append an empty object
3512 * to the output list and save the pointer to later modify it with the
3513 * right length */
3514 if (!dstkey) {
3515 lenobj = createObject(REDIS_STRING,NULL);
3516 addReply(c,lenobj);
3517 decrRefCount(lenobj);
3518 } else {
3519 /* If we have a target key where to store the resulting set
3520 * create this key with an empty set inside */
3521 dstset = createSetObject();
3522 }
3523
3524 /* Iterate all the elements of the first (smallest) set, and test
3525 * the element against all the other sets, if at least one set does
3526 * not include the element it is discarded */
3527 di = dictGetIterator(dv[0]);
3528
3529 while((de = dictNext(di)) != NULL) {
3530 robj *ele;
3531
3532 for (j = 1; j < setsnum; j++)
3533 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3534 if (j != setsnum)
3535 continue; /* at least one set does not contain the member */
3536 ele = dictGetEntryKey(de);
3537 if (!dstkey) {
3538 addReplyBulkLen(c,ele);
3539 addReply(c,ele);
3540 addReply(c,shared.crlf);
3541 cardinality++;
3542 } else {
3543 dictAdd(dstset->ptr,ele,NULL);
3544 incrRefCount(ele);
3545 }
3546 }
3547 dictReleaseIterator(di);
3548
3549 if (dstkey) {
3550 /* Store the resulting set into the target */
3551 deleteKey(c->db,dstkey);
3552 dictAdd(c->db->dict,dstkey,dstset);
3553 incrRefCount(dstkey);
3554 }
3555
3556 if (!dstkey) {
3557 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3558 } else {
3559 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3560 dictSize((dict*)dstset->ptr)));
3561 server.dirty++;
3562 }
3563 zfree(dv);
3564 }
3565
3566 static void sinterCommand(redisClient *c) {
3567 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3568 }
3569
3570 static void sinterstoreCommand(redisClient *c) {
3571 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3572 }
3573
3574 #define REDIS_OP_UNION 0
3575 #define REDIS_OP_DIFF 1
3576
3577 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3578 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3579 dictIterator *di;
3580 dictEntry *de;
3581 robj *dstset = NULL;
3582 int j, cardinality = 0;
3583
3584 for (j = 0; j < setsnum; j++) {
3585 robj *setobj;
3586
3587 setobj = dstkey ?
3588 lookupKeyWrite(c->db,setskeys[j]) :
3589 lookupKeyRead(c->db,setskeys[j]);
3590 if (!setobj) {
3591 dv[j] = NULL;
3592 continue;
3593 }
3594 if (setobj->type != REDIS_SET) {
3595 zfree(dv);
3596 addReply(c,shared.wrongtypeerr);
3597 return;
3598 }
3599 dv[j] = setobj->ptr;
3600 }
3601
3602 /* We need a temp set object to store our union. If the dstkey
3603 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3604 * this set object will be the resulting object to set into the target key*/
3605 dstset = createSetObject();
3606
3607 /* Iterate all the elements of all the sets, add every element a single
3608 * time to the result set */
3609 for (j = 0; j < setsnum; j++) {
3610 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3611 if (!dv[j]) continue; /* non existing keys are like empty sets */
3612
3613 di = dictGetIterator(dv[j]);
3614
3615 while((de = dictNext(di)) != NULL) {
3616 robj *ele;
3617
3618 /* dictAdd will not add the same element multiple times */
3619 ele = dictGetEntryKey(de);
3620 if (op == REDIS_OP_UNION || j == 0) {
3621 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3622 incrRefCount(ele);
3623 cardinality++;
3624 }
3625 } else if (op == REDIS_OP_DIFF) {
3626 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3627 cardinality--;
3628 }
3629 }
3630 }
3631 dictReleaseIterator(di);
3632
3633 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3634 }
3635
3636 /* Output the content of the resulting set, if not in STORE mode */
3637 if (!dstkey) {
3638 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3639 di = dictGetIterator(dstset->ptr);
3640 while((de = dictNext(di)) != NULL) {
3641 robj *ele;
3642
3643 ele = dictGetEntryKey(de);
3644 addReplyBulkLen(c,ele);
3645 addReply(c,ele);
3646 addReply(c,shared.crlf);
3647 }
3648 dictReleaseIterator(di);
3649 } else {
3650 /* If we have a target key where to store the resulting set
3651 * create this key with the result set inside */
3652 deleteKey(c->db,dstkey);
3653 dictAdd(c->db->dict,dstkey,dstset);
3654 incrRefCount(dstkey);
3655 }
3656
3657 /* Cleanup */
3658 if (!dstkey) {
3659 decrRefCount(dstset);
3660 } else {
3661 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3662 dictSize((dict*)dstset->ptr)));
3663 server.dirty++;
3664 }
3665 zfree(dv);
3666 }
3667
3668 static void sunionCommand(redisClient *c) {
3669 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3670 }
3671
3672 static void sunionstoreCommand(redisClient *c) {
3673 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3674 }
3675
3676 static void sdiffCommand(redisClient *c) {
3677 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3678 }
3679
3680 static void sdiffstoreCommand(redisClient *c) {
3681 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3682 }
3683
3684 /* ==================================== ZSets =============================== */
3685
3686 /* ZSETs are ordered sets using two data structures to hold the same elements
3687 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3688 * data structure.
3689 *
3690 * The elements are added to an hash table mapping Redis objects to scores.
3691 * At the same time the elements are added to a skip list mapping scores
3692 * to Redis objects (so objects are sorted by scores in this "view"). */
3693
3694 /* This skiplist implementation is almost a C translation of the original
3695 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3696 * Alternative to Balanced Trees", modified in three ways:
3697 * a) this implementation allows for repeated values.
3698 * b) the comparison is not just by key (our 'score') but by satellite data.
3699 * c) there is a back pointer, so it's a doubly linked list with the back
3700 * pointers being only at "level 1". This allows to traverse the list
3701 * from tail to head, useful for ZREVRANGE. */
3702
3703 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3704 zskiplistNode *zn = zmalloc(sizeof(*zn));
3705
3706 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3707 zn->score = score;
3708 zn->obj = obj;
3709 return zn;
3710 }
3711
3712 static zskiplist *zslCreate(void) {
3713 int j;
3714 zskiplist *zsl;
3715
3716 zsl = zmalloc(sizeof(*zsl));
3717 zsl->level = 1;
3718 zsl->length = 0;
3719 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3720 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3721 zsl->header->forward[j] = NULL;
3722 return zsl;
3723 }
3724
3725 static void zslFreeNode(zskiplistNode *node) {
3726 decrRefCount(node->obj);
3727 zfree(node);
3728 }
3729
3730 static void zslFree(zskiplist *zsl) {
3731 zskiplistNode *node = zsl->header->forward[1], *next;
3732
3733 while(node) {
3734 next = node->forward[1];
3735 zslFreeNode(node);
3736 node = next;
3737 }
3738 }
3739
3740 static int zslRandomLevel(void) {
3741 int level = 1;
3742 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3743 level += 1;
3744 return level;
3745 }
3746
3747 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3748 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3749 int i, level;
3750
3751 x = zsl->header;
3752 for (i = zsl->level-1; i >= 0; i--) {
3753 while (x->forward[i] && x->forward[i]->score < score)
3754 x = x->forward[i];
3755 update[i] = x;
3756 }
3757 /* we assume the key is not already inside, since we allow duplicated
3758 * scores, and the re-insertion of score and redis object should never
3759 * happpen since the caller of zslInsert() should test in the hash table
3760 * if the element is already inside or not. */
3761 level = zslRandomLevel();
3762 if (level > zsl->level) {
3763 for (i = zsl->level; i < level; i++)
3764 update[i] = zsl->header;
3765 zsl->level = level;
3766 }
3767 x = zslCreateNode(level,score,obj);
3768 for (i = 0; i < level; i++) {
3769 x->forward[i] = update[i]->forward[i];
3770 update[i]->forward[i] = x;
3771 }
3772 zsl->length++;
3773 }
3774
3775 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3776 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3777 int i;
3778
3779 x = zsl->header;
3780 for (i = zsl->level-1; i >= 0; i--) {
3781 while (x->forward[i] && x->forward[i]->score < score)
3782 x = x->forward[i];
3783 update[i] = x;
3784 }
3785 /* We may have multiple elements with the same score, what we need
3786 * is to find the element with both the right score and object. */
3787 x = x->forward[0];
3788 while(x->score == score) {
3789 if (compareStringObjects(x->obj,obj) == 0) {
3790 for (i = 0; i < zsl->level; i++) {
3791 if (update[i]->forward[i] != x) break;
3792 update[i]->forward[i] = x->forward[i];
3793 }
3794 zslFreeNode(x);
3795 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3796 zsl->level--;
3797 return 1;
3798 } else {
3799 x = x->forward[0];
3800 if (!x) return 0; /* end of the list reached, not found */
3801 }
3802 }
3803 return 0; /* not found */
3804 }
3805
3806 /* The actual Z-commands implementations */
3807
3808 static void zaddCommand(redisClient *c) {
3809 robj *zsetobj;
3810 zset *zs;
3811 double *score;
3812
3813 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3814 if (zsetobj == NULL) {
3815 zsetobj = createZsetObject();
3816 dictAdd(c->db->dict,c->argv[1],zsetobj);
3817 incrRefCount(c->argv[1]);
3818 } else {
3819 if (zsetobj->type != REDIS_ZSET) {
3820 addReply(c,shared.wrongtypeerr);
3821 return;
3822 }
3823 }
3824 score = zmalloc(sizeof(double));
3825 *score = strtod(c->argv[2]->ptr,NULL);
3826 zs = zsetobj->ptr;
3827 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3828 /* case 1: New element */
3829 incrRefCount(c->argv[3]); /* added to hash */
3830 zslInsert(zs->zsl,*score,c->argv[3]);
3831 incrRefCount(c->argv[3]); /* added to skiplist */
3832 server.dirty++;
3833 addReply(c,shared.cone);
3834 } else {
3835 dictEntry *de;
3836 double *oldscore;
3837
3838 /* case 2: Score update operation */
3839 de = dictFind(zs->dict,c->argv[3]);
3840 assert(de != NULL);
3841 oldscore = dictGetEntryVal(de);
3842 if (*score != *oldscore) {
3843 int deleted;
3844
3845 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
3846 assert(deleted != 0);
3847 zslInsert(zs->zsl,*score,c->argv[3]);
3848 incrRefCount(c->argv[3]);
3849 server.dirty++;
3850 }
3851 addReply(c,shared.czero);
3852 }
3853 }
3854
3855 static void zrangeCommand(redisClient *c) {
3856 robj *o;
3857 int start = atoi(c->argv[2]->ptr);
3858 int end = atoi(c->argv[3]->ptr);
3859
3860 o = lookupKeyRead(c->db,c->argv[1]);
3861 if (o == NULL) {
3862 addReply(c,shared.nullmultibulk);
3863 } else {
3864 if (o->type != REDIS_ZSET) {
3865 addReply(c,shared.wrongtypeerr);
3866 } else {
3867 zset *zsetobj = o->ptr;
3868 zskiplist *zsl = zsetobj->zsl;
3869 zskiplistNode *ln;
3870
3871 int llen = zsl->length;
3872 int rangelen, j;
3873 robj *ele;
3874
3875 /* convert negative indexes */
3876 if (start < 0) start = llen+start;
3877 if (end < 0) end = llen+end;
3878 if (start < 0) start = 0;
3879 if (end < 0) end = 0;
3880
3881 /* indexes sanity checks */
3882 if (start > end || start >= llen) {
3883 /* Out of range start or start > end result in empty list */
3884 addReply(c,shared.emptymultibulk);
3885 return;
3886 }
3887 if (end >= llen) end = llen-1;
3888 rangelen = (end-start)+1;
3889
3890 /* Return the result in form of a multi-bulk reply */
3891 ln = zsl->header->forward[0];
3892 while (start--)
3893 ln = ln->forward[0];
3894
3895 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3896 for (j = 0; j < rangelen; j++) {
3897 ele = ln->obj;
3898 addReplyBulkLen(c,ele);
3899 addReply(c,ele);
3900 addReply(c,shared.crlf);
3901 ln = ln->forward[0];
3902 }
3903 }
3904 }
3905 }
3906
3907 static void zlenCommand(redisClient *c) {
3908 robj *o;
3909 zset *zs;
3910
3911 o = lookupKeyRead(c->db,c->argv[1]);
3912 if (o == NULL) {
3913 addReply(c,shared.czero);
3914 return;
3915 } else {
3916 if (o->type != REDIS_ZSET) {
3917 addReply(c,shared.wrongtypeerr);
3918 } else {
3919 zs = o->ptr;
3920 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
3921 }
3922 }
3923 }
3924
3925 /* ========================= Non type-specific commands ==================== */
3926
3927 static void flushdbCommand(redisClient *c) {
3928 server.dirty += dictSize(c->db->dict);
3929 dictEmpty(c->db->dict);
3930 dictEmpty(c->db->expires);
3931 addReply(c,shared.ok);
3932 }
3933
3934 static void flushallCommand(redisClient *c) {
3935 server.dirty += emptyDb();
3936 addReply(c,shared.ok);
3937 rdbSave(server.dbfilename);
3938 server.dirty++;
3939 }
3940
3941 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3942 redisSortOperation *so = zmalloc(sizeof(*so));
3943 so->type = type;
3944 so->pattern = pattern;
3945 return so;
3946 }
3947
3948 /* Return the value associated to the key with a name obtained
3949 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3950 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3951 char *p;
3952 sds spat, ssub;
3953 robj keyobj;
3954 int prefixlen, sublen, postfixlen;
3955 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3956 struct {
3957 long len;
3958 long free;
3959 char buf[REDIS_SORTKEY_MAX+1];
3960 } keyname;
3961
3962 if (subst->encoding == REDIS_ENCODING_RAW)
3963 incrRefCount(subst);
3964 else {
3965 subst = getDecodedObject(subst);
3966 }
3967
3968 spat = pattern->ptr;
3969 ssub = subst->ptr;
3970 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3971 p = strchr(spat,'*');
3972 if (!p) return NULL;
3973
3974 prefixlen = p-spat;
3975 sublen = sdslen(ssub);
3976 postfixlen = sdslen(spat)-(prefixlen+1);
3977 memcpy(keyname.buf,spat,prefixlen);
3978 memcpy(keyname.buf+prefixlen,ssub,sublen);
3979 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3980 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3981 keyname.len = prefixlen+sublen+postfixlen;
3982
3983 keyobj.refcount = 1;
3984 keyobj.type = REDIS_STRING;
3985 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3986
3987 decrRefCount(subst);
3988
3989 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3990 return lookupKeyRead(db,&keyobj);
3991 }
3992
3993 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3994 * the additional parameter is not standard but a BSD-specific we have to
3995 * pass sorting parameters via the global 'server' structure */
3996 static int sortCompare(const void *s1, const void *s2) {
3997 const redisSortObject *so1 = s1, *so2 = s2;
3998 int cmp;
3999
4000 if (!server.sort_alpha) {
4001 /* Numeric sorting. Here it's trivial as we precomputed scores */
4002 if (so1->u.score > so2->u.score) {
4003 cmp = 1;
4004 } else if (so1->u.score < so2->u.score) {
4005 cmp = -1;
4006 } else {
4007 cmp = 0;
4008 }
4009 } else {
4010 /* Alphanumeric sorting */
4011 if (server.sort_bypattern) {
4012 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4013 /* At least one compare object is NULL */
4014 if (so1->u.cmpobj == so2->u.cmpobj)
4015 cmp = 0;
4016 else if (so1->u.cmpobj == NULL)
4017 cmp = -1;
4018 else
4019 cmp = 1;
4020 } else {
4021 /* We have both the objects, use strcoll */
4022 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4023 }
4024 } else {
4025 /* Compare elements directly */
4026 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4027 so2->obj->encoding == REDIS_ENCODING_RAW) {
4028 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4029 } else {
4030 robj *dec1, *dec2;
4031
4032 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4033 so1->obj : getDecodedObject(so1->obj);
4034 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4035 so2->obj : getDecodedObject(so2->obj);
4036 cmp = strcoll(dec1->ptr,dec2->ptr);
4037 if (dec1 != so1->obj) decrRefCount(dec1);
4038 if (dec2 != so2->obj) decrRefCount(dec2);
4039 }
4040 }
4041 }
4042 return server.sort_desc ? -cmp : cmp;
4043 }
4044
4045 /* The SORT command is the most complex command in Redis. Warning: this code
4046 * is optimized for speed and a bit less for readability */
4047 static void sortCommand(redisClient *c) {
4048 list *operations;
4049 int outputlen = 0;
4050 int desc = 0, alpha = 0;
4051 int limit_start = 0, limit_count = -1, start, end;
4052 int j, dontsort = 0, vectorlen;
4053 int getop = 0; /* GET operation counter */
4054 robj *sortval, *sortby = NULL;
4055 redisSortObject *vector; /* Resulting vector to sort */
4056
4057 /* Lookup the key to sort. It must be of the right types */
4058 sortval = lookupKeyRead(c->db,c->argv[1]);
4059 if (sortval == NULL) {
4060 addReply(c,shared.nokeyerr);
4061 return;
4062 }
4063 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4064 addReply(c,shared.wrongtypeerr);
4065 return;
4066 }
4067
4068 /* Create a list of operations to perform for every sorted element.
4069 * Operations can be GET/DEL/INCR/DECR */
4070 operations = listCreate();
4071 listSetFreeMethod(operations,zfree);
4072 j = 2;
4073
4074 /* Now we need to protect sortval incrementing its count, in the future
4075 * SORT may have options able to overwrite/delete keys during the sorting
4076 * and the sorted key itself may get destroied */
4077 incrRefCount(sortval);
4078
4079 /* The SORT command has an SQL-alike syntax, parse it */
4080 while(j < c->argc) {
4081 int leftargs = c->argc-j-1;
4082 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4083 desc = 0;
4084 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4085 desc = 1;
4086 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4087 alpha = 1;
4088 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4089 limit_start = atoi(c->argv[j+1]->ptr);
4090 limit_count = atoi(c->argv[j+2]->ptr);
4091 j+=2;
4092 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4093 sortby = c->argv[j+1];
4094 /* If the BY pattern does not contain '*', i.e. it is constant,
4095 * we don't need to sort nor to lookup the weight keys. */
4096 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4097 j++;
4098 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4099 listAddNodeTail(operations,createSortOperation(
4100 REDIS_SORT_GET,c->argv[j+1]));
4101 getop++;
4102 j++;
4103 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4104 listAddNodeTail(operations,createSortOperation(
4105 REDIS_SORT_DEL,c->argv[j+1]));
4106 j++;
4107 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4108 listAddNodeTail(operations,createSortOperation(
4109 REDIS_SORT_INCR,c->argv[j+1]));
4110 j++;
4111 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4112 listAddNodeTail(operations,createSortOperation(
4113 REDIS_SORT_DECR,c->argv[j+1]));
4114 j++;
4115 } else {
4116 decrRefCount(sortval);
4117 listRelease(operations);
4118 addReply(c,shared.syntaxerr);
4119 return;
4120 }
4121 j++;
4122 }
4123
4124 /* Load the sorting vector with all the objects to sort */
4125 vectorlen = (sortval->type == REDIS_LIST) ?
4126 listLength((list*)sortval->ptr) :
4127 dictSize((dict*)sortval->ptr);
4128 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4129 j = 0;
4130 if (sortval->type == REDIS_LIST) {
4131 list *list = sortval->ptr;
4132 listNode *ln;
4133
4134 listRewind(list);
4135 while((ln = listYield(list))) {
4136 robj *ele = ln->value;
4137 vector[j].obj = ele;
4138 vector[j].u.score = 0;
4139 vector[j].u.cmpobj = NULL;
4140 j++;
4141 }
4142 } else {
4143 dict *set = sortval->ptr;
4144 dictIterator *di;
4145 dictEntry *setele;
4146
4147 di = dictGetIterator(set);
4148 while((setele = dictNext(di)) != NULL) {
4149 vector[j].obj = dictGetEntryKey(setele);
4150 vector[j].u.score = 0;
4151 vector[j].u.cmpobj = NULL;
4152 j++;
4153 }
4154 dictReleaseIterator(di);
4155 }
4156 assert(j == vectorlen);
4157
4158 /* Now it's time to load the right scores in the sorting vector */
4159 if (dontsort == 0) {
4160 for (j = 0; j < vectorlen; j++) {
4161 if (sortby) {
4162 robj *byval;
4163
4164 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4165 if (!byval || byval->type != REDIS_STRING) continue;
4166 if (alpha) {
4167 if (byval->encoding == REDIS_ENCODING_RAW) {
4168 vector[j].u.cmpobj = byval;
4169 incrRefCount(byval);
4170 } else {
4171 vector[j].u.cmpobj = getDecodedObject(byval);
4172 }
4173 } else {
4174 if (byval->encoding == REDIS_ENCODING_RAW) {
4175 vector[j].u.score = strtod(byval->ptr,NULL);
4176 } else {
4177 if (byval->encoding == REDIS_ENCODING_INT) {
4178 vector[j].u.score = (long)byval->ptr;
4179 } else
4180 assert(1 != 1);
4181 }
4182 }
4183 } else {
4184 if (!alpha) {
4185 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4186 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4187 else {
4188 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4189 vector[j].u.score = (long) vector[j].obj->ptr;
4190 else
4191 assert(1 != 1);
4192 }
4193 }
4194 }
4195 }
4196 }
4197
4198 /* We are ready to sort the vector... perform a bit of sanity check
4199 * on the LIMIT option too. We'll use a partial version of quicksort. */
4200 start = (limit_start < 0) ? 0 : limit_start;
4201 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4202 if (start >= vectorlen) {
4203 start = vectorlen-1;
4204 end = vectorlen-2;
4205 }
4206 if (end >= vectorlen) end = vectorlen-1;
4207
4208 if (dontsort == 0) {
4209 server.sort_desc = desc;
4210 server.sort_alpha = alpha;
4211 server.sort_bypattern = sortby ? 1 : 0;
4212 if (sortby && (start != 0 || end != vectorlen-1))
4213 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4214 else
4215 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4216 }
4217
4218 /* Send command output to the output buffer, performing the specified
4219 * GET/DEL/INCR/DECR operations if any. */
4220 outputlen = getop ? getop*(end-start+1) : end-start+1;
4221 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4222 for (j = start; j <= end; j++) {
4223 listNode *ln;
4224 if (!getop) {
4225 addReplyBulkLen(c,vector[j].obj);
4226 addReply(c,vector[j].obj);
4227 addReply(c,shared.crlf);
4228 }
4229 listRewind(operations);
4230 while((ln = listYield(operations))) {
4231 redisSortOperation *sop = ln->value;
4232 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4233 vector[j].obj);
4234
4235 if (sop->type == REDIS_SORT_GET) {
4236 if (!val || val->type != REDIS_STRING) {
4237 addReply(c,shared.nullbulk);
4238 } else {
4239 addReplyBulkLen(c,val);
4240 addReply(c,val);
4241 addReply(c,shared.crlf);
4242 }
4243 } else if (sop->type == REDIS_SORT_DEL) {
4244 /* TODO */
4245 }
4246 }
4247 }
4248
4249 /* Cleanup */
4250 decrRefCount(sortval);
4251 listRelease(operations);
4252 for (j = 0; j < vectorlen; j++) {
4253 if (sortby && alpha && vector[j].u.cmpobj)
4254 decrRefCount(vector[j].u.cmpobj);
4255 }
4256 zfree(vector);
4257 }
4258
4259 static void infoCommand(redisClient *c) {
4260 sds info;
4261 time_t uptime = time(NULL)-server.stat_starttime;
4262 int j;
4263
4264 info = sdscatprintf(sdsempty(),
4265 "redis_version:%s\r\n"
4266 "arch_bits:%s\r\n"
4267 "uptime_in_seconds:%d\r\n"
4268 "uptime_in_days:%d\r\n"
4269 "connected_clients:%d\r\n"
4270 "connected_slaves:%d\r\n"
4271 "used_memory:%zu\r\n"
4272 "changes_since_last_save:%lld\r\n"
4273 "bgsave_in_progress:%d\r\n"
4274 "last_save_time:%d\r\n"
4275 "total_connections_received:%lld\r\n"
4276 "total_commands_processed:%lld\r\n"
4277 "role:%s\r\n"
4278 ,REDIS_VERSION,
4279 (sizeof(long) == 8) ? "64" : "32",
4280 uptime,
4281 uptime/(3600*24),
4282 listLength(server.clients)-listLength(server.slaves),
4283 listLength(server.slaves),
4284 server.usedmemory,
4285 server.dirty,
4286 server.bgsaveinprogress,
4287 server.lastsave,
4288 server.stat_numconnections,
4289 server.stat_numcommands,
4290 server.masterhost == NULL ? "master" : "slave"
4291 );
4292 if (server.masterhost) {
4293 info = sdscatprintf(info,
4294 "master_host:%s\r\n"
4295 "master_port:%d\r\n"
4296 "master_link_status:%s\r\n"
4297 "master_last_io_seconds_ago:%d\r\n"
4298 ,server.masterhost,
4299 server.masterport,
4300 (server.replstate == REDIS_REPL_CONNECTED) ?
4301 "up" : "down",
4302 (int)(time(NULL)-server.master->lastinteraction)
4303 );
4304 }
4305 for (j = 0; j < server.dbnum; j++) {
4306 long long keys, vkeys;
4307
4308 keys = dictSize(server.db[j].dict);
4309 vkeys = dictSize(server.db[j].expires);
4310 if (keys || vkeys) {
4311 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4312 j, keys, vkeys);
4313 }
4314 }
4315 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4316 addReplySds(c,info);
4317 addReply(c,shared.crlf);
4318 }
4319
4320 static void monitorCommand(redisClient *c) {
4321 /* ignore MONITOR if aleady slave or in monitor mode */
4322 if (c->flags & REDIS_SLAVE) return;
4323
4324 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4325 c->slaveseldb = 0;
4326 listAddNodeTail(server.monitors,c);
4327 addReply(c,shared.ok);
4328 }
4329
4330 /* ================================= Expire ================================= */
4331 static int removeExpire(redisDb *db, robj *key) {
4332 if (dictDelete(db->expires,key) == DICT_OK) {
4333 return 1;
4334 } else {
4335 return 0;
4336 }
4337 }
4338
4339 static int setExpire(redisDb *db, robj *key, time_t when) {
4340 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4341 return 0;
4342 } else {
4343 incrRefCount(key);
4344 return 1;
4345 }
4346 }
4347
4348 /* Return the expire time of the specified key, or -1 if no expire
4349 * is associated with this key (i.e. the key is non volatile) */
4350 static time_t getExpire(redisDb *db, robj *key) {
4351 dictEntry *de;
4352
4353 /* No expire? return ASAP */
4354 if (dictSize(db->expires) == 0 ||
4355 (de = dictFind(db->expires,key)) == NULL) return -1;
4356
4357 return (time_t) dictGetEntryVal(de);
4358 }
4359
4360 static int expireIfNeeded(redisDb *db, robj *key) {
4361 time_t when;
4362 dictEntry *de;
4363
4364 /* No expire? return ASAP */
4365 if (dictSize(db->expires) == 0 ||
4366 (de = dictFind(db->expires,key)) == NULL) return 0;
4367
4368 /* Lookup the expire */
4369 when = (time_t) dictGetEntryVal(de);
4370 if (time(NULL) <= when) return 0;
4371
4372 /* Delete the key */
4373 dictDelete(db->expires,key);
4374 return dictDelete(db->dict,key) == DICT_OK;
4375 }
4376
4377 static int deleteIfVolatile(redisDb *db, robj *key) {
4378 dictEntry *de;
4379
4380 /* No expire? return ASAP */
4381 if (dictSize(db->expires) == 0 ||
4382 (de = dictFind(db->expires,key)) == NULL) return 0;
4383
4384 /* Delete the key */
4385 server.dirty++;
4386 dictDelete(db->expires,key);
4387 return dictDelete(db->dict,key) == DICT_OK;
4388 }
4389
4390 static void expireCommand(redisClient *c) {
4391 dictEntry *de;
4392 int seconds = atoi(c->argv[2]->ptr);
4393
4394 de = dictFind(c->db->dict,c->argv[1]);
4395 if (de == NULL) {
4396 addReply(c,shared.czero);
4397 return;
4398 }
4399 if (seconds <= 0) {
4400 addReply(c, shared.czero);
4401 return;
4402 } else {
4403 time_t when = time(NULL)+seconds;
4404 if (setExpire(c->db,c->argv[1],when)) {
4405 addReply(c,shared.cone);
4406 server.dirty++;
4407 } else {
4408 addReply(c,shared.czero);
4409 }
4410 return;
4411 }
4412 }
4413
4414 static void ttlCommand(redisClient *c) {
4415 time_t expire;
4416 int ttl = -1;
4417
4418 expire = getExpire(c->db,c->argv[1]);
4419 if (expire != -1) {
4420 ttl = (int) (expire-time(NULL));
4421 if (ttl < 0) ttl = -1;
4422 }
4423 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4424 }
4425
4426 static void msetGenericCommand(redisClient *c, int nx) {
4427 int j;
4428
4429 if ((c->argc % 2) == 0) {
4430 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4431 return;
4432 }
4433 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4434 * set nothing at all if at least one already key exists. */
4435 if (nx) {
4436 for (j = 1; j < c->argc; j += 2) {
4437 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4438 addReply(c, shared.czero);
4439 return;
4440 }
4441 }
4442 }
4443
4444 for (j = 1; j < c->argc; j += 2) {
4445 int retval;
4446
4447 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4448 if (retval == DICT_ERR) {
4449 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4450 incrRefCount(c->argv[j+1]);
4451 } else {
4452 incrRefCount(c->argv[j]);
4453 incrRefCount(c->argv[j+1]);
4454 }
4455 removeExpire(c->db,c->argv[j]);
4456 }
4457 server.dirty += (c->argc-1)/2;
4458 addReply(c, nx ? shared.cone : shared.ok);
4459 }
4460
4461 static void msetCommand(redisClient *c) {
4462 msetGenericCommand(c,0);
4463 }
4464
4465 static void msetnxCommand(redisClient *c) {
4466 msetGenericCommand(c,1);
4467 }
4468
4469 /* =============================== Replication ============================= */
4470
4471 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4472 ssize_t nwritten, ret = size;
4473 time_t start = time(NULL);
4474
4475 timeout++;
4476 while(size) {
4477 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4478 nwritten = write(fd,ptr,size);
4479 if (nwritten == -1) return -1;
4480 ptr += nwritten;
4481 size -= nwritten;
4482 }
4483 if ((time(NULL)-start) > timeout) {
4484 errno = ETIMEDOUT;
4485 return -1;
4486 }
4487 }
4488 return ret;
4489 }
4490
4491 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4492 ssize_t nread, totread = 0;
4493 time_t start = time(NULL);
4494
4495 timeout++;
4496 while(size) {
4497 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4498 nread = read(fd,ptr,size);
4499 if (nread == -1) return -1;
4500 ptr += nread;
4501 size -= nread;
4502 totread += nread;
4503 }
4504 if ((time(NULL)-start) > timeout) {
4505 errno = ETIMEDOUT;
4506 return -1;
4507 }
4508 }
4509 return totread;
4510 }
4511
4512 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
4513 ssize_t nread = 0;
4514
4515 size--;
4516 while(size) {
4517 char c;
4518
4519 if (syncRead(fd,&c,1,timeout) == -1) return -1;
4520 if (c == '\n') {
4521 *ptr = '\0';
4522 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
4523 return nread;
4524 } else {
4525 *ptr++ = c;
4526 *ptr = '\0';
4527 nread++;
4528 }
4529 }
4530 return nread;
4531 }
4532
4533 static void syncCommand(redisClient *c) {
4534 /* ignore SYNC if aleady slave or in monitor mode */
4535 if (c->flags & REDIS_SLAVE) return;
4536
4537 /* SYNC can't be issued when the server has pending data to send to
4538 * the client about already issued commands. We need a fresh reply
4539 * buffer registering the differences between the BGSAVE and the current
4540 * dataset, so that we can copy to other slaves if needed. */
4541 if (listLength(c->reply) != 0) {
4542 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4543 return;
4544 }
4545
4546 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4547 /* Here we need to check if there is a background saving operation
4548 * in progress, or if it is required to start one */
4549 if (server.bgsaveinprogress) {
4550 /* Ok a background save is in progress. Let's check if it is a good
4551 * one for replication, i.e. if there is another slave that is
4552 * registering differences since the server forked to save */
4553 redisClient *slave;
4554 listNode *ln;
4555
4556 listRewind(server.slaves);
4557 while((ln = listYield(server.slaves))) {
4558 slave = ln->value;
4559 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4560 }
4561 if (ln) {
4562 /* Perfect, the server is already registering differences for
4563 * another slave. Set the right state, and copy the buffer. */
4564 listRelease(c->reply);
4565 c->reply = listDup(slave->reply);
4566 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4567 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4568 } else {
4569 /* No way, we need to wait for the next BGSAVE in order to
4570 * register differences */
4571 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4572 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4573 }
4574 } else {
4575 /* Ok we don't have a BGSAVE in progress, let's start one */
4576 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4577 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4578 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4579 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4580 return;
4581 }
4582 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4583 }
4584 c->repldbfd = -1;
4585 c->flags |= REDIS_SLAVE;
4586 c->slaveseldb = 0;
4587 listAddNodeTail(server.slaves,c);
4588 return;
4589 }
4590
4591 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4592 redisClient *slave = privdata;
4593 REDIS_NOTUSED(el);
4594 REDIS_NOTUSED(mask);
4595 char buf[REDIS_IOBUF_LEN];
4596 ssize_t nwritten, buflen;
4597
4598 if (slave->repldboff == 0) {
4599 /* Write the bulk write count before to transfer the DB. In theory here
4600 * we don't know how much room there is in the output buffer of the
4601 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4602 * operations) will never be smaller than the few bytes we need. */
4603 sds bulkcount;
4604
4605 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4606 slave->repldbsize);
4607 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4608 {
4609 sdsfree(bulkcount);
4610 freeClient(slave);
4611 return;
4612 }
4613 sdsfree(bulkcount);
4614 }
4615 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4616 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4617 if (buflen <= 0) {
4618 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4619 (buflen == 0) ? "premature EOF" : strerror(errno));
4620 freeClient(slave);
4621 return;
4622 }
4623 if ((nwritten = write(fd,buf,buflen)) == -1) {
4624 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4625 strerror(errno));
4626 freeClient(slave);
4627 return;
4628 }
4629 slave->repldboff += nwritten;
4630 if (slave->repldboff == slave->repldbsize) {
4631 close(slave->repldbfd);
4632 slave->repldbfd = -1;
4633 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4634 slave->replstate = REDIS_REPL_ONLINE;
4635 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4636 sendReplyToClient, slave, NULL) == AE_ERR) {
4637 freeClient(slave);
4638 return;
4639 }
4640 addReplySds(slave,sdsempty());
4641 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4642 }
4643 }
4644
4645 /* This function is called at the end of every backgrond saving.
4646 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4647 * otherwise REDIS_ERR is passed to the function.
4648 *
4649 * The goal of this function is to handle slaves waiting for a successful
4650 * background saving in order to perform non-blocking synchronization. */
4651 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4652 listNode *ln;
4653 int startbgsave = 0;
4654
4655 listRewind(server.slaves);
4656 while((ln = listYield(server.slaves))) {
4657 redisClient *slave = ln->value;
4658
4659 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4660 startbgsave = 1;
4661 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4662 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4663 struct redis_stat buf;
4664
4665 if (bgsaveerr != REDIS_OK) {
4666 freeClient(slave);
4667 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4668 continue;
4669 }
4670 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4671 redis_fstat(slave->repldbfd,&buf) == -1) {
4672 freeClient(slave);
4673 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4674 continue;
4675 }
4676 slave->repldboff = 0;
4677 slave->repldbsize = buf.st_size;
4678 slave->replstate = REDIS_REPL_SEND_BULK;
4679 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4680 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4681 freeClient(slave);
4682 continue;
4683 }
4684 }
4685 }
4686 if (startbgsave) {
4687 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4688 listRewind(server.slaves);
4689 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4690 while((ln = listYield(server.slaves))) {
4691 redisClient *slave = ln->value;
4692
4693 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4694 freeClient(slave);
4695 }
4696 }
4697 }
4698 }
4699
4700 static int syncWithMaster(void) {
4701 char buf[1024], tmpfile[256];
4702 int dumpsize;
4703 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4704 int dfd;
4705
4706 if (fd == -1) {
4707 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4708 strerror(errno));
4709 return REDIS_ERR;
4710 }
4711 /* Issue the SYNC command */
4712 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4713 close(fd);
4714 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4715 strerror(errno));
4716 return REDIS_ERR;
4717 }
4718 /* Read the bulk write count */
4719 if (syncReadLine(fd,buf,1024,3600) == -1) {
4720 close(fd);
4721 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4722 strerror(errno));
4723 return REDIS_ERR;
4724 }
4725 dumpsize = atoi(buf+1);
4726 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4727 /* Read the bulk write data on a temp file */
4728 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4729 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4730 if (dfd == -1) {
4731 close(fd);
4732 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4733 return REDIS_ERR;
4734 }
4735 while(dumpsize) {
4736 int nread, nwritten;
4737
4738 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4739 if (nread == -1) {
4740 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4741 strerror(errno));
4742 close(fd);
4743 close(dfd);
4744 return REDIS_ERR;
4745 }
4746 nwritten = write(dfd,buf,nread);
4747 if (nwritten == -1) {
4748 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4749 close(fd);
4750 close(dfd);
4751 return REDIS_ERR;
4752 }
4753 dumpsize -= nread;
4754 }
4755 close(dfd);
4756 if (rename(tmpfile,server.dbfilename) == -1) {
4757 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4758 unlink(tmpfile);
4759 close(fd);
4760 return REDIS_ERR;
4761 }
4762 emptyDb();
4763 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4764 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4765 close(fd);
4766 return REDIS_ERR;
4767 }
4768 server.master = createClient(fd);
4769 server.master->flags |= REDIS_MASTER;
4770 server.replstate = REDIS_REPL_CONNECTED;
4771 return REDIS_OK;
4772 }
4773
4774 static void slaveofCommand(redisClient *c) {
4775 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4776 !strcasecmp(c->argv[2]->ptr,"one")) {
4777 if (server.masterhost) {
4778 sdsfree(server.masterhost);
4779 server.masterhost = NULL;
4780 if (server.master) freeClient(server.master);
4781 server.replstate = REDIS_REPL_NONE;
4782 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4783 }
4784 } else {
4785 sdsfree(server.masterhost);
4786 server.masterhost = sdsdup(c->argv[1]->ptr);
4787 server.masterport = atoi(c->argv[2]->ptr);
4788 if (server.master) freeClient(server.master);
4789 server.replstate = REDIS_REPL_CONNECT;
4790 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4791 server.masterhost, server.masterport);
4792 }
4793 addReply(c,shared.ok);
4794 }
4795
4796 /* ============================ Maxmemory directive ======================== */
4797
4798 /* This function gets called when 'maxmemory' is set on the config file to limit
4799 * the max memory used by the server, and we are out of memory.
4800 * This function will try to, in order:
4801 *
4802 * - Free objects from the free list
4803 * - Try to remove keys with an EXPIRE set
4804 *
4805 * It is not possible to free enough memory to reach used-memory < maxmemory
4806 * the server will start refusing commands that will enlarge even more the
4807 * memory usage.
4808 */
4809 static void freeMemoryIfNeeded(void) {
4810 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4811 if (listLength(server.objfreelist)) {
4812 robj *o;
4813
4814 listNode *head = listFirst(server.objfreelist);
4815 o = listNodeValue(head);
4816 listDelNode(server.objfreelist,head);
4817 zfree(o);
4818 } else {
4819 int j, k, freed = 0;
4820
4821 for (j = 0; j < server.dbnum; j++) {
4822 int minttl = -1;
4823 robj *minkey = NULL;
4824 struct dictEntry *de;
4825
4826 if (dictSize(server.db[j].expires)) {
4827 freed = 1;
4828 /* From a sample of three keys drop the one nearest to
4829 * the natural expire */
4830 for (k = 0; k < 3; k++) {
4831 time_t t;
4832
4833 de = dictGetRandomKey(server.db[j].expires);
4834 t = (time_t) dictGetEntryVal(de);
4835 if (minttl == -1 || t < minttl) {
4836 minkey = dictGetEntryKey(de);
4837 minttl = t;
4838 }
4839 }
4840 deleteKey(server.db+j,minkey);
4841 }
4842 }
4843 if (!freed) return; /* nothing to free... */
4844 }
4845 }
4846 }
4847
4848 /* ================================= Debugging ============================== */
4849
4850 static void debugCommand(redisClient *c) {
4851 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4852 *((char*)-1) = 'x';
4853 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4854 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4855 robj *key, *val;
4856
4857 if (!de) {
4858 addReply(c,shared.nokeyerr);
4859 return;
4860 }
4861 key = dictGetEntryKey(de);
4862 val = dictGetEntryVal(de);
4863 addReplySds(c,sdscatprintf(sdsempty(),
4864 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4865 key, key->refcount, val, val->refcount, val->encoding));
4866 } else {
4867 addReplySds(c,sdsnew(
4868 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4869 }
4870 }
4871
4872 #ifdef HAVE_BACKTRACE
4873 static struct redisFunctionSym symsTable[] = {
4874 {"compareStringObjects", (unsigned long)compareStringObjects},
4875 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4876 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4877 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4878 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4879 {"freeStringObject", (unsigned long)freeStringObject},
4880 {"freeListObject", (unsigned long)freeListObject},
4881 {"freeSetObject", (unsigned long)freeSetObject},
4882 {"decrRefCount", (unsigned long)decrRefCount},
4883 {"createObject", (unsigned long)createObject},
4884 {"freeClient", (unsigned long)freeClient},
4885 {"rdbLoad", (unsigned long)rdbLoad},
4886 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4887 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4888 {"addReply", (unsigned long)addReply},
4889 {"addReplySds", (unsigned long)addReplySds},
4890 {"incrRefCount", (unsigned long)incrRefCount},
4891 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4892 {"createStringObject", (unsigned long)createStringObject},
4893 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4894 {"syncWithMaster", (unsigned long)syncWithMaster},
4895 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4896 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4897 {"getDecodedObject", (unsigned long)getDecodedObject},
4898 {"removeExpire", (unsigned long)removeExpire},
4899 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4900 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4901 {"deleteKey", (unsigned long)deleteKey},
4902 {"getExpire", (unsigned long)getExpire},
4903 {"setExpire", (unsigned long)setExpire},
4904 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4905 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4906 {"authCommand", (unsigned long)authCommand},
4907 {"pingCommand", (unsigned long)pingCommand},
4908 {"echoCommand", (unsigned long)echoCommand},
4909 {"setCommand", (unsigned long)setCommand},
4910 {"setnxCommand", (unsigned long)setnxCommand},
4911 {"getCommand", (unsigned long)getCommand},
4912 {"delCommand", (unsigned long)delCommand},
4913 {"existsCommand", (unsigned long)existsCommand},
4914 {"incrCommand", (unsigned long)incrCommand},
4915 {"decrCommand", (unsigned long)decrCommand},
4916 {"incrbyCommand", (unsigned long)incrbyCommand},
4917 {"decrbyCommand", (unsigned long)decrbyCommand},
4918 {"selectCommand", (unsigned long)selectCommand},
4919 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4920 {"keysCommand", (unsigned long)keysCommand},
4921 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4922 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4923 {"saveCommand", (unsigned long)saveCommand},
4924 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4925 {"shutdownCommand", (unsigned long)shutdownCommand},
4926 {"moveCommand", (unsigned long)moveCommand},
4927 {"renameCommand", (unsigned long)renameCommand},
4928 {"renamenxCommand", (unsigned long)renamenxCommand},
4929 {"lpushCommand", (unsigned long)lpushCommand},
4930 {"rpushCommand", (unsigned long)rpushCommand},
4931 {"lpopCommand", (unsigned long)lpopCommand},
4932 {"rpopCommand", (unsigned long)rpopCommand},
4933 {"llenCommand", (unsigned long)llenCommand},
4934 {"lindexCommand", (unsigned long)lindexCommand},
4935 {"lrangeCommand", (unsigned long)lrangeCommand},
4936 {"ltrimCommand", (unsigned long)ltrimCommand},
4937 {"typeCommand", (unsigned long)typeCommand},
4938 {"lsetCommand", (unsigned long)lsetCommand},
4939 {"saddCommand", (unsigned long)saddCommand},
4940 {"sremCommand", (unsigned long)sremCommand},
4941 {"smoveCommand", (unsigned long)smoveCommand},
4942 {"sismemberCommand", (unsigned long)sismemberCommand},
4943 {"scardCommand", (unsigned long)scardCommand},
4944 {"spopCommand", (unsigned long)spopCommand},
4945 {"srandmemberCommand", (unsigned long)srandmemberCommand},
4946 {"sinterCommand", (unsigned long)sinterCommand},
4947 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4948 {"sunionCommand", (unsigned long)sunionCommand},
4949 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4950 {"sdiffCommand", (unsigned long)sdiffCommand},
4951 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4952 {"syncCommand", (unsigned long)syncCommand},
4953 {"flushdbCommand", (unsigned long)flushdbCommand},
4954 {"flushallCommand", (unsigned long)flushallCommand},
4955 {"sortCommand", (unsigned long)sortCommand},
4956 {"lremCommand", (unsigned long)lremCommand},
4957 {"infoCommand", (unsigned long)infoCommand},
4958 {"mgetCommand", (unsigned long)mgetCommand},
4959 {"monitorCommand", (unsigned long)monitorCommand},
4960 {"expireCommand", (unsigned long)expireCommand},
4961 {"getsetCommand", (unsigned long)getsetCommand},
4962 {"ttlCommand", (unsigned long)ttlCommand},
4963 {"slaveofCommand", (unsigned long)slaveofCommand},
4964 {"debugCommand", (unsigned long)debugCommand},
4965 {"processCommand", (unsigned long)processCommand},
4966 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4967 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4968 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4969 {"msetGenericCommand", (unsigned long)msetGenericCommand},
4970 {"msetCommand", (unsigned long)msetCommand},
4971 {"msetnxCommand", (unsigned long)msetnxCommand},
4972 {"zslCreateNode", (unsigned long)zslCreateNode},
4973 {"zslCreate", (unsigned long)zslCreate},
4974 {"zslFreeNode",(unsigned long)zslFreeNode},
4975 {"zslFree",(unsigned long)zslFree},
4976 {"zslRandomLevel",(unsigned long)zslRandomLevel},
4977 {"zslInsert",(unsigned long)zslInsert},
4978 {"zslDelete",(unsigned long)zslDelete},
4979 {"createZsetObject",(unsigned long)createZsetObject},
4980 {"zaddCommand",(unsigned long)zaddCommand},
4981 {"zrangeCommand",(unsigned long)zrangeCommand},
4982 {NULL,0}
4983 };
4984
4985 /* This function try to convert a pointer into a function name. It's used in
4986 * oreder to provide a backtrace under segmentation fault that's able to
4987 * display functions declared as static (otherwise the backtrace is useless). */
4988 static char *findFuncName(void *pointer, unsigned long *offset){
4989 int i, ret = -1;
4990 unsigned long off, minoff = 0;
4991
4992 /* Try to match against the Symbol with the smallest offset */
4993 for (i=0; symsTable[i].pointer; i++) {
4994 unsigned long lp = (unsigned long) pointer;
4995
4996 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4997 off=lp-symsTable[i].pointer;
4998 if (ret < 0 || off < minoff) {
4999 minoff=off;
5000 ret=i;
5001 }
5002 }
5003 }
5004 if (ret == -1) return NULL;
5005 *offset = minoff;
5006 return symsTable[ret].name;
5007 }
5008
5009 static void *getMcontextEip(ucontext_t *uc) {
5010 #if defined(__FreeBSD__)
5011 return (void*) uc->uc_mcontext.mc_eip;
5012 #elif defined(__dietlibc__)
5013 return (void*) uc->uc_mcontext.eip;
5014 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5015 return (void*) uc->uc_mcontext->__ss.__eip;
5016 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5017 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5018 return (void*) uc->uc_mcontext->__ss.__rip;
5019 #else
5020 return (void*) uc->uc_mcontext->__ss.__eip;
5021 #endif
5022 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5023 return (void*) uc->uc_mcontext.gregs[REG_EIP];
5024 #elif defined(__ia64__) /* Linux IA64 */
5025 return (void*) uc->uc_mcontext.sc_ip;
5026 #else
5027 return NULL;
5028 #endif
5029 }
5030
5031 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5032 void *trace[100];
5033 char **messages = NULL;
5034 int i, trace_size = 0;
5035 unsigned long offset=0;
5036 time_t uptime = time(NULL)-server.stat_starttime;
5037 ucontext_t *uc = (ucontext_t*) secret;
5038 REDIS_NOTUSED(info);
5039
5040 redisLog(REDIS_WARNING,
5041 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5042 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5043 "redis_version:%s; "
5044 "uptime_in_seconds:%d; "
5045 "connected_clients:%d; "
5046 "connected_slaves:%d; "
5047 "used_memory:%zu; "
5048 "changes_since_last_save:%lld; "
5049 "bgsave_in_progress:%d; "
5050 "last_save_time:%d; "
5051 "total_connections_received:%lld; "
5052 "total_commands_processed:%lld; "
5053 "role:%s;"
5054 ,REDIS_VERSION,
5055 uptime,
5056 listLength(server.clients)-listLength(server.slaves),
5057 listLength(server.slaves),
5058 server.usedmemory,
5059 server.dirty,
5060 server.bgsaveinprogress,
5061 server.lastsave,
5062 server.stat_numconnections,
5063 server.stat_numcommands,
5064 server.masterhost == NULL ? "master" : "slave"
5065 ));
5066
5067 trace_size = backtrace(trace, 100);
5068 /* overwrite sigaction with caller's address */
5069 if (getMcontextEip(uc) != NULL) {
5070 trace[1] = getMcontextEip(uc);
5071 }
5072 messages = backtrace_symbols(trace, trace_size);
5073
5074 for (i=1; i<trace_size; ++i) {
5075 char *fn = findFuncName(trace[i], &offset), *p;
5076
5077 p = strchr(messages[i],'+');
5078 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5079 redisLog(REDIS_WARNING,"%s", messages[i]);
5080 } else {
5081 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5082 }
5083 }
5084 free(messages);
5085 exit(0);
5086 }
5087
5088 static void setupSigSegvAction(void) {
5089 struct sigaction act;
5090
5091 sigemptyset (&act.sa_mask);
5092 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5093 * is used. Otherwise, sa_handler is used */
5094 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5095 act.sa_sigaction = segvHandler;
5096 sigaction (SIGSEGV, &act, NULL);
5097 sigaction (SIGBUS, &act, NULL);
5098 sigaction (SIGFPE, &act, NULL);
5099 sigaction (SIGILL, &act, NULL);
5100 sigaction (SIGBUS, &act, NULL);
5101 return;
5102 }
5103 #else /* HAVE_BACKTRACE */
5104 static void setupSigSegvAction(void) {
5105 }
5106 #endif /* HAVE_BACKTRACE */
5107
5108 /* =================================== Main! ================================ */
5109
5110 #ifdef __linux__
5111 int linuxOvercommitMemoryValue(void) {
5112 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
5113 char buf[64];
5114
5115 if (!fp) return -1;
5116 if (fgets(buf,64,fp) == NULL) {
5117 fclose(fp);
5118 return -1;
5119 }
5120 fclose(fp);
5121
5122 return atoi(buf);
5123 }
5124
5125 void linuxOvercommitMemoryWarning(void) {
5126 if (linuxOvercommitMemoryValue() == 0) {
5127 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5128 }
5129 }
5130 #endif /* __linux__ */
5131
5132 static void daemonize(void) {
5133 int fd;
5134 FILE *fp;
5135
5136 if (fork() != 0) exit(0); /* parent exits */
5137 setsid(); /* create a new session */
5138
5139 /* Every output goes to /dev/null. If Redis is daemonized but
5140 * the 'logfile' is set to 'stdout' in the configuration file
5141 * it will not log at all. */
5142 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5143 dup2(fd, STDIN_FILENO);
5144 dup2(fd, STDOUT_FILENO);
5145 dup2(fd, STDERR_FILENO);
5146 if (fd > STDERR_FILENO) close(fd);
5147 }
5148 /* Try to write the pid file */
5149 fp = fopen(server.pidfile,"w");
5150 if (fp) {
5151 fprintf(fp,"%d\n",getpid());
5152 fclose(fp);
5153 }
5154 }
5155
5156 int main(int argc, char **argv) {
5157 initServerConfig();
5158 if (argc == 2) {
5159 ResetServerSaveParams();
5160 loadServerConfig(argv[1]);
5161 } else if (argc > 2) {
5162 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5163 exit(1);
5164 } else {
5165 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5166 }
5167 initServer();
5168 if (server.daemonize) daemonize();
5169 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5170 #ifdef __linux__
5171 linuxOvercommitMemoryWarning();
5172 #endif
5173 if (rdbLoad(server.dbfilename) == REDIS_OK)
5174 redisLog(REDIS_NOTICE,"DB loaded from disk");
5175 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5176 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5177 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5178 aeMain(server.el);
5179 aeDeleteEventLoop(server.el);
5180 return 0;
5181 }