]> git.saurik.com Git - redis.git/blob - redis.c
21098e56dab98f971bd90cdac40d90a6c28c906a
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #if defined(__sun)
63 #include "solarisfixes.h"
64 #endif
65
66 #include "redis.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75
76 /* Error codes */
77 #define REDIS_OK 0
78 #define REDIS_ERR -1
79
80 /* Static server configuration */
81 #define REDIS_SERVERPORT 6379 /* TCP port */
82 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
83 #define REDIS_IOBUF_LEN 1024
84 #define REDIS_LOADBUF_LEN 1024
85 #define REDIS_STATIC_ARGS 4
86 #define REDIS_DEFAULT_DBNUM 16
87 #define REDIS_CONFIGLINE_MAX 1024
88 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
89 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
90 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
91 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
92 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
93
94 /* Hash table parameters */
95 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
96
97 /* Command flags */
98 #define REDIS_CMD_BULK 1 /* Bulk write command */
99 #define REDIS_CMD_INLINE 2 /* Inline command */
100 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
101 this flag will return an error when the 'maxmemory' option is set in the
102 config file and the server is using more than maxmemory bytes of memory.
103 In short these commands are denied on low memory conditions. */
104 #define REDIS_CMD_DENYOOM 4
105
106 /* Object types */
107 #define REDIS_STRING 0
108 #define REDIS_LIST 1
109 #define REDIS_SET 2
110 #define REDIS_ZSET 3
111 #define REDIS_HASH 4
112
113 /* Objects encoding */
114 #define REDIS_ENCODING_RAW 0 /* Raw representation */
115 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
116
117 /* Object types only used for dumping to disk */
118 #define REDIS_EXPIRETIME 253
119 #define REDIS_SELECTDB 254
120 #define REDIS_EOF 255
121
122 /* Defines related to the dump file format. To store 32 bits lengths for short
123 * keys requires a lot of space, so we check the most significant 2 bits of
124 * the first byte to interpret the length:
125 *
126 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
127 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
128 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
129 * 11|000000 this means: specially encoded object will follow. The six bits
130 * number specify the kind of object that follows.
131 * See the REDIS_RDB_ENC_* defines.
132 *
133 * Lengths up to 63 are stored using a single byte, most DB keys, and many
134 * values, will fit inside. */
135 #define REDIS_RDB_6BITLEN 0
136 #define REDIS_RDB_14BITLEN 1
137 #define REDIS_RDB_32BITLEN 2
138 #define REDIS_RDB_ENCVAL 3
139 #define REDIS_RDB_LENERR UINT_MAX
140
141 /* When a length of a string object stored on disk has the first two bits
142 * set, the remaining six bits specify a special encoding for the object
143 * accordingly to the following defines: */
144 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
145 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
146 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
147 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
148
149 /* Client flags */
150 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
151 #define REDIS_SLAVE 2 /* This client is a slave server */
152 #define REDIS_MASTER 4 /* This client is a master server */
153 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
154
155 /* Slave replication state - slave side */
156 #define REDIS_REPL_NONE 0 /* No active replication */
157 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
158 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
159
160 /* Slave replication state - from the point of view of master
161 * Note that in SEND_BULK and ONLINE state the slave receives new updates
162 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
163 * to start the next background saving in order to send updates to it. */
164 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
165 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
166 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
167 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
168
169 /* List related stuff */
170 #define REDIS_HEAD 0
171 #define REDIS_TAIL 1
172
173 /* Sort operations */
174 #define REDIS_SORT_GET 0
175 #define REDIS_SORT_ASC 1
176 #define REDIS_SORT_DESC 2
177 #define REDIS_SORTKEY_MAX 1024
178
179 /* Log levels */
180 #define REDIS_DEBUG 0
181 #define REDIS_NOTICE 1
182 #define REDIS_WARNING 2
183
184 /* Anti-warning macro... */
185 #define REDIS_NOTUSED(V) ((void) V)
186
187 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
188 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
189
190 /* Append only defines */
191 #define APPENDFSYNC_NO 0
192 #define APPENDFSYNC_ALWAYS 1
193 #define APPENDFSYNC_EVERYSEC 2
194
195 /*================================= Data types ============================== */
196
/* Generic value container shared by the whole server. 'type' carries one of
 * the REDIS_STRING / REDIS_LIST / REDIS_SET / REDIS_ZSET / REDIS_HASH
 * constants and 'encoding' one of the REDIS_ENCODING_* constants. */
typedef struct redisObject {
    void *ptr;                  /* payload; for strings this is read as an sds */
    unsigned char type;         /* REDIS_STRING, REDIS_LIST, ... */
    unsigned char encoding;     /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2];   /* currently unused bytes */
    int refcount;               /* managed by incrRefCount()/decrRefCount() */
} robj;
205
206 typedef struct redisDb {
207 dict *dict;
208 dict *expires;
209 int id;
210 } redisDb;
211
212 /* With multiplexing we need to take per-clinet state.
213 * Clients are taken in a liked list. */
214 typedef struct redisClient {
215 int fd;
216 redisDb *db;
217 int dictid;
218 sds querybuf;
219 robj **argv, **mbargv;
220 int argc, mbargc;
221 int bulklen; /* bulk read len. -1 if not in bulk read mode */
222 int multibulk; /* multi bulk command format active */
223 list *reply;
224 int sentlen;
225 time_t lastinteraction; /* time of the last interaction, used for timeout */
226 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
227 int slaveseldb; /* slave selected db, if this client is a slave */
228 int authenticated; /* when requirepass is non-NULL */
229 int replstate; /* replication state if this is a slave */
230 int repldbfd; /* replication DB file descriptor */
231 long repldboff; /* replication DB file offset */
232 off_t repldbsize; /* replication DB file size */
233 } redisClient;
234
/* One automatic-save rule: serverCron() starts a background save when at
 * least 'changes' modifications are pending and more than 'seconds' seconds
 * have passed since the last successful save. */
struct saveparam {
    time_t seconds;     /* minimum age of the last save */
    int changes;        /* minimum number of pending changes */
};
239
240 /* Global server state structure */
241 struct redisServer {
242 int port;
243 int fd;
244 redisDb *db;
245 dict *sharingpool;
246 unsigned int sharingpoolsize;
247 long long dirty; /* changes to DB from the last save */
248 list *clients;
249 list *slaves, *monitors;
250 char neterr[ANET_ERR_LEN];
251 aeEventLoop *el;
252 int cronloops; /* number of times the cron function run */
253 list *objfreelist; /* A list of freed objects to avoid malloc() */
254 time_t lastsave; /* Unix time of last save succeeede */
255 size_t usedmemory; /* Used memory in megabytes */
256 /* Fields used only for stats */
257 time_t stat_starttime; /* server start time */
258 long long stat_numcommands; /* number of processed commands */
259 long long stat_numconnections; /* number of connections received */
260 /* Configuration */
261 int verbosity;
262 int glueoutputbuf;
263 int maxidletime;
264 int dbnum;
265 int daemonize;
266 int appendonly;
267 int appendfsync;
268 time_t lastfsync;
269 int appendfd;
270 int appendseldb;
271 char *pidfile;
272 int bgsaveinprogress;
273 pid_t bgsavechildpid;
274 struct saveparam *saveparams;
275 int saveparamslen;
276 char *logfile;
277 char *bindaddr;
278 char *dbfilename;
279 char *appendfilename;
280 char *requirepass;
281 int shareobjects;
282 /* Replication related */
283 int isslave;
284 char *masterauth;
285 char *masterhost;
286 int masterport;
287 redisClient *master; /* client that is master for this slave */
288 int replstate;
289 unsigned int maxclients;
290 unsigned long maxmemory;
291 /* Sort parameters - qsort_r() is only available under BSD so we
292 * have to take this state global, in order to pass it to sortCompare() */
293 int sort_desc;
294 int sort_alpha;
295 int sort_bypattern;
296 };
297
298 typedef void redisCommandProc(redisClient *c);
299 struct redisCommand {
300 char *name;
301 redisCommandProc *proc;
302 int arity;
303 int flags;
304 };
305
/* A (function name, address) pair. */
struct redisFunctionSym {
    char *name;                 /* symbol name */
    unsigned long pointer;      /* address stored as an integer */
};
310
311 typedef struct _redisSortObject {
312 robj *obj;
313 union {
314 double score;
315 robj *cmpobj;
316 } u;
317 } redisSortObject;
318
319 typedef struct _redisSortOperation {
320 int type;
321 robj *pattern;
322 } redisSortOperation;
323
324 /* ZSETs use a specialized version of Skiplists */
325
326 typedef struct zskiplistNode {
327 struct zskiplistNode **forward;
328 struct zskiplistNode *backward;
329 double score;
330 robj *obj;
331 } zskiplistNode;
332
/* Skiplist handle: end points, element count and current level. */
typedef struct zskiplist {
    struct zskiplistNode *header;
    struct zskiplistNode *tail;
    unsigned long length;
    int level;
} zskiplist;
338
339 typedef struct zset {
340 dict *dict;
341 zskiplist *zsl;
342 } zset;
343
344 /* Our shared "common" objects */
345
346 struct sharedObjectsStruct {
347 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
348 *colon, *nullbulk, *nullmultibulk,
349 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
350 *outofrangeerr, *plus,
351 *select0, *select1, *select2, *select3, *select4,
352 *select5, *select6, *select7, *select8, *select9;
353 } shared;
354
355 /* Global vars that are actually used as constants. The following double
356 * values are used for double on-disk serialization, and are initialized
357 * at runtime to avoid strange compiler optimizations. */
358
359 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
360
361 /*================================ Prototypes =============================== */
362
363 static void freeStringObject(robj *o);
364 static void freeListObject(robj *o);
365 static void freeSetObject(robj *o);
366 static void decrRefCount(void *o);
367 static robj *createObject(int type, void *ptr);
368 static void freeClient(redisClient *c);
369 static int rdbLoad(char *filename);
370 static void addReply(redisClient *c, robj *obj);
371 static void addReplySds(redisClient *c, sds s);
372 static void incrRefCount(robj *o);
373 static int rdbSaveBackground(char *filename);
374 static robj *createStringObject(char *ptr, size_t len);
375 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
376 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
377 static int syncWithMaster(void);
378 static robj *tryObjectSharing(robj *o);
379 static int tryObjectEncoding(robj *o);
380 static robj *getDecodedObject(const robj *o);
381 static int removeExpire(redisDb *db, robj *key);
382 static int expireIfNeeded(redisDb *db, robj *key);
383 static int deleteIfVolatile(redisDb *db, robj *key);
384 static int deleteKey(redisDb *db, robj *key);
385 static time_t getExpire(redisDb *db, robj *key);
386 static int setExpire(redisDb *db, robj *key, time_t when);
387 static void updateSlavesWaitingBgsave(int bgsaveerr);
388 static void freeMemoryIfNeeded(void);
389 static int processCommand(redisClient *c);
390 static void setupSigSegvAction(void);
391 static void rdbRemoveTempFile(pid_t childpid);
392 static size_t stringObjectLen(robj *o);
393 static void processInputBuffer(redisClient *c);
394 static zskiplist *zslCreate(void);
395 static void zslFree(zskiplist *zsl);
396 static void zslInsert(zskiplist *zsl, double score, robj *obj);
397
398 static void authCommand(redisClient *c);
399 static void pingCommand(redisClient *c);
400 static void echoCommand(redisClient *c);
401 static void setCommand(redisClient *c);
402 static void setnxCommand(redisClient *c);
403 static void getCommand(redisClient *c);
404 static void delCommand(redisClient *c);
405 static void existsCommand(redisClient *c);
406 static void incrCommand(redisClient *c);
407 static void decrCommand(redisClient *c);
408 static void incrbyCommand(redisClient *c);
409 static void decrbyCommand(redisClient *c);
410 static void selectCommand(redisClient *c);
411 static void randomkeyCommand(redisClient *c);
412 static void keysCommand(redisClient *c);
413 static void dbsizeCommand(redisClient *c);
414 static void lastsaveCommand(redisClient *c);
415 static void saveCommand(redisClient *c);
416 static void bgsaveCommand(redisClient *c);
417 static void shutdownCommand(redisClient *c);
418 static void moveCommand(redisClient *c);
419 static void renameCommand(redisClient *c);
420 static void renamenxCommand(redisClient *c);
421 static void lpushCommand(redisClient *c);
422 static void rpushCommand(redisClient *c);
423 static void lpopCommand(redisClient *c);
424 static void rpopCommand(redisClient *c);
425 static void llenCommand(redisClient *c);
426 static void lindexCommand(redisClient *c);
427 static void lrangeCommand(redisClient *c);
428 static void ltrimCommand(redisClient *c);
429 static void typeCommand(redisClient *c);
430 static void lsetCommand(redisClient *c);
431 static void saddCommand(redisClient *c);
432 static void sremCommand(redisClient *c);
433 static void smoveCommand(redisClient *c);
434 static void sismemberCommand(redisClient *c);
435 static void scardCommand(redisClient *c);
436 static void spopCommand(redisClient *c);
437 static void srandmemberCommand(redisClient *c);
438 static void sinterCommand(redisClient *c);
439 static void sinterstoreCommand(redisClient *c);
440 static void sunionCommand(redisClient *c);
441 static void sunionstoreCommand(redisClient *c);
442 static void sdiffCommand(redisClient *c);
443 static void sdiffstoreCommand(redisClient *c);
444 static void syncCommand(redisClient *c);
445 static void flushdbCommand(redisClient *c);
446 static void flushallCommand(redisClient *c);
447 static void sortCommand(redisClient *c);
448 static void lremCommand(redisClient *c);
449 static void infoCommand(redisClient *c);
450 static void mgetCommand(redisClient *c);
451 static void monitorCommand(redisClient *c);
452 static void expireCommand(redisClient *c);
453 static void expireatCommand(redisClient *c);
454 static void getsetCommand(redisClient *c);
455 static void ttlCommand(redisClient *c);
456 static void slaveofCommand(redisClient *c);
457 static void debugCommand(redisClient *c);
458 static void msetCommand(redisClient *c);
459 static void msetnxCommand(redisClient *c);
460 static void zaddCommand(redisClient *c);
461 static void zrangeCommand(redisClient *c);
462 static void zrangebyscoreCommand(redisClient *c);
463 static void zrevrangeCommand(redisClient *c);
464 static void zcardCommand(redisClient *c);
465 static void zremCommand(redisClient *c);
466 static void zscoreCommand(redisClient *c);
467 static void zremrangebyscoreCommand(redisClient *c);
468
469 /*================================= Globals ================================= */
470
471 /* Global vars */
472 static struct redisServer server; /* server global state */
473 static struct redisCommand cmdTable[] = {
474 {"get",getCommand,2,REDIS_CMD_INLINE},
475 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
476 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
477 {"del",delCommand,-2,REDIS_CMD_INLINE},
478 {"exists",existsCommand,2,REDIS_CMD_INLINE},
479 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
480 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
481 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
482 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
483 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
484 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
485 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
486 {"llen",llenCommand,2,REDIS_CMD_INLINE},
487 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
488 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
489 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
490 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
491 {"lrem",lremCommand,4,REDIS_CMD_BULK},
492 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
493 {"srem",sremCommand,3,REDIS_CMD_BULK},
494 {"smove",smoveCommand,4,REDIS_CMD_BULK},
495 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
496 {"scard",scardCommand,2,REDIS_CMD_INLINE},
497 {"spop",spopCommand,2,REDIS_CMD_INLINE},
498 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
499 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
500 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
501 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
502 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
503 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
504 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
505 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
506 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
507 {"zrem",zremCommand,3,REDIS_CMD_BULK},
508 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
509 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
510 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
511 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
512 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
513 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
514 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
515 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
516 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
517 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
518 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
519 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
520 {"select",selectCommand,2,REDIS_CMD_INLINE},
521 {"move",moveCommand,3,REDIS_CMD_INLINE},
522 {"rename",renameCommand,3,REDIS_CMD_INLINE},
523 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
524 {"expire",expireCommand,3,REDIS_CMD_INLINE},
525 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
526 {"keys",keysCommand,2,REDIS_CMD_INLINE},
527 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
528 {"auth",authCommand,2,REDIS_CMD_INLINE},
529 {"ping",pingCommand,1,REDIS_CMD_INLINE},
530 {"echo",echoCommand,2,REDIS_CMD_BULK},
531 {"save",saveCommand,1,REDIS_CMD_INLINE},
532 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
533 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
534 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
535 {"type",typeCommand,2,REDIS_CMD_INLINE},
536 {"sync",syncCommand,1,REDIS_CMD_INLINE},
537 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
538 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
539 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
540 {"info",infoCommand,1,REDIS_CMD_INLINE},
541 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
542 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
543 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
544 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
545 {NULL,NULL,0,0}
546 };
547 /*============================ Utility functions ============================ */
548
/* Glob-style pattern matching over length-delimited (binary safe) buffers.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte out of a set; [a-z] ranges, [^...] negation and
 *            backslash escapes are accepted inside the brackets
 *   \x       the byte x taken literally
 *
 * pattern / patternLen: the pattern and its length in bytes.
 * string / stringLen:   the subject and its length in bytes.
 * nocase:               non-zero for case-insensitive comparison.
 *
 * Returns 1 when the whole string matches the whole pattern, 0 otherwise.
 *
 * Neither buffer needs to be NUL terminated: no byte at or beyond
 * pattern[patternLen] or string[stringLen] is ever inspected. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse a run of stars, looking ahead only while the next
             * byte still belongs to the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A character class always consumes one subject byte. */
            if (stringLen <= 0)
                return 0; /* no match */

            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen <= 0) {
                    /* Unterminated class: step back one byte so that the
                     * shared advance after the switch lands exactly on the
                     * end of the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Bytes are compared as unsigned values so that the
                     * result does not depend on the signedness of char and
                     * tolower() only ever receives valid arguments. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* A trailing backslash stands for itself. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The subject is exhausted: only trailing stars may remain. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return (patternLen == 0 && stringLen == 0) ? 1 : 0;
}
671
672 static void redisLog(int level, const char *fmt, ...) {
673 va_list ap;
674 FILE *fp;
675
676 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
677 if (!fp) return;
678
679 va_start(ap, fmt);
680 if (level >= server.verbosity) {
681 char *c = ".-*";
682 char buf[64];
683 time_t now;
684
685 now = time(NULL);
686 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
687 fprintf(fp,"%s %c ",buf,c[level]);
688 vfprintf(fp, fmt, ap);
689 fprintf(fp,"\n");
690 fflush(fp);
691 }
692 va_end(ap);
693
694 if (server.logfile) fclose(fp);
695 }
696
697 /*====================== Hash table type implementation ==================== */
698
699 /* This is a hash table type that uses the SDS dynamic strings library as
700 * keys and redis objects as values (objects can hold SDS strings,
701 * lists, sets). */
702
/* Dictionary destructor that hands the value straight to zfree().
 * 'privdata' is ignored. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
708
709 static int sdsDictKeyCompare(void *privdata, const void *key1,
710 const void *key2)
711 {
712 int l1,l2;
713 DICT_NOTUSED(privdata);
714
715 l1 = sdslen((sds)key1);
716 l2 = sdslen((sds)key2);
717 if (l1 != l2) return 0;
718 return memcmp(key1, key2, l1) == 0;
719 }
720
/* Dictionary destructor for robj keys or values: drops one reference
 * through decrRefCount(). 'privdata' is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
727
728 static int dictObjKeyCompare(void *privdata, const void *key1,
729 const void *key2)
730 {
731 const robj *o1 = key1, *o2 = key2;
732 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
733 }
734
735 static unsigned int dictObjHash(const void *key) {
736 const robj *o = key;
737 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
738 }
739
740 static int dictEncObjKeyCompare(void *privdata, const void *key1,
741 const void *key2)
742 {
743 const robj *o1 = key1, *o2 = key2;
744
745 if (o1->encoding == REDIS_ENCODING_RAW &&
746 o2->encoding == REDIS_ENCODING_RAW)
747 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
748 else {
749 robj *dec1, *dec2;
750 int cmp;
751
752 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
753 getDecodedObject(o1) : (robj*)o1;
754 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
755 getDecodedObject(o2) : (robj*)o2;
756 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
757 if (dec1 != o1) decrRefCount(dec1);
758 if (dec2 != o2) decrRefCount(dec2);
759 return cmp;
760 }
761 }
762
763 static unsigned int dictEncObjHash(const void *key) {
764 const robj *o = key;
765
766 if (o->encoding == REDIS_ENCODING_RAW)
767 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
768 else {
769 robj *dec = getDecodedObject(o);
770 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
771 decrRefCount(dec);
772 return hash;
773 }
774 }
775
776 static dictType setDictType = {
777 dictEncObjHash, /* hash function */
778 NULL, /* key dup */
779 NULL, /* val dup */
780 dictEncObjKeyCompare, /* key compare */
781 dictRedisObjectDestructor, /* key destructor */
782 NULL /* val destructor */
783 };
784
785 static dictType zsetDictType = {
786 dictEncObjHash, /* hash function */
787 NULL, /* key dup */
788 NULL, /* val dup */
789 dictEncObjKeyCompare, /* key compare */
790 dictRedisObjectDestructor, /* key destructor */
791 dictVanillaFree /* val destructor */
792 };
793
794 static dictType hashDictType = {
795 dictObjHash, /* hash function */
796 NULL, /* key dup */
797 NULL, /* val dup */
798 dictObjKeyCompare, /* key compare */
799 dictRedisObjectDestructor, /* key destructor */
800 dictRedisObjectDestructor /* val destructor */
801 };
802
803 /* ========================= Random utility functions ======================= */
804
/* Out-of-memory handler.
 *
 * Redis does not attempt to recover when an allocation fails: even
 * reporting the problem to a client would need heap memory for the send
 * buffers. So the failure is reported on stderr, tagged with 'msg', and the
 * process aborts after a one second pause. This function never returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);

    sleep(1);
    abort();
}
816
817 /* ====================== Redis server networking stuff ===================== */
818 static void closeTimedoutClients(void) {
819 redisClient *c;
820 listNode *ln;
821 time_t now = time(NULL);
822
823 listRewind(server.clients);
824 while ((ln = listYield(server.clients)) != NULL) {
825 c = listNodeValue(ln);
826 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
827 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
828 (now - c->lastinteraction > server.maxidletime)) {
829 redisLog(REDIS_DEBUG,"Closing idle client");
830 freeClient(c);
831 }
832 }
833 }
834
835 static int htNeedsResize(dict *dict) {
836 long long size, used;
837
838 size = dictSlots(dict);
839 used = dictSize(dict);
840 return (size && used && size > DICT_HT_INITIAL_SIZE &&
841 (used*100/size < REDIS_HT_MINFILL));
842 }
843
844 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
845 * we resize the hash table to save memory */
846 static void tryResizeHashTables(void) {
847 int j;
848
849 for (j = 0; j < server.dbnum; j++) {
850 if (htNeedsResize(server.db[j].dict)) {
851 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
852 dictResize(server.db[j].dict);
853 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
854 }
855 if (htNeedsResize(server.db[j].expires))
856 dictResize(server.db[j].expires);
857 }
858 }
859
860 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
861 int j, loops = server.cronloops++;
862 REDIS_NOTUSED(eventLoop);
863 REDIS_NOTUSED(id);
864 REDIS_NOTUSED(clientData);
865
866 /* Update the global state with the amount of used memory */
867 server.usedmemory = zmalloc_used_memory();
868
869 /* Show some info about non-empty databases */
870 for (j = 0; j < server.dbnum; j++) {
871 long long size, used, vkeys;
872
873 size = dictSlots(server.db[j].dict);
874 used = dictSize(server.db[j].dict);
875 vkeys = dictSize(server.db[j].expires);
876 if (!(loops % 5) && (used || vkeys)) {
877 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
878 /* dictPrintStats(server.dict); */
879 }
880 }
881
882 /* We don't want to resize the hash tables while a bacground saving
883 * is in progress: the saving child is created using fork() that is
884 * implemented with a copy-on-write semantic in most modern systems, so
885 * if we resize the HT while there is the saving child at work actually
886 * a lot of memory movements in the parent will cause a lot of pages
887 * copied. */
888 if (!server.bgsaveinprogress) tryResizeHashTables();
889
890 /* Show information about connected clients */
891 if (!(loops % 5)) {
892 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
893 listLength(server.clients)-listLength(server.slaves),
894 listLength(server.slaves),
895 server.usedmemory,
896 dictSize(server.sharingpool));
897 }
898
899 /* Close connections of timedout clients */
900 if (server.maxidletime && !(loops % 10))
901 closeTimedoutClients();
902
903 /* Check if a background saving in progress terminated */
904 if (server.bgsaveinprogress) {
905 int statloc;
906 if (wait4(-1,&statloc,WNOHANG,NULL)) {
907 int exitcode = WEXITSTATUS(statloc);
908 int bysignal = WIFSIGNALED(statloc);
909
910 if (!bysignal && exitcode == 0) {
911 redisLog(REDIS_NOTICE,
912 "Background saving terminated with success");
913 server.dirty = 0;
914 server.lastsave = time(NULL);
915 } else if (!bysignal && exitcode != 0) {
916 redisLog(REDIS_WARNING, "Background saving error");
917 } else {
918 redisLog(REDIS_WARNING,
919 "Background saving terminated by signal");
920 rdbRemoveTempFile(server.bgsavechildpid);
921 }
922 server.bgsaveinprogress = 0;
923 server.bgsavechildpid = -1;
924 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
925 }
926 } else {
927 /* If there is not a background saving in progress check if
928 * we have to save now */
929 time_t now = time(NULL);
930 for (j = 0; j < server.saveparamslen; j++) {
931 struct saveparam *sp = server.saveparams+j;
932
933 if (server.dirty >= sp->changes &&
934 now-server.lastsave > sp->seconds) {
935 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
936 sp->changes, sp->seconds);
937 rdbSaveBackground(server.dbfilename);
938 break;
939 }
940 }
941 }
942
943 /* Try to expire a few timed out keys */
944 for (j = 0; j < server.dbnum; j++) {
945 redisDb *db = server.db+j;
946 int num = dictSize(db->expires);
947
948 if (num) {
949 time_t now = time(NULL);
950
951 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
952 num = REDIS_EXPIRELOOKUPS_PER_CRON;
953 while (num--) {
954 dictEntry *de;
955 time_t t;
956
957 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
958 t = (time_t) dictGetEntryVal(de);
959 if (now > t) {
960 deleteKey(db,dictGetEntryKey(de));
961 }
962 }
963 }
964 }
965
966 /* Check if we should connect to a MASTER */
967 if (server.replstate == REDIS_REPL_CONNECT) {
968 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
969 if (syncWithMaster() == REDIS_OK) {
970 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
971 }
972 }
973 return 1000;
974 }
975
976 static void createSharedObjects(void) {
977 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
978 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
979 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
980 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
981 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
982 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
983 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
984 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
985 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
986 /* no such key */
987 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
988 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
989 "-ERR Operation against a key holding the wrong kind of value\r\n"));
990 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
991 "-ERR no such key\r\n"));
992 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
993 "-ERR syntax error\r\n"));
994 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
995 "-ERR source and destination objects are the same\r\n"));
996 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
997 "-ERR index out of range\r\n"));
998 shared.space = createObject(REDIS_STRING,sdsnew(" "));
999 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1000 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1001 shared.select0 = createStringObject("select 0\r\n",10);
1002 shared.select1 = createStringObject("select 1\r\n",10);
1003 shared.select2 = createStringObject("select 2\r\n",10);
1004 shared.select3 = createStringObject("select 3\r\n",10);
1005 shared.select4 = createStringObject("select 4\r\n",10);
1006 shared.select5 = createStringObject("select 5\r\n",10);
1007 shared.select6 = createStringObject("select 6\r\n",10);
1008 shared.select7 = createStringObject("select 7\r\n",10);
1009 shared.select8 = createStringObject("select 8\r\n",10);
1010 shared.select9 = createStringObject("select 9\r\n",10);
1011 }
1012
1013 static void appendServerSaveParams(time_t seconds, int changes) {
1014 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1015 server.saveparams[server.saveparamslen].seconds = seconds;
1016 server.saveparams[server.saveparamslen].changes = changes;
1017 server.saveparamslen++;
1018 }
1019
1020 static void ResetServerSaveParams() {
1021 zfree(server.saveparams);
1022 server.saveparams = NULL;
1023 server.saveparamslen = 0;
1024 }
1025
1026 static void initServerConfig() {
1027 server.dbnum = REDIS_DEFAULT_DBNUM;
1028 server.port = REDIS_SERVERPORT;
1029 server.verbosity = REDIS_DEBUG;
1030 server.maxidletime = REDIS_MAXIDLETIME;
1031 server.saveparams = NULL;
1032 server.logfile = NULL; /* NULL = log on standard output */
1033 server.bindaddr = NULL;
1034 server.glueoutputbuf = 1;
1035 server.daemonize = 0;
1036 server.appendonly = 0;
1037 server.appendfsync = APPENDFSYNC_ALWAYS;
1038 server.lastfsync = time(NULL);
1039 server.appendfd = -1;
1040 server.appendseldb = -1; /* Make sure the first time will not match */
1041 server.pidfile = "/var/run/redis.pid";
1042 server.dbfilename = "dump.rdb";
1043 server.appendfilename = "appendonly.log";
1044 server.requirepass = NULL;
1045 server.shareobjects = 0;
1046 server.sharingpoolsize = 1024;
1047 server.maxclients = 0;
1048 server.maxmemory = 0;
1049 ResetServerSaveParams();
1050
1051 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1052 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1053 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1054 /* Replication related */
1055 server.isslave = 0;
1056 server.masterauth = NULL;
1057 server.masterhost = NULL;
1058 server.masterport = 6379;
1059 server.master = NULL;
1060 server.replstate = REDIS_REPL_NONE;
1061
1062 /* Double constants initialization */
1063 R_Zero = 0.0;
1064 R_PosInf = 1.0/R_Zero;
1065 R_NegInf = -1.0/R_Zero;
1066 R_Nan = R_Zero/R_Zero;
1067 }
1068
1069 static void initServer() {
1070 int j;
1071
1072 signal(SIGHUP, SIG_IGN);
1073 signal(SIGPIPE, SIG_IGN);
1074 setupSigSegvAction();
1075
1076 server.clients = listCreate();
1077 server.slaves = listCreate();
1078 server.monitors = listCreate();
1079 server.objfreelist = listCreate();
1080 createSharedObjects();
1081 server.el = aeCreateEventLoop();
1082 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1083 server.sharingpool = dictCreate(&setDictType,NULL);
1084 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1085 if (server.fd == -1) {
1086 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1087 exit(1);
1088 }
1089 for (j = 0; j < server.dbnum; j++) {
1090 server.db[j].dict = dictCreate(&hashDictType,NULL);
1091 server.db[j].expires = dictCreate(&setDictType,NULL);
1092 server.db[j].id = j;
1093 }
1094 server.cronloops = 0;
1095 server.bgsaveinprogress = 0;
1096 server.bgsavechildpid = -1;
1097 server.lastsave = time(NULL);
1098 server.dirty = 0;
1099 server.usedmemory = 0;
1100 server.stat_numcommands = 0;
1101 server.stat_numconnections = 0;
1102 server.stat_starttime = time(NULL);
1103 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1104
1105 if (server.appendonly) {
1106 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1107 if (server.appendfd == -1) {
1108 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1109 strerror(errno));
1110 exit(1);
1111 }
1112 }
1113 }
1114
1115 /* Empty the whole database */
1116 static long long emptyDb() {
1117 int j;
1118 long long removed = 0;
1119
1120 for (j = 0; j < server.dbnum; j++) {
1121 removed += dictSize(server.db[j].dict);
1122 dictEmpty(server.db[j].dict);
1123 dictEmpty(server.db[j].expires);
1124 }
1125 return removed;
1126 }
1127
/* Convert a case-insensitive "yes" / "no" string into 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1133
1134 /* I agree, this is a very rudimental way to load a configuration...
1135 will improve later if the config gets more complex */
1136 static void loadServerConfig(char *filename) {
1137 FILE *fp;
1138 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1139 int linenum = 0;
1140 sds line = NULL;
1141
1142 if (filename[0] == '-' && filename[1] == '\0')
1143 fp = stdin;
1144 else {
1145 if ((fp = fopen(filename,"r")) == NULL) {
1146 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1147 exit(1);
1148 }
1149 }
1150
1151 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1152 sds *argv;
1153 int argc, j;
1154
1155 linenum++;
1156 line = sdsnew(buf);
1157 line = sdstrim(line," \t\r\n");
1158
1159 /* Skip comments and blank lines*/
1160 if (line[0] == '#' || line[0] == '\0') {
1161 sdsfree(line);
1162 continue;
1163 }
1164
1165 /* Split into arguments */
1166 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1167 sdstolower(argv[0]);
1168
1169 /* Execute config directives */
1170 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1171 server.maxidletime = atoi(argv[1]);
1172 if (server.maxidletime < 0) {
1173 err = "Invalid timeout value"; goto loaderr;
1174 }
1175 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1176 server.port = atoi(argv[1]);
1177 if (server.port < 1 || server.port > 65535) {
1178 err = "Invalid port"; goto loaderr;
1179 }
1180 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1181 server.bindaddr = zstrdup(argv[1]);
1182 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1183 int seconds = atoi(argv[1]);
1184 int changes = atoi(argv[2]);
1185 if (seconds < 1 || changes < 0) {
1186 err = "Invalid save parameters"; goto loaderr;
1187 }
1188 appendServerSaveParams(seconds,changes);
1189 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1190 if (chdir(argv[1]) == -1) {
1191 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1192 argv[1], strerror(errno));
1193 exit(1);
1194 }
1195 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1196 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1197 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1198 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1199 else {
1200 err = "Invalid log level. Must be one of debug, notice, warning";
1201 goto loaderr;
1202 }
1203 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1204 FILE *logfp;
1205
1206 server.logfile = zstrdup(argv[1]);
1207 if (!strcasecmp(server.logfile,"stdout")) {
1208 zfree(server.logfile);
1209 server.logfile = NULL;
1210 }
1211 if (server.logfile) {
1212 /* Test if we are able to open the file. The server will not
1213 * be able to abort just for this problem later... */
1214 logfp = fopen(server.logfile,"a");
1215 if (logfp == NULL) {
1216 err = sdscatprintf(sdsempty(),
1217 "Can't open the log file: %s", strerror(errno));
1218 goto loaderr;
1219 }
1220 fclose(logfp);
1221 }
1222 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1223 server.dbnum = atoi(argv[1]);
1224 if (server.dbnum < 1) {
1225 err = "Invalid number of databases"; goto loaderr;
1226 }
1227 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1228 server.maxclients = atoi(argv[1]);
1229 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1230 server.maxmemory = strtoll(argv[1], NULL, 10);
1231 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1232 server.masterhost = sdsnew(argv[1]);
1233 server.masterport = atoi(argv[2]);
1234 server.replstate = REDIS_REPL_CONNECT;
1235 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1236 server.masterauth = zstrdup(argv[1]);
1237 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1238 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1239 err = "argument must be 'yes' or 'no'"; goto loaderr;
1240 }
1241 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1242 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1243 err = "argument must be 'yes' or 'no'"; goto loaderr;
1244 }
1245 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1246 server.sharingpoolsize = atoi(argv[1]);
1247 if (server.sharingpoolsize < 1) {
1248 err = "invalid object sharing pool size"; goto loaderr;
1249 }
1250 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1251 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1252 err = "argument must be 'yes' or 'no'"; goto loaderr;
1253 }
1254 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1255 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1256 err = "argument must be 'yes' or 'no'"; goto loaderr;
1257 }
1258 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1259 if (!strcasecmp(argv[1],"no")) {
1260 server.appendfsync = APPENDFSYNC_NO;
1261 } else if (!strcasecmp(argv[1],"always")) {
1262 server.appendfsync = APPENDFSYNC_ALWAYS;
1263 } else if (!strcasecmp(argv[1],"everysec")) {
1264 server.appendfsync = APPENDFSYNC_EVERYSEC;
1265 } else {
1266 err = "argument must be 'no', 'always' or 'everysec'";
1267 goto loaderr;
1268 }
1269 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1270 server.requirepass = zstrdup(argv[1]);
1271 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1272 server.pidfile = zstrdup(argv[1]);
1273 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1274 server.dbfilename = zstrdup(argv[1]);
1275 } else {
1276 err = "Bad directive or wrong number of arguments"; goto loaderr;
1277 }
1278 for (j = 0; j < argc; j++)
1279 sdsfree(argv[j]);
1280 zfree(argv);
1281 sdsfree(line);
1282 }
1283 if (fp != stdin) fclose(fp);
1284 return;
1285
1286 loaderr:
1287 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1288 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1289 fprintf(stderr, ">>> '%s'\n", line);
1290 fprintf(stderr, "%s\n", err);
1291 exit(1);
1292 }
1293
1294 static void freeClientArgv(redisClient *c) {
1295 int j;
1296
1297 for (j = 0; j < c->argc; j++)
1298 decrRefCount(c->argv[j]);
1299 for (j = 0; j < c->mbargc; j++)
1300 decrRefCount(c->mbargv[j]);
1301 c->argc = 0;
1302 c->mbargc = 0;
1303 }
1304
1305 static void freeClient(redisClient *c) {
1306 listNode *ln;
1307
1308 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1309 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1310 sdsfree(c->querybuf);
1311 listRelease(c->reply);
1312 freeClientArgv(c);
1313 close(c->fd);
1314 ln = listSearchKey(server.clients,c);
1315 assert(ln != NULL);
1316 listDelNode(server.clients,ln);
1317 if (c->flags & REDIS_SLAVE) {
1318 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1319 close(c->repldbfd);
1320 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1321 ln = listSearchKey(l,c);
1322 assert(ln != NULL);
1323 listDelNode(l,ln);
1324 }
1325 if (c->flags & REDIS_MASTER) {
1326 server.master = NULL;
1327 server.replstate = REDIS_REPL_CONNECT;
1328 }
1329 zfree(c->argv);
1330 zfree(c->mbargv);
1331 zfree(c);
1332 }
1333
1334 static void glueReplyBuffersIfNeeded(redisClient *c) {
1335 int totlen = 0;
1336 listNode *ln;
1337 robj *o;
1338
1339 listRewind(c->reply);
1340 while((ln = listYield(c->reply))) {
1341 o = ln->value;
1342 totlen += sdslen(o->ptr);
1343 /* This optimization makes more sense if we don't have to copy
1344 * too much data */
1345 if (totlen > 1024) return;
1346 }
1347 if (totlen > 0) {
1348 char buf[1024];
1349 int copylen = 0;
1350
1351 listRewind(c->reply);
1352 while((ln = listYield(c->reply))) {
1353 o = ln->value;
1354 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1355 copylen += sdslen(o->ptr);
1356 listDelNode(c->reply,ln);
1357 }
1358 /* Now the output buffer is empty, add the new single element */
1359 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1360 listAddNodeTail(c->reply,o);
1361 }
1362 }
1363
1364 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1365 redisClient *c = privdata;
1366 int nwritten = 0, totwritten = 0, objlen;
1367 robj *o;
1368 REDIS_NOTUSED(el);
1369 REDIS_NOTUSED(mask);
1370
1371 if (server.glueoutputbuf && listLength(c->reply) > 1)
1372 glueReplyBuffersIfNeeded(c);
1373 while(listLength(c->reply)) {
1374 o = listNodeValue(listFirst(c->reply));
1375 objlen = sdslen(o->ptr);
1376
1377 if (objlen == 0) {
1378 listDelNode(c->reply,listFirst(c->reply));
1379 continue;
1380 }
1381
1382 if (c->flags & REDIS_MASTER) {
1383 /* Don't reply to a master */
1384 nwritten = objlen - c->sentlen;
1385 } else {
1386 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1387 if (nwritten <= 0) break;
1388 }
1389 c->sentlen += nwritten;
1390 totwritten += nwritten;
1391 /* If we fully sent the object on head go to the next one */
1392 if (c->sentlen == objlen) {
1393 listDelNode(c->reply,listFirst(c->reply));
1394 c->sentlen = 0;
1395 }
1396 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1397 * bytes, in a single threaded server it's a good idea to server
1398 * other clients as well, even if a very large request comes from
1399 * super fast link that is always able to accept data (in real world
1400 * terms think to 'KEYS *' against the loopback interfae) */
1401 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1402 }
1403 if (nwritten == -1) {
1404 if (errno == EAGAIN) {
1405 nwritten = 0;
1406 } else {
1407 redisLog(REDIS_DEBUG,
1408 "Error writing to client: %s", strerror(errno));
1409 freeClient(c);
1410 return;
1411 }
1412 }
1413 if (totwritten > 0) c->lastinteraction = time(NULL);
1414 if (listLength(c->reply) == 0) {
1415 c->sentlen = 0;
1416 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1417 }
1418 }
1419
1420 static struct redisCommand *lookupCommand(char *name) {
1421 int j = 0;
1422 while(cmdTable[j].name != NULL) {
1423 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1424 j++;
1425 }
1426 return NULL;
1427 }
1428
1429 /* resetClient prepare the client to process the next command */
1430 static void resetClient(redisClient *c) {
1431 freeClientArgv(c);
1432 c->bulklen = -1;
1433 c->multibulk = 0;
1434 }
1435
1436 /* If this function gets called we already read a whole
1437 * command, argments are in the client argv/argc fields.
1438 * processCommand() execute the command or prepare the
1439 * server for a bulk read from the client.
1440 *
1441 * If 1 is returned the client is still alive and valid and
1442 * and other operations can be performed by the caller. Otherwise
1443 * if 0 is returned the client was destroied (i.e. after QUIT). */
1444 static int processCommand(redisClient *c) {
1445 struct redisCommand *cmd;
1446 long long dirty;
1447
1448 /* Free some memory if needed (maxmemory setting) */
1449 if (server.maxmemory) freeMemoryIfNeeded();
1450
1451 /* Handle the multi bulk command type. This is an alternative protocol
1452 * supported by Redis in order to receive commands that are composed of
1453 * multiple binary-safe "bulk" arguments. The latency of processing is
1454 * a bit higher but this allows things like multi-sets, so if this
1455 * protocol is used only for MSET and similar commands this is a big win. */
1456 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1457 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1458 if (c->multibulk <= 0) {
1459 resetClient(c);
1460 return 1;
1461 } else {
1462 decrRefCount(c->argv[c->argc-1]);
1463 c->argc--;
1464 return 1;
1465 }
1466 } else if (c->multibulk) {
1467 if (c->bulklen == -1) {
1468 if (((char*)c->argv[0]->ptr)[0] != '$') {
1469 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1470 resetClient(c);
1471 return 1;
1472 } else {
1473 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1474 decrRefCount(c->argv[0]);
1475 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1476 c->argc--;
1477 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1478 resetClient(c);
1479 return 1;
1480 }
1481 c->argc--;
1482 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1483 return 1;
1484 }
1485 } else {
1486 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1487 c->mbargv[c->mbargc] = c->argv[0];
1488 c->mbargc++;
1489 c->argc--;
1490 c->multibulk--;
1491 if (c->multibulk == 0) {
1492 robj **auxargv;
1493 int auxargc;
1494
1495 /* Here we need to swap the multi-bulk argc/argv with the
1496 * normal argc/argv of the client structure. */
1497 auxargv = c->argv;
1498 c->argv = c->mbargv;
1499 c->mbargv = auxargv;
1500
1501 auxargc = c->argc;
1502 c->argc = c->mbargc;
1503 c->mbargc = auxargc;
1504
1505 /* We need to set bulklen to something different than -1
1506 * in order for the code below to process the command without
1507 * to try to read the last argument of a bulk command as
1508 * a special argument. */
1509 c->bulklen = 0;
1510 /* continue below and process the command */
1511 } else {
1512 c->bulklen = -1;
1513 return 1;
1514 }
1515 }
1516 }
1517 /* -- end of multi bulk commands processing -- */
1518
1519 /* The QUIT command is handled as a special case. Normal command
1520 * procs are unable to close the client connection safely */
1521 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1522 freeClient(c);
1523 return 0;
1524 }
1525 cmd = lookupCommand(c->argv[0]->ptr);
1526 if (!cmd) {
1527 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1528 resetClient(c);
1529 return 1;
1530 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1531 (c->argc < -cmd->arity)) {
1532 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1533 resetClient(c);
1534 return 1;
1535 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1536 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1537 resetClient(c);
1538 return 1;
1539 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1540 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1541
1542 decrRefCount(c->argv[c->argc-1]);
1543 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1544 c->argc--;
1545 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1546 resetClient(c);
1547 return 1;
1548 }
1549 c->argc--;
1550 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1551 /* It is possible that the bulk read is already in the
1552 * buffer. Check this condition and handle it accordingly.
1553 * This is just a fast path, alternative to call processInputBuffer().
1554 * It's a good idea since the code is small and this condition
1555 * happens most of the times. */
1556 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1557 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1558 c->argc++;
1559 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1560 } else {
1561 return 1;
1562 }
1563 }
1564 /* Let's try to share objects on the command arguments vector */
1565 if (server.shareobjects) {
1566 int j;
1567 for(j = 1; j < c->argc; j++)
1568 c->argv[j] = tryObjectSharing(c->argv[j]);
1569 }
1570 /* Let's try to encode the bulk object to save space. */
1571 if (cmd->flags & REDIS_CMD_BULK)
1572 tryObjectEncoding(c->argv[c->argc-1]);
1573
1574 /* Check if the user is authenticated */
1575 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1576 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1577 resetClient(c);
1578 return 1;
1579 }
1580
1581 /* Exec the command */
1582 dirty = server.dirty;
1583 cmd->proc(c);
1584 if (server.appendonly && server.dirty-dirty)
1585 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1586 if (server.dirty-dirty && listLength(server.slaves))
1587 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1588 if (listLength(server.monitors))
1589 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1590 server.stat_numcommands++;
1591
1592 /* Prepare the client for the next command */
1593 if (c->flags & REDIS_CLOSE) {
1594 freeClient(c);
1595 return 0;
1596 }
1597 resetClient(c);
1598 return 1;
1599 }
1600
1601 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1602 listNode *ln;
1603 int outc = 0, j;
1604 robj **outv;
1605 /* (args*2)+1 is enough room for args, spaces, newlines */
1606 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1607
1608 if (argc <= REDIS_STATIC_ARGS) {
1609 outv = static_outv;
1610 } else {
1611 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1612 }
1613
1614 for (j = 0; j < argc; j++) {
1615 if (j != 0) outv[outc++] = shared.space;
1616 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1617 robj *lenobj;
1618
1619 lenobj = createObject(REDIS_STRING,
1620 sdscatprintf(sdsempty(),"%d\r\n",
1621 stringObjectLen(argv[j])));
1622 lenobj->refcount = 0;
1623 outv[outc++] = lenobj;
1624 }
1625 outv[outc++] = argv[j];
1626 }
1627 outv[outc++] = shared.crlf;
1628
1629 /* Increment all the refcounts at start and decrement at end in order to
1630 * be sure to free objects if there is no slave in a replication state
1631 * able to be feed with commands */
1632 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1633 listRewind(slaves);
1634 while((ln = listYield(slaves))) {
1635 redisClient *slave = ln->value;
1636
1637 /* Don't feed slaves that are still waiting for BGSAVE to start */
1638 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1639
1640 /* Feed all the other slaves, MONITORs and so on */
1641 if (slave->slaveseldb != dictid) {
1642 robj *selectcmd;
1643
1644 switch(dictid) {
1645 case 0: selectcmd = shared.select0; break;
1646 case 1: selectcmd = shared.select1; break;
1647 case 2: selectcmd = shared.select2; break;
1648 case 3: selectcmd = shared.select3; break;
1649 case 4: selectcmd = shared.select4; break;
1650 case 5: selectcmd = shared.select5; break;
1651 case 6: selectcmd = shared.select6; break;
1652 case 7: selectcmd = shared.select7; break;
1653 case 8: selectcmd = shared.select8; break;
1654 case 9: selectcmd = shared.select9; break;
1655 default:
1656 selectcmd = createObject(REDIS_STRING,
1657 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1658 selectcmd->refcount = 0;
1659 break;
1660 }
1661 addReply(slave,selectcmd);
1662 slave->slaveseldb = dictid;
1663 }
1664 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1665 }
1666 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1667 if (outv != static_outv) zfree(outv);
1668 }
1669
1670 static void processInputBuffer(redisClient *c) {
1671 again:
1672 if (c->bulklen == -1) {
1673 /* Read the first line of the query */
1674 char *p = strchr(c->querybuf,'\n');
1675 size_t querylen;
1676
1677 if (p) {
1678 sds query, *argv;
1679 int argc, j;
1680
1681 query = c->querybuf;
1682 c->querybuf = sdsempty();
1683 querylen = 1+(p-(query));
1684 if (sdslen(query) > querylen) {
1685 /* leave data after the first line of the query in the buffer */
1686 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1687 }
1688 *p = '\0'; /* remove "\n" */
1689 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1690 sdsupdatelen(query);
1691
1692 /* Now we can split the query in arguments */
1693 if (sdslen(query) == 0) {
1694 /* Ignore empty query */
1695 sdsfree(query);
1696 return;
1697 }
1698 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1699 sdsfree(query);
1700
1701 if (c->argv) zfree(c->argv);
1702 c->argv = zmalloc(sizeof(robj*)*argc);
1703
1704 for (j = 0; j < argc; j++) {
1705 if (sdslen(argv[j])) {
1706 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1707 c->argc++;
1708 } else {
1709 sdsfree(argv[j]);
1710 }
1711 }
1712 zfree(argv);
1713 /* Execute the command. If the client is still valid
1714 * after processCommand() return and there is something
1715 * on the query buffer try to process the next command. */
1716 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1717 return;
1718 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1719 redisLog(REDIS_DEBUG, "Client protocol error");
1720 freeClient(c);
1721 return;
1722 }
1723 } else {
1724 /* Bulk read handling. Note that if we are at this point
1725 the client already sent a command terminated with a newline,
1726 we are reading the bulk data that is actually the last
1727 argument of the command. */
1728 int qbl = sdslen(c->querybuf);
1729
1730 if (c->bulklen <= qbl) {
1731 /* Copy everything but the final CRLF as final argument */
1732 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1733 c->argc++;
1734 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1735 /* Process the command. If the client is still valid after
1736 * the processing and there is more data in the buffer
1737 * try to parse it. */
1738 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1739 return;
1740 }
1741 }
1742 }
1743
1744 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1745 redisClient *c = (redisClient*) privdata;
1746 char buf[REDIS_IOBUF_LEN];
1747 int nread;
1748 REDIS_NOTUSED(el);
1749 REDIS_NOTUSED(mask);
1750
1751 nread = read(fd, buf, REDIS_IOBUF_LEN);
1752 if (nread == -1) {
1753 if (errno == EAGAIN) {
1754 nread = 0;
1755 } else {
1756 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1757 freeClient(c);
1758 return;
1759 }
1760 } else if (nread == 0) {
1761 redisLog(REDIS_DEBUG, "Client closed connection");
1762 freeClient(c);
1763 return;
1764 }
1765 if (nread) {
1766 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1767 c->lastinteraction = time(NULL);
1768 } else {
1769 return;
1770 }
1771 processInputBuffer(c);
1772 }
1773
1774 static int selectDb(redisClient *c, int id) {
1775 if (id < 0 || id >= server.dbnum)
1776 return REDIS_ERR;
1777 c->db = &server.db[id];
1778 return REDIS_OK;
1779 }
1780
1781 static void *dupClientReplyValue(void *o) {
1782 incrRefCount((robj*)o);
1783 return 0;
1784 }
1785
1786 static redisClient *createClient(int fd) {
1787 redisClient *c = zmalloc(sizeof(*c));
1788
1789 anetNonBlock(NULL,fd);
1790 anetTcpNoDelay(NULL,fd);
1791 if (!c) return NULL;
1792 selectDb(c,0);
1793 c->fd = fd;
1794 c->querybuf = sdsempty();
1795 c->argc = 0;
1796 c->argv = NULL;
1797 c->bulklen = -1;
1798 c->multibulk = 0;
1799 c->mbargc = 0;
1800 c->mbargv = NULL;
1801 c->sentlen = 0;
1802 c->flags = 0;
1803 c->lastinteraction = time(NULL);
1804 c->authenticated = 0;
1805 c->replstate = REDIS_REPL_NONE;
1806 c->reply = listCreate();
1807 listSetFreeMethod(c->reply,decrRefCount);
1808 listSetDupMethod(c->reply,dupClientReplyValue);
1809 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1810 readQueryFromClient, c, NULL) == AE_ERR) {
1811 freeClient(c);
1812 return NULL;
1813 }
1814 listAddNodeTail(server.clients,c);
1815 return c;
1816 }
1817
1818 static void addReply(redisClient *c, robj *obj) {
1819 if (listLength(c->reply) == 0 &&
1820 (c->replstate == REDIS_REPL_NONE ||
1821 c->replstate == REDIS_REPL_ONLINE) &&
1822 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1823 sendReplyToClient, c, NULL) == AE_ERR) return;
1824 if (obj->encoding != REDIS_ENCODING_RAW) {
1825 obj = getDecodedObject(obj);
1826 } else {
1827 incrRefCount(obj);
1828 }
1829 listAddNodeTail(c->reply,obj);
1830 }
1831
1832 static void addReplySds(redisClient *c, sds s) {
1833 robj *o = createObject(REDIS_STRING,s);
1834 addReply(c,o);
1835 decrRefCount(o);
1836 }
1837
1838 static void addReplyBulkLen(redisClient *c, robj *obj) {
1839 size_t len;
1840
1841 if (obj->encoding == REDIS_ENCODING_RAW) {
1842 len = sdslen(obj->ptr);
1843 } else {
1844 long n = (long)obj->ptr;
1845
1846 len = 1;
1847 if (n < 0) {
1848 len++;
1849 n = -n;
1850 }
1851 while((n = n/10) != 0) {
1852 len++;
1853 }
1854 }
1855 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1856 }
1857
1858 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1859 int cport, cfd;
1860 char cip[128];
1861 redisClient *c;
1862 REDIS_NOTUSED(el);
1863 REDIS_NOTUSED(mask);
1864 REDIS_NOTUSED(privdata);
1865
1866 cfd = anetAccept(server.neterr, fd, cip, &cport);
1867 if (cfd == AE_ERR) {
1868 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1869 return;
1870 }
1871 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1872 if ((c = createClient(cfd)) == NULL) {
1873 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1874 close(cfd); /* May be already closed, just ingore errors */
1875 return;
1876 }
1877 /* If maxclient directive is set and this is one client more... close the
1878 * connection. Note that we create the client instead to check before
1879 * for this condition, since now the socket is already set in nonblocking
1880 * mode and we can send an error for free using the Kernel I/O */
1881 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1882 char *err = "-ERR max number of clients reached\r\n";
1883
1884 /* That's a best effort error message, don't check write errors */
1885 (void) write(c->fd,err,strlen(err));
1886 freeClient(c);
1887 return;
1888 }
1889 server.stat_numconnections++;
1890 }
1891
1892 /* ======================= Redis objects implementation ===================== */
1893
1894 static robj *createObject(int type, void *ptr) {
1895 robj *o;
1896
1897 if (listLength(server.objfreelist)) {
1898 listNode *head = listFirst(server.objfreelist);
1899 o = listNodeValue(head);
1900 listDelNode(server.objfreelist,head);
1901 } else {
1902 o = zmalloc(sizeof(*o));
1903 }
1904 o->type = type;
1905 o->encoding = REDIS_ENCODING_RAW;
1906 o->ptr = ptr;
1907 o->refcount = 1;
1908 return o;
1909 }
1910
1911 static robj *createStringObject(char *ptr, size_t len) {
1912 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1913 }
1914
1915 static robj *createListObject(void) {
1916 list *l = listCreate();
1917
1918 listSetFreeMethod(l,decrRefCount);
1919 return createObject(REDIS_LIST,l);
1920 }
1921
1922 static robj *createSetObject(void) {
1923 dict *d = dictCreate(&setDictType,NULL);
1924 return createObject(REDIS_SET,d);
1925 }
1926
1927 static robj *createZsetObject(void) {
1928 zset *zs = zmalloc(sizeof(*zs));
1929
1930 zs->dict = dictCreate(&zsetDictType,NULL);
1931 zs->zsl = zslCreate();
1932 return createObject(REDIS_ZSET,zs);
1933 }
1934
1935 static void freeStringObject(robj *o) {
1936 if (o->encoding == REDIS_ENCODING_RAW) {
1937 sdsfree(o->ptr);
1938 }
1939 }
1940
1941 static void freeListObject(robj *o) {
1942 listRelease((list*) o->ptr);
1943 }
1944
1945 static void freeSetObject(robj *o) {
1946 dictRelease((dict*) o->ptr);
1947 }
1948
1949 static void freeZsetObject(robj *o) {
1950 zset *zs = o->ptr;
1951
1952 dictRelease(zs->dict);
1953 zslFree(zs->zsl);
1954 zfree(zs);
1955 }
1956
1957 static void freeHashObject(robj *o) {
1958 dictRelease((dict*) o->ptr);
1959 }
1960
1961 static void incrRefCount(robj *o) {
1962 o->refcount++;
1963 #ifdef DEBUG_REFCOUNT
1964 if (o->type == REDIS_STRING)
1965 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1966 #endif
1967 }
1968
1969 static void decrRefCount(void *obj) {
1970 robj *o = obj;
1971
1972 #ifdef DEBUG_REFCOUNT
1973 if (o->type == REDIS_STRING)
1974 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1975 #endif
1976 if (--(o->refcount) == 0) {
1977 switch(o->type) {
1978 case REDIS_STRING: freeStringObject(o); break;
1979 case REDIS_LIST: freeListObject(o); break;
1980 case REDIS_SET: freeSetObject(o); break;
1981 case REDIS_ZSET: freeZsetObject(o); break;
1982 case REDIS_HASH: freeHashObject(o); break;
1983 default: assert(0 != 0); break;
1984 }
1985 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1986 !listAddNodeHead(server.objfreelist,o))
1987 zfree(o);
1988 }
1989 }
1990
1991 static robj *lookupKey(redisDb *db, robj *key) {
1992 dictEntry *de = dictFind(db->dict,key);
1993 return de ? dictGetEntryVal(de) : NULL;
1994 }
1995
1996 static robj *lookupKeyRead(redisDb *db, robj *key) {
1997 expireIfNeeded(db,key);
1998 return lookupKey(db,key);
1999 }
2000
2001 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2002 deleteIfVolatile(db,key);
2003 return lookupKey(db,key);
2004 }
2005
2006 static int deleteKey(redisDb *db, robj *key) {
2007 int retval;
2008
2009 /* We need to protect key from destruction: after the first dictDelete()
2010 * it may happen that 'key' is no longer valid if we don't increment
2011 * it's count. This may happen when we get the object reference directly
2012 * from the hash table with dictRandomKey() or dict iterators */
2013 incrRefCount(key);
2014 if (dictSize(db->expires)) dictDelete(db->expires,key);
2015 retval = dictDelete(db->dict,key);
2016 decrRefCount(key);
2017
2018 return retval == DICT_OK;
2019 }
2020
2021 /* Try to share an object against the shared objects pool */
2022 static robj *tryObjectSharing(robj *o) {
2023 struct dictEntry *de;
2024 unsigned long c;
2025
2026 if (o == NULL || server.shareobjects == 0) return o;
2027
2028 assert(o->type == REDIS_STRING);
2029 de = dictFind(server.sharingpool,o);
2030 if (de) {
2031 robj *shared = dictGetEntryKey(de);
2032
2033 c = ((unsigned long) dictGetEntryVal(de))+1;
2034 dictGetEntryVal(de) = (void*) c;
2035 incrRefCount(shared);
2036 decrRefCount(o);
2037 return shared;
2038 } else {
2039 /* Here we are using a stream algorihtm: Every time an object is
2040 * shared we increment its count, everytime there is a miss we
2041 * recrement the counter of a random object. If this object reaches
2042 * zero we remove the object and put the current object instead. */
2043 if (dictSize(server.sharingpool) >=
2044 server.sharingpoolsize) {
2045 de = dictGetRandomKey(server.sharingpool);
2046 assert(de != NULL);
2047 c = ((unsigned long) dictGetEntryVal(de))-1;
2048 dictGetEntryVal(de) = (void*) c;
2049 if (c == 0) {
2050 dictDelete(server.sharingpool,de->key);
2051 }
2052 } else {
2053 c = 0; /* If the pool is empty we want to add this object */
2054 }
2055 if (c == 0) {
2056 int retval;
2057
2058 retval = dictAdd(server.sharingpool,o,(void*)1);
2059 assert(retval == DICT_OK);
2060 incrRefCount(o);
2061 }
2062 return o;
2063 }
2064 }
2065
2066 /* Check if the nul-terminated string 's' can be represented by a long
2067 * (that is, is a number that fits into long without any other space or
2068 * character before or after the digits).
2069 *
2070 * If so, the function returns REDIS_OK and *longval is set to the value
2071 * of the number. Otherwise REDIS_ERR is returned */
2072 static int isStringRepresentableAsLong(sds s, long *longval) {
2073 char buf[32], *endptr;
2074 long value;
2075 int slen;
2076
2077 value = strtol(s, &endptr, 10);
2078 if (endptr[0] != '\0') return REDIS_ERR;
2079 slen = snprintf(buf,32,"%ld",value);
2080
2081 /* If the number converted back into a string is not identical
2082 * then it's not possible to encode the string as integer */
2083 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2084 if (longval) *longval = value;
2085 return REDIS_OK;
2086 }
2087
2088 /* Try to encode a string object in order to save space */
2089 static int tryObjectEncoding(robj *o) {
2090 long value;
2091 sds s = o->ptr;
2092
2093 if (o->encoding != REDIS_ENCODING_RAW)
2094 return REDIS_ERR; /* Already encoded */
2095
2096 /* It's not save to encode shared objects: shared objects can be shared
2097 * everywhere in the "object space" of Redis. Encoded objects can only
2098 * appear as "values" (and not, for instance, as keys) */
2099 if (o->refcount > 1) return REDIS_ERR;
2100
2101 /* Currently we try to encode only strings */
2102 assert(o->type == REDIS_STRING);
2103
2104 /* Check if we can represent this string as a long integer */
2105 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2106
2107 /* Ok, this object can be encoded */
2108 o->encoding = REDIS_ENCODING_INT;
2109 sdsfree(o->ptr);
2110 o->ptr = (void*) value;
2111 return REDIS_OK;
2112 }
2113
2114 /* Get a decoded version of an encoded object (returned as a new object) */
2115 static robj *getDecodedObject(const robj *o) {
2116 robj *dec;
2117
2118 assert(o->encoding != REDIS_ENCODING_RAW);
2119 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2120 char buf[32];
2121
2122 snprintf(buf,32,"%ld",(long)o->ptr);
2123 dec = createStringObject(buf,strlen(buf));
2124 return dec;
2125 } else {
2126 assert(1 != 1);
2127 }
2128 }
2129
2130 /* Compare two string objects via strcmp() or alike.
2131 * Note that the objects may be integer-encoded. In such a case we
2132 * use snprintf() to get a string representation of the numbers on the stack
2133 * and compare the strings, it's much faster than calling getDecodedObject(). */
2134 static int compareStringObjects(robj *a, robj *b) {
2135 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2136 char bufa[128], bufb[128], *astr, *bstr;
2137 int bothsds = 1;
2138
2139 if (a == b) return 0;
2140 if (a->encoding != REDIS_ENCODING_RAW) {
2141 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2142 astr = bufa;
2143 bothsds = 0;
2144 } else {
2145 astr = a->ptr;
2146 }
2147 if (b->encoding != REDIS_ENCODING_RAW) {
2148 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2149 bstr = bufb;
2150 bothsds = 0;
2151 } else {
2152 bstr = b->ptr;
2153 }
2154 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2155 }
2156
2157 static size_t stringObjectLen(robj *o) {
2158 assert(o->type == REDIS_STRING);
2159 if (o->encoding == REDIS_ENCODING_RAW) {
2160 return sdslen(o->ptr);
2161 } else {
2162 char buf[32];
2163
2164 return snprintf(buf,32,"%ld",(long)o->ptr);
2165 }
2166 }
2167
2168 /*============================ DB saving/loading ============================ */
2169
/* Write the single byte 'type' to 'fp'. Return 0 on success, -1 if the
 * byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2174
/* Write the time 't' to 'fp' as a 32 bit signed integer, in the byte
 * order of the host. Return 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2180
2181 /* check rdbLoadLen() comments for more info */
2182 static int rdbSaveLen(FILE *fp, uint32_t len) {
2183 unsigned char buf[2];
2184
2185 if (len < (1<<6)) {
2186 /* Save a 6 bit len */
2187 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2188 if (fwrite(buf,1,1,fp) == 0) return -1;
2189 } else if (len < (1<<14)) {
2190 /* Save a 14 bit len */
2191 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2192 buf[1] = len&0xFF;
2193 if (fwrite(buf,2,1,fp) == 0) return -1;
2194 } else {
2195 /* Save a 32 bit len */
2196 buf[0] = (REDIS_RDB_32BITLEN<<6);
2197 if (fwrite(buf,1,1,fp) == 0) return -1;
2198 len = htonl(len);
2199 if (fwrite(&len,4,1,fp) == 0) return -1;
2200 }
2201 return 0;
2202 }
2203
2204 /* String objects in the form "2391" "-100" without any space and with a
2205 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2206 * encoded as integers to save space */
2207 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2208 long long value;
2209 char *endptr, buf[32];
2210
2211 /* Check if it's possible to encode this value as a number */
2212 value = strtoll(s, &endptr, 10);
2213 if (endptr[0] != '\0') return 0;
2214 snprintf(buf,32,"%lld",value);
2215
2216 /* If the number converted back into a string is not identical
2217 * then it's not possible to encode the string as integer */
2218 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2219
2220 /* Finally check if it fits in our ranges */
2221 if (value >= -(1<<7) && value <= (1<<7)-1) {
2222 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2223 enc[1] = value&0xFF;
2224 return 2;
2225 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2226 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2227 enc[1] = value&0xFF;
2228 enc[2] = (value>>8)&0xFF;
2229 return 3;
2230 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2231 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2232 enc[1] = value&0xFF;
2233 enc[2] = (value>>8)&0xFF;
2234 enc[3] = (value>>16)&0xFF;
2235 enc[4] = (value>>24)&0xFF;
2236 return 5;
2237 } else {
2238 return 0;
2239 }
2240 }
2241
2242 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2243 unsigned int comprlen, outlen;
2244 unsigned char byte;
2245 void *out;
2246
2247 /* We require at least four bytes compression for this to be worth it */
2248 outlen = sdslen(obj->ptr)-4;
2249 if (outlen <= 0) return 0;
2250 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2251 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2252 if (comprlen == 0) {
2253 zfree(out);
2254 return 0;
2255 }
2256 /* Data compressed! Let's save it on disk */
2257 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2258 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2259 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2260 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2261 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2262 zfree(out);
2263 return comprlen;
2264
2265 writeerr:
2266 zfree(out);
2267 return -1;
2268 }
2269
2270 /* Save a string objet as [len][data] on disk. If the object is a string
2271 * representation of an integer value we try to safe it in a special form */
2272 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2273 size_t len;
2274 int enclen;
2275
2276 len = sdslen(obj->ptr);
2277
2278 /* Try integer encoding */
2279 if (len <= 11) {
2280 unsigned char buf[5];
2281 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2282 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2283 return 0;
2284 }
2285 }
2286
2287 /* Try LZF compression - under 20 bytes it's unable to compress even
2288 * aaaaaaaaaaaaaaaaaa so skip it */
2289 if (len > 20) {
2290 int retval;
2291
2292 retval = rdbSaveLzfStringObject(fp,obj);
2293 if (retval == -1) return -1;
2294 if (retval > 0) return 0;
2295 /* retval == 0 means data can't be compressed, save the old way */
2296 }
2297
2298 /* Store verbatim */
2299 if (rdbSaveLen(fp,len) == -1) return -1;
2300 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2301 return 0;
2302 }
2303
2304 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2305 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2306 int retval;
2307 robj *dec;
2308
2309 if (obj->encoding != REDIS_ENCODING_RAW) {
2310 dec = getDecodedObject(obj);
2311 retval = rdbSaveStringObjectRaw(fp,dec);
2312 decrRefCount(dec);
2313 return retval;
2314 } else {
2315 return rdbSaveStringObjectRaw(fp,obj);
2316 }
2317 }
2318
/* Save a double value. Doubles are saved as strings prefixed by an
 * unsigned 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Finite values are printed with "%.17g". Return 0 on success, -1 on
 * write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        /* The representation starts at buf+1: measuring from buf would
         * include the still uninitialized length byte itself and store a
         * wrong (or garbage dependent) length. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2345
2346 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2347 static int rdbSave(char *filename) {
2348 dictIterator *di = NULL;
2349 dictEntry *de;
2350 FILE *fp;
2351 char tmpfile[256];
2352 int j;
2353 time_t now = time(NULL);
2354
2355 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2356 fp = fopen(tmpfile,"w");
2357 if (!fp) {
2358 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2359 return REDIS_ERR;
2360 }
2361 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2362 for (j = 0; j < server.dbnum; j++) {
2363 redisDb *db = server.db+j;
2364 dict *d = db->dict;
2365 if (dictSize(d) == 0) continue;
2366 di = dictGetIterator(d);
2367 if (!di) {
2368 fclose(fp);
2369 return REDIS_ERR;
2370 }
2371
2372 /* Write the SELECT DB opcode */
2373 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2374 if (rdbSaveLen(fp,j) == -1) goto werr;
2375
2376 /* Iterate this DB writing every entry */
2377 while((de = dictNext(di)) != NULL) {
2378 robj *key = dictGetEntryKey(de);
2379 robj *o = dictGetEntryVal(de);
2380 time_t expiretime = getExpire(db,key);
2381
2382 /* Save the expire time */
2383 if (expiretime != -1) {
2384 /* If this key is already expired skip it */
2385 if (expiretime < now) continue;
2386 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2387 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2388 }
2389 /* Save the key and associated value */
2390 if (rdbSaveType(fp,o->type) == -1) goto werr;
2391 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2392 if (o->type == REDIS_STRING) {
2393 /* Save a string value */
2394 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2395 } else if (o->type == REDIS_LIST) {
2396 /* Save a list value */
2397 list *list = o->ptr;
2398 listNode *ln;
2399
2400 listRewind(list);
2401 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2402 while((ln = listYield(list))) {
2403 robj *eleobj = listNodeValue(ln);
2404
2405 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2406 }
2407 } else if (o->type == REDIS_SET) {
2408 /* Save a set value */
2409 dict *set = o->ptr;
2410 dictIterator *di = dictGetIterator(set);
2411 dictEntry *de;
2412
2413 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2414 while((de = dictNext(di)) != NULL) {
2415 robj *eleobj = dictGetEntryKey(de);
2416
2417 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2418 }
2419 dictReleaseIterator(di);
2420 } else if (o->type == REDIS_ZSET) {
2421 /* Save a set value */
2422 zset *zs = o->ptr;
2423 dictIterator *di = dictGetIterator(zs->dict);
2424 dictEntry *de;
2425
2426 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2427 while((de = dictNext(di)) != NULL) {
2428 robj *eleobj = dictGetEntryKey(de);
2429 double *score = dictGetEntryVal(de);
2430
2431 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2432 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2433 }
2434 dictReleaseIterator(di);
2435 } else {
2436 assert(0 != 0);
2437 }
2438 }
2439 dictReleaseIterator(di);
2440 }
2441 /* EOF opcode */
2442 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2443
2444 /* Make sure data will not remain on the OS's output buffers */
2445 fflush(fp);
2446 fsync(fileno(fp));
2447 fclose(fp);
2448
2449 /* Use RENAME to make sure the DB file is changed atomically only
2450 * if the generate DB file is ok. */
2451 if (rename(tmpfile,filename) == -1) {
2452 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2453 unlink(tmpfile);
2454 return REDIS_ERR;
2455 }
2456 redisLog(REDIS_NOTICE,"DB saved on disk");
2457 server.dirty = 0;
2458 server.lastsave = time(NULL);
2459 return REDIS_OK;
2460
2461 werr:
2462 fclose(fp);
2463 unlink(tmpfile);
2464 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2465 if (di) dictReleaseIterator(di);
2466 return REDIS_ERR;
2467 }
2468
2469 static int rdbSaveBackground(char *filename) {
2470 pid_t childpid;
2471
2472 if (server.bgsaveinprogress) return REDIS_ERR;
2473 if ((childpid = fork()) == 0) {
2474 /* Child */
2475 close(server.fd);
2476 if (rdbSave(filename) == REDIS_OK) {
2477 exit(0);
2478 } else {
2479 exit(1);
2480 }
2481 } else {
2482 /* Parent */
2483 if (childpid == -1) {
2484 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2485 strerror(errno));
2486 return REDIS_ERR;
2487 }
2488 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2489 server.bgsaveinprogress = 1;
2490 server.bgsavechildpid = childpid;
2491 return REDIS_OK;
2492 }
2493 return REDIS_OK; /* unreached */
2494 }
2495
/* Remove the temp file "temp-<childpid>.rdb" from the current directory.
 * The result of unlink() is not checked. */
static void rdbRemoveTempFile(pid_t childpid) {
    char tmpfile[256];
    int pid = (int) childpid;

    snprintf(tmpfile,sizeof(tmpfile),"temp-%d.rdb", pid);
    unlink(tmpfile);
}
2502
/* Read one byte from 'fp' and return it as a value in the 0-255 range,
 * or -1 if the byte could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;
    size_t got = fread(&type,1,1,fp);

    if (got == 0) return -1;
    return type;
}
2508
/* Read a 32 bit signed integer stored in host byte order from 'fp' and
 * return it as a time_t. Return -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) == 0) {
        return -1;
    }
    return (time_t) stamp;
}
2514
2515 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2516 * of this file for a description of how this are stored on disk.
2517 *
2518 * isencoded is set to 1 if the readed length is not actually a length but
2519 * an "encoding type", check the above comments for more info */
2520 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2521 unsigned char buf[2];
2522 uint32_t len;
2523
2524 if (isencoded) *isencoded = 0;
2525 if (rdbver == 0) {
2526 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2527 return ntohl(len);
2528 } else {
2529 int type;
2530
2531 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2532 type = (buf[0]&0xC0)>>6;
2533 if (type == REDIS_RDB_6BITLEN) {
2534 /* Read a 6 bit len */
2535 return buf[0]&0x3F;
2536 } else if (type == REDIS_RDB_ENCVAL) {
2537 /* Read a 6 bit len encoding type */
2538 if (isencoded) *isencoded = 1;
2539 return buf[0]&0x3F;
2540 } else if (type == REDIS_RDB_14BITLEN) {
2541 /* Read a 14 bit len */
2542 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2543 return ((buf[0]&0x3F)<<8)|buf[1];
2544 } else {
2545 /* Read a 32 bit len */
2546 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2547 return ntohl(len);
2548 }
2549 }
2550 }
2551
2552 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2553 unsigned char enc[4];
2554 long long val;
2555
2556 if (enctype == REDIS_RDB_ENC_INT8) {
2557 if (fread(enc,1,1,fp) == 0) return NULL;
2558 val = (signed char)enc[0];
2559 } else if (enctype == REDIS_RDB_ENC_INT16) {
2560 uint16_t v;
2561 if (fread(enc,2,1,fp) == 0) return NULL;
2562 v = enc[0]|(enc[1]<<8);
2563 val = (int16_t)v;
2564 } else if (enctype == REDIS_RDB_ENC_INT32) {
2565 uint32_t v;
2566 if (fread(enc,4,1,fp) == 0) return NULL;
2567 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2568 val = (int32_t)v;
2569 } else {
2570 val = 0; /* anti-warning */
2571 assert(0!=0);
2572 }
2573 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2574 }
2575
2576 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2577 unsigned int len, clen;
2578 unsigned char *c = NULL;
2579 sds val = NULL;
2580
2581 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2582 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2583 if ((c = zmalloc(clen)) == NULL) goto err;
2584 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2585 if (fread(c,clen,1,fp) == 0) goto err;
2586 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2587 zfree(c);
2588 return createObject(REDIS_STRING,val);
2589 err:
2590 zfree(c);
2591 sdsfree(val);
2592 return NULL;
2593 }
2594
2595 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2596 int isencoded;
2597 uint32_t len;
2598 sds val;
2599
2600 len = rdbLoadLen(fp,rdbver,&isencoded);
2601 if (isencoded) {
2602 switch(len) {
2603 case REDIS_RDB_ENC_INT8:
2604 case REDIS_RDB_ENC_INT16:
2605 case REDIS_RDB_ENC_INT32:
2606 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2607 case REDIS_RDB_ENC_LZF:
2608 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2609 default:
2610 assert(0!=0);
2611 }
2612 }
2613
2614 if (len == REDIS_RDB_LENERR) return NULL;
2615 val = sdsnewlen(NULL,len);
2616 if (len && fread(val,len,1,fp) == 0) {
2617 sdsfree(val);
2618 return NULL;
2619 }
2620 return tryObjectSharing(createObject(REDIS_STRING,val));
2621 }
2622
2623 /* For information about double serialization check rdbSaveDoubleValue() */
2624 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2625 char buf[128];
2626 unsigned char len;
2627
2628 if (fread(&len,1,1,fp) == 0) return -1;
2629 switch(len) {
2630 case 255: *val = R_NegInf; return 0;
2631 case 254: *val = R_PosInf; return 0;
2632 case 253: *val = R_Nan; return 0;
2633 default:
2634 if (fread(buf,len,1,fp) == 0) return -1;
2635 sscanf(buf, "%lg", val);
2636 return 0;
2637 }
2638 }
2639
2640 static int rdbLoad(char *filename) {
2641 FILE *fp;
2642 robj *keyobj = NULL;
2643 uint32_t dbid;
2644 int type, retval, rdbver;
2645 dict *d = server.db[0].dict;
2646 redisDb *db = server.db+0;
2647 char buf[1024];
2648 time_t expiretime = -1, now = time(NULL);
2649
2650 fp = fopen(filename,"r");
2651 if (!fp) return REDIS_ERR;
2652 if (fread(buf,9,1,fp) == 0) goto eoferr;
2653 buf[9] = '\0';
2654 if (memcmp(buf,"REDIS",5) != 0) {
2655 fclose(fp);
2656 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2657 return REDIS_ERR;
2658 }
2659 rdbver = atoi(buf+5);
2660 if (rdbver > 1) {
2661 fclose(fp);
2662 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2663 return REDIS_ERR;
2664 }
2665 while(1) {
2666 robj *o;
2667
2668 /* Read type. */
2669 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2670 if (type == REDIS_EXPIRETIME) {
2671 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2672 /* We read the time so we need to read the object type again */
2673 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2674 }
2675 if (type == REDIS_EOF) break;
2676 /* Handle SELECT DB opcode as a special case */
2677 if (type == REDIS_SELECTDB) {
2678 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2679 goto eoferr;
2680 if (dbid >= (unsigned)server.dbnum) {
2681 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2682 exit(1);
2683 }
2684 db = server.db+dbid;
2685 d = db->dict;
2686 continue;
2687 }
2688 /* Read key */
2689 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2690
2691 if (type == REDIS_STRING) {
2692 /* Read string value */
2693 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2694 tryObjectEncoding(o);
2695 } else if (type == REDIS_LIST || type == REDIS_SET) {
2696 /* Read list/set value */
2697 uint32_t listlen;
2698
2699 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2700 goto eoferr;
2701 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2702 /* Load every single element of the list/set */
2703 while(listlen--) {
2704 robj *ele;
2705
2706 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2707 tryObjectEncoding(ele);
2708 if (type == REDIS_LIST) {
2709 listAddNodeTail((list*)o->ptr,ele);
2710 } else {
2711 dictAdd((dict*)o->ptr,ele,NULL);
2712 }
2713 }
2714 } else if (type == REDIS_ZSET) {
2715 /* Read list/set value */
2716 uint32_t zsetlen;
2717 zset *zs;
2718
2719 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2720 goto eoferr;
2721 o = createZsetObject();
2722 zs = o->ptr;
2723 /* Load every single element of the list/set */
2724 while(zsetlen--) {
2725 robj *ele;
2726 double *score = zmalloc(sizeof(double));
2727
2728 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2729 tryObjectEncoding(ele);
2730 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2731 dictAdd(zs->dict,ele,score);
2732 zslInsert(zs->zsl,*score,ele);
2733 incrRefCount(ele); /* added to skiplist */
2734 }
2735 } else {
2736 assert(0 != 0);
2737 }
2738 /* Add the new object in the hash table */
2739 retval = dictAdd(d,keyobj,o);
2740 if (retval == DICT_ERR) {
2741 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2742 exit(1);
2743 }
2744 /* Set the expire time if needed */
2745 if (expiretime != -1) {
2746 setExpire(db,keyobj,expiretime);
2747 /* Delete this key if already expired */
2748 if (expiretime < now) deleteKey(db,keyobj);
2749 expiretime = -1;
2750 }
2751 keyobj = o = NULL;
2752 }
2753 fclose(fp);
2754 return REDIS_OK;
2755
2756 eoferr: /* unexpected end of file is handled here with a fatal exit */
2757 if (keyobj) decrRefCount(keyobj);
2758 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2759 exit(1);
2760 return REDIS_ERR; /* Just to avoid warning */
2761 }
2762
2763 /*================================== Commands =============================== */
2764
2765 static void authCommand(redisClient *c) {
2766 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2767 c->authenticated = 1;
2768 addReply(c,shared.ok);
2769 } else {
2770 c->authenticated = 0;
2771 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2772 }
2773 }
2774
2775 static void pingCommand(redisClient *c) {
2776 addReply(c,shared.pong);
2777 }
2778
2779 static void echoCommand(redisClient *c) {
2780 addReplyBulkLen(c,c->argv[1]);
2781 addReply(c,c->argv[1]);
2782 addReply(c,shared.crlf);
2783 }
2784
2785 /*=================================== Strings =============================== */
2786
2787 static void setGenericCommand(redisClient *c, int nx) {
2788 int retval;
2789
2790 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2791 if (retval == DICT_ERR) {
2792 if (!nx) {
2793 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2794 incrRefCount(c->argv[2]);
2795 } else {
2796 addReply(c,shared.czero);
2797 return;
2798 }
2799 } else {
2800 incrRefCount(c->argv[1]);
2801 incrRefCount(c->argv[2]);
2802 }
2803 server.dirty++;
2804 removeExpire(c->db,c->argv[1]);
2805 addReply(c, nx ? shared.cone : shared.ok);
2806 }
2807
2808 static void setCommand(redisClient *c) {
2809 setGenericCommand(c,0);
2810 }
2811
2812 static void setnxCommand(redisClient *c) {
2813 setGenericCommand(c,1);
2814 }
2815
2816 static void getCommand(redisClient *c) {
2817 robj *o = lookupKeyRead(c->db,c->argv[1]);
2818
2819 if (o == NULL) {
2820 addReply(c,shared.nullbulk);
2821 } else {
2822 if (o->type != REDIS_STRING) {
2823 addReply(c,shared.wrongtypeerr);
2824 } else {
2825 addReplyBulkLen(c,o);
2826 addReply(c,o);
2827 addReply(c,shared.crlf);
2828 }
2829 }
2830 }
2831
2832 static void getsetCommand(redisClient *c) {
2833 getCommand(c);
2834 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2835 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2836 } else {
2837 incrRefCount(c->argv[1]);
2838 }
2839 incrRefCount(c->argv[2]);
2840 server.dirty++;
2841 removeExpire(c->db,c->argv[1]);
2842 }
2843
2844 static void mgetCommand(redisClient *c) {
2845 int j;
2846
2847 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2848 for (j = 1; j < c->argc; j++) {
2849 robj *o = lookupKeyRead(c->db,c->argv[j]);
2850 if (o == NULL) {
2851 addReply(c,shared.nullbulk);
2852 } else {
2853 if (o->type != REDIS_STRING) {
2854 addReply(c,shared.nullbulk);
2855 } else {
2856 addReplyBulkLen(c,o);
2857 addReply(c,o);
2858 addReply(c,shared.crlf);
2859 }
2860 }
2861 }
2862 }
2863
2864 static void incrDecrCommand(redisClient *c, long long incr) {
2865 long long value;
2866 int retval;
2867 robj *o;
2868
2869 o = lookupKeyWrite(c->db,c->argv[1]);
2870 if (o == NULL) {
2871 value = 0;
2872 } else {
2873 if (o->type != REDIS_STRING) {
2874 value = 0;
2875 } else {
2876 char *eptr;
2877
2878 if (o->encoding == REDIS_ENCODING_RAW)
2879 value = strtoll(o->ptr, &eptr, 10);
2880 else if (o->encoding == REDIS_ENCODING_INT)
2881 value = (long)o->ptr;
2882 else
2883 assert(1 != 1);
2884 }
2885 }
2886
2887 value += incr;
2888 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2889 tryObjectEncoding(o);
2890 retval = dictAdd(c->db->dict,c->argv[1],o);
2891 if (retval == DICT_ERR) {
2892 dictReplace(c->db->dict,c->argv[1],o);
2893 removeExpire(c->db,c->argv[1]);
2894 } else {
2895 incrRefCount(c->argv[1]);
2896 }
2897 server.dirty++;
2898 addReply(c,shared.colon);
2899 addReply(c,o);
2900 addReply(c,shared.crlf);
2901 }
2902
2903 static void incrCommand(redisClient *c) {
2904 incrDecrCommand(c,1);
2905 }
2906
2907 static void decrCommand(redisClient *c) {
2908 incrDecrCommand(c,-1);
2909 }
2910
2911 static void incrbyCommand(redisClient *c) {
2912 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2913 incrDecrCommand(c,incr);
2914 }
2915
2916 static void decrbyCommand(redisClient *c) {
2917 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2918 incrDecrCommand(c,-incr);
2919 }
2920
2921 /* ========================= Type agnostic commands ========================= */
2922
2923 static void delCommand(redisClient *c) {
2924 int deleted = 0, j;
2925
2926 for (j = 1; j < c->argc; j++) {
2927 if (deleteKey(c->db,c->argv[j])) {
2928 server.dirty++;
2929 deleted++;
2930 }
2931 }
2932 switch(deleted) {
2933 case 0:
2934 addReply(c,shared.czero);
2935 break;
2936 case 1:
2937 addReply(c,shared.cone);
2938 break;
2939 default:
2940 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2941 break;
2942 }
2943 }
2944
2945 static void existsCommand(redisClient *c) {
2946 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2947 }
2948
2949 static void selectCommand(redisClient *c) {
2950 int id = atoi(c->argv[1]->ptr);
2951
2952 if (selectDb(c,id) == REDIS_ERR) {
2953 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2954 } else {
2955 addReply(c,shared.ok);
2956 }
2957 }
2958
2959 static void randomkeyCommand(redisClient *c) {
2960 dictEntry *de;
2961
2962 while(1) {
2963 de = dictGetRandomKey(c->db->dict);
2964 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2965 }
2966 if (de == NULL) {
2967 addReply(c,shared.plus);
2968 addReply(c,shared.crlf);
2969 } else {
2970 addReply(c,shared.plus);
2971 addReply(c,dictGetEntryKey(de));
2972 addReply(c,shared.crlf);
2973 }
2974 }
2975
2976 static void keysCommand(redisClient *c) {
2977 dictIterator *di;
2978 dictEntry *de;
2979 sds pattern = c->argv[1]->ptr;
2980 int plen = sdslen(pattern);
2981 int numkeys = 0, keyslen = 0;
2982 robj *lenobj = createObject(REDIS_STRING,NULL);
2983
2984 di = dictGetIterator(c->db->dict);
2985 addReply(c,lenobj);
2986 decrRefCount(lenobj);
2987 while((de = dictNext(di)) != NULL) {
2988 robj *keyobj = dictGetEntryKey(de);
2989
2990 sds key = keyobj->ptr;
2991 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2992 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2993 if (expireIfNeeded(c->db,keyobj) == 0) {
2994 if (numkeys != 0)
2995 addReply(c,shared.space);
2996 addReply(c,keyobj);
2997 numkeys++;
2998 keyslen += sdslen(key);
2999 }
3000 }
3001 }
3002 dictReleaseIterator(di);
3003 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3004 addReply(c,shared.crlf);
3005 }
3006
3007 static void dbsizeCommand(redisClient *c) {
3008 addReplySds(c,
3009 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3010 }
3011
3012 static void lastsaveCommand(redisClient *c) {
3013 addReplySds(c,
3014 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3015 }
3016
3017 static void typeCommand(redisClient *c) {
3018 robj *o;
3019 char *type;
3020
3021 o = lookupKeyRead(c->db,c->argv[1]);
3022 if (o == NULL) {
3023 type = "+none";
3024 } else {
3025 switch(o->type) {
3026 case REDIS_STRING: type = "+string"; break;
3027 case REDIS_LIST: type = "+list"; break;
3028 case REDIS_SET: type = "+set"; break;
3029 case REDIS_ZSET: type = "+zset"; break;
3030 default: type = "unknown"; break;
3031 }
3032 }
3033 addReplySds(c,sdsnew(type));
3034 addReply(c,shared.crlf);
3035 }
3036
3037 static void saveCommand(redisClient *c) {
3038 if (server.bgsaveinprogress) {
3039 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3040 return;
3041 }
3042 if (rdbSave(server.dbfilename) == REDIS_OK) {
3043 addReply(c,shared.ok);
3044 } else {
3045 addReply(c,shared.err);
3046 }
3047 }
3048
3049 static void bgsaveCommand(redisClient *c) {
3050 if (server.bgsaveinprogress) {
3051 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3052 return;
3053 }
3054 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3055 addReply(c,shared.ok);
3056 } else {
3057 addReply(c,shared.err);
3058 }
3059 }
3060
3061 static void shutdownCommand(redisClient *c) {
3062 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3063 /* Kill the saving child if there is a background saving in progress.
3064 We want to avoid race conditions, for instance our saving child may
3065 overwrite the synchronous saving did by SHUTDOWN. */
3066 if (server.bgsaveinprogress) {
3067 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3068 kill(server.bgsavechildpid,SIGKILL);
3069 rdbRemoveTempFile(server.bgsavechildpid);
3070 }
3071 /* SYNC SAVE */
3072 if (rdbSave(server.dbfilename) == REDIS_OK) {
3073 if (server.daemonize)
3074 unlink(server.pidfile);
3075 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3076 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3077 exit(1);
3078 } else {
3079 /* Ooops.. error saving! The best we can do is to continue operating.
3080 * Note that if there was a background saving process, in the next
3081 * cron() Redis will be notified that the background saving aborted,
3082 * handling special stuff like slaves pending for synchronization... */
3083 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3084 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3085 }
3086 }
3087
3088 static void renameGenericCommand(redisClient *c, int nx) {
3089 robj *o;
3090
3091 /* To use the same key as src and dst is probably an error */
3092 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3093 addReply(c,shared.sameobjecterr);
3094 return;
3095 }
3096
3097 o = lookupKeyWrite(c->db,c->argv[1]);
3098 if (o == NULL) {
3099 addReply(c,shared.nokeyerr);
3100 return;
3101 }
3102 incrRefCount(o);
3103 deleteIfVolatile(c->db,c->argv[2]);
3104 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3105 if (nx) {
3106 decrRefCount(o);
3107 addReply(c,shared.czero);
3108 return;
3109 }
3110 dictReplace(c->db->dict,c->argv[2],o);
3111 } else {
3112 incrRefCount(c->argv[2]);
3113 }
3114 deleteKey(c->db,c->argv[1]);
3115 server.dirty++;
3116 addReply(c,nx ? shared.cone : shared.ok);
3117 }
3118
3119 static void renameCommand(redisClient *c) {
3120 renameGenericCommand(c,0);
3121 }
3122
3123 static void renamenxCommand(redisClient *c) {
3124 renameGenericCommand(c,1);
3125 }
3126
3127 static void moveCommand(redisClient *c) {
3128 robj *o;
3129 redisDb *src, *dst;
3130 int srcid;
3131
3132 /* Obtain source and target DB pointers */
3133 src = c->db;
3134 srcid = c->db->id;
3135 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3136 addReply(c,shared.outofrangeerr);
3137 return;
3138 }
3139 dst = c->db;
3140 selectDb(c,srcid); /* Back to the source DB */
3141
3142 /* If the user is moving using as target the same
3143 * DB as the source DB it is probably an error. */
3144 if (src == dst) {
3145 addReply(c,shared.sameobjecterr);
3146 return;
3147 }
3148
3149 /* Check if the element exists and get a reference */
3150 o = lookupKeyWrite(c->db,c->argv[1]);
3151 if (!o) {
3152 addReply(c,shared.czero);
3153 return;
3154 }
3155
3156 /* Try to add the element to the target DB */
3157 deleteIfVolatile(dst,c->argv[1]);
3158 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3159 addReply(c,shared.czero);
3160 return;
3161 }
3162 incrRefCount(c->argv[1]);
3163 incrRefCount(o);
3164
3165 /* OK! key moved, free the entry in the source DB */
3166 deleteKey(src,c->argv[1]);
3167 server.dirty++;
3168 addReply(c,shared.cone);
3169 }
3170
3171 /* =================================== Lists ================================ */
3172 static void pushGenericCommand(redisClient *c, int where) {
3173 robj *lobj;
3174 list *list;
3175
3176 lobj = lookupKeyWrite(c->db,c->argv[1]);
3177 if (lobj == NULL) {
3178 lobj = createListObject();
3179 list = lobj->ptr;
3180 if (where == REDIS_HEAD) {
3181 listAddNodeHead(list,c->argv[2]);
3182 } else {
3183 listAddNodeTail(list,c->argv[2]);
3184 }
3185 dictAdd(c->db->dict,c->argv[1],lobj);
3186 incrRefCount(c->argv[1]);
3187 incrRefCount(c->argv[2]);
3188 } else {
3189 if (lobj->type != REDIS_LIST) {
3190 addReply(c,shared.wrongtypeerr);
3191 return;
3192 }
3193 list = lobj->ptr;
3194 if (where == REDIS_HEAD) {
3195 listAddNodeHead(list,c->argv[2]);
3196 } else {
3197 listAddNodeTail(list,c->argv[2]);
3198 }
3199 incrRefCount(c->argv[2]);
3200 }
3201 server.dirty++;
3202 addReply(c,shared.ok);
3203 }
3204
3205 static void lpushCommand(redisClient *c) {
3206 pushGenericCommand(c,REDIS_HEAD);
3207 }
3208
3209 static void rpushCommand(redisClient *c) {
3210 pushGenericCommand(c,REDIS_TAIL);
3211 }
3212
3213 static void llenCommand(redisClient *c) {
3214 robj *o;
3215 list *l;
3216
3217 o = lookupKeyRead(c->db,c->argv[1]);
3218 if (o == NULL) {
3219 addReply(c,shared.czero);
3220 return;
3221 } else {
3222 if (o->type != REDIS_LIST) {
3223 addReply(c,shared.wrongtypeerr);
3224 } else {
3225 l = o->ptr;
3226 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3227 }
3228 }
3229 }
3230
3231 static void lindexCommand(redisClient *c) {
3232 robj *o;
3233 int index = atoi(c->argv[2]->ptr);
3234
3235 o = lookupKeyRead(c->db,c->argv[1]);
3236 if (o == NULL) {
3237 addReply(c,shared.nullbulk);
3238 } else {
3239 if (o->type != REDIS_LIST) {
3240 addReply(c,shared.wrongtypeerr);
3241 } else {
3242 list *list = o->ptr;
3243 listNode *ln;
3244
3245 ln = listIndex(list, index);
3246 if (ln == NULL) {
3247 addReply(c,shared.nullbulk);
3248 } else {
3249 robj *ele = listNodeValue(ln);
3250 addReplyBulkLen(c,ele);
3251 addReply(c,ele);
3252 addReply(c,shared.crlf);
3253 }
3254 }
3255 }
3256 }
3257
3258 static void lsetCommand(redisClient *c) {
3259 robj *o;
3260 int index = atoi(c->argv[2]->ptr);
3261
3262 o = lookupKeyWrite(c->db,c->argv[1]);
3263 if (o == NULL) {
3264 addReply(c,shared.nokeyerr);
3265 } else {
3266 if (o->type != REDIS_LIST) {
3267 addReply(c,shared.wrongtypeerr);
3268 } else {
3269 list *list = o->ptr;
3270 listNode *ln;
3271
3272 ln = listIndex(list, index);
3273 if (ln == NULL) {
3274 addReply(c,shared.outofrangeerr);
3275 } else {
3276 robj *ele = listNodeValue(ln);
3277
3278 decrRefCount(ele);
3279 listNodeValue(ln) = c->argv[3];
3280 incrRefCount(c->argv[3]);
3281 addReply(c,shared.ok);
3282 server.dirty++;
3283 }
3284 }
3285 }
3286 }
3287
3288 static void popGenericCommand(redisClient *c, int where) {
3289 robj *o;
3290
3291 o = lookupKeyWrite(c->db,c->argv[1]);
3292 if (o == NULL) {
3293 addReply(c,shared.nullbulk);
3294 } else {
3295 if (o->type != REDIS_LIST) {
3296 addReply(c,shared.wrongtypeerr);
3297 } else {
3298 list *list = o->ptr;
3299 listNode *ln;
3300
3301 if (where == REDIS_HEAD)
3302 ln = listFirst(list);
3303 else
3304 ln = listLast(list);
3305
3306 if (ln == NULL) {
3307 addReply(c,shared.nullbulk);
3308 } else {
3309 robj *ele = listNodeValue(ln);
3310 addReplyBulkLen(c,ele);
3311 addReply(c,ele);
3312 addReply(c,shared.crlf);
3313 listDelNode(list,ln);
3314 server.dirty++;
3315 }
3316 }
3317 }
3318 }
3319
3320 static void lpopCommand(redisClient *c) {
3321 popGenericCommand(c,REDIS_HEAD);
3322 }
3323
3324 static void rpopCommand(redisClient *c) {
3325 popGenericCommand(c,REDIS_TAIL);
3326 }
3327
3328 static void lrangeCommand(redisClient *c) {
3329 robj *o;
3330 int start = atoi(c->argv[2]->ptr);
3331 int end = atoi(c->argv[3]->ptr);
3332
3333 o = lookupKeyRead(c->db,c->argv[1]);
3334 if (o == NULL) {
3335 addReply(c,shared.nullmultibulk);
3336 } else {
3337 if (o->type != REDIS_LIST) {
3338 addReply(c,shared.wrongtypeerr);
3339 } else {
3340 list *list = o->ptr;
3341 listNode *ln;
3342 int llen = listLength(list);
3343 int rangelen, j;
3344 robj *ele;
3345
3346 /* convert negative indexes */
3347 if (start < 0) start = llen+start;
3348 if (end < 0) end = llen+end;
3349 if (start < 0) start = 0;
3350 if (end < 0) end = 0;
3351
3352 /* indexes sanity checks */
3353 if (start > end || start >= llen) {
3354 /* Out of range start or start > end result in empty list */
3355 addReply(c,shared.emptymultibulk);
3356 return;
3357 }
3358 if (end >= llen) end = llen-1;
3359 rangelen = (end-start)+1;
3360
3361 /* Return the result in form of a multi-bulk reply */
3362 ln = listIndex(list, start);
3363 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3364 for (j = 0; j < rangelen; j++) {
3365 ele = listNodeValue(ln);
3366 addReplyBulkLen(c,ele);
3367 addReply(c,ele);
3368 addReply(c,shared.crlf);
3369 ln = ln->next;
3370 }
3371 }
3372 }
3373 }
3374
3375 static void ltrimCommand(redisClient *c) {
3376 robj *o;
3377 int start = atoi(c->argv[2]->ptr);
3378 int end = atoi(c->argv[3]->ptr);
3379
3380 o = lookupKeyWrite(c->db,c->argv[1]);
3381 if (o == NULL) {
3382 addReply(c,shared.nokeyerr);
3383 } else {
3384 if (o->type != REDIS_LIST) {
3385 addReply(c,shared.wrongtypeerr);
3386 } else {
3387 list *list = o->ptr;
3388 listNode *ln;
3389 int llen = listLength(list);
3390 int j, ltrim, rtrim;
3391
3392 /* convert negative indexes */
3393 if (start < 0) start = llen+start;
3394 if (end < 0) end = llen+end;
3395 if (start < 0) start = 0;
3396 if (end < 0) end = 0;
3397
3398 /* indexes sanity checks */
3399 if (start > end || start >= llen) {
3400 /* Out of range start or start > end result in empty list */
3401 ltrim = llen;
3402 rtrim = 0;
3403 } else {
3404 if (end >= llen) end = llen-1;
3405 ltrim = start;
3406 rtrim = llen-end-1;
3407 }
3408
3409 /* Remove list elements to perform the trim */
3410 for (j = 0; j < ltrim; j++) {
3411 ln = listFirst(list);
3412 listDelNode(list,ln);
3413 }
3414 for (j = 0; j < rtrim; j++) {
3415 ln = listLast(list);
3416 listDelNode(list,ln);
3417 }
3418 server.dirty++;
3419 addReply(c,shared.ok);
3420 }
3421 }
3422 }
3423
3424 static void lremCommand(redisClient *c) {
3425 robj *o;
3426
3427 o = lookupKeyWrite(c->db,c->argv[1]);
3428 if (o == NULL) {
3429 addReply(c,shared.czero);
3430 } else {
3431 if (o->type != REDIS_LIST) {
3432 addReply(c,shared.wrongtypeerr);
3433 } else {
3434 list *list = o->ptr;
3435 listNode *ln, *next;
3436 int toremove = atoi(c->argv[2]->ptr);
3437 int removed = 0;
3438 int fromtail = 0;
3439
3440 if (toremove < 0) {
3441 toremove = -toremove;
3442 fromtail = 1;
3443 }
3444 ln = fromtail ? list->tail : list->head;
3445 while (ln) {
3446 robj *ele = listNodeValue(ln);
3447
3448 next = fromtail ? ln->prev : ln->next;
3449 if (compareStringObjects(ele,c->argv[3]) == 0) {
3450 listDelNode(list,ln);
3451 server.dirty++;
3452 removed++;
3453 if (toremove && removed == toremove) break;
3454 }
3455 ln = next;
3456 }
3457 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3458 }
3459 }
3460 }
3461
3462 /* ==================================== Sets ================================ */
3463
3464 static void saddCommand(redisClient *c) {
3465 robj *set;
3466
3467 set = lookupKeyWrite(c->db,c->argv[1]);
3468 if (set == NULL) {
3469 set = createSetObject();
3470 dictAdd(c->db->dict,c->argv[1],set);
3471 incrRefCount(c->argv[1]);
3472 } else {
3473 if (set->type != REDIS_SET) {
3474 addReply(c,shared.wrongtypeerr);
3475 return;
3476 }
3477 }
3478 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3479 incrRefCount(c->argv[2]);
3480 server.dirty++;
3481 addReply(c,shared.cone);
3482 } else {
3483 addReply(c,shared.czero);
3484 }
3485 }
3486
3487 static void sremCommand(redisClient *c) {
3488 robj *set;
3489
3490 set = lookupKeyWrite(c->db,c->argv[1]);
3491 if (set == NULL) {
3492 addReply(c,shared.czero);
3493 } else {
3494 if (set->type != REDIS_SET) {
3495 addReply(c,shared.wrongtypeerr);
3496 return;
3497 }
3498 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3499 server.dirty++;
3500 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3501 addReply(c,shared.cone);
3502 } else {
3503 addReply(c,shared.czero);
3504 }
3505 }
3506 }
3507
3508 static void smoveCommand(redisClient *c) {
3509 robj *srcset, *dstset;
3510
3511 srcset = lookupKeyWrite(c->db,c->argv[1]);
3512 dstset = lookupKeyWrite(c->db,c->argv[2]);
3513
3514 /* If the source key does not exist return 0, if it's of the wrong type
3515 * raise an error */
3516 if (srcset == NULL || srcset->type != REDIS_SET) {
3517 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3518 return;
3519 }
3520 /* Error if the destination key is not a set as well */
3521 if (dstset && dstset->type != REDIS_SET) {
3522 addReply(c,shared.wrongtypeerr);
3523 return;
3524 }
3525 /* Remove the element from the source set */
3526 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3527 /* Key not found in the src set! return zero */
3528 addReply(c,shared.czero);
3529 return;
3530 }
3531 server.dirty++;
3532 /* Add the element to the destination set */
3533 if (!dstset) {
3534 dstset = createSetObject();
3535 dictAdd(c->db->dict,c->argv[2],dstset);
3536 incrRefCount(c->argv[2]);
3537 }
3538 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3539 incrRefCount(c->argv[3]);
3540 addReply(c,shared.cone);
3541 }
3542
3543 static void sismemberCommand(redisClient *c) {
3544 robj *set;
3545
3546 set = lookupKeyRead(c->db,c->argv[1]);
3547 if (set == NULL) {
3548 addReply(c,shared.czero);
3549 } else {
3550 if (set->type != REDIS_SET) {
3551 addReply(c,shared.wrongtypeerr);
3552 return;
3553 }
3554 if (dictFind(set->ptr,c->argv[2]))
3555 addReply(c,shared.cone);
3556 else
3557 addReply(c,shared.czero);
3558 }
3559 }
3560
3561 static void scardCommand(redisClient *c) {
3562 robj *o;
3563 dict *s;
3564
3565 o = lookupKeyRead(c->db,c->argv[1]);
3566 if (o == NULL) {
3567 addReply(c,shared.czero);
3568 return;
3569 } else {
3570 if (o->type != REDIS_SET) {
3571 addReply(c,shared.wrongtypeerr);
3572 } else {
3573 s = o->ptr;
3574 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3575 dictSize(s)));
3576 }
3577 }
3578 }
3579
3580 static void spopCommand(redisClient *c) {
3581 robj *set;
3582 dictEntry *de;
3583
3584 set = lookupKeyWrite(c->db,c->argv[1]);
3585 if (set == NULL) {
3586 addReply(c,shared.nullbulk);
3587 } else {
3588 if (set->type != REDIS_SET) {
3589 addReply(c,shared.wrongtypeerr);
3590 return;
3591 }
3592 de = dictGetRandomKey(set->ptr);
3593 if (de == NULL) {
3594 addReply(c,shared.nullbulk);
3595 } else {
3596 robj *ele = dictGetEntryKey(de);
3597
3598 addReplyBulkLen(c,ele);
3599 addReply(c,ele);
3600 addReply(c,shared.crlf);
3601 dictDelete(set->ptr,ele);
3602 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3603 server.dirty++;
3604 }
3605 }
3606 }
3607
3608 static void srandmemberCommand(redisClient *c) {
3609 robj *set;
3610 dictEntry *de;
3611
3612 set = lookupKeyRead(c->db,c->argv[1]);
3613 if (set == NULL) {
3614 addReply(c,shared.nullbulk);
3615 } else {
3616 if (set->type != REDIS_SET) {
3617 addReply(c,shared.wrongtypeerr);
3618 return;
3619 }
3620 de = dictGetRandomKey(set->ptr);
3621 if (de == NULL) {
3622 addReply(c,shared.nullbulk);
3623 } else {
3624 robj *ele = dictGetEntryKey(de);
3625
3626 addReplyBulkLen(c,ele);
3627 addReply(c,ele);
3628 addReply(c,shared.crlf);
3629 }
3630 }
3631 }
3632
3633 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3634 dict **d1 = (void*) s1, **d2 = (void*) s2;
3635
3636 return dictSize(*d1)-dictSize(*d2);
3637 }
3638
3639 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3640 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3641 dictIterator *di;
3642 dictEntry *de;
3643 robj *lenobj = NULL, *dstset = NULL;
3644 int j, cardinality = 0;
3645
3646 for (j = 0; j < setsnum; j++) {
3647 robj *setobj;
3648
3649 setobj = dstkey ?
3650 lookupKeyWrite(c->db,setskeys[j]) :
3651 lookupKeyRead(c->db,setskeys[j]);
3652 if (!setobj) {
3653 zfree(dv);
3654 if (dstkey) {
3655 deleteKey(c->db,dstkey);
3656 addReply(c,shared.ok);
3657 } else {
3658 addReply(c,shared.nullmultibulk);
3659 }
3660 return;
3661 }
3662 if (setobj->type != REDIS_SET) {
3663 zfree(dv);
3664 addReply(c,shared.wrongtypeerr);
3665 return;
3666 }
3667 dv[j] = setobj->ptr;
3668 }
3669 /* Sort sets from the smallest to largest, this will improve our
3670 * algorithm's performace */
3671 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3672
3673 /* The first thing we should output is the total number of elements...
3674 * since this is a multi-bulk write, but at this stage we don't know
3675 * the intersection set size, so we use a trick, append an empty object
3676 * to the output list and save the pointer to later modify it with the
3677 * right length */
3678 if (!dstkey) {
3679 lenobj = createObject(REDIS_STRING,NULL);
3680 addReply(c,lenobj);
3681 decrRefCount(lenobj);
3682 } else {
3683 /* If we have a target key where to store the resulting set
3684 * create this key with an empty set inside */
3685 dstset = createSetObject();
3686 }
3687
3688 /* Iterate all the elements of the first (smallest) set, and test
3689 * the element against all the other sets, if at least one set does
3690 * not include the element it is discarded */
3691 di = dictGetIterator(dv[0]);
3692
3693 while((de = dictNext(di)) != NULL) {
3694 robj *ele;
3695
3696 for (j = 1; j < setsnum; j++)
3697 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3698 if (j != setsnum)
3699 continue; /* at least one set does not contain the member */
3700 ele = dictGetEntryKey(de);
3701 if (!dstkey) {
3702 addReplyBulkLen(c,ele);
3703 addReply(c,ele);
3704 addReply(c,shared.crlf);
3705 cardinality++;
3706 } else {
3707 dictAdd(dstset->ptr,ele,NULL);
3708 incrRefCount(ele);
3709 }
3710 }
3711 dictReleaseIterator(di);
3712
3713 if (dstkey) {
3714 /* Store the resulting set into the target */
3715 deleteKey(c->db,dstkey);
3716 dictAdd(c->db->dict,dstkey,dstset);
3717 incrRefCount(dstkey);
3718 }
3719
3720 if (!dstkey) {
3721 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3722 } else {
3723 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3724 dictSize((dict*)dstset->ptr)));
3725 server.dirty++;
3726 }
3727 zfree(dv);
3728 }
3729
3730 static void sinterCommand(redisClient *c) {
3731 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3732 }
3733
3734 static void sinterstoreCommand(redisClient *c) {
3735 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3736 }
3737
3738 #define REDIS_OP_UNION 0
3739 #define REDIS_OP_DIFF 1
3740
3741 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3742 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3743 dictIterator *di;
3744 dictEntry *de;
3745 robj *dstset = NULL;
3746 int j, cardinality = 0;
3747
3748 for (j = 0; j < setsnum; j++) {
3749 robj *setobj;
3750
3751 setobj = dstkey ?
3752 lookupKeyWrite(c->db,setskeys[j]) :
3753 lookupKeyRead(c->db,setskeys[j]);
3754 if (!setobj) {
3755 dv[j] = NULL;
3756 continue;
3757 }
3758 if (setobj->type != REDIS_SET) {
3759 zfree(dv);
3760 addReply(c,shared.wrongtypeerr);
3761 return;
3762 }
3763 dv[j] = setobj->ptr;
3764 }
3765
3766 /* We need a temp set object to store our union. If the dstkey
3767 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3768 * this set object will be the resulting object to set into the target key*/
3769 dstset = createSetObject();
3770
3771 /* Iterate all the elements of all the sets, add every element a single
3772 * time to the result set */
3773 for (j = 0; j < setsnum; j++) {
3774 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3775 if (!dv[j]) continue; /* non existing keys are like empty sets */
3776
3777 di = dictGetIterator(dv[j]);
3778
3779 while((de = dictNext(di)) != NULL) {
3780 robj *ele;
3781
3782 /* dictAdd will not add the same element multiple times */
3783 ele = dictGetEntryKey(de);
3784 if (op == REDIS_OP_UNION || j == 0) {
3785 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3786 incrRefCount(ele);
3787 cardinality++;
3788 }
3789 } else if (op == REDIS_OP_DIFF) {
3790 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3791 cardinality--;
3792 }
3793 }
3794 }
3795 dictReleaseIterator(di);
3796
3797 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3798 }
3799
3800 /* Output the content of the resulting set, if not in STORE mode */
3801 if (!dstkey) {
3802 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3803 di = dictGetIterator(dstset->ptr);
3804 while((de = dictNext(di)) != NULL) {
3805 robj *ele;
3806
3807 ele = dictGetEntryKey(de);
3808 addReplyBulkLen(c,ele);
3809 addReply(c,ele);
3810 addReply(c,shared.crlf);
3811 }
3812 dictReleaseIterator(di);
3813 } else {
3814 /* If we have a target key where to store the resulting set
3815 * create this key with the result set inside */
3816 deleteKey(c->db,dstkey);
3817 dictAdd(c->db->dict,dstkey,dstset);
3818 incrRefCount(dstkey);
3819 }
3820
3821 /* Cleanup */
3822 if (!dstkey) {
3823 decrRefCount(dstset);
3824 } else {
3825 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3826 dictSize((dict*)dstset->ptr)));
3827 server.dirty++;
3828 }
3829 zfree(dv);
3830 }
3831
3832 static void sunionCommand(redisClient *c) {
3833 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3834 }
3835
3836 static void sunionstoreCommand(redisClient *c) {
3837 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3838 }
3839
3840 static void sdiffCommand(redisClient *c) {
3841 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3842 }
3843
3844 static void sdiffstoreCommand(redisClient *c) {
3845 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3846 }
3847
3848 /* ==================================== ZSets =============================== */
3849
3850 /* ZSETs are ordered sets using two data structures to hold the same elements
3851 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3852 * data structure.
3853 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
3857
3858 /* This skiplist implementation is almost a C translation of the original
3859 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3860 * Alternative to Balanced Trees", modified in three ways:
3861 * a) this implementation allows for repeated values.
3862 * b) the comparison is not just by key (our 'score') but by satellite data.
3863 * c) there is a back pointer, so it's a doubly linked list with the back
3864 * pointers being only at "level 1". This allows to traverse the list
3865 * from tail to head, useful for ZREVRANGE. */
3866
3867 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3868 zskiplistNode *zn = zmalloc(sizeof(*zn));
3869
3870 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3871 zn->score = score;
3872 zn->obj = obj;
3873 return zn;
3874 }
3875
3876 static zskiplist *zslCreate(void) {
3877 int j;
3878 zskiplist *zsl;
3879
3880 zsl = zmalloc(sizeof(*zsl));
3881 zsl->level = 1;
3882 zsl->length = 0;
3883 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3884 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3885 zsl->header->forward[j] = NULL;
3886 zsl->header->backward = NULL;
3887 zsl->tail = NULL;
3888 return zsl;
3889 }
3890
3891 static void zslFreeNode(zskiplistNode *node) {
3892 decrRefCount(node->obj);
3893 zfree(node->forward);
3894 zfree(node);
3895 }
3896
3897 static void zslFree(zskiplist *zsl) {
3898 zskiplistNode *node = zsl->header->forward[0], *next;
3899
3900 zfree(zsl->header->forward);
3901 zfree(zsl->header);
3902 while(node) {
3903 next = node->forward[0];
3904 zslFreeNode(node);
3905 node = next;
3906 }
3907 zfree(zsl);
3908 }
3909
3910 static int zslRandomLevel(void) {
3911 int level = 1;
3912 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3913 level += 1;
3914 return level;
3915 }
3916
3917 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3918 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3919 int i, level;
3920
3921 x = zsl->header;
3922 for (i = zsl->level-1; i >= 0; i--) {
3923 while (x->forward[i] &&
3924 (x->forward[i]->score < score ||
3925 (x->forward[i]->score == score &&
3926 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3927 x = x->forward[i];
3928 update[i] = x;
3929 }
3930 /* we assume the key is not already inside, since we allow duplicated
3931 * scores, and the re-insertion of score and redis object should never
3932 * happpen since the caller of zslInsert() should test in the hash table
3933 * if the element is already inside or not. */
3934 level = zslRandomLevel();
3935 if (level > zsl->level) {
3936 for (i = zsl->level; i < level; i++)
3937 update[i] = zsl->header;
3938 zsl->level = level;
3939 }
3940 x = zslCreateNode(level,score,obj);
3941 for (i = 0; i < level; i++) {
3942 x->forward[i] = update[i]->forward[i];
3943 update[i]->forward[i] = x;
3944 }
3945 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3946 if (x->forward[0])
3947 x->forward[0]->backward = x;
3948 else
3949 zsl->tail = x;
3950 zsl->length++;
3951 }
3952
3953 /* Delete an element with matching score/object from the skiplist. */
3954 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3955 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3956 int i;
3957
3958 x = zsl->header;
3959 for (i = zsl->level-1; i >= 0; i--) {
3960 while (x->forward[i] &&
3961 (x->forward[i]->score < score ||
3962 (x->forward[i]->score == score &&
3963 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3964 x = x->forward[i];
3965 update[i] = x;
3966 }
3967 /* We may have multiple elements with the same score, what we need
3968 * is to find the element with both the right score and object. */
3969 x = x->forward[0];
3970 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
3971 for (i = 0; i < zsl->level; i++) {
3972 if (update[i]->forward[i] != x) break;
3973 update[i]->forward[i] = x->forward[i];
3974 }
3975 if (x->forward[0]) {
3976 x->forward[0]->backward = (x->backward == zsl->header) ?
3977 NULL : x->backward;
3978 } else {
3979 zsl->tail = x->backward;
3980 }
3981 zslFreeNode(x);
3982 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3983 zsl->level--;
3984 zsl->length--;
3985 return 1;
3986 } else {
3987 return 0; /* not found */
3988 }
3989 return 0; /* not found */
3990 }
3991
3992 /* Delete all the elements with score between min and max from the skiplist.
3993 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
3994 * Note that this function takes the reference to the hash table view of the
3995 * sorted set, in order to remove the elements from the hash table too. */
3996 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
3997 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3998 unsigned long removed = 0;
3999 int i;
4000
4001 x = zsl->header;
4002 for (i = zsl->level-1; i >= 0; i--) {
4003 while (x->forward[i] && x->forward[i]->score < min)
4004 x = x->forward[i];
4005 update[i] = x;
4006 }
4007 /* We may have multiple elements with the same score, what we need
4008 * is to find the element with both the right score and object. */
4009 x = x->forward[0];
4010 while (x && x->score <= max) {
4011 zskiplistNode *next;
4012
4013 for (i = 0; i < zsl->level; i++) {
4014 if (update[i]->forward[i] != x) break;
4015 update[i]->forward[i] = x->forward[i];
4016 }
4017 if (x->forward[0]) {
4018 x->forward[0]->backward = (x->backward == zsl->header) ?
4019 NULL : x->backward;
4020 } else {
4021 zsl->tail = x->backward;
4022 }
4023 next = x->forward[0];
4024 dictDelete(dict,x->obj);
4025 zslFreeNode(x);
4026 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4027 zsl->level--;
4028 zsl->length--;
4029 removed++;
4030 x = next;
4031 }
4032 return removed; /* not found */
4033 }
4034
4035 /* Find the first node having a score equal or greater than the specified one.
4036 * Returns NULL if there is no match. */
4037 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4038 zskiplistNode *x;
4039 int i;
4040
4041 x = zsl->header;
4042 for (i = zsl->level-1; i >= 0; i--) {
4043 while (x->forward[i] && x->forward[i]->score < score)
4044 x = x->forward[i];
4045 }
4046 /* We may have multiple elements with the same score, what we need
4047 * is to find the element with both the right score and object. */
4048 return x->forward[0];
4049 }
4050
4051 /* The actual Z-commands implementations */
4052
4053 static void zaddCommand(redisClient *c) {
4054 robj *zsetobj;
4055 zset *zs;
4056 double *score;
4057
4058 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4059 if (zsetobj == NULL) {
4060 zsetobj = createZsetObject();
4061 dictAdd(c->db->dict,c->argv[1],zsetobj);
4062 incrRefCount(c->argv[1]);
4063 } else {
4064 if (zsetobj->type != REDIS_ZSET) {
4065 addReply(c,shared.wrongtypeerr);
4066 return;
4067 }
4068 }
4069 score = zmalloc(sizeof(double));
4070 *score = strtod(c->argv[2]->ptr,NULL);
4071 zs = zsetobj->ptr;
4072 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4073 /* case 1: New element */
4074 incrRefCount(c->argv[3]); /* added to hash */
4075 zslInsert(zs->zsl,*score,c->argv[3]);
4076 incrRefCount(c->argv[3]); /* added to skiplist */
4077 server.dirty++;
4078 addReply(c,shared.cone);
4079 } else {
4080 dictEntry *de;
4081 double *oldscore;
4082
4083 /* case 2: Score update operation */
4084 de = dictFind(zs->dict,c->argv[3]);
4085 assert(de != NULL);
4086 oldscore = dictGetEntryVal(de);
4087 if (*score != *oldscore) {
4088 int deleted;
4089
4090 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4091 assert(deleted != 0);
4092 zslInsert(zs->zsl,*score,c->argv[3]);
4093 incrRefCount(c->argv[3]);
4094 dictReplace(zs->dict,c->argv[3],score);
4095 server.dirty++;
4096 } else {
4097 zfree(score);
4098 }
4099 addReply(c,shared.czero);
4100 }
4101 }
4102
4103 static void zremCommand(redisClient *c) {
4104 robj *zsetobj;
4105 zset *zs;
4106
4107 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4108 if (zsetobj == NULL) {
4109 addReply(c,shared.czero);
4110 } else {
4111 dictEntry *de;
4112 double *oldscore;
4113 int deleted;
4114
4115 if (zsetobj->type != REDIS_ZSET) {
4116 addReply(c,shared.wrongtypeerr);
4117 return;
4118 }
4119 zs = zsetobj->ptr;
4120 de = dictFind(zs->dict,c->argv[2]);
4121 if (de == NULL) {
4122 addReply(c,shared.czero);
4123 return;
4124 }
4125 /* Delete from the skiplist */
4126 oldscore = dictGetEntryVal(de);
4127 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4128 assert(deleted != 0);
4129
4130 /* Delete from the hash table */
4131 dictDelete(zs->dict,c->argv[2]);
4132 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4133 server.dirty++;
4134 addReply(c,shared.cone);
4135 }
4136 }
4137
4138 static void zremrangebyscoreCommand(redisClient *c) {
4139 double min = strtod(c->argv[2]->ptr,NULL);
4140 double max = strtod(c->argv[3]->ptr,NULL);
4141 robj *zsetobj;
4142 zset *zs;
4143
4144 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4145 if (zsetobj == NULL) {
4146 addReply(c,shared.czero);
4147 } else {
4148 long deleted;
4149
4150 if (zsetobj->type != REDIS_ZSET) {
4151 addReply(c,shared.wrongtypeerr);
4152 return;
4153 }
4154 zs = zsetobj->ptr;
4155 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4156 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4157 server.dirty += deleted;
4158 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4159 }
4160 }
4161
4162 static void zrangeGenericCommand(redisClient *c, int reverse) {
4163 robj *o;
4164 int start = atoi(c->argv[2]->ptr);
4165 int end = atoi(c->argv[3]->ptr);
4166
4167 o = lookupKeyRead(c->db,c->argv[1]);
4168 if (o == NULL) {
4169 addReply(c,shared.nullmultibulk);
4170 } else {
4171 if (o->type != REDIS_ZSET) {
4172 addReply(c,shared.wrongtypeerr);
4173 } else {
4174 zset *zsetobj = o->ptr;
4175 zskiplist *zsl = zsetobj->zsl;
4176 zskiplistNode *ln;
4177
4178 int llen = zsl->length;
4179 int rangelen, j;
4180 robj *ele;
4181
4182 /* convert negative indexes */
4183 if (start < 0) start = llen+start;
4184 if (end < 0) end = llen+end;
4185 if (start < 0) start = 0;
4186 if (end < 0) end = 0;
4187
4188 /* indexes sanity checks */
4189 if (start > end || start >= llen) {
4190 /* Out of range start or start > end result in empty list */
4191 addReply(c,shared.emptymultibulk);
4192 return;
4193 }
4194 if (end >= llen) end = llen-1;
4195 rangelen = (end-start)+1;
4196
4197 /* Return the result in form of a multi-bulk reply */
4198 if (reverse) {
4199 ln = zsl->tail;
4200 while (start--)
4201 ln = ln->backward;
4202 } else {
4203 ln = zsl->header->forward[0];
4204 while (start--)
4205 ln = ln->forward[0];
4206 }
4207
4208 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4209 for (j = 0; j < rangelen; j++) {
4210 ele = ln->obj;
4211 addReplyBulkLen(c,ele);
4212 addReply(c,ele);
4213 addReply(c,shared.crlf);
4214 ln = reverse ? ln->backward : ln->forward[0];
4215 }
4216 }
4217 }
4218 }
4219
4220 static void zrangeCommand(redisClient *c) {
4221 zrangeGenericCommand(c,0);
4222 }
4223
4224 static void zrevrangeCommand(redisClient *c) {
4225 zrangeGenericCommand(c,1);
4226 }
4227
4228 static void zrangebyscoreCommand(redisClient *c) {
4229 robj *o;
4230 double min = strtod(c->argv[2]->ptr,NULL);
4231 double max = strtod(c->argv[3]->ptr,NULL);
4232
4233 o = lookupKeyRead(c->db,c->argv[1]);
4234 if (o == NULL) {
4235 addReply(c,shared.nullmultibulk);
4236 } else {
4237 if (o->type != REDIS_ZSET) {
4238 addReply(c,shared.wrongtypeerr);
4239 } else {
4240 zset *zsetobj = o->ptr;
4241 zskiplist *zsl = zsetobj->zsl;
4242 zskiplistNode *ln;
4243 robj *ele, *lenobj;
4244 unsigned int rangelen = 0;
4245
4246 /* Get the first node with the score >= min */
4247 ln = zslFirstWithScore(zsl,min);
4248 if (ln == NULL) {
4249 /* No element matching the speciifed interval */
4250 addReply(c,shared.emptymultibulk);
4251 return;
4252 }
4253
4254 /* We don't know in advance how many matching elements there
4255 * are in the list, so we push this object that will represent
4256 * the multi-bulk length in the output buffer, and will "fix"
4257 * it later */
4258 lenobj = createObject(REDIS_STRING,NULL);
4259 addReply(c,lenobj);
4260
4261 while(ln && ln->score <= max) {
4262 ele = ln->obj;
4263 addReplyBulkLen(c,ele);
4264 addReply(c,ele);
4265 addReply(c,shared.crlf);
4266 ln = ln->forward[0];
4267 rangelen++;
4268 }
4269 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4270 }
4271 }
4272 }
4273
4274 static void zcardCommand(redisClient *c) {
4275 robj *o;
4276 zset *zs;
4277
4278 o = lookupKeyRead(c->db,c->argv[1]);
4279 if (o == NULL) {
4280 addReply(c,shared.czero);
4281 return;
4282 } else {
4283 if (o->type != REDIS_ZSET) {
4284 addReply(c,shared.wrongtypeerr);
4285 } else {
4286 zs = o->ptr;
4287 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4288 }
4289 }
4290 }
4291
4292 static void zscoreCommand(redisClient *c) {
4293 robj *o;
4294 zset *zs;
4295
4296 o = lookupKeyRead(c->db,c->argv[1]);
4297 if (o == NULL) {
4298 addReply(c,shared.czero);
4299 return;
4300 } else {
4301 if (o->type != REDIS_ZSET) {
4302 addReply(c,shared.wrongtypeerr);
4303 } else {
4304 dictEntry *de;
4305
4306 zs = o->ptr;
4307 de = dictFind(zs->dict,c->argv[2]);
4308 if (!de) {
4309 addReply(c,shared.nullbulk);
4310 } else {
4311 char buf[128];
4312 double *score = dictGetEntryVal(de);
4313
4314 snprintf(buf,sizeof(buf),"%.17g",*score);
4315 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4316 strlen(buf),buf));
4317 }
4318 }
4319 }
4320 }
4321
4322 /* ========================= Non type-specific commands ==================== */
4323
4324 static void flushdbCommand(redisClient *c) {
4325 server.dirty += dictSize(c->db->dict);
4326 dictEmpty(c->db->dict);
4327 dictEmpty(c->db->expires);
4328 addReply(c,shared.ok);
4329 }
4330
4331 static void flushallCommand(redisClient *c) {
4332 server.dirty += emptyDb();
4333 addReply(c,shared.ok);
4334 rdbSave(server.dbfilename);
4335 server.dirty++;
4336 }
4337
4338 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4339 redisSortOperation *so = zmalloc(sizeof(*so));
4340 so->type = type;
4341 so->pattern = pattern;
4342 return so;
4343 }
4344
4345 /* Return the value associated to the key with a name obtained
4346 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4347 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4348 char *p;
4349 sds spat, ssub;
4350 robj keyobj;
4351 int prefixlen, sublen, postfixlen;
4352 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4353 struct {
4354 long len;
4355 long free;
4356 char buf[REDIS_SORTKEY_MAX+1];
4357 } keyname;
4358
4359 if (subst->encoding == REDIS_ENCODING_RAW)
4360 incrRefCount(subst);
4361 else {
4362 subst = getDecodedObject(subst);
4363 }
4364
4365 spat = pattern->ptr;
4366 ssub = subst->ptr;
4367 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4368 p = strchr(spat,'*');
4369 if (!p) return NULL;
4370
4371 prefixlen = p-spat;
4372 sublen = sdslen(ssub);
4373 postfixlen = sdslen(spat)-(prefixlen+1);
4374 memcpy(keyname.buf,spat,prefixlen);
4375 memcpy(keyname.buf+prefixlen,ssub,sublen);
4376 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4377 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4378 keyname.len = prefixlen+sublen+postfixlen;
4379
4380 keyobj.refcount = 1;
4381 keyobj.type = REDIS_STRING;
4382 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4383
4384 decrRefCount(subst);
4385
4386 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4387 return lookupKeyRead(db,&keyobj);
4388 }
4389
4390 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4391 * the additional parameter is not standard but a BSD-specific we have to
4392 * pass sorting parameters via the global 'server' structure */
4393 static int sortCompare(const void *s1, const void *s2) {
4394 const redisSortObject *so1 = s1, *so2 = s2;
4395 int cmp;
4396
4397 if (!server.sort_alpha) {
4398 /* Numeric sorting. Here it's trivial as we precomputed scores */
4399 if (so1->u.score > so2->u.score) {
4400 cmp = 1;
4401 } else if (so1->u.score < so2->u.score) {
4402 cmp = -1;
4403 } else {
4404 cmp = 0;
4405 }
4406 } else {
4407 /* Alphanumeric sorting */
4408 if (server.sort_bypattern) {
4409 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4410 /* At least one compare object is NULL */
4411 if (so1->u.cmpobj == so2->u.cmpobj)
4412 cmp = 0;
4413 else if (so1->u.cmpobj == NULL)
4414 cmp = -1;
4415 else
4416 cmp = 1;
4417 } else {
4418 /* We have both the objects, use strcoll */
4419 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4420 }
4421 } else {
4422 /* Compare elements directly */
4423 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4424 so2->obj->encoding == REDIS_ENCODING_RAW) {
4425 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4426 } else {
4427 robj *dec1, *dec2;
4428
4429 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4430 so1->obj : getDecodedObject(so1->obj);
4431 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4432 so2->obj : getDecodedObject(so2->obj);
4433 cmp = strcoll(dec1->ptr,dec2->ptr);
4434 if (dec1 != so1->obj) decrRefCount(dec1);
4435 if (dec2 != so2->obj) decrRefCount(dec2);
4436 }
4437 }
4438 }
4439 return server.sort_desc ? -cmp : cmp;
4440 }
4441
4442 /* The SORT command is the most complex command in Redis. Warning: this code
4443 * is optimized for speed and a bit less for readability */
4444 static void sortCommand(redisClient *c) {
4445 list *operations;
4446 int outputlen = 0;
4447 int desc = 0, alpha = 0;
4448 int limit_start = 0, limit_count = -1, start, end;
4449 int j, dontsort = 0, vectorlen;
4450 int getop = 0; /* GET operation counter */
4451 robj *sortval, *sortby = NULL, *storekey = NULL;
4452 redisSortObject *vector; /* Resulting vector to sort */
4453
4454 /* Lookup the key to sort. It must be of the right types */
4455 sortval = lookupKeyRead(c->db,c->argv[1]);
4456 if (sortval == NULL) {
4457 addReply(c,shared.nokeyerr);
4458 return;
4459 }
4460 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4461 addReply(c,shared.wrongtypeerr);
4462 return;
4463 }
4464
4465 /* Create a list of operations to perform for every sorted element.
4466 * Operations can be GET/DEL/INCR/DECR */
4467 operations = listCreate();
4468 listSetFreeMethod(operations,zfree);
4469 j = 2;
4470
4471 /* Now we need to protect sortval incrementing its count, in the future
4472 * SORT may have options able to overwrite/delete keys during the sorting
4473 * and the sorted key itself may get destroied */
4474 incrRefCount(sortval);
4475
4476 /* The SORT command has an SQL-alike syntax, parse it */
4477 while(j < c->argc) {
4478 int leftargs = c->argc-j-1;
4479 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4480 desc = 0;
4481 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4482 desc = 1;
4483 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4484 alpha = 1;
4485 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4486 limit_start = atoi(c->argv[j+1]->ptr);
4487 limit_count = atoi(c->argv[j+2]->ptr);
4488 j+=2;
4489 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4490 storekey = c->argv[j+1];
4491 j++;
4492 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4493 sortby = c->argv[j+1];
4494 /* If the BY pattern does not contain '*', i.e. it is constant,
4495 * we don't need to sort nor to lookup the weight keys. */
4496 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4497 j++;
4498 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4499 listAddNodeTail(operations,createSortOperation(
4500 REDIS_SORT_GET,c->argv[j+1]));
4501 getop++;
4502 j++;
4503 } else {
4504 decrRefCount(sortval);
4505 listRelease(operations);
4506 addReply(c,shared.syntaxerr);
4507 return;
4508 }
4509 j++;
4510 }
4511
4512 /* Load the sorting vector with all the objects to sort */
4513 vectorlen = (sortval->type == REDIS_LIST) ?
4514 listLength((list*)sortval->ptr) :
4515 dictSize((dict*)sortval->ptr);
4516 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4517 j = 0;
4518 if (sortval->type == REDIS_LIST) {
4519 list *list = sortval->ptr;
4520 listNode *ln;
4521
4522 listRewind(list);
4523 while((ln = listYield(list))) {
4524 robj *ele = ln->value;
4525 vector[j].obj = ele;
4526 vector[j].u.score = 0;
4527 vector[j].u.cmpobj = NULL;
4528 j++;
4529 }
4530 } else {
4531 dict *set = sortval->ptr;
4532 dictIterator *di;
4533 dictEntry *setele;
4534
4535 di = dictGetIterator(set);
4536 while((setele = dictNext(di)) != NULL) {
4537 vector[j].obj = dictGetEntryKey(setele);
4538 vector[j].u.score = 0;
4539 vector[j].u.cmpobj = NULL;
4540 j++;
4541 }
4542 dictReleaseIterator(di);
4543 }
4544 assert(j == vectorlen);
4545
4546 /* Now it's time to load the right scores in the sorting vector */
4547 if (dontsort == 0) {
4548 for (j = 0; j < vectorlen; j++) {
4549 if (sortby) {
4550 robj *byval;
4551
4552 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4553 if (!byval || byval->type != REDIS_STRING) continue;
4554 if (alpha) {
4555 if (byval->encoding == REDIS_ENCODING_RAW) {
4556 vector[j].u.cmpobj = byval;
4557 incrRefCount(byval);
4558 } else {
4559 vector[j].u.cmpobj = getDecodedObject(byval);
4560 }
4561 } else {
4562 if (byval->encoding == REDIS_ENCODING_RAW) {
4563 vector[j].u.score = strtod(byval->ptr,NULL);
4564 } else {
4565 if (byval->encoding == REDIS_ENCODING_INT) {
4566 vector[j].u.score = (long)byval->ptr;
4567 } else
4568 assert(1 != 1);
4569 }
4570 }
4571 } else {
4572 if (!alpha) {
4573 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4574 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4575 else {
4576 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4577 vector[j].u.score = (long) vector[j].obj->ptr;
4578 else
4579 assert(1 != 1);
4580 }
4581 }
4582 }
4583 }
4584 }
4585
4586 /* We are ready to sort the vector... perform a bit of sanity check
4587 * on the LIMIT option too. We'll use a partial version of quicksort. */
4588 start = (limit_start < 0) ? 0 : limit_start;
4589 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4590 if (start >= vectorlen) {
4591 start = vectorlen-1;
4592 end = vectorlen-2;
4593 }
4594 if (end >= vectorlen) end = vectorlen-1;
4595
4596 if (dontsort == 0) {
4597 server.sort_desc = desc;
4598 server.sort_alpha = alpha;
4599 server.sort_bypattern = sortby ? 1 : 0;
4600 if (sortby && (start != 0 || end != vectorlen-1))
4601 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4602 else
4603 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4604 }
4605
4606 /* Send command output to the output buffer, performing the specified
4607 * GET/DEL/INCR/DECR operations if any. */
4608 outputlen = getop ? getop*(end-start+1) : end-start+1;
4609 if (storekey == NULL) {
4610 /* STORE option not specified, sent the sorting result to client */
4611 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4612 for (j = start; j <= end; j++) {
4613 listNode *ln;
4614 if (!getop) {
4615 addReplyBulkLen(c,vector[j].obj);
4616 addReply(c,vector[j].obj);
4617 addReply(c,shared.crlf);
4618 }
4619 listRewind(operations);
4620 while((ln = listYield(operations))) {
4621 redisSortOperation *sop = ln->value;
4622 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4623 vector[j].obj);
4624
4625 if (sop->type == REDIS_SORT_GET) {
4626 if (!val || val->type != REDIS_STRING) {
4627 addReply(c,shared.nullbulk);
4628 } else {
4629 addReplyBulkLen(c,val);
4630 addReply(c,val);
4631 addReply(c,shared.crlf);
4632 }
4633 } else {
4634 assert(sop->type == REDIS_SORT_GET); /* always fails */
4635 }
4636 }
4637 }
4638 } else {
4639 robj *listObject = createListObject();
4640 list *listPtr = (list*) listObject->ptr;
4641
4642 /* STORE option specified, set the sorting result as a List object */
4643 for (j = start; j <= end; j++) {
4644 listNode *ln;
4645 if (!getop) {
4646 listAddNodeTail(listPtr,vector[j].obj);
4647 incrRefCount(vector[j].obj);
4648 }
4649 listRewind(operations);
4650 while((ln = listYield(operations))) {
4651 redisSortOperation *sop = ln->value;
4652 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4653 vector[j].obj);
4654
4655 if (sop->type == REDIS_SORT_GET) {
4656 if (!val || val->type != REDIS_STRING) {
4657 listAddNodeTail(listPtr,createStringObject("",0));
4658 } else {
4659 listAddNodeTail(listPtr,val);
4660 incrRefCount(val);
4661 }
4662 } else {
4663 assert(sop->type == REDIS_SORT_GET); /* always fails */
4664 }
4665 }
4666 }
4667 dictReplace(c->db->dict,storekey,listObject);
4668 /* Note: we add 1 because the DB is dirty anyway since even if the
4669 * SORT result is empty a new key is set and maybe the old content
4670 * replaced. */
4671 server.dirty += 1+outputlen;
4672 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
4673 }
4674
4675 /* Cleanup */
4676 decrRefCount(sortval);
4677 listRelease(operations);
4678 for (j = 0; j < vectorlen; j++) {
4679 if (sortby && alpha && vector[j].u.cmpobj)
4680 decrRefCount(vector[j].u.cmpobj);
4681 }
4682 zfree(vector);
4683 }
4684
4685 static void infoCommand(redisClient *c) {
4686 sds info;
4687 time_t uptime = time(NULL)-server.stat_starttime;
4688 int j;
4689
4690 info = sdscatprintf(sdsempty(),
4691 "redis_version:%s\r\n"
4692 "arch_bits:%s\r\n"
4693 "uptime_in_seconds:%d\r\n"
4694 "uptime_in_days:%d\r\n"
4695 "connected_clients:%d\r\n"
4696 "connected_slaves:%d\r\n"
4697 "used_memory:%zu\r\n"
4698 "changes_since_last_save:%lld\r\n"
4699 "bgsave_in_progress:%d\r\n"
4700 "last_save_time:%d\r\n"
4701 "total_connections_received:%lld\r\n"
4702 "total_commands_processed:%lld\r\n"
4703 "role:%s\r\n"
4704 ,REDIS_VERSION,
4705 (sizeof(long) == 8) ? "64" : "32",
4706 uptime,
4707 uptime/(3600*24),
4708 listLength(server.clients)-listLength(server.slaves),
4709 listLength(server.slaves),
4710 server.usedmemory,
4711 server.dirty,
4712 server.bgsaveinprogress,
4713 server.lastsave,
4714 server.stat_numconnections,
4715 server.stat_numcommands,
4716 server.masterhost == NULL ? "master" : "slave"
4717 );
4718 if (server.masterhost) {
4719 info = sdscatprintf(info,
4720 "master_host:%s\r\n"
4721 "master_port:%d\r\n"
4722 "master_link_status:%s\r\n"
4723 "master_last_io_seconds_ago:%d\r\n"
4724 ,server.masterhost,
4725 server.masterport,
4726 (server.replstate == REDIS_REPL_CONNECTED) ?
4727 "up" : "down",
4728 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4729 );
4730 }
4731 for (j = 0; j < server.dbnum; j++) {
4732 long long keys, vkeys;
4733
4734 keys = dictSize(server.db[j].dict);
4735 vkeys = dictSize(server.db[j].expires);
4736 if (keys || vkeys) {
4737 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4738 j, keys, vkeys);
4739 }
4740 }
4741 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4742 addReplySds(c,info);
4743 addReply(c,shared.crlf);
4744 }
4745
4746 static void monitorCommand(redisClient *c) {
4747 /* ignore MONITOR if aleady slave or in monitor mode */
4748 if (c->flags & REDIS_SLAVE) return;
4749
4750 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4751 c->slaveseldb = 0;
4752 listAddNodeTail(server.monitors,c);
4753 addReply(c,shared.ok);
4754 }
4755
4756 /* ================================= Expire ================================= */
4757 static int removeExpire(redisDb *db, robj *key) {
4758 if (dictDelete(db->expires,key) == DICT_OK) {
4759 return 1;
4760 } else {
4761 return 0;
4762 }
4763 }
4764
4765 static int setExpire(redisDb *db, robj *key, time_t when) {
4766 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4767 return 0;
4768 } else {
4769 incrRefCount(key);
4770 return 1;
4771 }
4772 }
4773
4774 /* Return the expire time of the specified key, or -1 if no expire
4775 * is associated with this key (i.e. the key is non volatile) */
4776 static time_t getExpire(redisDb *db, robj *key) {
4777 dictEntry *de;
4778
4779 /* No expire? return ASAP */
4780 if (dictSize(db->expires) == 0 ||
4781 (de = dictFind(db->expires,key)) == NULL) return -1;
4782
4783 return (time_t) dictGetEntryVal(de);
4784 }
4785
4786 static int expireIfNeeded(redisDb *db, robj *key) {
4787 time_t when;
4788 dictEntry *de;
4789
4790 /* No expire? return ASAP */
4791 if (dictSize(db->expires) == 0 ||
4792 (de = dictFind(db->expires,key)) == NULL) return 0;
4793
4794 /* Lookup the expire */
4795 when = (time_t) dictGetEntryVal(de);
4796 if (time(NULL) <= when) return 0;
4797
4798 /* Delete the key */
4799 dictDelete(db->expires,key);
4800 return dictDelete(db->dict,key) == DICT_OK;
4801 }
4802
4803 static int deleteIfVolatile(redisDb *db, robj *key) {
4804 dictEntry *de;
4805
4806 /* No expire? return ASAP */
4807 if (dictSize(db->expires) == 0 ||
4808 (de = dictFind(db->expires,key)) == NULL) return 0;
4809
4810 /* Delete the key */
4811 server.dirty++;
4812 dictDelete(db->expires,key);
4813 return dictDelete(db->dict,key) == DICT_OK;
4814 }
4815
4816 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4817 dictEntry *de;
4818
4819 de = dictFind(c->db->dict,key);
4820 if (de == NULL) {
4821 addReply(c,shared.czero);
4822 return;
4823 }
4824 if (seconds < 0) {
4825 if (deleteKey(c->db,key)) server.dirty++;
4826 addReply(c, shared.cone);
4827 return;
4828 } else {
4829 time_t when = time(NULL)+seconds;
4830 if (setExpire(c->db,key,when)) {
4831 addReply(c,shared.cone);
4832 server.dirty++;
4833 } else {
4834 addReply(c,shared.czero);
4835 }
4836 return;
4837 }
4838 }
4839
4840 static void expireCommand(redisClient *c) {
4841 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
4842 }
4843
4844 static void expireatCommand(redisClient *c) {
4845 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
4846 }
4847
4848 static void ttlCommand(redisClient *c) {
4849 time_t expire;
4850 int ttl = -1;
4851
4852 expire = getExpire(c->db,c->argv[1]);
4853 if (expire != -1) {
4854 ttl = (int) (expire-time(NULL));
4855 if (ttl < 0) ttl = -1;
4856 }
4857 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4858 }
4859
4860 static void msetGenericCommand(redisClient *c, int nx) {
4861 int j;
4862
4863 if ((c->argc % 2) == 0) {
4864 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4865 return;
4866 }
4867 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4868 * set nothing at all if at least one already key exists. */
4869 if (nx) {
4870 for (j = 1; j < c->argc; j += 2) {
4871 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4872 addReply(c, shared.czero);
4873 return;
4874 }
4875 }
4876 }
4877
4878 for (j = 1; j < c->argc; j += 2) {
4879 int retval;
4880
4881 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4882 if (retval == DICT_ERR) {
4883 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4884 incrRefCount(c->argv[j+1]);
4885 } else {
4886 incrRefCount(c->argv[j]);
4887 incrRefCount(c->argv[j+1]);
4888 }
4889 removeExpire(c->db,c->argv[j]);
4890 }
4891 server.dirty += (c->argc-1)/2;
4892 addReply(c, nx ? shared.cone : shared.ok);
4893 }
4894
4895 static void msetCommand(redisClient *c) {
4896 msetGenericCommand(c,0);
4897 }
4898
4899 static void msetnxCommand(redisClient *c) {
4900 msetGenericCommand(c,1);
4901 }
4902
4903 /* =============================== Replication ============================= */
4904
4905 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4906 ssize_t nwritten, ret = size;
4907 time_t start = time(NULL);
4908
4909 timeout++;
4910 while(size) {
4911 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4912 nwritten = write(fd,ptr,size);
4913 if (nwritten == -1) return -1;
4914 ptr += nwritten;
4915 size -= nwritten;
4916 }
4917 if ((time(NULL)-start) > timeout) {
4918 errno = ETIMEDOUT;
4919 return -1;
4920 }
4921 }
4922 return ret;
4923 }
4924
4925 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4926 ssize_t nread, totread = 0;
4927 time_t start = time(NULL);
4928
4929 timeout++;
4930 while(size) {
4931 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4932 nread = read(fd,ptr,size);
4933 if (nread == -1) return -1;
4934 ptr += nread;
4935 size -= nread;
4936 totread += nread;
4937 }
4938 if ((time(NULL)-start) > timeout) {
4939 errno = ETIMEDOUT;
4940 return -1;
4941 }
4942 }
4943 return totread;
4944 }
4945
/* Read a line (terminated by '\n') from 'fd' into the buffer 'ptr' of
 * 'size' bytes, one byte at a time via syncRead().
 *
 * The buffer is always null terminated; the newline is not stored and a
 * trailing '\r' is overwritten with a null byte. At most size-1 bytes are
 * stored: when the line is longer the function returns what was read so
 * far. Returns the number of bytes stored before the newline (the '\r',
 * if any, is counted), or -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* room for the null term */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One byte of the buffer consumed: without this the loop never
         * stops on long lines and writes past the end of the buffer. */
        size--;
    }
    return nread;
}
4966
4967 static void syncCommand(redisClient *c) {
4968 /* ignore SYNC if aleady slave or in monitor mode */
4969 if (c->flags & REDIS_SLAVE) return;
4970
4971 /* SYNC can't be issued when the server has pending data to send to
4972 * the client about already issued commands. We need a fresh reply
4973 * buffer registering the differences between the BGSAVE and the current
4974 * dataset, so that we can copy to other slaves if needed. */
4975 if (listLength(c->reply) != 0) {
4976 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4977 return;
4978 }
4979
4980 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4981 /* Here we need to check if there is a background saving operation
4982 * in progress, or if it is required to start one */
4983 if (server.bgsaveinprogress) {
4984 /* Ok a background save is in progress. Let's check if it is a good
4985 * one for replication, i.e. if there is another slave that is
4986 * registering differences since the server forked to save */
4987 redisClient *slave;
4988 listNode *ln;
4989
4990 listRewind(server.slaves);
4991 while((ln = listYield(server.slaves))) {
4992 slave = ln->value;
4993 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4994 }
4995 if (ln) {
4996 /* Perfect, the server is already registering differences for
4997 * another slave. Set the right state, and copy the buffer. */
4998 listRelease(c->reply);
4999 c->reply = listDup(slave->reply);
5000 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5001 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5002 } else {
5003 /* No way, we need to wait for the next BGSAVE in order to
5004 * register differences */
5005 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5006 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5007 }
5008 } else {
5009 /* Ok we don't have a BGSAVE in progress, let's start one */
5010 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5011 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5012 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5013 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5014 return;
5015 }
5016 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5017 }
5018 c->repldbfd = -1;
5019 c->flags |= REDIS_SLAVE;
5020 c->slaveseldb = 0;
5021 listAddNodeTail(server.slaves,c);
5022 return;
5023 }
5024
5025 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5026 redisClient *slave = privdata;
5027 REDIS_NOTUSED(el);
5028 REDIS_NOTUSED(mask);
5029 char buf[REDIS_IOBUF_LEN];
5030 ssize_t nwritten, buflen;
5031
5032 if (slave->repldboff == 0) {
5033 /* Write the bulk write count before to transfer the DB. In theory here
5034 * we don't know how much room there is in the output buffer of the
5035 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5036 * operations) will never be smaller than the few bytes we need. */
5037 sds bulkcount;
5038
5039 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5040 slave->repldbsize);
5041 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5042 {
5043 sdsfree(bulkcount);
5044 freeClient(slave);
5045 return;
5046 }
5047 sdsfree(bulkcount);
5048 }
5049 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5050 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5051 if (buflen <= 0) {
5052 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5053 (buflen == 0) ? "premature EOF" : strerror(errno));
5054 freeClient(slave);
5055 return;
5056 }
5057 if ((nwritten = write(fd,buf,buflen)) == -1) {
5058 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5059 strerror(errno));
5060 freeClient(slave);
5061 return;
5062 }
5063 slave->repldboff += nwritten;
5064 if (slave->repldboff == slave->repldbsize) {
5065 close(slave->repldbfd);
5066 slave->repldbfd = -1;
5067 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5068 slave->replstate = REDIS_REPL_ONLINE;
5069 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5070 sendReplyToClient, slave, NULL) == AE_ERR) {
5071 freeClient(slave);
5072 return;
5073 }
5074 addReplySds(slave,sdsempty());
5075 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5076 }
5077 }
5078
5079 /* This function is called at the end of every backgrond saving.
5080 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5081 * otherwise REDIS_ERR is passed to the function.
5082 *
5083 * The goal of this function is to handle slaves waiting for a successful
5084 * background saving in order to perform non-blocking synchronization. */
5085 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5086 listNode *ln;
5087 int startbgsave = 0;
5088
5089 listRewind(server.slaves);
5090 while((ln = listYield(server.slaves))) {
5091 redisClient *slave = ln->value;
5092
5093 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5094 startbgsave = 1;
5095 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5096 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5097 struct redis_stat buf;
5098
5099 if (bgsaveerr != REDIS_OK) {
5100 freeClient(slave);
5101 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5102 continue;
5103 }
5104 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5105 redis_fstat(slave->repldbfd,&buf) == -1) {
5106 freeClient(slave);
5107 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5108 continue;
5109 }
5110 slave->repldboff = 0;
5111 slave->repldbsize = buf.st_size;
5112 slave->replstate = REDIS_REPL_SEND_BULK;
5113 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5114 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5115 freeClient(slave);
5116 continue;
5117 }
5118 }
5119 }
5120 if (startbgsave) {
5121 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5122 listRewind(server.slaves);
5123 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5124 while((ln = listYield(server.slaves))) {
5125 redisClient *slave = ln->value;
5126
5127 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5128 freeClient(slave);
5129 }
5130 }
5131 }
5132 }
5133
5134 static int syncWithMaster(void) {
5135 char buf[1024], tmpfile[256], authcmd[1024];
5136 int dumpsize;
5137 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5138 int dfd;
5139
5140 if (fd == -1) {
5141 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5142 strerror(errno));
5143 return REDIS_ERR;
5144 }
5145
5146 /* AUTH with the master if required. */
5147 if(server.masterauth) {
5148 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5149 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5150 close(fd);
5151 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5152 strerror(errno));
5153 return REDIS_ERR;
5154 }
5155 /* Read the AUTH result. */
5156 if (syncReadLine(fd,buf,1024,3600) == -1) {
5157 close(fd);
5158 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5159 strerror(errno));
5160 return REDIS_ERR;
5161 }
5162 if (buf[0] != '+') {
5163 close(fd);
5164 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5165 return REDIS_ERR;
5166 }
5167 }
5168
5169 /* Issue the SYNC command */
5170 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5171 close(fd);
5172 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5173 strerror(errno));
5174 return REDIS_ERR;
5175 }
5176 /* Read the bulk write count */
5177 if (syncReadLine(fd,buf,1024,3600) == -1) {
5178 close(fd);
5179 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5180 strerror(errno));
5181 return REDIS_ERR;
5182 }
5183 if (buf[0] != '$') {
5184 close(fd);
5185 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5186 return REDIS_ERR;
5187 }
5188 dumpsize = atoi(buf+1);
5189 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5190 /* Read the bulk write data on a temp file */
5191 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5192 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5193 if (dfd == -1) {
5194 close(fd);
5195 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5196 return REDIS_ERR;
5197 }
5198 while(dumpsize) {
5199 int nread, nwritten;
5200
5201 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5202 if (nread == -1) {
5203 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5204 strerror(errno));
5205 close(fd);
5206 close(dfd);
5207 return REDIS_ERR;
5208 }
5209 nwritten = write(dfd,buf,nread);
5210 if (nwritten == -1) {
5211 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5212 close(fd);
5213 close(dfd);
5214 return REDIS_ERR;
5215 }
5216 dumpsize -= nread;
5217 }
5218 close(dfd);
5219 if (rename(tmpfile,server.dbfilename) == -1) {
5220 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5221 unlink(tmpfile);
5222 close(fd);
5223 return REDIS_ERR;
5224 }
5225 emptyDb();
5226 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5227 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5228 close(fd);
5229 return REDIS_ERR;
5230 }
5231 server.master = createClient(fd);
5232 server.master->flags |= REDIS_MASTER;
5233 server.replstate = REDIS_REPL_CONNECTED;
5234 return REDIS_OK;
5235 }
5236
5237 static void slaveofCommand(redisClient *c) {
5238 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5239 !strcasecmp(c->argv[2]->ptr,"one")) {
5240 if (server.masterhost) {
5241 sdsfree(server.masterhost);
5242 server.masterhost = NULL;
5243 if (server.master) freeClient(server.master);
5244 server.replstate = REDIS_REPL_NONE;
5245 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5246 }
5247 } else {
5248 sdsfree(server.masterhost);
5249 server.masterhost = sdsdup(c->argv[1]->ptr);
5250 server.masterport = atoi(c->argv[2]->ptr);
5251 if (server.master) freeClient(server.master);
5252 server.replstate = REDIS_REPL_CONNECT;
5253 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5254 server.masterhost, server.masterport);
5255 }
5256 addReply(c,shared.ok);
5257 }
5258
5259 /* ============================ Maxmemory directive ======================== */
5260
5261 /* This function gets called when 'maxmemory' is set on the config file to limit
5262 * the max memory used by the server, and we are out of memory.
5263 * This function will try to, in order:
5264 *
5265 * - Free objects from the free list
5266 * - Try to remove keys with an EXPIRE set
5267 *
5268 * It is not possible to free enough memory to reach used-memory < maxmemory
5269 * the server will start refusing commands that will enlarge even more the
5270 * memory usage.
5271 */
5272 static void freeMemoryIfNeeded(void) {
5273 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5274 if (listLength(server.objfreelist)) {
5275 robj *o;
5276
5277 listNode *head = listFirst(server.objfreelist);
5278 o = listNodeValue(head);
5279 listDelNode(server.objfreelist,head);
5280 zfree(o);
5281 } else {
5282 int j, k, freed = 0;
5283
5284 for (j = 0; j < server.dbnum; j++) {
5285 int minttl = -1;
5286 robj *minkey = NULL;
5287 struct dictEntry *de;
5288
5289 if (dictSize(server.db[j].expires)) {
5290 freed = 1;
5291 /* From a sample of three keys drop the one nearest to
5292 * the natural expire */
5293 for (k = 0; k < 3; k++) {
5294 time_t t;
5295
5296 de = dictGetRandomKey(server.db[j].expires);
5297 t = (time_t) dictGetEntryVal(de);
5298 if (minttl == -1 || t < minttl) {
5299 minkey = dictGetEntryKey(de);
5300 minttl = t;
5301 }
5302 }
5303 deleteKey(server.db+j,minkey);
5304 }
5305 }
5306 if (!freed) return; /* nothing to free... */
5307 }
5308 }
5309 }
5310
5311 /* ============================== Append Only file ========================== */
5312
5313 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5314 sds buf = sdsempty();
5315 int j;
5316 ssize_t nwritten;
5317 time_t now;
5318 robj *tmpargv[3];
5319
5320 /* The DB this command was targetting is not the same as the last command
5321 * we appendend. To issue a SELECT command is needed. */
5322 if (dictid != server.appendseldb) {
5323 char seldb[64];
5324
5325 snprintf(seldb,sizeof(seldb),"%d",dictid);
5326 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5327 strlen(seldb),seldb);
5328 server.appendseldb = dictid;
5329 }
5330
5331 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5332 * EXPIREs into EXPIREATs calls */
5333 if (cmd->proc == expireCommand) {
5334 long when;
5335
5336 tmpargv[0] = createStringObject("EXPIREAT",8);
5337 tmpargv[1] = argv[1];
5338 incrRefCount(argv[1]);
5339 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5340 tmpargv[2] = createObject(REDIS_STRING,
5341 sdscatprintf(sdsempty(),"%ld",when));
5342 argv = tmpargv;
5343 }
5344
5345 /* Append the actual command */
5346 buf = sdscatprintf(buf,"*%d\r\n",argc);
5347 for (j = 0; j < argc; j++) {
5348 robj *o = argv[j];
5349
5350 if (o->encoding != REDIS_ENCODING_RAW)
5351 o = getDecodedObject(o);
5352 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5353 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5354 buf = sdscatlen(buf,"\r\n",2);
5355 if (o != argv[j])
5356 decrRefCount(o);
5357 }
5358
5359 /* Free the objects from the modified argv for EXPIREAT */
5360 if (cmd->proc == expireCommand) {
5361 for (j = 0; j < 3; j++)
5362 decrRefCount(argv[j]);
5363 }
5364
5365 /* We want to perform a single write. This should be guaranteed atomic
5366 * at least if the filesystem we are writing is a real physical one.
5367 * While this will save us against the server being killed I don't think
5368 * there is much to do about the whole server stopping for power problems
5369 * or alike */
5370 nwritten = write(server.appendfd,buf,sdslen(buf));
5371 if (nwritten != (signed)sdslen(buf)) {
5372 /* Ooops, we are in troubles. The best thing to do for now is
5373 * to simply exit instead to give the illusion that everything is
5374 * working as expected. */
5375 if (nwritten == -1) {
5376 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5377 } else {
5378 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5379 }
5380 exit(1);
5381 }
5382 now = time(NULL);
5383 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5384 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5385 now-server.lastfsync > 1))
5386 {
5387 fsync(server.appendfd); /* Let's try to get this data on the disk */
5388 server.lastfsync = now;
5389 }
5390 }
5391
5392 /* In Redis commands are always executed in the context of a client, so in
5393 * order to load the append only file we need to create a fake client. */
5394 static struct redisClient *createFakeClient(void) {
5395 struct redisClient *c = zmalloc(sizeof(*c));
5396
5397 selectDb(c,0);
5398 c->fd = -1;
5399 c->querybuf = sdsempty();
5400 c->argc = 0;
5401 c->argv = NULL;
5402 c->flags = 0;
5403 /* We set the fake client as a slave waiting for the synchronization
5404 * so that Redis will not try to send replies to this client. */
5405 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5406 c->reply = listCreate();
5407 listSetFreeMethod(c->reply,decrRefCount);
5408 listSetDupMethod(c->reply,dupClientReplyValue);
5409 return c;
5410 }
5411
5412 static void freeFakeClient(struct redisClient *c) {
5413 sdsfree(c->querybuf);
5414 listRelease(c->reply);
5415 zfree(c);
5416 }
5417
5418 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5419 * error (the append only file is zero-length) REDIS_ERR is returned. On
5420 * fatal error an error message is logged and the program exists. */
5421 int loadAppendOnlyFile(char *filename) {
5422 struct redisClient *fakeClient;
5423 FILE *fp = fopen(filename,"r");
5424 struct redis_stat sb;
5425
5426 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5427 return REDIS_ERR;
5428
5429 if (fp == NULL) {
5430 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5431 exit(1);
5432 }
5433
5434 fakeClient = createFakeClient();
5435 while(1) {
5436 int argc, j;
5437 unsigned long len;
5438 robj **argv;
5439 char buf[128];
5440 sds argsds;
5441 struct redisCommand *cmd;
5442
5443 if (fgets(buf,sizeof(buf),fp) == NULL) {
5444 if (feof(fp))
5445 break;
5446 else
5447 goto readerr;
5448 }
5449 if (buf[0] != '*') goto fmterr;
5450 argc = atoi(buf+1);
5451 argv = zmalloc(sizeof(robj*)*argc);
5452 for (j = 0; j < argc; j++) {
5453 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5454 if (buf[0] != '$') goto fmterr;
5455 len = strtol(buf+1,NULL,10);
5456 argsds = sdsnewlen(NULL,len);
5457 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5458 argv[j] = createObject(REDIS_STRING,argsds);
5459 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5460 }
5461
5462 /* Command lookup */
5463 cmd = lookupCommand(argv[0]->ptr);
5464 if (!cmd) {
5465 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5466 exit(1);
5467 }
5468 /* Try object sharing and encoding */
5469 if (server.shareobjects) {
5470 int j;
5471 for(j = 1; j < argc; j++)
5472 argv[j] = tryObjectSharing(argv[j]);
5473 }
5474 if (cmd->flags & REDIS_CMD_BULK)
5475 tryObjectEncoding(argv[argc-1]);
5476 /* Run the command in the context of a fake client */
5477 fakeClient->argc = argc;
5478 fakeClient->argv = argv;
5479 cmd->proc(fakeClient);
5480 /* Discard the reply objects list from the fake client */
5481 while(listLength(fakeClient->reply))
5482 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5483 /* Clean up, ready for the next command */
5484 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5485 zfree(argv);
5486 }
5487 fclose(fp);
5488 freeFakeClient(fakeClient);
5489 return REDIS_OK;
5490
5491 readerr:
5492 if (feof(fp)) {
5493 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5494 } else {
5495 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5496 }
5497 exit(1);
5498 fmterr:
5499 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5500 exit(1);
5501 }
5502
5503 /* ================================= Debugging ============================== */
5504
5505 static void debugCommand(redisClient *c) {
5506 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5507 *((char*)-1) = 'x';
5508 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5509 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5510 robj *key, *val;
5511
5512 if (!de) {
5513 addReply(c,shared.nokeyerr);
5514 return;
5515 }
5516 key = dictGetEntryKey(de);
5517 val = dictGetEntryVal(de);
5518 addReplySds(c,sdscatprintf(sdsempty(),
5519 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5520 key, key->refcount, val, val->refcount, val->encoding));
5521 } else {
5522 addReplySds(c,sdsnew(
5523 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5524 }
5525 }
5526
5527 #ifdef HAVE_BACKTRACE
5528 static struct redisFunctionSym symsTable[] = {
5529 {"compareStringObjects", (unsigned long)compareStringObjects},
5530 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5531 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5532 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5533 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5534 {"freeStringObject", (unsigned long)freeStringObject},
5535 {"freeListObject", (unsigned long)freeListObject},
5536 {"freeSetObject", (unsigned long)freeSetObject},
5537 {"decrRefCount", (unsigned long)decrRefCount},
5538 {"createObject", (unsigned long)createObject},
5539 {"freeClient", (unsigned long)freeClient},
5540 {"rdbLoad", (unsigned long)rdbLoad},
5541 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5542 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5543 {"addReply", (unsigned long)addReply},
5544 {"addReplySds", (unsigned long)addReplySds},
5545 {"incrRefCount", (unsigned long)incrRefCount},
5546 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5547 {"createStringObject", (unsigned long)createStringObject},
5548 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5549 {"syncWithMaster", (unsigned long)syncWithMaster},
5550 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5551 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5552 {"getDecodedObject", (unsigned long)getDecodedObject},
5553 {"removeExpire", (unsigned long)removeExpire},
5554 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5555 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5556 {"deleteKey", (unsigned long)deleteKey},
5557 {"getExpire", (unsigned long)getExpire},
5558 {"setExpire", (unsigned long)setExpire},
5559 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5560 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5561 {"authCommand", (unsigned long)authCommand},
5562 {"pingCommand", (unsigned long)pingCommand},
5563 {"echoCommand", (unsigned long)echoCommand},
5564 {"setCommand", (unsigned long)setCommand},
5565 {"setnxCommand", (unsigned long)setnxCommand},
5566 {"getCommand", (unsigned long)getCommand},
5567 {"delCommand", (unsigned long)delCommand},
5568 {"existsCommand", (unsigned long)existsCommand},
5569 {"incrCommand", (unsigned long)incrCommand},
5570 {"decrCommand", (unsigned long)decrCommand},
5571 {"incrbyCommand", (unsigned long)incrbyCommand},
5572 {"decrbyCommand", (unsigned long)decrbyCommand},
5573 {"selectCommand", (unsigned long)selectCommand},
5574 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5575 {"keysCommand", (unsigned long)keysCommand},
5576 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5577 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5578 {"saveCommand", (unsigned long)saveCommand},
5579 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5580 {"shutdownCommand", (unsigned long)shutdownCommand},
5581 {"moveCommand", (unsigned long)moveCommand},
5582 {"renameCommand", (unsigned long)renameCommand},
5583 {"renamenxCommand", (unsigned long)renamenxCommand},
5584 {"lpushCommand", (unsigned long)lpushCommand},
5585 {"rpushCommand", (unsigned long)rpushCommand},
5586 {"lpopCommand", (unsigned long)lpopCommand},
5587 {"rpopCommand", (unsigned long)rpopCommand},
5588 {"llenCommand", (unsigned long)llenCommand},
5589 {"lindexCommand", (unsigned long)lindexCommand},
5590 {"lrangeCommand", (unsigned long)lrangeCommand},
5591 {"ltrimCommand", (unsigned long)ltrimCommand},
5592 {"typeCommand", (unsigned long)typeCommand},
5593 {"lsetCommand", (unsigned long)lsetCommand},
5594 {"saddCommand", (unsigned long)saddCommand},
5595 {"sremCommand", (unsigned long)sremCommand},
5596 {"smoveCommand", (unsigned long)smoveCommand},
5597 {"sismemberCommand", (unsigned long)sismemberCommand},
5598 {"scardCommand", (unsigned long)scardCommand},
5599 {"spopCommand", (unsigned long)spopCommand},
5600 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5601 {"sinterCommand", (unsigned long)sinterCommand},
5602 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5603 {"sunionCommand", (unsigned long)sunionCommand},
5604 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5605 {"sdiffCommand", (unsigned long)sdiffCommand},
5606 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5607 {"syncCommand", (unsigned long)syncCommand},
5608 {"flushdbCommand", (unsigned long)flushdbCommand},
5609 {"flushallCommand", (unsigned long)flushallCommand},
5610 {"sortCommand", (unsigned long)sortCommand},
5611 {"lremCommand", (unsigned long)lremCommand},
5612 {"infoCommand", (unsigned long)infoCommand},
5613 {"mgetCommand", (unsigned long)mgetCommand},
5614 {"monitorCommand", (unsigned long)monitorCommand},
5615 {"expireCommand", (unsigned long)expireCommand},
5616 {"expireatCommand", (unsigned long)expireatCommand},
5617 {"getsetCommand", (unsigned long)getsetCommand},
5618 {"ttlCommand", (unsigned long)ttlCommand},
5619 {"slaveofCommand", (unsigned long)slaveofCommand},
5620 {"debugCommand", (unsigned long)debugCommand},
5621 {"processCommand", (unsigned long)processCommand},
5622 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5623 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5624 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5625 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5626 {"msetCommand", (unsigned long)msetCommand},
5627 {"msetnxCommand", (unsigned long)msetnxCommand},
5628 {"zslCreateNode", (unsigned long)zslCreateNode},
5629 {"zslCreate", (unsigned long)zslCreate},
5630 {"zslFreeNode",(unsigned long)zslFreeNode},
5631 {"zslFree",(unsigned long)zslFree},
5632 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5633 {"zslInsert",(unsigned long)zslInsert},
5634 {"zslDelete",(unsigned long)zslDelete},
5635 {"createZsetObject",(unsigned long)createZsetObject},
5636 {"zaddCommand",(unsigned long)zaddCommand},
5637 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5638 {"zrangeCommand",(unsigned long)zrangeCommand},
5639 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5640 {"zremCommand",(unsigned long)zremCommand},
5641 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5642 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5643 {"feedAppendOnlyFile",(unsigned long)feedAppendOnlyFile},
5644 {NULL,0}
5645 };
5646
5647 /* This function try to convert a pointer into a function name. It's used in
5648 * oreder to provide a backtrace under segmentation fault that's able to
5649 * display functions declared as static (otherwise the backtrace is useless). */
5650 static char *findFuncName(void *pointer, unsigned long *offset){
5651 int i, ret = -1;
5652 unsigned long off, minoff = 0;
5653
5654 /* Try to match against the Symbol with the smallest offset */
5655 for (i=0; symsTable[i].pointer; i++) {
5656 unsigned long lp = (unsigned long) pointer;
5657
5658 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5659 off=lp-symsTable[i].pointer;
5660 if (ret < 0 || off < minoff) {
5661 minoff=off;
5662 ret=i;
5663 }
5664 }
5665 }
5666 if (ret == -1) return NULL;
5667 *offset = minoff;
5668 return symsTable[ret].name;
5669 }
5670
/* Return the instruction pointer stored in the signal context 'uc', reading
 * the platform specific field selected at compile time. NULL is returned on
 * platforms that none of the branches below covers.
 *
 * NOTE(review): the Linux x86 branch tests __X86_64__ in upper case, while
 * compilers normally define __x86_64__ - confirm whether this is intended. */
static void *getMcontextEip(ucontext_t *uc) {
    void *ip;

#if defined(__FreeBSD__)
    ip = (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    ip = (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    ip = (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    ip = (void*) uc->uc_mcontext->__ss.__rip;
  #else
    ip = (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
    ip = (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__ia64__) /* Linux IA64 */
    ip = (void*) uc->uc_mcontext.sc_ip;
#else
    ip = NULL;
#endif
    return ip;
}
5692
5693 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5694 void *trace[100];
5695 char **messages = NULL;
5696 int i, trace_size = 0;
5697 unsigned long offset=0;
5698 time_t uptime = time(NULL)-server.stat_starttime;
5699 ucontext_t *uc = (ucontext_t*) secret;
5700 REDIS_NOTUSED(info);
5701
5702 redisLog(REDIS_WARNING,
5703 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5704 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5705 "redis_version:%s; "
5706 "uptime_in_seconds:%d; "
5707 "connected_clients:%d; "
5708 "connected_slaves:%d; "
5709 "used_memory:%zu; "
5710 "changes_since_last_save:%lld; "
5711 "bgsave_in_progress:%d; "
5712 "last_save_time:%d; "
5713 "total_connections_received:%lld; "
5714 "total_commands_processed:%lld; "
5715 "role:%s;"
5716 ,REDIS_VERSION,
5717 uptime,
5718 listLength(server.clients)-listLength(server.slaves),
5719 listLength(server.slaves),
5720 server.usedmemory,
5721 server.dirty,
5722 server.bgsaveinprogress,
5723 server.lastsave,
5724 server.stat_numconnections,
5725 server.stat_numcommands,
5726 server.masterhost == NULL ? "master" : "slave"
5727 ));
5728
5729 trace_size = backtrace(trace, 100);
5730 /* overwrite sigaction with caller's address */
5731 if (getMcontextEip(uc) != NULL) {
5732 trace[1] = getMcontextEip(uc);
5733 }
5734 messages = backtrace_symbols(trace, trace_size);
5735
5736 for (i=1; i<trace_size; ++i) {
5737 char *fn = findFuncName(trace[i], &offset), *p;
5738
5739 p = strchr(messages[i],'+');
5740 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5741 redisLog(REDIS_WARNING,"%s", messages[i]);
5742 } else {
5743 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5744 }
5745 }
5746 free(messages);
5747 exit(0);
5748 }
5749
5750 static void setupSigSegvAction(void) {
5751 struct sigaction act;
5752
5753 sigemptyset (&act.sa_mask);
5754 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5755 * is used. Otherwise, sa_handler is used */
5756 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5757 act.sa_sigaction = segvHandler;
5758 sigaction (SIGSEGV, &act, NULL);
5759 sigaction (SIGBUS, &act, NULL);
5760 sigaction (SIGFPE, &act, NULL);
5761 sigaction (SIGILL, &act, NULL);
5762 sigaction (SIGBUS, &act, NULL);
5763 return;
5764 }
5765 #else /* HAVE_BACKTRACE */
/* Variant used when HAVE_BACKTRACE is not defined: no handler is installed. */
static void setupSigSegvAction(void) {
    return;
}
5768 #endif /* HAVE_BACKTRACE */
5769
5770 /* =================================== Main! ================================ */
5771
5772 #ifdef __linux__
/* Return the integer stored in /proc/sys/vm/overcommit_memory, or -1 if the
 * file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return value;
    if (fgets(line,64,procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
5786
5787 void linuxOvercommitMemoryWarning(void) {
5788 if (linuxOvercommitMemoryValue() == 0) {
5789 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5790 }
5791 }
5792 #endif /* __linux__ */
5793
5794 static void daemonize(void) {
5795 int fd;
5796 FILE *fp;
5797
5798 if (fork() != 0) exit(0); /* parent exits */
5799 setsid(); /* create a new session */
5800
5801 /* Every output goes to /dev/null. If Redis is daemonized but
5802 * the 'logfile' is set to 'stdout' in the configuration file
5803 * it will not log at all. */
5804 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5805 dup2(fd, STDIN_FILENO);
5806 dup2(fd, STDOUT_FILENO);
5807 dup2(fd, STDERR_FILENO);
5808 if (fd > STDERR_FILENO) close(fd);
5809 }
5810 /* Try to write the pid file */
5811 fp = fopen(server.pidfile,"w");
5812 if (fp) {
5813 fprintf(fp,"%d\n",getpid());
5814 fclose(fp);
5815 }
5816 }
5817
5818 int main(int argc, char **argv) {
5819 initServerConfig();
5820 if (argc == 2) {
5821 ResetServerSaveParams();
5822 loadServerConfig(argv[1]);
5823 } else if (argc > 2) {
5824 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5825 exit(1);
5826 } else {
5827 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5828 }
5829 initServer();
5830 if (server.daemonize) daemonize();
5831 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5832 #ifdef __linux__
5833 linuxOvercommitMemoryWarning();
5834 #endif
5835 if (server.appendonly) {
5836 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
5837 redisLog(REDIS_NOTICE,"DB loaded from append only file");
5838 } else {
5839 if (rdbLoad(server.dbfilename) == REDIS_OK)
5840 redisLog(REDIS_NOTICE,"DB loaded from disk");
5841 }
5842 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5843 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5844 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5845 aeMain(server.el);
5846 aeDeleteEventLoop(server.el);
5847 return 0;
5848 }