38b0b686745d70722d36492937e5dfbacb3437c6
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #if defined(__sun)
63 #include "solarisfixes.h"
64 #endif
65
66 #include "redis.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75
76 /* Error codes */
77 #define REDIS_OK 0
78 #define REDIS_ERR -1
79
80 /* Static server configuration */
81 #define REDIS_SERVERPORT 6379 /* TCP port */
82 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
83 #define REDIS_IOBUF_LEN 1024
84 #define REDIS_LOADBUF_LEN 1024
85 #define REDIS_STATIC_ARGS 4
86 #define REDIS_DEFAULT_DBNUM 16
87 #define REDIS_CONFIGLINE_MAX 1024
88 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
89 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
90 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* max random volatile keys checked per DB in each serverCron() run */
91 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
92 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
93
94 /* Hash table parameters */
95 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
96
97 /* Command flags */
98 #define REDIS_CMD_BULK 1 /* Bulk command (not only writes: ECHO and SISMEMBER carry it too) */
99 #define REDIS_CMD_INLINE 2 /* Inline command */
100 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
101 this flag will return an error when the 'maxmemory' option is set in the
102 config file and the server is using more than maxmemory bytes of memory.
103 In short these commands are denied on low memory conditions. */
104 #define REDIS_CMD_DENYOOM 4
105
106 /* Object types */
107 #define REDIS_STRING 0
108 #define REDIS_LIST 1
109 #define REDIS_SET 2
110 #define REDIS_ZSET 3
111 #define REDIS_HASH 4
112
113 /* Objects encoding */
114 #define REDIS_ENCODING_RAW 0 /* Raw representation */
115 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
116
117 /* Object types only used for dumping to disk */
118 #define REDIS_EXPIRETIME 253
119 #define REDIS_SELECTDB 254
120 #define REDIS_EOF 255
121
122 /* Defines related to the dump file format. To store 32 bits lengths for short
123 * keys requires a lot of space, so we check the most significant 2 bits of
124 * the first byte to interpret the length:
125 *
126 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
127 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
128 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
129 * 11|000000 this means: specially encoded object will follow. The six bits
130 * number specify the kind of object that follows.
131 * See the REDIS_RDB_ENC_* defines.
132 *
133 * Lengths up to 63 are stored using a single byte, most DB keys, and many
134 * values, will fit inside. */
135 #define REDIS_RDB_6BITLEN 0
136 #define REDIS_RDB_14BITLEN 1
137 #define REDIS_RDB_32BITLEN 2
138 #define REDIS_RDB_ENCVAL 3
139 #define REDIS_RDB_LENERR UINT_MAX /* presumably the "length read failed" value — confirm in the RDB loading code */
140
141 /* When a length of a string object stored on disk has the first two bits
142 * set, the remaining six bits specify a special encoding for the object
143 * according to the following defines: */
144 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
145 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
146 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
147 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
148
149 /* Client flags */
150 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
151 #define REDIS_SLAVE 2 /* This client is a slave server */
152 #define REDIS_MASTER 4 /* This client is a master server */
153 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
154
155 /* Slave replication state - slave side */
156 #define REDIS_REPL_NONE 0 /* No active replication */
157 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
158 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
159
160 /* Slave replication state - from the point of view of master
161 * Note that in SEND_BULK and ONLINE state the slave receives new updates
162 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
163 * to start the next background saving in order to send updates to it. */
164 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
165 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
166 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
167 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
168
169 /* List related stuff */
170 #define REDIS_HEAD 0
171 #define REDIS_TAIL 1
172
173 /* Sort operations */
174 #define REDIS_SORT_GET 0
175 #define REDIS_SORT_DEL 1
176 #define REDIS_SORT_INCR 2
177 #define REDIS_SORT_DECR 3
178 #define REDIS_SORT_ASC 4
179 #define REDIS_SORT_DESC 5
180 #define REDIS_SORTKEY_MAX 1024
181
182 /* Log levels; redisLog() prints them as '.', '-' and '*' respectively and
 * drops messages below server.verbosity. */
183 #define REDIS_DEBUG 0
184 #define REDIS_NOTICE 1
185 #define REDIS_WARNING 2
186
187 /* Anti-warning macro... */
188 #define REDIS_NOTUSED(V) ((void) V)
189
190 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
191 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
192
193 /* Append only defines */
194 #define APPENDFSYNC_NO 0
195 #define APPENDFSYNC_ALWAYS 1
196 #define APPENDFSYNC_EVERYSEC 2
197
198 /*================================= Data types ============================== */
199
200 /* A redis object: a reference-counted value (see incrRefCount() and
 * decrRefCount()) able to hold a string / list / set / sorted set / hash. */
201 typedef struct redisObject {
202 void *ptr; /* payload; an sds string when a string is REDIS_ENCODING_RAW */
203 unsigned char type; /* one of REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
204 unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
205 unsigned char notused[2]; /* unused */
206 int refcount;
207 } robj;
208
/* One logical database: its key space plus the expire times of the keys
 * that have one ("volatile" keys). */
209 typedef struct redisDb {
210 dict *dict; /* the key space */
211 dict *expires; /* key -> expire time (unix time_t) of volatile keys */
212 int id; /* DB number — presumably its index in server.db; confirm */
213 } redisDb;
214
215 /* With multiplexing we need to keep per-client state.
216 * Clients are kept in a linked list (server.clients). */
217 typedef struct redisClient {
218 int fd;
219 redisDb *db;
220 int dictid;
221 sds querybuf;
222 robj **argv, **mbargv;
223 int argc, mbargc;
224 int bulklen; /* bulk read len. -1 if not in bulk read mode */
225 int multibulk; /* multi bulk command format active */
226 list *reply;
227 int sentlen;
228 time_t lastinteraction; /* time of the last interaction, used for timeout */
229 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
230 int slaveseldb; /* slave selected db, if this client is a slave */
231 int authenticated; /* when requirepass is non-NULL */
232 int replstate; /* replication state if this is a slave (REDIS_REPL_WAIT_BGSAVE_START..REDIS_REPL_ONLINE) */
233 int repldbfd; /* replication DB file descriptor */
234 long repldboff; /* replication DB file offset */
235 off_t repldbsize; /* replication DB file size */
236 } redisClient;
237
/* A save point: serverCron() starts a background save when server.dirty is
 * at least 'changes' and more than 'seconds' seconds have passed since
 * server.lastsave. */
238 struct saveparam {
239 time_t seconds;
240 int changes;
241 };
242
243 /* Global server state structure */
244 struct redisServer {
245 int port;
246 int fd;
247 redisDb *db;
248 dict *sharingpool;
249 unsigned int sharingpoolsize;
250 long long dirty; /* changes to DB from the last save */
251 list *clients;
252 list *slaves, *monitors;
253 char neterr[ANET_ERR_LEN];
254 aeEventLoop *el;
255 int cronloops; /* number of times the cron function ran */
256 list *objfreelist; /* A list of freed objects to avoid malloc() */
257 time_t lastsave; /* Unix time of the last successful save */
258 size_t usedmemory; /* Used memory in bytes, refreshed by serverCron() from zmalloc_used_memory() */
259 /* Fields used only for stats */
260 time_t stat_starttime; /* server start time */
261 long long stat_numcommands; /* number of processed commands */
262 long long stat_numconnections; /* number of connections received */
263 /* Configuration */
264 int verbosity; /* redisLog() drops messages with a lower level */
265 int glueoutputbuf;
266 int maxidletime; /* client idle timeout in seconds; 0 disables it */
267 int dbnum;
268 int daemonize;
269 int appendonly;
270 int appendfsync;
271 time_t lastfsync;
272 int appendfd;
273 int appendseldb;
274 char *pidfile;
275 int bgsaveinprogress;
276 pid_t bgsavechildpid; /* -1 when no background save child is known */
277 struct saveparam *saveparams;
278 int saveparamslen;
279 char *logfile; /* NULL means log to stdout */
280 char *bindaddr;
281 char *dbfilename;
282 char *appendfilename;
283 char *requirepass;
284 int shareobjects;
285 /* Replication related */
286 int isslave;
287 char *masterhost;
288 int masterport;
289 redisClient *master; /* client that is master for this slave */
290 int replstate; /* slave side: REDIS_REPL_NONE / _CONNECT / _CONNECTED */
291 unsigned int maxclients;
292 unsigned long maxmemory;
293 /* Sort parameters - qsort_r() is only available under BSD so we
294 * have to keep this state global, in order to pass it to sortCompare() */
295 int sort_desc;
296 int sort_alpha;
297 int sort_bypattern;
298 };
299
300 typedef void redisCommandProc(redisClient *c);
/* One entry of cmdTable. */
301 struct redisCommand {
302 char *name;
303 redisCommandProc *proc;
304 int arity; /* argc including the command name; negative presumably means "at least -arity" (see DEL, MGET) */
305 int flags; /* REDIS_CMD_* flags */
306 };
307
/* Name/address pair — presumably used to symbolize crash backtraces; not
 * referenced in this part of the file. */
308 struct redisFunctionSym {
309 char *name;
310 unsigned long pointer;
311 };
312
/* An element being sorted by SORT. The union holds its sort key —
 * presumably 'score' for numeric sorts and 'cmpobj' for ALPHA / BY
 * comparisons; confirm in sortCommand(). */
313 typedef struct _redisSortObject {
314 robj *obj;
315 union {
316 double score;
317 robj *cmpobj;
318 } u;
319 } redisSortObject;
320
/* A SORT side operation: 'type' is a REDIS_SORT_* value (GET, DEL, ...)
 * and 'pattern' the key pattern it applies to. */
321 typedef struct _redisSortOperation {
322 int type;
323 robj *pattern;
324 } redisSortOperation;
325
326 /* ZSETs use a specialized version of Skiplists */
327
328 typedef struct zskiplistNode {
329 struct zskiplistNode **forward; /* next node, one pointer per level */
330 struct zskiplistNode *backward; /* previous node (presumably on the lowest level) */
331 double score;
332 robj *obj;
333 } zskiplistNode;
334
335 typedef struct zskiplist {
336 struct zskiplistNode *header, *tail;
337 unsigned long length;
338 int level; /* levels currently in use, at most ZSKIPLIST_MAXLEVEL */
339 } zskiplist;
340
/* A sorted set is indexed twice: 'dict' maps each member to a zfree()able
 * value (see zsetDictType; presumably its score), 'zsl' keeps members
 * ordered by score. */
341 typedef struct zset {
342 dict *dict;
343 zskiplist *zsl;
344 } zset;
345
346 /* Our shared "common" objects, created once by createSharedObjects() */
347
348 struct sharedObjectsStruct {
349 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
350 *colon, *nullbulk, *nullmultibulk,
351 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
352 *outofrangeerr, *plus,
353 *select0, *select1, *select2, *select3, *select4,
354 *select5, *select6, *select7, *select8, *select9;
355 } shared;
356
357 /* Global vars that are actually used as constants. The following double
358 * values are used for double on-disk serialization, and are initialized
359 * at runtime to avoid strange compiler optimizations. */
360
361 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
362
363 /*================================ Prototypes =============================== */
364
/* Forward declarations of static functions defined later in this file. */
365 static void freeStringObject(robj *o);
366 static void freeListObject(robj *o);
367 static void freeSetObject(robj *o);
368 static void decrRefCount(void *o);
369 static robj *createObject(int type, void *ptr);
370 static void freeClient(redisClient *c);
371 static int rdbLoad(char *filename);
372 static void addReply(redisClient *c, robj *obj);
373 static void addReplySds(redisClient *c, sds s);
374 static void incrRefCount(robj *o);
375 static int rdbSaveBackground(char *filename);
376 static robj *createStringObject(char *ptr, size_t len);
377 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
378 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
379 static int syncWithMaster(void);
380 static robj *tryObjectSharing(robj *o);
381 static int tryObjectEncoding(robj *o);
382 static robj *getDecodedObject(const robj *o);
383 static int removeExpire(redisDb *db, robj *key);
384 static int expireIfNeeded(redisDb *db, robj *key);
385 static int deleteIfVolatile(redisDb *db, robj *key);
386 static int deleteKey(redisDb *db, robj *key);
387 static time_t getExpire(redisDb *db, robj *key);
388 static int setExpire(redisDb *db, robj *key, time_t when);
389 static void updateSlavesWaitingBgsave(int bgsaveerr);
390 static void freeMemoryIfNeeded(void);
391 static int processCommand(redisClient *c);
392 static void setupSigSegvAction(void);
393 static void rdbRemoveTempFile(pid_t childpid);
394 static size_t stringObjectLen(robj *o);
395 static void processInputBuffer(redisClient *c);
396 static zskiplist *zslCreate(void);
397 static void zslFree(zskiplist *zsl);
398 static void zslInsert(zskiplist *zsl, double score, robj *obj);
399
/* Command implementations, referenced from cmdTable. */
400 static void authCommand(redisClient *c);
401 static void pingCommand(redisClient *c);
402 static void echoCommand(redisClient *c);
403 static void setCommand(redisClient *c);
404 static void setnxCommand(redisClient *c);
405 static void getCommand(redisClient *c);
406 static void delCommand(redisClient *c);
407 static void existsCommand(redisClient *c);
408 static void incrCommand(redisClient *c);
409 static void decrCommand(redisClient *c);
410 static void incrbyCommand(redisClient *c);
411 static void decrbyCommand(redisClient *c);
412 static void selectCommand(redisClient *c);
413 static void randomkeyCommand(redisClient *c);
414 static void keysCommand(redisClient *c);
415 static void dbsizeCommand(redisClient *c);
416 static void lastsaveCommand(redisClient *c);
417 static void saveCommand(redisClient *c);
418 static void bgsaveCommand(redisClient *c);
419 static void shutdownCommand(redisClient *c);
420 static void moveCommand(redisClient *c);
421 static void renameCommand(redisClient *c);
422 static void renamenxCommand(redisClient *c);
423 static void lpushCommand(redisClient *c);
424 static void rpushCommand(redisClient *c);
425 static void lpopCommand(redisClient *c);
426 static void rpopCommand(redisClient *c);
427 static void llenCommand(redisClient *c);
428 static void lindexCommand(redisClient *c);
429 static void lrangeCommand(redisClient *c);
430 static void ltrimCommand(redisClient *c);
431 static void typeCommand(redisClient *c);
432 static void lsetCommand(redisClient *c);
433 static void saddCommand(redisClient *c);
434 static void sremCommand(redisClient *c);
435 static void smoveCommand(redisClient *c);
436 static void sismemberCommand(redisClient *c);
437 static void scardCommand(redisClient *c);
438 static void spopCommand(redisClient *c);
439 static void srandmemberCommand(redisClient *c);
440 static void sinterCommand(redisClient *c);
441 static void sinterstoreCommand(redisClient *c);
442 static void sunionCommand(redisClient *c);
443 static void sunionstoreCommand(redisClient *c);
444 static void sdiffCommand(redisClient *c);
445 static void sdiffstoreCommand(redisClient *c);
446 static void syncCommand(redisClient *c);
447 static void flushdbCommand(redisClient *c);
448 static void flushallCommand(redisClient *c);
449 static void sortCommand(redisClient *c);
450 static void lremCommand(redisClient *c);
451 static void infoCommand(redisClient *c);
452 static void mgetCommand(redisClient *c);
453 static void monitorCommand(redisClient *c);
454 static void expireCommand(redisClient *c);
455 static void expireatCommand(redisClient *c);
456 static void getsetCommand(redisClient *c);
457 static void ttlCommand(redisClient *c);
458 static void slaveofCommand(redisClient *c);
459 static void debugCommand(redisClient *c);
460 static void msetCommand(redisClient *c);
461 static void msetnxCommand(redisClient *c);
462 static void zaddCommand(redisClient *c);
463 static void zrangeCommand(redisClient *c);
464 static void zrangebyscoreCommand(redisClient *c);
465 static void zrevrangeCommand(redisClient *c);
466 static void zcardCommand(redisClient *c);
467 static void zremCommand(redisClient *c);
468 static void zscoreCommand(redisClient *c);
469 static void zremrangebyscoreCommand(redisClient *c);
470
471 /*================================= Globals ================================= */
472
473 /* Global vars */
474 static struct redisServer server; /* server global state */
/* Command table. Each entry is {name, implementation, arity, flags}.
 * Arity counts the command name itself; a negative arity presumably means
 * "at least -arity arguments" (DEL, MGET, ...). The {NULL,...} entry
 * terminates the table. */
475 static struct redisCommand cmdTable[] = {
476 {"get",getCommand,2,REDIS_CMD_INLINE},
477 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
478 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
479 {"del",delCommand,-2,REDIS_CMD_INLINE},
480 {"exists",existsCommand,2,REDIS_CMD_INLINE},
481 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
482 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
483 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
484 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
485 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
486 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
487 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
488 {"llen",llenCommand,2,REDIS_CMD_INLINE},
489 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
490 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
491 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
492 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
493 {"lrem",lremCommand,4,REDIS_CMD_BULK},
494 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
495 {"srem",sremCommand,3,REDIS_CMD_BULK},
496 {"smove",smoveCommand,4,REDIS_CMD_BULK},
497 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
498 {"scard",scardCommand,2,REDIS_CMD_INLINE},
499 {"spop",spopCommand,2,REDIS_CMD_INLINE},
500 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
501 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
502 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
503 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
504 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
505 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
506 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
507 {"smembers",sinterCommand,2,REDIS_CMD_INLINE}, /* served as SINTER of a single set */
508 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
509 {"zrem",zremCommand,3,REDIS_CMD_BULK},
510 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
511 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
512 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
513 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
514 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
515 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, /* NOTE(review): a read-only command flagged DENYOOM; confirm intended */
516 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
517 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
518 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
519 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
520 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
522 {"select",selectCommand,2,REDIS_CMD_INLINE},
523 {"move",moveCommand,3,REDIS_CMD_INLINE},
524 {"rename",renameCommand,3,REDIS_CMD_INLINE},
525 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
526 {"expire",expireCommand,3,REDIS_CMD_INLINE},
527 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
528 {"keys",keysCommand,2,REDIS_CMD_INLINE},
529 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
530 {"auth",authCommand,2,REDIS_CMD_INLINE},
531 {"ping",pingCommand,1,REDIS_CMD_INLINE},
532 {"echo",echoCommand,2,REDIS_CMD_BULK},
533 {"save",saveCommand,1,REDIS_CMD_INLINE},
534 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
535 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
536 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
537 {"type",typeCommand,2,REDIS_CMD_INLINE},
538 {"sync",syncCommand,1,REDIS_CMD_INLINE},
539 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
540 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
541 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
542 {"info",infoCommand,1,REDIS_CMD_INLINE},
543 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
544 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
545 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
546 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
547 {NULL,NULL,0,0}
548 };
549 /*============================ Utility functions ============================ */
550
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Only the given lengths are
 * read, so neither buffer has to be NUL-terminated. Supported syntax:
 *
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [abc]   one byte from the set; [^abc] negates the set; [a-z] is a range
 *           (reversed ranges such as [z-a] are accepted); \x inside a set
 *           matches x literally. A set left unterminated ends at the end of
 *           the pattern.
 *   \x      the byte x literally (a trailing lone '\' matches itself)
 *
 * With nocase non-zero, ordinary characters and ranges are compared through
 * tolower(); escaped characters inside a set are always compared exactly.
 * Bytes are handed to tolower() as unsigned char, as <ctype.h> requires. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* A run of '*' matches exactly like a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            /* Try the rest of the pattern against every non-empty suffix. */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match, c;

            if (stringLen == 0)
                return 0; /* a set always consumes one byte */
            c = (unsigned char)string[0];
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back one byte so the common
                     * advance after the switch lands exactly on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int ch = c;

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        ch = tolower(ch);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (ch >= start && ch <= end)
                        match = 1;
                } else if (!nocase) {
                    if (pattern[0] == string[0])
                        match = 1;
                } else {
                    if (tolower((unsigned char)pattern[0]) == tolower(c))
                        match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Only '*' can still match the empty remainder. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
673
674 static void redisLog(int level, const char *fmt, ...) {
675 va_list ap;
676 FILE *fp;
677
678 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
679 if (!fp) return;
680
681 va_start(ap, fmt);
682 if (level >= server.verbosity) {
683 char *c = ".-*";
684 char buf[64];
685 time_t now;
686
687 now = time(NULL);
688 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
689 fprintf(fp,"%s %c ",buf,c[level]);
690 vfprintf(fp, fmt, ap);
691 fprintf(fp,"\n");
692 fflush(fp);
693 }
694 va_end(ap);
695
696 if (server.logfile) fclose(fp);
697 }
698
699 /*====================== Hash table type implementation ==================== */
700
701 /* Hash table types built on redis objects: keys are redis objects holding
702 * SDS dynamic strings (possibly integer-encoded for sets and sorted sets), and
703 * values, where a type has them, are redis objects or plain zmalloc()ed buffers. */
704
/* dictType value destructor for plain zmalloc()ed buffers (no reference
 * counting involved): the buffer is simply released. privdata is unused. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
710
711 static int sdsDictKeyCompare(void *privdata, const void *key1,
712 const void *key2)
713 {
714 int l1,l2;
715 DICT_NOTUSED(privdata);
716
717 l1 = sdslen((sds)key1);
718 l2 = sdslen((sds)key2);
719 if (l1 != l2) return 0;
720 return memcmp(key1, key2, l1) == 0;
721 }
722
/* dictType key/value destructor for reference-counted redis objects: the
 * dict's reference is dropped through decrRefCount(). privdata is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
729
730 static int dictObjKeyCompare(void *privdata, const void *key1,
731 const void *key2)
732 {
733 const robj *o1 = key1, *o2 = key2;
734 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
735 }
736
737 static unsigned int dictObjHash(const void *key) {
738 const robj *o = key;
739 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
740 }
741
742 static int dictEncObjKeyCompare(void *privdata, const void *key1,
743 const void *key2)
744 {
745 const robj *o1 = key1, *o2 = key2;
746
747 if (o1->encoding == REDIS_ENCODING_RAW &&
748 o2->encoding == REDIS_ENCODING_RAW)
749 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
750 else {
751 robj *dec1, *dec2;
752 int cmp;
753
754 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
755 getDecodedObject(o1) : (robj*)o1;
756 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
757 getDecodedObject(o2) : (robj*)o2;
758 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
759 if (dec1 != o1) decrRefCount(dec1);
760 if (dec2 != o2) decrRefCount(dec2);
761 return cmp;
762 }
763 }
764
765 static unsigned int dictEncObjHash(const void *key) {
766 const robj *o = key;
767
768 if (o->encoding == REDIS_ENCODING_RAW)
769 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
770 else {
771 robj *dec = getDecodedObject(o);
772 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
773 decrRefCount(dec);
774 return hash;
775 }
776 }
777
/* Sets: members are redis objects (possibly integer-encoded, hence the
 * decoding hash/compare functions); there are no values. */
778 static dictType setDictType = {
779 dictEncObjHash, /* hash function */
780 NULL, /* key dup */
781 NULL, /* val dup */
782 dictEncObjKeyCompare, /* key compare */
783 dictRedisObjectDestructor, /* key destructor */
784 NULL /* val destructor */
785 };
786
/* Sorted sets' member index: members are redis objects (possibly encoded);
 * values are plain zmalloc()ed buffers released with zfree() — presumably
 * the member's score; confirm in the ZADD code. */
787 static dictType zsetDictType = {
788 dictEncObjHash, /* hash function */
789 NULL, /* key dup */
790 NULL, /* val dup */
791 dictEncObjKeyCompare, /* key compare */
792 dictRedisObjectDestructor, /* key destructor */
793 dictVanillaFree /* val destructor */
794 };
795
/* robj -> robj tables. Keys must be raw sds-backed objects: dictObjHash and
 * dictObjKeyCompare do not decode. Keys and values are both released with
 * decrRefCount(). Presumably used for the DB key spaces — confirm in the
 * server initialization code. */
796 static dictType hashDictType = {
797 dictObjHash, /* hash function */
798 NULL, /* key dup */
799 NULL, /* val dup */
800 dictObjKeyCompare, /* key compare */
801 dictRedisObjectDestructor, /* key destructor */
802 dictRedisObjectDestructor /* val destructor */
803 };
804
805 /* ========================= Random utility functions ======================= */
806
/* Out-of-memory handler: prints "<msg>: Out of memory" on stderr, waits one
 * second and aborts the process.
 *
 * Redis deliberately does not try to recover from allocation failures. The
 * networking layer itself allocates its send buffers on the heap, so it is
 * unclear whether the client could even be told about the condition, and
 * aborting keeps the rest of the code simpler to read. */
static void oom(const char *msg) {
    fputs(msg, stderr);
    fputs(": Out of memory\n", stderr);
    fflush(stderr);
    sleep(1);
    abort();
}
818
819 /* ====================== Redis server networking stuff ===================== */
820 static void closeTimedoutClients(void) {
821 redisClient *c;
822 listNode *ln;
823 time_t now = time(NULL);
824
825 listRewind(server.clients);
826 while ((ln = listYield(server.clients)) != NULL) {
827 c = listNodeValue(ln);
828 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
829 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
830 (now - c->lastinteraction > server.maxidletime)) {
831 redisLog(REDIS_DEBUG,"Closing idle client");
832 freeClient(c);
833 }
834 }
835 }
836
837 static int htNeedsResize(dict *dict) {
838 long long size, used;
839
840 size = dictSlots(dict);
841 used = dictSize(dict);
842 return (size && used && size > DICT_HT_INITIAL_SIZE &&
843 (used*100/size < REDIS_HT_MINFILL));
844 }
845
846 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
847 * we resize the hash table to save memory */
848 static void tryResizeHashTables(void) {
849 int j;
850
851 for (j = 0; j < server.dbnum; j++) {
852 if (htNeedsResize(server.db[j].dict)) {
853 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
854 dictResize(server.db[j].dict);
855 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
856 }
857 if (htNeedsResize(server.db[j].expires))
858 dictResize(server.db[j].expires);
859 }
860 }
861
862 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
863 int j, loops = server.cronloops++;
864 REDIS_NOTUSED(eventLoop);
865 REDIS_NOTUSED(id);
866 REDIS_NOTUSED(clientData);
867
868 /* Update the global state with the amount of used memory */
869 server.usedmemory = zmalloc_used_memory();
870
871 /* Show some info about non-empty databases */
872 for (j = 0; j < server.dbnum; j++) {
873 long long size, used, vkeys;
874
875 size = dictSlots(server.db[j].dict);
876 used = dictSize(server.db[j].dict);
877 vkeys = dictSize(server.db[j].expires);
878 if (!(loops % 5) && (used || vkeys)) {
879 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
880 /* dictPrintStats(server.dict); */
881 }
882 }
883
884 /* We don't want to resize the hash tables while a bacground saving
885 * is in progress: the saving child is created using fork() that is
886 * implemented with a copy-on-write semantic in most modern systems, so
887 * if we resize the HT while there is the saving child at work actually
888 * a lot of memory movements in the parent will cause a lot of pages
889 * copied. */
890 if (!server.bgsaveinprogress) tryResizeHashTables();
891
892 /* Show information about connected clients */
893 if (!(loops % 5)) {
894 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
895 listLength(server.clients)-listLength(server.slaves),
896 listLength(server.slaves),
897 server.usedmemory,
898 dictSize(server.sharingpool));
899 }
900
901 /* Close connections of timedout clients */
902 if (server.maxidletime && !(loops % 10))
903 closeTimedoutClients();
904
905 /* Check if a background saving in progress terminated */
906 if (server.bgsaveinprogress) {
907 int statloc;
908 if (wait4(-1,&statloc,WNOHANG,NULL)) {
909 int exitcode = WEXITSTATUS(statloc);
910 int bysignal = WIFSIGNALED(statloc);
911
912 if (!bysignal && exitcode == 0) {
913 redisLog(REDIS_NOTICE,
914 "Background saving terminated with success");
915 server.dirty = 0;
916 server.lastsave = time(NULL);
917 } else if (!bysignal && exitcode != 0) {
918 redisLog(REDIS_WARNING, "Background saving error");
919 } else {
920 redisLog(REDIS_WARNING,
921 "Background saving terminated by signal");
922 rdbRemoveTempFile(server.bgsavechildpid);
923 }
924 server.bgsaveinprogress = 0;
925 server.bgsavechildpid = -1;
926 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
927 }
928 } else {
929 /* If there is not a background saving in progress check if
930 * we have to save now */
931 time_t now = time(NULL);
932 for (j = 0; j < server.saveparamslen; j++) {
933 struct saveparam *sp = server.saveparams+j;
934
935 if (server.dirty >= sp->changes &&
936 now-server.lastsave > sp->seconds) {
937 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
938 sp->changes, sp->seconds);
939 rdbSaveBackground(server.dbfilename);
940 break;
941 }
942 }
943 }
944
945 /* Try to expire a few timed out keys */
946 for (j = 0; j < server.dbnum; j++) {
947 redisDb *db = server.db+j;
948 int num = dictSize(db->expires);
949
950 if (num) {
951 time_t now = time(NULL);
952
953 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
954 num = REDIS_EXPIRELOOKUPS_PER_CRON;
955 while (num--) {
956 dictEntry *de;
957 time_t t;
958
959 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
960 t = (time_t) dictGetEntryVal(de);
961 if (now > t) {
962 deleteKey(db,dictGetEntryKey(de));
963 }
964 }
965 }
966 }
967
968 /* Check if we should connect to a MASTER */
969 if (server.replstate == REDIS_REPL_CONNECT) {
970 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
971 if (syncWithMaster() == REDIS_OK) {
972 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
973 }
974 }
975 return 1000;
976 }
977
978 static void createSharedObjects(void) {
979 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
980 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
981 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
982 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
983 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
984 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
985 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
986 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
987 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
988 /* no such key */
989 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
990 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
991 "-ERR Operation against a key holding the wrong kind of value\r\n"));
992 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
993 "-ERR no such key\r\n"));
994 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
995 "-ERR syntax error\r\n"));
996 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
997 "-ERR source and destination objects are the same\r\n"));
998 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
999 "-ERR index out of range\r\n"));
1000 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1001 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1002 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1003 shared.select0 = createStringObject("select 0\r\n",10);
1004 shared.select1 = createStringObject("select 1\r\n",10);
1005 shared.select2 = createStringObject("select 2\r\n",10);
1006 shared.select3 = createStringObject("select 3\r\n",10);
1007 shared.select4 = createStringObject("select 4\r\n",10);
1008 shared.select5 = createStringObject("select 5\r\n",10);
1009 shared.select6 = createStringObject("select 6\r\n",10);
1010 shared.select7 = createStringObject("select 7\r\n",10);
1011 shared.select8 = createStringObject("select 8\r\n",10);
1012 shared.select9 = createStringObject("select 9\r\n",10);
1013 }
1014
1015 static void appendServerSaveParams(time_t seconds, int changes) {
1016 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1017 server.saveparams[server.saveparamslen].seconds = seconds;
1018 server.saveparams[server.saveparamslen].changes = changes;
1019 server.saveparamslen++;
1020 }
1021
1022 static void ResetServerSaveParams() {
1023 zfree(server.saveparams);
1024 server.saveparams = NULL;
1025 server.saveparamslen = 0;
1026 }
1027
1028 static void initServerConfig() {
1029 server.dbnum = REDIS_DEFAULT_DBNUM;
1030 server.port = REDIS_SERVERPORT;
1031 server.verbosity = REDIS_DEBUG;
1032 server.maxidletime = REDIS_MAXIDLETIME;
1033 server.saveparams = NULL;
1034 server.logfile = NULL; /* NULL = log on standard output */
1035 server.bindaddr = NULL;
1036 server.glueoutputbuf = 1;
1037 server.daemonize = 0;
1038 server.appendonly = 0;
1039 server.appendfsync = APPENDFSYNC_ALWAYS;
1040 server.lastfsync = time(NULL);
1041 server.appendfd = -1;
1042 server.appendseldb = -1; /* Make sure the first time will not match */
1043 server.pidfile = "/var/run/redis.pid";
1044 server.dbfilename = "dump.rdb";
1045 server.appendfilename = "appendonly.log";
1046 server.requirepass = NULL;
1047 server.shareobjects = 0;
1048 server.sharingpoolsize = 1024;
1049 server.maxclients = 0;
1050 server.maxmemory = 0;
1051 ResetServerSaveParams();
1052
1053 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1054 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1055 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1056 /* Replication related */
1057 server.isslave = 0;
1058 server.masterhost = NULL;
1059 server.masterport = 6379;
1060 server.master = NULL;
1061 server.replstate = REDIS_REPL_NONE;
1062
1063 /* Double constants initialization */
1064 R_Zero = 0.0;
1065 R_PosInf = 1.0/R_Zero;
1066 R_NegInf = -1.0/R_Zero;
1067 R_Nan = R_Zero/R_Zero;
1068 }
1069
1070 static void initServer() {
1071 int j;
1072
1073 signal(SIGHUP, SIG_IGN);
1074 signal(SIGPIPE, SIG_IGN);
1075 setupSigSegvAction();
1076
1077 server.clients = listCreate();
1078 server.slaves = listCreate();
1079 server.monitors = listCreate();
1080 server.objfreelist = listCreate();
1081 createSharedObjects();
1082 server.el = aeCreateEventLoop();
1083 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1084 server.sharingpool = dictCreate(&setDictType,NULL);
1085 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1086 if (server.fd == -1) {
1087 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1088 exit(1);
1089 }
1090 for (j = 0; j < server.dbnum; j++) {
1091 server.db[j].dict = dictCreate(&hashDictType,NULL);
1092 server.db[j].expires = dictCreate(&setDictType,NULL);
1093 server.db[j].id = j;
1094 }
1095 server.cronloops = 0;
1096 server.bgsaveinprogress = 0;
1097 server.bgsavechildpid = -1;
1098 server.lastsave = time(NULL);
1099 server.dirty = 0;
1100 server.usedmemory = 0;
1101 server.stat_numcommands = 0;
1102 server.stat_numconnections = 0;
1103 server.stat_starttime = time(NULL);
1104 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1105
1106 if (server.appendonly) {
1107 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1108 if (server.appendfd == -1) {
1109 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1110 strerror(errno));
1111 exit(1);
1112 }
1113 }
1114 }
1115
1116 /* Empty the whole database */
1117 static long long emptyDb() {
1118 int j;
1119 long long removed = 0;
1120
1121 for (j = 0; j < server.dbnum; j++) {
1122 removed += dictSize(server.db[j].dict);
1123 dictEmpty(server.db[j].dict);
1124 dictEmpty(server.db[j].expires);
1125 }
1126 return removed;
1127 }
1128
/* Parse a boolean config argument, case-insensitively.
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1134
1135 /* I agree, this is a very rudimental way to load a configuration...
1136 will improve later if the config gets more complex */
1137 static void loadServerConfig(char *filename) {
1138 FILE *fp;
1139 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1140 int linenum = 0;
1141 sds line = NULL;
1142
1143 if (filename[0] == '-' && filename[1] == '\0')
1144 fp = stdin;
1145 else {
1146 if ((fp = fopen(filename,"r")) == NULL) {
1147 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1148 exit(1);
1149 }
1150 }
1151
1152 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1153 sds *argv;
1154 int argc, j;
1155
1156 linenum++;
1157 line = sdsnew(buf);
1158 line = sdstrim(line," \t\r\n");
1159
1160 /* Skip comments and blank lines*/
1161 if (line[0] == '#' || line[0] == '\0') {
1162 sdsfree(line);
1163 continue;
1164 }
1165
1166 /* Split into arguments */
1167 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1168 sdstolower(argv[0]);
1169
1170 /* Execute config directives */
1171 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1172 server.maxidletime = atoi(argv[1]);
1173 if (server.maxidletime < 0) {
1174 err = "Invalid timeout value"; goto loaderr;
1175 }
1176 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1177 server.port = atoi(argv[1]);
1178 if (server.port < 1 || server.port > 65535) {
1179 err = "Invalid port"; goto loaderr;
1180 }
1181 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1182 server.bindaddr = zstrdup(argv[1]);
1183 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1184 int seconds = atoi(argv[1]);
1185 int changes = atoi(argv[2]);
1186 if (seconds < 1 || changes < 0) {
1187 err = "Invalid save parameters"; goto loaderr;
1188 }
1189 appendServerSaveParams(seconds,changes);
1190 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1191 if (chdir(argv[1]) == -1) {
1192 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1193 argv[1], strerror(errno));
1194 exit(1);
1195 }
1196 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1197 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1198 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1199 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1200 else {
1201 err = "Invalid log level. Must be one of debug, notice, warning";
1202 goto loaderr;
1203 }
1204 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1205 FILE *logfp;
1206
1207 server.logfile = zstrdup(argv[1]);
1208 if (!strcasecmp(server.logfile,"stdout")) {
1209 zfree(server.logfile);
1210 server.logfile = NULL;
1211 }
1212 if (server.logfile) {
1213 /* Test if we are able to open the file. The server will not
1214 * be able to abort just for this problem later... */
1215 logfp = fopen(server.logfile,"a");
1216 if (logfp == NULL) {
1217 err = sdscatprintf(sdsempty(),
1218 "Can't open the log file: %s", strerror(errno));
1219 goto loaderr;
1220 }
1221 fclose(logfp);
1222 }
1223 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1224 server.dbnum = atoi(argv[1]);
1225 if (server.dbnum < 1) {
1226 err = "Invalid number of databases"; goto loaderr;
1227 }
1228 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1229 server.maxclients = atoi(argv[1]);
1230 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1231 server.maxmemory = strtoll(argv[1], NULL, 10);
1232 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1233 server.masterhost = sdsnew(argv[1]);
1234 server.masterport = atoi(argv[2]);
1235 server.replstate = REDIS_REPL_CONNECT;
1236 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1237 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1238 err = "argument must be 'yes' or 'no'"; goto loaderr;
1239 }
1240 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1241 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1242 err = "argument must be 'yes' or 'no'"; goto loaderr;
1243 }
1244 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1245 server.sharingpoolsize = atoi(argv[1]);
1246 if (server.sharingpoolsize < 1) {
1247 err = "invalid object sharing pool size"; goto loaderr;
1248 }
1249 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1250 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1251 err = "argument must be 'yes' or 'no'"; goto loaderr;
1252 }
1253 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1254 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1255 err = "argument must be 'yes' or 'no'"; goto loaderr;
1256 }
1257 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1258 if (!strcasecmp(argv[1],"no")) {
1259 server.appendfsync = APPENDFSYNC_NO;
1260 } else if (!strcasecmp(argv[1],"always")) {
1261 server.appendfsync = APPENDFSYNC_ALWAYS;
1262 } else if (!strcasecmp(argv[1],"everysec")) {
1263 server.appendfsync = APPENDFSYNC_EVERYSEC;
1264 } else {
1265 err = "argument must be 'no', 'always' or 'everysec'";
1266 goto loaderr;
1267 }
1268 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1269 server.requirepass = zstrdup(argv[1]);
1270 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1271 server.pidfile = zstrdup(argv[1]);
1272 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1273 server.dbfilename = zstrdup(argv[1]);
1274 } else {
1275 err = "Bad directive or wrong number of arguments"; goto loaderr;
1276 }
1277 for (j = 0; j < argc; j++)
1278 sdsfree(argv[j]);
1279 zfree(argv);
1280 sdsfree(line);
1281 }
1282 if (fp != stdin) fclose(fp);
1283 return;
1284
1285 loaderr:
1286 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1287 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1288 fprintf(stderr, ">>> '%s'\n", line);
1289 fprintf(stderr, "%s\n", err);
1290 exit(1);
1291 }
1292
1293 static void freeClientArgv(redisClient *c) {
1294 int j;
1295
1296 for (j = 0; j < c->argc; j++)
1297 decrRefCount(c->argv[j]);
1298 for (j = 0; j < c->mbargc; j++)
1299 decrRefCount(c->mbargv[j]);
1300 c->argc = 0;
1301 c->mbargc = 0;
1302 }
1303
1304 static void freeClient(redisClient *c) {
1305 listNode *ln;
1306
1307 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1308 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1309 sdsfree(c->querybuf);
1310 listRelease(c->reply);
1311 freeClientArgv(c);
1312 close(c->fd);
1313 ln = listSearchKey(server.clients,c);
1314 assert(ln != NULL);
1315 listDelNode(server.clients,ln);
1316 if (c->flags & REDIS_SLAVE) {
1317 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1318 close(c->repldbfd);
1319 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1320 ln = listSearchKey(l,c);
1321 assert(ln != NULL);
1322 listDelNode(l,ln);
1323 }
1324 if (c->flags & REDIS_MASTER) {
1325 server.master = NULL;
1326 server.replstate = REDIS_REPL_CONNECT;
1327 }
1328 zfree(c->argv);
1329 zfree(c->mbargv);
1330 zfree(c);
1331 }
1332
1333 static void glueReplyBuffersIfNeeded(redisClient *c) {
1334 int totlen = 0;
1335 listNode *ln;
1336 robj *o;
1337
1338 listRewind(c->reply);
1339 while((ln = listYield(c->reply))) {
1340 o = ln->value;
1341 totlen += sdslen(o->ptr);
1342 /* This optimization makes more sense if we don't have to copy
1343 * too much data */
1344 if (totlen > 1024) return;
1345 }
1346 if (totlen > 0) {
1347 char buf[1024];
1348 int copylen = 0;
1349
1350 listRewind(c->reply);
1351 while((ln = listYield(c->reply))) {
1352 o = ln->value;
1353 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1354 copylen += sdslen(o->ptr);
1355 listDelNode(c->reply,ln);
1356 }
1357 /* Now the output buffer is empty, add the new single element */
1358 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1359 listAddNodeTail(c->reply,o);
1360 }
1361 }
1362
1363 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1364 redisClient *c = privdata;
1365 int nwritten = 0, totwritten = 0, objlen;
1366 robj *o;
1367 REDIS_NOTUSED(el);
1368 REDIS_NOTUSED(mask);
1369
1370 if (server.glueoutputbuf && listLength(c->reply) > 1)
1371 glueReplyBuffersIfNeeded(c);
1372 while(listLength(c->reply)) {
1373 o = listNodeValue(listFirst(c->reply));
1374 objlen = sdslen(o->ptr);
1375
1376 if (objlen == 0) {
1377 listDelNode(c->reply,listFirst(c->reply));
1378 continue;
1379 }
1380
1381 if (c->flags & REDIS_MASTER) {
1382 /* Don't reply to a master */
1383 nwritten = objlen - c->sentlen;
1384 } else {
1385 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1386 if (nwritten <= 0) break;
1387 }
1388 c->sentlen += nwritten;
1389 totwritten += nwritten;
1390 /* If we fully sent the object on head go to the next one */
1391 if (c->sentlen == objlen) {
1392 listDelNode(c->reply,listFirst(c->reply));
1393 c->sentlen = 0;
1394 }
1395 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1396 * bytes, in a single threaded server it's a good idea to server
1397 * other clients as well, even if a very large request comes from
1398 * super fast link that is always able to accept data (in real world
1399 * terms think to 'KEYS *' against the loopback interfae) */
1400 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1401 }
1402 if (nwritten == -1) {
1403 if (errno == EAGAIN) {
1404 nwritten = 0;
1405 } else {
1406 redisLog(REDIS_DEBUG,
1407 "Error writing to client: %s", strerror(errno));
1408 freeClient(c);
1409 return;
1410 }
1411 }
1412 if (totwritten > 0) c->lastinteraction = time(NULL);
1413 if (listLength(c->reply) == 0) {
1414 c->sentlen = 0;
1415 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1416 }
1417 }
1418
1419 static struct redisCommand *lookupCommand(char *name) {
1420 int j = 0;
1421 while(cmdTable[j].name != NULL) {
1422 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1423 j++;
1424 }
1425 return NULL;
1426 }
1427
1428 /* resetClient prepare the client to process the next command */
1429 static void resetClient(redisClient *c) {
1430 freeClientArgv(c);
1431 c->bulklen = -1;
1432 c->multibulk = 0;
1433 }
1434
1435 /* If this function gets called we already read a whole
1436 * command, argments are in the client argv/argc fields.
1437 * processCommand() execute the command or prepare the
1438 * server for a bulk read from the client.
1439 *
1440 * If 1 is returned the client is still alive and valid and
1441 * and other operations can be performed by the caller. Otherwise
1442 * if 0 is returned the client was destroied (i.e. after QUIT). */
1443 static int processCommand(redisClient *c) {
1444 struct redisCommand *cmd;
1445 long long dirty;
1446
1447 /* Free some memory if needed (maxmemory setting) */
1448 if (server.maxmemory) freeMemoryIfNeeded();
1449
1450 /* Handle the multi bulk command type. This is an alternative protocol
1451 * supported by Redis in order to receive commands that are composed of
1452 * multiple binary-safe "bulk" arguments. The latency of processing is
1453 * a bit higher but this allows things like multi-sets, so if this
1454 * protocol is used only for MSET and similar commands this is a big win. */
1455 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1456 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1457 if (c->multibulk <= 0) {
1458 resetClient(c);
1459 return 1;
1460 } else {
1461 decrRefCount(c->argv[c->argc-1]);
1462 c->argc--;
1463 return 1;
1464 }
1465 } else if (c->multibulk) {
1466 if (c->bulklen == -1) {
1467 if (((char*)c->argv[0]->ptr)[0] != '$') {
1468 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1469 resetClient(c);
1470 return 1;
1471 } else {
1472 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1473 decrRefCount(c->argv[0]);
1474 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1475 c->argc--;
1476 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1477 resetClient(c);
1478 return 1;
1479 }
1480 c->argc--;
1481 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1482 return 1;
1483 }
1484 } else {
1485 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1486 c->mbargv[c->mbargc] = c->argv[0];
1487 c->mbargc++;
1488 c->argc--;
1489 c->multibulk--;
1490 if (c->multibulk == 0) {
1491 robj **auxargv;
1492 int auxargc;
1493
1494 /* Here we need to swap the multi-bulk argc/argv with the
1495 * normal argc/argv of the client structure. */
1496 auxargv = c->argv;
1497 c->argv = c->mbargv;
1498 c->mbargv = auxargv;
1499
1500 auxargc = c->argc;
1501 c->argc = c->mbargc;
1502 c->mbargc = auxargc;
1503
1504 /* We need to set bulklen to something different than -1
1505 * in order for the code below to process the command without
1506 * to try to read the last argument of a bulk command as
1507 * a special argument. */
1508 c->bulklen = 0;
1509 /* continue below and process the command */
1510 } else {
1511 c->bulklen = -1;
1512 return 1;
1513 }
1514 }
1515 }
1516 /* -- end of multi bulk commands processing -- */
1517
1518 /* The QUIT command is handled as a special case. Normal command
1519 * procs are unable to close the client connection safely */
1520 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1521 freeClient(c);
1522 return 0;
1523 }
1524 cmd = lookupCommand(c->argv[0]->ptr);
1525 if (!cmd) {
1526 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1527 resetClient(c);
1528 return 1;
1529 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1530 (c->argc < -cmd->arity)) {
1531 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1532 resetClient(c);
1533 return 1;
1534 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1535 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1536 resetClient(c);
1537 return 1;
1538 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1539 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1540
1541 decrRefCount(c->argv[c->argc-1]);
1542 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1543 c->argc--;
1544 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1545 resetClient(c);
1546 return 1;
1547 }
1548 c->argc--;
1549 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1550 /* It is possible that the bulk read is already in the
1551 * buffer. Check this condition and handle it accordingly.
1552 * This is just a fast path, alternative to call processInputBuffer().
1553 * It's a good idea since the code is small and this condition
1554 * happens most of the times. */
1555 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1556 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1557 c->argc++;
1558 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1559 } else {
1560 return 1;
1561 }
1562 }
1563 /* Let's try to share objects on the command arguments vector */
1564 if (server.shareobjects) {
1565 int j;
1566 for(j = 1; j < c->argc; j++)
1567 c->argv[j] = tryObjectSharing(c->argv[j]);
1568 }
1569 /* Let's try to encode the bulk object to save space. */
1570 if (cmd->flags & REDIS_CMD_BULK)
1571 tryObjectEncoding(c->argv[c->argc-1]);
1572
1573 /* Check if the user is authenticated */
1574 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1575 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1576 resetClient(c);
1577 return 1;
1578 }
1579
1580 /* Exec the command */
1581 dirty = server.dirty;
1582 cmd->proc(c);
1583 if (server.appendonly != 0)
1584 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1585 if (server.dirty-dirty != 0 && listLength(server.slaves))
1586 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1587 if (listLength(server.monitors))
1588 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1589 server.stat_numcommands++;
1590
1591 /* Prepare the client for the next command */
1592 if (c->flags & REDIS_CLOSE) {
1593 freeClient(c);
1594 return 0;
1595 }
1596 resetClient(c);
1597 return 1;
1598 }
1599
1600 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1601 listNode *ln;
1602 int outc = 0, j;
1603 robj **outv;
1604 /* (args*2)+1 is enough room for args, spaces, newlines */
1605 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1606
1607 if (argc <= REDIS_STATIC_ARGS) {
1608 outv = static_outv;
1609 } else {
1610 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1611 }
1612
1613 for (j = 0; j < argc; j++) {
1614 if (j != 0) outv[outc++] = shared.space;
1615 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1616 robj *lenobj;
1617
1618 lenobj = createObject(REDIS_STRING,
1619 sdscatprintf(sdsempty(),"%d\r\n",
1620 stringObjectLen(argv[j])));
1621 lenobj->refcount = 0;
1622 outv[outc++] = lenobj;
1623 }
1624 outv[outc++] = argv[j];
1625 }
1626 outv[outc++] = shared.crlf;
1627
1628 /* Increment all the refcounts at start and decrement at end in order to
1629 * be sure to free objects if there is no slave in a replication state
1630 * able to be feed with commands */
1631 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1632 listRewind(slaves);
1633 while((ln = listYield(slaves))) {
1634 redisClient *slave = ln->value;
1635
1636 /* Don't feed slaves that are still waiting for BGSAVE to start */
1637 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1638
1639 /* Feed all the other slaves, MONITORs and so on */
1640 if (slave->slaveseldb != dictid) {
1641 robj *selectcmd;
1642
1643 switch(dictid) {
1644 case 0: selectcmd = shared.select0; break;
1645 case 1: selectcmd = shared.select1; break;
1646 case 2: selectcmd = shared.select2; break;
1647 case 3: selectcmd = shared.select3; break;
1648 case 4: selectcmd = shared.select4; break;
1649 case 5: selectcmd = shared.select5; break;
1650 case 6: selectcmd = shared.select6; break;
1651 case 7: selectcmd = shared.select7; break;
1652 case 8: selectcmd = shared.select8; break;
1653 case 9: selectcmd = shared.select9; break;
1654 default:
1655 selectcmd = createObject(REDIS_STRING,
1656 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1657 selectcmd->refcount = 0;
1658 break;
1659 }
1660 addReply(slave,selectcmd);
1661 slave->slaveseldb = dictid;
1662 }
1663 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1664 }
1665 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1666 if (outv != static_outv) zfree(outv);
1667 }
1668
1669 static void processInputBuffer(redisClient *c) {
1670 again:
1671 if (c->bulklen == -1) {
1672 /* Read the first line of the query */
1673 char *p = strchr(c->querybuf,'\n');
1674 size_t querylen;
1675
1676 if (p) {
1677 sds query, *argv;
1678 int argc, j;
1679
1680 query = c->querybuf;
1681 c->querybuf = sdsempty();
1682 querylen = 1+(p-(query));
1683 if (sdslen(query) > querylen) {
1684 /* leave data after the first line of the query in the buffer */
1685 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1686 }
1687 *p = '\0'; /* remove "\n" */
1688 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1689 sdsupdatelen(query);
1690
1691 /* Now we can split the query in arguments */
1692 if (sdslen(query) == 0) {
1693 /* Ignore empty query */
1694 sdsfree(query);
1695 return;
1696 }
1697 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1698 sdsfree(query);
1699
1700 if (c->argv) zfree(c->argv);
1701 c->argv = zmalloc(sizeof(robj*)*argc);
1702
1703 for (j = 0; j < argc; j++) {
1704 if (sdslen(argv[j])) {
1705 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1706 c->argc++;
1707 } else {
1708 sdsfree(argv[j]);
1709 }
1710 }
1711 zfree(argv);
1712 /* Execute the command. If the client is still valid
1713 * after processCommand() return and there is something
1714 * on the query buffer try to process the next command. */
1715 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1716 return;
1717 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1718 redisLog(REDIS_DEBUG, "Client protocol error");
1719 freeClient(c);
1720 return;
1721 }
1722 } else {
1723 /* Bulk read handling. Note that if we are at this point
1724 the client already sent a command terminated with a newline,
1725 we are reading the bulk data that is actually the last
1726 argument of the command. */
1727 int qbl = sdslen(c->querybuf);
1728
1729 if (c->bulklen <= qbl) {
1730 /* Copy everything but the final CRLF as final argument */
1731 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1732 c->argc++;
1733 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1734 /* Process the command. If the client is still valid after
1735 * the processing and there is more data in the buffer
1736 * try to parse it. */
1737 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1738 return;
1739 }
1740 }
1741 }
1742
1743 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1744 redisClient *c = (redisClient*) privdata;
1745 char buf[REDIS_IOBUF_LEN];
1746 int nread;
1747 REDIS_NOTUSED(el);
1748 REDIS_NOTUSED(mask);
1749
1750 nread = read(fd, buf, REDIS_IOBUF_LEN);
1751 if (nread == -1) {
1752 if (errno == EAGAIN) {
1753 nread = 0;
1754 } else {
1755 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1756 freeClient(c);
1757 return;
1758 }
1759 } else if (nread == 0) {
1760 redisLog(REDIS_DEBUG, "Client closed connection");
1761 freeClient(c);
1762 return;
1763 }
1764 if (nread) {
1765 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1766 c->lastinteraction = time(NULL);
1767 } else {
1768 return;
1769 }
1770 processInputBuffer(c);
1771 }
1772
1773 static int selectDb(redisClient *c, int id) {
1774 if (id < 0 || id >= server.dbnum)
1775 return REDIS_ERR;
1776 c->db = &server.db[id];
1777 return REDIS_OK;
1778 }
1779
1780 static void *dupClientReplyValue(void *o) {
1781 incrRefCount((robj*)o);
1782 return 0;
1783 }
1784
1785 static redisClient *createClient(int fd) {
1786 redisClient *c = zmalloc(sizeof(*c));
1787
1788 anetNonBlock(NULL,fd);
1789 anetTcpNoDelay(NULL,fd);
1790 if (!c) return NULL;
1791 selectDb(c,0);
1792 c->fd = fd;
1793 c->querybuf = sdsempty();
1794 c->argc = 0;
1795 c->argv = NULL;
1796 c->bulklen = -1;
1797 c->multibulk = 0;
1798 c->mbargc = 0;
1799 c->mbargv = NULL;
1800 c->sentlen = 0;
1801 c->flags = 0;
1802 c->lastinteraction = time(NULL);
1803 c->authenticated = 0;
1804 c->replstate = REDIS_REPL_NONE;
1805 c->reply = listCreate();
1806 listSetFreeMethod(c->reply,decrRefCount);
1807 listSetDupMethod(c->reply,dupClientReplyValue);
1808 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1809 readQueryFromClient, c, NULL) == AE_ERR) {
1810 freeClient(c);
1811 return NULL;
1812 }
1813 listAddNodeTail(server.clients,c);
1814 return c;
1815 }
1816
1817 static void addReply(redisClient *c, robj *obj) {
1818 if (listLength(c->reply) == 0 &&
1819 (c->replstate == REDIS_REPL_NONE ||
1820 c->replstate == REDIS_REPL_ONLINE) &&
1821 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1822 sendReplyToClient, c, NULL) == AE_ERR) return;
1823 if (obj->encoding != REDIS_ENCODING_RAW) {
1824 obj = getDecodedObject(obj);
1825 } else {
1826 incrRefCount(obj);
1827 }
1828 listAddNodeTail(c->reply,obj);
1829 }
1830
1831 static void addReplySds(redisClient *c, sds s) {
1832 robj *o = createObject(REDIS_STRING,s);
1833 addReply(c,o);
1834 decrRefCount(o);
1835 }
1836
1837 static void addReplyBulkLen(redisClient *c, robj *obj) {
1838 size_t len;
1839
1840 if (obj->encoding == REDIS_ENCODING_RAW) {
1841 len = sdslen(obj->ptr);
1842 } else {
1843 long n = (long)obj->ptr;
1844
1845 len = 1;
1846 if (n < 0) {
1847 len++;
1848 n = -n;
1849 }
1850 while((n = n/10) != 0) {
1851 len++;
1852 }
1853 }
1854 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1855 }
1856
1857 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1858 int cport, cfd;
1859 char cip[128];
1860 redisClient *c;
1861 REDIS_NOTUSED(el);
1862 REDIS_NOTUSED(mask);
1863 REDIS_NOTUSED(privdata);
1864
1865 cfd = anetAccept(server.neterr, fd, cip, &cport);
1866 if (cfd == AE_ERR) {
1867 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1868 return;
1869 }
1870 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1871 if ((c = createClient(cfd)) == NULL) {
1872 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1873 close(cfd); /* May be already closed, just ingore errors */
1874 return;
1875 }
1876 /* If maxclient directive is set and this is one client more... close the
1877 * connection. Note that we create the client instead to check before
1878 * for this condition, since now the socket is already set in nonblocking
1879 * mode and we can send an error for free using the Kernel I/O */
1880 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1881 char *err = "-ERR max number of clients reached\r\n";
1882
1883 /* That's a best effort error message, don't check write errors */
1884 (void) write(c->fd,err,strlen(err));
1885 freeClient(c);
1886 return;
1887 }
1888 server.stat_numconnections++;
1889 }
1890
1891 /* ======================= Redis objects implementation ===================== */
1892
1893 static robj *createObject(int type, void *ptr) {
1894 robj *o;
1895
1896 if (listLength(server.objfreelist)) {
1897 listNode *head = listFirst(server.objfreelist);
1898 o = listNodeValue(head);
1899 listDelNode(server.objfreelist,head);
1900 } else {
1901 o = zmalloc(sizeof(*o));
1902 }
1903 o->type = type;
1904 o->encoding = REDIS_ENCODING_RAW;
1905 o->ptr = ptr;
1906 o->refcount = 1;
1907 return o;
1908 }
1909
1910 static robj *createStringObject(char *ptr, size_t len) {
1911 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1912 }
1913
1914 static robj *createListObject(void) {
1915 list *l = listCreate();
1916
1917 listSetFreeMethod(l,decrRefCount);
1918 return createObject(REDIS_LIST,l);
1919 }
1920
1921 static robj *createSetObject(void) {
1922 dict *d = dictCreate(&setDictType,NULL);
1923 return createObject(REDIS_SET,d);
1924 }
1925
1926 static robj *createZsetObject(void) {
1927 zset *zs = zmalloc(sizeof(*zs));
1928
1929 zs->dict = dictCreate(&zsetDictType,NULL);
1930 zs->zsl = zslCreate();
1931 return createObject(REDIS_ZSET,zs);
1932 }
1933
1934 static void freeStringObject(robj *o) {
1935 if (o->encoding == REDIS_ENCODING_RAW) {
1936 sdsfree(o->ptr);
1937 }
1938 }
1939
1940 static void freeListObject(robj *o) {
1941 listRelease((list*) o->ptr);
1942 }
1943
1944 static void freeSetObject(robj *o) {
1945 dictRelease((dict*) o->ptr);
1946 }
1947
1948 static void freeZsetObject(robj *o) {
1949 zset *zs = o->ptr;
1950
1951 dictRelease(zs->dict);
1952 zslFree(zs->zsl);
1953 zfree(zs);
1954 }
1955
1956 static void freeHashObject(robj *o) {
1957 dictRelease((dict*) o->ptr);
1958 }
1959
1960 static void incrRefCount(robj *o) {
1961 o->refcount++;
1962 #ifdef DEBUG_REFCOUNT
1963 if (o->type == REDIS_STRING)
1964 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1965 #endif
1966 }
1967
1968 static void decrRefCount(void *obj) {
1969 robj *o = obj;
1970
1971 #ifdef DEBUG_REFCOUNT
1972 if (o->type == REDIS_STRING)
1973 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1974 #endif
1975 if (--(o->refcount) == 0) {
1976 switch(o->type) {
1977 case REDIS_STRING: freeStringObject(o); break;
1978 case REDIS_LIST: freeListObject(o); break;
1979 case REDIS_SET: freeSetObject(o); break;
1980 case REDIS_ZSET: freeZsetObject(o); break;
1981 case REDIS_HASH: freeHashObject(o); break;
1982 default: assert(0 != 0); break;
1983 }
1984 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1985 !listAddNodeHead(server.objfreelist,o))
1986 zfree(o);
1987 }
1988 }
1989
1990 static robj *lookupKey(redisDb *db, robj *key) {
1991 dictEntry *de = dictFind(db->dict,key);
1992 return de ? dictGetEntryVal(de) : NULL;
1993 }
1994
1995 static robj *lookupKeyRead(redisDb *db, robj *key) {
1996 expireIfNeeded(db,key);
1997 return lookupKey(db,key);
1998 }
1999
2000 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2001 deleteIfVolatile(db,key);
2002 return lookupKey(db,key);
2003 }
2004
2005 static int deleteKey(redisDb *db, robj *key) {
2006 int retval;
2007
2008 /* We need to protect key from destruction: after the first dictDelete()
2009 * it may happen that 'key' is no longer valid if we don't increment
2010 * it's count. This may happen when we get the object reference directly
2011 * from the hash table with dictRandomKey() or dict iterators */
2012 incrRefCount(key);
2013 if (dictSize(db->expires)) dictDelete(db->expires,key);
2014 retval = dictDelete(db->dict,key);
2015 decrRefCount(key);
2016
2017 return retval == DICT_OK;
2018 }
2019
2020 /* Try to share an object against the shared objects pool */
2021 static robj *tryObjectSharing(robj *o) {
2022 struct dictEntry *de;
2023 unsigned long c;
2024
2025 if (o == NULL || server.shareobjects == 0) return o;
2026
2027 assert(o->type == REDIS_STRING);
2028 de = dictFind(server.sharingpool,o);
2029 if (de) {
2030 robj *shared = dictGetEntryKey(de);
2031
2032 c = ((unsigned long) dictGetEntryVal(de))+1;
2033 dictGetEntryVal(de) = (void*) c;
2034 incrRefCount(shared);
2035 decrRefCount(o);
2036 return shared;
2037 } else {
2038 /* Here we are using a stream algorihtm: Every time an object is
2039 * shared we increment its count, everytime there is a miss we
2040 * recrement the counter of a random object. If this object reaches
2041 * zero we remove the object and put the current object instead. */
2042 if (dictSize(server.sharingpool) >=
2043 server.sharingpoolsize) {
2044 de = dictGetRandomKey(server.sharingpool);
2045 assert(de != NULL);
2046 c = ((unsigned long) dictGetEntryVal(de))-1;
2047 dictGetEntryVal(de) = (void*) c;
2048 if (c == 0) {
2049 dictDelete(server.sharingpool,de->key);
2050 }
2051 } else {
2052 c = 0; /* If the pool is empty we want to add this object */
2053 }
2054 if (c == 0) {
2055 int retval;
2056
2057 retval = dictAdd(server.sharingpool,o,(void*)1);
2058 assert(retval == DICT_OK);
2059 incrRefCount(o);
2060 }
2061 return o;
2062 }
2063 }
2064
2065 /* Check if the nul-terminated string 's' can be represented by a long
2066 * (that is, is a number that fits into long without any other space or
2067 * character before or after the digits).
2068 *
2069 * If so, the function returns REDIS_OK and *longval is set to the value
2070 * of the number. Otherwise REDIS_ERR is returned */
2071 static int isStringRepresentableAsLong(sds s, long *longval) {
2072 char buf[32], *endptr;
2073 long value;
2074 int slen;
2075
2076 value = strtol(s, &endptr, 10);
2077 if (endptr[0] != '\0') return REDIS_ERR;
2078 slen = snprintf(buf,32,"%ld",value);
2079
2080 /* If the number converted back into a string is not identical
2081 * then it's not possible to encode the string as integer */
2082 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2083 if (longval) *longval = value;
2084 return REDIS_OK;
2085 }
2086
2087 /* Try to encode a string object in order to save space */
2088 static int tryObjectEncoding(robj *o) {
2089 long value;
2090 sds s = o->ptr;
2091
2092 if (o->encoding != REDIS_ENCODING_RAW)
2093 return REDIS_ERR; /* Already encoded */
2094
2095 /* It's not save to encode shared objects: shared objects can be shared
2096 * everywhere in the "object space" of Redis. Encoded objects can only
2097 * appear as "values" (and not, for instance, as keys) */
2098 if (o->refcount > 1) return REDIS_ERR;
2099
2100 /* Currently we try to encode only strings */
2101 assert(o->type == REDIS_STRING);
2102
2103 /* Check if we can represent this string as a long integer */
2104 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2105
2106 /* Ok, this object can be encoded */
2107 o->encoding = REDIS_ENCODING_INT;
2108 sdsfree(o->ptr);
2109 o->ptr = (void*) value;
2110 return REDIS_OK;
2111 }
2112
2113 /* Get a decoded version of an encoded object (returned as a new object) */
2114 static robj *getDecodedObject(const robj *o) {
2115 robj *dec;
2116
2117 assert(o->encoding != REDIS_ENCODING_RAW);
2118 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2119 char buf[32];
2120
2121 snprintf(buf,32,"%ld",(long)o->ptr);
2122 dec = createStringObject(buf,strlen(buf));
2123 return dec;
2124 } else {
2125 assert(1 != 1);
2126 }
2127 }
2128
2129 /* Compare two string objects via strcmp() or alike.
2130 * Note that the objects may be integer-encoded. In such a case we
2131 * use snprintf() to get a string representation of the numbers on the stack
2132 * and compare the strings, it's much faster than calling getDecodedObject(). */
2133 static int compareStringObjects(robj *a, robj *b) {
2134 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2135 char bufa[128], bufb[128], *astr, *bstr;
2136 int bothsds = 1;
2137
2138 if (a == b) return 0;
2139 if (a->encoding != REDIS_ENCODING_RAW) {
2140 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2141 astr = bufa;
2142 bothsds = 0;
2143 } else {
2144 astr = a->ptr;
2145 }
2146 if (b->encoding != REDIS_ENCODING_RAW) {
2147 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2148 bstr = bufb;
2149 bothsds = 0;
2150 } else {
2151 bstr = b->ptr;
2152 }
2153 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2154 }
2155
2156 static size_t stringObjectLen(robj *o) {
2157 assert(o->type == REDIS_STRING);
2158 if (o->encoding == REDIS_ENCODING_RAW) {
2159 return sdslen(o->ptr);
2160 } else {
2161 char buf[32];
2162
2163 return snprintf(buf,32,"%ld",(long)o->ptr);
2164 }
2165 }
2166
2167 /*============================ DB saving/loading ============================ */
2168
/* Write the one-byte opcode/type 'type' to 'fp'. Returns 0 on success,
 * -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return fwrite(&type,1,1,fp) == 1 ? 0 : -1;
}
2173
/* Write 't' truncated to a 32 bit signed integer, in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    if (fwrite(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return 0;
}
2179
2180 /* check rdbLoadLen() comments for more info */
2181 static int rdbSaveLen(FILE *fp, uint32_t len) {
2182 unsigned char buf[2];
2183
2184 if (len < (1<<6)) {
2185 /* Save a 6 bit len */
2186 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2187 if (fwrite(buf,1,1,fp) == 0) return -1;
2188 } else if (len < (1<<14)) {
2189 /* Save a 14 bit len */
2190 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2191 buf[1] = len&0xFF;
2192 if (fwrite(buf,2,1,fp) == 0) return -1;
2193 } else {
2194 /* Save a 32 bit len */
2195 buf[0] = (REDIS_RDB_32BITLEN<<6);
2196 if (fwrite(buf,1,1,fp) == 0) return -1;
2197 len = htonl(len);
2198 if (fwrite(&len,4,1,fp) == 0) return -1;
2199 }
2200 return 0;
2201 }
2202
2203 /* String objects in the form "2391" "-100" without any space and with a
2204 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2205 * encoded as integers to save space */
2206 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2207 long long value;
2208 char *endptr, buf[32];
2209
2210 /* Check if it's possible to encode this value as a number */
2211 value = strtoll(s, &endptr, 10);
2212 if (endptr[0] != '\0') return 0;
2213 snprintf(buf,32,"%lld",value);
2214
2215 /* If the number converted back into a string is not identical
2216 * then it's not possible to encode the string as integer */
2217 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2218
2219 /* Finally check if it fits in our ranges */
2220 if (value >= -(1<<7) && value <= (1<<7)-1) {
2221 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2222 enc[1] = value&0xFF;
2223 return 2;
2224 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2225 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2226 enc[1] = value&0xFF;
2227 enc[2] = (value>>8)&0xFF;
2228 return 3;
2229 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2230 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2231 enc[1] = value&0xFF;
2232 enc[2] = (value>>8)&0xFF;
2233 enc[3] = (value>>16)&0xFF;
2234 enc[4] = (value>>24)&0xFF;
2235 return 5;
2236 } else {
2237 return 0;
2238 }
2239 }
2240
2241 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2242 unsigned int comprlen, outlen;
2243 unsigned char byte;
2244 void *out;
2245
2246 /* We require at least four bytes compression for this to be worth it */
2247 outlen = sdslen(obj->ptr)-4;
2248 if (outlen <= 0) return 0;
2249 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2250 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2251 if (comprlen == 0) {
2252 zfree(out);
2253 return 0;
2254 }
2255 /* Data compressed! Let's save it on disk */
2256 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2257 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2258 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2259 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2260 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2261 zfree(out);
2262 return comprlen;
2263
2264 writeerr:
2265 zfree(out);
2266 return -1;
2267 }
2268
2269 /* Save a string objet as [len][data] on disk. If the object is a string
2270 * representation of an integer value we try to safe it in a special form */
2271 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2272 size_t len;
2273 int enclen;
2274
2275 len = sdslen(obj->ptr);
2276
2277 /* Try integer encoding */
2278 if (len <= 11) {
2279 unsigned char buf[5];
2280 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2281 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2282 return 0;
2283 }
2284 }
2285
2286 /* Try LZF compression - under 20 bytes it's unable to compress even
2287 * aaaaaaaaaaaaaaaaaa so skip it */
2288 if (len > 20) {
2289 int retval;
2290
2291 retval = rdbSaveLzfStringObject(fp,obj);
2292 if (retval == -1) return -1;
2293 if (retval > 0) return 0;
2294 /* retval == 0 means data can't be compressed, save the old way */
2295 }
2296
2297 /* Store verbatim */
2298 if (rdbSaveLen(fp,len) == -1) return -1;
2299 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2300 return 0;
2301 }
2302
2303 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2304 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2305 int retval;
2306 robj *dec;
2307
2308 if (obj->encoding != REDIS_ENCODING_RAW) {
2309 dec = getDecodedObject(obj);
2310 retval = rdbSaveStringObjectRaw(fp,dec);
2311 decrRefCount(dec);
2312 return retval;
2313 } else {
2314 return rdbSaveStringObjectRaw(fp,obj);
2315 }
2316 }
2317
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation (the
 * terminating NUL is not written).
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Returns 0 on success, -1 on write error.
 */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);
        /* Measure the text starting at buf+1. The old strlen((char*)buf)
         * started at buf[0], which is uninitialized at this point, so the
         * stored length was garbage (0, or off by one). "%.16g" output is
         * far below 253 characters, so it never collides with the special
         * values. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2344
2345 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2346 static int rdbSave(char *filename) {
2347 dictIterator *di = NULL;
2348 dictEntry *de;
2349 FILE *fp;
2350 char tmpfile[256];
2351 int j;
2352 time_t now = time(NULL);
2353
2354 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2355 fp = fopen(tmpfile,"w");
2356 if (!fp) {
2357 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2358 return REDIS_ERR;
2359 }
2360 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2361 for (j = 0; j < server.dbnum; j++) {
2362 redisDb *db = server.db+j;
2363 dict *d = db->dict;
2364 if (dictSize(d) == 0) continue;
2365 di = dictGetIterator(d);
2366 if (!di) {
2367 fclose(fp);
2368 return REDIS_ERR;
2369 }
2370
2371 /* Write the SELECT DB opcode */
2372 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2373 if (rdbSaveLen(fp,j) == -1) goto werr;
2374
2375 /* Iterate this DB writing every entry */
2376 while((de = dictNext(di)) != NULL) {
2377 robj *key = dictGetEntryKey(de);
2378 robj *o = dictGetEntryVal(de);
2379 time_t expiretime = getExpire(db,key);
2380
2381 /* Save the expire time */
2382 if (expiretime != -1) {
2383 /* If this key is already expired skip it */
2384 if (expiretime < now) continue;
2385 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2386 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2387 }
2388 /* Save the key and associated value */
2389 if (rdbSaveType(fp,o->type) == -1) goto werr;
2390 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2391 if (o->type == REDIS_STRING) {
2392 /* Save a string value */
2393 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2394 } else if (o->type == REDIS_LIST) {
2395 /* Save a list value */
2396 list *list = o->ptr;
2397 listNode *ln;
2398
2399 listRewind(list);
2400 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2401 while((ln = listYield(list))) {
2402 robj *eleobj = listNodeValue(ln);
2403
2404 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2405 }
2406 } else if (o->type == REDIS_SET) {
2407 /* Save a set value */
2408 dict *set = o->ptr;
2409 dictIterator *di = dictGetIterator(set);
2410 dictEntry *de;
2411
2412 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2413 while((de = dictNext(di)) != NULL) {
2414 robj *eleobj = dictGetEntryKey(de);
2415
2416 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2417 }
2418 dictReleaseIterator(di);
2419 } else if (o->type == REDIS_ZSET) {
2420 /* Save a set value */
2421 zset *zs = o->ptr;
2422 dictIterator *di = dictGetIterator(zs->dict);
2423 dictEntry *de;
2424
2425 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2426 while((de = dictNext(di)) != NULL) {
2427 robj *eleobj = dictGetEntryKey(de);
2428 double *score = dictGetEntryVal(de);
2429
2430 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2431 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2432 }
2433 dictReleaseIterator(di);
2434 } else {
2435 assert(0 != 0);
2436 }
2437 }
2438 dictReleaseIterator(di);
2439 }
2440 /* EOF opcode */
2441 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2442
2443 /* Make sure data will not remain on the OS's output buffers */
2444 fflush(fp);
2445 fsync(fileno(fp));
2446 fclose(fp);
2447
2448 /* Use RENAME to make sure the DB file is changed atomically only
2449 * if the generate DB file is ok. */
2450 if (rename(tmpfile,filename) == -1) {
2451 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2452 unlink(tmpfile);
2453 return REDIS_ERR;
2454 }
2455 redisLog(REDIS_NOTICE,"DB saved on disk");
2456 server.dirty = 0;
2457 server.lastsave = time(NULL);
2458 return REDIS_OK;
2459
2460 werr:
2461 fclose(fp);
2462 unlink(tmpfile);
2463 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2464 if (di) dictReleaseIterator(di);
2465 return REDIS_ERR;
2466 }
2467
2468 static int rdbSaveBackground(char *filename) {
2469 pid_t childpid;
2470
2471 if (server.bgsaveinprogress) return REDIS_ERR;
2472 if ((childpid = fork()) == 0) {
2473 /* Child */
2474 close(server.fd);
2475 if (rdbSave(filename) == REDIS_OK) {
2476 exit(0);
2477 } else {
2478 exit(1);
2479 }
2480 } else {
2481 /* Parent */
2482 if (childpid == -1) {
2483 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2484 strerror(errno));
2485 return REDIS_ERR;
2486 }
2487 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2488 server.bgsaveinprogress = 1;
2489 server.bgsavechildpid = childpid;
2490 return REDIS_OK;
2491 }
2492 return REDIS_OK; /* unreached */
2493 }
2494
/* Delete the "temp-<pid>.rdb" file that rdbSave() running in process
 * 'childpid' writes to. Errors are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2501
/* Read a one-byte opcode/type. Returns it as 0..255, or -1 on a short
 * read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    if (fread(&type,1,1,fp) != 1) return -1;
    return type;
}
2507
/* Read a 32 bit signed time value in host byte order, as written by
 * rdbSaveTime(). Returns -1 on a short read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    return fread(&stamp,4,1,fp) == 0 ? -1 : (time_t) stamp;
}
2513
2514 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2515 * of this file for a description of how this are stored on disk.
2516 *
2517 * isencoded is set to 1 if the readed length is not actually a length but
2518 * an "encoding type", check the above comments for more info */
2519 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2520 unsigned char buf[2];
2521 uint32_t len;
2522
2523 if (isencoded) *isencoded = 0;
2524 if (rdbver == 0) {
2525 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2526 return ntohl(len);
2527 } else {
2528 int type;
2529
2530 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2531 type = (buf[0]&0xC0)>>6;
2532 if (type == REDIS_RDB_6BITLEN) {
2533 /* Read a 6 bit len */
2534 return buf[0]&0x3F;
2535 } else if (type == REDIS_RDB_ENCVAL) {
2536 /* Read a 6 bit len encoding type */
2537 if (isencoded) *isencoded = 1;
2538 return buf[0]&0x3F;
2539 } else if (type == REDIS_RDB_14BITLEN) {
2540 /* Read a 14 bit len */
2541 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2542 return ((buf[0]&0x3F)<<8)|buf[1];
2543 } else {
2544 /* Read a 32 bit len */
2545 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2546 return ntohl(len);
2547 }
2548 }
2549 }
2550
2551 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2552 unsigned char enc[4];
2553 long long val;
2554
2555 if (enctype == REDIS_RDB_ENC_INT8) {
2556 if (fread(enc,1,1,fp) == 0) return NULL;
2557 val = (signed char)enc[0];
2558 } else if (enctype == REDIS_RDB_ENC_INT16) {
2559 uint16_t v;
2560 if (fread(enc,2,1,fp) == 0) return NULL;
2561 v = enc[0]|(enc[1]<<8);
2562 val = (int16_t)v;
2563 } else if (enctype == REDIS_RDB_ENC_INT32) {
2564 uint32_t v;
2565 if (fread(enc,4,1,fp) == 0) return NULL;
2566 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2567 val = (int32_t)v;
2568 } else {
2569 val = 0; /* anti-warning */
2570 assert(0!=0);
2571 }
2572 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2573 }
2574
2575 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2576 unsigned int len, clen;
2577 unsigned char *c = NULL;
2578 sds val = NULL;
2579
2580 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2581 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2582 if ((c = zmalloc(clen)) == NULL) goto err;
2583 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2584 if (fread(c,clen,1,fp) == 0) goto err;
2585 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2586 zfree(c);
2587 return createObject(REDIS_STRING,val);
2588 err:
2589 zfree(c);
2590 sdsfree(val);
2591 return NULL;
2592 }
2593
2594 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2595 int isencoded;
2596 uint32_t len;
2597 sds val;
2598
2599 len = rdbLoadLen(fp,rdbver,&isencoded);
2600 if (isencoded) {
2601 switch(len) {
2602 case REDIS_RDB_ENC_INT8:
2603 case REDIS_RDB_ENC_INT16:
2604 case REDIS_RDB_ENC_INT32:
2605 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2606 case REDIS_RDB_ENC_LZF:
2607 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2608 default:
2609 assert(0!=0);
2610 }
2611 }
2612
2613 if (len == REDIS_RDB_LENERR) return NULL;
2614 val = sdsnewlen(NULL,len);
2615 if (len && fread(val,len,1,fp) == 0) {
2616 sdsfree(val);
2617 return NULL;
2618 }
2619 return tryObjectSharing(createObject(REDIS_STRING,val));
2620 }
2621
2622 /* For information about double serialization check rdbSaveDoubleValue() */
2623 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2624 char buf[128];
2625 unsigned char len;
2626
2627 if (fread(&len,1,1,fp) == 0) return -1;
2628 switch(len) {
2629 case 255: *val = R_NegInf; return 0;
2630 case 254: *val = R_PosInf; return 0;
2631 case 253: *val = R_Nan; return 0;
2632 default:
2633 if (fread(buf,len,1,fp) == 0) return -1;
2634 sscanf(buf, "%lg", val);
2635 return 0;
2636 }
2637 }
2638
/* Load the dataset from the dump file 'filename' into server.db.
 *
 * The file starts with "REDIS" followed by a 4 digit format version
 * (0 and 1 are accepted). Then comes a sequence of records, each an
 * optional REDIS_EXPIRETIME + time prefix, followed by either a
 * REDIS_SELECTDB opcode, a REDIS_EOF opcode, or a value type + key +
 * value.
 *
 * Returns REDIS_ERR when the file cannot be opened, has a wrong signature
 * or an unsupported version; REDIS_OK once REDIS_EOF is read. A short read
 * or load failure, a duplicated key, or a DB id >= server.dbnum terminate
 * the process with exit(1). */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;   /* DB 0 until a SELECTDB opcode is read */
    redisDb *db = server.db+0;
    char buf[1024];
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* Header: 5 signature bytes + 4 version digits. */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: element count, then the elements */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    listAddNodeTail((list*)o->ptr,ele);
                } else {
                    /* NOTE(review): the dictAdd() result is ignored, so a
                     * duplicated member in a corrupt file would leak 'ele'. */
                    dictAdd((dict*)o->ptr,ele,NULL);
                }
            }
        } else if (type == REDIS_ZSET) {
            /* Read sorted set value: element count, then
             * (element, score) pairs */
            uint32_t zsetlen;
            zset *zs;

            if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = createZsetObject();
            zs = o->ptr;
            /* Load every single element of the sorted set */
            while(zsetlen--) {
                robj *ele;
                double *score = zmalloc(sizeof(double));

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
                /* The element is referenced by both the dict and the
                 * skiplist, hence the extra reference below. */
                dictAdd(zs->dict,ele,score);
                zslInsert(zs->zsl,*score,ele);
                incrRefCount(ele); /* added to skiplist */
            }
        } else {
            assert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            /* The expire prefix only applies to this one record. */
            expiretime = -1;
        }
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    /* fp and any partially built value are not released: the process
     * exits right below. */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2761
2762 /*================================== Commands =============================== */
2763
2764 static void authCommand(redisClient *c) {
2765 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2766 c->authenticated = 1;
2767 addReply(c,shared.ok);
2768 } else {
2769 c->authenticated = 0;
2770 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2771 }
2772 }
2773
2774 static void pingCommand(redisClient *c) {
2775 addReply(c,shared.pong);
2776 }
2777
2778 static void echoCommand(redisClient *c) {
2779 addReplyBulkLen(c,c->argv[1]);
2780 addReply(c,c->argv[1]);
2781 addReply(c,shared.crlf);
2782 }
2783
2784 /*=================================== Strings =============================== */
2785
2786 static void setGenericCommand(redisClient *c, int nx) {
2787 int retval;
2788
2789 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2790 if (retval == DICT_ERR) {
2791 if (!nx) {
2792 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2793 incrRefCount(c->argv[2]);
2794 } else {
2795 addReply(c,shared.czero);
2796 return;
2797 }
2798 } else {
2799 incrRefCount(c->argv[1]);
2800 incrRefCount(c->argv[2]);
2801 }
2802 server.dirty++;
2803 removeExpire(c->db,c->argv[1]);
2804 addReply(c, nx ? shared.cone : shared.ok);
2805 }
2806
2807 static void setCommand(redisClient *c) {
2808 setGenericCommand(c,0);
2809 }
2810
2811 static void setnxCommand(redisClient *c) {
2812 setGenericCommand(c,1);
2813 }
2814
2815 static void getCommand(redisClient *c) {
2816 robj *o = lookupKeyRead(c->db,c->argv[1]);
2817
2818 if (o == NULL) {
2819 addReply(c,shared.nullbulk);
2820 } else {
2821 if (o->type != REDIS_STRING) {
2822 addReply(c,shared.wrongtypeerr);
2823 } else {
2824 addReplyBulkLen(c,o);
2825 addReply(c,o);
2826 addReply(c,shared.crlf);
2827 }
2828 }
2829 }
2830
2831 static void getsetCommand(redisClient *c) {
2832 getCommand(c);
2833 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2834 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2835 } else {
2836 incrRefCount(c->argv[1]);
2837 }
2838 incrRefCount(c->argv[2]);
2839 server.dirty++;
2840 removeExpire(c->db,c->argv[1]);
2841 }
2842
2843 static void mgetCommand(redisClient *c) {
2844 int j;
2845
2846 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2847 for (j = 1; j < c->argc; j++) {
2848 robj *o = lookupKeyRead(c->db,c->argv[j]);
2849 if (o == NULL) {
2850 addReply(c,shared.nullbulk);
2851 } else {
2852 if (o->type != REDIS_STRING) {
2853 addReply(c,shared.nullbulk);
2854 } else {
2855 addReplyBulkLen(c,o);
2856 addReply(c,o);
2857 addReply(c,shared.crlf);
2858 }
2859 }
2860 }
2861 }
2862
2863 static void incrDecrCommand(redisClient *c, long long incr) {
2864 long long value;
2865 int retval;
2866 robj *o;
2867
2868 o = lookupKeyWrite(c->db,c->argv[1]);
2869 if (o == NULL) {
2870 value = 0;
2871 } else {
2872 if (o->type != REDIS_STRING) {
2873 value = 0;
2874 } else {
2875 char *eptr;
2876
2877 if (o->encoding == REDIS_ENCODING_RAW)
2878 value = strtoll(o->ptr, &eptr, 10);
2879 else if (o->encoding == REDIS_ENCODING_INT)
2880 value = (long)o->ptr;
2881 else
2882 assert(1 != 1);
2883 }
2884 }
2885
2886 value += incr;
2887 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2888 tryObjectEncoding(o);
2889 retval = dictAdd(c->db->dict,c->argv[1],o);
2890 if (retval == DICT_ERR) {
2891 dictReplace(c->db->dict,c->argv[1],o);
2892 removeExpire(c->db,c->argv[1]);
2893 } else {
2894 incrRefCount(c->argv[1]);
2895 }
2896 server.dirty++;
2897 addReply(c,shared.colon);
2898 addReply(c,o);
2899 addReply(c,shared.crlf);
2900 }
2901
2902 static void incrCommand(redisClient *c) {
2903 incrDecrCommand(c,1);
2904 }
2905
2906 static void decrCommand(redisClient *c) {
2907 incrDecrCommand(c,-1);
2908 }
2909
2910 static void incrbyCommand(redisClient *c) {
2911 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2912 incrDecrCommand(c,incr);
2913 }
2914
2915 static void decrbyCommand(redisClient *c) {
2916 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2917 incrDecrCommand(c,-incr);
2918 }
2919
2920 /* ========================= Type agnostic commands ========================= */
2921
2922 static void delCommand(redisClient *c) {
2923 int deleted = 0, j;
2924
2925 for (j = 1; j < c->argc; j++) {
2926 if (deleteKey(c->db,c->argv[j])) {
2927 server.dirty++;
2928 deleted++;
2929 }
2930 }
2931 switch(deleted) {
2932 case 0:
2933 addReply(c,shared.czero);
2934 break;
2935 case 1:
2936 addReply(c,shared.cone);
2937 break;
2938 default:
2939 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2940 break;
2941 }
2942 }
2943
2944 static void existsCommand(redisClient *c) {
2945 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2946 }
2947
2948 static void selectCommand(redisClient *c) {
2949 int id = atoi(c->argv[1]->ptr);
2950
2951 if (selectDb(c,id) == REDIS_ERR) {
2952 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2953 } else {
2954 addReply(c,shared.ok);
2955 }
2956 }
2957
2958 static void randomkeyCommand(redisClient *c) {
2959 dictEntry *de;
2960
2961 while(1) {
2962 de = dictGetRandomKey(c->db->dict);
2963 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2964 }
2965 if (de == NULL) {
2966 addReply(c,shared.plus);
2967 addReply(c,shared.crlf);
2968 } else {
2969 addReply(c,shared.plus);
2970 addReply(c,dictGetEntryKey(de));
2971 addReply(c,shared.crlf);
2972 }
2973 }
2974
2975 static void keysCommand(redisClient *c) {
2976 dictIterator *di;
2977 dictEntry *de;
2978 sds pattern = c->argv[1]->ptr;
2979 int plen = sdslen(pattern);
2980 int numkeys = 0, keyslen = 0;
2981 robj *lenobj = createObject(REDIS_STRING,NULL);
2982
2983 di = dictGetIterator(c->db->dict);
2984 addReply(c,lenobj);
2985 decrRefCount(lenobj);
2986 while((de = dictNext(di)) != NULL) {
2987 robj *keyobj = dictGetEntryKey(de);
2988
2989 sds key = keyobj->ptr;
2990 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2991 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2992 if (expireIfNeeded(c->db,keyobj) == 0) {
2993 if (numkeys != 0)
2994 addReply(c,shared.space);
2995 addReply(c,keyobj);
2996 numkeys++;
2997 keyslen += sdslen(key);
2998 }
2999 }
3000 }
3001 dictReleaseIterator(di);
3002 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3003 addReply(c,shared.crlf);
3004 }
3005
3006 static void dbsizeCommand(redisClient *c) {
3007 addReplySds(c,
3008 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3009 }
3010
3011 static void lastsaveCommand(redisClient *c) {
3012 addReplySds(c,
3013 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3014 }
3015
3016 static void typeCommand(redisClient *c) {
3017 robj *o;
3018 char *type;
3019
3020 o = lookupKeyRead(c->db,c->argv[1]);
3021 if (o == NULL) {
3022 type = "+none";
3023 } else {
3024 switch(o->type) {
3025 case REDIS_STRING: type = "+string"; break;
3026 case REDIS_LIST: type = "+list"; break;
3027 case REDIS_SET: type = "+set"; break;
3028 case REDIS_ZSET: type = "+zset"; break;
3029 default: type = "unknown"; break;
3030 }
3031 }
3032 addReplySds(c,sdsnew(type));
3033 addReply(c,shared.crlf);
3034 }
3035
3036 static void saveCommand(redisClient *c) {
3037 if (server.bgsaveinprogress) {
3038 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3039 return;
3040 }
3041 if (rdbSave(server.dbfilename) == REDIS_OK) {
3042 addReply(c,shared.ok);
3043 } else {
3044 addReply(c,shared.err);
3045 }
3046 }
3047
3048 static void bgsaveCommand(redisClient *c) {
3049 if (server.bgsaveinprogress) {
3050 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3051 return;
3052 }
3053 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3054 addReply(c,shared.ok);
3055 } else {
3056 addReply(c,shared.err);
3057 }
3058 }
3059
3060 static void shutdownCommand(redisClient *c) {
3061 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3062 /* Kill the saving child if there is a background saving in progress.
3063 We want to avoid race conditions, for instance our saving child may
3064 overwrite the synchronous saving did by SHUTDOWN. */
3065 if (server.bgsaveinprogress) {
3066 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3067 kill(server.bgsavechildpid,SIGKILL);
3068 rdbRemoveTempFile(server.bgsavechildpid);
3069 }
3070 /* SYNC SAVE */
3071 if (rdbSave(server.dbfilename) == REDIS_OK) {
3072 if (server.daemonize)
3073 unlink(server.pidfile);
3074 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3075 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3076 exit(1);
3077 } else {
3078 /* Ooops.. error saving! The best we can do is to continue operating.
3079 * Note that if there was a background saving process, in the next
3080 * cron() Redis will be notified that the background saving aborted,
3081 * handling special stuff like slaves pending for synchronization... */
3082 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3083 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3084 }
3085 }
3086
3087 static void renameGenericCommand(redisClient *c, int nx) {
3088 robj *o;
3089
3090 /* To use the same key as src and dst is probably an error */
3091 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3092 addReply(c,shared.sameobjecterr);
3093 return;
3094 }
3095
3096 o = lookupKeyWrite(c->db,c->argv[1]);
3097 if (o == NULL) {
3098 addReply(c,shared.nokeyerr);
3099 return;
3100 }
3101 incrRefCount(o);
3102 deleteIfVolatile(c->db,c->argv[2]);
3103 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3104 if (nx) {
3105 decrRefCount(o);
3106 addReply(c,shared.czero);
3107 return;
3108 }
3109 dictReplace(c->db->dict,c->argv[2],o);
3110 } else {
3111 incrRefCount(c->argv[2]);
3112 }
3113 deleteKey(c->db,c->argv[1]);
3114 server.dirty++;
3115 addReply(c,nx ? shared.cone : shared.ok);
3116 }
3117
3118 static void renameCommand(redisClient *c) {
3119 renameGenericCommand(c,0);
3120 }
3121
3122 static void renamenxCommand(redisClient *c) {
3123 renameGenericCommand(c,1);
3124 }
3125
3126 static void moveCommand(redisClient *c) {
3127 robj *o;
3128 redisDb *src, *dst;
3129 int srcid;
3130
3131 /* Obtain source and target DB pointers */
3132 src = c->db;
3133 srcid = c->db->id;
3134 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3135 addReply(c,shared.outofrangeerr);
3136 return;
3137 }
3138 dst = c->db;
3139 selectDb(c,srcid); /* Back to the source DB */
3140
3141 /* If the user is moving using as target the same
3142 * DB as the source DB it is probably an error. */
3143 if (src == dst) {
3144 addReply(c,shared.sameobjecterr);
3145 return;
3146 }
3147
3148 /* Check if the element exists and get a reference */
3149 o = lookupKeyWrite(c->db,c->argv[1]);
3150 if (!o) {
3151 addReply(c,shared.czero);
3152 return;
3153 }
3154
3155 /* Try to add the element to the target DB */
3156 deleteIfVolatile(dst,c->argv[1]);
3157 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3158 addReply(c,shared.czero);
3159 return;
3160 }
3161 incrRefCount(c->argv[1]);
3162 incrRefCount(o);
3163
3164 /* OK! key moved, free the entry in the source DB */
3165 deleteKey(src,c->argv[1]);
3166 server.dirty++;
3167 addReply(c,shared.cone);
3168 }
3169
3170 /* =================================== Lists ================================ */
3171 static void pushGenericCommand(redisClient *c, int where) {
3172 robj *lobj;
3173 list *list;
3174
3175 lobj = lookupKeyWrite(c->db,c->argv[1]);
3176 if (lobj == NULL) {
3177 lobj = createListObject();
3178 list = lobj->ptr;
3179 if (where == REDIS_HEAD) {
3180 listAddNodeHead(list,c->argv[2]);
3181 } else {
3182 listAddNodeTail(list,c->argv[2]);
3183 }
3184 dictAdd(c->db->dict,c->argv[1],lobj);
3185 incrRefCount(c->argv[1]);
3186 incrRefCount(c->argv[2]);
3187 } else {
3188 if (lobj->type != REDIS_LIST) {
3189 addReply(c,shared.wrongtypeerr);
3190 return;
3191 }
3192 list = lobj->ptr;
3193 if (where == REDIS_HEAD) {
3194 listAddNodeHead(list,c->argv[2]);
3195 } else {
3196 listAddNodeTail(list,c->argv[2]);
3197 }
3198 incrRefCount(c->argv[2]);
3199 }
3200 server.dirty++;
3201 addReply(c,shared.ok);
3202 }
3203
3204 static void lpushCommand(redisClient *c) {
3205 pushGenericCommand(c,REDIS_HEAD);
3206 }
3207
3208 static void rpushCommand(redisClient *c) {
3209 pushGenericCommand(c,REDIS_TAIL);
3210 }
3211
3212 static void llenCommand(redisClient *c) {
3213 robj *o;
3214 list *l;
3215
3216 o = lookupKeyRead(c->db,c->argv[1]);
3217 if (o == NULL) {
3218 addReply(c,shared.czero);
3219 return;
3220 } else {
3221 if (o->type != REDIS_LIST) {
3222 addReply(c,shared.wrongtypeerr);
3223 } else {
3224 l = o->ptr;
3225 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3226 }
3227 }
3228 }
3229
3230 static void lindexCommand(redisClient *c) {
3231 robj *o;
3232 int index = atoi(c->argv[2]->ptr);
3233
3234 o = lookupKeyRead(c->db,c->argv[1]);
3235 if (o == NULL) {
3236 addReply(c,shared.nullbulk);
3237 } else {
3238 if (o->type != REDIS_LIST) {
3239 addReply(c,shared.wrongtypeerr);
3240 } else {
3241 list *list = o->ptr;
3242 listNode *ln;
3243
3244 ln = listIndex(list, index);
3245 if (ln == NULL) {
3246 addReply(c,shared.nullbulk);
3247 } else {
3248 robj *ele = listNodeValue(ln);
3249 addReplyBulkLen(c,ele);
3250 addReply(c,ele);
3251 addReply(c,shared.crlf);
3252 }
3253 }
3254 }
3255 }
3256
3257 static void lsetCommand(redisClient *c) {
3258 robj *o;
3259 int index = atoi(c->argv[2]->ptr);
3260
3261 o = lookupKeyWrite(c->db,c->argv[1]);
3262 if (o == NULL) {
3263 addReply(c,shared.nokeyerr);
3264 } else {
3265 if (o->type != REDIS_LIST) {
3266 addReply(c,shared.wrongtypeerr);
3267 } else {
3268 list *list = o->ptr;
3269 listNode *ln;
3270
3271 ln = listIndex(list, index);
3272 if (ln == NULL) {
3273 addReply(c,shared.outofrangeerr);
3274 } else {
3275 robj *ele = listNodeValue(ln);
3276
3277 decrRefCount(ele);
3278 listNodeValue(ln) = c->argv[3];
3279 incrRefCount(c->argv[3]);
3280 addReply(c,shared.ok);
3281 server.dirty++;
3282 }
3283 }
3284 }
3285 }
3286
3287 static void popGenericCommand(redisClient *c, int where) {
3288 robj *o;
3289
3290 o = lookupKeyWrite(c->db,c->argv[1]);
3291 if (o == NULL) {
3292 addReply(c,shared.nullbulk);
3293 } else {
3294 if (o->type != REDIS_LIST) {
3295 addReply(c,shared.wrongtypeerr);
3296 } else {
3297 list *list = o->ptr;
3298 listNode *ln;
3299
3300 if (where == REDIS_HEAD)
3301 ln = listFirst(list);
3302 else
3303 ln = listLast(list);
3304
3305 if (ln == NULL) {
3306 addReply(c,shared.nullbulk);
3307 } else {
3308 robj *ele = listNodeValue(ln);
3309 addReplyBulkLen(c,ele);
3310 addReply(c,ele);
3311 addReply(c,shared.crlf);
3312 listDelNode(list,ln);
3313 server.dirty++;
3314 }
3315 }
3316 }
3317 }
3318
3319 static void lpopCommand(redisClient *c) {
3320 popGenericCommand(c,REDIS_HEAD);
3321 }
3322
3323 static void rpopCommand(redisClient *c) {
3324 popGenericCommand(c,REDIS_TAIL);
3325 }
3326
3327 static void lrangeCommand(redisClient *c) {
3328 robj *o;
3329 int start = atoi(c->argv[2]->ptr);
3330 int end = atoi(c->argv[3]->ptr);
3331
3332 o = lookupKeyRead(c->db,c->argv[1]);
3333 if (o == NULL) {
3334 addReply(c,shared.nullmultibulk);
3335 } else {
3336 if (o->type != REDIS_LIST) {
3337 addReply(c,shared.wrongtypeerr);
3338 } else {
3339 list *list = o->ptr;
3340 listNode *ln;
3341 int llen = listLength(list);
3342 int rangelen, j;
3343 robj *ele;
3344
3345 /* convert negative indexes */
3346 if (start < 0) start = llen+start;
3347 if (end < 0) end = llen+end;
3348 if (start < 0) start = 0;
3349 if (end < 0) end = 0;
3350
3351 /* indexes sanity checks */
3352 if (start > end || start >= llen) {
3353 /* Out of range start or start > end result in empty list */
3354 addReply(c,shared.emptymultibulk);
3355 return;
3356 }
3357 if (end >= llen) end = llen-1;
3358 rangelen = (end-start)+1;
3359
3360 /* Return the result in form of a multi-bulk reply */
3361 ln = listIndex(list, start);
3362 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3363 for (j = 0; j < rangelen; j++) {
3364 ele = listNodeValue(ln);
3365 addReplyBulkLen(c,ele);
3366 addReply(c,ele);
3367 addReply(c,shared.crlf);
3368 ln = ln->next;
3369 }
3370 }
3371 }
3372 }
3373
3374 static void ltrimCommand(redisClient *c) {
3375 robj *o;
3376 int start = atoi(c->argv[2]->ptr);
3377 int end = atoi(c->argv[3]->ptr);
3378
3379 o = lookupKeyWrite(c->db,c->argv[1]);
3380 if (o == NULL) {
3381 addReply(c,shared.nokeyerr);
3382 } else {
3383 if (o->type != REDIS_LIST) {
3384 addReply(c,shared.wrongtypeerr);
3385 } else {
3386 list *list = o->ptr;
3387 listNode *ln;
3388 int llen = listLength(list);
3389 int j, ltrim, rtrim;
3390
3391 /* convert negative indexes */
3392 if (start < 0) start = llen+start;
3393 if (end < 0) end = llen+end;
3394 if (start < 0) start = 0;
3395 if (end < 0) end = 0;
3396
3397 /* indexes sanity checks */
3398 if (start > end || start >= llen) {
3399 /* Out of range start or start > end result in empty list */
3400 ltrim = llen;
3401 rtrim = 0;
3402 } else {
3403 if (end >= llen) end = llen-1;
3404 ltrim = start;
3405 rtrim = llen-end-1;
3406 }
3407
3408 /* Remove list elements to perform the trim */
3409 for (j = 0; j < ltrim; j++) {
3410 ln = listFirst(list);
3411 listDelNode(list,ln);
3412 }
3413 for (j = 0; j < rtrim; j++) {
3414 ln = listLast(list);
3415 listDelNode(list,ln);
3416 }
3417 server.dirty++;
3418 addReply(c,shared.ok);
3419 }
3420 }
3421 }
3422
3423 static void lremCommand(redisClient *c) {
3424 robj *o;
3425
3426 o = lookupKeyWrite(c->db,c->argv[1]);
3427 if (o == NULL) {
3428 addReply(c,shared.czero);
3429 } else {
3430 if (o->type != REDIS_LIST) {
3431 addReply(c,shared.wrongtypeerr);
3432 } else {
3433 list *list = o->ptr;
3434 listNode *ln, *next;
3435 int toremove = atoi(c->argv[2]->ptr);
3436 int removed = 0;
3437 int fromtail = 0;
3438
3439 if (toremove < 0) {
3440 toremove = -toremove;
3441 fromtail = 1;
3442 }
3443 ln = fromtail ? list->tail : list->head;
3444 while (ln) {
3445 robj *ele = listNodeValue(ln);
3446
3447 next = fromtail ? ln->prev : ln->next;
3448 if (compareStringObjects(ele,c->argv[3]) == 0) {
3449 listDelNode(list,ln);
3450 server.dirty++;
3451 removed++;
3452 if (toremove && removed == toremove) break;
3453 }
3454 ln = next;
3455 }
3456 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3457 }
3458 }
3459 }
3460
3461 /* ==================================== Sets ================================ */
3462
3463 static void saddCommand(redisClient *c) {
3464 robj *set;
3465
3466 set = lookupKeyWrite(c->db,c->argv[1]);
3467 if (set == NULL) {
3468 set = createSetObject();
3469 dictAdd(c->db->dict,c->argv[1],set);
3470 incrRefCount(c->argv[1]);
3471 } else {
3472 if (set->type != REDIS_SET) {
3473 addReply(c,shared.wrongtypeerr);
3474 return;
3475 }
3476 }
3477 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3478 incrRefCount(c->argv[2]);
3479 server.dirty++;
3480 addReply(c,shared.cone);
3481 } else {
3482 addReply(c,shared.czero);
3483 }
3484 }
3485
3486 static void sremCommand(redisClient *c) {
3487 robj *set;
3488
3489 set = lookupKeyWrite(c->db,c->argv[1]);
3490 if (set == NULL) {
3491 addReply(c,shared.czero);
3492 } else {
3493 if (set->type != REDIS_SET) {
3494 addReply(c,shared.wrongtypeerr);
3495 return;
3496 }
3497 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3498 server.dirty++;
3499 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3500 addReply(c,shared.cone);
3501 } else {
3502 addReply(c,shared.czero);
3503 }
3504 }
3505 }
3506
3507 static void smoveCommand(redisClient *c) {
3508 robj *srcset, *dstset;
3509
3510 srcset = lookupKeyWrite(c->db,c->argv[1]);
3511 dstset = lookupKeyWrite(c->db,c->argv[2]);
3512
3513 /* If the source key does not exist return 0, if it's of the wrong type
3514 * raise an error */
3515 if (srcset == NULL || srcset->type != REDIS_SET) {
3516 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3517 return;
3518 }
3519 /* Error if the destination key is not a set as well */
3520 if (dstset && dstset->type != REDIS_SET) {
3521 addReply(c,shared.wrongtypeerr);
3522 return;
3523 }
3524 /* Remove the element from the source set */
3525 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3526 /* Key not found in the src set! return zero */
3527 addReply(c,shared.czero);
3528 return;
3529 }
3530 server.dirty++;
3531 /* Add the element to the destination set */
3532 if (!dstset) {
3533 dstset = createSetObject();
3534 dictAdd(c->db->dict,c->argv[2],dstset);
3535 incrRefCount(c->argv[2]);
3536 }
3537 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3538 incrRefCount(c->argv[3]);
3539 addReply(c,shared.cone);
3540 }
3541
3542 static void sismemberCommand(redisClient *c) {
3543 robj *set;
3544
3545 set = lookupKeyRead(c->db,c->argv[1]);
3546 if (set == NULL) {
3547 addReply(c,shared.czero);
3548 } else {
3549 if (set->type != REDIS_SET) {
3550 addReply(c,shared.wrongtypeerr);
3551 return;
3552 }
3553 if (dictFind(set->ptr,c->argv[2]))
3554 addReply(c,shared.cone);
3555 else
3556 addReply(c,shared.czero);
3557 }
3558 }
3559
3560 static void scardCommand(redisClient *c) {
3561 robj *o;
3562 dict *s;
3563
3564 o = lookupKeyRead(c->db,c->argv[1]);
3565 if (o == NULL) {
3566 addReply(c,shared.czero);
3567 return;
3568 } else {
3569 if (o->type != REDIS_SET) {
3570 addReply(c,shared.wrongtypeerr);
3571 } else {
3572 s = o->ptr;
3573 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3574 dictSize(s)));
3575 }
3576 }
3577 }
3578
3579 static void spopCommand(redisClient *c) {
3580 robj *set;
3581 dictEntry *de;
3582
3583 set = lookupKeyWrite(c->db,c->argv[1]);
3584 if (set == NULL) {
3585 addReply(c,shared.nullbulk);
3586 } else {
3587 if (set->type != REDIS_SET) {
3588 addReply(c,shared.wrongtypeerr);
3589 return;
3590 }
3591 de = dictGetRandomKey(set->ptr);
3592 if (de == NULL) {
3593 addReply(c,shared.nullbulk);
3594 } else {
3595 robj *ele = dictGetEntryKey(de);
3596
3597 addReplyBulkLen(c,ele);
3598 addReply(c,ele);
3599 addReply(c,shared.crlf);
3600 dictDelete(set->ptr,ele);
3601 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3602 server.dirty++;
3603 }
3604 }
3605 }
3606
3607 static void srandmemberCommand(redisClient *c) {
3608 robj *set;
3609 dictEntry *de;
3610
3611 set = lookupKeyRead(c->db,c->argv[1]);
3612 if (set == NULL) {
3613 addReply(c,shared.nullbulk);
3614 } else {
3615 if (set->type != REDIS_SET) {
3616 addReply(c,shared.wrongtypeerr);
3617 return;
3618 }
3619 de = dictGetRandomKey(set->ptr);
3620 if (de == NULL) {
3621 addReply(c,shared.nullbulk);
3622 } else {
3623 robj *ele = dictGetEntryKey(de);
3624
3625 addReplyBulkLen(c,ele);
3626 addReply(c,ele);
3627 addReply(c,shared.crlf);
3628 }
3629 }
3630 }
3631
3632 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3633 dict **d1 = (void*) s1, **d2 = (void*) s2;
3634
3635 return dictSize(*d1)-dictSize(*d2);
3636 }
3637
3638 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3639 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3640 dictIterator *di;
3641 dictEntry *de;
3642 robj *lenobj = NULL, *dstset = NULL;
3643 int j, cardinality = 0;
3644
3645 for (j = 0; j < setsnum; j++) {
3646 robj *setobj;
3647
3648 setobj = dstkey ?
3649 lookupKeyWrite(c->db,setskeys[j]) :
3650 lookupKeyRead(c->db,setskeys[j]);
3651 if (!setobj) {
3652 zfree(dv);
3653 if (dstkey) {
3654 deleteKey(c->db,dstkey);
3655 addReply(c,shared.ok);
3656 } else {
3657 addReply(c,shared.nullmultibulk);
3658 }
3659 return;
3660 }
3661 if (setobj->type != REDIS_SET) {
3662 zfree(dv);
3663 addReply(c,shared.wrongtypeerr);
3664 return;
3665 }
3666 dv[j] = setobj->ptr;
3667 }
3668 /* Sort sets from the smallest to largest, this will improve our
3669 * algorithm's performace */
3670 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3671
3672 /* The first thing we should output is the total number of elements...
3673 * since this is a multi-bulk write, but at this stage we don't know
3674 * the intersection set size, so we use a trick, append an empty object
3675 * to the output list and save the pointer to later modify it with the
3676 * right length */
3677 if (!dstkey) {
3678 lenobj = createObject(REDIS_STRING,NULL);
3679 addReply(c,lenobj);
3680 decrRefCount(lenobj);
3681 } else {
3682 /* If we have a target key where to store the resulting set
3683 * create this key with an empty set inside */
3684 dstset = createSetObject();
3685 }
3686
3687 /* Iterate all the elements of the first (smallest) set, and test
3688 * the element against all the other sets, if at least one set does
3689 * not include the element it is discarded */
3690 di = dictGetIterator(dv[0]);
3691
3692 while((de = dictNext(di)) != NULL) {
3693 robj *ele;
3694
3695 for (j = 1; j < setsnum; j++)
3696 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3697 if (j != setsnum)
3698 continue; /* at least one set does not contain the member */
3699 ele = dictGetEntryKey(de);
3700 if (!dstkey) {
3701 addReplyBulkLen(c,ele);
3702 addReply(c,ele);
3703 addReply(c,shared.crlf);
3704 cardinality++;
3705 } else {
3706 dictAdd(dstset->ptr,ele,NULL);
3707 incrRefCount(ele);
3708 }
3709 }
3710 dictReleaseIterator(di);
3711
3712 if (dstkey) {
3713 /* Store the resulting set into the target */
3714 deleteKey(c->db,dstkey);
3715 dictAdd(c->db->dict,dstkey,dstset);
3716 incrRefCount(dstkey);
3717 }
3718
3719 if (!dstkey) {
3720 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3721 } else {
3722 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3723 dictSize((dict*)dstset->ptr)));
3724 server.dirty++;
3725 }
3726 zfree(dv);
3727 }
3728
3729 static void sinterCommand(redisClient *c) {
3730 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3731 }
3732
3733 static void sinterstoreCommand(redisClient *c) {
3734 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3735 }
3736
/* Operation selector for sunionDiffGenericCommand(). */
#define REDIS_OP_UNION  0   /* SUNION / SUNIONSTORE */
#define REDIS_OP_DIFF   1   /* SDIFF / SDIFFSTORE */
3739
3740 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3741 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3742 dictIterator *di;
3743 dictEntry *de;
3744 robj *dstset = NULL;
3745 int j, cardinality = 0;
3746
3747 for (j = 0; j < setsnum; j++) {
3748 robj *setobj;
3749
3750 setobj = dstkey ?
3751 lookupKeyWrite(c->db,setskeys[j]) :
3752 lookupKeyRead(c->db,setskeys[j]);
3753 if (!setobj) {
3754 dv[j] = NULL;
3755 continue;
3756 }
3757 if (setobj->type != REDIS_SET) {
3758 zfree(dv);
3759 addReply(c,shared.wrongtypeerr);
3760 return;
3761 }
3762 dv[j] = setobj->ptr;
3763 }
3764
3765 /* We need a temp set object to store our union. If the dstkey
3766 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3767 * this set object will be the resulting object to set into the target key*/
3768 dstset = createSetObject();
3769
3770 /* Iterate all the elements of all the sets, add every element a single
3771 * time to the result set */
3772 for (j = 0; j < setsnum; j++) {
3773 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3774 if (!dv[j]) continue; /* non existing keys are like empty sets */
3775
3776 di = dictGetIterator(dv[j]);
3777
3778 while((de = dictNext(di)) != NULL) {
3779 robj *ele;
3780
3781 /* dictAdd will not add the same element multiple times */
3782 ele = dictGetEntryKey(de);
3783 if (op == REDIS_OP_UNION || j == 0) {
3784 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3785 incrRefCount(ele);
3786 cardinality++;
3787 }
3788 } else if (op == REDIS_OP_DIFF) {
3789 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3790 cardinality--;
3791 }
3792 }
3793 }
3794 dictReleaseIterator(di);
3795
3796 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3797 }
3798
3799 /* Output the content of the resulting set, if not in STORE mode */
3800 if (!dstkey) {
3801 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3802 di = dictGetIterator(dstset->ptr);
3803 while((de = dictNext(di)) != NULL) {
3804 robj *ele;
3805
3806 ele = dictGetEntryKey(de);
3807 addReplyBulkLen(c,ele);
3808 addReply(c,ele);
3809 addReply(c,shared.crlf);
3810 }
3811 dictReleaseIterator(di);
3812 } else {
3813 /* If we have a target key where to store the resulting set
3814 * create this key with the result set inside */
3815 deleteKey(c->db,dstkey);
3816 dictAdd(c->db->dict,dstkey,dstset);
3817 incrRefCount(dstkey);
3818 }
3819
3820 /* Cleanup */
3821 if (!dstkey) {
3822 decrRefCount(dstset);
3823 } else {
3824 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3825 dictSize((dict*)dstset->ptr)));
3826 server.dirty++;
3827 }
3828 zfree(dv);
3829 }
3830
3831 static void sunionCommand(redisClient *c) {
3832 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3833 }
3834
3835 static void sunionstoreCommand(redisClient *c) {
3836 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3837 }
3838
3839 static void sdiffCommand(redisClient *c) {
3840 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3841 }
3842
3843 static void sdiffstoreCommand(redisClient *c) {
3844 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3845 }
3846
3847 /* ==================================== ZSets =============================== */
3848
3849 /* ZSETs are ordered sets using two data structures to hold the same elements
3850 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3851 * data structure.
3852 *
3853 * The elements are added to a hash table mapping Redis objects to scores.
3854 * At the same time the elements are added to a skip list mapping scores
3855 * to Redis objects (so objects are sorted by scores in this "view"). */
3856
3857 /* This skiplist implementation is almost a C translation of the original
3858 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3859 * Alternative to Balanced Trees", modified in three ways:
3860 * a) this implementation allows for repeated values.
3861 * b) the comparison is not just by key (our 'score') but by satellite data.
3862 * c) there is a back pointer, so it's a doubly linked list with the back
3863 * pointers being only at "level 1". This allows traversing the list
3864 * from tail to head, useful for ZREVRANGE. */
3865
3866 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3867 zskiplistNode *zn = zmalloc(sizeof(*zn));
3868
3869 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3870 zn->score = score;
3871 zn->obj = obj;
3872 return zn;
3873 }
3874
3875 static zskiplist *zslCreate(void) {
3876 int j;
3877 zskiplist *zsl;
3878
3879 zsl = zmalloc(sizeof(*zsl));
3880 zsl->level = 1;
3881 zsl->length = 0;
3882 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3883 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3884 zsl->header->forward[j] = NULL;
3885 zsl->header->backward = NULL;
3886 zsl->tail = NULL;
3887 return zsl;
3888 }
3889
3890 static void zslFreeNode(zskiplistNode *node) {
3891 decrRefCount(node->obj);
3892 zfree(node->forward);
3893 zfree(node);
3894 }
3895
3896 static void zslFree(zskiplist *zsl) {
3897 zskiplistNode *node = zsl->header->forward[0], *next;
3898
3899 zfree(zsl->header->forward);
3900 zfree(zsl->header);
3901 while(node) {
3902 next = node->forward[0];
3903 zslFreeNode(node);
3904 node = next;
3905 }
3906 zfree(zsl);
3907 }
3908
3909 static int zslRandomLevel(void) {
3910 int level = 1;
3911 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3912 level += 1;
3913 return level;
3914 }
3915
3916 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3917 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3918 int i, level;
3919
3920 x = zsl->header;
3921 for (i = zsl->level-1; i >= 0; i--) {
3922 while (x->forward[i] &&
3923 (x->forward[i]->score < score ||
3924 (x->forward[i]->score == score &&
3925 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3926 x = x->forward[i];
3927 update[i] = x;
3928 }
3929 /* we assume the key is not already inside, since we allow duplicated
3930 * scores, and the re-insertion of score and redis object should never
3931 * happpen since the caller of zslInsert() should test in the hash table
3932 * if the element is already inside or not. */
3933 level = zslRandomLevel();
3934 if (level > zsl->level) {
3935 for (i = zsl->level; i < level; i++)
3936 update[i] = zsl->header;
3937 zsl->level = level;
3938 }
3939 x = zslCreateNode(level,score,obj);
3940 for (i = 0; i < level; i++) {
3941 x->forward[i] = update[i]->forward[i];
3942 update[i]->forward[i] = x;
3943 }
3944 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3945 if (x->forward[0])
3946 x->forward[0]->backward = x;
3947 else
3948 zsl->tail = x;
3949 zsl->length++;
3950 }
3951
3952 /* Delete an element with matching score/object from the skiplist. */
3953 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3954 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3955 int i;
3956
3957 x = zsl->header;
3958 for (i = zsl->level-1; i >= 0; i--) {
3959 while (x->forward[i] &&
3960 (x->forward[i]->score < score ||
3961 (x->forward[i]->score == score &&
3962 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3963 x = x->forward[i];
3964 update[i] = x;
3965 }
3966 /* We may have multiple elements with the same score, what we need
3967 * is to find the element with both the right score and object. */
3968 x = x->forward[0];
3969 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
3970 for (i = 0; i < zsl->level; i++) {
3971 if (update[i]->forward[i] != x) break;
3972 update[i]->forward[i] = x->forward[i];
3973 }
3974 if (x->forward[0]) {
3975 x->forward[0]->backward = (x->backward == zsl->header) ?
3976 NULL : x->backward;
3977 } else {
3978 zsl->tail = x->backward;
3979 }
3980 zslFreeNode(x);
3981 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3982 zsl->level--;
3983 zsl->length--;
3984 return 1;
3985 } else {
3986 return 0; /* not found */
3987 }
3988 return 0; /* not found */
3989 }
3990
3991 /* Delete all the elements with score between min and max from the skiplist.
3992 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
3993 * Note that this function takes the reference to the hash table view of the
3994 * sorted set, in order to remove the elements from the hash table too. */
3995 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
3996 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3997 unsigned long removed = 0;
3998 int i;
3999
4000 x = zsl->header;
4001 for (i = zsl->level-1; i >= 0; i--) {
4002 while (x->forward[i] && x->forward[i]->score < min)
4003 x = x->forward[i];
4004 update[i] = x;
4005 }
4006 /* We may have multiple elements with the same score, what we need
4007 * is to find the element with both the right score and object. */
4008 x = x->forward[0];
4009 while (x && x->score <= max) {
4010 zskiplistNode *next;
4011
4012 for (i = 0; i < zsl->level; i++) {
4013 if (update[i]->forward[i] != x) break;
4014 update[i]->forward[i] = x->forward[i];
4015 }
4016 if (x->forward[0]) {
4017 x->forward[0]->backward = (x->backward == zsl->header) ?
4018 NULL : x->backward;
4019 } else {
4020 zsl->tail = x->backward;
4021 }
4022 next = x->forward[0];
4023 dictDelete(dict,x->obj);
4024 zslFreeNode(x);
4025 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4026 zsl->level--;
4027 zsl->length--;
4028 removed++;
4029 x = next;
4030 }
4031 return removed; /* not found */
4032 }
4033
4034 /* Find the first node having a score equal or greater than the specified one.
4035 * Returns NULL if there is no match. */
4036 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4037 zskiplistNode *x;
4038 int i;
4039
4040 x = zsl->header;
4041 for (i = zsl->level-1; i >= 0; i--) {
4042 while (x->forward[i] && x->forward[i]->score < score)
4043 x = x->forward[i];
4044 }
4045 /* We may have multiple elements with the same score, what we need
4046 * is to find the element with both the right score and object. */
4047 return x->forward[0];
4048 }
4049
4050 /* The actual Z-commands implementations */
4051
4052 static void zaddCommand(redisClient *c) {
4053 robj *zsetobj;
4054 zset *zs;
4055 double *score;
4056
4057 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4058 if (zsetobj == NULL) {
4059 zsetobj = createZsetObject();
4060 dictAdd(c->db->dict,c->argv[1],zsetobj);
4061 incrRefCount(c->argv[1]);
4062 } else {
4063 if (zsetobj->type != REDIS_ZSET) {
4064 addReply(c,shared.wrongtypeerr);
4065 return;
4066 }
4067 }
4068 score = zmalloc(sizeof(double));
4069 *score = strtod(c->argv[2]->ptr,NULL);
4070 zs = zsetobj->ptr;
4071 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4072 /* case 1: New element */
4073 incrRefCount(c->argv[3]); /* added to hash */
4074 zslInsert(zs->zsl,*score,c->argv[3]);
4075 incrRefCount(c->argv[3]); /* added to skiplist */
4076 server.dirty++;
4077 addReply(c,shared.cone);
4078 } else {
4079 dictEntry *de;
4080 double *oldscore;
4081
4082 /* case 2: Score update operation */
4083 de = dictFind(zs->dict,c->argv[3]);
4084 assert(de != NULL);
4085 oldscore = dictGetEntryVal(de);
4086 if (*score != *oldscore) {
4087 int deleted;
4088
4089 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4090 assert(deleted != 0);
4091 zslInsert(zs->zsl,*score,c->argv[3]);
4092 incrRefCount(c->argv[3]);
4093 dictReplace(zs->dict,c->argv[3],score);
4094 server.dirty++;
4095 } else {
4096 zfree(score);
4097 }
4098 addReply(c,shared.czero);
4099 }
4100 }
4101
4102 static void zremCommand(redisClient *c) {
4103 robj *zsetobj;
4104 zset *zs;
4105
4106 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4107 if (zsetobj == NULL) {
4108 addReply(c,shared.czero);
4109 } else {
4110 dictEntry *de;
4111 double *oldscore;
4112 int deleted;
4113
4114 if (zsetobj->type != REDIS_ZSET) {
4115 addReply(c,shared.wrongtypeerr);
4116 return;
4117 }
4118 zs = zsetobj->ptr;
4119 de = dictFind(zs->dict,c->argv[2]);
4120 if (de == NULL) {
4121 addReply(c,shared.czero);
4122 return;
4123 }
4124 /* Delete from the skiplist */
4125 oldscore = dictGetEntryVal(de);
4126 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4127 assert(deleted != 0);
4128
4129 /* Delete from the hash table */
4130 dictDelete(zs->dict,c->argv[2]);
4131 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4132 server.dirty++;
4133 addReply(c,shared.cone);
4134 }
4135 }
4136
4137 static void zremrangebyscoreCommand(redisClient *c) {
4138 double min = strtod(c->argv[2]->ptr,NULL);
4139 double max = strtod(c->argv[3]->ptr,NULL);
4140 robj *zsetobj;
4141 zset *zs;
4142
4143 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4144 if (zsetobj == NULL) {
4145 addReply(c,shared.czero);
4146 } else {
4147 long deleted;
4148
4149 if (zsetobj->type != REDIS_ZSET) {
4150 addReply(c,shared.wrongtypeerr);
4151 return;
4152 }
4153 zs = zsetobj->ptr;
4154 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4155 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4156 server.dirty += deleted;
4157 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4158 }
4159 }
4160
4161 static void zrangeGenericCommand(redisClient *c, int reverse) {
4162 robj *o;
4163 int start = atoi(c->argv[2]->ptr);
4164 int end = atoi(c->argv[3]->ptr);
4165
4166 o = lookupKeyRead(c->db,c->argv[1]);
4167 if (o == NULL) {
4168 addReply(c,shared.nullmultibulk);
4169 } else {
4170 if (o->type != REDIS_ZSET) {
4171 addReply(c,shared.wrongtypeerr);
4172 } else {
4173 zset *zsetobj = o->ptr;
4174 zskiplist *zsl = zsetobj->zsl;
4175 zskiplistNode *ln;
4176
4177 int llen = zsl->length;
4178 int rangelen, j;
4179 robj *ele;
4180
4181 /* convert negative indexes */
4182 if (start < 0) start = llen+start;
4183 if (end < 0) end = llen+end;
4184 if (start < 0) start = 0;
4185 if (end < 0) end = 0;
4186
4187 /* indexes sanity checks */
4188 if (start > end || start >= llen) {
4189 /* Out of range start or start > end result in empty list */
4190 addReply(c,shared.emptymultibulk);
4191 return;
4192 }
4193 if (end >= llen) end = llen-1;
4194 rangelen = (end-start)+1;
4195
4196 /* Return the result in form of a multi-bulk reply */
4197 if (reverse) {
4198 ln = zsl->tail;
4199 while (start--)
4200 ln = ln->backward;
4201 } else {
4202 ln = zsl->header->forward[0];
4203 while (start--)
4204 ln = ln->forward[0];
4205 }
4206
4207 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4208 for (j = 0; j < rangelen; j++) {
4209 ele = ln->obj;
4210 addReplyBulkLen(c,ele);
4211 addReply(c,ele);
4212 addReply(c,shared.crlf);
4213 ln = reverse ? ln->backward : ln->forward[0];
4214 }
4215 }
4216 }
4217 }
4218
4219 static void zrangeCommand(redisClient *c) {
4220 zrangeGenericCommand(c,0);
4221 }
4222
4223 static void zrevrangeCommand(redisClient *c) {
4224 zrangeGenericCommand(c,1);
4225 }
4226
4227 static void zrangebyscoreCommand(redisClient *c) {
4228 robj *o;
4229 double min = strtod(c->argv[2]->ptr,NULL);
4230 double max = strtod(c->argv[3]->ptr,NULL);
4231
4232 o = lookupKeyRead(c->db,c->argv[1]);
4233 if (o == NULL) {
4234 addReply(c,shared.nullmultibulk);
4235 } else {
4236 if (o->type != REDIS_ZSET) {
4237 addReply(c,shared.wrongtypeerr);
4238 } else {
4239 zset *zsetobj = o->ptr;
4240 zskiplist *zsl = zsetobj->zsl;
4241 zskiplistNode *ln;
4242 robj *ele, *lenobj;
4243 unsigned int rangelen = 0;
4244
4245 /* Get the first node with the score >= min */
4246 ln = zslFirstWithScore(zsl,min);
4247 if (ln == NULL) {
4248 /* No element matching the speciifed interval */
4249 addReply(c,shared.emptymultibulk);
4250 return;
4251 }
4252
4253 /* We don't know in advance how many matching elements there
4254 * are in the list, so we push this object that will represent
4255 * the multi-bulk length in the output buffer, and will "fix"
4256 * it later */
4257 lenobj = createObject(REDIS_STRING,NULL);
4258 addReply(c,lenobj);
4259
4260 while(ln && ln->score <= max) {
4261 ele = ln->obj;
4262 addReplyBulkLen(c,ele);
4263 addReply(c,ele);
4264 addReply(c,shared.crlf);
4265 ln = ln->forward[0];
4266 rangelen++;
4267 }
4268 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4269 }
4270 }
4271 }
4272
4273 static void zcardCommand(redisClient *c) {
4274 robj *o;
4275 zset *zs;
4276
4277 o = lookupKeyRead(c->db,c->argv[1]);
4278 if (o == NULL) {
4279 addReply(c,shared.czero);
4280 return;
4281 } else {
4282 if (o->type != REDIS_ZSET) {
4283 addReply(c,shared.wrongtypeerr);
4284 } else {
4285 zs = o->ptr;
4286 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4287 }
4288 }
4289 }
4290
4291 static void zscoreCommand(redisClient *c) {
4292 robj *o;
4293 zset *zs;
4294
4295 o = lookupKeyRead(c->db,c->argv[1]);
4296 if (o == NULL) {
4297 addReply(c,shared.czero);
4298 return;
4299 } else {
4300 if (o->type != REDIS_ZSET) {
4301 addReply(c,shared.wrongtypeerr);
4302 } else {
4303 dictEntry *de;
4304
4305 zs = o->ptr;
4306 de = dictFind(zs->dict,c->argv[2]);
4307 if (!de) {
4308 addReply(c,shared.nullbulk);
4309 } else {
4310 char buf[128];
4311 double *score = dictGetEntryVal(de);
4312
4313 snprintf(buf,sizeof(buf),"%.16g",*score);
4314 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4315 strlen(buf),buf));
4316 }
4317 }
4318 }
4319 }
4320
4321 /* ========================= Non type-specific commands ==================== */
4322
4323 static void flushdbCommand(redisClient *c) {
4324 server.dirty += dictSize(c->db->dict);
4325 dictEmpty(c->db->dict);
4326 dictEmpty(c->db->expires);
4327 addReply(c,shared.ok);
4328 }
4329
4330 static void flushallCommand(redisClient *c) {
4331 server.dirty += emptyDb();
4332 addReply(c,shared.ok);
4333 rdbSave(server.dbfilename);
4334 server.dirty++;
4335 }
4336
4337 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4338 redisSortOperation *so = zmalloc(sizeof(*so));
4339 so->type = type;
4340 so->pattern = pattern;
4341 return so;
4342 }
4343
4344 /* Return the value associated to the key with a name obtained
4345 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4346 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4347 char *p;
4348 sds spat, ssub;
4349 robj keyobj;
4350 int prefixlen, sublen, postfixlen;
4351 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4352 struct {
4353 long len;
4354 long free;
4355 char buf[REDIS_SORTKEY_MAX+1];
4356 } keyname;
4357
4358 if (subst->encoding == REDIS_ENCODING_RAW)
4359 incrRefCount(subst);
4360 else {
4361 subst = getDecodedObject(subst);
4362 }
4363
4364 spat = pattern->ptr;
4365 ssub = subst->ptr;
4366 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4367 p = strchr(spat,'*');
4368 if (!p) return NULL;
4369
4370 prefixlen = p-spat;
4371 sublen = sdslen(ssub);
4372 postfixlen = sdslen(spat)-(prefixlen+1);
4373 memcpy(keyname.buf,spat,prefixlen);
4374 memcpy(keyname.buf+prefixlen,ssub,sublen);
4375 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4376 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4377 keyname.len = prefixlen+sublen+postfixlen;
4378
4379 keyobj.refcount = 1;
4380 keyobj.type = REDIS_STRING;
4381 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4382
4383 decrRefCount(subst);
4384
4385 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4386 return lookupKeyRead(db,&keyobj);
4387 }
4388
4389 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4390 * the additional parameter is not standard but a BSD-specific we have to
4391 * pass sorting parameters via the global 'server' structure */
4392 static int sortCompare(const void *s1, const void *s2) {
4393 const redisSortObject *so1 = s1, *so2 = s2;
4394 int cmp;
4395
4396 if (!server.sort_alpha) {
4397 /* Numeric sorting. Here it's trivial as we precomputed scores */
4398 if (so1->u.score > so2->u.score) {
4399 cmp = 1;
4400 } else if (so1->u.score < so2->u.score) {
4401 cmp = -1;
4402 } else {
4403 cmp = 0;
4404 }
4405 } else {
4406 /* Alphanumeric sorting */
4407 if (server.sort_bypattern) {
4408 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4409 /* At least one compare object is NULL */
4410 if (so1->u.cmpobj == so2->u.cmpobj)
4411 cmp = 0;
4412 else if (so1->u.cmpobj == NULL)
4413 cmp = -1;
4414 else
4415 cmp = 1;
4416 } else {
4417 /* We have both the objects, use strcoll */
4418 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4419 }
4420 } else {
4421 /* Compare elements directly */
4422 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4423 so2->obj->encoding == REDIS_ENCODING_RAW) {
4424 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4425 } else {
4426 robj *dec1, *dec2;
4427
4428 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4429 so1->obj : getDecodedObject(so1->obj);
4430 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4431 so2->obj : getDecodedObject(so2->obj);
4432 cmp = strcoll(dec1->ptr,dec2->ptr);
4433 if (dec1 != so1->obj) decrRefCount(dec1);
4434 if (dec2 != so2->obj) decrRefCount(dec2);
4435 }
4436 }
4437 }
4438 return server.sort_desc ? -cmp : cmp;
4439 }
4440
4441 /* The SORT command is the most complex command in Redis. Warning: this code
4442 * is optimized for speed and a bit less for readability */
4443 static void sortCommand(redisClient *c) {
4444 list *operations;
4445 int outputlen = 0;
4446 int desc = 0, alpha = 0;
4447 int limit_start = 0, limit_count = -1, start, end;
4448 int j, dontsort = 0, vectorlen;
4449 int getop = 0; /* GET operation counter */
4450 robj *sortval, *sortby = NULL;
4451 redisSortObject *vector; /* Resulting vector to sort */
4452
4453 /* Lookup the key to sort. It must be of the right types */
4454 sortval = lookupKeyRead(c->db,c->argv[1]);
4455 if (sortval == NULL) {
4456 addReply(c,shared.nokeyerr);
4457 return;
4458 }
4459 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4460 addReply(c,shared.wrongtypeerr);
4461 return;
4462 }
4463
4464 /* Create a list of operations to perform for every sorted element.
4465 * Operations can be GET/DEL/INCR/DECR */
4466 operations = listCreate();
4467 listSetFreeMethod(operations,zfree);
4468 j = 2;
4469
4470 /* Now we need to protect sortval incrementing its count, in the future
4471 * SORT may have options able to overwrite/delete keys during the sorting
4472 * and the sorted key itself may get destroied */
4473 incrRefCount(sortval);
4474
4475 /* The SORT command has an SQL-alike syntax, parse it */
4476 while(j < c->argc) {
4477 int leftargs = c->argc-j-1;
4478 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4479 desc = 0;
4480 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4481 desc = 1;
4482 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4483 alpha = 1;
4484 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4485 limit_start = atoi(c->argv[j+1]->ptr);
4486 limit_count = atoi(c->argv[j+2]->ptr);
4487 j+=2;
4488 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4489 sortby = c->argv[j+1];
4490 /* If the BY pattern does not contain '*', i.e. it is constant,
4491 * we don't need to sort nor to lookup the weight keys. */
4492 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4493 j++;
4494 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4495 listAddNodeTail(operations,createSortOperation(
4496 REDIS_SORT_GET,c->argv[j+1]));
4497 getop++;
4498 j++;
4499 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4500 listAddNodeTail(operations,createSortOperation(
4501 REDIS_SORT_DEL,c->argv[j+1]));
4502 j++;
4503 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4504 listAddNodeTail(operations,createSortOperation(
4505 REDIS_SORT_INCR,c->argv[j+1]));
4506 j++;
4507 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4508 listAddNodeTail(operations,createSortOperation(
4509 REDIS_SORT_DECR,c->argv[j+1]));
4510 j++;
4511 } else {
4512 decrRefCount(sortval);
4513 listRelease(operations);
4514 addReply(c,shared.syntaxerr);
4515 return;
4516 }
4517 j++;
4518 }
4519
4520 /* Load the sorting vector with all the objects to sort */
4521 vectorlen = (sortval->type == REDIS_LIST) ?
4522 listLength((list*)sortval->ptr) :
4523 dictSize((dict*)sortval->ptr);
4524 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4525 j = 0;
4526 if (sortval->type == REDIS_LIST) {
4527 list *list = sortval->ptr;
4528 listNode *ln;
4529
4530 listRewind(list);
4531 while((ln = listYield(list))) {
4532 robj *ele = ln->value;
4533 vector[j].obj = ele;
4534 vector[j].u.score = 0;
4535 vector[j].u.cmpobj = NULL;
4536 j++;
4537 }
4538 } else {
4539 dict *set = sortval->ptr;
4540 dictIterator *di;
4541 dictEntry *setele;
4542
4543 di = dictGetIterator(set);
4544 while((setele = dictNext(di)) != NULL) {
4545 vector[j].obj = dictGetEntryKey(setele);
4546 vector[j].u.score = 0;
4547 vector[j].u.cmpobj = NULL;
4548 j++;
4549 }
4550 dictReleaseIterator(di);
4551 }
4552 assert(j == vectorlen);
4553
4554 /* Now it's time to load the right scores in the sorting vector */
4555 if (dontsort == 0) {
4556 for (j = 0; j < vectorlen; j++) {
4557 if (sortby) {
4558 robj *byval;
4559
4560 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4561 if (!byval || byval->type != REDIS_STRING) continue;
4562 if (alpha) {
4563 if (byval->encoding == REDIS_ENCODING_RAW) {
4564 vector[j].u.cmpobj = byval;
4565 incrRefCount(byval);
4566 } else {
4567 vector[j].u.cmpobj = getDecodedObject(byval);
4568 }
4569 } else {
4570 if (byval->encoding == REDIS_ENCODING_RAW) {
4571 vector[j].u.score = strtod(byval->ptr,NULL);
4572 } else {
4573 if (byval->encoding == REDIS_ENCODING_INT) {
4574 vector[j].u.score = (long)byval->ptr;
4575 } else
4576 assert(1 != 1);
4577 }
4578 }
4579 } else {
4580 if (!alpha) {
4581 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4582 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4583 else {
4584 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4585 vector[j].u.score = (long) vector[j].obj->ptr;
4586 else
4587 assert(1 != 1);
4588 }
4589 }
4590 }
4591 }
4592 }
4593
4594 /* We are ready to sort the vector... perform a bit of sanity check
4595 * on the LIMIT option too. We'll use a partial version of quicksort. */
4596 start = (limit_start < 0) ? 0 : limit_start;
4597 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4598 if (start >= vectorlen) {
4599 start = vectorlen-1;
4600 end = vectorlen-2;
4601 }
4602 if (end >= vectorlen) end = vectorlen-1;
4603
4604 if (dontsort == 0) {
4605 server.sort_desc = desc;
4606 server.sort_alpha = alpha;
4607 server.sort_bypattern = sortby ? 1 : 0;
4608 if (sortby && (start != 0 || end != vectorlen-1))
4609 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4610 else
4611 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4612 }
4613
4614 /* Send command output to the output buffer, performing the specified
4615 * GET/DEL/INCR/DECR operations if any. */
4616 outputlen = getop ? getop*(end-start+1) : end-start+1;
4617 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4618 for (j = start; j <= end; j++) {
4619 listNode *ln;
4620 if (!getop) {
4621 addReplyBulkLen(c,vector[j].obj);
4622 addReply(c,vector[j].obj);
4623 addReply(c,shared.crlf);
4624 }
4625 listRewind(operations);
4626 while((ln = listYield(operations))) {
4627 redisSortOperation *sop = ln->value;
4628 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4629 vector[j].obj);
4630
4631 if (sop->type == REDIS_SORT_GET) {
4632 if (!val || val->type != REDIS_STRING) {
4633 addReply(c,shared.nullbulk);
4634 } else {
4635 addReplyBulkLen(c,val);
4636 addReply(c,val);
4637 addReply(c,shared.crlf);
4638 }
4639 } else if (sop->type == REDIS_SORT_DEL) {
4640 /* TODO */
4641 }
4642 }
4643 }
4644
4645 /* Cleanup */
4646 decrRefCount(sortval);
4647 listRelease(operations);
4648 for (j = 0; j < vectorlen; j++) {
4649 if (sortby && alpha && vector[j].u.cmpobj)
4650 decrRefCount(vector[j].u.cmpobj);
4651 }
4652 zfree(vector);
4653 }
4654
4655 static void infoCommand(redisClient *c) {
4656 sds info;
4657 time_t uptime = time(NULL)-server.stat_starttime;
4658 int j;
4659
4660 info = sdscatprintf(sdsempty(),
4661 "redis_version:%s\r\n"
4662 "arch_bits:%s\r\n"
4663 "uptime_in_seconds:%d\r\n"
4664 "uptime_in_days:%d\r\n"
4665 "connected_clients:%d\r\n"
4666 "connected_slaves:%d\r\n"
4667 "used_memory:%zu\r\n"
4668 "changes_since_last_save:%lld\r\n"
4669 "bgsave_in_progress:%d\r\n"
4670 "last_save_time:%d\r\n"
4671 "total_connections_received:%lld\r\n"
4672 "total_commands_processed:%lld\r\n"
4673 "role:%s\r\n"
4674 ,REDIS_VERSION,
4675 (sizeof(long) == 8) ? "64" : "32",
4676 uptime,
4677 uptime/(3600*24),
4678 listLength(server.clients)-listLength(server.slaves),
4679 listLength(server.slaves),
4680 server.usedmemory,
4681 server.dirty,
4682 server.bgsaveinprogress,
4683 server.lastsave,
4684 server.stat_numconnections,
4685 server.stat_numcommands,
4686 server.masterhost == NULL ? "master" : "slave"
4687 );
4688 if (server.masterhost) {
4689 info = sdscatprintf(info,
4690 "master_host:%s\r\n"
4691 "master_port:%d\r\n"
4692 "master_link_status:%s\r\n"
4693 "master_last_io_seconds_ago:%d\r\n"
4694 ,server.masterhost,
4695 server.masterport,
4696 (server.replstate == REDIS_REPL_CONNECTED) ?
4697 "up" : "down",
4698 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4699 );
4700 }
4701 for (j = 0; j < server.dbnum; j++) {
4702 long long keys, vkeys;
4703
4704 keys = dictSize(server.db[j].dict);
4705 vkeys = dictSize(server.db[j].expires);
4706 if (keys || vkeys) {
4707 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4708 j, keys, vkeys);
4709 }
4710 }
4711 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4712 addReplySds(c,info);
4713 addReply(c,shared.crlf);
4714 }
4715
4716 static void monitorCommand(redisClient *c) {
4717 /* ignore MONITOR if aleady slave or in monitor mode */
4718 if (c->flags & REDIS_SLAVE) return;
4719
4720 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4721 c->slaveseldb = 0;
4722 listAddNodeTail(server.monitors,c);
4723 addReply(c,shared.ok);
4724 }
4725
4726 /* ================================= Expire ================================= */
4727 static int removeExpire(redisDb *db, robj *key) {
4728 if (dictDelete(db->expires,key) == DICT_OK) {
4729 return 1;
4730 } else {
4731 return 0;
4732 }
4733 }
4734
4735 static int setExpire(redisDb *db, robj *key, time_t when) {
4736 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4737 return 0;
4738 } else {
4739 incrRefCount(key);
4740 return 1;
4741 }
4742 }
4743
4744 /* Return the expire time of the specified key, or -1 if no expire
4745 * is associated with this key (i.e. the key is non volatile) */
4746 static time_t getExpire(redisDb *db, robj *key) {
4747 dictEntry *de;
4748
4749 /* No expire? return ASAP */
4750 if (dictSize(db->expires) == 0 ||
4751 (de = dictFind(db->expires,key)) == NULL) return -1;
4752
4753 return (time_t) dictGetEntryVal(de);
4754 }
4755
4756 static int expireIfNeeded(redisDb *db, robj *key) {
4757 time_t when;
4758 dictEntry *de;
4759
4760 /* No expire? return ASAP */
4761 if (dictSize(db->expires) == 0 ||
4762 (de = dictFind(db->expires,key)) == NULL) return 0;
4763
4764 /* Lookup the expire */
4765 when = (time_t) dictGetEntryVal(de);
4766 if (time(NULL) <= when) return 0;
4767
4768 /* Delete the key */
4769 dictDelete(db->expires,key);
4770 return dictDelete(db->dict,key) == DICT_OK;
4771 }
4772
4773 static int deleteIfVolatile(redisDb *db, robj *key) {
4774 dictEntry *de;
4775
4776 /* No expire? return ASAP */
4777 if (dictSize(db->expires) == 0 ||
4778 (de = dictFind(db->expires,key)) == NULL) return 0;
4779
4780 /* Delete the key */
4781 server.dirty++;
4782 dictDelete(db->expires,key);
4783 return dictDelete(db->dict,key) == DICT_OK;
4784 }
4785
4786 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4787 dictEntry *de;
4788
4789 de = dictFind(c->db->dict,key);
4790 if (de == NULL) {
4791 addReply(c,shared.czero);
4792 return;
4793 }
4794 if (seconds < 0) {
4795 if (deleteKey(c->db,key)) server.dirty++;
4796 addReply(c, shared.cone);
4797 return;
4798 } else {
4799 time_t when = time(NULL)+seconds;
4800 if (setExpire(c->db,key,when)) {
4801 addReply(c,shared.cone);
4802 server.dirty++;
4803 } else {
4804 addReply(c,shared.czero);
4805 }
4806 return;
4807 }
4808 }
4809
4810 static void expireCommand(redisClient *c) {
4811 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
4812 }
4813
4814 static void expireatCommand(redisClient *c) {
4815 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
4816 }
4817
4818 static void ttlCommand(redisClient *c) {
4819 time_t expire;
4820 int ttl = -1;
4821
4822 expire = getExpire(c->db,c->argv[1]);
4823 if (expire != -1) {
4824 ttl = (int) (expire-time(NULL));
4825 if (ttl < 0) ttl = -1;
4826 }
4827 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4828 }
4829
4830 static void msetGenericCommand(redisClient *c, int nx) {
4831 int j;
4832
4833 if ((c->argc % 2) == 0) {
4834 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4835 return;
4836 }
4837 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4838 * set nothing at all if at least one already key exists. */
4839 if (nx) {
4840 for (j = 1; j < c->argc; j += 2) {
4841 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4842 addReply(c, shared.czero);
4843 return;
4844 }
4845 }
4846 }
4847
4848 for (j = 1; j < c->argc; j += 2) {
4849 int retval;
4850
4851 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4852 if (retval == DICT_ERR) {
4853 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4854 incrRefCount(c->argv[j+1]);
4855 } else {
4856 incrRefCount(c->argv[j]);
4857 incrRefCount(c->argv[j+1]);
4858 }
4859 removeExpire(c->db,c->argv[j]);
4860 }
4861 server.dirty += (c->argc-1)/2;
4862 addReply(c, nx ? shared.cone : shared.ok);
4863 }
4864
4865 static void msetCommand(redisClient *c) {
4866 msetGenericCommand(c,0);
4867 }
4868
4869 static void msetnxCommand(redisClient *c) {
4870 msetGenericCommand(c,1);
4871 }
4872
4873 /* =============================== Replication ============================= */
4874
4875 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4876 ssize_t nwritten, ret = size;
4877 time_t start = time(NULL);
4878
4879 timeout++;
4880 while(size) {
4881 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4882 nwritten = write(fd,ptr,size);
4883 if (nwritten == -1) return -1;
4884 ptr += nwritten;
4885 size -= nwritten;
4886 }
4887 if ((time(NULL)-start) > timeout) {
4888 errno = ETIMEDOUT;
4889 return -1;
4890 }
4891 }
4892 return ret;
4893 }
4894
4895 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4896 ssize_t nread, totread = 0;
4897 time_t start = time(NULL);
4898
4899 timeout++;
4900 while(size) {
4901 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4902 nread = read(fd,ptr,size);
4903 if (nread == -1) return -1;
4904 ptr += nread;
4905 size -= nread;
4906 totread += nread;
4907 }
4908 if ((time(NULL)-start) > timeout) {
4909 errno = ETIMEDOUT;
4910 return -1;
4911 }
4912 }
4913 return totread;
4914 }
4915
/* Read one line from 'fd' into 'ptr', one byte at a time via syncRead().
 * At most size-1 bytes are stored, always followed by a NUL terminator. The
 * '\n' is consumed but not stored, and a '\r' right before it is replaced
 * by the terminator.
 * Returns the number of bytes stored before the newline (a stripped '\r'
 * is included in the count), the number of bytes stored if the buffer
 * filled up first, or -1 on read error/timeout. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--;     /* reserve room for the terminator */
    /* 'size' is decremented for every stored byte. The previous version
     * never decremented it, so a line longer than the buffer (data coming
     * from the network) overflowed the caller's buffer. */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            size--;
        }
    }
    return nread;
}
4936
4937 static void syncCommand(redisClient *c) {
4938 /* ignore SYNC if aleady slave or in monitor mode */
4939 if (c->flags & REDIS_SLAVE) return;
4940
4941 /* SYNC can't be issued when the server has pending data to send to
4942 * the client about already issued commands. We need a fresh reply
4943 * buffer registering the differences between the BGSAVE and the current
4944 * dataset, so that we can copy to other slaves if needed. */
4945 if (listLength(c->reply) != 0) {
4946 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4947 return;
4948 }
4949
4950 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4951 /* Here we need to check if there is a background saving operation
4952 * in progress, or if it is required to start one */
4953 if (server.bgsaveinprogress) {
4954 /* Ok a background save is in progress. Let's check if it is a good
4955 * one for replication, i.e. if there is another slave that is
4956 * registering differences since the server forked to save */
4957 redisClient *slave;
4958 listNode *ln;
4959
4960 listRewind(server.slaves);
4961 while((ln = listYield(server.slaves))) {
4962 slave = ln->value;
4963 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4964 }
4965 if (ln) {
4966 /* Perfect, the server is already registering differences for
4967 * another slave. Set the right state, and copy the buffer. */
4968 listRelease(c->reply);
4969 c->reply = listDup(slave->reply);
4970 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4971 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4972 } else {
4973 /* No way, we need to wait for the next BGSAVE in order to
4974 * register differences */
4975 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4976 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4977 }
4978 } else {
4979 /* Ok we don't have a BGSAVE in progress, let's start one */
4980 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4981 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4982 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4983 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4984 return;
4985 }
4986 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4987 }
4988 c->repldbfd = -1;
4989 c->flags |= REDIS_SLAVE;
4990 c->slaveseldb = 0;
4991 listAddNodeTail(server.slaves,c);
4992 return;
4993 }
4994
4995 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4996 redisClient *slave = privdata;
4997 REDIS_NOTUSED(el);
4998 REDIS_NOTUSED(mask);
4999 char buf[REDIS_IOBUF_LEN];
5000 ssize_t nwritten, buflen;
5001
5002 if (slave->repldboff == 0) {
5003 /* Write the bulk write count before to transfer the DB. In theory here
5004 * we don't know how much room there is in the output buffer of the
5005 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5006 * operations) will never be smaller than the few bytes we need. */
5007 sds bulkcount;
5008
5009 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5010 slave->repldbsize);
5011 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5012 {
5013 sdsfree(bulkcount);
5014 freeClient(slave);
5015 return;
5016 }
5017 sdsfree(bulkcount);
5018 }
5019 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5020 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5021 if (buflen <= 0) {
5022 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5023 (buflen == 0) ? "premature EOF" : strerror(errno));
5024 freeClient(slave);
5025 return;
5026 }
5027 if ((nwritten = write(fd,buf,buflen)) == -1) {
5028 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5029 strerror(errno));
5030 freeClient(slave);
5031 return;
5032 }
5033 slave->repldboff += nwritten;
5034 if (slave->repldboff == slave->repldbsize) {
5035 close(slave->repldbfd);
5036 slave->repldbfd = -1;
5037 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5038 slave->replstate = REDIS_REPL_ONLINE;
5039 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5040 sendReplyToClient, slave, NULL) == AE_ERR) {
5041 freeClient(slave);
5042 return;
5043 }
5044 addReplySds(slave,sdsempty());
5045 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5046 }
5047 }
5048
5049 /* This function is called at the end of every backgrond saving.
5050 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5051 * otherwise REDIS_ERR is passed to the function.
5052 *
5053 * The goal of this function is to handle slaves waiting for a successful
5054 * background saving in order to perform non-blocking synchronization. */
5055 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5056 listNode *ln;
5057 int startbgsave = 0;
5058
5059 listRewind(server.slaves);
5060 while((ln = listYield(server.slaves))) {
5061 redisClient *slave = ln->value;
5062
5063 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5064 startbgsave = 1;
5065 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5066 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5067 struct redis_stat buf;
5068
5069 if (bgsaveerr != REDIS_OK) {
5070 freeClient(slave);
5071 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5072 continue;
5073 }
5074 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5075 redis_fstat(slave->repldbfd,&buf) == -1) {
5076 freeClient(slave);
5077 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5078 continue;
5079 }
5080 slave->repldboff = 0;
5081 slave->repldbsize = buf.st_size;
5082 slave->replstate = REDIS_REPL_SEND_BULK;
5083 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5084 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5085 freeClient(slave);
5086 continue;
5087 }
5088 }
5089 }
5090 if (startbgsave) {
5091 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5092 listRewind(server.slaves);
5093 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5094 while((ln = listYield(server.slaves))) {
5095 redisClient *slave = ln->value;
5096
5097 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5098 freeClient(slave);
5099 }
5100 }
5101 }
5102 }
5103
5104 static int syncWithMaster(void) {
5105 char buf[1024], tmpfile[256];
5106 int dumpsize;
5107 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5108 int dfd;
5109
5110 if (fd == -1) {
5111 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5112 strerror(errno));
5113 return REDIS_ERR;
5114 }
5115 /* Issue the SYNC command */
5116 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5117 close(fd);
5118 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5119 strerror(errno));
5120 return REDIS_ERR;
5121 }
5122 /* Read the bulk write count */
5123 if (syncReadLine(fd,buf,1024,3600) == -1) {
5124 close(fd);
5125 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5126 strerror(errno));
5127 return REDIS_ERR;
5128 }
5129 if (buf[0] != '$') {
5130 close(fd);
5131 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5132 return REDIS_ERR;
5133 }
5134 dumpsize = atoi(buf+1);
5135 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5136 /* Read the bulk write data on a temp file */
5137 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5138 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5139 if (dfd == -1) {
5140 close(fd);
5141 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5142 return REDIS_ERR;
5143 }
5144 while(dumpsize) {
5145 int nread, nwritten;
5146
5147 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5148 if (nread == -1) {
5149 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5150 strerror(errno));
5151 close(fd);
5152 close(dfd);
5153 return REDIS_ERR;
5154 }
5155 nwritten = write(dfd,buf,nread);
5156 if (nwritten == -1) {
5157 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5158 close(fd);
5159 close(dfd);
5160 return REDIS_ERR;
5161 }
5162 dumpsize -= nread;
5163 }
5164 close(dfd);
5165 if (rename(tmpfile,server.dbfilename) == -1) {
5166 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5167 unlink(tmpfile);
5168 close(fd);
5169 return REDIS_ERR;
5170 }
5171 emptyDb();
5172 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5173 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5174 close(fd);
5175 return REDIS_ERR;
5176 }
5177 server.master = createClient(fd);
5178 server.master->flags |= REDIS_MASTER;
5179 server.replstate = REDIS_REPL_CONNECTED;
5180 return REDIS_OK;
5181 }
5182
5183 static void slaveofCommand(redisClient *c) {
5184 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5185 !strcasecmp(c->argv[2]->ptr,"one")) {
5186 if (server.masterhost) {
5187 sdsfree(server.masterhost);
5188 server.masterhost = NULL;
5189 if (server.master) freeClient(server.master);
5190 server.replstate = REDIS_REPL_NONE;
5191 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5192 }
5193 } else {
5194 sdsfree(server.masterhost);
5195 server.masterhost = sdsdup(c->argv[1]->ptr);
5196 server.masterport = atoi(c->argv[2]->ptr);
5197 if (server.master) freeClient(server.master);
5198 server.replstate = REDIS_REPL_CONNECT;
5199 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5200 server.masterhost, server.masterport);
5201 }
5202 addReply(c,shared.ok);
5203 }
5204
5205 /* ============================ Maxmemory directive ======================== */
5206
5207 /* This function gets called when 'maxmemory' is set on the config file to limit
5208 * the max memory used by the server, and we are out of memory.
5209 * This function will try to, in order:
5210 *
5211 * - Free objects from the free list
5212 * - Try to remove keys with an EXPIRE set
5213 *
5214 * It is not possible to free enough memory to reach used-memory < maxmemory
5215 * the server will start refusing commands that will enlarge even more the
5216 * memory usage.
5217 */
5218 static void freeMemoryIfNeeded(void) {
5219 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5220 if (listLength(server.objfreelist)) {
5221 robj *o;
5222
5223 listNode *head = listFirst(server.objfreelist);
5224 o = listNodeValue(head);
5225 listDelNode(server.objfreelist,head);
5226 zfree(o);
5227 } else {
5228 int j, k, freed = 0;
5229
5230 for (j = 0; j < server.dbnum; j++) {
5231 int minttl = -1;
5232 robj *minkey = NULL;
5233 struct dictEntry *de;
5234
5235 if (dictSize(server.db[j].expires)) {
5236 freed = 1;
5237 /* From a sample of three keys drop the one nearest to
5238 * the natural expire */
5239 for (k = 0; k < 3; k++) {
5240 time_t t;
5241
5242 de = dictGetRandomKey(server.db[j].expires);
5243 t = (time_t) dictGetEntryVal(de);
5244 if (minttl == -1 || t < minttl) {
5245 minkey = dictGetEntryKey(de);
5246 minttl = t;
5247 }
5248 }
5249 deleteKey(server.db+j,minkey);
5250 }
5251 }
5252 if (!freed) return; /* nothing to free... */
5253 }
5254 }
5255 }
5256
5257 /* ============================== Append Only file ========================== */
5258
5259 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5260 sds buf = sdsempty();
5261 int j;
5262 ssize_t nwritten;
5263 time_t now;
5264 robj *tmpargv[3];
5265
5266 /* The DB this command was targetting is not the same as the last command
5267 * we appendend. To issue a SELECT command is needed. */
5268 if (dictid != server.appendseldb) {
5269 char seldb[64];
5270
5271 snprintf(seldb,sizeof(seldb),"%d",dictid);
5272 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5273 strlen(seldb),seldb);
5274 server.appendseldb = dictid;
5275 }
5276
5277 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5278 * EXPIREs into EXPIREATs calls */
5279 if (cmd->proc == expireCommand) {
5280 long when;
5281
5282 tmpargv[0] = createStringObject("EXPIREAT",8);
5283 tmpargv[1] = argv[1];
5284 incrRefCount(argv[1]);
5285 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5286 tmpargv[2] = createObject(REDIS_STRING,
5287 sdscatprintf(sdsempty(),"%ld",when));
5288 argv = tmpargv;
5289 }
5290
5291 /* Append the actual command */
5292 buf = sdscatprintf(buf,"*%d\r\n",argc);
5293 for (j = 0; j < argc; j++) {
5294 robj *o = argv[j];
5295
5296 if (o->encoding != REDIS_ENCODING_RAW)
5297 o = getDecodedObject(o);
5298 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5299 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5300 buf = sdscatlen(buf,"\r\n",2);
5301 if (o != argv[j])
5302 decrRefCount(o);
5303 }
5304
5305 /* Free the objects from the modified argv for EXPIREAT */
5306 if (cmd->proc == expireCommand) {
5307 for (j = 0; j < 3; j++)
5308 decrRefCount(argv[j]);
5309 }
5310
5311 /* We want to perform a single write. This should be guaranteed atomic
5312 * at least if the filesystem we are writing is a real physical one.
5313 * While this will save us against the server being killed I don't think
5314 * there is much to do about the whole server stopping for power problems
5315 * or alike */
5316 nwritten = write(server.appendfd,buf,sdslen(buf));
5317 if (nwritten != (signed)sdslen(buf)) {
5318 /* Ooops, we are in troubles. The best thing to do for now is
5319 * to simply exit instead to give the illusion that everything is
5320 * working as expected. */
5321 if (nwritten == -1) {
5322 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5323 } else {
5324 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5325 }
5326 exit(1);
5327 }
5328 now = time(NULL);
5329 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5330 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5331 now-server.lastfsync > 1))
5332 {
5333 fsync(server.appendfd); /* Let's try to get this data on the disk */
5334 server.lastfsync = now;
5335 }
5336 }
5337
5338 /* In Redis commands are always executed in the context of a client, so in
5339 * order to load the append only file we need to create a fake client. */
5340 static struct redisClient *createFakeClient(void) {
5341 struct redisClient *c = zmalloc(sizeof(*c));
5342
5343 selectDb(c,0);
5344 c->fd = -1;
5345 c->querybuf = sdsempty();
5346 c->argc = 0;
5347 c->argv = NULL;
5348 c->flags = 0;
5349 /* We set the fake client as a slave waiting for the synchronization
5350 * so that Redis will not try to send replies to this client. */
5351 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5352 c->reply = listCreate();
5353 listSetFreeMethod(c->reply,decrRefCount);
5354 listSetDupMethod(c->reply,dupClientReplyValue);
5355 return c;
5356 }
5357
5358 static void freeFakeClient(struct redisClient *c) {
5359 sdsfree(c->querybuf);
5360 listRelease(c->reply);
5361 zfree(c);
5362 }
5363
5364 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5365 * error (the append only file is zero-length) REDIS_ERR is returned. On
5366 * fatal error an error message is logged and the program exists. */
5367 int loadAppendOnlyFile(char *filename) {
5368 struct redisClient *fakeClient;
5369 FILE *fp = fopen(filename,"r");
5370 struct redis_stat sb;
5371
5372 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5373 return REDIS_ERR;
5374
5375 if (fp == NULL) {
5376 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5377 exit(1);
5378 }
5379
5380 fakeClient = createFakeClient();
5381 while(1) {
5382 int argc, j;
5383 unsigned long len;
5384 robj **argv;
5385 char buf[128];
5386 sds argsds;
5387 struct redisCommand *cmd;
5388
5389 if (fgets(buf,sizeof(buf),fp) == NULL) {
5390 if (feof(fp))
5391 break;
5392 else
5393 goto readerr;
5394 }
5395 if (buf[0] != '*') goto fmterr;
5396 argc = atoi(buf+1);
5397 argv = zmalloc(sizeof(robj*)*argc);
5398 for (j = 0; j < argc; j++) {
5399 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5400 if (buf[0] != '$') goto fmterr;
5401 len = strtol(buf+1,NULL,10);
5402 argsds = sdsnewlen(NULL,len);
5403 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5404 argv[j] = createObject(REDIS_STRING,argsds);
5405 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5406 }
5407
5408 /* Command lookup */
5409 cmd = lookupCommand(argv[0]->ptr);
5410 if (!cmd) {
5411 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5412 exit(1);
5413 }
5414 /* Try object sharing and encoding */
5415 if (server.shareobjects) {
5416 int j;
5417 for(j = 1; j < argc; j++)
5418 argv[j] = tryObjectSharing(argv[j]);
5419 }
5420 if (cmd->flags & REDIS_CMD_BULK)
5421 tryObjectEncoding(argv[argc-1]);
5422 /* Run the command in the context of a fake client */
5423 fakeClient->argc = argc;
5424 fakeClient->argv = argv;
5425 cmd->proc(fakeClient);
5426 /* Discard the reply objects list from the fake client */
5427 while(listLength(fakeClient->reply))
5428 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5429 /* Clean up, ready for the next command */
5430 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5431 zfree(argv);
5432 }
5433 fclose(fp);
5434 freeFakeClient(fakeClient);
5435 return REDIS_OK;
5436
5437 readerr:
5438 if (feof(fp)) {
5439 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5440 } else {
5441 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5442 }
5443 exit(1);
5444 fmterr:
5445 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5446 exit(1);
5447 }
5448
5449 /* ================================= Debugging ============================== */
5450
5451 static void debugCommand(redisClient *c) {
5452 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5453 *((char*)-1) = 'x';
5454 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5455 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5456 robj *key, *val;
5457
5458 if (!de) {
5459 addReply(c,shared.nokeyerr);
5460 return;
5461 }
5462 key = dictGetEntryKey(de);
5463 val = dictGetEntryVal(de);
5464 addReplySds(c,sdscatprintf(sdsempty(),
5465 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5466 key, key->refcount, val, val->refcount, val->encoding));
5467 } else {
5468 addReplySds(c,sdsnew(
5469 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5470 }
5471 }
5472
5473 #ifdef HAVE_BACKTRACE
5474 static struct redisFunctionSym symsTable[] = {
5475 {"compareStringObjects", (unsigned long)compareStringObjects},
5476 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5477 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5478 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5479 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5480 {"freeStringObject", (unsigned long)freeStringObject},
5481 {"freeListObject", (unsigned long)freeListObject},
5482 {"freeSetObject", (unsigned long)freeSetObject},
5483 {"decrRefCount", (unsigned long)decrRefCount},
5484 {"createObject", (unsigned long)createObject},
5485 {"freeClient", (unsigned long)freeClient},
5486 {"rdbLoad", (unsigned long)rdbLoad},
5487 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5488 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5489 {"addReply", (unsigned long)addReply},
5490 {"addReplySds", (unsigned long)addReplySds},
5491 {"incrRefCount", (unsigned long)incrRefCount},
5492 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5493 {"createStringObject", (unsigned long)createStringObject},
5494 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5495 {"syncWithMaster", (unsigned long)syncWithMaster},
5496 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5497 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5498 {"getDecodedObject", (unsigned long)getDecodedObject},
5499 {"removeExpire", (unsigned long)removeExpire},
5500 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5501 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5502 {"deleteKey", (unsigned long)deleteKey},
5503 {"getExpire", (unsigned long)getExpire},
5504 {"setExpire", (unsigned long)setExpire},
5505 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5506 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5507 {"authCommand", (unsigned long)authCommand},
5508 {"pingCommand", (unsigned long)pingCommand},
5509 {"echoCommand", (unsigned long)echoCommand},
5510 {"setCommand", (unsigned long)setCommand},
5511 {"setnxCommand", (unsigned long)setnxCommand},
5512 {"getCommand", (unsigned long)getCommand},
5513 {"delCommand", (unsigned long)delCommand},
5514 {"existsCommand", (unsigned long)existsCommand},
5515 {"incrCommand", (unsigned long)incrCommand},
5516 {"decrCommand", (unsigned long)decrCommand},
5517 {"incrbyCommand", (unsigned long)incrbyCommand},
5518 {"decrbyCommand", (unsigned long)decrbyCommand},
5519 {"selectCommand", (unsigned long)selectCommand},
5520 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5521 {"keysCommand", (unsigned long)keysCommand},
5522 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5523 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5524 {"saveCommand", (unsigned long)saveCommand},
5525 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5526 {"shutdownCommand", (unsigned long)shutdownCommand},
5527 {"moveCommand", (unsigned long)moveCommand},
5528 {"renameCommand", (unsigned long)renameCommand},
5529 {"renamenxCommand", (unsigned long)renamenxCommand},
5530 {"lpushCommand", (unsigned long)lpushCommand},
5531 {"rpushCommand", (unsigned long)rpushCommand},
5532 {"lpopCommand", (unsigned long)lpopCommand},
5533 {"rpopCommand", (unsigned long)rpopCommand},
5534 {"llenCommand", (unsigned long)llenCommand},
5535 {"lindexCommand", (unsigned long)lindexCommand},
5536 {"lrangeCommand", (unsigned long)lrangeCommand},
5537 {"ltrimCommand", (unsigned long)ltrimCommand},
5538 {"typeCommand", (unsigned long)typeCommand},
5539 {"lsetCommand", (unsigned long)lsetCommand},
5540 {"saddCommand", (unsigned long)saddCommand},
5541 {"sremCommand", (unsigned long)sremCommand},
5542 {"smoveCommand", (unsigned long)smoveCommand},
5543 {"sismemberCommand", (unsigned long)sismemberCommand},
5544 {"scardCommand", (unsigned long)scardCommand},
5545 {"spopCommand", (unsigned long)spopCommand},
5546 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5547 {"sinterCommand", (unsigned long)sinterCommand},
5548 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5549 {"sunionCommand", (unsigned long)sunionCommand},
5550 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5551 {"sdiffCommand", (unsigned long)sdiffCommand},
5552 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5553 {"syncCommand", (unsigned long)syncCommand},
5554 {"flushdbCommand", (unsigned long)flushdbCommand},
5555 {"flushallCommand", (unsigned long)flushallCommand},
5556 {"sortCommand", (unsigned long)sortCommand},
5557 {"lremCommand", (unsigned long)lremCommand},
5558 {"infoCommand", (unsigned long)infoCommand},
5559 {"mgetCommand", (unsigned long)mgetCommand},
5560 {"monitorCommand", (unsigned long)monitorCommand},
5561 {"expireCommand", (unsigned long)expireCommand},
5562 {"expireatCommand", (unsigned long)expireatCommand},
5563 {"getsetCommand", (unsigned long)getsetCommand},
5564 {"ttlCommand", (unsigned long)ttlCommand},
5565 {"slaveofCommand", (unsigned long)slaveofCommand},
5566 {"debugCommand", (unsigned long)debugCommand},
5567 {"processCommand", (unsigned long)processCommand},
5568 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5569 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5570 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5571 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5572 {"msetCommand", (unsigned long)msetCommand},
5573 {"msetnxCommand", (unsigned long)msetnxCommand},
5574 {"zslCreateNode", (unsigned long)zslCreateNode},
5575 {"zslCreate", (unsigned long)zslCreate},
5576 {"zslFreeNode",(unsigned long)zslFreeNode},
5577 {"zslFree",(unsigned long)zslFree},
5578 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5579 {"zslInsert",(unsigned long)zslInsert},
5580 {"zslDelete",(unsigned long)zslDelete},
5581 {"createZsetObject",(unsigned long)createZsetObject},
5582 {"zaddCommand",(unsigned long)zaddCommand},
5583 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5584 {"zrangeCommand",(unsigned long)zrangeCommand},
5585 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5586 {"zremCommand",(unsigned long)zremCommand},
5587 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5588 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5589 {"feedAppendOnlyFile",(unsigned long)feedAppendOnlyFile},
5590 {NULL,0}
5591 };
5592
5593 /* This function try to convert a pointer into a function name. It's used in
5594 * oreder to provide a backtrace under segmentation fault that's able to
5595 * display functions declared as static (otherwise the backtrace is useless). */
5596 static char *findFuncName(void *pointer, unsigned long *offset){
5597 int i, ret = -1;
5598 unsigned long off, minoff = 0;
5599
5600 /* Try to match against the Symbol with the smallest offset */
5601 for (i=0; symsTable[i].pointer; i++) {
5602 unsigned long lp = (unsigned long) pointer;
5603
5604 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5605 off=lp-symsTable[i].pointer;
5606 if (ret < 0 || off < minoff) {
5607 minoff=off;
5608 ret=i;
5609 }
5610 }
5611 }
5612 if (ret == -1) return NULL;
5613 *offset = minoff;
5614 return symsTable[ret].name;
5615 }
5616
/* Return the program counter saved in the signal context 'uc' (the
 * eip/rip/ip register, depending on the platform), or NULL when this
 * platform's context layout is not known here.
 *
 * On Linux x86 the register index constants (REG_EIP / REG_RIP) are only
 * visible with the right feature macros, so each branch is guarded by the
 * constant it needs and otherwise falls back to NULL instead of failing to
 * compile. GCC spells the 64-bit macro __x86_64__; __X86_64__ is kept for
 * backward compatibility. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) && defined(REG_EIP) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif (defined(__x86_64__) || defined(__X86_64__)) && defined(REG_RIP) /* Linux x86_64 */
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
5638
5639 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5640 void *trace[100];
5641 char **messages = NULL;
5642 int i, trace_size = 0;
5643 unsigned long offset=0;
5644 time_t uptime = time(NULL)-server.stat_starttime;
5645 ucontext_t *uc = (ucontext_t*) secret;
5646 REDIS_NOTUSED(info);
5647
5648 redisLog(REDIS_WARNING,
5649 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5650 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5651 "redis_version:%s; "
5652 "uptime_in_seconds:%d; "
5653 "connected_clients:%d; "
5654 "connected_slaves:%d; "
5655 "used_memory:%zu; "
5656 "changes_since_last_save:%lld; "
5657 "bgsave_in_progress:%d; "
5658 "last_save_time:%d; "
5659 "total_connections_received:%lld; "
5660 "total_commands_processed:%lld; "
5661 "role:%s;"
5662 ,REDIS_VERSION,
5663 uptime,
5664 listLength(server.clients)-listLength(server.slaves),
5665 listLength(server.slaves),
5666 server.usedmemory,
5667 server.dirty,
5668 server.bgsaveinprogress,
5669 server.lastsave,
5670 server.stat_numconnections,
5671 server.stat_numcommands,
5672 server.masterhost == NULL ? "master" : "slave"
5673 ));
5674
5675 trace_size = backtrace(trace, 100);
5676 /* overwrite sigaction with caller's address */
5677 if (getMcontextEip(uc) != NULL) {
5678 trace[1] = getMcontextEip(uc);
5679 }
5680 messages = backtrace_symbols(trace, trace_size);
5681
5682 for (i=1; i<trace_size; ++i) {
5683 char *fn = findFuncName(trace[i], &offset), *p;
5684
5685 p = strchr(messages[i],'+');
5686 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5687 redisLog(REDIS_WARNING,"%s", messages[i]);
5688 } else {
5689 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5690 }
5691 }
5692 free(messages);
5693 exit(0);
5694 }
5695
5696 static void setupSigSegvAction(void) {
5697 struct sigaction act;
5698
5699 sigemptyset (&act.sa_mask);
5700 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5701 * is used. Otherwise, sa_handler is used */
5702 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5703 act.sa_sigaction = segvHandler;
5704 sigaction (SIGSEGV, &act, NULL);
5705 sigaction (SIGBUS, &act, NULL);
5706 sigaction (SIGFPE, &act, NULL);
5707 sigaction (SIGILL, &act, NULL);
5708 sigaction (SIGBUS, &act, NULL);
5709 return;
5710 }
5711 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without backtrace support: leave signal dispositions as they
     * are. */
    return;
}
5714 #endif /* HAVE_BACKTRACE */
5715
5716 /* =================================== Main! ================================ */
5717
5718 #ifdef __linux__
/* Return the value in /proc/sys/vm/overcommit_memory (parsed with atoi()),
 * or -1 if the file can't be opened or its first line can't be read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    got = fgets(line,sizeof(line),fp);
    fclose(fp);
    return (got == NULL) ? -1 : atoi(line);
}
5732
5733 void linuxOvercommitMemoryWarning(void) {
5734 if (linuxOvercommitMemoryValue() == 0) {
5735 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5736 }
5737 }
5738 #endif /* __linux__ */
5739
5740 static void daemonize(void) {
5741 int fd;
5742 FILE *fp;
5743
5744 if (fork() != 0) exit(0); /* parent exits */
5745 setsid(); /* create a new session */
5746
5747 /* Every output goes to /dev/null. If Redis is daemonized but
5748 * the 'logfile' is set to 'stdout' in the configuration file
5749 * it will not log at all. */
5750 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5751 dup2(fd, STDIN_FILENO);
5752 dup2(fd, STDOUT_FILENO);
5753 dup2(fd, STDERR_FILENO);
5754 if (fd > STDERR_FILENO) close(fd);
5755 }
5756 /* Try to write the pid file */
5757 fp = fopen(server.pidfile,"w");
5758 if (fp) {
5759 fprintf(fp,"%d\n",getpid());
5760 fclose(fp);
5761 }
5762 }
5763
5764 int main(int argc, char **argv) {
5765 initServerConfig();
5766 if (argc == 2) {
5767 ResetServerSaveParams();
5768 loadServerConfig(argv[1]);
5769 } else if (argc > 2) {
5770 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5771 exit(1);
5772 } else {
5773 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5774 }
5775 initServer();
5776 if (server.daemonize) daemonize();
5777 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5778 #ifdef __linux__
5779 linuxOvercommitMemoryWarning();
5780 #endif
5781 if (server.appendonly) {
5782 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
5783 redisLog(REDIS_NOTICE,"DB loaded from append only file");
5784 } else {
5785 if (rdbLoad(server.dbfilename) == REDIS_OK)
5786 redisLog(REDIS_NOTICE,"DB loaded from disk");
5787 }
5788 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5789 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5790 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5791 aeMain(server.el);
5792 aeDeleteEventLoop(server.el);
5793 return 0;
5794 }