]> git.saurik.com Git - redis.git/blob - redis.c
83318443171228bfe700088940f37f430c5e08c4
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string reported by this server build. */
#define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60
61 #include "redis.h"
62 #include "ae.h" /* Event driven programming library */
63 #include "sds.h" /* Dynamic safe strings */
64 #include "anet.h" /* Networking the easy way */
65 #include "dict.h" /* Hash tables */
66 #include "adlist.h" /* Linked lists */
67 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
68 #include "lzf.h" /* LZF compression library */
69 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
70
/* Return codes of internal functions */
#define REDIS_OK                    0
#define REDIS_ERR                   -1

/* Static server configuration */
#define REDIS_SERVERPORT            6379            /* default TCP port */
#define REDIS_MAXIDLETIME           (60*5)          /* default client timeout, in seconds */
#define REDIS_IOBUF_LEN             1024
#define REDIS_LOADBUF_LEN           1024
#define REDIS_STATIC_ARGS           4
#define REDIS_DEFAULT_DBNUM         16
#define REDIS_CONFIGLINE_MAX        1024
#define REDIS_OBJFREELIST_MAX       1000000         /* max number of freed objects kept for reuse */
#define REDIS_MAX_SYNC_TIME         60              /* a slave can't take longer than this to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100            /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT   (1024*64)
#define REDIS_REQUEST_MAX_SIZE      (1024*1024*256) /* max bytes in an inline command */
88
/* Hash table parameters */
#define REDIS_HT_MINFILL    10  /* minimal hash table fill: 10% */

/* Command flags, OR-ed together in redisCommand.flags */
#define REDIS_CMD_BULK      1   /* bulk write command */
#define REDIS_CMD_INLINE    2   /* inline command */
/* REDIS_CMD_DENYOOM: commands carrying this flag return an error when the
 * 'maxmemory' option is set in the config file and the server is using
 * more than maxmemory bytes. In short, these commands are refused under
 * low-memory conditions. */
#define REDIS_CMD_DENYOOM   4
100
/* Object types (robj.type) */
#define REDIS_STRING        0
#define REDIS_LIST          1
#define REDIS_SET           2
#define REDIS_ZSET          3
#define REDIS_HASH          4

/* Object encodings (robj.encoding) */
#define REDIS_ENCODING_RAW  0   /* raw representation */
#define REDIS_ENCODING_INT  1   /* encoded as an integer */

/* Opcodes that only appear in the on-disk dump */
#define REDIS_EXPIRETIME    253
#define REDIS_SELECTDB      254
#define REDIS_EOF           255
116
/* Defines related to the dump file format. Storing a full 32 bit length
 * for every short key would waste a lot of space, so the two most
 * significant bits of the first byte select how the length is encoded:
 *
 * 00|000000 => the length is the remaining 6 bits of this byte
 * 01|000000 00000000 => the length is 14 bits: these 6 bits plus the 8 bits
 *                       of the next byte
 * 10|000000 [32 bit integer] => a full 32 bit length follows
 * 11|000000 => a specially encoded object follows; the remaining six bits
 *              specify its kind (see the REDIS_RDB_ENC_* defines)
 *
 * Lengths up to 63 are stored in a single byte: most DB keys, and many
 * values, fit in it. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When the length of a string object stored on disk has its two most
 * significant bits set (REDIS_RDB_ENCVAL), the remaining six bits select a
 * special encoding for the object according to the following defines: */
#define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3         /* string compressed with LZF */
143
/* Client flags (redisClient.flags), combinable as a bit mask */
#define REDIS_CLOSE     1   /* close this client connection ASAP */
#define REDIS_SLAVE     2   /* the client is a slave server */
#define REDIS_MASTER    4   /* the client is a master server */
#define REDIS_MONITOR   8   /* the client is a slave monitor, see MONITOR */

/* Replication state, slave side (server.replstate) */
#define REDIS_REPL_NONE         0   /* no active replication */
#define REDIS_REPL_CONNECT      1   /* must connect to the master */
#define REDIS_REPL_CONNECTED    2   /* connected to the master */

/* Replication state of a slave as seen by the master (redisClient.replstate).
 * In SEND_BULK and ONLINE the slave accumulates new updates in its output
 * queue; in the WAIT_BGSAVE states the master is instead waiting for a
 * background save before it can feed the slave. */
#define REDIS_REPL_WAIT_BGSAVE_START    3   /* waiting for a bgsave to start */
#define REDIS_REPL_WAIT_BGSAVE_END      4   /* waiting for the bgsave to end before sending the DB */
#define REDIS_REPL_SEND_BULK            5   /* the bulk DB is being sent */
#define REDIS_REPL_ONLINE               6   /* bulk DB sent, receiving updates */
163
/* List ends */
#define REDIS_HEAD          0
#define REDIS_TAIL          1

/* SORT operations and ordering */
#define REDIS_SORT_GET      0
#define REDIS_SORT_DEL      1
#define REDIS_SORT_INCR     2
#define REDIS_SORT_DECR     3
#define REDIS_SORT_ASC      4
#define REDIS_SORT_DESC     5
#define REDIS_SORTKEY_MAX   1024

/* Log levels, lowest (most verbose) first */
#define REDIS_DEBUG         0
#define REDIS_NOTICE        1
#define REDIS_WARNING       2

/* Silences unused-variable/parameter warnings */
#define REDIS_NOTUSED(V) ((void) V)

/* Skiplist tuning for sorted sets */
#define ZSKIPLIST_MAXLEVEL  32      /* should be enough for 2^32 elements */
#define ZSKIPLIST_P         0.25    /* skiplist P = 1/4 */
187
/*================================= Data types ============================== */

/* A Redis object: a typed, reference-counted handle to a value such as a
 * string, list or set. */
typedef struct redisObject {
    void *ptr;                      /* the value; meaning depends on type/encoding */
    unsigned char type, encoding;   /* REDIS_<type> and REDIS_ENCODING_* constants */
    unsigned char notused[2];       /* unused bytes */
    int refcount;                   /* number of references held */
} robj;
198
199 typedef struct redisDb {
200 dict *dict;
201 dict *expires;
202 int id;
203 } redisDb;
204
205 /* With multiplexing we need to take per-clinet state.
206 * Clients are taken in a liked list. */
207 typedef struct redisClient {
208 int fd;
209 redisDb *db;
210 int dictid;
211 sds querybuf;
212 robj **argv, **mbargv;
213 int argc, mbargc;
214 int bulklen; /* bulk read len. -1 if not in bulk read mode */
215 int multibulk; /* multi bulk command format active */
216 list *reply;
217 int sentlen;
218 time_t lastinteraction; /* time of the last interaction, used for timeout */
219 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
220 int slaveseldb; /* slave selected db, if this client is a slave */
221 int authenticated; /* when requirepass is non-NULL */
222 int replstate; /* replication state if this is a slave */
223 int repldbfd; /* replication DB file descriptor */
224 long repldboff; /* replication DB file offset */
225 off_t repldbsize; /* replication DB file size */
226 } redisClient;
227
/* A save point: serverCron() starts a background save once at least
 * 'changes' writes happened and more than 'seconds' seconds elapsed since
 * the last successful save. */
struct saveparam {
    time_t seconds;
    int changes;
};
232
233 /* Global server state structure */
234 struct redisServer {
235 int port;
236 int fd;
237 redisDb *db;
238 dict *sharingpool;
239 unsigned int sharingpoolsize;
240 long long dirty; /* changes to DB from the last save */
241 list *clients;
242 list *slaves, *monitors;
243 char neterr[ANET_ERR_LEN];
244 aeEventLoop *el;
245 int cronloops; /* number of times the cron function run */
246 list *objfreelist; /* A list of freed objects to avoid malloc() */
247 time_t lastsave; /* Unix time of last save succeeede */
248 size_t usedmemory; /* Used memory in megabytes */
249 /* Fields used only for stats */
250 time_t stat_starttime; /* server start time */
251 long long stat_numcommands; /* number of processed commands */
252 long long stat_numconnections; /* number of connections received */
253 /* Configuration */
254 int verbosity;
255 int glueoutputbuf;
256 int maxidletime;
257 int dbnum;
258 int daemonize;
259 char *pidfile;
260 int bgsaveinprogress;
261 pid_t bgsavechildpid;
262 struct saveparam *saveparams;
263 int saveparamslen;
264 char *logfile;
265 char *bindaddr;
266 char *dbfilename;
267 char *requirepass;
268 int shareobjects;
269 /* Replication related */
270 int isslave;
271 char *masterhost;
272 int masterport;
273 redisClient *master; /* client that is master for this slave */
274 int replstate;
275 unsigned int maxclients;
276 unsigned long maxmemory;
277 /* Sort parameters - qsort_r() is only available under BSD so we
278 * have to take this state global, in order to pass it to sortCompare() */
279 int sort_desc;
280 int sort_alpha;
281 int sort_bypattern;
282 };
283
284 typedef void redisCommandProc(redisClient *c);
285 struct redisCommand {
286 char *name;
287 redisCommandProc *proc;
288 int arity;
289 int flags;
290 };
291
/* A (function name, address) pair.
 * NOTE(review): presumably used to symbolize addresses in crash reports;
 * confirm against its users. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
296
297 typedef struct _redisSortObject {
298 robj *obj;
299 union {
300 double score;
301 robj *cmpobj;
302 } u;
303 } redisSortObject;
304
305 typedef struct _redisSortOperation {
306 int type;
307 robj *pattern;
308 } redisSortOperation;
309
310 /* ZSETs use a specialized version of Skiplists */
311
312 typedef struct zskiplistNode {
313 struct zskiplistNode **forward;
314 double score;
315 robj *obj;
316 } zskiplistNode;
317
318 typedef struct zskiplist {
319 struct zskiplistNode *header;
320 long length;
321 int level;
322 } zskiplist;
323
324 typedef struct zset {
325 dict *dict;
326 zskiplist *zsl;
327 } zset;
328
329 /* Our shared "common" objects */
330
331 struct sharedObjectsStruct {
332 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
333 *colon, *nullbulk, *nullmultibulk,
334 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
335 *outofrangeerr, *plus,
336 *select0, *select1, *select2, *select3, *select4,
337 *select5, *select6, *select7, *select8, *select9;
338 } shared;
339
340 /*================================ Prototypes =============================== */
341
342 static void freeStringObject(robj *o);
343 static void freeListObject(robj *o);
344 static void freeSetObject(robj *o);
345 static void decrRefCount(void *o);
346 static robj *createObject(int type, void *ptr);
347 static void freeClient(redisClient *c);
348 static int rdbLoad(char *filename);
349 static void addReply(redisClient *c, robj *obj);
350 static void addReplySds(redisClient *c, sds s);
351 static void incrRefCount(robj *o);
352 static int rdbSaveBackground(char *filename);
353 static robj *createStringObject(char *ptr, size_t len);
354 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
355 static int syncWithMaster(void);
356 static robj *tryObjectSharing(robj *o);
357 static int tryObjectEncoding(robj *o);
358 static robj *getDecodedObject(const robj *o);
359 static int removeExpire(redisDb *db, robj *key);
360 static int expireIfNeeded(redisDb *db, robj *key);
361 static int deleteIfVolatile(redisDb *db, robj *key);
362 static int deleteKey(redisDb *db, robj *key);
363 static time_t getExpire(redisDb *db, robj *key);
364 static int setExpire(redisDb *db, robj *key, time_t when);
365 static void updateSlavesWaitingBgsave(int bgsaveerr);
366 static void freeMemoryIfNeeded(void);
367 static int processCommand(redisClient *c);
368 static void setupSigSegvAction(void);
369 static void rdbRemoveTempFile(pid_t childpid);
370 static size_t stringObjectLen(robj *o);
371 static void processInputBuffer(redisClient *c);
372 static zskiplist *zslCreate(void);
373 static void zslFree(zskiplist *zsl);
374
375 static void authCommand(redisClient *c);
376 static void pingCommand(redisClient *c);
377 static void echoCommand(redisClient *c);
378 static void setCommand(redisClient *c);
379 static void setnxCommand(redisClient *c);
380 static void getCommand(redisClient *c);
381 static void delCommand(redisClient *c);
382 static void existsCommand(redisClient *c);
383 static void incrCommand(redisClient *c);
384 static void decrCommand(redisClient *c);
385 static void incrbyCommand(redisClient *c);
386 static void decrbyCommand(redisClient *c);
387 static void selectCommand(redisClient *c);
388 static void randomkeyCommand(redisClient *c);
389 static void keysCommand(redisClient *c);
390 static void dbsizeCommand(redisClient *c);
391 static void lastsaveCommand(redisClient *c);
392 static void saveCommand(redisClient *c);
393 static void bgsaveCommand(redisClient *c);
394 static void shutdownCommand(redisClient *c);
395 static void moveCommand(redisClient *c);
396 static void renameCommand(redisClient *c);
397 static void renamenxCommand(redisClient *c);
398 static void lpushCommand(redisClient *c);
399 static void rpushCommand(redisClient *c);
400 static void lpopCommand(redisClient *c);
401 static void rpopCommand(redisClient *c);
402 static void llenCommand(redisClient *c);
403 static void lindexCommand(redisClient *c);
404 static void lrangeCommand(redisClient *c);
405 static void ltrimCommand(redisClient *c);
406 static void typeCommand(redisClient *c);
407 static void lsetCommand(redisClient *c);
408 static void saddCommand(redisClient *c);
409 static void sremCommand(redisClient *c);
410 static void smoveCommand(redisClient *c);
411 static void sismemberCommand(redisClient *c);
412 static void scardCommand(redisClient *c);
413 static void spopCommand(redisClient *c);
414 static void srandmemberCommand(redisClient *c);
415 static void sinterCommand(redisClient *c);
416 static void sinterstoreCommand(redisClient *c);
417 static void sunionCommand(redisClient *c);
418 static void sunionstoreCommand(redisClient *c);
419 static void sdiffCommand(redisClient *c);
420 static void sdiffstoreCommand(redisClient *c);
421 static void syncCommand(redisClient *c);
422 static void flushdbCommand(redisClient *c);
423 static void flushallCommand(redisClient *c);
424 static void sortCommand(redisClient *c);
425 static void lremCommand(redisClient *c);
426 static void infoCommand(redisClient *c);
427 static void mgetCommand(redisClient *c);
428 static void monitorCommand(redisClient *c);
429 static void expireCommand(redisClient *c);
430 static void getsetCommand(redisClient *c);
431 static void ttlCommand(redisClient *c);
432 static void slaveofCommand(redisClient *c);
433 static void debugCommand(redisClient *c);
434 static void msetCommand(redisClient *c);
435 static void msetnxCommand(redisClient *c);
436 static void zaddCommand(redisClient *c);
437 static void zrangeCommand(redisClient *c);
438 static void zlenCommand(redisClient *c);
439 static void zremCommand(redisClient *c);
440
441 /*================================= Globals ================================= */
442
443 /* Global vars */
444 static struct redisServer server; /* server global state */
445 static struct redisCommand cmdTable[] = {
446 {"get",getCommand,2,REDIS_CMD_INLINE},
447 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
448 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
449 {"del",delCommand,-2,REDIS_CMD_INLINE},
450 {"exists",existsCommand,2,REDIS_CMD_INLINE},
451 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
452 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
453 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
454 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
455 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
456 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
457 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
458 {"llen",llenCommand,2,REDIS_CMD_INLINE},
459 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
460 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
461 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
462 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
463 {"lrem",lremCommand,4,REDIS_CMD_BULK},
464 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
465 {"srem",sremCommand,3,REDIS_CMD_BULK},
466 {"smove",smoveCommand,4,REDIS_CMD_BULK},
467 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
468 {"scard",scardCommand,2,REDIS_CMD_INLINE},
469 {"spop",spopCommand,2,REDIS_CMD_INLINE},
470 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
471 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
472 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
473 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
474 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
475 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
476 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
477 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
478 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
479 {"zrem",zremCommand,3,REDIS_CMD_BULK},
480 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
481 {"zlen",zlenCommand,2,REDIS_CMD_INLINE},
482 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
483 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
484 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
485 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
486 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
487 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
488 {"select",selectCommand,2,REDIS_CMD_INLINE},
489 {"move",moveCommand,3,REDIS_CMD_INLINE},
490 {"rename",renameCommand,3,REDIS_CMD_INLINE},
491 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
492 {"expire",expireCommand,3,REDIS_CMD_INLINE},
493 {"keys",keysCommand,2,REDIS_CMD_INLINE},
494 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
495 {"auth",authCommand,2,REDIS_CMD_INLINE},
496 {"ping",pingCommand,1,REDIS_CMD_INLINE},
497 {"echo",echoCommand,2,REDIS_CMD_BULK},
498 {"save",saveCommand,1,REDIS_CMD_INLINE},
499 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
500 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
501 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
502 {"type",typeCommand,2,REDIS_CMD_INLINE},
503 {"sync",syncCommand,1,REDIS_CMD_INLINE},
504 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
505 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
506 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
507 {"info",infoCommand,1,REDIS_CMD_INLINE},
508 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
509 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
510 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
511 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
512 {NULL,NULL,0,0}
513 };
/*============================ Utility functions ============================ */

/* Glob-style pattern matching.
 *
 * Returns 1 if the stringLen bytes at 'string' match the patternLen bytes
 * at 'pattern', 0 otherwise. Supported syntax:
 *   *       any sequence of characters, including the empty one
 *   ?       exactly one character
 *   [abc]   one character from the set; [^abc] negates the set and a-z
 *           denotes a range (its bounds may be given in either order)
 *   \x      the literal character x (also inside a [...] set)
 * An unterminated '[' set is closed implicitly at the end of the pattern.
 * When 'nocase' is non-zero, letters are compared case-insensitively.
 *
 * Every access is bounded by the explicit lengths, so neither buffer has to
 * be NUL-terminated. '?', '[...]' and literal characters must consume one
 * character of the string, so they never match past its end. Bytes are
 * handed to tolower() and range comparisons as unsigned char values. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Consecutive stars behave exactly like a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern against every suffix. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set must consume one character; without this check an
             * empty string made stringLen negative ("[^a]*" matched ""). */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back onto its last byte so the
                     * common advance after the switch stays in bounds. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped character inside the set. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Only trailing stars can still match the empty remainder. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
638
639 static void redisLog(int level, const char *fmt, ...) {
640 va_list ap;
641 FILE *fp;
642
643 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
644 if (!fp) return;
645
646 va_start(ap, fmt);
647 if (level >= server.verbosity) {
648 char *c = ".-*";
649 char buf[64];
650 time_t now;
651
652 now = time(NULL);
653 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
654 fprintf(fp,"%s %c ",buf,c[level]);
655 vfprintf(fp, fmt, ap);
656 fprintf(fp,"\n");
657 fflush(fp);
658 }
659 va_end(ap);
660
661 if (server.logfile) fclose(fp);
662 }
663
/*====================== Hash table type implementation ==================== */

/* Callbacks plugged into the dictType tables below. Depending on the table,
 * keys and values are raw SDS strings or Redis objects (which may in turn
 * hold SDS strings, lists or sets). */

/* Value destructor that releases a plain allocation with zfree().
 * privdata is unused. */
static void dictVanillaFree(void *privdata, void *val)
{
    (void)privdata;
    zfree(val);
}
675
676 static int sdsDictKeyCompare(void *privdata, const void *key1,
677 const void *key2)
678 {
679 int l1,l2;
680 DICT_NOTUSED(privdata);
681
682 l1 = sdslen((sds)key1);
683 l2 = sdslen((sds)key2);
684 if (l1 != l2) return 0;
685 return memcmp(key1, key2, l1) == 0;
686 }
687
/* Destructor for dict keys/values that are Redis objects: drops one
 * reference with decrRefCount(). privdata is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void)privdata;
    decrRefCount(val);
}
694
695 static int dictObjKeyCompare(void *privdata, const void *key1,
696 const void *key2)
697 {
698 const robj *o1 = key1, *o2 = key2;
699 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
700 }
701
702 static unsigned int dictObjHash(const void *key) {
703 const robj *o = key;
704 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
705 }
706
707 static int dictEncObjKeyCompare(void *privdata, const void *key1,
708 const void *key2)
709 {
710 const robj *o1 = key1, *o2 = key2;
711
712 if (o1->encoding == REDIS_ENCODING_RAW &&
713 o2->encoding == REDIS_ENCODING_RAW)
714 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
715 else {
716 robj *dec1, *dec2;
717 int cmp;
718
719 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
720 getDecodedObject(o1) : (robj*)o1;
721 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
722 getDecodedObject(o2) : (robj*)o2;
723 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
724 if (dec1 != o1) decrRefCount(dec1);
725 if (dec2 != o2) decrRefCount(dec2);
726 return cmp;
727 }
728 }
729
730 static unsigned int dictEncObjHash(const void *key) {
731 const robj *o = key;
732
733 if (o->encoding == REDIS_ENCODING_RAW)
734 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
735 else {
736 robj *dec = getDecodedObject(o);
737 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
738 decrRefCount(dec);
739 return hash;
740 }
741 }
742
743 static dictType setDictType = {
744 dictEncObjHash, /* hash function */
745 NULL, /* key dup */
746 NULL, /* val dup */
747 dictEncObjKeyCompare, /* key compare */
748 dictRedisObjectDestructor, /* key destructor */
749 NULL /* val destructor */
750 };
751
752 static dictType zsetDictType = {
753 dictEncObjHash, /* hash function */
754 NULL, /* key dup */
755 NULL, /* val dup */
756 dictEncObjKeyCompare, /* key compare */
757 dictRedisObjectDestructor, /* key destructor */
758 dictVanillaFree /* val destructor */
759 };
760
761 static dictType hashDictType = {
762 dictObjHash, /* hash function */
763 NULL, /* key dup */
764 NULL, /* val dup */
765 dictObjKeyCompare, /* key compare */
766 dictRedisObjectDestructor, /* key destructor */
767 dictRedisObjectDestructor /* val destructor */
768 };
769
/* ========================= Random utility functions ======================= */

/* Report an out-of-memory condition and abort the process.
 *
 * Redis does not try to recover from allocation failures: the networking
 * layer itself allocates its send buffers on the heap, so the error could
 * not reliably be reported to clients anyway, and aborting keeps the rest
 * of the code simpler. Prints "<msg>: Out of memory" to stderr, flushes it,
 * sleeps one second and calls abort(). */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n", msg);
    fflush(stderr);
    sleep(1);
    abort(); /* never returns */
}
783
784 /* ====================== Redis server networking stuff ===================== */
785 static void closeTimedoutClients(void) {
786 redisClient *c;
787 listNode *ln;
788 time_t now = time(NULL);
789
790 listRewind(server.clients);
791 while ((ln = listYield(server.clients)) != NULL) {
792 c = listNodeValue(ln);
793 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
794 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
795 (now - c->lastinteraction > server.maxidletime)) {
796 redisLog(REDIS_DEBUG,"Closing idle client");
797 freeClient(c);
798 }
799 }
800 }
801
802 static int htNeedsResize(dict *dict) {
803 long long size, used;
804
805 size = dictSlots(dict);
806 used = dictSize(dict);
807 return (size && used && size > DICT_HT_INITIAL_SIZE &&
808 (used*100/size < REDIS_HT_MINFILL));
809 }
810
811 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
812 * we resize the hash table to save memory */
813 static void tryResizeHashTables(void) {
814 int j;
815
816 for (j = 0; j < server.dbnum; j++) {
817 if (htNeedsResize(server.db[j].dict)) {
818 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
819 dictResize(server.db[j].dict);
820 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
821 }
822 if (htNeedsResize(server.db[j].expires))
823 dictResize(server.db[j].expires);
824 }
825 }
826
827 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
828 int j, loops = server.cronloops++;
829 REDIS_NOTUSED(eventLoop);
830 REDIS_NOTUSED(id);
831 REDIS_NOTUSED(clientData);
832
833 /* Update the global state with the amount of used memory */
834 server.usedmemory = zmalloc_used_memory();
835
836 /* Show some info about non-empty databases */
837 for (j = 0; j < server.dbnum; j++) {
838 long long size, used, vkeys;
839
840 size = dictSlots(server.db[j].dict);
841 used = dictSize(server.db[j].dict);
842 vkeys = dictSize(server.db[j].expires);
843 if (!(loops % 5) && (used || vkeys)) {
844 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
845 /* dictPrintStats(server.dict); */
846 }
847 }
848
849 /* We don't want to resize the hash tables while a bacground saving
850 * is in progress: the saving child is created using fork() that is
851 * implemented with a copy-on-write semantic in most modern systems, so
852 * if we resize the HT while there is the saving child at work actually
853 * a lot of memory movements in the parent will cause a lot of pages
854 * copied. */
855 if (!server.bgsaveinprogress) tryResizeHashTables();
856
857 /* Show information about connected clients */
858 if (!(loops % 5)) {
859 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
860 listLength(server.clients)-listLength(server.slaves),
861 listLength(server.slaves),
862 server.usedmemory,
863 dictSize(server.sharingpool));
864 }
865
866 /* Close connections of timedout clients */
867 if (server.maxidletime && !(loops % 10))
868 closeTimedoutClients();
869
870 /* Check if a background saving in progress terminated */
871 if (server.bgsaveinprogress) {
872 int statloc;
873 if (wait4(-1,&statloc,WNOHANG,NULL)) {
874 int exitcode = WEXITSTATUS(statloc);
875 int bysignal = WIFSIGNALED(statloc);
876
877 if (!bysignal && exitcode == 0) {
878 redisLog(REDIS_NOTICE,
879 "Background saving terminated with success");
880 server.dirty = 0;
881 server.lastsave = time(NULL);
882 } else if (!bysignal && exitcode != 0) {
883 redisLog(REDIS_WARNING, "Background saving error");
884 } else {
885 redisLog(REDIS_WARNING,
886 "Background saving terminated by signal");
887 rdbRemoveTempFile(server.bgsavechildpid);
888 }
889 server.bgsaveinprogress = 0;
890 server.bgsavechildpid = -1;
891 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
892 }
893 } else {
894 /* If there is not a background saving in progress check if
895 * we have to save now */
896 time_t now = time(NULL);
897 for (j = 0; j < server.saveparamslen; j++) {
898 struct saveparam *sp = server.saveparams+j;
899
900 if (server.dirty >= sp->changes &&
901 now-server.lastsave > sp->seconds) {
902 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
903 sp->changes, sp->seconds);
904 rdbSaveBackground(server.dbfilename);
905 break;
906 }
907 }
908 }
909
910 /* Try to expire a few timed out keys */
911 for (j = 0; j < server.dbnum; j++) {
912 redisDb *db = server.db+j;
913 int num = dictSize(db->expires);
914
915 if (num) {
916 time_t now = time(NULL);
917
918 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
919 num = REDIS_EXPIRELOOKUPS_PER_CRON;
920 while (num--) {
921 dictEntry *de;
922 time_t t;
923
924 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
925 t = (time_t) dictGetEntryVal(de);
926 if (now > t) {
927 deleteKey(db,dictGetEntryKey(de));
928 }
929 }
930 }
931 }
932
933 /* Check if we should connect to a MASTER */
934 if (server.replstate == REDIS_REPL_CONNECT) {
935 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
936 if (syncWithMaster() == REDIS_OK) {
937 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
938 }
939 }
940 return 1000;
941 }
942
943 static void createSharedObjects(void) {
944 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
945 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
946 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
947 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
948 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
949 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
950 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
951 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
952 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
953 /* no such key */
954 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
955 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
956 "-ERR Operation against a key holding the wrong kind of value\r\n"));
957 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
958 "-ERR no such key\r\n"));
959 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
960 "-ERR syntax error\r\n"));
961 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
962 "-ERR source and destination objects are the same\r\n"));
963 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
964 "-ERR index out of range\r\n"));
965 shared.space = createObject(REDIS_STRING,sdsnew(" "));
966 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
967 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
968 shared.select0 = createStringObject("select 0\r\n",10);
969 shared.select1 = createStringObject("select 1\r\n",10);
970 shared.select2 = createStringObject("select 2\r\n",10);
971 shared.select3 = createStringObject("select 3\r\n",10);
972 shared.select4 = createStringObject("select 4\r\n",10);
973 shared.select5 = createStringObject("select 5\r\n",10);
974 shared.select6 = createStringObject("select 6\r\n",10);
975 shared.select7 = createStringObject("select 7\r\n",10);
976 shared.select8 = createStringObject("select 8\r\n",10);
977 shared.select9 = createStringObject("select 9\r\n",10);
978 }
979
980 static void appendServerSaveParams(time_t seconds, int changes) {
981 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
982 server.saveparams[server.saveparamslen].seconds = seconds;
983 server.saveparams[server.saveparamslen].changes = changes;
984 server.saveparamslen++;
985 }
986
987 static void ResetServerSaveParams() {
988 zfree(server.saveparams);
989 server.saveparams = NULL;
990 server.saveparamslen = 0;
991 }
992
993 static void initServerConfig() {
994 server.dbnum = REDIS_DEFAULT_DBNUM;
995 server.port = REDIS_SERVERPORT;
996 server.verbosity = REDIS_DEBUG;
997 server.maxidletime = REDIS_MAXIDLETIME;
998 server.saveparams = NULL;
999 server.logfile = NULL; /* NULL = log on standard output */
1000 server.bindaddr = NULL;
1001 server.glueoutputbuf = 1;
1002 server.daemonize = 0;
1003 server.pidfile = "/var/run/redis.pid";
1004 server.dbfilename = "dump.rdb";
1005 server.requirepass = NULL;
1006 server.shareobjects = 0;
1007 server.sharingpoolsize = 1024;
1008 server.maxclients = 0;
1009 server.maxmemory = 0;
1010 ResetServerSaveParams();
1011
1012 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1013 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1014 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1015 /* Replication related */
1016 server.isslave = 0;
1017 server.masterhost = NULL;
1018 server.masterport = 6379;
1019 server.master = NULL;
1020 server.replstate = REDIS_REPL_NONE;
1021 }
1022
1023 static void initServer() {
1024 int j;
1025
1026 signal(SIGHUP, SIG_IGN);
1027 signal(SIGPIPE, SIG_IGN);
1028 setupSigSegvAction();
1029
1030 server.clients = listCreate();
1031 server.slaves = listCreate();
1032 server.monitors = listCreate();
1033 server.objfreelist = listCreate();
1034 createSharedObjects();
1035 server.el = aeCreateEventLoop();
1036 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1037 server.sharingpool = dictCreate(&setDictType,NULL);
1038 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1039 if (server.fd == -1) {
1040 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1041 exit(1);
1042 }
1043 for (j = 0; j < server.dbnum; j++) {
1044 server.db[j].dict = dictCreate(&hashDictType,NULL);
1045 server.db[j].expires = dictCreate(&setDictType,NULL);
1046 server.db[j].id = j;
1047 }
1048 server.cronloops = 0;
1049 server.bgsaveinprogress = 0;
1050 server.bgsavechildpid = -1;
1051 server.lastsave = time(NULL);
1052 server.dirty = 0;
1053 server.usedmemory = 0;
1054 server.stat_numcommands = 0;
1055 server.stat_numconnections = 0;
1056 server.stat_starttime = time(NULL);
1057 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1058 }
1059
1060 /* Empty the whole database */
1061 static long long emptyDb() {
1062 int j;
1063 long long removed = 0;
1064
1065 for (j = 0; j < server.dbnum; j++) {
1066 removed += dictSize(server.db[j].dict);
1067 dictEmpty(server.db[j].dict);
1068 dictEmpty(server.db[j].expires);
1069 }
1070 return removed;
1071 }
1072
/* Parse a boolean config value, case-insensitively.
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1078
1079 /* I agree, this is a very rudimental way to load a configuration...
1080 will improve later if the config gets more complex */
1081 static void loadServerConfig(char *filename) {
1082 FILE *fp;
1083 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1084 int linenum = 0;
1085 sds line = NULL;
1086
1087 if (filename[0] == '-' && filename[1] == '\0')
1088 fp = stdin;
1089 else {
1090 if ((fp = fopen(filename,"r")) == NULL) {
1091 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1092 exit(1);
1093 }
1094 }
1095
1096 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1097 sds *argv;
1098 int argc, j;
1099
1100 linenum++;
1101 line = sdsnew(buf);
1102 line = sdstrim(line," \t\r\n");
1103
1104 /* Skip comments and blank lines*/
1105 if (line[0] == '#' || line[0] == '\0') {
1106 sdsfree(line);
1107 continue;
1108 }
1109
1110 /* Split into arguments */
1111 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1112 sdstolower(argv[0]);
1113
1114 /* Execute config directives */
1115 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1116 server.maxidletime = atoi(argv[1]);
1117 if (server.maxidletime < 0) {
1118 err = "Invalid timeout value"; goto loaderr;
1119 }
1120 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1121 server.port = atoi(argv[1]);
1122 if (server.port < 1 || server.port > 65535) {
1123 err = "Invalid port"; goto loaderr;
1124 }
1125 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1126 server.bindaddr = zstrdup(argv[1]);
1127 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1128 int seconds = atoi(argv[1]);
1129 int changes = atoi(argv[2]);
1130 if (seconds < 1 || changes < 0) {
1131 err = "Invalid save parameters"; goto loaderr;
1132 }
1133 appendServerSaveParams(seconds,changes);
1134 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1135 if (chdir(argv[1]) == -1) {
1136 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1137 argv[1], strerror(errno));
1138 exit(1);
1139 }
1140 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1141 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1142 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1143 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1144 else {
1145 err = "Invalid log level. Must be one of debug, notice, warning";
1146 goto loaderr;
1147 }
1148 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1149 FILE *logfp;
1150
1151 server.logfile = zstrdup(argv[1]);
1152 if (!strcasecmp(server.logfile,"stdout")) {
1153 zfree(server.logfile);
1154 server.logfile = NULL;
1155 }
1156 if (server.logfile) {
1157 /* Test if we are able to open the file. The server will not
1158 * be able to abort just for this problem later... */
1159 logfp = fopen(server.logfile,"a");
1160 if (logfp == NULL) {
1161 err = sdscatprintf(sdsempty(),
1162 "Can't open the log file: %s", strerror(errno));
1163 goto loaderr;
1164 }
1165 fclose(logfp);
1166 }
1167 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1168 server.dbnum = atoi(argv[1]);
1169 if (server.dbnum < 1) {
1170 err = "Invalid number of databases"; goto loaderr;
1171 }
1172 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1173 server.maxclients = atoi(argv[1]);
1174 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1175 server.maxmemory = strtoll(argv[1], NULL, 10);
1176 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1177 server.masterhost = sdsnew(argv[1]);
1178 server.masterport = atoi(argv[2]);
1179 server.replstate = REDIS_REPL_CONNECT;
1180 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1181 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1182 err = "argument must be 'yes' or 'no'"; goto loaderr;
1183 }
1184 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1185 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1186 err = "argument must be 'yes' or 'no'"; goto loaderr;
1187 }
1188 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1189 server.sharingpoolsize = atoi(argv[1]);
1190 if (server.sharingpoolsize < 1) {
1191 err = "invalid object sharing pool size"; goto loaderr;
1192 }
1193 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1194 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1195 err = "argument must be 'yes' or 'no'"; goto loaderr;
1196 }
1197 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1198 server.requirepass = zstrdup(argv[1]);
1199 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1200 server.pidfile = zstrdup(argv[1]);
1201 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1202 server.dbfilename = zstrdup(argv[1]);
1203 } else {
1204 err = "Bad directive or wrong number of arguments"; goto loaderr;
1205 }
1206 for (j = 0; j < argc; j++)
1207 sdsfree(argv[j]);
1208 zfree(argv);
1209 sdsfree(line);
1210 }
1211 if (fp != stdin) fclose(fp);
1212 return;
1213
1214 loaderr:
1215 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1216 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1217 fprintf(stderr, ">>> '%s'\n", line);
1218 fprintf(stderr, "%s\n", err);
1219 exit(1);
1220 }
1221
1222 static void freeClientArgv(redisClient *c) {
1223 int j;
1224
1225 for (j = 0; j < c->argc; j++)
1226 decrRefCount(c->argv[j]);
1227 for (j = 0; j < c->mbargc; j++)
1228 decrRefCount(c->mbargv[j]);
1229 c->argc = 0;
1230 c->mbargc = 0;
1231 }
1232
1233 static void freeClient(redisClient *c) {
1234 listNode *ln;
1235
1236 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1237 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1238 sdsfree(c->querybuf);
1239 listRelease(c->reply);
1240 freeClientArgv(c);
1241 close(c->fd);
1242 ln = listSearchKey(server.clients,c);
1243 assert(ln != NULL);
1244 listDelNode(server.clients,ln);
1245 if (c->flags & REDIS_SLAVE) {
1246 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1247 close(c->repldbfd);
1248 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1249 ln = listSearchKey(l,c);
1250 assert(ln != NULL);
1251 listDelNode(l,ln);
1252 }
1253 if (c->flags & REDIS_MASTER) {
1254 server.master = NULL;
1255 server.replstate = REDIS_REPL_CONNECT;
1256 }
1257 zfree(c->argv);
1258 zfree(c->mbargv);
1259 zfree(c);
1260 }
1261
1262 static void glueReplyBuffersIfNeeded(redisClient *c) {
1263 int totlen = 0;
1264 listNode *ln;
1265 robj *o;
1266
1267 listRewind(c->reply);
1268 while((ln = listYield(c->reply))) {
1269 o = ln->value;
1270 totlen += sdslen(o->ptr);
1271 /* This optimization makes more sense if we don't have to copy
1272 * too much data */
1273 if (totlen > 1024) return;
1274 }
1275 if (totlen > 0) {
1276 char buf[1024];
1277 int copylen = 0;
1278
1279 listRewind(c->reply);
1280 while((ln = listYield(c->reply))) {
1281 o = ln->value;
1282 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1283 copylen += sdslen(o->ptr);
1284 listDelNode(c->reply,ln);
1285 }
1286 /* Now the output buffer is empty, add the new single element */
1287 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1288 listAddNodeTail(c->reply,o);
1289 }
1290 }
1291
1292 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1293 redisClient *c = privdata;
1294 int nwritten = 0, totwritten = 0, objlen;
1295 robj *o;
1296 REDIS_NOTUSED(el);
1297 REDIS_NOTUSED(mask);
1298
1299 if (server.glueoutputbuf && listLength(c->reply) > 1)
1300 glueReplyBuffersIfNeeded(c);
1301 while(listLength(c->reply)) {
1302 o = listNodeValue(listFirst(c->reply));
1303 objlen = sdslen(o->ptr);
1304
1305 if (objlen == 0) {
1306 listDelNode(c->reply,listFirst(c->reply));
1307 continue;
1308 }
1309
1310 if (c->flags & REDIS_MASTER) {
1311 /* Don't reply to a master */
1312 nwritten = objlen - c->sentlen;
1313 } else {
1314 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1315 if (nwritten <= 0) break;
1316 }
1317 c->sentlen += nwritten;
1318 totwritten += nwritten;
1319 /* If we fully sent the object on head go to the next one */
1320 if (c->sentlen == objlen) {
1321 listDelNode(c->reply,listFirst(c->reply));
1322 c->sentlen = 0;
1323 }
1324 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1325 * bytes, in a single threaded server it's a good idea to server
1326 * other clients as well, even if a very large request comes from
1327 * super fast link that is always able to accept data (in real world
1328 * terms think to 'KEYS *' against the loopback interfae) */
1329 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1330 }
1331 if (nwritten == -1) {
1332 if (errno == EAGAIN) {
1333 nwritten = 0;
1334 } else {
1335 redisLog(REDIS_DEBUG,
1336 "Error writing to client: %s", strerror(errno));
1337 freeClient(c);
1338 return;
1339 }
1340 }
1341 if (totwritten > 0) c->lastinteraction = time(NULL);
1342 if (listLength(c->reply) == 0) {
1343 c->sentlen = 0;
1344 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1345 }
1346 }
1347
1348 static struct redisCommand *lookupCommand(char *name) {
1349 int j = 0;
1350 while(cmdTable[j].name != NULL) {
1351 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1352 j++;
1353 }
1354 return NULL;
1355 }
1356
1357 /* resetClient prepare the client to process the next command */
1358 static void resetClient(redisClient *c) {
1359 freeClientArgv(c);
1360 c->bulklen = -1;
1361 c->multibulk = 0;
1362 }
1363
1364 /* If this function gets called we already read a whole
1365 * command, argments are in the client argv/argc fields.
1366 * processCommand() execute the command or prepare the
1367 * server for a bulk read from the client.
1368 *
1369 * If 1 is returned the client is still alive and valid and
1370 * and other operations can be performed by the caller. Otherwise
1371 * if 0 is returned the client was destroied (i.e. after QUIT). */
1372 static int processCommand(redisClient *c) {
1373 struct redisCommand *cmd;
1374 long long dirty;
1375
1376 /* Free some memory if needed (maxmemory setting) */
1377 if (server.maxmemory) freeMemoryIfNeeded();
1378
1379 /* Handle the multi bulk command type. This is an alternative protocol
1380 * supported by Redis in order to receive commands that are composed of
1381 * multiple binary-safe "bulk" arguments. The latency of processing is
1382 * a bit higher but this allows things like multi-sets, so if this
1383 * protocol is used only for MSET and similar commands this is a big win. */
1384 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1385 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1386 if (c->multibulk <= 0) {
1387 resetClient(c);
1388 return 1;
1389 } else {
1390 decrRefCount(c->argv[c->argc-1]);
1391 c->argc--;
1392 return 1;
1393 }
1394 } else if (c->multibulk) {
1395 if (c->bulklen == -1) {
1396 if (((char*)c->argv[0]->ptr)[0] != '$') {
1397 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1398 resetClient(c);
1399 return 1;
1400 } else {
1401 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1402 decrRefCount(c->argv[0]);
1403 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1404 c->argc--;
1405 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1406 resetClient(c);
1407 return 1;
1408 }
1409 c->argc--;
1410 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1411 return 1;
1412 }
1413 } else {
1414 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1415 c->mbargv[c->mbargc] = c->argv[0];
1416 c->mbargc++;
1417 c->argc--;
1418 c->multibulk--;
1419 if (c->multibulk == 0) {
1420 robj **auxargv;
1421 int auxargc;
1422
1423 /* Here we need to swap the multi-bulk argc/argv with the
1424 * normal argc/argv of the client structure. */
1425 auxargv = c->argv;
1426 c->argv = c->mbargv;
1427 c->mbargv = auxargv;
1428
1429 auxargc = c->argc;
1430 c->argc = c->mbargc;
1431 c->mbargc = auxargc;
1432
1433 /* We need to set bulklen to something different than -1
1434 * in order for the code below to process the command without
1435 * to try to read the last argument of a bulk command as
1436 * a special argument. */
1437 c->bulklen = 0;
1438 /* continue below and process the command */
1439 } else {
1440 c->bulklen = -1;
1441 return 1;
1442 }
1443 }
1444 }
1445 /* -- end of multi bulk commands processing -- */
1446
1447 /* The QUIT command is handled as a special case. Normal command
1448 * procs are unable to close the client connection safely */
1449 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1450 freeClient(c);
1451 return 0;
1452 }
1453 cmd = lookupCommand(c->argv[0]->ptr);
1454 if (!cmd) {
1455 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1456 resetClient(c);
1457 return 1;
1458 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1459 (c->argc < -cmd->arity)) {
1460 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1461 resetClient(c);
1462 return 1;
1463 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1464 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1465 resetClient(c);
1466 return 1;
1467 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1468 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1469
1470 decrRefCount(c->argv[c->argc-1]);
1471 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1472 c->argc--;
1473 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1474 resetClient(c);
1475 return 1;
1476 }
1477 c->argc--;
1478 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1479 /* It is possible that the bulk read is already in the
1480 * buffer. Check this condition and handle it accordingly.
1481 * This is just a fast path, alternative to call processInputBuffer().
1482 * It's a good idea since the code is small and this condition
1483 * happens most of the times. */
1484 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1485 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1486 c->argc++;
1487 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1488 } else {
1489 return 1;
1490 }
1491 }
1492 /* Let's try to share objects on the command arguments vector */
1493 if (server.shareobjects) {
1494 int j;
1495 for(j = 1; j < c->argc; j++)
1496 c->argv[j] = tryObjectSharing(c->argv[j]);
1497 }
1498 /* Let's try to encode the bulk object to save space. */
1499 if (cmd->flags & REDIS_CMD_BULK)
1500 tryObjectEncoding(c->argv[c->argc-1]);
1501
1502 /* Check if the user is authenticated */
1503 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1504 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1505 resetClient(c);
1506 return 1;
1507 }
1508
1509 /* Exec the command */
1510 dirty = server.dirty;
1511 cmd->proc(c);
1512 if (server.dirty-dirty != 0 && listLength(server.slaves))
1513 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1514 if (listLength(server.monitors))
1515 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1516 server.stat_numcommands++;
1517
1518 /* Prepare the client for the next command */
1519 if (c->flags & REDIS_CLOSE) {
1520 freeClient(c);
1521 return 0;
1522 }
1523 resetClient(c);
1524 return 1;
1525 }
1526
1527 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1528 listNode *ln;
1529 int outc = 0, j;
1530 robj **outv;
1531 /* (args*2)+1 is enough room for args, spaces, newlines */
1532 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1533
1534 if (argc <= REDIS_STATIC_ARGS) {
1535 outv = static_outv;
1536 } else {
1537 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1538 }
1539
1540 for (j = 0; j < argc; j++) {
1541 if (j != 0) outv[outc++] = shared.space;
1542 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1543 robj *lenobj;
1544
1545 lenobj = createObject(REDIS_STRING,
1546 sdscatprintf(sdsempty(),"%d\r\n",
1547 stringObjectLen(argv[j])));
1548 lenobj->refcount = 0;
1549 outv[outc++] = lenobj;
1550 }
1551 outv[outc++] = argv[j];
1552 }
1553 outv[outc++] = shared.crlf;
1554
1555 /* Increment all the refcounts at start and decrement at end in order to
1556 * be sure to free objects if there is no slave in a replication state
1557 * able to be feed with commands */
1558 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1559 listRewind(slaves);
1560 while((ln = listYield(slaves))) {
1561 redisClient *slave = ln->value;
1562
1563 /* Don't feed slaves that are still waiting for BGSAVE to start */
1564 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1565
1566 /* Feed all the other slaves, MONITORs and so on */
1567 if (slave->slaveseldb != dictid) {
1568 robj *selectcmd;
1569
1570 switch(dictid) {
1571 case 0: selectcmd = shared.select0; break;
1572 case 1: selectcmd = shared.select1; break;
1573 case 2: selectcmd = shared.select2; break;
1574 case 3: selectcmd = shared.select3; break;
1575 case 4: selectcmd = shared.select4; break;
1576 case 5: selectcmd = shared.select5; break;
1577 case 6: selectcmd = shared.select6; break;
1578 case 7: selectcmd = shared.select7; break;
1579 case 8: selectcmd = shared.select8; break;
1580 case 9: selectcmd = shared.select9; break;
1581 default:
1582 selectcmd = createObject(REDIS_STRING,
1583 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1584 selectcmd->refcount = 0;
1585 break;
1586 }
1587 addReply(slave,selectcmd);
1588 slave->slaveseldb = dictid;
1589 }
1590 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1591 }
1592 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1593 if (outv != static_outv) zfree(outv);
1594 }
1595
1596 static void processInputBuffer(redisClient *c) {
1597 again:
1598 if (c->bulklen == -1) {
1599 /* Read the first line of the query */
1600 char *p = strchr(c->querybuf,'\n');
1601 size_t querylen;
1602
1603 if (p) {
1604 sds query, *argv;
1605 int argc, j;
1606
1607 query = c->querybuf;
1608 c->querybuf = sdsempty();
1609 querylen = 1+(p-(query));
1610 if (sdslen(query) > querylen) {
1611 /* leave data after the first line of the query in the buffer */
1612 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1613 }
1614 *p = '\0'; /* remove "\n" */
1615 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1616 sdsupdatelen(query);
1617
1618 /* Now we can split the query in arguments */
1619 if (sdslen(query) == 0) {
1620 /* Ignore empty query */
1621 sdsfree(query);
1622 return;
1623 }
1624 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1625 sdsfree(query);
1626
1627 if (c->argv) zfree(c->argv);
1628 c->argv = zmalloc(sizeof(robj*)*argc);
1629
1630 for (j = 0; j < argc; j++) {
1631 if (sdslen(argv[j])) {
1632 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1633 c->argc++;
1634 } else {
1635 sdsfree(argv[j]);
1636 }
1637 }
1638 zfree(argv);
1639 /* Execute the command. If the client is still valid
1640 * after processCommand() return and there is something
1641 * on the query buffer try to process the next command. */
1642 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1643 return;
1644 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1645 redisLog(REDIS_DEBUG, "Client protocol error");
1646 freeClient(c);
1647 return;
1648 }
1649 } else {
1650 /* Bulk read handling. Note that if we are at this point
1651 the client already sent a command terminated with a newline,
1652 we are reading the bulk data that is actually the last
1653 argument of the command. */
1654 int qbl = sdslen(c->querybuf);
1655
1656 if (c->bulklen <= qbl) {
1657 /* Copy everything but the final CRLF as final argument */
1658 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1659 c->argc++;
1660 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1661 /* Process the command. If the client is still valid after
1662 * the processing and there is more data in the buffer
1663 * try to parse it. */
1664 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1665 return;
1666 }
1667 }
1668 }
1669
1670 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1671 redisClient *c = (redisClient*) privdata;
1672 char buf[REDIS_IOBUF_LEN];
1673 int nread;
1674 REDIS_NOTUSED(el);
1675 REDIS_NOTUSED(mask);
1676
1677 nread = read(fd, buf, REDIS_IOBUF_LEN);
1678 if (nread == -1) {
1679 if (errno == EAGAIN) {
1680 nread = 0;
1681 } else {
1682 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1683 freeClient(c);
1684 return;
1685 }
1686 } else if (nread == 0) {
1687 redisLog(REDIS_DEBUG, "Client closed connection");
1688 freeClient(c);
1689 return;
1690 }
1691 if (nread) {
1692 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1693 c->lastinteraction = time(NULL);
1694 } else {
1695 return;
1696 }
1697 processInputBuffer(c);
1698 }
1699
1700 static int selectDb(redisClient *c, int id) {
1701 if (id < 0 || id >= server.dbnum)
1702 return REDIS_ERR;
1703 c->db = &server.db[id];
1704 return REDIS_OK;
1705 }
1706
1707 static void *dupClientReplyValue(void *o) {
1708 incrRefCount((robj*)o);
1709 return 0;
1710 }
1711
1712 static redisClient *createClient(int fd) {
1713 redisClient *c = zmalloc(sizeof(*c));
1714
1715 anetNonBlock(NULL,fd);
1716 anetTcpNoDelay(NULL,fd);
1717 if (!c) return NULL;
1718 selectDb(c,0);
1719 c->fd = fd;
1720 c->querybuf = sdsempty();
1721 c->argc = 0;
1722 c->argv = NULL;
1723 c->bulklen = -1;
1724 c->multibulk = 0;
1725 c->mbargc = 0;
1726 c->mbargv = NULL;
1727 c->sentlen = 0;
1728 c->flags = 0;
1729 c->lastinteraction = time(NULL);
1730 c->authenticated = 0;
1731 c->replstate = REDIS_REPL_NONE;
1732 c->reply = listCreate();
1733 listSetFreeMethod(c->reply,decrRefCount);
1734 listSetDupMethod(c->reply,dupClientReplyValue);
1735 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1736 readQueryFromClient, c, NULL) == AE_ERR) {
1737 freeClient(c);
1738 return NULL;
1739 }
1740 listAddNodeTail(server.clients,c);
1741 return c;
1742 }
1743
1744 static void addReply(redisClient *c, robj *obj) {
1745 if (listLength(c->reply) == 0 &&
1746 (c->replstate == REDIS_REPL_NONE ||
1747 c->replstate == REDIS_REPL_ONLINE) &&
1748 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1749 sendReplyToClient, c, NULL) == AE_ERR) return;
1750 if (obj->encoding != REDIS_ENCODING_RAW) {
1751 obj = getDecodedObject(obj);
1752 } else {
1753 incrRefCount(obj);
1754 }
1755 listAddNodeTail(c->reply,obj);
1756 }
1757
1758 static void addReplySds(redisClient *c, sds s) {
1759 robj *o = createObject(REDIS_STRING,s);
1760 addReply(c,o);
1761 decrRefCount(o);
1762 }
1763
1764 static void addReplyBulkLen(redisClient *c, robj *obj) {
1765 size_t len;
1766
1767 if (obj->encoding == REDIS_ENCODING_RAW) {
1768 len = sdslen(obj->ptr);
1769 } else {
1770 long n = (long)obj->ptr;
1771
1772 len = 1;
1773 if (n < 0) {
1774 len++;
1775 n = -n;
1776 }
1777 while((n = n/10) != 0) {
1778 len++;
1779 }
1780 }
1781 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1782 }
1783
1784 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1785 int cport, cfd;
1786 char cip[128];
1787 redisClient *c;
1788 REDIS_NOTUSED(el);
1789 REDIS_NOTUSED(mask);
1790 REDIS_NOTUSED(privdata);
1791
1792 cfd = anetAccept(server.neterr, fd, cip, &cport);
1793 if (cfd == AE_ERR) {
1794 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1795 return;
1796 }
1797 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1798 if ((c = createClient(cfd)) == NULL) {
1799 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1800 close(cfd); /* May be already closed, just ingore errors */
1801 return;
1802 }
1803 /* If maxclient directive is set and this is one client more... close the
1804 * connection. Note that we create the client instead to check before
1805 * for this condition, since now the socket is already set in nonblocking
1806 * mode and we can send an error for free using the Kernel I/O */
1807 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1808 char *err = "-ERR max number of clients reached\r\n";
1809
1810 /* That's a best effort error message, don't check write errors */
1811 (void) write(c->fd,err,strlen(err));
1812 freeClient(c);
1813 return;
1814 }
1815 server.stat_numconnections++;
1816 }
1817
1818 /* ======================= Redis objects implementation ===================== */
1819
1820 static robj *createObject(int type, void *ptr) {
1821 robj *o;
1822
1823 if (listLength(server.objfreelist)) {
1824 listNode *head = listFirst(server.objfreelist);
1825 o = listNodeValue(head);
1826 listDelNode(server.objfreelist,head);
1827 } else {
1828 o = zmalloc(sizeof(*o));
1829 }
1830 o->type = type;
1831 o->encoding = REDIS_ENCODING_RAW;
1832 o->ptr = ptr;
1833 o->refcount = 1;
1834 return o;
1835 }
1836
1837 static robj *createStringObject(char *ptr, size_t len) {
1838 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1839 }
1840
1841 static robj *createListObject(void) {
1842 list *l = listCreate();
1843
1844 listSetFreeMethod(l,decrRefCount);
1845 return createObject(REDIS_LIST,l);
1846 }
1847
1848 static robj *createSetObject(void) {
1849 dict *d = dictCreate(&setDictType,NULL);
1850 return createObject(REDIS_SET,d);
1851 }
1852
1853 static robj *createZsetObject(void) {
1854 zset *zs = zmalloc(sizeof(*zs));
1855
1856 zs->dict = dictCreate(&zsetDictType,NULL);
1857 zs->zsl = zslCreate();
1858 return createObject(REDIS_ZSET,zs);
1859 }
1860
1861 static void freeStringObject(robj *o) {
1862 if (o->encoding == REDIS_ENCODING_RAW) {
1863 sdsfree(o->ptr);
1864 }
1865 }
1866
1867 static void freeListObject(robj *o) {
1868 listRelease((list*) o->ptr);
1869 }
1870
1871 static void freeSetObject(robj *o) {
1872 dictRelease((dict*) o->ptr);
1873 }
1874
1875 static void freeZsetObject(robj *o) {
1876 zset *zs = o->ptr;
1877
1878 dictRelease(zs->dict);
1879 zslFree(zs->zsl);
1880 zfree(zs);
1881 }
1882
1883 static void freeHashObject(robj *o) {
1884 dictRelease((dict*) o->ptr);
1885 }
1886
1887 static void incrRefCount(robj *o) {
1888 o->refcount++;
1889 #ifdef DEBUG_REFCOUNT
1890 if (o->type == REDIS_STRING)
1891 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1892 #endif
1893 }
1894
1895 static void decrRefCount(void *obj) {
1896 robj *o = obj;
1897
1898 #ifdef DEBUG_REFCOUNT
1899 if (o->type == REDIS_STRING)
1900 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1901 #endif
1902 if (--(o->refcount) == 0) {
1903 switch(o->type) {
1904 case REDIS_STRING: freeStringObject(o); break;
1905 case REDIS_LIST: freeListObject(o); break;
1906 case REDIS_SET: freeSetObject(o); break;
1907 case REDIS_ZSET: freeZsetObject(o); break;
1908 case REDIS_HASH: freeHashObject(o); break;
1909 default: assert(0 != 0); break;
1910 }
1911 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1912 !listAddNodeHead(server.objfreelist,o))
1913 zfree(o);
1914 }
1915 }
1916
1917 static robj *lookupKey(redisDb *db, robj *key) {
1918 dictEntry *de = dictFind(db->dict,key);
1919 return de ? dictGetEntryVal(de) : NULL;
1920 }
1921
1922 static robj *lookupKeyRead(redisDb *db, robj *key) {
1923 expireIfNeeded(db,key);
1924 return lookupKey(db,key);
1925 }
1926
1927 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1928 deleteIfVolatile(db,key);
1929 return lookupKey(db,key);
1930 }
1931
1932 static int deleteKey(redisDb *db, robj *key) {
1933 int retval;
1934
1935 /* We need to protect key from destruction: after the first dictDelete()
1936 * it may happen that 'key' is no longer valid if we don't increment
1937 * it's count. This may happen when we get the object reference directly
1938 * from the hash table with dictRandomKey() or dict iterators */
1939 incrRefCount(key);
1940 if (dictSize(db->expires)) dictDelete(db->expires,key);
1941 retval = dictDelete(db->dict,key);
1942 decrRefCount(key);
1943
1944 return retval == DICT_OK;
1945 }
1946
1947 /* Try to share an object against the shared objects pool */
1948 static robj *tryObjectSharing(robj *o) {
1949 struct dictEntry *de;
1950 unsigned long c;
1951
1952 if (o == NULL || server.shareobjects == 0) return o;
1953
1954 assert(o->type == REDIS_STRING);
1955 de = dictFind(server.sharingpool,o);
1956 if (de) {
1957 robj *shared = dictGetEntryKey(de);
1958
1959 c = ((unsigned long) dictGetEntryVal(de))+1;
1960 dictGetEntryVal(de) = (void*) c;
1961 incrRefCount(shared);
1962 decrRefCount(o);
1963 return shared;
1964 } else {
1965 /* Here we are using a stream algorihtm: Every time an object is
1966 * shared we increment its count, everytime there is a miss we
1967 * recrement the counter of a random object. If this object reaches
1968 * zero we remove the object and put the current object instead. */
1969 if (dictSize(server.sharingpool) >=
1970 server.sharingpoolsize) {
1971 de = dictGetRandomKey(server.sharingpool);
1972 assert(de != NULL);
1973 c = ((unsigned long) dictGetEntryVal(de))-1;
1974 dictGetEntryVal(de) = (void*) c;
1975 if (c == 0) {
1976 dictDelete(server.sharingpool,de->key);
1977 }
1978 } else {
1979 c = 0; /* If the pool is empty we want to add this object */
1980 }
1981 if (c == 0) {
1982 int retval;
1983
1984 retval = dictAdd(server.sharingpool,o,(void*)1);
1985 assert(retval == DICT_OK);
1986 incrRefCount(o);
1987 }
1988 return o;
1989 }
1990 }
1991
1992 /* Check if the nul-terminated string 's' can be represented by a long
1993 * (that is, is a number that fits into long without any other space or
1994 * character before or after the digits).
1995 *
1996 * If so, the function returns REDIS_OK and *longval is set to the value
1997 * of the number. Otherwise REDIS_ERR is returned */
1998 static int isStringRepresentableAsLong(sds s, long *longval) {
1999 char buf[32], *endptr;
2000 long value;
2001 int slen;
2002
2003 value = strtol(s, &endptr, 10);
2004 if (endptr[0] != '\0') return REDIS_ERR;
2005 slen = snprintf(buf,32,"%ld",value);
2006
2007 /* If the number converted back into a string is not identical
2008 * then it's not possible to encode the string as integer */
2009 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2010 if (longval) *longval = value;
2011 return REDIS_OK;
2012 }
2013
2014 /* Try to encode a string object in order to save space */
2015 static int tryObjectEncoding(robj *o) {
2016 long value;
2017 sds s = o->ptr;
2018
2019 if (o->encoding != REDIS_ENCODING_RAW)
2020 return REDIS_ERR; /* Already encoded */
2021
2022 /* It's not save to encode shared objects: shared objects can be shared
2023 * everywhere in the "object space" of Redis. Encoded objects can only
2024 * appear as "values" (and not, for instance, as keys) */
2025 if (o->refcount > 1) return REDIS_ERR;
2026
2027 /* Currently we try to encode only strings */
2028 assert(o->type == REDIS_STRING);
2029
2030 /* Check if we can represent this string as a long integer */
2031 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2032
2033 /* Ok, this object can be encoded */
2034 o->encoding = REDIS_ENCODING_INT;
2035 sdsfree(o->ptr);
2036 o->ptr = (void*) value;
2037 return REDIS_OK;
2038 }
2039
2040 /* Get a decoded version of an encoded object (returned as a new object) */
2041 static robj *getDecodedObject(const robj *o) {
2042 robj *dec;
2043
2044 assert(o->encoding != REDIS_ENCODING_RAW);
2045 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2046 char buf[32];
2047
2048 snprintf(buf,32,"%ld",(long)o->ptr);
2049 dec = createStringObject(buf,strlen(buf));
2050 return dec;
2051 } else {
2052 assert(1 != 1);
2053 }
2054 }
2055
2056 static int compareStringObjects(robj *a, robj *b) {
2057 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2058
2059 if (a == b) return 0;
2060 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2061 return (long)a->ptr - (long)b->ptr;
2062 } else {
2063 int retval;
2064
2065 incrRefCount(a);
2066 incrRefCount(b);
2067 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2068 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2069 retval = sdscmp(a->ptr,b->ptr);
2070 decrRefCount(a);
2071 decrRefCount(b);
2072 return retval;
2073 }
2074 }
2075
2076 static size_t stringObjectLen(robj *o) {
2077 assert(o->type == REDIS_STRING);
2078 if (o->encoding == REDIS_ENCODING_RAW) {
2079 return sdslen(o->ptr);
2080 } else {
2081 char buf[32];
2082
2083 return snprintf(buf,32,"%ld",(long)o->ptr);
2084 }
2085 }
2086
2087 /*============================ DB saving/loading ============================ */
2088
/* Write the single opcode/type byte 'type' to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 1) ? 0 : -1;
}
2093
/* Write 't' truncated to a 32 bit signed integer in host byte order, the
 * form rdbLoadTime() reads back. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return (fwrite(&t32,sizeof(t32),1,fp) == 1) ? 0 : -1;
}
2099
2100 /* check rdbLoadLen() comments for more info */
2101 static int rdbSaveLen(FILE *fp, uint32_t len) {
2102 unsigned char buf[2];
2103
2104 if (len < (1<<6)) {
2105 /* Save a 6 bit len */
2106 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2107 if (fwrite(buf,1,1,fp) == 0) return -1;
2108 } else if (len < (1<<14)) {
2109 /* Save a 14 bit len */
2110 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2111 buf[1] = len&0xFF;
2112 if (fwrite(buf,2,1,fp) == 0) return -1;
2113 } else {
2114 /* Save a 32 bit len */
2115 buf[0] = (REDIS_RDB_32BITLEN<<6);
2116 if (fwrite(buf,1,1,fp) == 0) return -1;
2117 len = htonl(len);
2118 if (fwrite(&len,4,1,fp) == 0) return -1;
2119 }
2120 return 0;
2121 }
2122
2123 /* String objects in the form "2391" "-100" without any space and with a
2124 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2125 * encoded as integers to save space */
2126 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2127 long long value;
2128 char *endptr, buf[32];
2129
2130 /* Check if it's possible to encode this value as a number */
2131 value = strtoll(s, &endptr, 10);
2132 if (endptr[0] != '\0') return 0;
2133 snprintf(buf,32,"%lld",value);
2134
2135 /* If the number converted back into a string is not identical
2136 * then it's not possible to encode the string as integer */
2137 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2138
2139 /* Finally check if it fits in our ranges */
2140 if (value >= -(1<<7) && value <= (1<<7)-1) {
2141 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2142 enc[1] = value&0xFF;
2143 return 2;
2144 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2145 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2146 enc[1] = value&0xFF;
2147 enc[2] = (value>>8)&0xFF;
2148 return 3;
2149 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2150 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2151 enc[1] = value&0xFF;
2152 enc[2] = (value>>8)&0xFF;
2153 enc[3] = (value>>16)&0xFF;
2154 enc[4] = (value>>24)&0xFF;
2155 return 5;
2156 } else {
2157 return 0;
2158 }
2159 }
2160
2161 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2162 unsigned int comprlen, outlen;
2163 unsigned char byte;
2164 void *out;
2165
2166 /* We require at least four bytes compression for this to be worth it */
2167 outlen = sdslen(obj->ptr)-4;
2168 if (outlen <= 0) return 0;
2169 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2170 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2171 if (comprlen == 0) {
2172 zfree(out);
2173 return 0;
2174 }
2175 /* Data compressed! Let's save it on disk */
2176 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2177 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2178 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2179 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2180 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2181 zfree(out);
2182 return comprlen;
2183
2184 writeerr:
2185 zfree(out);
2186 return -1;
2187 }
2188
2189 /* Save a string objet as [len][data] on disk. If the object is a string
2190 * representation of an integer value we try to safe it in a special form */
2191 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2192 size_t len;
2193 int enclen;
2194
2195 len = sdslen(obj->ptr);
2196
2197 /* Try integer encoding */
2198 if (len <= 11) {
2199 unsigned char buf[5];
2200 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2201 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2202 return 0;
2203 }
2204 }
2205
2206 /* Try LZF compression - under 20 bytes it's unable to compress even
2207 * aaaaaaaaaaaaaaaaaa so skip it */
2208 if (len > 20) {
2209 int retval;
2210
2211 retval = rdbSaveLzfStringObject(fp,obj);
2212 if (retval == -1) return -1;
2213 if (retval > 0) return 0;
2214 /* retval == 0 means data can't be compressed, save the old way */
2215 }
2216
2217 /* Store verbatim */
2218 if (rdbSaveLen(fp,len) == -1) return -1;
2219 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2220 return 0;
2221 }
2222
2223 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2224 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2225 int retval;
2226 robj *dec;
2227
2228 if (obj->encoding != REDIS_ENCODING_RAW) {
2229 dec = getDecodedObject(obj);
2230 retval = rdbSaveStringObjectRaw(fp,dec);
2231 decrRefCount(dec);
2232 return retval;
2233 } else {
2234 return rdbSaveStringObjectRaw(fp,obj);
2235 }
2236 }
2237
2238 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2239 static int rdbSave(char *filename) {
2240 dictIterator *di = NULL;
2241 dictEntry *de;
2242 FILE *fp;
2243 char tmpfile[256];
2244 int j;
2245 time_t now = time(NULL);
2246
2247 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2248 fp = fopen(tmpfile,"w");
2249 if (!fp) {
2250 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2251 return REDIS_ERR;
2252 }
2253 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2254 for (j = 0; j < server.dbnum; j++) {
2255 redisDb *db = server.db+j;
2256 dict *d = db->dict;
2257 if (dictSize(d) == 0) continue;
2258 di = dictGetIterator(d);
2259 if (!di) {
2260 fclose(fp);
2261 return REDIS_ERR;
2262 }
2263
2264 /* Write the SELECT DB opcode */
2265 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2266 if (rdbSaveLen(fp,j) == -1) goto werr;
2267
2268 /* Iterate this DB writing every entry */
2269 while((de = dictNext(di)) != NULL) {
2270 robj *key = dictGetEntryKey(de);
2271 robj *o = dictGetEntryVal(de);
2272 time_t expiretime = getExpire(db,key);
2273
2274 /* Save the expire time */
2275 if (expiretime != -1) {
2276 /* If this key is already expired skip it */
2277 if (expiretime < now) continue;
2278 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2279 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2280 }
2281 /* Save the key and associated value */
2282 if (rdbSaveType(fp,o->type) == -1) goto werr;
2283 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2284 if (o->type == REDIS_STRING) {
2285 /* Save a string value */
2286 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2287 } else if (o->type == REDIS_LIST) {
2288 /* Save a list value */
2289 list *list = o->ptr;
2290 listNode *ln;
2291
2292 listRewind(list);
2293 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2294 while((ln = listYield(list))) {
2295 robj *eleobj = listNodeValue(ln);
2296
2297 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2298 }
2299 } else if (o->type == REDIS_SET) {
2300 /* Save a set value */
2301 dict *set = o->ptr;
2302 dictIterator *di = dictGetIterator(set);
2303 dictEntry *de;
2304
2305 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2306 while((de = dictNext(di)) != NULL) {
2307 robj *eleobj = dictGetEntryKey(de);
2308
2309 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2310 }
2311 dictReleaseIterator(di);
2312 } else {
2313 assert(0 != 0);
2314 }
2315 }
2316 dictReleaseIterator(di);
2317 }
2318 /* EOF opcode */
2319 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2320
2321 /* Make sure data will not remain on the OS's output buffers */
2322 fflush(fp);
2323 fsync(fileno(fp));
2324 fclose(fp);
2325
2326 /* Use RENAME to make sure the DB file is changed atomically only
2327 * if the generate DB file is ok. */
2328 if (rename(tmpfile,filename) == -1) {
2329 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2330 unlink(tmpfile);
2331 return REDIS_ERR;
2332 }
2333 redisLog(REDIS_NOTICE,"DB saved on disk");
2334 server.dirty = 0;
2335 server.lastsave = time(NULL);
2336 return REDIS_OK;
2337
2338 werr:
2339 fclose(fp);
2340 unlink(tmpfile);
2341 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2342 if (di) dictReleaseIterator(di);
2343 return REDIS_ERR;
2344 }
2345
2346 static int rdbSaveBackground(char *filename) {
2347 pid_t childpid;
2348
2349 if (server.bgsaveinprogress) return REDIS_ERR;
2350 if ((childpid = fork()) == 0) {
2351 /* Child */
2352 close(server.fd);
2353 if (rdbSave(filename) == REDIS_OK) {
2354 exit(0);
2355 } else {
2356 exit(1);
2357 }
2358 } else {
2359 /* Parent */
2360 if (childpid == -1) {
2361 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2362 strerror(errno));
2363 return REDIS_ERR;
2364 }
2365 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2366 server.bgsaveinprogress = 1;
2367 server.bgsavechildpid = childpid;
2368 return REDIS_OK;
2369 }
2370 return REDIS_OK; /* unreached */
2371 }
2372
/* Remove "temp-<childpid>.rdb", the temp file a saving child with pid
 * 'childpid' writes (see rdbSave()). The unlink() result is ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2379
/* Read a single opcode/type byte. Returns it as a value in 0..255, or -1 on
 * a short read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    if (fread(&type,1,1,fp) != 1) return -1;
    return type;
}
2385
/* Read a 32 bit signed time value written by rdbSaveTime(). Returns -1 on
 * a short read. A stored -1 is indistinguishable from that error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    if (fread(&t32,sizeof(t32),1,fp) != 1) return -1;
    return (time_t) t32;
}
2391
2392 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2393 * of this file for a description of how this are stored on disk.
2394 *
2395 * isencoded is set to 1 if the readed length is not actually a length but
2396 * an "encoding type", check the above comments for more info */
2397 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2398 unsigned char buf[2];
2399 uint32_t len;
2400
2401 if (isencoded) *isencoded = 0;
2402 if (rdbver == 0) {
2403 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2404 return ntohl(len);
2405 } else {
2406 int type;
2407
2408 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2409 type = (buf[0]&0xC0)>>6;
2410 if (type == REDIS_RDB_6BITLEN) {
2411 /* Read a 6 bit len */
2412 return buf[0]&0x3F;
2413 } else if (type == REDIS_RDB_ENCVAL) {
2414 /* Read a 6 bit len encoding type */
2415 if (isencoded) *isencoded = 1;
2416 return buf[0]&0x3F;
2417 } else if (type == REDIS_RDB_14BITLEN) {
2418 /* Read a 14 bit len */
2419 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2420 return ((buf[0]&0x3F)<<8)|buf[1];
2421 } else {
2422 /* Read a 32 bit len */
2423 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2424 return ntohl(len);
2425 }
2426 }
2427 }
2428
2429 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2430 unsigned char enc[4];
2431 long long val;
2432
2433 if (enctype == REDIS_RDB_ENC_INT8) {
2434 if (fread(enc,1,1,fp) == 0) return NULL;
2435 val = (signed char)enc[0];
2436 } else if (enctype == REDIS_RDB_ENC_INT16) {
2437 uint16_t v;
2438 if (fread(enc,2,1,fp) == 0) return NULL;
2439 v = enc[0]|(enc[1]<<8);
2440 val = (int16_t)v;
2441 } else if (enctype == REDIS_RDB_ENC_INT32) {
2442 uint32_t v;
2443 if (fread(enc,4,1,fp) == 0) return NULL;
2444 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2445 val = (int32_t)v;
2446 } else {
2447 val = 0; /* anti-warning */
2448 assert(0!=0);
2449 }
2450 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2451 }
2452
2453 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2454 unsigned int len, clen;
2455 unsigned char *c = NULL;
2456 sds val = NULL;
2457
2458 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2459 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2460 if ((c = zmalloc(clen)) == NULL) goto err;
2461 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2462 if (fread(c,clen,1,fp) == 0) goto err;
2463 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2464 zfree(c);
2465 return createObject(REDIS_STRING,val);
2466 err:
2467 zfree(c);
2468 sdsfree(val);
2469 return NULL;
2470 }
2471
2472 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2473 int isencoded;
2474 uint32_t len;
2475 sds val;
2476
2477 len = rdbLoadLen(fp,rdbver,&isencoded);
2478 if (isencoded) {
2479 switch(len) {
2480 case REDIS_RDB_ENC_INT8:
2481 case REDIS_RDB_ENC_INT16:
2482 case REDIS_RDB_ENC_INT32:
2483 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2484 case REDIS_RDB_ENC_LZF:
2485 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2486 default:
2487 assert(0!=0);
2488 }
2489 }
2490
2491 if (len == REDIS_RDB_LENERR) return NULL;
2492 val = sdsnewlen(NULL,len);
2493 if (len && fread(val,len,1,fp) == 0) {
2494 sdsfree(val);
2495 return NULL;
2496 }
2497 return tryObjectSharing(createObject(REDIS_STRING,val));
2498 }
2499
2500 static int rdbLoad(char *filename) {
2501 FILE *fp;
2502 robj *keyobj = NULL;
2503 uint32_t dbid;
2504 int type, retval, rdbver;
2505 dict *d = server.db[0].dict;
2506 redisDb *db = server.db+0;
2507 char buf[1024];
2508 time_t expiretime = -1, now = time(NULL);
2509
2510 fp = fopen(filename,"r");
2511 if (!fp) return REDIS_ERR;
2512 if (fread(buf,9,1,fp) == 0) goto eoferr;
2513 buf[9] = '\0';
2514 if (memcmp(buf,"REDIS",5) != 0) {
2515 fclose(fp);
2516 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2517 return REDIS_ERR;
2518 }
2519 rdbver = atoi(buf+5);
2520 if (rdbver > 1) {
2521 fclose(fp);
2522 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2523 return REDIS_ERR;
2524 }
2525 while(1) {
2526 robj *o;
2527
2528 /* Read type. */
2529 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2530 if (type == REDIS_EXPIRETIME) {
2531 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2532 /* We read the time so we need to read the object type again */
2533 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2534 }
2535 if (type == REDIS_EOF) break;
2536 /* Handle SELECT DB opcode as a special case */
2537 if (type == REDIS_SELECTDB) {
2538 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2539 goto eoferr;
2540 if (dbid >= (unsigned)server.dbnum) {
2541 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2542 exit(1);
2543 }
2544 db = server.db+dbid;
2545 d = db->dict;
2546 continue;
2547 }
2548 /* Read key */
2549 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2550
2551 if (type == REDIS_STRING) {
2552 /* Read string value */
2553 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2554 tryObjectEncoding(o);
2555 } else if (type == REDIS_LIST || type == REDIS_SET) {
2556 /* Read list/set value */
2557 uint32_t listlen;
2558
2559 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2560 goto eoferr;
2561 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2562 /* Load every single element of the list/set */
2563 while(listlen--) {
2564 robj *ele;
2565
2566 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2567 tryObjectEncoding(ele);
2568 if (type == REDIS_LIST) {
2569 listAddNodeTail((list*)o->ptr,ele);
2570 } else {
2571 dictAdd((dict*)o->ptr,ele,NULL);
2572 }
2573 }
2574 } else {
2575 assert(0 != 0);
2576 }
2577 /* Add the new object in the hash table */
2578 retval = dictAdd(d,keyobj,o);
2579 if (retval == DICT_ERR) {
2580 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2581 exit(1);
2582 }
2583 /* Set the expire time if needed */
2584 if (expiretime != -1) {
2585 setExpire(db,keyobj,expiretime);
2586 /* Delete this key if already expired */
2587 if (expiretime < now) deleteKey(db,keyobj);
2588 expiretime = -1;
2589 }
2590 keyobj = o = NULL;
2591 }
2592 fclose(fp);
2593 return REDIS_OK;
2594
2595 eoferr: /* unexpected end of file is handled here with a fatal exit */
2596 if (keyobj) decrRefCount(keyobj);
2597 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2598 exit(1);
2599 return REDIS_ERR; /* Just to avoid warning */
2600 }
2601
2602 /*================================== Commands =============================== */
2603
2604 static void authCommand(redisClient *c) {
2605 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2606 c->authenticated = 1;
2607 addReply(c,shared.ok);
2608 } else {
2609 c->authenticated = 0;
2610 addReply(c,shared.err);
2611 }
2612 }
2613
2614 static void pingCommand(redisClient *c) {
2615 addReply(c,shared.pong);
2616 }
2617
2618 static void echoCommand(redisClient *c) {
2619 addReplyBulkLen(c,c->argv[1]);
2620 addReply(c,c->argv[1]);
2621 addReply(c,shared.crlf);
2622 }
2623
2624 /*=================================== Strings =============================== */
2625
2626 static void setGenericCommand(redisClient *c, int nx) {
2627 int retval;
2628
2629 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2630 if (retval == DICT_ERR) {
2631 if (!nx) {
2632 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2633 incrRefCount(c->argv[2]);
2634 } else {
2635 addReply(c,shared.czero);
2636 return;
2637 }
2638 } else {
2639 incrRefCount(c->argv[1]);
2640 incrRefCount(c->argv[2]);
2641 }
2642 server.dirty++;
2643 removeExpire(c->db,c->argv[1]);
2644 addReply(c, nx ? shared.cone : shared.ok);
2645 }
2646
2647 static void setCommand(redisClient *c) {
2648 setGenericCommand(c,0);
2649 }
2650
2651 static void setnxCommand(redisClient *c) {
2652 setGenericCommand(c,1);
2653 }
2654
2655 static void getCommand(redisClient *c) {
2656 robj *o = lookupKeyRead(c->db,c->argv[1]);
2657
2658 if (o == NULL) {
2659 addReply(c,shared.nullbulk);
2660 } else {
2661 if (o->type != REDIS_STRING) {
2662 addReply(c,shared.wrongtypeerr);
2663 } else {
2664 addReplyBulkLen(c,o);
2665 addReply(c,o);
2666 addReply(c,shared.crlf);
2667 }
2668 }
2669 }
2670
2671 static void getsetCommand(redisClient *c) {
2672 getCommand(c);
2673 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2674 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2675 } else {
2676 incrRefCount(c->argv[1]);
2677 }
2678 incrRefCount(c->argv[2]);
2679 server.dirty++;
2680 removeExpire(c->db,c->argv[1]);
2681 }
2682
2683 static void mgetCommand(redisClient *c) {
2684 int j;
2685
2686 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2687 for (j = 1; j < c->argc; j++) {
2688 robj *o = lookupKeyRead(c->db,c->argv[j]);
2689 if (o == NULL) {
2690 addReply(c,shared.nullbulk);
2691 } else {
2692 if (o->type != REDIS_STRING) {
2693 addReply(c,shared.nullbulk);
2694 } else {
2695 addReplyBulkLen(c,o);
2696 addReply(c,o);
2697 addReply(c,shared.crlf);
2698 }
2699 }
2700 }
2701 }
2702
2703 static void incrDecrCommand(redisClient *c, long long incr) {
2704 long long value;
2705 int retval;
2706 robj *o;
2707
2708 o = lookupKeyWrite(c->db,c->argv[1]);
2709 if (o == NULL) {
2710 value = 0;
2711 } else {
2712 if (o->type != REDIS_STRING) {
2713 value = 0;
2714 } else {
2715 char *eptr;
2716
2717 if (o->encoding == REDIS_ENCODING_RAW)
2718 value = strtoll(o->ptr, &eptr, 10);
2719 else if (o->encoding == REDIS_ENCODING_INT)
2720 value = (long)o->ptr;
2721 else
2722 assert(1 != 1);
2723 }
2724 }
2725
2726 value += incr;
2727 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2728 tryObjectEncoding(o);
2729 retval = dictAdd(c->db->dict,c->argv[1],o);
2730 if (retval == DICT_ERR) {
2731 dictReplace(c->db->dict,c->argv[1],o);
2732 removeExpire(c->db,c->argv[1]);
2733 } else {
2734 incrRefCount(c->argv[1]);
2735 }
2736 server.dirty++;
2737 addReply(c,shared.colon);
2738 addReply(c,o);
2739 addReply(c,shared.crlf);
2740 }
2741
2742 static void incrCommand(redisClient *c) {
2743 incrDecrCommand(c,1);
2744 }
2745
2746 static void decrCommand(redisClient *c) {
2747 incrDecrCommand(c,-1);
2748 }
2749
2750 static void incrbyCommand(redisClient *c) {
2751 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2752 incrDecrCommand(c,incr);
2753 }
2754
2755 static void decrbyCommand(redisClient *c) {
2756 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2757 incrDecrCommand(c,-incr);
2758 }
2759
2760 /* ========================= Type agnostic commands ========================= */
2761
2762 static void delCommand(redisClient *c) {
2763 int deleted = 0, j;
2764
2765 for (j = 1; j < c->argc; j++) {
2766 if (deleteKey(c->db,c->argv[j])) {
2767 server.dirty++;
2768 deleted++;
2769 }
2770 }
2771 switch(deleted) {
2772 case 0:
2773 addReply(c,shared.czero);
2774 break;
2775 case 1:
2776 addReply(c,shared.cone);
2777 break;
2778 default:
2779 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2780 break;
2781 }
2782 }
2783
2784 static void existsCommand(redisClient *c) {
2785 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2786 }
2787
2788 static void selectCommand(redisClient *c) {
2789 int id = atoi(c->argv[1]->ptr);
2790
2791 if (selectDb(c,id) == REDIS_ERR) {
2792 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2793 } else {
2794 addReply(c,shared.ok);
2795 }
2796 }
2797
2798 static void randomkeyCommand(redisClient *c) {
2799 dictEntry *de;
2800
2801 while(1) {
2802 de = dictGetRandomKey(c->db->dict);
2803 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2804 }
2805 if (de == NULL) {
2806 addReply(c,shared.plus);
2807 addReply(c,shared.crlf);
2808 } else {
2809 addReply(c,shared.plus);
2810 addReply(c,dictGetEntryKey(de));
2811 addReply(c,shared.crlf);
2812 }
2813 }
2814
2815 static void keysCommand(redisClient *c) {
2816 dictIterator *di;
2817 dictEntry *de;
2818 sds pattern = c->argv[1]->ptr;
2819 int plen = sdslen(pattern);
2820 int numkeys = 0, keyslen = 0;
2821 robj *lenobj = createObject(REDIS_STRING,NULL);
2822
2823 di = dictGetIterator(c->db->dict);
2824 addReply(c,lenobj);
2825 decrRefCount(lenobj);
2826 while((de = dictNext(di)) != NULL) {
2827 robj *keyobj = dictGetEntryKey(de);
2828
2829 sds key = keyobj->ptr;
2830 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2831 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2832 if (expireIfNeeded(c->db,keyobj) == 0) {
2833 if (numkeys != 0)
2834 addReply(c,shared.space);
2835 addReply(c,keyobj);
2836 numkeys++;
2837 keyslen += sdslen(key);
2838 }
2839 }
2840 }
2841 dictReleaseIterator(di);
2842 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2843 addReply(c,shared.crlf);
2844 }
2845
2846 static void dbsizeCommand(redisClient *c) {
2847 addReplySds(c,
2848 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2849 }
2850
2851 static void lastsaveCommand(redisClient *c) {
2852 addReplySds(c,
2853 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2854 }
2855
2856 static void typeCommand(redisClient *c) {
2857 robj *o;
2858 char *type;
2859
2860 o = lookupKeyRead(c->db,c->argv[1]);
2861 if (o == NULL) {
2862 type = "+none";
2863 } else {
2864 switch(o->type) {
2865 case REDIS_STRING: type = "+string"; break;
2866 case REDIS_LIST: type = "+list"; break;
2867 case REDIS_SET: type = "+set"; break;
2868 default: type = "unknown"; break;
2869 }
2870 }
2871 addReplySds(c,sdsnew(type));
2872 addReply(c,shared.crlf);
2873 }
2874
2875 static void saveCommand(redisClient *c) {
2876 if (server.bgsaveinprogress) {
2877 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2878 return;
2879 }
2880 if (rdbSave(server.dbfilename) == REDIS_OK) {
2881 addReply(c,shared.ok);
2882 } else {
2883 addReply(c,shared.err);
2884 }
2885 }
2886
2887 static void bgsaveCommand(redisClient *c) {
2888 if (server.bgsaveinprogress) {
2889 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2890 return;
2891 }
2892 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2893 addReply(c,shared.ok);
2894 } else {
2895 addReply(c,shared.err);
2896 }
2897 }
2898
2899 static void shutdownCommand(redisClient *c) {
2900 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2901 /* Kill the saving child if there is a background saving in progress.
2902 We want to avoid race conditions, for instance our saving child may
2903 overwrite the synchronous saving did by SHUTDOWN. */
2904 if (server.bgsaveinprogress) {
2905 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2906 kill(server.bgsavechildpid,SIGKILL);
2907 rdbRemoveTempFile(server.bgsavechildpid);
2908 }
2909 /* SYNC SAVE */
2910 if (rdbSave(server.dbfilename) == REDIS_OK) {
2911 if (server.daemonize)
2912 unlink(server.pidfile);
2913 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2914 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2915 exit(1);
2916 } else {
2917 /* Ooops.. error saving! The best we can do is to continue operating.
2918 * Note that if there was a background saving process, in the next
2919 * cron() Redis will be notified that the background saving aborted,
2920 * handling special stuff like slaves pending for synchronization... */
2921 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2922 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2923 }
2924 }
2925
2926 static void renameGenericCommand(redisClient *c, int nx) {
2927 robj *o;
2928
2929 /* To use the same key as src and dst is probably an error */
2930 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2931 addReply(c,shared.sameobjecterr);
2932 return;
2933 }
2934
2935 o = lookupKeyWrite(c->db,c->argv[1]);
2936 if (o == NULL) {
2937 addReply(c,shared.nokeyerr);
2938 return;
2939 }
2940 incrRefCount(o);
2941 deleteIfVolatile(c->db,c->argv[2]);
2942 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2943 if (nx) {
2944 decrRefCount(o);
2945 addReply(c,shared.czero);
2946 return;
2947 }
2948 dictReplace(c->db->dict,c->argv[2],o);
2949 } else {
2950 incrRefCount(c->argv[2]);
2951 }
2952 deleteKey(c->db,c->argv[1]);
2953 server.dirty++;
2954 addReply(c,nx ? shared.cone : shared.ok);
2955 }
2956
2957 static void renameCommand(redisClient *c) {
2958 renameGenericCommand(c,0);
2959 }
2960
2961 static void renamenxCommand(redisClient *c) {
2962 renameGenericCommand(c,1);
2963 }
2964
2965 static void moveCommand(redisClient *c) {
2966 robj *o;
2967 redisDb *src, *dst;
2968 int srcid;
2969
2970 /* Obtain source and target DB pointers */
2971 src = c->db;
2972 srcid = c->db->id;
2973 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2974 addReply(c,shared.outofrangeerr);
2975 return;
2976 }
2977 dst = c->db;
2978 selectDb(c,srcid); /* Back to the source DB */
2979
2980 /* If the user is moving using as target the same
2981 * DB as the source DB it is probably an error. */
2982 if (src == dst) {
2983 addReply(c,shared.sameobjecterr);
2984 return;
2985 }
2986
2987 /* Check if the element exists and get a reference */
2988 o = lookupKeyWrite(c->db,c->argv[1]);
2989 if (!o) {
2990 addReply(c,shared.czero);
2991 return;
2992 }
2993
2994 /* Try to add the element to the target DB */
2995 deleteIfVolatile(dst,c->argv[1]);
2996 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2997 addReply(c,shared.czero);
2998 return;
2999 }
3000 incrRefCount(c->argv[1]);
3001 incrRefCount(o);
3002
3003 /* OK! key moved, free the entry in the source DB */
3004 deleteKey(src,c->argv[1]);
3005 server.dirty++;
3006 addReply(c,shared.cone);
3007 }
3008
3009 /* =================================== Lists ================================ */
3010 static void pushGenericCommand(redisClient *c, int where) {
3011 robj *lobj;
3012 list *list;
3013
3014 lobj = lookupKeyWrite(c->db,c->argv[1]);
3015 if (lobj == NULL) {
3016 lobj = createListObject();
3017 list = lobj->ptr;
3018 if (where == REDIS_HEAD) {
3019 listAddNodeHead(list,c->argv[2]);
3020 } else {
3021 listAddNodeTail(list,c->argv[2]);
3022 }
3023 dictAdd(c->db->dict,c->argv[1],lobj);
3024 incrRefCount(c->argv[1]);
3025 incrRefCount(c->argv[2]);
3026 } else {
3027 if (lobj->type != REDIS_LIST) {
3028 addReply(c,shared.wrongtypeerr);
3029 return;
3030 }
3031 list = lobj->ptr;
3032 if (where == REDIS_HEAD) {
3033 listAddNodeHead(list,c->argv[2]);
3034 } else {
3035 listAddNodeTail(list,c->argv[2]);
3036 }
3037 incrRefCount(c->argv[2]);
3038 }
3039 server.dirty++;
3040 addReply(c,shared.ok);
3041 }
3042
3043 static void lpushCommand(redisClient *c) {
3044 pushGenericCommand(c,REDIS_HEAD);
3045 }
3046
3047 static void rpushCommand(redisClient *c) {
3048 pushGenericCommand(c,REDIS_TAIL);
3049 }
3050
3051 static void llenCommand(redisClient *c) {
3052 robj *o;
3053 list *l;
3054
3055 o = lookupKeyRead(c->db,c->argv[1]);
3056 if (o == NULL) {
3057 addReply(c,shared.czero);
3058 return;
3059 } else {
3060 if (o->type != REDIS_LIST) {
3061 addReply(c,shared.wrongtypeerr);
3062 } else {
3063 l = o->ptr;
3064 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3065 }
3066 }
3067 }
3068
3069 static void lindexCommand(redisClient *c) {
3070 robj *o;
3071 int index = atoi(c->argv[2]->ptr);
3072
3073 o = lookupKeyRead(c->db,c->argv[1]);
3074 if (o == NULL) {
3075 addReply(c,shared.nullbulk);
3076 } else {
3077 if (o->type != REDIS_LIST) {
3078 addReply(c,shared.wrongtypeerr);
3079 } else {
3080 list *list = o->ptr;
3081 listNode *ln;
3082
3083 ln = listIndex(list, index);
3084 if (ln == NULL) {
3085 addReply(c,shared.nullbulk);
3086 } else {
3087 robj *ele = listNodeValue(ln);
3088 addReplyBulkLen(c,ele);
3089 addReply(c,ele);
3090 addReply(c,shared.crlf);
3091 }
3092 }
3093 }
3094 }
3095
3096 static void lsetCommand(redisClient *c) {
3097 robj *o;
3098 int index = atoi(c->argv[2]->ptr);
3099
3100 o = lookupKeyWrite(c->db,c->argv[1]);
3101 if (o == NULL) {
3102 addReply(c,shared.nokeyerr);
3103 } else {
3104 if (o->type != REDIS_LIST) {
3105 addReply(c,shared.wrongtypeerr);
3106 } else {
3107 list *list = o->ptr;
3108 listNode *ln;
3109
3110 ln = listIndex(list, index);
3111 if (ln == NULL) {
3112 addReply(c,shared.outofrangeerr);
3113 } else {
3114 robj *ele = listNodeValue(ln);
3115
3116 decrRefCount(ele);
3117 listNodeValue(ln) = c->argv[3];
3118 incrRefCount(c->argv[3]);
3119 addReply(c,shared.ok);
3120 server.dirty++;
3121 }
3122 }
3123 }
3124 }
3125
3126 static void popGenericCommand(redisClient *c, int where) {
3127 robj *o;
3128
3129 o = lookupKeyWrite(c->db,c->argv[1]);
3130 if (o == NULL) {
3131 addReply(c,shared.nullbulk);
3132 } else {
3133 if (o->type != REDIS_LIST) {
3134 addReply(c,shared.wrongtypeerr);
3135 } else {
3136 list *list = o->ptr;
3137 listNode *ln;
3138
3139 if (where == REDIS_HEAD)
3140 ln = listFirst(list);
3141 else
3142 ln = listLast(list);
3143
3144 if (ln == NULL) {
3145 addReply(c,shared.nullbulk);
3146 } else {
3147 robj *ele = listNodeValue(ln);
3148 addReplyBulkLen(c,ele);
3149 addReply(c,ele);
3150 addReply(c,shared.crlf);
3151 listDelNode(list,ln);
3152 server.dirty++;
3153 }
3154 }
3155 }
3156 }
3157
3158 static void lpopCommand(redisClient *c) {
3159 popGenericCommand(c,REDIS_HEAD);
3160 }
3161
3162 static void rpopCommand(redisClient *c) {
3163 popGenericCommand(c,REDIS_TAIL);
3164 }
3165
3166 static void lrangeCommand(redisClient *c) {
3167 robj *o;
3168 int start = atoi(c->argv[2]->ptr);
3169 int end = atoi(c->argv[3]->ptr);
3170
3171 o = lookupKeyRead(c->db,c->argv[1]);
3172 if (o == NULL) {
3173 addReply(c,shared.nullmultibulk);
3174 } else {
3175 if (o->type != REDIS_LIST) {
3176 addReply(c,shared.wrongtypeerr);
3177 } else {
3178 list *list = o->ptr;
3179 listNode *ln;
3180 int llen = listLength(list);
3181 int rangelen, j;
3182 robj *ele;
3183
3184 /* convert negative indexes */
3185 if (start < 0) start = llen+start;
3186 if (end < 0) end = llen+end;
3187 if (start < 0) start = 0;
3188 if (end < 0) end = 0;
3189
3190 /* indexes sanity checks */
3191 if (start > end || start >= llen) {
3192 /* Out of range start or start > end result in empty list */
3193 addReply(c,shared.emptymultibulk);
3194 return;
3195 }
3196 if (end >= llen) end = llen-1;
3197 rangelen = (end-start)+1;
3198
3199 /* Return the result in form of a multi-bulk reply */
3200 ln = listIndex(list, start);
3201 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3202 for (j = 0; j < rangelen; j++) {
3203 ele = listNodeValue(ln);
3204 addReplyBulkLen(c,ele);
3205 addReply(c,ele);
3206 addReply(c,shared.crlf);
3207 ln = ln->next;
3208 }
3209 }
3210 }
3211 }
3212
3213 static void ltrimCommand(redisClient *c) {
3214 robj *o;
3215 int start = atoi(c->argv[2]->ptr);
3216 int end = atoi(c->argv[3]->ptr);
3217
3218 o = lookupKeyWrite(c->db,c->argv[1]);
3219 if (o == NULL) {
3220 addReply(c,shared.nokeyerr);
3221 } else {
3222 if (o->type != REDIS_LIST) {
3223 addReply(c,shared.wrongtypeerr);
3224 } else {
3225 list *list = o->ptr;
3226 listNode *ln;
3227 int llen = listLength(list);
3228 int j, ltrim, rtrim;
3229
3230 /* convert negative indexes */
3231 if (start < 0) start = llen+start;
3232 if (end < 0) end = llen+end;
3233 if (start < 0) start = 0;
3234 if (end < 0) end = 0;
3235
3236 /* indexes sanity checks */
3237 if (start > end || start >= llen) {
3238 /* Out of range start or start > end result in empty list */
3239 ltrim = llen;
3240 rtrim = 0;
3241 } else {
3242 if (end >= llen) end = llen-1;
3243 ltrim = start;
3244 rtrim = llen-end-1;
3245 }
3246
3247 /* Remove list elements to perform the trim */
3248 for (j = 0; j < ltrim; j++) {
3249 ln = listFirst(list);
3250 listDelNode(list,ln);
3251 }
3252 for (j = 0; j < rtrim; j++) {
3253 ln = listLast(list);
3254 listDelNode(list,ln);
3255 }
3256 server.dirty++;
3257 addReply(c,shared.ok);
3258 }
3259 }
3260 }
3261
3262 static void lremCommand(redisClient *c) {
3263 robj *o;
3264
3265 o = lookupKeyWrite(c->db,c->argv[1]);
3266 if (o == NULL) {
3267 addReply(c,shared.czero);
3268 } else {
3269 if (o->type != REDIS_LIST) {
3270 addReply(c,shared.wrongtypeerr);
3271 } else {
3272 list *list = o->ptr;
3273 listNode *ln, *next;
3274 int toremove = atoi(c->argv[2]->ptr);
3275 int removed = 0;
3276 int fromtail = 0;
3277
3278 if (toremove < 0) {
3279 toremove = -toremove;
3280 fromtail = 1;
3281 }
3282 ln = fromtail ? list->tail : list->head;
3283 while (ln) {
3284 robj *ele = listNodeValue(ln);
3285
3286 next = fromtail ? ln->prev : ln->next;
3287 if (compareStringObjects(ele,c->argv[3]) == 0) {
3288 listDelNode(list,ln);
3289 server.dirty++;
3290 removed++;
3291 if (toremove && removed == toremove) break;
3292 }
3293 ln = next;
3294 }
3295 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3296 }
3297 }
3298 }
3299
3300 /* ==================================== Sets ================================ */
3301
3302 static void saddCommand(redisClient *c) {
3303 robj *set;
3304
3305 set = lookupKeyWrite(c->db,c->argv[1]);
3306 if (set == NULL) {
3307 set = createSetObject();
3308 dictAdd(c->db->dict,c->argv[1],set);
3309 incrRefCount(c->argv[1]);
3310 } else {
3311 if (set->type != REDIS_SET) {
3312 addReply(c,shared.wrongtypeerr);
3313 return;
3314 }
3315 }
3316 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3317 incrRefCount(c->argv[2]);
3318 server.dirty++;
3319 addReply(c,shared.cone);
3320 } else {
3321 addReply(c,shared.czero);
3322 }
3323 }
3324
3325 static void sremCommand(redisClient *c) {
3326 robj *set;
3327
3328 set = lookupKeyWrite(c->db,c->argv[1]);
3329 if (set == NULL) {
3330 addReply(c,shared.czero);
3331 } else {
3332 if (set->type != REDIS_SET) {
3333 addReply(c,shared.wrongtypeerr);
3334 return;
3335 }
3336 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3337 server.dirty++;
3338 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3339 addReply(c,shared.cone);
3340 } else {
3341 addReply(c,shared.czero);
3342 }
3343 }
3344 }
3345
3346 static void smoveCommand(redisClient *c) {
3347 robj *srcset, *dstset;
3348
3349 srcset = lookupKeyWrite(c->db,c->argv[1]);
3350 dstset = lookupKeyWrite(c->db,c->argv[2]);
3351
3352 /* If the source key does not exist return 0, if it's of the wrong type
3353 * raise an error */
3354 if (srcset == NULL || srcset->type != REDIS_SET) {
3355 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3356 return;
3357 }
3358 /* Error if the destination key is not a set as well */
3359 if (dstset && dstset->type != REDIS_SET) {
3360 addReply(c,shared.wrongtypeerr);
3361 return;
3362 }
3363 /* Remove the element from the source set */
3364 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3365 /* Key not found in the src set! return zero */
3366 addReply(c,shared.czero);
3367 return;
3368 }
3369 server.dirty++;
3370 /* Add the element to the destination set */
3371 if (!dstset) {
3372 dstset = createSetObject();
3373 dictAdd(c->db->dict,c->argv[2],dstset);
3374 incrRefCount(c->argv[2]);
3375 }
3376 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3377 incrRefCount(c->argv[3]);
3378 addReply(c,shared.cone);
3379 }
3380
3381 static void sismemberCommand(redisClient *c) {
3382 robj *set;
3383
3384 set = lookupKeyRead(c->db,c->argv[1]);
3385 if (set == NULL) {
3386 addReply(c,shared.czero);
3387 } else {
3388 if (set->type != REDIS_SET) {
3389 addReply(c,shared.wrongtypeerr);
3390 return;
3391 }
3392 if (dictFind(set->ptr,c->argv[2]))
3393 addReply(c,shared.cone);
3394 else
3395 addReply(c,shared.czero);
3396 }
3397 }
3398
3399 static void scardCommand(redisClient *c) {
3400 robj *o;
3401 dict *s;
3402
3403 o = lookupKeyRead(c->db,c->argv[1]);
3404 if (o == NULL) {
3405 addReply(c,shared.czero);
3406 return;
3407 } else {
3408 if (o->type != REDIS_SET) {
3409 addReply(c,shared.wrongtypeerr);
3410 } else {
3411 s = o->ptr;
3412 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3413 dictSize(s)));
3414 }
3415 }
3416 }
3417
3418 static void spopCommand(redisClient *c) {
3419 robj *set;
3420 dictEntry *de;
3421
3422 set = lookupKeyWrite(c->db,c->argv[1]);
3423 if (set == NULL) {
3424 addReply(c,shared.nullbulk);
3425 } else {
3426 if (set->type != REDIS_SET) {
3427 addReply(c,shared.wrongtypeerr);
3428 return;
3429 }
3430 de = dictGetRandomKey(set->ptr);
3431 if (de == NULL) {
3432 addReply(c,shared.nullbulk);
3433 } else {
3434 robj *ele = dictGetEntryKey(de);
3435
3436 addReplyBulkLen(c,ele);
3437 addReply(c,ele);
3438 addReply(c,shared.crlf);
3439 dictDelete(set->ptr,ele);
3440 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3441 server.dirty++;
3442 }
3443 }
3444 }
3445
3446 static void srandmemberCommand(redisClient *c) {
3447 robj *set;
3448 dictEntry *de;
3449
3450 set = lookupKeyRead(c->db,c->argv[1]);
3451 if (set == NULL) {
3452 addReply(c,shared.nullbulk);
3453 } else {
3454 if (set->type != REDIS_SET) {
3455 addReply(c,shared.wrongtypeerr);
3456 return;
3457 }
3458 de = dictGetRandomKey(set->ptr);
3459 if (de == NULL) {
3460 addReply(c,shared.nullbulk);
3461 } else {
3462 robj *ele = dictGetEntryKey(de);
3463
3464 addReplyBulkLen(c,ele);
3465 addReply(c,ele);
3466 addReply(c,shared.crlf);
3467 }
3468 }
3469 }
3470
3471 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3472 dict **d1 = (void*) s1, **d2 = (void*) s2;
3473
3474 return dictSize(*d1)-dictSize(*d2);
3475 }
3476
3477 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3478 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3479 dictIterator *di;
3480 dictEntry *de;
3481 robj *lenobj = NULL, *dstset = NULL;
3482 int j, cardinality = 0;
3483
3484 for (j = 0; j < setsnum; j++) {
3485 robj *setobj;
3486
3487 setobj = dstkey ?
3488 lookupKeyWrite(c->db,setskeys[j]) :
3489 lookupKeyRead(c->db,setskeys[j]);
3490 if (!setobj) {
3491 zfree(dv);
3492 if (dstkey) {
3493 deleteKey(c->db,dstkey);
3494 addReply(c,shared.ok);
3495 } else {
3496 addReply(c,shared.nullmultibulk);
3497 }
3498 return;
3499 }
3500 if (setobj->type != REDIS_SET) {
3501 zfree(dv);
3502 addReply(c,shared.wrongtypeerr);
3503 return;
3504 }
3505 dv[j] = setobj->ptr;
3506 }
3507 /* Sort sets from the smallest to largest, this will improve our
3508 * algorithm's performace */
3509 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3510
3511 /* The first thing we should output is the total number of elements...
3512 * since this is a multi-bulk write, but at this stage we don't know
3513 * the intersection set size, so we use a trick, append an empty object
3514 * to the output list and save the pointer to later modify it with the
3515 * right length */
3516 if (!dstkey) {
3517 lenobj = createObject(REDIS_STRING,NULL);
3518 addReply(c,lenobj);
3519 decrRefCount(lenobj);
3520 } else {
3521 /* If we have a target key where to store the resulting set
3522 * create this key with an empty set inside */
3523 dstset = createSetObject();
3524 }
3525
3526 /* Iterate all the elements of the first (smallest) set, and test
3527 * the element against all the other sets, if at least one set does
3528 * not include the element it is discarded */
3529 di = dictGetIterator(dv[0]);
3530
3531 while((de = dictNext(di)) != NULL) {
3532 robj *ele;
3533
3534 for (j = 1; j < setsnum; j++)
3535 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3536 if (j != setsnum)
3537 continue; /* at least one set does not contain the member */
3538 ele = dictGetEntryKey(de);
3539 if (!dstkey) {
3540 addReplyBulkLen(c,ele);
3541 addReply(c,ele);
3542 addReply(c,shared.crlf);
3543 cardinality++;
3544 } else {
3545 dictAdd(dstset->ptr,ele,NULL);
3546 incrRefCount(ele);
3547 }
3548 }
3549 dictReleaseIterator(di);
3550
3551 if (dstkey) {
3552 /* Store the resulting set into the target */
3553 deleteKey(c->db,dstkey);
3554 dictAdd(c->db->dict,dstkey,dstset);
3555 incrRefCount(dstkey);
3556 }
3557
3558 if (!dstkey) {
3559 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3560 } else {
3561 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3562 dictSize((dict*)dstset->ptr)));
3563 server.dirty++;
3564 }
3565 zfree(dv);
3566 }
3567
3568 static void sinterCommand(redisClient *c) {
3569 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3570 }
3571
3572 static void sinterstoreCommand(redisClient *c) {
3573 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3574 }
3575
3576 #define REDIS_OP_UNION 0
3577 #define REDIS_OP_DIFF 1
3578
3579 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3580 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3581 dictIterator *di;
3582 dictEntry *de;
3583 robj *dstset = NULL;
3584 int j, cardinality = 0;
3585
3586 for (j = 0; j < setsnum; j++) {
3587 robj *setobj;
3588
3589 setobj = dstkey ?
3590 lookupKeyWrite(c->db,setskeys[j]) :
3591 lookupKeyRead(c->db,setskeys[j]);
3592 if (!setobj) {
3593 dv[j] = NULL;
3594 continue;
3595 }
3596 if (setobj->type != REDIS_SET) {
3597 zfree(dv);
3598 addReply(c,shared.wrongtypeerr);
3599 return;
3600 }
3601 dv[j] = setobj->ptr;
3602 }
3603
3604 /* We need a temp set object to store our union. If the dstkey
3605 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3606 * this set object will be the resulting object to set into the target key*/
3607 dstset = createSetObject();
3608
3609 /* Iterate all the elements of all the sets, add every element a single
3610 * time to the result set */
3611 for (j = 0; j < setsnum; j++) {
3612 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3613 if (!dv[j]) continue; /* non existing keys are like empty sets */
3614
3615 di = dictGetIterator(dv[j]);
3616
3617 while((de = dictNext(di)) != NULL) {
3618 robj *ele;
3619
3620 /* dictAdd will not add the same element multiple times */
3621 ele = dictGetEntryKey(de);
3622 if (op == REDIS_OP_UNION || j == 0) {
3623 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3624 incrRefCount(ele);
3625 cardinality++;
3626 }
3627 } else if (op == REDIS_OP_DIFF) {
3628 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3629 cardinality--;
3630 }
3631 }
3632 }
3633 dictReleaseIterator(di);
3634
3635 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3636 }
3637
3638 /* Output the content of the resulting set, if not in STORE mode */
3639 if (!dstkey) {
3640 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3641 di = dictGetIterator(dstset->ptr);
3642 while((de = dictNext(di)) != NULL) {
3643 robj *ele;
3644
3645 ele = dictGetEntryKey(de);
3646 addReplyBulkLen(c,ele);
3647 addReply(c,ele);
3648 addReply(c,shared.crlf);
3649 }
3650 dictReleaseIterator(di);
3651 } else {
3652 /* If we have a target key where to store the resulting set
3653 * create this key with the result set inside */
3654 deleteKey(c->db,dstkey);
3655 dictAdd(c->db->dict,dstkey,dstset);
3656 incrRefCount(dstkey);
3657 }
3658
3659 /* Cleanup */
3660 if (!dstkey) {
3661 decrRefCount(dstset);
3662 } else {
3663 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3664 dictSize((dict*)dstset->ptr)));
3665 server.dirty++;
3666 }
3667 zfree(dv);
3668 }
3669
3670 static void sunionCommand(redisClient *c) {
3671 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3672 }
3673
3674 static void sunionstoreCommand(redisClient *c) {
3675 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3676 }
3677
3678 static void sdiffCommand(redisClient *c) {
3679 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3680 }
3681
3682 static void sdiffstoreCommand(redisClient *c) {
3683 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3684 }
3685
3686 /* ==================================== ZSets =============================== */
3687
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
3695
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated scores.
 * b) the comparison is not just by key (our 'score') but by satellite data:
 *    elements sharing a score are told apart by their Redis object.
 * c) there is a back pointer, so it's a doubly linked list with the back
 *    pointers being only at "level 1". This allows traversing the list
 *    from tail to head, useful for ZREVRANGE.
 *    NOTE: zslCreateNode() and zslInsert() below do not set such a pointer
 *    yet; this point describes the intended design. */
3704
3705 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3706 zskiplistNode *zn = zmalloc(sizeof(*zn));
3707
3708 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3709 zn->score = score;
3710 zn->obj = obj;
3711 return zn;
3712 }
3713
3714 static zskiplist *zslCreate(void) {
3715 int j;
3716 zskiplist *zsl;
3717
3718 zsl = zmalloc(sizeof(*zsl));
3719 zsl->level = 1;
3720 zsl->length = 0;
3721 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3722 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3723 zsl->header->forward[j] = NULL;
3724 return zsl;
3725 }
3726
3727 static void zslFreeNode(zskiplistNode *node) {
3728 decrRefCount(node->obj);
3729 zfree(node);
3730 }
3731
3732 static void zslFree(zskiplist *zsl) {
3733 zskiplistNode *node = zsl->header->forward[1], *next;
3734
3735 while(node) {
3736 next = node->forward[1];
3737 zslFreeNode(node);
3738 node = next;
3739 }
3740 }
3741
3742 static int zslRandomLevel(void) {
3743 int level = 1;
3744 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3745 level += 1;
3746 return level;
3747 }
3748
3749 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3750 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3751 int i, level;
3752
3753 x = zsl->header;
3754 for (i = zsl->level-1; i >= 0; i--) {
3755 while (x->forward[i] && x->forward[i]->score < score)
3756 x = x->forward[i];
3757 update[i] = x;
3758 }
3759 /* we assume the key is not already inside, since we allow duplicated
3760 * scores, and the re-insertion of score and redis object should never
3761 * happpen since the caller of zslInsert() should test in the hash table
3762 * if the element is already inside or not. */
3763 level = zslRandomLevel();
3764 if (level > zsl->level) {
3765 for (i = zsl->level; i < level; i++)
3766 update[i] = zsl->header;
3767 zsl->level = level;
3768 }
3769 x = zslCreateNode(level,score,obj);
3770 for (i = 0; i < level; i++) {
3771 x->forward[i] = update[i]->forward[i];
3772 update[i]->forward[i] = x;
3773 }
3774 zsl->length++;
3775 }
3776
3777 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3778 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3779 int i;
3780
3781 x = zsl->header;
3782 for (i = zsl->level-1; i >= 0; i--) {
3783 while (x->forward[i] && x->forward[i]->score < score)
3784 x = x->forward[i];
3785 update[i] = x;
3786 }
3787 /* We may have multiple elements with the same score, what we need
3788 * is to find the element with both the right score and object. */
3789 x = x->forward[0];
3790 while(x->score == score) {
3791 if (compareStringObjects(x->obj,obj) == 0) {
3792 for (i = 0; i < zsl->level; i++) {
3793 if (update[i]->forward[i] != x) break;
3794 update[i]->forward[i] = x->forward[i];
3795 }
3796 zslFreeNode(x);
3797 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3798 zsl->level--;
3799 zsl->length--;
3800 return 1;
3801 } else {
3802 x = x->forward[0];
3803 if (!x) return 0; /* end of the list reached, not found */
3804 }
3805 }
3806 return 0; /* not found */
3807 }
3808
3809 /* The actual Z-commands implementations */
3810
3811 static void zaddCommand(redisClient *c) {
3812 robj *zsetobj;
3813 zset *zs;
3814 double *score;
3815
3816 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3817 if (zsetobj == NULL) {
3818 zsetobj = createZsetObject();
3819 dictAdd(c->db->dict,c->argv[1],zsetobj);
3820 incrRefCount(c->argv[1]);
3821 } else {
3822 if (zsetobj->type != REDIS_ZSET) {
3823 addReply(c,shared.wrongtypeerr);
3824 return;
3825 }
3826 }
3827 score = zmalloc(sizeof(double));
3828 *score = strtod(c->argv[2]->ptr,NULL);
3829 zs = zsetobj->ptr;
3830 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3831 /* case 1: New element */
3832 incrRefCount(c->argv[3]); /* added to hash */
3833 zslInsert(zs->zsl,*score,c->argv[3]);
3834 incrRefCount(c->argv[3]); /* added to skiplist */
3835 server.dirty++;
3836 addReply(c,shared.cone);
3837 } else {
3838 dictEntry *de;
3839 double *oldscore;
3840
3841 /* case 2: Score update operation */
3842 de = dictFind(zs->dict,c->argv[3]);
3843 assert(de != NULL);
3844 oldscore = dictGetEntryVal(de);
3845 if (*score != *oldscore) {
3846 int deleted;
3847
3848 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
3849 assert(deleted != 0);
3850 zslInsert(zs->zsl,*score,c->argv[3]);
3851 incrRefCount(c->argv[3]);
3852 server.dirty++;
3853 }
3854 addReply(c,shared.czero);
3855 }
3856 }
3857
3858 static void zremCommand(redisClient *c) {
3859 robj *zsetobj;
3860 zset *zs;
3861
3862 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3863 if (zsetobj == NULL) {
3864 addReply(c,shared.czero);
3865 } else {
3866 dictEntry *de;
3867 double *oldscore;
3868 int deleted;
3869
3870 if (zsetobj->type != REDIS_ZSET) {
3871 addReply(c,shared.wrongtypeerr);
3872 return;
3873 }
3874 zs = zsetobj->ptr;
3875 de = dictFind(zs->dict,c->argv[2]);
3876 if (de == NULL) {
3877 addReply(c,shared.czero);
3878 return;
3879 }
3880 /* Delete from the skiplist */
3881 oldscore = dictGetEntryVal(de);
3882 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
3883 assert(deleted != 0);
3884
3885 /* Delete from the hash table */
3886 dictDelete(zs->dict,c->argv[2]);
3887 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
3888 server.dirty++;
3889 addReply(c,shared.cone);
3890 }
3891 }
3892
3893 static void zrangeCommand(redisClient *c) {
3894 robj *o;
3895 int start = atoi(c->argv[2]->ptr);
3896 int end = atoi(c->argv[3]->ptr);
3897
3898 o = lookupKeyRead(c->db,c->argv[1]);
3899 if (o == NULL) {
3900 addReply(c,shared.nullmultibulk);
3901 } else {
3902 if (o->type != REDIS_ZSET) {
3903 addReply(c,shared.wrongtypeerr);
3904 } else {
3905 zset *zsetobj = o->ptr;
3906 zskiplist *zsl = zsetobj->zsl;
3907 zskiplistNode *ln;
3908
3909 int llen = zsl->length;
3910 int rangelen, j;
3911 robj *ele;
3912
3913 /* convert negative indexes */
3914 if (start < 0) start = llen+start;
3915 if (end < 0) end = llen+end;
3916 if (start < 0) start = 0;
3917 if (end < 0) end = 0;
3918
3919 /* indexes sanity checks */
3920 if (start > end || start >= llen) {
3921 /* Out of range start or start > end result in empty list */
3922 addReply(c,shared.emptymultibulk);
3923 return;
3924 }
3925 if (end >= llen) end = llen-1;
3926 rangelen = (end-start)+1;
3927
3928 /* Return the result in form of a multi-bulk reply */
3929 ln = zsl->header->forward[0];
3930 while (start--)
3931 ln = ln->forward[0];
3932
3933 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3934 for (j = 0; j < rangelen; j++) {
3935 ele = ln->obj;
3936 addReplyBulkLen(c,ele);
3937 addReply(c,ele);
3938 addReply(c,shared.crlf);
3939 ln = ln->forward[0];
3940 }
3941 }
3942 }
3943 }
3944
3945 static void zlenCommand(redisClient *c) {
3946 robj *o;
3947 zset *zs;
3948
3949 o = lookupKeyRead(c->db,c->argv[1]);
3950 if (o == NULL) {
3951 addReply(c,shared.czero);
3952 return;
3953 } else {
3954 if (o->type != REDIS_ZSET) {
3955 addReply(c,shared.wrongtypeerr);
3956 } else {
3957 zs = o->ptr;
3958 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
3959 }
3960 }
3961 }
3962
3963 /* ========================= Non type-specific commands ==================== */
3964
3965 static void flushdbCommand(redisClient *c) {
3966 server.dirty += dictSize(c->db->dict);
3967 dictEmpty(c->db->dict);
3968 dictEmpty(c->db->expires);
3969 addReply(c,shared.ok);
3970 }
3971
3972 static void flushallCommand(redisClient *c) {
3973 server.dirty += emptyDb();
3974 addReply(c,shared.ok);
3975 rdbSave(server.dbfilename);
3976 server.dirty++;
3977 }
3978
3979 static redisSortOperation *createSortOperation(int type, robj *pattern) {
3980 redisSortOperation *so = zmalloc(sizeof(*so));
3981 so->type = type;
3982 so->pattern = pattern;
3983 return so;
3984 }
3985
3986 /* Return the value associated to the key with a name obtained
3987 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3988 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3989 char *p;
3990 sds spat, ssub;
3991 robj keyobj;
3992 int prefixlen, sublen, postfixlen;
3993 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3994 struct {
3995 long len;
3996 long free;
3997 char buf[REDIS_SORTKEY_MAX+1];
3998 } keyname;
3999
4000 if (subst->encoding == REDIS_ENCODING_RAW)
4001 incrRefCount(subst);
4002 else {
4003 subst = getDecodedObject(subst);
4004 }
4005
4006 spat = pattern->ptr;
4007 ssub = subst->ptr;
4008 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4009 p = strchr(spat,'*');
4010 if (!p) return NULL;
4011
4012 prefixlen = p-spat;
4013 sublen = sdslen(ssub);
4014 postfixlen = sdslen(spat)-(prefixlen+1);
4015 memcpy(keyname.buf,spat,prefixlen);
4016 memcpy(keyname.buf+prefixlen,ssub,sublen);
4017 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4018 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4019 keyname.len = prefixlen+sublen+postfixlen;
4020
4021 keyobj.refcount = 1;
4022 keyobj.type = REDIS_STRING;
4023 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4024
4025 decrRefCount(subst);
4026
4027 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4028 return lookupKeyRead(db,&keyobj);
4029 }
4030
4031 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4032 * the additional parameter is not standard but a BSD-specific we have to
4033 * pass sorting parameters via the global 'server' structure */
4034 static int sortCompare(const void *s1, const void *s2) {
4035 const redisSortObject *so1 = s1, *so2 = s2;
4036 int cmp;
4037
4038 if (!server.sort_alpha) {
4039 /* Numeric sorting. Here it's trivial as we precomputed scores */
4040 if (so1->u.score > so2->u.score) {
4041 cmp = 1;
4042 } else if (so1->u.score < so2->u.score) {
4043 cmp = -1;
4044 } else {
4045 cmp = 0;
4046 }
4047 } else {
4048 /* Alphanumeric sorting */
4049 if (server.sort_bypattern) {
4050 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4051 /* At least one compare object is NULL */
4052 if (so1->u.cmpobj == so2->u.cmpobj)
4053 cmp = 0;
4054 else if (so1->u.cmpobj == NULL)
4055 cmp = -1;
4056 else
4057 cmp = 1;
4058 } else {
4059 /* We have both the objects, use strcoll */
4060 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4061 }
4062 } else {
4063 /* Compare elements directly */
4064 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4065 so2->obj->encoding == REDIS_ENCODING_RAW) {
4066 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4067 } else {
4068 robj *dec1, *dec2;
4069
4070 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4071 so1->obj : getDecodedObject(so1->obj);
4072 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4073 so2->obj : getDecodedObject(so2->obj);
4074 cmp = strcoll(dec1->ptr,dec2->ptr);
4075 if (dec1 != so1->obj) decrRefCount(dec1);
4076 if (dec2 != so2->obj) decrRefCount(dec2);
4077 }
4078 }
4079 }
4080 return server.sort_desc ? -cmp : cmp;
4081 }
4082
4083 /* The SORT command is the most complex command in Redis. Warning: this code
4084 * is optimized for speed and a bit less for readability */
4085 static void sortCommand(redisClient *c) {
4086 list *operations;
4087 int outputlen = 0;
4088 int desc = 0, alpha = 0;
4089 int limit_start = 0, limit_count = -1, start, end;
4090 int j, dontsort = 0, vectorlen;
4091 int getop = 0; /* GET operation counter */
4092 robj *sortval, *sortby = NULL;
4093 redisSortObject *vector; /* Resulting vector to sort */
4094
4095 /* Lookup the key to sort. It must be of the right types */
4096 sortval = lookupKeyRead(c->db,c->argv[1]);
4097 if (sortval == NULL) {
4098 addReply(c,shared.nokeyerr);
4099 return;
4100 }
4101 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4102 addReply(c,shared.wrongtypeerr);
4103 return;
4104 }
4105
4106 /* Create a list of operations to perform for every sorted element.
4107 * Operations can be GET/DEL/INCR/DECR */
4108 operations = listCreate();
4109 listSetFreeMethod(operations,zfree);
4110 j = 2;
4111
4112 /* Now we need to protect sortval incrementing its count, in the future
4113 * SORT may have options able to overwrite/delete keys during the sorting
4114 * and the sorted key itself may get destroied */
4115 incrRefCount(sortval);
4116
4117 /* The SORT command has an SQL-alike syntax, parse it */
4118 while(j < c->argc) {
4119 int leftargs = c->argc-j-1;
4120 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4121 desc = 0;
4122 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4123 desc = 1;
4124 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4125 alpha = 1;
4126 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4127 limit_start = atoi(c->argv[j+1]->ptr);
4128 limit_count = atoi(c->argv[j+2]->ptr);
4129 j+=2;
4130 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4131 sortby = c->argv[j+1];
4132 /* If the BY pattern does not contain '*', i.e. it is constant,
4133 * we don't need to sort nor to lookup the weight keys. */
4134 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4135 j++;
4136 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4137 listAddNodeTail(operations,createSortOperation(
4138 REDIS_SORT_GET,c->argv[j+1]));
4139 getop++;
4140 j++;
4141 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4142 listAddNodeTail(operations,createSortOperation(
4143 REDIS_SORT_DEL,c->argv[j+1]));
4144 j++;
4145 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4146 listAddNodeTail(operations,createSortOperation(
4147 REDIS_SORT_INCR,c->argv[j+1]));
4148 j++;
4149 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4150 listAddNodeTail(operations,createSortOperation(
4151 REDIS_SORT_DECR,c->argv[j+1]));
4152 j++;
4153 } else {
4154 decrRefCount(sortval);
4155 listRelease(operations);
4156 addReply(c,shared.syntaxerr);
4157 return;
4158 }
4159 j++;
4160 }
4161
4162 /* Load the sorting vector with all the objects to sort */
4163 vectorlen = (sortval->type == REDIS_LIST) ?
4164 listLength((list*)sortval->ptr) :
4165 dictSize((dict*)sortval->ptr);
4166 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4167 j = 0;
4168 if (sortval->type == REDIS_LIST) {
4169 list *list = sortval->ptr;
4170 listNode *ln;
4171
4172 listRewind(list);
4173 while((ln = listYield(list))) {
4174 robj *ele = ln->value;
4175 vector[j].obj = ele;
4176 vector[j].u.score = 0;
4177 vector[j].u.cmpobj = NULL;
4178 j++;
4179 }
4180 } else {
4181 dict *set = sortval->ptr;
4182 dictIterator *di;
4183 dictEntry *setele;
4184
4185 di = dictGetIterator(set);
4186 while((setele = dictNext(di)) != NULL) {
4187 vector[j].obj = dictGetEntryKey(setele);
4188 vector[j].u.score = 0;
4189 vector[j].u.cmpobj = NULL;
4190 j++;
4191 }
4192 dictReleaseIterator(di);
4193 }
4194 assert(j == vectorlen);
4195
4196 /* Now it's time to load the right scores in the sorting vector */
4197 if (dontsort == 0) {
4198 for (j = 0; j < vectorlen; j++) {
4199 if (sortby) {
4200 robj *byval;
4201
4202 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4203 if (!byval || byval->type != REDIS_STRING) continue;
4204 if (alpha) {
4205 if (byval->encoding == REDIS_ENCODING_RAW) {
4206 vector[j].u.cmpobj = byval;
4207 incrRefCount(byval);
4208 } else {
4209 vector[j].u.cmpobj = getDecodedObject(byval);
4210 }
4211 } else {
4212 if (byval->encoding == REDIS_ENCODING_RAW) {
4213 vector[j].u.score = strtod(byval->ptr,NULL);
4214 } else {
4215 if (byval->encoding == REDIS_ENCODING_INT) {
4216 vector[j].u.score = (long)byval->ptr;
4217 } else
4218 assert(1 != 1);
4219 }
4220 }
4221 } else {
4222 if (!alpha) {
4223 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4224 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4225 else {
4226 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4227 vector[j].u.score = (long) vector[j].obj->ptr;
4228 else
4229 assert(1 != 1);
4230 }
4231 }
4232 }
4233 }
4234 }
4235
4236 /* We are ready to sort the vector... perform a bit of sanity check
4237 * on the LIMIT option too. We'll use a partial version of quicksort. */
4238 start = (limit_start < 0) ? 0 : limit_start;
4239 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4240 if (start >= vectorlen) {
4241 start = vectorlen-1;
4242 end = vectorlen-2;
4243 }
4244 if (end >= vectorlen) end = vectorlen-1;
4245
4246 if (dontsort == 0) {
4247 server.sort_desc = desc;
4248 server.sort_alpha = alpha;
4249 server.sort_bypattern = sortby ? 1 : 0;
4250 if (sortby && (start != 0 || end != vectorlen-1))
4251 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4252 else
4253 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4254 }
4255
4256 /* Send command output to the output buffer, performing the specified
4257 * GET/DEL/INCR/DECR operations if any. */
4258 outputlen = getop ? getop*(end-start+1) : end-start+1;
4259 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4260 for (j = start; j <= end; j++) {
4261 listNode *ln;
4262 if (!getop) {
4263 addReplyBulkLen(c,vector[j].obj);
4264 addReply(c,vector[j].obj);
4265 addReply(c,shared.crlf);
4266 }
4267 listRewind(operations);
4268 while((ln = listYield(operations))) {
4269 redisSortOperation *sop = ln->value;
4270 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4271 vector[j].obj);
4272
4273 if (sop->type == REDIS_SORT_GET) {
4274 if (!val || val->type != REDIS_STRING) {
4275 addReply(c,shared.nullbulk);
4276 } else {
4277 addReplyBulkLen(c,val);
4278 addReply(c,val);
4279 addReply(c,shared.crlf);
4280 }
4281 } else if (sop->type == REDIS_SORT_DEL) {
4282 /* TODO */
4283 }
4284 }
4285 }
4286
4287 /* Cleanup */
4288 decrRefCount(sortval);
4289 listRelease(operations);
4290 for (j = 0; j < vectorlen; j++) {
4291 if (sortby && alpha && vector[j].u.cmpobj)
4292 decrRefCount(vector[j].u.cmpobj);
4293 }
4294 zfree(vector);
4295 }
4296
4297 static void infoCommand(redisClient *c) {
4298 sds info;
4299 time_t uptime = time(NULL)-server.stat_starttime;
4300 int j;
4301
4302 info = sdscatprintf(sdsempty(),
4303 "redis_version:%s\r\n"
4304 "arch_bits:%s\r\n"
4305 "uptime_in_seconds:%d\r\n"
4306 "uptime_in_days:%d\r\n"
4307 "connected_clients:%d\r\n"
4308 "connected_slaves:%d\r\n"
4309 "used_memory:%zu\r\n"
4310 "changes_since_last_save:%lld\r\n"
4311 "bgsave_in_progress:%d\r\n"
4312 "last_save_time:%d\r\n"
4313 "total_connections_received:%lld\r\n"
4314 "total_commands_processed:%lld\r\n"
4315 "role:%s\r\n"
4316 ,REDIS_VERSION,
4317 (sizeof(long) == 8) ? "64" : "32",
4318 uptime,
4319 uptime/(3600*24),
4320 listLength(server.clients)-listLength(server.slaves),
4321 listLength(server.slaves),
4322 server.usedmemory,
4323 server.dirty,
4324 server.bgsaveinprogress,
4325 server.lastsave,
4326 server.stat_numconnections,
4327 server.stat_numcommands,
4328 server.masterhost == NULL ? "master" : "slave"
4329 );
4330 if (server.masterhost) {
4331 info = sdscatprintf(info,
4332 "master_host:%s\r\n"
4333 "master_port:%d\r\n"
4334 "master_link_status:%s\r\n"
4335 "master_last_io_seconds_ago:%d\r\n"
4336 ,server.masterhost,
4337 server.masterport,
4338 (server.replstate == REDIS_REPL_CONNECTED) ?
4339 "up" : "down",
4340 (int)(time(NULL)-server.master->lastinteraction)
4341 );
4342 }
4343 for (j = 0; j < server.dbnum; j++) {
4344 long long keys, vkeys;
4345
4346 keys = dictSize(server.db[j].dict);
4347 vkeys = dictSize(server.db[j].expires);
4348 if (keys || vkeys) {
4349 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4350 j, keys, vkeys);
4351 }
4352 }
4353 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4354 addReplySds(c,info);
4355 addReply(c,shared.crlf);
4356 }
4357
4358 static void monitorCommand(redisClient *c) {
4359 /* ignore MONITOR if aleady slave or in monitor mode */
4360 if (c->flags & REDIS_SLAVE) return;
4361
4362 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4363 c->slaveseldb = 0;
4364 listAddNodeTail(server.monitors,c);
4365 addReply(c,shared.ok);
4366 }
4367
4368 /* ================================= Expire ================================= */
4369 static int removeExpire(redisDb *db, robj *key) {
4370 if (dictDelete(db->expires,key) == DICT_OK) {
4371 return 1;
4372 } else {
4373 return 0;
4374 }
4375 }
4376
4377 static int setExpire(redisDb *db, robj *key, time_t when) {
4378 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4379 return 0;
4380 } else {
4381 incrRefCount(key);
4382 return 1;
4383 }
4384 }
4385
4386 /* Return the expire time of the specified key, or -1 if no expire
4387 * is associated with this key (i.e. the key is non volatile) */
4388 static time_t getExpire(redisDb *db, robj *key) {
4389 dictEntry *de;
4390
4391 /* No expire? return ASAP */
4392 if (dictSize(db->expires) == 0 ||
4393 (de = dictFind(db->expires,key)) == NULL) return -1;
4394
4395 return (time_t) dictGetEntryVal(de);
4396 }
4397
4398 static int expireIfNeeded(redisDb *db, robj *key) {
4399 time_t when;
4400 dictEntry *de;
4401
4402 /* No expire? return ASAP */
4403 if (dictSize(db->expires) == 0 ||
4404 (de = dictFind(db->expires,key)) == NULL) return 0;
4405
4406 /* Lookup the expire */
4407 when = (time_t) dictGetEntryVal(de);
4408 if (time(NULL) <= when) return 0;
4409
4410 /* Delete the key */
4411 dictDelete(db->expires,key);
4412 return dictDelete(db->dict,key) == DICT_OK;
4413 }
4414
4415 static int deleteIfVolatile(redisDb *db, robj *key) {
4416 dictEntry *de;
4417
4418 /* No expire? return ASAP */
4419 if (dictSize(db->expires) == 0 ||
4420 (de = dictFind(db->expires,key)) == NULL) return 0;
4421
4422 /* Delete the key */
4423 server.dirty++;
4424 dictDelete(db->expires,key);
4425 return dictDelete(db->dict,key) == DICT_OK;
4426 }
4427
4428 static void expireCommand(redisClient *c) {
4429 dictEntry *de;
4430 int seconds = atoi(c->argv[2]->ptr);
4431
4432 de = dictFind(c->db->dict,c->argv[1]);
4433 if (de == NULL) {
4434 addReply(c,shared.czero);
4435 return;
4436 }
4437 if (seconds <= 0) {
4438 addReply(c, shared.czero);
4439 return;
4440 } else {
4441 time_t when = time(NULL)+seconds;
4442 if (setExpire(c->db,c->argv[1],when)) {
4443 addReply(c,shared.cone);
4444 server.dirty++;
4445 } else {
4446 addReply(c,shared.czero);
4447 }
4448 return;
4449 }
4450 }
4451
4452 static void ttlCommand(redisClient *c) {
4453 time_t expire;
4454 int ttl = -1;
4455
4456 expire = getExpire(c->db,c->argv[1]);
4457 if (expire != -1) {
4458 ttl = (int) (expire-time(NULL));
4459 if (ttl < 0) ttl = -1;
4460 }
4461 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4462 }
4463
4464 static void msetGenericCommand(redisClient *c, int nx) {
4465 int j;
4466
4467 if ((c->argc % 2) == 0) {
4468 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4469 return;
4470 }
4471 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4472 * set nothing at all if at least one already key exists. */
4473 if (nx) {
4474 for (j = 1; j < c->argc; j += 2) {
4475 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4476 addReply(c, shared.czero);
4477 return;
4478 }
4479 }
4480 }
4481
4482 for (j = 1; j < c->argc; j += 2) {
4483 int retval;
4484
4485 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4486 if (retval == DICT_ERR) {
4487 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4488 incrRefCount(c->argv[j+1]);
4489 } else {
4490 incrRefCount(c->argv[j]);
4491 incrRefCount(c->argv[j+1]);
4492 }
4493 removeExpire(c->db,c->argv[j]);
4494 }
4495 server.dirty += (c->argc-1)/2;
4496 addReply(c, nx ? shared.cone : shared.ok);
4497 }
4498
4499 static void msetCommand(redisClient *c) {
4500 msetGenericCommand(c,0);
4501 }
4502
4503 static void msetnxCommand(redisClient *c) {
4504 msetGenericCommand(c,1);
4505 }
4506
4507 /* =============================== Replication ============================= */
4508
4509 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4510 ssize_t nwritten, ret = size;
4511 time_t start = time(NULL);
4512
4513 timeout++;
4514 while(size) {
4515 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4516 nwritten = write(fd,ptr,size);
4517 if (nwritten == -1) return -1;
4518 ptr += nwritten;
4519 size -= nwritten;
4520 }
4521 if ((time(NULL)-start) > timeout) {
4522 errno = ETIMEDOUT;
4523 return -1;
4524 }
4525 }
4526 return ret;
4527 }
4528
4529 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4530 ssize_t nread, totread = 0;
4531 time_t start = time(NULL);
4532
4533 timeout++;
4534 while(size) {
4535 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4536 nread = read(fd,ptr,size);
4537 if (nread == -1) return -1;
4538 ptr += nread;
4539 size -= nread;
4540 totread += nread;
4541 }
4542 if ((time(NULL)-start) > timeout) {
4543 errno = ETIMEDOUT;
4544 return -1;
4545 }
4546 }
4547 return totread;
4548 }
4549
/* Read one line from 'fd' into 'ptr', a buffer of 'size' bytes, one byte at
 * a time through syncRead().
 *
 * The '\n' terminator is not stored and the line is NUL-terminated; a '\r'
 * right before it is overwritten with '\0' but is still counted in the
 * return value. If the buffer fills up (size-1 bytes stored) before a
 * newline arrives, the truncated, NUL-terminated line is returned.
 * Returns the number of bytes stored, or -1 on I/O error/timeout or when
 * 'size' is not positive (errno = EINVAL). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    size--; /* room for the terminating NUL */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the stored byte: without this the loop never stops
         * on a long line and writes past the end of the buffer. */
        size--;
    }
    return nread;
}
4570
4571 static void syncCommand(redisClient *c) {
4572 /* ignore SYNC if aleady slave or in monitor mode */
4573 if (c->flags & REDIS_SLAVE) return;
4574
4575 /* SYNC can't be issued when the server has pending data to send to
4576 * the client about already issued commands. We need a fresh reply
4577 * buffer registering the differences between the BGSAVE and the current
4578 * dataset, so that we can copy to other slaves if needed. */
4579 if (listLength(c->reply) != 0) {
4580 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4581 return;
4582 }
4583
4584 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4585 /* Here we need to check if there is a background saving operation
4586 * in progress, or if it is required to start one */
4587 if (server.bgsaveinprogress) {
4588 /* Ok a background save is in progress. Let's check if it is a good
4589 * one for replication, i.e. if there is another slave that is
4590 * registering differences since the server forked to save */
4591 redisClient *slave;
4592 listNode *ln;
4593
4594 listRewind(server.slaves);
4595 while((ln = listYield(server.slaves))) {
4596 slave = ln->value;
4597 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4598 }
4599 if (ln) {
4600 /* Perfect, the server is already registering differences for
4601 * another slave. Set the right state, and copy the buffer. */
4602 listRelease(c->reply);
4603 c->reply = listDup(slave->reply);
4604 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4605 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4606 } else {
4607 /* No way, we need to wait for the next BGSAVE in order to
4608 * register differences */
4609 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4610 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4611 }
4612 } else {
4613 /* Ok we don't have a BGSAVE in progress, let's start one */
4614 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4615 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4616 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4617 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4618 return;
4619 }
4620 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4621 }
4622 c->repldbfd = -1;
4623 c->flags |= REDIS_SLAVE;
4624 c->slaveseldb = 0;
4625 listAddNodeTail(server.slaves,c);
4626 return;
4627 }
4628
4629 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4630 redisClient *slave = privdata;
4631 REDIS_NOTUSED(el);
4632 REDIS_NOTUSED(mask);
4633 char buf[REDIS_IOBUF_LEN];
4634 ssize_t nwritten, buflen;
4635
4636 if (slave->repldboff == 0) {
4637 /* Write the bulk write count before to transfer the DB. In theory here
4638 * we don't know how much room there is in the output buffer of the
4639 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4640 * operations) will never be smaller than the few bytes we need. */
4641 sds bulkcount;
4642
4643 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4644 slave->repldbsize);
4645 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4646 {
4647 sdsfree(bulkcount);
4648 freeClient(slave);
4649 return;
4650 }
4651 sdsfree(bulkcount);
4652 }
4653 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4654 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4655 if (buflen <= 0) {
4656 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4657 (buflen == 0) ? "premature EOF" : strerror(errno));
4658 freeClient(slave);
4659 return;
4660 }
4661 if ((nwritten = write(fd,buf,buflen)) == -1) {
4662 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4663 strerror(errno));
4664 freeClient(slave);
4665 return;
4666 }
4667 slave->repldboff += nwritten;
4668 if (slave->repldboff == slave->repldbsize) {
4669 close(slave->repldbfd);
4670 slave->repldbfd = -1;
4671 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4672 slave->replstate = REDIS_REPL_ONLINE;
4673 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4674 sendReplyToClient, slave, NULL) == AE_ERR) {
4675 freeClient(slave);
4676 return;
4677 }
4678 addReplySds(slave,sdsempty());
4679 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4680 }
4681 }
4682
4683 /* This function is called at the end of every backgrond saving.
4684 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4685 * otherwise REDIS_ERR is passed to the function.
4686 *
4687 * The goal of this function is to handle slaves waiting for a successful
4688 * background saving in order to perform non-blocking synchronization. */
4689 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4690 listNode *ln;
4691 int startbgsave = 0;
4692
4693 listRewind(server.slaves);
4694 while((ln = listYield(server.slaves))) {
4695 redisClient *slave = ln->value;
4696
4697 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4698 startbgsave = 1;
4699 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4700 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4701 struct redis_stat buf;
4702
4703 if (bgsaveerr != REDIS_OK) {
4704 freeClient(slave);
4705 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4706 continue;
4707 }
4708 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4709 redis_fstat(slave->repldbfd,&buf) == -1) {
4710 freeClient(slave);
4711 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4712 continue;
4713 }
4714 slave->repldboff = 0;
4715 slave->repldbsize = buf.st_size;
4716 slave->replstate = REDIS_REPL_SEND_BULK;
4717 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4718 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4719 freeClient(slave);
4720 continue;
4721 }
4722 }
4723 }
4724 if (startbgsave) {
4725 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4726 listRewind(server.slaves);
4727 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4728 while((ln = listYield(server.slaves))) {
4729 redisClient *slave = ln->value;
4730
4731 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4732 freeClient(slave);
4733 }
4734 }
4735 }
4736 }
4737
4738 static int syncWithMaster(void) {
4739 char buf[1024], tmpfile[256];
4740 int dumpsize;
4741 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4742 int dfd;
4743
4744 if (fd == -1) {
4745 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4746 strerror(errno));
4747 return REDIS_ERR;
4748 }
4749 /* Issue the SYNC command */
4750 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4751 close(fd);
4752 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4753 strerror(errno));
4754 return REDIS_ERR;
4755 }
4756 /* Read the bulk write count */
4757 if (syncReadLine(fd,buf,1024,3600) == -1) {
4758 close(fd);
4759 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4760 strerror(errno));
4761 return REDIS_ERR;
4762 }
4763 dumpsize = atoi(buf+1);
4764 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4765 /* Read the bulk write data on a temp file */
4766 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4767 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4768 if (dfd == -1) {
4769 close(fd);
4770 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4771 return REDIS_ERR;
4772 }
4773 while(dumpsize) {
4774 int nread, nwritten;
4775
4776 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4777 if (nread == -1) {
4778 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4779 strerror(errno));
4780 close(fd);
4781 close(dfd);
4782 return REDIS_ERR;
4783 }
4784 nwritten = write(dfd,buf,nread);
4785 if (nwritten == -1) {
4786 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4787 close(fd);
4788 close(dfd);
4789 return REDIS_ERR;
4790 }
4791 dumpsize -= nread;
4792 }
4793 close(dfd);
4794 if (rename(tmpfile,server.dbfilename) == -1) {
4795 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4796 unlink(tmpfile);
4797 close(fd);
4798 return REDIS_ERR;
4799 }
4800 emptyDb();
4801 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4802 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4803 close(fd);
4804 return REDIS_ERR;
4805 }
4806 server.master = createClient(fd);
4807 server.master->flags |= REDIS_MASTER;
4808 server.replstate = REDIS_REPL_CONNECTED;
4809 return REDIS_OK;
4810 }
4811
4812 static void slaveofCommand(redisClient *c) {
4813 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4814 !strcasecmp(c->argv[2]->ptr,"one")) {
4815 if (server.masterhost) {
4816 sdsfree(server.masterhost);
4817 server.masterhost = NULL;
4818 if (server.master) freeClient(server.master);
4819 server.replstate = REDIS_REPL_NONE;
4820 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4821 }
4822 } else {
4823 sdsfree(server.masterhost);
4824 server.masterhost = sdsdup(c->argv[1]->ptr);
4825 server.masterport = atoi(c->argv[2]->ptr);
4826 if (server.master) freeClient(server.master);
4827 server.replstate = REDIS_REPL_CONNECT;
4828 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4829 server.masterhost, server.masterport);
4830 }
4831 addReply(c,shared.ok);
4832 }
4833
4834 /* ============================ Maxmemory directive ======================== */
4835
4836 /* This function gets called when 'maxmemory' is set on the config file to limit
4837 * the max memory used by the server, and we are out of memory.
4838 * This function will try to, in order:
4839 *
4840 * - Free objects from the free list
4841 * - Try to remove keys with an EXPIRE set
4842 *
4843 * It is not possible to free enough memory to reach used-memory < maxmemory
4844 * the server will start refusing commands that will enlarge even more the
4845 * memory usage.
4846 */
4847 static void freeMemoryIfNeeded(void) {
4848 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4849 if (listLength(server.objfreelist)) {
4850 robj *o;
4851
4852 listNode *head = listFirst(server.objfreelist);
4853 o = listNodeValue(head);
4854 listDelNode(server.objfreelist,head);
4855 zfree(o);
4856 } else {
4857 int j, k, freed = 0;
4858
4859 for (j = 0; j < server.dbnum; j++) {
4860 int minttl = -1;
4861 robj *minkey = NULL;
4862 struct dictEntry *de;
4863
4864 if (dictSize(server.db[j].expires)) {
4865 freed = 1;
4866 /* From a sample of three keys drop the one nearest to
4867 * the natural expire */
4868 for (k = 0; k < 3; k++) {
4869 time_t t;
4870
4871 de = dictGetRandomKey(server.db[j].expires);
4872 t = (time_t) dictGetEntryVal(de);
4873 if (minttl == -1 || t < minttl) {
4874 minkey = dictGetEntryKey(de);
4875 minttl = t;
4876 }
4877 }
4878 deleteKey(server.db+j,minkey);
4879 }
4880 }
4881 if (!freed) return; /* nothing to free... */
4882 }
4883 }
4884 }
4885
4886 /* ================================= Debugging ============================== */
4887
4888 static void debugCommand(redisClient *c) {
4889 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4890 *((char*)-1) = 'x';
4891 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4892 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4893 robj *key, *val;
4894
4895 if (!de) {
4896 addReply(c,shared.nokeyerr);
4897 return;
4898 }
4899 key = dictGetEntryKey(de);
4900 val = dictGetEntryVal(de);
4901 addReplySds(c,sdscatprintf(sdsempty(),
4902 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4903 key, key->refcount, val, val->refcount, val->encoding));
4904 } else {
4905 addReplySds(c,sdsnew(
4906 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4907 }
4908 }
4909
4910 #ifdef HAVE_BACKTRACE
4911 static struct redisFunctionSym symsTable[] = {
4912 {"compareStringObjects", (unsigned long)compareStringObjects},
4913 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
4914 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
4915 {"dictEncObjHash", (unsigned long)dictEncObjHash},
4916 {"incrDecrCommand", (unsigned long)incrDecrCommand},
4917 {"freeStringObject", (unsigned long)freeStringObject},
4918 {"freeListObject", (unsigned long)freeListObject},
4919 {"freeSetObject", (unsigned long)freeSetObject},
4920 {"decrRefCount", (unsigned long)decrRefCount},
4921 {"createObject", (unsigned long)createObject},
4922 {"freeClient", (unsigned long)freeClient},
4923 {"rdbLoad", (unsigned long)rdbLoad},
4924 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
4925 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
4926 {"addReply", (unsigned long)addReply},
4927 {"addReplySds", (unsigned long)addReplySds},
4928 {"incrRefCount", (unsigned long)incrRefCount},
4929 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
4930 {"createStringObject", (unsigned long)createStringObject},
4931 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
4932 {"syncWithMaster", (unsigned long)syncWithMaster},
4933 {"tryObjectSharing", (unsigned long)tryObjectSharing},
4934 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
4935 {"getDecodedObject", (unsigned long)getDecodedObject},
4936 {"removeExpire", (unsigned long)removeExpire},
4937 {"expireIfNeeded", (unsigned long)expireIfNeeded},
4938 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
4939 {"deleteKey", (unsigned long)deleteKey},
4940 {"getExpire", (unsigned long)getExpire},
4941 {"setExpire", (unsigned long)setExpire},
4942 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
4943 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
4944 {"authCommand", (unsigned long)authCommand},
4945 {"pingCommand", (unsigned long)pingCommand},
4946 {"echoCommand", (unsigned long)echoCommand},
4947 {"setCommand", (unsigned long)setCommand},
4948 {"setnxCommand", (unsigned long)setnxCommand},
4949 {"getCommand", (unsigned long)getCommand},
4950 {"delCommand", (unsigned long)delCommand},
4951 {"existsCommand", (unsigned long)existsCommand},
4952 {"incrCommand", (unsigned long)incrCommand},
4953 {"decrCommand", (unsigned long)decrCommand},
4954 {"incrbyCommand", (unsigned long)incrbyCommand},
4955 {"decrbyCommand", (unsigned long)decrbyCommand},
4956 {"selectCommand", (unsigned long)selectCommand},
4957 {"randomkeyCommand", (unsigned long)randomkeyCommand},
4958 {"keysCommand", (unsigned long)keysCommand},
4959 {"dbsizeCommand", (unsigned long)dbsizeCommand},
4960 {"lastsaveCommand", (unsigned long)lastsaveCommand},
4961 {"saveCommand", (unsigned long)saveCommand},
4962 {"bgsaveCommand", (unsigned long)bgsaveCommand},
4963 {"shutdownCommand", (unsigned long)shutdownCommand},
4964 {"moveCommand", (unsigned long)moveCommand},
4965 {"renameCommand", (unsigned long)renameCommand},
4966 {"renamenxCommand", (unsigned long)renamenxCommand},
4967 {"lpushCommand", (unsigned long)lpushCommand},
4968 {"rpushCommand", (unsigned long)rpushCommand},
4969 {"lpopCommand", (unsigned long)lpopCommand},
4970 {"rpopCommand", (unsigned long)rpopCommand},
4971 {"llenCommand", (unsigned long)llenCommand},
4972 {"lindexCommand", (unsigned long)lindexCommand},
4973 {"lrangeCommand", (unsigned long)lrangeCommand},
4974 {"ltrimCommand", (unsigned long)ltrimCommand},
4975 {"typeCommand", (unsigned long)typeCommand},
4976 {"lsetCommand", (unsigned long)lsetCommand},
4977 {"saddCommand", (unsigned long)saddCommand},
4978 {"sremCommand", (unsigned long)sremCommand},
4979 {"smoveCommand", (unsigned long)smoveCommand},
4980 {"sismemberCommand", (unsigned long)sismemberCommand},
4981 {"scardCommand", (unsigned long)scardCommand},
4982 {"spopCommand", (unsigned long)spopCommand},
4983 {"srandmemberCommand", (unsigned long)srandmemberCommand},
4984 {"sinterCommand", (unsigned long)sinterCommand},
4985 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
4986 {"sunionCommand", (unsigned long)sunionCommand},
4987 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
4988 {"sdiffCommand", (unsigned long)sdiffCommand},
4989 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
4990 {"syncCommand", (unsigned long)syncCommand},
4991 {"flushdbCommand", (unsigned long)flushdbCommand},
4992 {"flushallCommand", (unsigned long)flushallCommand},
4993 {"sortCommand", (unsigned long)sortCommand},
4994 {"lremCommand", (unsigned long)lremCommand},
4995 {"infoCommand", (unsigned long)infoCommand},
4996 {"mgetCommand", (unsigned long)mgetCommand},
4997 {"monitorCommand", (unsigned long)monitorCommand},
4998 {"expireCommand", (unsigned long)expireCommand},
4999 {"getsetCommand", (unsigned long)getsetCommand},
5000 {"ttlCommand", (unsigned long)ttlCommand},
5001 {"slaveofCommand", (unsigned long)slaveofCommand},
5002 {"debugCommand", (unsigned long)debugCommand},
5003 {"processCommand", (unsigned long)processCommand},
5004 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5005 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5006 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5007 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5008 {"msetCommand", (unsigned long)msetCommand},
5009 {"msetnxCommand", (unsigned long)msetnxCommand},
5010 {"zslCreateNode", (unsigned long)zslCreateNode},
5011 {"zslCreate", (unsigned long)zslCreate},
5012 {"zslFreeNode",(unsigned long)zslFreeNode},
5013 {"zslFree",(unsigned long)zslFree},
5014 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5015 {"zslInsert",(unsigned long)zslInsert},
5016 {"zslDelete",(unsigned long)zslDelete},
5017 {"createZsetObject",(unsigned long)createZsetObject},
5018 {"zaddCommand",(unsigned long)zaddCommand},
5019 {"zrangeCommand",(unsigned long)zrangeCommand},
5020 {"zremCommand",(unsigned long)zremCommand},
5021 {NULL,0}
5022 };
5023
5024 /* This function try to convert a pointer into a function name. It's used in
5025 * oreder to provide a backtrace under segmentation fault that's able to
5026 * display functions declared as static (otherwise the backtrace is useless). */
5027 static char *findFuncName(void *pointer, unsigned long *offset){
5028 int i, ret = -1;
5029 unsigned long off, minoff = 0;
5030
5031 /* Try to match against the Symbol with the smallest offset */
5032 for (i=0; symsTable[i].pointer; i++) {
5033 unsigned long lp = (unsigned long) pointer;
5034
5035 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5036 off=lp-symsTable[i].pointer;
5037 if (ret < 0 || off < minoff) {
5038 minoff=off;
5039 ret=i;
5040 }
5041 }
5042 }
5043 if (ret == -1) return NULL;
5044 *offset = minoff;
5045 return symsTable[ret].name;
5046 }
5047
5048 static void *getMcontextEip(ucontext_t *uc) {
5049 #if defined(__FreeBSD__)
5050 return (void*) uc->uc_mcontext.mc_eip;
5051 #elif defined(__dietlibc__)
5052 return (void*) uc->uc_mcontext.eip;
5053 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5054 return (void*) uc->uc_mcontext->__ss.__eip;
5055 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5056 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5057 return (void*) uc->uc_mcontext->__ss.__rip;
5058 #else
5059 return (void*) uc->uc_mcontext->__ss.__eip;
5060 #endif
5061 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5062 return (void*) uc->uc_mcontext.gregs[REG_EIP];
5063 #elif defined(__ia64__) /* Linux IA64 */
5064 return (void*) uc->uc_mcontext.sc_ip;
5065 #else
5066 return NULL;
5067 #endif
5068 }
5069
5070 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5071 void *trace[100];
5072 char **messages = NULL;
5073 int i, trace_size = 0;
5074 unsigned long offset=0;
5075 time_t uptime = time(NULL)-server.stat_starttime;
5076 ucontext_t *uc = (ucontext_t*) secret;
5077 REDIS_NOTUSED(info);
5078
5079 redisLog(REDIS_WARNING,
5080 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5081 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5082 "redis_version:%s; "
5083 "uptime_in_seconds:%d; "
5084 "connected_clients:%d; "
5085 "connected_slaves:%d; "
5086 "used_memory:%zu; "
5087 "changes_since_last_save:%lld; "
5088 "bgsave_in_progress:%d; "
5089 "last_save_time:%d; "
5090 "total_connections_received:%lld; "
5091 "total_commands_processed:%lld; "
5092 "role:%s;"
5093 ,REDIS_VERSION,
5094 uptime,
5095 listLength(server.clients)-listLength(server.slaves),
5096 listLength(server.slaves),
5097 server.usedmemory,
5098 server.dirty,
5099 server.bgsaveinprogress,
5100 server.lastsave,
5101 server.stat_numconnections,
5102 server.stat_numcommands,
5103 server.masterhost == NULL ? "master" : "slave"
5104 ));
5105
5106 trace_size = backtrace(trace, 100);
5107 /* overwrite sigaction with caller's address */
5108 if (getMcontextEip(uc) != NULL) {
5109 trace[1] = getMcontextEip(uc);
5110 }
5111 messages = backtrace_symbols(trace, trace_size);
5112
5113 for (i=1; i<trace_size; ++i) {
5114 char *fn = findFuncName(trace[i], &offset), *p;
5115
5116 p = strchr(messages[i],'+');
5117 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5118 redisLog(REDIS_WARNING,"%s", messages[i]);
5119 } else {
5120 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5121 }
5122 }
5123 free(messages);
5124 exit(0);
5125 }
5126
5127 static void setupSigSegvAction(void) {
5128 struct sigaction act;
5129
5130 sigemptyset (&act.sa_mask);
5131 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5132 * is used. Otherwise, sa_handler is used */
5133 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5134 act.sa_sigaction = segvHandler;
5135 sigaction (SIGSEGV, &act, NULL);
5136 sigaction (SIGBUS, &act, NULL);
5137 sigaction (SIGFPE, &act, NULL);
5138 sigaction (SIGILL, &act, NULL);
5139 sigaction (SIGBUS, &act, NULL);
5140 return;
5141 }
5142 #else /* HAVE_BACKTRACE */
/* Built without backtrace() support: there is no crash reporter to install,
 * so this is deliberately a no-op. */
static void setupSigSegvAction(void) {
    return;
}
5145 #endif /* HAVE_BACKTRACE */
5146
5147 /* =================================== Main! ================================ */
5148
5149 #ifdef __linux__
/* Return the integer value stored in /proc/sys/vm/overcommit_memory, or -1
 * if the file cannot be opened or read, or does not start with a number. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long val;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    /* atoi() turns unparsable text into 0, which is exactly the value that
     * triggers the overcommit warning. Report "unknown" (-1) instead. */
    val = strtol(buf,&end,10);
    if (end == buf) return -1;
    return (int) val;
}
5163
5164 void linuxOvercommitMemoryWarning(void) {
5165 if (linuxOvercommitMemoryValue() == 0) {
5166 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5167 }
5168 }
5169 #endif /* __linux__ */
5170
5171 static void daemonize(void) {
5172 int fd;
5173 FILE *fp;
5174
5175 if (fork() != 0) exit(0); /* parent exits */
5176 setsid(); /* create a new session */
5177
5178 /* Every output goes to /dev/null. If Redis is daemonized but
5179 * the 'logfile' is set to 'stdout' in the configuration file
5180 * it will not log at all. */
5181 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5182 dup2(fd, STDIN_FILENO);
5183 dup2(fd, STDOUT_FILENO);
5184 dup2(fd, STDERR_FILENO);
5185 if (fd > STDERR_FILENO) close(fd);
5186 }
5187 /* Try to write the pid file */
5188 fp = fopen(server.pidfile,"w");
5189 if (fp) {
5190 fprintf(fp,"%d\n",getpid());
5191 fclose(fp);
5192 }
5193 }
5194
5195 int main(int argc, char **argv) {
5196 initServerConfig();
5197 if (argc == 2) {
5198 ResetServerSaveParams();
5199 loadServerConfig(argv[1]);
5200 } else if (argc > 2) {
5201 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5202 exit(1);
5203 } else {
5204 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5205 }
5206 initServer();
5207 if (server.daemonize) daemonize();
5208 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5209 #ifdef __linux__
5210 linuxOvercommitMemoryWarning();
5211 #endif
5212 if (rdbLoad(server.dbfilename) == REDIS_OK)
5213 redisLog(REDIS_NOTICE,"DB loaded from disk");
5214 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5215 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5216 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5217 aeMain(server.el);
5218 aeDeleteEventLoop(server.el);
5219 return 0;
5220 }