]> git.saurik.com Git - redis.git/blob - redis.c
f625333147b3bd55abef59f599c7054fbe6612ea
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #if defined(__sun)
63 #include "solarisfixes.h"
64 #endif
65
66 #include "redis.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR (-1) /* parenthesized so it is safe in any expression */

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* max random expire checks per DB per serverCron() run */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing a 32 bit length for every
 * short key would waste a lot of space, so the two most significant bits of
 * the first byte tell how to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte: most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */

/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void) (V))

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
192
/*================================= Data types ============================== */

/* A redis object, that is a type able to hold a string / list / set */
typedef struct redisObject {
    void *ptr;                /* payload: an sds for REDIS_ENCODING_RAW strings */
    unsigned char type;       /* one of the REDIS_STRING, REDIS_LIST, ... object types */
    unsigned char encoding;   /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2]; /* unused; NOTE(review): presumably explicit padding */
    int refcount;             /* reference count, see incrRefCount()/decrRefCount() */
} robj;

/* A logical database. */
typedef struct redisDb {
    dict *dict;    /* the keys of this DB */
    dict *expires; /* keys with an expire set; the entry value holds the time_t expire time */
    int id;        /* NOTE(review): presumably the DB index (0..dbnum-1) */
} redisDb;

/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen;            /* bulk read len. -1 if not in bulk read mode */
    int multibulk;          /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags;              /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb;         /* slave selected db, if this client is a slave */
    int authenticated;      /* when requirepass is non-NULL */
    int replstate;          /* replication state if this is a slave */
    int repldbfd;           /* replication DB file descriptor */
    long repldboff;         /* replication DB file offset */
    off_t repldbsize;       /* replication DB file size */
} redisClient;
232
/* A save point: serverCron() starts a background save when at least
 * 'changes' changes happened and more than 'seconds' seconds elapsed
 * since the last successful save. */
struct saveparam {
    time_t seconds;
    int changes;
};

/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db;                 /* array of server.dbnum databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty;             /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops;               /* number of times the cron function run */
    list *objfreelist;           /* A list of freed objects to avoid malloc() */
    time_t lastsave;             /* Unix time of the last successful save */
    size_t usedmemory;           /* Used memory in bytes (zmalloc_used_memory()) */
    /* Fields used only for stats */
    time_t stat_starttime;       /* server start time */
    long long stat_numcommands;  /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity;               /* minimum level logged by redisLog() */
    int glueoutputbuf;
    int maxidletime;             /* idle client timeout in seconds, 0 disables it */
    int dbnum;
    int daemonize;
    char *pidfile;
    int bgsaveinprogress;
    pid_t bgsavechildpid;
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile;               /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master;         /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
288
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);

/* An entry of cmdTable. */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;  /* NOTE(review): negative values look like "at least -arity
                 * arguments" (e.g. DEL is -2) -- confirm in processCommand() */
    int flags;  /* REDIS_CMD_BULK or REDIS_CMD_INLINE, optionally | REDIS_CMD_DENYOOM */
};

/* Function name / address pair. NOTE(review): presumably used to symbolize
 * stack traces on crash (see setupSigSegvAction()) -- confirm. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};

/* An element being sorted by SORT: the object plus the key it is compared
 * by, either a numeric score or another object. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;

/* An operation requested by SORT; 'type' is presumably one of
 * REDIS_SORT_GET/DEL/INCR/DECR. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;

/* ZSETs use a specialized version of Skiplists */

typedef struct zskiplistNode {
    struct zskiplistNode **forward;  /* per-level forward pointers */
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    long length;
    int level;   /* NOTE(review): presumably the highest level in use, <= ZSKIPLIST_MAXLEVEL */
} zskiplist;

/* A sorted set is backed by both a dict and a skiplist. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
334
/* Our shared "common" objects: preformatted protocol replies (and SELECT
 * commands) built once by createSharedObjects() and reused. */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;

/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
351
352 /*================================ Prototypes =============================== */
353
354 static void freeStringObject(robj *o);
355 static void freeListObject(robj *o);
356 static void freeSetObject(robj *o);
357 static void decrRefCount(void *o);
358 static robj *createObject(int type, void *ptr);
359 static void freeClient(redisClient *c);
360 static int rdbLoad(char *filename);
361 static void addReply(redisClient *c, robj *obj);
362 static void addReplySds(redisClient *c, sds s);
363 static void incrRefCount(robj *o);
364 static int rdbSaveBackground(char *filename);
365 static robj *createStringObject(char *ptr, size_t len);
366 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
367 static int syncWithMaster(void);
368 static robj *tryObjectSharing(robj *o);
369 static int tryObjectEncoding(robj *o);
370 static robj *getDecodedObject(const robj *o);
371 static int removeExpire(redisDb *db, robj *key);
372 static int expireIfNeeded(redisDb *db, robj *key);
373 static int deleteIfVolatile(redisDb *db, robj *key);
374 static int deleteKey(redisDb *db, robj *key);
375 static time_t getExpire(redisDb *db, robj *key);
376 static int setExpire(redisDb *db, robj *key, time_t when);
377 static void updateSlavesWaitingBgsave(int bgsaveerr);
378 static void freeMemoryIfNeeded(void);
379 static int processCommand(redisClient *c);
380 static void setupSigSegvAction(void);
381 static void rdbRemoveTempFile(pid_t childpid);
382 static size_t stringObjectLen(robj *o);
383 static void processInputBuffer(redisClient *c);
384 static zskiplist *zslCreate(void);
385 static void zslFree(zskiplist *zsl);
386 static void zslInsert(zskiplist *zsl, double score, robj *obj);
387
388 static void authCommand(redisClient *c);
389 static void pingCommand(redisClient *c);
390 static void echoCommand(redisClient *c);
391 static void setCommand(redisClient *c);
392 static void setnxCommand(redisClient *c);
393 static void getCommand(redisClient *c);
394 static void delCommand(redisClient *c);
395 static void existsCommand(redisClient *c);
396 static void incrCommand(redisClient *c);
397 static void decrCommand(redisClient *c);
398 static void incrbyCommand(redisClient *c);
399 static void decrbyCommand(redisClient *c);
400 static void selectCommand(redisClient *c);
401 static void randomkeyCommand(redisClient *c);
402 static void keysCommand(redisClient *c);
403 static void dbsizeCommand(redisClient *c);
404 static void lastsaveCommand(redisClient *c);
405 static void saveCommand(redisClient *c);
406 static void bgsaveCommand(redisClient *c);
407 static void shutdownCommand(redisClient *c);
408 static void moveCommand(redisClient *c);
409 static void renameCommand(redisClient *c);
410 static void renamenxCommand(redisClient *c);
411 static void lpushCommand(redisClient *c);
412 static void rpushCommand(redisClient *c);
413 static void lpopCommand(redisClient *c);
414 static void rpopCommand(redisClient *c);
415 static void llenCommand(redisClient *c);
416 static void lindexCommand(redisClient *c);
417 static void lrangeCommand(redisClient *c);
418 static void ltrimCommand(redisClient *c);
419 static void typeCommand(redisClient *c);
420 static void lsetCommand(redisClient *c);
421 static void saddCommand(redisClient *c);
422 static void sremCommand(redisClient *c);
423 static void smoveCommand(redisClient *c);
424 static void sismemberCommand(redisClient *c);
425 static void scardCommand(redisClient *c);
426 static void spopCommand(redisClient *c);
427 static void srandmemberCommand(redisClient *c);
428 static void sinterCommand(redisClient *c);
429 static void sinterstoreCommand(redisClient *c);
430 static void sunionCommand(redisClient *c);
431 static void sunionstoreCommand(redisClient *c);
432 static void sdiffCommand(redisClient *c);
433 static void sdiffstoreCommand(redisClient *c);
434 static void syncCommand(redisClient *c);
435 static void flushdbCommand(redisClient *c);
436 static void flushallCommand(redisClient *c);
437 static void sortCommand(redisClient *c);
438 static void lremCommand(redisClient *c);
439 static void infoCommand(redisClient *c);
440 static void mgetCommand(redisClient *c);
441 static void monitorCommand(redisClient *c);
442 static void expireCommand(redisClient *c);
443 static void getsetCommand(redisClient *c);
444 static void ttlCommand(redisClient *c);
445 static void slaveofCommand(redisClient *c);
446 static void debugCommand(redisClient *c);
447 static void msetCommand(redisClient *c);
448 static void msetnxCommand(redisClient *c);
449 static void zaddCommand(redisClient *c);
450 static void zrangeCommand(redisClient *c);
451 static void zrangebyscoreCommand(redisClient *c);
452 static void zrevrangeCommand(redisClient *c);
453 static void zlenCommand(redisClient *c);
454 static void zremCommand(redisClient *c);
455 static void zscoreCommand(redisClient *c);
456
457 /*================================= Globals ================================= */
458
459 /* Global vars */
460 static struct redisServer server; /* server global state */
461 static struct redisCommand cmdTable[] = {
462 {"get",getCommand,2,REDIS_CMD_INLINE},
463 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
464 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
465 {"del",delCommand,-2,REDIS_CMD_INLINE},
466 {"exists",existsCommand,2,REDIS_CMD_INLINE},
467 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
468 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
469 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
470 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
471 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
472 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
473 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
474 {"llen",llenCommand,2,REDIS_CMD_INLINE},
475 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
476 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
477 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
478 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
479 {"lrem",lremCommand,4,REDIS_CMD_BULK},
480 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
481 {"srem",sremCommand,3,REDIS_CMD_BULK},
482 {"smove",smoveCommand,4,REDIS_CMD_BULK},
483 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
484 {"scard",scardCommand,2,REDIS_CMD_INLINE},
485 {"spop",spopCommand,2,REDIS_CMD_INLINE},
486 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
487 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
488 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
489 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
490 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
491 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
492 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
493 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
494 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
495 {"zrem",zremCommand,3,REDIS_CMD_BULK},
496 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
497 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
498 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
499 {"zlen",zlenCommand,2,REDIS_CMD_INLINE},
500 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
501 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
502 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
503 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
504 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
505 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
506 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
507 {"select",selectCommand,2,REDIS_CMD_INLINE},
508 {"move",moveCommand,3,REDIS_CMD_INLINE},
509 {"rename",renameCommand,3,REDIS_CMD_INLINE},
510 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
511 {"expire",expireCommand,3,REDIS_CMD_INLINE},
512 {"keys",keysCommand,2,REDIS_CMD_INLINE},
513 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
514 {"auth",authCommand,2,REDIS_CMD_INLINE},
515 {"ping",pingCommand,1,REDIS_CMD_INLINE},
516 {"echo",echoCommand,2,REDIS_CMD_BULK},
517 {"save",saveCommand,1,REDIS_CMD_INLINE},
518 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
519 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
520 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
521 {"type",typeCommand,2,REDIS_CMD_INLINE},
522 {"sync",syncCommand,1,REDIS_CMD_INLINE},
523 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
524 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
525 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
526 {"info",infoCommand,1,REDIS_CMD_INLINE},
527 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
528 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
529 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
530 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
531 {NULL,NULL,0,0}
532 };
533 /*============================ Utility functions ============================ */
534
/* Worker behind stringmatchlen(); see there for the supported syntax.
 *
 * '*skipLongerMatches' is shared by every recursion level of one top-level
 * match. It is set once some '*' has tried the rest of the pattern against
 * every remaining suffix of the string without success. From then on, an
 * earlier '*' absorbing more characters cannot succeed either, because it
 * would only make that later '*' start further right. So all pending '*'
 * loops give up at once, instead of backtracking exponentially on patterns
 * like "*a*a*a*a*b". */
static int stringmatchlenImpl(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase,
        int *skipLongerMatches)
{
    while (patternLen) {
        switch (pattern[0]) {
        case '*':
            /* A run of stars is equivalent to a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing '*' matches whatever is left */
            while (stringLen) {
                if (stringmatchlenImpl(pattern+1, patternLen-1,
                        string, stringLen, nocase, skipLongerMatches))
                    return 1; /* match */
                if (*skipLongerMatches)
                    return 0; /* no match */
                string++;
                stringLen--;
            }
            *skipLongerMatches = 1;
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A character class consumes exactly one character. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the increment
                     * after the switch leaves the pattern fully consumed. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped character inside the class. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs a character to compare against. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing stars may remain. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}

/* Glob-style pattern matching.
 *
 * Supported syntax:
 *  - '*' matches any run of characters, including none.
 *  - '?' matches any single character.
 *  - '[...]' is a character class, with ranges ("a-z"), negation ("[^...]")
 *    and '\' escapes.
 *  - '\' elsewhere makes the next character literal.
 *
 * When 'nocase' is non-zero, characters are compared case-insensitively,
 * except escaped characters inside a class. Only the first patternLen and
 * stringLen bytes are examined. Returns 1 on match, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    int skipLongerMatches = 0;

    return stringmatchlenImpl(pattern, patternLen, string, stringLen,
                              nocase, &skipLongerMatches);
}
657
658 static void redisLog(int level, const char *fmt, ...) {
659 va_list ap;
660 FILE *fp;
661
662 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
663 if (!fp) return;
664
665 va_start(ap, fmt);
666 if (level >= server.verbosity) {
667 char *c = ".-*";
668 char buf[64];
669 time_t now;
670
671 now = time(NULL);
672 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
673 fprintf(fp,"%s %c ",buf,c[level]);
674 vfprintf(fp, fmt, ap);
675 fprintf(fp,"\n");
676 fflush(fp);
677 }
678 va_end(ap);
679
680 if (server.logfile) fclose(fp);
681 }
682
683 /*====================== Hash table type implementation ==================== */
684
/* Hash table types. The helpers below handle keys that are either plain SDS
 * dynamic strings or Redis objects wrapping SDS strings; values are Redis
 * objects (which can hold SDS strings, lists, sets) or plain heap allocations. */
688
/* Dict value destructor for values that are plain zmalloc() allocations:
 * simply releases them with zfree(). 'privdata' is ignored. */
static void dictVanillaFree(void *privdata, void *val) {
    (void) privdata;
    zfree(val);
}
694
695 static int sdsDictKeyCompare(void *privdata, const void *key1,
696 const void *key2)
697 {
698 int l1,l2;
699 DICT_NOTUSED(privdata);
700
701 l1 = sdslen((sds)key1);
702 l2 = sdslen((sds)key2);
703 if (l1 != l2) return 0;
704 return memcmp(key1, key2, l1) == 0;
705 }
706
/* Dict key/value destructor for Redis objects: drops the reference the
 * dict held via decrRefCount(). 'privdata' is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val) {
    (void) privdata;
    decrRefCount(val);
}
713
714 static int dictObjKeyCompare(void *privdata, const void *key1,
715 const void *key2)
716 {
717 const robj *o1 = key1, *o2 = key2;
718 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
719 }
720
721 static unsigned int dictObjHash(const void *key) {
722 const robj *o = key;
723 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
724 }
725
726 static int dictEncObjKeyCompare(void *privdata, const void *key1,
727 const void *key2)
728 {
729 const robj *o1 = key1, *o2 = key2;
730
731 if (o1->encoding == REDIS_ENCODING_RAW &&
732 o2->encoding == REDIS_ENCODING_RAW)
733 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
734 else {
735 robj *dec1, *dec2;
736 int cmp;
737
738 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
739 getDecodedObject(o1) : (robj*)o1;
740 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
741 getDecodedObject(o2) : (robj*)o2;
742 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
743 if (dec1 != o1) decrRefCount(dec1);
744 if (dec2 != o2) decrRefCount(dec2);
745 return cmp;
746 }
747 }
748
749 static unsigned int dictEncObjHash(const void *key) {
750 const robj *o = key;
751
752 if (o->encoding == REDIS_ENCODING_RAW)
753 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
754 else {
755 robj *dec = getDecodedObject(o);
756 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
757 decrRefCount(dec);
758 return hash;
759 }
760 }
761
/* Sets: keys are Redis objects, possibly integer encoded (hence the Enc
 * hash/compare functions), released with decrRefCount(). No value
 * destructor is set, so values are not freed by the dict. */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};

/* Sorted sets: keys handled like setDictType; values are plain heap
 * allocations released with zfree(). NOTE(review): presumably the member's
 * score -- confirm in the zset code. */
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor */
};

/* Hashes: keys and values are Redis objects released with decrRefCount().
 * Keys are hashed and compared as raw sds strings, without decoding. */
static dictType hashDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};
788
789 /* ========================= Random utility functions ======================= */
790
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings. It is not clear it would even be
 * possible to report the condition to the client, since the networking
 * layer itself relies on heap allocation for send buffers, so we simply
 * abort. At least the code stays simpler to read.
 *
 * Prints "<msg>: Out of memory" on stderr, waits one second and aborts. */
static void oom(const char *msg)
{
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    /* NOTE(review): presumably gives the message time to reach a terminal
     * or log collector before abort() -- confirm. */
    sleep(1);
    abort();
}
802
803 /* ====================== Redis server networking stuff ===================== */
804 static void closeTimedoutClients(void) {
805 redisClient *c;
806 listNode *ln;
807 time_t now = time(NULL);
808
809 listRewind(server.clients);
810 while ((ln = listYield(server.clients)) != NULL) {
811 c = listNodeValue(ln);
812 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
813 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
814 (now - c->lastinteraction > server.maxidletime)) {
815 redisLog(REDIS_DEBUG,"Closing idle client");
816 freeClient(c);
817 }
818 }
819 }
820
821 static int htNeedsResize(dict *dict) {
822 long long size, used;
823
824 size = dictSlots(dict);
825 used = dictSize(dict);
826 return (size && used && size > DICT_HT_INITIAL_SIZE &&
827 (used*100/size < REDIS_HT_MINFILL));
828 }
829
830 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
831 * we resize the hash table to save memory */
832 static void tryResizeHashTables(void) {
833 int j;
834
835 for (j = 0; j < server.dbnum; j++) {
836 if (htNeedsResize(server.db[j].dict)) {
837 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
838 dictResize(server.db[j].dict);
839 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
840 }
841 if (htNeedsResize(server.db[j].expires))
842 dictResize(server.db[j].expires);
843 }
844 }
845
846 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
847 int j, loops = server.cronloops++;
848 REDIS_NOTUSED(eventLoop);
849 REDIS_NOTUSED(id);
850 REDIS_NOTUSED(clientData);
851
852 /* Update the global state with the amount of used memory */
853 server.usedmemory = zmalloc_used_memory();
854
855 /* Show some info about non-empty databases */
856 for (j = 0; j < server.dbnum; j++) {
857 long long size, used, vkeys;
858
859 size = dictSlots(server.db[j].dict);
860 used = dictSize(server.db[j].dict);
861 vkeys = dictSize(server.db[j].expires);
862 if (!(loops % 5) && (used || vkeys)) {
863 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
864 /* dictPrintStats(server.dict); */
865 }
866 }
867
868 /* We don't want to resize the hash tables while a bacground saving
869 * is in progress: the saving child is created using fork() that is
870 * implemented with a copy-on-write semantic in most modern systems, so
871 * if we resize the HT while there is the saving child at work actually
872 * a lot of memory movements in the parent will cause a lot of pages
873 * copied. */
874 if (!server.bgsaveinprogress) tryResizeHashTables();
875
876 /* Show information about connected clients */
877 if (!(loops % 5)) {
878 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
879 listLength(server.clients)-listLength(server.slaves),
880 listLength(server.slaves),
881 server.usedmemory,
882 dictSize(server.sharingpool));
883 }
884
885 /* Close connections of timedout clients */
886 if (server.maxidletime && !(loops % 10))
887 closeTimedoutClients();
888
889 /* Check if a background saving in progress terminated */
890 if (server.bgsaveinprogress) {
891 int statloc;
892 if (wait4(-1,&statloc,WNOHANG,NULL)) {
893 int exitcode = WEXITSTATUS(statloc);
894 int bysignal = WIFSIGNALED(statloc);
895
896 if (!bysignal && exitcode == 0) {
897 redisLog(REDIS_NOTICE,
898 "Background saving terminated with success");
899 server.dirty = 0;
900 server.lastsave = time(NULL);
901 } else if (!bysignal && exitcode != 0) {
902 redisLog(REDIS_WARNING, "Background saving error");
903 } else {
904 redisLog(REDIS_WARNING,
905 "Background saving terminated by signal");
906 rdbRemoveTempFile(server.bgsavechildpid);
907 }
908 server.bgsaveinprogress = 0;
909 server.bgsavechildpid = -1;
910 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
911 }
912 } else {
913 /* If there is not a background saving in progress check if
914 * we have to save now */
915 time_t now = time(NULL);
916 for (j = 0; j < server.saveparamslen; j++) {
917 struct saveparam *sp = server.saveparams+j;
918
919 if (server.dirty >= sp->changes &&
920 now-server.lastsave > sp->seconds) {
921 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
922 sp->changes, sp->seconds);
923 rdbSaveBackground(server.dbfilename);
924 break;
925 }
926 }
927 }
928
929 /* Try to expire a few timed out keys */
930 for (j = 0; j < server.dbnum; j++) {
931 redisDb *db = server.db+j;
932 int num = dictSize(db->expires);
933
934 if (num) {
935 time_t now = time(NULL);
936
937 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
938 num = REDIS_EXPIRELOOKUPS_PER_CRON;
939 while (num--) {
940 dictEntry *de;
941 time_t t;
942
943 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
944 t = (time_t) dictGetEntryVal(de);
945 if (now > t) {
946 deleteKey(db,dictGetEntryKey(de));
947 }
948 }
949 }
950 }
951
952 /* Check if we should connect to a MASTER */
953 if (server.replstate == REDIS_REPL_CONNECT) {
954 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
955 if (syncWithMaster() == REDIS_OK) {
956 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
957 }
958 }
959 return 1000;
960 }
961
962 static void createSharedObjects(void) {
963 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
964 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
965 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
966 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
967 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
968 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
969 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
970 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
971 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
972 /* no such key */
973 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
974 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
975 "-ERR Operation against a key holding the wrong kind of value\r\n"));
976 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
977 "-ERR no such key\r\n"));
978 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
979 "-ERR syntax error\r\n"));
980 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
981 "-ERR source and destination objects are the same\r\n"));
982 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
983 "-ERR index out of range\r\n"));
984 shared.space = createObject(REDIS_STRING,sdsnew(" "));
985 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
986 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
987 shared.select0 = createStringObject("select 0\r\n",10);
988 shared.select1 = createStringObject("select 1\r\n",10);
989 shared.select2 = createStringObject("select 2\r\n",10);
990 shared.select3 = createStringObject("select 3\r\n",10);
991 shared.select4 = createStringObject("select 4\r\n",10);
992 shared.select5 = createStringObject("select 5\r\n",10);
993 shared.select6 = createStringObject("select 6\r\n",10);
994 shared.select7 = createStringObject("select 7\r\n",10);
995 shared.select8 = createStringObject("select 8\r\n",10);
996 shared.select9 = createStringObject("select 9\r\n",10);
997 }
998
999 static void appendServerSaveParams(time_t seconds, int changes) {
1000 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1001 server.saveparams[server.saveparamslen].seconds = seconds;
1002 server.saveparams[server.saveparamslen].changes = changes;
1003 server.saveparamslen++;
1004 }
1005
1006 static void ResetServerSaveParams() {
1007 zfree(server.saveparams);
1008 server.saveparams = NULL;
1009 server.saveparamslen = 0;
1010 }
1011
1012 static void initServerConfig() {
1013 server.dbnum = REDIS_DEFAULT_DBNUM;
1014 server.port = REDIS_SERVERPORT;
1015 server.verbosity = REDIS_DEBUG;
1016 server.maxidletime = REDIS_MAXIDLETIME;
1017 server.saveparams = NULL;
1018 server.logfile = NULL; /* NULL = log on standard output */
1019 server.bindaddr = NULL;
1020 server.glueoutputbuf = 1;
1021 server.daemonize = 0;
1022 server.pidfile = "/var/run/redis.pid";
1023 server.dbfilename = "dump.rdb";
1024 server.requirepass = NULL;
1025 server.shareobjects = 0;
1026 server.sharingpoolsize = 1024;
1027 server.maxclients = 0;
1028 server.maxmemory = 0;
1029 ResetServerSaveParams();
1030
1031 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1032 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1033 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1034 /* Replication related */
1035 server.isslave = 0;
1036 server.masterhost = NULL;
1037 server.masterport = 6379;
1038 server.master = NULL;
1039 server.replstate = REDIS_REPL_NONE;
1040
1041 /* Double constants initialization */
1042 R_Zero = 0.0;
1043 R_PosInf = 1.0/R_Zero;
1044 R_NegInf = -1.0/R_Zero;
1045 R_Nan = R_Zero/R_Zero;
1046 }
1047
1048 static void initServer() {
1049 int j;
1050
1051 signal(SIGHUP, SIG_IGN);
1052 signal(SIGPIPE, SIG_IGN);
1053 setupSigSegvAction();
1054
1055 server.clients = listCreate();
1056 server.slaves = listCreate();
1057 server.monitors = listCreate();
1058 server.objfreelist = listCreate();
1059 createSharedObjects();
1060 server.el = aeCreateEventLoop();
1061 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1062 server.sharingpool = dictCreate(&setDictType,NULL);
1063 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1064 if (server.fd == -1) {
1065 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1066 exit(1);
1067 }
1068 for (j = 0; j < server.dbnum; j++) {
1069 server.db[j].dict = dictCreate(&hashDictType,NULL);
1070 server.db[j].expires = dictCreate(&setDictType,NULL);
1071 server.db[j].id = j;
1072 }
1073 server.cronloops = 0;
1074 server.bgsaveinprogress = 0;
1075 server.bgsavechildpid = -1;
1076 server.lastsave = time(NULL);
1077 server.dirty = 0;
1078 server.usedmemory = 0;
1079 server.stat_numcommands = 0;
1080 server.stat_numconnections = 0;
1081 server.stat_starttime = time(NULL);
1082 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1083 }
1084
1085 /* Empty the whole database */
1086 static long long emptyDb() {
1087 int j;
1088 long long removed = 0;
1089
1090 for (j = 0; j < server.dbnum; j++) {
1091 removed += dictSize(server.db[j].dict);
1092 dictEmpty(server.db[j].dict);
1093 dictEmpty(server.db[j].expires);
1094 }
1095 return removed;
1096 }
1097
/* Parse a boolean config argument, case-insensitively:
 * "yes" -> 1, "no" -> 0, anything else -> -1 (invalid). */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1103
1104 /* I agree, this is a very rudimental way to load a configuration...
1105 will improve later if the config gets more complex */
1106 static void loadServerConfig(char *filename) {
1107 FILE *fp;
1108 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1109 int linenum = 0;
1110 sds line = NULL;
1111
1112 if (filename[0] == '-' && filename[1] == '\0')
1113 fp = stdin;
1114 else {
1115 if ((fp = fopen(filename,"r")) == NULL) {
1116 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1117 exit(1);
1118 }
1119 }
1120
1121 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1122 sds *argv;
1123 int argc, j;
1124
1125 linenum++;
1126 line = sdsnew(buf);
1127 line = sdstrim(line," \t\r\n");
1128
1129 /* Skip comments and blank lines*/
1130 if (line[0] == '#' || line[0] == '\0') {
1131 sdsfree(line);
1132 continue;
1133 }
1134
1135 /* Split into arguments */
1136 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1137 sdstolower(argv[0]);
1138
1139 /* Execute config directives */
1140 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1141 server.maxidletime = atoi(argv[1]);
1142 if (server.maxidletime < 0) {
1143 err = "Invalid timeout value"; goto loaderr;
1144 }
1145 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1146 server.port = atoi(argv[1]);
1147 if (server.port < 1 || server.port > 65535) {
1148 err = "Invalid port"; goto loaderr;
1149 }
1150 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1151 server.bindaddr = zstrdup(argv[1]);
1152 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1153 int seconds = atoi(argv[1]);
1154 int changes = atoi(argv[2]);
1155 if (seconds < 1 || changes < 0) {
1156 err = "Invalid save parameters"; goto loaderr;
1157 }
1158 appendServerSaveParams(seconds,changes);
1159 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1160 if (chdir(argv[1]) == -1) {
1161 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1162 argv[1], strerror(errno));
1163 exit(1);
1164 }
1165 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1166 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1167 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1168 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1169 else {
1170 err = "Invalid log level. Must be one of debug, notice, warning";
1171 goto loaderr;
1172 }
1173 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1174 FILE *logfp;
1175
1176 server.logfile = zstrdup(argv[1]);
1177 if (!strcasecmp(server.logfile,"stdout")) {
1178 zfree(server.logfile);
1179 server.logfile = NULL;
1180 }
1181 if (server.logfile) {
1182 /* Test if we are able to open the file. The server will not
1183 * be able to abort just for this problem later... */
1184 logfp = fopen(server.logfile,"a");
1185 if (logfp == NULL) {
1186 err = sdscatprintf(sdsempty(),
1187 "Can't open the log file: %s", strerror(errno));
1188 goto loaderr;
1189 }
1190 fclose(logfp);
1191 }
1192 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1193 server.dbnum = atoi(argv[1]);
1194 if (server.dbnum < 1) {
1195 err = "Invalid number of databases"; goto loaderr;
1196 }
1197 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1198 server.maxclients = atoi(argv[1]);
1199 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1200 server.maxmemory = strtoll(argv[1], NULL, 10);
1201 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1202 server.masterhost = sdsnew(argv[1]);
1203 server.masterport = atoi(argv[2]);
1204 server.replstate = REDIS_REPL_CONNECT;
1205 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1206 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1207 err = "argument must be 'yes' or 'no'"; goto loaderr;
1208 }
1209 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1210 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1211 err = "argument must be 'yes' or 'no'"; goto loaderr;
1212 }
1213 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1214 server.sharingpoolsize = atoi(argv[1]);
1215 if (server.sharingpoolsize < 1) {
1216 err = "invalid object sharing pool size"; goto loaderr;
1217 }
1218 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1219 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1220 err = "argument must be 'yes' or 'no'"; goto loaderr;
1221 }
1222 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1223 server.requirepass = zstrdup(argv[1]);
1224 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1225 server.pidfile = zstrdup(argv[1]);
1226 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1227 server.dbfilename = zstrdup(argv[1]);
1228 } else {
1229 err = "Bad directive or wrong number of arguments"; goto loaderr;
1230 }
1231 for (j = 0; j < argc; j++)
1232 sdsfree(argv[j]);
1233 zfree(argv);
1234 sdsfree(line);
1235 }
1236 if (fp != stdin) fclose(fp);
1237 return;
1238
1239 loaderr:
1240 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1241 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1242 fprintf(stderr, ">>> '%s'\n", line);
1243 fprintf(stderr, "%s\n", err);
1244 exit(1);
1245 }
1246
1247 static void freeClientArgv(redisClient *c) {
1248 int j;
1249
1250 for (j = 0; j < c->argc; j++)
1251 decrRefCount(c->argv[j]);
1252 for (j = 0; j < c->mbargc; j++)
1253 decrRefCount(c->mbargv[j]);
1254 c->argc = 0;
1255 c->mbargc = 0;
1256 }
1257
1258 static void freeClient(redisClient *c) {
1259 listNode *ln;
1260
1261 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1262 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1263 sdsfree(c->querybuf);
1264 listRelease(c->reply);
1265 freeClientArgv(c);
1266 close(c->fd);
1267 ln = listSearchKey(server.clients,c);
1268 assert(ln != NULL);
1269 listDelNode(server.clients,ln);
1270 if (c->flags & REDIS_SLAVE) {
1271 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1272 close(c->repldbfd);
1273 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1274 ln = listSearchKey(l,c);
1275 assert(ln != NULL);
1276 listDelNode(l,ln);
1277 }
1278 if (c->flags & REDIS_MASTER) {
1279 server.master = NULL;
1280 server.replstate = REDIS_REPL_CONNECT;
1281 }
1282 zfree(c->argv);
1283 zfree(c->mbargv);
1284 zfree(c);
1285 }
1286
1287 static void glueReplyBuffersIfNeeded(redisClient *c) {
1288 int totlen = 0;
1289 listNode *ln;
1290 robj *o;
1291
1292 listRewind(c->reply);
1293 while((ln = listYield(c->reply))) {
1294 o = ln->value;
1295 totlen += sdslen(o->ptr);
1296 /* This optimization makes more sense if we don't have to copy
1297 * too much data */
1298 if (totlen > 1024) return;
1299 }
1300 if (totlen > 0) {
1301 char buf[1024];
1302 int copylen = 0;
1303
1304 listRewind(c->reply);
1305 while((ln = listYield(c->reply))) {
1306 o = ln->value;
1307 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1308 copylen += sdslen(o->ptr);
1309 listDelNode(c->reply,ln);
1310 }
1311 /* Now the output buffer is empty, add the new single element */
1312 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1313 listAddNodeTail(c->reply,o);
1314 }
1315 }
1316
1317 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1318 redisClient *c = privdata;
1319 int nwritten = 0, totwritten = 0, objlen;
1320 robj *o;
1321 REDIS_NOTUSED(el);
1322 REDIS_NOTUSED(mask);
1323
1324 if (server.glueoutputbuf && listLength(c->reply) > 1)
1325 glueReplyBuffersIfNeeded(c);
1326 while(listLength(c->reply)) {
1327 o = listNodeValue(listFirst(c->reply));
1328 objlen = sdslen(o->ptr);
1329
1330 if (objlen == 0) {
1331 listDelNode(c->reply,listFirst(c->reply));
1332 continue;
1333 }
1334
1335 if (c->flags & REDIS_MASTER) {
1336 /* Don't reply to a master */
1337 nwritten = objlen - c->sentlen;
1338 } else {
1339 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1340 if (nwritten <= 0) break;
1341 }
1342 c->sentlen += nwritten;
1343 totwritten += nwritten;
1344 /* If we fully sent the object on head go to the next one */
1345 if (c->sentlen == objlen) {
1346 listDelNode(c->reply,listFirst(c->reply));
1347 c->sentlen = 0;
1348 }
1349 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1350 * bytes, in a single threaded server it's a good idea to server
1351 * other clients as well, even if a very large request comes from
1352 * super fast link that is always able to accept data (in real world
1353 * terms think to 'KEYS *' against the loopback interfae) */
1354 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1355 }
1356 if (nwritten == -1) {
1357 if (errno == EAGAIN) {
1358 nwritten = 0;
1359 } else {
1360 redisLog(REDIS_DEBUG,
1361 "Error writing to client: %s", strerror(errno));
1362 freeClient(c);
1363 return;
1364 }
1365 }
1366 if (totwritten > 0) c->lastinteraction = time(NULL);
1367 if (listLength(c->reply) == 0) {
1368 c->sentlen = 0;
1369 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1370 }
1371 }
1372
1373 static struct redisCommand *lookupCommand(char *name) {
1374 int j = 0;
1375 while(cmdTable[j].name != NULL) {
1376 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1377 j++;
1378 }
1379 return NULL;
1380 }
1381
1382 /* resetClient prepare the client to process the next command */
1383 static void resetClient(redisClient *c) {
1384 freeClientArgv(c);
1385 c->bulklen = -1;
1386 c->multibulk = 0;
1387 }
1388
1389 /* If this function gets called we already read a whole
1390 * command, argments are in the client argv/argc fields.
1391 * processCommand() execute the command or prepare the
1392 * server for a bulk read from the client.
1393 *
1394 * If 1 is returned the client is still alive and valid and
1395 * and other operations can be performed by the caller. Otherwise
1396 * if 0 is returned the client was destroied (i.e. after QUIT). */
1397 static int processCommand(redisClient *c) {
1398 struct redisCommand *cmd;
1399 long long dirty;
1400
1401 /* Free some memory if needed (maxmemory setting) */
1402 if (server.maxmemory) freeMemoryIfNeeded();
1403
1404 /* Handle the multi bulk command type. This is an alternative protocol
1405 * supported by Redis in order to receive commands that are composed of
1406 * multiple binary-safe "bulk" arguments. The latency of processing is
1407 * a bit higher but this allows things like multi-sets, so if this
1408 * protocol is used only for MSET and similar commands this is a big win. */
1409 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1410 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1411 if (c->multibulk <= 0) {
1412 resetClient(c);
1413 return 1;
1414 } else {
1415 decrRefCount(c->argv[c->argc-1]);
1416 c->argc--;
1417 return 1;
1418 }
1419 } else if (c->multibulk) {
1420 if (c->bulklen == -1) {
1421 if (((char*)c->argv[0]->ptr)[0] != '$') {
1422 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1423 resetClient(c);
1424 return 1;
1425 } else {
1426 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1427 decrRefCount(c->argv[0]);
1428 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1429 c->argc--;
1430 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1431 resetClient(c);
1432 return 1;
1433 }
1434 c->argc--;
1435 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1436 return 1;
1437 }
1438 } else {
1439 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1440 c->mbargv[c->mbargc] = c->argv[0];
1441 c->mbargc++;
1442 c->argc--;
1443 c->multibulk--;
1444 if (c->multibulk == 0) {
1445 robj **auxargv;
1446 int auxargc;
1447
1448 /* Here we need to swap the multi-bulk argc/argv with the
1449 * normal argc/argv of the client structure. */
1450 auxargv = c->argv;
1451 c->argv = c->mbargv;
1452 c->mbargv = auxargv;
1453
1454 auxargc = c->argc;
1455 c->argc = c->mbargc;
1456 c->mbargc = auxargc;
1457
1458 /* We need to set bulklen to something different than -1
1459 * in order for the code below to process the command without
1460 * to try to read the last argument of a bulk command as
1461 * a special argument. */
1462 c->bulklen = 0;
1463 /* continue below and process the command */
1464 } else {
1465 c->bulklen = -1;
1466 return 1;
1467 }
1468 }
1469 }
1470 /* -- end of multi bulk commands processing -- */
1471
1472 /* The QUIT command is handled as a special case. Normal command
1473 * procs are unable to close the client connection safely */
1474 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1475 freeClient(c);
1476 return 0;
1477 }
1478 cmd = lookupCommand(c->argv[0]->ptr);
1479 if (!cmd) {
1480 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1481 resetClient(c);
1482 return 1;
1483 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1484 (c->argc < -cmd->arity)) {
1485 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1486 resetClient(c);
1487 return 1;
1488 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1489 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1490 resetClient(c);
1491 return 1;
1492 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1493 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1494
1495 decrRefCount(c->argv[c->argc-1]);
1496 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1497 c->argc--;
1498 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1499 resetClient(c);
1500 return 1;
1501 }
1502 c->argc--;
1503 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1504 /* It is possible that the bulk read is already in the
1505 * buffer. Check this condition and handle it accordingly.
1506 * This is just a fast path, alternative to call processInputBuffer().
1507 * It's a good idea since the code is small and this condition
1508 * happens most of the times. */
1509 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1510 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1511 c->argc++;
1512 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1513 } else {
1514 return 1;
1515 }
1516 }
1517 /* Let's try to share objects on the command arguments vector */
1518 if (server.shareobjects) {
1519 int j;
1520 for(j = 1; j < c->argc; j++)
1521 c->argv[j] = tryObjectSharing(c->argv[j]);
1522 }
1523 /* Let's try to encode the bulk object to save space. */
1524 if (cmd->flags & REDIS_CMD_BULK)
1525 tryObjectEncoding(c->argv[c->argc-1]);
1526
1527 /* Check if the user is authenticated */
1528 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1529 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1530 resetClient(c);
1531 return 1;
1532 }
1533
1534 /* Exec the command */
1535 dirty = server.dirty;
1536 cmd->proc(c);
1537 if (server.dirty-dirty != 0 && listLength(server.slaves))
1538 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1539 if (listLength(server.monitors))
1540 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1541 server.stat_numcommands++;
1542
1543 /* Prepare the client for the next command */
1544 if (c->flags & REDIS_CLOSE) {
1545 freeClient(c);
1546 return 0;
1547 }
1548 resetClient(c);
1549 return 1;
1550 }
1551
1552 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1553 listNode *ln;
1554 int outc = 0, j;
1555 robj **outv;
1556 /* (args*2)+1 is enough room for args, spaces, newlines */
1557 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1558
1559 if (argc <= REDIS_STATIC_ARGS) {
1560 outv = static_outv;
1561 } else {
1562 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1563 }
1564
1565 for (j = 0; j < argc; j++) {
1566 if (j != 0) outv[outc++] = shared.space;
1567 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1568 robj *lenobj;
1569
1570 lenobj = createObject(REDIS_STRING,
1571 sdscatprintf(sdsempty(),"%d\r\n",
1572 stringObjectLen(argv[j])));
1573 lenobj->refcount = 0;
1574 outv[outc++] = lenobj;
1575 }
1576 outv[outc++] = argv[j];
1577 }
1578 outv[outc++] = shared.crlf;
1579
1580 /* Increment all the refcounts at start and decrement at end in order to
1581 * be sure to free objects if there is no slave in a replication state
1582 * able to be feed with commands */
1583 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1584 listRewind(slaves);
1585 while((ln = listYield(slaves))) {
1586 redisClient *slave = ln->value;
1587
1588 /* Don't feed slaves that are still waiting for BGSAVE to start */
1589 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1590
1591 /* Feed all the other slaves, MONITORs and so on */
1592 if (slave->slaveseldb != dictid) {
1593 robj *selectcmd;
1594
1595 switch(dictid) {
1596 case 0: selectcmd = shared.select0; break;
1597 case 1: selectcmd = shared.select1; break;
1598 case 2: selectcmd = shared.select2; break;
1599 case 3: selectcmd = shared.select3; break;
1600 case 4: selectcmd = shared.select4; break;
1601 case 5: selectcmd = shared.select5; break;
1602 case 6: selectcmd = shared.select6; break;
1603 case 7: selectcmd = shared.select7; break;
1604 case 8: selectcmd = shared.select8; break;
1605 case 9: selectcmd = shared.select9; break;
1606 default:
1607 selectcmd = createObject(REDIS_STRING,
1608 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1609 selectcmd->refcount = 0;
1610 break;
1611 }
1612 addReply(slave,selectcmd);
1613 slave->slaveseldb = dictid;
1614 }
1615 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1616 }
1617 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1618 if (outv != static_outv) zfree(outv);
1619 }
1620
1621 static void processInputBuffer(redisClient *c) {
1622 again:
1623 if (c->bulklen == -1) {
1624 /* Read the first line of the query */
1625 char *p = strchr(c->querybuf,'\n');
1626 size_t querylen;
1627
1628 if (p) {
1629 sds query, *argv;
1630 int argc, j;
1631
1632 query = c->querybuf;
1633 c->querybuf = sdsempty();
1634 querylen = 1+(p-(query));
1635 if (sdslen(query) > querylen) {
1636 /* leave data after the first line of the query in the buffer */
1637 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1638 }
1639 *p = '\0'; /* remove "\n" */
1640 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1641 sdsupdatelen(query);
1642
1643 /* Now we can split the query in arguments */
1644 if (sdslen(query) == 0) {
1645 /* Ignore empty query */
1646 sdsfree(query);
1647 return;
1648 }
1649 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1650 sdsfree(query);
1651
1652 if (c->argv) zfree(c->argv);
1653 c->argv = zmalloc(sizeof(robj*)*argc);
1654
1655 for (j = 0; j < argc; j++) {
1656 if (sdslen(argv[j])) {
1657 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1658 c->argc++;
1659 } else {
1660 sdsfree(argv[j]);
1661 }
1662 }
1663 zfree(argv);
1664 /* Execute the command. If the client is still valid
1665 * after processCommand() return and there is something
1666 * on the query buffer try to process the next command. */
1667 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1668 return;
1669 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1670 redisLog(REDIS_DEBUG, "Client protocol error");
1671 freeClient(c);
1672 return;
1673 }
1674 } else {
1675 /* Bulk read handling. Note that if we are at this point
1676 the client already sent a command terminated with a newline,
1677 we are reading the bulk data that is actually the last
1678 argument of the command. */
1679 int qbl = sdslen(c->querybuf);
1680
1681 if (c->bulklen <= qbl) {
1682 /* Copy everything but the final CRLF as final argument */
1683 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1684 c->argc++;
1685 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1686 /* Process the command. If the client is still valid after
1687 * the processing and there is more data in the buffer
1688 * try to parse it. */
1689 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1690 return;
1691 }
1692 }
1693 }
1694
1695 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1696 redisClient *c = (redisClient*) privdata;
1697 char buf[REDIS_IOBUF_LEN];
1698 int nread;
1699 REDIS_NOTUSED(el);
1700 REDIS_NOTUSED(mask);
1701
1702 nread = read(fd, buf, REDIS_IOBUF_LEN);
1703 if (nread == -1) {
1704 if (errno == EAGAIN) {
1705 nread = 0;
1706 } else {
1707 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1708 freeClient(c);
1709 return;
1710 }
1711 } else if (nread == 0) {
1712 redisLog(REDIS_DEBUG, "Client closed connection");
1713 freeClient(c);
1714 return;
1715 }
1716 if (nread) {
1717 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1718 c->lastinteraction = time(NULL);
1719 } else {
1720 return;
1721 }
1722 processInputBuffer(c);
1723 }
1724
1725 static int selectDb(redisClient *c, int id) {
1726 if (id < 0 || id >= server.dbnum)
1727 return REDIS_ERR;
1728 c->db = &server.db[id];
1729 return REDIS_OK;
1730 }
1731
1732 static void *dupClientReplyValue(void *o) {
1733 incrRefCount((robj*)o);
1734 return 0;
1735 }
1736
1737 static redisClient *createClient(int fd) {
1738 redisClient *c = zmalloc(sizeof(*c));
1739
1740 anetNonBlock(NULL,fd);
1741 anetTcpNoDelay(NULL,fd);
1742 if (!c) return NULL;
1743 selectDb(c,0);
1744 c->fd = fd;
1745 c->querybuf = sdsempty();
1746 c->argc = 0;
1747 c->argv = NULL;
1748 c->bulklen = -1;
1749 c->multibulk = 0;
1750 c->mbargc = 0;
1751 c->mbargv = NULL;
1752 c->sentlen = 0;
1753 c->flags = 0;
1754 c->lastinteraction = time(NULL);
1755 c->authenticated = 0;
1756 c->replstate = REDIS_REPL_NONE;
1757 c->reply = listCreate();
1758 listSetFreeMethod(c->reply,decrRefCount);
1759 listSetDupMethod(c->reply,dupClientReplyValue);
1760 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1761 readQueryFromClient, c, NULL) == AE_ERR) {
1762 freeClient(c);
1763 return NULL;
1764 }
1765 listAddNodeTail(server.clients,c);
1766 return c;
1767 }
1768
1769 static void addReply(redisClient *c, robj *obj) {
1770 if (listLength(c->reply) == 0 &&
1771 (c->replstate == REDIS_REPL_NONE ||
1772 c->replstate == REDIS_REPL_ONLINE) &&
1773 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1774 sendReplyToClient, c, NULL) == AE_ERR) return;
1775 if (obj->encoding != REDIS_ENCODING_RAW) {
1776 obj = getDecodedObject(obj);
1777 } else {
1778 incrRefCount(obj);
1779 }
1780 listAddNodeTail(c->reply,obj);
1781 }
1782
1783 static void addReplySds(redisClient *c, sds s) {
1784 robj *o = createObject(REDIS_STRING,s);
1785 addReply(c,o);
1786 decrRefCount(o);
1787 }
1788
1789 static void addReplyBulkLen(redisClient *c, robj *obj) {
1790 size_t len;
1791
1792 if (obj->encoding == REDIS_ENCODING_RAW) {
1793 len = sdslen(obj->ptr);
1794 } else {
1795 long n = (long)obj->ptr;
1796
1797 len = 1;
1798 if (n < 0) {
1799 len++;
1800 n = -n;
1801 }
1802 while((n = n/10) != 0) {
1803 len++;
1804 }
1805 }
1806 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1807 }
1808
1809 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1810 int cport, cfd;
1811 char cip[128];
1812 redisClient *c;
1813 REDIS_NOTUSED(el);
1814 REDIS_NOTUSED(mask);
1815 REDIS_NOTUSED(privdata);
1816
1817 cfd = anetAccept(server.neterr, fd, cip, &cport);
1818 if (cfd == AE_ERR) {
1819 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1820 return;
1821 }
1822 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1823 if ((c = createClient(cfd)) == NULL) {
1824 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1825 close(cfd); /* May be already closed, just ingore errors */
1826 return;
1827 }
1828 /* If maxclient directive is set and this is one client more... close the
1829 * connection. Note that we create the client instead to check before
1830 * for this condition, since now the socket is already set in nonblocking
1831 * mode and we can send an error for free using the Kernel I/O */
1832 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1833 char *err = "-ERR max number of clients reached\r\n";
1834
1835 /* That's a best effort error message, don't check write errors */
1836 (void) write(c->fd,err,strlen(err));
1837 freeClient(c);
1838 return;
1839 }
1840 server.stat_numconnections++;
1841 }
1842
1843 /* ======================= Redis objects implementation ===================== */
1844
1845 static robj *createObject(int type, void *ptr) {
1846 robj *o;
1847
1848 if (listLength(server.objfreelist)) {
1849 listNode *head = listFirst(server.objfreelist);
1850 o = listNodeValue(head);
1851 listDelNode(server.objfreelist,head);
1852 } else {
1853 o = zmalloc(sizeof(*o));
1854 }
1855 o->type = type;
1856 o->encoding = REDIS_ENCODING_RAW;
1857 o->ptr = ptr;
1858 o->refcount = 1;
1859 return o;
1860 }
1861
1862 static robj *createStringObject(char *ptr, size_t len) {
1863 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1864 }
1865
1866 static robj *createListObject(void) {
1867 list *l = listCreate();
1868
1869 listSetFreeMethod(l,decrRefCount);
1870 return createObject(REDIS_LIST,l);
1871 }
1872
1873 static robj *createSetObject(void) {
1874 dict *d = dictCreate(&setDictType,NULL);
1875 return createObject(REDIS_SET,d);
1876 }
1877
1878 static robj *createZsetObject(void) {
1879 zset *zs = zmalloc(sizeof(*zs));
1880
1881 zs->dict = dictCreate(&zsetDictType,NULL);
1882 zs->zsl = zslCreate();
1883 return createObject(REDIS_ZSET,zs);
1884 }
1885
1886 static void freeStringObject(robj *o) {
1887 if (o->encoding == REDIS_ENCODING_RAW) {
1888 sdsfree(o->ptr);
1889 }
1890 }
1891
1892 static void freeListObject(robj *o) {
1893 listRelease((list*) o->ptr);
1894 }
1895
1896 static void freeSetObject(robj *o) {
1897 dictRelease((dict*) o->ptr);
1898 }
1899
1900 static void freeZsetObject(robj *o) {
1901 zset *zs = o->ptr;
1902
1903 dictRelease(zs->dict);
1904 zslFree(zs->zsl);
1905 zfree(zs);
1906 }
1907
1908 static void freeHashObject(robj *o) {
1909 dictRelease((dict*) o->ptr);
1910 }
1911
1912 static void incrRefCount(robj *o) {
1913 o->refcount++;
1914 #ifdef DEBUG_REFCOUNT
1915 if (o->type == REDIS_STRING)
1916 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1917 #endif
1918 }
1919
1920 static void decrRefCount(void *obj) {
1921 robj *o = obj;
1922
1923 #ifdef DEBUG_REFCOUNT
1924 if (o->type == REDIS_STRING)
1925 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1926 #endif
1927 if (--(o->refcount) == 0) {
1928 switch(o->type) {
1929 case REDIS_STRING: freeStringObject(o); break;
1930 case REDIS_LIST: freeListObject(o); break;
1931 case REDIS_SET: freeSetObject(o); break;
1932 case REDIS_ZSET: freeZsetObject(o); break;
1933 case REDIS_HASH: freeHashObject(o); break;
1934 default: assert(0 != 0); break;
1935 }
1936 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1937 !listAddNodeHead(server.objfreelist,o))
1938 zfree(o);
1939 }
1940 }
1941
1942 static robj *lookupKey(redisDb *db, robj *key) {
1943 dictEntry *de = dictFind(db->dict,key);
1944 return de ? dictGetEntryVal(de) : NULL;
1945 }
1946
1947 static robj *lookupKeyRead(redisDb *db, robj *key) {
1948 expireIfNeeded(db,key);
1949 return lookupKey(db,key);
1950 }
1951
1952 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1953 deleteIfVolatile(db,key);
1954 return lookupKey(db,key);
1955 }
1956
1957 static int deleteKey(redisDb *db, robj *key) {
1958 int retval;
1959
1960 /* We need to protect key from destruction: after the first dictDelete()
1961 * it may happen that 'key' is no longer valid if we don't increment
1962 * it's count. This may happen when we get the object reference directly
1963 * from the hash table with dictRandomKey() or dict iterators */
1964 incrRefCount(key);
1965 if (dictSize(db->expires)) dictDelete(db->expires,key);
1966 retval = dictDelete(db->dict,key);
1967 decrRefCount(key);
1968
1969 return retval == DICT_OK;
1970 }
1971
1972 /* Try to share an object against the shared objects pool */
1973 static robj *tryObjectSharing(robj *o) {
1974 struct dictEntry *de;
1975 unsigned long c;
1976
1977 if (o == NULL || server.shareobjects == 0) return o;
1978
1979 assert(o->type == REDIS_STRING);
1980 de = dictFind(server.sharingpool,o);
1981 if (de) {
1982 robj *shared = dictGetEntryKey(de);
1983
1984 c = ((unsigned long) dictGetEntryVal(de))+1;
1985 dictGetEntryVal(de) = (void*) c;
1986 incrRefCount(shared);
1987 decrRefCount(o);
1988 return shared;
1989 } else {
1990 /* Here we are using a stream algorihtm: Every time an object is
1991 * shared we increment its count, everytime there is a miss we
1992 * recrement the counter of a random object. If this object reaches
1993 * zero we remove the object and put the current object instead. */
1994 if (dictSize(server.sharingpool) >=
1995 server.sharingpoolsize) {
1996 de = dictGetRandomKey(server.sharingpool);
1997 assert(de != NULL);
1998 c = ((unsigned long) dictGetEntryVal(de))-1;
1999 dictGetEntryVal(de) = (void*) c;
2000 if (c == 0) {
2001 dictDelete(server.sharingpool,de->key);
2002 }
2003 } else {
2004 c = 0; /* If the pool is empty we want to add this object */
2005 }
2006 if (c == 0) {
2007 int retval;
2008
2009 retval = dictAdd(server.sharingpool,o,(void*)1);
2010 assert(retval == DICT_OK);
2011 incrRefCount(o);
2012 }
2013 return o;
2014 }
2015 }
2016
2017 /* Check if the nul-terminated string 's' can be represented by a long
2018 * (that is, is a number that fits into long without any other space or
2019 * character before or after the digits).
2020 *
2021 * If so, the function returns REDIS_OK and *longval is set to the value
2022 * of the number. Otherwise REDIS_ERR is returned */
2023 static int isStringRepresentableAsLong(sds s, long *longval) {
2024 char buf[32], *endptr;
2025 long value;
2026 int slen;
2027
2028 value = strtol(s, &endptr, 10);
2029 if (endptr[0] != '\0') return REDIS_ERR;
2030 slen = snprintf(buf,32,"%ld",value);
2031
2032 /* If the number converted back into a string is not identical
2033 * then it's not possible to encode the string as integer */
2034 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2035 if (longval) *longval = value;
2036 return REDIS_OK;
2037 }
2038
2039 /* Try to encode a string object in order to save space */
2040 static int tryObjectEncoding(robj *o) {
2041 long value;
2042 sds s = o->ptr;
2043
2044 if (o->encoding != REDIS_ENCODING_RAW)
2045 return REDIS_ERR; /* Already encoded */
2046
2047 /* It's not save to encode shared objects: shared objects can be shared
2048 * everywhere in the "object space" of Redis. Encoded objects can only
2049 * appear as "values" (and not, for instance, as keys) */
2050 if (o->refcount > 1) return REDIS_ERR;
2051
2052 /* Currently we try to encode only strings */
2053 assert(o->type == REDIS_STRING);
2054
2055 /* Check if we can represent this string as a long integer */
2056 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2057
2058 /* Ok, this object can be encoded */
2059 o->encoding = REDIS_ENCODING_INT;
2060 sdsfree(o->ptr);
2061 o->ptr = (void*) value;
2062 return REDIS_OK;
2063 }
2064
2065 /* Get a decoded version of an encoded object (returned as a new object) */
2066 static robj *getDecodedObject(const robj *o) {
2067 robj *dec;
2068
2069 assert(o->encoding != REDIS_ENCODING_RAW);
2070 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2071 char buf[32];
2072
2073 snprintf(buf,32,"%ld",(long)o->ptr);
2074 dec = createStringObject(buf,strlen(buf));
2075 return dec;
2076 } else {
2077 assert(1 != 1);
2078 }
2079 }
2080
2081 /* Compare two string objects via strcmp() or alike.
2082 * Note that the objects may be integer-encoded. In such a case we
2083 * use snprintf() to get a string representation of the numbers on the stack
2084 * and compare the strings, it's much faster than calling getDecodedObject(). */
2085 static int compareStringObjects(robj *a, robj *b) {
2086 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2087 char bufa[128], bufb[128], *astr, *bstr;
2088 int bothsds = 1;
2089
2090 if (a == b) return 0;
2091 if (a->encoding != REDIS_ENCODING_RAW) {
2092 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2093 astr = bufa;
2094 bothsds = 0;
2095 } else {
2096 astr = a->ptr;
2097 }
2098 if (b->encoding != REDIS_ENCODING_RAW) {
2099 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2100 bstr = bufb;
2101 bothsds = 0;
2102 } else {
2103 bstr = b->ptr;
2104 }
2105 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2106 }
2107
2108 static size_t stringObjectLen(robj *o) {
2109 assert(o->type == REDIS_STRING);
2110 if (o->encoding == REDIS_ENCODING_RAW) {
2111 return sdslen(o->ptr);
2112 } else {
2113 char buf[32];
2114
2115 return snprintf(buf,32,"%ld",(long)o->ptr);
2116 }
2117 }
2118
2119 /*============================ DB saving/loading ============================ */
2120
/* Write the single opcode/type byte 'type' to 'fp'.
 * Returns 0 on success, -1 on write failure. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return fwrite(&type,1,1,fp) == 0 ? -1 : 0;
}
2125
/* Write 't' truncated to a 32 bit signed integer, in host byte order.
 * Returns 0 on success, -1 on write failure. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return (fwrite(&t32,sizeof(t32),1,fp) == 0) ? -1 : 0;
}
2131
2132 /* check rdbLoadLen() comments for more info */
2133 static int rdbSaveLen(FILE *fp, uint32_t len) {
2134 unsigned char buf[2];
2135
2136 if (len < (1<<6)) {
2137 /* Save a 6 bit len */
2138 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2139 if (fwrite(buf,1,1,fp) == 0) return -1;
2140 } else if (len < (1<<14)) {
2141 /* Save a 14 bit len */
2142 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2143 buf[1] = len&0xFF;
2144 if (fwrite(buf,2,1,fp) == 0) return -1;
2145 } else {
2146 /* Save a 32 bit len */
2147 buf[0] = (REDIS_RDB_32BITLEN<<6);
2148 if (fwrite(buf,1,1,fp) == 0) return -1;
2149 len = htonl(len);
2150 if (fwrite(&len,4,1,fp) == 0) return -1;
2151 }
2152 return 0;
2153 }
2154
2155 /* String objects in the form "2391" "-100" without any space and with a
2156 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2157 * encoded as integers to save space */
2158 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2159 long long value;
2160 char *endptr, buf[32];
2161
2162 /* Check if it's possible to encode this value as a number */
2163 value = strtoll(s, &endptr, 10);
2164 if (endptr[0] != '\0') return 0;
2165 snprintf(buf,32,"%lld",value);
2166
2167 /* If the number converted back into a string is not identical
2168 * then it's not possible to encode the string as integer */
2169 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2170
2171 /* Finally check if it fits in our ranges */
2172 if (value >= -(1<<7) && value <= (1<<7)-1) {
2173 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2174 enc[1] = value&0xFF;
2175 return 2;
2176 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2177 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2178 enc[1] = value&0xFF;
2179 enc[2] = (value>>8)&0xFF;
2180 return 3;
2181 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2182 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2183 enc[1] = value&0xFF;
2184 enc[2] = (value>>8)&0xFF;
2185 enc[3] = (value>>16)&0xFF;
2186 enc[4] = (value>>24)&0xFF;
2187 return 5;
2188 } else {
2189 return 0;
2190 }
2191 }
2192
2193 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2194 unsigned int comprlen, outlen;
2195 unsigned char byte;
2196 void *out;
2197
2198 /* We require at least four bytes compression for this to be worth it */
2199 outlen = sdslen(obj->ptr)-4;
2200 if (outlen <= 0) return 0;
2201 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2202 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2203 if (comprlen == 0) {
2204 zfree(out);
2205 return 0;
2206 }
2207 /* Data compressed! Let's save it on disk */
2208 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2209 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2210 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2211 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2212 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2213 zfree(out);
2214 return comprlen;
2215
2216 writeerr:
2217 zfree(out);
2218 return -1;
2219 }
2220
2221 /* Save a string objet as [len][data] on disk. If the object is a string
2222 * representation of an integer value we try to safe it in a special form */
2223 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2224 size_t len;
2225 int enclen;
2226
2227 len = sdslen(obj->ptr);
2228
2229 /* Try integer encoding */
2230 if (len <= 11) {
2231 unsigned char buf[5];
2232 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2233 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2234 return 0;
2235 }
2236 }
2237
2238 /* Try LZF compression - under 20 bytes it's unable to compress even
2239 * aaaaaaaaaaaaaaaaaa so skip it */
2240 if (len > 20) {
2241 int retval;
2242
2243 retval = rdbSaveLzfStringObject(fp,obj);
2244 if (retval == -1) return -1;
2245 if (retval > 0) return 0;
2246 /* retval == 0 means data can't be compressed, save the old way */
2247 }
2248
2249 /* Store verbatim */
2250 if (rdbSaveLen(fp,len) == -1) return -1;
2251 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2252 return 0;
2253 }
2254
2255 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2256 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2257 int retval;
2258 robj *dec;
2259
2260 if (obj->encoding != REDIS_ENCODING_RAW) {
2261 dec = getDecodedObject(obj);
2262 retval = rdbSaveStringObjectRaw(fp,dec);
2263 decrRefCount(dec);
2264 return retval;
2265 } else {
2266 return rdbSaveStringObjectRaw(fp,obj);
2267 }
2268 }
2269
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Finite values are rendered with "%.16g".
 * Returns 0 on success, -1 on write error.
 */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        /* The text goes after the length prefix. Measure it from buf+1:
         * buf[0] has not been initialized yet at this point. */
        snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2296
2297 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2298 static int rdbSave(char *filename) {
2299 dictIterator *di = NULL;
2300 dictEntry *de;
2301 FILE *fp;
2302 char tmpfile[256];
2303 int j;
2304 time_t now = time(NULL);
2305
2306 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2307 fp = fopen(tmpfile,"w");
2308 if (!fp) {
2309 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2310 return REDIS_ERR;
2311 }
2312 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2313 for (j = 0; j < server.dbnum; j++) {
2314 redisDb *db = server.db+j;
2315 dict *d = db->dict;
2316 if (dictSize(d) == 0) continue;
2317 di = dictGetIterator(d);
2318 if (!di) {
2319 fclose(fp);
2320 return REDIS_ERR;
2321 }
2322
2323 /* Write the SELECT DB opcode */
2324 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2325 if (rdbSaveLen(fp,j) == -1) goto werr;
2326
2327 /* Iterate this DB writing every entry */
2328 while((de = dictNext(di)) != NULL) {
2329 robj *key = dictGetEntryKey(de);
2330 robj *o = dictGetEntryVal(de);
2331 time_t expiretime = getExpire(db,key);
2332
2333 /* Save the expire time */
2334 if (expiretime != -1) {
2335 /* If this key is already expired skip it */
2336 if (expiretime < now) continue;
2337 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2338 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2339 }
2340 /* Save the key and associated value */
2341 if (rdbSaveType(fp,o->type) == -1) goto werr;
2342 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2343 if (o->type == REDIS_STRING) {
2344 /* Save a string value */
2345 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2346 } else if (o->type == REDIS_LIST) {
2347 /* Save a list value */
2348 list *list = o->ptr;
2349 listNode *ln;
2350
2351 listRewind(list);
2352 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2353 while((ln = listYield(list))) {
2354 robj *eleobj = listNodeValue(ln);
2355
2356 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2357 }
2358 } else if (o->type == REDIS_SET) {
2359 /* Save a set value */
2360 dict *set = o->ptr;
2361 dictIterator *di = dictGetIterator(set);
2362 dictEntry *de;
2363
2364 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2365 while((de = dictNext(di)) != NULL) {
2366 robj *eleobj = dictGetEntryKey(de);
2367
2368 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2369 }
2370 dictReleaseIterator(di);
2371 } else if (o->type == REDIS_ZSET) {
2372 /* Save a set value */
2373 zset *zs = o->ptr;
2374 dictIterator *di = dictGetIterator(zs->dict);
2375 dictEntry *de;
2376
2377 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2378 while((de = dictNext(di)) != NULL) {
2379 robj *eleobj = dictGetEntryKey(de);
2380 double *score = dictGetEntryVal(de);
2381
2382 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2383 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2384 }
2385 dictReleaseIterator(di);
2386 } else {
2387 assert(0 != 0);
2388 }
2389 }
2390 dictReleaseIterator(di);
2391 }
2392 /* EOF opcode */
2393 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2394
2395 /* Make sure data will not remain on the OS's output buffers */
2396 fflush(fp);
2397 fsync(fileno(fp));
2398 fclose(fp);
2399
2400 /* Use RENAME to make sure the DB file is changed atomically only
2401 * if the generate DB file is ok. */
2402 if (rename(tmpfile,filename) == -1) {
2403 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2404 unlink(tmpfile);
2405 return REDIS_ERR;
2406 }
2407 redisLog(REDIS_NOTICE,"DB saved on disk");
2408 server.dirty = 0;
2409 server.lastsave = time(NULL);
2410 return REDIS_OK;
2411
2412 werr:
2413 fclose(fp);
2414 unlink(tmpfile);
2415 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2416 if (di) dictReleaseIterator(di);
2417 return REDIS_ERR;
2418 }
2419
2420 static int rdbSaveBackground(char *filename) {
2421 pid_t childpid;
2422
2423 if (server.bgsaveinprogress) return REDIS_ERR;
2424 if ((childpid = fork()) == 0) {
2425 /* Child */
2426 close(server.fd);
2427 if (rdbSave(filename) == REDIS_OK) {
2428 exit(0);
2429 } else {
2430 exit(1);
2431 }
2432 } else {
2433 /* Parent */
2434 if (childpid == -1) {
2435 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2436 strerror(errno));
2437 return REDIS_ERR;
2438 }
2439 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2440 server.bgsaveinprogress = 1;
2441 server.bgsavechildpid = childpid;
2442 return REDIS_OK;
2443 }
2444 return REDIS_OK; /* unreached */
2445 }
2446
/* Delete the "temp-<pid>.rdb" file a saving child with pid 'childpid' would
 * have written (see rdbSave()). Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2453
/* Read one opcode/type byte. Returns it as 0..255, or -1 on short read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 0) ? -1 : type;
}
2459
/* Read a 32 bit signed time value in host byte order (as written by
 * rdbSaveTime()). Returns -1 on short read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    if (fread(&t32,sizeof(t32),1,fp) != 1) return -1;
    return (time_t) t32;
}
2465
2466 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2467 * of this file for a description of how this are stored on disk.
2468 *
2469 * isencoded is set to 1 if the readed length is not actually a length but
2470 * an "encoding type", check the above comments for more info */
2471 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2472 unsigned char buf[2];
2473 uint32_t len;
2474
2475 if (isencoded) *isencoded = 0;
2476 if (rdbver == 0) {
2477 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2478 return ntohl(len);
2479 } else {
2480 int type;
2481
2482 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2483 type = (buf[0]&0xC0)>>6;
2484 if (type == REDIS_RDB_6BITLEN) {
2485 /* Read a 6 bit len */
2486 return buf[0]&0x3F;
2487 } else if (type == REDIS_RDB_ENCVAL) {
2488 /* Read a 6 bit len encoding type */
2489 if (isencoded) *isencoded = 1;
2490 return buf[0]&0x3F;
2491 } else if (type == REDIS_RDB_14BITLEN) {
2492 /* Read a 14 bit len */
2493 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2494 return ((buf[0]&0x3F)<<8)|buf[1];
2495 } else {
2496 /* Read a 32 bit len */
2497 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2498 return ntohl(len);
2499 }
2500 }
2501 }
2502
2503 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2504 unsigned char enc[4];
2505 long long val;
2506
2507 if (enctype == REDIS_RDB_ENC_INT8) {
2508 if (fread(enc,1,1,fp) == 0) return NULL;
2509 val = (signed char)enc[0];
2510 } else if (enctype == REDIS_RDB_ENC_INT16) {
2511 uint16_t v;
2512 if (fread(enc,2,1,fp) == 0) return NULL;
2513 v = enc[0]|(enc[1]<<8);
2514 val = (int16_t)v;
2515 } else if (enctype == REDIS_RDB_ENC_INT32) {
2516 uint32_t v;
2517 if (fread(enc,4,1,fp) == 0) return NULL;
2518 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2519 val = (int32_t)v;
2520 } else {
2521 val = 0; /* anti-warning */
2522 assert(0!=0);
2523 }
2524 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2525 }
2526
2527 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2528 unsigned int len, clen;
2529 unsigned char *c = NULL;
2530 sds val = NULL;
2531
2532 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2533 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2534 if ((c = zmalloc(clen)) == NULL) goto err;
2535 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2536 if (fread(c,clen,1,fp) == 0) goto err;
2537 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2538 zfree(c);
2539 return createObject(REDIS_STRING,val);
2540 err:
2541 zfree(c);
2542 sdsfree(val);
2543 return NULL;
2544 }
2545
2546 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2547 int isencoded;
2548 uint32_t len;
2549 sds val;
2550
2551 len = rdbLoadLen(fp,rdbver,&isencoded);
2552 if (isencoded) {
2553 switch(len) {
2554 case REDIS_RDB_ENC_INT8:
2555 case REDIS_RDB_ENC_INT16:
2556 case REDIS_RDB_ENC_INT32:
2557 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2558 case REDIS_RDB_ENC_LZF:
2559 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2560 default:
2561 assert(0!=0);
2562 }
2563 }
2564
2565 if (len == REDIS_RDB_LENERR) return NULL;
2566 val = sdsnewlen(NULL,len);
2567 if (len && fread(val,len,1,fp) == 0) {
2568 sdsfree(val);
2569 return NULL;
2570 }
2571 return tryObjectSharing(createObject(REDIS_STRING,val));
2572 }
2573
2574 /* For information about double serialization check rdbSaveDoubleValue() */
2575 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2576 char buf[128];
2577 unsigned char len;
2578
2579 if (fread(&len,1,1,fp) == 0) return -1;
2580 switch(len) {
2581 case 255: *val = R_NegInf; return 0;
2582 case 254: *val = R_PosInf; return 0;
2583 case 253: *val = R_Nan; return 0;
2584 default:
2585 if (fread(buf,len,1,fp) == 0) return -1;
2586 sscanf(buf, "%lg", val);
2587 return 0;
2588 }
2589 }
2590
2591 static int rdbLoad(char *filename) {
2592 FILE *fp;
2593 robj *keyobj = NULL;
2594 uint32_t dbid;
2595 int type, retval, rdbver;
2596 dict *d = server.db[0].dict;
2597 redisDb *db = server.db+0;
2598 char buf[1024];
2599 time_t expiretime = -1, now = time(NULL);
2600
2601 fp = fopen(filename,"r");
2602 if (!fp) return REDIS_ERR;
2603 if (fread(buf,9,1,fp) == 0) goto eoferr;
2604 buf[9] = '\0';
2605 if (memcmp(buf,"REDIS",5) != 0) {
2606 fclose(fp);
2607 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2608 return REDIS_ERR;
2609 }
2610 rdbver = atoi(buf+5);
2611 if (rdbver > 1) {
2612 fclose(fp);
2613 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2614 return REDIS_ERR;
2615 }
2616 while(1) {
2617 robj *o;
2618
2619 /* Read type. */
2620 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2621 if (type == REDIS_EXPIRETIME) {
2622 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2623 /* We read the time so we need to read the object type again */
2624 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2625 }
2626 if (type == REDIS_EOF) break;
2627 /* Handle SELECT DB opcode as a special case */
2628 if (type == REDIS_SELECTDB) {
2629 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2630 goto eoferr;
2631 if (dbid >= (unsigned)server.dbnum) {
2632 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2633 exit(1);
2634 }
2635 db = server.db+dbid;
2636 d = db->dict;
2637 continue;
2638 }
2639 /* Read key */
2640 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2641
2642 if (type == REDIS_STRING) {
2643 /* Read string value */
2644 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2645 tryObjectEncoding(o);
2646 } else if (type == REDIS_LIST || type == REDIS_SET) {
2647 /* Read list/set value */
2648 uint32_t listlen;
2649
2650 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2651 goto eoferr;
2652 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2653 /* Load every single element of the list/set */
2654 while(listlen--) {
2655 robj *ele;
2656
2657 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2658 tryObjectEncoding(ele);
2659 if (type == REDIS_LIST) {
2660 listAddNodeTail((list*)o->ptr,ele);
2661 } else {
2662 dictAdd((dict*)o->ptr,ele,NULL);
2663 }
2664 }
2665 } else if (type == REDIS_ZSET) {
2666 /* Read list/set value */
2667 uint32_t zsetlen;
2668 zset *zs;
2669
2670 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2671 goto eoferr;
2672 o = createZsetObject();
2673 zs = o->ptr;
2674 /* Load every single element of the list/set */
2675 while(zsetlen--) {
2676 robj *ele;
2677 double *score = zmalloc(sizeof(double));
2678
2679 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2680 tryObjectEncoding(ele);
2681 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2682 dictAdd(zs->dict,ele,score);
2683 zslInsert(zs->zsl,*score,ele);
2684 incrRefCount(ele); /* added to skiplist */
2685 }
2686 } else {
2687 assert(0 != 0);
2688 }
2689 /* Add the new object in the hash table */
2690 retval = dictAdd(d,keyobj,o);
2691 if (retval == DICT_ERR) {
2692 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2693 exit(1);
2694 }
2695 /* Set the expire time if needed */
2696 if (expiretime != -1) {
2697 setExpire(db,keyobj,expiretime);
2698 /* Delete this key if already expired */
2699 if (expiretime < now) deleteKey(db,keyobj);
2700 expiretime = -1;
2701 }
2702 keyobj = o = NULL;
2703 }
2704 fclose(fp);
2705 return REDIS_OK;
2706
2707 eoferr: /* unexpected end of file is handled here with a fatal exit */
2708 if (keyobj) decrRefCount(keyobj);
2709 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2710 exit(1);
2711 return REDIS_ERR; /* Just to avoid warning */
2712 }
2713
2714 /*================================== Commands =============================== */
2715
2716 static void authCommand(redisClient *c) {
2717 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2718 c->authenticated = 1;
2719 addReply(c,shared.ok);
2720 } else {
2721 c->authenticated = 0;
2722 addReply(c,shared.err);
2723 }
2724 }
2725
2726 static void pingCommand(redisClient *c) {
2727 addReply(c,shared.pong);
2728 }
2729
2730 static void echoCommand(redisClient *c) {
2731 addReplyBulkLen(c,c->argv[1]);
2732 addReply(c,c->argv[1]);
2733 addReply(c,shared.crlf);
2734 }
2735
2736 /*=================================== Strings =============================== */
2737
2738 static void setGenericCommand(redisClient *c, int nx) {
2739 int retval;
2740
2741 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2742 if (retval == DICT_ERR) {
2743 if (!nx) {
2744 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2745 incrRefCount(c->argv[2]);
2746 } else {
2747 addReply(c,shared.czero);
2748 return;
2749 }
2750 } else {
2751 incrRefCount(c->argv[1]);
2752 incrRefCount(c->argv[2]);
2753 }
2754 server.dirty++;
2755 removeExpire(c->db,c->argv[1]);
2756 addReply(c, nx ? shared.cone : shared.ok);
2757 }
2758
2759 static void setCommand(redisClient *c) {
2760 setGenericCommand(c,0);
2761 }
2762
2763 static void setnxCommand(redisClient *c) {
2764 setGenericCommand(c,1);
2765 }
2766
2767 static void getCommand(redisClient *c) {
2768 robj *o = lookupKeyRead(c->db,c->argv[1]);
2769
2770 if (o == NULL) {
2771 addReply(c,shared.nullbulk);
2772 } else {
2773 if (o->type != REDIS_STRING) {
2774 addReply(c,shared.wrongtypeerr);
2775 } else {
2776 addReplyBulkLen(c,o);
2777 addReply(c,o);
2778 addReply(c,shared.crlf);
2779 }
2780 }
2781 }
2782
2783 static void getsetCommand(redisClient *c) {
2784 getCommand(c);
2785 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2786 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2787 } else {
2788 incrRefCount(c->argv[1]);
2789 }
2790 incrRefCount(c->argv[2]);
2791 server.dirty++;
2792 removeExpire(c->db,c->argv[1]);
2793 }
2794
2795 static void mgetCommand(redisClient *c) {
2796 int j;
2797
2798 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2799 for (j = 1; j < c->argc; j++) {
2800 robj *o = lookupKeyRead(c->db,c->argv[j]);
2801 if (o == NULL) {
2802 addReply(c,shared.nullbulk);
2803 } else {
2804 if (o->type != REDIS_STRING) {
2805 addReply(c,shared.nullbulk);
2806 } else {
2807 addReplyBulkLen(c,o);
2808 addReply(c,o);
2809 addReply(c,shared.crlf);
2810 }
2811 }
2812 }
2813 }
2814
2815 static void incrDecrCommand(redisClient *c, long long incr) {
2816 long long value;
2817 int retval;
2818 robj *o;
2819
2820 o = lookupKeyWrite(c->db,c->argv[1]);
2821 if (o == NULL) {
2822 value = 0;
2823 } else {
2824 if (o->type != REDIS_STRING) {
2825 value = 0;
2826 } else {
2827 char *eptr;
2828
2829 if (o->encoding == REDIS_ENCODING_RAW)
2830 value = strtoll(o->ptr, &eptr, 10);
2831 else if (o->encoding == REDIS_ENCODING_INT)
2832 value = (long)o->ptr;
2833 else
2834 assert(1 != 1);
2835 }
2836 }
2837
2838 value += incr;
2839 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2840 tryObjectEncoding(o);
2841 retval = dictAdd(c->db->dict,c->argv[1],o);
2842 if (retval == DICT_ERR) {
2843 dictReplace(c->db->dict,c->argv[1],o);
2844 removeExpire(c->db,c->argv[1]);
2845 } else {
2846 incrRefCount(c->argv[1]);
2847 }
2848 server.dirty++;
2849 addReply(c,shared.colon);
2850 addReply(c,o);
2851 addReply(c,shared.crlf);
2852 }
2853
2854 static void incrCommand(redisClient *c) {
2855 incrDecrCommand(c,1);
2856 }
2857
2858 static void decrCommand(redisClient *c) {
2859 incrDecrCommand(c,-1);
2860 }
2861
2862 static void incrbyCommand(redisClient *c) {
2863 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2864 incrDecrCommand(c,incr);
2865 }
2866
2867 static void decrbyCommand(redisClient *c) {
2868 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2869 incrDecrCommand(c,-incr);
2870 }
2871
2872 /* ========================= Type agnostic commands ========================= */
2873
2874 static void delCommand(redisClient *c) {
2875 int deleted = 0, j;
2876
2877 for (j = 1; j < c->argc; j++) {
2878 if (deleteKey(c->db,c->argv[j])) {
2879 server.dirty++;
2880 deleted++;
2881 }
2882 }
2883 switch(deleted) {
2884 case 0:
2885 addReply(c,shared.czero);
2886 break;
2887 case 1:
2888 addReply(c,shared.cone);
2889 break;
2890 default:
2891 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2892 break;
2893 }
2894 }
2895
2896 static void existsCommand(redisClient *c) {
2897 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2898 }
2899
2900 static void selectCommand(redisClient *c) {
2901 int id = atoi(c->argv[1]->ptr);
2902
2903 if (selectDb(c,id) == REDIS_ERR) {
2904 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2905 } else {
2906 addReply(c,shared.ok);
2907 }
2908 }
2909
2910 static void randomkeyCommand(redisClient *c) {
2911 dictEntry *de;
2912
2913 while(1) {
2914 de = dictGetRandomKey(c->db->dict);
2915 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2916 }
2917 if (de == NULL) {
2918 addReply(c,shared.plus);
2919 addReply(c,shared.crlf);
2920 } else {
2921 addReply(c,shared.plus);
2922 addReply(c,dictGetEntryKey(de));
2923 addReply(c,shared.crlf);
2924 }
2925 }
2926
2927 static void keysCommand(redisClient *c) {
2928 dictIterator *di;
2929 dictEntry *de;
2930 sds pattern = c->argv[1]->ptr;
2931 int plen = sdslen(pattern);
2932 int numkeys = 0, keyslen = 0;
2933 robj *lenobj = createObject(REDIS_STRING,NULL);
2934
2935 di = dictGetIterator(c->db->dict);
2936 addReply(c,lenobj);
2937 decrRefCount(lenobj);
2938 while((de = dictNext(di)) != NULL) {
2939 robj *keyobj = dictGetEntryKey(de);
2940
2941 sds key = keyobj->ptr;
2942 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2943 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2944 if (expireIfNeeded(c->db,keyobj) == 0) {
2945 if (numkeys != 0)
2946 addReply(c,shared.space);
2947 addReply(c,keyobj);
2948 numkeys++;
2949 keyslen += sdslen(key);
2950 }
2951 }
2952 }
2953 dictReleaseIterator(di);
2954 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2955 addReply(c,shared.crlf);
2956 }
2957
2958 static void dbsizeCommand(redisClient *c) {
2959 addReplySds(c,
2960 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2961 }
2962
2963 static void lastsaveCommand(redisClient *c) {
2964 addReplySds(c,
2965 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2966 }
2967
2968 static void typeCommand(redisClient *c) {
2969 robj *o;
2970 char *type;
2971
2972 o = lookupKeyRead(c->db,c->argv[1]);
2973 if (o == NULL) {
2974 type = "+none";
2975 } else {
2976 switch(o->type) {
2977 case REDIS_STRING: type = "+string"; break;
2978 case REDIS_LIST: type = "+list"; break;
2979 case REDIS_SET: type = "+set"; break;
2980 default: type = "unknown"; break;
2981 }
2982 }
2983 addReplySds(c,sdsnew(type));
2984 addReply(c,shared.crlf);
2985 }
2986
2987 static void saveCommand(redisClient *c) {
2988 if (server.bgsaveinprogress) {
2989 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2990 return;
2991 }
2992 if (rdbSave(server.dbfilename) == REDIS_OK) {
2993 addReply(c,shared.ok);
2994 } else {
2995 addReply(c,shared.err);
2996 }
2997 }
2998
2999 static void bgsaveCommand(redisClient *c) {
3000 if (server.bgsaveinprogress) {
3001 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3002 return;
3003 }
3004 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3005 addReply(c,shared.ok);
3006 } else {
3007 addReply(c,shared.err);
3008 }
3009 }
3010
3011 static void shutdownCommand(redisClient *c) {
3012 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3013 /* Kill the saving child if there is a background saving in progress.
3014 We want to avoid race conditions, for instance our saving child may
3015 overwrite the synchronous saving did by SHUTDOWN. */
3016 if (server.bgsaveinprogress) {
3017 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3018 kill(server.bgsavechildpid,SIGKILL);
3019 rdbRemoveTempFile(server.bgsavechildpid);
3020 }
3021 /* SYNC SAVE */
3022 if (rdbSave(server.dbfilename) == REDIS_OK) {
3023 if (server.daemonize)
3024 unlink(server.pidfile);
3025 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3026 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3027 exit(1);
3028 } else {
3029 /* Ooops.. error saving! The best we can do is to continue operating.
3030 * Note that if there was a background saving process, in the next
3031 * cron() Redis will be notified that the background saving aborted,
3032 * handling special stuff like slaves pending for synchronization... */
3033 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3034 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3035 }
3036 }
3037
3038 static void renameGenericCommand(redisClient *c, int nx) {
3039 robj *o;
3040
3041 /* To use the same key as src and dst is probably an error */
3042 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3043 addReply(c,shared.sameobjecterr);
3044 return;
3045 }
3046
3047 o = lookupKeyWrite(c->db,c->argv[1]);
3048 if (o == NULL) {
3049 addReply(c,shared.nokeyerr);
3050 return;
3051 }
3052 incrRefCount(o);
3053 deleteIfVolatile(c->db,c->argv[2]);
3054 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3055 if (nx) {
3056 decrRefCount(o);
3057 addReply(c,shared.czero);
3058 return;
3059 }
3060 dictReplace(c->db->dict,c->argv[2],o);
3061 } else {
3062 incrRefCount(c->argv[2]);
3063 }
3064 deleteKey(c->db,c->argv[1]);
3065 server.dirty++;
3066 addReply(c,nx ? shared.cone : shared.ok);
3067 }
3068
3069 static void renameCommand(redisClient *c) {
3070 renameGenericCommand(c,0);
3071 }
3072
3073 static void renamenxCommand(redisClient *c) {
3074 renameGenericCommand(c,1);
3075 }
3076
3077 static void moveCommand(redisClient *c) {
3078 robj *o;
3079 redisDb *src, *dst;
3080 int srcid;
3081
3082 /* Obtain source and target DB pointers */
3083 src = c->db;
3084 srcid = c->db->id;
3085 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3086 addReply(c,shared.outofrangeerr);
3087 return;
3088 }
3089 dst = c->db;
3090 selectDb(c,srcid); /* Back to the source DB */
3091
3092 /* If the user is moving using as target the same
3093 * DB as the source DB it is probably an error. */
3094 if (src == dst) {
3095 addReply(c,shared.sameobjecterr);
3096 return;
3097 }
3098
3099 /* Check if the element exists and get a reference */
3100 o = lookupKeyWrite(c->db,c->argv[1]);
3101 if (!o) {
3102 addReply(c,shared.czero);
3103 return;
3104 }
3105
3106 /* Try to add the element to the target DB */
3107 deleteIfVolatile(dst,c->argv[1]);
3108 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3109 addReply(c,shared.czero);
3110 return;
3111 }
3112 incrRefCount(c->argv[1]);
3113 incrRefCount(o);
3114
3115 /* OK! key moved, free the entry in the source DB */
3116 deleteKey(src,c->argv[1]);
3117 server.dirty++;
3118 addReply(c,shared.cone);
3119 }
3120
3121 /* =================================== Lists ================================ */
3122 static void pushGenericCommand(redisClient *c, int where) {
3123 robj *lobj;
3124 list *list;
3125
3126 lobj = lookupKeyWrite(c->db,c->argv[1]);
3127 if (lobj == NULL) {
3128 lobj = createListObject();
3129 list = lobj->ptr;
3130 if (where == REDIS_HEAD) {
3131 listAddNodeHead(list,c->argv[2]);
3132 } else {
3133 listAddNodeTail(list,c->argv[2]);
3134 }
3135 dictAdd(c->db->dict,c->argv[1],lobj);
3136 incrRefCount(c->argv[1]);
3137 incrRefCount(c->argv[2]);
3138 } else {
3139 if (lobj->type != REDIS_LIST) {
3140 addReply(c,shared.wrongtypeerr);
3141 return;
3142 }
3143 list = lobj->ptr;
3144 if (where == REDIS_HEAD) {
3145 listAddNodeHead(list,c->argv[2]);
3146 } else {
3147 listAddNodeTail(list,c->argv[2]);
3148 }
3149 incrRefCount(c->argv[2]);
3150 }
3151 server.dirty++;
3152 addReply(c,shared.ok);
3153 }
3154
3155 static void lpushCommand(redisClient *c) {
3156 pushGenericCommand(c,REDIS_HEAD);
3157 }
3158
3159 static void rpushCommand(redisClient *c) {
3160 pushGenericCommand(c,REDIS_TAIL);
3161 }
3162
3163 static void llenCommand(redisClient *c) {
3164 robj *o;
3165 list *l;
3166
3167 o = lookupKeyRead(c->db,c->argv[1]);
3168 if (o == NULL) {
3169 addReply(c,shared.czero);
3170 return;
3171 } else {
3172 if (o->type != REDIS_LIST) {
3173 addReply(c,shared.wrongtypeerr);
3174 } else {
3175 l = o->ptr;
3176 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3177 }
3178 }
3179 }
3180
3181 static void lindexCommand(redisClient *c) {
3182 robj *o;
3183 int index = atoi(c->argv[2]->ptr);
3184
3185 o = lookupKeyRead(c->db,c->argv[1]);
3186 if (o == NULL) {
3187 addReply(c,shared.nullbulk);
3188 } else {
3189 if (o->type != REDIS_LIST) {
3190 addReply(c,shared.wrongtypeerr);
3191 } else {
3192 list *list = o->ptr;
3193 listNode *ln;
3194
3195 ln = listIndex(list, index);
3196 if (ln == NULL) {
3197 addReply(c,shared.nullbulk);
3198 } else {
3199 robj *ele = listNodeValue(ln);
3200 addReplyBulkLen(c,ele);
3201 addReply(c,ele);
3202 addReply(c,shared.crlf);
3203 }
3204 }
3205 }
3206 }
3207
3208 static void lsetCommand(redisClient *c) {
3209 robj *o;
3210 int index = atoi(c->argv[2]->ptr);
3211
3212 o = lookupKeyWrite(c->db,c->argv[1]);
3213 if (o == NULL) {
3214 addReply(c,shared.nokeyerr);
3215 } else {
3216 if (o->type != REDIS_LIST) {
3217 addReply(c,shared.wrongtypeerr);
3218 } else {
3219 list *list = o->ptr;
3220 listNode *ln;
3221
3222 ln = listIndex(list, index);
3223 if (ln == NULL) {
3224 addReply(c,shared.outofrangeerr);
3225 } else {
3226 robj *ele = listNodeValue(ln);
3227
3228 decrRefCount(ele);
3229 listNodeValue(ln) = c->argv[3];
3230 incrRefCount(c->argv[3]);
3231 addReply(c,shared.ok);
3232 server.dirty++;
3233 }
3234 }
3235 }
3236 }
3237
3238 static void popGenericCommand(redisClient *c, int where) {
3239 robj *o;
3240
3241 o = lookupKeyWrite(c->db,c->argv[1]);
3242 if (o == NULL) {
3243 addReply(c,shared.nullbulk);
3244 } else {
3245 if (o->type != REDIS_LIST) {
3246 addReply(c,shared.wrongtypeerr);
3247 } else {
3248 list *list = o->ptr;
3249 listNode *ln;
3250
3251 if (where == REDIS_HEAD)
3252 ln = listFirst(list);
3253 else
3254 ln = listLast(list);
3255
3256 if (ln == NULL) {
3257 addReply(c,shared.nullbulk);
3258 } else {
3259 robj *ele = listNodeValue(ln);
3260 addReplyBulkLen(c,ele);
3261 addReply(c,ele);
3262 addReply(c,shared.crlf);
3263 listDelNode(list,ln);
3264 server.dirty++;
3265 }
3266 }
3267 }
3268 }
3269
3270 static void lpopCommand(redisClient *c) {
3271 popGenericCommand(c,REDIS_HEAD);
3272 }
3273
3274 static void rpopCommand(redisClient *c) {
3275 popGenericCommand(c,REDIS_TAIL);
3276 }
3277
3278 static void lrangeCommand(redisClient *c) {
3279 robj *o;
3280 int start = atoi(c->argv[2]->ptr);
3281 int end = atoi(c->argv[3]->ptr);
3282
3283 o = lookupKeyRead(c->db,c->argv[1]);
3284 if (o == NULL) {
3285 addReply(c,shared.nullmultibulk);
3286 } else {
3287 if (o->type != REDIS_LIST) {
3288 addReply(c,shared.wrongtypeerr);
3289 } else {
3290 list *list = o->ptr;
3291 listNode *ln;
3292 int llen = listLength(list);
3293 int rangelen, j;
3294 robj *ele;
3295
3296 /* convert negative indexes */
3297 if (start < 0) start = llen+start;
3298 if (end < 0) end = llen+end;
3299 if (start < 0) start = 0;
3300 if (end < 0) end = 0;
3301
3302 /* indexes sanity checks */
3303 if (start > end || start >= llen) {
3304 /* Out of range start or start > end result in empty list */
3305 addReply(c,shared.emptymultibulk);
3306 return;
3307 }
3308 if (end >= llen) end = llen-1;
3309 rangelen = (end-start)+1;
3310
3311 /* Return the result in form of a multi-bulk reply */
3312 ln = listIndex(list, start);
3313 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3314 for (j = 0; j < rangelen; j++) {
3315 ele = listNodeValue(ln);
3316 addReplyBulkLen(c,ele);
3317 addReply(c,ele);
3318 addReply(c,shared.crlf);
3319 ln = ln->next;
3320 }
3321 }
3322 }
3323 }
3324
3325 static void ltrimCommand(redisClient *c) {
3326 robj *o;
3327 int start = atoi(c->argv[2]->ptr);
3328 int end = atoi(c->argv[3]->ptr);
3329
3330 o = lookupKeyWrite(c->db,c->argv[1]);
3331 if (o == NULL) {
3332 addReply(c,shared.nokeyerr);
3333 } else {
3334 if (o->type != REDIS_LIST) {
3335 addReply(c,shared.wrongtypeerr);
3336 } else {
3337 list *list = o->ptr;
3338 listNode *ln;
3339 int llen = listLength(list);
3340 int j, ltrim, rtrim;
3341
3342 /* convert negative indexes */
3343 if (start < 0) start = llen+start;
3344 if (end < 0) end = llen+end;
3345 if (start < 0) start = 0;
3346 if (end < 0) end = 0;
3347
3348 /* indexes sanity checks */
3349 if (start > end || start >= llen) {
3350 /* Out of range start or start > end result in empty list */
3351 ltrim = llen;
3352 rtrim = 0;
3353 } else {
3354 if (end >= llen) end = llen-1;
3355 ltrim = start;
3356 rtrim = llen-end-1;
3357 }
3358
3359 /* Remove list elements to perform the trim */
3360 for (j = 0; j < ltrim; j++) {
3361 ln = listFirst(list);
3362 listDelNode(list,ln);
3363 }
3364 for (j = 0; j < rtrim; j++) {
3365 ln = listLast(list);
3366 listDelNode(list,ln);
3367 }
3368 server.dirty++;
3369 addReply(c,shared.ok);
3370 }
3371 }
3372 }
3373
3374 static void lremCommand(redisClient *c) {
3375 robj *o;
3376
3377 o = lookupKeyWrite(c->db,c->argv[1]);
3378 if (o == NULL) {
3379 addReply(c,shared.czero);
3380 } else {
3381 if (o->type != REDIS_LIST) {
3382 addReply(c,shared.wrongtypeerr);
3383 } else {
3384 list *list = o->ptr;
3385 listNode *ln, *next;
3386 int toremove = atoi(c->argv[2]->ptr);
3387 int removed = 0;
3388 int fromtail = 0;
3389
3390 if (toremove < 0) {
3391 toremove = -toremove;
3392 fromtail = 1;
3393 }
3394 ln = fromtail ? list->tail : list->head;
3395 while (ln) {
3396 robj *ele = listNodeValue(ln);
3397
3398 next = fromtail ? ln->prev : ln->next;
3399 if (compareStringObjects(ele,c->argv[3]) == 0) {
3400 listDelNode(list,ln);
3401 server.dirty++;
3402 removed++;
3403 if (toremove && removed == toremove) break;
3404 }
3405 ln = next;
3406 }
3407 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3408 }
3409 }
3410 }
3411
3412 /* ==================================== Sets ================================ */
3413
3414 static void saddCommand(redisClient *c) {
3415 robj *set;
3416
3417 set = lookupKeyWrite(c->db,c->argv[1]);
3418 if (set == NULL) {
3419 set = createSetObject();
3420 dictAdd(c->db->dict,c->argv[1],set);
3421 incrRefCount(c->argv[1]);
3422 } else {
3423 if (set->type != REDIS_SET) {
3424 addReply(c,shared.wrongtypeerr);
3425 return;
3426 }
3427 }
3428 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3429 incrRefCount(c->argv[2]);
3430 server.dirty++;
3431 addReply(c,shared.cone);
3432 } else {
3433 addReply(c,shared.czero);
3434 }
3435 }
3436
3437 static void sremCommand(redisClient *c) {
3438 robj *set;
3439
3440 set = lookupKeyWrite(c->db,c->argv[1]);
3441 if (set == NULL) {
3442 addReply(c,shared.czero);
3443 } else {
3444 if (set->type != REDIS_SET) {
3445 addReply(c,shared.wrongtypeerr);
3446 return;
3447 }
3448 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3449 server.dirty++;
3450 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3451 addReply(c,shared.cone);
3452 } else {
3453 addReply(c,shared.czero);
3454 }
3455 }
3456 }
3457
3458 static void smoveCommand(redisClient *c) {
3459 robj *srcset, *dstset;
3460
3461 srcset = lookupKeyWrite(c->db,c->argv[1]);
3462 dstset = lookupKeyWrite(c->db,c->argv[2]);
3463
3464 /* If the source key does not exist return 0, if it's of the wrong type
3465 * raise an error */
3466 if (srcset == NULL || srcset->type != REDIS_SET) {
3467 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3468 return;
3469 }
3470 /* Error if the destination key is not a set as well */
3471 if (dstset && dstset->type != REDIS_SET) {
3472 addReply(c,shared.wrongtypeerr);
3473 return;
3474 }
3475 /* Remove the element from the source set */
3476 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3477 /* Key not found in the src set! return zero */
3478 addReply(c,shared.czero);
3479 return;
3480 }
3481 server.dirty++;
3482 /* Add the element to the destination set */
3483 if (!dstset) {
3484 dstset = createSetObject();
3485 dictAdd(c->db->dict,c->argv[2],dstset);
3486 incrRefCount(c->argv[2]);
3487 }
3488 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3489 incrRefCount(c->argv[3]);
3490 addReply(c,shared.cone);
3491 }
3492
3493 static void sismemberCommand(redisClient *c) {
3494 robj *set;
3495
3496 set = lookupKeyRead(c->db,c->argv[1]);
3497 if (set == NULL) {
3498 addReply(c,shared.czero);
3499 } else {
3500 if (set->type != REDIS_SET) {
3501 addReply(c,shared.wrongtypeerr);
3502 return;
3503 }
3504 if (dictFind(set->ptr,c->argv[2]))
3505 addReply(c,shared.cone);
3506 else
3507 addReply(c,shared.czero);
3508 }
3509 }
3510
3511 static void scardCommand(redisClient *c) {
3512 robj *o;
3513 dict *s;
3514
3515 o = lookupKeyRead(c->db,c->argv[1]);
3516 if (o == NULL) {
3517 addReply(c,shared.czero);
3518 return;
3519 } else {
3520 if (o->type != REDIS_SET) {
3521 addReply(c,shared.wrongtypeerr);
3522 } else {
3523 s = o->ptr;
3524 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3525 dictSize(s)));
3526 }
3527 }
3528 }
3529
3530 static void spopCommand(redisClient *c) {
3531 robj *set;
3532 dictEntry *de;
3533
3534 set = lookupKeyWrite(c->db,c->argv[1]);
3535 if (set == NULL) {
3536 addReply(c,shared.nullbulk);
3537 } else {
3538 if (set->type != REDIS_SET) {
3539 addReply(c,shared.wrongtypeerr);
3540 return;
3541 }
3542 de = dictGetRandomKey(set->ptr);
3543 if (de == NULL) {
3544 addReply(c,shared.nullbulk);
3545 } else {
3546 robj *ele = dictGetEntryKey(de);
3547
3548 addReplyBulkLen(c,ele);
3549 addReply(c,ele);
3550 addReply(c,shared.crlf);
3551 dictDelete(set->ptr,ele);
3552 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3553 server.dirty++;
3554 }
3555 }
3556 }
3557
3558 static void srandmemberCommand(redisClient *c) {
3559 robj *set;
3560 dictEntry *de;
3561
3562 set = lookupKeyRead(c->db,c->argv[1]);
3563 if (set == NULL) {
3564 addReply(c,shared.nullbulk);
3565 } else {
3566 if (set->type != REDIS_SET) {
3567 addReply(c,shared.wrongtypeerr);
3568 return;
3569 }
3570 de = dictGetRandomKey(set->ptr);
3571 if (de == NULL) {
3572 addReply(c,shared.nullbulk);
3573 } else {
3574 robj *ele = dictGetEntryKey(de);
3575
3576 addReplyBulkLen(c,ele);
3577 addReply(c,ele);
3578 addReply(c,shared.crlf);
3579 }
3580 }
3581 }
3582
3583 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3584 dict **d1 = (void*) s1, **d2 = (void*) s2;
3585
3586 return dictSize(*d1)-dictSize(*d2);
3587 }
3588
3589 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3590 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3591 dictIterator *di;
3592 dictEntry *de;
3593 robj *lenobj = NULL, *dstset = NULL;
3594 int j, cardinality = 0;
3595
3596 for (j = 0; j < setsnum; j++) {
3597 robj *setobj;
3598
3599 setobj = dstkey ?
3600 lookupKeyWrite(c->db,setskeys[j]) :
3601 lookupKeyRead(c->db,setskeys[j]);
3602 if (!setobj) {
3603 zfree(dv);
3604 if (dstkey) {
3605 deleteKey(c->db,dstkey);
3606 addReply(c,shared.ok);
3607 } else {
3608 addReply(c,shared.nullmultibulk);
3609 }
3610 return;
3611 }
3612 if (setobj->type != REDIS_SET) {
3613 zfree(dv);
3614 addReply(c,shared.wrongtypeerr);
3615 return;
3616 }
3617 dv[j] = setobj->ptr;
3618 }
3619 /* Sort sets from the smallest to largest, this will improve our
3620 * algorithm's performace */
3621 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3622
3623 /* The first thing we should output is the total number of elements...
3624 * since this is a multi-bulk write, but at this stage we don't know
3625 * the intersection set size, so we use a trick, append an empty object
3626 * to the output list and save the pointer to later modify it with the
3627 * right length */
3628 if (!dstkey) {
3629 lenobj = createObject(REDIS_STRING,NULL);
3630 addReply(c,lenobj);
3631 decrRefCount(lenobj);
3632 } else {
3633 /* If we have a target key where to store the resulting set
3634 * create this key with an empty set inside */
3635 dstset = createSetObject();
3636 }
3637
3638 /* Iterate all the elements of the first (smallest) set, and test
3639 * the element against all the other sets, if at least one set does
3640 * not include the element it is discarded */
3641 di = dictGetIterator(dv[0]);
3642
3643 while((de = dictNext(di)) != NULL) {
3644 robj *ele;
3645
3646 for (j = 1; j < setsnum; j++)
3647 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3648 if (j != setsnum)
3649 continue; /* at least one set does not contain the member */
3650 ele = dictGetEntryKey(de);
3651 if (!dstkey) {
3652 addReplyBulkLen(c,ele);
3653 addReply(c,ele);
3654 addReply(c,shared.crlf);
3655 cardinality++;
3656 } else {
3657 dictAdd(dstset->ptr,ele,NULL);
3658 incrRefCount(ele);
3659 }
3660 }
3661 dictReleaseIterator(di);
3662
3663 if (dstkey) {
3664 /* Store the resulting set into the target */
3665 deleteKey(c->db,dstkey);
3666 dictAdd(c->db->dict,dstkey,dstset);
3667 incrRefCount(dstkey);
3668 }
3669
3670 if (!dstkey) {
3671 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3672 } else {
3673 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3674 dictSize((dict*)dstset->ptr)));
3675 server.dirty++;
3676 }
3677 zfree(dv);
3678 }
3679
3680 static void sinterCommand(redisClient *c) {
3681 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3682 }
3683
3684 static void sinterstoreCommand(redisClient *c) {
3685 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3686 }
3687
3688 #define REDIS_OP_UNION 0
3689 #define REDIS_OP_DIFF 1
3690
3691 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3692 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3693 dictIterator *di;
3694 dictEntry *de;
3695 robj *dstset = NULL;
3696 int j, cardinality = 0;
3697
3698 for (j = 0; j < setsnum; j++) {
3699 robj *setobj;
3700
3701 setobj = dstkey ?
3702 lookupKeyWrite(c->db,setskeys[j]) :
3703 lookupKeyRead(c->db,setskeys[j]);
3704 if (!setobj) {
3705 dv[j] = NULL;
3706 continue;
3707 }
3708 if (setobj->type != REDIS_SET) {
3709 zfree(dv);
3710 addReply(c,shared.wrongtypeerr);
3711 return;
3712 }
3713 dv[j] = setobj->ptr;
3714 }
3715
3716 /* We need a temp set object to store our union. If the dstkey
3717 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3718 * this set object will be the resulting object to set into the target key*/
3719 dstset = createSetObject();
3720
3721 /* Iterate all the elements of all the sets, add every element a single
3722 * time to the result set */
3723 for (j = 0; j < setsnum; j++) {
3724 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3725 if (!dv[j]) continue; /* non existing keys are like empty sets */
3726
3727 di = dictGetIterator(dv[j]);
3728
3729 while((de = dictNext(di)) != NULL) {
3730 robj *ele;
3731
3732 /* dictAdd will not add the same element multiple times */
3733 ele = dictGetEntryKey(de);
3734 if (op == REDIS_OP_UNION || j == 0) {
3735 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3736 incrRefCount(ele);
3737 cardinality++;
3738 }
3739 } else if (op == REDIS_OP_DIFF) {
3740 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3741 cardinality--;
3742 }
3743 }
3744 }
3745 dictReleaseIterator(di);
3746
3747 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3748 }
3749
3750 /* Output the content of the resulting set, if not in STORE mode */
3751 if (!dstkey) {
3752 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3753 di = dictGetIterator(dstset->ptr);
3754 while((de = dictNext(di)) != NULL) {
3755 robj *ele;
3756
3757 ele = dictGetEntryKey(de);
3758 addReplyBulkLen(c,ele);
3759 addReply(c,ele);
3760 addReply(c,shared.crlf);
3761 }
3762 dictReleaseIterator(di);
3763 } else {
3764 /* If we have a target key where to store the resulting set
3765 * create this key with the result set inside */
3766 deleteKey(c->db,dstkey);
3767 dictAdd(c->db->dict,dstkey,dstset);
3768 incrRefCount(dstkey);
3769 }
3770
3771 /* Cleanup */
3772 if (!dstkey) {
3773 decrRefCount(dstset);
3774 } else {
3775 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3776 dictSize((dict*)dstset->ptr)));
3777 server.dirty++;
3778 }
3779 zfree(dv);
3780 }
3781
3782 static void sunionCommand(redisClient *c) {
3783 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3784 }
3785
3786 static void sunionstoreCommand(redisClient *c) {
3787 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3788 }
3789
3790 static void sdiffCommand(redisClient *c) {
3791 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3792 }
3793
3794 static void sdiffstoreCommand(redisClient *c) {
3795 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3796 }
3797
3798 /* ==================================== ZSets =============================== */
3799
/* ZSETs are ordered sets that keep the same elements in two data structures
 * at once, in order to get O(log(N)) INSERT and REMOVE operations on a
 * sorted data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
3807
3808 /* This skiplist implementation is almost a C translation of the original
3809 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3810 * Alternative to Balanced Trees", modified in three ways:
3811 * a) this implementation allows for repeated values.
3812 * b) the comparison is not just by key (our 'score') but by satellite data.
3813 * c) there is a back pointer, so it's a doubly linked list with the back
3814 * pointers being only at "level 1". This allows to traverse the list
3815 * from tail to head, useful for ZREVRANGE. */
3816
3817 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3818 zskiplistNode *zn = zmalloc(sizeof(*zn));
3819
3820 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3821 zn->score = score;
3822 zn->obj = obj;
3823 return zn;
3824 }
3825
3826 static zskiplist *zslCreate(void) {
3827 int j;
3828 zskiplist *zsl;
3829
3830 zsl = zmalloc(sizeof(*zsl));
3831 zsl->level = 1;
3832 zsl->length = 0;
3833 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3834 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3835 zsl->header->forward[j] = NULL;
3836 zsl->header->backward = NULL;
3837 zsl->tail = NULL;
3838 return zsl;
3839 }
3840
3841 static void zslFreeNode(zskiplistNode *node) {
3842 decrRefCount(node->obj);
3843 zfree(node->forward);
3844 zfree(node);
3845 }
3846
3847 static void zslFree(zskiplist *zsl) {
3848 zskiplistNode *node = zsl->header->forward[0], *next;
3849
3850 zfree(zsl->header->forward);
3851 zfree(zsl->header);
3852 while(node) {
3853 next = node->forward[0];
3854 zslFreeNode(node);
3855 node = next;
3856 }
3857 zfree(zsl);
3858 }
3859
3860 static int zslRandomLevel(void) {
3861 int level = 1;
3862 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3863 level += 1;
3864 return level;
3865 }
3866
3867 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3868 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3869 int i, level;
3870
3871 x = zsl->header;
3872 for (i = zsl->level-1; i >= 0; i--) {
3873 while (x->forward[i] &&
3874 (x->forward[i]->score < score ||
3875 (x->forward[i]->score == score &&
3876 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3877 x = x->forward[i];
3878 update[i] = x;
3879 }
3880 /* we assume the key is not already inside, since we allow duplicated
3881 * scores, and the re-insertion of score and redis object should never
3882 * happpen since the caller of zslInsert() should test in the hash table
3883 * if the element is already inside or not. */
3884 level = zslRandomLevel();
3885 if (level > zsl->level) {
3886 for (i = zsl->level; i < level; i++)
3887 update[i] = zsl->header;
3888 zsl->level = level;
3889 }
3890 x = zslCreateNode(level,score,obj);
3891 for (i = 0; i < level; i++) {
3892 x->forward[i] = update[i]->forward[i];
3893 update[i]->forward[i] = x;
3894 }
3895 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3896 if (x->forward[0])
3897 x->forward[0]->backward = x;
3898 else
3899 zsl->tail = x;
3900 zsl->length++;
3901 }
3902
3903 /* Delete an element with matching score/object from the skiplist. */
3904 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3905 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3906 int i;
3907
3908 x = zsl->header;
3909 for (i = zsl->level-1; i >= 0; i--) {
3910 while (x->forward[i] &&
3911 (x->forward[i]->score < score ||
3912 (x->forward[i]->score == score &&
3913 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3914 x = x->forward[i];
3915 update[i] = x;
3916 }
3917 /* We may have multiple elements with the same score, what we need
3918 * is to find the element with both the right score and object. */
3919 x = x->forward[0];
3920 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
3921 for (i = 0; i < zsl->level; i++) {
3922 if (update[i]->forward[i] != x) break;
3923 update[i]->forward[i] = x->forward[i];
3924 }
3925 if (x->forward[0]) {
3926 x->forward[0]->backward = (x->backward == zsl->header) ?
3927 NULL : x->backward;
3928 } else {
3929 zsl->tail = x->backward;
3930 }
3931 zslFreeNode(x);
3932 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3933 zsl->level--;
3934 zsl->length--;
3935 return 1;
3936 } else {
3937 return 0; /* not found */
3938 }
3939 return 0; /* not found */
3940 }
3941
3942 /* Find the first node having a score equal or greater than the specified one.
3943 * Returns NULL if there is no match. */
3944 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
3945 zskiplistNode *x;
3946 int i;
3947
3948 x = zsl->header;
3949 for (i = zsl->level-1; i >= 0; i--) {
3950 while (x->forward[i] && x->forward[i]->score < score)
3951 x = x->forward[i];
3952 }
3953 /* We may have multiple elements with the same score, what we need
3954 * is to find the element with both the right score and object. */
3955 return x->forward[0];
3956 }
3957
3958 /* The actual Z-commands implementations */
3959
3960 static void zaddCommand(redisClient *c) {
3961 robj *zsetobj;
3962 zset *zs;
3963 double *score;
3964
3965 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3966 if (zsetobj == NULL) {
3967 zsetobj = createZsetObject();
3968 dictAdd(c->db->dict,c->argv[1],zsetobj);
3969 incrRefCount(c->argv[1]);
3970 } else {
3971 if (zsetobj->type != REDIS_ZSET) {
3972 addReply(c,shared.wrongtypeerr);
3973 return;
3974 }
3975 }
3976 score = zmalloc(sizeof(double));
3977 *score = strtod(c->argv[2]->ptr,NULL);
3978 zs = zsetobj->ptr;
3979 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3980 /* case 1: New element */
3981 incrRefCount(c->argv[3]); /* added to hash */
3982 zslInsert(zs->zsl,*score,c->argv[3]);
3983 incrRefCount(c->argv[3]); /* added to skiplist */
3984 server.dirty++;
3985 addReply(c,shared.cone);
3986 } else {
3987 dictEntry *de;
3988 double *oldscore;
3989
3990 /* case 2: Score update operation */
3991 de = dictFind(zs->dict,c->argv[3]);
3992 assert(de != NULL);
3993 oldscore = dictGetEntryVal(de);
3994 if (*score != *oldscore) {
3995 int deleted;
3996
3997 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
3998 assert(deleted != 0);
3999 zslInsert(zs->zsl,*score,c->argv[3]);
4000 incrRefCount(c->argv[3]);
4001 dictReplace(zs->dict,c->argv[3],score);
4002 server.dirty++;
4003 } else {
4004 zfree(score);
4005 }
4006 addReply(c,shared.czero);
4007 }
4008 }
4009
4010 static void zremCommand(redisClient *c) {
4011 robj *zsetobj;
4012 zset *zs;
4013
4014 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4015 if (zsetobj == NULL) {
4016 addReply(c,shared.czero);
4017 } else {
4018 dictEntry *de;
4019 double *oldscore;
4020 int deleted;
4021
4022 if (zsetobj->type != REDIS_ZSET) {
4023 addReply(c,shared.wrongtypeerr);
4024 return;
4025 }
4026 zs = zsetobj->ptr;
4027 de = dictFind(zs->dict,c->argv[2]);
4028 if (de == NULL) {
4029 addReply(c,shared.czero);
4030 return;
4031 }
4032 /* Delete from the skiplist */
4033 oldscore = dictGetEntryVal(de);
4034 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4035 assert(deleted != 0);
4036
4037 /* Delete from the hash table */
4038 dictDelete(zs->dict,c->argv[2]);
4039 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4040 server.dirty++;
4041 addReply(c,shared.cone);
4042 }
4043 }
4044
4045 static void zrangeGenericCommand(redisClient *c, int reverse) {
4046 robj *o;
4047 int start = atoi(c->argv[2]->ptr);
4048 int end = atoi(c->argv[3]->ptr);
4049
4050 o = lookupKeyRead(c->db,c->argv[1]);
4051 if (o == NULL) {
4052 addReply(c,shared.nullmultibulk);
4053 } else {
4054 if (o->type != REDIS_ZSET) {
4055 addReply(c,shared.wrongtypeerr);
4056 } else {
4057 zset *zsetobj = o->ptr;
4058 zskiplist *zsl = zsetobj->zsl;
4059 zskiplistNode *ln;
4060
4061 int llen = zsl->length;
4062 int rangelen, j;
4063 robj *ele;
4064
4065 /* convert negative indexes */
4066 if (start < 0) start = llen+start;
4067 if (end < 0) end = llen+end;
4068 if (start < 0) start = 0;
4069 if (end < 0) end = 0;
4070
4071 /* indexes sanity checks */
4072 if (start > end || start >= llen) {
4073 /* Out of range start or start > end result in empty list */
4074 addReply(c,shared.emptymultibulk);
4075 return;
4076 }
4077 if (end >= llen) end = llen-1;
4078 rangelen = (end-start)+1;
4079
4080 /* Return the result in form of a multi-bulk reply */
4081 if (reverse) {
4082 ln = zsl->tail;
4083 while (start--)
4084 ln = ln->backward;
4085 } else {
4086 ln = zsl->header->forward[0];
4087 while (start--)
4088 ln = ln->forward[0];
4089 }
4090
4091 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4092 for (j = 0; j < rangelen; j++) {
4093 ele = ln->obj;
4094 addReplyBulkLen(c,ele);
4095 addReply(c,ele);
4096 addReply(c,shared.crlf);
4097 ln = reverse ? ln->backward : ln->forward[0];
4098 }
4099 }
4100 }
4101 }
4102
4103 static void zrangeCommand(redisClient *c) {
4104 zrangeGenericCommand(c,0);
4105 }
4106
4107 static void zrevrangeCommand(redisClient *c) {
4108 zrangeGenericCommand(c,1);
4109 }
4110
4111 static void zrangebyscoreCommand(redisClient *c) {
4112 robj *o;
4113 double min = strtod(c->argv[2]->ptr,NULL);
4114 double max = strtod(c->argv[3]->ptr,NULL);
4115
4116 o = lookupKeyRead(c->db,c->argv[1]);
4117 if (o == NULL) {
4118 addReply(c,shared.nullmultibulk);
4119 } else {
4120 if (o->type != REDIS_ZSET) {
4121 addReply(c,shared.wrongtypeerr);
4122 } else {
4123 zset *zsetobj = o->ptr;
4124 zskiplist *zsl = zsetobj->zsl;
4125 zskiplistNode *ln;
4126 robj *ele, *lenobj;
4127 unsigned int rangelen = 0;
4128
4129 /* Get the first node with the score >= min */
4130 ln = zslFirstWithScore(zsl,min);
4131 if (ln == NULL) {
4132 /* No element matching the speciifed interval */
4133 addReply(c,shared.emptymultibulk);
4134 return;
4135 }
4136
4137 /* We don't know in advance how many matching elements there
4138 * are in the list, so we push this object that will represent
4139 * the multi-bulk length in the output buffer, and will "fix"
4140 * it later */
4141 lenobj = createObject(REDIS_STRING,NULL);
4142 addReply(c,lenobj);
4143
4144 while(ln && ln->score <= max) {
4145 ele = ln->obj;
4146 addReplyBulkLen(c,ele);
4147 addReply(c,ele);
4148 addReply(c,shared.crlf);
4149 ln = ln->forward[0];
4150 rangelen++;
4151 }
4152 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4153 }
4154 }
4155 }
4156
4157 static void zlenCommand(redisClient *c) {
4158 robj *o;
4159 zset *zs;
4160
4161 o = lookupKeyRead(c->db,c->argv[1]);
4162 if (o == NULL) {
4163 addReply(c,shared.czero);
4164 return;
4165 } else {
4166 if (o->type != REDIS_ZSET) {
4167 addReply(c,shared.wrongtypeerr);
4168 } else {
4169 zs = o->ptr;
4170 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4171 }
4172 }
4173 }
4174
4175 static void zscoreCommand(redisClient *c) {
4176 robj *o;
4177 zset *zs;
4178
4179 o = lookupKeyRead(c->db,c->argv[1]);
4180 if (o == NULL) {
4181 addReply(c,shared.czero);
4182 return;
4183 } else {
4184 if (o->type != REDIS_ZSET) {
4185 addReply(c,shared.wrongtypeerr);
4186 } else {
4187 dictEntry *de;
4188
4189 zs = o->ptr;
4190 de = dictFind(zs->dict,c->argv[2]);
4191 if (!de) {
4192 addReply(c,shared.nullbulk);
4193 } else {
4194 char buf[128];
4195 double *score = dictGetEntryVal(de);
4196
4197 snprintf(buf,sizeof(buf),"%.16g",*score);
4198 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4199 strlen(buf),buf));
4200 }
4201 }
4202 }
4203 }
4204
4205 /* ========================= Non type-specific commands ==================== */
4206
4207 static void flushdbCommand(redisClient *c) {
4208 server.dirty += dictSize(c->db->dict);
4209 dictEmpty(c->db->dict);
4210 dictEmpty(c->db->expires);
4211 addReply(c,shared.ok);
4212 }
4213
4214 static void flushallCommand(redisClient *c) {
4215 server.dirty += emptyDb();
4216 addReply(c,shared.ok);
4217 rdbSave(server.dbfilename);
4218 server.dirty++;
4219 }
4220
4221 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4222 redisSortOperation *so = zmalloc(sizeof(*so));
4223 so->type = type;
4224 so->pattern = pattern;
4225 return so;
4226 }
4227
4228 /* Return the value associated to the key with a name obtained
4229 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4230 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4231 char *p;
4232 sds spat, ssub;
4233 robj keyobj;
4234 int prefixlen, sublen, postfixlen;
4235 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4236 struct {
4237 long len;
4238 long free;
4239 char buf[REDIS_SORTKEY_MAX+1];
4240 } keyname;
4241
4242 if (subst->encoding == REDIS_ENCODING_RAW)
4243 incrRefCount(subst);
4244 else {
4245 subst = getDecodedObject(subst);
4246 }
4247
4248 spat = pattern->ptr;
4249 ssub = subst->ptr;
4250 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4251 p = strchr(spat,'*');
4252 if (!p) return NULL;
4253
4254 prefixlen = p-spat;
4255 sublen = sdslen(ssub);
4256 postfixlen = sdslen(spat)-(prefixlen+1);
4257 memcpy(keyname.buf,spat,prefixlen);
4258 memcpy(keyname.buf+prefixlen,ssub,sublen);
4259 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4260 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4261 keyname.len = prefixlen+sublen+postfixlen;
4262
4263 keyobj.refcount = 1;
4264 keyobj.type = REDIS_STRING;
4265 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4266
4267 decrRefCount(subst);
4268
4269 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4270 return lookupKeyRead(db,&keyobj);
4271 }
4272
4273 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4274 * the additional parameter is not standard but a BSD-specific we have to
4275 * pass sorting parameters via the global 'server' structure */
4276 static int sortCompare(const void *s1, const void *s2) {
4277 const redisSortObject *so1 = s1, *so2 = s2;
4278 int cmp;
4279
4280 if (!server.sort_alpha) {
4281 /* Numeric sorting. Here it's trivial as we precomputed scores */
4282 if (so1->u.score > so2->u.score) {
4283 cmp = 1;
4284 } else if (so1->u.score < so2->u.score) {
4285 cmp = -1;
4286 } else {
4287 cmp = 0;
4288 }
4289 } else {
4290 /* Alphanumeric sorting */
4291 if (server.sort_bypattern) {
4292 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4293 /* At least one compare object is NULL */
4294 if (so1->u.cmpobj == so2->u.cmpobj)
4295 cmp = 0;
4296 else if (so1->u.cmpobj == NULL)
4297 cmp = -1;
4298 else
4299 cmp = 1;
4300 } else {
4301 /* We have both the objects, use strcoll */
4302 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4303 }
4304 } else {
4305 /* Compare elements directly */
4306 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4307 so2->obj->encoding == REDIS_ENCODING_RAW) {
4308 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4309 } else {
4310 robj *dec1, *dec2;
4311
4312 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4313 so1->obj : getDecodedObject(so1->obj);
4314 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4315 so2->obj : getDecodedObject(so2->obj);
4316 cmp = strcoll(dec1->ptr,dec2->ptr);
4317 if (dec1 != so1->obj) decrRefCount(dec1);
4318 if (dec2 != so2->obj) decrRefCount(dec2);
4319 }
4320 }
4321 }
4322 return server.sort_desc ? -cmp : cmp;
4323 }
4324
4325 /* The SORT command is the most complex command in Redis. Warning: this code
4326 * is optimized for speed and a bit less for readability */
4327 static void sortCommand(redisClient *c) {
4328 list *operations;
4329 int outputlen = 0;
4330 int desc = 0, alpha = 0;
4331 int limit_start = 0, limit_count = -1, start, end;
4332 int j, dontsort = 0, vectorlen;
4333 int getop = 0; /* GET operation counter */
4334 robj *sortval, *sortby = NULL;
4335 redisSortObject *vector; /* Resulting vector to sort */
4336
4337 /* Lookup the key to sort. It must be of the right types */
4338 sortval = lookupKeyRead(c->db,c->argv[1]);
4339 if (sortval == NULL) {
4340 addReply(c,shared.nokeyerr);
4341 return;
4342 }
4343 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4344 addReply(c,shared.wrongtypeerr);
4345 return;
4346 }
4347
4348 /* Create a list of operations to perform for every sorted element.
4349 * Operations can be GET/DEL/INCR/DECR */
4350 operations = listCreate();
4351 listSetFreeMethod(operations,zfree);
4352 j = 2;
4353
4354 /* Now we need to protect sortval incrementing its count, in the future
4355 * SORT may have options able to overwrite/delete keys during the sorting
4356 * and the sorted key itself may get destroied */
4357 incrRefCount(sortval);
4358
4359 /* The SORT command has an SQL-alike syntax, parse it */
4360 while(j < c->argc) {
4361 int leftargs = c->argc-j-1;
4362 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4363 desc = 0;
4364 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4365 desc = 1;
4366 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4367 alpha = 1;
4368 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4369 limit_start = atoi(c->argv[j+1]->ptr);
4370 limit_count = atoi(c->argv[j+2]->ptr);
4371 j+=2;
4372 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4373 sortby = c->argv[j+1];
4374 /* If the BY pattern does not contain '*', i.e. it is constant,
4375 * we don't need to sort nor to lookup the weight keys. */
4376 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4377 j++;
4378 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4379 listAddNodeTail(operations,createSortOperation(
4380 REDIS_SORT_GET,c->argv[j+1]));
4381 getop++;
4382 j++;
4383 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4384 listAddNodeTail(operations,createSortOperation(
4385 REDIS_SORT_DEL,c->argv[j+1]));
4386 j++;
4387 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4388 listAddNodeTail(operations,createSortOperation(
4389 REDIS_SORT_INCR,c->argv[j+1]));
4390 j++;
4391 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4392 listAddNodeTail(operations,createSortOperation(
4393 REDIS_SORT_DECR,c->argv[j+1]));
4394 j++;
4395 } else {
4396 decrRefCount(sortval);
4397 listRelease(operations);
4398 addReply(c,shared.syntaxerr);
4399 return;
4400 }
4401 j++;
4402 }
4403
4404 /* Load the sorting vector with all the objects to sort */
4405 vectorlen = (sortval->type == REDIS_LIST) ?
4406 listLength((list*)sortval->ptr) :
4407 dictSize((dict*)sortval->ptr);
4408 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4409 j = 0;
4410 if (sortval->type == REDIS_LIST) {
4411 list *list = sortval->ptr;
4412 listNode *ln;
4413
4414 listRewind(list);
4415 while((ln = listYield(list))) {
4416 robj *ele = ln->value;
4417 vector[j].obj = ele;
4418 vector[j].u.score = 0;
4419 vector[j].u.cmpobj = NULL;
4420 j++;
4421 }
4422 } else {
4423 dict *set = sortval->ptr;
4424 dictIterator *di;
4425 dictEntry *setele;
4426
4427 di = dictGetIterator(set);
4428 while((setele = dictNext(di)) != NULL) {
4429 vector[j].obj = dictGetEntryKey(setele);
4430 vector[j].u.score = 0;
4431 vector[j].u.cmpobj = NULL;
4432 j++;
4433 }
4434 dictReleaseIterator(di);
4435 }
4436 assert(j == vectorlen);
4437
4438 /* Now it's time to load the right scores in the sorting vector */
4439 if (dontsort == 0) {
4440 for (j = 0; j < vectorlen; j++) {
4441 if (sortby) {
4442 robj *byval;
4443
4444 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4445 if (!byval || byval->type != REDIS_STRING) continue;
4446 if (alpha) {
4447 if (byval->encoding == REDIS_ENCODING_RAW) {
4448 vector[j].u.cmpobj = byval;
4449 incrRefCount(byval);
4450 } else {
4451 vector[j].u.cmpobj = getDecodedObject(byval);
4452 }
4453 } else {
4454 if (byval->encoding == REDIS_ENCODING_RAW) {
4455 vector[j].u.score = strtod(byval->ptr,NULL);
4456 } else {
4457 if (byval->encoding == REDIS_ENCODING_INT) {
4458 vector[j].u.score = (long)byval->ptr;
4459 } else
4460 assert(1 != 1);
4461 }
4462 }
4463 } else {
4464 if (!alpha) {
4465 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4466 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4467 else {
4468 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4469 vector[j].u.score = (long) vector[j].obj->ptr;
4470 else
4471 assert(1 != 1);
4472 }
4473 }
4474 }
4475 }
4476 }
4477
4478 /* We are ready to sort the vector... perform a bit of sanity check
4479 * on the LIMIT option too. We'll use a partial version of quicksort. */
4480 start = (limit_start < 0) ? 0 : limit_start;
4481 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4482 if (start >= vectorlen) {
4483 start = vectorlen-1;
4484 end = vectorlen-2;
4485 }
4486 if (end >= vectorlen) end = vectorlen-1;
4487
4488 if (dontsort == 0) {
4489 server.sort_desc = desc;
4490 server.sort_alpha = alpha;
4491 server.sort_bypattern = sortby ? 1 : 0;
4492 if (sortby && (start != 0 || end != vectorlen-1))
4493 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4494 else
4495 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4496 }
4497
4498 /* Send command output to the output buffer, performing the specified
4499 * GET/DEL/INCR/DECR operations if any. */
4500 outputlen = getop ? getop*(end-start+1) : end-start+1;
4501 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4502 for (j = start; j <= end; j++) {
4503 listNode *ln;
4504 if (!getop) {
4505 addReplyBulkLen(c,vector[j].obj);
4506 addReply(c,vector[j].obj);
4507 addReply(c,shared.crlf);
4508 }
4509 listRewind(operations);
4510 while((ln = listYield(operations))) {
4511 redisSortOperation *sop = ln->value;
4512 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4513 vector[j].obj);
4514
4515 if (sop->type == REDIS_SORT_GET) {
4516 if (!val || val->type != REDIS_STRING) {
4517 addReply(c,shared.nullbulk);
4518 } else {
4519 addReplyBulkLen(c,val);
4520 addReply(c,val);
4521 addReply(c,shared.crlf);
4522 }
4523 } else if (sop->type == REDIS_SORT_DEL) {
4524 /* TODO */
4525 }
4526 }
4527 }
4528
4529 /* Cleanup */
4530 decrRefCount(sortval);
4531 listRelease(operations);
4532 for (j = 0; j < vectorlen; j++) {
4533 if (sortby && alpha && vector[j].u.cmpobj)
4534 decrRefCount(vector[j].u.cmpobj);
4535 }
4536 zfree(vector);
4537 }
4538
4539 static void infoCommand(redisClient *c) {
4540 sds info;
4541 time_t uptime = time(NULL)-server.stat_starttime;
4542 int j;
4543
4544 info = sdscatprintf(sdsempty(),
4545 "redis_version:%s\r\n"
4546 "arch_bits:%s\r\n"
4547 "uptime_in_seconds:%d\r\n"
4548 "uptime_in_days:%d\r\n"
4549 "connected_clients:%d\r\n"
4550 "connected_slaves:%d\r\n"
4551 "used_memory:%zu\r\n"
4552 "changes_since_last_save:%lld\r\n"
4553 "bgsave_in_progress:%d\r\n"
4554 "last_save_time:%d\r\n"
4555 "total_connections_received:%lld\r\n"
4556 "total_commands_processed:%lld\r\n"
4557 "role:%s\r\n"
4558 ,REDIS_VERSION,
4559 (sizeof(long) == 8) ? "64" : "32",
4560 uptime,
4561 uptime/(3600*24),
4562 listLength(server.clients)-listLength(server.slaves),
4563 listLength(server.slaves),
4564 server.usedmemory,
4565 server.dirty,
4566 server.bgsaveinprogress,
4567 server.lastsave,
4568 server.stat_numconnections,
4569 server.stat_numcommands,
4570 server.masterhost == NULL ? "master" : "slave"
4571 );
4572 if (server.masterhost) {
4573 info = sdscatprintf(info,
4574 "master_host:%s\r\n"
4575 "master_port:%d\r\n"
4576 "master_link_status:%s\r\n"
4577 "master_last_io_seconds_ago:%d\r\n"
4578 ,server.masterhost,
4579 server.masterport,
4580 (server.replstate == REDIS_REPL_CONNECTED) ?
4581 "up" : "down",
4582 (int)(time(NULL)-server.master->lastinteraction)
4583 );
4584 }
4585 for (j = 0; j < server.dbnum; j++) {
4586 long long keys, vkeys;
4587
4588 keys = dictSize(server.db[j].dict);
4589 vkeys = dictSize(server.db[j].expires);
4590 if (keys || vkeys) {
4591 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4592 j, keys, vkeys);
4593 }
4594 }
4595 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4596 addReplySds(c,info);
4597 addReply(c,shared.crlf);
4598 }
4599
4600 static void monitorCommand(redisClient *c) {
4601 /* ignore MONITOR if aleady slave or in monitor mode */
4602 if (c->flags & REDIS_SLAVE) return;
4603
4604 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4605 c->slaveseldb = 0;
4606 listAddNodeTail(server.monitors,c);
4607 addReply(c,shared.ok);
4608 }
4609
4610 /* ================================= Expire ================================= */
4611 static int removeExpire(redisDb *db, robj *key) {
4612 if (dictDelete(db->expires,key) == DICT_OK) {
4613 return 1;
4614 } else {
4615 return 0;
4616 }
4617 }
4618
4619 static int setExpire(redisDb *db, robj *key, time_t when) {
4620 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4621 return 0;
4622 } else {
4623 incrRefCount(key);
4624 return 1;
4625 }
4626 }
4627
4628 /* Return the expire time of the specified key, or -1 if no expire
4629 * is associated with this key (i.e. the key is non volatile) */
4630 static time_t getExpire(redisDb *db, robj *key) {
4631 dictEntry *de;
4632
4633 /* No expire? return ASAP */
4634 if (dictSize(db->expires) == 0 ||
4635 (de = dictFind(db->expires,key)) == NULL) return -1;
4636
4637 return (time_t) dictGetEntryVal(de);
4638 }
4639
4640 static int expireIfNeeded(redisDb *db, robj *key) {
4641 time_t when;
4642 dictEntry *de;
4643
4644 /* No expire? return ASAP */
4645 if (dictSize(db->expires) == 0 ||
4646 (de = dictFind(db->expires,key)) == NULL) return 0;
4647
4648 /* Lookup the expire */
4649 when = (time_t) dictGetEntryVal(de);
4650 if (time(NULL) <= when) return 0;
4651
4652 /* Delete the key */
4653 dictDelete(db->expires,key);
4654 return dictDelete(db->dict,key) == DICT_OK;
4655 }
4656
4657 static int deleteIfVolatile(redisDb *db, robj *key) {
4658 dictEntry *de;
4659
4660 /* No expire? return ASAP */
4661 if (dictSize(db->expires) == 0 ||
4662 (de = dictFind(db->expires,key)) == NULL) return 0;
4663
4664 /* Delete the key */
4665 server.dirty++;
4666 dictDelete(db->expires,key);
4667 return dictDelete(db->dict,key) == DICT_OK;
4668 }
4669
4670 static void expireCommand(redisClient *c) {
4671 dictEntry *de;
4672 int seconds = atoi(c->argv[2]->ptr);
4673
4674 de = dictFind(c->db->dict,c->argv[1]);
4675 if (de == NULL) {
4676 addReply(c,shared.czero);
4677 return;
4678 }
4679 if (seconds <= 0) {
4680 addReply(c, shared.czero);
4681 return;
4682 } else {
4683 time_t when = time(NULL)+seconds;
4684 if (setExpire(c->db,c->argv[1],when)) {
4685 addReply(c,shared.cone);
4686 server.dirty++;
4687 } else {
4688 addReply(c,shared.czero);
4689 }
4690 return;
4691 }
4692 }
4693
4694 static void ttlCommand(redisClient *c) {
4695 time_t expire;
4696 int ttl = -1;
4697
4698 expire = getExpire(c->db,c->argv[1]);
4699 if (expire != -1) {
4700 ttl = (int) (expire-time(NULL));
4701 if (ttl < 0) ttl = -1;
4702 }
4703 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4704 }
4705
4706 static void msetGenericCommand(redisClient *c, int nx) {
4707 int j;
4708
4709 if ((c->argc % 2) == 0) {
4710 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4711 return;
4712 }
4713 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4714 * set nothing at all if at least one already key exists. */
4715 if (nx) {
4716 for (j = 1; j < c->argc; j += 2) {
4717 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4718 addReply(c, shared.czero);
4719 return;
4720 }
4721 }
4722 }
4723
4724 for (j = 1; j < c->argc; j += 2) {
4725 int retval;
4726
4727 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4728 if (retval == DICT_ERR) {
4729 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4730 incrRefCount(c->argv[j+1]);
4731 } else {
4732 incrRefCount(c->argv[j]);
4733 incrRefCount(c->argv[j+1]);
4734 }
4735 removeExpire(c->db,c->argv[j]);
4736 }
4737 server.dirty += (c->argc-1)/2;
4738 addReply(c, nx ? shared.cone : shared.ok);
4739 }
4740
4741 static void msetCommand(redisClient *c) {
4742 msetGenericCommand(c,0);
4743 }
4744
4745 static void msetnxCommand(redisClient *c) {
4746 msetGenericCommand(c,1);
4747 }
4748
4749 /* =============================== Replication ============================= */
4750
4751 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4752 ssize_t nwritten, ret = size;
4753 time_t start = time(NULL);
4754
4755 timeout++;
4756 while(size) {
4757 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4758 nwritten = write(fd,ptr,size);
4759 if (nwritten == -1) return -1;
4760 ptr += nwritten;
4761 size -= nwritten;
4762 }
4763 if ((time(NULL)-start) > timeout) {
4764 errno = ETIMEDOUT;
4765 return -1;
4766 }
4767 }
4768 return ret;
4769 }
4770
4771 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4772 ssize_t nread, totread = 0;
4773 time_t start = time(NULL);
4774
4775 timeout++;
4776 while(size) {
4777 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4778 nread = read(fd,ptr,size);
4779 if (nread == -1) return -1;
4780 ptr += nread;
4781 size -= nread;
4782 totread += nread;
4783 }
4784 if ((time(NULL)-start) > timeout) {
4785 errno = ETIMEDOUT;
4786 return -1;
4787 }
4788 }
4789 return totread;
4790 }
4791
/* Read a newline-terminated line from 'fd' into 'ptr' (a buffer of 'size'
 * bytes), one byte at a time via syncRead(). The '\n' and an optional
 * preceding '\r' are stripped and the result is always nul-terminated.
 *
 * Returns the number of bytes stored before the terminator (a trailing
 * '\r' is counted but overwritten with '\0'), or -1 on I/O error/timeout.
 * If the buffer fills up before a newline arrives, the truncated line is
 * returned and the rest of the line is left unread. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;
    size--; /* reserve room for the terminating nul byte */
    *ptr = '\0';
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Previously never decremented: lines longer than the buffer
         * overflowed it. */
        size--;
    }
    return nread;
}
4812
4813 static void syncCommand(redisClient *c) {
4814 /* ignore SYNC if aleady slave or in monitor mode */
4815 if (c->flags & REDIS_SLAVE) return;
4816
4817 /* SYNC can't be issued when the server has pending data to send to
4818 * the client about already issued commands. We need a fresh reply
4819 * buffer registering the differences between the BGSAVE and the current
4820 * dataset, so that we can copy to other slaves if needed. */
4821 if (listLength(c->reply) != 0) {
4822 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4823 return;
4824 }
4825
4826 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4827 /* Here we need to check if there is a background saving operation
4828 * in progress, or if it is required to start one */
4829 if (server.bgsaveinprogress) {
4830 /* Ok a background save is in progress. Let's check if it is a good
4831 * one for replication, i.e. if there is another slave that is
4832 * registering differences since the server forked to save */
4833 redisClient *slave;
4834 listNode *ln;
4835
4836 listRewind(server.slaves);
4837 while((ln = listYield(server.slaves))) {
4838 slave = ln->value;
4839 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4840 }
4841 if (ln) {
4842 /* Perfect, the server is already registering differences for
4843 * another slave. Set the right state, and copy the buffer. */
4844 listRelease(c->reply);
4845 c->reply = listDup(slave->reply);
4846 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4847 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4848 } else {
4849 /* No way, we need to wait for the next BGSAVE in order to
4850 * register differences */
4851 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4852 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4853 }
4854 } else {
4855 /* Ok we don't have a BGSAVE in progress, let's start one */
4856 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4857 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4858 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4859 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4860 return;
4861 }
4862 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4863 }
4864 c->repldbfd = -1;
4865 c->flags |= REDIS_SLAVE;
4866 c->slaveseldb = 0;
4867 listAddNodeTail(server.slaves,c);
4868 return;
4869 }
4870
4871 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4872 redisClient *slave = privdata;
4873 REDIS_NOTUSED(el);
4874 REDIS_NOTUSED(mask);
4875 char buf[REDIS_IOBUF_LEN];
4876 ssize_t nwritten, buflen;
4877
4878 if (slave->repldboff == 0) {
4879 /* Write the bulk write count before to transfer the DB. In theory here
4880 * we don't know how much room there is in the output buffer of the
4881 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4882 * operations) will never be smaller than the few bytes we need. */
4883 sds bulkcount;
4884
4885 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4886 slave->repldbsize);
4887 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4888 {
4889 sdsfree(bulkcount);
4890 freeClient(slave);
4891 return;
4892 }
4893 sdsfree(bulkcount);
4894 }
4895 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4896 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4897 if (buflen <= 0) {
4898 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4899 (buflen == 0) ? "premature EOF" : strerror(errno));
4900 freeClient(slave);
4901 return;
4902 }
4903 if ((nwritten = write(fd,buf,buflen)) == -1) {
4904 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4905 strerror(errno));
4906 freeClient(slave);
4907 return;
4908 }
4909 slave->repldboff += nwritten;
4910 if (slave->repldboff == slave->repldbsize) {
4911 close(slave->repldbfd);
4912 slave->repldbfd = -1;
4913 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4914 slave->replstate = REDIS_REPL_ONLINE;
4915 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4916 sendReplyToClient, slave, NULL) == AE_ERR) {
4917 freeClient(slave);
4918 return;
4919 }
4920 addReplySds(slave,sdsempty());
4921 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4922 }
4923 }
4924
4925 /* This function is called at the end of every backgrond saving.
4926 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4927 * otherwise REDIS_ERR is passed to the function.
4928 *
4929 * The goal of this function is to handle slaves waiting for a successful
4930 * background saving in order to perform non-blocking synchronization. */
4931 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4932 listNode *ln;
4933 int startbgsave = 0;
4934
4935 listRewind(server.slaves);
4936 while((ln = listYield(server.slaves))) {
4937 redisClient *slave = ln->value;
4938
4939 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4940 startbgsave = 1;
4941 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4942 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4943 struct redis_stat buf;
4944
4945 if (bgsaveerr != REDIS_OK) {
4946 freeClient(slave);
4947 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4948 continue;
4949 }
4950 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4951 redis_fstat(slave->repldbfd,&buf) == -1) {
4952 freeClient(slave);
4953 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4954 continue;
4955 }
4956 slave->repldboff = 0;
4957 slave->repldbsize = buf.st_size;
4958 slave->replstate = REDIS_REPL_SEND_BULK;
4959 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4960 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4961 freeClient(slave);
4962 continue;
4963 }
4964 }
4965 }
4966 if (startbgsave) {
4967 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4968 listRewind(server.slaves);
4969 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4970 while((ln = listYield(server.slaves))) {
4971 redisClient *slave = ln->value;
4972
4973 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4974 freeClient(slave);
4975 }
4976 }
4977 }
4978 }
4979
4980 static int syncWithMaster(void) {
4981 char buf[1024], tmpfile[256];
4982 int dumpsize;
4983 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4984 int dfd;
4985
4986 if (fd == -1) {
4987 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4988 strerror(errno));
4989 return REDIS_ERR;
4990 }
4991 /* Issue the SYNC command */
4992 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4993 close(fd);
4994 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4995 strerror(errno));
4996 return REDIS_ERR;
4997 }
4998 /* Read the bulk write count */
4999 if (syncReadLine(fd,buf,1024,3600) == -1) {
5000 close(fd);
5001 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5002 strerror(errno));
5003 return REDIS_ERR;
5004 }
5005 dumpsize = atoi(buf+1);
5006 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5007 /* Read the bulk write data on a temp file */
5008 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5009 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5010 if (dfd == -1) {
5011 close(fd);
5012 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5013 return REDIS_ERR;
5014 }
5015 while(dumpsize) {
5016 int nread, nwritten;
5017
5018 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5019 if (nread == -1) {
5020 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5021 strerror(errno));
5022 close(fd);
5023 close(dfd);
5024 return REDIS_ERR;
5025 }
5026 nwritten = write(dfd,buf,nread);
5027 if (nwritten == -1) {
5028 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5029 close(fd);
5030 close(dfd);
5031 return REDIS_ERR;
5032 }
5033 dumpsize -= nread;
5034 }
5035 close(dfd);
5036 if (rename(tmpfile,server.dbfilename) == -1) {
5037 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5038 unlink(tmpfile);
5039 close(fd);
5040 return REDIS_ERR;
5041 }
5042 emptyDb();
5043 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5044 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5045 close(fd);
5046 return REDIS_ERR;
5047 }
5048 server.master = createClient(fd);
5049 server.master->flags |= REDIS_MASTER;
5050 server.replstate = REDIS_REPL_CONNECTED;
5051 return REDIS_OK;
5052 }
5053
5054 static void slaveofCommand(redisClient *c) {
5055 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5056 !strcasecmp(c->argv[2]->ptr,"one")) {
5057 if (server.masterhost) {
5058 sdsfree(server.masterhost);
5059 server.masterhost = NULL;
5060 if (server.master) freeClient(server.master);
5061 server.replstate = REDIS_REPL_NONE;
5062 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5063 }
5064 } else {
5065 sdsfree(server.masterhost);
5066 server.masterhost = sdsdup(c->argv[1]->ptr);
5067 server.masterport = atoi(c->argv[2]->ptr);
5068 if (server.master) freeClient(server.master);
5069 server.replstate = REDIS_REPL_CONNECT;
5070 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5071 server.masterhost, server.masterport);
5072 }
5073 addReply(c,shared.ok);
5074 }
5075
5076 /* ============================ Maxmemory directive ======================== */
5077
5078 /* This function gets called when 'maxmemory' is set on the config file to limit
5079 * the max memory used by the server, and we are out of memory.
5080 * This function will try to, in order:
5081 *
5082 * - Free objects from the free list
5083 * - Try to remove keys with an EXPIRE set
5084 *
5085 * It is not possible to free enough memory to reach used-memory < maxmemory
5086 * the server will start refusing commands that will enlarge even more the
5087 * memory usage.
5088 */
5089 static void freeMemoryIfNeeded(void) {
5090 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5091 if (listLength(server.objfreelist)) {
5092 robj *o;
5093
5094 listNode *head = listFirst(server.objfreelist);
5095 o = listNodeValue(head);
5096 listDelNode(server.objfreelist,head);
5097 zfree(o);
5098 } else {
5099 int j, k, freed = 0;
5100
5101 for (j = 0; j < server.dbnum; j++) {
5102 int minttl = -1;
5103 robj *minkey = NULL;
5104 struct dictEntry *de;
5105
5106 if (dictSize(server.db[j].expires)) {
5107 freed = 1;
5108 /* From a sample of three keys drop the one nearest to
5109 * the natural expire */
5110 for (k = 0; k < 3; k++) {
5111 time_t t;
5112
5113 de = dictGetRandomKey(server.db[j].expires);
5114 t = (time_t) dictGetEntryVal(de);
5115 if (minttl == -1 || t < minttl) {
5116 minkey = dictGetEntryKey(de);
5117 minttl = t;
5118 }
5119 }
5120 deleteKey(server.db+j,minkey);
5121 }
5122 }
5123 if (!freed) return; /* nothing to free... */
5124 }
5125 }
5126 }
5127
5128 /* ================================= Debugging ============================== */
5129
5130 static void debugCommand(redisClient *c) {
5131 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5132 *((char*)-1) = 'x';
5133 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5134 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5135 robj *key, *val;
5136
5137 if (!de) {
5138 addReply(c,shared.nokeyerr);
5139 return;
5140 }
5141 key = dictGetEntryKey(de);
5142 val = dictGetEntryVal(de);
5143 addReplySds(c,sdscatprintf(sdsempty(),
5144 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5145 key, key->refcount, val, val->refcount, val->encoding));
5146 } else {
5147 addReplySds(c,sdsnew(
5148 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5149 }
5150 }
5151
5152 #ifdef HAVE_BACKTRACE
5153 static struct redisFunctionSym symsTable[] = {
5154 {"compareStringObjects", (unsigned long)compareStringObjects},
5155 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5156 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5157 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5158 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5159 {"freeStringObject", (unsigned long)freeStringObject},
5160 {"freeListObject", (unsigned long)freeListObject},
5161 {"freeSetObject", (unsigned long)freeSetObject},
5162 {"decrRefCount", (unsigned long)decrRefCount},
5163 {"createObject", (unsigned long)createObject},
5164 {"freeClient", (unsigned long)freeClient},
5165 {"rdbLoad", (unsigned long)rdbLoad},
5166 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5167 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5168 {"addReply", (unsigned long)addReply},
5169 {"addReplySds", (unsigned long)addReplySds},
5170 {"incrRefCount", (unsigned long)incrRefCount},
5171 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5172 {"createStringObject", (unsigned long)createStringObject},
5173 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5174 {"syncWithMaster", (unsigned long)syncWithMaster},
5175 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5176 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5177 {"getDecodedObject", (unsigned long)getDecodedObject},
5178 {"removeExpire", (unsigned long)removeExpire},
5179 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5180 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5181 {"deleteKey", (unsigned long)deleteKey},
5182 {"getExpire", (unsigned long)getExpire},
5183 {"setExpire", (unsigned long)setExpire},
5184 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5185 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5186 {"authCommand", (unsigned long)authCommand},
5187 {"pingCommand", (unsigned long)pingCommand},
5188 {"echoCommand", (unsigned long)echoCommand},
5189 {"setCommand", (unsigned long)setCommand},
5190 {"setnxCommand", (unsigned long)setnxCommand},
5191 {"getCommand", (unsigned long)getCommand},
5192 {"delCommand", (unsigned long)delCommand},
5193 {"existsCommand", (unsigned long)existsCommand},
5194 {"incrCommand", (unsigned long)incrCommand},
5195 {"decrCommand", (unsigned long)decrCommand},
5196 {"incrbyCommand", (unsigned long)incrbyCommand},
5197 {"decrbyCommand", (unsigned long)decrbyCommand},
5198 {"selectCommand", (unsigned long)selectCommand},
5199 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5200 {"keysCommand", (unsigned long)keysCommand},
5201 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5202 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5203 {"saveCommand", (unsigned long)saveCommand},
5204 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5205 {"shutdownCommand", (unsigned long)shutdownCommand},
5206 {"moveCommand", (unsigned long)moveCommand},
5207 {"renameCommand", (unsigned long)renameCommand},
5208 {"renamenxCommand", (unsigned long)renamenxCommand},
5209 {"lpushCommand", (unsigned long)lpushCommand},
5210 {"rpushCommand", (unsigned long)rpushCommand},
5211 {"lpopCommand", (unsigned long)lpopCommand},
5212 {"rpopCommand", (unsigned long)rpopCommand},
5213 {"llenCommand", (unsigned long)llenCommand},
5214 {"lindexCommand", (unsigned long)lindexCommand},
5215 {"lrangeCommand", (unsigned long)lrangeCommand},
5216 {"ltrimCommand", (unsigned long)ltrimCommand},
5217 {"typeCommand", (unsigned long)typeCommand},
5218 {"lsetCommand", (unsigned long)lsetCommand},
5219 {"saddCommand", (unsigned long)saddCommand},
5220 {"sremCommand", (unsigned long)sremCommand},
5221 {"smoveCommand", (unsigned long)smoveCommand},
5222 {"sismemberCommand", (unsigned long)sismemberCommand},
5223 {"scardCommand", (unsigned long)scardCommand},
5224 {"spopCommand", (unsigned long)spopCommand},
5225 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5226 {"sinterCommand", (unsigned long)sinterCommand},
5227 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5228 {"sunionCommand", (unsigned long)sunionCommand},
5229 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5230 {"sdiffCommand", (unsigned long)sdiffCommand},
5231 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5232 {"syncCommand", (unsigned long)syncCommand},
5233 {"flushdbCommand", (unsigned long)flushdbCommand},
5234 {"flushallCommand", (unsigned long)flushallCommand},
5235 {"sortCommand", (unsigned long)sortCommand},
5236 {"lremCommand", (unsigned long)lremCommand},
5237 {"infoCommand", (unsigned long)infoCommand},
5238 {"mgetCommand", (unsigned long)mgetCommand},
5239 {"monitorCommand", (unsigned long)monitorCommand},
5240 {"expireCommand", (unsigned long)expireCommand},
5241 {"getsetCommand", (unsigned long)getsetCommand},
5242 {"ttlCommand", (unsigned long)ttlCommand},
5243 {"slaveofCommand", (unsigned long)slaveofCommand},
5244 {"debugCommand", (unsigned long)debugCommand},
5245 {"processCommand", (unsigned long)processCommand},
5246 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5247 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5248 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5249 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5250 {"msetCommand", (unsigned long)msetCommand},
5251 {"msetnxCommand", (unsigned long)msetnxCommand},
5252 {"zslCreateNode", (unsigned long)zslCreateNode},
5253 {"zslCreate", (unsigned long)zslCreate},
5254 {"zslFreeNode",(unsigned long)zslFreeNode},
5255 {"zslFree",(unsigned long)zslFree},
5256 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5257 {"zslInsert",(unsigned long)zslInsert},
5258 {"zslDelete",(unsigned long)zslDelete},
5259 {"createZsetObject",(unsigned long)createZsetObject},
5260 {"zaddCommand",(unsigned long)zaddCommand},
5261 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5262 {"zrangeCommand",(unsigned long)zrangeCommand},
5263 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5264 {"zremCommand",(unsigned long)zremCommand},
5265 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5266 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5267 {NULL,0}
5268 };
5269
5270 /* This function try to convert a pointer into a function name. It's used in
5271 * oreder to provide a backtrace under segmentation fault that's able to
5272 * display functions declared as static (otherwise the backtrace is useless). */
5273 static char *findFuncName(void *pointer, unsigned long *offset){
5274 int i, ret = -1;
5275 unsigned long off, minoff = 0;
5276
5277 /* Try to match against the Symbol with the smallest offset */
5278 for (i=0; symsTable[i].pointer; i++) {
5279 unsigned long lp = (unsigned long) pointer;
5280
5281 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5282 off=lp-symsTable[i].pointer;
5283 if (ret < 0 || off < minoff) {
5284 minoff=off;
5285 ret=i;
5286 }
5287 }
5288 }
5289 if (ret == -1) return NULL;
5290 *offset = minoff;
5291 return symsTable[ret].name;
5292 }
5293
5294 static void *getMcontextEip(ucontext_t *uc) {
5295 #if defined(__FreeBSD__)
5296 return (void*) uc->uc_mcontext.mc_eip;
5297 #elif defined(__dietlibc__)
5298 return (void*) uc->uc_mcontext.eip;
5299 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5300 return (void*) uc->uc_mcontext->__ss.__eip;
5301 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5302 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5303 return (void*) uc->uc_mcontext->__ss.__rip;
5304 #else
5305 return (void*) uc->uc_mcontext->__ss.__eip;
5306 #endif
5307 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5308 return (void*) uc->uc_mcontext.gregs[REG_EIP];
5309 #elif defined(__ia64__) /* Linux IA64 */
5310 return (void*) uc->uc_mcontext.sc_ip;
5311 #else
5312 return NULL;
5313 #endif
5314 }
5315
5316 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5317 void *trace[100];
5318 char **messages = NULL;
5319 int i, trace_size = 0;
5320 unsigned long offset=0;
5321 time_t uptime = time(NULL)-server.stat_starttime;
5322 ucontext_t *uc = (ucontext_t*) secret;
5323 REDIS_NOTUSED(info);
5324
5325 redisLog(REDIS_WARNING,
5326 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5327 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5328 "redis_version:%s; "
5329 "uptime_in_seconds:%d; "
5330 "connected_clients:%d; "
5331 "connected_slaves:%d; "
5332 "used_memory:%zu; "
5333 "changes_since_last_save:%lld; "
5334 "bgsave_in_progress:%d; "
5335 "last_save_time:%d; "
5336 "total_connections_received:%lld; "
5337 "total_commands_processed:%lld; "
5338 "role:%s;"
5339 ,REDIS_VERSION,
5340 uptime,
5341 listLength(server.clients)-listLength(server.slaves),
5342 listLength(server.slaves),
5343 server.usedmemory,
5344 server.dirty,
5345 server.bgsaveinprogress,
5346 server.lastsave,
5347 server.stat_numconnections,
5348 server.stat_numcommands,
5349 server.masterhost == NULL ? "master" : "slave"
5350 ));
5351
5352 trace_size = backtrace(trace, 100);
5353 /* overwrite sigaction with caller's address */
5354 if (getMcontextEip(uc) != NULL) {
5355 trace[1] = getMcontextEip(uc);
5356 }
5357 messages = backtrace_symbols(trace, trace_size);
5358
5359 for (i=1; i<trace_size; ++i) {
5360 char *fn = findFuncName(trace[i], &offset), *p;
5361
5362 p = strchr(messages[i],'+');
5363 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5364 redisLog(REDIS_WARNING,"%s", messages[i]);
5365 } else {
5366 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5367 }
5368 }
5369 free(messages);
5370 exit(0);
5371 }
5372
5373 static void setupSigSegvAction(void) {
5374 struct sigaction act;
5375
5376 sigemptyset (&act.sa_mask);
5377 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5378 * is used. Otherwise, sa_handler is used */
5379 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5380 act.sa_sigaction = segvHandler;
5381 sigaction (SIGSEGV, &act, NULL);
5382 sigaction (SIGBUS, &act, NULL);
5383 sigaction (SIGFPE, &act, NULL);
5384 sigaction (SIGILL, &act, NULL);
5385 sigaction (SIGBUS, &act, NULL);
5386 return;
5387 }
5388 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without backtrace support: no crash handler is installed,
     * fatal signals keep whatever disposition they already had. */
}
5391 #endif /* HAVE_BACKTRACE */
5392
5393 /* =================================== Main! ================================ */
5394
5395 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its numeric value (parsed
 * with atoi), or -1 if the file cannot be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *f = fopen("/proc/sys/vm/overcommit_memory","r");

    if (f == NULL) return -1;
    if (fgets(line,sizeof(line),f) != NULL) value = atoi(line);
    fclose(f);
    return value;
}
5409
5410 void linuxOvercommitMemoryWarning(void) {
5411 if (linuxOvercommitMemoryValue() == 0) {
5412 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5413 }
5414 }
5415 #endif /* __linux__ */
5416
5417 static void daemonize(void) {
5418 int fd;
5419 FILE *fp;
5420
5421 if (fork() != 0) exit(0); /* parent exits */
5422 setsid(); /* create a new session */
5423
5424 /* Every output goes to /dev/null. If Redis is daemonized but
5425 * the 'logfile' is set to 'stdout' in the configuration file
5426 * it will not log at all. */
5427 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5428 dup2(fd, STDIN_FILENO);
5429 dup2(fd, STDOUT_FILENO);
5430 dup2(fd, STDERR_FILENO);
5431 if (fd > STDERR_FILENO) close(fd);
5432 }
5433 /* Try to write the pid file */
5434 fp = fopen(server.pidfile,"w");
5435 if (fp) {
5436 fprintf(fp,"%d\n",getpid());
5437 fclose(fp);
5438 }
5439 }
5440
5441 int main(int argc, char **argv) {
5442 initServerConfig();
5443 if (argc == 2) {
5444 ResetServerSaveParams();
5445 loadServerConfig(argv[1]);
5446 } else if (argc > 2) {
5447 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5448 exit(1);
5449 } else {
5450 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5451 }
5452 initServer();
5453 if (server.daemonize) daemonize();
5454 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5455 #ifdef __linux__
5456 linuxOvercommitMemoryWarning();
5457 #endif
5458 if (rdbLoad(server.dbfilename) == REDIS_OK)
5459 redisLog(REDIS_NOTICE,"DB loaded from disk");
5460 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5461 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5462 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5463 aeMain(server.el);
5464 aeDeleteEventLoop(server.el);
5465 return 0;
5466 }