]> git.saurik.com Git - redis.git/blob - redis.c
90063525a199ace4d74137b9007147f77051e16c
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #if defined(__sun)
63 #include "solarisfixes.h"
64 #endif
65
66 #include "redis.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75
76 /* Error codes */
77 #define REDIS_OK 0
78 #define REDIS_ERR -1
79
80 /* Static server configuration */
81 #define REDIS_SERVERPORT 6379 /* TCP port */
82 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
83 #define REDIS_IOBUF_LEN 1024
84 #define REDIS_LOADBUF_LEN 1024
85 #define REDIS_STATIC_ARGS 4
86 #define REDIS_DEFAULT_DBNUM 16
87 #define REDIS_CONFIGLINE_MAX 1024
88 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
89 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
90 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
91 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
92 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
93
94 /* Hash table parameters */
95 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
96
97 /* Command flags */
98 #define REDIS_CMD_BULK 1 /* Bulk write command */
99 #define REDIS_CMD_INLINE 2 /* Inline command */
100 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
101 this flags will return an error when the 'maxmemory' option is set in the
102 config file and the server is using more than maxmemory bytes of memory.
103 In short this commands are denied on low memory conditions. */
104 #define REDIS_CMD_DENYOOM 4
105
106 /* Object types */
107 #define REDIS_STRING 0
108 #define REDIS_LIST 1
109 #define REDIS_SET 2
110 #define REDIS_ZSET 3
111 #define REDIS_HASH 4
112
113 /* Objects encoding */
114 #define REDIS_ENCODING_RAW 0 /* Raw representation */
115 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
116
117 /* Object types only used for dumping to disk */
118 #define REDIS_EXPIRETIME 253
119 #define REDIS_SELECTDB 254
120 #define REDIS_EOF 255
121
122 /* Defines related to the dump file format. To store 32 bits lengths for short
123 * keys requires a lot of space, so we check the most significant 2 bits of
124 * the first byte to interpreter the length:
125 *
126 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
127 * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
128 * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
129 * 11|000000 this means: specially encoded object will follow. The six bits
130 * number specify the kind of object that follows.
131 * See the REDIS_RDB_ENC_* defines.
132 *
133 * Lenghts up to 63 are stored using a single byte, most DB keys, and may
134 * values, will fit inside. */
135 #define REDIS_RDB_6BITLEN 0
136 #define REDIS_RDB_14BITLEN 1
137 #define REDIS_RDB_32BITLEN 2
138 #define REDIS_RDB_ENCVAL 3
139 #define REDIS_RDB_LENERR UINT_MAX
140
141 /* When a length of a string object stored on disk has the first two bits
142 * set, the remaining two bits specify a special encoding for the object
143 * accordingly to the following defines: */
144 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
145 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
146 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
147 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
148
149 /* Client flags */
150 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
151 #define REDIS_SLAVE 2 /* This client is a slave server */
152 #define REDIS_MASTER 4 /* This client is a master server */
153 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
154
155 /* Slave replication state - slave side */
156 #define REDIS_REPL_NONE 0 /* No active replication */
157 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
158 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
159
160 /* Slave replication state - from the point of view of master
161 * Note that in SEND_BULK and ONLINE state the slave receives new updates
162 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
163 * to start the next background saving in order to send updates to it. */
164 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
165 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
166 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
167 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
168
169 /* List related stuff */
170 #define REDIS_HEAD 0
171 #define REDIS_TAIL 1
172
173 /* Sort operations */
174 #define REDIS_SORT_GET 0
175 #define REDIS_SORT_ASC 1
176 #define REDIS_SORT_DESC 2
177 #define REDIS_SORTKEY_MAX 1024
178
179 /* Log levels */
180 #define REDIS_DEBUG 0
181 #define REDIS_NOTICE 1
182 #define REDIS_WARNING 2
183
184 /* Anti-warning macro... */
185 #define REDIS_NOTUSED(V) ((void) V)
186
187 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
188 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
189
190 /* Append only defines */
191 #define APPENDFSYNC_NO 0
192 #define APPENDFSYNC_ALWAYS 1
193 #define APPENDFSYNC_EVERYSEC 2
194
195 /*================================= Data types ============================== */
196
197 /* A redis object, that is a type able to hold a string / list / set */
198 typedef struct redisObject {
199 void *ptr;
200 unsigned char type;
201 unsigned char encoding;
202 unsigned char notused[2];
203 int refcount;
204 } robj;
205
206 typedef struct redisDb {
207 dict *dict;
208 dict *expires;
209 int id;
210 } redisDb;
211
212 /* With multiplexing we need to take per-clinet state.
213 * Clients are taken in a liked list. */
214 typedef struct redisClient {
215 int fd;
216 redisDb *db;
217 int dictid;
218 sds querybuf;
219 robj **argv, **mbargv;
220 int argc, mbargc;
221 int bulklen; /* bulk read len. -1 if not in bulk read mode */
222 int multibulk; /* multi bulk command format active */
223 list *reply;
224 int sentlen;
225 time_t lastinteraction; /* time of the last interaction, used for timeout */
226 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
227 int slaveseldb; /* slave selected db, if this client is a slave */
228 int authenticated; /* when requirepass is non-NULL */
229 int replstate; /* replication state if this is a slave */
230 int repldbfd; /* replication DB file descriptor */
231 long repldboff; /* replication DB file offset */
232 off_t repldbsize; /* replication DB file size */
233 } redisClient;
234
235 struct saveparam {
236 time_t seconds;
237 int changes;
238 };
239
240 /* Global server state structure */
241 struct redisServer {
242 int port;
243 int fd;
244 redisDb *db;
245 dict *sharingpool;
246 unsigned int sharingpoolsize;
247 long long dirty; /* changes to DB from the last save */
248 list *clients;
249 list *slaves, *monitors;
250 char neterr[ANET_ERR_LEN];
251 aeEventLoop *el;
252 int cronloops; /* number of times the cron function run */
253 list *objfreelist; /* A list of freed objects to avoid malloc() */
254 time_t lastsave; /* Unix time of last save succeeede */
255 size_t usedmemory; /* Used memory in megabytes */
256 /* Fields used only for stats */
257 time_t stat_starttime; /* server start time */
258 long long stat_numcommands; /* number of processed commands */
259 long long stat_numconnections; /* number of connections received */
260 /* Configuration */
261 int verbosity;
262 int glueoutputbuf;
263 int maxidletime;
264 int dbnum;
265 int daemonize;
266 int appendonly;
267 int appendfsync;
268 time_t lastfsync;
269 int appendfd;
270 int appendseldb;
271 char *pidfile;
272 int bgsaveinprogress;
273 pid_t bgsavechildpid;
274 struct saveparam *saveparams;
275 int saveparamslen;
276 char *logfile;
277 char *bindaddr;
278 char *dbfilename;
279 char *appendfilename;
280 char *requirepass;
281 int shareobjects;
282 /* Replication related */
283 int isslave;
284 char *masterauth;
285 char *masterhost;
286 int masterport;
287 redisClient *master; /* client that is master for this slave */
288 int replstate;
289 unsigned int maxclients;
290 unsigned long maxmemory;
291 /* Sort parameters - qsort_r() is only available under BSD so we
292 * have to take this state global, in order to pass it to sortCompare() */
293 int sort_desc;
294 int sort_alpha;
295 int sort_bypattern;
296 };
297
298 typedef void redisCommandProc(redisClient *c);
299 struct redisCommand {
300 char *name;
301 redisCommandProc *proc;
302 int arity;
303 int flags;
304 };
305
306 struct redisFunctionSym {
307 char *name;
308 unsigned long pointer;
309 };
310
311 typedef struct _redisSortObject {
312 robj *obj;
313 union {
314 double score;
315 robj *cmpobj;
316 } u;
317 } redisSortObject;
318
319 typedef struct _redisSortOperation {
320 int type;
321 robj *pattern;
322 } redisSortOperation;
323
324 /* ZSETs use a specialized version of Skiplists */
325
326 typedef struct zskiplistNode {
327 struct zskiplistNode **forward;
328 struct zskiplistNode *backward;
329 double score;
330 robj *obj;
331 } zskiplistNode;
332
333 typedef struct zskiplist {
334 struct zskiplistNode *header, *tail;
335 unsigned long length;
336 int level;
337 } zskiplist;
338
339 typedef struct zset {
340 dict *dict;
341 zskiplist *zsl;
342 } zset;
343
344 /* Our shared "common" objects */
345
346 struct sharedObjectsStruct {
347 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
348 *colon, *nullbulk, *nullmultibulk,
349 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
350 *outofrangeerr, *plus,
351 *select0, *select1, *select2, *select3, *select4,
352 *select5, *select6, *select7, *select8, *select9;
353 } shared;
354
355 /* Global vars that are actally used as constants. The following double
356 * values are used for double on-disk serialization, and are initialized
357 * at runtime to avoid strange compiler optimizations. */
358
359 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
360
361 /*================================ Prototypes =============================== */
362
363 static void freeStringObject(robj *o);
364 static void freeListObject(robj *o);
365 static void freeSetObject(robj *o);
366 static void decrRefCount(void *o);
367 static robj *createObject(int type, void *ptr);
368 static void freeClient(redisClient *c);
369 static int rdbLoad(char *filename);
370 static void addReply(redisClient *c, robj *obj);
371 static void addReplySds(redisClient *c, sds s);
372 static void incrRefCount(robj *o);
373 static int rdbSaveBackground(char *filename);
374 static robj *createStringObject(char *ptr, size_t len);
375 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
376 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
377 static int syncWithMaster(void);
378 static robj *tryObjectSharing(robj *o);
379 static int tryObjectEncoding(robj *o);
380 static robj *getDecodedObject(const robj *o);
381 static int removeExpire(redisDb *db, robj *key);
382 static int expireIfNeeded(redisDb *db, robj *key);
383 static int deleteIfVolatile(redisDb *db, robj *key);
384 static int deleteKey(redisDb *db, robj *key);
385 static time_t getExpire(redisDb *db, robj *key);
386 static int setExpire(redisDb *db, robj *key, time_t when);
387 static void updateSlavesWaitingBgsave(int bgsaveerr);
388 static void freeMemoryIfNeeded(void);
389 static int processCommand(redisClient *c);
390 static void setupSigSegvAction(void);
391 static void rdbRemoveTempFile(pid_t childpid);
392 static size_t stringObjectLen(robj *o);
393 static void processInputBuffer(redisClient *c);
394 static zskiplist *zslCreate(void);
395 static void zslFree(zskiplist *zsl);
396 static void zslInsert(zskiplist *zsl, double score, robj *obj);
397
398 static void authCommand(redisClient *c);
399 static void pingCommand(redisClient *c);
400 static void echoCommand(redisClient *c);
401 static void setCommand(redisClient *c);
402 static void setnxCommand(redisClient *c);
403 static void getCommand(redisClient *c);
404 static void delCommand(redisClient *c);
405 static void existsCommand(redisClient *c);
406 static void incrCommand(redisClient *c);
407 static void decrCommand(redisClient *c);
408 static void incrbyCommand(redisClient *c);
409 static void decrbyCommand(redisClient *c);
410 static void selectCommand(redisClient *c);
411 static void randomkeyCommand(redisClient *c);
412 static void keysCommand(redisClient *c);
413 static void dbsizeCommand(redisClient *c);
414 static void lastsaveCommand(redisClient *c);
415 static void saveCommand(redisClient *c);
416 static void bgsaveCommand(redisClient *c);
417 static void shutdownCommand(redisClient *c);
418 static void moveCommand(redisClient *c);
419 static void renameCommand(redisClient *c);
420 static void renamenxCommand(redisClient *c);
421 static void lpushCommand(redisClient *c);
422 static void rpushCommand(redisClient *c);
423 static void lpopCommand(redisClient *c);
424 static void rpopCommand(redisClient *c);
425 static void llenCommand(redisClient *c);
426 static void lindexCommand(redisClient *c);
427 static void lrangeCommand(redisClient *c);
428 static void ltrimCommand(redisClient *c);
429 static void typeCommand(redisClient *c);
430 static void lsetCommand(redisClient *c);
431 static void saddCommand(redisClient *c);
432 static void sremCommand(redisClient *c);
433 static void smoveCommand(redisClient *c);
434 static void sismemberCommand(redisClient *c);
435 static void scardCommand(redisClient *c);
436 static void spopCommand(redisClient *c);
437 static void srandmemberCommand(redisClient *c);
438 static void sinterCommand(redisClient *c);
439 static void sinterstoreCommand(redisClient *c);
440 static void sunionCommand(redisClient *c);
441 static void sunionstoreCommand(redisClient *c);
442 static void sdiffCommand(redisClient *c);
443 static void sdiffstoreCommand(redisClient *c);
444 static void syncCommand(redisClient *c);
445 static void flushdbCommand(redisClient *c);
446 static void flushallCommand(redisClient *c);
447 static void sortCommand(redisClient *c);
448 static void lremCommand(redisClient *c);
449 static void lpoppushCommand(redisClient *c);
450 static void infoCommand(redisClient *c);
451 static void mgetCommand(redisClient *c);
452 static void monitorCommand(redisClient *c);
453 static void expireCommand(redisClient *c);
454 static void expireatCommand(redisClient *c);
455 static void getsetCommand(redisClient *c);
456 static void ttlCommand(redisClient *c);
457 static void slaveofCommand(redisClient *c);
458 static void debugCommand(redisClient *c);
459 static void msetCommand(redisClient *c);
460 static void msetnxCommand(redisClient *c);
461 static void zaddCommand(redisClient *c);
462 static void zrangeCommand(redisClient *c);
463 static void zrangebyscoreCommand(redisClient *c);
464 static void zrevrangeCommand(redisClient *c);
465 static void zcardCommand(redisClient *c);
466 static void zremCommand(redisClient *c);
467 static void zscoreCommand(redisClient *c);
468 static void zremrangebyscoreCommand(redisClient *c);
469
470 /*================================= Globals ================================= */
471
472 /* Global vars */
473 static struct redisServer server; /* server global state */
474 static struct redisCommand cmdTable[] = {
475 {"get",getCommand,2,REDIS_CMD_INLINE},
476 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
477 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
478 {"del",delCommand,-2,REDIS_CMD_INLINE},
479 {"exists",existsCommand,2,REDIS_CMD_INLINE},
480 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
481 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
482 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
483 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
484 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
485 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
486 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
487 {"llen",llenCommand,2,REDIS_CMD_INLINE},
488 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
489 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
490 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
491 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
492 {"lrem",lremCommand,4,REDIS_CMD_BULK},
493 {"lpoppush",lpoppushCommand,3,REDIS_CMD_BULK},
494 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
495 {"srem",sremCommand,3,REDIS_CMD_BULK},
496 {"smove",smoveCommand,4,REDIS_CMD_BULK},
497 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
498 {"scard",scardCommand,2,REDIS_CMD_INLINE},
499 {"spop",spopCommand,2,REDIS_CMD_INLINE},
500 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
501 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
502 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
503 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
504 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
505 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
506 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
507 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
508 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
509 {"zrem",zremCommand,3,REDIS_CMD_BULK},
510 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
511 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
512 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
513 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
514 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
515 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
516 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
517 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
518 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
519 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
520 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
522 {"select",selectCommand,2,REDIS_CMD_INLINE},
523 {"move",moveCommand,3,REDIS_CMD_INLINE},
524 {"rename",renameCommand,3,REDIS_CMD_INLINE},
525 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
526 {"expire",expireCommand,3,REDIS_CMD_INLINE},
527 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
528 {"keys",keysCommand,2,REDIS_CMD_INLINE},
529 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
530 {"auth",authCommand,2,REDIS_CMD_INLINE},
531 {"ping",pingCommand,1,REDIS_CMD_INLINE},
532 {"echo",echoCommand,2,REDIS_CMD_BULK},
533 {"save",saveCommand,1,REDIS_CMD_INLINE},
534 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
535 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
536 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
537 {"type",typeCommand,2,REDIS_CMD_INLINE},
538 {"sync",syncCommand,1,REDIS_CMD_INLINE},
539 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
540 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
541 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
542 {"info",infoCommand,1,REDIS_CMD_INLINE},
543 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
544 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
545 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
546 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
547 {NULL,NULL,0,0}
548 };
549
550 /*============================ Utility functions ============================ */
551
552 /* Glob-style pattern matching. */
553 int stringmatchlen(const char *pattern, int patternLen,
554 const char *string, int stringLen, int nocase)
555 {
556 while(patternLen) {
557 switch(pattern[0]) {
558 case '*':
559 while (pattern[1] == '*') {
560 pattern++;
561 patternLen--;
562 }
563 if (patternLen == 1)
564 return 1; /* match */
565 while(stringLen) {
566 if (stringmatchlen(pattern+1, patternLen-1,
567 string, stringLen, nocase))
568 return 1; /* match */
569 string++;
570 stringLen--;
571 }
572 return 0; /* no match */
573 break;
574 case '?':
575 if (stringLen == 0)
576 return 0; /* no match */
577 string++;
578 stringLen--;
579 break;
580 case '[':
581 {
582 int not, match;
583
584 pattern++;
585 patternLen--;
586 not = pattern[0] == '^';
587 if (not) {
588 pattern++;
589 patternLen--;
590 }
591 match = 0;
592 while(1) {
593 if (pattern[0] == '\\') {
594 pattern++;
595 patternLen--;
596 if (pattern[0] == string[0])
597 match = 1;
598 } else if (pattern[0] == ']') {
599 break;
600 } else if (patternLen == 0) {
601 pattern--;
602 patternLen++;
603 break;
604 } else if (pattern[1] == '-' && patternLen >= 3) {
605 int start = pattern[0];
606 int end = pattern[2];
607 int c = string[0];
608 if (start > end) {
609 int t = start;
610 start = end;
611 end = t;
612 }
613 if (nocase) {
614 start = tolower(start);
615 end = tolower(end);
616 c = tolower(c);
617 }
618 pattern += 2;
619 patternLen -= 2;
620 if (c >= start && c <= end)
621 match = 1;
622 } else {
623 if (!nocase) {
624 if (pattern[0] == string[0])
625 match = 1;
626 } else {
627 if (tolower((int)pattern[0]) == tolower((int)string[0]))
628 match = 1;
629 }
630 }
631 pattern++;
632 patternLen--;
633 }
634 if (not)
635 match = !match;
636 if (!match)
637 return 0; /* no match */
638 string++;
639 stringLen--;
640 break;
641 }
642 case '\\':
643 if (patternLen >= 2) {
644 pattern++;
645 patternLen--;
646 }
647 /* fall through */
648 default:
649 if (!nocase) {
650 if (pattern[0] != string[0])
651 return 0; /* no match */
652 } else {
653 if (tolower((int)pattern[0]) != tolower((int)string[0]))
654 return 0; /* no match */
655 }
656 string++;
657 stringLen--;
658 break;
659 }
660 pattern++;
661 patternLen--;
662 if (stringLen == 0) {
663 while(*pattern == '*') {
664 pattern++;
665 patternLen--;
666 }
667 break;
668 }
669 }
670 if (patternLen == 0 && stringLen == 0)
671 return 1;
672 return 0;
673 }
674
675 static void redisLog(int level, const char *fmt, ...) {
676 va_list ap;
677 FILE *fp;
678
679 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
680 if (!fp) return;
681
682 va_start(ap, fmt);
683 if (level >= server.verbosity) {
684 char *c = ".-*";
685 char buf[64];
686 time_t now;
687
688 now = time(NULL);
689 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
690 fprintf(fp,"%s %c ",buf,c[level]);
691 vfprintf(fp, fmt, ap);
692 fprintf(fp,"\n");
693 fflush(fp);
694 }
695 va_end(ap);
696
697 if (server.logfile) fclose(fp);
698 }
699
700 /*====================== Hash table type implementation ==================== */
701
702 /* This is an hash table type that uses the SDS dynamic strings libary as
703 * keys and radis objects as values (objects can hold SDS strings,
704 * lists, sets). */
705
706 static void dictVanillaFree(void *privdata, void *val)
707 {
708 DICT_NOTUSED(privdata);
709 zfree(val);
710 }
711
712 static int sdsDictKeyCompare(void *privdata, const void *key1,
713 const void *key2)
714 {
715 int l1,l2;
716 DICT_NOTUSED(privdata);
717
718 l1 = sdslen((sds)key1);
719 l2 = sdslen((sds)key2);
720 if (l1 != l2) return 0;
721 return memcmp(key1, key2, l1) == 0;
722 }
723
724 static void dictRedisObjectDestructor(void *privdata, void *val)
725 {
726 DICT_NOTUSED(privdata);
727
728 decrRefCount(val);
729 }
730
731 static int dictObjKeyCompare(void *privdata, const void *key1,
732 const void *key2)
733 {
734 const robj *o1 = key1, *o2 = key2;
735 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
736 }
737
738 static unsigned int dictObjHash(const void *key) {
739 const robj *o = key;
740 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
741 }
742
743 static int dictEncObjKeyCompare(void *privdata, const void *key1,
744 const void *key2)
745 {
746 const robj *o1 = key1, *o2 = key2;
747
748 if (o1->encoding == REDIS_ENCODING_RAW &&
749 o2->encoding == REDIS_ENCODING_RAW)
750 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
751 else {
752 robj *dec1, *dec2;
753 int cmp;
754
755 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
756 getDecodedObject(o1) : (robj*)o1;
757 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
758 getDecodedObject(o2) : (robj*)o2;
759 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
760 if (dec1 != o1) decrRefCount(dec1);
761 if (dec2 != o2) decrRefCount(dec2);
762 return cmp;
763 }
764 }
765
766 static unsigned int dictEncObjHash(const void *key) {
767 const robj *o = key;
768
769 if (o->encoding == REDIS_ENCODING_RAW)
770 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
771 else {
772 robj *dec = getDecodedObject(o);
773 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
774 decrRefCount(dec);
775 return hash;
776 }
777 }
778
779 static dictType setDictType = {
780 dictEncObjHash, /* hash function */
781 NULL, /* key dup */
782 NULL, /* val dup */
783 dictEncObjKeyCompare, /* key compare */
784 dictRedisObjectDestructor, /* key destructor */
785 NULL /* val destructor */
786 };
787
788 static dictType zsetDictType = {
789 dictEncObjHash, /* hash function */
790 NULL, /* key dup */
791 NULL, /* val dup */
792 dictEncObjKeyCompare, /* key compare */
793 dictRedisObjectDestructor, /* key destructor */
794 dictVanillaFree /* val destructor */
795 };
796
797 static dictType hashDictType = {
798 dictObjHash, /* hash function */
799 NULL, /* key dup */
800 NULL, /* val dup */
801 dictObjKeyCompare, /* key compare */
802 dictRedisObjectDestructor, /* key destructor */
803 dictRedisObjectDestructor /* val destructor */
804 };
805
806 /* ========================= Random utility functions ======================= */
807
808 /* Redis generally does not try to recover from out of memory conditions
809 * when allocating objects or strings, it is not clear if it will be possible
810 * to report this condition to the client since the networking layer itself
811 * is based on heap allocation for send buffers, so we simply abort.
812 * At least the code will be simpler to read... */
813 static void oom(const char *msg) {
814 fprintf(stderr, "%s: Out of memory\n",msg);
815 fflush(stderr);
816 sleep(1);
817 abort();
818 }
819
820 /* ====================== Redis server networking stuff ===================== */
821 static void closeTimedoutClients(void) {
822 redisClient *c;
823 listNode *ln;
824 time_t now = time(NULL);
825
826 listRewind(server.clients);
827 while ((ln = listYield(server.clients)) != NULL) {
828 c = listNodeValue(ln);
829 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
830 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
831 (now - c->lastinteraction > server.maxidletime)) {
832 redisLog(REDIS_DEBUG,"Closing idle client");
833 freeClient(c);
834 }
835 }
836 }
837
838 static int htNeedsResize(dict *dict) {
839 long long size, used;
840
841 size = dictSlots(dict);
842 used = dictSize(dict);
843 return (size && used && size > DICT_HT_INITIAL_SIZE &&
844 (used*100/size < REDIS_HT_MINFILL));
845 }
846
847 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
848 * we resize the hash table to save memory */
849 static void tryResizeHashTables(void) {
850 int j;
851
852 for (j = 0; j < server.dbnum; j++) {
853 if (htNeedsResize(server.db[j].dict)) {
854 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
855 dictResize(server.db[j].dict);
856 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
857 }
858 if (htNeedsResize(server.db[j].expires))
859 dictResize(server.db[j].expires);
860 }
861 }
862
863 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
864 int j, loops = server.cronloops++;
865 REDIS_NOTUSED(eventLoop);
866 REDIS_NOTUSED(id);
867 REDIS_NOTUSED(clientData);
868
869 /* Update the global state with the amount of used memory */
870 server.usedmemory = zmalloc_used_memory();
871
872 /* Show some info about non-empty databases */
873 for (j = 0; j < server.dbnum; j++) {
874 long long size, used, vkeys;
875
876 size = dictSlots(server.db[j].dict);
877 used = dictSize(server.db[j].dict);
878 vkeys = dictSize(server.db[j].expires);
879 if (!(loops % 5) && (used || vkeys)) {
880 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
881 /* dictPrintStats(server.dict); */
882 }
883 }
884
885 /* We don't want to resize the hash tables while a bacground saving
886 * is in progress: the saving child is created using fork() that is
887 * implemented with a copy-on-write semantic in most modern systems, so
888 * if we resize the HT while there is the saving child at work actually
889 * a lot of memory movements in the parent will cause a lot of pages
890 * copied. */
891 if (!server.bgsaveinprogress) tryResizeHashTables();
892
893 /* Show information about connected clients */
894 if (!(loops % 5)) {
895 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
896 listLength(server.clients)-listLength(server.slaves),
897 listLength(server.slaves),
898 server.usedmemory,
899 dictSize(server.sharingpool));
900 }
901
902 /* Close connections of timedout clients */
903 if (server.maxidletime && !(loops % 10))
904 closeTimedoutClients();
905
906 /* Check if a background saving in progress terminated */
907 if (server.bgsaveinprogress) {
908 int statloc;
909 if (wait4(-1,&statloc,WNOHANG,NULL)) {
910 int exitcode = WEXITSTATUS(statloc);
911 int bysignal = WIFSIGNALED(statloc);
912
913 if (!bysignal && exitcode == 0) {
914 redisLog(REDIS_NOTICE,
915 "Background saving terminated with success");
916 server.dirty = 0;
917 server.lastsave = time(NULL);
918 } else if (!bysignal && exitcode != 0) {
919 redisLog(REDIS_WARNING, "Background saving error");
920 } else {
921 redisLog(REDIS_WARNING,
922 "Background saving terminated by signal");
923 rdbRemoveTempFile(server.bgsavechildpid);
924 }
925 server.bgsaveinprogress = 0;
926 server.bgsavechildpid = -1;
927 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
928 }
929 } else {
930 /* If there is not a background saving in progress check if
931 * we have to save now */
932 time_t now = time(NULL);
933 for (j = 0; j < server.saveparamslen; j++) {
934 struct saveparam *sp = server.saveparams+j;
935
936 if (server.dirty >= sp->changes &&
937 now-server.lastsave > sp->seconds) {
938 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
939 sp->changes, sp->seconds);
940 rdbSaveBackground(server.dbfilename);
941 break;
942 }
943 }
944 }
945
946 /* Try to expire a few timed out keys. The algorithm used is adaptive and
947 * will use few CPU cycles if there are few expiring keys, otherwise
948 * it will get more aggressive to avoid that too much memory is used by
949 * keys that can be removed from the keyspace. */
950 for (j = 0; j < server.dbnum; j++) {
951 int expired;
952 redisDb *db = server.db+j;
953
954 /* Continue to expire if at the end of the cycle more than 25%
955 * of the keys were expired. */
956 do {
957 int num = dictSize(db->expires);
958 time_t now = time(NULL);
959
960 expired = 0;
961 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
962 num = REDIS_EXPIRELOOKUPS_PER_CRON;
963 while (num--) {
964 dictEntry *de;
965 time_t t;
966
967 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
968 t = (time_t) dictGetEntryVal(de);
969 if (now > t) {
970 deleteKey(db,dictGetEntryKey(de));
971 expired++;
972 }
973 }
974 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
975 }
976
977 /* Check if we should connect to a MASTER */
978 if (server.replstate == REDIS_REPL_CONNECT) {
979 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
980 if (syncWithMaster() == REDIS_OK) {
981 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
982 }
983 }
984 return 1000;
985 }
986
987 static void createSharedObjects(void) {
988 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
989 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
990 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
991 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
992 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
993 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
994 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
995 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
996 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
997 /* no such key */
998 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
999 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1000 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1001 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1002 "-ERR no such key\r\n"));
1003 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1004 "-ERR syntax error\r\n"));
1005 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1006 "-ERR source and destination objects are the same\r\n"));
1007 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1008 "-ERR index out of range\r\n"));
1009 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1010 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1011 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1012 shared.select0 = createStringObject("select 0\r\n",10);
1013 shared.select1 = createStringObject("select 1\r\n",10);
1014 shared.select2 = createStringObject("select 2\r\n",10);
1015 shared.select3 = createStringObject("select 3\r\n",10);
1016 shared.select4 = createStringObject("select 4\r\n",10);
1017 shared.select5 = createStringObject("select 5\r\n",10);
1018 shared.select6 = createStringObject("select 6\r\n",10);
1019 shared.select7 = createStringObject("select 7\r\n",10);
1020 shared.select8 = createStringObject("select 8\r\n",10);
1021 shared.select9 = createStringObject("select 9\r\n",10);
1022 }
1023
1024 static void appendServerSaveParams(time_t seconds, int changes) {
1025 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1026 server.saveparams[server.saveparamslen].seconds = seconds;
1027 server.saveparams[server.saveparamslen].changes = changes;
1028 server.saveparamslen++;
1029 }
1030
1031 static void resetServerSaveParams() {
1032 zfree(server.saveparams);
1033 server.saveparams = NULL;
1034 server.saveparamslen = 0;
1035 }
1036
1037 static void initServerConfig() {
1038 server.dbnum = REDIS_DEFAULT_DBNUM;
1039 server.port = REDIS_SERVERPORT;
1040 server.verbosity = REDIS_DEBUG;
1041 server.maxidletime = REDIS_MAXIDLETIME;
1042 server.saveparams = NULL;
1043 server.logfile = NULL; /* NULL = log on standard output */
1044 server.bindaddr = NULL;
1045 server.glueoutputbuf = 1;
1046 server.daemonize = 0;
1047 server.appendonly = 0;
1048 server.appendfsync = APPENDFSYNC_ALWAYS;
1049 server.lastfsync = time(NULL);
1050 server.appendfd = -1;
1051 server.appendseldb = -1; /* Make sure the first time will not match */
1052 server.pidfile = "/var/run/redis.pid";
1053 server.dbfilename = "dump.rdb";
1054 server.appendfilename = "appendonly.log";
1055 server.requirepass = NULL;
1056 server.shareobjects = 0;
1057 server.sharingpoolsize = 1024;
1058 server.maxclients = 0;
1059 server.maxmemory = 0;
1060 resetServerSaveParams();
1061
1062 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1063 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1064 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1065 /* Replication related */
1066 server.isslave = 0;
1067 server.masterauth = NULL;
1068 server.masterhost = NULL;
1069 server.masterport = 6379;
1070 server.master = NULL;
1071 server.replstate = REDIS_REPL_NONE;
1072
1073 /* Double constants initialization */
1074 R_Zero = 0.0;
1075 R_PosInf = 1.0/R_Zero;
1076 R_NegInf = -1.0/R_Zero;
1077 R_Nan = R_Zero/R_Zero;
1078 }
1079
1080 static void initServer() {
1081 int j;
1082
1083 signal(SIGHUP, SIG_IGN);
1084 signal(SIGPIPE, SIG_IGN);
1085 setupSigSegvAction();
1086
1087 server.clients = listCreate();
1088 server.slaves = listCreate();
1089 server.monitors = listCreate();
1090 server.objfreelist = listCreate();
1091 createSharedObjects();
1092 server.el = aeCreateEventLoop();
1093 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1094 server.sharingpool = dictCreate(&setDictType,NULL);
1095 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1096 if (server.fd == -1) {
1097 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1098 exit(1);
1099 }
1100 for (j = 0; j < server.dbnum; j++) {
1101 server.db[j].dict = dictCreate(&hashDictType,NULL);
1102 server.db[j].expires = dictCreate(&setDictType,NULL);
1103 server.db[j].id = j;
1104 }
1105 server.cronloops = 0;
1106 server.bgsaveinprogress = 0;
1107 server.bgsavechildpid = -1;
1108 server.lastsave = time(NULL);
1109 server.dirty = 0;
1110 server.usedmemory = 0;
1111 server.stat_numcommands = 0;
1112 server.stat_numconnections = 0;
1113 server.stat_starttime = time(NULL);
1114 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1115
1116 if (server.appendonly) {
1117 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1118 if (server.appendfd == -1) {
1119 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1120 strerror(errno));
1121 exit(1);
1122 }
1123 }
1124 }
1125
1126 /* Empty the whole database */
1127 static long long emptyDb() {
1128 int j;
1129 long long removed = 0;
1130
1131 for (j = 0; j < server.dbnum; j++) {
1132 removed += dictSize(server.db[j].dict);
1133 dictEmpty(server.db[j].dict);
1134 dictEmpty(server.db[j].expires);
1135 }
1136 return removed;
1137 }
1138
1139 static int yesnotoi(char *s) {
1140 if (!strcasecmp(s,"yes")) return 1;
1141 else if (!strcasecmp(s,"no")) return 0;
1142 else return -1;
1143 }
1144
1145 /* I agree, this is a very rudimental way to load a configuration...
1146 will improve later if the config gets more complex */
1147 static void loadServerConfig(char *filename) {
1148 FILE *fp;
1149 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1150 int linenum = 0;
1151 sds line = NULL;
1152
1153 if (filename[0] == '-' && filename[1] == '\0')
1154 fp = stdin;
1155 else {
1156 if ((fp = fopen(filename,"r")) == NULL) {
1157 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1158 exit(1);
1159 }
1160 }
1161
1162 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1163 sds *argv;
1164 int argc, j;
1165
1166 linenum++;
1167 line = sdsnew(buf);
1168 line = sdstrim(line," \t\r\n");
1169
1170 /* Skip comments and blank lines*/
1171 if (line[0] == '#' || line[0] == '\0') {
1172 sdsfree(line);
1173 continue;
1174 }
1175
1176 /* Split into arguments */
1177 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1178 sdstolower(argv[0]);
1179
1180 /* Execute config directives */
1181 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1182 server.maxidletime = atoi(argv[1]);
1183 if (server.maxidletime < 0) {
1184 err = "Invalid timeout value"; goto loaderr;
1185 }
1186 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1187 server.port = atoi(argv[1]);
1188 if (server.port < 1 || server.port > 65535) {
1189 err = "Invalid port"; goto loaderr;
1190 }
1191 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1192 server.bindaddr = zstrdup(argv[1]);
1193 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1194 int seconds = atoi(argv[1]);
1195 int changes = atoi(argv[2]);
1196 if (seconds < 1 || changes < 0) {
1197 err = "Invalid save parameters"; goto loaderr;
1198 }
1199 appendServerSaveParams(seconds,changes);
1200 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1201 if (chdir(argv[1]) == -1) {
1202 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1203 argv[1], strerror(errno));
1204 exit(1);
1205 }
1206 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1207 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1208 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1209 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1210 else {
1211 err = "Invalid log level. Must be one of debug, notice, warning";
1212 goto loaderr;
1213 }
1214 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1215 FILE *logfp;
1216
1217 server.logfile = zstrdup(argv[1]);
1218 if (!strcasecmp(server.logfile,"stdout")) {
1219 zfree(server.logfile);
1220 server.logfile = NULL;
1221 }
1222 if (server.logfile) {
1223 /* Test if we are able to open the file. The server will not
1224 * be able to abort just for this problem later... */
1225 logfp = fopen(server.logfile,"a");
1226 if (logfp == NULL) {
1227 err = sdscatprintf(sdsempty(),
1228 "Can't open the log file: %s", strerror(errno));
1229 goto loaderr;
1230 }
1231 fclose(logfp);
1232 }
1233 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1234 server.dbnum = atoi(argv[1]);
1235 if (server.dbnum < 1) {
1236 err = "Invalid number of databases"; goto loaderr;
1237 }
1238 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1239 server.maxclients = atoi(argv[1]);
1240 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1241 server.maxmemory = strtoll(argv[1], NULL, 10);
1242 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1243 server.masterhost = sdsnew(argv[1]);
1244 server.masterport = atoi(argv[2]);
1245 server.replstate = REDIS_REPL_CONNECT;
1246 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1247 server.masterauth = zstrdup(argv[1]);
1248 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1249 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1250 err = "argument must be 'yes' or 'no'"; goto loaderr;
1251 }
1252 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1253 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1254 err = "argument must be 'yes' or 'no'"; goto loaderr;
1255 }
1256 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1257 server.sharingpoolsize = atoi(argv[1]);
1258 if (server.sharingpoolsize < 1) {
1259 err = "invalid object sharing pool size"; goto loaderr;
1260 }
1261 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1262 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1263 err = "argument must be 'yes' or 'no'"; goto loaderr;
1264 }
1265 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1266 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1267 err = "argument must be 'yes' or 'no'"; goto loaderr;
1268 }
1269 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1270 if (!strcasecmp(argv[1],"no")) {
1271 server.appendfsync = APPENDFSYNC_NO;
1272 } else if (!strcasecmp(argv[1],"always")) {
1273 server.appendfsync = APPENDFSYNC_ALWAYS;
1274 } else if (!strcasecmp(argv[1],"everysec")) {
1275 server.appendfsync = APPENDFSYNC_EVERYSEC;
1276 } else {
1277 err = "argument must be 'no', 'always' or 'everysec'";
1278 goto loaderr;
1279 }
1280 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1281 server.requirepass = zstrdup(argv[1]);
1282 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1283 server.pidfile = zstrdup(argv[1]);
1284 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1285 server.dbfilename = zstrdup(argv[1]);
1286 } else {
1287 err = "Bad directive or wrong number of arguments"; goto loaderr;
1288 }
1289 for (j = 0; j < argc; j++)
1290 sdsfree(argv[j]);
1291 zfree(argv);
1292 sdsfree(line);
1293 }
1294 if (fp != stdin) fclose(fp);
1295 return;
1296
1297 loaderr:
1298 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1299 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1300 fprintf(stderr, ">>> '%s'\n", line);
1301 fprintf(stderr, "%s\n", err);
1302 exit(1);
1303 }
1304
1305 static void freeClientArgv(redisClient *c) {
1306 int j;
1307
1308 for (j = 0; j < c->argc; j++)
1309 decrRefCount(c->argv[j]);
1310 for (j = 0; j < c->mbargc; j++)
1311 decrRefCount(c->mbargv[j]);
1312 c->argc = 0;
1313 c->mbargc = 0;
1314 }
1315
1316 static void freeClient(redisClient *c) {
1317 listNode *ln;
1318
1319 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1320 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1321 sdsfree(c->querybuf);
1322 listRelease(c->reply);
1323 freeClientArgv(c);
1324 close(c->fd);
1325 ln = listSearchKey(server.clients,c);
1326 assert(ln != NULL);
1327 listDelNode(server.clients,ln);
1328 if (c->flags & REDIS_SLAVE) {
1329 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1330 close(c->repldbfd);
1331 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1332 ln = listSearchKey(l,c);
1333 assert(ln != NULL);
1334 listDelNode(l,ln);
1335 }
1336 if (c->flags & REDIS_MASTER) {
1337 server.master = NULL;
1338 server.replstate = REDIS_REPL_CONNECT;
1339 }
1340 zfree(c->argv);
1341 zfree(c->mbargv);
1342 zfree(c);
1343 }
1344
1345 static void glueReplyBuffersIfNeeded(redisClient *c) {
1346 int totlen = 0;
1347 listNode *ln;
1348 robj *o;
1349
1350 listRewind(c->reply);
1351 while((ln = listYield(c->reply))) {
1352 o = ln->value;
1353 totlen += sdslen(o->ptr);
1354 /* This optimization makes more sense if we don't have to copy
1355 * too much data */
1356 if (totlen > 1024) return;
1357 }
1358 if (totlen > 0) {
1359 char buf[1024];
1360 int copylen = 0;
1361
1362 listRewind(c->reply);
1363 while((ln = listYield(c->reply))) {
1364 o = ln->value;
1365 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1366 copylen += sdslen(o->ptr);
1367 listDelNode(c->reply,ln);
1368 }
1369 /* Now the output buffer is empty, add the new single element */
1370 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1371 listAddNodeTail(c->reply,o);
1372 }
1373 }
1374
1375 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1376 redisClient *c = privdata;
1377 int nwritten = 0, totwritten = 0, objlen;
1378 robj *o;
1379 REDIS_NOTUSED(el);
1380 REDIS_NOTUSED(mask);
1381
1382 if (server.glueoutputbuf && listLength(c->reply) > 1)
1383 glueReplyBuffersIfNeeded(c);
1384 while(listLength(c->reply)) {
1385 o = listNodeValue(listFirst(c->reply));
1386 objlen = sdslen(o->ptr);
1387
1388 if (objlen == 0) {
1389 listDelNode(c->reply,listFirst(c->reply));
1390 continue;
1391 }
1392
1393 if (c->flags & REDIS_MASTER) {
1394 /* Don't reply to a master */
1395 nwritten = objlen - c->sentlen;
1396 } else {
1397 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1398 if (nwritten <= 0) break;
1399 }
1400 c->sentlen += nwritten;
1401 totwritten += nwritten;
1402 /* If we fully sent the object on head go to the next one */
1403 if (c->sentlen == objlen) {
1404 listDelNode(c->reply,listFirst(c->reply));
1405 c->sentlen = 0;
1406 }
1407 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1408 * bytes, in a single threaded server it's a good idea to serve
1409 * other clients as well, even if a very large request comes from
1410 * super fast link that is always able to accept data (in real world
1411 * scenario think about 'KEYS *' against the loopback interfae) */
1412 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1413 }
1414 if (nwritten == -1) {
1415 if (errno == EAGAIN) {
1416 nwritten = 0;
1417 } else {
1418 redisLog(REDIS_DEBUG,
1419 "Error writing to client: %s", strerror(errno));
1420 freeClient(c);
1421 return;
1422 }
1423 }
1424 if (totwritten > 0) c->lastinteraction = time(NULL);
1425 if (listLength(c->reply) == 0) {
1426 c->sentlen = 0;
1427 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1428 }
1429 }
1430
1431 static struct redisCommand *lookupCommand(char *name) {
1432 int j = 0;
1433 while(cmdTable[j].name != NULL) {
1434 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1435 j++;
1436 }
1437 return NULL;
1438 }
1439
1440 /* resetClient prepare the client to process the next command */
1441 static void resetClient(redisClient *c) {
1442 freeClientArgv(c);
1443 c->bulklen = -1;
1444 c->multibulk = 0;
1445 }
1446
1447 /* If this function gets called we already read a whole
1448 * command, argments are in the client argv/argc fields.
1449 * processCommand() execute the command or prepare the
1450 * server for a bulk read from the client.
1451 *
1452 * If 1 is returned the client is still alive and valid and
1453 * and other operations can be performed by the caller. Otherwise
1454 * if 0 is returned the client was destroied (i.e. after QUIT). */
1455 static int processCommand(redisClient *c) {
1456 struct redisCommand *cmd;
1457 long long dirty;
1458
1459 /* Free some memory if needed (maxmemory setting) */
1460 if (server.maxmemory) freeMemoryIfNeeded();
1461
1462 /* Handle the multi bulk command type. This is an alternative protocol
1463 * supported by Redis in order to receive commands that are composed of
1464 * multiple binary-safe "bulk" arguments. The latency of processing is
1465 * a bit higher but this allows things like multi-sets, so if this
1466 * protocol is used only for MSET and similar commands this is a big win. */
1467 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1468 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1469 if (c->multibulk <= 0) {
1470 resetClient(c);
1471 return 1;
1472 } else {
1473 decrRefCount(c->argv[c->argc-1]);
1474 c->argc--;
1475 return 1;
1476 }
1477 } else if (c->multibulk) {
1478 if (c->bulklen == -1) {
1479 if (((char*)c->argv[0]->ptr)[0] != '$') {
1480 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1481 resetClient(c);
1482 return 1;
1483 } else {
1484 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1485 decrRefCount(c->argv[0]);
1486 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1487 c->argc--;
1488 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1489 resetClient(c);
1490 return 1;
1491 }
1492 c->argc--;
1493 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1494 return 1;
1495 }
1496 } else {
1497 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1498 c->mbargv[c->mbargc] = c->argv[0];
1499 c->mbargc++;
1500 c->argc--;
1501 c->multibulk--;
1502 if (c->multibulk == 0) {
1503 robj **auxargv;
1504 int auxargc;
1505
1506 /* Here we need to swap the multi-bulk argc/argv with the
1507 * normal argc/argv of the client structure. */
1508 auxargv = c->argv;
1509 c->argv = c->mbargv;
1510 c->mbargv = auxargv;
1511
1512 auxargc = c->argc;
1513 c->argc = c->mbargc;
1514 c->mbargc = auxargc;
1515
1516 /* We need to set bulklen to something different than -1
1517 * in order for the code below to process the command without
1518 * to try to read the last argument of a bulk command as
1519 * a special argument. */
1520 c->bulklen = 0;
1521 /* continue below and process the command */
1522 } else {
1523 c->bulklen = -1;
1524 return 1;
1525 }
1526 }
1527 }
1528 /* -- end of multi bulk commands processing -- */
1529
1530 /* The QUIT command is handled as a special case. Normal command
1531 * procs are unable to close the client connection safely */
1532 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1533 freeClient(c);
1534 return 0;
1535 }
1536 cmd = lookupCommand(c->argv[0]->ptr);
1537 if (!cmd) {
1538 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1539 resetClient(c);
1540 return 1;
1541 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1542 (c->argc < -cmd->arity)) {
1543 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1544 resetClient(c);
1545 return 1;
1546 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1547 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1548 resetClient(c);
1549 return 1;
1550 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1551 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1552
1553 decrRefCount(c->argv[c->argc-1]);
1554 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1555 c->argc--;
1556 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1557 resetClient(c);
1558 return 1;
1559 }
1560 c->argc--;
1561 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1562 /* It is possible that the bulk read is already in the
1563 * buffer. Check this condition and handle it accordingly.
1564 * This is just a fast path, alternative to call processInputBuffer().
1565 * It's a good idea since the code is small and this condition
1566 * happens most of the times. */
1567 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1568 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1569 c->argc++;
1570 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1571 } else {
1572 return 1;
1573 }
1574 }
1575 /* Let's try to share objects on the command arguments vector */
1576 if (server.shareobjects) {
1577 int j;
1578 for(j = 1; j < c->argc; j++)
1579 c->argv[j] = tryObjectSharing(c->argv[j]);
1580 }
1581 /* Let's try to encode the bulk object to save space. */
1582 if (cmd->flags & REDIS_CMD_BULK)
1583 tryObjectEncoding(c->argv[c->argc-1]);
1584
1585 /* Check if the user is authenticated */
1586 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1587 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1588 resetClient(c);
1589 return 1;
1590 }
1591
1592 /* Exec the command */
1593 dirty = server.dirty;
1594 cmd->proc(c);
1595 if (server.appendonly && server.dirty-dirty)
1596 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1597 if (server.dirty-dirty && listLength(server.slaves))
1598 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1599 if (listLength(server.monitors))
1600 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1601 server.stat_numcommands++;
1602
1603 /* Prepare the client for the next command */
1604 if (c->flags & REDIS_CLOSE) {
1605 freeClient(c);
1606 return 0;
1607 }
1608 resetClient(c);
1609 return 1;
1610 }
1611
1612 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1613 listNode *ln;
1614 int outc = 0, j;
1615 robj **outv;
1616 /* (args*2)+1 is enough room for args, spaces, newlines */
1617 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1618
1619 if (argc <= REDIS_STATIC_ARGS) {
1620 outv = static_outv;
1621 } else {
1622 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1623 }
1624
1625 for (j = 0; j < argc; j++) {
1626 if (j != 0) outv[outc++] = shared.space;
1627 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1628 robj *lenobj;
1629
1630 lenobj = createObject(REDIS_STRING,
1631 sdscatprintf(sdsempty(),"%d\r\n",
1632 stringObjectLen(argv[j])));
1633 lenobj->refcount = 0;
1634 outv[outc++] = lenobj;
1635 }
1636 outv[outc++] = argv[j];
1637 }
1638 outv[outc++] = shared.crlf;
1639
1640 /* Increment all the refcounts at start and decrement at end in order to
1641 * be sure to free objects if there is no slave in a replication state
1642 * able to be feed with commands */
1643 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1644 listRewind(slaves);
1645 while((ln = listYield(slaves))) {
1646 redisClient *slave = ln->value;
1647
1648 /* Don't feed slaves that are still waiting for BGSAVE to start */
1649 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1650
1651 /* Feed all the other slaves, MONITORs and so on */
1652 if (slave->slaveseldb != dictid) {
1653 robj *selectcmd;
1654
1655 switch(dictid) {
1656 case 0: selectcmd = shared.select0; break;
1657 case 1: selectcmd = shared.select1; break;
1658 case 2: selectcmd = shared.select2; break;
1659 case 3: selectcmd = shared.select3; break;
1660 case 4: selectcmd = shared.select4; break;
1661 case 5: selectcmd = shared.select5; break;
1662 case 6: selectcmd = shared.select6; break;
1663 case 7: selectcmd = shared.select7; break;
1664 case 8: selectcmd = shared.select8; break;
1665 case 9: selectcmd = shared.select9; break;
1666 default:
1667 selectcmd = createObject(REDIS_STRING,
1668 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1669 selectcmd->refcount = 0;
1670 break;
1671 }
1672 addReply(slave,selectcmd);
1673 slave->slaveseldb = dictid;
1674 }
1675 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1676 }
1677 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1678 if (outv != static_outv) zfree(outv);
1679 }
1680
1681 static void processInputBuffer(redisClient *c) {
1682 again:
1683 if (c->bulklen == -1) {
1684 /* Read the first line of the query */
1685 char *p = strchr(c->querybuf,'\n');
1686 size_t querylen;
1687
1688 if (p) {
1689 sds query, *argv;
1690 int argc, j;
1691
1692 query = c->querybuf;
1693 c->querybuf = sdsempty();
1694 querylen = 1+(p-(query));
1695 if (sdslen(query) > querylen) {
1696 /* leave data after the first line of the query in the buffer */
1697 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1698 }
1699 *p = '\0'; /* remove "\n" */
1700 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1701 sdsupdatelen(query);
1702
1703 /* Now we can split the query in arguments */
1704 if (sdslen(query) == 0) {
1705 /* Ignore empty query */
1706 sdsfree(query);
1707 return;
1708 }
1709 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1710 sdsfree(query);
1711
1712 if (c->argv) zfree(c->argv);
1713 c->argv = zmalloc(sizeof(robj*)*argc);
1714
1715 for (j = 0; j < argc; j++) {
1716 if (sdslen(argv[j])) {
1717 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1718 c->argc++;
1719 } else {
1720 sdsfree(argv[j]);
1721 }
1722 }
1723 zfree(argv);
1724 /* Execute the command. If the client is still valid
1725 * after processCommand() return and there is something
1726 * on the query buffer try to process the next command. */
1727 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1728 return;
1729 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1730 redisLog(REDIS_DEBUG, "Client protocol error");
1731 freeClient(c);
1732 return;
1733 }
1734 } else {
1735 /* Bulk read handling. Note that if we are at this point
1736 the client already sent a command terminated with a newline,
1737 we are reading the bulk data that is actually the last
1738 argument of the command. */
1739 int qbl = sdslen(c->querybuf);
1740
1741 if (c->bulklen <= qbl) {
1742 /* Copy everything but the final CRLF as final argument */
1743 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1744 c->argc++;
1745 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1746 /* Process the command. If the client is still valid after
1747 * the processing and there is more data in the buffer
1748 * try to parse it. */
1749 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1750 return;
1751 }
1752 }
1753 }
1754
1755 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1756 redisClient *c = (redisClient*) privdata;
1757 char buf[REDIS_IOBUF_LEN];
1758 int nread;
1759 REDIS_NOTUSED(el);
1760 REDIS_NOTUSED(mask);
1761
1762 nread = read(fd, buf, REDIS_IOBUF_LEN);
1763 if (nread == -1) {
1764 if (errno == EAGAIN) {
1765 nread = 0;
1766 } else {
1767 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1768 freeClient(c);
1769 return;
1770 }
1771 } else if (nread == 0) {
1772 redisLog(REDIS_DEBUG, "Client closed connection");
1773 freeClient(c);
1774 return;
1775 }
1776 if (nread) {
1777 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1778 c->lastinteraction = time(NULL);
1779 } else {
1780 return;
1781 }
1782 processInputBuffer(c);
1783 }
1784
1785 static int selectDb(redisClient *c, int id) {
1786 if (id < 0 || id >= server.dbnum)
1787 return REDIS_ERR;
1788 c->db = &server.db[id];
1789 return REDIS_OK;
1790 }
1791
1792 static void *dupClientReplyValue(void *o) {
1793 incrRefCount((robj*)o);
1794 return 0;
1795 }
1796
1797 static redisClient *createClient(int fd) {
1798 redisClient *c = zmalloc(sizeof(*c));
1799
1800 anetNonBlock(NULL,fd);
1801 anetTcpNoDelay(NULL,fd);
1802 if (!c) return NULL;
1803 selectDb(c,0);
1804 c->fd = fd;
1805 c->querybuf = sdsempty();
1806 c->argc = 0;
1807 c->argv = NULL;
1808 c->bulklen = -1;
1809 c->multibulk = 0;
1810 c->mbargc = 0;
1811 c->mbargv = NULL;
1812 c->sentlen = 0;
1813 c->flags = 0;
1814 c->lastinteraction = time(NULL);
1815 c->authenticated = 0;
1816 c->replstate = REDIS_REPL_NONE;
1817 c->reply = listCreate();
1818 listSetFreeMethod(c->reply,decrRefCount);
1819 listSetDupMethod(c->reply,dupClientReplyValue);
1820 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1821 readQueryFromClient, c, NULL) == AE_ERR) {
1822 freeClient(c);
1823 return NULL;
1824 }
1825 listAddNodeTail(server.clients,c);
1826 return c;
1827 }
1828
1829 static void addReply(redisClient *c, robj *obj) {
1830 if (listLength(c->reply) == 0 &&
1831 (c->replstate == REDIS_REPL_NONE ||
1832 c->replstate == REDIS_REPL_ONLINE) &&
1833 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1834 sendReplyToClient, c, NULL) == AE_ERR) return;
1835 if (obj->encoding != REDIS_ENCODING_RAW) {
1836 obj = getDecodedObject(obj);
1837 } else {
1838 incrRefCount(obj);
1839 }
1840 listAddNodeTail(c->reply,obj);
1841 }
1842
1843 static void addReplySds(redisClient *c, sds s) {
1844 robj *o = createObject(REDIS_STRING,s);
1845 addReply(c,o);
1846 decrRefCount(o);
1847 }
1848
1849 static void addReplyBulkLen(redisClient *c, robj *obj) {
1850 size_t len;
1851
1852 if (obj->encoding == REDIS_ENCODING_RAW) {
1853 len = sdslen(obj->ptr);
1854 } else {
1855 long n = (long)obj->ptr;
1856
1857 len = 1;
1858 if (n < 0) {
1859 len++;
1860 n = -n;
1861 }
1862 while((n = n/10) != 0) {
1863 len++;
1864 }
1865 }
1866 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1867 }
1868
1869 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1870 int cport, cfd;
1871 char cip[128];
1872 redisClient *c;
1873 REDIS_NOTUSED(el);
1874 REDIS_NOTUSED(mask);
1875 REDIS_NOTUSED(privdata);
1876
1877 cfd = anetAccept(server.neterr, fd, cip, &cport);
1878 if (cfd == AE_ERR) {
1879 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1880 return;
1881 }
1882 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1883 if ((c = createClient(cfd)) == NULL) {
1884 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1885 close(cfd); /* May be already closed, just ingore errors */
1886 return;
1887 }
1888 /* If maxclient directive is set and this is one client more... close the
1889 * connection. Note that we create the client instead to check before
1890 * for this condition, since now the socket is already set in nonblocking
1891 * mode and we can send an error for free using the Kernel I/O */
1892 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1893 char *err = "-ERR max number of clients reached\r\n";
1894
1895 /* That's a best effort error message, don't check write errors */
1896 if (write(c->fd,err,strlen(err)) == -1) {
1897 /* Nothing to do, Just to avoid the warning... */
1898 }
1899 freeClient(c);
1900 return;
1901 }
1902 server.stat_numconnections++;
1903 }
1904
1905 /* ======================= Redis objects implementation ===================== */
1906
1907 static robj *createObject(int type, void *ptr) {
1908 robj *o;
1909
1910 if (listLength(server.objfreelist)) {
1911 listNode *head = listFirst(server.objfreelist);
1912 o = listNodeValue(head);
1913 listDelNode(server.objfreelist,head);
1914 } else {
1915 o = zmalloc(sizeof(*o));
1916 }
1917 o->type = type;
1918 o->encoding = REDIS_ENCODING_RAW;
1919 o->ptr = ptr;
1920 o->refcount = 1;
1921 return o;
1922 }
1923
1924 static robj *createStringObject(char *ptr, size_t len) {
1925 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1926 }
1927
1928 static robj *createListObject(void) {
1929 list *l = listCreate();
1930
1931 listSetFreeMethod(l,decrRefCount);
1932 return createObject(REDIS_LIST,l);
1933 }
1934
1935 static robj *createSetObject(void) {
1936 dict *d = dictCreate(&setDictType,NULL);
1937 return createObject(REDIS_SET,d);
1938 }
1939
1940 static robj *createZsetObject(void) {
1941 zset *zs = zmalloc(sizeof(*zs));
1942
1943 zs->dict = dictCreate(&zsetDictType,NULL);
1944 zs->zsl = zslCreate();
1945 return createObject(REDIS_ZSET,zs);
1946 }
1947
1948 static void freeStringObject(robj *o) {
1949 if (o->encoding == REDIS_ENCODING_RAW) {
1950 sdsfree(o->ptr);
1951 }
1952 }
1953
1954 static void freeListObject(robj *o) {
1955 listRelease((list*) o->ptr);
1956 }
1957
1958 static void freeSetObject(robj *o) {
1959 dictRelease((dict*) o->ptr);
1960 }
1961
1962 static void freeZsetObject(robj *o) {
1963 zset *zs = o->ptr;
1964
1965 dictRelease(zs->dict);
1966 zslFree(zs->zsl);
1967 zfree(zs);
1968 }
1969
1970 static void freeHashObject(robj *o) {
1971 dictRelease((dict*) o->ptr);
1972 }
1973
1974 static void incrRefCount(robj *o) {
1975 o->refcount++;
1976 #ifdef DEBUG_REFCOUNT
1977 if (o->type == REDIS_STRING)
1978 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1979 #endif
1980 }
1981
1982 static void decrRefCount(void *obj) {
1983 robj *o = obj;
1984
1985 #ifdef DEBUG_REFCOUNT
1986 if (o->type == REDIS_STRING)
1987 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1988 #endif
1989 if (--(o->refcount) == 0) {
1990 switch(o->type) {
1991 case REDIS_STRING: freeStringObject(o); break;
1992 case REDIS_LIST: freeListObject(o); break;
1993 case REDIS_SET: freeSetObject(o); break;
1994 case REDIS_ZSET: freeZsetObject(o); break;
1995 case REDIS_HASH: freeHashObject(o); break;
1996 default: assert(0 != 0); break;
1997 }
1998 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1999 !listAddNodeHead(server.objfreelist,o))
2000 zfree(o);
2001 }
2002 }
2003
2004 static robj *lookupKey(redisDb *db, robj *key) {
2005 dictEntry *de = dictFind(db->dict,key);
2006 return de ? dictGetEntryVal(de) : NULL;
2007 }
2008
2009 static robj *lookupKeyRead(redisDb *db, robj *key) {
2010 expireIfNeeded(db,key);
2011 return lookupKey(db,key);
2012 }
2013
2014 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2015 deleteIfVolatile(db,key);
2016 return lookupKey(db,key);
2017 }
2018
2019 static int deleteKey(redisDb *db, robj *key) {
2020 int retval;
2021
2022 /* We need to protect key from destruction: after the first dictDelete()
2023 * it may happen that 'key' is no longer valid if we don't increment
2024 * it's count. This may happen when we get the object reference directly
2025 * from the hash table with dictRandomKey() or dict iterators */
2026 incrRefCount(key);
2027 if (dictSize(db->expires)) dictDelete(db->expires,key);
2028 retval = dictDelete(db->dict,key);
2029 decrRefCount(key);
2030
2031 return retval == DICT_OK;
2032 }
2033
2034 /* Try to share an object against the shared objects pool */
2035 static robj *tryObjectSharing(robj *o) {
2036 struct dictEntry *de;
2037 unsigned long c;
2038
2039 if (o == NULL || server.shareobjects == 0) return o;
2040
2041 assert(o->type == REDIS_STRING);
2042 de = dictFind(server.sharingpool,o);
2043 if (de) {
2044 robj *shared = dictGetEntryKey(de);
2045
2046 c = ((unsigned long) dictGetEntryVal(de))+1;
2047 dictGetEntryVal(de) = (void*) c;
2048 incrRefCount(shared);
2049 decrRefCount(o);
2050 return shared;
2051 } else {
2052 /* Here we are using a stream algorihtm: Every time an object is
2053 * shared we increment its count, everytime there is a miss we
2054 * recrement the counter of a random object. If this object reaches
2055 * zero we remove the object and put the current object instead. */
2056 if (dictSize(server.sharingpool) >=
2057 server.sharingpoolsize) {
2058 de = dictGetRandomKey(server.sharingpool);
2059 assert(de != NULL);
2060 c = ((unsigned long) dictGetEntryVal(de))-1;
2061 dictGetEntryVal(de) = (void*) c;
2062 if (c == 0) {
2063 dictDelete(server.sharingpool,de->key);
2064 }
2065 } else {
2066 c = 0; /* If the pool is empty we want to add this object */
2067 }
2068 if (c == 0) {
2069 int retval;
2070
2071 retval = dictAdd(server.sharingpool,o,(void*)1);
2072 assert(retval == DICT_OK);
2073 incrRefCount(o);
2074 }
2075 return o;
2076 }
2077 }
2078
2079 /* Check if the nul-terminated string 's' can be represented by a long
2080 * (that is, is a number that fits into long without any other space or
2081 * character before or after the digits).
2082 *
2083 * If so, the function returns REDIS_OK and *longval is set to the value
2084 * of the number. Otherwise REDIS_ERR is returned */
2085 static int isStringRepresentableAsLong(sds s, long *longval) {
2086 char buf[32], *endptr;
2087 long value;
2088 int slen;
2089
2090 value = strtol(s, &endptr, 10);
2091 if (endptr[0] != '\0') return REDIS_ERR;
2092 slen = snprintf(buf,32,"%ld",value);
2093
2094 /* If the number converted back into a string is not identical
2095 * then it's not possible to encode the string as integer */
2096 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2097 if (longval) *longval = value;
2098 return REDIS_OK;
2099 }
2100
2101 /* Try to encode a string object in order to save space */
2102 static int tryObjectEncoding(robj *o) {
2103 long value;
2104 sds s = o->ptr;
2105
2106 if (o->encoding != REDIS_ENCODING_RAW)
2107 return REDIS_ERR; /* Already encoded */
2108
2109 /* It's not save to encode shared objects: shared objects can be shared
2110 * everywhere in the "object space" of Redis. Encoded objects can only
2111 * appear as "values" (and not, for instance, as keys) */
2112 if (o->refcount > 1) return REDIS_ERR;
2113
2114 /* Currently we try to encode only strings */
2115 assert(o->type == REDIS_STRING);
2116
2117 /* Check if we can represent this string as a long integer */
2118 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2119
2120 /* Ok, this object can be encoded */
2121 o->encoding = REDIS_ENCODING_INT;
2122 sdsfree(o->ptr);
2123 o->ptr = (void*) value;
2124 return REDIS_OK;
2125 }
2126
2127 /* Get a decoded version of an encoded object (returned as a new object) */
2128 static robj *getDecodedObject(const robj *o) {
2129 robj *dec;
2130
2131 assert(o->encoding != REDIS_ENCODING_RAW);
2132 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2133 char buf[32];
2134
2135 snprintf(buf,32,"%ld",(long)o->ptr);
2136 dec = createStringObject(buf,strlen(buf));
2137 return dec;
2138 } else {
2139 assert(1 != 1);
2140 }
2141 }
2142
2143 /* Compare two string objects via strcmp() or alike.
2144 * Note that the objects may be integer-encoded. In such a case we
2145 * use snprintf() to get a string representation of the numbers on the stack
2146 * and compare the strings, it's much faster than calling getDecodedObject(). */
2147 static int compareStringObjects(robj *a, robj *b) {
2148 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2149 char bufa[128], bufb[128], *astr, *bstr;
2150 int bothsds = 1;
2151
2152 if (a == b) return 0;
2153 if (a->encoding != REDIS_ENCODING_RAW) {
2154 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2155 astr = bufa;
2156 bothsds = 0;
2157 } else {
2158 astr = a->ptr;
2159 }
2160 if (b->encoding != REDIS_ENCODING_RAW) {
2161 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2162 bstr = bufb;
2163 bothsds = 0;
2164 } else {
2165 bstr = b->ptr;
2166 }
2167 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2168 }
2169
2170 static size_t stringObjectLen(robj *o) {
2171 assert(o->type == REDIS_STRING);
2172 if (o->encoding == REDIS_ENCODING_RAW) {
2173 return sdslen(o->ptr);
2174 } else {
2175 char buf[32];
2176
2177 return snprintf(buf,32,"%ld",(long)o->ptr);
2178 }
2179 }
2180
2181 /*============================ DB saving/loading ============================ */
2182
2183 static int rdbSaveType(FILE *fp, unsigned char type) {
2184 if (fwrite(&type,1,1,fp) == 0) return -1;
2185 return 0;
2186 }
2187
2188 static int rdbSaveTime(FILE *fp, time_t t) {
2189 int32_t t32 = (int32_t) t;
2190 if (fwrite(&t32,4,1,fp) == 0) return -1;
2191 return 0;
2192 }
2193
2194 /* check rdbLoadLen() comments for more info */
2195 static int rdbSaveLen(FILE *fp, uint32_t len) {
2196 unsigned char buf[2];
2197
2198 if (len < (1<<6)) {
2199 /* Save a 6 bit len */
2200 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2201 if (fwrite(buf,1,1,fp) == 0) return -1;
2202 } else if (len < (1<<14)) {
2203 /* Save a 14 bit len */
2204 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2205 buf[1] = len&0xFF;
2206 if (fwrite(buf,2,1,fp) == 0) return -1;
2207 } else {
2208 /* Save a 32 bit len */
2209 buf[0] = (REDIS_RDB_32BITLEN<<6);
2210 if (fwrite(buf,1,1,fp) == 0) return -1;
2211 len = htonl(len);
2212 if (fwrite(&len,4,1,fp) == 0) return -1;
2213 }
2214 return 0;
2215 }
2216
2217 /* String objects in the form "2391" "-100" without any space and with a
2218 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2219 * encoded as integers to save space */
2220 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2221 long long value;
2222 char *endptr, buf[32];
2223
2224 /* Check if it's possible to encode this value as a number */
2225 value = strtoll(s, &endptr, 10);
2226 if (endptr[0] != '\0') return 0;
2227 snprintf(buf,32,"%lld",value);
2228
2229 /* If the number converted back into a string is not identical
2230 * then it's not possible to encode the string as integer */
2231 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2232
2233 /* Finally check if it fits in our ranges */
2234 if (value >= -(1<<7) && value <= (1<<7)-1) {
2235 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2236 enc[1] = value&0xFF;
2237 return 2;
2238 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2239 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2240 enc[1] = value&0xFF;
2241 enc[2] = (value>>8)&0xFF;
2242 return 3;
2243 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2244 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2245 enc[1] = value&0xFF;
2246 enc[2] = (value>>8)&0xFF;
2247 enc[3] = (value>>16)&0xFF;
2248 enc[4] = (value>>24)&0xFF;
2249 return 5;
2250 } else {
2251 return 0;
2252 }
2253 }
2254
2255 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2256 unsigned int comprlen, outlen;
2257 unsigned char byte;
2258 void *out;
2259
2260 /* We require at least four bytes compression for this to be worth it */
2261 outlen = sdslen(obj->ptr)-4;
2262 if (outlen <= 0) return 0;
2263 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2264 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2265 if (comprlen == 0) {
2266 zfree(out);
2267 return 0;
2268 }
2269 /* Data compressed! Let's save it on disk */
2270 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2271 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2272 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2273 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2274 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2275 zfree(out);
2276 return comprlen;
2277
2278 writeerr:
2279 zfree(out);
2280 return -1;
2281 }
2282
2283 /* Save a string objet as [len][data] on disk. If the object is a string
2284 * representation of an integer value we try to safe it in a special form */
2285 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2286 size_t len;
2287 int enclen;
2288
2289 len = sdslen(obj->ptr);
2290
2291 /* Try integer encoding */
2292 if (len <= 11) {
2293 unsigned char buf[5];
2294 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2295 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2296 return 0;
2297 }
2298 }
2299
2300 /* Try LZF compression - under 20 bytes it's unable to compress even
2301 * aaaaaaaaaaaaaaaaaa so skip it */
2302 if (len > 20) {
2303 int retval;
2304
2305 retval = rdbSaveLzfStringObject(fp,obj);
2306 if (retval == -1) return -1;
2307 if (retval > 0) return 0;
2308 /* retval == 0 means data can't be compressed, save the old way */
2309 }
2310
2311 /* Store verbatim */
2312 if (rdbSaveLen(fp,len) == -1) return -1;
2313 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2314 return 0;
2315 }
2316
2317 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2318 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2319 int retval;
2320 robj *dec;
2321
2322 if (obj->encoding != REDIS_ENCODING_RAW) {
2323 dec = getDecodedObject(obj);
2324 retval = rdbSaveStringObjectRaw(fp,dec);
2325 decrRefCount(dec);
2326 return retval;
2327 } else {
2328 return rdbSaveStringObjectRaw(fp,obj);
2329 }
2330 }
2331
2332 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
2333 * 8 bit integer specifing the length of the representation.
2334 * This 8 bit integer has special values in order to specify the following
2335 * conditions:
2336 * 253: not a number
2337 * 254: + inf
2338 * 255: - inf
2339 */
2340 static int rdbSaveDoubleValue(FILE *fp, double val) {
2341 unsigned char buf[128];
2342 int len;
2343
2344 if (isnan(val)) {
2345 buf[0] = 253;
2346 len = 1;
2347 } else if (!isfinite(val)) {
2348 len = 1;
2349 buf[0] = (val < 0) ? 255 : 254;
2350 } else {
2351 snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
2352 buf[0] = strlen((char*)buf);
2353 len = buf[0]+1;
2354 }
2355 if (fwrite(buf,len,1,fp) == 0) return -1;
2356 return 0;
2357 }
2358
2359 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2360 static int rdbSave(char *filename) {
2361 dictIterator *di = NULL;
2362 dictEntry *de;
2363 FILE *fp;
2364 char tmpfile[256];
2365 int j;
2366 time_t now = time(NULL);
2367
2368 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2369 fp = fopen(tmpfile,"w");
2370 if (!fp) {
2371 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2372 return REDIS_ERR;
2373 }
2374 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2375 for (j = 0; j < server.dbnum; j++) {
2376 redisDb *db = server.db+j;
2377 dict *d = db->dict;
2378 if (dictSize(d) == 0) continue;
2379 di = dictGetIterator(d);
2380 if (!di) {
2381 fclose(fp);
2382 return REDIS_ERR;
2383 }
2384
2385 /* Write the SELECT DB opcode */
2386 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2387 if (rdbSaveLen(fp,j) == -1) goto werr;
2388
2389 /* Iterate this DB writing every entry */
2390 while((de = dictNext(di)) != NULL) {
2391 robj *key = dictGetEntryKey(de);
2392 robj *o = dictGetEntryVal(de);
2393 time_t expiretime = getExpire(db,key);
2394
2395 /* Save the expire time */
2396 if (expiretime != -1) {
2397 /* If this key is already expired skip it */
2398 if (expiretime < now) continue;
2399 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2400 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2401 }
2402 /* Save the key and associated value */
2403 if (rdbSaveType(fp,o->type) == -1) goto werr;
2404 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2405 if (o->type == REDIS_STRING) {
2406 /* Save a string value */
2407 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2408 } else if (o->type == REDIS_LIST) {
2409 /* Save a list value */
2410 list *list = o->ptr;
2411 listNode *ln;
2412
2413 listRewind(list);
2414 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2415 while((ln = listYield(list))) {
2416 robj *eleobj = listNodeValue(ln);
2417
2418 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2419 }
2420 } else if (o->type == REDIS_SET) {
2421 /* Save a set value */
2422 dict *set = o->ptr;
2423 dictIterator *di = dictGetIterator(set);
2424 dictEntry *de;
2425
2426 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2427 while((de = dictNext(di)) != NULL) {
2428 robj *eleobj = dictGetEntryKey(de);
2429
2430 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2431 }
2432 dictReleaseIterator(di);
2433 } else if (o->type == REDIS_ZSET) {
2434 /* Save a set value */
2435 zset *zs = o->ptr;
2436 dictIterator *di = dictGetIterator(zs->dict);
2437 dictEntry *de;
2438
2439 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2440 while((de = dictNext(di)) != NULL) {
2441 robj *eleobj = dictGetEntryKey(de);
2442 double *score = dictGetEntryVal(de);
2443
2444 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2445 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2446 }
2447 dictReleaseIterator(di);
2448 } else {
2449 assert(0 != 0);
2450 }
2451 }
2452 dictReleaseIterator(di);
2453 }
2454 /* EOF opcode */
2455 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2456
2457 /* Make sure data will not remain on the OS's output buffers */
2458 fflush(fp);
2459 fsync(fileno(fp));
2460 fclose(fp);
2461
2462 /* Use RENAME to make sure the DB file is changed atomically only
2463 * if the generate DB file is ok. */
2464 if (rename(tmpfile,filename) == -1) {
2465 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2466 unlink(tmpfile);
2467 return REDIS_ERR;
2468 }
2469 redisLog(REDIS_NOTICE,"DB saved on disk");
2470 server.dirty = 0;
2471 server.lastsave = time(NULL);
2472 return REDIS_OK;
2473
2474 werr:
2475 fclose(fp);
2476 unlink(tmpfile);
2477 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2478 if (di) dictReleaseIterator(di);
2479 return REDIS_ERR;
2480 }
2481
2482 static int rdbSaveBackground(char *filename) {
2483 pid_t childpid;
2484
2485 if (server.bgsaveinprogress) return REDIS_ERR;
2486 if ((childpid = fork()) == 0) {
2487 /* Child */
2488 close(server.fd);
2489 if (rdbSave(filename) == REDIS_OK) {
2490 exit(0);
2491 } else {
2492 exit(1);
2493 }
2494 } else {
2495 /* Parent */
2496 if (childpid == -1) {
2497 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2498 strerror(errno));
2499 return REDIS_ERR;
2500 }
2501 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2502 server.bgsaveinprogress = 1;
2503 server.bgsavechildpid = childpid;
2504 return REDIS_OK;
2505 }
2506 return REDIS_OK; /* unreached */
2507 }
2508
2509 static void rdbRemoveTempFile(pid_t childpid) {
2510 char tmpfile[256];
2511
2512 snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
2513 unlink(tmpfile);
2514 }
2515
2516 static int rdbLoadType(FILE *fp) {
2517 unsigned char type;
2518 if (fread(&type,1,1,fp) == 0) return -1;
2519 return type;
2520 }
2521
2522 static time_t rdbLoadTime(FILE *fp) {
2523 int32_t t32;
2524 if (fread(&t32,4,1,fp) == 0) return -1;
2525 return (time_t) t32;
2526 }
2527
2528 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2529 * of this file for a description of how this are stored on disk.
2530 *
2531 * isencoded is set to 1 if the readed length is not actually a length but
2532 * an "encoding type", check the above comments for more info */
2533 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2534 unsigned char buf[2];
2535 uint32_t len;
2536
2537 if (isencoded) *isencoded = 0;
2538 if (rdbver == 0) {
2539 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2540 return ntohl(len);
2541 } else {
2542 int type;
2543
2544 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2545 type = (buf[0]&0xC0)>>6;
2546 if (type == REDIS_RDB_6BITLEN) {
2547 /* Read a 6 bit len */
2548 return buf[0]&0x3F;
2549 } else if (type == REDIS_RDB_ENCVAL) {
2550 /* Read a 6 bit len encoding type */
2551 if (isencoded) *isencoded = 1;
2552 return buf[0]&0x3F;
2553 } else if (type == REDIS_RDB_14BITLEN) {
2554 /* Read a 14 bit len */
2555 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2556 return ((buf[0]&0x3F)<<8)|buf[1];
2557 } else {
2558 /* Read a 32 bit len */
2559 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2560 return ntohl(len);
2561 }
2562 }
2563 }
2564
2565 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2566 unsigned char enc[4];
2567 long long val;
2568
2569 if (enctype == REDIS_RDB_ENC_INT8) {
2570 if (fread(enc,1,1,fp) == 0) return NULL;
2571 val = (signed char)enc[0];
2572 } else if (enctype == REDIS_RDB_ENC_INT16) {
2573 uint16_t v;
2574 if (fread(enc,2,1,fp) == 0) return NULL;
2575 v = enc[0]|(enc[1]<<8);
2576 val = (int16_t)v;
2577 } else if (enctype == REDIS_RDB_ENC_INT32) {
2578 uint32_t v;
2579 if (fread(enc,4,1,fp) == 0) return NULL;
2580 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2581 val = (int32_t)v;
2582 } else {
2583 val = 0; /* anti-warning */
2584 assert(0!=0);
2585 }
2586 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2587 }
2588
2589 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2590 unsigned int len, clen;
2591 unsigned char *c = NULL;
2592 sds val = NULL;
2593
2594 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2595 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2596 if ((c = zmalloc(clen)) == NULL) goto err;
2597 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2598 if (fread(c,clen,1,fp) == 0) goto err;
2599 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2600 zfree(c);
2601 return createObject(REDIS_STRING,val);
2602 err:
2603 zfree(c);
2604 sdsfree(val);
2605 return NULL;
2606 }
2607
2608 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2609 int isencoded;
2610 uint32_t len;
2611 sds val;
2612
2613 len = rdbLoadLen(fp,rdbver,&isencoded);
2614 if (isencoded) {
2615 switch(len) {
2616 case REDIS_RDB_ENC_INT8:
2617 case REDIS_RDB_ENC_INT16:
2618 case REDIS_RDB_ENC_INT32:
2619 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2620 case REDIS_RDB_ENC_LZF:
2621 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2622 default:
2623 assert(0!=0);
2624 }
2625 }
2626
2627 if (len == REDIS_RDB_LENERR) return NULL;
2628 val = sdsnewlen(NULL,len);
2629 if (len && fread(val,len,1,fp) == 0) {
2630 sdsfree(val);
2631 return NULL;
2632 }
2633 return tryObjectSharing(createObject(REDIS_STRING,val));
2634 }
2635
2636 /* For information about double serialization check rdbSaveDoubleValue() */
2637 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2638 char buf[128];
2639 unsigned char len;
2640
2641 if (fread(&len,1,1,fp) == 0) return -1;
2642 switch(len) {
2643 case 255: *val = R_NegInf; return 0;
2644 case 254: *val = R_PosInf; return 0;
2645 case 253: *val = R_Nan; return 0;
2646 default:
2647 if (fread(buf,len,1,fp) == 0) return -1;
2648 sscanf(buf, "%lg", val);
2649 return 0;
2650 }
2651 }
2652
2653 static int rdbLoad(char *filename) {
2654 FILE *fp;
2655 robj *keyobj = NULL;
2656 uint32_t dbid;
2657 int type, retval, rdbver;
2658 dict *d = server.db[0].dict;
2659 redisDb *db = server.db+0;
2660 char buf[1024];
2661 time_t expiretime = -1, now = time(NULL);
2662
2663 fp = fopen(filename,"r");
2664 if (!fp) return REDIS_ERR;
2665 if (fread(buf,9,1,fp) == 0) goto eoferr;
2666 buf[9] = '\0';
2667 if (memcmp(buf,"REDIS",5) != 0) {
2668 fclose(fp);
2669 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2670 return REDIS_ERR;
2671 }
2672 rdbver = atoi(buf+5);
2673 if (rdbver > 1) {
2674 fclose(fp);
2675 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2676 return REDIS_ERR;
2677 }
2678 while(1) {
2679 robj *o;
2680
2681 /* Read type. */
2682 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2683 if (type == REDIS_EXPIRETIME) {
2684 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2685 /* We read the time so we need to read the object type again */
2686 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2687 }
2688 if (type == REDIS_EOF) break;
2689 /* Handle SELECT DB opcode as a special case */
2690 if (type == REDIS_SELECTDB) {
2691 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2692 goto eoferr;
2693 if (dbid >= (unsigned)server.dbnum) {
2694 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2695 exit(1);
2696 }
2697 db = server.db+dbid;
2698 d = db->dict;
2699 continue;
2700 }
2701 /* Read key */
2702 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2703
2704 if (type == REDIS_STRING) {
2705 /* Read string value */
2706 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2707 tryObjectEncoding(o);
2708 } else if (type == REDIS_LIST || type == REDIS_SET) {
2709 /* Read list/set value */
2710 uint32_t listlen;
2711
2712 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2713 goto eoferr;
2714 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2715 /* Load every single element of the list/set */
2716 while(listlen--) {
2717 robj *ele;
2718
2719 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2720 tryObjectEncoding(ele);
2721 if (type == REDIS_LIST) {
2722 listAddNodeTail((list*)o->ptr,ele);
2723 } else {
2724 dictAdd((dict*)o->ptr,ele,NULL);
2725 }
2726 }
2727 } else if (type == REDIS_ZSET) {
2728 /* Read list/set value */
2729 uint32_t zsetlen;
2730 zset *zs;
2731
2732 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2733 goto eoferr;
2734 o = createZsetObject();
2735 zs = o->ptr;
2736 /* Load every single element of the list/set */
2737 while(zsetlen--) {
2738 robj *ele;
2739 double *score = zmalloc(sizeof(double));
2740
2741 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2742 tryObjectEncoding(ele);
2743 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2744 dictAdd(zs->dict,ele,score);
2745 zslInsert(zs->zsl,*score,ele);
2746 incrRefCount(ele); /* added to skiplist */
2747 }
2748 } else {
2749 assert(0 != 0);
2750 }
2751 /* Add the new object in the hash table */
2752 retval = dictAdd(d,keyobj,o);
2753 if (retval == DICT_ERR) {
2754 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2755 exit(1);
2756 }
2757 /* Set the expire time if needed */
2758 if (expiretime != -1) {
2759 setExpire(db,keyobj,expiretime);
2760 /* Delete this key if already expired */
2761 if (expiretime < now) deleteKey(db,keyobj);
2762 expiretime = -1;
2763 }
2764 keyobj = o = NULL;
2765 }
2766 fclose(fp);
2767 return REDIS_OK;
2768
2769 eoferr: /* unexpected end of file is handled here with a fatal exit */
2770 if (keyobj) decrRefCount(keyobj);
2771 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2772 exit(1);
2773 return REDIS_ERR; /* Just to avoid warning */
2774 }
2775
2776 /*================================== Commands =============================== */
2777
2778 static void authCommand(redisClient *c) {
2779 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2780 c->authenticated = 1;
2781 addReply(c,shared.ok);
2782 } else {
2783 c->authenticated = 0;
2784 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2785 }
2786 }
2787
2788 static void pingCommand(redisClient *c) {
2789 addReply(c,shared.pong);
2790 }
2791
2792 static void echoCommand(redisClient *c) {
2793 addReplyBulkLen(c,c->argv[1]);
2794 addReply(c,c->argv[1]);
2795 addReply(c,shared.crlf);
2796 }
2797
2798 /*=================================== Strings =============================== */
2799
2800 static void setGenericCommand(redisClient *c, int nx) {
2801 int retval;
2802
2803 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2804 if (retval == DICT_ERR) {
2805 if (!nx) {
2806 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2807 incrRefCount(c->argv[2]);
2808 } else {
2809 addReply(c,shared.czero);
2810 return;
2811 }
2812 } else {
2813 incrRefCount(c->argv[1]);
2814 incrRefCount(c->argv[2]);
2815 }
2816 server.dirty++;
2817 removeExpire(c->db,c->argv[1]);
2818 addReply(c, nx ? shared.cone : shared.ok);
2819 }
2820
2821 static void setCommand(redisClient *c) {
2822 setGenericCommand(c,0);
2823 }
2824
2825 static void setnxCommand(redisClient *c) {
2826 setGenericCommand(c,1);
2827 }
2828
2829 static void getCommand(redisClient *c) {
2830 robj *o = lookupKeyRead(c->db,c->argv[1]);
2831
2832 if (o == NULL) {
2833 addReply(c,shared.nullbulk);
2834 } else {
2835 if (o->type != REDIS_STRING) {
2836 addReply(c,shared.wrongtypeerr);
2837 } else {
2838 addReplyBulkLen(c,o);
2839 addReply(c,o);
2840 addReply(c,shared.crlf);
2841 }
2842 }
2843 }
2844
2845 static void getsetCommand(redisClient *c) {
2846 getCommand(c);
2847 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2848 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2849 } else {
2850 incrRefCount(c->argv[1]);
2851 }
2852 incrRefCount(c->argv[2]);
2853 server.dirty++;
2854 removeExpire(c->db,c->argv[1]);
2855 }
2856
2857 static void mgetCommand(redisClient *c) {
2858 int j;
2859
2860 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2861 for (j = 1; j < c->argc; j++) {
2862 robj *o = lookupKeyRead(c->db,c->argv[j]);
2863 if (o == NULL) {
2864 addReply(c,shared.nullbulk);
2865 } else {
2866 if (o->type != REDIS_STRING) {
2867 addReply(c,shared.nullbulk);
2868 } else {
2869 addReplyBulkLen(c,o);
2870 addReply(c,o);
2871 addReply(c,shared.crlf);
2872 }
2873 }
2874 }
2875 }
2876
2877 static void incrDecrCommand(redisClient *c, long long incr) {
2878 long long value;
2879 int retval;
2880 robj *o;
2881
2882 o = lookupKeyWrite(c->db,c->argv[1]);
2883 if (o == NULL) {
2884 value = 0;
2885 } else {
2886 if (o->type != REDIS_STRING) {
2887 value = 0;
2888 } else {
2889 char *eptr;
2890
2891 if (o->encoding == REDIS_ENCODING_RAW)
2892 value = strtoll(o->ptr, &eptr, 10);
2893 else if (o->encoding == REDIS_ENCODING_INT)
2894 value = (long)o->ptr;
2895 else
2896 assert(1 != 1);
2897 }
2898 }
2899
2900 value += incr;
2901 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2902 tryObjectEncoding(o);
2903 retval = dictAdd(c->db->dict,c->argv[1],o);
2904 if (retval == DICT_ERR) {
2905 dictReplace(c->db->dict,c->argv[1],o);
2906 removeExpire(c->db,c->argv[1]);
2907 } else {
2908 incrRefCount(c->argv[1]);
2909 }
2910 server.dirty++;
2911 addReply(c,shared.colon);
2912 addReply(c,o);
2913 addReply(c,shared.crlf);
2914 }
2915
2916 static void incrCommand(redisClient *c) {
2917 incrDecrCommand(c,1);
2918 }
2919
2920 static void decrCommand(redisClient *c) {
2921 incrDecrCommand(c,-1);
2922 }
2923
2924 static void incrbyCommand(redisClient *c) {
2925 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2926 incrDecrCommand(c,incr);
2927 }
2928
2929 static void decrbyCommand(redisClient *c) {
2930 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2931 incrDecrCommand(c,-incr);
2932 }
2933
2934 /* ========================= Type agnostic commands ========================= */
2935
2936 static void delCommand(redisClient *c) {
2937 int deleted = 0, j;
2938
2939 for (j = 1; j < c->argc; j++) {
2940 if (deleteKey(c->db,c->argv[j])) {
2941 server.dirty++;
2942 deleted++;
2943 }
2944 }
2945 switch(deleted) {
2946 case 0:
2947 addReply(c,shared.czero);
2948 break;
2949 case 1:
2950 addReply(c,shared.cone);
2951 break;
2952 default:
2953 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2954 break;
2955 }
2956 }
2957
2958 static void existsCommand(redisClient *c) {
2959 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2960 }
2961
2962 static void selectCommand(redisClient *c) {
2963 int id = atoi(c->argv[1]->ptr);
2964
2965 if (selectDb(c,id) == REDIS_ERR) {
2966 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2967 } else {
2968 addReply(c,shared.ok);
2969 }
2970 }
2971
2972 static void randomkeyCommand(redisClient *c) {
2973 dictEntry *de;
2974
2975 while(1) {
2976 de = dictGetRandomKey(c->db->dict);
2977 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2978 }
2979 if (de == NULL) {
2980 addReply(c,shared.plus);
2981 addReply(c,shared.crlf);
2982 } else {
2983 addReply(c,shared.plus);
2984 addReply(c,dictGetEntryKey(de));
2985 addReply(c,shared.crlf);
2986 }
2987 }
2988
2989 static void keysCommand(redisClient *c) {
2990 dictIterator *di;
2991 dictEntry *de;
2992 sds pattern = c->argv[1]->ptr;
2993 int plen = sdslen(pattern);
2994 int numkeys = 0, keyslen = 0;
2995 robj *lenobj = createObject(REDIS_STRING,NULL);
2996
2997 di = dictGetIterator(c->db->dict);
2998 addReply(c,lenobj);
2999 decrRefCount(lenobj);
3000 while((de = dictNext(di)) != NULL) {
3001 robj *keyobj = dictGetEntryKey(de);
3002
3003 sds key = keyobj->ptr;
3004 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3005 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3006 if (expireIfNeeded(c->db,keyobj) == 0) {
3007 if (numkeys != 0)
3008 addReply(c,shared.space);
3009 addReply(c,keyobj);
3010 numkeys++;
3011 keyslen += sdslen(key);
3012 }
3013 }
3014 }
3015 dictReleaseIterator(di);
3016 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3017 addReply(c,shared.crlf);
3018 }
3019
3020 static void dbsizeCommand(redisClient *c) {
3021 addReplySds(c,
3022 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3023 }
3024
3025 static void lastsaveCommand(redisClient *c) {
3026 addReplySds(c,
3027 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3028 }
3029
3030 static void typeCommand(redisClient *c) {
3031 robj *o;
3032 char *type;
3033
3034 o = lookupKeyRead(c->db,c->argv[1]);
3035 if (o == NULL) {
3036 type = "+none";
3037 } else {
3038 switch(o->type) {
3039 case REDIS_STRING: type = "+string"; break;
3040 case REDIS_LIST: type = "+list"; break;
3041 case REDIS_SET: type = "+set"; break;
3042 case REDIS_ZSET: type = "+zset"; break;
3043 default: type = "unknown"; break;
3044 }
3045 }
3046 addReplySds(c,sdsnew(type));
3047 addReply(c,shared.crlf);
3048 }
3049
3050 static void saveCommand(redisClient *c) {
3051 if (server.bgsaveinprogress) {
3052 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3053 return;
3054 }
3055 if (rdbSave(server.dbfilename) == REDIS_OK) {
3056 addReply(c,shared.ok);
3057 } else {
3058 addReply(c,shared.err);
3059 }
3060 }
3061
3062 static void bgsaveCommand(redisClient *c) {
3063 if (server.bgsaveinprogress) {
3064 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3065 return;
3066 }
3067 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3068 addReply(c,shared.ok);
3069 } else {
3070 addReply(c,shared.err);
3071 }
3072 }
3073
3074 static void shutdownCommand(redisClient *c) {
3075 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3076 /* Kill the saving child if there is a background saving in progress.
3077 We want to avoid race conditions, for instance our saving child may
3078 overwrite the synchronous saving did by SHUTDOWN. */
3079 if (server.bgsaveinprogress) {
3080 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3081 kill(server.bgsavechildpid,SIGKILL);
3082 rdbRemoveTempFile(server.bgsavechildpid);
3083 }
3084 /* SYNC SAVE */
3085 if (rdbSave(server.dbfilename) == REDIS_OK) {
3086 if (server.daemonize)
3087 unlink(server.pidfile);
3088 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3089 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3090 exit(1);
3091 } else {
3092 /* Ooops.. error saving! The best we can do is to continue operating.
3093 * Note that if there was a background saving process, in the next
3094 * cron() Redis will be notified that the background saving aborted,
3095 * handling special stuff like slaves pending for synchronization... */
3096 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3097 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3098 }
3099 }
3100
3101 static void renameGenericCommand(redisClient *c, int nx) {
3102 robj *o;
3103
3104 /* To use the same key as src and dst is probably an error */
3105 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3106 addReply(c,shared.sameobjecterr);
3107 return;
3108 }
3109
3110 o = lookupKeyWrite(c->db,c->argv[1]);
3111 if (o == NULL) {
3112 addReply(c,shared.nokeyerr);
3113 return;
3114 }
3115 incrRefCount(o);
3116 deleteIfVolatile(c->db,c->argv[2]);
3117 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3118 if (nx) {
3119 decrRefCount(o);
3120 addReply(c,shared.czero);
3121 return;
3122 }
3123 dictReplace(c->db->dict,c->argv[2],o);
3124 } else {
3125 incrRefCount(c->argv[2]);
3126 }
3127 deleteKey(c->db,c->argv[1]);
3128 server.dirty++;
3129 addReply(c,nx ? shared.cone : shared.ok);
3130 }
3131
3132 static void renameCommand(redisClient *c) {
3133 renameGenericCommand(c,0);
3134 }
3135
3136 static void renamenxCommand(redisClient *c) {
3137 renameGenericCommand(c,1);
3138 }
3139
3140 static void moveCommand(redisClient *c) {
3141 robj *o;
3142 redisDb *src, *dst;
3143 int srcid;
3144
3145 /* Obtain source and target DB pointers */
3146 src = c->db;
3147 srcid = c->db->id;
3148 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3149 addReply(c,shared.outofrangeerr);
3150 return;
3151 }
3152 dst = c->db;
3153 selectDb(c,srcid); /* Back to the source DB */
3154
3155 /* If the user is moving using as target the same
3156 * DB as the source DB it is probably an error. */
3157 if (src == dst) {
3158 addReply(c,shared.sameobjecterr);
3159 return;
3160 }
3161
3162 /* Check if the element exists and get a reference */
3163 o = lookupKeyWrite(c->db,c->argv[1]);
3164 if (!o) {
3165 addReply(c,shared.czero);
3166 return;
3167 }
3168
3169 /* Try to add the element to the target DB */
3170 deleteIfVolatile(dst,c->argv[1]);
3171 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3172 addReply(c,shared.czero);
3173 return;
3174 }
3175 incrRefCount(c->argv[1]);
3176 incrRefCount(o);
3177
3178 /* OK! key moved, free the entry in the source DB */
3179 deleteKey(src,c->argv[1]);
3180 server.dirty++;
3181 addReply(c,shared.cone);
3182 }
3183
3184 /* =================================== Lists ================================ */
3185 static void pushGenericCommand(redisClient *c, int where) {
3186 robj *lobj;
3187 list *list;
3188
3189 lobj = lookupKeyWrite(c->db,c->argv[1]);
3190 if (lobj == NULL) {
3191 lobj = createListObject();
3192 list = lobj->ptr;
3193 if (where == REDIS_HEAD) {
3194 listAddNodeHead(list,c->argv[2]);
3195 } else {
3196 listAddNodeTail(list,c->argv[2]);
3197 }
3198 dictAdd(c->db->dict,c->argv[1],lobj);
3199 incrRefCount(c->argv[1]);
3200 incrRefCount(c->argv[2]);
3201 } else {
3202 if (lobj->type != REDIS_LIST) {
3203 addReply(c,shared.wrongtypeerr);
3204 return;
3205 }
3206 list = lobj->ptr;
3207 if (where == REDIS_HEAD) {
3208 listAddNodeHead(list,c->argv[2]);
3209 } else {
3210 listAddNodeTail(list,c->argv[2]);
3211 }
3212 incrRefCount(c->argv[2]);
3213 }
3214 server.dirty++;
3215 addReply(c,shared.ok);
3216 }
3217
3218 static void lpushCommand(redisClient *c) {
3219 pushGenericCommand(c,REDIS_HEAD);
3220 }
3221
3222 static void rpushCommand(redisClient *c) {
3223 pushGenericCommand(c,REDIS_TAIL);
3224 }
3225
3226 static void llenCommand(redisClient *c) {
3227 robj *o;
3228 list *l;
3229
3230 o = lookupKeyRead(c->db,c->argv[1]);
3231 if (o == NULL) {
3232 addReply(c,shared.czero);
3233 return;
3234 } else {
3235 if (o->type != REDIS_LIST) {
3236 addReply(c,shared.wrongtypeerr);
3237 } else {
3238 l = o->ptr;
3239 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3240 }
3241 }
3242 }
3243
3244 static void lindexCommand(redisClient *c) {
3245 robj *o;
3246 int index = atoi(c->argv[2]->ptr);
3247
3248 o = lookupKeyRead(c->db,c->argv[1]);
3249 if (o == NULL) {
3250 addReply(c,shared.nullbulk);
3251 } else {
3252 if (o->type != REDIS_LIST) {
3253 addReply(c,shared.wrongtypeerr);
3254 } else {
3255 list *list = o->ptr;
3256 listNode *ln;
3257
3258 ln = listIndex(list, index);
3259 if (ln == NULL) {
3260 addReply(c,shared.nullbulk);
3261 } else {
3262 robj *ele = listNodeValue(ln);
3263 addReplyBulkLen(c,ele);
3264 addReply(c,ele);
3265 addReply(c,shared.crlf);
3266 }
3267 }
3268 }
3269 }
3270
3271 static void lsetCommand(redisClient *c) {
3272 robj *o;
3273 int index = atoi(c->argv[2]->ptr);
3274
3275 o = lookupKeyWrite(c->db,c->argv[1]);
3276 if (o == NULL) {
3277 addReply(c,shared.nokeyerr);
3278 } else {
3279 if (o->type != REDIS_LIST) {
3280 addReply(c,shared.wrongtypeerr);
3281 } else {
3282 list *list = o->ptr;
3283 listNode *ln;
3284
3285 ln = listIndex(list, index);
3286 if (ln == NULL) {
3287 addReply(c,shared.outofrangeerr);
3288 } else {
3289 robj *ele = listNodeValue(ln);
3290
3291 decrRefCount(ele);
3292 listNodeValue(ln) = c->argv[3];
3293 incrRefCount(c->argv[3]);
3294 addReply(c,shared.ok);
3295 server.dirty++;
3296 }
3297 }
3298 }
3299 }
3300
3301 static void popGenericCommand(redisClient *c, int where) {
3302 robj *o;
3303
3304 o = lookupKeyWrite(c->db,c->argv[1]);
3305 if (o == NULL) {
3306 addReply(c,shared.nullbulk);
3307 } else {
3308 if (o->type != REDIS_LIST) {
3309 addReply(c,shared.wrongtypeerr);
3310 } else {
3311 list *list = o->ptr;
3312 listNode *ln;
3313
3314 if (where == REDIS_HEAD)
3315 ln = listFirst(list);
3316 else
3317 ln = listLast(list);
3318
3319 if (ln == NULL) {
3320 addReply(c,shared.nullbulk);
3321 } else {
3322 robj *ele = listNodeValue(ln);
3323 addReplyBulkLen(c,ele);
3324 addReply(c,ele);
3325 addReply(c,shared.crlf);
3326 listDelNode(list,ln);
3327 server.dirty++;
3328 }
3329 }
3330 }
3331 }
3332
3333 static void lpopCommand(redisClient *c) {
3334 popGenericCommand(c,REDIS_HEAD);
3335 }
3336
3337 static void rpopCommand(redisClient *c) {
3338 popGenericCommand(c,REDIS_TAIL);
3339 }
3340
3341 static void lrangeCommand(redisClient *c) {
3342 robj *o;
3343 int start = atoi(c->argv[2]->ptr);
3344 int end = atoi(c->argv[3]->ptr);
3345
3346 o = lookupKeyRead(c->db,c->argv[1]);
3347 if (o == NULL) {
3348 addReply(c,shared.nullmultibulk);
3349 } else {
3350 if (o->type != REDIS_LIST) {
3351 addReply(c,shared.wrongtypeerr);
3352 } else {
3353 list *list = o->ptr;
3354 listNode *ln;
3355 int llen = listLength(list);
3356 int rangelen, j;
3357 robj *ele;
3358
3359 /* convert negative indexes */
3360 if (start < 0) start = llen+start;
3361 if (end < 0) end = llen+end;
3362 if (start < 0) start = 0;
3363 if (end < 0) end = 0;
3364
3365 /* indexes sanity checks */
3366 if (start > end || start >= llen) {
3367 /* Out of range start or start > end result in empty list */
3368 addReply(c,shared.emptymultibulk);
3369 return;
3370 }
3371 if (end >= llen) end = llen-1;
3372 rangelen = (end-start)+1;
3373
3374 /* Return the result in form of a multi-bulk reply */
3375 ln = listIndex(list, start);
3376 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3377 for (j = 0; j < rangelen; j++) {
3378 ele = listNodeValue(ln);
3379 addReplyBulkLen(c,ele);
3380 addReply(c,ele);
3381 addReply(c,shared.crlf);
3382 ln = ln->next;
3383 }
3384 }
3385 }
3386 }
3387
3388 static void ltrimCommand(redisClient *c) {
3389 robj *o;
3390 int start = atoi(c->argv[2]->ptr);
3391 int end = atoi(c->argv[3]->ptr);
3392
3393 o = lookupKeyWrite(c->db,c->argv[1]);
3394 if (o == NULL) {
3395 addReply(c,shared.nokeyerr);
3396 } else {
3397 if (o->type != REDIS_LIST) {
3398 addReply(c,shared.wrongtypeerr);
3399 } else {
3400 list *list = o->ptr;
3401 listNode *ln;
3402 int llen = listLength(list);
3403 int j, ltrim, rtrim;
3404
3405 /* convert negative indexes */
3406 if (start < 0) start = llen+start;
3407 if (end < 0) end = llen+end;
3408 if (start < 0) start = 0;
3409 if (end < 0) end = 0;
3410
3411 /* indexes sanity checks */
3412 if (start > end || start >= llen) {
3413 /* Out of range start or start > end result in empty list */
3414 ltrim = llen;
3415 rtrim = 0;
3416 } else {
3417 if (end >= llen) end = llen-1;
3418 ltrim = start;
3419 rtrim = llen-end-1;
3420 }
3421
3422 /* Remove list elements to perform the trim */
3423 for (j = 0; j < ltrim; j++) {
3424 ln = listFirst(list);
3425 listDelNode(list,ln);
3426 }
3427 for (j = 0; j < rtrim; j++) {
3428 ln = listLast(list);
3429 listDelNode(list,ln);
3430 }
3431 server.dirty++;
3432 addReply(c,shared.ok);
3433 }
3434 }
3435 }
3436
3437 static void lremCommand(redisClient *c) {
3438 robj *o;
3439
3440 o = lookupKeyWrite(c->db,c->argv[1]);
3441 if (o == NULL) {
3442 addReply(c,shared.czero);
3443 } else {
3444 if (o->type != REDIS_LIST) {
3445 addReply(c,shared.wrongtypeerr);
3446 } else {
3447 list *list = o->ptr;
3448 listNode *ln, *next;
3449 int toremove = atoi(c->argv[2]->ptr);
3450 int removed = 0;
3451 int fromtail = 0;
3452
3453 if (toremove < 0) {
3454 toremove = -toremove;
3455 fromtail = 1;
3456 }
3457 ln = fromtail ? list->tail : list->head;
3458 while (ln) {
3459 robj *ele = listNodeValue(ln);
3460
3461 next = fromtail ? ln->prev : ln->next;
3462 if (compareStringObjects(ele,c->argv[3]) == 0) {
3463 listDelNode(list,ln);
3464 server.dirty++;
3465 removed++;
3466 if (toremove && removed == toremove) break;
3467 }
3468 ln = next;
3469 }
3470 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3471 }
3472 }
3473 }
3474
3475 /* This is the semantic of this command:
3476 * LPOPPUSH srclist dstlist:
3477 * IF LLEN(srclist) > 0
3478 * element = RPOP srclist
3479 * LPUSH dstlist element
3480 * RETURN element
3481 * ELSE
3482 * RETURN nil
3483 * END
3484 * END
3485 *
3486 * The idea is to be able to get an element from a list in a reliable way
3487 * since the element is not just returned but pushed against another list
3488 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3489 */
3490 static void lpoppushCommand(redisClient *c) {
3491 robj *sobj;
3492
3493 sobj = lookupKeyWrite(c->db,c->argv[1]);
3494 if (sobj == NULL) {
3495 addReply(c,shared.nullbulk);
3496 } else {
3497 if (sobj->type != REDIS_LIST) {
3498 addReply(c,shared.wrongtypeerr);
3499 } else {
3500 list *srclist = sobj->ptr;
3501 listNode *ln = listLast(srclist);
3502
3503 if (ln == NULL) {
3504 addReply(c,shared.nullbulk);
3505 } else {
3506 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3507 robj *ele = listNodeValue(ln);
3508 list *dstlist;
3509
3510 if (dobj == NULL) {
3511
3512 /* Create the list if the key does not exist */
3513 dobj = createListObject();
3514 dictAdd(c->db->dict,c->argv[2],dobj);
3515 incrRefCount(c->argv[2]);
3516 } else if (dobj->type != REDIS_LIST) {
3517 addReply(c,shared.wrongtypeerr);
3518 return;
3519 }
3520 /* Add the element to the target list */
3521 dstlist = dobj->ptr;
3522 listAddNodeHead(dstlist,ele);
3523 incrRefCount(ele);
3524
3525 /* Send the element to the client as reply as well */
3526 addReplyBulkLen(c,ele);
3527 addReply(c,ele);
3528 addReply(c,shared.crlf);
3529
3530 /* Finally remove the element from the source list */
3531 listDelNode(srclist,ln);
3532 server.dirty++;
3533 }
3534 }
3535 }
3536 }
3537
3538
3539 /* ==================================== Sets ================================ */
3540
3541 static void saddCommand(redisClient *c) {
3542 robj *set;
3543
3544 set = lookupKeyWrite(c->db,c->argv[1]);
3545 if (set == NULL) {
3546 set = createSetObject();
3547 dictAdd(c->db->dict,c->argv[1],set);
3548 incrRefCount(c->argv[1]);
3549 } else {
3550 if (set->type != REDIS_SET) {
3551 addReply(c,shared.wrongtypeerr);
3552 return;
3553 }
3554 }
3555 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3556 incrRefCount(c->argv[2]);
3557 server.dirty++;
3558 addReply(c,shared.cone);
3559 } else {
3560 addReply(c,shared.czero);
3561 }
3562 }
3563
3564 static void sremCommand(redisClient *c) {
3565 robj *set;
3566
3567 set = lookupKeyWrite(c->db,c->argv[1]);
3568 if (set == NULL) {
3569 addReply(c,shared.czero);
3570 } else {
3571 if (set->type != REDIS_SET) {
3572 addReply(c,shared.wrongtypeerr);
3573 return;
3574 }
3575 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3576 server.dirty++;
3577 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3578 addReply(c,shared.cone);
3579 } else {
3580 addReply(c,shared.czero);
3581 }
3582 }
3583 }
3584
3585 static void smoveCommand(redisClient *c) {
3586 robj *srcset, *dstset;
3587
3588 srcset = lookupKeyWrite(c->db,c->argv[1]);
3589 dstset = lookupKeyWrite(c->db,c->argv[2]);
3590
3591 /* If the source key does not exist return 0, if it's of the wrong type
3592 * raise an error */
3593 if (srcset == NULL || srcset->type != REDIS_SET) {
3594 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3595 return;
3596 }
3597 /* Error if the destination key is not a set as well */
3598 if (dstset && dstset->type != REDIS_SET) {
3599 addReply(c,shared.wrongtypeerr);
3600 return;
3601 }
3602 /* Remove the element from the source set */
3603 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3604 /* Key not found in the src set! return zero */
3605 addReply(c,shared.czero);
3606 return;
3607 }
3608 server.dirty++;
3609 /* Add the element to the destination set */
3610 if (!dstset) {
3611 dstset = createSetObject();
3612 dictAdd(c->db->dict,c->argv[2],dstset);
3613 incrRefCount(c->argv[2]);
3614 }
3615 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3616 incrRefCount(c->argv[3]);
3617 addReply(c,shared.cone);
3618 }
3619
3620 static void sismemberCommand(redisClient *c) {
3621 robj *set;
3622
3623 set = lookupKeyRead(c->db,c->argv[1]);
3624 if (set == NULL) {
3625 addReply(c,shared.czero);
3626 } else {
3627 if (set->type != REDIS_SET) {
3628 addReply(c,shared.wrongtypeerr);
3629 return;
3630 }
3631 if (dictFind(set->ptr,c->argv[2]))
3632 addReply(c,shared.cone);
3633 else
3634 addReply(c,shared.czero);
3635 }
3636 }
3637
3638 static void scardCommand(redisClient *c) {
3639 robj *o;
3640 dict *s;
3641
3642 o = lookupKeyRead(c->db,c->argv[1]);
3643 if (o == NULL) {
3644 addReply(c,shared.czero);
3645 return;
3646 } else {
3647 if (o->type != REDIS_SET) {
3648 addReply(c,shared.wrongtypeerr);
3649 } else {
3650 s = o->ptr;
3651 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3652 dictSize(s)));
3653 }
3654 }
3655 }
3656
3657 static void spopCommand(redisClient *c) {
3658 robj *set;
3659 dictEntry *de;
3660
3661 set = lookupKeyWrite(c->db,c->argv[1]);
3662 if (set == NULL) {
3663 addReply(c,shared.nullbulk);
3664 } else {
3665 if (set->type != REDIS_SET) {
3666 addReply(c,shared.wrongtypeerr);
3667 return;
3668 }
3669 de = dictGetRandomKey(set->ptr);
3670 if (de == NULL) {
3671 addReply(c,shared.nullbulk);
3672 } else {
3673 robj *ele = dictGetEntryKey(de);
3674
3675 addReplyBulkLen(c,ele);
3676 addReply(c,ele);
3677 addReply(c,shared.crlf);
3678 dictDelete(set->ptr,ele);
3679 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3680 server.dirty++;
3681 }
3682 }
3683 }
3684
3685 static void srandmemberCommand(redisClient *c) {
3686 robj *set;
3687 dictEntry *de;
3688
3689 set = lookupKeyRead(c->db,c->argv[1]);
3690 if (set == NULL) {
3691 addReply(c,shared.nullbulk);
3692 } else {
3693 if (set->type != REDIS_SET) {
3694 addReply(c,shared.wrongtypeerr);
3695 return;
3696 }
3697 de = dictGetRandomKey(set->ptr);
3698 if (de == NULL) {
3699 addReply(c,shared.nullbulk);
3700 } else {
3701 robj *ele = dictGetEntryKey(de);
3702
3703 addReplyBulkLen(c,ele);
3704 addReply(c,ele);
3705 addReply(c,shared.crlf);
3706 }
3707 }
3708 }
3709
3710 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3711 dict **d1 = (void*) s1, **d2 = (void*) s2;
3712
3713 return dictSize(*d1)-dictSize(*d2);
3714 }
3715
3716 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3717 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3718 dictIterator *di;
3719 dictEntry *de;
3720 robj *lenobj = NULL, *dstset = NULL;
3721 int j, cardinality = 0;
3722
3723 for (j = 0; j < setsnum; j++) {
3724 robj *setobj;
3725
3726 setobj = dstkey ?
3727 lookupKeyWrite(c->db,setskeys[j]) :
3728 lookupKeyRead(c->db,setskeys[j]);
3729 if (!setobj) {
3730 zfree(dv);
3731 if (dstkey) {
3732 deleteKey(c->db,dstkey);
3733 addReply(c,shared.ok);
3734 } else {
3735 addReply(c,shared.nullmultibulk);
3736 }
3737 return;
3738 }
3739 if (setobj->type != REDIS_SET) {
3740 zfree(dv);
3741 addReply(c,shared.wrongtypeerr);
3742 return;
3743 }
3744 dv[j] = setobj->ptr;
3745 }
3746 /* Sort sets from the smallest to largest, this will improve our
3747 * algorithm's performace */
3748 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3749
3750 /* The first thing we should output is the total number of elements...
3751 * since this is a multi-bulk write, but at this stage we don't know
3752 * the intersection set size, so we use a trick, append an empty object
3753 * to the output list and save the pointer to later modify it with the
3754 * right length */
3755 if (!dstkey) {
3756 lenobj = createObject(REDIS_STRING,NULL);
3757 addReply(c,lenobj);
3758 decrRefCount(lenobj);
3759 } else {
3760 /* If we have a target key where to store the resulting set
3761 * create this key with an empty set inside */
3762 dstset = createSetObject();
3763 }
3764
3765 /* Iterate all the elements of the first (smallest) set, and test
3766 * the element against all the other sets, if at least one set does
3767 * not include the element it is discarded */
3768 di = dictGetIterator(dv[0]);
3769
3770 while((de = dictNext(di)) != NULL) {
3771 robj *ele;
3772
3773 for (j = 1; j < setsnum; j++)
3774 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3775 if (j != setsnum)
3776 continue; /* at least one set does not contain the member */
3777 ele = dictGetEntryKey(de);
3778 if (!dstkey) {
3779 addReplyBulkLen(c,ele);
3780 addReply(c,ele);
3781 addReply(c,shared.crlf);
3782 cardinality++;
3783 } else {
3784 dictAdd(dstset->ptr,ele,NULL);
3785 incrRefCount(ele);
3786 }
3787 }
3788 dictReleaseIterator(di);
3789
3790 if (dstkey) {
3791 /* Store the resulting set into the target */
3792 deleteKey(c->db,dstkey);
3793 dictAdd(c->db->dict,dstkey,dstset);
3794 incrRefCount(dstkey);
3795 }
3796
3797 if (!dstkey) {
3798 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3799 } else {
3800 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3801 dictSize((dict*)dstset->ptr)));
3802 server.dirty++;
3803 }
3804 zfree(dv);
3805 }
3806
3807 static void sinterCommand(redisClient *c) {
3808 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3809 }
3810
3811 static void sinterstoreCommand(redisClient *c) {
3812 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3813 }
3814
3815 #define REDIS_OP_UNION 0
3816 #define REDIS_OP_DIFF 1
3817
3818 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3819 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3820 dictIterator *di;
3821 dictEntry *de;
3822 robj *dstset = NULL;
3823 int j, cardinality = 0;
3824
3825 for (j = 0; j < setsnum; j++) {
3826 robj *setobj;
3827
3828 setobj = dstkey ?
3829 lookupKeyWrite(c->db,setskeys[j]) :
3830 lookupKeyRead(c->db,setskeys[j]);
3831 if (!setobj) {
3832 dv[j] = NULL;
3833 continue;
3834 }
3835 if (setobj->type != REDIS_SET) {
3836 zfree(dv);
3837 addReply(c,shared.wrongtypeerr);
3838 return;
3839 }
3840 dv[j] = setobj->ptr;
3841 }
3842
3843 /* We need a temp set object to store our union. If the dstkey
3844 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3845 * this set object will be the resulting object to set into the target key*/
3846 dstset = createSetObject();
3847
3848 /* Iterate all the elements of all the sets, add every element a single
3849 * time to the result set */
3850 for (j = 0; j < setsnum; j++) {
3851 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3852 if (!dv[j]) continue; /* non existing keys are like empty sets */
3853
3854 di = dictGetIterator(dv[j]);
3855
3856 while((de = dictNext(di)) != NULL) {
3857 robj *ele;
3858
3859 /* dictAdd will not add the same element multiple times */
3860 ele = dictGetEntryKey(de);
3861 if (op == REDIS_OP_UNION || j == 0) {
3862 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3863 incrRefCount(ele);
3864 cardinality++;
3865 }
3866 } else if (op == REDIS_OP_DIFF) {
3867 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3868 cardinality--;
3869 }
3870 }
3871 }
3872 dictReleaseIterator(di);
3873
3874 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3875 }
3876
3877 /* Output the content of the resulting set, if not in STORE mode */
3878 if (!dstkey) {
3879 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3880 di = dictGetIterator(dstset->ptr);
3881 while((de = dictNext(di)) != NULL) {
3882 robj *ele;
3883
3884 ele = dictGetEntryKey(de);
3885 addReplyBulkLen(c,ele);
3886 addReply(c,ele);
3887 addReply(c,shared.crlf);
3888 }
3889 dictReleaseIterator(di);
3890 } else {
3891 /* If we have a target key where to store the resulting set
3892 * create this key with the result set inside */
3893 deleteKey(c->db,dstkey);
3894 dictAdd(c->db->dict,dstkey,dstset);
3895 incrRefCount(dstkey);
3896 }
3897
3898 /* Cleanup */
3899 if (!dstkey) {
3900 decrRefCount(dstset);
3901 } else {
3902 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3903 dictSize((dict*)dstset->ptr)));
3904 server.dirty++;
3905 }
3906 zfree(dv);
3907 }
3908
3909 static void sunionCommand(redisClient *c) {
3910 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3911 }
3912
3913 static void sunionstoreCommand(redisClient *c) {
3914 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3915 }
3916
3917 static void sdiffCommand(redisClient *c) {
3918 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3919 }
3920
3921 static void sdiffstoreCommand(redisClient *c) {
3922 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3923 }
3924
3925 /* ==================================== ZSets =============================== */
3926
3927 /* ZSETs are ordered sets using two data structures to hold the same elements
3928 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3929 * data structure.
3930 *
3931 * The elements are added to an hash table mapping Redis objects to scores.
3932 * At the same time the elements are added to a skip list mapping scores
3933 * to Redis objects (so objects are sorted by scores in this "view"). */
3934
3935 /* This skiplist implementation is almost a C translation of the original
3936 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3937 * Alternative to Balanced Trees", modified in three ways:
3938 * a) this implementation allows for repeated values.
3939 * b) the comparison is not just by key (our 'score') but by satellite data.
3940 * c) there is a back pointer, so it's a doubly linked list with the back
3941 * pointers being only at "level 1". This allows to traverse the list
3942 * from tail to head, useful for ZREVRANGE. */
3943
3944 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3945 zskiplistNode *zn = zmalloc(sizeof(*zn));
3946
3947 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3948 zn->score = score;
3949 zn->obj = obj;
3950 return zn;
3951 }
3952
3953 static zskiplist *zslCreate(void) {
3954 int j;
3955 zskiplist *zsl;
3956
3957 zsl = zmalloc(sizeof(*zsl));
3958 zsl->level = 1;
3959 zsl->length = 0;
3960 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3961 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3962 zsl->header->forward[j] = NULL;
3963 zsl->header->backward = NULL;
3964 zsl->tail = NULL;
3965 return zsl;
3966 }
3967
3968 static void zslFreeNode(zskiplistNode *node) {
3969 decrRefCount(node->obj);
3970 zfree(node->forward);
3971 zfree(node);
3972 }
3973
3974 static void zslFree(zskiplist *zsl) {
3975 zskiplistNode *node = zsl->header->forward[0], *next;
3976
3977 zfree(zsl->header->forward);
3978 zfree(zsl->header);
3979 while(node) {
3980 next = node->forward[0];
3981 zslFreeNode(node);
3982 node = next;
3983 }
3984 zfree(zsl);
3985 }
3986
3987 static int zslRandomLevel(void) {
3988 int level = 1;
3989 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3990 level += 1;
3991 return level;
3992 }
3993
3994 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3995 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3996 int i, level;
3997
3998 x = zsl->header;
3999 for (i = zsl->level-1; i >= 0; i--) {
4000 while (x->forward[i] &&
4001 (x->forward[i]->score < score ||
4002 (x->forward[i]->score == score &&
4003 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4004 x = x->forward[i];
4005 update[i] = x;
4006 }
4007 /* we assume the key is not already inside, since we allow duplicated
4008 * scores, and the re-insertion of score and redis object should never
4009 * happpen since the caller of zslInsert() should test in the hash table
4010 * if the element is already inside or not. */
4011 level = zslRandomLevel();
4012 if (level > zsl->level) {
4013 for (i = zsl->level; i < level; i++)
4014 update[i] = zsl->header;
4015 zsl->level = level;
4016 }
4017 x = zslCreateNode(level,score,obj);
4018 for (i = 0; i < level; i++) {
4019 x->forward[i] = update[i]->forward[i];
4020 update[i]->forward[i] = x;
4021 }
4022 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4023 if (x->forward[0])
4024 x->forward[0]->backward = x;
4025 else
4026 zsl->tail = x;
4027 zsl->length++;
4028 }
4029
4030 /* Delete an element with matching score/object from the skiplist. */
4031 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4032 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4033 int i;
4034
4035 x = zsl->header;
4036 for (i = zsl->level-1; i >= 0; i--) {
4037 while (x->forward[i] &&
4038 (x->forward[i]->score < score ||
4039 (x->forward[i]->score == score &&
4040 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4041 x = x->forward[i];
4042 update[i] = x;
4043 }
4044 /* We may have multiple elements with the same score, what we need
4045 * is to find the element with both the right score and object. */
4046 x = x->forward[0];
4047 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4048 for (i = 0; i < zsl->level; i++) {
4049 if (update[i]->forward[i] != x) break;
4050 update[i]->forward[i] = x->forward[i];
4051 }
4052 if (x->forward[0]) {
4053 x->forward[0]->backward = (x->backward == zsl->header) ?
4054 NULL : x->backward;
4055 } else {
4056 zsl->tail = x->backward;
4057 }
4058 zslFreeNode(x);
4059 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4060 zsl->level--;
4061 zsl->length--;
4062 return 1;
4063 } else {
4064 return 0; /* not found */
4065 }
4066 return 0; /* not found */
4067 }
4068
4069 /* Delete all the elements with score between min and max from the skiplist.
4070 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4071 * Note that this function takes the reference to the hash table view of the
4072 * sorted set, in order to remove the elements from the hash table too. */
4073 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4074 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4075 unsigned long removed = 0;
4076 int i;
4077
4078 x = zsl->header;
4079 for (i = zsl->level-1; i >= 0; i--) {
4080 while (x->forward[i] && x->forward[i]->score < min)
4081 x = x->forward[i];
4082 update[i] = x;
4083 }
4084 /* We may have multiple elements with the same score, what we need
4085 * is to find the element with both the right score and object. */
4086 x = x->forward[0];
4087 while (x && x->score <= max) {
4088 zskiplistNode *next;
4089
4090 for (i = 0; i < zsl->level; i++) {
4091 if (update[i]->forward[i] != x) break;
4092 update[i]->forward[i] = x->forward[i];
4093 }
4094 if (x->forward[0]) {
4095 x->forward[0]->backward = (x->backward == zsl->header) ?
4096 NULL : x->backward;
4097 } else {
4098 zsl->tail = x->backward;
4099 }
4100 next = x->forward[0];
4101 dictDelete(dict,x->obj);
4102 zslFreeNode(x);
4103 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4104 zsl->level--;
4105 zsl->length--;
4106 removed++;
4107 x = next;
4108 }
4109 return removed; /* not found */
4110 }
4111
4112 /* Find the first node having a score equal or greater than the specified one.
4113 * Returns NULL if there is no match. */
4114 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4115 zskiplistNode *x;
4116 int i;
4117
4118 x = zsl->header;
4119 for (i = zsl->level-1; i >= 0; i--) {
4120 while (x->forward[i] && x->forward[i]->score < score)
4121 x = x->forward[i];
4122 }
4123 /* We may have multiple elements with the same score, what we need
4124 * is to find the element with both the right score and object. */
4125 return x->forward[0];
4126 }
4127
4128 /* The actual Z-commands implementations */
4129
4130 static void zaddCommand(redisClient *c) {
4131 robj *zsetobj;
4132 zset *zs;
4133 double *score;
4134
4135 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4136 if (zsetobj == NULL) {
4137 zsetobj = createZsetObject();
4138 dictAdd(c->db->dict,c->argv[1],zsetobj);
4139 incrRefCount(c->argv[1]);
4140 } else {
4141 if (zsetobj->type != REDIS_ZSET) {
4142 addReply(c,shared.wrongtypeerr);
4143 return;
4144 }
4145 }
4146 score = zmalloc(sizeof(double));
4147 *score = strtod(c->argv[2]->ptr,NULL);
4148 zs = zsetobj->ptr;
4149 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4150 /* case 1: New element */
4151 incrRefCount(c->argv[3]); /* added to hash */
4152 zslInsert(zs->zsl,*score,c->argv[3]);
4153 incrRefCount(c->argv[3]); /* added to skiplist */
4154 server.dirty++;
4155 addReply(c,shared.cone);
4156 } else {
4157 dictEntry *de;
4158 double *oldscore;
4159
4160 /* case 2: Score update operation */
4161 de = dictFind(zs->dict,c->argv[3]);
4162 assert(de != NULL);
4163 oldscore = dictGetEntryVal(de);
4164 if (*score != *oldscore) {
4165 int deleted;
4166
4167 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4168 assert(deleted != 0);
4169 zslInsert(zs->zsl,*score,c->argv[3]);
4170 incrRefCount(c->argv[3]);
4171 dictReplace(zs->dict,c->argv[3],score);
4172 server.dirty++;
4173 } else {
4174 zfree(score);
4175 }
4176 addReply(c,shared.czero);
4177 }
4178 }
4179
4180 static void zremCommand(redisClient *c) {
4181 robj *zsetobj;
4182 zset *zs;
4183
4184 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4185 if (zsetobj == NULL) {
4186 addReply(c,shared.czero);
4187 } else {
4188 dictEntry *de;
4189 double *oldscore;
4190 int deleted;
4191
4192 if (zsetobj->type != REDIS_ZSET) {
4193 addReply(c,shared.wrongtypeerr);
4194 return;
4195 }
4196 zs = zsetobj->ptr;
4197 de = dictFind(zs->dict,c->argv[2]);
4198 if (de == NULL) {
4199 addReply(c,shared.czero);
4200 return;
4201 }
4202 /* Delete from the skiplist */
4203 oldscore = dictGetEntryVal(de);
4204 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4205 assert(deleted != 0);
4206
4207 /* Delete from the hash table */
4208 dictDelete(zs->dict,c->argv[2]);
4209 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4210 server.dirty++;
4211 addReply(c,shared.cone);
4212 }
4213 }
4214
4215 static void zremrangebyscoreCommand(redisClient *c) {
4216 double min = strtod(c->argv[2]->ptr,NULL);
4217 double max = strtod(c->argv[3]->ptr,NULL);
4218 robj *zsetobj;
4219 zset *zs;
4220
4221 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4222 if (zsetobj == NULL) {
4223 addReply(c,shared.czero);
4224 } else {
4225 long deleted;
4226
4227 if (zsetobj->type != REDIS_ZSET) {
4228 addReply(c,shared.wrongtypeerr);
4229 return;
4230 }
4231 zs = zsetobj->ptr;
4232 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4233 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4234 server.dirty += deleted;
4235 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4236 }
4237 }
4238
4239 static void zrangeGenericCommand(redisClient *c, int reverse) {
4240 robj *o;
4241 int start = atoi(c->argv[2]->ptr);
4242 int end = atoi(c->argv[3]->ptr);
4243
4244 o = lookupKeyRead(c->db,c->argv[1]);
4245 if (o == NULL) {
4246 addReply(c,shared.nullmultibulk);
4247 } else {
4248 if (o->type != REDIS_ZSET) {
4249 addReply(c,shared.wrongtypeerr);
4250 } else {
4251 zset *zsetobj = o->ptr;
4252 zskiplist *zsl = zsetobj->zsl;
4253 zskiplistNode *ln;
4254
4255 int llen = zsl->length;
4256 int rangelen, j;
4257 robj *ele;
4258
4259 /* convert negative indexes */
4260 if (start < 0) start = llen+start;
4261 if (end < 0) end = llen+end;
4262 if (start < 0) start = 0;
4263 if (end < 0) end = 0;
4264
4265 /* indexes sanity checks */
4266 if (start > end || start >= llen) {
4267 /* Out of range start or start > end result in empty list */
4268 addReply(c,shared.emptymultibulk);
4269 return;
4270 }
4271 if (end >= llen) end = llen-1;
4272 rangelen = (end-start)+1;
4273
4274 /* Return the result in form of a multi-bulk reply */
4275 if (reverse) {
4276 ln = zsl->tail;
4277 while (start--)
4278 ln = ln->backward;
4279 } else {
4280 ln = zsl->header->forward[0];
4281 while (start--)
4282 ln = ln->forward[0];
4283 }
4284
4285 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4286 for (j = 0; j < rangelen; j++) {
4287 ele = ln->obj;
4288 addReplyBulkLen(c,ele);
4289 addReply(c,ele);
4290 addReply(c,shared.crlf);
4291 ln = reverse ? ln->backward : ln->forward[0];
4292 }
4293 }
4294 }
4295 }
4296
4297 static void zrangeCommand(redisClient *c) {
4298 zrangeGenericCommand(c,0);
4299 }
4300
4301 static void zrevrangeCommand(redisClient *c) {
4302 zrangeGenericCommand(c,1);
4303 }
4304
4305 static void zrangebyscoreCommand(redisClient *c) {
4306 robj *o;
4307 double min = strtod(c->argv[2]->ptr,NULL);
4308 double max = strtod(c->argv[3]->ptr,NULL);
4309
4310 o = lookupKeyRead(c->db,c->argv[1]);
4311 if (o == NULL) {
4312 addReply(c,shared.nullmultibulk);
4313 } else {
4314 if (o->type != REDIS_ZSET) {
4315 addReply(c,shared.wrongtypeerr);
4316 } else {
4317 zset *zsetobj = o->ptr;
4318 zskiplist *zsl = zsetobj->zsl;
4319 zskiplistNode *ln;
4320 robj *ele, *lenobj;
4321 unsigned int rangelen = 0;
4322
4323 /* Get the first node with the score >= min */
4324 ln = zslFirstWithScore(zsl,min);
4325 if (ln == NULL) {
4326 /* No element matching the speciifed interval */
4327 addReply(c,shared.emptymultibulk);
4328 return;
4329 }
4330
4331 /* We don't know in advance how many matching elements there
4332 * are in the list, so we push this object that will represent
4333 * the multi-bulk length in the output buffer, and will "fix"
4334 * it later */
4335 lenobj = createObject(REDIS_STRING,NULL);
4336 addReply(c,lenobj);
4337
4338 while(ln && ln->score <= max) {
4339 ele = ln->obj;
4340 addReplyBulkLen(c,ele);
4341 addReply(c,ele);
4342 addReply(c,shared.crlf);
4343 ln = ln->forward[0];
4344 rangelen++;
4345 }
4346 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4347 }
4348 }
4349 }
4350
4351 static void zcardCommand(redisClient *c) {
4352 robj *o;
4353 zset *zs;
4354
4355 o = lookupKeyRead(c->db,c->argv[1]);
4356 if (o == NULL) {
4357 addReply(c,shared.czero);
4358 return;
4359 } else {
4360 if (o->type != REDIS_ZSET) {
4361 addReply(c,shared.wrongtypeerr);
4362 } else {
4363 zs = o->ptr;
4364 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4365 }
4366 }
4367 }
4368
4369 static void zscoreCommand(redisClient *c) {
4370 robj *o;
4371 zset *zs;
4372
4373 o = lookupKeyRead(c->db,c->argv[1]);
4374 if (o == NULL) {
4375 addReply(c,shared.czero);
4376 return;
4377 } else {
4378 if (o->type != REDIS_ZSET) {
4379 addReply(c,shared.wrongtypeerr);
4380 } else {
4381 dictEntry *de;
4382
4383 zs = o->ptr;
4384 de = dictFind(zs->dict,c->argv[2]);
4385 if (!de) {
4386 addReply(c,shared.nullbulk);
4387 } else {
4388 char buf[128];
4389 double *score = dictGetEntryVal(de);
4390
4391 snprintf(buf,sizeof(buf),"%.17g",*score);
4392 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4393 strlen(buf),buf));
4394 }
4395 }
4396 }
4397 }
4398
4399 /* ========================= Non type-specific commands ==================== */
4400
4401 static void flushdbCommand(redisClient *c) {
4402 server.dirty += dictSize(c->db->dict);
4403 dictEmpty(c->db->dict);
4404 dictEmpty(c->db->expires);
4405 addReply(c,shared.ok);
4406 }
4407
4408 static void flushallCommand(redisClient *c) {
4409 server.dirty += emptyDb();
4410 addReply(c,shared.ok);
4411 rdbSave(server.dbfilename);
4412 server.dirty++;
4413 }
4414
4415 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4416 redisSortOperation *so = zmalloc(sizeof(*so));
4417 so->type = type;
4418 so->pattern = pattern;
4419 return so;
4420 }
4421
4422 /* Return the value associated to the key with a name obtained
4423 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4424 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4425 char *p;
4426 sds spat, ssub;
4427 robj keyobj;
4428 int prefixlen, sublen, postfixlen;
4429 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4430 struct {
4431 long len;
4432 long free;
4433 char buf[REDIS_SORTKEY_MAX+1];
4434 } keyname;
4435
4436 if (subst->encoding == REDIS_ENCODING_RAW)
4437 incrRefCount(subst);
4438 else {
4439 subst = getDecodedObject(subst);
4440 }
4441
4442 spat = pattern->ptr;
4443 ssub = subst->ptr;
4444 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4445 p = strchr(spat,'*');
4446 if (!p) return NULL;
4447
4448 prefixlen = p-spat;
4449 sublen = sdslen(ssub);
4450 postfixlen = sdslen(spat)-(prefixlen+1);
4451 memcpy(keyname.buf,spat,prefixlen);
4452 memcpy(keyname.buf+prefixlen,ssub,sublen);
4453 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4454 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4455 keyname.len = prefixlen+sublen+postfixlen;
4456
4457 keyobj.refcount = 1;
4458 keyobj.type = REDIS_STRING;
4459 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4460
4461 decrRefCount(subst);
4462
4463 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4464 return lookupKeyRead(db,&keyobj);
4465 }
4466
4467 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4468 * the additional parameter is not standard but a BSD-specific we have to
4469 * pass sorting parameters via the global 'server' structure */
4470 static int sortCompare(const void *s1, const void *s2) {
4471 const redisSortObject *so1 = s1, *so2 = s2;
4472 int cmp;
4473
4474 if (!server.sort_alpha) {
4475 /* Numeric sorting. Here it's trivial as we precomputed scores */
4476 if (so1->u.score > so2->u.score) {
4477 cmp = 1;
4478 } else if (so1->u.score < so2->u.score) {
4479 cmp = -1;
4480 } else {
4481 cmp = 0;
4482 }
4483 } else {
4484 /* Alphanumeric sorting */
4485 if (server.sort_bypattern) {
4486 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4487 /* At least one compare object is NULL */
4488 if (so1->u.cmpobj == so2->u.cmpobj)
4489 cmp = 0;
4490 else if (so1->u.cmpobj == NULL)
4491 cmp = -1;
4492 else
4493 cmp = 1;
4494 } else {
4495 /* We have both the objects, use strcoll */
4496 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4497 }
4498 } else {
4499 /* Compare elements directly */
4500 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4501 so2->obj->encoding == REDIS_ENCODING_RAW) {
4502 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4503 } else {
4504 robj *dec1, *dec2;
4505
4506 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4507 so1->obj : getDecodedObject(so1->obj);
4508 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4509 so2->obj : getDecodedObject(so2->obj);
4510 cmp = strcoll(dec1->ptr,dec2->ptr);
4511 if (dec1 != so1->obj) decrRefCount(dec1);
4512 if (dec2 != so2->obj) decrRefCount(dec2);
4513 }
4514 }
4515 }
4516 return server.sort_desc ? -cmp : cmp;
4517 }
4518
4519 /* The SORT command is the most complex command in Redis. Warning: this code
4520 * is optimized for speed and a bit less for readability */
4521 static void sortCommand(redisClient *c) {
4522 list *operations;
4523 int outputlen = 0;
4524 int desc = 0, alpha = 0;
4525 int limit_start = 0, limit_count = -1, start, end;
4526 int j, dontsort = 0, vectorlen;
4527 int getop = 0; /* GET operation counter */
4528 robj *sortval, *sortby = NULL, *storekey = NULL;
4529 redisSortObject *vector; /* Resulting vector to sort */
4530
4531 /* Lookup the key to sort. It must be of the right types */
4532 sortval = lookupKeyRead(c->db,c->argv[1]);
4533 if (sortval == NULL) {
4534 addReply(c,shared.nokeyerr);
4535 return;
4536 }
4537 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4538 addReply(c,shared.wrongtypeerr);
4539 return;
4540 }
4541
4542 /* Create a list of operations to perform for every sorted element.
4543 * Operations can be GET/DEL/INCR/DECR */
4544 operations = listCreate();
4545 listSetFreeMethod(operations,zfree);
4546 j = 2;
4547
4548 /* Now we need to protect sortval incrementing its count, in the future
4549 * SORT may have options able to overwrite/delete keys during the sorting
4550 * and the sorted key itself may get destroied */
4551 incrRefCount(sortval);
4552
4553 /* The SORT command has an SQL-alike syntax, parse it */
4554 while(j < c->argc) {
4555 int leftargs = c->argc-j-1;
4556 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4557 desc = 0;
4558 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4559 desc = 1;
4560 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4561 alpha = 1;
4562 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4563 limit_start = atoi(c->argv[j+1]->ptr);
4564 limit_count = atoi(c->argv[j+2]->ptr);
4565 j+=2;
4566 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4567 storekey = c->argv[j+1];
4568 j++;
4569 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4570 sortby = c->argv[j+1];
4571 /* If the BY pattern does not contain '*', i.e. it is constant,
4572 * we don't need to sort nor to lookup the weight keys. */
4573 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4574 j++;
4575 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4576 listAddNodeTail(operations,createSortOperation(
4577 REDIS_SORT_GET,c->argv[j+1]));
4578 getop++;
4579 j++;
4580 } else {
4581 decrRefCount(sortval);
4582 listRelease(operations);
4583 addReply(c,shared.syntaxerr);
4584 return;
4585 }
4586 j++;
4587 }
4588
4589 /* Load the sorting vector with all the objects to sort */
4590 vectorlen = (sortval->type == REDIS_LIST) ?
4591 listLength((list*)sortval->ptr) :
4592 dictSize((dict*)sortval->ptr);
4593 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4594 j = 0;
4595 if (sortval->type == REDIS_LIST) {
4596 list *list = sortval->ptr;
4597 listNode *ln;
4598
4599 listRewind(list);
4600 while((ln = listYield(list))) {
4601 robj *ele = ln->value;
4602 vector[j].obj = ele;
4603 vector[j].u.score = 0;
4604 vector[j].u.cmpobj = NULL;
4605 j++;
4606 }
4607 } else {
4608 dict *set = sortval->ptr;
4609 dictIterator *di;
4610 dictEntry *setele;
4611
4612 di = dictGetIterator(set);
4613 while((setele = dictNext(di)) != NULL) {
4614 vector[j].obj = dictGetEntryKey(setele);
4615 vector[j].u.score = 0;
4616 vector[j].u.cmpobj = NULL;
4617 j++;
4618 }
4619 dictReleaseIterator(di);
4620 }
4621 assert(j == vectorlen);
4622
4623 /* Now it's time to load the right scores in the sorting vector */
4624 if (dontsort == 0) {
4625 for (j = 0; j < vectorlen; j++) {
4626 if (sortby) {
4627 robj *byval;
4628
4629 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4630 if (!byval || byval->type != REDIS_STRING) continue;
4631 if (alpha) {
4632 if (byval->encoding == REDIS_ENCODING_RAW) {
4633 vector[j].u.cmpobj = byval;
4634 incrRefCount(byval);
4635 } else {
4636 vector[j].u.cmpobj = getDecodedObject(byval);
4637 }
4638 } else {
4639 if (byval->encoding == REDIS_ENCODING_RAW) {
4640 vector[j].u.score = strtod(byval->ptr,NULL);
4641 } else {
4642 if (byval->encoding == REDIS_ENCODING_INT) {
4643 vector[j].u.score = (long)byval->ptr;
4644 } else
4645 assert(1 != 1);
4646 }
4647 }
4648 } else {
4649 if (!alpha) {
4650 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4651 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4652 else {
4653 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4654 vector[j].u.score = (long) vector[j].obj->ptr;
4655 else
4656 assert(1 != 1);
4657 }
4658 }
4659 }
4660 }
4661 }
4662
4663 /* We are ready to sort the vector... perform a bit of sanity check
4664 * on the LIMIT option too. We'll use a partial version of quicksort. */
4665 start = (limit_start < 0) ? 0 : limit_start;
4666 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4667 if (start >= vectorlen) {
4668 start = vectorlen-1;
4669 end = vectorlen-2;
4670 }
4671 if (end >= vectorlen) end = vectorlen-1;
4672
4673 if (dontsort == 0) {
4674 server.sort_desc = desc;
4675 server.sort_alpha = alpha;
4676 server.sort_bypattern = sortby ? 1 : 0;
4677 if (sortby && (start != 0 || end != vectorlen-1))
4678 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4679 else
4680 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4681 }
4682
4683 /* Send command output to the output buffer, performing the specified
4684 * GET/DEL/INCR/DECR operations if any. */
4685 outputlen = getop ? getop*(end-start+1) : end-start+1;
4686 if (storekey == NULL) {
4687 /* STORE option not specified, sent the sorting result to client */
4688 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4689 for (j = start; j <= end; j++) {
4690 listNode *ln;
4691 if (!getop) {
4692 addReplyBulkLen(c,vector[j].obj);
4693 addReply(c,vector[j].obj);
4694 addReply(c,shared.crlf);
4695 }
4696 listRewind(operations);
4697 while((ln = listYield(operations))) {
4698 redisSortOperation *sop = ln->value;
4699 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4700 vector[j].obj);
4701
4702 if (sop->type == REDIS_SORT_GET) {
4703 if (!val || val->type != REDIS_STRING) {
4704 addReply(c,shared.nullbulk);
4705 } else {
4706 addReplyBulkLen(c,val);
4707 addReply(c,val);
4708 addReply(c,shared.crlf);
4709 }
4710 } else {
4711 assert(sop->type == REDIS_SORT_GET); /* always fails */
4712 }
4713 }
4714 }
4715 } else {
4716 robj *listObject = createListObject();
4717 list *listPtr = (list*) listObject->ptr;
4718
4719 /* STORE option specified, set the sorting result as a List object */
4720 for (j = start; j <= end; j++) {
4721 listNode *ln;
4722 if (!getop) {
4723 listAddNodeTail(listPtr,vector[j].obj);
4724 incrRefCount(vector[j].obj);
4725 }
4726 listRewind(operations);
4727 while((ln = listYield(operations))) {
4728 redisSortOperation *sop = ln->value;
4729 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4730 vector[j].obj);
4731
4732 if (sop->type == REDIS_SORT_GET) {
4733 if (!val || val->type != REDIS_STRING) {
4734 listAddNodeTail(listPtr,createStringObject("",0));
4735 } else {
4736 listAddNodeTail(listPtr,val);
4737 incrRefCount(val);
4738 }
4739 } else {
4740 assert(sop->type == REDIS_SORT_GET); /* always fails */
4741 }
4742 }
4743 }
4744 if (dictReplace(c->db->dict,storekey,listObject)) {
4745 incrRefCount(storekey);
4746 }
4747 /* Note: we add 1 because the DB is dirty anyway since even if the
4748 * SORT result is empty a new key is set and maybe the old content
4749 * replaced. */
4750 server.dirty += 1+outputlen;
4751 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
4752 }
4753
4754 /* Cleanup */
4755 decrRefCount(sortval);
4756 listRelease(operations);
4757 for (j = 0; j < vectorlen; j++) {
4758 if (sortby && alpha && vector[j].u.cmpobj)
4759 decrRefCount(vector[j].u.cmpobj);
4760 }
4761 zfree(vector);
4762 }
4763
4764 static void infoCommand(redisClient *c) {
4765 sds info;
4766 time_t uptime = time(NULL)-server.stat_starttime;
4767 int j;
4768
4769 info = sdscatprintf(sdsempty(),
4770 "redis_version:%s\r\n"
4771 "arch_bits:%s\r\n"
4772 "uptime_in_seconds:%d\r\n"
4773 "uptime_in_days:%d\r\n"
4774 "connected_clients:%d\r\n"
4775 "connected_slaves:%d\r\n"
4776 "used_memory:%zu\r\n"
4777 "changes_since_last_save:%lld\r\n"
4778 "bgsave_in_progress:%d\r\n"
4779 "last_save_time:%d\r\n"
4780 "total_connections_received:%lld\r\n"
4781 "total_commands_processed:%lld\r\n"
4782 "role:%s\r\n"
4783 ,REDIS_VERSION,
4784 (sizeof(long) == 8) ? "64" : "32",
4785 uptime,
4786 uptime/(3600*24),
4787 listLength(server.clients)-listLength(server.slaves),
4788 listLength(server.slaves),
4789 server.usedmemory,
4790 server.dirty,
4791 server.bgsaveinprogress,
4792 server.lastsave,
4793 server.stat_numconnections,
4794 server.stat_numcommands,
4795 server.masterhost == NULL ? "master" : "slave"
4796 );
4797 if (server.masterhost) {
4798 info = sdscatprintf(info,
4799 "master_host:%s\r\n"
4800 "master_port:%d\r\n"
4801 "master_link_status:%s\r\n"
4802 "master_last_io_seconds_ago:%d\r\n"
4803 ,server.masterhost,
4804 server.masterport,
4805 (server.replstate == REDIS_REPL_CONNECTED) ?
4806 "up" : "down",
4807 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4808 );
4809 }
4810 for (j = 0; j < server.dbnum; j++) {
4811 long long keys, vkeys;
4812
4813 keys = dictSize(server.db[j].dict);
4814 vkeys = dictSize(server.db[j].expires);
4815 if (keys || vkeys) {
4816 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4817 j, keys, vkeys);
4818 }
4819 }
4820 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4821 addReplySds(c,info);
4822 addReply(c,shared.crlf);
4823 }
4824
4825 static void monitorCommand(redisClient *c) {
4826 /* ignore MONITOR if aleady slave or in monitor mode */
4827 if (c->flags & REDIS_SLAVE) return;
4828
4829 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4830 c->slaveseldb = 0;
4831 listAddNodeTail(server.monitors,c);
4832 addReply(c,shared.ok);
4833 }
4834
4835 /* ================================= Expire ================================= */
4836 static int removeExpire(redisDb *db, robj *key) {
4837 if (dictDelete(db->expires,key) == DICT_OK) {
4838 return 1;
4839 } else {
4840 return 0;
4841 }
4842 }
4843
4844 static int setExpire(redisDb *db, robj *key, time_t when) {
4845 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4846 return 0;
4847 } else {
4848 incrRefCount(key);
4849 return 1;
4850 }
4851 }
4852
4853 /* Return the expire time of the specified key, or -1 if no expire
4854 * is associated with this key (i.e. the key is non volatile) */
4855 static time_t getExpire(redisDb *db, robj *key) {
4856 dictEntry *de;
4857
4858 /* No expire? return ASAP */
4859 if (dictSize(db->expires) == 0 ||
4860 (de = dictFind(db->expires,key)) == NULL) return -1;
4861
4862 return (time_t) dictGetEntryVal(de);
4863 }
4864
4865 static int expireIfNeeded(redisDb *db, robj *key) {
4866 time_t when;
4867 dictEntry *de;
4868
4869 /* No expire? return ASAP */
4870 if (dictSize(db->expires) == 0 ||
4871 (de = dictFind(db->expires,key)) == NULL) return 0;
4872
4873 /* Lookup the expire */
4874 when = (time_t) dictGetEntryVal(de);
4875 if (time(NULL) <= when) return 0;
4876
4877 /* Delete the key */
4878 dictDelete(db->expires,key);
4879 return dictDelete(db->dict,key) == DICT_OK;
4880 }
4881
4882 static int deleteIfVolatile(redisDb *db, robj *key) {
4883 dictEntry *de;
4884
4885 /* No expire? return ASAP */
4886 if (dictSize(db->expires) == 0 ||
4887 (de = dictFind(db->expires,key)) == NULL) return 0;
4888
4889 /* Delete the key */
4890 server.dirty++;
4891 dictDelete(db->expires,key);
4892 return dictDelete(db->dict,key) == DICT_OK;
4893 }
4894
4895 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4896 dictEntry *de;
4897
4898 de = dictFind(c->db->dict,key);
4899 if (de == NULL) {
4900 addReply(c,shared.czero);
4901 return;
4902 }
4903 if (seconds < 0) {
4904 if (deleteKey(c->db,key)) server.dirty++;
4905 addReply(c, shared.cone);
4906 return;
4907 } else {
4908 time_t when = time(NULL)+seconds;
4909 if (setExpire(c->db,key,when)) {
4910 addReply(c,shared.cone);
4911 server.dirty++;
4912 } else {
4913 addReply(c,shared.czero);
4914 }
4915 return;
4916 }
4917 }
4918
4919 static void expireCommand(redisClient *c) {
4920 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
4921 }
4922
4923 static void expireatCommand(redisClient *c) {
4924 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
4925 }
4926
4927 static void ttlCommand(redisClient *c) {
4928 time_t expire;
4929 int ttl = -1;
4930
4931 expire = getExpire(c->db,c->argv[1]);
4932 if (expire != -1) {
4933 ttl = (int) (expire-time(NULL));
4934 if (ttl < 0) ttl = -1;
4935 }
4936 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4937 }
4938
4939 static void msetGenericCommand(redisClient *c, int nx) {
4940 int j;
4941
4942 if ((c->argc % 2) == 0) {
4943 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4944 return;
4945 }
4946 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4947 * set nothing at all if at least one already key exists. */
4948 if (nx) {
4949 for (j = 1; j < c->argc; j += 2) {
4950 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4951 addReply(c, shared.czero);
4952 return;
4953 }
4954 }
4955 }
4956
4957 for (j = 1; j < c->argc; j += 2) {
4958 int retval;
4959
4960 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4961 if (retval == DICT_ERR) {
4962 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4963 incrRefCount(c->argv[j+1]);
4964 } else {
4965 incrRefCount(c->argv[j]);
4966 incrRefCount(c->argv[j+1]);
4967 }
4968 removeExpire(c->db,c->argv[j]);
4969 }
4970 server.dirty += (c->argc-1)/2;
4971 addReply(c, nx ? shared.cone : shared.ok);
4972 }
4973
4974 static void msetCommand(redisClient *c) {
4975 msetGenericCommand(c,0);
4976 }
4977
4978 static void msetnxCommand(redisClient *c) {
4979 msetGenericCommand(c,1);
4980 }
4981
4982 /* =============================== Replication ============================= */
4983
4984 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4985 ssize_t nwritten, ret = size;
4986 time_t start = time(NULL);
4987
4988 timeout++;
4989 while(size) {
4990 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4991 nwritten = write(fd,ptr,size);
4992 if (nwritten == -1) return -1;
4993 ptr += nwritten;
4994 size -= nwritten;
4995 }
4996 if ((time(NULL)-start) > timeout) {
4997 errno = ETIMEDOUT;
4998 return -1;
4999 }
5000 }
5001 return ret;
5002 }
5003
5004 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5005 ssize_t nread, totread = 0;
5006 time_t start = time(NULL);
5007
5008 timeout++;
5009 while(size) {
5010 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5011 nread = read(fd,ptr,size);
5012 if (nread == -1) return -1;
5013 ptr += nread;
5014 size -= nread;
5015 totread += nread;
5016 }
5017 if ((time(NULL)-start) > timeout) {
5018 errno = ETIMEDOUT;
5019 return -1;
5020 }
5021 }
5022 return totread;
5023 }
5024
5025 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
5026 ssize_t nread = 0;
5027
5028 size--;
5029 while(size) {
5030 char c;
5031
5032 if (syncRead(fd,&c,1,timeout) == -1) return -1;
5033 if (c == '\n') {
5034 *ptr = '\0';
5035 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
5036 return nread;
5037 } else {
5038 *ptr++ = c;
5039 *ptr = '\0';
5040 nread++;
5041 }
5042 }
5043 return nread;
5044 }
5045
5046 static void syncCommand(redisClient *c) {
5047 /* ignore SYNC if aleady slave or in monitor mode */
5048 if (c->flags & REDIS_SLAVE) return;
5049
5050 /* SYNC can't be issued when the server has pending data to send to
5051 * the client about already issued commands. We need a fresh reply
5052 * buffer registering the differences between the BGSAVE and the current
5053 * dataset, so that we can copy to other slaves if needed. */
5054 if (listLength(c->reply) != 0) {
5055 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5056 return;
5057 }
5058
5059 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5060 /* Here we need to check if there is a background saving operation
5061 * in progress, or if it is required to start one */
5062 if (server.bgsaveinprogress) {
5063 /* Ok a background save is in progress. Let's check if it is a good
5064 * one for replication, i.e. if there is another slave that is
5065 * registering differences since the server forked to save */
5066 redisClient *slave;
5067 listNode *ln;
5068
5069 listRewind(server.slaves);
5070 while((ln = listYield(server.slaves))) {
5071 slave = ln->value;
5072 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5073 }
5074 if (ln) {
5075 /* Perfect, the server is already registering differences for
5076 * another slave. Set the right state, and copy the buffer. */
5077 listRelease(c->reply);
5078 c->reply = listDup(slave->reply);
5079 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5080 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5081 } else {
5082 /* No way, we need to wait for the next BGSAVE in order to
5083 * register differences */
5084 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5085 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5086 }
5087 } else {
5088 /* Ok we don't have a BGSAVE in progress, let's start one */
5089 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5090 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5091 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5092 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5093 return;
5094 }
5095 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5096 }
5097 c->repldbfd = -1;
5098 c->flags |= REDIS_SLAVE;
5099 c->slaveseldb = 0;
5100 listAddNodeTail(server.slaves,c);
5101 return;
5102 }
5103
5104 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5105 redisClient *slave = privdata;
5106 REDIS_NOTUSED(el);
5107 REDIS_NOTUSED(mask);
5108 char buf[REDIS_IOBUF_LEN];
5109 ssize_t nwritten, buflen;
5110
5111 if (slave->repldboff == 0) {
5112 /* Write the bulk write count before to transfer the DB. In theory here
5113 * we don't know how much room there is in the output buffer of the
5114 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5115 * operations) will never be smaller than the few bytes we need. */
5116 sds bulkcount;
5117
5118 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5119 slave->repldbsize);
5120 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5121 {
5122 sdsfree(bulkcount);
5123 freeClient(slave);
5124 return;
5125 }
5126 sdsfree(bulkcount);
5127 }
5128 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5129 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5130 if (buflen <= 0) {
5131 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5132 (buflen == 0) ? "premature EOF" : strerror(errno));
5133 freeClient(slave);
5134 return;
5135 }
5136 if ((nwritten = write(fd,buf,buflen)) == -1) {
5137 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5138 strerror(errno));
5139 freeClient(slave);
5140 return;
5141 }
5142 slave->repldboff += nwritten;
5143 if (slave->repldboff == slave->repldbsize) {
5144 close(slave->repldbfd);
5145 slave->repldbfd = -1;
5146 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5147 slave->replstate = REDIS_REPL_ONLINE;
5148 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5149 sendReplyToClient, slave, NULL) == AE_ERR) {
5150 freeClient(slave);
5151 return;
5152 }
5153 addReplySds(slave,sdsempty());
5154 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5155 }
5156 }
5157
5158 /* This function is called at the end of every backgrond saving.
5159 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5160 * otherwise REDIS_ERR is passed to the function.
5161 *
5162 * The goal of this function is to handle slaves waiting for a successful
5163 * background saving in order to perform non-blocking synchronization. */
5164 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5165 listNode *ln;
5166 int startbgsave = 0;
5167
5168 listRewind(server.slaves);
5169 while((ln = listYield(server.slaves))) {
5170 redisClient *slave = ln->value;
5171
5172 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5173 startbgsave = 1;
5174 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5175 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5176 struct redis_stat buf;
5177
5178 if (bgsaveerr != REDIS_OK) {
5179 freeClient(slave);
5180 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5181 continue;
5182 }
5183 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5184 redis_fstat(slave->repldbfd,&buf) == -1) {
5185 freeClient(slave);
5186 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5187 continue;
5188 }
5189 slave->repldboff = 0;
5190 slave->repldbsize = buf.st_size;
5191 slave->replstate = REDIS_REPL_SEND_BULK;
5192 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5193 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5194 freeClient(slave);
5195 continue;
5196 }
5197 }
5198 }
5199 if (startbgsave) {
5200 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5201 listRewind(server.slaves);
5202 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5203 while((ln = listYield(server.slaves))) {
5204 redisClient *slave = ln->value;
5205
5206 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5207 freeClient(slave);
5208 }
5209 }
5210 }
5211 }
5212
5213 static int syncWithMaster(void) {
5214 char buf[1024], tmpfile[256], authcmd[1024];
5215 int dumpsize;
5216 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5217 int dfd;
5218
5219 if (fd == -1) {
5220 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5221 strerror(errno));
5222 return REDIS_ERR;
5223 }
5224
5225 /* AUTH with the master if required. */
5226 if(server.masterauth) {
5227 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5228 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5229 close(fd);
5230 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5231 strerror(errno));
5232 return REDIS_ERR;
5233 }
5234 /* Read the AUTH result. */
5235 if (syncReadLine(fd,buf,1024,3600) == -1) {
5236 close(fd);
5237 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5238 strerror(errno));
5239 return REDIS_ERR;
5240 }
5241 if (buf[0] != '+') {
5242 close(fd);
5243 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5244 return REDIS_ERR;
5245 }
5246 }
5247
5248 /* Issue the SYNC command */
5249 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5250 close(fd);
5251 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5252 strerror(errno));
5253 return REDIS_ERR;
5254 }
5255 /* Read the bulk write count */
5256 if (syncReadLine(fd,buf,1024,3600) == -1) {
5257 close(fd);
5258 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5259 strerror(errno));
5260 return REDIS_ERR;
5261 }
5262 if (buf[0] != '$') {
5263 close(fd);
5264 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5265 return REDIS_ERR;
5266 }
5267 dumpsize = atoi(buf+1);
5268 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5269 /* Read the bulk write data on a temp file */
5270 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5271 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5272 if (dfd == -1) {
5273 close(fd);
5274 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5275 return REDIS_ERR;
5276 }
5277 while(dumpsize) {
5278 int nread, nwritten;
5279
5280 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5281 if (nread == -1) {
5282 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5283 strerror(errno));
5284 close(fd);
5285 close(dfd);
5286 return REDIS_ERR;
5287 }
5288 nwritten = write(dfd,buf,nread);
5289 if (nwritten == -1) {
5290 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5291 close(fd);
5292 close(dfd);
5293 return REDIS_ERR;
5294 }
5295 dumpsize -= nread;
5296 }
5297 close(dfd);
5298 if (rename(tmpfile,server.dbfilename) == -1) {
5299 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5300 unlink(tmpfile);
5301 close(fd);
5302 return REDIS_ERR;
5303 }
5304 emptyDb();
5305 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5306 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5307 close(fd);
5308 return REDIS_ERR;
5309 }
5310 server.master = createClient(fd);
5311 server.master->flags |= REDIS_MASTER;
5312 server.replstate = REDIS_REPL_CONNECTED;
5313 return REDIS_OK;
5314 }
5315
5316 static void slaveofCommand(redisClient *c) {
5317 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5318 !strcasecmp(c->argv[2]->ptr,"one")) {
5319 if (server.masterhost) {
5320 sdsfree(server.masterhost);
5321 server.masterhost = NULL;
5322 if (server.master) freeClient(server.master);
5323 server.replstate = REDIS_REPL_NONE;
5324 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5325 }
5326 } else {
5327 sdsfree(server.masterhost);
5328 server.masterhost = sdsdup(c->argv[1]->ptr);
5329 server.masterport = atoi(c->argv[2]->ptr);
5330 if (server.master) freeClient(server.master);
5331 server.replstate = REDIS_REPL_CONNECT;
5332 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5333 server.masterhost, server.masterport);
5334 }
5335 addReply(c,shared.ok);
5336 }
5337
5338 /* ============================ Maxmemory directive ======================== */
5339
5340 /* This function gets called when 'maxmemory' is set on the config file to limit
5341 * the max memory used by the server, and we are out of memory.
5342 * This function will try to, in order:
5343 *
5344 * - Free objects from the free list
5345 * - Try to remove keys with an EXPIRE set
5346 *
5347 * It is not possible to free enough memory to reach used-memory < maxmemory
5348 * the server will start refusing commands that will enlarge even more the
5349 * memory usage.
5350 */
5351 static void freeMemoryIfNeeded(void) {
5352 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5353 if (listLength(server.objfreelist)) {
5354 robj *o;
5355
5356 listNode *head = listFirst(server.objfreelist);
5357 o = listNodeValue(head);
5358 listDelNode(server.objfreelist,head);
5359 zfree(o);
5360 } else {
5361 int j, k, freed = 0;
5362
5363 for (j = 0; j < server.dbnum; j++) {
5364 int minttl = -1;
5365 robj *minkey = NULL;
5366 struct dictEntry *de;
5367
5368 if (dictSize(server.db[j].expires)) {
5369 freed = 1;
5370 /* From a sample of three keys drop the one nearest to
5371 * the natural expire */
5372 for (k = 0; k < 3; k++) {
5373 time_t t;
5374
5375 de = dictGetRandomKey(server.db[j].expires);
5376 t = (time_t) dictGetEntryVal(de);
5377 if (minttl == -1 || t < minttl) {
5378 minkey = dictGetEntryKey(de);
5379 minttl = t;
5380 }
5381 }
5382 deleteKey(server.db+j,minkey);
5383 }
5384 }
5385 if (!freed) return; /* nothing to free... */
5386 }
5387 }
5388 }
5389
5390 /* ============================== Append Only file ========================== */
5391
5392 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5393 sds buf = sdsempty();
5394 int j;
5395 ssize_t nwritten;
5396 time_t now;
5397 robj *tmpargv[3];
5398
5399 /* The DB this command was targetting is not the same as the last command
5400 * we appendend. To issue a SELECT command is needed. */
5401 if (dictid != server.appendseldb) {
5402 char seldb[64];
5403
5404 snprintf(seldb,sizeof(seldb),"%d",dictid);
5405 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5406 strlen(seldb),seldb);
5407 server.appendseldb = dictid;
5408 }
5409
5410 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5411 * EXPIREs into EXPIREATs calls */
5412 if (cmd->proc == expireCommand) {
5413 long when;
5414
5415 tmpargv[0] = createStringObject("EXPIREAT",8);
5416 tmpargv[1] = argv[1];
5417 incrRefCount(argv[1]);
5418 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5419 tmpargv[2] = createObject(REDIS_STRING,
5420 sdscatprintf(sdsempty(),"%ld",when));
5421 argv = tmpargv;
5422 }
5423
5424 /* Append the actual command */
5425 buf = sdscatprintf(buf,"*%d\r\n",argc);
5426 for (j = 0; j < argc; j++) {
5427 robj *o = argv[j];
5428
5429 if (o->encoding != REDIS_ENCODING_RAW)
5430 o = getDecodedObject(o);
5431 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5432 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5433 buf = sdscatlen(buf,"\r\n",2);
5434 if (o != argv[j])
5435 decrRefCount(o);
5436 }
5437
5438 /* Free the objects from the modified argv for EXPIREAT */
5439 if (cmd->proc == expireCommand) {
5440 for (j = 0; j < 3; j++)
5441 decrRefCount(argv[j]);
5442 }
5443
5444 /* We want to perform a single write. This should be guaranteed atomic
5445 * at least if the filesystem we are writing is a real physical one.
5446 * While this will save us against the server being killed I don't think
5447 * there is much to do about the whole server stopping for power problems
5448 * or alike */
5449 nwritten = write(server.appendfd,buf,sdslen(buf));
5450 if (nwritten != (signed)sdslen(buf)) {
5451 /* Ooops, we are in troubles. The best thing to do for now is
5452 * to simply exit instead to give the illusion that everything is
5453 * working as expected. */
5454 if (nwritten == -1) {
5455 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5456 } else {
5457 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5458 }
5459 exit(1);
5460 }
5461 now = time(NULL);
5462 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5463 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5464 now-server.lastfsync > 1))
5465 {
5466 fsync(server.appendfd); /* Let's try to get this data on the disk */
5467 server.lastfsync = now;
5468 }
5469 }
5470
5471 /* In Redis commands are always executed in the context of a client, so in
5472 * order to load the append only file we need to create a fake client. */
5473 static struct redisClient *createFakeClient(void) {
5474 struct redisClient *c = zmalloc(sizeof(*c));
5475
5476 selectDb(c,0);
5477 c->fd = -1;
5478 c->querybuf = sdsempty();
5479 c->argc = 0;
5480 c->argv = NULL;
5481 c->flags = 0;
5482 /* We set the fake client as a slave waiting for the synchronization
5483 * so that Redis will not try to send replies to this client. */
5484 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5485 c->reply = listCreate();
5486 listSetFreeMethod(c->reply,decrRefCount);
5487 listSetDupMethod(c->reply,dupClientReplyValue);
5488 return c;
5489 }
5490
5491 static void freeFakeClient(struct redisClient *c) {
5492 sdsfree(c->querybuf);
5493 listRelease(c->reply);
5494 zfree(c);
5495 }
5496
5497 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5498 * error (the append only file is zero-length) REDIS_ERR is returned. On
5499 * fatal error an error message is logged and the program exists. */
5500 int loadAppendOnlyFile(char *filename) {
5501 struct redisClient *fakeClient;
5502 FILE *fp = fopen(filename,"r");
5503 struct redis_stat sb;
5504
5505 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5506 return REDIS_ERR;
5507
5508 if (fp == NULL) {
5509 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5510 exit(1);
5511 }
5512
5513 fakeClient = createFakeClient();
5514 while(1) {
5515 int argc, j;
5516 unsigned long len;
5517 robj **argv;
5518 char buf[128];
5519 sds argsds;
5520 struct redisCommand *cmd;
5521
5522 if (fgets(buf,sizeof(buf),fp) == NULL) {
5523 if (feof(fp))
5524 break;
5525 else
5526 goto readerr;
5527 }
5528 if (buf[0] != '*') goto fmterr;
5529 argc = atoi(buf+1);
5530 argv = zmalloc(sizeof(robj*)*argc);
5531 for (j = 0; j < argc; j++) {
5532 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5533 if (buf[0] != '$') goto fmterr;
5534 len = strtol(buf+1,NULL,10);
5535 argsds = sdsnewlen(NULL,len);
5536 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5537 argv[j] = createObject(REDIS_STRING,argsds);
5538 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5539 }
5540
5541 /* Command lookup */
5542 cmd = lookupCommand(argv[0]->ptr);
5543 if (!cmd) {
5544 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5545 exit(1);
5546 }
5547 /* Try object sharing and encoding */
5548 if (server.shareobjects) {
5549 int j;
5550 for(j = 1; j < argc; j++)
5551 argv[j] = tryObjectSharing(argv[j]);
5552 }
5553 if (cmd->flags & REDIS_CMD_BULK)
5554 tryObjectEncoding(argv[argc-1]);
5555 /* Run the command in the context of a fake client */
5556 fakeClient->argc = argc;
5557 fakeClient->argv = argv;
5558 cmd->proc(fakeClient);
5559 /* Discard the reply objects list from the fake client */
5560 while(listLength(fakeClient->reply))
5561 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5562 /* Clean up, ready for the next command */
5563 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5564 zfree(argv);
5565 }
5566 fclose(fp);
5567 freeFakeClient(fakeClient);
5568 return REDIS_OK;
5569
5570 readerr:
5571 if (feof(fp)) {
5572 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5573 } else {
5574 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5575 }
5576 exit(1);
5577 fmterr:
5578 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5579 exit(1);
5580 }
5581
5582 /* ================================= Debugging ============================== */
5583
5584 static void debugCommand(redisClient *c) {
5585 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5586 *((char*)-1) = 'x';
5587 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5588 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5589 robj *key, *val;
5590
5591 if (!de) {
5592 addReply(c,shared.nokeyerr);
5593 return;
5594 }
5595 key = dictGetEntryKey(de);
5596 val = dictGetEntryVal(de);
5597 addReplySds(c,sdscatprintf(sdsempty(),
5598 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5599 key, key->refcount, val, val->refcount, val->encoding));
5600 } else {
5601 addReplySds(c,sdsnew(
5602 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5603 }
5604 }
5605
5606 /* =================================== Main! ================================ */
5607
5608 #ifdef __linux__
5609 int linuxOvercommitMemoryValue(void) {
5610 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
5611 char buf[64];
5612
5613 if (!fp) return -1;
5614 if (fgets(buf,64,fp) == NULL) {
5615 fclose(fp);
5616 return -1;
5617 }
5618 fclose(fp);
5619
5620 return atoi(buf);
5621 }
5622
5623 void linuxOvercommitMemoryWarning(void) {
5624 if (linuxOvercommitMemoryValue() == 0) {
5625 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5626 }
5627 }
5628 #endif /* __linux__ */
5629
5630 static void daemonize(void) {
5631 int fd;
5632 FILE *fp;
5633
5634 if (fork() != 0) exit(0); /* parent exits */
5635 setsid(); /* create a new session */
5636
5637 /* Every output goes to /dev/null. If Redis is daemonized but
5638 * the 'logfile' is set to 'stdout' in the configuration file
5639 * it will not log at all. */
5640 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5641 dup2(fd, STDIN_FILENO);
5642 dup2(fd, STDOUT_FILENO);
5643 dup2(fd, STDERR_FILENO);
5644 if (fd > STDERR_FILENO) close(fd);
5645 }
5646 /* Try to write the pid file */
5647 fp = fopen(server.pidfile,"w");
5648 if (fp) {
5649 fprintf(fp,"%d\n",getpid());
5650 fclose(fp);
5651 }
5652 }
5653
5654 int main(int argc, char **argv) {
5655 initServerConfig();
5656 if (argc == 2) {
5657 resetServerSaveParams();
5658 loadServerConfig(argv[1]);
5659 } else if (argc > 2) {
5660 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5661 exit(1);
5662 } else {
5663 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5664 }
5665 initServer();
5666 if (server.daemonize) daemonize();
5667 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5668 #ifdef __linux__
5669 linuxOvercommitMemoryWarning();
5670 #endif
5671 if (server.appendonly) {
5672 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
5673 redisLog(REDIS_NOTICE,"DB loaded from append only file");
5674 } else {
5675 if (rdbLoad(server.dbfilename) == REDIS_OK)
5676 redisLog(REDIS_NOTICE,"DB loaded from disk");
5677 }
5678 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5679 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5680 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5681 aeMain(server.el);
5682 aeDeleteEventLoop(server.el);
5683 return 0;
5684 }
5685
5686 /* ============================= Backtrace support ========================= */
5687
5688 #ifdef HAVE_BACKTRACE
5689 static char *findFuncName(void *pointer, unsigned long *offset);
5690
5691 static void *getMcontextEip(ucontext_t *uc) {
5692 #if defined(__FreeBSD__)
5693 return (void*) uc->uc_mcontext.mc_eip;
5694 #elif defined(__dietlibc__)
5695 return (void*) uc->uc_mcontext.eip;
5696 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5697 return (void*) uc->uc_mcontext->__ss.__eip;
5698 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5699 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5700 return (void*) uc->uc_mcontext->__ss.__rip;
5701 #else
5702 return (void*) uc->uc_mcontext->__ss.__eip;
5703 #endif
5704 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5705 return (void*) uc->uc_mcontext.gregs[REG_EIP];
5706 #elif defined(__ia64__) /* Linux IA64 */
5707 return (void*) uc->uc_mcontext.sc_ip;
5708 #else
5709 return NULL;
5710 #endif
5711 }
5712
5713 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5714 void *trace[100];
5715 char **messages = NULL;
5716 int i, trace_size = 0;
5717 unsigned long offset=0;
5718 time_t uptime = time(NULL)-server.stat_starttime;
5719 ucontext_t *uc = (ucontext_t*) secret;
5720 REDIS_NOTUSED(info);
5721
5722 redisLog(REDIS_WARNING,
5723 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5724 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5725 "redis_version:%s; "
5726 "uptime_in_seconds:%d; "
5727 "connected_clients:%d; "
5728 "connected_slaves:%d; "
5729 "used_memory:%zu; "
5730 "changes_since_last_save:%lld; "
5731 "bgsave_in_progress:%d; "
5732 "last_save_time:%d; "
5733 "total_connections_received:%lld; "
5734 "total_commands_processed:%lld; "
5735 "role:%s;"
5736 ,REDIS_VERSION,
5737 uptime,
5738 listLength(server.clients)-listLength(server.slaves),
5739 listLength(server.slaves),
5740 server.usedmemory,
5741 server.dirty,
5742 server.bgsaveinprogress,
5743 server.lastsave,
5744 server.stat_numconnections,
5745 server.stat_numcommands,
5746 server.masterhost == NULL ? "master" : "slave"
5747 ));
5748
5749 trace_size = backtrace(trace, 100);
5750 /* overwrite sigaction with caller's address */
5751 if (getMcontextEip(uc) != NULL) {
5752 trace[1] = getMcontextEip(uc);
5753 }
5754 messages = backtrace_symbols(trace, trace_size);
5755
5756 for (i=1; i<trace_size; ++i) {
5757 char *fn = findFuncName(trace[i], &offset), *p;
5758
5759 p = strchr(messages[i],'+');
5760 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5761 redisLog(REDIS_WARNING,"%s", messages[i]);
5762 } else {
5763 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5764 }
5765 }
5766 free(messages);
5767 exit(0);
5768 }
5769
5770 static void setupSigSegvAction(void) {
5771 struct sigaction act;
5772
5773 sigemptyset (&act.sa_mask);
5774 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5775 * is used. Otherwise, sa_handler is used */
5776 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5777 act.sa_sigaction = segvHandler;
5778 sigaction (SIGSEGV, &act, NULL);
5779 sigaction (SIGBUS, &act, NULL);
5780 sigaction (SIGFPE, &act, NULL);
5781 sigaction (SIGILL, &act, NULL);
5782 sigaction (SIGBUS, &act, NULL);
5783 return;
5784 }
5785
5786 #include "staticsymbols.h"
5787 /* This function try to convert a pointer into a function name. It's used in
5788 * oreder to provide a backtrace under segmentation fault that's able to
5789 * display functions declared as static (otherwise the backtrace is useless). */
5790 static char *findFuncName(void *pointer, unsigned long *offset){
5791 int i, ret = -1;
5792 unsigned long off, minoff = 0;
5793
5794 /* Try to match against the Symbol with the smallest offset */
5795 for (i=0; symsTable[i].pointer; i++) {
5796 unsigned long lp = (unsigned long) pointer;
5797
5798 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5799 off=lp-symsTable[i].pointer;
5800 if (ret < 0 || off < minoff) {
5801 minoff=off;
5802 ret=i;
5803 }
5804 }
5805 }
5806 if (ret == -1) return NULL;
5807 *offset = minoff;
5808 return symsTable[ret].name;
5809 }
5810 #else /* HAVE_BACKTRACE */
5811 static void setupSigSegvAction(void) {
5812 }
5813 #endif /* HAVE_BACKTRACE */
5814
5815
5816
5817 /* The End */
5818
5819
5820