]> git.saurik.com Git - redis.git/blob - redis.c
fixed a refcounting bug with SORT ... STORE leading to random crashes
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #if defined(__sun)
63 #include "solarisfixes.h"
64 #endif
65
66 #include "redis.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75
76 /* Error codes */
77 #define REDIS_OK 0
78 #define REDIS_ERR -1
79
80 /* Static server configuration */
81 #define REDIS_SERVERPORT 6379 /* TCP port */
82 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
83 #define REDIS_IOBUF_LEN 1024
84 #define REDIS_LOADBUF_LEN 1024
85 #define REDIS_STATIC_ARGS 4
86 #define REDIS_DEFAULT_DBNUM 16
87 #define REDIS_CONFIGLINE_MAX 1024
88 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
89 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
90 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
91 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
92 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
93
94 /* Hash table parameters */
95 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
96
97 /* Command flags */
98 #define REDIS_CMD_BULK 1 /* Bulk write command */
99 #define REDIS_CMD_INLINE 2 /* Inline command */
100 /* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
101 this flags will return an error when the 'maxmemory' option is set in the
102 config file and the server is using more than maxmemory bytes of memory.
103 In short this commands are denied on low memory conditions. */
104 #define REDIS_CMD_DENYOOM 4
105
106 /* Object types */
107 #define REDIS_STRING 0
108 #define REDIS_LIST 1
109 #define REDIS_SET 2
110 #define REDIS_ZSET 3
111 #define REDIS_HASH 4
112
113 /* Objects encoding */
114 #define REDIS_ENCODING_RAW 0 /* Raw representation */
115 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
116
117 /* Object types only used for dumping to disk */
118 #define REDIS_EXPIRETIME 253
119 #define REDIS_SELECTDB 254
120 #define REDIS_EOF 255
121
122 /* Defines related to the dump file format. Storing 32 bit lengths for short
123 * keys requires a lot of space, so we check the most significant 2 bits of
124 * the first byte to interpret the length:
125 *
126 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
127 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
128 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
129 * 11|000000 this means: a specially encoded object will follow. The six bits
130 * number specifies the kind of object that follows.
131 * See the REDIS_RDB_ENC_* defines.
132 *
133 * Lengths up to 63 are stored using a single byte; most DB keys, and many
134 * values, will fit inside. */
135 #define REDIS_RDB_6BITLEN 0
136 #define REDIS_RDB_14BITLEN 1
137 #define REDIS_RDB_32BITLEN 2
138 #define REDIS_RDB_ENCVAL 3
139 #define REDIS_RDB_LENERR UINT_MAX
140
141 /* When a length of a string object stored on disk has the first two bits
142 * set, the remaining six bits specify a special encoding for the object
143 * according to the following defines: */
144 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
145 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
146 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
147 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
148
149 /* Client flags */
150 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
151 #define REDIS_SLAVE 2 /* This client is a slave server */
152 #define REDIS_MASTER 4 /* This client is a master server */
153 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
154
155 /* Slave replication state - slave side */
156 #define REDIS_REPL_NONE 0 /* No active replication */
157 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
158 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
159
160 /* Slave replication state - from the point of view of master
161 * Note that in SEND_BULK and ONLINE state the slave receives new updates
162 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
163 * to start the next background saving in order to send updates to it. */
164 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
165 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
166 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
167 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
168
169 /* List related stuff */
170 #define REDIS_HEAD 0
171 #define REDIS_TAIL 1
172
173 /* Sort operations */
174 #define REDIS_SORT_GET 0
175 #define REDIS_SORT_ASC 1
176 #define REDIS_SORT_DESC 2
177 #define REDIS_SORTKEY_MAX 1024
178
179 /* Log levels */
180 #define REDIS_DEBUG 0
181 #define REDIS_NOTICE 1
182 #define REDIS_WARNING 2
183
184 /* Anti-warning macro... */
185 #define REDIS_NOTUSED(V) ((void) V)
186
187 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
188 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
189
190 /* Append only defines */
191 #define APPENDFSYNC_NO 0
192 #define APPENDFSYNC_ALWAYS 1
193 #define APPENDFSYNC_EVERYSEC 2
194
195 /*================================= Data types ============================== */
196
197 /* A redis object: a reference-counted container whose payload is reached
 * through 'ptr'. The object type codes defined in this file are REDIS_STRING,
 * REDIS_LIST, REDIS_SET, REDIS_ZSET and REDIS_HASH. */
198 typedef struct redisObject {
199 void *ptr; /* payload; how to read it depends on 'type' and 'encoding' */
200 unsigned char type; /* presumably one of the REDIS_STRING..REDIS_HASH codes */
201 unsigned char encoding; /* presumably REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
202 unsigned char notused[2];
203 int refcount; /* reference count, see incrRefCount() / decrRefCount() */
204 } robj;
205
/* One logical database. The server keeps an array of these in server.db. */
206 typedef struct redisDb {
207 dict *dict; /* main key table of this database */
208 dict *expires; /* entry values are read back as time_t by serverCron() */
209 int id; /* presumably the index of this database -- TODO confirm */
210 } redisDb;
211
212 /* With multiplexing we need to keep per-client state.
213 * Clients are kept in a linked list (server.clients). */
214 typedef struct redisClient {
215 int fd;
216 redisDb *db;
217 int dictid;
218 sds querybuf;
219 robj **argv, **mbargv;
220 int argc, mbargc;
221 int bulklen; /* bulk read len. -1 if not in bulk read mode */
222 int multibulk; /* multi bulk command format active */
223 list *reply;
224 int sentlen;
225 time_t lastinteraction; /* time of the last interaction, compared with server.maxidletime to close idle clients */
226 int flags; /* bitmask of the client flags: REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
227 int slaveseldb; /* slave selected db, if this client is a slave */
228 int authenticated; /* when requirepass is non-NULL */
229 int replstate; /* replication state if this is a slave */
230 int repldbfd; /* replication DB file descriptor */
231 long repldboff; /* replication DB file offset */
232 off_t repldbsize; /* replication DB file size */
233 } redisClient;
234
/* One automatic-save rule: serverCron() starts a background save when at
 * least 'changes' changes are pending (server.dirty) and more than 'seconds'
 * seconds have passed since server.lastsave. */
235 struct saveparam {
236 time_t seconds;
237 int changes;
238 };
239
240 /* Global server state structure */
241 struct redisServer {
242 int port;
243 int fd;
244 redisDb *db; /* array of 'dbnum' databases */
245 dict *sharingpool;
246 unsigned int sharingpoolsize;
247 long long dirty; /* changes to DB from the last save */
248 list *clients;
249 list *slaves, *monitors;
250 char neterr[ANET_ERR_LEN];
251 aeEventLoop *el;
252 int cronloops; /* number of times the cron function ran */
253 list *objfreelist; /* A list of freed objects to avoid malloc() */
254 time_t lastsave; /* Unix time of the last successful save */
255 size_t usedmemory; /* Used memory in bytes, refreshed from zmalloc_used_memory() by serverCron() */
256 /* Fields used only for stats */
257 time_t stat_starttime; /* server start time */
258 long long stat_numcommands; /* number of processed commands */
259 long long stat_numconnections; /* number of connections received */
260 /* Configuration */
261 int verbosity; /* messages with a level below this are not logged */
262 int glueoutputbuf;
263 int maxidletime; /* idle seconds before a client is closed, 0 disables the check */
264 int dbnum;
265 int daemonize;
266 int appendonly;
267 int appendfsync;
268 time_t lastfsync;
269 int appendfd;
270 int appendseldb;
271 char *pidfile;
272 int bgsaveinprogress;
273 pid_t bgsavechildpid;
274 struct saveparam *saveparams;
275 int saveparamslen;
276 char *logfile; /* NULL means log to stdout */
277 char *bindaddr;
278 char *dbfilename;
279 char *appendfilename;
280 char *requirepass;
281 int shareobjects;
282 /* Replication related */
283 int isslave;
284 char *masterauth;
285 char *masterhost;
286 int masterport;
287 redisClient *master; /* client that is master for this slave */
288 int replstate;
289 unsigned int maxclients;
290 unsigned long maxmemory;
291 /* Sort parameters - qsort_r() is only available under BSD so we
292 * have to take this state global, in order to pass it to sortCompare() */
293 int sort_desc;
294 int sort_alpha;
295 int sort_bypattern;
296 };
297
/* Signature shared by every command implementation. */
298 typedef void redisCommandProc(redisClient *c);
/* One row of the command table (cmdTable). */
299 struct redisCommand {
300 char *name; /* command name as listed in cmdTable */
301 redisCommandProc *proc; /* function implementing the command */
302 int arity; /* NOTE(review): cmdTable also uses negative values, presumably meaning "at least N arguments" -- confirm */
303 int flags; /* bitmask of REDIS_CMD_* flags */
304 };
305
/* Pairs a function name with an address stored as an integer.
 * NOTE(review): not used in this part of the file; presumably a symbol table
 * for the HAVE_BACKTRACE crash report -- confirm. */
306 struct redisFunctionSym {
307 char *name;
308 unsigned long pointer;
309 };
310
/* One element handled by SORT: the object itself plus the value used to
 * order it, either a numeric score or another object to compare.
 * NOTE(review): which union member is active presumably depends on the
 * server.sort_alpha / sort_bypattern settings -- confirm in sortCommand(). */
311 typedef struct _redisSortObject {
312 robj *obj;
313 union {
314 double score;
315 robj *cmpobj;
316 } u;
317 } redisSortObject;
318
/* An extra operation requested to SORT together with its key pattern.
 * NOTE(review): 'type' presumably holds REDIS_SORT_GET -- confirm. */
319 typedef struct _redisSortOperation {
320 int type;
321 robj *pattern;
322 } redisSortOperation;
323
324 /* ZSETs use a specialized version of Skiplists */
325
/* Skiplist node: an object with its score, an array of forward pointers and
 * a single backward pointer. */
326 typedef struct zskiplistNode {
327 struct zskiplistNode **forward; /* presumably one pointer per level of this node */
328 struct zskiplistNode *backward;
329 double score;
330 robj *obj;
331 } zskiplistNode;
332
/* Skiplist descriptor: end nodes, number of elements and current level. */
333 typedef struct zskiplist {
334 struct zskiplistNode *header, *tail;
335 unsigned long length;
336 int level; /* presumably bounded by ZSKIPLIST_MAXLEVEL */
337 } zskiplist;
338
/* A sorted set is represented by a hash table plus a skiplist. */
339 typedef struct zset {
340 dict *dict;
341 zskiplist *zsl;
342 } zset;
343
344 /* Our shared "common" objects */
345
/* Pre-built objects stored in the global 'shared'; createSharedObjects()
 * fills them in (for instance shared.crlf and shared.ok are protocol
 * strings). */
346 struct sharedObjectsStruct {
347 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
348 *colon, *nullbulk, *nullmultibulk,
349 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
350 *outofrangeerr, *plus,
351 *select0, *select1, *select2, *select3, *select4,
352 *select5, *select6, *select7, *select8, *select9;
353 } shared;
354
355 /* Global vars that are actually used as constants. The following double
356 * values are used for double on-disk serialization, and are initialized
357 * at runtime to avoid strange compiler optimizations. */
358
359 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
360
361 /*================================ Prototypes =============================== */
362
363 static void freeStringObject(robj *o);
364 static void freeListObject(robj *o);
365 static void freeSetObject(robj *o);
366 static void decrRefCount(void *o);
367 static robj *createObject(int type, void *ptr);
368 static void freeClient(redisClient *c);
369 static int rdbLoad(char *filename);
370 static void addReply(redisClient *c, robj *obj);
371 static void addReplySds(redisClient *c, sds s);
372 static void incrRefCount(robj *o);
373 static int rdbSaveBackground(char *filename);
374 static robj *createStringObject(char *ptr, size_t len);
375 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
376 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
377 static int syncWithMaster(void);
378 static robj *tryObjectSharing(robj *o);
379 static int tryObjectEncoding(robj *o);
380 static robj *getDecodedObject(const robj *o);
381 static int removeExpire(redisDb *db, robj *key);
382 static int expireIfNeeded(redisDb *db, robj *key);
383 static int deleteIfVolatile(redisDb *db, robj *key);
384 static int deleteKey(redisDb *db, robj *key);
385 static time_t getExpire(redisDb *db, robj *key);
386 static int setExpire(redisDb *db, robj *key, time_t when);
387 static void updateSlavesWaitingBgsave(int bgsaveerr);
388 static void freeMemoryIfNeeded(void);
389 static int processCommand(redisClient *c);
390 static void setupSigSegvAction(void);
391 static void rdbRemoveTempFile(pid_t childpid);
392 static size_t stringObjectLen(robj *o);
393 static void processInputBuffer(redisClient *c);
394 static zskiplist *zslCreate(void);
395 static void zslFree(zskiplist *zsl);
396 static void zslInsert(zskiplist *zsl, double score, robj *obj);
397
398 static void authCommand(redisClient *c);
399 static void pingCommand(redisClient *c);
400 static void echoCommand(redisClient *c);
401 static void setCommand(redisClient *c);
402 static void setnxCommand(redisClient *c);
403 static void getCommand(redisClient *c);
404 static void delCommand(redisClient *c);
405 static void existsCommand(redisClient *c);
406 static void incrCommand(redisClient *c);
407 static void decrCommand(redisClient *c);
408 static void incrbyCommand(redisClient *c);
409 static void decrbyCommand(redisClient *c);
410 static void selectCommand(redisClient *c);
411 static void randomkeyCommand(redisClient *c);
412 static void keysCommand(redisClient *c);
413 static void dbsizeCommand(redisClient *c);
414 static void lastsaveCommand(redisClient *c);
415 static void saveCommand(redisClient *c);
416 static void bgsaveCommand(redisClient *c);
417 static void shutdownCommand(redisClient *c);
418 static void moveCommand(redisClient *c);
419 static void renameCommand(redisClient *c);
420 static void renamenxCommand(redisClient *c);
421 static void lpushCommand(redisClient *c);
422 static void rpushCommand(redisClient *c);
423 static void lpopCommand(redisClient *c);
424 static void rpopCommand(redisClient *c);
425 static void llenCommand(redisClient *c);
426 static void lindexCommand(redisClient *c);
427 static void lrangeCommand(redisClient *c);
428 static void ltrimCommand(redisClient *c);
429 static void typeCommand(redisClient *c);
430 static void lsetCommand(redisClient *c);
431 static void saddCommand(redisClient *c);
432 static void sremCommand(redisClient *c);
433 static void smoveCommand(redisClient *c);
434 static void sismemberCommand(redisClient *c);
435 static void scardCommand(redisClient *c);
436 static void spopCommand(redisClient *c);
437 static void srandmemberCommand(redisClient *c);
438 static void sinterCommand(redisClient *c);
439 static void sinterstoreCommand(redisClient *c);
440 static void sunionCommand(redisClient *c);
441 static void sunionstoreCommand(redisClient *c);
442 static void sdiffCommand(redisClient *c);
443 static void sdiffstoreCommand(redisClient *c);
444 static void syncCommand(redisClient *c);
445 static void flushdbCommand(redisClient *c);
446 static void flushallCommand(redisClient *c);
447 static void sortCommand(redisClient *c);
448 static void lremCommand(redisClient *c);
449 static void infoCommand(redisClient *c);
450 static void mgetCommand(redisClient *c);
451 static void monitorCommand(redisClient *c);
452 static void expireCommand(redisClient *c);
453 static void expireatCommand(redisClient *c);
454 static void getsetCommand(redisClient *c);
455 static void ttlCommand(redisClient *c);
456 static void slaveofCommand(redisClient *c);
457 static void debugCommand(redisClient *c);
458 static void msetCommand(redisClient *c);
459 static void msetnxCommand(redisClient *c);
460 static void zaddCommand(redisClient *c);
461 static void zrangeCommand(redisClient *c);
462 static void zrangebyscoreCommand(redisClient *c);
463 static void zrevrangeCommand(redisClient *c);
464 static void zcardCommand(redisClient *c);
465 static void zremCommand(redisClient *c);
466 static void zscoreCommand(redisClient *c);
467 static void zremrangebyscoreCommand(redisClient *c);
468
469 /*================================= Globals ================================= */
470
471 /* Global vars */
472 static struct redisServer server; /* server global state */
/* Command table. Every row is { name, implementation, arity, flags } in the
 * field order of struct redisCommand, and the table is closed by an all-NULL
 * row. Note that "smembers" is served by sinterCommand. */
473 static struct redisCommand cmdTable[] = {
474 {"get",getCommand,2,REDIS_CMD_INLINE},
475 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
476 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
477 {"del",delCommand,-2,REDIS_CMD_INLINE},
478 {"exists",existsCommand,2,REDIS_CMD_INLINE},
479 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
480 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
481 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
482 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
483 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
484 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
485 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
486 {"llen",llenCommand,2,REDIS_CMD_INLINE},
487 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
488 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
489 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
490 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
491 {"lrem",lremCommand,4,REDIS_CMD_BULK},
492 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
493 {"srem",sremCommand,3,REDIS_CMD_BULK},
494 {"smove",smoveCommand,4,REDIS_CMD_BULK},
495 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
496 {"scard",scardCommand,2,REDIS_CMD_INLINE},
497 {"spop",spopCommand,2,REDIS_CMD_INLINE},
498 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
499 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
500 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
501 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
502 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
503 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
504 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
505 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
506 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
507 {"zrem",zremCommand,3,REDIS_CMD_BULK},
508 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
509 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
510 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
511 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
512 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
513 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
514 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
515 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
516 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
517 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
518 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
519 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
520 {"select",selectCommand,2,REDIS_CMD_INLINE},
521 {"move",moveCommand,3,REDIS_CMD_INLINE},
522 {"rename",renameCommand,3,REDIS_CMD_INLINE},
523 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
524 {"expire",expireCommand,3,REDIS_CMD_INLINE},
525 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
526 {"keys",keysCommand,2,REDIS_CMD_INLINE},
527 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
528 {"auth",authCommand,2,REDIS_CMD_INLINE},
529 {"ping",pingCommand,1,REDIS_CMD_INLINE},
530 {"echo",echoCommand,2,REDIS_CMD_BULK},
531 {"save",saveCommand,1,REDIS_CMD_INLINE},
532 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
533 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
534 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
535 {"type",typeCommand,2,REDIS_CMD_INLINE},
536 {"sync",syncCommand,1,REDIS_CMD_INLINE},
537 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
538 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
539 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
540 {"info",infoCommand,1,REDIS_CMD_INLINE},
541 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
542 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
543 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
544 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
545 {NULL,NULL,0,0}
546 };
547 /*============================ Utility functions ============================ */
548
/* Glob-style pattern matching over length-delimited buffers.
 *
 * Supported syntax:
 *   *        matches any sequence of bytes, including the empty one
 *   ?        matches exactly one byte
 *   [abc]    matches one byte out of the listed ones
 *   [^abc]   matches one byte that is NOT listed
 *   [a-z]    inside a class, matches one byte in the given range
 *   \x       matches the byte x literally (also inside a class)
 *
 * 'pattern' and 'string' do not need to be NUL terminated: only the first
 * 'patternLen' / 'stringLen' bytes are ever read. When 'nocase' is non-zero
 * literals and ranges are compared case-insensitively (escaped bytes inside
 * a class are always compared exactly).
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse a run of '*', never looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern at every remaining position. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one byte of the string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * advance below lands exactly on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Bytes are compared as unsigned char so that the
                     * result does not depend on the signedness of char. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* A trailing backslash stands for itself. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String consumed: only trailing '*' may be left in the pattern. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
671
672 static void redisLog(int level, const char *fmt, ...) {
673 va_list ap;
674 FILE *fp;
675
676 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
677 if (!fp) return;
678
679 va_start(ap, fmt);
680 if (level >= server.verbosity) {
681 char *c = ".-*";
682 char buf[64];
683 time_t now;
684
685 now = time(NULL);
686 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
687 fprintf(fp,"%s %c ",buf,c[level]);
688 vfprintf(fp, fmt, ap);
689 fprintf(fp,"\n");
690 fflush(fp);
691 }
692 va_end(ap);
693
694 if (server.logfile) fclose(fp);
695 }
696
697 /*====================== Hash table type implementation ==================== */
698
699 /* This is a hash table type that uses the SDS dynamic strings library as
700 * keys and redis objects as values (objects can hold SDS strings,
701 * lists, sets). */
702
/* Dict value destructor that releases the value with zfree().
 * 'privdata' is ignored. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
708
709 static int sdsDictKeyCompare(void *privdata, const void *key1,
710 const void *key2)
711 {
712 int l1,l2;
713 DICT_NOTUSED(privdata);
714
715 l1 = sdslen((sds)key1);
716 l2 = sdslen((sds)key2);
717 if (l1 != l2) return 0;
718 return memcmp(key1, key2, l1) == 0;
719 }
720
/* Dict destructor for redis objects: drops one reference to 'val' through
 * decrRefCount(). 'privdata' is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
727
728 static int dictObjKeyCompare(void *privdata, const void *key1,
729 const void *key2)
730 {
731 const robj *o1 = key1, *o2 = key2;
732 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
733 }
734
735 static unsigned int dictObjHash(const void *key) {
736 const robj *o = key;
737 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
738 }
739
740 static int dictEncObjKeyCompare(void *privdata, const void *key1,
741 const void *key2)
742 {
743 const robj *o1 = key1, *o2 = key2;
744
745 if (o1->encoding == REDIS_ENCODING_RAW &&
746 o2->encoding == REDIS_ENCODING_RAW)
747 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
748 else {
749 robj *dec1, *dec2;
750 int cmp;
751
752 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
753 getDecodedObject(o1) : (robj*)o1;
754 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
755 getDecodedObject(o2) : (robj*)o2;
756 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
757 if (dec1 != o1) decrRefCount(dec1);
758 if (dec2 != o2) decrRefCount(dec2);
759 return cmp;
760 }
761 }
762
763 static unsigned int dictEncObjHash(const void *key) {
764 const robj *o = key;
765
766 if (o->encoding == REDIS_ENCODING_RAW)
767 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
768 else {
769 robj *dec = getDecodedObject(o);
770 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
771 decrRefCount(dec);
772 return hash;
773 }
774 }
775
/* Dict type whose keys are redis objects hashed and compared through their
 * decoded string form. Keys are released with decrRefCount(); no value
 * destructor is installed. */
776 static dictType setDictType = {
777 dictEncObjHash, /* hash function */
778 NULL, /* key dup */
779 NULL, /* val dup */
780 dictEncObjKeyCompare, /* key compare */
781 dictRedisObjectDestructor, /* key destructor */
782 NULL /* val destructor */
783 };
784
/* Same key handling as setDictType, but values are released with zfree()
 * through dictVanillaFree(). */
785 static dictType zsetDictType = {
786 dictEncObjHash, /* hash function */
787 NULL, /* key dup */
788 NULL, /* val dup */
789 dictEncObjKeyCompare, /* key compare */
790 dictRedisObjectDestructor, /* key destructor */
791 dictVanillaFree /* val destructor */
792 };
793
/* Dict type whose keys and values are both redis objects released with
 * decrRefCount(). Keys are hashed and compared on their 'ptr' string
 * without any decoding step. */
794 static dictType hashDictType = {
795 dictObjHash, /* hash function */
796 NULL, /* key dup */
797 NULL, /* val dup */
798 dictObjKeyCompare, /* key compare */
799 dictRedisObjectDestructor, /* key destructor */
800 dictRedisObjectDestructor /* val destructor */
801 };
802
803 /* ========================= Random utility functions ======================= */
804
/* Out of memory handler.
 *
 * Redis does not try to recover when an object or a string cannot be
 * allocated: even reporting the problem to the client may be impossible,
 * since the send buffers of the networking layer live on the heap too.
 * So the condition is reported on stderr, prefixed by 'msg', and after a
 * one second pause the process is aborted. This function does not return. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
816
817 /* ====================== Redis server networking stuff ===================== */
818 static void closeTimedoutClients(void) {
819 redisClient *c;
820 listNode *ln;
821 time_t now = time(NULL);
822
823 listRewind(server.clients);
824 while ((ln = listYield(server.clients)) != NULL) {
825 c = listNodeValue(ln);
826 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
827 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
828 (now - c->lastinteraction > server.maxidletime)) {
829 redisLog(REDIS_DEBUG,"Closing idle client");
830 freeClient(c);
831 }
832 }
833 }
834
835 static int htNeedsResize(dict *dict) {
836 long long size, used;
837
838 size = dictSlots(dict);
839 used = dictSize(dict);
840 return (size && used && size > DICT_HT_INITIAL_SIZE &&
841 (used*100/size < REDIS_HT_MINFILL));
842 }
843
844 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
845 * we resize the hash table to save memory */
846 static void tryResizeHashTables(void) {
847 int j;
848
849 for (j = 0; j < server.dbnum; j++) {
850 if (htNeedsResize(server.db[j].dict)) {
851 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
852 dictResize(server.db[j].dict);
853 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
854 }
855 if (htNeedsResize(server.db[j].expires))
856 dictResize(server.db[j].expires);
857 }
858 }
859
860 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
861 int j, loops = server.cronloops++;
862 REDIS_NOTUSED(eventLoop);
863 REDIS_NOTUSED(id);
864 REDIS_NOTUSED(clientData);
865
866 /* Update the global state with the amount of used memory */
867 server.usedmemory = zmalloc_used_memory();
868
869 /* Show some info about non-empty databases */
870 for (j = 0; j < server.dbnum; j++) {
871 long long size, used, vkeys;
872
873 size = dictSlots(server.db[j].dict);
874 used = dictSize(server.db[j].dict);
875 vkeys = dictSize(server.db[j].expires);
876 if (!(loops % 5) && (used || vkeys)) {
877 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
878 /* dictPrintStats(server.dict); */
879 }
880 }
881
882 /* We don't want to resize the hash tables while a bacground saving
883 * is in progress: the saving child is created using fork() that is
884 * implemented with a copy-on-write semantic in most modern systems, so
885 * if we resize the HT while there is the saving child at work actually
886 * a lot of memory movements in the parent will cause a lot of pages
887 * copied. */
888 if (!server.bgsaveinprogress) tryResizeHashTables();
889
890 /* Show information about connected clients */
891 if (!(loops % 5)) {
892 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
893 listLength(server.clients)-listLength(server.slaves),
894 listLength(server.slaves),
895 server.usedmemory,
896 dictSize(server.sharingpool));
897 }
898
899 /* Close connections of timedout clients */
900 if (server.maxidletime && !(loops % 10))
901 closeTimedoutClients();
902
903 /* Check if a background saving in progress terminated */
904 if (server.bgsaveinprogress) {
905 int statloc;
906 if (wait4(-1,&statloc,WNOHANG,NULL)) {
907 int exitcode = WEXITSTATUS(statloc);
908 int bysignal = WIFSIGNALED(statloc);
909
910 if (!bysignal && exitcode == 0) {
911 redisLog(REDIS_NOTICE,
912 "Background saving terminated with success");
913 server.dirty = 0;
914 server.lastsave = time(NULL);
915 } else if (!bysignal && exitcode != 0) {
916 redisLog(REDIS_WARNING, "Background saving error");
917 } else {
918 redisLog(REDIS_WARNING,
919 "Background saving terminated by signal");
920 rdbRemoveTempFile(server.bgsavechildpid);
921 }
922 server.bgsaveinprogress = 0;
923 server.bgsavechildpid = -1;
924 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
925 }
926 } else {
927 /* If there is not a background saving in progress check if
928 * we have to save now */
929 time_t now = time(NULL);
930 for (j = 0; j < server.saveparamslen; j++) {
931 struct saveparam *sp = server.saveparams+j;
932
933 if (server.dirty >= sp->changes &&
934 now-server.lastsave > sp->seconds) {
935 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
936 sp->changes, sp->seconds);
937 rdbSaveBackground(server.dbfilename);
938 break;
939 }
940 }
941 }
942
943 /* Try to expire a few timed out keys */
944 for (j = 0; j < server.dbnum; j++) {
945 redisDb *db = server.db+j;
946 int num = dictSize(db->expires);
947
948 if (num) {
949 time_t now = time(NULL);
950
951 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
952 num = REDIS_EXPIRELOOKUPS_PER_CRON;
953 while (num--) {
954 dictEntry *de;
955 time_t t;
956
957 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
958 t = (time_t) dictGetEntryVal(de);
959 if (now > t) {
960 deleteKey(db,dictGetEntryKey(de));
961 }
962 }
963 }
964 }
965
966 /* Check if we should connect to a MASTER */
967 if (server.replstate == REDIS_REPL_CONNECT) {
968 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
969 if (syncWithMaster() == REDIS_OK) {
970 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
971 }
972 }
973 return 1000;
974 }
975
976 static void createSharedObjects(void) {
977 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
978 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
979 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
980 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
981 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
982 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
983 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
984 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
985 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
986 /* no such key */
987 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
988 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
989 "-ERR Operation against a key holding the wrong kind of value\r\n"));
990 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
991 "-ERR no such key\r\n"));
992 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
993 "-ERR syntax error\r\n"));
994 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
995 "-ERR source and destination objects are the same\r\n"));
996 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
997 "-ERR index out of range\r\n"));
998 shared.space = createObject(REDIS_STRING,sdsnew(" "));
999 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1000 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1001 shared.select0 = createStringObject("select 0\r\n",10);
1002 shared.select1 = createStringObject("select 1\r\n",10);
1003 shared.select2 = createStringObject("select 2\r\n",10);
1004 shared.select3 = createStringObject("select 3\r\n",10);
1005 shared.select4 = createStringObject("select 4\r\n",10);
1006 shared.select5 = createStringObject("select 5\r\n",10);
1007 shared.select6 = createStringObject("select 6\r\n",10);
1008 shared.select7 = createStringObject("select 7\r\n",10);
1009 shared.select8 = createStringObject("select 8\r\n",10);
1010 shared.select9 = createStringObject("select 9\r\n",10);
1011 }
1012
1013 static void appendServerSaveParams(time_t seconds, int changes) {
1014 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1015 server.saveparams[server.saveparamslen].seconds = seconds;
1016 server.saveparams[server.saveparamslen].changes = changes;
1017 server.saveparamslen++;
1018 }
1019
1020 static void ResetServerSaveParams() {
1021 zfree(server.saveparams);
1022 server.saveparams = NULL;
1023 server.saveparamslen = 0;
1024 }
1025
1026 static void initServerConfig() {
1027 server.dbnum = REDIS_DEFAULT_DBNUM;
1028 server.port = REDIS_SERVERPORT;
1029 server.verbosity = REDIS_DEBUG;
1030 server.maxidletime = REDIS_MAXIDLETIME;
1031 server.saveparams = NULL;
1032 server.logfile = NULL; /* NULL = log on standard output */
1033 server.bindaddr = NULL;
1034 server.glueoutputbuf = 1;
1035 server.daemonize = 0;
1036 server.appendonly = 0;
1037 server.appendfsync = APPENDFSYNC_ALWAYS;
1038 server.lastfsync = time(NULL);
1039 server.appendfd = -1;
1040 server.appendseldb = -1; /* Make sure the first time will not match */
1041 server.pidfile = "/var/run/redis.pid";
1042 server.dbfilename = "dump.rdb";
1043 server.appendfilename = "appendonly.log";
1044 server.requirepass = NULL;
1045 server.shareobjects = 0;
1046 server.sharingpoolsize = 1024;
1047 server.maxclients = 0;
1048 server.maxmemory = 0;
1049 ResetServerSaveParams();
1050
1051 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1052 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1053 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1054 /* Replication related */
1055 server.isslave = 0;
1056 server.masterauth = NULL;
1057 server.masterhost = NULL;
1058 server.masterport = 6379;
1059 server.master = NULL;
1060 server.replstate = REDIS_REPL_NONE;
1061
1062 /* Double constants initialization */
1063 R_Zero = 0.0;
1064 R_PosInf = 1.0/R_Zero;
1065 R_NegInf = -1.0/R_Zero;
1066 R_Nan = R_Zero/R_Zero;
1067 }
1068
1069 static void initServer() {
1070 int j;
1071
1072 signal(SIGHUP, SIG_IGN);
1073 signal(SIGPIPE, SIG_IGN);
1074 setupSigSegvAction();
1075
1076 server.clients = listCreate();
1077 server.slaves = listCreate();
1078 server.monitors = listCreate();
1079 server.objfreelist = listCreate();
1080 createSharedObjects();
1081 server.el = aeCreateEventLoop();
1082 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1083 server.sharingpool = dictCreate(&setDictType,NULL);
1084 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1085 if (server.fd == -1) {
1086 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1087 exit(1);
1088 }
1089 for (j = 0; j < server.dbnum; j++) {
1090 server.db[j].dict = dictCreate(&hashDictType,NULL);
1091 server.db[j].expires = dictCreate(&setDictType,NULL);
1092 server.db[j].id = j;
1093 }
1094 server.cronloops = 0;
1095 server.bgsaveinprogress = 0;
1096 server.bgsavechildpid = -1;
1097 server.lastsave = time(NULL);
1098 server.dirty = 0;
1099 server.usedmemory = 0;
1100 server.stat_numcommands = 0;
1101 server.stat_numconnections = 0;
1102 server.stat_starttime = time(NULL);
1103 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1104
1105 if (server.appendonly) {
1106 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1107 if (server.appendfd == -1) {
1108 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1109 strerror(errno));
1110 exit(1);
1111 }
1112 }
1113 }
1114
1115 /* Empty the whole database */
1116 static long long emptyDb() {
1117 int j;
1118 long long removed = 0;
1119
1120 for (j = 0; j < server.dbnum; j++) {
1121 removed += dictSize(server.db[j].dict);
1122 dictEmpty(server.db[j].dict);
1123 dictEmpty(server.db[j].expires);
1124 }
1125 return removed;
1126 }
1127
/* Map the strings "yes" / "no" (compared case-insensitively) to 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1133
1134 /* I agree, this is a very rudimental way to load a configuration...
1135 will improve later if the config gets more complex */
1136 static void loadServerConfig(char *filename) {
1137 FILE *fp;
1138 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1139 int linenum = 0;
1140 sds line = NULL;
1141
1142 if (filename[0] == '-' && filename[1] == '\0')
1143 fp = stdin;
1144 else {
1145 if ((fp = fopen(filename,"r")) == NULL) {
1146 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1147 exit(1);
1148 }
1149 }
1150
1151 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1152 sds *argv;
1153 int argc, j;
1154
1155 linenum++;
1156 line = sdsnew(buf);
1157 line = sdstrim(line," \t\r\n");
1158
1159 /* Skip comments and blank lines*/
1160 if (line[0] == '#' || line[0] == '\0') {
1161 sdsfree(line);
1162 continue;
1163 }
1164
1165 /* Split into arguments */
1166 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1167 sdstolower(argv[0]);
1168
1169 /* Execute config directives */
1170 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1171 server.maxidletime = atoi(argv[1]);
1172 if (server.maxidletime < 0) {
1173 err = "Invalid timeout value"; goto loaderr;
1174 }
1175 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1176 server.port = atoi(argv[1]);
1177 if (server.port < 1 || server.port > 65535) {
1178 err = "Invalid port"; goto loaderr;
1179 }
1180 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1181 server.bindaddr = zstrdup(argv[1]);
1182 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1183 int seconds = atoi(argv[1]);
1184 int changes = atoi(argv[2]);
1185 if (seconds < 1 || changes < 0) {
1186 err = "Invalid save parameters"; goto loaderr;
1187 }
1188 appendServerSaveParams(seconds,changes);
1189 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1190 if (chdir(argv[1]) == -1) {
1191 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1192 argv[1], strerror(errno));
1193 exit(1);
1194 }
1195 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1196 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1197 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1198 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1199 else {
1200 err = "Invalid log level. Must be one of debug, notice, warning";
1201 goto loaderr;
1202 }
1203 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1204 FILE *logfp;
1205
1206 server.logfile = zstrdup(argv[1]);
1207 if (!strcasecmp(server.logfile,"stdout")) {
1208 zfree(server.logfile);
1209 server.logfile = NULL;
1210 }
1211 if (server.logfile) {
1212 /* Test if we are able to open the file. The server will not
1213 * be able to abort just for this problem later... */
1214 logfp = fopen(server.logfile,"a");
1215 if (logfp == NULL) {
1216 err = sdscatprintf(sdsempty(),
1217 "Can't open the log file: %s", strerror(errno));
1218 goto loaderr;
1219 }
1220 fclose(logfp);
1221 }
1222 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1223 server.dbnum = atoi(argv[1]);
1224 if (server.dbnum < 1) {
1225 err = "Invalid number of databases"; goto loaderr;
1226 }
1227 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1228 server.maxclients = atoi(argv[1]);
1229 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1230 server.maxmemory = strtoll(argv[1], NULL, 10);
1231 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1232 server.masterhost = sdsnew(argv[1]);
1233 server.masterport = atoi(argv[2]);
1234 server.replstate = REDIS_REPL_CONNECT;
1235 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1236 server.masterauth = zstrdup(argv[1]);
1237 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1238 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1239 err = "argument must be 'yes' or 'no'"; goto loaderr;
1240 }
1241 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1242 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1243 err = "argument must be 'yes' or 'no'"; goto loaderr;
1244 }
1245 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1246 server.sharingpoolsize = atoi(argv[1]);
1247 if (server.sharingpoolsize < 1) {
1248 err = "invalid object sharing pool size"; goto loaderr;
1249 }
1250 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1251 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1252 err = "argument must be 'yes' or 'no'"; goto loaderr;
1253 }
1254 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1255 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1256 err = "argument must be 'yes' or 'no'"; goto loaderr;
1257 }
1258 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1259 if (!strcasecmp(argv[1],"no")) {
1260 server.appendfsync = APPENDFSYNC_NO;
1261 } else if (!strcasecmp(argv[1],"always")) {
1262 server.appendfsync = APPENDFSYNC_ALWAYS;
1263 } else if (!strcasecmp(argv[1],"everysec")) {
1264 server.appendfsync = APPENDFSYNC_EVERYSEC;
1265 } else {
1266 err = "argument must be 'no', 'always' or 'everysec'";
1267 goto loaderr;
1268 }
1269 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1270 server.requirepass = zstrdup(argv[1]);
1271 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1272 server.pidfile = zstrdup(argv[1]);
1273 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1274 server.dbfilename = zstrdup(argv[1]);
1275 } else {
1276 err = "Bad directive or wrong number of arguments"; goto loaderr;
1277 }
1278 for (j = 0; j < argc; j++)
1279 sdsfree(argv[j]);
1280 zfree(argv);
1281 sdsfree(line);
1282 }
1283 if (fp != stdin) fclose(fp);
1284 return;
1285
1286 loaderr:
1287 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1288 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1289 fprintf(stderr, ">>> '%s'\n", line);
1290 fprintf(stderr, "%s\n", err);
1291 exit(1);
1292 }
1293
1294 static void freeClientArgv(redisClient *c) {
1295 int j;
1296
1297 for (j = 0; j < c->argc; j++)
1298 decrRefCount(c->argv[j]);
1299 for (j = 0; j < c->mbargc; j++)
1300 decrRefCount(c->mbargv[j]);
1301 c->argc = 0;
1302 c->mbargc = 0;
1303 }
1304
1305 static void freeClient(redisClient *c) {
1306 listNode *ln;
1307
1308 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1309 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1310 sdsfree(c->querybuf);
1311 listRelease(c->reply);
1312 freeClientArgv(c);
1313 close(c->fd);
1314 ln = listSearchKey(server.clients,c);
1315 assert(ln != NULL);
1316 listDelNode(server.clients,ln);
1317 if (c->flags & REDIS_SLAVE) {
1318 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1319 close(c->repldbfd);
1320 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1321 ln = listSearchKey(l,c);
1322 assert(ln != NULL);
1323 listDelNode(l,ln);
1324 }
1325 if (c->flags & REDIS_MASTER) {
1326 server.master = NULL;
1327 server.replstate = REDIS_REPL_CONNECT;
1328 }
1329 zfree(c->argv);
1330 zfree(c->mbargv);
1331 zfree(c);
1332 }
1333
1334 static void glueReplyBuffersIfNeeded(redisClient *c) {
1335 int totlen = 0;
1336 listNode *ln;
1337 robj *o;
1338
1339 listRewind(c->reply);
1340 while((ln = listYield(c->reply))) {
1341 o = ln->value;
1342 totlen += sdslen(o->ptr);
1343 /* This optimization makes more sense if we don't have to copy
1344 * too much data */
1345 if (totlen > 1024) return;
1346 }
1347 if (totlen > 0) {
1348 char buf[1024];
1349 int copylen = 0;
1350
1351 listRewind(c->reply);
1352 while((ln = listYield(c->reply))) {
1353 o = ln->value;
1354 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1355 copylen += sdslen(o->ptr);
1356 listDelNode(c->reply,ln);
1357 }
1358 /* Now the output buffer is empty, add the new single element */
1359 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1360 listAddNodeTail(c->reply,o);
1361 }
1362 }
1363
1364 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1365 redisClient *c = privdata;
1366 int nwritten = 0, totwritten = 0, objlen;
1367 robj *o;
1368 REDIS_NOTUSED(el);
1369 REDIS_NOTUSED(mask);
1370
1371 if (server.glueoutputbuf && listLength(c->reply) > 1)
1372 glueReplyBuffersIfNeeded(c);
1373 while(listLength(c->reply)) {
1374 o = listNodeValue(listFirst(c->reply));
1375 objlen = sdslen(o->ptr);
1376
1377 if (objlen == 0) {
1378 listDelNode(c->reply,listFirst(c->reply));
1379 continue;
1380 }
1381
1382 if (c->flags & REDIS_MASTER) {
1383 /* Don't reply to a master */
1384 nwritten = objlen - c->sentlen;
1385 } else {
1386 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1387 if (nwritten <= 0) break;
1388 }
1389 c->sentlen += nwritten;
1390 totwritten += nwritten;
1391 /* If we fully sent the object on head go to the next one */
1392 if (c->sentlen == objlen) {
1393 listDelNode(c->reply,listFirst(c->reply));
1394 c->sentlen = 0;
1395 }
1396 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1397 * bytes, in a single threaded server it's a good idea to server
1398 * other clients as well, even if a very large request comes from
1399 * super fast link that is always able to accept data (in real world
1400 * terms think to 'KEYS *' against the loopback interfae) */
1401 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1402 }
1403 if (nwritten == -1) {
1404 if (errno == EAGAIN) {
1405 nwritten = 0;
1406 } else {
1407 redisLog(REDIS_DEBUG,
1408 "Error writing to client: %s", strerror(errno));
1409 freeClient(c);
1410 return;
1411 }
1412 }
1413 if (totwritten > 0) c->lastinteraction = time(NULL);
1414 if (listLength(c->reply) == 0) {
1415 c->sentlen = 0;
1416 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1417 }
1418 }
1419
1420 static struct redisCommand *lookupCommand(char *name) {
1421 int j = 0;
1422 while(cmdTable[j].name != NULL) {
1423 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1424 j++;
1425 }
1426 return NULL;
1427 }
1428
1429 /* resetClient prepare the client to process the next command */
1430 static void resetClient(redisClient *c) {
1431 freeClientArgv(c);
1432 c->bulklen = -1;
1433 c->multibulk = 0;
1434 }
1435
1436 /* If this function gets called we already read a whole
1437 * command, argments are in the client argv/argc fields.
1438 * processCommand() execute the command or prepare the
1439 * server for a bulk read from the client.
1440 *
1441 * If 1 is returned the client is still alive and valid and
1442 * and other operations can be performed by the caller. Otherwise
1443 * if 0 is returned the client was destroied (i.e. after QUIT). */
1444 static int processCommand(redisClient *c) {
1445 struct redisCommand *cmd;
1446 long long dirty;
1447
1448 /* Free some memory if needed (maxmemory setting) */
1449 if (server.maxmemory) freeMemoryIfNeeded();
1450
1451 /* Handle the multi bulk command type. This is an alternative protocol
1452 * supported by Redis in order to receive commands that are composed of
1453 * multiple binary-safe "bulk" arguments. The latency of processing is
1454 * a bit higher but this allows things like multi-sets, so if this
1455 * protocol is used only for MSET and similar commands this is a big win. */
1456 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1457 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1458 if (c->multibulk <= 0) {
1459 resetClient(c);
1460 return 1;
1461 } else {
1462 decrRefCount(c->argv[c->argc-1]);
1463 c->argc--;
1464 return 1;
1465 }
1466 } else if (c->multibulk) {
1467 if (c->bulklen == -1) {
1468 if (((char*)c->argv[0]->ptr)[0] != '$') {
1469 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1470 resetClient(c);
1471 return 1;
1472 } else {
1473 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1474 decrRefCount(c->argv[0]);
1475 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1476 c->argc--;
1477 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1478 resetClient(c);
1479 return 1;
1480 }
1481 c->argc--;
1482 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1483 return 1;
1484 }
1485 } else {
1486 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1487 c->mbargv[c->mbargc] = c->argv[0];
1488 c->mbargc++;
1489 c->argc--;
1490 c->multibulk--;
1491 if (c->multibulk == 0) {
1492 robj **auxargv;
1493 int auxargc;
1494
1495 /* Here we need to swap the multi-bulk argc/argv with the
1496 * normal argc/argv of the client structure. */
1497 auxargv = c->argv;
1498 c->argv = c->mbargv;
1499 c->mbargv = auxargv;
1500
1501 auxargc = c->argc;
1502 c->argc = c->mbargc;
1503 c->mbargc = auxargc;
1504
1505 /* We need to set bulklen to something different than -1
1506 * in order for the code below to process the command without
1507 * to try to read the last argument of a bulk command as
1508 * a special argument. */
1509 c->bulklen = 0;
1510 /* continue below and process the command */
1511 } else {
1512 c->bulklen = -1;
1513 return 1;
1514 }
1515 }
1516 }
1517 /* -- end of multi bulk commands processing -- */
1518
1519 /* The QUIT command is handled as a special case. Normal command
1520 * procs are unable to close the client connection safely */
1521 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1522 freeClient(c);
1523 return 0;
1524 }
1525 cmd = lookupCommand(c->argv[0]->ptr);
1526 if (!cmd) {
1527 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1528 resetClient(c);
1529 return 1;
1530 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1531 (c->argc < -cmd->arity)) {
1532 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1533 resetClient(c);
1534 return 1;
1535 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1536 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1537 resetClient(c);
1538 return 1;
1539 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1540 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1541
1542 decrRefCount(c->argv[c->argc-1]);
1543 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1544 c->argc--;
1545 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1546 resetClient(c);
1547 return 1;
1548 }
1549 c->argc--;
1550 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1551 /* It is possible that the bulk read is already in the
1552 * buffer. Check this condition and handle it accordingly.
1553 * This is just a fast path, alternative to call processInputBuffer().
1554 * It's a good idea since the code is small and this condition
1555 * happens most of the times. */
1556 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1557 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1558 c->argc++;
1559 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1560 } else {
1561 return 1;
1562 }
1563 }
1564 /* Let's try to share objects on the command arguments vector */
1565 if (server.shareobjects) {
1566 int j;
1567 for(j = 1; j < c->argc; j++)
1568 c->argv[j] = tryObjectSharing(c->argv[j]);
1569 }
1570 /* Let's try to encode the bulk object to save space. */
1571 if (cmd->flags & REDIS_CMD_BULK)
1572 tryObjectEncoding(c->argv[c->argc-1]);
1573
1574 /* Check if the user is authenticated */
1575 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1576 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1577 resetClient(c);
1578 return 1;
1579 }
1580
1581 /* Exec the command */
1582 dirty = server.dirty;
1583 cmd->proc(c);
1584 if (server.appendonly && server.dirty-dirty)
1585 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1586 if (server.dirty-dirty && listLength(server.slaves))
1587 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1588 if (listLength(server.monitors))
1589 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1590 server.stat_numcommands++;
1591
1592 /* Prepare the client for the next command */
1593 if (c->flags & REDIS_CLOSE) {
1594 freeClient(c);
1595 return 0;
1596 }
1597 resetClient(c);
1598 return 1;
1599 }
1600
1601 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1602 listNode *ln;
1603 int outc = 0, j;
1604 robj **outv;
1605 /* (args*2)+1 is enough room for args, spaces, newlines */
1606 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1607
1608 if (argc <= REDIS_STATIC_ARGS) {
1609 outv = static_outv;
1610 } else {
1611 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1612 }
1613
1614 for (j = 0; j < argc; j++) {
1615 if (j != 0) outv[outc++] = shared.space;
1616 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1617 robj *lenobj;
1618
1619 lenobj = createObject(REDIS_STRING,
1620 sdscatprintf(sdsempty(),"%d\r\n",
1621 stringObjectLen(argv[j])));
1622 lenobj->refcount = 0;
1623 outv[outc++] = lenobj;
1624 }
1625 outv[outc++] = argv[j];
1626 }
1627 outv[outc++] = shared.crlf;
1628
1629 /* Increment all the refcounts at start and decrement at end in order to
1630 * be sure to free objects if there is no slave in a replication state
1631 * able to be feed with commands */
1632 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1633 listRewind(slaves);
1634 while((ln = listYield(slaves))) {
1635 redisClient *slave = ln->value;
1636
1637 /* Don't feed slaves that are still waiting for BGSAVE to start */
1638 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1639
1640 /* Feed all the other slaves, MONITORs and so on */
1641 if (slave->slaveseldb != dictid) {
1642 robj *selectcmd;
1643
1644 switch(dictid) {
1645 case 0: selectcmd = shared.select0; break;
1646 case 1: selectcmd = shared.select1; break;
1647 case 2: selectcmd = shared.select2; break;
1648 case 3: selectcmd = shared.select3; break;
1649 case 4: selectcmd = shared.select4; break;
1650 case 5: selectcmd = shared.select5; break;
1651 case 6: selectcmd = shared.select6; break;
1652 case 7: selectcmd = shared.select7; break;
1653 case 8: selectcmd = shared.select8; break;
1654 case 9: selectcmd = shared.select9; break;
1655 default:
1656 selectcmd = createObject(REDIS_STRING,
1657 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1658 selectcmd->refcount = 0;
1659 break;
1660 }
1661 addReply(slave,selectcmd);
1662 slave->slaveseldb = dictid;
1663 }
1664 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1665 }
1666 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1667 if (outv != static_outv) zfree(outv);
1668 }
1669
1670 static void processInputBuffer(redisClient *c) {
1671 again:
1672 if (c->bulklen == -1) {
1673 /* Read the first line of the query */
1674 char *p = strchr(c->querybuf,'\n');
1675 size_t querylen;
1676
1677 if (p) {
1678 sds query, *argv;
1679 int argc, j;
1680
1681 query = c->querybuf;
1682 c->querybuf = sdsempty();
1683 querylen = 1+(p-(query));
1684 if (sdslen(query) > querylen) {
1685 /* leave data after the first line of the query in the buffer */
1686 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1687 }
1688 *p = '\0'; /* remove "\n" */
1689 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1690 sdsupdatelen(query);
1691
1692 /* Now we can split the query in arguments */
1693 if (sdslen(query) == 0) {
1694 /* Ignore empty query */
1695 sdsfree(query);
1696 return;
1697 }
1698 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1699 sdsfree(query);
1700
1701 if (c->argv) zfree(c->argv);
1702 c->argv = zmalloc(sizeof(robj*)*argc);
1703
1704 for (j = 0; j < argc; j++) {
1705 if (sdslen(argv[j])) {
1706 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1707 c->argc++;
1708 } else {
1709 sdsfree(argv[j]);
1710 }
1711 }
1712 zfree(argv);
1713 /* Execute the command. If the client is still valid
1714 * after processCommand() return and there is something
1715 * on the query buffer try to process the next command. */
1716 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1717 return;
1718 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1719 redisLog(REDIS_DEBUG, "Client protocol error");
1720 freeClient(c);
1721 return;
1722 }
1723 } else {
1724 /* Bulk read handling. Note that if we are at this point
1725 the client already sent a command terminated with a newline,
1726 we are reading the bulk data that is actually the last
1727 argument of the command. */
1728 int qbl = sdslen(c->querybuf);
1729
1730 if (c->bulklen <= qbl) {
1731 /* Copy everything but the final CRLF as final argument */
1732 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1733 c->argc++;
1734 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1735 /* Process the command. If the client is still valid after
1736 * the processing and there is more data in the buffer
1737 * try to parse it. */
1738 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1739 return;
1740 }
1741 }
1742 }
1743
1744 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1745 redisClient *c = (redisClient*) privdata;
1746 char buf[REDIS_IOBUF_LEN];
1747 int nread;
1748 REDIS_NOTUSED(el);
1749 REDIS_NOTUSED(mask);
1750
1751 nread = read(fd, buf, REDIS_IOBUF_LEN);
1752 if (nread == -1) {
1753 if (errno == EAGAIN) {
1754 nread = 0;
1755 } else {
1756 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1757 freeClient(c);
1758 return;
1759 }
1760 } else if (nread == 0) {
1761 redisLog(REDIS_DEBUG, "Client closed connection");
1762 freeClient(c);
1763 return;
1764 }
1765 if (nread) {
1766 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1767 c->lastinteraction = time(NULL);
1768 } else {
1769 return;
1770 }
1771 processInputBuffer(c);
1772 }
1773
1774 static int selectDb(redisClient *c, int id) {
1775 if (id < 0 || id >= server.dbnum)
1776 return REDIS_ERR;
1777 c->db = &server.db[id];
1778 return REDIS_OK;
1779 }
1780
1781 static void *dupClientReplyValue(void *o) {
1782 incrRefCount((robj*)o);
1783 return 0;
1784 }
1785
1786 static redisClient *createClient(int fd) {
1787 redisClient *c = zmalloc(sizeof(*c));
1788
1789 anetNonBlock(NULL,fd);
1790 anetTcpNoDelay(NULL,fd);
1791 if (!c) return NULL;
1792 selectDb(c,0);
1793 c->fd = fd;
1794 c->querybuf = sdsempty();
1795 c->argc = 0;
1796 c->argv = NULL;
1797 c->bulklen = -1;
1798 c->multibulk = 0;
1799 c->mbargc = 0;
1800 c->mbargv = NULL;
1801 c->sentlen = 0;
1802 c->flags = 0;
1803 c->lastinteraction = time(NULL);
1804 c->authenticated = 0;
1805 c->replstate = REDIS_REPL_NONE;
1806 c->reply = listCreate();
1807 listSetFreeMethod(c->reply,decrRefCount);
1808 listSetDupMethod(c->reply,dupClientReplyValue);
1809 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1810 readQueryFromClient, c, NULL) == AE_ERR) {
1811 freeClient(c);
1812 return NULL;
1813 }
1814 listAddNodeTail(server.clients,c);
1815 return c;
1816 }
1817
1818 static void addReply(redisClient *c, robj *obj) {
1819 if (listLength(c->reply) == 0 &&
1820 (c->replstate == REDIS_REPL_NONE ||
1821 c->replstate == REDIS_REPL_ONLINE) &&
1822 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1823 sendReplyToClient, c, NULL) == AE_ERR) return;
1824 if (obj->encoding != REDIS_ENCODING_RAW) {
1825 obj = getDecodedObject(obj);
1826 } else {
1827 incrRefCount(obj);
1828 }
1829 listAddNodeTail(c->reply,obj);
1830 }
1831
1832 static void addReplySds(redisClient *c, sds s) {
1833 robj *o = createObject(REDIS_STRING,s);
1834 addReply(c,o);
1835 decrRefCount(o);
1836 }
1837
1838 static void addReplyBulkLen(redisClient *c, robj *obj) {
1839 size_t len;
1840
1841 if (obj->encoding == REDIS_ENCODING_RAW) {
1842 len = sdslen(obj->ptr);
1843 } else {
1844 long n = (long)obj->ptr;
1845
1846 len = 1;
1847 if (n < 0) {
1848 len++;
1849 n = -n;
1850 }
1851 while((n = n/10) != 0) {
1852 len++;
1853 }
1854 }
1855 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1856 }
1857
1858 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1859 int cport, cfd;
1860 char cip[128];
1861 redisClient *c;
1862 REDIS_NOTUSED(el);
1863 REDIS_NOTUSED(mask);
1864 REDIS_NOTUSED(privdata);
1865
1866 cfd = anetAccept(server.neterr, fd, cip, &cport);
1867 if (cfd == AE_ERR) {
1868 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1869 return;
1870 }
1871 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1872 if ((c = createClient(cfd)) == NULL) {
1873 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1874 close(cfd); /* May be already closed, just ingore errors */
1875 return;
1876 }
1877 /* If maxclient directive is set and this is one client more... close the
1878 * connection. Note that we create the client instead to check before
1879 * for this condition, since now the socket is already set in nonblocking
1880 * mode and we can send an error for free using the Kernel I/O */
1881 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1882 char *err = "-ERR max number of clients reached\r\n";
1883
1884 /* That's a best effort error message, don't check write errors */
1885 (void) write(c->fd,err,strlen(err));
1886 freeClient(c);
1887 return;
1888 }
1889 server.stat_numconnections++;
1890 }
1891
1892 /* ======================= Redis objects implementation ===================== */
1893
1894 static robj *createObject(int type, void *ptr) {
1895 robj *o;
1896
1897 if (listLength(server.objfreelist)) {
1898 listNode *head = listFirst(server.objfreelist);
1899 o = listNodeValue(head);
1900 listDelNode(server.objfreelist,head);
1901 } else {
1902 o = zmalloc(sizeof(*o));
1903 }
1904 o->type = type;
1905 o->encoding = REDIS_ENCODING_RAW;
1906 o->ptr = ptr;
1907 o->refcount = 1;
1908 return o;
1909 }
1910
1911 static robj *createStringObject(char *ptr, size_t len) {
1912 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1913 }
1914
1915 static robj *createListObject(void) {
1916 list *l = listCreate();
1917
1918 listSetFreeMethod(l,decrRefCount);
1919 return createObject(REDIS_LIST,l);
1920 }
1921
1922 static robj *createSetObject(void) {
1923 dict *d = dictCreate(&setDictType,NULL);
1924 return createObject(REDIS_SET,d);
1925 }
1926
1927 static robj *createZsetObject(void) {
1928 zset *zs = zmalloc(sizeof(*zs));
1929
1930 zs->dict = dictCreate(&zsetDictType,NULL);
1931 zs->zsl = zslCreate();
1932 return createObject(REDIS_ZSET,zs);
1933 }
1934
1935 static void freeStringObject(robj *o) {
1936 if (o->encoding == REDIS_ENCODING_RAW) {
1937 sdsfree(o->ptr);
1938 }
1939 }
1940
1941 static void freeListObject(robj *o) {
1942 listRelease((list*) o->ptr);
1943 }
1944
1945 static void freeSetObject(robj *o) {
1946 dictRelease((dict*) o->ptr);
1947 }
1948
1949 static void freeZsetObject(robj *o) {
1950 zset *zs = o->ptr;
1951
1952 dictRelease(zs->dict);
1953 zslFree(zs->zsl);
1954 zfree(zs);
1955 }
1956
1957 static void freeHashObject(robj *o) {
1958 dictRelease((dict*) o->ptr);
1959 }
1960
1961 static void incrRefCount(robj *o) {
1962 o->refcount++;
1963 #ifdef DEBUG_REFCOUNT
1964 if (o->type == REDIS_STRING)
1965 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1966 #endif
1967 }
1968
1969 static void decrRefCount(void *obj) {
1970 robj *o = obj;
1971
1972 #ifdef DEBUG_REFCOUNT
1973 if (o->type == REDIS_STRING)
1974 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1975 #endif
1976 if (--(o->refcount) == 0) {
1977 switch(o->type) {
1978 case REDIS_STRING: freeStringObject(o); break;
1979 case REDIS_LIST: freeListObject(o); break;
1980 case REDIS_SET: freeSetObject(o); break;
1981 case REDIS_ZSET: freeZsetObject(o); break;
1982 case REDIS_HASH: freeHashObject(o); break;
1983 default: assert(0 != 0); break;
1984 }
1985 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1986 !listAddNodeHead(server.objfreelist,o))
1987 zfree(o);
1988 }
1989 }
1990
1991 static robj *lookupKey(redisDb *db, robj *key) {
1992 dictEntry *de = dictFind(db->dict,key);
1993 return de ? dictGetEntryVal(de) : NULL;
1994 }
1995
1996 static robj *lookupKeyRead(redisDb *db, robj *key) {
1997 expireIfNeeded(db,key);
1998 return lookupKey(db,key);
1999 }
2000
2001 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2002 deleteIfVolatile(db,key);
2003 return lookupKey(db,key);
2004 }
2005
2006 static int deleteKey(redisDb *db, robj *key) {
2007 int retval;
2008
2009 /* We need to protect key from destruction: after the first dictDelete()
2010 * it may happen that 'key' is no longer valid if we don't increment
2011 * it's count. This may happen when we get the object reference directly
2012 * from the hash table with dictRandomKey() or dict iterators */
2013 incrRefCount(key);
2014 if (dictSize(db->expires)) dictDelete(db->expires,key);
2015 retval = dictDelete(db->dict,key);
2016 decrRefCount(key);
2017
2018 return retval == DICT_OK;
2019 }
2020
2021 /* Try to share an object against the shared objects pool */
2022 static robj *tryObjectSharing(robj *o) {
2023 struct dictEntry *de;
2024 unsigned long c;
2025
2026 if (o == NULL || server.shareobjects == 0) return o;
2027
2028 assert(o->type == REDIS_STRING);
2029 de = dictFind(server.sharingpool,o);
2030 if (de) {
2031 robj *shared = dictGetEntryKey(de);
2032
2033 c = ((unsigned long) dictGetEntryVal(de))+1;
2034 dictGetEntryVal(de) = (void*) c;
2035 incrRefCount(shared);
2036 decrRefCount(o);
2037 return shared;
2038 } else {
2039 /* Here we are using a stream algorihtm: Every time an object is
2040 * shared we increment its count, everytime there is a miss we
2041 * recrement the counter of a random object. If this object reaches
2042 * zero we remove the object and put the current object instead. */
2043 if (dictSize(server.sharingpool) >=
2044 server.sharingpoolsize) {
2045 de = dictGetRandomKey(server.sharingpool);
2046 assert(de != NULL);
2047 c = ((unsigned long) dictGetEntryVal(de))-1;
2048 dictGetEntryVal(de) = (void*) c;
2049 if (c == 0) {
2050 dictDelete(server.sharingpool,de->key);
2051 }
2052 } else {
2053 c = 0; /* If the pool is empty we want to add this object */
2054 }
2055 if (c == 0) {
2056 int retval;
2057
2058 retval = dictAdd(server.sharingpool,o,(void*)1);
2059 assert(retval == DICT_OK);
2060 incrRefCount(o);
2061 }
2062 return o;
2063 }
2064 }
2065
2066 /* Check if the nul-terminated string 's' can be represented by a long
2067 * (that is, is a number that fits into long without any other space or
2068 * character before or after the digits).
2069 *
2070 * If so, the function returns REDIS_OK and *longval is set to the value
2071 * of the number. Otherwise REDIS_ERR is returned */
2072 static int isStringRepresentableAsLong(sds s, long *longval) {
2073 char buf[32], *endptr;
2074 long value;
2075 int slen;
2076
2077 value = strtol(s, &endptr, 10);
2078 if (endptr[0] != '\0') return REDIS_ERR;
2079 slen = snprintf(buf,32,"%ld",value);
2080
2081 /* If the number converted back into a string is not identical
2082 * then it's not possible to encode the string as integer */
2083 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2084 if (longval) *longval = value;
2085 return REDIS_OK;
2086 }
2087
2088 /* Try to encode a string object in order to save space */
2089 static int tryObjectEncoding(robj *o) {
2090 long value;
2091 sds s = o->ptr;
2092
2093 if (o->encoding != REDIS_ENCODING_RAW)
2094 return REDIS_ERR; /* Already encoded */
2095
2096 /* It's not save to encode shared objects: shared objects can be shared
2097 * everywhere in the "object space" of Redis. Encoded objects can only
2098 * appear as "values" (and not, for instance, as keys) */
2099 if (o->refcount > 1) return REDIS_ERR;
2100
2101 /* Currently we try to encode only strings */
2102 assert(o->type == REDIS_STRING);
2103
2104 /* Check if we can represent this string as a long integer */
2105 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2106
2107 /* Ok, this object can be encoded */
2108 o->encoding = REDIS_ENCODING_INT;
2109 sdsfree(o->ptr);
2110 o->ptr = (void*) value;
2111 return REDIS_OK;
2112 }
2113
2114 /* Get a decoded version of an encoded object (returned as a new object) */
2115 static robj *getDecodedObject(const robj *o) {
2116 robj *dec;
2117
2118 assert(o->encoding != REDIS_ENCODING_RAW);
2119 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2120 char buf[32];
2121
2122 snprintf(buf,32,"%ld",(long)o->ptr);
2123 dec = createStringObject(buf,strlen(buf));
2124 return dec;
2125 } else {
2126 assert(1 != 1);
2127 }
2128 }
2129
2130 /* Compare two string objects via strcmp() or alike.
2131 * Note that the objects may be integer-encoded. In such a case we
2132 * use snprintf() to get a string representation of the numbers on the stack
2133 * and compare the strings, it's much faster than calling getDecodedObject(). */
2134 static int compareStringObjects(robj *a, robj *b) {
2135 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2136 char bufa[128], bufb[128], *astr, *bstr;
2137 int bothsds = 1;
2138
2139 if (a == b) return 0;
2140 if (a->encoding != REDIS_ENCODING_RAW) {
2141 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2142 astr = bufa;
2143 bothsds = 0;
2144 } else {
2145 astr = a->ptr;
2146 }
2147 if (b->encoding != REDIS_ENCODING_RAW) {
2148 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2149 bstr = bufb;
2150 bothsds = 0;
2151 } else {
2152 bstr = b->ptr;
2153 }
2154 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2155 }
2156
2157 static size_t stringObjectLen(robj *o) {
2158 assert(o->type == REDIS_STRING);
2159 if (o->encoding == REDIS_ENCODING_RAW) {
2160 return sdslen(o->ptr);
2161 } else {
2162 char buf[32];
2163
2164 return snprintf(buf,32,"%ld",(long)o->ptr);
2165 }
2166 }
2167
2168 /*============================ DB saving/loading ============================ */
2169
/* Write the single byte 'type' to 'fp'. Returns 0 on success, -1 if the
 * byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2174
/* Write the time 't' to 'fp' as a 32 bit signed integer, in the byte
 * order of the host. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2180
2181 /* check rdbLoadLen() comments for more info */
2182 static int rdbSaveLen(FILE *fp, uint32_t len) {
2183 unsigned char buf[2];
2184
2185 if (len < (1<<6)) {
2186 /* Save a 6 bit len */
2187 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2188 if (fwrite(buf,1,1,fp) == 0) return -1;
2189 } else if (len < (1<<14)) {
2190 /* Save a 14 bit len */
2191 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2192 buf[1] = len&0xFF;
2193 if (fwrite(buf,2,1,fp) == 0) return -1;
2194 } else {
2195 /* Save a 32 bit len */
2196 buf[0] = (REDIS_RDB_32BITLEN<<6);
2197 if (fwrite(buf,1,1,fp) == 0) return -1;
2198 len = htonl(len);
2199 if (fwrite(&len,4,1,fp) == 0) return -1;
2200 }
2201 return 0;
2202 }
2203
2204 /* String objects in the form "2391" "-100" without any space and with a
2205 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2206 * encoded as integers to save space */
2207 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2208 long long value;
2209 char *endptr, buf[32];
2210
2211 /* Check if it's possible to encode this value as a number */
2212 value = strtoll(s, &endptr, 10);
2213 if (endptr[0] != '\0') return 0;
2214 snprintf(buf,32,"%lld",value);
2215
2216 /* If the number converted back into a string is not identical
2217 * then it's not possible to encode the string as integer */
2218 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2219
2220 /* Finally check if it fits in our ranges */
2221 if (value >= -(1<<7) && value <= (1<<7)-1) {
2222 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2223 enc[1] = value&0xFF;
2224 return 2;
2225 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2226 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2227 enc[1] = value&0xFF;
2228 enc[2] = (value>>8)&0xFF;
2229 return 3;
2230 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2231 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2232 enc[1] = value&0xFF;
2233 enc[2] = (value>>8)&0xFF;
2234 enc[3] = (value>>16)&0xFF;
2235 enc[4] = (value>>24)&0xFF;
2236 return 5;
2237 } else {
2238 return 0;
2239 }
2240 }
2241
2242 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2243 unsigned int comprlen, outlen;
2244 unsigned char byte;
2245 void *out;
2246
2247 /* We require at least four bytes compression for this to be worth it */
2248 outlen = sdslen(obj->ptr)-4;
2249 if (outlen <= 0) return 0;
2250 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2251 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2252 if (comprlen == 0) {
2253 zfree(out);
2254 return 0;
2255 }
2256 /* Data compressed! Let's save it on disk */
2257 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2258 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2259 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2260 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2261 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2262 zfree(out);
2263 return comprlen;
2264
2265 writeerr:
2266 zfree(out);
2267 return -1;
2268 }
2269
2270 /* Save a string objet as [len][data] on disk. If the object is a string
2271 * representation of an integer value we try to safe it in a special form */
2272 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2273 size_t len;
2274 int enclen;
2275
2276 len = sdslen(obj->ptr);
2277
2278 /* Try integer encoding */
2279 if (len <= 11) {
2280 unsigned char buf[5];
2281 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2282 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2283 return 0;
2284 }
2285 }
2286
2287 /* Try LZF compression - under 20 bytes it's unable to compress even
2288 * aaaaaaaaaaaaaaaaaa so skip it */
2289 if (len > 20) {
2290 int retval;
2291
2292 retval = rdbSaveLzfStringObject(fp,obj);
2293 if (retval == -1) return -1;
2294 if (retval > 0) return 0;
2295 /* retval == 0 means data can't be compressed, save the old way */
2296 }
2297
2298 /* Store verbatim */
2299 if (rdbSaveLen(fp,len) == -1) return -1;
2300 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2301 return 0;
2302 }
2303
2304 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2305 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2306 int retval;
2307 robj *dec;
2308
2309 if (obj->encoding != REDIS_ENCODING_RAW) {
2310 dec = getDecodedObject(obj);
2311 retval = rdbSaveStringObjectRaw(fp,dec);
2312 decrRefCount(dec);
2313 return retval;
2314 } else {
2315 return rdbSaveStringObjectRaw(fp,obj);
2316 }
2317 }
2318
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Finite values are printed with "%.17g". Returns 0 on success, -1 on
 * write error.
 */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        /* The text starts at buf+1: measuring from buf would read the
         * still uninitialized length byte itself. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2345
2346 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2347 static int rdbSave(char *filename) {
2348 dictIterator *di = NULL;
2349 dictEntry *de;
2350 FILE *fp;
2351 char tmpfile[256];
2352 int j;
2353 time_t now = time(NULL);
2354
2355 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2356 fp = fopen(tmpfile,"w");
2357 if (!fp) {
2358 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2359 return REDIS_ERR;
2360 }
2361 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2362 for (j = 0; j < server.dbnum; j++) {
2363 redisDb *db = server.db+j;
2364 dict *d = db->dict;
2365 if (dictSize(d) == 0) continue;
2366 di = dictGetIterator(d);
2367 if (!di) {
2368 fclose(fp);
2369 return REDIS_ERR;
2370 }
2371
2372 /* Write the SELECT DB opcode */
2373 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2374 if (rdbSaveLen(fp,j) == -1) goto werr;
2375
2376 /* Iterate this DB writing every entry */
2377 while((de = dictNext(di)) != NULL) {
2378 robj *key = dictGetEntryKey(de);
2379 robj *o = dictGetEntryVal(de);
2380 time_t expiretime = getExpire(db,key);
2381
2382 /* Save the expire time */
2383 if (expiretime != -1) {
2384 /* If this key is already expired skip it */
2385 if (expiretime < now) continue;
2386 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2387 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2388 }
2389 /* Save the key and associated value */
2390 if (rdbSaveType(fp,o->type) == -1) goto werr;
2391 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2392 if (o->type == REDIS_STRING) {
2393 /* Save a string value */
2394 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2395 } else if (o->type == REDIS_LIST) {
2396 /* Save a list value */
2397 list *list = o->ptr;
2398 listNode *ln;
2399
2400 listRewind(list);
2401 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2402 while((ln = listYield(list))) {
2403 robj *eleobj = listNodeValue(ln);
2404
2405 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2406 }
2407 } else if (o->type == REDIS_SET) {
2408 /* Save a set value */
2409 dict *set = o->ptr;
2410 dictIterator *di = dictGetIterator(set);
2411 dictEntry *de;
2412
2413 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2414 while((de = dictNext(di)) != NULL) {
2415 robj *eleobj = dictGetEntryKey(de);
2416
2417 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2418 }
2419 dictReleaseIterator(di);
2420 } else if (o->type == REDIS_ZSET) {
2421 /* Save a set value */
2422 zset *zs = o->ptr;
2423 dictIterator *di = dictGetIterator(zs->dict);
2424 dictEntry *de;
2425
2426 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2427 while((de = dictNext(di)) != NULL) {
2428 robj *eleobj = dictGetEntryKey(de);
2429 double *score = dictGetEntryVal(de);
2430
2431 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2432 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2433 }
2434 dictReleaseIterator(di);
2435 } else {
2436 assert(0 != 0);
2437 }
2438 }
2439 dictReleaseIterator(di);
2440 }
2441 /* EOF opcode */
2442 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2443
2444 /* Make sure data will not remain on the OS's output buffers */
2445 fflush(fp);
2446 fsync(fileno(fp));
2447 fclose(fp);
2448
2449 /* Use RENAME to make sure the DB file is changed atomically only
2450 * if the generate DB file is ok. */
2451 if (rename(tmpfile,filename) == -1) {
2452 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2453 unlink(tmpfile);
2454 return REDIS_ERR;
2455 }
2456 redisLog(REDIS_NOTICE,"DB saved on disk");
2457 server.dirty = 0;
2458 server.lastsave = time(NULL);
2459 return REDIS_OK;
2460
2461 werr:
2462 fclose(fp);
2463 unlink(tmpfile);
2464 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2465 if (di) dictReleaseIterator(di);
2466 return REDIS_ERR;
2467 }
2468
2469 static int rdbSaveBackground(char *filename) {
2470 pid_t childpid;
2471
2472 if (server.bgsaveinprogress) return REDIS_ERR;
2473 if ((childpid = fork()) == 0) {
2474 /* Child */
2475 close(server.fd);
2476 if (rdbSave(filename) == REDIS_OK) {
2477 exit(0);
2478 } else {
2479 exit(1);
2480 }
2481 } else {
2482 /* Parent */
2483 if (childpid == -1) {
2484 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2485 strerror(errno));
2486 return REDIS_ERR;
2487 }
2488 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2489 server.bgsaveinprogress = 1;
2490 server.bgsavechildpid = childpid;
2491 return REDIS_OK;
2492 }
2493 return REDIS_OK; /* unreached */
2494 }
2495
/* Remove the temporary dump file "temp-<childpid>.rdb", i.e. the name
 * rdbSave() uses when running in the process with that pid. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2502
/* Read one byte from 'fp' and return it as a value in the 0-255 range,
 * or -1 if nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
2508
/* Read a 32 bit signed time value (host byte order) from 'fp'. Returns
 * the time, or -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) != 0) return (time_t) stamp;
    return -1;
}
2514
2515 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2516 * of this file for a description of how this are stored on disk.
2517 *
2518 * isencoded is set to 1 if the readed length is not actually a length but
2519 * an "encoding type", check the above comments for more info */
2520 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2521 unsigned char buf[2];
2522 uint32_t len;
2523
2524 if (isencoded) *isencoded = 0;
2525 if (rdbver == 0) {
2526 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2527 return ntohl(len);
2528 } else {
2529 int type;
2530
2531 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2532 type = (buf[0]&0xC0)>>6;
2533 if (type == REDIS_RDB_6BITLEN) {
2534 /* Read a 6 bit len */
2535 return buf[0]&0x3F;
2536 } else if (type == REDIS_RDB_ENCVAL) {
2537 /* Read a 6 bit len encoding type */
2538 if (isencoded) *isencoded = 1;
2539 return buf[0]&0x3F;
2540 } else if (type == REDIS_RDB_14BITLEN) {
2541 /* Read a 14 bit len */
2542 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2543 return ((buf[0]&0x3F)<<8)|buf[1];
2544 } else {
2545 /* Read a 32 bit len */
2546 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2547 return ntohl(len);
2548 }
2549 }
2550 }
2551
2552 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2553 unsigned char enc[4];
2554 long long val;
2555
2556 if (enctype == REDIS_RDB_ENC_INT8) {
2557 if (fread(enc,1,1,fp) == 0) return NULL;
2558 val = (signed char)enc[0];
2559 } else if (enctype == REDIS_RDB_ENC_INT16) {
2560 uint16_t v;
2561 if (fread(enc,2,1,fp) == 0) return NULL;
2562 v = enc[0]|(enc[1]<<8);
2563 val = (int16_t)v;
2564 } else if (enctype == REDIS_RDB_ENC_INT32) {
2565 uint32_t v;
2566 if (fread(enc,4,1,fp) == 0) return NULL;
2567 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2568 val = (int32_t)v;
2569 } else {
2570 val = 0; /* anti-warning */
2571 assert(0!=0);
2572 }
2573 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2574 }
2575
2576 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2577 unsigned int len, clen;
2578 unsigned char *c = NULL;
2579 sds val = NULL;
2580
2581 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2582 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2583 if ((c = zmalloc(clen)) == NULL) goto err;
2584 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2585 if (fread(c,clen,1,fp) == 0) goto err;
2586 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2587 zfree(c);
2588 return createObject(REDIS_STRING,val);
2589 err:
2590 zfree(c);
2591 sdsfree(val);
2592 return NULL;
2593 }
2594
2595 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2596 int isencoded;
2597 uint32_t len;
2598 sds val;
2599
2600 len = rdbLoadLen(fp,rdbver,&isencoded);
2601 if (isencoded) {
2602 switch(len) {
2603 case REDIS_RDB_ENC_INT8:
2604 case REDIS_RDB_ENC_INT16:
2605 case REDIS_RDB_ENC_INT32:
2606 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2607 case REDIS_RDB_ENC_LZF:
2608 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2609 default:
2610 assert(0!=0);
2611 }
2612 }
2613
2614 if (len == REDIS_RDB_LENERR) return NULL;
2615 val = sdsnewlen(NULL,len);
2616 if (len && fread(val,len,1,fp) == 0) {
2617 sdsfree(val);
2618 return NULL;
2619 }
2620 return tryObjectSharing(createObject(REDIS_STRING,val));
2621 }
2622
2623 /* For information about double serialization check rdbSaveDoubleValue() */
2624 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2625 char buf[128];
2626 unsigned char len;
2627
2628 if (fread(&len,1,1,fp) == 0) return -1;
2629 switch(len) {
2630 case 255: *val = R_NegInf; return 0;
2631 case 254: *val = R_PosInf; return 0;
2632 case 253: *val = R_Nan; return 0;
2633 default:
2634 if (fread(buf,len,1,fp) == 0) return -1;
2635 sscanf(buf, "%lg", val);
2636 return 0;
2637 }
2638 }
2639
2640 static int rdbLoad(char *filename) {
2641 FILE *fp;
2642 robj *keyobj = NULL;
2643 uint32_t dbid;
2644 int type, retval, rdbver;
2645 dict *d = server.db[0].dict;
2646 redisDb *db = server.db+0;
2647 char buf[1024];
2648 time_t expiretime = -1, now = time(NULL);
2649
2650 fp = fopen(filename,"r");
2651 if (!fp) return REDIS_ERR;
2652 if (fread(buf,9,1,fp) == 0) goto eoferr;
2653 buf[9] = '\0';
2654 if (memcmp(buf,"REDIS",5) != 0) {
2655 fclose(fp);
2656 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2657 return REDIS_ERR;
2658 }
2659 rdbver = atoi(buf+5);
2660 if (rdbver > 1) {
2661 fclose(fp);
2662 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2663 return REDIS_ERR;
2664 }
2665 while(1) {
2666 robj *o;
2667
2668 /* Read type. */
2669 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2670 if (type == REDIS_EXPIRETIME) {
2671 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2672 /* We read the time so we need to read the object type again */
2673 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2674 }
2675 if (type == REDIS_EOF) break;
2676 /* Handle SELECT DB opcode as a special case */
2677 if (type == REDIS_SELECTDB) {
2678 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2679 goto eoferr;
2680 if (dbid >= (unsigned)server.dbnum) {
2681 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2682 exit(1);
2683 }
2684 db = server.db+dbid;
2685 d = db->dict;
2686 continue;
2687 }
2688 /* Read key */
2689 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2690
2691 if (type == REDIS_STRING) {
2692 /* Read string value */
2693 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2694 tryObjectEncoding(o);
2695 } else if (type == REDIS_LIST || type == REDIS_SET) {
2696 /* Read list/set value */
2697 uint32_t listlen;
2698
2699 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2700 goto eoferr;
2701 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2702 /* Load every single element of the list/set */
2703 while(listlen--) {
2704 robj *ele;
2705
2706 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2707 tryObjectEncoding(ele);
2708 if (type == REDIS_LIST) {
2709 listAddNodeTail((list*)o->ptr,ele);
2710 } else {
2711 dictAdd((dict*)o->ptr,ele,NULL);
2712 }
2713 }
2714 } else if (type == REDIS_ZSET) {
2715 /* Read list/set value */
2716 uint32_t zsetlen;
2717 zset *zs;
2718
2719 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2720 goto eoferr;
2721 o = createZsetObject();
2722 zs = o->ptr;
2723 /* Load every single element of the list/set */
2724 while(zsetlen--) {
2725 robj *ele;
2726 double *score = zmalloc(sizeof(double));
2727
2728 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2729 tryObjectEncoding(ele);
2730 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2731 dictAdd(zs->dict,ele,score);
2732 zslInsert(zs->zsl,*score,ele);
2733 incrRefCount(ele); /* added to skiplist */
2734 }
2735 } else {
2736 assert(0 != 0);
2737 }
2738 /* Add the new object in the hash table */
2739 retval = dictAdd(d,keyobj,o);
2740 if (retval == DICT_ERR) {
2741 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2742 exit(1);
2743 }
2744 /* Set the expire time if needed */
2745 if (expiretime != -1) {
2746 setExpire(db,keyobj,expiretime);
2747 /* Delete this key if already expired */
2748 if (expiretime < now) deleteKey(db,keyobj);
2749 expiretime = -1;
2750 }
2751 keyobj = o = NULL;
2752 }
2753 fclose(fp);
2754 return REDIS_OK;
2755
2756 eoferr: /* unexpected end of file is handled here with a fatal exit */
2757 if (keyobj) decrRefCount(keyobj);
2758 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2759 exit(1);
2760 return REDIS_ERR; /* Just to avoid warning */
2761 }
2762
2763 /*================================== Commands =============================== */
2764
2765 static void authCommand(redisClient *c) {
2766 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2767 c->authenticated = 1;
2768 addReply(c,shared.ok);
2769 } else {
2770 c->authenticated = 0;
2771 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2772 }
2773 }
2774
2775 static void pingCommand(redisClient *c) {
2776 addReply(c,shared.pong);
2777 }
2778
2779 static void echoCommand(redisClient *c) {
2780 addReplyBulkLen(c,c->argv[1]);
2781 addReply(c,c->argv[1]);
2782 addReply(c,shared.crlf);
2783 }
2784
2785 /*=================================== Strings =============================== */
2786
2787 static void setGenericCommand(redisClient *c, int nx) {
2788 int retval;
2789
2790 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2791 if (retval == DICT_ERR) {
2792 if (!nx) {
2793 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2794 incrRefCount(c->argv[2]);
2795 } else {
2796 addReply(c,shared.czero);
2797 return;
2798 }
2799 } else {
2800 incrRefCount(c->argv[1]);
2801 incrRefCount(c->argv[2]);
2802 }
2803 server.dirty++;
2804 removeExpire(c->db,c->argv[1]);
2805 addReply(c, nx ? shared.cone : shared.ok);
2806 }
2807
2808 static void setCommand(redisClient *c) {
2809 setGenericCommand(c,0);
2810 }
2811
2812 static void setnxCommand(redisClient *c) {
2813 setGenericCommand(c,1);
2814 }
2815
2816 static void getCommand(redisClient *c) {
2817 robj *o = lookupKeyRead(c->db,c->argv[1]);
2818
2819 if (o == NULL) {
2820 addReply(c,shared.nullbulk);
2821 } else {
2822 if (o->type != REDIS_STRING) {
2823 addReply(c,shared.wrongtypeerr);
2824 } else {
2825 addReplyBulkLen(c,o);
2826 addReply(c,o);
2827 addReply(c,shared.crlf);
2828 }
2829 }
2830 }
2831
2832 static void getsetCommand(redisClient *c) {
2833 getCommand(c);
2834 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2835 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2836 } else {
2837 incrRefCount(c->argv[1]);
2838 }
2839 incrRefCount(c->argv[2]);
2840 server.dirty++;
2841 removeExpire(c->db,c->argv[1]);
2842 }
2843
2844 static void mgetCommand(redisClient *c) {
2845 int j;
2846
2847 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2848 for (j = 1; j < c->argc; j++) {
2849 robj *o = lookupKeyRead(c->db,c->argv[j]);
2850 if (o == NULL) {
2851 addReply(c,shared.nullbulk);
2852 } else {
2853 if (o->type != REDIS_STRING) {
2854 addReply(c,shared.nullbulk);
2855 } else {
2856 addReplyBulkLen(c,o);
2857 addReply(c,o);
2858 addReply(c,shared.crlf);
2859 }
2860 }
2861 }
2862 }
2863
2864 static void incrDecrCommand(redisClient *c, long long incr) {
2865 long long value;
2866 int retval;
2867 robj *o;
2868
2869 o = lookupKeyWrite(c->db,c->argv[1]);
2870 if (o == NULL) {
2871 value = 0;
2872 } else {
2873 if (o->type != REDIS_STRING) {
2874 value = 0;
2875 } else {
2876 char *eptr;
2877
2878 if (o->encoding == REDIS_ENCODING_RAW)
2879 value = strtoll(o->ptr, &eptr, 10);
2880 else if (o->encoding == REDIS_ENCODING_INT)
2881 value = (long)o->ptr;
2882 else
2883 assert(1 != 1);
2884 }
2885 }
2886
2887 value += incr;
2888 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2889 tryObjectEncoding(o);
2890 retval = dictAdd(c->db->dict,c->argv[1],o);
2891 if (retval == DICT_ERR) {
2892 dictReplace(c->db->dict,c->argv[1],o);
2893 removeExpire(c->db,c->argv[1]);
2894 } else {
2895 incrRefCount(c->argv[1]);
2896 }
2897 server.dirty++;
2898 addReply(c,shared.colon);
2899 addReply(c,o);
2900 addReply(c,shared.crlf);
2901 }
2902
2903 static void incrCommand(redisClient *c) {
2904 incrDecrCommand(c,1);
2905 }
2906
2907 static void decrCommand(redisClient *c) {
2908 incrDecrCommand(c,-1);
2909 }
2910
2911 static void incrbyCommand(redisClient *c) {
2912 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2913 incrDecrCommand(c,incr);
2914 }
2915
2916 static void decrbyCommand(redisClient *c) {
2917 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2918 incrDecrCommand(c,-incr);
2919 }
2920
2921 /* ========================= Type agnostic commands ========================= */
2922
2923 static void delCommand(redisClient *c) {
2924 int deleted = 0, j;
2925
2926 for (j = 1; j < c->argc; j++) {
2927 if (deleteKey(c->db,c->argv[j])) {
2928 server.dirty++;
2929 deleted++;
2930 }
2931 }
2932 switch(deleted) {
2933 case 0:
2934 addReply(c,shared.czero);
2935 break;
2936 case 1:
2937 addReply(c,shared.cone);
2938 break;
2939 default:
2940 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2941 break;
2942 }
2943 }
2944
2945 static void existsCommand(redisClient *c) {
2946 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2947 }
2948
2949 static void selectCommand(redisClient *c) {
2950 int id = atoi(c->argv[1]->ptr);
2951
2952 if (selectDb(c,id) == REDIS_ERR) {
2953 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2954 } else {
2955 addReply(c,shared.ok);
2956 }
2957 }
2958
2959 static void randomkeyCommand(redisClient *c) {
2960 dictEntry *de;
2961
2962 while(1) {
2963 de = dictGetRandomKey(c->db->dict);
2964 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2965 }
2966 if (de == NULL) {
2967 addReply(c,shared.plus);
2968 addReply(c,shared.crlf);
2969 } else {
2970 addReply(c,shared.plus);
2971 addReply(c,dictGetEntryKey(de));
2972 addReply(c,shared.crlf);
2973 }
2974 }
2975
2976 static void keysCommand(redisClient *c) {
2977 dictIterator *di;
2978 dictEntry *de;
2979 sds pattern = c->argv[1]->ptr;
2980 int plen = sdslen(pattern);
2981 int numkeys = 0, keyslen = 0;
2982 robj *lenobj = createObject(REDIS_STRING,NULL);
2983
2984 di = dictGetIterator(c->db->dict);
2985 addReply(c,lenobj);
2986 decrRefCount(lenobj);
2987 while((de = dictNext(di)) != NULL) {
2988 robj *keyobj = dictGetEntryKey(de);
2989
2990 sds key = keyobj->ptr;
2991 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2992 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2993 if (expireIfNeeded(c->db,keyobj) == 0) {
2994 if (numkeys != 0)
2995 addReply(c,shared.space);
2996 addReply(c,keyobj);
2997 numkeys++;
2998 keyslen += sdslen(key);
2999 }
3000 }
3001 }
3002 dictReleaseIterator(di);
3003 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3004 addReply(c,shared.crlf);
3005 }
3006
3007 static void dbsizeCommand(redisClient *c) {
3008 addReplySds(c,
3009 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3010 }
3011
3012 static void lastsaveCommand(redisClient *c) {
3013 addReplySds(c,
3014 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3015 }
3016
3017 static void typeCommand(redisClient *c) {
3018 robj *o;
3019 char *type;
3020
3021 o = lookupKeyRead(c->db,c->argv[1]);
3022 if (o == NULL) {
3023 type = "+none";
3024 } else {
3025 switch(o->type) {
3026 case REDIS_STRING: type = "+string"; break;
3027 case REDIS_LIST: type = "+list"; break;
3028 case REDIS_SET: type = "+set"; break;
3029 case REDIS_ZSET: type = "+zset"; break;
3030 default: type = "unknown"; break;
3031 }
3032 }
3033 addReplySds(c,sdsnew(type));
3034 addReply(c,shared.crlf);
3035 }
3036
3037 static void saveCommand(redisClient *c) {
3038 if (server.bgsaveinprogress) {
3039 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3040 return;
3041 }
3042 if (rdbSave(server.dbfilename) == REDIS_OK) {
3043 addReply(c,shared.ok);
3044 } else {
3045 addReply(c,shared.err);
3046 }
3047 }
3048
3049 static void bgsaveCommand(redisClient *c) {
3050 if (server.bgsaveinprogress) {
3051 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3052 return;
3053 }
3054 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3055 addReply(c,shared.ok);
3056 } else {
3057 addReply(c,shared.err);
3058 }
3059 }
3060
3061 static void shutdownCommand(redisClient *c) {
3062 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3063 /* Kill the saving child if there is a background saving in progress.
3064 We want to avoid race conditions, for instance our saving child may
3065 overwrite the synchronous saving did by SHUTDOWN. */
3066 if (server.bgsaveinprogress) {
3067 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3068 kill(server.bgsavechildpid,SIGKILL);
3069 rdbRemoveTempFile(server.bgsavechildpid);
3070 }
3071 /* SYNC SAVE */
3072 if (rdbSave(server.dbfilename) == REDIS_OK) {
3073 if (server.daemonize)
3074 unlink(server.pidfile);
3075 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3076 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3077 exit(1);
3078 } else {
3079 /* Ooops.. error saving! The best we can do is to continue operating.
3080 * Note that if there was a background saving process, in the next
3081 * cron() Redis will be notified that the background saving aborted,
3082 * handling special stuff like slaves pending for synchronization... */
3083 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3084 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3085 }
3086 }
3087
3088 static void renameGenericCommand(redisClient *c, int nx) {
3089 robj *o;
3090
3091 /* To use the same key as src and dst is probably an error */
3092 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3093 addReply(c,shared.sameobjecterr);
3094 return;
3095 }
3096
3097 o = lookupKeyWrite(c->db,c->argv[1]);
3098 if (o == NULL) {
3099 addReply(c,shared.nokeyerr);
3100 return;
3101 }
3102 incrRefCount(o);
3103 deleteIfVolatile(c->db,c->argv[2]);
3104 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3105 if (nx) {
3106 decrRefCount(o);
3107 addReply(c,shared.czero);
3108 return;
3109 }
3110 dictReplace(c->db->dict,c->argv[2],o);
3111 } else {
3112 incrRefCount(c->argv[2]);
3113 }
3114 deleteKey(c->db,c->argv[1]);
3115 server.dirty++;
3116 addReply(c,nx ? shared.cone : shared.ok);
3117 }
3118
3119 static void renameCommand(redisClient *c) {
3120 renameGenericCommand(c,0);
3121 }
3122
3123 static void renamenxCommand(redisClient *c) {
3124 renameGenericCommand(c,1);
3125 }
3126
3127 static void moveCommand(redisClient *c) {
3128 robj *o;
3129 redisDb *src, *dst;
3130 int srcid;
3131
3132 /* Obtain source and target DB pointers */
3133 src = c->db;
3134 srcid = c->db->id;
3135 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3136 addReply(c,shared.outofrangeerr);
3137 return;
3138 }
3139 dst = c->db;
3140 selectDb(c,srcid); /* Back to the source DB */
3141
3142 /* If the user is moving using as target the same
3143 * DB as the source DB it is probably an error. */
3144 if (src == dst) {
3145 addReply(c,shared.sameobjecterr);
3146 return;
3147 }
3148
3149 /* Check if the element exists and get a reference */
3150 o = lookupKeyWrite(c->db,c->argv[1]);
3151 if (!o) {
3152 addReply(c,shared.czero);
3153 return;
3154 }
3155
3156 /* Try to add the element to the target DB */
3157 deleteIfVolatile(dst,c->argv[1]);
3158 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3159 addReply(c,shared.czero);
3160 return;
3161 }
3162 incrRefCount(c->argv[1]);
3163 incrRefCount(o);
3164
3165 /* OK! key moved, free the entry in the source DB */
3166 deleteKey(src,c->argv[1]);
3167 server.dirty++;
3168 addReply(c,shared.cone);
3169 }
3170
3171 /* =================================== Lists ================================ */
3172 static void pushGenericCommand(redisClient *c, int where) {
3173 robj *lobj;
3174 list *list;
3175
3176 lobj = lookupKeyWrite(c->db,c->argv[1]);
3177 if (lobj == NULL) {
3178 lobj = createListObject();
3179 list = lobj->ptr;
3180 if (where == REDIS_HEAD) {
3181 listAddNodeHead(list,c->argv[2]);
3182 } else {
3183 listAddNodeTail(list,c->argv[2]);
3184 }
3185 dictAdd(c->db->dict,c->argv[1],lobj);
3186 incrRefCount(c->argv[1]);
3187 incrRefCount(c->argv[2]);
3188 } else {
3189 if (lobj->type != REDIS_LIST) {
3190 addReply(c,shared.wrongtypeerr);
3191 return;
3192 }
3193 list = lobj->ptr;
3194 if (where == REDIS_HEAD) {
3195 listAddNodeHead(list,c->argv[2]);
3196 } else {
3197 listAddNodeTail(list,c->argv[2]);
3198 }
3199 incrRefCount(c->argv[2]);
3200 }
3201 server.dirty++;
3202 addReply(c,shared.ok);
3203 }
3204
3205 static void lpushCommand(redisClient *c) {
3206 pushGenericCommand(c,REDIS_HEAD);
3207 }
3208
3209 static void rpushCommand(redisClient *c) {
3210 pushGenericCommand(c,REDIS_TAIL);
3211 }
3212
3213 static void llenCommand(redisClient *c) {
3214 robj *o;
3215 list *l;
3216
3217 o = lookupKeyRead(c->db,c->argv[1]);
3218 if (o == NULL) {
3219 addReply(c,shared.czero);
3220 return;
3221 } else {
3222 if (o->type != REDIS_LIST) {
3223 addReply(c,shared.wrongtypeerr);
3224 } else {
3225 l = o->ptr;
3226 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3227 }
3228 }
3229 }
3230
3231 static void lindexCommand(redisClient *c) {
3232 robj *o;
3233 int index = atoi(c->argv[2]->ptr);
3234
3235 o = lookupKeyRead(c->db,c->argv[1]);
3236 if (o == NULL) {
3237 addReply(c,shared.nullbulk);
3238 } else {
3239 if (o->type != REDIS_LIST) {
3240 addReply(c,shared.wrongtypeerr);
3241 } else {
3242 list *list = o->ptr;
3243 listNode *ln;
3244
3245 ln = listIndex(list, index);
3246 if (ln == NULL) {
3247 addReply(c,shared.nullbulk);
3248 } else {
3249 robj *ele = listNodeValue(ln);
3250 addReplyBulkLen(c,ele);
3251 addReply(c,ele);
3252 addReply(c,shared.crlf);
3253 }
3254 }
3255 }
3256 }
3257
3258 static void lsetCommand(redisClient *c) {
3259 robj *o;
3260 int index = atoi(c->argv[2]->ptr);
3261
3262 o = lookupKeyWrite(c->db,c->argv[1]);
3263 if (o == NULL) {
3264 addReply(c,shared.nokeyerr);
3265 } else {
3266 if (o->type != REDIS_LIST) {
3267 addReply(c,shared.wrongtypeerr);
3268 } else {
3269 list *list = o->ptr;
3270 listNode *ln;
3271
3272 ln = listIndex(list, index);
3273 if (ln == NULL) {
3274 addReply(c,shared.outofrangeerr);
3275 } else {
3276 robj *ele = listNodeValue(ln);
3277
3278 decrRefCount(ele);
3279 listNodeValue(ln) = c->argv[3];
3280 incrRefCount(c->argv[3]);
3281 addReply(c,shared.ok);
3282 server.dirty++;
3283 }
3284 }
3285 }
3286 }
3287
3288 static void popGenericCommand(redisClient *c, int where) {
3289 robj *o;
3290
3291 o = lookupKeyWrite(c->db,c->argv[1]);
3292 if (o == NULL) {
3293 addReply(c,shared.nullbulk);
3294 } else {
3295 if (o->type != REDIS_LIST) {
3296 addReply(c,shared.wrongtypeerr);
3297 } else {
3298 list *list = o->ptr;
3299 listNode *ln;
3300
3301 if (where == REDIS_HEAD)
3302 ln = listFirst(list);
3303 else
3304 ln = listLast(list);
3305
3306 if (ln == NULL) {
3307 addReply(c,shared.nullbulk);
3308 } else {
3309 robj *ele = listNodeValue(ln);
3310 addReplyBulkLen(c,ele);
3311 addReply(c,ele);
3312 addReply(c,shared.crlf);
3313 listDelNode(list,ln);
3314 server.dirty++;
3315 }
3316 }
3317 }
3318 }
3319
3320 static void lpopCommand(redisClient *c) {
3321 popGenericCommand(c,REDIS_HEAD);
3322 }
3323
3324 static void rpopCommand(redisClient *c) {
3325 popGenericCommand(c,REDIS_TAIL);
3326 }
3327
3328 static void lrangeCommand(redisClient *c) {
3329 robj *o;
3330 int start = atoi(c->argv[2]->ptr);
3331 int end = atoi(c->argv[3]->ptr);
3332
3333 o = lookupKeyRead(c->db,c->argv[1]);
3334 if (o == NULL) {
3335 addReply(c,shared.nullmultibulk);
3336 } else {
3337 if (o->type != REDIS_LIST) {
3338 addReply(c,shared.wrongtypeerr);
3339 } else {
3340 list *list = o->ptr;
3341 listNode *ln;
3342 int llen = listLength(list);
3343 int rangelen, j;
3344 robj *ele;
3345
3346 /* convert negative indexes */
3347 if (start < 0) start = llen+start;
3348 if (end < 0) end = llen+end;
3349 if (start < 0) start = 0;
3350 if (end < 0) end = 0;
3351
3352 /* indexes sanity checks */
3353 if (start > end || start >= llen) {
3354 /* Out of range start or start > end result in empty list */
3355 addReply(c,shared.emptymultibulk);
3356 return;
3357 }
3358 if (end >= llen) end = llen-1;
3359 rangelen = (end-start)+1;
3360
3361 /* Return the result in form of a multi-bulk reply */
3362 ln = listIndex(list, start);
3363 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3364 for (j = 0; j < rangelen; j++) {
3365 ele = listNodeValue(ln);
3366 addReplyBulkLen(c,ele);
3367 addReply(c,ele);
3368 addReply(c,shared.crlf);
3369 ln = ln->next;
3370 }
3371 }
3372 }
3373 }
3374
3375 static void ltrimCommand(redisClient *c) {
3376 robj *o;
3377 int start = atoi(c->argv[2]->ptr);
3378 int end = atoi(c->argv[3]->ptr);
3379
3380 o = lookupKeyWrite(c->db,c->argv[1]);
3381 if (o == NULL) {
3382 addReply(c,shared.nokeyerr);
3383 } else {
3384 if (o->type != REDIS_LIST) {
3385 addReply(c,shared.wrongtypeerr);
3386 } else {
3387 list *list = o->ptr;
3388 listNode *ln;
3389 int llen = listLength(list);
3390 int j, ltrim, rtrim;
3391
3392 /* convert negative indexes */
3393 if (start < 0) start = llen+start;
3394 if (end < 0) end = llen+end;
3395 if (start < 0) start = 0;
3396 if (end < 0) end = 0;
3397
3398 /* indexes sanity checks */
3399 if (start > end || start >= llen) {
3400 /* Out of range start or start > end result in empty list */
3401 ltrim = llen;
3402 rtrim = 0;
3403 } else {
3404 if (end >= llen) end = llen-1;
3405 ltrim = start;
3406 rtrim = llen-end-1;
3407 }
3408
3409 /* Remove list elements to perform the trim */
3410 for (j = 0; j < ltrim; j++) {
3411 ln = listFirst(list);
3412 listDelNode(list,ln);
3413 }
3414 for (j = 0; j < rtrim; j++) {
3415 ln = listLast(list);
3416 listDelNode(list,ln);
3417 }
3418 server.dirty++;
3419 addReply(c,shared.ok);
3420 }
3421 }
3422 }
3423
3424 static void lremCommand(redisClient *c) {
3425 robj *o;
3426
3427 o = lookupKeyWrite(c->db,c->argv[1]);
3428 if (o == NULL) {
3429 addReply(c,shared.czero);
3430 } else {
3431 if (o->type != REDIS_LIST) {
3432 addReply(c,shared.wrongtypeerr);
3433 } else {
3434 list *list = o->ptr;
3435 listNode *ln, *next;
3436 int toremove = atoi(c->argv[2]->ptr);
3437 int removed = 0;
3438 int fromtail = 0;
3439
3440 if (toremove < 0) {
3441 toremove = -toremove;
3442 fromtail = 1;
3443 }
3444 ln = fromtail ? list->tail : list->head;
3445 while (ln) {
3446 robj *ele = listNodeValue(ln);
3447
3448 next = fromtail ? ln->prev : ln->next;
3449 if (compareStringObjects(ele,c->argv[3]) == 0) {
3450 listDelNode(list,ln);
3451 server.dirty++;
3452 removed++;
3453 if (toremove && removed == toremove) break;
3454 }
3455 ln = next;
3456 }
3457 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3458 }
3459 }
3460 }
3461
3462 /* ==================================== Sets ================================ */
3463
3464 static void saddCommand(redisClient *c) {
3465 robj *set;
3466
3467 set = lookupKeyWrite(c->db,c->argv[1]);
3468 if (set == NULL) {
3469 set = createSetObject();
3470 dictAdd(c->db->dict,c->argv[1],set);
3471 incrRefCount(c->argv[1]);
3472 } else {
3473 if (set->type != REDIS_SET) {
3474 addReply(c,shared.wrongtypeerr);
3475 return;
3476 }
3477 }
3478 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3479 incrRefCount(c->argv[2]);
3480 server.dirty++;
3481 addReply(c,shared.cone);
3482 } else {
3483 addReply(c,shared.czero);
3484 }
3485 }
3486
3487 static void sremCommand(redisClient *c) {
3488 robj *set;
3489
3490 set = lookupKeyWrite(c->db,c->argv[1]);
3491 if (set == NULL) {
3492 addReply(c,shared.czero);
3493 } else {
3494 if (set->type != REDIS_SET) {
3495 addReply(c,shared.wrongtypeerr);
3496 return;
3497 }
3498 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3499 server.dirty++;
3500 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3501 addReply(c,shared.cone);
3502 } else {
3503 addReply(c,shared.czero);
3504 }
3505 }
3506 }
3507
3508 static void smoveCommand(redisClient *c) {
3509 robj *srcset, *dstset;
3510
3511 srcset = lookupKeyWrite(c->db,c->argv[1]);
3512 dstset = lookupKeyWrite(c->db,c->argv[2]);
3513
3514 /* If the source key does not exist return 0, if it's of the wrong type
3515 * raise an error */
3516 if (srcset == NULL || srcset->type != REDIS_SET) {
3517 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3518 return;
3519 }
3520 /* Error if the destination key is not a set as well */
3521 if (dstset && dstset->type != REDIS_SET) {
3522 addReply(c,shared.wrongtypeerr);
3523 return;
3524 }
3525 /* Remove the element from the source set */
3526 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3527 /* Key not found in the src set! return zero */
3528 addReply(c,shared.czero);
3529 return;
3530 }
3531 server.dirty++;
3532 /* Add the element to the destination set */
3533 if (!dstset) {
3534 dstset = createSetObject();
3535 dictAdd(c->db->dict,c->argv[2],dstset);
3536 incrRefCount(c->argv[2]);
3537 }
3538 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3539 incrRefCount(c->argv[3]);
3540 addReply(c,shared.cone);
3541 }
3542
3543 static void sismemberCommand(redisClient *c) {
3544 robj *set;
3545
3546 set = lookupKeyRead(c->db,c->argv[1]);
3547 if (set == NULL) {
3548 addReply(c,shared.czero);
3549 } else {
3550 if (set->type != REDIS_SET) {
3551 addReply(c,shared.wrongtypeerr);
3552 return;
3553 }
3554 if (dictFind(set->ptr,c->argv[2]))
3555 addReply(c,shared.cone);
3556 else
3557 addReply(c,shared.czero);
3558 }
3559 }
3560
3561 static void scardCommand(redisClient *c) {
3562 robj *o;
3563 dict *s;
3564
3565 o = lookupKeyRead(c->db,c->argv[1]);
3566 if (o == NULL) {
3567 addReply(c,shared.czero);
3568 return;
3569 } else {
3570 if (o->type != REDIS_SET) {
3571 addReply(c,shared.wrongtypeerr);
3572 } else {
3573 s = o->ptr;
3574 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3575 dictSize(s)));
3576 }
3577 }
3578 }
3579
3580 static void spopCommand(redisClient *c) {
3581 robj *set;
3582 dictEntry *de;
3583
3584 set = lookupKeyWrite(c->db,c->argv[1]);
3585 if (set == NULL) {
3586 addReply(c,shared.nullbulk);
3587 } else {
3588 if (set->type != REDIS_SET) {
3589 addReply(c,shared.wrongtypeerr);
3590 return;
3591 }
3592 de = dictGetRandomKey(set->ptr);
3593 if (de == NULL) {
3594 addReply(c,shared.nullbulk);
3595 } else {
3596 robj *ele = dictGetEntryKey(de);
3597
3598 addReplyBulkLen(c,ele);
3599 addReply(c,ele);
3600 addReply(c,shared.crlf);
3601 dictDelete(set->ptr,ele);
3602 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3603 server.dirty++;
3604 }
3605 }
3606 }
3607
3608 static void srandmemberCommand(redisClient *c) {
3609 robj *set;
3610 dictEntry *de;
3611
3612 set = lookupKeyRead(c->db,c->argv[1]);
3613 if (set == NULL) {
3614 addReply(c,shared.nullbulk);
3615 } else {
3616 if (set->type != REDIS_SET) {
3617 addReply(c,shared.wrongtypeerr);
3618 return;
3619 }
3620 de = dictGetRandomKey(set->ptr);
3621 if (de == NULL) {
3622 addReply(c,shared.nullbulk);
3623 } else {
3624 robj *ele = dictGetEntryKey(de);
3625
3626 addReplyBulkLen(c,ele);
3627 addReply(c,ele);
3628 addReply(c,shared.crlf);
3629 }
3630 }
3631 }
3632
3633 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3634 dict **d1 = (void*) s1, **d2 = (void*) s2;
3635
3636 return dictSize(*d1)-dictSize(*d2);
3637 }
3638
3639 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3640 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3641 dictIterator *di;
3642 dictEntry *de;
3643 robj *lenobj = NULL, *dstset = NULL;
3644 int j, cardinality = 0;
3645
3646 for (j = 0; j < setsnum; j++) {
3647 robj *setobj;
3648
3649 setobj = dstkey ?
3650 lookupKeyWrite(c->db,setskeys[j]) :
3651 lookupKeyRead(c->db,setskeys[j]);
3652 if (!setobj) {
3653 zfree(dv);
3654 if (dstkey) {
3655 deleteKey(c->db,dstkey);
3656 addReply(c,shared.ok);
3657 } else {
3658 addReply(c,shared.nullmultibulk);
3659 }
3660 return;
3661 }
3662 if (setobj->type != REDIS_SET) {
3663 zfree(dv);
3664 addReply(c,shared.wrongtypeerr);
3665 return;
3666 }
3667 dv[j] = setobj->ptr;
3668 }
3669 /* Sort sets from the smallest to largest, this will improve our
3670 * algorithm's performace */
3671 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3672
3673 /* The first thing we should output is the total number of elements...
3674 * since this is a multi-bulk write, but at this stage we don't know
3675 * the intersection set size, so we use a trick, append an empty object
3676 * to the output list and save the pointer to later modify it with the
3677 * right length */
3678 if (!dstkey) {
3679 lenobj = createObject(REDIS_STRING,NULL);
3680 addReply(c,lenobj);
3681 decrRefCount(lenobj);
3682 } else {
3683 /* If we have a target key where to store the resulting set
3684 * create this key with an empty set inside */
3685 dstset = createSetObject();
3686 }
3687
3688 /* Iterate all the elements of the first (smallest) set, and test
3689 * the element against all the other sets, if at least one set does
3690 * not include the element it is discarded */
3691 di = dictGetIterator(dv[0]);
3692
3693 while((de = dictNext(di)) != NULL) {
3694 robj *ele;
3695
3696 for (j = 1; j < setsnum; j++)
3697 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3698 if (j != setsnum)
3699 continue; /* at least one set does not contain the member */
3700 ele = dictGetEntryKey(de);
3701 if (!dstkey) {
3702 addReplyBulkLen(c,ele);
3703 addReply(c,ele);
3704 addReply(c,shared.crlf);
3705 cardinality++;
3706 } else {
3707 dictAdd(dstset->ptr,ele,NULL);
3708 incrRefCount(ele);
3709 }
3710 }
3711 dictReleaseIterator(di);
3712
3713 if (dstkey) {
3714 /* Store the resulting set into the target */
3715 deleteKey(c->db,dstkey);
3716 dictAdd(c->db->dict,dstkey,dstset);
3717 incrRefCount(dstkey);
3718 }
3719
3720 if (!dstkey) {
3721 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3722 } else {
3723 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3724 dictSize((dict*)dstset->ptr)));
3725 server.dirty++;
3726 }
3727 zfree(dv);
3728 }
3729
3730 static void sinterCommand(redisClient *c) {
3731 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3732 }
3733
3734 static void sinterstoreCommand(redisClient *c) {
3735 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3736 }
3737
3738 #define REDIS_OP_UNION 0
3739 #define REDIS_OP_DIFF 1
3740
3741 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3742 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3743 dictIterator *di;
3744 dictEntry *de;
3745 robj *dstset = NULL;
3746 int j, cardinality = 0;
3747
3748 for (j = 0; j < setsnum; j++) {
3749 robj *setobj;
3750
3751 setobj = dstkey ?
3752 lookupKeyWrite(c->db,setskeys[j]) :
3753 lookupKeyRead(c->db,setskeys[j]);
3754 if (!setobj) {
3755 dv[j] = NULL;
3756 continue;
3757 }
3758 if (setobj->type != REDIS_SET) {
3759 zfree(dv);
3760 addReply(c,shared.wrongtypeerr);
3761 return;
3762 }
3763 dv[j] = setobj->ptr;
3764 }
3765
3766 /* We need a temp set object to store our union. If the dstkey
3767 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3768 * this set object will be the resulting object to set into the target key*/
3769 dstset = createSetObject();
3770
3771 /* Iterate all the elements of all the sets, add every element a single
3772 * time to the result set */
3773 for (j = 0; j < setsnum; j++) {
3774 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3775 if (!dv[j]) continue; /* non existing keys are like empty sets */
3776
3777 di = dictGetIterator(dv[j]);
3778
3779 while((de = dictNext(di)) != NULL) {
3780 robj *ele;
3781
3782 /* dictAdd will not add the same element multiple times */
3783 ele = dictGetEntryKey(de);
3784 if (op == REDIS_OP_UNION || j == 0) {
3785 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3786 incrRefCount(ele);
3787 cardinality++;
3788 }
3789 } else if (op == REDIS_OP_DIFF) {
3790 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3791 cardinality--;
3792 }
3793 }
3794 }
3795 dictReleaseIterator(di);
3796
3797 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3798 }
3799
3800 /* Output the content of the resulting set, if not in STORE mode */
3801 if (!dstkey) {
3802 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3803 di = dictGetIterator(dstset->ptr);
3804 while((de = dictNext(di)) != NULL) {
3805 robj *ele;
3806
3807 ele = dictGetEntryKey(de);
3808 addReplyBulkLen(c,ele);
3809 addReply(c,ele);
3810 addReply(c,shared.crlf);
3811 }
3812 dictReleaseIterator(di);
3813 } else {
3814 /* If we have a target key where to store the resulting set
3815 * create this key with the result set inside */
3816 deleteKey(c->db,dstkey);
3817 dictAdd(c->db->dict,dstkey,dstset);
3818 incrRefCount(dstkey);
3819 }
3820
3821 /* Cleanup */
3822 if (!dstkey) {
3823 decrRefCount(dstset);
3824 } else {
3825 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3826 dictSize((dict*)dstset->ptr)));
3827 server.dirty++;
3828 }
3829 zfree(dv);
3830 }
3831
3832 static void sunionCommand(redisClient *c) {
3833 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3834 }
3835
3836 static void sunionstoreCommand(redisClient *c) {
3837 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3838 }
3839
3840 static void sdiffCommand(redisClient *c) {
3841 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3842 }
3843
3844 static void sdiffstoreCommand(redisClient *c) {
3845 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3846 }
3847
3848 /* ==================================== ZSets =============================== */
3849
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
3857
3858 /* This skiplist implementation is almost a C translation of the original
3859 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3860 * Alternative to Balanced Trees", modified in three ways:
3861 * a) this implementation allows for repeated values.
3862 * b) the comparison is not just by key (our 'score') but by satellite data.
3863 * c) there is a back pointer, so it's a doubly linked list with the back
3864 * pointers being only at "level 1". This allows to traverse the list
3865 * from tail to head, useful for ZREVRANGE. */
3866
3867 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3868 zskiplistNode *zn = zmalloc(sizeof(*zn));
3869
3870 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3871 zn->score = score;
3872 zn->obj = obj;
3873 return zn;
3874 }
3875
3876 static zskiplist *zslCreate(void) {
3877 int j;
3878 zskiplist *zsl;
3879
3880 zsl = zmalloc(sizeof(*zsl));
3881 zsl->level = 1;
3882 zsl->length = 0;
3883 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3884 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3885 zsl->header->forward[j] = NULL;
3886 zsl->header->backward = NULL;
3887 zsl->tail = NULL;
3888 return zsl;
3889 }
3890
3891 static void zslFreeNode(zskiplistNode *node) {
3892 decrRefCount(node->obj);
3893 zfree(node->forward);
3894 zfree(node);
3895 }
3896
3897 static void zslFree(zskiplist *zsl) {
3898 zskiplistNode *node = zsl->header->forward[0], *next;
3899
3900 zfree(zsl->header->forward);
3901 zfree(zsl->header);
3902 while(node) {
3903 next = node->forward[0];
3904 zslFreeNode(node);
3905 node = next;
3906 }
3907 zfree(zsl);
3908 }
3909
3910 static int zslRandomLevel(void) {
3911 int level = 1;
3912 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3913 level += 1;
3914 return level;
3915 }
3916
3917 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3918 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3919 int i, level;
3920
3921 x = zsl->header;
3922 for (i = zsl->level-1; i >= 0; i--) {
3923 while (x->forward[i] &&
3924 (x->forward[i]->score < score ||
3925 (x->forward[i]->score == score &&
3926 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3927 x = x->forward[i];
3928 update[i] = x;
3929 }
3930 /* we assume the key is not already inside, since we allow duplicated
3931 * scores, and the re-insertion of score and redis object should never
3932 * happpen since the caller of zslInsert() should test in the hash table
3933 * if the element is already inside or not. */
3934 level = zslRandomLevel();
3935 if (level > zsl->level) {
3936 for (i = zsl->level; i < level; i++)
3937 update[i] = zsl->header;
3938 zsl->level = level;
3939 }
3940 x = zslCreateNode(level,score,obj);
3941 for (i = 0; i < level; i++) {
3942 x->forward[i] = update[i]->forward[i];
3943 update[i]->forward[i] = x;
3944 }
3945 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3946 if (x->forward[0])
3947 x->forward[0]->backward = x;
3948 else
3949 zsl->tail = x;
3950 zsl->length++;
3951 }
3952
3953 /* Delete an element with matching score/object from the skiplist. */
3954 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3955 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3956 int i;
3957
3958 x = zsl->header;
3959 for (i = zsl->level-1; i >= 0; i--) {
3960 while (x->forward[i] &&
3961 (x->forward[i]->score < score ||
3962 (x->forward[i]->score == score &&
3963 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3964 x = x->forward[i];
3965 update[i] = x;
3966 }
3967 /* We may have multiple elements with the same score, what we need
3968 * is to find the element with both the right score and object. */
3969 x = x->forward[0];
3970 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
3971 for (i = 0; i < zsl->level; i++) {
3972 if (update[i]->forward[i] != x) break;
3973 update[i]->forward[i] = x->forward[i];
3974 }
3975 if (x->forward[0]) {
3976 x->forward[0]->backward = (x->backward == zsl->header) ?
3977 NULL : x->backward;
3978 } else {
3979 zsl->tail = x->backward;
3980 }
3981 zslFreeNode(x);
3982 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3983 zsl->level--;
3984 zsl->length--;
3985 return 1;
3986 } else {
3987 return 0; /* not found */
3988 }
3989 return 0; /* not found */
3990 }
3991
3992 /* Delete all the elements with score between min and max from the skiplist.
3993 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
3994 * Note that this function takes the reference to the hash table view of the
3995 * sorted set, in order to remove the elements from the hash table too. */
3996 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
3997 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3998 unsigned long removed = 0;
3999 int i;
4000
4001 x = zsl->header;
4002 for (i = zsl->level-1; i >= 0; i--) {
4003 while (x->forward[i] && x->forward[i]->score < min)
4004 x = x->forward[i];
4005 update[i] = x;
4006 }
4007 /* We may have multiple elements with the same score, what we need
4008 * is to find the element with both the right score and object. */
4009 x = x->forward[0];
4010 while (x && x->score <= max) {
4011 zskiplistNode *next;
4012
4013 for (i = 0; i < zsl->level; i++) {
4014 if (update[i]->forward[i] != x) break;
4015 update[i]->forward[i] = x->forward[i];
4016 }
4017 if (x->forward[0]) {
4018 x->forward[0]->backward = (x->backward == zsl->header) ?
4019 NULL : x->backward;
4020 } else {
4021 zsl->tail = x->backward;
4022 }
4023 next = x->forward[0];
4024 dictDelete(dict,x->obj);
4025 zslFreeNode(x);
4026 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4027 zsl->level--;
4028 zsl->length--;
4029 removed++;
4030 x = next;
4031 }
4032 return removed; /* not found */
4033 }
4034
4035 /* Find the first node having a score equal or greater than the specified one.
4036 * Returns NULL if there is no match. */
4037 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4038 zskiplistNode *x;
4039 int i;
4040
4041 x = zsl->header;
4042 for (i = zsl->level-1; i >= 0; i--) {
4043 while (x->forward[i] && x->forward[i]->score < score)
4044 x = x->forward[i];
4045 }
4046 /* We may have multiple elements with the same score, what we need
4047 * is to find the element with both the right score and object. */
4048 return x->forward[0];
4049 }
4050
4051 /* The actual Z-commands implementations */
4052
4053 static void zaddCommand(redisClient *c) {
4054 robj *zsetobj;
4055 zset *zs;
4056 double *score;
4057
4058 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4059 if (zsetobj == NULL) {
4060 zsetobj = createZsetObject();
4061 dictAdd(c->db->dict,c->argv[1],zsetobj);
4062 incrRefCount(c->argv[1]);
4063 } else {
4064 if (zsetobj->type != REDIS_ZSET) {
4065 addReply(c,shared.wrongtypeerr);
4066 return;
4067 }
4068 }
4069 score = zmalloc(sizeof(double));
4070 *score = strtod(c->argv[2]->ptr,NULL);
4071 zs = zsetobj->ptr;
4072 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4073 /* case 1: New element */
4074 incrRefCount(c->argv[3]); /* added to hash */
4075 zslInsert(zs->zsl,*score,c->argv[3]);
4076 incrRefCount(c->argv[3]); /* added to skiplist */
4077 server.dirty++;
4078 addReply(c,shared.cone);
4079 } else {
4080 dictEntry *de;
4081 double *oldscore;
4082
4083 /* case 2: Score update operation */
4084 de = dictFind(zs->dict,c->argv[3]);
4085 assert(de != NULL);
4086 oldscore = dictGetEntryVal(de);
4087 if (*score != *oldscore) {
4088 int deleted;
4089
4090 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4091 assert(deleted != 0);
4092 zslInsert(zs->zsl,*score,c->argv[3]);
4093 incrRefCount(c->argv[3]);
4094 dictReplace(zs->dict,c->argv[3],score);
4095 server.dirty++;
4096 } else {
4097 zfree(score);
4098 }
4099 addReply(c,shared.czero);
4100 }
4101 }
4102
4103 static void zremCommand(redisClient *c) {
4104 robj *zsetobj;
4105 zset *zs;
4106
4107 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4108 if (zsetobj == NULL) {
4109 addReply(c,shared.czero);
4110 } else {
4111 dictEntry *de;
4112 double *oldscore;
4113 int deleted;
4114
4115 if (zsetobj->type != REDIS_ZSET) {
4116 addReply(c,shared.wrongtypeerr);
4117 return;
4118 }
4119 zs = zsetobj->ptr;
4120 de = dictFind(zs->dict,c->argv[2]);
4121 if (de == NULL) {
4122 addReply(c,shared.czero);
4123 return;
4124 }
4125 /* Delete from the skiplist */
4126 oldscore = dictGetEntryVal(de);
4127 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4128 assert(deleted != 0);
4129
4130 /* Delete from the hash table */
4131 dictDelete(zs->dict,c->argv[2]);
4132 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4133 server.dirty++;
4134 addReply(c,shared.cone);
4135 }
4136 }
4137
4138 static void zremrangebyscoreCommand(redisClient *c) {
4139 double min = strtod(c->argv[2]->ptr,NULL);
4140 double max = strtod(c->argv[3]->ptr,NULL);
4141 robj *zsetobj;
4142 zset *zs;
4143
4144 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4145 if (zsetobj == NULL) {
4146 addReply(c,shared.czero);
4147 } else {
4148 long deleted;
4149
4150 if (zsetobj->type != REDIS_ZSET) {
4151 addReply(c,shared.wrongtypeerr);
4152 return;
4153 }
4154 zs = zsetobj->ptr;
4155 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4156 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4157 server.dirty += deleted;
4158 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4159 }
4160 }
4161
4162 static void zrangeGenericCommand(redisClient *c, int reverse) {
4163 robj *o;
4164 int start = atoi(c->argv[2]->ptr);
4165 int end = atoi(c->argv[3]->ptr);
4166
4167 o = lookupKeyRead(c->db,c->argv[1]);
4168 if (o == NULL) {
4169 addReply(c,shared.nullmultibulk);
4170 } else {
4171 if (o->type != REDIS_ZSET) {
4172 addReply(c,shared.wrongtypeerr);
4173 } else {
4174 zset *zsetobj = o->ptr;
4175 zskiplist *zsl = zsetobj->zsl;
4176 zskiplistNode *ln;
4177
4178 int llen = zsl->length;
4179 int rangelen, j;
4180 robj *ele;
4181
4182 /* convert negative indexes */
4183 if (start < 0) start = llen+start;
4184 if (end < 0) end = llen+end;
4185 if (start < 0) start = 0;
4186 if (end < 0) end = 0;
4187
4188 /* indexes sanity checks */
4189 if (start > end || start >= llen) {
4190 /* Out of range start or start > end result in empty list */
4191 addReply(c,shared.emptymultibulk);
4192 return;
4193 }
4194 if (end >= llen) end = llen-1;
4195 rangelen = (end-start)+1;
4196
4197 /* Return the result in form of a multi-bulk reply */
4198 if (reverse) {
4199 ln = zsl->tail;
4200 while (start--)
4201 ln = ln->backward;
4202 } else {
4203 ln = zsl->header->forward[0];
4204 while (start--)
4205 ln = ln->forward[0];
4206 }
4207
4208 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4209 for (j = 0; j < rangelen; j++) {
4210 ele = ln->obj;
4211 addReplyBulkLen(c,ele);
4212 addReply(c,ele);
4213 addReply(c,shared.crlf);
4214 ln = reverse ? ln->backward : ln->forward[0];
4215 }
4216 }
4217 }
4218 }
4219
4220 static void zrangeCommand(redisClient *c) {
4221 zrangeGenericCommand(c,0);
4222 }
4223
4224 static void zrevrangeCommand(redisClient *c) {
4225 zrangeGenericCommand(c,1);
4226 }
4227
4228 static void zrangebyscoreCommand(redisClient *c) {
4229 robj *o;
4230 double min = strtod(c->argv[2]->ptr,NULL);
4231 double max = strtod(c->argv[3]->ptr,NULL);
4232
4233 o = lookupKeyRead(c->db,c->argv[1]);
4234 if (o == NULL) {
4235 addReply(c,shared.nullmultibulk);
4236 } else {
4237 if (o->type != REDIS_ZSET) {
4238 addReply(c,shared.wrongtypeerr);
4239 } else {
4240 zset *zsetobj = o->ptr;
4241 zskiplist *zsl = zsetobj->zsl;
4242 zskiplistNode *ln;
4243 robj *ele, *lenobj;
4244 unsigned int rangelen = 0;
4245
4246 /* Get the first node with the score >= min */
4247 ln = zslFirstWithScore(zsl,min);
4248 if (ln == NULL) {
4249 /* No element matching the speciifed interval */
4250 addReply(c,shared.emptymultibulk);
4251 return;
4252 }
4253
4254 /* We don't know in advance how many matching elements there
4255 * are in the list, so we push this object that will represent
4256 * the multi-bulk length in the output buffer, and will "fix"
4257 * it later */
4258 lenobj = createObject(REDIS_STRING,NULL);
4259 addReply(c,lenobj);
4260
4261 while(ln && ln->score <= max) {
4262 ele = ln->obj;
4263 addReplyBulkLen(c,ele);
4264 addReply(c,ele);
4265 addReply(c,shared.crlf);
4266 ln = ln->forward[0];
4267 rangelen++;
4268 }
4269 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4270 }
4271 }
4272 }
4273
4274 static void zcardCommand(redisClient *c) {
4275 robj *o;
4276 zset *zs;
4277
4278 o = lookupKeyRead(c->db,c->argv[1]);
4279 if (o == NULL) {
4280 addReply(c,shared.czero);
4281 return;
4282 } else {
4283 if (o->type != REDIS_ZSET) {
4284 addReply(c,shared.wrongtypeerr);
4285 } else {
4286 zs = o->ptr;
4287 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4288 }
4289 }
4290 }
4291
4292 static void zscoreCommand(redisClient *c) {
4293 robj *o;
4294 zset *zs;
4295
4296 o = lookupKeyRead(c->db,c->argv[1]);
4297 if (o == NULL) {
4298 addReply(c,shared.czero);
4299 return;
4300 } else {
4301 if (o->type != REDIS_ZSET) {
4302 addReply(c,shared.wrongtypeerr);
4303 } else {
4304 dictEntry *de;
4305
4306 zs = o->ptr;
4307 de = dictFind(zs->dict,c->argv[2]);
4308 if (!de) {
4309 addReply(c,shared.nullbulk);
4310 } else {
4311 char buf[128];
4312 double *score = dictGetEntryVal(de);
4313
4314 snprintf(buf,sizeof(buf),"%.17g",*score);
4315 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4316 strlen(buf),buf));
4317 }
4318 }
4319 }
4320 }
4321
4322 /* ========================= Non type-specific commands ==================== */
4323
4324 static void flushdbCommand(redisClient *c) {
4325 server.dirty += dictSize(c->db->dict);
4326 dictEmpty(c->db->dict);
4327 dictEmpty(c->db->expires);
4328 addReply(c,shared.ok);
4329 }
4330
4331 static void flushallCommand(redisClient *c) {
4332 server.dirty += emptyDb();
4333 addReply(c,shared.ok);
4334 rdbSave(server.dbfilename);
4335 server.dirty++;
4336 }
4337
4338 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4339 redisSortOperation *so = zmalloc(sizeof(*so));
4340 so->type = type;
4341 so->pattern = pattern;
4342 return so;
4343 }
4344
4345 /* Return the value associated to the key with a name obtained
4346 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4347 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4348 char *p;
4349 sds spat, ssub;
4350 robj keyobj;
4351 int prefixlen, sublen, postfixlen;
4352 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4353 struct {
4354 long len;
4355 long free;
4356 char buf[REDIS_SORTKEY_MAX+1];
4357 } keyname;
4358
4359 if (subst->encoding == REDIS_ENCODING_RAW)
4360 incrRefCount(subst);
4361 else {
4362 subst = getDecodedObject(subst);
4363 }
4364
4365 spat = pattern->ptr;
4366 ssub = subst->ptr;
4367 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4368 p = strchr(spat,'*');
4369 if (!p) return NULL;
4370
4371 prefixlen = p-spat;
4372 sublen = sdslen(ssub);
4373 postfixlen = sdslen(spat)-(prefixlen+1);
4374 memcpy(keyname.buf,spat,prefixlen);
4375 memcpy(keyname.buf+prefixlen,ssub,sublen);
4376 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4377 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4378 keyname.len = prefixlen+sublen+postfixlen;
4379
4380 keyobj.refcount = 1;
4381 keyobj.type = REDIS_STRING;
4382 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4383
4384 decrRefCount(subst);
4385
4386 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4387 return lookupKeyRead(db,&keyobj);
4388 }
4389
4390 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4391 * the additional parameter is not standard but a BSD-specific we have to
4392 * pass sorting parameters via the global 'server' structure */
4393 static int sortCompare(const void *s1, const void *s2) {
4394 const redisSortObject *so1 = s1, *so2 = s2;
4395 int cmp;
4396
4397 if (!server.sort_alpha) {
4398 /* Numeric sorting. Here it's trivial as we precomputed scores */
4399 if (so1->u.score > so2->u.score) {
4400 cmp = 1;
4401 } else if (so1->u.score < so2->u.score) {
4402 cmp = -1;
4403 } else {
4404 cmp = 0;
4405 }
4406 } else {
4407 /* Alphanumeric sorting */
4408 if (server.sort_bypattern) {
4409 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4410 /* At least one compare object is NULL */
4411 if (so1->u.cmpobj == so2->u.cmpobj)
4412 cmp = 0;
4413 else if (so1->u.cmpobj == NULL)
4414 cmp = -1;
4415 else
4416 cmp = 1;
4417 } else {
4418 /* We have both the objects, use strcoll */
4419 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4420 }
4421 } else {
4422 /* Compare elements directly */
4423 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4424 so2->obj->encoding == REDIS_ENCODING_RAW) {
4425 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4426 } else {
4427 robj *dec1, *dec2;
4428
4429 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4430 so1->obj : getDecodedObject(so1->obj);
4431 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4432 so2->obj : getDecodedObject(so2->obj);
4433 cmp = strcoll(dec1->ptr,dec2->ptr);
4434 if (dec1 != so1->obj) decrRefCount(dec1);
4435 if (dec2 != so2->obj) decrRefCount(dec2);
4436 }
4437 }
4438 }
4439 return server.sort_desc ? -cmp : cmp;
4440 }
4441
/* The SORT command is the most complex command in Redis. Warning: this code
 * is optimized for speed and a bit less for readability.
 *
 * Options recognized by the parser below (case insensitive, any order):
 *   ASC | DESC          sorting direction (default ASC)
 *   ALPHA               compare as strings with strcoll() instead of numbers
 *   LIMIT start count   return only a slice of the sorted result
 *   BY pattern          sort by the value of external keys; a pattern
 *                       without '*' disables sorting altogether
 *   GET pattern         return the value of external keys instead of the
 *                       elements (may be repeated)
 *   STORE key           store the result as a list at 'key' instead of
 *                       returning it; the reply is the number of items
 *
 * The key to sort (argv[1]) must hold a list or a set. */
static void sortCommand(redisClient *c) {
    list *operations;
    int outputlen = 0;
    int desc = 0, alpha = 0;
    int limit_start = 0, limit_count = -1, start, end;
    int j, dontsort = 0, vectorlen;
    int getop = 0; /* GET operation counter */
    robj *sortval, *sortby = NULL, *storekey = NULL;
    redisSortObject *vector; /* Resulting vector to sort */

    /* Lookup the key to sort. It must be of the right types */
    sortval = lookupKeyRead(c->db,c->argv[1]);
    if (sortval == NULL) {
        addReply(c,shared.nokeyerr);
        return;
    }
    if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    /* Create a list of operations to perform for every sorted element.
     * Only GET operations are created by the parser below. The list owns
     * the operation structures (freed with zfree on release). */
    operations = listCreate();
    listSetFreeMethod(operations,zfree);
    j = 2;

    /* Now we need to protect sortval incrementing its count, in the future
     * SORT may have options able to overwrite/delete keys during the sorting
     * and the sorted key itself may get destroyed */
    incrRefCount(sortval);

    /* The SORT command has an SQL-alike syntax, parse it */
    while(j < c->argc) {
        /* Number of arguments following the current option name */
        int leftargs = c->argc-j-1;
        if (!strcasecmp(c->argv[j]->ptr,"asc")) {
            desc = 0;
        } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
            desc = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
            alpha = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
            limit_start = atoi(c->argv[j+1]->ptr);
            limit_count = atoi(c->argv[j+2]->ptr);
            j+=2;
        } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
            storekey = c->argv[j+1];
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
            sortby = c->argv[j+1];
            /* If the BY pattern does not contain '*', i.e. it is constant,
             * we don't need to sort nor to lookup the weight keys. */
            if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
            listAddNodeTail(operations,createSortOperation(
                REDIS_SORT_GET,c->argv[j+1]));
            getop++;
            j++;
        } else {
            /* Unknown option or missing arguments: undo what was set up
             * so far and report a syntax error. */
            decrRefCount(sortval);
            listRelease(operations);
            addReply(c,shared.syntaxerr);
            return;
        }
        j++;
    }

    /* Load the sorting vector with all the objects to sort */
    vectorlen = (sortval->type == REDIS_LIST) ?
        listLength((list*)sortval->ptr) :
        dictSize((dict*)sortval->ptr);
    vector = zmalloc(sizeof(redisSortObject)*vectorlen);
    j = 0;
    if (sortval->type == REDIS_LIST) {
        list *list = sortval->ptr;
        listNode *ln;

        listRewind(list);
        while((ln = listYield(list))) {
            robj *ele = ln->value;
            vector[j].obj = ele;
            /* 'u' is accessed both as a score and as a compare object:
             * both views are cleared, the last write leaves cmpobj NULL. */
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
    } else {
        dict *set = sortval->ptr;
        dictIterator *di;
        dictEntry *setele;

        di = dictGetIterator(set);
        while((setele = dictNext(di)) != NULL) {
            vector[j].obj = dictGetEntryKey(setele);
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        dictReleaseIterator(di);
    }
    assert(j == vectorlen);

    /* Now it's time to load the right scores in the sorting vector */
    if (dontsort == 0) {
        for (j = 0; j < vectorlen; j++) {
            if (sortby) {
                robj *byval;

                /* Weight taken from an external key. Elements whose weight
                 * key is missing or is not a string keep the cleared value
                 * set while loading the vector. */
                byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
                if (!byval || byval->type != REDIS_STRING) continue;
                if (alpha) {
                    /* Keep a reference to a raw string to compare; it is
                     * released in the cleanup section at the end. */
                    if (byval->encoding == REDIS_ENCODING_RAW) {
                        vector[j].u.cmpobj = byval;
                        incrRefCount(byval);
                    } else {
                        vector[j].u.cmpobj = getDecodedObject(byval);
                    }
                } else {
                    if (byval->encoding == REDIS_ENCODING_RAW) {
                        vector[j].u.score = strtod(byval->ptr,NULL);
                    } else {
                        if (byval->encoding == REDIS_ENCODING_INT) {
                            /* Integer encoded objects store the value in
                             * the pointer itself. */
                            vector[j].u.score = (long)byval->ptr;
                        } else
                            assert(1 != 1);
                    }
                }
            } else {
                /* No BY option: numeric sorting uses the element itself as
                 * score, alpha sorting compares the elements directly in
                 * sortCompare() so nothing has to be precomputed. */
                if (!alpha) {
                    if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
                        vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
                    else {
                        if (vector[j].obj->encoding == REDIS_ENCODING_INT)
                            vector[j].u.score = (long) vector[j].obj->ptr;
                        else
                            assert(1 != 1);
                    }
                }
            }
        }
    }

    /* We are ready to sort the vector... perform a bit of sanity check
     * on the LIMIT option too. We'll use a partial version of quicksort.
     * When 'start' is out of range, start/end are set so that end-start+1
     * is zero and the output loops below do not run. */
    start = (limit_start < 0) ? 0 : limit_start;
    end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
    if (start >= vectorlen) {
        start = vectorlen-1;
        end = vectorlen-2;
    }
    if (end >= vectorlen) end = vectorlen-1;

    if (dontsort == 0) {
        /* Sorting parameters are passed to sortCompare() via 'server' */
        server.sort_desc = desc;
        server.sort_alpha = alpha;
        server.sort_bypattern = sortby ? 1 : 0;
        if (sortby && (start != 0 || end != vectorlen-1))
            pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
        else
            qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
    }

    /* Send command output to the output buffer, performing the specified
     * GET operations if any. With GET options every selected element
     * produces one item per GET instead of the element itself. */
    outputlen = getop ? getop*(end-start+1) : end-start+1;
    if (storekey == NULL) {
        /* STORE option not specified, send the sorting result to client */
        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
        for (j = start; j <= end; j++) {
            listNode *ln;
            if (!getop) {
                addReplyBulkLen(c,vector[j].obj);
                addReply(c,vector[j].obj);
                addReply(c,shared.crlf);
            }
            listRewind(operations);
            while((ln = listYield(operations))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj);

                if (sop->type == REDIS_SORT_GET) {
                    /* Missing or non-string keys are reported as nil */
                    if (!val || val->type != REDIS_STRING) {
                        addReply(c,shared.nullbulk);
                    } else {
                        addReplyBulkLen(c,val);
                        addReply(c,val);
                        addReply(c,shared.crlf);
                    }
                } else {
                    assert(sop->type == REDIS_SORT_GET); /* always fails */
                }
            }
        }
    } else {
        robj *listObject = createListObject();
        list *listPtr = (list*) listObject->ptr;

        /* STORE option specified, set the sorting result as a List object */
        for (j = start; j <= end; j++) {
            listNode *ln;
            if (!getop) {
                /* The new list holds its own reference to the element */
                listAddNodeTail(listPtr,vector[j].obj);
                incrRefCount(vector[j].obj);
            }
            listRewind(operations);
            while((ln = listYield(operations))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj);

                if (sop->type == REDIS_SORT_GET) {
                    /* Missing or non-string keys are stored as an empty
                     * string, since a list cannot hold a nil element. */
                    if (!val || val->type != REDIS_STRING) {
                        listAddNodeTail(listPtr,createStringObject("",0));
                    } else {
                        listAddNodeTail(listPtr,val);
                        incrRefCount(val);
                    }
                } else {
                    assert(sop->type == REDIS_SORT_GET); /* always fails */
                }
            }
        }
        /* The key object is referenced by the DB only when dictReplace()
         * reports a non-zero result.
         * NOTE(review): presumably non-zero means a new entry was added
         * rather than an existing one overwritten; confirm in dict.c. */
        if (dictReplace(c->db->dict,storekey,listObject)) {
            incrRefCount(storekey);
        }
        /* Note: we add 1 because the DB is dirty anyway since even if the
         * SORT result is empty a new key is set and maybe the old content
         * replaced. */
        server.dirty += 1+outputlen;
        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
    }

    /* Cleanup: release the protection on the sorted value, the operations
     * list, and the compare objects referenced for BY ... ALPHA sorting. */
    decrRefCount(sortval);
    listRelease(operations);
    for (j = 0; j < vectorlen; j++) {
        if (sortby && alpha && vector[j].u.cmpobj)
            decrRefCount(vector[j].u.cmpobj);
    }
    zfree(vector);
}
4686
4687 static void infoCommand(redisClient *c) {
4688 sds info;
4689 time_t uptime = time(NULL)-server.stat_starttime;
4690 int j;
4691
4692 info = sdscatprintf(sdsempty(),
4693 "redis_version:%s\r\n"
4694 "arch_bits:%s\r\n"
4695 "uptime_in_seconds:%d\r\n"
4696 "uptime_in_days:%d\r\n"
4697 "connected_clients:%d\r\n"
4698 "connected_slaves:%d\r\n"
4699 "used_memory:%zu\r\n"
4700 "changes_since_last_save:%lld\r\n"
4701 "bgsave_in_progress:%d\r\n"
4702 "last_save_time:%d\r\n"
4703 "total_connections_received:%lld\r\n"
4704 "total_commands_processed:%lld\r\n"
4705 "role:%s\r\n"
4706 ,REDIS_VERSION,
4707 (sizeof(long) == 8) ? "64" : "32",
4708 uptime,
4709 uptime/(3600*24),
4710 listLength(server.clients)-listLength(server.slaves),
4711 listLength(server.slaves),
4712 server.usedmemory,
4713 server.dirty,
4714 server.bgsaveinprogress,
4715 server.lastsave,
4716 server.stat_numconnections,
4717 server.stat_numcommands,
4718 server.masterhost == NULL ? "master" : "slave"
4719 );
4720 if (server.masterhost) {
4721 info = sdscatprintf(info,
4722 "master_host:%s\r\n"
4723 "master_port:%d\r\n"
4724 "master_link_status:%s\r\n"
4725 "master_last_io_seconds_ago:%d\r\n"
4726 ,server.masterhost,
4727 server.masterport,
4728 (server.replstate == REDIS_REPL_CONNECTED) ?
4729 "up" : "down",
4730 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4731 );
4732 }
4733 for (j = 0; j < server.dbnum; j++) {
4734 long long keys, vkeys;
4735
4736 keys = dictSize(server.db[j].dict);
4737 vkeys = dictSize(server.db[j].expires);
4738 if (keys || vkeys) {
4739 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4740 j, keys, vkeys);
4741 }
4742 }
4743 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4744 addReplySds(c,info);
4745 addReply(c,shared.crlf);
4746 }
4747
4748 static void monitorCommand(redisClient *c) {
4749 /* ignore MONITOR if aleady slave or in monitor mode */
4750 if (c->flags & REDIS_SLAVE) return;
4751
4752 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4753 c->slaveseldb = 0;
4754 listAddNodeTail(server.monitors,c);
4755 addReply(c,shared.ok);
4756 }
4757
4758 /* ================================= Expire ================================= */
4759 static int removeExpire(redisDb *db, robj *key) {
4760 if (dictDelete(db->expires,key) == DICT_OK) {
4761 return 1;
4762 } else {
4763 return 0;
4764 }
4765 }
4766
4767 static int setExpire(redisDb *db, robj *key, time_t when) {
4768 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4769 return 0;
4770 } else {
4771 incrRefCount(key);
4772 return 1;
4773 }
4774 }
4775
4776 /* Return the expire time of the specified key, or -1 if no expire
4777 * is associated with this key (i.e. the key is non volatile) */
4778 static time_t getExpire(redisDb *db, robj *key) {
4779 dictEntry *de;
4780
4781 /* No expire? return ASAP */
4782 if (dictSize(db->expires) == 0 ||
4783 (de = dictFind(db->expires,key)) == NULL) return -1;
4784
4785 return (time_t) dictGetEntryVal(de);
4786 }
4787
4788 static int expireIfNeeded(redisDb *db, robj *key) {
4789 time_t when;
4790 dictEntry *de;
4791
4792 /* No expire? return ASAP */
4793 if (dictSize(db->expires) == 0 ||
4794 (de = dictFind(db->expires,key)) == NULL) return 0;
4795
4796 /* Lookup the expire */
4797 when = (time_t) dictGetEntryVal(de);
4798 if (time(NULL) <= when) return 0;
4799
4800 /* Delete the key */
4801 dictDelete(db->expires,key);
4802 return dictDelete(db->dict,key) == DICT_OK;
4803 }
4804
4805 static int deleteIfVolatile(redisDb *db, robj *key) {
4806 dictEntry *de;
4807
4808 /* No expire? return ASAP */
4809 if (dictSize(db->expires) == 0 ||
4810 (de = dictFind(db->expires,key)) == NULL) return 0;
4811
4812 /* Delete the key */
4813 server.dirty++;
4814 dictDelete(db->expires,key);
4815 return dictDelete(db->dict,key) == DICT_OK;
4816 }
4817
4818 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4819 dictEntry *de;
4820
4821 de = dictFind(c->db->dict,key);
4822 if (de == NULL) {
4823 addReply(c,shared.czero);
4824 return;
4825 }
4826 if (seconds < 0) {
4827 if (deleteKey(c->db,key)) server.dirty++;
4828 addReply(c, shared.cone);
4829 return;
4830 } else {
4831 time_t when = time(NULL)+seconds;
4832 if (setExpire(c->db,key,when)) {
4833 addReply(c,shared.cone);
4834 server.dirty++;
4835 } else {
4836 addReply(c,shared.czero);
4837 }
4838 return;
4839 }
4840 }
4841
4842 static void expireCommand(redisClient *c) {
4843 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
4844 }
4845
4846 static void expireatCommand(redisClient *c) {
4847 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
4848 }
4849
4850 static void ttlCommand(redisClient *c) {
4851 time_t expire;
4852 int ttl = -1;
4853
4854 expire = getExpire(c->db,c->argv[1]);
4855 if (expire != -1) {
4856 ttl = (int) (expire-time(NULL));
4857 if (ttl < 0) ttl = -1;
4858 }
4859 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4860 }
4861
4862 static void msetGenericCommand(redisClient *c, int nx) {
4863 int j;
4864
4865 if ((c->argc % 2) == 0) {
4866 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4867 return;
4868 }
4869 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4870 * set nothing at all if at least one already key exists. */
4871 if (nx) {
4872 for (j = 1; j < c->argc; j += 2) {
4873 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4874 addReply(c, shared.czero);
4875 return;
4876 }
4877 }
4878 }
4879
4880 for (j = 1; j < c->argc; j += 2) {
4881 int retval;
4882
4883 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4884 if (retval == DICT_ERR) {
4885 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4886 incrRefCount(c->argv[j+1]);
4887 } else {
4888 incrRefCount(c->argv[j]);
4889 incrRefCount(c->argv[j+1]);
4890 }
4891 removeExpire(c->db,c->argv[j]);
4892 }
4893 server.dirty += (c->argc-1)/2;
4894 addReply(c, nx ? shared.cone : shared.ok);
4895 }
4896
4897 static void msetCommand(redisClient *c) {
4898 msetGenericCommand(c,0);
4899 }
4900
4901 static void msetnxCommand(redisClient *c) {
4902 msetGenericCommand(c,1);
4903 }
4904
4905 /* =============================== Replication ============================= */
4906
4907 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4908 ssize_t nwritten, ret = size;
4909 time_t start = time(NULL);
4910
4911 timeout++;
4912 while(size) {
4913 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4914 nwritten = write(fd,ptr,size);
4915 if (nwritten == -1) return -1;
4916 ptr += nwritten;
4917 size -= nwritten;
4918 }
4919 if ((time(NULL)-start) > timeout) {
4920 errno = ETIMEDOUT;
4921 return -1;
4922 }
4923 }
4924 return ret;
4925 }
4926
4927 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4928 ssize_t nread, totread = 0;
4929 time_t start = time(NULL);
4930
4931 timeout++;
4932 while(size) {
4933 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4934 nread = read(fd,ptr,size);
4935 if (nread == -1) return -1;
4936 ptr += nread;
4937 size -= nread;
4938 totread += nread;
4939 }
4940 if ((time(NULL)-start) > timeout) {
4941 errno = ETIMEDOUT;
4942 return -1;
4943 }
4944 }
4945 return totread;
4946 }
4947
/* Read a line from 'fd' into 'ptr', one byte at a time, stopping at the
 * first '\n' or when the buffer of 'size' bytes is full. The stored string
 * is always null terminated, and a "\r\n" or "\n" terminator is stripped.
 *
 * Returns the number of characters consumed before the '\n' (a trailing
 * '\r' is counted even though it is stripped), or -1 on I/O error or
 * timeout, as reported by syncRead(). A non-positive 'size' is rejected
 * with -1 and errno set to EINVAL. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';
    size--; /* reserve room for the null terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One byte of the buffer has been used: without this the loop
         * never stops on long lines and writes past the end of 'ptr'. */
        size--;
    }
    return nread;
}
4968
4969 static void syncCommand(redisClient *c) {
4970 /* ignore SYNC if aleady slave or in monitor mode */
4971 if (c->flags & REDIS_SLAVE) return;
4972
4973 /* SYNC can't be issued when the server has pending data to send to
4974 * the client about already issued commands. We need a fresh reply
4975 * buffer registering the differences between the BGSAVE and the current
4976 * dataset, so that we can copy to other slaves if needed. */
4977 if (listLength(c->reply) != 0) {
4978 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4979 return;
4980 }
4981
4982 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4983 /* Here we need to check if there is a background saving operation
4984 * in progress, or if it is required to start one */
4985 if (server.bgsaveinprogress) {
4986 /* Ok a background save is in progress. Let's check if it is a good
4987 * one for replication, i.e. if there is another slave that is
4988 * registering differences since the server forked to save */
4989 redisClient *slave;
4990 listNode *ln;
4991
4992 listRewind(server.slaves);
4993 while((ln = listYield(server.slaves))) {
4994 slave = ln->value;
4995 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4996 }
4997 if (ln) {
4998 /* Perfect, the server is already registering differences for
4999 * another slave. Set the right state, and copy the buffer. */
5000 listRelease(c->reply);
5001 c->reply = listDup(slave->reply);
5002 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5003 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5004 } else {
5005 /* No way, we need to wait for the next BGSAVE in order to
5006 * register differences */
5007 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5008 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5009 }
5010 } else {
5011 /* Ok we don't have a BGSAVE in progress, let's start one */
5012 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5013 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5014 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5015 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5016 return;
5017 }
5018 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5019 }
5020 c->repldbfd = -1;
5021 c->flags |= REDIS_SLAVE;
5022 c->slaveseldb = 0;
5023 listAddNodeTail(server.slaves,c);
5024 return;
5025 }
5026
5027 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5028 redisClient *slave = privdata;
5029 REDIS_NOTUSED(el);
5030 REDIS_NOTUSED(mask);
5031 char buf[REDIS_IOBUF_LEN];
5032 ssize_t nwritten, buflen;
5033
5034 if (slave->repldboff == 0) {
5035 /* Write the bulk write count before to transfer the DB. In theory here
5036 * we don't know how much room there is in the output buffer of the
5037 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5038 * operations) will never be smaller than the few bytes we need. */
5039 sds bulkcount;
5040
5041 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5042 slave->repldbsize);
5043 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5044 {
5045 sdsfree(bulkcount);
5046 freeClient(slave);
5047 return;
5048 }
5049 sdsfree(bulkcount);
5050 }
5051 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5052 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5053 if (buflen <= 0) {
5054 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5055 (buflen == 0) ? "premature EOF" : strerror(errno));
5056 freeClient(slave);
5057 return;
5058 }
5059 if ((nwritten = write(fd,buf,buflen)) == -1) {
5060 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5061 strerror(errno));
5062 freeClient(slave);
5063 return;
5064 }
5065 slave->repldboff += nwritten;
5066 if (slave->repldboff == slave->repldbsize) {
5067 close(slave->repldbfd);
5068 slave->repldbfd = -1;
5069 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5070 slave->replstate = REDIS_REPL_ONLINE;
5071 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5072 sendReplyToClient, slave, NULL) == AE_ERR) {
5073 freeClient(slave);
5074 return;
5075 }
5076 addReplySds(slave,sdsempty());
5077 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5078 }
5079 }
5080
5081 /* This function is called at the end of every backgrond saving.
5082 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5083 * otherwise REDIS_ERR is passed to the function.
5084 *
5085 * The goal of this function is to handle slaves waiting for a successful
5086 * background saving in order to perform non-blocking synchronization. */
5087 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5088 listNode *ln;
5089 int startbgsave = 0;
5090
5091 listRewind(server.slaves);
5092 while((ln = listYield(server.slaves))) {
5093 redisClient *slave = ln->value;
5094
5095 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5096 startbgsave = 1;
5097 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5098 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5099 struct redis_stat buf;
5100
5101 if (bgsaveerr != REDIS_OK) {
5102 freeClient(slave);
5103 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5104 continue;
5105 }
5106 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5107 redis_fstat(slave->repldbfd,&buf) == -1) {
5108 freeClient(slave);
5109 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5110 continue;
5111 }
5112 slave->repldboff = 0;
5113 slave->repldbsize = buf.st_size;
5114 slave->replstate = REDIS_REPL_SEND_BULK;
5115 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5116 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5117 freeClient(slave);
5118 continue;
5119 }
5120 }
5121 }
5122 if (startbgsave) {
5123 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5124 listRewind(server.slaves);
5125 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5126 while((ln = listYield(server.slaves))) {
5127 redisClient *slave = ln->value;
5128
5129 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5130 freeClient(slave);
5131 }
5132 }
5133 }
5134 }
5135
5136 static int syncWithMaster(void) {
5137 char buf[1024], tmpfile[256], authcmd[1024];
5138 int dumpsize;
5139 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5140 int dfd;
5141
5142 if (fd == -1) {
5143 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5144 strerror(errno));
5145 return REDIS_ERR;
5146 }
5147
5148 /* AUTH with the master if required. */
5149 if(server.masterauth) {
5150 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5151 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5152 close(fd);
5153 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5154 strerror(errno));
5155 return REDIS_ERR;
5156 }
5157 /* Read the AUTH result. */
5158 if (syncReadLine(fd,buf,1024,3600) == -1) {
5159 close(fd);
5160 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5161 strerror(errno));
5162 return REDIS_ERR;
5163 }
5164 if (buf[0] != '+') {
5165 close(fd);
5166 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5167 return REDIS_ERR;
5168 }
5169 }
5170
5171 /* Issue the SYNC command */
5172 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5173 close(fd);
5174 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5175 strerror(errno));
5176 return REDIS_ERR;
5177 }
5178 /* Read the bulk write count */
5179 if (syncReadLine(fd,buf,1024,3600) == -1) {
5180 close(fd);
5181 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5182 strerror(errno));
5183 return REDIS_ERR;
5184 }
5185 if (buf[0] != '$') {
5186 close(fd);
5187 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5188 return REDIS_ERR;
5189 }
5190 dumpsize = atoi(buf+1);
5191 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5192 /* Read the bulk write data on a temp file */
5193 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5194 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5195 if (dfd == -1) {
5196 close(fd);
5197 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5198 return REDIS_ERR;
5199 }
5200 while(dumpsize) {
5201 int nread, nwritten;
5202
5203 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5204 if (nread == -1) {
5205 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5206 strerror(errno));
5207 close(fd);
5208 close(dfd);
5209 return REDIS_ERR;
5210 }
5211 nwritten = write(dfd,buf,nread);
5212 if (nwritten == -1) {
5213 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5214 close(fd);
5215 close(dfd);
5216 return REDIS_ERR;
5217 }
5218 dumpsize -= nread;
5219 }
5220 close(dfd);
5221 if (rename(tmpfile,server.dbfilename) == -1) {
5222 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5223 unlink(tmpfile);
5224 close(fd);
5225 return REDIS_ERR;
5226 }
5227 emptyDb();
5228 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5229 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5230 close(fd);
5231 return REDIS_ERR;
5232 }
5233 server.master = createClient(fd);
5234 server.master->flags |= REDIS_MASTER;
5235 server.replstate = REDIS_REPL_CONNECTED;
5236 return REDIS_OK;
5237 }
5238
5239 static void slaveofCommand(redisClient *c) {
5240 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5241 !strcasecmp(c->argv[2]->ptr,"one")) {
5242 if (server.masterhost) {
5243 sdsfree(server.masterhost);
5244 server.masterhost = NULL;
5245 if (server.master) freeClient(server.master);
5246 server.replstate = REDIS_REPL_NONE;
5247 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5248 }
5249 } else {
5250 sdsfree(server.masterhost);
5251 server.masterhost = sdsdup(c->argv[1]->ptr);
5252 server.masterport = atoi(c->argv[2]->ptr);
5253 if (server.master) freeClient(server.master);
5254 server.replstate = REDIS_REPL_CONNECT;
5255 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5256 server.masterhost, server.masterport);
5257 }
5258 addReply(c,shared.ok);
5259 }
5260
5261 /* ============================ Maxmemory directive ======================== */
5262
5263 /* This function gets called when 'maxmemory' is set on the config file to limit
5264 * the max memory used by the server, and we are out of memory.
5265 * This function will try to, in order:
5266 *
5267 * - Free objects from the free list
5268 * - Try to remove keys with an EXPIRE set
5269 *
5270 * It is not possible to free enough memory to reach used-memory < maxmemory
5271 * the server will start refusing commands that will enlarge even more the
5272 * memory usage.
5273 */
5274 static void freeMemoryIfNeeded(void) {
5275 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5276 if (listLength(server.objfreelist)) {
5277 robj *o;
5278
5279 listNode *head = listFirst(server.objfreelist);
5280 o = listNodeValue(head);
5281 listDelNode(server.objfreelist,head);
5282 zfree(o);
5283 } else {
5284 int j, k, freed = 0;
5285
5286 for (j = 0; j < server.dbnum; j++) {
5287 int minttl = -1;
5288 robj *minkey = NULL;
5289 struct dictEntry *de;
5290
5291 if (dictSize(server.db[j].expires)) {
5292 freed = 1;
5293 /* From a sample of three keys drop the one nearest to
5294 * the natural expire */
5295 for (k = 0; k < 3; k++) {
5296 time_t t;
5297
5298 de = dictGetRandomKey(server.db[j].expires);
5299 t = (time_t) dictGetEntryVal(de);
5300 if (minttl == -1 || t < minttl) {
5301 minkey = dictGetEntryKey(de);
5302 minttl = t;
5303 }
5304 }
5305 deleteKey(server.db+j,minkey);
5306 }
5307 }
5308 if (!freed) return; /* nothing to free... */
5309 }
5310 }
5311 }
5312
5313 /* ============================== Append Only file ========================== */
5314
5315 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5316 sds buf = sdsempty();
5317 int j;
5318 ssize_t nwritten;
5319 time_t now;
5320 robj *tmpargv[3];
5321
5322 /* The DB this command was targetting is not the same as the last command
5323 * we appendend. To issue a SELECT command is needed. */
5324 if (dictid != server.appendseldb) {
5325 char seldb[64];
5326
5327 snprintf(seldb,sizeof(seldb),"%d",dictid);
5328 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5329 strlen(seldb),seldb);
5330 server.appendseldb = dictid;
5331 }
5332
5333 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5334 * EXPIREs into EXPIREATs calls */
5335 if (cmd->proc == expireCommand) {
5336 long when;
5337
5338 tmpargv[0] = createStringObject("EXPIREAT",8);
5339 tmpargv[1] = argv[1];
5340 incrRefCount(argv[1]);
5341 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5342 tmpargv[2] = createObject(REDIS_STRING,
5343 sdscatprintf(sdsempty(),"%ld",when));
5344 argv = tmpargv;
5345 }
5346
5347 /* Append the actual command */
5348 buf = sdscatprintf(buf,"*%d\r\n",argc);
5349 for (j = 0; j < argc; j++) {
5350 robj *o = argv[j];
5351
5352 if (o->encoding != REDIS_ENCODING_RAW)
5353 o = getDecodedObject(o);
5354 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5355 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5356 buf = sdscatlen(buf,"\r\n",2);
5357 if (o != argv[j])
5358 decrRefCount(o);
5359 }
5360
5361 /* Free the objects from the modified argv for EXPIREAT */
5362 if (cmd->proc == expireCommand) {
5363 for (j = 0; j < 3; j++)
5364 decrRefCount(argv[j]);
5365 }
5366
5367 /* We want to perform a single write. This should be guaranteed atomic
5368 * at least if the filesystem we are writing is a real physical one.
5369 * While this will save us against the server being killed I don't think
5370 * there is much to do about the whole server stopping for power problems
5371 * or alike */
5372 nwritten = write(server.appendfd,buf,sdslen(buf));
5373 if (nwritten != (signed)sdslen(buf)) {
5374 /* Ooops, we are in troubles. The best thing to do for now is
5375 * to simply exit instead to give the illusion that everything is
5376 * working as expected. */
5377 if (nwritten == -1) {
5378 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5379 } else {
5380 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5381 }
5382 exit(1);
5383 }
5384 now = time(NULL);
5385 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5386 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5387 now-server.lastfsync > 1))
5388 {
5389 fsync(server.appendfd); /* Let's try to get this data on the disk */
5390 server.lastfsync = now;
5391 }
5392 }
5393
5394 /* In Redis commands are always executed in the context of a client, so in
5395 * order to load the append only file we need to create a fake client. */
5396 static struct redisClient *createFakeClient(void) {
5397 struct redisClient *c = zmalloc(sizeof(*c));
5398
5399 selectDb(c,0);
5400 c->fd = -1;
5401 c->querybuf = sdsempty();
5402 c->argc = 0;
5403 c->argv = NULL;
5404 c->flags = 0;
5405 /* We set the fake client as a slave waiting for the synchronization
5406 * so that Redis will not try to send replies to this client. */
5407 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5408 c->reply = listCreate();
5409 listSetFreeMethod(c->reply,decrRefCount);
5410 listSetDupMethod(c->reply,dupClientReplyValue);
5411 return c;
5412 }
5413
5414 static void freeFakeClient(struct redisClient *c) {
5415 sdsfree(c->querybuf);
5416 listRelease(c->reply);
5417 zfree(c);
5418 }
5419
5420 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5421 * error (the append only file is zero-length) REDIS_ERR is returned. On
5422 * fatal error an error message is logged and the program exists. */
5423 int loadAppendOnlyFile(char *filename) {
5424 struct redisClient *fakeClient;
5425 FILE *fp = fopen(filename,"r");
5426 struct redis_stat sb;
5427
5428 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5429 return REDIS_ERR;
5430
5431 if (fp == NULL) {
5432 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5433 exit(1);
5434 }
5435
5436 fakeClient = createFakeClient();
5437 while(1) {
5438 int argc, j;
5439 unsigned long len;
5440 robj **argv;
5441 char buf[128];
5442 sds argsds;
5443 struct redisCommand *cmd;
5444
5445 if (fgets(buf,sizeof(buf),fp) == NULL) {
5446 if (feof(fp))
5447 break;
5448 else
5449 goto readerr;
5450 }
5451 if (buf[0] != '*') goto fmterr;
5452 argc = atoi(buf+1);
5453 argv = zmalloc(sizeof(robj*)*argc);
5454 for (j = 0; j < argc; j++) {
5455 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5456 if (buf[0] != '$') goto fmterr;
5457 len = strtol(buf+1,NULL,10);
5458 argsds = sdsnewlen(NULL,len);
5459 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5460 argv[j] = createObject(REDIS_STRING,argsds);
5461 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5462 }
5463
5464 /* Command lookup */
5465 cmd = lookupCommand(argv[0]->ptr);
5466 if (!cmd) {
5467 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5468 exit(1);
5469 }
5470 /* Try object sharing and encoding */
5471 if (server.shareobjects) {
5472 int j;
5473 for(j = 1; j < argc; j++)
5474 argv[j] = tryObjectSharing(argv[j]);
5475 }
5476 if (cmd->flags & REDIS_CMD_BULK)
5477 tryObjectEncoding(argv[argc-1]);
5478 /* Run the command in the context of a fake client */
5479 fakeClient->argc = argc;
5480 fakeClient->argv = argv;
5481 cmd->proc(fakeClient);
5482 /* Discard the reply objects list from the fake client */
5483 while(listLength(fakeClient->reply))
5484 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5485 /* Clean up, ready for the next command */
5486 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5487 zfree(argv);
5488 }
5489 fclose(fp);
5490 freeFakeClient(fakeClient);
5491 return REDIS_OK;
5492
5493 readerr:
5494 if (feof(fp)) {
5495 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5496 } else {
5497 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5498 }
5499 exit(1);
5500 fmterr:
5501 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5502 exit(1);
5503 }
5504
5505 /* ================================= Debugging ============================== */
5506
5507 static void debugCommand(redisClient *c) {
5508 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5509 *((char*)-1) = 'x';
5510 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5511 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5512 robj *key, *val;
5513
5514 if (!de) {
5515 addReply(c,shared.nokeyerr);
5516 return;
5517 }
5518 key = dictGetEntryKey(de);
5519 val = dictGetEntryVal(de);
5520 addReplySds(c,sdscatprintf(sdsempty(),
5521 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5522 key, key->refcount, val, val->refcount, val->encoding));
5523 } else {
5524 addReplySds(c,sdsnew(
5525 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5526 }
5527 }
5528
5529 #ifdef HAVE_BACKTRACE
/* Table associating function names with their addresses. It is scanned by
 * findFuncName() to translate an address of the backtrace into the name of
 * the closest preceding function listed here. The {NULL,0} entry terminates
 * the table. A function missing from this table cannot be resolved by
 * findFuncName(). */
static struct redisFunctionSym symsTable[] = {
    {"compareStringObjects", (unsigned long)compareStringObjects},
    {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
    {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
    {"dictEncObjHash", (unsigned long)dictEncObjHash},
    {"incrDecrCommand", (unsigned long)incrDecrCommand},
    {"freeStringObject", (unsigned long)freeStringObject},
    {"freeListObject", (unsigned long)freeListObject},
    {"freeSetObject", (unsigned long)freeSetObject},
    {"decrRefCount", (unsigned long)decrRefCount},
    {"createObject", (unsigned long)createObject},
    {"freeClient", (unsigned long)freeClient},
    {"rdbLoad", (unsigned long)rdbLoad},
    {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
    {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
    {"addReply", (unsigned long)addReply},
    {"addReplySds", (unsigned long)addReplySds},
    {"incrRefCount", (unsigned long)incrRefCount},
    {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
    {"createStringObject", (unsigned long)createStringObject},
    {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
    {"syncWithMaster", (unsigned long)syncWithMaster},
    {"tryObjectSharing", (unsigned long)tryObjectSharing},
    {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
    {"getDecodedObject", (unsigned long)getDecodedObject},
    {"removeExpire", (unsigned long)removeExpire},
    {"expireIfNeeded", (unsigned long)expireIfNeeded},
    {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
    {"deleteKey", (unsigned long)deleteKey},
    {"getExpire", (unsigned long)getExpire},
    {"setExpire", (unsigned long)setExpire},
    {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
    {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
    {"authCommand", (unsigned long)authCommand},
    {"pingCommand", (unsigned long)pingCommand},
    {"echoCommand", (unsigned long)echoCommand},
    {"setCommand", (unsigned long)setCommand},
    {"setnxCommand", (unsigned long)setnxCommand},
    {"getCommand", (unsigned long)getCommand},
    {"delCommand", (unsigned long)delCommand},
    {"existsCommand", (unsigned long)existsCommand},
    {"incrCommand", (unsigned long)incrCommand},
    {"decrCommand", (unsigned long)decrCommand},
    {"incrbyCommand", (unsigned long)incrbyCommand},
    {"decrbyCommand", (unsigned long)decrbyCommand},
    {"selectCommand", (unsigned long)selectCommand},
    {"randomkeyCommand", (unsigned long)randomkeyCommand},
    {"keysCommand", (unsigned long)keysCommand},
    {"dbsizeCommand", (unsigned long)dbsizeCommand},
    {"lastsaveCommand", (unsigned long)lastsaveCommand},
    {"saveCommand", (unsigned long)saveCommand},
    {"bgsaveCommand", (unsigned long)bgsaveCommand},
    {"shutdownCommand", (unsigned long)shutdownCommand},
    {"moveCommand", (unsigned long)moveCommand},
    {"renameCommand", (unsigned long)renameCommand},
    {"renamenxCommand", (unsigned long)renamenxCommand},
    {"lpushCommand", (unsigned long)lpushCommand},
    {"rpushCommand", (unsigned long)rpushCommand},
    {"lpopCommand", (unsigned long)lpopCommand},
    {"rpopCommand", (unsigned long)rpopCommand},
    {"llenCommand", (unsigned long)llenCommand},
    {"lindexCommand", (unsigned long)lindexCommand},
    {"lrangeCommand", (unsigned long)lrangeCommand},
    {"ltrimCommand", (unsigned long)ltrimCommand},
    {"typeCommand", (unsigned long)typeCommand},
    {"lsetCommand", (unsigned long)lsetCommand},
    {"saddCommand", (unsigned long)saddCommand},
    {"sremCommand", (unsigned long)sremCommand},
    {"smoveCommand", (unsigned long)smoveCommand},
    {"sismemberCommand", (unsigned long)sismemberCommand},
    {"scardCommand", (unsigned long)scardCommand},
    {"spopCommand", (unsigned long)spopCommand},
    {"srandmemberCommand", (unsigned long)srandmemberCommand},
    {"sinterCommand", (unsigned long)sinterCommand},
    {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
    {"sunionCommand", (unsigned long)sunionCommand},
    {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
    {"sdiffCommand", (unsigned long)sdiffCommand},
    {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
    {"syncCommand", (unsigned long)syncCommand},
    {"flushdbCommand", (unsigned long)flushdbCommand},
    {"flushallCommand", (unsigned long)flushallCommand},
    {"sortCommand", (unsigned long)sortCommand},
    {"lremCommand", (unsigned long)lremCommand},
    {"infoCommand", (unsigned long)infoCommand},
    {"mgetCommand", (unsigned long)mgetCommand},
    {"monitorCommand", (unsigned long)monitorCommand},
    {"expireCommand", (unsigned long)expireCommand},
    {"expireatCommand", (unsigned long)expireatCommand},
    {"getsetCommand", (unsigned long)getsetCommand},
    {"ttlCommand", (unsigned long)ttlCommand},
    {"slaveofCommand", (unsigned long)slaveofCommand},
    {"debugCommand", (unsigned long)debugCommand},
    {"processCommand", (unsigned long)processCommand},
    {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
    {"readQueryFromClient", (unsigned long)readQueryFromClient},
    {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
    {"msetGenericCommand", (unsigned long)msetGenericCommand},
    {"msetCommand", (unsigned long)msetCommand},
    {"msetnxCommand", (unsigned long)msetnxCommand},
    {"zslCreateNode", (unsigned long)zslCreateNode},
    {"zslCreate", (unsigned long)zslCreate},
    {"zslFreeNode",(unsigned long)zslFreeNode},
    {"zslFree",(unsigned long)zslFree},
    {"zslRandomLevel",(unsigned long)zslRandomLevel},
    {"zslInsert",(unsigned long)zslInsert},
    {"zslDelete",(unsigned long)zslDelete},
    {"createZsetObject",(unsigned long)createZsetObject},
    {"zaddCommand",(unsigned long)zaddCommand},
    {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
    {"zrangeCommand",(unsigned long)zrangeCommand},
    {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
    {"zremCommand",(unsigned long)zremCommand},
    {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
    {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
    {"feedAppendOnlyFile",(unsigned long)feedAppendOnlyFile},
    {NULL,0}
};
5648
5649 /* This function try to convert a pointer into a function name. It's used in
5650 * oreder to provide a backtrace under segmentation fault that's able to
5651 * display functions declared as static (otherwise the backtrace is useless). */
5652 static char *findFuncName(void *pointer, unsigned long *offset){
5653 int i, ret = -1;
5654 unsigned long off, minoff = 0;
5655
5656 /* Try to match against the Symbol with the smallest offset */
5657 for (i=0; symsTable[i].pointer; i++) {
5658 unsigned long lp = (unsigned long) pointer;
5659
5660 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5661 off=lp-symsTable[i].pointer;
5662 if (ret < 0 || off < minoff) {
5663 minoff=off;
5664 ret=i;
5665 }
5666 }
5667 }
5668 if (ret == -1) return NULL;
5669 *offset = minoff;
5670 return symsTable[ret].name;
5671 }
5672
/* Return the instruction pointer stored in the machine context of 'uc' (the
 * context passed to an SA_SIGINFO signal handler), that is the address of
 * the instruction that was executing when the signal was delivered.
 *
 * The field holding this value is platform specific, hence the ladder of
 * preprocessor conditionals. NULL is returned when none of the platforms
 * listed below is detected. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  /* On 10.6 pick the 64 bit register unless building for i386. */
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
/* NOTE(review): compilers usually define __x86_64__ (lowercase), so the
 * __X86_64__ test below looks like it never matches, and REG_EIP is the
 * 32 bit register index -- confirm the behavior on Linux x86-64. */
#elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
5694
5695 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5696 void *trace[100];
5697 char **messages = NULL;
5698 int i, trace_size = 0;
5699 unsigned long offset=0;
5700 time_t uptime = time(NULL)-server.stat_starttime;
5701 ucontext_t *uc = (ucontext_t*) secret;
5702 REDIS_NOTUSED(info);
5703
5704 redisLog(REDIS_WARNING,
5705 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5706 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5707 "redis_version:%s; "
5708 "uptime_in_seconds:%d; "
5709 "connected_clients:%d; "
5710 "connected_slaves:%d; "
5711 "used_memory:%zu; "
5712 "changes_since_last_save:%lld; "
5713 "bgsave_in_progress:%d; "
5714 "last_save_time:%d; "
5715 "total_connections_received:%lld; "
5716 "total_commands_processed:%lld; "
5717 "role:%s;"
5718 ,REDIS_VERSION,
5719 uptime,
5720 listLength(server.clients)-listLength(server.slaves),
5721 listLength(server.slaves),
5722 server.usedmemory,
5723 server.dirty,
5724 server.bgsaveinprogress,
5725 server.lastsave,
5726 server.stat_numconnections,
5727 server.stat_numcommands,
5728 server.masterhost == NULL ? "master" : "slave"
5729 ));
5730
5731 trace_size = backtrace(trace, 100);
5732 /* overwrite sigaction with caller's address */
5733 if (getMcontextEip(uc) != NULL) {
5734 trace[1] = getMcontextEip(uc);
5735 }
5736 messages = backtrace_symbols(trace, trace_size);
5737
5738 for (i=1; i<trace_size; ++i) {
5739 char *fn = findFuncName(trace[i], &offset), *p;
5740
5741 p = strchr(messages[i],'+');
5742 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5743 redisLog(REDIS_WARNING,"%s", messages[i]);
5744 } else {
5745 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5746 }
5747 }
5748 free(messages);
5749 exit(0);
5750 }
5751
5752 static void setupSigSegvAction(void) {
5753 struct sigaction act;
5754
5755 sigemptyset (&act.sa_mask);
5756 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5757 * is used. Otherwise, sa_handler is used */
5758 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5759 act.sa_sigaction = segvHandler;
5760 sigaction (SIGSEGV, &act, NULL);
5761 sigaction (SIGBUS, &act, NULL);
5762 sigaction (SIGFPE, &act, NULL);
5763 sigaction (SIGILL, &act, NULL);
5764 sigaction (SIGBUS, &act, NULL);
5765 return;
5766 }
5767 #else /* HAVE_BACKTRACE */
/* Fallback compiled when HAVE_BACKTRACE is not defined: no signal handler is
 * installed, the function only exists so that callers do not need to be
 * conditionally compiled. */
static void setupSigSegvAction(void) {
}
5770 #endif /* HAVE_BACKTRACE */
5771
5772 /* =================================== Main! ================================ */
5773
5774 #ifdef __linux__
/* Return the integer found in /proc/sys/vm/overcommit_memory, or -1 if the
 * file can't be opened or read. The first line of the file is converted
 * with atoi(). */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return value;
    if (fgets(line,sizeof(line),procfile) != NULL)
        value = atoi(line);
    fclose(procfile);
    return value;
}
5788
5789 void linuxOvercommitMemoryWarning(void) {
5790 if (linuxOvercommitMemoryValue() == 0) {
5791 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5792 }
5793 }
5794 #endif /* __linux__ */
5795
5796 static void daemonize(void) {
5797 int fd;
5798 FILE *fp;
5799
5800 if (fork() != 0) exit(0); /* parent exits */
5801 setsid(); /* create a new session */
5802
5803 /* Every output goes to /dev/null. If Redis is daemonized but
5804 * the 'logfile' is set to 'stdout' in the configuration file
5805 * it will not log at all. */
5806 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5807 dup2(fd, STDIN_FILENO);
5808 dup2(fd, STDOUT_FILENO);
5809 dup2(fd, STDERR_FILENO);
5810 if (fd > STDERR_FILENO) close(fd);
5811 }
5812 /* Try to write the pid file */
5813 fp = fopen(server.pidfile,"w");
5814 if (fp) {
5815 fprintf(fp,"%d\n",getpid());
5816 fclose(fp);
5817 }
5818 }
5819
5820 int main(int argc, char **argv) {
5821 initServerConfig();
5822 if (argc == 2) {
5823 ResetServerSaveParams();
5824 loadServerConfig(argv[1]);
5825 } else if (argc > 2) {
5826 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5827 exit(1);
5828 } else {
5829 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5830 }
5831 initServer();
5832 if (server.daemonize) daemonize();
5833 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5834 #ifdef __linux__
5835 linuxOvercommitMemoryWarning();
5836 #endif
5837 if (server.appendonly) {
5838 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
5839 redisLog(REDIS_NOTICE,"DB loaded from append only file");
5840 } else {
5841 if (rdbLoad(server.dbfilename) == REDIS_OK)
5842 redisLog(REDIS_NOTICE,"DB loaded from disk");
5843 }
5844 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5845 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5846 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5847 aeMain(server.el);
5848 aeDeleteEventLoop(server.el);
5849 return 0;
5850 }