]> git.saurik.com Git - redis.git/blob - redis.c
ee99792bbd084d91e6be38e9083733e1c9cb6535
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
71 /* Error codes */
72 #define REDIS_OK 0
73 #define REDIS_ERR -1
74
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
88
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
91
92 /* Command flags */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
96 this flags will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short this commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
100
101 /* Object types */
102 #define REDIS_STRING 0
103 #define REDIS_LIST 1
104 #define REDIS_SET 2
105 #define REDIS_ZSET 3
106 #define REDIS_HASH 4
107
108 /* Objects encoding */
109 #define REDIS_ENCODING_RAW 0 /* Raw representation */
110 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111
112 /* Object types only used for dumping to disk */
113 #define REDIS_EXPIRETIME 253
114 #define REDIS_SELECTDB 254
115 #define REDIS_EOF 255
116
117 /* Defines related to the dump file format. To store 32 bits lengths for short
118 * keys requires a lot of space, so we check the most significant 2 bits of
119 * the first byte to interpreter the length:
120 *
121 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
122 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
123 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
124 * 11|000000 this means: specially encoded object will follow. The six bits
125 * number specify the kind of object that follows.
126 * See the REDIS_RDB_ENC_* defines.
127 *
128 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
129 * values, will fit inside. */
130 #define REDIS_RDB_6BITLEN 0
131 #define REDIS_RDB_14BITLEN 1
132 #define REDIS_RDB_32BITLEN 2
133 #define REDIS_RDB_ENCVAL 3
134 #define REDIS_RDB_LENERR UINT_MAX
135
136 /* When a length of a string object stored on disk has the first two bits
137 * set, the remaining two bits specify a special encoding for the object
138 * accordingly to the following defines: */
139 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
140 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
141 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
142 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
143
144 /* Client flags */
145 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
146 #define REDIS_SLAVE 2 /* This client is a slave server */
147 #define REDIS_MASTER 4 /* This client is a master server */
148 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149
150 /* Slave replication state - slave side */
151 #define REDIS_REPL_NONE 0 /* No active replication */
152 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
153 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154
155 /* Slave replication state - from the point of view of master
156 * Note that in SEND_BULK and ONLINE state the slave receives new updates
157 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
158 * to start the next background saving in order to send updates to it. */
159 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
160 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
161 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
162 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163
164 /* List related stuff */
165 #define REDIS_HEAD 0
166 #define REDIS_TAIL 1
167
168 /* Sort operations */
169 #define REDIS_SORT_GET 0
170 #define REDIS_SORT_DEL 1
171 #define REDIS_SORT_INCR 2
172 #define REDIS_SORT_DECR 3
173 #define REDIS_SORT_ASC 4
174 #define REDIS_SORT_DESC 5
175 #define REDIS_SORTKEY_MAX 1024
176
177 /* Log levels */
178 #define REDIS_DEBUG 0
179 #define REDIS_NOTICE 1
180 #define REDIS_WARNING 2
181
182 /* Anti-warning macro... */
183 #define REDIS_NOTUSED(V) ((void) V)
184
185 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
186 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
187
188 /*================================= Data types ============================== */
189
190 /* A redis object, that is a type able to hold a string / list / set */
191 typedef struct redisObject {
192 void *ptr;
193 unsigned char type;
194 unsigned char encoding;
195 unsigned char notused[2];
196 int refcount;
197 } robj;
198
199 typedef struct redisDb {
200 dict *dict;
201 dict *expires;
202 int id;
203 } redisDb;
204
205 /* With multiplexing we need to take per-clinet state.
206 * Clients are taken in a liked list. */
207 typedef struct redisClient {
208 int fd;
209 redisDb *db;
210 int dictid;
211 sds querybuf;
212 robj **argv, **mbargv;
213 int argc, mbargc;
214 int bulklen; /* bulk read len. -1 if not in bulk read mode */
215 int multibulk; /* multi bulk command format active */
216 list *reply;
217 int sentlen;
218 time_t lastinteraction; /* time of the last interaction, used for timeout */
219 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
220 int slaveseldb; /* slave selected db, if this client is a slave */
221 int authenticated; /* when requirepass is non-NULL */
222 int replstate; /* replication state if this is a slave */
223 int repldbfd; /* replication DB file descriptor */
224 long repldboff; /* replication DB file offset */
225 off_t repldbsize; /* replication DB file size */
226 } redisClient;
227
228 struct saveparam {
229 time_t seconds;
230 int changes;
231 };
232
233 /* Global server state structure */
234 struct redisServer {
235 int port;
236 int fd;
237 redisDb *db;
238 dict *sharingpool;
239 unsigned int sharingpoolsize;
240 long long dirty; /* changes to DB from the last save */
241 list *clients;
242 list *slaves, *monitors;
243 char neterr[ANET_ERR_LEN];
244 aeEventLoop *el;
245 int cronloops; /* number of times the cron function run */
246 list *objfreelist; /* A list of freed objects to avoid malloc() */
247 time_t lastsave; /* Unix time of last save succeeede */
248 size_t usedmemory; /* Used memory in megabytes */
249 /* Fields used only for stats */
250 time_t stat_starttime; /* server start time */
251 long long stat_numcommands; /* number of processed commands */
252 long long stat_numconnections; /* number of connections received */
253 /* Configuration */
254 int verbosity;
255 int glueoutputbuf;
256 int maxidletime;
257 int dbnum;
258 int daemonize;
259 char *pidfile;
260 int bgsaveinprogress;
261 pid_t bgsavechildpid;
262 struct saveparam *saveparams;
263 int saveparamslen;
264 char *logfile;
265 char *bindaddr;
266 char *dbfilename;
267 char *requirepass;
268 int shareobjects;
269 /* Replication related */
270 int isslave;
271 char *masterhost;
272 int masterport;
273 redisClient *master; /* client that is master for this slave */
274 int replstate;
275 unsigned int maxclients;
276 unsigned long maxmemory;
277 /* Sort parameters - qsort_r() is only available under BSD so we
278 * have to take this state global, in order to pass it to sortCompare() */
279 int sort_desc;
280 int sort_alpha;
281 int sort_bypattern;
282 };
283
284 typedef void redisCommandProc(redisClient *c);
285 struct redisCommand {
286 char *name;
287 redisCommandProc *proc;
288 int arity;
289 int flags;
290 };
291
292 struct redisFunctionSym {
293 char *name;
294 unsigned long pointer;
295 };
296
297 typedef struct _redisSortObject {
298 robj *obj;
299 union {
300 double score;
301 robj *cmpobj;
302 } u;
303 } redisSortObject;
304
305 typedef struct _redisSortOperation {
306 int type;
307 robj *pattern;
308 } redisSortOperation;
309
310 /* ZSETs use a specialized version of Skiplists */
311
312 typedef struct zskiplistNode {
313 struct zskiplistNode **forward;
314 double score;
315 robj *obj;
316 } zskiplistNode;
317
318 typedef struct zskiplist {
319 struct zskiplistNode *header;
320 long length;
321 int level;
322 } zskiplist;
323
324 typedef struct zset {
325 dict *dict;
326 zskiplist *zsl;
327 } zset;
328
329 /* Our shared "common" objects */
330
331 struct sharedObjectsStruct {
332 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
333 *colon, *nullbulk, *nullmultibulk,
334 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
335 *outofrangeerr, *plus,
336 *select0, *select1, *select2, *select3, *select4,
337 *select5, *select6, *select7, *select8, *select9;
338 } shared;
339
340 /*================================ Prototypes =============================== */
341
342 static void freeStringObject(robj *o);
343 static void freeListObject(robj *o);
344 static void freeSetObject(robj *o);
345 static void decrRefCount(void *o);
346 static robj *createObject(int type, void *ptr);
347 static void freeClient(redisClient *c);
348 static int rdbLoad(char *filename);
349 static void addReply(redisClient *c, robj *obj);
350 static void addReplySds(redisClient *c, sds s);
351 static void incrRefCount(robj *o);
352 static int rdbSaveBackground(char *filename);
353 static robj *createStringObject(char *ptr, size_t len);
354 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
355 static int syncWithMaster(void);
356 static robj *tryObjectSharing(robj *o);
357 static int tryObjectEncoding(robj *o);
358 static robj *getDecodedObject(const robj *o);
359 static int removeExpire(redisDb *db, robj *key);
360 static int expireIfNeeded(redisDb *db, robj *key);
361 static int deleteIfVolatile(redisDb *db, robj *key);
362 static int deleteKey(redisDb *db, robj *key);
363 static time_t getExpire(redisDb *db, robj *key);
364 static int setExpire(redisDb *db, robj *key, time_t when);
365 static void updateSlavesWaitingBgsave(int bgsaveerr);
366 static void freeMemoryIfNeeded(void);
367 static int processCommand(redisClient *c);
368 static void setupSigSegvAction(void);
369 static void rdbRemoveTempFile(pid_t childpid);
370 static size_t stringObjectLen(robj *o);
371 static void processInputBuffer(redisClient *c);
372 static zskiplist *zslCreate(void);
373 static void zslFree(zskiplist *zsl);
374
375 static void authCommand(redisClient *c);
376 static void pingCommand(redisClient *c);
377 static void echoCommand(redisClient *c);
378 static void setCommand(redisClient *c);
379 static void setnxCommand(redisClient *c);
380 static void getCommand(redisClient *c);
381 static void delCommand(redisClient *c);
382 static void existsCommand(redisClient *c);
383 static void incrCommand(redisClient *c);
384 static void decrCommand(redisClient *c);
385 static void incrbyCommand(redisClient *c);
386 static void decrbyCommand(redisClient *c);
387 static void selectCommand(redisClient *c);
388 static void randomkeyCommand(redisClient *c);
389 static void keysCommand(redisClient *c);
390 static void dbsizeCommand(redisClient *c);
391 static void lastsaveCommand(redisClient *c);
392 static void saveCommand(redisClient *c);
393 static void bgsaveCommand(redisClient *c);
394 static void shutdownCommand(redisClient *c);
395 static void moveCommand(redisClient *c);
396 static void renameCommand(redisClient *c);
397 static void renamenxCommand(redisClient *c);
398 static void lpushCommand(redisClient *c);
399 static void rpushCommand(redisClient *c);
400 static void lpopCommand(redisClient *c);
401 static void rpopCommand(redisClient *c);
402 static void llenCommand(redisClient *c);
403 static void lindexCommand(redisClient *c);
404 static void lrangeCommand(redisClient *c);
405 static void ltrimCommand(redisClient *c);
406 static void typeCommand(redisClient *c);
407 static void lsetCommand(redisClient *c);
408 static void saddCommand(redisClient *c);
409 static void sremCommand(redisClient *c);
410 static void smoveCommand(redisClient *c);
411 static void sismemberCommand(redisClient *c);
412 static void scardCommand(redisClient *c);
413 static void spopCommand(redisClient *c);
414 static void srandmemberCommand(redisClient *c);
415 static void sinterCommand(redisClient *c);
416 static void sinterstoreCommand(redisClient *c);
417 static void sunionCommand(redisClient *c);
418 static void sunionstoreCommand(redisClient *c);
419 static void sdiffCommand(redisClient *c);
420 static void sdiffstoreCommand(redisClient *c);
421 static void syncCommand(redisClient *c);
422 static void flushdbCommand(redisClient *c);
423 static void flushallCommand(redisClient *c);
424 static void sortCommand(redisClient *c);
425 static void lremCommand(redisClient *c);
426 static void infoCommand(redisClient *c);
427 static void mgetCommand(redisClient *c);
428 static void monitorCommand(redisClient *c);
429 static void expireCommand(redisClient *c);
430 static void getsetCommand(redisClient *c);
431 static void ttlCommand(redisClient *c);
432 static void slaveofCommand(redisClient *c);
433 static void debugCommand(redisClient *c);
434 static void msetCommand(redisClient *c);
435 static void msetnxCommand(redisClient *c);
436 static void zaddCommand(redisClient *c);
437 static void zrangeCommand(redisClient *c);
438 static void zlenCommand(redisClient *c);
439
440 /*================================= Globals ================================= */
441
442 /* Global vars */
443 static struct redisServer server; /* server global state */
444 static struct redisCommand cmdTable[] = {
445 {"get",getCommand,2,REDIS_CMD_INLINE},
446 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
447 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
448 {"del",delCommand,-2,REDIS_CMD_INLINE},
449 {"exists",existsCommand,2,REDIS_CMD_INLINE},
450 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
451 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
452 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
453 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
454 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
455 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
456 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
457 {"llen",llenCommand,2,REDIS_CMD_INLINE},
458 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
459 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
460 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
461 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
462 {"lrem",lremCommand,4,REDIS_CMD_BULK},
463 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
464 {"srem",sremCommand,3,REDIS_CMD_BULK},
465 {"smove",smoveCommand,4,REDIS_CMD_BULK},
466 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
467 {"scard",scardCommand,2,REDIS_CMD_INLINE},
468 {"spop",spopCommand,2,REDIS_CMD_INLINE},
469 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
470 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
471 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
472 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
473 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
474 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
475 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
476 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
477 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
478 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
479 {"zlen",zlenCommand,2,REDIS_CMD_INLINE},
480 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
481 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
482 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
483 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
484 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
485 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
486 {"select",selectCommand,2,REDIS_CMD_INLINE},
487 {"move",moveCommand,3,REDIS_CMD_INLINE},
488 {"rename",renameCommand,3,REDIS_CMD_INLINE},
489 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
490 {"expire",expireCommand,3,REDIS_CMD_INLINE},
491 {"keys",keysCommand,2,REDIS_CMD_INLINE},
492 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
493 {"auth",authCommand,2,REDIS_CMD_INLINE},
494 {"ping",pingCommand,1,REDIS_CMD_INLINE},
495 {"echo",echoCommand,2,REDIS_CMD_BULK},
496 {"save",saveCommand,1,REDIS_CMD_INLINE},
497 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
498 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
499 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
500 {"type",typeCommand,2,REDIS_CMD_INLINE},
501 {"sync",syncCommand,1,REDIS_CMD_INLINE},
502 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
503 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
504 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
505 {"info",infoCommand,1,REDIS_CMD_INLINE},
506 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
507 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
508 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
509 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
510 {NULL,NULL,0,0}
511 };
512 /*============================ Utility functions ============================ */
513
514 /* Glob-style pattern matching. */
515 int stringmatchlen(const char *pattern, int patternLen,
516 const char *string, int stringLen, int nocase)
517 {
518 while(patternLen) {
519 switch(pattern[0]) {
520 case '*':
521 while (pattern[1] == '*') {
522 pattern++;
523 patternLen--;
524 }
525 if (patternLen == 1)
526 return 1; /* match */
527 while(stringLen) {
528 if (stringmatchlen(pattern+1, patternLen-1,
529 string, stringLen, nocase))
530 return 1; /* match */
531 string++;
532 stringLen--;
533 }
534 return 0; /* no match */
535 break;
536 case '?':
537 if (stringLen == 0)
538 return 0; /* no match */
539 string++;
540 stringLen--;
541 break;
542 case '[':
543 {
544 int not, match;
545
546 pattern++;
547 patternLen--;
548 not = pattern[0] == '^';
549 if (not) {
550 pattern++;
551 patternLen--;
552 }
553 match = 0;
554 while(1) {
555 if (pattern[0] == '\\') {
556 pattern++;
557 patternLen--;
558 if (pattern[0] == string[0])
559 match = 1;
560 } else if (pattern[0] == ']') {
561 break;
562 } else if (patternLen == 0) {
563 pattern--;
564 patternLen++;
565 break;
566 } else if (pattern[1] == '-' && patternLen >= 3) {
567 int start = pattern[0];
568 int end = pattern[2];
569 int c = string[0];
570 if (start > end) {
571 int t = start;
572 start = end;
573 end = t;
574 }
575 if (nocase) {
576 start = tolower(start);
577 end = tolower(end);
578 c = tolower(c);
579 }
580 pattern += 2;
581 patternLen -= 2;
582 if (c >= start && c <= end)
583 match = 1;
584 } else {
585 if (!nocase) {
586 if (pattern[0] == string[0])
587 match = 1;
588 } else {
589 if (tolower((int)pattern[0]) == tolower((int)string[0]))
590 match = 1;
591 }
592 }
593 pattern++;
594 patternLen--;
595 }
596 if (not)
597 match = !match;
598 if (!match)
599 return 0; /* no match */
600 string++;
601 stringLen--;
602 break;
603 }
604 case '\\':
605 if (patternLen >= 2) {
606 pattern++;
607 patternLen--;
608 }
609 /* fall through */
610 default:
611 if (!nocase) {
612 if (pattern[0] != string[0])
613 return 0; /* no match */
614 } else {
615 if (tolower((int)pattern[0]) != tolower((int)string[0]))
616 return 0; /* no match */
617 }
618 string++;
619 stringLen--;
620 break;
621 }
622 pattern++;
623 patternLen--;
624 if (stringLen == 0) {
625 while(*pattern == '*') {
626 pattern++;
627 patternLen--;
628 }
629 break;
630 }
631 }
632 if (patternLen == 0 && stringLen == 0)
633 return 1;
634 return 0;
635 }
636
637 static void redisLog(int level, const char *fmt, ...) {
638 va_list ap;
639 FILE *fp;
640
641 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
642 if (!fp) return;
643
644 va_start(ap, fmt);
645 if (level >= server.verbosity) {
646 char *c = ".-*";
647 char buf[64];
648 time_t now;
649
650 now = time(NULL);
651 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
652 fprintf(fp,"%s %c ",buf,c[level]);
653 vfprintf(fp, fmt, ap);
654 fprintf(fp,"\n");
655 fflush(fp);
656 }
657 va_end(ap);
658
659 if (server.logfile) fclose(fp);
660 }
661
662 /*====================== Hash table type implementation ==================== */
663
664 /* This is an hash table type that uses the SDS dynamic strings libary as
665 * keys and radis objects as values (objects can hold SDS strings,
666 * lists, sets). */
667
668 static void dictVanillaFree(void *privdata, void *val)
669 {
670 DICT_NOTUSED(privdata);
671 zfree(val);
672 }
673
674 static int sdsDictKeyCompare(void *privdata, const void *key1,
675 const void *key2)
676 {
677 int l1,l2;
678 DICT_NOTUSED(privdata);
679
680 l1 = sdslen((sds)key1);
681 l2 = sdslen((sds)key2);
682 if (l1 != l2) return 0;
683 return memcmp(key1, key2, l1) == 0;
684 }
685
686 static void dictRedisObjectDestructor(void *privdata, void *val)
687 {
688 DICT_NOTUSED(privdata);
689
690 decrRefCount(val);
691 }
692
693 static int dictObjKeyCompare(void *privdata, const void *key1,
694 const void *key2)
695 {
696 const robj *o1 = key1, *o2 = key2;
697 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
698 }
699
700 static unsigned int dictObjHash(const void *key) {
701 const robj *o = key;
702 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
703 }
704
705 static int dictEncObjKeyCompare(void *privdata, const void *key1,
706 const void *key2)
707 {
708 const robj *o1 = key1, *o2 = key2;
709
710 if (o1->encoding == REDIS_ENCODING_RAW &&
711 o2->encoding == REDIS_ENCODING_RAW)
712 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
713 else {
714 robj *dec1, *dec2;
715 int cmp;
716
717 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
718 getDecodedObject(o1) : (robj*)o1;
719 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
720 getDecodedObject(o2) : (robj*)o2;
721 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
722 if (dec1 != o1) decrRefCount(dec1);
723 if (dec2 != o2) decrRefCount(dec2);
724 return cmp;
725 }
726 }
727
728 static unsigned int dictEncObjHash(const void *key) {
729 const robj *o = key;
730
731 if (o->encoding == REDIS_ENCODING_RAW)
732 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
733 else {
734 robj *dec = getDecodedObject(o);
735 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
736 decrRefCount(dec);
737 return hash;
738 }
739 }
740
741 static dictType setDictType = {
742 dictEncObjHash, /* hash function */
743 NULL, /* key dup */
744 NULL, /* val dup */
745 dictEncObjKeyCompare, /* key compare */
746 dictRedisObjectDestructor, /* key destructor */
747 NULL /* val destructor */
748 };
749
750 static dictType zsetDictType = {
751 dictEncObjHash, /* hash function */
752 NULL, /* key dup */
753 NULL, /* val dup */
754 dictEncObjKeyCompare, /* key compare */
755 dictRedisObjectDestructor, /* key destructor */
756 dictVanillaFree /* val destructor */
757 };
758
759 static dictType hashDictType = {
760 dictObjHash, /* hash function */
761 NULL, /* key dup */
762 NULL, /* val dup */
763 dictObjKeyCompare, /* key compare */
764 dictRedisObjectDestructor, /* key destructor */
765 dictRedisObjectDestructor /* val destructor */
766 };
767
768 /* ========================= Random utility functions ======================= */
769
770 /* Redis generally does not try to recover from out of memory conditions
771 * when allocating objects or strings, it is not clear if it will be possible
772 * to report this condition to the client since the networking layer itself
773 * is based on heap allocation for send buffers, so we simply abort.
774 * At least the code will be simpler to read... */
775 static void oom(const char *msg) {
776 fprintf(stderr, "%s: Out of memory\n",msg);
777 fflush(stderr);
778 sleep(1);
779 abort();
780 }
781
782 /* ====================== Redis server networking stuff ===================== */
783 static void closeTimedoutClients(void) {
784 redisClient *c;
785 listNode *ln;
786 time_t now = time(NULL);
787
788 listRewind(server.clients);
789 while ((ln = listYield(server.clients)) != NULL) {
790 c = listNodeValue(ln);
791 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
792 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
793 (now - c->lastinteraction > server.maxidletime)) {
794 redisLog(REDIS_DEBUG,"Closing idle client");
795 freeClient(c);
796 }
797 }
798 }
799
800 static int htNeedsResize(dict *dict) {
801 long long size, used;
802
803 size = dictSlots(dict);
804 used = dictSize(dict);
805 return (size && used && size > DICT_HT_INITIAL_SIZE &&
806 (used*100/size < REDIS_HT_MINFILL));
807 }
808
809 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
810 * we resize the hash table to save memory */
811 static void tryResizeHashTables(void) {
812 int j;
813
814 for (j = 0; j < server.dbnum; j++) {
815 if (htNeedsResize(server.db[j].dict)) {
816 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
817 dictResize(server.db[j].dict);
818 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
819 }
820 if (htNeedsResize(server.db[j].expires))
821 dictResize(server.db[j].expires);
822 }
823 }
824
825 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
826 int j, loops = server.cronloops++;
827 REDIS_NOTUSED(eventLoop);
828 REDIS_NOTUSED(id);
829 REDIS_NOTUSED(clientData);
830
831 /* Update the global state with the amount of used memory */
832 server.usedmemory = zmalloc_used_memory();
833
834 /* Show some info about non-empty databases */
835 for (j = 0; j < server.dbnum; j++) {
836 long long size, used, vkeys;
837
838 size = dictSlots(server.db[j].dict);
839 used = dictSize(server.db[j].dict);
840 vkeys = dictSize(server.db[j].expires);
841 if (!(loops % 5) && (used || vkeys)) {
842 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
843 /* dictPrintStats(server.dict); */
844 }
845 }
846
847 /* We don't want to resize the hash tables while a bacground saving
848 * is in progress: the saving child is created using fork() that is
849 * implemented with a copy-on-write semantic in most modern systems, so
850 * if we resize the HT while there is the saving child at work actually
851 * a lot of memory movements in the parent will cause a lot of pages
852 * copied. */
853 if (!server.bgsaveinprogress) tryResizeHashTables();
854
855 /* Show information about connected clients */
856 if (!(loops % 5)) {
857 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
858 listLength(server.clients)-listLength(server.slaves),
859 listLength(server.slaves),
860 server.usedmemory,
861 dictSize(server.sharingpool));
862 }
863
864 /* Close connections of timedout clients */
865 if (server.maxidletime && !(loops % 10))
866 closeTimedoutClients();
867
868 /* Check if a background saving in progress terminated */
869 if (server.bgsaveinprogress) {
870 int statloc;
871 if (wait4(-1,&statloc,WNOHANG,NULL)) {
872 int exitcode = WEXITSTATUS(statloc);
873 int bysignal = WIFSIGNALED(statloc);
874
875 if (!bysignal && exitcode == 0) {
876 redisLog(REDIS_NOTICE,
877 "Background saving terminated with success");
878 server.dirty = 0;
879 server.lastsave = time(NULL);
880 } else if (!bysignal && exitcode != 0) {
881 redisLog(REDIS_WARNING, "Background saving error");
882 } else {
883 redisLog(REDIS_WARNING,
884 "Background saving terminated by signal");
885 rdbRemoveTempFile(server.bgsavechildpid);
886 }
887 server.bgsaveinprogress = 0;
888 server.bgsavechildpid = -1;
889 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
890 }
891 } else {
892 /* If there is not a background saving in progress check if
893 * we have to save now */
894 time_t now = time(NULL);
895 for (j = 0; j < server.saveparamslen; j++) {
896 struct saveparam *sp = server.saveparams+j;
897
898 if (server.dirty >= sp->changes &&
899 now-server.lastsave > sp->seconds) {
900 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
901 sp->changes, sp->seconds);
902 rdbSaveBackground(server.dbfilename);
903 break;
904 }
905 }
906 }
907
908 /* Try to expire a few timed out keys */
909 for (j = 0; j < server.dbnum; j++) {
910 redisDb *db = server.db+j;
911 int num = dictSize(db->expires);
912
913 if (num) {
914 time_t now = time(NULL);
915
916 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
917 num = REDIS_EXPIRELOOKUPS_PER_CRON;
918 while (num--) {
919 dictEntry *de;
920 time_t t;
921
922 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
923 t = (time_t) dictGetEntryVal(de);
924 if (now > t) {
925 deleteKey(db,dictGetEntryKey(de));
926 }
927 }
928 }
929 }
930
931 /* Check if we should connect to a MASTER */
932 if (server.replstate == REDIS_REPL_CONNECT) {
933 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
934 if (syncWithMaster() == REDIS_OK) {
935 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
936 }
937 }
938 return 1000;
939 }
940
941 static void createSharedObjects(void) {
942 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
943 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
944 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
945 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
946 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
947 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
948 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
949 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
950 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
951 /* no such key */
952 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
953 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
954 "-ERR Operation against a key holding the wrong kind of value\r\n"));
955 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
956 "-ERR no such key\r\n"));
957 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
958 "-ERR syntax error\r\n"));
959 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
960 "-ERR source and destination objects are the same\r\n"));
961 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
962 "-ERR index out of range\r\n"));
963 shared.space = createObject(REDIS_STRING,sdsnew(" "));
964 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
965 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
966 shared.select0 = createStringObject("select 0\r\n",10);
967 shared.select1 = createStringObject("select 1\r\n",10);
968 shared.select2 = createStringObject("select 2\r\n",10);
969 shared.select3 = createStringObject("select 3\r\n",10);
970 shared.select4 = createStringObject("select 4\r\n",10);
971 shared.select5 = createStringObject("select 5\r\n",10);
972 shared.select6 = createStringObject("select 6\r\n",10);
973 shared.select7 = createStringObject("select 7\r\n",10);
974 shared.select8 = createStringObject("select 8\r\n",10);
975 shared.select9 = createStringObject("select 9\r\n",10);
976 }
977
978 static void appendServerSaveParams(time_t seconds, int changes) {
979 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
980 server.saveparams[server.saveparamslen].seconds = seconds;
981 server.saveparams[server.saveparamslen].changes = changes;
982 server.saveparamslen++;
983 }
984
985 static void ResetServerSaveParams() {
986 zfree(server.saveparams);
987 server.saveparams = NULL;
988 server.saveparamslen = 0;
989 }
990
991 static void initServerConfig() {
992 server.dbnum = REDIS_DEFAULT_DBNUM;
993 server.port = REDIS_SERVERPORT;
994 server.verbosity = REDIS_DEBUG;
995 server.maxidletime = REDIS_MAXIDLETIME;
996 server.saveparams = NULL;
997 server.logfile = NULL; /* NULL = log on standard output */
998 server.bindaddr = NULL;
999 server.glueoutputbuf = 1;
1000 server.daemonize = 0;
1001 server.pidfile = "/var/run/redis.pid";
1002 server.dbfilename = "dump.rdb";
1003 server.requirepass = NULL;
1004 server.shareobjects = 0;
1005 server.sharingpoolsize = 1024;
1006 server.maxclients = 0;
1007 server.maxmemory = 0;
1008 ResetServerSaveParams();
1009
1010 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1011 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1012 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1013 /* Replication related */
1014 server.isslave = 0;
1015 server.masterhost = NULL;
1016 server.masterport = 6379;
1017 server.master = NULL;
1018 server.replstate = REDIS_REPL_NONE;
1019 }
1020
1021 static void initServer() {
1022 int j;
1023
1024 signal(SIGHUP, SIG_IGN);
1025 signal(SIGPIPE, SIG_IGN);
1026 setupSigSegvAction();
1027
1028 server.clients = listCreate();
1029 server.slaves = listCreate();
1030 server.monitors = listCreate();
1031 server.objfreelist = listCreate();
1032 createSharedObjects();
1033 server.el = aeCreateEventLoop();
1034 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1035 server.sharingpool = dictCreate(&setDictType,NULL);
1036 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1037 if (server.fd == -1) {
1038 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1039 exit(1);
1040 }
1041 for (j = 0; j < server.dbnum; j++) {
1042 server.db[j].dict = dictCreate(&hashDictType,NULL);
1043 server.db[j].expires = dictCreate(&setDictType,NULL);
1044 server.db[j].id = j;
1045 }
1046 server.cronloops = 0;
1047 server.bgsaveinprogress = 0;
1048 server.bgsavechildpid = -1;
1049 server.lastsave = time(NULL);
1050 server.dirty = 0;
1051 server.usedmemory = 0;
1052 server.stat_numcommands = 0;
1053 server.stat_numconnections = 0;
1054 server.stat_starttime = time(NULL);
1055 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1056 }
1057
1058 /* Empty the whole database */
1059 static long long emptyDb() {
1060 int j;
1061 long long removed = 0;
1062
1063 for (j = 0; j < server.dbnum; j++) {
1064 removed += dictSize(server.db[j].dict);
1065 dictEmpty(server.db[j].dict);
1066 dictEmpty(server.db[j].expires);
1067 }
1068 return removed;
1069 }
1070
1071 static int yesnotoi(char *s) {
1072 if (!strcasecmp(s,"yes")) return 1;
1073 else if (!strcasecmp(s,"no")) return 0;
1074 else return -1;
1075 }
1076
1077 /* I agree, this is a very rudimental way to load a configuration...
1078 will improve later if the config gets more complex */
1079 static void loadServerConfig(char *filename) {
1080 FILE *fp;
1081 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1082 int linenum = 0;
1083 sds line = NULL;
1084
1085 if (filename[0] == '-' && filename[1] == '\0')
1086 fp = stdin;
1087 else {
1088 if ((fp = fopen(filename,"r")) == NULL) {
1089 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1090 exit(1);
1091 }
1092 }
1093
1094 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1095 sds *argv;
1096 int argc, j;
1097
1098 linenum++;
1099 line = sdsnew(buf);
1100 line = sdstrim(line," \t\r\n");
1101
1102 /* Skip comments and blank lines*/
1103 if (line[0] == '#' || line[0] == '\0') {
1104 sdsfree(line);
1105 continue;
1106 }
1107
1108 /* Split into arguments */
1109 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1110 sdstolower(argv[0]);
1111
1112 /* Execute config directives */
1113 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1114 server.maxidletime = atoi(argv[1]);
1115 if (server.maxidletime < 0) {
1116 err = "Invalid timeout value"; goto loaderr;
1117 }
1118 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1119 server.port = atoi(argv[1]);
1120 if (server.port < 1 || server.port > 65535) {
1121 err = "Invalid port"; goto loaderr;
1122 }
1123 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1124 server.bindaddr = zstrdup(argv[1]);
1125 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1126 int seconds = atoi(argv[1]);
1127 int changes = atoi(argv[2]);
1128 if (seconds < 1 || changes < 0) {
1129 err = "Invalid save parameters"; goto loaderr;
1130 }
1131 appendServerSaveParams(seconds,changes);
1132 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1133 if (chdir(argv[1]) == -1) {
1134 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1135 argv[1], strerror(errno));
1136 exit(1);
1137 }
1138 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1139 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1140 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1141 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1142 else {
1143 err = "Invalid log level. Must be one of debug, notice, warning";
1144 goto loaderr;
1145 }
1146 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1147 FILE *logfp;
1148
1149 server.logfile = zstrdup(argv[1]);
1150 if (!strcasecmp(server.logfile,"stdout")) {
1151 zfree(server.logfile);
1152 server.logfile = NULL;
1153 }
1154 if (server.logfile) {
1155 /* Test if we are able to open the file. The server will not
1156 * be able to abort just for this problem later... */
1157 logfp = fopen(server.logfile,"a");
1158 if (logfp == NULL) {
1159 err = sdscatprintf(sdsempty(),
1160 "Can't open the log file: %s", strerror(errno));
1161 goto loaderr;
1162 }
1163 fclose(logfp);
1164 }
1165 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1166 server.dbnum = atoi(argv[1]);
1167 if (server.dbnum < 1) {
1168 err = "Invalid number of databases"; goto loaderr;
1169 }
1170 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1171 server.maxclients = atoi(argv[1]);
1172 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1173 server.maxmemory = strtoll(argv[1], NULL, 10);
1174 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1175 server.masterhost = sdsnew(argv[1]);
1176 server.masterport = atoi(argv[2]);
1177 server.replstate = REDIS_REPL_CONNECT;
1178 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1179 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1180 err = "argument must be 'yes' or 'no'"; goto loaderr;
1181 }
1182 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1183 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1184 err = "argument must be 'yes' or 'no'"; goto loaderr;
1185 }
1186 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1187 server.sharingpoolsize = atoi(argv[1]);
1188 if (server.sharingpoolsize < 1) {
1189 err = "invalid object sharing pool size"; goto loaderr;
1190 }
1191 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1192 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1193 err = "argument must be 'yes' or 'no'"; goto loaderr;
1194 }
1195 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1196 server.requirepass = zstrdup(argv[1]);
1197 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1198 server.pidfile = zstrdup(argv[1]);
1199 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1200 server.dbfilename = zstrdup(argv[1]);
1201 } else {
1202 err = "Bad directive or wrong number of arguments"; goto loaderr;
1203 }
1204 for (j = 0; j < argc; j++)
1205 sdsfree(argv[j]);
1206 zfree(argv);
1207 sdsfree(line);
1208 }
1209 if (fp != stdin) fclose(fp);
1210 return;
1211
1212 loaderr:
1213 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1214 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1215 fprintf(stderr, ">>> '%s'\n", line);
1216 fprintf(stderr, "%s\n", err);
1217 exit(1);
1218 }
1219
1220 static void freeClientArgv(redisClient *c) {
1221 int j;
1222
1223 for (j = 0; j < c->argc; j++)
1224 decrRefCount(c->argv[j]);
1225 for (j = 0; j < c->mbargc; j++)
1226 decrRefCount(c->mbargv[j]);
1227 c->argc = 0;
1228 c->mbargc = 0;
1229 }
1230
1231 static void freeClient(redisClient *c) {
1232 listNode *ln;
1233
1234 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1235 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1236 sdsfree(c->querybuf);
1237 listRelease(c->reply);
1238 freeClientArgv(c);
1239 close(c->fd);
1240 ln = listSearchKey(server.clients,c);
1241 assert(ln != NULL);
1242 listDelNode(server.clients,ln);
1243 if (c->flags & REDIS_SLAVE) {
1244 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1245 close(c->repldbfd);
1246 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1247 ln = listSearchKey(l,c);
1248 assert(ln != NULL);
1249 listDelNode(l,ln);
1250 }
1251 if (c->flags & REDIS_MASTER) {
1252 server.master = NULL;
1253 server.replstate = REDIS_REPL_CONNECT;
1254 }
1255 zfree(c->argv);
1256 zfree(c->mbargv);
1257 zfree(c);
1258 }
1259
1260 static void glueReplyBuffersIfNeeded(redisClient *c) {
1261 int totlen = 0;
1262 listNode *ln;
1263 robj *o;
1264
1265 listRewind(c->reply);
1266 while((ln = listYield(c->reply))) {
1267 o = ln->value;
1268 totlen += sdslen(o->ptr);
1269 /* This optimization makes more sense if we don't have to copy
1270 * too much data */
1271 if (totlen > 1024) return;
1272 }
1273 if (totlen > 0) {
1274 char buf[1024];
1275 int copylen = 0;
1276
1277 listRewind(c->reply);
1278 while((ln = listYield(c->reply))) {
1279 o = ln->value;
1280 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1281 copylen += sdslen(o->ptr);
1282 listDelNode(c->reply,ln);
1283 }
1284 /* Now the output buffer is empty, add the new single element */
1285 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1286 listAddNodeTail(c->reply,o);
1287 }
1288 }
1289
1290 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1291 redisClient *c = privdata;
1292 int nwritten = 0, totwritten = 0, objlen;
1293 robj *o;
1294 REDIS_NOTUSED(el);
1295 REDIS_NOTUSED(mask);
1296
1297 if (server.glueoutputbuf && listLength(c->reply) > 1)
1298 glueReplyBuffersIfNeeded(c);
1299 while(listLength(c->reply)) {
1300 o = listNodeValue(listFirst(c->reply));
1301 objlen = sdslen(o->ptr);
1302
1303 if (objlen == 0) {
1304 listDelNode(c->reply,listFirst(c->reply));
1305 continue;
1306 }
1307
1308 if (c->flags & REDIS_MASTER) {
1309 /* Don't reply to a master */
1310 nwritten = objlen - c->sentlen;
1311 } else {
1312 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1313 if (nwritten <= 0) break;
1314 }
1315 c->sentlen += nwritten;
1316 totwritten += nwritten;
1317 /* If we fully sent the object on head go to the next one */
1318 if (c->sentlen == objlen) {
1319 listDelNode(c->reply,listFirst(c->reply));
1320 c->sentlen = 0;
1321 }
1322 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1323 * bytes, in a single threaded server it's a good idea to server
1324 * other clients as well, even if a very large request comes from
1325 * super fast link that is always able to accept data (in real world
1326 * terms think to 'KEYS *' against the loopback interfae) */
1327 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1328 }
1329 if (nwritten == -1) {
1330 if (errno == EAGAIN) {
1331 nwritten = 0;
1332 } else {
1333 redisLog(REDIS_DEBUG,
1334 "Error writing to client: %s", strerror(errno));
1335 freeClient(c);
1336 return;
1337 }
1338 }
1339 if (totwritten > 0) c->lastinteraction = time(NULL);
1340 if (listLength(c->reply) == 0) {
1341 c->sentlen = 0;
1342 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1343 }
1344 }
1345
1346 static struct redisCommand *lookupCommand(char *name) {
1347 int j = 0;
1348 while(cmdTable[j].name != NULL) {
1349 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1350 j++;
1351 }
1352 return NULL;
1353 }
1354
1355 /* resetClient prepare the client to process the next command */
1356 static void resetClient(redisClient *c) {
1357 freeClientArgv(c);
1358 c->bulklen = -1;
1359 c->multibulk = 0;
1360 }
1361
1362 /* If this function gets called we already read a whole
1363 * command, argments are in the client argv/argc fields.
1364 * processCommand() execute the command or prepare the
1365 * server for a bulk read from the client.
1366 *
1367 * If 1 is returned the client is still alive and valid and
1368 * and other operations can be performed by the caller. Otherwise
1369 * if 0 is returned the client was destroied (i.e. after QUIT). */
1370 static int processCommand(redisClient *c) {
1371 struct redisCommand *cmd;
1372 long long dirty;
1373
1374 /* Free some memory if needed (maxmemory setting) */
1375 if (server.maxmemory) freeMemoryIfNeeded();
1376
1377 /* Handle the multi bulk command type. This is an alternative protocol
1378 * supported by Redis in order to receive commands that are composed of
1379 * multiple binary-safe "bulk" arguments. The latency of processing is
1380 * a bit higher but this allows things like multi-sets, so if this
1381 * protocol is used only for MSET and similar commands this is a big win. */
1382 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1383 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1384 if (c->multibulk <= 0) {
1385 resetClient(c);
1386 return 1;
1387 } else {
1388 decrRefCount(c->argv[c->argc-1]);
1389 c->argc--;
1390 return 1;
1391 }
1392 } else if (c->multibulk) {
1393 if (c->bulklen == -1) {
1394 if (((char*)c->argv[0]->ptr)[0] != '$') {
1395 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1396 resetClient(c);
1397 return 1;
1398 } else {
1399 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1400 decrRefCount(c->argv[0]);
1401 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1402 c->argc--;
1403 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1404 resetClient(c);
1405 return 1;
1406 }
1407 c->argc--;
1408 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1409 return 1;
1410 }
1411 } else {
1412 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1413 c->mbargv[c->mbargc] = c->argv[0];
1414 c->mbargc++;
1415 c->argc--;
1416 c->multibulk--;
1417 if (c->multibulk == 0) {
1418 robj **auxargv;
1419 int auxargc;
1420
1421 /* Here we need to swap the multi-bulk argc/argv with the
1422 * normal argc/argv of the client structure. */
1423 auxargv = c->argv;
1424 c->argv = c->mbargv;
1425 c->mbargv = auxargv;
1426
1427 auxargc = c->argc;
1428 c->argc = c->mbargc;
1429 c->mbargc = auxargc;
1430
1431 /* We need to set bulklen to something different than -1
1432 * in order for the code below to process the command without
1433 * to try to read the last argument of a bulk command as
1434 * a special argument. */
1435 c->bulklen = 0;
1436 /* continue below and process the command */
1437 } else {
1438 c->bulklen = -1;
1439 return 1;
1440 }
1441 }
1442 }
1443 /* -- end of multi bulk commands processing -- */
1444
1445 /* The QUIT command is handled as a special case. Normal command
1446 * procs are unable to close the client connection safely */
1447 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1448 freeClient(c);
1449 return 0;
1450 }
1451 cmd = lookupCommand(c->argv[0]->ptr);
1452 if (!cmd) {
1453 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1454 resetClient(c);
1455 return 1;
1456 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1457 (c->argc < -cmd->arity)) {
1458 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1459 resetClient(c);
1460 return 1;
1461 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1462 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1463 resetClient(c);
1464 return 1;
1465 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1466 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1467
1468 decrRefCount(c->argv[c->argc-1]);
1469 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1470 c->argc--;
1471 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1472 resetClient(c);
1473 return 1;
1474 }
1475 c->argc--;
1476 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1477 /* It is possible that the bulk read is already in the
1478 * buffer. Check this condition and handle it accordingly.
1479 * This is just a fast path, alternative to call processInputBuffer().
1480 * It's a good idea since the code is small and this condition
1481 * happens most of the times. */
1482 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1483 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1484 c->argc++;
1485 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1486 } else {
1487 return 1;
1488 }
1489 }
1490 /* Let's try to share objects on the command arguments vector */
1491 if (server.shareobjects) {
1492 int j;
1493 for(j = 1; j < c->argc; j++)
1494 c->argv[j] = tryObjectSharing(c->argv[j]);
1495 }
1496 /* Let's try to encode the bulk object to save space. */
1497 if (cmd->flags & REDIS_CMD_BULK)
1498 tryObjectEncoding(c->argv[c->argc-1]);
1499
1500 /* Check if the user is authenticated */
1501 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1502 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1503 resetClient(c);
1504 return 1;
1505 }
1506
1507 /* Exec the command */
1508 dirty = server.dirty;
1509 cmd->proc(c);
1510 if (server.dirty-dirty != 0 && listLength(server.slaves))
1511 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1512 if (listLength(server.monitors))
1513 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1514 server.stat_numcommands++;
1515
1516 /* Prepare the client for the next command */
1517 if (c->flags & REDIS_CLOSE) {
1518 freeClient(c);
1519 return 0;
1520 }
1521 resetClient(c);
1522 return 1;
1523 }
1524
1525 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1526 listNode *ln;
1527 int outc = 0, j;
1528 robj **outv;
1529 /* (args*2)+1 is enough room for args, spaces, newlines */
1530 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1531
1532 if (argc <= REDIS_STATIC_ARGS) {
1533 outv = static_outv;
1534 } else {
1535 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1536 }
1537
1538 for (j = 0; j < argc; j++) {
1539 if (j != 0) outv[outc++] = shared.space;
1540 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1541 robj *lenobj;
1542
1543 lenobj = createObject(REDIS_STRING,
1544 sdscatprintf(sdsempty(),"%d\r\n",
1545 stringObjectLen(argv[j])));
1546 lenobj->refcount = 0;
1547 outv[outc++] = lenobj;
1548 }
1549 outv[outc++] = argv[j];
1550 }
1551 outv[outc++] = shared.crlf;
1552
1553 /* Increment all the refcounts at start and decrement at end in order to
1554 * be sure to free objects if there is no slave in a replication state
1555 * able to be feed with commands */
1556 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1557 listRewind(slaves);
1558 while((ln = listYield(slaves))) {
1559 redisClient *slave = ln->value;
1560
1561 /* Don't feed slaves that are still waiting for BGSAVE to start */
1562 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1563
1564 /* Feed all the other slaves, MONITORs and so on */
1565 if (slave->slaveseldb != dictid) {
1566 robj *selectcmd;
1567
1568 switch(dictid) {
1569 case 0: selectcmd = shared.select0; break;
1570 case 1: selectcmd = shared.select1; break;
1571 case 2: selectcmd = shared.select2; break;
1572 case 3: selectcmd = shared.select3; break;
1573 case 4: selectcmd = shared.select4; break;
1574 case 5: selectcmd = shared.select5; break;
1575 case 6: selectcmd = shared.select6; break;
1576 case 7: selectcmd = shared.select7; break;
1577 case 8: selectcmd = shared.select8; break;
1578 case 9: selectcmd = shared.select9; break;
1579 default:
1580 selectcmd = createObject(REDIS_STRING,
1581 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1582 selectcmd->refcount = 0;
1583 break;
1584 }
1585 addReply(slave,selectcmd);
1586 slave->slaveseldb = dictid;
1587 }
1588 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1589 }
1590 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1591 if (outv != static_outv) zfree(outv);
1592 }
1593
1594 static void processInputBuffer(redisClient *c) {
1595 again:
1596 if (c->bulklen == -1) {
1597 /* Read the first line of the query */
1598 char *p = strchr(c->querybuf,'\n');
1599 size_t querylen;
1600
1601 if (p) {
1602 sds query, *argv;
1603 int argc, j;
1604
1605 query = c->querybuf;
1606 c->querybuf = sdsempty();
1607 querylen = 1+(p-(query));
1608 if (sdslen(query) > querylen) {
1609 /* leave data after the first line of the query in the buffer */
1610 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1611 }
1612 *p = '\0'; /* remove "\n" */
1613 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1614 sdsupdatelen(query);
1615
1616 /* Now we can split the query in arguments */
1617 if (sdslen(query) == 0) {
1618 /* Ignore empty query */
1619 sdsfree(query);
1620 return;
1621 }
1622 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1623 sdsfree(query);
1624
1625 if (c->argv) zfree(c->argv);
1626 c->argv = zmalloc(sizeof(robj*)*argc);
1627
1628 for (j = 0; j < argc; j++) {
1629 if (sdslen(argv[j])) {
1630 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1631 c->argc++;
1632 } else {
1633 sdsfree(argv[j]);
1634 }
1635 }
1636 zfree(argv);
1637 /* Execute the command. If the client is still valid
1638 * after processCommand() return and there is something
1639 * on the query buffer try to process the next command. */
1640 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1641 return;
1642 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1643 redisLog(REDIS_DEBUG, "Client protocol error");
1644 freeClient(c);
1645 return;
1646 }
1647 } else {
1648 /* Bulk read handling. Note that if we are at this point
1649 the client already sent a command terminated with a newline,
1650 we are reading the bulk data that is actually the last
1651 argument of the command. */
1652 int qbl = sdslen(c->querybuf);
1653
1654 if (c->bulklen <= qbl) {
1655 /* Copy everything but the final CRLF as final argument */
1656 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1657 c->argc++;
1658 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1659 /* Process the command. If the client is still valid after
1660 * the processing and there is more data in the buffer
1661 * try to parse it. */
1662 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1663 return;
1664 }
1665 }
1666 }
1667
1668 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1669 redisClient *c = (redisClient*) privdata;
1670 char buf[REDIS_IOBUF_LEN];
1671 int nread;
1672 REDIS_NOTUSED(el);
1673 REDIS_NOTUSED(mask);
1674
1675 nread = read(fd, buf, REDIS_IOBUF_LEN);
1676 if (nread == -1) {
1677 if (errno == EAGAIN) {
1678 nread = 0;
1679 } else {
1680 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1681 freeClient(c);
1682 return;
1683 }
1684 } else if (nread == 0) {
1685 redisLog(REDIS_DEBUG, "Client closed connection");
1686 freeClient(c);
1687 return;
1688 }
1689 if (nread) {
1690 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1691 c->lastinteraction = time(NULL);
1692 } else {
1693 return;
1694 }
1695 processInputBuffer(c);
1696 }
1697
1698 static int selectDb(redisClient *c, int id) {
1699 if (id < 0 || id >= server.dbnum)
1700 return REDIS_ERR;
1701 c->db = &server.db[id];
1702 return REDIS_OK;
1703 }
1704
1705 static void *dupClientReplyValue(void *o) {
1706 incrRefCount((robj*)o);
1707 return 0;
1708 }
1709
1710 static redisClient *createClient(int fd) {
1711 redisClient *c = zmalloc(sizeof(*c));
1712
1713 anetNonBlock(NULL,fd);
1714 anetTcpNoDelay(NULL,fd);
1715 if (!c) return NULL;
1716 selectDb(c,0);
1717 c->fd = fd;
1718 c->querybuf = sdsempty();
1719 c->argc = 0;
1720 c->argv = NULL;
1721 c->bulklen = -1;
1722 c->multibulk = 0;
1723 c->mbargc = 0;
1724 c->mbargv = NULL;
1725 c->sentlen = 0;
1726 c->flags = 0;
1727 c->lastinteraction = time(NULL);
1728 c->authenticated = 0;
1729 c->replstate = REDIS_REPL_NONE;
1730 c->reply = listCreate();
1731 listSetFreeMethod(c->reply,decrRefCount);
1732 listSetDupMethod(c->reply,dupClientReplyValue);
1733 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1734 readQueryFromClient, c, NULL) == AE_ERR) {
1735 freeClient(c);
1736 return NULL;
1737 }
1738 listAddNodeTail(server.clients,c);
1739 return c;
1740 }
1741
1742 static void addReply(redisClient *c, robj *obj) {
1743 if (listLength(c->reply) == 0 &&
1744 (c->replstate == REDIS_REPL_NONE ||
1745 c->replstate == REDIS_REPL_ONLINE) &&
1746 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1747 sendReplyToClient, c, NULL) == AE_ERR) return;
1748 if (obj->encoding != REDIS_ENCODING_RAW) {
1749 obj = getDecodedObject(obj);
1750 } else {
1751 incrRefCount(obj);
1752 }
1753 listAddNodeTail(c->reply,obj);
1754 }
1755
1756 static void addReplySds(redisClient *c, sds s) {
1757 robj *o = createObject(REDIS_STRING,s);
1758 addReply(c,o);
1759 decrRefCount(o);
1760 }
1761
1762 static void addReplyBulkLen(redisClient *c, robj *obj) {
1763 size_t len;
1764
1765 if (obj->encoding == REDIS_ENCODING_RAW) {
1766 len = sdslen(obj->ptr);
1767 } else {
1768 long n = (long)obj->ptr;
1769
1770 len = 1;
1771 if (n < 0) {
1772 len++;
1773 n = -n;
1774 }
1775 while((n = n/10) != 0) {
1776 len++;
1777 }
1778 }
1779 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1780 }
1781
1782 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1783 int cport, cfd;
1784 char cip[128];
1785 redisClient *c;
1786 REDIS_NOTUSED(el);
1787 REDIS_NOTUSED(mask);
1788 REDIS_NOTUSED(privdata);
1789
1790 cfd = anetAccept(server.neterr, fd, cip, &cport);
1791 if (cfd == AE_ERR) {
1792 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1793 return;
1794 }
1795 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1796 if ((c = createClient(cfd)) == NULL) {
1797 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1798 close(cfd); /* May be already closed, just ingore errors */
1799 return;
1800 }
1801 /* If maxclient directive is set and this is one client more... close the
1802 * connection. Note that we create the client instead to check before
1803 * for this condition, since now the socket is already set in nonblocking
1804 * mode and we can send an error for free using the Kernel I/O */
1805 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1806 char *err = "-ERR max number of clients reached\r\n";
1807
1808 /* That's a best effort error message, don't check write errors */
1809 (void) write(c->fd,err,strlen(err));
1810 freeClient(c);
1811 return;
1812 }
1813 server.stat_numconnections++;
1814 }
1815
1816 /* ======================= Redis objects implementation ===================== */
1817
1818 static robj *createObject(int type, void *ptr) {
1819 robj *o;
1820
1821 if (listLength(server.objfreelist)) {
1822 listNode *head = listFirst(server.objfreelist);
1823 o = listNodeValue(head);
1824 listDelNode(server.objfreelist,head);
1825 } else {
1826 o = zmalloc(sizeof(*o));
1827 }
1828 o->type = type;
1829 o->encoding = REDIS_ENCODING_RAW;
1830 o->ptr = ptr;
1831 o->refcount = 1;
1832 return o;
1833 }
1834
1835 static robj *createStringObject(char *ptr, size_t len) {
1836 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1837 }
1838
1839 static robj *createListObject(void) {
1840 list *l = listCreate();
1841
1842 listSetFreeMethod(l,decrRefCount);
1843 return createObject(REDIS_LIST,l);
1844 }
1845
1846 static robj *createSetObject(void) {
1847 dict *d = dictCreate(&setDictType,NULL);
1848 return createObject(REDIS_SET,d);
1849 }
1850
1851 static robj *createZsetObject(void) {
1852 zset *zs = zmalloc(sizeof(*zs));
1853
1854 zs->dict = dictCreate(&zsetDictType,NULL);
1855 zs->zsl = zslCreate();
1856 return createObject(REDIS_ZSET,zs);
1857 }
1858
1859 static void freeStringObject(robj *o) {
1860 if (o->encoding == REDIS_ENCODING_RAW) {
1861 sdsfree(o->ptr);
1862 }
1863 }
1864
1865 static void freeListObject(robj *o) {
1866 listRelease((list*) o->ptr);
1867 }
1868
1869 static void freeSetObject(robj *o) {
1870 dictRelease((dict*) o->ptr);
1871 }
1872
1873 static void freeZsetObject(robj *o) {
1874 zset *zs = o->ptr;
1875
1876 dictRelease(zs->dict);
1877 zslFree(zs->zsl);
1878 zfree(zs);
1879 }
1880
1881 static void freeHashObject(robj *o) {
1882 dictRelease((dict*) o->ptr);
1883 }
1884
1885 static void incrRefCount(robj *o) {
1886 o->refcount++;
1887 #ifdef DEBUG_REFCOUNT
1888 if (o->type == REDIS_STRING)
1889 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1890 #endif
1891 }
1892
1893 static void decrRefCount(void *obj) {
1894 robj *o = obj;
1895
1896 #ifdef DEBUG_REFCOUNT
1897 if (o->type == REDIS_STRING)
1898 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1899 #endif
1900 if (--(o->refcount) == 0) {
1901 switch(o->type) {
1902 case REDIS_STRING: freeStringObject(o); break;
1903 case REDIS_LIST: freeListObject(o); break;
1904 case REDIS_SET: freeSetObject(o); break;
1905 case REDIS_ZSET: freeZsetObject(o); break;
1906 case REDIS_HASH: freeHashObject(o); break;
1907 default: assert(0 != 0); break;
1908 }
1909 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1910 !listAddNodeHead(server.objfreelist,o))
1911 zfree(o);
1912 }
1913 }
1914
1915 static robj *lookupKey(redisDb *db, robj *key) {
1916 dictEntry *de = dictFind(db->dict,key);
1917 return de ? dictGetEntryVal(de) : NULL;
1918 }
1919
1920 static robj *lookupKeyRead(redisDb *db, robj *key) {
1921 expireIfNeeded(db,key);
1922 return lookupKey(db,key);
1923 }
1924
1925 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1926 deleteIfVolatile(db,key);
1927 return lookupKey(db,key);
1928 }
1929
1930 static int deleteKey(redisDb *db, robj *key) {
1931 int retval;
1932
1933 /* We need to protect key from destruction: after the first dictDelete()
1934 * it may happen that 'key' is no longer valid if we don't increment
1935 * it's count. This may happen when we get the object reference directly
1936 * from the hash table with dictRandomKey() or dict iterators */
1937 incrRefCount(key);
1938 if (dictSize(db->expires)) dictDelete(db->expires,key);
1939 retval = dictDelete(db->dict,key);
1940 decrRefCount(key);
1941
1942 return retval == DICT_OK;
1943 }
1944
1945 /* Try to share an object against the shared objects pool */
1946 static robj *tryObjectSharing(robj *o) {
1947 struct dictEntry *de;
1948 unsigned long c;
1949
1950 if (o == NULL || server.shareobjects == 0) return o;
1951
1952 assert(o->type == REDIS_STRING);
1953 de = dictFind(server.sharingpool,o);
1954 if (de) {
1955 robj *shared = dictGetEntryKey(de);
1956
1957 c = ((unsigned long) dictGetEntryVal(de))+1;
1958 dictGetEntryVal(de) = (void*) c;
1959 incrRefCount(shared);
1960 decrRefCount(o);
1961 return shared;
1962 } else {
1963 /* Here we are using a stream algorihtm: Every time an object is
1964 * shared we increment its count, everytime there is a miss we
1965 * recrement the counter of a random object. If this object reaches
1966 * zero we remove the object and put the current object instead. */
1967 if (dictSize(server.sharingpool) >=
1968 server.sharingpoolsize) {
1969 de = dictGetRandomKey(server.sharingpool);
1970 assert(de != NULL);
1971 c = ((unsigned long) dictGetEntryVal(de))-1;
1972 dictGetEntryVal(de) = (void*) c;
1973 if (c == 0) {
1974 dictDelete(server.sharingpool,de->key);
1975 }
1976 } else {
1977 c = 0; /* If the pool is empty we want to add this object */
1978 }
1979 if (c == 0) {
1980 int retval;
1981
1982 retval = dictAdd(server.sharingpool,o,(void*)1);
1983 assert(retval == DICT_OK);
1984 incrRefCount(o);
1985 }
1986 return o;
1987 }
1988 }
1989
1990 /* Check if the nul-terminated string 's' can be represented by a long
1991 * (that is, is a number that fits into long without any other space or
1992 * character before or after the digits).
1993 *
1994 * If so, the function returns REDIS_OK and *longval is set to the value
1995 * of the number. Otherwise REDIS_ERR is returned */
1996 static int isStringRepresentableAsLong(sds s, long *longval) {
1997 char buf[32], *endptr;
1998 long value;
1999 int slen;
2000
2001 value = strtol(s, &endptr, 10);
2002 if (endptr[0] != '\0') return REDIS_ERR;
2003 slen = snprintf(buf,32,"%ld",value);
2004
2005 /* If the number converted back into a string is not identical
2006 * then it's not possible to encode the string as integer */
2007 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2008 if (longval) *longval = value;
2009 return REDIS_OK;
2010 }
2011
2012 /* Try to encode a string object in order to save space */
2013 static int tryObjectEncoding(robj *o) {
2014 long value;
2015 sds s = o->ptr;
2016
2017 if (o->encoding != REDIS_ENCODING_RAW)
2018 return REDIS_ERR; /* Already encoded */
2019
2020 /* It's not save to encode shared objects: shared objects can be shared
2021 * everywhere in the "object space" of Redis. Encoded objects can only
2022 * appear as "values" (and not, for instance, as keys) */
2023 if (o->refcount > 1) return REDIS_ERR;
2024
2025 /* Currently we try to encode only strings */
2026 assert(o->type == REDIS_STRING);
2027
2028 /* Check if we can represent this string as a long integer */
2029 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2030
2031 /* Ok, this object can be encoded */
2032 o->encoding = REDIS_ENCODING_INT;
2033 sdsfree(o->ptr);
2034 o->ptr = (void*) value;
2035 return REDIS_OK;
2036 }
2037
2038 /* Get a decoded version of an encoded object (returned as a new object) */
2039 static robj *getDecodedObject(const robj *o) {
2040 robj *dec;
2041
2042 assert(o->encoding != REDIS_ENCODING_RAW);
2043 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2044 char buf[32];
2045
2046 snprintf(buf,32,"%ld",(long)o->ptr);
2047 dec = createStringObject(buf,strlen(buf));
2048 return dec;
2049 } else {
2050 assert(1 != 1);
2051 }
2052 }
2053
2054 static int compareStringObjects(robj *a, robj *b) {
2055 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2056
2057 if (a == b) return 0;
2058 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2059 return (long)a->ptr - (long)b->ptr;
2060 } else {
2061 int retval;
2062
2063 incrRefCount(a);
2064 incrRefCount(b);
2065 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2066 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2067 retval = sdscmp(a->ptr,b->ptr);
2068 decrRefCount(a);
2069 decrRefCount(b);
2070 return retval;
2071 }
2072 }
2073
2074 static size_t stringObjectLen(robj *o) {
2075 assert(o->type == REDIS_STRING);
2076 if (o->encoding == REDIS_ENCODING_RAW) {
2077 return sdslen(o->ptr);
2078 } else {
2079 char buf[32];
2080
2081 return snprintf(buf,32,"%ld",(long)o->ptr);
2082 }
2083 }
2084
2085 /*============================ DB saving/loading ============================ */
2086
2087 static int rdbSaveType(FILE *fp, unsigned char type) {
2088 if (fwrite(&type,1,1,fp) == 0) return -1;
2089 return 0;
2090 }
2091
2092 static int rdbSaveTime(FILE *fp, time_t t) {
2093 int32_t t32 = (int32_t) t;
2094 if (fwrite(&t32,4,1,fp) == 0) return -1;
2095 return 0;
2096 }
2097
2098 /* check rdbLoadLen() comments for more info */
2099 static int rdbSaveLen(FILE *fp, uint32_t len) {
2100 unsigned char buf[2];
2101
2102 if (len < (1<<6)) {
2103 /* Save a 6 bit len */
2104 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2105 if (fwrite(buf,1,1,fp) == 0) return -1;
2106 } else if (len < (1<<14)) {
2107 /* Save a 14 bit len */
2108 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2109 buf[1] = len&0xFF;
2110 if (fwrite(buf,2,1,fp) == 0) return -1;
2111 } else {
2112 /* Save a 32 bit len */
2113 buf[0] = (REDIS_RDB_32BITLEN<<6);
2114 if (fwrite(buf,1,1,fp) == 0) return -1;
2115 len = htonl(len);
2116 if (fwrite(&len,4,1,fp) == 0) return -1;
2117 }
2118 return 0;
2119 }
2120
2121 /* String objects in the form "2391" "-100" without any space and with a
2122 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2123 * encoded as integers to save space */
2124 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2125 long long value;
2126 char *endptr, buf[32];
2127
2128 /* Check if it's possible to encode this value as a number */
2129 value = strtoll(s, &endptr, 10);
2130 if (endptr[0] != '\0') return 0;
2131 snprintf(buf,32,"%lld",value);
2132
2133 /* If the number converted back into a string is not identical
2134 * then it's not possible to encode the string as integer */
2135 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2136
2137 /* Finally check if it fits in our ranges */
2138 if (value >= -(1<<7) && value <= (1<<7)-1) {
2139 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2140 enc[1] = value&0xFF;
2141 return 2;
2142 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2143 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2144 enc[1] = value&0xFF;
2145 enc[2] = (value>>8)&0xFF;
2146 return 3;
2147 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2148 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2149 enc[1] = value&0xFF;
2150 enc[2] = (value>>8)&0xFF;
2151 enc[3] = (value>>16)&0xFF;
2152 enc[4] = (value>>24)&0xFF;
2153 return 5;
2154 } else {
2155 return 0;
2156 }
2157 }
2158
2159 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2160 unsigned int comprlen, outlen;
2161 unsigned char byte;
2162 void *out;
2163
2164 /* We require at least four bytes compression for this to be worth it */
2165 outlen = sdslen(obj->ptr)-4;
2166 if (outlen <= 0) return 0;
2167 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2168 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2169 if (comprlen == 0) {
2170 zfree(out);
2171 return 0;
2172 }
2173 /* Data compressed! Let's save it on disk */
2174 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2175 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2176 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2177 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2178 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2179 zfree(out);
2180 return comprlen;
2181
2182 writeerr:
2183 zfree(out);
2184 return -1;
2185 }
2186
2187 /* Save a string objet as [len][data] on disk. If the object is a string
2188 * representation of an integer value we try to safe it in a special form */
2189 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2190 size_t len;
2191 int enclen;
2192
2193 len = sdslen(obj->ptr);
2194
2195 /* Try integer encoding */
2196 if (len <= 11) {
2197 unsigned char buf[5];
2198 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2199 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2200 return 0;
2201 }
2202 }
2203
2204 /* Try LZF compression - under 20 bytes it's unable to compress even
2205 * aaaaaaaaaaaaaaaaaa so skip it */
2206 if (len > 20) {
2207 int retval;
2208
2209 retval = rdbSaveLzfStringObject(fp,obj);
2210 if (retval == -1) return -1;
2211 if (retval > 0) return 0;
2212 /* retval == 0 means data can't be compressed, save the old way */
2213 }
2214
2215 /* Store verbatim */
2216 if (rdbSaveLen(fp,len) == -1) return -1;
2217 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2218 return 0;
2219 }
2220
2221 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2222 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2223 int retval;
2224 robj *dec;
2225
2226 if (obj->encoding != REDIS_ENCODING_RAW) {
2227 dec = getDecodedObject(obj);
2228 retval = rdbSaveStringObjectRaw(fp,dec);
2229 decrRefCount(dec);
2230 return retval;
2231 } else {
2232 return rdbSaveStringObjectRaw(fp,obj);
2233 }
2234 }
2235
2236 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2237 static int rdbSave(char *filename) {
2238 dictIterator *di = NULL;
2239 dictEntry *de;
2240 FILE *fp;
2241 char tmpfile[256];
2242 int j;
2243 time_t now = time(NULL);
2244
2245 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2246 fp = fopen(tmpfile,"w");
2247 if (!fp) {
2248 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2249 return REDIS_ERR;
2250 }
2251 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2252 for (j = 0; j < server.dbnum; j++) {
2253 redisDb *db = server.db+j;
2254 dict *d = db->dict;
2255 if (dictSize(d) == 0) continue;
2256 di = dictGetIterator(d);
2257 if (!di) {
2258 fclose(fp);
2259 return REDIS_ERR;
2260 }
2261
2262 /* Write the SELECT DB opcode */
2263 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2264 if (rdbSaveLen(fp,j) == -1) goto werr;
2265
2266 /* Iterate this DB writing every entry */
2267 while((de = dictNext(di)) != NULL) {
2268 robj *key = dictGetEntryKey(de);
2269 robj *o = dictGetEntryVal(de);
2270 time_t expiretime = getExpire(db,key);
2271
2272 /* Save the expire time */
2273 if (expiretime != -1) {
2274 /* If this key is already expired skip it */
2275 if (expiretime < now) continue;
2276 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2277 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2278 }
2279 /* Save the key and associated value */
2280 if (rdbSaveType(fp,o->type) == -1) goto werr;
2281 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2282 if (o->type == REDIS_STRING) {
2283 /* Save a string value */
2284 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2285 } else if (o->type == REDIS_LIST) {
2286 /* Save a list value */
2287 list *list = o->ptr;
2288 listNode *ln;
2289
2290 listRewind(list);
2291 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2292 while((ln = listYield(list))) {
2293 robj *eleobj = listNodeValue(ln);
2294
2295 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2296 }
2297 } else if (o->type == REDIS_SET) {
2298 /* Save a set value */
2299 dict *set = o->ptr;
2300 dictIterator *di = dictGetIterator(set);
2301 dictEntry *de;
2302
2303 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2304 while((de = dictNext(di)) != NULL) {
2305 robj *eleobj = dictGetEntryKey(de);
2306
2307 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2308 }
2309 dictReleaseIterator(di);
2310 } else {
2311 assert(0 != 0);
2312 }
2313 }
2314 dictReleaseIterator(di);
2315 }
2316 /* EOF opcode */
2317 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2318
2319 /* Make sure data will not remain on the OS's output buffers */
2320 fflush(fp);
2321 fsync(fileno(fp));
2322 fclose(fp);
2323
2324 /* Use RENAME to make sure the DB file is changed atomically only
2325 * if the generate DB file is ok. */
2326 if (rename(tmpfile,filename) == -1) {
2327 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2328 unlink(tmpfile);
2329 return REDIS_ERR;
2330 }
2331 redisLog(REDIS_NOTICE,"DB saved on disk");
2332 server.dirty = 0;
2333 server.lastsave = time(NULL);
2334 return REDIS_OK;
2335
2336 werr:
2337 fclose(fp);
2338 unlink(tmpfile);
2339 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2340 if (di) dictReleaseIterator(di);
2341 return REDIS_ERR;
2342 }
2343
2344 static int rdbSaveBackground(char *filename) {
2345 pid_t childpid;
2346
2347 if (server.bgsaveinprogress) return REDIS_ERR;
2348 if ((childpid = fork()) == 0) {
2349 /* Child */
2350 close(server.fd);
2351 if (rdbSave(filename) == REDIS_OK) {
2352 exit(0);
2353 } else {
2354 exit(1);
2355 }
2356 } else {
2357 /* Parent */
2358 if (childpid == -1) {
2359 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2360 strerror(errno));
2361 return REDIS_ERR;
2362 }
2363 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2364 server.bgsaveinprogress = 1;
2365 server.bgsavechildpid = childpid;
2366 return REDIS_OK;
2367 }
2368 return REDIS_OK; /* unreached */
2369 }
2370
2371 static void rdbRemoveTempFile(pid_t childpid) {
2372 char tmpfile[256];
2373
2374 snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
2375 unlink(tmpfile);
2376 }
2377
2378 static int rdbLoadType(FILE *fp) {
2379 unsigned char type;
2380 if (fread(&type,1,1,fp) == 0) return -1;
2381 return type;
2382 }
2383
2384 static time_t rdbLoadTime(FILE *fp) {
2385 int32_t t32;
2386 if (fread(&t32,4,1,fp) == 0) return -1;
2387 return (time_t) t32;
2388 }
2389
2390 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2391 * of this file for a description of how this are stored on disk.
2392 *
2393 * isencoded is set to 1 if the readed length is not actually a length but
2394 * an "encoding type", check the above comments for more info */
2395 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2396 unsigned char buf[2];
2397 uint32_t len;
2398
2399 if (isencoded) *isencoded = 0;
2400 if (rdbver == 0) {
2401 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2402 return ntohl(len);
2403 } else {
2404 int type;
2405
2406 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2407 type = (buf[0]&0xC0)>>6;
2408 if (type == REDIS_RDB_6BITLEN) {
2409 /* Read a 6 bit len */
2410 return buf[0]&0x3F;
2411 } else if (type == REDIS_RDB_ENCVAL) {
2412 /* Read a 6 bit len encoding type */
2413 if (isencoded) *isencoded = 1;
2414 return buf[0]&0x3F;
2415 } else if (type == REDIS_RDB_14BITLEN) {
2416 /* Read a 14 bit len */
2417 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2418 return ((buf[0]&0x3F)<<8)|buf[1];
2419 } else {
2420 /* Read a 32 bit len */
2421 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2422 return ntohl(len);
2423 }
2424 }
2425 }
2426
2427 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2428 unsigned char enc[4];
2429 long long val;
2430
2431 if (enctype == REDIS_RDB_ENC_INT8) {
2432 if (fread(enc,1,1,fp) == 0) return NULL;
2433 val = (signed char)enc[0];
2434 } else if (enctype == REDIS_RDB_ENC_INT16) {
2435 uint16_t v;
2436 if (fread(enc,2,1,fp) == 0) return NULL;
2437 v = enc[0]|(enc[1]<<8);
2438 val = (int16_t)v;
2439 } else if (enctype == REDIS_RDB_ENC_INT32) {
2440 uint32_t v;
2441 if (fread(enc,4,1,fp) == 0) return NULL;
2442 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2443 val = (int32_t)v;
2444 } else {
2445 val = 0; /* anti-warning */
2446 assert(0!=0);
2447 }
2448 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2449 }
2450
2451 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2452 unsigned int len, clen;
2453 unsigned char *c = NULL;
2454 sds val = NULL;
2455
2456 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2457 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2458 if ((c = zmalloc(clen)) == NULL) goto err;
2459 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2460 if (fread(c,clen,1,fp) == 0) goto err;
2461 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2462 zfree(c);
2463 return createObject(REDIS_STRING,val);
2464 err:
2465 zfree(c);
2466 sdsfree(val);
2467 return NULL;
2468 }
2469
2470 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2471 int isencoded;
2472 uint32_t len;
2473 sds val;
2474
2475 len = rdbLoadLen(fp,rdbver,&isencoded);
2476 if (isencoded) {
2477 switch(len) {
2478 case REDIS_RDB_ENC_INT8:
2479 case REDIS_RDB_ENC_INT16:
2480 case REDIS_RDB_ENC_INT32:
2481 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2482 case REDIS_RDB_ENC_LZF:
2483 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2484 default:
2485 assert(0!=0);
2486 }
2487 }
2488
2489 if (len == REDIS_RDB_LENERR) return NULL;
2490 val = sdsnewlen(NULL,len);
2491 if (len && fread(val,len,1,fp) == 0) {
2492 sdsfree(val);
2493 return NULL;
2494 }
2495 return tryObjectSharing(createObject(REDIS_STRING,val));
2496 }
2497
2498 static int rdbLoad(char *filename) {
2499 FILE *fp;
2500 robj *keyobj = NULL;
2501 uint32_t dbid;
2502 int type, retval, rdbver;
2503 dict *d = server.db[0].dict;
2504 redisDb *db = server.db+0;
2505 char buf[1024];
2506 time_t expiretime = -1, now = time(NULL);
2507
2508 fp = fopen(filename,"r");
2509 if (!fp) return REDIS_ERR;
2510 if (fread(buf,9,1,fp) == 0) goto eoferr;
2511 buf[9] = '\0';
2512 if (memcmp(buf,"REDIS",5) != 0) {
2513 fclose(fp);
2514 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2515 return REDIS_ERR;
2516 }
2517 rdbver = atoi(buf+5);
2518 if (rdbver > 1) {
2519 fclose(fp);
2520 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2521 return REDIS_ERR;
2522 }
2523 while(1) {
2524 robj *o;
2525
2526 /* Read type. */
2527 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2528 if (type == REDIS_EXPIRETIME) {
2529 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2530 /* We read the time so we need to read the object type again */
2531 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2532 }
2533 if (type == REDIS_EOF) break;
2534 /* Handle SELECT DB opcode as a special case */
2535 if (type == REDIS_SELECTDB) {
2536 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2537 goto eoferr;
2538 if (dbid >= (unsigned)server.dbnum) {
2539 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2540 exit(1);
2541 }
2542 db = server.db+dbid;
2543 d = db->dict;
2544 continue;
2545 }
2546 /* Read key */
2547 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2548
2549 if (type == REDIS_STRING) {
2550 /* Read string value */
2551 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2552 tryObjectEncoding(o);
2553 } else if (type == REDIS_LIST || type == REDIS_SET) {
2554 /* Read list/set value */
2555 uint32_t listlen;
2556
2557 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2558 goto eoferr;
2559 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2560 /* Load every single element of the list/set */
2561 while(listlen--) {
2562 robj *ele;
2563
2564 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2565 tryObjectEncoding(ele);
2566 if (type == REDIS_LIST) {
2567 listAddNodeTail((list*)o->ptr,ele);
2568 } else {
2569 dictAdd((dict*)o->ptr,ele,NULL);
2570 }
2571 }
2572 } else {
2573 assert(0 != 0);
2574 }
2575 /* Add the new object in the hash table */
2576 retval = dictAdd(d,keyobj,o);
2577 if (retval == DICT_ERR) {
2578 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2579 exit(1);
2580 }
2581 /* Set the expire time if needed */
2582 if (expiretime != -1) {
2583 setExpire(db,keyobj,expiretime);
2584 /* Delete this key if already expired */
2585 if (expiretime < now) deleteKey(db,keyobj);
2586 expiretime = -1;
2587 }
2588 keyobj = o = NULL;
2589 }
2590 fclose(fp);
2591 return REDIS_OK;
2592
2593 eoferr: /* unexpected end of file is handled here with a fatal exit */
2594 if (keyobj) decrRefCount(keyobj);
2595 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2596 exit(1);
2597 return REDIS_ERR; /* Just to avoid warning */
2598 }
2599
2600 /*================================== Commands =============================== */
2601
2602 static void authCommand(redisClient *c) {
2603 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2604 c->authenticated = 1;
2605 addReply(c,shared.ok);
2606 } else {
2607 c->authenticated = 0;
2608 addReply(c,shared.err);
2609 }
2610 }
2611
2612 static void pingCommand(redisClient *c) {
2613 addReply(c,shared.pong);
2614 }
2615
2616 static void echoCommand(redisClient *c) {
2617 addReplyBulkLen(c,c->argv[1]);
2618 addReply(c,c->argv[1]);
2619 addReply(c,shared.crlf);
2620 }
2621
2622 /*=================================== Strings =============================== */
2623
2624 static void setGenericCommand(redisClient *c, int nx) {
2625 int retval;
2626
2627 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2628 if (retval == DICT_ERR) {
2629 if (!nx) {
2630 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2631 incrRefCount(c->argv[2]);
2632 } else {
2633 addReply(c,shared.czero);
2634 return;
2635 }
2636 } else {
2637 incrRefCount(c->argv[1]);
2638 incrRefCount(c->argv[2]);
2639 }
2640 server.dirty++;
2641 removeExpire(c->db,c->argv[1]);
2642 addReply(c, nx ? shared.cone : shared.ok);
2643 }
2644
2645 static void setCommand(redisClient *c) {
2646 setGenericCommand(c,0);
2647 }
2648
2649 static void setnxCommand(redisClient *c) {
2650 setGenericCommand(c,1);
2651 }
2652
2653 static void getCommand(redisClient *c) {
2654 robj *o = lookupKeyRead(c->db,c->argv[1]);
2655
2656 if (o == NULL) {
2657 addReply(c,shared.nullbulk);
2658 } else {
2659 if (o->type != REDIS_STRING) {
2660 addReply(c,shared.wrongtypeerr);
2661 } else {
2662 addReplyBulkLen(c,o);
2663 addReply(c,o);
2664 addReply(c,shared.crlf);
2665 }
2666 }
2667 }
2668
2669 static void getsetCommand(redisClient *c) {
2670 getCommand(c);
2671 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2672 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2673 } else {
2674 incrRefCount(c->argv[1]);
2675 }
2676 incrRefCount(c->argv[2]);
2677 server.dirty++;
2678 removeExpire(c->db,c->argv[1]);
2679 }
2680
2681 static void mgetCommand(redisClient *c) {
2682 int j;
2683
2684 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2685 for (j = 1; j < c->argc; j++) {
2686 robj *o = lookupKeyRead(c->db,c->argv[j]);
2687 if (o == NULL) {
2688 addReply(c,shared.nullbulk);
2689 } else {
2690 if (o->type != REDIS_STRING) {
2691 addReply(c,shared.nullbulk);
2692 } else {
2693 addReplyBulkLen(c,o);
2694 addReply(c,o);
2695 addReply(c,shared.crlf);
2696 }
2697 }
2698 }
2699 }
2700
2701 static void incrDecrCommand(redisClient *c, long long incr) {
2702 long long value;
2703 int retval;
2704 robj *o;
2705
2706 o = lookupKeyWrite(c->db,c->argv[1]);
2707 if (o == NULL) {
2708 value = 0;
2709 } else {
2710 if (o->type != REDIS_STRING) {
2711 value = 0;
2712 } else {
2713 char *eptr;
2714
2715 if (o->encoding == REDIS_ENCODING_RAW)
2716 value = strtoll(o->ptr, &eptr, 10);
2717 else if (o->encoding == REDIS_ENCODING_INT)
2718 value = (long)o->ptr;
2719 else
2720 assert(1 != 1);
2721 }
2722 }
2723
2724 value += incr;
2725 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2726 tryObjectEncoding(o);
2727 retval = dictAdd(c->db->dict,c->argv[1],o);
2728 if (retval == DICT_ERR) {
2729 dictReplace(c->db->dict,c->argv[1],o);
2730 removeExpire(c->db,c->argv[1]);
2731 } else {
2732 incrRefCount(c->argv[1]);
2733 }
2734 server.dirty++;
2735 addReply(c,shared.colon);
2736 addReply(c,o);
2737 addReply(c,shared.crlf);
2738 }
2739
2740 static void incrCommand(redisClient *c) {
2741 incrDecrCommand(c,1);
2742 }
2743
2744 static void decrCommand(redisClient *c) {
2745 incrDecrCommand(c,-1);
2746 }
2747
2748 static void incrbyCommand(redisClient *c) {
2749 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2750 incrDecrCommand(c,incr);
2751 }
2752
2753 static void decrbyCommand(redisClient *c) {
2754 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2755 incrDecrCommand(c,-incr);
2756 }
2757
2758 /* ========================= Type agnostic commands ========================= */
2759
2760 static void delCommand(redisClient *c) {
2761 int deleted = 0, j;
2762
2763 for (j = 1; j < c->argc; j++) {
2764 if (deleteKey(c->db,c->argv[j])) {
2765 server.dirty++;
2766 deleted++;
2767 }
2768 }
2769 switch(deleted) {
2770 case 0:
2771 addReply(c,shared.czero);
2772 break;
2773 case 1:
2774 addReply(c,shared.cone);
2775 break;
2776 default:
2777 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2778 break;
2779 }
2780 }
2781
2782 static void existsCommand(redisClient *c) {
2783 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2784 }
2785
2786 static void selectCommand(redisClient *c) {
2787 int id = atoi(c->argv[1]->ptr);
2788
2789 if (selectDb(c,id) == REDIS_ERR) {
2790 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2791 } else {
2792 addReply(c,shared.ok);
2793 }
2794 }
2795
2796 static void randomkeyCommand(redisClient *c) {
2797 dictEntry *de;
2798
2799 while(1) {
2800 de = dictGetRandomKey(c->db->dict);
2801 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2802 }
2803 if (de == NULL) {
2804 addReply(c,shared.plus);
2805 addReply(c,shared.crlf);
2806 } else {
2807 addReply(c,shared.plus);
2808 addReply(c,dictGetEntryKey(de));
2809 addReply(c,shared.crlf);
2810 }
2811 }
2812
2813 static void keysCommand(redisClient *c) {
2814 dictIterator *di;
2815 dictEntry *de;
2816 sds pattern = c->argv[1]->ptr;
2817 int plen = sdslen(pattern);
2818 int numkeys = 0, keyslen = 0;
2819 robj *lenobj = createObject(REDIS_STRING,NULL);
2820
2821 di = dictGetIterator(c->db->dict);
2822 addReply(c,lenobj);
2823 decrRefCount(lenobj);
2824 while((de = dictNext(di)) != NULL) {
2825 robj *keyobj = dictGetEntryKey(de);
2826
2827 sds key = keyobj->ptr;
2828 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2829 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2830 if (expireIfNeeded(c->db,keyobj) == 0) {
2831 if (numkeys != 0)
2832 addReply(c,shared.space);
2833 addReply(c,keyobj);
2834 numkeys++;
2835 keyslen += sdslen(key);
2836 }
2837 }
2838 }
2839 dictReleaseIterator(di);
2840 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2841 addReply(c,shared.crlf);
2842 }
2843
2844 static void dbsizeCommand(redisClient *c) {
2845 addReplySds(c,
2846 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2847 }
2848
2849 static void lastsaveCommand(redisClient *c) {
2850 addReplySds(c,
2851 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2852 }
2853
2854 static void typeCommand(redisClient *c) {
2855 robj *o;
2856 char *type;
2857
2858 o = lookupKeyRead(c->db,c->argv[1]);
2859 if (o == NULL) {
2860 type = "+none";
2861 } else {
2862 switch(o->type) {
2863 case REDIS_STRING: type = "+string"; break;
2864 case REDIS_LIST: type = "+list"; break;
2865 case REDIS_SET: type = "+set"; break;
2866 default: type = "unknown"; break;
2867 }
2868 }
2869 addReplySds(c,sdsnew(type));
2870 addReply(c,shared.crlf);
2871 }
2872
2873 static void saveCommand(redisClient *c) {
2874 if (server.bgsaveinprogress) {
2875 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2876 return;
2877 }
2878 if (rdbSave(server.dbfilename) == REDIS_OK) {
2879 addReply(c,shared.ok);
2880 } else {
2881 addReply(c,shared.err);
2882 }
2883 }
2884
2885 static void bgsaveCommand(redisClient *c) {
2886 if (server.bgsaveinprogress) {
2887 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2888 return;
2889 }
2890 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2891 addReply(c,shared.ok);
2892 } else {
2893 addReply(c,shared.err);
2894 }
2895 }
2896
2897 static void shutdownCommand(redisClient *c) {
2898 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2899 /* Kill the saving child if there is a background saving in progress.
2900 We want to avoid race conditions, for instance our saving child may
2901 overwrite the synchronous saving did by SHUTDOWN. */
2902 if (server.bgsaveinprogress) {
2903 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2904 kill(server.bgsavechildpid,SIGKILL);
2905 rdbRemoveTempFile(server.bgsavechildpid);
2906 }
2907 /* SYNC SAVE */
2908 if (rdbSave(server.dbfilename) == REDIS_OK) {
2909 if (server.daemonize)
2910 unlink(server.pidfile);
2911 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2912 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2913 exit(1);
2914 } else {
2915 /* Ooops.. error saving! The best we can do is to continue operating.
2916 * Note that if there was a background saving process, in the next
2917 * cron() Redis will be notified that the background saving aborted,
2918 * handling special stuff like slaves pending for synchronization... */
2919 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2920 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2921 }
2922 }
2923
2924 static void renameGenericCommand(redisClient *c, int nx) {
2925 robj *o;
2926
2927 /* To use the same key as src and dst is probably an error */
2928 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2929 addReply(c,shared.sameobjecterr);
2930 return;
2931 }
2932
2933 o = lookupKeyWrite(c->db,c->argv[1]);
2934 if (o == NULL) {
2935 addReply(c,shared.nokeyerr);
2936 return;
2937 }
2938 incrRefCount(o);
2939 deleteIfVolatile(c->db,c->argv[2]);
2940 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2941 if (nx) {
2942 decrRefCount(o);
2943 addReply(c,shared.czero);
2944 return;
2945 }
2946 dictReplace(c->db->dict,c->argv[2],o);
2947 } else {
2948 incrRefCount(c->argv[2]);
2949 }
2950 deleteKey(c->db,c->argv[1]);
2951 server.dirty++;
2952 addReply(c,nx ? shared.cone : shared.ok);
2953 }
2954
2955 static void renameCommand(redisClient *c) {
2956 renameGenericCommand(c,0);
2957 }
2958
2959 static void renamenxCommand(redisClient *c) {
2960 renameGenericCommand(c,1);
2961 }
2962
2963 static void moveCommand(redisClient *c) {
2964 robj *o;
2965 redisDb *src, *dst;
2966 int srcid;
2967
2968 /* Obtain source and target DB pointers */
2969 src = c->db;
2970 srcid = c->db->id;
2971 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2972 addReply(c,shared.outofrangeerr);
2973 return;
2974 }
2975 dst = c->db;
2976 selectDb(c,srcid); /* Back to the source DB */
2977
2978 /* If the user is moving using as target the same
2979 * DB as the source DB it is probably an error. */
2980 if (src == dst) {
2981 addReply(c,shared.sameobjecterr);
2982 return;
2983 }
2984
2985 /* Check if the element exists and get a reference */
2986 o = lookupKeyWrite(c->db,c->argv[1]);
2987 if (!o) {
2988 addReply(c,shared.czero);
2989 return;
2990 }
2991
2992 /* Try to add the element to the target DB */
2993 deleteIfVolatile(dst,c->argv[1]);
2994 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2995 addReply(c,shared.czero);
2996 return;
2997 }
2998 incrRefCount(c->argv[1]);
2999 incrRefCount(o);
3000
3001 /* OK! key moved, free the entry in the source DB */
3002 deleteKey(src,c->argv[1]);
3003 server.dirty++;
3004 addReply(c,shared.cone);
3005 }
3006
3007 /* =================================== Lists ================================ */
3008 static void pushGenericCommand(redisClient *c, int where) {
3009 robj *lobj;
3010 list *list;
3011
3012 lobj = lookupKeyWrite(c->db,c->argv[1]);
3013 if (lobj == NULL) {
3014 lobj = createListObject();
3015 list = lobj->ptr;
3016 if (where == REDIS_HEAD) {
3017 listAddNodeHead(list,c->argv[2]);
3018 } else {
3019 listAddNodeTail(list,c->argv[2]);
3020 }
3021 dictAdd(c->db->dict,c->argv[1],lobj);
3022 incrRefCount(c->argv[1]);
3023 incrRefCount(c->argv[2]);
3024 } else {
3025 if (lobj->type != REDIS_LIST) {
3026 addReply(c,shared.wrongtypeerr);
3027 return;
3028 }
3029 list = lobj->ptr;
3030 if (where == REDIS_HEAD) {
3031 listAddNodeHead(list,c->argv[2]);
3032 } else {
3033 listAddNodeTail(list,c->argv[2]);
3034 }
3035 incrRefCount(c->argv[2]);
3036 }
3037 server.dirty++;
3038 addReply(c,shared.ok);
3039 }
3040
3041 static void lpushCommand(redisClient *c) {
3042 pushGenericCommand(c,REDIS_HEAD);
3043 }
3044
3045 static void rpushCommand(redisClient *c) {
3046 pushGenericCommand(c,REDIS_TAIL);
3047 }
3048
3049 static void llenCommand(redisClient *c) {
3050 robj *o;
3051 list *l;
3052
3053 o = lookupKeyRead(c->db,c->argv[1]);
3054 if (o == NULL) {
3055 addReply(c,shared.czero);
3056 return;
3057 } else {
3058 if (o->type != REDIS_LIST) {
3059 addReply(c,shared.wrongtypeerr);
3060 } else {
3061 l = o->ptr;
3062 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3063 }
3064 }
3065 }
3066
3067 static void lindexCommand(redisClient *c) {
3068 robj *o;
3069 int index = atoi(c->argv[2]->ptr);
3070
3071 o = lookupKeyRead(c->db,c->argv[1]);
3072 if (o == NULL) {
3073 addReply(c,shared.nullbulk);
3074 } else {
3075 if (o->type != REDIS_LIST) {
3076 addReply(c,shared.wrongtypeerr);
3077 } else {
3078 list *list = o->ptr;
3079 listNode *ln;
3080
3081 ln = listIndex(list, index);
3082 if (ln == NULL) {
3083 addReply(c,shared.nullbulk);
3084 } else {
3085 robj *ele = listNodeValue(ln);
3086 addReplyBulkLen(c,ele);
3087 addReply(c,ele);
3088 addReply(c,shared.crlf);
3089 }
3090 }
3091 }
3092 }
3093
3094 static void lsetCommand(redisClient *c) {
3095 robj *o;
3096 int index = atoi(c->argv[2]->ptr);
3097
3098 o = lookupKeyWrite(c->db,c->argv[1]);
3099 if (o == NULL) {
3100 addReply(c,shared.nokeyerr);
3101 } else {
3102 if (o->type != REDIS_LIST) {
3103 addReply(c,shared.wrongtypeerr);
3104 } else {
3105 list *list = o->ptr;
3106 listNode *ln;
3107
3108 ln = listIndex(list, index);
3109 if (ln == NULL) {
3110 addReply(c,shared.outofrangeerr);
3111 } else {
3112 robj *ele = listNodeValue(ln);
3113
3114 decrRefCount(ele);
3115 listNodeValue(ln) = c->argv[3];
3116 incrRefCount(c->argv[3]);
3117 addReply(c,shared.ok);
3118 server.dirty++;
3119 }
3120 }
3121 }
3122 }
3123
3124 static void popGenericCommand(redisClient *c, int where) {
3125 robj *o;
3126
3127 o = lookupKeyWrite(c->db,c->argv[1]);
3128 if (o == NULL) {
3129 addReply(c,shared.nullbulk);
3130 } else {
3131 if (o->type != REDIS_LIST) {
3132 addReply(c,shared.wrongtypeerr);
3133 } else {
3134 list *list = o->ptr;
3135 listNode *ln;
3136
3137 if (where == REDIS_HEAD)
3138 ln = listFirst(list);
3139 else
3140 ln = listLast(list);
3141
3142 if (ln == NULL) {
3143 addReply(c,shared.nullbulk);
3144 } else {
3145 robj *ele = listNodeValue(ln);
3146 addReplyBulkLen(c,ele);
3147 addReply(c,ele);
3148 addReply(c,shared.crlf);
3149 listDelNode(list,ln);
3150 server.dirty++;
3151 }
3152 }
3153 }
3154 }
3155
3156 static void lpopCommand(redisClient *c) {
3157 popGenericCommand(c,REDIS_HEAD);
3158 }
3159
3160 static void rpopCommand(redisClient *c) {
3161 popGenericCommand(c,REDIS_TAIL);
3162 }
3163
3164 static void lrangeCommand(redisClient *c) {
3165 robj *o;
3166 int start = atoi(c->argv[2]->ptr);
3167 int end = atoi(c->argv[3]->ptr);
3168
3169 o = lookupKeyRead(c->db,c->argv[1]);
3170 if (o == NULL) {
3171 addReply(c,shared.nullmultibulk);
3172 } else {
3173 if (o->type != REDIS_LIST) {
3174 addReply(c,shared.wrongtypeerr);
3175 } else {
3176 list *list = o->ptr;
3177 listNode *ln;
3178 int llen = listLength(list);
3179 int rangelen, j;
3180 robj *ele;
3181
3182 /* convert negative indexes */
3183 if (start < 0) start = llen+start;
3184 if (end < 0) end = llen+end;
3185 if (start < 0) start = 0;
3186 if (end < 0) end = 0;
3187
3188 /* indexes sanity checks */
3189 if (start > end || start >= llen) {
3190 /* Out of range start or start > end result in empty list */
3191 addReply(c,shared.emptymultibulk);
3192 return;
3193 }
3194 if (end >= llen) end = llen-1;
3195 rangelen = (end-start)+1;
3196
3197 /* Return the result in form of a multi-bulk reply */
3198 ln = listIndex(list, start);
3199 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3200 for (j = 0; j < rangelen; j++) {
3201 ele = listNodeValue(ln);
3202 addReplyBulkLen(c,ele);
3203 addReply(c,ele);
3204 addReply(c,shared.crlf);
3205 ln = ln->next;
3206 }
3207 }
3208 }
3209 }
3210
3211 static void ltrimCommand(redisClient *c) {
3212 robj *o;
3213 int start = atoi(c->argv[2]->ptr);
3214 int end = atoi(c->argv[3]->ptr);
3215
3216 o = lookupKeyWrite(c->db,c->argv[1]);
3217 if (o == NULL) {
3218 addReply(c,shared.nokeyerr);
3219 } else {
3220 if (o->type != REDIS_LIST) {
3221 addReply(c,shared.wrongtypeerr);
3222 } else {
3223 list *list = o->ptr;
3224 listNode *ln;
3225 int llen = listLength(list);
3226 int j, ltrim, rtrim;
3227
3228 /* convert negative indexes */
3229 if (start < 0) start = llen+start;
3230 if (end < 0) end = llen+end;
3231 if (start < 0) start = 0;
3232 if (end < 0) end = 0;
3233
3234 /* indexes sanity checks */
3235 if (start > end || start >= llen) {
3236 /* Out of range start or start > end result in empty list */
3237 ltrim = llen;
3238 rtrim = 0;
3239 } else {
3240 if (end >= llen) end = llen-1;
3241 ltrim = start;
3242 rtrim = llen-end-1;
3243 }
3244
3245 /* Remove list elements to perform the trim */
3246 for (j = 0; j < ltrim; j++) {
3247 ln = listFirst(list);
3248 listDelNode(list,ln);
3249 }
3250 for (j = 0; j < rtrim; j++) {
3251 ln = listLast(list);
3252 listDelNode(list,ln);
3253 }
3254 server.dirty++;
3255 addReply(c,shared.ok);
3256 }
3257 }
3258 }
3259
3260 static void lremCommand(redisClient *c) {
3261 robj *o;
3262
3263 o = lookupKeyWrite(c->db,c->argv[1]);
3264 if (o == NULL) {
3265 addReply(c,shared.czero);
3266 } else {
3267 if (o->type != REDIS_LIST) {
3268 addReply(c,shared.wrongtypeerr);
3269 } else {
3270 list *list = o->ptr;
3271 listNode *ln, *next;
3272 int toremove = atoi(c->argv[2]->ptr);
3273 int removed = 0;
3274 int fromtail = 0;
3275
3276 if (toremove < 0) {
3277 toremove = -toremove;
3278 fromtail = 1;
3279 }
3280 ln = fromtail ? list->tail : list->head;
3281 while (ln) {
3282 robj *ele = listNodeValue(ln);
3283
3284 next = fromtail ? ln->prev : ln->next;
3285 if (compareStringObjects(ele,c->argv[3]) == 0) {
3286 listDelNode(list,ln);
3287 server.dirty++;
3288 removed++;
3289 if (toremove && removed == toremove) break;
3290 }
3291 ln = next;
3292 }
3293 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3294 }
3295 }
3296 }
3297
3298 /* ==================================== Sets ================================ */
3299
3300 static void saddCommand(redisClient *c) {
3301 robj *set;
3302
3303 set = lookupKeyWrite(c->db,c->argv[1]);
3304 if (set == NULL) {
3305 set = createSetObject();
3306 dictAdd(c->db->dict,c->argv[1],set);
3307 incrRefCount(c->argv[1]);
3308 } else {
3309 if (set->type != REDIS_SET) {
3310 addReply(c,shared.wrongtypeerr);
3311 return;
3312 }
3313 }
3314 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3315 incrRefCount(c->argv[2]);
3316 server.dirty++;
3317 addReply(c,shared.cone);
3318 } else {
3319 addReply(c,shared.czero);
3320 }
3321 }
3322
3323 static void sremCommand(redisClient *c) {
3324 robj *set;
3325
3326 set = lookupKeyWrite(c->db,c->argv[1]);
3327 if (set == NULL) {
3328 addReply(c,shared.czero);
3329 } else {
3330 if (set->type != REDIS_SET) {
3331 addReply(c,shared.wrongtypeerr);
3332 return;
3333 }
3334 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3335 server.dirty++;
3336 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3337 addReply(c,shared.cone);
3338 } else {
3339 addReply(c,shared.czero);
3340 }
3341 }
3342 }
3343
3344 static void smoveCommand(redisClient *c) {
3345 robj *srcset, *dstset;
3346
3347 srcset = lookupKeyWrite(c->db,c->argv[1]);
3348 dstset = lookupKeyWrite(c->db,c->argv[2]);
3349
3350 /* If the source key does not exist return 0, if it's of the wrong type
3351 * raise an error */
3352 if (srcset == NULL || srcset->type != REDIS_SET) {
3353 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3354 return;
3355 }
3356 /* Error if the destination key is not a set as well */
3357 if (dstset && dstset->type != REDIS_SET) {
3358 addReply(c,shared.wrongtypeerr);
3359 return;
3360 }
3361 /* Remove the element from the source set */
3362 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3363 /* Key not found in the src set! return zero */
3364 addReply(c,shared.czero);
3365 return;
3366 }
3367 server.dirty++;
3368 /* Add the element to the destination set */
3369 if (!dstset) {
3370 dstset = createSetObject();
3371 dictAdd(c->db->dict,c->argv[2],dstset);
3372 incrRefCount(c->argv[2]);
3373 }
3374 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3375 incrRefCount(c->argv[3]);
3376 addReply(c,shared.cone);
3377 }
3378
3379 static void sismemberCommand(redisClient *c) {
3380 robj *set;
3381
3382 set = lookupKeyRead(c->db,c->argv[1]);
3383 if (set == NULL) {
3384 addReply(c,shared.czero);
3385 } else {
3386 if (set->type != REDIS_SET) {
3387 addReply(c,shared.wrongtypeerr);
3388 return;
3389 }
3390 if (dictFind(set->ptr,c->argv[2]))
3391 addReply(c,shared.cone);
3392 else
3393 addReply(c,shared.czero);
3394 }
3395 }
3396
3397 static void scardCommand(redisClient *c) {
3398 robj *o;
3399 dict *s;
3400
3401 o = lookupKeyRead(c->db,c->argv[1]);
3402 if (o == NULL) {
3403 addReply(c,shared.czero);
3404 return;
3405 } else {
3406 if (o->type != REDIS_SET) {
3407 addReply(c,shared.wrongtypeerr);
3408 } else {
3409 s = o->ptr;
3410 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3411 dictSize(s)));
3412 }
3413 }
3414 }
3415
3416 static void spopCommand(redisClient *c) {
3417 robj *set;
3418 dictEntry *de;
3419
3420 set = lookupKeyWrite(c->db,c->argv[1]);
3421 if (set == NULL) {
3422 addReply(c,shared.nullbulk);
3423 } else {
3424 if (set->type != REDIS_SET) {
3425 addReply(c,shared.wrongtypeerr);
3426 return;
3427 }
3428 de = dictGetRandomKey(set->ptr);
3429 if (de == NULL) {
3430 addReply(c,shared.nullbulk);
3431 } else {
3432 robj *ele = dictGetEntryKey(de);
3433
3434 addReplyBulkLen(c,ele);
3435 addReply(c,ele);
3436 addReply(c,shared.crlf);
3437 dictDelete(set->ptr,ele);
3438 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3439 server.dirty++;
3440 }
3441 }
3442 }
3443
3444 static void srandmemberCommand(redisClient *c) {
3445 robj *set;
3446 dictEntry *de;
3447
3448 set = lookupKeyRead(c->db,c->argv[1]);
3449 if (set == NULL) {
3450 addReply(c,shared.nullbulk);
3451 } else {
3452 if (set->type != REDIS_SET) {
3453 addReply(c,shared.wrongtypeerr);
3454 return;
3455 }
3456 de = dictGetRandomKey(set->ptr);
3457 if (de == NULL) {
3458 addReply(c,shared.nullbulk);
3459 } else {
3460 robj *ele = dictGetEntryKey(de);
3461
3462 addReplyBulkLen(c,ele);
3463 addReply(c,ele);
3464 addReply(c,shared.crlf);
3465 }
3466 }
3467 }
3468
3469 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3470 dict **d1 = (void*) s1, **d2 = (void*) s2;
3471
3472 return dictSize(*d1)-dictSize(*d2);
3473 }
3474
3475 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3476 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3477 dictIterator *di;
3478 dictEntry *de;
3479 robj *lenobj = NULL, *dstset = NULL;
3480 int j, cardinality = 0;
3481
3482 for (j = 0; j < setsnum; j++) {
3483 robj *setobj;
3484
3485 setobj = dstkey ?
3486 lookupKeyWrite(c->db,setskeys[j]) :
3487 lookupKeyRead(c->db,setskeys[j]);
3488 if (!setobj) {
3489 zfree(dv);
3490 if (dstkey) {
3491 deleteKey(c->db,dstkey);
3492 addReply(c,shared.ok);
3493 } else {
3494 addReply(c,shared.nullmultibulk);
3495 }
3496 return;
3497 }
3498 if (setobj->type != REDIS_SET) {
3499 zfree(dv);
3500 addReply(c,shared.wrongtypeerr);
3501 return;
3502 }
3503 dv[j] = setobj->ptr;
3504 }
3505 /* Sort sets from the smallest to largest, this will improve our
3506 * algorithm's performace */
3507 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3508
3509 /* The first thing we should output is the total number of elements...
3510 * since this is a multi-bulk write, but at this stage we don't know
3511 * the intersection set size, so we use a trick, append an empty object
3512 * to the output list and save the pointer to later modify it with the
3513 * right length */
3514 if (!dstkey) {
3515 lenobj = createObject(REDIS_STRING,NULL);
3516 addReply(c,lenobj);
3517 decrRefCount(lenobj);
3518 } else {
3519 /* If we have a target key where to store the resulting set
3520 * create this key with an empty set inside */
3521 dstset = createSetObject();
3522 }
3523
3524 /* Iterate all the elements of the first (smallest) set, and test
3525 * the element against all the other sets, if at least one set does
3526 * not include the element it is discarded */
3527 di = dictGetIterator(dv[0]);
3528
3529 while((de = dictNext(di)) != NULL) {
3530 robj *ele;
3531
3532 for (j = 1; j < setsnum; j++)
3533 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3534 if (j != setsnum)
3535 continue; /* at least one set does not contain the member */
3536 ele = dictGetEntryKey(de);
3537 if (!dstkey) {
3538 addReplyBulkLen(c,ele);
3539 addReply(c,ele);
3540 addReply(c,shared.crlf);
3541 cardinality++;
3542 } else {
3543 dictAdd(dstset->ptr,ele,NULL);
3544 incrRefCount(ele);
3545 }
3546 }
3547 dictReleaseIterator(di);
3548
3549 if (dstkey) {
3550 /* Store the resulting set into the target */
3551 deleteKey(c->db,dstkey);
3552 dictAdd(c->db->dict,dstkey,dstset);
3553 incrRefCount(dstkey);
3554 }
3555
3556 if (!dstkey) {
3557 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3558 } else {
3559 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3560 dictSize((dict*)dstset->ptr)));
3561 server.dirty++;
3562 }
3563 zfree(dv);
3564 }
3565
3566 static void sinterCommand(redisClient *c) {
3567 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3568 }
3569
3570 static void sinterstoreCommand(redisClient *c) {
3571 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3572 }
3573
3574 #define REDIS_OP_UNION 0
3575 #define REDIS_OP_DIFF 1
3576
3577 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3578 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3579 dictIterator *di;
3580 dictEntry *de;
3581 robj *dstset = NULL;
3582 int j, cardinality = 0;
3583
3584 for (j = 0; j < setsnum; j++) {
3585 robj *setobj;
3586
3587 setobj = dstkey ?
3588 lookupKeyWrite(c->db,setskeys[j]) :
3589 lookupKeyRead(c->db,setskeys[j]);
3590 if (!setobj) {
3591 dv[j] = NULL;
3592 continue;
3593 }
3594 if (setobj->type != REDIS_SET) {
3595 zfree(dv);
3596 addReply(c,shared.wrongtypeerr);
3597 return;
3598 }
3599 dv[j] = setobj->ptr;
3600 }
3601
3602 /* We need a temp set object to store our union. If the dstkey
3603 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3604 * this set object will be the resulting object to set into the target key*/
3605 dstset = createSetObject();
3606
3607 /* Iterate all the elements of all the sets, add every element a single
3608 * time to the result set */
3609 for (j = 0; j < setsnum; j++) {
3610 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3611 if (!dv[j]) continue; /* non existing keys are like empty sets */
3612
3613 di = dictGetIterator(dv[j]);
3614
3615 while((de = dictNext(di)) != NULL) {
3616 robj *ele;
3617
3618 /* dictAdd will not add the same element multiple times */
3619 ele = dictGetEntryKey(de);
3620 if (op == REDIS_OP_UNION || j == 0) {
3621 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3622 incrRefCount(ele);
3623 cardinality++;
3624 }
3625 } else if (op == REDIS_OP_DIFF) {
3626 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3627 cardinality--;
3628 }
3629 }
3630 }
3631 dictReleaseIterator(di);
3632
3633 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3634 }
3635
3636 /* Output the content of the resulting set, if not in STORE mode */
3637 if (!dstkey) {
3638 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3639 di = dictGetIterator(dstset->ptr);
3640 while((de = dictNext(di)) != NULL) {
3641 robj *ele;
3642
3643 ele = dictGetEntryKey(de);
3644 addReplyBulkLen(c,ele);
3645 addReply(c,ele);
3646 addReply(c,shared.crlf);
3647 }
3648 dictReleaseIterator(di);
3649 } else {
3650 /* If we have a target key where to store the resulting set
3651 * create this key with the result set inside */
3652 deleteKey(c->db,dstkey);
3653 dictAdd(c->db->dict,dstkey,dstset);
3654 incrRefCount(dstkey);
3655 }
3656
3657 /* Cleanup */
3658 if (!dstkey) {
3659 decrRefCount(dstset);
3660 } else {
3661 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3662 dictSize((dict*)dstset->ptr)));
3663 server.dirty++;
3664 }
3665 zfree(dv);
3666 }
3667
3668 static void sunionCommand(redisClient *c) {
3669 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3670 }
3671
3672 static void sunionstoreCommand(redisClient *c) {
3673 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3674 }
3675
3676 static void sdiffCommand(redisClient *c) {
3677 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3678 }
3679
3680 static void sdiffstoreCommand(redisClient *c) {
3681 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3682 }
3683
3684 /* ==================================== ZSets =============================== */
3685
3686 /* ZSETs are ordered sets using two data structures to hold the same elements
3687 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3688 * data structure.
3689 *
3690 * The elements are added to an hash table mapping Redis objects to scores.
3691 * At the same time the elements are added to a skip list mapping scores
3692 * to Redis objects (so objects are sorted by scores in this "view"). */
3693
3694 /* This skiplist implementation is almost a C translation of the original
3695 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3696 * Alternative to Balanced Trees", modified in three ways:
3697 * a) this implementation allows for repeated values.
3698 * b) the comparison is not just by key (our 'score') but by satellite data.
3699 * c) there is a back pointer, so it's a doubly linked list with the back
3700 * pointers being only at "level 1". This allows to traverse the list
3701 * from tail to head, useful for ZREVRANGE. */
3702
3703 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3704 zskiplistNode *zn = zmalloc(sizeof(*zn));
3705
3706 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3707 zn->score = score;
3708 zn->obj = obj;
3709 return zn;
3710 }
3711
3712 static zskiplist *zslCreate(void) {
3713 int j;
3714 zskiplist *zsl;
3715
3716 zsl = zmalloc(sizeof(*zsl));
3717 zsl->level = 1;
3718 zsl->length = 0;
3719 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3720 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3721 zsl->header->forward[j] = NULL;
3722 return zsl;
3723 }
3724
3725 static void zslFreeNode(zskiplistNode *node) {
3726 decrRefCount(node->obj);
3727 zfree(node);
3728 }
3729
3730 static void zslFree(zskiplist *zsl) {
3731 zskiplistNode *node = zsl->header->forward[1], *next;
3732
3733 while(node) {
3734 next = node->forward[1];
3735 zslFreeNode(node);
3736 node = next;
3737 }
3738 }
3739
3740 static int zslRandomLevel(void) {
3741 int level = 1;
3742 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3743 level += 1;
3744 return level;
3745 }
3746
3747 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3748 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3749 int i, level;
3750
3751 x = zsl->header;
3752 for (i = zsl->level-1; i >= 0; i--) {
3753 while (x->forward[i] && x->forward[i]->score < score)
3754 x = x->forward[i];
3755 update[i] = x;
3756 }
3757 /* we assume the key is not already inside, since we allow duplicated
3758 * scores, and the re-insertion of score and redis object should never
3759 * happpen since the caller of zslInsert() should test in the hash table
3760 * if the element is already inside or not. */
3761 level = zslRandomLevel();
3762 if (level > zsl->level) {
3763 for (i = zsl->level; i < level; i++)
3764 update[i] = zsl->header;
3765 zsl->level = level;
3766 }
3767 x = zslCreateNode(level,score,obj);
3768 for (i = 0; i < level; i++) {
3769 x->forward[i] = update[i]->forward[i];
3770 update[i]->forward[i] = x;
3771 }
3772 zsl->length++;
3773 }
3774
3775 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3776 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3777 int i;
3778
3779 x = zsl->header;
3780 for (i = zsl->level-1; i >= 0; i--) {
3781 while (x->forward[i] && x->forward[i]->score < score)
3782 x = x->forward[i];
3783 update[i] = x;
3784 }
3785 /* We may have multiple elements with the same score, what we need
3786 * is to find the element with both the right score and object. */
3787 x = x->forward[0];
3788 while(x->score == score) {
3789 if (compareStringObjects(x->obj,obj) == 0) {
3790 for (i = 0; i < zsl->level; i++) {
3791 if (update[i]->forward[i] != x) break;
3792 update[i]->forward[i] = x->forward[i];
3793 }
3794 zslFreeNode(x);
3795 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3796 zsl->level--;
3797 zsl->length--;
3798 return 1;
3799 } else {
3800 x = x->forward[0];
3801 if (!x) return 0; /* end of the list reached, not found */
3802 }
3803 }
3804 return 0; /* not found */
3805 }
3806
3807 /* The actual Z-commands implementations */
3808
3809 static void zaddCommand(redisClient *c) {
3810 robj *zsetobj;
3811 zset *zs;
3812 double *score;
3813
3814 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3815 if (zsetobj == NULL) {
3816 zsetobj = createZsetObject();
3817 dictAdd(c->db->dict,c->argv[1],zsetobj);
3818 incrRefCount(c->argv[1]);
3819 } else {
3820 if (zsetobj->type != REDIS_ZSET) {
3821 addReply(c,shared.wrongtypeerr);
3822 return;
3823 }
3824 }
3825 score = zmalloc(sizeof(double));
3826 *score = strtod(c->argv[2]->ptr,NULL);
3827 zs = zsetobj->ptr;
3828 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3829 /* case 1: New element */
3830 incrRefCount(c->argv[3]); /* added to hash */
3831 zslInsert(zs->zsl,*score,c->argv[3]);
3832 incrRefCount(c->argv[3]); /* added to skiplist */
3833 server.dirty++;
3834 addReply(c,shared.cone);
3835 } else {
3836 dictEntry *de;
3837 double *oldscore;
3838
3839 /* case 2: Score update operation */
3840 de = dictFind(zs->dict,c->argv[3]);
3841 assert(de != NULL);
3842 oldscore = dictGetEntryVal(de);
3843 if (*score != *oldscore) {
3844 int deleted;
3845
3846 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
3847 assert(deleted != 0);
3848 zslInsert(zs->zsl,*score,c->argv[3]);
3849 incrRefCount(c->argv[3]);
3850 server.dirty++;
3851 }
3852 addReply(c,shared.czero);
3853 }
3854 }
3855
3856 static void zrangeCommand(redisClient *c) {
3857 robj *o;
3858 int start = atoi(c->argv[2]->ptr);
3859 int end = atoi(c->argv[3]->ptr);
3860
3861 o = lookupKeyRead(c->db,c->argv[1]);
3862 if (o == NULL) {
3863 addReply(c,shared.nullmultibulk);
3864 } else {
3865 if (o->type != REDIS_ZSET) {
3866 addReply(c,shared.wrongtypeerr);
3867 } else {
3868 zset *zsetobj = o->ptr;
3869 zskiplist *zsl = zsetobj->zsl;
3870 zskiplistNode *ln;
3871
3872 int llen = zsl->length;
3873 int rangelen, j;
3874 robj *ele;
3875
3876 /* convert negative indexes */
3877 if (start < 0) start = llen+start;
3878 if (end < 0) end = llen+end;
3879 if (start < 0) start = 0;
3880 if (end < 0) end = 0;
3881
3882 /* indexes sanity checks */
3883 if (start > end || start >= llen) {
3884 /* Out of range start or start > end result in empty list */
3885 addReply(c,shared.emptymultibulk);
3886 return;
3887 }
3888 if (end >= llen) end = llen-1;
3889 rangelen = (end-start)+1;
3890
3891 /* Return the result in form of a multi-bulk reply */
3892 ln = zsl->header->forward[0];
3893 while (start--)
3894 ln = ln->forward[0];
3895
3896 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3897 for (j = 0; j < rangelen; j++) {
3898 ele = ln->obj;
3899 addReplyBulkLen(c,ele);
3900 addReply(c,ele);
3901 addReply(c,shared.crlf);
3902 ln = ln->forward[0];
3903 }
3904 }
3905 }
3906 }
3907
3908 static void zlenCommand(redisClient *c) {
3909 robj *o;
3910 zset *zs;
3911
3912 o = lookupKeyRead(c->db,c->argv[1]);
3913 if (o == NULL) {
3914 addReply(c,shared.czero);
3915 return;
3916 } else {
3917 if (o->type != REDIS_ZSET) {
3918 addReply(c,shared.wrongtypeerr);
3919 } else {
3920 zs = o->ptr;
3921 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
3922 }
3923 }
3924 }
3925
3926 /* ========================= Non type-specific commands ==================== */
3927
3928 static void flushdbCommand(redisClient *c) {
3929 server.dirty += dictSize(c->db->dict);
3930 dictEmpty(c->db->dict);
3931 dictEmpty(c->db->expires);
3932 addReply(c,shared.ok);
3933 }
3934
3935 static void flushallCommand(redisClient *c) {
3936 server.dirty += emptyDb();
3937 addReply(c,shared.ok);
3938 rdbSave(server.dbfilename);
3939 server.dirty++;
3940 }
3941
3942 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3943 redisSortOperation *so = zmalloc(sizeof(*so));
3944 so->type = type;
3945 so->pattern = pattern;
3946 return so;
3947 }
3948
3949 /* Return the value associated to the key with a name obtained
3950 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3951 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3952 char *p;
3953 sds spat, ssub;
3954 robj keyobj;
3955 int prefixlen, sublen, postfixlen;
3956 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3957 struct {
3958 long len;
3959 long free;
3960 char buf[REDIS_SORTKEY_MAX+1];
3961 } keyname;
3962
3963 if (subst->encoding == REDIS_ENCODING_RAW)
3964 incrRefCount(subst);
3965 else {
3966 subst = getDecodedObject(subst);
3967 }
3968
3969 spat = pattern->ptr;
3970 ssub = subst->ptr;
3971 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3972 p = strchr(spat,'*');
3973 if (!p) return NULL;
3974
3975 prefixlen = p-spat;
3976 sublen = sdslen(ssub);
3977 postfixlen = sdslen(spat)-(prefixlen+1);
3978 memcpy(keyname.buf,spat,prefixlen);
3979 memcpy(keyname.buf+prefixlen,ssub,sublen);
3980 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3981 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3982 keyname.len = prefixlen+sublen+postfixlen;
3983
3984 keyobj.refcount = 1;
3985 keyobj.type = REDIS_STRING;
3986 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3987
3988 decrRefCount(subst);
3989
3990 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3991 return lookupKeyRead(db,&keyobj);
3992 }
3993
3994 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3995 * the additional parameter is not standard but a BSD-specific we have to
3996 * pass sorting parameters via the global 'server' structure */
3997 static int sortCompare(const void *s1, const void *s2) {
3998 const redisSortObject *so1 = s1, *so2 = s2;
3999 int cmp;
4000
4001 if (!server.sort_alpha) {
4002 /* Numeric sorting. Here it's trivial as we precomputed scores */
4003 if (so1->u.score > so2->u.score) {
4004 cmp = 1;
4005 } else if (so1->u.score < so2->u.score) {
4006 cmp = -1;
4007 } else {
4008 cmp = 0;
4009 }
4010 } else {
4011 /* Alphanumeric sorting */
4012 if (server.sort_bypattern) {
4013 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4014 /* At least one compare object is NULL */
4015 if (so1->u.cmpobj == so2->u.cmpobj)
4016 cmp = 0;
4017 else if (so1->u.cmpobj == NULL)
4018 cmp = -1;
4019 else
4020 cmp = 1;
4021 } else {
4022 /* We have both the objects, use strcoll */
4023 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4024 }
4025 } else {
4026 /* Compare elements directly */
4027 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4028 so2->obj->encoding == REDIS_ENCODING_RAW) {
4029 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4030 } else {
4031 robj *dec1, *dec2;
4032
4033 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4034 so1->obj : getDecodedObject(so1->obj);
4035 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4036 so2->obj : getDecodedObject(so2->obj);
4037 cmp = strcoll(dec1->ptr,dec2->ptr);
4038 if (dec1 != so1->obj) decrRefCount(dec1);
4039 if (dec2 != so2->obj) decrRefCount(dec2);
4040 }
4041 }
4042 }
4043 return server.sort_desc ? -cmp : cmp;
4044 }
4045
4046 /* The SORT command is the most complex command in Redis. Warning: this code
4047 * is optimized for speed and a bit less for readability */
4048 static void sortCommand(redisClient *c) {
4049 list *operations;
4050 int outputlen = 0;
4051 int desc = 0, alpha = 0;
4052 int limit_start = 0, limit_count = -1, start, end;
4053 int j, dontsort = 0, vectorlen;
4054 int getop = 0; /* GET operation counter */
4055 robj *sortval, *sortby = NULL;
4056 redisSortObject *vector; /* Resulting vector to sort */
4057
4058 /* Lookup the key to sort. It must be of the right types */
4059 sortval = lookupKeyRead(c->db,c->argv[1]);
4060 if (sortval == NULL) {
4061 addReply(c,shared.nokeyerr);
4062 return;
4063 }
4064 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4065 addReply(c,shared.wrongtypeerr);
4066 return;
4067 }
4068
4069 /* Create a list of operations to perform for every sorted element.
4070 * Operations can be GET/DEL/INCR/DECR */
4071 operations = listCreate();
4072 listSetFreeMethod(operations,zfree);
4073 j = 2;
4074
4075 /* Now we need to protect sortval incrementing its count, in the future
4076 * SORT may have options able to overwrite/delete keys during the sorting
4077 * and the sorted key itself may get destroied */
4078 incrRefCount(sortval);
4079
4080 /* The SORT command has an SQL-alike syntax, parse it */
4081 while(j < c->argc) {
4082 int leftargs = c->argc-j-1;
4083 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4084 desc = 0;
4085 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4086 desc = 1;
4087 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4088 alpha = 1;
4089 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4090 limit_start = atoi(c->argv[j+1]->ptr);
4091 limit_count = atoi(c->argv[j+2]->ptr);
4092 j+=2;
4093 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4094 sortby = c->argv[j+1];
4095 /* If the BY pattern does not contain '*', i.e. it is constant,
4096 * we don't need to sort nor to lookup the weight keys. */
4097 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4098 j++;
4099 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4100 listAddNodeTail(operations,createSortOperation(
4101 REDIS_SORT_GET,c->argv[j+1]));
4102 getop++;
4103 j++;
4104 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4105 listAddNodeTail(operations,createSortOperation(
4106 REDIS_SORT_DEL,c->argv[j+1]));
4107 j++;
4108 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4109 listAddNodeTail(operations,createSortOperation(
4110 REDIS_SORT_INCR,c->argv[j+1]));
4111 j++;
4112 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4113 listAddNodeTail(operations,createSortOperation(
4114 REDIS_SORT_DECR,c->argv[j+1]));
4115 j++;
4116 } else {
4117 decrRefCount(sortval);
4118 listRelease(operations);
4119 addReply(c,shared.syntaxerr);
4120 return;
4121 }
4122 j++;
4123 }
4124
4125 /* Load the sorting vector with all the objects to sort */
4126 vectorlen = (sortval->type == REDIS_LIST) ?
4127 listLength((list*)sortval->ptr) :
4128 dictSize((dict*)sortval->ptr);
4129 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4130 j = 0;
4131 if (sortval->type == REDIS_LIST) {
4132 list *list = sortval->ptr;
4133 listNode *ln;
4134
4135 listRewind(list);
4136 while((ln = listYield(list))) {
4137 robj *ele = ln->value;
4138 vector[j].obj = ele;
4139 vector[j].u.score = 0;
4140 vector[j].u.cmpobj = NULL;
4141 j++;
4142 }
4143 } else {
4144 dict *set = sortval->ptr;
4145 dictIterator *di;
4146 dictEntry *setele;
4147
4148 di = dictGetIterator(set);
4149 while((setele = dictNext(di)) != NULL) {
4150 vector[j].obj = dictGetEntryKey(setele);
4151 vector[j].u.score = 0;
4152 vector[j].u.cmpobj = NULL;
4153 j++;
4154 }
4155 dictReleaseIterator(di);
4156 }
4157 assert(j == vectorlen);
4158
4159 /* Now it's time to load the right scores in the sorting vector */
4160 if (dontsort == 0) {
4161 for (j = 0; j < vectorlen; j++) {
4162 if (sortby) {
4163 robj *byval;
4164
4165 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4166 if (!byval || byval->type != REDIS_STRING) continue;
4167 if (alpha) {
4168 if (byval->encoding == REDIS_ENCODING_RAW) {
4169 vector[j].u.cmpobj = byval;
4170 incrRefCount(byval);
4171 } else {
4172 vector[j].u.cmpobj = getDecodedObject(byval);
4173 }
4174 } else {
4175 if (byval->encoding == REDIS_ENCODING_RAW) {
4176 vector[j].u.score = strtod(byval->ptr,NULL);
4177 } else {
4178 if (byval->encoding == REDIS_ENCODING_INT) {
4179 vector[j].u.score = (long)byval->ptr;
4180 } else
4181 assert(1 != 1);
4182 }
4183 }
4184 } else {
4185 if (!alpha) {
4186 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4187 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4188 else {
4189 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4190 vector[j].u.score = (long) vector[j].obj->ptr;
4191 else
4192 assert(1 != 1);
4193 }
4194 }
4195 }
4196 }
4197 }
4198
4199 /* We are ready to sort the vector... perform a bit of sanity check
4200 * on the LIMIT option too. We'll use a partial version of quicksort. */
4201 start = (limit_start < 0) ? 0 : limit_start;
4202 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4203 if (start >= vectorlen) {
4204 start = vectorlen-1;
4205 end = vectorlen-2;
4206 }
4207 if (end >= vectorlen) end = vectorlen-1;
4208
4209 if (dontsort == 0) {
4210 server.sort_desc = desc;
4211 server.sort_alpha = alpha;
4212 server.sort_bypattern = sortby ? 1 : 0;
4213 if (sortby && (start != 0 || end != vectorlen-1))
4214 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4215 else
4216 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4217 }
4218
4219 /* Send command output to the output buffer, performing the specified
4220 * GET/DEL/INCR/DECR operations if any. */
4221 outputlen = getop ? getop*(end-start+1) : end-start+1;
4222 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4223 for (j = start; j <= end; j++) {
4224 listNode *ln;
4225 if (!getop) {
4226 addReplyBulkLen(c,vector[j].obj);
4227 addReply(c,vector[j].obj);
4228 addReply(c,shared.crlf);
4229 }
4230 listRewind(operations);
4231 while((ln = listYield(operations))) {
4232 redisSortOperation *sop = ln->value;
4233 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4234 vector[j].obj);
4235
4236 if (sop->type == REDIS_SORT_GET) {
4237 if (!val || val->type != REDIS_STRING) {
4238 addReply(c,shared.nullbulk);
4239 } else {
4240 addReplyBulkLen(c,val);
4241 addReply(c,val);
4242 addReply(c,shared.crlf);
4243 }
4244 } else if (sop->type == REDIS_SORT_DEL) {
4245 /* TODO */
4246 }
4247 }
4248 }
4249
4250 /* Cleanup */
4251 decrRefCount(sortval);
4252 listRelease(operations);
4253 for (j = 0; j < vectorlen; j++) {
4254 if (sortby && alpha && vector[j].u.cmpobj)
4255 decrRefCount(vector[j].u.cmpobj);
4256 }
4257 zfree(vector);
4258 }
4259
4260 static void infoCommand(redisClient *c) {
4261 sds info;
4262 time_t uptime = time(NULL)-server.stat_starttime;
4263 int j;
4264
4265 info = sdscatprintf(sdsempty(),
4266 "redis_version:%s\r\n"
4267 "arch_bits:%s\r\n"
4268 "uptime_in_seconds:%d\r\n"
4269 "uptime_in_days:%d\r\n"
4270 "connected_clients:%d\r\n"
4271 "connected_slaves:%d\r\n"
4272 "used_memory:%zu\r\n"
4273 "changes_since_last_save:%lld\r\n"
4274 "bgsave_in_progress:%d\r\n"
4275 "last_save_time:%d\r\n"
4276 "total_connections_received:%lld\r\n"
4277 "total_commands_processed:%lld\r\n"
4278 "role:%s\r\n"
4279 ,REDIS_VERSION,
4280 (sizeof(long) == 8) ? "64" : "32",
4281 uptime,
4282 uptime/(3600*24),
4283 listLength(server.clients)-listLength(server.slaves),
4284 listLength(server.slaves),
4285 server.usedmemory,
4286 server.dirty,
4287 server.bgsaveinprogress,
4288 server.lastsave,
4289 server.stat_numconnections,
4290 server.stat_numcommands,
4291 server.masterhost == NULL ? "master" : "slave"
4292 );
4293 if (server.masterhost) {
4294 info = sdscatprintf(info,
4295 "master_host:%s\r\n"
4296 "master_port:%d\r\n"
4297 "master_link_status:%s\r\n"
4298 "master_last_io_seconds_ago:%d\r\n"
4299 ,server.masterhost,
4300 server.masterport,
4301 (server.replstate == REDIS_REPL_CONNECTED) ?
4302 "up" : "down",
4303 (int)(time(NULL)-server.master->lastinteraction)
4304 );
4305 }
4306 for (j = 0; j < server.dbnum; j++) {
4307 long long keys, vkeys;
4308
4309 keys = dictSize(server.db[j].dict);
4310 vkeys = dictSize(server.db[j].expires);
4311 if (keys || vkeys) {
4312 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4313 j, keys, vkeys);
4314 }
4315 }
4316 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4317 addReplySds(c,info);
4318 addReply(c,shared.crlf);
4319 }
4320
4321 static void monitorCommand(redisClient *c) {
4322 /* ignore MONITOR if aleady slave or in monitor mode */
4323 if (c->flags & REDIS_SLAVE) return;
4324
4325 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4326 c->slaveseldb = 0;
4327 listAddNodeTail(server.monitors,c);
4328 addReply(c,shared.ok);
4329 }
4330
4331 /* ================================= Expire ================================= */
4332 static int removeExpire(redisDb *db, robj *key) {
4333 if (dictDelete(db->expires,key) == DICT_OK) {
4334 return 1;
4335 } else {
4336 return 0;
4337 }
4338 }
4339
4340 static int setExpire(redisDb *db, robj *key, time_t when) {
4341 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4342 return 0;
4343 } else {
4344 incrRefCount(key);
4345 return 1;
4346 }
4347 }
4348
4349 /* Return the expire time of the specified key, or -1 if no expire
4350 * is associated with this key (i.e. the key is non volatile) */
4351 static time_t getExpire(redisDb *db, robj *key) {
4352 dictEntry *de;
4353
4354 /* No expire? return ASAP */
4355 if (dictSize(db->expires) == 0 ||
4356 (de = dictFind(db->expires,key)) == NULL) return -1;
4357
4358 return (time_t) dictGetEntryVal(de);
4359 }
4360
4361 static int expireIfNeeded(redisDb *db, robj *key) {
4362 time_t when;
4363 dictEntry *de;
4364
4365 /* No expire? return ASAP */
4366 if (dictSize(db->expires) == 0 ||
4367 (de = dictFind(db->expires,key)) == NULL) return 0;
4368
4369 /* Lookup the expire */
4370 when = (time_t) dictGetEntryVal(de);
4371 if (time(NULL) <= when) return 0;
4372
4373 /* Delete the key */
4374 dictDelete(db->expires,key);
4375 return dictDelete(db->dict,key) == DICT_OK;
4376 }
4377
4378 static int deleteIfVolatile(redisDb *db, robj *key) {
4379 dictEntry *de;
4380
4381 /* No expire? return ASAP */
4382 if (dictSize(db->expires) == 0 ||
4383 (de = dictFind(db->expires,key)) == NULL) return 0;
4384
4385 /* Delete the key */
4386 server.dirty++;
4387 dictDelete(db->expires,key);
4388 return dictDelete(db->dict,key) == DICT_OK;
4389 }
4390
4391 static void expireCommand(redisClient *c) {
4392 dictEntry *de;
4393 int seconds = atoi(c->argv[2]->ptr);
4394
4395 de = dictFind(c->db->dict,c->argv[1]);
4396 if (de == NULL) {
4397 addReply(c,shared.czero);
4398 return;
4399 }
4400 if (seconds <= 0) {
4401 addReply(c, shared.czero);
4402 return;
4403 } else {
4404 time_t when = time(NULL)+seconds;
4405 if (setExpire(c->db,c->argv[1],when)) {
4406 addReply(c,shared.cone);
4407 server.dirty++;
4408 } else {
4409 addReply(c,shared.czero);
4410 }
4411 return;
4412 }
4413 }
4414
4415 static void ttlCommand(redisClient *c) {
4416 time_t expire;
4417 int ttl = -1;
4418
4419 expire = getExpire(c->db,c->argv[1]);
4420 if (expire != -1) {
4421 ttl = (int) (expire-time(NULL));
4422 if (ttl < 0) ttl = -1;
4423 }
4424 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4425 }
4426
4427 static void msetGenericCommand(redisClient *c, int nx) {
4428 int j;
4429
4430 if ((c->argc % 2) == 0) {
4431 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4432 return;
4433 }
4434 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4435 * set nothing at all if at least one already key exists. */
4436 if (nx) {
4437 for (j = 1; j < c->argc; j += 2) {
4438 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4439 addReply(c, shared.czero);
4440 return;
4441 }
4442 }
4443 }
4444
4445 for (j = 1; j < c->argc; j += 2) {
4446 int retval;
4447
4448 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4449 if (retval == DICT_ERR) {
4450 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4451 incrRefCount(c->argv[j+1]);
4452 } else {
4453 incrRefCount(c->argv[j]);
4454 incrRefCount(c->argv[j+1]);
4455 }
4456 removeExpire(c->db,c->argv[j]);
4457 }
4458 server.dirty += (c->argc-1)/2;
4459 addReply(c, nx ? shared.cone : shared.ok);
4460 }
4461
4462 static void msetCommand(redisClient *c) {
4463 msetGenericCommand(c,0);
4464 }
4465
4466 static void msetnxCommand(redisClient *c) {
4467 msetGenericCommand(c,1);
4468 }
4469
4470 /* =============================== Replication ============================= */
4471
4472 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4473 ssize_t nwritten, ret = size;
4474 time_t start = time(NULL);
4475
4476 timeout++;
4477 while(size) {
4478 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4479 nwritten = write(fd,ptr,size);
4480 if (nwritten == -1) return -1;
4481 ptr += nwritten;
4482 size -= nwritten;
4483 }
4484 if ((time(NULL)-start) > timeout) {
4485 errno = ETIMEDOUT;
4486 return -1;
4487 }
4488 }
4489 return ret;
4490 }
4491
4492 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4493 ssize_t nread, totread = 0;
4494 time_t start = time(NULL);
4495
4496 timeout++;
4497 while(size) {
4498 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4499 nread = read(fd,ptr,size);
4500 if (nread == -1) return -1;
4501 ptr += nread;
4502 size -= nread;
4503 totread += nread;
4504 }
4505 if ((time(NULL)-start) > timeout) {
4506 errno = ETIMEDOUT;
4507 return -1;
4508 }
4509 }
4510 return totread;
4511 }
4512
4513 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
4514 ssize_t nread = 0;
4515
4516 size--;
4517 while(size) {
4518 char c;
4519
4520 if (syncRead(fd,&c,1,timeout) == -1) return -1;
4521 if (c == '\n') {
4522 *ptr = '\0';
4523 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
4524 return nread;
4525 } else {
4526 *ptr++ = c;
4527 *ptr = '\0';
4528 nread++;
4529 }
4530 }
4531 return nread;
4532 }
4533
4534 static void syncCommand(redisClient *c) {
4535 /* ignore SYNC if aleady slave or in monitor mode */
4536 if (c->flags & REDIS_SLAVE) return;
4537
4538 /* SYNC can't be issued when the server has pending data to send to
4539 * the client about already issued commands. We need a fresh reply
4540 * buffer registering the differences between the BGSAVE and the current
4541 * dataset, so that we can copy to other slaves if needed. */
4542 if (listLength(c->reply) != 0) {
4543 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4544 return;
4545 }
4546
4547 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4548 /* Here we need to check if there is a background saving operation
4549 * in progress, or if it is required to start one */
4550 if (server.bgsaveinprogress) {
4551 /* Ok a background save is in progress. Let's check if it is a good
4552 * one for replication, i.e. if there is another slave that is
4553 * registering differences since the server forked to save */
4554 redisClient *slave;
4555 listNode *ln;
4556
4557 listRewind(server.slaves);
4558 while((ln = listYield(server.slaves))) {
4559 slave = ln->value;
4560 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4561 }
4562 if (ln) {
4563 /* Perfect, the server is already registering differences for
4564 * another slave. Set the right state, and copy the buffer. */
4565 listRelease(c->reply);
4566 c->reply = listDup(slave->reply);
4567 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4568 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4569 } else {
4570 /* No way, we need to wait for the next BGSAVE in order to
4571 * register differences */
4572 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4573 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4574 }
4575 } else {
4576 /* Ok we don't have a BGSAVE in progress, let's start one */
4577 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4578 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4579 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4580 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4581 return;
4582 }
4583 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4584 }
4585 c->repldbfd = -1;
4586 c->flags |= REDIS_SLAVE;
4587 c->slaveseldb = 0;
4588 listAddNodeTail(server.slaves,c);
4589 return;
4590 }
4591
4592 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4593 redisClient *slave = privdata;
4594 REDIS_NOTUSED(el);
4595 REDIS_NOTUSED(mask);
4596 char buf[REDIS_IOBUF_LEN];
4597 ssize_t nwritten, buflen;
4598
4599 if (slave->repldboff == 0) {
4600 /* Write the bulk write count before to transfer the DB. In theory here
4601 * we don't know how much room there is in the output buffer of the
4602 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4603 * operations) will never be smaller than the few bytes we need. */
4604 sds bulkcount;
4605
4606 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4607 slave->repldbsize);
4608 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4609 {
4610 sdsfree(bulkcount);
4611 freeClient(slave);
4612 return;
4613 }
4614 sdsfree(bulkcount);
4615 }
4616 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4617 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4618 if (buflen <= 0) {
4619 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4620 (buflen == 0) ? "premature EOF" : strerror(errno));
4621 freeClient(slave);
4622 return;
4623 }
4624 if ((nwritten = write(fd,buf,buflen)) == -1) {
4625 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4626 strerror(errno));
4627 freeClient(slave);
4628 return;
4629 }
4630 slave->repldboff += nwritten;
4631 if (slave->repldboff == slave->repldbsize) {
4632 close(slave->repldbfd);
4633 slave->repldbfd = -1;
4634 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4635 slave->replstate = REDIS_REPL_ONLINE;
4636 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4637 sendReplyToClient, slave, NULL) == AE_ERR) {
4638 freeClient(slave);
4639 return;
4640 }
4641 addReplySds(slave,sdsempty());
4642 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4643 }
4644 }
4645
4646 /* This function is called at the end of every backgrond saving.
4647 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4648 * otherwise REDIS_ERR is passed to the function.
4649 *
4650 * The goal of this function is to handle slaves waiting for a successful
4651 * background saving in order to perform non-blocking synchronization. */
4652 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4653 listNode *ln;
4654 int startbgsave = 0;
4655
4656 listRewind(server.slaves);
4657 while((ln = listYield(server.slaves))) {
4658 redisClient *slave = ln->value;
4659
4660 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4661 startbgsave = 1;
4662 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4663 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4664 struct redis_stat buf;
4665
4666 if (bgsaveerr != REDIS_OK) {
4667 freeClient(slave);
4668 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4669 continue;
4670 }
4671 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4672 redis_fstat(slave->repldbfd,&buf) == -1) {
4673 freeClient(slave);
4674 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4675 continue;
4676 }
4677 slave->repldboff = 0;
4678 slave->repldbsize = buf.st_size;
4679 slave->replstate = REDIS_REPL_SEND_BULK;
4680 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4681 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4682 freeClient(slave);
4683 continue;
4684 }
4685 }
4686 }
4687 if (startbgsave) {
4688 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4689 listRewind(server.slaves);
4690 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4691 while((ln = listYield(server.slaves))) {
4692 redisClient *slave = ln->value;
4693
4694 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4695 freeClient(slave);
4696 }
4697 }
4698 }
4699 }
4700
4701 static int syncWithMaster(void) {
4702 char buf[1024], tmpfile[256];
4703 int dumpsize;
4704 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4705 int dfd;
4706
4707 if (fd == -1) {
4708 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4709 strerror(errno));
4710 return REDIS_ERR;
4711 }
4712 /* Issue the SYNC command */
4713 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4714 close(fd);
4715 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4716 strerror(errno));
4717 return REDIS_ERR;
4718 }
4719 /* Read the bulk write count */
4720 if (syncReadLine(fd,buf,1024,3600) == -1) {
4721 close(fd);
4722 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4723 strerror(errno));
4724 return REDIS_ERR;
4725 }
4726 dumpsize = atoi(buf+1);
4727 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4728 /* Read the bulk write data on a temp file */
4729 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4730 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4731 if (dfd == -1) {
4732 close(fd);
4733 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4734 return REDIS_ERR;
4735 }
4736 while(dumpsize) {
4737 int nread, nwritten;
4738
4739 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4740 if (nread == -1) {
4741 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4742 strerror(errno));
4743 close(fd);
4744 close(dfd);
4745 return REDIS_ERR;
4746 }
4747 nwritten = write(dfd,buf,nread);
4748 if (nwritten == -1) {
4749 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4750 close(fd);
4751 close(dfd);
4752 return REDIS_ERR;
4753 }
4754 dumpsize -= nread;
4755 }
4756 close(dfd);
4757 if (rename(tmpfile,server.dbfilename) == -1) {
4758 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4759 unlink(tmpfile);
4760 close(fd);
4761 return REDIS_ERR;
4762 }
4763 emptyDb();
4764 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4765 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4766 close(fd);
4767 return REDIS_ERR;
4768 }
4769 server.master = createClient(fd);
4770 server.master->flags |= REDIS_MASTER;
4771 server.replstate = REDIS_REPL_CONNECTED;
4772 return REDIS_OK;
4773 }
4774
4775 static void slaveofCommand(redisClient *c) {
4776 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4777 !strcasecmp(c->argv[2]->ptr,"one")) {
4778 if (server.masterhost) {
4779 sdsfree(server.masterhost);
4780 server.masterhost = NULL;
4781 if (server.master) freeClient(server.master);
4782 server.replstate = REDIS_REPL_NONE;
4783 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4784 }
4785 } else {
4786 sdsfree(server.masterhost);
4787 server.masterhost = sdsdup(c->argv[1]->ptr);
4788 server.masterport = atoi(c->argv[2]->ptr);
4789 if (server.master) freeClient(server.master);
4790 server.replstate = REDIS_REPL_CONNECT;
4791 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4792 server.masterhost, server.masterport);
4793 }
4794 addReply(c,shared.ok);
4795 }
4796
4797 /* ============================ Maxmemory directive ======================== */
4798
4799 /* This function gets called when 'maxmemory' is set on the config file to limit
4800 * the max memory used by the server, and we are out of memory.
4801 * This function will try to, in order:
4802 *
4803 * - Free objects from the free list
4804 * - Try to remove keys with an EXPIRE set
4805 *
4806 * It is not possible to free enough memory to reach used-memory < maxmemory
4807 * the server will start refusing commands that will enlarge even more the
4808 * memory usage.
4809 */
4810 static void freeMemoryIfNeeded(void) {
4811 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4812 if (listLength(server.objfreelist)) {
4813 robj *o;
4814
4815 listNode *head = listFirst(server.objfreelist);
4816 o = listNodeValue(head);
4817 listDelNode(server.objfreelist,head);
4818 zfree(o);
4819 } else {
4820 int j, k, freed = 0;
4821
4822 for (j = 0; j < server.dbnum; j++) {
4823 int minttl = -1;
4824 robj *minkey = NULL;
4825 struct dictEntry *de;
4826
4827 if (dictSize(server.db[j].expires)) {
4828 freed = 1;
4829 /* From a sample of three keys drop the one nearest to
4830 * the natural expire */
4831 for (k = 0; k < 3; k++) {
4832 time_t t;
4833
4834 de = dictGetRandomKey(server.db[j].expires);
4835 t = (time_t) dictGetEntryVal(de);
4836 if (minttl == -1 || t < minttl) {
4837 minkey = dictGetEntryKey(de);
4838 minttl = t;
4839 }
4840 }
4841 deleteKey(server.db+j,minkey);
4842 }
4843 }
4844 if (!freed) return; /* nothing to free... */
4845 }
4846 }
4847 }
4848
4849 /* ================================= Debugging ============================== */
4850
4851 static void debugCommand(redisClient *c) {
4852 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4853 *((char*)-1) = 'x';
4854 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4855 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4856 robj *key, *val;
4857
4858 if (!de) {
4859 addReply(c,shared.nokeyerr);
4860 return;
4861 }
4862 key = dictGetEntryKey(de);
4863 val = dictGetEntryVal(de);
4864 addReplySds(c,sdscatprintf(sdsempty(),
4865 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4866 key, key->refcount, val, val->refcount, val->encoding));
4867 } else {
4868 addReplySds(c,sdsnew(
4869 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4870 }
4871 }
4872
4873 #ifdef HAVE_BACKTRACE
4874 static struct redisFunctionSym symsTable[] = {
4875 {"compareStringObjects", (unsigned long)compareStringObjects},
4876 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4877 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4878 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4879 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4880 {"freeStringObject", (unsigned long)freeStringObject},
4881 {"freeListObject", (unsigned long)freeListObject},
4882 {"freeSetObject", (unsigned long)freeSetObject},
4883 {"decrRefCount", (unsigned long)decrRefCount},
4884 {"createObject", (unsigned long)createObject},
4885 {"freeClient", (unsigned long)freeClient},
4886 {"rdbLoad", (unsigned long)rdbLoad},
4887 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4888 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4889 {"addReply", (unsigned long)addReply},
4890 {"addReplySds", (unsigned long)addReplySds},
4891 {"incrRefCount", (unsigned long)incrRefCount},
4892 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4893 {"createStringObject", (unsigned long)createStringObject},
4894 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4895 {"syncWithMaster", (unsigned long)syncWithMaster},
4896 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4897 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4898 {"getDecodedObject", (unsigned long)getDecodedObject},
4899 {"removeExpire", (unsigned long)removeExpire},
4900 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4901 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4902 {"deleteKey", (unsigned long)deleteKey},
4903 {"getExpire", (unsigned long)getExpire},
4904 {"setExpire", (unsigned long)setExpire},
4905 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4906 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4907 {"authCommand", (unsigned long)authCommand},
4908 {"pingCommand", (unsigned long)pingCommand},
4909 {"echoCommand", (unsigned long)echoCommand},
4910 {"setCommand", (unsigned long)setCommand},
4911 {"setnxCommand", (unsigned long)setnxCommand},
4912 {"getCommand", (unsigned long)getCommand},
4913 {"delCommand", (unsigned long)delCommand},
4914 {"existsCommand", (unsigned long)existsCommand},
4915 {"incrCommand", (unsigned long)incrCommand},
4916 {"decrCommand", (unsigned long)decrCommand},
4917 {"incrbyCommand", (unsigned long)incrbyCommand},
4918 {"decrbyCommand", (unsigned long)decrbyCommand},
4919 {"selectCommand", (unsigned long)selectCommand},
4920 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4921 {"keysCommand", (unsigned long)keysCommand},
4922 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4923 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4924 {"saveCommand", (unsigned long)saveCommand},
4925 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4926 {"shutdownCommand", (unsigned long)shutdownCommand},
4927 {"moveCommand", (unsigned long)moveCommand},
4928 {"renameCommand", (unsigned long)renameCommand},
4929 {"renamenxCommand", (unsigned long)renamenxCommand},
4930 {"lpushCommand", (unsigned long)lpushCommand},
4931 {"rpushCommand", (unsigned long)rpushCommand},
4932 {"lpopCommand", (unsigned long)lpopCommand},
4933 {"rpopCommand", (unsigned long)rpopCommand},
4934 {"llenCommand", (unsigned long)llenCommand},
4935 {"lindexCommand", (unsigned long)lindexCommand},
4936 {"lrangeCommand", (unsigned long)lrangeCommand},
4937 {"ltrimCommand", (unsigned long)ltrimCommand},
4938 {"typeCommand", (unsigned long)typeCommand},
4939 {"lsetCommand", (unsigned long)lsetCommand},
4940 {"saddCommand", (unsigned long)saddCommand},
4941 {"sremCommand", (unsigned long)sremCommand},
4942 {"smoveCommand", (unsigned long)smoveCommand},
4943 {"sismemberCommand", (unsigned long)sismemberCommand},
4944 {"scardCommand", (unsigned long)scardCommand},
4945 {"spopCommand", (unsigned long)spopCommand},
4946 {"srandmemberCommand", (unsigned long)srandmemberCommand},
4947 {"sinterCommand", (unsigned long)sinterCommand},
4948 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4949 {"sunionCommand", (unsigned long)sunionCommand},
4950 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4951 {"sdiffCommand", (unsigned long)sdiffCommand},
4952 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4953 {"syncCommand", (unsigned long)syncCommand},
4954 {"flushdbCommand", (unsigned long)flushdbCommand},
4955 {"flushallCommand", (unsigned long)flushallCommand},
4956 {"sortCommand", (unsigned long)sortCommand},
4957 {"lremCommand", (unsigned long)lremCommand},
4958 {"infoCommand", (unsigned long)infoCommand},
4959 {"mgetCommand", (unsigned long)mgetCommand},
4960 {"monitorCommand", (unsigned long)monitorCommand},
4961 {"expireCommand", (unsigned long)expireCommand},
4962 {"getsetCommand", (unsigned long)getsetCommand},
4963 {"ttlCommand", (unsigned long)ttlCommand},
4964 {"slaveofCommand", (unsigned long)slaveofCommand},
4965 {"debugCommand", (unsigned long)debugCommand},
4966 {"processCommand", (unsigned long)processCommand},
4967 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
4968 {"readQueryFromClient", (unsigned long)readQueryFromClient},
4969 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
4970 {"msetGenericCommand", (unsigned long)msetGenericCommand},
4971 {"msetCommand", (unsigned long)msetCommand},
4972 {"msetnxCommand", (unsigned long)msetnxCommand},
4973 {"zslCreateNode", (unsigned long)zslCreateNode},
4974 {"zslCreate", (unsigned long)zslCreate},
4975 {"zslFreeNode",(unsigned long)zslFreeNode},
4976 {"zslFree",(unsigned long)zslFree},
4977 {"zslRandomLevel",(unsigned long)zslRandomLevel},
4978 {"zslInsert",(unsigned long)zslInsert},
4979 {"zslDelete",(unsigned long)zslDelete},
4980 {"createZsetObject",(unsigned long)createZsetObject},
4981 {"zaddCommand",(unsigned long)zaddCommand},
4982 {"zrangeCommand",(unsigned long)zrangeCommand},
4983 {NULL,0}
4984 };
4985
4986 /* This function try to convert a pointer into a function name. It's used in
4987 * oreder to provide a backtrace under segmentation fault that's able to
4988 * display functions declared as static (otherwise the backtrace is useless). */
4989 static char *findFuncName(void *pointer, unsigned long *offset){
4990 int i, ret = -1;
4991 unsigned long off, minoff = 0;
4992
4993 /* Try to match against the Symbol with the smallest offset */
4994 for (i=0; symsTable[i].pointer; i++) {
4995 unsigned long lp = (unsigned long) pointer;
4996
4997 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
4998 off=lp-symsTable[i].pointer;
4999 if (ret < 0 || off < minoff) {
5000 minoff=off;
5001 ret=i;
5002 }
5003 }
5004 }
5005 if (ret == -1) return NULL;
5006 *offset = minoff;
5007 return symsTable[ret].name;
5008 }
5009
5010 static void *getMcontextEip(ucontext_t *uc) {
5011 #if defined(__FreeBSD__)
5012 return (void*) uc->uc_mcontext.mc_eip;
5013 #elif defined(__dietlibc__)
5014 return (void*) uc->uc_mcontext.eip;
5015 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5016 return (void*) uc->uc_mcontext->__ss.__eip;
5017 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5018 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5019 return (void*) uc->uc_mcontext->__ss.__rip;
5020 #else
5021 return (void*) uc->uc_mcontext->__ss.__eip;
5022 #endif
5023 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5024 return (void*) uc->uc_mcontext.gregs[REG_EIP];
5025 #elif defined(__ia64__) /* Linux IA64 */
5026 return (void*) uc->uc_mcontext.sc_ip;
5027 #else
5028 return NULL;
5029 #endif
5030 }
5031
5032 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5033 void *trace[100];
5034 char **messages = NULL;
5035 int i, trace_size = 0;
5036 unsigned long offset=0;
5037 time_t uptime = time(NULL)-server.stat_starttime;
5038 ucontext_t *uc = (ucontext_t*) secret;
5039 REDIS_NOTUSED(info);
5040
5041 redisLog(REDIS_WARNING,
5042 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5043 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5044 "redis_version:%s; "
5045 "uptime_in_seconds:%d; "
5046 "connected_clients:%d; "
5047 "connected_slaves:%d; "
5048 "used_memory:%zu; "
5049 "changes_since_last_save:%lld; "
5050 "bgsave_in_progress:%d; "
5051 "last_save_time:%d; "
5052 "total_connections_received:%lld; "
5053 "total_commands_processed:%lld; "
5054 "role:%s;"
5055 ,REDIS_VERSION,
5056 uptime,
5057 listLength(server.clients)-listLength(server.slaves),
5058 listLength(server.slaves),
5059 server.usedmemory,
5060 server.dirty,
5061 server.bgsaveinprogress,
5062 server.lastsave,
5063 server.stat_numconnections,
5064 server.stat_numcommands,
5065 server.masterhost == NULL ? "master" : "slave"
5066 ));
5067
5068 trace_size = backtrace(trace, 100);
5069 /* overwrite sigaction with caller's address */
5070 if (getMcontextEip(uc) != NULL) {
5071 trace[1] = getMcontextEip(uc);
5072 }
5073 messages = backtrace_symbols(trace, trace_size);
5074
5075 for (i=1; i<trace_size; ++i) {
5076 char *fn = findFuncName(trace[i], &offset), *p;
5077
5078 p = strchr(messages[i],'+');
5079 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5080 redisLog(REDIS_WARNING,"%s", messages[i]);
5081 } else {
5082 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5083 }
5084 }
5085 free(messages);
5086 exit(0);
5087 }
5088
5089 static void setupSigSegvAction(void) {
5090 struct sigaction act;
5091
5092 sigemptyset (&act.sa_mask);
5093 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5094 * is used. Otherwise, sa_handler is used */
5095 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5096 act.sa_sigaction = segvHandler;
5097 sigaction (SIGSEGV, &act, NULL);
5098 sigaction (SIGBUS, &act, NULL);
5099 sigaction (SIGFPE, &act, NULL);
5100 sigaction (SIGILL, &act, NULL);
5101 sigaction (SIGBUS, &act, NULL);
5102 return;
5103 }
5104 #else /* HAVE_BACKTRACE */
5105 static void setupSigSegvAction(void) {
5106 }
5107 #endif /* HAVE_BACKTRACE */
5108
5109 /* =================================== Main! ================================ */
5110
5111 #ifdef __linux__
5112 int linuxOvercommitMemoryValue(void) {
5113 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
5114 char buf[64];
5115
5116 if (!fp) return -1;
5117 if (fgets(buf,64,fp) == NULL) {
5118 fclose(fp);
5119 return -1;
5120 }
5121 fclose(fp);
5122
5123 return atoi(buf);
5124 }
5125
5126 void linuxOvercommitMemoryWarning(void) {
5127 if (linuxOvercommitMemoryValue() == 0) {
5128 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5129 }
5130 }
5131 #endif /* __linux__ */
5132
5133 static void daemonize(void) {
5134 int fd;
5135 FILE *fp;
5136
5137 if (fork() != 0) exit(0); /* parent exits */
5138 setsid(); /* create a new session */
5139
5140 /* Every output goes to /dev/null. If Redis is daemonized but
5141 * the 'logfile' is set to 'stdout' in the configuration file
5142 * it will not log at all. */
5143 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5144 dup2(fd, STDIN_FILENO);
5145 dup2(fd, STDOUT_FILENO);
5146 dup2(fd, STDERR_FILENO);
5147 if (fd > STDERR_FILENO) close(fd);
5148 }
5149 /* Try to write the pid file */
5150 fp = fopen(server.pidfile,"w");
5151 if (fp) {
5152 fprintf(fp,"%d\n",getpid());
5153 fclose(fp);
5154 }
5155 }
5156
5157 int main(int argc, char **argv) {
5158 initServerConfig();
5159 if (argc == 2) {
5160 ResetServerSaveParams();
5161 loadServerConfig(argv[1]);
5162 } else if (argc > 2) {
5163 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5164 exit(1);
5165 } else {
5166 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5167 }
5168 initServer();
5169 if (server.daemonize) daemonize();
5170 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5171 #ifdef __linux__
5172 linuxOvercommitMemoryWarning();
5173 #endif
5174 if (rdbLoad(server.dbfilename) == REDIS_OK)
5175 redisLog(REDIS_NOTICE,"DB loaded from disk");
5176 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5177 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5178 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5179 aeMain(server.el);
5180 aeDeleteEventLoop(server.el);
5181 return 0;
5182 }