]> git.saurik.com Git - redis.git/blob - redis.c
509d2f0851999a9ba3525f3690e1e30baedc1ac5
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #include "redis.h"
63 #include "ae.h" /* Event driven programming library */
64 #include "sds.h" /* Dynamic safe strings */
65 #include "anet.h" /* Networking the easy way */
66 #include "dict.h" /* Hash tables */
67 #include "adlist.h" /* Linked lists */
68 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
69 #include "lzf.h" /* LZF compression library */
70 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
71
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4
101
/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. To store 32 bits lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specifies the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * accordingly to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
144
/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() prints a message only when its level is greater
 * than or equal to the configured verbosity, and uses the level as an index
 * into a three character marker string. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: marks a parameter as intentionally unused. */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
188
189 /*================================= Data types ============================== */
190
/* A redis object: a reference counted value with a type tag and an
 * encoding tag.
 * NOTE(review): 'type' presumably holds one of the REDIS_STRING, REDIS_LIST,
 * REDIS_SET, REDIS_ZSET, REDIS_HASH constants and 'encoding' one of the
 * REDIS_ENCODING_* constants -- confirm against createObject(). */
typedef struct redisObject {
    void *ptr;              /* payload; an sds string for raw string objects */
    unsigned char type;
    unsigned char encoding; /* compared against REDIS_ENCODING_RAW by the dict callbacks */
    unsigned char notused[2];
    int refcount;
} robj;
199
/* A single database. serverCron() reports the entries of 'expires' as the
 * "volatile" keys and reads each value stored there as a time_t that is
 * compared with the current time to decide whether the key must be deleted. */
typedef struct redisDb {
    dict *dict;    /* the keys of this database */
    dict *expires; /* keys with a timeout set; values are read as time_t */
    int id;
} redisDb;
205
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list. */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
228
/* A save point. serverCron() starts a background save when at least
 * 'changes' modifications are pending (server.dirty) and more than 'seconds'
 * seconds have elapsed since server.lastsave. */
struct saveparam {
    time_t seconds;
    int changes;
};
233
/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db; /* array of databases, indexed from 0 to dbnum-1 */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function run */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of last successful save */
    size_t usedmemory; /* Used memory in bytes, refreshed by serverCron() */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* messages with a lower log level are not printed */
    int glueoutputbuf;
    int maxidletime; /* idle clients are closed after this many seconds, 0 disables */
    int dbnum;
    int daemonize;
    char *pidfile;
    int bgsaveinprogress; /* non-zero while a background save child is running */
    pid_t bgsavechildpid; /* reset to -1 once the child has been reaped */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile; /* NULL means log to standard output */
    char *bindaddr;
    char *dbfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
284
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);

/* One entry of the command table. */
struct redisCommand {
    char *name;             /* command name as listed in the table (lowercase there) */
    redisCommandProc *proc; /* function implementing the command */
    int arity;              /* NOTE(review): presumably the argument count including
                               the command name, negative meaning "at least" --
                               confirm in the dispatcher */
    int flags;              /* OR of the REDIS_CMD_* flags */
};
292
/* Pairs a name with an address stored as an integer.
 * NOTE(review): presumably used to map addresses back to function names
 * when printing a backtrace -- confirm against the users of this struct. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
297
/* An element handled by SORT: the object itself plus the value it is
 * ordered by, held either as a number or as another object. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;
305
/* An operation attached to a SORT command.
 * NOTE(review): 'type' is presumably one of the REDIS_SORT_* constants --
 * confirm in sortCommand(). */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
310
/* ZSETs use a specialized version of Skiplists */

/* A skiplist node: a score/object pair plus its links. */
typedef struct zskiplistNode {
    struct zskiplistNode **forward; /* array of forward pointers */
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;
319
/* The skiplist itself: its end points, a length and a level. */
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    long length;
    int level;
} zskiplist;
325
/* A sorted set: a hash table paired with a skiplist. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
330
/* Our shared "common" objects: replies created once by
 * createSharedObjects() so they can be reused without building a new
 * object each time. */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
341
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
347
348 /*================================ Prototypes =============================== */
349
/* Forward declarations of the internal helpers, needed because they are
 * referenced before their definitions appear in the file. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(const robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
383
/* Command implementations. They all share the redisCommandProc signature
 * and are referenced from the command table. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zlenCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
452
453 /*================================= Globals ================================= */
454
/* Global vars */
static struct redisServer server; /* server global state */

/* The command table. Every entry is {name, handler, arity, flags} as
 * declared by struct redisCommand; the table ends with an all-zero sentinel.
 * Note that "smembers" is served by sinterCommand.
 * NOTE(review): negative arity values presumably mean "at least that many
 * arguments" -- confirm in the command dispatcher.
 * NOTE(review): "zscore" carries REDIS_CMD_DENYOOM although its name suggests
 * a read-only command -- confirm whether that is intended. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zrem",zremCommand,3,REDIS_CMD_BULK},
    {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
    {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
    {"zlen",zlenCommand,2,REDIS_CMD_INLINE},
    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
529 /*============================ Utility functions ============================ */
530
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Supported syntax:
 *
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [abc]   one byte out of a set; a-z is a range, a leading ^ negates the
 *           set and \x inside the brackets takes x literally
 *   \x      the byte x taken literally
 *
 * When 'nocase' is non-zero, literal bytes and ranges are compared after
 * tolower() (escaped bytes inside a bracket set are always compared as is).
 *
 * Returns 1 on match, 0 otherwise. Neither buffer is read past its declared
 * length, so the inputs do not need to be nul terminated. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse a run of stars without looking past the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern at every position of the string. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes one byte: without this check a negated
             * set would "match" the empty string and drive stringLen
             * negative. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the common
                     * advance below leaves us exactly at the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* A trailing backslash stands for itself. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is over: only trailing stars may still match. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
653
654 static void redisLog(int level, const char *fmt, ...) {
655 va_list ap;
656 FILE *fp;
657
658 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
659 if (!fp) return;
660
661 va_start(ap, fmt);
662 if (level >= server.verbosity) {
663 char *c = ".-*";
664 char buf[64];
665 time_t now;
666
667 now = time(NULL);
668 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
669 fprintf(fp,"%s %c ",buf,c[level]);
670 vfprintf(fp, fmt, ap);
671 fprintf(fp,"\n");
672 fflush(fp);
673 }
674 va_end(ap);
675
676 if (server.logfile) fclose(fp);
677 }
678
679 /*====================== Hash table type implementation ==================== */
680
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
684
/* Value destructor for dictionaries whose values are plain zmalloc()ed
 * buffers: the value is released with zfree(). 'privdata' is unused. */
static void dictVanillaFree(void *privdata, void *val)
{
    (void) privdata;
    zfree(val);
}
690
691 static int sdsDictKeyCompare(void *privdata, const void *key1,
692 const void *key2)
693 {
694 int l1,l2;
695 DICT_NOTUSED(privdata);
696
697 l1 = sdslen((sds)key1);
698 l2 = sdslen((sds)key2);
699 if (l1 != l2) return 0;
700 return memcmp(key1, key2, l1) == 0;
701 }
702
/* Destructor for keys or values that are redis objects: drops one
 * reference from the object. 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void) privdata;
    decrRefCount(val);
}
709
710 static int dictObjKeyCompare(void *privdata, const void *key1,
711 const void *key2)
712 {
713 const robj *o1 = key1, *o2 = key2;
714 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
715 }
716
717 static unsigned int dictObjHash(const void *key) {
718 const robj *o = key;
719 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
720 }
721
722 static int dictEncObjKeyCompare(void *privdata, const void *key1,
723 const void *key2)
724 {
725 const robj *o1 = key1, *o2 = key2;
726
727 if (o1->encoding == REDIS_ENCODING_RAW &&
728 o2->encoding == REDIS_ENCODING_RAW)
729 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
730 else {
731 robj *dec1, *dec2;
732 int cmp;
733
734 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
735 getDecodedObject(o1) : (robj*)o1;
736 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
737 getDecodedObject(o2) : (robj*)o2;
738 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
739 if (dec1 != o1) decrRefCount(dec1);
740 if (dec2 != o2) decrRefCount(dec2);
741 return cmp;
742 }
743 }
744
745 static unsigned int dictEncObjHash(const void *key) {
746 const robj *o = key;
747
748 if (o->encoding == REDIS_ENCODING_RAW)
749 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
750 else {
751 robj *dec = getDecodedObject(o);
752 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
753 decrRefCount(dec);
754 return hash;
755 }
756 }
757
/* Dictionary type with encoding-aware hashing and comparison of the keys.
 * Keys are released through decrRefCount(); no value destructor is set. */
static dictType setDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};
766
/* Same key handling as setDictType, but values are released with zfree()
 * through dictVanillaFree(). */
static dictType zsetDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree /* val destructor */
};
775
/* Dictionary type whose keys and values are both redis objects released
 * through decrRefCount(). Keys are hashed and compared directly as sds
 * strings, without any decoding step. */
static dictType hashDictType = {
    dictObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor /* val destructor */
};
784
785 /* ========================= Random utility functions ======================= */
786
/* Out of memory handler.
 *
 * Redis does not try to recover from allocation failures: the networking
 * layer itself needs heap memory for its send buffers, so it is not even
 * clear the condition could be reported to the client. The message is
 * printed on standard error, prefixed by 'msg', and after a one second
 * pause the process is aborted. This function never returns. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
798
799 /* ====================== Redis server networking stuff ===================== */
800 static void closeTimedoutClients(void) {
801 redisClient *c;
802 listNode *ln;
803 time_t now = time(NULL);
804
805 listRewind(server.clients);
806 while ((ln = listYield(server.clients)) != NULL) {
807 c = listNodeValue(ln);
808 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
809 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
810 (now - c->lastinteraction > server.maxidletime)) {
811 redisLog(REDIS_DEBUG,"Closing idle client");
812 freeClient(c);
813 }
814 }
815 }
816
817 static int htNeedsResize(dict *dict) {
818 long long size, used;
819
820 size = dictSlots(dict);
821 used = dictSize(dict);
822 return (size && used && size > DICT_HT_INITIAL_SIZE &&
823 (used*100/size < REDIS_HT_MINFILL));
824 }
825
826 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
827 * we resize the hash table to save memory */
828 static void tryResizeHashTables(void) {
829 int j;
830
831 for (j = 0; j < server.dbnum; j++) {
832 if (htNeedsResize(server.db[j].dict)) {
833 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
834 dictResize(server.db[j].dict);
835 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
836 }
837 if (htNeedsResize(server.db[j].expires))
838 dictResize(server.db[j].expires);
839 }
840 }
841
842 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
843 int j, loops = server.cronloops++;
844 REDIS_NOTUSED(eventLoop);
845 REDIS_NOTUSED(id);
846 REDIS_NOTUSED(clientData);
847
848 /* Update the global state with the amount of used memory */
849 server.usedmemory = zmalloc_used_memory();
850
851 /* Show some info about non-empty databases */
852 for (j = 0; j < server.dbnum; j++) {
853 long long size, used, vkeys;
854
855 size = dictSlots(server.db[j].dict);
856 used = dictSize(server.db[j].dict);
857 vkeys = dictSize(server.db[j].expires);
858 if (!(loops % 5) && (used || vkeys)) {
859 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
860 /* dictPrintStats(server.dict); */
861 }
862 }
863
864 /* We don't want to resize the hash tables while a bacground saving
865 * is in progress: the saving child is created using fork() that is
866 * implemented with a copy-on-write semantic in most modern systems, so
867 * if we resize the HT while there is the saving child at work actually
868 * a lot of memory movements in the parent will cause a lot of pages
869 * copied. */
870 if (!server.bgsaveinprogress) tryResizeHashTables();
871
872 /* Show information about connected clients */
873 if (!(loops % 5)) {
874 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
875 listLength(server.clients)-listLength(server.slaves),
876 listLength(server.slaves),
877 server.usedmemory,
878 dictSize(server.sharingpool));
879 }
880
881 /* Close connections of timedout clients */
882 if (server.maxidletime && !(loops % 10))
883 closeTimedoutClients();
884
885 /* Check if a background saving in progress terminated */
886 if (server.bgsaveinprogress) {
887 int statloc;
888 if (wait4(-1,&statloc,WNOHANG,NULL)) {
889 int exitcode = WEXITSTATUS(statloc);
890 int bysignal = WIFSIGNALED(statloc);
891
892 if (!bysignal && exitcode == 0) {
893 redisLog(REDIS_NOTICE,
894 "Background saving terminated with success");
895 server.dirty = 0;
896 server.lastsave = time(NULL);
897 } else if (!bysignal && exitcode != 0) {
898 redisLog(REDIS_WARNING, "Background saving error");
899 } else {
900 redisLog(REDIS_WARNING,
901 "Background saving terminated by signal");
902 rdbRemoveTempFile(server.bgsavechildpid);
903 }
904 server.bgsaveinprogress = 0;
905 server.bgsavechildpid = -1;
906 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
907 }
908 } else {
909 /* If there is not a background saving in progress check if
910 * we have to save now */
911 time_t now = time(NULL);
912 for (j = 0; j < server.saveparamslen; j++) {
913 struct saveparam *sp = server.saveparams+j;
914
915 if (server.dirty >= sp->changes &&
916 now-server.lastsave > sp->seconds) {
917 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
918 sp->changes, sp->seconds);
919 rdbSaveBackground(server.dbfilename);
920 break;
921 }
922 }
923 }
924
925 /* Try to expire a few timed out keys */
926 for (j = 0; j < server.dbnum; j++) {
927 redisDb *db = server.db+j;
928 int num = dictSize(db->expires);
929
930 if (num) {
931 time_t now = time(NULL);
932
933 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
934 num = REDIS_EXPIRELOOKUPS_PER_CRON;
935 while (num--) {
936 dictEntry *de;
937 time_t t;
938
939 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
940 t = (time_t) dictGetEntryVal(de);
941 if (now > t) {
942 deleteKey(db,dictGetEntryKey(de));
943 }
944 }
945 }
946 }
947
948 /* Check if we should connect to a MASTER */
949 if (server.replstate == REDIS_REPL_CONNECT) {
950 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
951 if (syncWithMaster() == REDIS_OK) {
952 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
953 }
954 }
955 return 1000;
956 }
957
958 static void createSharedObjects(void) {
959 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
960 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
961 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
962 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
963 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
964 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
965 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
966 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
967 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
968 /* no such key */
969 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
970 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
971 "-ERR Operation against a key holding the wrong kind of value\r\n"));
972 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
973 "-ERR no such key\r\n"));
974 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
975 "-ERR syntax error\r\n"));
976 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
977 "-ERR source and destination objects are the same\r\n"));
978 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
979 "-ERR index out of range\r\n"));
980 shared.space = createObject(REDIS_STRING,sdsnew(" "));
981 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
982 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
983 shared.select0 = createStringObject("select 0\r\n",10);
984 shared.select1 = createStringObject("select 1\r\n",10);
985 shared.select2 = createStringObject("select 2\r\n",10);
986 shared.select3 = createStringObject("select 3\r\n",10);
987 shared.select4 = createStringObject("select 4\r\n",10);
988 shared.select5 = createStringObject("select 5\r\n",10);
989 shared.select6 = createStringObject("select 6\r\n",10);
990 shared.select7 = createStringObject("select 7\r\n",10);
991 shared.select8 = createStringObject("select 8\r\n",10);
992 shared.select9 = createStringObject("select 9\r\n",10);
993 }
994
995 static void appendServerSaveParams(time_t seconds, int changes) {
996 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
997 server.saveparams[server.saveparamslen].seconds = seconds;
998 server.saveparams[server.saveparamslen].changes = changes;
999 server.saveparamslen++;
1000 }
1001
1002 static void ResetServerSaveParams() {
1003 zfree(server.saveparams);
1004 server.saveparams = NULL;
1005 server.saveparamslen = 0;
1006 }
1007
1008 static void initServerConfig() {
1009 server.dbnum = REDIS_DEFAULT_DBNUM;
1010 server.port = REDIS_SERVERPORT;
1011 server.verbosity = REDIS_DEBUG;
1012 server.maxidletime = REDIS_MAXIDLETIME;
1013 server.saveparams = NULL;
1014 server.logfile = NULL; /* NULL = log on standard output */
1015 server.bindaddr = NULL;
1016 server.glueoutputbuf = 1;
1017 server.daemonize = 0;
1018 server.pidfile = "/var/run/redis.pid";
1019 server.dbfilename = "dump.rdb";
1020 server.requirepass = NULL;
1021 server.shareobjects = 0;
1022 server.sharingpoolsize = 1024;
1023 server.maxclients = 0;
1024 server.maxmemory = 0;
1025 ResetServerSaveParams();
1026
1027 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1028 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1029 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1030 /* Replication related */
1031 server.isslave = 0;
1032 server.masterhost = NULL;
1033 server.masterport = 6379;
1034 server.master = NULL;
1035 server.replstate = REDIS_REPL_NONE;
1036
1037 /* Double constants initialization */
1038 R_Zero = 0.0;
1039 R_PosInf = 1.0/R_Zero;
1040 R_NegInf = -1.0/R_Zero;
1041 R_Nan = R_Zero/R_Zero;
1042 }
1043
1044 static void initServer() {
1045 int j;
1046
1047 signal(SIGHUP, SIG_IGN);
1048 signal(SIGPIPE, SIG_IGN);
1049 setupSigSegvAction();
1050
1051 server.clients = listCreate();
1052 server.slaves = listCreate();
1053 server.monitors = listCreate();
1054 server.objfreelist = listCreate();
1055 createSharedObjects();
1056 server.el = aeCreateEventLoop();
1057 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1058 server.sharingpool = dictCreate(&setDictType,NULL);
1059 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1060 if (server.fd == -1) {
1061 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1062 exit(1);
1063 }
1064 for (j = 0; j < server.dbnum; j++) {
1065 server.db[j].dict = dictCreate(&hashDictType,NULL);
1066 server.db[j].expires = dictCreate(&setDictType,NULL);
1067 server.db[j].id = j;
1068 }
1069 server.cronloops = 0;
1070 server.bgsaveinprogress = 0;
1071 server.bgsavechildpid = -1;
1072 server.lastsave = time(NULL);
1073 server.dirty = 0;
1074 server.usedmemory = 0;
1075 server.stat_numcommands = 0;
1076 server.stat_numconnections = 0;
1077 server.stat_starttime = time(NULL);
1078 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1079 }
1080
1081 /* Empty the whole database */
1082 static long long emptyDb() {
1083 int j;
1084 long long removed = 0;
1085
1086 for (j = 0; j < server.dbnum; j++) {
1087 removed += dictSize(server.db[j].dict);
1088 dictEmpty(server.db[j].dict);
1089 dictEmpty(server.db[j].expires);
1090 }
1091 return removed;
1092 }
1093
/* Map the strings "yes" / "no" (compared case insensitively) to 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1099
1100 /* I agree, this is a very rudimental way to load a configuration...
1101 will improve later if the config gets more complex */
1102 static void loadServerConfig(char *filename) {
1103 FILE *fp;
1104 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1105 int linenum = 0;
1106 sds line = NULL;
1107
1108 if (filename[0] == '-' && filename[1] == '\0')
1109 fp = stdin;
1110 else {
1111 if ((fp = fopen(filename,"r")) == NULL) {
1112 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1113 exit(1);
1114 }
1115 }
1116
1117 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1118 sds *argv;
1119 int argc, j;
1120
1121 linenum++;
1122 line = sdsnew(buf);
1123 line = sdstrim(line," \t\r\n");
1124
1125 /* Skip comments and blank lines*/
1126 if (line[0] == '#' || line[0] == '\0') {
1127 sdsfree(line);
1128 continue;
1129 }
1130
1131 /* Split into arguments */
1132 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1133 sdstolower(argv[0]);
1134
1135 /* Execute config directives */
1136 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1137 server.maxidletime = atoi(argv[1]);
1138 if (server.maxidletime < 0) {
1139 err = "Invalid timeout value"; goto loaderr;
1140 }
1141 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1142 server.port = atoi(argv[1]);
1143 if (server.port < 1 || server.port > 65535) {
1144 err = "Invalid port"; goto loaderr;
1145 }
1146 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1147 server.bindaddr = zstrdup(argv[1]);
1148 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1149 int seconds = atoi(argv[1]);
1150 int changes = atoi(argv[2]);
1151 if (seconds < 1 || changes < 0) {
1152 err = "Invalid save parameters"; goto loaderr;
1153 }
1154 appendServerSaveParams(seconds,changes);
1155 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1156 if (chdir(argv[1]) == -1) {
1157 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1158 argv[1], strerror(errno));
1159 exit(1);
1160 }
1161 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1162 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1163 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1164 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1165 else {
1166 err = "Invalid log level. Must be one of debug, notice, warning";
1167 goto loaderr;
1168 }
1169 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1170 FILE *logfp;
1171
1172 server.logfile = zstrdup(argv[1]);
1173 if (!strcasecmp(server.logfile,"stdout")) {
1174 zfree(server.logfile);
1175 server.logfile = NULL;
1176 }
1177 if (server.logfile) {
1178 /* Test if we are able to open the file. The server will not
1179 * be able to abort just for this problem later... */
1180 logfp = fopen(server.logfile,"a");
1181 if (logfp == NULL) {
1182 err = sdscatprintf(sdsempty(),
1183 "Can't open the log file: %s", strerror(errno));
1184 goto loaderr;
1185 }
1186 fclose(logfp);
1187 }
1188 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1189 server.dbnum = atoi(argv[1]);
1190 if (server.dbnum < 1) {
1191 err = "Invalid number of databases"; goto loaderr;
1192 }
1193 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1194 server.maxclients = atoi(argv[1]);
1195 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1196 server.maxmemory = strtoll(argv[1], NULL, 10);
1197 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1198 server.masterhost = sdsnew(argv[1]);
1199 server.masterport = atoi(argv[2]);
1200 server.replstate = REDIS_REPL_CONNECT;
1201 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1202 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1203 err = "argument must be 'yes' or 'no'"; goto loaderr;
1204 }
1205 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1206 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1207 err = "argument must be 'yes' or 'no'"; goto loaderr;
1208 }
1209 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1210 server.sharingpoolsize = atoi(argv[1]);
1211 if (server.sharingpoolsize < 1) {
1212 err = "invalid object sharing pool size"; goto loaderr;
1213 }
1214 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1215 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1216 err = "argument must be 'yes' or 'no'"; goto loaderr;
1217 }
1218 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1219 server.requirepass = zstrdup(argv[1]);
1220 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1221 server.pidfile = zstrdup(argv[1]);
1222 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1223 server.dbfilename = zstrdup(argv[1]);
1224 } else {
1225 err = "Bad directive or wrong number of arguments"; goto loaderr;
1226 }
1227 for (j = 0; j < argc; j++)
1228 sdsfree(argv[j]);
1229 zfree(argv);
1230 sdsfree(line);
1231 }
1232 if (fp != stdin) fclose(fp);
1233 return;
1234
1235 loaderr:
1236 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1237 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1238 fprintf(stderr, ">>> '%s'\n", line);
1239 fprintf(stderr, "%s\n", err);
1240 exit(1);
1241 }
1242
1243 static void freeClientArgv(redisClient *c) {
1244 int j;
1245
1246 for (j = 0; j < c->argc; j++)
1247 decrRefCount(c->argv[j]);
1248 for (j = 0; j < c->mbargc; j++)
1249 decrRefCount(c->mbargv[j]);
1250 c->argc = 0;
1251 c->mbargc = 0;
1252 }
1253
1254 static void freeClient(redisClient *c) {
1255 listNode *ln;
1256
1257 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1258 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1259 sdsfree(c->querybuf);
1260 listRelease(c->reply);
1261 freeClientArgv(c);
1262 close(c->fd);
1263 ln = listSearchKey(server.clients,c);
1264 assert(ln != NULL);
1265 listDelNode(server.clients,ln);
1266 if (c->flags & REDIS_SLAVE) {
1267 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1268 close(c->repldbfd);
1269 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1270 ln = listSearchKey(l,c);
1271 assert(ln != NULL);
1272 listDelNode(l,ln);
1273 }
1274 if (c->flags & REDIS_MASTER) {
1275 server.master = NULL;
1276 server.replstate = REDIS_REPL_CONNECT;
1277 }
1278 zfree(c->argv);
1279 zfree(c->mbargv);
1280 zfree(c);
1281 }
1282
1283 static void glueReplyBuffersIfNeeded(redisClient *c) {
1284 int totlen = 0;
1285 listNode *ln;
1286 robj *o;
1287
1288 listRewind(c->reply);
1289 while((ln = listYield(c->reply))) {
1290 o = ln->value;
1291 totlen += sdslen(o->ptr);
1292 /* This optimization makes more sense if we don't have to copy
1293 * too much data */
1294 if (totlen > 1024) return;
1295 }
1296 if (totlen > 0) {
1297 char buf[1024];
1298 int copylen = 0;
1299
1300 listRewind(c->reply);
1301 while((ln = listYield(c->reply))) {
1302 o = ln->value;
1303 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1304 copylen += sdslen(o->ptr);
1305 listDelNode(c->reply,ln);
1306 }
1307 /* Now the output buffer is empty, add the new single element */
1308 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1309 listAddNodeTail(c->reply,o);
1310 }
1311 }
1312
1313 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1314 redisClient *c = privdata;
1315 int nwritten = 0, totwritten = 0, objlen;
1316 robj *o;
1317 REDIS_NOTUSED(el);
1318 REDIS_NOTUSED(mask);
1319
1320 if (server.glueoutputbuf && listLength(c->reply) > 1)
1321 glueReplyBuffersIfNeeded(c);
1322 while(listLength(c->reply)) {
1323 o = listNodeValue(listFirst(c->reply));
1324 objlen = sdslen(o->ptr);
1325
1326 if (objlen == 0) {
1327 listDelNode(c->reply,listFirst(c->reply));
1328 continue;
1329 }
1330
1331 if (c->flags & REDIS_MASTER) {
1332 /* Don't reply to a master */
1333 nwritten = objlen - c->sentlen;
1334 } else {
1335 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1336 if (nwritten <= 0) break;
1337 }
1338 c->sentlen += nwritten;
1339 totwritten += nwritten;
1340 /* If we fully sent the object on head go to the next one */
1341 if (c->sentlen == objlen) {
1342 listDelNode(c->reply,listFirst(c->reply));
1343 c->sentlen = 0;
1344 }
1345 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1346 * bytes, in a single threaded server it's a good idea to server
1347 * other clients as well, even if a very large request comes from
1348 * super fast link that is always able to accept data (in real world
1349 * terms think to 'KEYS *' against the loopback interfae) */
1350 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1351 }
1352 if (nwritten == -1) {
1353 if (errno == EAGAIN) {
1354 nwritten = 0;
1355 } else {
1356 redisLog(REDIS_DEBUG,
1357 "Error writing to client: %s", strerror(errno));
1358 freeClient(c);
1359 return;
1360 }
1361 }
1362 if (totwritten > 0) c->lastinteraction = time(NULL);
1363 if (listLength(c->reply) == 0) {
1364 c->sentlen = 0;
1365 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1366 }
1367 }
1368
1369 static struct redisCommand *lookupCommand(char *name) {
1370 int j = 0;
1371 while(cmdTable[j].name != NULL) {
1372 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1373 j++;
1374 }
1375 return NULL;
1376 }
1377
1378 /* resetClient prepare the client to process the next command */
1379 static void resetClient(redisClient *c) {
1380 freeClientArgv(c);
1381 c->bulklen = -1;
1382 c->multibulk = 0;
1383 }
1384
1385 /* If this function gets called we already read a whole
1386 * command, argments are in the client argv/argc fields.
1387 * processCommand() execute the command or prepare the
1388 * server for a bulk read from the client.
1389 *
1390 * If 1 is returned the client is still alive and valid and
1391 * and other operations can be performed by the caller. Otherwise
1392 * if 0 is returned the client was destroied (i.e. after QUIT). */
1393 static int processCommand(redisClient *c) {
1394 struct redisCommand *cmd;
1395 long long dirty;
1396
1397 /* Free some memory if needed (maxmemory setting) */
1398 if (server.maxmemory) freeMemoryIfNeeded();
1399
1400 /* Handle the multi bulk command type. This is an alternative protocol
1401 * supported by Redis in order to receive commands that are composed of
1402 * multiple binary-safe "bulk" arguments. The latency of processing is
1403 * a bit higher but this allows things like multi-sets, so if this
1404 * protocol is used only for MSET and similar commands this is a big win. */
1405 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1406 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1407 if (c->multibulk <= 0) {
1408 resetClient(c);
1409 return 1;
1410 } else {
1411 decrRefCount(c->argv[c->argc-1]);
1412 c->argc--;
1413 return 1;
1414 }
1415 } else if (c->multibulk) {
1416 if (c->bulklen == -1) {
1417 if (((char*)c->argv[0]->ptr)[0] != '$') {
1418 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1419 resetClient(c);
1420 return 1;
1421 } else {
1422 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1423 decrRefCount(c->argv[0]);
1424 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1425 c->argc--;
1426 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1427 resetClient(c);
1428 return 1;
1429 }
1430 c->argc--;
1431 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1432 return 1;
1433 }
1434 } else {
1435 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1436 c->mbargv[c->mbargc] = c->argv[0];
1437 c->mbargc++;
1438 c->argc--;
1439 c->multibulk--;
1440 if (c->multibulk == 0) {
1441 robj **auxargv;
1442 int auxargc;
1443
1444 /* Here we need to swap the multi-bulk argc/argv with the
1445 * normal argc/argv of the client structure. */
1446 auxargv = c->argv;
1447 c->argv = c->mbargv;
1448 c->mbargv = auxargv;
1449
1450 auxargc = c->argc;
1451 c->argc = c->mbargc;
1452 c->mbargc = auxargc;
1453
1454 /* We need to set bulklen to something different than -1
1455 * in order for the code below to process the command without
1456 * to try to read the last argument of a bulk command as
1457 * a special argument. */
1458 c->bulklen = 0;
1459 /* continue below and process the command */
1460 } else {
1461 c->bulklen = -1;
1462 return 1;
1463 }
1464 }
1465 }
1466 /* -- end of multi bulk commands processing -- */
1467
1468 /* The QUIT command is handled as a special case. Normal command
1469 * procs are unable to close the client connection safely */
1470 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1471 freeClient(c);
1472 return 0;
1473 }
1474 cmd = lookupCommand(c->argv[0]->ptr);
1475 if (!cmd) {
1476 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1477 resetClient(c);
1478 return 1;
1479 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1480 (c->argc < -cmd->arity)) {
1481 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1482 resetClient(c);
1483 return 1;
1484 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1485 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1486 resetClient(c);
1487 return 1;
1488 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1489 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1490
1491 decrRefCount(c->argv[c->argc-1]);
1492 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1493 c->argc--;
1494 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1495 resetClient(c);
1496 return 1;
1497 }
1498 c->argc--;
1499 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1500 /* It is possible that the bulk read is already in the
1501 * buffer. Check this condition and handle it accordingly.
1502 * This is just a fast path, alternative to call processInputBuffer().
1503 * It's a good idea since the code is small and this condition
1504 * happens most of the times. */
1505 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1506 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1507 c->argc++;
1508 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1509 } else {
1510 return 1;
1511 }
1512 }
1513 /* Let's try to share objects on the command arguments vector */
1514 if (server.shareobjects) {
1515 int j;
1516 for(j = 1; j < c->argc; j++)
1517 c->argv[j] = tryObjectSharing(c->argv[j]);
1518 }
1519 /* Let's try to encode the bulk object to save space. */
1520 if (cmd->flags & REDIS_CMD_BULK)
1521 tryObjectEncoding(c->argv[c->argc-1]);
1522
1523 /* Check if the user is authenticated */
1524 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1525 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1526 resetClient(c);
1527 return 1;
1528 }
1529
1530 /* Exec the command */
1531 dirty = server.dirty;
1532 cmd->proc(c);
1533 if (server.dirty-dirty != 0 && listLength(server.slaves))
1534 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1535 if (listLength(server.monitors))
1536 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1537 server.stat_numcommands++;
1538
1539 /* Prepare the client for the next command */
1540 if (c->flags & REDIS_CLOSE) {
1541 freeClient(c);
1542 return 0;
1543 }
1544 resetClient(c);
1545 return 1;
1546 }
1547
1548 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1549 listNode *ln;
1550 int outc = 0, j;
1551 robj **outv;
1552 /* (args*2)+1 is enough room for args, spaces, newlines */
1553 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1554
1555 if (argc <= REDIS_STATIC_ARGS) {
1556 outv = static_outv;
1557 } else {
1558 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1559 }
1560
1561 for (j = 0; j < argc; j++) {
1562 if (j != 0) outv[outc++] = shared.space;
1563 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1564 robj *lenobj;
1565
1566 lenobj = createObject(REDIS_STRING,
1567 sdscatprintf(sdsempty(),"%d\r\n",
1568 stringObjectLen(argv[j])));
1569 lenobj->refcount = 0;
1570 outv[outc++] = lenobj;
1571 }
1572 outv[outc++] = argv[j];
1573 }
1574 outv[outc++] = shared.crlf;
1575
1576 /* Increment all the refcounts at start and decrement at end in order to
1577 * be sure to free objects if there is no slave in a replication state
1578 * able to be feed with commands */
1579 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1580 listRewind(slaves);
1581 while((ln = listYield(slaves))) {
1582 redisClient *slave = ln->value;
1583
1584 /* Don't feed slaves that are still waiting for BGSAVE to start */
1585 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1586
1587 /* Feed all the other slaves, MONITORs and so on */
1588 if (slave->slaveseldb != dictid) {
1589 robj *selectcmd;
1590
1591 switch(dictid) {
1592 case 0: selectcmd = shared.select0; break;
1593 case 1: selectcmd = shared.select1; break;
1594 case 2: selectcmd = shared.select2; break;
1595 case 3: selectcmd = shared.select3; break;
1596 case 4: selectcmd = shared.select4; break;
1597 case 5: selectcmd = shared.select5; break;
1598 case 6: selectcmd = shared.select6; break;
1599 case 7: selectcmd = shared.select7; break;
1600 case 8: selectcmd = shared.select8; break;
1601 case 9: selectcmd = shared.select9; break;
1602 default:
1603 selectcmd = createObject(REDIS_STRING,
1604 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1605 selectcmd->refcount = 0;
1606 break;
1607 }
1608 addReply(slave,selectcmd);
1609 slave->slaveseldb = dictid;
1610 }
1611 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1612 }
1613 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1614 if (outv != static_outv) zfree(outv);
1615 }
1616
1617 static void processInputBuffer(redisClient *c) {
1618 again:
1619 if (c->bulklen == -1) {
1620 /* Read the first line of the query */
1621 char *p = strchr(c->querybuf,'\n');
1622 size_t querylen;
1623
1624 if (p) {
1625 sds query, *argv;
1626 int argc, j;
1627
1628 query = c->querybuf;
1629 c->querybuf = sdsempty();
1630 querylen = 1+(p-(query));
1631 if (sdslen(query) > querylen) {
1632 /* leave data after the first line of the query in the buffer */
1633 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1634 }
1635 *p = '\0'; /* remove "\n" */
1636 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1637 sdsupdatelen(query);
1638
1639 /* Now we can split the query in arguments */
1640 if (sdslen(query) == 0) {
1641 /* Ignore empty query */
1642 sdsfree(query);
1643 return;
1644 }
1645 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1646 sdsfree(query);
1647
1648 if (c->argv) zfree(c->argv);
1649 c->argv = zmalloc(sizeof(robj*)*argc);
1650
1651 for (j = 0; j < argc; j++) {
1652 if (sdslen(argv[j])) {
1653 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1654 c->argc++;
1655 } else {
1656 sdsfree(argv[j]);
1657 }
1658 }
1659 zfree(argv);
1660 /* Execute the command. If the client is still valid
1661 * after processCommand() return and there is something
1662 * on the query buffer try to process the next command. */
1663 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1664 return;
1665 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1666 redisLog(REDIS_DEBUG, "Client protocol error");
1667 freeClient(c);
1668 return;
1669 }
1670 } else {
1671 /* Bulk read handling. Note that if we are at this point
1672 the client already sent a command terminated with a newline,
1673 we are reading the bulk data that is actually the last
1674 argument of the command. */
1675 int qbl = sdslen(c->querybuf);
1676
1677 if (c->bulklen <= qbl) {
1678 /* Copy everything but the final CRLF as final argument */
1679 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1680 c->argc++;
1681 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1682 /* Process the command. If the client is still valid after
1683 * the processing and there is more data in the buffer
1684 * try to parse it. */
1685 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1686 return;
1687 }
1688 }
1689 }
1690
1691 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1692 redisClient *c = (redisClient*) privdata;
1693 char buf[REDIS_IOBUF_LEN];
1694 int nread;
1695 REDIS_NOTUSED(el);
1696 REDIS_NOTUSED(mask);
1697
1698 nread = read(fd, buf, REDIS_IOBUF_LEN);
1699 if (nread == -1) {
1700 if (errno == EAGAIN) {
1701 nread = 0;
1702 } else {
1703 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1704 freeClient(c);
1705 return;
1706 }
1707 } else if (nread == 0) {
1708 redisLog(REDIS_DEBUG, "Client closed connection");
1709 freeClient(c);
1710 return;
1711 }
1712 if (nread) {
1713 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1714 c->lastinteraction = time(NULL);
1715 } else {
1716 return;
1717 }
1718 processInputBuffer(c);
1719 }
1720
1721 static int selectDb(redisClient *c, int id) {
1722 if (id < 0 || id >= server.dbnum)
1723 return REDIS_ERR;
1724 c->db = &server.db[id];
1725 return REDIS_OK;
1726 }
1727
1728 static void *dupClientReplyValue(void *o) {
1729 incrRefCount((robj*)o);
1730 return 0;
1731 }
1732
1733 static redisClient *createClient(int fd) {
1734 redisClient *c = zmalloc(sizeof(*c));
1735
1736 anetNonBlock(NULL,fd);
1737 anetTcpNoDelay(NULL,fd);
1738 if (!c) return NULL;
1739 selectDb(c,0);
1740 c->fd = fd;
1741 c->querybuf = sdsempty();
1742 c->argc = 0;
1743 c->argv = NULL;
1744 c->bulklen = -1;
1745 c->multibulk = 0;
1746 c->mbargc = 0;
1747 c->mbargv = NULL;
1748 c->sentlen = 0;
1749 c->flags = 0;
1750 c->lastinteraction = time(NULL);
1751 c->authenticated = 0;
1752 c->replstate = REDIS_REPL_NONE;
1753 c->reply = listCreate();
1754 listSetFreeMethod(c->reply,decrRefCount);
1755 listSetDupMethod(c->reply,dupClientReplyValue);
1756 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1757 readQueryFromClient, c, NULL) == AE_ERR) {
1758 freeClient(c);
1759 return NULL;
1760 }
1761 listAddNodeTail(server.clients,c);
1762 return c;
1763 }
1764
1765 static void addReply(redisClient *c, robj *obj) {
1766 if (listLength(c->reply) == 0 &&
1767 (c->replstate == REDIS_REPL_NONE ||
1768 c->replstate == REDIS_REPL_ONLINE) &&
1769 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1770 sendReplyToClient, c, NULL) == AE_ERR) return;
1771 if (obj->encoding != REDIS_ENCODING_RAW) {
1772 obj = getDecodedObject(obj);
1773 } else {
1774 incrRefCount(obj);
1775 }
1776 listAddNodeTail(c->reply,obj);
1777 }
1778
1779 static void addReplySds(redisClient *c, sds s) {
1780 robj *o = createObject(REDIS_STRING,s);
1781 addReply(c,o);
1782 decrRefCount(o);
1783 }
1784
1785 static void addReplyBulkLen(redisClient *c, robj *obj) {
1786 size_t len;
1787
1788 if (obj->encoding == REDIS_ENCODING_RAW) {
1789 len = sdslen(obj->ptr);
1790 } else {
1791 long n = (long)obj->ptr;
1792
1793 len = 1;
1794 if (n < 0) {
1795 len++;
1796 n = -n;
1797 }
1798 while((n = n/10) != 0) {
1799 len++;
1800 }
1801 }
1802 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1803 }
1804
1805 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1806 int cport, cfd;
1807 char cip[128];
1808 redisClient *c;
1809 REDIS_NOTUSED(el);
1810 REDIS_NOTUSED(mask);
1811 REDIS_NOTUSED(privdata);
1812
1813 cfd = anetAccept(server.neterr, fd, cip, &cport);
1814 if (cfd == AE_ERR) {
1815 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1816 return;
1817 }
1818 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1819 if ((c = createClient(cfd)) == NULL) {
1820 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1821 close(cfd); /* May be already closed, just ingore errors */
1822 return;
1823 }
1824 /* If maxclient directive is set and this is one client more... close the
1825 * connection. Note that we create the client instead to check before
1826 * for this condition, since now the socket is already set in nonblocking
1827 * mode and we can send an error for free using the Kernel I/O */
1828 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1829 char *err = "-ERR max number of clients reached\r\n";
1830
1831 /* That's a best effort error message, don't check write errors */
1832 (void) write(c->fd,err,strlen(err));
1833 freeClient(c);
1834 return;
1835 }
1836 server.stat_numconnections++;
1837 }
1838
1839 /* ======================= Redis objects implementation ===================== */
1840
1841 static robj *createObject(int type, void *ptr) {
1842 robj *o;
1843
1844 if (listLength(server.objfreelist)) {
1845 listNode *head = listFirst(server.objfreelist);
1846 o = listNodeValue(head);
1847 listDelNode(server.objfreelist,head);
1848 } else {
1849 o = zmalloc(sizeof(*o));
1850 }
1851 o->type = type;
1852 o->encoding = REDIS_ENCODING_RAW;
1853 o->ptr = ptr;
1854 o->refcount = 1;
1855 return o;
1856 }
1857
1858 static robj *createStringObject(char *ptr, size_t len) {
1859 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1860 }
1861
1862 static robj *createListObject(void) {
1863 list *l = listCreate();
1864
1865 listSetFreeMethod(l,decrRefCount);
1866 return createObject(REDIS_LIST,l);
1867 }
1868
1869 static robj *createSetObject(void) {
1870 dict *d = dictCreate(&setDictType,NULL);
1871 return createObject(REDIS_SET,d);
1872 }
1873
1874 static robj *createZsetObject(void) {
1875 zset *zs = zmalloc(sizeof(*zs));
1876
1877 zs->dict = dictCreate(&zsetDictType,NULL);
1878 zs->zsl = zslCreate();
1879 return createObject(REDIS_ZSET,zs);
1880 }
1881
1882 static void freeStringObject(robj *o) {
1883 if (o->encoding == REDIS_ENCODING_RAW) {
1884 sdsfree(o->ptr);
1885 }
1886 }
1887
1888 static void freeListObject(robj *o) {
1889 listRelease((list*) o->ptr);
1890 }
1891
1892 static void freeSetObject(robj *o) {
1893 dictRelease((dict*) o->ptr);
1894 }
1895
1896 static void freeZsetObject(robj *o) {
1897 zset *zs = o->ptr;
1898
1899 dictRelease(zs->dict);
1900 zslFree(zs->zsl);
1901 zfree(zs);
1902 }
1903
1904 static void freeHashObject(robj *o) {
1905 dictRelease((dict*) o->ptr);
1906 }
1907
1908 static void incrRefCount(robj *o) {
1909 o->refcount++;
1910 #ifdef DEBUG_REFCOUNT
1911 if (o->type == REDIS_STRING)
1912 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1913 #endif
1914 }
1915
1916 static void decrRefCount(void *obj) {
1917 robj *o = obj;
1918
1919 #ifdef DEBUG_REFCOUNT
1920 if (o->type == REDIS_STRING)
1921 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1922 #endif
1923 if (--(o->refcount) == 0) {
1924 switch(o->type) {
1925 case REDIS_STRING: freeStringObject(o); break;
1926 case REDIS_LIST: freeListObject(o); break;
1927 case REDIS_SET: freeSetObject(o); break;
1928 case REDIS_ZSET: freeZsetObject(o); break;
1929 case REDIS_HASH: freeHashObject(o); break;
1930 default: assert(0 != 0); break;
1931 }
1932 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1933 !listAddNodeHead(server.objfreelist,o))
1934 zfree(o);
1935 }
1936 }
1937
1938 static robj *lookupKey(redisDb *db, robj *key) {
1939 dictEntry *de = dictFind(db->dict,key);
1940 return de ? dictGetEntryVal(de) : NULL;
1941 }
1942
1943 static robj *lookupKeyRead(redisDb *db, robj *key) {
1944 expireIfNeeded(db,key);
1945 return lookupKey(db,key);
1946 }
1947
1948 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1949 deleteIfVolatile(db,key);
1950 return lookupKey(db,key);
1951 }
1952
1953 static int deleteKey(redisDb *db, robj *key) {
1954 int retval;
1955
1956 /* We need to protect key from destruction: after the first dictDelete()
1957 * it may happen that 'key' is no longer valid if we don't increment
1958 * it's count. This may happen when we get the object reference directly
1959 * from the hash table with dictRandomKey() or dict iterators */
1960 incrRefCount(key);
1961 if (dictSize(db->expires)) dictDelete(db->expires,key);
1962 retval = dictDelete(db->dict,key);
1963 decrRefCount(key);
1964
1965 return retval == DICT_OK;
1966 }
1967
1968 /* Try to share an object against the shared objects pool */
1969 static robj *tryObjectSharing(robj *o) {
1970 struct dictEntry *de;
1971 unsigned long c;
1972
1973 if (o == NULL || server.shareobjects == 0) return o;
1974
1975 assert(o->type == REDIS_STRING);
1976 de = dictFind(server.sharingpool,o);
1977 if (de) {
1978 robj *shared = dictGetEntryKey(de);
1979
1980 c = ((unsigned long) dictGetEntryVal(de))+1;
1981 dictGetEntryVal(de) = (void*) c;
1982 incrRefCount(shared);
1983 decrRefCount(o);
1984 return shared;
1985 } else {
1986 /* Here we are using a stream algorihtm: Every time an object is
1987 * shared we increment its count, everytime there is a miss we
1988 * recrement the counter of a random object. If this object reaches
1989 * zero we remove the object and put the current object instead. */
1990 if (dictSize(server.sharingpool) >=
1991 server.sharingpoolsize) {
1992 de = dictGetRandomKey(server.sharingpool);
1993 assert(de != NULL);
1994 c = ((unsigned long) dictGetEntryVal(de))-1;
1995 dictGetEntryVal(de) = (void*) c;
1996 if (c == 0) {
1997 dictDelete(server.sharingpool,de->key);
1998 }
1999 } else {
2000 c = 0; /* If the pool is empty we want to add this object */
2001 }
2002 if (c == 0) {
2003 int retval;
2004
2005 retval = dictAdd(server.sharingpool,o,(void*)1);
2006 assert(retval == DICT_OK);
2007 incrRefCount(o);
2008 }
2009 return o;
2010 }
2011 }
2012
2013 /* Check if the nul-terminated string 's' can be represented by a long
2014 * (that is, is a number that fits into long without any other space or
2015 * character before or after the digits).
2016 *
2017 * If so, the function returns REDIS_OK and *longval is set to the value
2018 * of the number. Otherwise REDIS_ERR is returned */
2019 static int isStringRepresentableAsLong(sds s, long *longval) {
2020 char buf[32], *endptr;
2021 long value;
2022 int slen;
2023
2024 value = strtol(s, &endptr, 10);
2025 if (endptr[0] != '\0') return REDIS_ERR;
2026 slen = snprintf(buf,32,"%ld",value);
2027
2028 /* If the number converted back into a string is not identical
2029 * then it's not possible to encode the string as integer */
2030 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2031 if (longval) *longval = value;
2032 return REDIS_OK;
2033 }
2034
2035 /* Try to encode a string object in order to save space */
2036 static int tryObjectEncoding(robj *o) {
2037 long value;
2038 sds s = o->ptr;
2039
2040 if (o->encoding != REDIS_ENCODING_RAW)
2041 return REDIS_ERR; /* Already encoded */
2042
2043 /* It's not save to encode shared objects: shared objects can be shared
2044 * everywhere in the "object space" of Redis. Encoded objects can only
2045 * appear as "values" (and not, for instance, as keys) */
2046 if (o->refcount > 1) return REDIS_ERR;
2047
2048 /* Currently we try to encode only strings */
2049 assert(o->type == REDIS_STRING);
2050
2051 /* Check if we can represent this string as a long integer */
2052 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2053
2054 /* Ok, this object can be encoded */
2055 o->encoding = REDIS_ENCODING_INT;
2056 sdsfree(o->ptr);
2057 o->ptr = (void*) value;
2058 return REDIS_OK;
2059 }
2060
2061 /* Get a decoded version of an encoded object (returned as a new object) */
2062 static robj *getDecodedObject(const robj *o) {
2063 robj *dec;
2064
2065 assert(o->encoding != REDIS_ENCODING_RAW);
2066 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2067 char buf[32];
2068
2069 snprintf(buf,32,"%ld",(long)o->ptr);
2070 dec = createStringObject(buf,strlen(buf));
2071 return dec;
2072 } else {
2073 assert(1 != 1);
2074 }
2075 }
2076
2077 static int compareStringObjects(robj *a, robj *b) {
2078 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2079
2080 if (a == b) return 0;
2081 if (a->encoding == REDIS_ENCODING_INT && b->encoding == REDIS_ENCODING_INT){
2082 return (long)a->ptr - (long)b->ptr;
2083 } else {
2084 int retval;
2085
2086 incrRefCount(a);
2087 incrRefCount(b);
2088 if (a->encoding != REDIS_ENCODING_RAW) a = getDecodedObject(a);
2089 if (b->encoding != REDIS_ENCODING_RAW) b = getDecodedObject(a);
2090 retval = sdscmp(a->ptr,b->ptr);
2091 decrRefCount(a);
2092 decrRefCount(b);
2093 return retval;
2094 }
2095 }
2096
2097 static size_t stringObjectLen(robj *o) {
2098 assert(o->type == REDIS_STRING);
2099 if (o->encoding == REDIS_ENCODING_RAW) {
2100 return sdslen(o->ptr);
2101 } else {
2102 char buf[32];
2103
2104 return snprintf(buf,32,"%ld",(long)o->ptr);
2105 }
2106 }
2107
2108 /*============================ DB saving/loading ============================ */
2109
/* Write the one byte 'type' on 'fp'. Returns 0 on success, -1 if the
 * byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2114
/* Write the time 't' on 'fp' as a 4 byte signed integer in the byte
 * order of the host. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t seconds = (int32_t) t;

    if (fwrite(&seconds,4,1,fp) != 0) return 0;
    return -1;
}
2120
2121 /* check rdbLoadLen() comments for more info */
2122 static int rdbSaveLen(FILE *fp, uint32_t len) {
2123 unsigned char buf[2];
2124
2125 if (len < (1<<6)) {
2126 /* Save a 6 bit len */
2127 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2128 if (fwrite(buf,1,1,fp) == 0) return -1;
2129 } else if (len < (1<<14)) {
2130 /* Save a 14 bit len */
2131 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2132 buf[1] = len&0xFF;
2133 if (fwrite(buf,2,1,fp) == 0) return -1;
2134 } else {
2135 /* Save a 32 bit len */
2136 buf[0] = (REDIS_RDB_32BITLEN<<6);
2137 if (fwrite(buf,1,1,fp) == 0) return -1;
2138 len = htonl(len);
2139 if (fwrite(&len,4,1,fp) == 0) return -1;
2140 }
2141 return 0;
2142 }
2143
2144 /* String objects in the form "2391" "-100" without any space and with a
2145 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2146 * encoded as integers to save space */
2147 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2148 long long value;
2149 char *endptr, buf[32];
2150
2151 /* Check if it's possible to encode this value as a number */
2152 value = strtoll(s, &endptr, 10);
2153 if (endptr[0] != '\0') return 0;
2154 snprintf(buf,32,"%lld",value);
2155
2156 /* If the number converted back into a string is not identical
2157 * then it's not possible to encode the string as integer */
2158 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2159
2160 /* Finally check if it fits in our ranges */
2161 if (value >= -(1<<7) && value <= (1<<7)-1) {
2162 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2163 enc[1] = value&0xFF;
2164 return 2;
2165 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2166 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2167 enc[1] = value&0xFF;
2168 enc[2] = (value>>8)&0xFF;
2169 return 3;
2170 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2171 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2172 enc[1] = value&0xFF;
2173 enc[2] = (value>>8)&0xFF;
2174 enc[3] = (value>>16)&0xFF;
2175 enc[4] = (value>>24)&0xFF;
2176 return 5;
2177 } else {
2178 return 0;
2179 }
2180 }
2181
2182 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2183 unsigned int comprlen, outlen;
2184 unsigned char byte;
2185 void *out;
2186
2187 /* We require at least four bytes compression for this to be worth it */
2188 outlen = sdslen(obj->ptr)-4;
2189 if (outlen <= 0) return 0;
2190 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2191 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2192 if (comprlen == 0) {
2193 zfree(out);
2194 return 0;
2195 }
2196 /* Data compressed! Let's save it on disk */
2197 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2198 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2199 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2200 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2201 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2202 zfree(out);
2203 return comprlen;
2204
2205 writeerr:
2206 zfree(out);
2207 return -1;
2208 }
2209
2210 /* Save a string objet as [len][data] on disk. If the object is a string
2211 * representation of an integer value we try to safe it in a special form */
2212 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2213 size_t len;
2214 int enclen;
2215
2216 len = sdslen(obj->ptr);
2217
2218 /* Try integer encoding */
2219 if (len <= 11) {
2220 unsigned char buf[5];
2221 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2222 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2223 return 0;
2224 }
2225 }
2226
2227 /* Try LZF compression - under 20 bytes it's unable to compress even
2228 * aaaaaaaaaaaaaaaaaa so skip it */
2229 if (len > 20) {
2230 int retval;
2231
2232 retval = rdbSaveLzfStringObject(fp,obj);
2233 if (retval == -1) return -1;
2234 if (retval > 0) return 0;
2235 /* retval == 0 means data can't be compressed, save the old way */
2236 }
2237
2238 /* Store verbatim */
2239 if (rdbSaveLen(fp,len) == -1) return -1;
2240 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2241 return 0;
2242 }
2243
2244 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2245 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2246 int retval;
2247 robj *dec;
2248
2249 if (obj->encoding != REDIS_ENCODING_RAW) {
2250 dec = getDecodedObject(obj);
2251 retval = rdbSaveStringObjectRaw(fp,dec);
2252 decrRefCount(dec);
2253 return retval;
2254 } else {
2255 return rdbSaveStringObjectRaw(fp,obj);
2256 }
2257 }
2258
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Finite values are printed with the "%.16g" format. Returns 0 on
 * success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);
        /* The text starts at buf+1: buf[0] is the length prefix that is
         * being computed here and is not initialized yet. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2285
2286 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2287 static int rdbSave(char *filename) {
2288 dictIterator *di = NULL;
2289 dictEntry *de;
2290 FILE *fp;
2291 char tmpfile[256];
2292 int j;
2293 time_t now = time(NULL);
2294
2295 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2296 fp = fopen(tmpfile,"w");
2297 if (!fp) {
2298 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2299 return REDIS_ERR;
2300 }
2301 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2302 for (j = 0; j < server.dbnum; j++) {
2303 redisDb *db = server.db+j;
2304 dict *d = db->dict;
2305 if (dictSize(d) == 0) continue;
2306 di = dictGetIterator(d);
2307 if (!di) {
2308 fclose(fp);
2309 return REDIS_ERR;
2310 }
2311
2312 /* Write the SELECT DB opcode */
2313 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2314 if (rdbSaveLen(fp,j) == -1) goto werr;
2315
2316 /* Iterate this DB writing every entry */
2317 while((de = dictNext(di)) != NULL) {
2318 robj *key = dictGetEntryKey(de);
2319 robj *o = dictGetEntryVal(de);
2320 time_t expiretime = getExpire(db,key);
2321
2322 /* Save the expire time */
2323 if (expiretime != -1) {
2324 /* If this key is already expired skip it */
2325 if (expiretime < now) continue;
2326 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2327 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2328 }
2329 /* Save the key and associated value */
2330 if (rdbSaveType(fp,o->type) == -1) goto werr;
2331 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2332 if (o->type == REDIS_STRING) {
2333 /* Save a string value */
2334 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2335 } else if (o->type == REDIS_LIST) {
2336 /* Save a list value */
2337 list *list = o->ptr;
2338 listNode *ln;
2339
2340 listRewind(list);
2341 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2342 while((ln = listYield(list))) {
2343 robj *eleobj = listNodeValue(ln);
2344
2345 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2346 }
2347 } else if (o->type == REDIS_SET) {
2348 /* Save a set value */
2349 dict *set = o->ptr;
2350 dictIterator *di = dictGetIterator(set);
2351 dictEntry *de;
2352
2353 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2354 while((de = dictNext(di)) != NULL) {
2355 robj *eleobj = dictGetEntryKey(de);
2356
2357 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2358 }
2359 dictReleaseIterator(di);
2360 } else if (o->type == REDIS_ZSET) {
2361 /* Save a set value */
2362 zset *zs = o->ptr;
2363 dictIterator *di = dictGetIterator(zs->dict);
2364 dictEntry *de;
2365
2366 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2367 while((de = dictNext(di)) != NULL) {
2368 robj *eleobj = dictGetEntryKey(de);
2369 double *score = dictGetEntryVal(de);
2370
2371 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2372 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2373 }
2374 dictReleaseIterator(di);
2375 } else {
2376 assert(0 != 0);
2377 }
2378 }
2379 dictReleaseIterator(di);
2380 }
2381 /* EOF opcode */
2382 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2383
2384 /* Make sure data will not remain on the OS's output buffers */
2385 fflush(fp);
2386 fsync(fileno(fp));
2387 fclose(fp);
2388
2389 /* Use RENAME to make sure the DB file is changed atomically only
2390 * if the generate DB file is ok. */
2391 if (rename(tmpfile,filename) == -1) {
2392 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2393 unlink(tmpfile);
2394 return REDIS_ERR;
2395 }
2396 redisLog(REDIS_NOTICE,"DB saved on disk");
2397 server.dirty = 0;
2398 server.lastsave = time(NULL);
2399 return REDIS_OK;
2400
2401 werr:
2402 fclose(fp);
2403 unlink(tmpfile);
2404 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2405 if (di) dictReleaseIterator(di);
2406 return REDIS_ERR;
2407 }
2408
2409 static int rdbSaveBackground(char *filename) {
2410 pid_t childpid;
2411
2412 if (server.bgsaveinprogress) return REDIS_ERR;
2413 if ((childpid = fork()) == 0) {
2414 /* Child */
2415 close(server.fd);
2416 if (rdbSave(filename) == REDIS_OK) {
2417 exit(0);
2418 } else {
2419 exit(1);
2420 }
2421 } else {
2422 /* Parent */
2423 if (childpid == -1) {
2424 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2425 strerror(errno));
2426 return REDIS_ERR;
2427 }
2428 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2429 server.bgsaveinprogress = 1;
2430 server.bgsavechildpid = childpid;
2431 return REDIS_OK;
2432 }
2433 return REDIS_OK; /* unreached */
2434 }
2435
/* Remove the temporary dump file "temp-<pid>.rdb" belonging to the
 * process 'childpid'. The result of unlink() is ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,256,"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2442
/* Read a one byte type from 'fp'. Returns the byte as a value in the
 * range 0-255, or -1 if it could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 0) ? -1 : type;
}
2448
/* Read a time stored by rdbSaveTime(): a 4 byte signed integer in the
 * byte order of the host. Returns -1 if the value could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t seconds;

    if (fread(&seconds,4,1,fp) != 0) return (time_t) seconds;
    return -1;
}
2454
2455 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2456 * of this file for a description of how this are stored on disk.
2457 *
2458 * isencoded is set to 1 if the readed length is not actually a length but
2459 * an "encoding type", check the above comments for more info */
2460 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2461 unsigned char buf[2];
2462 uint32_t len;
2463
2464 if (isencoded) *isencoded = 0;
2465 if (rdbver == 0) {
2466 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2467 return ntohl(len);
2468 } else {
2469 int type;
2470
2471 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2472 type = (buf[0]&0xC0)>>6;
2473 if (type == REDIS_RDB_6BITLEN) {
2474 /* Read a 6 bit len */
2475 return buf[0]&0x3F;
2476 } else if (type == REDIS_RDB_ENCVAL) {
2477 /* Read a 6 bit len encoding type */
2478 if (isencoded) *isencoded = 1;
2479 return buf[0]&0x3F;
2480 } else if (type == REDIS_RDB_14BITLEN) {
2481 /* Read a 14 bit len */
2482 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2483 return ((buf[0]&0x3F)<<8)|buf[1];
2484 } else {
2485 /* Read a 32 bit len */
2486 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2487 return ntohl(len);
2488 }
2489 }
2490 }
2491
2492 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2493 unsigned char enc[4];
2494 long long val;
2495
2496 if (enctype == REDIS_RDB_ENC_INT8) {
2497 if (fread(enc,1,1,fp) == 0) return NULL;
2498 val = (signed char)enc[0];
2499 } else if (enctype == REDIS_RDB_ENC_INT16) {
2500 uint16_t v;
2501 if (fread(enc,2,1,fp) == 0) return NULL;
2502 v = enc[0]|(enc[1]<<8);
2503 val = (int16_t)v;
2504 } else if (enctype == REDIS_RDB_ENC_INT32) {
2505 uint32_t v;
2506 if (fread(enc,4,1,fp) == 0) return NULL;
2507 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2508 val = (int32_t)v;
2509 } else {
2510 val = 0; /* anti-warning */
2511 assert(0!=0);
2512 }
2513 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2514 }
2515
2516 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2517 unsigned int len, clen;
2518 unsigned char *c = NULL;
2519 sds val = NULL;
2520
2521 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2522 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2523 if ((c = zmalloc(clen)) == NULL) goto err;
2524 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2525 if (fread(c,clen,1,fp) == 0) goto err;
2526 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2527 zfree(c);
2528 return createObject(REDIS_STRING,val);
2529 err:
2530 zfree(c);
2531 sdsfree(val);
2532 return NULL;
2533 }
2534
2535 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2536 int isencoded;
2537 uint32_t len;
2538 sds val;
2539
2540 len = rdbLoadLen(fp,rdbver,&isencoded);
2541 if (isencoded) {
2542 switch(len) {
2543 case REDIS_RDB_ENC_INT8:
2544 case REDIS_RDB_ENC_INT16:
2545 case REDIS_RDB_ENC_INT32:
2546 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2547 case REDIS_RDB_ENC_LZF:
2548 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2549 default:
2550 assert(0!=0);
2551 }
2552 }
2553
2554 if (len == REDIS_RDB_LENERR) return NULL;
2555 val = sdsnewlen(NULL,len);
2556 if (len && fread(val,len,1,fp) == 0) {
2557 sdsfree(val);
2558 return NULL;
2559 }
2560 return tryObjectSharing(createObject(REDIS_STRING,val));
2561 }
2562
2563 /* For information about double serialization check rdbSaveDoubleValue() */
2564 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2565 char buf[128];
2566 unsigned char len;
2567
2568 if (fread(&len,1,1,fp) == 0) return -1;
2569 switch(len) {
2570 case 255: *val = R_NegInf; return 0;
2571 case 254: *val = R_PosInf; return 0;
2572 case 253: *val = R_Nan; return 0;
2573 default:
2574 if (fread(buf,len,1,fp) == 0) return -1;
2575 sscanf(buf, "%lg", val);
2576 return 0;
2577 }
2578 }
2579
2580 static int rdbLoad(char *filename) {
2581 FILE *fp;
2582 robj *keyobj = NULL;
2583 uint32_t dbid;
2584 int type, retval, rdbver;
2585 dict *d = server.db[0].dict;
2586 redisDb *db = server.db+0;
2587 char buf[1024];
2588 time_t expiretime = -1, now = time(NULL);
2589
2590 fp = fopen(filename,"r");
2591 if (!fp) return REDIS_ERR;
2592 if (fread(buf,9,1,fp) == 0) goto eoferr;
2593 buf[9] = '\0';
2594 if (memcmp(buf,"REDIS",5) != 0) {
2595 fclose(fp);
2596 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2597 return REDIS_ERR;
2598 }
2599 rdbver = atoi(buf+5);
2600 if (rdbver > 1) {
2601 fclose(fp);
2602 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2603 return REDIS_ERR;
2604 }
2605 while(1) {
2606 robj *o;
2607
2608 /* Read type. */
2609 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2610 if (type == REDIS_EXPIRETIME) {
2611 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2612 /* We read the time so we need to read the object type again */
2613 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2614 }
2615 if (type == REDIS_EOF) break;
2616 /* Handle SELECT DB opcode as a special case */
2617 if (type == REDIS_SELECTDB) {
2618 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2619 goto eoferr;
2620 if (dbid >= (unsigned)server.dbnum) {
2621 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2622 exit(1);
2623 }
2624 db = server.db+dbid;
2625 d = db->dict;
2626 continue;
2627 }
2628 /* Read key */
2629 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2630
2631 if (type == REDIS_STRING) {
2632 /* Read string value */
2633 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2634 tryObjectEncoding(o);
2635 } else if (type == REDIS_LIST || type == REDIS_SET) {
2636 /* Read list/set value */
2637 uint32_t listlen;
2638
2639 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2640 goto eoferr;
2641 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2642 /* Load every single element of the list/set */
2643 while(listlen--) {
2644 robj *ele;
2645
2646 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2647 tryObjectEncoding(ele);
2648 if (type == REDIS_LIST) {
2649 listAddNodeTail((list*)o->ptr,ele);
2650 } else {
2651 dictAdd((dict*)o->ptr,ele,NULL);
2652 }
2653 }
2654 } else if (type == REDIS_ZSET) {
2655 /* Read list/set value */
2656 uint32_t zsetlen;
2657 zset *zs;
2658
2659 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2660 goto eoferr;
2661 o = createZsetObject();
2662 zs = o->ptr;
2663 /* Load every single element of the list/set */
2664 while(zsetlen--) {
2665 robj *ele;
2666 double *score = zmalloc(sizeof(double));
2667
2668 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2669 tryObjectEncoding(ele);
2670 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2671 dictAdd(zs->dict,ele,score);
2672 zslInsert(zs->zsl,*score,ele);
2673 incrRefCount(ele); /* added to skiplist */
2674 }
2675 } else {
2676 assert(0 != 0);
2677 }
2678 /* Add the new object in the hash table */
2679 retval = dictAdd(d,keyobj,o);
2680 if (retval == DICT_ERR) {
2681 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2682 exit(1);
2683 }
2684 /* Set the expire time if needed */
2685 if (expiretime != -1) {
2686 setExpire(db,keyobj,expiretime);
2687 /* Delete this key if already expired */
2688 if (expiretime < now) deleteKey(db,keyobj);
2689 expiretime = -1;
2690 }
2691 keyobj = o = NULL;
2692 }
2693 fclose(fp);
2694 return REDIS_OK;
2695
2696 eoferr: /* unexpected end of file is handled here with a fatal exit */
2697 if (keyobj) decrRefCount(keyobj);
2698 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2699 exit(1);
2700 return REDIS_ERR; /* Just to avoid warning */
2701 }
2702
2703 /*================================== Commands =============================== */
2704
2705 static void authCommand(redisClient *c) {
2706 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2707 c->authenticated = 1;
2708 addReply(c,shared.ok);
2709 } else {
2710 c->authenticated = 0;
2711 addReply(c,shared.err);
2712 }
2713 }
2714
2715 static void pingCommand(redisClient *c) {
2716 addReply(c,shared.pong);
2717 }
2718
2719 static void echoCommand(redisClient *c) {
2720 addReplyBulkLen(c,c->argv[1]);
2721 addReply(c,c->argv[1]);
2722 addReply(c,shared.crlf);
2723 }
2724
2725 /*=================================== Strings =============================== */
2726
2727 static void setGenericCommand(redisClient *c, int nx) {
2728 int retval;
2729
2730 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2731 if (retval == DICT_ERR) {
2732 if (!nx) {
2733 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2734 incrRefCount(c->argv[2]);
2735 } else {
2736 addReply(c,shared.czero);
2737 return;
2738 }
2739 } else {
2740 incrRefCount(c->argv[1]);
2741 incrRefCount(c->argv[2]);
2742 }
2743 server.dirty++;
2744 removeExpire(c->db,c->argv[1]);
2745 addReply(c, nx ? shared.cone : shared.ok);
2746 }
2747
2748 static void setCommand(redisClient *c) {
2749 setGenericCommand(c,0);
2750 }
2751
2752 static void setnxCommand(redisClient *c) {
2753 setGenericCommand(c,1);
2754 }
2755
2756 static void getCommand(redisClient *c) {
2757 robj *o = lookupKeyRead(c->db,c->argv[1]);
2758
2759 if (o == NULL) {
2760 addReply(c,shared.nullbulk);
2761 } else {
2762 if (o->type != REDIS_STRING) {
2763 addReply(c,shared.wrongtypeerr);
2764 } else {
2765 addReplyBulkLen(c,o);
2766 addReply(c,o);
2767 addReply(c,shared.crlf);
2768 }
2769 }
2770 }
2771
2772 static void getsetCommand(redisClient *c) {
2773 getCommand(c);
2774 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2775 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2776 } else {
2777 incrRefCount(c->argv[1]);
2778 }
2779 incrRefCount(c->argv[2]);
2780 server.dirty++;
2781 removeExpire(c->db,c->argv[1]);
2782 }
2783
2784 static void mgetCommand(redisClient *c) {
2785 int j;
2786
2787 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2788 for (j = 1; j < c->argc; j++) {
2789 robj *o = lookupKeyRead(c->db,c->argv[j]);
2790 if (o == NULL) {
2791 addReply(c,shared.nullbulk);
2792 } else {
2793 if (o->type != REDIS_STRING) {
2794 addReply(c,shared.nullbulk);
2795 } else {
2796 addReplyBulkLen(c,o);
2797 addReply(c,o);
2798 addReply(c,shared.crlf);
2799 }
2800 }
2801 }
2802 }
2803
2804 static void incrDecrCommand(redisClient *c, long long incr) {
2805 long long value;
2806 int retval;
2807 robj *o;
2808
2809 o = lookupKeyWrite(c->db,c->argv[1]);
2810 if (o == NULL) {
2811 value = 0;
2812 } else {
2813 if (o->type != REDIS_STRING) {
2814 value = 0;
2815 } else {
2816 char *eptr;
2817
2818 if (o->encoding == REDIS_ENCODING_RAW)
2819 value = strtoll(o->ptr, &eptr, 10);
2820 else if (o->encoding == REDIS_ENCODING_INT)
2821 value = (long)o->ptr;
2822 else
2823 assert(1 != 1);
2824 }
2825 }
2826
2827 value += incr;
2828 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2829 tryObjectEncoding(o);
2830 retval = dictAdd(c->db->dict,c->argv[1],o);
2831 if (retval == DICT_ERR) {
2832 dictReplace(c->db->dict,c->argv[1],o);
2833 removeExpire(c->db,c->argv[1]);
2834 } else {
2835 incrRefCount(c->argv[1]);
2836 }
2837 server.dirty++;
2838 addReply(c,shared.colon);
2839 addReply(c,o);
2840 addReply(c,shared.crlf);
2841 }
2842
2843 static void incrCommand(redisClient *c) {
2844 incrDecrCommand(c,1);
2845 }
2846
2847 static void decrCommand(redisClient *c) {
2848 incrDecrCommand(c,-1);
2849 }
2850
2851 static void incrbyCommand(redisClient *c) {
2852 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2853 incrDecrCommand(c,incr);
2854 }
2855
2856 static void decrbyCommand(redisClient *c) {
2857 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2858 incrDecrCommand(c,-incr);
2859 }
2860
2861 /* ========================= Type agnostic commands ========================= */
2862
2863 static void delCommand(redisClient *c) {
2864 int deleted = 0, j;
2865
2866 for (j = 1; j < c->argc; j++) {
2867 if (deleteKey(c->db,c->argv[j])) {
2868 server.dirty++;
2869 deleted++;
2870 }
2871 }
2872 switch(deleted) {
2873 case 0:
2874 addReply(c,shared.czero);
2875 break;
2876 case 1:
2877 addReply(c,shared.cone);
2878 break;
2879 default:
2880 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2881 break;
2882 }
2883 }
2884
2885 static void existsCommand(redisClient *c) {
2886 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2887 }
2888
2889 static void selectCommand(redisClient *c) {
2890 int id = atoi(c->argv[1]->ptr);
2891
2892 if (selectDb(c,id) == REDIS_ERR) {
2893 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2894 } else {
2895 addReply(c,shared.ok);
2896 }
2897 }
2898
2899 static void randomkeyCommand(redisClient *c) {
2900 dictEntry *de;
2901
2902 while(1) {
2903 de = dictGetRandomKey(c->db->dict);
2904 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2905 }
2906 if (de == NULL) {
2907 addReply(c,shared.plus);
2908 addReply(c,shared.crlf);
2909 } else {
2910 addReply(c,shared.plus);
2911 addReply(c,dictGetEntryKey(de));
2912 addReply(c,shared.crlf);
2913 }
2914 }
2915
2916 static void keysCommand(redisClient *c) {
2917 dictIterator *di;
2918 dictEntry *de;
2919 sds pattern = c->argv[1]->ptr;
2920 int plen = sdslen(pattern);
2921 int numkeys = 0, keyslen = 0;
2922 robj *lenobj = createObject(REDIS_STRING,NULL);
2923
2924 di = dictGetIterator(c->db->dict);
2925 addReply(c,lenobj);
2926 decrRefCount(lenobj);
2927 while((de = dictNext(di)) != NULL) {
2928 robj *keyobj = dictGetEntryKey(de);
2929
2930 sds key = keyobj->ptr;
2931 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2932 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2933 if (expireIfNeeded(c->db,keyobj) == 0) {
2934 if (numkeys != 0)
2935 addReply(c,shared.space);
2936 addReply(c,keyobj);
2937 numkeys++;
2938 keyslen += sdslen(key);
2939 }
2940 }
2941 }
2942 dictReleaseIterator(di);
2943 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2944 addReply(c,shared.crlf);
2945 }
2946
2947 static void dbsizeCommand(redisClient *c) {
2948 addReplySds(c,
2949 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2950 }
2951
2952 static void lastsaveCommand(redisClient *c) {
2953 addReplySds(c,
2954 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2955 }
2956
2957 static void typeCommand(redisClient *c) {
2958 robj *o;
2959 char *type;
2960
2961 o = lookupKeyRead(c->db,c->argv[1]);
2962 if (o == NULL) {
2963 type = "+none";
2964 } else {
2965 switch(o->type) {
2966 case REDIS_STRING: type = "+string"; break;
2967 case REDIS_LIST: type = "+list"; break;
2968 case REDIS_SET: type = "+set"; break;
2969 default: type = "unknown"; break;
2970 }
2971 }
2972 addReplySds(c,sdsnew(type));
2973 addReply(c,shared.crlf);
2974 }
2975
2976 static void saveCommand(redisClient *c) {
2977 if (server.bgsaveinprogress) {
2978 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2979 return;
2980 }
2981 if (rdbSave(server.dbfilename) == REDIS_OK) {
2982 addReply(c,shared.ok);
2983 } else {
2984 addReply(c,shared.err);
2985 }
2986 }
2987
2988 static void bgsaveCommand(redisClient *c) {
2989 if (server.bgsaveinprogress) {
2990 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2991 return;
2992 }
2993 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2994 addReply(c,shared.ok);
2995 } else {
2996 addReply(c,shared.err);
2997 }
2998 }
2999
3000 static void shutdownCommand(redisClient *c) {
3001 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3002 /* Kill the saving child if there is a background saving in progress.
3003 We want to avoid race conditions, for instance our saving child may
3004 overwrite the synchronous saving did by SHUTDOWN. */
3005 if (server.bgsaveinprogress) {
3006 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3007 kill(server.bgsavechildpid,SIGKILL);
3008 rdbRemoveTempFile(server.bgsavechildpid);
3009 }
3010 /* SYNC SAVE */
3011 if (rdbSave(server.dbfilename) == REDIS_OK) {
3012 if (server.daemonize)
3013 unlink(server.pidfile);
3014 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3015 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3016 exit(1);
3017 } else {
3018 /* Ooops.. error saving! The best we can do is to continue operating.
3019 * Note that if there was a background saving process, in the next
3020 * cron() Redis will be notified that the background saving aborted,
3021 * handling special stuff like slaves pending for synchronization... */
3022 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3023 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3024 }
3025 }
3026
3027 static void renameGenericCommand(redisClient *c, int nx) {
3028 robj *o;
3029
3030 /* To use the same key as src and dst is probably an error */
3031 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3032 addReply(c,shared.sameobjecterr);
3033 return;
3034 }
3035
3036 o = lookupKeyWrite(c->db,c->argv[1]);
3037 if (o == NULL) {
3038 addReply(c,shared.nokeyerr);
3039 return;
3040 }
3041 incrRefCount(o);
3042 deleteIfVolatile(c->db,c->argv[2]);
3043 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3044 if (nx) {
3045 decrRefCount(o);
3046 addReply(c,shared.czero);
3047 return;
3048 }
3049 dictReplace(c->db->dict,c->argv[2],o);
3050 } else {
3051 incrRefCount(c->argv[2]);
3052 }
3053 deleteKey(c->db,c->argv[1]);
3054 server.dirty++;
3055 addReply(c,nx ? shared.cone : shared.ok);
3056 }
3057
3058 static void renameCommand(redisClient *c) {
3059 renameGenericCommand(c,0);
3060 }
3061
3062 static void renamenxCommand(redisClient *c) {
3063 renameGenericCommand(c,1);
3064 }
3065
3066 static void moveCommand(redisClient *c) {
3067 robj *o;
3068 redisDb *src, *dst;
3069 int srcid;
3070
3071 /* Obtain source and target DB pointers */
3072 src = c->db;
3073 srcid = c->db->id;
3074 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3075 addReply(c,shared.outofrangeerr);
3076 return;
3077 }
3078 dst = c->db;
3079 selectDb(c,srcid); /* Back to the source DB */
3080
3081 /* If the user is moving using as target the same
3082 * DB as the source DB it is probably an error. */
3083 if (src == dst) {
3084 addReply(c,shared.sameobjecterr);
3085 return;
3086 }
3087
3088 /* Check if the element exists and get a reference */
3089 o = lookupKeyWrite(c->db,c->argv[1]);
3090 if (!o) {
3091 addReply(c,shared.czero);
3092 return;
3093 }
3094
3095 /* Try to add the element to the target DB */
3096 deleteIfVolatile(dst,c->argv[1]);
3097 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3098 addReply(c,shared.czero);
3099 return;
3100 }
3101 incrRefCount(c->argv[1]);
3102 incrRefCount(o);
3103
3104 /* OK! key moved, free the entry in the source DB */
3105 deleteKey(src,c->argv[1]);
3106 server.dirty++;
3107 addReply(c,shared.cone);
3108 }
3109
3110 /* =================================== Lists ================================ */
3111 static void pushGenericCommand(redisClient *c, int where) {
3112 robj *lobj;
3113 list *list;
3114
3115 lobj = lookupKeyWrite(c->db,c->argv[1]);
3116 if (lobj == NULL) {
3117 lobj = createListObject();
3118 list = lobj->ptr;
3119 if (where == REDIS_HEAD) {
3120 listAddNodeHead(list,c->argv[2]);
3121 } else {
3122 listAddNodeTail(list,c->argv[2]);
3123 }
3124 dictAdd(c->db->dict,c->argv[1],lobj);
3125 incrRefCount(c->argv[1]);
3126 incrRefCount(c->argv[2]);
3127 } else {
3128 if (lobj->type != REDIS_LIST) {
3129 addReply(c,shared.wrongtypeerr);
3130 return;
3131 }
3132 list = lobj->ptr;
3133 if (where == REDIS_HEAD) {
3134 listAddNodeHead(list,c->argv[2]);
3135 } else {
3136 listAddNodeTail(list,c->argv[2]);
3137 }
3138 incrRefCount(c->argv[2]);
3139 }
3140 server.dirty++;
3141 addReply(c,shared.ok);
3142 }
3143
3144 static void lpushCommand(redisClient *c) {
3145 pushGenericCommand(c,REDIS_HEAD);
3146 }
3147
3148 static void rpushCommand(redisClient *c) {
3149 pushGenericCommand(c,REDIS_TAIL);
3150 }
3151
3152 static void llenCommand(redisClient *c) {
3153 robj *o;
3154 list *l;
3155
3156 o = lookupKeyRead(c->db,c->argv[1]);
3157 if (o == NULL) {
3158 addReply(c,shared.czero);
3159 return;
3160 } else {
3161 if (o->type != REDIS_LIST) {
3162 addReply(c,shared.wrongtypeerr);
3163 } else {
3164 l = o->ptr;
3165 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3166 }
3167 }
3168 }
3169
3170 static void lindexCommand(redisClient *c) {
3171 robj *o;
3172 int index = atoi(c->argv[2]->ptr);
3173
3174 o = lookupKeyRead(c->db,c->argv[1]);
3175 if (o == NULL) {
3176 addReply(c,shared.nullbulk);
3177 } else {
3178 if (o->type != REDIS_LIST) {
3179 addReply(c,shared.wrongtypeerr);
3180 } else {
3181 list *list = o->ptr;
3182 listNode *ln;
3183
3184 ln = listIndex(list, index);
3185 if (ln == NULL) {
3186 addReply(c,shared.nullbulk);
3187 } else {
3188 robj *ele = listNodeValue(ln);
3189 addReplyBulkLen(c,ele);
3190 addReply(c,ele);
3191 addReply(c,shared.crlf);
3192 }
3193 }
3194 }
3195 }
3196
3197 static void lsetCommand(redisClient *c) {
3198 robj *o;
3199 int index = atoi(c->argv[2]->ptr);
3200
3201 o = lookupKeyWrite(c->db,c->argv[1]);
3202 if (o == NULL) {
3203 addReply(c,shared.nokeyerr);
3204 } else {
3205 if (o->type != REDIS_LIST) {
3206 addReply(c,shared.wrongtypeerr);
3207 } else {
3208 list *list = o->ptr;
3209 listNode *ln;
3210
3211 ln = listIndex(list, index);
3212 if (ln == NULL) {
3213 addReply(c,shared.outofrangeerr);
3214 } else {
3215 robj *ele = listNodeValue(ln);
3216
3217 decrRefCount(ele);
3218 listNodeValue(ln) = c->argv[3];
3219 incrRefCount(c->argv[3]);
3220 addReply(c,shared.ok);
3221 server.dirty++;
3222 }
3223 }
3224 }
3225 }
3226
3227 static void popGenericCommand(redisClient *c, int where) {
3228 robj *o;
3229
3230 o = lookupKeyWrite(c->db,c->argv[1]);
3231 if (o == NULL) {
3232 addReply(c,shared.nullbulk);
3233 } else {
3234 if (o->type != REDIS_LIST) {
3235 addReply(c,shared.wrongtypeerr);
3236 } else {
3237 list *list = o->ptr;
3238 listNode *ln;
3239
3240 if (where == REDIS_HEAD)
3241 ln = listFirst(list);
3242 else
3243 ln = listLast(list);
3244
3245 if (ln == NULL) {
3246 addReply(c,shared.nullbulk);
3247 } else {
3248 robj *ele = listNodeValue(ln);
3249 addReplyBulkLen(c,ele);
3250 addReply(c,ele);
3251 addReply(c,shared.crlf);
3252 listDelNode(list,ln);
3253 server.dirty++;
3254 }
3255 }
3256 }
3257 }
3258
3259 static void lpopCommand(redisClient *c) {
3260 popGenericCommand(c,REDIS_HEAD);
3261 }
3262
3263 static void rpopCommand(redisClient *c) {
3264 popGenericCommand(c,REDIS_TAIL);
3265 }
3266
3267 static void lrangeCommand(redisClient *c) {
3268 robj *o;
3269 int start = atoi(c->argv[2]->ptr);
3270 int end = atoi(c->argv[3]->ptr);
3271
3272 o = lookupKeyRead(c->db,c->argv[1]);
3273 if (o == NULL) {
3274 addReply(c,shared.nullmultibulk);
3275 } else {
3276 if (o->type != REDIS_LIST) {
3277 addReply(c,shared.wrongtypeerr);
3278 } else {
3279 list *list = o->ptr;
3280 listNode *ln;
3281 int llen = listLength(list);
3282 int rangelen, j;
3283 robj *ele;
3284
3285 /* convert negative indexes */
3286 if (start < 0) start = llen+start;
3287 if (end < 0) end = llen+end;
3288 if (start < 0) start = 0;
3289 if (end < 0) end = 0;
3290
3291 /* indexes sanity checks */
3292 if (start > end || start >= llen) {
3293 /* Out of range start or start > end result in empty list */
3294 addReply(c,shared.emptymultibulk);
3295 return;
3296 }
3297 if (end >= llen) end = llen-1;
3298 rangelen = (end-start)+1;
3299
3300 /* Return the result in form of a multi-bulk reply */
3301 ln = listIndex(list, start);
3302 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3303 for (j = 0; j < rangelen; j++) {
3304 ele = listNodeValue(ln);
3305 addReplyBulkLen(c,ele);
3306 addReply(c,ele);
3307 addReply(c,shared.crlf);
3308 ln = ln->next;
3309 }
3310 }
3311 }
3312 }
3313
3314 static void ltrimCommand(redisClient *c) {
3315 robj *o;
3316 int start = atoi(c->argv[2]->ptr);
3317 int end = atoi(c->argv[3]->ptr);
3318
3319 o = lookupKeyWrite(c->db,c->argv[1]);
3320 if (o == NULL) {
3321 addReply(c,shared.nokeyerr);
3322 } else {
3323 if (o->type != REDIS_LIST) {
3324 addReply(c,shared.wrongtypeerr);
3325 } else {
3326 list *list = o->ptr;
3327 listNode *ln;
3328 int llen = listLength(list);
3329 int j, ltrim, rtrim;
3330
3331 /* convert negative indexes */
3332 if (start < 0) start = llen+start;
3333 if (end < 0) end = llen+end;
3334 if (start < 0) start = 0;
3335 if (end < 0) end = 0;
3336
3337 /* indexes sanity checks */
3338 if (start > end || start >= llen) {
3339 /* Out of range start or start > end result in empty list */
3340 ltrim = llen;
3341 rtrim = 0;
3342 } else {
3343 if (end >= llen) end = llen-1;
3344 ltrim = start;
3345 rtrim = llen-end-1;
3346 }
3347
3348 /* Remove list elements to perform the trim */
3349 for (j = 0; j < ltrim; j++) {
3350 ln = listFirst(list);
3351 listDelNode(list,ln);
3352 }
3353 for (j = 0; j < rtrim; j++) {
3354 ln = listLast(list);
3355 listDelNode(list,ln);
3356 }
3357 server.dirty++;
3358 addReply(c,shared.ok);
3359 }
3360 }
3361 }
3362
3363 static void lremCommand(redisClient *c) {
3364 robj *o;
3365
3366 o = lookupKeyWrite(c->db,c->argv[1]);
3367 if (o == NULL) {
3368 addReply(c,shared.czero);
3369 } else {
3370 if (o->type != REDIS_LIST) {
3371 addReply(c,shared.wrongtypeerr);
3372 } else {
3373 list *list = o->ptr;
3374 listNode *ln, *next;
3375 int toremove = atoi(c->argv[2]->ptr);
3376 int removed = 0;
3377 int fromtail = 0;
3378
3379 if (toremove < 0) {
3380 toremove = -toremove;
3381 fromtail = 1;
3382 }
3383 ln = fromtail ? list->tail : list->head;
3384 while (ln) {
3385 robj *ele = listNodeValue(ln);
3386
3387 next = fromtail ? ln->prev : ln->next;
3388 if (compareStringObjects(ele,c->argv[3]) == 0) {
3389 listDelNode(list,ln);
3390 server.dirty++;
3391 removed++;
3392 if (toremove && removed == toremove) break;
3393 }
3394 ln = next;
3395 }
3396 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3397 }
3398 }
3399 }
3400
3401 /* ==================================== Sets ================================ */
3402
3403 static void saddCommand(redisClient *c) {
3404 robj *set;
3405
3406 set = lookupKeyWrite(c->db,c->argv[1]);
3407 if (set == NULL) {
3408 set = createSetObject();
3409 dictAdd(c->db->dict,c->argv[1],set);
3410 incrRefCount(c->argv[1]);
3411 } else {
3412 if (set->type != REDIS_SET) {
3413 addReply(c,shared.wrongtypeerr);
3414 return;
3415 }
3416 }
3417 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3418 incrRefCount(c->argv[2]);
3419 server.dirty++;
3420 addReply(c,shared.cone);
3421 } else {
3422 addReply(c,shared.czero);
3423 }
3424 }
3425
3426 static void sremCommand(redisClient *c) {
3427 robj *set;
3428
3429 set = lookupKeyWrite(c->db,c->argv[1]);
3430 if (set == NULL) {
3431 addReply(c,shared.czero);
3432 } else {
3433 if (set->type != REDIS_SET) {
3434 addReply(c,shared.wrongtypeerr);
3435 return;
3436 }
3437 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3438 server.dirty++;
3439 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3440 addReply(c,shared.cone);
3441 } else {
3442 addReply(c,shared.czero);
3443 }
3444 }
3445 }
3446
3447 static void smoveCommand(redisClient *c) {
3448 robj *srcset, *dstset;
3449
3450 srcset = lookupKeyWrite(c->db,c->argv[1]);
3451 dstset = lookupKeyWrite(c->db,c->argv[2]);
3452
3453 /* If the source key does not exist return 0, if it's of the wrong type
3454 * raise an error */
3455 if (srcset == NULL || srcset->type != REDIS_SET) {
3456 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3457 return;
3458 }
3459 /* Error if the destination key is not a set as well */
3460 if (dstset && dstset->type != REDIS_SET) {
3461 addReply(c,shared.wrongtypeerr);
3462 return;
3463 }
3464 /* Remove the element from the source set */
3465 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3466 /* Key not found in the src set! return zero */
3467 addReply(c,shared.czero);
3468 return;
3469 }
3470 server.dirty++;
3471 /* Add the element to the destination set */
3472 if (!dstset) {
3473 dstset = createSetObject();
3474 dictAdd(c->db->dict,c->argv[2],dstset);
3475 incrRefCount(c->argv[2]);
3476 }
3477 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3478 incrRefCount(c->argv[3]);
3479 addReply(c,shared.cone);
3480 }
3481
3482 static void sismemberCommand(redisClient *c) {
3483 robj *set;
3484
3485 set = lookupKeyRead(c->db,c->argv[1]);
3486 if (set == NULL) {
3487 addReply(c,shared.czero);
3488 } else {
3489 if (set->type != REDIS_SET) {
3490 addReply(c,shared.wrongtypeerr);
3491 return;
3492 }
3493 if (dictFind(set->ptr,c->argv[2]))
3494 addReply(c,shared.cone);
3495 else
3496 addReply(c,shared.czero);
3497 }
3498 }
3499
3500 static void scardCommand(redisClient *c) {
3501 robj *o;
3502 dict *s;
3503
3504 o = lookupKeyRead(c->db,c->argv[1]);
3505 if (o == NULL) {
3506 addReply(c,shared.czero);
3507 return;
3508 } else {
3509 if (o->type != REDIS_SET) {
3510 addReply(c,shared.wrongtypeerr);
3511 } else {
3512 s = o->ptr;
3513 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3514 dictSize(s)));
3515 }
3516 }
3517 }
3518
3519 static void spopCommand(redisClient *c) {
3520 robj *set;
3521 dictEntry *de;
3522
3523 set = lookupKeyWrite(c->db,c->argv[1]);
3524 if (set == NULL) {
3525 addReply(c,shared.nullbulk);
3526 } else {
3527 if (set->type != REDIS_SET) {
3528 addReply(c,shared.wrongtypeerr);
3529 return;
3530 }
3531 de = dictGetRandomKey(set->ptr);
3532 if (de == NULL) {
3533 addReply(c,shared.nullbulk);
3534 } else {
3535 robj *ele = dictGetEntryKey(de);
3536
3537 addReplyBulkLen(c,ele);
3538 addReply(c,ele);
3539 addReply(c,shared.crlf);
3540 dictDelete(set->ptr,ele);
3541 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3542 server.dirty++;
3543 }
3544 }
3545 }
3546
3547 static void srandmemberCommand(redisClient *c) {
3548 robj *set;
3549 dictEntry *de;
3550
3551 set = lookupKeyRead(c->db,c->argv[1]);
3552 if (set == NULL) {
3553 addReply(c,shared.nullbulk);
3554 } else {
3555 if (set->type != REDIS_SET) {
3556 addReply(c,shared.wrongtypeerr);
3557 return;
3558 }
3559 de = dictGetRandomKey(set->ptr);
3560 if (de == NULL) {
3561 addReply(c,shared.nullbulk);
3562 } else {
3563 robj *ele = dictGetEntryKey(de);
3564
3565 addReplyBulkLen(c,ele);
3566 addReply(c,ele);
3567 addReply(c,shared.crlf);
3568 }
3569 }
3570 }
3571
3572 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3573 dict **d1 = (void*) s1, **d2 = (void*) s2;
3574
3575 return dictSize(*d1)-dictSize(*d2);
3576 }
3577
3578 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3579 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3580 dictIterator *di;
3581 dictEntry *de;
3582 robj *lenobj = NULL, *dstset = NULL;
3583 int j, cardinality = 0;
3584
3585 for (j = 0; j < setsnum; j++) {
3586 robj *setobj;
3587
3588 setobj = dstkey ?
3589 lookupKeyWrite(c->db,setskeys[j]) :
3590 lookupKeyRead(c->db,setskeys[j]);
3591 if (!setobj) {
3592 zfree(dv);
3593 if (dstkey) {
3594 deleteKey(c->db,dstkey);
3595 addReply(c,shared.ok);
3596 } else {
3597 addReply(c,shared.nullmultibulk);
3598 }
3599 return;
3600 }
3601 if (setobj->type != REDIS_SET) {
3602 zfree(dv);
3603 addReply(c,shared.wrongtypeerr);
3604 return;
3605 }
3606 dv[j] = setobj->ptr;
3607 }
3608 /* Sort sets from the smallest to largest, this will improve our
3609 * algorithm's performace */
3610 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3611
3612 /* The first thing we should output is the total number of elements...
3613 * since this is a multi-bulk write, but at this stage we don't know
3614 * the intersection set size, so we use a trick, append an empty object
3615 * to the output list and save the pointer to later modify it with the
3616 * right length */
3617 if (!dstkey) {
3618 lenobj = createObject(REDIS_STRING,NULL);
3619 addReply(c,lenobj);
3620 decrRefCount(lenobj);
3621 } else {
3622 /* If we have a target key where to store the resulting set
3623 * create this key with an empty set inside */
3624 dstset = createSetObject();
3625 }
3626
3627 /* Iterate all the elements of the first (smallest) set, and test
3628 * the element against all the other sets, if at least one set does
3629 * not include the element it is discarded */
3630 di = dictGetIterator(dv[0]);
3631
3632 while((de = dictNext(di)) != NULL) {
3633 robj *ele;
3634
3635 for (j = 1; j < setsnum; j++)
3636 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3637 if (j != setsnum)
3638 continue; /* at least one set does not contain the member */
3639 ele = dictGetEntryKey(de);
3640 if (!dstkey) {
3641 addReplyBulkLen(c,ele);
3642 addReply(c,ele);
3643 addReply(c,shared.crlf);
3644 cardinality++;
3645 } else {
3646 dictAdd(dstset->ptr,ele,NULL);
3647 incrRefCount(ele);
3648 }
3649 }
3650 dictReleaseIterator(di);
3651
3652 if (dstkey) {
3653 /* Store the resulting set into the target */
3654 deleteKey(c->db,dstkey);
3655 dictAdd(c->db->dict,dstkey,dstset);
3656 incrRefCount(dstkey);
3657 }
3658
3659 if (!dstkey) {
3660 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3661 } else {
3662 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3663 dictSize((dict*)dstset->ptr)));
3664 server.dirty++;
3665 }
3666 zfree(dv);
3667 }
3668
3669 static void sinterCommand(redisClient *c) {
3670 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3671 }
3672
3673 static void sinterstoreCommand(redisClient *c) {
3674 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3675 }
3676
3677 #define REDIS_OP_UNION 0
3678 #define REDIS_OP_DIFF 1
3679
3680 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3681 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3682 dictIterator *di;
3683 dictEntry *de;
3684 robj *dstset = NULL;
3685 int j, cardinality = 0;
3686
3687 for (j = 0; j < setsnum; j++) {
3688 robj *setobj;
3689
3690 setobj = dstkey ?
3691 lookupKeyWrite(c->db,setskeys[j]) :
3692 lookupKeyRead(c->db,setskeys[j]);
3693 if (!setobj) {
3694 dv[j] = NULL;
3695 continue;
3696 }
3697 if (setobj->type != REDIS_SET) {
3698 zfree(dv);
3699 addReply(c,shared.wrongtypeerr);
3700 return;
3701 }
3702 dv[j] = setobj->ptr;
3703 }
3704
3705 /* We need a temp set object to store our union. If the dstkey
3706 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3707 * this set object will be the resulting object to set into the target key*/
3708 dstset = createSetObject();
3709
3710 /* Iterate all the elements of all the sets, add every element a single
3711 * time to the result set */
3712 for (j = 0; j < setsnum; j++) {
3713 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3714 if (!dv[j]) continue; /* non existing keys are like empty sets */
3715
3716 di = dictGetIterator(dv[j]);
3717
3718 while((de = dictNext(di)) != NULL) {
3719 robj *ele;
3720
3721 /* dictAdd will not add the same element multiple times */
3722 ele = dictGetEntryKey(de);
3723 if (op == REDIS_OP_UNION || j == 0) {
3724 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3725 incrRefCount(ele);
3726 cardinality++;
3727 }
3728 } else if (op == REDIS_OP_DIFF) {
3729 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3730 cardinality--;
3731 }
3732 }
3733 }
3734 dictReleaseIterator(di);
3735
3736 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3737 }
3738
3739 /* Output the content of the resulting set, if not in STORE mode */
3740 if (!dstkey) {
3741 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3742 di = dictGetIterator(dstset->ptr);
3743 while((de = dictNext(di)) != NULL) {
3744 robj *ele;
3745
3746 ele = dictGetEntryKey(de);
3747 addReplyBulkLen(c,ele);
3748 addReply(c,ele);
3749 addReply(c,shared.crlf);
3750 }
3751 dictReleaseIterator(di);
3752 } else {
3753 /* If we have a target key where to store the resulting set
3754 * create this key with the result set inside */
3755 deleteKey(c->db,dstkey);
3756 dictAdd(c->db->dict,dstkey,dstset);
3757 incrRefCount(dstkey);
3758 }
3759
3760 /* Cleanup */
3761 if (!dstkey) {
3762 decrRefCount(dstset);
3763 } else {
3764 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3765 dictSize((dict*)dstset->ptr)));
3766 server.dirty++;
3767 }
3768 zfree(dv);
3769 }
3770
3771 static void sunionCommand(redisClient *c) {
3772 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3773 }
3774
3775 static void sunionstoreCommand(redisClient *c) {
3776 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3777 }
3778
3779 static void sdiffCommand(redisClient *c) {
3780 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3781 }
3782
3783 static void sdiffstoreCommand(redisClient *c) {
3784 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3785 }
3786
3787 /* ==================================== ZSets =============================== */
3788
3789 /* ZSETs are ordered sets using two data structures to hold the same elements
3790 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3791 * data structure.
3792 *
3793 * The elements are added to a hash table mapping Redis objects to scores.
3794 * At the same time the elements are added to a skip list mapping scores
3795 * to Redis objects (so objects are sorted by scores in this "view"). */
3796
3797 /* This skiplist implementation is almost a C translation of the original
3798 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3799 * Alternative to Balanced Trees", modified in three ways:
3800 * a) this implementation allows for repeated values.
3801 * b) the comparison is not just by key (our 'score') but by satellite data.
3802 * c) there is a back pointer, so it's a doubly linked list with the back
3803 * pointers being only at "level 1". This allows to traverse the list
3804 * from tail to head, useful for ZREVRANGE. */
3805
3806 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3807 zskiplistNode *zn = zmalloc(sizeof(*zn));
3808
3809 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3810 zn->score = score;
3811 zn->obj = obj;
3812 return zn;
3813 }
3814
3815 static zskiplist *zslCreate(void) {
3816 int j;
3817 zskiplist *zsl;
3818
3819 zsl = zmalloc(sizeof(*zsl));
3820 zsl->level = 1;
3821 zsl->length = 0;
3822 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3823 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3824 zsl->header->forward[j] = NULL;
3825 zsl->header->backward = NULL;
3826 zsl->tail = NULL;
3827 return zsl;
3828 }
3829
3830 static void zslFreeNode(zskiplistNode *node) {
3831 decrRefCount(node->obj);
3832 zfree(node->forward);
3833 zfree(node);
3834 }
3835
3836 static void zslFree(zskiplist *zsl) {
3837 zskiplistNode *node = zsl->header->forward[0], *next;
3838
3839 zfree(zsl->header->forward);
3840 zfree(zsl->header);
3841 while(node) {
3842 next = node->forward[0];
3843 zslFreeNode(node);
3844 node = next;
3845 }
3846 zfree(zsl);
3847 }
3848
3849 static int zslRandomLevel(void) {
3850 int level = 1;
3851 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3852 level += 1;
3853 return level;
3854 }
3855
3856 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3857 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3858 int i, level;
3859
3860 x = zsl->header;
3861 for (i = zsl->level-1; i >= 0; i--) {
3862 while (x->forward[i] &&
3863 (x->forward[i]->score < score ||
3864 (x->forward[i]->score == score &&
3865 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3866 x = x->forward[i];
3867 update[i] = x;
3868 }
3869 /* we assume the key is not already inside, since we allow duplicated
3870 * scores, and the re-insertion of score and redis object should never
3871 * happpen since the caller of zslInsert() should test in the hash table
3872 * if the element is already inside or not. */
3873 level = zslRandomLevel();
3874 if (level > zsl->level) {
3875 for (i = zsl->level; i < level; i++)
3876 update[i] = zsl->header;
3877 zsl->level = level;
3878 }
3879 x = zslCreateNode(level,score,obj);
3880 for (i = 0; i < level; i++) {
3881 x->forward[i] = update[i]->forward[i];
3882 update[i]->forward[i] = x;
3883 }
3884 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3885 if (x->forward[0])
3886 x->forward[0]->backward = x;
3887 else
3888 zsl->tail = x;
3889 zsl->length++;
3890 }
3891
3892 /* Delete an element with matching score/object from the skiplist. */
3893 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3894 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3895 int i;
3896
3897 x = zsl->header;
3898 for (i = zsl->level-1; i >= 0; i--) {
3899 while (x->forward[i] &&
3900 (x->forward[i]->score < score ||
3901 (x->forward[i]->score == score &&
3902 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3903 x = x->forward[i];
3904 update[i] = x;
3905 }
3906 /* We may have multiple elements with the same score, what we need
3907 * is to find the element with both the right score and object. */
3908 x = x->forward[0];
3909 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
3910 for (i = 0; i < zsl->level; i++) {
3911 if (update[i]->forward[i] != x) break;
3912 update[i]->forward[i] = x->forward[i];
3913 }
3914 if (x->forward[0]) {
3915 x->forward[0]->backward = (x->backward == zsl->header) ?
3916 NULL : x->backward;
3917 } else {
3918 zsl->tail = x->backward;
3919 }
3920 zslFreeNode(x);
3921 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3922 zsl->level--;
3923 zsl->length--;
3924 return 1;
3925 } else {
3926 return 0; /* not found */
3927 }
3928 return 0; /* not found */
3929 }
3930
3931 /* Find the first node having a score equal or greater than the specified one.
3932 * Returns NULL if there is no match. */
3933 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
3934 zskiplistNode *x;
3935 int i;
3936
3937 x = zsl->header;
3938 for (i = zsl->level-1; i >= 0; i--) {
3939 while (x->forward[i] && x->forward[i]->score < score)
3940 x = x->forward[i];
3941 }
3942 /* We may have multiple elements with the same score, what we need
3943 * is to find the element with both the right score and object. */
3944 return x->forward[0];
3945 }
3946
3947 /* The actual Z-commands implementations */
3948
3949 static void zaddCommand(redisClient *c) {
3950 robj *zsetobj;
3951 zset *zs;
3952 double *score;
3953
3954 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3955 if (zsetobj == NULL) {
3956 zsetobj = createZsetObject();
3957 dictAdd(c->db->dict,c->argv[1],zsetobj);
3958 incrRefCount(c->argv[1]);
3959 } else {
3960 if (zsetobj->type != REDIS_ZSET) {
3961 addReply(c,shared.wrongtypeerr);
3962 return;
3963 }
3964 }
3965 score = zmalloc(sizeof(double));
3966 *score = strtod(c->argv[2]->ptr,NULL);
3967 zs = zsetobj->ptr;
3968 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3969 /* case 1: New element */
3970 incrRefCount(c->argv[3]); /* added to hash */
3971 zslInsert(zs->zsl,*score,c->argv[3]);
3972 incrRefCount(c->argv[3]); /* added to skiplist */
3973 server.dirty++;
3974 addReply(c,shared.cone);
3975 } else {
3976 dictEntry *de;
3977 double *oldscore;
3978
3979 /* case 2: Score update operation */
3980 de = dictFind(zs->dict,c->argv[3]);
3981 assert(de != NULL);
3982 oldscore = dictGetEntryVal(de);
3983 if (*score != *oldscore) {
3984 int deleted;
3985
3986 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
3987 assert(deleted != 0);
3988 zslInsert(zs->zsl,*score,c->argv[3]);
3989 incrRefCount(c->argv[3]);
3990 dictReplace(zs->dict,c->argv[3],score);
3991 server.dirty++;
3992 } else {
3993 zfree(score);
3994 }
3995 addReply(c,shared.czero);
3996 }
3997 }
3998
3999 static void zremCommand(redisClient *c) {
4000 robj *zsetobj;
4001 zset *zs;
4002
4003 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4004 if (zsetobj == NULL) {
4005 addReply(c,shared.czero);
4006 } else {
4007 dictEntry *de;
4008 double *oldscore;
4009 int deleted;
4010
4011 if (zsetobj->type != REDIS_ZSET) {
4012 addReply(c,shared.wrongtypeerr);
4013 return;
4014 }
4015 zs = zsetobj->ptr;
4016 de = dictFind(zs->dict,c->argv[2]);
4017 if (de == NULL) {
4018 addReply(c,shared.czero);
4019 return;
4020 }
4021 /* Delete from the skiplist */
4022 oldscore = dictGetEntryVal(de);
4023 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4024 assert(deleted != 0);
4025
4026 /* Delete from the hash table */
4027 dictDelete(zs->dict,c->argv[2]);
4028 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4029 server.dirty++;
4030 addReply(c,shared.cone);
4031 }
4032 }
4033
4034 static void zrangeGenericCommand(redisClient *c, int reverse) {
4035 robj *o;
4036 int start = atoi(c->argv[2]->ptr);
4037 int end = atoi(c->argv[3]->ptr);
4038
4039 o = lookupKeyRead(c->db,c->argv[1]);
4040 if (o == NULL) {
4041 addReply(c,shared.nullmultibulk);
4042 } else {
4043 if (o->type != REDIS_ZSET) {
4044 addReply(c,shared.wrongtypeerr);
4045 } else {
4046 zset *zsetobj = o->ptr;
4047 zskiplist *zsl = zsetobj->zsl;
4048 zskiplistNode *ln;
4049
4050 int llen = zsl->length;
4051 int rangelen, j;
4052 robj *ele;
4053
4054 /* convert negative indexes */
4055 if (start < 0) start = llen+start;
4056 if (end < 0) end = llen+end;
4057 if (start < 0) start = 0;
4058 if (end < 0) end = 0;
4059
4060 /* indexes sanity checks */
4061 if (start > end || start >= llen) {
4062 /* Out of range start or start > end result in empty list */
4063 addReply(c,shared.emptymultibulk);
4064 return;
4065 }
4066 if (end >= llen) end = llen-1;
4067 rangelen = (end-start)+1;
4068
4069 /* Return the result in form of a multi-bulk reply */
4070 if (reverse) {
4071 ln = zsl->tail;
4072 while (start--)
4073 ln = ln->backward;
4074 } else {
4075 ln = zsl->header->forward[0];
4076 while (start--)
4077 ln = ln->forward[0];
4078 }
4079
4080 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4081 for (j = 0; j < rangelen; j++) {
4082 ele = ln->obj;
4083 addReplyBulkLen(c,ele);
4084 addReply(c,ele);
4085 addReply(c,shared.crlf);
4086 ln = reverse ? ln->backward : ln->forward[0];
4087 }
4088 }
4089 }
4090 }
4091
4092 static void zrangeCommand(redisClient *c) {
4093 zrangeGenericCommand(c,0);
4094 }
4095
4096 static void zrevrangeCommand(redisClient *c) {
4097 zrangeGenericCommand(c,1);
4098 }
4099
4100 static void zrangebyscoreCommand(redisClient *c) {
4101 robj *o;
4102 double min = strtod(c->argv[2]->ptr,NULL);
4103 double max = strtod(c->argv[3]->ptr,NULL);
4104
4105 o = lookupKeyRead(c->db,c->argv[1]);
4106 if (o == NULL) {
4107 addReply(c,shared.nullmultibulk);
4108 } else {
4109 if (o->type != REDIS_ZSET) {
4110 addReply(c,shared.wrongtypeerr);
4111 } else {
4112 zset *zsetobj = o->ptr;
4113 zskiplist *zsl = zsetobj->zsl;
4114 zskiplistNode *ln;
4115 robj *ele, *lenobj;
4116 unsigned int rangelen = 0;
4117
4118 /* Get the first node with the score >= min */
4119 ln = zslFirstWithScore(zsl,min);
4120 if (ln == NULL) {
4121 /* No element matching the speciifed interval */
4122 addReply(c,shared.emptymultibulk);
4123 return;
4124 }
4125
4126 /* We don't know in advance how many matching elements there
4127 * are in the list, so we push this object that will represent
4128 * the multi-bulk length in the output buffer, and will "fix"
4129 * it later */
4130 lenobj = createObject(REDIS_STRING,NULL);
4131 addReply(c,lenobj);
4132
4133 while(ln && ln->score <= max) {
4134 ele = ln->obj;
4135 addReplyBulkLen(c,ele);
4136 addReply(c,ele);
4137 addReply(c,shared.crlf);
4138 ln = ln->forward[0];
4139 rangelen++;
4140 }
4141 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4142 }
4143 }
4144 }
4145
4146 static void zlenCommand(redisClient *c) {
4147 robj *o;
4148 zset *zs;
4149
4150 o = lookupKeyRead(c->db,c->argv[1]);
4151 if (o == NULL) {
4152 addReply(c,shared.czero);
4153 return;
4154 } else {
4155 if (o->type != REDIS_ZSET) {
4156 addReply(c,shared.wrongtypeerr);
4157 } else {
4158 zs = o->ptr;
4159 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4160 }
4161 }
4162 }
4163
4164 static void zscoreCommand(redisClient *c) {
4165 robj *o;
4166 zset *zs;
4167
4168 o = lookupKeyRead(c->db,c->argv[1]);
4169 if (o == NULL) {
4170 addReply(c,shared.czero);
4171 return;
4172 } else {
4173 if (o->type != REDIS_ZSET) {
4174 addReply(c,shared.wrongtypeerr);
4175 } else {
4176 dictEntry *de;
4177
4178 zs = o->ptr;
4179 de = dictFind(zs->dict,c->argv[2]);
4180 if (!de) {
4181 addReply(c,shared.nullbulk);
4182 } else {
4183 char buf[128];
4184 double *score = dictGetEntryVal(de);
4185
4186 snprintf(buf,sizeof(buf),"%.16g",*score);
4187 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4188 strlen(buf),buf));
4189 }
4190 }
4191 }
4192 }
4193
4194 /* ========================= Non type-specific commands ==================== */
4195
4196 static void flushdbCommand(redisClient *c) {
4197 server.dirty += dictSize(c->db->dict);
4198 dictEmpty(c->db->dict);
4199 dictEmpty(c->db->expires);
4200 addReply(c,shared.ok);
4201 }
4202
4203 static void flushallCommand(redisClient *c) {
4204 server.dirty += emptyDb();
4205 addReply(c,shared.ok);
4206 rdbSave(server.dbfilename);
4207 server.dirty++;
4208 }
4209
4210 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4211 redisSortOperation *so = zmalloc(sizeof(*so));
4212 so->type = type;
4213 so->pattern = pattern;
4214 return so;
4215 }
4216
4217 /* Return the value associated to the key with a name obtained
4218 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4219 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4220 char *p;
4221 sds spat, ssub;
4222 robj keyobj;
4223 int prefixlen, sublen, postfixlen;
4224 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4225 struct {
4226 long len;
4227 long free;
4228 char buf[REDIS_SORTKEY_MAX+1];
4229 } keyname;
4230
4231 if (subst->encoding == REDIS_ENCODING_RAW)
4232 incrRefCount(subst);
4233 else {
4234 subst = getDecodedObject(subst);
4235 }
4236
4237 spat = pattern->ptr;
4238 ssub = subst->ptr;
4239 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4240 p = strchr(spat,'*');
4241 if (!p) return NULL;
4242
4243 prefixlen = p-spat;
4244 sublen = sdslen(ssub);
4245 postfixlen = sdslen(spat)-(prefixlen+1);
4246 memcpy(keyname.buf,spat,prefixlen);
4247 memcpy(keyname.buf+prefixlen,ssub,sublen);
4248 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4249 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4250 keyname.len = prefixlen+sublen+postfixlen;
4251
4252 keyobj.refcount = 1;
4253 keyobj.type = REDIS_STRING;
4254 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4255
4256 decrRefCount(subst);
4257
4258 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4259 return lookupKeyRead(db,&keyobj);
4260 }
4261
4262 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4263 * the additional parameter is not standard but a BSD-specific we have to
4264 * pass sorting parameters via the global 'server' structure */
4265 static int sortCompare(const void *s1, const void *s2) {
4266 const redisSortObject *so1 = s1, *so2 = s2;
4267 int cmp;
4268
4269 if (!server.sort_alpha) {
4270 /* Numeric sorting. Here it's trivial as we precomputed scores */
4271 if (so1->u.score > so2->u.score) {
4272 cmp = 1;
4273 } else if (so1->u.score < so2->u.score) {
4274 cmp = -1;
4275 } else {
4276 cmp = 0;
4277 }
4278 } else {
4279 /* Alphanumeric sorting */
4280 if (server.sort_bypattern) {
4281 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4282 /* At least one compare object is NULL */
4283 if (so1->u.cmpobj == so2->u.cmpobj)
4284 cmp = 0;
4285 else if (so1->u.cmpobj == NULL)
4286 cmp = -1;
4287 else
4288 cmp = 1;
4289 } else {
4290 /* We have both the objects, use strcoll */
4291 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4292 }
4293 } else {
4294 /* Compare elements directly */
4295 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4296 so2->obj->encoding == REDIS_ENCODING_RAW) {
4297 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4298 } else {
4299 robj *dec1, *dec2;
4300
4301 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4302 so1->obj : getDecodedObject(so1->obj);
4303 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4304 so2->obj : getDecodedObject(so2->obj);
4305 cmp = strcoll(dec1->ptr,dec2->ptr);
4306 if (dec1 != so1->obj) decrRefCount(dec1);
4307 if (dec2 != so2->obj) decrRefCount(dec2);
4308 }
4309 }
4310 }
4311 return server.sort_desc ? -cmp : cmp;
4312 }
4313
4314 /* The SORT command is the most complex command in Redis. Warning: this code
4315 * is optimized for speed and a bit less for readability */
4316 static void sortCommand(redisClient *c) {
4317 list *operations;
4318 int outputlen = 0;
4319 int desc = 0, alpha = 0;
4320 int limit_start = 0, limit_count = -1, start, end;
4321 int j, dontsort = 0, vectorlen;
4322 int getop = 0; /* GET operation counter */
4323 robj *sortval, *sortby = NULL;
4324 redisSortObject *vector; /* Resulting vector to sort */
4325
4326 /* Lookup the key to sort. It must be of the right types */
4327 sortval = lookupKeyRead(c->db,c->argv[1]);
4328 if (sortval == NULL) {
4329 addReply(c,shared.nokeyerr);
4330 return;
4331 }
4332 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4333 addReply(c,shared.wrongtypeerr);
4334 return;
4335 }
4336
4337 /* Create a list of operations to perform for every sorted element.
4338 * Operations can be GET/DEL/INCR/DECR */
4339 operations = listCreate();
4340 listSetFreeMethod(operations,zfree);
4341 j = 2;
4342
4343 /* Now we need to protect sortval incrementing its count, in the future
4344 * SORT may have options able to overwrite/delete keys during the sorting
4345 * and the sorted key itself may get destroied */
4346 incrRefCount(sortval);
4347
4348 /* The SORT command has an SQL-alike syntax, parse it */
4349 while(j < c->argc) {
4350 int leftargs = c->argc-j-1;
4351 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4352 desc = 0;
4353 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4354 desc = 1;
4355 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4356 alpha = 1;
4357 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4358 limit_start = atoi(c->argv[j+1]->ptr);
4359 limit_count = atoi(c->argv[j+2]->ptr);
4360 j+=2;
4361 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4362 sortby = c->argv[j+1];
4363 /* If the BY pattern does not contain '*', i.e. it is constant,
4364 * we don't need to sort nor to lookup the weight keys. */
4365 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4366 j++;
4367 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4368 listAddNodeTail(operations,createSortOperation(
4369 REDIS_SORT_GET,c->argv[j+1]));
4370 getop++;
4371 j++;
4372 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4373 listAddNodeTail(operations,createSortOperation(
4374 REDIS_SORT_DEL,c->argv[j+1]));
4375 j++;
4376 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4377 listAddNodeTail(operations,createSortOperation(
4378 REDIS_SORT_INCR,c->argv[j+1]));
4379 j++;
4380 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4381 listAddNodeTail(operations,createSortOperation(
4382 REDIS_SORT_DECR,c->argv[j+1]));
4383 j++;
4384 } else {
4385 decrRefCount(sortval);
4386 listRelease(operations);
4387 addReply(c,shared.syntaxerr);
4388 return;
4389 }
4390 j++;
4391 }
4392
4393 /* Load the sorting vector with all the objects to sort */
4394 vectorlen = (sortval->type == REDIS_LIST) ?
4395 listLength((list*)sortval->ptr) :
4396 dictSize((dict*)sortval->ptr);
4397 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4398 j = 0;
4399 if (sortval->type == REDIS_LIST) {
4400 list *list = sortval->ptr;
4401 listNode *ln;
4402
4403 listRewind(list);
4404 while((ln = listYield(list))) {
4405 robj *ele = ln->value;
4406 vector[j].obj = ele;
4407 vector[j].u.score = 0;
4408 vector[j].u.cmpobj = NULL;
4409 j++;
4410 }
4411 } else {
4412 dict *set = sortval->ptr;
4413 dictIterator *di;
4414 dictEntry *setele;
4415
4416 di = dictGetIterator(set);
4417 while((setele = dictNext(di)) != NULL) {
4418 vector[j].obj = dictGetEntryKey(setele);
4419 vector[j].u.score = 0;
4420 vector[j].u.cmpobj = NULL;
4421 j++;
4422 }
4423 dictReleaseIterator(di);
4424 }
4425 assert(j == vectorlen);
4426
4427 /* Now it's time to load the right scores in the sorting vector */
4428 if (dontsort == 0) {
4429 for (j = 0; j < vectorlen; j++) {
4430 if (sortby) {
4431 robj *byval;
4432
4433 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4434 if (!byval || byval->type != REDIS_STRING) continue;
4435 if (alpha) {
4436 if (byval->encoding == REDIS_ENCODING_RAW) {
4437 vector[j].u.cmpobj = byval;
4438 incrRefCount(byval);
4439 } else {
4440 vector[j].u.cmpobj = getDecodedObject(byval);
4441 }
4442 } else {
4443 if (byval->encoding == REDIS_ENCODING_RAW) {
4444 vector[j].u.score = strtod(byval->ptr,NULL);
4445 } else {
4446 if (byval->encoding == REDIS_ENCODING_INT) {
4447 vector[j].u.score = (long)byval->ptr;
4448 } else
4449 assert(1 != 1);
4450 }
4451 }
4452 } else {
4453 if (!alpha) {
4454 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4455 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4456 else {
4457 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4458 vector[j].u.score = (long) vector[j].obj->ptr;
4459 else
4460 assert(1 != 1);
4461 }
4462 }
4463 }
4464 }
4465 }
4466
4467 /* We are ready to sort the vector... perform a bit of sanity check
4468 * on the LIMIT option too. We'll use a partial version of quicksort. */
4469 start = (limit_start < 0) ? 0 : limit_start;
4470 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4471 if (start >= vectorlen) {
4472 start = vectorlen-1;
4473 end = vectorlen-2;
4474 }
4475 if (end >= vectorlen) end = vectorlen-1;
4476
4477 if (dontsort == 0) {
4478 server.sort_desc = desc;
4479 server.sort_alpha = alpha;
4480 server.sort_bypattern = sortby ? 1 : 0;
4481 if (sortby && (start != 0 || end != vectorlen-1))
4482 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4483 else
4484 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4485 }
4486
4487 /* Send command output to the output buffer, performing the specified
4488 * GET/DEL/INCR/DECR operations if any. */
4489 outputlen = getop ? getop*(end-start+1) : end-start+1;
4490 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4491 for (j = start; j <= end; j++) {
4492 listNode *ln;
4493 if (!getop) {
4494 addReplyBulkLen(c,vector[j].obj);
4495 addReply(c,vector[j].obj);
4496 addReply(c,shared.crlf);
4497 }
4498 listRewind(operations);
4499 while((ln = listYield(operations))) {
4500 redisSortOperation *sop = ln->value;
4501 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4502 vector[j].obj);
4503
4504 if (sop->type == REDIS_SORT_GET) {
4505 if (!val || val->type != REDIS_STRING) {
4506 addReply(c,shared.nullbulk);
4507 } else {
4508 addReplyBulkLen(c,val);
4509 addReply(c,val);
4510 addReply(c,shared.crlf);
4511 }
4512 } else if (sop->type == REDIS_SORT_DEL) {
4513 /* TODO */
4514 }
4515 }
4516 }
4517
4518 /* Cleanup */
4519 decrRefCount(sortval);
4520 listRelease(operations);
4521 for (j = 0; j < vectorlen; j++) {
4522 if (sortby && alpha && vector[j].u.cmpobj)
4523 decrRefCount(vector[j].u.cmpobj);
4524 }
4525 zfree(vector);
4526 }
4527
4528 static void infoCommand(redisClient *c) {
4529 sds info;
4530 time_t uptime = time(NULL)-server.stat_starttime;
4531 int j;
4532
4533 info = sdscatprintf(sdsempty(),
4534 "redis_version:%s\r\n"
4535 "arch_bits:%s\r\n"
4536 "uptime_in_seconds:%d\r\n"
4537 "uptime_in_days:%d\r\n"
4538 "connected_clients:%d\r\n"
4539 "connected_slaves:%d\r\n"
4540 "used_memory:%zu\r\n"
4541 "changes_since_last_save:%lld\r\n"
4542 "bgsave_in_progress:%d\r\n"
4543 "last_save_time:%d\r\n"
4544 "total_connections_received:%lld\r\n"
4545 "total_commands_processed:%lld\r\n"
4546 "role:%s\r\n"
4547 ,REDIS_VERSION,
4548 (sizeof(long) == 8) ? "64" : "32",
4549 uptime,
4550 uptime/(3600*24),
4551 listLength(server.clients)-listLength(server.slaves),
4552 listLength(server.slaves),
4553 server.usedmemory,
4554 server.dirty,
4555 server.bgsaveinprogress,
4556 server.lastsave,
4557 server.stat_numconnections,
4558 server.stat_numcommands,
4559 server.masterhost == NULL ? "master" : "slave"
4560 );
4561 if (server.masterhost) {
4562 info = sdscatprintf(info,
4563 "master_host:%s\r\n"
4564 "master_port:%d\r\n"
4565 "master_link_status:%s\r\n"
4566 "master_last_io_seconds_ago:%d\r\n"
4567 ,server.masterhost,
4568 server.masterport,
4569 (server.replstate == REDIS_REPL_CONNECTED) ?
4570 "up" : "down",
4571 (int)(time(NULL)-server.master->lastinteraction)
4572 );
4573 }
4574 for (j = 0; j < server.dbnum; j++) {
4575 long long keys, vkeys;
4576
4577 keys = dictSize(server.db[j].dict);
4578 vkeys = dictSize(server.db[j].expires);
4579 if (keys || vkeys) {
4580 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4581 j, keys, vkeys);
4582 }
4583 }
4584 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4585 addReplySds(c,info);
4586 addReply(c,shared.crlf);
4587 }
4588
4589 static void monitorCommand(redisClient *c) {
4590 /* ignore MONITOR if aleady slave or in monitor mode */
4591 if (c->flags & REDIS_SLAVE) return;
4592
4593 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4594 c->slaveseldb = 0;
4595 listAddNodeTail(server.monitors,c);
4596 addReply(c,shared.ok);
4597 }
4598
4599 /* ================================= Expire ================================= */
4600 static int removeExpire(redisDb *db, robj *key) {
4601 if (dictDelete(db->expires,key) == DICT_OK) {
4602 return 1;
4603 } else {
4604 return 0;
4605 }
4606 }
4607
4608 static int setExpire(redisDb *db, robj *key, time_t when) {
4609 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4610 return 0;
4611 } else {
4612 incrRefCount(key);
4613 return 1;
4614 }
4615 }
4616
4617 /* Return the expire time of the specified key, or -1 if no expire
4618 * is associated with this key (i.e. the key is non volatile) */
4619 static time_t getExpire(redisDb *db, robj *key) {
4620 dictEntry *de;
4621
4622 /* No expire? return ASAP */
4623 if (dictSize(db->expires) == 0 ||
4624 (de = dictFind(db->expires,key)) == NULL) return -1;
4625
4626 return (time_t) dictGetEntryVal(de);
4627 }
4628
4629 static int expireIfNeeded(redisDb *db, robj *key) {
4630 time_t when;
4631 dictEntry *de;
4632
4633 /* No expire? return ASAP */
4634 if (dictSize(db->expires) == 0 ||
4635 (de = dictFind(db->expires,key)) == NULL) return 0;
4636
4637 /* Lookup the expire */
4638 when = (time_t) dictGetEntryVal(de);
4639 if (time(NULL) <= when) return 0;
4640
4641 /* Delete the key */
4642 dictDelete(db->expires,key);
4643 return dictDelete(db->dict,key) == DICT_OK;
4644 }
4645
4646 static int deleteIfVolatile(redisDb *db, robj *key) {
4647 dictEntry *de;
4648
4649 /* No expire? return ASAP */
4650 if (dictSize(db->expires) == 0 ||
4651 (de = dictFind(db->expires,key)) == NULL) return 0;
4652
4653 /* Delete the key */
4654 server.dirty++;
4655 dictDelete(db->expires,key);
4656 return dictDelete(db->dict,key) == DICT_OK;
4657 }
4658
4659 static void expireCommand(redisClient *c) {
4660 dictEntry *de;
4661 int seconds = atoi(c->argv[2]->ptr);
4662
4663 de = dictFind(c->db->dict,c->argv[1]);
4664 if (de == NULL) {
4665 addReply(c,shared.czero);
4666 return;
4667 }
4668 if (seconds <= 0) {
4669 addReply(c, shared.czero);
4670 return;
4671 } else {
4672 time_t when = time(NULL)+seconds;
4673 if (setExpire(c->db,c->argv[1],when)) {
4674 addReply(c,shared.cone);
4675 server.dirty++;
4676 } else {
4677 addReply(c,shared.czero);
4678 }
4679 return;
4680 }
4681 }
4682
4683 static void ttlCommand(redisClient *c) {
4684 time_t expire;
4685 int ttl = -1;
4686
4687 expire = getExpire(c->db,c->argv[1]);
4688 if (expire != -1) {
4689 ttl = (int) (expire-time(NULL));
4690 if (ttl < 0) ttl = -1;
4691 }
4692 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4693 }
4694
4695 static void msetGenericCommand(redisClient *c, int nx) {
4696 int j;
4697
4698 if ((c->argc % 2) == 0) {
4699 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4700 return;
4701 }
4702 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4703 * set nothing at all if at least one already key exists. */
4704 if (nx) {
4705 for (j = 1; j < c->argc; j += 2) {
4706 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4707 addReply(c, shared.czero);
4708 return;
4709 }
4710 }
4711 }
4712
4713 for (j = 1; j < c->argc; j += 2) {
4714 int retval;
4715
4716 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4717 if (retval == DICT_ERR) {
4718 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4719 incrRefCount(c->argv[j+1]);
4720 } else {
4721 incrRefCount(c->argv[j]);
4722 incrRefCount(c->argv[j+1]);
4723 }
4724 removeExpire(c->db,c->argv[j]);
4725 }
4726 server.dirty += (c->argc-1)/2;
4727 addReply(c, nx ? shared.cone : shared.ok);
4728 }
4729
4730 static void msetCommand(redisClient *c) {
4731 msetGenericCommand(c,0);
4732 }
4733
4734 static void msetnxCommand(redisClient *c) {
4735 msetGenericCommand(c,1);
4736 }
4737
4738 /* =============================== Replication ============================= */
4739
4740 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4741 ssize_t nwritten, ret = size;
4742 time_t start = time(NULL);
4743
4744 timeout++;
4745 while(size) {
4746 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4747 nwritten = write(fd,ptr,size);
4748 if (nwritten == -1) return -1;
4749 ptr += nwritten;
4750 size -= nwritten;
4751 }
4752 if ((time(NULL)-start) > timeout) {
4753 errno = ETIMEDOUT;
4754 return -1;
4755 }
4756 }
4757 return ret;
4758 }
4759
4760 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4761 ssize_t nread, totread = 0;
4762 time_t start = time(NULL);
4763
4764 timeout++;
4765 while(size) {
4766 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4767 nread = read(fd,ptr,size);
4768 if (nread == -1) return -1;
4769 ptr += nread;
4770 size -= nread;
4771 totread += nread;
4772 }
4773 if ((time(NULL)-start) > timeout) {
4774 errno = ETIMEDOUT;
4775 return -1;
4776 }
4777 }
4778 return totread;
4779 }
4780
/* Read a line from 'fd' one byte at a time into 'ptr', a buffer of 'size'
 * bytes, stopping at the first '\n'. The newline is not stored and a
 * trailing '\r' is stripped; the buffer is kept null terminated after
 * every byte stored. At most size-1 bytes are stored: a longer line is
 * returned truncated.
 *
 * Returns the number of bytes stored before the newline (the stripped
 * '\r', if any, is still counted), or -1 if syncRead() fails. 'timeout'
 * is passed to syncRead() for every single byte. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* Reserve room for the null term */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* Account for the byte just stored: without this the loop
             * never terminates on long lines and writes past the end of
             * the buffer. */
            size--;
        }
    }
    return nread;
}
4801
4802 static void syncCommand(redisClient *c) {
4803 /* ignore SYNC if aleady slave or in monitor mode */
4804 if (c->flags & REDIS_SLAVE) return;
4805
4806 /* SYNC can't be issued when the server has pending data to send to
4807 * the client about already issued commands. We need a fresh reply
4808 * buffer registering the differences between the BGSAVE and the current
4809 * dataset, so that we can copy to other slaves if needed. */
4810 if (listLength(c->reply) != 0) {
4811 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4812 return;
4813 }
4814
4815 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4816 /* Here we need to check if there is a background saving operation
4817 * in progress, or if it is required to start one */
4818 if (server.bgsaveinprogress) {
4819 /* Ok a background save is in progress. Let's check if it is a good
4820 * one for replication, i.e. if there is another slave that is
4821 * registering differences since the server forked to save */
4822 redisClient *slave;
4823 listNode *ln;
4824
4825 listRewind(server.slaves);
4826 while((ln = listYield(server.slaves))) {
4827 slave = ln->value;
4828 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4829 }
4830 if (ln) {
4831 /* Perfect, the server is already registering differences for
4832 * another slave. Set the right state, and copy the buffer. */
4833 listRelease(c->reply);
4834 c->reply = listDup(slave->reply);
4835 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4836 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4837 } else {
4838 /* No way, we need to wait for the next BGSAVE in order to
4839 * register differences */
4840 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4841 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4842 }
4843 } else {
4844 /* Ok we don't have a BGSAVE in progress, let's start one */
4845 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4846 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4847 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4848 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4849 return;
4850 }
4851 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4852 }
4853 c->repldbfd = -1;
4854 c->flags |= REDIS_SLAVE;
4855 c->slaveseldb = 0;
4856 listAddNodeTail(server.slaves,c);
4857 return;
4858 }
4859
4860 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4861 redisClient *slave = privdata;
4862 REDIS_NOTUSED(el);
4863 REDIS_NOTUSED(mask);
4864 char buf[REDIS_IOBUF_LEN];
4865 ssize_t nwritten, buflen;
4866
4867 if (slave->repldboff == 0) {
4868 /* Write the bulk write count before to transfer the DB. In theory here
4869 * we don't know how much room there is in the output buffer of the
4870 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4871 * operations) will never be smaller than the few bytes we need. */
4872 sds bulkcount;
4873
4874 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4875 slave->repldbsize);
4876 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4877 {
4878 sdsfree(bulkcount);
4879 freeClient(slave);
4880 return;
4881 }
4882 sdsfree(bulkcount);
4883 }
4884 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4885 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4886 if (buflen <= 0) {
4887 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4888 (buflen == 0) ? "premature EOF" : strerror(errno));
4889 freeClient(slave);
4890 return;
4891 }
4892 if ((nwritten = write(fd,buf,buflen)) == -1) {
4893 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4894 strerror(errno));
4895 freeClient(slave);
4896 return;
4897 }
4898 slave->repldboff += nwritten;
4899 if (slave->repldboff == slave->repldbsize) {
4900 close(slave->repldbfd);
4901 slave->repldbfd = -1;
4902 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4903 slave->replstate = REDIS_REPL_ONLINE;
4904 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4905 sendReplyToClient, slave, NULL) == AE_ERR) {
4906 freeClient(slave);
4907 return;
4908 }
4909 addReplySds(slave,sdsempty());
4910 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4911 }
4912 }
4913
4914 /* This function is called at the end of every backgrond saving.
4915 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4916 * otherwise REDIS_ERR is passed to the function.
4917 *
4918 * The goal of this function is to handle slaves waiting for a successful
4919 * background saving in order to perform non-blocking synchronization. */
4920 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4921 listNode *ln;
4922 int startbgsave = 0;
4923
4924 listRewind(server.slaves);
4925 while((ln = listYield(server.slaves))) {
4926 redisClient *slave = ln->value;
4927
4928 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4929 startbgsave = 1;
4930 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4931 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4932 struct redis_stat buf;
4933
4934 if (bgsaveerr != REDIS_OK) {
4935 freeClient(slave);
4936 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4937 continue;
4938 }
4939 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4940 redis_fstat(slave->repldbfd,&buf) == -1) {
4941 freeClient(slave);
4942 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4943 continue;
4944 }
4945 slave->repldboff = 0;
4946 slave->repldbsize = buf.st_size;
4947 slave->replstate = REDIS_REPL_SEND_BULK;
4948 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4949 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4950 freeClient(slave);
4951 continue;
4952 }
4953 }
4954 }
4955 if (startbgsave) {
4956 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4957 listRewind(server.slaves);
4958 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4959 while((ln = listYield(server.slaves))) {
4960 redisClient *slave = ln->value;
4961
4962 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4963 freeClient(slave);
4964 }
4965 }
4966 }
4967 }
4968
4969 static int syncWithMaster(void) {
4970 char buf[1024], tmpfile[256];
4971 int dumpsize;
4972 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4973 int dfd;
4974
4975 if (fd == -1) {
4976 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4977 strerror(errno));
4978 return REDIS_ERR;
4979 }
4980 /* Issue the SYNC command */
4981 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4982 close(fd);
4983 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4984 strerror(errno));
4985 return REDIS_ERR;
4986 }
4987 /* Read the bulk write count */
4988 if (syncReadLine(fd,buf,1024,3600) == -1) {
4989 close(fd);
4990 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4991 strerror(errno));
4992 return REDIS_ERR;
4993 }
4994 dumpsize = atoi(buf+1);
4995 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4996 /* Read the bulk write data on a temp file */
4997 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4998 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4999 if (dfd == -1) {
5000 close(fd);
5001 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5002 return REDIS_ERR;
5003 }
5004 while(dumpsize) {
5005 int nread, nwritten;
5006
5007 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5008 if (nread == -1) {
5009 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5010 strerror(errno));
5011 close(fd);
5012 close(dfd);
5013 return REDIS_ERR;
5014 }
5015 nwritten = write(dfd,buf,nread);
5016 if (nwritten == -1) {
5017 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5018 close(fd);
5019 close(dfd);
5020 return REDIS_ERR;
5021 }
5022 dumpsize -= nread;
5023 }
5024 close(dfd);
5025 if (rename(tmpfile,server.dbfilename) == -1) {
5026 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5027 unlink(tmpfile);
5028 close(fd);
5029 return REDIS_ERR;
5030 }
5031 emptyDb();
5032 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5033 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5034 close(fd);
5035 return REDIS_ERR;
5036 }
5037 server.master = createClient(fd);
5038 server.master->flags |= REDIS_MASTER;
5039 server.replstate = REDIS_REPL_CONNECTED;
5040 return REDIS_OK;
5041 }
5042
5043 static void slaveofCommand(redisClient *c) {
5044 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5045 !strcasecmp(c->argv[2]->ptr,"one")) {
5046 if (server.masterhost) {
5047 sdsfree(server.masterhost);
5048 server.masterhost = NULL;
5049 if (server.master) freeClient(server.master);
5050 server.replstate = REDIS_REPL_NONE;
5051 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5052 }
5053 } else {
5054 sdsfree(server.masterhost);
5055 server.masterhost = sdsdup(c->argv[1]->ptr);
5056 server.masterport = atoi(c->argv[2]->ptr);
5057 if (server.master) freeClient(server.master);
5058 server.replstate = REDIS_REPL_CONNECT;
5059 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5060 server.masterhost, server.masterport);
5061 }
5062 addReply(c,shared.ok);
5063 }
5064
5065 /* ============================ Maxmemory directive ======================== */
5066
5067 /* This function gets called when 'maxmemory' is set on the config file to limit
5068 * the max memory used by the server, and we are out of memory.
5069 * This function will try to, in order:
5070 *
5071 * - Free objects from the free list
5072 * - Try to remove keys with an EXPIRE set
5073 *
5074 * It is not possible to free enough memory to reach used-memory < maxmemory
5075 * the server will start refusing commands that will enlarge even more the
5076 * memory usage.
5077 */
5078 static void freeMemoryIfNeeded(void) {
5079 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5080 if (listLength(server.objfreelist)) {
5081 robj *o;
5082
5083 listNode *head = listFirst(server.objfreelist);
5084 o = listNodeValue(head);
5085 listDelNode(server.objfreelist,head);
5086 zfree(o);
5087 } else {
5088 int j, k, freed = 0;
5089
5090 for (j = 0; j < server.dbnum; j++) {
5091 int minttl = -1;
5092 robj *minkey = NULL;
5093 struct dictEntry *de;
5094
5095 if (dictSize(server.db[j].expires)) {
5096 freed = 1;
5097 /* From a sample of three keys drop the one nearest to
5098 * the natural expire */
5099 for (k = 0; k < 3; k++) {
5100 time_t t;
5101
5102 de = dictGetRandomKey(server.db[j].expires);
5103 t = (time_t) dictGetEntryVal(de);
5104 if (minttl == -1 || t < minttl) {
5105 minkey = dictGetEntryKey(de);
5106 minttl = t;
5107 }
5108 }
5109 deleteKey(server.db+j,minkey);
5110 }
5111 }
5112 if (!freed) return; /* nothing to free... */
5113 }
5114 }
5115 }
5116
5117 /* ================================= Debugging ============================== */
5118
5119 static void debugCommand(redisClient *c) {
5120 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5121 *((char*)-1) = 'x';
5122 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5123 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5124 robj *key, *val;
5125
5126 if (!de) {
5127 addReply(c,shared.nokeyerr);
5128 return;
5129 }
5130 key = dictGetEntryKey(de);
5131 val = dictGetEntryVal(de);
5132 addReplySds(c,sdscatprintf(sdsempty(),
5133 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5134 key, key->refcount, val, val->refcount, val->encoding));
5135 } else {
5136 addReplySds(c,sdsnew(
5137 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5138 }
5139 }
5140
5141 #ifdef HAVE_BACKTRACE
5142 static struct redisFunctionSym symsTable[] = {
5143 {"compareStringObjects", (unsigned long)compareStringObjects},
5144 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5145 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5146 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5147 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5148 {"freeStringObject", (unsigned long)freeStringObject},
5149 {"freeListObject", (unsigned long)freeListObject},
5150 {"freeSetObject", (unsigned long)freeSetObject},
5151 {"decrRefCount", (unsigned long)decrRefCount},
5152 {"createObject", (unsigned long)createObject},
5153 {"freeClient", (unsigned long)freeClient},
5154 {"rdbLoad", (unsigned long)rdbLoad},
5155 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5156 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5157 {"addReply", (unsigned long)addReply},
5158 {"addReplySds", (unsigned long)addReplySds},
5159 {"incrRefCount", (unsigned long)incrRefCount},
5160 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5161 {"createStringObject", (unsigned long)createStringObject},
5162 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5163 {"syncWithMaster", (unsigned long)syncWithMaster},
5164 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5165 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5166 {"getDecodedObject", (unsigned long)getDecodedObject},
5167 {"removeExpire", (unsigned long)removeExpire},
5168 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5169 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5170 {"deleteKey", (unsigned long)deleteKey},
5171 {"getExpire", (unsigned long)getExpire},
5172 {"setExpire", (unsigned long)setExpire},
5173 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5174 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5175 {"authCommand", (unsigned long)authCommand},
5176 {"pingCommand", (unsigned long)pingCommand},
5177 {"echoCommand", (unsigned long)echoCommand},
5178 {"setCommand", (unsigned long)setCommand},
5179 {"setnxCommand", (unsigned long)setnxCommand},
5180 {"getCommand", (unsigned long)getCommand},
5181 {"delCommand", (unsigned long)delCommand},
5182 {"existsCommand", (unsigned long)existsCommand},
5183 {"incrCommand", (unsigned long)incrCommand},
5184 {"decrCommand", (unsigned long)decrCommand},
5185 {"incrbyCommand", (unsigned long)incrbyCommand},
5186 {"decrbyCommand", (unsigned long)decrbyCommand},
5187 {"selectCommand", (unsigned long)selectCommand},
5188 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5189 {"keysCommand", (unsigned long)keysCommand},
5190 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5191 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5192 {"saveCommand", (unsigned long)saveCommand},
5193 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5194 {"shutdownCommand", (unsigned long)shutdownCommand},
5195 {"moveCommand", (unsigned long)moveCommand},
5196 {"renameCommand", (unsigned long)renameCommand},
5197 {"renamenxCommand", (unsigned long)renamenxCommand},
5198 {"lpushCommand", (unsigned long)lpushCommand},
5199 {"rpushCommand", (unsigned long)rpushCommand},
5200 {"lpopCommand", (unsigned long)lpopCommand},
5201 {"rpopCommand", (unsigned long)rpopCommand},
5202 {"llenCommand", (unsigned long)llenCommand},
5203 {"lindexCommand", (unsigned long)lindexCommand},
5204 {"lrangeCommand", (unsigned long)lrangeCommand},
5205 {"ltrimCommand", (unsigned long)ltrimCommand},
5206 {"typeCommand", (unsigned long)typeCommand},
5207 {"lsetCommand", (unsigned long)lsetCommand},
5208 {"saddCommand", (unsigned long)saddCommand},
5209 {"sremCommand", (unsigned long)sremCommand},
5210 {"smoveCommand", (unsigned long)smoveCommand},
5211 {"sismemberCommand", (unsigned long)sismemberCommand},
5212 {"scardCommand", (unsigned long)scardCommand},
5213 {"spopCommand", (unsigned long)spopCommand},
5214 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5215 {"sinterCommand", (unsigned long)sinterCommand},
5216 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5217 {"sunionCommand", (unsigned long)sunionCommand},
5218 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5219 {"sdiffCommand", (unsigned long)sdiffCommand},
5220 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5221 {"syncCommand", (unsigned long)syncCommand},
5222 {"flushdbCommand", (unsigned long)flushdbCommand},
5223 {"flushallCommand", (unsigned long)flushallCommand},
5224 {"sortCommand", (unsigned long)sortCommand},
5225 {"lremCommand", (unsigned long)lremCommand},
5226 {"infoCommand", (unsigned long)infoCommand},
5227 {"mgetCommand", (unsigned long)mgetCommand},
5228 {"monitorCommand", (unsigned long)monitorCommand},
5229 {"expireCommand", (unsigned long)expireCommand},
5230 {"getsetCommand", (unsigned long)getsetCommand},
5231 {"ttlCommand", (unsigned long)ttlCommand},
5232 {"slaveofCommand", (unsigned long)slaveofCommand},
5233 {"debugCommand", (unsigned long)debugCommand},
5234 {"processCommand", (unsigned long)processCommand},
5235 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5236 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5237 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5238 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5239 {"msetCommand", (unsigned long)msetCommand},
5240 {"msetnxCommand", (unsigned long)msetnxCommand},
5241 {"zslCreateNode", (unsigned long)zslCreateNode},
5242 {"zslCreate", (unsigned long)zslCreate},
5243 {"zslFreeNode",(unsigned long)zslFreeNode},
5244 {"zslFree",(unsigned long)zslFree},
5245 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5246 {"zslInsert",(unsigned long)zslInsert},
5247 {"zslDelete",(unsigned long)zslDelete},
5248 {"createZsetObject",(unsigned long)createZsetObject},
5249 {"zaddCommand",(unsigned long)zaddCommand},
5250 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5251 {"zrangeCommand",(unsigned long)zrangeCommand},
5252 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5253 {"zremCommand",(unsigned long)zremCommand},
5254 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5255 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5256 {NULL,0}
5257 };
5258
5259 /* This function try to convert a pointer into a function name. It's used in
5260 * oreder to provide a backtrace under segmentation fault that's able to
5261 * display functions declared as static (otherwise the backtrace is useless). */
5262 static char *findFuncName(void *pointer, unsigned long *offset){
5263 int i, ret = -1;
5264 unsigned long off, minoff = 0;
5265
5266 /* Try to match against the Symbol with the smallest offset */
5267 for (i=0; symsTable[i].pointer; i++) {
5268 unsigned long lp = (unsigned long) pointer;
5269
5270 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5271 off=lp-symsTable[i].pointer;
5272 if (ret < 0 || off < minoff) {
5273 minoff=off;
5274 ret=i;
5275 }
5276 }
5277 }
5278 if (ret == -1) return NULL;
5279 *offset = minoff;
5280 return symsTable[ret].name;
5281 }
5282
/* Return the instruction pointer saved in the signal context 'uc', or NULL
 * on platforms this function has no case for.
 *
 * The x86-64 Linux case is selected with the compiler-provided __x86_64__
 * macro (the upper-case spelling is kept only for compatibility) and reads
 * REG_RIP, since REG_EIP only exists in the 32 bit register set. It is
 * compiled in only when the C library headers expose REG_RIP; otherwise the
 * function falls back to returning NULL. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif (defined(__x86_64__) || defined(__X86_64__)) && defined(REG_RIP) /* Linux x86-64 */
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    (void) uc; /* unused on platforms without a known register layout */
    return NULL;
#endif
}
5304
5305 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5306 void *trace[100];
5307 char **messages = NULL;
5308 int i, trace_size = 0;
5309 unsigned long offset=0;
5310 time_t uptime = time(NULL)-server.stat_starttime;
5311 ucontext_t *uc = (ucontext_t*) secret;
5312 REDIS_NOTUSED(info);
5313
5314 redisLog(REDIS_WARNING,
5315 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5316 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5317 "redis_version:%s; "
5318 "uptime_in_seconds:%d; "
5319 "connected_clients:%d; "
5320 "connected_slaves:%d; "
5321 "used_memory:%zu; "
5322 "changes_since_last_save:%lld; "
5323 "bgsave_in_progress:%d; "
5324 "last_save_time:%d; "
5325 "total_connections_received:%lld; "
5326 "total_commands_processed:%lld; "
5327 "role:%s;"
5328 ,REDIS_VERSION,
5329 uptime,
5330 listLength(server.clients)-listLength(server.slaves),
5331 listLength(server.slaves),
5332 server.usedmemory,
5333 server.dirty,
5334 server.bgsaveinprogress,
5335 server.lastsave,
5336 server.stat_numconnections,
5337 server.stat_numcommands,
5338 server.masterhost == NULL ? "master" : "slave"
5339 ));
5340
5341 trace_size = backtrace(trace, 100);
5342 /* overwrite sigaction with caller's address */
5343 if (getMcontextEip(uc) != NULL) {
5344 trace[1] = getMcontextEip(uc);
5345 }
5346 messages = backtrace_symbols(trace, trace_size);
5347
5348 for (i=1; i<trace_size; ++i) {
5349 char *fn = findFuncName(trace[i], &offset), *p;
5350
5351 p = strchr(messages[i],'+');
5352 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5353 redisLog(REDIS_WARNING,"%s", messages[i]);
5354 } else {
5355 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5356 }
5357 }
5358 free(messages);
5359 exit(0);
5360 }
5361
5362 static void setupSigSegvAction(void) {
5363 struct sigaction act;
5364
5365 sigemptyset (&act.sa_mask);
5366 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5367 * is used. Otherwise, sa_handler is used */
5368 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5369 act.sa_sigaction = segvHandler;
5370 sigaction (SIGSEGV, &act, NULL);
5371 sigaction (SIGBUS, &act, NULL);
5372 sigaction (SIGFPE, &act, NULL);
5373 sigaction (SIGILL, &act, NULL);
5374 sigaction (SIGBUS, &act, NULL);
5375 return;
5376 }
5377 #else /* HAVE_BACKTRACE */
/* Variant used when HAVE_BACKTRACE is not defined: no signal handler is
 * installed. */
static void setupSigSegvAction(void) {
    /* Intentionally empty. */
}
5380 #endif /* HAVE_BACKTRACE */
5381
5382 /* =================================== Main! ================================ */
5383
5384 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its content converted to
 * an integer with atoi(). Returns -1 when the file cannot be opened or no
 * line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return value;
    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
    fclose(procfile);

    return value;
}
5398
5399 void linuxOvercommitMemoryWarning(void) {
5400 if (linuxOvercommitMemoryValue() == 0) {
5401 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5402 }
5403 }
5404 #endif /* __linux__ */
5405
5406 static void daemonize(void) {
5407 int fd;
5408 FILE *fp;
5409
5410 if (fork() != 0) exit(0); /* parent exits */
5411 setsid(); /* create a new session */
5412
5413 /* Every output goes to /dev/null. If Redis is daemonized but
5414 * the 'logfile' is set to 'stdout' in the configuration file
5415 * it will not log at all. */
5416 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5417 dup2(fd, STDIN_FILENO);
5418 dup2(fd, STDOUT_FILENO);
5419 dup2(fd, STDERR_FILENO);
5420 if (fd > STDERR_FILENO) close(fd);
5421 }
5422 /* Try to write the pid file */
5423 fp = fopen(server.pidfile,"w");
5424 if (fp) {
5425 fprintf(fp,"%d\n",getpid());
5426 fclose(fp);
5427 }
5428 }
5429
5430 int main(int argc, char **argv) {
5431 initServerConfig();
5432 if (argc == 2) {
5433 ResetServerSaveParams();
5434 loadServerConfig(argv[1]);
5435 } else if (argc > 2) {
5436 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5437 exit(1);
5438 } else {
5439 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5440 }
5441 initServer();
5442 if (server.daemonize) daemonize();
5443 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5444 #ifdef __linux__
5445 linuxOvercommitMemoryWarning();
5446 #endif
5447 if (rdbLoad(server.dbfilename) == REDIS_OK)
5448 redisLog(REDIS_NOTICE,"DB loaded from disk");
5449 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5450 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5451 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5452 aeMain(server.el);
5453 aeDeleteEventLoop(server.el);
5454 return 0;
5455 }