]> git.saurik.com Git - redis.git/blob - redis.c
LPUSHPOP first implementation
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #if defined(__sun)
63 #include "solarisfixes.h"
64 #endif
65
66 #include "redis.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout, in seconds (5 minutes) */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync (presumably seconds) */
/* Max number of random volatile keys sampled per DB in one serverCron()
 * expire pass. The pass is repeated while more than a quarter of this
 * amount turned out to be expired. */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* Hash table parameters */
/* htNeedsResize() requests a shrink when fewer than this percentage of the
 * table slots are in use. */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags: bitmask stored in redisCommand.flags. */
/* Bulk command: presumably the last argument is sent as a length-prefixed
 * bulk payload (parsing is not in this chunk). Read-only commands such as
 * SISMEMBER carry it too, so it does not imply a write. */
#define REDIS_CMD_BULK 1
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4
105
/* Object types (robj.type) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding (robj.encoding) */
#define REDIS_ENCODING_RAW 0 /* Raw representation: ptr is an sds string */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
/* Sentinel length value; presumably returned by the loader on a read
 * error (TODO confirm against the rdb load code). */
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
148
/* Client flags (redisClient.flags bitmask) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() emits a message only when its level is
 * >= server.verbosity, and marks it with '.', '-' or '*' respectively. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: evaluates V and discards the result. The argument is
 * parenthesized so compound expressions (e.g. `x + 1`) are cast as a whole. */
#define REDIS_NOTUSED(V) ((void)(V))

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines (fsync policy for the append only file) */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2
194
/*================================= Data types ============================== */

/* A redis object: a reference-counted, typed container for a value
 * (string / list / set / sorted set / hash, see the REDIS_STRING ...
 * REDIS_HASH defines). */
typedef struct redisObject {
    void *ptr;                /* the value; an sds string when encoding is RAW */
    unsigned char type;       /* one of the REDIS_STRING ... REDIS_HASH types */
    unsigned char encoding;   /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2]; /* unused; presumably explicit padding -- TODO confirm */
    int refcount;             /* references are dropped with decrRefCount() */
} robj;
205
/* One logical database: its keyspace plus the expire times of volatile keys. */
typedef struct redisDb {
    dict *dict;    /* keyspace: key -> value */
    dict *expires; /* volatile keys: key -> expire time (time_t stored as the entry value) */
    int id;        /* database number; presumably its index in server.db -- TODO confirm */
} redisDb;
211
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
234
/* One automatic-save rule. When no background save is running, serverCron()
 * starts one if server.dirty >= changes and more than `seconds` have elapsed
 * since server.lastsave. */
struct saveparam {
    time_t seconds;
    int changes;
};
239
/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db; /* array of dbnum databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save (reset on successful BGSAVE) */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function ran */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of last successful save */
    size_t usedmemory; /* Used memory in bytes, from zmalloc_used_memory() */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* redisLog() drops messages with a lower level */
    int glueoutputbuf;
    int maxidletime; /* client idle timeout in seconds; 0 disables it */
    int dbnum; /* number of entries in the db array */
    int daemonize;
    int appendonly;
    int appendfsync; /* presumably one of APPENDFSYNC_* -- TODO confirm */
    time_t lastfsync;
    int appendfd;
    int appendseldb;
    char *pidfile;
    int bgsaveinprogress;
    pid_t bgsavechildpid; /* -1 when no background save is running */
    struct saveparam *saveparams; /* array of saveparamslen save rules */
    int saveparamslen;
    char *logfile; /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
297
/* Signature of every command implementation. */
typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (cmdTable). */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    /* Argument count including the command name (e.g. "get" is 2).
     * Negative values are used by variadic commands such as "del" (-2),
     * presumably meaning "at least -arity"; the check lives in
     * processCommand(), outside this chunk. */
    int arity;
    int flags; /* bitmask of REDIS_CMD_* */
};
305
/* A (function name, address) pair. No users are visible in this chunk;
 * presumably a symbol table for symbolic crash backtraces -- TODO confirm. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
310
/* An element being sorted by SORT together with its sort key: either a
 * numeric score or an object to compare. Presumably which member is valid
 * depends on server.sort_alpha -- TODO confirm against sortCompare(). */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;
318
/* An operation requested on a SORT call. type is presumably one of the
 * REDIS_SORT_* defines (e.g. REDIS_SORT_GET), with its key pattern. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
323
/* ZSETs use a specialized version of Skiplists */

/* Skiplist node: one element (obj) ordered by score. */
typedef struct zskiplistNode {
    struct zskiplistNode **forward; /* per-level next pointers */
    struct zskiplistNode *backward; /* previous node, for reverse traversal */
    double score;
    robj *obj;
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length; /* number of elements */
    int level;            /* presumably the current max level in use, <= ZSKIPLIST_MAXLEVEL */
} zskiplist;

/* A sorted set: a dict for member lookups (zsetDictType: values are
 * released with zfree()) plus a skiplist ordered by score. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
343
/* Our shared "common" objects: preallocated objects for frequent replies
 * and protocol fragments. Presumably created once at startup and reused
 * via addReply() -- TODO confirm. select0..select9 presumably hold
 * prebuilt SELECT commands for the first ten DBs. */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
354
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
360
/*================================ Prototypes =============================== */

/* Forward declarations of file-local helpers defined later in this file. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(const robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);

/* Command implementations, referenced by cmdTable below. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void lpoppushCommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
469
/*================================= Globals ================================= */

/* Global vars */
static struct redisServer server; /* server global state */
/* Command table: {name, implementation, arity, REDIS_CMD_* flags}.
 * Terminated by an all-NULL/zero sentinel entry. Note that "smembers" reuses
 * sinterCommand: the intersection of a single set is the set itself.
 * NOTE(review): "zscore" is a read-only lookup yet carries REDIS_CMD_DENYOOM,
 * so it will be refused when maxmemory is exceeded -- confirm this is intended. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"lpoppush",lpoppushCommand,3,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zrem",zremCommand,3,REDIS_CMD_BULK},
    {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
    {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
    {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
549
550 /*============================ Utility functions ============================ */
551
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Supported syntax:
 *   *       any sequence of bytes (including none)
 *   ?       exactly one byte
 *   [abc]   one byte from the set; [^abc] negates it; [a-z] is a range
 *   \x      the literal byte x (also usable inside [...])
 * When 'nocase' is non-zero, bytes are compared case-insensitively.
 *
 * Both buffers are length-delimited: no byte beyond the given lengths is
 * read, so neither needs to be NUL-terminated. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of '*' without reading past the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes exactly one byte of the string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so the pattern++ at the
                     * bottom of the outer loop lands exactly at the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        /* <ctype.h> needs values representable as unsigned char. */
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal byte needs a byte of string to match against. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing '*'s can still match. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
674
675 static void redisLog(int level, const char *fmt, ...) {
676 va_list ap;
677 FILE *fp;
678
679 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
680 if (!fp) return;
681
682 va_start(ap, fmt);
683 if (level >= server.verbosity) {
684 char *c = ".-*";
685 char buf[64];
686 time_t now;
687
688 now = time(NULL);
689 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
690 fprintf(fp,"%s %c ",buf,c[level]);
691 vfprintf(fp, fmt, ap);
692 fprintf(fp,"\n");
693 fflush(fp);
694 }
695 va_end(ap);
696
697 if (server.logfile) fclose(fp);
698 }
699
700 /*====================== Hash table type implementation ==================== */
701
/* These are hash table types whose keys are Redis objects wrapping SDS
 * dynamic strings, and whose values are Redis objects (objects can hold
 * SDS strings, lists, sets). */
705
/* Dict value destructor: release a plain zmalloc()-ed buffer. */
static void dictVanillaFree(void *privdata, void *val) {
    DICT_NOTUSED(privdata); /* required by the dictType callback signature */
    zfree(val);
}
711
712 static int sdsDictKeyCompare(void *privdata, const void *key1,
713 const void *key2)
714 {
715 int l1,l2;
716 DICT_NOTUSED(privdata);
717
718 l1 = sdslen((sds)key1);
719 l2 = sdslen((sds)key2);
720 if (l1 != l2) return 0;
721 return memcmp(key1, key2, l1) == 0;
722 }
723
/* Dict key/value destructor for Redis objects: drop one reference. */
static void dictRedisObjectDestructor(void *privdata, void *val) {
    DICT_NOTUSED(privdata); /* required by the dictType callback signature */
    decrRefCount(val);
}
730
731 static int dictObjKeyCompare(void *privdata, const void *key1,
732 const void *key2)
733 {
734 const robj *o1 = key1, *o2 = key2;
735 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
736 }
737
738 static unsigned int dictObjHash(const void *key) {
739 const robj *o = key;
740 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
741 }
742
743 static int dictEncObjKeyCompare(void *privdata, const void *key1,
744 const void *key2)
745 {
746 const robj *o1 = key1, *o2 = key2;
747
748 if (o1->encoding == REDIS_ENCODING_RAW &&
749 o2->encoding == REDIS_ENCODING_RAW)
750 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
751 else {
752 robj *dec1, *dec2;
753 int cmp;
754
755 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
756 getDecodedObject(o1) : (robj*)o1;
757 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
758 getDecodedObject(o2) : (robj*)o2;
759 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
760 if (dec1 != o1) decrRefCount(dec1);
761 if (dec2 != o2) decrRefCount(dec2);
762 return cmp;
763 }
764 }
765
766 static unsigned int dictEncObjHash(const void *key) {
767 const robj *o = key;
768
769 if (o->encoding == REDIS_ENCODING_RAW)
770 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
771 else {
772 robj *dec = getDecodedObject(o);
773 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
774 decrRefCount(dec);
775 return hash;
776 }
777 }
778
/* Set type: members are Redis objects (possibly integer-encoded, hence the
 * encoding-aware hash/compare); values are unused, so there is no value
 * destructor. */
static dictType setDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};

/* Sorted set member dictionary: encoding-aware Redis-object keys; values are
 * plain zmalloc()-ed buffers released with zfree() (presumably the member's
 * double score -- TODO confirm against zaddCommand). */
static dictType zsetDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree /* val destructor */
};

/* Both keys and values are Redis objects and are released with
 * decrRefCount(). The hash/compare functions are not encoding-aware, so keys
 * are expected to be sds-backed (RAW) objects. */
static dictType hashDictType = {
    dictObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor /* val destructor */
};
805
/* ========================= Random utility functions ======================= */

/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear whether it would even
 * be possible to report the condition to the client, since the networking
 * layer itself relies on heap allocation for send buffers. So we simply
 * abort. At least the code will be simpler to read... */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1); /* short pause before dying, presumably so the message is seen */
    abort();
}
819
820 /* ====================== Redis server networking stuff ===================== */
821 static void closeTimedoutClients(void) {
822 redisClient *c;
823 listNode *ln;
824 time_t now = time(NULL);
825
826 listRewind(server.clients);
827 while ((ln = listYield(server.clients)) != NULL) {
828 c = listNodeValue(ln);
829 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
830 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
831 (now - c->lastinteraction > server.maxidletime)) {
832 redisLog(REDIS_DEBUG,"Closing idle client");
833 freeClient(c);
834 }
835 }
836 }
837
838 static int htNeedsResize(dict *dict) {
839 long long size, used;
840
841 size = dictSlots(dict);
842 used = dictSize(dict);
843 return (size && used && size > DICT_HT_INITIAL_SIZE &&
844 (used*100/size < REDIS_HT_MINFILL));
845 }
846
847 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
848 * we resize the hash table to save memory */
849 static void tryResizeHashTables(void) {
850 int j;
851
852 for (j = 0; j < server.dbnum; j++) {
853 if (htNeedsResize(server.db[j].dict)) {
854 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
855 dictResize(server.db[j].dict);
856 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
857 }
858 if (htNeedsResize(server.db[j].expires))
859 dictResize(server.db[j].expires);
860 }
861 }
862
863 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
864 int j, loops = server.cronloops++;
865 REDIS_NOTUSED(eventLoop);
866 REDIS_NOTUSED(id);
867 REDIS_NOTUSED(clientData);
868
869 /* Update the global state with the amount of used memory */
870 server.usedmemory = zmalloc_used_memory();
871
872 /* Show some info about non-empty databases */
873 for (j = 0; j < server.dbnum; j++) {
874 long long size, used, vkeys;
875
876 size = dictSlots(server.db[j].dict);
877 used = dictSize(server.db[j].dict);
878 vkeys = dictSize(server.db[j].expires);
879 if (!(loops % 5) && (used || vkeys)) {
880 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
881 /* dictPrintStats(server.dict); */
882 }
883 }
884
885 /* We don't want to resize the hash tables while a bacground saving
886 * is in progress: the saving child is created using fork() that is
887 * implemented with a copy-on-write semantic in most modern systems, so
888 * if we resize the HT while there is the saving child at work actually
889 * a lot of memory movements in the parent will cause a lot of pages
890 * copied. */
891 if (!server.bgsaveinprogress) tryResizeHashTables();
892
893 /* Show information about connected clients */
894 if (!(loops % 5)) {
895 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
896 listLength(server.clients)-listLength(server.slaves),
897 listLength(server.slaves),
898 server.usedmemory,
899 dictSize(server.sharingpool));
900 }
901
902 /* Close connections of timedout clients */
903 if (server.maxidletime && !(loops % 10))
904 closeTimedoutClients();
905
906 /* Check if a background saving in progress terminated */
907 if (server.bgsaveinprogress) {
908 int statloc;
909 if (wait4(-1,&statloc,WNOHANG,NULL)) {
910 int exitcode = WEXITSTATUS(statloc);
911 int bysignal = WIFSIGNALED(statloc);
912
913 if (!bysignal && exitcode == 0) {
914 redisLog(REDIS_NOTICE,
915 "Background saving terminated with success");
916 server.dirty = 0;
917 server.lastsave = time(NULL);
918 } else if (!bysignal && exitcode != 0) {
919 redisLog(REDIS_WARNING, "Background saving error");
920 } else {
921 redisLog(REDIS_WARNING,
922 "Background saving terminated by signal");
923 rdbRemoveTempFile(server.bgsavechildpid);
924 }
925 server.bgsaveinprogress = 0;
926 server.bgsavechildpid = -1;
927 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
928 }
929 } else {
930 /* If there is not a background saving in progress check if
931 * we have to save now */
932 time_t now = time(NULL);
933 for (j = 0; j < server.saveparamslen; j++) {
934 struct saveparam *sp = server.saveparams+j;
935
936 if (server.dirty >= sp->changes &&
937 now-server.lastsave > sp->seconds) {
938 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
939 sp->changes, sp->seconds);
940 rdbSaveBackground(server.dbfilename);
941 break;
942 }
943 }
944 }
945
946 /* Try to expire a few timed out keys. The algorithm used is adaptive and
947 * will use few CPU cycles if there are few expiring keys, otherwise
948 * it will get more aggressive to avoid that too much memory is used by
949 * keys that can be removed from the keyspace. */
950 for (j = 0; j < server.dbnum; j++) {
951 int expired;
952 redisDb *db = server.db+j;
953
954 /* Continue to expire if at the end of the cycle more than 25%
955 * of the keys were expired. */
956 do {
957 int num = dictSize(db->expires);
958 time_t now = time(NULL);
959
960 expired = 0;
961 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
962 num = REDIS_EXPIRELOOKUPS_PER_CRON;
963 while (num--) {
964 dictEntry *de;
965 time_t t;
966
967 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
968 t = (time_t) dictGetEntryVal(de);
969 if (now > t) {
970 deleteKey(db,dictGetEntryKey(de));
971 expired++;
972 }
973 }
974 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
975 }
976
977 /* Check if we should connect to a MASTER */
978 if (server.replstate == REDIS_REPL_CONNECT) {
979 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
980 if (syncWithMaster() == REDIS_OK) {
981 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
982 }
983 }
984 return 1000;
985 }
986
987 static void createSharedObjects(void) {
988 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
989 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
990 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
991 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
992 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
993 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
994 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
995 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
996 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
997 /* no such key */
998 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
999 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1000 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1001 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1002 "-ERR no such key\r\n"));
1003 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1004 "-ERR syntax error\r\n"));
1005 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1006 "-ERR source and destination objects are the same\r\n"));
1007 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1008 "-ERR index out of range\r\n"));
1009 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1010 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1011 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1012 shared.select0 = createStringObject("select 0\r\n",10);
1013 shared.select1 = createStringObject("select 1\r\n",10);
1014 shared.select2 = createStringObject("select 2\r\n",10);
1015 shared.select3 = createStringObject("select 3\r\n",10);
1016 shared.select4 = createStringObject("select 4\r\n",10);
1017 shared.select5 = createStringObject("select 5\r\n",10);
1018 shared.select6 = createStringObject("select 6\r\n",10);
1019 shared.select7 = createStringObject("select 7\r\n",10);
1020 shared.select8 = createStringObject("select 8\r\n",10);
1021 shared.select9 = createStringObject("select 9\r\n",10);
1022 }
1023
1024 static void appendServerSaveParams(time_t seconds, int changes) {
1025 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1026 server.saveparams[server.saveparamslen].seconds = seconds;
1027 server.saveparams[server.saveparamslen].changes = changes;
1028 server.saveparamslen++;
1029 }
1030
1031 static void resetServerSaveParams() {
1032 zfree(server.saveparams);
1033 server.saveparams = NULL;
1034 server.saveparamslen = 0;
1035 }
1036
1037 static void initServerConfig() {
1038 server.dbnum = REDIS_DEFAULT_DBNUM;
1039 server.port = REDIS_SERVERPORT;
1040 server.verbosity = REDIS_DEBUG;
1041 server.maxidletime = REDIS_MAXIDLETIME;
1042 server.saveparams = NULL;
1043 server.logfile = NULL; /* NULL = log on standard output */
1044 server.bindaddr = NULL;
1045 server.glueoutputbuf = 1;
1046 server.daemonize = 0;
1047 server.appendonly = 0;
1048 server.appendfsync = APPENDFSYNC_ALWAYS;
1049 server.lastfsync = time(NULL);
1050 server.appendfd = -1;
1051 server.appendseldb = -1; /* Make sure the first time will not match */
1052 server.pidfile = "/var/run/redis.pid";
1053 server.dbfilename = "dump.rdb";
1054 server.appendfilename = "appendonly.log";
1055 server.requirepass = NULL;
1056 server.shareobjects = 0;
1057 server.sharingpoolsize = 1024;
1058 server.maxclients = 0;
1059 server.maxmemory = 0;
1060 resetServerSaveParams();
1061
1062 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1063 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1064 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1065 /* Replication related */
1066 server.isslave = 0;
1067 server.masterauth = NULL;
1068 server.masterhost = NULL;
1069 server.masterport = 6379;
1070 server.master = NULL;
1071 server.replstate = REDIS_REPL_NONE;
1072
1073 /* Double constants initialization */
1074 R_Zero = 0.0;
1075 R_PosInf = 1.0/R_Zero;
1076 R_NegInf = -1.0/R_Zero;
1077 R_Nan = R_Zero/R_Zero;
1078 }
1079
1080 static void initServer() {
1081 int j;
1082
1083 signal(SIGHUP, SIG_IGN);
1084 signal(SIGPIPE, SIG_IGN);
1085 setupSigSegvAction();
1086
1087 server.clients = listCreate();
1088 server.slaves = listCreate();
1089 server.monitors = listCreate();
1090 server.objfreelist = listCreate();
1091 createSharedObjects();
1092 server.el = aeCreateEventLoop();
1093 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1094 server.sharingpool = dictCreate(&setDictType,NULL);
1095 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1096 if (server.fd == -1) {
1097 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1098 exit(1);
1099 }
1100 for (j = 0; j < server.dbnum; j++) {
1101 server.db[j].dict = dictCreate(&hashDictType,NULL);
1102 server.db[j].expires = dictCreate(&setDictType,NULL);
1103 server.db[j].id = j;
1104 }
1105 server.cronloops = 0;
1106 server.bgsaveinprogress = 0;
1107 server.bgsavechildpid = -1;
1108 server.lastsave = time(NULL);
1109 server.dirty = 0;
1110 server.usedmemory = 0;
1111 server.stat_numcommands = 0;
1112 server.stat_numconnections = 0;
1113 server.stat_starttime = time(NULL);
1114 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1115
1116 if (server.appendonly) {
1117 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1118 if (server.appendfd == -1) {
1119 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1120 strerror(errno));
1121 exit(1);
1122 }
1123 }
1124 }
1125
1126 /* Empty the whole database */
1127 static long long emptyDb() {
1128 int j;
1129 long long removed = 0;
1130
1131 for (j = 0; j < server.dbnum; j++) {
1132 removed += dictSize(server.db[j].dict);
1133 dictEmpty(server.db[j].dict);
1134 dictEmpty(server.db[j].expires);
1135 }
1136 return removed;
1137 }
1138
/* Parse a boolean config argument, ignoring case.
 * Returns 1 for "yes", 0 for "no", and -1 for anything else. */
static int yesnotoi(char *s) {
    int answer = -1;

    if (strcasecmp(s,"yes") == 0)
        answer = 1;
    else if (strcasecmp(s,"no") == 0)
        answer = 0;
    return answer;
}
1144
1145 /* I agree, this is a very rudimental way to load a configuration...
1146 will improve later if the config gets more complex */
1147 static void loadServerConfig(char *filename) {
1148 FILE *fp;
1149 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1150 int linenum = 0;
1151 sds line = NULL;
1152
1153 if (filename[0] == '-' && filename[1] == '\0')
1154 fp = stdin;
1155 else {
1156 if ((fp = fopen(filename,"r")) == NULL) {
1157 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1158 exit(1);
1159 }
1160 }
1161
1162 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1163 sds *argv;
1164 int argc, j;
1165
1166 linenum++;
1167 line = sdsnew(buf);
1168 line = sdstrim(line," \t\r\n");
1169
1170 /* Skip comments and blank lines*/
1171 if (line[0] == '#' || line[0] == '\0') {
1172 sdsfree(line);
1173 continue;
1174 }
1175
1176 /* Split into arguments */
1177 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1178 sdstolower(argv[0]);
1179
1180 /* Execute config directives */
1181 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1182 server.maxidletime = atoi(argv[1]);
1183 if (server.maxidletime < 0) {
1184 err = "Invalid timeout value"; goto loaderr;
1185 }
1186 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1187 server.port = atoi(argv[1]);
1188 if (server.port < 1 || server.port > 65535) {
1189 err = "Invalid port"; goto loaderr;
1190 }
1191 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1192 server.bindaddr = zstrdup(argv[1]);
1193 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1194 int seconds = atoi(argv[1]);
1195 int changes = atoi(argv[2]);
1196 if (seconds < 1 || changes < 0) {
1197 err = "Invalid save parameters"; goto loaderr;
1198 }
1199 appendServerSaveParams(seconds,changes);
1200 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1201 if (chdir(argv[1]) == -1) {
1202 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1203 argv[1], strerror(errno));
1204 exit(1);
1205 }
1206 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1207 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1208 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1209 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1210 else {
1211 err = "Invalid log level. Must be one of debug, notice, warning";
1212 goto loaderr;
1213 }
1214 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1215 FILE *logfp;
1216
1217 server.logfile = zstrdup(argv[1]);
1218 if (!strcasecmp(server.logfile,"stdout")) {
1219 zfree(server.logfile);
1220 server.logfile = NULL;
1221 }
1222 if (server.logfile) {
1223 /* Test if we are able to open the file. The server will not
1224 * be able to abort just for this problem later... */
1225 logfp = fopen(server.logfile,"a");
1226 if (logfp == NULL) {
1227 err = sdscatprintf(sdsempty(),
1228 "Can't open the log file: %s", strerror(errno));
1229 goto loaderr;
1230 }
1231 fclose(logfp);
1232 }
1233 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1234 server.dbnum = atoi(argv[1]);
1235 if (server.dbnum < 1) {
1236 err = "Invalid number of databases"; goto loaderr;
1237 }
1238 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1239 server.maxclients = atoi(argv[1]);
1240 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1241 server.maxmemory = strtoll(argv[1], NULL, 10);
1242 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1243 server.masterhost = sdsnew(argv[1]);
1244 server.masterport = atoi(argv[2]);
1245 server.replstate = REDIS_REPL_CONNECT;
1246 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1247 server.masterauth = zstrdup(argv[1]);
1248 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1249 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1250 err = "argument must be 'yes' or 'no'"; goto loaderr;
1251 }
1252 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1253 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1254 err = "argument must be 'yes' or 'no'"; goto loaderr;
1255 }
1256 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1257 server.sharingpoolsize = atoi(argv[1]);
1258 if (server.sharingpoolsize < 1) {
1259 err = "invalid object sharing pool size"; goto loaderr;
1260 }
1261 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1262 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1263 err = "argument must be 'yes' or 'no'"; goto loaderr;
1264 }
1265 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1266 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1267 err = "argument must be 'yes' or 'no'"; goto loaderr;
1268 }
1269 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1270 if (!strcasecmp(argv[1],"no")) {
1271 server.appendfsync = APPENDFSYNC_NO;
1272 } else if (!strcasecmp(argv[1],"always")) {
1273 server.appendfsync = APPENDFSYNC_ALWAYS;
1274 } else if (!strcasecmp(argv[1],"everysec")) {
1275 server.appendfsync = APPENDFSYNC_EVERYSEC;
1276 } else {
1277 err = "argument must be 'no', 'always' or 'everysec'";
1278 goto loaderr;
1279 }
1280 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1281 server.requirepass = zstrdup(argv[1]);
1282 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1283 server.pidfile = zstrdup(argv[1]);
1284 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1285 server.dbfilename = zstrdup(argv[1]);
1286 } else {
1287 err = "Bad directive or wrong number of arguments"; goto loaderr;
1288 }
1289 for (j = 0; j < argc; j++)
1290 sdsfree(argv[j]);
1291 zfree(argv);
1292 sdsfree(line);
1293 }
1294 if (fp != stdin) fclose(fp);
1295 return;
1296
1297 loaderr:
1298 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1299 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1300 fprintf(stderr, ">>> '%s'\n", line);
1301 fprintf(stderr, "%s\n", err);
1302 exit(1);
1303 }
1304
1305 static void freeClientArgv(redisClient *c) {
1306 int j;
1307
1308 for (j = 0; j < c->argc; j++)
1309 decrRefCount(c->argv[j]);
1310 for (j = 0; j < c->mbargc; j++)
1311 decrRefCount(c->mbargv[j]);
1312 c->argc = 0;
1313 c->mbargc = 0;
1314 }
1315
1316 static void freeClient(redisClient *c) {
1317 listNode *ln;
1318
1319 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1320 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1321 sdsfree(c->querybuf);
1322 listRelease(c->reply);
1323 freeClientArgv(c);
1324 close(c->fd);
1325 ln = listSearchKey(server.clients,c);
1326 assert(ln != NULL);
1327 listDelNode(server.clients,ln);
1328 if (c->flags & REDIS_SLAVE) {
1329 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1330 close(c->repldbfd);
1331 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1332 ln = listSearchKey(l,c);
1333 assert(ln != NULL);
1334 listDelNode(l,ln);
1335 }
1336 if (c->flags & REDIS_MASTER) {
1337 server.master = NULL;
1338 server.replstate = REDIS_REPL_CONNECT;
1339 }
1340 zfree(c->argv);
1341 zfree(c->mbargv);
1342 zfree(c);
1343 }
1344
1345 static void glueReplyBuffersIfNeeded(redisClient *c) {
1346 int totlen = 0;
1347 listNode *ln;
1348 robj *o;
1349
1350 listRewind(c->reply);
1351 while((ln = listYield(c->reply))) {
1352 o = ln->value;
1353 totlen += sdslen(o->ptr);
1354 /* This optimization makes more sense if we don't have to copy
1355 * too much data */
1356 if (totlen > 1024) return;
1357 }
1358 if (totlen > 0) {
1359 char buf[1024];
1360 int copylen = 0;
1361
1362 listRewind(c->reply);
1363 while((ln = listYield(c->reply))) {
1364 o = ln->value;
1365 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1366 copylen += sdslen(o->ptr);
1367 listDelNode(c->reply,ln);
1368 }
1369 /* Now the output buffer is empty, add the new single element */
1370 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1371 listAddNodeTail(c->reply,o);
1372 }
1373 }
1374
1375 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1376 redisClient *c = privdata;
1377 int nwritten = 0, totwritten = 0, objlen;
1378 robj *o;
1379 REDIS_NOTUSED(el);
1380 REDIS_NOTUSED(mask);
1381
1382 if (server.glueoutputbuf && listLength(c->reply) > 1)
1383 glueReplyBuffersIfNeeded(c);
1384 while(listLength(c->reply)) {
1385 o = listNodeValue(listFirst(c->reply));
1386 objlen = sdslen(o->ptr);
1387
1388 if (objlen == 0) {
1389 listDelNode(c->reply,listFirst(c->reply));
1390 continue;
1391 }
1392
1393 if (c->flags & REDIS_MASTER) {
1394 /* Don't reply to a master */
1395 nwritten = objlen - c->sentlen;
1396 } else {
1397 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1398 if (nwritten <= 0) break;
1399 }
1400 c->sentlen += nwritten;
1401 totwritten += nwritten;
1402 /* If we fully sent the object on head go to the next one */
1403 if (c->sentlen == objlen) {
1404 listDelNode(c->reply,listFirst(c->reply));
1405 c->sentlen = 0;
1406 }
1407 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1408 * bytes, in a single threaded server it's a good idea to serve
1409 * other clients as well, even if a very large request comes from
1410 * super fast link that is always able to accept data (in real world
1411 * scenario think about 'KEYS *' against the loopback interfae) */
1412 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1413 }
1414 if (nwritten == -1) {
1415 if (errno == EAGAIN) {
1416 nwritten = 0;
1417 } else {
1418 redisLog(REDIS_DEBUG,
1419 "Error writing to client: %s", strerror(errno));
1420 freeClient(c);
1421 return;
1422 }
1423 }
1424 if (totwritten > 0) c->lastinteraction = time(NULL);
1425 if (listLength(c->reply) == 0) {
1426 c->sentlen = 0;
1427 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1428 }
1429 }
1430
1431 static struct redisCommand *lookupCommand(char *name) {
1432 int j = 0;
1433 while(cmdTable[j].name != NULL) {
1434 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1435 j++;
1436 }
1437 return NULL;
1438 }
1439
1440 /* resetClient prepare the client to process the next command */
1441 static void resetClient(redisClient *c) {
1442 freeClientArgv(c);
1443 c->bulklen = -1;
1444 c->multibulk = 0;
1445 }
1446
1447 /* If this function gets called we already read a whole
1448 * command, argments are in the client argv/argc fields.
1449 * processCommand() execute the command or prepare the
1450 * server for a bulk read from the client.
1451 *
1452 * If 1 is returned the client is still alive and valid and
1453 * and other operations can be performed by the caller. Otherwise
1454 * if 0 is returned the client was destroied (i.e. after QUIT). */
1455 static int processCommand(redisClient *c) {
1456 struct redisCommand *cmd;
1457 long long dirty;
1458
1459 /* Free some memory if needed (maxmemory setting) */
1460 if (server.maxmemory) freeMemoryIfNeeded();
1461
1462 /* Handle the multi bulk command type. This is an alternative protocol
1463 * supported by Redis in order to receive commands that are composed of
1464 * multiple binary-safe "bulk" arguments. The latency of processing is
1465 * a bit higher but this allows things like multi-sets, so if this
1466 * protocol is used only for MSET and similar commands this is a big win. */
1467 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1468 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1469 if (c->multibulk <= 0) {
1470 resetClient(c);
1471 return 1;
1472 } else {
1473 decrRefCount(c->argv[c->argc-1]);
1474 c->argc--;
1475 return 1;
1476 }
1477 } else if (c->multibulk) {
1478 if (c->bulklen == -1) {
1479 if (((char*)c->argv[0]->ptr)[0] != '$') {
1480 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1481 resetClient(c);
1482 return 1;
1483 } else {
1484 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1485 decrRefCount(c->argv[0]);
1486 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1487 c->argc--;
1488 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1489 resetClient(c);
1490 return 1;
1491 }
1492 c->argc--;
1493 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1494 return 1;
1495 }
1496 } else {
1497 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1498 c->mbargv[c->mbargc] = c->argv[0];
1499 c->mbargc++;
1500 c->argc--;
1501 c->multibulk--;
1502 if (c->multibulk == 0) {
1503 robj **auxargv;
1504 int auxargc;
1505
1506 /* Here we need to swap the multi-bulk argc/argv with the
1507 * normal argc/argv of the client structure. */
1508 auxargv = c->argv;
1509 c->argv = c->mbargv;
1510 c->mbargv = auxargv;
1511
1512 auxargc = c->argc;
1513 c->argc = c->mbargc;
1514 c->mbargc = auxargc;
1515
1516 /* We need to set bulklen to something different than -1
1517 * in order for the code below to process the command without
1518 * to try to read the last argument of a bulk command as
1519 * a special argument. */
1520 c->bulklen = 0;
1521 /* continue below and process the command */
1522 } else {
1523 c->bulklen = -1;
1524 return 1;
1525 }
1526 }
1527 }
1528 /* -- end of multi bulk commands processing -- */
1529
1530 /* The QUIT command is handled as a special case. Normal command
1531 * procs are unable to close the client connection safely */
1532 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1533 freeClient(c);
1534 return 0;
1535 }
1536 cmd = lookupCommand(c->argv[0]->ptr);
1537 if (!cmd) {
1538 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1539 resetClient(c);
1540 return 1;
1541 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1542 (c->argc < -cmd->arity)) {
1543 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1544 resetClient(c);
1545 return 1;
1546 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1547 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1548 resetClient(c);
1549 return 1;
1550 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1551 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1552
1553 decrRefCount(c->argv[c->argc-1]);
1554 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1555 c->argc--;
1556 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1557 resetClient(c);
1558 return 1;
1559 }
1560 c->argc--;
1561 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1562 /* It is possible that the bulk read is already in the
1563 * buffer. Check this condition and handle it accordingly.
1564 * This is just a fast path, alternative to call processInputBuffer().
1565 * It's a good idea since the code is small and this condition
1566 * happens most of the times. */
1567 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1568 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1569 c->argc++;
1570 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1571 } else {
1572 return 1;
1573 }
1574 }
1575 /* Let's try to share objects on the command arguments vector */
1576 if (server.shareobjects) {
1577 int j;
1578 for(j = 1; j < c->argc; j++)
1579 c->argv[j] = tryObjectSharing(c->argv[j]);
1580 }
1581 /* Let's try to encode the bulk object to save space. */
1582 if (cmd->flags & REDIS_CMD_BULK)
1583 tryObjectEncoding(c->argv[c->argc-1]);
1584
1585 /* Check if the user is authenticated */
1586 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1587 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1588 resetClient(c);
1589 return 1;
1590 }
1591
1592 /* Exec the command */
1593 dirty = server.dirty;
1594 cmd->proc(c);
1595 if (server.appendonly && server.dirty-dirty)
1596 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1597 if (server.dirty-dirty && listLength(server.slaves))
1598 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1599 if (listLength(server.monitors))
1600 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1601 server.stat_numcommands++;
1602
1603 /* Prepare the client for the next command */
1604 if (c->flags & REDIS_CLOSE) {
1605 freeClient(c);
1606 return 0;
1607 }
1608 resetClient(c);
1609 return 1;
1610 }
1611
1612 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1613 listNode *ln;
1614 int outc = 0, j;
1615 robj **outv;
1616 /* (args*2)+1 is enough room for args, spaces, newlines */
1617 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1618
1619 if (argc <= REDIS_STATIC_ARGS) {
1620 outv = static_outv;
1621 } else {
1622 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1623 }
1624
1625 for (j = 0; j < argc; j++) {
1626 if (j != 0) outv[outc++] = shared.space;
1627 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1628 robj *lenobj;
1629
1630 lenobj = createObject(REDIS_STRING,
1631 sdscatprintf(sdsempty(),"%d\r\n",
1632 stringObjectLen(argv[j])));
1633 lenobj->refcount = 0;
1634 outv[outc++] = lenobj;
1635 }
1636 outv[outc++] = argv[j];
1637 }
1638 outv[outc++] = shared.crlf;
1639
1640 /* Increment all the refcounts at start and decrement at end in order to
1641 * be sure to free objects if there is no slave in a replication state
1642 * able to be feed with commands */
1643 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1644 listRewind(slaves);
1645 while((ln = listYield(slaves))) {
1646 redisClient *slave = ln->value;
1647
1648 /* Don't feed slaves that are still waiting for BGSAVE to start */
1649 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1650
1651 /* Feed all the other slaves, MONITORs and so on */
1652 if (slave->slaveseldb != dictid) {
1653 robj *selectcmd;
1654
1655 switch(dictid) {
1656 case 0: selectcmd = shared.select0; break;
1657 case 1: selectcmd = shared.select1; break;
1658 case 2: selectcmd = shared.select2; break;
1659 case 3: selectcmd = shared.select3; break;
1660 case 4: selectcmd = shared.select4; break;
1661 case 5: selectcmd = shared.select5; break;
1662 case 6: selectcmd = shared.select6; break;
1663 case 7: selectcmd = shared.select7; break;
1664 case 8: selectcmd = shared.select8; break;
1665 case 9: selectcmd = shared.select9; break;
1666 default:
1667 selectcmd = createObject(REDIS_STRING,
1668 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1669 selectcmd->refcount = 0;
1670 break;
1671 }
1672 addReply(slave,selectcmd);
1673 slave->slaveseldb = dictid;
1674 }
1675 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1676 }
1677 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1678 if (outv != static_outv) zfree(outv);
1679 }
1680
1681 static void processInputBuffer(redisClient *c) {
1682 again:
1683 if (c->bulklen == -1) {
1684 /* Read the first line of the query */
1685 char *p = strchr(c->querybuf,'\n');
1686 size_t querylen;
1687
1688 if (p) {
1689 sds query, *argv;
1690 int argc, j;
1691
1692 query = c->querybuf;
1693 c->querybuf = sdsempty();
1694 querylen = 1+(p-(query));
1695 if (sdslen(query) > querylen) {
1696 /* leave data after the first line of the query in the buffer */
1697 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1698 }
1699 *p = '\0'; /* remove "\n" */
1700 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1701 sdsupdatelen(query);
1702
1703 /* Now we can split the query in arguments */
1704 if (sdslen(query) == 0) {
1705 /* Ignore empty query */
1706 sdsfree(query);
1707 return;
1708 }
1709 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1710 sdsfree(query);
1711
1712 if (c->argv) zfree(c->argv);
1713 c->argv = zmalloc(sizeof(robj*)*argc);
1714
1715 for (j = 0; j < argc; j++) {
1716 if (sdslen(argv[j])) {
1717 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1718 c->argc++;
1719 } else {
1720 sdsfree(argv[j]);
1721 }
1722 }
1723 zfree(argv);
1724 /* Execute the command. If the client is still valid
1725 * after processCommand() return and there is something
1726 * on the query buffer try to process the next command. */
1727 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1728 return;
1729 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1730 redisLog(REDIS_DEBUG, "Client protocol error");
1731 freeClient(c);
1732 return;
1733 }
1734 } else {
1735 /* Bulk read handling. Note that if we are at this point
1736 the client already sent a command terminated with a newline,
1737 we are reading the bulk data that is actually the last
1738 argument of the command. */
1739 int qbl = sdslen(c->querybuf);
1740
1741 if (c->bulklen <= qbl) {
1742 /* Copy everything but the final CRLF as final argument */
1743 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1744 c->argc++;
1745 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1746 /* Process the command. If the client is still valid after
1747 * the processing and there is more data in the buffer
1748 * try to parse it. */
1749 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1750 return;
1751 }
1752 }
1753 }
1754
1755 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1756 redisClient *c = (redisClient*) privdata;
1757 char buf[REDIS_IOBUF_LEN];
1758 int nread;
1759 REDIS_NOTUSED(el);
1760 REDIS_NOTUSED(mask);
1761
1762 nread = read(fd, buf, REDIS_IOBUF_LEN);
1763 if (nread == -1) {
1764 if (errno == EAGAIN) {
1765 nread = 0;
1766 } else {
1767 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1768 freeClient(c);
1769 return;
1770 }
1771 } else if (nread == 0) {
1772 redisLog(REDIS_DEBUG, "Client closed connection");
1773 freeClient(c);
1774 return;
1775 }
1776 if (nread) {
1777 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1778 c->lastinteraction = time(NULL);
1779 } else {
1780 return;
1781 }
1782 processInputBuffer(c);
1783 }
1784
1785 static int selectDb(redisClient *c, int id) {
1786 if (id < 0 || id >= server.dbnum)
1787 return REDIS_ERR;
1788 c->db = &server.db[id];
1789 return REDIS_OK;
1790 }
1791
1792 static void *dupClientReplyValue(void *o) {
1793 incrRefCount((robj*)o);
1794 return 0;
1795 }
1796
1797 static redisClient *createClient(int fd) {
1798 redisClient *c = zmalloc(sizeof(*c));
1799
1800 anetNonBlock(NULL,fd);
1801 anetTcpNoDelay(NULL,fd);
1802 if (!c) return NULL;
1803 selectDb(c,0);
1804 c->fd = fd;
1805 c->querybuf = sdsempty();
1806 c->argc = 0;
1807 c->argv = NULL;
1808 c->bulklen = -1;
1809 c->multibulk = 0;
1810 c->mbargc = 0;
1811 c->mbargv = NULL;
1812 c->sentlen = 0;
1813 c->flags = 0;
1814 c->lastinteraction = time(NULL);
1815 c->authenticated = 0;
1816 c->replstate = REDIS_REPL_NONE;
1817 c->reply = listCreate();
1818 listSetFreeMethod(c->reply,decrRefCount);
1819 listSetDupMethod(c->reply,dupClientReplyValue);
1820 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1821 readQueryFromClient, c, NULL) == AE_ERR) {
1822 freeClient(c);
1823 return NULL;
1824 }
1825 listAddNodeTail(server.clients,c);
1826 return c;
1827 }
1828
1829 static void addReply(redisClient *c, robj *obj) {
1830 if (listLength(c->reply) == 0 &&
1831 (c->replstate == REDIS_REPL_NONE ||
1832 c->replstate == REDIS_REPL_ONLINE) &&
1833 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1834 sendReplyToClient, c, NULL) == AE_ERR) return;
1835 if (obj->encoding != REDIS_ENCODING_RAW) {
1836 obj = getDecodedObject(obj);
1837 } else {
1838 incrRefCount(obj);
1839 }
1840 listAddNodeTail(c->reply,obj);
1841 }
1842
1843 static void addReplySds(redisClient *c, sds s) {
1844 robj *o = createObject(REDIS_STRING,s);
1845 addReply(c,o);
1846 decrRefCount(o);
1847 }
1848
1849 static void addReplyBulkLen(redisClient *c, robj *obj) {
1850 size_t len;
1851
1852 if (obj->encoding == REDIS_ENCODING_RAW) {
1853 len = sdslen(obj->ptr);
1854 } else {
1855 long n = (long)obj->ptr;
1856
1857 len = 1;
1858 if (n < 0) {
1859 len++;
1860 n = -n;
1861 }
1862 while((n = n/10) != 0) {
1863 len++;
1864 }
1865 }
1866 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1867 }
1868
1869 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1870 int cport, cfd;
1871 char cip[128];
1872 redisClient *c;
1873 REDIS_NOTUSED(el);
1874 REDIS_NOTUSED(mask);
1875 REDIS_NOTUSED(privdata);
1876
1877 cfd = anetAccept(server.neterr, fd, cip, &cport);
1878 if (cfd == AE_ERR) {
1879 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1880 return;
1881 }
1882 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1883 if ((c = createClient(cfd)) == NULL) {
1884 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1885 close(cfd); /* May be already closed, just ingore errors */
1886 return;
1887 }
1888 /* If maxclient directive is set and this is one client more... close the
1889 * connection. Note that we create the client instead to check before
1890 * for this condition, since now the socket is already set in nonblocking
1891 * mode and we can send an error for free using the Kernel I/O */
1892 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1893 char *err = "-ERR max number of clients reached\r\n";
1894
1895 /* That's a best effort error message, don't check write errors */
1896 (void) write(c->fd,err,strlen(err));
1897 freeClient(c);
1898 return;
1899 }
1900 server.stat_numconnections++;
1901 }
1902
1903 /* ======================= Redis objects implementation ===================== */
1904
1905 static robj *createObject(int type, void *ptr) {
1906 robj *o;
1907
1908 if (listLength(server.objfreelist)) {
1909 listNode *head = listFirst(server.objfreelist);
1910 o = listNodeValue(head);
1911 listDelNode(server.objfreelist,head);
1912 } else {
1913 o = zmalloc(sizeof(*o));
1914 }
1915 o->type = type;
1916 o->encoding = REDIS_ENCODING_RAW;
1917 o->ptr = ptr;
1918 o->refcount = 1;
1919 return o;
1920 }
1921
1922 static robj *createStringObject(char *ptr, size_t len) {
1923 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1924 }
1925
1926 static robj *createListObject(void) {
1927 list *l = listCreate();
1928
1929 listSetFreeMethod(l,decrRefCount);
1930 return createObject(REDIS_LIST,l);
1931 }
1932
1933 static robj *createSetObject(void) {
1934 dict *d = dictCreate(&setDictType,NULL);
1935 return createObject(REDIS_SET,d);
1936 }
1937
1938 static robj *createZsetObject(void) {
1939 zset *zs = zmalloc(sizeof(*zs));
1940
1941 zs->dict = dictCreate(&zsetDictType,NULL);
1942 zs->zsl = zslCreate();
1943 return createObject(REDIS_ZSET,zs);
1944 }
1945
1946 static void freeStringObject(robj *o) {
1947 if (o->encoding == REDIS_ENCODING_RAW) {
1948 sdsfree(o->ptr);
1949 }
1950 }
1951
1952 static void freeListObject(robj *o) {
1953 listRelease((list*) o->ptr);
1954 }
1955
1956 static void freeSetObject(robj *o) {
1957 dictRelease((dict*) o->ptr);
1958 }
1959
1960 static void freeZsetObject(robj *o) {
1961 zset *zs = o->ptr;
1962
1963 dictRelease(zs->dict);
1964 zslFree(zs->zsl);
1965 zfree(zs);
1966 }
1967
1968 static void freeHashObject(robj *o) {
1969 dictRelease((dict*) o->ptr);
1970 }
1971
1972 static void incrRefCount(robj *o) {
1973 o->refcount++;
1974 #ifdef DEBUG_REFCOUNT
1975 if (o->type == REDIS_STRING)
1976 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1977 #endif
1978 }
1979
1980 static void decrRefCount(void *obj) {
1981 robj *o = obj;
1982
1983 #ifdef DEBUG_REFCOUNT
1984 if (o->type == REDIS_STRING)
1985 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1986 #endif
1987 if (--(o->refcount) == 0) {
1988 switch(o->type) {
1989 case REDIS_STRING: freeStringObject(o); break;
1990 case REDIS_LIST: freeListObject(o); break;
1991 case REDIS_SET: freeSetObject(o); break;
1992 case REDIS_ZSET: freeZsetObject(o); break;
1993 case REDIS_HASH: freeHashObject(o); break;
1994 default: assert(0 != 0); break;
1995 }
1996 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1997 !listAddNodeHead(server.objfreelist,o))
1998 zfree(o);
1999 }
2000 }
2001
2002 static robj *lookupKey(redisDb *db, robj *key) {
2003 dictEntry *de = dictFind(db->dict,key);
2004 return de ? dictGetEntryVal(de) : NULL;
2005 }
2006
2007 static robj *lookupKeyRead(redisDb *db, robj *key) {
2008 expireIfNeeded(db,key);
2009 return lookupKey(db,key);
2010 }
2011
2012 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2013 deleteIfVolatile(db,key);
2014 return lookupKey(db,key);
2015 }
2016
2017 static int deleteKey(redisDb *db, robj *key) {
2018 int retval;
2019
2020 /* We need to protect key from destruction: after the first dictDelete()
2021 * it may happen that 'key' is no longer valid if we don't increment
2022 * it's count. This may happen when we get the object reference directly
2023 * from the hash table with dictRandomKey() or dict iterators */
2024 incrRefCount(key);
2025 if (dictSize(db->expires)) dictDelete(db->expires,key);
2026 retval = dictDelete(db->dict,key);
2027 decrRefCount(key);
2028
2029 return retval == DICT_OK;
2030 }
2031
/* Try to share an object against the shared objects pool.
 *
 * Consumes the caller's reference to 'o' and returns the object the caller
 * should use from now on:
 *  - on a pool hit, the pooled object (with an extra reference taken) is
 *    returned and 'o' is released;
 *  - otherwise 'o' itself is returned, possibly after being added to the
 *    pool.
 * A NULL 'o', or sharing disabled via server.shareobjects, returns 'o'
 * unchanged; this lets callers pass the result of a failed load straight
 * through. Only string objects may be passed in (asserted). */
static robj *tryObjectSharing(robj *o) {
    struct dictEntry *de;
    unsigned long c;

    if (o == NULL || server.shareobjects == 0) return o;

    assert(o->type == REDIS_STRING);
    de = dictFind(server.sharingpool,o);
    if (de) {
        robj *shared = dictGetEntryKey(de);

        /* Hit: the entry value is used as a plain counter, bump it */
        c = ((unsigned long) dictGetEntryVal(de))+1;
        dictGetEntryVal(de) = (void*) c;
        incrRefCount(shared);
        decrRefCount(o);
        return shared;
    } else {
        /* Miss. Here we are using a streaming algorithm: every time an
         * object is shared its counter is incremented, and every time there
         * is a miss the counter of a random pooled object is decremented.
         * If that counter reaches zero the object is removed from the pool
         * and the current object is put there instead. */
        if (dictSize(server.sharingpool) >=
            server.sharingpoolsize) {
            de = dictGetRandomKey(server.sharingpool);
            assert(de != NULL);
            c = ((unsigned long) dictGetEntryVal(de))-1;
            dictGetEntryVal(de) = (void*) c;
            if (c == 0) {
                dictDelete(server.sharingpool,de->key);
            }
        } else {
            c = 0; /* The pool is not full yet: always add this object */
        }
        if (c == 0) {
            int retval;

            retval = dictAdd(server.sharingpool,o,(void*)1);
            assert(retval == DICT_OK);
            /* Take a reference on behalf of the pool entry */
            incrRefCount(o);
        }
        return o;
    }
}
2076
2077 /* Check if the nul-terminated string 's' can be represented by a long
2078 * (that is, is a number that fits into long without any other space or
2079 * character before or after the digits).
2080 *
2081 * If so, the function returns REDIS_OK and *longval is set to the value
2082 * of the number. Otherwise REDIS_ERR is returned */
2083 static int isStringRepresentableAsLong(sds s, long *longval) {
2084 char buf[32], *endptr;
2085 long value;
2086 int slen;
2087
2088 value = strtol(s, &endptr, 10);
2089 if (endptr[0] != '\0') return REDIS_ERR;
2090 slen = snprintf(buf,32,"%ld",value);
2091
2092 /* If the number converted back into a string is not identical
2093 * then it's not possible to encode the string as integer */
2094 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2095 if (longval) *longval = value;
2096 return REDIS_OK;
2097 }
2098
2099 /* Try to encode a string object in order to save space */
2100 static int tryObjectEncoding(robj *o) {
2101 long value;
2102 sds s = o->ptr;
2103
2104 if (o->encoding != REDIS_ENCODING_RAW)
2105 return REDIS_ERR; /* Already encoded */
2106
2107 /* It's not save to encode shared objects: shared objects can be shared
2108 * everywhere in the "object space" of Redis. Encoded objects can only
2109 * appear as "values" (and not, for instance, as keys) */
2110 if (o->refcount > 1) return REDIS_ERR;
2111
2112 /* Currently we try to encode only strings */
2113 assert(o->type == REDIS_STRING);
2114
2115 /* Check if we can represent this string as a long integer */
2116 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2117
2118 /* Ok, this object can be encoded */
2119 o->encoding = REDIS_ENCODING_INT;
2120 sdsfree(o->ptr);
2121 o->ptr = (void*) value;
2122 return REDIS_OK;
2123 }
2124
2125 /* Get a decoded version of an encoded object (returned as a new object) */
2126 static robj *getDecodedObject(const robj *o) {
2127 robj *dec;
2128
2129 assert(o->encoding != REDIS_ENCODING_RAW);
2130 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2131 char buf[32];
2132
2133 snprintf(buf,32,"%ld",(long)o->ptr);
2134 dec = createStringObject(buf,strlen(buf));
2135 return dec;
2136 } else {
2137 assert(1 != 1);
2138 }
2139 }
2140
2141 /* Compare two string objects via strcmp() or alike.
2142 * Note that the objects may be integer-encoded. In such a case we
2143 * use snprintf() to get a string representation of the numbers on the stack
2144 * and compare the strings, it's much faster than calling getDecodedObject(). */
2145 static int compareStringObjects(robj *a, robj *b) {
2146 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2147 char bufa[128], bufb[128], *astr, *bstr;
2148 int bothsds = 1;
2149
2150 if (a == b) return 0;
2151 if (a->encoding != REDIS_ENCODING_RAW) {
2152 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2153 astr = bufa;
2154 bothsds = 0;
2155 } else {
2156 astr = a->ptr;
2157 }
2158 if (b->encoding != REDIS_ENCODING_RAW) {
2159 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2160 bstr = bufb;
2161 bothsds = 0;
2162 } else {
2163 bstr = b->ptr;
2164 }
2165 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2166 }
2167
2168 static size_t stringObjectLen(robj *o) {
2169 assert(o->type == REDIS_STRING);
2170 if (o->encoding == REDIS_ENCODING_RAW) {
2171 return sdslen(o->ptr);
2172 } else {
2173 char buf[32];
2174
2175 return snprintf(buf,32,"%ld",(long)o->ptr);
2176 }
2177 }
2178
2179 /*============================ DB saving/loading ============================ */
2180
/* Write the one-byte opcode/type 'type'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2185
/* Write 't' as a 4-byte int32_t in host byte order. Values outside the
 * int32_t range go through the usual (implementation-defined) narrowing
 * conversion. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t narrowed = (int32_t) t;

    if (fwrite(&narrowed,4,1,fp) != 1) return -1;
    return 0;
}
2191
2192 /* check rdbLoadLen() comments for more info */
2193 static int rdbSaveLen(FILE *fp, uint32_t len) {
2194 unsigned char buf[2];
2195
2196 if (len < (1<<6)) {
2197 /* Save a 6 bit len */
2198 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2199 if (fwrite(buf,1,1,fp) == 0) return -1;
2200 } else if (len < (1<<14)) {
2201 /* Save a 14 bit len */
2202 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2203 buf[1] = len&0xFF;
2204 if (fwrite(buf,2,1,fp) == 0) return -1;
2205 } else {
2206 /* Save a 32 bit len */
2207 buf[0] = (REDIS_RDB_32BITLEN<<6);
2208 if (fwrite(buf,1,1,fp) == 0) return -1;
2209 len = htonl(len);
2210 if (fwrite(&len,4,1,fp) == 0) return -1;
2211 }
2212 return 0;
2213 }
2214
2215 /* String objects in the form "2391" "-100" without any space and with a
2216 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2217 * encoded as integers to save space */
2218 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2219 long long value;
2220 char *endptr, buf[32];
2221
2222 /* Check if it's possible to encode this value as a number */
2223 value = strtoll(s, &endptr, 10);
2224 if (endptr[0] != '\0') return 0;
2225 snprintf(buf,32,"%lld",value);
2226
2227 /* If the number converted back into a string is not identical
2228 * then it's not possible to encode the string as integer */
2229 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2230
2231 /* Finally check if it fits in our ranges */
2232 if (value >= -(1<<7) && value <= (1<<7)-1) {
2233 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2234 enc[1] = value&0xFF;
2235 return 2;
2236 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2237 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2238 enc[1] = value&0xFF;
2239 enc[2] = (value>>8)&0xFF;
2240 return 3;
2241 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2242 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2243 enc[1] = value&0xFF;
2244 enc[2] = (value>>8)&0xFF;
2245 enc[3] = (value>>16)&0xFF;
2246 enc[4] = (value>>24)&0xFF;
2247 return 5;
2248 } else {
2249 return 0;
2250 }
2251 }
2252
2253 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2254 unsigned int comprlen, outlen;
2255 unsigned char byte;
2256 void *out;
2257
2258 /* We require at least four bytes compression for this to be worth it */
2259 outlen = sdslen(obj->ptr)-4;
2260 if (outlen <= 0) return 0;
2261 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2262 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2263 if (comprlen == 0) {
2264 zfree(out);
2265 return 0;
2266 }
2267 /* Data compressed! Let's save it on disk */
2268 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2269 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2270 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2271 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2272 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2273 zfree(out);
2274 return comprlen;
2275
2276 writeerr:
2277 zfree(out);
2278 return -1;
2279 }
2280
2281 /* Save a string objet as [len][data] on disk. If the object is a string
2282 * representation of an integer value we try to safe it in a special form */
2283 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2284 size_t len;
2285 int enclen;
2286
2287 len = sdslen(obj->ptr);
2288
2289 /* Try integer encoding */
2290 if (len <= 11) {
2291 unsigned char buf[5];
2292 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2293 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2294 return 0;
2295 }
2296 }
2297
2298 /* Try LZF compression - under 20 bytes it's unable to compress even
2299 * aaaaaaaaaaaaaaaaaa so skip it */
2300 if (len > 20) {
2301 int retval;
2302
2303 retval = rdbSaveLzfStringObject(fp,obj);
2304 if (retval == -1) return -1;
2305 if (retval > 0) return 0;
2306 /* retval == 0 means data can't be compressed, save the old way */
2307 }
2308
2309 /* Store verbatim */
2310 if (rdbSaveLen(fp,len) == -1) return -1;
2311 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2312 return 0;
2313 }
2314
2315 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2316 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2317 int retval;
2318 robj *dec;
2319
2320 if (obj->encoding != REDIS_ENCODING_RAW) {
2321 dec = getDecodedObject(obj);
2322 retval = rdbSaveStringObjectRaw(fp,dec);
2323 decrRefCount(dec);
2324 return retval;
2325 } else {
2326 return rdbSaveStringObjectRaw(fp,obj);
2327 }
2328 }
2329
/* Save a double value. Doubles are saved as strings ("%.17g") prefixed by
 * an unsigned 8 bit integer holding the length of the text. Special values
 * of that byte encode non-finite values, with no text following:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        /* The text starts at buf+1 and must be measured from there:
         * buf[0] is still uninitialized, so strlen(buf) would read
         * garbage. */
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2356
2357 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2358 static int rdbSave(char *filename) {
2359 dictIterator *di = NULL;
2360 dictEntry *de;
2361 FILE *fp;
2362 char tmpfile[256];
2363 int j;
2364 time_t now = time(NULL);
2365
2366 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2367 fp = fopen(tmpfile,"w");
2368 if (!fp) {
2369 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2370 return REDIS_ERR;
2371 }
2372 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2373 for (j = 0; j < server.dbnum; j++) {
2374 redisDb *db = server.db+j;
2375 dict *d = db->dict;
2376 if (dictSize(d) == 0) continue;
2377 di = dictGetIterator(d);
2378 if (!di) {
2379 fclose(fp);
2380 return REDIS_ERR;
2381 }
2382
2383 /* Write the SELECT DB opcode */
2384 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2385 if (rdbSaveLen(fp,j) == -1) goto werr;
2386
2387 /* Iterate this DB writing every entry */
2388 while((de = dictNext(di)) != NULL) {
2389 robj *key = dictGetEntryKey(de);
2390 robj *o = dictGetEntryVal(de);
2391 time_t expiretime = getExpire(db,key);
2392
2393 /* Save the expire time */
2394 if (expiretime != -1) {
2395 /* If this key is already expired skip it */
2396 if (expiretime < now) continue;
2397 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2398 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2399 }
2400 /* Save the key and associated value */
2401 if (rdbSaveType(fp,o->type) == -1) goto werr;
2402 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2403 if (o->type == REDIS_STRING) {
2404 /* Save a string value */
2405 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2406 } else if (o->type == REDIS_LIST) {
2407 /* Save a list value */
2408 list *list = o->ptr;
2409 listNode *ln;
2410
2411 listRewind(list);
2412 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2413 while((ln = listYield(list))) {
2414 robj *eleobj = listNodeValue(ln);
2415
2416 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2417 }
2418 } else if (o->type == REDIS_SET) {
2419 /* Save a set value */
2420 dict *set = o->ptr;
2421 dictIterator *di = dictGetIterator(set);
2422 dictEntry *de;
2423
2424 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2425 while((de = dictNext(di)) != NULL) {
2426 robj *eleobj = dictGetEntryKey(de);
2427
2428 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2429 }
2430 dictReleaseIterator(di);
2431 } else if (o->type == REDIS_ZSET) {
2432 /* Save a set value */
2433 zset *zs = o->ptr;
2434 dictIterator *di = dictGetIterator(zs->dict);
2435 dictEntry *de;
2436
2437 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2438 while((de = dictNext(di)) != NULL) {
2439 robj *eleobj = dictGetEntryKey(de);
2440 double *score = dictGetEntryVal(de);
2441
2442 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2443 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2444 }
2445 dictReleaseIterator(di);
2446 } else {
2447 assert(0 != 0);
2448 }
2449 }
2450 dictReleaseIterator(di);
2451 }
2452 /* EOF opcode */
2453 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2454
2455 /* Make sure data will not remain on the OS's output buffers */
2456 fflush(fp);
2457 fsync(fileno(fp));
2458 fclose(fp);
2459
2460 /* Use RENAME to make sure the DB file is changed atomically only
2461 * if the generate DB file is ok. */
2462 if (rename(tmpfile,filename) == -1) {
2463 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2464 unlink(tmpfile);
2465 return REDIS_ERR;
2466 }
2467 redisLog(REDIS_NOTICE,"DB saved on disk");
2468 server.dirty = 0;
2469 server.lastsave = time(NULL);
2470 return REDIS_OK;
2471
2472 werr:
2473 fclose(fp);
2474 unlink(tmpfile);
2475 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2476 if (di) dictReleaseIterator(di);
2477 return REDIS_ERR;
2478 }
2479
2480 static int rdbSaveBackground(char *filename) {
2481 pid_t childpid;
2482
2483 if (server.bgsaveinprogress) return REDIS_ERR;
2484 if ((childpid = fork()) == 0) {
2485 /* Child */
2486 close(server.fd);
2487 if (rdbSave(filename) == REDIS_OK) {
2488 exit(0);
2489 } else {
2490 exit(1);
2491 }
2492 } else {
2493 /* Parent */
2494 if (childpid == -1) {
2495 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2496 strerror(errno));
2497 return REDIS_ERR;
2498 }
2499 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2500 server.bgsaveinprogress = 1;
2501 server.bgsavechildpid = childpid;
2502 return REDIS_OK;
2503 }
2504 return REDIS_OK; /* unreached */
2505 }
2506
/* Remove the "temp-<pid>.rdb" file (the name rdbSave() writes to) that the
 * process with pid 'childpid' may have left behind. Errors from unlink(),
 * e.g. a missing file, are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2513
/* Read a one-byte opcode/type. Returns it (0-255), or -1 on EOF/error. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 0) ? -1 : type;
}
2519
/* Read a 4-byte int32_t time stored in host byte order (see
 * rdbSaveTime()). Returns -1 on short read, which is indistinguishable
 * from a stored value of -1. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stored;

    if (fread(&stored,4,1,fp) != 1) return -1;
    return (time_t) stored;
}
2525
2526 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2527 * of this file for a description of how this are stored on disk.
2528 *
2529 * isencoded is set to 1 if the readed length is not actually a length but
2530 * an "encoding type", check the above comments for more info */
2531 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2532 unsigned char buf[2];
2533 uint32_t len;
2534
2535 if (isencoded) *isencoded = 0;
2536 if (rdbver == 0) {
2537 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2538 return ntohl(len);
2539 } else {
2540 int type;
2541
2542 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2543 type = (buf[0]&0xC0)>>6;
2544 if (type == REDIS_RDB_6BITLEN) {
2545 /* Read a 6 bit len */
2546 return buf[0]&0x3F;
2547 } else if (type == REDIS_RDB_ENCVAL) {
2548 /* Read a 6 bit len encoding type */
2549 if (isencoded) *isencoded = 1;
2550 return buf[0]&0x3F;
2551 } else if (type == REDIS_RDB_14BITLEN) {
2552 /* Read a 14 bit len */
2553 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2554 return ((buf[0]&0x3F)<<8)|buf[1];
2555 } else {
2556 /* Read a 32 bit len */
2557 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2558 return ntohl(len);
2559 }
2560 }
2561 }
2562
2563 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2564 unsigned char enc[4];
2565 long long val;
2566
2567 if (enctype == REDIS_RDB_ENC_INT8) {
2568 if (fread(enc,1,1,fp) == 0) return NULL;
2569 val = (signed char)enc[0];
2570 } else if (enctype == REDIS_RDB_ENC_INT16) {
2571 uint16_t v;
2572 if (fread(enc,2,1,fp) == 0) return NULL;
2573 v = enc[0]|(enc[1]<<8);
2574 val = (int16_t)v;
2575 } else if (enctype == REDIS_RDB_ENC_INT32) {
2576 uint32_t v;
2577 if (fread(enc,4,1,fp) == 0) return NULL;
2578 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2579 val = (int32_t)v;
2580 } else {
2581 val = 0; /* anti-warning */
2582 assert(0!=0);
2583 }
2584 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2585 }
2586
2587 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2588 unsigned int len, clen;
2589 unsigned char *c = NULL;
2590 sds val = NULL;
2591
2592 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2593 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2594 if ((c = zmalloc(clen)) == NULL) goto err;
2595 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2596 if (fread(c,clen,1,fp) == 0) goto err;
2597 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2598 zfree(c);
2599 return createObject(REDIS_STRING,val);
2600 err:
2601 zfree(c);
2602 sdsfree(val);
2603 return NULL;
2604 }
2605
2606 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2607 int isencoded;
2608 uint32_t len;
2609 sds val;
2610
2611 len = rdbLoadLen(fp,rdbver,&isencoded);
2612 if (isencoded) {
2613 switch(len) {
2614 case REDIS_RDB_ENC_INT8:
2615 case REDIS_RDB_ENC_INT16:
2616 case REDIS_RDB_ENC_INT32:
2617 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2618 case REDIS_RDB_ENC_LZF:
2619 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2620 default:
2621 assert(0!=0);
2622 }
2623 }
2624
2625 if (len == REDIS_RDB_LENERR) return NULL;
2626 val = sdsnewlen(NULL,len);
2627 if (len && fread(val,len,1,fp) == 0) {
2628 sdsfree(val);
2629 return NULL;
2630 }
2631 return tryObjectSharing(createObject(REDIS_STRING,val));
2632 }
2633
2634 /* For information about double serialization check rdbSaveDoubleValue() */
2635 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2636 char buf[128];
2637 unsigned char len;
2638
2639 if (fread(&len,1,1,fp) == 0) return -1;
2640 switch(len) {
2641 case 255: *val = R_NegInf; return 0;
2642 case 254: *val = R_PosInf; return 0;
2643 case 253: *val = R_Nan; return 0;
2644 default:
2645 if (fread(buf,len,1,fp) == 0) return -1;
2646 sscanf(buf, "%lg", val);
2647 return 0;
2648 }
2649 }
2650
/* Load the dataset from the dump file 'filename' into server.db.
 *
 * Returns REDIS_ERR, without touching the dataset, if the file can't be
 * opened, has a wrong signature, or uses an unsupported format version
 * (> 1). Once loading has started, several errors terminate the process
 * via exit(1) after logging:
 *  - a short read, or a NULL object from the loaders;
 *  - a duplicate key;
 *  - a DB index >= server.dbnum.
 * Returns REDIS_OK on success. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;
    redisDb *db = server.db+0;
    char buf[1024];
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* Header: "REDIS" followed by the 4 digit format version */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case: switch the target DB
         * for the following entries */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            /* No-op for shared objects (refcount > 1), see
             * tryObjectEncoding() */
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: [len] followed by the elements */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    listAddNodeTail((list*)o->ptr,ele);
                } else {
                    dictAdd((dict*)o->ptr,ele,NULL);
                }
            }
        } else if (type == REDIS_ZSET) {
            /* Read sorted set value: [len] followed by (member, score)
             * pairs */
            uint32_t zsetlen;
            zset *zs;

            if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = createZsetObject();
            zs = o->ptr;
            /* Load every single element of the sorted set. Each member ends
             * up referenced by both the dict and the skiplist. */
            while(zsetlen--) {
                robj *ele;
                double *score = zmalloc(sizeof(double));

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
                dictAdd(zs->dict,ele,score);
                zslInsert(zs->zsl,*score,ele);
                incrRefCount(ele); /* added to skiplist */
            }
        } else {
            assert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            expiretime = -1;
        }
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    /* NOTE(review): 'o' and any partially built value are not freed here;
     * that is harmless only because the process exits right below. */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2773
2774 /*================================== Commands =============================== */
2775
2776 static void authCommand(redisClient *c) {
2777 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2778 c->authenticated = 1;
2779 addReply(c,shared.ok);
2780 } else {
2781 c->authenticated = 0;
2782 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2783 }
2784 }
2785
2786 static void pingCommand(redisClient *c) {
2787 addReply(c,shared.pong);
2788 }
2789
2790 static void echoCommand(redisClient *c) {
2791 addReplyBulkLen(c,c->argv[1]);
2792 addReply(c,c->argv[1]);
2793 addReply(c,shared.crlf);
2794 }
2795
2796 /*=================================== Strings =============================== */
2797
2798 static void setGenericCommand(redisClient *c, int nx) {
2799 int retval;
2800
2801 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2802 if (retval == DICT_ERR) {
2803 if (!nx) {
2804 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2805 incrRefCount(c->argv[2]);
2806 } else {
2807 addReply(c,shared.czero);
2808 return;
2809 }
2810 } else {
2811 incrRefCount(c->argv[1]);
2812 incrRefCount(c->argv[2]);
2813 }
2814 server.dirty++;
2815 removeExpire(c->db,c->argv[1]);
2816 addReply(c, nx ? shared.cone : shared.ok);
2817 }
2818
2819 static void setCommand(redisClient *c) {
2820 setGenericCommand(c,0);
2821 }
2822
2823 static void setnxCommand(redisClient *c) {
2824 setGenericCommand(c,1);
2825 }
2826
2827 static void getCommand(redisClient *c) {
2828 robj *o = lookupKeyRead(c->db,c->argv[1]);
2829
2830 if (o == NULL) {
2831 addReply(c,shared.nullbulk);
2832 } else {
2833 if (o->type != REDIS_STRING) {
2834 addReply(c,shared.wrongtypeerr);
2835 } else {
2836 addReplyBulkLen(c,o);
2837 addReply(c,o);
2838 addReply(c,shared.crlf);
2839 }
2840 }
2841 }
2842
2843 static void getsetCommand(redisClient *c) {
2844 getCommand(c);
2845 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2846 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2847 } else {
2848 incrRefCount(c->argv[1]);
2849 }
2850 incrRefCount(c->argv[2]);
2851 server.dirty++;
2852 removeExpire(c->db,c->argv[1]);
2853 }
2854
2855 static void mgetCommand(redisClient *c) {
2856 int j;
2857
2858 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2859 for (j = 1; j < c->argc; j++) {
2860 robj *o = lookupKeyRead(c->db,c->argv[j]);
2861 if (o == NULL) {
2862 addReply(c,shared.nullbulk);
2863 } else {
2864 if (o->type != REDIS_STRING) {
2865 addReply(c,shared.nullbulk);
2866 } else {
2867 addReplyBulkLen(c,o);
2868 addReply(c,o);
2869 addReply(c,shared.crlf);
2870 }
2871 }
2872 }
2873 }
2874
2875 static void incrDecrCommand(redisClient *c, long long incr) {
2876 long long value;
2877 int retval;
2878 robj *o;
2879
2880 o = lookupKeyWrite(c->db,c->argv[1]);
2881 if (o == NULL) {
2882 value = 0;
2883 } else {
2884 if (o->type != REDIS_STRING) {
2885 value = 0;
2886 } else {
2887 char *eptr;
2888
2889 if (o->encoding == REDIS_ENCODING_RAW)
2890 value = strtoll(o->ptr, &eptr, 10);
2891 else if (o->encoding == REDIS_ENCODING_INT)
2892 value = (long)o->ptr;
2893 else
2894 assert(1 != 1);
2895 }
2896 }
2897
2898 value += incr;
2899 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2900 tryObjectEncoding(o);
2901 retval = dictAdd(c->db->dict,c->argv[1],o);
2902 if (retval == DICT_ERR) {
2903 dictReplace(c->db->dict,c->argv[1],o);
2904 removeExpire(c->db,c->argv[1]);
2905 } else {
2906 incrRefCount(c->argv[1]);
2907 }
2908 server.dirty++;
2909 addReply(c,shared.colon);
2910 addReply(c,o);
2911 addReply(c,shared.crlf);
2912 }
2913
2914 static void incrCommand(redisClient *c) {
2915 incrDecrCommand(c,1);
2916 }
2917
2918 static void decrCommand(redisClient *c) {
2919 incrDecrCommand(c,-1);
2920 }
2921
2922 static void incrbyCommand(redisClient *c) {
2923 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2924 incrDecrCommand(c,incr);
2925 }
2926
2927 static void decrbyCommand(redisClient *c) {
2928 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2929 incrDecrCommand(c,-incr);
2930 }
2931
2932 /* ========================= Type agnostic commands ========================= */
2933
2934 static void delCommand(redisClient *c) {
2935 int deleted = 0, j;
2936
2937 for (j = 1; j < c->argc; j++) {
2938 if (deleteKey(c->db,c->argv[j])) {
2939 server.dirty++;
2940 deleted++;
2941 }
2942 }
2943 switch(deleted) {
2944 case 0:
2945 addReply(c,shared.czero);
2946 break;
2947 case 1:
2948 addReply(c,shared.cone);
2949 break;
2950 default:
2951 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2952 break;
2953 }
2954 }
2955
2956 static void existsCommand(redisClient *c) {
2957 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2958 }
2959
2960 static void selectCommand(redisClient *c) {
2961 int id = atoi(c->argv[1]->ptr);
2962
2963 if (selectDb(c,id) == REDIS_ERR) {
2964 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2965 } else {
2966 addReply(c,shared.ok);
2967 }
2968 }
2969
2970 static void randomkeyCommand(redisClient *c) {
2971 dictEntry *de;
2972
2973 while(1) {
2974 de = dictGetRandomKey(c->db->dict);
2975 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2976 }
2977 if (de == NULL) {
2978 addReply(c,shared.plus);
2979 addReply(c,shared.crlf);
2980 } else {
2981 addReply(c,shared.plus);
2982 addReply(c,dictGetEntryKey(de));
2983 addReply(c,shared.crlf);
2984 }
2985 }
2986
2987 static void keysCommand(redisClient *c) {
2988 dictIterator *di;
2989 dictEntry *de;
2990 sds pattern = c->argv[1]->ptr;
2991 int plen = sdslen(pattern);
2992 int numkeys = 0, keyslen = 0;
2993 robj *lenobj = createObject(REDIS_STRING,NULL);
2994
2995 di = dictGetIterator(c->db->dict);
2996 addReply(c,lenobj);
2997 decrRefCount(lenobj);
2998 while((de = dictNext(di)) != NULL) {
2999 robj *keyobj = dictGetEntryKey(de);
3000
3001 sds key = keyobj->ptr;
3002 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3003 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3004 if (expireIfNeeded(c->db,keyobj) == 0) {
3005 if (numkeys != 0)
3006 addReply(c,shared.space);
3007 addReply(c,keyobj);
3008 numkeys++;
3009 keyslen += sdslen(key);
3010 }
3011 }
3012 }
3013 dictReleaseIterator(di);
3014 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3015 addReply(c,shared.crlf);
3016 }
3017
3018 static void dbsizeCommand(redisClient *c) {
3019 addReplySds(c,
3020 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3021 }
3022
3023 static void lastsaveCommand(redisClient *c) {
3024 addReplySds(c,
3025 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3026 }
3027
3028 static void typeCommand(redisClient *c) {
3029 robj *o;
3030 char *type;
3031
3032 o = lookupKeyRead(c->db,c->argv[1]);
3033 if (o == NULL) {
3034 type = "+none";
3035 } else {
3036 switch(o->type) {
3037 case REDIS_STRING: type = "+string"; break;
3038 case REDIS_LIST: type = "+list"; break;
3039 case REDIS_SET: type = "+set"; break;
3040 case REDIS_ZSET: type = "+zset"; break;
3041 default: type = "unknown"; break;
3042 }
3043 }
3044 addReplySds(c,sdsnew(type));
3045 addReply(c,shared.crlf);
3046 }
3047
3048 static void saveCommand(redisClient *c) {
3049 if (server.bgsaveinprogress) {
3050 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3051 return;
3052 }
3053 if (rdbSave(server.dbfilename) == REDIS_OK) {
3054 addReply(c,shared.ok);
3055 } else {
3056 addReply(c,shared.err);
3057 }
3058 }
3059
3060 static void bgsaveCommand(redisClient *c) {
3061 if (server.bgsaveinprogress) {
3062 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3063 return;
3064 }
3065 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3066 addReply(c,shared.ok);
3067 } else {
3068 addReply(c,shared.err);
3069 }
3070 }
3071
3072 static void shutdownCommand(redisClient *c) {
3073 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3074 /* Kill the saving child if there is a background saving in progress.
3075 We want to avoid race conditions, for instance our saving child may
3076 overwrite the synchronous saving did by SHUTDOWN. */
3077 if (server.bgsaveinprogress) {
3078 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3079 kill(server.bgsavechildpid,SIGKILL);
3080 rdbRemoveTempFile(server.bgsavechildpid);
3081 }
3082 /* SYNC SAVE */
3083 if (rdbSave(server.dbfilename) == REDIS_OK) {
3084 if (server.daemonize)
3085 unlink(server.pidfile);
3086 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3087 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3088 exit(1);
3089 } else {
3090 /* Ooops.. error saving! The best we can do is to continue operating.
3091 * Note that if there was a background saving process, in the next
3092 * cron() Redis will be notified that the background saving aborted,
3093 * handling special stuff like slaves pending for synchronization... */
3094 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3095 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3096 }
3097 }
3098
3099 static void renameGenericCommand(redisClient *c, int nx) {
3100 robj *o;
3101
3102 /* To use the same key as src and dst is probably an error */
3103 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3104 addReply(c,shared.sameobjecterr);
3105 return;
3106 }
3107
3108 o = lookupKeyWrite(c->db,c->argv[1]);
3109 if (o == NULL) {
3110 addReply(c,shared.nokeyerr);
3111 return;
3112 }
3113 incrRefCount(o);
3114 deleteIfVolatile(c->db,c->argv[2]);
3115 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3116 if (nx) {
3117 decrRefCount(o);
3118 addReply(c,shared.czero);
3119 return;
3120 }
3121 dictReplace(c->db->dict,c->argv[2],o);
3122 } else {
3123 incrRefCount(c->argv[2]);
3124 }
3125 deleteKey(c->db,c->argv[1]);
3126 server.dirty++;
3127 addReply(c,nx ? shared.cone : shared.ok);
3128 }
3129
3130 static void renameCommand(redisClient *c) {
3131 renameGenericCommand(c,0);
3132 }
3133
3134 static void renamenxCommand(redisClient *c) {
3135 renameGenericCommand(c,1);
3136 }
3137
3138 static void moveCommand(redisClient *c) {
3139 robj *o;
3140 redisDb *src, *dst;
3141 int srcid;
3142
3143 /* Obtain source and target DB pointers */
3144 src = c->db;
3145 srcid = c->db->id;
3146 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3147 addReply(c,shared.outofrangeerr);
3148 return;
3149 }
3150 dst = c->db;
3151 selectDb(c,srcid); /* Back to the source DB */
3152
3153 /* If the user is moving using as target the same
3154 * DB as the source DB it is probably an error. */
3155 if (src == dst) {
3156 addReply(c,shared.sameobjecterr);
3157 return;
3158 }
3159
3160 /* Check if the element exists and get a reference */
3161 o = lookupKeyWrite(c->db,c->argv[1]);
3162 if (!o) {
3163 addReply(c,shared.czero);
3164 return;
3165 }
3166
3167 /* Try to add the element to the target DB */
3168 deleteIfVolatile(dst,c->argv[1]);
3169 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3170 addReply(c,shared.czero);
3171 return;
3172 }
3173 incrRefCount(c->argv[1]);
3174 incrRefCount(o);
3175
3176 /* OK! key moved, free the entry in the source DB */
3177 deleteKey(src,c->argv[1]);
3178 server.dirty++;
3179 addReply(c,shared.cone);
3180 }
3181
3182 /* =================================== Lists ================================ */
3183 static void pushGenericCommand(redisClient *c, int where) {
3184 robj *lobj;
3185 list *list;
3186
3187 lobj = lookupKeyWrite(c->db,c->argv[1]);
3188 if (lobj == NULL) {
3189 lobj = createListObject();
3190 list = lobj->ptr;
3191 if (where == REDIS_HEAD) {
3192 listAddNodeHead(list,c->argv[2]);
3193 } else {
3194 listAddNodeTail(list,c->argv[2]);
3195 }
3196 dictAdd(c->db->dict,c->argv[1],lobj);
3197 incrRefCount(c->argv[1]);
3198 incrRefCount(c->argv[2]);
3199 } else {
3200 if (lobj->type != REDIS_LIST) {
3201 addReply(c,shared.wrongtypeerr);
3202 return;
3203 }
3204 list = lobj->ptr;
3205 if (where == REDIS_HEAD) {
3206 listAddNodeHead(list,c->argv[2]);
3207 } else {
3208 listAddNodeTail(list,c->argv[2]);
3209 }
3210 incrRefCount(c->argv[2]);
3211 }
3212 server.dirty++;
3213 addReply(c,shared.ok);
3214 }
3215
3216 static void lpushCommand(redisClient *c) {
3217 pushGenericCommand(c,REDIS_HEAD);
3218 }
3219
3220 static void rpushCommand(redisClient *c) {
3221 pushGenericCommand(c,REDIS_TAIL);
3222 }
3223
3224 static void llenCommand(redisClient *c) {
3225 robj *o;
3226 list *l;
3227
3228 o = lookupKeyRead(c->db,c->argv[1]);
3229 if (o == NULL) {
3230 addReply(c,shared.czero);
3231 return;
3232 } else {
3233 if (o->type != REDIS_LIST) {
3234 addReply(c,shared.wrongtypeerr);
3235 } else {
3236 l = o->ptr;
3237 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3238 }
3239 }
3240 }
3241
3242 static void lindexCommand(redisClient *c) {
3243 robj *o;
3244 int index = atoi(c->argv[2]->ptr);
3245
3246 o = lookupKeyRead(c->db,c->argv[1]);
3247 if (o == NULL) {
3248 addReply(c,shared.nullbulk);
3249 } else {
3250 if (o->type != REDIS_LIST) {
3251 addReply(c,shared.wrongtypeerr);
3252 } else {
3253 list *list = o->ptr;
3254 listNode *ln;
3255
3256 ln = listIndex(list, index);
3257 if (ln == NULL) {
3258 addReply(c,shared.nullbulk);
3259 } else {
3260 robj *ele = listNodeValue(ln);
3261 addReplyBulkLen(c,ele);
3262 addReply(c,ele);
3263 addReply(c,shared.crlf);
3264 }
3265 }
3266 }
3267 }
3268
3269 static void lsetCommand(redisClient *c) {
3270 robj *o;
3271 int index = atoi(c->argv[2]->ptr);
3272
3273 o = lookupKeyWrite(c->db,c->argv[1]);
3274 if (o == NULL) {
3275 addReply(c,shared.nokeyerr);
3276 } else {
3277 if (o->type != REDIS_LIST) {
3278 addReply(c,shared.wrongtypeerr);
3279 } else {
3280 list *list = o->ptr;
3281 listNode *ln;
3282
3283 ln = listIndex(list, index);
3284 if (ln == NULL) {
3285 addReply(c,shared.outofrangeerr);
3286 } else {
3287 robj *ele = listNodeValue(ln);
3288
3289 decrRefCount(ele);
3290 listNodeValue(ln) = c->argv[3];
3291 incrRefCount(c->argv[3]);
3292 addReply(c,shared.ok);
3293 server.dirty++;
3294 }
3295 }
3296 }
3297 }
3298
3299 static void popGenericCommand(redisClient *c, int where) {
3300 robj *o;
3301
3302 o = lookupKeyWrite(c->db,c->argv[1]);
3303 if (o == NULL) {
3304 addReply(c,shared.nullbulk);
3305 } else {
3306 if (o->type != REDIS_LIST) {
3307 addReply(c,shared.wrongtypeerr);
3308 } else {
3309 list *list = o->ptr;
3310 listNode *ln;
3311
3312 if (where == REDIS_HEAD)
3313 ln = listFirst(list);
3314 else
3315 ln = listLast(list);
3316
3317 if (ln == NULL) {
3318 addReply(c,shared.nullbulk);
3319 } else {
3320 robj *ele = listNodeValue(ln);
3321 addReplyBulkLen(c,ele);
3322 addReply(c,ele);
3323 addReply(c,shared.crlf);
3324 listDelNode(list,ln);
3325 server.dirty++;
3326 }
3327 }
3328 }
3329 }
3330
3331 static void lpopCommand(redisClient *c) {
3332 popGenericCommand(c,REDIS_HEAD);
3333 }
3334
3335 static void rpopCommand(redisClient *c) {
3336 popGenericCommand(c,REDIS_TAIL);
3337 }
3338
3339 static void lrangeCommand(redisClient *c) {
3340 robj *o;
3341 int start = atoi(c->argv[2]->ptr);
3342 int end = atoi(c->argv[3]->ptr);
3343
3344 o = lookupKeyRead(c->db,c->argv[1]);
3345 if (o == NULL) {
3346 addReply(c,shared.nullmultibulk);
3347 } else {
3348 if (o->type != REDIS_LIST) {
3349 addReply(c,shared.wrongtypeerr);
3350 } else {
3351 list *list = o->ptr;
3352 listNode *ln;
3353 int llen = listLength(list);
3354 int rangelen, j;
3355 robj *ele;
3356
3357 /* convert negative indexes */
3358 if (start < 0) start = llen+start;
3359 if (end < 0) end = llen+end;
3360 if (start < 0) start = 0;
3361 if (end < 0) end = 0;
3362
3363 /* indexes sanity checks */
3364 if (start > end || start >= llen) {
3365 /* Out of range start or start > end result in empty list */
3366 addReply(c,shared.emptymultibulk);
3367 return;
3368 }
3369 if (end >= llen) end = llen-1;
3370 rangelen = (end-start)+1;
3371
3372 /* Return the result in form of a multi-bulk reply */
3373 ln = listIndex(list, start);
3374 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3375 for (j = 0; j < rangelen; j++) {
3376 ele = listNodeValue(ln);
3377 addReplyBulkLen(c,ele);
3378 addReply(c,ele);
3379 addReply(c,shared.crlf);
3380 ln = ln->next;
3381 }
3382 }
3383 }
3384 }
3385
3386 static void ltrimCommand(redisClient *c) {
3387 robj *o;
3388 int start = atoi(c->argv[2]->ptr);
3389 int end = atoi(c->argv[3]->ptr);
3390
3391 o = lookupKeyWrite(c->db,c->argv[1]);
3392 if (o == NULL) {
3393 addReply(c,shared.nokeyerr);
3394 } else {
3395 if (o->type != REDIS_LIST) {
3396 addReply(c,shared.wrongtypeerr);
3397 } else {
3398 list *list = o->ptr;
3399 listNode *ln;
3400 int llen = listLength(list);
3401 int j, ltrim, rtrim;
3402
3403 /* convert negative indexes */
3404 if (start < 0) start = llen+start;
3405 if (end < 0) end = llen+end;
3406 if (start < 0) start = 0;
3407 if (end < 0) end = 0;
3408
3409 /* indexes sanity checks */
3410 if (start > end || start >= llen) {
3411 /* Out of range start or start > end result in empty list */
3412 ltrim = llen;
3413 rtrim = 0;
3414 } else {
3415 if (end >= llen) end = llen-1;
3416 ltrim = start;
3417 rtrim = llen-end-1;
3418 }
3419
3420 /* Remove list elements to perform the trim */
3421 for (j = 0; j < ltrim; j++) {
3422 ln = listFirst(list);
3423 listDelNode(list,ln);
3424 }
3425 for (j = 0; j < rtrim; j++) {
3426 ln = listLast(list);
3427 listDelNode(list,ln);
3428 }
3429 server.dirty++;
3430 addReply(c,shared.ok);
3431 }
3432 }
3433 }
3434
3435 static void lremCommand(redisClient *c) {
3436 robj *o;
3437
3438 o = lookupKeyWrite(c->db,c->argv[1]);
3439 if (o == NULL) {
3440 addReply(c,shared.czero);
3441 } else {
3442 if (o->type != REDIS_LIST) {
3443 addReply(c,shared.wrongtypeerr);
3444 } else {
3445 list *list = o->ptr;
3446 listNode *ln, *next;
3447 int toremove = atoi(c->argv[2]->ptr);
3448 int removed = 0;
3449 int fromtail = 0;
3450
3451 if (toremove < 0) {
3452 toremove = -toremove;
3453 fromtail = 1;
3454 }
3455 ln = fromtail ? list->tail : list->head;
3456 while (ln) {
3457 robj *ele = listNodeValue(ln);
3458
3459 next = fromtail ? ln->prev : ln->next;
3460 if (compareStringObjects(ele,c->argv[3]) == 0) {
3461 listDelNode(list,ln);
3462 server.dirty++;
3463 removed++;
3464 if (toremove && removed == toremove) break;
3465 }
3466 ln = next;
3467 }
3468 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3469 }
3470 }
3471 }
3472
3473 /* This is the semantic of this command:
3474 * LPOPPUSH srclist dstlist:
3475 * IF LLEN(srclist) > 0
3476 * element = RPOP srclist
3477 * LPUSH dstlist element
3478 * RETURN element
3479 * ELSE
3480 * RETURN nil
3481 * END
3482 * END
3483 *
3484 * The idea is to be able to get an element from a list in a reliable way
3485 * since the element is not just returned but pushed against another list
3486 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3487 */
3488 static void lpoppushCommand(redisClient *c) {
3489 robj *sobj;
3490
3491 sobj = lookupKeyWrite(c->db,c->argv[1]);
3492 if (sobj == NULL) {
3493 addReply(c,shared.nullbulk);
3494 } else {
3495 if (sobj->type != REDIS_LIST) {
3496 addReply(c,shared.wrongtypeerr);
3497 } else {
3498 list *srclist = sobj->ptr;
3499 listNode *ln = listLast(srclist);
3500
3501 if (ln == NULL) {
3502 addReply(c,shared.nullbulk);
3503 } else {
3504 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3505 robj *ele = listNodeValue(ln);
3506 list *dstlist;
3507
3508 if (dobj == NULL) {
3509
3510 /* Create the list if the key does not exist */
3511 dobj = createListObject();
3512 dictAdd(c->db->dict,c->argv[2],dobj);
3513 incrRefCount(c->argv[2]);
3514 } else if (dobj->type != REDIS_LIST) {
3515 addReply(c,shared.wrongtypeerr);
3516 return;
3517 }
3518 /* Add the element to the target list */
3519 dstlist = dobj->ptr;
3520 listAddNodeHead(dstlist,ele);
3521 incrRefCount(ele);
3522
3523 /* Send the element to the client as reply as well */
3524 addReplyBulkLen(c,ele);
3525 addReply(c,ele);
3526 addReply(c,shared.crlf);
3527
3528 /* Finally remove the element from the source list */
3529 listDelNode(srclist,ln);
3530 server.dirty++;
3531 }
3532 }
3533 }
3534 }
3535
3536
3537 /* ==================================== Sets ================================ */
3538
3539 static void saddCommand(redisClient *c) {
3540 robj *set;
3541
3542 set = lookupKeyWrite(c->db,c->argv[1]);
3543 if (set == NULL) {
3544 set = createSetObject();
3545 dictAdd(c->db->dict,c->argv[1],set);
3546 incrRefCount(c->argv[1]);
3547 } else {
3548 if (set->type != REDIS_SET) {
3549 addReply(c,shared.wrongtypeerr);
3550 return;
3551 }
3552 }
3553 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3554 incrRefCount(c->argv[2]);
3555 server.dirty++;
3556 addReply(c,shared.cone);
3557 } else {
3558 addReply(c,shared.czero);
3559 }
3560 }
3561
3562 static void sremCommand(redisClient *c) {
3563 robj *set;
3564
3565 set = lookupKeyWrite(c->db,c->argv[1]);
3566 if (set == NULL) {
3567 addReply(c,shared.czero);
3568 } else {
3569 if (set->type != REDIS_SET) {
3570 addReply(c,shared.wrongtypeerr);
3571 return;
3572 }
3573 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3574 server.dirty++;
3575 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3576 addReply(c,shared.cone);
3577 } else {
3578 addReply(c,shared.czero);
3579 }
3580 }
3581 }
3582
3583 static void smoveCommand(redisClient *c) {
3584 robj *srcset, *dstset;
3585
3586 srcset = lookupKeyWrite(c->db,c->argv[1]);
3587 dstset = lookupKeyWrite(c->db,c->argv[2]);
3588
3589 /* If the source key does not exist return 0, if it's of the wrong type
3590 * raise an error */
3591 if (srcset == NULL || srcset->type != REDIS_SET) {
3592 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3593 return;
3594 }
3595 /* Error if the destination key is not a set as well */
3596 if (dstset && dstset->type != REDIS_SET) {
3597 addReply(c,shared.wrongtypeerr);
3598 return;
3599 }
3600 /* Remove the element from the source set */
3601 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3602 /* Key not found in the src set! return zero */
3603 addReply(c,shared.czero);
3604 return;
3605 }
3606 server.dirty++;
3607 /* Add the element to the destination set */
3608 if (!dstset) {
3609 dstset = createSetObject();
3610 dictAdd(c->db->dict,c->argv[2],dstset);
3611 incrRefCount(c->argv[2]);
3612 }
3613 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3614 incrRefCount(c->argv[3]);
3615 addReply(c,shared.cone);
3616 }
3617
3618 static void sismemberCommand(redisClient *c) {
3619 robj *set;
3620
3621 set = lookupKeyRead(c->db,c->argv[1]);
3622 if (set == NULL) {
3623 addReply(c,shared.czero);
3624 } else {
3625 if (set->type != REDIS_SET) {
3626 addReply(c,shared.wrongtypeerr);
3627 return;
3628 }
3629 if (dictFind(set->ptr,c->argv[2]))
3630 addReply(c,shared.cone);
3631 else
3632 addReply(c,shared.czero);
3633 }
3634 }
3635
3636 static void scardCommand(redisClient *c) {
3637 robj *o;
3638 dict *s;
3639
3640 o = lookupKeyRead(c->db,c->argv[1]);
3641 if (o == NULL) {
3642 addReply(c,shared.czero);
3643 return;
3644 } else {
3645 if (o->type != REDIS_SET) {
3646 addReply(c,shared.wrongtypeerr);
3647 } else {
3648 s = o->ptr;
3649 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3650 dictSize(s)));
3651 }
3652 }
3653 }
3654
3655 static void spopCommand(redisClient *c) {
3656 robj *set;
3657 dictEntry *de;
3658
3659 set = lookupKeyWrite(c->db,c->argv[1]);
3660 if (set == NULL) {
3661 addReply(c,shared.nullbulk);
3662 } else {
3663 if (set->type != REDIS_SET) {
3664 addReply(c,shared.wrongtypeerr);
3665 return;
3666 }
3667 de = dictGetRandomKey(set->ptr);
3668 if (de == NULL) {
3669 addReply(c,shared.nullbulk);
3670 } else {
3671 robj *ele = dictGetEntryKey(de);
3672
3673 addReplyBulkLen(c,ele);
3674 addReply(c,ele);
3675 addReply(c,shared.crlf);
3676 dictDelete(set->ptr,ele);
3677 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3678 server.dirty++;
3679 }
3680 }
3681 }
3682
3683 static void srandmemberCommand(redisClient *c) {
3684 robj *set;
3685 dictEntry *de;
3686
3687 set = lookupKeyRead(c->db,c->argv[1]);
3688 if (set == NULL) {
3689 addReply(c,shared.nullbulk);
3690 } else {
3691 if (set->type != REDIS_SET) {
3692 addReply(c,shared.wrongtypeerr);
3693 return;
3694 }
3695 de = dictGetRandomKey(set->ptr);
3696 if (de == NULL) {
3697 addReply(c,shared.nullbulk);
3698 } else {
3699 robj *ele = dictGetEntryKey(de);
3700
3701 addReplyBulkLen(c,ele);
3702 addReply(c,ele);
3703 addReply(c,shared.crlf);
3704 }
3705 }
3706 }
3707
3708 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3709 dict **d1 = (void*) s1, **d2 = (void*) s2;
3710
3711 return dictSize(*d1)-dictSize(*d2);
3712 }
3713
3714 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3715 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3716 dictIterator *di;
3717 dictEntry *de;
3718 robj *lenobj = NULL, *dstset = NULL;
3719 int j, cardinality = 0;
3720
3721 for (j = 0; j < setsnum; j++) {
3722 robj *setobj;
3723
3724 setobj = dstkey ?
3725 lookupKeyWrite(c->db,setskeys[j]) :
3726 lookupKeyRead(c->db,setskeys[j]);
3727 if (!setobj) {
3728 zfree(dv);
3729 if (dstkey) {
3730 deleteKey(c->db,dstkey);
3731 addReply(c,shared.ok);
3732 } else {
3733 addReply(c,shared.nullmultibulk);
3734 }
3735 return;
3736 }
3737 if (setobj->type != REDIS_SET) {
3738 zfree(dv);
3739 addReply(c,shared.wrongtypeerr);
3740 return;
3741 }
3742 dv[j] = setobj->ptr;
3743 }
3744 /* Sort sets from the smallest to largest, this will improve our
3745 * algorithm's performace */
3746 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3747
3748 /* The first thing we should output is the total number of elements...
3749 * since this is a multi-bulk write, but at this stage we don't know
3750 * the intersection set size, so we use a trick, append an empty object
3751 * to the output list and save the pointer to later modify it with the
3752 * right length */
3753 if (!dstkey) {
3754 lenobj = createObject(REDIS_STRING,NULL);
3755 addReply(c,lenobj);
3756 decrRefCount(lenobj);
3757 } else {
3758 /* If we have a target key where to store the resulting set
3759 * create this key with an empty set inside */
3760 dstset = createSetObject();
3761 }
3762
3763 /* Iterate all the elements of the first (smallest) set, and test
3764 * the element against all the other sets, if at least one set does
3765 * not include the element it is discarded */
3766 di = dictGetIterator(dv[0]);
3767
3768 while((de = dictNext(di)) != NULL) {
3769 robj *ele;
3770
3771 for (j = 1; j < setsnum; j++)
3772 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3773 if (j != setsnum)
3774 continue; /* at least one set does not contain the member */
3775 ele = dictGetEntryKey(de);
3776 if (!dstkey) {
3777 addReplyBulkLen(c,ele);
3778 addReply(c,ele);
3779 addReply(c,shared.crlf);
3780 cardinality++;
3781 } else {
3782 dictAdd(dstset->ptr,ele,NULL);
3783 incrRefCount(ele);
3784 }
3785 }
3786 dictReleaseIterator(di);
3787
3788 if (dstkey) {
3789 /* Store the resulting set into the target */
3790 deleteKey(c->db,dstkey);
3791 dictAdd(c->db->dict,dstkey,dstset);
3792 incrRefCount(dstkey);
3793 }
3794
3795 if (!dstkey) {
3796 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3797 } else {
3798 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3799 dictSize((dict*)dstset->ptr)));
3800 server.dirty++;
3801 }
3802 zfree(dv);
3803 }
3804
3805 static void sinterCommand(redisClient *c) {
3806 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3807 }
3808
3809 static void sinterstoreCommand(redisClient *c) {
3810 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3811 }
3812
/* Operation selector passed as 'op' to sunionDiffGenericCommand():
 * set union or set difference (first set minus all the others). */
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
3815
3816 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3817 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3818 dictIterator *di;
3819 dictEntry *de;
3820 robj *dstset = NULL;
3821 int j, cardinality = 0;
3822
3823 for (j = 0; j < setsnum; j++) {
3824 robj *setobj;
3825
3826 setobj = dstkey ?
3827 lookupKeyWrite(c->db,setskeys[j]) :
3828 lookupKeyRead(c->db,setskeys[j]);
3829 if (!setobj) {
3830 dv[j] = NULL;
3831 continue;
3832 }
3833 if (setobj->type != REDIS_SET) {
3834 zfree(dv);
3835 addReply(c,shared.wrongtypeerr);
3836 return;
3837 }
3838 dv[j] = setobj->ptr;
3839 }
3840
3841 /* We need a temp set object to store our union. If the dstkey
3842 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3843 * this set object will be the resulting object to set into the target key*/
3844 dstset = createSetObject();
3845
3846 /* Iterate all the elements of all the sets, add every element a single
3847 * time to the result set */
3848 for (j = 0; j < setsnum; j++) {
3849 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3850 if (!dv[j]) continue; /* non existing keys are like empty sets */
3851
3852 di = dictGetIterator(dv[j]);
3853
3854 while((de = dictNext(di)) != NULL) {
3855 robj *ele;
3856
3857 /* dictAdd will not add the same element multiple times */
3858 ele = dictGetEntryKey(de);
3859 if (op == REDIS_OP_UNION || j == 0) {
3860 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3861 incrRefCount(ele);
3862 cardinality++;
3863 }
3864 } else if (op == REDIS_OP_DIFF) {
3865 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3866 cardinality--;
3867 }
3868 }
3869 }
3870 dictReleaseIterator(di);
3871
3872 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3873 }
3874
3875 /* Output the content of the resulting set, if not in STORE mode */
3876 if (!dstkey) {
3877 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3878 di = dictGetIterator(dstset->ptr);
3879 while((de = dictNext(di)) != NULL) {
3880 robj *ele;
3881
3882 ele = dictGetEntryKey(de);
3883 addReplyBulkLen(c,ele);
3884 addReply(c,ele);
3885 addReply(c,shared.crlf);
3886 }
3887 dictReleaseIterator(di);
3888 } else {
3889 /* If we have a target key where to store the resulting set
3890 * create this key with the result set inside */
3891 deleteKey(c->db,dstkey);
3892 dictAdd(c->db->dict,dstkey,dstset);
3893 incrRefCount(dstkey);
3894 }
3895
3896 /* Cleanup */
3897 if (!dstkey) {
3898 decrRefCount(dstset);
3899 } else {
3900 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3901 dictSize((dict*)dstset->ptr)));
3902 server.dirty++;
3903 }
3904 zfree(dv);
3905 }
3906
3907 static void sunionCommand(redisClient *c) {
3908 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3909 }
3910
3911 static void sunionstoreCommand(redisClient *c) {
3912 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3913 }
3914
3915 static void sdiffCommand(redisClient *c) {
3916 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3917 }
3918
3919 static void sdiffstoreCommand(redisClient *c) {
3920 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3921 }
3922
3923 /* ==================================== ZSets =============================== */
3924
3925 /* ZSETs are ordered sets using two data structures to hold the same elements
3926 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3927 * data structure.
3928 *
3929 * The elements are added to a hash table mapping Redis objects to scores.
3930 * At the same time the elements are added to a skip list mapping scores
3931 * to Redis objects (so objects are sorted by scores in this "view"). */
3932
3933 /* This skiplist implementation is almost a C translation of the original
3934 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3935 * Alternative to Balanced Trees", modified in three ways:
3936 * a) this implementation allows for repeated values.
3937 * b) the comparison is not just by key (our 'score') but by satellite data.
3938 * c) there is a back pointer, so it's a doubly linked list with the back
3939 * pointers being only at "level 1". This allows to traverse the list
3940 * from tail to head, useful for ZREVRANGE. */
3941
3942 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3943 zskiplistNode *zn = zmalloc(sizeof(*zn));
3944
3945 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3946 zn->score = score;
3947 zn->obj = obj;
3948 return zn;
3949 }
3950
3951 static zskiplist *zslCreate(void) {
3952 int j;
3953 zskiplist *zsl;
3954
3955 zsl = zmalloc(sizeof(*zsl));
3956 zsl->level = 1;
3957 zsl->length = 0;
3958 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3959 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3960 zsl->header->forward[j] = NULL;
3961 zsl->header->backward = NULL;
3962 zsl->tail = NULL;
3963 return zsl;
3964 }
3965
3966 static void zslFreeNode(zskiplistNode *node) {
3967 decrRefCount(node->obj);
3968 zfree(node->forward);
3969 zfree(node);
3970 }
3971
3972 static void zslFree(zskiplist *zsl) {
3973 zskiplistNode *node = zsl->header->forward[0], *next;
3974
3975 zfree(zsl->header->forward);
3976 zfree(zsl->header);
3977 while(node) {
3978 next = node->forward[0];
3979 zslFreeNode(node);
3980 node = next;
3981 }
3982 zfree(zsl);
3983 }
3984
3985 static int zslRandomLevel(void) {
3986 int level = 1;
3987 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3988 level += 1;
3989 return level;
3990 }
3991
/* Insert a new node holding (score, obj) into the skiplist.
 * Nodes are ordered by score, and equal scores are ordered by
 * compareStringObjects() on the object. The node takes over the caller's
 * reference to obj (zslFreeNode() releases it later).
 * NOTE(review): relies on zslRandomLevel() never returning more than
 * ZSKIPLIST_MAXLEVEL, since update[] has exactly that many slots. */
static void zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i, level;

    /* Walk down from the highest level. update[i] ends up as the last node
     * at level i ordered before (score, obj): the node whose forward[i]
     * must point to the new node. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->forward[i] &&
            (x->forward[i]->score < score ||
                (x->forward[i]->score == score &&
                compareStringObjects(x->forward[i]->obj,obj) < 0)))
            x = x->forward[i];
        update[i] = x;
    }
    /* We assume the element is not already inside. Duplicated scores are
     * allowed, but re-inserting the same score and Redis object should never
     * happen, since the caller of zslInsert() should check in the hash table
     * whether the element is already inside. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        /* The new levels are entered straight from the header. */
        for (i = zsl->level; i < level; i++)
            update[i] = zsl->header;
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
    /* Splice the node in at each of its levels. */
    for (i = 0; i < level; i++) {
        x->forward[i] = update[i]->forward[i];
        update[i]->forward[i] = x;
    }
    /* Backward links live on level 0 only; the first element gets NULL
     * rather than a pointer to the header. */
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->forward[0])
        x->forward[0]->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
}
4027
4028 /* Delete an element with matching score/object from the skiplist. */
4029 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4030 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4031 int i;
4032
4033 x = zsl->header;
4034 for (i = zsl->level-1; i >= 0; i--) {
4035 while (x->forward[i] &&
4036 (x->forward[i]->score < score ||
4037 (x->forward[i]->score == score &&
4038 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4039 x = x->forward[i];
4040 update[i] = x;
4041 }
4042 /* We may have multiple elements with the same score, what we need
4043 * is to find the element with both the right score and object. */
4044 x = x->forward[0];
4045 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4046 for (i = 0; i < zsl->level; i++) {
4047 if (update[i]->forward[i] != x) break;
4048 update[i]->forward[i] = x->forward[i];
4049 }
4050 if (x->forward[0]) {
4051 x->forward[0]->backward = (x->backward == zsl->header) ?
4052 NULL : x->backward;
4053 } else {
4054 zsl->tail = x->backward;
4055 }
4056 zslFreeNode(x);
4057 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4058 zsl->level--;
4059 zsl->length--;
4060 return 1;
4061 } else {
4062 return 0; /* not found */
4063 }
4064 return 0; /* not found */
4065 }
4066
4067 /* Delete all the elements with score between min and max from the skiplist.
4068 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4069 * Note that this function takes the reference to the hash table view of the
4070 * sorted set, in order to remove the elements from the hash table too. */
4071 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4072 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4073 unsigned long removed = 0;
4074 int i;
4075
4076 x = zsl->header;
4077 for (i = zsl->level-1; i >= 0; i--) {
4078 while (x->forward[i] && x->forward[i]->score < min)
4079 x = x->forward[i];
4080 update[i] = x;
4081 }
4082 /* We may have multiple elements with the same score, what we need
4083 * is to find the element with both the right score and object. */
4084 x = x->forward[0];
4085 while (x && x->score <= max) {
4086 zskiplistNode *next;
4087
4088 for (i = 0; i < zsl->level; i++) {
4089 if (update[i]->forward[i] != x) break;
4090 update[i]->forward[i] = x->forward[i];
4091 }
4092 if (x->forward[0]) {
4093 x->forward[0]->backward = (x->backward == zsl->header) ?
4094 NULL : x->backward;
4095 } else {
4096 zsl->tail = x->backward;
4097 }
4098 next = x->forward[0];
4099 dictDelete(dict,x->obj);
4100 zslFreeNode(x);
4101 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4102 zsl->level--;
4103 zsl->length--;
4104 removed++;
4105 x = next;
4106 }
4107 return removed; /* not found */
4108 }
4109
4110 /* Find the first node having a score equal or greater than the specified one.
4111 * Returns NULL if there is no match. */
4112 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4113 zskiplistNode *x;
4114 int i;
4115
4116 x = zsl->header;
4117 for (i = zsl->level-1; i >= 0; i--) {
4118 while (x->forward[i] && x->forward[i]->score < score)
4119 x = x->forward[i];
4120 }
4121 /* We may have multiple elements with the same score, what we need
4122 * is to find the element with both the right score and object. */
4123 return x->forward[0];
4124 }
4125
4126 /* The actual Z-commands implementations */
4127
4128 static void zaddCommand(redisClient *c) {
4129 robj *zsetobj;
4130 zset *zs;
4131 double *score;
4132
4133 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4134 if (zsetobj == NULL) {
4135 zsetobj = createZsetObject();
4136 dictAdd(c->db->dict,c->argv[1],zsetobj);
4137 incrRefCount(c->argv[1]);
4138 } else {
4139 if (zsetobj->type != REDIS_ZSET) {
4140 addReply(c,shared.wrongtypeerr);
4141 return;
4142 }
4143 }
4144 score = zmalloc(sizeof(double));
4145 *score = strtod(c->argv[2]->ptr,NULL);
4146 zs = zsetobj->ptr;
4147 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4148 /* case 1: New element */
4149 incrRefCount(c->argv[3]); /* added to hash */
4150 zslInsert(zs->zsl,*score,c->argv[3]);
4151 incrRefCount(c->argv[3]); /* added to skiplist */
4152 server.dirty++;
4153 addReply(c,shared.cone);
4154 } else {
4155 dictEntry *de;
4156 double *oldscore;
4157
4158 /* case 2: Score update operation */
4159 de = dictFind(zs->dict,c->argv[3]);
4160 assert(de != NULL);
4161 oldscore = dictGetEntryVal(de);
4162 if (*score != *oldscore) {
4163 int deleted;
4164
4165 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4166 assert(deleted != 0);
4167 zslInsert(zs->zsl,*score,c->argv[3]);
4168 incrRefCount(c->argv[3]);
4169 dictReplace(zs->dict,c->argv[3],score);
4170 server.dirty++;
4171 } else {
4172 zfree(score);
4173 }
4174 addReply(c,shared.czero);
4175 }
4176 }
4177
4178 static void zremCommand(redisClient *c) {
4179 robj *zsetobj;
4180 zset *zs;
4181
4182 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4183 if (zsetobj == NULL) {
4184 addReply(c,shared.czero);
4185 } else {
4186 dictEntry *de;
4187 double *oldscore;
4188 int deleted;
4189
4190 if (zsetobj->type != REDIS_ZSET) {
4191 addReply(c,shared.wrongtypeerr);
4192 return;
4193 }
4194 zs = zsetobj->ptr;
4195 de = dictFind(zs->dict,c->argv[2]);
4196 if (de == NULL) {
4197 addReply(c,shared.czero);
4198 return;
4199 }
4200 /* Delete from the skiplist */
4201 oldscore = dictGetEntryVal(de);
4202 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4203 assert(deleted != 0);
4204
4205 /* Delete from the hash table */
4206 dictDelete(zs->dict,c->argv[2]);
4207 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4208 server.dirty++;
4209 addReply(c,shared.cone);
4210 }
4211 }
4212
4213 static void zremrangebyscoreCommand(redisClient *c) {
4214 double min = strtod(c->argv[2]->ptr,NULL);
4215 double max = strtod(c->argv[3]->ptr,NULL);
4216 robj *zsetobj;
4217 zset *zs;
4218
4219 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4220 if (zsetobj == NULL) {
4221 addReply(c,shared.czero);
4222 } else {
4223 long deleted;
4224
4225 if (zsetobj->type != REDIS_ZSET) {
4226 addReply(c,shared.wrongtypeerr);
4227 return;
4228 }
4229 zs = zsetobj->ptr;
4230 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4231 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4232 server.dirty += deleted;
4233 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4234 }
4235 }
4236
4237 static void zrangeGenericCommand(redisClient *c, int reverse) {
4238 robj *o;
4239 int start = atoi(c->argv[2]->ptr);
4240 int end = atoi(c->argv[3]->ptr);
4241
4242 o = lookupKeyRead(c->db,c->argv[1]);
4243 if (o == NULL) {
4244 addReply(c,shared.nullmultibulk);
4245 } else {
4246 if (o->type != REDIS_ZSET) {
4247 addReply(c,shared.wrongtypeerr);
4248 } else {
4249 zset *zsetobj = o->ptr;
4250 zskiplist *zsl = zsetobj->zsl;
4251 zskiplistNode *ln;
4252
4253 int llen = zsl->length;
4254 int rangelen, j;
4255 robj *ele;
4256
4257 /* convert negative indexes */
4258 if (start < 0) start = llen+start;
4259 if (end < 0) end = llen+end;
4260 if (start < 0) start = 0;
4261 if (end < 0) end = 0;
4262
4263 /* indexes sanity checks */
4264 if (start > end || start >= llen) {
4265 /* Out of range start or start > end result in empty list */
4266 addReply(c,shared.emptymultibulk);
4267 return;
4268 }
4269 if (end >= llen) end = llen-1;
4270 rangelen = (end-start)+1;
4271
4272 /* Return the result in form of a multi-bulk reply */
4273 if (reverse) {
4274 ln = zsl->tail;
4275 while (start--)
4276 ln = ln->backward;
4277 } else {
4278 ln = zsl->header->forward[0];
4279 while (start--)
4280 ln = ln->forward[0];
4281 }
4282
4283 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4284 for (j = 0; j < rangelen; j++) {
4285 ele = ln->obj;
4286 addReplyBulkLen(c,ele);
4287 addReply(c,ele);
4288 addReply(c,shared.crlf);
4289 ln = reverse ? ln->backward : ln->forward[0];
4290 }
4291 }
4292 }
4293 }
4294
4295 static void zrangeCommand(redisClient *c) {
4296 zrangeGenericCommand(c,0);
4297 }
4298
4299 static void zrevrangeCommand(redisClient *c) {
4300 zrangeGenericCommand(c,1);
4301 }
4302
4303 static void zrangebyscoreCommand(redisClient *c) {
4304 robj *o;
4305 double min = strtod(c->argv[2]->ptr,NULL);
4306 double max = strtod(c->argv[3]->ptr,NULL);
4307
4308 o = lookupKeyRead(c->db,c->argv[1]);
4309 if (o == NULL) {
4310 addReply(c,shared.nullmultibulk);
4311 } else {
4312 if (o->type != REDIS_ZSET) {
4313 addReply(c,shared.wrongtypeerr);
4314 } else {
4315 zset *zsetobj = o->ptr;
4316 zskiplist *zsl = zsetobj->zsl;
4317 zskiplistNode *ln;
4318 robj *ele, *lenobj;
4319 unsigned int rangelen = 0;
4320
4321 /* Get the first node with the score >= min */
4322 ln = zslFirstWithScore(zsl,min);
4323 if (ln == NULL) {
4324 /* No element matching the speciifed interval */
4325 addReply(c,shared.emptymultibulk);
4326 return;
4327 }
4328
4329 /* We don't know in advance how many matching elements there
4330 * are in the list, so we push this object that will represent
4331 * the multi-bulk length in the output buffer, and will "fix"
4332 * it later */
4333 lenobj = createObject(REDIS_STRING,NULL);
4334 addReply(c,lenobj);
4335
4336 while(ln && ln->score <= max) {
4337 ele = ln->obj;
4338 addReplyBulkLen(c,ele);
4339 addReply(c,ele);
4340 addReply(c,shared.crlf);
4341 ln = ln->forward[0];
4342 rangelen++;
4343 }
4344 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4345 }
4346 }
4347 }
4348
4349 static void zcardCommand(redisClient *c) {
4350 robj *o;
4351 zset *zs;
4352
4353 o = lookupKeyRead(c->db,c->argv[1]);
4354 if (o == NULL) {
4355 addReply(c,shared.czero);
4356 return;
4357 } else {
4358 if (o->type != REDIS_ZSET) {
4359 addReply(c,shared.wrongtypeerr);
4360 } else {
4361 zs = o->ptr;
4362 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4363 }
4364 }
4365 }
4366
4367 static void zscoreCommand(redisClient *c) {
4368 robj *o;
4369 zset *zs;
4370
4371 o = lookupKeyRead(c->db,c->argv[1]);
4372 if (o == NULL) {
4373 addReply(c,shared.czero);
4374 return;
4375 } else {
4376 if (o->type != REDIS_ZSET) {
4377 addReply(c,shared.wrongtypeerr);
4378 } else {
4379 dictEntry *de;
4380
4381 zs = o->ptr;
4382 de = dictFind(zs->dict,c->argv[2]);
4383 if (!de) {
4384 addReply(c,shared.nullbulk);
4385 } else {
4386 char buf[128];
4387 double *score = dictGetEntryVal(de);
4388
4389 snprintf(buf,sizeof(buf),"%.17g",*score);
4390 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4391 strlen(buf),buf));
4392 }
4393 }
4394 }
4395 }
4396
4397 /* ========================= Non type-specific commands ==================== */
4398
4399 static void flushdbCommand(redisClient *c) {
4400 server.dirty += dictSize(c->db->dict);
4401 dictEmpty(c->db->dict);
4402 dictEmpty(c->db->expires);
4403 addReply(c,shared.ok);
4404 }
4405
/* FLUSHALL: empty every database and reply +OK, then write the (now empty)
 * dataset to server.dbfilename with rdbSave(). Note that this is rdbSave(),
 * not rdbSaveBackground(), so the save happens inside the command.
 * NOTE(review): dirty is bumped again after the save, presumably so the
 * command still counts as a change if rdbSave() resets the counter —
 * confirm against rdbSave(). */
static void flushallCommand(redisClient *c) {
    /* emptyDb()'s return value is accounted as dirty changes. */
    server.dirty += emptyDb();
    addReply(c,shared.ok);
    rdbSave(server.dbfilename);
    server.dirty++;
}
4412
4413 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4414 redisSortOperation *so = zmalloc(sizeof(*so));
4415 so->type = type;
4416 so->pattern = pattern;
4417 return so;
4418 }
4419
4420 /* Return the value associated to the key with a name obtained
4421 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4422 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4423 char *p;
4424 sds spat, ssub;
4425 robj keyobj;
4426 int prefixlen, sublen, postfixlen;
4427 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4428 struct {
4429 long len;
4430 long free;
4431 char buf[REDIS_SORTKEY_MAX+1];
4432 } keyname;
4433
4434 if (subst->encoding == REDIS_ENCODING_RAW)
4435 incrRefCount(subst);
4436 else {
4437 subst = getDecodedObject(subst);
4438 }
4439
4440 spat = pattern->ptr;
4441 ssub = subst->ptr;
4442 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4443 p = strchr(spat,'*');
4444 if (!p) return NULL;
4445
4446 prefixlen = p-spat;
4447 sublen = sdslen(ssub);
4448 postfixlen = sdslen(spat)-(prefixlen+1);
4449 memcpy(keyname.buf,spat,prefixlen);
4450 memcpy(keyname.buf+prefixlen,ssub,sublen);
4451 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4452 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4453 keyname.len = prefixlen+sublen+postfixlen;
4454
4455 keyobj.refcount = 1;
4456 keyobj.type = REDIS_STRING;
4457 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4458
4459 decrRefCount(subst);
4460
4461 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4462 return lookupKeyRead(db,&keyobj);
4463 }
4464
4465 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4466 * the additional parameter is not standard but a BSD-specific we have to
4467 * pass sorting parameters via the global 'server' structure */
4468 static int sortCompare(const void *s1, const void *s2) {
4469 const redisSortObject *so1 = s1, *so2 = s2;
4470 int cmp;
4471
4472 if (!server.sort_alpha) {
4473 /* Numeric sorting. Here it's trivial as we precomputed scores */
4474 if (so1->u.score > so2->u.score) {
4475 cmp = 1;
4476 } else if (so1->u.score < so2->u.score) {
4477 cmp = -1;
4478 } else {
4479 cmp = 0;
4480 }
4481 } else {
4482 /* Alphanumeric sorting */
4483 if (server.sort_bypattern) {
4484 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4485 /* At least one compare object is NULL */
4486 if (so1->u.cmpobj == so2->u.cmpobj)
4487 cmp = 0;
4488 else if (so1->u.cmpobj == NULL)
4489 cmp = -1;
4490 else
4491 cmp = 1;
4492 } else {
4493 /* We have both the objects, use strcoll */
4494 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4495 }
4496 } else {
4497 /* Compare elements directly */
4498 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4499 so2->obj->encoding == REDIS_ENCODING_RAW) {
4500 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4501 } else {
4502 robj *dec1, *dec2;
4503
4504 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4505 so1->obj : getDecodedObject(so1->obj);
4506 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4507 so2->obj : getDecodedObject(so2->obj);
4508 cmp = strcoll(dec1->ptr,dec2->ptr);
4509 if (dec1 != so1->obj) decrRefCount(dec1);
4510 if (dec2 != so2->obj) decrRefCount(dec2);
4511 }
4512 }
4513 }
4514 return server.sort_desc ? -cmp : cmp;
4515 }
4516
4517 /* The SORT command is the most complex command in Redis. Warning: this code
4518 * is optimized for speed and a bit less for readability */
4519 static void sortCommand(redisClient *c) {
4520 list *operations;
4521 int outputlen = 0;
4522 int desc = 0, alpha = 0;
4523 int limit_start = 0, limit_count = -1, start, end;
4524 int j, dontsort = 0, vectorlen;
4525 int getop = 0; /* GET operation counter */
4526 robj *sortval, *sortby = NULL, *storekey = NULL;
4527 redisSortObject *vector; /* Resulting vector to sort */
4528
4529 /* Lookup the key to sort. It must be of the right types */
4530 sortval = lookupKeyRead(c->db,c->argv[1]);
4531 if (sortval == NULL) {
4532 addReply(c,shared.nokeyerr);
4533 return;
4534 }
4535 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4536 addReply(c,shared.wrongtypeerr);
4537 return;
4538 }
4539
4540 /* Create a list of operations to perform for every sorted element.
4541 * Operations can be GET/DEL/INCR/DECR */
4542 operations = listCreate();
4543 listSetFreeMethod(operations,zfree);
4544 j = 2;
4545
4546 /* Now we need to protect sortval incrementing its count, in the future
4547 * SORT may have options able to overwrite/delete keys during the sorting
4548 * and the sorted key itself may get destroied */
4549 incrRefCount(sortval);
4550
4551 /* The SORT command has an SQL-alike syntax, parse it */
4552 while(j < c->argc) {
4553 int leftargs = c->argc-j-1;
4554 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4555 desc = 0;
4556 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4557 desc = 1;
4558 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4559 alpha = 1;
4560 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4561 limit_start = atoi(c->argv[j+1]->ptr);
4562 limit_count = atoi(c->argv[j+2]->ptr);
4563 j+=2;
4564 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4565 storekey = c->argv[j+1];
4566 j++;
4567 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4568 sortby = c->argv[j+1];
4569 /* If the BY pattern does not contain '*', i.e. it is constant,
4570 * we don't need to sort nor to lookup the weight keys. */
4571 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4572 j++;
4573 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4574 listAddNodeTail(operations,createSortOperation(
4575 REDIS_SORT_GET,c->argv[j+1]));
4576 getop++;
4577 j++;
4578 } else {
4579 decrRefCount(sortval);
4580 listRelease(operations);
4581 addReply(c,shared.syntaxerr);
4582 return;
4583 }
4584 j++;
4585 }
4586
4587 /* Load the sorting vector with all the objects to sort */
4588 vectorlen = (sortval->type == REDIS_LIST) ?
4589 listLength((list*)sortval->ptr) :
4590 dictSize((dict*)sortval->ptr);
4591 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4592 j = 0;
4593 if (sortval->type == REDIS_LIST) {
4594 list *list = sortval->ptr;
4595 listNode *ln;
4596
4597 listRewind(list);
4598 while((ln = listYield(list))) {
4599 robj *ele = ln->value;
4600 vector[j].obj = ele;
4601 vector[j].u.score = 0;
4602 vector[j].u.cmpobj = NULL;
4603 j++;
4604 }
4605 } else {
4606 dict *set = sortval->ptr;
4607 dictIterator *di;
4608 dictEntry *setele;
4609
4610 di = dictGetIterator(set);
4611 while((setele = dictNext(di)) != NULL) {
4612 vector[j].obj = dictGetEntryKey(setele);
4613 vector[j].u.score = 0;
4614 vector[j].u.cmpobj = NULL;
4615 j++;
4616 }
4617 dictReleaseIterator(di);
4618 }
4619 assert(j == vectorlen);
4620
4621 /* Now it's time to load the right scores in the sorting vector */
4622 if (dontsort == 0) {
4623 for (j = 0; j < vectorlen; j++) {
4624 if (sortby) {
4625 robj *byval;
4626
4627 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4628 if (!byval || byval->type != REDIS_STRING) continue;
4629 if (alpha) {
4630 if (byval->encoding == REDIS_ENCODING_RAW) {
4631 vector[j].u.cmpobj = byval;
4632 incrRefCount(byval);
4633 } else {
4634 vector[j].u.cmpobj = getDecodedObject(byval);
4635 }
4636 } else {
4637 if (byval->encoding == REDIS_ENCODING_RAW) {
4638 vector[j].u.score = strtod(byval->ptr,NULL);
4639 } else {
4640 if (byval->encoding == REDIS_ENCODING_INT) {
4641 vector[j].u.score = (long)byval->ptr;
4642 } else
4643 assert(1 != 1);
4644 }
4645 }
4646 } else {
4647 if (!alpha) {
4648 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4649 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4650 else {
4651 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4652 vector[j].u.score = (long) vector[j].obj->ptr;
4653 else
4654 assert(1 != 1);
4655 }
4656 }
4657 }
4658 }
4659 }
4660
4661 /* We are ready to sort the vector... perform a bit of sanity check
4662 * on the LIMIT option too. We'll use a partial version of quicksort. */
4663 start = (limit_start < 0) ? 0 : limit_start;
4664 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4665 if (start >= vectorlen) {
4666 start = vectorlen-1;
4667 end = vectorlen-2;
4668 }
4669 if (end >= vectorlen) end = vectorlen-1;
4670
4671 if (dontsort == 0) {
4672 server.sort_desc = desc;
4673 server.sort_alpha = alpha;
4674 server.sort_bypattern = sortby ? 1 : 0;
4675 if (sortby && (start != 0 || end != vectorlen-1))
4676 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4677 else
4678 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4679 }
4680
4681 /* Send command output to the output buffer, performing the specified
4682 * GET/DEL/INCR/DECR operations if any. */
4683 outputlen = getop ? getop*(end-start+1) : end-start+1;
4684 if (storekey == NULL) {
4685 /* STORE option not specified, sent the sorting result to client */
4686 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4687 for (j = start; j <= end; j++) {
4688 listNode *ln;
4689 if (!getop) {
4690 addReplyBulkLen(c,vector[j].obj);
4691 addReply(c,vector[j].obj);
4692 addReply(c,shared.crlf);
4693 }
4694 listRewind(operations);
4695 while((ln = listYield(operations))) {
4696 redisSortOperation *sop = ln->value;
4697 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4698 vector[j].obj);
4699
4700 if (sop->type == REDIS_SORT_GET) {
4701 if (!val || val->type != REDIS_STRING) {
4702 addReply(c,shared.nullbulk);
4703 } else {
4704 addReplyBulkLen(c,val);
4705 addReply(c,val);
4706 addReply(c,shared.crlf);
4707 }
4708 } else {
4709 assert(sop->type == REDIS_SORT_GET); /* always fails */
4710 }
4711 }
4712 }
4713 } else {
4714 robj *listObject = createListObject();
4715 list *listPtr = (list*) listObject->ptr;
4716
4717 /* STORE option specified, set the sorting result as a List object */
4718 for (j = start; j <= end; j++) {
4719 listNode *ln;
4720 if (!getop) {
4721 listAddNodeTail(listPtr,vector[j].obj);
4722 incrRefCount(vector[j].obj);
4723 }
4724 listRewind(operations);
4725 while((ln = listYield(operations))) {
4726 redisSortOperation *sop = ln->value;
4727 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4728 vector[j].obj);
4729
4730 if (sop->type == REDIS_SORT_GET) {
4731 if (!val || val->type != REDIS_STRING) {
4732 listAddNodeTail(listPtr,createStringObject("",0));
4733 } else {
4734 listAddNodeTail(listPtr,val);
4735 incrRefCount(val);
4736 }
4737 } else {
4738 assert(sop->type == REDIS_SORT_GET); /* always fails */
4739 }
4740 }
4741 }
4742 if (dictReplace(c->db->dict,storekey,listObject)) {
4743 incrRefCount(storekey);
4744 }
4745 /* Note: we add 1 because the DB is dirty anyway since even if the
4746 * SORT result is empty a new key is set and maybe the old content
4747 * replaced. */
4748 server.dirty += 1+outputlen;
4749 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
4750 }
4751
4752 /* Cleanup */
4753 decrRefCount(sortval);
4754 listRelease(operations);
4755 for (j = 0; j < vectorlen; j++) {
4756 if (sortby && alpha && vector[j].u.cmpobj)
4757 decrRefCount(vector[j].u.cmpobj);
4758 }
4759 zfree(vector);
4760 }
4761
4762 static void infoCommand(redisClient *c) {
4763 sds info;
4764 time_t uptime = time(NULL)-server.stat_starttime;
4765 int j;
4766
4767 info = sdscatprintf(sdsempty(),
4768 "redis_version:%s\r\n"
4769 "arch_bits:%s\r\n"
4770 "uptime_in_seconds:%d\r\n"
4771 "uptime_in_days:%d\r\n"
4772 "connected_clients:%d\r\n"
4773 "connected_slaves:%d\r\n"
4774 "used_memory:%zu\r\n"
4775 "changes_since_last_save:%lld\r\n"
4776 "bgsave_in_progress:%d\r\n"
4777 "last_save_time:%d\r\n"
4778 "total_connections_received:%lld\r\n"
4779 "total_commands_processed:%lld\r\n"
4780 "role:%s\r\n"
4781 ,REDIS_VERSION,
4782 (sizeof(long) == 8) ? "64" : "32",
4783 uptime,
4784 uptime/(3600*24),
4785 listLength(server.clients)-listLength(server.slaves),
4786 listLength(server.slaves),
4787 server.usedmemory,
4788 server.dirty,
4789 server.bgsaveinprogress,
4790 server.lastsave,
4791 server.stat_numconnections,
4792 server.stat_numcommands,
4793 server.masterhost == NULL ? "master" : "slave"
4794 );
4795 if (server.masterhost) {
4796 info = sdscatprintf(info,
4797 "master_host:%s\r\n"
4798 "master_port:%d\r\n"
4799 "master_link_status:%s\r\n"
4800 "master_last_io_seconds_ago:%d\r\n"
4801 ,server.masterhost,
4802 server.masterport,
4803 (server.replstate == REDIS_REPL_CONNECTED) ?
4804 "up" : "down",
4805 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4806 );
4807 }
4808 for (j = 0; j < server.dbnum; j++) {
4809 long long keys, vkeys;
4810
4811 keys = dictSize(server.db[j].dict);
4812 vkeys = dictSize(server.db[j].expires);
4813 if (keys || vkeys) {
4814 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4815 j, keys, vkeys);
4816 }
4817 }
4818 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4819 addReplySds(c,info);
4820 addReply(c,shared.crlf);
4821 }
4822
4823 static void monitorCommand(redisClient *c) {
4824 /* ignore MONITOR if aleady slave or in monitor mode */
4825 if (c->flags & REDIS_SLAVE) return;
4826
4827 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4828 c->slaveseldb = 0;
4829 listAddNodeTail(server.monitors,c);
4830 addReply(c,shared.ok);
4831 }
4832
4833 /* ================================= Expire ================================= */
4834 static int removeExpire(redisDb *db, robj *key) {
4835 if (dictDelete(db->expires,key) == DICT_OK) {
4836 return 1;
4837 } else {
4838 return 0;
4839 }
4840 }
4841
4842 static int setExpire(redisDb *db, robj *key, time_t when) {
4843 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4844 return 0;
4845 } else {
4846 incrRefCount(key);
4847 return 1;
4848 }
4849 }
4850
4851 /* Return the expire time of the specified key, or -1 if no expire
4852 * is associated with this key (i.e. the key is non volatile) */
4853 static time_t getExpire(redisDb *db, robj *key) {
4854 dictEntry *de;
4855
4856 /* No expire? return ASAP */
4857 if (dictSize(db->expires) == 0 ||
4858 (de = dictFind(db->expires,key)) == NULL) return -1;
4859
4860 return (time_t) dictGetEntryVal(de);
4861 }
4862
4863 static int expireIfNeeded(redisDb *db, robj *key) {
4864 time_t when;
4865 dictEntry *de;
4866
4867 /* No expire? return ASAP */
4868 if (dictSize(db->expires) == 0 ||
4869 (de = dictFind(db->expires,key)) == NULL) return 0;
4870
4871 /* Lookup the expire */
4872 when = (time_t) dictGetEntryVal(de);
4873 if (time(NULL) <= when) return 0;
4874
4875 /* Delete the key */
4876 dictDelete(db->expires,key);
4877 return dictDelete(db->dict,key) == DICT_OK;
4878 }
4879
4880 static int deleteIfVolatile(redisDb *db, robj *key) {
4881 dictEntry *de;
4882
4883 /* No expire? return ASAP */
4884 if (dictSize(db->expires) == 0 ||
4885 (de = dictFind(db->expires,key)) == NULL) return 0;
4886
4887 /* Delete the key */
4888 server.dirty++;
4889 dictDelete(db->expires,key);
4890 return dictDelete(db->dict,key) == DICT_OK;
4891 }
4892
4893 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4894 dictEntry *de;
4895
4896 de = dictFind(c->db->dict,key);
4897 if (de == NULL) {
4898 addReply(c,shared.czero);
4899 return;
4900 }
4901 if (seconds < 0) {
4902 if (deleteKey(c->db,key)) server.dirty++;
4903 addReply(c, shared.cone);
4904 return;
4905 } else {
4906 time_t when = time(NULL)+seconds;
4907 if (setExpire(c->db,key,when)) {
4908 addReply(c,shared.cone);
4909 server.dirty++;
4910 } else {
4911 addReply(c,shared.czero);
4912 }
4913 return;
4914 }
4915 }
4916
4917 static void expireCommand(redisClient *c) {
4918 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
4919 }
4920
4921 static void expireatCommand(redisClient *c) {
4922 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
4923 }
4924
4925 static void ttlCommand(redisClient *c) {
4926 time_t expire;
4927 int ttl = -1;
4928
4929 expire = getExpire(c->db,c->argv[1]);
4930 if (expire != -1) {
4931 ttl = (int) (expire-time(NULL));
4932 if (ttl < 0) ttl = -1;
4933 }
4934 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4935 }
4936
4937 static void msetGenericCommand(redisClient *c, int nx) {
4938 int j;
4939
4940 if ((c->argc % 2) == 0) {
4941 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4942 return;
4943 }
4944 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4945 * set nothing at all if at least one already key exists. */
4946 if (nx) {
4947 for (j = 1; j < c->argc; j += 2) {
4948 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4949 addReply(c, shared.czero);
4950 return;
4951 }
4952 }
4953 }
4954
4955 for (j = 1; j < c->argc; j += 2) {
4956 int retval;
4957
4958 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4959 if (retval == DICT_ERR) {
4960 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4961 incrRefCount(c->argv[j+1]);
4962 } else {
4963 incrRefCount(c->argv[j]);
4964 incrRefCount(c->argv[j+1]);
4965 }
4966 removeExpire(c->db,c->argv[j]);
4967 }
4968 server.dirty += (c->argc-1)/2;
4969 addReply(c, nx ? shared.cone : shared.ok);
4970 }
4971
4972 static void msetCommand(redisClient *c) {
4973 msetGenericCommand(c,0);
4974 }
4975
4976 static void msetnxCommand(redisClient *c) {
4977 msetGenericCommand(c,1);
4978 }
4979
4980 /* =============================== Replication ============================= */
4981
4982 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4983 ssize_t nwritten, ret = size;
4984 time_t start = time(NULL);
4985
4986 timeout++;
4987 while(size) {
4988 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4989 nwritten = write(fd,ptr,size);
4990 if (nwritten == -1) return -1;
4991 ptr += nwritten;
4992 size -= nwritten;
4993 }
4994 if ((time(NULL)-start) > timeout) {
4995 errno = ETIMEDOUT;
4996 return -1;
4997 }
4998 }
4999 return ret;
5000 }
5001
5002 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5003 ssize_t nread, totread = 0;
5004 time_t start = time(NULL);
5005
5006 timeout++;
5007 while(size) {
5008 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5009 nread = read(fd,ptr,size);
5010 if (nread == -1) return -1;
5011 ptr += nread;
5012 size -= nread;
5013 totread += nread;
5014 }
5015 if ((time(NULL)-start) > timeout) {
5016 errno = ETIMEDOUT;
5017 return -1;
5018 }
5019 }
5020 return totread;
5021 }
5022
/* Read a line terminated by "\n" from 'fd' into the buffer 'ptr' of 'size'
 * bytes, one byte at a time. The newline and a preceding "\r" are dropped.
 * The buffer is always NUL terminated when size > 0. A line longer than
 * size-1 bytes is truncated: the call returns once the buffer is full.
 * Returns the number of bytes stored, or -1 on read error/timeout. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;
    *ptr = '\0';
    size--; /* reserve room for the NUL terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One byte of the buffer was consumed. Without this decrement the
         * loop never stopped and a long line overflowed the buffer. */
        size--;
    }
    return nread;
}
5043
5044 static void syncCommand(redisClient *c) {
5045 /* ignore SYNC if aleady slave or in monitor mode */
5046 if (c->flags & REDIS_SLAVE) return;
5047
5048 /* SYNC can't be issued when the server has pending data to send to
5049 * the client about already issued commands. We need a fresh reply
5050 * buffer registering the differences between the BGSAVE and the current
5051 * dataset, so that we can copy to other slaves if needed. */
5052 if (listLength(c->reply) != 0) {
5053 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5054 return;
5055 }
5056
5057 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5058 /* Here we need to check if there is a background saving operation
5059 * in progress, or if it is required to start one */
5060 if (server.bgsaveinprogress) {
5061 /* Ok a background save is in progress. Let's check if it is a good
5062 * one for replication, i.e. if there is another slave that is
5063 * registering differences since the server forked to save */
5064 redisClient *slave;
5065 listNode *ln;
5066
5067 listRewind(server.slaves);
5068 while((ln = listYield(server.slaves))) {
5069 slave = ln->value;
5070 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5071 }
5072 if (ln) {
5073 /* Perfect, the server is already registering differences for
5074 * another slave. Set the right state, and copy the buffer. */
5075 listRelease(c->reply);
5076 c->reply = listDup(slave->reply);
5077 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5078 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5079 } else {
5080 /* No way, we need to wait for the next BGSAVE in order to
5081 * register differences */
5082 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5083 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5084 }
5085 } else {
5086 /* Ok we don't have a BGSAVE in progress, let's start one */
5087 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5088 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5089 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5090 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5091 return;
5092 }
5093 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5094 }
5095 c->repldbfd = -1;
5096 c->flags |= REDIS_SLAVE;
5097 c->slaveseldb = 0;
5098 listAddNodeTail(server.slaves,c);
5099 return;
5100 }
5101
5102 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5103 redisClient *slave = privdata;
5104 REDIS_NOTUSED(el);
5105 REDIS_NOTUSED(mask);
5106 char buf[REDIS_IOBUF_LEN];
5107 ssize_t nwritten, buflen;
5108
5109 if (slave->repldboff == 0) {
5110 /* Write the bulk write count before to transfer the DB. In theory here
5111 * we don't know how much room there is in the output buffer of the
5112 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5113 * operations) will never be smaller than the few bytes we need. */
5114 sds bulkcount;
5115
5116 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5117 slave->repldbsize);
5118 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5119 {
5120 sdsfree(bulkcount);
5121 freeClient(slave);
5122 return;
5123 }
5124 sdsfree(bulkcount);
5125 }
5126 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5127 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5128 if (buflen <= 0) {
5129 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5130 (buflen == 0) ? "premature EOF" : strerror(errno));
5131 freeClient(slave);
5132 return;
5133 }
5134 if ((nwritten = write(fd,buf,buflen)) == -1) {
5135 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5136 strerror(errno));
5137 freeClient(slave);
5138 return;
5139 }
5140 slave->repldboff += nwritten;
5141 if (slave->repldboff == slave->repldbsize) {
5142 close(slave->repldbfd);
5143 slave->repldbfd = -1;
5144 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5145 slave->replstate = REDIS_REPL_ONLINE;
5146 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5147 sendReplyToClient, slave, NULL) == AE_ERR) {
5148 freeClient(slave);
5149 return;
5150 }
5151 addReplySds(slave,sdsempty());
5152 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5153 }
5154 }
5155
5156 /* This function is called at the end of every backgrond saving.
5157 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5158 * otherwise REDIS_ERR is passed to the function.
5159 *
5160 * The goal of this function is to handle slaves waiting for a successful
5161 * background saving in order to perform non-blocking synchronization. */
5162 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5163 listNode *ln;
5164 int startbgsave = 0;
5165
5166 listRewind(server.slaves);
5167 while((ln = listYield(server.slaves))) {
5168 redisClient *slave = ln->value;
5169
5170 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5171 startbgsave = 1;
5172 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5173 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5174 struct redis_stat buf;
5175
5176 if (bgsaveerr != REDIS_OK) {
5177 freeClient(slave);
5178 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5179 continue;
5180 }
5181 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5182 redis_fstat(slave->repldbfd,&buf) == -1) {
5183 freeClient(slave);
5184 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5185 continue;
5186 }
5187 slave->repldboff = 0;
5188 slave->repldbsize = buf.st_size;
5189 slave->replstate = REDIS_REPL_SEND_BULK;
5190 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5191 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5192 freeClient(slave);
5193 continue;
5194 }
5195 }
5196 }
5197 if (startbgsave) {
5198 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5199 listRewind(server.slaves);
5200 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5201 while((ln = listYield(server.slaves))) {
5202 redisClient *slave = ln->value;
5203
5204 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5205 freeClient(slave);
5206 }
5207 }
5208 }
5209 }
5210
5211 static int syncWithMaster(void) {
5212 char buf[1024], tmpfile[256], authcmd[1024];
5213 int dumpsize;
5214 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5215 int dfd;
5216
5217 if (fd == -1) {
5218 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5219 strerror(errno));
5220 return REDIS_ERR;
5221 }
5222
5223 /* AUTH with the master if required. */
5224 if(server.masterauth) {
5225 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5226 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5227 close(fd);
5228 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5229 strerror(errno));
5230 return REDIS_ERR;
5231 }
5232 /* Read the AUTH result. */
5233 if (syncReadLine(fd,buf,1024,3600) == -1) {
5234 close(fd);
5235 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5236 strerror(errno));
5237 return REDIS_ERR;
5238 }
5239 if (buf[0] != '+') {
5240 close(fd);
5241 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5242 return REDIS_ERR;
5243 }
5244 }
5245
5246 /* Issue the SYNC command */
5247 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5248 close(fd);
5249 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5250 strerror(errno));
5251 return REDIS_ERR;
5252 }
5253 /* Read the bulk write count */
5254 if (syncReadLine(fd,buf,1024,3600) == -1) {
5255 close(fd);
5256 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5257 strerror(errno));
5258 return REDIS_ERR;
5259 }
5260 if (buf[0] != '$') {
5261 close(fd);
5262 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5263 return REDIS_ERR;
5264 }
5265 dumpsize = atoi(buf+1);
5266 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5267 /* Read the bulk write data on a temp file */
5268 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5269 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5270 if (dfd == -1) {
5271 close(fd);
5272 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5273 return REDIS_ERR;
5274 }
5275 while(dumpsize) {
5276 int nread, nwritten;
5277
5278 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5279 if (nread == -1) {
5280 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5281 strerror(errno));
5282 close(fd);
5283 close(dfd);
5284 return REDIS_ERR;
5285 }
5286 nwritten = write(dfd,buf,nread);
5287 if (nwritten == -1) {
5288 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5289 close(fd);
5290 close(dfd);
5291 return REDIS_ERR;
5292 }
5293 dumpsize -= nread;
5294 }
5295 close(dfd);
5296 if (rename(tmpfile,server.dbfilename) == -1) {
5297 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5298 unlink(tmpfile);
5299 close(fd);
5300 return REDIS_ERR;
5301 }
5302 emptyDb();
5303 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5304 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5305 close(fd);
5306 return REDIS_ERR;
5307 }
5308 server.master = createClient(fd);
5309 server.master->flags |= REDIS_MASTER;
5310 server.replstate = REDIS_REPL_CONNECTED;
5311 return REDIS_OK;
5312 }
5313
5314 static void slaveofCommand(redisClient *c) {
5315 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5316 !strcasecmp(c->argv[2]->ptr,"one")) {
5317 if (server.masterhost) {
5318 sdsfree(server.masterhost);
5319 server.masterhost = NULL;
5320 if (server.master) freeClient(server.master);
5321 server.replstate = REDIS_REPL_NONE;
5322 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5323 }
5324 } else {
5325 sdsfree(server.masterhost);
5326 server.masterhost = sdsdup(c->argv[1]->ptr);
5327 server.masterport = atoi(c->argv[2]->ptr);
5328 if (server.master) freeClient(server.master);
5329 server.replstate = REDIS_REPL_CONNECT;
5330 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5331 server.masterhost, server.masterport);
5332 }
5333 addReply(c,shared.ok);
5334 }
5335
5336 /* ============================ Maxmemory directive ======================== */
5337
5338 /* This function gets called when 'maxmemory' is set on the config file to limit
5339 * the max memory used by the server, and we are out of memory.
5340 * This function will try to, in order:
5341 *
5342 * - Free objects from the free list
5343 * - Try to remove keys with an EXPIRE set
5344 *
5345 * It is not possible to free enough memory to reach used-memory < maxmemory
5346 * the server will start refusing commands that will enlarge even more the
5347 * memory usage.
5348 */
5349 static void freeMemoryIfNeeded(void) {
5350 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5351 if (listLength(server.objfreelist)) {
5352 robj *o;
5353
5354 listNode *head = listFirst(server.objfreelist);
5355 o = listNodeValue(head);
5356 listDelNode(server.objfreelist,head);
5357 zfree(o);
5358 } else {
5359 int j, k, freed = 0;
5360
5361 for (j = 0; j < server.dbnum; j++) {
5362 int minttl = -1;
5363 robj *minkey = NULL;
5364 struct dictEntry *de;
5365
5366 if (dictSize(server.db[j].expires)) {
5367 freed = 1;
5368 /* From a sample of three keys drop the one nearest to
5369 * the natural expire */
5370 for (k = 0; k < 3; k++) {
5371 time_t t;
5372
5373 de = dictGetRandomKey(server.db[j].expires);
5374 t = (time_t) dictGetEntryVal(de);
5375 if (minttl == -1 || t < minttl) {
5376 minkey = dictGetEntryKey(de);
5377 minttl = t;
5378 }
5379 }
5380 deleteKey(server.db+j,minkey);
5381 }
5382 }
5383 if (!freed) return; /* nothing to free... */
5384 }
5385 }
5386 }
5387
5388 /* ============================== Append Only file ========================== */
5389
5390 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5391 sds buf = sdsempty();
5392 int j;
5393 ssize_t nwritten;
5394 time_t now;
5395 robj *tmpargv[3];
5396
5397 /* The DB this command was targetting is not the same as the last command
5398 * we appendend. To issue a SELECT command is needed. */
5399 if (dictid != server.appendseldb) {
5400 char seldb[64];
5401
5402 snprintf(seldb,sizeof(seldb),"%d",dictid);
5403 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5404 strlen(seldb),seldb);
5405 server.appendseldb = dictid;
5406 }
5407
5408 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5409 * EXPIREs into EXPIREATs calls */
5410 if (cmd->proc == expireCommand) {
5411 long when;
5412
5413 tmpargv[0] = createStringObject("EXPIREAT",8);
5414 tmpargv[1] = argv[1];
5415 incrRefCount(argv[1]);
5416 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5417 tmpargv[2] = createObject(REDIS_STRING,
5418 sdscatprintf(sdsempty(),"%ld",when));
5419 argv = tmpargv;
5420 }
5421
5422 /* Append the actual command */
5423 buf = sdscatprintf(buf,"*%d\r\n",argc);
5424 for (j = 0; j < argc; j++) {
5425 robj *o = argv[j];
5426
5427 if (o->encoding != REDIS_ENCODING_RAW)
5428 o = getDecodedObject(o);
5429 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5430 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5431 buf = sdscatlen(buf,"\r\n",2);
5432 if (o != argv[j])
5433 decrRefCount(o);
5434 }
5435
5436 /* Free the objects from the modified argv for EXPIREAT */
5437 if (cmd->proc == expireCommand) {
5438 for (j = 0; j < 3; j++)
5439 decrRefCount(argv[j]);
5440 }
5441
5442 /* We want to perform a single write. This should be guaranteed atomic
5443 * at least if the filesystem we are writing is a real physical one.
5444 * While this will save us against the server being killed I don't think
5445 * there is much to do about the whole server stopping for power problems
5446 * or alike */
5447 nwritten = write(server.appendfd,buf,sdslen(buf));
5448 if (nwritten != (signed)sdslen(buf)) {
5449 /* Ooops, we are in troubles. The best thing to do for now is
5450 * to simply exit instead to give the illusion that everything is
5451 * working as expected. */
5452 if (nwritten == -1) {
5453 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5454 } else {
5455 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5456 }
5457 exit(1);
5458 }
5459 now = time(NULL);
5460 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5461 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5462 now-server.lastfsync > 1))
5463 {
5464 fsync(server.appendfd); /* Let's try to get this data on the disk */
5465 server.lastfsync = now;
5466 }
5467 }
5468
5469 /* In Redis commands are always executed in the context of a client, so in
5470 * order to load the append only file we need to create a fake client. */
5471 static struct redisClient *createFakeClient(void) {
5472 struct redisClient *c = zmalloc(sizeof(*c));
5473
5474 selectDb(c,0);
5475 c->fd = -1;
5476 c->querybuf = sdsempty();
5477 c->argc = 0;
5478 c->argv = NULL;
5479 c->flags = 0;
5480 /* We set the fake client as a slave waiting for the synchronization
5481 * so that Redis will not try to send replies to this client. */
5482 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5483 c->reply = listCreate();
5484 listSetFreeMethod(c->reply,decrRefCount);
5485 listSetDupMethod(c->reply,dupClientReplyValue);
5486 return c;
5487 }
5488
5489 static void freeFakeClient(struct redisClient *c) {
5490 sdsfree(c->querybuf);
5491 listRelease(c->reply);
5492 zfree(c);
5493 }
5494
5495 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5496 * error (the append only file is zero-length) REDIS_ERR is returned. On
5497 * fatal error an error message is logged and the program exists. */
5498 int loadAppendOnlyFile(char *filename) {
5499 struct redisClient *fakeClient;
5500 FILE *fp = fopen(filename,"r");
5501 struct redis_stat sb;
5502
5503 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5504 return REDIS_ERR;
5505
5506 if (fp == NULL) {
5507 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5508 exit(1);
5509 }
5510
5511 fakeClient = createFakeClient();
5512 while(1) {
5513 int argc, j;
5514 unsigned long len;
5515 robj **argv;
5516 char buf[128];
5517 sds argsds;
5518 struct redisCommand *cmd;
5519
5520 if (fgets(buf,sizeof(buf),fp) == NULL) {
5521 if (feof(fp))
5522 break;
5523 else
5524 goto readerr;
5525 }
5526 if (buf[0] != '*') goto fmterr;
5527 argc = atoi(buf+1);
5528 argv = zmalloc(sizeof(robj*)*argc);
5529 for (j = 0; j < argc; j++) {
5530 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5531 if (buf[0] != '$') goto fmterr;
5532 len = strtol(buf+1,NULL,10);
5533 argsds = sdsnewlen(NULL,len);
5534 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5535 argv[j] = createObject(REDIS_STRING,argsds);
5536 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5537 }
5538
5539 /* Command lookup */
5540 cmd = lookupCommand(argv[0]->ptr);
5541 if (!cmd) {
5542 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5543 exit(1);
5544 }
5545 /* Try object sharing and encoding */
5546 if (server.shareobjects) {
5547 int j;
5548 for(j = 1; j < argc; j++)
5549 argv[j] = tryObjectSharing(argv[j]);
5550 }
5551 if (cmd->flags & REDIS_CMD_BULK)
5552 tryObjectEncoding(argv[argc-1]);
5553 /* Run the command in the context of a fake client */
5554 fakeClient->argc = argc;
5555 fakeClient->argv = argv;
5556 cmd->proc(fakeClient);
5557 /* Discard the reply objects list from the fake client */
5558 while(listLength(fakeClient->reply))
5559 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5560 /* Clean up, ready for the next command */
5561 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5562 zfree(argv);
5563 }
5564 fclose(fp);
5565 freeFakeClient(fakeClient);
5566 return REDIS_OK;
5567
5568 readerr:
5569 if (feof(fp)) {
5570 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5571 } else {
5572 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5573 }
5574 exit(1);
5575 fmterr:
5576 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5577 exit(1);
5578 }
5579
5580 /* ================================= Debugging ============================== */
5581
5582 static void debugCommand(redisClient *c) {
5583 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5584 *((char*)-1) = 'x';
5585 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5586 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5587 robj *key, *val;
5588
5589 if (!de) {
5590 addReply(c,shared.nokeyerr);
5591 return;
5592 }
5593 key = dictGetEntryKey(de);
5594 val = dictGetEntryVal(de);
5595 addReplySds(c,sdscatprintf(sdsempty(),
5596 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5597 key, key->refcount, val, val->refcount, val->encoding));
5598 } else {
5599 addReplySds(c,sdsnew(
5600 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5601 }
5602 }
5603
5604 /* =================================== Main! ================================ */
5605
5606 #ifdef __linux__
/* Return the integer value found in /proc/sys/vm/overcommit_memory, or -1
 * when the file can't be opened or its first line can't be read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int gotline;
    FILE *proc = fopen("/proc/sys/vm/overcommit_memory","r");

    if (proc == NULL) return -1;
    gotline = (fgets(line,sizeof(line),proc) != NULL);
    fclose(proc);
    return gotline ? atoi(line) : -1;
}
5620
5621 void linuxOvercommitMemoryWarning(void) {
5622 if (linuxOvercommitMemoryValue() == 0) {
5623 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5624 }
5625 }
5626 #endif /* __linux__ */
5627
5628 static void daemonize(void) {
5629 int fd;
5630 FILE *fp;
5631
5632 if (fork() != 0) exit(0); /* parent exits */
5633 setsid(); /* create a new session */
5634
5635 /* Every output goes to /dev/null. If Redis is daemonized but
5636 * the 'logfile' is set to 'stdout' in the configuration file
5637 * it will not log at all. */
5638 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5639 dup2(fd, STDIN_FILENO);
5640 dup2(fd, STDOUT_FILENO);
5641 dup2(fd, STDERR_FILENO);
5642 if (fd > STDERR_FILENO) close(fd);
5643 }
5644 /* Try to write the pid file */
5645 fp = fopen(server.pidfile,"w");
5646 if (fp) {
5647 fprintf(fp,"%d\n",getpid());
5648 fclose(fp);
5649 }
5650 }
5651
5652 int main(int argc, char **argv) {
5653 initServerConfig();
5654 if (argc == 2) {
5655 resetServerSaveParams();
5656 loadServerConfig(argv[1]);
5657 } else if (argc > 2) {
5658 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5659 exit(1);
5660 } else {
5661 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5662 }
5663 initServer();
5664 if (server.daemonize) daemonize();
5665 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5666 #ifdef __linux__
5667 linuxOvercommitMemoryWarning();
5668 #endif
5669 if (server.appendonly) {
5670 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
5671 redisLog(REDIS_NOTICE,"DB loaded from append only file");
5672 } else {
5673 if (rdbLoad(server.dbfilename) == REDIS_OK)
5674 redisLog(REDIS_NOTICE,"DB loaded from disk");
5675 }
5676 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5677 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5678 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5679 aeMain(server.el);
5680 aeDeleteEventLoop(server.el);
5681 return 0;
5682 }
5683
5684 /* ============================= Backtrace support ========================= */
5685
5686 #ifdef HAVE_BACKTRACE
5687 static char *findFuncName(void *pointer, unsigned long *offset);
5688
5689 static void *getMcontextEip(ucontext_t *uc) {
5690 #if defined(__FreeBSD__)
5691 return (void*) uc->uc_mcontext.mc_eip;
5692 #elif defined(__dietlibc__)
5693 return (void*) uc->uc_mcontext.eip;
5694 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5695 return (void*) uc->uc_mcontext->__ss.__eip;
5696 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5697 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5698 return (void*) uc->uc_mcontext->__ss.__rip;
5699 #else
5700 return (void*) uc->uc_mcontext->__ss.__eip;
5701 #endif
5702 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5703 return (void*) uc->uc_mcontext.gregs[REG_EIP];
5704 #elif defined(__ia64__) /* Linux IA64 */
5705 return (void*) uc->uc_mcontext.sc_ip;
5706 #else
5707 return NULL;
5708 #endif
5709 }
5710
5711 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5712 void *trace[100];
5713 char **messages = NULL;
5714 int i, trace_size = 0;
5715 unsigned long offset=0;
5716 time_t uptime = time(NULL)-server.stat_starttime;
5717 ucontext_t *uc = (ucontext_t*) secret;
5718 REDIS_NOTUSED(info);
5719
5720 redisLog(REDIS_WARNING,
5721 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5722 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5723 "redis_version:%s; "
5724 "uptime_in_seconds:%d; "
5725 "connected_clients:%d; "
5726 "connected_slaves:%d; "
5727 "used_memory:%zu; "
5728 "changes_since_last_save:%lld; "
5729 "bgsave_in_progress:%d; "
5730 "last_save_time:%d; "
5731 "total_connections_received:%lld; "
5732 "total_commands_processed:%lld; "
5733 "role:%s;"
5734 ,REDIS_VERSION,
5735 uptime,
5736 listLength(server.clients)-listLength(server.slaves),
5737 listLength(server.slaves),
5738 server.usedmemory,
5739 server.dirty,
5740 server.bgsaveinprogress,
5741 server.lastsave,
5742 server.stat_numconnections,
5743 server.stat_numcommands,
5744 server.masterhost == NULL ? "master" : "slave"
5745 ));
5746
5747 trace_size = backtrace(trace, 100);
5748 /* overwrite sigaction with caller's address */
5749 if (getMcontextEip(uc) != NULL) {
5750 trace[1] = getMcontextEip(uc);
5751 }
5752 messages = backtrace_symbols(trace, trace_size);
5753
5754 for (i=1; i<trace_size; ++i) {
5755 char *fn = findFuncName(trace[i], &offset), *p;
5756
5757 p = strchr(messages[i],'+');
5758 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5759 redisLog(REDIS_WARNING,"%s", messages[i]);
5760 } else {
5761 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5762 }
5763 }
5764 free(messages);
5765 exit(0);
5766 }
5767
5768 static void setupSigSegvAction(void) {
5769 struct sigaction act;
5770
5771 sigemptyset (&act.sa_mask);
5772 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5773 * is used. Otherwise, sa_handler is used */
5774 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5775 act.sa_sigaction = segvHandler;
5776 sigaction (SIGSEGV, &act, NULL);
5777 sigaction (SIGBUS, &act, NULL);
5778 sigaction (SIGFPE, &act, NULL);
5779 sigaction (SIGILL, &act, NULL);
5780 sigaction (SIGBUS, &act, NULL);
5781 return;
5782 }
5783
5784 #include "staticsymbols.h"
5785 /* This function try to convert a pointer into a function name. It's used in
5786 * oreder to provide a backtrace under segmentation fault that's able to
5787 * display functions declared as static (otherwise the backtrace is useless). */
5788 static char *findFuncName(void *pointer, unsigned long *offset){
5789 int i, ret = -1;
5790 unsigned long off, minoff = 0;
5791
5792 /* Try to match against the Symbol with the smallest offset */
5793 for (i=0; symsTable[i].pointer; i++) {
5794 unsigned long lp = (unsigned long) pointer;
5795
5796 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5797 off=lp-symsTable[i].pointer;
5798 if (ret < 0 || off < minoff) {
5799 minoff=off;
5800 ret=i;
5801 }
5802 }
5803 }
5804 if (ret == -1) return NULL;
5805 *offset = minoff;
5806 return symsTable[ret].name;
5807 }
5808 #else /* HAVE_BACKTRACE */
/* Without backtrace support there is no crash handler to install: no-op. */
static void setupSigSegvAction(void) {
    return;
}
5811 #endif /* HAVE_BACKTRACE */
5812
5813
5814
5815 /* The End */
5816
5817
5818