]> git.saurik.com Git - redis.git/blob - redis.c
ZSET now saved on disk like any other type
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #include "redis.h"
63 #include "ae.h" /* Event driven programming library */
64 #include "sds.h" /* Dynamic safe strings */
65 #include "anet.h" /* Networking the easy way */
66 #include "dict.h" /* Hash tables */
67 #include "adlist.h" /* Linked lists */
68 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
69 #include "lzf.h" /* LZF compression library */
70 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
71
72 /* Error codes */
73 #define REDIS_OK 0
74 #define REDIS_ERR -1
75
76 /* Static server configuration */
77 #define REDIS_SERVERPORT 6379 /* TCP port */
78 #define REDIS_MAXIDLETIME (60*5) /* default client timeout (presumably seconds - confirm) */
79 #define REDIS_IOBUF_LEN 1024
80 #define REDIS_LOADBUF_LEN 1024
81 #define REDIS_STATIC_ARGS 4
82 #define REDIS_DEFAULT_DBNUM 16
83 #define REDIS_CONFIGLINE_MAX 1024
84 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
85 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
86 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* serverCron() samples at most this many volatile keys per DB on each run */
87 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
88 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
89
90 /* Hash table parameters */
91 #define REDIS_HT_MINFILL 10 /* htNeedsResize() flags a table whose used*100/slots falls below this percentage */
92
93 /* Command flags (bit values, OR-ed together in the command table) */
94 #define REDIS_CMD_BULK 1 /* Bulk write command */
95 #define REDIS_CMD_INLINE 2 /* Inline command */
96 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
97 this flag will return an error when the 'maxmemory' option is set in the
98 config file and the server is using more than maxmemory bytes of memory.
99 In short these commands are denied on low memory conditions. */
100 #define REDIS_CMD_DENYOOM 4
101
102 /* Object types */
103 #define REDIS_STRING 0
104 #define REDIS_LIST 1
105 #define REDIS_SET 2
106 #define REDIS_ZSET 3
107 #define REDIS_HASH 4
108
109 /* Objects encoding */
110 #define REDIS_ENCODING_RAW 0 /* Raw representation */
111 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
112
113 /* Object types only used for dumping to disk */
114 #define REDIS_EXPIRETIME 253
115 #define REDIS_SELECTDB 254
116 #define REDIS_EOF 255
117
118 /* Defines related to the dump file format. To store 32 bits lengths for short
119 * keys requires a lot of space, so we check the most significant 2 bits of
120 * the first byte to interpret the length:
121 *
122 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
123 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
124 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
125 * 11|000000 this means: specially encoded object will follow. The six bits
126 * number specify the kind of object that follows.
127 * See the REDIS_RDB_ENC_* defines.
128 *
129 * Lengths up to 63 are stored using a single byte, most DB keys, and many
130 * values, will fit inside. */
131 #define REDIS_RDB_6BITLEN 0
132 #define REDIS_RDB_14BITLEN 1
133 #define REDIS_RDB_32BITLEN 2
134 #define REDIS_RDB_ENCVAL 3
135 #define REDIS_RDB_LENERR UINT_MAX
136
137 /* When a length of a string object stored on disk has the first two bits
138 * set, the remaining six bits specify a special encoding for the object
139 * according to the following defines: */
140 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
141 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
142 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
143 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
144
145 /* Client flags (bit values, tested with '&' against redisClient.flags) */
146 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
147 #define REDIS_SLAVE 2 /* This client is a slave server */
148 #define REDIS_MASTER 4 /* This client is a master server */
149 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
150
151 /* Slave replication state - slave side */
152 #define REDIS_REPL_NONE 0 /* No active replication */
153 #define REDIS_REPL_CONNECT 1 /* Must connect to master (serverCron() calls syncWithMaster() in this state) */
154 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
155
156 /* Slave replication state - from the point of view of master
157 * Note that in SEND_BULK and ONLINE state the slave receives new updates
158 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
159 * to start the next background saving in order to send updates to it. */
160 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
161 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
162 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
163 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
164
165 /* List related stuff */
166 #define REDIS_HEAD 0
167 #define REDIS_TAIL 1
168
169 /* Sort operations */
170 #define REDIS_SORT_GET 0
171 #define REDIS_SORT_DEL 1
172 #define REDIS_SORT_INCR 2
173 #define REDIS_SORT_DECR 3
174 #define REDIS_SORT_ASC 4
175 #define REDIS_SORT_DESC 5
176 #define REDIS_SORTKEY_MAX 1024
177
178 /* Log levels. redisLog() drops messages whose level is below
 * server.verbosity and uses the level as an index into ".-*" to pick the
 * marker character printed on each line. */
179 #define REDIS_DEBUG 0
180 #define REDIS_NOTICE 1
181 #define REDIS_WARNING 2
182
183 /* Anti-warning macro: evaluates V as a void expression so that unused
 * parameters do not trigger compiler warnings. */
184 #define REDIS_NOTUSED(V) ((void) V)
185
186 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
187 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
188
189 /*================================= Data types ============================== */
190
191 /* A redis object, that is a type able to hold a string / list / set */
192 typedef struct redisObject {
193 void *ptr; /* payload; the dict callbacks in this file treat it as an sds when encoding is RAW */
194 unsigned char type; /* presumably one of the REDIS_STRING ... REDIS_HASH object types - confirm */
195 unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
196 unsigned char notused[2];
197 int refcount; /* NOTE(review): looks like it is managed through incrRefCount()/decrRefCount() - confirm */
198 } robj;
199
/* One logical database. serverCron() reports dictSize(dict) as the number of
 * keys and dictSize(expires) as the number of volatile keys. */
200 typedef struct redisDb {
201 dict *dict; /* the keys stored in this DB */
202 dict *expires; /* volatile keys; serverCron() reads each entry value as a time_t expire time */
203 int id;
204 } redisDb;
205
206 /* With multiplexing we need to keep per-client state.
207 * Clients are kept in a linked list. */
208 typedef struct redisClient {
209 int fd;
210 redisDb *db;
211 int dictid;
212 sds querybuf;
213 robj **argv, **mbargv;
214 int argc, mbargc;
215 int bulklen; /* bulk read len. -1 if not in bulk read mode */
216 int multibulk; /* multi bulk command format active */
217 list *reply;
218 int sentlen;
219 time_t lastinteraction; /* time of the last interaction, used for timeout */
220 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
221 int slaveseldb; /* slave selected db, if this client is a slave */
222 int authenticated; /* when requirepass is non-NULL */
223 int replstate; /* replication state if this is a slave */
224 int repldbfd; /* replication DB file descriptor */
225 long repldboff; /* replication DB file offset */
226 off_t repldbsize; /* replication DB file size */
227 } redisClient;
228
/* One automatic-save rule. serverCron() starts a background save when
 * server.dirty >= changes and more than 'seconds' have elapsed since
 * server.lastsave. */
229 struct saveparam {
230 time_t seconds;
231 int changes;
232 };
233
234 /* Global server state structure */
235 struct redisServer {
236 int port;
237 int fd;
238 redisDb *db; /* array of 'dbnum' databases */
239 dict *sharingpool;
240 unsigned int sharingpoolsize;
241 long long dirty; /* changes to DB from the last save */
242 list *clients;
243 list *slaves, *monitors;
244 char neterr[ANET_ERR_LEN];
245 aeEventLoop *el;
246 int cronloops; /* number of times the cron function run */
247 list *objfreelist; /* A list of freed objects to avoid malloc() */
248 time_t lastsave; /* Unix time of the last successful save */
249 size_t usedmemory; /* Used memory in bytes, refreshed by serverCron() from zmalloc_used_memory() */
250 /* Fields used only for stats */
251 time_t stat_starttime; /* server start time */
252 long long stat_numcommands; /* number of processed commands */
253 long long stat_numconnections; /* number of connections received */
254 /* Configuration */
255 int verbosity; /* redisLog() drops messages whose level is below this */
256 int glueoutputbuf;
257 int maxidletime; /* idle limit checked by closeTimedoutClients(); 0 disables the check in serverCron() */
258 int dbnum; /* number of entries in 'db' */
259 int daemonize;
260 char *pidfile;
261 int bgsaveinprogress; /* non-zero while a background save child is running */
262 pid_t bgsavechildpid; /* reset to -1 by serverCron() once the child has been reaped */
263 struct saveparam *saveparams; /* array of 'saveparamslen' automatic-save rules */
264 int saveparamslen;
265 char *logfile; /* NULL means redisLog() writes to stdout */
266 char *bindaddr;
267 char *dbfilename;
268 char *requirepass;
269 int shareobjects;
270 /* Replication related */
271 int isslave;
272 char *masterhost;
273 int masterport;
274 redisClient *master; /* client that is master for this slave */
275 int replstate; /* slave-side REDIS_REPL_* state */
276 unsigned int maxclients;
277 unsigned long maxmemory;
278 /* Sort parameters - qsort_r() is only available under BSD so we
279 * have to take this state global, in order to pass it to sortCompare() */
280 int sort_desc;
281 int sort_alpha;
282 int sort_bypattern;
283 };
284
/* Signature shared by every command handler: it receives the issuing client. */
285 typedef void redisCommandProc(redisClient *c);
/* One entry of the command table. */
286 struct redisCommand {
287 char *name; /* command name as it appears in the command table */
288 redisCommandProc *proc; /* handler function */
289 int arity; /* NOTE(review): negative values presumably mean "at least -arity arguments" - confirm */
290 int flags; /* OR of REDIS_CMD_* values */
291 };
292
/* Pairs a name with an address stored as an integer.
 * NOTE(review): presumably used to resolve addresses to function names in
 * crash reports - confirm against the users of this struct. */
293 struct redisFunctionSym {
294 char *name;
295 unsigned long pointer;
296 };
297
/* An element being sorted together with its sort key. Only one member of the
 * union is meaningful at a time.
 * NOTE(review): presumably 'score' is used for numeric sorts and 'cmpobj'
 * for alphabetical ones - confirm in sortCommand(). */
298 typedef struct _redisSortObject {
299 robj *obj;
300 union {
301 double score;
302 robj *cmpobj;
303 } u;
304 } redisSortObject;
305
/* A single operation requested of SORT.
 * NOTE(review): 'type' presumably holds one of the REDIS_SORT_* values -
 * confirm in sortCommand(). */
306 typedef struct _redisSortOperation {
307 int type;
308 robj *pattern;
309 } redisSortOperation;
310
311 /* ZSETs use a specialized version of Skiplists */
312
/* A skiplist node: an object with its score, an array of forward pointers
 * and a single backward pointer. */
313 typedef struct zskiplistNode {
314 struct zskiplistNode **forward; /* array of next-node pointers (presumably one per level - confirm) */
315 struct zskiplistNode *backward; /* previous node */
316 double score;
317 robj *obj;
318 } zskiplistNode;
319
/* Skiplist container: end pointers plus bookkeeping counters. */
320 typedef struct zskiplist {
321 struct zskiplistNode *header, *tail;
322 long length; /* presumably the number of elements - confirm in zslInsert() */
323 int level; /* presumably the highest level currently in use - confirm */
324 } zskiplist;
325
/* A sorted set is backed by both a hash table and a skiplist. */
326 typedef struct zset {
327 dict *dict;
328 zskiplist *zsl;
329 } zset;
330
331 /* Our shared "common" objects: replies that are built once in
 * createSharedObjects() instead of being allocated per request. */
332
333 struct sharedObjectsStruct {
334 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
335 *colon, *nullbulk, *nullmultibulk,
336 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
337 *outofrangeerr, *plus,
338 *select0, *select1, *select2, *select3, *select4,
339 *select5, *select6, *select7, *select8, *select9;
340 } shared;
341
342 /* Global vars that are actually used as constants. The following double
343 * values are used for double on-disk serialization, and are initialized
344 * at runtime to avoid strange compiler optimizations. */
345
346 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
347
348 /*================================ Prototypes =============================== */
349
350 static void freeStringObject(robj *o);
351 static void freeListObject(robj *o);
352 static void freeSetObject(robj *o);
353 static void decrRefCount(void *o);
354 static robj *createObject(int type, void *ptr);
355 static void freeClient(redisClient *c);
356 static int rdbLoad(char *filename);
357 static void addReply(redisClient *c, robj *obj);
358 static void addReplySds(redisClient *c, sds s);
359 static void incrRefCount(robj *o);
360 static int rdbSaveBackground(char *filename);
361 static robj *createStringObject(char *ptr, size_t len);
362 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
363 static int syncWithMaster(void);
364 static robj *tryObjectSharing(robj *o);
365 static int tryObjectEncoding(robj *o);
366 static robj *getDecodedObject(const robj *o);
367 static int removeExpire(redisDb *db, robj *key);
368 static int expireIfNeeded(redisDb *db, robj *key);
369 static int deleteIfVolatile(redisDb *db, robj *key);
370 static int deleteKey(redisDb *db, robj *key);
371 static time_t getExpire(redisDb *db, robj *key);
372 static int setExpire(redisDb *db, robj *key, time_t when);
373 static void updateSlavesWaitingBgsave(int bgsaveerr);
374 static void freeMemoryIfNeeded(void);
375 static int processCommand(redisClient *c);
376 static void setupSigSegvAction(void);
377 static void rdbRemoveTempFile(pid_t childpid);
378 static size_t stringObjectLen(robj *o);
379 static void processInputBuffer(redisClient *c);
380 static zskiplist *zslCreate(void);
381 static void zslFree(zskiplist *zsl);
382 static void zslInsert(zskiplist *zsl, double score, robj *obj);
383
384 static void authCommand(redisClient *c);
385 static void pingCommand(redisClient *c);
386 static void echoCommand(redisClient *c);
387 static void setCommand(redisClient *c);
388 static void setnxCommand(redisClient *c);
389 static void getCommand(redisClient *c);
390 static void delCommand(redisClient *c);
391 static void existsCommand(redisClient *c);
392 static void incrCommand(redisClient *c);
393 static void decrCommand(redisClient *c);
394 static void incrbyCommand(redisClient *c);
395 static void decrbyCommand(redisClient *c);
396 static void selectCommand(redisClient *c);
397 static void randomkeyCommand(redisClient *c);
398 static void keysCommand(redisClient *c);
399 static void dbsizeCommand(redisClient *c);
400 static void lastsaveCommand(redisClient *c);
401 static void saveCommand(redisClient *c);
402 static void bgsaveCommand(redisClient *c);
403 static void shutdownCommand(redisClient *c);
404 static void moveCommand(redisClient *c);
405 static void renameCommand(redisClient *c);
406 static void renamenxCommand(redisClient *c);
407 static void lpushCommand(redisClient *c);
408 static void rpushCommand(redisClient *c);
409 static void lpopCommand(redisClient *c);
410 static void rpopCommand(redisClient *c);
411 static void llenCommand(redisClient *c);
412 static void lindexCommand(redisClient *c);
413 static void lrangeCommand(redisClient *c);
414 static void ltrimCommand(redisClient *c);
415 static void typeCommand(redisClient *c);
416 static void lsetCommand(redisClient *c);
417 static void saddCommand(redisClient *c);
418 static void sremCommand(redisClient *c);
419 static void smoveCommand(redisClient *c);
420 static void sismemberCommand(redisClient *c);
421 static void scardCommand(redisClient *c);
422 static void spopCommand(redisClient *c);
423 static void srandmemberCommand(redisClient *c);
424 static void sinterCommand(redisClient *c);
425 static void sinterstoreCommand(redisClient *c);
426 static void sunionCommand(redisClient *c);
427 static void sunionstoreCommand(redisClient *c);
428 static void sdiffCommand(redisClient *c);
429 static void sdiffstoreCommand(redisClient *c);
430 static void syncCommand(redisClient *c);
431 static void flushdbCommand(redisClient *c);
432 static void flushallCommand(redisClient *c);
433 static void sortCommand(redisClient *c);
434 static void lremCommand(redisClient *c);
435 static void infoCommand(redisClient *c);
436 static void mgetCommand(redisClient *c);
437 static void monitorCommand(redisClient *c);
438 static void expireCommand(redisClient *c);
439 static void getsetCommand(redisClient *c);
440 static void ttlCommand(redisClient *c);
441 static void slaveofCommand(redisClient *c);
442 static void debugCommand(redisClient *c);
443 static void msetCommand(redisClient *c);
444 static void msetnxCommand(redisClient *c);
445 static void zaddCommand(redisClient *c);
446 static void zrangeCommand(redisClient *c);
447 static void zrevrangeCommand(redisClient *c);
448 static void zlenCommand(redisClient *c);
449 static void zremCommand(redisClient *c);
450
451 /*================================= Globals ================================= */
452
453 /* Global vars */
454 static struct redisServer server; /* server global state */
/* Command table. Each entry is {name, handler, arity, flags} as laid out in
 * struct redisCommand, and the table ends with an all-NULL/zero sentinel.
 * Note that "smembers" is served by sinterCommand.
 * NOTE(review): negative arity values presumably mean "at least -arity
 * arguments" - confirm against the command dispatcher. */
455 static struct redisCommand cmdTable[] = {
456 {"get",getCommand,2,REDIS_CMD_INLINE},
457 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
458 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
459 {"del",delCommand,-2,REDIS_CMD_INLINE},
460 {"exists",existsCommand,2,REDIS_CMD_INLINE},
461 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
462 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
463 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
464 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
465 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
466 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
467 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
468 {"llen",llenCommand,2,REDIS_CMD_INLINE},
469 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
470 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
471 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
472 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
473 {"lrem",lremCommand,4,REDIS_CMD_BULK},
474 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
475 {"srem",sremCommand,3,REDIS_CMD_BULK},
476 {"smove",smoveCommand,4,REDIS_CMD_BULK},
477 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
478 {"scard",scardCommand,2,REDIS_CMD_INLINE},
479 {"spop",spopCommand,2,REDIS_CMD_INLINE},
480 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
481 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
482 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
483 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
484 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
485 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
486 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
487 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
488 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
489 {"zrem",zremCommand,3,REDIS_CMD_BULK},
490 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
491 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
492 {"zlen",zlenCommand,2,REDIS_CMD_INLINE},
493 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
494 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
495 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
496 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
497 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
498 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
499 {"select",selectCommand,2,REDIS_CMD_INLINE},
500 {"move",moveCommand,3,REDIS_CMD_INLINE},
501 {"rename",renameCommand,3,REDIS_CMD_INLINE},
502 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
503 {"expire",expireCommand,3,REDIS_CMD_INLINE},
504 {"keys",keysCommand,2,REDIS_CMD_INLINE},
505 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
506 {"auth",authCommand,2,REDIS_CMD_INLINE},
507 {"ping",pingCommand,1,REDIS_CMD_INLINE},
508 {"echo",echoCommand,2,REDIS_CMD_BULK},
509 {"save",saveCommand,1,REDIS_CMD_INLINE},
510 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
511 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
512 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
513 {"type",typeCommand,2,REDIS_CMD_INLINE},
514 {"sync",syncCommand,1,REDIS_CMD_INLINE},
515 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
516 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
517 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
518 {"info",infoCommand,1,REDIS_CMD_INLINE},
519 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
520 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
521 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
522 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
523 {NULL,NULL,0,0}
524 };
525 /*============================ Utility functions ============================ */
526
/* Fold a character to lower case for case-insensitive matching.
 * Values outside the unsigned char range (negative plain chars) are returned
 * untouched, because handing them to tolower() is undefined behaviour. */
static int stringmatchFold(int c) {
    return (c >= 0 && c <= UCHAR_MAX) ? tolower(c) : c;
}

/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be NUL
 * terminated: no byte at or past the given lengths is read.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte out of a set; a leading '^' negates the set, 'a-z'
 *            denotes a range and '\' escapes the next byte of the set
 *   \x       the byte x taken literally
 *
 * When 'nocase' is non-zero literals and ranges are compared after folding
 * to lower case (escaped bytes inside a [...] set are always compared
 * exactly).
 *
 * Returns 1 on match, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen) {
        /* Every pattern element except '*' consumes one input byte, so an
         * exhausted string can only be matched by '*'. Returning here also
         * keeps the cases below from reading string[0] past the end. */
        if (stringLen == 0 && pattern[0] != '*')
            return 0; /* no match */

        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the common
                     * pattern++ after the switch lands on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = stringmatchFold(start);
                        end = stringmatchFold(end);
                        c = stringmatchFold(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (stringmatchFold(pattern[0]) ==
                            stringmatchFold(string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (stringmatchFold(pattern[0]) != stringmatchFold(string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Trailing '*'s match the empty remainder; stop at the pattern
             * end rather than at whatever byte happens to follow it. */
            while (patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
649
650 static void redisLog(int level, const char *fmt, ...) {
651 va_list ap;
652 FILE *fp;
653
654 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
655 if (!fp) return;
656
657 va_start(ap, fmt);
658 if (level >= server.verbosity) {
659 char *c = ".-*";
660 char buf[64];
661 time_t now;
662
663 now = time(NULL);
664 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
665 fprintf(fp,"%s %c ",buf,c[level]);
666 vfprintf(fp, fmt, ap);
667 fprintf(fp,"\n");
668 fflush(fp);
669 }
670 va_end(ap);
671
672 if (server.logfile) fclose(fp);
673 }
674
675 /*====================== Hash table type implementation ==================== */
676
677 /* This is a hash table type that uses the SDS dynamic strings library as
678 * keys and redis objects as values (objects can hold SDS strings,
679 * lists, sets). */
680
/* Value destructor for dictionaries whose values are plain zmalloc()ed
 * buffers: the value is released with zfree(). 'privdata' is unused. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
686
687 static int sdsDictKeyCompare(void *privdata, const void *key1,
688 const void *key2)
689 {
690 int l1,l2;
691 DICT_NOTUSED(privdata);
692
693 l1 = sdslen((sds)key1);
694 l2 = sdslen((sds)key2);
695 if (l1 != l2) return 0;
696 return memcmp(key1, key2, l1) == 0;
697 }
698
/* Destructor callback for dictionary keys/values that are redis objects:
 * the object is handed to decrRefCount(). 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
705
706 static int dictObjKeyCompare(void *privdata, const void *key1,
707 const void *key2)
708 {
709 const robj *o1 = key1, *o2 = key2;
710 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
711 }
712
713 static unsigned int dictObjHash(const void *key) {
714 const robj *o = key;
715 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
716 }
717
718 static int dictEncObjKeyCompare(void *privdata, const void *key1,
719 const void *key2)
720 {
721 const robj *o1 = key1, *o2 = key2;
722
723 if (o1->encoding == REDIS_ENCODING_RAW &&
724 o2->encoding == REDIS_ENCODING_RAW)
725 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
726 else {
727 robj *dec1, *dec2;
728 int cmp;
729
730 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
731 getDecodedObject(o1) : (robj*)o1;
732 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
733 getDecodedObject(o2) : (robj*)o2;
734 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
735 if (dec1 != o1) decrRefCount(dec1);
736 if (dec2 != o2) decrRefCount(dec2);
737 return cmp;
738 }
739 }
740
741 static unsigned int dictEncObjHash(const void *key) {
742 const robj *o = key;
743
744 if (o->encoding == REDIS_ENCODING_RAW)
745 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
746 else {
747 robj *dec = getDecodedObject(o);
748 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
749 decrRefCount(dec);
750 return hash;
751 }
752 }
753
/* Dict callbacks for sets: keys are redis objects hashed and compared with
 * the encoding-aware callbacks and released through decrRefCount(); no value
 * destructor is installed. */
754 static dictType setDictType = {
755 dictEncObjHash, /* hash function */
756 NULL, /* key dup */
757 NULL, /* val dup */
758 dictEncObjKeyCompare, /* key compare */
759 dictRedisObjectDestructor, /* key destructor */
760 NULL /* val destructor */
761 };
762
/* Dict callbacks for sorted sets: same key handling as setDictType, but
 * values are released with zfree() via dictVanillaFree.
 * NOTE(review): the values are presumably heap-allocated scores - confirm. */
763 static dictType zsetDictType = {
764 dictEncObjHash, /* hash function */
765 NULL, /* key dup */
766 NULL, /* val dup */
767 dictEncObjKeyCompare, /* key compare */
768 dictRedisObjectDestructor, /* key destructor */
769 dictVanillaFree /* val destructor */
770 };
771
/* Dict callbacks where both keys and values are redis objects. Keys are
 * hashed and compared directly on their ptr field (no decoding step), and
 * both keys and values are released through decrRefCount(). */
772 static dictType hashDictType = {
773 dictObjHash, /* hash function */
774 NULL, /* key dup */
775 NULL, /* val dup */
776 dictObjKeyCompare, /* key compare */
777 dictRedisObjectDestructor, /* key destructor */
778 dictRedisObjectDestructor /* val destructor */
779 };
780
781 /* ========================= Random utility functions ======================= */
782
/* Out-of-memory handler.
 *
 * Redis generally does not try to recover from out of memory conditions when
 * allocating objects or strings: the networking layer itself relies on heap
 * allocation for its send buffers, so it is not clear the condition could
 * even be reported to the client. The process therefore prints
 * "<msg>: Out of memory" on stderr, waits one second and aborts. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
794
795 /* ====================== Redis server networking stuff ===================== */
796 static void closeTimedoutClients(void) {
797 redisClient *c;
798 listNode *ln;
799 time_t now = time(NULL);
800
801 listRewind(server.clients);
802 while ((ln = listYield(server.clients)) != NULL) {
803 c = listNodeValue(ln);
804 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
805 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
806 (now - c->lastinteraction > server.maxidletime)) {
807 redisLog(REDIS_DEBUG,"Closing idle client");
808 freeClient(c);
809 }
810 }
811 }
812
813 static int htNeedsResize(dict *dict) {
814 long long size, used;
815
816 size = dictSlots(dict);
817 used = dictSize(dict);
818 return (size && used && size > DICT_HT_INITIAL_SIZE &&
819 (used*100/size < REDIS_HT_MINFILL));
820 }
821
822 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
823 * we resize the hash table to save memory */
824 static void tryResizeHashTables(void) {
825 int j;
826
827 for (j = 0; j < server.dbnum; j++) {
828 if (htNeedsResize(server.db[j].dict)) {
829 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
830 dictResize(server.db[j].dict);
831 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
832 }
833 if (htNeedsResize(server.db[j].expires))
834 dictResize(server.db[j].expires);
835 }
836 }
837
838 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
839 int j, loops = server.cronloops++;
840 REDIS_NOTUSED(eventLoop);
841 REDIS_NOTUSED(id);
842 REDIS_NOTUSED(clientData);
843
844 /* Update the global state with the amount of used memory */
845 server.usedmemory = zmalloc_used_memory();
846
847 /* Show some info about non-empty databases */
848 for (j = 0; j < server.dbnum; j++) {
849 long long size, used, vkeys;
850
851 size = dictSlots(server.db[j].dict);
852 used = dictSize(server.db[j].dict);
853 vkeys = dictSize(server.db[j].expires);
854 if (!(loops % 5) && (used || vkeys)) {
855 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
856 /* dictPrintStats(server.dict); */
857 }
858 }
859
860 /* We don't want to resize the hash tables while a bacground saving
861 * is in progress: the saving child is created using fork() that is
862 * implemented with a copy-on-write semantic in most modern systems, so
863 * if we resize the HT while there is the saving child at work actually
864 * a lot of memory movements in the parent will cause a lot of pages
865 * copied. */
866 if (!server.bgsaveinprogress) tryResizeHashTables();
867
868 /* Show information about connected clients */
869 if (!(loops % 5)) {
870 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
871 listLength(server.clients)-listLength(server.slaves),
872 listLength(server.slaves),
873 server.usedmemory,
874 dictSize(server.sharingpool));
875 }
876
877 /* Close connections of timedout clients */
878 if (server.maxidletime && !(loops % 10))
879 closeTimedoutClients();
880
881 /* Check if a background saving in progress terminated */
882 if (server.bgsaveinprogress) {
883 int statloc;
884 if (wait4(-1,&statloc,WNOHANG,NULL)) {
885 int exitcode = WEXITSTATUS(statloc);
886 int bysignal = WIFSIGNALED(statloc);
887
888 if (!bysignal && exitcode == 0) {
889 redisLog(REDIS_NOTICE,
890 "Background saving terminated with success");
891 server.dirty = 0;
892 server.lastsave = time(NULL);
893 } else if (!bysignal && exitcode != 0) {
894 redisLog(REDIS_WARNING, "Background saving error");
895 } else {
896 redisLog(REDIS_WARNING,
897 "Background saving terminated by signal");
898 rdbRemoveTempFile(server.bgsavechildpid);
899 }
900 server.bgsaveinprogress = 0;
901 server.bgsavechildpid = -1;
902 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
903 }
904 } else {
905 /* If there is not a background saving in progress check if
906 * we have to save now */
907 time_t now = time(NULL);
908 for (j = 0; j < server.saveparamslen; j++) {
909 struct saveparam *sp = server.saveparams+j;
910
911 if (server.dirty >= sp->changes &&
912 now-server.lastsave > sp->seconds) {
913 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
914 sp->changes, sp->seconds);
915 rdbSaveBackground(server.dbfilename);
916 break;
917 }
918 }
919 }
920
921 /* Try to expire a few timed out keys */
922 for (j = 0; j < server.dbnum; j++) {
923 redisDb *db = server.db+j;
924 int num = dictSize(db->expires);
925
926 if (num) {
927 time_t now = time(NULL);
928
929 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
930 num = REDIS_EXPIRELOOKUPS_PER_CRON;
931 while (num--) {
932 dictEntry *de;
933 time_t t;
934
935 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
936 t = (time_t) dictGetEntryVal(de);
937 if (now > t) {
938 deleteKey(db,dictGetEntryKey(de));
939 }
940 }
941 }
942 }
943
944 /* Check if we should connect to a MASTER */
945 if (server.replstate == REDIS_REPL_CONNECT) {
946 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
947 if (syncWithMaster() == REDIS_OK) {
948 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
949 }
950 }
951 return 1000;
952 }
953
954 static void createSharedObjects(void) {
955 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
956 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
957 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
958 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
959 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
960 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
961 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
962 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
963 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
964 /* no such key */
965 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
966 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
967 "-ERR Operation against a key holding the wrong kind of value\r\n"));
968 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
969 "-ERR no such key\r\n"));
970 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
971 "-ERR syntax error\r\n"));
972 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
973 "-ERR source and destination objects are the same\r\n"));
974 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
975 "-ERR index out of range\r\n"));
976 shared.space = createObject(REDIS_STRING,sdsnew(" "));
977 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
978 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
979 shared.select0 = createStringObject("select 0\r\n",10);
980 shared.select1 = createStringObject("select 1\r\n",10);
981 shared.select2 = createStringObject("select 2\r\n",10);
982 shared.select3 = createStringObject("select 3\r\n",10);
983 shared.select4 = createStringObject("select 4\r\n",10);
984 shared.select5 = createStringObject("select 5\r\n",10);
985 shared.select6 = createStringObject("select 6\r\n",10);
986 shared.select7 = createStringObject("select 7\r\n",10);
987 shared.select8 = createStringObject("select 8\r\n",10);
988 shared.select9 = createStringObject("select 9\r\n",10);
989 }
990
991 static void appendServerSaveParams(time_t seconds, int changes) {
992 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
993 server.saveparams[server.saveparamslen].seconds = seconds;
994 server.saveparams[server.saveparamslen].changes = changes;
995 server.saveparamslen++;
996 }
997
998 static void ResetServerSaveParams() {
999 zfree(server.saveparams);
1000 server.saveparams = NULL;
1001 server.saveparamslen = 0;
1002 }
1003
1004 static void initServerConfig() {
1005 server.dbnum = REDIS_DEFAULT_DBNUM;
1006 server.port = REDIS_SERVERPORT;
1007 server.verbosity = REDIS_DEBUG;
1008 server.maxidletime = REDIS_MAXIDLETIME;
1009 server.saveparams = NULL;
1010 server.logfile = NULL; /* NULL = log on standard output */
1011 server.bindaddr = NULL;
1012 server.glueoutputbuf = 1;
1013 server.daemonize = 0;
1014 server.pidfile = "/var/run/redis.pid";
1015 server.dbfilename = "dump.rdb";
1016 server.requirepass = NULL;
1017 server.shareobjects = 0;
1018 server.sharingpoolsize = 1024;
1019 server.maxclients = 0;
1020 server.maxmemory = 0;
1021 ResetServerSaveParams();
1022
1023 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1024 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1025 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1026 /* Replication related */
1027 server.isslave = 0;
1028 server.masterhost = NULL;
1029 server.masterport = 6379;
1030 server.master = NULL;
1031 server.replstate = REDIS_REPL_NONE;
1032
1033 /* Double constants initialization */
1034 R_Zero = 0.0;
1035 R_PosInf = 1.0/R_Zero;
1036 R_NegInf = -1.0/R_Zero;
1037 R_Nan = R_Zero/R_Zero;
1038 }
1039
1040 static void initServer() {
1041 int j;
1042
1043 signal(SIGHUP, SIG_IGN);
1044 signal(SIGPIPE, SIG_IGN);
1045 setupSigSegvAction();
1046
1047 server.clients = listCreate();
1048 server.slaves = listCreate();
1049 server.monitors = listCreate();
1050 server.objfreelist = listCreate();
1051 createSharedObjects();
1052 server.el = aeCreateEventLoop();
1053 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1054 server.sharingpool = dictCreate(&setDictType,NULL);
1055 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1056 if (server.fd == -1) {
1057 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1058 exit(1);
1059 }
1060 for (j = 0; j < server.dbnum; j++) {
1061 server.db[j].dict = dictCreate(&hashDictType,NULL);
1062 server.db[j].expires = dictCreate(&setDictType,NULL);
1063 server.db[j].id = j;
1064 }
1065 server.cronloops = 0;
1066 server.bgsaveinprogress = 0;
1067 server.bgsavechildpid = -1;
1068 server.lastsave = time(NULL);
1069 server.dirty = 0;
1070 server.usedmemory = 0;
1071 server.stat_numcommands = 0;
1072 server.stat_numconnections = 0;
1073 server.stat_starttime = time(NULL);
1074 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1075 }
1076
1077 /* Empty the whole database */
1078 static long long emptyDb() {
1079 int j;
1080 long long removed = 0;
1081
1082 for (j = 0; j < server.dbnum; j++) {
1083 removed += dictSize(server.db[j].dict);
1084 dictEmpty(server.db[j].dict);
1085 dictEmpty(server.db[j].expires);
1086 }
1087 return removed;
1088 }
1089
/* Map a case-insensitive "yes"/"no" string to 1/0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    return (strcasecmp(s,"no") == 0) ? 0 : -1;
}
1095
/* Load the server configuration from 'filename', or from standard input
 * when 'filename' is exactly "-".
 *
 * The file is read one line at a time. Each line is trimmed, lines that
 * are empty or start with '#' are skipped, and the rest is split on single
 * spaces into a directive name followed by its arguments. Recognized
 * directives update the global 'server' structure (the "dir" directive
 * changes the current working directory instead).
 *
 * Any invalid directive, wrong argument count or invalid value is fatal:
 * the offending line and a message are printed on stderr and the process
 * exits with status 1. Failing to open the file is fatal as well.
 *
 * This is a deliberately simple loader; it can be improved later if the
 * configuration gets more complex. */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    /* "-" selects standard input, anything else is a path to open */
    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        /* 'linenum' and 'line' are kept up to date so that the error
         * handler at 'loaderr' can report where the problem is. */
        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines */
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments. The separator is exactly one space. */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives. Every branch checks the expected
         * argument count, so a known directive with the wrong number of
         * arguments falls through to the final "Bad directive" error. */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            /* save <seconds> <changes>: appended to the existing list */
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            /* Takes effect immediately: the working directory changes
             * while the configuration is still being read. */
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* The special name "stdout" means no log file (NULL) */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            /* NOTE(review): the value is not range checked here */
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            /* NOTE(review): the value is not range checked here */
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            /* slaveof <host> <port>: also arms the replication state so
             * that a connection to the master is attempted */
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        /* Directive handled: release the split arguments and the line */
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

/* Fatal error path: nothing is released because the process exits. */
loaderr:
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1238
1239 static void freeClientArgv(redisClient *c) {
1240 int j;
1241
1242 for (j = 0; j < c->argc; j++)
1243 decrRefCount(c->argv[j]);
1244 for (j = 0; j < c->mbargc; j++)
1245 decrRefCount(c->mbargv[j]);
1246 c->argc = 0;
1247 c->mbargc = 0;
1248 }
1249
1250 static void freeClient(redisClient *c) {
1251 listNode *ln;
1252
1253 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1254 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1255 sdsfree(c->querybuf);
1256 listRelease(c->reply);
1257 freeClientArgv(c);
1258 close(c->fd);
1259 ln = listSearchKey(server.clients,c);
1260 assert(ln != NULL);
1261 listDelNode(server.clients,ln);
1262 if (c->flags & REDIS_SLAVE) {
1263 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1264 close(c->repldbfd);
1265 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1266 ln = listSearchKey(l,c);
1267 assert(ln != NULL);
1268 listDelNode(l,ln);
1269 }
1270 if (c->flags & REDIS_MASTER) {
1271 server.master = NULL;
1272 server.replstate = REDIS_REPL_CONNECT;
1273 }
1274 zfree(c->argv);
1275 zfree(c->mbargv);
1276 zfree(c);
1277 }
1278
1279 static void glueReplyBuffersIfNeeded(redisClient *c) {
1280 int totlen = 0;
1281 listNode *ln;
1282 robj *o;
1283
1284 listRewind(c->reply);
1285 while((ln = listYield(c->reply))) {
1286 o = ln->value;
1287 totlen += sdslen(o->ptr);
1288 /* This optimization makes more sense if we don't have to copy
1289 * too much data */
1290 if (totlen > 1024) return;
1291 }
1292 if (totlen > 0) {
1293 char buf[1024];
1294 int copylen = 0;
1295
1296 listRewind(c->reply);
1297 while((ln = listYield(c->reply))) {
1298 o = ln->value;
1299 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1300 copylen += sdslen(o->ptr);
1301 listDelNode(c->reply,ln);
1302 }
1303 /* Now the output buffer is empty, add the new single element */
1304 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1305 listAddNodeTail(c->reply,o);
1306 }
1307 }
1308
1309 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1310 redisClient *c = privdata;
1311 int nwritten = 0, totwritten = 0, objlen;
1312 robj *o;
1313 REDIS_NOTUSED(el);
1314 REDIS_NOTUSED(mask);
1315
1316 if (server.glueoutputbuf && listLength(c->reply) > 1)
1317 glueReplyBuffersIfNeeded(c);
1318 while(listLength(c->reply)) {
1319 o = listNodeValue(listFirst(c->reply));
1320 objlen = sdslen(o->ptr);
1321
1322 if (objlen == 0) {
1323 listDelNode(c->reply,listFirst(c->reply));
1324 continue;
1325 }
1326
1327 if (c->flags & REDIS_MASTER) {
1328 /* Don't reply to a master */
1329 nwritten = objlen - c->sentlen;
1330 } else {
1331 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1332 if (nwritten <= 0) break;
1333 }
1334 c->sentlen += nwritten;
1335 totwritten += nwritten;
1336 /* If we fully sent the object on head go to the next one */
1337 if (c->sentlen == objlen) {
1338 listDelNode(c->reply,listFirst(c->reply));
1339 c->sentlen = 0;
1340 }
1341 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1342 * bytes, in a single threaded server it's a good idea to server
1343 * other clients as well, even if a very large request comes from
1344 * super fast link that is always able to accept data (in real world
1345 * terms think to 'KEYS *' against the loopback interfae) */
1346 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1347 }
1348 if (nwritten == -1) {
1349 if (errno == EAGAIN) {
1350 nwritten = 0;
1351 } else {
1352 redisLog(REDIS_DEBUG,
1353 "Error writing to client: %s", strerror(errno));
1354 freeClient(c);
1355 return;
1356 }
1357 }
1358 if (totwritten > 0) c->lastinteraction = time(NULL);
1359 if (listLength(c->reply) == 0) {
1360 c->sentlen = 0;
1361 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1362 }
1363 }
1364
1365 static struct redisCommand *lookupCommand(char *name) {
1366 int j = 0;
1367 while(cmdTable[j].name != NULL) {
1368 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1369 j++;
1370 }
1371 return NULL;
1372 }
1373
1374 /* resetClient prepare the client to process the next command */
1375 static void resetClient(redisClient *c) {
1376 freeClientArgv(c);
1377 c->bulklen = -1;
1378 c->multibulk = 0;
1379 }
1380
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * The function is re-entered several times for a single logical command
 * when the multi bulk protocol or a bulk argument is involved: every call
 * consumes one protocol element and updates c->multibulk / c->bulklen so
 * that the caller knows what to read next.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
static int processCommand(redisClient *c) {
    struct redisCommand *cmd;
    long long dirty;

    /* Free some memory if needed (maxmemory setting) */
    if (server.maxmemory) freeMemoryIfNeeded();

    /* Handle the multi bulk command type. This is an alternative protocol
     * supported by Redis in order to receive commands that are composed of
     * multiple binary-safe "bulk" arguments. The latency of processing is
     * a bit higher but this allows things like multi-sets, so if this
     * protocol is used only for MSET and similar commands this is a big win. */
    if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
        /* "*<count>": start of a multi bulk request. A count <= 0 is
         * silently discarded, otherwise the header argument is dropped
         * and we wait for <count> bulk arguments. */
        c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
        if (c->multibulk <= 0) {
            resetClient(c);
            return 1;
        } else {
            decrRefCount(c->argv[c->argc-1]);
            c->argc--;
            return 1;
        }
    } else if (c->multibulk) {
        if (c->bulklen == -1) {
            /* Expecting the "$<len>" header of the next bulk argument */
            if (((char*)c->argv[0]->ptr)[0] != '$') {
                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
                resetClient(c);
                return 1;
            } else {
                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
                decrRefCount(c->argv[0]);
                if (bulklen < 0 || bulklen > 1024*1024*1024) {
                    /* argc is decremented first so that resetClient()
                     * does not release argv[0] a second time */
                    c->argc--;
                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
                    resetClient(c);
                    return 1;
                }
                c->argc--;
                c->bulklen = bulklen+2; /* add two bytes for CR+LF */
                return 1;
            }
        } else {
            /* A bulk payload was just read into argv[0]: move it to the
             * multi bulk argument vector. */
            c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
            c->mbargv[c->mbargc] = c->argv[0];
            c->mbargc++;
            c->argc--;
            c->multibulk--;
            if (c->multibulk == 0) {
                robj **auxargv;
                int auxargc;

                /* Here we need to swap the multi-bulk argc/argv with the
                 * normal argc/argv of the client structure. */
                auxargv = c->argv;
                c->argv = c->mbargv;
                c->mbargv = auxargv;

                auxargc = c->argc;
                c->argc = c->mbargc;
                c->mbargc = auxargc;

                /* We need to set bulklen to something different than -1
                 * in order for the code below to process the command without
                 * to try to read the last argument of a bulk command as
                 * a special argument. */
                c->bulklen = 0;
                /* continue below and process the command */
            } else {
                /* More arguments expected: wait for the next "$<len>" */
                c->bulklen = -1;
                return 1;
            }
        }
    }
    /* -- end of multi bulk commands processing -- */

    /* The QUIT command is handled as a special case. Normal command
     * procs are unable to close the client connection safely */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        freeClient(c);
        return 0;
    }
    cmd = lookupCommand(c->argv[0]->ptr);
    if (!cmd) {
        addReplySds(c,sdsnew("-ERR unknown command\r\n"));
        resetClient(c);
        return 1;
    } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
               (c->argc < -cmd->arity)) {
        /* A positive arity must match argc exactly; a negative arity -N
         * requires at least N arguments. */
        addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
        resetClient(c);
        return 1;
    } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
        resetClient(c);
        return 1;
    } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
        /* Inline bulk command: the last argument read so far is the byte
         * count of the payload, not the payload itself. */
        int bulklen = atoi(c->argv[c->argc-1]->ptr);

        decrRefCount(c->argv[c->argc-1]);
        if (bulklen < 0 || bulklen > 1024*1024*1024) {
            c->argc--;
            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
            resetClient(c);
            return 1;
        }
        c->argc--;
        c->bulklen = bulklen+2; /* add two bytes for CR+LF */
        /* It is possible that the bulk read is already in the
         * buffer. Check this condition and handle it accordingly.
         * This is just a fast path, alternative to call processInputBuffer().
         * It's a good idea since the code is small and this condition
         * happens most of the times. */
        if ((signed)sdslen(c->querybuf) >= c->bulklen) {
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
        } else {
            /* Payload not fully received yet: the caller will call us
             * again once it is available. */
            return 1;
        }
    }
    /* Let's try to share objects on the command arguments vector */
    if (server.shareobjects) {
        int j;
        for(j = 1; j < c->argc; j++)
            c->argv[j] = tryObjectSharing(c->argv[j]);
    }
    /* Let's try to encode the bulk object to save space. */
    if (cmd->flags & REDIS_CMD_BULK)
        tryObjectEncoding(c->argv[c->argc-1]);

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
        resetClient(c);
        return 1;
    }

    /* Exec the command. Slaves are fed only when the command changed
     * server.dirty; monitors are fed every executed command. */
    dirty = server.dirty;
    cmd->proc(c);
    if (server.dirty-dirty != 0 && listLength(server.slaves))
        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;

    /* Prepare the client for the next command */
    if (c->flags & REDIS_CLOSE) {
        freeClient(c);
        return 0;
    }
    resetClient(c);
    return 1;
}
1543
1544 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1545 listNode *ln;
1546 int outc = 0, j;
1547 robj **outv;
1548 /* (args*2)+1 is enough room for args, spaces, newlines */
1549 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1550
1551 if (argc <= REDIS_STATIC_ARGS) {
1552 outv = static_outv;
1553 } else {
1554 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1555 }
1556
1557 for (j = 0; j < argc; j++) {
1558 if (j != 0) outv[outc++] = shared.space;
1559 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1560 robj *lenobj;
1561
1562 lenobj = createObject(REDIS_STRING,
1563 sdscatprintf(sdsempty(),"%d\r\n",
1564 stringObjectLen(argv[j])));
1565 lenobj->refcount = 0;
1566 outv[outc++] = lenobj;
1567 }
1568 outv[outc++] = argv[j];
1569 }
1570 outv[outc++] = shared.crlf;
1571
1572 /* Increment all the refcounts at start and decrement at end in order to
1573 * be sure to free objects if there is no slave in a replication state
1574 * able to be feed with commands */
1575 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1576 listRewind(slaves);
1577 while((ln = listYield(slaves))) {
1578 redisClient *slave = ln->value;
1579
1580 /* Don't feed slaves that are still waiting for BGSAVE to start */
1581 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1582
1583 /* Feed all the other slaves, MONITORs and so on */
1584 if (slave->slaveseldb != dictid) {
1585 robj *selectcmd;
1586
1587 switch(dictid) {
1588 case 0: selectcmd = shared.select0; break;
1589 case 1: selectcmd = shared.select1; break;
1590 case 2: selectcmd = shared.select2; break;
1591 case 3: selectcmd = shared.select3; break;
1592 case 4: selectcmd = shared.select4; break;
1593 case 5: selectcmd = shared.select5; break;
1594 case 6: selectcmd = shared.select6; break;
1595 case 7: selectcmd = shared.select7; break;
1596 case 8: selectcmd = shared.select8; break;
1597 case 9: selectcmd = shared.select9; break;
1598 default:
1599 selectcmd = createObject(REDIS_STRING,
1600 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1601 selectcmd->refcount = 0;
1602 break;
1603 }
1604 addReply(slave,selectcmd);
1605 slave->slaveseldb = dictid;
1606 }
1607 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1608 }
1609 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1610 if (outv != static_outv) zfree(outv);
1611 }
1612
1613 static void processInputBuffer(redisClient *c) {
1614 again:
1615 if (c->bulklen == -1) {
1616 /* Read the first line of the query */
1617 char *p = strchr(c->querybuf,'\n');
1618 size_t querylen;
1619
1620 if (p) {
1621 sds query, *argv;
1622 int argc, j;
1623
1624 query = c->querybuf;
1625 c->querybuf = sdsempty();
1626 querylen = 1+(p-(query));
1627 if (sdslen(query) > querylen) {
1628 /* leave data after the first line of the query in the buffer */
1629 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1630 }
1631 *p = '\0'; /* remove "\n" */
1632 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1633 sdsupdatelen(query);
1634
1635 /* Now we can split the query in arguments */
1636 if (sdslen(query) == 0) {
1637 /* Ignore empty query */
1638 sdsfree(query);
1639 return;
1640 }
1641 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1642 sdsfree(query);
1643
1644 if (c->argv) zfree(c->argv);
1645 c->argv = zmalloc(sizeof(robj*)*argc);
1646
1647 for (j = 0; j < argc; j++) {
1648 if (sdslen(argv[j])) {
1649 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1650 c->argc++;
1651 } else {
1652 sdsfree(argv[j]);
1653 }
1654 }
1655 zfree(argv);
1656 /* Execute the command. If the client is still valid
1657 * after processCommand() return and there is something
1658 * on the query buffer try to process the next command. */
1659 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1660 return;
1661 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1662 redisLog(REDIS_DEBUG, "Client protocol error");
1663 freeClient(c);
1664 return;
1665 }
1666 } else {
1667 /* Bulk read handling. Note that if we are at this point
1668 the client already sent a command terminated with a newline,
1669 we are reading the bulk data that is actually the last
1670 argument of the command. */
1671 int qbl = sdslen(c->querybuf);
1672
1673 if (c->bulklen <= qbl) {
1674 /* Copy everything but the final CRLF as final argument */
1675 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1676 c->argc++;
1677 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1678 /* Process the command. If the client is still valid after
1679 * the processing and there is more data in the buffer
1680 * try to parse it. */
1681 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1682 return;
1683 }
1684 }
1685 }
1686
1687 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1688 redisClient *c = (redisClient*) privdata;
1689 char buf[REDIS_IOBUF_LEN];
1690 int nread;
1691 REDIS_NOTUSED(el);
1692 REDIS_NOTUSED(mask);
1693
1694 nread = read(fd, buf, REDIS_IOBUF_LEN);
1695 if (nread == -1) {
1696 if (errno == EAGAIN) {
1697 nread = 0;
1698 } else {
1699 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1700 freeClient(c);
1701 return;
1702 }
1703 } else if (nread == 0) {
1704 redisLog(REDIS_DEBUG, "Client closed connection");
1705 freeClient(c);
1706 return;
1707 }
1708 if (nread) {
1709 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1710 c->lastinteraction = time(NULL);
1711 } else {
1712 return;
1713 }
1714 processInputBuffer(c);
1715 }
1716
1717 static int selectDb(redisClient *c, int id) {
1718 if (id < 0 || id >= server.dbnum)
1719 return REDIS_ERR;
1720 c->db = &server.db[id];
1721 return REDIS_OK;
1722 }
1723
1724 static void *dupClientReplyValue(void *o) {
1725 incrRefCount((robj*)o);
1726 return 0;
1727 }
1728
1729 static redisClient *createClient(int fd) {
1730 redisClient *c = zmalloc(sizeof(*c));
1731
1732 anetNonBlock(NULL,fd);
1733 anetTcpNoDelay(NULL,fd);
1734 if (!c) return NULL;
1735 selectDb(c,0);
1736 c->fd = fd;
1737 c->querybuf = sdsempty();
1738 c->argc = 0;
1739 c->argv = NULL;
1740 c->bulklen = -1;
1741 c->multibulk = 0;
1742 c->mbargc = 0;
1743 c->mbargv = NULL;
1744 c->sentlen = 0;
1745 c->flags = 0;
1746 c->lastinteraction = time(NULL);
1747 c->authenticated = 0;
1748 c->replstate = REDIS_REPL_NONE;
1749 c->reply = listCreate();
1750 listSetFreeMethod(c->reply,decrRefCount);
1751 listSetDupMethod(c->reply,dupClientReplyValue);
1752 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1753 readQueryFromClient, c, NULL) == AE_ERR) {
1754 freeClient(c);
1755 return NULL;
1756 }
1757 listAddNodeTail(server.clients,c);
1758 return c;
1759 }
1760
1761 static void addReply(redisClient *c, robj *obj) {
1762 if (listLength(c->reply) == 0 &&
1763 (c->replstate == REDIS_REPL_NONE ||
1764 c->replstate == REDIS_REPL_ONLINE) &&
1765 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1766 sendReplyToClient, c, NULL) == AE_ERR) return;
1767 if (obj->encoding != REDIS_ENCODING_RAW) {
1768 obj = getDecodedObject(obj);
1769 } else {
1770 incrRefCount(obj);
1771 }
1772 listAddNodeTail(c->reply,obj);
1773 }
1774
1775 static void addReplySds(redisClient *c, sds s) {
1776 robj *o = createObject(REDIS_STRING,s);
1777 addReply(c,o);
1778 decrRefCount(o);
1779 }
1780
1781 static void addReplyBulkLen(redisClient *c, robj *obj) {
1782 size_t len;
1783
1784 if (obj->encoding == REDIS_ENCODING_RAW) {
1785 len = sdslen(obj->ptr);
1786 } else {
1787 long n = (long)obj->ptr;
1788
1789 len = 1;
1790 if (n < 0) {
1791 len++;
1792 n = -n;
1793 }
1794 while((n = n/10) != 0) {
1795 len++;
1796 }
1797 }
1798 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1799 }
1800
1801 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1802 int cport, cfd;
1803 char cip[128];
1804 redisClient *c;
1805 REDIS_NOTUSED(el);
1806 REDIS_NOTUSED(mask);
1807 REDIS_NOTUSED(privdata);
1808
1809 cfd = anetAccept(server.neterr, fd, cip, &cport);
1810 if (cfd == AE_ERR) {
1811 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1812 return;
1813 }
1814 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1815 if ((c = createClient(cfd)) == NULL) {
1816 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1817 close(cfd); /* May be already closed, just ingore errors */
1818 return;
1819 }
1820 /* If maxclient directive is set and this is one client more... close the
1821 * connection. Note that we create the client instead to check before
1822 * for this condition, since now the socket is already set in nonblocking
1823 * mode and we can send an error for free using the Kernel I/O */
1824 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1825 char *err = "-ERR max number of clients reached\r\n";
1826
1827 /* That's a best effort error message, don't check write errors */
1828 (void) write(c->fd,err,strlen(err));
1829 freeClient(c);
1830 return;
1831 }
1832 server.stat_numconnections++;
1833 }
1834
1835 /* ======================= Redis objects implementation ===================== */
1836
1837 static robj *createObject(int type, void *ptr) {
1838 robj *o;
1839
1840 if (listLength(server.objfreelist)) {
1841 listNode *head = listFirst(server.objfreelist);
1842 o = listNodeValue(head);
1843 listDelNode(server.objfreelist,head);
1844 } else {
1845 o = zmalloc(sizeof(*o));
1846 }
1847 o->type = type;
1848 o->encoding = REDIS_ENCODING_RAW;
1849 o->ptr = ptr;
1850 o->refcount = 1;
1851 return o;
1852 }
1853
1854 static robj *createStringObject(char *ptr, size_t len) {
1855 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1856 }
1857
1858 static robj *createListObject(void) {
1859 list *l = listCreate();
1860
1861 listSetFreeMethod(l,decrRefCount);
1862 return createObject(REDIS_LIST,l);
1863 }
1864
1865 static robj *createSetObject(void) {
1866 dict *d = dictCreate(&setDictType,NULL);
1867 return createObject(REDIS_SET,d);
1868 }
1869
1870 static robj *createZsetObject(void) {
1871 zset *zs = zmalloc(sizeof(*zs));
1872
1873 zs->dict = dictCreate(&zsetDictType,NULL);
1874 zs->zsl = zslCreate();
1875 return createObject(REDIS_ZSET,zs);
1876 }
1877
1878 static void freeStringObject(robj *o) {
1879 if (o->encoding == REDIS_ENCODING_RAW) {
1880 sdsfree(o->ptr);
1881 }
1882 }
1883
1884 static void freeListObject(robj *o) {
1885 listRelease((list*) o->ptr);
1886 }
1887
1888 static void freeSetObject(robj *o) {
1889 dictRelease((dict*) o->ptr);
1890 }
1891
1892 static void freeZsetObject(robj *o) {
1893 zset *zs = o->ptr;
1894
1895 dictRelease(zs->dict);
1896 zslFree(zs->zsl);
1897 zfree(zs);
1898 }
1899
1900 static void freeHashObject(robj *o) {
1901 dictRelease((dict*) o->ptr);
1902 }
1903
1904 static void incrRefCount(robj *o) {
1905 o->refcount++;
1906 #ifdef DEBUG_REFCOUNT
1907 if (o->type == REDIS_STRING)
1908 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1909 #endif
1910 }
1911
1912 static void decrRefCount(void *obj) {
1913 robj *o = obj;
1914
1915 #ifdef DEBUG_REFCOUNT
1916 if (o->type == REDIS_STRING)
1917 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1918 #endif
1919 if (--(o->refcount) == 0) {
1920 switch(o->type) {
1921 case REDIS_STRING: freeStringObject(o); break;
1922 case REDIS_LIST: freeListObject(o); break;
1923 case REDIS_SET: freeSetObject(o); break;
1924 case REDIS_ZSET: freeZsetObject(o); break;
1925 case REDIS_HASH: freeHashObject(o); break;
1926 default: assert(0 != 0); break;
1927 }
1928 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1929 !listAddNodeHead(server.objfreelist,o))
1930 zfree(o);
1931 }
1932 }
1933
1934 static robj *lookupKey(redisDb *db, robj *key) {
1935 dictEntry *de = dictFind(db->dict,key);
1936 return de ? dictGetEntryVal(de) : NULL;
1937 }
1938
1939 static robj *lookupKeyRead(redisDb *db, robj *key) {
1940 expireIfNeeded(db,key);
1941 return lookupKey(db,key);
1942 }
1943
1944 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1945 deleteIfVolatile(db,key);
1946 return lookupKey(db,key);
1947 }
1948
1949 static int deleteKey(redisDb *db, robj *key) {
1950 int retval;
1951
1952 /* We need to protect key from destruction: after the first dictDelete()
1953 * it may happen that 'key' is no longer valid if we don't increment
1954 * it's count. This may happen when we get the object reference directly
1955 * from the hash table with dictRandomKey() or dict iterators */
1956 incrRefCount(key);
1957 if (dictSize(db->expires)) dictDelete(db->expires,key);
1958 retval = dictDelete(db->dict,key);
1959 decrRefCount(key);
1960
1961 return retval == DICT_OK;
1962 }
1963
1964 /* Try to share an object against the shared objects pool */
1965 static robj *tryObjectSharing(robj *o) {
1966 struct dictEntry *de;
1967 unsigned long c;
1968
1969 if (o == NULL || server.shareobjects == 0) return o;
1970
1971 assert(o->type == REDIS_STRING);
1972 de = dictFind(server.sharingpool,o);
1973 if (de) {
1974 robj *shared = dictGetEntryKey(de);
1975
1976 c = ((unsigned long) dictGetEntryVal(de))+1;
1977 dictGetEntryVal(de) = (void*) c;
1978 incrRefCount(shared);
1979 decrRefCount(o);
1980 return shared;
1981 } else {
1982 /* Here we are using a stream algorihtm: Every time an object is
1983 * shared we increment its count, everytime there is a miss we
1984 * recrement the counter of a random object. If this object reaches
1985 * zero we remove the object and put the current object instead. */
1986 if (dictSize(server.sharingpool) >=
1987 server.sharingpoolsize) {
1988 de = dictGetRandomKey(server.sharingpool);
1989 assert(de != NULL);
1990 c = ((unsigned long) dictGetEntryVal(de))-1;
1991 dictGetEntryVal(de) = (void*) c;
1992 if (c == 0) {
1993 dictDelete(server.sharingpool,de->key);
1994 }
1995 } else {
1996 c = 0; /* If the pool is empty we want to add this object */
1997 }
1998 if (c == 0) {
1999 int retval;
2000
2001 retval = dictAdd(server.sharingpool,o,(void*)1);
2002 assert(retval == DICT_OK);
2003 incrRefCount(o);
2004 }
2005 return o;
2006 }
2007 }
2008
2009 /* Check if the nul-terminated string 's' can be represented by a long
2010 * (that is, is a number that fits into long without any other space or
2011 * character before or after the digits).
2012 *
2013 * If so, the function returns REDIS_OK and *longval is set to the value
2014 * of the number. Otherwise REDIS_ERR is returned */
2015 static int isStringRepresentableAsLong(sds s, long *longval) {
2016 char buf[32], *endptr;
2017 long value;
2018 int slen;
2019
2020 value = strtol(s, &endptr, 10);
2021 if (endptr[0] != '\0') return REDIS_ERR;
2022 slen = snprintf(buf,32,"%ld",value);
2023
2024 /* If the number converted back into a string is not identical
2025 * then it's not possible to encode the string as integer */
2026 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2027 if (longval) *longval = value;
2028 return REDIS_OK;
2029 }
2030
2031 /* Try to encode a string object in order to save space */
2032 static int tryObjectEncoding(robj *o) {
2033 long value;
2034 sds s = o->ptr;
2035
2036 if (o->encoding != REDIS_ENCODING_RAW)
2037 return REDIS_ERR; /* Already encoded */
2038
2039 /* It's not save to encode shared objects: shared objects can be shared
2040 * everywhere in the "object space" of Redis. Encoded objects can only
2041 * appear as "values" (and not, for instance, as keys) */
2042 if (o->refcount > 1) return REDIS_ERR;
2043
2044 /* Currently we try to encode only strings */
2045 assert(o->type == REDIS_STRING);
2046
2047 /* Check if we can represent this string as a long integer */
2048 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2049
2050 /* Ok, this object can be encoded */
2051 o->encoding = REDIS_ENCODING_INT;
2052 sdsfree(o->ptr);
2053 o->ptr = (void*) value;
2054 return REDIS_OK;
2055 }
2056
2057 /* Get a decoded version of an encoded object (returned as a new object) */
2058 static robj *getDecodedObject(const robj *o) {
2059 robj *dec;
2060
2061 assert(o->encoding != REDIS_ENCODING_RAW);
2062 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2063 char buf[32];
2064
2065 snprintf(buf,32,"%ld",(long)o->ptr);
2066 dec = createStringObject(buf,strlen(buf));
2067 return dec;
2068 } else {
2069 assert(1 != 1);
2070 }
2071 }
2072
2073 static int compareStringObjects(robj *a, robj *b) {
2074 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2075
2076 if (a == b) return 0;
2077 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2078 return (long)a->ptr - (long)b->ptr;
2079 } else {
2080 int retval;
2081
2082 incrRefCount(a);
2083 incrRefCount(b);
2084 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2085 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2086 retval = sdscmp(a->ptr,b->ptr);
2087 decrRefCount(a);
2088 decrRefCount(b);
2089 return retval;
2090 }
2091 }
2092
2093 static size_t stringObjectLen(robj *o) {
2094 assert(o->type == REDIS_STRING);
2095 if (o->encoding == REDIS_ENCODING_RAW) {
2096 return sdslen(o->ptr);
2097 } else {
2098 char buf[32];
2099
2100 return snprintf(buf,32,"%ld",(long)o->ptr);
2101 }
2102 }
2103
2104 /*============================ DB saving/loading ============================ */
2105
/* Write the one byte 'type' code to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2110
/* Write the time 't' to 'fp' as a 32 bit signed integer, in the byte order
 * of the host. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2116
2117 /* check rdbLoadLen() comments for more info */
2118 static int rdbSaveLen(FILE *fp, uint32_t len) {
2119 unsigned char buf[2];
2120
2121 if (len < (1<<6)) {
2122 /* Save a 6 bit len */
2123 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2124 if (fwrite(buf,1,1,fp) == 0) return -1;
2125 } else if (len < (1<<14)) {
2126 /* Save a 14 bit len */
2127 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2128 buf[1] = len&0xFF;
2129 if (fwrite(buf,2,1,fp) == 0) return -1;
2130 } else {
2131 /* Save a 32 bit len */
2132 buf[0] = (REDIS_RDB_32BITLEN<<6);
2133 if (fwrite(buf,1,1,fp) == 0) return -1;
2134 len = htonl(len);
2135 if (fwrite(&len,4,1,fp) == 0) return -1;
2136 }
2137 return 0;
2138 }
2139
2140 /* String objects in the form "2391" "-100" without any space and with a
2141 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2142 * encoded as integers to save space */
2143 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2144 long long value;
2145 char *endptr, buf[32];
2146
2147 /* Check if it's possible to encode this value as a number */
2148 value = strtoll(s, &endptr, 10);
2149 if (endptr[0] != '\0') return 0;
2150 snprintf(buf,32,"%lld",value);
2151
2152 /* If the number converted back into a string is not identical
2153 * then it's not possible to encode the string as integer */
2154 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2155
2156 /* Finally check if it fits in our ranges */
2157 if (value >= -(1<<7) && value <= (1<<7)-1) {
2158 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2159 enc[1] = value&0xFF;
2160 return 2;
2161 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2162 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2163 enc[1] = value&0xFF;
2164 enc[2] = (value>>8)&0xFF;
2165 return 3;
2166 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2167 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2168 enc[1] = value&0xFF;
2169 enc[2] = (value>>8)&0xFF;
2170 enc[3] = (value>>16)&0xFF;
2171 enc[4] = (value>>24)&0xFF;
2172 return 5;
2173 } else {
2174 return 0;
2175 }
2176 }
2177
2178 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2179 unsigned int comprlen, outlen;
2180 unsigned char byte;
2181 void *out;
2182
2183 /* We require at least four bytes compression for this to be worth it */
2184 outlen = sdslen(obj->ptr)-4;
2185 if (outlen <= 0) return 0;
2186 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2187 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2188 if (comprlen == 0) {
2189 zfree(out);
2190 return 0;
2191 }
2192 /* Data compressed! Let's save it on disk */
2193 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2194 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2195 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2196 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2197 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2198 zfree(out);
2199 return comprlen;
2200
2201 writeerr:
2202 zfree(out);
2203 return -1;
2204 }
2205
2206 /* Save a string objet as [len][data] on disk. If the object is a string
2207 * representation of an integer value we try to safe it in a special form */
2208 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2209 size_t len;
2210 int enclen;
2211
2212 len = sdslen(obj->ptr);
2213
2214 /* Try integer encoding */
2215 if (len <= 11) {
2216 unsigned char buf[5];
2217 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2218 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2219 return 0;
2220 }
2221 }
2222
2223 /* Try LZF compression - under 20 bytes it's unable to compress even
2224 * aaaaaaaaaaaaaaaaaa so skip it */
2225 if (len > 20) {
2226 int retval;
2227
2228 retval = rdbSaveLzfStringObject(fp,obj);
2229 if (retval == -1) return -1;
2230 if (retval > 0) return 0;
2231 /* retval == 0 means data can't be compressed, save the old way */
2232 }
2233
2234 /* Store verbatim */
2235 if (rdbSaveLen(fp,len) == -1) return -1;
2236 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2237 return 0;
2238 }
2239
2240 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2241 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2242 int retval;
2243 robj *dec;
2244
2245 if (obj->encoding != REDIS_ENCODING_RAW) {
2246 dec = getDecodedObject(obj);
2247 retval = rdbSaveStringObjectRaw(fp,dec);
2248 decrRefCount(dec);
2249 return retval;
2250 } else {
2251 return rdbSaveStringObjectRaw(fp,obj);
2252 }
2253 }
2254
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error.
 */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);
        /* Measure the text that starts at buf+1: buf[0] is the length
         * prefix itself and holds no defined value yet. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2281
2282 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2283 static int rdbSave(char *filename) {
2284 dictIterator *di = NULL;
2285 dictEntry *de;
2286 FILE *fp;
2287 char tmpfile[256];
2288 int j;
2289 time_t now = time(NULL);
2290
2291 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2292 fp = fopen(tmpfile,"w");
2293 if (!fp) {
2294 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2295 return REDIS_ERR;
2296 }
2297 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2298 for (j = 0; j < server.dbnum; j++) {
2299 redisDb *db = server.db+j;
2300 dict *d = db->dict;
2301 if (dictSize(d) == 0) continue;
2302 di = dictGetIterator(d);
2303 if (!di) {
2304 fclose(fp);
2305 return REDIS_ERR;
2306 }
2307
2308 /* Write the SELECT DB opcode */
2309 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2310 if (rdbSaveLen(fp,j) == -1) goto werr;
2311
2312 /* Iterate this DB writing every entry */
2313 while((de = dictNext(di)) != NULL) {
2314 robj *key = dictGetEntryKey(de);
2315 robj *o = dictGetEntryVal(de);
2316 time_t expiretime = getExpire(db,key);
2317
2318 /* Save the expire time */
2319 if (expiretime != -1) {
2320 /* If this key is already expired skip it */
2321 if (expiretime < now) continue;
2322 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2323 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2324 }
2325 /* Save the key and associated value */
2326 if (rdbSaveType(fp,o->type) == -1) goto werr;
2327 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2328 if (o->type == REDIS_STRING) {
2329 /* Save a string value */
2330 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2331 } else if (o->type == REDIS_LIST) {
2332 /* Save a list value */
2333 list *list = o->ptr;
2334 listNode *ln;
2335
2336 listRewind(list);
2337 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2338 while((ln = listYield(list))) {
2339 robj *eleobj = listNodeValue(ln);
2340
2341 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2342 }
2343 } else if (o->type == REDIS_SET) {
2344 /* Save a set value */
2345 dict *set = o->ptr;
2346 dictIterator *di = dictGetIterator(set);
2347 dictEntry *de;
2348
2349 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2350 while((de = dictNext(di)) != NULL) {
2351 robj *eleobj = dictGetEntryKey(de);
2352
2353 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2354 }
2355 dictReleaseIterator(di);
2356 } else if (o->type == REDIS_ZSET) {
2357 /* Save a set value */
2358 zset *zs = o->ptr;
2359 dictIterator *di = dictGetIterator(zs->dict);
2360 dictEntry *de;
2361
2362 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2363 while((de = dictNext(di)) != NULL) {
2364 robj *eleobj = dictGetEntryKey(de);
2365 double *score = dictGetEntryVal(de);
2366
2367 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2368 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2369 }
2370 dictReleaseIterator(di);
2371 } else {
2372 assert(0 != 0);
2373 }
2374 }
2375 dictReleaseIterator(di);
2376 }
2377 /* EOF opcode */
2378 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2379
2380 /* Make sure data will not remain on the OS's output buffers */
2381 fflush(fp);
2382 fsync(fileno(fp));
2383 fclose(fp);
2384
2385 /* Use RENAME to make sure the DB file is changed atomically only
2386 * if the generate DB file is ok. */
2387 if (rename(tmpfile,filename) == -1) {
2388 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2389 unlink(tmpfile);
2390 return REDIS_ERR;
2391 }
2392 redisLog(REDIS_NOTICE,"DB saved on disk");
2393 server.dirty = 0;
2394 server.lastsave = time(NULL);
2395 return REDIS_OK;
2396
2397 werr:
2398 fclose(fp);
2399 unlink(tmpfile);
2400 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2401 if (di) dictReleaseIterator(di);
2402 return REDIS_ERR;
2403 }
2404
2405 static int rdbSaveBackground(char *filename) {
2406 pid_t childpid;
2407
2408 if (server.bgsaveinprogress) return REDIS_ERR;
2409 if ((childpid = fork()) == 0) {
2410 /* Child */
2411 close(server.fd);
2412 if (rdbSave(filename) == REDIS_OK) {
2413 exit(0);
2414 } else {
2415 exit(1);
2416 }
2417 } else {
2418 /* Parent */
2419 if (childpid == -1) {
2420 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2421 strerror(errno));
2422 return REDIS_ERR;
2423 }
2424 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2425 server.bgsaveinprogress = 1;
2426 server.bgsavechildpid = childpid;
2427 return REDIS_OK;
2428 }
2429 return REDIS_OK; /* unreached */
2430 }
2431
/* Remove the temporary dump file named after the process id 'childpid'
 * (the same "temp-<pid>.rdb" name rdbSave() builds from getpid()). */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2438
/* Read a one byte type code from 'fp'.
 * Returns the byte value (0-255), or -1 if it could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char code;

    return (fread(&code,1,1,fp) == 0) ? -1 : code;
}
2444
/* Read a time stored as a 32 bit signed integer in host byte order.
 * Returns the time, or -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    return (fread(&stamp,4,1,fp) == 0) ? (time_t)-1 : (time_t)stamp;
}
2450
2451 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2452 * of this file for a description of how this are stored on disk.
2453 *
2454 * isencoded is set to 1 if the readed length is not actually a length but
2455 * an "encoding type", check the above comments for more info */
2456 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2457 unsigned char buf[2];
2458 uint32_t len;
2459
2460 if (isencoded) *isencoded = 0;
2461 if (rdbver == 0) {
2462 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2463 return ntohl(len);
2464 } else {
2465 int type;
2466
2467 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2468 type = (buf[0]&0xC0)>>6;
2469 if (type == REDIS_RDB_6BITLEN) {
2470 /* Read a 6 bit len */
2471 return buf[0]&0x3F;
2472 } else if (type == REDIS_RDB_ENCVAL) {
2473 /* Read a 6 bit len encoding type */
2474 if (isencoded) *isencoded = 1;
2475 return buf[0]&0x3F;
2476 } else if (type == REDIS_RDB_14BITLEN) {
2477 /* Read a 14 bit len */
2478 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2479 return ((buf[0]&0x3F)<<8)|buf[1];
2480 } else {
2481 /* Read a 32 bit len */
2482 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2483 return ntohl(len);
2484 }
2485 }
2486 }
2487
2488 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2489 unsigned char enc[4];
2490 long long val;
2491
2492 if (enctype == REDIS_RDB_ENC_INT8) {
2493 if (fread(enc,1,1,fp) == 0) return NULL;
2494 val = (signed char)enc[0];
2495 } else if (enctype == REDIS_RDB_ENC_INT16) {
2496 uint16_t v;
2497 if (fread(enc,2,1,fp) == 0) return NULL;
2498 v = enc[0]|(enc[1]<<8);
2499 val = (int16_t)v;
2500 } else if (enctype == REDIS_RDB_ENC_INT32) {
2501 uint32_t v;
2502 if (fread(enc,4,1,fp) == 0) return NULL;
2503 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2504 val = (int32_t)v;
2505 } else {
2506 val = 0; /* anti-warning */
2507 assert(0!=0);
2508 }
2509 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2510 }
2511
2512 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2513 unsigned int len, clen;
2514 unsigned char *c = NULL;
2515 sds val = NULL;
2516
2517 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2518 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2519 if ((c = zmalloc(clen)) == NULL) goto err;
2520 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2521 if (fread(c,clen,1,fp) == 0) goto err;
2522 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2523 zfree(c);
2524 return createObject(REDIS_STRING,val);
2525 err:
2526 zfree(c);
2527 sdsfree(val);
2528 return NULL;
2529 }
2530
2531 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2532 int isencoded;
2533 uint32_t len;
2534 sds val;
2535
2536 len = rdbLoadLen(fp,rdbver,&isencoded);
2537 if (isencoded) {
2538 switch(len) {
2539 case REDIS_RDB_ENC_INT8:
2540 case REDIS_RDB_ENC_INT16:
2541 case REDIS_RDB_ENC_INT32:
2542 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2543 case REDIS_RDB_ENC_LZF:
2544 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2545 default:
2546 assert(0!=0);
2547 }
2548 }
2549
2550 if (len == REDIS_RDB_LENERR) return NULL;
2551 val = sdsnewlen(NULL,len);
2552 if (len && fread(val,len,1,fp) == 0) {
2553 sdsfree(val);
2554 return NULL;
2555 }
2556 return tryObjectSharing(createObject(REDIS_STRING,val));
2557 }
2558
2559 /* For information about double serialization check rdbSaveDoubleValue() */
2560 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2561 char buf[128];
2562 unsigned char len;
2563
2564 if (fread(&len,1,1,fp) == 0) return -1;
2565 switch(len) {
2566 case 255: *val = R_NegInf; return 0;
2567 case 254: *val = R_PosInf; return 0;
2568 case 253: *val = R_Nan; return 0;
2569 default:
2570 if (fread(buf,len,1,fp) == 0) return -1;
2571 sscanf(buf, "%lg", val);
2572 return 0;
2573 }
2574 }
2575
2576 static int rdbLoad(char *filename) {
2577 FILE *fp;
2578 robj *keyobj = NULL;
2579 uint32_t dbid;
2580 int type, retval, rdbver;
2581 dict *d = server.db[0].dict;
2582 redisDb *db = server.db+0;
2583 char buf[1024];
2584 time_t expiretime = -1, now = time(NULL);
2585
2586 fp = fopen(filename,"r");
2587 if (!fp) return REDIS_ERR;
2588 if (fread(buf,9,1,fp) == 0) goto eoferr;
2589 buf[9] = '\0';
2590 if (memcmp(buf,"REDIS",5) != 0) {
2591 fclose(fp);
2592 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2593 return REDIS_ERR;
2594 }
2595 rdbver = atoi(buf+5);
2596 if (rdbver > 1) {
2597 fclose(fp);
2598 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2599 return REDIS_ERR;
2600 }
2601 while(1) {
2602 robj *o;
2603
2604 /* Read type. */
2605 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2606 if (type == REDIS_EXPIRETIME) {
2607 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2608 /* We read the time so we need to read the object type again */
2609 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2610 }
2611 if (type == REDIS_EOF) break;
2612 /* Handle SELECT DB opcode as a special case */
2613 if (type == REDIS_SELECTDB) {
2614 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2615 goto eoferr;
2616 if (dbid >= (unsigned)server.dbnum) {
2617 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2618 exit(1);
2619 }
2620 db = server.db+dbid;
2621 d = db->dict;
2622 continue;
2623 }
2624 /* Read key */
2625 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2626
2627 if (type == REDIS_STRING) {
2628 /* Read string value */
2629 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2630 tryObjectEncoding(o);
2631 } else if (type == REDIS_LIST || type == REDIS_SET) {
2632 /* Read list/set value */
2633 uint32_t listlen;
2634
2635 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2636 goto eoferr;
2637 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2638 /* Load every single element of the list/set */
2639 while(listlen--) {
2640 robj *ele;
2641
2642 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2643 tryObjectEncoding(ele);
2644 if (type == REDIS_LIST) {
2645 listAddNodeTail((list*)o->ptr,ele);
2646 } else {
2647 dictAdd((dict*)o->ptr,ele,NULL);
2648 }
2649 }
2650 } else if (type == REDIS_ZSET) {
2651 /* Read list/set value */
2652 uint32_t zsetlen;
2653 zset *zs;
2654
2655 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2656 goto eoferr;
2657 o = createZsetObject();
2658 zs = o->ptr;
2659 /* Load every single element of the list/set */
2660 while(zsetlen--) {
2661 robj *ele;
2662 double *score = zmalloc(sizeof(double));
2663
2664 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2665 tryObjectEncoding(ele);
2666 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2667 dictAdd(zs->dict,ele,score);
2668 zslInsert(zs->zsl,*score,ele);
2669 incrRefCount(ele); /* added to skiplist */
2670 }
2671 } else {
2672 assert(0 != 0);
2673 }
2674 /* Add the new object in the hash table */
2675 retval = dictAdd(d,keyobj,o);
2676 if (retval == DICT_ERR) {
2677 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2678 exit(1);
2679 }
2680 /* Set the expire time if needed */
2681 if (expiretime != -1) {
2682 setExpire(db,keyobj,expiretime);
2683 /* Delete this key if already expired */
2684 if (expiretime < now) deleteKey(db,keyobj);
2685 expiretime = -1;
2686 }
2687 keyobj = o = NULL;
2688 }
2689 fclose(fp);
2690 return REDIS_OK;
2691
2692 eoferr: /* unexpected end of file is handled here with a fatal exit */
2693 if (keyobj) decrRefCount(keyobj);
2694 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2695 exit(1);
2696 return REDIS_ERR; /* Just to avoid warning */
2697 }
2698
2699 /*================================== Commands =============================== */
2700
2701 static void authCommand(redisClient *c) {
2702 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2703 c->authenticated = 1;
2704 addReply(c,shared.ok);
2705 } else {
2706 c->authenticated = 0;
2707 addReply(c,shared.err);
2708 }
2709 }
2710
2711 static void pingCommand(redisClient *c) {
2712 addReply(c,shared.pong);
2713 }
2714
2715 static void echoCommand(redisClient *c) {
2716 addReplyBulkLen(c,c->argv[1]);
2717 addReply(c,c->argv[1]);
2718 addReply(c,shared.crlf);
2719 }
2720
2721 /*=================================== Strings =============================== */
2722
2723 static void setGenericCommand(redisClient *c, int nx) {
2724 int retval;
2725
2726 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2727 if (retval == DICT_ERR) {
2728 if (!nx) {
2729 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2730 incrRefCount(c->argv[2]);
2731 } else {
2732 addReply(c,shared.czero);
2733 return;
2734 }
2735 } else {
2736 incrRefCount(c->argv[1]);
2737 incrRefCount(c->argv[2]);
2738 }
2739 server.dirty++;
2740 removeExpire(c->db,c->argv[1]);
2741 addReply(c, nx ? shared.cone : shared.ok);
2742 }
2743
2744 static void setCommand(redisClient *c) {
2745 setGenericCommand(c,0);
2746 }
2747
2748 static void setnxCommand(redisClient *c) {
2749 setGenericCommand(c,1);
2750 }
2751
2752 static void getCommand(redisClient *c) {
2753 robj *o = lookupKeyRead(c->db,c->argv[1]);
2754
2755 if (o == NULL) {
2756 addReply(c,shared.nullbulk);
2757 } else {
2758 if (o->type != REDIS_STRING) {
2759 addReply(c,shared.wrongtypeerr);
2760 } else {
2761 addReplyBulkLen(c,o);
2762 addReply(c,o);
2763 addReply(c,shared.crlf);
2764 }
2765 }
2766 }
2767
2768 static void getsetCommand(redisClient *c) {
2769 getCommand(c);
2770 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2771 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2772 } else {
2773 incrRefCount(c->argv[1]);
2774 }
2775 incrRefCount(c->argv[2]);
2776 server.dirty++;
2777 removeExpire(c->db,c->argv[1]);
2778 }
2779
2780 static void mgetCommand(redisClient *c) {
2781 int j;
2782
2783 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2784 for (j = 1; j < c->argc; j++) {
2785 robj *o = lookupKeyRead(c->db,c->argv[j]);
2786 if (o == NULL) {
2787 addReply(c,shared.nullbulk);
2788 } else {
2789 if (o->type != REDIS_STRING) {
2790 addReply(c,shared.nullbulk);
2791 } else {
2792 addReplyBulkLen(c,o);
2793 addReply(c,o);
2794 addReply(c,shared.crlf);
2795 }
2796 }
2797 }
2798 }
2799
2800 static void incrDecrCommand(redisClient *c, long long incr) {
2801 long long value;
2802 int retval;
2803 robj *o;
2804
2805 o = lookupKeyWrite(c->db,c->argv[1]);
2806 if (o == NULL) {
2807 value = 0;
2808 } else {
2809 if (o->type != REDIS_STRING) {
2810 value = 0;
2811 } else {
2812 char *eptr;
2813
2814 if (o->encoding == REDIS_ENCODING_RAW)
2815 value = strtoll(o->ptr, &eptr, 10);
2816 else if (o->encoding == REDIS_ENCODING_INT)
2817 value = (long)o->ptr;
2818 else
2819 assert(1 != 1);
2820 }
2821 }
2822
2823 value += incr;
2824 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2825 tryObjectEncoding(o);
2826 retval = dictAdd(c->db->dict,c->argv[1],o);
2827 if (retval == DICT_ERR) {
2828 dictReplace(c->db->dict,c->argv[1],o);
2829 removeExpire(c->db,c->argv[1]);
2830 } else {
2831 incrRefCount(c->argv[1]);
2832 }
2833 server.dirty++;
2834 addReply(c,shared.colon);
2835 addReply(c,o);
2836 addReply(c,shared.crlf);
2837 }
2838
2839 static void incrCommand(redisClient *c) {
2840 incrDecrCommand(c,1);
2841 }
2842
2843 static void decrCommand(redisClient *c) {
2844 incrDecrCommand(c,-1);
2845 }
2846
2847 static void incrbyCommand(redisClient *c) {
2848 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2849 incrDecrCommand(c,incr);
2850 }
2851
2852 static void decrbyCommand(redisClient *c) {
2853 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2854 incrDecrCommand(c,-incr);
2855 }
2856
2857 /* ========================= Type agnostic commands ========================= */
2858
2859 static void delCommand(redisClient *c) {
2860 int deleted = 0, j;
2861
2862 for (j = 1; j < c->argc; j++) {
2863 if (deleteKey(c->db,c->argv[j])) {
2864 server.dirty++;
2865 deleted++;
2866 }
2867 }
2868 switch(deleted) {
2869 case 0:
2870 addReply(c,shared.czero);
2871 break;
2872 case 1:
2873 addReply(c,shared.cone);
2874 break;
2875 default:
2876 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2877 break;
2878 }
2879 }
2880
2881 static void existsCommand(redisClient *c) {
2882 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2883 }
2884
2885 static void selectCommand(redisClient *c) {
2886 int id = atoi(c->argv[1]->ptr);
2887
2888 if (selectDb(c,id) == REDIS_ERR) {
2889 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2890 } else {
2891 addReply(c,shared.ok);
2892 }
2893 }
2894
2895 static void randomkeyCommand(redisClient *c) {
2896 dictEntry *de;
2897
2898 while(1) {
2899 de = dictGetRandomKey(c->db->dict);
2900 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2901 }
2902 if (de == NULL) {
2903 addReply(c,shared.plus);
2904 addReply(c,shared.crlf);
2905 } else {
2906 addReply(c,shared.plus);
2907 addReply(c,dictGetEntryKey(de));
2908 addReply(c,shared.crlf);
2909 }
2910 }
2911
2912 static void keysCommand(redisClient *c) {
2913 dictIterator *di;
2914 dictEntry *de;
2915 sds pattern = c->argv[1]->ptr;
2916 int plen = sdslen(pattern);
2917 int numkeys = 0, keyslen = 0;
2918 robj *lenobj = createObject(REDIS_STRING,NULL);
2919
2920 di = dictGetIterator(c->db->dict);
2921 addReply(c,lenobj);
2922 decrRefCount(lenobj);
2923 while((de = dictNext(di)) != NULL) {
2924 robj *keyobj = dictGetEntryKey(de);
2925
2926 sds key = keyobj->ptr;
2927 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2928 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2929 if (expireIfNeeded(c->db,keyobj) == 0) {
2930 if (numkeys != 0)
2931 addReply(c,shared.space);
2932 addReply(c,keyobj);
2933 numkeys++;
2934 keyslen += sdslen(key);
2935 }
2936 }
2937 }
2938 dictReleaseIterator(di);
2939 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2940 addReply(c,shared.crlf);
2941 }
2942
2943 static void dbsizeCommand(redisClient *c) {
2944 addReplySds(c,
2945 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2946 }
2947
2948 static void lastsaveCommand(redisClient *c) {
2949 addReplySds(c,
2950 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2951 }
2952
2953 static void typeCommand(redisClient *c) {
2954 robj *o;
2955 char *type;
2956
2957 o = lookupKeyRead(c->db,c->argv[1]);
2958 if (o == NULL) {
2959 type = "+none";
2960 } else {
2961 switch(o->type) {
2962 case REDIS_STRING: type = "+string"; break;
2963 case REDIS_LIST: type = "+list"; break;
2964 case REDIS_SET: type = "+set"; break;
2965 default: type = "unknown"; break;
2966 }
2967 }
2968 addReplySds(c,sdsnew(type));
2969 addReply(c,shared.crlf);
2970 }
2971
2972 static void saveCommand(redisClient *c) {
2973 if (server.bgsaveinprogress) {
2974 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2975 return;
2976 }
2977 if (rdbSave(server.dbfilename) == REDIS_OK) {
2978 addReply(c,shared.ok);
2979 } else {
2980 addReply(c,shared.err);
2981 }
2982 }
2983
2984 static void bgsaveCommand(redisClient *c) {
2985 if (server.bgsaveinprogress) {
2986 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2987 return;
2988 }
2989 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2990 addReply(c,shared.ok);
2991 } else {
2992 addReply(c,shared.err);
2993 }
2994 }
2995
2996 static void shutdownCommand(redisClient *c) {
2997 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2998 /* Kill the saving child if there is a background saving in progress.
2999 We want to avoid race conditions, for instance our saving child may
3000 overwrite the synchronous saving did by SHUTDOWN. */
3001 if (server.bgsaveinprogress) {
3002 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3003 kill(server.bgsavechildpid,SIGKILL);
3004 rdbRemoveTempFile(server.bgsavechildpid);
3005 }
3006 /* SYNC SAVE */
3007 if (rdbSave(server.dbfilename) == REDIS_OK) {
3008 if (server.daemonize)
3009 unlink(server.pidfile);
3010 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3011 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3012 exit(1);
3013 } else {
3014 /* Ooops.. error saving! The best we can do is to continue operating.
3015 * Note that if there was a background saving process, in the next
3016 * cron() Redis will be notified that the background saving aborted,
3017 * handling special stuff like slaves pending for synchronization... */
3018 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3019 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3020 }
3021 }
3022
3023 static void renameGenericCommand(redisClient *c, int nx) {
3024 robj *o;
3025
3026 /* To use the same key as src and dst is probably an error */
3027 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3028 addReply(c,shared.sameobjecterr);
3029 return;
3030 }
3031
3032 o = lookupKeyWrite(c->db,c->argv[1]);
3033 if (o == NULL) {
3034 addReply(c,shared.nokeyerr);
3035 return;
3036 }
3037 incrRefCount(o);
3038 deleteIfVolatile(c->db,c->argv[2]);
3039 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3040 if (nx) {
3041 decrRefCount(o);
3042 addReply(c,shared.czero);
3043 return;
3044 }
3045 dictReplace(c->db->dict,c->argv[2],o);
3046 } else {
3047 incrRefCount(c->argv[2]);
3048 }
3049 deleteKey(c->db,c->argv[1]);
3050 server.dirty++;
3051 addReply(c,nx ? shared.cone : shared.ok);
3052 }
3053
3054 static void renameCommand(redisClient *c) {
3055 renameGenericCommand(c,0);
3056 }
3057
3058 static void renamenxCommand(redisClient *c) {
3059 renameGenericCommand(c,1);
3060 }
3061
3062 static void moveCommand(redisClient *c) {
3063 robj *o;
3064 redisDb *src, *dst;
3065 int srcid;
3066
3067 /* Obtain source and target DB pointers */
3068 src = c->db;
3069 srcid = c->db->id;
3070 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3071 addReply(c,shared.outofrangeerr);
3072 return;
3073 }
3074 dst = c->db;
3075 selectDb(c,srcid); /* Back to the source DB */
3076
3077 /* If the user is moving using as target the same
3078 * DB as the source DB it is probably an error. */
3079 if (src == dst) {
3080 addReply(c,shared.sameobjecterr);
3081 return;
3082 }
3083
3084 /* Check if the element exists and get a reference */
3085 o = lookupKeyWrite(c->db,c->argv[1]);
3086 if (!o) {
3087 addReply(c,shared.czero);
3088 return;
3089 }
3090
3091 /* Try to add the element to the target DB */
3092 deleteIfVolatile(dst,c->argv[1]);
3093 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3094 addReply(c,shared.czero);
3095 return;
3096 }
3097 incrRefCount(c->argv[1]);
3098 incrRefCount(o);
3099
3100 /* OK! key moved, free the entry in the source DB */
3101 deleteKey(src,c->argv[1]);
3102 server.dirty++;
3103 addReply(c,shared.cone);
3104 }
3105
3106 /* =================================== Lists ================================ */
3107 static void pushGenericCommand(redisClient *c, int where) {
3108 robj *lobj;
3109 list *list;
3110
3111 lobj = lookupKeyWrite(c->db,c->argv[1]);
3112 if (lobj == NULL) {
3113 lobj = createListObject();
3114 list = lobj->ptr;
3115 if (where == REDIS_HEAD) {
3116 listAddNodeHead(list,c->argv[2]);
3117 } else {
3118 listAddNodeTail(list,c->argv[2]);
3119 }
3120 dictAdd(c->db->dict,c->argv[1],lobj);
3121 incrRefCount(c->argv[1]);
3122 incrRefCount(c->argv[2]);
3123 } else {
3124 if (lobj->type != REDIS_LIST) {
3125 addReply(c,shared.wrongtypeerr);
3126 return;
3127 }
3128 list = lobj->ptr;
3129 if (where == REDIS_HEAD) {
3130 listAddNodeHead(list,c->argv[2]);
3131 } else {
3132 listAddNodeTail(list,c->argv[2]);
3133 }
3134 incrRefCount(c->argv[2]);
3135 }
3136 server.dirty++;
3137 addReply(c,shared.ok);
3138 }
3139
3140 static void lpushCommand(redisClient *c) {
3141 pushGenericCommand(c,REDIS_HEAD);
3142 }
3143
3144 static void rpushCommand(redisClient *c) {
3145 pushGenericCommand(c,REDIS_TAIL);
3146 }
3147
3148 static void llenCommand(redisClient *c) {
3149 robj *o;
3150 list *l;
3151
3152 o = lookupKeyRead(c->db,c->argv[1]);
3153 if (o == NULL) {
3154 addReply(c,shared.czero);
3155 return;
3156 } else {
3157 if (o->type != REDIS_LIST) {
3158 addReply(c,shared.wrongtypeerr);
3159 } else {
3160 l = o->ptr;
3161 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3162 }
3163 }
3164 }
3165
3166 static void lindexCommand(redisClient *c) {
3167 robj *o;
3168 int index = atoi(c->argv[2]->ptr);
3169
3170 o = lookupKeyRead(c->db,c->argv[1]);
3171 if (o == NULL) {
3172 addReply(c,shared.nullbulk);
3173 } else {
3174 if (o->type != REDIS_LIST) {
3175 addReply(c,shared.wrongtypeerr);
3176 } else {
3177 list *list = o->ptr;
3178 listNode *ln;
3179
3180 ln = listIndex(list, index);
3181 if (ln == NULL) {
3182 addReply(c,shared.nullbulk);
3183 } else {
3184 robj *ele = listNodeValue(ln);
3185 addReplyBulkLen(c,ele);
3186 addReply(c,ele);
3187 addReply(c,shared.crlf);
3188 }
3189 }
3190 }
3191 }
3192
3193 static void lsetCommand(redisClient *c) {
3194 robj *o;
3195 int index = atoi(c->argv[2]->ptr);
3196
3197 o = lookupKeyWrite(c->db,c->argv[1]);
3198 if (o == NULL) {
3199 addReply(c,shared.nokeyerr);
3200 } else {
3201 if (o->type != REDIS_LIST) {
3202 addReply(c,shared.wrongtypeerr);
3203 } else {
3204 list *list = o->ptr;
3205 listNode *ln;
3206
3207 ln = listIndex(list, index);
3208 if (ln == NULL) {
3209 addReply(c,shared.outofrangeerr);
3210 } else {
3211 robj *ele = listNodeValue(ln);
3212
3213 decrRefCount(ele);
3214 listNodeValue(ln) = c->argv[3];
3215 incrRefCount(c->argv[3]);
3216 addReply(c,shared.ok);
3217 server.dirty++;
3218 }
3219 }
3220 }
3221 }
3222
3223 static void popGenericCommand(redisClient *c, int where) {
3224 robj *o;
3225
3226 o = lookupKeyWrite(c->db,c->argv[1]);
3227 if (o == NULL) {
3228 addReply(c,shared.nullbulk);
3229 } else {
3230 if (o->type != REDIS_LIST) {
3231 addReply(c,shared.wrongtypeerr);
3232 } else {
3233 list *list = o->ptr;
3234 listNode *ln;
3235
3236 if (where == REDIS_HEAD)
3237 ln = listFirst(list);
3238 else
3239 ln = listLast(list);
3240
3241 if (ln == NULL) {
3242 addReply(c,shared.nullbulk);
3243 } else {
3244 robj *ele = listNodeValue(ln);
3245 addReplyBulkLen(c,ele);
3246 addReply(c,ele);
3247 addReply(c,shared.crlf);
3248 listDelNode(list,ln);
3249 server.dirty++;
3250 }
3251 }
3252 }
3253 }
3254
3255 static void lpopCommand(redisClient *c) {
3256 popGenericCommand(c,REDIS_HEAD);
3257 }
3258
3259 static void rpopCommand(redisClient *c) {
3260 popGenericCommand(c,REDIS_TAIL);
3261 }
3262
3263 static void lrangeCommand(redisClient *c) {
3264 robj *o;
3265 int start = atoi(c->argv[2]->ptr);
3266 int end = atoi(c->argv[3]->ptr);
3267
3268 o = lookupKeyRead(c->db,c->argv[1]);
3269 if (o == NULL) {
3270 addReply(c,shared.nullmultibulk);
3271 } else {
3272 if (o->type != REDIS_LIST) {
3273 addReply(c,shared.wrongtypeerr);
3274 } else {
3275 list *list = o->ptr;
3276 listNode *ln;
3277 int llen = listLength(list);
3278 int rangelen, j;
3279 robj *ele;
3280
3281 /* convert negative indexes */
3282 if (start < 0) start = llen+start;
3283 if (end < 0) end = llen+end;
3284 if (start < 0) start = 0;
3285 if (end < 0) end = 0;
3286
3287 /* indexes sanity checks */
3288 if (start > end || start >= llen) {
3289 /* Out of range start or start > end result in empty list */
3290 addReply(c,shared.emptymultibulk);
3291 return;
3292 }
3293 if (end >= llen) end = llen-1;
3294 rangelen = (end-start)+1;
3295
3296 /* Return the result in form of a multi-bulk reply */
3297 ln = listIndex(list, start);
3298 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3299 for (j = 0; j < rangelen; j++) {
3300 ele = listNodeValue(ln);
3301 addReplyBulkLen(c,ele);
3302 addReply(c,ele);
3303 addReply(c,shared.crlf);
3304 ln = ln->next;
3305 }
3306 }
3307 }
3308 }
3309
3310 static void ltrimCommand(redisClient *c) {
3311 robj *o;
3312 int start = atoi(c->argv[2]->ptr);
3313 int end = atoi(c->argv[3]->ptr);
3314
3315 o = lookupKeyWrite(c->db,c->argv[1]);
3316 if (o == NULL) {
3317 addReply(c,shared.nokeyerr);
3318 } else {
3319 if (o->type != REDIS_LIST) {
3320 addReply(c,shared.wrongtypeerr);
3321 } else {
3322 list *list = o->ptr;
3323 listNode *ln;
3324 int llen = listLength(list);
3325 int j, ltrim, rtrim;
3326
3327 /* convert negative indexes */
3328 if (start < 0) start = llen+start;
3329 if (end < 0) end = llen+end;
3330 if (start < 0) start = 0;
3331 if (end < 0) end = 0;
3332
3333 /* indexes sanity checks */
3334 if (start > end || start >= llen) {
3335 /* Out of range start or start > end result in empty list */
3336 ltrim = llen;
3337 rtrim = 0;
3338 } else {
3339 if (end >= llen) end = llen-1;
3340 ltrim = start;
3341 rtrim = llen-end-1;
3342 }
3343
3344 /* Remove list elements to perform the trim */
3345 for (j = 0; j < ltrim; j++) {
3346 ln = listFirst(list);
3347 listDelNode(list,ln);
3348 }
3349 for (j = 0; j < rtrim; j++) {
3350 ln = listLast(list);
3351 listDelNode(list,ln);
3352 }
3353 server.dirty++;
3354 addReply(c,shared.ok);
3355 }
3356 }
3357 }
3358
3359 static void lremCommand(redisClient *c) {
3360 robj *o;
3361
3362 o = lookupKeyWrite(c->db,c->argv[1]);
3363 if (o == NULL) {
3364 addReply(c,shared.czero);
3365 } else {
3366 if (o->type != REDIS_LIST) {
3367 addReply(c,shared.wrongtypeerr);
3368 } else {
3369 list *list = o->ptr;
3370 listNode *ln, *next;
3371 int toremove = atoi(c->argv[2]->ptr);
3372 int removed = 0;
3373 int fromtail = 0;
3374
3375 if (toremove < 0) {
3376 toremove = -toremove;
3377 fromtail = 1;
3378 }
3379 ln = fromtail ? list->tail : list->head;
3380 while (ln) {
3381 robj *ele = listNodeValue(ln);
3382
3383 next = fromtail ? ln->prev : ln->next;
3384 if (compareStringObjects(ele,c->argv[3]) == 0) {
3385 listDelNode(list,ln);
3386 server.dirty++;
3387 removed++;
3388 if (toremove && removed == toremove) break;
3389 }
3390 ln = next;
3391 }
3392 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3393 }
3394 }
3395 }
3396
3397 /* ==================================== Sets ================================ */
3398
3399 static void saddCommand(redisClient *c) {
3400 robj *set;
3401
3402 set = lookupKeyWrite(c->db,c->argv[1]);
3403 if (set == NULL) {
3404 set = createSetObject();
3405 dictAdd(c->db->dict,c->argv[1],set);
3406 incrRefCount(c->argv[1]);
3407 } else {
3408 if (set->type != REDIS_SET) {
3409 addReply(c,shared.wrongtypeerr);
3410 return;
3411 }
3412 }
3413 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3414 incrRefCount(c->argv[2]);
3415 server.dirty++;
3416 addReply(c,shared.cone);
3417 } else {
3418 addReply(c,shared.czero);
3419 }
3420 }
3421
3422 static void sremCommand(redisClient *c) {
3423 robj *set;
3424
3425 set = lookupKeyWrite(c->db,c->argv[1]);
3426 if (set == NULL) {
3427 addReply(c,shared.czero);
3428 } else {
3429 if (set->type != REDIS_SET) {
3430 addReply(c,shared.wrongtypeerr);
3431 return;
3432 }
3433 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3434 server.dirty++;
3435 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3436 addReply(c,shared.cone);
3437 } else {
3438 addReply(c,shared.czero);
3439 }
3440 }
3441 }
3442
3443 static void smoveCommand(redisClient *c) {
3444 robj *srcset, *dstset;
3445
3446 srcset = lookupKeyWrite(c->db,c->argv[1]);
3447 dstset = lookupKeyWrite(c->db,c->argv[2]);
3448
3449 /* If the source key does not exist return 0, if it's of the wrong type
3450 * raise an error */
3451 if (srcset == NULL || srcset->type != REDIS_SET) {
3452 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3453 return;
3454 }
3455 /* Error if the destination key is not a set as well */
3456 if (dstset && dstset->type != REDIS_SET) {
3457 addReply(c,shared.wrongtypeerr);
3458 return;
3459 }
3460 /* Remove the element from the source set */
3461 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3462 /* Key not found in the src set! return zero */
3463 addReply(c,shared.czero);
3464 return;
3465 }
3466 server.dirty++;
3467 /* Add the element to the destination set */
3468 if (!dstset) {
3469 dstset = createSetObject();
3470 dictAdd(c->db->dict,c->argv[2],dstset);
3471 incrRefCount(c->argv[2]);
3472 }
3473 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3474 incrRefCount(c->argv[3]);
3475 addReply(c,shared.cone);
3476 }
3477
3478 static void sismemberCommand(redisClient *c) {
3479 robj *set;
3480
3481 set = lookupKeyRead(c->db,c->argv[1]);
3482 if (set == NULL) {
3483 addReply(c,shared.czero);
3484 } else {
3485 if (set->type != REDIS_SET) {
3486 addReply(c,shared.wrongtypeerr);
3487 return;
3488 }
3489 if (dictFind(set->ptr,c->argv[2]))
3490 addReply(c,shared.cone);
3491 else
3492 addReply(c,shared.czero);
3493 }
3494 }
3495
3496 static void scardCommand(redisClient *c) {
3497 robj *o;
3498 dict *s;
3499
3500 o = lookupKeyRead(c->db,c->argv[1]);
3501 if (o == NULL) {
3502 addReply(c,shared.czero);
3503 return;
3504 } else {
3505 if (o->type != REDIS_SET) {
3506 addReply(c,shared.wrongtypeerr);
3507 } else {
3508 s = o->ptr;
3509 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3510 dictSize(s)));
3511 }
3512 }
3513 }
3514
3515 static void spopCommand(redisClient *c) {
3516 robj *set;
3517 dictEntry *de;
3518
3519 set = lookupKeyWrite(c->db,c->argv[1]);
3520 if (set == NULL) {
3521 addReply(c,shared.nullbulk);
3522 } else {
3523 if (set->type != REDIS_SET) {
3524 addReply(c,shared.wrongtypeerr);
3525 return;
3526 }
3527 de = dictGetRandomKey(set->ptr);
3528 if (de == NULL) {
3529 addReply(c,shared.nullbulk);
3530 } else {
3531 robj *ele = dictGetEntryKey(de);
3532
3533 addReplyBulkLen(c,ele);
3534 addReply(c,ele);
3535 addReply(c,shared.crlf);
3536 dictDelete(set->ptr,ele);
3537 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3538 server.dirty++;
3539 }
3540 }
3541 }
3542
3543 static void srandmemberCommand(redisClient *c) {
3544 robj *set;
3545 dictEntry *de;
3546
3547 set = lookupKeyRead(c->db,c->argv[1]);
3548 if (set == NULL) {
3549 addReply(c,shared.nullbulk);
3550 } else {
3551 if (set->type != REDIS_SET) {
3552 addReply(c,shared.wrongtypeerr);
3553 return;
3554 }
3555 de = dictGetRandomKey(set->ptr);
3556 if (de == NULL) {
3557 addReply(c,shared.nullbulk);
3558 } else {
3559 robj *ele = dictGetEntryKey(de);
3560
3561 addReplyBulkLen(c,ele);
3562 addReply(c,ele);
3563 addReply(c,shared.crlf);
3564 }
3565 }
3566 }
3567
3568 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3569 dict **d1 = (void*) s1, **d2 = (void*) s2;
3570
3571 return dictSize(*d1)-dictSize(*d2);
3572 }
3573
3574 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3575 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3576 dictIterator *di;
3577 dictEntry *de;
3578 robj *lenobj = NULL, *dstset = NULL;
3579 int j, cardinality = 0;
3580
3581 for (j = 0; j < setsnum; j++) {
3582 robj *setobj;
3583
3584 setobj = dstkey ?
3585 lookupKeyWrite(c->db,setskeys[j]) :
3586 lookupKeyRead(c->db,setskeys[j]);
3587 if (!setobj) {
3588 zfree(dv);
3589 if (dstkey) {
3590 deleteKey(c->db,dstkey);
3591 addReply(c,shared.ok);
3592 } else {
3593 addReply(c,shared.nullmultibulk);
3594 }
3595 return;
3596 }
3597 if (setobj->type != REDIS_SET) {
3598 zfree(dv);
3599 addReply(c,shared.wrongtypeerr);
3600 return;
3601 }
3602 dv[j] = setobj->ptr;
3603 }
3604 /* Sort sets from the smallest to largest, this will improve our
3605 * algorithm's performace */
3606 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3607
3608 /* The first thing we should output is the total number of elements...
3609 * since this is a multi-bulk write, but at this stage we don't know
3610 * the intersection set size, so we use a trick, append an empty object
3611 * to the output list and save the pointer to later modify it with the
3612 * right length */
3613 if (!dstkey) {
3614 lenobj = createObject(REDIS_STRING,NULL);
3615 addReply(c,lenobj);
3616 decrRefCount(lenobj);
3617 } else {
3618 /* If we have a target key where to store the resulting set
3619 * create this key with an empty set inside */
3620 dstset = createSetObject();
3621 }
3622
3623 /* Iterate all the elements of the first (smallest) set, and test
3624 * the element against all the other sets, if at least one set does
3625 * not include the element it is discarded */
3626 di = dictGetIterator(dv[0]);
3627
3628 while((de = dictNext(di)) != NULL) {
3629 robj *ele;
3630
3631 for (j = 1; j < setsnum; j++)
3632 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3633 if (j != setsnum)
3634 continue; /* at least one set does not contain the member */
3635 ele = dictGetEntryKey(de);
3636 if (!dstkey) {
3637 addReplyBulkLen(c,ele);
3638 addReply(c,ele);
3639 addReply(c,shared.crlf);
3640 cardinality++;
3641 } else {
3642 dictAdd(dstset->ptr,ele,NULL);
3643 incrRefCount(ele);
3644 }
3645 }
3646 dictReleaseIterator(di);
3647
3648 if (dstkey) {
3649 /* Store the resulting set into the target */
3650 deleteKey(c->db,dstkey);
3651 dictAdd(c->db->dict,dstkey,dstset);
3652 incrRefCount(dstkey);
3653 }
3654
3655 if (!dstkey) {
3656 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3657 } else {
3658 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3659 dictSize((dict*)dstset->ptr)));
3660 server.dirty++;
3661 }
3662 zfree(dv);
3663 }
3664
3665 static void sinterCommand(redisClient *c) {
3666 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3667 }
3668
3669 static void sinterstoreCommand(redisClient *c) {
3670 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3671 }
3672
3673 #define REDIS_OP_UNION 0
3674 #define REDIS_OP_DIFF 1
3675
3676 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3677 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3678 dictIterator *di;
3679 dictEntry *de;
3680 robj *dstset = NULL;
3681 int j, cardinality = 0;
3682
3683 for (j = 0; j < setsnum; j++) {
3684 robj *setobj;
3685
3686 setobj = dstkey ?
3687 lookupKeyWrite(c->db,setskeys[j]) :
3688 lookupKeyRead(c->db,setskeys[j]);
3689 if (!setobj) {
3690 dv[j] = NULL;
3691 continue;
3692 }
3693 if (setobj->type != REDIS_SET) {
3694 zfree(dv);
3695 addReply(c,shared.wrongtypeerr);
3696 return;
3697 }
3698 dv[j] = setobj->ptr;
3699 }
3700
3701 /* We need a temp set object to store our union. If the dstkey
3702 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3703 * this set object will be the resulting object to set into the target key*/
3704 dstset = createSetObject();
3705
3706 /* Iterate all the elements of all the sets, add every element a single
3707 * time to the result set */
3708 for (j = 0; j < setsnum; j++) {
3709 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3710 if (!dv[j]) continue; /* non existing keys are like empty sets */
3711
3712 di = dictGetIterator(dv[j]);
3713
3714 while((de = dictNext(di)) != NULL) {
3715 robj *ele;
3716
3717 /* dictAdd will not add the same element multiple times */
3718 ele = dictGetEntryKey(de);
3719 if (op == REDIS_OP_UNION || j == 0) {
3720 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3721 incrRefCount(ele);
3722 cardinality++;
3723 }
3724 } else if (op == REDIS_OP_DIFF) {
3725 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3726 cardinality--;
3727 }
3728 }
3729 }
3730 dictReleaseIterator(di);
3731
3732 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3733 }
3734
3735 /* Output the content of the resulting set, if not in STORE mode */
3736 if (!dstkey) {
3737 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3738 di = dictGetIterator(dstset->ptr);
3739 while((de = dictNext(di)) != NULL) {
3740 robj *ele;
3741
3742 ele = dictGetEntryKey(de);
3743 addReplyBulkLen(c,ele);
3744 addReply(c,ele);
3745 addReply(c,shared.crlf);
3746 }
3747 dictReleaseIterator(di);
3748 } else {
3749 /* If we have a target key where to store the resulting set
3750 * create this key with the result set inside */
3751 deleteKey(c->db,dstkey);
3752 dictAdd(c->db->dict,dstkey,dstset);
3753 incrRefCount(dstkey);
3754 }
3755
3756 /* Cleanup */
3757 if (!dstkey) {
3758 decrRefCount(dstset);
3759 } else {
3760 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3761 dictSize((dict*)dstset->ptr)));
3762 server.dirty++;
3763 }
3764 zfree(dv);
3765 }
3766
3767 static void sunionCommand(redisClient *c) {
3768 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3769 }
3770
3771 static void sunionstoreCommand(redisClient *c) {
3772 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3773 }
3774
3775 static void sdiffCommand(redisClient *c) {
3776 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3777 }
3778
3779 static void sdiffstoreCommand(redisClient *c) {
3780 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3781 }
3782
3783 /* ==================================== ZSets =============================== */
3784
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
3792
3793 /* This skiplist implementation is almost a C translation of the original
3794 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3795 * Alternative to Balanced Trees", modified in three ways:
3796 * a) this implementation allows for repeated values.
3797 * b) the comparison is not just by key (our 'score') but by satellite data.
3798 * c) there is a back pointer, so it's a doubly linked list with the back
3799 * pointers being only at "level 1". This allows to traverse the list
3800 * from tail to head, useful for ZREVRANGE. */
3801
3802 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3803 zskiplistNode *zn = zmalloc(sizeof(*zn));
3804
3805 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3806 zn->score = score;
3807 zn->obj = obj;
3808 return zn;
3809 }
3810
3811 static zskiplist *zslCreate(void) {
3812 int j;
3813 zskiplist *zsl;
3814
3815 zsl = zmalloc(sizeof(*zsl));
3816 zsl->level = 1;
3817 zsl->length = 0;
3818 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3819 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3820 zsl->header->forward[j] = NULL;
3821 zsl->header->backward = NULL;
3822 zsl->tail = NULL;
3823 return zsl;
3824 }
3825
3826 static void zslFreeNode(zskiplistNode *node) {
3827 decrRefCount(node->obj);
3828 zfree(node->forward);
3829 zfree(node);
3830 }
3831
3832 static void zslFree(zskiplist *zsl) {
3833 zskiplistNode *node = zsl->header->forward[0], *next;
3834
3835 zfree(zsl->header->forward);
3836 zfree(zsl->header);
3837 while(node) {
3838 next = node->forward[0];
3839 zslFreeNode(node);
3840 node = next;
3841 }
3842 zfree(zsl);
3843 }
3844
3845 static int zslRandomLevel(void) {
3846 int level = 1;
3847 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3848 level += 1;
3849 return level;
3850 }
3851
3852 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3853 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3854 int i, level;
3855
3856 x = zsl->header;
3857 for (i = zsl->level-1; i >= 0; i--) {
3858 while (x->forward[i] && x->forward[i]->score < score)
3859 x = x->forward[i];
3860 update[i] = x;
3861 }
3862 /* we assume the key is not already inside, since we allow duplicated
3863 * scores, and the re-insertion of score and redis object should never
3864 * happpen since the caller of zslInsert() should test in the hash table
3865 * if the element is already inside or not. */
3866 level = zslRandomLevel();
3867 if (level > zsl->level) {
3868 for (i = zsl->level; i < level; i++)
3869 update[i] = zsl->header;
3870 zsl->level = level;
3871 }
3872 x = zslCreateNode(level,score,obj);
3873 for (i = 0; i < level; i++) {
3874 x->forward[i] = update[i]->forward[i];
3875 update[i]->forward[i] = x;
3876 }
3877 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3878 if (x->forward[0])
3879 x->forward[0]->backward = x;
3880 else
3881 zsl->tail = x;
3882 zsl->length++;
3883 }
3884
3885 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3886 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3887 int i;
3888
3889 x = zsl->header;
3890 for (i = zsl->level-1; i >= 0; i--) {
3891 while (x->forward[i] && x->forward[i]->score < score)
3892 x = x->forward[i];
3893 update[i] = x;
3894 }
3895 /* We may have multiple elements with the same score, what we need
3896 * is to find the element with both the right score and object. */
3897 x = x->forward[0];
3898 while(x->score == score) {
3899 if (compareStringObjects(x->obj,obj) == 0) {
3900 for (i = 0; i < zsl->level; i++) {
3901 if (update[i]->forward[i] != x) break;
3902 update[i]->forward[i] = x->forward[i];
3903 }
3904 if (x->forward[0]) {
3905 x->forward[0]->backward = (x->backward == zsl->header) ?
3906 NULL : x->backward;
3907 } else {
3908 zsl->tail = x->backward;
3909 }
3910 zslFreeNode(x);
3911 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3912 zsl->level--;
3913 zsl->length--;
3914 return 1;
3915 } else {
3916 x = x->forward[0];
3917 if (!x) return 0; /* end of the list reached, not found */
3918 }
3919 }
3920 return 0; /* not found */
3921 }
3922
3923 /* The actual Z-commands implementations */
3924
3925 static void zaddCommand(redisClient *c) {
3926 robj *zsetobj;
3927 zset *zs;
3928 double *score;
3929
3930 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3931 if (zsetobj == NULL) {
3932 zsetobj = createZsetObject();
3933 dictAdd(c->db->dict,c->argv[1],zsetobj);
3934 incrRefCount(c->argv[1]);
3935 } else {
3936 if (zsetobj->type != REDIS_ZSET) {
3937 addReply(c,shared.wrongtypeerr);
3938 return;
3939 }
3940 }
3941 score = zmalloc(sizeof(double));
3942 *score = strtod(c->argv[2]->ptr,NULL);
3943 zs = zsetobj->ptr;
3944 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3945 /* case 1: New element */
3946 incrRefCount(c->argv[3]); /* added to hash */
3947 zslInsert(zs->zsl,*score,c->argv[3]);
3948 incrRefCount(c->argv[3]); /* added to skiplist */
3949 server.dirty++;
3950 addReply(c,shared.cone);
3951 } else {
3952 dictEntry *de;
3953 double *oldscore;
3954
3955 /* case 2: Score update operation */
3956 de = dictFind(zs->dict,c->argv[3]);
3957 assert(de != NULL);
3958 oldscore = dictGetEntryVal(de);
3959 if (*score != *oldscore) {
3960 int deleted;
3961
3962 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
3963 assert(deleted != 0);
3964 zslInsert(zs->zsl,*score,c->argv[3]);
3965 incrRefCount(c->argv[3]);
3966 dictReplace(zs->dict,c->argv[3],score);
3967 server.dirty++;
3968 } else {
3969 zfree(score);
3970 }
3971 addReply(c,shared.czero);
3972 }
3973 }
3974
3975 static void zremCommand(redisClient *c) {
3976 robj *zsetobj;
3977 zset *zs;
3978
3979 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3980 if (zsetobj == NULL) {
3981 addReply(c,shared.czero);
3982 } else {
3983 dictEntry *de;
3984 double *oldscore;
3985 int deleted;
3986
3987 if (zsetobj->type != REDIS_ZSET) {
3988 addReply(c,shared.wrongtypeerr);
3989 return;
3990 }
3991 zs = zsetobj->ptr;
3992 de = dictFind(zs->dict,c->argv[2]);
3993 if (de == NULL) {
3994 addReply(c,shared.czero);
3995 return;
3996 }
3997 /* Delete from the skiplist */
3998 oldscore = dictGetEntryVal(de);
3999 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4000 assert(deleted != 0);
4001
4002 /* Delete from the hash table */
4003 dictDelete(zs->dict,c->argv[2]);
4004 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4005 server.dirty++;
4006 addReply(c,shared.cone);
4007 }
4008 }
4009
4010 static void zrangeGenericCommand(redisClient *c, int reverse) {
4011 robj *o;
4012 int start = atoi(c->argv[2]->ptr);
4013 int end = atoi(c->argv[3]->ptr);
4014
4015 o = lookupKeyRead(c->db,c->argv[1]);
4016 if (o == NULL) {
4017 addReply(c,shared.nullmultibulk);
4018 } else {
4019 if (o->type != REDIS_ZSET) {
4020 addReply(c,shared.wrongtypeerr);
4021 } else {
4022 zset *zsetobj = o->ptr;
4023 zskiplist *zsl = zsetobj->zsl;
4024 zskiplistNode *ln;
4025
4026 int llen = zsl->length;
4027 int rangelen, j;
4028 robj *ele;
4029
4030 /* convert negative indexes */
4031 if (start < 0) start = llen+start;
4032 if (end < 0) end = llen+end;
4033 if (start < 0) start = 0;
4034 if (end < 0) end = 0;
4035
4036 /* indexes sanity checks */
4037 if (start > end || start >= llen) {
4038 /* Out of range start or start > end result in empty list */
4039 addReply(c,shared.emptymultibulk);
4040 return;
4041 }
4042 if (end >= llen) end = llen-1;
4043 rangelen = (end-start)+1;
4044
4045 /* Return the result in form of a multi-bulk reply */
4046 if (reverse) {
4047 ln = zsl->tail;
4048 while (start--)
4049 ln = ln->backward;
4050 } else {
4051 ln = zsl->header->forward[0];
4052 while (start--)
4053 ln = ln->forward[0];
4054 }
4055
4056 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4057 for (j = 0; j < rangelen; j++) {
4058 ele = ln->obj;
4059 addReplyBulkLen(c,ele);
4060 addReply(c,ele);
4061 addReply(c,shared.crlf);
4062 ln = reverse ? ln->backward : ln->forward[0];
4063 }
4064 }
4065 }
4066 }
4067
4068 static void zrangeCommand(redisClient *c) {
4069 zrangeGenericCommand(c,0);
4070 }
4071
4072 static void zrevrangeCommand(redisClient *c) {
4073 zrangeGenericCommand(c,1);
4074 }
4075
4076 static void zlenCommand(redisClient *c) {
4077 robj *o;
4078 zset *zs;
4079
4080 o = lookupKeyRead(c->db,c->argv[1]);
4081 if (o == NULL) {
4082 addReply(c,shared.czero);
4083 return;
4084 } else {
4085 if (o->type != REDIS_ZSET) {
4086 addReply(c,shared.wrongtypeerr);
4087 } else {
4088 zs = o->ptr;
4089 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4090 }
4091 }
4092 }
4093
4094 /* ========================= Non type-specific commands ==================== */
4095
4096 static void flushdbCommand(redisClient *c) {
4097 server.dirty += dictSize(c->db->dict);
4098 dictEmpty(c->db->dict);
4099 dictEmpty(c->db->expires);
4100 addReply(c,shared.ok);
4101 }
4102
4103 static void flushallCommand(redisClient *c) {
4104 server.dirty += emptyDb();
4105 addReply(c,shared.ok);
4106 rdbSave(server.dbfilename);
4107 server.dirty++;
4108 }
4109
4110 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4111 redisSortOperation *so = zmalloc(sizeof(*so));
4112 so->type = type;
4113 so->pattern = pattern;
4114 return so;
4115 }
4116
4117 /* Return the value associated to the key with a name obtained
4118 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4119 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4120 char *p;
4121 sds spat, ssub;
4122 robj keyobj;
4123 int prefixlen, sublen, postfixlen;
4124 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4125 struct {
4126 long len;
4127 long free;
4128 char buf[REDIS_SORTKEY_MAX+1];
4129 } keyname;
4130
4131 if (subst->encoding == REDIS_ENCODING_RAW)
4132 incrRefCount(subst);
4133 else {
4134 subst = getDecodedObject(subst);
4135 }
4136
4137 spat = pattern->ptr;
4138 ssub = subst->ptr;
4139 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4140 p = strchr(spat,'*');
4141 if (!p) return NULL;
4142
4143 prefixlen = p-spat;
4144 sublen = sdslen(ssub);
4145 postfixlen = sdslen(spat)-(prefixlen+1);
4146 memcpy(keyname.buf,spat,prefixlen);
4147 memcpy(keyname.buf+prefixlen,ssub,sublen);
4148 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4149 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4150 keyname.len = prefixlen+sublen+postfixlen;
4151
4152 keyobj.refcount = 1;
4153 keyobj.type = REDIS_STRING;
4154 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4155
4156 decrRefCount(subst);
4157
4158 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4159 return lookupKeyRead(db,&keyobj);
4160 }
4161
4162 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4163 * the additional parameter is not standard but a BSD-specific we have to
4164 * pass sorting parameters via the global 'server' structure */
4165 static int sortCompare(const void *s1, const void *s2) {
4166 const redisSortObject *so1 = s1, *so2 = s2;
4167 int cmp;
4168
4169 if (!server.sort_alpha) {
4170 /* Numeric sorting. Here it's trivial as we precomputed scores */
4171 if (so1->u.score > so2->u.score) {
4172 cmp = 1;
4173 } else if (so1->u.score < so2->u.score) {
4174 cmp = -1;
4175 } else {
4176 cmp = 0;
4177 }
4178 } else {
4179 /* Alphanumeric sorting */
4180 if (server.sort_bypattern) {
4181 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4182 /* At least one compare object is NULL */
4183 if (so1->u.cmpobj == so2->u.cmpobj)
4184 cmp = 0;
4185 else if (so1->u.cmpobj == NULL)
4186 cmp = -1;
4187 else
4188 cmp = 1;
4189 } else {
4190 /* We have both the objects, use strcoll */
4191 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4192 }
4193 } else {
4194 /* Compare elements directly */
4195 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4196 so2->obj->encoding == REDIS_ENCODING_RAW) {
4197 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4198 } else {
4199 robj *dec1, *dec2;
4200
4201 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4202 so1->obj : getDecodedObject(so1->obj);
4203 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4204 so2->obj : getDecodedObject(so2->obj);
4205 cmp = strcoll(dec1->ptr,dec2->ptr);
4206 if (dec1 != so1->obj) decrRefCount(dec1);
4207 if (dec2 != so2->obj) decrRefCount(dec2);
4208 }
4209 }
4210 }
4211 return server.sort_desc ? -cmp : cmp;
4212 }
4213
4214 /* The SORT command is the most complex command in Redis. Warning: this code
4215 * is optimized for speed and a bit less for readability */
4216 static void sortCommand(redisClient *c) {
4217 list *operations;
4218 int outputlen = 0;
4219 int desc = 0, alpha = 0;
4220 int limit_start = 0, limit_count = -1, start, end;
4221 int j, dontsort = 0, vectorlen;
4222 int getop = 0; /* GET operation counter */
4223 robj *sortval, *sortby = NULL;
4224 redisSortObject *vector; /* Resulting vector to sort */
4225
4226 /* Lookup the key to sort. It must be of the right types */
4227 sortval = lookupKeyRead(c->db,c->argv[1]);
4228 if (sortval == NULL) {
4229 addReply(c,shared.nokeyerr);
4230 return;
4231 }
4232 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4233 addReply(c,shared.wrongtypeerr);
4234 return;
4235 }
4236
4237 /* Create a list of operations to perform for every sorted element.
4238 * Operations can be GET/DEL/INCR/DECR */
4239 operations = listCreate();
4240 listSetFreeMethod(operations,zfree);
4241 j = 2;
4242
4243 /* Now we need to protect sortval incrementing its count, in the future
4244 * SORT may have options able to overwrite/delete keys during the sorting
4245 * and the sorted key itself may get destroied */
4246 incrRefCount(sortval);
4247
4248 /* The SORT command has an SQL-alike syntax, parse it */
4249 while(j < c->argc) {
4250 int leftargs = c->argc-j-1;
4251 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4252 desc = 0;
4253 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4254 desc = 1;
4255 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4256 alpha = 1;
4257 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4258 limit_start = atoi(c->argv[j+1]->ptr);
4259 limit_count = atoi(c->argv[j+2]->ptr);
4260 j+=2;
4261 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4262 sortby = c->argv[j+1];
4263 /* If the BY pattern does not contain '*', i.e. it is constant,
4264 * we don't need to sort nor to lookup the weight keys. */
4265 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4266 j++;
4267 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4268 listAddNodeTail(operations,createSortOperation(
4269 REDIS_SORT_GET,c->argv[j+1]));
4270 getop++;
4271 j++;
4272 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4273 listAddNodeTail(operations,createSortOperation(
4274 REDIS_SORT_DEL,c->argv[j+1]));
4275 j++;
4276 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4277 listAddNodeTail(operations,createSortOperation(
4278 REDIS_SORT_INCR,c->argv[j+1]));
4279 j++;
4280 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4281 listAddNodeTail(operations,createSortOperation(
4282 REDIS_SORT_DECR,c->argv[j+1]));
4283 j++;
4284 } else {
4285 decrRefCount(sortval);
4286 listRelease(operations);
4287 addReply(c,shared.syntaxerr);
4288 return;
4289 }
4290 j++;
4291 }
4292
4293 /* Load the sorting vector with all the objects to sort */
4294 vectorlen = (sortval->type == REDIS_LIST) ?
4295 listLength((list*)sortval->ptr) :
4296 dictSize((dict*)sortval->ptr);
4297 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4298 j = 0;
4299 if (sortval->type == REDIS_LIST) {
4300 list *list = sortval->ptr;
4301 listNode *ln;
4302
4303 listRewind(list);
4304 while((ln = listYield(list))) {
4305 robj *ele = ln->value;
4306 vector[j].obj = ele;
4307 vector[j].u.score = 0;
4308 vector[j].u.cmpobj = NULL;
4309 j++;
4310 }
4311 } else {
4312 dict *set = sortval->ptr;
4313 dictIterator *di;
4314 dictEntry *setele;
4315
4316 di = dictGetIterator(set);
4317 while((setele = dictNext(di)) != NULL) {
4318 vector[j].obj = dictGetEntryKey(setele);
4319 vector[j].u.score = 0;
4320 vector[j].u.cmpobj = NULL;
4321 j++;
4322 }
4323 dictReleaseIterator(di);
4324 }
4325 assert(j == vectorlen);
4326
4327 /* Now it's time to load the right scores in the sorting vector */
4328 if (dontsort == 0) {
4329 for (j = 0; j < vectorlen; j++) {
4330 if (sortby) {
4331 robj *byval;
4332
4333 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4334 if (!byval || byval->type != REDIS_STRING) continue;
4335 if (alpha) {
4336 if (byval->encoding == REDIS_ENCODING_RAW) {
4337 vector[j].u.cmpobj = byval;
4338 incrRefCount(byval);
4339 } else {
4340 vector[j].u.cmpobj = getDecodedObject(byval);
4341 }
4342 } else {
4343 if (byval->encoding == REDIS_ENCODING_RAW) {
4344 vector[j].u.score = strtod(byval->ptr,NULL);
4345 } else {
4346 if (byval->encoding == REDIS_ENCODING_INT) {
4347 vector[j].u.score = (long)byval->ptr;
4348 } else
4349 assert(1 != 1);
4350 }
4351 }
4352 } else {
4353 if (!alpha) {
4354 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4355 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4356 else {
4357 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4358 vector[j].u.score = (long) vector[j].obj->ptr;
4359 else
4360 assert(1 != 1);
4361 }
4362 }
4363 }
4364 }
4365 }
4366
4367 /* We are ready to sort the vector... perform a bit of sanity check
4368 * on the LIMIT option too. We'll use a partial version of quicksort. */
4369 start = (limit_start < 0) ? 0 : limit_start;
4370 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4371 if (start >= vectorlen) {
4372 start = vectorlen-1;
4373 end = vectorlen-2;
4374 }
4375 if (end >= vectorlen) end = vectorlen-1;
4376
4377 if (dontsort == 0) {
4378 server.sort_desc = desc;
4379 server.sort_alpha = alpha;
4380 server.sort_bypattern = sortby ? 1 : 0;
4381 if (sortby && (start != 0 || end != vectorlen-1))
4382 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4383 else
4384 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4385 }
4386
4387 /* Send command output to the output buffer, performing the specified
4388 * GET/DEL/INCR/DECR operations if any. */
4389 outputlen = getop ? getop*(end-start+1) : end-start+1;
4390 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4391 for (j = start; j <= end; j++) {
4392 listNode *ln;
4393 if (!getop) {
4394 addReplyBulkLen(c,vector[j].obj);
4395 addReply(c,vector[j].obj);
4396 addReply(c,shared.crlf);
4397 }
4398 listRewind(operations);
4399 while((ln = listYield(operations))) {
4400 redisSortOperation *sop = ln->value;
4401 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4402 vector[j].obj);
4403
4404 if (sop->type == REDIS_SORT_GET) {
4405 if (!val || val->type != REDIS_STRING) {
4406 addReply(c,shared.nullbulk);
4407 } else {
4408 addReplyBulkLen(c,val);
4409 addReply(c,val);
4410 addReply(c,shared.crlf);
4411 }
4412 } else if (sop->type == REDIS_SORT_DEL) {
4413 /* TODO */
4414 }
4415 }
4416 }
4417
4418 /* Cleanup */
4419 decrRefCount(sortval);
4420 listRelease(operations);
4421 for (j = 0; j < vectorlen; j++) {
4422 if (sortby && alpha && vector[j].u.cmpobj)
4423 decrRefCount(vector[j].u.cmpobj);
4424 }
4425 zfree(vector);
4426 }
4427
4428 static void infoCommand(redisClient *c) {
4429 sds info;
4430 time_t uptime = time(NULL)-server.stat_starttime;
4431 int j;
4432
4433 info = sdscatprintf(sdsempty(),
4434 "redis_version:%s\r\n"
4435 "arch_bits:%s\r\n"
4436 "uptime_in_seconds:%d\r\n"
4437 "uptime_in_days:%d\r\n"
4438 "connected_clients:%d\r\n"
4439 "connected_slaves:%d\r\n"
4440 "used_memory:%zu\r\n"
4441 "changes_since_last_save:%lld\r\n"
4442 "bgsave_in_progress:%d\r\n"
4443 "last_save_time:%d\r\n"
4444 "total_connections_received:%lld\r\n"
4445 "total_commands_processed:%lld\r\n"
4446 "role:%s\r\n"
4447 ,REDIS_VERSION,
4448 (sizeof(long) == 8) ? "64" : "32",
4449 uptime,
4450 uptime/(3600*24),
4451 listLength(server.clients)-listLength(server.slaves),
4452 listLength(server.slaves),
4453 server.usedmemory,
4454 server.dirty,
4455 server.bgsaveinprogress,
4456 server.lastsave,
4457 server.stat_numconnections,
4458 server.stat_numcommands,
4459 server.masterhost == NULL ? "master" : "slave"
4460 );
4461 if (server.masterhost) {
4462 info = sdscatprintf(info,
4463 "master_host:%s\r\n"
4464 "master_port:%d\r\n"
4465 "master_link_status:%s\r\n"
4466 "master_last_io_seconds_ago:%d\r\n"
4467 ,server.masterhost,
4468 server.masterport,
4469 (server.replstate == REDIS_REPL_CONNECTED) ?
4470 "up" : "down",
4471 (int)(time(NULL)-server.master->lastinteraction)
4472 );
4473 }
4474 for (j = 0; j < server.dbnum; j++) {
4475 long long keys, vkeys;
4476
4477 keys = dictSize(server.db[j].dict);
4478 vkeys = dictSize(server.db[j].expires);
4479 if (keys || vkeys) {
4480 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4481 j, keys, vkeys);
4482 }
4483 }
4484 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4485 addReplySds(c,info);
4486 addReply(c,shared.crlf);
4487 }
4488
4489 static void monitorCommand(redisClient *c) {
4490 /* ignore MONITOR if aleady slave or in monitor mode */
4491 if (c->flags & REDIS_SLAVE) return;
4492
4493 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4494 c->slaveseldb = 0;
4495 listAddNodeTail(server.monitors,c);
4496 addReply(c,shared.ok);
4497 }
4498
4499 /* ================================= Expire ================================= */
4500 static int removeExpire(redisDb *db, robj *key) {
4501 if (dictDelete(db->expires,key) == DICT_OK) {
4502 return 1;
4503 } else {
4504 return 0;
4505 }
4506 }
4507
4508 static int setExpire(redisDb *db, robj *key, time_t when) {
4509 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4510 return 0;
4511 } else {
4512 incrRefCount(key);
4513 return 1;
4514 }
4515 }
4516
4517 /* Return the expire time of the specified key, or -1 if no expire
4518 * is associated with this key (i.e. the key is non volatile) */
4519 static time_t getExpire(redisDb *db, robj *key) {
4520 dictEntry *de;
4521
4522 /* No expire? return ASAP */
4523 if (dictSize(db->expires) == 0 ||
4524 (de = dictFind(db->expires,key)) == NULL) return -1;
4525
4526 return (time_t) dictGetEntryVal(de);
4527 }
4528
4529 static int expireIfNeeded(redisDb *db, robj *key) {
4530 time_t when;
4531 dictEntry *de;
4532
4533 /* No expire? return ASAP */
4534 if (dictSize(db->expires) == 0 ||
4535 (de = dictFind(db->expires,key)) == NULL) return 0;
4536
4537 /* Lookup the expire */
4538 when = (time_t) dictGetEntryVal(de);
4539 if (time(NULL) <= when) return 0;
4540
4541 /* Delete the key */
4542 dictDelete(db->expires,key);
4543 return dictDelete(db->dict,key) == DICT_OK;
4544 }
4545
4546 static int deleteIfVolatile(redisDb *db, robj *key) {
4547 dictEntry *de;
4548
4549 /* No expire? return ASAP */
4550 if (dictSize(db->expires) == 0 ||
4551 (de = dictFind(db->expires,key)) == NULL) return 0;
4552
4553 /* Delete the key */
4554 server.dirty++;
4555 dictDelete(db->expires,key);
4556 return dictDelete(db->dict,key) == DICT_OK;
4557 }
4558
4559 static void expireCommand(redisClient *c) {
4560 dictEntry *de;
4561 int seconds = atoi(c->argv[2]->ptr);
4562
4563 de = dictFind(c->db->dict,c->argv[1]);
4564 if (de == NULL) {
4565 addReply(c,shared.czero);
4566 return;
4567 }
4568 if (seconds <= 0) {
4569 addReply(c, shared.czero);
4570 return;
4571 } else {
4572 time_t when = time(NULL)+seconds;
4573 if (setExpire(c->db,c->argv[1],when)) {
4574 addReply(c,shared.cone);
4575 server.dirty++;
4576 } else {
4577 addReply(c,shared.czero);
4578 }
4579 return;
4580 }
4581 }
4582
4583 static void ttlCommand(redisClient *c) {
4584 time_t expire;
4585 int ttl = -1;
4586
4587 expire = getExpire(c->db,c->argv[1]);
4588 if (expire != -1) {
4589 ttl = (int) (expire-time(NULL));
4590 if (ttl < 0) ttl = -1;
4591 }
4592 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4593 }
4594
4595 static void msetGenericCommand(redisClient *c, int nx) {
4596 int j;
4597
4598 if ((c->argc % 2) == 0) {
4599 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4600 return;
4601 }
4602 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4603 * set nothing at all if at least one already key exists. */
4604 if (nx) {
4605 for (j = 1; j < c->argc; j += 2) {
4606 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4607 addReply(c, shared.czero);
4608 return;
4609 }
4610 }
4611 }
4612
4613 for (j = 1; j < c->argc; j += 2) {
4614 int retval;
4615
4616 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4617 if (retval == DICT_ERR) {
4618 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4619 incrRefCount(c->argv[j+1]);
4620 } else {
4621 incrRefCount(c->argv[j]);
4622 incrRefCount(c->argv[j+1]);
4623 }
4624 removeExpire(c->db,c->argv[j]);
4625 }
4626 server.dirty += (c->argc-1)/2;
4627 addReply(c, nx ? shared.cone : shared.ok);
4628 }
4629
4630 static void msetCommand(redisClient *c) {
4631 msetGenericCommand(c,0);
4632 }
4633
4634 static void msetnxCommand(redisClient *c) {
4635 msetGenericCommand(c,1);
4636 }
4637
4638 /* =============================== Replication ============================= */
4639
4640 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4641 ssize_t nwritten, ret = size;
4642 time_t start = time(NULL);
4643
4644 timeout++;
4645 while(size) {
4646 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4647 nwritten = write(fd,ptr,size);
4648 if (nwritten == -1) return -1;
4649 ptr += nwritten;
4650 size -= nwritten;
4651 }
4652 if ((time(NULL)-start) > timeout) {
4653 errno = ETIMEDOUT;
4654 return -1;
4655 }
4656 }
4657 return ret;
4658 }
4659
4660 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4661 ssize_t nread, totread = 0;
4662 time_t start = time(NULL);
4663
4664 timeout++;
4665 while(size) {
4666 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4667 nread = read(fd,ptr,size);
4668 if (nread == -1) return -1;
4669 ptr += nread;
4670 size -= nread;
4671 totread += nread;
4672 }
4673 if ((time(NULL)-start) > timeout) {
4674 errno = ETIMEDOUT;
4675 return -1;
4676 }
4677 }
4678 return totread;
4679 }
4680
/* Read a line from 'fd' one byte at a time (each byte through syncRead()
 * with the given 'timeout') and store it, nul terminated, in the 'size'
 * bytes buffer pointed to by 'ptr'.
 *
 * The trailing "\n" is not stored, and a "\r" right before it is stripped.
 * Reading also stops, without an error, once size-1 bytes were stored: in
 * that case the line is truncated.
 *
 * Returns the number of bytes stored (counting a stripped "\r"), or -1 on
 * error: syncRead() failure, or a non positive 'size' (errno = EINVAL). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        /* No room even for the terminator. A zero size also used to wrap
         * to -1 below and never stop the loop. */
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';

    size--; /* reserve one byte for the nul terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One byte of room consumed. This decrement was missing, so a line
         * longer than the buffer was written past its end. */
        size--;
    }
    return nread;
}
4701
4702 static void syncCommand(redisClient *c) {
4703 /* ignore SYNC if aleady slave or in monitor mode */
4704 if (c->flags & REDIS_SLAVE) return;
4705
4706 /* SYNC can't be issued when the server has pending data to send to
4707 * the client about already issued commands. We need a fresh reply
4708 * buffer registering the differences between the BGSAVE and the current
4709 * dataset, so that we can copy to other slaves if needed. */
4710 if (listLength(c->reply) != 0) {
4711 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4712 return;
4713 }
4714
4715 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4716 /* Here we need to check if there is a background saving operation
4717 * in progress, or if it is required to start one */
4718 if (server.bgsaveinprogress) {
4719 /* Ok a background save is in progress. Let's check if it is a good
4720 * one for replication, i.e. if there is another slave that is
4721 * registering differences since the server forked to save */
4722 redisClient *slave;
4723 listNode *ln;
4724
4725 listRewind(server.slaves);
4726 while((ln = listYield(server.slaves))) {
4727 slave = ln->value;
4728 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4729 }
4730 if (ln) {
4731 /* Perfect, the server is already registering differences for
4732 * another slave. Set the right state, and copy the buffer. */
4733 listRelease(c->reply);
4734 c->reply = listDup(slave->reply);
4735 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4736 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4737 } else {
4738 /* No way, we need to wait for the next BGSAVE in order to
4739 * register differences */
4740 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4741 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4742 }
4743 } else {
4744 /* Ok we don't have a BGSAVE in progress, let's start one */
4745 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4746 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4747 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4748 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4749 return;
4750 }
4751 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4752 }
4753 c->repldbfd = -1;
4754 c->flags |= REDIS_SLAVE;
4755 c->slaveseldb = 0;
4756 listAddNodeTail(server.slaves,c);
4757 return;
4758 }
4759
4760 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4761 redisClient *slave = privdata;
4762 REDIS_NOTUSED(el);
4763 REDIS_NOTUSED(mask);
4764 char buf[REDIS_IOBUF_LEN];
4765 ssize_t nwritten, buflen;
4766
4767 if (slave->repldboff == 0) {
4768 /* Write the bulk write count before to transfer the DB. In theory here
4769 * we don't know how much room there is in the output buffer of the
4770 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4771 * operations) will never be smaller than the few bytes we need. */
4772 sds bulkcount;
4773
4774 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4775 slave->repldbsize);
4776 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4777 {
4778 sdsfree(bulkcount);
4779 freeClient(slave);
4780 return;
4781 }
4782 sdsfree(bulkcount);
4783 }
4784 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4785 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4786 if (buflen <= 0) {
4787 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4788 (buflen == 0) ? "premature EOF" : strerror(errno));
4789 freeClient(slave);
4790 return;
4791 }
4792 if ((nwritten = write(fd,buf,buflen)) == -1) {
4793 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4794 strerror(errno));
4795 freeClient(slave);
4796 return;
4797 }
4798 slave->repldboff += nwritten;
4799 if (slave->repldboff == slave->repldbsize) {
4800 close(slave->repldbfd);
4801 slave->repldbfd = -1;
4802 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4803 slave->replstate = REDIS_REPL_ONLINE;
4804 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4805 sendReplyToClient, slave, NULL) == AE_ERR) {
4806 freeClient(slave);
4807 return;
4808 }
4809 addReplySds(slave,sdsempty());
4810 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4811 }
4812 }
4813
4814 /* This function is called at the end of every backgrond saving.
4815 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4816 * otherwise REDIS_ERR is passed to the function.
4817 *
4818 * The goal of this function is to handle slaves waiting for a successful
4819 * background saving in order to perform non-blocking synchronization. */
4820 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4821 listNode *ln;
4822 int startbgsave = 0;
4823
4824 listRewind(server.slaves);
4825 while((ln = listYield(server.slaves))) {
4826 redisClient *slave = ln->value;
4827
4828 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4829 startbgsave = 1;
4830 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4831 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4832 struct redis_stat buf;
4833
4834 if (bgsaveerr != REDIS_OK) {
4835 freeClient(slave);
4836 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4837 continue;
4838 }
4839 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4840 redis_fstat(slave->repldbfd,&buf) == -1) {
4841 freeClient(slave);
4842 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4843 continue;
4844 }
4845 slave->repldboff = 0;
4846 slave->repldbsize = buf.st_size;
4847 slave->replstate = REDIS_REPL_SEND_BULK;
4848 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4849 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4850 freeClient(slave);
4851 continue;
4852 }
4853 }
4854 }
4855 if (startbgsave) {
4856 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4857 listRewind(server.slaves);
4858 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4859 while((ln = listYield(server.slaves))) {
4860 redisClient *slave = ln->value;
4861
4862 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4863 freeClient(slave);
4864 }
4865 }
4866 }
4867 }
4868
4869 static int syncWithMaster(void) {
4870 char buf[1024], tmpfile[256];
4871 int dumpsize;
4872 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4873 int dfd;
4874
4875 if (fd == -1) {
4876 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4877 strerror(errno));
4878 return REDIS_ERR;
4879 }
4880 /* Issue the SYNC command */
4881 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4882 close(fd);
4883 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4884 strerror(errno));
4885 return REDIS_ERR;
4886 }
4887 /* Read the bulk write count */
4888 if (syncReadLine(fd,buf,1024,3600) == -1) {
4889 close(fd);
4890 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4891 strerror(errno));
4892 return REDIS_ERR;
4893 }
4894 dumpsize = atoi(buf+1);
4895 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4896 /* Read the bulk write data on a temp file */
4897 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4898 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4899 if (dfd == -1) {
4900 close(fd);
4901 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4902 return REDIS_ERR;
4903 }
4904 while(dumpsize) {
4905 int nread, nwritten;
4906
4907 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4908 if (nread == -1) {
4909 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4910 strerror(errno));
4911 close(fd);
4912 close(dfd);
4913 return REDIS_ERR;
4914 }
4915 nwritten = write(dfd,buf,nread);
4916 if (nwritten == -1) {
4917 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4918 close(fd);
4919 close(dfd);
4920 return REDIS_ERR;
4921 }
4922 dumpsize -= nread;
4923 }
4924 close(dfd);
4925 if (rename(tmpfile,server.dbfilename) == -1) {
4926 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4927 unlink(tmpfile);
4928 close(fd);
4929 return REDIS_ERR;
4930 }
4931 emptyDb();
4932 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4933 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4934 close(fd);
4935 return REDIS_ERR;
4936 }
4937 server.master = createClient(fd);
4938 server.master->flags |= REDIS_MASTER;
4939 server.replstate = REDIS_REPL_CONNECTED;
4940 return REDIS_OK;
4941 }
4942
4943 static void slaveofCommand(redisClient *c) {
4944 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4945 !strcasecmp(c->argv[2]->ptr,"one")) {
4946 if (server.masterhost) {
4947 sdsfree(server.masterhost);
4948 server.masterhost = NULL;
4949 if (server.master) freeClient(server.master);
4950 server.replstate = REDIS_REPL_NONE;
4951 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4952 }
4953 } else {
4954 sdsfree(server.masterhost);
4955 server.masterhost = sdsdup(c->argv[1]->ptr);
4956 server.masterport = atoi(c->argv[2]->ptr);
4957 if (server.master) freeClient(server.master);
4958 server.replstate = REDIS_REPL_CONNECT;
4959 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4960 server.masterhost, server.masterport);
4961 }
4962 addReply(c,shared.ok);
4963 }
4964
4965 /* ============================ Maxmemory directive ======================== */
4966
4967 /* This function gets called when 'maxmemory' is set on the config file to limit
4968 * the max memory used by the server, and we are out of memory.
4969 * This function will try to, in order:
4970 *
4971 * - Free objects from the free list
4972 * - Try to remove keys with an EXPIRE set
4973 *
4974 * It is not possible to free enough memory to reach used-memory < maxmemory
4975 * the server will start refusing commands that will enlarge even more the
4976 * memory usage.
4977 */
4978 static void freeMemoryIfNeeded(void) {
4979 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4980 if (listLength(server.objfreelist)) {
4981 robj *o;
4982
4983 listNode *head = listFirst(server.objfreelist);
4984 o = listNodeValue(head);
4985 listDelNode(server.objfreelist,head);
4986 zfree(o);
4987 } else {
4988 int j, k, freed = 0;
4989
4990 for (j = 0; j < server.dbnum; j++) {
4991 int minttl = -1;
4992 robj *minkey = NULL;
4993 struct dictEntry *de;
4994
4995 if (dictSize(server.db[j].expires)) {
4996 freed = 1;
4997 /* From a sample of three keys drop the one nearest to
4998 * the natural expire */
4999 for (k = 0; k < 3; k++) {
5000 time_t t;
5001
5002 de = dictGetRandomKey(server.db[j].expires);
5003 t = (time_t) dictGetEntryVal(de);
5004 if (minttl == -1 || t < minttl) {
5005 minkey = dictGetEntryKey(de);
5006 minttl = t;
5007 }
5008 }
5009 deleteKey(server.db+j,minkey);
5010 }
5011 }
5012 if (!freed) return; /* nothing to free... */
5013 }
5014 }
5015 }
5016
5017 /* ================================= Debugging ============================== */
5018
5019 static void debugCommand(redisClient *c) {
5020 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5021 *((char*)-1) = 'x';
5022 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5023 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5024 robj *key, *val;
5025
5026 if (!de) {
5027 addReply(c,shared.nokeyerr);
5028 return;
5029 }
5030 key = dictGetEntryKey(de);
5031 val = dictGetEntryVal(de);
5032 addReplySds(c,sdscatprintf(sdsempty(),
5033 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5034 key, key->refcount, val, val->refcount, val->encoding));
5035 } else {
5036 addReplySds(c,sdsnew(
5037 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5038 }
5039 }
5040
5041 #ifdef HAVE_BACKTRACE
5042 static struct redisFunctionSym symsTable[] = {
5043 {"compareStringObjects", (unsigned long)compareStringObjects},
5044 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5045 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5046 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5047 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5048 {"freeStringObject", (unsigned long)freeStringObject},
5049 {"freeListObject", (unsigned long)freeListObject},
5050 {"freeSetObject", (unsigned long)freeSetObject},
5051 {"decrRefCount", (unsigned long)decrRefCount},
5052 {"createObject", (unsigned long)createObject},
5053 {"freeClient", (unsigned long)freeClient},
5054 {"rdbLoad", (unsigned long)rdbLoad},
5055 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5056 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5057 {"addReply", (unsigned long)addReply},
5058 {"addReplySds", (unsigned long)addReplySds},
5059 {"incrRefCount", (unsigned long)incrRefCount},
5060 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5061 {"createStringObject", (unsigned long)createStringObject},
5062 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5063 {"syncWithMaster", (unsigned long)syncWithMaster},
5064 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5065 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5066 {"getDecodedObject", (unsigned long)getDecodedObject},
5067 {"removeExpire", (unsigned long)removeExpire},
5068 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5069 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5070 {"deleteKey", (unsigned long)deleteKey},
5071 {"getExpire", (unsigned long)getExpire},
5072 {"setExpire", (unsigned long)setExpire},
5073 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5074 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5075 {"authCommand", (unsigned long)authCommand},
5076 {"pingCommand", (unsigned long)pingCommand},
5077 {"echoCommand", (unsigned long)echoCommand},
5078 {"setCommand", (unsigned long)setCommand},
5079 {"setnxCommand", (unsigned long)setnxCommand},
5080 {"getCommand", (unsigned long)getCommand},
5081 {"delCommand", (unsigned long)delCommand},
5082 {"existsCommand", (unsigned long)existsCommand},
5083 {"incrCommand", (unsigned long)incrCommand},
5084 {"decrCommand", (unsigned long)decrCommand},
5085 {"incrbyCommand", (unsigned long)incrbyCommand},
5086 {"decrbyCommand", (unsigned long)decrbyCommand},
5087 {"selectCommand", (unsigned long)selectCommand},
5088 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5089 {"keysCommand", (unsigned long)keysCommand},
5090 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5091 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5092 {"saveCommand", (unsigned long)saveCommand},
5093 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5094 {"shutdownCommand", (unsigned long)shutdownCommand},
5095 {"moveCommand", (unsigned long)moveCommand},
5096 {"renameCommand", (unsigned long)renameCommand},
5097 {"renamenxCommand", (unsigned long)renamenxCommand},
5098 {"lpushCommand", (unsigned long)lpushCommand},
5099 {"rpushCommand", (unsigned long)rpushCommand},
5100 {"lpopCommand", (unsigned long)lpopCommand},
5101 {"rpopCommand", (unsigned long)rpopCommand},
5102 {"llenCommand", (unsigned long)llenCommand},
5103 {"lindexCommand", (unsigned long)lindexCommand},
5104 {"lrangeCommand", (unsigned long)lrangeCommand},
5105 {"ltrimCommand", (unsigned long)ltrimCommand},
5106 {"typeCommand", (unsigned long)typeCommand},
5107 {"lsetCommand", (unsigned long)lsetCommand},
5108 {"saddCommand", (unsigned long)saddCommand},
5109 {"sremCommand", (unsigned long)sremCommand},
5110 {"smoveCommand", (unsigned long)smoveCommand},
5111 {"sismemberCommand", (unsigned long)sismemberCommand},
5112 {"scardCommand", (unsigned long)scardCommand},
5113 {"spopCommand", (unsigned long)spopCommand},
5114 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5115 {"sinterCommand", (unsigned long)sinterCommand},
5116 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5117 {"sunionCommand", (unsigned long)sunionCommand},
5118 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5119 {"sdiffCommand", (unsigned long)sdiffCommand},
5120 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5121 {"syncCommand", (unsigned long)syncCommand},
5122 {"flushdbCommand", (unsigned long)flushdbCommand},
5123 {"flushallCommand", (unsigned long)flushallCommand},
5124 {"sortCommand", (unsigned long)sortCommand},
5125 {"lremCommand", (unsigned long)lremCommand},
5126 {"infoCommand", (unsigned long)infoCommand},
5127 {"mgetCommand", (unsigned long)mgetCommand},
5128 {"monitorCommand", (unsigned long)monitorCommand},
5129 {"expireCommand", (unsigned long)expireCommand},
5130 {"getsetCommand", (unsigned long)getsetCommand},
5131 {"ttlCommand", (unsigned long)ttlCommand},
5132 {"slaveofCommand", (unsigned long)slaveofCommand},
5133 {"debugCommand", (unsigned long)debugCommand},
5134 {"processCommand", (unsigned long)processCommand},
5135 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5136 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5137 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5138 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5139 {"msetCommand", (unsigned long)msetCommand},
5140 {"msetnxCommand", (unsigned long)msetnxCommand},
5141 {"zslCreateNode", (unsigned long)zslCreateNode},
5142 {"zslCreate", (unsigned long)zslCreate},
5143 {"zslFreeNode",(unsigned long)zslFreeNode},
5144 {"zslFree",(unsigned long)zslFree},
5145 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5146 {"zslInsert",(unsigned long)zslInsert},
5147 {"zslDelete",(unsigned long)zslDelete},
5148 {"createZsetObject",(unsigned long)createZsetObject},
5149 {"zaddCommand",(unsigned long)zaddCommand},
5150 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5151 {"zrangeCommand",(unsigned long)zrangeCommand},
5152 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5153 {"zremCommand",(unsigned long)zremCommand},
5154 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5155 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5156 {NULL,0}
5157 };
5158
5159 /* This function try to convert a pointer into a function name. It's used in
5160 * oreder to provide a backtrace under segmentation fault that's able to
5161 * display functions declared as static (otherwise the backtrace is useless). */
5162 static char *findFuncName(void *pointer, unsigned long *offset){
5163 int i, ret = -1;
5164 unsigned long off, minoff = 0;
5165
5166 /* Try to match against the Symbol with the smallest offset */
5167 for (i=0; symsTable[i].pointer; i++) {
5168 unsigned long lp = (unsigned long) pointer;
5169
5170 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5171 off=lp-symsTable[i].pointer;
5172 if (ret < 0 || off < minoff) {
5173 minoff=off;
5174 ret=i;
5175 }
5176 }
5177 }
5178 if (ret == -1) return NULL;
5179 *offset = minoff;
5180 return symsTable[ret].name;
5181 }
5182
/* Return the instruction pointer saved in the signal context 'uc', i.e. the
 * address that was executing when the signal was delivered.
 *
 * The field holding that register differs per OS / libc / CPU, so every
 * supported combination has its own branch. NULL is returned when none of
 * the branches applies.
 *
 * The Linux x86-64 branch tests __x86_64__ (the macro compilers actually
 * define; __X86_64__ is kept only for backward compatibility) and reads
 * REG_RIP, since REG_EIP is an i386-only register index. The branch is
 * additionally guarded by REG_RIP being visible so that builds whose
 * feature-test macros hide the REG_* names keep compiling and simply fall
 * back to NULL as before. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif (defined(__x86_64__) || defined(__X86_64__)) && defined(REG_RIP) /* Linux x86-64 */
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
5204
5205 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5206 void *trace[100];
5207 char **messages = NULL;
5208 int i, trace_size = 0;
5209 unsigned long offset=0;
5210 time_t uptime = time(NULL)-server.stat_starttime;
5211 ucontext_t *uc = (ucontext_t*) secret;
5212 REDIS_NOTUSED(info);
5213
5214 redisLog(REDIS_WARNING,
5215 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5216 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5217 "redis_version:%s; "
5218 "uptime_in_seconds:%d; "
5219 "connected_clients:%d; "
5220 "connected_slaves:%d; "
5221 "used_memory:%zu; "
5222 "changes_since_last_save:%lld; "
5223 "bgsave_in_progress:%d; "
5224 "last_save_time:%d; "
5225 "total_connections_received:%lld; "
5226 "total_commands_processed:%lld; "
5227 "role:%s;"
5228 ,REDIS_VERSION,
5229 uptime,
5230 listLength(server.clients)-listLength(server.slaves),
5231 listLength(server.slaves),
5232 server.usedmemory,
5233 server.dirty,
5234 server.bgsaveinprogress,
5235 server.lastsave,
5236 server.stat_numconnections,
5237 server.stat_numcommands,
5238 server.masterhost == NULL ? "master" : "slave"
5239 ));
5240
5241 trace_size = backtrace(trace, 100);
5242 /* overwrite sigaction with caller's address */
5243 if (getMcontextEip(uc) != NULL) {
5244 trace[1] = getMcontextEip(uc);
5245 }
5246 messages = backtrace_symbols(trace, trace_size);
5247
5248 for (i=1; i<trace_size; ++i) {
5249 char *fn = findFuncName(trace[i], &offset), *p;
5250
5251 p = strchr(messages[i],'+');
5252 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5253 redisLog(REDIS_WARNING,"%s", messages[i]);
5254 } else {
5255 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5256 }
5257 }
5258 free(messages);
5259 exit(0);
5260 }
5261
5262 static void setupSigSegvAction(void) {
5263 struct sigaction act;
5264
5265 sigemptyset (&act.sa_mask);
5266 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5267 * is used. Otherwise, sa_handler is used */
5268 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5269 act.sa_sigaction = segvHandler;
5270 sigaction (SIGSEGV, &act, NULL);
5271 sigaction (SIGBUS, &act, NULL);
5272 sigaction (SIGFPE, &act, NULL);
5273 sigaction (SIGILL, &act, NULL);
5274 sigaction (SIGBUS, &act, NULL);
5275 return;
5276 }
5277 #else /* HAVE_BACKTRACE */
/* Variant compiled when HAVE_BACKTRACE is not defined: no crash handler is
 * installed, the function exists only so callers need no conditionals. */
static void setupSigSegvAction(void) {
    return;
}
5280 #endif /* HAVE_BACKTRACE */
5281
5282 /* =================================== Main! ================================ */
5283
5284 #ifdef __linux__
/* Read the first line of /proc/sys/vm/overcommit_memory and return it
 * converted with atoi(). Returns -1 when the file cannot be opened or no
 * line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return value;
    if (fgets(line,sizeof(line),procfile) != NULL) {
        value = atoi(line);
    }
    fclose(procfile);
    return value;
}
5298
5299 void linuxOvercommitMemoryWarning(void) {
5300 if (linuxOvercommitMemoryValue() == 0) {
5301 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5302 }
5303 }
5304 #endif /* __linux__ */
5305
5306 static void daemonize(void) {
5307 int fd;
5308 FILE *fp;
5309
5310 if (fork() != 0) exit(0); /* parent exits */
5311 setsid(); /* create a new session */
5312
5313 /* Every output goes to /dev/null. If Redis is daemonized but
5314 * the 'logfile' is set to 'stdout' in the configuration file
5315 * it will not log at all. */
5316 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5317 dup2(fd, STDIN_FILENO);
5318 dup2(fd, STDOUT_FILENO);
5319 dup2(fd, STDERR_FILENO);
5320 if (fd > STDERR_FILENO) close(fd);
5321 }
5322 /* Try to write the pid file */
5323 fp = fopen(server.pidfile,"w");
5324 if (fp) {
5325 fprintf(fp,"%d\n",getpid());
5326 fclose(fp);
5327 }
5328 }
5329
5330 int main(int argc, char **argv) {
5331 initServerConfig();
5332 if (argc == 2) {
5333 ResetServerSaveParams();
5334 loadServerConfig(argv[1]);
5335 } else if (argc > 2) {
5336 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5337 exit(1);
5338 } else {
5339 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5340 }
5341 initServer();
5342 if (server.daemonize) daemonize();
5343 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5344 #ifdef __linux__
5345 linuxOvercommitMemoryWarning();
5346 #endif
5347 if (rdbLoad(server.dbfilename) == REDIS_OK)
5348 redisLog(REDIS_NOTICE,"DB loaded from disk");
5349 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5350 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5351 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5352 aeMain(server.el);
5353 aeDeleteEventLoop(server.el);
5354 return 0;
5355 }