]> git.saurik.com Git - redis.git/blob - redis.c
ffa416972bdd9b860c3b736b2bc4fe14e7f05247
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
71 /* Error codes */
72 #define REDIS_OK 0
73 #define REDIS_ERR -1
74
75 /* Static server configuration */
76 #define REDIS_SERVERPORT 6379 /* TCP port */
77 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
78 #define REDIS_IOBUF_LEN 1024
79 #define REDIS_LOADBUF_LEN 1024
80 #define REDIS_STATIC_ARGS 4
81 #define REDIS_DEFAULT_DBNUM 16
82 #define REDIS_CONFIGLINE_MAX 1024
83 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
84 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
85 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
86 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
87 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
88
89 /* Hash table parameters */
90 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
91
92 /* Command flags */
93 #define REDIS_CMD_BULK 1 /* Bulk write command */
94 #define REDIS_CMD_INLINE 2 /* Inline command */
95 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
96 this flag will return an error when the 'maxmemory' option is set in the
97 config file and the server is using more than maxmemory bytes of memory.
98 In short these commands are denied on low memory conditions. */
99 #define REDIS_CMD_DENYOOM 4
100
101 /* Object types */
102 #define REDIS_STRING 0
103 #define REDIS_LIST 1
104 #define REDIS_SET 2
105 #define REDIS_ZSET 3
106 #define REDIS_HASH 4
107
108 /* Objects encoding */
109 #define REDIS_ENCODING_RAW 0 /* Raw representation */
110 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
111
112 /* Object types only used for dumping to disk */
113 #define REDIS_EXPIRETIME 253
114 #define REDIS_SELECTDB 254
115 #define REDIS_EOF 255
116
117 /* Defines related to the dump file format. Storing 32 bit lengths for short
118 * keys requires a lot of space, so we check the most significant 2 bits of
119 * the first byte to interpret the length:
120 *
121 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
122 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
123 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
124 * 11|000000 this means: specially encoded object will follow. The six bits
125 * number specify the kind of object that follows.
126 * See the REDIS_RDB_ENC_* defines.
127 *
128 * Lengths up to 63 are stored using a single byte, most DB keys, and many
129 * values, will fit inside. */
130 #define REDIS_RDB_6BITLEN 0
131 #define REDIS_RDB_14BITLEN 1
132 #define REDIS_RDB_32BITLEN 2
133 #define REDIS_RDB_ENCVAL 3
134 #define REDIS_RDB_LENERR UINT_MAX
135
136 /* When a length of a string object stored on disk has the first two bits
137 * set, the remaining six bits specify a special encoding for the object
138 * accordingly to the following defines: */
139 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
140 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
141 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
142 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
143
144 /* Client flags */
145 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
146 #define REDIS_SLAVE 2 /* This client is a slave server */
147 #define REDIS_MASTER 4 /* This client is a master server */
148 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
149
150 /* Slave replication state - slave side */
151 #define REDIS_REPL_NONE 0 /* No active replication */
152 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
153 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
154
155 /* Slave replication state - from the point of view of master
156 * Note that in SEND_BULK and ONLINE state the slave receives new updates
157 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
158 * to start the next background saving in order to send updates to it. */
159 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
160 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
161 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
162 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
163
164 /* List related stuff */
165 #define REDIS_HEAD 0
166 #define REDIS_TAIL 1
167
168 /* Sort operations */
169 #define REDIS_SORT_GET 0
170 #define REDIS_SORT_DEL 1
171 #define REDIS_SORT_INCR 2
172 #define REDIS_SORT_DECR 3
173 #define REDIS_SORT_ASC 4
174 #define REDIS_SORT_DESC 5
175 #define REDIS_SORTKEY_MAX 1024
176
177 /* Log levels */
178 #define REDIS_DEBUG 0
179 #define REDIS_NOTICE 1
180 #define REDIS_WARNING 2
181
182 /* Anti-warning macro... */
183 #define REDIS_NOTUSED(V) ((void) V)
184
185 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
186 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
187
188 /*================================= Data types ============================== */
189
190 /* A redis object: a typed value container. 'type' holds one of the
 * REDIS_STRING / REDIS_LIST / REDIS_SET / REDIS_ZSET / REDIS_HASH constants
 * and 'encoding' one of the REDIS_ENCODING_* constants defined above. */
191 typedef struct redisObject {
192 void *ptr; /* payload; the dict callbacks below treat it as an sds for raw strings */
193 unsigned char type; /* object type, see the "Object types" defines */
194 unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
195 unsigned char notused[2]; /* NOTE(review): looks like reserved/padding bytes -- confirm */
196 int refcount; /* reference count; presumably driven by incrRefCount()/decrRefCount() */
197 } robj;
198
/* One database: the main key table plus the table of keys that have a
 * timeout associated. */
199 typedef struct redisDb {
200 dict *dict; /* key table; serverCron reports its size as the number of keys */
201 dict *expires; /* volatile keys; serverCron reads each entry value as a time_t deadline */
202 int id; /* NOTE(review): presumably the index of this DB in server.db -- confirm */
203 } redisDb;
204
205 /* With multiplexing we need to keep per-client state.
206 * Clients are kept in a linked list (server.clients). */
207 typedef struct redisClient {
208 int fd;
209 redisDb *db;
210 int dictid;
211 sds querybuf;
212 robj **argv, **mbargv;
213 int argc, mbargc;
214 int bulklen; /* bulk read len. -1 if not in bulk read mode */
215 int multibulk; /* multi bulk command format active */
216 list *reply;
217 int sentlen;
218 time_t lastinteraction; /* time of the last interaction, used for timeout */
219 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
220 int slaveseldb; /* slave selected db, if this client is a slave */
221 int authenticated; /* when requirepass is non-NULL */
222 int replstate; /* replication state if this is a slave */
223 int repldbfd; /* replication DB file descriptor */
224 long repldboff; /* replication DB file offset */
225 off_t repldbsize; /* replication DB file size */
226 } redisClient;
227
/* One automatic-save rule. serverCron starts a background save when at least
 * 'changes' modifications are pending (server.dirty) and more than 'seconds'
 * seconds elapsed since server.lastsave. */
228 struct saveparam {
229 time_t seconds;
230 int changes;
231 };
232
233 /* Global server state structure */
234 struct redisServer {
235 int port;
236 int fd;
237 redisDb *db; /* array of server.dbnum databases */
238 dict *sharingpool;
239 unsigned int sharingpoolsize;
240 long long dirty; /* changes to DB from the last save */
241 list *clients;
242 list *slaves, *monitors;
243 char neterr[ANET_ERR_LEN];
244 aeEventLoop *el;
245 int cronloops; /* number of times the cron function has run */
246 list *objfreelist; /* A list of freed objects to avoid malloc() */
247 time_t lastsave; /* Unix time of last successful save */
248 size_t usedmemory; /* Used memory in bytes, refreshed by serverCron */
249 /* Fields used only for stats */
250 time_t stat_starttime; /* server start time */
251 long long stat_numcommands; /* number of processed commands */
252 long long stat_numconnections; /* number of connections received */
253 /* Configuration */
254 int verbosity; /* minimum log level emitted by redisLog() */
255 int glueoutputbuf;
256 int maxidletime; /* idle client timeout in seconds; 0 disables the check in serverCron */
257 int dbnum;
258 int daemonize;
259 char *pidfile;
260 int bgsaveinprogress; /* non-zero while a background save child is running */
261 pid_t bgsavechildpid;
262 struct saveparam *saveparams;
263 int saveparamslen;
264 char *logfile; /* log file path, or NULL to log to stdout */
265 char *bindaddr;
266 char *dbfilename;
267 char *requirepass;
268 int shareobjects;
269 /* Replication related */
270 int isslave;
271 char *masterhost;
272 int masterport;
273 redisClient *master; /* client that is master for this slave */
274 int replstate;
275 unsigned int maxclients;
276 unsigned long maxmemory;
277 /* Sort parameters - qsort_r() is only available under BSD so we
278 * have to take this state global, in order to pass it to sortCompare() */
279 int sort_desc;
280 int sort_alpha;
281 int sort_bypattern;
282 };
283
/* Signature of a command implementation: it receives the calling client. */
284 typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (see cmdTable). */
285 struct redisCommand {
286 char *name; /* command name as written in cmdTable, lowercase */
287 redisCommandProc *proc; /* function implementing the command */
288 int arity; /* NOTE(review): negative values in cmdTable presumably mean "at least N args" -- confirm */
289 int flags; /* bitwise OR of REDIS_CMD_BULK / REDIS_CMD_INLINE / REDIS_CMD_DENYOOM */
290 };
291
/* A (name, address) pair describing a function symbol.
 * NOTE(review): presumably used to resolve addresses in crash backtraces --
 * the user of this struct is not visible here, confirm. */
292 struct redisFunctionSym {
293 char *name;
294 unsigned long pointer;
295 };
296
/* An element being sorted, paired with the value it is ordered by: either a
 * numeric score or a comparison object.
 * NOTE(review): which union member is active presumably depends on the
 * server.sort_alpha / sort_bypattern settings -- confirm in sortCommand. */
297 typedef struct _redisSortObject {
298 robj *obj;
299 union {
300 double score;
301 robj *cmpobj;
302 } u;
303 } redisSortObject;
304
/* An operation attached to a SORT call together with its key pattern.
 * NOTE(review): 'type' is presumably one of the REDIS_SORT_* constants -- confirm. */
305 typedef struct _redisSortOperation {
306 int type;
307 robj *pattern;
308 } redisSortOperation;
309
310 /* ZSETs use a specialized version of Skiplists */
311
/* A skiplist node: an object with its score, an array of forward pointers
 * and a single backward pointer. */
312 typedef struct zskiplistNode {
313 struct zskiplistNode **forward; /* array of next-node pointers (presumably one per level) */
314 struct zskiplistNode *backward; /* previous node */
315 double score;
316 robj *obj;
317 } zskiplistNode;
318
/* Skiplist handle: end points plus bookkeeping. */
319 typedef struct zskiplist {
320 struct zskiplistNode *header, *tail;
321 long length; /* presumably the number of elements -- confirm in zslInsert */
322 int level; /* presumably the highest level in use, bounded by ZSKIPLIST_MAXLEVEL */
323 } zskiplist;
324
/* A sorted set: a hash table and a skiplist over the same elements. */
325 typedef struct zset {
326 dict *dict;
327 zskiplist *zsl;
328 } zset;
329
330 /* Our shared "common" objects */
331
/* Pre-built objects for frequently used replies and protocol fragments;
 * they are filled in once by createSharedObjects(). */
332 struct sharedObjectsStruct {
333 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
334 *colon, *nullbulk, *nullmultibulk,
335 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
336 *outofrangeerr, *plus,
337 *select0, *select1, *select2, *select3, *select4,
338 *select5, *select6, *select7, *select8, *select9;
339 } shared;
340
341 /*================================ Prototypes =============================== */
342
343 static void freeStringObject(robj *o);
344 static void freeListObject(robj *o);
345 static void freeSetObject(robj *o);
346 static void decrRefCount(void *o);
347 static robj *createObject(int type, void *ptr);
348 static void freeClient(redisClient *c);
349 static int rdbLoad(char *filename);
350 static void addReply(redisClient *c, robj *obj);
351 static void addReplySds(redisClient *c, sds s);
352 static void incrRefCount(robj *o);
353 static int rdbSaveBackground(char *filename);
354 static robj *createStringObject(char *ptr, size_t len);
355 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
356 static int syncWithMaster(void);
357 static robj *tryObjectSharing(robj *o);
358 static int tryObjectEncoding(robj *o);
359 static robj *getDecodedObject(const robj *o);
360 static int removeExpire(redisDb *db, robj *key);
361 static int expireIfNeeded(redisDb *db, robj *key);
362 static int deleteIfVolatile(redisDb *db, robj *key);
363 static int deleteKey(redisDb *db, robj *key);
364 static time_t getExpire(redisDb *db, robj *key);
365 static int setExpire(redisDb *db, robj *key, time_t when);
366 static void updateSlavesWaitingBgsave(int bgsaveerr);
367 static void freeMemoryIfNeeded(void);
368 static int processCommand(redisClient *c);
369 static void setupSigSegvAction(void);
370 static void rdbRemoveTempFile(pid_t childpid);
371 static size_t stringObjectLen(robj *o);
372 static void processInputBuffer(redisClient *c);
373 static zskiplist *zslCreate(void);
374 static void zslFree(zskiplist *zsl);
375
376 static void authCommand(redisClient *c);
377 static void pingCommand(redisClient *c);
378 static void echoCommand(redisClient *c);
379 static void setCommand(redisClient *c);
380 static void setnxCommand(redisClient *c);
381 static void getCommand(redisClient *c);
382 static void delCommand(redisClient *c);
383 static void existsCommand(redisClient *c);
384 static void incrCommand(redisClient *c);
385 static void decrCommand(redisClient *c);
386 static void incrbyCommand(redisClient *c);
387 static void decrbyCommand(redisClient *c);
388 static void selectCommand(redisClient *c);
389 static void randomkeyCommand(redisClient *c);
390 static void keysCommand(redisClient *c);
391 static void dbsizeCommand(redisClient *c);
392 static void lastsaveCommand(redisClient *c);
393 static void saveCommand(redisClient *c);
394 static void bgsaveCommand(redisClient *c);
395 static void shutdownCommand(redisClient *c);
396 static void moveCommand(redisClient *c);
397 static void renameCommand(redisClient *c);
398 static void renamenxCommand(redisClient *c);
399 static void lpushCommand(redisClient *c);
400 static void rpushCommand(redisClient *c);
401 static void lpopCommand(redisClient *c);
402 static void rpopCommand(redisClient *c);
403 static void llenCommand(redisClient *c);
404 static void lindexCommand(redisClient *c);
405 static void lrangeCommand(redisClient *c);
406 static void ltrimCommand(redisClient *c);
407 static void typeCommand(redisClient *c);
408 static void lsetCommand(redisClient *c);
409 static void saddCommand(redisClient *c);
410 static void sremCommand(redisClient *c);
411 static void smoveCommand(redisClient *c);
412 static void sismemberCommand(redisClient *c);
413 static void scardCommand(redisClient *c);
414 static void spopCommand(redisClient *c);
415 static void srandmemberCommand(redisClient *c);
416 static void sinterCommand(redisClient *c);
417 static void sinterstoreCommand(redisClient *c);
418 static void sunionCommand(redisClient *c);
419 static void sunionstoreCommand(redisClient *c);
420 static void sdiffCommand(redisClient *c);
421 static void sdiffstoreCommand(redisClient *c);
422 static void syncCommand(redisClient *c);
423 static void flushdbCommand(redisClient *c);
424 static void flushallCommand(redisClient *c);
425 static void sortCommand(redisClient *c);
426 static void lremCommand(redisClient *c);
427 static void infoCommand(redisClient *c);
428 static void mgetCommand(redisClient *c);
429 static void monitorCommand(redisClient *c);
430 static void expireCommand(redisClient *c);
431 static void getsetCommand(redisClient *c);
432 static void ttlCommand(redisClient *c);
433 static void slaveofCommand(redisClient *c);
434 static void debugCommand(redisClient *c);
435 static void msetCommand(redisClient *c);
436 static void msetnxCommand(redisClient *c);
437 static void zaddCommand(redisClient *c);
438 static void zrangeCommand(redisClient *c);
439 static void zrevrangeCommand(redisClient *c);
440 static void zlenCommand(redisClient *c);
441 static void zremCommand(redisClient *c);
442
443 /*================================= Globals ================================= */
444
445 /* Global vars */
446 static struct redisServer server; /* server global state */
/* Table of all the supported commands. Each row is
 * {name, implementation, arity, flags} (see struct redisCommand) and the
 * table is terminated by an all-NULL/zero sentinel row.
 * Note that "smembers" is served by sinterCommand. */
447 static struct redisCommand cmdTable[] = {
448 {"get",getCommand,2,REDIS_CMD_INLINE},
449 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
450 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
451 {"del",delCommand,-2,REDIS_CMD_INLINE},
452 {"exists",existsCommand,2,REDIS_CMD_INLINE},
453 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
454 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
455 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
456 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
457 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
458 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
459 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
460 {"llen",llenCommand,2,REDIS_CMD_INLINE},
461 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
462 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
463 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
464 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
465 {"lrem",lremCommand,4,REDIS_CMD_BULK},
466 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
467 {"srem",sremCommand,3,REDIS_CMD_BULK},
468 {"smove",smoveCommand,4,REDIS_CMD_BULK},
469 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
470 {"scard",scardCommand,2,REDIS_CMD_INLINE},
471 {"spop",spopCommand,2,REDIS_CMD_INLINE},
472 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
473 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
474 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
475 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
476 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
477 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
478 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
479 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
480 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
481 {"zrem",zremCommand,3,REDIS_CMD_BULK},
482 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
483 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
484 {"zlen",zlenCommand,2,REDIS_CMD_INLINE},
485 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
486 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
487 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
488 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
489 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
490 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
491 {"select",selectCommand,2,REDIS_CMD_INLINE},
492 {"move",moveCommand,3,REDIS_CMD_INLINE},
493 {"rename",renameCommand,3,REDIS_CMD_INLINE},
494 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
495 {"expire",expireCommand,3,REDIS_CMD_INLINE},
496 {"keys",keysCommand,2,REDIS_CMD_INLINE},
497 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
498 {"auth",authCommand,2,REDIS_CMD_INLINE},
499 {"ping",pingCommand,1,REDIS_CMD_INLINE},
500 {"echo",echoCommand,2,REDIS_CMD_BULK},
501 {"save",saveCommand,1,REDIS_CMD_INLINE},
502 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
503 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
504 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
505 {"type",typeCommand,2,REDIS_CMD_INLINE},
506 {"sync",syncCommand,1,REDIS_CMD_INLINE},
507 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
508 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
509 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
510 {"info",infoCommand,1,REDIS_CMD_INLINE},
511 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
512 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
513 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
514 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
515 {NULL,NULL,0,0}
516 };
517 /*============================ Utility functions ============================ */
518
519 /* Glob-style pattern matching. */
/* Glob-style pattern matching over length-delimited buffers.
 *
 * Supported syntax: '*' (any run of bytes, possibly empty), '?' (any single
 * byte), '[...]' (byte class, with '^' negation, 'a-z' ranges and '\'
 * escapes) and '\x' (literal x).
 *
 * pattern/patternLen: the glob pattern and its length in bytes.
 * string/stringLen:   the subject and its length in bytes.
 * nocase:             when non-zero, literals and classes match
 *                     case-insensitively.
 *
 * Returns 1 on match, 0 otherwise. Neither buffer needs to be NUL
 * terminated: no byte at or beyond the given lengths is ever read. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse consecutive stars, staying inside the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing star matches whatever is left */
            /* Try to match the rest of the pattern at every position. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no byte left to consume */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one subject byte. */
            if (stringLen <= 0)
                return 0;
            pattern++;
            patternLen--;
            not = (patternLen > 0 && pattern[0] == '^');
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back one byte so that the
                     * shared "pattern++" after the switch lands exactly on
                     * the end of the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped byte inside the class: compared verbatim. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Range: compare as unsigned bytes so that the result
                     * does not depend on the signedness of plain char and
                     * tolower() never receives a negative value. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* A trailing backslash stands for itself. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs a subject byte to compare against. */
            if (stringLen <= 0)
                return 0;
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Subject exhausted: only trailing stars may remain. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
641
642 static void redisLog(int level, const char *fmt, ...) {
643 va_list ap;
644 FILE *fp;
645
646 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
647 if (!fp) return;
648
649 va_start(ap, fmt);
650 if (level >= server.verbosity) {
651 char *c = ".-*";
652 char buf[64];
653 time_t now;
654
655 now = time(NULL);
656 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
657 fprintf(fp,"%s %c ",buf,c[level]);
658 vfprintf(fp, fmt, ap);
659 fprintf(fp,"\n");
660 fflush(fp);
661 }
662 va_end(ap);
663
664 if (server.logfile) fclose(fp);
665 }
666
667 /*====================== Hash table type implementation ==================== */
668
669 /* This is a hash table type that uses the SDS dynamic strings library as
670 * keys and redis objects as values (objects can hold SDS strings,
671 * lists, sets). */
672
/* Dict value destructor for values that are plain zmalloc'ed blocks:
 * releases 'val' with zfree(). The private data is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    void *block = val;

    DICT_NOTUSED(privdata);
    zfree(block);
}
678
679 static int sdsDictKeyCompare(void *privdata, const void *key1,
680 const void *key2)
681 {
682 int l1,l2;
683 DICT_NOTUSED(privdata);
684
685 l1 = sdslen((sds)key1);
686 l2 = sdslen((sds)key2);
687 if (l1 != l2) return 0;
688 return memcmp(key1, key2, l1) == 0;
689 }
690
/* Dict destructor for redis objects: drops one reference from 'val'
 * through decrRefCount(). The private data is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    void *obj = val;

    DICT_NOTUSED(privdata);
    decrRefCount(obj);
}
697
698 static int dictObjKeyCompare(void *privdata, const void *key1,
699 const void *key2)
700 {
701 const robj *o1 = key1, *o2 = key2;
702 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
703 }
704
705 static unsigned int dictObjHash(const void *key) {
706 const robj *o = key;
707 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
708 }
709
710 static int dictEncObjKeyCompare(void *privdata, const void *key1,
711 const void *key2)
712 {
713 const robj *o1 = key1, *o2 = key2;
714
715 if (o1->encoding == REDIS_ENCODING_RAW &&
716 o2->encoding == REDIS_ENCODING_RAW)
717 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
718 else {
719 robj *dec1, *dec2;
720 int cmp;
721
722 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
723 getDecodedObject(o1) : (robj*)o1;
724 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
725 getDecodedObject(o2) : (robj*)o2;
726 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
727 if (dec1 != o1) decrRefCount(dec1);
728 if (dec2 != o2) decrRefCount(dec2);
729 return cmp;
730 }
731 }
732
733 static unsigned int dictEncObjHash(const void *key) {
734 const robj *o = key;
735
736 if (o->encoding == REDIS_ENCODING_RAW)
737 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
738 else {
739 robj *dec = getDecodedObject(o);
740 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
741 decrRefCount(dec);
742 return hash;
743 }
744 }
745
/* Dict type with robj keys that may be encoded (hashed and compared on
 * their decoded form) and no value destructor. Keys are released with
 * decrRefCount(). NOTE(review): the name suggests it backs SET objects --
 * the user is not visible here, confirm. */
746 static dictType setDictType = {
747 dictEncObjHash, /* hash function */
748 NULL, /* key dup */
749 NULL, /* val dup */
750 dictEncObjKeyCompare, /* key compare */
751 dictRedisObjectDestructor, /* key destructor */
752 NULL /* val destructor */
753 };
754
/* Same key handling as setDictType, but values are plain zmalloc'ed blocks
 * released with zfree() (dictVanillaFree). NOTE(review): presumably the
 * value is the element score of a sorted set -- confirm in zaddCommand. */
755 static dictType zsetDictType = {
756 dictEncObjHash, /* hash function */
757 NULL, /* key dup */
758 NULL, /* val dup */
759 dictEncObjKeyCompare, /* key compare */
760 dictRedisObjectDestructor, /* key destructor */
761 dictVanillaFree /* val destructor */
762 };
763
/* Dict type whose keys are robj wrapping sds strings (hashed and compared
 * on the raw string, no decoding) and whose values are robj as well; both
 * are released with decrRefCount(). */
764 static dictType hashDictType = {
765 dictObjHash, /* hash function */
766 NULL, /* key dup */
767 NULL, /* val dup */
768 dictObjKeyCompare, /* key compare */
769 dictRedisObjectDestructor, /* key destructor */
770 dictRedisObjectDestructor /* val destructor */
771 };
772
773 /* ========================= Random utility functions ======================= */
774
775 /* Redis generally does not try to recover from out of memory conditions
776 * when allocating objects or strings, it is not clear if it will be possible
777 * to report this condition to the client since the networking layer itself
778 * is based on heap allocation for send buffers, so we simply abort.
779 * At least the code will be simpler to read... */
/* Report an out of memory condition on stderr, prefixed by 'msg', then
 * terminate the process with abort(). This function does not return. */
780 static void oom(const char *msg) {
781 fprintf(stderr, "%s: Out of memory\n",msg);
782 fflush(stderr);
783 sleep(1); /* NOTE(review): presumably gives the message time to reach its destination -- confirm */
784 abort();
785 }
786
787 /* ====================== Redis server networking stuff ===================== */
788 static void closeTimedoutClients(void) {
789 redisClient *c;
790 listNode *ln;
791 time_t now = time(NULL);
792
793 listRewind(server.clients);
794 while ((ln = listYield(server.clients)) != NULL) {
795 c = listNodeValue(ln);
796 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
797 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
798 (now - c->lastinteraction > server.maxidletime)) {
799 redisLog(REDIS_DEBUG,"Closing idle client");
800 freeClient(c);
801 }
802 }
803 }
804
805 static int htNeedsResize(dict *dict) {
806 long long size, used;
807
808 size = dictSlots(dict);
809 used = dictSize(dict);
810 return (size && used && size > DICT_HT_INITIAL_SIZE &&
811 (used*100/size < REDIS_HT_MINFILL));
812 }
813
814 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
815 * we resize the hash table to save memory */
816 static void tryResizeHashTables(void) {
817 int j;
818
819 for (j = 0; j < server.dbnum; j++) {
820 if (htNeedsResize(server.db[j].dict)) {
821 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
822 dictResize(server.db[j].dict);
823 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
824 }
825 if (htNeedsResize(server.db[j].expires))
826 dictResize(server.db[j].expires);
827 }
828 }
829
830 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
831 int j, loops = server.cronloops++;
832 REDIS_NOTUSED(eventLoop);
833 REDIS_NOTUSED(id);
834 REDIS_NOTUSED(clientData);
835
836 /* Update the global state with the amount of used memory */
837 server.usedmemory = zmalloc_used_memory();
838
839 /* Show some info about non-empty databases */
840 for (j = 0; j < server.dbnum; j++) {
841 long long size, used, vkeys;
842
843 size = dictSlots(server.db[j].dict);
844 used = dictSize(server.db[j].dict);
845 vkeys = dictSize(server.db[j].expires);
846 if (!(loops % 5) && (used || vkeys)) {
847 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
848 /* dictPrintStats(server.dict); */
849 }
850 }
851
852 /* We don't want to resize the hash tables while a bacground saving
853 * is in progress: the saving child is created using fork() that is
854 * implemented with a copy-on-write semantic in most modern systems, so
855 * if we resize the HT while there is the saving child at work actually
856 * a lot of memory movements in the parent will cause a lot of pages
857 * copied. */
858 if (!server.bgsaveinprogress) tryResizeHashTables();
859
860 /* Show information about connected clients */
861 if (!(loops % 5)) {
862 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
863 listLength(server.clients)-listLength(server.slaves),
864 listLength(server.slaves),
865 server.usedmemory,
866 dictSize(server.sharingpool));
867 }
868
869 /* Close connections of timedout clients */
870 if (server.maxidletime && !(loops % 10))
871 closeTimedoutClients();
872
873 /* Check if a background saving in progress terminated */
874 if (server.bgsaveinprogress) {
875 int statloc;
876 if (wait4(-1,&statloc,WNOHANG,NULL)) {
877 int exitcode = WEXITSTATUS(statloc);
878 int bysignal = WIFSIGNALED(statloc);
879
880 if (!bysignal && exitcode == 0) {
881 redisLog(REDIS_NOTICE,
882 "Background saving terminated with success");
883 server.dirty = 0;
884 server.lastsave = time(NULL);
885 } else if (!bysignal && exitcode != 0) {
886 redisLog(REDIS_WARNING, "Background saving error");
887 } else {
888 redisLog(REDIS_WARNING,
889 "Background saving terminated by signal");
890 rdbRemoveTempFile(server.bgsavechildpid);
891 }
892 server.bgsaveinprogress = 0;
893 server.bgsavechildpid = -1;
894 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
895 }
896 } else {
897 /* If there is not a background saving in progress check if
898 * we have to save now */
899 time_t now = time(NULL);
900 for (j = 0; j < server.saveparamslen; j++) {
901 struct saveparam *sp = server.saveparams+j;
902
903 if (server.dirty >= sp->changes &&
904 now-server.lastsave > sp->seconds) {
905 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
906 sp->changes, sp->seconds);
907 rdbSaveBackground(server.dbfilename);
908 break;
909 }
910 }
911 }
912
913 /* Try to expire a few timed out keys */
914 for (j = 0; j < server.dbnum; j++) {
915 redisDb *db = server.db+j;
916 int num = dictSize(db->expires);
917
918 if (num) {
919 time_t now = time(NULL);
920
921 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
922 num = REDIS_EXPIRELOOKUPS_PER_CRON;
923 while (num--) {
924 dictEntry *de;
925 time_t t;
926
927 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
928 t = (time_t) dictGetEntryVal(de);
929 if (now > t) {
930 deleteKey(db,dictGetEntryKey(de));
931 }
932 }
933 }
934 }
935
936 /* Check if we should connect to a MASTER */
937 if (server.replstate == REDIS_REPL_CONNECT) {
938 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
939 if (syncWithMaster() == REDIS_OK) {
940 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
941 }
942 }
943 return 1000;
944 }
945
946 static void createSharedObjects(void) {
947 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
948 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
949 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
950 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
951 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
952 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
953 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
954 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
955 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
956 /* no such key */
957 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
958 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
959 "-ERR Operation against a key holding the wrong kind of value\r\n"));
960 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
961 "-ERR no such key\r\n"));
962 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
963 "-ERR syntax error\r\n"));
964 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
965 "-ERR source and destination objects are the same\r\n"));
966 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
967 "-ERR index out of range\r\n"));
968 shared.space = createObject(REDIS_STRING,sdsnew(" "));
969 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
970 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
971 shared.select0 = createStringObject("select 0\r\n",10);
972 shared.select1 = createStringObject("select 1\r\n",10);
973 shared.select2 = createStringObject("select 2\r\n",10);
974 shared.select3 = createStringObject("select 3\r\n",10);
975 shared.select4 = createStringObject("select 4\r\n",10);
976 shared.select5 = createStringObject("select 5\r\n",10);
977 shared.select6 = createStringObject("select 6\r\n",10);
978 shared.select7 = createStringObject("select 7\r\n",10);
979 shared.select8 = createStringObject("select 8\r\n",10);
980 shared.select9 = createStringObject("select 9\r\n",10);
981 }
982
983 static void appendServerSaveParams(time_t seconds, int changes) {
984 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
985 server.saveparams[server.saveparamslen].seconds = seconds;
986 server.saveparams[server.saveparamslen].changes = changes;
987 server.saveparamslen++;
988 }
989
990 static void ResetServerSaveParams() {
991 zfree(server.saveparams);
992 server.saveparams = NULL;
993 server.saveparamslen = 0;
994 }
995
996 static void initServerConfig() {
997 server.dbnum = REDIS_DEFAULT_DBNUM;
998 server.port = REDIS_SERVERPORT;
999 server.verbosity = REDIS_DEBUG;
1000 server.maxidletime = REDIS_MAXIDLETIME;
1001 server.saveparams = NULL;
1002 server.logfile = NULL; /* NULL = log on standard output */
1003 server.bindaddr = NULL;
1004 server.glueoutputbuf = 1;
1005 server.daemonize = 0;
1006 server.pidfile = "/var/run/redis.pid";
1007 server.dbfilename = "dump.rdb";
1008 server.requirepass = NULL;
1009 server.shareobjects = 0;
1010 server.sharingpoolsize = 1024;
1011 server.maxclients = 0;
1012 server.maxmemory = 0;
1013 ResetServerSaveParams();
1014
1015 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1016 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1017 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1018 /* Replication related */
1019 server.isslave = 0;
1020 server.masterhost = NULL;
1021 server.masterport = 6379;
1022 server.master = NULL;
1023 server.replstate = REDIS_REPL_NONE;
1024 }
1025
1026 static void initServer() {
1027 int j;
1028
1029 signal(SIGHUP, SIG_IGN);
1030 signal(SIGPIPE, SIG_IGN);
1031 setupSigSegvAction();
1032
1033 server.clients = listCreate();
1034 server.slaves = listCreate();
1035 server.monitors = listCreate();
1036 server.objfreelist = listCreate();
1037 createSharedObjects();
1038 server.el = aeCreateEventLoop();
1039 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1040 server.sharingpool = dictCreate(&setDictType,NULL);
1041 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1042 if (server.fd == -1) {
1043 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1044 exit(1);
1045 }
1046 for (j = 0; j < server.dbnum; j++) {
1047 server.db[j].dict = dictCreate(&hashDictType,NULL);
1048 server.db[j].expires = dictCreate(&setDictType,NULL);
1049 server.db[j].id = j;
1050 }
1051 server.cronloops = 0;
1052 server.bgsaveinprogress = 0;
1053 server.bgsavechildpid = -1;
1054 server.lastsave = time(NULL);
1055 server.dirty = 0;
1056 server.usedmemory = 0;
1057 server.stat_numcommands = 0;
1058 server.stat_numconnections = 0;
1059 server.stat_starttime = time(NULL);
1060 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1061 }
1062
1063 /* Empty the whole database */
1064 static long long emptyDb() {
1065 int j;
1066 long long removed = 0;
1067
1068 for (j = 0; j < server.dbnum; j++) {
1069 removed += dictSize(server.db[j].dict);
1070 dictEmpty(server.db[j].dict);
1071 dictEmpty(server.db[j].expires);
1072 }
1073 return removed;
1074 }
1075
/* Map a case-insensitive "yes" / "no" string to 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1081
1082 /* I agree, this is a very rudimental way to load a configuration...
1083 will improve later if the config gets more complex */
1084 static void loadServerConfig(char *filename) {
1085 FILE *fp;
1086 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1087 int linenum = 0;
1088 sds line = NULL;
1089
1090 if (filename[0] == '-' && filename[1] == '\0')
1091 fp = stdin;
1092 else {
1093 if ((fp = fopen(filename,"r")) == NULL) {
1094 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1095 exit(1);
1096 }
1097 }
1098
1099 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1100 sds *argv;
1101 int argc, j;
1102
1103 linenum++;
1104 line = sdsnew(buf);
1105 line = sdstrim(line," \t\r\n");
1106
1107 /* Skip comments and blank lines*/
1108 if (line[0] == '#' || line[0] == '\0') {
1109 sdsfree(line);
1110 continue;
1111 }
1112
1113 /* Split into arguments */
1114 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1115 sdstolower(argv[0]);
1116
1117 /* Execute config directives */
1118 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1119 server.maxidletime = atoi(argv[1]);
1120 if (server.maxidletime < 0) {
1121 err = "Invalid timeout value"; goto loaderr;
1122 }
1123 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1124 server.port = atoi(argv[1]);
1125 if (server.port < 1 || server.port > 65535) {
1126 err = "Invalid port"; goto loaderr;
1127 }
1128 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1129 server.bindaddr = zstrdup(argv[1]);
1130 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1131 int seconds = atoi(argv[1]);
1132 int changes = atoi(argv[2]);
1133 if (seconds < 1 || changes < 0) {
1134 err = "Invalid save parameters"; goto loaderr;
1135 }
1136 appendServerSaveParams(seconds,changes);
1137 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1138 if (chdir(argv[1]) == -1) {
1139 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1140 argv[1], strerror(errno));
1141 exit(1);
1142 }
1143 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1144 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1145 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1146 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1147 else {
1148 err = "Invalid log level. Must be one of debug, notice, warning";
1149 goto loaderr;
1150 }
1151 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1152 FILE *logfp;
1153
1154 server.logfile = zstrdup(argv[1]);
1155 if (!strcasecmp(server.logfile,"stdout")) {
1156 zfree(server.logfile);
1157 server.logfile = NULL;
1158 }
1159 if (server.logfile) {
1160 /* Test if we are able to open the file. The server will not
1161 * be able to abort just for this problem later... */
1162 logfp = fopen(server.logfile,"a");
1163 if (logfp == NULL) {
1164 err = sdscatprintf(sdsempty(),
1165 "Can't open the log file: %s", strerror(errno));
1166 goto loaderr;
1167 }
1168 fclose(logfp);
1169 }
1170 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1171 server.dbnum = atoi(argv[1]);
1172 if (server.dbnum < 1) {
1173 err = "Invalid number of databases"; goto loaderr;
1174 }
1175 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1176 server.maxclients = atoi(argv[1]);
1177 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1178 server.maxmemory = strtoll(argv[1], NULL, 10);
1179 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1180 server.masterhost = sdsnew(argv[1]);
1181 server.masterport = atoi(argv[2]);
1182 server.replstate = REDIS_REPL_CONNECT;
1183 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1184 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1185 err = "argument must be 'yes' or 'no'"; goto loaderr;
1186 }
1187 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1188 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1189 err = "argument must be 'yes' or 'no'"; goto loaderr;
1190 }
1191 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1192 server.sharingpoolsize = atoi(argv[1]);
1193 if (server.sharingpoolsize < 1) {
1194 err = "invalid object sharing pool size"; goto loaderr;
1195 }
1196 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1197 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1198 err = "argument must be 'yes' or 'no'"; goto loaderr;
1199 }
1200 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1201 server.requirepass = zstrdup(argv[1]);
1202 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1203 server.pidfile = zstrdup(argv[1]);
1204 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1205 server.dbfilename = zstrdup(argv[1]);
1206 } else {
1207 err = "Bad directive or wrong number of arguments"; goto loaderr;
1208 }
1209 for (j = 0; j < argc; j++)
1210 sdsfree(argv[j]);
1211 zfree(argv);
1212 sdsfree(line);
1213 }
1214 if (fp != stdin) fclose(fp);
1215 return;
1216
1217 loaderr:
1218 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1219 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1220 fprintf(stderr, ">>> '%s'\n", line);
1221 fprintf(stderr, "%s\n", err);
1222 exit(1);
1223 }
1224
1225 static void freeClientArgv(redisClient *c) {
1226 int j;
1227
1228 for (j = 0; j < c->argc; j++)
1229 decrRefCount(c->argv[j]);
1230 for (j = 0; j < c->mbargc; j++)
1231 decrRefCount(c->mbargv[j]);
1232 c->argc = 0;
1233 c->mbargc = 0;
1234 }
1235
1236 static void freeClient(redisClient *c) {
1237 listNode *ln;
1238
1239 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1240 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1241 sdsfree(c->querybuf);
1242 listRelease(c->reply);
1243 freeClientArgv(c);
1244 close(c->fd);
1245 ln = listSearchKey(server.clients,c);
1246 assert(ln != NULL);
1247 listDelNode(server.clients,ln);
1248 if (c->flags & REDIS_SLAVE) {
1249 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1250 close(c->repldbfd);
1251 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1252 ln = listSearchKey(l,c);
1253 assert(ln != NULL);
1254 listDelNode(l,ln);
1255 }
1256 if (c->flags & REDIS_MASTER) {
1257 server.master = NULL;
1258 server.replstate = REDIS_REPL_CONNECT;
1259 }
1260 zfree(c->argv);
1261 zfree(c->mbargv);
1262 zfree(c);
1263 }
1264
1265 static void glueReplyBuffersIfNeeded(redisClient *c) {
1266 int totlen = 0;
1267 listNode *ln;
1268 robj *o;
1269
1270 listRewind(c->reply);
1271 while((ln = listYield(c->reply))) {
1272 o = ln->value;
1273 totlen += sdslen(o->ptr);
1274 /* This optimization makes more sense if we don't have to copy
1275 * too much data */
1276 if (totlen > 1024) return;
1277 }
1278 if (totlen > 0) {
1279 char buf[1024];
1280 int copylen = 0;
1281
1282 listRewind(c->reply);
1283 while((ln = listYield(c->reply))) {
1284 o = ln->value;
1285 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1286 copylen += sdslen(o->ptr);
1287 listDelNode(c->reply,ln);
1288 }
1289 /* Now the output buffer is empty, add the new single element */
1290 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1291 listAddNodeTail(c->reply,o);
1292 }
1293 }
1294
1295 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1296 redisClient *c = privdata;
1297 int nwritten = 0, totwritten = 0, objlen;
1298 robj *o;
1299 REDIS_NOTUSED(el);
1300 REDIS_NOTUSED(mask);
1301
1302 if (server.glueoutputbuf && listLength(c->reply) > 1)
1303 glueReplyBuffersIfNeeded(c);
1304 while(listLength(c->reply)) {
1305 o = listNodeValue(listFirst(c->reply));
1306 objlen = sdslen(o->ptr);
1307
1308 if (objlen == 0) {
1309 listDelNode(c->reply,listFirst(c->reply));
1310 continue;
1311 }
1312
1313 if (c->flags & REDIS_MASTER) {
1314 /* Don't reply to a master */
1315 nwritten = objlen - c->sentlen;
1316 } else {
1317 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1318 if (nwritten <= 0) break;
1319 }
1320 c->sentlen += nwritten;
1321 totwritten += nwritten;
1322 /* If we fully sent the object on head go to the next one */
1323 if (c->sentlen == objlen) {
1324 listDelNode(c->reply,listFirst(c->reply));
1325 c->sentlen = 0;
1326 }
1327 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1328 * bytes, in a single threaded server it's a good idea to server
1329 * other clients as well, even if a very large request comes from
1330 * super fast link that is always able to accept data (in real world
1331 * terms think to 'KEYS *' against the loopback interfae) */
1332 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1333 }
1334 if (nwritten == -1) {
1335 if (errno == EAGAIN) {
1336 nwritten = 0;
1337 } else {
1338 redisLog(REDIS_DEBUG,
1339 "Error writing to client: %s", strerror(errno));
1340 freeClient(c);
1341 return;
1342 }
1343 }
1344 if (totwritten > 0) c->lastinteraction = time(NULL);
1345 if (listLength(c->reply) == 0) {
1346 c->sentlen = 0;
1347 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1348 }
1349 }
1350
1351 static struct redisCommand *lookupCommand(char *name) {
1352 int j = 0;
1353 while(cmdTable[j].name != NULL) {
1354 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1355 j++;
1356 }
1357 return NULL;
1358 }
1359
1360 /* resetClient prepare the client to process the next command */
1361 static void resetClient(redisClient *c) {
1362 freeClientArgv(c);
1363 c->bulklen = -1;
1364 c->multibulk = 0;
1365 }
1366
1367 /* If this function gets called we already read a whole
1368 * command, argments are in the client argv/argc fields.
1369 * processCommand() execute the command or prepare the
1370 * server for a bulk read from the client.
1371 *
1372 * If 1 is returned the client is still alive and valid and
1373 * and other operations can be performed by the caller. Otherwise
1374 * if 0 is returned the client was destroied (i.e. after QUIT). */
1375 static int processCommand(redisClient *c) {
1376 struct redisCommand *cmd;
1377 long long dirty;
1378
1379 /* Free some memory if needed (maxmemory setting) */
1380 if (server.maxmemory) freeMemoryIfNeeded();
1381
1382 /* Handle the multi bulk command type. This is an alternative protocol
1383 * supported by Redis in order to receive commands that are composed of
1384 * multiple binary-safe "bulk" arguments. The latency of processing is
1385 * a bit higher but this allows things like multi-sets, so if this
1386 * protocol is used only for MSET and similar commands this is a big win. */
1387 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1388 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1389 if (c->multibulk <= 0) {
1390 resetClient(c);
1391 return 1;
1392 } else {
1393 decrRefCount(c->argv[c->argc-1]);
1394 c->argc--;
1395 return 1;
1396 }
1397 } else if (c->multibulk) {
1398 if (c->bulklen == -1) {
1399 if (((char*)c->argv[0]->ptr)[0] != '$') {
1400 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1401 resetClient(c);
1402 return 1;
1403 } else {
1404 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1405 decrRefCount(c->argv[0]);
1406 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1407 c->argc--;
1408 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1409 resetClient(c);
1410 return 1;
1411 }
1412 c->argc--;
1413 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1414 return 1;
1415 }
1416 } else {
1417 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1418 c->mbargv[c->mbargc] = c->argv[0];
1419 c->mbargc++;
1420 c->argc--;
1421 c->multibulk--;
1422 if (c->multibulk == 0) {
1423 robj **auxargv;
1424 int auxargc;
1425
1426 /* Here we need to swap the multi-bulk argc/argv with the
1427 * normal argc/argv of the client structure. */
1428 auxargv = c->argv;
1429 c->argv = c->mbargv;
1430 c->mbargv = auxargv;
1431
1432 auxargc = c->argc;
1433 c->argc = c->mbargc;
1434 c->mbargc = auxargc;
1435
1436 /* We need to set bulklen to something different than -1
1437 * in order for the code below to process the command without
1438 * to try to read the last argument of a bulk command as
1439 * a special argument. */
1440 c->bulklen = 0;
1441 /* continue below and process the command */
1442 } else {
1443 c->bulklen = -1;
1444 return 1;
1445 }
1446 }
1447 }
1448 /* -- end of multi bulk commands processing -- */
1449
1450 /* The QUIT command is handled as a special case. Normal command
1451 * procs are unable to close the client connection safely */
1452 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1453 freeClient(c);
1454 return 0;
1455 }
1456 cmd = lookupCommand(c->argv[0]->ptr);
1457 if (!cmd) {
1458 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1459 resetClient(c);
1460 return 1;
1461 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1462 (c->argc < -cmd->arity)) {
1463 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1464 resetClient(c);
1465 return 1;
1466 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1467 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1468 resetClient(c);
1469 return 1;
1470 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1471 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1472
1473 decrRefCount(c->argv[c->argc-1]);
1474 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1475 c->argc--;
1476 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1477 resetClient(c);
1478 return 1;
1479 }
1480 c->argc--;
1481 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1482 /* It is possible that the bulk read is already in the
1483 * buffer. Check this condition and handle it accordingly.
1484 * This is just a fast path, alternative to call processInputBuffer().
1485 * It's a good idea since the code is small and this condition
1486 * happens most of the times. */
1487 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1488 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1489 c->argc++;
1490 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1491 } else {
1492 return 1;
1493 }
1494 }
1495 /* Let's try to share objects on the command arguments vector */
1496 if (server.shareobjects) {
1497 int j;
1498 for(j = 1; j < c->argc; j++)
1499 c->argv[j] = tryObjectSharing(c->argv[j]);
1500 }
1501 /* Let's try to encode the bulk object to save space. */
1502 if (cmd->flags & REDIS_CMD_BULK)
1503 tryObjectEncoding(c->argv[c->argc-1]);
1504
1505 /* Check if the user is authenticated */
1506 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1507 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1508 resetClient(c);
1509 return 1;
1510 }
1511
1512 /* Exec the command */
1513 dirty = server.dirty;
1514 cmd->proc(c);
1515 if (server.dirty-dirty != 0 && listLength(server.slaves))
1516 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1517 if (listLength(server.monitors))
1518 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1519 server.stat_numcommands++;
1520
1521 /* Prepare the client for the next command */
1522 if (c->flags & REDIS_CLOSE) {
1523 freeClient(c);
1524 return 0;
1525 }
1526 resetClient(c);
1527 return 1;
1528 }
1529
1530 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1531 listNode *ln;
1532 int outc = 0, j;
1533 robj **outv;
1534 /* (args*2)+1 is enough room for args, spaces, newlines */
1535 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1536
1537 if (argc <= REDIS_STATIC_ARGS) {
1538 outv = static_outv;
1539 } else {
1540 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1541 }
1542
1543 for (j = 0; j < argc; j++) {
1544 if (j != 0) outv[outc++] = shared.space;
1545 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1546 robj *lenobj;
1547
1548 lenobj = createObject(REDIS_STRING,
1549 sdscatprintf(sdsempty(),"%d\r\n",
1550 stringObjectLen(argv[j])));
1551 lenobj->refcount = 0;
1552 outv[outc++] = lenobj;
1553 }
1554 outv[outc++] = argv[j];
1555 }
1556 outv[outc++] = shared.crlf;
1557
1558 /* Increment all the refcounts at start and decrement at end in order to
1559 * be sure to free objects if there is no slave in a replication state
1560 * able to be feed with commands */
1561 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1562 listRewind(slaves);
1563 while((ln = listYield(slaves))) {
1564 redisClient *slave = ln->value;
1565
1566 /* Don't feed slaves that are still waiting for BGSAVE to start */
1567 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1568
1569 /* Feed all the other slaves, MONITORs and so on */
1570 if (slave->slaveseldb != dictid) {
1571 robj *selectcmd;
1572
1573 switch(dictid) {
1574 case 0: selectcmd = shared.select0; break;
1575 case 1: selectcmd = shared.select1; break;
1576 case 2: selectcmd = shared.select2; break;
1577 case 3: selectcmd = shared.select3; break;
1578 case 4: selectcmd = shared.select4; break;
1579 case 5: selectcmd = shared.select5; break;
1580 case 6: selectcmd = shared.select6; break;
1581 case 7: selectcmd = shared.select7; break;
1582 case 8: selectcmd = shared.select8; break;
1583 case 9: selectcmd = shared.select9; break;
1584 default:
1585 selectcmd = createObject(REDIS_STRING,
1586 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1587 selectcmd->refcount = 0;
1588 break;
1589 }
1590 addReply(slave,selectcmd);
1591 slave->slaveseldb = dictid;
1592 }
1593 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1594 }
1595 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1596 if (outv != static_outv) zfree(outv);
1597 }
1598
1599 static void processInputBuffer(redisClient *c) {
1600 again:
1601 if (c->bulklen == -1) {
1602 /* Read the first line of the query */
1603 char *p = strchr(c->querybuf,'\n');
1604 size_t querylen;
1605
1606 if (p) {
1607 sds query, *argv;
1608 int argc, j;
1609
1610 query = c->querybuf;
1611 c->querybuf = sdsempty();
1612 querylen = 1+(p-(query));
1613 if (sdslen(query) > querylen) {
1614 /* leave data after the first line of the query in the buffer */
1615 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1616 }
1617 *p = '\0'; /* remove "\n" */
1618 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1619 sdsupdatelen(query);
1620
1621 /* Now we can split the query in arguments */
1622 if (sdslen(query) == 0) {
1623 /* Ignore empty query */
1624 sdsfree(query);
1625 return;
1626 }
1627 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1628 sdsfree(query);
1629
1630 if (c->argv) zfree(c->argv);
1631 c->argv = zmalloc(sizeof(robj*)*argc);
1632
1633 for (j = 0; j < argc; j++) {
1634 if (sdslen(argv[j])) {
1635 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1636 c->argc++;
1637 } else {
1638 sdsfree(argv[j]);
1639 }
1640 }
1641 zfree(argv);
1642 /* Execute the command. If the client is still valid
1643 * after processCommand() return and there is something
1644 * on the query buffer try to process the next command. */
1645 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1646 return;
1647 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1648 redisLog(REDIS_DEBUG, "Client protocol error");
1649 freeClient(c);
1650 return;
1651 }
1652 } else {
1653 /* Bulk read handling. Note that if we are at this point
1654 the client already sent a command terminated with a newline,
1655 we are reading the bulk data that is actually the last
1656 argument of the command. */
1657 int qbl = sdslen(c->querybuf);
1658
1659 if (c->bulklen <= qbl) {
1660 /* Copy everything but the final CRLF as final argument */
1661 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1662 c->argc++;
1663 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1664 /* Process the command. If the client is still valid after
1665 * the processing and there is more data in the buffer
1666 * try to parse it. */
1667 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1668 return;
1669 }
1670 }
1671 }
1672
1673 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1674 redisClient *c = (redisClient*) privdata;
1675 char buf[REDIS_IOBUF_LEN];
1676 int nread;
1677 REDIS_NOTUSED(el);
1678 REDIS_NOTUSED(mask);
1679
1680 nread = read(fd, buf, REDIS_IOBUF_LEN);
1681 if (nread == -1) {
1682 if (errno == EAGAIN) {
1683 nread = 0;
1684 } else {
1685 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1686 freeClient(c);
1687 return;
1688 }
1689 } else if (nread == 0) {
1690 redisLog(REDIS_DEBUG, "Client closed connection");
1691 freeClient(c);
1692 return;
1693 }
1694 if (nread) {
1695 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1696 c->lastinteraction = time(NULL);
1697 } else {
1698 return;
1699 }
1700 processInputBuffer(c);
1701 }
1702
1703 static int selectDb(redisClient *c, int id) {
1704 if (id < 0 || id >= server.dbnum)
1705 return REDIS_ERR;
1706 c->db = &server.db[id];
1707 return REDIS_OK;
1708 }
1709
1710 static void *dupClientReplyValue(void *o) {
1711 incrRefCount((robj*)o);
1712 return 0;
1713 }
1714
1715 static redisClient *createClient(int fd) {
1716 redisClient *c = zmalloc(sizeof(*c));
1717
1718 anetNonBlock(NULL,fd);
1719 anetTcpNoDelay(NULL,fd);
1720 if (!c) return NULL;
1721 selectDb(c,0);
1722 c->fd = fd;
1723 c->querybuf = sdsempty();
1724 c->argc = 0;
1725 c->argv = NULL;
1726 c->bulklen = -1;
1727 c->multibulk = 0;
1728 c->mbargc = 0;
1729 c->mbargv = NULL;
1730 c->sentlen = 0;
1731 c->flags = 0;
1732 c->lastinteraction = time(NULL);
1733 c->authenticated = 0;
1734 c->replstate = REDIS_REPL_NONE;
1735 c->reply = listCreate();
1736 listSetFreeMethod(c->reply,decrRefCount);
1737 listSetDupMethod(c->reply,dupClientReplyValue);
1738 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1739 readQueryFromClient, c, NULL) == AE_ERR) {
1740 freeClient(c);
1741 return NULL;
1742 }
1743 listAddNodeTail(server.clients,c);
1744 return c;
1745 }
1746
1747 static void addReply(redisClient *c, robj *obj) {
1748 if (listLength(c->reply) == 0 &&
1749 (c->replstate == REDIS_REPL_NONE ||
1750 c->replstate == REDIS_REPL_ONLINE) &&
1751 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1752 sendReplyToClient, c, NULL) == AE_ERR) return;
1753 if (obj->encoding != REDIS_ENCODING_RAW) {
1754 obj = getDecodedObject(obj);
1755 } else {
1756 incrRefCount(obj);
1757 }
1758 listAddNodeTail(c->reply,obj);
1759 }
1760
1761 static void addReplySds(redisClient *c, sds s) {
1762 robj *o = createObject(REDIS_STRING,s);
1763 addReply(c,o);
1764 decrRefCount(o);
1765 }
1766
1767 static void addReplyBulkLen(redisClient *c, robj *obj) {
1768 size_t len;
1769
1770 if (obj->encoding == REDIS_ENCODING_RAW) {
1771 len = sdslen(obj->ptr);
1772 } else {
1773 long n = (long)obj->ptr;
1774
1775 len = 1;
1776 if (n < 0) {
1777 len++;
1778 n = -n;
1779 }
1780 while((n = n/10) != 0) {
1781 len++;
1782 }
1783 }
1784 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1785 }
1786
1787 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1788 int cport, cfd;
1789 char cip[128];
1790 redisClient *c;
1791 REDIS_NOTUSED(el);
1792 REDIS_NOTUSED(mask);
1793 REDIS_NOTUSED(privdata);
1794
1795 cfd = anetAccept(server.neterr, fd, cip, &cport);
1796 if (cfd == AE_ERR) {
1797 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1798 return;
1799 }
1800 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1801 if ((c = createClient(cfd)) == NULL) {
1802 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1803 close(cfd); /* May be already closed, just ingore errors */
1804 return;
1805 }
1806 /* If maxclient directive is set and this is one client more... close the
1807 * connection. Note that we create the client instead to check before
1808 * for this condition, since now the socket is already set in nonblocking
1809 * mode and we can send an error for free using the Kernel I/O */
1810 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1811 char *err = "-ERR max number of clients reached\r\n";
1812
1813 /* That's a best effort error message, don't check write errors */
1814 (void) write(c->fd,err,strlen(err));
1815 freeClient(c);
1816 return;
1817 }
1818 server.stat_numconnections++;
1819 }
1820
1821 /* ======================= Redis objects implementation ===================== */
1822
1823 static robj *createObject(int type, void *ptr) {
1824 robj *o;
1825
1826 if (listLength(server.objfreelist)) {
1827 listNode *head = listFirst(server.objfreelist);
1828 o = listNodeValue(head);
1829 listDelNode(server.objfreelist,head);
1830 } else {
1831 o = zmalloc(sizeof(*o));
1832 }
1833 o->type = type;
1834 o->encoding = REDIS_ENCODING_RAW;
1835 o->ptr = ptr;
1836 o->refcount = 1;
1837 return o;
1838 }
1839
1840 static robj *createStringObject(char *ptr, size_t len) {
1841 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1842 }
1843
1844 static robj *createListObject(void) {
1845 list *l = listCreate();
1846
1847 listSetFreeMethod(l,decrRefCount);
1848 return createObject(REDIS_LIST,l);
1849 }
1850
1851 static robj *createSetObject(void) {
1852 dict *d = dictCreate(&setDictType,NULL);
1853 return createObject(REDIS_SET,d);
1854 }
1855
1856 static robj *createZsetObject(void) {
1857 zset *zs = zmalloc(sizeof(*zs));
1858
1859 zs->dict = dictCreate(&zsetDictType,NULL);
1860 zs->zsl = zslCreate();
1861 return createObject(REDIS_ZSET,zs);
1862 }
1863
1864 static void freeStringObject(robj *o) {
1865 if (o->encoding == REDIS_ENCODING_RAW) {
1866 sdsfree(o->ptr);
1867 }
1868 }
1869
1870 static void freeListObject(robj *o) {
1871 listRelease((list*) o->ptr);
1872 }
1873
1874 static void freeSetObject(robj *o) {
1875 dictRelease((dict*) o->ptr);
1876 }
1877
1878 static void freeZsetObject(robj *o) {
1879 zset *zs = o->ptr;
1880
1881 dictRelease(zs->dict);
1882 zslFree(zs->zsl);
1883 zfree(zs);
1884 }
1885
1886 static void freeHashObject(robj *o) {
1887 dictRelease((dict*) o->ptr);
1888 }
1889
1890 static void incrRefCount(robj *o) {
1891 o->refcount++;
1892 #ifdef DEBUG_REFCOUNT
1893 if (o->type == REDIS_STRING)
1894 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1895 #endif
1896 }
1897
1898 static void decrRefCount(void *obj) {
1899 robj *o = obj;
1900
1901 #ifdef DEBUG_REFCOUNT
1902 if (o->type == REDIS_STRING)
1903 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1904 #endif
1905 if (--(o->refcount) == 0) {
1906 switch(o->type) {
1907 case REDIS_STRING: freeStringObject(o); break;
1908 case REDIS_LIST: freeListObject(o); break;
1909 case REDIS_SET: freeSetObject(o); break;
1910 case REDIS_ZSET: freeZsetObject(o); break;
1911 case REDIS_HASH: freeHashObject(o); break;
1912 default: assert(0 != 0); break;
1913 }
1914 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1915 !listAddNodeHead(server.objfreelist,o))
1916 zfree(o);
1917 }
1918 }
1919
1920 static robj *lookupKey(redisDb *db, robj *key) {
1921 dictEntry *de = dictFind(db->dict,key);
1922 return de ? dictGetEntryVal(de) : NULL;
1923 }
1924
1925 static robj *lookupKeyRead(redisDb *db, robj *key) {
1926 expireIfNeeded(db,key);
1927 return lookupKey(db,key);
1928 }
1929
1930 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1931 deleteIfVolatile(db,key);
1932 return lookupKey(db,key);
1933 }
1934
1935 static int deleteKey(redisDb *db, robj *key) {
1936 int retval;
1937
1938 /* We need to protect key from destruction: after the first dictDelete()
1939 * it may happen that 'key' is no longer valid if we don't increment
1940 * it's count. This may happen when we get the object reference directly
1941 * from the hash table with dictRandomKey() or dict iterators */
1942 incrRefCount(key);
1943 if (dictSize(db->expires)) dictDelete(db->expires,key);
1944 retval = dictDelete(db->dict,key);
1945 decrRefCount(key);
1946
1947 return retval == DICT_OK;
1948 }
1949
1950 /* Try to share an object against the shared objects pool */
1951 static robj *tryObjectSharing(robj *o) {
1952 struct dictEntry *de;
1953 unsigned long c;
1954
1955 if (o == NULL || server.shareobjects == 0) return o;
1956
1957 assert(o->type == REDIS_STRING);
1958 de = dictFind(server.sharingpool,o);
1959 if (de) {
1960 robj *shared = dictGetEntryKey(de);
1961
1962 c = ((unsigned long) dictGetEntryVal(de))+1;
1963 dictGetEntryVal(de) = (void*) c;
1964 incrRefCount(shared);
1965 decrRefCount(o);
1966 return shared;
1967 } else {
1968 /* Here we are using a stream algorihtm: Every time an object is
1969 * shared we increment its count, everytime there is a miss we
1970 * recrement the counter of a random object. If this object reaches
1971 * zero we remove the object and put the current object instead. */
1972 if (dictSize(server.sharingpool) >=
1973 server.sharingpoolsize) {
1974 de = dictGetRandomKey(server.sharingpool);
1975 assert(de != NULL);
1976 c = ((unsigned long) dictGetEntryVal(de))-1;
1977 dictGetEntryVal(de) = (void*) c;
1978 if (c == 0) {
1979 dictDelete(server.sharingpool,de->key);
1980 }
1981 } else {
1982 c = 0; /* If the pool is empty we want to add this object */
1983 }
1984 if (c == 0) {
1985 int retval;
1986
1987 retval = dictAdd(server.sharingpool,o,(void*)1);
1988 assert(retval == DICT_OK);
1989 incrRefCount(o);
1990 }
1991 return o;
1992 }
1993 }
1994
1995 /* Check if the nul-terminated string 's' can be represented by a long
1996 * (that is, is a number that fits into long without any other space or
1997 * character before or after the digits).
1998 *
1999 * If so, the function returns REDIS_OK and *longval is set to the value
2000 * of the number. Otherwise REDIS_ERR is returned */
2001 static int isStringRepresentableAsLong(sds s, long *longval) {
2002 char buf[32], *endptr;
2003 long value;
2004 int slen;
2005
2006 value = strtol(s, &endptr, 10);
2007 if (endptr[0] != '\0') return REDIS_ERR;
2008 slen = snprintf(buf,32,"%ld",value);
2009
2010 /* If the number converted back into a string is not identical
2011 * then it's not possible to encode the string as integer */
2012 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2013 if (longval) *longval = value;
2014 return REDIS_OK;
2015 }
2016
2017 /* Try to encode a string object in order to save space */
2018 static int tryObjectEncoding(robj *o) {
2019 long value;
2020 sds s = o->ptr;
2021
2022 if (o->encoding != REDIS_ENCODING_RAW)
2023 return REDIS_ERR; /* Already encoded */
2024
2025 /* It's not save to encode shared objects: shared objects can be shared
2026 * everywhere in the "object space" of Redis. Encoded objects can only
2027 * appear as "values" (and not, for instance, as keys) */
2028 if (o->refcount > 1) return REDIS_ERR;
2029
2030 /* Currently we try to encode only strings */
2031 assert(o->type == REDIS_STRING);
2032
2033 /* Check if we can represent this string as a long integer */
2034 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2035
2036 /* Ok, this object can be encoded */
2037 o->encoding = REDIS_ENCODING_INT;
2038 sdsfree(o->ptr);
2039 o->ptr = (void*) value;
2040 return REDIS_OK;
2041 }
2042
2043 /* Get a decoded version of an encoded object (returned as a new object) */
2044 static robj *getDecodedObject(const robj *o) {
2045 robj *dec;
2046
2047 assert(o->encoding != REDIS_ENCODING_RAW);
2048 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2049 char buf[32];
2050
2051 snprintf(buf,32,"%ld",(long)o->ptr);
2052 dec = createStringObject(buf,strlen(buf));
2053 return dec;
2054 } else {
2055 assert(1 != 1);
2056 }
2057 }
2058
2059 static int compareStringObjects(robj *a, robj *b) {
2060 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2061
2062 if (a == b) return 0;
2063 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2064 return (long)a->ptr - (long)b->ptr;
2065 } else {
2066 int retval;
2067
2068 incrRefCount(a);
2069 incrRefCount(b);
2070 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2071 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2072 retval = sdscmp(a->ptr,b->ptr);
2073 decrRefCount(a);
2074 decrRefCount(b);
2075 return retval;
2076 }
2077 }
2078
2079 static size_t stringObjectLen(robj *o) {
2080 assert(o->type == REDIS_STRING);
2081 if (o->encoding == REDIS_ENCODING_RAW) {
2082 return sdslen(o->ptr);
2083 } else {
2084 char buf[32];
2085
2086 return snprintf(buf,32,"%ld",(long)o->ptr);
2087 }
2088 }
2089
2090 /*============================ DB saving/loading ============================ */
2091
/* Write the single byte 'type' to 'fp'. Returns 0 on success, -1 if the
 * byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2096
/* Write the time 't' to 'fp' as a 4 byte value: 't' is converted to int32_t
 * and the bytes are written in host memory order. Returns 0 on success, -1
 * on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2102
2103 /* check rdbLoadLen() comments for more info */
2104 static int rdbSaveLen(FILE *fp, uint32_t len) {
2105 unsigned char buf[2];
2106
2107 if (len < (1<<6)) {
2108 /* Save a 6 bit len */
2109 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2110 if (fwrite(buf,1,1,fp) == 0) return -1;
2111 } else if (len < (1<<14)) {
2112 /* Save a 14 bit len */
2113 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2114 buf[1] = len&0xFF;
2115 if (fwrite(buf,2,1,fp) == 0) return -1;
2116 } else {
2117 /* Save a 32 bit len */
2118 buf[0] = (REDIS_RDB_32BITLEN<<6);
2119 if (fwrite(buf,1,1,fp) == 0) return -1;
2120 len = htonl(len);
2121 if (fwrite(&len,4,1,fp) == 0) return -1;
2122 }
2123 return 0;
2124 }
2125
2126 /* String objects in the form "2391" "-100" without any space and with a
2127 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2128 * encoded as integers to save space */
2129 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2130 long long value;
2131 char *endptr, buf[32];
2132
2133 /* Check if it's possible to encode this value as a number */
2134 value = strtoll(s, &endptr, 10);
2135 if (endptr[0] != '\0') return 0;
2136 snprintf(buf,32,"%lld",value);
2137
2138 /* If the number converted back into a string is not identical
2139 * then it's not possible to encode the string as integer */
2140 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2141
2142 /* Finally check if it fits in our ranges */
2143 if (value >= -(1<<7) && value <= (1<<7)-1) {
2144 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2145 enc[1] = value&0xFF;
2146 return 2;
2147 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2148 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2149 enc[1] = value&0xFF;
2150 enc[2] = (value>>8)&0xFF;
2151 return 3;
2152 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2153 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2154 enc[1] = value&0xFF;
2155 enc[2] = (value>>8)&0xFF;
2156 enc[3] = (value>>16)&0xFF;
2157 enc[4] = (value>>24)&0xFF;
2158 return 5;
2159 } else {
2160 return 0;
2161 }
2162 }
2163
2164 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2165 unsigned int comprlen, outlen;
2166 unsigned char byte;
2167 void *out;
2168
2169 /* We require at least four bytes compression for this to be worth it */
2170 outlen = sdslen(obj->ptr)-4;
2171 if (outlen <= 0) return 0;
2172 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2173 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2174 if (comprlen == 0) {
2175 zfree(out);
2176 return 0;
2177 }
2178 /* Data compressed! Let's save it on disk */
2179 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2180 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2181 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2182 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2183 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2184 zfree(out);
2185 return comprlen;
2186
2187 writeerr:
2188 zfree(out);
2189 return -1;
2190 }
2191
2192 /* Save a string objet as [len][data] on disk. If the object is a string
2193 * representation of an integer value we try to safe it in a special form */
2194 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2195 size_t len;
2196 int enclen;
2197
2198 len = sdslen(obj->ptr);
2199
2200 /* Try integer encoding */
2201 if (len <= 11) {
2202 unsigned char buf[5];
2203 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2204 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2205 return 0;
2206 }
2207 }
2208
2209 /* Try LZF compression - under 20 bytes it's unable to compress even
2210 * aaaaaaaaaaaaaaaaaa so skip it */
2211 if (len > 20) {
2212 int retval;
2213
2214 retval = rdbSaveLzfStringObject(fp,obj);
2215 if (retval == -1) return -1;
2216 if (retval > 0) return 0;
2217 /* retval == 0 means data can't be compressed, save the old way */
2218 }
2219
2220 /* Store verbatim */
2221 if (rdbSaveLen(fp,len) == -1) return -1;
2222 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2223 return 0;
2224 }
2225
2226 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2227 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2228 int retval;
2229 robj *dec;
2230
2231 if (obj->encoding != REDIS_ENCODING_RAW) {
2232 dec = getDecodedObject(obj);
2233 retval = rdbSaveStringObjectRaw(fp,dec);
2234 decrRefCount(dec);
2235 return retval;
2236 } else {
2237 return rdbSaveStringObjectRaw(fp,obj);
2238 }
2239 }
2240
2241 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2242 static int rdbSave(char *filename) {
2243 dictIterator *di = NULL;
2244 dictEntry *de;
2245 FILE *fp;
2246 char tmpfile[256];
2247 int j;
2248 time_t now = time(NULL);
2249
2250 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2251 fp = fopen(tmpfile,"w");
2252 if (!fp) {
2253 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2254 return REDIS_ERR;
2255 }
2256 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2257 for (j = 0; j < server.dbnum; j++) {
2258 redisDb *db = server.db+j;
2259 dict *d = db->dict;
2260 if (dictSize(d) == 0) continue;
2261 di = dictGetIterator(d);
2262 if (!di) {
2263 fclose(fp);
2264 return REDIS_ERR;
2265 }
2266
2267 /* Write the SELECT DB opcode */
2268 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2269 if (rdbSaveLen(fp,j) == -1) goto werr;
2270
2271 /* Iterate this DB writing every entry */
2272 while((de = dictNext(di)) != NULL) {
2273 robj *key = dictGetEntryKey(de);
2274 robj *o = dictGetEntryVal(de);
2275 time_t expiretime = getExpire(db,key);
2276
2277 /* Save the expire time */
2278 if (expiretime != -1) {
2279 /* If this key is already expired skip it */
2280 if (expiretime < now) continue;
2281 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2282 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2283 }
2284 /* Save the key and associated value */
2285 if (rdbSaveType(fp,o->type) == -1) goto werr;
2286 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2287 if (o->type == REDIS_STRING) {
2288 /* Save a string value */
2289 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2290 } else if (o->type == REDIS_LIST) {
2291 /* Save a list value */
2292 list *list = o->ptr;
2293 listNode *ln;
2294
2295 listRewind(list);
2296 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2297 while((ln = listYield(list))) {
2298 robj *eleobj = listNodeValue(ln);
2299
2300 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2301 }
2302 } else if (o->type == REDIS_SET) {
2303 /* Save a set value */
2304 dict *set = o->ptr;
2305 dictIterator *di = dictGetIterator(set);
2306 dictEntry *de;
2307
2308 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2309 while((de = dictNext(di)) != NULL) {
2310 robj *eleobj = dictGetEntryKey(de);
2311
2312 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2313 }
2314 dictReleaseIterator(di);
2315 } else {
2316 assert(0 != 0);
2317 }
2318 }
2319 dictReleaseIterator(di);
2320 }
2321 /* EOF opcode */
2322 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2323
2324 /* Make sure data will not remain on the OS's output buffers */
2325 fflush(fp);
2326 fsync(fileno(fp));
2327 fclose(fp);
2328
2329 /* Use RENAME to make sure the DB file is changed atomically only
2330 * if the generate DB file is ok. */
2331 if (rename(tmpfile,filename) == -1) {
2332 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2333 unlink(tmpfile);
2334 return REDIS_ERR;
2335 }
2336 redisLog(REDIS_NOTICE,"DB saved on disk");
2337 server.dirty = 0;
2338 server.lastsave = time(NULL);
2339 return REDIS_OK;
2340
2341 werr:
2342 fclose(fp);
2343 unlink(tmpfile);
2344 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2345 if (di) dictReleaseIterator(di);
2346 return REDIS_ERR;
2347 }
2348
2349 static int rdbSaveBackground(char *filename) {
2350 pid_t childpid;
2351
2352 if (server.bgsaveinprogress) return REDIS_ERR;
2353 if ((childpid = fork()) == 0) {
2354 /* Child */
2355 close(server.fd);
2356 if (rdbSave(filename) == REDIS_OK) {
2357 exit(0);
2358 } else {
2359 exit(1);
2360 }
2361 } else {
2362 /* Parent */
2363 if (childpid == -1) {
2364 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2365 strerror(errno));
2366 return REDIS_ERR;
2367 }
2368 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2369 server.bgsaveinprogress = 1;
2370 server.bgsavechildpid = childpid;
2371 return REDIS_OK;
2372 }
2373 return REDIS_OK; /* unreached */
2374 }
2375
/* Remove the temporary dump file "temp-<pid>.rdb" that a saving process
 * with pid 'childpid' writes to (same naming scheme used by rdbSave()). */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,256,"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2382
/* Read a single byte from 'fp' and return it as a non negative int, or -1
 * if the byte can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;
    size_t got = fread(&byte,1,1,fp);

    return (got == 0) ? -1 : byte;
}
2388
/* Read a 4 byte time value (as written by rdbSaveTime()) from 'fp' and
 * return it as a time_t, or -1 if the four bytes can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) != 0) return (time_t) stamp;
    return -1;
}
2394
2395 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2396 * of this file for a description of how this are stored on disk.
2397 *
2398 * isencoded is set to 1 if the readed length is not actually a length but
2399 * an "encoding type", check the above comments for more info */
2400 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2401 unsigned char buf[2];
2402 uint32_t len;
2403
2404 if (isencoded) *isencoded = 0;
2405 if (rdbver == 0) {
2406 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2407 return ntohl(len);
2408 } else {
2409 int type;
2410
2411 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2412 type = (buf[0]&0xC0)>>6;
2413 if (type == REDIS_RDB_6BITLEN) {
2414 /* Read a 6 bit len */
2415 return buf[0]&0x3F;
2416 } else if (type == REDIS_RDB_ENCVAL) {
2417 /* Read a 6 bit len encoding type */
2418 if (isencoded) *isencoded = 1;
2419 return buf[0]&0x3F;
2420 } else if (type == REDIS_RDB_14BITLEN) {
2421 /* Read a 14 bit len */
2422 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2423 return ((buf[0]&0x3F)<<8)|buf[1];
2424 } else {
2425 /* Read a 32 bit len */
2426 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2427 return ntohl(len);
2428 }
2429 }
2430 }
2431
2432 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2433 unsigned char enc[4];
2434 long long val;
2435
2436 if (enctype == REDIS_RDB_ENC_INT8) {
2437 if (fread(enc,1,1,fp) == 0) return NULL;
2438 val = (signed char)enc[0];
2439 } else if (enctype == REDIS_RDB_ENC_INT16) {
2440 uint16_t v;
2441 if (fread(enc,2,1,fp) == 0) return NULL;
2442 v = enc[0]|(enc[1]<<8);
2443 val = (int16_t)v;
2444 } else if (enctype == REDIS_RDB_ENC_INT32) {
2445 uint32_t v;
2446 if (fread(enc,4,1,fp) == 0) return NULL;
2447 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2448 val = (int32_t)v;
2449 } else {
2450 val = 0; /* anti-warning */
2451 assert(0!=0);
2452 }
2453 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2454 }
2455
2456 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2457 unsigned int len, clen;
2458 unsigned char *c = NULL;
2459 sds val = NULL;
2460
2461 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2462 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2463 if ((c = zmalloc(clen)) == NULL) goto err;
2464 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2465 if (fread(c,clen,1,fp) == 0) goto err;
2466 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2467 zfree(c);
2468 return createObject(REDIS_STRING,val);
2469 err:
2470 zfree(c);
2471 sdsfree(val);
2472 return NULL;
2473 }
2474
2475 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2476 int isencoded;
2477 uint32_t len;
2478 sds val;
2479
2480 len = rdbLoadLen(fp,rdbver,&isencoded);
2481 if (isencoded) {
2482 switch(len) {
2483 case REDIS_RDB_ENC_INT8:
2484 case REDIS_RDB_ENC_INT16:
2485 case REDIS_RDB_ENC_INT32:
2486 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2487 case REDIS_RDB_ENC_LZF:
2488 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2489 default:
2490 assert(0!=0);
2491 }
2492 }
2493
2494 if (len == REDIS_RDB_LENERR) return NULL;
2495 val = sdsnewlen(NULL,len);
2496 if (len && fread(val,len,1,fp) == 0) {
2497 sdsfree(val);
2498 return NULL;
2499 }
2500 return tryObjectSharing(createObject(REDIS_STRING,val));
2501 }
2502
/* Load the DB stored in 'filename' into the server databases.
 *
 * Returns REDIS_ERR if the file can't be opened, has a wrong signature or
 * an RDB version greater than 1. Returns REDIS_OK once the EOF opcode is
 * reached. A short read, a duplicated key or a database index not smaller
 * than server.dbnum are treated as unrecoverable: the error is logged and
 * the process exits with status 1. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    /* Entries go to DB 0 until a SELECT DB opcode is found */
    dict *d = server.db[0].dict;
    redisDb *db = server.db+0;
    char buf[1024];
    /* expiretime is -1 when no expire is pending for the next key */
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* The header is 9 bytes: "REDIS" followed by the version digits */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: an element count followed by that many
             * string objects */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    listAddNodeTail((list*)o->ptr,ele);
                } else {
                    dictAdd((dict*)o->ptr,ele,NULL);
                }
            }
        } else {
            /* Any other value type is not handled by this loader */
            assert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            /* Reset so the expire is not applied to the following keys */
            expiretime = -1;
        }
        /* Clear the pointers so the error path below won't release a key
         * that was already added to the dictionary */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2604
2605 /*================================== Commands =============================== */
2606
2607 static void authCommand(redisClient *c) {
2608 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2609 c->authenticated = 1;
2610 addReply(c,shared.ok);
2611 } else {
2612 c->authenticated = 0;
2613 addReply(c,shared.err);
2614 }
2615 }
2616
2617 static void pingCommand(redisClient *c) {
2618 addReply(c,shared.pong);
2619 }
2620
2621 static void echoCommand(redisClient *c) {
2622 addReplyBulkLen(c,c->argv[1]);
2623 addReply(c,c->argv[1]);
2624 addReply(c,shared.crlf);
2625 }
2626
2627 /*=================================== Strings =============================== */
2628
2629 static void setGenericCommand(redisClient *c, int nx) {
2630 int retval;
2631
2632 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2633 if (retval == DICT_ERR) {
2634 if (!nx) {
2635 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2636 incrRefCount(c->argv[2]);
2637 } else {
2638 addReply(c,shared.czero);
2639 return;
2640 }
2641 } else {
2642 incrRefCount(c->argv[1]);
2643 incrRefCount(c->argv[2]);
2644 }
2645 server.dirty++;
2646 removeExpire(c->db,c->argv[1]);
2647 addReply(c, nx ? shared.cone : shared.ok);
2648 }
2649
2650 static void setCommand(redisClient *c) {
2651 setGenericCommand(c,0);
2652 }
2653
2654 static void setnxCommand(redisClient *c) {
2655 setGenericCommand(c,1);
2656 }
2657
2658 static void getCommand(redisClient *c) {
2659 robj *o = lookupKeyRead(c->db,c->argv[1]);
2660
2661 if (o == NULL) {
2662 addReply(c,shared.nullbulk);
2663 } else {
2664 if (o->type != REDIS_STRING) {
2665 addReply(c,shared.wrongtypeerr);
2666 } else {
2667 addReplyBulkLen(c,o);
2668 addReply(c,o);
2669 addReply(c,shared.crlf);
2670 }
2671 }
2672 }
2673
2674 static void getsetCommand(redisClient *c) {
2675 getCommand(c);
2676 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2677 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2678 } else {
2679 incrRefCount(c->argv[1]);
2680 }
2681 incrRefCount(c->argv[2]);
2682 server.dirty++;
2683 removeExpire(c->db,c->argv[1]);
2684 }
2685
2686 static void mgetCommand(redisClient *c) {
2687 int j;
2688
2689 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2690 for (j = 1; j < c->argc; j++) {
2691 robj *o = lookupKeyRead(c->db,c->argv[j]);
2692 if (o == NULL) {
2693 addReply(c,shared.nullbulk);
2694 } else {
2695 if (o->type != REDIS_STRING) {
2696 addReply(c,shared.nullbulk);
2697 } else {
2698 addReplyBulkLen(c,o);
2699 addReply(c,o);
2700 addReply(c,shared.crlf);
2701 }
2702 }
2703 }
2704 }
2705
2706 static void incrDecrCommand(redisClient *c, long long incr) {
2707 long long value;
2708 int retval;
2709 robj *o;
2710
2711 o = lookupKeyWrite(c->db,c->argv[1]);
2712 if (o == NULL) {
2713 value = 0;
2714 } else {
2715 if (o->type != REDIS_STRING) {
2716 value = 0;
2717 } else {
2718 char *eptr;
2719
2720 if (o->encoding == REDIS_ENCODING_RAW)
2721 value = strtoll(o->ptr, &eptr, 10);
2722 else if (o->encoding == REDIS_ENCODING_INT)
2723 value = (long)o->ptr;
2724 else
2725 assert(1 != 1);
2726 }
2727 }
2728
2729 value += incr;
2730 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2731 tryObjectEncoding(o);
2732 retval = dictAdd(c->db->dict,c->argv[1],o);
2733 if (retval == DICT_ERR) {
2734 dictReplace(c->db->dict,c->argv[1],o);
2735 removeExpire(c->db,c->argv[1]);
2736 } else {
2737 incrRefCount(c->argv[1]);
2738 }
2739 server.dirty++;
2740 addReply(c,shared.colon);
2741 addReply(c,o);
2742 addReply(c,shared.crlf);
2743 }
2744
2745 static void incrCommand(redisClient *c) {
2746 incrDecrCommand(c,1);
2747 }
2748
2749 static void decrCommand(redisClient *c) {
2750 incrDecrCommand(c,-1);
2751 }
2752
2753 static void incrbyCommand(redisClient *c) {
2754 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2755 incrDecrCommand(c,incr);
2756 }
2757
2758 static void decrbyCommand(redisClient *c) {
2759 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2760 incrDecrCommand(c,-incr);
2761 }
2762
2763 /* ========================= Type agnostic commands ========================= */
2764
2765 static void delCommand(redisClient *c) {
2766 int deleted = 0, j;
2767
2768 for (j = 1; j < c->argc; j++) {
2769 if (deleteKey(c->db,c->argv[j])) {
2770 server.dirty++;
2771 deleted++;
2772 }
2773 }
2774 switch(deleted) {
2775 case 0:
2776 addReply(c,shared.czero);
2777 break;
2778 case 1:
2779 addReply(c,shared.cone);
2780 break;
2781 default:
2782 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2783 break;
2784 }
2785 }
2786
2787 static void existsCommand(redisClient *c) {
2788 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2789 }
2790
2791 static void selectCommand(redisClient *c) {
2792 int id = atoi(c->argv[1]->ptr);
2793
2794 if (selectDb(c,id) == REDIS_ERR) {
2795 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2796 } else {
2797 addReply(c,shared.ok);
2798 }
2799 }
2800
2801 static void randomkeyCommand(redisClient *c) {
2802 dictEntry *de;
2803
2804 while(1) {
2805 de = dictGetRandomKey(c->db->dict);
2806 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2807 }
2808 if (de == NULL) {
2809 addReply(c,shared.plus);
2810 addReply(c,shared.crlf);
2811 } else {
2812 addReply(c,shared.plus);
2813 addReply(c,dictGetEntryKey(de));
2814 addReply(c,shared.crlf);
2815 }
2816 }
2817
2818 static void keysCommand(redisClient *c) {
2819 dictIterator *di;
2820 dictEntry *de;
2821 sds pattern = c->argv[1]->ptr;
2822 int plen = sdslen(pattern);
2823 int numkeys = 0, keyslen = 0;
2824 robj *lenobj = createObject(REDIS_STRING,NULL);
2825
2826 di = dictGetIterator(c->db->dict);
2827 addReply(c,lenobj);
2828 decrRefCount(lenobj);
2829 while((de = dictNext(di)) != NULL) {
2830 robj *keyobj = dictGetEntryKey(de);
2831
2832 sds key = keyobj->ptr;
2833 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2834 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2835 if (expireIfNeeded(c->db,keyobj) == 0) {
2836 if (numkeys != 0)
2837 addReply(c,shared.space);
2838 addReply(c,keyobj);
2839 numkeys++;
2840 keyslen += sdslen(key);
2841 }
2842 }
2843 }
2844 dictReleaseIterator(di);
2845 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2846 addReply(c,shared.crlf);
2847 }
2848
2849 static void dbsizeCommand(redisClient *c) {
2850 addReplySds(c,
2851 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2852 }
2853
2854 static void lastsaveCommand(redisClient *c) {
2855 addReplySds(c,
2856 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2857 }
2858
2859 static void typeCommand(redisClient *c) {
2860 robj *o;
2861 char *type;
2862
2863 o = lookupKeyRead(c->db,c->argv[1]);
2864 if (o == NULL) {
2865 type = "+none";
2866 } else {
2867 switch(o->type) {
2868 case REDIS_STRING: type = "+string"; break;
2869 case REDIS_LIST: type = "+list"; break;
2870 case REDIS_SET: type = "+set"; break;
2871 default: type = "unknown"; break;
2872 }
2873 }
2874 addReplySds(c,sdsnew(type));
2875 addReply(c,shared.crlf);
2876 }
2877
2878 static void saveCommand(redisClient *c) {
2879 if (server.bgsaveinprogress) {
2880 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2881 return;
2882 }
2883 if (rdbSave(server.dbfilename) == REDIS_OK) {
2884 addReply(c,shared.ok);
2885 } else {
2886 addReply(c,shared.err);
2887 }
2888 }
2889
2890 static void bgsaveCommand(redisClient *c) {
2891 if (server.bgsaveinprogress) {
2892 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2893 return;
2894 }
2895 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2896 addReply(c,shared.ok);
2897 } else {
2898 addReply(c,shared.err);
2899 }
2900 }
2901
2902 static void shutdownCommand(redisClient *c) {
2903 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2904 /* Kill the saving child if there is a background saving in progress.
2905 We want to avoid race conditions, for instance our saving child may
2906 overwrite the synchronous saving did by SHUTDOWN. */
2907 if (server.bgsaveinprogress) {
2908 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2909 kill(server.bgsavechildpid,SIGKILL);
2910 rdbRemoveTempFile(server.bgsavechildpid);
2911 }
2912 /* SYNC SAVE */
2913 if (rdbSave(server.dbfilename) == REDIS_OK) {
2914 if (server.daemonize)
2915 unlink(server.pidfile);
2916 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2917 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2918 exit(1);
2919 } else {
2920 /* Ooops.. error saving! The best we can do is to continue operating.
2921 * Note that if there was a background saving process, in the next
2922 * cron() Redis will be notified that the background saving aborted,
2923 * handling special stuff like slaves pending for synchronization... */
2924 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2925 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2926 }
2927 }
2928
2929 static void renameGenericCommand(redisClient *c, int nx) {
2930 robj *o;
2931
2932 /* To use the same key as src and dst is probably an error */
2933 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2934 addReply(c,shared.sameobjecterr);
2935 return;
2936 }
2937
2938 o = lookupKeyWrite(c->db,c->argv[1]);
2939 if (o == NULL) {
2940 addReply(c,shared.nokeyerr);
2941 return;
2942 }
2943 incrRefCount(o);
2944 deleteIfVolatile(c->db,c->argv[2]);
2945 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2946 if (nx) {
2947 decrRefCount(o);
2948 addReply(c,shared.czero);
2949 return;
2950 }
2951 dictReplace(c->db->dict,c->argv[2],o);
2952 } else {
2953 incrRefCount(c->argv[2]);
2954 }
2955 deleteKey(c->db,c->argv[1]);
2956 server.dirty++;
2957 addReply(c,nx ? shared.cone : shared.ok);
2958 }
2959
2960 static void renameCommand(redisClient *c) {
2961 renameGenericCommand(c,0);
2962 }
2963
2964 static void renamenxCommand(redisClient *c) {
2965 renameGenericCommand(c,1);
2966 }
2967
2968 static void moveCommand(redisClient *c) {
2969 robj *o;
2970 redisDb *src, *dst;
2971 int srcid;
2972
2973 /* Obtain source and target DB pointers */
2974 src = c->db;
2975 srcid = c->db->id;
2976 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2977 addReply(c,shared.outofrangeerr);
2978 return;
2979 }
2980 dst = c->db;
2981 selectDb(c,srcid); /* Back to the source DB */
2982
2983 /* If the user is moving using as target the same
2984 * DB as the source DB it is probably an error. */
2985 if (src == dst) {
2986 addReply(c,shared.sameobjecterr);
2987 return;
2988 }
2989
2990 /* Check if the element exists and get a reference */
2991 o = lookupKeyWrite(c->db,c->argv[1]);
2992 if (!o) {
2993 addReply(c,shared.czero);
2994 return;
2995 }
2996
2997 /* Try to add the element to the target DB */
2998 deleteIfVolatile(dst,c->argv[1]);
2999 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3000 addReply(c,shared.czero);
3001 return;
3002 }
3003 incrRefCount(c->argv[1]);
3004 incrRefCount(o);
3005
3006 /* OK! key moved, free the entry in the source DB */
3007 deleteKey(src,c->argv[1]);
3008 server.dirty++;
3009 addReply(c,shared.cone);
3010 }
3011
3012 /* =================================== Lists ================================ */
3013 static void pushGenericCommand(redisClient *c, int where) {
3014 robj *lobj;
3015 list *list;
3016
3017 lobj = lookupKeyWrite(c->db,c->argv[1]);
3018 if (lobj == NULL) {
3019 lobj = createListObject();
3020 list = lobj->ptr;
3021 if (where == REDIS_HEAD) {
3022 listAddNodeHead(list,c->argv[2]);
3023 } else {
3024 listAddNodeTail(list,c->argv[2]);
3025 }
3026 dictAdd(c->db->dict,c->argv[1],lobj);
3027 incrRefCount(c->argv[1]);
3028 incrRefCount(c->argv[2]);
3029 } else {
3030 if (lobj->type != REDIS_LIST) {
3031 addReply(c,shared.wrongtypeerr);
3032 return;
3033 }
3034 list = lobj->ptr;
3035 if (where == REDIS_HEAD) {
3036 listAddNodeHead(list,c->argv[2]);
3037 } else {
3038 listAddNodeTail(list,c->argv[2]);
3039 }
3040 incrRefCount(c->argv[2]);
3041 }
3042 server.dirty++;
3043 addReply(c,shared.ok);
3044 }
3045
3046 static void lpushCommand(redisClient *c) {
3047 pushGenericCommand(c,REDIS_HEAD);
3048 }
3049
3050 static void rpushCommand(redisClient *c) {
3051 pushGenericCommand(c,REDIS_TAIL);
3052 }
3053
3054 static void llenCommand(redisClient *c) {
3055 robj *o;
3056 list *l;
3057
3058 o = lookupKeyRead(c->db,c->argv[1]);
3059 if (o == NULL) {
3060 addReply(c,shared.czero);
3061 return;
3062 } else {
3063 if (o->type != REDIS_LIST) {
3064 addReply(c,shared.wrongtypeerr);
3065 } else {
3066 l = o->ptr;
3067 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3068 }
3069 }
3070 }
3071
3072 static void lindexCommand(redisClient *c) {
3073 robj *o;
3074 int index = atoi(c->argv[2]->ptr);
3075
3076 o = lookupKeyRead(c->db,c->argv[1]);
3077 if (o == NULL) {
3078 addReply(c,shared.nullbulk);
3079 } else {
3080 if (o->type != REDIS_LIST) {
3081 addReply(c,shared.wrongtypeerr);
3082 } else {
3083 list *list = o->ptr;
3084 listNode *ln;
3085
3086 ln = listIndex(list, index);
3087 if (ln == NULL) {
3088 addReply(c,shared.nullbulk);
3089 } else {
3090 robj *ele = listNodeValue(ln);
3091 addReplyBulkLen(c,ele);
3092 addReply(c,ele);
3093 addReply(c,shared.crlf);
3094 }
3095 }
3096 }
3097 }
3098
3099 static void lsetCommand(redisClient *c) {
3100 robj *o;
3101 int index = atoi(c->argv[2]->ptr);
3102
3103 o = lookupKeyWrite(c->db,c->argv[1]);
3104 if (o == NULL) {
3105 addReply(c,shared.nokeyerr);
3106 } else {
3107 if (o->type != REDIS_LIST) {
3108 addReply(c,shared.wrongtypeerr);
3109 } else {
3110 list *list = o->ptr;
3111 listNode *ln;
3112
3113 ln = listIndex(list, index);
3114 if (ln == NULL) {
3115 addReply(c,shared.outofrangeerr);
3116 } else {
3117 robj *ele = listNodeValue(ln);
3118
3119 decrRefCount(ele);
3120 listNodeValue(ln) = c->argv[3];
3121 incrRefCount(c->argv[3]);
3122 addReply(c,shared.ok);
3123 server.dirty++;
3124 }
3125 }
3126 }
3127 }
3128
3129 static void popGenericCommand(redisClient *c, int where) {
3130 robj *o;
3131
3132 o = lookupKeyWrite(c->db,c->argv[1]);
3133 if (o == NULL) {
3134 addReply(c,shared.nullbulk);
3135 } else {
3136 if (o->type != REDIS_LIST) {
3137 addReply(c,shared.wrongtypeerr);
3138 } else {
3139 list *list = o->ptr;
3140 listNode *ln;
3141
3142 if (where == REDIS_HEAD)
3143 ln = listFirst(list);
3144 else
3145 ln = listLast(list);
3146
3147 if (ln == NULL) {
3148 addReply(c,shared.nullbulk);
3149 } else {
3150 robj *ele = listNodeValue(ln);
3151 addReplyBulkLen(c,ele);
3152 addReply(c,ele);
3153 addReply(c,shared.crlf);
3154 listDelNode(list,ln);
3155 server.dirty++;
3156 }
3157 }
3158 }
3159 }
3160
3161 static void lpopCommand(redisClient *c) {
3162 popGenericCommand(c,REDIS_HEAD);
3163 }
3164
3165 static void rpopCommand(redisClient *c) {
3166 popGenericCommand(c,REDIS_TAIL);
3167 }
3168
3169 static void lrangeCommand(redisClient *c) {
3170 robj *o;
3171 int start = atoi(c->argv[2]->ptr);
3172 int end = atoi(c->argv[3]->ptr);
3173
3174 o = lookupKeyRead(c->db,c->argv[1]);
3175 if (o == NULL) {
3176 addReply(c,shared.nullmultibulk);
3177 } else {
3178 if (o->type != REDIS_LIST) {
3179 addReply(c,shared.wrongtypeerr);
3180 } else {
3181 list *list = o->ptr;
3182 listNode *ln;
3183 int llen = listLength(list);
3184 int rangelen, j;
3185 robj *ele;
3186
3187 /* convert negative indexes */
3188 if (start < 0) start = llen+start;
3189 if (end < 0) end = llen+end;
3190 if (start < 0) start = 0;
3191 if (end < 0) end = 0;
3192
3193 /* indexes sanity checks */
3194 if (start > end || start >= llen) {
3195 /* Out of range start or start > end result in empty list */
3196 addReply(c,shared.emptymultibulk);
3197 return;
3198 }
3199 if (end >= llen) end = llen-1;
3200 rangelen = (end-start)+1;
3201
3202 /* Return the result in form of a multi-bulk reply */
3203 ln = listIndex(list, start);
3204 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3205 for (j = 0; j < rangelen; j++) {
3206 ele = listNodeValue(ln);
3207 addReplyBulkLen(c,ele);
3208 addReply(c,ele);
3209 addReply(c,shared.crlf);
3210 ln = ln->next;
3211 }
3212 }
3213 }
3214 }
3215
3216 static void ltrimCommand(redisClient *c) {
3217 robj *o;
3218 int start = atoi(c->argv[2]->ptr);
3219 int end = atoi(c->argv[3]->ptr);
3220
3221 o = lookupKeyWrite(c->db,c->argv[1]);
3222 if (o == NULL) {
3223 addReply(c,shared.nokeyerr);
3224 } else {
3225 if (o->type != REDIS_LIST) {
3226 addReply(c,shared.wrongtypeerr);
3227 } else {
3228 list *list = o->ptr;
3229 listNode *ln;
3230 int llen = listLength(list);
3231 int j, ltrim, rtrim;
3232
3233 /* convert negative indexes */
3234 if (start < 0) start = llen+start;
3235 if (end < 0) end = llen+end;
3236 if (start < 0) start = 0;
3237 if (end < 0) end = 0;
3238
3239 /* indexes sanity checks */
3240 if (start > end || start >= llen) {
3241 /* Out of range start or start > end result in empty list */
3242 ltrim = llen;
3243 rtrim = 0;
3244 } else {
3245 if (end >= llen) end = llen-1;
3246 ltrim = start;
3247 rtrim = llen-end-1;
3248 }
3249
3250 /* Remove list elements to perform the trim */
3251 for (j = 0; j < ltrim; j++) {
3252 ln = listFirst(list);
3253 listDelNode(list,ln);
3254 }
3255 for (j = 0; j < rtrim; j++) {
3256 ln = listLast(list);
3257 listDelNode(list,ln);
3258 }
3259 server.dirty++;
3260 addReply(c,shared.ok);
3261 }
3262 }
3263 }
3264
3265 static void lremCommand(redisClient *c) {
3266 robj *o;
3267
3268 o = lookupKeyWrite(c->db,c->argv[1]);
3269 if (o == NULL) {
3270 addReply(c,shared.czero);
3271 } else {
3272 if (o->type != REDIS_LIST) {
3273 addReply(c,shared.wrongtypeerr);
3274 } else {
3275 list *list = o->ptr;
3276 listNode *ln, *next;
3277 int toremove = atoi(c->argv[2]->ptr);
3278 int removed = 0;
3279 int fromtail = 0;
3280
3281 if (toremove < 0) {
3282 toremove = -toremove;
3283 fromtail = 1;
3284 }
3285 ln = fromtail ? list->tail : list->head;
3286 while (ln) {
3287 robj *ele = listNodeValue(ln);
3288
3289 next = fromtail ? ln->prev : ln->next;
3290 if (compareStringObjects(ele,c->argv[3]) == 0) {
3291 listDelNode(list,ln);
3292 server.dirty++;
3293 removed++;
3294 if (toremove && removed == toremove) break;
3295 }
3296 ln = next;
3297 }
3298 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3299 }
3300 }
3301 }
3302
3303 /* ==================================== Sets ================================ */
3304
3305 static void saddCommand(redisClient *c) {
3306 robj *set;
3307
3308 set = lookupKeyWrite(c->db,c->argv[1]);
3309 if (set == NULL) {
3310 set = createSetObject();
3311 dictAdd(c->db->dict,c->argv[1],set);
3312 incrRefCount(c->argv[1]);
3313 } else {
3314 if (set->type != REDIS_SET) {
3315 addReply(c,shared.wrongtypeerr);
3316 return;
3317 }
3318 }
3319 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3320 incrRefCount(c->argv[2]);
3321 server.dirty++;
3322 addReply(c,shared.cone);
3323 } else {
3324 addReply(c,shared.czero);
3325 }
3326 }
3327
3328 static void sremCommand(redisClient *c) {
3329 robj *set;
3330
3331 set = lookupKeyWrite(c->db,c->argv[1]);
3332 if (set == NULL) {
3333 addReply(c,shared.czero);
3334 } else {
3335 if (set->type != REDIS_SET) {
3336 addReply(c,shared.wrongtypeerr);
3337 return;
3338 }
3339 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3340 server.dirty++;
3341 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3342 addReply(c,shared.cone);
3343 } else {
3344 addReply(c,shared.czero);
3345 }
3346 }
3347 }
3348
3349 static void smoveCommand(redisClient *c) {
3350 robj *srcset, *dstset;
3351
3352 srcset = lookupKeyWrite(c->db,c->argv[1]);
3353 dstset = lookupKeyWrite(c->db,c->argv[2]);
3354
3355 /* If the source key does not exist return 0, if it's of the wrong type
3356 * raise an error */
3357 if (srcset == NULL || srcset->type != REDIS_SET) {
3358 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3359 return;
3360 }
3361 /* Error if the destination key is not a set as well */
3362 if (dstset && dstset->type != REDIS_SET) {
3363 addReply(c,shared.wrongtypeerr);
3364 return;
3365 }
3366 /* Remove the element from the source set */
3367 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3368 /* Key not found in the src set! return zero */
3369 addReply(c,shared.czero);
3370 return;
3371 }
3372 server.dirty++;
3373 /* Add the element to the destination set */
3374 if (!dstset) {
3375 dstset = createSetObject();
3376 dictAdd(c->db->dict,c->argv[2],dstset);
3377 incrRefCount(c->argv[2]);
3378 }
3379 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3380 incrRefCount(c->argv[3]);
3381 addReply(c,shared.cone);
3382 }
3383
3384 static void sismemberCommand(redisClient *c) {
3385 robj *set;
3386
3387 set = lookupKeyRead(c->db,c->argv[1]);
3388 if (set == NULL) {
3389 addReply(c,shared.czero);
3390 } else {
3391 if (set->type != REDIS_SET) {
3392 addReply(c,shared.wrongtypeerr);
3393 return;
3394 }
3395 if (dictFind(set->ptr,c->argv[2]))
3396 addReply(c,shared.cone);
3397 else
3398 addReply(c,shared.czero);
3399 }
3400 }
3401
3402 static void scardCommand(redisClient *c) {
3403 robj *o;
3404 dict *s;
3405
3406 o = lookupKeyRead(c->db,c->argv[1]);
3407 if (o == NULL) {
3408 addReply(c,shared.czero);
3409 return;
3410 } else {
3411 if (o->type != REDIS_SET) {
3412 addReply(c,shared.wrongtypeerr);
3413 } else {
3414 s = o->ptr;
3415 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3416 dictSize(s)));
3417 }
3418 }
3419 }
3420
3421 static void spopCommand(redisClient *c) {
3422 robj *set;
3423 dictEntry *de;
3424
3425 set = lookupKeyWrite(c->db,c->argv[1]);
3426 if (set == NULL) {
3427 addReply(c,shared.nullbulk);
3428 } else {
3429 if (set->type != REDIS_SET) {
3430 addReply(c,shared.wrongtypeerr);
3431 return;
3432 }
3433 de = dictGetRandomKey(set->ptr);
3434 if (de == NULL) {
3435 addReply(c,shared.nullbulk);
3436 } else {
3437 robj *ele = dictGetEntryKey(de);
3438
3439 addReplyBulkLen(c,ele);
3440 addReply(c,ele);
3441 addReply(c,shared.crlf);
3442 dictDelete(set->ptr,ele);
3443 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3444 server.dirty++;
3445 }
3446 }
3447 }
3448
3449 static void srandmemberCommand(redisClient *c) {
3450 robj *set;
3451 dictEntry *de;
3452
3453 set = lookupKeyRead(c->db,c->argv[1]);
3454 if (set == NULL) {
3455 addReply(c,shared.nullbulk);
3456 } else {
3457 if (set->type != REDIS_SET) {
3458 addReply(c,shared.wrongtypeerr);
3459 return;
3460 }
3461 de = dictGetRandomKey(set->ptr);
3462 if (de == NULL) {
3463 addReply(c,shared.nullbulk);
3464 } else {
3465 robj *ele = dictGetEntryKey(de);
3466
3467 addReplyBulkLen(c,ele);
3468 addReply(c,ele);
3469 addReply(c,shared.crlf);
3470 }
3471 }
3472 }
3473
3474 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3475 dict **d1 = (void*) s1, **d2 = (void*) s2;
3476
3477 return dictSize(*d1)-dictSize(*d2);
3478 }
3479
3480 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3481 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3482 dictIterator *di;
3483 dictEntry *de;
3484 robj *lenobj = NULL, *dstset = NULL;
3485 int j, cardinality = 0;
3486
3487 for (j = 0; j < setsnum; j++) {
3488 robj *setobj;
3489
3490 setobj = dstkey ?
3491 lookupKeyWrite(c->db,setskeys[j]) :
3492 lookupKeyRead(c->db,setskeys[j]);
3493 if (!setobj) {
3494 zfree(dv);
3495 if (dstkey) {
3496 deleteKey(c->db,dstkey);
3497 addReply(c,shared.ok);
3498 } else {
3499 addReply(c,shared.nullmultibulk);
3500 }
3501 return;
3502 }
3503 if (setobj->type != REDIS_SET) {
3504 zfree(dv);
3505 addReply(c,shared.wrongtypeerr);
3506 return;
3507 }
3508 dv[j] = setobj->ptr;
3509 }
3510 /* Sort sets from the smallest to largest, this will improve our
3511 * algorithm's performace */
3512 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3513
3514 /* The first thing we should output is the total number of elements...
3515 * since this is a multi-bulk write, but at this stage we don't know
3516 * the intersection set size, so we use a trick, append an empty object
3517 * to the output list and save the pointer to later modify it with the
3518 * right length */
3519 if (!dstkey) {
3520 lenobj = createObject(REDIS_STRING,NULL);
3521 addReply(c,lenobj);
3522 decrRefCount(lenobj);
3523 } else {
3524 /* If we have a target key where to store the resulting set
3525 * create this key with an empty set inside */
3526 dstset = createSetObject();
3527 }
3528
3529 /* Iterate all the elements of the first (smallest) set, and test
3530 * the element against all the other sets, if at least one set does
3531 * not include the element it is discarded */
3532 di = dictGetIterator(dv[0]);
3533
3534 while((de = dictNext(di)) != NULL) {
3535 robj *ele;
3536
3537 for (j = 1; j < setsnum; j++)
3538 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3539 if (j != setsnum)
3540 continue; /* at least one set does not contain the member */
3541 ele = dictGetEntryKey(de);
3542 if (!dstkey) {
3543 addReplyBulkLen(c,ele);
3544 addReply(c,ele);
3545 addReply(c,shared.crlf);
3546 cardinality++;
3547 } else {
3548 dictAdd(dstset->ptr,ele,NULL);
3549 incrRefCount(ele);
3550 }
3551 }
3552 dictReleaseIterator(di);
3553
3554 if (dstkey) {
3555 /* Store the resulting set into the target */
3556 deleteKey(c->db,dstkey);
3557 dictAdd(c->db->dict,dstkey,dstset);
3558 incrRefCount(dstkey);
3559 }
3560
3561 if (!dstkey) {
3562 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3563 } else {
3564 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3565 dictSize((dict*)dstset->ptr)));
3566 server.dirty++;
3567 }
3568 zfree(dv);
3569 }
3570
3571 static void sinterCommand(redisClient *c) {
3572 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3573 }
3574
3575 static void sinterstoreCommand(redisClient *c) {
3576 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3577 }
3578
3579 #define REDIS_OP_UNION 0
3580 #define REDIS_OP_DIFF 1
3581
3582 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3583 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3584 dictIterator *di;
3585 dictEntry *de;
3586 robj *dstset = NULL;
3587 int j, cardinality = 0;
3588
3589 for (j = 0; j < setsnum; j++) {
3590 robj *setobj;
3591
3592 setobj = dstkey ?
3593 lookupKeyWrite(c->db,setskeys[j]) :
3594 lookupKeyRead(c->db,setskeys[j]);
3595 if (!setobj) {
3596 dv[j] = NULL;
3597 continue;
3598 }
3599 if (setobj->type != REDIS_SET) {
3600 zfree(dv);
3601 addReply(c,shared.wrongtypeerr);
3602 return;
3603 }
3604 dv[j] = setobj->ptr;
3605 }
3606
3607 /* We need a temp set object to store our union. If the dstkey
3608 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3609 * this set object will be the resulting object to set into the target key*/
3610 dstset = createSetObject();
3611
3612 /* Iterate all the elements of all the sets, add every element a single
3613 * time to the result set */
3614 for (j = 0; j < setsnum; j++) {
3615 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3616 if (!dv[j]) continue; /* non existing keys are like empty sets */
3617
3618 di = dictGetIterator(dv[j]);
3619
3620 while((de = dictNext(di)) != NULL) {
3621 robj *ele;
3622
3623 /* dictAdd will not add the same element multiple times */
3624 ele = dictGetEntryKey(de);
3625 if (op == REDIS_OP_UNION || j == 0) {
3626 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3627 incrRefCount(ele);
3628 cardinality++;
3629 }
3630 } else if (op == REDIS_OP_DIFF) {
3631 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3632 cardinality--;
3633 }
3634 }
3635 }
3636 dictReleaseIterator(di);
3637
3638 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3639 }
3640
3641 /* Output the content of the resulting set, if not in STORE mode */
3642 if (!dstkey) {
3643 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3644 di = dictGetIterator(dstset->ptr);
3645 while((de = dictNext(di)) != NULL) {
3646 robj *ele;
3647
3648 ele = dictGetEntryKey(de);
3649 addReplyBulkLen(c,ele);
3650 addReply(c,ele);
3651 addReply(c,shared.crlf);
3652 }
3653 dictReleaseIterator(di);
3654 } else {
3655 /* If we have a target key where to store the resulting set
3656 * create this key with the result set inside */
3657 deleteKey(c->db,dstkey);
3658 dictAdd(c->db->dict,dstkey,dstset);
3659 incrRefCount(dstkey);
3660 }
3661
3662 /* Cleanup */
3663 if (!dstkey) {
3664 decrRefCount(dstset);
3665 } else {
3666 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3667 dictSize((dict*)dstset->ptr)));
3668 server.dirty++;
3669 }
3670 zfree(dv);
3671 }
3672
3673 static void sunionCommand(redisClient *c) {
3674 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3675 }
3676
3677 static void sunionstoreCommand(redisClient *c) {
3678 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3679 }
3680
3681 static void sdiffCommand(redisClient *c) {
3682 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3683 }
3684
3685 static void sdiffstoreCommand(redisClient *c) {
3686 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3687 }
3688
3689 /* ==================================== ZSets =============================== */
3690
3691 /* ZSETs are ordered sets using two data structures to hold the same elements
3692 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3693 * data structure.
3694 *
3695 * The elements are added to an hash table mapping Redis objects to scores.
3696 * At the same time the elements are added to a skip list mapping scores
3697 * to Redis objects (so objects are sorted by scores in this "view"). */
3698
3699 /* This skiplist implementation is almost a C translation of the original
3700 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3701 * Alternative to Balanced Trees", modified in three ways:
3702 * a) this implementation allows for repeated values.
3703 * b) the comparison is not just by key (our 'score') but by satellite data.
3704 * c) there is a back pointer, so it's a doubly linked list with the back
3705 * pointers being only at "level 1". This allows to traverse the list
3706 * from tail to head, useful for ZREVRANGE. */
3707
3708 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3709 zskiplistNode *zn = zmalloc(sizeof(*zn));
3710
3711 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3712 zn->score = score;
3713 zn->obj = obj;
3714 return zn;
3715 }
3716
3717 static zskiplist *zslCreate(void) {
3718 int j;
3719 zskiplist *zsl;
3720
3721 zsl = zmalloc(sizeof(*zsl));
3722 zsl->level = 1;
3723 zsl->length = 0;
3724 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3725 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3726 zsl->header->forward[j] = NULL;
3727 zsl->header->backward = NULL;
3728 zsl->tail = NULL;
3729 return zsl;
3730 }
3731
3732 static void zslFreeNode(zskiplistNode *node) {
3733 decrRefCount(node->obj);
3734 zfree(node->forward);
3735 zfree(node);
3736 }
3737
3738 static void zslFree(zskiplist *zsl) {
3739 zskiplistNode *node = zsl->header->forward[0], *next;
3740
3741 zfree(zsl->header->forward);
3742 zfree(zsl->header);
3743 while(node) {
3744 next = node->forward[0];
3745 zslFreeNode(node);
3746 node = next;
3747 }
3748 zfree(zsl);
3749 }
3750
3751 static int zslRandomLevel(void) {
3752 int level = 1;
3753 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3754 level += 1;
3755 return level;
3756 }
3757
3758 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3759 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3760 int i, level;
3761
3762 x = zsl->header;
3763 for (i = zsl->level-1; i >= 0; i--) {
3764 while (x->forward[i] && x->forward[i]->score < score)
3765 x = x->forward[i];
3766 update[i] = x;
3767 }
3768 /* we assume the key is not already inside, since we allow duplicated
3769 * scores, and the re-insertion of score and redis object should never
3770 * happpen since the caller of zslInsert() should test in the hash table
3771 * if the element is already inside or not. */
3772 level = zslRandomLevel();
3773 if (level > zsl->level) {
3774 for (i = zsl->level; i < level; i++)
3775 update[i] = zsl->header;
3776 zsl->level = level;
3777 }
3778 x = zslCreateNode(level,score,obj);
3779 for (i = 0; i < level; i++) {
3780 x->forward[i] = update[i]->forward[i];
3781 update[i]->forward[i] = x;
3782 }
3783 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3784 if (x->forward[0])
3785 x->forward[0]->backward = x;
3786 else
3787 zsl->tail = x;
3788 zsl->length++;
3789 }
3790
3791 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3792 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3793 int i;
3794
3795 x = zsl->header;
3796 for (i = zsl->level-1; i >= 0; i--) {
3797 while (x->forward[i] && x->forward[i]->score < score)
3798 x = x->forward[i];
3799 update[i] = x;
3800 }
3801 /* We may have multiple elements with the same score, what we need
3802 * is to find the element with both the right score and object. */
3803 x = x->forward[0];
3804 while(x->score == score) {
3805 if (compareStringObjects(x->obj,obj) == 0) {
3806 for (i = 0; i < zsl->level; i++) {
3807 if (update[i]->forward[i] != x) break;
3808 update[i]->forward[i] = x->forward[i];
3809 }
3810 if (x->forward[0]) {
3811 x->forward[0]->backward = (x->backward == zsl->header) ?
3812 NULL : x->backward;
3813 } else {
3814 zsl->tail = x->backward;
3815 }
3816 zslFreeNode(x);
3817 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3818 zsl->level--;
3819 zsl->length--;
3820 return 1;
3821 } else {
3822 x = x->forward[0];
3823 if (!x) return 0; /* end of the list reached, not found */
3824 }
3825 }
3826 return 0; /* not found */
3827 }
3828
3829 /* The actual Z-commands implementations */
3830
3831 static void zaddCommand(redisClient *c) {
3832 robj *zsetobj;
3833 zset *zs;
3834 double *score;
3835
3836 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3837 if (zsetobj == NULL) {
3838 zsetobj = createZsetObject();
3839 dictAdd(c->db->dict,c->argv[1],zsetobj);
3840 incrRefCount(c->argv[1]);
3841 } else {
3842 if (zsetobj->type != REDIS_ZSET) {
3843 addReply(c,shared.wrongtypeerr);
3844 return;
3845 }
3846 }
3847 score = zmalloc(sizeof(double));
3848 *score = strtod(c->argv[2]->ptr,NULL);
3849 zs = zsetobj->ptr;
3850 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3851 /* case 1: New element */
3852 incrRefCount(c->argv[3]); /* added to hash */
3853 zslInsert(zs->zsl,*score,c->argv[3]);
3854 incrRefCount(c->argv[3]); /* added to skiplist */
3855 server.dirty++;
3856 addReply(c,shared.cone);
3857 } else {
3858 dictEntry *de;
3859 double *oldscore;
3860
3861 /* case 2: Score update operation */
3862 de = dictFind(zs->dict,c->argv[3]);
3863 assert(de != NULL);
3864 oldscore = dictGetEntryVal(de);
3865 if (*score != *oldscore) {
3866 int deleted;
3867
3868 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
3869 assert(deleted != 0);
3870 zslInsert(zs->zsl,*score,c->argv[3]);
3871 incrRefCount(c->argv[3]);
3872 server.dirty++;
3873 }
3874 addReply(c,shared.czero);
3875 }
3876 }
3877
3878 static void zremCommand(redisClient *c) {
3879 robj *zsetobj;
3880 zset *zs;
3881
3882 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3883 if (zsetobj == NULL) {
3884 addReply(c,shared.czero);
3885 } else {
3886 dictEntry *de;
3887 double *oldscore;
3888 int deleted;
3889
3890 if (zsetobj->type != REDIS_ZSET) {
3891 addReply(c,shared.wrongtypeerr);
3892 return;
3893 }
3894 zs = zsetobj->ptr;
3895 de = dictFind(zs->dict,c->argv[2]);
3896 if (de == NULL) {
3897 addReply(c,shared.czero);
3898 return;
3899 }
3900 /* Delete from the skiplist */
3901 oldscore = dictGetEntryVal(de);
3902 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
3903 assert(deleted != 0);
3904
3905 /* Delete from the hash table */
3906 dictDelete(zs->dict,c->argv[2]);
3907 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
3908 server.dirty++;
3909 addReply(c,shared.cone);
3910 }
3911 }
3912
3913 static void zrangeGenericCommand(redisClient *c, int reverse) {
3914 robj *o;
3915 int start = atoi(c->argv[2]->ptr);
3916 int end = atoi(c->argv[3]->ptr);
3917
3918 o = lookupKeyRead(c->db,c->argv[1]);
3919 if (o == NULL) {
3920 addReply(c,shared.nullmultibulk);
3921 } else {
3922 if (o->type != REDIS_ZSET) {
3923 addReply(c,shared.wrongtypeerr);
3924 } else {
3925 zset *zsetobj = o->ptr;
3926 zskiplist *zsl = zsetobj->zsl;
3927 zskiplistNode *ln;
3928
3929 int llen = zsl->length;
3930 int rangelen, j;
3931 robj *ele;
3932
3933 /* convert negative indexes */
3934 if (start < 0) start = llen+start;
3935 if (end < 0) end = llen+end;
3936 if (start < 0) start = 0;
3937 if (end < 0) end = 0;
3938
3939 /* indexes sanity checks */
3940 if (start > end || start >= llen) {
3941 /* Out of range start or start > end result in empty list */
3942 addReply(c,shared.emptymultibulk);
3943 return;
3944 }
3945 if (end >= llen) end = llen-1;
3946 rangelen = (end-start)+1;
3947
3948 /* Return the result in form of a multi-bulk reply */
3949 if (reverse) {
3950 ln = zsl->tail;
3951 while (start--)
3952 ln = ln->backward;
3953 } else {
3954 ln = zsl->header->forward[0];
3955 while (start--)
3956 ln = ln->forward[0];
3957 }
3958
3959 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3960 for (j = 0; j < rangelen; j++) {
3961 ele = ln->obj;
3962 addReplyBulkLen(c,ele);
3963 addReply(c,ele);
3964 addReply(c,shared.crlf);
3965 ln = reverse ? ln->backward : ln->forward[0];
3966 }
3967 }
3968 }
3969 }
3970
3971 static void zrangeCommand(redisClient *c) {
3972 zrangeGenericCommand(c,0);
3973 }
3974
3975 static void zrevrangeCommand(redisClient *c) {
3976 zrangeGenericCommand(c,1);
3977 }
3978
3979 static void zlenCommand(redisClient *c) {
3980 robj *o;
3981 zset *zs;
3982
3983 o = lookupKeyRead(c->db,c->argv[1]);
3984 if (o == NULL) {
3985 addReply(c,shared.czero);
3986 return;
3987 } else {
3988 if (o->type != REDIS_ZSET) {
3989 addReply(c,shared.wrongtypeerr);
3990 } else {
3991 zs = o->ptr;
3992 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
3993 }
3994 }
3995 }
3996
3997 /* ========================= Non type-specific commands ==================== */
3998
3999 static void flushdbCommand(redisClient *c) {
4000 server.dirty += dictSize(c->db->dict);
4001 dictEmpty(c->db->dict);
4002 dictEmpty(c->db->expires);
4003 addReply(c,shared.ok);
4004 }
4005
4006 static void flushallCommand(redisClient *c) {
4007 server.dirty += emptyDb();
4008 addReply(c,shared.ok);
4009 rdbSave(server.dbfilename);
4010 server.dirty++;
4011 }
4012
4013 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4014 redisSortOperation *so = zmalloc(sizeof(*so));
4015 so->type = type;
4016 so->pattern = pattern;
4017 return so;
4018 }
4019
4020 /* Return the value associated to the key with a name obtained
4021 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4022 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4023 char *p;
4024 sds spat, ssub;
4025 robj keyobj;
4026 int prefixlen, sublen, postfixlen;
4027 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4028 struct {
4029 long len;
4030 long free;
4031 char buf[REDIS_SORTKEY_MAX+1];
4032 } keyname;
4033
4034 if (subst->encoding == REDIS_ENCODING_RAW)
4035 incrRefCount(subst);
4036 else {
4037 subst = getDecodedObject(subst);
4038 }
4039
4040 spat = pattern->ptr;
4041 ssub = subst->ptr;
4042 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4043 p = strchr(spat,'*');
4044 if (!p) return NULL;
4045
4046 prefixlen = p-spat;
4047 sublen = sdslen(ssub);
4048 postfixlen = sdslen(spat)-(prefixlen+1);
4049 memcpy(keyname.buf,spat,prefixlen);
4050 memcpy(keyname.buf+prefixlen,ssub,sublen);
4051 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4052 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4053 keyname.len = prefixlen+sublen+postfixlen;
4054
4055 keyobj.refcount = 1;
4056 keyobj.type = REDIS_STRING;
4057 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4058
4059 decrRefCount(subst);
4060
4061 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4062 return lookupKeyRead(db,&keyobj);
4063 }
4064
4065 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4066 * the additional parameter is not standard but a BSD-specific we have to
4067 * pass sorting parameters via the global 'server' structure */
4068 static int sortCompare(const void *s1, const void *s2) {
4069 const redisSortObject *so1 = s1, *so2 = s2;
4070 int cmp;
4071
4072 if (!server.sort_alpha) {
4073 /* Numeric sorting. Here it's trivial as we precomputed scores */
4074 if (so1->u.score > so2->u.score) {
4075 cmp = 1;
4076 } else if (so1->u.score < so2->u.score) {
4077 cmp = -1;
4078 } else {
4079 cmp = 0;
4080 }
4081 } else {
4082 /* Alphanumeric sorting */
4083 if (server.sort_bypattern) {
4084 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4085 /* At least one compare object is NULL */
4086 if (so1->u.cmpobj == so2->u.cmpobj)
4087 cmp = 0;
4088 else if (so1->u.cmpobj == NULL)
4089 cmp = -1;
4090 else
4091 cmp = 1;
4092 } else {
4093 /* We have both the objects, use strcoll */
4094 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4095 }
4096 } else {
4097 /* Compare elements directly */
4098 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4099 so2->obj->encoding == REDIS_ENCODING_RAW) {
4100 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4101 } else {
4102 robj *dec1, *dec2;
4103
4104 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4105 so1->obj : getDecodedObject(so1->obj);
4106 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4107 so2->obj : getDecodedObject(so2->obj);
4108 cmp = strcoll(dec1->ptr,dec2->ptr);
4109 if (dec1 != so1->obj) decrRefCount(dec1);
4110 if (dec2 != so2->obj) decrRefCount(dec2);
4111 }
4112 }
4113 }
4114 return server.sort_desc ? -cmp : cmp;
4115 }
4116
4117 /* The SORT command is the most complex command in Redis. Warning: this code
4118 * is optimized for speed and a bit less for readability */
4119 static void sortCommand(redisClient *c) {
4120 list *operations;
4121 int outputlen = 0;
4122 int desc = 0, alpha = 0;
4123 int limit_start = 0, limit_count = -1, start, end;
4124 int j, dontsort = 0, vectorlen;
4125 int getop = 0; /* GET operation counter */
4126 robj *sortval, *sortby = NULL;
4127 redisSortObject *vector; /* Resulting vector to sort */
4128
4129 /* Lookup the key to sort. It must be of the right types */
4130 sortval = lookupKeyRead(c->db,c->argv[1]);
4131 if (sortval == NULL) {
4132 addReply(c,shared.nokeyerr);
4133 return;
4134 }
4135 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4136 addReply(c,shared.wrongtypeerr);
4137 return;
4138 }
4139
4140 /* Create a list of operations to perform for every sorted element.
4141 * Operations can be GET/DEL/INCR/DECR */
4142 operations = listCreate();
4143 listSetFreeMethod(operations,zfree);
4144 j = 2;
4145
4146 /* Now we need to protect sortval incrementing its count, in the future
4147 * SORT may have options able to overwrite/delete keys during the sorting
4148 * and the sorted key itself may get destroied */
4149 incrRefCount(sortval);
4150
4151 /* The SORT command has an SQL-alike syntax, parse it */
4152 while(j < c->argc) {
4153 int leftargs = c->argc-j-1;
4154 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4155 desc = 0;
4156 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4157 desc = 1;
4158 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4159 alpha = 1;
4160 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4161 limit_start = atoi(c->argv[j+1]->ptr);
4162 limit_count = atoi(c->argv[j+2]->ptr);
4163 j+=2;
4164 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4165 sortby = c->argv[j+1];
4166 /* If the BY pattern does not contain '*', i.e. it is constant,
4167 * we don't need to sort nor to lookup the weight keys. */
4168 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4169 j++;
4170 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4171 listAddNodeTail(operations,createSortOperation(
4172 REDIS_SORT_GET,c->argv[j+1]));
4173 getop++;
4174 j++;
4175 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4176 listAddNodeTail(operations,createSortOperation(
4177 REDIS_SORT_DEL,c->argv[j+1]));
4178 j++;
4179 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4180 listAddNodeTail(operations,createSortOperation(
4181 REDIS_SORT_INCR,c->argv[j+1]));
4182 j++;
4183 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4184 listAddNodeTail(operations,createSortOperation(
4185 REDIS_SORT_DECR,c->argv[j+1]));
4186 j++;
4187 } else {
4188 decrRefCount(sortval);
4189 listRelease(operations);
4190 addReply(c,shared.syntaxerr);
4191 return;
4192 }
4193 j++;
4194 }
4195
4196 /* Load the sorting vector with all the objects to sort */
4197 vectorlen = (sortval->type == REDIS_LIST) ?
4198 listLength((list*)sortval->ptr) :
4199 dictSize((dict*)sortval->ptr);
4200 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4201 j = 0;
4202 if (sortval->type == REDIS_LIST) {
4203 list *list = sortval->ptr;
4204 listNode *ln;
4205
4206 listRewind(list);
4207 while((ln = listYield(list))) {
4208 robj *ele = ln->value;
4209 vector[j].obj = ele;
4210 vector[j].u.score = 0;
4211 vector[j].u.cmpobj = NULL;
4212 j++;
4213 }
4214 } else {
4215 dict *set = sortval->ptr;
4216 dictIterator *di;
4217 dictEntry *setele;
4218
4219 di = dictGetIterator(set);
4220 while((setele = dictNext(di)) != NULL) {
4221 vector[j].obj = dictGetEntryKey(setele);
4222 vector[j].u.score = 0;
4223 vector[j].u.cmpobj = NULL;
4224 j++;
4225 }
4226 dictReleaseIterator(di);
4227 }
4228 assert(j == vectorlen);
4229
4230 /* Now it's time to load the right scores in the sorting vector */
4231 if (dontsort == 0) {
4232 for (j = 0; j < vectorlen; j++) {
4233 if (sortby) {
4234 robj *byval;
4235
4236 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4237 if (!byval || byval->type != REDIS_STRING) continue;
4238 if (alpha) {
4239 if (byval->encoding == REDIS_ENCODING_RAW) {
4240 vector[j].u.cmpobj = byval;
4241 incrRefCount(byval);
4242 } else {
4243 vector[j].u.cmpobj = getDecodedObject(byval);
4244 }
4245 } else {
4246 if (byval->encoding == REDIS_ENCODING_RAW) {
4247 vector[j].u.score = strtod(byval->ptr,NULL);
4248 } else {
4249 if (byval->encoding == REDIS_ENCODING_INT) {
4250 vector[j].u.score = (long)byval->ptr;
4251 } else
4252 assert(1 != 1);
4253 }
4254 }
4255 } else {
4256 if (!alpha) {
4257 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4258 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4259 else {
4260 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4261 vector[j].u.score = (long) vector[j].obj->ptr;
4262 else
4263 assert(1 != 1);
4264 }
4265 }
4266 }
4267 }
4268 }
4269
4270 /* We are ready to sort the vector... perform a bit of sanity check
4271 * on the LIMIT option too. We'll use a partial version of quicksort. */
4272 start = (limit_start < 0) ? 0 : limit_start;
4273 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4274 if (start >= vectorlen) {
4275 start = vectorlen-1;
4276 end = vectorlen-2;
4277 }
4278 if (end >= vectorlen) end = vectorlen-1;
4279
4280 if (dontsort == 0) {
4281 server.sort_desc = desc;
4282 server.sort_alpha = alpha;
4283 server.sort_bypattern = sortby ? 1 : 0;
4284 if (sortby && (start != 0 || end != vectorlen-1))
4285 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4286 else
4287 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4288 }
4289
4290 /* Send command output to the output buffer, performing the specified
4291 * GET/DEL/INCR/DECR operations if any. */
4292 outputlen = getop ? getop*(end-start+1) : end-start+1;
4293 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4294 for (j = start; j <= end; j++) {
4295 listNode *ln;
4296 if (!getop) {
4297 addReplyBulkLen(c,vector[j].obj);
4298 addReply(c,vector[j].obj);
4299 addReply(c,shared.crlf);
4300 }
4301 listRewind(operations);
4302 while((ln = listYield(operations))) {
4303 redisSortOperation *sop = ln->value;
4304 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4305 vector[j].obj);
4306
4307 if (sop->type == REDIS_SORT_GET) {
4308 if (!val || val->type != REDIS_STRING) {
4309 addReply(c,shared.nullbulk);
4310 } else {
4311 addReplyBulkLen(c,val);
4312 addReply(c,val);
4313 addReply(c,shared.crlf);
4314 }
4315 } else if (sop->type == REDIS_SORT_DEL) {
4316 /* TODO */
4317 }
4318 }
4319 }
4320
4321 /* Cleanup */
4322 decrRefCount(sortval);
4323 listRelease(operations);
4324 for (j = 0; j < vectorlen; j++) {
4325 if (sortby && alpha && vector[j].u.cmpobj)
4326 decrRefCount(vector[j].u.cmpobj);
4327 }
4328 zfree(vector);
4329 }
4330
4331 static void infoCommand(redisClient *c) {
4332 sds info;
4333 time_t uptime = time(NULL)-server.stat_starttime;
4334 int j;
4335
4336 info = sdscatprintf(sdsempty(),
4337 "redis_version:%s\r\n"
4338 "arch_bits:%s\r\n"
4339 "uptime_in_seconds:%d\r\n"
4340 "uptime_in_days:%d\r\n"
4341 "connected_clients:%d\r\n"
4342 "connected_slaves:%d\r\n"
4343 "used_memory:%zu\r\n"
4344 "changes_since_last_save:%lld\r\n"
4345 "bgsave_in_progress:%d\r\n"
4346 "last_save_time:%d\r\n"
4347 "total_connections_received:%lld\r\n"
4348 "total_commands_processed:%lld\r\n"
4349 "role:%s\r\n"
4350 ,REDIS_VERSION,
4351 (sizeof(long) == 8) ? "64" : "32",
4352 uptime,
4353 uptime/(3600*24),
4354 listLength(server.clients)-listLength(server.slaves),
4355 listLength(server.slaves),
4356 server.usedmemory,
4357 server.dirty,
4358 server.bgsaveinprogress,
4359 server.lastsave,
4360 server.stat_numconnections,
4361 server.stat_numcommands,
4362 server.masterhost == NULL ? "master" : "slave"
4363 );
4364 if (server.masterhost) {
4365 info = sdscatprintf(info,
4366 "master_host:%s\r\n"
4367 "master_port:%d\r\n"
4368 "master_link_status:%s\r\n"
4369 "master_last_io_seconds_ago:%d\r\n"
4370 ,server.masterhost,
4371 server.masterport,
4372 (server.replstate == REDIS_REPL_CONNECTED) ?
4373 "up" : "down",
4374 (int)(time(NULL)-server.master->lastinteraction)
4375 );
4376 }
4377 for (j = 0; j < server.dbnum; j++) {
4378 long long keys, vkeys;
4379
4380 keys = dictSize(server.db[j].dict);
4381 vkeys = dictSize(server.db[j].expires);
4382 if (keys || vkeys) {
4383 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4384 j, keys, vkeys);
4385 }
4386 }
4387 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4388 addReplySds(c,info);
4389 addReply(c,shared.crlf);
4390 }
4391
4392 static void monitorCommand(redisClient *c) {
4393 /* ignore MONITOR if aleady slave or in monitor mode */
4394 if (c->flags & REDIS_SLAVE) return;
4395
4396 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4397 c->slaveseldb = 0;
4398 listAddNodeTail(server.monitors,c);
4399 addReply(c,shared.ok);
4400 }
4401
4402 /* ================================= Expire ================================= */
4403 static int removeExpire(redisDb *db, robj *key) {
4404 if (dictDelete(db->expires,key) == DICT_OK) {
4405 return 1;
4406 } else {
4407 return 0;
4408 }
4409 }
4410
4411 static int setExpire(redisDb *db, robj *key, time_t when) {
4412 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4413 return 0;
4414 } else {
4415 incrRefCount(key);
4416 return 1;
4417 }
4418 }
4419
4420 /* Return the expire time of the specified key, or -1 if no expire
4421 * is associated with this key (i.e. the key is non volatile) */
4422 static time_t getExpire(redisDb *db, robj *key) {
4423 dictEntry *de;
4424
4425 /* No expire? return ASAP */
4426 if (dictSize(db->expires) == 0 ||
4427 (de = dictFind(db->expires,key)) == NULL) return -1;
4428
4429 return (time_t) dictGetEntryVal(de);
4430 }
4431
4432 static int expireIfNeeded(redisDb *db, robj *key) {
4433 time_t when;
4434 dictEntry *de;
4435
4436 /* No expire? return ASAP */
4437 if (dictSize(db->expires) == 0 ||
4438 (de = dictFind(db->expires,key)) == NULL) return 0;
4439
4440 /* Lookup the expire */
4441 when = (time_t) dictGetEntryVal(de);
4442 if (time(NULL) <= when) return 0;
4443
4444 /* Delete the key */
4445 dictDelete(db->expires,key);
4446 return dictDelete(db->dict,key) == DICT_OK;
4447 }
4448
4449 static int deleteIfVolatile(redisDb *db, robj *key) {
4450 dictEntry *de;
4451
4452 /* No expire? return ASAP */
4453 if (dictSize(db->expires) == 0 ||
4454 (de = dictFind(db->expires,key)) == NULL) return 0;
4455
4456 /* Delete the key */
4457 server.dirty++;
4458 dictDelete(db->expires,key);
4459 return dictDelete(db->dict,key) == DICT_OK;
4460 }
4461
4462 static void expireCommand(redisClient *c) {
4463 dictEntry *de;
4464 int seconds = atoi(c->argv[2]->ptr);
4465
4466 de = dictFind(c->db->dict,c->argv[1]);
4467 if (de == NULL) {
4468 addReply(c,shared.czero);
4469 return;
4470 }
4471 if (seconds <= 0) {
4472 addReply(c, shared.czero);
4473 return;
4474 } else {
4475 time_t when = time(NULL)+seconds;
4476 if (setExpire(c->db,c->argv[1],when)) {
4477 addReply(c,shared.cone);
4478 server.dirty++;
4479 } else {
4480 addReply(c,shared.czero);
4481 }
4482 return;
4483 }
4484 }
4485
4486 static void ttlCommand(redisClient *c) {
4487 time_t expire;
4488 int ttl = -1;
4489
4490 expire = getExpire(c->db,c->argv[1]);
4491 if (expire != -1) {
4492 ttl = (int) (expire-time(NULL));
4493 if (ttl < 0) ttl = -1;
4494 }
4495 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4496 }
4497
4498 static void msetGenericCommand(redisClient *c, int nx) {
4499 int j;
4500
4501 if ((c->argc % 2) == 0) {
4502 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4503 return;
4504 }
4505 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4506 * set nothing at all if at least one already key exists. */
4507 if (nx) {
4508 for (j = 1; j < c->argc; j += 2) {
4509 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4510 addReply(c, shared.czero);
4511 return;
4512 }
4513 }
4514 }
4515
4516 for (j = 1; j < c->argc; j += 2) {
4517 int retval;
4518
4519 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4520 if (retval == DICT_ERR) {
4521 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4522 incrRefCount(c->argv[j+1]);
4523 } else {
4524 incrRefCount(c->argv[j]);
4525 incrRefCount(c->argv[j+1]);
4526 }
4527 removeExpire(c->db,c->argv[j]);
4528 }
4529 server.dirty += (c->argc-1)/2;
4530 addReply(c, nx ? shared.cone : shared.ok);
4531 }
4532
4533 static void msetCommand(redisClient *c) {
4534 msetGenericCommand(c,0);
4535 }
4536
4537 static void msetnxCommand(redisClient *c) {
4538 msetGenericCommand(c,1);
4539 }
4540
4541 /* =============================== Replication ============================= */
4542
4543 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4544 ssize_t nwritten, ret = size;
4545 time_t start = time(NULL);
4546
4547 timeout++;
4548 while(size) {
4549 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4550 nwritten = write(fd,ptr,size);
4551 if (nwritten == -1) return -1;
4552 ptr += nwritten;
4553 size -= nwritten;
4554 }
4555 if ((time(NULL)-start) > timeout) {
4556 errno = ETIMEDOUT;
4557 return -1;
4558 }
4559 }
4560 return ret;
4561 }
4562
4563 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4564 ssize_t nread, totread = 0;
4565 time_t start = time(NULL);
4566
4567 timeout++;
4568 while(size) {
4569 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4570 nread = read(fd,ptr,size);
4571 if (nread == -1) return -1;
4572 ptr += nread;
4573 size -= nread;
4574 totread += nread;
4575 }
4576 if ((time(NULL)-start) > timeout) {
4577 errno = ETIMEDOUT;
4578 return -1;
4579 }
4580 }
4581 return totread;
4582 }
4583
/* Read a line from 'fd' one byte at a time using syncRead(), storing it as
 * a NUL terminated string in the 'size' bytes buffer 'ptr'.
 *
 * The trailing "\n" (and the "\r" right before it, if any) is not stored.
 * At most size-1 characters are stored: if the buffer fills up before a
 * newline is seen the function returns what it has read so far.
 *
 * Returns the number of characters stored before the newline (the "\r",
 * when present, is included in the count), or -1 if syncRead() fails.
 * 'timeout' is passed to syncRead() for every single byte. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0; /* no room, not even for the terminator */
    *ptr = '\0';
    size--; /* reserve one byte for the NUL terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* Account for the stored byte: the counter was never updated
             * before, so a line longer than the buffer overflowed it. */
            size--;
        }
    }
    return nread;
}
4604
4605 static void syncCommand(redisClient *c) {
4606 /* ignore SYNC if aleady slave or in monitor mode */
4607 if (c->flags & REDIS_SLAVE) return;
4608
4609 /* SYNC can't be issued when the server has pending data to send to
4610 * the client about already issued commands. We need a fresh reply
4611 * buffer registering the differences between the BGSAVE and the current
4612 * dataset, so that we can copy to other slaves if needed. */
4613 if (listLength(c->reply) != 0) {
4614 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4615 return;
4616 }
4617
4618 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4619 /* Here we need to check if there is a background saving operation
4620 * in progress, or if it is required to start one */
4621 if (server.bgsaveinprogress) {
4622 /* Ok a background save is in progress. Let's check if it is a good
4623 * one for replication, i.e. if there is another slave that is
4624 * registering differences since the server forked to save */
4625 redisClient *slave;
4626 listNode *ln;
4627
4628 listRewind(server.slaves);
4629 while((ln = listYield(server.slaves))) {
4630 slave = ln->value;
4631 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4632 }
4633 if (ln) {
4634 /* Perfect, the server is already registering differences for
4635 * another slave. Set the right state, and copy the buffer. */
4636 listRelease(c->reply);
4637 c->reply = listDup(slave->reply);
4638 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4639 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4640 } else {
4641 /* No way, we need to wait for the next BGSAVE in order to
4642 * register differences */
4643 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4644 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4645 }
4646 } else {
4647 /* Ok we don't have a BGSAVE in progress, let's start one */
4648 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4649 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4650 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4651 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4652 return;
4653 }
4654 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4655 }
4656 c->repldbfd = -1;
4657 c->flags |= REDIS_SLAVE;
4658 c->slaveseldb = 0;
4659 listAddNodeTail(server.slaves,c);
4660 return;
4661 }
4662
4663 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4664 redisClient *slave = privdata;
4665 REDIS_NOTUSED(el);
4666 REDIS_NOTUSED(mask);
4667 char buf[REDIS_IOBUF_LEN];
4668 ssize_t nwritten, buflen;
4669
4670 if (slave->repldboff == 0) {
4671 /* Write the bulk write count before to transfer the DB. In theory here
4672 * we don't know how much room there is in the output buffer of the
4673 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4674 * operations) will never be smaller than the few bytes we need. */
4675 sds bulkcount;
4676
4677 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4678 slave->repldbsize);
4679 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4680 {
4681 sdsfree(bulkcount);
4682 freeClient(slave);
4683 return;
4684 }
4685 sdsfree(bulkcount);
4686 }
4687 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4688 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4689 if (buflen <= 0) {
4690 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4691 (buflen == 0) ? "premature EOF" : strerror(errno));
4692 freeClient(slave);
4693 return;
4694 }
4695 if ((nwritten = write(fd,buf,buflen)) == -1) {
4696 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4697 strerror(errno));
4698 freeClient(slave);
4699 return;
4700 }
4701 slave->repldboff += nwritten;
4702 if (slave->repldboff == slave->repldbsize) {
4703 close(slave->repldbfd);
4704 slave->repldbfd = -1;
4705 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4706 slave->replstate = REDIS_REPL_ONLINE;
4707 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4708 sendReplyToClient, slave, NULL) == AE_ERR) {
4709 freeClient(slave);
4710 return;
4711 }
4712 addReplySds(slave,sdsempty());
4713 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4714 }
4715 }
4716
4717 /* This function is called at the end of every backgrond saving.
4718 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4719 * otherwise REDIS_ERR is passed to the function.
4720 *
4721 * The goal of this function is to handle slaves waiting for a successful
4722 * background saving in order to perform non-blocking synchronization. */
4723 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4724 listNode *ln;
4725 int startbgsave = 0;
4726
4727 listRewind(server.slaves);
4728 while((ln = listYield(server.slaves))) {
4729 redisClient *slave = ln->value;
4730
4731 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4732 startbgsave = 1;
4733 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4734 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4735 struct redis_stat buf;
4736
4737 if (bgsaveerr != REDIS_OK) {
4738 freeClient(slave);
4739 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4740 continue;
4741 }
4742 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4743 redis_fstat(slave->repldbfd,&buf) == -1) {
4744 freeClient(slave);
4745 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4746 continue;
4747 }
4748 slave->repldboff = 0;
4749 slave->repldbsize = buf.st_size;
4750 slave->replstate = REDIS_REPL_SEND_BULK;
4751 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4752 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4753 freeClient(slave);
4754 continue;
4755 }
4756 }
4757 }
4758 if (startbgsave) {
4759 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4760 listRewind(server.slaves);
4761 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4762 while((ln = listYield(server.slaves))) {
4763 redisClient *slave = ln->value;
4764
4765 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4766 freeClient(slave);
4767 }
4768 }
4769 }
4770 }
4771
4772 static int syncWithMaster(void) {
4773 char buf[1024], tmpfile[256];
4774 int dumpsize;
4775 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4776 int dfd;
4777
4778 if (fd == -1) {
4779 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4780 strerror(errno));
4781 return REDIS_ERR;
4782 }
4783 /* Issue the SYNC command */
4784 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4785 close(fd);
4786 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4787 strerror(errno));
4788 return REDIS_ERR;
4789 }
4790 /* Read the bulk write count */
4791 if (syncReadLine(fd,buf,1024,3600) == -1) {
4792 close(fd);
4793 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4794 strerror(errno));
4795 return REDIS_ERR;
4796 }
4797 dumpsize = atoi(buf+1);
4798 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4799 /* Read the bulk write data on a temp file */
4800 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4801 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4802 if (dfd == -1) {
4803 close(fd);
4804 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4805 return REDIS_ERR;
4806 }
4807 while(dumpsize) {
4808 int nread, nwritten;
4809
4810 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4811 if (nread == -1) {
4812 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4813 strerror(errno));
4814 close(fd);
4815 close(dfd);
4816 return REDIS_ERR;
4817 }
4818 nwritten = write(dfd,buf,nread);
4819 if (nwritten == -1) {
4820 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4821 close(fd);
4822 close(dfd);
4823 return REDIS_ERR;
4824 }
4825 dumpsize -= nread;
4826 }
4827 close(dfd);
4828 if (rename(tmpfile,server.dbfilename) == -1) {
4829 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4830 unlink(tmpfile);
4831 close(fd);
4832 return REDIS_ERR;
4833 }
4834 emptyDb();
4835 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4836 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4837 close(fd);
4838 return REDIS_ERR;
4839 }
4840 server.master = createClient(fd);
4841 server.master->flags |= REDIS_MASTER;
4842 server.replstate = REDIS_REPL_CONNECTED;
4843 return REDIS_OK;
4844 }
4845
4846 static void slaveofCommand(redisClient *c) {
4847 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4848 !strcasecmp(c->argv[2]->ptr,"one")) {
4849 if (server.masterhost) {
4850 sdsfree(server.masterhost);
4851 server.masterhost = NULL;
4852 if (server.master) freeClient(server.master);
4853 server.replstate = REDIS_REPL_NONE;
4854 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4855 }
4856 } else {
4857 sdsfree(server.masterhost);
4858 server.masterhost = sdsdup(c->argv[1]->ptr);
4859 server.masterport = atoi(c->argv[2]->ptr);
4860 if (server.master) freeClient(server.master);
4861 server.replstate = REDIS_REPL_CONNECT;
4862 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4863 server.masterhost, server.masterport);
4864 }
4865 addReply(c,shared.ok);
4866 }
4867
4868 /* ============================ Maxmemory directive ======================== */
4869
4870 /* This function gets called when 'maxmemory' is set on the config file to limit
4871 * the max memory used by the server, and we are out of memory.
4872 * This function will try to, in order:
4873 *
4874 * - Free objects from the free list
4875 * - Try to remove keys with an EXPIRE set
4876 *
4877 * It is not possible to free enough memory to reach used-memory < maxmemory
4878 * the server will start refusing commands that will enlarge even more the
4879 * memory usage.
4880 */
4881 static void freeMemoryIfNeeded(void) {
4882 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4883 if (listLength(server.objfreelist)) {
4884 robj *o;
4885
4886 listNode *head = listFirst(server.objfreelist);
4887 o = listNodeValue(head);
4888 listDelNode(server.objfreelist,head);
4889 zfree(o);
4890 } else {
4891 int j, k, freed = 0;
4892
4893 for (j = 0; j < server.dbnum; j++) {
4894 int minttl = -1;
4895 robj *minkey = NULL;
4896 struct dictEntry *de;
4897
4898 if (dictSize(server.db[j].expires)) {
4899 freed = 1;
4900 /* From a sample of three keys drop the one nearest to
4901 * the natural expire */
4902 for (k = 0; k < 3; k++) {
4903 time_t t;
4904
4905 de = dictGetRandomKey(server.db[j].expires);
4906 t = (time_t) dictGetEntryVal(de);
4907 if (minttl == -1 || t < minttl) {
4908 minkey = dictGetEntryKey(de);
4909 minttl = t;
4910 }
4911 }
4912 deleteKey(server.db+j,minkey);
4913 }
4914 }
4915 if (!freed) return; /* nothing to free... */
4916 }
4917 }
4918 }
4919
4920 /* ================================= Debugging ============================== */
4921
4922 static void debugCommand(redisClient *c) {
4923 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4924 *((char*)-1) = 'x';
4925 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4926 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4927 robj *key, *val;
4928
4929 if (!de) {
4930 addReply(c,shared.nokeyerr);
4931 return;
4932 }
4933 key = dictGetEntryKey(de);
4934 val = dictGetEntryVal(de);
4935 addReplySds(c,sdscatprintf(sdsempty(),
4936 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4937 key, key->refcount, val, val->refcount, val->encoding));
4938 } else {
4939 addReplySds(c,sdsnew(
4940 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4941 }
4942 }
4943
4944 #ifdef HAVE_BACKTRACE
4945 static struct redisFunctionSym symsTable[] = {
4946 {"compareStringObjects", (unsigned long)compareStringObjects},
4947 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4948 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4949 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4950 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4951 {"freeStringObject", (unsigned long)freeStringObject},
4952 {"freeListObject", (unsigned long)freeListObject},
4953 {"freeSetObject", (unsigned long)freeSetObject},
4954 {"decrRefCount", (unsigned long)decrRefCount},
4955 {"createObject", (unsigned long)createObject},
4956 {"freeClient", (unsigned long)freeClient},
4957 {"rdbLoad", (unsigned long)rdbLoad},
4958 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4959 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4960 {"addReply", (unsigned long)addReply},
4961 {"addReplySds", (unsigned long)addReplySds},
4962 {"incrRefCount", (unsigned long)incrRefCount},
4963 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4964 {"createStringObject", (unsigned long)createStringObject},
4965 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4966 {"syncWithMaster", (unsigned long)syncWithMaster},
4967 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4968 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4969 {"getDecodedObject", (unsigned long)getDecodedObject},
4970 {"removeExpire", (unsigned long)removeExpire},
4971 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4972 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4973 {"deleteKey", (unsigned long)deleteKey},
4974 {"getExpire", (unsigned long)getExpire},
4975 {"setExpire", (unsigned long)setExpire},
4976 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4977 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4978 {"authCommand", (unsigned long)authCommand},
4979 {"pingCommand", (unsigned long)pingCommand},
4980 {"echoCommand", (unsigned long)echoCommand},
4981 {"setCommand", (unsigned long)setCommand},
4982 {"setnxCommand", (unsigned long)setnxCommand},
4983 {"getCommand", (unsigned long)getCommand},
4984 {"delCommand", (unsigned long)delCommand},
4985 {"existsCommand", (unsigned long)existsCommand},
4986 {"incrCommand", (unsigned long)incrCommand},
4987 {"decrCommand", (unsigned long)decrCommand},
4988 {"incrbyCommand", (unsigned long)incrbyCommand},
4989 {"decrbyCommand", (unsigned long)decrbyCommand},
4990 {"selectCommand", (unsigned long)selectCommand},
4991 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4992 {"keysCommand", (unsigned long)keysCommand},
4993 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4994 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4995 {"saveCommand", (unsigned long)saveCommand},
4996 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4997 {"shutdownCommand", (unsigned long)shutdownCommand},
4998 {"moveCommand", (unsigned long)moveCommand},
4999 {"renameCommand", (unsigned long)renameCommand},
5000 {"renamenxCommand", (unsigned long)renamenxCommand},
5001 {"lpushCommand", (unsigned long)lpushCommand},
5002 {"rpushCommand", (unsigned long)rpushCommand},
5003 {"lpopCommand", (unsigned long)lpopCommand},
5004 {"rpopCommand", (unsigned long)rpopCommand},
5005 {"llenCommand", (unsigned long)llenCommand},
5006 {"lindexCommand", (unsigned long)lindexCommand},
5007 {"lrangeCommand", (unsigned long)lrangeCommand},
5008 {"ltrimCommand", (unsigned long)ltrimCommand},
5009 {"typeCommand", (unsigned long)typeCommand},
5010 {"lsetCommand", (unsigned long)lsetCommand},
5011 {"saddCommand", (unsigned long)saddCommand},
5012 {"sremCommand", (unsigned long)sremCommand},
5013 {"smoveCommand", (unsigned long)smoveCommand},
5014 {"sismemberCommand", (unsigned long)sismemberCommand},
5015 {"scardCommand", (unsigned long)scardCommand},
5016 {"spopCommand", (unsigned long)spopCommand},
5017 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5018 {"sinterCommand", (unsigned long)sinterCommand},
5019 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5020 {"sunionCommand", (unsigned long)sunionCommand},
5021 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5022 {"sdiffCommand", (unsigned long)sdiffCommand},
5023 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5024 {"syncCommand", (unsigned long)syncCommand},
5025 {"flushdbCommand", (unsigned long)flushdbCommand},
5026 {"flushallCommand", (unsigned long)flushallCommand},
5027 {"sortCommand", (unsigned long)sortCommand},
5028 {"lremCommand", (unsigned long)lremCommand},
5029 {"infoCommand", (unsigned long)infoCommand},
5030 {"mgetCommand", (unsigned long)mgetCommand},
5031 {"monitorCommand", (unsigned long)monitorCommand},
5032 {"expireCommand", (unsigned long)expireCommand},
5033 {"getsetCommand", (unsigned long)getsetCommand},
5034 {"ttlCommand", (unsigned long)ttlCommand},
5035 {"slaveofCommand", (unsigned long)slaveofCommand},
5036 {"debugCommand", (unsigned long)debugCommand},
5037 {"processCommand", (unsigned long)processCommand},
5038 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5039 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5040 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5041 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5042 {"msetCommand", (unsigned long)msetCommand},
5043 {"msetnxCommand", (unsigned long)msetnxCommand},
5044 {"zslCreateNode", (unsigned long)zslCreateNode},
5045 {"zslCreate", (unsigned long)zslCreate},
5046 {"zslFreeNode",(unsigned long)zslFreeNode},
5047 {"zslFree",(unsigned long)zslFree},
5048 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5049 {"zslInsert",(unsigned long)zslInsert},
5050 {"zslDelete",(unsigned long)zslDelete},
5051 {"createZsetObject",(unsigned long)createZsetObject},
5052 {"zaddCommand",(unsigned long)zaddCommand},
5053 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5054 {"zrangeCommand",(unsigned long)zrangeCommand},
5055 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5056 {"zremCommand",(unsigned long)zremCommand},
5057 {NULL,0}
5058 };
5059
5060 /* This function try to convert a pointer into a function name. It's used in
5061 * oreder to provide a backtrace under segmentation fault that's able to
5062 * display functions declared as static (otherwise the backtrace is useless). */
5063 static char *findFuncName(void *pointer, unsigned long *offset){
5064 int i, ret = -1;
5065 unsigned long off, minoff = 0;
5066
5067 /* Try to match against the Symbol with the smallest offset */
5068 for (i=0; symsTable[i].pointer; i++) {
5069 unsigned long lp = (unsigned long) pointer;
5070
5071 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5072 off=lp-symsTable[i].pointer;
5073 if (ret < 0 || off < minoff) {
5074 minoff=off;
5075 ret=i;
5076 }
5077 }
5078 }
5079 if (ret == -1) return NULL;
5080 *offset = minoff;
5081 return symsTable[ret].name;
5082 }
5083
/* Return the instruction pointer saved in the signal context 'uc', that is,
 * the address of the instruction that was executing when the signal was
 * delivered. The register lives in a different field on every platform, so
 * the right accessor is selected at compile time. NULL is returned on
 * platforms for which no accessor is known. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__x86_64__) || defined(__X86_64__) /* Linux x86-64 */
    /* Compilers define __x86_64__ (lowercase): the uppercase spelling alone
     * never matched, and 64 bit targets have no REG_EIP, the instruction
     * pointer is REG_RIP there. */
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
5105
5106 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5107 void *trace[100];
5108 char **messages = NULL;
5109 int i, trace_size = 0;
5110 unsigned long offset=0;
5111 time_t uptime = time(NULL)-server.stat_starttime;
5112 ucontext_t *uc = (ucontext_t*) secret;
5113 REDIS_NOTUSED(info);
5114
5115 redisLog(REDIS_WARNING,
5116 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5117 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5118 "redis_version:%s; "
5119 "uptime_in_seconds:%d; "
5120 "connected_clients:%d; "
5121 "connected_slaves:%d; "
5122 "used_memory:%zu; "
5123 "changes_since_last_save:%lld; "
5124 "bgsave_in_progress:%d; "
5125 "last_save_time:%d; "
5126 "total_connections_received:%lld; "
5127 "total_commands_processed:%lld; "
5128 "role:%s;"
5129 ,REDIS_VERSION,
5130 uptime,
5131 listLength(server.clients)-listLength(server.slaves),
5132 listLength(server.slaves),
5133 server.usedmemory,
5134 server.dirty,
5135 server.bgsaveinprogress,
5136 server.lastsave,
5137 server.stat_numconnections,
5138 server.stat_numcommands,
5139 server.masterhost == NULL ? "master" : "slave"
5140 ));
5141
5142 trace_size = backtrace(trace, 100);
5143 /* overwrite sigaction with caller's address */
5144 if (getMcontextEip(uc) != NULL) {
5145 trace[1] = getMcontextEip(uc);
5146 }
5147 messages = backtrace_symbols(trace, trace_size);
5148
5149 for (i=1; i<trace_size; ++i) {
5150 char *fn = findFuncName(trace[i], &offset), *p;
5151
5152 p = strchr(messages[i],'+');
5153 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5154 redisLog(REDIS_WARNING,"%s", messages[i]);
5155 } else {
5156 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5157 }
5158 }
5159 free(messages);
5160 exit(0);
5161 }
5162
5163 static void setupSigSegvAction(void) {
5164 struct sigaction act;
5165
5166 sigemptyset (&act.sa_mask);
5167 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5168 * is used. Otherwise, sa_handler is used */
5169 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5170 act.sa_sigaction = segvHandler;
5171 sigaction (SIGSEGV, &act, NULL);
5172 sigaction (SIGBUS, &act, NULL);
5173 sigaction (SIGFPE, &act, NULL);
5174 sigaction (SIGILL, &act, NULL);
5175 sigaction (SIGBUS, &act, NULL);
5176 return;
5177 }
5178 #else /* HAVE_BACKTRACE */
/* Variant used when HAVE_BACKTRACE is not defined: no signal handler is
 * installed, the function only exists so that callers need no #ifdef. */
static void setupSigSegvAction(void) {
    return;
}
5181 #endif /* HAVE_BACKTRACE */
5182
5183 /* =================================== Main! ================================ */
5184
5185 #ifdef __linux__
/* Read the current vm.overcommit_memory setting from
 * /proc/sys/vm/overcommit_memory.
 *
 * Returns the value found in the file, or -1 if the file can't be opened,
 * can't be read, or does not start with a number that fits in an int. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long val;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    /* atoi() would turn unparsable content into 0, which is exactly the
     * value the caller warns about: report a parse failure as -1 instead. */
    val = strtol(buf,&end,10);
    if (end == buf || val != (long)(int)val) return -1;
    return (int)val;
}
5199
5200 void linuxOvercommitMemoryWarning(void) {
5201 if (linuxOvercommitMemoryValue() == 0) {
5202 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5203 }
5204 }
5205 #endif /* __linux__ */
5206
5207 static void daemonize(void) {
5208 int fd;
5209 FILE *fp;
5210
5211 if (fork() != 0) exit(0); /* parent exits */
5212 setsid(); /* create a new session */
5213
5214 /* Every output goes to /dev/null. If Redis is daemonized but
5215 * the 'logfile' is set to 'stdout' in the configuration file
5216 * it will not log at all. */
5217 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5218 dup2(fd, STDIN_FILENO);
5219 dup2(fd, STDOUT_FILENO);
5220 dup2(fd, STDERR_FILENO);
5221 if (fd > STDERR_FILENO) close(fd);
5222 }
5223 /* Try to write the pid file */
5224 fp = fopen(server.pidfile,"w");
5225 if (fp) {
5226 fprintf(fp,"%d\n",getpid());
5227 fclose(fp);
5228 }
5229 }
5230
5231 int main(int argc, char **argv) {
5232 initServerConfig();
5233 if (argc == 2) {
5234 ResetServerSaveParams();
5235 loadServerConfig(argv[1]);
5236 } else if (argc > 2) {
5237 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5238 exit(1);
5239 } else {
5240 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5241 }
5242 initServer();
5243 if (server.daemonize) daemonize();
5244 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5245 #ifdef __linux__
5246 linuxOvercommitMemoryWarning();
5247 #endif
5248 if (rdbLoad(server.dbfilename) == REDIS_OK)
5249 redisLog(REDIS_NOTICE,"DB loaded from disk");
5250 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5251 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5252 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5253 aeMain(server.el);
5254 aeDeleteEventLoop(server.el);
5255 return 0;
5256 }