]> git.saurik.com Git - redis.git/blob - redis.c
e27a364b5c6fb6e51e3e3512bc4aa8334044b01e
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #include "redis.h"
63 #include "ae.h" /* Event driven programming library */
64 #include "sds.h" /* Dynamic safe strings */
65 #include "anet.h" /* Networking the easy way */
66 #include "dict.h" /* Hash tables */
67 #include "adlist.h" /* Linked lists */
68 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
69 #include "lzf.h" /* LZF compression library */
70 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
71
72 /* Error codes */
73 #define REDIS_OK 0
74 #define REDIS_ERR -1
75
76 /* Static server configuration */
77 #define REDIS_SERVERPORT 6379 /* TCP port */
78 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
79 #define REDIS_IOBUF_LEN 1024
80 #define REDIS_LOADBUF_LEN 1024
81 #define REDIS_STATIC_ARGS 4
82 #define REDIS_DEFAULT_DBNUM 16
83 #define REDIS_CONFIGLINE_MAX 1024
84 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
85 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
86 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
87 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
88 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
89
90 /* Hash table parameters */
91 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
92
93 /* Command flags */
94 #define REDIS_CMD_BULK 1 /* Bulk write command */
95 #define REDIS_CMD_INLINE 2 /* Inline command */
96 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
97 this flags will return an error when the 'maxmemory' option is set in the
98 config file and the server is using more than maxmemory bytes of memory.
99 In short this commands are denied on low memory conditions. */
100 #define REDIS_CMD_DENYOOM 4
101
102 /* Object types */
103 #define REDIS_STRING 0
104 #define REDIS_LIST 1
105 #define REDIS_SET 2
106 #define REDIS_ZSET 3
107 #define REDIS_HASH 4
108
109 /* Objects encoding */
110 #define REDIS_ENCODING_RAW 0 /* Raw representation */
111 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
112
113 /* Object types only used for dumping to disk */
114 #define REDIS_EXPIRETIME 253
115 #define REDIS_SELECTDB 254
116 #define REDIS_EOF 255
117
118 /* Defines related to the dump file format. To store 32 bits lengths for short
119 * keys requires a lot of space, so we check the most significant 2 bits of
120 * the first byte to interpreter the length:
121 *
122 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
123 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
124 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
125 * 11|000000 this means: specially encoded object will follow. The six bits
126 * number specify the kind of object that follows.
127 * See the REDIS_RDB_ENC_* defines.
128 *
129 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
130 * values, will fit inside. */
131 #define REDIS_RDB_6BITLEN 0
132 #define REDIS_RDB_14BITLEN 1
133 #define REDIS_RDB_32BITLEN 2
134 #define REDIS_RDB_ENCVAL 3
135 #define REDIS_RDB_LENERR UINT_MAX
136
137 /* When a length of a string object stored on disk has the first two bits
138 * set, the remaining two bits specify a special encoding for the object
139 * accordingly to the following defines: */
140 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
141 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
142 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
143 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
144
145 /* Client flags */
146 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
147 #define REDIS_SLAVE 2 /* This client is a slave server */
148 #define REDIS_MASTER 4 /* This client is a master server */
149 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
150
151 /* Slave replication state - slave side */
152 #define REDIS_REPL_NONE 0 /* No active replication */
153 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
154 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
155
156 /* Slave replication state - from the point of view of master
157 * Note that in SEND_BULK and ONLINE state the slave receives new updates
158 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
159 * to start the next background saving in order to send updates to it. */
160 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
161 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
162 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
163 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
164
165 /* List related stuff */
166 #define REDIS_HEAD 0
167 #define REDIS_TAIL 1
168
169 /* Sort operations */
170 #define REDIS_SORT_GET 0
171 #define REDIS_SORT_DEL 1
172 #define REDIS_SORT_INCR 2
173 #define REDIS_SORT_DECR 3
174 #define REDIS_SORT_ASC 4
175 #define REDIS_SORT_DESC 5
176 #define REDIS_SORTKEY_MAX 1024
177
178 /* Log levels */
179 #define REDIS_DEBUG 0
180 #define REDIS_NOTICE 1
181 #define REDIS_WARNING 2
182
183 /* Anti-warning macro... */
184 #define REDIS_NOTUSED(V) ((void) V)
185
186 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
187 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
188
189 /*================================= Data types ============================== */
190
191 /* A redis object, that is a type able to hold a string / list / set */
192 typedef struct redisObject {
193 void *ptr;
194 unsigned char type;
195 unsigned char encoding;
196 unsigned char notused[2];
197 int refcount;
198 } robj;
199
200 typedef struct redisDb {
201 dict *dict;
202 dict *expires;
203 int id;
204 } redisDb;
205
206 /* With multiplexing we need to take per-clinet state.
207 * Clients are taken in a liked list. */
208 typedef struct redisClient {
209 int fd;
210 redisDb *db;
211 int dictid;
212 sds querybuf;
213 robj **argv, **mbargv;
214 int argc, mbargc;
215 int bulklen; /* bulk read len. -1 if not in bulk read mode */
216 int multibulk; /* multi bulk command format active */
217 list *reply;
218 int sentlen;
219 time_t lastinteraction; /* time of the last interaction, used for timeout */
220 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
221 int slaveseldb; /* slave selected db, if this client is a slave */
222 int authenticated; /* when requirepass is non-NULL */
223 int replstate; /* replication state if this is a slave */
224 int repldbfd; /* replication DB file descriptor */
225 long repldboff; /* replication DB file offset */
226 off_t repldbsize; /* replication DB file size */
227 } redisClient;
228
229 struct saveparam {
230 time_t seconds;
231 int changes;
232 };
233
234 /* Global server state structure */
235 struct redisServer {
236 int port;
237 int fd;
238 redisDb *db;
239 dict *sharingpool;
240 unsigned int sharingpoolsize;
241 long long dirty; /* changes to DB from the last save */
242 list *clients;
243 list *slaves, *monitors;
244 char neterr[ANET_ERR_LEN];
245 aeEventLoop *el;
246 int cronloops; /* number of times the cron function run */
247 list *objfreelist; /* A list of freed objects to avoid malloc() */
248 time_t lastsave; /* Unix time of last save succeeede */
249 size_t usedmemory; /* Used memory in megabytes */
250 /* Fields used only for stats */
251 time_t stat_starttime; /* server start time */
252 long long stat_numcommands; /* number of processed commands */
253 long long stat_numconnections; /* number of connections received */
254 /* Configuration */
255 int verbosity;
256 int glueoutputbuf;
257 int maxidletime;
258 int dbnum;
259 int daemonize;
260 char *pidfile;
261 int bgsaveinprogress;
262 pid_t bgsavechildpid;
263 struct saveparam *saveparams;
264 int saveparamslen;
265 char *logfile;
266 char *bindaddr;
267 char *dbfilename;
268 char *requirepass;
269 int shareobjects;
270 /* Replication related */
271 int isslave;
272 char *masterhost;
273 int masterport;
274 redisClient *master; /* client that is master for this slave */
275 int replstate;
276 unsigned int maxclients;
277 unsigned long maxmemory;
278 /* Sort parameters - qsort_r() is only available under BSD so we
279 * have to take this state global, in order to pass it to sortCompare() */
280 int sort_desc;
281 int sort_alpha;
282 int sort_bypattern;
283 };
284
285 typedef void redisCommandProc(redisClient *c);
286 struct redisCommand {
287 char *name;
288 redisCommandProc *proc;
289 int arity;
290 int flags;
291 };
292
293 struct redisFunctionSym {
294 char *name;
295 unsigned long pointer;
296 };
297
298 typedef struct _redisSortObject {
299 robj *obj;
300 union {
301 double score;
302 robj *cmpobj;
303 } u;
304 } redisSortObject;
305
306 typedef struct _redisSortOperation {
307 int type;
308 robj *pattern;
309 } redisSortOperation;
310
311 /* ZSETs use a specialized version of Skiplists */
312
313 typedef struct zskiplistNode {
314 struct zskiplistNode **forward;
315 struct zskiplistNode *backward;
316 double score;
317 robj *obj;
318 } zskiplistNode;
319
320 typedef struct zskiplist {
321 struct zskiplistNode *header, *tail;
322 long length;
323 int level;
324 } zskiplist;
325
326 typedef struct zset {
327 dict *dict;
328 zskiplist *zsl;
329 } zset;
330
331 /* Our shared "common" objects */
332
333 struct sharedObjectsStruct {
334 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
335 *colon, *nullbulk, *nullmultibulk,
336 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
337 *outofrangeerr, *plus,
338 *select0, *select1, *select2, *select3, *select4,
339 *select5, *select6, *select7, *select8, *select9;
340 } shared;
341
342 /* Global vars that are actally used as constants. The following double
343 * values are used for double on-disk serialization, and are initialized
344 * at runtime to avoid strange compiler optimizations. */
345
346 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
347
348 /*================================ Prototypes =============================== */
349
350 static void freeStringObject(robj *o);
351 static void freeListObject(robj *o);
352 static void freeSetObject(robj *o);
353 static void decrRefCount(void *o);
354 static robj *createObject(int type, void *ptr);
355 static void freeClient(redisClient *c);
356 static int rdbLoad(char *filename);
357 static void addReply(redisClient *c, robj *obj);
358 static void addReplySds(redisClient *c, sds s);
359 static void incrRefCount(robj *o);
360 static int rdbSaveBackground(char *filename);
361 static robj *createStringObject(char *ptr, size_t len);
362 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
363 static int syncWithMaster(void);
364 static robj *tryObjectSharing(robj *o);
365 static int tryObjectEncoding(robj *o);
366 static robj *getDecodedObject(const robj *o);
367 static int removeExpire(redisDb *db, robj *key);
368 static int expireIfNeeded(redisDb *db, robj *key);
369 static int deleteIfVolatile(redisDb *db, robj *key);
370 static int deleteKey(redisDb *db, robj *key);
371 static time_t getExpire(redisDb *db, robj *key);
372 static int setExpire(redisDb *db, robj *key, time_t when);
373 static void updateSlavesWaitingBgsave(int bgsaveerr);
374 static void freeMemoryIfNeeded(void);
375 static int processCommand(redisClient *c);
376 static void setupSigSegvAction(void);
377 static void rdbRemoveTempFile(pid_t childpid);
378 static size_t stringObjectLen(robj *o);
379 static void processInputBuffer(redisClient *c);
380 static zskiplist *zslCreate(void);
381 static void zslFree(zskiplist *zsl);
382
383 static void authCommand(redisClient *c);
384 static void pingCommand(redisClient *c);
385 static void echoCommand(redisClient *c);
386 static void setCommand(redisClient *c);
387 static void setnxCommand(redisClient *c);
388 static void getCommand(redisClient *c);
389 static void delCommand(redisClient *c);
390 static void existsCommand(redisClient *c);
391 static void incrCommand(redisClient *c);
392 static void decrCommand(redisClient *c);
393 static void incrbyCommand(redisClient *c);
394 static void decrbyCommand(redisClient *c);
395 static void selectCommand(redisClient *c);
396 static void randomkeyCommand(redisClient *c);
397 static void keysCommand(redisClient *c);
398 static void dbsizeCommand(redisClient *c);
399 static void lastsaveCommand(redisClient *c);
400 static void saveCommand(redisClient *c);
401 static void bgsaveCommand(redisClient *c);
402 static void shutdownCommand(redisClient *c);
403 static void moveCommand(redisClient *c);
404 static void renameCommand(redisClient *c);
405 static void renamenxCommand(redisClient *c);
406 static void lpushCommand(redisClient *c);
407 static void rpushCommand(redisClient *c);
408 static void lpopCommand(redisClient *c);
409 static void rpopCommand(redisClient *c);
410 static void llenCommand(redisClient *c);
411 static void lindexCommand(redisClient *c);
412 static void lrangeCommand(redisClient *c);
413 static void ltrimCommand(redisClient *c);
414 static void typeCommand(redisClient *c);
415 static void lsetCommand(redisClient *c);
416 static void saddCommand(redisClient *c);
417 static void sremCommand(redisClient *c);
418 static void smoveCommand(redisClient *c);
419 static void sismemberCommand(redisClient *c);
420 static void scardCommand(redisClient *c);
421 static void spopCommand(redisClient *c);
422 static void srandmemberCommand(redisClient *c);
423 static void sinterCommand(redisClient *c);
424 static void sinterstoreCommand(redisClient *c);
425 static void sunionCommand(redisClient *c);
426 static void sunionstoreCommand(redisClient *c);
427 static void sdiffCommand(redisClient *c);
428 static void sdiffstoreCommand(redisClient *c);
429 static void syncCommand(redisClient *c);
430 static void flushdbCommand(redisClient *c);
431 static void flushallCommand(redisClient *c);
432 static void sortCommand(redisClient *c);
433 static void lremCommand(redisClient *c);
434 static void infoCommand(redisClient *c);
435 static void mgetCommand(redisClient *c);
436 static void monitorCommand(redisClient *c);
437 static void expireCommand(redisClient *c);
438 static void getsetCommand(redisClient *c);
439 static void ttlCommand(redisClient *c);
440 static void slaveofCommand(redisClient *c);
441 static void debugCommand(redisClient *c);
442 static void msetCommand(redisClient *c);
443 static void msetnxCommand(redisClient *c);
444 static void zaddCommand(redisClient *c);
445 static void zrangeCommand(redisClient *c);
446 static void zrevrangeCommand(redisClient *c);
447 static void zlenCommand(redisClient *c);
448 static void zremCommand(redisClient *c);
449
450 /*================================= Globals ================================= */
451
452 /* Global vars */
453 static struct redisServer server; /* server global state */
454 static struct redisCommand cmdTable[] = {
455 {"get",getCommand,2,REDIS_CMD_INLINE},
456 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
457 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
458 {"del",delCommand,-2,REDIS_CMD_INLINE},
459 {"exists",existsCommand,2,REDIS_CMD_INLINE},
460 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
461 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
462 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
463 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
464 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
465 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
466 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
467 {"llen",llenCommand,2,REDIS_CMD_INLINE},
468 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
469 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
470 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
471 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
472 {"lrem",lremCommand,4,REDIS_CMD_BULK},
473 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
474 {"srem",sremCommand,3,REDIS_CMD_BULK},
475 {"smove",smoveCommand,4,REDIS_CMD_BULK},
476 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
477 {"scard",scardCommand,2,REDIS_CMD_INLINE},
478 {"spop",spopCommand,2,REDIS_CMD_INLINE},
479 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
480 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
481 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
482 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
483 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
484 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
485 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
486 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
487 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
488 {"zrem",zremCommand,3,REDIS_CMD_BULK},
489 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
490 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
491 {"zlen",zlenCommand,2,REDIS_CMD_INLINE},
492 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
493 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
494 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
495 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
496 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
497 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
498 {"select",selectCommand,2,REDIS_CMD_INLINE},
499 {"move",moveCommand,3,REDIS_CMD_INLINE},
500 {"rename",renameCommand,3,REDIS_CMD_INLINE},
501 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
502 {"expire",expireCommand,3,REDIS_CMD_INLINE},
503 {"keys",keysCommand,2,REDIS_CMD_INLINE},
504 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
505 {"auth",authCommand,2,REDIS_CMD_INLINE},
506 {"ping",pingCommand,1,REDIS_CMD_INLINE},
507 {"echo",echoCommand,2,REDIS_CMD_BULK},
508 {"save",saveCommand,1,REDIS_CMD_INLINE},
509 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
510 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
511 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
512 {"type",typeCommand,2,REDIS_CMD_INLINE},
513 {"sync",syncCommand,1,REDIS_CMD_INLINE},
514 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
515 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
516 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
517 {"info",infoCommand,1,REDIS_CMD_INLINE},
518 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
519 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
520 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
521 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
522 {NULL,NULL,0,0}
523 };
524 /*============================ Utility functions ============================ */
525
526 /* Glob-style pattern matching. */
527 int stringmatchlen(const char *pattern, int patternLen,
528 const char *string, int stringLen, int nocase)
529 {
530 while(patternLen) {
531 switch(pattern[0]) {
532 case '*':
533 while (pattern[1] == '*') {
534 pattern++;
535 patternLen--;
536 }
537 if (patternLen == 1)
538 return 1; /* match */
539 while(stringLen) {
540 if (stringmatchlen(pattern+1, patternLen-1,
541 string, stringLen, nocase))
542 return 1; /* match */
543 string++;
544 stringLen--;
545 }
546 return 0; /* no match */
547 break;
548 case '?':
549 if (stringLen == 0)
550 return 0; /* no match */
551 string++;
552 stringLen--;
553 break;
554 case '[':
555 {
556 int not, match;
557
558 pattern++;
559 patternLen--;
560 not = pattern[0] == '^';
561 if (not) {
562 pattern++;
563 patternLen--;
564 }
565 match = 0;
566 while(1) {
567 if (pattern[0] == '\\') {
568 pattern++;
569 patternLen--;
570 if (pattern[0] == string[0])
571 match = 1;
572 } else if (pattern[0] == ']') {
573 break;
574 } else if (patternLen == 0) {
575 pattern--;
576 patternLen++;
577 break;
578 } else if (pattern[1] == '-' && patternLen >= 3) {
579 int start = pattern[0];
580 int end = pattern[2];
581 int c = string[0];
582 if (start > end) {
583 int t = start;
584 start = end;
585 end = t;
586 }
587 if (nocase) {
588 start = tolower(start);
589 end = tolower(end);
590 c = tolower(c);
591 }
592 pattern += 2;
593 patternLen -= 2;
594 if (c >= start && c <= end)
595 match = 1;
596 } else {
597 if (!nocase) {
598 if (pattern[0] == string[0])
599 match = 1;
600 } else {
601 if (tolower((int)pattern[0]) == tolower((int)string[0]))
602 match = 1;
603 }
604 }
605 pattern++;
606 patternLen--;
607 }
608 if (not)
609 match = !match;
610 if (!match)
611 return 0; /* no match */
612 string++;
613 stringLen--;
614 break;
615 }
616 case '\\':
617 if (patternLen >= 2) {
618 pattern++;
619 patternLen--;
620 }
621 /* fall through */
622 default:
623 if (!nocase) {
624 if (pattern[0] != string[0])
625 return 0; /* no match */
626 } else {
627 if (tolower((int)pattern[0]) != tolower((int)string[0]))
628 return 0; /* no match */
629 }
630 string++;
631 stringLen--;
632 break;
633 }
634 pattern++;
635 patternLen--;
636 if (stringLen == 0) {
637 while(*pattern == '*') {
638 pattern++;
639 patternLen--;
640 }
641 break;
642 }
643 }
644 if (patternLen == 0 && stringLen == 0)
645 return 1;
646 return 0;
647 }
648
649 static void redisLog(int level, const char *fmt, ...) {
650 va_list ap;
651 FILE *fp;
652
653 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
654 if (!fp) return;
655
656 va_start(ap, fmt);
657 if (level >= server.verbosity) {
658 char *c = ".-*";
659 char buf[64];
660 time_t now;
661
662 now = time(NULL);
663 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
664 fprintf(fp,"%s %c ",buf,c[level]);
665 vfprintf(fp, fmt, ap);
666 fprintf(fp,"\n");
667 fflush(fp);
668 }
669 va_end(ap);
670
671 if (server.logfile) fclose(fp);
672 }
673
674 /*====================== Hash table type implementation ==================== */
675
676 /* This is an hash table type that uses the SDS dynamic strings libary as
677 * keys and radis objects as values (objects can hold SDS strings,
678 * lists, sets). */
679
680 static void dictVanillaFree(void *privdata, void *val)
681 {
682 DICT_NOTUSED(privdata);
683 zfree(val);
684 }
685
686 static int sdsDictKeyCompare(void *privdata, const void *key1,
687 const void *key2)
688 {
689 int l1,l2;
690 DICT_NOTUSED(privdata);
691
692 l1 = sdslen((sds)key1);
693 l2 = sdslen((sds)key2);
694 if (l1 != l2) return 0;
695 return memcmp(key1, key2, l1) == 0;
696 }
697
698 static void dictRedisObjectDestructor(void *privdata, void *val)
699 {
700 DICT_NOTUSED(privdata);
701
702 decrRefCount(val);
703 }
704
705 static int dictObjKeyCompare(void *privdata, const void *key1,
706 const void *key2)
707 {
708 const robj *o1 = key1, *o2 = key2;
709 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
710 }
711
712 static unsigned int dictObjHash(const void *key) {
713 const robj *o = key;
714 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
715 }
716
717 static int dictEncObjKeyCompare(void *privdata, const void *key1,
718 const void *key2)
719 {
720 const robj *o1 = key1, *o2 = key2;
721
722 if (o1->encoding == REDIS_ENCODING_RAW &&
723 o2->encoding == REDIS_ENCODING_RAW)
724 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
725 else {
726 robj *dec1, *dec2;
727 int cmp;
728
729 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
730 getDecodedObject(o1) : (robj*)o1;
731 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
732 getDecodedObject(o2) : (robj*)o2;
733 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
734 if (dec1 != o1) decrRefCount(dec1);
735 if (dec2 != o2) decrRefCount(dec2);
736 return cmp;
737 }
738 }
739
740 static unsigned int dictEncObjHash(const void *key) {
741 const robj *o = key;
742
743 if (o->encoding == REDIS_ENCODING_RAW)
744 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
745 else {
746 robj *dec = getDecodedObject(o);
747 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
748 decrRefCount(dec);
749 return hash;
750 }
751 }
752
753 static dictType setDictType = {
754 dictEncObjHash, /* hash function */
755 NULL, /* key dup */
756 NULL, /* val dup */
757 dictEncObjKeyCompare, /* key compare */
758 dictRedisObjectDestructor, /* key destructor */
759 NULL /* val destructor */
760 };
761
762 static dictType zsetDictType = {
763 dictEncObjHash, /* hash function */
764 NULL, /* key dup */
765 NULL, /* val dup */
766 dictEncObjKeyCompare, /* key compare */
767 dictRedisObjectDestructor, /* key destructor */
768 dictVanillaFree /* val destructor */
769 };
770
771 static dictType hashDictType = {
772 dictObjHash, /* hash function */
773 NULL, /* key dup */
774 NULL, /* val dup */
775 dictObjKeyCompare, /* key compare */
776 dictRedisObjectDestructor, /* key destructor */
777 dictRedisObjectDestructor /* val destructor */
778 };
779
780 /* ========================= Random utility functions ======================= */
781
782 /* Redis generally does not try to recover from out of memory conditions
783 * when allocating objects or strings, it is not clear if it will be possible
784 * to report this condition to the client since the networking layer itself
785 * is based on heap allocation for send buffers, so we simply abort.
786 * At least the code will be simpler to read... */
787 static void oom(const char *msg) {
788 fprintf(stderr, "%s: Out of memory\n",msg);
789 fflush(stderr);
790 sleep(1);
791 abort();
792 }
793
794 /* ====================== Redis server networking stuff ===================== */
795 static void closeTimedoutClients(void) {
796 redisClient *c;
797 listNode *ln;
798 time_t now = time(NULL);
799
800 listRewind(server.clients);
801 while ((ln = listYield(server.clients)) != NULL) {
802 c = listNodeValue(ln);
803 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
804 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
805 (now - c->lastinteraction > server.maxidletime)) {
806 redisLog(REDIS_DEBUG,"Closing idle client");
807 freeClient(c);
808 }
809 }
810 }
811
812 static int htNeedsResize(dict *dict) {
813 long long size, used;
814
815 size = dictSlots(dict);
816 used = dictSize(dict);
817 return (size && used && size > DICT_HT_INITIAL_SIZE &&
818 (used*100/size < REDIS_HT_MINFILL));
819 }
820
821 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
822 * we resize the hash table to save memory */
823 static void tryResizeHashTables(void) {
824 int j;
825
826 for (j = 0; j < server.dbnum; j++) {
827 if (htNeedsResize(server.db[j].dict)) {
828 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
829 dictResize(server.db[j].dict);
830 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
831 }
832 if (htNeedsResize(server.db[j].expires))
833 dictResize(server.db[j].expires);
834 }
835 }
836
837 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
838 int j, loops = server.cronloops++;
839 REDIS_NOTUSED(eventLoop);
840 REDIS_NOTUSED(id);
841 REDIS_NOTUSED(clientData);
842
843 /* Update the global state with the amount of used memory */
844 server.usedmemory = zmalloc_used_memory();
845
846 /* Show some info about non-empty databases */
847 for (j = 0; j < server.dbnum; j++) {
848 long long size, used, vkeys;
849
850 size = dictSlots(server.db[j].dict);
851 used = dictSize(server.db[j].dict);
852 vkeys = dictSize(server.db[j].expires);
853 if (!(loops % 5) && (used || vkeys)) {
854 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
855 /* dictPrintStats(server.dict); */
856 }
857 }
858
859 /* We don't want to resize the hash tables while a bacground saving
860 * is in progress: the saving child is created using fork() that is
861 * implemented with a copy-on-write semantic in most modern systems, so
862 * if we resize the HT while there is the saving child at work actually
863 * a lot of memory movements in the parent will cause a lot of pages
864 * copied. */
865 if (!server.bgsaveinprogress) tryResizeHashTables();
866
867 /* Show information about connected clients */
868 if (!(loops % 5)) {
869 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
870 listLength(server.clients)-listLength(server.slaves),
871 listLength(server.slaves),
872 server.usedmemory,
873 dictSize(server.sharingpool));
874 }
875
876 /* Close connections of timedout clients */
877 if (server.maxidletime && !(loops % 10))
878 closeTimedoutClients();
879
880 /* Check if a background saving in progress terminated */
881 if (server.bgsaveinprogress) {
882 int statloc;
883 if (wait4(-1,&statloc,WNOHANG,NULL)) {
884 int exitcode = WEXITSTATUS(statloc);
885 int bysignal = WIFSIGNALED(statloc);
886
887 if (!bysignal && exitcode == 0) {
888 redisLog(REDIS_NOTICE,
889 "Background saving terminated with success");
890 server.dirty = 0;
891 server.lastsave = time(NULL);
892 } else if (!bysignal && exitcode != 0) {
893 redisLog(REDIS_WARNING, "Background saving error");
894 } else {
895 redisLog(REDIS_WARNING,
896 "Background saving terminated by signal");
897 rdbRemoveTempFile(server.bgsavechildpid);
898 }
899 server.bgsaveinprogress = 0;
900 server.bgsavechildpid = -1;
901 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
902 }
903 } else {
904 /* If there is not a background saving in progress check if
905 * we have to save now */
906 time_t now = time(NULL);
907 for (j = 0; j < server.saveparamslen; j++) {
908 struct saveparam *sp = server.saveparams+j;
909
910 if (server.dirty >= sp->changes &&
911 now-server.lastsave > sp->seconds) {
912 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
913 sp->changes, sp->seconds);
914 rdbSaveBackground(server.dbfilename);
915 break;
916 }
917 }
918 }
919
920 /* Try to expire a few timed out keys */
921 for (j = 0; j < server.dbnum; j++) {
922 redisDb *db = server.db+j;
923 int num = dictSize(db->expires);
924
925 if (num) {
926 time_t now = time(NULL);
927
928 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
929 num = REDIS_EXPIRELOOKUPS_PER_CRON;
930 while (num--) {
931 dictEntry *de;
932 time_t t;
933
934 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
935 t = (time_t) dictGetEntryVal(de);
936 if (now > t) {
937 deleteKey(db,dictGetEntryKey(de));
938 }
939 }
940 }
941 }
942
943 /* Check if we should connect to a MASTER */
944 if (server.replstate == REDIS_REPL_CONNECT) {
945 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
946 if (syncWithMaster() == REDIS_OK) {
947 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
948 }
949 }
950 return 1000;
951 }
952
953 static void createSharedObjects(void) {
954 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
955 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
956 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
957 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
958 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
959 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
960 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
961 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
962 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
963 /* no such key */
964 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
965 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
966 "-ERR Operation against a key holding the wrong kind of value\r\n"));
967 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
968 "-ERR no such key\r\n"));
969 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
970 "-ERR syntax error\r\n"));
971 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
972 "-ERR source and destination objects are the same\r\n"));
973 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
974 "-ERR index out of range\r\n"));
975 shared.space = createObject(REDIS_STRING,sdsnew(" "));
976 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
977 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
978 shared.select0 = createStringObject("select 0\r\n",10);
979 shared.select1 = createStringObject("select 1\r\n",10);
980 shared.select2 = createStringObject("select 2\r\n",10);
981 shared.select3 = createStringObject("select 3\r\n",10);
982 shared.select4 = createStringObject("select 4\r\n",10);
983 shared.select5 = createStringObject("select 5\r\n",10);
984 shared.select6 = createStringObject("select 6\r\n",10);
985 shared.select7 = createStringObject("select 7\r\n",10);
986 shared.select8 = createStringObject("select 8\r\n",10);
987 shared.select9 = createStringObject("select 9\r\n",10);
988 }
989
990 static void appendServerSaveParams(time_t seconds, int changes) {
991 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
992 server.saveparams[server.saveparamslen].seconds = seconds;
993 server.saveparams[server.saveparamslen].changes = changes;
994 server.saveparamslen++;
995 }
996
997 static void ResetServerSaveParams() {
998 zfree(server.saveparams);
999 server.saveparams = NULL;
1000 server.saveparamslen = 0;
1001 }
1002
1003 static void initServerConfig() {
1004 server.dbnum = REDIS_DEFAULT_DBNUM;
1005 server.port = REDIS_SERVERPORT;
1006 server.verbosity = REDIS_DEBUG;
1007 server.maxidletime = REDIS_MAXIDLETIME;
1008 server.saveparams = NULL;
1009 server.logfile = NULL; /* NULL = log on standard output */
1010 server.bindaddr = NULL;
1011 server.glueoutputbuf = 1;
1012 server.daemonize = 0;
1013 server.pidfile = "/var/run/redis.pid";
1014 server.dbfilename = "dump.rdb";
1015 server.requirepass = NULL;
1016 server.shareobjects = 0;
1017 server.sharingpoolsize = 1024;
1018 server.maxclients = 0;
1019 server.maxmemory = 0;
1020 ResetServerSaveParams();
1021
1022 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1023 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1024 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1025 /* Replication related */
1026 server.isslave = 0;
1027 server.masterhost = NULL;
1028 server.masterport = 6379;
1029 server.master = NULL;
1030 server.replstate = REDIS_REPL_NONE;
1031
1032 /* Double constants initialization */
1033 R_Zero = 0.0;
1034 R_PosInf = 1.0/R_Zero;
1035 R_NegInf = -1.0/R_Zero;
1036 R_Nan = R_Zero/R_Zero;
1037 }
1038
1039 static void initServer() {
1040 int j;
1041
1042 signal(SIGHUP, SIG_IGN);
1043 signal(SIGPIPE, SIG_IGN);
1044 setupSigSegvAction();
1045
1046 server.clients = listCreate();
1047 server.slaves = listCreate();
1048 server.monitors = listCreate();
1049 server.objfreelist = listCreate();
1050 createSharedObjects();
1051 server.el = aeCreateEventLoop();
1052 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1053 server.sharingpool = dictCreate(&setDictType,NULL);
1054 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1055 if (server.fd == -1) {
1056 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1057 exit(1);
1058 }
1059 for (j = 0; j < server.dbnum; j++) {
1060 server.db[j].dict = dictCreate(&hashDictType,NULL);
1061 server.db[j].expires = dictCreate(&setDictType,NULL);
1062 server.db[j].id = j;
1063 }
1064 server.cronloops = 0;
1065 server.bgsaveinprogress = 0;
1066 server.bgsavechildpid = -1;
1067 server.lastsave = time(NULL);
1068 server.dirty = 0;
1069 server.usedmemory = 0;
1070 server.stat_numcommands = 0;
1071 server.stat_numconnections = 0;
1072 server.stat_starttime = time(NULL);
1073 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1074 }
1075
1076 /* Empty the whole database */
1077 static long long emptyDb() {
1078 int j;
1079 long long removed = 0;
1080
1081 for (j = 0; j < server.dbnum; j++) {
1082 removed += dictSize(server.db[j].dict);
1083 dictEmpty(server.db[j].dict);
1084 dictEmpty(server.db[j].expires);
1085 }
1086 return removed;
1087 }
1088
1089 static int yesnotoi(char *s) {
1090 if (!strcasecmp(s,"yes")) return 1;
1091 else if (!strcasecmp(s,"no")) return 0;
1092 else return -1;
1093 }
1094
1095 /* I agree, this is a very rudimental way to load a configuration...
1096 will improve later if the config gets more complex */
1097 static void loadServerConfig(char *filename) {
1098 FILE *fp;
1099 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1100 int linenum = 0;
1101 sds line = NULL;
1102
1103 if (filename[0] == '-' && filename[1] == '\0')
1104 fp = stdin;
1105 else {
1106 if ((fp = fopen(filename,"r")) == NULL) {
1107 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1108 exit(1);
1109 }
1110 }
1111
1112 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1113 sds *argv;
1114 int argc, j;
1115
1116 linenum++;
1117 line = sdsnew(buf);
1118 line = sdstrim(line," \t\r\n");
1119
1120 /* Skip comments and blank lines*/
1121 if (line[0] == '#' || line[0] == '\0') {
1122 sdsfree(line);
1123 continue;
1124 }
1125
1126 /* Split into arguments */
1127 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1128 sdstolower(argv[0]);
1129
1130 /* Execute config directives */
1131 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1132 server.maxidletime = atoi(argv[1]);
1133 if (server.maxidletime < 0) {
1134 err = "Invalid timeout value"; goto loaderr;
1135 }
1136 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1137 server.port = atoi(argv[1]);
1138 if (server.port < 1 || server.port > 65535) {
1139 err = "Invalid port"; goto loaderr;
1140 }
1141 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1142 server.bindaddr = zstrdup(argv[1]);
1143 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1144 int seconds = atoi(argv[1]);
1145 int changes = atoi(argv[2]);
1146 if (seconds < 1 || changes < 0) {
1147 err = "Invalid save parameters"; goto loaderr;
1148 }
1149 appendServerSaveParams(seconds,changes);
1150 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1151 if (chdir(argv[1]) == -1) {
1152 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1153 argv[1], strerror(errno));
1154 exit(1);
1155 }
1156 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1157 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1158 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1159 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1160 else {
1161 err = "Invalid log level. Must be one of debug, notice, warning";
1162 goto loaderr;
1163 }
1164 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1165 FILE *logfp;
1166
1167 server.logfile = zstrdup(argv[1]);
1168 if (!strcasecmp(server.logfile,"stdout")) {
1169 zfree(server.logfile);
1170 server.logfile = NULL;
1171 }
1172 if (server.logfile) {
1173 /* Test if we are able to open the file. The server will not
1174 * be able to abort just for this problem later... */
1175 logfp = fopen(server.logfile,"a");
1176 if (logfp == NULL) {
1177 err = sdscatprintf(sdsempty(),
1178 "Can't open the log file: %s", strerror(errno));
1179 goto loaderr;
1180 }
1181 fclose(logfp);
1182 }
1183 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1184 server.dbnum = atoi(argv[1]);
1185 if (server.dbnum < 1) {
1186 err = "Invalid number of databases"; goto loaderr;
1187 }
1188 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1189 server.maxclients = atoi(argv[1]);
1190 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1191 server.maxmemory = strtoll(argv[1], NULL, 10);
1192 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1193 server.masterhost = sdsnew(argv[1]);
1194 server.masterport = atoi(argv[2]);
1195 server.replstate = REDIS_REPL_CONNECT;
1196 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1197 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1198 err = "argument must be 'yes' or 'no'"; goto loaderr;
1199 }
1200 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1201 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1202 err = "argument must be 'yes' or 'no'"; goto loaderr;
1203 }
1204 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1205 server.sharingpoolsize = atoi(argv[1]);
1206 if (server.sharingpoolsize < 1) {
1207 err = "invalid object sharing pool size"; goto loaderr;
1208 }
1209 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1210 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1211 err = "argument must be 'yes' or 'no'"; goto loaderr;
1212 }
1213 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1214 server.requirepass = zstrdup(argv[1]);
1215 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1216 server.pidfile = zstrdup(argv[1]);
1217 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1218 server.dbfilename = zstrdup(argv[1]);
1219 } else {
1220 err = "Bad directive or wrong number of arguments"; goto loaderr;
1221 }
1222 for (j = 0; j < argc; j++)
1223 sdsfree(argv[j]);
1224 zfree(argv);
1225 sdsfree(line);
1226 }
1227 if (fp != stdin) fclose(fp);
1228 return;
1229
1230 loaderr:
1231 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1232 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1233 fprintf(stderr, ">>> '%s'\n", line);
1234 fprintf(stderr, "%s\n", err);
1235 exit(1);
1236 }
1237
1238 static void freeClientArgv(redisClient *c) {
1239 int j;
1240
1241 for (j = 0; j < c->argc; j++)
1242 decrRefCount(c->argv[j]);
1243 for (j = 0; j < c->mbargc; j++)
1244 decrRefCount(c->mbargv[j]);
1245 c->argc = 0;
1246 c->mbargc = 0;
1247 }
1248
1249 static void freeClient(redisClient *c) {
1250 listNode *ln;
1251
1252 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1253 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1254 sdsfree(c->querybuf);
1255 listRelease(c->reply);
1256 freeClientArgv(c);
1257 close(c->fd);
1258 ln = listSearchKey(server.clients,c);
1259 assert(ln != NULL);
1260 listDelNode(server.clients,ln);
1261 if (c->flags & REDIS_SLAVE) {
1262 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1263 close(c->repldbfd);
1264 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1265 ln = listSearchKey(l,c);
1266 assert(ln != NULL);
1267 listDelNode(l,ln);
1268 }
1269 if (c->flags & REDIS_MASTER) {
1270 server.master = NULL;
1271 server.replstate = REDIS_REPL_CONNECT;
1272 }
1273 zfree(c->argv);
1274 zfree(c->mbargv);
1275 zfree(c);
1276 }
1277
1278 static void glueReplyBuffersIfNeeded(redisClient *c) {
1279 int totlen = 0;
1280 listNode *ln;
1281 robj *o;
1282
1283 listRewind(c->reply);
1284 while((ln = listYield(c->reply))) {
1285 o = ln->value;
1286 totlen += sdslen(o->ptr);
1287 /* This optimization makes more sense if we don't have to copy
1288 * too much data */
1289 if (totlen > 1024) return;
1290 }
1291 if (totlen > 0) {
1292 char buf[1024];
1293 int copylen = 0;
1294
1295 listRewind(c->reply);
1296 while((ln = listYield(c->reply))) {
1297 o = ln->value;
1298 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1299 copylen += sdslen(o->ptr);
1300 listDelNode(c->reply,ln);
1301 }
1302 /* Now the output buffer is empty, add the new single element */
1303 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1304 listAddNodeTail(c->reply,o);
1305 }
1306 }
1307
1308 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1309 redisClient *c = privdata;
1310 int nwritten = 0, totwritten = 0, objlen;
1311 robj *o;
1312 REDIS_NOTUSED(el);
1313 REDIS_NOTUSED(mask);
1314
1315 if (server.glueoutputbuf && listLength(c->reply) > 1)
1316 glueReplyBuffersIfNeeded(c);
1317 while(listLength(c->reply)) {
1318 o = listNodeValue(listFirst(c->reply));
1319 objlen = sdslen(o->ptr);
1320
1321 if (objlen == 0) {
1322 listDelNode(c->reply,listFirst(c->reply));
1323 continue;
1324 }
1325
1326 if (c->flags & REDIS_MASTER) {
1327 /* Don't reply to a master */
1328 nwritten = objlen - c->sentlen;
1329 } else {
1330 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1331 if (nwritten <= 0) break;
1332 }
1333 c->sentlen += nwritten;
1334 totwritten += nwritten;
1335 /* If we fully sent the object on head go to the next one */
1336 if (c->sentlen == objlen) {
1337 listDelNode(c->reply,listFirst(c->reply));
1338 c->sentlen = 0;
1339 }
1340 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1341 * bytes, in a single threaded server it's a good idea to server
1342 * other clients as well, even if a very large request comes from
1343 * super fast link that is always able to accept data (in real world
1344 * terms think to 'KEYS *' against the loopback interfae) */
1345 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1346 }
1347 if (nwritten == -1) {
1348 if (errno == EAGAIN) {
1349 nwritten = 0;
1350 } else {
1351 redisLog(REDIS_DEBUG,
1352 "Error writing to client: %s", strerror(errno));
1353 freeClient(c);
1354 return;
1355 }
1356 }
1357 if (totwritten > 0) c->lastinteraction = time(NULL);
1358 if (listLength(c->reply) == 0) {
1359 c->sentlen = 0;
1360 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1361 }
1362 }
1363
1364 static struct redisCommand *lookupCommand(char *name) {
1365 int j = 0;
1366 while(cmdTable[j].name != NULL) {
1367 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1368 j++;
1369 }
1370 return NULL;
1371 }
1372
1373 /* resetClient prepare the client to process the next command */
1374 static void resetClient(redisClient *c) {
1375 freeClientArgv(c);
1376 c->bulklen = -1;
1377 c->multibulk = 0;
1378 }
1379
1380 /* If this function gets called we already read a whole
1381 * command, argments are in the client argv/argc fields.
1382 * processCommand() execute the command or prepare the
1383 * server for a bulk read from the client.
1384 *
1385 * If 1 is returned the client is still alive and valid and
1386 * and other operations can be performed by the caller. Otherwise
1387 * if 0 is returned the client was destroied (i.e. after QUIT). */
1388 static int processCommand(redisClient *c) {
1389 struct redisCommand *cmd;
1390 long long dirty;
1391
1392 /* Free some memory if needed (maxmemory setting) */
1393 if (server.maxmemory) freeMemoryIfNeeded();
1394
1395 /* Handle the multi bulk command type. This is an alternative protocol
1396 * supported by Redis in order to receive commands that are composed of
1397 * multiple binary-safe "bulk" arguments. The latency of processing is
1398 * a bit higher but this allows things like multi-sets, so if this
1399 * protocol is used only for MSET and similar commands this is a big win. */
1400 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1401 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1402 if (c->multibulk <= 0) {
1403 resetClient(c);
1404 return 1;
1405 } else {
1406 decrRefCount(c->argv[c->argc-1]);
1407 c->argc--;
1408 return 1;
1409 }
1410 } else if (c->multibulk) {
1411 if (c->bulklen == -1) {
1412 if (((char*)c->argv[0]->ptr)[0] != '$') {
1413 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1414 resetClient(c);
1415 return 1;
1416 } else {
1417 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1418 decrRefCount(c->argv[0]);
1419 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1420 c->argc--;
1421 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1422 resetClient(c);
1423 return 1;
1424 }
1425 c->argc--;
1426 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1427 return 1;
1428 }
1429 } else {
1430 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1431 c->mbargv[c->mbargc] = c->argv[0];
1432 c->mbargc++;
1433 c->argc--;
1434 c->multibulk--;
1435 if (c->multibulk == 0) {
1436 robj **auxargv;
1437 int auxargc;
1438
1439 /* Here we need to swap the multi-bulk argc/argv with the
1440 * normal argc/argv of the client structure. */
1441 auxargv = c->argv;
1442 c->argv = c->mbargv;
1443 c->mbargv = auxargv;
1444
1445 auxargc = c->argc;
1446 c->argc = c->mbargc;
1447 c->mbargc = auxargc;
1448
1449 /* We need to set bulklen to something different than -1
1450 * in order for the code below to process the command without
1451 * to try to read the last argument of a bulk command as
1452 * a special argument. */
1453 c->bulklen = 0;
1454 /* continue below and process the command */
1455 } else {
1456 c->bulklen = -1;
1457 return 1;
1458 }
1459 }
1460 }
1461 /* -- end of multi bulk commands processing -- */
1462
1463 /* The QUIT command is handled as a special case. Normal command
1464 * procs are unable to close the client connection safely */
1465 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1466 freeClient(c);
1467 return 0;
1468 }
1469 cmd = lookupCommand(c->argv[0]->ptr);
1470 if (!cmd) {
1471 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1472 resetClient(c);
1473 return 1;
1474 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1475 (c->argc < -cmd->arity)) {
1476 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1477 resetClient(c);
1478 return 1;
1479 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1480 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1481 resetClient(c);
1482 return 1;
1483 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1484 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1485
1486 decrRefCount(c->argv[c->argc-1]);
1487 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1488 c->argc--;
1489 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1490 resetClient(c);
1491 return 1;
1492 }
1493 c->argc--;
1494 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1495 /* It is possible that the bulk read is already in the
1496 * buffer. Check this condition and handle it accordingly.
1497 * This is just a fast path, alternative to call processInputBuffer().
1498 * It's a good idea since the code is small and this condition
1499 * happens most of the times. */
1500 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1501 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1502 c->argc++;
1503 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1504 } else {
1505 return 1;
1506 }
1507 }
1508 /* Let's try to share objects on the command arguments vector */
1509 if (server.shareobjects) {
1510 int j;
1511 for(j = 1; j < c->argc; j++)
1512 c->argv[j] = tryObjectSharing(c->argv[j]);
1513 }
1514 /* Let's try to encode the bulk object to save space. */
1515 if (cmd->flags & REDIS_CMD_BULK)
1516 tryObjectEncoding(c->argv[c->argc-1]);
1517
1518 /* Check if the user is authenticated */
1519 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1520 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1521 resetClient(c);
1522 return 1;
1523 }
1524
1525 /* Exec the command */
1526 dirty = server.dirty;
1527 cmd->proc(c);
1528 if (server.dirty-dirty != 0 && listLength(server.slaves))
1529 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1530 if (listLength(server.monitors))
1531 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1532 server.stat_numcommands++;
1533
1534 /* Prepare the client for the next command */
1535 if (c->flags & REDIS_CLOSE) {
1536 freeClient(c);
1537 return 0;
1538 }
1539 resetClient(c);
1540 return 1;
1541 }
1542
1543 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1544 listNode *ln;
1545 int outc = 0, j;
1546 robj **outv;
1547 /* (args*2)+1 is enough room for args, spaces, newlines */
1548 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1549
1550 if (argc <= REDIS_STATIC_ARGS) {
1551 outv = static_outv;
1552 } else {
1553 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1554 }
1555
1556 for (j = 0; j < argc; j++) {
1557 if (j != 0) outv[outc++] = shared.space;
1558 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1559 robj *lenobj;
1560
1561 lenobj = createObject(REDIS_STRING,
1562 sdscatprintf(sdsempty(),"%d\r\n",
1563 stringObjectLen(argv[j])));
1564 lenobj->refcount = 0;
1565 outv[outc++] = lenobj;
1566 }
1567 outv[outc++] = argv[j];
1568 }
1569 outv[outc++] = shared.crlf;
1570
1571 /* Increment all the refcounts at start and decrement at end in order to
1572 * be sure to free objects if there is no slave in a replication state
1573 * able to be feed with commands */
1574 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1575 listRewind(slaves);
1576 while((ln = listYield(slaves))) {
1577 redisClient *slave = ln->value;
1578
1579 /* Don't feed slaves that are still waiting for BGSAVE to start */
1580 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1581
1582 /* Feed all the other slaves, MONITORs and so on */
1583 if (slave->slaveseldb != dictid) {
1584 robj *selectcmd;
1585
1586 switch(dictid) {
1587 case 0: selectcmd = shared.select0; break;
1588 case 1: selectcmd = shared.select1; break;
1589 case 2: selectcmd = shared.select2; break;
1590 case 3: selectcmd = shared.select3; break;
1591 case 4: selectcmd = shared.select4; break;
1592 case 5: selectcmd = shared.select5; break;
1593 case 6: selectcmd = shared.select6; break;
1594 case 7: selectcmd = shared.select7; break;
1595 case 8: selectcmd = shared.select8; break;
1596 case 9: selectcmd = shared.select9; break;
1597 default:
1598 selectcmd = createObject(REDIS_STRING,
1599 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1600 selectcmd->refcount = 0;
1601 break;
1602 }
1603 addReply(slave,selectcmd);
1604 slave->slaveseldb = dictid;
1605 }
1606 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1607 }
1608 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1609 if (outv != static_outv) zfree(outv);
1610 }
1611
1612 static void processInputBuffer(redisClient *c) {
1613 again:
1614 if (c->bulklen == -1) {
1615 /* Read the first line of the query */
1616 char *p = strchr(c->querybuf,'\n');
1617 size_t querylen;
1618
1619 if (p) {
1620 sds query, *argv;
1621 int argc, j;
1622
1623 query = c->querybuf;
1624 c->querybuf = sdsempty();
1625 querylen = 1+(p-(query));
1626 if (sdslen(query) > querylen) {
1627 /* leave data after the first line of the query in the buffer */
1628 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1629 }
1630 *p = '\0'; /* remove "\n" */
1631 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1632 sdsupdatelen(query);
1633
1634 /* Now we can split the query in arguments */
1635 if (sdslen(query) == 0) {
1636 /* Ignore empty query */
1637 sdsfree(query);
1638 return;
1639 }
1640 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1641 sdsfree(query);
1642
1643 if (c->argv) zfree(c->argv);
1644 c->argv = zmalloc(sizeof(robj*)*argc);
1645
1646 for (j = 0; j < argc; j++) {
1647 if (sdslen(argv[j])) {
1648 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1649 c->argc++;
1650 } else {
1651 sdsfree(argv[j]);
1652 }
1653 }
1654 zfree(argv);
1655 /* Execute the command. If the client is still valid
1656 * after processCommand() return and there is something
1657 * on the query buffer try to process the next command. */
1658 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1659 return;
1660 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1661 redisLog(REDIS_DEBUG, "Client protocol error");
1662 freeClient(c);
1663 return;
1664 }
1665 } else {
1666 /* Bulk read handling. Note that if we are at this point
1667 the client already sent a command terminated with a newline,
1668 we are reading the bulk data that is actually the last
1669 argument of the command. */
1670 int qbl = sdslen(c->querybuf);
1671
1672 if (c->bulklen <= qbl) {
1673 /* Copy everything but the final CRLF as final argument */
1674 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1675 c->argc++;
1676 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1677 /* Process the command. If the client is still valid after
1678 * the processing and there is more data in the buffer
1679 * try to parse it. */
1680 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1681 return;
1682 }
1683 }
1684 }
1685
1686 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1687 redisClient *c = (redisClient*) privdata;
1688 char buf[REDIS_IOBUF_LEN];
1689 int nread;
1690 REDIS_NOTUSED(el);
1691 REDIS_NOTUSED(mask);
1692
1693 nread = read(fd, buf, REDIS_IOBUF_LEN);
1694 if (nread == -1) {
1695 if (errno == EAGAIN) {
1696 nread = 0;
1697 } else {
1698 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1699 freeClient(c);
1700 return;
1701 }
1702 } else if (nread == 0) {
1703 redisLog(REDIS_DEBUG, "Client closed connection");
1704 freeClient(c);
1705 return;
1706 }
1707 if (nread) {
1708 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1709 c->lastinteraction = time(NULL);
1710 } else {
1711 return;
1712 }
1713 processInputBuffer(c);
1714 }
1715
1716 static int selectDb(redisClient *c, int id) {
1717 if (id < 0 || id >= server.dbnum)
1718 return REDIS_ERR;
1719 c->db = &server.db[id];
1720 return REDIS_OK;
1721 }
1722
1723 static void *dupClientReplyValue(void *o) {
1724 incrRefCount((robj*)o);
1725 return 0;
1726 }
1727
1728 static redisClient *createClient(int fd) {
1729 redisClient *c = zmalloc(sizeof(*c));
1730
1731 anetNonBlock(NULL,fd);
1732 anetTcpNoDelay(NULL,fd);
1733 if (!c) return NULL;
1734 selectDb(c,0);
1735 c->fd = fd;
1736 c->querybuf = sdsempty();
1737 c->argc = 0;
1738 c->argv = NULL;
1739 c->bulklen = -1;
1740 c->multibulk = 0;
1741 c->mbargc = 0;
1742 c->mbargv = NULL;
1743 c->sentlen = 0;
1744 c->flags = 0;
1745 c->lastinteraction = time(NULL);
1746 c->authenticated = 0;
1747 c->replstate = REDIS_REPL_NONE;
1748 c->reply = listCreate();
1749 listSetFreeMethod(c->reply,decrRefCount);
1750 listSetDupMethod(c->reply,dupClientReplyValue);
1751 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1752 readQueryFromClient, c, NULL) == AE_ERR) {
1753 freeClient(c);
1754 return NULL;
1755 }
1756 listAddNodeTail(server.clients,c);
1757 return c;
1758 }
1759
1760 static void addReply(redisClient *c, robj *obj) {
1761 if (listLength(c->reply) == 0 &&
1762 (c->replstate == REDIS_REPL_NONE ||
1763 c->replstate == REDIS_REPL_ONLINE) &&
1764 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1765 sendReplyToClient, c, NULL) == AE_ERR) return;
1766 if (obj->encoding != REDIS_ENCODING_RAW) {
1767 obj = getDecodedObject(obj);
1768 } else {
1769 incrRefCount(obj);
1770 }
1771 listAddNodeTail(c->reply,obj);
1772 }
1773
1774 static void addReplySds(redisClient *c, sds s) {
1775 robj *o = createObject(REDIS_STRING,s);
1776 addReply(c,o);
1777 decrRefCount(o);
1778 }
1779
1780 static void addReplyBulkLen(redisClient *c, robj *obj) {
1781 size_t len;
1782
1783 if (obj->encoding == REDIS_ENCODING_RAW) {
1784 len = sdslen(obj->ptr);
1785 } else {
1786 long n = (long)obj->ptr;
1787
1788 len = 1;
1789 if (n < 0) {
1790 len++;
1791 n = -n;
1792 }
1793 while((n = n/10) != 0) {
1794 len++;
1795 }
1796 }
1797 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1798 }
1799
1800 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1801 int cport, cfd;
1802 char cip[128];
1803 redisClient *c;
1804 REDIS_NOTUSED(el);
1805 REDIS_NOTUSED(mask);
1806 REDIS_NOTUSED(privdata);
1807
1808 cfd = anetAccept(server.neterr, fd, cip, &cport);
1809 if (cfd == AE_ERR) {
1810 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1811 return;
1812 }
1813 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1814 if ((c = createClient(cfd)) == NULL) {
1815 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1816 close(cfd); /* May be already closed, just ingore errors */
1817 return;
1818 }
1819 /* If maxclient directive is set and this is one client more... close the
1820 * connection. Note that we create the client instead to check before
1821 * for this condition, since now the socket is already set in nonblocking
1822 * mode and we can send an error for free using the Kernel I/O */
1823 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1824 char *err = "-ERR max number of clients reached\r\n";
1825
1826 /* That's a best effort error message, don't check write errors */
1827 (void) write(c->fd,err,strlen(err));
1828 freeClient(c);
1829 return;
1830 }
1831 server.stat_numconnections++;
1832 }
1833
1834 /* ======================= Redis objects implementation ===================== */
1835
1836 static robj *createObject(int type, void *ptr) {
1837 robj *o;
1838
1839 if (listLength(server.objfreelist)) {
1840 listNode *head = listFirst(server.objfreelist);
1841 o = listNodeValue(head);
1842 listDelNode(server.objfreelist,head);
1843 } else {
1844 o = zmalloc(sizeof(*o));
1845 }
1846 o->type = type;
1847 o->encoding = REDIS_ENCODING_RAW;
1848 o->ptr = ptr;
1849 o->refcount = 1;
1850 return o;
1851 }
1852
1853 static robj *createStringObject(char *ptr, size_t len) {
1854 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1855 }
1856
1857 static robj *createListObject(void) {
1858 list *l = listCreate();
1859
1860 listSetFreeMethod(l,decrRefCount);
1861 return createObject(REDIS_LIST,l);
1862 }
1863
1864 static robj *createSetObject(void) {
1865 dict *d = dictCreate(&setDictType,NULL);
1866 return createObject(REDIS_SET,d);
1867 }
1868
1869 static robj *createZsetObject(void) {
1870 zset *zs = zmalloc(sizeof(*zs));
1871
1872 zs->dict = dictCreate(&zsetDictType,NULL);
1873 zs->zsl = zslCreate();
1874 return createObject(REDIS_ZSET,zs);
1875 }
1876
1877 static void freeStringObject(robj *o) {
1878 if (o->encoding == REDIS_ENCODING_RAW) {
1879 sdsfree(o->ptr);
1880 }
1881 }
1882
1883 static void freeListObject(robj *o) {
1884 listRelease((list*) o->ptr);
1885 }
1886
1887 static void freeSetObject(robj *o) {
1888 dictRelease((dict*) o->ptr);
1889 }
1890
1891 static void freeZsetObject(robj *o) {
1892 zset *zs = o->ptr;
1893
1894 dictRelease(zs->dict);
1895 zslFree(zs->zsl);
1896 zfree(zs);
1897 }
1898
1899 static void freeHashObject(robj *o) {
1900 dictRelease((dict*) o->ptr);
1901 }
1902
1903 static void incrRefCount(robj *o) {
1904 o->refcount++;
1905 #ifdef DEBUG_REFCOUNT
1906 if (o->type == REDIS_STRING)
1907 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1908 #endif
1909 }
1910
1911 static void decrRefCount(void *obj) {
1912 robj *o = obj;
1913
1914 #ifdef DEBUG_REFCOUNT
1915 if (o->type == REDIS_STRING)
1916 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1917 #endif
1918 if (--(o->refcount) == 0) {
1919 switch(o->type) {
1920 case REDIS_STRING: freeStringObject(o); break;
1921 case REDIS_LIST: freeListObject(o); break;
1922 case REDIS_SET: freeSetObject(o); break;
1923 case REDIS_ZSET: freeZsetObject(o); break;
1924 case REDIS_HASH: freeHashObject(o); break;
1925 default: assert(0 != 0); break;
1926 }
1927 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1928 !listAddNodeHead(server.objfreelist,o))
1929 zfree(o);
1930 }
1931 }
1932
1933 static robj *lookupKey(redisDb *db, robj *key) {
1934 dictEntry *de = dictFind(db->dict,key);
1935 return de ? dictGetEntryVal(de) : NULL;
1936 }
1937
1938 static robj *lookupKeyRead(redisDb *db, robj *key) {
1939 expireIfNeeded(db,key);
1940 return lookupKey(db,key);
1941 }
1942
1943 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1944 deleteIfVolatile(db,key);
1945 return lookupKey(db,key);
1946 }
1947
1948 static int deleteKey(redisDb *db, robj *key) {
1949 int retval;
1950
1951 /* We need to protect key from destruction: after the first dictDelete()
1952 * it may happen that 'key' is no longer valid if we don't increment
1953 * it's count. This may happen when we get the object reference directly
1954 * from the hash table with dictRandomKey() or dict iterators */
1955 incrRefCount(key);
1956 if (dictSize(db->expires)) dictDelete(db->expires,key);
1957 retval = dictDelete(db->dict,key);
1958 decrRefCount(key);
1959
1960 return retval == DICT_OK;
1961 }
1962
1963 /* Try to share an object against the shared objects pool */
1964 static robj *tryObjectSharing(robj *o) {
1965 struct dictEntry *de;
1966 unsigned long c;
1967
1968 if (o == NULL || server.shareobjects == 0) return o;
1969
1970 assert(o->type == REDIS_STRING);
1971 de = dictFind(server.sharingpool,o);
1972 if (de) {
1973 robj *shared = dictGetEntryKey(de);
1974
1975 c = ((unsigned long) dictGetEntryVal(de))+1;
1976 dictGetEntryVal(de) = (void*) c;
1977 incrRefCount(shared);
1978 decrRefCount(o);
1979 return shared;
1980 } else {
1981 /* Here we are using a stream algorihtm: Every time an object is
1982 * shared we increment its count, everytime there is a miss we
1983 * recrement the counter of a random object. If this object reaches
1984 * zero we remove the object and put the current object instead. */
1985 if (dictSize(server.sharingpool) >=
1986 server.sharingpoolsize) {
1987 de = dictGetRandomKey(server.sharingpool);
1988 assert(de != NULL);
1989 c = ((unsigned long) dictGetEntryVal(de))-1;
1990 dictGetEntryVal(de) = (void*) c;
1991 if (c == 0) {
1992 dictDelete(server.sharingpool,de->key);
1993 }
1994 } else {
1995 c = 0; /* If the pool is empty we want to add this object */
1996 }
1997 if (c == 0) {
1998 int retval;
1999
2000 retval = dictAdd(server.sharingpool,o,(void*)1);
2001 assert(retval == DICT_OK);
2002 incrRefCount(o);
2003 }
2004 return o;
2005 }
2006 }
2007
2008 /* Check if the nul-terminated string 's' can be represented by a long
2009 * (that is, is a number that fits into long without any other space or
2010 * character before or after the digits).
2011 *
2012 * If so, the function returns REDIS_OK and *longval is set to the value
2013 * of the number. Otherwise REDIS_ERR is returned */
2014 static int isStringRepresentableAsLong(sds s, long *longval) {
2015 char buf[32], *endptr;
2016 long value;
2017 int slen;
2018
2019 value = strtol(s, &endptr, 10);
2020 if (endptr[0] != '\0') return REDIS_ERR;
2021 slen = snprintf(buf,32,"%ld",value);
2022
2023 /* If the number converted back into a string is not identical
2024 * then it's not possible to encode the string as integer */
2025 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2026 if (longval) *longval = value;
2027 return REDIS_OK;
2028 }
2029
2030 /* Try to encode a string object in order to save space */
2031 static int tryObjectEncoding(robj *o) {
2032 long value;
2033 sds s = o->ptr;
2034
2035 if (o->encoding != REDIS_ENCODING_RAW)
2036 return REDIS_ERR; /* Already encoded */
2037
2038 /* It's not save to encode shared objects: shared objects can be shared
2039 * everywhere in the "object space" of Redis. Encoded objects can only
2040 * appear as "values" (and not, for instance, as keys) */
2041 if (o->refcount > 1) return REDIS_ERR;
2042
2043 /* Currently we try to encode only strings */
2044 assert(o->type == REDIS_STRING);
2045
2046 /* Check if we can represent this string as a long integer */
2047 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2048
2049 /* Ok, this object can be encoded */
2050 o->encoding = REDIS_ENCODING_INT;
2051 sdsfree(o->ptr);
2052 o->ptr = (void*) value;
2053 return REDIS_OK;
2054 }
2055
2056 /* Get a decoded version of an encoded object (returned as a new object) */
2057 static robj *getDecodedObject(const robj *o) {
2058 robj *dec;
2059
2060 assert(o->encoding != REDIS_ENCODING_RAW);
2061 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2062 char buf[32];
2063
2064 snprintf(buf,32,"%ld",(long)o->ptr);
2065 dec = createStringObject(buf,strlen(buf));
2066 return dec;
2067 } else {
2068 assert(1 != 1);
2069 }
2070 }
2071
2072 static int compareStringObjects(robj *a, robj *b) {
2073 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2074
2075 if (a == b) return 0;
2076 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2077 return (long)a->ptr - (long)b->ptr;
2078 } else {
2079 int retval;
2080
2081 incrRefCount(a);
2082 incrRefCount(b);
2083 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2084 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2085 retval = sdscmp(a->ptr,b->ptr);
2086 decrRefCount(a);
2087 decrRefCount(b);
2088 return retval;
2089 }
2090 }
2091
2092 static size_t stringObjectLen(robj *o) {
2093 assert(o->type == REDIS_STRING);
2094 if (o->encoding == REDIS_ENCODING_RAW) {
2095 return sdslen(o->ptr);
2096 } else {
2097 char buf[32];
2098
2099 return snprintf(buf,32,"%ld",(long)o->ptr);
2100 }
2101 }
2102
2103 /*============================ DB saving/loading ============================ */
2104
2105 static int rdbSaveType(FILE *fp, unsigned char type) {
2106 if (fwrite(&type,1,1,fp) == 0) return -1;
2107 return 0;
2108 }
2109
2110 static int rdbSaveTime(FILE *fp, time_t t) {
2111 int32_t t32 = (int32_t) t;
2112 if (fwrite(&t32,4,1,fp) == 0) return -1;
2113 return 0;
2114 }
2115
2116 /* check rdbLoadLen() comments for more info */
2117 static int rdbSaveLen(FILE *fp, uint32_t len) {
2118 unsigned char buf[2];
2119
2120 if (len < (1<<6)) {
2121 /* Save a 6 bit len */
2122 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2123 if (fwrite(buf,1,1,fp) == 0) return -1;
2124 } else if (len < (1<<14)) {
2125 /* Save a 14 bit len */
2126 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2127 buf[1] = len&0xFF;
2128 if (fwrite(buf,2,1,fp) == 0) return -1;
2129 } else {
2130 /* Save a 32 bit len */
2131 buf[0] = (REDIS_RDB_32BITLEN<<6);
2132 if (fwrite(buf,1,1,fp) == 0) return -1;
2133 len = htonl(len);
2134 if (fwrite(&len,4,1,fp) == 0) return -1;
2135 }
2136 return 0;
2137 }
2138
2139 /* String objects in the form "2391" "-100" without any space and with a
2140 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2141 * encoded as integers to save space */
2142 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2143 long long value;
2144 char *endptr, buf[32];
2145
2146 /* Check if it's possible to encode this value as a number */
2147 value = strtoll(s, &endptr, 10);
2148 if (endptr[0] != '\0') return 0;
2149 snprintf(buf,32,"%lld",value);
2150
2151 /* If the number converted back into a string is not identical
2152 * then it's not possible to encode the string as integer */
2153 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2154
2155 /* Finally check if it fits in our ranges */
2156 if (value >= -(1<<7) && value <= (1<<7)-1) {
2157 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2158 enc[1] = value&0xFF;
2159 return 2;
2160 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2161 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2162 enc[1] = value&0xFF;
2163 enc[2] = (value>>8)&0xFF;
2164 return 3;
2165 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2166 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2167 enc[1] = value&0xFF;
2168 enc[2] = (value>>8)&0xFF;
2169 enc[3] = (value>>16)&0xFF;
2170 enc[4] = (value>>24)&0xFF;
2171 return 5;
2172 } else {
2173 return 0;
2174 }
2175 }
2176
2177 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2178 unsigned int comprlen, outlen;
2179 unsigned char byte;
2180 void *out;
2181
2182 /* We require at least four bytes compression for this to be worth it */
2183 outlen = sdslen(obj->ptr)-4;
2184 if (outlen <= 0) return 0;
2185 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2186 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2187 if (comprlen == 0) {
2188 zfree(out);
2189 return 0;
2190 }
2191 /* Data compressed! Let's save it on disk */
2192 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2193 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2194 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2195 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2196 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2197 zfree(out);
2198 return comprlen;
2199
2200 writeerr:
2201 zfree(out);
2202 return -1;
2203 }
2204
2205 /* Save a string objet as [len][data] on disk. If the object is a string
2206 * representation of an integer value we try to safe it in a special form */
2207 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2208 size_t len;
2209 int enclen;
2210
2211 len = sdslen(obj->ptr);
2212
2213 /* Try integer encoding */
2214 if (len <= 11) {
2215 unsigned char buf[5];
2216 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2217 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2218 return 0;
2219 }
2220 }
2221
2222 /* Try LZF compression - under 20 bytes it's unable to compress even
2223 * aaaaaaaaaaaaaaaaaa so skip it */
2224 if (len > 20) {
2225 int retval;
2226
2227 retval = rdbSaveLzfStringObject(fp,obj);
2228 if (retval == -1) return -1;
2229 if (retval > 0) return 0;
2230 /* retval == 0 means data can't be compressed, save the old way */
2231 }
2232
2233 /* Store verbatim */
2234 if (rdbSaveLen(fp,len) == -1) return -1;
2235 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2236 return 0;
2237 }
2238
2239 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2240 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2241 int retval;
2242 robj *dec;
2243
2244 if (obj->encoding != REDIS_ENCODING_RAW) {
2245 dec = getDecodedObject(obj);
2246 retval = rdbSaveStringObjectRaw(fp,dec);
2247 decrRefCount(dec);
2248 return retval;
2249 } else {
2250 return rdbSaveStringObjectRaw(fp,obj);
2251 }
2252 }
2253
2254 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2255 * 8 bit integer specifing the length of the representation.
2256 * This 8 bit integer has special values in order to specify the following
2257 * conditions:
2258 * 253: not a number
2259 * 254: + inf
2260 * 255: - inf
2261 */
2262 static int rdbSaveDoubleValue(FILE *fp, double val) {
2263 unsigned char buf[128];
2264 int len;
2265
2266 if (isnan(val)) {
2267 buf[0] = 253;
2268 len = 1;
2269 } else if (!isfinite(val)) {
2270 len = 1;
2271 buf[0] = (val < 0) ? 255 : 254;
2272 } else {
2273 snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);
2274 buf[0] = strlen((char*)buf);
2275 len = buf[0]+1;
2276 }
2277 if (fwrite(buf,len,1,fp) == 0) return -1;
2278 return 0;
2279 }
2280
2281 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2282 static int rdbSave(char *filename) {
2283 dictIterator *di = NULL;
2284 dictEntry *de;
2285 FILE *fp;
2286 char tmpfile[256];
2287 int j;
2288 time_t now = time(NULL);
2289
2290 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2291 fp = fopen(tmpfile,"w");
2292 if (!fp) {
2293 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2294 return REDIS_ERR;
2295 }
2296 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2297 for (j = 0; j < server.dbnum; j++) {
2298 redisDb *db = server.db+j;
2299 dict *d = db->dict;
2300 if (dictSize(d) == 0) continue;
2301 di = dictGetIterator(d);
2302 if (!di) {
2303 fclose(fp);
2304 return REDIS_ERR;
2305 }
2306
2307 /* Write the SELECT DB opcode */
2308 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2309 if (rdbSaveLen(fp,j) == -1) goto werr;
2310
2311 /* Iterate this DB writing every entry */
2312 while((de = dictNext(di)) != NULL) {
2313 robj *key = dictGetEntryKey(de);
2314 robj *o = dictGetEntryVal(de);
2315 time_t expiretime = getExpire(db,key);
2316
2317 /* Save the expire time */
2318 if (expiretime != -1) {
2319 /* If this key is already expired skip it */
2320 if (expiretime < now) continue;
2321 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2322 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2323 }
2324 /* Save the key and associated value */
2325 if (rdbSaveType(fp,o->type) == -1) goto werr;
2326 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2327 if (o->type == REDIS_STRING) {
2328 /* Save a string value */
2329 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2330 } else if (o->type == REDIS_LIST) {
2331 /* Save a list value */
2332 list *list = o->ptr;
2333 listNode *ln;
2334
2335 listRewind(list);
2336 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2337 while((ln = listYield(list))) {
2338 robj *eleobj = listNodeValue(ln);
2339
2340 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2341 }
2342 } else if (o->type == REDIS_SET) {
2343 /* Save a set value */
2344 dict *set = o->ptr;
2345 dictIterator *di = dictGetIterator(set);
2346 dictEntry *de;
2347
2348 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2349 while((de = dictNext(di)) != NULL) {
2350 robj *eleobj = dictGetEntryKey(de);
2351
2352 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2353 }
2354 dictReleaseIterator(di);
2355 } else {
2356 assert(0 != 0);
2357 }
2358 }
2359 dictReleaseIterator(di);
2360 }
2361 /* EOF opcode */
2362 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2363
2364 /* Make sure data will not remain on the OS's output buffers */
2365 fflush(fp);
2366 fsync(fileno(fp));
2367 fclose(fp);
2368
2369 /* Use RENAME to make sure the DB file is changed atomically only
2370 * if the generate DB file is ok. */
2371 if (rename(tmpfile,filename) == -1) {
2372 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2373 unlink(tmpfile);
2374 return REDIS_ERR;
2375 }
2376 redisLog(REDIS_NOTICE,"DB saved on disk");
2377 server.dirty = 0;
2378 server.lastsave = time(NULL);
2379 return REDIS_OK;
2380
2381 werr:
2382 fclose(fp);
2383 unlink(tmpfile);
2384 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2385 if (di) dictReleaseIterator(di);
2386 return REDIS_ERR;
2387 }
2388
2389 static int rdbSaveBackground(char *filename) {
2390 pid_t childpid;
2391
2392 if (server.bgsaveinprogress) return REDIS_ERR;
2393 if ((childpid = fork()) == 0) {
2394 /* Child */
2395 close(server.fd);
2396 if (rdbSave(filename) == REDIS_OK) {
2397 exit(0);
2398 } else {
2399 exit(1);
2400 }
2401 } else {
2402 /* Parent */
2403 if (childpid == -1) {
2404 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2405 strerror(errno));
2406 return REDIS_ERR;
2407 }
2408 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2409 server.bgsaveinprogress = 1;
2410 server.bgsavechildpid = childpid;
2411 return REDIS_OK;
2412 }
2413 return REDIS_OK; /* unreached */
2414 }
2415
2416 static void rdbRemoveTempFile(pid_t childpid) {
2417 char tmpfile[256];
2418
2419 snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
2420 unlink(tmpfile);
2421 }
2422
2423 static int rdbLoadType(FILE *fp) {
2424 unsigned char type;
2425 if (fread(&type,1,1,fp) == 0) return -1;
2426 return type;
2427 }
2428
2429 static time_t rdbLoadTime(FILE *fp) {
2430 int32_t t32;
2431 if (fread(&t32,4,1,fp) == 0) return -1;
2432 return (time_t) t32;
2433 }
2434
2435 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2436 * of this file for a description of how this are stored on disk.
2437 *
2438 * isencoded is set to 1 if the readed length is not actually a length but
2439 * an "encoding type", check the above comments for more info */
2440 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2441 unsigned char buf[2];
2442 uint32_t len;
2443
2444 if (isencoded) *isencoded = 0;
2445 if (rdbver == 0) {
2446 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2447 return ntohl(len);
2448 } else {
2449 int type;
2450
2451 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2452 type = (buf[0]&0xC0)>>6;
2453 if (type == REDIS_RDB_6BITLEN) {
2454 /* Read a 6 bit len */
2455 return buf[0]&0x3F;
2456 } else if (type == REDIS_RDB_ENCVAL) {
2457 /* Read a 6 bit len encoding type */
2458 if (isencoded) *isencoded = 1;
2459 return buf[0]&0x3F;
2460 } else if (type == REDIS_RDB_14BITLEN) {
2461 /* Read a 14 bit len */
2462 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2463 return ((buf[0]&0x3F)<<8)|buf[1];
2464 } else {
2465 /* Read a 32 bit len */
2466 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2467 return ntohl(len);
2468 }
2469 }
2470 }
2471
2472 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2473 unsigned char enc[4];
2474 long long val;
2475
2476 if (enctype == REDIS_RDB_ENC_INT8) {
2477 if (fread(enc,1,1,fp) == 0) return NULL;
2478 val = (signed char)enc[0];
2479 } else if (enctype == REDIS_RDB_ENC_INT16) {
2480 uint16_t v;
2481 if (fread(enc,2,1,fp) == 0) return NULL;
2482 v = enc[0]|(enc[1]<<8);
2483 val = (int16_t)v;
2484 } else if (enctype == REDIS_RDB_ENC_INT32) {
2485 uint32_t v;
2486 if (fread(enc,4,1,fp) == 0) return NULL;
2487 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2488 val = (int32_t)v;
2489 } else {
2490 val = 0; /* anti-warning */
2491 assert(0!=0);
2492 }
2493 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2494 }
2495
2496 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2497 unsigned int len, clen;
2498 unsigned char *c = NULL;
2499 sds val = NULL;
2500
2501 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2502 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2503 if ((c = zmalloc(clen)) == NULL) goto err;
2504 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2505 if (fread(c,clen,1,fp) == 0) goto err;
2506 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2507 zfree(c);
2508 return createObject(REDIS_STRING,val);
2509 err:
2510 zfree(c);
2511 sdsfree(val);
2512 return NULL;
2513 }
2514
2515 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2516 int isencoded;
2517 uint32_t len;
2518 sds val;
2519
2520 len = rdbLoadLen(fp,rdbver,&isencoded);
2521 if (isencoded) {
2522 switch(len) {
2523 case REDIS_RDB_ENC_INT8:
2524 case REDIS_RDB_ENC_INT16:
2525 case REDIS_RDB_ENC_INT32:
2526 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2527 case REDIS_RDB_ENC_LZF:
2528 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2529 default:
2530 assert(0!=0);
2531 }
2532 }
2533
2534 if (len == REDIS_RDB_LENERR) return NULL;
2535 val = sdsnewlen(NULL,len);
2536 if (len && fread(val,len,1,fp) == 0) {
2537 sdsfree(val);
2538 return NULL;
2539 }
2540 return tryObjectSharing(createObject(REDIS_STRING,val));
2541 }
2542
2543 /* For information about double serialization check rdbSaveDoubleValue() */
2544 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2545 char buf[128];
2546 unsigned char len;
2547
2548 if (fread(&len,1,1,fp) == 0) return -1;
2549 switch(len) {
2550 case 255: *val = R_NegInf; return 0;
2551 case 254: *val = R_PosInf; return 0;
2552 case 253: *val = R_Nan; return 0;
2553 default:
2554 if (fread(buf,len,1,fp) == 0) return -1;
2555 sscanf(buf, "%lg", val);
2556 return 0;
2557 }
2558 }
2559
2560 static int rdbLoad(char *filename) {
2561 FILE *fp;
2562 robj *keyobj = NULL;
2563 uint32_t dbid;
2564 int type, retval, rdbver;
2565 dict *d = server.db[0].dict;
2566 redisDb *db = server.db+0;
2567 char buf[1024];
2568 time_t expiretime = -1, now = time(NULL);
2569
2570 fp = fopen(filename,"r");
2571 if (!fp) return REDIS_ERR;
2572 if (fread(buf,9,1,fp) == 0) goto eoferr;
2573 buf[9] = '\0';
2574 if (memcmp(buf,"REDIS",5) != 0) {
2575 fclose(fp);
2576 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2577 return REDIS_ERR;
2578 }
2579 rdbver = atoi(buf+5);
2580 if (rdbver > 1) {
2581 fclose(fp);
2582 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2583 return REDIS_ERR;
2584 }
2585 while(1) {
2586 robj *o;
2587
2588 /* Read type. */
2589 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2590 if (type == REDIS_EXPIRETIME) {
2591 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2592 /* We read the time so we need to read the object type again */
2593 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2594 }
2595 if (type == REDIS_EOF) break;
2596 /* Handle SELECT DB opcode as a special case */
2597 if (type == REDIS_SELECTDB) {
2598 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2599 goto eoferr;
2600 if (dbid >= (unsigned)server.dbnum) {
2601 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2602 exit(1);
2603 }
2604 db = server.db+dbid;
2605 d = db->dict;
2606 continue;
2607 }
2608 /* Read key */
2609 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2610
2611 if (type == REDIS_STRING) {
2612 /* Read string value */
2613 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2614 tryObjectEncoding(o);
2615 } else if (type == REDIS_LIST || type == REDIS_SET) {
2616 /* Read list/set value */
2617 uint32_t listlen;
2618
2619 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2620 goto eoferr;
2621 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2622 /* Load every single element of the list/set */
2623 while(listlen--) {
2624 robj *ele;
2625
2626 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2627 tryObjectEncoding(ele);
2628 if (type == REDIS_LIST) {
2629 listAddNodeTail((list*)o->ptr,ele);
2630 } else {
2631 dictAdd((dict*)o->ptr,ele,NULL);
2632 }
2633 }
2634 } else {
2635 assert(0 != 0);
2636 }
2637 /* Add the new object in the hash table */
2638 retval = dictAdd(d,keyobj,o);
2639 if (retval == DICT_ERR) {
2640 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2641 exit(1);
2642 }
2643 /* Set the expire time if needed */
2644 if (expiretime != -1) {
2645 setExpire(db,keyobj,expiretime);
2646 /* Delete this key if already expired */
2647 if (expiretime < now) deleteKey(db,keyobj);
2648 expiretime = -1;
2649 }
2650 keyobj = o = NULL;
2651 }
2652 fclose(fp);
2653 return REDIS_OK;
2654
2655 eoferr: /* unexpected end of file is handled here with a fatal exit */
2656 if (keyobj) decrRefCount(keyobj);
2657 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2658 exit(1);
2659 return REDIS_ERR; /* Just to avoid warning */
2660 }
2661
2662 /*================================== Commands =============================== */
2663
2664 static void authCommand(redisClient *c) {
2665 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2666 c->authenticated = 1;
2667 addReply(c,shared.ok);
2668 } else {
2669 c->authenticated = 0;
2670 addReply(c,shared.err);
2671 }
2672 }
2673
2674 static void pingCommand(redisClient *c) {
2675 addReply(c,shared.pong);
2676 }
2677
2678 static void echoCommand(redisClient *c) {
2679 addReplyBulkLen(c,c->argv[1]);
2680 addReply(c,c->argv[1]);
2681 addReply(c,shared.crlf);
2682 }
2683
2684 /*=================================== Strings =============================== */
2685
2686 static void setGenericCommand(redisClient *c, int nx) {
2687 int retval;
2688
2689 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2690 if (retval == DICT_ERR) {
2691 if (!nx) {
2692 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2693 incrRefCount(c->argv[2]);
2694 } else {
2695 addReply(c,shared.czero);
2696 return;
2697 }
2698 } else {
2699 incrRefCount(c->argv[1]);
2700 incrRefCount(c->argv[2]);
2701 }
2702 server.dirty++;
2703 removeExpire(c->db,c->argv[1]);
2704 addReply(c, nx ? shared.cone : shared.ok);
2705 }
2706
2707 static void setCommand(redisClient *c) {
2708 setGenericCommand(c,0);
2709 }
2710
2711 static void setnxCommand(redisClient *c) {
2712 setGenericCommand(c,1);
2713 }
2714
2715 static void getCommand(redisClient *c) {
2716 robj *o = lookupKeyRead(c->db,c->argv[1]);
2717
2718 if (o == NULL) {
2719 addReply(c,shared.nullbulk);
2720 } else {
2721 if (o->type != REDIS_STRING) {
2722 addReply(c,shared.wrongtypeerr);
2723 } else {
2724 addReplyBulkLen(c,o);
2725 addReply(c,o);
2726 addReply(c,shared.crlf);
2727 }
2728 }
2729 }
2730
2731 static void getsetCommand(redisClient *c) {
2732 getCommand(c);
2733 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2734 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2735 } else {
2736 incrRefCount(c->argv[1]);
2737 }
2738 incrRefCount(c->argv[2]);
2739 server.dirty++;
2740 removeExpire(c->db,c->argv[1]);
2741 }
2742
2743 static void mgetCommand(redisClient *c) {
2744 int j;
2745
2746 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2747 for (j = 1; j < c->argc; j++) {
2748 robj *o = lookupKeyRead(c->db,c->argv[j]);
2749 if (o == NULL) {
2750 addReply(c,shared.nullbulk);
2751 } else {
2752 if (o->type != REDIS_STRING) {
2753 addReply(c,shared.nullbulk);
2754 } else {
2755 addReplyBulkLen(c,o);
2756 addReply(c,o);
2757 addReply(c,shared.crlf);
2758 }
2759 }
2760 }
2761 }
2762
2763 static void incrDecrCommand(redisClient *c, long long incr) {
2764 long long value;
2765 int retval;
2766 robj *o;
2767
2768 o = lookupKeyWrite(c->db,c->argv[1]);
2769 if (o == NULL) {
2770 value = 0;
2771 } else {
2772 if (o->type != REDIS_STRING) {
2773 value = 0;
2774 } else {
2775 char *eptr;
2776
2777 if (o->encoding == REDIS_ENCODING_RAW)
2778 value = strtoll(o->ptr, &eptr, 10);
2779 else if (o->encoding == REDIS_ENCODING_INT)
2780 value = (long)o->ptr;
2781 else
2782 assert(1 != 1);
2783 }
2784 }
2785
2786 value += incr;
2787 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2788 tryObjectEncoding(o);
2789 retval = dictAdd(c->db->dict,c->argv[1],o);
2790 if (retval == DICT_ERR) {
2791 dictReplace(c->db->dict,c->argv[1],o);
2792 removeExpire(c->db,c->argv[1]);
2793 } else {
2794 incrRefCount(c->argv[1]);
2795 }
2796 server.dirty++;
2797 addReply(c,shared.colon);
2798 addReply(c,o);
2799 addReply(c,shared.crlf);
2800 }
2801
2802 static void incrCommand(redisClient *c) {
2803 incrDecrCommand(c,1);
2804 }
2805
2806 static void decrCommand(redisClient *c) {
2807 incrDecrCommand(c,-1);
2808 }
2809
2810 static void incrbyCommand(redisClient *c) {
2811 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2812 incrDecrCommand(c,incr);
2813 }
2814
2815 static void decrbyCommand(redisClient *c) {
2816 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2817 incrDecrCommand(c,-incr);
2818 }
2819
2820 /* ========================= Type agnostic commands ========================= */
2821
2822 static void delCommand(redisClient *c) {
2823 int deleted = 0, j;
2824
2825 for (j = 1; j < c->argc; j++) {
2826 if (deleteKey(c->db,c->argv[j])) {
2827 server.dirty++;
2828 deleted++;
2829 }
2830 }
2831 switch(deleted) {
2832 case 0:
2833 addReply(c,shared.czero);
2834 break;
2835 case 1:
2836 addReply(c,shared.cone);
2837 break;
2838 default:
2839 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2840 break;
2841 }
2842 }
2843
2844 static void existsCommand(redisClient *c) {
2845 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2846 }
2847
2848 static void selectCommand(redisClient *c) {
2849 int id = atoi(c->argv[1]->ptr);
2850
2851 if (selectDb(c,id) == REDIS_ERR) {
2852 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2853 } else {
2854 addReply(c,shared.ok);
2855 }
2856 }
2857
2858 static void randomkeyCommand(redisClient *c) {
2859 dictEntry *de;
2860
2861 while(1) {
2862 de = dictGetRandomKey(c->db->dict);
2863 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2864 }
2865 if (de == NULL) {
2866 addReply(c,shared.plus);
2867 addReply(c,shared.crlf);
2868 } else {
2869 addReply(c,shared.plus);
2870 addReply(c,dictGetEntryKey(de));
2871 addReply(c,shared.crlf);
2872 }
2873 }
2874
2875 static void keysCommand(redisClient *c) {
2876 dictIterator *di;
2877 dictEntry *de;
2878 sds pattern = c->argv[1]->ptr;
2879 int plen = sdslen(pattern);
2880 int numkeys = 0, keyslen = 0;
2881 robj *lenobj = createObject(REDIS_STRING,NULL);
2882
2883 di = dictGetIterator(c->db->dict);
2884 addReply(c,lenobj);
2885 decrRefCount(lenobj);
2886 while((de = dictNext(di)) != NULL) {
2887 robj *keyobj = dictGetEntryKey(de);
2888
2889 sds key = keyobj->ptr;
2890 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2891 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2892 if (expireIfNeeded(c->db,keyobj) == 0) {
2893 if (numkeys != 0)
2894 addReply(c,shared.space);
2895 addReply(c,keyobj);
2896 numkeys++;
2897 keyslen += sdslen(key);
2898 }
2899 }
2900 }
2901 dictReleaseIterator(di);
2902 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2903 addReply(c,shared.crlf);
2904 }
2905
2906 static void dbsizeCommand(redisClient *c) {
2907 addReplySds(c,
2908 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2909 }
2910
2911 static void lastsaveCommand(redisClient *c) {
2912 addReplySds(c,
2913 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2914 }
2915
2916 static void typeCommand(redisClient *c) {
2917 robj *o;
2918 char *type;
2919
2920 o = lookupKeyRead(c->db,c->argv[1]);
2921 if (o == NULL) {
2922 type = "+none";
2923 } else {
2924 switch(o->type) {
2925 case REDIS_STRING: type = "+string"; break;
2926 case REDIS_LIST: type = "+list"; break;
2927 case REDIS_SET: type = "+set"; break;
2928 default: type = "unknown"; break;
2929 }
2930 }
2931 addReplySds(c,sdsnew(type));
2932 addReply(c,shared.crlf);
2933 }
2934
2935 static void saveCommand(redisClient *c) {
2936 if (server.bgsaveinprogress) {
2937 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2938 return;
2939 }
2940 if (rdbSave(server.dbfilename) == REDIS_OK) {
2941 addReply(c,shared.ok);
2942 } else {
2943 addReply(c,shared.err);
2944 }
2945 }
2946
2947 static void bgsaveCommand(redisClient *c) {
2948 if (server.bgsaveinprogress) {
2949 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2950 return;
2951 }
2952 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2953 addReply(c,shared.ok);
2954 } else {
2955 addReply(c,shared.err);
2956 }
2957 }
2958
2959 static void shutdownCommand(redisClient *c) {
2960 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2961 /* Kill the saving child if there is a background saving in progress.
2962 We want to avoid race conditions, for instance our saving child may
2963 overwrite the synchronous saving did by SHUTDOWN. */
2964 if (server.bgsaveinprogress) {
2965 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
2966 kill(server.bgsavechildpid,SIGKILL);
2967 rdbRemoveTempFile(server.bgsavechildpid);
2968 }
2969 /* SYNC SAVE */
2970 if (rdbSave(server.dbfilename) == REDIS_OK) {
2971 if (server.daemonize)
2972 unlink(server.pidfile);
2973 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2974 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2975 exit(1);
2976 } else {
2977 /* Ooops.. error saving! The best we can do is to continue operating.
2978 * Note that if there was a background saving process, in the next
2979 * cron() Redis will be notified that the background saving aborted,
2980 * handling special stuff like slaves pending for synchronization... */
2981 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2982 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2983 }
2984 }
2985
2986 static void renameGenericCommand(redisClient *c, int nx) {
2987 robj *o;
2988
2989 /* To use the same key as src and dst is probably an error */
2990 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2991 addReply(c,shared.sameobjecterr);
2992 return;
2993 }
2994
2995 o = lookupKeyWrite(c->db,c->argv[1]);
2996 if (o == NULL) {
2997 addReply(c,shared.nokeyerr);
2998 return;
2999 }
3000 incrRefCount(o);
3001 deleteIfVolatile(c->db,c->argv[2]);
3002 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3003 if (nx) {
3004 decrRefCount(o);
3005 addReply(c,shared.czero);
3006 return;
3007 }
3008 dictReplace(c->db->dict,c->argv[2],o);
3009 } else {
3010 incrRefCount(c->argv[2]);
3011 }
3012 deleteKey(c->db,c->argv[1]);
3013 server.dirty++;
3014 addReply(c,nx ? shared.cone : shared.ok);
3015 }
3016
3017 static void renameCommand(redisClient *c) {
3018 renameGenericCommand(c,0);
3019 }
3020
3021 static void renamenxCommand(redisClient *c) {
3022 renameGenericCommand(c,1);
3023 }
3024
3025 static void moveCommand(redisClient *c) {
3026 robj *o;
3027 redisDb *src, *dst;
3028 int srcid;
3029
3030 /* Obtain source and target DB pointers */
3031 src = c->db;
3032 srcid = c->db->id;
3033 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3034 addReply(c,shared.outofrangeerr);
3035 return;
3036 }
3037 dst = c->db;
3038 selectDb(c,srcid); /* Back to the source DB */
3039
3040 /* If the user is moving using as target the same
3041 * DB as the source DB it is probably an error. */
3042 if (src == dst) {
3043 addReply(c,shared.sameobjecterr);
3044 return;
3045 }
3046
3047 /* Check if the element exists and get a reference */
3048 o = lookupKeyWrite(c->db,c->argv[1]);
3049 if (!o) {
3050 addReply(c,shared.czero);
3051 return;
3052 }
3053
3054 /* Try to add the element to the target DB */
3055 deleteIfVolatile(dst,c->argv[1]);
3056 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3057 addReply(c,shared.czero);
3058 return;
3059 }
3060 incrRefCount(c->argv[1]);
3061 incrRefCount(o);
3062
3063 /* OK! key moved, free the entry in the source DB */
3064 deleteKey(src,c->argv[1]);
3065 server.dirty++;
3066 addReply(c,shared.cone);
3067 }
3068
3069 /* =================================== Lists ================================ */
3070 static void pushGenericCommand(redisClient *c, int where) {
3071 robj *lobj;
3072 list *list;
3073
3074 lobj = lookupKeyWrite(c->db,c->argv[1]);
3075 if (lobj == NULL) {
3076 lobj = createListObject();
3077 list = lobj->ptr;
3078 if (where == REDIS_HEAD) {
3079 listAddNodeHead(list,c->argv[2]);
3080 } else {
3081 listAddNodeTail(list,c->argv[2]);
3082 }
3083 dictAdd(c->db->dict,c->argv[1],lobj);
3084 incrRefCount(c->argv[1]);
3085 incrRefCount(c->argv[2]);
3086 } else {
3087 if (lobj->type != REDIS_LIST) {
3088 addReply(c,shared.wrongtypeerr);
3089 return;
3090 }
3091 list = lobj->ptr;
3092 if (where == REDIS_HEAD) {
3093 listAddNodeHead(list,c->argv[2]);
3094 } else {
3095 listAddNodeTail(list,c->argv[2]);
3096 }
3097 incrRefCount(c->argv[2]);
3098 }
3099 server.dirty++;
3100 addReply(c,shared.ok);
3101 }
3102
3103 static void lpushCommand(redisClient *c) {
3104 pushGenericCommand(c,REDIS_HEAD);
3105 }
3106
3107 static void rpushCommand(redisClient *c) {
3108 pushGenericCommand(c,REDIS_TAIL);
3109 }
3110
3111 static void llenCommand(redisClient *c) {
3112 robj *o;
3113 list *l;
3114
3115 o = lookupKeyRead(c->db,c->argv[1]);
3116 if (o == NULL) {
3117 addReply(c,shared.czero);
3118 return;
3119 } else {
3120 if (o->type != REDIS_LIST) {
3121 addReply(c,shared.wrongtypeerr);
3122 } else {
3123 l = o->ptr;
3124 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3125 }
3126 }
3127 }
3128
3129 static void lindexCommand(redisClient *c) {
3130 robj *o;
3131 int index = atoi(c->argv[2]->ptr);
3132
3133 o = lookupKeyRead(c->db,c->argv[1]);
3134 if (o == NULL) {
3135 addReply(c,shared.nullbulk);
3136 } else {
3137 if (o->type != REDIS_LIST) {
3138 addReply(c,shared.wrongtypeerr);
3139 } else {
3140 list *list = o->ptr;
3141 listNode *ln;
3142
3143 ln = listIndex(list, index);
3144 if (ln == NULL) {
3145 addReply(c,shared.nullbulk);
3146 } else {
3147 robj *ele = listNodeValue(ln);
3148 addReplyBulkLen(c,ele);
3149 addReply(c,ele);
3150 addReply(c,shared.crlf);
3151 }
3152 }
3153 }
3154 }
3155
3156 static void lsetCommand(redisClient *c) {
3157 robj *o;
3158 int index = atoi(c->argv[2]->ptr);
3159
3160 o = lookupKeyWrite(c->db,c->argv[1]);
3161 if (o == NULL) {
3162 addReply(c,shared.nokeyerr);
3163 } else {
3164 if (o->type != REDIS_LIST) {
3165 addReply(c,shared.wrongtypeerr);
3166 } else {
3167 list *list = o->ptr;
3168 listNode *ln;
3169
3170 ln = listIndex(list, index);
3171 if (ln == NULL) {
3172 addReply(c,shared.outofrangeerr);
3173 } else {
3174 robj *ele = listNodeValue(ln);
3175
3176 decrRefCount(ele);
3177 listNodeValue(ln) = c->argv[3];
3178 incrRefCount(c->argv[3]);
3179 addReply(c,shared.ok);
3180 server.dirty++;
3181 }
3182 }
3183 }
3184 }
3185
3186 static void popGenericCommand(redisClient *c, int where) {
3187 robj *o;
3188
3189 o = lookupKeyWrite(c->db,c->argv[1]);
3190 if (o == NULL) {
3191 addReply(c,shared.nullbulk);
3192 } else {
3193 if (o->type != REDIS_LIST) {
3194 addReply(c,shared.wrongtypeerr);
3195 } else {
3196 list *list = o->ptr;
3197 listNode *ln;
3198
3199 if (where == REDIS_HEAD)
3200 ln = listFirst(list);
3201 else
3202 ln = listLast(list);
3203
3204 if (ln == NULL) {
3205 addReply(c,shared.nullbulk);
3206 } else {
3207 robj *ele = listNodeValue(ln);
3208 addReplyBulkLen(c,ele);
3209 addReply(c,ele);
3210 addReply(c,shared.crlf);
3211 listDelNode(list,ln);
3212 server.dirty++;
3213 }
3214 }
3215 }
3216 }
3217
3218 static void lpopCommand(redisClient *c) {
3219 popGenericCommand(c,REDIS_HEAD);
3220 }
3221
3222 static void rpopCommand(redisClient *c) {
3223 popGenericCommand(c,REDIS_TAIL);
3224 }
3225
3226 static void lrangeCommand(redisClient *c) {
3227 robj *o;
3228 int start = atoi(c->argv[2]->ptr);
3229 int end = atoi(c->argv[3]->ptr);
3230
3231 o = lookupKeyRead(c->db,c->argv[1]);
3232 if (o == NULL) {
3233 addReply(c,shared.nullmultibulk);
3234 } else {
3235 if (o->type != REDIS_LIST) {
3236 addReply(c,shared.wrongtypeerr);
3237 } else {
3238 list *list = o->ptr;
3239 listNode *ln;
3240 int llen = listLength(list);
3241 int rangelen, j;
3242 robj *ele;
3243
3244 /* convert negative indexes */
3245 if (start < 0) start = llen+start;
3246 if (end < 0) end = llen+end;
3247 if (start < 0) start = 0;
3248 if (end < 0) end = 0;
3249
3250 /* indexes sanity checks */
3251 if (start > end || start >= llen) {
3252 /* Out of range start or start > end result in empty list */
3253 addReply(c,shared.emptymultibulk);
3254 return;
3255 }
3256 if (end >= llen) end = llen-1;
3257 rangelen = (end-start)+1;
3258
3259 /* Return the result in form of a multi-bulk reply */
3260 ln = listIndex(list, start);
3261 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3262 for (j = 0; j < rangelen; j++) {
3263 ele = listNodeValue(ln);
3264 addReplyBulkLen(c,ele);
3265 addReply(c,ele);
3266 addReply(c,shared.crlf);
3267 ln = ln->next;
3268 }
3269 }
3270 }
3271 }
3272
3273 static void ltrimCommand(redisClient *c) {
3274 robj *o;
3275 int start = atoi(c->argv[2]->ptr);
3276 int end = atoi(c->argv[3]->ptr);
3277
3278 o = lookupKeyWrite(c->db,c->argv[1]);
3279 if (o == NULL) {
3280 addReply(c,shared.nokeyerr);
3281 } else {
3282 if (o->type != REDIS_LIST) {
3283 addReply(c,shared.wrongtypeerr);
3284 } else {
3285 list *list = o->ptr;
3286 listNode *ln;
3287 int llen = listLength(list);
3288 int j, ltrim, rtrim;
3289
3290 /* convert negative indexes */
3291 if (start < 0) start = llen+start;
3292 if (end < 0) end = llen+end;
3293 if (start < 0) start = 0;
3294 if (end < 0) end = 0;
3295
3296 /* indexes sanity checks */
3297 if (start > end || start >= llen) {
3298 /* Out of range start or start > end result in empty list */
3299 ltrim = llen;
3300 rtrim = 0;
3301 } else {
3302 if (end >= llen) end = llen-1;
3303 ltrim = start;
3304 rtrim = llen-end-1;
3305 }
3306
3307 /* Remove list elements to perform the trim */
3308 for (j = 0; j < ltrim; j++) {
3309 ln = listFirst(list);
3310 listDelNode(list,ln);
3311 }
3312 for (j = 0; j < rtrim; j++) {
3313 ln = listLast(list);
3314 listDelNode(list,ln);
3315 }
3316 server.dirty++;
3317 addReply(c,shared.ok);
3318 }
3319 }
3320 }
3321
3322 static void lremCommand(redisClient *c) {
3323 robj *o;
3324
3325 o = lookupKeyWrite(c->db,c->argv[1]);
3326 if (o == NULL) {
3327 addReply(c,shared.czero);
3328 } else {
3329 if (o->type != REDIS_LIST) {
3330 addReply(c,shared.wrongtypeerr);
3331 } else {
3332 list *list = o->ptr;
3333 listNode *ln, *next;
3334 int toremove = atoi(c->argv[2]->ptr);
3335 int removed = 0;
3336 int fromtail = 0;
3337
3338 if (toremove < 0) {
3339 toremove = -toremove;
3340 fromtail = 1;
3341 }
3342 ln = fromtail ? list->tail : list->head;
3343 while (ln) {
3344 robj *ele = listNodeValue(ln);
3345
3346 next = fromtail ? ln->prev : ln->next;
3347 if (compareStringObjects(ele,c->argv[3]) == 0) {
3348 listDelNode(list,ln);
3349 server.dirty++;
3350 removed++;
3351 if (toremove && removed == toremove) break;
3352 }
3353 ln = next;
3354 }
3355 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3356 }
3357 }
3358 }
3359
3360 /* ==================================== Sets ================================ */
3361
3362 static void saddCommand(redisClient *c) {
3363 robj *set;
3364
3365 set = lookupKeyWrite(c->db,c->argv[1]);
3366 if (set == NULL) {
3367 set = createSetObject();
3368 dictAdd(c->db->dict,c->argv[1],set);
3369 incrRefCount(c->argv[1]);
3370 } else {
3371 if (set->type != REDIS_SET) {
3372 addReply(c,shared.wrongtypeerr);
3373 return;
3374 }
3375 }
3376 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3377 incrRefCount(c->argv[2]);
3378 server.dirty++;
3379 addReply(c,shared.cone);
3380 } else {
3381 addReply(c,shared.czero);
3382 }
3383 }
3384
3385 static void sremCommand(redisClient *c) {
3386 robj *set;
3387
3388 set = lookupKeyWrite(c->db,c->argv[1]);
3389 if (set == NULL) {
3390 addReply(c,shared.czero);
3391 } else {
3392 if (set->type != REDIS_SET) {
3393 addReply(c,shared.wrongtypeerr);
3394 return;
3395 }
3396 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3397 server.dirty++;
3398 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3399 addReply(c,shared.cone);
3400 } else {
3401 addReply(c,shared.czero);
3402 }
3403 }
3404 }
3405
3406 static void smoveCommand(redisClient *c) {
3407 robj *srcset, *dstset;
3408
3409 srcset = lookupKeyWrite(c->db,c->argv[1]);
3410 dstset = lookupKeyWrite(c->db,c->argv[2]);
3411
3412 /* If the source key does not exist return 0, if it's of the wrong type
3413 * raise an error */
3414 if (srcset == NULL || srcset->type != REDIS_SET) {
3415 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3416 return;
3417 }
3418 /* Error if the destination key is not a set as well */
3419 if (dstset && dstset->type != REDIS_SET) {
3420 addReply(c,shared.wrongtypeerr);
3421 return;
3422 }
3423 /* Remove the element from the source set */
3424 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3425 /* Key not found in the src set! return zero */
3426 addReply(c,shared.czero);
3427 return;
3428 }
3429 server.dirty++;
3430 /* Add the element to the destination set */
3431 if (!dstset) {
3432 dstset = createSetObject();
3433 dictAdd(c->db->dict,c->argv[2],dstset);
3434 incrRefCount(c->argv[2]);
3435 }
3436 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3437 incrRefCount(c->argv[3]);
3438 addReply(c,shared.cone);
3439 }
3440
3441 static void sismemberCommand(redisClient *c) {
3442 robj *set;
3443
3444 set = lookupKeyRead(c->db,c->argv[1]);
3445 if (set == NULL) {
3446 addReply(c,shared.czero);
3447 } else {
3448 if (set->type != REDIS_SET) {
3449 addReply(c,shared.wrongtypeerr);
3450 return;
3451 }
3452 if (dictFind(set->ptr,c->argv[2]))
3453 addReply(c,shared.cone);
3454 else
3455 addReply(c,shared.czero);
3456 }
3457 }
3458
3459 static void scardCommand(redisClient *c) {
3460 robj *o;
3461 dict *s;
3462
3463 o = lookupKeyRead(c->db,c->argv[1]);
3464 if (o == NULL) {
3465 addReply(c,shared.czero);
3466 return;
3467 } else {
3468 if (o->type != REDIS_SET) {
3469 addReply(c,shared.wrongtypeerr);
3470 } else {
3471 s = o->ptr;
3472 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3473 dictSize(s)));
3474 }
3475 }
3476 }
3477
3478 static void spopCommand(redisClient *c) {
3479 robj *set;
3480 dictEntry *de;
3481
3482 set = lookupKeyWrite(c->db,c->argv[1]);
3483 if (set == NULL) {
3484 addReply(c,shared.nullbulk);
3485 } else {
3486 if (set->type != REDIS_SET) {
3487 addReply(c,shared.wrongtypeerr);
3488 return;
3489 }
3490 de = dictGetRandomKey(set->ptr);
3491 if (de == NULL) {
3492 addReply(c,shared.nullbulk);
3493 } else {
3494 robj *ele = dictGetEntryKey(de);
3495
3496 addReplyBulkLen(c,ele);
3497 addReply(c,ele);
3498 addReply(c,shared.crlf);
3499 dictDelete(set->ptr,ele);
3500 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3501 server.dirty++;
3502 }
3503 }
3504 }
3505
3506 static void srandmemberCommand(redisClient *c) {
3507 robj *set;
3508 dictEntry *de;
3509
3510 set = lookupKeyRead(c->db,c->argv[1]);
3511 if (set == NULL) {
3512 addReply(c,shared.nullbulk);
3513 } else {
3514 if (set->type != REDIS_SET) {
3515 addReply(c,shared.wrongtypeerr);
3516 return;
3517 }
3518 de = dictGetRandomKey(set->ptr);
3519 if (de == NULL) {
3520 addReply(c,shared.nullbulk);
3521 } else {
3522 robj *ele = dictGetEntryKey(de);
3523
3524 addReplyBulkLen(c,ele);
3525 addReply(c,ele);
3526 addReply(c,shared.crlf);
3527 }
3528 }
3529 }
3530
3531 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3532 dict **d1 = (void*) s1, **d2 = (void*) s2;
3533
3534 return dictSize(*d1)-dictSize(*d2);
3535 }
3536
3537 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3538 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3539 dictIterator *di;
3540 dictEntry *de;
3541 robj *lenobj = NULL, *dstset = NULL;
3542 int j, cardinality = 0;
3543
3544 for (j = 0; j < setsnum; j++) {
3545 robj *setobj;
3546
3547 setobj = dstkey ?
3548 lookupKeyWrite(c->db,setskeys[j]) :
3549 lookupKeyRead(c->db,setskeys[j]);
3550 if (!setobj) {
3551 zfree(dv);
3552 if (dstkey) {
3553 deleteKey(c->db,dstkey);
3554 addReply(c,shared.ok);
3555 } else {
3556 addReply(c,shared.nullmultibulk);
3557 }
3558 return;
3559 }
3560 if (setobj->type != REDIS_SET) {
3561 zfree(dv);
3562 addReply(c,shared.wrongtypeerr);
3563 return;
3564 }
3565 dv[j] = setobj->ptr;
3566 }
3567 /* Sort sets from the smallest to largest, this will improve our
3568 * algorithm's performace */
3569 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3570
3571 /* The first thing we should output is the total number of elements...
3572 * since this is a multi-bulk write, but at this stage we don't know
3573 * the intersection set size, so we use a trick, append an empty object
3574 * to the output list and save the pointer to later modify it with the
3575 * right length */
3576 if (!dstkey) {
3577 lenobj = createObject(REDIS_STRING,NULL);
3578 addReply(c,lenobj);
3579 decrRefCount(lenobj);
3580 } else {
3581 /* If we have a target key where to store the resulting set
3582 * create this key with an empty set inside */
3583 dstset = createSetObject();
3584 }
3585
3586 /* Iterate all the elements of the first (smallest) set, and test
3587 * the element against all the other sets, if at least one set does
3588 * not include the element it is discarded */
3589 di = dictGetIterator(dv[0]);
3590
3591 while((de = dictNext(di)) != NULL) {
3592 robj *ele;
3593
3594 for (j = 1; j < setsnum; j++)
3595 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3596 if (j != setsnum)
3597 continue; /* at least one set does not contain the member */
3598 ele = dictGetEntryKey(de);
3599 if (!dstkey) {
3600 addReplyBulkLen(c,ele);
3601 addReply(c,ele);
3602 addReply(c,shared.crlf);
3603 cardinality++;
3604 } else {
3605 dictAdd(dstset->ptr,ele,NULL);
3606 incrRefCount(ele);
3607 }
3608 }
3609 dictReleaseIterator(di);
3610
3611 if (dstkey) {
3612 /* Store the resulting set into the target */
3613 deleteKey(c->db,dstkey);
3614 dictAdd(c->db->dict,dstkey,dstset);
3615 incrRefCount(dstkey);
3616 }
3617
3618 if (!dstkey) {
3619 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3620 } else {
3621 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3622 dictSize((dict*)dstset->ptr)));
3623 server.dirty++;
3624 }
3625 zfree(dv);
3626 }
3627
3628 static void sinterCommand(redisClient *c) {
3629 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3630 }
3631
3632 static void sinterstoreCommand(redisClient *c) {
3633 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3634 }
3635
3636 #define REDIS_OP_UNION 0
3637 #define REDIS_OP_DIFF 1
3638
3639 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3640 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3641 dictIterator *di;
3642 dictEntry *de;
3643 robj *dstset = NULL;
3644 int j, cardinality = 0;
3645
3646 for (j = 0; j < setsnum; j++) {
3647 robj *setobj;
3648
3649 setobj = dstkey ?
3650 lookupKeyWrite(c->db,setskeys[j]) :
3651 lookupKeyRead(c->db,setskeys[j]);
3652 if (!setobj) {
3653 dv[j] = NULL;
3654 continue;
3655 }
3656 if (setobj->type != REDIS_SET) {
3657 zfree(dv);
3658 addReply(c,shared.wrongtypeerr);
3659 return;
3660 }
3661 dv[j] = setobj->ptr;
3662 }
3663
3664 /* We need a temp set object to store our union. If the dstkey
3665 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3666 * this set object will be the resulting object to set into the target key*/
3667 dstset = createSetObject();
3668
3669 /* Iterate all the elements of all the sets, add every element a single
3670 * time to the result set */
3671 for (j = 0; j < setsnum; j++) {
3672 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3673 if (!dv[j]) continue; /* non existing keys are like empty sets */
3674
3675 di = dictGetIterator(dv[j]);
3676
3677 while((de = dictNext(di)) != NULL) {
3678 robj *ele;
3679
3680 /* dictAdd will not add the same element multiple times */
3681 ele = dictGetEntryKey(de);
3682 if (op == REDIS_OP_UNION || j == 0) {
3683 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3684 incrRefCount(ele);
3685 cardinality++;
3686 }
3687 } else if (op == REDIS_OP_DIFF) {
3688 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3689 cardinality--;
3690 }
3691 }
3692 }
3693 dictReleaseIterator(di);
3694
3695 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3696 }
3697
3698 /* Output the content of the resulting set, if not in STORE mode */
3699 if (!dstkey) {
3700 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3701 di = dictGetIterator(dstset->ptr);
3702 while((de = dictNext(di)) != NULL) {
3703 robj *ele;
3704
3705 ele = dictGetEntryKey(de);
3706 addReplyBulkLen(c,ele);
3707 addReply(c,ele);
3708 addReply(c,shared.crlf);
3709 }
3710 dictReleaseIterator(di);
3711 } else {
3712 /* If we have a target key where to store the resulting set
3713 * create this key with the result set inside */
3714 deleteKey(c->db,dstkey);
3715 dictAdd(c->db->dict,dstkey,dstset);
3716 incrRefCount(dstkey);
3717 }
3718
3719 /* Cleanup */
3720 if (!dstkey) {
3721 decrRefCount(dstset);
3722 } else {
3723 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3724 dictSize((dict*)dstset->ptr)));
3725 server.dirty++;
3726 }
3727 zfree(dv);
3728 }
3729
3730 static void sunionCommand(redisClient *c) {
3731 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3732 }
3733
3734 static void sunionstoreCommand(redisClient *c) {
3735 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3736 }
3737
3738 static void sdiffCommand(redisClient *c) {
3739 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3740 }
3741
3742 static void sdiffstoreCommand(redisClient *c) {
3743 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3744 }
3745
3746 /* ==================================== ZSets =============================== */
3747
3748 /* ZSETs are ordered sets using two data structures to hold the same elements
3749 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3750 * data structure.
3751 *
3752 * The elements are added to an hash table mapping Redis objects to scores.
3753 * At the same time the elements are added to a skip list mapping scores
3754 * to Redis objects (so objects are sorted by scores in this "view"). */
3755
3756 /* This skiplist implementation is almost a C translation of the original
3757 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3758 * Alternative to Balanced Trees", modified in three ways:
3759 * a) this implementation allows for repeated values.
3760 * b) the comparison is not just by key (our 'score') but by satellite data.
3761 * c) there is a back pointer, so it's a doubly linked list with the back
3762 * pointers being only at "level 1". This allows to traverse the list
3763 * from tail to head, useful for ZREVRANGE. */
3764
3765 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3766 zskiplistNode *zn = zmalloc(sizeof(*zn));
3767
3768 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3769 zn->score = score;
3770 zn->obj = obj;
3771 return zn;
3772 }
3773
3774 static zskiplist *zslCreate(void) {
3775 int j;
3776 zskiplist *zsl;
3777
3778 zsl = zmalloc(sizeof(*zsl));
3779 zsl->level = 1;
3780 zsl->length = 0;
3781 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3782 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3783 zsl->header->forward[j] = NULL;
3784 zsl->header->backward = NULL;
3785 zsl->tail = NULL;
3786 return zsl;
3787 }
3788
3789 static void zslFreeNode(zskiplistNode *node) {
3790 decrRefCount(node->obj);
3791 zfree(node->forward);
3792 zfree(node);
3793 }
3794
3795 static void zslFree(zskiplist *zsl) {
3796 zskiplistNode *node = zsl->header->forward[0], *next;
3797
3798 zfree(zsl->header->forward);
3799 zfree(zsl->header);
3800 while(node) {
3801 next = node->forward[0];
3802 zslFreeNode(node);
3803 node = next;
3804 }
3805 zfree(zsl);
3806 }
3807
3808 static int zslRandomLevel(void) {
3809 int level = 1;
3810 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3811 level += 1;
3812 return level;
3813 }
3814
3815 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3816 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3817 int i, level;
3818
3819 x = zsl->header;
3820 for (i = zsl->level-1; i >= 0; i--) {
3821 while (x->forward[i] && x->forward[i]->score < score)
3822 x = x->forward[i];
3823 update[i] = x;
3824 }
3825 /* we assume the key is not already inside, since we allow duplicated
3826 * scores, and the re-insertion of score and redis object should never
3827 * happpen since the caller of zslInsert() should test in the hash table
3828 * if the element is already inside or not. */
3829 level = zslRandomLevel();
3830 if (level > zsl->level) {
3831 for (i = zsl->level; i < level; i++)
3832 update[i] = zsl->header;
3833 zsl->level = level;
3834 }
3835 x = zslCreateNode(level,score,obj);
3836 for (i = 0; i < level; i++) {
3837 x->forward[i] = update[i]->forward[i];
3838 update[i]->forward[i] = x;
3839 }
3840 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3841 if (x->forward[0])
3842 x->forward[0]->backward = x;
3843 else
3844 zsl->tail = x;
3845 zsl->length++;
3846 }
3847
3848 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3849 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3850 int i;
3851
3852 x = zsl->header;
3853 for (i = zsl->level-1; i >= 0; i--) {
3854 while (x->forward[i] && x->forward[i]->score < score)
3855 x = x->forward[i];
3856 update[i] = x;
3857 }
3858 /* We may have multiple elements with the same score, what we need
3859 * is to find the element with both the right score and object. */
3860 x = x->forward[0];
3861 while(x->score == score) {
3862 if (compareStringObjects(x->obj,obj) == 0) {
3863 for (i = 0; i < zsl->level; i++) {
3864 if (update[i]->forward[i] != x) break;
3865 update[i]->forward[i] = x->forward[i];
3866 }
3867 if (x->forward[0]) {
3868 x->forward[0]->backward = (x->backward == zsl->header) ?
3869 NULL : x->backward;
3870 } else {
3871 zsl->tail = x->backward;
3872 }
3873 zslFreeNode(x);
3874 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3875 zsl->level--;
3876 zsl->length--;
3877 return 1;
3878 } else {
3879 x = x->forward[0];
3880 if (!x) return 0; /* end of the list reached, not found */
3881 }
3882 }
3883 return 0; /* not found */
3884 }
3885
3886 /* The actual Z-commands implementations */
3887
3888 static void zaddCommand(redisClient *c) {
3889 robj *zsetobj;
3890 zset *zs;
3891 double *score;
3892
3893 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3894 if (zsetobj == NULL) {
3895 zsetobj = createZsetObject();
3896 dictAdd(c->db->dict,c->argv[1],zsetobj);
3897 incrRefCount(c->argv[1]);
3898 } else {
3899 if (zsetobj->type != REDIS_ZSET) {
3900 addReply(c,shared.wrongtypeerr);
3901 return;
3902 }
3903 }
3904 score = zmalloc(sizeof(double));
3905 *score = strtod(c->argv[2]->ptr,NULL);
3906 zs = zsetobj->ptr;
3907 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3908 /* case 1: New element */
3909 incrRefCount(c->argv[3]); /* added to hash */
3910 zslInsert(zs->zsl,*score,c->argv[3]);
3911 incrRefCount(c->argv[3]); /* added to skiplist */
3912 server.dirty++;
3913 addReply(c,shared.cone);
3914 } else {
3915 dictEntry *de;
3916 double *oldscore;
3917
3918 /* case 2: Score update operation */
3919 de = dictFind(zs->dict,c->argv[3]);
3920 assert(de != NULL);
3921 oldscore = dictGetEntryVal(de);
3922 if (*score != *oldscore) {
3923 int deleted;
3924
3925 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
3926 assert(deleted != 0);
3927 zslInsert(zs->zsl,*score,c->argv[3]);
3928 incrRefCount(c->argv[3]);
3929 dictReplace(zs->dict,c->argv[3],score);
3930 server.dirty++;
3931 } else {
3932 zfree(score);
3933 }
3934 addReply(c,shared.czero);
3935 }
3936 }
3937
3938 static void zremCommand(redisClient *c) {
3939 robj *zsetobj;
3940 zset *zs;
3941
3942 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3943 if (zsetobj == NULL) {
3944 addReply(c,shared.czero);
3945 } else {
3946 dictEntry *de;
3947 double *oldscore;
3948 int deleted;
3949
3950 if (zsetobj->type != REDIS_ZSET) {
3951 addReply(c,shared.wrongtypeerr);
3952 return;
3953 }
3954 zs = zsetobj->ptr;
3955 de = dictFind(zs->dict,c->argv[2]);
3956 if (de == NULL) {
3957 addReply(c,shared.czero);
3958 return;
3959 }
3960 /* Delete from the skiplist */
3961 oldscore = dictGetEntryVal(de);
3962 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
3963 assert(deleted != 0);
3964
3965 /* Delete from the hash table */
3966 dictDelete(zs->dict,c->argv[2]);
3967 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
3968 server.dirty++;
3969 addReply(c,shared.cone);
3970 }
3971 }
3972
3973 static void zrangeGenericCommand(redisClient *c, int reverse) {
3974 robj *o;
3975 int start = atoi(c->argv[2]->ptr);
3976 int end = atoi(c->argv[3]->ptr);
3977
3978 o = lookupKeyRead(c->db,c->argv[1]);
3979 if (o == NULL) {
3980 addReply(c,shared.nullmultibulk);
3981 } else {
3982 if (o->type != REDIS_ZSET) {
3983 addReply(c,shared.wrongtypeerr);
3984 } else {
3985 zset *zsetobj = o->ptr;
3986 zskiplist *zsl = zsetobj->zsl;
3987 zskiplistNode *ln;
3988
3989 int llen = zsl->length;
3990 int rangelen, j;
3991 robj *ele;
3992
3993 /* convert negative indexes */
3994 if (start < 0) start = llen+start;
3995 if (end < 0) end = llen+end;
3996 if (start < 0) start = 0;
3997 if (end < 0) end = 0;
3998
3999 /* indexes sanity checks */
4000 if (start > end || start >= llen) {
4001 /* Out of range start or start > end result in empty list */
4002 addReply(c,shared.emptymultibulk);
4003 return;
4004 }
4005 if (end >= llen) end = llen-1;
4006 rangelen = (end-start)+1;
4007
4008 /* Return the result in form of a multi-bulk reply */
4009 if (reverse) {
4010 ln = zsl->tail;
4011 while (start--)
4012 ln = ln->backward;
4013 } else {
4014 ln = zsl->header->forward[0];
4015 while (start--)
4016 ln = ln->forward[0];
4017 }
4018
4019 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4020 for (j = 0; j < rangelen; j++) {
4021 ele = ln->obj;
4022 addReplyBulkLen(c,ele);
4023 addReply(c,ele);
4024 addReply(c,shared.crlf);
4025 ln = reverse ? ln->backward : ln->forward[0];
4026 }
4027 }
4028 }
4029 }
4030
4031 static void zrangeCommand(redisClient *c) {
4032 zrangeGenericCommand(c,0);
4033 }
4034
4035 static void zrevrangeCommand(redisClient *c) {
4036 zrangeGenericCommand(c,1);
4037 }
4038
4039 static void zlenCommand(redisClient *c) {
4040 robj *o;
4041 zset *zs;
4042
4043 o = lookupKeyRead(c->db,c->argv[1]);
4044 if (o == NULL) {
4045 addReply(c,shared.czero);
4046 return;
4047 } else {
4048 if (o->type != REDIS_ZSET) {
4049 addReply(c,shared.wrongtypeerr);
4050 } else {
4051 zs = o->ptr;
4052 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4053 }
4054 }
4055 }
4056
4057 /* ========================= Non type-specific commands ==================== */
4058
4059 static void flushdbCommand(redisClient *c) {
4060 server.dirty += dictSize(c->db->dict);
4061 dictEmpty(c->db->dict);
4062 dictEmpty(c->db->expires);
4063 addReply(c,shared.ok);
4064 }
4065
4066 static void flushallCommand(redisClient *c) {
4067 server.dirty += emptyDb();
4068 addReply(c,shared.ok);
4069 rdbSave(server.dbfilename);
4070 server.dirty++;
4071 }
4072
4073 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4074 redisSortOperation *so = zmalloc(sizeof(*so));
4075 so->type = type;
4076 so->pattern = pattern;
4077 return so;
4078 }
4079
4080 /* Return the value associated to the key with a name obtained
4081 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4082 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4083 char *p;
4084 sds spat, ssub;
4085 robj keyobj;
4086 int prefixlen, sublen, postfixlen;
4087 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4088 struct {
4089 long len;
4090 long free;
4091 char buf[REDIS_SORTKEY_MAX+1];
4092 } keyname;
4093
4094 if (subst->encoding == REDIS_ENCODING_RAW)
4095 incrRefCount(subst);
4096 else {
4097 subst = getDecodedObject(subst);
4098 }
4099
4100 spat = pattern->ptr;
4101 ssub = subst->ptr;
4102 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4103 p = strchr(spat,'*');
4104 if (!p) return NULL;
4105
4106 prefixlen = p-spat;
4107 sublen = sdslen(ssub);
4108 postfixlen = sdslen(spat)-(prefixlen+1);
4109 memcpy(keyname.buf,spat,prefixlen);
4110 memcpy(keyname.buf+prefixlen,ssub,sublen);
4111 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4112 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4113 keyname.len = prefixlen+sublen+postfixlen;
4114
4115 keyobj.refcount = 1;
4116 keyobj.type = REDIS_STRING;
4117 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4118
4119 decrRefCount(subst);
4120
4121 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4122 return lookupKeyRead(db,&keyobj);
4123 }
4124
4125 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4126 * the additional parameter is not standard but a BSD-specific we have to
4127 * pass sorting parameters via the global 'server' structure */
4128 static int sortCompare(const void *s1, const void *s2) {
4129 const redisSortObject *so1 = s1, *so2 = s2;
4130 int cmp;
4131
4132 if (!server.sort_alpha) {
4133 /* Numeric sorting. Here it's trivial as we precomputed scores */
4134 if (so1->u.score > so2->u.score) {
4135 cmp = 1;
4136 } else if (so1->u.score < so2->u.score) {
4137 cmp = -1;
4138 } else {
4139 cmp = 0;
4140 }
4141 } else {
4142 /* Alphanumeric sorting */
4143 if (server.sort_bypattern) {
4144 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4145 /* At least one compare object is NULL */
4146 if (so1->u.cmpobj == so2->u.cmpobj)
4147 cmp = 0;
4148 else if (so1->u.cmpobj == NULL)
4149 cmp = -1;
4150 else
4151 cmp = 1;
4152 } else {
4153 /* We have both the objects, use strcoll */
4154 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4155 }
4156 } else {
4157 /* Compare elements directly */
4158 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4159 so2->obj->encoding == REDIS_ENCODING_RAW) {
4160 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4161 } else {
4162 robj *dec1, *dec2;
4163
4164 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4165 so1->obj : getDecodedObject(so1->obj);
4166 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4167 so2->obj : getDecodedObject(so2->obj);
4168 cmp = strcoll(dec1->ptr,dec2->ptr);
4169 if (dec1 != so1->obj) decrRefCount(dec1);
4170 if (dec2 != so2->obj) decrRefCount(dec2);
4171 }
4172 }
4173 }
4174 return server.sort_desc ? -cmp : cmp;
4175 }
4176
4177 /* The SORT command is the most complex command in Redis. Warning: this code
4178 * is optimized for speed and a bit less for readability */
4179 static void sortCommand(redisClient *c) {
4180 list *operations;
4181 int outputlen = 0;
4182 int desc = 0, alpha = 0;
4183 int limit_start = 0, limit_count = -1, start, end;
4184 int j, dontsort = 0, vectorlen;
4185 int getop = 0; /* GET operation counter */
4186 robj *sortval, *sortby = NULL;
4187 redisSortObject *vector; /* Resulting vector to sort */
4188
4189 /* Lookup the key to sort. It must be of the right types */
4190 sortval = lookupKeyRead(c->db,c->argv[1]);
4191 if (sortval == NULL) {
4192 addReply(c,shared.nokeyerr);
4193 return;
4194 }
4195 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4196 addReply(c,shared.wrongtypeerr);
4197 return;
4198 }
4199
4200 /* Create a list of operations to perform for every sorted element.
4201 * Operations can be GET/DEL/INCR/DECR */
4202 operations = listCreate();
4203 listSetFreeMethod(operations,zfree);
4204 j = 2;
4205
4206 /* Now we need to protect sortval incrementing its count, in the future
4207 * SORT may have options able to overwrite/delete keys during the sorting
4208 * and the sorted key itself may get destroied */
4209 incrRefCount(sortval);
4210
4211 /* The SORT command has an SQL-alike syntax, parse it */
4212 while(j < c->argc) {
4213 int leftargs = c->argc-j-1;
4214 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4215 desc = 0;
4216 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4217 desc = 1;
4218 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4219 alpha = 1;
4220 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4221 limit_start = atoi(c->argv[j+1]->ptr);
4222 limit_count = atoi(c->argv[j+2]->ptr);
4223 j+=2;
4224 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4225 sortby = c->argv[j+1];
4226 /* If the BY pattern does not contain '*', i.e. it is constant,
4227 * we don't need to sort nor to lookup the weight keys. */
4228 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4229 j++;
4230 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4231 listAddNodeTail(operations,createSortOperation(
4232 REDIS_SORT_GET,c->argv[j+1]));
4233 getop++;
4234 j++;
4235 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4236 listAddNodeTail(operations,createSortOperation(
4237 REDIS_SORT_DEL,c->argv[j+1]));
4238 j++;
4239 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4240 listAddNodeTail(operations,createSortOperation(
4241 REDIS_SORT_INCR,c->argv[j+1]));
4242 j++;
4243 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4244 listAddNodeTail(operations,createSortOperation(
4245 REDIS_SORT_DECR,c->argv[j+1]));
4246 j++;
4247 } else {
4248 decrRefCount(sortval);
4249 listRelease(operations);
4250 addReply(c,shared.syntaxerr);
4251 return;
4252 }
4253 j++;
4254 }
4255
4256 /* Load the sorting vector with all the objects to sort */
4257 vectorlen = (sortval->type == REDIS_LIST) ?
4258 listLength((list*)sortval->ptr) :
4259 dictSize((dict*)sortval->ptr);
4260 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4261 j = 0;
4262 if (sortval->type == REDIS_LIST) {
4263 list *list = sortval->ptr;
4264 listNode *ln;
4265
4266 listRewind(list);
4267 while((ln = listYield(list))) {
4268 robj *ele = ln->value;
4269 vector[j].obj = ele;
4270 vector[j].u.score = 0;
4271 vector[j].u.cmpobj = NULL;
4272 j++;
4273 }
4274 } else {
4275 dict *set = sortval->ptr;
4276 dictIterator *di;
4277 dictEntry *setele;
4278
4279 di = dictGetIterator(set);
4280 while((setele = dictNext(di)) != NULL) {
4281 vector[j].obj = dictGetEntryKey(setele);
4282 vector[j].u.score = 0;
4283 vector[j].u.cmpobj = NULL;
4284 j++;
4285 }
4286 dictReleaseIterator(di);
4287 }
4288 assert(j == vectorlen);
4289
4290 /* Now it's time to load the right scores in the sorting vector */
4291 if (dontsort == 0) {
4292 for (j = 0; j < vectorlen; j++) {
4293 if (sortby) {
4294 robj *byval;
4295
4296 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4297 if (!byval || byval->type != REDIS_STRING) continue;
4298 if (alpha) {
4299 if (byval->encoding == REDIS_ENCODING_RAW) {
4300 vector[j].u.cmpobj = byval;
4301 incrRefCount(byval);
4302 } else {
4303 vector[j].u.cmpobj = getDecodedObject(byval);
4304 }
4305 } else {
4306 if (byval->encoding == REDIS_ENCODING_RAW) {
4307 vector[j].u.score = strtod(byval->ptr,NULL);
4308 } else {
4309 if (byval->encoding == REDIS_ENCODING_INT) {
4310 vector[j].u.score = (long)byval->ptr;
4311 } else
4312 assert(1 != 1);
4313 }
4314 }
4315 } else {
4316 if (!alpha) {
4317 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4318 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4319 else {
4320 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4321 vector[j].u.score = (long) vector[j].obj->ptr;
4322 else
4323 assert(1 != 1);
4324 }
4325 }
4326 }
4327 }
4328 }
4329
4330 /* We are ready to sort the vector... perform a bit of sanity check
4331 * on the LIMIT option too. We'll use a partial version of quicksort. */
4332 start = (limit_start < 0) ? 0 : limit_start;
4333 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4334 if (start >= vectorlen) {
4335 start = vectorlen-1;
4336 end = vectorlen-2;
4337 }
4338 if (end >= vectorlen) end = vectorlen-1;
4339
4340 if (dontsort == 0) {
4341 server.sort_desc = desc;
4342 server.sort_alpha = alpha;
4343 server.sort_bypattern = sortby ? 1 : 0;
4344 if (sortby && (start != 0 || end != vectorlen-1))
4345 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4346 else
4347 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4348 }
4349
4350 /* Send command output to the output buffer, performing the specified
4351 * GET/DEL/INCR/DECR operations if any. */
4352 outputlen = getop ? getop*(end-start+1) : end-start+1;
4353 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4354 for (j = start; j <= end; j++) {
4355 listNode *ln;
4356 if (!getop) {
4357 addReplyBulkLen(c,vector[j].obj);
4358 addReply(c,vector[j].obj);
4359 addReply(c,shared.crlf);
4360 }
4361 listRewind(operations);
4362 while((ln = listYield(operations))) {
4363 redisSortOperation *sop = ln->value;
4364 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4365 vector[j].obj);
4366
4367 if (sop->type == REDIS_SORT_GET) {
4368 if (!val || val->type != REDIS_STRING) {
4369 addReply(c,shared.nullbulk);
4370 } else {
4371 addReplyBulkLen(c,val);
4372 addReply(c,val);
4373 addReply(c,shared.crlf);
4374 }
4375 } else if (sop->type == REDIS_SORT_DEL) {
4376 /* TODO */
4377 }
4378 }
4379 }
4380
4381 /* Cleanup */
4382 decrRefCount(sortval);
4383 listRelease(operations);
4384 for (j = 0; j < vectorlen; j++) {
4385 if (sortby && alpha && vector[j].u.cmpobj)
4386 decrRefCount(vector[j].u.cmpobj);
4387 }
4388 zfree(vector);
4389 }
4390
4391 static void infoCommand(redisClient *c) {
4392 sds info;
4393 time_t uptime = time(NULL)-server.stat_starttime;
4394 int j;
4395
4396 info = sdscatprintf(sdsempty(),
4397 "redis_version:%s\r\n"
4398 "arch_bits:%s\r\n"
4399 "uptime_in_seconds:%d\r\n"
4400 "uptime_in_days:%d\r\n"
4401 "connected_clients:%d\r\n"
4402 "connected_slaves:%d\r\n"
4403 "used_memory:%zu\r\n"
4404 "changes_since_last_save:%lld\r\n"
4405 "bgsave_in_progress:%d\r\n"
4406 "last_save_time:%d\r\n"
4407 "total_connections_received:%lld\r\n"
4408 "total_commands_processed:%lld\r\n"
4409 "role:%s\r\n"
4410 ,REDIS_VERSION,
4411 (sizeof(long) == 8) ? "64" : "32",
4412 uptime,
4413 uptime/(3600*24),
4414 listLength(server.clients)-listLength(server.slaves),
4415 listLength(server.slaves),
4416 server.usedmemory,
4417 server.dirty,
4418 server.bgsaveinprogress,
4419 server.lastsave,
4420 server.stat_numconnections,
4421 server.stat_numcommands,
4422 server.masterhost == NULL ? "master" : "slave"
4423 );
4424 if (server.masterhost) {
4425 info = sdscatprintf(info,
4426 "master_host:%s\r\n"
4427 "master_port:%d\r\n"
4428 "master_link_status:%s\r\n"
4429 "master_last_io_seconds_ago:%d\r\n"
4430 ,server.masterhost,
4431 server.masterport,
4432 (server.replstate == REDIS_REPL_CONNECTED) ?
4433 "up" : "down",
4434 (int)(time(NULL)-server.master->lastinteraction)
4435 );
4436 }
4437 for (j = 0; j < server.dbnum; j++) {
4438 long long keys, vkeys;
4439
4440 keys = dictSize(server.db[j].dict);
4441 vkeys = dictSize(server.db[j].expires);
4442 if (keys || vkeys) {
4443 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4444 j, keys, vkeys);
4445 }
4446 }
4447 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4448 addReplySds(c,info);
4449 addReply(c,shared.crlf);
4450 }
4451
4452 static void monitorCommand(redisClient *c) {
4453 /* ignore MONITOR if aleady slave or in monitor mode */
4454 if (c->flags & REDIS_SLAVE) return;
4455
4456 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4457 c->slaveseldb = 0;
4458 listAddNodeTail(server.monitors,c);
4459 addReply(c,shared.ok);
4460 }
4461
4462 /* ================================= Expire ================================= */
4463 static int removeExpire(redisDb *db, robj *key) {
4464 if (dictDelete(db->expires,key) == DICT_OK) {
4465 return 1;
4466 } else {
4467 return 0;
4468 }
4469 }
4470
4471 static int setExpire(redisDb *db, robj *key, time_t when) {
4472 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4473 return 0;
4474 } else {
4475 incrRefCount(key);
4476 return 1;
4477 }
4478 }
4479
4480 /* Return the expire time of the specified key, or -1 if no expire
4481 * is associated with this key (i.e. the key is non volatile) */
4482 static time_t getExpire(redisDb *db, robj *key) {
4483 dictEntry *de;
4484
4485 /* No expire? return ASAP */
4486 if (dictSize(db->expires) == 0 ||
4487 (de = dictFind(db->expires,key)) == NULL) return -1;
4488
4489 return (time_t) dictGetEntryVal(de);
4490 }
4491
4492 static int expireIfNeeded(redisDb *db, robj *key) {
4493 time_t when;
4494 dictEntry *de;
4495
4496 /* No expire? return ASAP */
4497 if (dictSize(db->expires) == 0 ||
4498 (de = dictFind(db->expires,key)) == NULL) return 0;
4499
4500 /* Lookup the expire */
4501 when = (time_t) dictGetEntryVal(de);
4502 if (time(NULL) <= when) return 0;
4503
4504 /* Delete the key */
4505 dictDelete(db->expires,key);
4506 return dictDelete(db->dict,key) == DICT_OK;
4507 }
4508
4509 static int deleteIfVolatile(redisDb *db, robj *key) {
4510 dictEntry *de;
4511
4512 /* No expire? return ASAP */
4513 if (dictSize(db->expires) == 0 ||
4514 (de = dictFind(db->expires,key)) == NULL) return 0;
4515
4516 /* Delete the key */
4517 server.dirty++;
4518 dictDelete(db->expires,key);
4519 return dictDelete(db->dict,key) == DICT_OK;
4520 }
4521
4522 static void expireCommand(redisClient *c) {
4523 dictEntry *de;
4524 int seconds = atoi(c->argv[2]->ptr);
4525
4526 de = dictFind(c->db->dict,c->argv[1]);
4527 if (de == NULL) {
4528 addReply(c,shared.czero);
4529 return;
4530 }
4531 if (seconds <= 0) {
4532 addReply(c, shared.czero);
4533 return;
4534 } else {
4535 time_t when = time(NULL)+seconds;
4536 if (setExpire(c->db,c->argv[1],when)) {
4537 addReply(c,shared.cone);
4538 server.dirty++;
4539 } else {
4540 addReply(c,shared.czero);
4541 }
4542 return;
4543 }
4544 }
4545
4546 static void ttlCommand(redisClient *c) {
4547 time_t expire;
4548 int ttl = -1;
4549
4550 expire = getExpire(c->db,c->argv[1]);
4551 if (expire != -1) {
4552 ttl = (int) (expire-time(NULL));
4553 if (ttl < 0) ttl = -1;
4554 }
4555 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4556 }
4557
4558 static void msetGenericCommand(redisClient *c, int nx) {
4559 int j;
4560
4561 if ((c->argc % 2) == 0) {
4562 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4563 return;
4564 }
4565 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4566 * set nothing at all if at least one already key exists. */
4567 if (nx) {
4568 for (j = 1; j < c->argc; j += 2) {
4569 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4570 addReply(c, shared.czero);
4571 return;
4572 }
4573 }
4574 }
4575
4576 for (j = 1; j < c->argc; j += 2) {
4577 int retval;
4578
4579 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4580 if (retval == DICT_ERR) {
4581 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4582 incrRefCount(c->argv[j+1]);
4583 } else {
4584 incrRefCount(c->argv[j]);
4585 incrRefCount(c->argv[j+1]);
4586 }
4587 removeExpire(c->db,c->argv[j]);
4588 }
4589 server.dirty += (c->argc-1)/2;
4590 addReply(c, nx ? shared.cone : shared.ok);
4591 }
4592
4593 static void msetCommand(redisClient *c) {
4594 msetGenericCommand(c,0);
4595 }
4596
4597 static void msetnxCommand(redisClient *c) {
4598 msetGenericCommand(c,1);
4599 }
4600
4601 /* =============================== Replication ============================= */
4602
4603 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4604 ssize_t nwritten, ret = size;
4605 time_t start = time(NULL);
4606
4607 timeout++;
4608 while(size) {
4609 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4610 nwritten = write(fd,ptr,size);
4611 if (nwritten == -1) return -1;
4612 ptr += nwritten;
4613 size -= nwritten;
4614 }
4615 if ((time(NULL)-start) > timeout) {
4616 errno = ETIMEDOUT;
4617 return -1;
4618 }
4619 }
4620 return ret;
4621 }
4622
4623 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4624 ssize_t nread, totread = 0;
4625 time_t start = time(NULL);
4626
4627 timeout++;
4628 while(size) {
4629 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4630 nread = read(fd,ptr,size);
4631 if (nread == -1) return -1;
4632 ptr += nread;
4633 size -= nread;
4634 totread += nread;
4635 }
4636 if ((time(NULL)-start) > timeout) {
4637 errno = ETIMEDOUT;
4638 return -1;
4639 }
4640 }
4641 return totread;
4642 }
4643
4644 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
4645 ssize_t nread = 0;
4646
4647 size--;
4648 while(size) {
4649 char c;
4650
4651 if (syncRead(fd,&c,1,timeout) == -1) return -1;
4652 if (c == '\n') {
4653 *ptr = '\0';
4654 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
4655 return nread;
4656 } else {
4657 *ptr++ = c;
4658 *ptr = '\0';
4659 nread++;
4660 }
4661 }
4662 return nread;
4663 }
4664
4665 static void syncCommand(redisClient *c) {
4666 /* ignore SYNC if aleady slave or in monitor mode */
4667 if (c->flags & REDIS_SLAVE) return;
4668
4669 /* SYNC can't be issued when the server has pending data to send to
4670 * the client about already issued commands. We need a fresh reply
4671 * buffer registering the differences between the BGSAVE and the current
4672 * dataset, so that we can copy to other slaves if needed. */
4673 if (listLength(c->reply) != 0) {
4674 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4675 return;
4676 }
4677
4678 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4679 /* Here we need to check if there is a background saving operation
4680 * in progress, or if it is required to start one */
4681 if (server.bgsaveinprogress) {
4682 /* Ok a background save is in progress. Let's check if it is a good
4683 * one for replication, i.e. if there is another slave that is
4684 * registering differences since the server forked to save */
4685 redisClient *slave;
4686 listNode *ln;
4687
4688 listRewind(server.slaves);
4689 while((ln = listYield(server.slaves))) {
4690 slave = ln->value;
4691 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4692 }
4693 if (ln) {
4694 /* Perfect, the server is already registering differences for
4695 * another slave. Set the right state, and copy the buffer. */
4696 listRelease(c->reply);
4697 c->reply = listDup(slave->reply);
4698 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4699 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4700 } else {
4701 /* No way, we need to wait for the next BGSAVE in order to
4702 * register differences */
4703 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4704 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4705 }
4706 } else {
4707 /* Ok we don't have a BGSAVE in progress, let's start one */
4708 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4709 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4710 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4711 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4712 return;
4713 }
4714 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4715 }
4716 c->repldbfd = -1;
4717 c->flags |= REDIS_SLAVE;
4718 c->slaveseldb = 0;
4719 listAddNodeTail(server.slaves,c);
4720 return;
4721 }
4722
4723 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4724 redisClient *slave = privdata;
4725 REDIS_NOTUSED(el);
4726 REDIS_NOTUSED(mask);
4727 char buf[REDIS_IOBUF_LEN];
4728 ssize_t nwritten, buflen;
4729
4730 if (slave->repldboff == 0) {
4731 /* Write the bulk write count before to transfer the DB. In theory here
4732 * we don't know how much room there is in the output buffer of the
4733 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4734 * operations) will never be smaller than the few bytes we need. */
4735 sds bulkcount;
4736
4737 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4738 slave->repldbsize);
4739 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4740 {
4741 sdsfree(bulkcount);
4742 freeClient(slave);
4743 return;
4744 }
4745 sdsfree(bulkcount);
4746 }
4747 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4748 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4749 if (buflen <= 0) {
4750 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4751 (buflen == 0) ? "premature EOF" : strerror(errno));
4752 freeClient(slave);
4753 return;
4754 }
4755 if ((nwritten = write(fd,buf,buflen)) == -1) {
4756 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4757 strerror(errno));
4758 freeClient(slave);
4759 return;
4760 }
4761 slave->repldboff += nwritten;
4762 if (slave->repldboff == slave->repldbsize) {
4763 close(slave->repldbfd);
4764 slave->repldbfd = -1;
4765 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4766 slave->replstate = REDIS_REPL_ONLINE;
4767 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4768 sendReplyToClient, slave, NULL) == AE_ERR) {
4769 freeClient(slave);
4770 return;
4771 }
4772 addReplySds(slave,sdsempty());
4773 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4774 }
4775 }
4776
4777 /* This function is called at the end of every backgrond saving.
4778 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4779 * otherwise REDIS_ERR is passed to the function.
4780 *
4781 * The goal of this function is to handle slaves waiting for a successful
4782 * background saving in order to perform non-blocking synchronization. */
4783 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4784 listNode *ln;
4785 int startbgsave = 0;
4786
4787 listRewind(server.slaves);
4788 while((ln = listYield(server.slaves))) {
4789 redisClient *slave = ln->value;
4790
4791 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4792 startbgsave = 1;
4793 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4794 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4795 struct redis_stat buf;
4796
4797 if (bgsaveerr != REDIS_OK) {
4798 freeClient(slave);
4799 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4800 continue;
4801 }
4802 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4803 redis_fstat(slave->repldbfd,&buf) == -1) {
4804 freeClient(slave);
4805 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4806 continue;
4807 }
4808 slave->repldboff = 0;
4809 slave->repldbsize = buf.st_size;
4810 slave->replstate = REDIS_REPL_SEND_BULK;
4811 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4812 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4813 freeClient(slave);
4814 continue;
4815 }
4816 }
4817 }
4818 if (startbgsave) {
4819 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4820 listRewind(server.slaves);
4821 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4822 while((ln = listYield(server.slaves))) {
4823 redisClient *slave = ln->value;
4824
4825 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4826 freeClient(slave);
4827 }
4828 }
4829 }
4830 }
4831
4832 static int syncWithMaster(void) {
4833 char buf[1024], tmpfile[256];
4834 int dumpsize;
4835 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4836 int dfd;
4837
4838 if (fd == -1) {
4839 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4840 strerror(errno));
4841 return REDIS_ERR;
4842 }
4843 /* Issue the SYNC command */
4844 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4845 close(fd);
4846 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4847 strerror(errno));
4848 return REDIS_ERR;
4849 }
4850 /* Read the bulk write count */
4851 if (syncReadLine(fd,buf,1024,3600) == -1) {
4852 close(fd);
4853 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4854 strerror(errno));
4855 return REDIS_ERR;
4856 }
4857 dumpsize = atoi(buf+1);
4858 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4859 /* Read the bulk write data on a temp file */
4860 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4861 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4862 if (dfd == -1) {
4863 close(fd);
4864 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4865 return REDIS_ERR;
4866 }
4867 while(dumpsize) {
4868 int nread, nwritten;
4869
4870 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4871 if (nread == -1) {
4872 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4873 strerror(errno));
4874 close(fd);
4875 close(dfd);
4876 return REDIS_ERR;
4877 }
4878 nwritten = write(dfd,buf,nread);
4879 if (nwritten == -1) {
4880 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4881 close(fd);
4882 close(dfd);
4883 return REDIS_ERR;
4884 }
4885 dumpsize -= nread;
4886 }
4887 close(dfd);
4888 if (rename(tmpfile,server.dbfilename) == -1) {
4889 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4890 unlink(tmpfile);
4891 close(fd);
4892 return REDIS_ERR;
4893 }
4894 emptyDb();
4895 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4896 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4897 close(fd);
4898 return REDIS_ERR;
4899 }
4900 server.master = createClient(fd);
4901 server.master->flags |= REDIS_MASTER;
4902 server.replstate = REDIS_REPL_CONNECTED;
4903 return REDIS_OK;
4904 }
4905
4906 static void slaveofCommand(redisClient *c) {
4907 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4908 !strcasecmp(c->argv[2]->ptr,"one")) {
4909 if (server.masterhost) {
4910 sdsfree(server.masterhost);
4911 server.masterhost = NULL;
4912 if (server.master) freeClient(server.master);
4913 server.replstate = REDIS_REPL_NONE;
4914 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4915 }
4916 } else {
4917 sdsfree(server.masterhost);
4918 server.masterhost = sdsdup(c->argv[1]->ptr);
4919 server.masterport = atoi(c->argv[2]->ptr);
4920 if (server.master) freeClient(server.master);
4921 server.replstate = REDIS_REPL_CONNECT;
4922 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4923 server.masterhost, server.masterport);
4924 }
4925 addReply(c,shared.ok);
4926 }
4927
4928 /* ============================ Maxmemory directive ======================== */
4929
4930 /* This function gets called when 'maxmemory' is set on the config file to limit
4931 * the max memory used by the server, and we are out of memory.
4932 * This function will try to, in order:
4933 *
4934 * - Free objects from the free list
4935 * - Try to remove keys with an EXPIRE set
4936 *
4937 * It is not possible to free enough memory to reach used-memory < maxmemory
4938 * the server will start refusing commands that will enlarge even more the
4939 * memory usage.
4940 */
4941 static void freeMemoryIfNeeded(void) {
4942 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4943 if (listLength(server.objfreelist)) {
4944 robj *o;
4945
4946 listNode *head = listFirst(server.objfreelist);
4947 o = listNodeValue(head);
4948 listDelNode(server.objfreelist,head);
4949 zfree(o);
4950 } else {
4951 int j, k, freed = 0;
4952
4953 for (j = 0; j < server.dbnum; j++) {
4954 int minttl = -1;
4955 robj *minkey = NULL;
4956 struct dictEntry *de;
4957
4958 if (dictSize(server.db[j].expires)) {
4959 freed = 1;
4960 /* From a sample of three keys drop the one nearest to
4961 * the natural expire */
4962 for (k = 0; k < 3; k++) {
4963 time_t t;
4964
4965 de = dictGetRandomKey(server.db[j].expires);
4966 t = (time_t) dictGetEntryVal(de);
4967 if (minttl == -1 || t < minttl) {
4968 minkey = dictGetEntryKey(de);
4969 minttl = t;
4970 }
4971 }
4972 deleteKey(server.db+j,minkey);
4973 }
4974 }
4975 if (!freed) return; /* nothing to free... */
4976 }
4977 }
4978 }
4979
4980 /* ================================= Debugging ============================== */
4981
4982 static void debugCommand(redisClient *c) {
4983 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4984 *((char*)-1) = 'x';
4985 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4986 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4987 robj *key, *val;
4988
4989 if (!de) {
4990 addReply(c,shared.nokeyerr);
4991 return;
4992 }
4993 key = dictGetEntryKey(de);
4994 val = dictGetEntryVal(de);
4995 addReplySds(c,sdscatprintf(sdsempty(),
4996 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
4997 key, key->refcount, val, val->refcount, val->encoding));
4998 } else {
4999 addReplySds(c,sdsnew(
5000 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5001 }
5002 }
5003
5004 #ifdef HAVE_BACKTRACE
5005 static struct redisFunctionSym symsTable[] = {
5006 {"compareStringObjects", (unsigned long)compareStringObjects},
5007 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5008 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5009 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5010 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5011 {"freeStringObject", (unsigned long)freeStringObject},
5012 {"freeListObject", (unsigned long)freeListObject},
5013 {"freeSetObject", (unsigned long)freeSetObject},
5014 {"decrRefCount", (unsigned long)decrRefCount},
5015 {"createObject", (unsigned long)createObject},
5016 {"freeClient", (unsigned long)freeClient},
5017 {"rdbLoad", (unsigned long)rdbLoad},
5018 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5019 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5020 {"addReply", (unsigned long)addReply},
5021 {"addReplySds", (unsigned long)addReplySds},
5022 {"incrRefCount", (unsigned long)incrRefCount},
5023 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5024 {"createStringObject", (unsigned long)createStringObject},
5025 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5026 {"syncWithMaster", (unsigned long)syncWithMaster},
5027 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5028 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5029 {"getDecodedObject", (unsigned long)getDecodedObject},
5030 {"removeExpire", (unsigned long)removeExpire},
5031 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5032 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5033 {"deleteKey", (unsigned long)deleteKey},
5034 {"getExpire", (unsigned long)getExpire},
5035 {"setExpire", (unsigned long)setExpire},
5036 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5037 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5038 {"authCommand", (unsigned long)authCommand},
5039 {"pingCommand", (unsigned long)pingCommand},
5040 {"echoCommand", (unsigned long)echoCommand},
5041 {"setCommand", (unsigned long)setCommand},
5042 {"setnxCommand", (unsigned long)setnxCommand},
5043 {"getCommand", (unsigned long)getCommand},
5044 {"delCommand", (unsigned long)delCommand},
5045 {"existsCommand", (unsigned long)existsCommand},
5046 {"incrCommand", (unsigned long)incrCommand},
5047 {"decrCommand", (unsigned long)decrCommand},
5048 {"incrbyCommand", (unsigned long)incrbyCommand},
5049 {"decrbyCommand", (unsigned long)decrbyCommand},
5050 {"selectCommand", (unsigned long)selectCommand},
5051 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5052 {"keysCommand", (unsigned long)keysCommand},
5053 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5054 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5055 {"saveCommand", (unsigned long)saveCommand},
5056 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5057 {"shutdownCommand", (unsigned long)shutdownCommand},
5058 {"moveCommand", (unsigned long)moveCommand},
5059 {"renameCommand", (unsigned long)renameCommand},
5060 {"renamenxCommand", (unsigned long)renamenxCommand},
5061 {"lpushCommand", (unsigned long)lpushCommand},
5062 {"rpushCommand", (unsigned long)rpushCommand},
5063 {"lpopCommand", (unsigned long)lpopCommand},
5064 {"rpopCommand", (unsigned long)rpopCommand},
5065 {"llenCommand", (unsigned long)llenCommand},
5066 {"lindexCommand", (unsigned long)lindexCommand},
5067 {"lrangeCommand", (unsigned long)lrangeCommand},
5068 {"ltrimCommand", (unsigned long)ltrimCommand},
5069 {"typeCommand", (unsigned long)typeCommand},
5070 {"lsetCommand", (unsigned long)lsetCommand},
5071 {"saddCommand", (unsigned long)saddCommand},
5072 {"sremCommand", (unsigned long)sremCommand},
5073 {"smoveCommand", (unsigned long)smoveCommand},
5074 {"sismemberCommand", (unsigned long)sismemberCommand},
5075 {"scardCommand", (unsigned long)scardCommand},
5076 {"spopCommand", (unsigned long)spopCommand},
5077 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5078 {"sinterCommand", (unsigned long)sinterCommand},
5079 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5080 {"sunionCommand", (unsigned long)sunionCommand},
5081 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5082 {"sdiffCommand", (unsigned long)sdiffCommand},
5083 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5084 {"syncCommand", (unsigned long)syncCommand},
5085 {"flushdbCommand", (unsigned long)flushdbCommand},
5086 {"flushallCommand", (unsigned long)flushallCommand},
5087 {"sortCommand", (unsigned long)sortCommand},
5088 {"lremCommand", (unsigned long)lremCommand},
5089 {"infoCommand", (unsigned long)infoCommand},
5090 {"mgetCommand", (unsigned long)mgetCommand},
5091 {"monitorCommand", (unsigned long)monitorCommand},
5092 {"expireCommand", (unsigned long)expireCommand},
5093 {"getsetCommand", (unsigned long)getsetCommand},
5094 {"ttlCommand", (unsigned long)ttlCommand},
5095 {"slaveofCommand", (unsigned long)slaveofCommand},
5096 {"debugCommand", (unsigned long)debugCommand},
5097 {"processCommand", (unsigned long)processCommand},
5098 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5099 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5100 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5101 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5102 {"msetCommand", (unsigned long)msetCommand},
5103 {"msetnxCommand", (unsigned long)msetnxCommand},
5104 {"zslCreateNode", (unsigned long)zslCreateNode},
5105 {"zslCreate", (unsigned long)zslCreate},
5106 {"zslFreeNode",(unsigned long)zslFreeNode},
5107 {"zslFree",(unsigned long)zslFree},
5108 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5109 {"zslInsert",(unsigned long)zslInsert},
5110 {"zslDelete",(unsigned long)zslDelete},
5111 {"createZsetObject",(unsigned long)createZsetObject},
5112 {"zaddCommand",(unsigned long)zaddCommand},
5113 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5114 {"zrangeCommand",(unsigned long)zrangeCommand},
5115 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5116 {"zremCommand",(unsigned long)zremCommand},
5117 {NULL,0}
5118 };
5119
5120 /* This function try to convert a pointer into a function name. It's used in
5121 * oreder to provide a backtrace under segmentation fault that's able to
5122 * display functions declared as static (otherwise the backtrace is useless). */
5123 static char *findFuncName(void *pointer, unsigned long *offset){
5124 int i, ret = -1;
5125 unsigned long off, minoff = 0;
5126
5127 /* Try to match against the Symbol with the smallest offset */
5128 for (i=0; symsTable[i].pointer; i++) {
5129 unsigned long lp = (unsigned long) pointer;
5130
5131 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5132 off=lp-symsTable[i].pointer;
5133 if (ret < 0 || off < minoff) {
5134 minoff=off;
5135 ret=i;
5136 }
5137 }
5138 }
5139 if (ret == -1) return NULL;
5140 *offset = minoff;
5141 return symsTable[ret].name;
5142 }
5143
5144 static void *getMcontextEip(ucontext_t *uc) {
5145 #if defined(__FreeBSD__)
5146 return (void*) uc->uc_mcontext.mc_eip;
5147 #elif defined(__dietlibc__)
5148 return (void*) uc->uc_mcontext.eip;
5149 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5150 return (void*) uc->uc_mcontext->__ss.__eip;
5151 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5152 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5153 return (void*) uc->uc_mcontext->__ss.__rip;
5154 #else
5155 return (void*) uc->uc_mcontext->__ss.__eip;
5156 #endif
5157 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5158 return (void*) uc->uc_mcontext.gregs[REG_EIP];
5159 #elif defined(__ia64__) /* Linux IA64 */
5160 return (void*) uc->uc_mcontext.sc_ip;
5161 #else
5162 return NULL;
5163 #endif
5164 }
5165
5166 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5167 void *trace[100];
5168 char **messages = NULL;
5169 int i, trace_size = 0;
5170 unsigned long offset=0;
5171 time_t uptime = time(NULL)-server.stat_starttime;
5172 ucontext_t *uc = (ucontext_t*) secret;
5173 REDIS_NOTUSED(info);
5174
5175 redisLog(REDIS_WARNING,
5176 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5177 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5178 "redis_version:%s; "
5179 "uptime_in_seconds:%d; "
5180 "connected_clients:%d; "
5181 "connected_slaves:%d; "
5182 "used_memory:%zu; "
5183 "changes_since_last_save:%lld; "
5184 "bgsave_in_progress:%d; "
5185 "last_save_time:%d; "
5186 "total_connections_received:%lld; "
5187 "total_commands_processed:%lld; "
5188 "role:%s;"
5189 ,REDIS_VERSION,
5190 uptime,
5191 listLength(server.clients)-listLength(server.slaves),
5192 listLength(server.slaves),
5193 server.usedmemory,
5194 server.dirty,
5195 server.bgsaveinprogress,
5196 server.lastsave,
5197 server.stat_numconnections,
5198 server.stat_numcommands,
5199 server.masterhost == NULL ? "master" : "slave"
5200 ));
5201
5202 trace_size = backtrace(trace, 100);
5203 /* overwrite sigaction with caller's address */
5204 if (getMcontextEip(uc) != NULL) {
5205 trace[1] = getMcontextEip(uc);
5206 }
5207 messages = backtrace_symbols(trace, trace_size);
5208
5209 for (i=1; i<trace_size; ++i) {
5210 char *fn = findFuncName(trace[i], &offset), *p;
5211
5212 p = strchr(messages[i],'+');
5213 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5214 redisLog(REDIS_WARNING,"%s", messages[i]);
5215 } else {
5216 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5217 }
5218 }
5219 free(messages);
5220 exit(0);
5221 }
5222
5223 static void setupSigSegvAction(void) {
5224 struct sigaction act;
5225
5226 sigemptyset (&act.sa_mask);
5227 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5228 * is used. Otherwise, sa_handler is used */
5229 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5230 act.sa_sigaction = segvHandler;
5231 sigaction (SIGSEGV, &act, NULL);
5232 sigaction (SIGBUS, &act, NULL);
5233 sigaction (SIGFPE, &act, NULL);
5234 sigaction (SIGILL, &act, NULL);
5235 sigaction (SIGBUS, &act, NULL);
5236 return;
5237 }
5238 #else /* HAVE_BACKTRACE */
5239 static void setupSigSegvAction(void) {
5240 }
5241 #endif /* HAVE_BACKTRACE */
5242
5243 /* =================================== Main! ================================ */
5244
5245 #ifdef __linux__
5246 int linuxOvercommitMemoryValue(void) {
5247 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
5248 char buf[64];
5249
5250 if (!fp) return -1;
5251 if (fgets(buf,64,fp) == NULL) {
5252 fclose(fp);
5253 return -1;
5254 }
5255 fclose(fp);
5256
5257 return atoi(buf);
5258 }
5259
5260 void linuxOvercommitMemoryWarning(void) {
5261 if (linuxOvercommitMemoryValue() == 0) {
5262 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5263 }
5264 }
5265 #endif /* __linux__ */
5266
5267 static void daemonize(void) {
5268 int fd;
5269 FILE *fp;
5270
5271 if (fork() != 0) exit(0); /* parent exits */
5272 setsid(); /* create a new session */
5273
5274 /* Every output goes to /dev/null. If Redis is daemonized but
5275 * the 'logfile' is set to 'stdout' in the configuration file
5276 * it will not log at all. */
5277 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5278 dup2(fd, STDIN_FILENO);
5279 dup2(fd, STDOUT_FILENO);
5280 dup2(fd, STDERR_FILENO);
5281 if (fd > STDERR_FILENO) close(fd);
5282 }
5283 /* Try to write the pid file */
5284 fp = fopen(server.pidfile,"w");
5285 if (fp) {
5286 fprintf(fp,"%d\n",getpid());
5287 fclose(fp);
5288 }
5289 }
5290
5291 int main(int argc, char **argv) {
5292 initServerConfig();
5293 if (argc == 2) {
5294 ResetServerSaveParams();
5295 loadServerConfig(argv[1]);
5296 } else if (argc > 2) {
5297 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5298 exit(1);
5299 } else {
5300 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5301 }
5302 initServer();
5303 if (server.daemonize) daemonize();
5304 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5305 #ifdef __linux__
5306 linuxOvercommitMemoryWarning();
5307 #endif
5308 if (rdbLoad(server.dbfilename) == REDIS_OK)
5309 redisLog(REDIS_NOTICE,"DB loaded from disk");
5310 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5311 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5312 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5313 aeMain(server.el);
5314 aeDeleteEventLoop(server.el);
5315 return 0;
5316 }