]> git.saurik.com Git - redis.git/blob - redis.c
2d196674f8f237e7041e6e9718dce29ea44f6421
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #if defined(__sun)
63 #include "solarisfixes.h"
64 #endif
65
66 #include "redis.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specifies the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */

/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() prints a message only when its level is greater
 * than or equal to server.verbosity. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2
194
195 /*================================= Data types ============================== */
196
/* A redis object: a typed, reference counted container for a value.
 * See the "Object types" defines (string / list / set / zset / hash). */
typedef struct redisObject {
    void *ptr;              /* the value itself; an sds string when the
                             * encoding is REDIS_ENCODING_RAW (the dict
                             * hash/compare callbacks rely on this) */
    unsigned char type;     /* presumably one of the REDIS_STRING ... REDIS_HASH
                             * defines - confirm against createObject() */
    unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2];
    int refcount;           /* presumably managed by incrRefCount() and
                             * decrRefCount() - bodies not shown here */
} robj;
205
/* One database of the server (server.db is an array of server.dbnum of these). */
typedef struct redisDb {
    dict *dict;     /* the keys of this DB (reported as "keys" by serverCron) */
    dict *expires;  /* volatile keys; serverCron reads each entry value as a
                     * time_t and deletes the key once that time is passed */
    int id;
} redisDb;
211
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
234
/* One automatic save rule: serverCron starts a background save when
 * server.dirty >= changes and more than 'seconds' seconds elapsed since
 * server.lastsave. */
struct saveparam {
    time_t seconds;
    int changes;
};
239
/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db; /* array of 'dbnum' databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function run */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of the last successful save */
    size_t usedmemory; /* Used memory in bytes, refreshed by serverCron()
                        * from zmalloc_used_memory() */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* minimum level a message needs to be logged by redisLog() */
    int glueoutputbuf;
    int maxidletime; /* seconds of inactivity before closeTimedoutClients()
                      * frees a client; 0 disables the check in serverCron() */
    int dbnum;
    int daemonize;
    int appendonly;
    int appendfsync;
    time_t lastfsync;
    int appendfd;
    int appendseldb;
    char *pidfile;
    int bgsaveinprogress;
    pid_t bgsavechildpid;
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile; /* NULL means redisLog() writes to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
297
/* Signature shared by every command implementation (the *Command functions). */
typedef void redisCommandProc(redisClient *c);

/* One entry of the command table (see cmdTable). */
struct redisCommand {
    char *name;             /* command name */
    redisCommandProc *proc; /* function implementing the command */
    int arity;              /* argument count; the table also holds negative
                             * values. NOTE(review): presumably -N means
                             * "at least N" - confirm in processCommand() */
    int flags;              /* bitwise OR of the REDIS_CMD_* flags */
};
305
/* A (name, address) pair describing a function.
 * NOTE(review): presumably used to resolve addresses to names when printing
 * a backtrace - confirm against the code using it. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
310
/* An element being sorted, together with the value it is sorted by: either a
 * numeric score or another object (only one member of the union is valid).
 * NOTE(review): which member is used presumably depends on the server.sort_*
 * parameters - confirm in the SORT implementation. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;
318
/* An operation attached to a sort, identified by 'type' (see the
 * REDIS_SORT_* defines) and parameterized by 'pattern'. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
323
/* ZSETs use a specialized version of Skiplists */

/* A skiplist node: an object with its score, an array of forward pointers
 * and a single backward pointer. */
typedef struct zskiplistNode {
    struct zskiplistNode **forward;
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;
332
/* The skiplist itself: end points, number of elements and current level
 * (levels are presumably bounded by ZSKIPLIST_MAXLEVEL - confirm in
 * zslInsert()). */
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;
338
/* A sorted set: a hash table paired with a skiplist.
 * NOTE(review): presumably the dict maps members to scores while the
 * skiplist keeps them ordered by score - confirm in the Z* commands. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
343
/* Our shared "common" objects: a single global instance, 'shared', holding
 * pointers to objects that are reused instead of being created every time
 * (replies, error messages, SELECT commands for the first ten DBs). */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
354
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
360
361 /*================================ Prototypes =============================== */
362
/* Forward declarations of file-local helpers that are used before the point
 * where they are defined. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(const robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
397
/* Command implementations. Every function has the redisCommandProc signature
 * and is referenced by name from the command table (cmdTable). */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
468
469 /*================================= Globals ================================= */
470
/* Global vars */
static struct redisServer server; /* server global state */

/* The command table. Each entry is {name, proc, arity, flags}, in the field
 * order of struct redisCommand; flags is an OR of the REDIS_CMD_* defines.
 * The table is terminated by an entry whose name is NULL.
 * NOTE(review): a negative arity presumably means "at least that many
 * arguments" - confirm in processCommand().
 * Note that "smembers" is served by sinterCommand with an arity of 2. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zrem",zremCommand,3,REDIS_CMD_BULK},
    {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
    {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
    {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
547 /*============================ Utility functions ============================ */
548
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Only bytes inside the given lengths are
 * examined, so neither buffer needs to be NUL terminated.
 *
 * Supported syntax:
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [abc]   one byte out of a set; [^abc] negates the set, a-z is a range
 *           and \x inside the set stands for the literal byte x. A set that
 *           is not closed by ']' extends up to the end of the pattern.
 *   \x      the literal byte x
 *
 * When 'nocase' is non zero, literal bytes and ranges are compared after
 * tolower(). Range bounds are compared as unsigned bytes.
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of '*', never looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try to match the rest of the pattern at every position. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes one byte, so it can't match an empty
             * string (not even a negated set). */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the common
                     * pattern++ after the switch lands on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    /* Escaped byte; a trailing lone backslash is handled
                     * as a literal by the last branch. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* Skip the backslash unless it is the last pattern byte, in
             * which case it is matched literally. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs one byte of input. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is over: only trailing '*' may still match. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
671
672 static void redisLog(int level, const char *fmt, ...) {
673 va_list ap;
674 FILE *fp;
675
676 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
677 if (!fp) return;
678
679 va_start(ap, fmt);
680 if (level >= server.verbosity) {
681 char *c = ".-*";
682 char buf[64];
683 time_t now;
684
685 now = time(NULL);
686 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
687 fprintf(fp,"%s %c ",buf,c[level]);
688 vfprintf(fp, fmt, ap);
689 fprintf(fp,"\n");
690 fflush(fp);
691 }
692 va_end(ap);
693
694 if (server.logfile) fclose(fp);
695 }
696
697 /*====================== Hash table type implementation ==================== */
698
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */

/* Dict destructor callback that releases 'val' with zfree().
 * 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    zfree(val);
}
708
709 static int sdsDictKeyCompare(void *privdata, const void *key1,
710 const void *key2)
711 {
712 int l1,l2;
713 DICT_NOTUSED(privdata);
714
715 l1 = sdslen((sds)key1);
716 l2 = sdslen((sds)key2);
717 if (l1 != l2) return 0;
718 return memcmp(key1, key2, l1) == 0;
719 }
720
/* Dict destructor callback for redis objects: hands 'val' to decrRefCount().
 * 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    decrRefCount(val);
}
727
728 static int dictObjKeyCompare(void *privdata, const void *key1,
729 const void *key2)
730 {
731 const robj *o1 = key1, *o2 = key2;
732 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
733 }
734
735 static unsigned int dictObjHash(const void *key) {
736 const robj *o = key;
737 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
738 }
739
740 static int dictEncObjKeyCompare(void *privdata, const void *key1,
741 const void *key2)
742 {
743 const robj *o1 = key1, *o2 = key2;
744
745 if (o1->encoding == REDIS_ENCODING_RAW &&
746 o2->encoding == REDIS_ENCODING_RAW)
747 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
748 else {
749 robj *dec1, *dec2;
750 int cmp;
751
752 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
753 getDecodedObject(o1) : (robj*)o1;
754 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
755 getDecodedObject(o2) : (robj*)o2;
756 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
757 if (dec1 != o1) decrRefCount(dec1);
758 if (dec2 != o2) decrRefCount(dec2);
759 return cmp;
760 }
761 }
762
763 static unsigned int dictEncObjHash(const void *key) {
764 const robj *o = key;
765
766 if (o->encoding == REDIS_ENCODING_RAW)
767 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
768 else {
769 robj *dec = getDecodedObject(o);
770 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
771 decrRefCount(dec);
772 return hash;
773 }
774 }
775
/* Dict type with redis objects as keys and no value handling: keys may be
 * encoded (they are decoded on the fly to hash/compare them) and are
 * released through decrRefCount() when removed. */
static dictType setDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};
784
/* Same key handling as setDictType, but values are released with zfree()
 * (see dictVanillaFree) when an entry is removed. */
static dictType zsetDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree /* val destructor */
};
793
/* Dict type with redis objects as both keys and values. Keys are hashed and
 * compared through their 'ptr' taken as an sds string (no decoding is done);
 * keys and values are both released through decrRefCount(). */
static dictType hashDictType = {
    dictObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor /* val destructor */
};
802
803 /* ========================= Random utility functions ======================= */
804
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings, it is not clear if it will be possible
 * to report this condition to the client since the networking layer itself
 * is based on heap allocation for send buffers, so we simply abort.
 * At least the code will be simpler to read...
 *
 * 'msg' is printed on stderr as a prefix of the ": Out of memory" message;
 * the function then sleeps for one second and calls abort(), so it never
 * returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);
    sleep(1);
    abort();
}
816
817 /* ====================== Redis server networking stuff ===================== */
818 static void closeTimedoutClients(void) {
819 redisClient *c;
820 listNode *ln;
821 time_t now = time(NULL);
822
823 listRewind(server.clients);
824 while ((ln = listYield(server.clients)) != NULL) {
825 c = listNodeValue(ln);
826 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
827 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
828 (now - c->lastinteraction > server.maxidletime)) {
829 redisLog(REDIS_DEBUG,"Closing idle client");
830 freeClient(c);
831 }
832 }
833 }
834
835 static int htNeedsResize(dict *dict) {
836 long long size, used;
837
838 size = dictSlots(dict);
839 used = dictSize(dict);
840 return (size && used && size > DICT_HT_INITIAL_SIZE &&
841 (used*100/size < REDIS_HT_MINFILL));
842 }
843
844 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
845 * we resize the hash table to save memory */
846 static void tryResizeHashTables(void) {
847 int j;
848
849 for (j = 0; j < server.dbnum; j++) {
850 if (htNeedsResize(server.db[j].dict)) {
851 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
852 dictResize(server.db[j].dict);
853 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
854 }
855 if (htNeedsResize(server.db[j].expires))
856 dictResize(server.db[j].expires);
857 }
858 }
859
860 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
861 int j, loops = server.cronloops++;
862 REDIS_NOTUSED(eventLoop);
863 REDIS_NOTUSED(id);
864 REDIS_NOTUSED(clientData);
865
866 /* Update the global state with the amount of used memory */
867 server.usedmemory = zmalloc_used_memory();
868
869 /* Show some info about non-empty databases */
870 for (j = 0; j < server.dbnum; j++) {
871 long long size, used, vkeys;
872
873 size = dictSlots(server.db[j].dict);
874 used = dictSize(server.db[j].dict);
875 vkeys = dictSize(server.db[j].expires);
876 if (!(loops % 5) && (used || vkeys)) {
877 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
878 /* dictPrintStats(server.dict); */
879 }
880 }
881
882 /* We don't want to resize the hash tables while a bacground saving
883 * is in progress: the saving child is created using fork() that is
884 * implemented with a copy-on-write semantic in most modern systems, so
885 * if we resize the HT while there is the saving child at work actually
886 * a lot of memory movements in the parent will cause a lot of pages
887 * copied. */
888 if (!server.bgsaveinprogress) tryResizeHashTables();
889
890 /* Show information about connected clients */
891 if (!(loops % 5)) {
892 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
893 listLength(server.clients)-listLength(server.slaves),
894 listLength(server.slaves),
895 server.usedmemory,
896 dictSize(server.sharingpool));
897 }
898
899 /* Close connections of timedout clients */
900 if (server.maxidletime && !(loops % 10))
901 closeTimedoutClients();
902
903 /* Check if a background saving in progress terminated */
904 if (server.bgsaveinprogress) {
905 int statloc;
906 if (wait4(-1,&statloc,WNOHANG,NULL)) {
907 int exitcode = WEXITSTATUS(statloc);
908 int bysignal = WIFSIGNALED(statloc);
909
910 if (!bysignal && exitcode == 0) {
911 redisLog(REDIS_NOTICE,
912 "Background saving terminated with success");
913 server.dirty = 0;
914 server.lastsave = time(NULL);
915 } else if (!bysignal && exitcode != 0) {
916 redisLog(REDIS_WARNING, "Background saving error");
917 } else {
918 redisLog(REDIS_WARNING,
919 "Background saving terminated by signal");
920 rdbRemoveTempFile(server.bgsavechildpid);
921 }
922 server.bgsaveinprogress = 0;
923 server.bgsavechildpid = -1;
924 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
925 }
926 } else {
927 /* If there is not a background saving in progress check if
928 * we have to save now */
929 time_t now = time(NULL);
930 for (j = 0; j < server.saveparamslen; j++) {
931 struct saveparam *sp = server.saveparams+j;
932
933 if (server.dirty >= sp->changes &&
934 now-server.lastsave > sp->seconds) {
935 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
936 sp->changes, sp->seconds);
937 rdbSaveBackground(server.dbfilename);
938 break;
939 }
940 }
941 }
942
943 /* Try to expire a few timed out keys. The algorithm used is adaptive and
944 * will use few CPU cycles if there are few expiring keys, otherwise
945 * it will get more aggressive to avoid that too much memory is used by
946 * keys that can be removed from the keyspace. */
947 for (j = 0; j < server.dbnum; j++) {
948 int expired;
949 redisDb *db = server.db+j;
950
951 /* Continue to expire if at the end of the cycle more than 25%
952 * of the keys were expired. */
953 do {
954 int num = dictSize(db->expires);
955 time_t now = time(NULL);
956
957 expired = 0;
958 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
959 num = REDIS_EXPIRELOOKUPS_PER_CRON;
960 while (num--) {
961 dictEntry *de;
962 time_t t;
963
964 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
965 t = (time_t) dictGetEntryVal(de);
966 if (now > t) {
967 deleteKey(db,dictGetEntryKey(de));
968 expired++;
969 }
970 }
971 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
972 }
973
974 /* Check if we should connect to a MASTER */
975 if (server.replstate == REDIS_REPL_CONNECT) {
976 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
977 if (syncWithMaster() == REDIS_OK) {
978 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
979 }
980 }
981 return 1000;
982 }
983
984 static void createSharedObjects(void) {
985 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
986 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
987 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
988 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
989 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
990 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
991 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
992 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
993 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
994 /* no such key */
995 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
996 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
997 "-ERR Operation against a key holding the wrong kind of value\r\n"));
998 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
999 "-ERR no such key\r\n"));
1000 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1001 "-ERR syntax error\r\n"));
1002 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1003 "-ERR source and destination objects are the same\r\n"));
1004 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1005 "-ERR index out of range\r\n"));
1006 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1007 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1008 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1009 shared.select0 = createStringObject("select 0\r\n",10);
1010 shared.select1 = createStringObject("select 1\r\n",10);
1011 shared.select2 = createStringObject("select 2\r\n",10);
1012 shared.select3 = createStringObject("select 3\r\n",10);
1013 shared.select4 = createStringObject("select 4\r\n",10);
1014 shared.select5 = createStringObject("select 5\r\n",10);
1015 shared.select6 = createStringObject("select 6\r\n",10);
1016 shared.select7 = createStringObject("select 7\r\n",10);
1017 shared.select8 = createStringObject("select 8\r\n",10);
1018 shared.select9 = createStringObject("select 9\r\n",10);
1019 }
1020
1021 static void appendServerSaveParams(time_t seconds, int changes) {
1022 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1023 server.saveparams[server.saveparamslen].seconds = seconds;
1024 server.saveparams[server.saveparamslen].changes = changes;
1025 server.saveparamslen++;
1026 }
1027
1028 static void ResetServerSaveParams() {
1029 zfree(server.saveparams);
1030 server.saveparams = NULL;
1031 server.saveparamslen = 0;
1032 }
1033
1034 static void initServerConfig() {
1035 server.dbnum = REDIS_DEFAULT_DBNUM;
1036 server.port = REDIS_SERVERPORT;
1037 server.verbosity = REDIS_DEBUG;
1038 server.maxidletime = REDIS_MAXIDLETIME;
1039 server.saveparams = NULL;
1040 server.logfile = NULL; /* NULL = log on standard output */
1041 server.bindaddr = NULL;
1042 server.glueoutputbuf = 1;
1043 server.daemonize = 0;
1044 server.appendonly = 0;
1045 server.appendfsync = APPENDFSYNC_ALWAYS;
1046 server.lastfsync = time(NULL);
1047 server.appendfd = -1;
1048 server.appendseldb = -1; /* Make sure the first time will not match */
1049 server.pidfile = "/var/run/redis.pid";
1050 server.dbfilename = "dump.rdb";
1051 server.appendfilename = "appendonly.log";
1052 server.requirepass = NULL;
1053 server.shareobjects = 0;
1054 server.sharingpoolsize = 1024;
1055 server.maxclients = 0;
1056 server.maxmemory = 0;
1057 ResetServerSaveParams();
1058
1059 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1060 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1061 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1062 /* Replication related */
1063 server.isslave = 0;
1064 server.masterauth = NULL;
1065 server.masterhost = NULL;
1066 server.masterport = 6379;
1067 server.master = NULL;
1068 server.replstate = REDIS_REPL_NONE;
1069
1070 /* Double constants initialization */
1071 R_Zero = 0.0;
1072 R_PosInf = 1.0/R_Zero;
1073 R_NegInf = -1.0/R_Zero;
1074 R_Nan = R_Zero/R_Zero;
1075 }
1076
1077 static void initServer() {
1078 int j;
1079
1080 signal(SIGHUP, SIG_IGN);
1081 signal(SIGPIPE, SIG_IGN);
1082 setupSigSegvAction();
1083
1084 server.clients = listCreate();
1085 server.slaves = listCreate();
1086 server.monitors = listCreate();
1087 server.objfreelist = listCreate();
1088 createSharedObjects();
1089 server.el = aeCreateEventLoop();
1090 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1091 server.sharingpool = dictCreate(&setDictType,NULL);
1092 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1093 if (server.fd == -1) {
1094 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1095 exit(1);
1096 }
1097 for (j = 0; j < server.dbnum; j++) {
1098 server.db[j].dict = dictCreate(&hashDictType,NULL);
1099 server.db[j].expires = dictCreate(&setDictType,NULL);
1100 server.db[j].id = j;
1101 }
1102 server.cronloops = 0;
1103 server.bgsaveinprogress = 0;
1104 server.bgsavechildpid = -1;
1105 server.lastsave = time(NULL);
1106 server.dirty = 0;
1107 server.usedmemory = 0;
1108 server.stat_numcommands = 0;
1109 server.stat_numconnections = 0;
1110 server.stat_starttime = time(NULL);
1111 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1112
1113 if (server.appendonly) {
1114 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1115 if (server.appendfd == -1) {
1116 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1117 strerror(errno));
1118 exit(1);
1119 }
1120 }
1121 }
1122
1123 /* Empty the whole database */
1124 static long long emptyDb() {
1125 int j;
1126 long long removed = 0;
1127
1128 for (j = 0; j < server.dbnum; j++) {
1129 removed += dictSize(server.db[j].dict);
1130 dictEmpty(server.db[j].dict);
1131 dictEmpty(server.db[j].expires);
1132 }
1133 return removed;
1134 }
1135
/* Convert a case-insensitive "yes"/"no" string to 1/0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    int result = -1;

    if (strcasecmp(s,"yes") == 0) {
        result = 1;
    } else if (strcasecmp(s,"no") == 0) {
        result = 0;
    }
    return result;
}
1141
1142 /* I agree, this is a very rudimental way to load a configuration...
1143 will improve later if the config gets more complex */
1144 static void loadServerConfig(char *filename) {
1145 FILE *fp;
1146 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1147 int linenum = 0;
1148 sds line = NULL;
1149
1150 if (filename[0] == '-' && filename[1] == '\0')
1151 fp = stdin;
1152 else {
1153 if ((fp = fopen(filename,"r")) == NULL) {
1154 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1155 exit(1);
1156 }
1157 }
1158
1159 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1160 sds *argv;
1161 int argc, j;
1162
1163 linenum++;
1164 line = sdsnew(buf);
1165 line = sdstrim(line," \t\r\n");
1166
1167 /* Skip comments and blank lines*/
1168 if (line[0] == '#' || line[0] == '\0') {
1169 sdsfree(line);
1170 continue;
1171 }
1172
1173 /* Split into arguments */
1174 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1175 sdstolower(argv[0]);
1176
1177 /* Execute config directives */
1178 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1179 server.maxidletime = atoi(argv[1]);
1180 if (server.maxidletime < 0) {
1181 err = "Invalid timeout value"; goto loaderr;
1182 }
1183 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1184 server.port = atoi(argv[1]);
1185 if (server.port < 1 || server.port > 65535) {
1186 err = "Invalid port"; goto loaderr;
1187 }
1188 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1189 server.bindaddr = zstrdup(argv[1]);
1190 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1191 int seconds = atoi(argv[1]);
1192 int changes = atoi(argv[2]);
1193 if (seconds < 1 || changes < 0) {
1194 err = "Invalid save parameters"; goto loaderr;
1195 }
1196 appendServerSaveParams(seconds,changes);
1197 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1198 if (chdir(argv[1]) == -1) {
1199 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1200 argv[1], strerror(errno));
1201 exit(1);
1202 }
1203 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1204 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1205 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1206 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1207 else {
1208 err = "Invalid log level. Must be one of debug, notice, warning";
1209 goto loaderr;
1210 }
1211 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1212 FILE *logfp;
1213
1214 server.logfile = zstrdup(argv[1]);
1215 if (!strcasecmp(server.logfile,"stdout")) {
1216 zfree(server.logfile);
1217 server.logfile = NULL;
1218 }
1219 if (server.logfile) {
1220 /* Test if we are able to open the file. The server will not
1221 * be able to abort just for this problem later... */
1222 logfp = fopen(server.logfile,"a");
1223 if (logfp == NULL) {
1224 err = sdscatprintf(sdsempty(),
1225 "Can't open the log file: %s", strerror(errno));
1226 goto loaderr;
1227 }
1228 fclose(logfp);
1229 }
1230 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1231 server.dbnum = atoi(argv[1]);
1232 if (server.dbnum < 1) {
1233 err = "Invalid number of databases"; goto loaderr;
1234 }
1235 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1236 server.maxclients = atoi(argv[1]);
1237 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1238 server.maxmemory = strtoll(argv[1], NULL, 10);
1239 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1240 server.masterhost = sdsnew(argv[1]);
1241 server.masterport = atoi(argv[2]);
1242 server.replstate = REDIS_REPL_CONNECT;
1243 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1244 server.masterauth = zstrdup(argv[1]);
1245 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1246 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1247 err = "argument must be 'yes' or 'no'"; goto loaderr;
1248 }
1249 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1250 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1251 err = "argument must be 'yes' or 'no'"; goto loaderr;
1252 }
1253 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1254 server.sharingpoolsize = atoi(argv[1]);
1255 if (server.sharingpoolsize < 1) {
1256 err = "invalid object sharing pool size"; goto loaderr;
1257 }
1258 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1259 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1260 err = "argument must be 'yes' or 'no'"; goto loaderr;
1261 }
1262 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1263 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1264 err = "argument must be 'yes' or 'no'"; goto loaderr;
1265 }
1266 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1267 if (!strcasecmp(argv[1],"no")) {
1268 server.appendfsync = APPENDFSYNC_NO;
1269 } else if (!strcasecmp(argv[1],"always")) {
1270 server.appendfsync = APPENDFSYNC_ALWAYS;
1271 } else if (!strcasecmp(argv[1],"everysec")) {
1272 server.appendfsync = APPENDFSYNC_EVERYSEC;
1273 } else {
1274 err = "argument must be 'no', 'always' or 'everysec'";
1275 goto loaderr;
1276 }
1277 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1278 server.requirepass = zstrdup(argv[1]);
1279 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1280 server.pidfile = zstrdup(argv[1]);
1281 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1282 server.dbfilename = zstrdup(argv[1]);
1283 } else {
1284 err = "Bad directive or wrong number of arguments"; goto loaderr;
1285 }
1286 for (j = 0; j < argc; j++)
1287 sdsfree(argv[j]);
1288 zfree(argv);
1289 sdsfree(line);
1290 }
1291 if (fp != stdin) fclose(fp);
1292 return;
1293
1294 loaderr:
1295 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1296 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1297 fprintf(stderr, ">>> '%s'\n", line);
1298 fprintf(stderr, "%s\n", err);
1299 exit(1);
1300 }
1301
1302 static void freeClientArgv(redisClient *c) {
1303 int j;
1304
1305 for (j = 0; j < c->argc; j++)
1306 decrRefCount(c->argv[j]);
1307 for (j = 0; j < c->mbargc; j++)
1308 decrRefCount(c->mbargv[j]);
1309 c->argc = 0;
1310 c->mbargc = 0;
1311 }
1312
1313 static void freeClient(redisClient *c) {
1314 listNode *ln;
1315
1316 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1317 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1318 sdsfree(c->querybuf);
1319 listRelease(c->reply);
1320 freeClientArgv(c);
1321 close(c->fd);
1322 ln = listSearchKey(server.clients,c);
1323 assert(ln != NULL);
1324 listDelNode(server.clients,ln);
1325 if (c->flags & REDIS_SLAVE) {
1326 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1327 close(c->repldbfd);
1328 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1329 ln = listSearchKey(l,c);
1330 assert(ln != NULL);
1331 listDelNode(l,ln);
1332 }
1333 if (c->flags & REDIS_MASTER) {
1334 server.master = NULL;
1335 server.replstate = REDIS_REPL_CONNECT;
1336 }
1337 zfree(c->argv);
1338 zfree(c->mbargv);
1339 zfree(c);
1340 }
1341
1342 static void glueReplyBuffersIfNeeded(redisClient *c) {
1343 int totlen = 0;
1344 listNode *ln;
1345 robj *o;
1346
1347 listRewind(c->reply);
1348 while((ln = listYield(c->reply))) {
1349 o = ln->value;
1350 totlen += sdslen(o->ptr);
1351 /* This optimization makes more sense if we don't have to copy
1352 * too much data */
1353 if (totlen > 1024) return;
1354 }
1355 if (totlen > 0) {
1356 char buf[1024];
1357 int copylen = 0;
1358
1359 listRewind(c->reply);
1360 while((ln = listYield(c->reply))) {
1361 o = ln->value;
1362 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1363 copylen += sdslen(o->ptr);
1364 listDelNode(c->reply,ln);
1365 }
1366 /* Now the output buffer is empty, add the new single element */
1367 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1368 listAddNodeTail(c->reply,o);
1369 }
1370 }
1371
1372 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1373 redisClient *c = privdata;
1374 int nwritten = 0, totwritten = 0, objlen;
1375 robj *o;
1376 REDIS_NOTUSED(el);
1377 REDIS_NOTUSED(mask);
1378
1379 if (server.glueoutputbuf && listLength(c->reply) > 1)
1380 glueReplyBuffersIfNeeded(c);
1381 while(listLength(c->reply)) {
1382 o = listNodeValue(listFirst(c->reply));
1383 objlen = sdslen(o->ptr);
1384
1385 if (objlen == 0) {
1386 listDelNode(c->reply,listFirst(c->reply));
1387 continue;
1388 }
1389
1390 if (c->flags & REDIS_MASTER) {
1391 /* Don't reply to a master */
1392 nwritten = objlen - c->sentlen;
1393 } else {
1394 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1395 if (nwritten <= 0) break;
1396 }
1397 c->sentlen += nwritten;
1398 totwritten += nwritten;
1399 /* If we fully sent the object on head go to the next one */
1400 if (c->sentlen == objlen) {
1401 listDelNode(c->reply,listFirst(c->reply));
1402 c->sentlen = 0;
1403 }
1404 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1405 * bytes, in a single threaded server it's a good idea to server
1406 * other clients as well, even if a very large request comes from
1407 * super fast link that is always able to accept data (in real world
1408 * terms think to 'KEYS *' against the loopback interfae) */
1409 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1410 }
1411 if (nwritten == -1) {
1412 if (errno == EAGAIN) {
1413 nwritten = 0;
1414 } else {
1415 redisLog(REDIS_DEBUG,
1416 "Error writing to client: %s", strerror(errno));
1417 freeClient(c);
1418 return;
1419 }
1420 }
1421 if (totwritten > 0) c->lastinteraction = time(NULL);
1422 if (listLength(c->reply) == 0) {
1423 c->sentlen = 0;
1424 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1425 }
1426 }
1427
1428 static struct redisCommand *lookupCommand(char *name) {
1429 int j = 0;
1430 while(cmdTable[j].name != NULL) {
1431 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1432 j++;
1433 }
1434 return NULL;
1435 }
1436
1437 /* resetClient prepare the client to process the next command */
1438 static void resetClient(redisClient *c) {
1439 freeClientArgv(c);
1440 c->bulklen = -1;
1441 c->multibulk = 0;
1442 }
1443
1444 /* If this function gets called we already read a whole
1445 * command, argments are in the client argv/argc fields.
1446 * processCommand() execute the command or prepare the
1447 * server for a bulk read from the client.
1448 *
1449 * If 1 is returned the client is still alive and valid and
1450 * and other operations can be performed by the caller. Otherwise
1451 * if 0 is returned the client was destroied (i.e. after QUIT). */
1452 static int processCommand(redisClient *c) {
1453 struct redisCommand *cmd;
1454 long long dirty;
1455
1456 /* Free some memory if needed (maxmemory setting) */
1457 if (server.maxmemory) freeMemoryIfNeeded();
1458
1459 /* Handle the multi bulk command type. This is an alternative protocol
1460 * supported by Redis in order to receive commands that are composed of
1461 * multiple binary-safe "bulk" arguments. The latency of processing is
1462 * a bit higher but this allows things like multi-sets, so if this
1463 * protocol is used only for MSET and similar commands this is a big win. */
1464 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1465 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1466 if (c->multibulk <= 0) {
1467 resetClient(c);
1468 return 1;
1469 } else {
1470 decrRefCount(c->argv[c->argc-1]);
1471 c->argc--;
1472 return 1;
1473 }
1474 } else if (c->multibulk) {
1475 if (c->bulklen == -1) {
1476 if (((char*)c->argv[0]->ptr)[0] != '$') {
1477 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1478 resetClient(c);
1479 return 1;
1480 } else {
1481 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1482 decrRefCount(c->argv[0]);
1483 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1484 c->argc--;
1485 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1486 resetClient(c);
1487 return 1;
1488 }
1489 c->argc--;
1490 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1491 return 1;
1492 }
1493 } else {
1494 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1495 c->mbargv[c->mbargc] = c->argv[0];
1496 c->mbargc++;
1497 c->argc--;
1498 c->multibulk--;
1499 if (c->multibulk == 0) {
1500 robj **auxargv;
1501 int auxargc;
1502
1503 /* Here we need to swap the multi-bulk argc/argv with the
1504 * normal argc/argv of the client structure. */
1505 auxargv = c->argv;
1506 c->argv = c->mbargv;
1507 c->mbargv = auxargv;
1508
1509 auxargc = c->argc;
1510 c->argc = c->mbargc;
1511 c->mbargc = auxargc;
1512
1513 /* We need to set bulklen to something different than -1
1514 * in order for the code below to process the command without
1515 * to try to read the last argument of a bulk command as
1516 * a special argument. */
1517 c->bulklen = 0;
1518 /* continue below and process the command */
1519 } else {
1520 c->bulklen = -1;
1521 return 1;
1522 }
1523 }
1524 }
1525 /* -- end of multi bulk commands processing -- */
1526
1527 /* The QUIT command is handled as a special case. Normal command
1528 * procs are unable to close the client connection safely */
1529 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1530 freeClient(c);
1531 return 0;
1532 }
1533 cmd = lookupCommand(c->argv[0]->ptr);
1534 if (!cmd) {
1535 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1536 resetClient(c);
1537 return 1;
1538 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1539 (c->argc < -cmd->arity)) {
1540 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1541 resetClient(c);
1542 return 1;
1543 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1544 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1545 resetClient(c);
1546 return 1;
1547 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1548 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1549
1550 decrRefCount(c->argv[c->argc-1]);
1551 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1552 c->argc--;
1553 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1554 resetClient(c);
1555 return 1;
1556 }
1557 c->argc--;
1558 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1559 /* It is possible that the bulk read is already in the
1560 * buffer. Check this condition and handle it accordingly.
1561 * This is just a fast path, alternative to call processInputBuffer().
1562 * It's a good idea since the code is small and this condition
1563 * happens most of the times. */
1564 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1565 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1566 c->argc++;
1567 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1568 } else {
1569 return 1;
1570 }
1571 }
1572 /* Let's try to share objects on the command arguments vector */
1573 if (server.shareobjects) {
1574 int j;
1575 for(j = 1; j < c->argc; j++)
1576 c->argv[j] = tryObjectSharing(c->argv[j]);
1577 }
1578 /* Let's try to encode the bulk object to save space. */
1579 if (cmd->flags & REDIS_CMD_BULK)
1580 tryObjectEncoding(c->argv[c->argc-1]);
1581
1582 /* Check if the user is authenticated */
1583 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1584 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1585 resetClient(c);
1586 return 1;
1587 }
1588
1589 /* Exec the command */
1590 dirty = server.dirty;
1591 cmd->proc(c);
1592 if (server.appendonly && server.dirty-dirty)
1593 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1594 if (server.dirty-dirty && listLength(server.slaves))
1595 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1596 if (listLength(server.monitors))
1597 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1598 server.stat_numcommands++;
1599
1600 /* Prepare the client for the next command */
1601 if (c->flags & REDIS_CLOSE) {
1602 freeClient(c);
1603 return 0;
1604 }
1605 resetClient(c);
1606 return 1;
1607 }
1608
1609 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1610 listNode *ln;
1611 int outc = 0, j;
1612 robj **outv;
1613 /* (args*2)+1 is enough room for args, spaces, newlines */
1614 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1615
1616 if (argc <= REDIS_STATIC_ARGS) {
1617 outv = static_outv;
1618 } else {
1619 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1620 }
1621
1622 for (j = 0; j < argc; j++) {
1623 if (j != 0) outv[outc++] = shared.space;
1624 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1625 robj *lenobj;
1626
1627 lenobj = createObject(REDIS_STRING,
1628 sdscatprintf(sdsempty(),"%d\r\n",
1629 stringObjectLen(argv[j])));
1630 lenobj->refcount = 0;
1631 outv[outc++] = lenobj;
1632 }
1633 outv[outc++] = argv[j];
1634 }
1635 outv[outc++] = shared.crlf;
1636
1637 /* Increment all the refcounts at start and decrement at end in order to
1638 * be sure to free objects if there is no slave in a replication state
1639 * able to be feed with commands */
1640 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1641 listRewind(slaves);
1642 while((ln = listYield(slaves))) {
1643 redisClient *slave = ln->value;
1644
1645 /* Don't feed slaves that are still waiting for BGSAVE to start */
1646 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1647
1648 /* Feed all the other slaves, MONITORs and so on */
1649 if (slave->slaveseldb != dictid) {
1650 robj *selectcmd;
1651
1652 switch(dictid) {
1653 case 0: selectcmd = shared.select0; break;
1654 case 1: selectcmd = shared.select1; break;
1655 case 2: selectcmd = shared.select2; break;
1656 case 3: selectcmd = shared.select3; break;
1657 case 4: selectcmd = shared.select4; break;
1658 case 5: selectcmd = shared.select5; break;
1659 case 6: selectcmd = shared.select6; break;
1660 case 7: selectcmd = shared.select7; break;
1661 case 8: selectcmd = shared.select8; break;
1662 case 9: selectcmd = shared.select9; break;
1663 default:
1664 selectcmd = createObject(REDIS_STRING,
1665 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1666 selectcmd->refcount = 0;
1667 break;
1668 }
1669 addReply(slave,selectcmd);
1670 slave->slaveseldb = dictid;
1671 }
1672 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1673 }
1674 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1675 if (outv != static_outv) zfree(outv);
1676 }
1677
1678 static void processInputBuffer(redisClient *c) {
1679 again:
1680 if (c->bulklen == -1) {
1681 /* Read the first line of the query */
1682 char *p = strchr(c->querybuf,'\n');
1683 size_t querylen;
1684
1685 if (p) {
1686 sds query, *argv;
1687 int argc, j;
1688
1689 query = c->querybuf;
1690 c->querybuf = sdsempty();
1691 querylen = 1+(p-(query));
1692 if (sdslen(query) > querylen) {
1693 /* leave data after the first line of the query in the buffer */
1694 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1695 }
1696 *p = '\0'; /* remove "\n" */
1697 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1698 sdsupdatelen(query);
1699
1700 /* Now we can split the query in arguments */
1701 if (sdslen(query) == 0) {
1702 /* Ignore empty query */
1703 sdsfree(query);
1704 return;
1705 }
1706 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1707 sdsfree(query);
1708
1709 if (c->argv) zfree(c->argv);
1710 c->argv = zmalloc(sizeof(robj*)*argc);
1711
1712 for (j = 0; j < argc; j++) {
1713 if (sdslen(argv[j])) {
1714 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1715 c->argc++;
1716 } else {
1717 sdsfree(argv[j]);
1718 }
1719 }
1720 zfree(argv);
1721 /* Execute the command. If the client is still valid
1722 * after processCommand() return and there is something
1723 * on the query buffer try to process the next command. */
1724 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1725 return;
1726 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1727 redisLog(REDIS_DEBUG, "Client protocol error");
1728 freeClient(c);
1729 return;
1730 }
1731 } else {
1732 /* Bulk read handling. Note that if we are at this point
1733 the client already sent a command terminated with a newline,
1734 we are reading the bulk data that is actually the last
1735 argument of the command. */
1736 int qbl = sdslen(c->querybuf);
1737
1738 if (c->bulklen <= qbl) {
1739 /* Copy everything but the final CRLF as final argument */
1740 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1741 c->argc++;
1742 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1743 /* Process the command. If the client is still valid after
1744 * the processing and there is more data in the buffer
1745 * try to parse it. */
1746 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1747 return;
1748 }
1749 }
1750 }
1751
1752 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1753 redisClient *c = (redisClient*) privdata;
1754 char buf[REDIS_IOBUF_LEN];
1755 int nread;
1756 REDIS_NOTUSED(el);
1757 REDIS_NOTUSED(mask);
1758
1759 nread = read(fd, buf, REDIS_IOBUF_LEN);
1760 if (nread == -1) {
1761 if (errno == EAGAIN) {
1762 nread = 0;
1763 } else {
1764 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1765 freeClient(c);
1766 return;
1767 }
1768 } else if (nread == 0) {
1769 redisLog(REDIS_DEBUG, "Client closed connection");
1770 freeClient(c);
1771 return;
1772 }
1773 if (nread) {
1774 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1775 c->lastinteraction = time(NULL);
1776 } else {
1777 return;
1778 }
1779 processInputBuffer(c);
1780 }
1781
1782 static int selectDb(redisClient *c, int id) {
1783 if (id < 0 || id >= server.dbnum)
1784 return REDIS_ERR;
1785 c->db = &server.db[id];
1786 return REDIS_OK;
1787 }
1788
1789 static void *dupClientReplyValue(void *o) {
1790 incrRefCount((robj*)o);
1791 return 0;
1792 }
1793
1794 static redisClient *createClient(int fd) {
1795 redisClient *c = zmalloc(sizeof(*c));
1796
1797 anetNonBlock(NULL,fd);
1798 anetTcpNoDelay(NULL,fd);
1799 if (!c) return NULL;
1800 selectDb(c,0);
1801 c->fd = fd;
1802 c->querybuf = sdsempty();
1803 c->argc = 0;
1804 c->argv = NULL;
1805 c->bulklen = -1;
1806 c->multibulk = 0;
1807 c->mbargc = 0;
1808 c->mbargv = NULL;
1809 c->sentlen = 0;
1810 c->flags = 0;
1811 c->lastinteraction = time(NULL);
1812 c->authenticated = 0;
1813 c->replstate = REDIS_REPL_NONE;
1814 c->reply = listCreate();
1815 listSetFreeMethod(c->reply,decrRefCount);
1816 listSetDupMethod(c->reply,dupClientReplyValue);
1817 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1818 readQueryFromClient, c, NULL) == AE_ERR) {
1819 freeClient(c);
1820 return NULL;
1821 }
1822 listAddNodeTail(server.clients,c);
1823 return c;
1824 }
1825
1826 static void addReply(redisClient *c, robj *obj) {
1827 if (listLength(c->reply) == 0 &&
1828 (c->replstate == REDIS_REPL_NONE ||
1829 c->replstate == REDIS_REPL_ONLINE) &&
1830 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1831 sendReplyToClient, c, NULL) == AE_ERR) return;
1832 if (obj->encoding != REDIS_ENCODING_RAW) {
1833 obj = getDecodedObject(obj);
1834 } else {
1835 incrRefCount(obj);
1836 }
1837 listAddNodeTail(c->reply,obj);
1838 }
1839
1840 static void addReplySds(redisClient *c, sds s) {
1841 robj *o = createObject(REDIS_STRING,s);
1842 addReply(c,o);
1843 decrRefCount(o);
1844 }
1845
1846 static void addReplyBulkLen(redisClient *c, robj *obj) {
1847 size_t len;
1848
1849 if (obj->encoding == REDIS_ENCODING_RAW) {
1850 len = sdslen(obj->ptr);
1851 } else {
1852 long n = (long)obj->ptr;
1853
1854 len = 1;
1855 if (n < 0) {
1856 len++;
1857 n = -n;
1858 }
1859 while((n = n/10) != 0) {
1860 len++;
1861 }
1862 }
1863 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1864 }
1865
1866 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1867 int cport, cfd;
1868 char cip[128];
1869 redisClient *c;
1870 REDIS_NOTUSED(el);
1871 REDIS_NOTUSED(mask);
1872 REDIS_NOTUSED(privdata);
1873
1874 cfd = anetAccept(server.neterr, fd, cip, &cport);
1875 if (cfd == AE_ERR) {
1876 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1877 return;
1878 }
1879 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1880 if ((c = createClient(cfd)) == NULL) {
1881 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1882 close(cfd); /* May be already closed, just ingore errors */
1883 return;
1884 }
1885 /* If maxclient directive is set and this is one client more... close the
1886 * connection. Note that we create the client instead to check before
1887 * for this condition, since now the socket is already set in nonblocking
1888 * mode and we can send an error for free using the Kernel I/O */
1889 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1890 char *err = "-ERR max number of clients reached\r\n";
1891
1892 /* That's a best effort error message, don't check write errors */
1893 (void) write(c->fd,err,strlen(err));
1894 freeClient(c);
1895 return;
1896 }
1897 server.stat_numconnections++;
1898 }
1899
1900 /* ======================= Redis objects implementation ===================== */
1901
1902 static robj *createObject(int type, void *ptr) {
1903 robj *o;
1904
1905 if (listLength(server.objfreelist)) {
1906 listNode *head = listFirst(server.objfreelist);
1907 o = listNodeValue(head);
1908 listDelNode(server.objfreelist,head);
1909 } else {
1910 o = zmalloc(sizeof(*o));
1911 }
1912 o->type = type;
1913 o->encoding = REDIS_ENCODING_RAW;
1914 o->ptr = ptr;
1915 o->refcount = 1;
1916 return o;
1917 }
1918
1919 static robj *createStringObject(char *ptr, size_t len) {
1920 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1921 }
1922
1923 static robj *createListObject(void) {
1924 list *l = listCreate();
1925
1926 listSetFreeMethod(l,decrRefCount);
1927 return createObject(REDIS_LIST,l);
1928 }
1929
1930 static robj *createSetObject(void) {
1931 dict *d = dictCreate(&setDictType,NULL);
1932 return createObject(REDIS_SET,d);
1933 }
1934
1935 static robj *createZsetObject(void) {
1936 zset *zs = zmalloc(sizeof(*zs));
1937
1938 zs->dict = dictCreate(&zsetDictType,NULL);
1939 zs->zsl = zslCreate();
1940 return createObject(REDIS_ZSET,zs);
1941 }
1942
1943 static void freeStringObject(robj *o) {
1944 if (o->encoding == REDIS_ENCODING_RAW) {
1945 sdsfree(o->ptr);
1946 }
1947 }
1948
1949 static void freeListObject(robj *o) {
1950 listRelease((list*) o->ptr);
1951 }
1952
1953 static void freeSetObject(robj *o) {
1954 dictRelease((dict*) o->ptr);
1955 }
1956
1957 static void freeZsetObject(robj *o) {
1958 zset *zs = o->ptr;
1959
1960 dictRelease(zs->dict);
1961 zslFree(zs->zsl);
1962 zfree(zs);
1963 }
1964
1965 static void freeHashObject(robj *o) {
1966 dictRelease((dict*) o->ptr);
1967 }
1968
1969 static void incrRefCount(robj *o) {
1970 o->refcount++;
1971 #ifdef DEBUG_REFCOUNT
1972 if (o->type == REDIS_STRING)
1973 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1974 #endif
1975 }
1976
1977 static void decrRefCount(void *obj) {
1978 robj *o = obj;
1979
1980 #ifdef DEBUG_REFCOUNT
1981 if (o->type == REDIS_STRING)
1982 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1983 #endif
1984 if (--(o->refcount) == 0) {
1985 switch(o->type) {
1986 case REDIS_STRING: freeStringObject(o); break;
1987 case REDIS_LIST: freeListObject(o); break;
1988 case REDIS_SET: freeSetObject(o); break;
1989 case REDIS_ZSET: freeZsetObject(o); break;
1990 case REDIS_HASH: freeHashObject(o); break;
1991 default: assert(0 != 0); break;
1992 }
1993 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1994 !listAddNodeHead(server.objfreelist,o))
1995 zfree(o);
1996 }
1997 }
1998
1999 static robj *lookupKey(redisDb *db, robj *key) {
2000 dictEntry *de = dictFind(db->dict,key);
2001 return de ? dictGetEntryVal(de) : NULL;
2002 }
2003
2004 static robj *lookupKeyRead(redisDb *db, robj *key) {
2005 expireIfNeeded(db,key);
2006 return lookupKey(db,key);
2007 }
2008
2009 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2010 deleteIfVolatile(db,key);
2011 return lookupKey(db,key);
2012 }
2013
2014 static int deleteKey(redisDb *db, robj *key) {
2015 int retval;
2016
2017 /* We need to protect key from destruction: after the first dictDelete()
2018 * it may happen that 'key' is no longer valid if we don't increment
2019 * it's count. This may happen when we get the object reference directly
2020 * from the hash table with dictRandomKey() or dict iterators */
2021 incrRefCount(key);
2022 if (dictSize(db->expires)) dictDelete(db->expires,key);
2023 retval = dictDelete(db->dict,key);
2024 decrRefCount(key);
2025
2026 return retval == DICT_OK;
2027 }
2028
2029 /* Try to share an object against the shared objects pool */
2030 static robj *tryObjectSharing(robj *o) {
2031 struct dictEntry *de;
2032 unsigned long c;
2033
2034 if (o == NULL || server.shareobjects == 0) return o;
2035
2036 assert(o->type == REDIS_STRING);
2037 de = dictFind(server.sharingpool,o);
2038 if (de) {
2039 robj *shared = dictGetEntryKey(de);
2040
2041 c = ((unsigned long) dictGetEntryVal(de))+1;
2042 dictGetEntryVal(de) = (void*) c;
2043 incrRefCount(shared);
2044 decrRefCount(o);
2045 return shared;
2046 } else {
2047 /* Here we are using a stream algorihtm: Every time an object is
2048 * shared we increment its count, everytime there is a miss we
2049 * recrement the counter of a random object. If this object reaches
2050 * zero we remove the object and put the current object instead. */
2051 if (dictSize(server.sharingpool) >=
2052 server.sharingpoolsize) {
2053 de = dictGetRandomKey(server.sharingpool);
2054 assert(de != NULL);
2055 c = ((unsigned long) dictGetEntryVal(de))-1;
2056 dictGetEntryVal(de) = (void*) c;
2057 if (c == 0) {
2058 dictDelete(server.sharingpool,de->key);
2059 }
2060 } else {
2061 c = 0; /* If the pool is empty we want to add this object */
2062 }
2063 if (c == 0) {
2064 int retval;
2065
2066 retval = dictAdd(server.sharingpool,o,(void*)1);
2067 assert(retval == DICT_OK);
2068 incrRefCount(o);
2069 }
2070 return o;
2071 }
2072 }
2073
2074 /* Check if the nul-terminated string 's' can be represented by a long
2075 * (that is, is a number that fits into long without any other space or
2076 * character before or after the digits).
2077 *
2078 * If so, the function returns REDIS_OK and *longval is set to the value
2079 * of the number. Otherwise REDIS_ERR is returned */
2080 static int isStringRepresentableAsLong(sds s, long *longval) {
2081 char buf[32], *endptr;
2082 long value;
2083 int slen;
2084
2085 value = strtol(s, &endptr, 10);
2086 if (endptr[0] != '\0') return REDIS_ERR;
2087 slen = snprintf(buf,32,"%ld",value);
2088
2089 /* If the number converted back into a string is not identical
2090 * then it's not possible to encode the string as integer */
2091 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2092 if (longval) *longval = value;
2093 return REDIS_OK;
2094 }
2095
2096 /* Try to encode a string object in order to save space */
2097 static int tryObjectEncoding(robj *o) {
2098 long value;
2099 sds s = o->ptr;
2100
2101 if (o->encoding != REDIS_ENCODING_RAW)
2102 return REDIS_ERR; /* Already encoded */
2103
2104 /* It's not save to encode shared objects: shared objects can be shared
2105 * everywhere in the "object space" of Redis. Encoded objects can only
2106 * appear as "values" (and not, for instance, as keys) */
2107 if (o->refcount > 1) return REDIS_ERR;
2108
2109 /* Currently we try to encode only strings */
2110 assert(o->type == REDIS_STRING);
2111
2112 /* Check if we can represent this string as a long integer */
2113 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2114
2115 /* Ok, this object can be encoded */
2116 o->encoding = REDIS_ENCODING_INT;
2117 sdsfree(o->ptr);
2118 o->ptr = (void*) value;
2119 return REDIS_OK;
2120 }
2121
2122 /* Get a decoded version of an encoded object (returned as a new object) */
2123 static robj *getDecodedObject(const robj *o) {
2124 robj *dec;
2125
2126 assert(o->encoding != REDIS_ENCODING_RAW);
2127 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2128 char buf[32];
2129
2130 snprintf(buf,32,"%ld",(long)o->ptr);
2131 dec = createStringObject(buf,strlen(buf));
2132 return dec;
2133 } else {
2134 assert(1 != 1);
2135 }
2136 }
2137
2138 /* Compare two string objects via strcmp() or alike.
2139 * Note that the objects may be integer-encoded. In such a case we
2140 * use snprintf() to get a string representation of the numbers on the stack
2141 * and compare the strings, it's much faster than calling getDecodedObject(). */
2142 static int compareStringObjects(robj *a, robj *b) {
2143 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2144 char bufa[128], bufb[128], *astr, *bstr;
2145 int bothsds = 1;
2146
2147 if (a == b) return 0;
2148 if (a->encoding != REDIS_ENCODING_RAW) {
2149 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2150 astr = bufa;
2151 bothsds = 0;
2152 } else {
2153 astr = a->ptr;
2154 }
2155 if (b->encoding != REDIS_ENCODING_RAW) {
2156 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2157 bstr = bufb;
2158 bothsds = 0;
2159 } else {
2160 bstr = b->ptr;
2161 }
2162 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2163 }
2164
2165 static size_t stringObjectLen(robj *o) {
2166 assert(o->type == REDIS_STRING);
2167 if (o->encoding == REDIS_ENCODING_RAW) {
2168 return sdslen(o->ptr);
2169 } else {
2170 char buf[32];
2171
2172 return snprintf(buf,32,"%ld",(long)o->ptr);
2173 }
2174 }
2175
2176 /*============================ DB saving/loading ============================ */
2177
/* Write the one byte 'type' to 'fp'. Returns 0 on success, -1 if nothing
 * could be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2182
/* Write the time 't' to 'fp' as a 32 bit signed integer, in the byte order
 * of the host. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2188
2189 /* check rdbLoadLen() comments for more info */
2190 static int rdbSaveLen(FILE *fp, uint32_t len) {
2191 unsigned char buf[2];
2192
2193 if (len < (1<<6)) {
2194 /* Save a 6 bit len */
2195 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2196 if (fwrite(buf,1,1,fp) == 0) return -1;
2197 } else if (len < (1<<14)) {
2198 /* Save a 14 bit len */
2199 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2200 buf[1] = len&0xFF;
2201 if (fwrite(buf,2,1,fp) == 0) return -1;
2202 } else {
2203 /* Save a 32 bit len */
2204 buf[0] = (REDIS_RDB_32BITLEN<<6);
2205 if (fwrite(buf,1,1,fp) == 0) return -1;
2206 len = htonl(len);
2207 if (fwrite(&len,4,1,fp) == 0) return -1;
2208 }
2209 return 0;
2210 }
2211
2212 /* String objects in the form "2391" "-100" without any space and with a
2213 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2214 * encoded as integers to save space */
2215 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2216 long long value;
2217 char *endptr, buf[32];
2218
2219 /* Check if it's possible to encode this value as a number */
2220 value = strtoll(s, &endptr, 10);
2221 if (endptr[0] != '\0') return 0;
2222 snprintf(buf,32,"%lld",value);
2223
2224 /* If the number converted back into a string is not identical
2225 * then it's not possible to encode the string as integer */
2226 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2227
2228 /* Finally check if it fits in our ranges */
2229 if (value >= -(1<<7) && value <= (1<<7)-1) {
2230 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2231 enc[1] = value&0xFF;
2232 return 2;
2233 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2234 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2235 enc[1] = value&0xFF;
2236 enc[2] = (value>>8)&0xFF;
2237 return 3;
2238 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2239 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2240 enc[1] = value&0xFF;
2241 enc[2] = (value>>8)&0xFF;
2242 enc[3] = (value>>16)&0xFF;
2243 enc[4] = (value>>24)&0xFF;
2244 return 5;
2245 } else {
2246 return 0;
2247 }
2248 }
2249
2250 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2251 unsigned int comprlen, outlen;
2252 unsigned char byte;
2253 void *out;
2254
2255 /* We require at least four bytes compression for this to be worth it */
2256 outlen = sdslen(obj->ptr)-4;
2257 if (outlen <= 0) return 0;
2258 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2259 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2260 if (comprlen == 0) {
2261 zfree(out);
2262 return 0;
2263 }
2264 /* Data compressed! Let's save it on disk */
2265 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2266 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2267 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2268 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2269 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2270 zfree(out);
2271 return comprlen;
2272
2273 writeerr:
2274 zfree(out);
2275 return -1;
2276 }
2277
2278 /* Save a string objet as [len][data] on disk. If the object is a string
2279 * representation of an integer value we try to safe it in a special form */
2280 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2281 size_t len;
2282 int enclen;
2283
2284 len = sdslen(obj->ptr);
2285
2286 /* Try integer encoding */
2287 if (len <= 11) {
2288 unsigned char buf[5];
2289 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2290 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2291 return 0;
2292 }
2293 }
2294
2295 /* Try LZF compression - under 20 bytes it's unable to compress even
2296 * aaaaaaaaaaaaaaaaaa so skip it */
2297 if (len > 20) {
2298 int retval;
2299
2300 retval = rdbSaveLzfStringObject(fp,obj);
2301 if (retval == -1) return -1;
2302 if (retval > 0) return 0;
2303 /* retval == 0 means data can't be compressed, save the old way */
2304 }
2305
2306 /* Store verbatim */
2307 if (rdbSaveLen(fp,len) == -1) return -1;
2308 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2309 return 0;
2310 }
2311
2312 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2313 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2314 int retval;
2315 robj *dec;
2316
2317 if (obj->encoding != REDIS_ENCODING_RAW) {
2318 dec = getDecodedObject(obj);
2319 retval = rdbSaveStringObjectRaw(fp,dec);
2320 decrRefCount(dec);
2321 return retval;
2322 } else {
2323 return rdbSaveStringObjectRaw(fp,obj);
2324 }
2325 }
2326
/* Save a double value. Doubles are saved as strings prefixed by an
 * unsigned 8 bit integer holding the number of bytes that follow.
 * This 8 bit integer has special values in order to specify the following
 * conditions (no other byte follows in these cases):
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * For finite values the "%.17g" text is written together with its nul
 * terminator, and the terminator is counted in the length byte: the
 * reader can then hand the bytes it read straight to a string parser.
 * The length is computed from the text alone, never from the length byte
 * slot itself, which is still uninitialized at that point.
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        size_t textlen;

        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        textlen = strlen((char*)buf+1);
        buf[0] = textlen+1; /* text + nul terminator */
        len = buf[0]+1;     /* length byte + payload */
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2353
2354 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2355 static int rdbSave(char *filename) {
2356 dictIterator *di = NULL;
2357 dictEntry *de;
2358 FILE *fp;
2359 char tmpfile[256];
2360 int j;
2361 time_t now = time(NULL);
2362
2363 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2364 fp = fopen(tmpfile,"w");
2365 if (!fp) {
2366 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2367 return REDIS_ERR;
2368 }
2369 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2370 for (j = 0; j < server.dbnum; j++) {
2371 redisDb *db = server.db+j;
2372 dict *d = db->dict;
2373 if (dictSize(d) == 0) continue;
2374 di = dictGetIterator(d);
2375 if (!di) {
2376 fclose(fp);
2377 return REDIS_ERR;
2378 }
2379
2380 /* Write the SELECT DB opcode */
2381 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2382 if (rdbSaveLen(fp,j) == -1) goto werr;
2383
2384 /* Iterate this DB writing every entry */
2385 while((de = dictNext(di)) != NULL) {
2386 robj *key = dictGetEntryKey(de);
2387 robj *o = dictGetEntryVal(de);
2388 time_t expiretime = getExpire(db,key);
2389
2390 /* Save the expire time */
2391 if (expiretime != -1) {
2392 /* If this key is already expired skip it */
2393 if (expiretime < now) continue;
2394 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2395 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2396 }
2397 /* Save the key and associated value */
2398 if (rdbSaveType(fp,o->type) == -1) goto werr;
2399 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2400 if (o->type == REDIS_STRING) {
2401 /* Save a string value */
2402 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2403 } else if (o->type == REDIS_LIST) {
2404 /* Save a list value */
2405 list *list = o->ptr;
2406 listNode *ln;
2407
2408 listRewind(list);
2409 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2410 while((ln = listYield(list))) {
2411 robj *eleobj = listNodeValue(ln);
2412
2413 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2414 }
2415 } else if (o->type == REDIS_SET) {
2416 /* Save a set value */
2417 dict *set = o->ptr;
2418 dictIterator *di = dictGetIterator(set);
2419 dictEntry *de;
2420
2421 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2422 while((de = dictNext(di)) != NULL) {
2423 robj *eleobj = dictGetEntryKey(de);
2424
2425 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2426 }
2427 dictReleaseIterator(di);
2428 } else if (o->type == REDIS_ZSET) {
2429 /* Save a set value */
2430 zset *zs = o->ptr;
2431 dictIterator *di = dictGetIterator(zs->dict);
2432 dictEntry *de;
2433
2434 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2435 while((de = dictNext(di)) != NULL) {
2436 robj *eleobj = dictGetEntryKey(de);
2437 double *score = dictGetEntryVal(de);
2438
2439 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2440 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2441 }
2442 dictReleaseIterator(di);
2443 } else {
2444 assert(0 != 0);
2445 }
2446 }
2447 dictReleaseIterator(di);
2448 }
2449 /* EOF opcode */
2450 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2451
2452 /* Make sure data will not remain on the OS's output buffers */
2453 fflush(fp);
2454 fsync(fileno(fp));
2455 fclose(fp);
2456
2457 /* Use RENAME to make sure the DB file is changed atomically only
2458 * if the generate DB file is ok. */
2459 if (rename(tmpfile,filename) == -1) {
2460 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2461 unlink(tmpfile);
2462 return REDIS_ERR;
2463 }
2464 redisLog(REDIS_NOTICE,"DB saved on disk");
2465 server.dirty = 0;
2466 server.lastsave = time(NULL);
2467 return REDIS_OK;
2468
2469 werr:
2470 fclose(fp);
2471 unlink(tmpfile);
2472 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2473 if (di) dictReleaseIterator(di);
2474 return REDIS_ERR;
2475 }
2476
2477 static int rdbSaveBackground(char *filename) {
2478 pid_t childpid;
2479
2480 if (server.bgsaveinprogress) return REDIS_ERR;
2481 if ((childpid = fork()) == 0) {
2482 /* Child */
2483 close(server.fd);
2484 if (rdbSave(filename) == REDIS_OK) {
2485 exit(0);
2486 } else {
2487 exit(1);
2488 }
2489 } else {
2490 /* Parent */
2491 if (childpid == -1) {
2492 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2493 strerror(errno));
2494 return REDIS_ERR;
2495 }
2496 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2497 server.bgsaveinprogress = 1;
2498 server.bgsavechildpid = childpid;
2499 return REDIS_OK;
2500 }
2501 return REDIS_OK; /* unreached */
2502 }
2503
/* Remove the temp dump file "temp-<pid>.rdb" named after the process id
 * 'childpid'. Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2510
/* Read a one byte type from 'fp'. Returns the byte as a value in the
 * 0-255 range, or -1 if it could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 0) ? -1 : type;
}
2516
/* Read a time saved as a 32 bit signed integer in host byte order.
 * Returns the time, or -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    return (fread(&stamp,4,1,fp) == 0) ? -1 : (time_t) stamp;
}
2522
2523 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2524 * of this file for a description of how this are stored on disk.
2525 *
2526 * isencoded is set to 1 if the readed length is not actually a length but
2527 * an "encoding type", check the above comments for more info */
2528 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2529 unsigned char buf[2];
2530 uint32_t len;
2531
2532 if (isencoded) *isencoded = 0;
2533 if (rdbver == 0) {
2534 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2535 return ntohl(len);
2536 } else {
2537 int type;
2538
2539 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2540 type = (buf[0]&0xC0)>>6;
2541 if (type == REDIS_RDB_6BITLEN) {
2542 /* Read a 6 bit len */
2543 return buf[0]&0x3F;
2544 } else if (type == REDIS_RDB_ENCVAL) {
2545 /* Read a 6 bit len encoding type */
2546 if (isencoded) *isencoded = 1;
2547 return buf[0]&0x3F;
2548 } else if (type == REDIS_RDB_14BITLEN) {
2549 /* Read a 14 bit len */
2550 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2551 return ((buf[0]&0x3F)<<8)|buf[1];
2552 } else {
2553 /* Read a 32 bit len */
2554 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2555 return ntohl(len);
2556 }
2557 }
2558 }
2559
2560 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2561 unsigned char enc[4];
2562 long long val;
2563
2564 if (enctype == REDIS_RDB_ENC_INT8) {
2565 if (fread(enc,1,1,fp) == 0) return NULL;
2566 val = (signed char)enc[0];
2567 } else if (enctype == REDIS_RDB_ENC_INT16) {
2568 uint16_t v;
2569 if (fread(enc,2,1,fp) == 0) return NULL;
2570 v = enc[0]|(enc[1]<<8);
2571 val = (int16_t)v;
2572 } else if (enctype == REDIS_RDB_ENC_INT32) {
2573 uint32_t v;
2574 if (fread(enc,4,1,fp) == 0) return NULL;
2575 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2576 val = (int32_t)v;
2577 } else {
2578 val = 0; /* anti-warning */
2579 assert(0!=0);
2580 }
2581 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2582 }
2583
2584 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2585 unsigned int len, clen;
2586 unsigned char *c = NULL;
2587 sds val = NULL;
2588
2589 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2590 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2591 if ((c = zmalloc(clen)) == NULL) goto err;
2592 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2593 if (fread(c,clen,1,fp) == 0) goto err;
2594 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2595 zfree(c);
2596 return createObject(REDIS_STRING,val);
2597 err:
2598 zfree(c);
2599 sdsfree(val);
2600 return NULL;
2601 }
2602
2603 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2604 int isencoded;
2605 uint32_t len;
2606 sds val;
2607
2608 len = rdbLoadLen(fp,rdbver,&isencoded);
2609 if (isencoded) {
2610 switch(len) {
2611 case REDIS_RDB_ENC_INT8:
2612 case REDIS_RDB_ENC_INT16:
2613 case REDIS_RDB_ENC_INT32:
2614 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2615 case REDIS_RDB_ENC_LZF:
2616 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2617 default:
2618 assert(0!=0);
2619 }
2620 }
2621
2622 if (len == REDIS_RDB_LENERR) return NULL;
2623 val = sdsnewlen(NULL,len);
2624 if (len && fread(val,len,1,fp) == 0) {
2625 sdsfree(val);
2626 return NULL;
2627 }
2628 return tryObjectSharing(createObject(REDIS_STRING,val));
2629 }
2630
2631 /* For information about double serialization check rdbSaveDoubleValue() */
2632 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2633 char buf[128];
2634 unsigned char len;
2635
2636 if (fread(&len,1,1,fp) == 0) return -1;
2637 switch(len) {
2638 case 255: *val = R_NegInf; return 0;
2639 case 254: *val = R_PosInf; return 0;
2640 case 253: *val = R_Nan; return 0;
2641 default:
2642 if (fread(buf,len,1,fp) == 0) return -1;
2643 sscanf(buf, "%lg", val);
2644 return 0;
2645 }
2646 }
2647
2648 static int rdbLoad(char *filename) {
2649 FILE *fp;
2650 robj *keyobj = NULL;
2651 uint32_t dbid;
2652 int type, retval, rdbver;
2653 dict *d = server.db[0].dict;
2654 redisDb *db = server.db+0;
2655 char buf[1024];
2656 time_t expiretime = -1, now = time(NULL);
2657
2658 fp = fopen(filename,"r");
2659 if (!fp) return REDIS_ERR;
2660 if (fread(buf,9,1,fp) == 0) goto eoferr;
2661 buf[9] = '\0';
2662 if (memcmp(buf,"REDIS",5) != 0) {
2663 fclose(fp);
2664 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2665 return REDIS_ERR;
2666 }
2667 rdbver = atoi(buf+5);
2668 if (rdbver > 1) {
2669 fclose(fp);
2670 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2671 return REDIS_ERR;
2672 }
2673 while(1) {
2674 robj *o;
2675
2676 /* Read type. */
2677 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2678 if (type == REDIS_EXPIRETIME) {
2679 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2680 /* We read the time so we need to read the object type again */
2681 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2682 }
2683 if (type == REDIS_EOF) break;
2684 /* Handle SELECT DB opcode as a special case */
2685 if (type == REDIS_SELECTDB) {
2686 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2687 goto eoferr;
2688 if (dbid >= (unsigned)server.dbnum) {
2689 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2690 exit(1);
2691 }
2692 db = server.db+dbid;
2693 d = db->dict;
2694 continue;
2695 }
2696 /* Read key */
2697 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2698
2699 if (type == REDIS_STRING) {
2700 /* Read string value */
2701 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2702 tryObjectEncoding(o);
2703 } else if (type == REDIS_LIST || type == REDIS_SET) {
2704 /* Read list/set value */
2705 uint32_t listlen;
2706
2707 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2708 goto eoferr;
2709 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2710 /* Load every single element of the list/set */
2711 while(listlen--) {
2712 robj *ele;
2713
2714 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2715 tryObjectEncoding(ele);
2716 if (type == REDIS_LIST) {
2717 listAddNodeTail((list*)o->ptr,ele);
2718 } else {
2719 dictAdd((dict*)o->ptr,ele,NULL);
2720 }
2721 }
2722 } else if (type == REDIS_ZSET) {
2723 /* Read list/set value */
2724 uint32_t zsetlen;
2725 zset *zs;
2726
2727 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2728 goto eoferr;
2729 o = createZsetObject();
2730 zs = o->ptr;
2731 /* Load every single element of the list/set */
2732 while(zsetlen--) {
2733 robj *ele;
2734 double *score = zmalloc(sizeof(double));
2735
2736 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2737 tryObjectEncoding(ele);
2738 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2739 dictAdd(zs->dict,ele,score);
2740 zslInsert(zs->zsl,*score,ele);
2741 incrRefCount(ele); /* added to skiplist */
2742 }
2743 } else {
2744 assert(0 != 0);
2745 }
2746 /* Add the new object in the hash table */
2747 retval = dictAdd(d,keyobj,o);
2748 if (retval == DICT_ERR) {
2749 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2750 exit(1);
2751 }
2752 /* Set the expire time if needed */
2753 if (expiretime != -1) {
2754 setExpire(db,keyobj,expiretime);
2755 /* Delete this key if already expired */
2756 if (expiretime < now) deleteKey(db,keyobj);
2757 expiretime = -1;
2758 }
2759 keyobj = o = NULL;
2760 }
2761 fclose(fp);
2762 return REDIS_OK;
2763
2764 eoferr: /* unexpected end of file is handled here with a fatal exit */
2765 if (keyobj) decrRefCount(keyobj);
2766 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2767 exit(1);
2768 return REDIS_ERR; /* Just to avoid warning */
2769 }
2770
2771 /*================================== Commands =============================== */
2772
2773 static void authCommand(redisClient *c) {
2774 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2775 c->authenticated = 1;
2776 addReply(c,shared.ok);
2777 } else {
2778 c->authenticated = 0;
2779 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2780 }
2781 }
2782
2783 static void pingCommand(redisClient *c) {
2784 addReply(c,shared.pong);
2785 }
2786
2787 static void echoCommand(redisClient *c) {
2788 addReplyBulkLen(c,c->argv[1]);
2789 addReply(c,c->argv[1]);
2790 addReply(c,shared.crlf);
2791 }
2792
2793 /*=================================== Strings =============================== */
2794
2795 static void setGenericCommand(redisClient *c, int nx) {
2796 int retval;
2797
2798 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2799 if (retval == DICT_ERR) {
2800 if (!nx) {
2801 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2802 incrRefCount(c->argv[2]);
2803 } else {
2804 addReply(c,shared.czero);
2805 return;
2806 }
2807 } else {
2808 incrRefCount(c->argv[1]);
2809 incrRefCount(c->argv[2]);
2810 }
2811 server.dirty++;
2812 removeExpire(c->db,c->argv[1]);
2813 addReply(c, nx ? shared.cone : shared.ok);
2814 }
2815
2816 static void setCommand(redisClient *c) {
2817 setGenericCommand(c,0);
2818 }
2819
2820 static void setnxCommand(redisClient *c) {
2821 setGenericCommand(c,1);
2822 }
2823
2824 static void getCommand(redisClient *c) {
2825 robj *o = lookupKeyRead(c->db,c->argv[1]);
2826
2827 if (o == NULL) {
2828 addReply(c,shared.nullbulk);
2829 } else {
2830 if (o->type != REDIS_STRING) {
2831 addReply(c,shared.wrongtypeerr);
2832 } else {
2833 addReplyBulkLen(c,o);
2834 addReply(c,o);
2835 addReply(c,shared.crlf);
2836 }
2837 }
2838 }
2839
2840 static void getsetCommand(redisClient *c) {
2841 getCommand(c);
2842 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2843 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2844 } else {
2845 incrRefCount(c->argv[1]);
2846 }
2847 incrRefCount(c->argv[2]);
2848 server.dirty++;
2849 removeExpire(c->db,c->argv[1]);
2850 }
2851
2852 static void mgetCommand(redisClient *c) {
2853 int j;
2854
2855 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2856 for (j = 1; j < c->argc; j++) {
2857 robj *o = lookupKeyRead(c->db,c->argv[j]);
2858 if (o == NULL) {
2859 addReply(c,shared.nullbulk);
2860 } else {
2861 if (o->type != REDIS_STRING) {
2862 addReply(c,shared.nullbulk);
2863 } else {
2864 addReplyBulkLen(c,o);
2865 addReply(c,o);
2866 addReply(c,shared.crlf);
2867 }
2868 }
2869 }
2870 }
2871
2872 static void incrDecrCommand(redisClient *c, long long incr) {
2873 long long value;
2874 int retval;
2875 robj *o;
2876
2877 o = lookupKeyWrite(c->db,c->argv[1]);
2878 if (o == NULL) {
2879 value = 0;
2880 } else {
2881 if (o->type != REDIS_STRING) {
2882 value = 0;
2883 } else {
2884 char *eptr;
2885
2886 if (o->encoding == REDIS_ENCODING_RAW)
2887 value = strtoll(o->ptr, &eptr, 10);
2888 else if (o->encoding == REDIS_ENCODING_INT)
2889 value = (long)o->ptr;
2890 else
2891 assert(1 != 1);
2892 }
2893 }
2894
2895 value += incr;
2896 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2897 tryObjectEncoding(o);
2898 retval = dictAdd(c->db->dict,c->argv[1],o);
2899 if (retval == DICT_ERR) {
2900 dictReplace(c->db->dict,c->argv[1],o);
2901 removeExpire(c->db,c->argv[1]);
2902 } else {
2903 incrRefCount(c->argv[1]);
2904 }
2905 server.dirty++;
2906 addReply(c,shared.colon);
2907 addReply(c,o);
2908 addReply(c,shared.crlf);
2909 }
2910
2911 static void incrCommand(redisClient *c) {
2912 incrDecrCommand(c,1);
2913 }
2914
2915 static void decrCommand(redisClient *c) {
2916 incrDecrCommand(c,-1);
2917 }
2918
2919 static void incrbyCommand(redisClient *c) {
2920 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2921 incrDecrCommand(c,incr);
2922 }
2923
2924 static void decrbyCommand(redisClient *c) {
2925 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2926 incrDecrCommand(c,-incr);
2927 }
2928
2929 /* ========================= Type agnostic commands ========================= */
2930
2931 static void delCommand(redisClient *c) {
2932 int deleted = 0, j;
2933
2934 for (j = 1; j < c->argc; j++) {
2935 if (deleteKey(c->db,c->argv[j])) {
2936 server.dirty++;
2937 deleted++;
2938 }
2939 }
2940 switch(deleted) {
2941 case 0:
2942 addReply(c,shared.czero);
2943 break;
2944 case 1:
2945 addReply(c,shared.cone);
2946 break;
2947 default:
2948 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2949 break;
2950 }
2951 }
2952
2953 static void existsCommand(redisClient *c) {
2954 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2955 }
2956
2957 static void selectCommand(redisClient *c) {
2958 int id = atoi(c->argv[1]->ptr);
2959
2960 if (selectDb(c,id) == REDIS_ERR) {
2961 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2962 } else {
2963 addReply(c,shared.ok);
2964 }
2965 }
2966
2967 static void randomkeyCommand(redisClient *c) {
2968 dictEntry *de;
2969
2970 while(1) {
2971 de = dictGetRandomKey(c->db->dict);
2972 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2973 }
2974 if (de == NULL) {
2975 addReply(c,shared.plus);
2976 addReply(c,shared.crlf);
2977 } else {
2978 addReply(c,shared.plus);
2979 addReply(c,dictGetEntryKey(de));
2980 addReply(c,shared.crlf);
2981 }
2982 }
2983
2984 static void keysCommand(redisClient *c) {
2985 dictIterator *di;
2986 dictEntry *de;
2987 sds pattern = c->argv[1]->ptr;
2988 int plen = sdslen(pattern);
2989 int numkeys = 0, keyslen = 0;
2990 robj *lenobj = createObject(REDIS_STRING,NULL);
2991
2992 di = dictGetIterator(c->db->dict);
2993 addReply(c,lenobj);
2994 decrRefCount(lenobj);
2995 while((de = dictNext(di)) != NULL) {
2996 robj *keyobj = dictGetEntryKey(de);
2997
2998 sds key = keyobj->ptr;
2999 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3000 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3001 if (expireIfNeeded(c->db,keyobj) == 0) {
3002 if (numkeys != 0)
3003 addReply(c,shared.space);
3004 addReply(c,keyobj);
3005 numkeys++;
3006 keyslen += sdslen(key);
3007 }
3008 }
3009 }
3010 dictReleaseIterator(di);
3011 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3012 addReply(c,shared.crlf);
3013 }
3014
3015 static void dbsizeCommand(redisClient *c) {
3016 addReplySds(c,
3017 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3018 }
3019
3020 static void lastsaveCommand(redisClient *c) {
3021 addReplySds(c,
3022 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3023 }
3024
3025 static void typeCommand(redisClient *c) {
3026 robj *o;
3027 char *type;
3028
3029 o = lookupKeyRead(c->db,c->argv[1]);
3030 if (o == NULL) {
3031 type = "+none";
3032 } else {
3033 switch(o->type) {
3034 case REDIS_STRING: type = "+string"; break;
3035 case REDIS_LIST: type = "+list"; break;
3036 case REDIS_SET: type = "+set"; break;
3037 case REDIS_ZSET: type = "+zset"; break;
3038 default: type = "unknown"; break;
3039 }
3040 }
3041 addReplySds(c,sdsnew(type));
3042 addReply(c,shared.crlf);
3043 }
3044
3045 static void saveCommand(redisClient *c) {
3046 if (server.bgsaveinprogress) {
3047 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3048 return;
3049 }
3050 if (rdbSave(server.dbfilename) == REDIS_OK) {
3051 addReply(c,shared.ok);
3052 } else {
3053 addReply(c,shared.err);
3054 }
3055 }
3056
3057 static void bgsaveCommand(redisClient *c) {
3058 if (server.bgsaveinprogress) {
3059 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3060 return;
3061 }
3062 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3063 addReply(c,shared.ok);
3064 } else {
3065 addReply(c,shared.err);
3066 }
3067 }
3068
3069 static void shutdownCommand(redisClient *c) {
3070 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3071 /* Kill the saving child if there is a background saving in progress.
3072 We want to avoid race conditions, for instance our saving child may
3073 overwrite the synchronous saving did by SHUTDOWN. */
3074 if (server.bgsaveinprogress) {
3075 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3076 kill(server.bgsavechildpid,SIGKILL);
3077 rdbRemoveTempFile(server.bgsavechildpid);
3078 }
3079 /* SYNC SAVE */
3080 if (rdbSave(server.dbfilename) == REDIS_OK) {
3081 if (server.daemonize)
3082 unlink(server.pidfile);
3083 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3084 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3085 exit(1);
3086 } else {
3087 /* Ooops.. error saving! The best we can do is to continue operating.
3088 * Note that if there was a background saving process, in the next
3089 * cron() Redis will be notified that the background saving aborted,
3090 * handling special stuff like slaves pending for synchronization... */
3091 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3092 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3093 }
3094 }
3095
3096 static void renameGenericCommand(redisClient *c, int nx) {
3097 robj *o;
3098
3099 /* To use the same key as src and dst is probably an error */
3100 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3101 addReply(c,shared.sameobjecterr);
3102 return;
3103 }
3104
3105 o = lookupKeyWrite(c->db,c->argv[1]);
3106 if (o == NULL) {
3107 addReply(c,shared.nokeyerr);
3108 return;
3109 }
3110 incrRefCount(o);
3111 deleteIfVolatile(c->db,c->argv[2]);
3112 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3113 if (nx) {
3114 decrRefCount(o);
3115 addReply(c,shared.czero);
3116 return;
3117 }
3118 dictReplace(c->db->dict,c->argv[2],o);
3119 } else {
3120 incrRefCount(c->argv[2]);
3121 }
3122 deleteKey(c->db,c->argv[1]);
3123 server.dirty++;
3124 addReply(c,nx ? shared.cone : shared.ok);
3125 }
3126
3127 static void renameCommand(redisClient *c) {
3128 renameGenericCommand(c,0);
3129 }
3130
3131 static void renamenxCommand(redisClient *c) {
3132 renameGenericCommand(c,1);
3133 }
3134
3135 static void moveCommand(redisClient *c) {
3136 robj *o;
3137 redisDb *src, *dst;
3138 int srcid;
3139
3140 /* Obtain source and target DB pointers */
3141 src = c->db;
3142 srcid = c->db->id;
3143 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3144 addReply(c,shared.outofrangeerr);
3145 return;
3146 }
3147 dst = c->db;
3148 selectDb(c,srcid); /* Back to the source DB */
3149
3150 /* If the user is moving using as target the same
3151 * DB as the source DB it is probably an error. */
3152 if (src == dst) {
3153 addReply(c,shared.sameobjecterr);
3154 return;
3155 }
3156
3157 /* Check if the element exists and get a reference */
3158 o = lookupKeyWrite(c->db,c->argv[1]);
3159 if (!o) {
3160 addReply(c,shared.czero);
3161 return;
3162 }
3163
3164 /* Try to add the element to the target DB */
3165 deleteIfVolatile(dst,c->argv[1]);
3166 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3167 addReply(c,shared.czero);
3168 return;
3169 }
3170 incrRefCount(c->argv[1]);
3171 incrRefCount(o);
3172
3173 /* OK! key moved, free the entry in the source DB */
3174 deleteKey(src,c->argv[1]);
3175 server.dirty++;
3176 addReply(c,shared.cone);
3177 }
3178
3179 /* =================================== Lists ================================ */
3180 static void pushGenericCommand(redisClient *c, int where) {
3181 robj *lobj;
3182 list *list;
3183
3184 lobj = lookupKeyWrite(c->db,c->argv[1]);
3185 if (lobj == NULL) {
3186 lobj = createListObject();
3187 list = lobj->ptr;
3188 if (where == REDIS_HEAD) {
3189 listAddNodeHead(list,c->argv[2]);
3190 } else {
3191 listAddNodeTail(list,c->argv[2]);
3192 }
3193 dictAdd(c->db->dict,c->argv[1],lobj);
3194 incrRefCount(c->argv[1]);
3195 incrRefCount(c->argv[2]);
3196 } else {
3197 if (lobj->type != REDIS_LIST) {
3198 addReply(c,shared.wrongtypeerr);
3199 return;
3200 }
3201 list = lobj->ptr;
3202 if (where == REDIS_HEAD) {
3203 listAddNodeHead(list,c->argv[2]);
3204 } else {
3205 listAddNodeTail(list,c->argv[2]);
3206 }
3207 incrRefCount(c->argv[2]);
3208 }
3209 server.dirty++;
3210 addReply(c,shared.ok);
3211 }
3212
3213 static void lpushCommand(redisClient *c) {
3214 pushGenericCommand(c,REDIS_HEAD);
3215 }
3216
3217 static void rpushCommand(redisClient *c) {
3218 pushGenericCommand(c,REDIS_TAIL);
3219 }
3220
3221 static void llenCommand(redisClient *c) {
3222 robj *o;
3223 list *l;
3224
3225 o = lookupKeyRead(c->db,c->argv[1]);
3226 if (o == NULL) {
3227 addReply(c,shared.czero);
3228 return;
3229 } else {
3230 if (o->type != REDIS_LIST) {
3231 addReply(c,shared.wrongtypeerr);
3232 } else {
3233 l = o->ptr;
3234 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3235 }
3236 }
3237 }
3238
3239 static void lindexCommand(redisClient *c) {
3240 robj *o;
3241 int index = atoi(c->argv[2]->ptr);
3242
3243 o = lookupKeyRead(c->db,c->argv[1]);
3244 if (o == NULL) {
3245 addReply(c,shared.nullbulk);
3246 } else {
3247 if (o->type != REDIS_LIST) {
3248 addReply(c,shared.wrongtypeerr);
3249 } else {
3250 list *list = o->ptr;
3251 listNode *ln;
3252
3253 ln = listIndex(list, index);
3254 if (ln == NULL) {
3255 addReply(c,shared.nullbulk);
3256 } else {
3257 robj *ele = listNodeValue(ln);
3258 addReplyBulkLen(c,ele);
3259 addReply(c,ele);
3260 addReply(c,shared.crlf);
3261 }
3262 }
3263 }
3264 }
3265
3266 static void lsetCommand(redisClient *c) {
3267 robj *o;
3268 int index = atoi(c->argv[2]->ptr);
3269
3270 o = lookupKeyWrite(c->db,c->argv[1]);
3271 if (o == NULL) {
3272 addReply(c,shared.nokeyerr);
3273 } else {
3274 if (o->type != REDIS_LIST) {
3275 addReply(c,shared.wrongtypeerr);
3276 } else {
3277 list *list = o->ptr;
3278 listNode *ln;
3279
3280 ln = listIndex(list, index);
3281 if (ln == NULL) {
3282 addReply(c,shared.outofrangeerr);
3283 } else {
3284 robj *ele = listNodeValue(ln);
3285
3286 decrRefCount(ele);
3287 listNodeValue(ln) = c->argv[3];
3288 incrRefCount(c->argv[3]);
3289 addReply(c,shared.ok);
3290 server.dirty++;
3291 }
3292 }
3293 }
3294 }
3295
3296 static void popGenericCommand(redisClient *c, int where) {
3297 robj *o;
3298
3299 o = lookupKeyWrite(c->db,c->argv[1]);
3300 if (o == NULL) {
3301 addReply(c,shared.nullbulk);
3302 } else {
3303 if (o->type != REDIS_LIST) {
3304 addReply(c,shared.wrongtypeerr);
3305 } else {
3306 list *list = o->ptr;
3307 listNode *ln;
3308
3309 if (where == REDIS_HEAD)
3310 ln = listFirst(list);
3311 else
3312 ln = listLast(list);
3313
3314 if (ln == NULL) {
3315 addReply(c,shared.nullbulk);
3316 } else {
3317 robj *ele = listNodeValue(ln);
3318 addReplyBulkLen(c,ele);
3319 addReply(c,ele);
3320 addReply(c,shared.crlf);
3321 listDelNode(list,ln);
3322 server.dirty++;
3323 }
3324 }
3325 }
3326 }
3327
3328 static void lpopCommand(redisClient *c) {
3329 popGenericCommand(c,REDIS_HEAD);
3330 }
3331
3332 static void rpopCommand(redisClient *c) {
3333 popGenericCommand(c,REDIS_TAIL);
3334 }
3335
3336 static void lrangeCommand(redisClient *c) {
3337 robj *o;
3338 int start = atoi(c->argv[2]->ptr);
3339 int end = atoi(c->argv[3]->ptr);
3340
3341 o = lookupKeyRead(c->db,c->argv[1]);
3342 if (o == NULL) {
3343 addReply(c,shared.nullmultibulk);
3344 } else {
3345 if (o->type != REDIS_LIST) {
3346 addReply(c,shared.wrongtypeerr);
3347 } else {
3348 list *list = o->ptr;
3349 listNode *ln;
3350 int llen = listLength(list);
3351 int rangelen, j;
3352 robj *ele;
3353
3354 /* convert negative indexes */
3355 if (start < 0) start = llen+start;
3356 if (end < 0) end = llen+end;
3357 if (start < 0) start = 0;
3358 if (end < 0) end = 0;
3359
3360 /* indexes sanity checks */
3361 if (start > end || start >= llen) {
3362 /* Out of range start or start > end result in empty list */
3363 addReply(c,shared.emptymultibulk);
3364 return;
3365 }
3366 if (end >= llen) end = llen-1;
3367 rangelen = (end-start)+1;
3368
3369 /* Return the result in form of a multi-bulk reply */
3370 ln = listIndex(list, start);
3371 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3372 for (j = 0; j < rangelen; j++) {
3373 ele = listNodeValue(ln);
3374 addReplyBulkLen(c,ele);
3375 addReply(c,ele);
3376 addReply(c,shared.crlf);
3377 ln = ln->next;
3378 }
3379 }
3380 }
3381 }
3382
3383 static void ltrimCommand(redisClient *c) {
3384 robj *o;
3385 int start = atoi(c->argv[2]->ptr);
3386 int end = atoi(c->argv[3]->ptr);
3387
3388 o = lookupKeyWrite(c->db,c->argv[1]);
3389 if (o == NULL) {
3390 addReply(c,shared.nokeyerr);
3391 } else {
3392 if (o->type != REDIS_LIST) {
3393 addReply(c,shared.wrongtypeerr);
3394 } else {
3395 list *list = o->ptr;
3396 listNode *ln;
3397 int llen = listLength(list);
3398 int j, ltrim, rtrim;
3399
3400 /* convert negative indexes */
3401 if (start < 0) start = llen+start;
3402 if (end < 0) end = llen+end;
3403 if (start < 0) start = 0;
3404 if (end < 0) end = 0;
3405
3406 /* indexes sanity checks */
3407 if (start > end || start >= llen) {
3408 /* Out of range start or start > end result in empty list */
3409 ltrim = llen;
3410 rtrim = 0;
3411 } else {
3412 if (end >= llen) end = llen-1;
3413 ltrim = start;
3414 rtrim = llen-end-1;
3415 }
3416
3417 /* Remove list elements to perform the trim */
3418 for (j = 0; j < ltrim; j++) {
3419 ln = listFirst(list);
3420 listDelNode(list,ln);
3421 }
3422 for (j = 0; j < rtrim; j++) {
3423 ln = listLast(list);
3424 listDelNode(list,ln);
3425 }
3426 server.dirty++;
3427 addReply(c,shared.ok);
3428 }
3429 }
3430 }
3431
3432 static void lremCommand(redisClient *c) {
3433 robj *o;
3434
3435 o = lookupKeyWrite(c->db,c->argv[1]);
3436 if (o == NULL) {
3437 addReply(c,shared.czero);
3438 } else {
3439 if (o->type != REDIS_LIST) {
3440 addReply(c,shared.wrongtypeerr);
3441 } else {
3442 list *list = o->ptr;
3443 listNode *ln, *next;
3444 int toremove = atoi(c->argv[2]->ptr);
3445 int removed = 0;
3446 int fromtail = 0;
3447
3448 if (toremove < 0) {
3449 toremove = -toremove;
3450 fromtail = 1;
3451 }
3452 ln = fromtail ? list->tail : list->head;
3453 while (ln) {
3454 robj *ele = listNodeValue(ln);
3455
3456 next = fromtail ? ln->prev : ln->next;
3457 if (compareStringObjects(ele,c->argv[3]) == 0) {
3458 listDelNode(list,ln);
3459 server.dirty++;
3460 removed++;
3461 if (toremove && removed == toremove) break;
3462 }
3463 ln = next;
3464 }
3465 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3466 }
3467 }
3468 }
3469
3470 /* ==================================== Sets ================================ */
3471
3472 static void saddCommand(redisClient *c) {
3473 robj *set;
3474
3475 set = lookupKeyWrite(c->db,c->argv[1]);
3476 if (set == NULL) {
3477 set = createSetObject();
3478 dictAdd(c->db->dict,c->argv[1],set);
3479 incrRefCount(c->argv[1]);
3480 } else {
3481 if (set->type != REDIS_SET) {
3482 addReply(c,shared.wrongtypeerr);
3483 return;
3484 }
3485 }
3486 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3487 incrRefCount(c->argv[2]);
3488 server.dirty++;
3489 addReply(c,shared.cone);
3490 } else {
3491 addReply(c,shared.czero);
3492 }
3493 }
3494
3495 static void sremCommand(redisClient *c) {
3496 robj *set;
3497
3498 set = lookupKeyWrite(c->db,c->argv[1]);
3499 if (set == NULL) {
3500 addReply(c,shared.czero);
3501 } else {
3502 if (set->type != REDIS_SET) {
3503 addReply(c,shared.wrongtypeerr);
3504 return;
3505 }
3506 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3507 server.dirty++;
3508 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3509 addReply(c,shared.cone);
3510 } else {
3511 addReply(c,shared.czero);
3512 }
3513 }
3514 }
3515
3516 static void smoveCommand(redisClient *c) {
3517 robj *srcset, *dstset;
3518
3519 srcset = lookupKeyWrite(c->db,c->argv[1]);
3520 dstset = lookupKeyWrite(c->db,c->argv[2]);
3521
3522 /* If the source key does not exist return 0, if it's of the wrong type
3523 * raise an error */
3524 if (srcset == NULL || srcset->type != REDIS_SET) {
3525 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3526 return;
3527 }
3528 /* Error if the destination key is not a set as well */
3529 if (dstset && dstset->type != REDIS_SET) {
3530 addReply(c,shared.wrongtypeerr);
3531 return;
3532 }
3533 /* Remove the element from the source set */
3534 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3535 /* Key not found in the src set! return zero */
3536 addReply(c,shared.czero);
3537 return;
3538 }
3539 server.dirty++;
3540 /* Add the element to the destination set */
3541 if (!dstset) {
3542 dstset = createSetObject();
3543 dictAdd(c->db->dict,c->argv[2],dstset);
3544 incrRefCount(c->argv[2]);
3545 }
3546 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3547 incrRefCount(c->argv[3]);
3548 addReply(c,shared.cone);
3549 }
3550
3551 static void sismemberCommand(redisClient *c) {
3552 robj *set;
3553
3554 set = lookupKeyRead(c->db,c->argv[1]);
3555 if (set == NULL) {
3556 addReply(c,shared.czero);
3557 } else {
3558 if (set->type != REDIS_SET) {
3559 addReply(c,shared.wrongtypeerr);
3560 return;
3561 }
3562 if (dictFind(set->ptr,c->argv[2]))
3563 addReply(c,shared.cone);
3564 else
3565 addReply(c,shared.czero);
3566 }
3567 }
3568
3569 static void scardCommand(redisClient *c) {
3570 robj *o;
3571 dict *s;
3572
3573 o = lookupKeyRead(c->db,c->argv[1]);
3574 if (o == NULL) {
3575 addReply(c,shared.czero);
3576 return;
3577 } else {
3578 if (o->type != REDIS_SET) {
3579 addReply(c,shared.wrongtypeerr);
3580 } else {
3581 s = o->ptr;
3582 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3583 dictSize(s)));
3584 }
3585 }
3586 }
3587
3588 static void spopCommand(redisClient *c) {
3589 robj *set;
3590 dictEntry *de;
3591
3592 set = lookupKeyWrite(c->db,c->argv[1]);
3593 if (set == NULL) {
3594 addReply(c,shared.nullbulk);
3595 } else {
3596 if (set->type != REDIS_SET) {
3597 addReply(c,shared.wrongtypeerr);
3598 return;
3599 }
3600 de = dictGetRandomKey(set->ptr);
3601 if (de == NULL) {
3602 addReply(c,shared.nullbulk);
3603 } else {
3604 robj *ele = dictGetEntryKey(de);
3605
3606 addReplyBulkLen(c,ele);
3607 addReply(c,ele);
3608 addReply(c,shared.crlf);
3609 dictDelete(set->ptr,ele);
3610 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3611 server.dirty++;
3612 }
3613 }
3614 }
3615
3616 static void srandmemberCommand(redisClient *c) {
3617 robj *set;
3618 dictEntry *de;
3619
3620 set = lookupKeyRead(c->db,c->argv[1]);
3621 if (set == NULL) {
3622 addReply(c,shared.nullbulk);
3623 } else {
3624 if (set->type != REDIS_SET) {
3625 addReply(c,shared.wrongtypeerr);
3626 return;
3627 }
3628 de = dictGetRandomKey(set->ptr);
3629 if (de == NULL) {
3630 addReply(c,shared.nullbulk);
3631 } else {
3632 robj *ele = dictGetEntryKey(de);
3633
3634 addReplyBulkLen(c,ele);
3635 addReply(c,ele);
3636 addReply(c,shared.crlf);
3637 }
3638 }
3639 }
3640
3641 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3642 dict **d1 = (void*) s1, **d2 = (void*) s2;
3643
3644 return dictSize(*d1)-dictSize(*d2);
3645 }
3646
3647 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3648 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3649 dictIterator *di;
3650 dictEntry *de;
3651 robj *lenobj = NULL, *dstset = NULL;
3652 int j, cardinality = 0;
3653
3654 for (j = 0; j < setsnum; j++) {
3655 robj *setobj;
3656
3657 setobj = dstkey ?
3658 lookupKeyWrite(c->db,setskeys[j]) :
3659 lookupKeyRead(c->db,setskeys[j]);
3660 if (!setobj) {
3661 zfree(dv);
3662 if (dstkey) {
3663 deleteKey(c->db,dstkey);
3664 addReply(c,shared.ok);
3665 } else {
3666 addReply(c,shared.nullmultibulk);
3667 }
3668 return;
3669 }
3670 if (setobj->type != REDIS_SET) {
3671 zfree(dv);
3672 addReply(c,shared.wrongtypeerr);
3673 return;
3674 }
3675 dv[j] = setobj->ptr;
3676 }
3677 /* Sort sets from the smallest to largest, this will improve our
3678 * algorithm's performace */
3679 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3680
3681 /* The first thing we should output is the total number of elements...
3682 * since this is a multi-bulk write, but at this stage we don't know
3683 * the intersection set size, so we use a trick, append an empty object
3684 * to the output list and save the pointer to later modify it with the
3685 * right length */
3686 if (!dstkey) {
3687 lenobj = createObject(REDIS_STRING,NULL);
3688 addReply(c,lenobj);
3689 decrRefCount(lenobj);
3690 } else {
3691 /* If we have a target key where to store the resulting set
3692 * create this key with an empty set inside */
3693 dstset = createSetObject();
3694 }
3695
3696 /* Iterate all the elements of the first (smallest) set, and test
3697 * the element against all the other sets, if at least one set does
3698 * not include the element it is discarded */
3699 di = dictGetIterator(dv[0]);
3700
3701 while((de = dictNext(di)) != NULL) {
3702 robj *ele;
3703
3704 for (j = 1; j < setsnum; j++)
3705 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3706 if (j != setsnum)
3707 continue; /* at least one set does not contain the member */
3708 ele = dictGetEntryKey(de);
3709 if (!dstkey) {
3710 addReplyBulkLen(c,ele);
3711 addReply(c,ele);
3712 addReply(c,shared.crlf);
3713 cardinality++;
3714 } else {
3715 dictAdd(dstset->ptr,ele,NULL);
3716 incrRefCount(ele);
3717 }
3718 }
3719 dictReleaseIterator(di);
3720
3721 if (dstkey) {
3722 /* Store the resulting set into the target */
3723 deleteKey(c->db,dstkey);
3724 dictAdd(c->db->dict,dstkey,dstset);
3725 incrRefCount(dstkey);
3726 }
3727
3728 if (!dstkey) {
3729 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3730 } else {
3731 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3732 dictSize((dict*)dstset->ptr)));
3733 server.dirty++;
3734 }
3735 zfree(dv);
3736 }
3737
3738 static void sinterCommand(redisClient *c) {
3739 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3740 }
3741
3742 static void sinterstoreCommand(redisClient *c) {
3743 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3744 }
3745
3746 #define REDIS_OP_UNION 0
3747 #define REDIS_OP_DIFF 1
3748
3749 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3750 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3751 dictIterator *di;
3752 dictEntry *de;
3753 robj *dstset = NULL;
3754 int j, cardinality = 0;
3755
3756 for (j = 0; j < setsnum; j++) {
3757 robj *setobj;
3758
3759 setobj = dstkey ?
3760 lookupKeyWrite(c->db,setskeys[j]) :
3761 lookupKeyRead(c->db,setskeys[j]);
3762 if (!setobj) {
3763 dv[j] = NULL;
3764 continue;
3765 }
3766 if (setobj->type != REDIS_SET) {
3767 zfree(dv);
3768 addReply(c,shared.wrongtypeerr);
3769 return;
3770 }
3771 dv[j] = setobj->ptr;
3772 }
3773
3774 /* We need a temp set object to store our union. If the dstkey
3775 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3776 * this set object will be the resulting object to set into the target key*/
3777 dstset = createSetObject();
3778
3779 /* Iterate all the elements of all the sets, add every element a single
3780 * time to the result set */
3781 for (j = 0; j < setsnum; j++) {
3782 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3783 if (!dv[j]) continue; /* non existing keys are like empty sets */
3784
3785 di = dictGetIterator(dv[j]);
3786
3787 while((de = dictNext(di)) != NULL) {
3788 robj *ele;
3789
3790 /* dictAdd will not add the same element multiple times */
3791 ele = dictGetEntryKey(de);
3792 if (op == REDIS_OP_UNION || j == 0) {
3793 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3794 incrRefCount(ele);
3795 cardinality++;
3796 }
3797 } else if (op == REDIS_OP_DIFF) {
3798 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3799 cardinality--;
3800 }
3801 }
3802 }
3803 dictReleaseIterator(di);
3804
3805 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3806 }
3807
3808 /* Output the content of the resulting set, if not in STORE mode */
3809 if (!dstkey) {
3810 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3811 di = dictGetIterator(dstset->ptr);
3812 while((de = dictNext(di)) != NULL) {
3813 robj *ele;
3814
3815 ele = dictGetEntryKey(de);
3816 addReplyBulkLen(c,ele);
3817 addReply(c,ele);
3818 addReply(c,shared.crlf);
3819 }
3820 dictReleaseIterator(di);
3821 } else {
3822 /* If we have a target key where to store the resulting set
3823 * create this key with the result set inside */
3824 deleteKey(c->db,dstkey);
3825 dictAdd(c->db->dict,dstkey,dstset);
3826 incrRefCount(dstkey);
3827 }
3828
3829 /* Cleanup */
3830 if (!dstkey) {
3831 decrRefCount(dstset);
3832 } else {
3833 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3834 dictSize((dict*)dstset->ptr)));
3835 server.dirty++;
3836 }
3837 zfree(dv);
3838 }
3839
3840 static void sunionCommand(redisClient *c) {
3841 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3842 }
3843
3844 static void sunionstoreCommand(redisClient *c) {
3845 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3846 }
3847
3848 static void sdiffCommand(redisClient *c) {
3849 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3850 }
3851
3852 static void sdiffstoreCommand(redisClient *c) {
3853 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3854 }
3855
3856 /* ==================================== ZSets =============================== */
3857
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
3865
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated values.
 * b) the comparison is not just by key (our 'score') but by satellite data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, which is useful for ZREVRANGE. */
3874
3875 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3876 zskiplistNode *zn = zmalloc(sizeof(*zn));
3877
3878 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3879 zn->score = score;
3880 zn->obj = obj;
3881 return zn;
3882 }
3883
3884 static zskiplist *zslCreate(void) {
3885 int j;
3886 zskiplist *zsl;
3887
3888 zsl = zmalloc(sizeof(*zsl));
3889 zsl->level = 1;
3890 zsl->length = 0;
3891 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3892 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3893 zsl->header->forward[j] = NULL;
3894 zsl->header->backward = NULL;
3895 zsl->tail = NULL;
3896 return zsl;
3897 }
3898
3899 static void zslFreeNode(zskiplistNode *node) {
3900 decrRefCount(node->obj);
3901 zfree(node->forward);
3902 zfree(node);
3903 }
3904
3905 static void zslFree(zskiplist *zsl) {
3906 zskiplistNode *node = zsl->header->forward[0], *next;
3907
3908 zfree(zsl->header->forward);
3909 zfree(zsl->header);
3910 while(node) {
3911 next = node->forward[0];
3912 zslFreeNode(node);
3913 node = next;
3914 }
3915 zfree(zsl);
3916 }
3917
3918 static int zslRandomLevel(void) {
3919 int level = 1;
3920 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3921 level += 1;
3922 return level;
3923 }
3924
3925 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3926 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3927 int i, level;
3928
3929 x = zsl->header;
3930 for (i = zsl->level-1; i >= 0; i--) {
3931 while (x->forward[i] &&
3932 (x->forward[i]->score < score ||
3933 (x->forward[i]->score == score &&
3934 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3935 x = x->forward[i];
3936 update[i] = x;
3937 }
3938 /* we assume the key is not already inside, since we allow duplicated
3939 * scores, and the re-insertion of score and redis object should never
3940 * happpen since the caller of zslInsert() should test in the hash table
3941 * if the element is already inside or not. */
3942 level = zslRandomLevel();
3943 if (level > zsl->level) {
3944 for (i = zsl->level; i < level; i++)
3945 update[i] = zsl->header;
3946 zsl->level = level;
3947 }
3948 x = zslCreateNode(level,score,obj);
3949 for (i = 0; i < level; i++) {
3950 x->forward[i] = update[i]->forward[i];
3951 update[i]->forward[i] = x;
3952 }
3953 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3954 if (x->forward[0])
3955 x->forward[0]->backward = x;
3956 else
3957 zsl->tail = x;
3958 zsl->length++;
3959 }
3960
3961 /* Delete an element with matching score/object from the skiplist. */
3962 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3963 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3964 int i;
3965
3966 x = zsl->header;
3967 for (i = zsl->level-1; i >= 0; i--) {
3968 while (x->forward[i] &&
3969 (x->forward[i]->score < score ||
3970 (x->forward[i]->score == score &&
3971 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3972 x = x->forward[i];
3973 update[i] = x;
3974 }
3975 /* We may have multiple elements with the same score, what we need
3976 * is to find the element with both the right score and object. */
3977 x = x->forward[0];
3978 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
3979 for (i = 0; i < zsl->level; i++) {
3980 if (update[i]->forward[i] != x) break;
3981 update[i]->forward[i] = x->forward[i];
3982 }
3983 if (x->forward[0]) {
3984 x->forward[0]->backward = (x->backward == zsl->header) ?
3985 NULL : x->backward;
3986 } else {
3987 zsl->tail = x->backward;
3988 }
3989 zslFreeNode(x);
3990 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3991 zsl->level--;
3992 zsl->length--;
3993 return 1;
3994 } else {
3995 return 0; /* not found */
3996 }
3997 return 0; /* not found */
3998 }
3999
4000 /* Delete all the elements with score between min and max from the skiplist.
4001 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4002 * Note that this function takes the reference to the hash table view of the
4003 * sorted set, in order to remove the elements from the hash table too. */
4004 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4005 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4006 unsigned long removed = 0;
4007 int i;
4008
4009 x = zsl->header;
4010 for (i = zsl->level-1; i >= 0; i--) {
4011 while (x->forward[i] && x->forward[i]->score < min)
4012 x = x->forward[i];
4013 update[i] = x;
4014 }
4015 /* We may have multiple elements with the same score, what we need
4016 * is to find the element with both the right score and object. */
4017 x = x->forward[0];
4018 while (x && x->score <= max) {
4019 zskiplistNode *next;
4020
4021 for (i = 0; i < zsl->level; i++) {
4022 if (update[i]->forward[i] != x) break;
4023 update[i]->forward[i] = x->forward[i];
4024 }
4025 if (x->forward[0]) {
4026 x->forward[0]->backward = (x->backward == zsl->header) ?
4027 NULL : x->backward;
4028 } else {
4029 zsl->tail = x->backward;
4030 }
4031 next = x->forward[0];
4032 dictDelete(dict,x->obj);
4033 zslFreeNode(x);
4034 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4035 zsl->level--;
4036 zsl->length--;
4037 removed++;
4038 x = next;
4039 }
4040 return removed; /* not found */
4041 }
4042
4043 /* Find the first node having a score equal or greater than the specified one.
4044 * Returns NULL if there is no match. */
4045 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4046 zskiplistNode *x;
4047 int i;
4048
4049 x = zsl->header;
4050 for (i = zsl->level-1; i >= 0; i--) {
4051 while (x->forward[i] && x->forward[i]->score < score)
4052 x = x->forward[i];
4053 }
4054 /* We may have multiple elements with the same score, what we need
4055 * is to find the element with both the right score and object. */
4056 return x->forward[0];
4057 }
4058
4059 /* The actual Z-commands implementations */
4060
4061 static void zaddCommand(redisClient *c) {
4062 robj *zsetobj;
4063 zset *zs;
4064 double *score;
4065
4066 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4067 if (zsetobj == NULL) {
4068 zsetobj = createZsetObject();
4069 dictAdd(c->db->dict,c->argv[1],zsetobj);
4070 incrRefCount(c->argv[1]);
4071 } else {
4072 if (zsetobj->type != REDIS_ZSET) {
4073 addReply(c,shared.wrongtypeerr);
4074 return;
4075 }
4076 }
4077 score = zmalloc(sizeof(double));
4078 *score = strtod(c->argv[2]->ptr,NULL);
4079 zs = zsetobj->ptr;
4080 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4081 /* case 1: New element */
4082 incrRefCount(c->argv[3]); /* added to hash */
4083 zslInsert(zs->zsl,*score,c->argv[3]);
4084 incrRefCount(c->argv[3]); /* added to skiplist */
4085 server.dirty++;
4086 addReply(c,shared.cone);
4087 } else {
4088 dictEntry *de;
4089 double *oldscore;
4090
4091 /* case 2: Score update operation */
4092 de = dictFind(zs->dict,c->argv[3]);
4093 assert(de != NULL);
4094 oldscore = dictGetEntryVal(de);
4095 if (*score != *oldscore) {
4096 int deleted;
4097
4098 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4099 assert(deleted != 0);
4100 zslInsert(zs->zsl,*score,c->argv[3]);
4101 incrRefCount(c->argv[3]);
4102 dictReplace(zs->dict,c->argv[3],score);
4103 server.dirty++;
4104 } else {
4105 zfree(score);
4106 }
4107 addReply(c,shared.czero);
4108 }
4109 }
4110
4111 static void zremCommand(redisClient *c) {
4112 robj *zsetobj;
4113 zset *zs;
4114
4115 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4116 if (zsetobj == NULL) {
4117 addReply(c,shared.czero);
4118 } else {
4119 dictEntry *de;
4120 double *oldscore;
4121 int deleted;
4122
4123 if (zsetobj->type != REDIS_ZSET) {
4124 addReply(c,shared.wrongtypeerr);
4125 return;
4126 }
4127 zs = zsetobj->ptr;
4128 de = dictFind(zs->dict,c->argv[2]);
4129 if (de == NULL) {
4130 addReply(c,shared.czero);
4131 return;
4132 }
4133 /* Delete from the skiplist */
4134 oldscore = dictGetEntryVal(de);
4135 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4136 assert(deleted != 0);
4137
4138 /* Delete from the hash table */
4139 dictDelete(zs->dict,c->argv[2]);
4140 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4141 server.dirty++;
4142 addReply(c,shared.cone);
4143 }
4144 }
4145
4146 static void zremrangebyscoreCommand(redisClient *c) {
4147 double min = strtod(c->argv[2]->ptr,NULL);
4148 double max = strtod(c->argv[3]->ptr,NULL);
4149 robj *zsetobj;
4150 zset *zs;
4151
4152 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4153 if (zsetobj == NULL) {
4154 addReply(c,shared.czero);
4155 } else {
4156 long deleted;
4157
4158 if (zsetobj->type != REDIS_ZSET) {
4159 addReply(c,shared.wrongtypeerr);
4160 return;
4161 }
4162 zs = zsetobj->ptr;
4163 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4164 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4165 server.dirty += deleted;
4166 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4167 }
4168 }
4169
4170 static void zrangeGenericCommand(redisClient *c, int reverse) {
4171 robj *o;
4172 int start = atoi(c->argv[2]->ptr);
4173 int end = atoi(c->argv[3]->ptr);
4174
4175 o = lookupKeyRead(c->db,c->argv[1]);
4176 if (o == NULL) {
4177 addReply(c,shared.nullmultibulk);
4178 } else {
4179 if (o->type != REDIS_ZSET) {
4180 addReply(c,shared.wrongtypeerr);
4181 } else {
4182 zset *zsetobj = o->ptr;
4183 zskiplist *zsl = zsetobj->zsl;
4184 zskiplistNode *ln;
4185
4186 int llen = zsl->length;
4187 int rangelen, j;
4188 robj *ele;
4189
4190 /* convert negative indexes */
4191 if (start < 0) start = llen+start;
4192 if (end < 0) end = llen+end;
4193 if (start < 0) start = 0;
4194 if (end < 0) end = 0;
4195
4196 /* indexes sanity checks */
4197 if (start > end || start >= llen) {
4198 /* Out of range start or start > end result in empty list */
4199 addReply(c,shared.emptymultibulk);
4200 return;
4201 }
4202 if (end >= llen) end = llen-1;
4203 rangelen = (end-start)+1;
4204
4205 /* Return the result in form of a multi-bulk reply */
4206 if (reverse) {
4207 ln = zsl->tail;
4208 while (start--)
4209 ln = ln->backward;
4210 } else {
4211 ln = zsl->header->forward[0];
4212 while (start--)
4213 ln = ln->forward[0];
4214 }
4215
4216 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4217 for (j = 0; j < rangelen; j++) {
4218 ele = ln->obj;
4219 addReplyBulkLen(c,ele);
4220 addReply(c,ele);
4221 addReply(c,shared.crlf);
4222 ln = reverse ? ln->backward : ln->forward[0];
4223 }
4224 }
4225 }
4226 }
4227
4228 static void zrangeCommand(redisClient *c) {
4229 zrangeGenericCommand(c,0);
4230 }
4231
4232 static void zrevrangeCommand(redisClient *c) {
4233 zrangeGenericCommand(c,1);
4234 }
4235
4236 static void zrangebyscoreCommand(redisClient *c) {
4237 robj *o;
4238 double min = strtod(c->argv[2]->ptr,NULL);
4239 double max = strtod(c->argv[3]->ptr,NULL);
4240
4241 o = lookupKeyRead(c->db,c->argv[1]);
4242 if (o == NULL) {
4243 addReply(c,shared.nullmultibulk);
4244 } else {
4245 if (o->type != REDIS_ZSET) {
4246 addReply(c,shared.wrongtypeerr);
4247 } else {
4248 zset *zsetobj = o->ptr;
4249 zskiplist *zsl = zsetobj->zsl;
4250 zskiplistNode *ln;
4251 robj *ele, *lenobj;
4252 unsigned int rangelen = 0;
4253
4254 /* Get the first node with the score >= min */
4255 ln = zslFirstWithScore(zsl,min);
4256 if (ln == NULL) {
4257 /* No element matching the speciifed interval */
4258 addReply(c,shared.emptymultibulk);
4259 return;
4260 }
4261
4262 /* We don't know in advance how many matching elements there
4263 * are in the list, so we push this object that will represent
4264 * the multi-bulk length in the output buffer, and will "fix"
4265 * it later */
4266 lenobj = createObject(REDIS_STRING,NULL);
4267 addReply(c,lenobj);
4268
4269 while(ln && ln->score <= max) {
4270 ele = ln->obj;
4271 addReplyBulkLen(c,ele);
4272 addReply(c,ele);
4273 addReply(c,shared.crlf);
4274 ln = ln->forward[0];
4275 rangelen++;
4276 }
4277 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4278 }
4279 }
4280 }
4281
4282 static void zcardCommand(redisClient *c) {
4283 robj *o;
4284 zset *zs;
4285
4286 o = lookupKeyRead(c->db,c->argv[1]);
4287 if (o == NULL) {
4288 addReply(c,shared.czero);
4289 return;
4290 } else {
4291 if (o->type != REDIS_ZSET) {
4292 addReply(c,shared.wrongtypeerr);
4293 } else {
4294 zs = o->ptr;
4295 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4296 }
4297 }
4298 }
4299
4300 static void zscoreCommand(redisClient *c) {
4301 robj *o;
4302 zset *zs;
4303
4304 o = lookupKeyRead(c->db,c->argv[1]);
4305 if (o == NULL) {
4306 addReply(c,shared.czero);
4307 return;
4308 } else {
4309 if (o->type != REDIS_ZSET) {
4310 addReply(c,shared.wrongtypeerr);
4311 } else {
4312 dictEntry *de;
4313
4314 zs = o->ptr;
4315 de = dictFind(zs->dict,c->argv[2]);
4316 if (!de) {
4317 addReply(c,shared.nullbulk);
4318 } else {
4319 char buf[128];
4320 double *score = dictGetEntryVal(de);
4321
4322 snprintf(buf,sizeof(buf),"%.17g",*score);
4323 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4324 strlen(buf),buf));
4325 }
4326 }
4327 }
4328 }
4329
4330 /* ========================= Non type-specific commands ==================== */
4331
4332 static void flushdbCommand(redisClient *c) {
4333 server.dirty += dictSize(c->db->dict);
4334 dictEmpty(c->db->dict);
4335 dictEmpty(c->db->expires);
4336 addReply(c,shared.ok);
4337 }
4338
4339 static void flushallCommand(redisClient *c) {
4340 server.dirty += emptyDb();
4341 addReply(c,shared.ok);
4342 rdbSave(server.dbfilename);
4343 server.dirty++;
4344 }
4345
4346 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4347 redisSortOperation *so = zmalloc(sizeof(*so));
4348 so->type = type;
4349 so->pattern = pattern;
4350 return so;
4351 }
4352
4353 /* Return the value associated to the key with a name obtained
4354 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4355 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4356 char *p;
4357 sds spat, ssub;
4358 robj keyobj;
4359 int prefixlen, sublen, postfixlen;
4360 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4361 struct {
4362 long len;
4363 long free;
4364 char buf[REDIS_SORTKEY_MAX+1];
4365 } keyname;
4366
4367 if (subst->encoding == REDIS_ENCODING_RAW)
4368 incrRefCount(subst);
4369 else {
4370 subst = getDecodedObject(subst);
4371 }
4372
4373 spat = pattern->ptr;
4374 ssub = subst->ptr;
4375 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4376 p = strchr(spat,'*');
4377 if (!p) return NULL;
4378
4379 prefixlen = p-spat;
4380 sublen = sdslen(ssub);
4381 postfixlen = sdslen(spat)-(prefixlen+1);
4382 memcpy(keyname.buf,spat,prefixlen);
4383 memcpy(keyname.buf+prefixlen,ssub,sublen);
4384 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4385 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4386 keyname.len = prefixlen+sublen+postfixlen;
4387
4388 keyobj.refcount = 1;
4389 keyobj.type = REDIS_STRING;
4390 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4391
4392 decrRefCount(subst);
4393
4394 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4395 return lookupKeyRead(db,&keyobj);
4396 }
4397
4398 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4399 * the additional parameter is not standard but a BSD-specific we have to
4400 * pass sorting parameters via the global 'server' structure */
4401 static int sortCompare(const void *s1, const void *s2) {
4402 const redisSortObject *so1 = s1, *so2 = s2;
4403 int cmp;
4404
4405 if (!server.sort_alpha) {
4406 /* Numeric sorting. Here it's trivial as we precomputed scores */
4407 if (so1->u.score > so2->u.score) {
4408 cmp = 1;
4409 } else if (so1->u.score < so2->u.score) {
4410 cmp = -1;
4411 } else {
4412 cmp = 0;
4413 }
4414 } else {
4415 /* Alphanumeric sorting */
4416 if (server.sort_bypattern) {
4417 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4418 /* At least one compare object is NULL */
4419 if (so1->u.cmpobj == so2->u.cmpobj)
4420 cmp = 0;
4421 else if (so1->u.cmpobj == NULL)
4422 cmp = -1;
4423 else
4424 cmp = 1;
4425 } else {
4426 /* We have both the objects, use strcoll */
4427 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4428 }
4429 } else {
4430 /* Compare elements directly */
4431 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4432 so2->obj->encoding == REDIS_ENCODING_RAW) {
4433 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4434 } else {
4435 robj *dec1, *dec2;
4436
4437 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4438 so1->obj : getDecodedObject(so1->obj);
4439 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4440 so2->obj : getDecodedObject(so2->obj);
4441 cmp = strcoll(dec1->ptr,dec2->ptr);
4442 if (dec1 != so1->obj) decrRefCount(dec1);
4443 if (dec2 != so2->obj) decrRefCount(dec2);
4444 }
4445 }
4446 }
4447 return server.sort_desc ? -cmp : cmp;
4448 }
4449
4450 /* The SORT command is the most complex command in Redis. Warning: this code
4451 * is optimized for speed and a bit less for readability */
4452 static void sortCommand(redisClient *c) {
4453 list *operations;
4454 int outputlen = 0;
4455 int desc = 0, alpha = 0;
4456 int limit_start = 0, limit_count = -1, start, end;
4457 int j, dontsort = 0, vectorlen;
4458 int getop = 0; /* GET operation counter */
4459 robj *sortval, *sortby = NULL, *storekey = NULL;
4460 redisSortObject *vector; /* Resulting vector to sort */
4461
4462 /* Lookup the key to sort. It must be of the right types */
4463 sortval = lookupKeyRead(c->db,c->argv[1]);
4464 if (sortval == NULL) {
4465 addReply(c,shared.nokeyerr);
4466 return;
4467 }
4468 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4469 addReply(c,shared.wrongtypeerr);
4470 return;
4471 }
4472
4473 /* Create a list of operations to perform for every sorted element.
4474 * Operations can be GET/DEL/INCR/DECR */
4475 operations = listCreate();
4476 listSetFreeMethod(operations,zfree);
4477 j = 2;
4478
4479 /* Now we need to protect sortval incrementing its count, in the future
4480 * SORT may have options able to overwrite/delete keys during the sorting
4481 * and the sorted key itself may get destroied */
4482 incrRefCount(sortval);
4483
4484 /* The SORT command has an SQL-alike syntax, parse it */
4485 while(j < c->argc) {
4486 int leftargs = c->argc-j-1;
4487 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4488 desc = 0;
4489 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4490 desc = 1;
4491 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4492 alpha = 1;
4493 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4494 limit_start = atoi(c->argv[j+1]->ptr);
4495 limit_count = atoi(c->argv[j+2]->ptr);
4496 j+=2;
4497 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4498 storekey = c->argv[j+1];
4499 j++;
4500 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4501 sortby = c->argv[j+1];
4502 /* If the BY pattern does not contain '*', i.e. it is constant,
4503 * we don't need to sort nor to lookup the weight keys. */
4504 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4505 j++;
4506 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4507 listAddNodeTail(operations,createSortOperation(
4508 REDIS_SORT_GET,c->argv[j+1]));
4509 getop++;
4510 j++;
4511 } else {
4512 decrRefCount(sortval);
4513 listRelease(operations);
4514 addReply(c,shared.syntaxerr);
4515 return;
4516 }
4517 j++;
4518 }
4519
4520 /* Load the sorting vector with all the objects to sort */
4521 vectorlen = (sortval->type == REDIS_LIST) ?
4522 listLength((list*)sortval->ptr) :
4523 dictSize((dict*)sortval->ptr);
4524 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4525 j = 0;
4526 if (sortval->type == REDIS_LIST) {
4527 list *list = sortval->ptr;
4528 listNode *ln;
4529
4530 listRewind(list);
4531 while((ln = listYield(list))) {
4532 robj *ele = ln->value;
4533 vector[j].obj = ele;
4534 vector[j].u.score = 0;
4535 vector[j].u.cmpobj = NULL;
4536 j++;
4537 }
4538 } else {
4539 dict *set = sortval->ptr;
4540 dictIterator *di;
4541 dictEntry *setele;
4542
4543 di = dictGetIterator(set);
4544 while((setele = dictNext(di)) != NULL) {
4545 vector[j].obj = dictGetEntryKey(setele);
4546 vector[j].u.score = 0;
4547 vector[j].u.cmpobj = NULL;
4548 j++;
4549 }
4550 dictReleaseIterator(di);
4551 }
4552 assert(j == vectorlen);
4553
4554 /* Now it's time to load the right scores in the sorting vector */
4555 if (dontsort == 0) {
4556 for (j = 0; j < vectorlen; j++) {
4557 if (sortby) {
4558 robj *byval;
4559
4560 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4561 if (!byval || byval->type != REDIS_STRING) continue;
4562 if (alpha) {
4563 if (byval->encoding == REDIS_ENCODING_RAW) {
4564 vector[j].u.cmpobj = byval;
4565 incrRefCount(byval);
4566 } else {
4567 vector[j].u.cmpobj = getDecodedObject(byval);
4568 }
4569 } else {
4570 if (byval->encoding == REDIS_ENCODING_RAW) {
4571 vector[j].u.score = strtod(byval->ptr,NULL);
4572 } else {
4573 if (byval->encoding == REDIS_ENCODING_INT) {
4574 vector[j].u.score = (long)byval->ptr;
4575 } else
4576 assert(1 != 1);
4577 }
4578 }
4579 } else {
4580 if (!alpha) {
4581 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4582 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4583 else {
4584 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4585 vector[j].u.score = (long) vector[j].obj->ptr;
4586 else
4587 assert(1 != 1);
4588 }
4589 }
4590 }
4591 }
4592 }
4593
4594 /* We are ready to sort the vector... perform a bit of sanity check
4595 * on the LIMIT option too. We'll use a partial version of quicksort. */
4596 start = (limit_start < 0) ? 0 : limit_start;
4597 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4598 if (start >= vectorlen) {
4599 start = vectorlen-1;
4600 end = vectorlen-2;
4601 }
4602 if (end >= vectorlen) end = vectorlen-1;
4603
4604 if (dontsort == 0) {
4605 server.sort_desc = desc;
4606 server.sort_alpha = alpha;
4607 server.sort_bypattern = sortby ? 1 : 0;
4608 if (sortby && (start != 0 || end != vectorlen-1))
4609 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4610 else
4611 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4612 }
4613
4614 /* Send command output to the output buffer, performing the specified
4615 * GET/DEL/INCR/DECR operations if any. */
4616 outputlen = getop ? getop*(end-start+1) : end-start+1;
4617 if (storekey == NULL) {
4618 /* STORE option not specified, sent the sorting result to client */
4619 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4620 for (j = start; j <= end; j++) {
4621 listNode *ln;
4622 if (!getop) {
4623 addReplyBulkLen(c,vector[j].obj);
4624 addReply(c,vector[j].obj);
4625 addReply(c,shared.crlf);
4626 }
4627 listRewind(operations);
4628 while((ln = listYield(operations))) {
4629 redisSortOperation *sop = ln->value;
4630 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4631 vector[j].obj);
4632
4633 if (sop->type == REDIS_SORT_GET) {
4634 if (!val || val->type != REDIS_STRING) {
4635 addReply(c,shared.nullbulk);
4636 } else {
4637 addReplyBulkLen(c,val);
4638 addReply(c,val);
4639 addReply(c,shared.crlf);
4640 }
4641 } else {
4642 assert(sop->type == REDIS_SORT_GET); /* always fails */
4643 }
4644 }
4645 }
4646 } else {
4647 robj *listObject = createListObject();
4648 list *listPtr = (list*) listObject->ptr;
4649
4650 /* STORE option specified, set the sorting result as a List object */
4651 for (j = start; j <= end; j++) {
4652 listNode *ln;
4653 if (!getop) {
4654 listAddNodeTail(listPtr,vector[j].obj);
4655 incrRefCount(vector[j].obj);
4656 }
4657 listRewind(operations);
4658 while((ln = listYield(operations))) {
4659 redisSortOperation *sop = ln->value;
4660 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4661 vector[j].obj);
4662
4663 if (sop->type == REDIS_SORT_GET) {
4664 if (!val || val->type != REDIS_STRING) {
4665 listAddNodeTail(listPtr,createStringObject("",0));
4666 } else {
4667 listAddNodeTail(listPtr,val);
4668 incrRefCount(val);
4669 }
4670 } else {
4671 assert(sop->type == REDIS_SORT_GET); /* always fails */
4672 }
4673 }
4674 }
4675 if (dictReplace(c->db->dict,storekey,listObject)) {
4676 incrRefCount(storekey);
4677 }
4678 /* Note: we add 1 because the DB is dirty anyway since even if the
4679 * SORT result is empty a new key is set and maybe the old content
4680 * replaced. */
4681 server.dirty += 1+outputlen;
4682 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
4683 }
4684
4685 /* Cleanup */
4686 decrRefCount(sortval);
4687 listRelease(operations);
4688 for (j = 0; j < vectorlen; j++) {
4689 if (sortby && alpha && vector[j].u.cmpobj)
4690 decrRefCount(vector[j].u.cmpobj);
4691 }
4692 zfree(vector);
4693 }
4694
4695 static void infoCommand(redisClient *c) {
4696 sds info;
4697 time_t uptime = time(NULL)-server.stat_starttime;
4698 int j;
4699
4700 info = sdscatprintf(sdsempty(),
4701 "redis_version:%s\r\n"
4702 "arch_bits:%s\r\n"
4703 "uptime_in_seconds:%d\r\n"
4704 "uptime_in_days:%d\r\n"
4705 "connected_clients:%d\r\n"
4706 "connected_slaves:%d\r\n"
4707 "used_memory:%zu\r\n"
4708 "changes_since_last_save:%lld\r\n"
4709 "bgsave_in_progress:%d\r\n"
4710 "last_save_time:%d\r\n"
4711 "total_connections_received:%lld\r\n"
4712 "total_commands_processed:%lld\r\n"
4713 "role:%s\r\n"
4714 ,REDIS_VERSION,
4715 (sizeof(long) == 8) ? "64" : "32",
4716 uptime,
4717 uptime/(3600*24),
4718 listLength(server.clients)-listLength(server.slaves),
4719 listLength(server.slaves),
4720 server.usedmemory,
4721 server.dirty,
4722 server.bgsaveinprogress,
4723 server.lastsave,
4724 server.stat_numconnections,
4725 server.stat_numcommands,
4726 server.masterhost == NULL ? "master" : "slave"
4727 );
4728 if (server.masterhost) {
4729 info = sdscatprintf(info,
4730 "master_host:%s\r\n"
4731 "master_port:%d\r\n"
4732 "master_link_status:%s\r\n"
4733 "master_last_io_seconds_ago:%d\r\n"
4734 ,server.masterhost,
4735 server.masterport,
4736 (server.replstate == REDIS_REPL_CONNECTED) ?
4737 "up" : "down",
4738 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4739 );
4740 }
4741 for (j = 0; j < server.dbnum; j++) {
4742 long long keys, vkeys;
4743
4744 keys = dictSize(server.db[j].dict);
4745 vkeys = dictSize(server.db[j].expires);
4746 if (keys || vkeys) {
4747 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4748 j, keys, vkeys);
4749 }
4750 }
4751 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4752 addReplySds(c,info);
4753 addReply(c,shared.crlf);
4754 }
4755
4756 static void monitorCommand(redisClient *c) {
4757 /* ignore MONITOR if aleady slave or in monitor mode */
4758 if (c->flags & REDIS_SLAVE) return;
4759
4760 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4761 c->slaveseldb = 0;
4762 listAddNodeTail(server.monitors,c);
4763 addReply(c,shared.ok);
4764 }
4765
4766 /* ================================= Expire ================================= */
4767 static int removeExpire(redisDb *db, robj *key) {
4768 if (dictDelete(db->expires,key) == DICT_OK) {
4769 return 1;
4770 } else {
4771 return 0;
4772 }
4773 }
4774
4775 static int setExpire(redisDb *db, robj *key, time_t when) {
4776 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4777 return 0;
4778 } else {
4779 incrRefCount(key);
4780 return 1;
4781 }
4782 }
4783
4784 /* Return the expire time of the specified key, or -1 if no expire
4785 * is associated with this key (i.e. the key is non volatile) */
4786 static time_t getExpire(redisDb *db, robj *key) {
4787 dictEntry *de;
4788
4789 /* No expire? return ASAP */
4790 if (dictSize(db->expires) == 0 ||
4791 (de = dictFind(db->expires,key)) == NULL) return -1;
4792
4793 return (time_t) dictGetEntryVal(de);
4794 }
4795
4796 static int expireIfNeeded(redisDb *db, robj *key) {
4797 time_t when;
4798 dictEntry *de;
4799
4800 /* No expire? return ASAP */
4801 if (dictSize(db->expires) == 0 ||
4802 (de = dictFind(db->expires,key)) == NULL) return 0;
4803
4804 /* Lookup the expire */
4805 when = (time_t) dictGetEntryVal(de);
4806 if (time(NULL) <= when) return 0;
4807
4808 /* Delete the key */
4809 dictDelete(db->expires,key);
4810 return dictDelete(db->dict,key) == DICT_OK;
4811 }
4812
4813 static int deleteIfVolatile(redisDb *db, robj *key) {
4814 dictEntry *de;
4815
4816 /* No expire? return ASAP */
4817 if (dictSize(db->expires) == 0 ||
4818 (de = dictFind(db->expires,key)) == NULL) return 0;
4819
4820 /* Delete the key */
4821 server.dirty++;
4822 dictDelete(db->expires,key);
4823 return dictDelete(db->dict,key) == DICT_OK;
4824 }
4825
4826 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4827 dictEntry *de;
4828
4829 de = dictFind(c->db->dict,key);
4830 if (de == NULL) {
4831 addReply(c,shared.czero);
4832 return;
4833 }
4834 if (seconds < 0) {
4835 if (deleteKey(c->db,key)) server.dirty++;
4836 addReply(c, shared.cone);
4837 return;
4838 } else {
4839 time_t when = time(NULL)+seconds;
4840 if (setExpire(c->db,key,when)) {
4841 addReply(c,shared.cone);
4842 server.dirty++;
4843 } else {
4844 addReply(c,shared.czero);
4845 }
4846 return;
4847 }
4848 }
4849
4850 static void expireCommand(redisClient *c) {
4851 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
4852 }
4853
4854 static void expireatCommand(redisClient *c) {
4855 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
4856 }
4857
4858 static void ttlCommand(redisClient *c) {
4859 time_t expire;
4860 int ttl = -1;
4861
4862 expire = getExpire(c->db,c->argv[1]);
4863 if (expire != -1) {
4864 ttl = (int) (expire-time(NULL));
4865 if (ttl < 0) ttl = -1;
4866 }
4867 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4868 }
4869
4870 static void msetGenericCommand(redisClient *c, int nx) {
4871 int j;
4872
4873 if ((c->argc % 2) == 0) {
4874 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4875 return;
4876 }
4877 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4878 * set nothing at all if at least one already key exists. */
4879 if (nx) {
4880 for (j = 1; j < c->argc; j += 2) {
4881 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4882 addReply(c, shared.czero);
4883 return;
4884 }
4885 }
4886 }
4887
4888 for (j = 1; j < c->argc; j += 2) {
4889 int retval;
4890
4891 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4892 if (retval == DICT_ERR) {
4893 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4894 incrRefCount(c->argv[j+1]);
4895 } else {
4896 incrRefCount(c->argv[j]);
4897 incrRefCount(c->argv[j+1]);
4898 }
4899 removeExpire(c->db,c->argv[j]);
4900 }
4901 server.dirty += (c->argc-1)/2;
4902 addReply(c, nx ? shared.cone : shared.ok);
4903 }
4904
4905 static void msetCommand(redisClient *c) {
4906 msetGenericCommand(c,0);
4907 }
4908
4909 static void msetnxCommand(redisClient *c) {
4910 msetGenericCommand(c,1);
4911 }
4912
4913 /* =============================== Replication ============================= */
4914
4915 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4916 ssize_t nwritten, ret = size;
4917 time_t start = time(NULL);
4918
4919 timeout++;
4920 while(size) {
4921 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4922 nwritten = write(fd,ptr,size);
4923 if (nwritten == -1) return -1;
4924 ptr += nwritten;
4925 size -= nwritten;
4926 }
4927 if ((time(NULL)-start) > timeout) {
4928 errno = ETIMEDOUT;
4929 return -1;
4930 }
4931 }
4932 return ret;
4933 }
4934
4935 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4936 ssize_t nread, totread = 0;
4937 time_t start = time(NULL);
4938
4939 timeout++;
4940 while(size) {
4941 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4942 nread = read(fd,ptr,size);
4943 if (nread == -1) return -1;
4944 ptr += nread;
4945 size -= nread;
4946 totread += nread;
4947 }
4948 if ((time(NULL)-start) > timeout) {
4949 errno = ETIMEDOUT;
4950 return -1;
4951 }
4952 }
4953 return totread;
4954 }
4955
/* Read a line from 'fd' into the buffer 'ptr' of 'size' bytes, one byte at
 * a time via syncRead(). The trailing "\n" (and a preceding "\r", if any)
 * is not stored and the buffer is always null terminated.
 *
 * Returns the number of characters read before the newline (a stripped
 * "\r" is still counted), or -1 if syncRead() fails. At most size-1
 * characters are stored: when the line is longer, reading stops there and
 * the truncated content is returned. A 'size' <= 0 returns 0 without
 * touching the buffer. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;
    *ptr = '\0';

    size--; /* reserve room for the null terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* One byte of the buffer consumed: without this the loop never
         * stops on long lines and writes past the end of the buffer. */
        size--;
    }
    return nread;
}
4976
4977 static void syncCommand(redisClient *c) {
4978 /* ignore SYNC if aleady slave or in monitor mode */
4979 if (c->flags & REDIS_SLAVE) return;
4980
4981 /* SYNC can't be issued when the server has pending data to send to
4982 * the client about already issued commands. We need a fresh reply
4983 * buffer registering the differences between the BGSAVE and the current
4984 * dataset, so that we can copy to other slaves if needed. */
4985 if (listLength(c->reply) != 0) {
4986 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4987 return;
4988 }
4989
4990 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4991 /* Here we need to check if there is a background saving operation
4992 * in progress, or if it is required to start one */
4993 if (server.bgsaveinprogress) {
4994 /* Ok a background save is in progress. Let's check if it is a good
4995 * one for replication, i.e. if there is another slave that is
4996 * registering differences since the server forked to save */
4997 redisClient *slave;
4998 listNode *ln;
4999
5000 listRewind(server.slaves);
5001 while((ln = listYield(server.slaves))) {
5002 slave = ln->value;
5003 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5004 }
5005 if (ln) {
5006 /* Perfect, the server is already registering differences for
5007 * another slave. Set the right state, and copy the buffer. */
5008 listRelease(c->reply);
5009 c->reply = listDup(slave->reply);
5010 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5011 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5012 } else {
5013 /* No way, we need to wait for the next BGSAVE in order to
5014 * register differences */
5015 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5016 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5017 }
5018 } else {
5019 /* Ok we don't have a BGSAVE in progress, let's start one */
5020 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5021 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5022 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5023 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5024 return;
5025 }
5026 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5027 }
5028 c->repldbfd = -1;
5029 c->flags |= REDIS_SLAVE;
5030 c->slaveseldb = 0;
5031 listAddNodeTail(server.slaves,c);
5032 return;
5033 }
5034
5035 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5036 redisClient *slave = privdata;
5037 REDIS_NOTUSED(el);
5038 REDIS_NOTUSED(mask);
5039 char buf[REDIS_IOBUF_LEN];
5040 ssize_t nwritten, buflen;
5041
5042 if (slave->repldboff == 0) {
5043 /* Write the bulk write count before to transfer the DB. In theory here
5044 * we don't know how much room there is in the output buffer of the
5045 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5046 * operations) will never be smaller than the few bytes we need. */
5047 sds bulkcount;
5048
5049 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5050 slave->repldbsize);
5051 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5052 {
5053 sdsfree(bulkcount);
5054 freeClient(slave);
5055 return;
5056 }
5057 sdsfree(bulkcount);
5058 }
5059 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5060 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5061 if (buflen <= 0) {
5062 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5063 (buflen == 0) ? "premature EOF" : strerror(errno));
5064 freeClient(slave);
5065 return;
5066 }
5067 if ((nwritten = write(fd,buf,buflen)) == -1) {
5068 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5069 strerror(errno));
5070 freeClient(slave);
5071 return;
5072 }
5073 slave->repldboff += nwritten;
5074 if (slave->repldboff == slave->repldbsize) {
5075 close(slave->repldbfd);
5076 slave->repldbfd = -1;
5077 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5078 slave->replstate = REDIS_REPL_ONLINE;
5079 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5080 sendReplyToClient, slave, NULL) == AE_ERR) {
5081 freeClient(slave);
5082 return;
5083 }
5084 addReplySds(slave,sdsempty());
5085 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5086 }
5087 }
5088
5089 /* This function is called at the end of every backgrond saving.
5090 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5091 * otherwise REDIS_ERR is passed to the function.
5092 *
5093 * The goal of this function is to handle slaves waiting for a successful
5094 * background saving in order to perform non-blocking synchronization. */
5095 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5096 listNode *ln;
5097 int startbgsave = 0;
5098
5099 listRewind(server.slaves);
5100 while((ln = listYield(server.slaves))) {
5101 redisClient *slave = ln->value;
5102
5103 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5104 startbgsave = 1;
5105 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5106 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5107 struct redis_stat buf;
5108
5109 if (bgsaveerr != REDIS_OK) {
5110 freeClient(slave);
5111 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5112 continue;
5113 }
5114 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5115 redis_fstat(slave->repldbfd,&buf) == -1) {
5116 freeClient(slave);
5117 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5118 continue;
5119 }
5120 slave->repldboff = 0;
5121 slave->repldbsize = buf.st_size;
5122 slave->replstate = REDIS_REPL_SEND_BULK;
5123 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5124 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5125 freeClient(slave);
5126 continue;
5127 }
5128 }
5129 }
5130 if (startbgsave) {
5131 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5132 listRewind(server.slaves);
5133 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5134 while((ln = listYield(server.slaves))) {
5135 redisClient *slave = ln->value;
5136
5137 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5138 freeClient(slave);
5139 }
5140 }
5141 }
5142 }
5143
5144 static int syncWithMaster(void) {
5145 char buf[1024], tmpfile[256], authcmd[1024];
5146 int dumpsize;
5147 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5148 int dfd;
5149
5150 if (fd == -1) {
5151 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5152 strerror(errno));
5153 return REDIS_ERR;
5154 }
5155
5156 /* AUTH with the master if required. */
5157 if(server.masterauth) {
5158 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5159 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5160 close(fd);
5161 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5162 strerror(errno));
5163 return REDIS_ERR;
5164 }
5165 /* Read the AUTH result. */
5166 if (syncReadLine(fd,buf,1024,3600) == -1) {
5167 close(fd);
5168 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5169 strerror(errno));
5170 return REDIS_ERR;
5171 }
5172 if (buf[0] != '+') {
5173 close(fd);
5174 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5175 return REDIS_ERR;
5176 }
5177 }
5178
5179 /* Issue the SYNC command */
5180 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5181 close(fd);
5182 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5183 strerror(errno));
5184 return REDIS_ERR;
5185 }
5186 /* Read the bulk write count */
5187 if (syncReadLine(fd,buf,1024,3600) == -1) {
5188 close(fd);
5189 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5190 strerror(errno));
5191 return REDIS_ERR;
5192 }
5193 if (buf[0] != '$') {
5194 close(fd);
5195 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5196 return REDIS_ERR;
5197 }
5198 dumpsize = atoi(buf+1);
5199 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5200 /* Read the bulk write data on a temp file */
5201 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5202 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5203 if (dfd == -1) {
5204 close(fd);
5205 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5206 return REDIS_ERR;
5207 }
5208 while(dumpsize) {
5209 int nread, nwritten;
5210
5211 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5212 if (nread == -1) {
5213 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5214 strerror(errno));
5215 close(fd);
5216 close(dfd);
5217 return REDIS_ERR;
5218 }
5219 nwritten = write(dfd,buf,nread);
5220 if (nwritten == -1) {
5221 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5222 close(fd);
5223 close(dfd);
5224 return REDIS_ERR;
5225 }
5226 dumpsize -= nread;
5227 }
5228 close(dfd);
5229 if (rename(tmpfile,server.dbfilename) == -1) {
5230 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5231 unlink(tmpfile);
5232 close(fd);
5233 return REDIS_ERR;
5234 }
5235 emptyDb();
5236 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5237 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5238 close(fd);
5239 return REDIS_ERR;
5240 }
5241 server.master = createClient(fd);
5242 server.master->flags |= REDIS_MASTER;
5243 server.replstate = REDIS_REPL_CONNECTED;
5244 return REDIS_OK;
5245 }
5246
5247 static void slaveofCommand(redisClient *c) {
5248 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5249 !strcasecmp(c->argv[2]->ptr,"one")) {
5250 if (server.masterhost) {
5251 sdsfree(server.masterhost);
5252 server.masterhost = NULL;
5253 if (server.master) freeClient(server.master);
5254 server.replstate = REDIS_REPL_NONE;
5255 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5256 }
5257 } else {
5258 sdsfree(server.masterhost);
5259 server.masterhost = sdsdup(c->argv[1]->ptr);
5260 server.masterport = atoi(c->argv[2]->ptr);
5261 if (server.master) freeClient(server.master);
5262 server.replstate = REDIS_REPL_CONNECT;
5263 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5264 server.masterhost, server.masterport);
5265 }
5266 addReply(c,shared.ok);
5267 }
5268
5269 /* ============================ Maxmemory directive ======================== */
5270
5271 /* This function gets called when 'maxmemory' is set on the config file to limit
5272 * the max memory used by the server, and we are out of memory.
5273 * This function will try to, in order:
5274 *
5275 * - Free objects from the free list
5276 * - Try to remove keys with an EXPIRE set
5277 *
5278 * It is not possible to free enough memory to reach used-memory < maxmemory
5279 * the server will start refusing commands that will enlarge even more the
5280 * memory usage.
5281 */
5282 static void freeMemoryIfNeeded(void) {
5283 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5284 if (listLength(server.objfreelist)) {
5285 robj *o;
5286
5287 listNode *head = listFirst(server.objfreelist);
5288 o = listNodeValue(head);
5289 listDelNode(server.objfreelist,head);
5290 zfree(o);
5291 } else {
5292 int j, k, freed = 0;
5293
5294 for (j = 0; j < server.dbnum; j++) {
5295 int minttl = -1;
5296 robj *minkey = NULL;
5297 struct dictEntry *de;
5298
5299 if (dictSize(server.db[j].expires)) {
5300 freed = 1;
5301 /* From a sample of three keys drop the one nearest to
5302 * the natural expire */
5303 for (k = 0; k < 3; k++) {
5304 time_t t;
5305
5306 de = dictGetRandomKey(server.db[j].expires);
5307 t = (time_t) dictGetEntryVal(de);
5308 if (minttl == -1 || t < minttl) {
5309 minkey = dictGetEntryKey(de);
5310 minttl = t;
5311 }
5312 }
5313 deleteKey(server.db+j,minkey);
5314 }
5315 }
5316 if (!freed) return; /* nothing to free... */
5317 }
5318 }
5319 }
5320
5321 /* ============================== Append Only file ========================== */
5322
5323 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5324 sds buf = sdsempty();
5325 int j;
5326 ssize_t nwritten;
5327 time_t now;
5328 robj *tmpargv[3];
5329
5330 /* The DB this command was targetting is not the same as the last command
5331 * we appendend. To issue a SELECT command is needed. */
5332 if (dictid != server.appendseldb) {
5333 char seldb[64];
5334
5335 snprintf(seldb,sizeof(seldb),"%d",dictid);
5336 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5337 strlen(seldb),seldb);
5338 server.appendseldb = dictid;
5339 }
5340
5341 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5342 * EXPIREs into EXPIREATs calls */
5343 if (cmd->proc == expireCommand) {
5344 long when;
5345
5346 tmpargv[0] = createStringObject("EXPIREAT",8);
5347 tmpargv[1] = argv[1];
5348 incrRefCount(argv[1]);
5349 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5350 tmpargv[2] = createObject(REDIS_STRING,
5351 sdscatprintf(sdsempty(),"%ld",when));
5352 argv = tmpargv;
5353 }
5354
5355 /* Append the actual command */
5356 buf = sdscatprintf(buf,"*%d\r\n",argc);
5357 for (j = 0; j < argc; j++) {
5358 robj *o = argv[j];
5359
5360 if (o->encoding != REDIS_ENCODING_RAW)
5361 o = getDecodedObject(o);
5362 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5363 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5364 buf = sdscatlen(buf,"\r\n",2);
5365 if (o != argv[j])
5366 decrRefCount(o);
5367 }
5368
5369 /* Free the objects from the modified argv for EXPIREAT */
5370 if (cmd->proc == expireCommand) {
5371 for (j = 0; j < 3; j++)
5372 decrRefCount(argv[j]);
5373 }
5374
5375 /* We want to perform a single write. This should be guaranteed atomic
5376 * at least if the filesystem we are writing is a real physical one.
5377 * While this will save us against the server being killed I don't think
5378 * there is much to do about the whole server stopping for power problems
5379 * or alike */
5380 nwritten = write(server.appendfd,buf,sdslen(buf));
5381 if (nwritten != (signed)sdslen(buf)) {
5382 /* Ooops, we are in troubles. The best thing to do for now is
5383 * to simply exit instead to give the illusion that everything is
5384 * working as expected. */
5385 if (nwritten == -1) {
5386 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5387 } else {
5388 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5389 }
5390 exit(1);
5391 }
5392 now = time(NULL);
5393 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5394 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5395 now-server.lastfsync > 1))
5396 {
5397 fsync(server.appendfd); /* Let's try to get this data on the disk */
5398 server.lastfsync = now;
5399 }
5400 }
5401
5402 /* In Redis commands are always executed in the context of a client, so in
5403 * order to load the append only file we need to create a fake client. */
5404 static struct redisClient *createFakeClient(void) {
5405 struct redisClient *c = zmalloc(sizeof(*c));
5406
5407 selectDb(c,0);
5408 c->fd = -1;
5409 c->querybuf = sdsempty();
5410 c->argc = 0;
5411 c->argv = NULL;
5412 c->flags = 0;
5413 /* We set the fake client as a slave waiting for the synchronization
5414 * so that Redis will not try to send replies to this client. */
5415 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5416 c->reply = listCreate();
5417 listSetFreeMethod(c->reply,decrRefCount);
5418 listSetDupMethod(c->reply,dupClientReplyValue);
5419 return c;
5420 }
5421
5422 static void freeFakeClient(struct redisClient *c) {
5423 sdsfree(c->querybuf);
5424 listRelease(c->reply);
5425 zfree(c);
5426 }
5427
5428 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5429 * error (the append only file is zero-length) REDIS_ERR is returned. On
5430 * fatal error an error message is logged and the program exists. */
5431 int loadAppendOnlyFile(char *filename) {
5432 struct redisClient *fakeClient;
5433 FILE *fp = fopen(filename,"r");
5434 struct redis_stat sb;
5435
5436 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5437 return REDIS_ERR;
5438
5439 if (fp == NULL) {
5440 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5441 exit(1);
5442 }
5443
5444 fakeClient = createFakeClient();
5445 while(1) {
5446 int argc, j;
5447 unsigned long len;
5448 robj **argv;
5449 char buf[128];
5450 sds argsds;
5451 struct redisCommand *cmd;
5452
5453 if (fgets(buf,sizeof(buf),fp) == NULL) {
5454 if (feof(fp))
5455 break;
5456 else
5457 goto readerr;
5458 }
5459 if (buf[0] != '*') goto fmterr;
5460 argc = atoi(buf+1);
5461 argv = zmalloc(sizeof(robj*)*argc);
5462 for (j = 0; j < argc; j++) {
5463 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5464 if (buf[0] != '$') goto fmterr;
5465 len = strtol(buf+1,NULL,10);
5466 argsds = sdsnewlen(NULL,len);
5467 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5468 argv[j] = createObject(REDIS_STRING,argsds);
5469 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5470 }
5471
5472 /* Command lookup */
5473 cmd = lookupCommand(argv[0]->ptr);
5474 if (!cmd) {
5475 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5476 exit(1);
5477 }
5478 /* Try object sharing and encoding */
5479 if (server.shareobjects) {
5480 int j;
5481 for(j = 1; j < argc; j++)
5482 argv[j] = tryObjectSharing(argv[j]);
5483 }
5484 if (cmd->flags & REDIS_CMD_BULK)
5485 tryObjectEncoding(argv[argc-1]);
5486 /* Run the command in the context of a fake client */
5487 fakeClient->argc = argc;
5488 fakeClient->argv = argv;
5489 cmd->proc(fakeClient);
5490 /* Discard the reply objects list from the fake client */
5491 while(listLength(fakeClient->reply))
5492 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5493 /* Clean up, ready for the next command */
5494 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5495 zfree(argv);
5496 }
5497 fclose(fp);
5498 freeFakeClient(fakeClient);
5499 return REDIS_OK;
5500
5501 readerr:
5502 if (feof(fp)) {
5503 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5504 } else {
5505 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5506 }
5507 exit(1);
5508 fmterr:
5509 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5510 exit(1);
5511 }
5512
5513 /* ================================= Debugging ============================== */
5514
5515 static void debugCommand(redisClient *c) {
5516 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5517 *((char*)-1) = 'x';
5518 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5519 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5520 robj *key, *val;
5521
5522 if (!de) {
5523 addReply(c,shared.nokeyerr);
5524 return;
5525 }
5526 key = dictGetEntryKey(de);
5527 val = dictGetEntryVal(de);
5528 addReplySds(c,sdscatprintf(sdsempty(),
5529 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5530 key, key->refcount, val, val->refcount, val->encoding));
5531 } else {
5532 addReplySds(c,sdsnew(
5533 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5534 }
5535 }
5536
5537 #ifdef HAVE_BACKTRACE
5538 static struct redisFunctionSym symsTable[] = {
5539 {"compareStringObjects", (unsigned long)compareStringObjects},
5540 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5541 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5542 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5543 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5544 {"freeStringObject", (unsigned long)freeStringObject},
5545 {"freeListObject", (unsigned long)freeListObject},
5546 {"freeSetObject", (unsigned long)freeSetObject},
5547 {"decrRefCount", (unsigned long)decrRefCount},
5548 {"createObject", (unsigned long)createObject},
5549 {"freeClient", (unsigned long)freeClient},
5550 {"rdbLoad", (unsigned long)rdbLoad},
5551 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5552 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5553 {"addReply", (unsigned long)addReply},
5554 {"addReplySds", (unsigned long)addReplySds},
5555 {"incrRefCount", (unsigned long)incrRefCount},
5556 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5557 {"createStringObject", (unsigned long)createStringObject},
5558 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5559 {"syncWithMaster", (unsigned long)syncWithMaster},
5560 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5561 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5562 {"getDecodedObject", (unsigned long)getDecodedObject},
5563 {"removeExpire", (unsigned long)removeExpire},
5564 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5565 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5566 {"deleteKey", (unsigned long)deleteKey},
5567 {"getExpire", (unsigned long)getExpire},
5568 {"setExpire", (unsigned long)setExpire},
5569 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5570 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5571 {"authCommand", (unsigned long)authCommand},
5572 {"pingCommand", (unsigned long)pingCommand},
5573 {"echoCommand", (unsigned long)echoCommand},
5574 {"setCommand", (unsigned long)setCommand},
5575 {"setnxCommand", (unsigned long)setnxCommand},
5576 {"getCommand", (unsigned long)getCommand},
5577 {"delCommand", (unsigned long)delCommand},
5578 {"existsCommand", (unsigned long)existsCommand},
5579 {"incrCommand", (unsigned long)incrCommand},
5580 {"decrCommand", (unsigned long)decrCommand},
5581 {"incrbyCommand", (unsigned long)incrbyCommand},
5582 {"decrbyCommand", (unsigned long)decrbyCommand},
5583 {"selectCommand", (unsigned long)selectCommand},
5584 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5585 {"keysCommand", (unsigned long)keysCommand},
5586 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5587 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5588 {"saveCommand", (unsigned long)saveCommand},
5589 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5590 {"shutdownCommand", (unsigned long)shutdownCommand},
5591 {"moveCommand", (unsigned long)moveCommand},
5592 {"renameCommand", (unsigned long)renameCommand},
5593 {"renamenxCommand", (unsigned long)renamenxCommand},
5594 {"lpushCommand", (unsigned long)lpushCommand},
5595 {"rpushCommand", (unsigned long)rpushCommand},
5596 {"lpopCommand", (unsigned long)lpopCommand},
5597 {"rpopCommand", (unsigned long)rpopCommand},
5598 {"llenCommand", (unsigned long)llenCommand},
5599 {"lindexCommand", (unsigned long)lindexCommand},
5600 {"lrangeCommand", (unsigned long)lrangeCommand},
5601 {"ltrimCommand", (unsigned long)ltrimCommand},
5602 {"typeCommand", (unsigned long)typeCommand},
5603 {"lsetCommand", (unsigned long)lsetCommand},
5604 {"saddCommand", (unsigned long)saddCommand},
5605 {"sremCommand", (unsigned long)sremCommand},
5606 {"smoveCommand", (unsigned long)smoveCommand},
5607 {"sismemberCommand", (unsigned long)sismemberCommand},
5608 {"scardCommand", (unsigned long)scardCommand},
5609 {"spopCommand", (unsigned long)spopCommand},
5610 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5611 {"sinterCommand", (unsigned long)sinterCommand},
5612 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5613 {"sunionCommand", (unsigned long)sunionCommand},
5614 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5615 {"sdiffCommand", (unsigned long)sdiffCommand},
5616 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5617 {"syncCommand", (unsigned long)syncCommand},
5618 {"flushdbCommand", (unsigned long)flushdbCommand},
5619 {"flushallCommand", (unsigned long)flushallCommand},
5620 {"sortCommand", (unsigned long)sortCommand},
5621 {"lremCommand", (unsigned long)lremCommand},
5622 {"infoCommand", (unsigned long)infoCommand},
5623 {"mgetCommand", (unsigned long)mgetCommand},
5624 {"monitorCommand", (unsigned long)monitorCommand},
5625 {"expireCommand", (unsigned long)expireCommand},
5626 {"expireatCommand", (unsigned long)expireatCommand},
5627 {"getsetCommand", (unsigned long)getsetCommand},
5628 {"ttlCommand", (unsigned long)ttlCommand},
5629 {"slaveofCommand", (unsigned long)slaveofCommand},
5630 {"debugCommand", (unsigned long)debugCommand},
5631 {"processCommand", (unsigned long)processCommand},
5632 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5633 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5634 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5635 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5636 {"msetCommand", (unsigned long)msetCommand},
5637 {"msetnxCommand", (unsigned long)msetnxCommand},
5638 {"zslCreateNode", (unsigned long)zslCreateNode},
5639 {"zslCreate", (unsigned long)zslCreate},
5640 {"zslFreeNode",(unsigned long)zslFreeNode},
5641 {"zslFree",(unsigned long)zslFree},
5642 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5643 {"zslInsert",(unsigned long)zslInsert},
5644 {"zslDelete",(unsigned long)zslDelete},
5645 {"createZsetObject",(unsigned long)createZsetObject},
5646 {"zaddCommand",(unsigned long)zaddCommand},
5647 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5648 {"zrangeCommand",(unsigned long)zrangeCommand},
5649 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5650 {"zremCommand",(unsigned long)zremCommand},
5651 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5652 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5653 {"feedAppendOnlyFile",(unsigned long)feedAppendOnlyFile},
5654 {NULL,0}
5655 };
5656
5657 /* This function try to convert a pointer into a function name. It's used in
5658 * oreder to provide a backtrace under segmentation fault that's able to
5659 * display functions declared as static (otherwise the backtrace is useless). */
5660 static char *findFuncName(void *pointer, unsigned long *offset){
5661 int i, ret = -1;
5662 unsigned long off, minoff = 0;
5663
5664 /* Try to match against the Symbol with the smallest offset */
5665 for (i=0; symsTable[i].pointer; i++) {
5666 unsigned long lp = (unsigned long) pointer;
5667
5668 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5669 off=lp-symsTable[i].pointer;
5670 if (ret < 0 || off < minoff) {
5671 minoff=off;
5672 ret=i;
5673 }
5674 }
5675 }
5676 if (ret == -1) return NULL;
5677 *offset = minoff;
5678 return symsTable[ret].name;
5679 }
5680
/* Return the instruction pointer stored in the signal context 'uc', read
 * from the platform specific field selected at compile time. NULL is
 * returned on platforms not covered by any of the branches below. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__linux__) && defined(__x86_64__) /* Linux x86-64 */
    /* Compilers define __x86_64__ (lower case), so the branch above is not
     * taken on 64 bit builds. Index 16 is the slot of the RIP register in
     * the glibc gregs array (REG_RIP); the literal is used because the
     * REG_* names are only visible with GNU extensions enabled. */
    return (void*) uc->uc_mcontext.gregs[16];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
5702
5703 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5704 void *trace[100];
5705 char **messages = NULL;
5706 int i, trace_size = 0;
5707 unsigned long offset=0;
5708 time_t uptime = time(NULL)-server.stat_starttime;
5709 ucontext_t *uc = (ucontext_t*) secret;
5710 REDIS_NOTUSED(info);
5711
5712 redisLog(REDIS_WARNING,
5713 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5714 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5715 "redis_version:%s; "
5716 "uptime_in_seconds:%d; "
5717 "connected_clients:%d; "
5718 "connected_slaves:%d; "
5719 "used_memory:%zu; "
5720 "changes_since_last_save:%lld; "
5721 "bgsave_in_progress:%d; "
5722 "last_save_time:%d; "
5723 "total_connections_received:%lld; "
5724 "total_commands_processed:%lld; "
5725 "role:%s;"
5726 ,REDIS_VERSION,
5727 uptime,
5728 listLength(server.clients)-listLength(server.slaves),
5729 listLength(server.slaves),
5730 server.usedmemory,
5731 server.dirty,
5732 server.bgsaveinprogress,
5733 server.lastsave,
5734 server.stat_numconnections,
5735 server.stat_numcommands,
5736 server.masterhost == NULL ? "master" : "slave"
5737 ));
5738
5739 trace_size = backtrace(trace, 100);
5740 /* overwrite sigaction with caller's address */
5741 if (getMcontextEip(uc) != NULL) {
5742 trace[1] = getMcontextEip(uc);
5743 }
5744 messages = backtrace_symbols(trace, trace_size);
5745
5746 for (i=1; i<trace_size; ++i) {
5747 char *fn = findFuncName(trace[i], &offset), *p;
5748
5749 p = strchr(messages[i],'+');
5750 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5751 redisLog(REDIS_WARNING,"%s", messages[i]);
5752 } else {
5753 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5754 }
5755 }
5756 free(messages);
5757 exit(0);
5758 }
5759
5760 static void setupSigSegvAction(void) {
5761 struct sigaction act;
5762
5763 sigemptyset (&act.sa_mask);
5764 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5765 * is used. Otherwise, sa_handler is used */
5766 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5767 act.sa_sigaction = segvHandler;
5768 sigaction (SIGSEGV, &act, NULL);
5769 sigaction (SIGBUS, &act, NULL);
5770 sigaction (SIGFPE, &act, NULL);
5771 sigaction (SIGILL, &act, NULL);
5772 sigaction (SIGBUS, &act, NULL);
5773 return;
5774 }
5775 #else /* HAVE_BACKTRACE */
/* Build without backtrace support: no signal handler is installed. */
static void setupSigSegvAction(void) {
    return;
}
5778 #endif /* HAVE_BACKTRACE */
5779
5780 /* =================================== Main! ================================ */
5781
5782 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its content converted
 * with atoi(). -1 is returned if the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    got = fgets(line,64,procfile);
    fclose(procfile);
    if (got == NULL) return -1;

    return atoi(line);
}
5796
5797 void linuxOvercommitMemoryWarning(void) {
5798 if (linuxOvercommitMemoryValue() == 0) {
5799 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5800 }
5801 }
5802 #endif /* __linux__ */
5803
5804 static void daemonize(void) {
5805 int fd;
5806 FILE *fp;
5807
5808 if (fork() != 0) exit(0); /* parent exits */
5809 setsid(); /* create a new session */
5810
5811 /* Every output goes to /dev/null. If Redis is daemonized but
5812 * the 'logfile' is set to 'stdout' in the configuration file
5813 * it will not log at all. */
5814 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5815 dup2(fd, STDIN_FILENO);
5816 dup2(fd, STDOUT_FILENO);
5817 dup2(fd, STDERR_FILENO);
5818 if (fd > STDERR_FILENO) close(fd);
5819 }
5820 /* Try to write the pid file */
5821 fp = fopen(server.pidfile,"w");
5822 if (fp) {
5823 fprintf(fp,"%d\n",getpid());
5824 fclose(fp);
5825 }
5826 }
5827
5828 int main(int argc, char **argv) {
5829 initServerConfig();
5830 if (argc == 2) {
5831 ResetServerSaveParams();
5832 loadServerConfig(argv[1]);
5833 } else if (argc > 2) {
5834 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5835 exit(1);
5836 } else {
5837 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5838 }
5839 initServer();
5840 if (server.daemonize) daemonize();
5841 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5842 #ifdef __linux__
5843 linuxOvercommitMemoryWarning();
5844 #endif
5845 if (server.appendonly) {
5846 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
5847 redisLog(REDIS_NOTICE,"DB loaded from append only file");
5848 } else {
5849 if (rdbLoad(server.dbfilename) == REDIS_OK)
5850 redisLog(REDIS_NOTICE,"DB loaded from disk");
5851 }
5852 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5853 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5854 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5855 aeMain(server.el);
5856 aeDeleteEventLoop(server.el);
5857 return 0;
5858 }