]> git.saurik.com Git - redis.git/blob - redis.c
19abda2067bf608efcfb4234d17105fe92dbf428
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #if defined(__sun)
63 #include "solarisfixes.h"
64 #endif
65
66 #include "redis.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75
/* Error codes: generic success / failure status values (for example,
 * serverCron() compares the result of syncWithMaster() against REDIS_OK). */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
/* Upper bound on the random expire lookups done per database on each
 * serverCron() run. */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* Hash table parameters */
/* htNeedsResize() reports a table as too sparse when used*100/size drops
 * below this percentage. */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
96
/* Command flags (combined with '|' in the 'flags' field of cmdTable) */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types (stored in the 'type' field of a redis object) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding (stored in the 'encoding' field of a redis object) */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
121
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys would waste a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specifies the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When the length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
148
/* Client flags (bit mask stored in the 'flags' field of a client) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of the master.
 * Note that in the SEND_BULK and ONLINE states the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE states instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits for the bgsave to finish so the bulk DB transmission can start */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
168
/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() drops messages whose level is below
 * server.verbosity and uses the level as an index into the ".-*" marker
 * string, so DEBUG lines are marked '.', NOTICE '-' and WARNING '*'. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: evaluates V as a void expression so that an otherwise
 * unused parameter does not trigger a compiler warning. */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
192
193 /*================================= Data types ============================== */
194
/* A redis object: a reference-counted, typed value able to hold a string,
 * list, set, sorted set or hash (see the REDIS_STRING ... REDIS_HASH
 * constants) together with the encoding used for its payload. */
typedef struct redisObject {
    void *ptr;                /* payload; the dict callbacks in this file treat it as an sds for RAW-encoded objects */
    unsigned char type;       /* one of the "Object types" constants */
    unsigned char encoding;   /* one of the REDIS_ENCODING_* constants */
    unsigned char notused[2]; /* presumably padding — TODO confirm */
    int refcount;             /* managed through incrRefCount() / decrRefCount() */
} robj;
203
/* One logical database. The server holds an array of server.dbnum of these
 * in server.db. */
typedef struct redisDb {
    dict *dict;    /* main key space of this database */
    dict *expires; /* keys with a timeout; serverCron() reads each entry's value as a time_t */
    int id;        /* presumably the index of this database — TODO confirm */
} redisDb;
209
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout (see closeTimedoutClients) */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
232
/* One automatic-save rule. serverCron() starts a background save when
 * server.dirty >= changes and more than 'seconds' have elapsed since
 * server.lastsave. */
struct saveparam {
    time_t seconds;
    int changes;
};
237
/* Global server state structure (a single instance, 'server', is defined
 * in the globals section of this file). */
struct redisServer {
    int port;
    int fd;
    redisDb *db; /* array of 'dbnum' databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function run */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of last successful save */
    size_t usedmemory; /* Used memory in bytes, refreshed by serverCron() from zmalloc_used_memory() */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* messages with a level below this are not logged */
    int glueoutputbuf;
    int maxidletime; /* idle seconds after which a client is closed; 0 disables the check in serverCron() */
    int dbnum;
    int daemonize;
    int appendonly;
    int appendfd;
    int appendseldb;
    char *pidfile;
    int bgsaveinprogress;
    pid_t bgsavechildpid;
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile; /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
292
/* Signature shared by every command implementation: it receives the client
 * that issued the command. */
typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (see cmdTable). */
struct redisCommand {
    char *name;             /* command name as listed in cmdTable */
    redisCommandProc *proc; /* implementation */
    int arity;              /* argument count; NOTE(review): negative values presumably mean "at least -arity" — confirm where the table is consulted */
    int flags;              /* mask of REDIS_CMD_* constants */
};
300
/* Pairs a function name with an address stored as an integer.
 * NOTE(review): presumably used to resolve symbols for crash reports; its
 * users are not visible in this part of the file — confirm. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
305
/* An element being sorted: the object itself plus the value to compare it
 * by, either a numeric score or another object. Which union member is valid
 * is decided by code outside this view. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;
313
/* A sort operation and the pattern it applies to. 'type' presumably holds
 * one of the REDIS_SORT_* constants — TODO confirm against sortCommand. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
318
/* ZSETs use a specialized version of Skiplists */

/* A skiplist node: an object with its score, an array of forward pointers
 * and a single backward pointer. */
typedef struct zskiplistNode {
    struct zskiplistNode **forward;
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;
327
/* A skiplist: header and tail nodes, element count and a level counter
 * (bounded by ZSKIPLIST_MAXLEVEL, presumably — TODO confirm in zslInsert). */
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;
333
/* A sorted set: a hash table paired with a skiplist. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
338
/* Our shared "common" objects: string objects created once by
 * createSharedObjects() (for example crlf is "\r\n" and ok is "+OK\r\n"). */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
349
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
355
356 /*================================ Prototypes =============================== */
357
/* Forward declarations of file-local helpers (object lifetime, persistence,
 * replication, expiry, networking and skiplists) that are used before they
 * are defined. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(const robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
392
/* Forward declarations of the command implementations referenced by
 * cmdTable. All of them share the redisCommandProc signature. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
463
464 /*================================= Globals ================================= */
465
/* Global vars */
static struct redisServer server; /* server global state */

/* Command table. Every entry is {name, implementation, arity, flags}, where
 * flags is a mask of the REDIS_CMD_* constants; the table is terminated by
 * an all-NULL/zero sentinel entry.
 *
 * NOTE(review): a negative arity presumably means "at least -arity
 * arguments" — confirm where the table is consulted.
 * NOTE(review): "smembers" is served by sinterCommand with a fixed arity of
 * 2, which looks intentional (intersection of a single set) — confirm.
 * NOTE(review): "zscore" carries REDIS_CMD_DENYOOM although its name
 * suggests a read-only lookup — confirm whether that flag is intended. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zrem",zremCommand,3,REDIS_CMD_BULK},
    {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
    {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
    {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
542 /*============================ Utility functions ============================ */
543
/* Case-fold a single pattern/string byte for stringmatchlen().
 * tolower() is only defined for values representable as unsigned char (or
 * EOF), so negative values of a signed plain char are returned untouched. */
static int stringmatchFoldCase(int ch) {
    return ch < 0 ? ch : tolower(ch);
}

/* Glob-style pattern matching on length-delimited buffers.
 *
 * Supported syntax:
 *   *        any sequence of bytes (including none)
 *   ?        exactly one byte
 *   [abc]    one byte out of a set; [^abc] negates, a-z denotes a range and
 *            a backslash escapes the following byte
 *   \x       the byte x taken literally
 *
 * pattern / patternLen: the pattern and its length in bytes.
 * string / stringLen:   the subject and its length in bytes.
 * nocase:               non-zero for case-insensitive comparison.
 *
 * Returns 1 on match, 0 otherwise. Neither buffer has to be NUL terminated:
 * no byte at or beyond pattern[patternLen] or string[stringLen] is read. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern at every remaining position. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
            break;
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int negate, match;

            /* A class always consumes one byte, so it cannot match an
             * exhausted string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            negate = patternLen && pattern[0] == '^';
            if (negate) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the shared
                     * advance after the switch lands on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    /* Escaped byte; a trailing lone backslash is handled
                     * below as a literal instead of being skipped. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = stringmatchFoldCase(start);
                        end = stringmatchFoldCase(end);
                        c = stringmatchFoldCase(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (stringmatchFoldCase(pattern[0]) ==
                            stringmatchFoldCase(string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (negate)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal byte cannot match an exhausted string. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (stringmatchFoldCase(pattern[0]) !=
                    stringmatchFoldCase(string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Only trailing stars may remain for the match to succeed. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
666
667 static void redisLog(int level, const char *fmt, ...) {
668 va_list ap;
669 FILE *fp;
670
671 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
672 if (!fp) return;
673
674 va_start(ap, fmt);
675 if (level >= server.verbosity) {
676 char *c = ".-*";
677 char buf[64];
678 time_t now;
679
680 now = time(NULL);
681 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
682 fprintf(fp,"%s %c ",buf,c[level]);
683 vfprintf(fp, fmt, ap);
684 fprintf(fp,"\n");
685 fflush(fp);
686 }
687 va_end(ap);
688
689 if (server.logfile) fclose(fp);
690 }
691
692 /*====================== Hash table type implementation ==================== */
693
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
697
/* Dict value destructor that releases 'val' with zfree().
 * 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
703
704 static int sdsDictKeyCompare(void *privdata, const void *key1,
705 const void *key2)
706 {
707 int l1,l2;
708 DICT_NOTUSED(privdata);
709
710 l1 = sdslen((sds)key1);
711 l2 = sdslen((sds)key2);
712 if (l1 != l2) return 0;
713 return memcmp(key1, key2, l1) == 0;
714 }
715
/* Dict destructor for redis objects: drops one reference from 'val' through
 * decrRefCount(). 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
722
723 static int dictObjKeyCompare(void *privdata, const void *key1,
724 const void *key2)
725 {
726 const robj *o1 = key1, *o2 = key2;
727 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
728 }
729
730 static unsigned int dictObjHash(const void *key) {
731 const robj *o = key;
732 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
733 }
734
735 static int dictEncObjKeyCompare(void *privdata, const void *key1,
736 const void *key2)
737 {
738 const robj *o1 = key1, *o2 = key2;
739
740 if (o1->encoding == REDIS_ENCODING_RAW &&
741 o2->encoding == REDIS_ENCODING_RAW)
742 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
743 else {
744 robj *dec1, *dec2;
745 int cmp;
746
747 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
748 getDecodedObject(o1) : (robj*)o1;
749 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
750 getDecodedObject(o2) : (robj*)o2;
751 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
752 if (dec1 != o1) decrRefCount(dec1);
753 if (dec2 != o2) decrRefCount(dec2);
754 return cmp;
755 }
756 }
757
758 static unsigned int dictEncObjHash(const void *key) {
759 const robj *o = key;
760
761 if (o->encoding == REDIS_ENCODING_RAW)
762 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
763 else {
764 robj *dec = getDecodedObject(o);
765 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
766 decrRefCount(dec);
767 return hash;
768 }
769 }
770
/* Dict type named for set objects: keys are redis objects hashed and
 * compared through the encoding-aware callbacks and released with
 * decrRefCount(); values get no destructor. */
static dictType setDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};
779
/* Dict type named for sorted set objects: same key handling as setDictType,
 * but values are released with zfree() via dictVanillaFree (presumably
 * heap-allocated scores — TODO confirm in the zset commands). */
static dictType zsetDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree /* val destructor */
};
788
/* Dict type whose keys and values are both redis objects: keys are hashed
 * and compared by their sds payload (no decoding step), and both keys and
 * values are released with decrRefCount(). */
static dictType hashDictType = {
    dictObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor /* val destructor */
};
797
798 /* ========================= Random utility functions ======================= */
799
/* Out-of-memory handler.
 *
 * Redis does not attempt to recover when an object or string allocation
 * fails: the networking layer needs heap memory for its send buffers too, so
 * the error could not reliably be reported to the client anyway. Instead the
 * failing context 'msg' is printed to stderr and, after a one second pause,
 * the process aborts. This function does not return. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
811
812 /* ====================== Redis server networking stuff ===================== */
813 static void closeTimedoutClients(void) {
814 redisClient *c;
815 listNode *ln;
816 time_t now = time(NULL);
817
818 listRewind(server.clients);
819 while ((ln = listYield(server.clients)) != NULL) {
820 c = listNodeValue(ln);
821 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
822 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
823 (now - c->lastinteraction > server.maxidletime)) {
824 redisLog(REDIS_DEBUG,"Closing idle client");
825 freeClient(c);
826 }
827 }
828 }
829
830 static int htNeedsResize(dict *dict) {
831 long long size, used;
832
833 size = dictSlots(dict);
834 used = dictSize(dict);
835 return (size && used && size > DICT_HT_INITIAL_SIZE &&
836 (used*100/size < REDIS_HT_MINFILL));
837 }
838
839 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
840 * we resize the hash table to save memory */
841 static void tryResizeHashTables(void) {
842 int j;
843
844 for (j = 0; j < server.dbnum; j++) {
845 if (htNeedsResize(server.db[j].dict)) {
846 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
847 dictResize(server.db[j].dict);
848 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
849 }
850 if (htNeedsResize(server.db[j].expires))
851 dictResize(server.db[j].expires);
852 }
853 }
854
855 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
856 int j, loops = server.cronloops++;
857 REDIS_NOTUSED(eventLoop);
858 REDIS_NOTUSED(id);
859 REDIS_NOTUSED(clientData);
860
861 /* Update the global state with the amount of used memory */
862 server.usedmemory = zmalloc_used_memory();
863
864 /* Show some info about non-empty databases */
865 for (j = 0; j < server.dbnum; j++) {
866 long long size, used, vkeys;
867
868 size = dictSlots(server.db[j].dict);
869 used = dictSize(server.db[j].dict);
870 vkeys = dictSize(server.db[j].expires);
871 if (!(loops % 5) && (used || vkeys)) {
872 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
873 /* dictPrintStats(server.dict); */
874 }
875 }
876
877 /* We don't want to resize the hash tables while a bacground saving
878 * is in progress: the saving child is created using fork() that is
879 * implemented with a copy-on-write semantic in most modern systems, so
880 * if we resize the HT while there is the saving child at work actually
881 * a lot of memory movements in the parent will cause a lot of pages
882 * copied. */
883 if (!server.bgsaveinprogress) tryResizeHashTables();
884
885 /* Show information about connected clients */
886 if (!(loops % 5)) {
887 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
888 listLength(server.clients)-listLength(server.slaves),
889 listLength(server.slaves),
890 server.usedmemory,
891 dictSize(server.sharingpool));
892 }
893
894 /* Close connections of timedout clients */
895 if (server.maxidletime && !(loops % 10))
896 closeTimedoutClients();
897
898 /* Check if a background saving in progress terminated */
899 if (server.bgsaveinprogress) {
900 int statloc;
901 if (wait4(-1,&statloc,WNOHANG,NULL)) {
902 int exitcode = WEXITSTATUS(statloc);
903 int bysignal = WIFSIGNALED(statloc);
904
905 if (!bysignal && exitcode == 0) {
906 redisLog(REDIS_NOTICE,
907 "Background saving terminated with success");
908 server.dirty = 0;
909 server.lastsave = time(NULL);
910 } else if (!bysignal && exitcode != 0) {
911 redisLog(REDIS_WARNING, "Background saving error");
912 } else {
913 redisLog(REDIS_WARNING,
914 "Background saving terminated by signal");
915 rdbRemoveTempFile(server.bgsavechildpid);
916 }
917 server.bgsaveinprogress = 0;
918 server.bgsavechildpid = -1;
919 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
920 }
921 } else {
922 /* If there is not a background saving in progress check if
923 * we have to save now */
924 time_t now = time(NULL);
925 for (j = 0; j < server.saveparamslen; j++) {
926 struct saveparam *sp = server.saveparams+j;
927
928 if (server.dirty >= sp->changes &&
929 now-server.lastsave > sp->seconds) {
930 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
931 sp->changes, sp->seconds);
932 rdbSaveBackground(server.dbfilename);
933 break;
934 }
935 }
936 }
937
938 /* Try to expire a few timed out keys */
939 for (j = 0; j < server.dbnum; j++) {
940 redisDb *db = server.db+j;
941 int num = dictSize(db->expires);
942
943 if (num) {
944 time_t now = time(NULL);
945
946 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
947 num = REDIS_EXPIRELOOKUPS_PER_CRON;
948 while (num--) {
949 dictEntry *de;
950 time_t t;
951
952 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
953 t = (time_t) dictGetEntryVal(de);
954 if (now > t) {
955 deleteKey(db,dictGetEntryKey(de));
956 }
957 }
958 }
959 }
960
961 /* Check if we should connect to a MASTER */
962 if (server.replstate == REDIS_REPL_CONNECT) {
963 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
964 if (syncWithMaster() == REDIS_OK) {
965 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
966 }
967 }
968 return 1000;
969 }
970
971 static void createSharedObjects(void) {
972 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
973 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
974 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
975 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
976 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
977 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
978 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
979 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
980 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
981 /* no such key */
982 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
983 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
984 "-ERR Operation against a key holding the wrong kind of value\r\n"));
985 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
986 "-ERR no such key\r\n"));
987 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
988 "-ERR syntax error\r\n"));
989 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
990 "-ERR source and destination objects are the same\r\n"));
991 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
992 "-ERR index out of range\r\n"));
993 shared.space = createObject(REDIS_STRING,sdsnew(" "));
994 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
995 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
996 shared.select0 = createStringObject("select 0\r\n",10);
997 shared.select1 = createStringObject("select 1\r\n",10);
998 shared.select2 = createStringObject("select 2\r\n",10);
999 shared.select3 = createStringObject("select 3\r\n",10);
1000 shared.select4 = createStringObject("select 4\r\n",10);
1001 shared.select5 = createStringObject("select 5\r\n",10);
1002 shared.select6 = createStringObject("select 6\r\n",10);
1003 shared.select7 = createStringObject("select 7\r\n",10);
1004 shared.select8 = createStringObject("select 8\r\n",10);
1005 shared.select9 = createStringObject("select 9\r\n",10);
1006 }
1007
1008 static void appendServerSaveParams(time_t seconds, int changes) {
1009 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1010 server.saveparams[server.saveparamslen].seconds = seconds;
1011 server.saveparams[server.saveparamslen].changes = changes;
1012 server.saveparamslen++;
1013 }
1014
1015 static void ResetServerSaveParams() {
1016 zfree(server.saveparams);
1017 server.saveparams = NULL;
1018 server.saveparamslen = 0;
1019 }
1020
1021 static void initServerConfig() {
1022 server.dbnum = REDIS_DEFAULT_DBNUM;
1023 server.port = REDIS_SERVERPORT;
1024 server.verbosity = REDIS_DEBUG;
1025 server.maxidletime = REDIS_MAXIDLETIME;
1026 server.saveparams = NULL;
1027 server.logfile = NULL; /* NULL = log on standard output */
1028 server.bindaddr = NULL;
1029 server.glueoutputbuf = 1;
1030 server.daemonize = 0;
1031 server.appendonly = 0;
1032 server.appendfd = -1;
1033 server.appendseldb = -1; /* Make sure the first time will not match */
1034 server.pidfile = "/var/run/redis.pid";
1035 server.dbfilename = "dump.rdb";
1036 server.appendfilename = "appendonly.log";
1037 server.requirepass = NULL;
1038 server.shareobjects = 0;
1039 server.sharingpoolsize = 1024;
1040 server.maxclients = 0;
1041 server.maxmemory = 0;
1042 ResetServerSaveParams();
1043
1044 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1045 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1046 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1047 /* Replication related */
1048 server.isslave = 0;
1049 server.masterhost = NULL;
1050 server.masterport = 6379;
1051 server.master = NULL;
1052 server.replstate = REDIS_REPL_NONE;
1053
1054 /* Double constants initialization */
1055 R_Zero = 0.0;
1056 R_PosInf = 1.0/R_Zero;
1057 R_NegInf = -1.0/R_Zero;
1058 R_Nan = R_Zero/R_Zero;
1059 }
1060
1061 static void initServer() {
1062 int j;
1063
1064 signal(SIGHUP, SIG_IGN);
1065 signal(SIGPIPE, SIG_IGN);
1066 setupSigSegvAction();
1067
1068 server.clients = listCreate();
1069 server.slaves = listCreate();
1070 server.monitors = listCreate();
1071 server.objfreelist = listCreate();
1072 createSharedObjects();
1073 server.el = aeCreateEventLoop();
1074 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1075 server.sharingpool = dictCreate(&setDictType,NULL);
1076 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1077 if (server.fd == -1) {
1078 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1079 exit(1);
1080 }
1081 for (j = 0; j < server.dbnum; j++) {
1082 server.db[j].dict = dictCreate(&hashDictType,NULL);
1083 server.db[j].expires = dictCreate(&setDictType,NULL);
1084 server.db[j].id = j;
1085 }
1086 server.cronloops = 0;
1087 server.bgsaveinprogress = 0;
1088 server.bgsavechildpid = -1;
1089 server.lastsave = time(NULL);
1090 server.dirty = 0;
1091 server.usedmemory = 0;
1092 server.stat_numcommands = 0;
1093 server.stat_numconnections = 0;
1094 server.stat_starttime = time(NULL);
1095 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1096
1097 if (server.appendonly) {
1098 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT);
1099 if (server.appendfd == -1) {
1100 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1101 strerror(errno));
1102 exit(1);
1103 }
1104 }
1105 }
1106
1107 /* Empty the whole database */
1108 static long long emptyDb() {
1109 int j;
1110 long long removed = 0;
1111
1112 for (j = 0; j < server.dbnum; j++) {
1113 removed += dictSize(server.db[j].dict);
1114 dictEmpty(server.db[j].dict);
1115 dictEmpty(server.db[j].expires);
1116 }
1117 return removed;
1118 }
1119
/* Convert a case-insensitive "yes"/"no" string into 1/0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    return (strcasecmp(s,"no") == 0) ? 0 : -1;
}
1125
1126 /* I agree, this is a very rudimental way to load a configuration...
1127 will improve later if the config gets more complex */
1128 static void loadServerConfig(char *filename) {
1129 FILE *fp;
1130 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1131 int linenum = 0;
1132 sds line = NULL;
1133
1134 if (filename[0] == '-' && filename[1] == '\0')
1135 fp = stdin;
1136 else {
1137 if ((fp = fopen(filename,"r")) == NULL) {
1138 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1139 exit(1);
1140 }
1141 }
1142
1143 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1144 sds *argv;
1145 int argc, j;
1146
1147 linenum++;
1148 line = sdsnew(buf);
1149 line = sdstrim(line," \t\r\n");
1150
1151 /* Skip comments and blank lines*/
1152 if (line[0] == '#' || line[0] == '\0') {
1153 sdsfree(line);
1154 continue;
1155 }
1156
1157 /* Split into arguments */
1158 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1159 sdstolower(argv[0]);
1160
1161 /* Execute config directives */
1162 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1163 server.maxidletime = atoi(argv[1]);
1164 if (server.maxidletime < 0) {
1165 err = "Invalid timeout value"; goto loaderr;
1166 }
1167 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1168 server.port = atoi(argv[1]);
1169 if (server.port < 1 || server.port > 65535) {
1170 err = "Invalid port"; goto loaderr;
1171 }
1172 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1173 server.bindaddr = zstrdup(argv[1]);
1174 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1175 int seconds = atoi(argv[1]);
1176 int changes = atoi(argv[2]);
1177 if (seconds < 1 || changes < 0) {
1178 err = "Invalid save parameters"; goto loaderr;
1179 }
1180 appendServerSaveParams(seconds,changes);
1181 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1182 if (chdir(argv[1]) == -1) {
1183 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1184 argv[1], strerror(errno));
1185 exit(1);
1186 }
1187 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1188 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1189 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1190 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1191 else {
1192 err = "Invalid log level. Must be one of debug, notice, warning";
1193 goto loaderr;
1194 }
1195 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1196 FILE *logfp;
1197
1198 server.logfile = zstrdup(argv[1]);
1199 if (!strcasecmp(server.logfile,"stdout")) {
1200 zfree(server.logfile);
1201 server.logfile = NULL;
1202 }
1203 if (server.logfile) {
1204 /* Test if we are able to open the file. The server will not
1205 * be able to abort just for this problem later... */
1206 logfp = fopen(server.logfile,"a");
1207 if (logfp == NULL) {
1208 err = sdscatprintf(sdsempty(),
1209 "Can't open the log file: %s", strerror(errno));
1210 goto loaderr;
1211 }
1212 fclose(logfp);
1213 }
1214 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1215 server.dbnum = atoi(argv[1]);
1216 if (server.dbnum < 1) {
1217 err = "Invalid number of databases"; goto loaderr;
1218 }
1219 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1220 server.maxclients = atoi(argv[1]);
1221 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1222 server.maxmemory = strtoll(argv[1], NULL, 10);
1223 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1224 server.masterhost = sdsnew(argv[1]);
1225 server.masterport = atoi(argv[2]);
1226 server.replstate = REDIS_REPL_CONNECT;
1227 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1228 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1229 err = "argument must be 'yes' or 'no'"; goto loaderr;
1230 }
1231 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1232 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1233 err = "argument must be 'yes' or 'no'"; goto loaderr;
1234 }
1235 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1236 server.sharingpoolsize = atoi(argv[1]);
1237 if (server.sharingpoolsize < 1) {
1238 err = "invalid object sharing pool size"; goto loaderr;
1239 }
1240 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1241 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1242 err = "argument must be 'yes' or 'no'"; goto loaderr;
1243 }
1244 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1245 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1246 err = "argument must be 'yes' or 'no'"; goto loaderr;
1247 }
1248 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1249 server.requirepass = zstrdup(argv[1]);
1250 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1251 server.pidfile = zstrdup(argv[1]);
1252 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1253 server.dbfilename = zstrdup(argv[1]);
1254 } else {
1255 err = "Bad directive or wrong number of arguments"; goto loaderr;
1256 }
1257 for (j = 0; j < argc; j++)
1258 sdsfree(argv[j]);
1259 zfree(argv);
1260 sdsfree(line);
1261 }
1262 if (fp != stdin) fclose(fp);
1263 return;
1264
1265 loaderr:
1266 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1267 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1268 fprintf(stderr, ">>> '%s'\n", line);
1269 fprintf(stderr, "%s\n", err);
1270 exit(1);
1271 }
1272
1273 static void freeClientArgv(redisClient *c) {
1274 int j;
1275
1276 for (j = 0; j < c->argc; j++)
1277 decrRefCount(c->argv[j]);
1278 for (j = 0; j < c->mbargc; j++)
1279 decrRefCount(c->mbargv[j]);
1280 c->argc = 0;
1281 c->mbargc = 0;
1282 }
1283
1284 static void freeClient(redisClient *c) {
1285 listNode *ln;
1286
1287 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1288 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1289 sdsfree(c->querybuf);
1290 listRelease(c->reply);
1291 freeClientArgv(c);
1292 close(c->fd);
1293 ln = listSearchKey(server.clients,c);
1294 assert(ln != NULL);
1295 listDelNode(server.clients,ln);
1296 if (c->flags & REDIS_SLAVE) {
1297 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1298 close(c->repldbfd);
1299 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1300 ln = listSearchKey(l,c);
1301 assert(ln != NULL);
1302 listDelNode(l,ln);
1303 }
1304 if (c->flags & REDIS_MASTER) {
1305 server.master = NULL;
1306 server.replstate = REDIS_REPL_CONNECT;
1307 }
1308 zfree(c->argv);
1309 zfree(c->mbargv);
1310 zfree(c);
1311 }
1312
1313 static void glueReplyBuffersIfNeeded(redisClient *c) {
1314 int totlen = 0;
1315 listNode *ln;
1316 robj *o;
1317
1318 listRewind(c->reply);
1319 while((ln = listYield(c->reply))) {
1320 o = ln->value;
1321 totlen += sdslen(o->ptr);
1322 /* This optimization makes more sense if we don't have to copy
1323 * too much data */
1324 if (totlen > 1024) return;
1325 }
1326 if (totlen > 0) {
1327 char buf[1024];
1328 int copylen = 0;
1329
1330 listRewind(c->reply);
1331 while((ln = listYield(c->reply))) {
1332 o = ln->value;
1333 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1334 copylen += sdslen(o->ptr);
1335 listDelNode(c->reply,ln);
1336 }
1337 /* Now the output buffer is empty, add the new single element */
1338 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1339 listAddNodeTail(c->reply,o);
1340 }
1341 }
1342
1343 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1344 redisClient *c = privdata;
1345 int nwritten = 0, totwritten = 0, objlen;
1346 robj *o;
1347 REDIS_NOTUSED(el);
1348 REDIS_NOTUSED(mask);
1349
1350 if (server.glueoutputbuf && listLength(c->reply) > 1)
1351 glueReplyBuffersIfNeeded(c);
1352 while(listLength(c->reply)) {
1353 o = listNodeValue(listFirst(c->reply));
1354 objlen = sdslen(o->ptr);
1355
1356 if (objlen == 0) {
1357 listDelNode(c->reply,listFirst(c->reply));
1358 continue;
1359 }
1360
1361 if (c->flags & REDIS_MASTER) {
1362 /* Don't reply to a master */
1363 nwritten = objlen - c->sentlen;
1364 } else {
1365 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1366 if (nwritten <= 0) break;
1367 }
1368 c->sentlen += nwritten;
1369 totwritten += nwritten;
1370 /* If we fully sent the object on head go to the next one */
1371 if (c->sentlen == objlen) {
1372 listDelNode(c->reply,listFirst(c->reply));
1373 c->sentlen = 0;
1374 }
1375 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1376 * bytes, in a single threaded server it's a good idea to server
1377 * other clients as well, even if a very large request comes from
1378 * super fast link that is always able to accept data (in real world
1379 * terms think to 'KEYS *' against the loopback interfae) */
1380 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1381 }
1382 if (nwritten == -1) {
1383 if (errno == EAGAIN) {
1384 nwritten = 0;
1385 } else {
1386 redisLog(REDIS_DEBUG,
1387 "Error writing to client: %s", strerror(errno));
1388 freeClient(c);
1389 return;
1390 }
1391 }
1392 if (totwritten > 0) c->lastinteraction = time(NULL);
1393 if (listLength(c->reply) == 0) {
1394 c->sentlen = 0;
1395 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1396 }
1397 }
1398
1399 static struct redisCommand *lookupCommand(char *name) {
1400 int j = 0;
1401 while(cmdTable[j].name != NULL) {
1402 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1403 j++;
1404 }
1405 return NULL;
1406 }
1407
1408 /* resetClient prepare the client to process the next command */
1409 static void resetClient(redisClient *c) {
1410 freeClientArgv(c);
1411 c->bulklen = -1;
1412 c->multibulk = 0;
1413 }
1414
1415 /* If this function gets called we already read a whole
1416 * command, argments are in the client argv/argc fields.
1417 * processCommand() execute the command or prepare the
1418 * server for a bulk read from the client.
1419 *
1420 * If 1 is returned the client is still alive and valid and
1421 * and other operations can be performed by the caller. Otherwise
1422 * if 0 is returned the client was destroied (i.e. after QUIT). */
1423 static int processCommand(redisClient *c) {
1424 struct redisCommand *cmd;
1425 long long dirty;
1426
1427 /* Free some memory if needed (maxmemory setting) */
1428 if (server.maxmemory) freeMemoryIfNeeded();
1429
1430 /* Handle the multi bulk command type. This is an alternative protocol
1431 * supported by Redis in order to receive commands that are composed of
1432 * multiple binary-safe "bulk" arguments. The latency of processing is
1433 * a bit higher but this allows things like multi-sets, so if this
1434 * protocol is used only for MSET and similar commands this is a big win. */
1435 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1436 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1437 if (c->multibulk <= 0) {
1438 resetClient(c);
1439 return 1;
1440 } else {
1441 decrRefCount(c->argv[c->argc-1]);
1442 c->argc--;
1443 return 1;
1444 }
1445 } else if (c->multibulk) {
1446 if (c->bulklen == -1) {
1447 if (((char*)c->argv[0]->ptr)[0] != '$') {
1448 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1449 resetClient(c);
1450 return 1;
1451 } else {
1452 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1453 decrRefCount(c->argv[0]);
1454 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1455 c->argc--;
1456 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1457 resetClient(c);
1458 return 1;
1459 }
1460 c->argc--;
1461 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1462 return 1;
1463 }
1464 } else {
1465 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1466 c->mbargv[c->mbargc] = c->argv[0];
1467 c->mbargc++;
1468 c->argc--;
1469 c->multibulk--;
1470 if (c->multibulk == 0) {
1471 robj **auxargv;
1472 int auxargc;
1473
1474 /* Here we need to swap the multi-bulk argc/argv with the
1475 * normal argc/argv of the client structure. */
1476 auxargv = c->argv;
1477 c->argv = c->mbargv;
1478 c->mbargv = auxargv;
1479
1480 auxargc = c->argc;
1481 c->argc = c->mbargc;
1482 c->mbargc = auxargc;
1483
1484 /* We need to set bulklen to something different than -1
1485 * in order for the code below to process the command without
1486 * to try to read the last argument of a bulk command as
1487 * a special argument. */
1488 c->bulklen = 0;
1489 /* continue below and process the command */
1490 } else {
1491 c->bulklen = -1;
1492 return 1;
1493 }
1494 }
1495 }
1496 /* -- end of multi bulk commands processing -- */
1497
1498 /* The QUIT command is handled as a special case. Normal command
1499 * procs are unable to close the client connection safely */
1500 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1501 freeClient(c);
1502 return 0;
1503 }
1504 cmd = lookupCommand(c->argv[0]->ptr);
1505 if (!cmd) {
1506 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1507 resetClient(c);
1508 return 1;
1509 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1510 (c->argc < -cmd->arity)) {
1511 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1512 resetClient(c);
1513 return 1;
1514 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1515 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1516 resetClient(c);
1517 return 1;
1518 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1519 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1520
1521 decrRefCount(c->argv[c->argc-1]);
1522 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1523 c->argc--;
1524 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1525 resetClient(c);
1526 return 1;
1527 }
1528 c->argc--;
1529 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1530 /* It is possible that the bulk read is already in the
1531 * buffer. Check this condition and handle it accordingly.
1532 * This is just a fast path, alternative to call processInputBuffer().
1533 * It's a good idea since the code is small and this condition
1534 * happens most of the times. */
1535 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1536 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1537 c->argc++;
1538 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1539 } else {
1540 return 1;
1541 }
1542 }
1543 /* Let's try to share objects on the command arguments vector */
1544 if (server.shareobjects) {
1545 int j;
1546 for(j = 1; j < c->argc; j++)
1547 c->argv[j] = tryObjectSharing(c->argv[j]);
1548 }
1549 /* Let's try to encode the bulk object to save space. */
1550 if (cmd->flags & REDIS_CMD_BULK)
1551 tryObjectEncoding(c->argv[c->argc-1]);
1552
1553 /* Check if the user is authenticated */
1554 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1555 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1556 resetClient(c);
1557 return 1;
1558 }
1559
1560 /* Exec the command */
1561 dirty = server.dirty;
1562 cmd->proc(c);
1563 if (server.appendonly != 0)
1564 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1565 if (server.dirty-dirty != 0 && listLength(server.slaves))
1566 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1567 if (listLength(server.monitors))
1568 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1569 server.stat_numcommands++;
1570
1571 /* Prepare the client for the next command */
1572 if (c->flags & REDIS_CLOSE) {
1573 freeClient(c);
1574 return 0;
1575 }
1576 resetClient(c);
1577 return 1;
1578 }
1579
1580 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1581 listNode *ln;
1582 int outc = 0, j;
1583 robj **outv;
1584 /* (args*2)+1 is enough room for args, spaces, newlines */
1585 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1586
1587 if (argc <= REDIS_STATIC_ARGS) {
1588 outv = static_outv;
1589 } else {
1590 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1591 }
1592
1593 for (j = 0; j < argc; j++) {
1594 if (j != 0) outv[outc++] = shared.space;
1595 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1596 robj *lenobj;
1597
1598 lenobj = createObject(REDIS_STRING,
1599 sdscatprintf(sdsempty(),"%d\r\n",
1600 stringObjectLen(argv[j])));
1601 lenobj->refcount = 0;
1602 outv[outc++] = lenobj;
1603 }
1604 outv[outc++] = argv[j];
1605 }
1606 outv[outc++] = shared.crlf;
1607
1608 /* Increment all the refcounts at start and decrement at end in order to
1609 * be sure to free objects if there is no slave in a replication state
1610 * able to be feed with commands */
1611 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1612 listRewind(slaves);
1613 while((ln = listYield(slaves))) {
1614 redisClient *slave = ln->value;
1615
1616 /* Don't feed slaves that are still waiting for BGSAVE to start */
1617 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1618
1619 /* Feed all the other slaves, MONITORs and so on */
1620 if (slave->slaveseldb != dictid) {
1621 robj *selectcmd;
1622
1623 switch(dictid) {
1624 case 0: selectcmd = shared.select0; break;
1625 case 1: selectcmd = shared.select1; break;
1626 case 2: selectcmd = shared.select2; break;
1627 case 3: selectcmd = shared.select3; break;
1628 case 4: selectcmd = shared.select4; break;
1629 case 5: selectcmd = shared.select5; break;
1630 case 6: selectcmd = shared.select6; break;
1631 case 7: selectcmd = shared.select7; break;
1632 case 8: selectcmd = shared.select8; break;
1633 case 9: selectcmd = shared.select9; break;
1634 default:
1635 selectcmd = createObject(REDIS_STRING,
1636 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1637 selectcmd->refcount = 0;
1638 break;
1639 }
1640 addReply(slave,selectcmd);
1641 slave->slaveseldb = dictid;
1642 }
1643 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1644 }
1645 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1646 if (outv != static_outv) zfree(outv);
1647 }
1648
1649 /* TODO: translate EXPIREs into EXPIRETOs */
1650 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1651 sds buf = sdsempty();
1652 int j;
1653 ssize_t nwritten;
1654
1655 /* The DB this command was targetting is not the same as the last command
1656 * we appendend. To issue a SELECT command is needed. */
1657 if (dictid != server.appendseldb) {
1658 char seldb[64];
1659
1660 snprintf(seldb,sizeof(seldb),"%d",dictid);
1661 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1662 strlen(seldb),seldb);
1663 }
1664 /* Append the actual command */
1665 buf = sdscatprintf(buf,"*%d\r\n",argc);
1666 for (j = 0; j < argc; j++) {
1667 robj *o = argv[j];
1668
1669 if (o->encoding != REDIS_ENCODING_RAW)
1670 o = getDecodedObject(o);
1671 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
1672 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
1673 buf = sdscatlen(buf,"\r\n",2);
1674 if (o != argv[j])
1675 decrRefCount(o);
1676 }
1677 /* We want to perform a single write. This should be guaranteed atomic
1678 * at least if the filesystem we are writing is a real physical one.
1679 * While this will save us against the server being killed I don't think
1680 * there is much to do about the whole server stopping for power problems
1681 * or alike */
1682 nwritten = write(server.appendfd,buf,sdslen(buf));
1683 if (nwritten != (unsigned)sdslen(buf)) {
1684 /* Ooops, we are in troubles. The best thing to do for now is
1685 * to simply exit instead to give the illusion that everything is
1686 * working as expected. */
1687 if (nwritten == -1) {
1688 redisLog(REDIS_WARNING,"Aborting on error writing to the append-only file: %s",strerror(errno));
1689 } else {
1690 redisLog(REDIS_WARNING,"Aborting on short write while writing to the append-only file: %s",strerror(errno));
1691 }
1692 abort();
1693 }
1694 fsync(server.appendfd); /* Let's try to get this data on the disk */
1695 }
1696
1697 static void processInputBuffer(redisClient *c) {
1698 again:
1699 if (c->bulklen == -1) {
1700 /* Read the first line of the query */
1701 char *p = strchr(c->querybuf,'\n');
1702 size_t querylen;
1703
1704 if (p) {
1705 sds query, *argv;
1706 int argc, j;
1707
1708 query = c->querybuf;
1709 c->querybuf = sdsempty();
1710 querylen = 1+(p-(query));
1711 if (sdslen(query) > querylen) {
1712 /* leave data after the first line of the query in the buffer */
1713 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1714 }
1715 *p = '\0'; /* remove "\n" */
1716 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1717 sdsupdatelen(query);
1718
1719 /* Now we can split the query in arguments */
1720 if (sdslen(query) == 0) {
1721 /* Ignore empty query */
1722 sdsfree(query);
1723 return;
1724 }
1725 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1726 sdsfree(query);
1727
1728 if (c->argv) zfree(c->argv);
1729 c->argv = zmalloc(sizeof(robj*)*argc);
1730
1731 for (j = 0; j < argc; j++) {
1732 if (sdslen(argv[j])) {
1733 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1734 c->argc++;
1735 } else {
1736 sdsfree(argv[j]);
1737 }
1738 }
1739 zfree(argv);
1740 /* Execute the command. If the client is still valid
1741 * after processCommand() return and there is something
1742 * on the query buffer try to process the next command. */
1743 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1744 return;
1745 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1746 redisLog(REDIS_DEBUG, "Client protocol error");
1747 freeClient(c);
1748 return;
1749 }
1750 } else {
1751 /* Bulk read handling. Note that if we are at this point
1752 the client already sent a command terminated with a newline,
1753 we are reading the bulk data that is actually the last
1754 argument of the command. */
1755 int qbl = sdslen(c->querybuf);
1756
1757 if (c->bulklen <= qbl) {
1758 /* Copy everything but the final CRLF as final argument */
1759 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1760 c->argc++;
1761 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1762 /* Process the command. If the client is still valid after
1763 * the processing and there is more data in the buffer
1764 * try to parse it. */
1765 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1766 return;
1767 }
1768 }
1769 }
1770
1771 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1772 redisClient *c = (redisClient*) privdata;
1773 char buf[REDIS_IOBUF_LEN];
1774 int nread;
1775 REDIS_NOTUSED(el);
1776 REDIS_NOTUSED(mask);
1777
1778 nread = read(fd, buf, REDIS_IOBUF_LEN);
1779 if (nread == -1) {
1780 if (errno == EAGAIN) {
1781 nread = 0;
1782 } else {
1783 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1784 freeClient(c);
1785 return;
1786 }
1787 } else if (nread == 0) {
1788 redisLog(REDIS_DEBUG, "Client closed connection");
1789 freeClient(c);
1790 return;
1791 }
1792 if (nread) {
1793 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1794 c->lastinteraction = time(NULL);
1795 } else {
1796 return;
1797 }
1798 processInputBuffer(c);
1799 }
1800
1801 static int selectDb(redisClient *c, int id) {
1802 if (id < 0 || id >= server.dbnum)
1803 return REDIS_ERR;
1804 c->db = &server.db[id];
1805 return REDIS_OK;
1806 }
1807
1808 static void *dupClientReplyValue(void *o) {
1809 incrRefCount((robj*)o);
1810 return 0;
1811 }
1812
1813 static redisClient *createClient(int fd) {
1814 redisClient *c = zmalloc(sizeof(*c));
1815
1816 anetNonBlock(NULL,fd);
1817 anetTcpNoDelay(NULL,fd);
1818 if (!c) return NULL;
1819 selectDb(c,0);
1820 c->fd = fd;
1821 c->querybuf = sdsempty();
1822 c->argc = 0;
1823 c->argv = NULL;
1824 c->bulklen = -1;
1825 c->multibulk = 0;
1826 c->mbargc = 0;
1827 c->mbargv = NULL;
1828 c->sentlen = 0;
1829 c->flags = 0;
1830 c->lastinteraction = time(NULL);
1831 c->authenticated = 0;
1832 c->replstate = REDIS_REPL_NONE;
1833 c->reply = listCreate();
1834 listSetFreeMethod(c->reply,decrRefCount);
1835 listSetDupMethod(c->reply,dupClientReplyValue);
1836 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1837 readQueryFromClient, c, NULL) == AE_ERR) {
1838 freeClient(c);
1839 return NULL;
1840 }
1841 listAddNodeTail(server.clients,c);
1842 return c;
1843 }
1844
1845 static void addReply(redisClient *c, robj *obj) {
1846 if (listLength(c->reply) == 0 &&
1847 (c->replstate == REDIS_REPL_NONE ||
1848 c->replstate == REDIS_REPL_ONLINE) &&
1849 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1850 sendReplyToClient, c, NULL) == AE_ERR) return;
1851 if (obj->encoding != REDIS_ENCODING_RAW) {
1852 obj = getDecodedObject(obj);
1853 } else {
1854 incrRefCount(obj);
1855 }
1856 listAddNodeTail(c->reply,obj);
1857 }
1858
1859 static void addReplySds(redisClient *c, sds s) {
1860 robj *o = createObject(REDIS_STRING,s);
1861 addReply(c,o);
1862 decrRefCount(o);
1863 }
1864
1865 static void addReplyBulkLen(redisClient *c, robj *obj) {
1866 size_t len;
1867
1868 if (obj->encoding == REDIS_ENCODING_RAW) {
1869 len = sdslen(obj->ptr);
1870 } else {
1871 long n = (long)obj->ptr;
1872
1873 len = 1;
1874 if (n < 0) {
1875 len++;
1876 n = -n;
1877 }
1878 while((n = n/10) != 0) {
1879 len++;
1880 }
1881 }
1882 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1883 }
1884
1885 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1886 int cport, cfd;
1887 char cip[128];
1888 redisClient *c;
1889 REDIS_NOTUSED(el);
1890 REDIS_NOTUSED(mask);
1891 REDIS_NOTUSED(privdata);
1892
1893 cfd = anetAccept(server.neterr, fd, cip, &cport);
1894 if (cfd == AE_ERR) {
1895 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1896 return;
1897 }
1898 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1899 if ((c = createClient(cfd)) == NULL) {
1900 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1901 close(cfd); /* May be already closed, just ingore errors */
1902 return;
1903 }
1904 /* If maxclient directive is set and this is one client more... close the
1905 * connection. Note that we create the client instead to check before
1906 * for this condition, since now the socket is already set in nonblocking
1907 * mode and we can send an error for free using the Kernel I/O */
1908 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1909 char *err = "-ERR max number of clients reached\r\n";
1910
1911 /* That's a best effort error message, don't check write errors */
1912 (void) write(c->fd,err,strlen(err));
1913 freeClient(c);
1914 return;
1915 }
1916 server.stat_numconnections++;
1917 }
1918
1919 /* ======================= Redis objects implementation ===================== */
1920
1921 static robj *createObject(int type, void *ptr) {
1922 robj *o;
1923
1924 if (listLength(server.objfreelist)) {
1925 listNode *head = listFirst(server.objfreelist);
1926 o = listNodeValue(head);
1927 listDelNode(server.objfreelist,head);
1928 } else {
1929 o = zmalloc(sizeof(*o));
1930 }
1931 o->type = type;
1932 o->encoding = REDIS_ENCODING_RAW;
1933 o->ptr = ptr;
1934 o->refcount = 1;
1935 return o;
1936 }
1937
1938 static robj *createStringObject(char *ptr, size_t len) {
1939 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1940 }
1941
1942 static robj *createListObject(void) {
1943 list *l = listCreate();
1944
1945 listSetFreeMethod(l,decrRefCount);
1946 return createObject(REDIS_LIST,l);
1947 }
1948
1949 static robj *createSetObject(void) {
1950 dict *d = dictCreate(&setDictType,NULL);
1951 return createObject(REDIS_SET,d);
1952 }
1953
1954 static robj *createZsetObject(void) {
1955 zset *zs = zmalloc(sizeof(*zs));
1956
1957 zs->dict = dictCreate(&zsetDictType,NULL);
1958 zs->zsl = zslCreate();
1959 return createObject(REDIS_ZSET,zs);
1960 }
1961
1962 static void freeStringObject(robj *o) {
1963 if (o->encoding == REDIS_ENCODING_RAW) {
1964 sdsfree(o->ptr);
1965 }
1966 }
1967
1968 static void freeListObject(robj *o) {
1969 listRelease((list*) o->ptr);
1970 }
1971
1972 static void freeSetObject(robj *o) {
1973 dictRelease((dict*) o->ptr);
1974 }
1975
1976 static void freeZsetObject(robj *o) {
1977 zset *zs = o->ptr;
1978
1979 dictRelease(zs->dict);
1980 zslFree(zs->zsl);
1981 zfree(zs);
1982 }
1983
1984 static void freeHashObject(robj *o) {
1985 dictRelease((dict*) o->ptr);
1986 }
1987
1988 static void incrRefCount(robj *o) {
1989 o->refcount++;
1990 #ifdef DEBUG_REFCOUNT
1991 if (o->type == REDIS_STRING)
1992 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1993 #endif
1994 }
1995
1996 static void decrRefCount(void *obj) {
1997 robj *o = obj;
1998
1999 #ifdef DEBUG_REFCOUNT
2000 if (o->type == REDIS_STRING)
2001 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2002 #endif
2003 if (--(o->refcount) == 0) {
2004 switch(o->type) {
2005 case REDIS_STRING: freeStringObject(o); break;
2006 case REDIS_LIST: freeListObject(o); break;
2007 case REDIS_SET: freeSetObject(o); break;
2008 case REDIS_ZSET: freeZsetObject(o); break;
2009 case REDIS_HASH: freeHashObject(o); break;
2010 default: assert(0 != 0); break;
2011 }
2012 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2013 !listAddNodeHead(server.objfreelist,o))
2014 zfree(o);
2015 }
2016 }
2017
2018 static robj *lookupKey(redisDb *db, robj *key) {
2019 dictEntry *de = dictFind(db->dict,key);
2020 return de ? dictGetEntryVal(de) : NULL;
2021 }
2022
2023 static robj *lookupKeyRead(redisDb *db, robj *key) {
2024 expireIfNeeded(db,key);
2025 return lookupKey(db,key);
2026 }
2027
2028 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2029 deleteIfVolatile(db,key);
2030 return lookupKey(db,key);
2031 }
2032
2033 static int deleteKey(redisDb *db, robj *key) {
2034 int retval;
2035
2036 /* We need to protect key from destruction: after the first dictDelete()
2037 * it may happen that 'key' is no longer valid if we don't increment
2038 * it's count. This may happen when we get the object reference directly
2039 * from the hash table with dictRandomKey() or dict iterators */
2040 incrRefCount(key);
2041 if (dictSize(db->expires)) dictDelete(db->expires,key);
2042 retval = dictDelete(db->dict,key);
2043 decrRefCount(key);
2044
2045 return retval == DICT_OK;
2046 }
2047
2048 /* Try to share an object against the shared objects pool */
2049 static robj *tryObjectSharing(robj *o) {
2050 struct dictEntry *de;
2051 unsigned long c;
2052
2053 if (o == NULL || server.shareobjects == 0) return o;
2054
2055 assert(o->type == REDIS_STRING);
2056 de = dictFind(server.sharingpool,o);
2057 if (de) {
2058 robj *shared = dictGetEntryKey(de);
2059
2060 c = ((unsigned long) dictGetEntryVal(de))+1;
2061 dictGetEntryVal(de) = (void*) c;
2062 incrRefCount(shared);
2063 decrRefCount(o);
2064 return shared;
2065 } else {
2066 /* Here we are using a stream algorihtm: Every time an object is
2067 * shared we increment its count, everytime there is a miss we
2068 * recrement the counter of a random object. If this object reaches
2069 * zero we remove the object and put the current object instead. */
2070 if (dictSize(server.sharingpool) >=
2071 server.sharingpoolsize) {
2072 de = dictGetRandomKey(server.sharingpool);
2073 assert(de != NULL);
2074 c = ((unsigned long) dictGetEntryVal(de))-1;
2075 dictGetEntryVal(de) = (void*) c;
2076 if (c == 0) {
2077 dictDelete(server.sharingpool,de->key);
2078 }
2079 } else {
2080 c = 0; /* If the pool is empty we want to add this object */
2081 }
2082 if (c == 0) {
2083 int retval;
2084
2085 retval = dictAdd(server.sharingpool,o,(void*)1);
2086 assert(retval == DICT_OK);
2087 incrRefCount(o);
2088 }
2089 return o;
2090 }
2091 }
2092
2093 /* Check if the nul-terminated string 's' can be represented by a long
2094 * (that is, is a number that fits into long without any other space or
2095 * character before or after the digits).
2096 *
2097 * If so, the function returns REDIS_OK and *longval is set to the value
2098 * of the number. Otherwise REDIS_ERR is returned */
2099 static int isStringRepresentableAsLong(sds s, long *longval) {
2100 char buf[32], *endptr;
2101 long value;
2102 int slen;
2103
2104 value = strtol(s, &endptr, 10);
2105 if (endptr[0] != '\0') return REDIS_ERR;
2106 slen = snprintf(buf,32,"%ld",value);
2107
2108 /* If the number converted back into a string is not identical
2109 * then it's not possible to encode the string as integer */
2110 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2111 if (longval) *longval = value;
2112 return REDIS_OK;
2113 }
2114
2115 /* Try to encode a string object in order to save space */
2116 static int tryObjectEncoding(robj *o) {
2117 long value;
2118 sds s = o->ptr;
2119
2120 if (o->encoding != REDIS_ENCODING_RAW)
2121 return REDIS_ERR; /* Already encoded */
2122
2123 /* It's not save to encode shared objects: shared objects can be shared
2124 * everywhere in the "object space" of Redis. Encoded objects can only
2125 * appear as "values" (and not, for instance, as keys) */
2126 if (o->refcount > 1) return REDIS_ERR;
2127
2128 /* Currently we try to encode only strings */
2129 assert(o->type == REDIS_STRING);
2130
2131 /* Check if we can represent this string as a long integer */
2132 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2133
2134 /* Ok, this object can be encoded */
2135 o->encoding = REDIS_ENCODING_INT;
2136 sdsfree(o->ptr);
2137 o->ptr = (void*) value;
2138 return REDIS_OK;
2139 }
2140
2141 /* Get a decoded version of an encoded object (returned as a new object) */
2142 static robj *getDecodedObject(const robj *o) {
2143 robj *dec;
2144
2145 assert(o->encoding != REDIS_ENCODING_RAW);
2146 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2147 char buf[32];
2148
2149 snprintf(buf,32,"%ld",(long)o->ptr);
2150 dec = createStringObject(buf,strlen(buf));
2151 return dec;
2152 } else {
2153 assert(1 != 1);
2154 }
2155 }
2156
2157 /* Compare two string objects via strcmp() or alike.
2158 * Note that the objects may be integer-encoded. In such a case we
2159 * use snprintf() to get a string representation of the numbers on the stack
2160 * and compare the strings, it's much faster than calling getDecodedObject(). */
2161 static int compareStringObjects(robj *a, robj *b) {
2162 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2163 char bufa[128], bufb[128], *astr, *bstr;
2164 int bothsds = 1;
2165
2166 if (a == b) return 0;
2167 if (a->encoding != REDIS_ENCODING_RAW) {
2168 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2169 astr = bufa;
2170 bothsds = 0;
2171 } else {
2172 astr = a->ptr;
2173 }
2174 if (b->encoding != REDIS_ENCODING_RAW) {
2175 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2176 bstr = bufb;
2177 bothsds = 0;
2178 } else {
2179 bstr = b->ptr;
2180 }
2181 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2182 }
2183
2184 static size_t stringObjectLen(robj *o) {
2185 assert(o->type == REDIS_STRING);
2186 if (o->encoding == REDIS_ENCODING_RAW) {
2187 return sdslen(o->ptr);
2188 } else {
2189 char buf[32];
2190
2191 return snprintf(buf,32,"%ld",(long)o->ptr);
2192 }
2193 }
2194
2195 /*============================ DB saving/loading ============================ */
2196
/* Write the single byte 'type' to 'fp'.
 * Returns 0 on success, -1 if nothing could be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2201
/* Write the time 't' to 'fp' as a 32 bit signed integer, stored as the
 * raw bytes of an int32_t.
 * Returns 0 on success, -1 if nothing could be written. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t secs = (int32_t) t;

    return (fwrite(&secs,4,1,fp) == 0) ? -1 : 0;
}
2207
2208 /* check rdbLoadLen() comments for more info */
2209 static int rdbSaveLen(FILE *fp, uint32_t len) {
2210 unsigned char buf[2];
2211
2212 if (len < (1<<6)) {
2213 /* Save a 6 bit len */
2214 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2215 if (fwrite(buf,1,1,fp) == 0) return -1;
2216 } else if (len < (1<<14)) {
2217 /* Save a 14 bit len */
2218 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2219 buf[1] = len&0xFF;
2220 if (fwrite(buf,2,1,fp) == 0) return -1;
2221 } else {
2222 /* Save a 32 bit len */
2223 buf[0] = (REDIS_RDB_32BITLEN<<6);
2224 if (fwrite(buf,1,1,fp) == 0) return -1;
2225 len = htonl(len);
2226 if (fwrite(&len,4,1,fp) == 0) return -1;
2227 }
2228 return 0;
2229 }
2230
2231 /* String objects in the form "2391" "-100" without any space and with a
2232 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2233 * encoded as integers to save space */
2234 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2235 long long value;
2236 char *endptr, buf[32];
2237
2238 /* Check if it's possible to encode this value as a number */
2239 value = strtoll(s, &endptr, 10);
2240 if (endptr[0] != '\0') return 0;
2241 snprintf(buf,32,"%lld",value);
2242
2243 /* If the number converted back into a string is not identical
2244 * then it's not possible to encode the string as integer */
2245 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2246
2247 /* Finally check if it fits in our ranges */
2248 if (value >= -(1<<7) && value <= (1<<7)-1) {
2249 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2250 enc[1] = value&0xFF;
2251 return 2;
2252 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2253 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2254 enc[1] = value&0xFF;
2255 enc[2] = (value>>8)&0xFF;
2256 return 3;
2257 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2258 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2259 enc[1] = value&0xFF;
2260 enc[2] = (value>>8)&0xFF;
2261 enc[3] = (value>>16)&0xFF;
2262 enc[4] = (value>>24)&0xFF;
2263 return 5;
2264 } else {
2265 return 0;
2266 }
2267 }
2268
2269 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2270 unsigned int comprlen, outlen;
2271 unsigned char byte;
2272 void *out;
2273
2274 /* We require at least four bytes compression for this to be worth it */
2275 outlen = sdslen(obj->ptr)-4;
2276 if (outlen <= 0) return 0;
2277 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2278 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2279 if (comprlen == 0) {
2280 zfree(out);
2281 return 0;
2282 }
2283 /* Data compressed! Let's save it on disk */
2284 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2285 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2286 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2287 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2288 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2289 zfree(out);
2290 return comprlen;
2291
2292 writeerr:
2293 zfree(out);
2294 return -1;
2295 }
2296
2297 /* Save a string objet as [len][data] on disk. If the object is a string
2298 * representation of an integer value we try to safe it in a special form */
2299 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2300 size_t len;
2301 int enclen;
2302
2303 len = sdslen(obj->ptr);
2304
2305 /* Try integer encoding */
2306 if (len <= 11) {
2307 unsigned char buf[5];
2308 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2309 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2310 return 0;
2311 }
2312 }
2313
2314 /* Try LZF compression - under 20 bytes it's unable to compress even
2315 * aaaaaaaaaaaaaaaaaa so skip it */
2316 if (len > 20) {
2317 int retval;
2318
2319 retval = rdbSaveLzfStringObject(fp,obj);
2320 if (retval == -1) return -1;
2321 if (retval > 0) return 0;
2322 /* retval == 0 means data can't be compressed, save the old way */
2323 }
2324
2325 /* Store verbatim */
2326 if (rdbSaveLen(fp,len) == -1) return -1;
2327 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2328 return 0;
2329 }
2330
2331 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2332 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2333 int retval;
2334 robj *dec;
2335
2336 if (obj->encoding != REDIS_ENCODING_RAW) {
2337 dec = getDecodedObject(obj);
2338 retval = rdbSaveStringObjectRaw(fp,dec);
2339 decrRefCount(dec);
2340 return retval;
2341 } else {
2342 return rdbSaveStringObjectRaw(fp,obj);
2343 }
2344 }
2345
/* Save a double value. Doubles are saved as strings prefixed by an
 * unsigned 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Finite values are printed with "%.16g". Returns 0 on success, -1 on
 * write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);
        /* The representation starts at buf+1: measuring from buf would
         * read the still uninitialized length byte and store a wrong
         * length (or zero). */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2372
2373 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2374 static int rdbSave(char *filename) {
2375 dictIterator *di = NULL;
2376 dictEntry *de;
2377 FILE *fp;
2378 char tmpfile[256];
2379 int j;
2380 time_t now = time(NULL);
2381
2382 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2383 fp = fopen(tmpfile,"w");
2384 if (!fp) {
2385 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2386 return REDIS_ERR;
2387 }
2388 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2389 for (j = 0; j < server.dbnum; j++) {
2390 redisDb *db = server.db+j;
2391 dict *d = db->dict;
2392 if (dictSize(d) == 0) continue;
2393 di = dictGetIterator(d);
2394 if (!di) {
2395 fclose(fp);
2396 return REDIS_ERR;
2397 }
2398
2399 /* Write the SELECT DB opcode */
2400 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2401 if (rdbSaveLen(fp,j) == -1) goto werr;
2402
2403 /* Iterate this DB writing every entry */
2404 while((de = dictNext(di)) != NULL) {
2405 robj *key = dictGetEntryKey(de);
2406 robj *o = dictGetEntryVal(de);
2407 time_t expiretime = getExpire(db,key);
2408
2409 /* Save the expire time */
2410 if (expiretime != -1) {
2411 /* If this key is already expired skip it */
2412 if (expiretime < now) continue;
2413 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2414 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2415 }
2416 /* Save the key and associated value */
2417 if (rdbSaveType(fp,o->type) == -1) goto werr;
2418 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2419 if (o->type == REDIS_STRING) {
2420 /* Save a string value */
2421 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2422 } else if (o->type == REDIS_LIST) {
2423 /* Save a list value */
2424 list *list = o->ptr;
2425 listNode *ln;
2426
2427 listRewind(list);
2428 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2429 while((ln = listYield(list))) {
2430 robj *eleobj = listNodeValue(ln);
2431
2432 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2433 }
2434 } else if (o->type == REDIS_SET) {
2435 /* Save a set value */
2436 dict *set = o->ptr;
2437 dictIterator *di = dictGetIterator(set);
2438 dictEntry *de;
2439
2440 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2441 while((de = dictNext(di)) != NULL) {
2442 robj *eleobj = dictGetEntryKey(de);
2443
2444 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2445 }
2446 dictReleaseIterator(di);
2447 } else if (o->type == REDIS_ZSET) {
2448 /* Save a set value */
2449 zset *zs = o->ptr;
2450 dictIterator *di = dictGetIterator(zs->dict);
2451 dictEntry *de;
2452
2453 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2454 while((de = dictNext(di)) != NULL) {
2455 robj *eleobj = dictGetEntryKey(de);
2456 double *score = dictGetEntryVal(de);
2457
2458 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2459 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2460 }
2461 dictReleaseIterator(di);
2462 } else {
2463 assert(0 != 0);
2464 }
2465 }
2466 dictReleaseIterator(di);
2467 }
2468 /* EOF opcode */
2469 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2470
2471 /* Make sure data will not remain on the OS's output buffers */
2472 fflush(fp);
2473 fsync(fileno(fp));
2474 fclose(fp);
2475
2476 /* Use RENAME to make sure the DB file is changed atomically only
2477 * if the generate DB file is ok. */
2478 if (rename(tmpfile,filename) == -1) {
2479 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2480 unlink(tmpfile);
2481 return REDIS_ERR;
2482 }
2483 redisLog(REDIS_NOTICE,"DB saved on disk");
2484 server.dirty = 0;
2485 server.lastsave = time(NULL);
2486 return REDIS_OK;
2487
2488 werr:
2489 fclose(fp);
2490 unlink(tmpfile);
2491 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2492 if (di) dictReleaseIterator(di);
2493 return REDIS_ERR;
2494 }
2495
2496 static int rdbSaveBackground(char *filename) {
2497 pid_t childpid;
2498
2499 if (server.bgsaveinprogress) return REDIS_ERR;
2500 if ((childpid = fork()) == 0) {
2501 /* Child */
2502 close(server.fd);
2503 if (rdbSave(filename) == REDIS_OK) {
2504 exit(0);
2505 } else {
2506 exit(1);
2507 }
2508 } else {
2509 /* Parent */
2510 if (childpid == -1) {
2511 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2512 strerror(errno));
2513 return REDIS_ERR;
2514 }
2515 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2516 server.bgsaveinprogress = 1;
2517 server.bgsavechildpid = childpid;
2518 return REDIS_OK;
2519 }
2520 return REDIS_OK; /* unreached */
2521 }
2522
/* Remove the temp file "temp-<childpid>.rdb", that is the name rdbSave()
 * uses when running in the process with pid 'childpid'. Errors from
 * unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2529
/* Read one type byte from 'fp'.
 * Returns the byte value (0-255), or -1 if it could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
2535
/* Read a time saved by rdbSaveTime(): the four raw bytes of an int32_t.
 * Returns the time, or -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t secs;

    if (fread(&secs,sizeof(secs),1,fp) == 0) return -1;
    return (time_t) secs;
}
2541
2542 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2543 * of this file for a description of how this are stored on disk.
2544 *
2545 * isencoded is set to 1 if the readed length is not actually a length but
2546 * an "encoding type", check the above comments for more info */
2547 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2548 unsigned char buf[2];
2549 uint32_t len;
2550
2551 if (isencoded) *isencoded = 0;
2552 if (rdbver == 0) {
2553 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2554 return ntohl(len);
2555 } else {
2556 int type;
2557
2558 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2559 type = (buf[0]&0xC0)>>6;
2560 if (type == REDIS_RDB_6BITLEN) {
2561 /* Read a 6 bit len */
2562 return buf[0]&0x3F;
2563 } else if (type == REDIS_RDB_ENCVAL) {
2564 /* Read a 6 bit len encoding type */
2565 if (isencoded) *isencoded = 1;
2566 return buf[0]&0x3F;
2567 } else if (type == REDIS_RDB_14BITLEN) {
2568 /* Read a 14 bit len */
2569 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2570 return ((buf[0]&0x3F)<<8)|buf[1];
2571 } else {
2572 /* Read a 32 bit len */
2573 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2574 return ntohl(len);
2575 }
2576 }
2577 }
2578
2579 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2580 unsigned char enc[4];
2581 long long val;
2582
2583 if (enctype == REDIS_RDB_ENC_INT8) {
2584 if (fread(enc,1,1,fp) == 0) return NULL;
2585 val = (signed char)enc[0];
2586 } else if (enctype == REDIS_RDB_ENC_INT16) {
2587 uint16_t v;
2588 if (fread(enc,2,1,fp) == 0) return NULL;
2589 v = enc[0]|(enc[1]<<8);
2590 val = (int16_t)v;
2591 } else if (enctype == REDIS_RDB_ENC_INT32) {
2592 uint32_t v;
2593 if (fread(enc,4,1,fp) == 0) return NULL;
2594 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2595 val = (int32_t)v;
2596 } else {
2597 val = 0; /* anti-warning */
2598 assert(0!=0);
2599 }
2600 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2601 }
2602
2603 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2604 unsigned int len, clen;
2605 unsigned char *c = NULL;
2606 sds val = NULL;
2607
2608 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2609 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2610 if ((c = zmalloc(clen)) == NULL) goto err;
2611 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2612 if (fread(c,clen,1,fp) == 0) goto err;
2613 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2614 zfree(c);
2615 return createObject(REDIS_STRING,val);
2616 err:
2617 zfree(c);
2618 sdsfree(val);
2619 return NULL;
2620 }
2621
2622 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2623 int isencoded;
2624 uint32_t len;
2625 sds val;
2626
2627 len = rdbLoadLen(fp,rdbver,&isencoded);
2628 if (isencoded) {
2629 switch(len) {
2630 case REDIS_RDB_ENC_INT8:
2631 case REDIS_RDB_ENC_INT16:
2632 case REDIS_RDB_ENC_INT32:
2633 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2634 case REDIS_RDB_ENC_LZF:
2635 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2636 default:
2637 assert(0!=0);
2638 }
2639 }
2640
2641 if (len == REDIS_RDB_LENERR) return NULL;
2642 val = sdsnewlen(NULL,len);
2643 if (len && fread(val,len,1,fp) == 0) {
2644 sdsfree(val);
2645 return NULL;
2646 }
2647 return tryObjectSharing(createObject(REDIS_STRING,val));
2648 }
2649
2650 /* For information about double serialization check rdbSaveDoubleValue() */
2651 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2652 char buf[128];
2653 unsigned char len;
2654
2655 if (fread(&len,1,1,fp) == 0) return -1;
2656 switch(len) {
2657 case 255: *val = R_NegInf; return 0;
2658 case 254: *val = R_PosInf; return 0;
2659 case 253: *val = R_Nan; return 0;
2660 default:
2661 if (fread(buf,len,1,fp) == 0) return -1;
2662 sscanf(buf, "%lg", val);
2663 return 0;
2664 }
2665 }
2666
2667 static int rdbLoad(char *filename) {
2668 FILE *fp;
2669 robj *keyobj = NULL;
2670 uint32_t dbid;
2671 int type, retval, rdbver;
2672 dict *d = server.db[0].dict;
2673 redisDb *db = server.db+0;
2674 char buf[1024];
2675 time_t expiretime = -1, now = time(NULL);
2676
2677 fp = fopen(filename,"r");
2678 if (!fp) return REDIS_ERR;
2679 if (fread(buf,9,1,fp) == 0) goto eoferr;
2680 buf[9] = '\0';
2681 if (memcmp(buf,"REDIS",5) != 0) {
2682 fclose(fp);
2683 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2684 return REDIS_ERR;
2685 }
2686 rdbver = atoi(buf+5);
2687 if (rdbver > 1) {
2688 fclose(fp);
2689 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2690 return REDIS_ERR;
2691 }
2692 while(1) {
2693 robj *o;
2694
2695 /* Read type. */
2696 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2697 if (type == REDIS_EXPIRETIME) {
2698 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2699 /* We read the time so we need to read the object type again */
2700 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2701 }
2702 if (type == REDIS_EOF) break;
2703 /* Handle SELECT DB opcode as a special case */
2704 if (type == REDIS_SELECTDB) {
2705 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2706 goto eoferr;
2707 if (dbid >= (unsigned)server.dbnum) {
2708 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2709 exit(1);
2710 }
2711 db = server.db+dbid;
2712 d = db->dict;
2713 continue;
2714 }
2715 /* Read key */
2716 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2717
2718 if (type == REDIS_STRING) {
2719 /* Read string value */
2720 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2721 tryObjectEncoding(o);
2722 } else if (type == REDIS_LIST || type == REDIS_SET) {
2723 /* Read list/set value */
2724 uint32_t listlen;
2725
2726 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2727 goto eoferr;
2728 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2729 /* Load every single element of the list/set */
2730 while(listlen--) {
2731 robj *ele;
2732
2733 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2734 tryObjectEncoding(ele);
2735 if (type == REDIS_LIST) {
2736 listAddNodeTail((list*)o->ptr,ele);
2737 } else {
2738 dictAdd((dict*)o->ptr,ele,NULL);
2739 }
2740 }
2741 } else if (type == REDIS_ZSET) {
2742 /* Read list/set value */
2743 uint32_t zsetlen;
2744 zset *zs;
2745
2746 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2747 goto eoferr;
2748 o = createZsetObject();
2749 zs = o->ptr;
2750 /* Load every single element of the list/set */
2751 while(zsetlen--) {
2752 robj *ele;
2753 double *score = zmalloc(sizeof(double));
2754
2755 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2756 tryObjectEncoding(ele);
2757 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2758 dictAdd(zs->dict,ele,score);
2759 zslInsert(zs->zsl,*score,ele);
2760 incrRefCount(ele); /* added to skiplist */
2761 }
2762 } else {
2763 assert(0 != 0);
2764 }
2765 /* Add the new object in the hash table */
2766 retval = dictAdd(d,keyobj,o);
2767 if (retval == DICT_ERR) {
2768 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2769 exit(1);
2770 }
2771 /* Set the expire time if needed */
2772 if (expiretime != -1) {
2773 setExpire(db,keyobj,expiretime);
2774 /* Delete this key if already expired */
2775 if (expiretime < now) deleteKey(db,keyobj);
2776 expiretime = -1;
2777 }
2778 keyobj = o = NULL;
2779 }
2780 fclose(fp);
2781 return REDIS_OK;
2782
2783 eoferr: /* unexpected end of file is handled here with a fatal exit */
2784 if (keyobj) decrRefCount(keyobj);
2785 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2786 exit(1);
2787 return REDIS_ERR; /* Just to avoid warning */
2788 }
2789
2790 /*================================== Commands =============================== */
2791
2792 static void authCommand(redisClient *c) {
2793 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2794 c->authenticated = 1;
2795 addReply(c,shared.ok);
2796 } else {
2797 c->authenticated = 0;
2798 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2799 }
2800 }
2801
2802 static void pingCommand(redisClient *c) {
2803 addReply(c,shared.pong);
2804 }
2805
2806 static void echoCommand(redisClient *c) {
2807 addReplyBulkLen(c,c->argv[1]);
2808 addReply(c,c->argv[1]);
2809 addReply(c,shared.crlf);
2810 }
2811
2812 /*=================================== Strings =============================== */
2813
2814 static void setGenericCommand(redisClient *c, int nx) {
2815 int retval;
2816
2817 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2818 if (retval == DICT_ERR) {
2819 if (!nx) {
2820 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2821 incrRefCount(c->argv[2]);
2822 } else {
2823 addReply(c,shared.czero);
2824 return;
2825 }
2826 } else {
2827 incrRefCount(c->argv[1]);
2828 incrRefCount(c->argv[2]);
2829 }
2830 server.dirty++;
2831 removeExpire(c->db,c->argv[1]);
2832 addReply(c, nx ? shared.cone : shared.ok);
2833 }
2834
2835 static void setCommand(redisClient *c) {
2836 setGenericCommand(c,0);
2837 }
2838
2839 static void setnxCommand(redisClient *c) {
2840 setGenericCommand(c,1);
2841 }
2842
2843 static void getCommand(redisClient *c) {
2844 robj *o = lookupKeyRead(c->db,c->argv[1]);
2845
2846 if (o == NULL) {
2847 addReply(c,shared.nullbulk);
2848 } else {
2849 if (o->type != REDIS_STRING) {
2850 addReply(c,shared.wrongtypeerr);
2851 } else {
2852 addReplyBulkLen(c,o);
2853 addReply(c,o);
2854 addReply(c,shared.crlf);
2855 }
2856 }
2857 }
2858
2859 static void getsetCommand(redisClient *c) {
2860 getCommand(c);
2861 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2862 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2863 } else {
2864 incrRefCount(c->argv[1]);
2865 }
2866 incrRefCount(c->argv[2]);
2867 server.dirty++;
2868 removeExpire(c->db,c->argv[1]);
2869 }
2870
2871 static void mgetCommand(redisClient *c) {
2872 int j;
2873
2874 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2875 for (j = 1; j < c->argc; j++) {
2876 robj *o = lookupKeyRead(c->db,c->argv[j]);
2877 if (o == NULL) {
2878 addReply(c,shared.nullbulk);
2879 } else {
2880 if (o->type != REDIS_STRING) {
2881 addReply(c,shared.nullbulk);
2882 } else {
2883 addReplyBulkLen(c,o);
2884 addReply(c,o);
2885 addReply(c,shared.crlf);
2886 }
2887 }
2888 }
2889 }
2890
2891 static void incrDecrCommand(redisClient *c, long long incr) {
2892 long long value;
2893 int retval;
2894 robj *o;
2895
2896 o = lookupKeyWrite(c->db,c->argv[1]);
2897 if (o == NULL) {
2898 value = 0;
2899 } else {
2900 if (o->type != REDIS_STRING) {
2901 value = 0;
2902 } else {
2903 char *eptr;
2904
2905 if (o->encoding == REDIS_ENCODING_RAW)
2906 value = strtoll(o->ptr, &eptr, 10);
2907 else if (o->encoding == REDIS_ENCODING_INT)
2908 value = (long)o->ptr;
2909 else
2910 assert(1 != 1);
2911 }
2912 }
2913
2914 value += incr;
2915 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2916 tryObjectEncoding(o);
2917 retval = dictAdd(c->db->dict,c->argv[1],o);
2918 if (retval == DICT_ERR) {
2919 dictReplace(c->db->dict,c->argv[1],o);
2920 removeExpire(c->db,c->argv[1]);
2921 } else {
2922 incrRefCount(c->argv[1]);
2923 }
2924 server.dirty++;
2925 addReply(c,shared.colon);
2926 addReply(c,o);
2927 addReply(c,shared.crlf);
2928 }
2929
2930 static void incrCommand(redisClient *c) {
2931 incrDecrCommand(c,1);
2932 }
2933
2934 static void decrCommand(redisClient *c) {
2935 incrDecrCommand(c,-1);
2936 }
2937
2938 static void incrbyCommand(redisClient *c) {
2939 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2940 incrDecrCommand(c,incr);
2941 }
2942
2943 static void decrbyCommand(redisClient *c) {
2944 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2945 incrDecrCommand(c,-incr);
2946 }
2947
2948 /* ========================= Type agnostic commands ========================= */
2949
2950 static void delCommand(redisClient *c) {
2951 int deleted = 0, j;
2952
2953 for (j = 1; j < c->argc; j++) {
2954 if (deleteKey(c->db,c->argv[j])) {
2955 server.dirty++;
2956 deleted++;
2957 }
2958 }
2959 switch(deleted) {
2960 case 0:
2961 addReply(c,shared.czero);
2962 break;
2963 case 1:
2964 addReply(c,shared.cone);
2965 break;
2966 default:
2967 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2968 break;
2969 }
2970 }
2971
2972 static void existsCommand(redisClient *c) {
2973 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2974 }
2975
2976 static void selectCommand(redisClient *c) {
2977 int id = atoi(c->argv[1]->ptr);
2978
2979 if (selectDb(c,id) == REDIS_ERR) {
2980 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2981 } else {
2982 addReply(c,shared.ok);
2983 }
2984 }
2985
2986 static void randomkeyCommand(redisClient *c) {
2987 dictEntry *de;
2988
2989 while(1) {
2990 de = dictGetRandomKey(c->db->dict);
2991 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2992 }
2993 if (de == NULL) {
2994 addReply(c,shared.plus);
2995 addReply(c,shared.crlf);
2996 } else {
2997 addReply(c,shared.plus);
2998 addReply(c,dictGetEntryKey(de));
2999 addReply(c,shared.crlf);
3000 }
3001 }
3002
3003 static void keysCommand(redisClient *c) {
3004 dictIterator *di;
3005 dictEntry *de;
3006 sds pattern = c->argv[1]->ptr;
3007 int plen = sdslen(pattern);
3008 int numkeys = 0, keyslen = 0;
3009 robj *lenobj = createObject(REDIS_STRING,NULL);
3010
3011 di = dictGetIterator(c->db->dict);
3012 addReply(c,lenobj);
3013 decrRefCount(lenobj);
3014 while((de = dictNext(di)) != NULL) {
3015 robj *keyobj = dictGetEntryKey(de);
3016
3017 sds key = keyobj->ptr;
3018 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3019 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3020 if (expireIfNeeded(c->db,keyobj) == 0) {
3021 if (numkeys != 0)
3022 addReply(c,shared.space);
3023 addReply(c,keyobj);
3024 numkeys++;
3025 keyslen += sdslen(key);
3026 }
3027 }
3028 }
3029 dictReleaseIterator(di);
3030 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3031 addReply(c,shared.crlf);
3032 }
3033
3034 static void dbsizeCommand(redisClient *c) {
3035 addReplySds(c,
3036 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3037 }
3038
3039 static void lastsaveCommand(redisClient *c) {
3040 addReplySds(c,
3041 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3042 }
3043
3044 static void typeCommand(redisClient *c) {
3045 robj *o;
3046 char *type;
3047
3048 o = lookupKeyRead(c->db,c->argv[1]);
3049 if (o == NULL) {
3050 type = "+none";
3051 } else {
3052 switch(o->type) {
3053 case REDIS_STRING: type = "+string"; break;
3054 case REDIS_LIST: type = "+list"; break;
3055 case REDIS_SET: type = "+set"; break;
3056 default: type = "unknown"; break;
3057 }
3058 }
3059 addReplySds(c,sdsnew(type));
3060 addReply(c,shared.crlf);
3061 }
3062
3063 static void saveCommand(redisClient *c) {
3064 if (server.bgsaveinprogress) {
3065 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3066 return;
3067 }
3068 if (rdbSave(server.dbfilename) == REDIS_OK) {
3069 addReply(c,shared.ok);
3070 } else {
3071 addReply(c,shared.err);
3072 }
3073 }
3074
3075 static void bgsaveCommand(redisClient *c) {
3076 if (server.bgsaveinprogress) {
3077 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3078 return;
3079 }
3080 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3081 addReply(c,shared.ok);
3082 } else {
3083 addReply(c,shared.err);
3084 }
3085 }
3086
3087 static void shutdownCommand(redisClient *c) {
3088 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3089 /* Kill the saving child if there is a background saving in progress.
3090 We want to avoid race conditions, for instance our saving child may
3091 overwrite the synchronous saving did by SHUTDOWN. */
3092 if (server.bgsaveinprogress) {
3093 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3094 kill(server.bgsavechildpid,SIGKILL);
3095 rdbRemoveTempFile(server.bgsavechildpid);
3096 }
3097 /* SYNC SAVE */
3098 if (rdbSave(server.dbfilename) == REDIS_OK) {
3099 if (server.daemonize)
3100 unlink(server.pidfile);
3101 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3102 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3103 exit(1);
3104 } else {
3105 /* Ooops.. error saving! The best we can do is to continue operating.
3106 * Note that if there was a background saving process, in the next
3107 * cron() Redis will be notified that the background saving aborted,
3108 * handling special stuff like slaves pending for synchronization... */
3109 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3110 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3111 }
3112 }
3113
3114 static void renameGenericCommand(redisClient *c, int nx) {
3115 robj *o;
3116
3117 /* To use the same key as src and dst is probably an error */
3118 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3119 addReply(c,shared.sameobjecterr);
3120 return;
3121 }
3122
3123 o = lookupKeyWrite(c->db,c->argv[1]);
3124 if (o == NULL) {
3125 addReply(c,shared.nokeyerr);
3126 return;
3127 }
3128 incrRefCount(o);
3129 deleteIfVolatile(c->db,c->argv[2]);
3130 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3131 if (nx) {
3132 decrRefCount(o);
3133 addReply(c,shared.czero);
3134 return;
3135 }
3136 dictReplace(c->db->dict,c->argv[2],o);
3137 } else {
3138 incrRefCount(c->argv[2]);
3139 }
3140 deleteKey(c->db,c->argv[1]);
3141 server.dirty++;
3142 addReply(c,nx ? shared.cone : shared.ok);
3143 }
3144
3145 static void renameCommand(redisClient *c) {
3146 renameGenericCommand(c,0);
3147 }
3148
3149 static void renamenxCommand(redisClient *c) {
3150 renameGenericCommand(c,1);
3151 }
3152
3153 static void moveCommand(redisClient *c) {
3154 robj *o;
3155 redisDb *src, *dst;
3156 int srcid;
3157
3158 /* Obtain source and target DB pointers */
3159 src = c->db;
3160 srcid = c->db->id;
3161 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3162 addReply(c,shared.outofrangeerr);
3163 return;
3164 }
3165 dst = c->db;
3166 selectDb(c,srcid); /* Back to the source DB */
3167
3168 /* If the user is moving using as target the same
3169 * DB as the source DB it is probably an error. */
3170 if (src == dst) {
3171 addReply(c,shared.sameobjecterr);
3172 return;
3173 }
3174
3175 /* Check if the element exists and get a reference */
3176 o = lookupKeyWrite(c->db,c->argv[1]);
3177 if (!o) {
3178 addReply(c,shared.czero);
3179 return;
3180 }
3181
3182 /* Try to add the element to the target DB */
3183 deleteIfVolatile(dst,c->argv[1]);
3184 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3185 addReply(c,shared.czero);
3186 return;
3187 }
3188 incrRefCount(c->argv[1]);
3189 incrRefCount(o);
3190
3191 /* OK! key moved, free the entry in the source DB */
3192 deleteKey(src,c->argv[1]);
3193 server.dirty++;
3194 addReply(c,shared.cone);
3195 }
3196
3197 /* =================================== Lists ================================ */
3198 static void pushGenericCommand(redisClient *c, int where) {
3199 robj *lobj;
3200 list *list;
3201
3202 lobj = lookupKeyWrite(c->db,c->argv[1]);
3203 if (lobj == NULL) {
3204 lobj = createListObject();
3205 list = lobj->ptr;
3206 if (where == REDIS_HEAD) {
3207 listAddNodeHead(list,c->argv[2]);
3208 } else {
3209 listAddNodeTail(list,c->argv[2]);
3210 }
3211 dictAdd(c->db->dict,c->argv[1],lobj);
3212 incrRefCount(c->argv[1]);
3213 incrRefCount(c->argv[2]);
3214 } else {
3215 if (lobj->type != REDIS_LIST) {
3216 addReply(c,shared.wrongtypeerr);
3217 return;
3218 }
3219 list = lobj->ptr;
3220 if (where == REDIS_HEAD) {
3221 listAddNodeHead(list,c->argv[2]);
3222 } else {
3223 listAddNodeTail(list,c->argv[2]);
3224 }
3225 incrRefCount(c->argv[2]);
3226 }
3227 server.dirty++;
3228 addReply(c,shared.ok);
3229 }
3230
3231 static void lpushCommand(redisClient *c) {
3232 pushGenericCommand(c,REDIS_HEAD);
3233 }
3234
3235 static void rpushCommand(redisClient *c) {
3236 pushGenericCommand(c,REDIS_TAIL);
3237 }
3238
3239 static void llenCommand(redisClient *c) {
3240 robj *o;
3241 list *l;
3242
3243 o = lookupKeyRead(c->db,c->argv[1]);
3244 if (o == NULL) {
3245 addReply(c,shared.czero);
3246 return;
3247 } else {
3248 if (o->type != REDIS_LIST) {
3249 addReply(c,shared.wrongtypeerr);
3250 } else {
3251 l = o->ptr;
3252 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3253 }
3254 }
3255 }
3256
3257 static void lindexCommand(redisClient *c) {
3258 robj *o;
3259 int index = atoi(c->argv[2]->ptr);
3260
3261 o = lookupKeyRead(c->db,c->argv[1]);
3262 if (o == NULL) {
3263 addReply(c,shared.nullbulk);
3264 } else {
3265 if (o->type != REDIS_LIST) {
3266 addReply(c,shared.wrongtypeerr);
3267 } else {
3268 list *list = o->ptr;
3269 listNode *ln;
3270
3271 ln = listIndex(list, index);
3272 if (ln == NULL) {
3273 addReply(c,shared.nullbulk);
3274 } else {
3275 robj *ele = listNodeValue(ln);
3276 addReplyBulkLen(c,ele);
3277 addReply(c,ele);
3278 addReply(c,shared.crlf);
3279 }
3280 }
3281 }
3282 }
3283
3284 static void lsetCommand(redisClient *c) {
3285 robj *o;
3286 int index = atoi(c->argv[2]->ptr);
3287
3288 o = lookupKeyWrite(c->db,c->argv[1]);
3289 if (o == NULL) {
3290 addReply(c,shared.nokeyerr);
3291 } else {
3292 if (o->type != REDIS_LIST) {
3293 addReply(c,shared.wrongtypeerr);
3294 } else {
3295 list *list = o->ptr;
3296 listNode *ln;
3297
3298 ln = listIndex(list, index);
3299 if (ln == NULL) {
3300 addReply(c,shared.outofrangeerr);
3301 } else {
3302 robj *ele = listNodeValue(ln);
3303
3304 decrRefCount(ele);
3305 listNodeValue(ln) = c->argv[3];
3306 incrRefCount(c->argv[3]);
3307 addReply(c,shared.ok);
3308 server.dirty++;
3309 }
3310 }
3311 }
3312 }
3313
3314 static void popGenericCommand(redisClient *c, int where) {
3315 robj *o;
3316
3317 o = lookupKeyWrite(c->db,c->argv[1]);
3318 if (o == NULL) {
3319 addReply(c,shared.nullbulk);
3320 } else {
3321 if (o->type != REDIS_LIST) {
3322 addReply(c,shared.wrongtypeerr);
3323 } else {
3324 list *list = o->ptr;
3325 listNode *ln;
3326
3327 if (where == REDIS_HEAD)
3328 ln = listFirst(list);
3329 else
3330 ln = listLast(list);
3331
3332 if (ln == NULL) {
3333 addReply(c,shared.nullbulk);
3334 } else {
3335 robj *ele = listNodeValue(ln);
3336 addReplyBulkLen(c,ele);
3337 addReply(c,ele);
3338 addReply(c,shared.crlf);
3339 listDelNode(list,ln);
3340 server.dirty++;
3341 }
3342 }
3343 }
3344 }
3345
3346 static void lpopCommand(redisClient *c) {
3347 popGenericCommand(c,REDIS_HEAD);
3348 }
3349
3350 static void rpopCommand(redisClient *c) {
3351 popGenericCommand(c,REDIS_TAIL);
3352 }
3353
3354 static void lrangeCommand(redisClient *c) {
3355 robj *o;
3356 int start = atoi(c->argv[2]->ptr);
3357 int end = atoi(c->argv[3]->ptr);
3358
3359 o = lookupKeyRead(c->db,c->argv[1]);
3360 if (o == NULL) {
3361 addReply(c,shared.nullmultibulk);
3362 } else {
3363 if (o->type != REDIS_LIST) {
3364 addReply(c,shared.wrongtypeerr);
3365 } else {
3366 list *list = o->ptr;
3367 listNode *ln;
3368 int llen = listLength(list);
3369 int rangelen, j;
3370 robj *ele;
3371
3372 /* convert negative indexes */
3373 if (start < 0) start = llen+start;
3374 if (end < 0) end = llen+end;
3375 if (start < 0) start = 0;
3376 if (end < 0) end = 0;
3377
3378 /* indexes sanity checks */
3379 if (start > end || start >= llen) {
3380 /* Out of range start or start > end result in empty list */
3381 addReply(c,shared.emptymultibulk);
3382 return;
3383 }
3384 if (end >= llen) end = llen-1;
3385 rangelen = (end-start)+1;
3386
3387 /* Return the result in form of a multi-bulk reply */
3388 ln = listIndex(list, start);
3389 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3390 for (j = 0; j < rangelen; j++) {
3391 ele = listNodeValue(ln);
3392 addReplyBulkLen(c,ele);
3393 addReply(c,ele);
3394 addReply(c,shared.crlf);
3395 ln = ln->next;
3396 }
3397 }
3398 }
3399 }
3400
3401 static void ltrimCommand(redisClient *c) {
3402 robj *o;
3403 int start = atoi(c->argv[2]->ptr);
3404 int end = atoi(c->argv[3]->ptr);
3405
3406 o = lookupKeyWrite(c->db,c->argv[1]);
3407 if (o == NULL) {
3408 addReply(c,shared.nokeyerr);
3409 } else {
3410 if (o->type != REDIS_LIST) {
3411 addReply(c,shared.wrongtypeerr);
3412 } else {
3413 list *list = o->ptr;
3414 listNode *ln;
3415 int llen = listLength(list);
3416 int j, ltrim, rtrim;
3417
3418 /* convert negative indexes */
3419 if (start < 0) start = llen+start;
3420 if (end < 0) end = llen+end;
3421 if (start < 0) start = 0;
3422 if (end < 0) end = 0;
3423
3424 /* indexes sanity checks */
3425 if (start > end || start >= llen) {
3426 /* Out of range start or start > end result in empty list */
3427 ltrim = llen;
3428 rtrim = 0;
3429 } else {
3430 if (end >= llen) end = llen-1;
3431 ltrim = start;
3432 rtrim = llen-end-1;
3433 }
3434
3435 /* Remove list elements to perform the trim */
3436 for (j = 0; j < ltrim; j++) {
3437 ln = listFirst(list);
3438 listDelNode(list,ln);
3439 }
3440 for (j = 0; j < rtrim; j++) {
3441 ln = listLast(list);
3442 listDelNode(list,ln);
3443 }
3444 server.dirty++;
3445 addReply(c,shared.ok);
3446 }
3447 }
3448 }
3449
3450 static void lremCommand(redisClient *c) {
3451 robj *o;
3452
3453 o = lookupKeyWrite(c->db,c->argv[1]);
3454 if (o == NULL) {
3455 addReply(c,shared.czero);
3456 } else {
3457 if (o->type != REDIS_LIST) {
3458 addReply(c,shared.wrongtypeerr);
3459 } else {
3460 list *list = o->ptr;
3461 listNode *ln, *next;
3462 int toremove = atoi(c->argv[2]->ptr);
3463 int removed = 0;
3464 int fromtail = 0;
3465
3466 if (toremove < 0) {
3467 toremove = -toremove;
3468 fromtail = 1;
3469 }
3470 ln = fromtail ? list->tail : list->head;
3471 while (ln) {
3472 robj *ele = listNodeValue(ln);
3473
3474 next = fromtail ? ln->prev : ln->next;
3475 if (compareStringObjects(ele,c->argv[3]) == 0) {
3476 listDelNode(list,ln);
3477 server.dirty++;
3478 removed++;
3479 if (toremove && removed == toremove) break;
3480 }
3481 ln = next;
3482 }
3483 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3484 }
3485 }
3486 }
3487
3488 /* ==================================== Sets ================================ */
3489
3490 static void saddCommand(redisClient *c) {
3491 robj *set;
3492
3493 set = lookupKeyWrite(c->db,c->argv[1]);
3494 if (set == NULL) {
3495 set = createSetObject();
3496 dictAdd(c->db->dict,c->argv[1],set);
3497 incrRefCount(c->argv[1]);
3498 } else {
3499 if (set->type != REDIS_SET) {
3500 addReply(c,shared.wrongtypeerr);
3501 return;
3502 }
3503 }
3504 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3505 incrRefCount(c->argv[2]);
3506 server.dirty++;
3507 addReply(c,shared.cone);
3508 } else {
3509 addReply(c,shared.czero);
3510 }
3511 }
3512
3513 static void sremCommand(redisClient *c) {
3514 robj *set;
3515
3516 set = lookupKeyWrite(c->db,c->argv[1]);
3517 if (set == NULL) {
3518 addReply(c,shared.czero);
3519 } else {
3520 if (set->type != REDIS_SET) {
3521 addReply(c,shared.wrongtypeerr);
3522 return;
3523 }
3524 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3525 server.dirty++;
3526 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3527 addReply(c,shared.cone);
3528 } else {
3529 addReply(c,shared.czero);
3530 }
3531 }
3532 }
3533
3534 static void smoveCommand(redisClient *c) {
3535 robj *srcset, *dstset;
3536
3537 srcset = lookupKeyWrite(c->db,c->argv[1]);
3538 dstset = lookupKeyWrite(c->db,c->argv[2]);
3539
3540 /* If the source key does not exist return 0, if it's of the wrong type
3541 * raise an error */
3542 if (srcset == NULL || srcset->type != REDIS_SET) {
3543 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3544 return;
3545 }
3546 /* Error if the destination key is not a set as well */
3547 if (dstset && dstset->type != REDIS_SET) {
3548 addReply(c,shared.wrongtypeerr);
3549 return;
3550 }
3551 /* Remove the element from the source set */
3552 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3553 /* Key not found in the src set! return zero */
3554 addReply(c,shared.czero);
3555 return;
3556 }
3557 server.dirty++;
3558 /* Add the element to the destination set */
3559 if (!dstset) {
3560 dstset = createSetObject();
3561 dictAdd(c->db->dict,c->argv[2],dstset);
3562 incrRefCount(c->argv[2]);
3563 }
3564 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3565 incrRefCount(c->argv[3]);
3566 addReply(c,shared.cone);
3567 }
3568
3569 static void sismemberCommand(redisClient *c) {
3570 robj *set;
3571
3572 set = lookupKeyRead(c->db,c->argv[1]);
3573 if (set == NULL) {
3574 addReply(c,shared.czero);
3575 } else {
3576 if (set->type != REDIS_SET) {
3577 addReply(c,shared.wrongtypeerr);
3578 return;
3579 }
3580 if (dictFind(set->ptr,c->argv[2]))
3581 addReply(c,shared.cone);
3582 else
3583 addReply(c,shared.czero);
3584 }
3585 }
3586
3587 static void scardCommand(redisClient *c) {
3588 robj *o;
3589 dict *s;
3590
3591 o = lookupKeyRead(c->db,c->argv[1]);
3592 if (o == NULL) {
3593 addReply(c,shared.czero);
3594 return;
3595 } else {
3596 if (o->type != REDIS_SET) {
3597 addReply(c,shared.wrongtypeerr);
3598 } else {
3599 s = o->ptr;
3600 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3601 dictSize(s)));
3602 }
3603 }
3604 }
3605
3606 static void spopCommand(redisClient *c) {
3607 robj *set;
3608 dictEntry *de;
3609
3610 set = lookupKeyWrite(c->db,c->argv[1]);
3611 if (set == NULL) {
3612 addReply(c,shared.nullbulk);
3613 } else {
3614 if (set->type != REDIS_SET) {
3615 addReply(c,shared.wrongtypeerr);
3616 return;
3617 }
3618 de = dictGetRandomKey(set->ptr);
3619 if (de == NULL) {
3620 addReply(c,shared.nullbulk);
3621 } else {
3622 robj *ele = dictGetEntryKey(de);
3623
3624 addReplyBulkLen(c,ele);
3625 addReply(c,ele);
3626 addReply(c,shared.crlf);
3627 dictDelete(set->ptr,ele);
3628 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3629 server.dirty++;
3630 }
3631 }
3632 }
3633
3634 static void srandmemberCommand(redisClient *c) {
3635 robj *set;
3636 dictEntry *de;
3637
3638 set = lookupKeyRead(c->db,c->argv[1]);
3639 if (set == NULL) {
3640 addReply(c,shared.nullbulk);
3641 } else {
3642 if (set->type != REDIS_SET) {
3643 addReply(c,shared.wrongtypeerr);
3644 return;
3645 }
3646 de = dictGetRandomKey(set->ptr);
3647 if (de == NULL) {
3648 addReply(c,shared.nullbulk);
3649 } else {
3650 robj *ele = dictGetEntryKey(de);
3651
3652 addReplyBulkLen(c,ele);
3653 addReply(c,ele);
3654 addReply(c,shared.crlf);
3655 }
3656 }
3657 }
3658
3659 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3660 dict **d1 = (void*) s1, **d2 = (void*) s2;
3661
3662 return dictSize(*d1)-dictSize(*d2);
3663 }
3664
3665 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3666 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3667 dictIterator *di;
3668 dictEntry *de;
3669 robj *lenobj = NULL, *dstset = NULL;
3670 int j, cardinality = 0;
3671
3672 for (j = 0; j < setsnum; j++) {
3673 robj *setobj;
3674
3675 setobj = dstkey ?
3676 lookupKeyWrite(c->db,setskeys[j]) :
3677 lookupKeyRead(c->db,setskeys[j]);
3678 if (!setobj) {
3679 zfree(dv);
3680 if (dstkey) {
3681 deleteKey(c->db,dstkey);
3682 addReply(c,shared.ok);
3683 } else {
3684 addReply(c,shared.nullmultibulk);
3685 }
3686 return;
3687 }
3688 if (setobj->type != REDIS_SET) {
3689 zfree(dv);
3690 addReply(c,shared.wrongtypeerr);
3691 return;
3692 }
3693 dv[j] = setobj->ptr;
3694 }
3695 /* Sort sets from the smallest to largest, this will improve our
3696 * algorithm's performace */
3697 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3698
3699 /* The first thing we should output is the total number of elements...
3700 * since this is a multi-bulk write, but at this stage we don't know
3701 * the intersection set size, so we use a trick, append an empty object
3702 * to the output list and save the pointer to later modify it with the
3703 * right length */
3704 if (!dstkey) {
3705 lenobj = createObject(REDIS_STRING,NULL);
3706 addReply(c,lenobj);
3707 decrRefCount(lenobj);
3708 } else {
3709 /* If we have a target key where to store the resulting set
3710 * create this key with an empty set inside */
3711 dstset = createSetObject();
3712 }
3713
3714 /* Iterate all the elements of the first (smallest) set, and test
3715 * the element against all the other sets, if at least one set does
3716 * not include the element it is discarded */
3717 di = dictGetIterator(dv[0]);
3718
3719 while((de = dictNext(di)) != NULL) {
3720 robj *ele;
3721
3722 for (j = 1; j < setsnum; j++)
3723 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3724 if (j != setsnum)
3725 continue; /* at least one set does not contain the member */
3726 ele = dictGetEntryKey(de);
3727 if (!dstkey) {
3728 addReplyBulkLen(c,ele);
3729 addReply(c,ele);
3730 addReply(c,shared.crlf);
3731 cardinality++;
3732 } else {
3733 dictAdd(dstset->ptr,ele,NULL);
3734 incrRefCount(ele);
3735 }
3736 }
3737 dictReleaseIterator(di);
3738
3739 if (dstkey) {
3740 /* Store the resulting set into the target */
3741 deleteKey(c->db,dstkey);
3742 dictAdd(c->db->dict,dstkey,dstset);
3743 incrRefCount(dstkey);
3744 }
3745
3746 if (!dstkey) {
3747 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3748 } else {
3749 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3750 dictSize((dict*)dstset->ptr)));
3751 server.dirty++;
3752 }
3753 zfree(dv);
3754 }
3755
3756 static void sinterCommand(redisClient *c) {
3757 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3758 }
3759
3760 static void sinterstoreCommand(redisClient *c) {
3761 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3762 }
3763
3764 #define REDIS_OP_UNION 0
3765 #define REDIS_OP_DIFF 1
3766
3767 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3768 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3769 dictIterator *di;
3770 dictEntry *de;
3771 robj *dstset = NULL;
3772 int j, cardinality = 0;
3773
3774 for (j = 0; j < setsnum; j++) {
3775 robj *setobj;
3776
3777 setobj = dstkey ?
3778 lookupKeyWrite(c->db,setskeys[j]) :
3779 lookupKeyRead(c->db,setskeys[j]);
3780 if (!setobj) {
3781 dv[j] = NULL;
3782 continue;
3783 }
3784 if (setobj->type != REDIS_SET) {
3785 zfree(dv);
3786 addReply(c,shared.wrongtypeerr);
3787 return;
3788 }
3789 dv[j] = setobj->ptr;
3790 }
3791
3792 /* We need a temp set object to store our union. If the dstkey
3793 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3794 * this set object will be the resulting object to set into the target key*/
3795 dstset = createSetObject();
3796
3797 /* Iterate all the elements of all the sets, add every element a single
3798 * time to the result set */
3799 for (j = 0; j < setsnum; j++) {
3800 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3801 if (!dv[j]) continue; /* non existing keys are like empty sets */
3802
3803 di = dictGetIterator(dv[j]);
3804
3805 while((de = dictNext(di)) != NULL) {
3806 robj *ele;
3807
3808 /* dictAdd will not add the same element multiple times */
3809 ele = dictGetEntryKey(de);
3810 if (op == REDIS_OP_UNION || j == 0) {
3811 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3812 incrRefCount(ele);
3813 cardinality++;
3814 }
3815 } else if (op == REDIS_OP_DIFF) {
3816 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3817 cardinality--;
3818 }
3819 }
3820 }
3821 dictReleaseIterator(di);
3822
3823 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3824 }
3825
3826 /* Output the content of the resulting set, if not in STORE mode */
3827 if (!dstkey) {
3828 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3829 di = dictGetIterator(dstset->ptr);
3830 while((de = dictNext(di)) != NULL) {
3831 robj *ele;
3832
3833 ele = dictGetEntryKey(de);
3834 addReplyBulkLen(c,ele);
3835 addReply(c,ele);
3836 addReply(c,shared.crlf);
3837 }
3838 dictReleaseIterator(di);
3839 } else {
3840 /* If we have a target key where to store the resulting set
3841 * create this key with the result set inside */
3842 deleteKey(c->db,dstkey);
3843 dictAdd(c->db->dict,dstkey,dstset);
3844 incrRefCount(dstkey);
3845 }
3846
3847 /* Cleanup */
3848 if (!dstkey) {
3849 decrRefCount(dstset);
3850 } else {
3851 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3852 dictSize((dict*)dstset->ptr)));
3853 server.dirty++;
3854 }
3855 zfree(dv);
3856 }
3857
3858 static void sunionCommand(redisClient *c) {
3859 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3860 }
3861
3862 static void sunionstoreCommand(redisClient *c) {
3863 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3864 }
3865
3866 static void sdiffCommand(redisClient *c) {
3867 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3868 }
3869
3870 static void sdiffstoreCommand(redisClient *c) {
3871 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3872 }
3873
3874 /* ==================================== ZSets =============================== */
3875
3876 /* ZSETs are ordered sets using two data structures to hold the same elements
3877 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3878 * data structure.
3879 *
3880 * The elements are added to a hash table mapping Redis objects to scores.
3881 * At the same time the elements are added to a skip list mapping scores
3882 * to Redis objects (so objects are sorted by scores in this "view"). */
3883
3884 /* This skiplist implementation is almost a C translation of the original
3885 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3886 * Alternative to Balanced Trees", modified in three ways:
3887 * a) this implementation allows for repeated values.
3888 * b) the comparison is not just by key (our 'score') but by satellite data.
3889 * c) there is a back pointer, so it's a doubly linked list with the back
3890 * pointers being only at "level 1". This allows to traverse the list
3891 * from tail to head, useful for ZREVRANGE. */
3892
3893 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3894 zskiplistNode *zn = zmalloc(sizeof(*zn));
3895
3896 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3897 zn->score = score;
3898 zn->obj = obj;
3899 return zn;
3900 }
3901
3902 static zskiplist *zslCreate(void) {
3903 int j;
3904 zskiplist *zsl;
3905
3906 zsl = zmalloc(sizeof(*zsl));
3907 zsl->level = 1;
3908 zsl->length = 0;
3909 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3910 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3911 zsl->header->forward[j] = NULL;
3912 zsl->header->backward = NULL;
3913 zsl->tail = NULL;
3914 return zsl;
3915 }
3916
3917 static void zslFreeNode(zskiplistNode *node) {
3918 decrRefCount(node->obj);
3919 zfree(node->forward);
3920 zfree(node);
3921 }
3922
3923 static void zslFree(zskiplist *zsl) {
3924 zskiplistNode *node = zsl->header->forward[0], *next;
3925
3926 zfree(zsl->header->forward);
3927 zfree(zsl->header);
3928 while(node) {
3929 next = node->forward[0];
3930 zslFreeNode(node);
3931 node = next;
3932 }
3933 zfree(zsl);
3934 }
3935
3936 static int zslRandomLevel(void) {
3937 int level = 1;
3938 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3939 level += 1;
3940 return level;
3941 }
3942
3943 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3944 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3945 int i, level;
3946
3947 x = zsl->header;
3948 for (i = zsl->level-1; i >= 0; i--) {
3949 while (x->forward[i] &&
3950 (x->forward[i]->score < score ||
3951 (x->forward[i]->score == score &&
3952 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3953 x = x->forward[i];
3954 update[i] = x;
3955 }
3956 /* we assume the key is not already inside, since we allow duplicated
3957 * scores, and the re-insertion of score and redis object should never
3958 * happpen since the caller of zslInsert() should test in the hash table
3959 * if the element is already inside or not. */
3960 level = zslRandomLevel();
3961 if (level > zsl->level) {
3962 for (i = zsl->level; i < level; i++)
3963 update[i] = zsl->header;
3964 zsl->level = level;
3965 }
3966 x = zslCreateNode(level,score,obj);
3967 for (i = 0; i < level; i++) {
3968 x->forward[i] = update[i]->forward[i];
3969 update[i]->forward[i] = x;
3970 }
3971 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3972 if (x->forward[0])
3973 x->forward[0]->backward = x;
3974 else
3975 zsl->tail = x;
3976 zsl->length++;
3977 }
3978
3979 /* Delete an element with matching score/object from the skiplist. */
3980 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3981 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3982 int i;
3983
3984 x = zsl->header;
3985 for (i = zsl->level-1; i >= 0; i--) {
3986 while (x->forward[i] &&
3987 (x->forward[i]->score < score ||
3988 (x->forward[i]->score == score &&
3989 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3990 x = x->forward[i];
3991 update[i] = x;
3992 }
3993 /* We may have multiple elements with the same score, what we need
3994 * is to find the element with both the right score and object. */
3995 x = x->forward[0];
3996 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
3997 for (i = 0; i < zsl->level; i++) {
3998 if (update[i]->forward[i] != x) break;
3999 update[i]->forward[i] = x->forward[i];
4000 }
4001 if (x->forward[0]) {
4002 x->forward[0]->backward = (x->backward == zsl->header) ?
4003 NULL : x->backward;
4004 } else {
4005 zsl->tail = x->backward;
4006 }
4007 zslFreeNode(x);
4008 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4009 zsl->level--;
4010 zsl->length--;
4011 return 1;
4012 } else {
4013 return 0; /* not found */
4014 }
4015 return 0; /* not found */
4016 }
4017
4018 /* Delete all the elements with score between min and max from the skiplist.
4019 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4020 * Note that this function takes the reference to the hash table view of the
4021 * sorted set, in order to remove the elements from the hash table too. */
4022 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4023 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4024 unsigned long removed = 0;
4025 int i;
4026
4027 x = zsl->header;
4028 for (i = zsl->level-1; i >= 0; i--) {
4029 while (x->forward[i] && x->forward[i]->score < min)
4030 x = x->forward[i];
4031 update[i] = x;
4032 }
4033 /* We may have multiple elements with the same score, what we need
4034 * is to find the element with both the right score and object. */
4035 x = x->forward[0];
4036 while (x && x->score <= max) {
4037 zskiplistNode *next;
4038
4039 for (i = 0; i < zsl->level; i++) {
4040 if (update[i]->forward[i] != x) break;
4041 update[i]->forward[i] = x->forward[i];
4042 }
4043 if (x->forward[0]) {
4044 x->forward[0]->backward = (x->backward == zsl->header) ?
4045 NULL : x->backward;
4046 } else {
4047 zsl->tail = x->backward;
4048 }
4049 next = x->forward[0];
4050 dictDelete(dict,x->obj);
4051 zslFreeNode(x);
4052 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4053 zsl->level--;
4054 zsl->length--;
4055 removed++;
4056 x = next;
4057 }
4058 return removed; /* not found */
4059 }
4060
4061 /* Find the first node having a score equal or greater than the specified one.
4062 * Returns NULL if there is no match. */
4063 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4064 zskiplistNode *x;
4065 int i;
4066
4067 x = zsl->header;
4068 for (i = zsl->level-1; i >= 0; i--) {
4069 while (x->forward[i] && x->forward[i]->score < score)
4070 x = x->forward[i];
4071 }
4072 /* We may have multiple elements with the same score, what we need
4073 * is to find the element with both the right score and object. */
4074 return x->forward[0];
4075 }
4076
4077 /* The actual Z-commands implementations */
4078
4079 static void zaddCommand(redisClient *c) {
4080 robj *zsetobj;
4081 zset *zs;
4082 double *score;
4083
4084 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4085 if (zsetobj == NULL) {
4086 zsetobj = createZsetObject();
4087 dictAdd(c->db->dict,c->argv[1],zsetobj);
4088 incrRefCount(c->argv[1]);
4089 } else {
4090 if (zsetobj->type != REDIS_ZSET) {
4091 addReply(c,shared.wrongtypeerr);
4092 return;
4093 }
4094 }
4095 score = zmalloc(sizeof(double));
4096 *score = strtod(c->argv[2]->ptr,NULL);
4097 zs = zsetobj->ptr;
4098 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4099 /* case 1: New element */
4100 incrRefCount(c->argv[3]); /* added to hash */
4101 zslInsert(zs->zsl,*score,c->argv[3]);
4102 incrRefCount(c->argv[3]); /* added to skiplist */
4103 server.dirty++;
4104 addReply(c,shared.cone);
4105 } else {
4106 dictEntry *de;
4107 double *oldscore;
4108
4109 /* case 2: Score update operation */
4110 de = dictFind(zs->dict,c->argv[3]);
4111 assert(de != NULL);
4112 oldscore = dictGetEntryVal(de);
4113 if (*score != *oldscore) {
4114 int deleted;
4115
4116 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4117 assert(deleted != 0);
4118 zslInsert(zs->zsl,*score,c->argv[3]);
4119 incrRefCount(c->argv[3]);
4120 dictReplace(zs->dict,c->argv[3],score);
4121 server.dirty++;
4122 } else {
4123 zfree(score);
4124 }
4125 addReply(c,shared.czero);
4126 }
4127 }
4128
4129 static void zremCommand(redisClient *c) {
4130 robj *zsetobj;
4131 zset *zs;
4132
4133 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4134 if (zsetobj == NULL) {
4135 addReply(c,shared.czero);
4136 } else {
4137 dictEntry *de;
4138 double *oldscore;
4139 int deleted;
4140
4141 if (zsetobj->type != REDIS_ZSET) {
4142 addReply(c,shared.wrongtypeerr);
4143 return;
4144 }
4145 zs = zsetobj->ptr;
4146 de = dictFind(zs->dict,c->argv[2]);
4147 if (de == NULL) {
4148 addReply(c,shared.czero);
4149 return;
4150 }
4151 /* Delete from the skiplist */
4152 oldscore = dictGetEntryVal(de);
4153 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4154 assert(deleted != 0);
4155
4156 /* Delete from the hash table */
4157 dictDelete(zs->dict,c->argv[2]);
4158 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4159 server.dirty++;
4160 addReply(c,shared.cone);
4161 }
4162 }
4163
4164 static void zremrangebyscoreCommand(redisClient *c) {
4165 double min = strtod(c->argv[2]->ptr,NULL);
4166 double max = strtod(c->argv[3]->ptr,NULL);
4167 robj *zsetobj;
4168 zset *zs;
4169
4170 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4171 if (zsetobj == NULL) {
4172 addReply(c,shared.czero);
4173 } else {
4174 long deleted;
4175
4176 if (zsetobj->type != REDIS_ZSET) {
4177 addReply(c,shared.wrongtypeerr);
4178 return;
4179 }
4180 zs = zsetobj->ptr;
4181 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4182 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4183 server.dirty += deleted;
4184 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4185 }
4186 }
4187
4188 static void zrangeGenericCommand(redisClient *c, int reverse) {
4189 robj *o;
4190 int start = atoi(c->argv[2]->ptr);
4191 int end = atoi(c->argv[3]->ptr);
4192
4193 o = lookupKeyRead(c->db,c->argv[1]);
4194 if (o == NULL) {
4195 addReply(c,shared.nullmultibulk);
4196 } else {
4197 if (o->type != REDIS_ZSET) {
4198 addReply(c,shared.wrongtypeerr);
4199 } else {
4200 zset *zsetobj = o->ptr;
4201 zskiplist *zsl = zsetobj->zsl;
4202 zskiplistNode *ln;
4203
4204 int llen = zsl->length;
4205 int rangelen, j;
4206 robj *ele;
4207
4208 /* convert negative indexes */
4209 if (start < 0) start = llen+start;
4210 if (end < 0) end = llen+end;
4211 if (start < 0) start = 0;
4212 if (end < 0) end = 0;
4213
4214 /* indexes sanity checks */
4215 if (start > end || start >= llen) {
4216 /* Out of range start or start > end result in empty list */
4217 addReply(c,shared.emptymultibulk);
4218 return;
4219 }
4220 if (end >= llen) end = llen-1;
4221 rangelen = (end-start)+1;
4222
4223 /* Return the result in form of a multi-bulk reply */
4224 if (reverse) {
4225 ln = zsl->tail;
4226 while (start--)
4227 ln = ln->backward;
4228 } else {
4229 ln = zsl->header->forward[0];
4230 while (start--)
4231 ln = ln->forward[0];
4232 }
4233
4234 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4235 for (j = 0; j < rangelen; j++) {
4236 ele = ln->obj;
4237 addReplyBulkLen(c,ele);
4238 addReply(c,ele);
4239 addReply(c,shared.crlf);
4240 ln = reverse ? ln->backward : ln->forward[0];
4241 }
4242 }
4243 }
4244 }
4245
4246 static void zrangeCommand(redisClient *c) {
4247 zrangeGenericCommand(c,0);
4248 }
4249
4250 static void zrevrangeCommand(redisClient *c) {
4251 zrangeGenericCommand(c,1);
4252 }
4253
4254 static void zrangebyscoreCommand(redisClient *c) {
4255 robj *o;
4256 double min = strtod(c->argv[2]->ptr,NULL);
4257 double max = strtod(c->argv[3]->ptr,NULL);
4258
4259 o = lookupKeyRead(c->db,c->argv[1]);
4260 if (o == NULL) {
4261 addReply(c,shared.nullmultibulk);
4262 } else {
4263 if (o->type != REDIS_ZSET) {
4264 addReply(c,shared.wrongtypeerr);
4265 } else {
4266 zset *zsetobj = o->ptr;
4267 zskiplist *zsl = zsetobj->zsl;
4268 zskiplistNode *ln;
4269 robj *ele, *lenobj;
4270 unsigned int rangelen = 0;
4271
4272 /* Get the first node with the score >= min */
4273 ln = zslFirstWithScore(zsl,min);
4274 if (ln == NULL) {
4275 /* No element matching the speciifed interval */
4276 addReply(c,shared.emptymultibulk);
4277 return;
4278 }
4279
4280 /* We don't know in advance how many matching elements there
4281 * are in the list, so we push this object that will represent
4282 * the multi-bulk length in the output buffer, and will "fix"
4283 * it later */
4284 lenobj = createObject(REDIS_STRING,NULL);
4285 addReply(c,lenobj);
4286
4287 while(ln && ln->score <= max) {
4288 ele = ln->obj;
4289 addReplyBulkLen(c,ele);
4290 addReply(c,ele);
4291 addReply(c,shared.crlf);
4292 ln = ln->forward[0];
4293 rangelen++;
4294 }
4295 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4296 }
4297 }
4298 }
4299
4300 static void zcardCommand(redisClient *c) {
4301 robj *o;
4302 zset *zs;
4303
4304 o = lookupKeyRead(c->db,c->argv[1]);
4305 if (o == NULL) {
4306 addReply(c,shared.czero);
4307 return;
4308 } else {
4309 if (o->type != REDIS_ZSET) {
4310 addReply(c,shared.wrongtypeerr);
4311 } else {
4312 zs = o->ptr;
4313 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4314 }
4315 }
4316 }
4317
4318 static void zscoreCommand(redisClient *c) {
4319 robj *o;
4320 zset *zs;
4321
4322 o = lookupKeyRead(c->db,c->argv[1]);
4323 if (o == NULL) {
4324 addReply(c,shared.czero);
4325 return;
4326 } else {
4327 if (o->type != REDIS_ZSET) {
4328 addReply(c,shared.wrongtypeerr);
4329 } else {
4330 dictEntry *de;
4331
4332 zs = o->ptr;
4333 de = dictFind(zs->dict,c->argv[2]);
4334 if (!de) {
4335 addReply(c,shared.nullbulk);
4336 } else {
4337 char buf[128];
4338 double *score = dictGetEntryVal(de);
4339
4340 snprintf(buf,sizeof(buf),"%.16g",*score);
4341 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4342 strlen(buf),buf));
4343 }
4344 }
4345 }
4346 }
4347
4348 /* ========================= Non type-specific commands ==================== */
4349
4350 static void flushdbCommand(redisClient *c) {
4351 server.dirty += dictSize(c->db->dict);
4352 dictEmpty(c->db->dict);
4353 dictEmpty(c->db->expires);
4354 addReply(c,shared.ok);
4355 }
4356
4357 static void flushallCommand(redisClient *c) {
4358 server.dirty += emptyDb();
4359 addReply(c,shared.ok);
4360 rdbSave(server.dbfilename);
4361 server.dirty++;
4362 }
4363
4364 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4365 redisSortOperation *so = zmalloc(sizeof(*so));
4366 so->type = type;
4367 so->pattern = pattern;
4368 return so;
4369 }
4370
4371 /* Return the value associated to the key with a name obtained
4372 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4373 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4374 char *p;
4375 sds spat, ssub;
4376 robj keyobj;
4377 int prefixlen, sublen, postfixlen;
4378 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4379 struct {
4380 long len;
4381 long free;
4382 char buf[REDIS_SORTKEY_MAX+1];
4383 } keyname;
4384
4385 if (subst->encoding == REDIS_ENCODING_RAW)
4386 incrRefCount(subst);
4387 else {
4388 subst = getDecodedObject(subst);
4389 }
4390
4391 spat = pattern->ptr;
4392 ssub = subst->ptr;
4393 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4394 p = strchr(spat,'*');
4395 if (!p) return NULL;
4396
4397 prefixlen = p-spat;
4398 sublen = sdslen(ssub);
4399 postfixlen = sdslen(spat)-(prefixlen+1);
4400 memcpy(keyname.buf,spat,prefixlen);
4401 memcpy(keyname.buf+prefixlen,ssub,sublen);
4402 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4403 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4404 keyname.len = prefixlen+sublen+postfixlen;
4405
4406 keyobj.refcount = 1;
4407 keyobj.type = REDIS_STRING;
4408 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4409
4410 decrRefCount(subst);
4411
4412 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4413 return lookupKeyRead(db,&keyobj);
4414 }
4415
4416 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4417 * the additional parameter is not standard but a BSD-specific we have to
4418 * pass sorting parameters via the global 'server' structure */
4419 static int sortCompare(const void *s1, const void *s2) {
4420 const redisSortObject *so1 = s1, *so2 = s2;
4421 int cmp;
4422
4423 if (!server.sort_alpha) {
4424 /* Numeric sorting. Here it's trivial as we precomputed scores */
4425 if (so1->u.score > so2->u.score) {
4426 cmp = 1;
4427 } else if (so1->u.score < so2->u.score) {
4428 cmp = -1;
4429 } else {
4430 cmp = 0;
4431 }
4432 } else {
4433 /* Alphanumeric sorting */
4434 if (server.sort_bypattern) {
4435 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4436 /* At least one compare object is NULL */
4437 if (so1->u.cmpobj == so2->u.cmpobj)
4438 cmp = 0;
4439 else if (so1->u.cmpobj == NULL)
4440 cmp = -1;
4441 else
4442 cmp = 1;
4443 } else {
4444 /* We have both the objects, use strcoll */
4445 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4446 }
4447 } else {
4448 /* Compare elements directly */
4449 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4450 so2->obj->encoding == REDIS_ENCODING_RAW) {
4451 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4452 } else {
4453 robj *dec1, *dec2;
4454
4455 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4456 so1->obj : getDecodedObject(so1->obj);
4457 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4458 so2->obj : getDecodedObject(so2->obj);
4459 cmp = strcoll(dec1->ptr,dec2->ptr);
4460 if (dec1 != so1->obj) decrRefCount(dec1);
4461 if (dec2 != so2->obj) decrRefCount(dec2);
4462 }
4463 }
4464 }
4465 return server.sort_desc ? -cmp : cmp;
4466 }
4467
4468 /* The SORT command is the most complex command in Redis. Warning: this code
4469 * is optimized for speed and a bit less for readability */
4470 static void sortCommand(redisClient *c) {
4471 list *operations;
4472 int outputlen = 0;
4473 int desc = 0, alpha = 0;
4474 int limit_start = 0, limit_count = -1, start, end;
4475 int j, dontsort = 0, vectorlen;
4476 int getop = 0; /* GET operation counter */
4477 robj *sortval, *sortby = NULL;
4478 redisSortObject *vector; /* Resulting vector to sort */
4479
4480 /* Lookup the key to sort. It must be of the right types */
4481 sortval = lookupKeyRead(c->db,c->argv[1]);
4482 if (sortval == NULL) {
4483 addReply(c,shared.nokeyerr);
4484 return;
4485 }
4486 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4487 addReply(c,shared.wrongtypeerr);
4488 return;
4489 }
4490
4491 /* Create a list of operations to perform for every sorted element.
4492 * Operations can be GET/DEL/INCR/DECR */
4493 operations = listCreate();
4494 listSetFreeMethod(operations,zfree);
4495 j = 2;
4496
4497 /* Now we need to protect sortval incrementing its count, in the future
4498 * SORT may have options able to overwrite/delete keys during the sorting
4499 * and the sorted key itself may get destroied */
4500 incrRefCount(sortval);
4501
4502 /* The SORT command has an SQL-alike syntax, parse it */
4503 while(j < c->argc) {
4504 int leftargs = c->argc-j-1;
4505 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4506 desc = 0;
4507 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4508 desc = 1;
4509 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4510 alpha = 1;
4511 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4512 limit_start = atoi(c->argv[j+1]->ptr);
4513 limit_count = atoi(c->argv[j+2]->ptr);
4514 j+=2;
4515 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4516 sortby = c->argv[j+1];
4517 /* If the BY pattern does not contain '*', i.e. it is constant,
4518 * we don't need to sort nor to lookup the weight keys. */
4519 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4520 j++;
4521 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4522 listAddNodeTail(operations,createSortOperation(
4523 REDIS_SORT_GET,c->argv[j+1]));
4524 getop++;
4525 j++;
4526 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4527 listAddNodeTail(operations,createSortOperation(
4528 REDIS_SORT_DEL,c->argv[j+1]));
4529 j++;
4530 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4531 listAddNodeTail(operations,createSortOperation(
4532 REDIS_SORT_INCR,c->argv[j+1]));
4533 j++;
4534 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4535 listAddNodeTail(operations,createSortOperation(
4536 REDIS_SORT_DECR,c->argv[j+1]));
4537 j++;
4538 } else {
4539 decrRefCount(sortval);
4540 listRelease(operations);
4541 addReply(c,shared.syntaxerr);
4542 return;
4543 }
4544 j++;
4545 }
4546
4547 /* Load the sorting vector with all the objects to sort */
4548 vectorlen = (sortval->type == REDIS_LIST) ?
4549 listLength((list*)sortval->ptr) :
4550 dictSize((dict*)sortval->ptr);
4551 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4552 j = 0;
4553 if (sortval->type == REDIS_LIST) {
4554 list *list = sortval->ptr;
4555 listNode *ln;
4556
4557 listRewind(list);
4558 while((ln = listYield(list))) {
4559 robj *ele = ln->value;
4560 vector[j].obj = ele;
4561 vector[j].u.score = 0;
4562 vector[j].u.cmpobj = NULL;
4563 j++;
4564 }
4565 } else {
4566 dict *set = sortval->ptr;
4567 dictIterator *di;
4568 dictEntry *setele;
4569
4570 di = dictGetIterator(set);
4571 while((setele = dictNext(di)) != NULL) {
4572 vector[j].obj = dictGetEntryKey(setele);
4573 vector[j].u.score = 0;
4574 vector[j].u.cmpobj = NULL;
4575 j++;
4576 }
4577 dictReleaseIterator(di);
4578 }
4579 assert(j == vectorlen);
4580
4581 /* Now it's time to load the right scores in the sorting vector */
4582 if (dontsort == 0) {
4583 for (j = 0; j < vectorlen; j++) {
4584 if (sortby) {
4585 robj *byval;
4586
4587 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4588 if (!byval || byval->type != REDIS_STRING) continue;
4589 if (alpha) {
4590 if (byval->encoding == REDIS_ENCODING_RAW) {
4591 vector[j].u.cmpobj = byval;
4592 incrRefCount(byval);
4593 } else {
4594 vector[j].u.cmpobj = getDecodedObject(byval);
4595 }
4596 } else {
4597 if (byval->encoding == REDIS_ENCODING_RAW) {
4598 vector[j].u.score = strtod(byval->ptr,NULL);
4599 } else {
4600 if (byval->encoding == REDIS_ENCODING_INT) {
4601 vector[j].u.score = (long)byval->ptr;
4602 } else
4603 assert(1 != 1);
4604 }
4605 }
4606 } else {
4607 if (!alpha) {
4608 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4609 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4610 else {
4611 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4612 vector[j].u.score = (long) vector[j].obj->ptr;
4613 else
4614 assert(1 != 1);
4615 }
4616 }
4617 }
4618 }
4619 }
4620
4621 /* We are ready to sort the vector... perform a bit of sanity check
4622 * on the LIMIT option too. We'll use a partial version of quicksort. */
4623 start = (limit_start < 0) ? 0 : limit_start;
4624 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4625 if (start >= vectorlen) {
4626 start = vectorlen-1;
4627 end = vectorlen-2;
4628 }
4629 if (end >= vectorlen) end = vectorlen-1;
4630
4631 if (dontsort == 0) {
4632 server.sort_desc = desc;
4633 server.sort_alpha = alpha;
4634 server.sort_bypattern = sortby ? 1 : 0;
4635 if (sortby && (start != 0 || end != vectorlen-1))
4636 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4637 else
4638 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4639 }
4640
4641 /* Send command output to the output buffer, performing the specified
4642 * GET/DEL/INCR/DECR operations if any. */
4643 outputlen = getop ? getop*(end-start+1) : end-start+1;
4644 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4645 for (j = start; j <= end; j++) {
4646 listNode *ln;
4647 if (!getop) {
4648 addReplyBulkLen(c,vector[j].obj);
4649 addReply(c,vector[j].obj);
4650 addReply(c,shared.crlf);
4651 }
4652 listRewind(operations);
4653 while((ln = listYield(operations))) {
4654 redisSortOperation *sop = ln->value;
4655 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4656 vector[j].obj);
4657
4658 if (sop->type == REDIS_SORT_GET) {
4659 if (!val || val->type != REDIS_STRING) {
4660 addReply(c,shared.nullbulk);
4661 } else {
4662 addReplyBulkLen(c,val);
4663 addReply(c,val);
4664 addReply(c,shared.crlf);
4665 }
4666 } else if (sop->type == REDIS_SORT_DEL) {
4667 /* TODO */
4668 }
4669 }
4670 }
4671
4672 /* Cleanup */
4673 decrRefCount(sortval);
4674 listRelease(operations);
4675 for (j = 0; j < vectorlen; j++) {
4676 if (sortby && alpha && vector[j].u.cmpobj)
4677 decrRefCount(vector[j].u.cmpobj);
4678 }
4679 zfree(vector);
4680 }
4681
4682 static void infoCommand(redisClient *c) {
4683 sds info;
4684 time_t uptime = time(NULL)-server.stat_starttime;
4685 int j;
4686
4687 info = sdscatprintf(sdsempty(),
4688 "redis_version:%s\r\n"
4689 "arch_bits:%s\r\n"
4690 "uptime_in_seconds:%d\r\n"
4691 "uptime_in_days:%d\r\n"
4692 "connected_clients:%d\r\n"
4693 "connected_slaves:%d\r\n"
4694 "used_memory:%zu\r\n"
4695 "changes_since_last_save:%lld\r\n"
4696 "bgsave_in_progress:%d\r\n"
4697 "last_save_time:%d\r\n"
4698 "total_connections_received:%lld\r\n"
4699 "total_commands_processed:%lld\r\n"
4700 "role:%s\r\n"
4701 ,REDIS_VERSION,
4702 (sizeof(long) == 8) ? "64" : "32",
4703 uptime,
4704 uptime/(3600*24),
4705 listLength(server.clients)-listLength(server.slaves),
4706 listLength(server.slaves),
4707 server.usedmemory,
4708 server.dirty,
4709 server.bgsaveinprogress,
4710 server.lastsave,
4711 server.stat_numconnections,
4712 server.stat_numcommands,
4713 server.masterhost == NULL ? "master" : "slave"
4714 );
4715 if (server.masterhost) {
4716 info = sdscatprintf(info,
4717 "master_host:%s\r\n"
4718 "master_port:%d\r\n"
4719 "master_link_status:%s\r\n"
4720 "master_last_io_seconds_ago:%d\r\n"
4721 ,server.masterhost,
4722 server.masterport,
4723 (server.replstate == REDIS_REPL_CONNECTED) ?
4724 "up" : "down",
4725 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4726 );
4727 }
4728 for (j = 0; j < server.dbnum; j++) {
4729 long long keys, vkeys;
4730
4731 keys = dictSize(server.db[j].dict);
4732 vkeys = dictSize(server.db[j].expires);
4733 if (keys || vkeys) {
4734 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4735 j, keys, vkeys);
4736 }
4737 }
4738 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4739 addReplySds(c,info);
4740 addReply(c,shared.crlf);
4741 }
4742
4743 static void monitorCommand(redisClient *c) {
4744 /* ignore MONITOR if aleady slave or in monitor mode */
4745 if (c->flags & REDIS_SLAVE) return;
4746
4747 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4748 c->slaveseldb = 0;
4749 listAddNodeTail(server.monitors,c);
4750 addReply(c,shared.ok);
4751 }
4752
4753 /* ================================= Expire ================================= */
4754 static int removeExpire(redisDb *db, robj *key) {
4755 if (dictDelete(db->expires,key) == DICT_OK) {
4756 return 1;
4757 } else {
4758 return 0;
4759 }
4760 }
4761
4762 static int setExpire(redisDb *db, robj *key, time_t when) {
4763 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4764 return 0;
4765 } else {
4766 incrRefCount(key);
4767 return 1;
4768 }
4769 }
4770
4771 /* Return the expire time of the specified key, or -1 if no expire
4772 * is associated with this key (i.e. the key is non volatile) */
4773 static time_t getExpire(redisDb *db, robj *key) {
4774 dictEntry *de;
4775
4776 /* No expire? return ASAP */
4777 if (dictSize(db->expires) == 0 ||
4778 (de = dictFind(db->expires,key)) == NULL) return -1;
4779
4780 return (time_t) dictGetEntryVal(de);
4781 }
4782
4783 static int expireIfNeeded(redisDb *db, robj *key) {
4784 time_t when;
4785 dictEntry *de;
4786
4787 /* No expire? return ASAP */
4788 if (dictSize(db->expires) == 0 ||
4789 (de = dictFind(db->expires,key)) == NULL) return 0;
4790
4791 /* Lookup the expire */
4792 when = (time_t) dictGetEntryVal(de);
4793 if (time(NULL) <= when) return 0;
4794
4795 /* Delete the key */
4796 dictDelete(db->expires,key);
4797 return dictDelete(db->dict,key) == DICT_OK;
4798 }
4799
4800 static int deleteIfVolatile(redisDb *db, robj *key) {
4801 dictEntry *de;
4802
4803 /* No expire? return ASAP */
4804 if (dictSize(db->expires) == 0 ||
4805 (de = dictFind(db->expires,key)) == NULL) return 0;
4806
4807 /* Delete the key */
4808 server.dirty++;
4809 dictDelete(db->expires,key);
4810 return dictDelete(db->dict,key) == DICT_OK;
4811 }
4812
4813 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4814 dictEntry *de;
4815
4816 de = dictFind(c->db->dict,key);
4817 if (de == NULL) {
4818 addReply(c,shared.czero);
4819 return;
4820 }
4821 if (seconds < 0) {
4822 if (deleteKey(c->db,key)) server.dirty++;
4823 addReply(c, shared.cone);
4824 return;
4825 } else {
4826 time_t when = time(NULL)+seconds;
4827 if (setExpire(c->db,key,when)) {
4828 addReply(c,shared.cone);
4829 server.dirty++;
4830 } else {
4831 addReply(c,shared.czero);
4832 }
4833 return;
4834 }
4835 }
4836
4837 static void expireCommand(redisClient *c) {
4838 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
4839 }
4840
4841 static void expireatCommand(redisClient *c) {
4842 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
4843 }
4844
4845 static void ttlCommand(redisClient *c) {
4846 time_t expire;
4847 int ttl = -1;
4848
4849 expire = getExpire(c->db,c->argv[1]);
4850 if (expire != -1) {
4851 ttl = (int) (expire-time(NULL));
4852 if (ttl < 0) ttl = -1;
4853 }
4854 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4855 }
4856
4857 static void msetGenericCommand(redisClient *c, int nx) {
4858 int j;
4859
4860 if ((c->argc % 2) == 0) {
4861 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4862 return;
4863 }
4864 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4865 * set nothing at all if at least one already key exists. */
4866 if (nx) {
4867 for (j = 1; j < c->argc; j += 2) {
4868 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4869 addReply(c, shared.czero);
4870 return;
4871 }
4872 }
4873 }
4874
4875 for (j = 1; j < c->argc; j += 2) {
4876 int retval;
4877
4878 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4879 if (retval == DICT_ERR) {
4880 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4881 incrRefCount(c->argv[j+1]);
4882 } else {
4883 incrRefCount(c->argv[j]);
4884 incrRefCount(c->argv[j+1]);
4885 }
4886 removeExpire(c->db,c->argv[j]);
4887 }
4888 server.dirty += (c->argc-1)/2;
4889 addReply(c, nx ? shared.cone : shared.ok);
4890 }
4891
4892 static void msetCommand(redisClient *c) {
4893 msetGenericCommand(c,0);
4894 }
4895
4896 static void msetnxCommand(redisClient *c) {
4897 msetGenericCommand(c,1);
4898 }
4899
4900 /* =============================== Replication ============================= */
4901
4902 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4903 ssize_t nwritten, ret = size;
4904 time_t start = time(NULL);
4905
4906 timeout++;
4907 while(size) {
4908 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4909 nwritten = write(fd,ptr,size);
4910 if (nwritten == -1) return -1;
4911 ptr += nwritten;
4912 size -= nwritten;
4913 }
4914 if ((time(NULL)-start) > timeout) {
4915 errno = ETIMEDOUT;
4916 return -1;
4917 }
4918 }
4919 return ret;
4920 }
4921
4922 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4923 ssize_t nread, totread = 0;
4924 time_t start = time(NULL);
4925
4926 timeout++;
4927 while(size) {
4928 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4929 nread = read(fd,ptr,size);
4930 if (nread == -1) return -1;
4931 ptr += nread;
4932 size -= nread;
4933 totread += nread;
4934 }
4935 if ((time(NULL)-start) > timeout) {
4936 errno = ETIMEDOUT;
4937 return -1;
4938 }
4939 }
4940 return totread;
4941 }
4942
/* Read a line (terminated by '\n') from 'fd' into the buffer 'ptr' of
 * 'size' bytes, one byte at a time via syncRead().
 *
 * The newline, and a '\r' immediately before it, are stripped; the result
 * is always null terminated. At most size-1 characters are stored: if the
 * buffer fills up before a newline is seen the function returns what was
 * read so far.
 *
 * Returns the number of characters read before the newline (this count
 * still includes a stripped '\r'), or -1 if syncRead() fails or 'size' is
 * smaller than 1. 'timeout' is applied to each single byte read, not to
 * the whole line. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* There must be room at least for the null terminator. */
    if (size < 1) return -1;
    *ptr = '\0';

    size--; /* reserve one byte for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One less byte available: without this the loop never stops on
         * long lines and writes past the end of the buffer. */
        size--;
    }
    return nread;
}
4963
4964 static void syncCommand(redisClient *c) {
4965 /* ignore SYNC if aleady slave or in monitor mode */
4966 if (c->flags & REDIS_SLAVE) return;
4967
4968 /* SYNC can't be issued when the server has pending data to send to
4969 * the client about already issued commands. We need a fresh reply
4970 * buffer registering the differences between the BGSAVE and the current
4971 * dataset, so that we can copy to other slaves if needed. */
4972 if (listLength(c->reply) != 0) {
4973 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4974 return;
4975 }
4976
4977 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4978 /* Here we need to check if there is a background saving operation
4979 * in progress, or if it is required to start one */
4980 if (server.bgsaveinprogress) {
4981 /* Ok a background save is in progress. Let's check if it is a good
4982 * one for replication, i.e. if there is another slave that is
4983 * registering differences since the server forked to save */
4984 redisClient *slave;
4985 listNode *ln;
4986
4987 listRewind(server.slaves);
4988 while((ln = listYield(server.slaves))) {
4989 slave = ln->value;
4990 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4991 }
4992 if (ln) {
4993 /* Perfect, the server is already registering differences for
4994 * another slave. Set the right state, and copy the buffer. */
4995 listRelease(c->reply);
4996 c->reply = listDup(slave->reply);
4997 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4998 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4999 } else {
5000 /* No way, we need to wait for the next BGSAVE in order to
5001 * register differences */
5002 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5003 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5004 }
5005 } else {
5006 /* Ok we don't have a BGSAVE in progress, let's start one */
5007 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5008 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5009 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5010 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5011 return;
5012 }
5013 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5014 }
5015 c->repldbfd = -1;
5016 c->flags |= REDIS_SLAVE;
5017 c->slaveseldb = 0;
5018 listAddNodeTail(server.slaves,c);
5019 return;
5020 }
5021
5022 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5023 redisClient *slave = privdata;
5024 REDIS_NOTUSED(el);
5025 REDIS_NOTUSED(mask);
5026 char buf[REDIS_IOBUF_LEN];
5027 ssize_t nwritten, buflen;
5028
5029 if (slave->repldboff == 0) {
5030 /* Write the bulk write count before to transfer the DB. In theory here
5031 * we don't know how much room there is in the output buffer of the
5032 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5033 * operations) will never be smaller than the few bytes we need. */
5034 sds bulkcount;
5035
5036 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5037 slave->repldbsize);
5038 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5039 {
5040 sdsfree(bulkcount);
5041 freeClient(slave);
5042 return;
5043 }
5044 sdsfree(bulkcount);
5045 }
5046 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5047 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5048 if (buflen <= 0) {
5049 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5050 (buflen == 0) ? "premature EOF" : strerror(errno));
5051 freeClient(slave);
5052 return;
5053 }
5054 if ((nwritten = write(fd,buf,buflen)) == -1) {
5055 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5056 strerror(errno));
5057 freeClient(slave);
5058 return;
5059 }
5060 slave->repldboff += nwritten;
5061 if (slave->repldboff == slave->repldbsize) {
5062 close(slave->repldbfd);
5063 slave->repldbfd = -1;
5064 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5065 slave->replstate = REDIS_REPL_ONLINE;
5066 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5067 sendReplyToClient, slave, NULL) == AE_ERR) {
5068 freeClient(slave);
5069 return;
5070 }
5071 addReplySds(slave,sdsempty());
5072 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5073 }
5074 }
5075
5076 /* This function is called at the end of every backgrond saving.
5077 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5078 * otherwise REDIS_ERR is passed to the function.
5079 *
5080 * The goal of this function is to handle slaves waiting for a successful
5081 * background saving in order to perform non-blocking synchronization. */
5082 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5083 listNode *ln;
5084 int startbgsave = 0;
5085
5086 listRewind(server.slaves);
5087 while((ln = listYield(server.slaves))) {
5088 redisClient *slave = ln->value;
5089
5090 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5091 startbgsave = 1;
5092 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5093 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5094 struct redis_stat buf;
5095
5096 if (bgsaveerr != REDIS_OK) {
5097 freeClient(slave);
5098 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5099 continue;
5100 }
5101 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5102 redis_fstat(slave->repldbfd,&buf) == -1) {
5103 freeClient(slave);
5104 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5105 continue;
5106 }
5107 slave->repldboff = 0;
5108 slave->repldbsize = buf.st_size;
5109 slave->replstate = REDIS_REPL_SEND_BULK;
5110 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5111 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5112 freeClient(slave);
5113 continue;
5114 }
5115 }
5116 }
5117 if (startbgsave) {
5118 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5119 listRewind(server.slaves);
5120 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5121 while((ln = listYield(server.slaves))) {
5122 redisClient *slave = ln->value;
5123
5124 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5125 freeClient(slave);
5126 }
5127 }
5128 }
5129 }
5130
5131 static int syncWithMaster(void) {
5132 char buf[1024], tmpfile[256];
5133 int dumpsize;
5134 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5135 int dfd;
5136
5137 if (fd == -1) {
5138 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5139 strerror(errno));
5140 return REDIS_ERR;
5141 }
5142 /* Issue the SYNC command */
5143 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5144 close(fd);
5145 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5146 strerror(errno));
5147 return REDIS_ERR;
5148 }
5149 /* Read the bulk write count */
5150 if (syncReadLine(fd,buf,1024,3600) == -1) {
5151 close(fd);
5152 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5153 strerror(errno));
5154 return REDIS_ERR;
5155 }
5156 if (buf[0] != '$') {
5157 close(fd);
5158 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5159 return REDIS_ERR;
5160 }
5161 dumpsize = atoi(buf+1);
5162 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5163 /* Read the bulk write data on a temp file */
5164 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5165 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5166 if (dfd == -1) {
5167 close(fd);
5168 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5169 return REDIS_ERR;
5170 }
5171 while(dumpsize) {
5172 int nread, nwritten;
5173
5174 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5175 if (nread == -1) {
5176 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5177 strerror(errno));
5178 close(fd);
5179 close(dfd);
5180 return REDIS_ERR;
5181 }
5182 nwritten = write(dfd,buf,nread);
5183 if (nwritten == -1) {
5184 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5185 close(fd);
5186 close(dfd);
5187 return REDIS_ERR;
5188 }
5189 dumpsize -= nread;
5190 }
5191 close(dfd);
5192 if (rename(tmpfile,server.dbfilename) == -1) {
5193 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5194 unlink(tmpfile);
5195 close(fd);
5196 return REDIS_ERR;
5197 }
5198 emptyDb();
5199 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5200 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5201 close(fd);
5202 return REDIS_ERR;
5203 }
5204 server.master = createClient(fd);
5205 server.master->flags |= REDIS_MASTER;
5206 server.replstate = REDIS_REPL_CONNECTED;
5207 return REDIS_OK;
5208 }
5209
5210 static void slaveofCommand(redisClient *c) {
5211 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5212 !strcasecmp(c->argv[2]->ptr,"one")) {
5213 if (server.masterhost) {
5214 sdsfree(server.masterhost);
5215 server.masterhost = NULL;
5216 if (server.master) freeClient(server.master);
5217 server.replstate = REDIS_REPL_NONE;
5218 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5219 }
5220 } else {
5221 sdsfree(server.masterhost);
5222 server.masterhost = sdsdup(c->argv[1]->ptr);
5223 server.masterport = atoi(c->argv[2]->ptr);
5224 if (server.master) freeClient(server.master);
5225 server.replstate = REDIS_REPL_CONNECT;
5226 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5227 server.masterhost, server.masterport);
5228 }
5229 addReply(c,shared.ok);
5230 }
5231
5232 /* ============================ Maxmemory directive ======================== */
5233
5234 /* This function gets called when 'maxmemory' is set on the config file to limit
5235 * the max memory used by the server, and we are out of memory.
5236 * This function will try to, in order:
5237 *
5238 * - Free objects from the free list
5239 * - Try to remove keys with an EXPIRE set
5240 *
5241 * It is not possible to free enough memory to reach used-memory < maxmemory
5242 * the server will start refusing commands that will enlarge even more the
5243 * memory usage.
5244 */
5245 static void freeMemoryIfNeeded(void) {
5246 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5247 if (listLength(server.objfreelist)) {
5248 robj *o;
5249
5250 listNode *head = listFirst(server.objfreelist);
5251 o = listNodeValue(head);
5252 listDelNode(server.objfreelist,head);
5253 zfree(o);
5254 } else {
5255 int j, k, freed = 0;
5256
5257 for (j = 0; j < server.dbnum; j++) {
5258 int minttl = -1;
5259 robj *minkey = NULL;
5260 struct dictEntry *de;
5261
5262 if (dictSize(server.db[j].expires)) {
5263 freed = 1;
5264 /* From a sample of three keys drop the one nearest to
5265 * the natural expire */
5266 for (k = 0; k < 3; k++) {
5267 time_t t;
5268
5269 de = dictGetRandomKey(server.db[j].expires);
5270 t = (time_t) dictGetEntryVal(de);
5271 if (minttl == -1 || t < minttl) {
5272 minkey = dictGetEntryKey(de);
5273 minttl = t;
5274 }
5275 }
5276 deleteKey(server.db+j,minkey);
5277 }
5278 }
5279 if (!freed) return; /* nothing to free... */
5280 }
5281 }
5282 }
5283
5284 /* ================================= Debugging ============================== */
5285
5286 static void debugCommand(redisClient *c) {
5287 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5288 *((char*)-1) = 'x';
5289 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5290 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5291 robj *key, *val;
5292
5293 if (!de) {
5294 addReply(c,shared.nokeyerr);
5295 return;
5296 }
5297 key = dictGetEntryKey(de);
5298 val = dictGetEntryVal(de);
5299 addReplySds(c,sdscatprintf(sdsempty(),
5300 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5301 key, key->refcount, val, val->refcount, val->encoding));
5302 } else {
5303 addReplySds(c,sdsnew(
5304 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5305 }
5306 }
5307
5308 #ifdef HAVE_BACKTRACE
5309 static struct redisFunctionSym symsTable[] = {
5310 {"compareStringObjects", (unsigned long)compareStringObjects},
5311 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5312 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5313 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5314 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5315 {"freeStringObject", (unsigned long)freeStringObject},
5316 {"freeListObject", (unsigned long)freeListObject},
5317 {"freeSetObject", (unsigned long)freeSetObject},
5318 {"decrRefCount", (unsigned long)decrRefCount},
5319 {"createObject", (unsigned long)createObject},
5320 {"freeClient", (unsigned long)freeClient},
5321 {"rdbLoad", (unsigned long)rdbLoad},
5322 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5323 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5324 {"addReply", (unsigned long)addReply},
5325 {"addReplySds", (unsigned long)addReplySds},
5326 {"incrRefCount", (unsigned long)incrRefCount},
5327 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5328 {"createStringObject", (unsigned long)createStringObject},
5329 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5330 {"syncWithMaster", (unsigned long)syncWithMaster},
5331 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5332 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5333 {"getDecodedObject", (unsigned long)getDecodedObject},
5334 {"removeExpire", (unsigned long)removeExpire},
5335 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5336 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5337 {"deleteKey", (unsigned long)deleteKey},
5338 {"getExpire", (unsigned long)getExpire},
5339 {"setExpire", (unsigned long)setExpire},
5340 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5341 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5342 {"authCommand", (unsigned long)authCommand},
5343 {"pingCommand", (unsigned long)pingCommand},
5344 {"echoCommand", (unsigned long)echoCommand},
5345 {"setCommand", (unsigned long)setCommand},
5346 {"setnxCommand", (unsigned long)setnxCommand},
5347 {"getCommand", (unsigned long)getCommand},
5348 {"delCommand", (unsigned long)delCommand},
5349 {"existsCommand", (unsigned long)existsCommand},
5350 {"incrCommand", (unsigned long)incrCommand},
5351 {"decrCommand", (unsigned long)decrCommand},
5352 {"incrbyCommand", (unsigned long)incrbyCommand},
5353 {"decrbyCommand", (unsigned long)decrbyCommand},
5354 {"selectCommand", (unsigned long)selectCommand},
5355 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5356 {"keysCommand", (unsigned long)keysCommand},
5357 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5358 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5359 {"saveCommand", (unsigned long)saveCommand},
5360 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5361 {"shutdownCommand", (unsigned long)shutdownCommand},
5362 {"moveCommand", (unsigned long)moveCommand},
5363 {"renameCommand", (unsigned long)renameCommand},
5364 {"renamenxCommand", (unsigned long)renamenxCommand},
5365 {"lpushCommand", (unsigned long)lpushCommand},
5366 {"rpushCommand", (unsigned long)rpushCommand},
5367 {"lpopCommand", (unsigned long)lpopCommand},
5368 {"rpopCommand", (unsigned long)rpopCommand},
5369 {"llenCommand", (unsigned long)llenCommand},
5370 {"lindexCommand", (unsigned long)lindexCommand},
5371 {"lrangeCommand", (unsigned long)lrangeCommand},
5372 {"ltrimCommand", (unsigned long)ltrimCommand},
5373 {"typeCommand", (unsigned long)typeCommand},
5374 {"lsetCommand", (unsigned long)lsetCommand},
5375 {"saddCommand", (unsigned long)saddCommand},
5376 {"sremCommand", (unsigned long)sremCommand},
5377 {"smoveCommand", (unsigned long)smoveCommand},
5378 {"sismemberCommand", (unsigned long)sismemberCommand},
5379 {"scardCommand", (unsigned long)scardCommand},
5380 {"spopCommand", (unsigned long)spopCommand},
5381 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5382 {"sinterCommand", (unsigned long)sinterCommand},
5383 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5384 {"sunionCommand", (unsigned long)sunionCommand},
5385 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5386 {"sdiffCommand", (unsigned long)sdiffCommand},
5387 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5388 {"syncCommand", (unsigned long)syncCommand},
5389 {"flushdbCommand", (unsigned long)flushdbCommand},
5390 {"flushallCommand", (unsigned long)flushallCommand},
5391 {"sortCommand", (unsigned long)sortCommand},
5392 {"lremCommand", (unsigned long)lremCommand},
5393 {"infoCommand", (unsigned long)infoCommand},
5394 {"mgetCommand", (unsigned long)mgetCommand},
5395 {"monitorCommand", (unsigned long)monitorCommand},
5396 {"expireCommand", (unsigned long)expireCommand},
5397 {"expireatCommand", (unsigned long)expireatCommand},
5398 {"getsetCommand", (unsigned long)getsetCommand},
5399 {"ttlCommand", (unsigned long)ttlCommand},
5400 {"slaveofCommand", (unsigned long)slaveofCommand},
5401 {"debugCommand", (unsigned long)debugCommand},
5402 {"processCommand", (unsigned long)processCommand},
5403 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5404 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5405 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5406 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5407 {"msetCommand", (unsigned long)msetCommand},
5408 {"msetnxCommand", (unsigned long)msetnxCommand},
5409 {"zslCreateNode", (unsigned long)zslCreateNode},
5410 {"zslCreate", (unsigned long)zslCreate},
5411 {"zslFreeNode",(unsigned long)zslFreeNode},
5412 {"zslFree",(unsigned long)zslFree},
5413 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5414 {"zslInsert",(unsigned long)zslInsert},
5415 {"zslDelete",(unsigned long)zslDelete},
5416 {"createZsetObject",(unsigned long)createZsetObject},
5417 {"zaddCommand",(unsigned long)zaddCommand},
5418 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5419 {"zrangeCommand",(unsigned long)zrangeCommand},
5420 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5421 {"zremCommand",(unsigned long)zremCommand},
5422 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5423 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5424 {NULL,0}
5425 };
5426
5427 /* This function try to convert a pointer into a function name. It's used in
5428 * oreder to provide a backtrace under segmentation fault that's able to
5429 * display functions declared as static (otherwise the backtrace is useless). */
5430 static char *findFuncName(void *pointer, unsigned long *offset){
5431 int i, ret = -1;
5432 unsigned long off, minoff = 0;
5433
5434 /* Try to match against the Symbol with the smallest offset */
5435 for (i=0; symsTable[i].pointer; i++) {
5436 unsigned long lp = (unsigned long) pointer;
5437
5438 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5439 off=lp-symsTable[i].pointer;
5440 if (ret < 0 || off < minoff) {
5441 minoff=off;
5442 ret=i;
5443 }
5444 }
5445 }
5446 if (ret == -1) return NULL;
5447 *offset = minoff;
5448 return symsTable[ret].name;
5449 }
5450
/* Return the instruction pointer saved in the signal context 'uc', or NULL
 * on platforms this function doesn't know how to handle.
 *
 * On Linux x86-64 the register is REG_RIP (REG_EIP exists only on 32 bit
 * x86) and the compiler macro is __x86_64__. The branch is taken only when
 * the C library exposes REG_RIP, otherwise we fall back to NULL. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif (defined(__x86_64__) || defined(__X86_64__)) && defined(REG_RIP) /* Linux x86-64 */
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    (void) uc;
    return NULL;
#endif
}
5472
5473 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5474 void *trace[100];
5475 char **messages = NULL;
5476 int i, trace_size = 0;
5477 unsigned long offset=0;
5478 time_t uptime = time(NULL)-server.stat_starttime;
5479 ucontext_t *uc = (ucontext_t*) secret;
5480 REDIS_NOTUSED(info);
5481
5482 redisLog(REDIS_WARNING,
5483 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5484 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5485 "redis_version:%s; "
5486 "uptime_in_seconds:%d; "
5487 "connected_clients:%d; "
5488 "connected_slaves:%d; "
5489 "used_memory:%zu; "
5490 "changes_since_last_save:%lld; "
5491 "bgsave_in_progress:%d; "
5492 "last_save_time:%d; "
5493 "total_connections_received:%lld; "
5494 "total_commands_processed:%lld; "
5495 "role:%s;"
5496 ,REDIS_VERSION,
5497 uptime,
5498 listLength(server.clients)-listLength(server.slaves),
5499 listLength(server.slaves),
5500 server.usedmemory,
5501 server.dirty,
5502 server.bgsaveinprogress,
5503 server.lastsave,
5504 server.stat_numconnections,
5505 server.stat_numcommands,
5506 server.masterhost == NULL ? "master" : "slave"
5507 ));
5508
5509 trace_size = backtrace(trace, 100);
5510 /* overwrite sigaction with caller's address */
5511 if (getMcontextEip(uc) != NULL) {
5512 trace[1] = getMcontextEip(uc);
5513 }
5514 messages = backtrace_symbols(trace, trace_size);
5515
5516 for (i=1; i<trace_size; ++i) {
5517 char *fn = findFuncName(trace[i], &offset), *p;
5518
5519 p = strchr(messages[i],'+');
5520 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5521 redisLog(REDIS_WARNING,"%s", messages[i]);
5522 } else {
5523 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5524 }
5525 }
5526 free(messages);
5527 exit(0);
5528 }
5529
5530 static void setupSigSegvAction(void) {
5531 struct sigaction act;
5532
5533 sigemptyset (&act.sa_mask);
5534 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5535 * is used. Otherwise, sa_handler is used */
5536 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5537 act.sa_sigaction = segvHandler;
5538 sigaction (SIGSEGV, &act, NULL);
5539 sigaction (SIGBUS, &act, NULL);
5540 sigaction (SIGFPE, &act, NULL);
5541 sigaction (SIGILL, &act, NULL);
5542 sigaction (SIGBUS, &act, NULL);
5543 return;
5544 }
5545 #else /* HAVE_BACKTRACE */
/* Version used when HAVE_BACKTRACE is not defined: no handler is
 * installed. */
static void setupSigSegvAction(void) {
    return;
}
5548 #endif /* HAVE_BACKTRACE */
5549
5550 /* =================================== Main! ================================ */
5551
5552 #ifdef __linux__
/* Return the value stored in /proc/sys/vm/overcommit_memory converted with
 * atoi(), or -1 if the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return value;
    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
5566
5567 void linuxOvercommitMemoryWarning(void) {
5568 if (linuxOvercommitMemoryValue() == 0) {
5569 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5570 }
5571 }
5572 #endif /* __linux__ */
5573
5574 static void daemonize(void) {
5575 int fd;
5576 FILE *fp;
5577
5578 if (fork() != 0) exit(0); /* parent exits */
5579 setsid(); /* create a new session */
5580
5581 /* Every output goes to /dev/null. If Redis is daemonized but
5582 * the 'logfile' is set to 'stdout' in the configuration file
5583 * it will not log at all. */
5584 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5585 dup2(fd, STDIN_FILENO);
5586 dup2(fd, STDOUT_FILENO);
5587 dup2(fd, STDERR_FILENO);
5588 if (fd > STDERR_FILENO) close(fd);
5589 }
5590 /* Try to write the pid file */
5591 fp = fopen(server.pidfile,"w");
5592 if (fp) {
5593 fprintf(fp,"%d\n",getpid());
5594 fclose(fp);
5595 }
5596 }
5597
5598 int main(int argc, char **argv) {
5599 initServerConfig();
5600 if (argc == 2) {
5601 ResetServerSaveParams();
5602 loadServerConfig(argv[1]);
5603 } else if (argc > 2) {
5604 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5605 exit(1);
5606 } else {
5607 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5608 }
5609 initServer();
5610 if (server.daemonize) daemonize();
5611 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5612 #ifdef __linux__
5613 linuxOvercommitMemoryWarning();
5614 #endif
5615 if (rdbLoad(server.dbfilename) == REDIS_OK)
5616 redisLog(REDIS_NOTICE,"DB loaded from disk");
5617 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5618 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5619 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5620 aeMain(server.el);
5621 aeDeleteEventLoop(server.el);
5622 return 0;
5623 }